• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_strategy.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #include "dfx/perf/ffrt_perf.h"
25 #include "sync/poller.h"
26 #include "util/ffrt_facade.h"
27 #include "tm/cpu_task.h"
28 #include "tm/queue_task.h"
29 #include "eu/cpuworker_manager.h"
30 #include "dfx/sysevent/sysevent.h"
namespace {
// Sentinel object: only its address matters. Storing &PLACE_HOLDER in a worker's
// priority_task slot marks the slot as occupied without holding a real task.
// It must stay non-const so that its address converts to the slot's pointer type.
int PLACE_HOLDER = 0;

// While a worker keeps finding tasks, it calls ops.TryPoll(worker, 0) every time
// its tick counter is a multiple of this value (see WorkerLooperDefault).
constexpr unsigned int TRY_POLL_FREQ = 51;
} // namespace
35 
36 namespace ffrt {
WrapDispatch(void * worker)37 void* CPUWorker::WrapDispatch(void* worker)
38 {
39     reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
40     Dispatch(reinterpret_cast<CPUWorker*>(worker));
41     return nullptr;
42 }
43 
RunTask(TaskBase * task,CPUWorker * worker)44 void CPUWorker::RunTask(TaskBase* task, CPUWorker* worker)
45 {
46     bool isNotUv = task->type == ffrt_normal_task || task->type == ffrt_queue_task;
47 #ifdef FFRT_SEND_EVENT
48     static bool isBetaVersion = IsBeta();
49     uint64_t startExecuteTime = 0;
50     if (isBetaVersion) {
51         startExecuteTime = FFRTTraceRecord::TimeStamp();
52         CPUEUTask* cpu_task = reinterpret_cast<CPUEUTask*>(task);
53         if (likely(isNotUv)) {
54             worker->cacheLabel = cpu_task->label;
55         }
56     }
57 #endif
58     worker->curTask = task;
59     worker->curTaskType_ = task->type;
60 #ifdef WORKER_CACHE_TASKNAMEID
61     if (isNotUv) {
62         worker->curTaskLabel_ = task->GetLabel();
63         worker->curTaskGid_ = task->gid;
64     }
65 #endif
66 
67     ExecuteTask(task, worker->GetQos());
68 
69     worker->curTask = nullptr;
70     worker->curTaskType_ = ffrt_invalid_task;
71 #ifdef FFRT_SEND_EVENT
72     if (isBetaVersion) {
73         uint64_t execDur = ((FFRTTraceRecord::TimeStamp() - startExecuteTime) / worker->cacheFreq);
74         TaskBlockInfoReport(execDur, isNotUv ? worker->cacheLabel : "uv_task", worker->cacheQos, worker->cacheFreq);
75     }
76 #endif
77 }
78 
RunTaskLifo(TaskBase * task,CPUWorker * worker)79 void CPUWorker::RunTaskLifo(TaskBase* task, CPUWorker* worker)
80 {
81     RunTask(task, worker);
82 
83     unsigned int lifoCount = 0;
84     while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
85         lifoCount++;
86         TaskBase* priorityTask = reinterpret_cast<TaskBase*>(worker->priority_task);
87         // set a placeholder to prevent the task from being placed in the priority again
88         worker->priority_task = (lifoCount > worker->budget) ? &PLACE_HOLDER : nullptr;
89 
90         RunTask(priorityTask, worker);
91     }
92 }
93 
GetTask(CPUWorker * worker)94 void* CPUWorker::GetTask(CPUWorker* worker)
95 {
96 #ifdef FFRT_LOCAL_QUEUE_ENABLE
97     // periodically pick up tasks from the global queue to prevent global queue starvation
98     if (worker->tick % worker->global_interval == 0) {
99         worker->tick = 0;
100         TaskBase* task = static_cast<TaskBase*>(worker->ops.PickUpTaskBatch(worker));
101         // the worker is not notified when the task attribute is set not to notify worker
102         if (NeedNotifyWorker(task)) {
103             worker->ops.NotifyTaskPicked(worker);
104         }
105         return task;
106     }
107 
108     // preferentially pick up tasks from the priority unless the priority is empty or occupied
109     if (worker->priority_task != nullptr) {
110         void* task = worker->priority_task;
111         worker->priority_task = nullptr;
112         if (task != &PLACE_HOLDER) {
113             return task;
114         }
115     }
116 
117     return worker->localFifo.PopHead();
118 #else
119     TaskBase* task = worker->ops.PickUpTaskBatch(worker);
120     if (task != nullptr) {
121         worker->ops.NotifyTaskPicked(worker);
122     }
123 
124     return task;
125 #endif
126 }
127 
TryPoll(CPUWorker * worker,int timeout)128 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
129 {
130     PollerRet ret = worker->ops.TryPoll(worker, timeout);
131     if (ret == PollerRet::RET_TIMER) {
132         worker->tick = 0;
133     }
134 
135     return ret;
136 }
137 
LocalEmpty(CPUWorker * worker)138 bool CPUWorker::LocalEmpty(CPUWorker* worker)
139 {
140     return ((worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0));
141 }
142 
Dispatch(CPUWorker * worker)143 void CPUWorker::Dispatch(CPUWorker* worker)
144 {
145 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
146     if (worker->ops.IsBlockAwareInit()) {
147         int ret = BlockawareRegister(worker->GetDomainId());
148         if (ret != 0) {
149             FFRT_LOGE("blockaware register fail, ret[%d]", ret);
150         }
151     }
152 #endif
153     auto ctx = ExecuteCtx::Cur();
154     ctx->localFifo = &(worker->localFifo);
155     ctx->priority_task_ptr = &(worker->priority_task);
156     ctx->qos = worker->GetQos();
157 
158     worker->ops.WorkerPrepare(worker);
159 #ifndef OHOS_STANDARD_SYSTEM
160     FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
161 #endif
162     FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->GetQos()));
163     WorkerLooperDefault(worker);
164     CoWorkerExit();
165     worker->ops.WorkerRetired(worker);
166 }
167 
168 // work looper which inherited from history
WorkerLooperDefault(CPUWorker * worker)169 void CPUWorker::WorkerLooperDefault(CPUWorker* worker)
170 {
171     const sched_mode_type& schedMode = CPUManagerStrategy::GetSchedMode(worker->GetQos());
172     for (;;) {
173         // get task in the order of priority -> local queue -> global queue
174         TaskBase* local_task = reinterpret_cast<TaskBase*>(GetTask(worker));
175         worker->tick++;
176         if (local_task) {
177             if (worker->tick % TRY_POLL_FREQ == 0) {
178                 worker->ops.TryPoll(worker, 0);
179             }
180             goto run_task;
181         }
182 
183         if (schedMode == sched_mode_type::sched_default_mode) {
184             goto poll_once;
185         } else {
186             // direct to pollwait when no task available
187             goto poll_wait;
188         }
189 
190 run_task:
191         RunTaskLifo(local_task, worker);
192         continue;
193 
194 poll_once:
195         if (TryPoll(worker, 0) != PollerRet::RET_NULL) {
196             continue;
197         }
198 
199 #ifdef FFRT_LOCAL_QUEUE_ENABLE
200         // pick up tasks from global queue
201         local_task = static_cast<TaskBase*>(worker->ops.PickUpTaskBatch(worker));
202         // the worker is not notified when the task attribute is set not to notify worker
203         if (local_task != nullptr) {
204             if (NeedNotifyWorker(local_task)) {
205                 worker->ops.NotifyTaskPicked(worker);
206             }
207             RunTask(local_task, worker);
208             continue;
209         }
210 
211         // check the epoll status again to prevent fd or timer events from being missed
212         if (TryPoll(worker, 0) != PollerRet::RET_NULL) {
213             continue;
214         }
215 
216         if (worker->localFifo.GetLength() == 0) {
217             worker->ops.StealTaskBatch(worker);
218         }
219 
220         if (!LocalEmpty(worker)) {
221             worker->tick = 1;
222             continue;
223         }
224 #endif
225 
226 poll_wait:
227         // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
228         // only one worker enters this state at a QoS level
229         if (TryPoll(worker, -1) != PollerRet::RET_NULL) {
230             continue;
231         }
232 
233         FFRT_PERF_WORKER_IDLE(static_cast<int>(worker->qos));
234         auto action = worker->ops.WaitForNewAction(worker);
235         if (action == WorkerAction::RETRY) {
236             FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->qos));
237             worker->tick = 0;
238             continue;
239         } else if (action == WorkerAction::RETIRE) {
240             break;
241         }
242     }
243 }
244 } // namespace ffrt