/**
 * Copyright 2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_RING_QUEUE_H_
#define MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_RING_QUEUE_H_

#include <atomic>
#include <array>
#include <cstddef>
#include <thread>

namespace mindspore {
// A simple ring buffer (or circular queue) with atomic operations for
// thread-safe enqueue, dequeue, and check for emptiness.
// RingQueue is only applicable to single-producer and single-consumer scenarios.
//
// One slot is always left unused so that "full" (tail + 1 == head) can be told
// apart from "empty" (tail == head); the queue therefore holds at most
// Capacity - 1 elements at a time.
//
// T must be default-constructible and copy-assignable.
template <typename T, std::size_t Capacity>
class RingQueue {
  // Capacity == 0 would divide by zero in the index arithmetic, and with
  // Capacity == 1 the "full" condition always holds, so Enqueue would never return.
  static_assert(Capacity >= 2, "RingQueue requires Capacity >= 2 (one slot is always kept empty)");

 public:
  // Creates an empty queue.
  RingQueue() : head_(0), tail_(0) {}

  // Copies `value` into the queue. Called by the producer thread only.
  // Busy-waits (spins) while the queue is full.
  void Enqueue(const T &value) {
    // Only the producer writes tail_, so a relaxed load of its own index is enough.
    std::size_t current_tail = tail_.load(std::memory_order_relaxed);
    std::size_t next_tail = (current_tail + 1) % Capacity;

    // Spin until the consumer has advanced head_ and freed a slot.
    while (next_tail == head_.load(std::memory_order_acquire)) {
    }

    buffer_[current_tail] = value;
    // The release store publishes the slot write to the consumer's acquire load of tail_.
    tail_.store(next_tail, std::memory_order_release);
  }

  // Removes the oldest element. Called by the consumer thread only.
  // Busy-waits (spins) while the queue is empty.
  void Dequeue() {
    // Only the consumer writes head_, so a relaxed load of its own index is enough.
    std::size_t current_head = head_.load(std::memory_order_relaxed);
    while (current_head == tail_.load(std::memory_order_acquire)) {
    }

    // Free memory when task is finished: reset the slot to a default-constructed
    // value so anything the element owned (e.g. through a smart pointer) is
    // released now rather than when the slot is next overwritten. T{} is nullptr
    // for pointer-like types and also works for non-pointer element types.
    buffer_[current_head] = T{};
    // The release store hands the slot back to the producer's acquire load of head_.
    head_.store((current_head + 1) % Capacity, std::memory_order_release);
  }

  // Returns a reference to the oldest element without removing it. Called by
  // the consumer thread only. Busy-waits (spins) while the queue is empty.
  // The reference refers to the queue's internal slot; Dequeue() resets that slot.
  const T &Head() {
    std::size_t current_head = head_.load(std::memory_order_acquire);
    while (current_head == tail_.load(std::memory_order_acquire)) {
    }
    return buffer_[current_head];
  }

  // Returns true when head and tail indices coincide, i.e. no element is queued
  // at the moment both indices are read.
  bool IsEmpty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }

 private:
  // Element storage; indices always stay in [0, Capacity).
  std::array<T, Capacity> buffer_;
  // CPU cache line size is 64.
  // Each index gets its own 64-byte-aligned location so head_ and tail_ do not
  // share a cache line.
  alignas(64) std::atomic<std::size_t> head_;  // Index of the oldest element; advanced by Dequeue().
  alignas(64) std::atomic<std::size_t> tail_;  // Index of the next free slot; advanced by Enqueue().
};
}  // namespace mindspore

#endif  // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_RING_QUEUE_H_