• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_RING_QUEUE_H_
18 #define MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_RING_QUEUE_H_
19 
20 #include <atomic>
21 #include <array>
22 #include <cstddef>
23 #include <thread>
24 
25 namespace mindspore {
26 // A simple ring buffer (or circular queue) with atomic operations for
27 // thread-safe enqueue, dequeue, and check for emptiness.
28 // RingQueue is only applicable to single-producer and single-consumer scenarios.
// Implementation notes:
// - One slot is always left unused so that "full" ((tail + 1) % Capacity == head) can be told
//   apart from "empty" (head == tail); the queue therefore holds at most Capacity - 1 elements.
// - Enqueue(), Dequeue() and Head() never fail: they busy-wait until the queue is no longer
//   full (Enqueue) or no longer empty (Dequeue, Head).
// - T must be default-constructible and copy-assignable.
template <typename T, std::size_t Capacity>
class RingQueue {
  // Capacity == 0 would make the modulo below undefined, and Capacity == 1 would make the queue
  // permanently "full" so that Enqueue() spins forever.
  static_assert(Capacity >= 2, "RingQueue requires Capacity >= 2 because one slot is always kept free.");

 public:
  RingQueue() : head_(0), tail_(0) {}

  // Appends a copy of `value` at the tail. Spins while the queue is full.
  // Intended to be called by the producer side only.
  void Enqueue(const T &value) {
    // tail_ is only stored to by Enqueue(), so a relaxed load of our own index is enough.
    const std::size_t current_tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next_tail = (current_tail + 1) % Capacity;

    // Queue is full: wait for Dequeue() to advance head_. The acquire load pairs with the
    // release store of head_ in Dequeue().
    while (next_tail == head_.load(std::memory_order_acquire)) {
    }

    buffer_[current_tail] = value;
    // Publish the element: the release store makes the slot write above visible to a thread
    // that acquire-loads tail_.
    tail_.store(next_tail, std::memory_order_release);
  }

  // Removes the element at the head. Spins while the queue is empty.
  // Intended to be called by the consumer side only.
  void Dequeue() {
    // head_ is only stored to by Dequeue(), so a relaxed load of our own index is enough.
    const std::size_t current_head = head_.load(std::memory_order_relaxed);
    while (current_head == tail_.load(std::memory_order_acquire)) {
    }

    // Free memory when task is finished: overwrite the slot with a value-initialized T so that
    // anything the element owned is released now rather than when the slot is next reused.
    // T{} is null/empty for pointer-like types (same effect as assigning nullptr) and, unlike
    // `= nullptr`, also compiles for element types that are not assignable from nullptr.
    buffer_[current_head] = T{};
    head_.store((current_head + 1) % Capacity, std::memory_order_release);
  }

  // Returns a reference to the element at the head without removing it. Spins while the queue
  // is empty. The reference refers to the internal slot, which Dequeue() resets.
  const T &Head() {
    const std::size_t current_head = head_.load(std::memory_order_acquire);
    while (current_head == tail_.load(std::memory_order_acquire)) {
    }
    return buffer_[current_head];
  }

  // Returns true when head and tail indices are equal, i.e. no element is currently stored.
  bool IsEmpty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }

 private:
  // Element storage; slot indices wrap around modulo Capacity.
  std::array<T, Capacity> buffer_;
  // CPU cache line size is 64. Aligning each index to 64 bytes places head_ and tail_ on
  // different 64-byte boundaries.
  // Index of the next element to read; stored to by Dequeue().
  alignas(64) std::atomic<std::size_t> head_;
  // Index of the next slot to write; stored to by Enqueue().
  alignas(64) std::atomic<std::size_t> tail_;
};
70 }  // namespace mindspore
71 
72 #endif  // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_RING_QUEUE_H_
73