• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef INTELL_VOICE_BUFFER_QUEUE_H
16 #define INTELL_VOICE_BUFFER_QUEUE_H
17 
18 #include <unistd.h>
19 #include <queue>
20 #include <mutex>
21 #include <condition_variable>
22 #include <chrono>
23 #include "array_buffer_util.h"
24 #include "intell_voice_log.h"
25 
26 #define LOG_TAG "QueueUtil"
27 
28 namespace OHOS {
29 namespace IntellVoiceUtils {
30 constexpr uint32_t MAX_CAPACITY = 500;
31 
32 template <typename T>
33 class QueueUtil {
34 public:
35     QueueUtil() = default;
~QueueUtil()36     ~QueueUtil()
37     {
38         Uninit();
39     }
40     bool Init(uint32_t capacity = MAX_CAPACITY)
41     {
42         std::unique_lock<std::mutex> lock(queueMutex_);
43         SetAvailable(true);
44         capacity_ = capacity;
45         return true;
46     }
47     bool Push(const T &element, bool isWait = true)
48     {
49         std::unique_lock<std::mutex> lock(queueMutex_);
50         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
51 
52         while (queue_.size() >= capacity_) {
53             CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait");
54             notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); });
55             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
56         }
57 
58         queue_.push(element);
59         notEmptyCv_.notify_one();
60         return true;
61     }
62     bool Push(T &&element, bool isWait = true)
63     {
64         std::unique_lock<std::mutex> lock(queueMutex_);
65         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
66 
67         while (queue_.size() >= capacity_) {
68             if (!isWait) {
69                 return false;
70             }
71             notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); });
72             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
73         }
74 
75         queue_.push(std::move(element));
76         notEmptyCv_.notify_one();
77         return true;
78     }
Pop(T & element)79     bool Pop(T &element)
80     {
81         std::unique_lock<std::mutex> lock(queueMutex_);
82         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
83 
84         while (queue_.empty()) {
85             notEmptyCv_.wait(lock, [&] { return (!queue_.empty() || !IsAvailable()); });
86             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
87         }
88 
89         element = std::move(queue_.front());
90         queue_.pop();
91         notFullCv_.notify_one();
92         return true;
93     }
PopUntilTimeout(uint32_t timeLenMs,T & element)94     bool PopUntilTimeout(uint32_t timeLenMs, T &element)
95     {
96         std::unique_lock<std::mutex> lock(queueMutex_);
97         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
98 
99         while (queue_.empty()) {
100             if (!(notEmptyCv_.wait_for(lock, std::chrono::milliseconds(timeLenMs),
101                 [&] { return (!queue_.empty() || !IsAvailable()); }))) {
102                 INTELL_VOICE_LOG_WARN("wait time out");
103                 return false;
104             }
105             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
106         }
107 
108         element = std::move(queue_.front());
109         queue_.pop();
110         notFullCv_.notify_one();
111         return true;
112     }
Uninit()113     void Uninit()
114     {
115         {
116             std::unique_lock<std::mutex> lock(queueMutex_);
117             capacity_ = 0;
118             ClearQueue();
119             SetAvailable(false);
120         }
121         notEmptyCv_.notify_all();
122         notFullCv_.notify_all();
123     }
124 private:
IsAvailable()125     bool IsAvailable() const
126     {
127         return isAvailable_;
128     }
SetAvailable(bool isAvailable)129     void SetAvailable(bool isAvailable)
130     {
131         isAvailable_ = isAvailable;
132     }
ClearQueue()133     void ClearQueue()
134     {
135         while (!queue_.empty()) {
136             queue_.pop();
137         }
138     }
139 
140 private:
141     bool isAvailable_ = false;
142     uint32_t capacity_ = 0;
143     std::mutex queueMutex_;
144     std::condition_variable notEmptyCv_;
145     std::condition_variable notFullCv_;
146     std::queue<T> queue_;
147 };
148 
149 using Uint8ArrayBufferQueue = QueueUtil<std::unique_ptr<Uint8ArrayBuffer>>;
150 }
151 }
152 
153 #undef LOG_TAG
154 
155 #endif