1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_strategy.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #include "dfx/perf/ffrt_perf.h"
25 #include "sync/poller.h"
26 #include "util/spmc_queue.h"
27 #include "util/ffrt_facade.h"
28 #include "tm/cpu_task.h"
29 #include "tm/queue_task.h"
30 #ifdef FFRT_ASYNC_STACKTRACE
31 #include "dfx/async_stack/ffrt_async_stack.h"
32 #endif
33 #include "eu/cpuworker_manager.h"
34 namespace {
35 int PLACE_HOLDER = 0;
36 const unsigned int TRY_POLL_FREQ = 51;
37 }
38
39 namespace ffrt {
Run(CPUEUTask * task,CoRoutineEnv * coRoutineEnv,CPUWorker * worker)40 void CPUWorker::Run(CPUEUTask* task, CoRoutineEnv* coRoutineEnv, CPUWorker* worker)
41 {
42 if constexpr(USE_COROUTINE) {
43 if (CoStart(task, coRoutineEnv) != 0) {
44 worker->localFifo.PushTail(task);
45 }
46 return;
47 }
48
49 switch (task->type) {
50 case ffrt_normal_task: {
51 #ifdef FFRT_ASYNC_STACKTRACE
52 FFRTSetStackId(task->stackId);
53 #endif
54 task->Execute();
55 break;
56 }
57 case ffrt_queue_task: {
58 QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
59 #ifdef FFRT_ASYNC_STACKTRACE
60 FFRTSetStackId(sTask->stackId);
61 #endif
62 sTask->IncDeleteRef();
63 sTask->Execute();
64 sTask->DecDeleteRef();
65 break;
66 }
67 default: {
68 FFRT_LOGE("run unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
69 break;
70 }
71 }
72 }
73
Run(ffrt_executor_task_t * task,ffrt_qos_t qos)74 void CPUWorker::Run(ffrt_executor_task_t* task, ffrt_qos_t qos)
75 {
76 if (task == nullptr) {
77 FFRT_LOGE("task is nullptr");
78 return;
79 }
80 ffrt_executor_task_func func = nullptr;
81 ffrt_executor_task_type_t type = static_cast<ffrt_executor_task_type_t>(task->type);
82 if (type == ffrt_io_task) {
83 func = FuncManager::Instance()->getFunc(ffrt_io_task);
84 } else {
85 func = FuncManager::Instance()->getFunc(ffrt_uv_task);
86 }
87 if (func == nullptr) {
88 FFRT_LOGE("Static func is nullptr");
89 return;
90 }
91 FFRTTraceRecord::TaskExecute<ffrt_uv_task>(qos);
92 FFRT_EXECUTOR_TASK_BEGIN(task);
93 func(task, qos);
94 FFRT_EXECUTOR_TASK_END();
95 if (type != ffrt_io_task) {
96 FFRT_EXECUTOR_TASK_FINISH_MARKER(task); // task finish marker for uv task
97 }
98 FFRTTraceRecord::TaskDone<ffrt_uv_task>(qos);
99 }
100
WrapDispatch(void * worker)101 void* CPUWorker::WrapDispatch(void* worker)
102 {
103 reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
104 Dispatch(reinterpret_cast<CPUWorker*>(worker));
105 return nullptr;
106 }
107
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker)108 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker)
109 {
110 ExecuteCtx* ctx = ExecuteCtx::Cur();
111 CoRoutineEnv* coRoutineEnv = GetCoEnv();
112 RunTask(curtask, worker, ctx, coRoutineEnv);
113 }
114
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker,ExecuteCtx * ctx,CoRoutineEnv * coRoutineEnv)115 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, ExecuteCtx* ctx, CoRoutineEnv* coRoutineEnv)
116 {
117 CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
118 worker->curTask = task;
119 worker->curTaskType_ = task->type;
120 switch (curtask->type) {
121 case ffrt_normal_task:
122 case ffrt_queue_task: {
123 #ifdef WORKER_CACHE_TASKNAMEID
124 worker->curTaskLabel_ = task->label;
125 worker->curTaskGid_ = task->gid;
126 #endif
127 ctx->task = task;
128 ctx->lastGid_ = task->gid;
129 Run(task, coRoutineEnv, worker);
130 ctx->task = nullptr;
131 break;
132 }
133 default: {
134 ctx->exec_task = curtask;
135 Run(curtask, static_cast<ffrt_qos_t>(worker->GetQos()));
136 ctx->exec_task = nullptr;
137 break;
138 }
139 }
140 worker->curTask = nullptr;
141 worker->curTaskType_ = ffrt_invalid_task;
142 }
143
RunTaskLifo(ffrt_executor_task_t * task,CPUWorker * worker)144 void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker)
145 {
146 RunTask(task, worker);
147
148 unsigned int lifoCount = 0;
149 while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
150 lifoCount++;
151 ffrt_executor_task_t* priorityTask = reinterpret_cast<ffrt_executor_task_t*>(worker->priority_task);
152 // set a placeholder to prevent the task from being placed in the priority again
153 worker->priority_task = (lifoCount > worker->budget) ? &PLACE_HOLDER : nullptr;
154
155 RunTask(priorityTask, worker);
156 }
157 }
158
GetTask(CPUWorker * worker)159 void* CPUWorker::GetTask(CPUWorker* worker)
160 {
161 // periodically pick up tasks from the global queue to prevent global queue starvation
162 if (worker->tick % worker->global_interval == 0) {
163 worker->tick = 0;
164 CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
165 // the worker is not notified when the task attribute is set not to notify worker
166 if (task != nullptr) {
167 if (task->type == ffrt_normal_task && !task->notifyWorker_) {
168 task->notifyWorker_ = true;
169 return task;
170 }
171 worker->ops.NotifyTaskPicked(worker);
172 }
173 return task;
174 }
175
176 // preferentially pick up tasks from the priority unless the priority is empty or occupied
177 if (worker->priority_task != nullptr) {
178 void* task = worker->priority_task;
179 worker->priority_task = nullptr;
180 if (task != &PLACE_HOLDER) {
181 return task;
182 }
183 }
184
185 return worker->localFifo.PopHead();
186 }
187
TryPoll(CPUWorker * worker,int timeout)188 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
189 {
190 PollerRet ret = worker->ops.TryPoll(worker, timeout);
191 if (ret == PollerRet::RET_TIMER) {
192 worker->tick = 0;
193 }
194
195 return ret;
196 }
197
LocalEmpty(CPUWorker * worker)198 bool CPUWorker::LocalEmpty(CPUWorker* worker)
199 {
200 return ((worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0));
201 }
202
Dispatch(CPUWorker * worker)203 void CPUWorker::Dispatch(CPUWorker* worker)
204 {
205 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
206 if (worker->ops.IsBlockAwareInit()) {
207 int ret = BlockawareRegister(worker->GetDomainId());
208 if (ret != 0) {
209 FFRT_LOGE("blockaware register fail, ret[%d]", ret);
210 }
211 }
212 #endif
213 auto ctx = ExecuteCtx::Cur();
214 ctx->localFifo = &(worker->localFifo);
215 ctx->priority_task_ptr = &(worker->priority_task);
216 ctx->qos = worker->GetQos();
217
218 worker->ops.WorkerPrepare(worker);
219 #ifndef OHOS_STANDARD_SYSTEM
220 FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
221 #endif
222 FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->GetQos()));
223 worker->ops.WorkerLooper(worker);
224 CoWorkerExit();
225 worker->ops.WorkerRetired(worker);
226 }
227
228 // work looper which inherited from history
WorkerLooperDefault(WorkerThread * p)229 void CPUWorker::WorkerLooperDefault(WorkerThread* p)
230 {
231 CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
232 for (;;) {
233 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
234 if (!worker->ops.IsExceedRunningThreshold(worker)) {
235 #endif
236 // get task in the order of priority -> local queue -> global queue
237 void* local_task = GetTask(worker);
238 worker->tick++;
239 if (local_task) {
240 if (worker->tick % TRY_POLL_FREQ == 0) {
241 worker->ops.TryPoll(worker, 0);
242 }
243 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(local_task);
244 RunTaskLifo(work, worker);
245 continue;
246 }
247
248 PollerRet ret = TryPoll(worker, 0);
249 if (ret != PollerRet::RET_NULL) {
250 continue;
251 }
252
253 // pick up tasks from global queue
254 CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
255 // the worker is not notified when the task attribute is set not to notify worker
256 if (task != nullptr) {
257 if (task->type == ffrt_normal_task && !task->notifyWorker_) {
258 task->notifyWorker_ = true;
259 } else {
260 worker->ops.NotifyTaskPicked(worker);
261 }
262 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
263 RunTask(work, worker);
264 continue;
265 }
266
267 // check the epoll status again to prevent fd or timer events from being missed
268 ret = TryPoll(worker, 0);
269 if (ret != PollerRet::RET_NULL) {
270 continue;
271 }
272
273 if (worker->localFifo.GetLength() == 0) {
274 worker->ops.StealTaskBatch(worker);
275 }
276
277 if (!LocalEmpty(worker)) {
278 worker->tick = 1;
279 continue;
280 }
281
282 // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
283 // only one worker enters this state at a QoS level
284 ret = TryPoll(worker, -1);
285 if (ret != PollerRet::RET_NULL) {
286 continue;
287 }
288 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
289 }
290 #endif
291 auto action = worker->ops.WaitForNewAction(worker);
292 if (action == WorkerAction::RETRY) {
293 worker->tick = 0;
294 continue;
295 } else if (action == WorkerAction::RETIRE) {
296 break;
297 }
298 }
299 }
300
301 // work looper with standard procedure which could be strategical
WorkerLooperStandard(WorkerThread * p)302 void CPUWorker::WorkerLooperStandard(WorkerThread* p)
303 {
304 CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
305 auto mgr = reinterpret_cast<CPUWorkerManager*>(p->worker_mgr);
306 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(p->GetQos());
307 auto lock = mgr->GetSleepCtl(static_cast<int>(p->GetQos()));
308 ExecuteCtx* ctx = ExecuteCtx::Cur();
309 CoRoutineEnv* coRoutineEnv = GetCoEnv();
310 for (;;) {
311 // try get task
312 CPUEUTask* task = nullptr;
313 if (!mgr->tearDown) {
314 std::lock_guard lg(*lock);
315 task = sched.PickNextTask();
316 }
317
318 // if succ, notify picked and run task
319 if (task != nullptr) {
320 mgr->NotifyTaskPicked(worker);
321 RunTask(reinterpret_cast<ffrt_executor_task_t*>(task), worker, ctx, coRoutineEnv);
322 continue;
323 }
324 // otherwise, worker wait action
325 auto action = worker->ops.WaitForNewAction(worker);
326 if (action == WorkerAction::RETRY) {
327 continue;
328 } else if (action == WorkerAction::RETIRE) {
329 break;
330 }
331 }
332 }
333 } // namespace ffrt
334