// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
11 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
12 
13 
14 namespace Eigen {
15 
16 template <typename Environment>
17 class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
18  public:
19   typedef typename Environment::Task Task;
20   typedef RunQueue<Task, 1024> Queue;
21 
22   NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
env_(env)23       : env_(env),
24         threads_(num_threads),
25         queues_(num_threads),
26         coprimes_(num_threads),
27         waiters_(num_threads),
28         blocked_(0),
29         spinning_(0),
30         done_(false),
31         ec_(waiters_) {
32     waiters_.resize(num_threads);
33 
34     // Calculate coprimes of num_threads.
35     // Coprimes are used for a random walk over all threads in Steal
36     // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
37     // a walk starting thread index t and calculate num_threads - 1 subsequent
38     // indices as (t + coprime) % num_threads, we will cover all threads without
39     // repetitions (effectively getting a presudo-random permutation of thread
40     // indices).
41     for (int i = 1; i <= num_threads; i++) {
42       unsigned a = i;
43       unsigned b = num_threads;
44       // If GCD(a, b) == 1, then a and b are coprimes.
45       while (b != 0) {
46         unsigned tmp = a;
47         a = b;
48         b = tmp % b;
49       }
50       if (a == 1) {
51         coprimes_.push_back(i);
52       }
53     }
54     for (int i = 0; i < num_threads; i++) {
55       queues_.push_back(new Queue());
56     }
57     for (int i = 0; i < num_threads; i++) {
58       threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
59     }
60   }
61 
~NonBlockingThreadPoolTempl()62   ~NonBlockingThreadPoolTempl() {
63     done_ = true;
64     // Now if all threads block without work, they will start exiting.
65     // But note that threads can continue to work arbitrary long,
66     // block, submit new work, unblock and otherwise live full life.
67     ec_.Notify(true);
68 
69     // Join threads explicitly to avoid destruction order issues.
70     for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
71     for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
72   }
73 
Schedule(std::function<void ()> fn)74   void Schedule(std::function<void()> fn) {
75     Task t = env_.CreateTask(std::move(fn));
76     PerThread* pt = GetPerThread();
77     if (pt->pool == this) {
78       // Worker thread of this pool, push onto the thread's queue.
79       Queue* q = queues_[pt->thread_id];
80       t = q->PushFront(std::move(t));
81     } else {
82       // A free-standing thread (or worker of another pool), push onto a random
83       // queue.
84       Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
85       t = q->PushBack(std::move(t));
86     }
87     // Note: below we touch this after making w available to worker threads.
88     // Strictly speaking, this can lead to a racy-use-after-free. Consider that
89     // Schedule is called from a thread that is neither main thread nor a worker
90     // thread of this pool. Then, execution of w directly or indirectly
91     // completes overall computations, which in turn leads to destruction of
92     // this. We expect that such scenario is prevented by program, that is,
93     // this is kept alive while any threads can potentially be in Schedule.
94     if (!t.f)
95       ec_.Notify(false);
96     else
97       env_.ExecuteTask(t);  // Push failed, execute directly.
98   }
99 
NumThreads()100   int NumThreads() const final {
101     return static_cast<int>(threads_.size());
102   }
103 
CurrentThreadId()104   int CurrentThreadId() const final {
105     const PerThread* pt =
106         const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread();
107     if (pt->pool == this) {
108       return pt->thread_id;
109     } else {
110       return -1;
111     }
112   }
113 
114  private:
115   typedef typename Environment::EnvThread Thread;
116 
117   struct PerThread {
PerThreadPerThread118     constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
119     NonBlockingThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
120     uint64_t rand;  // Random generator state.
121     int thread_id;  // Worker thread index in pool.
122   };
123 
124   Environment env_;
125   MaxSizeVector<Thread*> threads_;
126   MaxSizeVector<Queue*> queues_;
127   MaxSizeVector<unsigned> coprimes_;
128   MaxSizeVector<EventCount::Waiter> waiters_;
129   std::atomic<unsigned> blocked_;
130   std::atomic<bool> spinning_;
131   std::atomic<bool> done_;
132   EventCount ec_;
133 
134   // Main worker thread loop.
WorkerLoop(int thread_id)135   void WorkerLoop(int thread_id) {
136     PerThread* pt = GetPerThread();
137     pt->pool = this;
138     pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
139     pt->thread_id = thread_id;
140     Queue* q = queues_[thread_id];
141     EventCount::Waiter* waiter = &waiters_[thread_id];
142     for (;;) {
143       Task t = q->PopFront();
144       if (!t.f) {
145         t = Steal();
146         if (!t.f) {
147           // Leave one thread spinning. This reduces latency.
148           // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
149           // Also, the time it takes to attempt to steal work 1000 times depends
150           // on the size of the thread pool. However the speed at which the user
151           // of the thread pool submit tasks is independent of the size of the
152           // pool. Consider a time based limit instead.
153           if (!spinning_ && !spinning_.exchange(true)) {
154             for (int i = 0; i < 1000 && !t.f; i++) {
155               t = Steal();
156             }
157             spinning_ = false;
158           }
159           if (!t.f) {
160             if (!WaitForWork(waiter, &t)) {
161               return;
162             }
163           }
164         }
165       }
166       if (t.f) {
167         env_.ExecuteTask(t);
168       }
169     }
170   }
171 
172   // Steal tries to steal work from other worker threads in best-effort manner.
Steal()173   Task Steal() {
174     PerThread* pt = GetPerThread();
175     const size_t size = queues_.size();
176     unsigned r = Rand(&pt->rand);
177     unsigned inc = coprimes_[r % coprimes_.size()];
178     unsigned victim = r % size;
179     for (unsigned i = 0; i < size; i++) {
180       Task t = queues_[victim]->PopBack();
181       if (t.f) {
182         return t;
183       }
184       victim += inc;
185       if (victim >= size) {
186         victim -= size;
187       }
188     }
189     return Task();
190   }
191 
192   // WaitForWork blocks until new work is available (returns true), or if it is
193   // time to exit (returns false). Can optionally return a task to execute in t
194   // (in such case t.f != nullptr on return).
WaitForWork(EventCount::Waiter * waiter,Task * t)195   bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
196     eigen_assert(!t->f);
197     // We already did best-effort emptiness check in Steal, so prepare for
198     // blocking.
199     ec_.Prewait(waiter);
200     // Now do a reliable emptiness check.
201     int victim = NonEmptyQueueIndex();
202     if (victim != -1) {
203       ec_.CancelWait(waiter);
204       *t = queues_[victim]->PopBack();
205       return true;
206     }
207     // Number of blocked threads is used as termination condition.
208     // If we are shutting down and all worker threads blocked without work,
209     // that's we are done.
210     blocked_++;
211     if (done_ && blocked_ == threads_.size()) {
212       ec_.CancelWait(waiter);
213       // Almost done, but need to re-check queues.
214       // Consider that all queues are empty and all worker threads are preempted
215       // right after incrementing blocked_ above. Now a free-standing thread
216       // submits work and calls destructor (which sets done_). If we don't
217       // re-check queues, we will exit leaving the work unexecuted.
218       if (NonEmptyQueueIndex() != -1) {
219         // Note: we must not pop from queues before we decrement blocked_,
220         // otherwise the following scenario is possible. Consider that instead
221         // of checking for emptiness we popped the only element from queues.
222         // Now other worker threads can start exiting, which is bad if the
223         // work item submits other work. So we just check emptiness here,
224         // which ensures that all worker threads exit at the same time.
225         blocked_--;
226         return true;
227       }
228       // Reached stable termination state.
229       ec_.Notify(true);
230       return false;
231     }
232     ec_.CommitWait(waiter);
233     blocked_--;
234     return true;
235   }
236 
NonEmptyQueueIndex()237   int NonEmptyQueueIndex() {
238     PerThread* pt = GetPerThread();
239     const size_t size = queues_.size();
240     unsigned r = Rand(&pt->rand);
241     unsigned inc = coprimes_[r % coprimes_.size()];
242     unsigned victim = r % size;
243     for (unsigned i = 0; i < size; i++) {
244       if (!queues_[victim]->Empty()) {
245         return victim;
246       }
247       victim += inc;
248       if (victim >= size) {
249         victim -= size;
250       }
251     }
252     return -1;
253   }
254 
GetPerThread()255   static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
256     EIGEN_THREAD_LOCAL PerThread per_thread_;
257     PerThread* pt = &per_thread_;
258     return pt;
259   }
260 
Rand(uint64_t * state)261   static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
262     uint64_t current = *state;
263     // Update the internal state
264     *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
265     // Generate the random output (using the PCG-XSH-RS scheme)
266     return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
267   }
268 };
269 
270 typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
271 
272 }  // namespace Eigen
273 
274 #endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
275