1 //
2 // Copyright 2016 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #define LOG_TAG "async_manager"
18
19 #include "async_manager.h"
20
21 #include <algorithm>
22 #include <atomic>
23 #include <condition_variable>
24 #include <mutex>
25 #include <thread>
26 #include <vector>
27 #include "fcntl.h"
28 #include "sys/select.h"
29 #include "unistd.h"
30
31 namespace test_vendor_lib {
32 // Implementation of AsyncManager is divided between two classes, three if
33 // AsyncManager itself is taken into account, but its only responsability
34 // besides being a proxy for the other two classes is to provide a global
35 // synchronization mechanism for callbacks and client code to use.
36
37 // The watching of file descriptors is done through AsyncFdWatcher. Several
38 // objects of this class may coexist simultaneosly as they share no state.
39 // After construction of this objects nothing happens beyond some very simple
40 // member initialization. When the first FD is set up for watching the object
41 // starts a new thread which watches the given (and later provided) FDs using
42 // select() inside a loop. A special FD (a pipe) is also watched which is
43 // used to notify the thread of internal changes on the object state (like
44 // the addition of new FDs to watch on). Every access to internal state is
45 // synchronized using a single internal mutex. The thread is only stopped on
46 // destruction of the object, by modifying a flag, which is the only member
47 // variable accessed without acquiring the lock (because the notification to
48 // the thread is done later by writing to a pipe which means the thread will
49 // be notified regardless of what phase of the loop it is in that moment)
50
51 // The scheduling of asynchronous tasks, periodic or not, is handled by the
52 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
53 // state between different instances so it is safe to use several objects of
54 // this class, also nothing interesting happens upon construction, but only
55 // after a Task has been scheduled and access to internal state is synchronized
56 // using a single internal mutex. When the first task is scheduled a thread
57 // is started which monitors a queue of tasks. The queue is peeked to see
58 // when the next task should be carried out and then the thread performs a
59 // (absolute) timed wait on a condition variable. The wait ends because of a
60 // time out or a notify on the cond var, the former means a task is due
61 // for execution while the later means there has been a change in internal
62 // state, like a task has been scheduled/canceled or the flag to stop has
63 // been set. Setting and querying the stop flag or modifying the task queue
64 // and subsequent notification on the cond var is done atomically (e.g while
65 // holding the lock on the internal mutex) to ensure that the thread never
66 // misses the notification, since notifying a cond var is not persistent as
67 // writing on a pipe (if not done this way, the thread could query the
68 // stopping flag and be put aside by the OS scheduler right after, then the
69 // 'stop thread' procedure could run, setting the flag, notifying a cond
70 // var that no one is waiting on and joining the thread, the thread then
71 // resumes execution believing that it needs to continue and waits on the
72 // cond var possibly forever if there are no tasks scheduled, efectively
73 // causing a deadlock).
74
75 // This number also states the maximum number of scheduled tasks we can handle
76 // at a given time
77 static const uint16_t kMaxTaskId =
78 -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
NextAsyncTaskId(const AsyncTaskId id)79 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
80 return (id == kMaxTaskId) ? 1 : id + 1;
81 }
82 // The buffer is only 10 bytes because the expected number of bytes
83 // written on this socket is 1. It is possible that the thread is notified
84 // more than once but highly unlikely, so a buffer of size 10 seems enough
85 // and the reads are performed inside a while just in case it isn't. From
86 // the thread routine's point of view it is the same to have been notified
87 // just once or 100 times so it just tries to consume the entire buffer.
88 // In the cases where an interrupt would cause read to return without
89 // having read everything that was available a new iteration of the thread
90 // loop will bring execution to this point almost immediately, so there is
91 // no need to treat that case.
92 static const int kNotificationBufferSize = 10;
93
94 // Async File Descriptor Watcher Implementation:
95 class AsyncManager::AsyncFdWatcher {
96 public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)97 int WatchFdForNonBlockingReads(
98 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
99 // add file descriptor and callback
100 {
101 std::unique_lock<std::mutex> guard(internal_mutex_);
102 watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
103 }
104
105 // start the thread if not started yet
106 int started = tryStartThread();
107 if (started != 0) {
108 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
109 return started;
110 }
111
112 // notify the thread so that it knows of the new FD
113 notifyThread();
114
115 return 0;
116 }
117
StopWatchingFileDescriptor(int file_descriptor)118 void StopWatchingFileDescriptor(int file_descriptor) {
119 std::unique_lock<std::mutex> guard(internal_mutex_);
120 watched_shared_fds_.erase(file_descriptor);
121 }
122
123 AsyncFdWatcher() = default;
124
125 ~AsyncFdWatcher() = default;
126
stopThread()127 int stopThread() {
128 if (!std::atomic_exchange(&running_, false)) {
129 return 0; // if not running already
130 }
131
132 notifyThread();
133
134 if (std::this_thread::get_id() != thread_.get_id()) {
135 thread_.join();
136 } else {
137 LOG_WARN(LOG_TAG,
138 "%s: Starting thread stop from inside the reading thread itself",
139 __func__);
140 }
141
142 {
143 std::unique_lock<std::mutex> guard(internal_mutex_);
144 watched_shared_fds_.clear();
145 }
146
147 return 0;
148 }
149
150 private:
151 AsyncFdWatcher(const AsyncFdWatcher&) = delete;
152 AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
153
154 // Make sure to call this with at least one file descriptor ready to be
155 // watched upon or the thread routine will return immediately
tryStartThread()156 int tryStartThread() {
157 if (std::atomic_exchange(&running_, true)) {
158 return 0; // if already running
159 }
160 // set up the communication channel
161 int pipe_fds[2];
162 if (pipe2(pipe_fds, O_NONBLOCK)) {
163 LOG_ERROR(LOG_TAG,
164 "%s:Unable to establish a communication channel to the reading "
165 "thread",
166 __func__);
167 return -1;
168 }
169 notification_listen_fd_ = pipe_fds[0];
170 notification_write_fd_ = pipe_fds[1];
171
172 thread_ = std::thread([this]() { ThreadRoutine(); });
173 if (!thread_.joinable()) {
174 LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
175 return -1;
176 }
177 return 0;
178 }
179
notifyThread()180 int notifyThread() {
181 char buffer = '0';
182 if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
183 LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread",
184 __func__);
185 return -1;
186 }
187 return 0;
188 }
189
setUpFileDescriptorSet(fd_set & read_fds)190 int setUpFileDescriptorSet(fd_set& read_fds) {
191 // add comm channel to the set
192 FD_SET(notification_listen_fd_, &read_fds);
193 int nfds = notification_listen_fd_;
194
195 // add watched FDs to the set
196 {
197 std::unique_lock<std::mutex> guard(internal_mutex_);
198 for (auto& fdp : watched_shared_fds_) {
199 FD_SET(fdp.first, &read_fds);
200 nfds = std::max(fdp.first, nfds);
201 }
202 }
203 return nfds;
204 }
205
206 // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds)207 bool consumeThreadNotifications(fd_set& read_fds) {
208 if (FD_ISSET(notification_listen_fd_, &read_fds)) {
209 char buffer[kNotificationBufferSize];
210 while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
211 kNotificationBufferSize)) ==
212 kNotificationBufferSize) {
213 }
214 return true;
215 }
216 return false;
217 }
218
219 // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)220 void runAppropriateCallbacks(fd_set& read_fds) {
221 // not a good idea to call a callback while holding the FD lock,
222 // nor to release the lock while traversing the map
223 std::vector<decltype(watched_shared_fds_)::value_type> fds;
224 {
225 std::unique_lock<std::mutex> guard(internal_mutex_);
226 for (auto& fdc : watched_shared_fds_) {
227 if (FD_ISSET(fdc.first, &read_fds)) {
228 fds.push_back(fdc);
229 }
230 }
231 }
232 for (auto& p : fds) {
233 p.second(p.first);
234 }
235 }
236
ThreadRoutine()237 void ThreadRoutine() {
238 while (running_) {
239 fd_set read_fds;
240 FD_ZERO(&read_fds);
241 int nfds = setUpFileDescriptorSet(read_fds);
242
243 // wait until there is data available to read on some FD
244 int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
245 if (retval <= 0) { // there was some error or a timeout
246 LOG_ERROR(LOG_TAG,
247 "%s: There was an error while waiting for data on the file "
248 "descriptors",
249 __func__);
250 continue;
251 }
252
253 consumeThreadNotifications(read_fds);
254
255 // Do not read if there was a call to stop running
256 if (!running_) {
257 break;
258 }
259
260 runAppropriateCallbacks(read_fds);
261 }
262 }
263
264 std::atomic_bool running_{false};
265 std::thread thread_;
266 std::mutex internal_mutex_;
267
268 std::map<int, ReadCallback> watched_shared_fds_;
269
270 // A pair of FD to send information to the reading thread
271 int notification_listen_fd_;
272 int notification_write_fd_;
273 };
274
275 // Async task manager implementation
276 class AsyncManager::AsyncTaskManager {
277 public:
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)278 AsyncTaskId ExecAsync(std::chrono::milliseconds delay,
279 const TaskCallback& callback) {
280 return scheduleTask(std::make_shared<Task>(
281 std::chrono::steady_clock::now() + delay, callback));
282 }
283
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)284 AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay,
285 std::chrono::milliseconds period,
286 const TaskCallback& callback) {
287 return scheduleTask(std::make_shared<Task>(
288 std::chrono::steady_clock::now() + delay, period, callback));
289 }
290
CancelAsyncTask(AsyncTaskId async_task_id)291 bool CancelAsyncTask(AsyncTaskId async_task_id) {
292 // remove task from queue (and task id asociation) while holding lock
293 std::unique_lock<std::mutex> guard(internal_mutex_);
294 if (tasks_by_id.count(async_task_id) == 0) {
295 return false;
296 }
297 task_queue_.erase(tasks_by_id[async_task_id]);
298 tasks_by_id.erase(async_task_id);
299 return true;
300 }
301
302 AsyncTaskManager() = default;
303
304 ~AsyncTaskManager() = default;
305
stopThread()306 int stopThread() {
307 {
308 std::unique_lock<std::mutex> guard(internal_mutex_);
309 tasks_by_id.clear();
310 task_queue_.clear();
311 if (!running_) {
312 return 0;
313 }
314 running_ = false;
315 // notify the thread
316 internal_cond_var_.notify_one();
317 } // release the lock before joining a thread that is likely waiting for it
318 if (std::this_thread::get_id() != thread_.get_id()) {
319 thread_.join();
320 } else {
321 LOG_WARN(LOG_TAG,
322 "%s: Starting thread stop from inside the task thread itself",
323 __func__);
324 }
325 return 0;
326 }
327
328 private:
329 // Holds the data for each task
330 class Task {
331 public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback)332 Task(std::chrono::steady_clock::time_point time,
333 std::chrono::milliseconds period, const TaskCallback& callback)
334 : time(time),
335 periodic(true),
336 period(period),
337 callback(callback),
338 task_id(kInvalidTaskId) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback)339 Task(std::chrono::steady_clock::time_point time,
340 const TaskCallback& callback)
341 : time(time),
342 periodic(false),
343 callback(callback),
344 task_id(kInvalidTaskId) {}
345
346 // Operators needed to be in a collection
operator <(const Task & another) const347 bool operator<(const Task& another) const {
348 return std::make_pair(time, task_id) <
349 std::make_pair(another.time, another.task_id);
350 }
351
isPeriodic() const352 bool isPeriodic() const { return periodic; }
353
354 // These fields should no longer be public if the class ever becomes
355 // public or gets more complex
356 std::chrono::steady_clock::time_point time;
357 bool periodic;
358 std::chrono::milliseconds period;
359 TaskCallback callback;
360 AsyncTaskId task_id;
361 };
362
363 // A comparator class to put shared pointers to tasks in an ordered set
364 struct task_p_comparator {
operator ()test_vendor_lib::AsyncManager::AsyncTaskManager::task_p_comparator365 bool operator()(const std::shared_ptr<Task>& t1,
366 const std::shared_ptr<Task>& t2) const {
367 return *t1 < *t2;
368 }
369 };
370
371 AsyncTaskManager(const AsyncTaskManager&) = delete;
372 AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
373
scheduleTask(const std::shared_ptr<Task> & task)374 AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
375 AsyncTaskId task_id = kInvalidTaskId;
376 {
377 std::unique_lock<std::mutex> guard(internal_mutex_);
378 // no more room for new tasks, we need a larger type for IDs
379 if (tasks_by_id.size() == kMaxTaskId) // TODO potentially type unsafe
380 return kInvalidTaskId;
381 do {
382 lastTaskId_ = NextAsyncTaskId(lastTaskId_);
383 } while (isTaskIdInUse(lastTaskId_));
384 task->task_id = lastTaskId_;
385 // add task to the queue and map
386 tasks_by_id[lastTaskId_] = task;
387 task_queue_.insert(task);
388 task_id = lastTaskId_;
389 }
390 // start thread if necessary
391 int started = tryStartThread();
392 if (started != 0) {
393 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
394 return kInvalidTaskId;
395 }
396 // notify the thread so that it knows of the new task
397 internal_cond_var_.notify_one();
398 // return task id
399 return task_id;
400 }
401
isTaskIdInUse(const AsyncTaskId & task_id) const402 bool isTaskIdInUse(const AsyncTaskId& task_id) const {
403 return tasks_by_id.count(task_id) != 0;
404 }
405
tryStartThread()406 int tryStartThread() {
407 // need the lock because of the running flag and the cond var
408 std::unique_lock<std::mutex> guard(internal_mutex_);
409 // check that the thread is not yet running
410 if (running_) {
411 return 0;
412 }
413 // start the thread
414 running_ = true;
415 thread_ = std::thread([this]() { ThreadRoutine(); });
416 if (!thread_.joinable()) {
417 LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
418 return -1;
419 }
420 return 0;
421 }
422
ThreadRoutine()423 void ThreadRoutine() {
424 while (1) {
425 TaskCallback callback;
426 bool run_it = false;
427 {
428 std::unique_lock<std::mutex> guard(internal_mutex_);
429 if (!task_queue_.empty()) {
430 std::shared_ptr<Task> task_p = *(task_queue_.begin());
431 if (task_p->time < std::chrono::steady_clock::now()) {
432 run_it = true;
433 callback = task_p->callback;
434 task_queue_.erase(task_p); // need to remove and add again if
435 // periodic to update order
436 if (task_p->isPeriodic()) {
437 task_p->time += task_p->period;
438 task_queue_.insert(task_p);
439 } else {
440 tasks_by_id.erase(task_p->task_id);
441 }
442 }
443 }
444 }
445 if (run_it) {
446 callback();
447 }
448 {
449 std::unique_lock<std::mutex> guard(internal_mutex_);
450 // wait on condition variable with timeout just in time for next task if
451 // any
452 if (task_queue_.size() > 0) {
453 internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
454 } else {
455 internal_cond_var_.wait(guard);
456 }
457 // check for termination right after being notified (and maybe before?)
458 if (!running_) break;
459 }
460 }
461 }
462
463 bool running_ = false;
464 std::thread thread_;
465 std::mutex internal_mutex_;
466 std::condition_variable internal_cond_var_;
467
468 AsyncTaskId lastTaskId_ = kInvalidTaskId;
469 std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
470 std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
471 };
472
473 // Async Manager Implementation:
AsyncManager()474 AsyncManager::AsyncManager()
475 : fdWatcher_p_(new AsyncFdWatcher()),
476 taskManager_p_(new AsyncTaskManager()) {}
477
~AsyncManager()478 AsyncManager::~AsyncManager() {
479 // Make sure the threads are stopped before destroying the object.
480 // The threads need to be stopped here and not in each internal class'
481 // destructor because unique_ptr's reset() first assigns nullptr to the
482 // pointer and only then calls the destructor, so any callback running
483 // on these threads would dereference a null pointer if they called a member
484 // function of this class.
485 fdWatcher_p_->stopThread();
486 taskManager_p_->stopThread();
487 }
488
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)489 int AsyncManager::WatchFdForNonBlockingReads(
490 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
491 return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
492 on_read_fd_ready_callback);
493 }
494
StopWatchingFileDescriptor(int file_descriptor)495 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
496 fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
497 }
498
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)499 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay,
500 const TaskCallback& callback) {
501 return taskManager_p_->ExecAsync(delay, callback);
502 }
503
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)504 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
505 std::chrono::milliseconds delay, std::chrono::milliseconds period,
506 const TaskCallback& callback) {
507 return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
508 }
509
CancelAsyncTask(AsyncTaskId async_task_id)510 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
511 return taskManager_p_->CancelAsyncTask(async_task_id);
512 }
513
Synchronize(const CriticalCallback & critical)514 void AsyncManager::Synchronize(const CriticalCallback& critical) {
515 std::unique_lock<std::mutex> guard(synchronization_mutex_);
516 critical();
517 }
518 }
519