1 /*
2 * Copyright 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "async_manager"
18
19 #include "async_manager.h"
20
21 #include "osi/include/log.h"
22
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 #include "fcntl.h"
30 #include "sys/select.h"
31 #include "unistd.h"
32
33 namespace test_vendor_lib {
34 // Implementation of AsyncManager is divided between two classes, three if
35 // AsyncManager itself is taken into account, but its only responsability
36 // besides being a proxy for the other two classes is to provide a global
37 // synchronization mechanism for callbacks and client code to use.
38
39 // The watching of file descriptors is done through AsyncFdWatcher. Several
40 // objects of this class may coexist simultaneosly as they share no state.
41 // After construction of this objects nothing happens beyond some very simple
42 // member initialization. When the first FD is set up for watching the object
43 // starts a new thread which watches the given (and later provided) FDs using
44 // select() inside a loop. A special FD (a pipe) is also watched which is
45 // used to notify the thread of internal changes on the object state (like
46 // the addition of new FDs to watch on). Every access to internal state is
47 // synchronized using a single internal mutex. The thread is only stopped on
48 // destruction of the object, by modifying a flag, which is the only member
49 // variable accessed without acquiring the lock (because the notification to
50 // the thread is done later by writing to a pipe which means the thread will
51 // be notified regardless of what phase of the loop it is in that moment)
52
53 // The scheduling of asynchronous tasks, periodic or not, is handled by the
54 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
55 // state between different instances so it is safe to use several objects of
56 // this class, also nothing interesting happens upon construction, but only
57 // after a Task has been scheduled and access to internal state is synchronized
58 // using a single internal mutex. When the first task is scheduled a thread
59 // is started which monitors a queue of tasks. The queue is peeked to see
60 // when the next task should be carried out and then the thread performs a
61 // (absolute) timed wait on a condition variable. The wait ends because of a
62 // time out or a notify on the cond var, the former means a task is due
63 // for execution while the later means there has been a change in internal
64 // state, like a task has been scheduled/canceled or the flag to stop has
65 // been set. Setting and querying the stop flag or modifying the task queue
66 // and subsequent notification on the cond var is done atomically (e.g while
67 // holding the lock on the internal mutex) to ensure that the thread never
68 // misses the notification, since notifying a cond var is not persistent as
69 // writing on a pipe (if not done this way, the thread could query the
70 // stopping flag and be put aside by the OS scheduler right after, then the
71 // 'stop thread' procedure could run, setting the flag, notifying a cond
72 // var that no one is waiting on and joining the thread, the thread then
73 // resumes execution believing that it needs to continue and waits on the
74 // cond var possibly forever if there are no tasks scheduled, efectively
75 // causing a deadlock).
76
77 // This number also states the maximum number of scheduled tasks we can handle
78 // at a given time
79 static const uint16_t kMaxTaskId =
80 -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
NextAsyncTaskId(const AsyncTaskId id)81 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
82 return (id == kMaxTaskId) ? 1 : id + 1;
83 }
84 // The buffer is only 10 bytes because the expected number of bytes
85 // written on this socket is 1. It is possible that the thread is notified
86 // more than once but highly unlikely, so a buffer of size 10 seems enough
87 // and the reads are performed inside a while just in case it isn't. From
88 // the thread routine's point of view it is the same to have been notified
89 // just once or 100 times so it just tries to consume the entire buffer.
90 // In the cases where an interrupt would cause read to return without
91 // having read everything that was available a new iteration of the thread
92 // loop will bring execution to this point almost immediately, so there is
93 // no need to treat that case.
94 static const int kNotificationBufferSize = 10;
95
96 // Async File Descriptor Watcher Implementation:
97 class AsyncManager::AsyncFdWatcher {
98 public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)99 int WatchFdForNonBlockingReads(
100 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
101 // add file descriptor and callback
102 {
103 std::unique_lock<std::mutex> guard(internal_mutex_);
104 watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
105 }
106
107 // start the thread if not started yet
108 int started = tryStartThread();
109 if (started != 0) {
110 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
111 return started;
112 }
113
114 // notify the thread so that it knows of the new FD
115 notifyThread();
116
117 return 0;
118 }
119
StopWatchingFileDescriptor(int file_descriptor)120 void StopWatchingFileDescriptor(int file_descriptor) {
121 std::unique_lock<std::mutex> guard(internal_mutex_);
122 watched_shared_fds_.erase(file_descriptor);
123 }
124
125 AsyncFdWatcher() = default;
126
127 ~AsyncFdWatcher() = default;
128
stopThread()129 int stopThread() {
130 if (!std::atomic_exchange(&running_, false)) {
131 return 0; // if not running already
132 }
133
134 notifyThread();
135
136 if (std::this_thread::get_id() != thread_.get_id()) {
137 thread_.join();
138 } else {
139 LOG_WARN(LOG_TAG,
140 "%s: Starting thread stop from inside the reading thread itself",
141 __func__);
142 }
143
144 {
145 std::unique_lock<std::mutex> guard(internal_mutex_);
146 watched_shared_fds_.clear();
147 }
148
149 return 0;
150 }
151
152 private:
153 AsyncFdWatcher(const AsyncFdWatcher&) = delete;
154 AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
155
156 // Make sure to call this with at least one file descriptor ready to be
157 // watched upon or the thread routine will return immediately
tryStartThread()158 int tryStartThread() {
159 if (std::atomic_exchange(&running_, true)) {
160 return 0; // if already running
161 }
162 // set up the communication channel
163 int pipe_fds[2];
164 if (pipe2(pipe_fds, O_NONBLOCK)) {
165 LOG_ERROR(LOG_TAG,
166 "%s:Unable to establish a communication channel to the reading "
167 "thread",
168 __func__);
169 return -1;
170 }
171 notification_listen_fd_ = pipe_fds[0];
172 notification_write_fd_ = pipe_fds[1];
173
174 thread_ = std::thread([this]() { ThreadRoutine(); });
175 if (!thread_.joinable()) {
176 LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
177 return -1;
178 }
179 return 0;
180 }
181
notifyThread()182 int notifyThread() {
183 char buffer = '0';
184 if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
185 LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread",
186 __func__);
187 return -1;
188 }
189 return 0;
190 }
191
setUpFileDescriptorSet(fd_set & read_fds)192 int setUpFileDescriptorSet(fd_set& read_fds) {
193 // add comm channel to the set
194 FD_SET(notification_listen_fd_, &read_fds);
195 int nfds = notification_listen_fd_;
196
197 // add watched FDs to the set
198 {
199 std::unique_lock<std::mutex> guard(internal_mutex_);
200 for (auto& fdp : watched_shared_fds_) {
201 FD_SET(fdp.first, &read_fds);
202 nfds = std::max(fdp.first, nfds);
203 }
204 }
205 return nfds;
206 }
207
208 // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds)209 bool consumeThreadNotifications(fd_set& read_fds) {
210 if (FD_ISSET(notification_listen_fd_, &read_fds)) {
211 char buffer[kNotificationBufferSize];
212 while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
213 kNotificationBufferSize)) ==
214 kNotificationBufferSize) {
215 }
216 return true;
217 }
218 return false;
219 }
220
221 // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)222 void runAppropriateCallbacks(fd_set& read_fds) {
223 // not a good idea to call a callback while holding the FD lock,
224 // nor to release the lock while traversing the map
225 std::vector<decltype(watched_shared_fds_)::value_type> fds;
226 {
227 std::unique_lock<std::mutex> guard(internal_mutex_);
228 for (auto& fdc : watched_shared_fds_) {
229 if (FD_ISSET(fdc.first, &read_fds)) {
230 fds.push_back(fdc);
231 }
232 }
233 }
234 for (auto& p : fds) {
235 p.second(p.first);
236 }
237 }
238
ThreadRoutine()239 void ThreadRoutine() {
240 while (running_) {
241 fd_set read_fds;
242 FD_ZERO(&read_fds);
243 int nfds = setUpFileDescriptorSet(read_fds);
244
245 // wait until there is data available to read on some FD
246 int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
247 if (retval <= 0) { // there was some error or a timeout
248 LOG_ERROR(LOG_TAG,
249 "%s: There was an error while waiting for data on the file "
250 "descriptors",
251 __func__);
252 continue;
253 }
254
255 consumeThreadNotifications(read_fds);
256
257 // Do not read if there was a call to stop running
258 if (!running_) {
259 break;
260 }
261
262 runAppropriateCallbacks(read_fds);
263 }
264 }
265
266 std::atomic_bool running_{false};
267 std::thread thread_;
268 std::mutex internal_mutex_;
269
270 std::map<int, ReadCallback> watched_shared_fds_;
271
272 // A pair of FD to send information to the reading thread
273 int notification_listen_fd_;
274 int notification_write_fd_;
275 };
276
277 // Async task manager implementation
278 class AsyncManager::AsyncTaskManager {
279 public:
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)280 AsyncTaskId ExecAsync(std::chrono::milliseconds delay,
281 const TaskCallback& callback) {
282 return scheduleTask(std::make_shared<Task>(
283 std::chrono::steady_clock::now() + delay, callback));
284 }
285
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)286 AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay,
287 std::chrono::milliseconds period,
288 const TaskCallback& callback) {
289 return scheduleTask(std::make_shared<Task>(
290 std::chrono::steady_clock::now() + delay, period, callback));
291 }
292
CancelAsyncTask(AsyncTaskId async_task_id)293 bool CancelAsyncTask(AsyncTaskId async_task_id) {
294 // remove task from queue (and task id asociation) while holding lock
295 std::unique_lock<std::mutex> guard(internal_mutex_);
296 if (tasks_by_id.count(async_task_id) == 0) {
297 return false;
298 }
299 task_queue_.erase(tasks_by_id[async_task_id]);
300 tasks_by_id.erase(async_task_id);
301 return true;
302 }
303
304 AsyncTaskManager() = default;
305
306 ~AsyncTaskManager() = default;
307
stopThread()308 int stopThread() {
309 {
310 std::unique_lock<std::mutex> guard(internal_mutex_);
311 tasks_by_id.clear();
312 task_queue_.clear();
313 if (!running_) {
314 return 0;
315 }
316 running_ = false;
317 // notify the thread
318 internal_cond_var_.notify_one();
319 } // release the lock before joining a thread that is likely waiting for it
320 if (std::this_thread::get_id() != thread_.get_id()) {
321 thread_.join();
322 } else {
323 LOG_WARN(LOG_TAG,
324 "%s: Starting thread stop from inside the task thread itself",
325 __func__);
326 }
327 return 0;
328 }
329
330 private:
331 // Holds the data for each task
332 class Task {
333 public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback)334 Task(std::chrono::steady_clock::time_point time,
335 std::chrono::milliseconds period, const TaskCallback& callback)
336 : time(time),
337 periodic(true),
338 period(period),
339 callback(callback),
340 task_id(kInvalidTaskId) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback)341 Task(std::chrono::steady_clock::time_point time,
342 const TaskCallback& callback)
343 : time(time),
344 periodic(false),
345 callback(callback),
346 task_id(kInvalidTaskId) {}
347
348 // Operators needed to be in a collection
operator <(const Task & another) const349 bool operator<(const Task& another) const {
350 return std::make_pair(time, task_id) <
351 std::make_pair(another.time, another.task_id);
352 }
353
isPeriodic() const354 bool isPeriodic() const { return periodic; }
355
356 // These fields should no longer be public if the class ever becomes
357 // public or gets more complex
358 std::chrono::steady_clock::time_point time;
359 bool periodic;
360 std::chrono::milliseconds period;
361 TaskCallback callback;
362 AsyncTaskId task_id;
363 };
364
365 // A comparator class to put shared pointers to tasks in an ordered set
366 struct task_p_comparator {
operator ()test_vendor_lib::AsyncManager::AsyncTaskManager::task_p_comparator367 bool operator()(const std::shared_ptr<Task>& t1,
368 const std::shared_ptr<Task>& t2) const {
369 return *t1 < *t2;
370 }
371 };
372
373 AsyncTaskManager(const AsyncTaskManager&) = delete;
374 AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
375
scheduleTask(const std::shared_ptr<Task> & task)376 AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
377 AsyncTaskId task_id = kInvalidTaskId;
378 {
379 std::unique_lock<std::mutex> guard(internal_mutex_);
380 // no more room for new tasks, we need a larger type for IDs
381 if (tasks_by_id.size() == kMaxTaskId) // TODO potentially type unsafe
382 return kInvalidTaskId;
383 do {
384 lastTaskId_ = NextAsyncTaskId(lastTaskId_);
385 } while (isTaskIdInUse(lastTaskId_));
386 task->task_id = lastTaskId_;
387 // add task to the queue and map
388 tasks_by_id[lastTaskId_] = task;
389 task_queue_.insert(task);
390 task_id = lastTaskId_;
391 }
392 // start thread if necessary
393 int started = tryStartThread();
394 if (started != 0) {
395 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
396 return kInvalidTaskId;
397 }
398 // notify the thread so that it knows of the new task
399 internal_cond_var_.notify_one();
400 // return task id
401 return task_id;
402 }
403
isTaskIdInUse(const AsyncTaskId & task_id) const404 bool isTaskIdInUse(const AsyncTaskId& task_id) const {
405 return tasks_by_id.count(task_id) != 0;
406 }
407
tryStartThread()408 int tryStartThread() {
409 // need the lock because of the running flag and the cond var
410 std::unique_lock<std::mutex> guard(internal_mutex_);
411 // check that the thread is not yet running
412 if (running_) {
413 return 0;
414 }
415 // start the thread
416 running_ = true;
417 thread_ = std::thread([this]() { ThreadRoutine(); });
418 if (!thread_.joinable()) {
419 LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
420 return -1;
421 }
422 return 0;
423 }
424
ThreadRoutine()425 void ThreadRoutine() {
426 while (1) {
427 TaskCallback callback;
428 bool run_it = false;
429 {
430 std::unique_lock<std::mutex> guard(internal_mutex_);
431 if (!task_queue_.empty()) {
432 std::shared_ptr<Task> task_p = *(task_queue_.begin());
433 if (task_p->time < std::chrono::steady_clock::now()) {
434 run_it = true;
435 callback = task_p->callback;
436 task_queue_.erase(task_p); // need to remove and add again if
437 // periodic to update order
438 if (task_p->isPeriodic()) {
439 task_p->time += task_p->period;
440 task_queue_.insert(task_p);
441 } else {
442 tasks_by_id.erase(task_p->task_id);
443 }
444 }
445 }
446 }
447 if (run_it) {
448 callback();
449 }
450 {
451 std::unique_lock<std::mutex> guard(internal_mutex_);
452 // wait on condition variable with timeout just in time for next task if
453 // any
454 if (task_queue_.size() > 0) {
455 internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
456 } else {
457 internal_cond_var_.wait(guard);
458 }
459 // check for termination right after being notified (and maybe before?)
460 if (!running_) break;
461 }
462 }
463 }
464
465 bool running_ = false;
466 std::thread thread_;
467 std::mutex internal_mutex_;
468 std::condition_variable internal_cond_var_;
469
470 AsyncTaskId lastTaskId_ = kInvalidTaskId;
471 std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
472 std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
473 };
474
475 // Async Manager Implementation:
AsyncManager()476 AsyncManager::AsyncManager()
477 : fdWatcher_p_(new AsyncFdWatcher()),
478 taskManager_p_(new AsyncTaskManager()) {}
479
~AsyncManager()480 AsyncManager::~AsyncManager() {
481 // Make sure the threads are stopped before destroying the object.
482 // The threads need to be stopped here and not in each internal class'
483 // destructor because unique_ptr's reset() first assigns nullptr to the
484 // pointer and only then calls the destructor, so any callback running
485 // on these threads would dereference a null pointer if they called a member
486 // function of this class.
487 fdWatcher_p_->stopThread();
488 taskManager_p_->stopThread();
489 }
490
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)491 int AsyncManager::WatchFdForNonBlockingReads(
492 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
493 return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
494 on_read_fd_ready_callback);
495 }
496
StopWatchingFileDescriptor(int file_descriptor)497 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
498 fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
499 }
500
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)501 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay,
502 const TaskCallback& callback) {
503 return taskManager_p_->ExecAsync(delay, callback);
504 }
505
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)506 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
507 std::chrono::milliseconds delay, std::chrono::milliseconds period,
508 const TaskCallback& callback) {
509 return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
510 }
511
CancelAsyncTask(AsyncTaskId async_task_id)512 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
513 return taskManager_p_->CancelAsyncTask(async_task_id);
514 }
515
Synchronize(const CriticalCallback & critical)516 void AsyncManager::Synchronize(const CriticalCallback& critical) {
517 std::unique_lock<std::mutex> guard(synchronization_mutex_);
518 critical();
519 }
520 } // namespace test_vendor_lib
521