• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_handler.h"
16 #include <sys/syscall.h>
17 #include <sstream>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "dfx/trace_record/ffrt_trace_record.h"
20 #include "util/event_handler_adapter.h"
21 #include "util/ffrt_facade.h"
22 #include "util/slab.h"
23 #include "tm/queue_task.h"
24 #include "concurrent_queue.h"
25 #include "eventhandler_adapter_queue.h"
26 #include "sched/scheduler.h"
27 
28 namespace {
29 constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
30 constexpr uint32_t STRING_SIZE_MAX = 128;
31 constexpr uint32_t TASK_DONE_WAIT_UNIT = 10;
32 constexpr uint64_t SCHED_TIME_ACC_ERROR_US = 5000; // 5ms
33 constexpr uint32_t CONGESTION_CNT = 5;
34 constexpr uint32_t CONGESTION_TIMEOUT_US = 300000000; //5min
35 }
36 
37 namespace ffrt {
QueueHandler(const char * name,const ffrt_queue_attr_t * attr,const int type)38 QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, const int type)
39 {
40     // parse queue attribute
41     if (attr) {
42         qos_ = (ffrt_queue_attr_get_qos(attr) >= ffrt_qos_background) ? ffrt_queue_attr_get_qos(attr) : qos_;
43         timeout_ = ffrt_queue_attr_get_timeout(attr);
44         timeoutCb_ = ffrt_queue_attr_get_callback(attr);
45     }
46 
47     // callback reference counting is to ensure life cycle
48     if (timeout_ > 0 && timeoutCb_ != nullptr) {
49         QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
50         cbTask->IncDeleteRef();
51     }
52 
53     queue_ = CreateQueue(type, attr);
54     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
55 
56     if (name != nullptr && std::string(name).size() <= STRING_SIZE_MAX) {
57         name_ = "sq_" + std::string(name) + "_" + std::to_string(GetQueueId());
58     } else {
59         name_ += "sq_unnamed_" + std::to_string(GetQueueId());
60         FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
61     }
62 
63     FFRTFacade::GetQMInstance().RegisterQueueId(GetQueueId(), this);
64     FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
65 }
66 
~QueueHandler()67 QueueHandler::~QueueHandler()
68 {
69     FFRT_LOGI("destruct %s enter", name_.c_str());
70     // clear tasks in queue
71     CancelAndWait();
72     FFRTFacade::GetQMInstance().ResetQueueStruct(GetQueueId());
73 
74     // release callback resource
75     if (timeout_ > 0) {
76         // wait for all delayedWorker to complete.
77         while (delayedCbCnt_.load() > 0) {
78             this_task::sleep_for(std::chrono::microseconds(timeout_));
79         }
80 
81         if (timeoutCb_ != nullptr) {
82             QueueTask* cbTask = GetQueueTaskByFuncStorageOffset(timeoutCb_);
83             cbTask->DecDeleteRef();
84         }
85     }
86 
87     if (we_ != nullptr) {
88         DelayedRemove(we_->tp, we_);
89         SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
90     }
91     FFRT_LOGI("destruct %s leave", name_.c_str());
92 }
93 
SetLoop(Loop * loop)94 bool QueueHandler::SetLoop(Loop* loop)
95 {
96     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
97     if (queue_->GetQueueType() == ffrt_queue_eventhandler_interactive) {
98         return true;
99     }
100     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
101         return false, "[queueId=%u] type invalid", GetQueueId());
102     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->SetLoop(loop);
103 }
104 
ClearLoop()105 bool QueueHandler::ClearLoop()
106 {
107     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
108     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_concurrent),
109         return false, "[queueId=%u] type invalid", GetQueueId());
110     return reinterpret_cast<ConcurrentQueue*>(queue_.get())->ClearLoop();
111 }
112 
PickUpTask()113 QueueTask* QueueHandler::PickUpTask()
114 {
115     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
116     return queue_->Pull();
117 }
118 
Submit(QueueTask * task)119 void QueueHandler::Submit(QueueTask* task)
120 {
121     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot submit, [queueId=%u] constructed failed", GetQueueId());
122     FFRT_COND_DO_ERR((task == nullptr), return, "input invalid, serial task is nullptr");
123 
124     // if qos not specified, qos of the queue is inherited by task
125     if (task->GetQos() == qos_inherit || task->GetQos() == qos_default) {
126         task->SetQos(qos_);
127     }
128 
129     uint64_t gid = task->gid;
130     FFRT_SERIAL_QUEUE_TASK_SUBMIT_MARKER(GetQueueId(), gid);
131     FFRTTraceRecord::TaskSubmit(&(task->createTime), &(task->fromTid));
132 #if (FFRT_TRACE_RECORD_LEVEL < FFRT_TRACE_RECORD_LEVEL_1)
133     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
134         task->fromTid = ExecuteCtx::Cur()->tid;
135     }
136 #endif
137 
138     // work after that schedule timeout is set for queue
139     if (task->GetSchedTimeout() > 0) {
140         AddSchedDeadline(task);
141     }
142     if (we_ != nullptr) {
143         CheckOverload();
144     }
145 
146     int ret = queue_->Push(task);
147     if (ret == SUCC) {
148         FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
149         return;
150     }
151     if (ret == FAILED) {
152         return;
153     }
154 
155     if (!isUsed_.load()) {
156         isUsed_.store(true);
157     }
158 
159     // activate queue
160     if (task->GetDelay() == 0) {
161         FFRT_LOGD("task [%llu] activate %s", gid, name_.c_str());
162         TransferTask(task);
163     } else {
164         FFRT_LOGD("task [%llu] with delay [%llu] activate %s", gid, task->GetDelay(), name_.c_str());
165         if (ret == INACTIVE) {
166             queue_->Push(task);
167         }
168         TransferInitTask();
169     }
170 }
171 
Cancel()172 void QueueHandler::Cancel()
173 {
174     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
175     queue_->Remove();
176 }
177 
CancelAndWait()178 void QueueHandler::CancelAndWait()
179 {
180     FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
181         GetQueueId());
182     queue_->Stop();
183     while (FFRTFacade::GetQMInstance().QueryQueueStatus(GetQueueId()) || queue_->GetActiveStatus() ||
184         deliverCnt_.load() > 0) {
185         std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
186     }
187 }
188 
Cancel(const char * name)189 int QueueHandler::Cancel(const char* name)
190 {
191     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
192          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
193     int ret = queue_->Remove(name);
194     if (ret != SUCC) {
195         FFRT_LOGD("cancel task %s failed, task may have been executed", name);
196     }
197 
198     return ret;
199 }
200 
Cancel(QueueTask * task)201 int QueueHandler::Cancel(QueueTask* task)
202 {
203     FFRT_COND_DO_ERR((queue_ == nullptr), return INACTIVE,
204          "cannot cancel, [queueId=%u] constructed failed", GetQueueId());
205     FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
206 
207     if (task->GetSchedTimeout() > 0) {
208         RemoveSchedDeadline(task);
209     }
210 
211     int ret = queue_->Remove(task);
212     if (ret == SUCC) {
213         FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
214         task->Notify();
215         task->Destroy();
216     } else {
217         FFRT_LOGD("cancel task[%llu] %s failed, task may have been executed", task->gid, task->label.c_str());
218     }
219     return ret;
220 }
221 
Dispatch(QueueTask * inTask)222 void QueueHandler::Dispatch(QueueTask* inTask)
223 {
224     QueueTask* nextTask = nullptr;
225     for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
226         // dfx watchdog
227         SetTimeoutMonitor(task);
228         FFRTFacade::GetQMInstance().UpdateQueueInfo(GetQueueId(), task->gid);
229         execTaskId_.store(task->gid);
230 
231         // run user task
232         FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
233         auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
234         FFRT_SERIAL_QUEUE_TASK_EXECUTE_MARKER(task->gid);
235         FFRTTraceRecord::TaskExecute(&(task->executeTime));
236         if (task->GetSchedTimeout() > 0) {
237             RemoveSchedDeadline(task);
238         }
239 
240         uint64_t triggerTime{0};
241         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
242             triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
243                 std::chrono::steady_clock::now().time_since_epoch()).count());
244         }
245 
246         f->exec(f);
247         FFRTTraceRecord::TaskDone<ffrt_queue_task>(task->GetQos(), task);
248         if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
249             uint64_t completeTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
250                 std::chrono::steady_clock::now().time_since_epoch()).count());
251             reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->PushHistoryTask(task, triggerTime, completeTime);
252         }
253 
254         f->destroy(f);
255         task->Notify();
256 
257         // run task batch
258         nextTask = task->GetNextTask();
259         if (nextTask == nullptr) {
260             FFRTFacade::GetQMInstance().ResetQueueInfo(GetQueueId());
261             execTaskId_.store(0);
262             if (!queue_->IsOnLoop()) {
263                 Deliver();
264             }
265         }
266         task->DecDeleteRef();
267     }
268 }
269 
Deliver()270 void QueueHandler::Deliver()
271 {
272     deliverCnt_.fetch_add(1);
273     QueueTask* task = queue_->Pull();
274     deliverCnt_.fetch_sub(1);
275     if (task != nullptr) {
276         TransferTask(task);
277     }
278 }
279 
TransferTask(QueueTask * task)280 void QueueHandler::TransferTask(QueueTask* task)
281 {
282     auto entry = &task->fq_we;
283     if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
284         reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->SetCurrentRunningTask(task);
285     }
286     FFRTScheduler* sch = FFRTFacade::GetSchedInstance();
287     FFRT_READY_MARKER(task->gid); // ffrt queue task ready to enque
288     if (!sch->InsertNode(&entry->node, task->GetQos())) {
289         FFRT_LOGE("failed to insert task [%llu] into %s", task->gid, GetQueueId(), name_.c_str());
290         return;
291     }
292 }
293 
TransferInitTask()294 void QueueHandler::TransferInitTask()
295 {
296     std::function<void()> initFunc = [] {};
297     auto f = create_function_wrapper(initFunc, ffrt_function_kind_queue);
298     QueueTask* initTask = GetQueueTaskByFuncStorageOffset(f);
299     new (initTask)ffrt::QueueTask(this);
300     initTask->SetQos(qos_);
301     TransferTask(initTask);
302 }
303 
SetTimeoutMonitor(QueueTask * task)304 void QueueHandler::SetTimeoutMonitor(QueueTask* task)
305 {
306     if (timeout_ <= 0) {
307         return;
308     }
309 
310     task->IncDeleteRef();
311     WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
312     // set delayed worker callback
313     we->cb = ([this, task](WaitEntry* we) {
314         if (!task->GetFinishStatus()) {
315             RunTimeOutCallback(task);
316         }
317         delayedCbCnt_.fetch_sub(1);
318         task->DecDeleteRef();
319         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
320     });
321 
322     // set delayed worker wakeup time
323     std::chrono::microseconds timeout(timeout_);
324     auto now = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
325     we->tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(now + timeout);
326 
327     if (!DelayedWakeup(we->tp, we, we->cb)) {
328         task->DecDeleteRef();
329         SimpleAllocator<WaitUntilEntry>::FreeMem(we);
330         FFRT_LOGW("failed to set watchdog for task gid=%llu in %s with timeout [%llu us] ", task->gid,
331             name_.c_str(), timeout_);
332         return;
333     }
334 
335     delayedCbCnt_.fetch_add(1);
336     FFRT_LOGD("set watchdog of task gid=%llu of %s succ", task->gid, name_.c_str());
337 }
338 
RunTimeOutCallback(QueueTask * task)339 void QueueHandler::RunTimeOutCallback(QueueTask* task)
340 {
341     std::stringstream ss;
342     static std::once_flag flag;
343     static char processName[PROCESS_NAME_BUFFER_LENGTH];
344     std::call_once(flag, []() {
345         GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
346     });
347     std::string processNameStr = std::string(processName);
348     ss << "[Serial_Queue_Timeout_Callback] process name:[" << processNameStr << "], serial queue:[" <<
349         name_ << "], queueId:[" << GetQueueId() << "], serial task gid:[" << task->gid << "], task name:["
350         << task->label << "], execution time exceeds[" << timeout_ << "] us";
351     FFRT_LOGE("%s", ss.str().c_str());
352     if (timeoutCb_ != nullptr) {
353         timeoutCb_->exec(timeoutCb_);
354     }
355 }
356 
GetDfxInfo() const357 std::string QueueHandler::GetDfxInfo() const
358 {
359     std::stringstream ss;
360     ss << " queue name [" << name_ << "]";
361     if (queue_ != nullptr) {
362         ss << ", remaining tasks count=" << queue_->GetMapSize();
363     }
364     return ss.str();
365 }
366 
IsIdle()367 bool QueueHandler::IsIdle()
368 {
369     FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
370     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
371         return false, "[queueId=%u] type invalid", GetQueueId());
372 
373     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->IsIdle();
374 }
375 
SetEventHandler(void * eventHandler)376 void QueueHandler::SetEventHandler(void* eventHandler)
377 {
378     FFRT_COND_DO_ERR((queue_ == nullptr), return, "[queueId=%u] constructed failed", GetQueueId());
379 
380     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
381         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
382     FFRT_COND_DO_ERR(typeInvalid, return, "[queueId=%u] type invalid", GetQueueId());
383 
384     reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->SetEventHandler(eventHandler);
385 }
386 
GetEventHandler()387 void* QueueHandler::GetEventHandler()
388 {
389     FFRT_COND_DO_ERR((queue_ == nullptr), return nullptr, "[queueId=%u] constructed failed", GetQueueId());
390 
391     bool typeInvalid = (queue_->GetQueueType() != ffrt_queue_eventhandler_interactive) &&
392         (queue_->GetQueueType() != ffrt_queue_eventhandler_adapter);
393     FFRT_COND_DO_ERR(typeInvalid, return nullptr, "[queueId=%u] type invalid", GetQueueId());
394 
395     return reinterpret_cast<EventHandlerInteractiveQueue*>(queue_.get())->GetEventHandler();
396 }
397 
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)398 int QueueHandler::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
399 {
400     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
401     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
402         return -1, "[queueId=%u] type invalid", GetQueueId());
403     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->Dump(tag, buf, len, historyInfo);
404 }
405 
DumpSize(ffrt_inner_queue_priority_t priority)406 int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
407 {
408     FFRT_COND_DO_ERR((queue_ == nullptr), return -1, "[queueId=%u] constructed failed", GetQueueId());
409     FFRT_COND_DO_ERR((queue_->GetQueueType() != ffrt_queue_eventhandler_adapter),
410         return -1, "[queueId=%u] type invalid", GetQueueId());
411     return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
412 }
413 
SendSchedTimer(TimePoint delay)414 void QueueHandler::SendSchedTimer(TimePoint delay)
415 {
416     we_->tp = delay;
417     bool result = DelayedWakeup(we_->tp, we_, we_->cb);
418     while (!result) {
419         FFRT_LOGW("failed to set delayedworker, retry");
420         we_->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(SCHED_TIME_ACC_ERROR_US);
421         result = DelayedWakeup(we_->tp, we_, we_->cb);
422     }
423 }
424 
CheckSchedDeadline()425 void QueueHandler::CheckSchedDeadline()
426 {
427     std::vector<uint64_t> timeoutTaskId;
428     // Collecting Timeout Tasks
429     {
430         std::unique_lock lock(mutex_);
431         uint64_t threshold = std::chrono::duration_cast<std::chrono::microseconds>(
432                 std::chrono::steady_clock::now().time_since_epoch()).count() + SCHED_TIME_ACC_ERROR_US;
433 
434         auto it = schedDeadline_.begin();
435         uint64_t nextDeadline = UINT64_MAX;
436         while (it != schedDeadline_.end()) {
437             if (it->second < threshold) {
438                 timeoutTaskId.push_back(it->first->gid);
439                 it = schedDeadline_.erase(it);
440             } else {
441                 nextDeadline = std::min(nextDeadline, it->second);
442                 ++it;
443             }
444         }
445 
446         if (schedDeadline_.empty()) {
447             initSchedTimer_ = false;
448         } else {
449             std::chrono::microseconds timeout(nextDeadline);
450             TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
451                 std::chrono::steady_clock::time_point() + timeout);
452             FFRT_LOGI("queueId=%u set sched timer", GetQueueId());
453             SendSchedTimer(tp);
454         }
455     }
456 
457     // Reporting Timeout Information
458     if (!timeoutTaskId.empty()) {
459         ReportTimeout(timeoutTaskId);
460     }
461 }
462 
AddSchedDeadline(QueueTask * task)463 void QueueHandler::AddSchedDeadline(QueueTask* task)
464 {
465     // sched timeout only support serial queues, other queue types will be supported based on service requirements.
466     if (queue_->GetQueueType() != ffrt_queue_serial) {
467         return;
468     }
469 
470     std::unique_lock lock(mutex_);
471     schedDeadline_.insert({task, task->GetSchedTimeout() + task->GetUptime()});
472 
473     if (!initSchedTimer_) {
474         if (we_ == nullptr) {
475             we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
476             we_->cb = ([this](WaitEntry* we_) { CheckSchedDeadline(); });
477         }
478         std::chrono::microseconds timeout(schedDeadline_[task]);
479         TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
480             std::chrono::steady_clock::time_point() + timeout);
481         SendSchedTimer(tp);
482         initSchedTimer_ = true;
483     }
484 }
485 
RemoveSchedDeadline(QueueTask * task)486 void QueueHandler::RemoveSchedDeadline(QueueTask* task)
487 {
488     std::unique_lock lock(mutex_);
489     schedDeadline_.erase(task);
490 }
491 
CheckOverload()492 void QueueHandler::CheckOverload()
493 {
494     if (queue_->GetMapSize() <= CONGESTION_CNT) {
495         return;
496     }
497 
498     uint64_t expect = queue_->GetHeadUptime();
499     uint64_t now = std::chrono::duration_cast<std::chrono::microseconds>(
500             std::chrono::steady_clock::now().time_since_epoch()).count();
501     if (now > expect && now - expect > CONGESTION_TIMEOUT_US * overloadTimes_.load()) {
502         overloadTimes_.fetch_add(1);
503         std::vector<uint64_t> timeoutVec = {};
504         ReportTimeout(timeoutVec);
505     }
506 }
507 
ReportTimeout(const std::vector<uint64_t> & timeoutTaskId)508 void QueueHandler::ReportTimeout(const std::vector<uint64_t>& timeoutTaskId)
509 {
510     std::stringstream ss;
511     ss << "Queue_Schedule_Timeout, queueId=" << GetQueueId() << ", timeout task gid: ";
512     for (auto& id : timeoutTaskId) {
513         ss << id << " ";
514     }
515 
516     FFRT_LOGE("%s", ss.str().c_str());
517     ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
518     if (func) {
519     func(GetQueueId(), ss.str().c_str(), ss.str().size());
520     }
521 }
522 
523 } // namespace ffrt
524