/*
 * Copyright 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "async_manager.h"

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

#include "fcntl.h"
#include "log.h"
#include "sys/select.h"
#include "unistd.h"

namespace rootcanal {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.

// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching, the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched; it is used
// to notify the thread of internal changes to the object state (like the
// addition of new FDs to watch). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe, which means the thread will
// be notified regardless of what phase of the loop it is in at that moment).
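//
// A minimal usage sketch (comment only; the socket FD and the manager
// instance are illustrative, not defined in this file):
//
//   AsyncManager async_manager;
//   async_manager.WatchFdForNonBlockingReads(socket_fd, [](int fd) {
//     char buffer[256];
//     ssize_t bytes = TEMP_FAILURE_RETRY(read(fd, buffer, sizeof(buffer)));
//     // |fd| is known to be readable here, so this read does not block.
//   });
//   ...
//   async_manager.StopWatchingFileDescriptor(socket_fd);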

// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances, so it is safe to use several objects of
// this class. Nothing interesting happens upon construction either; access to
// internal state is synchronized using a single internal mutex. When the first
// task is scheduled a thread is started which monitors a queue of tasks. The
// queue is peeked to see when the next task should be carried out and then the
// thread performs an (absolute) timed wait on a condition variable. The wait
// ends because of a timeout or a notification on the cond var; the former
// means a task is due for execution while the latter means there has been a
// change in internal state, like a task having been scheduled or canceled or
// the flag to stop having been set. Setting and querying the stop flag, or
// modifying the task queue and then notifying the cond var, is done atomically
// (i.e. while holding the lock on the internal mutex) to ensure that the
// thread never misses the notification, since notifying a cond var is not
// persistent the way writing to a pipe is. (If not done this way, the thread
// could query the stopping flag and be put aside by the OS scheduler right
// after; then the 'stop thread' procedure could run, setting the flag,
// notifying a cond var that no one is waiting on, and joining the thread; the
// thread would then resume execution believing that it needs to continue and
// wait on the cond var, possibly forever if there are no tasks scheduled,
// effectively causing a deadlock.)
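//
// A minimal scheduling sketch (comment only; the delays and the manager
// instance are illustrative, not defined in this file):
//
//   AsyncUserId user = async_manager.GetNextUserId();
//   AsyncTaskId one_shot = async_manager.ExecAsync(
//       user, std::chrono::milliseconds(100), []() { /* runs once */ });
//   AsyncTaskId periodic = async_manager.ExecAsyncPeriodically(
//       user, std::chrono::milliseconds(0), std::chrono::milliseconds(50),
//       []() { /* runs every 50 ms */ });
//   async_manager.CancelAsyncTask(one_shot);
//   async_manager.CancelAsyncTasksFromUser(user);  // cancels the periodic one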

// This number also states the maximum number of scheduled tasks we can handle
// at a given time
static const uint16_t kMaxTaskId =
    -1; /* 2^16 - 1, permissible ids are {1..2^16-1} */
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  return (id == kMaxTaskId) ? 1 : id + 1;
}
// The buffer is only 10 bytes because the expected number of bytes
// written on this socket is 1. It is possible that the thread is notified
// more than once, but that is highly unlikely, so a buffer of size 10 seems
// enough; the reads are performed inside a while loop just in case it isn't.
// From the thread routine's point of view it is the same to have been
// notified just once or 100 times, so it simply tries to consume the entire
// buffer. In the cases where an interrupt causes read to return without
// having read everything that was available, a new iteration of the thread
// loop will bring execution to this point almost immediately, so there is
// no need to handle that case.
static const int kNotificationBufferSize = 10;

// Async File Descriptor Watcher Implementation:
class AsyncManager::AsyncFdWatcher {
 public:
  int WatchFdForNonBlockingReads(
      int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
    // add file descriptor and callback
    {
      std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
      watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
    }

    // start the thread if not started yet
    int started = tryStartThread();
    if (started != 0) {
      LOG_ERROR("%s: Unable to start thread", __func__);
      return started;
    }

    // notify the thread so that it knows of the new FD
    notifyThread();

    return 0;
  }

  void StopWatchingFileDescriptor(int file_descriptor) {
    std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
    watched_shared_fds_.erase(file_descriptor);
  }

  AsyncFdWatcher() = default;
  AsyncFdWatcher(const AsyncFdWatcher&) = delete;
  AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;

  ~AsyncFdWatcher() = default;

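  // Stops the reading thread: clears the running flag and writes to the
  // notification pipe so the select() loop wakes up and observes the flag,
  // then joins the thread (unless called from the reading thread itself) and
  // drops all watched FDs. Calling it when the thread is not running does
  // nothing.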
  int stopThread() {
    if (!std::atomic_exchange(&running_, false)) {
      return 0;  // if not running already
    }

    notifyThread();

    if (std::this_thread::get_id() != thread_.get_id()) {
      thread_.join();
    } else {
      LOG_WARN("%s: Starting thread stop from inside the reading thread itself",
               __func__);
    }

    {
      std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
      watched_shared_fds_.clear();
    }

    return 0;
  }

 private:
  // Make sure to call this with at least one file descriptor ready to be
  // watched upon or the thread routine will return immediately
  int tryStartThread() {
    if (std::atomic_exchange(&running_, true)) {
      return 0;  // if already running
    }
    // set up the communication channel
    int pipe_fds[2];
    if (pipe2(pipe_fds, O_NONBLOCK)) {
      LOG_ERROR(
          "%s:Unable to establish a communication channel to the reading "
          "thread",
          __func__);
      return -1;
    }
    notification_listen_fd_ = pipe_fds[0];
    notification_write_fd_ = pipe_fds[1];

    thread_ = std::thread([this]() { ThreadRoutine(); });
    if (!thread_.joinable()) {
      LOG_ERROR("%s: Unable to start reading thread", __func__);
      return -1;
    }
    return 0;
  }

  int notifyThread() {
    char buffer = '0';
    if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
      LOG_ERROR("%s: Unable to send message to reading thread", __func__);
      return -1;
    }
    return 0;
  }

  int setUpFileDescriptorSet(fd_set& read_fds) {
    // add comm channel to the set
    FD_SET(notification_listen_fd_, &read_fds);
    int nfds = notification_listen_fd_;

    // add watched FDs to the set
    {
      std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
      for (auto& fdp : watched_shared_fds_) {
        FD_SET(fdp.first, &read_fds);
        nfds = std::max(fdp.first, nfds);
      }
    }
    return nfds;
  }

  // check the comm channel and read everything there
  bool consumeThreadNotifications(fd_set& read_fds) {
    if (FD_ISSET(notification_listen_fd_, &read_fds)) {
      char buffer[kNotificationBufferSize];
      while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
                                     kNotificationBufferSize)) ==
             kNotificationBufferSize) {
      }
      return true;
    }
    return false;
  }

  // check all file descriptors and call callbacks if necessary
  void runAppropriateCallbacks(fd_set& read_fds) {
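    // The ready (fd, callback) pairs are copied out of the map before being
    // invoked: a callback may add or remove watched FDs (internal_mutex_ is
    // recursive, so re-entry from this thread is allowed), which would
    // invalidate an iterator over watched_shared_fds_.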
    std::vector<decltype(watched_shared_fds_)::value_type> fds;
    std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
    for (auto& fdc : watched_shared_fds_) {
      if (FD_ISSET(fdc.first, &read_fds)) {
        fds.push_back(fdc);
      }
    }
    for (auto& p : fds) {
      p.second(p.first);
    }
  }

  void ThreadRoutine() {
    while (running_) {
      fd_set read_fds;
      FD_ZERO(&read_fds);
      int nfds = setUpFileDescriptorSet(read_fds);

      // wait until there is data available to read on some FD
      int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
      if (retval <= 0) {  // there was some error or a timeout
        LOG_ERROR(
            "%s: There was an error while waiting for data on the file "
            "descriptors: %s",
            __func__, strerror(errno));
        continue;
      }

      consumeThreadNotifications(read_fds);

      // Do not read if there was a call to stop running
      if (!running_) {
        break;
      }

      runAppropriateCallbacks(read_fds);
    }
  }

  std::atomic_bool running_{false};
  std::thread thread_;
  std::recursive_mutex internal_mutex_;

  std::map<int, ReadCallback> watched_shared_fds_;

  // A pair of FDs to send information to the reading thread
  int notification_listen_fd_{};
  int notification_write_fd_{};
};

// Async task manager implementation
class AsyncManager::AsyncTaskManager {
 public:
  AsyncUserId GetNextUserId() { return lastUserId_++; }

  AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
                        const TaskCallback& callback) {
    return scheduleTask(std::make_shared<Task>(
        std::chrono::steady_clock::now() + delay, callback, user_id));
  }

  AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
                                    std::chrono::milliseconds delay,
                                    std::chrono::milliseconds period,
                                    const TaskCallback& callback) {
    return scheduleTask(std::make_shared<Task>(
        std::chrono::steady_clock::now() + delay, period, callback, user_id));
  }

  bool CancelAsyncTask(AsyncTaskId async_task_id) {
    // remove task from queue (and task id association) while holding lock
    std::unique_lock<std::mutex> guard(internal_mutex_);
    return cancel_task_with_lock_held(async_task_id);
  }

  bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
    // remove task from queue (and task id association) while holding lock
    std::unique_lock<std::mutex> guard(internal_mutex_);
    if (tasks_by_user_id_.count(user_id) == 0) {
      return false;
    }
    for (auto task : tasks_by_user_id_[user_id]) {
      cancel_task_with_lock_held(task);
    }
    tasks_by_user_id_.erase(user_id);
    return true;
  }

  AsyncTaskManager() = default;
  AsyncTaskManager(const AsyncTaskManager&) = delete;
  AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;

  ~AsyncTaskManager() = default;

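  // Stops the task thread: under the lock it drops every pending task, clears
  // the running flag and notifies the condition variable, then joins the
  // thread (unless called from the task thread itself).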
  int stopThread() {
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      tasks_by_id_.clear();
      task_queue_.clear();
      if (!running_) {
        return 0;
      }
      running_ = false;
      // notify the thread
      internal_cond_var_.notify_one();
    }  // release the lock before joining a thread that is likely waiting for it
    if (std::this_thread::get_id() != thread_.get_id()) {
      thread_.join();
    } else {
      LOG_WARN("%s: Starting thread stop from inside the task thread itself",
               __func__);
    }
    return 0;
  }

 private:
  // Holds the data for each task
  class Task {
   public:
    Task(std::chrono::steady_clock::time_point time,
         std::chrono::milliseconds period, const TaskCallback& callback,
         AsyncUserId user)
        : time(time),
          periodic(true),
          period(period),
          callback(callback),
          task_id(kInvalidTaskId),
          user_id(user) {}
    Task(std::chrono::steady_clock::time_point time,
         const TaskCallback& callback, AsyncUserId user)
        : time(time),
          periodic(false),
          callback(callback),
          task_id(kInvalidTaskId),
          user_id(user) {}

    // Operators needed to be in a collection
    bool operator<(const Task& another) const {
      return std::make_pair(time, task_id) <
             std::make_pair(another.time, another.task_id);
    }

    bool isPeriodic() const { return periodic; }

    // These fields should no longer be public if the class ever becomes
    // public or gets more complex
    std::chrono::steady_clock::time_point time;
    bool periodic;
    std::chrono::milliseconds period{};
    std::mutex in_callback;  // Taken when the callback is active
    TaskCallback callback;
    AsyncTaskId task_id;
    AsyncUserId user_id;
  };

  // A comparator class to put shared pointers to tasks in an ordered set
  struct task_p_comparator {
    bool operator()(const std::shared_ptr<Task>& t1,
                    const std::shared_ptr<Task>& t2) const {
      return *t1 < *t2;
    }
  };

  bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
    if (tasks_by_id_.count(async_task_id) == 0) {
      return false;
    }

    // Now make sure we are not running this task.
    // 2 cases:
    // - This is called from thread_, this means a running
    //   scheduled task is actually unregistering. All bets are off.
    // - Another thread is calling us, let's make sure the task is not active.
    if (thread_.get_id() != std::this_thread::get_id()) {
      auto task = tasks_by_id_[async_task_id];
      const std::lock_guard<std::mutex> lock(task->in_callback);
      task_queue_.erase(task);
      tasks_by_id_.erase(async_task_id);
    } else {
      task_queue_.erase(tasks_by_id_[async_task_id]);
      tasks_by_id_.erase(async_task_id);
    }

    return true;
  }

  AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
    {
      std::unique_lock<std::mutex> guard(internal_mutex_);
      // no more room for new tasks, we need a larger type for IDs
      if (tasks_by_id_.size() == kMaxTaskId)  // TODO potentially type unsafe
        return kInvalidTaskId;
      do {
        lastTaskId_ = NextAsyncTaskId(lastTaskId_);
      } while (isTaskIdInUse(lastTaskId_));
      task->task_id = lastTaskId_;
      // add task to the queue and map
      tasks_by_id_[lastTaskId_] = task;
      tasks_by_user_id_[task->user_id].insert(task->task_id);
      task_queue_.insert(task);
    }
    // start thread if necessary
    int started = tryStartThread();
    if (started != 0) {
      LOG_ERROR("%s: Unable to start thread", __func__);
      return kInvalidTaskId;
    }
    // notify the thread so that it knows of the new task
    internal_cond_var_.notify_one();
    // return task id
    return task->task_id;
  }

  bool isTaskIdInUse(const AsyncTaskId& task_id) const {
    return tasks_by_id_.count(task_id) != 0;
  }

  int tryStartThread() {
    // need the lock because of the running flag and the cond var
    std::unique_lock<std::mutex> guard(internal_mutex_);
    // check that the thread is not yet running
    if (running_) {
      return 0;
    }
    // start the thread
    running_ = true;
    thread_ = std::thread([this]() { ThreadRoutine(); });
    if (!thread_.joinable()) {
      LOG_ERROR("%s: Unable to start task thread", __func__);
      return -1;
    }
    return 0;
  }

  void ThreadRoutine() {
    while (running_) {
      TaskCallback callback;
      std::shared_ptr<Task> task_p;
      bool run_it = false;
      {
        std::unique_lock<std::mutex> guard(internal_mutex_);
        if (!task_queue_.empty()) {
          task_p = *(task_queue_.begin());
          if (task_p->time < std::chrono::steady_clock::now()) {
            run_it = true;
            callback = task_p->callback;
            task_queue_.erase(task_p);  // need to remove and add again if
                                        // periodic to update order
            if (task_p->isPeriodic()) {
              task_p->time += task_p->period;
              task_queue_.insert(task_p);
            } else {
              tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
              tasks_by_id_.erase(task_p->task_id);
            }
          }
        }
      }
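      // Run the callback outside internal_mutex_ so that it may itself
      // schedule or cancel tasks; holding the task's in_callback mutex makes
      // a concurrent CancelAsyncTask from another thread wait until the
      // callback has finished.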
      if (run_it) {
        const std::lock_guard<std::mutex> lock(task_p->in_callback);
        callback();
      }
      {
        std::unique_lock<std::mutex> guard(internal_mutex_);
        // check for termination right before waiting
        if (!running_) break;
        // wait until time for the next task (if any)
        if (task_queue_.size() > 0) {
          // Make a copy of the time_point because wait_until takes a reference
          // to it and may read it after waiting, by which time the task may
          // have been freed (e.g. via CancelAsyncTask).
          std::chrono::steady_clock::time_point time =
              (*task_queue_.begin())->time;
          internal_cond_var_.wait_until(guard, time);
        } else {
          internal_cond_var_.wait(guard);
        }
      }
    }
  }

  bool running_ = false;
  std::thread thread_;
  std::mutex internal_mutex_;
  std::condition_variable internal_cond_var_;

  AsyncTaskId lastTaskId_ = kInvalidTaskId;
  AsyncUserId lastUserId_{1};
  std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
  std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
  std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
};

// Async Manager Implementation:
AsyncManager::AsyncManager()
    : fdWatcher_p_(new AsyncFdWatcher()),
      taskManager_p_(new AsyncTaskManager()) {}

AsyncManager::~AsyncManager() {
  // Make sure the threads are stopped before destroying the object.
  // The threads need to be stopped here and not in each internal class'
  // destructor because unique_ptr's reset() first assigns nullptr to the
  // pointer and only then calls the destructor, so any callback running
  // on these threads would dereference a null pointer if it called a member
  // function of this class.
  fdWatcher_p_->stopThread();
  taskManager_p_->stopThread();
}

int AsyncManager::WatchFdForNonBlockingReads(
    int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
  return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
                                                  on_read_fd_ready_callback);
}

void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
  fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
}

AsyncUserId AsyncManager::GetNextUserId() {
  return taskManager_p_->GetNextUserId();
}

AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
                                    std::chrono::milliseconds delay,
                                    const TaskCallback& callback) {
  return taskManager_p_->ExecAsync(user_id, delay, callback);
}

AsyncTaskId AsyncManager::ExecAsyncPeriodically(
    AsyncUserId user_id, std::chrono::milliseconds delay,
    std::chrono::milliseconds period, const TaskCallback& callback) {
  return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
                                               callback);
}

bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
  return taskManager_p_->CancelAsyncTask(async_task_id);
}

bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
  return taskManager_p_->CancelAsyncTasksFromUser(user_id);
}

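// Runs |critical| while holding the manager-wide synchronization mutex, so
// critical sections passed here by callbacks and client code are mutually
// exclusive with each other.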
void AsyncManager::Synchronize(const CriticalCallback& critical) {
  std::unique_lock<std::mutex> guard(synchronization_mutex_);
  critical();
}
}  // namespace rootcanal
567