/*
 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H

#include "libpandabase/macros.h"
#include "libpandabase/utils/math_helpers.h"
#include "coherency_line_size.h"

#include <array>
#include <atomic>
#include <cstdint>
#include <memory>

namespace ark::taskmanager::internal {

/**
 * @brief SPSCLockFreeQueue is a single producer, single consumer lock free queue.
 * @tparam T: Type of class you want to store in queue
 * @tparam Allocator: Type of allocator that will be used to allocate nodes
 */
32 template <class T, class Allocator>
33 class SPSCLockFreeQueue {
34     static constexpr size_t QUEUE_NODE_SIZE = 1UL << 5U;
35     static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
36     static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1;
37     static_assert(QUEUE_NODE_MASK > 0);
38 
39     struct QueueNode {
40         std::array<T, QUEUE_NODE_SIZE> buffer;
41         std::atomic<QueueNode *> next = nullptr;
42     };
43 
44     using QueueNodeAllocatorType = typename std::allocator_traits<Allocator>::template rebind_alloc<QueueNode>;
45     template <class U, class OtherAllocator>
46     friend class SPSCLockFreeQueue;
47 
48 public:
SPSCLockFreeQueue()49     SPSCLockFreeQueue()
50     {
51         auto *node = GetNewQueueNode();
52         // Atomic with relaxed order reason: no order requirement
53         head_.store(node, std::memory_order_relaxed);
54         // Atomic with relaxed order reason: no order requirement
55         tail_.store(node, std::memory_order_relaxed);
56     }
~SPSCLockFreeQueue()57     ~SPSCLockFreeQueue()
58     {
59         ASSERT(pushIndex_ == popIndex_);
60         ASSERT(head_ == tail_);
61         // Atomic with relaxed order reason: no order requirement
62         auto head = head_.load(std::memory_order_relaxed);
63         DeleteQueueNode(head);
64     }
65 
66     NO_COPY_SEMANTIC(SPSCLockFreeQueue);
67     NO_MOVE_SEMANTIC(SPSCLockFreeQueue);
68 
Push(T && val)69     void Push(T &&val)
70     {
71         // Atomic with relaxed order reason: gets local variable
72         auto *tail = tail_.load(std::memory_order_relaxed);
73         // Atomic with relaxed order reason: gets local variable
74         auto pushIndex = pushIndex_.load(std::memory_order_relaxed);
75         if UNLIKELY (GetNodeIndex(pushIndex) == 0) {
76             auto *node = GetNewQueueNode();
77             // Atomic with relaxed order reason: set in local variable
78             tail->next.store(node, std::memory_order_relaxed);
79             node->buffer[0] = std::move(val);
80             // Atomic with relaxed order reason: set in local variable
81             tail_.store(node, std::memory_order_relaxed);
82         } else {
83             tail->buffer[GetNodeIndex(pushIndex)] = std::move(val);
84         }
85         // Atomic with release order reason: other threads should see correct value
86         pushIndex_.store(pushIndex + 1U, std::memory_order_release);
87     }
88 
TryPop(T * val)89     bool TryPop(T *val)
90     {
91         ASSERT(val != nullptr);
92         // Atomic with relaxed order reason: gets local variable
93         auto *head = head_.load(std::memory_order_relaxed);
94         // Atomic With relaxed order reason: gets local variable
95         auto popIndex = popIndex_.load(std::memory_order_relaxed);
96         // Atomic with acquire order reason: need observe on pushes local variables
97         auto pushIndex = pushIndex_.load(std::memory_order_acquire);
98         if (popIndex == pushIndex) {
99             return false;
100         }
101 
102         if UNLIKELY (GetNodeIndex(popIndex) == 0) {
103             // Atomic with relaxed order reason: gets local variable
104             auto *nextHead = head->next.load(std::memory_order_relaxed);
105             ASSERT(nextHead != nullptr);
106             // Atomic with relaxed order reason: set in local variable
107             head_.store(nextHead, std::memory_order_relaxed);
108             DeleteQueueNode(head);
109             *val = std::move(nextHead->buffer[0]);
110         } else {
111             *val = std::move(head->buffer[GetNodeIndex(popIndex)]);
112         }
113         ASSERT(popIndex != SIZE_MAX);
114         // Atomic with relaxed order reason: set in local variable
115         popIndex_.store(popIndex + 1, std::memory_order_relaxed);
116 
117         return true;
118     }
119 
Pop()120     T Pop()
121     {
122         T val;
123         while (!TryPop(&val)) {
124         }
125         return val;
126     }
127 
IsEmpty()128     bool inline IsEmpty() const
129     {
130         return Size() == 0;
131     }
132 
Size()133     size_t inline Size() const
134     {
135         // Atomic with relaxed order reason: gets correct value
136         auto pushIndex = pushIndex_.load(std::memory_order_relaxed);
137         // Atomic with relaxed order reason: gets correct value
138         auto popIndex = popIndex_.load(std::memory_order_relaxed);
139         return pushIndex - popIndex;
140     }
141 
142 private:
GetNewQueueNode()143     QueueNode *GetNewQueueNode()
144     {
145         QueueNodeAllocatorType allocator;
146         auto *mem = allocator.allocate(1U);
147         ASSERT(mem != nullptr);
148         return new (mem) QueueNode;
149     }
150 
DeleteQueueNode(QueueNode * node)151     void DeleteQueueNode(QueueNode *node)
152     {
153         QueueNodeAllocatorType allocator;
154         std::allocator_traits<QueueNodeAllocatorType>::destroy(allocator, node);
155         allocator.deallocate(node, 1U);
156     }
157 
GetNodeIndex(size_t index)158     size_t GetNodeIndex(size_t index)
159     {
160         return index & QUEUE_NODE_MASK;
161     }
162 
163     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> head_ = {nullptr};
164     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> tail_ = {nullptr};
165 
166     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> popIndex_ = {1UL};
167     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> pushIndex_ = {1UL};
168 };

}  // namespace ark::taskmanager::internal

#endif  // LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H