• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "queue/queue_handler.h"
17 #include <sstream>
18 #include "concurrent_queue.h"
19 #include "eventhandler_adapter_queue.h"
20 #include "dfx/log/ffrt_log_api.h"
21 #include "dfx/trace_record/ffrt_trace_record.h"
22 #include "dfx/sysevent/sysevent.h"
23 #include "util/event_handler_adapter.h"
24 #include "util/ffrt_facade.h"
25 #include "util/slab.h"
26 #include "util/time_format.h"
27 #include "tm/queue_task.h"
28 #include "sched/scheduler.h"
29 
30 namespace {
31 constexpr uint32_t STRING_SIZE_MAX = 128;
32 constexpr uint32_t TASK_DONE_WAIT_UNIT = 10;
33 constexpr uint64_t SCHED_TIME_ACC_ERROR_US = 5000; // 5ms
34 constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000000; // 1s
35 constexpr uint64_t TASK_WAIT_COUNT = 50000; // 5s
36 constexpr uint64_t INVALID_GID = 0;
37 }
38 
39 namespace ffrt {
QueueHandler(const char * name,const ffrt_queue_attr_t * attr,const int type)40 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
41 {
42     // parse queue attribute
43     if (attr) {
44         qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
45         timeout_ = ffrt_queue_attr_get_timeout(attr);
46         timeoutCb_ = ffrt_queue_attr_get_callback(attr);
47         maxConcurrency_ = ffrt_queue_attr_get_max_concurrency(attr);
48         threadMode_ = ffrt_queue_attr_get_thread_mode(attr);
49     }
50 
51     // callback reference counting is to ensure life cycle
52     if (timeout_ > 0 && timeoutCb_ != nullptr) {
53         QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
54         cbTask->IncDeleteRef();
55     }
56     trafficRecord_.SetTimeInterval(trafficRecordInterval_);
57     curTaskVec_.resize(maxConcurrency_);
58     timeoutTaskVec_.resize(maxConcurrency_);
59 
60     queue_ = CreateQueue(type, attr);
61     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
62 
63     if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
64         name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
65     } else {
66         name_ += "sq_unnamed_" + std::to_string(GetQueueId());
67         FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
68     }
69 
70     FFRTFacade::GetQMInstance().RegisterQueue(this);
71     FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
72 }
73 
~QueueHandler()74 QueueHandler::~QueueHandler()
75 {
76     FFRT_LOGI("destruct %s enter", name_.c_str());
77     // clear tasks in queue
78     CancelAndWait();
79     FFRTFacade::GetQMInstance().DeregisterQueue(this);
80 
81     // release callback resource
82     if (timeout_ > 0) {
83         // wait for all delayedWorker to complete.
84         while ((delayedCbCnt_.load() > 0) && !GetDelayedWorkerExitFlag()) {
85             std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
86         }
87 
88         if (timeoutCb_ != nullptr) {
89             QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
90             cbTask->DecDeleteRef();
91         }
92     }
93 
94     if (we_ != nullptr) {
95         DelayedRemove(we_->tp, we_);
96         SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
97     }
98     FFRT_LOGI("destruct %s leave", name_.c_str());
99 }
100 
SetLoop(Loop * loop)101 bool QueueHandler::SetLoop(Loop* loop)
102 {
103     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
104     if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
105         return true;
106     }
107     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
108         return false, "[queueId=%u] type invalid", GetQueueId());
109     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
110 }
111 
ClearLoop()112 bool QueueHandler::ClearLoop()
113 {
114     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
115     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
116         return false, "[queueId=%u] type invalid", GetQueueId());
117     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
118 }
119 
PickUpTask()120 QueueTask* QueueHandler::PickUpTask()
121 {
122     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
123     return queue_->Pull();
124 }
125 
Submit(QueueTask * task)126 void QueueHandler::Submit(QueueTask* task)
127 {
128     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
129     FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
130 
131     // if qos not specified, qos of the queue is inherited by task
132     if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
133         task->SetQos(qos_);
134     }
135 
136     uint64_t gid = task->gid;
137     task->Prepare();
138 
139     trafficRecord_.SubmitTraffic(this);
140 
141 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
142     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
143         task->fromTid = ExecuteCtx::Cur()->tid;
144     }
145 #endif
146 
147     // work after that schedule timeout is set for queue
148     if (task->GetSchedTimeout() > 0) {
149         AddSchedDeadline(task);
150     }
151 
152     int ret = queue_->Push(task);
153     if (ret == SUCC) {
154         FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
155         return;
156     }
157     if (ret == FAILED) {
158         FFRT_SYSEVENT_LOGE("push task failed");
159         return;
160     }
161 
162     if (!isUsed_.load()) {
163         isUsed_.store(true);
164     }
165 
166     // activate queue
167     if (task->GetDelay() == 0) {
168         FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
169         {
170             std::lock_guard lock(mutex_);
171             UpdateCurTask(task);
172         }
173         TransferTask(task);
174     } else {
175         FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
176         if (ret == INACTIVE) {
177             queue_->Push(task);
178         }
179         TransferInitTask();
180     }
181     FFRTFacade::GetQMInstance().UpdateQueueInfo();
182 }
183 
Cancel()184 void QueueHandler::Cancel()
185 {
186     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
187     std::lock_guard lock(mutex_);
188     std::vector<QueueTask*> taskVec = queue_->GetHeadTask();
189     for (auto& task : taskVec) {
190         for (auto& curtask : curTaskVec_) {
191             if (task == curtask) {
192                 curtask = nullptr;
193             }
194         }
195     }
196     int count = queue_->Remove();
197     trafficRecord_.DoneTraffic(count);
198 }
199 
CancelAndWait()200 void QueueHandler::CancelAndWait()
201 {
202     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
203         GetQueueId());
204 
205     {
206         std::unique_lock lock(mutex_);
207         for (auto& curTask : curTaskVec_) {
208             if (curTask != nullptr && curTask->curStatus != TaskStatus::EXECUTING) {
209                 curTask = nullptr;
210             }
211         }
212     }
213     queue_->Stop();
214     while (CheckExecutingTask() || queue_->GetActiveStatus() || deliverCnt_.load() > 0) {
215         std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
216         desWaitCnt_++;
217         if (desWaitCnt_ == TASK_WAIT_COUNT) {
218             std::lock_guard lock(mutex_);
219             for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
220                 FFRT_LOGI("Queue Destruct blocked for 5s, %s", GetDfxInfo(i).c_str());
221             }
222             desWaitCnt_ = 0;
223         }
224     }
225 }
226 
CheckExecutingTask()227 bool QueueHandler::CheckExecutingTask()
228 {
229     std::lock_guard lock(mutex_);
230     for (const auto& curTask : curTaskVec_) {
231         if (curTask != nullptr && curTask->curStatus == TaskStatus::EXECUTING) {
232             return true;
233         }
234     }
235     return false;
236 }
237 
Cancel(const char * name)238 int QueueHandler::Cancel(const char* name)
239 {
240     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
241         "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
242     std::lock_guard lock(mutex_);
243     std::vector<QueueTask*> taskVec = queue_->GetHeadTask();
244     for (auto& task : taskVec) {
245         for (auto& curtask : curTaskVec_) {
246             if (task == curtask && task->IsMatch(name)) {
247                 curtask = nullptr;
248             }
249         }
250     }
251     int ret = queue_->Remove(name);
252     if (ret <= 0) {
253         FFRT_LOGD("cancel task %s failed, task may have been executed", name);
254     } else {
255         trafficRecord_.DoneTraffic(ret);
256     }
257     return ret > 0 ? SUCC : FAILED;
258 }
259 
Cancel(QueueTask * task)260 int QueueHandler::Cancel(QueueTask* task)
261 {
262     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
263          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
264     FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
265 
266     if (task->GetSchedTimeout() > 0) {
267         RemoveSchedDeadline(task);
268     }
269 
270     int ret = queue_->Remove(task);
271     if (ret == SUCC) {
272         FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
273         for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
274             std::lock_guard lock(mutex_);
275             if (curTaskVec_[i] == task) {
276                 curTaskVec_[i] = nullptr;
277                 break;
278             }
279         }
280         trafficRecord_.DoneTraffic();
281         task->Cancel();
282     } else {
283         FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->GetLabel().c_str());
284     }
285     return ret;
286 }
287 
Dispatch(QueueTask * inTask)288 void QueueHandler::Dispatch(QueueTask* inTask)
289 {
290     QueueTask* nextTask = nullptr;
291     for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
292         // dfx watchdog
293         SetTimeoutMonitor(task);
294         SetCurTask(task);
295         execTaskId_.store(task->gid);
296 
297         // run user task
298         task->SetStatus(TaskStatus::EXECUTING);
299         FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
300         auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
301         FFRTTraceRecord::TaskExecute(&(task->executeTime));
302         if (task->GetSchedTimeout() > 0) {
303             RemoveSchedDeadline(task);
304         }
305 
306         uint64_t triggerTime{0};
307         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
308             triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
309                 std::chrono::steady_clock::now().time_since_epoch()).count());
310         }
311 
312         f->exec(f);
313 
314         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
315             uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
316                 std::chrono::steady_clock::now().time_since_epoch()).count());
317             reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
318         }
319 
320         if (task != inTask) {
321             task->SetStatus(TaskStatus::FINISH);
322         }
323 
324         task->Finish();
325         RemoveTimeoutMonitor(task);
326 
327         trafficRecord_.DoneTraffic();
328         // run task batch
329         nextTask = task->GetNextTask();
330         {
331             std::lock_guard lock(mutex_);
332             curTaskVec_[task->curTaskIdx] = nextTask;
333         }
334         task->DecDeleteRef();
335         if (nextTask == nullptr) {
336             if (!queue_->IsOnLoop()) {
337             execTaskId_.store(0);
338                 Deliver();
339             }
340         }
341     }
342     inTask->SetStatus(TaskStatus::FINISH);
343 }
344 
Deliver()345 void QueueHandler::Deliver()
346 {
347     deliverCnt_.fetch_add(1);
348     {
349         // curtask has to be updated to headtask of whenmap before pull
350         std::lock_guard lock(mutex_);
351         std::vector<QueueTask*> taskMap = queue_->GetHeadTask();
352         if (!taskMap.empty()) {
353             std::unordered_set<QueueTask*> curTaskSet(curTaskVec_.begin(), curTaskVec_.end());
354             for (auto& task : taskMap) {
355                 if (curTaskSet.find(task) == curTaskSet.end()) {
356                     UpdateCurTask(task);
357                     break;
358                 }
359             }
360         }
361     }
362     QueueTask* task = queue_->Pull();
363     deliverCnt_.fetch_sub(1);
364     if (task != nullptr) {
365         SetCurTask(task);
366         TransferTask(task);
367     }
368 }
369 
TransferTask(QueueTask * task)370 void QueueHandler::TransferTask(QueueTask* task)
371 {
372     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
373         reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
374     }
375     task->Ready();
376 }
377 
TransferInitTask()378 void QueueHandler::TransferInitTask()
379 {
380     std::function<void()> initFunc = [] {};
381     auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
382     QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
383     new (initTask)ffrt::QueueTask(this);
384     initTask->SetQos(qos_);
385     trafficRecord_.SubmitTraffic(this);
386     TransferTask(initTask);
387 }
388 
SetTimeoutMonitor(QueueTask * task)389 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
390 {
391     if (timeout_ <= 0) {
392         return;
393     }
394 
395     task->IncDeleteRef();
396 
397     // set delayed worker callback
398     auto timeoutWe = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
399     timeoutWe->cb = ([this, task](WaitEntry* timeoutWe) {
400         task->MonitorTaskStart();
401         if (!task->GetFinishStatus()) {
402             RunTimeOutCallback(task);
403         }
404         delayedCbCnt_.fetch_sub(1);
405         task->DecDeleteRef();
406         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(timeoutWe));
407     });
408 
409     // set delayed worker wakeup time
410     std::chrono::microseconds timeout(timeout_);
411     auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
412     timeoutWe->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
413     task->SetMonitorTask(timeoutWe);
414 
415     delayedCbCnt_.fetch_add(1);
416     if (!DelayedWakeup(timeoutWe->tp, timeoutWe, timeoutWe->cb, true)) {
417         delayedCbCnt_.fetch_sub(1);
418         task->DecDeleteRef();
419         SimpleAllocator<WaitUntilEntry>::FreeMem(timeoutWe);
420         FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
421             name_.c_str(), timeout_);
422         return;
423     }
424 
425     FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
426 }
427 
RemoveTimeoutMonitor(QueueTask * task)428 void QueueHandler::RemoveTimeoutMonitor(QueueTask* task)
429 {
430     if (timeout_ <= 0 || task->IsMonitorTaskStart()) {
431         return;
432     }
433 
434     if (DelayedRemove(task->GetMonitorTask()->tp, task->GetMonitorTask())) {
435         delayedCbCnt_.fetch_sub(1);
436         task->DecDeleteRef();
437         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(task->GetMonitorTask()));
438     }
439 }
440 
RunTimeOutCallback(QueueTask * task)441 void QueueHandler::RunTimeOutCallback(QueueTask* task)
442 {
443     std::stringstream ss;
444     std::string processNameStr = std::string(GetCurrentProcessName());
445     ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
446         name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
447         << task->label << "], execution time exceeds[" << timeout_ << "] us";
448     FFRT_LOGE("%s", ss.str().c_str());
449     if (timeoutCb_ != nullptr) {
450         QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
451         cbTask->IncDeleteRef();
452         FFRTFacade::GetDWInstance().SubmitAsyncTask([timeoutCb = timeoutCb_, cbTask] {
453             timeoutCb->exec(timeoutCb);
454             cbTask->DecDeleteRef();
455         });
456     }
457 }
458 
GetDfxInfo(int index) const459 std::string QueueHandler::GetDfxInfo(int index) const
460 {
461     std::stringstream ss;
462     if (queue_ != nullptr && curTaskVec_[index] != nullptr) {
463         TaskStatus curTaskStatus = curTaskVec_[index]->curStatus;
464         uint64_t curTaskTime = curTaskVec_[index]->statusTime.load(std::memory_order_relaxed);
465         TaskStatus preTaskStatus = curTaskVec_[index]->preStatus.load(std::memory_order_relaxed);
466         ss << "Queue task: tskname[" << curTaskVec_[index]->label.c_str() << "], qname=[" << name_ <<
467                 "], with delay of[" <<  curTaskVec_[index]->GetDelay() << "]us, qos[" << curTaskVec_[index]->GetQos() <<
468                 "], current status[" << StatusToString(curTaskStatus) << "], start at[" <<
469                 FormatDateString4SteadyClock(curTaskTime) << "], last status[" << StatusToString(preTaskStatus)
470                 << "], type=[" << queue_->GetQueueType() << "]";
471     } else {
472         ss << "Current queue or task nullptr";
473     }
474     return ss.str();
475 }
476 
EvaluateTaskTimeout(uint64_t timeoutThreshold,uint64_t timeoutUs,std::stringstream & ss)477 std::pair<std::vector<uint64_t>, uint64_t> QueueHandler::EvaluateTaskTimeout(uint64_t timeoutThreshold,
478     uint64_t timeoutUs, std::stringstream& ss)
479 {
480     uint64_t whenmapTskCount = GetTaskCnt();
481     std::lock_guard lock(mutex_);
482     std::pair<std::vector<uint64_t>, uint64_t> curTaskInfo;
483     uint64_t minTime = UINT64_MAX;
484     for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
485         QueueTask* curTask = curTaskVec_[i];
486         if (curTask == nullptr) {
487             curTaskInfo.first.emplace_back(INVALID_GID);
488             continue;
489         }
490 
491         uint64_t curTaskTime = curTask->statusTime.load(std::memory_order_relaxed);
492         if (curTaskTime == 0 || CheckDelayStatus()) {
493             curTaskInfo.first.emplace_back(INVALID_GID);
494             // Update the next inspection time if current task is delaying and there are still tasks in whenMap.
495             // Otherwise, pause the monitor timer.
496             if (whenmapTskCount > 0) {
497                 minTime = std::min(minTime, TimeStampCntvct());
498             }
499             continue;
500         }
501 
502         if (curTaskTime > timeoutThreshold) {
503             curTaskInfo.first.emplace_back(INVALID_GID);
504             minTime = std::min(minTime, curTaskTime);
505             continue;
506         }
507 
508         TimeoutTask& timeoutTaskInfo = timeoutTaskVec_[i];
509         if (curTask->gid == timeoutTaskInfo.taskGid &&
510             curTask->curStatus == timeoutTaskInfo.taskStatus) {
511                 // Check if current timeout task needs to update timeout count.
512                 uint64_t nextSchedule = curTaskTime + timeoutUs * timeoutTaskInfo.timeoutCnt;
513                 if (nextSchedule < timeoutThreshold) {
514                     timeoutTaskInfo.timeoutCnt += 1;
515                 } else {
516                     curTaskInfo.first.emplace_back(INVALID_GID);
517                     minTime = std::min(minTime, nextSchedule);
518                     continue;
519                 }
520         } else {
521             timeoutTaskInfo.timeoutCnt = 1;
522             timeoutTaskInfo.taskGid = curTask->gid;
523             timeoutTaskInfo.taskStatus = curTask->curStatus;
524         }
525 
526         // When the same task is reported multiple times, the next inspection time is updated by adding the
527         // accumulated time based on the current status time.
528         curTaskInfo.first.emplace_back(timeoutTaskInfo.taskGid);
529         uint64_t nextTimeSchedule = curTaskTime + (timeoutUs * timeoutTaskInfo.timeoutCnt);
530         minTime = std::min(minTime, nextTimeSchedule);
531 
532         if (ControlTimeoutFreq(timeoutTaskInfo.timeoutCnt)) {
533             ReportTaskTimeout(timeoutUs, ss, i);
534         }
535     }
536 
537     curTaskInfo.second = minTime;
538     return curTaskInfo;
539 }
540 
ControlTimeoutFreq(uint64_t timeoutCnt)541 bool QueueHandler::ControlTimeoutFreq(uint64_t timeoutCnt)
542 {
543     return (timeoutCnt < 10) || (timeoutCnt < 100 && timeoutCnt % 10 == 0) || (timeoutCnt % 100 == 0);
544 }
545 
ReportTaskTimeout(uint64_t timeoutUs,std::stringstream & ss,int index)546 void QueueHandler::ReportTaskTimeout(uint64_t timeoutUs, std::stringstream& ss, int index)
547 {
548     ss.str("");
549     ss << GetDfxInfo(index) << ", timeout for[" << timeoutUs / MIN_TIMEOUT_THRESHOLD_US <<
550         "]s, reported count: " << timeoutTaskVec_[index].timeoutCnt;
551     FFRT_LOGW("%s", ss.str().c_str());
552 #ifdef FFRT_SEND_EVENT
553     if (timeoutTaskVec_[index].timeoutCnt == 1) {
554         std::string senarioName = "Serial_Queue_Timeout";
555         TaskTimeoutReport(ss, GetCurrentProcessName(), senarioName);
556     }
557 #endif
558 }
559 
IsIdle()560 bool QueueHandler::IsIdle()
561 {
562     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
563     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
564         return false, "[queueId=%u] type invalid", GetQueueId());
565 
566     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
567 }
568 
SetEventHandler(void * eventHandler)569 void QueueHandler::SetEventHandler(void* eventHandler)
570 {
571     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
572 
573     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
574         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
575     FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
576 
577     reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
578 }
579 
GetEventHandler()580 void* QueueHandler::GetEventHandler()
581 {
582     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
583 
584     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
585         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
586     FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
587 
588     return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
589 }
590 
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)591 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
592 {
593     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
594     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
595         return -1, "[queueId=%u] type invalid", GetQueueId());
596     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
597 }
598 
DumpSize(ffrt_inner_queue_priority_t priority)599 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
600 {
601     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
602     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
603         return -1, "[queueId=%u] type invalid", GetQueueId());
604     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
605 }
606 
SendSchedTimer(TimePoint delay)607 void QueueHandler::SendSchedTimer(TimePoint delay)
608 {
609     we_->tp = delay;
610     if (!DelayedWakeup(we_->tp, we_, we_->cb, true)) {
611         FFRT_LOGW("failed to set delayedworker");
612     }
613 }
614 
CheckSchedDeadline()615 void QueueHandler::CheckSchedDeadline()
616 {
617     std::vector<std::pair<uint64_t, std::string>> timeoutTaskInfo;
618     // Collecting Timeout Tasks
619     {
620         std::lock_guard lock(mutex_);
621         uint64_t threshold = std::chrono::duration_cast<std::chrono::microseconds>(
622                 std::chrono::steady_clock::now().time_since_epoch()).count() + SCHED_TIME_ACC_ERROR_US;
623 
624         auto it = schedDeadline_.begin();
625         uint64_t nextDeadline = UINT64_MAX;
626         while (it != schedDeadline_.end()) {
627             if (it->second < threshold) {
628                 timeoutTaskInfo.push_back(std::make_pair(it->first->gid, it->first->label));
629                 it = schedDeadline_.erase(it);
630             } else {
631                 nextDeadline = std::min(nextDeadline, it->second);
632                 ++it;
633             }
634         }
635 
636         if (schedDeadline_.empty()) {
637             initSchedTimer_ = false;
638         } else {
639             std::chrono::microseconds timeout(nextDeadline);
640             TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
641                 std::chrono::steady_clock::time_point() + timeout);
642             FFRT_LOGI("queueId=%u set sched timer", GetQueueId());
643             SendSchedTimer(tp);
644         }
645     }
646 
647     // Reporting Timeout Information
648     if (!timeoutTaskInfo.empty()) {
649         ReportTimeout(timeoutTaskInfo);
650     }
651 }
652 
AddSchedDeadline(QueueTask * task)653 void QueueHandler::AddSchedDeadline(QueueTask* task)
654 {
655     // sched timeout only support serial queues, other queue types will be supported based on service requirements.
656     if (queue_->GetQueueType() != ffrt_queue_serial) {
657         return;
658     }
659 
660     std::lock_guard lock(mutex_);
661     schedDeadline_.insert({task, task->GetSchedTimeout() + task->GetUptime()});
662 
663     if (!initSchedTimer_) {
664         if (we_ == nullptr) {
665             we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
666             we_->cb = ([this](WaitEntry* we) { CheckSchedDeadline(); });
667         }
668         std::chrono::microseconds timeout(schedDeadline_[task]);
669         TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
670             std::chrono::steady_clock::time_point() + timeout);
671         SendSchedTimer(tp);
672         initSchedTimer_ = true;
673     }
674 }
675 
RemoveSchedDeadline(QueueTask * task)676 void QueueHandler::RemoveSchedDeadline(QueueTask* task)
677 {
678     std::lock_guard lock(mutex_);
679     schedDeadline_.erase(task);
680 }
681 
ReportTimeout(const std::vector<std::pair<uint64_t,std::string>> & timeoutTaskInfo)682 void QueueHandler::ReportTimeout(const std::vector<std::pair<uint64_t, std::string>>& timeoutTaskInfo)
683 {
684     std::stringstream ss;
685     ss << "Queue_Schedule_Timeout, queueId=" << GetQueueId() << ", timeout task gid: ";
686     for (auto& info : timeoutTaskInfo) {
687         ss << info.first << ", name " << info.second.c_str() << " ";
688     }
689 
690     FFRT_LOGE("%s", ss.str().c_str());
691 
692     uint32_t queueId = GetQueueId();
693     std::string ssStr = ss.str();
694     if (ffrt_task_timeout_get_cb()) {
695         FFRTFacade::GetDWInstance().SubmitAsyncTask([queueId, ssStr] {
696             ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
697             if (func) {
698                 func(queueId, ssStr.c_str(), ssStr.size());
699             }
700         });
701     }
702 }
703 
SetCurTask(QueueTask * task)704 void QueueHandler::SetCurTask(QueueTask* task)
705 {
706     std::lock_guard lock(mutex_);
707     if (task != nullptr) {
708         curTaskVec_[task->curTaskIdx] = task;
709     }
710 }
711 
UpdateCurTask(QueueTask * task)712 void QueueHandler::UpdateCurTask(QueueTask* task)
713 {
714     for (int i = 0; i < static_cast<int>(curTaskVec_.size()); i++) {
715         if (curTaskVec_[i] == nullptr) {
716             curTaskVec_[i] = task;
717             task->curTaskIdx = i;
718             return;
719         }
720     }
721 }
722 } // namespace ffrt
723