1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef UTILS_BASE_BLOCK_QUEUE_H 17 #define UTILS_BASE_BLOCK_QUEUE_H 18 19 #include <climits> 20 #include <condition_variable> 21 #include <mutex> 22 #include <queue> 23 #include <atomic> 24 25 namespace OHOS { 26 27 template <typename T> 28 class SafeBlockQueue { 29 public: SafeBlockQueue(int capacity)30 SafeBlockQueue(int capacity) : maxSize_(capacity) 31 { 32 } 33 Push(T const & elem)34 virtual void Push(T const& elem) 35 { 36 std::unique_lock<std::mutex> lock(mutexLock_); 37 while (queueT_.size() >= maxSize_) { 38 // queue full , waiting for jobs to be taken 39 cvNotFull_.wait(lock, [&]() { return (queueT_.size() < maxSize_); }); 40 } 41 42 // here means not full we can push in 43 queueT_.push(elem); 44 cvNotEmpty_.notify_one(); 45 } 46 Pop()47 T Pop() 48 { 49 std::unique_lock<std::mutex> lock(mutexLock_); 50 51 while (queueT_.empty()) { 52 // queue empty, waiting for tasks to be Push 53 cvNotEmpty_.wait(lock, [&] { return !queueT_.empty(); }); 54 } 55 56 T elem = queueT_.front(); 57 queueT_.pop(); 58 cvNotFull_.notify_one(); 59 return elem; 60 } 61 PushNoWait(T const & elem)62 virtual bool PushNoWait(T const& elem) 63 { 64 std::unique_lock<std::mutex> lock(mutexLock_); 65 if (queueT_.size() >= maxSize_) { 66 return false; 67 } 68 // here means not full we can push in 69 queueT_.push(elem); 70 cvNotEmpty_.notify_one(); 71 return true; 72 } 73 PopNotWait(T & outtask)74 bool PopNotWait(T& outtask) 75 { 76 std::unique_lock<std::mutex> lock(mutexLock_); 77 if (queueT_.empty()) { 78 return false; 79 } 80 outtask = queueT_.front(); 81 queueT_.pop(); 82 83 cvNotFull_.notify_one(); 84 85 return true; 86 } 87 Size()88 unsigned int Size() 89 { 90 std::unique_lock<std::mutex> lock(mutexLock_); 91 return queueT_.size(); 92 } 93 IsEmpty()94 bool IsEmpty() 95 { 96 std::unique_lock<std::mutex> lock(mutexLock_); 97 return queueT_.empty(); 98 } 99 IsFull()100 bool IsFull() 101 { 102 std::unique_lock<std::mutex> lock(mutexLock_); 103 return queueT_.size() == maxSize_; 104 } 105 ~SafeBlockQueue()106 virtual ~SafeBlockQueue() {}; 107 108 protected: 109 unsigned long maxSize_; 110 std::mutex mutexLock_; 111 std::condition_variable cvNotEmpty_; 112 std::condition_variable cvNotFull_; 113 std::queue<T> queueT_; 114 }; 115 116 template <typename T> 117 class SafeBlockQueueTracking : public SafeBlockQueue<T> { 118 public: SafeBlockQueueTracking(int capacity)119 SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity) 120 { 121 unfinishedTaskCount_ = 0; 122 } 123 ~SafeBlockQueueTracking()124 virtual ~SafeBlockQueueTracking() {}; 125 Push(T const & elem)126 virtual void Push(T const& elem) 127 { 128 unfinishedTaskCount_++; 129 std::unique_lock<std::mutex> lock(mutexLock_); 130 while (queueT_.size() >= maxSize_) { 131 // queue full , waiting for jobs to be taken 132 cvNotFull_.wait(lock, [&]() { return (queueT_.size() < maxSize_); }); 133 } 134 135 // here means not full we can push in 136 queueT_.push(elem); 137 138 cvNotEmpty_.notify_one(); 139 } 140 PushNoWait(T const & elem)141 virtual bool PushNoWait(T const& elem) 142 { 143 std::unique_lock<std::mutex> lock(mutexLock_); 144 if (queueT_.size() >= maxSize_) { 145 return false; 146 } 147 // here means not full we can push in 148 queueT_.push(elem); 149 unfinishedTaskCount_++; 150 cvNotEmpty_.notify_one(); 151 return true; 152 } 153 OneTaskDone()154 bool OneTaskDone() 155 { 156 std::unique_lock<std::mutex> lock(mutexLock_); 157 int unfinished = unfinishedTaskCount_ - 1; 158 159 if (unfinished <= 0) { 160 if (unfinished < 0) { 161 return false; // false mean call elem done too many times 162 } 163 cvAllTasksDone_.notify_all(); 164 } 165 166 unfinishedTaskCount_ = unfinished; 167 return true; 168 } 169 Join()170 void Join() 171 { 172 std::unique_lock<std::mutex> lock(mutexLock_); 173 cvAllTasksDone_.wait(lock, [&] { return unfinishedTaskCount_ == 0; }); 174 } 175 GetUnfinishTaskNum()176 int GetUnfinishTaskNum() 177 { 178 return unfinishedTaskCount_; 179 } 180 181 protected: 182 using SafeBlockQueue<T>::maxSize_; 183 using SafeBlockQueue<T>::mutexLock_; 184 using SafeBlockQueue<T>::cvNotEmpty_; 185 using SafeBlockQueue<T>::cvNotFull_; 186 using SafeBlockQueue<T>::queueT_; 187 188 std::atomic<int> unfinishedTaskCount_; 189 std::condition_variable cvAllTasksDone_; 190 }; 191 192 } // namespace OHOS 193 194 #endif 195