1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_interface.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #ifdef FFRT_IO_TASK_SCHEDULER
25 #include "sync/poller.h"
26 #include "util/spmc_queue.h"
27 #endif
28 #include "tm/cpu_task.h"
29
30 namespace ffrt {
31 const int PLACE_HOLDER = 0;
32 const unsigned int TRY_POLL_FREQ = 51;
33 }
34
35 namespace ffrt {
Run(CPUEUTask * task)36 void CPUWorker::Run(CPUEUTask* task)
37 {
38 FFRT_TRACE_SCOPE(TRACE_LEVEL2, Run);
39 if constexpr(USE_COROUTINE) {
40 CoStart(task);
41 } else {
42 auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
43 auto exp = ffrt::SkipStatus::SUBMITTED;
44 if (likely(__atomic_compare_exchange_n(&task->skipped, &exp, ffrt::SkipStatus::EXECUTED, 0,
45 __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
46 FFRT_TASK_BEGIN(task->label, task->gid);
47 f->exec(f);
48 FFRT_TASK_END();
49 }
50 f->destroy(f);
51 task->UpdateState(ffrt::TaskState::EXITED);
52 }
53 }
54
Run(ffrt_executor_task_t * task,ffrt_qos_t qos)55 void CPUWorker::Run(ffrt_executor_task_t* task, ffrt_qos_t qos)
56 {
57 #ifdef FFRT_BBOX_ENABLE
58 TaskRunCounterInc();
59 #endif
60
61 ffrt_executor_task_func func = nullptr;
62 ffrt_executor_task_type_t type = static_cast<ffrt_executor_task_type_t>(task->type);
63 if (type == ffrt_io_task) {
64 func = FuncManager::Instance()->getFunc(ffrt_io_task);
65 } else {
66 func = FuncManager::Instance()->getFunc(ffrt_uv_task);
67 }
68 if (func == nullptr) {
69 FFRT_LOGE("Static func is nullptr");
70 return;
71 }
72
73 FFRT_EXECUTOR_TASK_BEGIN(task);
74 func(task, qos);
75 FFRT_EXECUTOR_TASK_END();
76 if (type != ffrt_io_task) {
77 FFRT_EXECUTOR_TASK_FINISH_MARKER(task); // task finish marker for uv task
78 }
79 #ifdef FFRT_BBOX_ENABLE
80 TaskFinishCounterInc();
81 #endif
82 }
83
WarpDispatch(void * worker)84 void* CPUWorker::WarpDispatch(void* worker)
85 {
86 reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
87 Dispatch(reinterpret_cast<CPUWorker*>(worker));
88 return nullptr;
89 }
90
91 #ifdef FFRT_IO_TASK_SCHEDULER
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker,CPUEUTask * & lastTask)92 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, CPUEUTask* &lastTask)
93 {
94 auto ctx = ExecuteCtx::Cur();
95 CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
96 if (curtask->type != 0) {
97 ctx->exec_task = curtask;
98 worker->curTask = task;
99 Run(curtask, static_cast<ffrt_qos_t>(worker->GetQos()));
100 worker->curTask = nullptr;
101 ctx->exec_task = nullptr;
102 } else {
103 FFRT_LOGD("EU pick task[%lu]", task->gid);
104 task->UpdateState(TaskState::RUNNING);
105
106 lastTask = task;
107 ctx->task = task;
108 worker->curTask = task;
109 Run(task);
110 worker->curTask = nullptr;
111 ctx->task = nullptr;
112 }
113 }
114
RunTaskLifo(ffrt_executor_task_t * task,CPUWorker * worker,CPUEUTask * & lastTask)115 void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker, CPUEUTask* &lastTask)
116 {
117 RunTask(task, worker, lastTask);
118
119 unsigned int lifoCount = 0;
120 while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
121 lifoCount++;
122 ffrt_executor_task_t* priorityTask = reinterpret_cast<ffrt_executor_task_t*>(worker->priority_task);
123 // set a placeholder to prevent the task from being placed in the priority again
124 worker->priority_task = (lifoCount > worker->budget) ? const_cast<int*>(&PLACE_HOLDER) : nullptr;
125
126 RunTask(priorityTask, worker, lastTask);
127 }
128 }
129
GetTask(CPUWorker * worker)130 void* CPUWorker::GetTask(CPUWorker* worker)
131 {
132 // periodically pick up tasks from the global queue to prevent global queue starvation
133 if (worker->tick % worker->global_interval == 0) {
134 worker->tick = 0;
135 void* task = worker->ops.PickUpTaskBatch(worker);
136 if (task != nullptr) {
137 worker->ops.NotifyTaskPicked(worker);
138 }
139 return task;
140 }
141
142 // preferentially pick up tasks from the priority unless the priority is empty or occupied
143 if (worker->priority_task != nullptr) {
144 void* task = worker->priority_task;
145 worker->priority_task = nullptr;
146 if (task != &PLACE_HOLDER) {
147 return task;
148 }
149 }
150
151 return worker->localFifo.PopHead();
152 }
153
TryPoll(CPUWorker * worker,int timeout)154 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
155 {
156 PollerRet ret = worker->ops.TryPoll(worker, timeout);
157 if (ret == PollerRet::RET_TIMER) {
158 worker->tick = 0;
159 }
160
161 return ret;
162 }
163
LocalEmpty(CPUWorker * worker)164 bool CPUWorker::LocalEmpty(CPUWorker* worker)
165 {
166 return (worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0);
167 }
168
Dispatch(CPUWorker * worker)169 void CPUWorker::Dispatch(CPUWorker* worker)
170 {
171 auto ctx = ExecuteCtx::Cur();
172 ctx->localFifo = &(worker->localFifo);
173 ctx->priority_task_ptr = &(worker->priority_task);
174 ctx->qos = worker->GetQos();
175 CPUEUTask* lastTask = nullptr;
176
177 FFRT_LOGD("qos[%d] thread start succ", (int)worker->GetQos());
178 for (;;) {
179 FFRT_LOGD("task picking");
180 // get task in the order of priority -> local queue -> global queue
181 void* local_task = GetTask(worker);
182 worker->tick++;
183 if (local_task) {
184 if (worker->tick % TRY_POLL_FREQ == 0) {
185 worker->ops.TryPoll(worker, 0);
186 }
187 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(local_task);
188 RunTaskLifo(work, worker, lastTask);
189 continue;
190 }
191
192 PollerRet ret = TryPoll(worker, 0);
193 if (ret != PollerRet::RET_NULL) {
194 continue;
195 }
196
197 // pick up tasks from global queue
198 CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
199 if (task) {
200 worker->ops.NotifyTaskPicked(worker);
201 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
202 RunTask(work, worker, lastTask);
203 continue;
204 }
205
206 // check the epoll status again to prevent fd or timer events from being missed
207 ret = TryPoll(worker, 0);
208 if (ret != PollerRet::RET_NULL) {
209 continue;
210 }
211
212 if (worker->localFifo.GetLength() == 0) {
213 worker->ops.StealTaskBatch(worker);
214 }
215 if (!LocalEmpty(worker)) {
216 worker->tick = 1;
217 continue;
218 }
219
220 // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
221 // only one worker enters this state at a QoS level
222 ret = TryPoll(worker, -1);
223 if (ret != PollerRet::RET_NULL) {
224 continue;
225 }
226
227 FFRT_WORKER_IDLE_BEGIN_MARKER();
228 auto action = worker->ops.WaitForNewAction(worker);
229 FFRT_WORKER_IDLE_END_MARKER();
230 if (action == WorkerAction::RETRY) {
231 worker->tick = 0;
232 continue;
233 } else if (action == WorkerAction::RETIRE) {
234 break;
235 }
236 }
237
238 CoWorkerExit();
239 FFRT_LOGD("ExecutionThread exited");
240 free(worker->steal_buffer);
241 worker->ops.WorkerRetired(worker);
242 }
243 #else
Dispatch(CPUWorker * worker)244 void CPUWorker::Dispatch(CPUWorker* worker)
245 {
246 auto ctx = ExecuteCtx::Cur();
247 CPUEUTask* lastTask = nullptr;
248
249 worker->ops.WorkerPrepare(worker);
250 FFRT_LOGD("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
251 for (;;) {
252 FFRT_LOGD("task picking");
253 CPUEUTask* task = worker->ops.PickUpTask(worker);
254 if (task) {
255 worker->ops.NotifyTaskPicked(worker);
256 } else {
257 FFRT_WORKER_IDLE_BEGIN_MARKER();
258 auto action = worker->ops.WaitForNewAction(worker);
259 FFRT_WORKER_IDLE_END_MARKER();
260 if (action == WorkerAction::RETRY) {
261 continue;
262 } else if (action == WorkerAction::RETIRE) {
263 break;
264 }
265 }
266
267 BboxCheckAndFreeze();
268
269 if (task->type != 0) {
270 worker->curTask = task;
271 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
272 Run(work, static_cast<ffrt_qos_t>(worker->GetQos()));
273 } else {
274 FFRT_LOGD("EU pick task[%lu]", task->gid);
275 task->UpdateState(TaskState::RUNNING);
276
277 lastTask = task;
278 ctx->task = task;
279 worker->curTask = task;
280 Run(task);
281 }
282 BboxCheckAndFreeze();
283 worker->curTask = nullptr;
284 ctx->task = nullptr;
285 }
286
287 CoWorkerExit();
288 FFRT_LOGD("ExecutionThread exited");
289 worker->ops.WorkerRetired(worker);
290 }
291 #endif
292 } // namespace ffrt
293