1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_handler.h"
16 #include <sys/syscall.h>
17 #include <sstream>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "dfx/trace_record/ffrt_trace_record.h"
20 #include "util/event_handler_adapter.h"
21 #include "util/ffrt_facade.h"
22 #include "util/slab.h"
23 #include "tm/queue_task.h"
24 #include "concurrent_queue.h"
25 #include "eventhandler_adapter_queue.h"
26 #include "sched/scheduler.h"
27
namespace {
// Size of the buffer that receives the process name for timeout diagnostics.
constexpr int PROCESS_NAME_BUFFER_LENGTH{1024};
// Longest user-supplied queue name (in characters) that is kept; longer names are replaced.
constexpr uint32_t STRING_SIZE_MAX{128};
// Polling interval, in microseconds, while waiting for a cancelled queue to become idle.
constexpr uint32_t TASK_DONE_WAIT_UNIT{10};
// Schedule-timer tolerance in microseconds: deadlines this close to "now" count as expired,
// and it is also the retry delay when arming the schedule timer fails.
constexpr uint64_t SCHED_TIME_ACC_ERROR_US{5000}; // 5ms
// Pending-task count above which the queue is checked for congestion.
constexpr uint32_t CONGESTION_CNT{5};
// Base congestion period in microseconds; multiplied by the number of overload reports so far.
constexpr uint32_t CONGESTION_TIMEOUT_US{300000000}; // 5min
}
36
37 namespace ffrt {
QueueHandler(const char * name,const ffrt_queue_attr_t * attr,const int type)38 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
39 {
40 // parse queue attribute
41 if (attr) {
42 qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
43 timeout_ = ffrt_queue_attr_get_timeout(attr);
44 timeoutCb_ = ffrt_queue_attr_get_callback(attr);
45 }
46
47 // callback reference counting is to ensure life cycle
48 if (timeout_ > 0 && timeoutCb_ != nullptr) {
49 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
50 cbTask->IncDeleteRef();
51 }
52
53 queue_ = CreateQueue(type, attr);
54 FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
55
56 if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
57 name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
58 } else {
59 name_ += "sq_unnamed_" + std::to_string(GetQueueId());
60 FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
61 }
62
63 FFRTFacade::GetQMInstance().RegisterQueueId(GetQueueId(), this);
64 FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
65 }
66
/**
 * Destroy the queue handler.
 *
 * Teardown order:
 *  1. Cancel pending tasks and wait until no task of this queue is running (CancelAndWait).
 *  2. Reset this queue's entry in the QM instance.
 *  3. Wait for outstanding execution-watchdog callbacks.
 *  4. Release the timeout-callback reference taken in the constructor.
 *  5. Remove and free the schedule-deadline timer entry.
 */
