• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <utility>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "internal_inc/config.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"
30 
namespace {
// Thresholds read by CPUMonitor::Poke() when deciding whether a notification
// other than TASK_ADDED may be suppressed.
// Suppression requires the total worker count to be strictly above this value.
constexpr size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
// Suppression also requires the running worker count to be strictly above this value.
constexpr size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
// Delay in milliseconds (per its name); not referenced in the visible part of this file.
constexpr int JITTER_DELAY_MS = 5;
#endif
}
38 
39 namespace ffrt {
CPUMonitor(CpuMonitorOps && ops)40 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
41 {
42     SetupMonitor();
43 }
44 
~CPUMonitor()45 CPUMonitor::~CPUMonitor()
46 {
47     if (monitorThread != nullptr) {
48         monitorThread->join();
49     }
50     delete monitorThread;
51     monitorThread = nullptr;
52 }
53 
SetupMonitor()54 void CPUMonitor::SetupMonitor()
55 {
56     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
57         ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
58         ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
59         setWorkerMaxNum[qos] = false;
60     }
61 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
62     memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
63     memset_s(&domainInfoNotify, sizeof(domainInfoNotify), 0, sizeof(domainInfoNotify));
64     wakeupCond.check_ahead = false;
65     wakeupCond.global.low = 0;
66     wakeupCond.global.high = 0;
67     for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
68         wakeupCond.local[i].low = 0;
69         if (i < qosMonitorMaxNum) {
70             wakeupCond.local[i].high = ctrlQueue[i].maxConcurrency;
71             wakeupCond.global.low += wakeupCond.local[i].low;
72             wakeupCond.global.high += wakeupCond.local[i].high;
73         } else {
74             wakeupCond.local[i].high = 0;
75         }
76     }
77     for (int i = 0; i < QoS::MaxNum(); i++) {
78         exceedUpperWaterLine[i] = false;
79     }
80 #endif
81 }
82 
StartMonitor()83 void CPUMonitor::StartMonitor()
84 {
85 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
86     int ret = BlockawareInit(&keyPtr);
87     if (ret != 0) {
88         FFRT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
89     } else {
90         blockAwareInit = true;
91     }
92 #else
93     monitorThread = nullptr;
94 #endif
95 }
96 
SetWorkerMaxNum(const QoS & qos,int num)97 int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
98 {
99     WorkerCtrl& workerCtrl = ctrlQueue[qos()];
100     workerCtrl.lock.lock();
101     if (setWorkerMaxNum[qos()]) {
102         FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
103         workerCtrl.lock.unlock();
104         return -1;
105     }
106     if (num <= 0 || num > QOS_WORKER_MAXNUM) {
107         FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
108         workerCtrl.lock.unlock();
109         return -1;
110     }
111     workerCtrl.hardLimit = num;
112     setWorkerMaxNum[qos()] = true;
113     workerCtrl.lock.unlock();
114     return 0;
115 }
116 
GetMonitorTid() const117 uint32_t CPUMonitor::GetMonitorTid() const
118 {
119     return monitorTid;
120 }
121 
122 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
WakeupCond(void)123 BlockawareWakeupCond* CPUMonitor::WakeupCond(void)
124 {
125     return &wakeupCond;
126 }
127 
MonitorMain()128 void CPUMonitor::MonitorMain()
129 {
130     (void)WorkerInit();
131     int ret = BlockawareLoadSnapshot(keyPtr, &domainInfoMonitor);
132     if (ret != 0) {
133         FFRT_LOGE("blockaware load snapshot fail, ret[%d]", ret);
134         return;
135     }
136     for (int i = 0; i < qosMonitorMaxNum; i++) {
137         size_t taskCount = static_cast<size_t>(ops.GetTaskCount(i));
138         if (taskCount > 0 && domainInfoMonitor.localinfo[i].nrRunning <= wakeupCond.local[i].low) {
139             Poke(i, taskCount, TaskNotifyType::TASK_ADDED);
140         }
141         if (domainInfoMonitor.localinfo[i].nrRunning > wakeupCond.local[i].high) {
142             exceedUpperWaterLine[i] = true;
143         }
144     }
145     stopMonitor = true;
146 }
147 
IsExceedRunningThreshold(const QoS & qos)148 bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
149 {
150     if (blockAwareInit && exceedUpperWaterLine[qos()]) {
151         exceedUpperWaterLine[qos()] = false;
152         return true;
153     }
154     return false;
155 }
156 
IsBlockAwareInit(void)157 bool CPUMonitor::IsBlockAwareInit(void)
158 {
159     return blockAwareInit;
160 }
161 #endif
162 
TimeoutCount(const QoS & qos)163 void CPUMonitor::TimeoutCount(const QoS& qos)
164 {
165     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
166     std::lock_guard lk(workerCtrl.lock);
167     workerCtrl.sleepingWorkerNum--;
168 }
169 
WakeupSleep(const QoS & qos,bool irqWake)170 void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
171 {
172     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
173     std::lock_guard lk(workerCtrl.lock);
174     if (irqWake) {
175         workerCtrl.irqEnable = false;
176     }
177     workerCtrl.sleepingWorkerNum--;
178     workerCtrl.executionNum++;
179 }
180 
TotalCount(const QoS & qos)181 int CPUMonitor::TotalCount(const QoS& qos)
182 {
183     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
184     workerCtrl.lock.lock();
185     int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
186     workerCtrl.lock.unlock();
187     return total;
188 }
189 
RollbackDestroy(const QoS & qos,bool irqWake)190 void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
191 {
192     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
193     std::lock_guard lk(workerCtrl.lock);
194     if (irqWake) {
195         workerCtrl.irqEnable = false;
196     }
197     workerCtrl.executionNum++;
198 }
199 
TryDestroy(const QoS & qos)200 bool CPUMonitor::TryDestroy(const QoS& qos)
201 {
202     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
203     std::lock_guard lk(workerCtrl.lock);
204     workerCtrl.sleepingWorkerNum--;
205     return workerCtrl.sleepingWorkerNum > 0;
206 }
207 
SleepingWorkerNum(const QoS & qos)208 int CPUMonitor::SleepingWorkerNum(const QoS& qos)
209 {
210     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
211     std::unique_lock lk(workerCtrl.lock);
212     return workerCtrl.sleepingWorkerNum;
213 }
214 
WakedWorkerNum(const QoS & qos)215 int CPUMonitor::WakedWorkerNum(const QoS& qos)
216 {
217     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
218     std::lock_guard lk(workerCtrl.lock);
219     return workerCtrl.executionNum;
220 }
221 
IntoDeepSleep(const QoS & qos)222 void CPUMonitor::IntoDeepSleep(const QoS& qos)
223 {
224     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
225     std::lock_guard lk(workerCtrl.lock);
226     workerCtrl.deepSleepingWorkerNum++;
227 }
228 
WakeupDeepSleep(const QoS & qos,bool irqWake)229 void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
230 {
231     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
232     std::lock_guard lk(workerCtrl.lock);
233     if (irqWake) {
234         workerCtrl.irqEnable = false;
235     }
236     workerCtrl.sleepingWorkerNum--;
237     workerCtrl.deepSleepingWorkerNum--;
238     workerCtrl.executionNum++;
239 }
240 
IntoPollWait(const QoS & qos)241 void CPUMonitor::IntoPollWait(const QoS& qos)
242 {
243     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
244     std::lock_guard lk(workerCtrl.lock);
245     workerCtrl.pollWaitFlag = true;
246 }
247 
OutOfPollWait(const QoS & qos)248 void CPUMonitor::OutOfPollWait(const QoS& qos)
249 {
250     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
251     std::lock_guard lk(workerCtrl.lock);
252     workerCtrl.pollWaitFlag = false;
253 }
254 
IsExceedDeepSleepThreshold()255 bool CPUMonitor::IsExceedDeepSleepThreshold()
256 {
257     int totalWorker = 0;
258     int deepSleepingWorkerNum = 0;
259     for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
260         WorkerCtrl& workerCtrl = ctrlQueue[i];
261         std::lock_guard lk(workerCtrl.lock);
262         deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
263         totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
264     }
265     return deepSleepingWorkerNum * 2 > totalWorker;
266 }
267 
Poke(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)268 void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
269 {
270     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
271     workerCtrl.lock.lock();
272     size_t runningNum = workerCtrl.executionNum;
273     size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
274 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
275     /* There is no need to update running num when executionNum < maxConcurrency */
276     if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
277         if (blockAwareInit && !BlockawareLoadSnapshot(keyPtr, &domainInfoNotify)) {
278             if (workerCtrl.executionNum >= domainInfoNotify.localinfo[qos()].nrBlocked) {
279                 /* nrRunning may not be updated in a timely manner */
280                 runningNum = workerCtrl.executionNum - domainInfoNotify.localinfo[qos()].nrBlocked;
281             } else {
282                 FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
283                     qos(), domainInfoNotify.localinfo[qos()].nrBlocked, workerCtrl.executionNum);
284             }
285         }
286     }
287 #endif
288 
289     bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
290         (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
291 
292     if (notifyType != TaskNotifyType::TASK_ADDED && tiggerSuppression) {
293         workerCtrl.lock.unlock();
294         return;
295     }
296     if (static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) {
297         workerCtrl.lock.unlock();
298         ops.WakeupWorkers(qos);
299     } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
300         workerCtrl.executionNum++;
301         FFRTTraceRecord::WorkRecord((int)qos, workerCtrl.executionNum);
302         workerCtrl.lock.unlock();
303         ops.IncWorker(qos);
304     } else {
305         if (workerCtrl.pollWaitFlag) {
306             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
307         }
308         workerCtrl.lock.unlock();
309     }
310 }
311 
NotifyWorkers(const QoS & qos,int number)312 void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
313 {
314     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
315     workerCtrl.lock.lock();
316 
317     int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
318         (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
319     int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
320     for (int idx = 0; idx < wakeupNumber; idx++) {
321         ops.WakeupWorkers(qos);
322     }
323 
324     int incNumber = std::min(number - wakeupNumber, increasableNumber);
325     for (int idx = 0; idx < incNumber; idx++) {
326         workerCtrl.executionNum++;
327         ops.IncWorker(qos);
328     }
329 
330     workerCtrl.lock.unlock();
331     FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
332 }
333 
334 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(const QoS & qos,void * p,TaskNotifyType notifyType)335 void CPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType)
336 {
337     CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
338     size_t taskCount = static_cast<size_t>(monitor->GetOps().GetTaskCount(qos));
339     switch (notifyType) {
340         case TaskNotifyType::TASK_ADDED:
341         case TaskNotifyType::TASK_PICKED:
342             if (taskCount > 0) {
343                 monitor->Poke(qos, taskCount, notifyType);
344             }
345             break;
346         case TaskNotifyType::TASK_LOCAL:
347                 monitor->Poke(qos, taskCount, notifyType);
348             break;
349         default:
350             break;
351     }
352 }
353 
354 // conservative strategy for poking workers
HandleTaskNotifyConservative(const QoS & qos,void * p,TaskNotifyType notifyType)355 void CPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
356 {
357     CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
358     int taskCount = monitor->ops.GetTaskCount(qos);
359     if (taskCount == 0) {
360         // no available task in global queue, skip
361         return;
362     }
363     constexpr double thresholdTaskPick = 1.0;
364     WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
365     workerCtrl.lock.lock();
366 
367     if (notifyType == TaskNotifyType::TASK_PICKED) {
368         int wakedWorkerCount = workerCtrl.executionNum;
369         double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
370             static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
371         if (remainingLoadRatio <= thresholdTaskPick) {
372             // for task pick, wake worker when load ratio > 1
373             workerCtrl.lock.unlock();
374             return;
375         }
376     }
377 
378     if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
379         if (workerCtrl.sleepingWorkerNum == 0) {
380             FFRT_LOGI("begin to create worker, notifyType[%d]"
381                 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
382                 notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
383                 workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
384             workerCtrl.executionNum++;
385             workerCtrl.lock.unlock();
386             monitor->ops.IncWorker(qos);
387         } else {
388             workerCtrl.lock.unlock();
389             monitor->ops.WakeupWorkers(qos);
390         }
391     } else {
392         workerCtrl.lock.unlock();
393     }
394 }
395 
HandleTaskNotifyUltraConservative(const QoS & qos,void * p,TaskNotifyType notifyType)396 void CPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
397 {
398     (void)notifyType;
399     CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
400     int taskCount = monitor->ops.GetTaskCount(qos);
401     if (taskCount == 0) {
402         // no available task in global queue, skip
403         return;
404     }
405 
406     WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
407     std::lock_guard lock(workerCtrl.lock);
408 
409     int runningNum = workerCtrl.executionNum;
410 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
411     if (monitor->blockAwareInit && !BlockawareLoadSnapshot(monitor->keyPtr, &monitor->domainInfoNotify)) {
412         /* nrRunning may not be updated in a timely manner */
413         runningNum = workerCtrl.executionNum - monitor->domainInfoNotify.localinfo[qos()].nrBlocked;
414         if (!monitor->stopMonitor && taskCount == runningNum) {
415             BlockawareWake();
416             return;
417         }
418     }
419 #endif
420 
421     if (taskCount < runningNum) {
422         return;
423     }
424 
425     if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
426         if (workerCtrl.sleepingWorkerNum == 0) {
427             workerCtrl.executionNum++;
428             monitor->ops.IncWorker(qos);
429         } else {
430             monitor->ops.WakeupWorkers(qos);
431         }
432     }
433 }
434 }