/*
 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H

#include "libpandabase/macros.h"
#include "libpandabase/utils/math_helpers.h"
#include "coherency_line_size.h"
#include <array>
#include <atomic>
#include <cstdint>
#include <memory>

namespace ark::taskmanager::internal {

/**
 * @brief SPSCLockFreeQueue is single producer, single consumer lock free queue.
29 * @tparam T: Type of class you want ot store in queue 30 * @tparam Allocator: Type of allocator that will be used to allocate nodes 31 */ 32 template <class T, class Allocator> 33 class SPSCLockFreeQueue { 34 static constexpr size_t QUEUE_NODE_SIZE = 1UL << 5U; 35 static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE)); 36 static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1; 37 static_assert(QUEUE_NODE_MASK > 0); 38 39 struct QueueNode { 40 std::array<T, QUEUE_NODE_SIZE> buffer; 41 std::atomic<QueueNode *> next = nullptr; 42 }; 43 44 using QueueNodeAllocatorType = typename std::allocator_traits<Allocator>::template rebind_alloc<QueueNode>; 45 template <class U, class OtherAllocator> 46 friend class SPSCLockFreeQueue; 47 48 public: SPSCLockFreeQueue()49 SPSCLockFreeQueue() 50 { 51 auto *node = GetNewQueueNode(); 52 // Atomic with relaxed order reason: no order requirement 53 head_.store(node, std::memory_order_relaxed); 54 // Atomic with relaxed order reason: no order requirement 55 tail_.store(node, std::memory_order_relaxed); 56 } ~SPSCLockFreeQueue()57 ~SPSCLockFreeQueue() 58 { 59 ASSERT(pushIndex_ == popIndex_); 60 ASSERT(head_ == tail_); 61 // Atomic with relaxed order reason: no order requirement 62 auto head = head_.load(std::memory_order_relaxed); 63 DeleteQueueNode(head); 64 } 65 66 NO_COPY_SEMANTIC(SPSCLockFreeQueue); 67 NO_MOVE_SEMANTIC(SPSCLockFreeQueue); 68 Push(T && val)69 void Push(T &&val) 70 { 71 // Atomic with relaxed order reason: gets local variable 72 auto *tail = tail_.load(std::memory_order_relaxed); 73 // Atomic with relaxed order reason: gets local variable 74 auto pushIndex = pushIndex_.load(std::memory_order_relaxed); 75 if UNLIKELY (GetNodeIndex(pushIndex) == 0) { 76 auto *node = GetNewQueueNode(); 77 // Atomic with relaxed order reason: set in local variable 78 tail->next.store(node, std::memory_order_relaxed); 79 node->buffer[0] = std::move(val); 80 // Atomic with relaxed order reason: set in local variable 81 
tail_.store(node, std::memory_order_relaxed); 82 } else { 83 tail->buffer[GetNodeIndex(pushIndex)] = std::move(val); 84 } 85 // Atomic with release order reason: other threads should see correct value 86 pushIndex_.store(pushIndex + 1U, std::memory_order_release); 87 } 88 TryPop(T * val)89 bool TryPop(T *val) 90 { 91 ASSERT(val != nullptr); 92 // Atomic with relaxed order reason: gets local variable 93 auto *head = head_.load(std::memory_order_relaxed); 94 // Atomic With relaxed order reason: gets local variable 95 auto popIndex = popIndex_.load(std::memory_order_relaxed); 96 // Atomic with acquire order reason: need observe on pushes local variables 97 auto pushIndex = pushIndex_.load(std::memory_order_acquire); 98 if (popIndex == pushIndex) { 99 return false; 100 } 101 102 if UNLIKELY (GetNodeIndex(popIndex) == 0) { 103 // Atomic with relaxed order reason: gets local variable 104 auto *nextHead = head->next.load(std::memory_order_relaxed); 105 ASSERT(nextHead != nullptr); 106 // Atomic with relaxed order reason: set in local variable 107 head_.store(nextHead, std::memory_order_relaxed); 108 DeleteQueueNode(head); 109 *val = std::move(nextHead->buffer[0]); 110 } else { 111 *val = std::move(head->buffer[GetNodeIndex(popIndex)]); 112 } 113 ASSERT(popIndex != SIZE_MAX); 114 // Atomic with relaxed order reason: set in local variable 115 popIndex_.store(popIndex + 1, std::memory_order_relaxed); 116 117 return true; 118 } 119 Pop()120 T Pop() 121 { 122 T val; 123 while (!TryPop(&val)) { 124 } 125 return val; 126 } 127 IsEmpty()128 bool inline IsEmpty() const 129 { 130 return Size() == 0; 131 } 132 Size()133 size_t inline Size() const 134 { 135 // Atomic with relaxed order reason: gets correct value 136 auto pushIndex = pushIndex_.load(std::memory_order_relaxed); 137 // Atomic with relaxed order reason: gets correct value 138 auto popIndex = popIndex_.load(std::memory_order_relaxed); 139 return pushIndex - popIndex; 140 } 141 142 private: GetNewQueueNode()143 QueueNode 
*GetNewQueueNode() 144 { 145 QueueNodeAllocatorType allocator; 146 auto *mem = allocator.allocate(1U); 147 ASSERT(mem != nullptr); 148 return new (mem) QueueNode; 149 } 150 DeleteQueueNode(QueueNode * node)151 void DeleteQueueNode(QueueNode *node) 152 { 153 QueueNodeAllocatorType allocator; 154 std::allocator_traits<QueueNodeAllocatorType>::destroy(allocator, node); 155 allocator.deallocate(node, 1U); 156 } 157 GetNodeIndex(size_t index)158 size_t GetNodeIndex(size_t index) 159 { 160 return index & QUEUE_NODE_MASK; 161 } 162 163 alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> head_ = {nullptr}; 164 alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> tail_ = {nullptr}; 165 166 alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> popIndex_ = {1UL}; 167 alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> pushIndex_ = {1UL}; 168 }; 169 170 } // namespace ark::taskmanager::internal 171 172 #endif // LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H 173