/*
 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @file safe_block_queue.h
 *
 * Provides interfaces for thread-safe blocking queues in c_utils.
 * The file includes the <b>SafeBlockQueue</b> class and
 * the <b>SafeBlockQueueTracking</b> class for trackable tasks.
 */

#ifndef UTILS_BASE_BLOCK_QUEUE_H
#define UTILS_BASE_BLOCK_QUEUE_H

#include <climits>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <atomic>

namespace OHOS {

/**
 * @brief Provides interfaces for thread-safe blocking queues.
 *
 * The interfaces can be used to perform blocking and non-blocking push and
 * pop operations on queues. Every operation takes the internal mutex, and
 * two condition variables are used to wake blocked producers and consumers.
 */
template <typename T>
class SafeBlockQueue {
public:
    /**
     * @brief Creates a queue that holds at most <b>capacity</b> elements.
     *
     * @param capacity Indicates the maximum number of queued elements.
     * The value is stored in an unsigned member, so a negative value is
     * converted to a very large limit rather than being rejected.
     */
    explicit SafeBlockQueue(int capacity) : maxSize_(capacity)
    {
    }

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the thread of the push operation will be blocked
     * until the queue has space.
     * If the queue is not full, the push operation can be performed and one of the
     * pop threads (blocked when the queue is empty) is woken up.
     *
     * @param elem Indicates the element to insert.
     */
    virtual void Push(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload of wait() re-checks the condition after every
        // wakeup, so no explicit outer loop is needed.
        cvNotFull_.wait(lock, [this]() { return queueT_.size() < maxSize_; });

        // Insert the element into the queue now that there is space.
        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Removes the first element from this queue in blocking mode.
     *
     * If the queue is empty, the thread of the pop operation will be blocked
     * until the queue has elements.
     * If the queue is not empty, the pop operation can be performed, the first
     * element of the queue is returned, and one of the push threads (blocked
     * when the queue is full) is woken up.
     *
     * @return Returns a copy of the element that was at the front of the queue.
     */
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // Wait until at least one element has been pushed in.
        cvNotEmpty_.wait(lock, [this]() { return !queueT_.empty(); });

        T elem = queueT_.front();
        queueT_.pop();
        cvNotFull_.notify_one();
        return elem;
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * If the queue is full, <b>false</b> is returned directly.
     * If the queue is not full, the push operation can be performed, one of the
     * pop threads (blocked when the queue is empty) is woken up, and <b>true</b>
     * is returned.
     *
     * @param elem Indicates the element to insert.
     * @return Returns <b>true</b> if the element was inserted;
     * returns <b>false</b> if the queue was full.
     */
    virtual bool PushNoWait(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }
        // Insert the element if the queue is not full.
        queueT_.push(elem);
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Removes the first element from this queue in non-blocking mode.
     *
     * If the queue is empty, <b>false</b> is returned directly.
     * If the queue is not empty, the pop operation can be performed, one of the
     * push threads (blocked when the queue is full) is woken up, and <b>true</b>
     * is returned.
     *
     * @param outtask Indicates the data of the pop operation. It is left
     * untouched when the queue is empty.
     * @return Returns <b>true</b> if an element was removed;
     * returns <b>false</b> if the queue was empty.
     */
    bool PopNotWait(T& outtask)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.empty()) {
            return false;
        }
        outtask = queueT_.front();
        queueT_.pop();

        cvNotFull_.notify_one();

        return true;
    }

    /**
     * @brief Obtains the number of elements currently in this queue.
     *
     * @return Returns the element count, narrowed to <b>unsigned int</b>.
     */
    unsigned int Size()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        return static_cast<unsigned int>(queueT_.size());
    }

    /**
     * @brief Checks whether this queue is empty.
     *
     * @return Returns <b>true</b> if the queue holds no elements.
     */
    bool IsEmpty()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        return queueT_.empty();
    }

    /**
     * @brief Checks whether this queue is full.
     *
     * @return Returns <b>true</b> if the queue has reached its capacity.
     */
    bool IsFull()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // Same "full" test as the push paths use.
        return queueT_.size() >= maxSize_;
    }

    virtual ~SafeBlockQueue() = default;

