• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /**
17  * @file safe_block_queue.h
18  *
19  * Provides interfaces for thread-safe blocking queues in c_utils.
20  * The file includes the <b>SafeBlockQueue</b> class and
21  * the <b>SafeBlockQueueTracking</b> class for trackable tasks.
22  */
23 
24 #ifndef UTILS_BASE_BLOCK_QUEUE_H
25 #define UTILS_BASE_BLOCK_QUEUE_H
26 
27 #include <climits>
28 #include <condition_variable>
29 #include <mutex>
30 #include <queue>
31 #include <atomic>
32 
33 namespace OHOS {
34 
35 /**
36  * @brief Provides interfaces for thread-safe blocking queues.
37  *
38  * The interfaces can be used to perform blocking and non-blocking push and
39  * pop operations on queues.
40  */
template <typename T>
class SafeBlockQueue {
public:
    /**
     * @brief Creates a queue that holds at most <b>capacity</b> elements.
     *
     * The capacity is stored as an unsigned value, so a negative argument
     * is converted to a very large limit rather than being rejected.
     *
     * @param capacity Indicates the maximum number of queued elements.
     */
    explicit SafeBlockQueue(int capacity) : maxSize_(static_cast<unsigned long>(capacity))
    {
    }

    virtual ~SafeBlockQueue() {}

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the calling thread is blocked until the queue
     * has space. After the element is inserted, one thread waiting for the
     * queue to become non-empty is notified.
     *
     * @param elem Indicates the element to insert.
     */
    virtual void Push(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload of wait() re-evaluates the condition after
        // every wakeup, so no additional loop is needed around it.
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Removes the first element from this queue in blocking mode.
     *
     * If the queue is empty, the calling thread is blocked until an element
     * is available. After the element is removed, one thread waiting for
     * the queue to have space is notified.
     *
     * @return Returns a copy of the element that was at the front.
     */
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvNotEmpty_.wait(lock, [this] { return !queueT_.empty(); });

        T elem = queueT_.front();
        queueT_.pop();
        cvNotFull_.notify_one();
        return elem;
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * @param elem Indicates the element to insert.
     * @return Returns <b>false</b> if the queue is full and nothing was
     * inserted; returns <b>true</b> if the element was inserted and one
     * thread waiting for the queue to become non-empty was notified.
     */
    virtual bool PushNoWait(T const& elem)
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }
        queueT_.push(elem);
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Removes the first element from this queue in non-blocking mode.
     *
     * @param outtask Receives the removed element; left untouched when the
     * queue is empty.
     * @return Returns <b>false</b> if the queue is empty; returns
     * <b>true</b> if an element was removed and one thread waiting for the
     * queue to have space was notified.
     */
    bool PopNotWait(T& outtask)
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.empty()) {
            return false;
        }
        outtask = queueT_.front();
        queueT_.pop();
        cvNotFull_.notify_one();
        return true;
    }

    /**
     * @brief Obtains the number of elements currently in the queue.
     *
     * The underlying size is narrowed to <b>unsigned int</b>.
     */
    unsigned int Size() const
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return static_cast<unsigned int>(queueT_.size());
    }

    /**
     * @brief Checks whether the queue currently holds no elements.
     */
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return queueT_.empty();
    }

    /**
     * @brief Checks whether the queue has reached its capacity.
     *
     * Uses the same <b>>=</b> comparison as Push() and PushNoWait() so that
     * all three agree on what "full" means.
     */
    bool IsFull() const
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return queueT_.size() >= maxSize_;
    }