QueueHandler::~QueueHandler()
{
    FFRT_LOGI("destruct %s enter", name_.c_str());
    // clear tasks in queue
    CancelAndWait();
    FFRTFacade::GetQMInstance().ResetQueueStruct(GetQueueId());

    // release callback resource
    if (timeout_ > 0) {
        // wait for all delayedWorker to complete.
        // SetTimeoutMonitor increments delayedCbCnt_ per armed watchdog, and each watchdog callback
        // decrements it. Poll, sleeping timeout_ microseconds, until none remain.
        while (delayedCbCnt_.load() > 0) {
            this_task::sleep_for(std::chrono::microseconds(timeout_));
        }

        // balances the IncDeleteRef() taken in the constructor
        if (timeoutCb_ != nullptr) {
            QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
            cbTask->DecDeleteRef();
        }
    }

    // In this file we_ is only allocated by AddSchedDeadline (schedule-timeout monitoring).
    // NOTE(review): if the CheckSchedDeadline callback is executing concurrently on the delayed
    // worker, it may re-arm or use we_ after DelayedRemove. Confirm DelayedRemove's guarantees.
    if (we_ != nullptr) {
        DelayedRemove(we_->tp, we_);
        SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
    }
    FFRT_LOGI("destruct %s leave", name_.c_str());
}
93
SetLoop(Loop * loop)94 bool QueueHandler::SetLoop(Loop* loop)
95 {
96 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
97 if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
98 return true;
99 }
100 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
101 return false, "[queueId=%u] type invalid", GetQueueId());
102 return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
103 }
104
ClearLoop()105 bool QueueHandler::ClearLoop()
106 {
107 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
108 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
109 return false, "[queueId=%u] type invalid", GetQueueId());
110 return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
111 }
112
PickUpTask()113 QueueTask* QueueHandler::PickUpTask()
114 {
115 FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
116 return queue_->Pull();
117 }
118
Submit(QueueTask * task)119 void QueueHandler::Submit(QueueTask* task)
120 {
121 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
122 FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
123
124 // if qos not specified, qos of the queue is inherited by task
125 if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
126 task->SetQos(qos_);
127 }
128
129 uint64_t gid = task->gid;
130 FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(GetQueueId(), gid);
131 FFRTTraceRecord::TaskSubmit(&(task->createTime), &(task->fromTid));
132 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
133 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
134 task->fromTid = ExecuteCtx::Cur()->tid;
135 }
136 #endif
137
138 // work after that schedule timeout is set for queue
139 if (task->GetSchedTimeout() > 0) {
140 AddSchedDeadline(task);
141 }
142 if (we_ != nullptr) {
143 CheckOverload();
144 }
145
146 int ret = queue_->Push(task);
147 if (ret == SUCC) {
148 FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
149 return;
150 }
151 if (ret == FAILED) {
152 return;
153 }
154
155 if (!isUsed_.load()) {
156 isUsed_.store(true);
157 }
158
159 // activate queue
160 if (task->GetDelay() == 0) {
161 FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
162 TransferTask(task);
163 } else {
164 FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
165 if (ret == INACTIVE) {
166 queue_->Push(task);
167 }
168 TransferInitTask();
169 }
170 }
171
Cancel()172 void QueueHandler::Cancel()
173 {
174 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
175 queue_->Remove();
176 }
177
CancelAndWait()178 void QueueHandler::CancelAndWait()
179 {
180 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
181 GetQueueId());
182 queue_->Stop();
183 while (FFRTFacade::GetQMInstance().QueryQueueStatus(GetQueueId()) || queue_->GetActiveStatus()) {
184 std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
185 }
186 }
187
Cancel(const char * name)188 int QueueHandler::Cancel(const char* name)
189 {
190 FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
191 "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
192 int ret = queue_->Remove(name);
193 if (ret != SUCC) {
194 FFRT_LOGD("cancel task %s failed, task may have been executed", name);
195 }
196
197 return ret;
198 }
199
Cancel(QueueTask * task)200 int QueueHandler::Cancel(QueueTask* task)
201 {
202 FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
203 "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
204 FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
205
206 if (task->GetSchedTimeout() > 0) {
207 RemoveSchedDeadline(task);
208 }
209
210 int ret = queue_->Remove(task);
211 if (ret == SUCC) {
212 FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
213 task->Notify();
214 task->Destroy();
215 } else {
216 FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->label.c_str());
217 }
218 return ret;
219 }
220
/**
 * Execute a batch of tasks on the calling worker.
 *
 * Starting from inTask, runs each task and then follows task->GetNextTask() until it
 * returns nullptr. A nullptr inTask is a no-op.
 *
 * For every task, in order:
 *  - arm the execution watchdog (SetTimeoutMonitor);
 *  - record the task as the queue's current task in the QM instance and in execTaskId_;
 *  - drop its schedule-deadline entry (the task has started);
 *  - run the user function;
 *  - for eventhandler-adapter queues, push trigger/complete timestamps to the history;
 *  - destroy the function object and notify waiters.
 *
 * After the last task of the batch, the current-task info is reset. If
 * queue_->IsOnLoop() is false, the next ready task is then pulled and handed to the
 * scheduler (Deliver).
 *
 * @param inTask first task of the batch.
 */
