1 // Copyright 2016 Google Inc. All rights reserved 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 // +build ignore 16 17 #include "thread_pool.h" 18 19 #include <condition_variable> 20 #include <mutex> 21 #include <stack> 22 #include <thread> 23 #include <vector> 24 25 #include "affinity.h" 26 27 class ThreadPoolImpl : public ThreadPool { 28 public: ThreadPoolImpl(int num_threads)29 explicit ThreadPoolImpl(int num_threads) 30 : is_waiting_(false) { 31 SetAffinityForMultiThread(); 32 threads_.reserve(num_threads); 33 for (int i = 0; i < num_threads; i++) { 34 threads_.push_back(thread([this]() { Loop(); })); 35 } 36 } 37 ~ThreadPoolImpl()38 virtual ~ThreadPoolImpl() override { 39 } 40 Submit(function<void (void)> task)41 virtual void Submit(function<void(void)> task) override { 42 unique_lock<mutex> lock(mu_); 43 tasks_.push(task); 44 cond_.notify_one(); 45 } 46 Wait()47 virtual void Wait() override { 48 { 49 unique_lock<mutex> lock(mu_); 50 is_waiting_ = true; 51 cond_.notify_all(); 52 } 53 54 for (thread& th : threads_) { 55 th.join(); 56 } 57 58 SetAffinityForSingleThread(); 59 } 60 61 private: Loop()62 void Loop() { 63 while (true) { 64 function<void(void)> task; 65 { 66 unique_lock<mutex> lock(mu_); 67 if (tasks_.empty()) { 68 if (is_waiting_) 69 return; 70 cond_.wait(lock); 71 } 72 73 if (tasks_.empty()) 74 continue; 75 76 task = tasks_.top(); 77 tasks_.pop(); 78 } 79 task(); 80 } 81 } 82 83 vector<thread> threads_; 84 mutex mu_; 85 condition_variable cond_; 86 stack<function<void(void)>> tasks_; 87 bool is_waiting_; 88 }; 89 NewThreadPool(int num_threads)90ThreadPool* NewThreadPool(int num_threads) { 91 return new ThreadPoolImpl(num_threads); 92 } 93