• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 Google Inc. All rights reserved
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // +build ignore
16 
17 #include "thread_pool.h"
18 
19 #include <condition_variable>
20 #include <mutex>
21 #include <stack>
22 #include <thread>
23 #include <vector>
24 
25 #include "affinity.h"
26 
27 class ThreadPoolImpl : public ThreadPool {
28  public:
ThreadPoolImpl(int num_threads)29   explicit ThreadPoolImpl(int num_threads)
30       : is_waiting_(false) {
31     SetAffinityForMultiThread();
32     threads_.reserve(num_threads);
33     for (int i = 0; i < num_threads; i++) {
34       threads_.push_back(thread([this]() { Loop(); }));
35     }
36   }
37 
~ThreadPoolImpl()38   virtual ~ThreadPoolImpl() override {
39   }
40 
Submit(function<void (void)> task)41   virtual void Submit(function<void(void)> task) override {
42     unique_lock<mutex> lock(mu_);
43     tasks_.push(task);
44     cond_.notify_one();
45   }
46 
Wait()47   virtual void Wait() override {
48     {
49       unique_lock<mutex> lock(mu_);
50       is_waiting_ = true;
51       cond_.notify_all();
52     }
53 
54     for (thread& th : threads_) {
55       th.join();
56     }
57 
58     SetAffinityForSingleThread();
59   }
60 
61  private:
Loop()62   void Loop() {
63     while (true) {
64       function<void(void)> task;
65       {
66         unique_lock<mutex> lock(mu_);
67         if (tasks_.empty()) {
68           if (is_waiting_)
69             return;
70           cond_.wait(lock);
71         }
72 
73         if (tasks_.empty())
74           continue;
75 
76         task = tasks_.top();
77         tasks_.pop();
78       }
79       task();
80     }
81   }
82 
83   vector<thread> threads_;
84   mutex mu_;
85   condition_variable cond_;
86   stack<function<void(void)>> tasks_;
87   bool is_waiting_;
88 };
89 
NewThreadPool(int num_threads)90 ThreadPool* NewThreadPool(int num_threads) {
91   return new ThreadPoolImpl(num_threads);
92 }
93