• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2016 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #define LOG_TAG "async_manager"
18 
19 #include "async_manager.h"
20 
21 #include <algorithm>
22 #include <atomic>
23 #include <condition_variable>
24 #include <mutex>
25 #include <thread>
26 #include <vector>
27 #include "fcntl.h"
28 #include "sys/select.h"
29 #include "unistd.h"
30 
31 namespace test_vendor_lib {
32 // Implementation of AsyncManager is divided between two classes, three if
33 // AsyncManager itself is taken into account, but its only responsability
34 // besides being a proxy for the other two classes is to provide a global
35 // synchronization mechanism for callbacks and client code to use.
36 
37 // The watching of file descriptors is done through AsyncFdWatcher. Several
38 // objects of this class may coexist simultaneosly as they share no state.
39 // After construction of this objects nothing happens beyond some very simple
40 // member initialization. When the first FD is set up for watching the object
41 // starts a new thread which watches the given (and later provided) FDs using
42 // select() inside a loop. A special FD (a pipe) is also watched which is
43 // used to notify the thread of internal changes on the object state (like
44 // the addition of new FDs to watch on). Every access to internal state is
45 // synchronized using a single internal mutex. The thread is only stopped on
46 // destruction of the object, by modifying a flag, which is the only member
47 // variable accessed without acquiring the lock (because the notification to
48 // the thread is done later by writing to a pipe which means the thread will
49 // be notified regardless of what phase of the loop it is in that moment)
50 
51 // The scheduling of asynchronous tasks, periodic or not, is handled by the
52 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
53 // state between different instances so it is safe to use several objects of
54 // this class, also nothing interesting happens upon construction, but only
55 // after a Task has been scheduled and access to internal state is synchronized
56 // using a single internal mutex. When the first task is scheduled a thread
57 // is started which monitors a queue of tasks. The queue is peeked to see
58 // when the next task should be carried out and then the thread performs a
59 // (absolute) timed wait on a condition variable. The wait ends because of a
60 // time out or a notify on the cond var, the former means a task is due
61 // for execution while the later means there has been a change in internal
62 // state, like a task has been scheduled/canceled or the flag to stop has
63 // been set. Setting and querying the stop flag or modifying the task queue
64 // and subsequent notification on the cond var is done atomically (e.g while
65 // holding the lock on the internal mutex) to ensure that the thread never
66 // misses the notification, since notifying a cond var is not persistent as
67 // writing on a pipe (if not done this way, the thread could query the
68 // stopping flag and be put aside by the OS scheduler right after, then the
69 // 'stop thread' procedure could run, setting the flag, notifying a cond
70 // var that no one is waiting on and joining the thread, the thread then
71 // resumes execution believing that it needs to continue and waits on the
72 // cond var possibly forever if there are no tasks scheduled, efectively
73 // causing a deadlock).
74 
75 // This number also states the maximum number of scheduled tasks we can handle
76 // at a given time
77 static const uint16_t kMaxTaskId =
78     -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
NextAsyncTaskId(const AsyncTaskId id)79 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
80   return (id == kMaxTaskId) ? 1 : id + 1;
81 }
82 // The buffer is only 10 bytes because the expected number of bytes
83 // written on this socket is 1. It is possible that the thread is notified
84 // more than once but highly unlikely, so a buffer of size 10 seems enough
85 // and the reads are performed inside a while just in case it isn't. From
86 // the thread routine's point of view it is the same to have been notified
87 // just once or 100 times so it just tries to consume the entire buffer.
88 // In the cases where an interrupt would cause read to return without
89 // having read everything that was available a new iteration of the thread
90 // loop will bring execution to this point almost immediately, so there is
91 // no need to treat that case.
92 static const int kNotificationBufferSize = 10;
93 
94 // Async File Descriptor Watcher Implementation:
95 class AsyncManager::AsyncFdWatcher {
96  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)97   int WatchFdForNonBlockingReads(
98       int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
99     // add file descriptor and callback
100     {
101       std::unique_lock<std::mutex> guard(internal_mutex_);
102       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
103     }
104 
105     // start the thread if not started yet
106     int started = tryStartThread();
107     if (started != 0) {
108       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
109       return started;
110     }
111 
112     // notify the thread so that it knows of the new FD
113     notifyThread();
114 
115     return 0;
116   }
117 
StopWatchingFileDescriptor(int file_descriptor)118   void StopWatchingFileDescriptor(int file_descriptor) {
119     std::unique_lock<std::mutex> guard(internal_mutex_);
120     watched_shared_fds_.erase(file_descriptor);
121   }
122 
123   AsyncFdWatcher() = default;
124 
125   ~AsyncFdWatcher() = default;
126 
stopThread()127   int stopThread() {
128     if (!std::atomic_exchange(&running_, false)) {
129       return 0;  // if not running already
130     }
131 
132     notifyThread();
133 
134     if (std::this_thread::get_id() != thread_.get_id()) {
135       thread_.join();
136     } else {
137       LOG_WARN(LOG_TAG,
138                "%s: Starting thread stop from inside the reading thread itself",
139                __func__);
140     }
141 
142     {
143       std::unique_lock<std::mutex> guard(internal_mutex_);
144       watched_shared_fds_.clear();
145     }
146 
147     return 0;
148   }
149 
150  private:
151   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
152   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
153 
154   // Make sure to call this with at least one file descriptor ready to be
155   // watched upon or the thread routine will return immediately
tryStartThread()156   int tryStartThread() {
157     if (std::atomic_exchange(&running_, true)) {
158       return 0;  // if already running
159     }
160     // set up the communication channel
161     int pipe_fds[2];
162     if (pipe2(pipe_fds, O_NONBLOCK)) {
163       LOG_ERROR(LOG_TAG,
164                 "%s:Unable to establish a communication channel to the reading "
165                 "thread",
166                 __func__);
167       return -1;
168     }
169     notification_listen_fd_ = pipe_fds[0];
170     notification_write_fd_ = pipe_fds[1];
171 
172     thread_ = std::thread([this]() { ThreadRoutine(); });
173     if (!thread_.joinable()) {
174       LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
175       return -1;
176     }
177     return 0;
178   }
179 
notifyThread()180   int notifyThread() {
181     char buffer = '0';
182     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
183       LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread",
184                 __func__);
185       return -1;
186     }
187     return 0;
188   }
189 
setUpFileDescriptorSet(fd_set & read_fds)190   int setUpFileDescriptorSet(fd_set& read_fds) {
191     // add comm channel to the set
192     FD_SET(notification_listen_fd_, &read_fds);
193     int nfds = notification_listen_fd_;
194 
195     // add watched FDs to the set
196     {
197       std::unique_lock<std::mutex> guard(internal_mutex_);
198       for (auto& fdp : watched_shared_fds_) {
199         FD_SET(fdp.first, &read_fds);
200         nfds = std::max(fdp.first, nfds);
201       }
202     }
203     return nfds;
204   }
205 
206   // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds)207   bool consumeThreadNotifications(fd_set& read_fds) {
208     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
209       char buffer[kNotificationBufferSize];
210       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
211                                      kNotificationBufferSize)) ==
212              kNotificationBufferSize) {
213       }
214       return true;
215     }
216     return false;
217   }
218 
219   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)220   void runAppropriateCallbacks(fd_set& read_fds) {
221     // not a good idea to call a callback while holding the FD lock,
222     // nor to release the lock while traversing the map
223     std::vector<decltype(watched_shared_fds_)::value_type> fds;
224     {
225       std::unique_lock<std::mutex> guard(internal_mutex_);
226       for (auto& fdc : watched_shared_fds_) {
227         if (FD_ISSET(fdc.first, &read_fds)) {
228           fds.push_back(fdc);
229         }
230       }
231     }
232     for (auto& p : fds) {
233       p.second(p.first);
234     }
235   }
236 
ThreadRoutine()237   void ThreadRoutine() {
238     while (running_) {
239       fd_set read_fds;
240       FD_ZERO(&read_fds);
241       int nfds = setUpFileDescriptorSet(read_fds);
242 
243       // wait until there is data available to read on some FD
244       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
245       if (retval <= 0) {  // there was some error or a timeout
246         LOG_ERROR(LOG_TAG,
247                   "%s: There was an error while waiting for data on the file "
248                   "descriptors",
249                   __func__);
250         continue;
251       }
252 
253       consumeThreadNotifications(read_fds);
254 
255       // Do not read if there was a call to stop running
256       if (!running_) {
257         break;
258       }
259 
260       runAppropriateCallbacks(read_fds);
261     }
262   }
263 
264   std::atomic_bool running_{false};
265   std::thread thread_;
266   std::mutex internal_mutex_;
267 
268   std::map<int, ReadCallback> watched_shared_fds_;
269 
270   // A pair of FD to send information to the reading thread
271   int notification_listen_fd_;
272   int notification_write_fd_;
273 };
274 
275 // Async task manager implementation
276 class AsyncManager::AsyncTaskManager {
277  public:
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)278   AsyncTaskId ExecAsync(std::chrono::milliseconds delay,
279                         const TaskCallback& callback) {
280     return scheduleTask(std::make_shared<Task>(
281         std::chrono::steady_clock::now() + delay, callback));
282   }
283 
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)284   AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay,
285                                     std::chrono::milliseconds period,
286                                     const TaskCallback& callback) {
287     return scheduleTask(std::make_shared<Task>(
288         std::chrono::steady_clock::now() + delay, period, callback));
289   }
290 
CancelAsyncTask(AsyncTaskId async_task_id)291   bool CancelAsyncTask(AsyncTaskId async_task_id) {
292     // remove task from queue (and task id asociation) while holding lock
293     std::unique_lock<std::mutex> guard(internal_mutex_);
294     if (tasks_by_id.count(async_task_id) == 0) {
295       return false;
296     }
297     task_queue_.erase(tasks_by_id[async_task_id]);
298     tasks_by_id.erase(async_task_id);
299     return true;
300   }
301 
302   AsyncTaskManager() = default;
303 
304   ~AsyncTaskManager() = default;
305 
stopThread()306   int stopThread() {
307     {
308       std::unique_lock<std::mutex> guard(internal_mutex_);
309       tasks_by_id.clear();
310       task_queue_.clear();
311       if (!running_) {
312         return 0;
313       }
314       running_ = false;
315       // notify the thread
316       internal_cond_var_.notify_one();
317     }  // release the lock before joining a thread that is likely waiting for it
318     if (std::this_thread::get_id() != thread_.get_id()) {
319       thread_.join();
320     } else {
321       LOG_WARN(LOG_TAG,
322                "%s: Starting thread stop from inside the task thread itself",
323                __func__);
324     }
325     return 0;
326   }
327 
328  private:
329   // Holds the data for each task
330   class Task {
331    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback)332     Task(std::chrono::steady_clock::time_point time,
333          std::chrono::milliseconds period, const TaskCallback& callback)
334         : time(time),
335           periodic(true),
336           period(period),
337           callback(callback),
338           task_id(kInvalidTaskId) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback)339     Task(std::chrono::steady_clock::time_point time,
340          const TaskCallback& callback)
341         : time(time),
342           periodic(false),
343           callback(callback),
344           task_id(kInvalidTaskId) {}
345 
346     // Operators needed to be in a collection
operator <(const Task & another) const347     bool operator<(const Task& another) const {
348       return std::make_pair(time, task_id) <
349              std::make_pair(another.time, another.task_id);
350     }
351 
isPeriodic() const352     bool isPeriodic() const { return periodic; }
353 
354     // These fields should no longer be public if the class ever becomes
355     // public or gets more complex
356     std::chrono::steady_clock::time_point time;
357     bool periodic;
358     std::chrono::milliseconds period;
359     TaskCallback callback;
360     AsyncTaskId task_id;
361   };
362 
363   // A comparator class to put shared pointers to tasks in an ordered set
364   struct task_p_comparator {
operator ()test_vendor_lib::AsyncManager::AsyncTaskManager::task_p_comparator365     bool operator()(const std::shared_ptr<Task>& t1,
366                     const std::shared_ptr<Task>& t2) const {
367       return *t1 < *t2;
368     }
369   };
370 
371   AsyncTaskManager(const AsyncTaskManager&) = delete;
372   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
373 
scheduleTask(const std::shared_ptr<Task> & task)374   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
375     AsyncTaskId task_id = kInvalidTaskId;
376     {
377       std::unique_lock<std::mutex> guard(internal_mutex_);
378       // no more room for new tasks, we need a larger type for IDs
379       if (tasks_by_id.size() == kMaxTaskId)  // TODO potentially type unsafe
380         return kInvalidTaskId;
381       do {
382         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
383       } while (isTaskIdInUse(lastTaskId_));
384       task->task_id = lastTaskId_;
385       // add task to the queue and map
386       tasks_by_id[lastTaskId_] = task;
387       task_queue_.insert(task);
388       task_id = lastTaskId_;
389     }
390     // start thread if necessary
391     int started = tryStartThread();
392     if (started != 0) {
393       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
394       return kInvalidTaskId;
395     }
396     // notify the thread so that it knows of the new task
397     internal_cond_var_.notify_one();
398     // return task id
399     return task_id;
400   }
401 
isTaskIdInUse(const AsyncTaskId & task_id) const402   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
403     return tasks_by_id.count(task_id) != 0;
404   }
405 
tryStartThread()406   int tryStartThread() {
407     // need the lock because of the running flag and the cond var
408     std::unique_lock<std::mutex> guard(internal_mutex_);
409     // check that the thread is not yet running
410     if (running_) {
411       return 0;
412     }
413     // start the thread
414     running_ = true;
415     thread_ = std::thread([this]() { ThreadRoutine(); });
416     if (!thread_.joinable()) {
417       LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
418       return -1;
419     }
420     return 0;
421   }
422 
ThreadRoutine()423   void ThreadRoutine() {
424     while (1) {
425       TaskCallback callback;
426       bool run_it = false;
427       {
428         std::unique_lock<std::mutex> guard(internal_mutex_);
429         if (!task_queue_.empty()) {
430           std::shared_ptr<Task> task_p = *(task_queue_.begin());
431           if (task_p->time < std::chrono::steady_clock::now()) {
432             run_it = true;
433             callback = task_p->callback;
434             task_queue_.erase(task_p);  // need to remove and add again if
435                                         // periodic to update order
436             if (task_p->isPeriodic()) {
437               task_p->time += task_p->period;
438               task_queue_.insert(task_p);
439             } else {
440               tasks_by_id.erase(task_p->task_id);
441             }
442           }
443         }
444       }
445       if (run_it) {
446         callback();
447       }
448       {
449         std::unique_lock<std::mutex> guard(internal_mutex_);
450         // wait on condition variable with timeout just in time for next task if
451         // any
452         if (task_queue_.size() > 0) {
453           internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
454         } else {
455           internal_cond_var_.wait(guard);
456         }
457         // check for termination right after being notified (and maybe before?)
458         if (!running_) break;
459       }
460     }
461   }
462 
463   bool running_ = false;
464   std::thread thread_;
465   std::mutex internal_mutex_;
466   std::condition_variable internal_cond_var_;
467 
468   AsyncTaskId lastTaskId_ = kInvalidTaskId;
469   std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
470   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
471 };
472 
473 // Async Manager Implementation:
AsyncManager()474 AsyncManager::AsyncManager()
475     : fdWatcher_p_(new AsyncFdWatcher()),
476       taskManager_p_(new AsyncTaskManager()) {}
477 
~AsyncManager()478 AsyncManager::~AsyncManager() {
479   // Make sure the threads are stopped before destroying the object.
480   // The threads need to be stopped here and not in each internal class'
481   // destructor because unique_ptr's reset() first assigns nullptr to the
482   // pointer and only then calls the destructor, so any callback running
483   // on these threads would dereference a null pointer if they called a member
484   // function of this class.
485   fdWatcher_p_->stopThread();
486   taskManager_p_->stopThread();
487 }
488 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)489 int AsyncManager::WatchFdForNonBlockingReads(
490     int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
491   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
492                                                   on_read_fd_ready_callback);
493 }
494 
StopWatchingFileDescriptor(int file_descriptor)495 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
496   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
497 }
498 
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)499 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay,
500                                     const TaskCallback& callback) {
501   return taskManager_p_->ExecAsync(delay, callback);
502 }
503 
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)504 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
505     std::chrono::milliseconds delay, std::chrono::milliseconds period,
506     const TaskCallback& callback) {
507   return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
508 }
509 
CancelAsyncTask(AsyncTaskId async_task_id)510 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
511   return taskManager_p_->CancelAsyncTask(async_task_id);
512 }
513 
Synchronize(const CriticalCallback & critical)514 void AsyncManager::Synchronize(const CriticalCallback& critical) {
515   std::unique_lock<std::mutex> guard(synchronization_mutex_);
516   critical();
517 }
518 }
519