• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_interface.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #ifdef FFRT_IO_TASK_SCHEDULER
25 #include "sync/poller.h"
26 #include "util/spmc_queue.h"
27 #endif
28 #include "tm/cpu_task.h"
29 
30 namespace ffrt {
31 const int PLACE_HOLDER = 0;
32 const unsigned int TRY_POLL_FREQ = 51;
33 }
34 
35 namespace ffrt {
Run(CPUEUTask * task)36 void CPUWorker::Run(CPUEUTask* task)
37 {
38     FFRT_TRACE_SCOPE(TRACE_LEVEL2, Run);
39     if constexpr(USE_COROUTINE) {
40         CoStart(task);
41     } else {
42         auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
43         auto exp = ffrt::SkipStatus::SUBMITTED;
44         if (likely(__atomic_compare_exchange_n(&task->skipped, &exp, ffrt::SkipStatus::EXECUTED, 0,
45             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
46             FFRT_TASK_BEGIN(task->label, task->gid);
47             f->exec(f);
48             FFRT_TASK_END();
49         }
50         f->destroy(f);
51         task->UpdateState(ffrt::TaskState::EXITED);
52     }
53 }
54 
Run(ffrt_executor_task_t * task,ffrt_qos_t qos)55 void CPUWorker::Run(ffrt_executor_task_t* task, ffrt_qos_t qos)
56 {
57 #ifdef FFRT_BBOX_ENABLE
58     TaskRunCounterInc();
59 #endif
60 
61     ffrt_executor_task_func func = nullptr;
62     ffrt_executor_task_type_t type = static_cast<ffrt_executor_task_type_t>(task->type);
63     if (type == ffrt_io_task) {
64         func = FuncManager::Instance()->getFunc(ffrt_io_task);
65     } else {
66         func = FuncManager::Instance()->getFunc(ffrt_uv_task);
67     }
68     if (func == nullptr) {
69         FFRT_LOGE("Static func is nullptr");
70         return;
71     }
72 
73     FFRT_EXECUTOR_TASK_BEGIN(task);
74     func(task, qos);
75     FFRT_EXECUTOR_TASK_END();
76     if (type != ffrt_io_task) {
77         FFRT_EXECUTOR_TASK_FINISH_MARKER(task); // task finish marker for uv task
78     }
79 #ifdef FFRT_BBOX_ENABLE
80     TaskFinishCounterInc();
81 #endif
82 }
83 
WarpDispatch(void * worker)84 void* CPUWorker::WarpDispatch(void* worker)
85 {
86     reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
87     Dispatch(reinterpret_cast<CPUWorker*>(worker));
88     return nullptr;
89 }
90 
91 #ifdef FFRT_IO_TASK_SCHEDULER
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker,CPUEUTask * & lastTask)92 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, CPUEUTask* &lastTask)
93 {
94     auto ctx = ExecuteCtx::Cur();
95     CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
96     if (curtask->type != 0) {
97         ctx->exec_task = curtask;
98         worker->curTask = task;
99         Run(curtask, static_cast<ffrt_qos_t>(worker->GetQos()));
100         worker->curTask = nullptr;
101         ctx->exec_task = nullptr;
102     } else {
103         FFRT_LOGD("EU pick task[%lu]", task->gid);
104         task->UpdateState(TaskState::RUNNING);
105 
106         lastTask = task;
107         ctx->task = task;
108         worker->curTask = task;
109         Run(task);
110         worker->curTask = nullptr;
111         ctx->task = nullptr;
112     }
113 }
114 
RunTaskLifo(ffrt_executor_task_t * task,CPUWorker * worker,CPUEUTask * & lastTask)115 void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker, CPUEUTask* &lastTask)
116 {
117     RunTask(task, worker, lastTask);
118 
119     unsigned int lifoCount = 0;
120     while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
121         lifoCount++;
122         ffrt_executor_task_t* priorityTask = reinterpret_cast<ffrt_executor_task_t*>(worker->priority_task);
123         // set a placeholder to prevent the task from being placed in the priority again
124         worker->priority_task = (lifoCount > worker->budget) ? const_cast<int*>(&PLACE_HOLDER) : nullptr;
125 
126         RunTask(priorityTask, worker, lastTask);
127     }
128 }
129 
GetTask(CPUWorker * worker)130 void* CPUWorker::GetTask(CPUWorker* worker)
131 {
132     // periodically pick up tasks from the global queue to prevent global queue starvation
133     if (worker->tick % worker->global_interval == 0) {
134         worker->tick = 0;
135         void* task = worker->ops.PickUpTaskBatch(worker);
136         if (task != nullptr) {
137             worker->ops.NotifyTaskPicked(worker);
138         }
139         return task;
140     }
141 
142     // preferentially pick up tasks from the priority unless the priority is empty or occupied
143     if (worker->priority_task != nullptr) {
144         void* task = worker->priority_task;
145         worker->priority_task = nullptr;
146         if (task != &PLACE_HOLDER) {
147             return task;
148         }
149     }
150 
151     return worker->localFifo.PopHead();
152 }
153 
TryPoll(CPUWorker * worker,int timeout)154 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
155 {
156     PollerRet ret = worker->ops.TryPoll(worker, timeout);
157     if (ret == PollerRet::RET_TIMER) {
158         worker->tick = 0;
159     }
160 
161     return ret;
162 }
163 
LocalEmpty(CPUWorker * worker)164 bool CPUWorker::LocalEmpty(CPUWorker* worker)
165 {
166     return (worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0);
167 }
168 
Dispatch(CPUWorker * worker)169 void CPUWorker::Dispatch(CPUWorker* worker)
170 {
171     auto ctx = ExecuteCtx::Cur();
172     ctx->localFifo = &(worker->localFifo);
173     ctx->priority_task_ptr = &(worker->priority_task);
174     ctx->qos = worker->GetQos();
175     CPUEUTask* lastTask = nullptr;
176 
177     FFRT_LOGD("qos[%d] thread start succ", (int)worker->GetQos());
178     for (;;) {
179         FFRT_LOGD("task picking");
180         // get task in the order of priority -> local queue -> global queue
181         void* local_task = GetTask(worker);
182         worker->tick++;
183         if (local_task) {
184             if (worker->tick % TRY_POLL_FREQ == 0) {
185                 worker->ops.TryPoll(worker, 0);
186             }
187             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(local_task);
188             RunTaskLifo(work, worker, lastTask);
189             continue;
190         }
191 
192         PollerRet ret = TryPoll(worker, 0);
193         if (ret != PollerRet::RET_NULL) {
194             continue;
195         }
196 
197         // pick up tasks from global queue
198         CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
199         if (task) {
200             worker->ops.NotifyTaskPicked(worker);
201             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
202             RunTask(work, worker, lastTask);
203             continue;
204         }
205 
206         // check the epoll status again to prevent fd or timer events from being missed
207         ret = TryPoll(worker, 0);
208         if (ret != PollerRet::RET_NULL) {
209             continue;
210         }
211 
212         if (worker->localFifo.GetLength() == 0) {
213             worker->ops.StealTaskBatch(worker);
214         }
215         if (!LocalEmpty(worker)) {
216             worker->tick = 1;
217             continue;
218         }
219 
220         // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
221         // only one worker enters this state at a QoS level
222         ret = TryPoll(worker, -1);
223         if (ret != PollerRet::RET_NULL) {
224             continue;
225         }
226 
227         FFRT_WORKER_IDLE_BEGIN_MARKER();
228         auto action = worker->ops.WaitForNewAction(worker);
229         FFRT_WORKER_IDLE_END_MARKER();
230         if (action == WorkerAction::RETRY) {
231             worker->tick = 0;
232             continue;
233         } else if (action == WorkerAction::RETIRE) {
234             break;
235         }
236     }
237 
238     CoWorkerExit();
239     FFRT_LOGD("ExecutionThread exited");
240     free(worker->steal_buffer);
241     worker->ops.WorkerRetired(worker);
242 }
243 #else
Dispatch(CPUWorker * worker)244 void CPUWorker::Dispatch(CPUWorker* worker)
245 {
246     auto ctx = ExecuteCtx::Cur();
247     CPUEUTask* lastTask = nullptr;
248 
249     worker->ops.WorkerPrepare(worker);
250     FFRT_LOGD("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
251     for (;;) {
252         FFRT_LOGD("task picking");
253         CPUEUTask* task = worker->ops.PickUpTask(worker);
254         if (task) {
255             worker->ops.NotifyTaskPicked(worker);
256         } else {
257             FFRT_WORKER_IDLE_BEGIN_MARKER();
258             auto action = worker->ops.WaitForNewAction(worker);
259             FFRT_WORKER_IDLE_END_MARKER();
260             if (action == WorkerAction::RETRY) {
261                 continue;
262             } else if (action == WorkerAction::RETIRE) {
263                 break;
264             }
265         }
266 
267         BboxCheckAndFreeze();
268 
269         if (task->type != 0) {
270             worker->curTask = task;
271             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
272             Run(work, static_cast<ffrt_qos_t>(worker->GetQos()));
273         } else {
274             FFRT_LOGD("EU pick task[%lu]", task->gid);
275             task->UpdateState(TaskState::RUNNING);
276 
277             lastTask = task;
278             ctx->task = task;
279             worker->curTask = task;
280             Run(task);
281         }
282         BboxCheckAndFreeze();
283         worker->curTask = nullptr;
284         ctx->task = nullptr;
285     }
286 
287     CoWorkerExit();
288     FFRT_LOGD("ExecutionThread exited");
289     worker->ops.WorkerRetired(worker);
290 }
291 #endif
292 } // namespace ffrt
293