• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_handler.h"
16 #include <sys/syscall.h>
17 #include <sstream>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "dfx/trace_record/ffrt_trace_record.h"
20 #include "util/event_handler_adapter.h"
21 #include "util/ffrt_facade.h"
22 #include "util/slab.h"
23 #include "tm/queue_task.h"
24 #include "concurrent_queue.h"
25 #include "eventhandler_adapter_queue.h"
26 #include "sched/scheduler.h"
27 
28 namespace {
29 constexpr uint32_t STRING_SIZE_MAX = 128;
30 constexpr uint32_t TASK_DONE_WAIT_UNIT = 10;
31 constexpr uint64_t SCHED_TIME_ACC_ERROR_US = 5000; // 5ms
32 }
33 
34 namespace ffrt {
QueueHandler(const char * name,const ffrt_queue_attr_t * attr,const int type)35 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
36 {
37     // parse queue attribute
38     if (attr) {
39         qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
40         timeout_ = ffrt_queue_attr_get_timeout(attr);
41         timeoutCb_ = ffrt_queue_attr_get_callback(attr);
42     }
43 
44     // callback reference counting is to ensure life cycle
45     if (timeout_ > 0 && timeoutCb_ != nullptr) {
46         QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
47         cbTask->IncDeleteRef();
48     }
49 
50     queue_ = CreateQueue(type, attr);
51     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
52 
53     if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
54         name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
55     } else {
56         name_ += "sq_unnamed_" + std::to_string(GetQueueId());
57         FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
58     }
59 
60     FFRTFacade::GetQMInstance().RegisterQueueId(GetQueueId(), this);
61     FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
62 }
63 
~QueueHandler()64 QueueHandler::~QueueHandler()
65 {
66     FFRT_LOGI("destruct %s enter", name_.c_str());
67     // clear tasks in queue
68     CancelAndWait();
69     FFRTFacade::GetQMInstance().ResetQueueStruct(GetQueueId());
70 
71     // release callback resource
72     if (timeout_ > 0) {
73         // wait for all delayedWorker to complete.
74         while (delayedCbCnt_.load() > 0) {
75             this_task::sleep_for(std::chrono::microseconds(timeout_));
76         }
77 
78         if (timeoutCb_ != nullptr) {
79             QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
80             cbTask->DecDeleteRef();
81         }
82     }
83 
84     if (we_ != nullptr) {
85         DelayedRemove(we_->tp, we_);
86         SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
87     }
88     FFRT_LOGI("destruct %s leave", name_.c_str());
89 }
90 
SetLoop(Loop * loop)91 bool QueueHandler::SetLoop(Loop* loop)
92 {
93     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
94     if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
95         return true;
96     }
97     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
98         return false, "[queueId=%u] type invalid", GetQueueId());
99     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
100 }
101 
ClearLoop()102 bool QueueHandler::ClearLoop()
103 {
104     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
105     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
106         return false, "[queueId=%u] type invalid", GetQueueId());
107     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
108 }
109 
PickUpTask()110 QueueTask* QueueHandler::PickUpTask()
111 {
112     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
113     return queue_->Pull();
114 }
115 
Submit(QueueTask * task)116 void QueueHandler::Submit(QueueTask* task)
117 {
118     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
119     FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
120 
121 #ifdef ENABLE_HITRACE_CHAIN
122     if (HiTraceChainGetId().valid == HITRACE_ID_VALID) {
123         task->traceId_ = HiTraceChainCreateSpan();
124     }
125 #endif
126     // if qos not specified, qos of the queue is inherited by task
127     if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
128         task->SetQos(qos_);
129     }
130 
131     uint64_t gid = task->gid;
132     FFRTTraceRecord::TaskSubmit(&(task->createTime), &(task->fromTid));
133 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
134     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
135         task->fromTid = ExecuteCtx::Cur()->tid;
136     }
137 #endif
138 
139     // work after that schedule timeout is set for queue
140     if (task->GetSchedTimeout() > 0) {
141         AddSchedDeadline(task);
142     }
143 
144     int ret = queue_->Push(task);
145     if (ret == SUCC) {
146         FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
147         return;
148     }
149     if (ret == FAILED) {
150         return;
151     }
152 
153     if (!isUsed_.load()) {
154         isUsed_.store(true);
155     }
156 
157     // activate queue
158     if (task->GetDelay() == 0) {
159         FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
160         TransferTask(task);
161     } else {
162         FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
163         if (ret == INACTIVE) {
164             queue_->Push(task);
165         }
166         TransferInitTask();
167     }
168 }
169 
Cancel()170 void QueueHandler::Cancel()
171 {
172     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
173     queue_->Remove();
174 }
175 
CancelAndWait()176 void QueueHandler::CancelAndWait()
177 {
178     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
179         GetQueueId());
180     queue_->Stop();
181     while ((FFRTFacade::GetQMInstance().QueryQueueStatus(GetQueueId()) != 0) || queue_->GetActiveStatus() ||
182         deliverCnt_.load() > 0) {
183         std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
184     }
185 }
186 
Cancel(const char * name)187 int QueueHandler::Cancel(const char* name)
188 {
189     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
190          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
191     int ret = queue_->Remove(name);
192     if (ret != SUCC) {
193         FFRT_LOGD("cancel task %s failed, task may have been executed", name);
194     }
195 
196     return ret;
197 }
198 
Cancel(QueueTask * task)199 int QueueHandler::Cancel(QueueTask* task)
200 {
201     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
202          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
203     FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
204 
205     if (task->GetSchedTimeout() > 0) {
206         RemoveSchedDeadline(task);
207     }
208 
209     int ret = queue_->Remove(task);
210     if (ret == SUCC) {
211         FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
212         task->Notify();
213         task->Destroy();
214     } else {
215         FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->label.c_str());
216     }
217     return ret;
218 }
219 
Dispatch(QueueTask * inTask)220 void QueueHandler::Dispatch(QueueTask* inTask)
221 {
222     QueueTask* nextTask = nullptr;
223     for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
224         // dfx watchdog
225         SetTimeoutMonitor(task);
226         FFRTFacade::GetQMInstance().UpdateQueueInfo(GetQueueId(), task->gid);
227         execTaskId_.store(task->gid);
228 
229         // run user task
230         FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
231         auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
232         FFRTTraceRecord::TaskExecute(&(task->executeTime));
233         if (task->GetSchedTimeout() > 0) {
234             RemoveSchedDeadline(task);
235         }
236 
237         uint64_t triggerTime{0};
238         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
239             triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
240                 std::chrono::steady_clock::now().time_since_epoch()).count());
241         }
242 
243         f->exec(f);
244         if (task->createTime != 0) {
245             FFRTTraceRecord::TaskDone<ffrt_queue_task>(task->GetQos(), task);
246         }
247         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
248             uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
249                 std::chrono::steady_clock::now().time_since_epoch()).count());
250             reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
251         }
252 
253         f->destroy(f);
254         task->Notify();
255 
256         // run task batch
257         nextTask = task->GetNextTask();
258         if (nextTask == nullptr) {
259             FFRTFacade::GetQMInstance().ResetQueueInfo(GetQueueId());
260             execTaskId_.store(0);
261             if (!queue_->IsOnLoop()) {
262                 Deliver();
263             }
264         }
265         task->DecDeleteRef();
266     }
267 }
268 
Deliver()269 void QueueHandler::Deliver()
270 {
271     deliverCnt_.fetch_add(1);
272     QueueTask* task = queue_->Pull();
273     deliverCnt_.fetch_sub(1);
274     if (task != nullptr) {
275         TransferTask(task);
276     }
277 }
278 
TransferTask(QueueTask * task)279 void QueueHandler::TransferTask(QueueTask* task)
280 {
281     auto entry = &task->fq_we;
282     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
283         reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
284     }
285     FFRTScheduler* sch = FFRTFacade::GetSchedInstance();
286     FFRT_READY_MARKER(task->gid); // ffrt queue task ready to enque
287     if (!sch->InsertNode(&entry->node, task->GetQos())) {
288         FFRT_LOGE("failed to insert task [%llu] into %s", task->gid, name_.c_str());
289         return;
290     }
291 }
292 
TransferInitTask()293 void QueueHandler::TransferInitTask()
294 {
295     std::function<void()> initFunc = [] {};
296     auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
297     QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
298     new (initTask)ffrt::QueueTask(this);
299     initTask->SetQos(qos_);
300     TransferTask(initTask);
301 }
302 
SetTimeoutMonitor(QueueTask * task)303 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
304 {
305     if (timeout_ <= 0) {
306         return;
307     }
308 
309     task->IncDeleteRef();
310     WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
311     // set delayed worker callback
312     we->cb = ([this, task](WaitEntry* we) {
313         if (!task->GetFinishStatus()) {
314             RunTimeOutCallback(task);
315         }
316         delayedCbCnt_.fetch_sub(1);
317         task->DecDeleteRef();
318         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
319     });
320 
321     // set delayed worker wakeup time
322     std::chrono::microseconds timeout(timeout_);
323     auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
324     we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
325 
326     delayedCbCnt_.fetch_add(1);
327     if (!DelayedWakeup(we->tp, we, we->cb)) {
328         delayedCbCnt_.fetch_sub(1);
329         task->DecDeleteRef();
330         SimpleAllocator<WaitUntilEntry>::FreeMem(we);
331         FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
332             name_.c_str(), timeout_);
333         return;
334     }
335 
336     FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
337 }
338 
RunTimeOutCallback(QueueTask * task)339 void QueueHandler::RunTimeOutCallback(QueueTask* task)
340 {
341     std::stringstream ss;
342     std::string processNameStr = std::string(GetCurrentProcessName());
343     ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
344         name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
345         << task->label << "], execution time exceeds[" << timeout_ << "] us";
346     FFRT_LOGE("%s", ss.str().c_str());
347     if (timeoutCb_ != nullptr) {
348         delayedCbCnt_.fetch_add(1);
349         FFRTFacade::GetDWInstance().SubmitAsyncTask([this] {
350             timeoutCb_->exec(timeoutCb_);
351             delayedCbCnt_.fetch_sub(1);
352         });
353     }
354 }
355 
GetDfxInfo() const356 std::string QueueHandler::GetDfxInfo() const
357 {
358     std::stringstream ss;
359     ss << " queue name [" << name_ << "]";
360     if (queue_ != nullptr) {
361         ss << ", remaining tasks count=" << queue_->GetMapSize();
362     }
363     return ss.str();
364 }
365 
IsIdle()366 bool QueueHandler::IsIdle()
367 {
368     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
369     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
370         return false, "[queueId=%u] type invalid", GetQueueId());
371 
372     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
373 }
374 
SetEventHandler(void * eventHandler)375 void QueueHandler::SetEventHandler(void* eventHandler)
376 {
377     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
378 
379     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
380         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
381     FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
382 
383     reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
384 }
385 
GetEventHandler()386 void* QueueHandler::GetEventHandler()
387 {
388     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
389 
390     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
391         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
392     FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
393 
394     return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
395 }
396 
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)397 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
398 {
399     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
400     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
401         return -1, "[queueId=%u] type invalid", GetQueueId());
402     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
403 }
404 
DumpSize(ffrt_inner_queue_priority_t priority)405 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
406 {
407     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
408     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
409         return -1, "[queueId=%u] type invalid", GetQueueId());
410     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
411 }
412 
SendSchedTimer(TimePoint delay)413 void QueueHandler::SendSchedTimer(TimePoint delay)
414 {
415     we_->tp = delay;
416     bool result = DelayedWakeup(we_->tp, we_, we_->cb);
417     while (!result) {
418         FFRT_LOGW("failed to set delayedworker, retry");
419         we_->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(SCHED_TIME_ACC_ERROR_US);
420         result = DelayedWakeup(we_->tp, we_, we_->cb);
421     }
422 }
423 
CheckSchedDeadline()424 void QueueHandler::CheckSchedDeadline()
425 {
426     std::vector<uint64_t> timeoutTaskId;
427     // Collecting Timeout Tasks
428     {
429         std::unique_lock lock(mutex_);
430         uint64_t threshold = std::chrono::duration_cast<std::chrono::microseconds>(
431                 std::chrono::steady_clock::now().time_since_epoch()).count() + SCHED_TIME_ACC_ERROR_US;
432 
433         auto it = schedDeadline_.begin();
434         uint64_t nextDeadline = UINT64_MAX;
435         while (it != schedDeadline_.end()) {
436             if (it->second < threshold) {
437                 timeoutTaskId.push_back(it->first->gid);
438                 it = schedDeadline_.erase(it);
439             } else {
440                 nextDeadline = std::min(nextDeadline, it->second);
441                 ++it;
442             }
443         }
444 
445         if (schedDeadline_.empty()) {
446             initSchedTimer_ = false;
447         } else {
448             std::chrono::microseconds timeout(nextDeadline);
449             TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
450                 std::chrono::steady_clock::time_point() + timeout);
451             FFRT_LOGI("queueId=%u set sched timer", GetQueueId());
452             SendSchedTimer(tp);
453         }
454     }
455 
456     // Reporting Timeout Information
457     if (!timeoutTaskId.empty()) {
458         ReportTimeout(timeoutTaskId);
459     }
460 }
461 
AddSchedDeadline(QueueTask * task)462 void QueueHandler::AddSchedDeadline(QueueTask* task)
463 {
464     // sched timeout only support serial queues, other queue types will be supported based on service requirements.
465     if (queue_->GetQueueType() != ffrt_queue_serial) {
466         return;
467     }
468 
469     std::unique_lock lock(mutex_);
470     schedDeadline_.insert({task, task->GetSchedTimeout() + task->GetUptime()});
471 
472     if (!initSchedTimer_) {
473         if (we_ == nullptr) {
474             we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
475             we_->cb = ([this](WaitEntry* we) { CheckSchedDeadline(); });
476         }
477         std::chrono::microseconds timeout(schedDeadline_[task]);
478         TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
479             std::chrono::steady_clock::time_point() + timeout);
480         SendSchedTimer(tp);
481         initSchedTimer_ = true;
482     }
483 }
484 
RemoveSchedDeadline(QueueTask * task)485 void QueueHandler::RemoveSchedDeadline(QueueTask* task)
486 {
487     std::unique_lock lock(mutex_);
488     schedDeadline_.erase(task);
489 }
490 
ReportTimeout(const std::vector<uint64_t> & timeoutTaskId)491 void QueueHandler::ReportTimeout(const std::vector<uint64_t>& timeoutTaskId)
492 {
493     std::stringstream ss;
494     ss << "Queue_Schedule_Timeout, queueId=" << GetQueueId() << ", timeout task gid: ";
495     for (auto& id : timeoutTaskId) {
496         ss << id << " ";
497     }
498 
499     FFRT_LOGE("%s", ss.str().c_str());
500 
501     uint32_t queueId = GetQueueId();
502     std::string ssStr = ss.str();
503     if (ffrt_task_timeout_get_cb()) {
504         FFRTFacade::GetDWInstance().SubmitAsyncTask([queueId, ssStr] {
505             ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
506             if (func) {
507                 func(queueId, ssStr.c_str(), ssStr.size());
508             }
509         });
510     }
511 }
512 } // namespace ffrt
513