void QueueHandler::Dispatch(QueueTask* inTask)
{
    QueueTask* nextTask = nullptr;
    for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
        // dfx watchdog
        SetTimeoutMonitor(task);
        FFRTFacade::GetQMInstance().UpdateQueueInfo(GetQueueId(), task->gid);
        execTaskId_.store(task->gid);

        // run user task
        FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
        auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
        FFRT_SERIAL_QUEUE_TASK_EXECUTE_MARKER(task->gid);
        FFRTTraceRecord::TaskExecute(&(task->executeTime));
        // the task has started executing, so it can no longer miss its schedule deadline
        if (task->GetSchedTimeout() > 0) {
            RemoveSchedDeadline(task);
        }

        // adapter queues record steady_clock timestamps (microseconds) around execution
        uint64_t triggerTime{0};
        if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
            triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count());
        }

        f->exec(f);
        FFRTTraceRecord::TaskDone<ffrt_queue_task>(task->GetQos(), task);
        if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
            uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count());
            reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
        }

        f->destroy(f);
        task->Notify();

        // run task batch
        // next is read before DecDeleteRef() below drops this loop's reference to task
        nextTask = task->GetNextTask();
        if (nextTask == nullptr) {
            // end of batch: clear current-task info, then hand over the next ready task, if any
            FFRTFacade::GetQMInstance().ResetQueueInfo(GetQueueId());
            execTaskId_.store(0);
            if (!queue_->IsOnLoop()) {
                Deliver();
            }
        }
        task->DecDeleteRef();
    }
}
268
Deliver()269 void QueueHandler::Deliver()
270 {
271 QueueTask* task = queue_->Pull();
272 if (task != nullptr) {
273 TransferTask(task);
274 }
275 }
276
TransferTask(QueueTask * task)277 void QueueHandler::TransferTask(QueueTask* task)
278 {
279 auto entry = &task->fq_we;
280 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
281 reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
282 }
283 FFRTScheduler* sch = FFRTFacade::GetSchedInstance();
284 FFRT_READY_MARKER(task->gid); // ffrt queue task ready to enque
285 if (!sch->InsertNode(&entry->node, task->GetQos())) {
286 FFRT_LOGE("failed to insert task [%llu] into %s", task->gid, GetQueueId(), name_.c_str());
287 return;
288 }
289 }
290
TransferInitTask()291 void QueueHandler::TransferInitTask()
292 {
293 std::function<void()> initFunc = [] {};
294 auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
295 QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
296 new (initTask)ffrt::QueueTask(this);
297 initTask->SetQos(qos_);
298 TransferTask(initTask);
299 }
300
SetTimeoutMonitor(QueueTask * task)301 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
302 {
303 if (timeout_ <= 0) {
304 return;
305 }
306
307 task->IncDeleteRef();
308 WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
309 // set delayed worker callback
310 we->cb = ([this, task](WaitEntry* we) {
311 if (!task->GetFinishStatus()) {
312 RunTimeOutCallback(task);
313 }
314 delayedCbCnt_.fetch_sub(1);
315 task->DecDeleteRef();
316 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
317 });
318
319 // set delayed worker wakeup time
320 std::chrono::microseconds timeout(timeout_);
321 auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
322 we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
323
324 if (!DelayedWakeup(we->tp, we, we->cb)) {
325 task->DecDeleteRef();
326 SimpleAllocator<WaitUntilEntry>::FreeMem(we);
327 FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
328 name_.c_str(), timeout_);
329 return;
330 }
331
332 delayedCbCnt_.fetch_add(1);
333 FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
334 }
335
RunTimeOutCallback(QueueTask * task)336 void QueueHandler::RunTimeOutCallback(QueueTask* task)
337 {
338 std::stringstream ss;
339 static std::once_flag flag;
340 static char processName[PROCESS_NAME_BUFFER_LENGTH];
341 std::call_once(flag, []() {
342 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
343 });
344 std::string processNameStr = std::string(processName);
345 ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
346 name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
347 << task->label << "], execution time exceeds[" << timeout_ << "] us";
348 FFRT_LOGE("%s", ss.str().c_str());
349 if (timeoutCb_ != nullptr) {
350 timeoutCb_->exec(timeoutCb_);
351 }
352 }
353
GetDfxInfo() const354 std::string QueueHandler::GetDfxInfo() const
355 {
356 std::stringstream ss;
357 ss << " queue name [" << name_ << "]";
358 if (queue_ != nullptr) {
359 ss << ", remaining tasks count=" << queue_->GetMapSize();
360 }
361 return ss.str();
362 }
363
IsIdle()364 bool QueueHandler::IsIdle()
365 {
366 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
367 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
368 return false, "[queueId=%u] type invalid", GetQueueId());
369
370 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
371 }
372
SetEventHandler(void * eventHandler)373 void QueueHandler::SetEventHandler(void* eventHandler)
374 {
375 FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
376
377 bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
378 (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
379 FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
380
381 reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
382 }
383
GetEventHandler()384 void* QueueHandler::GetEventHandler()
385 {
386 FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
387
388 bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
389 (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
390 FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
391
392 return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
393 }
394
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)395 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
396 {
397 FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
398 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
399 return -1, "[queueId=%u] type invalid", GetQueueId());
400 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
401 }
402
DumpSize(ffrt_inner_queue_priority_t priority)403 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
404 {
405 FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
406 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
407 return -1, "[queueId=%u] type invalid", GetQueueId());
408 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
409 }
410
SendSchedTimer(TimePoint delay)411 void QueueHandler::SendSchedTimer(TimePoint delay)
412 {
413 we_->tp = delay;
414 bool result = DelayedWakeup(we_->tp, we_, we_->cb);
415 while (!result) {
416 FFRT_LOGW("failed to set delayedworker, retry");
417 we_->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(SCHED_TIME_ACC_ERROR_US);
418 result = DelayedWakeup(we_->tp, we_, we_->cb);
419 }
420 }
421
/**
 * Schedule-deadline timer callback (installed as we_->cb by AddSchedDeadline).
 *
 * Under mutex_:
 *  - every schedDeadline_ entry whose deadline falls before now + SCHED_TIME_ACC_ERROR_US is
 *    erased, and the gid of its task is collected;
 *  - if entries remain, the timer is re-armed at the earliest remaining deadline;
 *  - otherwise initSchedTimer_ is cleared, so the next AddSchedDeadline arms a new timer.
 *
 * The collected gids are reported through ReportTimeout after the lock is released.
 */
