/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H

#include "coherency_line_size.h"
#include "libpandabase/utils/math_helpers.h"
#include <array>
#include <atomic>
#include <optional>
#include <vector>

namespace ark::taskmanager::internal {

static constexpr size_t SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE = 1UL << 5U;

/**
 * @brief SPMCLockFreeQueue is a single producer, multiple consumer lock-free queue.
 * @tparam T: Type of the values you want to store in the queue
 * @tparam QUEUE_NODE_SIZE: Size of one node in the queue. Default value: SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE
 */
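// A minimal usage sketch (illustrative only; the value type and threading setup
// below are assumptions, not part of this header):
//
//     SPMCLockFreeQueue<int> queue;
//     size_t consumerId = queue.RegisterConsumer();  // once per consumer thread
//     queue.Push(42);                                // single producer thread only
//     if (std::optional<int> val = queue.Pop(consumerId)) {
//         // consume *val
//     }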
template <class T, size_t QUEUE_NODE_SIZE = SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE>
class SPMCLockFreeQueue {
    static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
    static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1U;
    static_assert(QUEUE_NODE_MASK > 0);
    static constexpr size_t QUEUE_NODE_SHIFT = ark::helpers::math::GetIntLog2(QUEUE_NODE_SIZE);

    static constexpr size_t CONSUMER_MAX_COUNT = 1UL << 5U;
    static constexpr size_t DELETE_TRIGGER_PUSH_COUNT = QUEUE_NODE_SIZE;
    static_assert(ark::helpers::math::IsPowerOfTwo(DELETE_TRIGGER_PUSH_COUNT));
    static constexpr size_t DELETE_TRIGGER_PUSH_MASK = DELETE_TRIGGER_PUSH_COUNT - 1U;

    // NOLINTBEGIN(misc-non-private-member-variables-in-classes)
    template <class U, size_t NODE_SIZE>
    struct QueueNode {
        QueueNode() = default;
        ~QueueNode() = default;

        DEFAULT_COPY_SEMANTIC(QueueNode);
        DEFAULT_MOVE_SEMANTIC(QueueNode);

        size_t id {0};
        std::atomic<QueueNode *> next {nullptr};
        std::array<U, NODE_SIZE> buffer {};
    };
    // NOLINTEND(misc-non-private-member-variables-in-classes)

    using NodeType = QueueNode<T, QUEUE_NODE_SIZE>;
    using NodePtr = NodeType *;

    class PopUserScope {
    public:
        explicit PopUserScope(SPMCLockFreeQueue *queue) : queue_(queue)
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_add(1U, std::memory_order_acq_rel);
        }
        ~PopUserScope()
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_sub(1U, std::memory_order_acq_rel);
        }

        NO_COPY_SEMANTIC(PopUserScope);
        NO_MOVE_SEMANTIC(PopUserScope);

    private:
        SPMCLockFreeQueue *queue_ = nullptr;
    };

public:
    using ThreadId = size_t;
    using ValueType = T;

    SPMCLockFreeQueue();
    ~SPMCLockFreeQueue();

    NO_COPY_SEMANTIC(SPMCLockFreeQueue);
    NO_MOVE_SEMANTIC(SPMCLockFreeQueue);

    /**
     * @brief Registers a consumer thread in the queue; should be called only once per thread.
     * @returns Unique id for the thread
     */
    size_t RegisterConsumer();

    /**
     * @brief Pushes a value into the queue. May allocate a new queue node.
     * @param val: value that should be pushed.
     */
    void Push(T &&val);

    /**
     * @brief Pops a value from the queue.
     * @param consumerId: unique consumer id obtained from RegisterConsumer
     * @returns a value if the queue has one, otherwise std::nullopt.
     */
    std::optional<T> Pop(size_t consumerId);

    /// @returns true if queue is empty.
    bool inline IsEmpty() const;

    /// @returns count of tasks inside queue.
    size_t inline Size() const;

    /**
     * @brief Deletes all retired nodes. Use this method only if you know that no consumer
     * thread will call Pop until this method returns.
     */
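    // A hedged usage sketch: this is only safe at a quiescent point. The scenario
    // below (the caller parks all consumer threads first) is an assumption, not
    // something the queue enforces:
    //
    //     // all worker threads parked; producer thread:
    //     queue.TryDeleteRetiredPtrs();  // frees nodes retired by earlier Pop calls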
    void TryDeleteRetiredPtrs()
    {
        if (LIKELY(GetPopUserCount() == 0)) {
            // Atomic with acquire order reason: get the latest value
            auto currentCountOfThreads = consumerCount_.load(std::memory_order_acquire);
            for (size_t id = 0; id < currentCountOfThreads; id++) {
                auto &retiredPtrs = perConsumerRetiredPtrs_[id];
                for (auto node : retiredPtrs) {
                    delete node;
                }
                retiredPtrs.clear();
            }
        }
    }

private:
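    // Index decomposition helpers. Worked example, assuming the default
    // QUEUE_NODE_SIZE of 32 (shift 5, mask 31): for global index 70,
    // GetNodeId(70) == 70 >> 5 == 2 and GetBufferIndex(70) == 70 & 31 == 6,
    // i.e. slot 6 of the third node in the chain.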
    size_t inline GetBufferIndex(size_t index) const
    {
        return index & QUEUE_NODE_MASK;
    }

    size_t inline GetNodeId(size_t index) const
    {
        return index >> QUEUE_NODE_SHIFT;
    }

    /// @returns value of popUserCount_
    size_t GetPopUserCount() const
    {
        // Atomic with acquire order reason: getting correct value of popUserCount
        return popUserCount_.load(std::memory_order_acquire);
    }

    /**
     * @brief Loads an atomic pointer to a QueueNode
     * @param atomicPtr: reference to the pointer that should be loaded
     * @return correct value of atomicPtr
     */
    NodePtr LoadAtomicPtr(std::atomic<NodePtr> &atomicPtr)
    {
        // Atomic with acquire order reason: get the latest value
        return atomicPtr.load(std::memory_order_acquire);
    }

    /**
     * @brief Writes the pointer to the list of retired pointers to delete it later
     * @param consumerId: id of the consumer that retires the pointer
     * @param ptr: pointer to the QueueNode that should be retired
     */
    void RetirePtr(size_t consumerId, NodePtr ptr)
    {
        auto &retiredPtrs = perConsumerRetiredPtrs_[consumerId];
        retiredPtrs.push_back(ptr);
    }

    /**
     * @brief Tries to move popIndex_ to the next position.
     * @param popIndex: value of popIndex_ that is owned by the thread
     * @returns true if popIndex == popIndex_ and the CAS succeeded, else false
     */
    inline bool TryMovePopIndex(size_t popIndex)
    {
        return popIndex_.compare_exchange_strong(
            popIndex, popIndex + 1U,
            // Atomic with acq_rel order reason: other threads should see correct value
            std::memory_order_acq_rel);
    }

    /**
     * @brief Tries to set the next head based on the current one; on success, it also moves
     * the old head pointer to the retired list.
     * @param currentHead: value of head_ that was loaded
     * @param val: reference to a T-typed variable to receive the first element of the next node
     * @param consumerId: id of the consumer that wants to set the new head
     * @returns true if the head was changed, otherwise returns false
     */
    bool CompareAndSetNextHead(NodePtr currentHead, T &val, size_t consumerId)
    {
        auto nextHead = LoadAtomicPtr(currentHead->next);
        ASSERT(nextHead != nullptr);
        // Atomic with acq_rel order reason: other threads should see correct value
        if (LIKELY(head_.compare_exchange_strong(currentHead, nextHead, std::memory_order_acq_rel))) {
            // Now we should correctly delete the old head, so retire it.
            RetirePtr(consumerId, currentHead);
            // Finally we can return the first value of the new head
            val = std::move(nextHead->buffer[0]);
            return true;
        }
        return false;
    }

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t pushIndex_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popIndex_ {0};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> head_ {nullptr};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> tail_ {nullptr};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popUserCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t consumerCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::array<std::vector<NodePtr>, CONSUMER_MAX_COUNT> perConsumerRetiredPtrs_;
};

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::SPMCLockFreeQueue()
{
    for (size_t id = 0; id < CONSUMER_MAX_COUNT; id++) {
        perConsumerRetiredPtrs_[id] = {};
    }
    auto *node = new NodeType();
    // Atomic with release order reason: other threads should see correct value
    head_.store(node, std::memory_order_release);
    // Atomic with release order reason: other threads should see correct value
    tail_.store(node, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::~SPMCLockFreeQueue()
{
    // Check that there are no elements left in the queue
    ASSERT(IsEmpty());
    ASSERT(GetPopUserCount() == 0);
    TryDeleteRetiredPtrs();
    // Atomic with acquire order reason: get the latest value
    auto head = head_.load(std::memory_order_acquire);
    delete head;
}

template <class T, size_t QUEUE_NODE_SIZE>
size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::RegisterConsumer()
{
    // Atomic with acq_rel order reason: other threads should see correct value
    auto id = consumerCount_.fetch_add(1UL, std::memory_order_acq_rel);
    ASSERT(id < CONSUMER_MAX_COUNT);
    return id;
}

template <class T, size_t QUEUE_NODE_SIZE>
size_t inline SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Size() const
{
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // The two indexes are read non-atomically; retry if a concurrent update
        // slipped between the loads and produced an inconsistent snapshot.
        // Atomic with acquire order reason: get the latest value
        if (UNLIKELY(pushIndex < popIndex && popIndex != pushIndex_.load(std::memory_order_acquire))) {
            continue;
        }
        return pushIndex - popIndex;
    }
}

template <class T, size_t QUEUE_NODE_SIZE>
bool inline SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::IsEmpty() const
{
    return Size() == 0;
}

template <class T, size_t QUEUE_NODE_SIZE>
void SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Push(T &&val)
{
    // Atomic with acquire order reason: get the latest value
    auto pushIndex = pushIndex_.load(std::memory_order_acquire);

    // Atomic with acquire order reason: get the latest value
    auto currentTail = tail_.load(std::memory_order_acquire);
    auto bufferIndex = GetBufferIndex(pushIndex);
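    // Worked example (assuming the default QUEUE_NODE_SIZE of 32): the 33rd push
    // arrives with pushIndex == 32, so bufferIndex == 0 and a fresh node is linked
    // after the current tail, receiving the value in its buffer[0].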
    if (UNLIKELY(bufferIndex == 0 && pushIndex != 0)) {
        // Creating a new node
        auto newNode = new NodeType();
        // We set a new id that defines the order of nodes
        newNode->id = currentTail->id + 1UL;
        newNode->buffer[0] = std::move(val);
        // Atomic with release order reason: other threads should see correct value
        currentTail->next.store(newNode, std::memory_order_release);
        // Atomic with release order reason: other threads should see correct value
        tail_.store(newNode, std::memory_order_release);
    } else {
        // Otherwise we can write the new val into the buffer
        currentTail->buffer[bufferIndex] = std::move(val);
    }
    ASSERT(pushIndex != SIZE_MAX);
    // We have finished pushing, so we need to move pushIndex_
    // Atomic with release order reason: other threads should see correct value
    pushIndex_.store(pushIndex + 1UL, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
std::optional<T> SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Pop(size_t consumerId)
{
    // Atomic with acquire order reason: get the latest value
    ASSERT(consumerId < consumerCount_.load(std::memory_order_acquire));
    PopUserScope popUser(this);
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // Check that we loaded consistent indexes
        if (UNLIKELY(pushIndex < popIndex)) {
            // Retry
            continue;
        }
        // If there are no tasks we can return nullopt
        if (UNLIKELY(pushIndex == popIndex)) {
            // Atomic with acquire order reason: get the latest value
            if (pushIndex == pushIndex_.load(std::memory_order_acquire)) {
                return std::nullopt;
            }
            continue;
        }
        auto bufferIndex = GetBufferIndex(popIndex);
        auto nodeId = GetNodeId(popIndex);
        auto *currentHead = LoadAtomicPtr(head_);
        // Check that the computed indexes correspond to currentHead.
        if (UNLIKELY(currentHead == nullptr)) {
            // Retry
            continue;
        }
        if (UNLIKELY(currentHead->id > nodeId)) {
            // Retry
            continue;
        }
        T val;
        // Check if we need to change head_.
        if (UNLIKELY(bufferIndex == 0 && popIndex != 0)) {
            // We should move popIndex_ first to occupy the current popIndex.
            if (UNLIKELY(!TryMovePopIndex(popIndex))) {
                // If we cannot occupy it we should retry.
                continue;
            }
            // Only the winner of TryMovePopIndex advances head_; if the CAS below
            // fails, another consumer already helped, so we retry with fresh indexes.
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Else we can assume that popIndex_ was moved but head_ may still be the old one; we should check it
        if (UNLIKELY(currentHead->id < nodeId)) {
            // Help to change head_ to the new one and return its first element
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Otherwise we try to occupy the index
        if (UNLIKELY(!TryMovePopIndex(popIndex))) {
            // If we cannot occupy it we should retry.
            continue;
        }
        return std::move(currentHead->buffer[bufferIndex]);
    }
}

}  // namespace ark::taskmanager::internal

#endif  // LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H