• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "eu/cpu_monitor.h"
#include <unistd.h>
#include <sys/prctl.h>
#include <sys/syscall.h>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
#include "sched/scheduler.h"
#include "eu/wgcm.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "internal_inc/config.h"
#include "util/name_manager.h"
28 #ifdef FFRT_IO_TASK_SCHEDULER
29 #include "sync/poller.h"
30 #include "util/spmc_queue.h"
31 
32 namespace {
33 const int TRIGGER_SUPPRESS_WORKER_COUNT = 4;
34 const int TRIGGER_SUPPRESS_EXECUTION_NUM = 2;
35 }
36 #endif
37 namespace ffrt {
HandleBlocked(const QoS & qos)38 void CPUMonitor::HandleBlocked(const QoS& qos)
39 {
40     int taskCount = ops.GetTaskCount(qos);
41     if (taskCount == 0) {
42         return;
43     }
44     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
45     workerCtrl.lock.lock();
46     size_t exeValue = static_cast<uint32_t>(workerCtrl.executionNum);
47     workerCtrl.lock.unlock();
48     size_t blockedNum = CountBlockedNum(qos);
49     if (blockedNum > 0 && (exeValue - blockedNum < workerCtrl.maxConcurrency) && exeValue < workerCtrl.hardLimit) {
50         Poke(qos, TaskNotifyType::TASK_ADDED);
51     }
52 }
53 
/*
 * Entry point of the CPU monitor thread.
 *
 * Registers this thread as a WGCM server with the kernel, creates one worker
 * group per QoS level with the default concurrency limits, then blocks in
 * WGCM_CTL_WAIT.  Each time the kernel reports a worker-group event it pokes
 * every QoS queue that has blocked workers.  The loop exits when the wait is
 * actively woken (WGCM_ACTIVELY_WAKE), after which the server unregisters.
 */
void MonitorMain(CPUMonitor* monitor)
{
    (void)pthread_setname_np(pthread_self(), CPU_MONITOR_NAME);
    // Register as the WGCM server; workers later bind to this thread
    // through monitorTid (see CPUMonitor::RegWorker).
    int ret = prctl(PR_WGCM_CTL, WGCM_CTL_SERVER_REG, 0, 0, 0);
    if (ret) {
        FFRT_LOGE("[SERVER] wgcm register server failed ret is %{public}d", ret);
    }

    ret = syscall(SYS_gettid);
    if (ret == -1) {
        // Fall back to 0 so later worker registrations do not bind to a
        // stale/garbage tid.
        monitor->monitorTid = 0;
        FFRT_LOGE("syscall(SYS_gettid) failed");
    } else {
        monitor->monitorTid = static_cast<uint32_t>(ret);
    }

    // Create one kernel worker group per QoS level; group id == QoS level.
    for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
        struct wgcm_workergrp_data grp = {0, 0, 0, 0, 0, 0, 0, 0};
        grp.gid = i;
        grp.min_concur_workers = DEFAULT_MINCONCURRENCY;
        grp.max_workers_sum = DEFAULT_HARDLIMIT;
        ret = prctl(PR_WGCM_CTL, WGCM_CTL_SET_GRP, &grp, 0, 0);
        if (ret) {
            FFRT_LOGE("[SERVER] wgcm group %u register failed\n ret is %{public}d", i, ret);
        }
    }

    while (true) {
        struct wgcm_workergrp_data data = {0, 0, 0, 0, 0, 0, 0, 0};
        // Block until the kernel reports a worker-group state change.
        ret = prctl(PR_WGCM_CTL, WGCM_CTL_WAIT, &data, 0, 0);
        if (ret) {
            FFRT_LOGE("[SERVER] wgcm server wait failed ret is %{public}d", ret);
            sleep(1);  // avoid spinning hot if WAIT keeps failing
            continue;
        }
        if (data.woken_flag == WGCM_ACTIVELY_WAKE) {
            // Deliberate wakeup (shutdown request) rather than a state change.
            break;
        }

        for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
            monitor->HandleBlocked(qos);
        }
    }
    ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
    if (ret) {
        FFRT_LOGE("[SERVER] wgcm server unregister failed ret is %{public}d.", ret);
    }
}
102 
SetupMonitor()103 void CPUMonitor::SetupMonitor()
104 {
105     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
106         ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
107         ctrlQueue[qos].workerManagerID = static_cast<uint32_t>(qos);
108         ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
109     }
110 }
111 
SetWorkerMaxNum(const QoS & qos,int num)112 int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
113 {
114     WorkerCtrl& workerCtrl = ctrlQueue[qos()];
115     workerCtrl.lock.lock();
116     static bool setFlag[QoS::Max()] = {false};
117     if (setFlag[qos()]) {
118         FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
119         workerCtrl.lock.unlock();
120         return -1;
121     }
122     if (num <= 0 || num > QOS_WORKER_MAXNUM) {
123         FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
124         workerCtrl.lock.unlock();
125         return -1;
126     }
127     workerCtrl.maxConcurrency = num;
128     setFlag[qos()] = true;
129     workerCtrl.lock.unlock();
130     return 0;
131 }
132 
RegWorker(const QoS & qos)133 void CPUMonitor::RegWorker(const QoS& qos)
134 {
135     struct wgcm_workergrp_data grp = {0, 0, 0, 0, 0, 0, 0, 0};
136     grp.gid = static_cast<uint32_t>(qos);
137     grp.server_tid = monitorTid;
138     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_WORKER_REG, &grp, 0, 0);
139     if (ret) {
140         FFRT_LOGE("[WORKER] Register failed! error=%d\n", ret);
141     }
142 }
143 
UnRegWorker()144 void CPUMonitor::UnRegWorker()
145 {
146     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
147     if (ret) {
148         FFRT_LOGE("leave workgroup failed  error=%d\n", ret);
149     }
150 }
151 
CPUMonitor(CpuMonitorOps && ops)152 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
153 {
154     SetupMonitor();
155     monitorThread = nullptr;
156 }
157 
~CPUMonitor()158 CPUMonitor::~CPUMonitor()
159 {
160     if (monitorThread != nullptr) {
161         monitorThread->join();
162     }
163     delete monitorThread;
164     monitorThread = nullptr;
165 }
166 
StartMonitor()167 void CPUMonitor::StartMonitor()
168 {
169     monitorThread = new std::thread(MonitorMain, this);
170 }
171 
/* Returns the kernel tid of the monitor thread (0 if gettid failed). */
uint32_t CPUMonitor::GetMonitorTid() const
{
    return monitorTid;
}
176 
IncSleepingRef(const QoS & qos)177 void CPUMonitor::IncSleepingRef(const QoS& qos)
178 {
179     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
180     workerCtrl.lock.lock();
181     workerCtrl.sleepingWorkerNum++;
182     workerCtrl.lock.unlock();
183 }
184 
DecSleepingRef(const QoS & qos)185 void CPUMonitor::DecSleepingRef(const QoS& qos)
186 {
187     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
188     workerCtrl.lock.lock();
189     workerCtrl.sleepingWorkerNum--;
190     workerCtrl.lock.unlock();
191 }
192 
DecExeNumRef(const QoS & qos)193 void CPUMonitor::DecExeNumRef(const QoS& qos)
194 {
195     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
196     workerCtrl.lock.lock();
197     workerCtrl.executionNum--;
198     workerCtrl.lock.unlock();
199 }
200 
CountBlockedNum(const QoS & qos)201 size_t CPUMonitor::CountBlockedNum(const QoS& qos)
202 {
203     struct wgcm_workergrp_data grp = {0, 0, 0, 0, 0, 0, 0, 0};
204     grp.gid = static_cast<uint32_t>(qos);
205     grp.server_tid = monitorTid;
206     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_GET, &grp, 0, 0);
207     if (ret) {
208         FFRT_LOGE("failed to get wgcm count");
209     } else {
210         return static_cast<size_t>(grp.blk_workers_sum);
211     }
212     return 0;
213 }
214 
TimeoutCount(const QoS & qos)215 void CPUMonitor::TimeoutCount(const QoS& qos)
216 {
217     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
218     workerCtrl.lock.lock();
219     workerCtrl.sleepingWorkerNum--;
220     workerCtrl.lock.unlock();
221 }
222 
WakeupCount(const QoS & qos)223 void CPUMonitor::WakeupCount(const QoS& qos)
224 {
225     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
226     workerCtrl.lock.lock();
227     workerCtrl.sleepingWorkerNum--;
228     workerCtrl.executionNum++;
229     workerCtrl.lock.unlock();
230 }
231 
232 #ifdef FFRT_IO_TASK_SCHEDULER
WakedWorkerNum(const QoS & qos)233 int CPUMonitor::WakedWorkerNum(const QoS& qos)
234 {
235     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
236     std::unique_lock lk(workerCtrl.lock);
237     return workerCtrl.executionNum;
238 }
239 #endif
240 
IntoDeepSleep(const QoS & qos)241 void CPUMonitor::IntoDeepSleep(const QoS& qos)
242 {
243     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
244     workerCtrl.lock.lock();
245     workerCtrl.deepSleepingWorkerNum++;
246     workerCtrl.lock.unlock();
247 }
248 
OutOfDeepSleep(const QoS & qos)249 void CPUMonitor::OutOfDeepSleep(const QoS& qos)
250 {
251     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
252     workerCtrl.lock.lock();
253     workerCtrl.sleepingWorkerNum--;
254     workerCtrl.executionNum++;
255     workerCtrl.deepSleepingWorkerNum--;
256     workerCtrl.lock.unlock();
257 }
258 
259 #ifdef FFRT_IO_TASK_SCHEDULER
IntoPollWait(const QoS & qos)260 void CPUMonitor::IntoPollWait(const QoS& qos)
261 {
262     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
263     workerCtrl.lock.lock();
264     workerCtrl.pollWaitFlag = true;
265     workerCtrl.lock.unlock();
266 }
267 
OutOfPollWait(const QoS & qos)268 void CPUMonitor::OutOfPollWait(const QoS& qos)
269 {
270     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
271     workerCtrl.lock.lock();
272     workerCtrl.pollWaitFlag = false;
273     workerCtrl.lock.unlock();
274 }
275 #endif
276 
IsExceedDeepSleepThreshold()277 bool CPUMonitor::IsExceedDeepSleepThreshold()
278 {
279     int totalWorker = 0;
280     int deepSleepingWorkerNum = 0;
281     for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
282         WorkerCtrl& workerCtrl = ctrlQueue[i];
283         workerCtrl.lock.lock();
284         deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
285         totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
286         workerCtrl.lock.unlock();
287     }
288     return deepSleepingWorkerNum * 2 > totalWorker;
289 }
290 
/*
 * Try to supply one more worker for @qos in response to @notifyType.
 *
 * If capacity remains below maxConcurrency, either spawns a new worker
 * (no sleepers available) or wakes a sleeping one.  At full capacity it
 * optionally kicks a worker parked in the poller.  The control lock is
 * always released before calling back into ops/poller.
 */