protected:
    unsigned long maxSize_;  // Capacity of the queue
    mutable std::mutex mutexLock_;  // mutable so that const observers can lock it
    std::condition_variable cvNotEmpty_;  // Notified after an element is inserted
    std::condition_variable cvNotFull_;  // Notified after an element is removed
    std::queue<T> queueT_;
};
168 
169 /**
170  * @brief Provides interfaces for operating the thread-safe blocking queues
171  * and tracking the number of pending tasks.
172  * This class inherits from <b>SafeBlockQueue</b>.
173  */
174 template <typename T>
175 class SafeBlockQueueTracking : public SafeBlockQueue<T> {
176 public:
SafeBlockQueueTracking(int capacity)177     explicit SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity)
178     {
179         unfinishedTaskCount_ = 0;
180     }
181 
~SafeBlockQueueTracking()182     virtual ~SafeBlockQueueTracking() {}
183 
184 /**
185  * @brief Inserts an element at the end of this queue in blocking mode.
186  *
187  * If the queue is full, the thread of the push operation will be blocked
188  * until the queue has space.
189  * If the queue is not full, the push operation can be performed and one of the
190  * pop threads (blocked when the queue is empty) is woken up.
191  */
Push(T const & elem)192     virtual void Push(T const& elem)
193     {
194         unfinishedTaskCount_++;
195         std::unique_lock<std::mutex> lock(mutexLock_);
196         while (queueT_.size() >= maxSize_) {
197             // If the queue is full, wait for jobs to be taken.
198             cvNotFull_.wait(lock, [&]() { return (queueT_.size() < maxSize_); });
199         }
200 
201         // If the queue is not full, insert the element.
202         queueT_.push(elem);
203 
204         cvNotEmpty_.notify_one();
205     }
206 
207 /**
208  * @brief Inserts an element at the end of this queue in non-blocking mode.
209  *
210  * If the queue is full, <b>false</b> is returned directly.
211  * If the queue is not full, the push operation can be performed,
212  * one of the pop threads (blocked when the queue is empty) is woken up,
213  * and <b>true</b> is returned.
214  */
PushNoWait(T const & elem)215     virtual bool PushNoWait(T const& elem)
216     {
217         std::unique_lock<std::mutex> lock(mutexLock_);
218         if (queueT_.size() >= maxSize_) {
219             return false;
220         }
221         // Insert the element if the queue is not full.
222         queueT_.push(elem);
223         unfinishedTaskCount_++;
224         cvNotEmpty_.notify_one();
225         return true;
226     }
227 
228 /**
229  * @brief Called to return the result when a task is complete.
230  *
231  * If the count of unfinished tasks < 1, <b>false</b> is returned directly.
232  * If the count of unfinished tasks = 1, all the threads blocked
233  * by calling Join() will be woken up,
234  * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
235  * If the count of unfinished tasks > 1,
236  * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
237  */
OneTaskDone()238     bool OneTaskDone()
239     {
240         std::unique_lock<std::mutex> lock(mutexLock_);
241         int unfinished = unfinishedTaskCount_ - 1;
242 
243         if (unfinished <= 0) {
244             if (unfinished < 0) {
245                 return false; // false mean call elem done too many times
246             }
247             cvAllTasksDone_.notify_all();
248         }
249 
250         unfinishedTaskCount_ = unfinished;
251         return true;
252     }
253 
254 /**
255  * @brief Waits for all tasks to complete.
256  *
257  * If there is any task not completed, the current thread will be
258  * blocked even if it is just woken up.
259  */
Join()260     void Join()
261     {
262         std::unique_lock<std::mutex> lock(mutexLock_);
263         cvAllTasksDone_.wait(lock, [&] { return unfinishedTaskCount_ == 0; });
264     }
265 
266 /**
267  * @brief Obtains the number of unfinished tasks.
268  */
GetUnfinishTaskNum()269     int GetUnfinishTaskNum()
270     {
271         return unfinishedTaskCount_;
272     }
273 
274 protected:
275     using SafeBlockQueue<T>::maxSize_;
276     using SafeBlockQueue<T>::mutexLock_;
277     using SafeBlockQueue<T>::cvNotEmpty_;
278     using SafeBlockQueue<T>::cvNotFull_;
279     using SafeBlockQueue<T>::queueT_;
280 
281     std::atomic<int> unfinishedTaskCount_;
282     std::condition_variable cvAllTasksDone_;
283 };
284 
285 } // namespace OHOS
286 
287 #endif
288