• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
17 #define LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
18 
19 #include "libpandabase/macros.h"
20 #include "libpandabase/utils/math_helpers.h"
21 #include "coherency_line_size.h"
22 #include <array>
23 #include <atomic>
24 #include <optional>
25 
26 namespace ark::taskmanager::internal {
27 
28 /**
29  * @brief SPSCLockFreeQueue is single producer, single consumer lock free queue.
30  * @tparam T: Type of class you want ot store in queue
31  * @tparam Allocator: Type of allocator that will be used to allocate nodes
32  */
33 template <class T, class Allocator>
34 class SPSCLockFreeQueue {
35     static constexpr size_t QUEUE_NODE_SIZE = 1UL << 5U;
36     static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
37     static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1;
38     static_assert(QUEUE_NODE_MASK > 0);
39 
40     struct QueueNode {
41         std::array<T, QUEUE_NODE_SIZE> buffer;
42         std::atomic<QueueNode *> next = nullptr;
43     };
44 
45     using QueueNodeAllocatorType = typename Allocator::template rebind<QueueNode>::other;
46     template <class U, class OtherAllocator>
47     friend class SPSCLockFreeQueue;
48 
49 public:
SPSCLockFreeQueue()50     SPSCLockFreeQueue()
51     {
52         auto *node = GetNewQueueNode();
53         // Atomic with release order reason: other threads should see correct value
54         head_.store(node, std::memory_order_release);
55         // Atomic with release order reason: other threads should see correct value
56         tail_.store(node, std::memory_order_release);
57     }
58     NO_COPY_SEMANTIC(SPSCLockFreeQueue);
59     NO_MOVE_SEMANTIC(SPSCLockFreeQueue);
60 
Push(T && val)61     void Push(T &&val)
62     {
63         // Atomic with acquire order reason: gets correct value
64         auto *tail = tail_.load(std::memory_order_acquire);
65         // Atomic with acquire order reason: gets correct value
66         auto pushIndex = pushIndex_.load(std::memory_order_acquire);
67         if (UNLIKELY(GetNodeIndex(pushIndex) == 0 && pushIndex != 0)) {
68             auto *node = GetNewQueueNode();
69             // Atomic with release order reason: other threads should see correct value
70             tail->next.store(node, std::memory_order_release);
71             node->buffer[0] = std::move(val);
72             // Atomic with release order reason: other threads should see correct value
73             tail_.store(node, std::memory_order_release);
74         } else {
75             tail->buffer[GetNodeIndex(pushIndex)] = std::move(val);
76         }
77         ASSERT(pushIndex != SIZE_MAX);
78         // Atomic with release order reason: other threads should see correct value
79         pushIndex_.store(pushIndex + 1U, std::memory_order_release);
80     }
81 
Pop()82     std::optional<T> Pop()
83     {
84         // Atomic with acquire order reason: gets correct value
85         auto *head = head_.load(std::memory_order_acquire);
86         // Atomic With acquire order reason: gets correct value
87         auto pushIndex = pushIndex_.load(std::memory_order_acquire);
88         // Atomic with acquire order reason: gets correct value
89         auto popIndex = popIndex_.load(std::memory_order_acquire);
90         if (popIndex == pushIndex) {
91             return std::nullopt;
92         }
93 
94         T val;
95         if (UNLIKELY(GetNodeIndex(popIndex) == 0 && popIndex != 0)) {
96             // Atomic with acquire order reason: gets correct value
97             auto *nextHead = head->next.load(std::memory_order_acquire);
98             ASSERT(nextHead != nullptr);
99             // Atomic with release order reason: other threads should see correct value
100             head_.store(nextHead, std::memory_order_release);
101             DeleteQueueNode(head);
102             val = std::move(nextHead->buffer[0]);
103         } else {
104             val = std::move(head->buffer[GetNodeIndex(popIndex)]);
105         }
106         ASSERT(popIndex != SIZE_MAX);
107         // Atomic with release order reason: other threads should see correct value
108         popIndex_.store(popIndex + 1, std::memory_order_release);
109 
110         return val;
111     }
112 
IsEmpty()113     bool inline IsEmpty() const
114     {
115         return Size() == 0;
116     }
117 
Size()118     size_t inline Size() const
119     {
120         // Atomic with acquire order reason: gets correct value
121         auto popIndex = popIndex_.load(std::memory_order_acquire);
122         // Atomic with acquire order reason: gets correct value
123         auto pushIndex = pushIndex_.load(std::memory_order_acquire);
124         return pushIndex - popIndex;
125     }
126 
~SPSCLockFreeQueue()127     ~SPSCLockFreeQueue()
128     {
129         ASSERT(pushIndex_ == popIndex_);
130         ASSERT(head_ == tail_);
131         // Atomic with acquire order reason: gets correct value
132         auto head = head_.load(std::memory_order_acquire);
133         DeleteQueueNode(head);
134     }
135 
136 private:
GetNewQueueNode()137     QueueNode *GetNewQueueNode()
138     {
139         QueueNodeAllocatorType allocator;
140         auto *mem = allocator.allocate(1U);
141         ASSERT(mem != nullptr);
142         return new (mem) QueueNode;
143     }
144 
DeleteQueueNode(QueueNode * node)145     void DeleteQueueNode(QueueNode *node)
146     {
147         QueueNodeAllocatorType allocator;
148         std::allocator_traits<QueueNodeAllocatorType>::destroy(allocator, node);
149         allocator.deallocate(node, 1U);
150     }
151 
GetNodeIndex(size_t index)152     size_t GetNodeIndex(size_t index)
153     {
154         return index & QUEUE_NODE_MASK;
155     }
156 
157     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> head_ = {nullptr};
158     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> tail_ = {nullptr};
159 
160     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> popIndex_ = {0UL};
161     alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> pushIndex_ = {0UL};
162 };
163 
164 }  // namespace ark::taskmanager::internal
165 
166 #endif  // LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
167