• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "async_manager.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <mutex>
23 #include <thread>
24 #include <vector>
25 
26 #include "fcntl.h"
27 #include "log.h"
28 #include "sys/select.h"
29 #include "unistd.h"
30 
31 namespace rootcanal {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
36 
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched which is
// used to notify the thread of internal changes on the object state (like
// the addition of new FDs to watch on). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe which means the thread will
// be notified regardless of what phase of the loop it is in at that moment).
50 
// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances so it is safe to use several objects of
// this class. Also, nothing interesting happens upon construction, but only
// after a Task has been scheduled, and access to internal state is
// synchronized using a single internal mutex. When the first task is scheduled
// a thread is started which monitors a queue of tasks. The queue is peeked to
// see when the next task should be carried out and then the thread performs a
// (absolute) timed wait on a condition variable. The wait ends because of a
// time out or a notify on the cond var; the former means a task is due
// for execution while the latter means there has been a change in internal
// state, like a task has been scheduled/canceled or the flag to stop has
// been set. Setting and querying the stop flag or modifying the task queue
// and subsequent notification on the cond var is done atomically (i.e. while
// holding the lock on the internal mutex) to ensure that the thread never
// misses the notification, since notifying a cond var is not persistent the
// way writing to a pipe is (if not done this way, the thread could query the
// stopping flag and be put aside by the OS scheduler right after, then the
// 'stop thread' procedure could run, setting the flag, notifying a cond
// var that no one is waiting on and joining the thread; the thread then
// resumes execution believing that it needs to continue and waits on the
// cond var possibly forever if there are no tasks scheduled, effectively
// causing a deadlock).
74 
// Largest task id that can be handed out. Valid ids are {1..2^16-1}, so this
// value is also the maximum number of tasks that can be scheduled at any
// given time.
static constexpr uint16_t kMaxTaskId = 0xFFFF; /* 2^16 - 1 */

// Returns the id that follows |id|. After kMaxTaskId the sequence wraps
// around to 1, so 0 is never produced.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id == kMaxTaskId) {
    return 1;
  }
  return id + 1;
}
// Size of the scratch buffer used to drain the notification pipe.
//
// A single notification is one byte, so 10 bytes is plenty in the common
// case. Several notifications may pile up before the thread gets to read
// them, which is why the pipe is drained in a loop: to the thread routine
// one notification is as good as a hundred, it simply consumes whatever is
// there. If a read is interrupted before everything has been consumed, the
// next iteration of the thread loop lands on the same read almost
// immediately, so that case needs no special handling.
static constexpr int kNotificationBufferSize = 10;
93 
94 // Async File Descriptor Watcher Implementation:
95 class AsyncManager::AsyncFdWatcher {
96  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)97   int WatchFdForNonBlockingReads(
98       int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
99     // add file descriptor and callback
100     {
101       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
102       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
103     }
104 
105     // start the thread if not started yet
106     int started = tryStartThread();
107     if (started != 0) {
108       LOG_ERROR("%s: Unable to start thread", __func__);
109       return started;
110     }
111 
112     // notify the thread so that it knows of the new FD
113     notifyThread();
114 
115     return 0;
116   }
117 
StopWatchingFileDescriptor(int file_descriptor)118   void StopWatchingFileDescriptor(int file_descriptor) {
119     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
120     watched_shared_fds_.erase(file_descriptor);
121   }
122 
123   AsyncFdWatcher() = default;
124   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
125   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
126 
127   ~AsyncFdWatcher() = default;
128 
stopThread()129   int stopThread() {
130     if (!std::atomic_exchange(&running_, false)) {
131       return 0;  // if not running already
132     }
133 
134     notifyThread();
135 
136     if (std::this_thread::get_id() != thread_.get_id()) {
137       thread_.join();
138     } else {
139       LOG_WARN("%s: Starting thread stop from inside the reading thread itself",
140                __func__);
141     }
142 
143     {
144       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
145       watched_shared_fds_.clear();
146     }
147 
148     return 0;
149   }
150 
151  private:
152   // Make sure to call this with at least one file descriptor ready to be
153   // watched upon or the thread routine will return immediately
tryStartThread()154   int tryStartThread() {
155     if (std::atomic_exchange(&running_, true)) {
156       return 0;  // if already running
157     }
158     // set up the communication channel
159     int pipe_fds[2];
160     if (pipe2(pipe_fds, O_NONBLOCK)) {
161       LOG_ERROR(
162           "%s:Unable to establish a communication channel to the reading "
163           "thread",
164           __func__);
165       return -1;
166     }
167     notification_listen_fd_ = pipe_fds[0];
168     notification_write_fd_ = pipe_fds[1];
169 
170     thread_ = std::thread([this]() { ThreadRoutine(); });
171     if (!thread_.joinable()) {
172       LOG_ERROR("%s: Unable to start reading thread", __func__);
173       return -1;
174     }
175     return 0;
176   }
177 
notifyThread() const178   int notifyThread() const {
179     char buffer = '0';
180     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
181       LOG_ERROR("%s: Unable to send message to reading thread", __func__);
182       return -1;
183     }
184     return 0;
185   }
186 
setUpFileDescriptorSet(fd_set & read_fds)187   int setUpFileDescriptorSet(fd_set& read_fds) {
188     // add comm channel to the set
189     FD_SET(notification_listen_fd_, &read_fds);
190     int nfds = notification_listen_fd_;
191 
192     // add watched FDs to the set
193     {
194       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
195       for (auto& fdp : watched_shared_fds_) {
196         FD_SET(fdp.first, &read_fds);
197         nfds = std::max(fdp.first, nfds);
198       }
199     }
200     return nfds;
201   }
202 
203   // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds) const204   bool consumeThreadNotifications(fd_set& read_fds) const {
205     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
206       char buffer[kNotificationBufferSize];
207       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
208                                      kNotificationBufferSize)) ==
209              kNotificationBufferSize) {
210       }
211       return true;
212     }
213     return false;
214   }
215 
216   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)217   void runAppropriateCallbacks(fd_set& read_fds) {
218     std::vector<decltype(watched_shared_fds_)::value_type> fds;
219     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
220     for (auto& fdc : watched_shared_fds_) {
221       if (FD_ISSET(fdc.first, &read_fds)) {
222         fds.push_back(fdc);
223       }
224     }
225     for (auto& p : fds) {
226       p.second(p.first);
227     }
228   }
229 
ThreadRoutine()230   void ThreadRoutine() {
231     while (running_) {
232       fd_set read_fds;
233       FD_ZERO(&read_fds);
234       int nfds = setUpFileDescriptorSet(read_fds);
235 
236       // wait until there is data available to read on some FD
237       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
238       if (retval <= 0) {  // there was some error or a timeout
239         LOG_ERROR(
240             "%s: There was an error while waiting for data on the file "
241             "descriptors: %s",
242             __func__, strerror(errno));
243         continue;
244       }
245 
246       consumeThreadNotifications(read_fds);
247 
248       // Do not read if there was a call to stop running
249       if (!running_) {
250         break;
251       }
252 
253       runAppropriateCallbacks(read_fds);
254     }
255   }
256 
257   std::atomic_bool running_{false};
258   std::thread thread_;
259   std::recursive_mutex internal_mutex_;
260 
261   std::map<int, ReadCallback> watched_shared_fds_;
262 
263   // A pair of FD to send information to the reading thread
264   int notification_listen_fd_{};
265   int notification_write_fd_{};
266 };
267 
268 // Async task manager implementation
269 class AsyncManager::AsyncTaskManager {
270  public:
GetNextUserId()271   AsyncUserId GetNextUserId() { return lastUserId_++; }
272 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)273   AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
274                         const TaskCallback& callback) {
275     return scheduleTask(std::make_shared<Task>(
276         std::chrono::steady_clock::now() + delay, callback, user_id));
277   }
278 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)279   AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
280                                     std::chrono::milliseconds delay,
281                                     std::chrono::milliseconds period,
282                                     const TaskCallback& callback) {
283     return scheduleTask(std::make_shared<Task>(
284         std::chrono::steady_clock::now() + delay, period, callback, user_id));
285   }
286 
CancelAsyncTask(AsyncTaskId async_task_id)287   bool CancelAsyncTask(AsyncTaskId async_task_id) {
288     // remove task from queue (and task id association) while holding lock
289     std::unique_lock<std::mutex> guard(internal_mutex_);
290     return cancel_task_with_lock_held(async_task_id);
291   }
292 
CancelAsyncTasksFromUser(AsyncUserId user_id)293   bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
294     // remove task from queue (and task id association) while holding lock
295     std::unique_lock<std::mutex> guard(internal_mutex_);
296     if (tasks_by_user_id_.count(user_id) == 0) {
297       return false;
298     }
299     for (auto task : tasks_by_user_id_[user_id]) {
300       cancel_task_with_lock_held(task);
301     }
302     tasks_by_user_id_.erase(user_id);
303     return true;
304   }
305 
Synchronize(const CriticalCallback & critical)306   void Synchronize(const CriticalCallback& critical) {
307     std::unique_lock<std::mutex> guard(synchronization_mutex_);
308     critical();
309   }
310 
311   AsyncTaskManager() = default;
312   AsyncTaskManager(const AsyncTaskManager&) = delete;
313   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
314 
315   ~AsyncTaskManager() = default;
316 
stopThread()317   int stopThread() {
318     {
319       std::unique_lock<std::mutex> guard(internal_mutex_);
320       tasks_by_id_.clear();
321       task_queue_.clear();
322       if (!running_) {
323         return 0;
324       }
325       running_ = false;
326       // notify the thread
327       internal_cond_var_.notify_one();
328     }  // release the lock before joining a thread that is likely waiting for it
329     if (std::this_thread::get_id() != thread_.get_id()) {
330       thread_.join();
331     } else {
332       LOG_WARN("%s: Starting thread stop from inside the task thread itself",
333                __func__);
334     }
335     return 0;
336   }
337 
338  private:
339   // Holds the data for each task
340   class Task {
341    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback,AsyncUserId user)342     Task(std::chrono::steady_clock::time_point time,
343          std::chrono::milliseconds period, const TaskCallback& callback,
344          AsyncUserId user)
345         : time(time),
346           periodic(true),
347           period(period),
348           callback(callback),
349           task_id(kInvalidTaskId),
350           user_id(user) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback,AsyncUserId user)351     Task(std::chrono::steady_clock::time_point time,
352          const TaskCallback& callback, AsyncUserId user)
353         : time(time),
354           periodic(false),
355           callback(callback),
356           task_id(kInvalidTaskId),
357           user_id(user) {}
358 
359     // Operators needed to be in a collection
operator <(const Task & another) const360     bool operator<(const Task& another) const {
361       return std::make_pair(time, task_id) <
362              std::make_pair(another.time, another.task_id);
363     }
364 
isPeriodic() const365     bool isPeriodic() const { return periodic; }
366 
367     // These fields should no longer be public if the class ever becomes
368     // public or gets more complex
369     std::chrono::steady_clock::time_point time;
370     bool periodic;
371     std::chrono::milliseconds period{};
372     std::mutex in_callback; // Taken when the callback is active
373     TaskCallback callback;
374     AsyncTaskId task_id;
375     AsyncUserId user_id;
376   };
377 
378   // A comparator class to put shared pointers to tasks in an ordered set
379   struct task_p_comparator {
operator ()rootcanal::AsyncManager::AsyncTaskManager::task_p_comparator380     bool operator()(const std::shared_ptr<Task>& t1,
381                     const std::shared_ptr<Task>& t2) const {
382       return *t1 < *t2;
383     }
384   };
385 
cancel_task_with_lock_held(AsyncTaskId async_task_id)386   bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
387     if (tasks_by_id_.count(async_task_id) == 0) {
388       return false;
389     }
390 
391     // Now make sure we are not running this task.
392     // 2 cases:
393     // - This is called from thread_, this means a running
394     //   scheduled task is actually unregistering. All bets are off.
395     // - Another thread is calling us, let's make sure the task is not active.
396     if (thread_.get_id() != std::this_thread::get_id()) {
397       auto task = tasks_by_id_[async_task_id];
398       const std::lock_guard<std::mutex> lock(task->in_callback);
399       task_queue_.erase(task);
400       tasks_by_id_.erase(async_task_id);
401     } else {
402       task_queue_.erase(tasks_by_id_[async_task_id]);
403       tasks_by_id_.erase(async_task_id);
404     }
405 
406     return true;
407   }
408 
scheduleTask(const std::shared_ptr<Task> & task)409   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
410     {
411       std::unique_lock<std::mutex> guard(internal_mutex_);
412       // no more room for new tasks, we need a larger type for IDs
413       if (tasks_by_id_.size() == kMaxTaskId) {  // TODO potentially type unsafe
414         return kInvalidTaskId;
415       }
416       do {
417         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
418       } while (isTaskIdInUse(lastTaskId_));
419       task->task_id = lastTaskId_;
420       // add task to the queue and map
421       tasks_by_id_[lastTaskId_] = task;
422       tasks_by_user_id_[task->user_id].insert(task->task_id);
423       task_queue_.insert(task);
424     }
425     // start thread if necessary
426     int started = tryStartThread();
427     if (started != 0) {
428       LOG_ERROR("%s: Unable to start thread", __func__);
429       return kInvalidTaskId;
430     }
431     // notify the thread so that it knows of the new task
432     internal_cond_var_.notify_one();
433     // return task id
434     return task->task_id;
435   }
436 
isTaskIdInUse(const AsyncTaskId & task_id) const437   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
438     return tasks_by_id_.count(task_id) != 0;
439   }
440 
tryStartThread()441   int tryStartThread() {
442     // need the lock because of the running flag and the cond var
443     std::unique_lock<std::mutex> guard(internal_mutex_);
444     // check that the thread is not yet running
445     if (running_) {
446       return 0;
447     }
448     // start the thread
449     running_ = true;
450     thread_ = std::thread([this]() { ThreadRoutine(); });
451     if (!thread_.joinable()) {
452       LOG_ERROR("%s: Unable to start task thread", __func__);
453       return -1;
454     }
455     return 0;
456   }
457 
ThreadRoutine()458   void ThreadRoutine() {
459     while (running_) {
460       TaskCallback callback;
461       std::shared_ptr<Task> task_p;
462       bool run_it = false;
463       {
464         std::unique_lock<std::mutex> guard(internal_mutex_);
465         if (!task_queue_.empty()) {
466           task_p = *(task_queue_.begin());
467           if (task_p->time < std::chrono::steady_clock::now()) {
468             run_it = true;
469             callback = task_p->callback;
470             task_queue_.erase(task_p);  // need to remove and add again if
471                                         // periodic to update order
472             if (task_p->isPeriodic()) {
473               task_p->time += task_p->period;
474               task_queue_.insert(task_p);
475             } else {
476               tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
477               tasks_by_id_.erase(task_p->task_id);
478             }
479           }
480         }
481       }
482       if (run_it) {
483         const std::lock_guard<std::mutex> lock(task_p->in_callback);
484         Synchronize(callback);
485       }
486       {
487         std::unique_lock<std::mutex> guard(internal_mutex_);
488         // check for termination right before waiting
489         if (!running_) {
490           break;
491         }
492         // wait until time for the next task (if any)
493         if (!task_queue_.empty()) {
494           // Make a copy of the time_point because wait_until takes a reference
495           // to it and may read it after waiting, by which time the task may
496           // have been freed (e.g. via CancelAsyncTask).
497           std::chrono::steady_clock::time_point time =
498               (*task_queue_.begin())->time;
499           internal_cond_var_.wait_until(guard, time);
500         } else {
501           internal_cond_var_.wait(guard);
502         }
503       }
504     }
505   }
506 
507   bool running_ = false;
508   std::thread thread_;
509   std::mutex internal_mutex_;
510   std::mutex synchronization_mutex_;
511   std::condition_variable internal_cond_var_;
512 
513   AsyncTaskId lastTaskId_ = kInvalidTaskId;
514   AsyncUserId lastUserId_{1};
515   std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
516   std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
517   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
518 };
519 
520 // Async Manager Implementation:
AsyncManager()521 AsyncManager::AsyncManager()
522     : fdWatcher_p_(new AsyncFdWatcher()),
523       taskManager_p_(new AsyncTaskManager()) {}
524 
~AsyncManager()525 AsyncManager::~AsyncManager() {
526   // Make sure the threads are stopped before destroying the object.
527   // The threads need to be stopped here and not in each internal class'
528   // destructor because unique_ptr's reset() first assigns nullptr to the
529   // pointer and only then calls the destructor, so any callback running
530   // on these threads would dereference a null pointer if they called a member
531   // function of this class.
532   fdWatcher_p_->stopThread();
533   taskManager_p_->stopThread();
534 }
535 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)536 int AsyncManager::WatchFdForNonBlockingReads(
537     int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
538   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
539                                                   on_read_fd_ready_callback);
540 }
541 
StopWatchingFileDescriptor(int file_descriptor)542 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
543   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
544 }
545 
GetNextUserId()546 AsyncUserId AsyncManager::GetNextUserId() {
547   return taskManager_p_->GetNextUserId();
548 }
549 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)550 AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
551                                     std::chrono::milliseconds delay,
552                                     const TaskCallback& callback) {
553   return taskManager_p_->ExecAsync(user_id, delay, callback);
554 }
555 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)556 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
557     AsyncUserId user_id, std::chrono::milliseconds delay,
558     std::chrono::milliseconds period, const TaskCallback& callback) {
559   return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
560                                                callback);
561 }
562 
CancelAsyncTask(AsyncTaskId async_task_id)563 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
564   return taskManager_p_->CancelAsyncTask(async_task_id);
565 }
566 
CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id)567 bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
568   return taskManager_p_->CancelAsyncTasksFromUser(user_id);
569 }
570 
Synchronize(const CriticalCallback & critical)571 void AsyncManager::Synchronize(const CriticalCallback& critical) {
572   taskManager_p_->Synchronize(critical);
573 }
574 }  // namespace rootcanal
575