/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
#define LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H

#include "libpandabase/macros.h"
#include "libpandabase/utils/math_helpers.h"
#include "coherency_line_size.h"
#include <array>
#include <atomic>
#include <cstdint>
#include <memory>
#include <new>
#include <optional>

namespace ark::taskmanager::internal {

/**
 * @brief SPSCLockFreeQueue is a single-producer, single-consumer lock-free queue.
 * It is implemented as a singly linked list of fixed-size array nodes; push and pop
 * positions are monotonically increasing counters, so a node boundary is crossed
 * whenever a counter is a non-zero multiple of QUEUE_NODE_SIZE.
 * @tparam T: Type of the values you want to store in the queue
 * @tparam Allocator: Type of allocator that will be used to allocate nodes
 */
template <class T, class Allocator>
class SPSCLockFreeQueue {
    static constexpr size_t QUEUE_NODE_SIZE = 1UL << 5U;
    static_assert(ark::helpers::math::IsPowerOfTwo(QUEUE_NODE_SIZE));
    static constexpr size_t QUEUE_NODE_MASK = QUEUE_NODE_SIZE - 1;
    static_assert(QUEUE_NODE_MASK > 0);

    struct QueueNode {
        std::array<T, QUEUE_NODE_SIZE> buffer;
        std::atomic<QueueNode *> next = nullptr;
    };

    using QueueNodeAllocatorType = typename Allocator::template rebind<QueueNode>::other;
    template <class U, class OtherAllocator>
    friend class SPSCLockFreeQueue;

public:
    SPSCLockFreeQueue()
    {
        // Start with a single empty node shared by head_ and tail_
        auto *node = GetNewQueueNode();
        // Atomic with release order reason: other threads should see correct value
        head_.store(node, std::memory_order_release);
        // Atomic with release order reason: other threads should see correct value
        tail_.store(node, std::memory_order_release);
    }
    NO_COPY_SEMANTIC(SPSCLockFreeQueue);
    NO_MOVE_SEMANTIC(SPSCLockFreeQueue);

    void Push(T &&val)
    {
        // Atomic with acquire order reason: gets correct value
        auto *tail = tail_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: gets correct value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        if (UNLIKELY(GetNodeIndex(pushIndex) == 0 && pushIndex != 0)) {
            // The tail node is full: allocate a new node, link it and fill its first slot
            auto *node = GetNewQueueNode();
            // Atomic with release order reason: other threads should see correct value
            tail->next.store(node, std::memory_order_release);
            node->buffer[0] = std::move(val);
            // Atomic with release order reason: other threads should see correct value
            tail_.store(node, std::memory_order_release);
        } else {
            tail->buffer[GetNodeIndex(pushIndex)] = std::move(val);
        }
        ASSERT(pushIndex != SIZE_MAX);
        // Atomic with release order reason: other threads should see correct value
        pushIndex_.store(pushIndex + 1U, std::memory_order_release);
    }

    std::optional<T> Pop()
    {
        // Atomic with acquire order reason: gets correct value
        auto *head = head_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: gets correct value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: gets correct value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        if (popIndex == pushIndex) {
            return std::nullopt;
        }

        T val;
        if (UNLIKELY(GetNodeIndex(popIndex) == 0 && popIndex != 0)) {
            // The head node is exhausted: advance to the next node and free the old head
            // Atomic with acquire order reason: gets correct value
            auto *nextHead = head->next.load(std::memory_order_acquire);
            ASSERT(nextHead != nullptr);
            // Atomic with release order reason: other threads should see correct value
            head_.store(nextHead, std::memory_order_release);
            DeleteQueueNode(head);
            val = std::move(nextHead->buffer[0]);
        } else {
            val = std::move(head->buffer[GetNodeIndex(popIndex)]);
        }
        ASSERT(popIndex != SIZE_MAX);
        // Atomic with release order reason: other threads should see correct value
        popIndex_.store(popIndex + 1U, std::memory_order_release);

        return val;
    }

    inline bool IsEmpty() const
    {
        return Size() == 0;
    }

    inline size_t Size() const
    {
        // Atomic with acquire order reason: gets correct value
        auto popIndex = popIndex_.load(std::memory_order_acquire);
        // Atomic with acquire order reason: gets correct value
        auto pushIndex = pushIndex_.load(std::memory_order_acquire);
        return pushIndex - popIndex;
    }

    ~SPSCLockFreeQueue()
    {
        // The queue must be fully drained before destruction, so only one node remains
        ASSERT(pushIndex_ == popIndex_);
        ASSERT(head_ == tail_);
        // Atomic with acquire order reason: gets correct value
        auto head = head_.load(std::memory_order_acquire);
        DeleteQueueNode(head);
    }

private:
    QueueNode *GetNewQueueNode()
    {
        QueueNodeAllocatorType allocator;
        auto *mem = allocator.allocate(1U);
        ASSERT(mem != nullptr);
        return new (mem) QueueNode;
    }

    void DeleteQueueNode(QueueNode *node)
    {
        QueueNodeAllocatorType allocator;
        std::allocator_traits<QueueNodeAllocatorType>::destroy(allocator, node);
        allocator.deallocate(node, 1U);
    }

    size_t GetNodeIndex(size_t index)
    {
        return index & QUEUE_NODE_MASK;
    }

    // Pointers and counters live on separate cache lines to avoid false sharing
    // between the producer and consumer threads
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> head_ = {nullptr};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<QueueNode *> tail_ = {nullptr};

    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> popIndex_ = {0UL};
    alignas(ark::COHERENCY_LINE_SIZE) std::atomic<size_t> pushIndex_ = {0UL};
};

}  // namespace ark::taskmanager::internal

#endif  // LIBPANDABASE_TASKMANAGER_UTILS_SP_SC_LOCK_FREE_QUEUE_H
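
// A minimal usage sketch (an illustrative addition, not part of the original header).
// It assumes a C++17 toolchain, since the rebind<...>::other member of std::allocator
// used by QueueNodeAllocatorType was removed in C++20; any std::allocator-compatible
// allocator should work. Per the SPSC contract, exactly one thread may call Push and
// exactly one thread may call Pop, and the queue must be drained before destruction.
//
//     #include <thread>
//
//     using ark::taskmanager::internal::SPSCLockFreeQueue;
//
//     SPSCLockFreeQueue<size_t, std::allocator<size_t>> queue;
//     std::thread producer([&queue] {
//         for (size_t i = 0; i < 1000U; ++i) {
//             queue.Push(size_t(i));  // Push takes an rvalue
//         }
//     });
//     std::thread consumer([&queue] {
//         for (size_t received = 0; received < 1000U;) {
//             if (auto val = queue.Pop()) {  // std::nullopt while the queue is momentarily empty
//                 ++received;
//             }
//         }
//     });
//     producer.join();
//     consumer.join();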