1 /** 2 * Copyright 2023 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_EMBEDDING_CACHE_BLOCKING_QUEUE_H_ 18 #define MINDSPORE_CCSRC_DISTRIBUTED_EMBEDDING_CACHE_BLOCKING_QUEUE_H_ 19 20 #include <memory> 21 #include <mutex> 22 #include <condition_variable> 23 #include "utils/ms_utils.h" 24 25 namespace mindspore { 26 namespace distributed { 27 /** 28 * @brief This class implements a generic blocking queue and could be used for the multi-producer, multi-consumer case. 29 * For performance, the queue element is a pointer, and the user needs to do their own memory management 30 * (pointer lifetime). 31 */ 32 template <typename T> 33 class BlockingQueue { 34 public: BlockingQueue(size_t capacity)35 explicit BlockingQueue(size_t capacity) : capacity_(capacity) { elements_ = std::make_unique<T *[]>(capacity_); } 36 ~BlockingQueue() = default; 37 38 /** 39 * @brief Push new data to tail of queue. 40 * @param[in] `data`: The pointer to new element to enqueue. 41 */ Push(T * data)42 void Push(T *data) { 43 std::unique_lock<std::mutex> lock(mtx_); 44 while (Full()) { 45 if (closed_) { 46 return; 47 } 48 49 full_cv_.wait(lock); 50 } 51 52 elements_[tail_] = data; 53 tail_ = (tail_ + 1) % capacity_; 54 ++size_; 55 56 empty_cv_.notify_one(); 57 } 58 59 /** 60 * @brief Get the first element(at head position in queue) of the queue and removes it from the queue. 61 * @return The element which need to dequeue. 62 */ Pop()63 T *Pop() { 64 std::unique_lock<std::mutex> lock(mtx_); 65 while (Empty()) { 66 if (closed_) { 67 return nullptr; 68 } 69 70 empty_cv_.wait(lock); 71 } 72 73 auto pop_value = elements_[head_]; 74 head_ = (head_ + 1) % capacity_; 75 --size_; 76 77 full_cv_.notify_one(); 78 return pop_value; 79 } 80 81 /** 82 * @brief Check whether there is no element in queue. 83 * @return Whether there is no element in queue. 84 */ Empty()85 bool Empty() { return size_ == 0; } 86 87 /** 88 * @brief Check whether the number of queue elements reaches capacity. 89 * @return Whether the number of queue elements reaches capacity. 90 */ Full()91 bool Full() { return size_ == capacity_; } 92 93 /** 94 * @brief Close the queue and stop push and pop operations. 95 */ Close()96 void Close() { 97 std::unique_lock<std::mutex> lock(mtx_); 98 if (!closed_) { 99 closed_ = true; 100 full_cv_.notify_all(); 101 empty_cv_.notify_all(); 102 } 103 } 104 105 private: 106 DISABLE_COPY_AND_ASSIGN(BlockingQueue); 107 108 // The maximum capacity of queue. 109 size_t capacity_; 110 // The element number in queue. 111 size_t size_{0}; 112 113 // The buffer used to record elements in the queue. 114 std::unique_ptr<T *[]> elements_; 115 116 // The cursor used to point the head position. 117 size_t head_{0}; 118 // The cursor used to point the tail position. 119 size_t tail_{0}; 120 121 // The flag indicates whether the queue is closed. 122 bool closed_{false}; 123 124 // A lock used to secure the access of queue elements. 125 std::mutex mtx_; 126 // Used to block the push operations when queue is full. 127 std::condition_variable full_cv_; 128 // Used to block the pop operations when queue is empty. 129 std::condition_variable empty_cv_; 130 }; 131 } // namespace distributed 132 } // namespace mindspore 133 #endif // MINDSPORE_CCSRC_DISTRIBUTED_EMBEDDING_CACHE_BLOCKING_QUEUE_H_ 134