1 // Copyright (C) 2017 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #pragma once 16 17 #include <functional> 18 #include <future> 19 #include <optional> 20 #include <utility> 21 #include <vector> 22 23 #include "aemu/base/Compiler.h" 24 #include "aemu/base/synchronization/ConditionVariable.h" 25 #include "aemu/base/threads/FunctorThread.h" 26 #include "aemu/base/synchronization/Lock.h" 27 28 // 29 // WorkerThread<Item> encapsulates an asynchronous processing queue for objects 30 // of type Item. It manages queue memory, runs processing function in a separate 31 // thread and allows the processing function to stop it at any moment. 32 // 33 // Expected usage of the class: 34 // 35 // - Define an object to store all data for processing: 36 // struct WorkItem { int number; }; 37 // 38 // - Create a WorkerThread with processing function: 39 // WorkerThread<WorkItem> worker([](WorkItem&& item) { 40 // std::cout << item.number; 41 // return item.number 42 // ? WorkerProcessingResult::Continue 43 // : WorkerProcessingResult::Stop; 44 // }); 45 // 46 // - Start the worker and send some data for asynchronous processing 47 // worker.start(); 48 // worker.enqueue({1}); 49 // worker.enqueue({2}); 50 // worker.enqueue({}); // <--- this item will stop processing. 51 // worker.join(); 52 // 53 // WorkerThread<>'s all methods are thread-safe, with an expectation that the 54 // work could be added from any number of threads at once. 55 // 56 // Note: destructor calls join() implicitly - it's better to send some 57 // end-of-work marker before trying to destroy a worker thread. 58 // 59 60 namespace android { 61 namespace base { 62 63 // Return values for a worker thread's processing function. 64 enum class WorkerProcessingResult { Continue, Stop }; 65 66 template <class Item> 67 class WorkerThread { 68 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 69 70 public: 71 using Result = WorkerProcessingResult; 72 // A function that's called for each enqueued item in a separate thread. 73 using Processor = std::function<Result(Item&&)>; 74 WorkerThread(Processor && processor)75 WorkerThread(Processor&& processor) 76 : mProcessor(std::move(processor)), mThread([this]() { worker(); }) { 77 mQueue.reserve(10); 78 } ~WorkerThread()79 ~WorkerThread() { join(); } 80 81 // Starts the worker thread. start()82 bool start() { 83 bool expectedStarted = false; 84 if (!mStarted.compare_exchange_strong(expectedStarted, true)) { 85 return true; 86 } 87 if (!mThread.start()) { 88 AutoLock lock(mLock); 89 setFinishedAndDrainTasks(); 90 return false; 91 } 92 return true; 93 } 94 // Waits for all enqueue()'d items to finish or the worker stops. waitQueuedItems()95 void waitQueuedItems() { 96 // Enqueue an empty sync command. 97 std::future<void> completeFuture = enqueueImpl(Command()); 98 completeFuture.wait(); 99 } 100 // Waits for worker thread to complete. join()101 void join() { mThread.wait(); } 102 103 // Moves the |item| into internal queue for processing. If the command is enqueued after the 104 // stop command is enqueued or before start() returns, the returned future will also be ready 105 // without processing the command. enqueue(Item && item)106 std::future<void> enqueue(Item&& item) { 107 return enqueueImpl(Command(std::move(item))); 108 } 109 110 private: 111 struct Command { CommandCommand112 Command() : mWorkItem(std::nullopt) {} CommandCommand113 Command(Item&& it) : mWorkItem(std::move(it)) {} CommandCommand114 Command(Command&& other) 115 : mCompletedPromise(std::move(other.mCompletedPromise)), 116 mWorkItem(std::move(other.mWorkItem)) {} 117 118 std::promise<void> mCompletedPromise; 119 std::optional<Item> mWorkItem; 120 }; 121 enqueueImpl(Command command)122 std::future<void> enqueueImpl(Command command) { 123 base::AutoLock lock(mLock); 124 // We don't enqueue any new items if mFinished is set to true. 125 if (!mStarted || mFinished) { 126 command.mCompletedPromise.set_value(); 127 return command.mCompletedPromise.get_future(); 128 } 129 130 std::future<void> res = command.mCompletedPromise.get_future(); 131 mQueue.emplace_back(std::move(command)); 132 mCv.signalAndUnlock(&lock); 133 return res; 134 } 135 worker()136 void worker() { 137 std::vector<Command> todo; 138 todo.reserve(10); 139 for (;;) { 140 { 141 base::AutoLock lock(mLock); 142 while (mQueue.empty()) { 143 mCv.wait(&lock); 144 } 145 todo.swap(mQueue); 146 } 147 148 bool shouldStop = false; 149 for (Command& item : todo) { 150 if (!shouldStop && item.mWorkItem) { 151 // Normal work item 152 shouldStop = mProcessor(std::move(item.mWorkItem.value())) == Result::Stop; 153 } 154 item.mCompletedPromise.set_value(); 155 } 156 if (shouldStop) { 157 setFinishedAndDrainTasks(); 158 return; 159 } 160 161 todo.clear(); 162 } 163 } 164 setFinishedAndDrainTasks()165 void setFinishedAndDrainTasks() { 166 base::AutoLock lock(mLock); 167 // Set mFinished so that no new tasks will be enqueued. 168 mFinished = true; 169 // Signal pending tasks as if they are completed. 170 for (Command& item : mQueue) { 171 item.mCompletedPromise.set_value(); 172 } 173 return; 174 } 175 176 Processor mProcessor; 177 base::FunctorThread mThread; 178 std::vector<Command> mQueue; 179 base::Lock mLock; 180 base::ConditionVariable mCv; 181 182 std::atomic_bool mStarted = false; 183 // Must be accessd after grabbing the lock. 184 bool mFinished = false; 185 }; 186 187 } // namespace base 188 } // namespace android 189