• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include "base/Compiler.h"
18 #include "base/ConditionVariable.h"
19 #include "base/Lock.h"
20 #include "base/FunctorThread.h"
21 
22 #include <functional>
23 #include <utility>
24 #include <vector>
25 
26 //
27 // WorkerThread<Item> encapsulates an asynchronous processing queue for objects
28 // of type Item. It manages queue memory, runs processing function in a separate
29 // thread and allows the processing function to stop it at any moment.
30 //
31 // Expected usage of the class:
32 //
33 // - Define an object to store all data for processing:
34 //      struct WorkItem { int number; };
35 //
36 // - Create a WorkerThread with processing function:
37 //      WorkerThread<WorkItem> worker([](WorkItem&& item) {
38 //          std::cout << item.number;
39 //          return item.number
40 //              ? WorkerProcessingResult::Continue
41 //              : WorkerProcessingResult::Stop;
42 //      });
43 //
44 // - Start the worker and send some data for asynchronous processing
45 //      worker.start();
46 //      worker.enqueue({1});
47 //      worker.enqueue({2});
48 //      worker.enqueue({});     // <--- this item will stop processing.
49 //      worker.join();
50 //
51 // WorkerThread<>'s all methods are thread-safe, with an expectation that the
52 // work could be added from any number of threads at once.
53 //
54 // Note: destructor calls join() implicitly - it's better to send some
55 // end-of-work marker before trying to destroy a worker thread.
56 //
57 
58 namespace android {
59 namespace base {
60 
61 // Return values for a worker thread's processing function.
62 enum class WorkerProcessingResult { Continue, Stop };
63 
64 template <class Item>
65 class WorkerThread {
66     DISALLOW_COPY_AND_ASSIGN(WorkerThread);
67 
68 public:
69     using Result = WorkerProcessingResult;
70     // A function that's called for each enqueued item in a separate thread.
71     using Processor = std::function<Result(Item&&)>;
72 
WorkerThread(Processor && processor)73     WorkerThread(Processor&& processor)
74         : mProcessor(std::move(processor)), mThread([this]() { worker(); }) {
75         mQueue.reserve(10);
76     }
~WorkerThread()77     ~WorkerThread() { join(); }
78 
79     // Starts the worker thread.
start()80     bool start() {
81         mStarted = true;
82         if (!mThread.start()) {
83             mFinished = true;
84             return false;
85         }
86         return true;
87     }
isStarted()88     bool isStarted() const { return mStarted; }
89     // Waits for all enqueue()'d items to finish.
waitQueuedItems()90     void waitQueuedItems() {
91         if (!mStarted || mFinished)
92             return;
93 
94         SyncPoint sync;
95         enqueueImpl(&sync);
96         base::AutoLock lock(sync.lock);
97         sync.cv.wait(&lock, [&sync] { return sync.signaled; });
98     }
99     // Waits for worker thread to complete.
join()100     void join() { mThread.wait(); }
101 
102     // Moves the |item| into internal queue for processing.
enqueue(Item && item)103     void enqueue(Item&& item) { enqueueImpl(std::move(item)); }
104 
105 private:
106     struct SyncPoint {
107         bool signaled = false;
108         base::ConditionVariable cv;
109         base::Lock lock;
110     };
111     struct Command {
CommandCommand112         Command(Item&& it) : hasItem(true), workItem(std::move(it)) {}
CommandCommand113         Command(SyncPoint* sp) : hasItem(false), syncPoint(sp) {}
CommandCommand114         Command(Command&& other) : hasItem(other.hasItem) {
115             if (hasItem) {
116                 new (&workItem) Item(std::move(other.workItem));
117             } else {
118                 syncPoint = other.syncPoint;
119             }
120         }
~CommandCommand121         ~Command() {
122             if (hasItem) {
123                 workItem.~Item();
124             }
125         }
126 
127         bool hasItem;
128         union {
129             SyncPoint* syncPoint;
130             Item workItem;
131         };
132     };
133 
134     template <class T>
enqueueImpl(T && x)135     void enqueueImpl(T&& x) {
136         base::AutoLock lock(mLock);
137         bool signal = mQueue.empty();
138         mQueue.emplace_back(Command(std::move(x)));
139         if (signal) {
140             mCv.signalAndUnlock(&lock);
141         }
142     }
143 
worker()144     void worker() {
145         std::vector<Command> todo;
146         todo.reserve(10);
147         for (;;) {
148             {
149                 base::AutoLock lock(mLock);
150                 while (mQueue.empty()) {
151                     mCv.wait(&lock);
152                 }
153                 todo.swap(mQueue);
154             }
155 
156             for (Command& item : todo) {
157                 if (item.hasItem) {
158                     // Normal work item
159                     if (mProcessor(std::move(item.workItem)) == Result::Stop) {
160                         return;
161                     }
162                 } else {
163                     // Sync point
164                     base::AutoLock lock(item.syncPoint->lock);
165                     item.syncPoint->signaled = true;
166                     item.syncPoint->cv.signalAndUnlock(&lock);
167                 }
168             }
169 
170             todo.clear();
171         }
172         mFinished = true;
173     }
174 
175     Processor mProcessor;
176     base::FunctorThread mThread;
177     std::vector<Command> mQueue;
178     base::Lock mLock;
179     base::ConditionVariable mCv;
180 
181     bool mStarted = false;
182     bool mFinished = false;
183 };
184 
185 }  // namespace base
186 }  // namespace android
187