1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef INTELL_VOICE_BUFFER_QUEUE_H 16 #define INTELL_VOICE_BUFFER_QUEUE_H 17 18 #include <unistd.h> 19 #include <queue> 20 #include <mutex> 21 #include <condition_variable> 22 #include <chrono> 23 #include "array_buffer_util.h" 24 #include "intell_voice_log.h" 25 26 #define LOG_TAG "QueueUtil" 27 28 namespace OHOS { 29 namespace IntellVoiceUtils { 30 constexpr uint32_t MAX_CAPACITY = 500; 31 32 template <typename T> 33 class QueueUtil { 34 public: 35 QueueUtil() = default; ~QueueUtil()36 ~QueueUtil() 37 { 38 Uninit(); 39 } 40 bool Init(uint32_t capacity = MAX_CAPACITY) 41 { 42 std::unique_lock<std::mutex> lock(queueMutex_); 43 SetAvailable(true); 44 capacity_ = capacity; 45 return true; 46 } 47 bool Push(const T &element, bool isWait = true) 48 { 49 std::unique_lock<std::mutex> lock(queueMutex_); 50 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 51 52 while (queue_.size() >= capacity_) { 53 CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait"); 54 notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); }); 55 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 56 } 57 58 queue_.push(element); 59 notEmptyCv_.notify_one(); 60 return true; 61 } 62 bool Push(T &&element, bool isWait = true) 63 { 64 std::unique_lock<std::mutex> lock(queueMutex_); 65 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 66 67 while (queue_.size() >= capacity_) { 68 if (!isWait) { 69 return false; 70 } 71 notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); }); 72 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 73 } 74 75 queue_.push(std::move(element)); 76 notEmptyCv_.notify_one(); 77 return true; 78 } Pop(T & element)79 bool Pop(T &element) 80 { 81 std::unique_lock<std::mutex> lock(queueMutex_); 82 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 83 84 while (queue_.empty()) { 85 notEmptyCv_.wait(lock, [&] { return (!queue_.empty() || !IsAvailable()); }); 86 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 87 } 88 89 element = std::move(queue_.front()); 90 queue_.pop(); 91 notFullCv_.notify_one(); 92 return true; 93 } PopUntilTimeout(uint32_t timeLenMs,T & element)94 bool PopUntilTimeout(uint32_t timeLenMs, T &element) 95 { 96 std::unique_lock<std::mutex> lock(queueMutex_); 97 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 98 99 while (queue_.empty()) { 100 if (!(notEmptyCv_.wait_for(lock, std::chrono::milliseconds(timeLenMs), 101 [&] { return (!queue_.empty() || !IsAvailable()); }))) { 102 INTELL_VOICE_LOG_WARN("wait time out"); 103 return false; 104 } 105 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 106 } 107 108 element = std::move(queue_.front()); 109 queue_.pop(); 110 notFullCv_.notify_one(); 111 return true; 112 } Uninit()113 void Uninit() 114 { 115 { 116 std::unique_lock<std::mutex> lock(queueMutex_); 117 capacity_ = 0; 118 ClearQueue(); 119 SetAvailable(false); 120 } 121 notEmptyCv_.notify_all(); 122 notFullCv_.notify_all(); 123 } 124 private: IsAvailable()125 bool IsAvailable() const 126 { 127 return isAvailable_; 128 } SetAvailable(bool isAvailable)129 void SetAvailable(bool isAvailable) 130 { 131 isAvailable_ = isAvailable; 132 } ClearQueue()133 void ClearQueue() 134 { 135 while (!queue_.empty()) { 136 queue_.pop(); 137 } 138 } 139 140 private: 141 bool isAvailable_ = false; 142 uint32_t capacity_ = 0; 143 std::mutex queueMutex_; 144 std::condition_variable notEmptyCv_; 145 std::condition_variable notFullCv_; 146 std::queue<T> queue_; 147 }; 148 149 using Uint8ArrayBufferQueue = QueueUtil<std::unique_ptr<Uint8ArrayBuffer>>; 150 } 151 } 152 153 #undef LOG_TAG 154 155 #endif