/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H

#include "coherency_line_size.h"
#include "libpandabase/utils/math_helpers.h"
#include <optional>
#include <atomic>
#include <array>
#include <vector>

namespace ark::taskmanager::internal {

static constexpr size_t SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE = 1UL << 5U;

/**
 * @brief SPMCLockFreeQueue is a single-producer, multiple-consumer lock-free queue.
 * @tparam T: Type of values you want to store in the queue
 * @tparam QUEUE_NODE_SIZE: Size of one node in the queue. Default value: SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE
 */
template <class T, size_t QUEUE_NODE_SIZE = SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE>
class SPMCLockFreeQueue {
    static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
    static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1U;
    static_assert(QUEUE_NODE_MASK > 0);
#ifdef __APPLE__
    static constexpr size_t QUEUE_NODE_SHIFT = ark::helpers::math::GetIntLog2(static_cast<uint64_t>(QUEUE_NODE_SIZE));
#else
    static constexpr size_t QUEUE_NODE_SHIFT = ark::helpers::math::GetIntLog2(QUEUE_NODE_SIZE);
#endif

    static constexpr size_t CONSUMER_MAX_COUNT = 1UL << 5U;
    static constexpr size_t DELETE_TRIGGER_PUSH_COUNT = QUEUE_NODE_SIZE;
    static_assert(ark::helpers::math::IsPowerOfTwo(DELETE_TRIGGER_PUSH_COUNT));
    static constexpr size_t DELETE_TRIGGER_PUSH_MASK = DELETE_TRIGGER_PUSH_COUNT - 1U;

    // NOLINTBEGIN(misc-non-private-member-variables-in-classes)
    template <class U, size_t NODE_SIZE>
    struct QueueNode {
        QueueNode() = default;
        ~QueueNode() = default;

        DEFAULT_COPY_SEMANTIC(QueueNode);
        DEFAULT_MOVE_SEMANTIC(QueueNode);

        size_t id {0};
        std::atomic<QueueNode *> next {nullptr};
        std::array<U, NODE_SIZE> buffer {};
    };
    // NOLINTEND(misc-non-private-member-variables-in-classes)

    using NodeType = QueueNode<T, QUEUE_NODE_SIZE>;
    using NodePtr = NodeType *;

    class PopUserScope {
    public:
        explicit PopUserScope(SPMCLockFreeQueue *queue) : queue_(queue)
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_add(1U, std::memory_order_acq_rel);
        }
        ~PopUserScope()
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_sub(1U, std::memory_order_acq_rel);
        }

        NO_COPY_SEMANTIC(PopUserScope);
        NO_MOVE_SEMANTIC(PopUserScope);

    private:
        SPMCLockFreeQueue *queue_ = nullptr;
    };

public:
    using ThreadId = size_t;
    using ValueType = T;

    SPMCLockFreeQueue();
    ~SPMCLockFreeQueue();

    NO_COPY_SEMANTIC(SPMCLockFreeQueue);
    NO_MOVE_SEMANTIC(SPMCLockFreeQueue);

    /**
     * @brief Registers the calling thread as a consumer; must be called exactly once per thread.
     * @returns Unique id for the thread
     */
    size_t RegisterConsumer();

    /**
     * @brief Pushes a value into the queue. May allocate a new queue node.
     * @param val: value that should be pushed.
     */
    void Push(T &&val);

    /**
     * @brief Pops a value from the queue.
     * @param consumerId: unique consumer id obtained via the RegisterConsumer method
     * @returns value if the queue has one, otherwise std::nullopt.
     */
    std::optional<T> Pop(size_t consumerId);
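
    // Usage sketch (illustrative, not part of the original documentation; names
    // below are hypothetical): one producer thread pushes, and each consumer
    // thread registers once and reuses its id for every Pop call.
    //
    //   SPMCLockFreeQueue<int> queue;
    //   size_t consumerId = queue.RegisterConsumer();  // once per consumer thread
    //   queue.Push(42);                                // producer thread only
    //   if (auto val = queue.Pop(consumerId)) {
    //       Process(*val);                             // Process() is hypothetical
    //   }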

    /// @returns true if queue is empty.
    inline bool IsEmpty() const;

    /// @returns count of elements inside queue.
    inline size_t Size() const;

    /**
     * @brief Deletes all retired nodes. Use this method only if you know that no consumer
     * thread will enter the Pop method until this method finishes executing.
     */
    void TryDeleteRetiredPtrs()
    {
        if (LIKELY(GetPopUserCount() == 0)) {
            // Atomic with acquire order reason: get the latest value
            auto currentCountOfThreads = consumerCount_.load(std::memory_order_acquire);
            for (size_t id = 0; id < currentCountOfThreads; id++) {
                auto &retiredPtrs = perConsumerRetiredPtrs_[id];
                for (auto node : retiredPtrs) {
                    delete node;
                }
                retiredPtrs.clear();
            }
        }
    }
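
    // Note (clarifying the contract above, our reading): the GetPopUserCount() == 0
    // check only observes consumers that are already inside Pop(); the caller must
    // still guarantee that no consumer enters Pop() while this method runs.
    // A typical quiescent-point call could look like this (sketch, with a
    // hypothetical synchronization helper):
    //
    //   WaitUntilAllConsumersParked();  // hypothetical barrier
    //   queue.TryDeleteRetiredPtrs();   // no Pop() can run concurrently now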

private:
    inline size_t GetBufferIndex(size_t index) const
    {
        return index & QUEUE_NODE_MASK;
    }

    inline size_t GetNodeId(size_t index) const
    {
        return index >> QUEUE_NODE_SHIFT;
    }
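
    // Worked example (illustrative): with the default QUEUE_NODE_SIZE == 32
    // (mask 0x1F, shift 5), a global index of 70 decomposes into
    // GetNodeId(70) == 2 (70 >> 5) and GetBufferIndex(70) == 6 (70 & 0x1F),
    // i.e. slot 6 of the node with id 2.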

    /// @returns current value of popUserCount_
    size_t GetPopUserCount() const
    {
        // Atomic with acquire order reason: getting correct value of popUserCount
        return popUserCount_.load(std::memory_order_acquire);
    }

    /**
     * @brief Loads an atomic pointer to a QueueNode
     * @param atomicPtr: reference to the pointer that should be loaded
     * @return correct value of atomicPtr
     */
    NodePtr LoadAtomicPtr(std::atomic<NodePtr> &atomicPtr)
    {
        // Atomic with acquire order reason: get the latest value
        return atomicPtr.load(std::memory_order_acquire);
    }

    /**
     * @brief Writes the ptr to the list of retired ptrs to delete it later
     * @param consumerId: id of the consumer that retires the ptr
     * @param ptr: pointer to the QueueNode that should be retired
     */
    void RetirePtr(size_t consumerId, NodePtr ptr)
    {
        auto &retiredPtrs = perConsumerRetiredPtrs_[consumerId];
        retiredPtrs.push_back(ptr);
    }

    /**
     * @brief Tries to move popIndex_ to the next position.
     * @param popIndex: value of popIndex_ observed by the calling thread
     * @returns true if popIndex == popIndex_ and the CAS succeeded, else false
     */
    inline bool TryMovePopIndex(size_t popIndex)
    {
        return popIndex_.compare_exchange_strong(
            popIndex, popIndex + 1U,
            // Atomic with acq_rel order reason: other threads should see correct value
            std::memory_order_acq_rel);
    }

    /**
     * @brief Tries to set the next head based on the current one; also moves the old pointer to the retired list
     * @param currentHead: value of head_ that was loaded
     * @param val: ref to a T variable that receives the first element of the next node
     * @param consumerId: id of the consumer that wants to set the new head
     * @returns true if head was changed, otherwise returns false
     */
    bool CompareAndSetNextHead(NodePtr currentHead, T &val, size_t consumerId)
    {
        auto nextHead = LoadAtomicPtr(currentHead->next);
        ASSERT(nextHead != nullptr);
        // Atomic with acq_rel order reason: other threads should see correct value
        if (LIKELY(head_.compare_exchange_strong(currentHead, nextHead, std::memory_order_acq_rel))) {
            // Defer deletion of currentHead: another consumer may still be reading it
            RetirePtr(consumerId, currentHead);
            // Finally we can return the first value of the new head
            val = std::move(nextHead->buffer[0]);
            return true;
        }
        return false;
    }

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t pushIndex_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popIndex_ {0};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> head_ {nullptr};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> tail_ {nullptr};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popUserCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t consumerCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::array<std::vector<NodePtr>, CONSUMER_MAX_COUNT> perConsumerRetiredPtrs_;
};

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::SPMCLockFreeQueue()
{
    for (size_t id = 0; id < CONSUMER_MAX_COUNT; id++) {
        perConsumerRetiredPtrs_[id] = {};
    }
    auto *node = new NodeType();
    // Atomic with release order reason: other threads should see correct value
    head_.store(node, std::memory_order_release);
    // Atomic with release order reason: other threads should see correct value
    tail_.store(node, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::~SPMCLockFreeQueue()
{
    // Check that there are no elements left in the queue
    ASSERT(IsEmpty());
    ASSERT(GetPopUserCount() == 0);
    TryDeleteRetiredPtrs();
    // Atomic with acquire order reason: get the latest value
    auto head = head_.load(std::memory_order_acquire);
    delete head;
}

template <class T, size_t QUEUE_NODE_SIZE>
size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::RegisterConsumer()
{
    // Atomic with acq_rel order reason: other threads should see correct value
    auto id = consumerCount_.fetch_add(1UL, std::memory_order_acq_rel);
    ASSERT(id < CONSUMER_MAX_COUNT);
    return id;
}

template <class T, size_t QUEUE_NODE_SIZE>
inline size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Size() const
{
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // pushIndex < popIndex means the snapshot is torn: both indexes advanced
        // between the two loads. Retry, unless the queue is empty right now, in
        // which case return 0 instead of an underflowed difference.
        if (UNLIKELY(pushIndex < popIndex)) {
            // Atomic with acquire order reason: get the latest value
            if (popIndex == pushIndex_.load(std::memory_order_acquire)) {
                return 0;
            }
            continue;
        }
        return pushIndex - popIndex;
    }
}

template <class T, size_t QUEUE_NODE_SIZE>
inline bool SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::IsEmpty() const
{
    return Size() == 0;
}

template <class T, size_t QUEUE_NODE_SIZE>
void SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Push(T &&val)
{
    // Atomic with acquire order reason: get the latest value
    auto pushIndex = pushIndex_.load(std::memory_order_acquire);

    // Atomic with acquire order reason: get the latest value
    auto currentTail = tail_.load(std::memory_order_acquire);
    auto bufferIndex = GetBufferIndex(pushIndex);
    if (UNLIKELY(bufferIndex == 0 && pushIndex != 0)) {
        // The current node is full, so create a new node
        auto newNode = new NodeType();
        // The new id defines the order of nodes
        newNode->id = currentTail->id + 1UL;
        newNode->buffer[0] = std::move(val);
        // Atomic with release order reason: other threads should see correct value.
        // Note: next is published before tail_, so a consumer that still holds the
        // old tail can always reach the new node through the next pointer.
        currentTail->next.store(newNode, std::memory_order_release);
        // Atomic with release order reason: other threads should see correct value
        tail_.store(newNode, std::memory_order_release);
    } else {
        // Otherwise we can write the new val into the buffer
        currentTail->buffer[bufferIndex] = std::move(val);
    }
    ASSERT(pushIndex != SIZE_MAX);
    // We have finished pushing, so we need to move pushIndex_;
    // this store is what publishes the written element to consumers
    // Atomic with release order reason: other threads should see correct value
    pushIndex_.store(pushIndex + 1UL, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
std::optional<T> SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Pop(size_t consumerId)
{
    // Atomic with acquire order reason: get the latest value
    ASSERT(consumerId < consumerCount_.load(std::memory_order_acquire));
    PopUserScope popUser(this);
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // Check that we loaded a consistent pair of indexes
        if (UNLIKELY(pushIndex < popIndex)) {
            // Retry
            continue;
        }
        // If there are no tasks we can return nullopt
        if (UNLIKELY(pushIndex == popIndex)) {
            // Atomic with acquire order reason: get the latest value
            if (pushIndex == pushIndex_.load(std::memory_order_acquire)) {
                return std::nullopt;
            }
            continue;
        }
        auto bufferIndex = GetBufferIndex(popIndex);
        auto nodeId = GetNodeId(popIndex);
        auto *currentHead = LoadAtomicPtr(head_);
        // Check that currentHead was loaded and that the indexes correspond to it
        if (UNLIKELY(currentHead == nullptr)) {
            // Retry
            continue;
        }
        if (UNLIKELY(currentHead->id > nodeId)) {
            // Retry: popIndex is stale, consumers already advanced past this node
            continue;
        }
        T val;
        // Check if we need to change head_
        if (UNLIKELY(bufferIndex == 0 && popIndex != 0)) {
            // We should move popIndex_ first to occupy the current popIndex
            if (UNLIKELY(!TryMovePopIndex(popIndex))) {
                // If we cannot occupy it, we should retry
                continue;
            }
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Else we can assume that popIndex_ was moved but head_ may still be the old one; we should check it
        if (UNLIKELY(currentHead->id < nodeId)) {
            // Help to change head_ to the new one and return its first element
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Otherwise we try to occupy the index
        if (UNLIKELY(!TryMovePopIndex(popIndex))) {
            // If we cannot occupy it, we should retry
            continue;
        }
        return std::move(currentHead->buffer[bufferIndex]);
    }
}

} // namespace ark::taskmanager::internal

#endif // LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H