• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "eu/execute_unit.h"
17 #include "eu/sexecute_unit.h"
18 
19 #include <climits>
20 #include <cstring>
21 #include <sys/stat.h>
22 #include "dfx/trace_record/ffrt_trace_record.h"
23 #include "eu/co_routine_factory.h"
24 #include "eu/qos_interface.h"
25 #include "sched/scheduler.h"
26 #include "sched/workgroup_internal.h"
27 #include "util/ffrt_facade.h"
28 #include "util/slab.h"
29 #include "dfx/perf/ffrt_perf.h"
30 #include "dfx/sysevent/sysevent.h"
31 #include "internal_inc/config.h"
32 
33 namespace {
34 /* SUPPORT_WORKER_DESTRUCT indicates that the idle thread destruction function is supported.
35  * The stack canary is saved or restored during coroutine switch-out and switch-in,
36  * currently, only the stack canary used by the ohos compiler stack protection is global
37  * and is not affected by worker destruction.
38  */
39 #if !defined(SUPPORT_WORKER_DESTRUCT)
40 constexpr int waiting_seconds = 10;
41 #else
42 constexpr int waiting_seconds = 5;
43 #endif
44 const size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
45 const size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
46 const size_t MAX_ESCAPE_WORKER_NUM = 1024;
47 const int SEXECUTE_DESTRY_SLEEP_TIME = 1000;
48 
49 const std::map<std::string,
50     void(*)(ffrt::SExecuteUnit*, const ffrt::QoS&, ffrt::TaskNotifyType)> NOTIFY_FUNCTION_FACTORY = {
51     { "CameraDaemon", ffrt::SExecuteUnit::HandleTaskNotifyConservative },
52     { "bluetooth", ffrt::SExecuteUnit::HandleTaskNotifyUltraConservative },
53 };
54 }
55 
56 namespace ffrt {
// Maximum number of notify-and-poll rounds the destructor spends per QoS level
// while waiting for that level's worker threads to exit.
constexpr int MANAGER_DESTRUCT_TIMESOUT = 1000;
// Interval constant for delayed woken-up tasks; not referenced in this part of the file.
constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5'000'000;
59 
SExecuteUnit()60 SExecuteUnit::SExecuteUnit() : ExecuteUnit(), handleTaskNotify(SExecuteUnit::HandleTaskNotifyDefault)
61 {
62 #ifdef OHOS_STANDARD_SYSTEM
63     for (const auto& notifyFunc : NOTIFY_FUNCTION_FACTORY) {
64         if (strstr(GetCurrentProcessName(), notifyFunc.first.c_str())) {
65             handleTaskNotify = notifyFunc.second;
66             break;
67         }
68     }
69 #endif
70 
71 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
72     int ret = BlockawareInit(&keyPtr);
73     if (ret != 0) {
74         FFRT_SYSEVENT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
75     } else {
76         blockAwareInit = true;
77     }
78 #endif
79     FFRT_LOGD("Construction completed.");
80 }
81 
~SExecuteUnit()82 SExecuteUnit::~SExecuteUnit()
83 {
84     tearDown = true;
85     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
86         workerGroup[qos].SetTearDown();
87     }
88     // Before destroying this object, we need to make sure that all threads that
89     // might access this object or its members have exited.
90     // If the destruction of this object happens before or in parallel of
91     // these threads access to freed memory can occur.
92     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
93         int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
94         while (try_cnt-- > 0) {
95             {
96                 std::lock_guard lk(workerGroup[qos].mutex);
97                 workerGroup[qos].cv.notify_all();
98             }
99             {
100                 usleep(SEXECUTE_DESTRY_SLEEP_TIME);
101                 std::shared_lock<std::shared_mutex> lck(workerGroup[qos].tgMutex);
102                 if (workerGroup[qos].threads.empty()) {
103                     break;
104                 }
105             }
106         }
107 
108         if (try_cnt <= 0) {
109             FFRT_SYSEVENT_LOGE("erase qos[%d] threads failed", qos);
110         }
111     }
112     // Note that delayedWorker might
113     // call ffrt::SExecuteUnit::WakeupWorkers
114     // We need to ensure the object is still
115     // alive when that happens. Hence, we
116     // delay the destruction till we ensure
117     // this access cannot happen.
118     FFRTFacade::GetDWInstance().Terminate();
119     FFRT_LOGD("Destruction completed.");
120 }
121 
WorkerIdleAction(CPUWorker * thread)122 WorkerAction SExecuteUnit::WorkerIdleAction(CPUWorker* thread)
123 {
124     if (tearDown) {
125         return WorkerAction::RETIRE;
126     }
127     auto& group = workerGroup[thread->GetQos()];
128     std::unique_lock lk(group.mutex);
129     IntoSleep(thread->GetQos());
130     thread->SetWorkerState(WorkerStatus::SLEEPING);
131 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
132     BlockawareEnterSleeping();
133 #endif
134     if (group.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
135         bool taskExistence = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(thread->GetQos());
136         return tearDown || taskExistence;
137         })) {
138         workerGroup[thread->GetQos()].OutOfSleep();
139         thread->SetWorkerState(WorkerStatus::EXECUTING);
140 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
141         BlockawareLeaveSleeping();
142 #endif
143         return WorkerAction::RETRY;
144     } else {
145 #if !defined(SUPPORT_WORKER_DESTRUCT)
146         workerGroup[thread->GetQos()].IntoDeepSleep();
147         CoStackFree();
148         if (IsExceedDeepSleepThreshold()) {
149             ffrt::CoRoutineReleaseMem();
150         }
151         group.cv.wait(lk, [this, thread] {
152             return tearDown ||
153                 FFRTFacade::GetSchedInstance()->GetTotalTaskCnt(thread->GetQos()) > 0;
154         });
155         workerGroup[thread->GetQos()].OutOfDeepSleep();
156         thread->SetWorkerState(WorkerStatus::EXECUTING);
157         return WorkerAction::RETRY;
158 #else
159         workerGroup[thread->GetQos()].WorkerDestroy();
160         return WorkerAction::RETIRE;
161 #endif
162     }
163 }
164 
WakeupWorkers(const QoS & qos)165 void SExecuteUnit::WakeupWorkers(const QoS& qos)
166 {
167     if (tearDown) {
168         FFRT_SYSEVENT_LOGE("CPU Worker Manager exit");
169         return;
170     }
171     workerGroup[qos].cv.notify_one();
172 }
173 
174 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(SExecuteUnit * manager,const QoS & qos,TaskNotifyType notifyType)175 void SExecuteUnit::HandleTaskNotifyDefault(SExecuteUnit* manager, const QoS& qos, TaskNotifyType notifyType)
176 {
177     size_t taskCount = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos);
178     switch (notifyType) {
179         case TaskNotifyType::TASK_ADDED:
180         case TaskNotifyType::TASK_PICKED:
181         case TaskNotifyType::TASK_ESCAPED:
182             if (taskCount > 0) {
183                 manager->PokeImpl(qos, taskCount, notifyType);
184             }
185             break;
186         case TaskNotifyType::TASK_LOCAL:
187                 manager->PokeImpl(qos, taskCount, notifyType);
188             break;
189         default:
190             break;
191     }
192 }
193 
194 // conservative strategy for poking workers
HandleTaskNotifyConservative(SExecuteUnit * manager,const QoS & qos,TaskNotifyType notifyType)195 void SExecuteUnit::HandleTaskNotifyConservative(SExecuteUnit* manager, const QoS& qos, TaskNotifyType notifyType)
196 {
197     int taskCount = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos);
198     if (taskCount == 0) {
199         // no available task in global queue, skip
200         return;
201     }
202     constexpr double thresholdTaskPick = 1.0;
203     CPUWorkerGroup& workerCtrl = manager->workerGroup[qos];
204     workerCtrl.lock.lock();
205 
206     if (notifyType == TaskNotifyType::TASK_PICKED) {
207         int wakedWorkerCount = workerCtrl.executingNum;
208         double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
209             static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
210         if (remainingLoadRatio <= thresholdTaskPick) {
211             // for task pick, wake worker when load ratio > 1
212             workerCtrl.lock.unlock();
213             return;
214         }
215     }
216 
217     if (static_cast<uint32_t>(workerCtrl.executingNum) < workerCtrl.maxConcurrency) {
218         if (workerCtrl.sleepingNum == 0) {
219             FFRT_LOGI("begin to create worker, notifyType[%d]"
220                 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
221                 notifyType, workerCtrl.executingNum, workerCtrl.maxConcurrency,
222                 workerCtrl.sleepingNum, workerCtrl.deepSleepingWorkerNum);
223             workerCtrl.WorkerCreate();
224             workerCtrl.lock.unlock();
225             if (!manager->IncWorker(qos)) {
226                 workerCtrl.RollBackCreate();
227             }
228         } else {
229             workerCtrl.lock.unlock();
230             manager->WakeupWorkers(qos);
231         }
232     } else {
233         workerCtrl.lock.unlock();
234     }
235 }
236 
HandleTaskNotifyUltraConservative(SExecuteUnit * manager,const QoS & qos,TaskNotifyType notifyType)237 void SExecuteUnit::HandleTaskNotifyUltraConservative(SExecuteUnit* manager, const QoS& qos, TaskNotifyType notifyType)
238 {
239     (void)notifyType;
240     int taskCount = FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos);
241     if (taskCount == 0) {
242         // no available task in global queue, skip
243         return;
244     }
245 
246     CPUWorkerGroup& workerCtrl = manager->workerGroup[qos];
247     std::lock_guard lock(workerCtrl.lock);
248 
249     int runningNum = static_cast<int>(manager->GetRunningNum(qos));
250 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
251     if (manager->blockAwareInit && !manager->stopMonitor && taskCount == runningNum) {
252         return;
253     }
254 #endif
255 
256     if (taskCount < runningNum) {
257         return;
258     }
259 
260     if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
261         if (workerCtrl.sleepingNum == 0) {
262             workerCtrl.WorkerCreate();
263             if (!manager->IncWorker(qos)) {
264                 workerCtrl.RollBackCreate();
265             }
266         } else {
267             manager->WakeupWorkers(qos);
268         }
269     }
270 }
271 
PokeImpl(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)272 void SExecuteUnit::PokeImpl(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
273 {
274     CPUWorkerGroup& workerCtrl = workerGroup[qos];
275     workerCtrl.lock.lock();
276     size_t runningNum = GetRunningNum(qos);
277     size_t totalNum = static_cast<size_t>(workerCtrl.sleepingNum + workerCtrl.executingNum);
278 
279     bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
280         (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
281     if (notifyType != TaskNotifyType::TASK_ADDED && notifyType != TaskNotifyType::TASK_ESCAPED && tiggerSuppression) {
282         workerCtrl.lock.unlock();
283         return;
284     }
285 
286     if ((static_cast<uint32_t>(workerCtrl.sleepingNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
287         workerCtrl.lock.unlock();
288         WakeupWorkers(qos);
289     } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
290         workerCtrl.WorkerCreate();
291         FFRTTraceRecord::WorkRecord(qos(), workerCtrl.executingNum);
292         workerCtrl.lock.unlock();
293         if (!IncWorker(qos)) {
294             workerCtrl.RollBackCreate();
295         }
296     } else if ((runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
297         SubmitEscape(qos, totalNum);
298         workerCtrl.lock.unlock();
299     } else {
300         workerCtrl.lock.unlock();
301     }
302 }
303 
ExecuteEscape(int qos)304 void SExecuteUnit::ExecuteEscape(int qos)
305 {
306     if (FFRTFacade::GetSchedInstance()->GetGlobalTaskCnt(qos) > 0) {
307         CPUWorkerGroup& workerCtrl = workerGroup[qos];
308         workerCtrl.lock.lock();
309 
310         size_t runningNum = GetRunningNum(qos);
311         size_t totalNum = static_cast<size_t>(workerCtrl.sleepingNum + workerCtrl.executingNum);
312         if ((workerCtrl.sleepingNum > 0) && (runningNum < workerCtrl.maxConcurrency)) {
313             workerCtrl.lock.unlock();
314             WakeupWorkers(qos);
315         } else if ((runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
316             if (IsEscapeEnable()) {
317                 workerCtrl.WorkerCreate();
318                 FFRTTraceRecord::WorkRecord(qos, workerCtrl.executingNum);
319                 workerCtrl.lock.unlock();
320                 if (!IncWorker(qos)) {
321                     workerCtrl.RollBackCreate();
322                 }
323             } else {
324                 workerCtrl.lock.unlock();
325             }
326             ReportEscapeEvent(qos, totalNum);
327         } else {
328             workerCtrl.lock.unlock();
329         }
330     }
331 }
332 } // namespace ffrt
333