• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "model/setup/async_manager.h"  // for AsyncManager
17 
18 #include <errno.h>                              // for errno
19 #include <atomic>                               // for atomic_bool, atomic_e...
20 #include <condition_variable>                   // for condition_variable
21 #include <cstring>                              // for strerror
22 #include <limits>                               // for numeric_limits
23 #include <map>                                  // for map<>::value_type, map
24 #include <mutex>                                // for unique_lock, mutex
25 #include <ratio>                                // for ratio
26 #include <set>                                  // for set
27 #include <thread>                               // for thread
28 #include <type_traits>                          // for remove_extent_t
29 #include <utility>                              // for pair, make_pair, oper...
30 #include <vector>                               // for vector
31 
32 #include "aemu/base/EintrWrapper.h"          // for HANDLE_EINTR
33 #include "aemu/base/Log.h"                   // for LogStreamVoidify, Log...
34 #include "aemu/base/sockets/SocketUtils.h"   // for socketRecv, socketSet...
35 #include "aemu/base/sockets/SocketWaiter.h"  // for SocketWaiter, SocketW...
36 #include "aemu/base/logging/CLog.h"
37 
38 namespace rootcanal {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
43 
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched which is
// used to notify the thread of internal changes on the object state (like
// the addition of new FDs to watch on). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe which means the thread will
// be notified regardless of what phase of the loop it is in that moment).
57 
// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances so it is safe to use several objects of
// this class, also nothing interesting happens upon construction, but only
// after a Task has been scheduled and access to internal state is synchronized
// using a single internal mutex. When the first task is scheduled a thread
// is started which monitors a queue of tasks. The queue is peeked to see
// when the next task should be carried out and then the thread performs a
// (absolute) timed wait on a condition variable. The wait ends because of a
// time out or a notify on the cond var, the former means a task is due
// for execution while the latter means there has been a change in internal
// state, like a task has been scheduled/canceled or the flag to stop has
// been set. Setting and querying the stop flag or modifying the task queue
// and subsequent notification on the cond var is done atomically (e.g. while
// holding the lock on the internal mutex) to ensure that the thread never
// misses the notification, since notifying a cond var is not persistent as
// writing on a pipe (if not done this way, the thread could query the
// stopping flag and be put aside by the OS scheduler right after, then the
// 'stop thread' procedure could run, setting the flag, notifying a cond
// var that no one is waiting on and joining the thread, the thread then
// resumes execution believing that it needs to continue and waits on the
// cond var possibly forever if there are no tasks scheduled, effectively
// causing a deadlock).
81 
// Largest task id that can be handed out: 2^16 - 1, so permissible ids are
// {1..2^16-1}. This number also states the maximum number of scheduled tasks
// we can handle at a given time.
// Spelled with numeric_limits instead of relying on the implicit wrap-around
// of -1 when converted to an unsigned type.
static constexpr uint16_t kMaxTaskId = std::numeric_limits<uint16_t>::max();
// Returns the id that follows `id`. Once kMaxTaskId is reached the sequence
// wraps around to 1 rather than continuing past the maximum.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id == kMaxTaskId) {
    return 1;
  }
  return id + 1;
}
// Size, in bytes, of the buffer used to drain the notification socket.
// A single byte is expected per notification. Being notified more than once
// is possible but highly unlikely, so 10 bytes should be plenty; the reads
// are still done inside a loop in case it is not. The thread routine does not
// care whether it was notified once or 100 times, it simply tries to consume
// everything that is there. If an interrupt makes a read return before all
// the available data was consumed, the next iteration of the thread loop
// gets back to the read almost immediately, so that case needs no special
// treatment.
static constexpr int kNotificationBufferSize = 10;
99 
100 
101 using android::base::SocketWaiter;
102 
103 // Async File Descriptor Watcher Implementation:
104 class AsyncManager::AsyncFdWatcher {
105 
106  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)107   int WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
108     // add file descriptor and callback
109     {
110       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
111       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
112     }
113 
114     // start the thread if not started yet
115     int started = tryStartThread();
116     if (started != 0) {
117       derror("%s: Unable to start thread", __func__);
118       return started;
119     }
120 
121     // notify the thread so that it knows of the new FD
122     notifyThread();
123 
124     return 0;
125   }
126 
StopWatchingFileDescriptor(int file_descriptor)127   void StopWatchingFileDescriptor(int file_descriptor) {
128     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
129     watched_shared_fds_.erase(file_descriptor);
130   }
131 
132   AsyncFdWatcher() = default;
133   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
134   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
135 
136   ~AsyncFdWatcher() = default;
137 
stopThread()138   int stopThread() {
139     if (!std::atomic_exchange(&running_, false)) {
140       return 0;  // if not running already
141     }
142 
143     notifyThread();
144 
145     if (std::this_thread::get_id() != thread_.get_id()) {
146       thread_.join();
147     } else {
148       dwarning("%s: Starting thread stop from inside the reading thread itself", __func__);
149     }
150 
151     {
152       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
153       watched_shared_fds_.clear();
154     }
155 
156     return 0;
157   }
158 
159  private:
160   // Make sure to call this with at least one file descriptor ready to be
161   // watched upon or the thread routine will return immediately
tryStartThread()162   int tryStartThread() {
163     if (std::atomic_exchange(&running_, true)) {
164       return 0;  // if already running
165     }
166     // set up the communication channel
167     if (android::base::socketCreatePair(&notification_listen_fd_, &notification_write_fd_)) {
168       derror(
169           "%s:Unable to establish a communication channel to the reading "
170           "thread",
171           __func__);
172       return -1;
173     }
174     android::base::socketSetNonBlocking(notification_listen_fd_);
175     android::base::socketSetNonBlocking(notification_write_fd_);
176 
177     thread_ = std::thread([this]() { ThreadRoutine(); });
178     if (!thread_.joinable()) {
179       derror("%s: Unable to start reading thread", __func__);
180       return -1;
181     }
182     return 0;
183   }
184 
notifyThread()185   int notifyThread() {
186     char buffer = '0';
187     if (android::base::socketSend(notification_write_fd_, &buffer, 1) < 0) {
188       derror("%s: Unable to send message to reading thread", __func__);
189       return -1;
190     }
191     return 0;
192   }
193 
setUpFileDescriptorSet(SocketWaiter * read_fds)194   void setUpFileDescriptorSet(SocketWaiter* read_fds) {
195     // add comm channel to the set
196     read_fds->update(notification_listen_fd_, SocketWaiter::Event::kEventRead);
197 
198     // add watched FDs to the set
199     {
200       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
201       for (auto& fdp : watched_shared_fds_) {
202         read_fds->update(fdp.first, SocketWaiter::Event::kEventRead);
203       }
204     }
205   }
206 
207   // check the comm channel and read everything there
consumeThreadNotifications(SocketWaiter * read_fds)208   bool consumeThreadNotifications(SocketWaiter* read_fds) {
209     if (read_fds->pendingEventsFor(notification_listen_fd_)) {
210       char buffer[kNotificationBufferSize];
211       while (HANDLE_EINTR(android::base::socketRecv(notification_listen_fd_, buffer, kNotificationBufferSize)) ==
212              kNotificationBufferSize) {
213       }
214       return true;
215     }
216     return false;
217   }
218 
219   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(SocketWaiter * read_fds)220   void runAppropriateCallbacks(SocketWaiter* read_fds) {
221     // not a good idea to call a callback while holding the FD lock,
222     // nor to release the lock while traversing the map
223     std::vector<decltype(watched_shared_fds_)::value_type> fds;
224     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
225     for (auto& fdc : watched_shared_fds_) {
226       auto pending = read_fds->pendingEventsFor(fdc.first);
227       if (pending == SocketWaiter::kEventRead) {
228         fds.push_back(fdc);
229       }
230     }
231 
232     for (auto& p : fds) {
233       p.second(p.first);
234     }
235 
236   }
237 
ThreadRoutine()238   void ThreadRoutine() {
239     auto read_fds = std::unique_ptr<SocketWaiter>(SocketWaiter::create());
240     while (running_) {
241       read_fds->reset();
242       setUpFileDescriptorSet(read_fds.get());
243 
244       // wait until there is data available to read on some FD
245       int retval = read_fds->wait(std::numeric_limits<int64_t>::max());
246       if (retval <= 0) {  // there was some error or a timeout
247         derror(
248             "%s: There was an error while waiting for data on the file "
249             "descriptors: %s",
250             __func__, strerror(errno));
251         continue;
252       }
253 
254       consumeThreadNotifications(read_fds.get());
255 
256       // Do not read if there was a call to stop running
257       if (!running_) {
258         break;
259       }
260 
261       runAppropriateCallbacks(read_fds.get());
262     }
263   }
264 
265   std::atomic_bool running_{false};
266   std::thread thread_;
267   std::recursive_mutex internal_mutex_;
268 
269 
270   //android::base::SocketWaiter socket_waiter_;
271   std::map<int, ReadCallback> watched_shared_fds_;
272 
273   // A pair of FD to send information to the reading thread
274   int notification_listen_fd_{};
275   int notification_write_fd_{};
276 };
277 
278 // Async task manager implementation
279 class AsyncManager::AsyncTaskManager {
280  public:
GetNextUserId()281   AsyncUserId GetNextUserId() { return lastUserId_++; }
282 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)283   AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
284                         const TaskCallback& callback) {
285     return scheduleTask(std::make_shared<Task>(
286         std::chrono::steady_clock::now() + delay, callback, user_id));
287   }
288 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)289   AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
290                                     std::chrono::milliseconds delay,
291                                     std::chrono::milliseconds period,
292                                     const TaskCallback& callback) {
293     return scheduleTask(std::make_shared<Task>(
294         std::chrono::steady_clock::now() + delay, period, callback, user_id));
295   }
296 
CancelAsyncTask(AsyncTaskId async_task_id)297   bool CancelAsyncTask(AsyncTaskId async_task_id) {
298     // remove task from queue (and task id association) while holding lock
299     std::unique_lock<std::mutex> guard(internal_mutex_);
300     return cancel_task_with_lock_held(async_task_id);
301   }
302 
CancelAsyncTasksFromUser(AsyncUserId user_id)303   bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
304     // remove task from queue (and task id association) while holding lock
305     std::unique_lock<std::mutex> guard(internal_mutex_);
306     if (tasks_by_user_id_.count(user_id) == 0) {
307       return false;
308     }
309     for (auto task : tasks_by_user_id_[user_id]) {
310       cancel_task_with_lock_held(task);
311     }
312     tasks_by_user_id_.erase(user_id);
313     return true;
314   }
315 
Synchronize(const CriticalCallback & critical)316   void Synchronize(const CriticalCallback& critical) {
317     std::unique_lock<std::mutex> guard(synchronization_mutex_);
318     critical();
319   }
320 
321   AsyncTaskManager() = default;
322   AsyncTaskManager(const AsyncTaskManager&) = delete;
323   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
324 
325   ~AsyncTaskManager() = default;
326 
stopThread()327   int stopThread() {
328     {
329       std::unique_lock<std::mutex> guard(internal_mutex_);
330       tasks_by_id_.clear();
331       task_queue_.clear();
332       if (!running_) {
333         return 0;
334       }
335       running_ = false;
336       // notify the thread
337       internal_cond_var_.notify_one();
338     }  // release the lock before joining a thread that is likely waiting for it
339     if (std::this_thread::get_id() != thread_.get_id()) {
340       thread_.join();
341     } else {
342       dwarning("%s: Starting thread stop from inside the task thread itself", __func__);
343     }
344     return 0;
345   }
346 
347  private:
348   // Holds the data for each task
349   class Task {
350    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback,AsyncUserId user)351     Task(std::chrono::steady_clock::time_point time,
352          std::chrono::milliseconds period, const TaskCallback& callback,
353          AsyncUserId user)
354         : time(time),
355           periodic(true),
356           period(period),
357           callback(callback),
358           task_id(kInvalidTaskId),
359           user_id(user) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback,AsyncUserId user)360     Task(std::chrono::steady_clock::time_point time,
361          const TaskCallback& callback, AsyncUserId user)
362         : time(time),
363           periodic(false),
364           callback(callback),
365           task_id(kInvalidTaskId),
366           user_id(user) {}
367 
368     // Operators needed to be in a collection
operator <(const Task & another) const369     bool operator<(const Task& another) const {
370       return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id);
371     }
372 
isPeriodic() const373     bool isPeriodic() const {
374       return periodic;
375     }
376 
377     // These fields should no longer be public if the class ever becomes
378     // public or gets more complex
379     std::chrono::steady_clock::time_point time;
380     bool periodic;
381     std::chrono::milliseconds period{};
382     std::mutex in_callback; // Taken when the callback is active
383     TaskCallback callback;
384     AsyncTaskId task_id;
385     AsyncUserId user_id;
386   };
387 
388   // A comparator class to put shared pointers to tasks in an ordered set
389   struct task_p_comparator {
operator ()rootcanal::AsyncManager::AsyncTaskManager::task_p_comparator390     bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const {
391       return *t1 < *t2;
392     }
393   };
394 
cancel_task_with_lock_held(AsyncTaskId async_task_id)395   bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
396     if (tasks_by_id_.count(async_task_id) == 0) {
397       return false;
398     }
399 
400     // Now make sure we are not running this task.
401     // 2 cases
402     // - This is called from thread_, this means a scheduled task is actually
403     //   unregistering.
404     // - Another thread is calling us, let's make sure the task is not active.
405     if (thread_.get_id() != std::this_thread::get_id()) {
406       auto task = tasks_by_id_[async_task_id];
407       const std::lock_guard<std::mutex> lock(task->in_callback);
408       task_queue_.erase(task);
409       tasks_by_id_.erase(async_task_id);
410     } else {
411       task_queue_.erase(tasks_by_id_[async_task_id]);
412       tasks_by_id_.erase(async_task_id);
413     }
414 
415     return true;
416   }
417 
scheduleTask(const std::shared_ptr<Task> & task)418   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
419     {
420       std::unique_lock<std::mutex> guard(internal_mutex_);
421       // no more room for new tasks, we need a larger type for IDs
422       if (tasks_by_id_.size() == kMaxTaskId)  // TODO potentially type unsafe
423         return kInvalidTaskId;
424       do {
425         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
426       } while (isTaskIdInUse(lastTaskId_));
427       task->task_id = lastTaskId_;
428       // add task to the queue and map
429       tasks_by_id_[lastTaskId_] = task;
430       tasks_by_user_id_[task->user_id].insert(task->task_id);
431       task_queue_.insert(task);
432     }
433     // start thread if necessary
434     int started = tryStartThread();
435     if (started != 0) {
436       derror("%s: Unable to start thread", __func__);
437       return kInvalidTaskId;
438     }
439     // notify the thread so that it knows of the new task
440     internal_cond_var_.notify_one();
441     // return task id
442     return task->task_id;
443   }
444 
isTaskIdInUse(const AsyncTaskId & task_id) const445   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
446     return tasks_by_id_.count(task_id) != 0;
447   }
448 
tryStartThread()449   int tryStartThread() {
450     // need the lock because of the running flag and the cond var
451     std::unique_lock<std::mutex> guard(internal_mutex_);
452     // check that the thread is not yet running
453     if (running_) {
454       return 0;
455     }
456     // start the thread
457     running_ = true;
458     thread_ = std::thread([this]() { ThreadRoutine(); });
459     if (!thread_.joinable()) {
460       derror("%s: Unable to start task thread", __func__);
461       return -1;
462     }
463     return 0;
464   }
465 
ThreadRoutine()466   void ThreadRoutine() {
467     while (running_) {
468       TaskCallback callback;
469       std::shared_ptr<Task> task_p;
470       bool run_it = false;
471       {
472         std::unique_lock<std::mutex> guard(internal_mutex_);
473         if (!task_queue_.empty()) {
474           task_p = *(task_queue_.begin());
475           if (task_p->time < std::chrono::steady_clock::now()) {
476             run_it = true;
477             callback = task_p->callback;
478             task_queue_.erase(task_p);  // need to remove and add again if
479                                         // periodic to update order
480             if (task_p->isPeriodic()) {
481               task_p->time += task_p->period;
482               task_queue_.insert(task_p);
483             } else {
484               tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
485               tasks_by_id_.erase(task_p->task_id);
486             }
487           }
488         }
489       }
490       if (run_it) {
491         const std::lock_guard<std::mutex> lock(task_p->in_callback);
492         Synchronize(callback);
493       }
494       {
495         std::unique_lock<std::mutex> guard(internal_mutex_);
496         // check for termination right before waiting
497         if (!running_) break;
498         // wait until time for the next task (if any)
499         if (task_queue_.size() > 0) {
500           // Make a copy of the time_point because wait_until takes a reference
501           // to it and may read it after waiting, by which time the task may
502           // have been freed (e.g. via CancelAsyncTask).
503           std::chrono::steady_clock::time_point time =
504               (*task_queue_.begin())->time;
505           internal_cond_var_.wait_until(guard, time);
506         } else {
507           internal_cond_var_.wait(guard);
508         }
509       }
510     }
511   }
512 
513   bool running_ = false;
514   std::thread thread_;
515   std::mutex internal_mutex_;
516   std::mutex synchronization_mutex_;
517   std::condition_variable internal_cond_var_;
518 
519   AsyncTaskId lastTaskId_ = kInvalidTaskId;
520   AsyncUserId lastUserId_{1};
521   std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id_;
522   std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
523   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
524 };
525 
526 // Async Manager Implementation:
AsyncManager()527 AsyncManager::AsyncManager() : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {}
528 
~AsyncManager()529 AsyncManager::~AsyncManager() {
530   // Make sure the threads are stopped before destroying the object.
531   // The threads need to be stopped here and not in each internal class'
532   // destructor because unique_ptr's reset() first assigns nullptr to the
533   // pointer and only then calls the destructor, so any callback running
534   // on these threads would dereference a null pointer if they called a member
535   // function of this class.
536   fdWatcher_p_->stopThread();
537   taskManager_p_->stopThread();
538 }
539 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)540 int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
541   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback);
542 }
543 
StopWatchingFileDescriptor(int file_descriptor)544 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
545   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
546 }
547 
GetNextUserId()548 AsyncUserId AsyncManager::GetNextUserId() {
549   return taskManager_p_->GetNextUserId();
550 }
551 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)552 AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
553                                     std::chrono::milliseconds delay,
554                                     const TaskCallback& callback) {
555   return taskManager_p_->ExecAsync(user_id, delay, callback);
556 }
557 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)558 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
559     AsyncUserId user_id, std::chrono::milliseconds delay,
560     std::chrono::milliseconds period, const TaskCallback& callback) {
561   return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
562                                                callback);
563 }
564 
CancelAsyncTask(AsyncTaskId async_task_id)565 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
566   return taskManager_p_->CancelAsyncTask(async_task_id);
567 }
568 
CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id)569 bool AsyncManager::CancelAsyncTasksFromUser(
570     rootcanal::AsyncUserId user_id) {
571   return taskManager_p_->CancelAsyncTasksFromUser(user_id);
572 }
573 
Synchronize(const CriticalCallback & critical)574 void AsyncManager::Synchronize(const CriticalCallback& critical) {
575   taskManager_p_->Synchronize(critical);
576 }
577 }  // namespace rootcanal
578