/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <utility>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include <sys/syscall.h>
#include "sched/scheduler.h"
#include "eu/wgcm.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "internal_inc/config.h"
#include "util/name_manager.h"
#ifdef FFRT_IO_TASK_SCHEDULER
#include "sync/poller.h"
#include "util/spmc_queue.h"

namespace {
// Poke suppression thresholds: skip redundant wakeups once more than four
// workers exist and more than two of them are already executing.
const int TRIGGER_SUPPRESS_WORKER_COUNT = 4;
const int TRIGGER_SUPPRESS_EXECUTION_NUM = 2;
}
#endif
namespace ffrt {
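// Called when the kernel reports blocked workers for a QoS: if pending tasks
// exist, blocked workers leave fewer than maxConcurrency runnable workers, and
// the hard limit is not yet reached, poke the pool to bring up another worker.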
void CPUMonitor::HandleBlocked(const QoS& qos)
{
    int taskCount = ops.GetTaskCount(qos);
    if (taskCount == 0) {
        return;
    }
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    size_t exeValue = static_cast<uint32_t>(workerCtrl.executionNum);
    workerCtrl.lock.unlock();
    size_t blockedNum = CountBlockedNum(qos);
    if (blockedNum > 0 && (exeValue - blockedNum < workerCtrl.maxConcurrency) && exeValue < workerCtrl.hardLimit) {
        Poke(qos, TaskNotifyType::TASK_ADDED);
    }
}

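// Monitor thread entry: registers this thread as the WGCM server, creates one
// worker group per QoS level, then loops on WGCM_CTL_WAIT handling
// blocked-worker notifications until it is actively woken for shutdown.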
void MonitorMain(CPUMonitor* monitor)
{
    (void)pthread_setname_np(pthread_self(), CPU_MONITOR_NAME);
    int ret = prctl(PR_WGCM_CTL, WGCM_CTL_SERVER_REG, 0, 0, 0);
    if (ret) {
        FFRT_LOGE("[SERVER] wgcm register server failed ret is %{public}d", ret);
    }

    ret = syscall(SYS_gettid);
    if (ret == -1) {
        monitor->monitorTid = 0;
        FFRT_LOGE("syscall(SYS_gettid) failed");
    } else {
        monitor->monitorTid = static_cast<uint32_t>(ret);
    }

    for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
        struct wgcm_workergrp_data grp = {0, 0, 0, 0, 0, 0, 0, 0};
        grp.gid = i;
        grp.min_concur_workers = DEFAULT_MINCONCURRENCY;
        grp.max_workers_sum = DEFAULT_HARDLIMIT;
        ret = prctl(PR_WGCM_CTL, WGCM_CTL_SET_GRP, &grp, 0, 0);
        if (ret) {
            FFRT_LOGE("[SERVER] wgcm group %u register failed, ret is %{public}d", i, ret);
        }
    }

    while (true) {
        struct wgcm_workergrp_data data = {0, 0, 0, 0, 0, 0, 0, 0};
        ret = prctl(PR_WGCM_CTL, WGCM_CTL_WAIT, &data, 0, 0);
        if (ret) {
            FFRT_LOGE("[SERVER] wgcm server wait failed ret is %{public}d", ret);
            sleep(1);
            continue;
        }
        // An active wakeup signals monitor shutdown.
        if (data.woken_flag == WGCM_ACTIVELY_WAKE) {
            break;
        }

        for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
            monitor->HandleBlocked(qos);
        }
    }
    ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
    if (ret) {
        FFRT_LOGE("[SERVER] wgcm server unregister failed ret is %{public}d.", ret);
    }
}

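// Initializes the per-QoS worker controls with the default hard limit and the
// configured worker concurrency.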
void CPUMonitor::SetupMonitor()
{
    for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
        ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
        ctrlQueue[qos].workerManagerID = static_cast<uint32_t>(qos);
        ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
    }
}

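// Overrides the max worker concurrency for a QoS. Allowed exactly once per
// QoS, with num in (0, QOS_WORKER_MAXNUM]; returns 0 on success, -1 otherwise.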
int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
{
    WorkerCtrl& workerCtrl = ctrlQueue[qos()];
    workerCtrl.lock.lock();
    static bool setFlag[QoS::Max()] = {false};
    if (setFlag[qos()]) {
        FFRT_LOGE("qos[%d] worker num can only be set up once", qos());
        workerCtrl.lock.unlock();
        return -1;
    }
    if (num <= 0 || num > QOS_WORKER_MAXNUM) {
        FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
        workerCtrl.lock.unlock();
        return -1;
    }
    workerCtrl.maxConcurrency = num;
    setFlag[qos()] = true;
    workerCtrl.lock.unlock();
    return 0;
}

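// Registers the calling worker thread with the WGCM group of its QoS so the
// kernel can report it to the monitor when it blocks.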
void CPUMonitor::RegWorker(const QoS& qos)
{
    struct wgcm_workergrp_data grp = {0, 0, 0, 0, 0, 0, 0, 0};
    grp.gid = static_cast<uint32_t>(qos);
    grp.server_tid = monitorTid;
    int ret = prctl(PR_WGCM_CTL, WGCM_CTL_WORKER_REG, &grp, 0, 0);
    if (ret) {
        FFRT_LOGE("[WORKER] Register failed! error=%d", ret);
    }
}

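// Detaches the calling worker thread from its WGCM workgroup.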
void CPUMonitor::UnRegWorker()
{
    int ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
    if (ret) {
        FFRT_LOGE("leave workgroup failed, error=%d", ret);
    }
}

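// The monitor thread is not spawned here; callers start it via StartMonitor().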
CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(std::move(ops))
{
    SetupMonitor();
    monitorThread = nullptr;
}

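// Joins and destroys the monitor thread if it was started.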
CPUMonitor::~CPUMonitor()
{
    if (monitorThread != nullptr) {
        monitorThread->join();
    }
    delete monitorThread;
    monitorThread = nullptr;
}

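// Spawns the monitor thread running MonitorMain.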
void CPUMonitor::StartMonitor()
{
    monitorThread = new std::thread(MonitorMain, this);
}

uint32_t CPUMonitor::GetMonitorTid() const
{
    return monitorTid;
}

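// The helpers below maintain the per-QoS worker accounting (executing,
// sleeping, deep-sleeping) under the control lock.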
void CPUMonitor::IncSleepingRef(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.sleepingWorkerNum++;
    workerCtrl.lock.unlock();
}

void CPUMonitor::DecSleepingRef(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.sleepingWorkerNum--;
    workerCtrl.lock.unlock();
}

void CPUMonitor::DecExeNumRef(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.executionNum--;
    workerCtrl.lock.unlock();
}

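// Queries the kernel (WGCM_CTL_GET) for the number of currently blocked
// workers in the QoS group; returns 0 if the query fails.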
size_t CPUMonitor::CountBlockedNum(const QoS& qos)
{
    struct wgcm_workergrp_data grp = {0, 0, 0, 0, 0, 0, 0, 0};
    grp.gid = static_cast<uint32_t>(qos);
    grp.server_tid = monitorTid;
    int ret = prctl(PR_WGCM_CTL, WGCM_CTL_GET, &grp, 0, 0);
    if (ret) {
        FFRT_LOGE("failed to get wgcm count");
        return 0;
    }
    return static_cast<size_t>(grp.blk_workers_sum);
}

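// A sleeping worker timed out and is exiting: drop it from the sleeping count.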
void CPUMonitor::TimeoutCount(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.sleepingWorkerNum--;
    workerCtrl.lock.unlock();
}

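// A sleeping worker woke up to run tasks: move it from sleeping to executing.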
void CPUMonitor::WakeupCount(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.sleepingWorkerNum--;
    workerCtrl.executionNum++;
    workerCtrl.lock.unlock();
}

#ifdef FFRT_IO_TASK_SCHEDULER
int CPUMonitor::WakedWorkerNum(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::unique_lock lk(workerCtrl.lock);
    return workerCtrl.executionNum;
}
#endif

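// Deep-sleep tracking: a deep-sleeping worker stays counted as sleeping and is
// additionally counted as deep-sleeping until it leaves deep sleep.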
void CPUMonitor::IntoDeepSleep(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.deepSleepingWorkerNum++;
    workerCtrl.lock.unlock();
}

void CPUMonitor::OutOfDeepSleep(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.sleepingWorkerNum--;
    workerCtrl.executionNum++;
    workerCtrl.deepSleepingWorkerNum--;
    workerCtrl.lock.unlock();
}

#ifdef FFRT_IO_TASK_SCHEDULER
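// Marks whether a worker of this QoS is parked in the poller so Poke() can
// wake it when the pool is already at max concurrency.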
void CPUMonitor::IntoPollWait(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.pollWaitFlag = true;
    workerCtrl.lock.unlock();
}

void CPUMonitor::OutOfPollWait(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    workerCtrl.pollWaitFlag = false;
    workerCtrl.lock.unlock();
}
#endif

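// Returns true when more than half of all workers (executing + sleeping,
// across every QoS) are in deep sleep.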
bool CPUMonitor::IsExceedDeepSleepThreshold()
{
    int totalWorker = 0;
    int deepSleepingWorkerNum = 0;
    for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
        WorkerCtrl& workerCtrl = ctrlQueue[i];
        workerCtrl.lock.lock();
        deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
        totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
        workerCtrl.lock.unlock();
    }
    return deepSleepingWorkerNum * 2 > totalWorker;
}

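// Ensures a worker is available for newly notified work: below maxConcurrency,
// either spawns a new worker (no sleepers) or wakes a sleeping one; at the
// limit, optionally wakes a poller-parked worker instead.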
void CPUMonitor::Poke(const QoS& qos, TaskNotifyType notifyType)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();

#ifdef FFRT_IO_TASK_SCHEDULER
    // Suppress the poke when the pool is busy enough already: several workers
    // exist, several are executing, and pending tasks are fewer than the
    // executing workers. TASK_ADDED notifications are never suppressed.
    bool triggerSuppression = (ops.GetWorkerCount(qos) > TRIGGER_SUPPRESS_WORKER_COUNT) &&
        (workerCtrl.executionNum > TRIGGER_SUPPRESS_EXECUTION_NUM) &&
        (ops.GetTaskCount(qos) < workerCtrl.executionNum);
    if (notifyType != TaskNotifyType::TASK_ADDED && triggerSuppression) {
        workerCtrl.lock.unlock();
        return;
    }
#endif

    FFRT_LOGD("qos[%d] exe num[%d] slp num[%d]", (int)qos, workerCtrl.executionNum, workerCtrl.sleepingWorkerNum);
    if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
        if (workerCtrl.sleepingWorkerNum == 0) {
            workerCtrl.executionNum++;
            workerCtrl.lock.unlock();
            ops.IncWorker(qos);
        } else {
            workerCtrl.lock.unlock();
            ops.WakeupWorkers(qos);
        }
    } else {
#ifdef FFRT_IO_TASK_SCHEDULER
        // At max concurrency, wake a poller-parked worker so pending work is
        // still picked up.
        if (workerCtrl.pollWaitFlag) {
            PollerProxy::Instance()->GetPoller(qos).WakeUp();
        }
#endif
        workerCtrl.lock.unlock();
    }
}
} // namespace ffrt