/*
 * Copyright 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "async_manager"

#include "async_manager.h"

#include "osi/include/log.h"

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <condition_variable>
#include <cstring>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

#include <fcntl.h>
#include <sys/select.h>
#include <unistd.h>

namespace test_vendor_lib {
// The implementation of AsyncManager is divided between two classes (three if
// AsyncManager itself is counted), but AsyncManager's only responsibility
// besides acting as a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching, the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched; it is used to
// notify the thread of internal changes to the object's state (like the
// addition of new FDs to watch). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done afterwards by writing to a pipe, which means the thread
// will be notified regardless of what phase of the loop it is in at that
// moment).

// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances, so it is safe to use several objects of
// this class; nothing interesting happens upon construction either, but only
// after a task has been scheduled, and access to internal state is
// synchronized using a single internal mutex. When the first task is scheduled
// a thread is started which monitors a queue of tasks. The queue is peeked to
// see when the next task should be carried out and then the thread performs an
// (absolute) timed wait on a condition variable. The wait ends because of a
// timeout or a notify on the cond var; the former means a task is due for
// execution while the latter means there has been a change in internal state,
// like a task having been scheduled/canceled or the stop flag having been set.
// Setting and querying the stop flag or modifying the task queue and the
// subsequent notification on the cond var are done atomically (i.e. while
// holding the lock on the internal mutex) to ensure that the thread never
// misses the notification, since notifying a cond var is not persistent the
// way writing on a pipe is. (If it were not done this way, the thread could
// query the stop flag and be put aside by the OS scheduler right after; the
// 'stop thread' procedure could then run, setting the flag, notifying a cond
// var that no one is waiting on, and joining the thread; the thread would then
// resume execution believing that it needs to continue and wait on the cond
// var, possibly forever if there are no tasks scheduled, effectively causing a
// deadlock.)
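
// An illustrative, non-authoritative usage sketch of the public AsyncManager
// facade implemented below (fd and the lambdas are hypothetical placeholders):
//
//   AsyncManager manager;
//   manager.WatchFdForNonBlockingReads(fd, [](int fd) { /* drain fd */ });
//   AsyncTaskId task_id = manager.ExecAsyncPeriodically(
//       std::chrono::milliseconds(0), std::chrono::milliseconds(100),
//       []() { /* periodic work */ });
//   manager.CancelAsyncTask(task_id);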
// This number also states the maximum number of scheduled tasks we can handle
// at a given time
static const uint16_t kMaxTaskId = -1; /* 2^16 - 1, permissible ids are {1..2^16-1} */
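// Advances to the next task id, wrapping from kMaxTaskId back to 1 so that
// generated ids always stay inside the permissible {1..kMaxTaskId} range.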
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  return (id == kMaxTaskId) ? 1 : id + 1;
}
// The buffer is only 10 bytes because the expected number of bytes
// written on this socket is 1. It is possible that the thread is notified
// more than once, but that is highly unlikely, so a buffer of size 10 seems
// enough and the reads are performed inside a while loop just in case it
// isn't. From the thread routine's point of view it makes no difference
// whether it was notified just once or 100 times, so it simply tries to
// consume the entire buffer. In the cases where an interrupt would cause
// read to return without having read everything that was available, a new
// iteration of the thread loop will bring execution back to this point
// almost immediately, so there is no need to handle that case specially.
static const int kNotificationBufferSize = 10;

// Async File Descriptor Watcher Implementation:
class AsyncManager::AsyncFdWatcher {
 public:
  int WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
    // add file descriptor and callback
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
    }

    // start the thread if not started yet
    int started = tryStartThread();
    if (started != 0) {
      LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
      return started;
    }

    // notify the thread so that it knows of the new FD
    notifyThread();

    return 0;
  }

  void StopWatchingFileDescriptor(int file_descriptor) {
    std::unique_lock<std::mutex> guard(internal_mutex_);
    watched_shared_fds_.erase(file_descriptor);
  }

  AsyncFdWatcher() = default;

  ~AsyncFdWatcher() = default;

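  // Stops the reading thread. Only the first caller sees running_ flip from
  // true to false and proceeds: it wakes the select() loop through the
  // notification pipe, joins the thread (unless called from the reading
  // thread itself, in which case it only logs a warning) and clears the set
  // of watched file descriptors.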
  int stopThread() {
    if (!std::atomic_exchange(&running_, false)) {
      return 0;  // if not running already
    }

    notifyThread();

    if (std::this_thread::get_id() != thread_.get_id()) {
      thread_.join();
    } else {
      LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __func__);
    }

    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      watched_shared_fds_.clear();
    }

    return 0;
  }
147 
148  private:
149   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
150   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
151 
152   // Make sure to call this with at least one file descriptor ready to be
153   // watched upon or the thread routine will return immediately
tryStartThread()154   int tryStartThread() {
155     if (std::atomic_exchange(&running_, true)) {
156       return 0;  // if already running
157     }
158     // set up the communication channel
159     int pipe_fds[2];
160     if (pipe2(pipe_fds, O_NONBLOCK)) {
161       LOG_ERROR(LOG_TAG,
162                 "%s:Unable to establish a communication channel to the reading "
163                 "thread",
164                 __func__);
165       return -1;
166     }
167     notification_listen_fd_ = pipe_fds[0];
168     notification_write_fd_ = pipe_fds[1];
169 
170     thread_ = std::thread([this]() { ThreadRoutine(); });
171     if (!thread_.joinable()) {
172       LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
173       return -1;
174     }
175     return 0;
176   }
177 
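  // Wakes the reading thread by writing a single byte to the notification
  // pipe; select() in ThreadRoutine() then returns and the watched FD set is
  // rebuilt on the next loop iteration.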
  int notifyThread() {
    char buffer = '0';
    if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
      LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread", __func__);
      return -1;
    }
    return 0;
  }

  int setUpFileDescriptorSet(fd_set& read_fds) {
    // add comm channel to the set
    FD_SET(notification_listen_fd_, &read_fds);
    int nfds = notification_listen_fd_;

    // add watched FDs to the set
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      for (auto& fdp : watched_shared_fds_) {
        FD_SET(fdp.first, &read_fds);
        nfds = std::max(fdp.first, nfds);
      }
    }
    return nfds;
  }

  // check the comm channel and read everything there
  bool consumeThreadNotifications(fd_set& read_fds) {
    if (FD_ISSET(notification_listen_fd_, &read_fds)) {
      char buffer[kNotificationBufferSize];
      while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, kNotificationBufferSize)) ==
             kNotificationBufferSize) {
      }
      return true;
    }
    return false;
  }

  // check all file descriptors and call callbacks if necessary
  void runAppropriateCallbacks(fd_set& read_fds) {
    // not a good idea to call a callback while holding the FD lock,
    // nor to release the lock while traversing the map
    std::vector<decltype(watched_shared_fds_)::value_type> fds;
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      for (auto& fdc : watched_shared_fds_) {
        if (FD_ISSET(fdc.first, &read_fds)) {
          fds.push_back(fdc);
        }
      }
    }
    for (auto& p : fds) {
      p.second(p.first);
    }
  }

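  // Main loop of the reading thread: rebuild the fd_set, block in select()
  // until data (or a notification) is available, drain the notification pipe
  // and then run the callbacks of every watched FD that became readable.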
  void ThreadRoutine() {
    while (running_) {
      fd_set read_fds;
      FD_ZERO(&read_fds);
      int nfds = setUpFileDescriptorSet(read_fds);

      // wait until there is data available to read on some FD
      int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
      if (retval <= 0) {  // there was some error or a timeout
        LOG_ERROR(LOG_TAG,
                  "%s: There was an error while waiting for data on the file "
                  "descriptors: %s",
                  __func__, strerror(errno));
        continue;
      }

      consumeThreadNotifications(read_fds);

      // Do not read if there was a call to stop running
      if (!running_) {
        break;
      }

      runAppropriateCallbacks(read_fds);
    }
  }

  std::atomic_bool running_{false};
  std::thread thread_;
  std::mutex internal_mutex_;

  std::map<int, ReadCallback> watched_shared_fds_;

  // A pair of FDs used to send information to the reading thread
  int notification_listen_fd_;
  int notification_write_fd_;
};

// Async task manager implementation
class AsyncManager::AsyncTaskManager {
 public:
  AsyncTaskId ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
    return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, callback));
  }

  AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
                                    const TaskCallback& callback) {
    return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, period, callback));
  }

  bool CancelAsyncTask(AsyncTaskId async_task_id) {
    // remove task from queue (and task id association) while holding the lock
    std::unique_lock<std::mutex> guard(internal_mutex_);
    if (tasks_by_id.count(async_task_id) == 0) {
      return false;
    }
    task_queue_.erase(tasks_by_id[async_task_id]);
    tasks_by_id.erase(async_task_id);
    return true;
  }

  AsyncTaskManager() = default;

  ~AsyncTaskManager() = default;

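  // Stops the task thread. Pending tasks are dropped, running_ is cleared and
  // the condition variable is notified while holding the lock; the thread is
  // then joined outside the lock (unless called from the task thread itself,
  // in which case only a warning is logged).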
  int stopThread() {
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      tasks_by_id.clear();
      task_queue_.clear();
      if (!running_) {
        return 0;
      }
      running_ = false;
      // notify the thread
      internal_cond_var_.notify_one();
    }  // release the lock before joining a thread that is likely waiting for it
    if (std::this_thread::get_id() != thread_.get_id()) {
      thread_.join();
    } else {
      LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __func__);
    }
    return 0;
  }

 private:
  // Holds the data for each task
  class Task {
   public:
    Task(std::chrono::steady_clock::time_point time, std::chrono::milliseconds period, const TaskCallback& callback)
        : time(time), periodic(true), period(period), callback(callback), task_id(kInvalidTaskId) {}
    Task(std::chrono::steady_clock::time_point time, const TaskCallback& callback)
        : time(time), periodic(false), callback(callback), task_id(kInvalidTaskId) {}

    // Operators needed to be in a collection
    bool operator<(const Task& another) const {
      return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id);
    }

    bool isPeriodic() const {
      return periodic;
    }

    // These fields should no longer be public if the class ever becomes
    // public or gets more complex
    std::chrono::steady_clock::time_point time;
    bool periodic;
    std::chrono::milliseconds period;
    TaskCallback callback;
    AsyncTaskId task_id;
  };

  // A comparator class to put shared pointers to tasks in an ordered set
  struct task_p_comparator {
    bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const {
      return *t1 < *t2;
    }
  };

  AsyncTaskManager(const AsyncTaskManager&) = delete;
  AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;

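  // Assigns a fresh task id, inserts the task into both the id map and the
  // time-ordered queue, lazily starts the worker thread if needed, and wakes
  // it so it can re-evaluate when the next task is due.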
  AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
    AsyncTaskId task_id = kInvalidTaskId;
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      // no more room for new tasks, we need a larger type for IDs
      if (tasks_by_id.size() == kMaxTaskId)  // TODO potentially type unsafe
        return kInvalidTaskId;
      do {
        lastTaskId_ = NextAsyncTaskId(lastTaskId_);
      } while (isTaskIdInUse(lastTaskId_));
      task->task_id = lastTaskId_;
      // add task to the queue and map
      tasks_by_id[lastTaskId_] = task;
      task_queue_.insert(task);
      task_id = lastTaskId_;
    }
    // start thread if necessary
    int started = tryStartThread();
    if (started != 0) {
      LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
      return kInvalidTaskId;
    }
    // notify the thread so that it knows of the new task
    internal_cond_var_.notify_one();
    // return task id
    return task_id;
  }

  bool isTaskIdInUse(const AsyncTaskId& task_id) const {
    return tasks_by_id.count(task_id) != 0;
  }

  int tryStartThread() {
    // need the lock because of the running flag and the cond var
    std::unique_lock<std::mutex> guard(internal_mutex_);
    // check that the thread is not yet running
    if (running_) {
      return 0;
    }
    // start the thread
    running_ = true;
    thread_ = std::thread([this]() { ThreadRoutine(); });
    if (!thread_.joinable()) {
      LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
      return -1;
    }
    return 0;
  }

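  // Worker loop: under the lock, peek at the earliest task in the queue; if it
  // is due, take its callback and either re-insert it with an updated time
  // (periodic tasks) or drop it from the id map (one-shot tasks). The callback
  // runs outside the lock. The loop then waits on the condition variable until
  // the next deadline or until it is notified of a change in internal state.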
  void ThreadRoutine() {
    while (1) {
      TaskCallback callback;
      bool run_it = false;
      {
        std::unique_lock<std::mutex> guard(internal_mutex_);
        if (!task_queue_.empty()) {
          std::shared_ptr<Task> task_p = *(task_queue_.begin());
          if (task_p->time < std::chrono::steady_clock::now()) {
            run_it = true;
            callback = task_p->callback;
            task_queue_.erase(task_p);  // need to remove and add again if
                                        // periodic to update order
            if (task_p->isPeriodic()) {
              task_p->time += task_p->period;
              task_queue_.insert(task_p);
            } else {
              tasks_by_id.erase(task_p->task_id);
            }
          }
        }
      }
      if (run_it) {
        callback();
      }
      {
        std::unique_lock<std::mutex> guard(internal_mutex_);
        // wait on the condition variable with a timeout set just in time for
        // the next task, if any
        if (task_queue_.size() > 0) {
          internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
        } else {
          internal_cond_var_.wait(guard);
        }
        // check for termination right after being notified (and maybe before?)
        if (!running_) break;
      }
    }
  }

  bool running_ = false;
  std::thread thread_;
  std::mutex internal_mutex_;
  std::condition_variable internal_cond_var_;

  AsyncTaskId lastTaskId_ = kInvalidTaskId;
  std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
  std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
};

// Async Manager Implementation:
AsyncManager::AsyncManager() : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {}

AsyncManager::~AsyncManager() {
  // Make sure the threads are stopped before destroying the object.
  // The threads need to be stopped here and not in each internal class'
  // destructor because unique_ptr's reset() first assigns nullptr to the
  // pointer and only then calls the destructor, so any callback running
  // on these threads would dereference a null pointer if it called a member
  // function of this class.
  fdWatcher_p_->stopThread();
  taskManager_p_->stopThread();
}

int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
  return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback);
}

void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
  fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
}

AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
  return taskManager_p_->ExecAsync(delay, callback);
}

AsyncTaskId AsyncManager::ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
                                                const TaskCallback& callback) {
  return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
}

bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
  return taskManager_p_->CancelAsyncTask(async_task_id);
}

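// Runs the critical callback while holding the global synchronization mutex,
// so that callbacks and client code that also go through Synchronize() are
// mutually exclusive.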
void AsyncManager::Synchronize(const CriticalCallback& critical) {
  std::unique_lock<std::mutex> guard(synchronization_mutex_);
  critical();
}
}  // namespace test_vendor_lib