/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "concurrent_queue.h"
#include <algorithm>
#include <climits>
#include <limits>
#include "dfx/log/ffrt_log_api.h"
#include "tm/queue_task.h"
#include "eu/loop.h"

namespace {
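// Returns the earliest uptime across all priority buckets, or UINT64_MAX when
// every bucket is empty.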
uint64_t GetMinMapTime(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
{
    uint64_t minTime = std::numeric_limits<uint64_t>::max();

    for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
        if (!whenMapVec[idx].empty()) {
            auto it = whenMapVec[idx].begin();
            if (it->first < minTime) {
                minTime = it->first;
            }
        }
    }
    return minTime;
}

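// Returns true only when every priority bucket of the whenMap is empty.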
bool WhenMapVecEmpty(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
{
    for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
        if (!whenMapVec[idx].empty()) {
            return false;
        }
    }
    return true;
}
} // anonymous namespace

namespace ffrt {
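// Timer callback for delayed tasks: executes the task when its delay expires.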
static void DelayTaskCb(void* task)
{
    static_cast<QueueTask*>(task)->Execute();
}

ConcurrentQueue::~ConcurrentQueue()
{
    FFRT_LOGI("destruct concurrent queue [queueId=%u]", queueId_);
}

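// Enqueues a task into its priority bucket. Priorities beyond idle fall back to
// low; while WaitAll() is draining, new tasks are parked in waitingMap_ instead.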
int ConcurrentQueue::Push(QueueTask* task)
{
    std::unique_lock lock(mutex_);
    FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
    ffrt_queue_priority_t taskPriority = task->GetPriority();
    if (taskPriority > ffrt_queue_priority_idle) {
        task->SetPriority(ffrt_queue_priority_low);
        taskPriority = task->GetPriority();
    }

    if (waitingAll_) {
        waitingMap_.insert({task->GetUptime(), task});
        return SUCC;
    }

    if (loop_ != nullptr) {
        if (task->GetDelay() == 0) {
            whenMapVec_[taskPriority].insert({task->GetUptime(), task});
            loop_->WakeUp();
            return SUCC;
        }
        return PushDelayTaskToTimer(task);
    }
    return PushAndCalConcurrency(task, taskPriority, lock, true);
}

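// Dequeues the next expired task by priority. Without a bound loop, blocks until
// the earliest delayed task is due; returns nullptr once the queue drains or exits.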
QueueTask* ConcurrentQueue::Pull()
{
    std::unique_lock lock(mutex_);
    // wait for delay task
    uint64_t now = GetNow();
    if (loop_ != nullptr) {
        if (!WhenMapVecEmpty(whenMapVec_) && now >= GetMinMapTime(whenMapVec_) && !isExit_) {
            return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
        }
        return nullptr;
    }

    uint64_t minMapTime = GetMinMapTime(whenMapVec_);
    while (!WhenMapVecEmpty(whenMapVec_) && now < minMapTime && !isExit_) {
        uint64_t diff = minMapTime - now;
        FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
        delayStatus_.store(true);
        cond_.wait_for(lock, std::chrono::microseconds(diff));
        delayStatus_.store(false);
        FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
        now = GetNow();
        minMapTime = GetMinMapTime(whenMapVec_);
    }

    // abort dequeue in abnormal scenarios
    if (WhenMapVecEmpty(whenMapVec_)) {
        int oldValue = concurrency_.fetch_sub(1); // no successor task to pull; the current worker formally exits
        FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_);
        if (oldValue == 1) {
            cond_.notify_all();
        }
        return nullptr;
    }
    FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);

    // dequeue next expired task by priority
    return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
}

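// Remove overloads: drop all pending tasks, tasks matching a name, or one
// specific task, searching every priority bucket and the waiting map.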
int ConcurrentQueue::Remove()
{
    std::lock_guard lock(mutex_);
    uint64_t removeCount = 0;
    for (auto& currentMap : whenMapVec_) {
        removeCount += BaseQueue::Remove(currentMap);
    }
    return removeCount + BaseQueue::Remove(waitingMap_);
}

int ConcurrentQueue::Remove(const char* name)
{
    std::lock_guard lock(mutex_);
    uint64_t removeCount = 0;
    for (auto& currentMap : whenMapVec_) {
        removeCount += BaseQueue::Remove(name, currentMap);
    }
    return removeCount + BaseQueue::Remove(name, waitingMap_);
}

int ConcurrentQueue::Remove(const QueueTask* task)
{
    std::lock_guard lock(mutex_);
    for (auto& currentMap : whenMapVec_) {
        if (BaseQueue::Remove(task, currentMap) == SUCC) {
            return SUCC;
        }
    }
    return BaseQueue::Remove(task, waitingMap_);
}

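// Marks the queue as exiting, then notifies and destroys every pending task
// and wakes any worker blocked in Pull().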
void ConcurrentQueue::Stop()
{
    std::lock_guard lock(mutex_);
    isExit_ = true;

    for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
        for (auto it = whenMapVec_[idx].begin(); it != whenMapVec_[idx].end(); it++) {
            if (it->second) {
                it->second->Notify();
                it->second->Destroy();
            }
        }
        whenMapVec_[idx].clear();
    }
    Stop(waitingMap_);
    if (loop_ == nullptr) {
        cond_.notify_all();
    }

    FFRT_LOGI("clear [queueId=%u] succ", queueId_);
}

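// Blocks until every claimed worker slot is released, then resubmits tasks that
// arrived during the wait. Loop-bound queues are not supported.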
int ConcurrentQueue::WaitAll()
{
    std::unique_lock lock(mutex_);
    FFRT_COND_DO_ERR(isOnLoop_, return -1, "loop queue does not support waitAll");
    FFRT_COND_DO_ERR(isExit_, return -1, "cannot wait tasks, [queueId=%u] is exiting", queueId_);

    if (waitingAll_) {
        return 1;
    }

    waitingAll_ = true;
    cond_.wait(lock, [this] { return concurrency_.load() == 0; }); // TODO: should this also require the whenMap to be empty?

    if (waitingMap_.empty()) {
        waitingAll_ = false;
        return 0;
    }

    // resubmit tasks to the queue and wake up workers
    for (const auto& iter : waitingMap_) {
        QueueTask* task = iter.second;
        QueueHandler* handler = task->GetHandler();
        int ret = PushAndCalConcurrency(task, task->GetPriority(), lock, false);
        if (ret == CONCURRENT) {
            if (task->GetDelay() == 0) {
                handler->TransferTask(task);
            } else {
                handler->TransferInitTask();
            }
        }
    }
    waitingAll_ = false;
    waitingMap_.clear();
    return 0;
}

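// Binds the queue to an event loop; fails if loop is null or one is already bound.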
bool ConcurrentQueue::SetLoop(Loop* loop)
{
    if (loop == nullptr || loop_ != nullptr) {
        FFRT_SYSEVENT_LOGE("[queueId=%u] failed to bind loop: loop is null or already bound", queueId_);
        return false;
    }

    loop_ = loop;
    isOnLoop_.store(true);
    return true;
}

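// Rounds the task delay (microseconds) up to milliseconds, caps it at INT_MAX,
// and arms a one-shot loop timer that executes the task on expiry.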
int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
{
    uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
    int timeout = delayMs > INT_MAX ? INT_MAX : static_cast<int>(delayMs);
    if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
        FFRT_SYSEVENT_LOGE("push delay queue task to timer fail");
        return FAILED;
    }
    return SUCC;
}

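// When concurrency is below the cap, claims a worker slot and returns CONCURRENT
// so the caller dispatches directly (delayed tasks are still parked in the
// whenMap); otherwise inserts the task and wakes a worker if it became the
// bucket head.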
int ConcurrentQueue::PushAndCalConcurrency(QueueTask* task, ffrt_queue_priority_t taskPriority,
    std::unique_lock<ffrt::mutex>& lock, bool needUnlock)
{
    if (concurrency_.load() < maxConcurrency_) {
        int oldValue = concurrency_.fetch_add(1);
        FFRT_LOGD("task [gid=%llu] concurrency[%d] + 1 [queueId=%u]", task->gid, oldValue, queueId_);

        if (task->GetDelay() > 0) {
            whenMapVec_[taskPriority].insert({task->GetUptime(), task});
        }

        return CONCURRENT;
    }

    whenMapVec_[taskPriority].insert({task->GetUptime(), task});
    if (task == whenMapVec_[taskPriority].begin()->second) {
        if (needUnlock) {
            lock.unlock();
        }
        cond_.notify_all();
    }
    return SUCC;
}

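// Notifies and destroys every task in the given map, then clears it.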
void ConcurrentQueue::Stop(std::multimap<uint64_t, QueueTask*>& whenMap)
{
    for (auto it = whenMap.begin(); it != whenMap.end(); it++) {
        if (it->second) {
            it->second->Notify();
            it->second->Destroy();
        }
    }

    whenMap.clear();
}

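// Factory: creates a concurrent queue with the attribute's max concurrency,
// defaulting to 1 when the attribute is unset or invalid.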
std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
{
    int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr);
    return std::make_unique<ConcurrentQueue>(maxConcurrency <= 0 ? 1 : maxConcurrency);
}

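// Returns true if any priority bucket contains a task with the given name.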
bool ConcurrentQueue::HasTask(const char* name)
{
    std::lock_guard lock(mutex_);
    for (auto& currentMap : whenMapVec_) {
        if (BaseQueue::HasTask(name, currentMap)) {
            return true;
        }
    }
    return false;
}

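// Gathers all pending tasks across priorities, sorts them by uptime, and returns
// the first maxConcurrency_ of them as head tasks, padding the rest with nullptr.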
std::vector<QueueTask*> ConcurrentQueue::GetHeadTask()
{
    std::lock_guard lock(mutex_);
    if (WhenMapVecEmpty(whenMapVec_)) {
        return {};
    }

    allWhenmapTask.clear();

    for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
        for (const auto& [upTime, qtask] : whenMapVec_[idx]) {
            allWhenmapTask.emplace_back(upTime, qtask);
        }
    }

    std::sort(allWhenmapTask.begin(), allWhenmapTask.end(),
        [](const auto& lhs, const auto& rhs) {
            return lhs.first < rhs.first;
        });

    for (size_t i = 0; i < maxConcurrency_; i++) {
        if (i < allWhenmapTask.size()) {
            headTaskVec_[i] = allWhenmapTask[i].second;
        } else {
            headTaskVec_[i] = nullptr;
        }
    }
    return headTaskVec_;
}
} // namespace ffrt