• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef UTILS_BASE_BLOCK_QUEUE_H
17 #define UTILS_BASE_BLOCK_QUEUE_H
18 
#include <atomic>
#include <climits>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
24 
25 namespace OHOS {
26 
/**
 * @brief Bounded FIFO queue whose operations are serialized by one mutex.
 *
 * Producers may either block until space is available (Push) or fail
 * immediately when the queue is full (PushNoWait). Consumers may either block
 * until an element is available (Pop) or fail immediately when the queue is
 * empty (PopNotWait). Every public method acquires mutexLock_ before touching
 * the underlying std::queue.
 *
 * @tparam T Element type. Push/PushNoWait copy the element into the queue;
 *           Pop/PopNotWait move it out.
 */
template <typename T>
class SafeBlockQueue {
public:
    /**
     * @brief Creates an empty queue that holds at most @p capacity elements.
     *
     * @param capacity Maximum number of queued elements. The value is stored
     *                 in an unsigned long, so a negative argument converts to
     *                 a very large limit rather than being rejected.
     */
    SafeBlockQueue(int capacity) : maxSize_(capacity)
    {
    }

    /**
     * @brief Appends a copy of @p elem, blocking while the queue is full.
     *
     * @param elem Element to copy into the queue.
     */
    virtual void Push(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload of wait() re-checks the condition after every
        // wakeup (including spurious ones), so no surrounding loop is needed.
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Removes and returns the oldest element, blocking while the queue
     *        is empty.
     *
     * @return The element that was at the front of the queue.
     */
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvNotEmpty_.wait(lock, [this] { return !queueT_.empty(); });

        // The front element is destroyed by pop() right after, so move it out
        // instead of copying it.
        T elem = std::move(queueT_.front());
        queueT_.pop();
        cvNotFull_.notify_one();
        return elem;
    }

    /**
     * @brief Appends a copy of @p elem only if the queue is not full.
     *
     * @param elem Element to copy into the queue.
     * @return true if the element was queued, false if the queue was full.
     */
    virtual bool PushNoWait(T const& elem)
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Removes the oldest element only if the queue is not empty.
     *
     * @param outtask Receives the removed element; left untouched when the
     *                queue is empty.
     * @return true if an element was removed, false if the queue was empty.
     */
    bool PopNotWait(T& outtask)
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.empty()) {
            return false;
        }

        // Move for the same reason as in Pop(): the source is popped next.
        outtask = std::move(queueT_.front());
        queueT_.pop();
        cvNotFull_.notify_one();
        return true;
    }

    /**
     * @brief Returns the number of elements currently queued.
     */
    unsigned int Size()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return static_cast<unsigned int>(queueT_.size());
    }

    /**
     * @brief Returns true when no element is currently queued.
     */
    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return queueT_.empty();
    }

    /**
     * @brief Returns true when the queue has reached its capacity.
     */
    bool IsFull()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        // Same ">=" test that Push/PushNoWait use to decide the queue is full.
        return queueT_.size() >= maxSize_;
    }

    virtual ~SafeBlockQueue() {}

protected:
    unsigned long maxSize_;              // capacity limit checked by the push operations
    std::mutex mutexLock_;               // guards queueT_ (and derived-class state)
    std::condition_variable cvNotEmpty_; // notified after every successful push
    std::condition_variable cvNotFull_;  // notified after every successful pop
    std::queue<T> queueT_;               // underlying FIFO storage
};
115 
116 template <typename T>
117 class SafeBlockQueueTracking : public SafeBlockQueue<T> {
118 public:
SafeBlockQueueTracking(int capacity)119     SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity)
120     {
121         unfinishedTaskCount_ = 0;
122     }
123 
~SafeBlockQueueTracking()124     virtual ~SafeBlockQueueTracking() {};
125 
Push(T const & elem)126     virtual void Push(T const& elem)
127     {
128         unfinishedTaskCount_++;
129         std::unique_lock<std::mutex> lock(mutexLock_);
130         while (queueT_.size() >= maxSize_) {
131             // queue full , waiting for jobs to be taken
132             cvNotFull_.wait(lock, [&]() { return (queueT_.size() < maxSize_); });
133         }
134 
135         // here means not full we can push in
136         queueT_.push(elem);
137 
138         cvNotEmpty_.notify_one();
139     }
140 
PushNoWait(T const & elem)141     virtual bool PushNoWait(T const& elem)
142     {
143         std::unique_lock<std::mutex> lock(mutexLock_);
144         if (queueT_.size() >= maxSize_) {
145             return false;
146         }
147         // here means not full we can push in
148         queueT_.push(elem);
149         unfinishedTaskCount_++;
150         cvNotEmpty_.notify_one();
151         return true;
152     }
153 
OneTaskDone()154     bool OneTaskDone()
155     {
156         std::unique_lock<std::mutex> lock(mutexLock_);
157         int unfinished = unfinishedTaskCount_ - 1;
158 
159         if (unfinished <= 0) {
160             if (unfinished < 0) {
161                 return false; // false mean call elem done too many times
162             }
163             cvAllTasksDone_.notify_all();
164         }
165 
166         unfinishedTaskCount_ = unfinished;
167         return true;
168     }
169 
Join()170     void Join()
171     {
172         std::unique_lock<std::mutex> lock(mutexLock_);
173         cvAllTasksDone_.wait(lock, [&] { return unfinishedTaskCount_ == 0; });
174     }
175 
GetUnfinishTaskNum()176     int GetUnfinishTaskNum()
177     {
178         return unfinishedTaskCount_;
179     }
180 
181 protected:
182     using SafeBlockQueue<T>::maxSize_;
183     using SafeBlockQueue<T>::mutexLock_;
184     using SafeBlockQueue<T>::cvNotEmpty_;
185     using SafeBlockQueue<T>::cvNotFull_;
186     using SafeBlockQueue<T>::queueT_;
187 
188     std::atomic<int> unfinishedTaskCount_;
189     std::condition_variable cvAllTasksDone_;
190 };
191 
192 } // namespace OHOS
193 
194 #endif
195