• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include "base/Compiler.h"
18 #include "base/Optional.h"
19 #include "base/System.h"
20 #include "base/WorkerThread.h"
21 
22 #include <atomic>
23 #include <functional>
24 #include <memory>
25 #include <utility>
26 #include <vector>
27 
28 //
29 // ThreadPool<Item> - a simple collection of worker threads to process enqueued
30 // items on multiple cores.
31 //
32 // To create a thread pool supply a processing function and an optional number
33 // of threads to use (default is number of CPU cores).
34 // Thread pool distributes the work in simple round robin manner over all its
35 // workers - this means individual items should be simple and take similar time
36 // to process.
37 //
38 // Usage is very similar to that of WorkerThread; the differences are the
39 // number of worker threads used and the existence of an explicit done() method:
40 //
41 //      struct WorkItem { int number; };
42 //
43 //      ThreadPool<WorkItem> tp([](WorkItem&& item) { std::cout << item.number; });
44 //      CHECK(tp.start()) << "Failed to start the thread pool";
45 //      tp.enqueue({1});
46 //      tp.enqueue({2});
47 //      tp.enqueue({3});
48 //      tp.enqueue({4});
49 //      tp.enqueue({5});
50 //      tp.done();
51 //      tp.join();
52 //
53 // Make sure that the processing function won't block worker threads - thread
54 // pool has no way of detecting it and may potentially get all workers to block,
55 // resulting in a hanging application.
56 //
57 
58 namespace android {
59 namespace base {
60 
61 template <class ItemT>
62 class ThreadPool {
63     DISALLOW_COPY_AND_ASSIGN(ThreadPool);
64 
65 public:
66     using Item = ItemT;
67     using Worker = WorkerThread<Optional<Item>>;
68     using Processor = std::function<void(Item&&)>;
69 
ThreadPool(int threads,Processor && processor)70     ThreadPool(int threads, Processor&& processor)
71         : mProcessor(std::move(processor)) {
72         if (threads < 1) {
73             threads = android::base::getCpuCoreCount();
74         }
75         mWorkers = std::vector<Optional<Worker>>(threads);
76         for (auto& workerPtr : mWorkers) {
77             workerPtr.emplace([this](Optional<Item>&& item) {
78                 if (!item) {
79                     return Worker::Result::Stop;
80                 }
81                 mProcessor(std::move(item.value()));
82                 return Worker::Result::Continue;
83             });
84         }
85     }
ThreadPool(Processor && processor)86     explicit ThreadPool(Processor&& processor)
87         : ThreadPool(0, std::move(processor)) {}
~ThreadPool()88     ~ThreadPool() {
89         done();
90         join();
91     }
92 
start()93     bool start() {
94         for (auto& workerPtr : mWorkers) {
95             if (workerPtr->start()) {
96                 ++mValidWorkersCount;
97             } else {
98                 workerPtr.clear();
99             }
100         }
101         return mValidWorkersCount > 0;
102     }
103 
done()104     void done() {
105         for (auto& workerPtr : mWorkers) {
106             if (workerPtr) {
107                 workerPtr->enqueue(kNullopt);
108             }
109         }
110     }
111 
join()112     void join() {
113         for (auto& workerPtr : mWorkers) {
114             if (workerPtr) {
115                 workerPtr->join();
116             }
117         }
118         mWorkers.clear();
119         mValidWorkersCount = 0;
120     }
121 
enqueue(Item && item)122     void enqueue(Item&& item) {
123         // Iterate over the worker threads until we find a one that's running.
124         // TODO(b/187082169, warty): We rely on this round-robin strategy in SyncThread
125         for (;;) {
126             int currentIndex =
127                     mNextWorkerIndex.fetch_add(1, std::memory_order_relaxed);
128             auto& workerPtr = mWorkers[currentIndex % mWorkers.size()];
129             if (workerPtr) {
130                 workerPtr->enqueue(std::move(item));
131                 break;
132             }
133         }
134     }
135 
numWorkers()136     int numWorkers() const { return mValidWorkersCount; }
137 
138 private:
139     Processor mProcessor;
140     std::vector<Optional<Worker>> mWorkers;
141     std::atomic<int> mNextWorkerIndex{0};
142     int mValidWorkersCount{0};
143 };
144 
145 }  // namespace base
146 }  // namespace android
147