1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "tm/queue_task.h"
16 #include "dfx/trace/ffrt_trace.h"
17 #include "dfx/log/ffrt_log_api.h"
18 #include "dfx/trace_record/ffrt_trace_record.h"
19 #include "c/task.h"
20 #include "util/ffrt_facade.h"
21 #include "tm/task_factory.h"
22
namespace {
// Smallest scheduling timeout a queue task may be given, expressed in
// microseconds (100'000 us == 0.1 s). Shorter requested timeouts are raised
// to this value by the QueueTask constructor.
constexpr uint64_t MIN_SCHED_TIMEOUT = 100'000;
} // namespace
26 namespace ffrt {
QueueTask(QueueHandler * handler,const task_attr_private * attr,bool insertHead)27 QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool insertHead)
28 : CoTask(ffrt_queue_task, attr), handler_(handler), insertHead_(insertHead)
29 {
30 if (handler) {
31 if (attr) {
32 label = handler->GetName() + "_" + attr->name_ + "_" + std::to_string(gid);
33 } else {
34 label = handler->GetName() + "_" + std::to_string(gid);
35 }
36 threadMode_ = handler->GetMode();
37 }
38
39 uptime_ = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
40 std::chrono::steady_clock::now().time_since_epoch()).count());
41
42 if (attr) {
43 delay_ = attr->delay_;
44 qos_ = attr->qos_;
45 uptime_ += delay_;
46 prio_ = attr->prio_;
47 if (delay_ && attr->timeout_) {
48 FFRT_SYSEVENT_LOGW("task [gid=%llu] not support delay and timeout at the same time, timeout ignored", gid);
49 } else if (attr->timeout_) {
50 schedTimeout_ = std::max(attr->timeout_, MIN_SCHED_TIMEOUT); // min 0.1s
51 }
52 }
53
54 FFRT_LOGD("ctor task [gid=%llu], delay=%lluus, type=%lu, prio=%d, timeout=%luus", gid, delay_, type, prio_,
55 schedTimeout_);
56 }
57
~QueueTask()58 QueueTask::~QueueTask()
59 {
60 FFRT_LOGD("dtor task [gid=%llu]", gid);
61 }
62
Prepare()63 void QueueTask::Prepare()
64 {
65 SetStatus(TaskStatus::ENQUEUED);
66 FFRTTraceRecord::TaskSubmit<ffrt_queue_task>(qos_, &createTime, &fromTid);
67 #ifdef FFRT_ENABLE_HITRACE_CHAIN
68 if (TraceChainAdapter::Instance().HiTraceChainGetId().valid == HITRACE_ID_VALID) {
69 traceId_ = TraceChainAdapter::Instance().HiTraceChainCreateSpan();
70 }
71 #endif
72 }
73
Ready()74 void QueueTask::Ready()
75 {
76 QoS taskQos = qos_;
77 FFRTTraceRecord::TaskSubmit<ffrt_queue_task>(taskQos);
78 this->SetStatus(TaskStatus::READY);
79 bool isRisingEdge = FFRTFacade::GetSchedInstance()->GetScheduler(taskQos).PushTaskGlobal(this, false);
80 FFRTTraceRecord::TaskEnqueue<ffrt_queue_task>(taskQos);
81 FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_ADDED_RTQ>(taskQos, false, isRisingEdge);
82 }
83
Finish()84 void QueueTask::Finish()
85 {
86 if (createTime != 0) {
87 FFRTTraceRecord::TaskDone<ffrt_queue_task>(qos_(), this);
88 }
89 auto f = reinterpret_cast<ffrt_function_header_t *>(func_storage);
90 if (f->destroy) {
91 f->destroy(f);
92 }
93 Notify();
94 FFRT_TASKDONE_MARKER(gid);
95 }
96
FreeMem()97 void QueueTask::FreeMem()
98 {
99 // only tasks which called ffrt_poll_ctl may have cached events
100 if (pollerEnable) {
101 FFRTFacade::GetPPInstance().ClearCachedEvents(this);
102 }
103 TaskFactory<QueueTask>::Free(this);
104 }
105
Destroy()106 void QueueTask::Destroy()
107 {
108 // release user func
109 auto f = reinterpret_cast<ffrt_function_header_t*>(func_storage);
110 if (f->destroy) {
111 f->destroy(f);
112 }
113 // free serial task object
114 DecDeleteRef();
115 }
116
Notify()117 void QueueTask::Notify()
118 {
119 std::lock_guard lock(mutex_);
120 isFinished_.store(true);
121 if (onWait_) {
122 waitCond_.notify_all();
123 }
124 }
125
Execute()126 void QueueTask::Execute()
127 {
128 IncDeleteRef();
129 FFRT_LOGD("Execute stask[%lu], name[%s]", gid, label.c_str());
130 if (isFinished_.load()) {
131 FFRT_SYSEVENT_LOGE("task [gid=%llu] is complete, no need to execute again", gid);
132 DecDeleteRef();
133 return;
134 }
135 handler_->Dispatch(this);
136 UnbindCoRoutione();
137 DecDeleteRef();
138 }
139
Wait()140 void QueueTask::Wait()
141 {
142 std::unique_lock lock(mutex_);
143 onWait_ = true;
144 while (!isFinished_.load()) {
145 waitCond_.wait(lock);
146 }
147 }
148
GetQueueId() const149 uint32_t QueueTask::GetQueueId() const
150 {
151 return handler_->GetQueueId();
152 }
153 } // namespace ffrt
154