• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <utility>
#include <climits>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/sysevent/sysevent.h"
#include "internal_inc/config.h"
#include "internal_inc/osal.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"
32 
namespace {
/* Bound that MonitorMain compares the per-domain running + blocked + sleeping worker total against. */
constexpr size_t MAX_ESCAPE_WORKER_NUM = 1024;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
/* Delay value in milliseconds; only defined for dynamic-scaling builds. */
constexpr int JITTER_DELAY_MS = 5;
#endif
}
39 
40 namespace ffrt {
CPUMonitor(CpuMonitorOps && ops,const std::function<void (int,void *)> & executeEscapeFunc)41 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops, const std::function<void(int, void*)>& executeEscapeFunc)
42     : ops(ops), escapeMgr_(executeEscapeFunc)
43 {
44     SetupMonitor();
45 }
46 
~CPUMonitor()47 CPUMonitor::~CPUMonitor()
48 {
49     if (monitorThread != nullptr) {
50         monitorThread->join();
51     }
52     delete monitorThread;
53     monitorThread = nullptr;
54 }
55 
SetupMonitor()56 void CPUMonitor::SetupMonitor()
57 {
58     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
59         ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
60         ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
61         setWorkerMaxNum[qos] = false;
62     }
63 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
64     memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
65     wakeupCond.check_ahead = false;
66     wakeupCond.global.low = 0;
67     wakeupCond.global.high = 0;
68     for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
69         wakeupCond.local[i].low = 0;
70         if (i < qosMonitorMaxNum) {
71             wakeupCond.local[i].high = UINT_MAX;
72             wakeupCond.global.low += wakeupCond.local[i].low;
73             wakeupCond.global.high = UINT_MAX;
74         } else {
75             wakeupCond.local[i].high = 0;
76         }
77     }
78 #endif
79 }
80 
StartMonitor()81 void CPUMonitor::StartMonitor()
82 {
83 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
84     int ret = BlockawareInit(&keyPtr);
85     if (ret != 0) {
86         FFRT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
87     } else {
88         blockAwareInit = true;
89     }
90 #else
91     monitorThread = nullptr;
92 #endif
93 }
94 
SetWorkerMaxNum(const QoS & qos,uint32_t num)95 int CPUMonitor::SetWorkerMaxNum(const QoS& qos, uint32_t num)
96 {
97     WorkerCtrl& workerCtrl = ctrlQueue[qos()];
98     workerCtrl.lock.lock();
99     if (setWorkerMaxNum[qos()]) {
100         FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
101         workerCtrl.lock.unlock();
102         return -1;
103     }
104 
105     workerCtrl.hardLimit = static_cast<size_t>(num);
106     setWorkerMaxNum[qos()] = true;
107     workerCtrl.lock.unlock();
108     return 0;
109 }
110 
GetMonitorTid() const111 uint32_t CPUMonitor::GetMonitorTid() const
112 {
113     return monitorTid;
114 }
115 
116 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
WakeupCond(void)117 BlockawareWakeupCond* CPUMonitor::WakeupCond(void)
118 {
119     return &wakeupCond;
120 }
121 
MonitorMain()122 void CPUMonitor::MonitorMain()
123 {
124     (void)WorkerInit();
125     int ret = BlockawareLoadSnapshot(keyPtr, &domainInfoMonitor);
126     if (ret != 0) {
127         FFRT_LOGE("blockaware load snapshot fail, ret[%d]", ret);
128         return;
129     }
130     for (int i = 0; i < qosMonitorMaxNum; i++) {
131         auto& info = domainInfoMonitor.localinfo[i];
132         if (info.nrRunning <= wakeupCond.local[i].low &&
133             (info.nrRunning + info.nrBlocked + info.nrSleeping) < MAX_ESCAPE_WORKER_NUM) {
134             Notify(i, TaskNotifyType::TASK_ESCAPED);
135         }
136     }
137     stopMonitor = true;
138 }
139 
IsExceedRunningThreshold(const QoS & qos)140 bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
141 {
142     return blockAwareInit && (BlockawareLoadSnapshotNrRunningFast(keyPtr, qos()) > ctrlQueue[qos()].maxConcurrency);
143 }
144 
IsBlockAwareInit(void)145 bool CPUMonitor::IsBlockAwareInit(void)
146 {
147     return blockAwareInit;
148 }
149 #endif
150 
SetEscapeEnable(uint64_t oneStageIntervalMs,uint64_t twoStageIntervalMs,uint64_t threeStageIntervalMs,uint64_t oneStageWorkerNum,uint64_t twoStageWorkerNum)151 int CPUMonitor::SetEscapeEnable(uint64_t oneStageIntervalMs, uint64_t twoStageIntervalMs,
152         uint64_t threeStageIntervalMs, uint64_t oneStageWorkerNum, uint64_t twoStageWorkerNum)
153 {
154     return escapeMgr_.SetEscapeEnable(oneStageIntervalMs, twoStageIntervalMs,
155         threeStageIntervalMs, oneStageWorkerNum, twoStageWorkerNum);
156 }
157 
SetEscapeDisable()158 void CPUMonitor::SetEscapeDisable()
159 {
160     escapeMgr_.SetEscapeDisable();
161 }
162 
TimeoutCount(const QoS & qos)163 void CPUMonitor::TimeoutCount(const QoS& qos)
164 {
165     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
166     std::lock_guard lk(workerCtrl.lock);
167     workerCtrl.sleepingWorkerNum--;
168 }
169 
WakeupSleep(const QoS & qos,bool irqWake)170 void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
171 {
172     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
173     std::lock_guard lk(workerCtrl.lock);
174     if (irqWake) {
175         workerCtrl.irqEnable = false;
176     }
177     if (workerCtrl.pendingWakeCnt > 0) {
178         workerCtrl.pendingWakeCnt--;
179     }
180     workerCtrl.sleepingWorkerNum--;
181     workerCtrl.executionNum++;
182 }
183 
TotalCount(const QoS & qos)184 int CPUMonitor::TotalCount(const QoS& qos)
185 {
186     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
187     workerCtrl.lock.lock();
188     int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
189     workerCtrl.lock.unlock();
190     return total;
191 }
192 
RollbackDestroy(const QoS & qos,bool irqWake)193 void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
194 {
195     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
196     std::lock_guard lk(workerCtrl.lock);
197     if (irqWake) {
198         workerCtrl.irqEnable = false;
199     }
200     workerCtrl.executionNum++;
201 }
202 
TryDestroy(const QoS & qos)203 bool CPUMonitor::TryDestroy(const QoS& qos)
204 {
205     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
206     std::lock_guard lk(workerCtrl.lock);
207     workerCtrl.sleepingWorkerNum--;
208     return workerCtrl.sleepingWorkerNum > 0;
209 }
210 
SleepingWorkerNum(const QoS & qos)211 int CPUMonitor::SleepingWorkerNum(const QoS& qos)
212 {
213     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
214     std::unique_lock lk(workerCtrl.lock);
215     return workerCtrl.sleepingWorkerNum;
216 }
217 
WakedWorkerNum(const QoS & qos)218 int CPUMonitor::WakedWorkerNum(const QoS& qos)
219 {
220     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
221     std::lock_guard lk(workerCtrl.lock);
222     return workerCtrl.executionNum;
223 }
224 
IntoDeepSleep(const QoS & qos)225 void CPUMonitor::IntoDeepSleep(const QoS& qos)
226 {
227     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
228     std::lock_guard lk(workerCtrl.lock);
229     workerCtrl.deepSleepingWorkerNum++;
230 }
231 
WakeupDeepSleep(const QoS & qos,bool irqWake)232 void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
233 {
234     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
235     std::lock_guard lk(workerCtrl.lock);
236     if (irqWake) {
237         workerCtrl.irqEnable = false;
238     }
239     workerCtrl.sleepingWorkerNum--;
240     workerCtrl.deepSleepingWorkerNum--;
241     workerCtrl.executionNum++;
242 }
243 
OutOfPollWait(const QoS & qos)244 void CPUMonitor::OutOfPollWait(const QoS& qos)
245 {
246     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
247     std::lock_guard lk(workerCtrl.lock);
248     workerCtrl.pollWaitFlag = false;
249 }
250 
IsExceedDeepSleepThreshold()251 bool CPUMonitor::IsExceedDeepSleepThreshold()
252 {
253     int totalWorker = 0;
254     int deepSleepingWorkerNum = 0;
255     for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
256         WorkerCtrl& workerCtrl = ctrlQueue[i];
257         std::lock_guard lk(workerCtrl.lock);
258         deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
259         totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
260     }
261     return deepSleepingWorkerNum * 2 > totalWorker;
262 }
263 
NotifyWorkers(const QoS & qos,int number)264 void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
265 {
266     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
267     workerCtrl.lock.lock();
268 
269     int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
270         (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
271     int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
272     for (int idx = 0; idx < wakeupNumber; idx++) {
273         ops.WakeupWorkers(qos);
274     }
275 
276     int incNumber = std::min(number - wakeupNumber, increasableNumber);
277     for (int idx = 0; idx < incNumber; idx++) {
278         workerCtrl.executionNum++;
279         ops.IncWorker(qos);
280     }
281 
282     workerCtrl.lock.unlock();
283     FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
284 }
285 
GetRunningNum(const QoS & qos)286 size_t CPUMonitor::GetRunningNum(const QoS& qos)
287 {
288     WorkerCtrl& workerCtrl = ctrlQueue[qos()];
289     size_t runningNum = workerCtrl.executionNum;
290 
291 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
292     /* There is no need to update running num when executionNum < maxConcurrency */
293     if (workerCtrl.executionNum >= workerCtrl.maxConcurrency && blockAwareInit) {
294         auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(keyPtr, qos());
295         if (workerCtrl.executionNum >= nrBlocked) {
296             /* nrRunning may not be updated in a timely manner */
297             runningNum = workerCtrl.executionNum - nrBlocked;
298         } else {
299             FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
300                 qos(), nrBlocked, workerCtrl.executionNum);
301         }
302     }
303 #endif
304 
305     return runningNum;
306 }
307 
ReportEscapeEvent(int qos,size_t totalNum)308 void CPUMonitor::ReportEscapeEvent(int qos, size_t totalNum)
309 {
310 #ifdef FFRT_SEND_EVENT
311     WorkerEscapeReport(GetCurrentProcessName(), qos, totalNum);
312 #endif
313 }
314 }