protected:
    unsigned long maxSize_;  // Capacity of the queue
    std::mutex mutexLock_;
    std::condition_variable cvNotEmpty_;
    std::condition_variable cvNotFull_;
    std::queue<T> queueT_;
};

/**
 * @brief Provides interfaces for operating the thread-safe blocking queues
 * and tracking the number of pending tasks.
 * This class inherits from <b>SafeBlockQueue</b>.
 */
template <typename T>
class SafeBlockQueueTracking : public SafeBlockQueue<T> {
public:
    /**
     * @brief Creates a tracking queue with no unfinished tasks.
     *
     * @param capacity Indicates the maximum number of queued elements.
     */
    explicit SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity), unfinishedTaskCount_(0)
    {
    }

    ~SafeBlockQueueTracking() override = default;

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the thread of the push operation will be blocked
     * until the queue has space.
     * If the queue is not full, the push operation can be performed and one of the
     * pop threads (blocked when the queue is empty) is woken up.
     *
     * The count of unfinished tasks is incremented before the queue lock is
     * taken, so a push that is still blocked is already counted as unfinished.
     *
     * @param elem Indicates the element to insert.
     */
    void Push(T const& elem) override
    {
        unfinishedTaskCount_++;
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload of wait() re-checks the condition after every
        // wakeup, so no explicit outer loop is needed.
        cvNotFull_.wait(lock, [this]() { return queueT_.size() < maxSize_; });

        // Insert the element now that there is space.
        queueT_.push(elem);

        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * If the queue is full, <b>false</b> is returned directly.
     * If the queue is not full, the push operation can be performed,
     * one of the pop threads (blocked when the queue is empty) is woken up,
     * and <b>true</b> is returned.
     *
     * @param elem Indicates the element to insert.
     * @return Returns <b>true</b> if the element was inserted (the count of
     * unfinished tasks is then incremented by 1);
     * returns <b>false</b> if the queue was full.
     */
    bool PushNoWait(T const& elem) override
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }
        // Insert the element if the queue is not full.
        queueT_.push(elem);
        unfinishedTaskCount_++;
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Called to return the result when a task is complete.
     *
     * If the count of unfinished tasks < 1, <b>false</b> is returned directly.
     * If the count of unfinished tasks = 1, all the threads blocked
     * by calling Join() will be woken up,
     * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
     * If the count of unfinished tasks > 1,
     * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
     *
     * @return Returns <b>true</b> if one task was marked as done;
     * returns <b>false</b> if this function was called more times than
     * there were tasks.
     */
    bool OneTaskDone()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (unfinishedTaskCount_.load() <= 0) {
            return false;  // false means this was called too many times
        }

        // Decrement atomically instead of storing back a value computed from an
        // earlier read: Push() increments the counter without holding the lock,
        // and a plain store here could overwrite (lose) such an increment.
        // All decrements happen under the lock, so the counter cannot drop
        // below the value checked above.
        if (unfinishedTaskCount_.fetch_sub(1) == 1) {
            cvAllTasksDone_.notify_all();
        }
        return true;
    }

    /**
     * @brief Waits for all tasks to complete.
     *
     * If there is any task not completed, the current thread will be
     * blocked even if it is just woken up.
     */
    void Join()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvAllTasksDone_.wait(lock, [this]() { return unfinishedTaskCount_ == 0; });
    }

    /**
     * @brief Obtains the number of unfinished tasks.
     *
     * @return Returns the current value of the unfinished-task counter.
     */
    int GetUnfinishTaskNum()
    {
        return unfinishedTaskCount_;
    }

protected:
    using SafeBlockQueue<T>::maxSize_;
    using SafeBlockQueue<T>::mutexLock_;
    using SafeBlockQueue<T>::cvNotEmpty_;
    using SafeBlockQueue<T>::cvNotFull_;
    using SafeBlockQueue<T>::queueT_;

    std::atomic<int> unfinishedTaskCount_;
    std::condition_variable cvAllTasksDone_;
};

} // namespace OHOS

#endif