1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <utility>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "internal_inc/config.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"
30
namespace {
// Poke() only considers suppressing a non-TASK_ADDED notification when the QoS level has more than
// this many workers in total (sleeping + executing)...
constexpr size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
// ...and more than this many workers counted as running.
constexpr size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
// Delay in milliseconds; not referenced in the visible portion of this file.
constexpr int JITTER_DELAY_MS = 5;
#endif
}
38
39 namespace ffrt {
CPUMonitor(CpuMonitorOps && ops)40 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
41 {
42 SetupMonitor();
43 }
44
~CPUMonitor()45 CPUMonitor::~CPUMonitor()
46 {
47 if (monitorThread != nullptr) {
48 monitorThread->join();
49 }
50 delete monitorThread;
51 monitorThread = nullptr;
52 }
53
SetupMonitor()54 void CPUMonitor::SetupMonitor()
55 {
56 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
57 ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
58 ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
59 setWorkerMaxNum[qos] = false;
60 }
61 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
62 memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
63 memset_s(&domainInfoNotify, sizeof(domainInfoNotify), 0, sizeof(domainInfoNotify));
64 wakeupCond.check_ahead = false;
65 wakeupCond.global.low = 0;
66 wakeupCond.global.high = 0;
67 for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
68 wakeupCond.local[i].low = 0;
69 if (i < qosMonitorMaxNum) {
70 wakeupCond.local[i].high = ctrlQueue[i].maxConcurrency;
71 wakeupCond.global.low += wakeupCond.local[i].low;
72 wakeupCond.global.high += wakeupCond.local[i].high;
73 } else {
74 wakeupCond.local[i].high = 0;
75 }
76 }
77 for (int i = 0; i < QoS::MaxNum(); i++) {
78 exceedUpperWaterLine[i] = false;
79 }
80 #endif
81 }
82
StartMonitor()83 void CPUMonitor::StartMonitor()
84 {
85 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
86 int ret = BlockawareInit(&keyPtr);
87 if (ret != 0) {
88 FFRT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
89 } else {
90 blockAwareInit = true;
91 }
92 #else
93 monitorThread = nullptr;
94 #endif
95 }
96
SetWorkerMaxNum(const QoS & qos,int num)97 int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
98 {
99 WorkerCtrl& workerCtrl = ctrlQueue[qos()];
100 workerCtrl.lock.lock();
101 if (setWorkerMaxNum[qos()]) {
102 FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
103 workerCtrl.lock.unlock();
104 return -1;
105 }
106 if (num <= 0 || num > QOS_WORKER_MAXNUM) {
107 FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
108 workerCtrl.lock.unlock();
109 return -1;
110 }
111 workerCtrl.hardLimit = num;
112 setWorkerMaxNum[qos()] = true;
113 workerCtrl.lock.unlock();
114 return 0;
115 }
116
GetMonitorTid() const117 uint32_t CPUMonitor::GetMonitorTid() const
118 {
119 return monitorTid;
120 }
121
122 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
WakeupCond(void)123 BlockawareWakeupCond* CPUMonitor::WakeupCond(void)
124 {
125 return &wakeupCond;
126 }
127
MonitorMain()128 void CPUMonitor::MonitorMain()
129 {
130 (void)WorkerInit();
131 int ret = BlockawareLoadSnapshot(keyPtr, &domainInfoMonitor);
132 if (ret != 0) {
133 FFRT_LOGE("blockaware load snapshot fail, ret[%d]", ret);
134 return;
135 }
136 for (int i = 0; i < qosMonitorMaxNum; i++) {
137 size_t taskCount = static_cast<size_t>(ops.GetTaskCount(i));
138 if (taskCount > 0 && domainInfoMonitor.localinfo[i].nrRunning <= wakeupCond.local[i].low) {
139 Poke(i, taskCount, TaskNotifyType::TASK_ADDED);
140 }
141 if (domainInfoMonitor.localinfo[i].nrRunning > wakeupCond.local[i].high) {
142 exceedUpperWaterLine[i] = true;
143 }
144 }
145 stopMonitor = true;
146 }
147
IsExceedRunningThreshold(const QoS & qos)148 bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
149 {
150 if (blockAwareInit && exceedUpperWaterLine[qos()]) {
151 exceedUpperWaterLine[qos()] = false;
152 return true;
153 }
154 return false;
155 }
156
IsBlockAwareInit(void)157 bool CPUMonitor::IsBlockAwareInit(void)
158 {
159 return blockAwareInit;
160 }
161 #endif
162
TimeoutCount(const QoS & qos)163 void CPUMonitor::TimeoutCount(const QoS& qos)
164 {
165 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
166 std::lock_guard lk(workerCtrl.lock);
167 workerCtrl.sleepingWorkerNum--;
168 }
169
WakeupSleep(const QoS & qos,bool irqWake)170 void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
171 {
172 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
173 std::lock_guard lk(workerCtrl.lock);
174 if (irqWake) {
175 workerCtrl.irqEnable = false;
176 }
177 workerCtrl.sleepingWorkerNum--;
178 workerCtrl.executionNum++;
179 }
180
TotalCount(const QoS & qos)181 int CPUMonitor::TotalCount(const QoS& qos)
182 {
183 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
184 workerCtrl.lock.lock();
185 int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
186 workerCtrl.lock.unlock();
187 return total;
188 }
189
RollbackDestroy(const QoS & qos,bool irqWake)190 void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
191 {
192 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
193 std::lock_guard lk(workerCtrl.lock);
194 if (irqWake) {
195 workerCtrl.irqEnable = false;
196 }
197 workerCtrl.executionNum++;
198 }
199
TryDestroy(const QoS & qos)200 bool CPUMonitor::TryDestroy(const QoS& qos)
201 {
202 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
203 std::lock_guard lk(workerCtrl.lock);
204 workerCtrl.sleepingWorkerNum--;
205 return workerCtrl.sleepingWorkerNum > 0;
206 }
207
SleepingWorkerNum(const QoS & qos)208 int CPUMonitor::SleepingWorkerNum(const QoS& qos)
209 {
210 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
211 std::unique_lock lk(workerCtrl.lock);
212 return workerCtrl.sleepingWorkerNum;
213 }
214
WakedWorkerNum(const QoS & qos)215 int CPUMonitor::WakedWorkerNum(const QoS& qos)
216 {
217 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
218 std::lock_guard lk(workerCtrl.lock);
219 return workerCtrl.executionNum;
220 }
221
IntoDeepSleep(const QoS & qos)222 void CPUMonitor::IntoDeepSleep(const QoS& qos)
223 {
224 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
225 std::lock_guard lk(workerCtrl.lock);
226 workerCtrl.deepSleepingWorkerNum++;
227 }
228
WakeupDeepSleep(const QoS & qos,bool irqWake)229 void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
230 {
231 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
232 std::lock_guard lk(workerCtrl.lock);
233 if (irqWake) {
234 workerCtrl.irqEnable = false;
235 }
236 workerCtrl.sleepingWorkerNum--;
237 workerCtrl.deepSleepingWorkerNum--;
238 workerCtrl.executionNum++;
239 }
240
IntoPollWait(const QoS & qos)241 void CPUMonitor::IntoPollWait(const QoS& qos)
242 {
243 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
244 std::lock_guard lk(workerCtrl.lock);
245 workerCtrl.pollWaitFlag = true;
246 }
247
OutOfPollWait(const QoS & qos)248 void CPUMonitor::OutOfPollWait(const QoS& qos)
249 {
250 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
251 std::lock_guard lk(workerCtrl.lock);
252 workerCtrl.pollWaitFlag = false;
253 }
254
IsExceedDeepSleepThreshold()255 bool CPUMonitor::IsExceedDeepSleepThreshold()
256 {
257 int totalWorker = 0;
258 int deepSleepingWorkerNum = 0;
259 for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
260 WorkerCtrl& workerCtrl = ctrlQueue[i];
261 std::lock_guard lk(workerCtrl.lock);
262 deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
263 totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
264 }
265 return deepSleepingWorkerNum * 2 > totalWorker;
266 }
267
Poke(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)268 void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
269 {
270 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
271 workerCtrl.lock.lock();
272 size_t runningNum = workerCtrl.executionNum;
273 size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
274 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
275 /* There is no need to update running num when executionNum < maxConcurrency */
276 if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
277 if (blockAwareInit && !BlockawareLoadSnapshot(keyPtr, &domainInfoNotify)) {
278 if (workerCtrl.executionNum >= domainInfoNotify.localinfo[qos()].nrBlocked) {
279 /* nrRunning may not be updated in a timely manner */
280 runningNum = workerCtrl.executionNum - domainInfoNotify.localinfo[qos()].nrBlocked;
281 } else {
282 FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
283 qos(), domainInfoNotify.localinfo[qos()].nrBlocked, workerCtrl.executionNum);
284 }
285 }
286 }
287 #endif
288
289 bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
290 (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
291
292 if (notifyType != TaskNotifyType::TASK_ADDED && tiggerSuppression) {
293 workerCtrl.lock.unlock();
294 return;
295 }
296 if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
297 workerCtrl.lock.unlock();
298 ops.WakeupWorkers(qos);
299 } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
300 workerCtrl.executionNum++;
301 FFRTTraceRecord::WorkRecord((int)qos, workerCtrl.executionNum);
302 workerCtrl.lock.unlock();
303 ops.IncWorker(qos);
304 } else {
305 if (workerCtrl.pollWaitFlag) {
306 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
307 }
308 workerCtrl.lock.unlock();
309 }
310 }
311
NotifyWorkers(const QoS & qos,int number)312 void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
313 {
314 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
315 workerCtrl.lock.lock();
316
317 int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
318 (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
319 int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
320 for (int idx = 0; idx < wakeupNumber; idx++) {
321 ops.WakeupWorkers(qos);
322 }
323
324 int incNumber = std::min(number - wakeupNumber, increasableNumber);
325 for (int idx = 0; idx < incNumber; idx++) {
326 workerCtrl.executionNum++;
327 ops.IncWorker(qos);
328 }
329
330 workerCtrl.lock.unlock();
331 FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
332 }
333
334 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(const QoS & qos,void * p,TaskNotifyType notifyType)335 void CPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType)
336 {
337 CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
338 size_t taskCount = static_cast<size_t>(monitor->GetOps().GetTaskCount(qos));
339 switch (notifyType) {
340 case TaskNotifyType::TASK_ADDED:
341 case TaskNotifyType::TASK_PICKED:
342 if (taskCount > 0) {
343 monitor->Poke(qos, taskCount, notifyType);
344 }
345 break;
346 case TaskNotifyType::TASK_LOCAL:
347 monitor->Poke(qos, taskCount, notifyType);
348 break;
349 default:
350 break;
351 }
352 }
353
354 // conservative strategy for poking workers
HandleTaskNotifyConservative(const QoS & qos,void * p,TaskNotifyType notifyType)355 void CPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
356 {
357 CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
358 int taskCount = monitor->ops.GetTaskCount(qos);
359 if (taskCount == 0) {
360 // no available task in global queue, skip
361 return;
362 }
363 constexpr double thresholdTaskPick = 1.0;
364 WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
365 workerCtrl.lock.lock();
366
367 if (notifyType == TaskNotifyType::TASK_PICKED) {
368 int wakedWorkerCount = workerCtrl.executionNum;
369 double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
370 static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
371 if (remainingLoadRatio <= thresholdTaskPick) {
372 // for task pick, wake worker when load ratio > 1
373 workerCtrl.lock.unlock();
374 return;
375 }
376 }
377
378 if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
379 if (workerCtrl.sleepingWorkerNum == 0) {
380 FFRT_LOGI("begin to create worker, notifyType[%d]"
381 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
382 notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
383 workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
384 workerCtrl.executionNum++;
385 workerCtrl.lock.unlock();
386 monitor->ops.IncWorker(qos);
387 } else {
388 workerCtrl.lock.unlock();
389 monitor->ops.WakeupWorkers(qos);
390 }
391 } else {
392 workerCtrl.lock.unlock();
393 }
394 }
395
HandleTaskNotifyUltraConservative(const QoS & qos,void * p,TaskNotifyType notifyType)396 void CPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
397 {
398 (void)notifyType;
399 CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
400 int taskCount = monitor->ops.GetTaskCount(qos);
401 if (taskCount == 0) {
402 // no available task in global queue, skip
403 return;
404 }
405
406 WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
407 std::lock_guard lock(workerCtrl.lock);
408
409 int runningNum = workerCtrl.executionNum;
410 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
411 if (monitor->blockAwareInit && !BlockawareLoadSnapshot(monitor->keyPtr, &monitor->domainInfoNotify)) {
412 /* nrRunning may not be updated in a timely manner */
413 runningNum = workerCtrl.executionNum - monitor->domainInfoNotify.localinfo[qos()].nrBlocked;
414 if (!monitor->stopMonitor && taskCount == runningNum) {
415 BlockawareWake();
416 return;
417 }
418 }
419 #endif
420
421 if (taskCount < runningNum) {
422 return;
423 }
424
425 if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
426 if (workerCtrl.sleepingWorkerNum == 0) {
427 workerCtrl.executionNum++;
428 monitor->ops.IncWorker(qos);
429 } else {
430 monitor->ops.WakeupWorkers(qos);
431 }
432 }
433 }
434 }