1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "eu/cpu_monitor.h"
#include <algorithm>
#include <climits>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/sysevent/sysevent.h"
#include "internal_inc/config.h"
#include "internal_inc/osal.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"
32
namespace {
/*
 * Upper bound on (running + blocked + sleeping) workers of a domain; MonitorMain only
 * sends an escape notification while the domain is below this count.
 */
constexpr size_t MAX_ESCAPE_WORKER_NUM = 1024;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
/* NOTE(review): not referenced in the visible part of this file - presumably a delay in milliseconds; confirm usage. */
constexpr int JITTER_DELAY_MS = 5;
#endif
}
39
40 namespace ffrt {
CPUMonitor(CpuMonitorOps && ops,const std::function<void (int,void *)> & executeEscapeFunc)41 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops, const std::function<void(int, void*)>& executeEscapeFunc)
42 : ops(ops), escapeMgr_(executeEscapeFunc)
43 {
44 SetupMonitor();
45 }
46
~CPUMonitor()47 CPUMonitor::~CPUMonitor()
48 {
49 if (monitorThread != nullptr) {
50 monitorThread->join();
51 }
52 delete monitorThread;
53 monitorThread = nullptr;
54 }
55
SetupMonitor()56 void CPUMonitor::SetupMonitor()
57 {
58 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
59 ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
60 ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
61 setWorkerMaxNum[qos] = false;
62 }
63 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
64 memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
65 wakeupCond.check_ahead = false;
66 wakeupCond.global.low = 0;
67 wakeupCond.global.high = 0;
68 for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
69 wakeupCond.local[i].low = 0;
70 if (i < qosMonitorMaxNum) {
71 wakeupCond.local[i].high = UINT_MAX;
72 wakeupCond.global.low += wakeupCond.local[i].low;
73 wakeupCond.global.high = UINT_MAX;
74 } else {
75 wakeupCond.local[i].high = 0;
76 }
77 }
78 #endif
79 }
80
StartMonitor()81 void CPUMonitor::StartMonitor()
82 {
83 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
84 int ret = BlockawareInit(&keyPtr);
85 if (ret != 0) {
86 FFRT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
87 } else {
88 blockAwareInit = true;
89 }
90 #else
91 monitorThread = nullptr;
92 #endif
93 }
94
SetWorkerMaxNum(const QoS & qos,uint32_t num)95 int CPUMonitor::SetWorkerMaxNum(const QoS& qos, uint32_t num)
96 {
97 WorkerCtrl& workerCtrl = ctrlQueue[qos()];
98 workerCtrl.lock.lock();
99 if (setWorkerMaxNum[qos()]) {
100 FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
101 workerCtrl.lock.unlock();
102 return -1;
103 }
104
105 workerCtrl.hardLimit = static_cast<size_t>(num);
106 setWorkerMaxNum[qos()] = true;
107 workerCtrl.lock.unlock();
108 return 0;
109 }
110
GetMonitorTid() const111 uint32_t CPUMonitor::GetMonitorTid() const
112 {
113 return monitorTid;
114 }
115
116 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
WakeupCond(void)117 BlockawareWakeupCond* CPUMonitor::WakeupCond(void)
118 {
119 return &wakeupCond;
120 }
121
MonitorMain()122 void CPUMonitor::MonitorMain()
123 {
124 (void)WorkerInit();
125 int ret = BlockawareLoadSnapshot(keyPtr, &domainInfoMonitor);
126 if (ret != 0) {
127 FFRT_LOGE("blockaware load snapshot fail, ret[%d]", ret);
128 return;
129 }
130 for (int i = 0; i < qosMonitorMaxNum; i++) {
131 auto& info = domainInfoMonitor.localinfo[i];
132 if (info.nrRunning <= wakeupCond.local[i].low &&
133 (info.nrRunning + info.nrBlocked + info.nrSleeping) < MAX_ESCAPE_WORKER_NUM) {
134 Notify(i, TaskNotifyType::TASK_ESCAPED);
135 }
136 }
137 stopMonitor = true;
138 }
139
IsExceedRunningThreshold(const QoS & qos)140 bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
141 {
142 return blockAwareInit && (BlockawareLoadSnapshotNrRunningFast(keyPtr, qos()) > ctrlQueue[qos()].maxConcurrency);
143 }
144
IsBlockAwareInit(void)145 bool CPUMonitor::IsBlockAwareInit(void)
146 {
147 return blockAwareInit;
148 }
149 #endif
150
SetEscapeEnable(uint64_t oneStageIntervalMs,uint64_t twoStageIntervalMs,uint64_t threeStageIntervalMs,uint64_t oneStageWorkerNum,uint64_t twoStageWorkerNum)151 int CPUMonitor::SetEscapeEnable(uint64_t oneStageIntervalMs, uint64_t twoStageIntervalMs,
152 uint64_t threeStageIntervalMs, uint64_t oneStageWorkerNum, uint64_t twoStageWorkerNum)
153 {
154 return escapeMgr_.SetEscapeEnable(oneStageIntervalMs, twoStageIntervalMs,
155 threeStageIntervalMs, oneStageWorkerNum, twoStageWorkerNum);
156 }
157
SetEscapeDisable()158 void CPUMonitor::SetEscapeDisable()
159 {
160 escapeMgr_.SetEscapeDisable();
161 }
162
TimeoutCount(const QoS & qos)163 void CPUMonitor::TimeoutCount(const QoS& qos)
164 {
165 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
166 std::lock_guard lk(workerCtrl.lock);
167 workerCtrl.sleepingWorkerNum--;
168 }
169
WakeupSleep(const QoS & qos,bool irqWake)170 void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
171 {
172 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
173 std::lock_guard lk(workerCtrl.lock);
174 if (irqWake) {
175 workerCtrl.irqEnable = false;
176 }
177 if (workerCtrl.pendingWakeCnt > 0) {
178 workerCtrl.pendingWakeCnt--;
179 }
180 workerCtrl.sleepingWorkerNum--;
181 workerCtrl.executionNum++;
182 }
183
TotalCount(const QoS & qos)184 int CPUMonitor::TotalCount(const QoS& qos)
185 {
186 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
187 workerCtrl.lock.lock();
188 int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
189 workerCtrl.lock.unlock();
190 return total;
191 }
192
RollbackDestroy(const QoS & qos,bool irqWake)193 void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
194 {
195 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
196 std::lock_guard lk(workerCtrl.lock);
197 if (irqWake) {
198 workerCtrl.irqEnable = false;
199 }
200 workerCtrl.executionNum++;
201 }
202
TryDestroy(const QoS & qos)203 bool CPUMonitor::TryDestroy(const QoS& qos)
204 {
205 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
206 std::lock_guard lk(workerCtrl.lock);
207 workerCtrl.sleepingWorkerNum--;
208 return workerCtrl.sleepingWorkerNum > 0;
209 }
210
SleepingWorkerNum(const QoS & qos)211 int CPUMonitor::SleepingWorkerNum(const QoS& qos)
212 {
213 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
214 std::unique_lock lk(workerCtrl.lock);
215 return workerCtrl.sleepingWorkerNum;
216 }
217
WakedWorkerNum(const QoS & qos)218 int CPUMonitor::WakedWorkerNum(const QoS& qos)
219 {
220 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
221 std::lock_guard lk(workerCtrl.lock);
222 return workerCtrl.executionNum;
223 }
224
IntoDeepSleep(const QoS & qos)225 void CPUMonitor::IntoDeepSleep(const QoS& qos)
226 {
227 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
228 std::lock_guard lk(workerCtrl.lock);
229 workerCtrl.deepSleepingWorkerNum++;
230 }
231
WakeupDeepSleep(const QoS & qos,bool irqWake)232 void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
233 {
234 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
235 std::lock_guard lk(workerCtrl.lock);
236 if (irqWake) {
237 workerCtrl.irqEnable = false;
238 }
239 workerCtrl.sleepingWorkerNum--;
240 workerCtrl.deepSleepingWorkerNum--;
241 workerCtrl.executionNum++;
242 }
243
OutOfPollWait(const QoS & qos)244 void CPUMonitor::OutOfPollWait(const QoS& qos)
245 {
246 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
247 std::lock_guard lk(workerCtrl.lock);
248 workerCtrl.pollWaitFlag = false;
249 }
250
IsExceedDeepSleepThreshold()251 bool CPUMonitor::IsExceedDeepSleepThreshold()
252 {
253 int totalWorker = 0;
254 int deepSleepingWorkerNum = 0;
255 for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
256 WorkerCtrl& workerCtrl = ctrlQueue[i];
257 std::lock_guard lk(workerCtrl.lock);
258 deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
259 totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
260 }
261 return deepSleepingWorkerNum * 2 > totalWorker;
262 }
263
NotifyWorkers(const QoS & qos,int number)264 void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
265 {
266 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
267 workerCtrl.lock.lock();
268
269 int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
270 (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
271 int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
272 for (int idx = 0; idx < wakeupNumber; idx++) {
273 ops.WakeupWorkers(qos);
274 }
275
276 int incNumber = std::min(number - wakeupNumber, increasableNumber);
277 for (int idx = 0; idx < incNumber; idx++) {
278 workerCtrl.executionNum++;
279 ops.IncWorker(qos);
280 }
281
282 workerCtrl.lock.unlock();
283 FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
284 }
285
GetRunningNum(const QoS & qos)286 size_t CPUMonitor::GetRunningNum(const QoS& qos)
287 {
288 WorkerCtrl& workerCtrl = ctrlQueue[qos()];
289 size_t runningNum = workerCtrl.executionNum;
290
291 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
292 /* There is no need to update running num when executionNum < maxConcurrency */
293 if (workerCtrl.executionNum >= workerCtrl.maxConcurrency && blockAwareInit) {
294 auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(keyPtr, qos());
295 if (workerCtrl.executionNum >= nrBlocked) {
296 /* nrRunning may not be updated in a timely manner */
297 runningNum = workerCtrl.executionNum - nrBlocked;
298 } else {
299 FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
300 qos(), nrBlocked, workerCtrl.executionNum);
301 }
302 }
303 #endif
304
305 return runningNum;
306 }
307
ReportEscapeEvent(int qos,size_t totalNum)308 void CPUMonitor::ReportEscapeEvent(int qos, size_t totalNum)
309 {
310 #ifdef FFRT_SEND_EVENT
311 WorkerEscapeReport(GetCurrentProcessName(), qos, totalNum);
312 #endif
313 }
314 }