• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
11 #define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
12 
13 namespace Eigen {
14 
15 // Use the SimpleThreadPool by default. We'll switch to the new non blocking
16 // thread pool later.
17 #ifndef EIGEN_USE_SIMPLE_THREAD_POOL
18 template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
19 typedef NonBlockingThreadPool ThreadPool;
20 #else
21 template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
22 typedef SimpleThreadPool ThreadPool;
23 #endif
24 
25 
26 // Barrier is an object that allows one or more threads to wait until
27 // Notify has been called a specified number of times.
28 class Barrier {
29  public:
Barrier(unsigned int count)30   Barrier(unsigned int count) : state_(count << 1), notified_(false) {
31     eigen_assert(((count << 1) >> 1) == count);
32   }
~Barrier()33   ~Barrier() {
34     eigen_assert((state_>>1) == 0);
35   }
36 
Notify()37   void Notify() {
38     unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
39     if (v != 1) {
40       eigen_assert(((v + 2) & ~1) != 0);
41       return;  // either count has not dropped to 0, or waiter is not waiting
42     }
43     std::unique_lock<std::mutex> l(mu_);
44     eigen_assert(!notified_);
45     notified_ = true;
46     cv_.notify_all();
47   }
48 
Wait()49   void Wait() {
50     unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
51     if ((v >> 1) == 0) return;
52     std::unique_lock<std::mutex> l(mu_);
53     while (!notified_) {
54       cv_.wait(l);
55     }
56   }
57 
58  private:
59   std::mutex mu_;
60   std::condition_variable cv_;
61   std::atomic<unsigned int> state_;  // low bit is waiter flag
62   bool notified_;
63 };
64 
65 
66 // Notification is an object that allows a user to to wait for another
67 // thread to signal a notification that an event has occurred.
68 //
69 // Multiple threads can wait on the same Notification object,
70 // but only one caller must call Notify() on the object.
71 struct Notification : Barrier {
NotificationNotification72   Notification() : Barrier(1) {};
73 };
74 
75 
76 // Runs an arbitrary function and then calls Notify() on the passed in
77 // Notification.
78 template <typename Function, typename... Args> struct FunctionWrapperWithNotification
79 {
runFunctionWrapperWithNotification80   static void run(Notification* n, Function f, Args... args) {
81     f(args...);
82     if (n) {
83       n->Notify();
84     }
85   }
86 };
87 
88 template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
89 {
runFunctionWrapperWithBarrier90   static void run(Barrier* b, Function f, Args... args) {
91     f(args...);
92     if (b) {
93       b->Notify();
94     }
95   }
96 };
97 
98 template <typename SyncType>
wait_until_ready(SyncType * n)99 static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
100   if (n) {
101     n->Wait();
102   }
103 }
104 
105 
106 // Build a thread pool device on top the an existing pool of threads.
107 struct ThreadPoolDevice {
108   // The ownership of the thread pool remains with the caller.
ThreadPoolDeviceThreadPoolDevice109   ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }
110 
allocateThreadPoolDevice111   EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
112     return internal::aligned_malloc(num_bytes);
113   }
114 
deallocateThreadPoolDevice115   EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
116     internal::aligned_free(buffer);
117   }
118 
memcpyThreadPoolDevice119   EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
120     ::memcpy(dst, src, n);
121   }
memcpyHostToDeviceThreadPoolDevice122   EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
123     memcpy(dst, src, n);
124   }
memcpyDeviceToHostThreadPoolDevice125   EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
126     memcpy(dst, src, n);
127   }
128 
memsetThreadPoolDevice129   EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
130     ::memset(buffer, c, n);
131   }
132 
numThreadsThreadPoolDevice133   EIGEN_STRONG_INLINE int numThreads() const {
134     return num_threads_;
135   }
136 
firstLevelCacheSizeThreadPoolDevice137   EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
138     return l1CacheSize();
139   }
140 
lastLevelCacheSizeThreadPoolDevice141   EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
142     // The l3 cache size is shared between all the cores.
143     return l3CacheSize() / num_threads_;
144   }
145 
majorDeviceVersionThreadPoolDevice146   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
147     // Should return an enum that encodes the ISA supported by the CPU
148     return 1;
149   }
150 
151   template <class Function, class... Args>
enqueueThreadPoolDevice152   EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
153     Notification* n = new Notification();
154     pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...));
155     return n;
156   }
157 
158   template <class Function, class... Args>
enqueue_with_barrierThreadPoolDevice159   EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
160                                                 Function&& f,
161                                                 Args&&... args) const {
162     pool_->Schedule(std::bind(
163         &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...));
164   }
165 
166   template <class Function, class... Args>
enqueueNoNotificationThreadPoolDevice167   EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
168     pool_->Schedule(std::bind(f, args...));
169   }
170 
171   // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
172   // called from one of the threads in pool_. Returns -1 otherwise.
currentThreadIdThreadPoolDevice173   EIGEN_STRONG_INLINE int currentThreadId() const {
174     return pool_->CurrentThreadId();
175   }
176 
177   // parallelFor executes f with [0, n) arguments in parallel and waits for
178   // completion. F accepts a half-open interval [first, last).
179   // Block size is choosen based on the iteration cost and resulting parallel
180   // efficiency. If block_align is not nullptr, it is called to round up the
181   // block size.
parallelForThreadPoolDevice182   void parallelFor(Index n, const TensorOpCost& cost,
183                    std::function<Index(Index)> block_align,
184                    std::function<void(Index, Index)> f) const {
185     typedef TensorCostModel<ThreadPoolDevice> CostModel;
186     if (n <= 1 || numThreads() == 1 ||
187         CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
188       f(0, n);
189       return;
190     }
191 
192     // Calculate block size based on (1) the iteration cost and (2) parallel
193     // efficiency. We want blocks to be not too small to mitigate
194     // parallelization overheads; not too large to mitigate tail
195     // effect and potential load imbalance and we also want number
196     // of blocks to be evenly dividable across threads.
197 
198     double block_size_f = 1.0 / CostModel::taskSize(1, cost);
199     Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f));
200     const Index max_block_size =
201         numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f));
202     if (block_align) {
203       Index new_block_size = block_align(block_size);
204       eigen_assert(new_block_size >= block_size);
205       block_size = numext::mini(n, new_block_size);
206     }
207     Index block_count = divup(n, block_size);
208     // Calculate parallel efficiency as fraction of total CPU time used for
209     // computations:
210     double max_efficiency =
211         static_cast<double>(block_count) /
212         (divup<int>(block_count, numThreads()) * numThreads());
213     // Now try to increase block size up to max_block_size as long as it
214     // doesn't decrease parallel efficiency.
215     for (Index prev_block_count = block_count; prev_block_count > 1;) {
216       // This is the next block size that divides size into a smaller number
217       // of blocks than the current block_size.
218       Index coarser_block_size = divup(n, prev_block_count - 1);
219       if (block_align) {
220         Index new_block_size = block_align(coarser_block_size);
221         eigen_assert(new_block_size >= coarser_block_size);
222         coarser_block_size = numext::mini(n, new_block_size);
223       }
224       if (coarser_block_size > max_block_size) {
225         break;  // Reached max block size. Stop.
226       }
227       // Recalculate parallel efficiency.
228       const Index coarser_block_count = divup(n, coarser_block_size);
229       eigen_assert(coarser_block_count < prev_block_count);
230       prev_block_count = coarser_block_count;
231       const double coarser_efficiency =
232           static_cast<double>(coarser_block_count) /
233           (divup<int>(coarser_block_count, numThreads()) * numThreads());
234       if (coarser_efficiency + 0.01 >= max_efficiency) {
235         // Taking it.
236         block_size = coarser_block_size;
237         block_count = coarser_block_count;
238         if (max_efficiency < coarser_efficiency) {
239           max_efficiency = coarser_efficiency;
240         }
241       }
242     }
243 
244     // Recursively divide size into halves until we reach block_size.
245     // Division code rounds mid to block_size, so we are guaranteed to get
246     // block_count leaves that do actual computations.
247     Barrier barrier(static_cast<unsigned int>(block_count));
248     std::function<void(Index, Index)> handleRange;
249     handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
250       if (last - first <= block_size) {
251         // Single block or less, execute directly.
252         f(first, last);
253         barrier.Notify();
254         return;
255       }
256       // Split into halves and submit to the pool.
257       Index mid = first + divup((last - first) / 2, block_size) * block_size;
258       pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
259       pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
260     };
261     handleRange(0, n);
262     barrier.Wait();
263   }
264 
265   // Convenience wrapper for parallelFor that does not align blocks.
parallelForThreadPoolDevice266   void parallelFor(Index n, const TensorOpCost& cost,
267                    std::function<void(Index, Index)> f) const {
268     parallelFor(n, cost, nullptr, std::move(f));
269   }
270 
271  private:
272   ThreadPoolInterface* pool_;
273   int num_threads_;
274 };
275 
276 
277 }  // end namespace Eigen
278 
279 #endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
280