1 // Copyright (C) 2017 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #pragma once 16 17 #include <functional> 18 #include <future> 19 #include <optional> 20 #include <utility> 21 #include <vector> 22 23 #include "base/Compiler.h" 24 #include "base/ConditionVariable.h" 25 #include "base/FunctorThread.h" 26 #include "base/Lock.h" 27 28 // 29 // WorkerThread<Item> encapsulates an asynchronous processing queue for objects 30 // of type Item. It manages queue memory, runs processing function in a separate 31 // thread and allows the processing function to stop it at any moment. 32 // 33 // Expected usage of the class: 34 // 35 // - Define an object to store all data for processing: 36 // struct WorkItem { int number; }; 37 // 38 // - Create a WorkerThread with processing function: 39 // WorkerThread<WorkItem> worker([](WorkItem&& item) { 40 // std::cout << item.number; 41 // return item.number 42 // ? WorkerProcessingResult::Continue 43 // : WorkerProcessingResult::Stop; 44 // }); 45 // 46 // - Start the worker and send some data for asynchronous processing 47 // worker.start(); 48 // worker.enqueue({1}); 49 // worker.enqueue({2}); 50 // worker.enqueue({}); // <--- this item will stop processing. 51 // worker.join(); 52 // 53 // WorkerThread<>'s all methods are thread-safe, with an expectation that the 54 // work could be added from any number of threads at once. 55 // 56 // Note: destructor calls join() implicitly - it's better to send some 57 // end-of-work marker before trying to destroy a worker thread. 58 // 59 60 namespace android { 61 namespace base { 62 63 // Return values for a worker thread's processing function. 64 enum class WorkerProcessingResult { Continue, Stop }; 65 66 template <class Item> 67 class WorkerThread { 68 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 69 70 public: 71 using Result = WorkerProcessingResult; 72 // A function that's called for each enqueued item in a separate thread. 73 using Processor = std::function<Result(Item&&)>; 74 WorkerThread(Processor && processor)75 WorkerThread(Processor&& processor) 76 : mProcessor(std::move(processor)), mThread([this]() { worker(); }) { 77 mQueue.reserve(10); 78 } ~WorkerThread()79 ~WorkerThread() { join(); } 80 81 // Starts the worker thread. start()82 bool start() { 83 mStarted = true; 84 if (!mThread.start()) { 85 mFinished = true; 86 return false; 87 } 88 return true; 89 } isStarted()90 bool isStarted() const { return mStarted; } 91 // Waits for all enqueue()'d items to finish. waitQueuedItems()92 void waitQueuedItems() { 93 if (!mStarted || mFinished) 94 return; 95 96 // Enqueue an empty sync command. 97 std::future<void> completeFuture = enqueueImpl(Command()); 98 completeFuture.wait(); 99 } 100 // Waits for worker thread to complete. join()101 void join() { mThread.wait(); } 102 103 // Moves the |item| into internal queue for processing. enqueue(Item && item)104 std::future<void> enqueue(Item&& item) { 105 return enqueueImpl(Command(std::move(item))); 106 } 107 108 private: 109 struct Command { CommandCommand110 Command() : mWorkItem(std::nullopt) {} CommandCommand111 Command(Item&& it) : mWorkItem(std::move(it)) {} CommandCommand112 Command(Command&& other) 113 : mCompletedPromise(std::move(other.mCompletedPromise)), 114 mWorkItem(std::move(other.mWorkItem)) {} 115 116 std::promise<void> mCompletedPromise; 117 std::optional<Item> mWorkItem; 118 }; 119 enqueueImpl(Command command)120 std::future<void> enqueueImpl(Command command) { 121 base::AutoLock lock(mLock); 122 bool signal = mQueue.empty(); 123 std::future<void> res = command.mCompletedPromise.get_future(); 124 mQueue.emplace_back(std::move(command)); 125 if (signal) { 126 mCv.signalAndUnlock(&lock); 127 } 128 return res; 129 } 130 worker()131 void worker() { 132 std::vector<Command> todo; 133 todo.reserve(10); 134 for (;;) { 135 { 136 base::AutoLock lock(mLock); 137 while (mQueue.empty()) { 138 mCv.wait(&lock); 139 } 140 todo.swap(mQueue); 141 } 142 143 for (Command& item : todo) { 144 bool shouldStop = false; 145 if (item.mWorkItem) { 146 // Normal work item 147 if (mProcessor(std::move(item.mWorkItem.value())) == 148 Result::Stop) { 149 shouldStop = true; 150 } 151 } 152 item.mCompletedPromise.set_value(); 153 if (shouldStop) { 154 return; 155 } 156 } 157 158 todo.clear(); 159 } 160 mFinished = true; 161 } 162 163 Processor mProcessor; 164 base::FunctorThread mThread; 165 std::vector<Command> mQueue; 166 base::Lock mLock; 167 base::ConditionVariable mCv; 168 169 bool mStarted = false; 170 bool mFinished = false; 171 }; 172 173 } // namespace base 174 } // namespace android 175