• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "async_manager"
18 
19 #include "async_manager.h"
20 
21 #include "osi/include/log.h"
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 #include "fcntl.h"
30 #include "sys/select.h"
31 #include "unistd.h"
32 
33 namespace test_vendor_lib {
34 // Implementation of AsyncManager is divided between two classes, three if
35 // AsyncManager itself is taken into account, but its only responsability
36 // besides being a proxy for the other two classes is to provide a global
37 // synchronization mechanism for callbacks and client code to use.
38 
39 // The watching of file descriptors is done through AsyncFdWatcher. Several
40 // objects of this class may coexist simultaneosly as they share no state.
41 // After construction of this objects nothing happens beyond some very simple
42 // member initialization. When the first FD is set up for watching the object
43 // starts a new thread which watches the given (and later provided) FDs using
44 // select() inside a loop. A special FD (a pipe) is also watched which is
45 // used to notify the thread of internal changes on the object state (like
46 // the addition of new FDs to watch on). Every access to internal state is
47 // synchronized using a single internal mutex. The thread is only stopped on
48 // destruction of the object, by modifying a flag, which is the only member
49 // variable accessed without acquiring the lock (because the notification to
50 // the thread is done later by writing to a pipe which means the thread will
51 // be notified regardless of what phase of the loop it is in that moment)
52 
53 // The scheduling of asynchronous tasks, periodic or not, is handled by the
54 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
55 // state between different instances so it is safe to use several objects of
56 // this class, also nothing interesting happens upon construction, but only
57 // after a Task has been scheduled and access to internal state is synchronized
58 // using a single internal mutex. When the first task is scheduled a thread
59 // is started which monitors a queue of tasks. The queue is peeked to see
60 // when the next task should be carried out and then the thread performs a
61 // (absolute) timed wait on a condition variable. The wait ends because of a
62 // time out or a notify on the cond var, the former means a task is due
63 // for execution while the later means there has been a change in internal
64 // state, like a task has been scheduled/canceled or the flag to stop has
65 // been set. Setting and querying the stop flag or modifying the task queue
66 // and subsequent notification on the cond var is done atomically (e.g while
67 // holding the lock on the internal mutex) to ensure that the thread never
68 // misses the notification, since notifying a cond var is not persistent as
69 // writing on a pipe (if not done this way, the thread could query the
70 // stopping flag and be put aside by the OS scheduler right after, then the
71 // 'stop thread' procedure could run, setting the flag, notifying a cond
72 // var that no one is waiting on and joining the thread, the thread then
73 // resumes execution believing that it needs to continue and waits on the
74 // cond var possibly forever if there are no tasks scheduled, efectively
75 // causing a deadlock).
76 
77 // This number also states the maximum number of scheduled tasks we can handle
78 // at a given time
79 static const uint16_t kMaxTaskId =
80     -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
NextAsyncTaskId(const AsyncTaskId id)81 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
82   return (id == kMaxTaskId) ? 1 : id + 1;
83 }
84 // The buffer is only 10 bytes because the expected number of bytes
85 // written on this socket is 1. It is possible that the thread is notified
86 // more than once but highly unlikely, so a buffer of size 10 seems enough
87 // and the reads are performed inside a while just in case it isn't. From
88 // the thread routine's point of view it is the same to have been notified
89 // just once or 100 times so it just tries to consume the entire buffer.
90 // In the cases where an interrupt would cause read to return without
91 // having read everything that was available a new iteration of the thread
92 // loop will bring execution to this point almost immediately, so there is
93 // no need to treat that case.
94 static const int kNotificationBufferSize = 10;
95 
96 // Async File Descriptor Watcher Implementation:
97 class AsyncManager::AsyncFdWatcher {
98  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)99   int WatchFdForNonBlockingReads(
100       int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
101     // add file descriptor and callback
102     {
103       std::unique_lock<std::mutex> guard(internal_mutex_);
104       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
105     }
106 
107     // start the thread if not started yet
108     int started = tryStartThread();
109     if (started != 0) {
110       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
111       return started;
112     }
113 
114     // notify the thread so that it knows of the new FD
115     notifyThread();
116 
117     return 0;
118   }
119 
StopWatchingFileDescriptor(int file_descriptor)120   void StopWatchingFileDescriptor(int file_descriptor) {
121     std::unique_lock<std::mutex> guard(internal_mutex_);
122     watched_shared_fds_.erase(file_descriptor);
123   }
124 
125   AsyncFdWatcher() = default;
126 
127   ~AsyncFdWatcher() = default;
128 
stopThread()129   int stopThread() {
130     if (!std::atomic_exchange(&running_, false)) {
131       return 0;  // if not running already
132     }
133 
134     notifyThread();
135 
136     if (std::this_thread::get_id() != thread_.get_id()) {
137       thread_.join();
138     } else {
139       LOG_WARN(LOG_TAG,
140                "%s: Starting thread stop from inside the reading thread itself",
141                __func__);
142     }
143 
144     {
145       std::unique_lock<std::mutex> guard(internal_mutex_);
146       watched_shared_fds_.clear();
147     }
148 
149     return 0;
150   }
151 
152  private:
153   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
154   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
155 
156   // Make sure to call this with at least one file descriptor ready to be
157   // watched upon or the thread routine will return immediately
tryStartThread()158   int tryStartThread() {
159     if (std::atomic_exchange(&running_, true)) {
160       return 0;  // if already running
161     }
162     // set up the communication channel
163     int pipe_fds[2];
164     if (pipe2(pipe_fds, O_NONBLOCK)) {
165       LOG_ERROR(LOG_TAG,
166                 "%s:Unable to establish a communication channel to the reading "
167                 "thread",
168                 __func__);
169       return -1;
170     }
171     notification_listen_fd_ = pipe_fds[0];
172     notification_write_fd_ = pipe_fds[1];
173 
174     thread_ = std::thread([this]() { ThreadRoutine(); });
175     if (!thread_.joinable()) {
176       LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
177       return -1;
178     }
179     return 0;
180   }
181 
notifyThread()182   int notifyThread() {
183     char buffer = '0';
184     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
185       LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread",
186                 __func__);
187       return -1;
188     }
189     return 0;
190   }
191 
setUpFileDescriptorSet(fd_set & read_fds)192   int setUpFileDescriptorSet(fd_set& read_fds) {
193     // add comm channel to the set
194     FD_SET(notification_listen_fd_, &read_fds);
195     int nfds = notification_listen_fd_;
196 
197     // add watched FDs to the set
198     {
199       std::unique_lock<std::mutex> guard(internal_mutex_);
200       for (auto& fdp : watched_shared_fds_) {
201         FD_SET(fdp.first, &read_fds);
202         nfds = std::max(fdp.first, nfds);
203       }
204     }
205     return nfds;
206   }
207 
208   // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds)209   bool consumeThreadNotifications(fd_set& read_fds) {
210     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
211       char buffer[kNotificationBufferSize];
212       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
213                                      kNotificationBufferSize)) ==
214              kNotificationBufferSize) {
215       }
216       return true;
217     }
218     return false;
219   }
220 
221   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)222   void runAppropriateCallbacks(fd_set& read_fds) {
223     // not a good idea to call a callback while holding the FD lock,
224     // nor to release the lock while traversing the map
225     std::vector<decltype(watched_shared_fds_)::value_type> fds;
226     {
227       std::unique_lock<std::mutex> guard(internal_mutex_);
228       for (auto& fdc : watched_shared_fds_) {
229         if (FD_ISSET(fdc.first, &read_fds)) {
230           fds.push_back(fdc);
231         }
232       }
233     }
234     for (auto& p : fds) {
235       p.second(p.first);
236     }
237   }
238 
ThreadRoutine()239   void ThreadRoutine() {
240     while (running_) {
241       fd_set read_fds;
242       FD_ZERO(&read_fds);
243       int nfds = setUpFileDescriptorSet(read_fds);
244 
245       // wait until there is data available to read on some FD
246       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
247       if (retval <= 0) {  // there was some error or a timeout
248         LOG_ERROR(LOG_TAG,
249                   "%s: There was an error while waiting for data on the file "
250                   "descriptors",
251                   __func__);
252         continue;
253       }
254 
255       consumeThreadNotifications(read_fds);
256 
257       // Do not read if there was a call to stop running
258       if (!running_) {
259         break;
260       }
261 
262       runAppropriateCallbacks(read_fds);
263     }
264   }
265 
266   std::atomic_bool running_{false};
267   std::thread thread_;
268   std::mutex internal_mutex_;
269 
270   std::map<int, ReadCallback> watched_shared_fds_;
271 
272   // A pair of FD to send information to the reading thread
273   int notification_listen_fd_;
274   int notification_write_fd_;
275 };
276 
277 // Async task manager implementation
278 class AsyncManager::AsyncTaskManager {
279  public:
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)280   AsyncTaskId ExecAsync(std::chrono::milliseconds delay,
281                         const TaskCallback& callback) {
282     return scheduleTask(std::make_shared<Task>(
283         std::chrono::steady_clock::now() + delay, callback));
284   }
285 
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)286   AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay,
287                                     std::chrono::milliseconds period,
288                                     const TaskCallback& callback) {
289     return scheduleTask(std::make_shared<Task>(
290         std::chrono::steady_clock::now() + delay, period, callback));
291   }
292 
CancelAsyncTask(AsyncTaskId async_task_id)293   bool CancelAsyncTask(AsyncTaskId async_task_id) {
294     // remove task from queue (and task id asociation) while holding lock
295     std::unique_lock<std::mutex> guard(internal_mutex_);
296     if (tasks_by_id.count(async_task_id) == 0) {
297       return false;
298     }
299     task_queue_.erase(tasks_by_id[async_task_id]);
300     tasks_by_id.erase(async_task_id);
301     return true;
302   }
303 
304   AsyncTaskManager() = default;
305 
306   ~AsyncTaskManager() = default;
307 
stopThread()308   int stopThread() {
309     {
310       std::unique_lock<std::mutex> guard(internal_mutex_);
311       tasks_by_id.clear();
312       task_queue_.clear();
313       if (!running_) {
314         return 0;
315       }
316       running_ = false;
317       // notify the thread
318       internal_cond_var_.notify_one();
319     }  // release the lock before joining a thread that is likely waiting for it
320     if (std::this_thread::get_id() != thread_.get_id()) {
321       thread_.join();
322     } else {
323       LOG_WARN(LOG_TAG,
324                "%s: Starting thread stop from inside the task thread itself",
325                __func__);
326     }
327     return 0;
328   }
329 
330  private:
331   // Holds the data for each task
332   class Task {
333    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback)334     Task(std::chrono::steady_clock::time_point time,
335          std::chrono::milliseconds period, const TaskCallback& callback)
336         : time(time),
337           periodic(true),
338           period(period),
339           callback(callback),
340           task_id(kInvalidTaskId) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback)341     Task(std::chrono::steady_clock::time_point time,
342          const TaskCallback& callback)
343         : time(time),
344           periodic(false),
345           callback(callback),
346           task_id(kInvalidTaskId) {}
347 
348     // Operators needed to be in a collection
operator <(const Task & another) const349     bool operator<(const Task& another) const {
350       return std::make_pair(time, task_id) <
351              std::make_pair(another.time, another.task_id);
352     }
353 
isPeriodic() const354     bool isPeriodic() const { return periodic; }
355 
356     // These fields should no longer be public if the class ever becomes
357     // public or gets more complex
358     std::chrono::steady_clock::time_point time;
359     bool periodic;
360     std::chrono::milliseconds period;
361     TaskCallback callback;
362     AsyncTaskId task_id;
363   };
364 
365   // A comparator class to put shared pointers to tasks in an ordered set
366   struct task_p_comparator {
operator ()test_vendor_lib::AsyncManager::AsyncTaskManager::task_p_comparator367     bool operator()(const std::shared_ptr<Task>& t1,
368                     const std::shared_ptr<Task>& t2) const {
369       return *t1 < *t2;
370     }
371   };
372 
373   AsyncTaskManager(const AsyncTaskManager&) = delete;
374   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
375 
scheduleTask(const std::shared_ptr<Task> & task)376   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
377     AsyncTaskId task_id = kInvalidTaskId;
378     {
379       std::unique_lock<std::mutex> guard(internal_mutex_);
380       // no more room for new tasks, we need a larger type for IDs
381       if (tasks_by_id.size() == kMaxTaskId)  // TODO potentially type unsafe
382         return kInvalidTaskId;
383       do {
384         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
385       } while (isTaskIdInUse(lastTaskId_));
386       task->task_id = lastTaskId_;
387       // add task to the queue and map
388       tasks_by_id[lastTaskId_] = task;
389       task_queue_.insert(task);
390       task_id = lastTaskId_;
391     }
392     // start thread if necessary
393     int started = tryStartThread();
394     if (started != 0) {
395       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
396       return kInvalidTaskId;
397     }
398     // notify the thread so that it knows of the new task
399     internal_cond_var_.notify_one();
400     // return task id
401     return task_id;
402   }
403 
isTaskIdInUse(const AsyncTaskId & task_id) const404   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
405     return tasks_by_id.count(task_id) != 0;
406   }
407 
tryStartThread()408   int tryStartThread() {
409     // need the lock because of the running flag and the cond var
410     std::unique_lock<std::mutex> guard(internal_mutex_);
411     // check that the thread is not yet running
412     if (running_) {
413       return 0;
414     }
415     // start the thread
416     running_ = true;
417     thread_ = std::thread([this]() { ThreadRoutine(); });
418     if (!thread_.joinable()) {
419       LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
420       return -1;
421     }
422     return 0;
423   }
424 
ThreadRoutine()425   void ThreadRoutine() {
426     while (1) {
427       TaskCallback callback;
428       bool run_it = false;
429       {
430         std::unique_lock<std::mutex> guard(internal_mutex_);
431         if (!task_queue_.empty()) {
432           std::shared_ptr<Task> task_p = *(task_queue_.begin());
433           if (task_p->time < std::chrono::steady_clock::now()) {
434             run_it = true;
435             callback = task_p->callback;
436             task_queue_.erase(task_p);  // need to remove and add again if
437                                         // periodic to update order
438             if (task_p->isPeriodic()) {
439               task_p->time += task_p->period;
440               task_queue_.insert(task_p);
441             } else {
442               tasks_by_id.erase(task_p->task_id);
443             }
444           }
445         }
446       }
447       if (run_it) {
448         callback();
449       }
450       {
451         std::unique_lock<std::mutex> guard(internal_mutex_);
452         // wait on condition variable with timeout just in time for next task if
453         // any
454         if (task_queue_.size() > 0) {
455           internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
456         } else {
457           internal_cond_var_.wait(guard);
458         }
459         // check for termination right after being notified (and maybe before?)
460         if (!running_) break;
461       }
462     }
463   }
464 
465   bool running_ = false;
466   std::thread thread_;
467   std::mutex internal_mutex_;
468   std::condition_variable internal_cond_var_;
469 
470   AsyncTaskId lastTaskId_ = kInvalidTaskId;
471   std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
472   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
473 };
474 
475 // Async Manager Implementation:
AsyncManager()476 AsyncManager::AsyncManager()
477     : fdWatcher_p_(new AsyncFdWatcher()),
478       taskManager_p_(new AsyncTaskManager()) {}
479 
~AsyncManager()480 AsyncManager::~AsyncManager() {
481   // Make sure the threads are stopped before destroying the object.
482   // The threads need to be stopped here and not in each internal class'
483   // destructor because unique_ptr's reset() first assigns nullptr to the
484   // pointer and only then calls the destructor, so any callback running
485   // on these threads would dereference a null pointer if they called a member
486   // function of this class.
487   fdWatcher_p_->stopThread();
488   taskManager_p_->stopThread();
489 }
490 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)491 int AsyncManager::WatchFdForNonBlockingReads(
492     int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
493   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
494                                                   on_read_fd_ready_callback);
495 }
496 
StopWatchingFileDescriptor(int file_descriptor)497 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
498   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
499 }
500 
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)501 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay,
502                                     const TaskCallback& callback) {
503   return taskManager_p_->ExecAsync(delay, callback);
504 }
505 
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)506 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
507     std::chrono::milliseconds delay, std::chrono::milliseconds period,
508     const TaskCallback& callback) {
509   return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
510 }
511 
CancelAsyncTask(AsyncTaskId async_task_id)512 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
513   return taskManager_p_->CancelAsyncTask(async_task_id);
514 }
515 
Synchronize(const CriticalCallback & critical)516 void AsyncManager::Synchronize(const CriticalCallback& critical) {
517   std::unique_lock<std::mutex> guard(synchronization_mutex_);
518   critical();
519 }
520 }  // namespace test_vendor_lib
521