void CPUMonitor::Poke(const QoS& qos, TaskNotifyType notifyType)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();

#ifdef FFRT_IO_TASK_SCHEDULER
    // Suppress non-TASK_ADDED pokes when plenty of workers are already up
    // and fewer tasks are pending than workers executing.
    bool triggerSuppression = (ops.GetWorkerCount(qos) > TRIGGER_SUPPRESS_WORKER_COUNT) &&
        (workerCtrl.executionNum > TRIGGER_SUPPRESS_EXECUTION_NUM) && (ops.GetTaskCount(qos) < workerCtrl.executionNum);
    if (notifyType != TaskNotifyType::TASK_ADDED && triggerSuppression) {
        workerCtrl.lock.unlock();
        return;
    }
#endif

    FFRT_LOGD("qos[%d] exe num[%d] slp num[%d]", (int)qos, workerCtrl.executionNum, workerCtrl.sleepingWorkerNum);
    if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
        if (workerCtrl.sleepingWorkerNum == 0) {
            // No sleeper to wake: account for the new worker while still
            // holding the lock, then create it outside the lock.
            workerCtrl.executionNum++;
            workerCtrl.lock.unlock();
            ops.IncWorker(qos);
        } else {
            // Prefer waking an existing sleeping worker over creating one.
            workerCtrl.lock.unlock();
            ops.WakeupWorkers(qos);
        }
    } else {
#ifdef FFRT_IO_TASK_SCHEDULER
        // At max concurrency: if a worker is parked in the poller, kick it
        // so it can pick up the new work.
        if (workerCtrl.pollWaitFlag) {
            PollerProxy::Instance()->GetPoller(qos).WakeUp();
        }
#endif
        workerCtrl.lock.unlock();
    }
}
324 }
325