• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "async_manager.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <mutex>
23 #include <thread>
24 #include <vector>
25 
26 #include "fcntl.h"
27 #include "os/log.h"
28 #include "sys/select.h"
29 #include "unistd.h"
30 
31 namespace test_vendor_lib {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
36 
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching the object
41 // starts a new thread which watches the given (and later provided) FDs using
42 // select() inside a loop. A special FD (a pipe) is also watched which is
43 // used to notify the thread of internal changes on the object state (like
44 // the addition of new FDs to watch on). Every access to internal state is
45 // synchronized using a single internal mutex. The thread is only stopped on
46 // destruction of the object, by modifying a flag, which is the only member
47 // variable accessed without acquiring the lock (because the notification to
48 // the thread is done later by writing to a pipe which means the thread will
49 // be notified regardless of what phase of the loop it is in that moment)
50 
51 // The scheduling of asynchronous tasks, periodic or not, is handled by the
52 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
53 // state between different instances so it is safe to use several objects of
54 // this class, also nothing interesting happens upon construction, but only
55 // after a Task has been scheduled and access to internal state is synchronized
56 // using a single internal mutex. When the first task is scheduled a thread
57 // is started which monitors a queue of tasks. The queue is peeked to see
58 // when the next task should be carried out and then the thread performs a
59 // (absolute) timed wait on a condition variable. The wait ends because of a
// time out or a notify on the cond var, the former means a task is due
// for execution while the latter means there has been a change in internal
62 // state, like a task has been scheduled/canceled or the flag to stop has
63 // been set. Setting and querying the stop flag or modifying the task queue
64 // and subsequent notification on the cond var is done atomically (e.g while
65 // holding the lock on the internal mutex) to ensure that the thread never
66 // misses the notification, since notifying a cond var is not persistent as
67 // writing on a pipe (if not done this way, the thread could query the
68 // stopping flag and be put aside by the OS scheduler right after, then the
69 // 'stop thread' procedure could run, setting the flag, notifying a cond
70 // var that no one is waiting on and joining the thread, the thread then
71 // resumes execution believing that it needs to continue and waits on the
// cond var possibly forever if there are no tasks scheduled, effectively
73 // causing a deadlock).
74 
75 // This number also states the maximum number of scheduled tasks we can handle
76 // at a given time
static constexpr uint16_t kMaxTaskId = 0xFFFF; /* 2^16 - 1, permissible ids are {1..2^16-1} */
// Returns the id that follows |id|. Ids wrap around to 1 (not 0) once
// kMaxTaskId has been reached.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id == kMaxTaskId) {
    return 1;
  }
  return id + 1;
}
81 // The buffer is only 10 bytes because the expected number of bytes
// written on this pipe is 1. It is possible that the thread is notified
83 // more than once but highly unlikely, so a buffer of size 10 seems enough
84 // and the reads are performed inside a while just in case it isn't. From
85 // the thread routine's point of view it is the same to have been notified
86 // just once or 100 times so it just tries to consume the entire buffer.
87 // In the cases where an interrupt would cause read to return without
88 // having read everything that was available a new iteration of the thread
89 // loop will bring execution to this point almost immediately, so there is
90 // no need to treat that case.
// Size, in bytes, of the scratch buffer used to drain the notification pipe.
static constexpr int kNotificationBufferSize = 10;
92 
93 // Async File Descriptor Watcher Implementation:
94 class AsyncManager::AsyncFdWatcher {
95  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)96   int WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
97     // add file descriptor and callback
98     {
99       std::unique_lock<std::mutex> guard(internal_mutex_);
100       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
101     }
102 
103     // start the thread if not started yet
104     int started = tryStartThread();
105     if (started != 0) {
106       LOG_ERROR("%s: Unable to start thread", __func__);
107       return started;
108     }
109 
110     // notify the thread so that it knows of the new FD
111     notifyThread();
112 
113     return 0;
114   }
115 
StopWatchingFileDescriptor(int file_descriptor)116   void StopWatchingFileDescriptor(int file_descriptor) {
117     std::unique_lock<std::mutex> guard(internal_mutex_);
118     watched_shared_fds_.erase(file_descriptor);
119   }
120 
121   AsyncFdWatcher() = default;
122 
123   ~AsyncFdWatcher() = default;
124 
stopThread()125   int stopThread() {
126     if (!std::atomic_exchange(&running_, false)) {
127       return 0;  // if not running already
128     }
129 
130     notifyThread();
131 
132     if (std::this_thread::get_id() != thread_.get_id()) {
133       thread_.join();
134     } else {
135       LOG_WARN("%s: Starting thread stop from inside the reading thread itself", __func__);
136     }
137 
138     {
139       std::unique_lock<std::mutex> guard(internal_mutex_);
140       watched_shared_fds_.clear();
141     }
142 
143     return 0;
144   }
145 
146  private:
147   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
148   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
149 
150   // Make sure to call this with at least one file descriptor ready to be
151   // watched upon or the thread routine will return immediately
tryStartThread()152   int tryStartThread() {
153     if (std::atomic_exchange(&running_, true)) {
154       return 0;  // if already running
155     }
156     // set up the communication channel
157     int pipe_fds[2];
158     if (pipe2(pipe_fds, O_NONBLOCK)) {
159       LOG_ERROR(
160           "%s:Unable to establish a communication channel to the reading "
161           "thread",
162           __func__);
163       return -1;
164     }
165     notification_listen_fd_ = pipe_fds[0];
166     notification_write_fd_ = pipe_fds[1];
167 
168     thread_ = std::thread([this]() { ThreadRoutine(); });
169     if (!thread_.joinable()) {
170       LOG_ERROR("%s: Unable to start reading thread", __func__);
171       return -1;
172     }
173     return 0;
174   }
175 
notifyThread()176   int notifyThread() {
177     char buffer = '0';
178     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
179       LOG_ERROR("%s: Unable to send message to reading thread", __func__);
180       return -1;
181     }
182     return 0;
183   }
184 
setUpFileDescriptorSet(fd_set & read_fds)185   int setUpFileDescriptorSet(fd_set& read_fds) {
186     // add comm channel to the set
187     FD_SET(notification_listen_fd_, &read_fds);
188     int nfds = notification_listen_fd_;
189 
190     // add watched FDs to the set
191     {
192       std::unique_lock<std::mutex> guard(internal_mutex_);
193       for (auto& fdp : watched_shared_fds_) {
194         FD_SET(fdp.first, &read_fds);
195         nfds = std::max(fdp.first, nfds);
196       }
197     }
198     return nfds;
199   }
200 
201   // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds)202   bool consumeThreadNotifications(fd_set& read_fds) {
203     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
204       char buffer[kNotificationBufferSize];
205       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, kNotificationBufferSize)) ==
206              kNotificationBufferSize) {
207       }
208       return true;
209     }
210     return false;
211   }
212 
213   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)214   void runAppropriateCallbacks(fd_set& read_fds) {
215     // not a good idea to call a callback while holding the FD lock,
216     // nor to release the lock while traversing the map
217     std::vector<decltype(watched_shared_fds_)::value_type> fds;
218     {
219       std::unique_lock<std::mutex> guard(internal_mutex_);
220       for (auto& fdc : watched_shared_fds_) {
221         if (FD_ISSET(fdc.first, &read_fds)) {
222           fds.push_back(fdc);
223         }
224       }
225     }
226     for (auto& p : fds) {
227       p.second(p.first);
228     }
229   }
230 
ThreadRoutine()231   void ThreadRoutine() {
232     while (running_) {
233       fd_set read_fds;
234       FD_ZERO(&read_fds);
235       int nfds = setUpFileDescriptorSet(read_fds);
236 
237       // wait until there is data available to read on some FD
238       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
239       if (retval <= 0) {  // there was some error or a timeout
240         LOG_ERROR(
241             "%s: There was an error while waiting for data on the file "
242             "descriptors: %s",
243             __func__, strerror(errno));
244         continue;
245       }
246 
247       consumeThreadNotifications(read_fds);
248 
249       // Do not read if there was a call to stop running
250       if (!running_) {
251         break;
252       }
253 
254       runAppropriateCallbacks(read_fds);
255     }
256   }
257 
258   std::atomic_bool running_{false};
259   std::thread thread_;
260   std::mutex internal_mutex_;
261 
262   std::map<int, ReadCallback> watched_shared_fds_;
263 
264   // A pair of FD to send information to the reading thread
265   int notification_listen_fd_;
266   int notification_write_fd_;
267 };
268 
269 // Async task manager implementation
270 class AsyncManager::AsyncTaskManager {
271  public:
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)272   AsyncTaskId ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
273     return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, callback));
274   }
275 
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)276   AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
277                                     const TaskCallback& callback) {
278     return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, period, callback));
279   }
280 
CancelAsyncTask(AsyncTaskId async_task_id)281   bool CancelAsyncTask(AsyncTaskId async_task_id) {
282     // remove task from queue (and task id asociation) while holding lock
283     std::unique_lock<std::mutex> guard(internal_mutex_);
284     if (tasks_by_id.count(async_task_id) == 0) {
285       return false;
286     }
287     task_queue_.erase(tasks_by_id[async_task_id]);
288     tasks_by_id.erase(async_task_id);
289     return true;
290   }
291 
292   AsyncTaskManager() = default;
293 
294   ~AsyncTaskManager() = default;
295 
stopThread()296   int stopThread() {
297     {
298       std::unique_lock<std::mutex> guard(internal_mutex_);
299       tasks_by_id.clear();
300       task_queue_.clear();
301       if (!running_) {
302         return 0;
303       }
304       running_ = false;
305       // notify the thread
306       internal_cond_var_.notify_one();
307     }  // release the lock before joining a thread that is likely waiting for it
308     if (std::this_thread::get_id() != thread_.get_id()) {
309       thread_.join();
310     } else {
311       LOG_WARN("%s: Starting thread stop from inside the task thread itself", __func__);
312     }
313     return 0;
314   }
315 
316  private:
317   // Holds the data for each task
318   class Task {
319    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback)320     Task(std::chrono::steady_clock::time_point time, std::chrono::milliseconds period, const TaskCallback& callback)
321         : time(time), periodic(true), period(period), callback(callback), task_id(kInvalidTaskId) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback)322     Task(std::chrono::steady_clock::time_point time, const TaskCallback& callback)
323         : time(time), periodic(false), callback(callback), task_id(kInvalidTaskId) {}
324 
325     // Operators needed to be in a collection
operator <(const Task & another) const326     bool operator<(const Task& another) const {
327       return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id);
328     }
329 
isPeriodic() const330     bool isPeriodic() const {
331       return periodic;
332     }
333 
334     // These fields should no longer be public if the class ever becomes
335     // public or gets more complex
336     std::chrono::steady_clock::time_point time;
337     bool periodic;
338     std::chrono::milliseconds period;
339     TaskCallback callback;
340     AsyncTaskId task_id;
341   };
342 
343   // A comparator class to put shared pointers to tasks in an ordered set
344   struct task_p_comparator {
operator ()test_vendor_lib::AsyncManager::AsyncTaskManager::task_p_comparator345     bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const {
346       return *t1 < *t2;
347     }
348   };
349 
350   AsyncTaskManager(const AsyncTaskManager&) = delete;
351   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
352 
scheduleTask(const std::shared_ptr<Task> & task)353   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
354     AsyncTaskId task_id = kInvalidTaskId;
355     {
356       std::unique_lock<std::mutex> guard(internal_mutex_);
357       // no more room for new tasks, we need a larger type for IDs
358       if (tasks_by_id.size() == kMaxTaskId)  // TODO potentially type unsafe
359         return kInvalidTaskId;
360       do {
361         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
362       } while (isTaskIdInUse(lastTaskId_));
363       task->task_id = lastTaskId_;
364       // add task to the queue and map
365       tasks_by_id[lastTaskId_] = task;
366       task_queue_.insert(task);
367       task_id = lastTaskId_;
368     }
369     // start thread if necessary
370     int started = tryStartThread();
371     if (started != 0) {
372       LOG_ERROR("%s: Unable to start thread", __func__);
373       return kInvalidTaskId;
374     }
375     // notify the thread so that it knows of the new task
376     internal_cond_var_.notify_one();
377     // return task id
378     return task_id;
379   }
380 
isTaskIdInUse(const AsyncTaskId & task_id) const381   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
382     return tasks_by_id.count(task_id) != 0;
383   }
384 
tryStartThread()385   int tryStartThread() {
386     // need the lock because of the running flag and the cond var
387     std::unique_lock<std::mutex> guard(internal_mutex_);
388     // check that the thread is not yet running
389     if (running_) {
390       return 0;
391     }
392     // start the thread
393     running_ = true;
394     thread_ = std::thread([this]() { ThreadRoutine(); });
395     if (!thread_.joinable()) {
396       LOG_ERROR("%s: Unable to start task thread", __func__);
397       return -1;
398     }
399     return 0;
400   }
401 
ThreadRoutine()402   void ThreadRoutine() {
403     while (1) {
404       TaskCallback callback;
405       bool run_it = false;
406       {
407         std::unique_lock<std::mutex> guard(internal_mutex_);
408         if (!task_queue_.empty()) {
409           std::shared_ptr<Task> task_p = *(task_queue_.begin());
410           if (task_p->time < std::chrono::steady_clock::now()) {
411             run_it = true;
412             callback = task_p->callback;
413             task_queue_.erase(task_p);  // need to remove and add again if
414                                         // periodic to update order
415             if (task_p->isPeriodic()) {
416               task_p->time += task_p->period;
417               task_queue_.insert(task_p);
418             } else {
419               tasks_by_id.erase(task_p->task_id);
420             }
421           }
422         }
423       }
424       if (run_it) {
425         callback();
426       }
427       {
428         std::unique_lock<std::mutex> guard(internal_mutex_);
429         // wait on condition variable with timeout just in time for next task if
430         // any
431         if (task_queue_.size() > 0) {
432           internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
433         } else {
434           internal_cond_var_.wait(guard);
435         }
436         // check for termination right after being notified (and maybe before?)
437         if (!running_) break;
438       }
439     }
440   }
441 
442   bool running_ = false;
443   std::thread thread_;
444   std::mutex internal_mutex_;
445   std::condition_variable internal_cond_var_;
446 
447   AsyncTaskId lastTaskId_ = kInvalidTaskId;
448   std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
449   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
450 };
451 
452 // Async Manager Implementation:
AsyncManager()453 AsyncManager::AsyncManager() : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {}
454 
~AsyncManager()455 AsyncManager::~AsyncManager() {
456   // Make sure the threads are stopped before destroying the object.
457   // The threads need to be stopped here and not in each internal class'
458   // destructor because unique_ptr's reset() first assigns nullptr to the
459   // pointer and only then calls the destructor, so any callback running
460   // on these threads would dereference a null pointer if they called a member
461   // function of this class.
462   fdWatcher_p_->stopThread();
463   taskManager_p_->stopThread();
464 }
465 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)466 int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
467   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback);
468 }
469 
StopWatchingFileDescriptor(int file_descriptor)470 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
471   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
472 }
473 
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)474 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
475   return taskManager_p_->ExecAsync(delay, callback);
476 }
477 
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)478 AsyncTaskId AsyncManager::ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
479                                                 const TaskCallback& callback) {
480   return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
481 }
482 
CancelAsyncTask(AsyncTaskId async_task_id)483 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
484   return taskManager_p_->CancelAsyncTask(async_task_id);
485 }
486 
Synchronize(const CriticalCallback & critical)487 void AsyncManager::Synchronize(const CriticalCallback& critical) {
488   std::unique_lock<std::mutex> guard(synchronization_mutex_);
489   critical();
490 }
491 }  // namespace test_vendor_lib
492