void QueueHandler::CheckSchedDeadline()
{
    std::vector<uint64_t> timeoutTaskId;
    // Collecting Timeout Tasks
    {
        std::unique_lock lock(mutex_);
        // deadlines within SCHED_TIME_ACC_ERROR_US of now already count as expired (timer tolerance)
        uint64_t threshold = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now().time_since_epoch()).count() + SCHED_TIME_ACC_ERROR_US;

        auto it = schedDeadline_.begin();
        uint64_t nextDeadline = UINT64_MAX;
        while (it != schedDeadline_.end()) {
            if (it->second < threshold) {
                // NOTE(review): this dereferences the stored task pointer. An entry must be removed
                // before its task is released; otherwise this reads freed memory.
                timeoutTaskId.push_back(it->first->gid);
                it = schedDeadline_.erase(it);
            } else {
                nextDeadline = std::min(nextDeadline, it->second);
                ++it;
            }
        }

        if (schedDeadline_.empty()) {
            initSchedTimer_ = false;
        } else {
            // deadlines are microsecond offsets from the steady_clock epoch; convert back to a TimePoint
            std::chrono::microseconds timeout(nextDeadline);
            TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
                std::chrono::steady_clock::time_point() + timeout);
            FFRT_LOGI("queueId=%u set sched timer", GetQueueId());
            SendSchedTimer(tp);
        }
    }

    // Reporting Timeout Information
    // done outside the lock, so a user timeout callback cannot deadlock on mutex_
    if (!timeoutTaskId.empty()) {
        ReportTimeout(timeoutTaskId);
    }
}
459
AddSchedDeadline(QueueTask * task)460 void QueueHandler::AddSchedDeadline(QueueTask* task)
461 {
462 // sched timeout only support serial queues, other queue types will be supported based on service requirements.
463 if (queue_->GetQueueType() != ffrt_queue_serial) {
464 return;
465 }
466
467 std::unique_lock lock(mutex_);
468 schedDeadline_.insert({task, task->GetSchedTimeout() + task->GetUptime()});
469
470 if (!initSchedTimer_) {
471 if (we_ == nullptr) {
472 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
473 we_->cb = ([this](WaitEntry* we_) { CheckSchedDeadline(); });
474 }
475 std::chrono::microseconds timeout(schedDeadline_[task]);
476 TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
477 std::chrono::steady_clock::time_point() + timeout);
478 SendSchedTimer(tp);
479 initSchedTimer_ = true;
480 }
481 }
482
RemoveSchedDeadline(QueueTask * task)483 void QueueHandler::RemoveSchedDeadline(QueueTask* task)
484 {
485 std::unique_lock lock(mutex_);
486 schedDeadline_.erase(task);
487 }
488
CheckOverload()489 void QueueHandler::CheckOverload()
490 {
491 if (queue_->GetMapSize() <= CONGESTION_CNT) {
492 return;
493 }
494
495 uint64_t expect = queue_->GetHeadUptime();
496 uint64_t now = std::chrono::duration_cast<std::chrono::microseconds>(
497 std::chrono::steady_clock::now().time_since_epoch()).count();
498 if (now > expect && now - expect > CONGESTION_TIMEOUT_US * overloadTimes_.load()) {
499 overloadTimes_.fetch_add(1);
500 std::vector<uint64_t> timeoutVec = {};
501 ReportTimeout(timeoutVec);
502 }
503 }
504
ReportTimeout(const std::vector<uint64_t> & timeoutTaskId)505 void QueueHandler::ReportTimeout(const std::vector<uint64_t>& timeoutTaskId)
506 {
507 std::stringstream ss;
508 ss << "Queue_Schedule_Timeout, queueId=" << GetQueueId() << ", timeout task gid: ";
509 for (auto& id : timeoutTaskId) {
510 ss << id << " ";
511 }
512
513 FFRT_LOGE("%s", ss.str().c_str());
514 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
515 if (func) {
516 func(GetQueueId(), ss.str().c_str(), ss.str().size());
517 }
518 }
519
520 } // namespace ffrt
521