• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
16 #define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
17 
18 #include <cassert>
19 #include <cstddef>
20 #include <functional>
21 #include <queue>
22 #include <thread>  // NOLINT(build/c++11)
23 #include <vector>
24 
25 #include "absl/base/thread_annotations.h"
26 #include "absl/synchronization/mutex.h"
27 
28 namespace absl {
29 ABSL_NAMESPACE_BEGIN
30 namespace synchronization_internal {
31 
32 // A simple ThreadPool implementation for tests.
33 class ThreadPool {
34  public:
ThreadPool(int num_threads)35   explicit ThreadPool(int num_threads) {
36     for (int i = 0; i < num_threads; ++i) {
37       threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
38     }
39   }
40 
41   ThreadPool(const ThreadPool &) = delete;
42   ThreadPool &operator=(const ThreadPool &) = delete;
43 
~ThreadPool()44   ~ThreadPool() {
45     {
46       absl::MutexLock l(&mu_);
47       for (size_t i = 0; i < threads_.size(); i++) {
48         queue_.push(nullptr);  // Shutdown signal.
49       }
50     }
51     for (auto &t : threads_) {
52       t.join();
53     }
54   }
55 
56   // Schedule a function to be run on a ThreadPool thread immediately.
Schedule(std::function<void ()> func)57   void Schedule(std::function<void()> func) {
58     assert(func != nullptr);
59     absl::MutexLock l(&mu_);
60     queue_.push(std::move(func));
61   }
62 
63  private:
WorkAvailable()64   bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
65     return !queue_.empty();
66   }
67 
WorkLoop()68   void WorkLoop() {
69     while (true) {
70       std::function<void()> func;
71       {
72         absl::MutexLock l(&mu_);
73         mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable));
74         func = std::move(queue_.front());
75         queue_.pop();
76       }
77       if (func == nullptr) {  // Shutdown signal.
78         break;
79       }
80       func();
81     }
82   }
83 
84   absl::Mutex mu_;
85   std::queue<std::function<void()>> queue_ ABSL_GUARDED_BY(mu_);
86   std::vector<std::thread> threads_;
87 };
88 
89 }  // namespace synchronization_internal
90 ABSL_NAMESPACE_END
91 }  // namespace absl
92 
93 #endif  // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
94