1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "eu/execute_unit.h"
17 #include "eu/sexecute_unit.h"
18
19 #include <climits>
20 #include <cstring>
21 #include <sys/stat.h>
22 #include "dfx/trace_record/ffrt_trace_record.h"
23 #include "eu/co_routine_factory.h"
24 #include "eu/qos_interface.h"
25 #include "sched/scheduler.h"
26 #include "sched/workgroup_internal.h"
27 #include "util/ffrt_facade.h"
28 #include "util/slab.h"
29 #include "dfx/perf/ffrt_perf.h"
30 #include "dfx/sysevent/sysevent.h"
31 #include "internal_inc/config.h"
32
33 namespace {
34 /* SUPPORT_WORKER_DESTRUCT indicates that the idle thread destruction function is supported.
35 * The stack canary is saved or restored during coroutine switch-out and switch-in,
36 * currently, only the stack canary used by the ohos compiler stack protection is global
37 * and is not affected by worker destruction.
38 */
39 #if !defined(SUPPORT_WORKER_DESTRUCT)
40 constexpr int waiting_seconds = 10;
41 #else
42 constexpr int waiting_seconds = 5;
43 #endif
44 const size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
45 const size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
46 const size_t MAX_ESCAPE_WORKER_NUM = 1024;
47 const int SEXECUTE_DESTRY_SLEEP_TIME = 1000;
48
49 const std::map<std::string,
50 void(*)(ffrt::SExecuteUnit*, const ffrt::QoS&, ffrt::TaskNotifyType)> NOTIFY_FUNCTION_FACTORY = {
51 { "CameraDaemon", ffrt::SExecuteUnit::HandleTaskNotifyConservative },
52 { "bluetooth", ffrt::SExecuteUnit::HandleTaskNotifyUltraConservative },
53 };
54 }
55
56 namespace ffrt {
// Maximum number of notify-and-poll attempts per QoS group while tearing down.
constexpr int MANAGER_DESTRUCT_TIMESOUT{1000};
// Delayed wake-up interval; presumably microseconds (i.e. 5 s) — TODO confirm unit.
constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL{5'000'000};
59
SExecuteUnit()60 SExecuteUnit::SExecuteUnit() : ExecuteUnit(), handleTaskNotify(SExecuteUnit::HandleTaskNotifyDefault)
61 {
62 #ifdef OHOS_STANDARD_SYSTEM
63 for (const auto& notifyFunc : NOTIFY_FUNCTION_FACTORY) {
64 if (strstr(GetCurrentProcessName(), notifyFunc.first.c_str())) {
65 handleTaskNotify = notifyFunc.second;
66 break;
67 }
68 }
69 #endif
70
71 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
72 int ret = BlockawareInit(&keyPtr);
73 if (ret != 0) {
74 FFRT_SYSEVENT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
75 } else {
76 blockAwareInit = true;
77 }
78 #endif
79 FFRT_LOGD("Construction completed.");
80 }
81
~SExecuteUnit()82 SExecuteUnit::~SExecuteUnit()
83 {
84 tearDown = true;
85 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
86 workerGroup[qos].SetTearDown();
87 }
88 // Before destroying this object, we need to make sure that all threads that
89 // might access this object or its members have exited.
90 // If the destruction of this object happens before or in parallel of
91 // these threads access to freed memory can occur.
92 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
93 int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
94 while (try_cnt-- > 0) {
95 {
96 std::lock_guard lk(workerGroup[qos].mutex);
97 workerGroup[qos].cv.notify_all();
98 }
99 {
100 usleep(SEXECUTE_DESTRY_SLEEP_TIME);
101 std::shared_lock<std::shared_mutex> lck(workerGroup[qos].tgMutex);
102 if (workerGroup[qos].threads.empty()) {
103 break;
104 }
105 }
106 }
107
108 if (try_cnt <= 0) {
109 FFRT_SYSEVENT_LOGE("erase qos[%d] threads failed", qos);
110 }
111 }
112 // Note that delayedWorker might
113 // call ffrt::SExecuteUnit::WakeupWorkers
114 // We need to ensure the object is still
115 // alive when that happens. Hence, we
116 // delay the destruction till we ensure
117 // this access cannot happen.
118 FFRTFacade::GetDWInstance().Terminate();
119 FFRT_LOGD("Destruction completed.");
120 }
121
/*
 * Decides what an idle worker does next (presumably invoked when the worker has no
 * task to run).
 *
 * The worker first waits on its QoS group's condition variable for up to
 * waiting_seconds, until the global queue for its QoS has tasks or teardown starts.
 * If that wait times out:
 *   - without SUPPORT_WORKER_DESTRUCT, it enters deep sleep and waits without a
 *     timeout, until GetTotalTaskCnt for its QoS is > 0 or teardown starts;
 *   - with SUPPORT_WORKER_DESTRUCT, it is accounted as destroyed and retires.
 *
 * @param thread the idle worker; thread->GetQos() selects the CPUWorkerGroup used.
 * @return WorkerAction::RETIRE when teardown has started on entry, or on timeout
 *         with SUPPORT_WORKER_DESTRUCT; otherwise WorkerAction::RETRY.
 */
