• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_EMBEDDING_CACHE_BLOCKING_QUEUE_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_EMBEDDING_CACHE_BLOCKING_QUEUE_H_
19 
20 #include <memory>
21 #include <mutex>
22 #include <condition_variable>
23 #include "utils/ms_utils.h"
24 
25 namespace mindspore {
26 namespace distributed {
27 /**
28  * @brief This class implements a generic blocking queue and could be used for the multi-producer, multi-consumer case.
29  * For performance, the queue element is a pointer, and the user needs to do their own memory management
30  * (pointer lifetime).
31  */
32 template <typename T>
33 class BlockingQueue {
34  public:
BlockingQueue(size_t capacity)35   explicit BlockingQueue(size_t capacity) : capacity_(capacity) { elements_ = std::make_unique<T *[]>(capacity_); }
36   ~BlockingQueue() = default;
37 
38   /**
39    * @brief Push new data to tail of queue.
40    * @param[in] `data`: The pointer to new element to enqueue.
41    */
Push(T * data)42   void Push(T *data) {
43     std::unique_lock<std::mutex> lock(mtx_);
44     while (Full()) {
45       if (closed_) {
46         return;
47       }
48 
49       full_cv_.wait(lock);
50     }
51 
52     elements_[tail_] = data;
53     tail_ = (tail_ + 1) % capacity_;
54     ++size_;
55 
56     empty_cv_.notify_one();
57   }
58 
59   /**
60    * @brief Get the first element(at head position in queue) of the queue and removes it from the queue.
61    * @return The element which need to dequeue.
62    */
Pop()63   T *Pop() {
64     std::unique_lock<std::mutex> lock(mtx_);
65     while (Empty()) {
66       if (closed_) {
67         return nullptr;
68       }
69 
70       empty_cv_.wait(lock);
71     }
72 
73     auto pop_value = elements_[head_];
74     head_ = (head_ + 1) % capacity_;
75     --size_;
76 
77     full_cv_.notify_one();
78     return pop_value;
79   }
80 
81   /**
82    * @brief Check whether there is no element in queue.
83    * @return Whether there is no element in queue.
84    */
Empty()85   bool Empty() { return size_ == 0; }
86 
87   /**
88    * @brief Check whether the number of queue elements reaches capacity.
89    * @return Whether the number of queue elements reaches capacity.
90    */
Full()91   bool Full() { return size_ == capacity_; }
92 
93   /**
94    * @brief Close the queue and stop push and pop operations.
95    */
Close()96   void Close() {
97     std::unique_lock<std::mutex> lock(mtx_);
98     if (!closed_) {
99       closed_ = true;
100       full_cv_.notify_all();
101       empty_cv_.notify_all();
102     }
103   }
104 
105  private:
106   DISABLE_COPY_AND_ASSIGN(BlockingQueue);
107 
108   // The maximum capacity of queue.
109   size_t capacity_;
110   // The element number in queue.
111   size_t size_{0};
112 
113   // The buffer used to record elements in the queue.
114   std::unique_ptr<T *[]> elements_;
115 
116   // The cursor used to point the head position.
117   size_t head_{0};
118   // The cursor used to point the tail position.
119   size_t tail_{0};
120 
121   // The flag indicates whether the queue is closed.
122   bool closed_{false};
123 
124   // A lock used to secure the access of queue elements.
125   std::mutex mtx_;
126   // Used to block the push operations when queue is full.
127   std::condition_variable full_cv_;
128   // Used to block the pop operations when queue is empty.
129   std::condition_variable empty_cv_;
130 };
131 }  // namespace distributed
132 }  // namespace mindspore
133 #endif  // MINDSPORE_CCSRC_DISTRIBUTED_EMBEDDING_CACHE_BLOCKING_QUEUE_H_
134