1 /* 2 * Copyright (c) 2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef AUDIO_SAFE_BLOCK_QUEUE_H 16 #define AUDIO_SAFE_BLOCK_QUEUE_H 17 18 namespace OHOS { 19 namespace AudioStandard { 20 21 /** 22 * @brief Provides interfaces for thread-safe blocking queues. 23 * 24 * The interfaces can be used to perform blocking and non-blocking push and 25 * pop operations on queues. 26 */ 27 template <typename T> 28 class AudioSafeBlockQueue { 29 public: AudioSafeBlockQueue(int capacity)30 explicit AudioSafeBlockQueue(int capacity) : maxSize_(capacity) 31 { 32 } 33 34 /** 35 * @brief Inserts an element at the end of this queue in blocking mode. 36 * 37 * If the queue is full, the thread of the push operation will be blocked 38 * until the queue has space. 39 * If the queue is not full, the push operation can be performed and one of the 40 * pop threads (blocked when the queue is empty) is woken up. 41 * 42 * @param elem Indicates the element to insert. 43 */ Push(T const & elem)44 virtual void Push(T const& elem) 45 { 46 std::unique_lock<std::mutex> lock(mutexLock_); 47 while (queueT_.size() >= maxSize_) { 48 // If the queue is full, wait for jobs to be taken. 49 cvNotFull_.wait(lock, [&]() { return (queueT_.size() < maxSize_); }); 50 } 51 52 // Insert the element into the queue if the queue is not full. 53 queueT_.push(elem); 54 cvNotEmpty_.notify_all(); 55 } 56 57 /** 58 * @brief Removes the first element from this queue in blocking mode. 59 * 60 * If the queue is empty, the thread of the pop operation will be blocked 61 * until the queue has elements. 62 * If the queue is not empty, the pop operation can be performed, the first 63 * element of the queue is returned, and one of the push threads (blocked 64 * when the queue is full) is woken up. 65 */ Pop()66 T Pop() 67 { 68 std::unique_lock<std::mutex> lock(mutexLock_); 69 70 while (queueT_.empty()) { 71 // If the queue is empty, wait for elements to be pushed in. 72 cvNotEmpty_.wait(lock, [&] { return !queueT_.empty(); }); 73 } 74 75 T elem = queueT_.front(); 76 queueT_.pop(); 77 cvNotFull_.notify_all(); 78 return elem; 79 } 80 81 /** 82 * @brief Inserts an element at the end of this queue in non-blocking mode. 83 * 84 * If the queue is full, <b>false</b> is returned directly. 85 * If the queue is not full, the push operation can be performed, one of the 86 * pop threads (blocked when the queue is empty) is woken up, and <b>true</b> 87 * is returned. 88 * 89 * @param elem Indicates the element to insert. 90 */ PushNoWait(T const & elem)91 virtual bool PushNoWait(T const& elem) 92 { 93 std::unique_lock<std::mutex> lock(mutexLock_); 94 if (queueT_.size() >= maxSize_) { 95 return false; 96 } 97 // Insert the element if the queue is not full. 98 queueT_.push(elem); 99 cvNotEmpty_.notify_all(); 100 return true; 101 } 102 103 /** 104 * @brief Removes the first element from this queue in non-blocking mode. 105 * 106 * If the queue is empty, <b>false</b> is returned directly. 107 * If the queue is not empty, the pop operation can be performed, one of the 108 * push threads (blocked when the queue is full) is woken up, and <b>true</b> 109 * is returned. 110 * 111 * @param outtask Indicates the data of the pop operation. 112 */ PopNotWait(T & outtask)113 bool PopNotWait(T& outtask) 114 { 115 std::unique_lock<std::mutex> lock(mutexLock_); 116 if (queueT_.empty()) { 117 return false; 118 } 119 outtask = queueT_.front(); 120 queueT_.pop(); 121 122 cvNotFull_.notify_all(); 123 124 return true; 125 } 126 PopAllNotWait()127 std::queue<T> PopAllNotWait() 128 { 129 std::queue<T> retQueue = {}; 130 std::unique_lock<std::mutex> lock(mutexLock_); 131 retQueue.swap(queueT_); 132 133 cvNotFull_.notify_all(); 134 135 return retQueue; 136 } 137 Size()138 unsigned int Size() 139 { 140 std::unique_lock<std::mutex> lock(mutexLock_); 141 return queueT_.size(); 142 } 143 144 template< class Rep, class Period > WaitNotEmptyFor(const std::chrono::duration<Rep,Period> & rel_time)145 void WaitNotEmptyFor(const std::chrono::duration<Rep, Period>& rel_time) 146 { 147 std::unique_lock<std::mutex> lock(mutexLock_); 148 cvNotEmpty_.wait_for(lock, rel_time, [this] { 149 return !queueT_.empty(); 150 }); 151 } 152 IsEmpty()153 bool IsEmpty() 154 { 155 std::unique_lock<std::mutex> lock(mutexLock_); 156 return queueT_.empty(); 157 } 158 IsFull()159 bool IsFull() 160 { 161 std::unique_lock<std::mutex> lock(mutexLock_); 162 return queueT_.size() == maxSize_; 163 } 164 Clear()165 void Clear() 166 { 167 std::unique_lock<std::mutex> lock(mutexLock_); 168 queueT_ = {}; 169 cvNotFull_.notify_all(); 170 } 171 ~AudioSafeBlockQueue()172 virtual ~AudioSafeBlockQueue() {} 173 174 protected: 175 unsigned long maxSize_; // Capacity of the queue 176 std::mutex mutexLock_; 177 std::condition_variable cvNotEmpty_; 178 std::condition_variable cvNotFull_; 179 std::queue<T> queueT_; 180 }; 181 } // namespace AudioStandard 182 } // namespace OHOS 183 #endif // AUDIO_SAFE_BLOCK_QUEUE_H