/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H

#include "coherency_line_size.h"
#include "libpandabase/utils/math_helpers.h"
#include <array>
#include <atomic>
#include <cstdint>
#include <optional>
#include <vector>

namespace ark::taskmanager::internal {

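/// Default capacity of one queue node: 1UL << 5U == 32 elements.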
static constexpr size_t SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE = 1UL << 5U;

/**
 * @brief SPMCLockFreeQueue is a single-producer, multiple-consumer lock-free queue.
 * @tparam T: Type of the values you want to store in the queue
 * @tparam QUEUE_NODE_SIZE: Size of one node in the queue. Default value: SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE
 */
template <class T, size_t QUEUE_NODE_SIZE = SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE>
class SPMCLockFreeQueue {
    static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
    static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1U;
    static_assert(QUEUE_NODE_MASK > 0);
#ifdef __APPLE__
    static constexpr size_t QUEUE_NODE_SHIFT = ark::helpers::math::GetIntLog2(static_cast<uint64_t>(QUEUE_NODE_SIZE));
#else
    static constexpr size_t QUEUE_NODE_SHIFT = ark::helpers::math::GetIntLog2(QUEUE_NODE_SIZE);
#endif

    static constexpr size_t CONSUMER_MAX_COUNT = 1UL << 5U;
    static constexpr size_t DELETE_TRIGGER_PUSH_COUNT = QUEUE_NODE_SIZE;
    static_assert(ark::helpers::math::IsPowerOfTwo(DELETE_TRIGGER_PUSH_COUNT));
    static constexpr size_t DELETE_TRIGGER_PUSH_MASK = DELETE_TRIGGER_PUSH_COUNT - 1U;

    // NOLINTBEGIN(misc-non-private-member-variables-in-classes)
    template <class U, size_t NODE_SIZE>
    struct QueueNode {
        QueueNode() = default;
        ~QueueNode() = default;

        DEFAULT_COPY_SEMANTIC(QueueNode);
        DEFAULT_MOVE_SEMANTIC(QueueNode);

        size_t id {0};
        std::atomic<QueueNode *> next {nullptr};
        std::array<U, NODE_SIZE> buffer {};
    };
    // NOLINTEND(misc-non-private-member-variables-in-classes)
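
    // The queue is a singly linked list of fixed-size array nodes. A sketch with
    // QUEUE_NODE_SIZE == 4 (the real default is 32):
    //
    //   head_ -> [id 0 | v0 v1 v2 v3] -> [id 1 | v4 v5 v6 v7] -> nullptr
    //                                    ^ tail_
    //
    // The single producer appends nodes at tail_; consumers retire drained nodes at head_.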

    using NodeType = QueueNode<T, QUEUE_NODE_SIZE>;
    using NodePtr = NodeType *;

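    /// RAII guard that counts the consumers currently executing Pop();
    /// TryDeleteRetiredPtrs() reclaims retired nodes only while this count is zero.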
    class PopUserScope {
    public:
        explicit PopUserScope(SPMCLockFreeQueue *queue) : queue_(queue)
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_add(1U, std::memory_order_acq_rel);
        }
        ~PopUserScope()
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_sub(1U, std::memory_order_acq_rel);
        }

        NO_COPY_SEMANTIC(PopUserScope);
        NO_MOVE_SEMANTIC(PopUserScope);

    private:
        SPMCLockFreeQueue *queue_ = nullptr;
    };

public:
    using ThreadId = size_t;
    using ValueType = T;

    SPMCLockFreeQueue();
    ~SPMCLockFreeQueue();

    NO_COPY_SEMANTIC(SPMCLockFreeQueue);
    NO_MOVE_SEMANTIC(SPMCLockFreeQueue);

    /**
     * @brief Registers the calling thread as a consumer; should be called only once per thread.
     * @returns Unique id for the thread
     */
    size_t RegisterConsumer();

    /**
     * @brief Method pushes a value into the queue. The method can allocate a new queue node.
     * @param val: value that should be pushed.
     */
    void Push(T &&val);

    /**
     * @brief Method pops a value from the queue.
     * @param consumerId: unique consumer id that was obtained with RegisterConsumer
     * @returns a value if the queue has one, otherwise std::nullopt.
     */
    std::optional<T> Pop(size_t consumerId);

    /// @returns true if the queue is empty.
    inline bool IsEmpty() const;

    /// @returns count of tasks inside the queue.
    inline size_t Size() const;

    /**
     * @brief Method deletes all retired nodes. Use this method only when you know that no consumer
     * thread will call the Pop method until this method's execution ends.
     */
    void TryDeleteRetiredPtrs()
    {
        if (LIKELY(GetPopUserCount() == 0)) {
            // Atomic with acquire order reason: get the latest value
            auto currentCountOfThreads = consumerCount_.load(std::memory_order_acquire);
            for (size_t id = 0; id < currentCountOfThreads; id++) {
                auto &retiredPtrs = perConsumerRetiredPtrs_[id];
                for (auto node : retiredPtrs) {
                    delete node;
                }
                retiredPtrs.clear();
            }
        }
    }

private:
    inline size_t GetBufferIndex(size_t index) const
    {
        return index & QUEUE_NODE_MASK;
    }

    inline size_t GetNodeId(size_t index) const
    {
        return index >> QUEUE_NODE_SHIFT;
    }
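    // For example, with the default QUEUE_NODE_SIZE == 32, a global index of 70 gives
    // GetNodeId(70) == 70 >> 5 == 2 and GetBufferIndex(70) == 70 & 31 == 6,
    // i.e. slot 6 of the third node in the chain.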

    /// @returns value of popUserCount_
    size_t GetPopUserCount() const
    {
        // Atomic with acquire order reason: getting correct value of popUserCount_
        return popUserCount_.load(std::memory_order_acquire);
    }

    /**
     * @brief Method loads an atomic ptr to a QueueNode
     * @param atomicPtr: ref to the ptr that should be loaded
     * @return correct value of atomicPtr
     */
    NodePtr LoadAtomicPtr(std::atomic<NodePtr> &atomicPtr)
    {
        // Atomic with acquire order reason: get the latest value
        return atomicPtr.load(std::memory_order_acquire);
    }

    /**
     * @brief Writes the ptr to the list of retired ptrs to delete them later
     * @param consumerId: id of the consumer that retires the ptr
     * @param ptr: pointer to the QueueNode that should be retired
     */
    void RetirePtr(size_t consumerId, NodePtr ptr)
    {
        auto &retiredPtrs = perConsumerRetiredPtrs_[consumerId];
        retiredPtrs.push_back(ptr);
    }

    /**
     * @brief Method tries to move popIndex_ to the next position.
     * @param popIndex: value of popIndex_ that is owned by the thread
     * @returns true if popIndex == popIndex_ and the CAS succeeded, else false
     */
    inline bool TryMovePopIndex(size_t popIndex)
    {
        // Atomic with acq_rel order reason: other threads should see correct value
        return popIndex_.compare_exchange_strong(popIndex, popIndex + 1U, std::memory_order_acq_rel);
    }

    /**
     * @brief Method tries to set the next head based on the current one. It also moves the old ptr to the retired list.
     * @param currentHead: value of head_ that was loaded
     * @param val: ref to a variable of type T that receives the first element of the next node
     * @param consumerId: id of the consumer that wants to set the new head
     * @returns true if the head was changed, otherwise returns false
     */
    bool CompareAndSetNextHead(NodePtr currentHead, T &val, size_t consumerId)
    {
        auto nextHead = LoadAtomicPtr(currentHead->next);
        ASSERT(nextHead != nullptr);
        // Atomic with acq_rel order reason: other threads should see correct value
        if (LIKELY(head_.compare_exchange_strong(currentHead, nextHead, std::memory_order_acq_rel))) {
            // Now we should correctly delete the old head, so retire it for later deletion
            RetirePtr(consumerId, currentHead);
            // Finally we can return the first value of the new head
            val = std::move(nextHead->buffer[0]);
            return true;
        }
        return false;
    }

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t pushIndex_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popIndex_ {0};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> head_ {nullptr};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> tail_ {nullptr};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popUserCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t consumerCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::array<std::vector<NodePtr>, CONSUMER_MAX_COUNT> perConsumerRetiredPtrs_;
};
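
// A minimal usage sketch (illustrative only, not part of this header; assumes a
// default-constructible, movable element type):
//
//   SPMCLockFreeQueue<int> queue;
//   size_t consumerId = queue.RegisterConsumer();      // once per consumer thread
//   queue.Push(42);                                    // single producer thread only
//   std::optional<int> value = queue.Pop(consumerId);  // any registered consumer
//   queue.TryDeleteRetiredPtrs();                      // only while no Pop() is in flight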

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::SPMCLockFreeQueue()
{
    for (size_t id = 0; id < CONSUMER_MAX_COUNT; id++) {
        perConsumerRetiredPtrs_[id] = {};
    }
    auto *node = new NodeType();
    // Atomic with release order reason: other threads should see correct value
    head_.store(node, std::memory_order_release);
    // Atomic with release order reason: other threads should see correct value
    tail_.store(node, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::~SPMCLockFreeQueue()
{
    // Check that there are no elements left in the queue
    ASSERT(IsEmpty());
    ASSERT(GetPopUserCount() == 0);
    TryDeleteRetiredPtrs();
    // Atomic with acquire order reason: get the latest value
    auto head = head_.load(std::memory_order_acquire);
    delete head;
}

template <class T, size_t QUEUE_NODE_SIZE>
size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::RegisterConsumer()
{
    // Atomic with acq_rel order reason: other threads should see correct value
    auto id = consumerCount_.fetch_add(1UL, std::memory_order_acq_rel);
    ASSERT(id < CONSUMER_MAX_COUNT);
    return id;
}

template <class T, size_t QUEUE_NODE_SIZE>
inline size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Size() const
{
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        if (UNLIKELY(pushIndex < popIndex)) {
            // The producer advanced pushIndex_ between the two loads, so the snapshot is inconsistent; retry
            continue;
        }
        return pushIndex - popIndex;
    }
}

template <class T, size_t QUEUE_NODE_SIZE>
inline bool SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::IsEmpty() const
{
    return Size() == 0;
}

template <class T, size_t QUEUE_NODE_SIZE>
void SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Push(T &&val)
{
    // Atomic with acquire order reason: get the latest value
    auto pushIndex = pushIndex_.load(std::memory_order_acquire);

    // Atomic with acquire order reason: get the latest value
    auto currentTail = tail_.load(std::memory_order_acquire);
    auto bufferIndex = GetBufferIndex(pushIndex);
    if (UNLIKELY(bufferIndex == 0 && pushIndex != 0)) {
        // Creating a new node
        auto newNode = new NodeType();
        // We set a new id that defines the order of nodes
        newNode->id = currentTail->id + 1UL;
        newNode->buffer[0] = std::move(val);
        // Atomic with release order reason: other threads should see correct value
        currentTail->next.store(newNode, std::memory_order_release);
        // Atomic with release order reason: other threads should see correct value
        tail_.store(newNode, std::memory_order_release);
    } else {
        // Otherwise we can write the new val into the buffer
        currentTail->buffer[bufferIndex] = std::move(val);
    }
    ASSERT(pushIndex != SIZE_MAX);
    // We have finished pushing, so we need to move pushIndex_
    // Atomic with release order reason: other threads should see correct value
    pushIndex_.store(pushIndex + 1UL, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
std::optional<T> SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Pop(size_t consumerId)
{
    // Atomic with acquire order reason: get the latest value
    ASSERT(consumerId < consumerCount_.load(std::memory_order_acquire));
    PopUserScope popUser(this);
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // Check if we loaded correct indexes
        if (UNLIKELY(pushIndex < popIndex)) {
            // Retry
            continue;
        }
        // If there are no tasks, we can return nullopt
        if (UNLIKELY(pushIndex == popIndex)) {
            // Atomic with acquire order reason: get the latest value
            if (pushIndex == pushIndex_.load(std::memory_order_acquire)) {
                return std::nullopt;
            }
            continue;
        }
        auto bufferIndex = GetBufferIndex(popIndex);
        auto nodeId = GetNodeId(popIndex);
        auto *currentHead = LoadAtomicPtr(head_);
        // Check that the loaded indexes correspond to currentHead
        if (UNLIKELY(currentHead == nullptr)) {
            // Retry
            continue;
        }
        if (UNLIKELY(currentHead->id > nodeId)) {
            // Retry
            continue;
        }
        T val;
        // Check if we need to change head_
        if (UNLIKELY(bufferIndex == 0 && popIndex != 0)) {
            // We should move popIndex_ first to occupy the current popIndex
            if (UNLIKELY(!TryMovePopIndex(popIndex))) {
                // If we cannot occupy it, we should retry
                continue;
            }
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Otherwise popIndex may have been moved past a node boundary while head_ is still the old one; check for that
        if (UNLIKELY(currentHead->id < nodeId)) {
            // Help to change head_ to the new one and return its first element
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Otherwise we try to occupy the index
        if (UNLIKELY(!TryMovePopIndex(popIndex))) {
            // If we cannot occupy it, we should retry
            continue;
        }
        return std::move(currentHead->buffer[bufferIndex]);
    }
}

}  // namespace ark::taskmanager::internal

#endif  // LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H