/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H

#include "coherency_line_size.h"
#include "libpandabase/utils/math_helpers.h"
#include <optional>
#include <atomic>
#include <array>
#include <vector>

namespace ark::taskmanager::internal {

static constexpr size_t SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE = 1UL << 5U;

/**
 * @brief SPMCLockFreeQueue is a single-producer, multiple-consumer lock-free queue.
 * @tparam T: Type of the values you want to store in the queue
 * @tparam QUEUE_NODE_SIZE: Size of one node in the queue. Default value: SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE
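 *
 * Example usage (an illustrative sketch; the surrounding thread setup is assumed, not part of this header):
 * @code
 * SPMCLockFreeQueue<int> queue;
 * // Each consumer thread registers once and keeps the returned id:
 * size_t consumerId = queue.RegisterConsumer();
 * // The single producer thread:
 * queue.Push(42);
 * // Any registered consumer thread:
 * std::optional<int> value = queue.Pop(consumerId);
 * if (value.has_value()) {
 *     // process *value
 * }
 * @endcode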
 */
template <class T, size_t QUEUE_NODE_SIZE = SP_MC_LOCK_FREE_QUEUE_DEFAULT_QUEUE_NODE_SIZE>
class SPMCLockFreeQueue {
    static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
    static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1U;
    static_assert(QUEUE_NODE_MASK > 0);
    static constexpr size_t QUEUE_NODE_SHIFT = ark::helpers::math::GetIntLog2(QUEUE_NODE_SIZE);

    static constexpr size_t CONSUMER_MAX_COUNT = 1UL << 5U;
    static constexpr size_t DELETE_TRIGGER_PUSH_COUNT = QUEUE_NODE_SIZE;
    static_assert(ark::helpers::math::IsPowerOfTwo(DELETE_TRIGGER_PUSH_COUNT));
    static constexpr size_t DELETE_TRIGGER_PUSH_MASK = DELETE_TRIGGER_PUSH_COUNT - 1U;

    // NOLINTBEGIN(misc-non-private-member-variables-in-classes)
    template <class U, size_t NODE_SIZE>
    struct QueueNode {
        QueueNode() = default;
        ~QueueNode() = default;

        DEFAULT_COPY_SEMANTIC(QueueNode);
        DEFAULT_MOVE_SEMANTIC(QueueNode);

        size_t id {0};
        std::atomic<QueueNode *> next {nullptr};
        std::array<U, NODE_SIZE> buffer {};
    };
    // NOLINTEND(misc-non-private-member-variables-in-classes)

    using NodeType = QueueNode<T, QUEUE_NODE_SIZE>;
    using NodePtr = NodeType *;
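
    /**
     * @brief RAII guard that marks a consumer as being inside Pop by incrementing popUserCount_.
     * While any guard is alive, TryDeleteRetiredPtrs skips reclamation, so retired nodes stay
     * valid for in-flight readers.
     */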
    class PopUserScope {
    public:
        explicit PopUserScope(SPMCLockFreeQueue *queue) : queue_(queue)
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_add(1U, std::memory_order_acq_rel);
        }
        ~PopUserScope()
        {
            // Atomic with acq_rel order reason: other threads should see correct value
            queue_->popUserCount_.fetch_sub(1U, std::memory_order_acq_rel);
        }

        NO_COPY_SEMANTIC(PopUserScope);
        NO_MOVE_SEMANTIC(PopUserScope);

    private:
        SPMCLockFreeQueue *queue_ = nullptr;
    };

public:
    using ThreadId = size_t;
    using ValueType = T;

    SPMCLockFreeQueue();
    ~SPMCLockFreeQueue();

    NO_COPY_SEMANTIC(SPMCLockFreeQueue);
    NO_MOVE_SEMANTIC(SPMCLockFreeQueue);

    /**
     * @brief Registers a thread in the queue; must be called only once per thread.
     * @returns Unique id for the thread
     */
    size_t RegisterConsumer();

    /**
     * @brief Pushes a value into the queue. May allocate a new queue node.
     * @param val: value that should be pushed.
     */
    void Push(T &&val);

    /**
     * @brief Pops a value from the queue.
     * @param consumerId: unique consumer id obtained from the RegisterConsumer method
     * @returns a value if the queue has one, otherwise std::nullopt.
     */
    std::optional<T> Pop(size_t consumerId);

    /// @returns true if the queue is empty.
    inline bool IsEmpty() const;

    /// @returns the count of tasks inside the queue.
    inline size_t Size() const;

    /**
     * @brief Deletes all retired nodes. Use this method only if you know that no consumer
     * thread will call Pop until this method finishes.
     */
    void TryDeleteRetiredPtrs()
    {
        if (LIKELY(GetPopUserCount() == 0)) {
            // Atomic with acquire order reason: get the latest value
            auto currentCountOfThreads = consumerCount_.load(std::memory_order_acquire);
            for (size_t id = 0; id < currentCountOfThreads; id++) {
                auto &retiredPtrs = perConsumerRetiredPtrs_[id];
                for (auto node : retiredPtrs) {
                    delete node;
                }
                retiredPtrs.clear();
            }
        }
    }

private:
    inline size_t GetBufferIndex(size_t index) const
    {
        return index & QUEUE_NODE_MASK;
    }

    inline size_t GetNodeId(size_t index) const
    {
        return index >> QUEUE_NODE_SHIFT;
    }
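
    // Example: with QUEUE_NODE_SIZE == 32 (shift 5, mask 0x1F), index 70 maps to
    // node id 70 >> 5 == 2 and buffer index 70 & 0x1F == 6.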

    /// @returns the value of popUserCount_
    size_t GetPopUserCount() const
    {
        // Atomic with acquire order reason: getting correct value of popUserCount
        return popUserCount_.load(std::memory_order_acquire);
    }

    /**
     * @brief Loads an atomic pointer to a QueueNode
     * @param atomicPtr: reference to the pointer that should be loaded
     * @return correct value of atomicPtr
     */
    NodePtr LoadAtomicPtr(std::atomic<NodePtr> &atomicPtr)
    {
        // Atomic with acquire order reason: get the latest value
        return atomicPtr.load(std::memory_order_acquire);
    }

    /**
     * @brief Appends a pointer to the list of retired pointers so it can be deleted later
     * @param consumerId: id of the consumer that retires the pointer
     * @param ptr: pointer to the QueueNode that should be retired
     */
    void RetirePtr(size_t consumerId, NodePtr ptr)
    {
        auto &retiredPtrs = perConsumerRetiredPtrs_[consumerId];
        retiredPtrs.push_back(ptr);
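        // Deletion is deferred: TryDeleteRetiredPtrs frees these nodes later, and only while
        // no consumer is inside Pop, so in-flight readers never dereference a freed node.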
    }

    /**
     * @brief Tries to move popIndex_ to the next position.
     * @param popIndex: the value of popIndex_ that is owned by the thread
     * @returns true if popIndex == popIndex_ and the CAS succeeded, else false
     */
    inline bool TryMovePopIndex(size_t popIndex)
    {
        return popIndex_.compare_exchange_strong(
            popIndex, popIndex + 1U,
            // Atomic with acq_rel order reason: other threads should see correct value
            std::memory_order_acq_rel);
    }

    /**
     * @brief Tries to set the next head based on the current one. Also moves the old pointer to the retired list
     * @param currentHead: value of head that was loaded
     * @param val: reference to a T variable that receives the first element of the next node
     * @param consumerId: id of the consumer that wants to set the new head
     * @returns true if head was changed, otherwise returns false
     */
    bool CompareAndSetNextHead(NodePtr currentHead, T &val, size_t consumerId)
    {
        auto nextHead = LoadAtomicPtr(currentHead->next);
        ASSERT(nextHead != nullptr);
        // Atomic with acq_rel order reason: other threads should see correct value
        if (LIKELY(head_.compare_exchange_strong(currentHead, nextHead, std::memory_order_acq_rel))) {
            // Now we should correctly delete the old head.
            RetirePtr(consumerId, currentHead);
            // Finally we can return the first value of the new head
            val = std::move(nextHead->buffer[0]);
            return true;
        }
        return false;
    }

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t pushIndex_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popIndex_ {0};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> head_ {nullptr};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<NodePtr> tail_ {nullptr};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t popUserCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t consumerCount_ {0};
    alignas(ark::COHERENCY_LINE_SIZE) std::array<std::vector<NodePtr>, CONSUMER_MAX_COUNT> perConsumerRetiredPtrs_;
};

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::SPMCLockFreeQueue()
{
    for (size_t id = 0; id < CONSUMER_MAX_COUNT; id++) {
        perConsumerRetiredPtrs_[id] = {};
    }
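    // A single empty node serves as the initial buffer: head_ and tail_ both point to it
    // until the first node-boundary Push allocates a successor.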
    auto *node = new NodeType();
    // Atomic with release order reason: other threads should see correct value
    head_.store(node, std::memory_order_release);
    // Atomic with release order reason: other threads should see correct value
    tail_.store(node, std::memory_order_release);
}

template <class T, size_t QUEUE_NODE_SIZE>
SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::~SPMCLockFreeQueue()
{
    // Check that there are no elements left in the queue
    ASSERT(IsEmpty());
    ASSERT(GetPopUserCount() == 0);
    TryDeleteRetiredPtrs();
    // Atomic with acquire order reason: get the latest value
    auto head = head_.load(std::memory_order_acquire);
    delete head;
}

template <class T, size_t QUEUE_NODE_SIZE>
size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::RegisterConsumer()
{
    // Atomic with acq_rel order reason: other threads should see correct value
    auto id = consumerCount_.fetch_add(1UL, std::memory_order_acq_rel);
    ASSERT(id < CONSUMER_MAX_COUNT);
    return id;
}

template <class T, size_t QUEUE_NODE_SIZE>
inline size_t SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Size() const
{
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // The two indexes are read at different times; if pushIndex_ changed in between,
        // the snapshot (popIndex > pushIndex) is inconsistent, so retry
        // Atomic with acquire order reason: get the latest value
        if (UNLIKELY(pushIndex < popIndex && pushIndex != pushIndex_.load(std::memory_order_acquire))) {
            continue;
        }
        return pushIndex - popIndex;
    }
}

template <class T, size_t QUEUE_NODE_SIZE>
inline bool SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::IsEmpty() const
{
    return Size() == 0;
}

template <class T, size_t QUEUE_NODE_SIZE>
void SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Push(T &&val)
{
    // Atomic with acquire order reason: get the latest value
    auto pushIndex = pushIndex_.load(std::memory_order_acquire);

    // Atomic with acquire order reason: get the latest value
    auto currentTail = tail_.load(std::memory_order_acquire);
    auto bufferIndex = GetBufferIndex(pushIndex);
    if (UNLIKELY(bufferIndex == 0 && pushIndex != 0)) {
        // Creating a new node
        auto newNode = new NodeType();
        // The new id defines the order of nodes
        newNode->id = currentTail->id + 1UL;
        newNode->buffer[0] = std::move(val);
        // Atomic with release order reason: other threads should see correct value
        currentTail->next.store(newNode, std::memory_order_release);
        // Atomic with release order reason: other threads should see correct value
        tail_.store(newNode, std::memory_order_release);
    } else {
        // Otherwise we can write the new val into the buffer
        currentTail->buffer[bufferIndex] = std::move(val);
    }
    ASSERT(pushIndex != SIZE_MAX);
    // We have finished pushing, so we need to move pushIndex_
    // Atomic with release order reason: other threads should see correct value
    pushIndex_.store(pushIndex + 1UL, std::memory_order_release);
}
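
// Illustrative node-chain layout after two node-boundary pushes (with QUEUE_NODE_SIZE == 32):
//
//   head_ -> [id 0 | indexes 0..31] -> [id 1 | indexes 32..63] -> [id 2 | indexes 64..95] <- tail_
//
// Push only ever appends at tail_; Pop advances head_ one node at a time via CompareAndSetNextHead.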

template <class T, size_t QUEUE_NODE_SIZE>
std::optional<T> SPMCLockFreeQueue<T, QUEUE_NODE_SIZE>::Pop(size_t consumerId)
{
    // Atomic with acquire order reason: get the latest value
    ASSERT(consumerId < consumerCount_.load(std::memory_order_acquire));
    PopUserScope popUser(this);
    while (true) {
        // Atomic with acquire order reason: get the latest value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: get the latest value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // Check that we loaded a consistent pair of indexes
        if (UNLIKELY(pushIndex < popIndex)) {
            // Retry
            continue;
        }
        // If there are no tasks we can return nullopt
        if (UNLIKELY(pushIndex == popIndex)) {
            // Atomic with acquire order reason: get the latest value
            if (pushIndex == pushIndex_.load(std::memory_order_acquire)) {
                return std::nullopt;
            }
            continue;
        }
        auto bufferIndex = GetBufferIndex(popIndex);
        auto nodeId = GetNodeId(popIndex);
        auto *currentHead = LoadAtomicPtr(head_);
        // Check that the loaded indexes correspond to currentHead
        if (UNLIKELY(currentHead == nullptr)) {
            // Retry
            continue;
        }
        if (UNLIKELY(currentHead->id > nodeId)) {
            // Retry
            continue;
        }
        T val;
        // Check if we need to change head_
        if (UNLIKELY(bufferIndex == 0 && popIndex != 0)) {
            // We should move popIndex_ first to occupy the current popIndex
            if (UNLIKELY(!TryMovePopIndex(popIndex))) {
                // If we cannot occupy it we should retry
                continue;
            }
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Else we can assume that popIndex_ was moved but head_ may still be the old one, so we should check it
        if (UNLIKELY(currentHead->id < nodeId)) {
            // Help to change head_ to the new one and return its first element
            if (LIKELY(CompareAndSetNextHead(currentHead, val, consumerId))) {
                return val;
            }
            continue;
        }
        // Otherwise we try to occupy the index
        if (UNLIKELY(!TryMovePopIndex(popIndex))) {
            // If we cannot occupy it we should retry
            continue;
        }
        return std::move(currentHead->buffer[bufferIndex]);
    }
}

}  // namespace ark::taskmanager::internal

#endif  // LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_LOCK_FREE_QUEUE_H