• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_strategy.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #include "dfx/perf/ffrt_perf.h"
25 #include "sync/poller.h"
26 #include "util/spmc_queue.h"
27 #include "util/ffrt_facade.h"
28 #include "tm/cpu_task.h"
29 #include "tm/queue_task.h"
30 #ifdef FFRT_ASYNC_STACKTRACE
31 #include "dfx/async_stack/ffrt_async_stack.h"
32 #endif
33 #include "eu/cpuworker_manager.h"
34 namespace {
35 int PLACE_HOLDER = 0;
36 const unsigned int TRY_POLL_FREQ = 51;
37 }
38 
39 namespace ffrt {
Run(CPUEUTask * task,CoRoutineEnv * coRoutineEnv,CPUWorker * worker)40 void CPUWorker::Run(CPUEUTask* task, CoRoutineEnv* coRoutineEnv, CPUWorker* worker)
41 {
42     if constexpr(USE_COROUTINE) {
43         if (CoStart(task, coRoutineEnv) != 0) {
44             worker->localFifo.PushTail(task);
45         }
46         return;
47     }
48 
49     switch (task->type) {
50         case ffrt_normal_task: {
51 #ifdef FFRT_ASYNC_STACKTRACE
52             FFRTSetStackId(task->stackId);
53 #endif
54             task->Execute();
55             break;
56         }
57         case ffrt_queue_task: {
58             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
59 #ifdef FFRT_ASYNC_STACKTRACE
60             FFRTSetStackId(sTask->stackId);
61 #endif
62             sTask->IncDeleteRef();
63             sTask->Execute();
64             sTask->DecDeleteRef();
65             break;
66         }
67         default: {
68             FFRT_LOGE("run unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
69             break;
70         }
71     }
72 }
73 
Run(ffrt_executor_task_t * task,ffrt_qos_t qos)74 void CPUWorker::Run(ffrt_executor_task_t* task, ffrt_qos_t qos)
75 {
76     if (task == nullptr) {
77         FFRT_LOGE("task is nullptr");
78         return;
79     }
80     ffrt_executor_task_func func = nullptr;
81     ffrt_executor_task_type_t type = static_cast<ffrt_executor_task_type_t>(task->type);
82     if (type == ffrt_io_task) {
83         func = FuncManager::Instance()->getFunc(ffrt_io_task);
84     } else {
85         func = FuncManager::Instance()->getFunc(ffrt_uv_task);
86     }
87     if (func == nullptr) {
88         FFRT_LOGE("Static func is nullptr");
89         return;
90     }
91     FFRTTraceRecord::TaskExecute<ffrt_uv_task>(qos);
92     FFRT_EXECUTOR_TASK_BEGIN(task);
93     func(task, qos);
94     FFRT_EXECUTOR_TASK_END();
95     if (type != ffrt_io_task) {
96         FFRT_EXECUTOR_TASK_FINISH_MARKER(task); // task finish marker for uv task
97     }
98     FFRTTraceRecord::TaskDone<ffrt_uv_task>(qos);
99 }
100 
WrapDispatch(void * worker)101 void* CPUWorker::WrapDispatch(void* worker)
102 {
103     reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
104     Dispatch(reinterpret_cast<CPUWorker*>(worker));
105     return nullptr;
106 }
107 
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker)108 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker)
109 {
110     ExecuteCtx* ctx = ExecuteCtx::Cur();
111     CoRoutineEnv* coRoutineEnv = GetCoEnv();
112     RunTask(curtask, worker, ctx, coRoutineEnv);
113 }
114 
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker,ExecuteCtx * ctx,CoRoutineEnv * coRoutineEnv)115 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, ExecuteCtx* ctx, CoRoutineEnv* coRoutineEnv)
116 {
117     CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
118     worker->curTask = task;
119     worker->curTaskType_ = task->type;
120     switch (curtask->type) {
121         case ffrt_normal_task:
122         case ffrt_queue_task: {
123 #ifdef WORKER_CACHE_TASKNAMEID
124             worker->curTaskLabel_ = task->label;
125             worker->curTaskGid_ = task->gid;
126 #endif
127             ctx->task = task;
128             ctx->lastGid_ = task->gid;
129             Run(task, coRoutineEnv, worker);
130             ctx->task = nullptr;
131             break;
132         }
133         default: {
134             ctx->exec_task = curtask;
135             Run(curtask, static_cast<ffrt_qos_t>(worker->GetQos()));
136             ctx->exec_task = nullptr;
137             break;
138         }
139     }
140     worker->curTask = nullptr;
141     worker->curTaskType_ = ffrt_invalid_task;
142 }
143 
RunTaskLifo(ffrt_executor_task_t * task,CPUWorker * worker)144 void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker)
145 {
146     RunTask(task, worker);
147 
148     unsigned int lifoCount = 0;
149     while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
150         lifoCount++;
151         ffrt_executor_task_t* priorityTask = reinterpret_cast<ffrt_executor_task_t*>(worker->priority_task);
152         // set a placeholder to prevent the task from being placed in the priority again
153         worker->priority_task = (lifoCount > worker->budget) ? &PLACE_HOLDER : nullptr;
154 
155         RunTask(priorityTask, worker);
156     }
157 }
158 
GetTask(CPUWorker * worker)159 void* CPUWorker::GetTask(CPUWorker* worker)
160 {
161     // periodically pick up tasks from the global queue to prevent global queue starvation
162     if (worker->tick % worker->global_interval == 0) {
163         worker->tick = 0;
164         CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
165         // the worker is not notified when the task attribute is set not to notify worker
166         if (task != nullptr) {
167             if (task->type == ffrt_normal_task && !task->notifyWorker_) {
168                 task->notifyWorker_ = true;
169                 return task;
170             }
171             worker->ops.NotifyTaskPicked(worker);
172         }
173         return task;
174     }
175 
176     // preferentially pick up tasks from the priority unless the priority is empty or occupied
177     if (worker->priority_task != nullptr) {
178         void* task = worker->priority_task;
179         worker->priority_task = nullptr;
180         if (task != &PLACE_HOLDER) {
181             return task;
182         }
183     }
184 
185     return worker->localFifo.PopHead();
186 }
187 
TryPoll(CPUWorker * worker,int timeout)188 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
189 {
190     PollerRet ret = worker->ops.TryPoll(worker, timeout);
191     if (ret == PollerRet::RET_TIMER) {
192         worker->tick = 0;
193     }
194 
195     return ret;
196 }
197 
LocalEmpty(CPUWorker * worker)198 bool CPUWorker::LocalEmpty(CPUWorker* worker)
199 {
200     return ((worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0));
201 }
202 
Dispatch(CPUWorker * worker)203 void CPUWorker::Dispatch(CPUWorker* worker)
204 {
205 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
206     if (worker->ops.IsBlockAwareInit()) {
207         int ret = BlockawareRegister(worker->GetDomainId());
208         if (ret != 0) {
209             FFRT_LOGE("blockaware register fail, ret[%d]", ret);
210         }
211     }
212 #endif
213     auto ctx = ExecuteCtx::Cur();
214     ctx->localFifo = &(worker->localFifo);
215     ctx->priority_task_ptr = &(worker->priority_task);
216     ctx->qos = worker->GetQos();
217 
218     worker->ops.WorkerPrepare(worker);
219 #ifndef OHOS_STANDARD_SYSTEM
220     FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
221 #endif
222     FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->GetQos()));
223     worker->ops.WorkerLooper(worker);
224     CoWorkerExit();
225     worker->ops.WorkerRetired(worker);
226 }
227 
228 // work looper which inherited from history
WorkerLooperDefault(WorkerThread * p)229 void CPUWorker::WorkerLooperDefault(WorkerThread* p)
230 {
231     CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
232     for (;;) {
233 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
234         if (!worker->ops.IsExceedRunningThreshold(worker)) {
235 #endif
236         // get task in the order of priority -> local queue -> global queue
237         void* local_task = GetTask(worker);
238         worker->tick++;
239         if (local_task) {
240             if (worker->tick % TRY_POLL_FREQ == 0) {
241                 worker->ops.TryPoll(worker, 0);
242             }
243             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(local_task);
244             RunTaskLifo(work, worker);
245             continue;
246         }
247 
248         PollerRet ret = TryPoll(worker, 0);
249         if (ret != PollerRet::RET_NULL) {
250             continue;
251         }
252 
253         // pick up tasks from global queue
254         CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
255         // the worker is not notified when the task attribute is set not to notify worker
256         if (task != nullptr) {
257             if (task->type == ffrt_normal_task && !task->notifyWorker_) {
258                 task->notifyWorker_ = true;
259             } else {
260                 worker->ops.NotifyTaskPicked(worker);
261             }
262             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
263             RunTask(work, worker);
264             continue;
265         }
266 
267         // check the epoll status again to prevent fd or timer events from being missed
268         ret = TryPoll(worker, 0);
269         if (ret != PollerRet::RET_NULL) {
270             continue;
271         }
272 
273         if (worker->localFifo.GetLength() == 0) {
274             worker->ops.StealTaskBatch(worker);
275         }
276 
277         if (!LocalEmpty(worker)) {
278             worker->tick = 1;
279             continue;
280         }
281 
282         // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
283         // only one worker enters this state at a QoS level
284         ret = TryPoll(worker, -1);
285         if (ret != PollerRet::RET_NULL) {
286             continue;
287         }
288 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
289         }
290 #endif
291         auto action = worker->ops.WaitForNewAction(worker);
292         if (action == WorkerAction::RETRY) {
293             worker->tick = 0;
294             continue;
295         } else if (action == WorkerAction::RETIRE) {
296             break;
297         }
298     }
299 }
300 
301 // work looper with standard procedure which could be strategical
WorkerLooperStandard(WorkerThread * p)302 void CPUWorker::WorkerLooperStandard(WorkerThread* p)
303 {
304     CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
305     auto mgr = reinterpret_cast<CPUWorkerManager*>(p->worker_mgr);
306     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(p->GetQos());
307     auto lock = mgr->GetSleepCtl(static_cast<int>(p->GetQos()));
308     ExecuteCtx* ctx = ExecuteCtx::Cur();
309     CoRoutineEnv* coRoutineEnv = GetCoEnv();
310     for (;;) {
311         // try get task
312         CPUEUTask* task = nullptr;
313         if (!mgr->tearDown) {
314             std::lock_guard lg(*lock);
315             task = sched.PickNextTask();
316         }
317 
318         // if succ, notify picked and run task
319         if (task != nullptr) {
320             mgr->NotifyTaskPicked(worker);
321             RunTask(reinterpret_cast<ffrt_executor_task_t*>(task), worker, ctx, coRoutineEnv);
322             continue;
323         }
324         // otherwise, worker wait action
325         auto action = worker->ops.WaitForNewAction(worker);
326         if (action == WorkerAction::RETRY) {
327             continue;
328         } else if (action == WorkerAction::RETIRE) {
329             break;
330         }
331     }
332 }
333 } // namespace ffrt
334