// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

#include <atomic>
#include <cstdint>
#include <mutex>
#include <vector>

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread
// (owner), operations on the back of the queue can be done by multiple
// threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: the owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but there are
// no stray occupied elements). The owner operates on one end of this region,
// the remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and claiming of the last
// empty element) happens by means of a state variable in each element. The
// states are: empty, busy (insertion or removal in progress) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means of
// a CAS operation. The finishing transitions (busy->empty and busy->ready) are
// done with a plain store, as the element is exclusively owned by the current
// thread.
//
// Note: we could permit only pointers as elements, then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state. But that would require a malloc/free per operation for large, complex
// values (and this queue is designed to store std::function<void()>).
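//
// Example usage (an illustrative sketch; the thread-pool wiring around the
// queue is assumed and not shown in this file):
//
//   RunQueue<std::function<void()>, 1024> q;
//   std::function<void()> task = [] { /* ... */ };
//
//   // Owner thread only:
//   std::function<void()> rejected = q.PushFront(std::move(task));
//   if (rejected) { /* queue was full; run or re-queue 'rejected' */ }
//   std::function<void()> own = q.PopFront();   // empty -> default-constructed
//
//   // Any other thread (e.g. a stealing worker):
//   std::function<void()> stolen = q.PopBack(); // may fail spuriously
//   if (stolen) stolen();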
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_assert((kSize & (kSize - 1)) == 0);
    eigen_assert(kSize > 2);            // why would you do this?
    eigen_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w, otherwise returns default-constructed
  // Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty, returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w, otherwise returns default-constructed
  // Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // Can fail spuriously.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return Work();
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns half of the last elements in the queue.
  // Returns the number of elements removed. Can also fail spuriously.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return 0;
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady ||
            !e->state.compare_exchange_strong(s, kBusy,
                                              std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store the intermediate kBusy state, we exclusively
        // own these elements.
        eigen_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }
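  // Worked example of the front_/back_ encoding relied on below (see also the
  // comment at the front_/back_ declarations). Assuming kSize == 4, so
  // kMask == 3 and kMask2 == 7:
  //   front_ == 0: position 0, modification counter 0.
  //   PushFront stores 0 + 1 + (kSize << 1) == 9: position 1, counter 1.
  //   PopFront then stores ((9 - 1) & kMask2) | (9 & ~kMask2) == 8: the
  //   position returns to 0, but the counter stays at 1, so the double read
  //   of front_ in Size() can still tell that the queue was modified.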
  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to not produce false positives (claim a non-empty queue as
    // empty).
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned front = front_.load(std::memory_order_acquire);
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) continue;
      int size = (front & kMask2) - (back & kMask2);
      // Fix overflow.
      if (size < 0) size += 2 * kSize;
      // Order of modification in push/pop is crafted to make the queue look
      // larger than it is during concurrent modifications. E.g. pop can
      // decrement size before the corresponding push has incremented it.
      // So the computed size can be up to kSize + 1; fix it.
      if (size > static_cast<int>(kSize)) size = kSize;
      return size;
    }
  }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return Size() == 0; }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // The low log2(kSize) + 1 bits in front_ and back_ contain the rolling index
  // of front/back, respectively. The remaining bits contain modification
  // counters (the front_ counter is bumped by PushFront, the back_ counter by
  // PopBack/PopBackHalf). This allows us to (1) distinguish between empty and
  // full conditions (if we used log2(kSize) bits for the position, these
  // conditions would be indistinguishable); (2) obtain a consistent snapshot
  // of front_/back_ for the Size operation using the modification counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_