1 // Copyright 2017 The Abseil Authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 16 #define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 17 18 #include <cassert> 19 #include <cstddef> 20 #include <functional> 21 #include <queue> 22 #include <thread> // NOLINT(build/c++11) 23 #include <vector> 24 25 #include "absl/base/thread_annotations.h" 26 #include "absl/synchronization/mutex.h" 27 28 namespace absl { 29 ABSL_NAMESPACE_BEGIN 30 namespace synchronization_internal { 31 32 // A simple ThreadPool implementation for tests. 33 class ThreadPool { 34 public: ThreadPool(int num_threads)35 explicit ThreadPool(int num_threads) { 36 for (int i = 0; i < num_threads; ++i) { 37 threads_.push_back(std::thread(&ThreadPool::WorkLoop, this)); 38 } 39 } 40 41 ThreadPool(const ThreadPool &) = delete; 42 ThreadPool &operator=(const ThreadPool &) = delete; 43 ~ThreadPool()44 ~ThreadPool() { 45 { 46 absl::MutexLock l(&mu_); 47 for (size_t i = 0; i < threads_.size(); i++) { 48 queue_.push(nullptr); // Shutdown signal. 49 } 50 } 51 for (auto &t : threads_) { 52 t.join(); 53 } 54 } 55 56 // Schedule a function to be run on a ThreadPool thread immediately. Schedule(std::function<void ()> func)57 void Schedule(std::function<void()> func) { 58 assert(func != nullptr); 59 absl::MutexLock l(&mu_); 60 queue_.push(std::move(func)); 61 } 62 63 private: WorkAvailable()64 bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 65 return !queue_.empty(); 66 } 67 WorkLoop()68 void WorkLoop() { 69 while (true) { 70 std::function<void()> func; 71 { 72 absl::MutexLock l(&mu_); 73 mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable)); 74 func = std::move(queue_.front()); 75 queue_.pop(); 76 } 77 if (func == nullptr) { // Shutdown signal. 78 break; 79 } 80 func(); 81 } 82 } 83 84 absl::Mutex mu_; 85 std::queue<std::function<void()>> queue_ ABSL_GUARDED_BY(mu_); 86 std::vector<std::thread> threads_; 87 }; 88 89 } // namespace synchronization_internal 90 ABSL_NAMESPACE_END 91 } // namespace absl 92 93 #endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ 94