• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include <functional>
18 #include <future>
19 #include <optional>
20 #include <utility>
21 #include <vector>
22 
23 #include "base/Compiler.h"
24 #include "base/ConditionVariable.h"
25 #include "base/FunctorThread.h"
26 #include "base/Lock.h"
27 
28 //
29 // WorkerThread<Item> encapsulates an asynchronous processing queue for objects
30 // of type Item. It manages queue memory, runs processing function in a separate
31 // thread and allows the processing function to stop it at any moment.
32 //
33 // Expected usage of the class:
34 //
35 // - Define an object to store all data for processing:
36 //      struct WorkItem { int number; };
37 //
38 // - Create a WorkerThread with processing function:
39 //      WorkerThread<WorkItem> worker([](WorkItem&& item) {
40 //          std::cout << item.number;
41 //          return item.number
42 //              ? WorkerProcessingResult::Continue
43 //              : WorkerProcessingResult::Stop;
44 //      });
45 //
46 // - Start the worker and send some data for asynchronous processing
47 //      worker.start();
48 //      worker.enqueue({1});
49 //      worker.enqueue({2});
50 //      worker.enqueue({});     // <--- this item will stop processing.
51 //      worker.join();
52 //
53 // WorkerThread<>'s all methods are thread-safe, with an expectation that the
54 // work could be added from any number of threads at once.
55 //
56 // Note: destructor calls join() implicitly - it's better to send some
57 // end-of-work marker before trying to destroy a worker thread.
58 //
59 
60 namespace android {
61 namespace base {
62 
63 // Return values for a worker thread's processing function.
64 enum class WorkerProcessingResult { Continue, Stop };
65 
66 template <class Item>
67 class WorkerThread {
68     DISALLOW_COPY_AND_ASSIGN(WorkerThread);
69 
70 public:
71     using Result = WorkerProcessingResult;
72     // A function that's called for each enqueued item in a separate thread.
73     using Processor = std::function<Result(Item&&)>;
74 
WorkerThread(Processor && processor)75     WorkerThread(Processor&& processor)
76         : mProcessor(std::move(processor)), mThread([this]() { worker(); }) {
77         mQueue.reserve(10);
78     }
~WorkerThread()79     ~WorkerThread() { join(); }
80 
81     // Starts the worker thread.
start()82     bool start() {
83         mStarted = true;
84         if (!mThread.start()) {
85             mFinished = true;
86             return false;
87         }
88         return true;
89     }
isStarted()90     bool isStarted() const { return mStarted; }
91     // Waits for all enqueue()'d items to finish.
waitQueuedItems()92     void waitQueuedItems() {
93         if (!mStarted || mFinished)
94             return;
95 
96         // Enqueue an empty sync command.
97         std::future<void> completeFuture = enqueueImpl(Command());
98         completeFuture.wait();
99     }
100     // Waits for worker thread to complete.
join()101     void join() { mThread.wait(); }
102 
103     // Moves the |item| into internal queue for processing.
enqueue(Item && item)104     std::future<void> enqueue(Item&& item) {
105         return enqueueImpl(Command(std::move(item)));
106     }
107 
108    private:
109     struct Command {
CommandCommand110         Command() : mWorkItem(std::nullopt) {}
CommandCommand111         Command(Item&& it) : mWorkItem(std::move(it)) {}
CommandCommand112         Command(Command&& other)
113             : mCompletedPromise(std::move(other.mCompletedPromise)),
114               mWorkItem(std::move(other.mWorkItem)) {}
115 
116         std::promise<void> mCompletedPromise;
117         std::optional<Item> mWorkItem;
118     };
119 
enqueueImpl(Command command)120     std::future<void> enqueueImpl(Command command) {
121         base::AutoLock lock(mLock);
122         bool signal = mQueue.empty();
123         std::future<void> res = command.mCompletedPromise.get_future();
124         mQueue.emplace_back(std::move(command));
125         if (signal) {
126             mCv.signalAndUnlock(&lock);
127         }
128         return res;
129     }
130 
worker()131     void worker() {
132         std::vector<Command> todo;
133         todo.reserve(10);
134         for (;;) {
135             {
136                 base::AutoLock lock(mLock);
137                 while (mQueue.empty()) {
138                     mCv.wait(&lock);
139                 }
140                 todo.swap(mQueue);
141             }
142 
143             for (Command& item : todo) {
144                 bool shouldStop = false;
145                 if (item.mWorkItem) {
146                     // Normal work item
147                     if (mProcessor(std::move(item.mWorkItem.value())) ==
148                         Result::Stop) {
149                         shouldStop = true;
150                     }
151                 }
152                 item.mCompletedPromise.set_value();
153                 if (shouldStop) {
154                     return;
155                 }
156             }
157 
158             todo.clear();
159         }
160         mFinished = true;
161     }
162 
163     Processor mProcessor;
164     base::FunctorThread mThread;
165     std::vector<Command> mQueue;
166     base::Lock mLock;
167     base::ConditionVariable mCv;
168 
169     bool mStarted = false;
170     bool mFinished = false;
171 };
172 
173 }  // namespace base
174 }  // namespace android
175