1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "eu/cpu_worker.h"
17 #include <algorithm>
18 #include <sched.h>
19 #include <sys/syscall.h>
20 #include "eu/func_manager.h"
21 #include "dm/dependence_manager.h"
22 #include "dfx/perf/ffrt_perf.h"
23 #include "tm/queue_task.h"
24 #include "eu/execute_unit.h"
25 #include "dfx/sysevent/sysevent.h"
26 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
27 #include "eu/blockaware.h"
28 #endif
29 #include "util/ffrt_facade.h"
30 #ifdef OHOS_THREAD_STACK_DUMP
31 #include "dfx_dump_catcher.h"
32 #endif
33 namespace {
34 const unsigned int TRY_POLL_FREQ = 51;
35 const unsigned int LOCAL_QUEUE_SIZE = 128;
36 const unsigned int STEAL_BUFFER_SIZE = LOCAL_QUEUE_SIZE / 2;
37 }
38
39 namespace ffrt {
CPUWorker(const QoS & qos,CpuWorkerOps && ops,size_t stackSize)40 CPUWorker::CPUWorker(const QoS& qos, CpuWorkerOps&& ops, size_t stackSize) : qos(qos), ops(ops)
41 {
42 #ifdef FFRT_PTHREAD_ENABLE
43 pthread_attr_init(&attr_);
44 if (stackSize > 0) {
45 pthread_attr_setstacksize(&attr_, stackSize);
46 }
47 #endif
48 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
49 domain_id = (qos() <= BLOCKAWARE_DOMAIN_ID_MAX) ? qos() : BLOCKAWARE_DOMAIN_ID_MAX + 1;
50 #endif
51 #ifdef FFRT_SEND_EVENT
52 uint64_t freq = 1000000;
53 #if defined(__aarch64__)
54 asm volatile("mrs %0, cntfrq_el0" : "=r"(freq));
55 #endif
56 this->cacheFreq = freq;
57 this->cacheQos = static_cast<int>(qos);
58 #endif
59 #ifdef FFRT_PTHREAD_ENABLE
60 Start(CPUWorker::WrapDispatch, this);
61 #else
62 Start(CPUWorker::Dispatch, this);
63 #endif
64 }
65
~CPUWorker()66 CPUWorker::~CPUWorker()
67 {
68 if (!exited) {
69 #ifdef OHOS_THREAD_STACK_DUMP
70 FFRT_LOGW("CPUWorker enter destruction but not exited");
71 OHOS::HiviewDFX::DfxDumpCatcher dumplog;
72 std::string msg = "";
73 bool result = dumplog.DumpCatch(getpid(), gettid(), msg);
74 if (result) {
75 std::vector<std::string> out;
76 std::stringstream ss(msg);
77 std::string s;
78 while (std::getline(ss, s, '\n')) {
79 out.push_back(s);
80 }
81 for (auto const& line: out) {
82 FFRT_LOGE("ffrt callstack %s", line.c_str());
83 }
84 }
85 #endif
86 }
87 Detach();
88 }
89
NativeConfig()90 void CPUWorker::NativeConfig()
91 {
92 pid_t pid = syscall(SYS_gettid);
93 this->tid = pid;
94 SetThreadAttr(qos);
95 }
96
WrapDispatch(void * worker)97 void* CPUWorker::WrapDispatch(void* worker)
98 {
99 reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
100 Dispatch(reinterpret_cast<CPUWorker*>(worker));
101 return nullptr;
102 }
103
RunTask(TaskBase * task,CPUWorker * worker)104 void CPUWorker::RunTask(TaskBase* task, CPUWorker* worker)
105 {
106 bool isNotUv = task->type == ffrt_normal_task || task->type == ffrt_queue_task;
107 #ifdef FFRT_SEND_EVENT
108 static bool isBetaVersion = IsBeta();
109 uint64_t startExecuteTime = 0;
110 if (isBetaVersion) {
111 startExecuteTime = TimeStamp();
112 if (likely(isNotUv)) {
113 worker->cacheLabel = task->GetLabel();
114 }
115 }
116 #endif
117 worker->curTask.store(task, std::memory_order_relaxed);
118 worker->curTaskType_.store(task->type, std::memory_order_relaxed);
119 #ifdef WORKER_CACHE_TASKNAMEID
120 if (isNotUv) {
121 worker->curTaskLabel_ = task->GetLabel();
122 worker->curTaskGid_ = task->gid;
123 }
124 #endif
125
126 ExecuteTask(task);
127
128 worker->curTask.store(nullptr, std::memory_order_relaxed);
129 worker->curTaskType_.store(ffrt_invalid_task, std::memory_order_relaxed);
130 #ifdef FFRT_SEND_EVENT
131 if (isBetaVersion) {
132 uint64_t execDur = ((TimeStamp() - startExecuteTime) / worker->cacheFreq);
133 TaskBlockInfoReport(execDur, isNotUv ? worker->cacheLabel : "uv_task", worker->cacheQos, worker->cacheFreq);
134 }
135 #endif
136 }
137
Dispatch(CPUWorker * worker)138 void CPUWorker::Dispatch(CPUWorker* worker)
139 {
140 QoS qos = worker->GetQos();
141 FFRTFacade::GetEUInstance().WorkerStart(qos());
142 worker->WorkerSetup();
143
144 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
145 if (worker->ops.IsBlockAwareInit()) {
146 int ret = BlockawareRegister(worker->GetDomainId());
147 if (ret != 0) {
148 FFRT_SYSEVENT_LOGE("blockaware register fail, ret[%d]", ret);
149 }
150 }
151 #endif
152 auto ctx = ExecuteCtx::Cur();
153 ctx->qos = qos;
154 *(FFRTFacade::GetSchedInstance()->GetScheduler(qos).GetWorkerTick()) = &(worker->tick);
155
156 worker->ops.WorkerPrepare(worker);
157 #ifndef OHOS_STANDARD_SYSTEM
158 FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(qos));
159 #endif
160 FFRT_PERF_WORKER_AWAKE(static_cast<int>(qos));
161 WorkerLooper(worker);
162 CoWorkerExit();
163 FFRTFacade::GetEUInstance().WorkerExit(qos());
164 worker->ops.WorkerRetired(worker);
165 }
166
RunSingleTask(int qos,CPUWorker * worker)167 bool CPUWorker::RunSingleTask(int qos, CPUWorker *worker)
168 {
169 TaskBase *task = FFRTFacade::GetSchedInstance()->PopTask(qos);
170 if (task) {
171 RunTask(task, worker);
172 return true;
173 }
174 return false;
175 }
176
177 // work looper which inherited from history
WorkerLooper(CPUWorker * worker)178 void CPUWorker::WorkerLooper(CPUWorker* worker)
179 {
180 for (;;) {
181 if (worker->Exited()) {
182 break;
183 }
184
185 TaskBase* task = FFRTFacade::GetSchedInstance()->PopTask(worker->GetQos());
186 worker->tick++;
187 if (task) {
188 if (FFRTFacade::GetSchedInstance()->GetTaskSchedMode(worker->GetQos()) ==
189 TaskSchedMode::DEFAULT_TASK_SCHED_MODE) {
190 FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_PICKED>(worker->GetQos());
191 }
192 RunTask(task, worker);
193 continue;
194 }
195 // It is about to enter the idle state.
196 if (FFRTFacade::GetEUInstance().GetSchedMode(worker->GetQos()) == sched_mode_type::sched_energy_saving_mode) {
197 if (FFRTFacade::GetEUInstance().WorkerShare(worker, RunSingleTask)) {
198 continue;
199 }
200 }
201 FFRT_PERF_WORKER_IDLE(static_cast<int>(worker->qos));
202 auto action = worker->ops.WorkerIdleAction(worker);
203 if (action == WorkerAction::RETRY) {
204 FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->qos));
205 worker->tick = 0;
206 continue;
207 } else if (action == WorkerAction::RETIRE) {
208 break;
209 }
210 }
211 }
212 } // namespace ffrt
213