WorkerAction SExecuteUnit::WorkerIdleAction(CPUWorker* thread)
{
    // Stop immediately once teardown has started.
    if (tearDown) {
        return WorkerAction::RETIRE;
    }
    auto& group = workerGroup[thread->GetQos()];
    // group.mutex is the mutex paired with group.cv below; it stays held except
    // while blocked inside the condition-variable waits.
    std::unique_lock lk(group.mutex);
    IntoSleep(thread->GetQos());
    thread->SetWorkerState(WorkerStatus::SLEEPING);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
    BlockawareEnterSleeping();
#endif
    // Light sleep: wait_for returns true if the predicate became true (task in the
    // global queue or teardown), false if the timeout elapsed with it still false.
    if (group.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
        bool taskExistence = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(thread->GetQos());
        return tearDown || taskExistence;
    })) {
        workerGroup[thread->GetQos()].OutOfSleep();
        thread->SetWorkerState(WorkerStatus::EXECUTING);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
        BlockawareLeaveSleeping();
#endif
        return WorkerAction::RETRY;
    } else {
#if !defined(SUPPORT_WORKER_DESTRUCT)
        // Timed out: account this worker as deep-sleeping. CoStackFree() and, past the
        // deep-sleep threshold, CoRoutineReleaseMem() presumably release coroutine
        // resources — TODO confirm.
        workerGroup[thread->GetQos()].IntoDeepSleep();
        CoStackFree();
        if (IsExceedDeepSleepThreshold()) {
            ffrt::CoRoutineReleaseMem();
        }
        // Untimed wait. Note this predicate uses GetTotalTaskCnt, while the light
        // sleep above uses GetGlobalTaskCnt.
        group.cv.wait(lk, [this, thread] {
            return tearDown ||
                FFRTFacade::GetSchedInstance()->GetTotalTaskCnt(thread->GetQos()) > 0;
        });
        workerGroup[thread->GetQos()].OutOfDeepSleep();
        thread->SetWorkerState(WorkerStatus::EXECUTING);
        // NOTE(review): unlike the light-sleep path, BlockawareLeaveSleeping() is not
        // called here under FFRT_WORKERS_DYNAMIC_SCALING — confirm this is intended.
        return WorkerAction::RETRY;
#else
        // Timed out: this worker is destroyed rather than kept in deep sleep.
        workerGroup[thread->GetQos()].WorkerDestroy();
        return WorkerAction::RETIRE;
#endif
    }
}
164
WakeupWorkers(const QoS & qos)165 void SExecuteUnit::WakeupWorkers(const QoS& qos)
166 {
167 if (tearDown) {
168 FFRT_SYSEVENT_LOGE("CPU Worker Manager exit");
169 return;
170 }
171 workerGroup[qos].cv.notify_one();
172 }
173
174 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(SExecuteUnit * manager,const QoS & qos,TaskNotifyType notifyType)175 void SExecuteUnit::HandleTaskNotifyDefault(SExecuteUnit* manager, const QoS& qos, TaskNotifyType notifyType)
176 {
177 size_t taskCount = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos);
178 switch (notifyType) {
179 case TaskNotifyType::TASK_ADDED:
180 case TaskNotifyType::TASK_PICKED:
181 case TaskNotifyType::TASK_ESCAPED:
182 if (taskCount > 0) {
183 manager->PokeImpl(qos, taskCount, notifyType);
184 }
185 break;
186 case TaskNotifyType::TASK_LOCAL:
187 manager->PokeImpl(qos, taskCount, notifyType);
188 break;
189 default:
190 break;
191 }
192 }
193
194 // conservative strategy for poking workers
HandleTaskNotifyConservative(SExecuteUnit * manager,const QoS & qos,TaskNotifyType notifyType)195 void SExecuteUnit::HandleTaskNotifyConservative(SExecuteUnit* manager, const QoS& qos, TaskNotifyType notifyType)
196 {
197 int taskCount = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos);
198 if (taskCount == 0) {
199 // no available task in global queue, skip
200 return;
201 }
202 constexpr double thresholdTaskPick = 1.0;
203 CPUWorkerGroup& workerCtrl = manager->workerGroup[qos];
204 workerCtrl.lock.lock();
205
206 if (notifyType == TaskNotifyType::TASK_PICKED) {
207 int wakedWorkerCount = workerCtrl.executingNum;
208 double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
209 static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
210 if (remainingLoadRatio <= thresholdTaskPick) {
211 // for task pick, wake worker when load ratio > 1
212 workerCtrl.lock.unlock();
213 return;
214 }
215 }
216
217 if (static_cast<uint32_t>(workerCtrl.executingNum) < workerCtrl.maxConcurrency) {
218 if (workerCtrl.sleepingNum == 0) {
219 FFRT_LOGI("begin to create worker, notifyType[%d]"
220 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
221 notifyType, workerCtrl.executingNum, workerCtrl.maxConcurrency,
222 workerCtrl.sleepingNum, workerCtrl.deepSleepingWorkerNum);
223 workerCtrl.WorkerCreate();
224 workerCtrl.lock.unlock();
225 if (!manager->IncWorker(qos)) {
226 workerCtrl.RollBackCreate();
227 }
228 } else {
229 workerCtrl.lock.unlock();
230 manager->WakeupWorkers(qos);
231 }
232 } else {
233 workerCtrl.lock.unlock();
234 }
235 }
236
HandleTaskNotifyUltraConservative(SExecuteUnit * manager,const QoS & qos,TaskNotifyType notifyType)237 void SExecuteUnit::HandleTaskNotifyUltraConservative(SExecuteUnit* manager, const QoS& qos, TaskNotifyType notifyType)
238 {
239 (void)notifyType;
240 int taskCount = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos);
241 if (taskCount == 0) {
242 // no available task in global queue, skip
243 return;
244 }
245
246 CPUWorkerGroup& workerCtrl = manager->workerGroup[qos];
247 std::lock_guard lock(workerCtrl.lock);
248
249 int runningNum = static_cast<int>(manager->GetRunningNum(qos));
250 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
251 if (manager->blockAwareInit && !manager->stopMonitor && taskCount == runningNum) {
252 return;
253 }
254 #endif
255
256 if (taskCount < runningNum) {
257 return;
258 }
259
260 if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
261 if (workerCtrl.sleepingNum == 0) {
262 workerCtrl.WorkerCreate();
263 if (!manager->IncWorker(qos)) {
264 workerCtrl.RollBackCreate();
265 }
266 } else {
267 manager->WakeupWorkers(qos);
268 }
269 }
270 }
271
PokeImpl(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)272 void SExecuteUnit::PokeImpl(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
273 {
274 CPUWorkerGroup& workerCtrl = workerGroup[qos];
275 workerCtrl.lock.lock();
276 size_t runningNum = GetRunningNum(qos);
277 size_t totalNum = static_cast<size_t>(workerCtrl.sleepingNum + workerCtrl.executingNum);
278
279 bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
280 (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
281 if (notifyType != TaskNotifyType::TASK_ADDED && notifyType != TaskNotifyType::TASK_ESCAPED && tiggerSuppression) {
282 workerCtrl.lock.unlock();
283 return;
284 }
285
286 if ((static_cast<uint32_t>(workerCtrl.sleepingNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
287 workerCtrl.lock.unlock();
288 WakeupWorkers(qos);
289 } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
290 workerCtrl.WorkerCreate();
291 FFRTTraceRecord::WorkRecord(qos(), workerCtrl.executingNum);
292 workerCtrl.lock.unlock();
293 if (!IncWorker(qos)) {
294 workerCtrl.RollBackCreate();
295 }
296 } else if ((runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
297 SubmitEscape(qos, totalNum);
298 workerCtrl.lock.unlock();
299 } else {
300 workerCtrl.lock.unlock();
301 }
302 }
303
ExecuteEscape(int qos)304 void SExecuteUnit::ExecuteEscape(int qos)
305 {
306 if (FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos) > 0) {
307 CPUWorkerGroup& workerCtrl = workerGroup[qos];
308 workerCtrl.lock.lock();
309
310 size_t runningNum = GetRunningNum(qos);
311 size_t totalNum = static_cast<size_t>(workerCtrl.sleepingNum + workerCtrl.executingNum);
312 if ((workerCtrl.sleepingNum > 0) && (runningNum < workerCtrl.maxConcurrency)) {
313 workerCtrl.lock.unlock();
314 WakeupWorkers(qos);
315 } else if ((runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
316 if (IsEscapeEnable()) {
317 workerCtrl.WorkerCreate();
318 FFRTTraceRecord::WorkRecord(qos, workerCtrl.executingNum);
319 workerCtrl.lock.unlock();
320 if (!IncWorker(qos)) {
321 workerCtrl.RollBackCreate();
322 }
323 } else {
324 workerCtrl.lock.unlock();
325 }
326 ReportEscapeEvent(qos, totalNum);
327 } else {
328 workerCtrl.lock.unlock();
329 }
330 }
331 }
332 } // namespace ffrt
333