1 // Copyright (C) 2017 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #pragma once 16 17 #include "base/Compiler.h" 18 #include "base/Optional.h" 19 #include "base/System.h" 20 #include "base/WorkerThread.h" 21 22 #include <atomic> 23 #include <functional> 24 #include <memory> 25 #include <utility> 26 #include <vector> 27 28 // 29 // ThreadPool<Item> - a simple collection of worker threads to process enqueued 30 // items on multiple cores. 31 // 32 // To create a thread pool supply a processing function and an optional number 33 // of threads to use (default is number of CPU cores). 34 // Thread pool distributes the work in simple round robin manner over all its 35 // workers - this means individual items should be simple and take similar time 36 // to process. 37 // 38 // Usage is very similar to one of WorkerThread, with difference being in the 39 // number of worker threads used and in existence of explicit done() method: 40 // 41 // struct WorkItem { int number; }; 42 // 43 // ThreadPool<WorkItem> tp([](WorkItem&& item) { std::cout << item.num; }); 44 // CHECK(tp.start()) << "Failed to start the thread pool"; 45 // tp.enqueue({1}); 46 // tp.enqueue({2}); 47 // tp.enqueue({3}); 48 // tp.enqueue({4}); 49 // tp.enqueue({5}); 50 // tp.done(); 51 // tp.join(); 52 // 53 // Make sure that the processing function won't block worker threads - thread 54 // pool has no way of detecting it and may potentially get all workers to block, 55 // resulting in a hanging application. 56 // 57 58 namespace android { 59 namespace base { 60 61 template <class ItemT> 62 class ThreadPool { 63 DISALLOW_COPY_AND_ASSIGN(ThreadPool); 64 65 public: 66 using Item = ItemT; 67 using Worker = WorkerThread<Optional<Item>>; 68 using Processor = std::function<void(Item&&)>; 69 ThreadPool(int threads,Processor && processor)70 ThreadPool(int threads, Processor&& processor) 71 : mProcessor(std::move(processor)) { 72 if (threads < 1) { 73 threads = android::base::getCpuCoreCount(); 74 } 75 mWorkers = std::vector<Optional<Worker>>(threads); 76 for (auto& workerPtr : mWorkers) { 77 workerPtr.emplace([this](Optional<Item>&& item) { 78 if (!item) { 79 return Worker::Result::Stop; 80 } 81 mProcessor(std::move(item.value())); 82 return Worker::Result::Continue; 83 }); 84 } 85 } ThreadPool(Processor && processor)86 explicit ThreadPool(Processor&& processor) 87 : ThreadPool(0, std::move(processor)) {} ~ThreadPool()88 ~ThreadPool() { 89 done(); 90 join(); 91 } 92 start()93 bool start() { 94 for (auto& workerPtr : mWorkers) { 95 if (workerPtr->start()) { 96 ++mValidWorkersCount; 97 } else { 98 workerPtr.clear(); 99 } 100 } 101 return mValidWorkersCount > 0; 102 } 103 done()104 void done() { 105 for (auto& workerPtr : mWorkers) { 106 if (workerPtr) { 107 workerPtr->enqueue(kNullopt); 108 } 109 } 110 } 111 join()112 void join() { 113 for (auto& workerPtr : mWorkers) { 114 if (workerPtr) { 115 workerPtr->join(); 116 } 117 } 118 mWorkers.clear(); 119 mValidWorkersCount = 0; 120 } 121 enqueue(Item && item)122 void enqueue(Item&& item) { 123 // Iterate over the worker threads until we find a one that's running. 124 // TODO(b/187082169, warty): We rely on this round-robin strategy in SyncThread 125 for (;;) { 126 int currentIndex = 127 mNextWorkerIndex.fetch_add(1, std::memory_order_relaxed); 128 auto& workerPtr = mWorkers[currentIndex % mWorkers.size()]; 129 if (workerPtr) { 130 workerPtr->enqueue(std::move(item)); 131 break; 132 } 133 } 134 } 135 numWorkers()136 int numWorkers() const { return mValidWorkersCount; } 137 138 private: 139 Processor mProcessor; 140 std::vector<Optional<Worker>> mWorkers; 141 std::atomic<int> mNextWorkerIndex{0}; 142 int mValidWorkersCount{0}; 143 }; 144 145 } // namespace base 146 } // namespace android 147