1 // Copyright (C) 2017 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #pragma once 16 17 #include "base/Compiler.h" 18 #include "base/ConditionVariable.h" 19 #include "base/Lock.h" 20 #include "base/FunctorThread.h" 21 22 #include <functional> 23 #include <utility> 24 #include <vector> 25 26 // 27 // WorkerThread<Item> encapsulates an asynchronous processing queue for objects 28 // of type Item. It manages queue memory, runs processing function in a separate 29 // thread and allows the processing function to stop it at any moment. 30 // 31 // Expected usage of the class: 32 // 33 // - Define an object to store all data for processing: 34 // struct WorkItem { int number; }; 35 // 36 // - Create a WorkerThread with processing function: 37 // WorkerThread<WorkItem> worker([](WorkItem&& item) { 38 // std::cout << item.number; 39 // return item.number 40 // ? WorkerProcessingResult::Continue 41 // : WorkerProcessingResult::Stop; 42 // }); 43 // 44 // - Start the worker and send some data for asynchronous processing 45 // worker.start(); 46 // worker.enqueue({1}); 47 // worker.enqueue({2}); 48 // worker.enqueue({}); // <--- this item will stop processing. 49 // worker.join(); 50 // 51 // WorkerThread<>'s all methods are thread-safe, with an expectation that the 52 // work could be added from any number of threads at once. 53 // 54 // Note: destructor calls join() implicitly - it's better to send some 55 // end-of-work marker before trying to destroy a worker thread. 56 // 57 58 namespace android { 59 namespace base { 60 61 // Return values for a worker thread's processing function. 62 enum class WorkerProcessingResult { Continue, Stop }; 63 64 template <class Item> 65 class WorkerThread { 66 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 67 68 public: 69 using Result = WorkerProcessingResult; 70 // A function that's called for each enqueued item in a separate thread. 71 using Processor = std::function<Result(Item&&)>; 72 WorkerThread(Processor && processor)73 WorkerThread(Processor&& processor) 74 : mProcessor(std::move(processor)), mThread([this]() { worker(); }) { 75 mQueue.reserve(10); 76 } ~WorkerThread()77 ~WorkerThread() { join(); } 78 79 // Starts the worker thread. start()80 bool start() { 81 mStarted = true; 82 if (!mThread.start()) { 83 mFinished = true; 84 return false; 85 } 86 return true; 87 } isStarted()88 bool isStarted() const { return mStarted; } 89 // Waits for all enqueue()'d items to finish. waitQueuedItems()90 void waitQueuedItems() { 91 if (!mStarted || mFinished) 92 return; 93 94 SyncPoint sync; 95 enqueueImpl(&sync); 96 base::AutoLock lock(sync.lock); 97 sync.cv.wait(&lock, [&sync] { return sync.signaled; }); 98 } 99 // Waits for worker thread to complete. join()100 void join() { mThread.wait(); } 101 102 // Moves the |item| into internal queue for processing. enqueue(Item && item)103 void enqueue(Item&& item) { enqueueImpl(std::move(item)); } 104 105 private: 106 struct SyncPoint { 107 bool signaled = false; 108 base::ConditionVariable cv; 109 base::Lock lock; 110 }; 111 struct Command { CommandCommand112 Command(Item&& it) : hasItem(true), workItem(std::move(it)) {} CommandCommand113 Command(SyncPoint* sp) : hasItem(false), syncPoint(sp) {} CommandCommand114 Command(Command&& other) : hasItem(other.hasItem) { 115 if (hasItem) { 116 new (&workItem) Item(std::move(other.workItem)); 117 } else { 118 syncPoint = other.syncPoint; 119 } 120 } ~CommandCommand121 ~Command() { 122 if (hasItem) { 123 workItem.~Item(); 124 } 125 } 126 127 bool hasItem; 128 union { 129 SyncPoint* syncPoint; 130 Item workItem; 131 }; 132 }; 133 134 template <class T> enqueueImpl(T && x)135 void enqueueImpl(T&& x) { 136 base::AutoLock lock(mLock); 137 bool signal = mQueue.empty(); 138 mQueue.emplace_back(Command(std::move(x))); 139 if (signal) { 140 mCv.signalAndUnlock(&lock); 141 } 142 } 143 worker()144 void worker() { 145 std::vector<Command> todo; 146 todo.reserve(10); 147 for (;;) { 148 { 149 base::AutoLock lock(mLock); 150 while (mQueue.empty()) { 151 mCv.wait(&lock); 152 } 153 todo.swap(mQueue); 154 } 155 156 for (Command& item : todo) { 157 if (item.hasItem) { 158 // Normal work item 159 if (mProcessor(std::move(item.workItem)) == Result::Stop) { 160 return; 161 } 162 } else { 163 // Sync point 164 base::AutoLock lock(item.syncPoint->lock); 165 item.syncPoint->signaled = true; 166 item.syncPoint->cv.signalAndUnlock(&lock); 167 } 168 } 169 170 todo.clear(); 171 } 172 mFinished = true; 173 } 174 175 Processor mProcessor; 176 base::FunctorThread mThread; 177 std::vector<Command> mQueue; 178 base::Lock mLock; 179 base::ConditionVariable mCv; 180 181 bool mStarted = false; 182 bool mFinished = false; 183 }; 184 185 } // namespace base 186 } // namespace android 187