• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "eu/cpu_worker.h"
#include <algorithm>
#include <utility>
#include <sched.h>
#include <sys/syscall.h>
#include "eu/func_manager.h"
#include "dm/dependence_manager.h"
#include "dfx/perf/ffrt_perf.h"
#include "tm/queue_task.h"
#include "eu/execute_unit.h"
#include "dfx/sysevent/sysevent.h"
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
#endif
#include "util/ffrt_facade.h"
#ifdef OHOS_THREAD_STACK_DUMP
#include "dfx_dump_catcher.h"
#endif
namespace {
// File-local compile-time constants.
constexpr unsigned int TRY_POLL_FREQ = 51;
constexpr unsigned int LOCAL_QUEUE_SIZE = 128;
// Always half of LOCAL_QUEUE_SIZE.
constexpr unsigned int STEAL_BUFFER_SIZE = LOCAL_QUEUE_SIZE / 2;
}
38 
39 namespace ffrt {
CPUWorker(const QoS & qos,CpuWorkerOps && ops,size_t stackSize)40 CPUWorker::CPUWorker(const QoS& qos, CpuWorkerOps&& ops, size_t stackSize) : qos(qos), ops(ops)
41 {
42 #ifdef FFRT_PTHREAD_ENABLE
43     pthread_attr_init(&attr_);
44     if (stackSize > 0) {
45         pthread_attr_setstacksize(&attr_, stackSize);
46     }
47 #endif
48 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
49     domain_id = (qos() <= BLOCKAWARE_DOMAIN_ID_MAX) ? qos() : BLOCKAWARE_DOMAIN_ID_MAX + 1;
50 #endif
51 #ifdef FFRT_SEND_EVENT
52     uint64_t freq = 1000000;
53 #if defined(__aarch64__)
54     asm volatile("mrs %0, cntfrq_el0" : "=r"(freq));
55 #endif
56     this->cacheFreq = freq;
57     this->cacheQos = static_cast<int>(qos);
58 #endif
59 #ifdef FFRT_PTHREAD_ENABLE
60     Start(CPUWorker::WrapDispatch, this);
61 #else
62     Start(CPUWorker::Dispatch, this);
63 #endif
64 }
65 
~CPUWorker()66 CPUWorker::~CPUWorker()
67 {
68     if (!exited) {
69 #ifdef OHOS_THREAD_STACK_DUMP
70         FFRT_LOGW("CPUWorker enter destruction but not exited");
71         OHOS::HiviewDFX::DfxDumpCatcher dumplog;
72         std::string msg = "";
73         bool result = dumplog.DumpCatch(getpid(), gettid(), msg);
74         if (result) {
75             std::vector<std::string> out;
76             std::stringstream ss(msg);
77             std::string s;
78             while (std::getline(ss, s, '\n')) {
79                 out.push_back(s);
80             }
81             for (auto const& line: out) {
82                 FFRT_LOGE("ffrt callstack %s", line.c_str());
83             }
84         }
85 #endif
86     }
87     Detach();
88 }
89 
NativeConfig()90 void CPUWorker::NativeConfig()
91 {
92     pid_t pid = syscall(SYS_gettid);
93     this->tid = pid;
94     SetThreadAttr(qos);
95 }
96 
WrapDispatch(void * worker)97 void* CPUWorker::WrapDispatch(void* worker)
98 {
99     reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
100     Dispatch(reinterpret_cast<CPUWorker*>(worker));
101     return nullptr;
102 }
103 
RunTask(TaskBase * task,CPUWorker * worker)104 void CPUWorker::RunTask(TaskBase* task, CPUWorker* worker)
105 {
106     bool isNotUv = task->type == ffrt_normal_task || task->type == ffrt_queue_task;
107 #ifdef FFRT_SEND_EVENT
108     static bool isBetaVersion = IsBeta();
109     uint64_t startExecuteTime = 0;
110     if (isBetaVersion) {
111         startExecuteTime = TimeStamp();
112         if (likely(isNotUv)) {
113             worker->cacheLabel = task->GetLabel();
114         }
115     }
116 #endif
117     worker->curTask.store(task, std::memory_order_relaxed);
118     worker->curTaskType_.store(task->type, std::memory_order_relaxed);
119 #ifdef WORKER_CACHE_TASKNAMEID
120     if (isNotUv) {
121         worker->curTaskLabel_ = task->GetLabel();
122         worker->curTaskGid_ = task->gid;
123     }
124 #endif
125 
126     ExecuteTask(task);
127 
128     worker->curTask.store(nullptr, std::memory_order_relaxed);
129     worker->curTaskType_.store(ffrt_invalid_task, std::memory_order_relaxed);
130 #ifdef FFRT_SEND_EVENT
131     if (isBetaVersion) {
132         uint64_t execDur = ((TimeStamp() - startExecuteTime) / worker->cacheFreq);
133         TaskBlockInfoReport(execDur, isNotUv ? worker->cacheLabel : "uv_task", worker->cacheQos, worker->cacheFreq);
134     }
135 #endif
136 }
137 
Dispatch(CPUWorker * worker)138 void CPUWorker::Dispatch(CPUWorker* worker)
139 {
140     QoS qos = worker->GetQos();
141     FFRTFacade::GetEUInstance().WorkerStart(qos());
142     worker->WorkerSetup();
143 
144 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
145     if (worker->ops.IsBlockAwareInit()) {
146         int ret = BlockawareRegister(worker->GetDomainId());
147         if (ret != 0) {
148             FFRT_SYSEVENT_LOGE("blockaware register fail, ret[%d]", ret);
149         }
150     }
151 #endif
152     auto ctx = ExecuteCtx::Cur();
153     ctx->qos = qos;
154     *(FFRTFacade::GetSchedInstance()->GetScheduler(qos).GetWorkerTick()) = &(worker->tick);
155 
156     worker->ops.WorkerPrepare(worker);
157 #ifndef OHOS_STANDARD_SYSTEM
158     FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(qos));
159 #endif
160     FFRT_PERF_WORKER_AWAKE(static_cast<int>(qos));
161     WorkerLooper(worker);
162     CoWorkerExit();
163     FFRTFacade::GetEUInstance().WorkerExit(qos());
164     worker->ops.WorkerRetired(worker);
165 }
166 
RunSingleTask(int qos,CPUWorker * worker)167 bool CPUWorker::RunSingleTask(int qos, CPUWorker *worker)
168 {
169     TaskBase *task = FFRTFacade::GetSchedInstance()->PopTask(qos);
170     if (task) {
171         RunTask(task, worker);
172         return true;
173     }
174     return false;
175 }
176 
177 // work looper which inherited from history
WorkerLooper(CPUWorker * worker)178 void CPUWorker::WorkerLooper(CPUWorker* worker)
179 {
180     for (;;) {
181         if (worker->Exited()) {
182             break;
183         }
184 
185         TaskBase* task = FFRTFacade::GetSchedInstance()->PopTask(worker->GetQos());
186         worker->tick++;
187         if (task) {
188             if (FFRTFacade::GetSchedInstance()->GetTaskSchedMode(worker->GetQos()) ==
189                 TaskSchedMode::DEFAULT_TASK_SCHED_MODE) {
190                 FFRTFacade::GetEUInstance().NotifyTask<TaskNotifyType::TASK_PICKED>(worker->GetQos());
191             }
192             RunTask(task, worker);
193             continue;
194         }
195         // It is about to enter the idle state.
196         if (FFRTFacade::GetEUInstance().GetSchedMode(worker->GetQos()) == sched_mode_type::sched_energy_saving_mode) {
197             if (FFRTFacade::GetEUInstance().WorkerShare(worker, RunSingleTask)) {
198                 continue;
199             }
200         }
201         FFRT_PERF_WORKER_IDLE(static_cast<int>(worker->qos));
202         auto action = worker->ops.WorkerIdleAction(worker);
203         if (action == WorkerAction::RETRY) {
204             FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->qos));
205             worker->tick = 0;
206             continue;
207         } else if (action == WorkerAction::RETIRE) {
208             break;
209         }
210     }
211 }
212 } // namespace ffrt
213