1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "queue/queue_handler.h"
17 #include <sstream>
18 #include "concurrent_queue.h"
19 #include "eventhandler_adapter_queue.h"
20 #include "dfx/log/ffrt_log_api.h"
21 #include "dfx/trace_record/ffrt_trace_record.h"
22 #include "dfx/sysevent/sysevent.h"
23 #include "util/event_handler_adapter.h"
24 #include "util/ffrt_facade.h"
25 #include "util/slab.h"
26 #include "util/time_format.h"
27 #include "tm/queue_task.h"
28 #include "sched/scheduler.h"
29
namespace {
// Longest user-supplied queue name, in bytes, that the QueueHandler constructor accepts.
constexpr uint32_t STRING_SIZE_MAX = 128;
// Sleep granularity, in microseconds, of the polling loops in the destructor and CancelAndWait.
constexpr uint32_t TASK_DONE_WAIT_UNIT = 10;
// Slack, in microseconds (5 ms), added to "now" when CheckSchedDeadline decides a deadline has expired.
constexpr uint64_t SCHED_TIME_ACC_ERROR_US = 5'000;
// One second in microseconds; ReportTaskTimeout divides by it to print a timeout in seconds.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1'000'000;
// Number of polling iterations CancelAndWait performs between two "blocked" diagnostics. Each
// iteration sleeps TASK_DONE_WAIT_UNIT microseconds; the log message labels this interval as 5s.
constexpr uint64_t TASK_WAIT_COUNT = 50'000;
// Gid that EvaluateTaskTimeout reports for a slot that has no timed-out task.
constexpr uint64_t INVALID_GID = 0;
} // namespace
38
39 namespace ffrt {
QueueHandler(const char * name,const ffrt_queue_attr_t * attr,const int type)40 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
41 {
42 // parse queue attribute
43 if (attr) {
44 qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
45 timeout_ = ffrt_queue_attr_get_timeout(attr);
46 timeoutCb_ = ffrt_queue_attr_get_callback(attr);
47 maxConcurrency_ = ffrt_queue_attr_get_max_concurrency(attr);
48 threadMode_ = ffrt_queue_attr_get_thread_mode(attr);
49 }
50
51 // callback reference counting is to ensure life cycle
52 if (timeout_ > 0 && timeoutCb_ != nullptr) {
53 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
54 cbTask->IncDeleteRef();
55 }
56 trafficRecord_.SetTimeInterval(trafficRecordInterval_);
57 curTaskVec_.resize(maxConcurrency_);
58 timeoutTaskVec_.resize(maxConcurrency_);
59
60 queue_ = CreateQueue(type, attr);
61 FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
62
63 if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
64 name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
65 } else {
66 name_ += "sq_unnamed_" + std::to_string(GetQueueId());
67 FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
68 }
69
70 FFRTFacade::GetQMInstance().RegisterQueue(this);
71 FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
72 }
73
~QueueHandler()74 QueueHandler::~QueueHandler()
75 {
76 FFRT_LOGI("destruct %s enter", name_.c_str());
77 // clear tasks in queue
78 CancelAndWait();
79 FFRTFacade::GetQMInstance().DeregisterQueue(this);
80
81 // release callback resource
82 if (timeout_ > 0) {
83 // wait for all delayedWorker to complete.
84 while ((delayedCbCnt_.load() > 0) && !GetDelayedWorkerExitFlag()) {
85 std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
86 }
87
88 if (timeoutCb_ != nullptr) {
89 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
90 cbTask->DecDeleteRef();
91 }
92 }
93
94 if (we_ != nullptr) {
95 DelayedRemove(we_->tp, we_);
96 SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
97 }
98 FFRT_LOGI("destruct %s leave", name_.c_str());
99 }
100
SetLoop(Loop * loop)101 bool QueueHandler::SetLoop(Loop* loop)
102 {
103 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
104 if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
105 return true;
106 }
107 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
108 return false, "[queueId=%u] type invalid", GetQueueId());
109 return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
110 }
111
ClearLoop()112 bool QueueHandler::ClearLoop()
113 {
114 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
115 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
116 return false, "[queueId=%u] type invalid", GetQueueId());
117 return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
118 }
119
PickUpTask()120 QueueTask* QueueHandler::PickUpTask()
121 {
122 FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
123 return queue_->Pull();
124 }
125
Submit(QueueTask * task)126 void QueueHandler::Submit(QueueTask* task)
127 {
128 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
129 FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
130
131 // if qos not specified, qos of the queue is inherited by task
132 if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
133 task->SetQos(qos_);
134 }
135
136 uint64_t gid = task->gid;
137 task->Prepare();
138
139 trafficRecord_.SubmitTraffic(this);
140
141 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
142 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
143 task->fromTid = ExecuteCtx::Cur()->tid;
144 }
145 #endif
146
147 // work after that schedule timeout is set for queue
148 if (task->GetSchedTimeout() > 0) {
149 AddSchedDeadline(task);
150 }
151
152 int ret = queue_->Push(task);
153 if (ret == SUCC) {
154 FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
155 return;
156 }
157 if (ret == FAILED) {
158 FFRT_SYSEVENT_LOGE("push task failed");
159 return;
160 }
161
162 if (!isUsed_.load()) {
163 isUsed_.store(true);
164 }
165
166 // activate queue
167 if (task->GetDelay() == 0) {
168 FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
169 {
170 std::lock_guard lock(mutex_);
171 UpdateCurTask(task);
172 }
173 TransferTask(task);
174 } else {
175 FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
176 if (ret == INACTIVE) {
177 queue_->Push(task);
178 }
179 TransferInitTask();
180 }
181 FFRTFacade::GetQMInstance().UpdateQueueInfo();
182 }
183
Cancel()184 void QueueHandler::Cancel()
185 {
186 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
187 std::lock_guard lock(mutex_);
188 std::vector<QueueTask*> taskVec = queue_->GetHeadTask();
189 for (auto& task : taskVec) {
190 for (auto& curtask : curTaskVec_) {
191 if (task == curtask) {
192 curtask = nullptr;
193 }
194 }
195 }
196 int count = queue_->Remove();
197 trafficRecord_.DoneTraffic(count);
198 }
199
CancelAndWait()200 void QueueHandler::CancelAndWait()
201 {
202 FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
203 GetQueueId());
204
205 {
206 std::unique_lock lock(mutex_);
207 for (auto& curTask : curTaskVec_) {
208 if (curTask != nullptr && curTask->curStatus != TaskStatus::EXECUTING) {
209 curTask = nullptr;
210 }
211 }
212 }
213 queue_->Stop();
214 while (CheckExecutingTask() || queue_->GetActiveStatus() || deliverCnt_.load() > 0) {
215 std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
216 desWaitCnt_++;
217 if (desWaitCnt_ == TASK_WAIT_COUNT) {
218 std::lock_guard lock(mutex_);
219 for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
220 FFRT_LOGI("Queue Destruct blocked for 5s, %s", GetDfxInfo(i).c_str());
221 }
222 desWaitCnt_ = 0;
223 }
224 }
225 }
226
CheckExecutingTask()227 bool QueueHandler::CheckExecutingTask()
228 {
229 std::lock_guard lock(mutex_);
230 for (const auto& curTask : curTaskVec_) {
231 if (curTask != nullptr && curTask->curStatus == TaskStatus::EXECUTING) {
232 return true;
233 }
234 }
235 return false;
236 }
237
Cancel(const char * name)238 int QueueHandler::Cancel(const char* name)
239 {
240 FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
241 "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
242 std::lock_guard lock(mutex_);
243 std::vector<QueueTask*> taskVec = queue_->GetHeadTask();
244 for (auto& task : taskVec) {
245 for (auto& curtask : curTaskVec_) {
246 if (task == curtask && task->IsMatch(name)) {
247 curtask = nullptr;
248 }
249 }
250 }
251 int ret = queue_->Remove(name);
252 if (ret <= 0) {
253 FFRT_LOGD("cancel task %s failed, task may have been executed", name);
254 } else {
255 trafficRecord_.DoneTraffic(ret);
256 }
257 return ret > 0 ? SUCC : FAILED;
258 }
259
Cancel(QueueTask * task)260 int QueueHandler::Cancel(QueueTask* task)
261 {
262 FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
263 "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
264 FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
265
266 if (task->GetSchedTimeout() > 0) {
267 RemoveSchedDeadline(task);
268 }
269
270 int ret = queue_->Remove(task);
271 if (ret == SUCC) {
272 FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
273 for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
274 std::lock_guard lock(mutex_);
275 if (curTaskVec_[i] == task) {
276 curTaskVec_[i] = nullptr;
277 break;
278 }
279 }
280 trafficRecord_.DoneTraffic();
281 task->Cancel();
282 } else {
283 FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->GetLabel().c_str());
284 }
285 return ret;
286 }
287
Dispatch(QueueTask * inTask)288 void QueueHandler::Dispatch(QueueTask* inTask)
289 {
290 QueueTask* nextTask = nullptr;
291 for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
292 // dfx watchdog
293 SetTimeoutMonitor(task);
294 SetCurTask(task);
295 execTaskId_.store(task->gid);
296
297 // run user task
298 task->SetStatus(TaskStatus::EXECUTING);
299 FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
300 auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
301 FFRTTraceRecord::TaskExecute(&(task->executeTime));
302 if (task->GetSchedTimeout() > 0) {
303 RemoveSchedDeadline(task);
304 }
305
306 uint64_t triggerTime{0};
307 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
308 triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
309 std::chrono::steady_clock::now().time_since_epoch()).count());
310 }
311
312 f->exec(f);
313
314 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
315 uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
316 std::chrono::steady_clock::now().time_since_epoch()).count());
317 reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
318 }
319
320 if (task != inTask) {
321 task->SetStatus(TaskStatus::FINISH);
322 }
323
324 task->Finish();
325 RemoveTimeoutMonitor(task);
326
327 trafficRecord_.DoneTraffic();
328 // run task batch
329 nextTask = task->GetNextTask();
330 {
331 std::lock_guard lock(mutex_);
332 curTaskVec_[task->curTaskIdx] = nextTask;
333 }
334 task->DecDeleteRef();
335 if (nextTask == nullptr) {
336 if (!queue_->IsOnLoop()) {
337 execTaskId_.store(0);
338 Deliver();
339 }
340 }
341 }
342 inTask->SetStatus(TaskStatus::FINISH);
343 }
344
Deliver()345 void QueueHandler::Deliver()
346 {
347 deliverCnt_.fetch_add(1);
348 {
349 // curtask has to be updated to headtask of whenmap before pull
350 std::lock_guard lock(mutex_);
351 std::vector<QueueTask*> taskMap = queue_->GetHeadTask();
352 if (!taskMap.empty()) {
353 std::unordered_set<QueueTask*> curTaskSet(curTaskVec_.begin(), curTaskVec_.end());
354 for (auto& task : taskMap) {
355 if (curTaskSet.find(task) == curTaskSet.end()) {
356 UpdateCurTask(task);
357 break;
358 }
359 }
360 }
361 }
362 QueueTask* task = queue_->Pull();
363 deliverCnt_.fetch_sub(1);
364 if (task != nullptr) {
365 SetCurTask(task);
366 TransferTask(task);
367 }
368 }
369
TransferTask(QueueTask * task)370 void QueueHandler::TransferTask(QueueTask* task)
371 {
372 if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
373 reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
374 }
375 task->Ready();
376 }
377
TransferInitTask()378 void QueueHandler::TransferInitTask()
379 {
380 std::function<void()> initFunc = [] {};
381 auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
382 QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
383 new (initTask)ffrt::QueueTask(this);
384 initTask->SetQos(qos_);
385 trafficRecord_.SubmitTraffic(this);
386 TransferTask(initTask);
387 }
388
SetTimeoutMonitor(QueueTask * task)389 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
390 {
391 if (timeout_ <= 0) {
392 return;
393 }
394
395 task->IncDeleteRef();
396
397 // set delayed worker callback
398 auto timeoutWe = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
399 timeoutWe->cb = ([this, task](WaitEntry* timeoutWe) {
400 task->MonitorTaskStart();
401 if (!task->GetFinishStatus()) {
402 RunTimeOutCallback(task);
403 }
404 delayedCbCnt_.fetch_sub(1);
405 task->DecDeleteRef();
406 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe));
407 });
408
409 // set delayed worker wakeup time
410 std::chrono::microseconds timeout(timeout_);
411 auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
412 timeoutWe->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
413 task->SetMonitorTask(timeoutWe);
414
415 delayedCbCnt_.fetch_add(1);
416 if (!DelayedWakeup(timeoutWe->tp, timeoutWe, timeoutWe->cb, true)) {
417 delayedCbCnt_.fetch_sub(1);
418 task->DecDeleteRef();
419 SimpleAllocator<WaitUntilEntry>::FreeMem(timeoutWe);
420 FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
421 name_.c_str(), timeout_);
422 return;
423 }
424
425 FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
426 }
427
RemoveTimeoutMonitor(QueueTask * task)428 void QueueHandler::RemoveTimeoutMonitor(QueueTask* task)
429 {
430 if (timeout_ <= 0 || task->IsMonitorTaskStart()) {
431 return;
432 }
433
434 if (DelayedRemove(task->GetMonitorTask()->tp, task->GetMonitorTask())) {
435 delayedCbCnt_.fetch_sub(1);
436 task->DecDeleteRef();
437 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(task->GetMonitorTask()));
438 }
439 }
440
RunTimeOutCallback(QueueTask * task)441 void QueueHandler::RunTimeOutCallback(QueueTask* task)
442 {
443 std::stringstream ss;
444 std::string processNameStr = std::string(GetCurrentProcessName());
445 ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
446 name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
447 << task->label << "], execution time exceeds[" << timeout_ << "] us";
448 FFRT_LOGE("%s", ss.str().c_str());
449 if (timeoutCb_ != nullptr) {
450 QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
451 cbTask->IncDeleteRef();
452 FFRTFacade::GetDWInstance().SubmitAsyncTask([timeoutCb = timeoutCb_, cbTask] {
453 timeoutCb->exec(timeoutCb);
454 cbTask->DecDeleteRef();
455 });
456 }
457 }
458
GetDfxInfo(int index) const459 std::string QueueHandler::GetDfxInfo(int index) const
460 {
461 std::stringstream ss;
462 if (queue_ != nullptr && curTaskVec_[index] != nullptr) {
463 TaskStatus curTaskStatus = curTaskVec_[index]->curStatus;
464 uint64_t curTaskTime = curTaskVec_[index]->statusTime.load(std::memory_order_relaxed);
465 TaskStatus preTaskStatus = curTaskVec_[index]->preStatus.load(std::memory_order_relaxed);
466 ss << "Queue task: tskname[" << curTaskVec_[index]->label.c_str() << "], qname=[" << name_ <<
467 "], with delay of[" << curTaskVec_[index]->GetDelay() << "]us, qos[" << curTaskVec_[index]->GetQos() <<
468 "], current status[" << StatusToString(curTaskStatus) << "], start at[" <<
469 FormatDateString4SteadyClock(curTaskTime) << "], last status[" << StatusToString(preTaskStatus)
470 << "], type=[" << queue_->GetQueueType() << "]";
471 } else {
472 ss << "Current queue or task nullptr";
473 }
474 return ss.str();
475 }
476
EvaluateTaskTimeout(uint64_t timeoutThreshold,uint64_t timeoutUs,std::stringstream & ss)477 std::pair<std::vector<uint64_t>, uint64_t> QueueHandler::EvaluateTaskTimeout(uint64_t timeoutThreshold,
478 uint64_t timeoutUs, std::stringstream& ss)
479 {
480 uint64_t whenmapTskCount = GetTaskCnt();
481 std::lock_guard lock(mutex_);
482 std::pair<std::vector<uint64_t>, uint64_t> curTaskInfo;
483 uint64_t minTime = UINT64_MAX;
484 for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
485 QueueTask* curTask = curTaskVec_[i];
486 if (curTask == nullptr) {
487 curTaskInfo.first.emplace_back(INVALID_GID);
488 continue;
489 }
490
491 uint64_t curTaskTime = curTask->statusTime.load(std::memory_order_relaxed);
492 if (curTaskTime == 0 || CheckDelayStatus()) {
493 curTaskInfo.first.emplace_back(INVALID_GID);
494 // Update the next inspection time if current task is delaying and there are still tasks in whenMap.
495 // Otherwise, pause the monitor timer.
496 if (whenmapTskCount > 0) {
497 minTime = std::min(minTime, TimeStampCntvct());
498 }
499 continue;
500 }
501
502 if (curTaskTime > timeoutThreshold) {
503 curTaskInfo.first.emplace_back(INVALID_GID);
504 minTime = std::min(minTime, curTaskTime);
505 continue;
506 }
507
508 TimeoutTask& timeoutTaskInfo = timeoutTaskVec_[i];
509 if (curTask->gid == timeoutTaskInfo.taskGid &&
510 curTask->curStatus == timeoutTaskInfo.taskStatus) {
511 // Check if current timeout task needs to update timeout count.
512 uint64_t nextSchedule = curTaskTime + timeoutUs * timeoutTaskInfo.timeoutCnt;
513 if (nextSchedule < timeoutThreshold) {
514 timeoutTaskInfo.timeoutCnt += 1;
515 } else {
516 curTaskInfo.first.emplace_back(INVALID_GID);
517 minTime = std::min(minTime, nextSchedule);
518 continue;
519 }
520 } else {
521 timeoutTaskInfo.timeoutCnt = 1;
522 timeoutTaskInfo.taskGid = curTask->gid;
523 timeoutTaskInfo.taskStatus = curTask->curStatus;
524 }
525
526 // When the same task is reported multiple times, the next inspection time is updated by adding the
527 // accumulated time based on the current status time.
528 curTaskInfo.first.emplace_back(timeoutTaskInfo.taskGid);
529 uint64_t nextTimeSchedule = curTaskTime + (timeoutUs * timeoutTaskInfo.timeoutCnt);
530 minTime = std::min(minTime, nextTimeSchedule);
531
532 if (ControlTimeoutFreq(timeoutTaskInfo.timeoutCnt)) {
533 ReportTaskTimeout(timeoutUs, ss, i);
534 }
535 }
536
537 curTaskInfo.second = minTime;
538 return curTaskInfo;
539 }
540
ControlTimeoutFreq(uint64_t timeoutCnt)541 bool QueueHandler::ControlTimeoutFreq(uint64_t timeoutCnt)
542 {
543 return (timeoutCnt < 10) || (timeoutCnt < 100 && timeoutCnt % 10 == 0) || (timeoutCnt % 100 == 0);
544 }
545
ReportTaskTimeout(uint64_t timeoutUs,std::stringstream & ss,int index)546 void QueueHandler::ReportTaskTimeout(uint64_t timeoutUs, std::stringstream& ss, int index)
547 {
548 ss.str("");
549 ss << GetDfxInfo(index) << ", timeout for[" << timeoutUs / MIN_TIMEOUT_THRESHOLD_US <<
550 "]s, reported count: " << timeoutTaskVec_[index].timeoutCnt;
551 FFRT_LOGW("%s", ss.str().c_str());
552 #ifdef FFRT_SEND_EVENT
553 if (timeoutTaskVec_[index].timeoutCnt == 1) {
554 std::string senarioName = "Serial_Queue_Timeout";
555 TaskTimeoutReport(ss, GetCurrentProcessName(), senarioName);
556 }
557 #endif
558 }
559
IsIdle()560 bool QueueHandler::IsIdle()
561 {
562 FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
563 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
564 return false, "[queueId=%u] type invalid", GetQueueId());
565
566 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
567 }
568
SetEventHandler(void * eventHandler)569 void QueueHandler::SetEventHandler(void* eventHandler)
570 {
571 FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
572
573 bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
574 (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
575 FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
576
577 reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
578 }
579
GetEventHandler()580 void* QueueHandler::GetEventHandler()
581 {
582 FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
583
584 bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
585 (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
586 FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
587
588 return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
589 }
590
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)591 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
592 {
593 FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
594 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
595 return -1, "[queueId=%u] type invalid", GetQueueId());
596 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
597 }
598
DumpSize(ffrt_inner_queue_priority_t priority)599 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
600 {
601 FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
602 FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
603 return -1, "[queueId=%u] type invalid", GetQueueId());
604 return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
605 }
606
SendSchedTimer(TimePoint delay)607 void QueueHandler::SendSchedTimer(TimePoint delay)
608 {
609 we_->tp = delay;
610 if (!DelayedWakeup(we_->tp, we_, we_->cb, true)) {
611 FFRT_LOGW("failed to set delayedworker");
612 }
613 }
614
CheckSchedDeadline()615 void QueueHandler::CheckSchedDeadline()
616 {
617 std::vector<std::pair<uint64_t, std::string>> timeoutTaskInfo;
618 // Collecting Timeout Tasks
619 {
620 std::lock_guard lock(mutex_);
621 uint64_t threshold = std::chrono::duration_cast<std::chrono::microseconds>(
622 std::chrono::steady_clock::now().time_since_epoch()).count() + SCHED_TIME_ACC_ERROR_US;
623
624 auto it = schedDeadline_.begin();
625 uint64_t nextDeadline = UINT64_MAX;
626 while (it != schedDeadline_.end()) {
627 if (it->second < threshold) {
628 timeoutTaskInfo.push_back(std::make_pair(it->first->gid, it->first->label));
629 it = schedDeadline_.erase(it);
630 } else {
631 nextDeadline = std::min(nextDeadline, it->second);
632 ++it;
633 }
634 }
635
636 if (schedDeadline_.empty()) {
637 initSchedTimer_ = false;
638 } else {
639 std::chrono::microseconds timeout(nextDeadline);
640 TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
641 std::chrono::steady_clock::time_point() + timeout);
642 FFRT_LOGI("queueId=%u set sched timer", GetQueueId());
643 SendSchedTimer(tp);
644 }
645 }
646
647 // Reporting Timeout Information
648 if (!timeoutTaskInfo.empty()) {
649 ReportTimeout(timeoutTaskInfo);
650 }
651 }
652
AddSchedDeadline(QueueTask * task)653 void QueueHandler::AddSchedDeadline(QueueTask* task)
654 {
655 // sched timeout only support serial queues, other queue types will be supported based on service requirements.
656 if (queue_->GetQueueType() != ffrt_queue_serial) {
657 return;
658 }
659
660 std::lock_guard lock(mutex_);
661 schedDeadline_.insert({task, task->GetSchedTimeout() + task->GetUptime()});
662
663 if (!initSchedTimer_) {
664 if (we_ == nullptr) {
665 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
666 we_->cb = ([this](WaitEntry* we) { CheckSchedDeadline(); });
667 }
668 std::chrono::microseconds timeout(schedDeadline_[task]);
669 TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
670 std::chrono::steady_clock::time_point() + timeout);
671 SendSchedTimer(tp);
672 initSchedTimer_ = true;
673 }
674 }
675
RemoveSchedDeadline(QueueTask * task)676 void QueueHandler::RemoveSchedDeadline(QueueTask* task)
677 {
678 std::lock_guard lock(mutex_);
679 schedDeadline_.erase(task);
680 }
681
ReportTimeout(const std::vector<std::pair<uint64_t,std::string>> & timeoutTaskInfo)682 void QueueHandler::ReportTimeout(const std::vector<std::pair<uint64_t, std::string>>& timeoutTaskInfo)
683 {
684 std::stringstream ss;
685 ss << "Queue_Schedule_Timeout, queueId=" << GetQueueId() << ", timeout task gid: ";
686 for (auto& info : timeoutTaskInfo) {
687 ss << info.first << ", name " << info.second.c_str() << " ";
688 }
689
690 FFRT_LOGE("%s", ss.str().c_str());
691
692 uint32_t queueId = GetQueueId();
693 std::string ssStr = ss.str();
694 if (ffrt_task_timeout_get_cb()) {
695 FFRTFacade::GetDWInstance().SubmitAsyncTask([queueId, ssStr] {
696 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
697 if (func) {
698 func(queueId, ssStr.c_str(), ssStr.size());
699 }
700 });
701 }
702 }
703
SetCurTask(QueueTask * task)704 void QueueHandler::SetCurTask(QueueTask* task)
705 {
706 std::lock_guard lock(mutex_);
707 if (task != nullptr) {
708 curTaskVec_[task->curTaskIdx] = task;
709 }
710 }
711
UpdateCurTask(QueueTask * task)712 void QueueHandler::UpdateCurTask(QueueTask* task)
713 {
714 for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
715 if (curTaskVec_[i] == nullptr) {
716 curTaskVec_[i] = task;
717 task->curTaskIdx = i;
718 return;
719 }
720 }
721 }
722 } // namespace ffrt
723