1 /*
2 * Copyright 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "async_manager"
18
19 #include "async_manager.h"
20
21 #include "osi/include/log.h"
22
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 #include "fcntl.h"
30 #include "sys/select.h"
31 #include "unistd.h"
32
33 namespace test_vendor_lib {
34 // Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
36 // besides being a proxy for the other two classes is to provide a global
37 // synchronization mechanism for callbacks and client code to use.
38
39 // The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
42 // member initialization. When the first FD is set up for watching the object
43 // starts a new thread which watches the given (and later provided) FDs using
44 // select() inside a loop. A special FD (a pipe) is also watched which is
45 // used to notify the thread of internal changes on the object state (like
46 // the addition of new FDs to watch on). Every access to internal state is
47 // synchronized using a single internal mutex. The thread is only stopped on
48 // destruction of the object, by modifying a flag, which is the only member
49 // variable accessed without acquiring the lock (because the notification to
50 // the thread is done later by writing to a pipe which means the thread will
51 // be notified regardless of what phase of the loop it is in that moment)
52
53 // The scheduling of asynchronous tasks, periodic or not, is handled by the
54 // AsyncTaskManager class. Like the one for FDs, this class shares no internal
55 // state between different instances so it is safe to use several objects of
56 // this class, also nothing interesting happens upon construction, but only
57 // after a Task has been scheduled and access to internal state is synchronized
58 // using a single internal mutex. When the first task is scheduled a thread
59 // is started which monitors a queue of tasks. The queue is peeked to see
60 // when the next task should be carried out and then the thread performs a
61 // (absolute) timed wait on a condition variable. The wait ends because of a
62 // time out or a notify on the cond var, the former means a task is due
// for execution while the latter means there has been a change in internal
64 // state, like a task has been scheduled/canceled or the flag to stop has
65 // been set. Setting and querying the stop flag or modifying the task queue
// and subsequent notification on the cond var is done atomically (i.e. while
67 // holding the lock on the internal mutex) to ensure that the thread never
68 // misses the notification, since notifying a cond var is not persistent as
69 // writing on a pipe (if not done this way, the thread could query the
70 // stopping flag and be put aside by the OS scheduler right after, then the
71 // 'stop thread' procedure could run, setting the flag, notifying a cond
72 // var that no one is waiting on and joining the thread, the thread then
73 // resumes execution believing that it needs to continue and waits on the
// cond var possibly forever if there are no tasks scheduled, effectively
75 // causing a deadlock).
76
77 // This number also states the maximum number of scheduled tasks we can handle
78 // at a given time
static constexpr uint16_t kMaxTaskId = 0xFFFF; /* 2^16 - 1, permissible ids are {1..2^16-1} */
// Returns the id that follows |id|. The successor of kMaxTaskId is 1, every
// other id is followed by id + 1.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id == kMaxTaskId) {
    return 1;
  }
  return id + 1;
}
83 // The buffer is only 10 bytes because the expected number of bytes
// written on this pipe is 1. It is possible that the thread is notified
85 // more than once but highly unlikely, so a buffer of size 10 seems enough
86 // and the reads are performed inside a while just in case it isn't. From
87 // the thread routine's point of view it is the same to have been notified
88 // just once or 100 times so it just tries to consume the entire buffer.
89 // In the cases where an interrupt would cause read to return without
90 // having read everything that was available a new iteration of the thread
91 // loop will bring execution to this point almost immediately, so there is
92 // no need to treat that case.
static constexpr int kNotificationBufferSize = 10;
94
95 // Async File Descriptor Watcher Implementation:
96 class AsyncManager::AsyncFdWatcher {
97 public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)98 int WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
99 // add file descriptor and callback
100 {
101 std::unique_lock<std::mutex> guard(internal_mutex_);
102 watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
103 }
104
105 // start the thread if not started yet
106 int started = tryStartThread();
107 if (started != 0) {
108 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
109 return started;
110 }
111
112 // notify the thread so that it knows of the new FD
113 notifyThread();
114
115 return 0;
116 }
117
StopWatchingFileDescriptor(int file_descriptor)118 void StopWatchingFileDescriptor(int file_descriptor) {
119 std::unique_lock<std::mutex> guard(internal_mutex_);
120 watched_shared_fds_.erase(file_descriptor);
121 }
122
123 AsyncFdWatcher() = default;
124
125 ~AsyncFdWatcher() = default;
126
stopThread()127 int stopThread() {
128 if (!std::atomic_exchange(&running_, false)) {
129 return 0; // if not running already
130 }
131
132 notifyThread();
133
134 if (std::this_thread::get_id() != thread_.get_id()) {
135 thread_.join();
136 } else {
137 LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the reading thread itself", __func__);
138 }
139
140 {
141 std::unique_lock<std::mutex> guard(internal_mutex_);
142 watched_shared_fds_.clear();
143 }
144
145 return 0;
146 }
147
148 private:
149 AsyncFdWatcher(const AsyncFdWatcher&) = delete;
150 AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
151
152 // Make sure to call this with at least one file descriptor ready to be
153 // watched upon or the thread routine will return immediately
tryStartThread()154 int tryStartThread() {
155 if (std::atomic_exchange(&running_, true)) {
156 return 0; // if already running
157 }
158 // set up the communication channel
159 int pipe_fds[2];
160 if (pipe2(pipe_fds, O_NONBLOCK)) {
161 LOG_ERROR(LOG_TAG,
162 "%s:Unable to establish a communication channel to the reading "
163 "thread",
164 __func__);
165 return -1;
166 }
167 notification_listen_fd_ = pipe_fds[0];
168 notification_write_fd_ = pipe_fds[1];
169
170 thread_ = std::thread([this]() { ThreadRoutine(); });
171 if (!thread_.joinable()) {
172 LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
173 return -1;
174 }
175 return 0;
176 }
177
notifyThread()178 int notifyThread() {
179 char buffer = '0';
180 if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
181 LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread", __func__);
182 return -1;
183 }
184 return 0;
185 }
186
setUpFileDescriptorSet(fd_set & read_fds)187 int setUpFileDescriptorSet(fd_set& read_fds) {
188 // add comm channel to the set
189 FD_SET(notification_listen_fd_, &read_fds);
190 int nfds = notification_listen_fd_;
191
192 // add watched FDs to the set
193 {
194 std::unique_lock<std::mutex> guard(internal_mutex_);
195 for (auto& fdp : watched_shared_fds_) {
196 FD_SET(fdp.first, &read_fds);
197 nfds = std::max(fdp.first, nfds);
198 }
199 }
200 return nfds;
201 }
202
203 // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds)204 bool consumeThreadNotifications(fd_set& read_fds) {
205 if (FD_ISSET(notification_listen_fd_, &read_fds)) {
206 char buffer[kNotificationBufferSize];
207 while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, kNotificationBufferSize)) ==
208 kNotificationBufferSize) {
209 }
210 return true;
211 }
212 return false;
213 }
214
215 // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)216 void runAppropriateCallbacks(fd_set& read_fds) {
217 // not a good idea to call a callback while holding the FD lock,
218 // nor to release the lock while traversing the map
219 std::vector<decltype(watched_shared_fds_)::value_type> fds;
220 {
221 std::unique_lock<std::mutex> guard(internal_mutex_);
222 for (auto& fdc : watched_shared_fds_) {
223 if (FD_ISSET(fdc.first, &read_fds)) {
224 fds.push_back(fdc);
225 }
226 }
227 }
228 for (auto& p : fds) {
229 p.second(p.first);
230 }
231 }
232
ThreadRoutine()233 void ThreadRoutine() {
234 while (running_) {
235 fd_set read_fds;
236 FD_ZERO(&read_fds);
237 int nfds = setUpFileDescriptorSet(read_fds);
238
239 // wait until there is data available to read on some FD
240 int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
241 if (retval <= 0) { // there was some error or a timeout
242 LOG_ERROR(LOG_TAG,
243 "%s: There was an error while waiting for data on the file "
244 "descriptors: %s",
245 __func__, strerror(errno));
246 continue;
247 }
248
249 consumeThreadNotifications(read_fds);
250
251 // Do not read if there was a call to stop running
252 if (!running_) {
253 break;
254 }
255
256 runAppropriateCallbacks(read_fds);
257 }
258 }
259
260 std::atomic_bool running_{false};
261 std::thread thread_;
262 std::mutex internal_mutex_;
263
264 std::map<int, ReadCallback> watched_shared_fds_;
265
266 // A pair of FD to send information to the reading thread
267 int notification_listen_fd_;
268 int notification_write_fd_;
269 };
270
271 // Async task manager implementation
272 class AsyncManager::AsyncTaskManager {
273 public:
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)274 AsyncTaskId ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
275 return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, callback));
276 }
277
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)278 AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
279 const TaskCallback& callback) {
280 return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, period, callback));
281 }
282
CancelAsyncTask(AsyncTaskId async_task_id)283 bool CancelAsyncTask(AsyncTaskId async_task_id) {
284 // remove task from queue (and task id asociation) while holding lock
285 std::unique_lock<std::mutex> guard(internal_mutex_);
286 if (tasks_by_id.count(async_task_id) == 0) {
287 return false;
288 }
289 task_queue_.erase(tasks_by_id[async_task_id]);
290 tasks_by_id.erase(async_task_id);
291 return true;
292 }
293
294 AsyncTaskManager() = default;
295
296 ~AsyncTaskManager() = default;
297
stopThread()298 int stopThread() {
299 {
300 std::unique_lock<std::mutex> guard(internal_mutex_);
301 tasks_by_id.clear();
302 task_queue_.clear();
303 if (!running_) {
304 return 0;
305 }
306 running_ = false;
307 // notify the thread
308 internal_cond_var_.notify_one();
309 } // release the lock before joining a thread that is likely waiting for it
310 if (std::this_thread::get_id() != thread_.get_id()) {
311 thread_.join();
312 } else {
313 LOG_WARN(LOG_TAG, "%s: Starting thread stop from inside the task thread itself", __func__);
314 }
315 return 0;
316 }
317
318 private:
319 // Holds the data for each task
320 class Task {
321 public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback)322 Task(std::chrono::steady_clock::time_point time, std::chrono::milliseconds period, const TaskCallback& callback)
323 : time(time), periodic(true), period(period), callback(callback), task_id(kInvalidTaskId) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback)324 Task(std::chrono::steady_clock::time_point time, const TaskCallback& callback)
325 : time(time), periodic(false), callback(callback), task_id(kInvalidTaskId) {}
326
327 // Operators needed to be in a collection
operator <(const Task & another) const328 bool operator<(const Task& another) const {
329 return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id);
330 }
331
isPeriodic() const332 bool isPeriodic() const {
333 return periodic;
334 }
335
336 // These fields should no longer be public if the class ever becomes
337 // public or gets more complex
338 std::chrono::steady_clock::time_point time;
339 bool periodic;
340 std::chrono::milliseconds period;
341 TaskCallback callback;
342 AsyncTaskId task_id;
343 };
344
345 // A comparator class to put shared pointers to tasks in an ordered set
346 struct task_p_comparator {
operator ()test_vendor_lib::AsyncManager::AsyncTaskManager::task_p_comparator347 bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const {
348 return *t1 < *t2;
349 }
350 };
351
352 AsyncTaskManager(const AsyncTaskManager&) = delete;
353 AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
354
scheduleTask(const std::shared_ptr<Task> & task)355 AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
356 AsyncTaskId task_id = kInvalidTaskId;
357 {
358 std::unique_lock<std::mutex> guard(internal_mutex_);
359 // no more room for new tasks, we need a larger type for IDs
360 if (tasks_by_id.size() == kMaxTaskId) // TODO potentially type unsafe
361 return kInvalidTaskId;
362 do {
363 lastTaskId_ = NextAsyncTaskId(lastTaskId_);
364 } while (isTaskIdInUse(lastTaskId_));
365 task->task_id = lastTaskId_;
366 // add task to the queue and map
367 tasks_by_id[lastTaskId_] = task;
368 task_queue_.insert(task);
369 task_id = lastTaskId_;
370 }
371 // start thread if necessary
372 int started = tryStartThread();
373 if (started != 0) {
374 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
375 return kInvalidTaskId;
376 }
377 // notify the thread so that it knows of the new task
378 internal_cond_var_.notify_one();
379 // return task id
380 return task_id;
381 }
382
isTaskIdInUse(const AsyncTaskId & task_id) const383 bool isTaskIdInUse(const AsyncTaskId& task_id) const {
384 return tasks_by_id.count(task_id) != 0;
385 }
386
tryStartThread()387 int tryStartThread() {
388 // need the lock because of the running flag and the cond var
389 std::unique_lock<std::mutex> guard(internal_mutex_);
390 // check that the thread is not yet running
391 if (running_) {
392 return 0;
393 }
394 // start the thread
395 running_ = true;
396 thread_ = std::thread([this]() { ThreadRoutine(); });
397 if (!thread_.joinable()) {
398 LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
399 return -1;
400 }
401 return 0;
402 }
403
ThreadRoutine()404 void ThreadRoutine() {
405 while (1) {
406 TaskCallback callback;
407 bool run_it = false;
408 {
409 std::unique_lock<std::mutex> guard(internal_mutex_);
410 if (!task_queue_.empty()) {
411 std::shared_ptr<Task> task_p = *(task_queue_.begin());
412 if (task_p->time < std::chrono::steady_clock::now()) {
413 run_it = true;
414 callback = task_p->callback;
415 task_queue_.erase(task_p); // need to remove and add again if
416 // periodic to update order
417 if (task_p->isPeriodic()) {
418 task_p->time += task_p->period;
419 task_queue_.insert(task_p);
420 } else {
421 tasks_by_id.erase(task_p->task_id);
422 }
423 }
424 }
425 }
426 if (run_it) {
427 callback();
428 }
429 {
430 std::unique_lock<std::mutex> guard(internal_mutex_);
431 // wait on condition variable with timeout just in time for next task if
432 // any
433 if (task_queue_.size() > 0) {
434 internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
435 } else {
436 internal_cond_var_.wait(guard);
437 }
438 // check for termination right after being notified (and maybe before?)
439 if (!running_) break;
440 }
441 }
442 }
443
444 bool running_ = false;
445 std::thread thread_;
446 std::mutex internal_mutex_;
447 std::condition_variable internal_cond_var_;
448
449 AsyncTaskId lastTaskId_ = kInvalidTaskId;
450 std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
451 std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
452 };
453
454 // Async Manager Implementation:
AsyncManager()455 AsyncManager::AsyncManager() : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {}
456
~AsyncManager()457 AsyncManager::~AsyncManager() {
458 // Make sure the threads are stopped before destroying the object.
459 // The threads need to be stopped here and not in each internal class'
460 // destructor because unique_ptr's reset() first assigns nullptr to the
461 // pointer and only then calls the destructor, so any callback running
462 // on these threads would dereference a null pointer if they called a member
463 // function of this class.
464 fdWatcher_p_->stopThread();
465 taskManager_p_->stopThread();
466 }
467
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)468 int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
469 return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback);
470 }
471
StopWatchingFileDescriptor(int file_descriptor)472 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
473 fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
474 }
475
ExecAsync(std::chrono::milliseconds delay,const TaskCallback & callback)476 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay, const TaskCallback& callback) {
477 return taskManager_p_->ExecAsync(delay, callback);
478 }
479
ExecAsyncPeriodically(std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)480 AsyncTaskId AsyncManager::ExecAsyncPeriodically(std::chrono::milliseconds delay, std::chrono::milliseconds period,
481 const TaskCallback& callback) {
482 return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
483 }
484
CancelAsyncTask(AsyncTaskId async_task_id)485 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
486 return taskManager_p_->CancelAsyncTask(async_task_id);
487 }
488
Synchronize(const CriticalCallback & critical)489 void AsyncManager::Synchronize(const CriticalCallback& critical) {
490 std::unique_lock<std::mutex> guard(synchronization_mutex_);
491 critical();
492 }
493 } // namespace test_vendor_lib
494