1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <utility>
#include <unistd.h>
#include <sys/prctl.h>
#include <sys/syscall.h>
#include "sched/scheduler.h"
#include "eu/wgcm.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "internal_inc/config.h"
27 namespace ffrt {
HandleBlocked(const QoS & qos)28 void CPUMonitor::HandleBlocked(const QoS& qos)
29 {
30 int taskCount = ops.GetTaskCount(qos);
31 if (taskCount == 0) {
32 return;
33 }
34 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
35 workerCtrl.lock.lock();
36 size_t exeValue = static_cast<uint32_t>(workerCtrl.executionNum);
37 workerCtrl.lock.unlock();
38 size_t blockedNum = CountBlockedNum(qos);
39 if (blockedNum > 0 && (exeValue - blockedNum < workerCtrl.maxConcurrency) && exeValue < workerCtrl.hardLimit) {
40 Poke(qos);
41 }
42 }
43
MonitorMain(CPUMonitor * monitor)44 void MonitorMain(CPUMonitor* monitor)
45 {
46 (void)pthread_setname_np(pthread_self(), "ffrt_moniotor");
47 int ret = prctl(PR_WGCM_CTL, WGCM_CTL_SERVER_REG, 0, 0, 0);
48 if (ret) {
49 FFRT_LOGE("[SERVER] wgcm register server failed ret is %{public}d", ret);
50 }
51
52 ret = syscall(SYS_gettid);
53 if (ret == -1) {
54 monitor->monitorTid = 0;
55 FFRT_LOGE("syscall(SYS_gettid) failed");
56 } else {
57 monitor->monitorTid = static_cast<uint32_t>(ret);
58 }
59
60 for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
61 struct wgcm_workergrp_data grp = {0};
62 grp.gid = i;
63 grp.min_concur_workers = DEFAULT_MINCONCURRENCY;
64 grp.max_workers_sum = DEFAULT_HARDLIMIT;
65 ret = prctl(PR_WGCM_CTL, WGCM_CTL_SET_GRP, &grp, 0, 0);
66 if (ret) {
67 FFRT_LOGE("[SERVER] wgcm group %u register failed\n ret is %{public}d", i, ret);
68 }
69 }
70
71 while (true) {
72 struct wgcm_workergrp_data data = {0};
73 ret = prctl(PR_WGCM_CTL, WGCM_CTL_WAIT, &data, 0, 0);
74 if (ret) {
75 FFRT_LOGE("[SERVER] wgcm server wait failed ret is %{public}d", ret);
76 sleep(1);
77 continue;
78 }
79 if (data.woken_flag == WGCM_ACTIVELY_WAKE) {
80 break;
81 }
82
83 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
84 monitor->HandleBlocked(qos);
85 }
86 }
87 ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
88 if (ret) {
89 FFRT_LOGE("[SERVER] wgcm server unregister failed ret is %{public}d.", ret);
90 }
91 }
92
SetupMonitor()93 void CPUMonitor::SetupMonitor()
94 {
95 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
96 ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
97 ctrlQueue[qos].workerManagerID = static_cast<uint32_t>(qos);
98 ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
99 }
100 }
101
RegWorker(const QoS & qos)102 void CPUMonitor::RegWorker(const QoS& qos)
103 {
104 struct wgcm_workergrp_data grp;
105 grp.gid = static_cast<uint32_t>(qos);
106 grp.server_tid = monitorTid;
107 int ret = prctl(PR_WGCM_CTL, WGCM_CTL_WORKER_REG, &grp, 0, 0);
108 if (ret) {
109 FFRT_LOGE("[WORKER] Register failed! error=%d\n", ret);
110 }
111 }
112
UnRegWorker()113 void CPUMonitor::UnRegWorker()
114 {
115 int ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
116 if (ret) {
117 FFRT_LOGE("leave workgroup failed error=%d\n", ret);
118 }
119 }
120
CPUMonitor(CpuMonitorOps && ops)121 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
122 {
123 SetupMonitor();
124 monitorThread = nullptr;
125 }
126
~CPUMonitor()127 CPUMonitor::~CPUMonitor()
128 {
129 if (monitorThread != nullptr) {
130 monitorThread->join();
131 }
132 delete monitorThread;
133 monitorThread = nullptr;
134 }
135
StartMonitor()136 void CPUMonitor::StartMonitor()
137 {
138 monitorThread = new std::thread(MonitorMain, this);
139 }
140
/*
 * Returns the kernel tid of the monitor thread, as recorded by MonitorMain
 * (0 if SYS_gettid failed at startup).
 */
uint32_t CPUMonitor::GetMonitorTid() const
{
    return monitorTid;
}
145
IncSleepingRef(const QoS & qos)146 void CPUMonitor::IncSleepingRef(const QoS& qos)
147 {
148 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
149 workerCtrl.lock.lock();
150 workerCtrl.sleepingWorkerNum++;
151 workerCtrl.lock.unlock();
152 }
153
DecSleepingRef(const QoS & qos)154 void CPUMonitor::DecSleepingRef(const QoS& qos)
155 {
156 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
157 workerCtrl.lock.lock();
158 workerCtrl.sleepingWorkerNum--;
159 workerCtrl.lock.unlock();
160 }
161
DecExeNumRef(const QoS & qos)162 void CPUMonitor::DecExeNumRef(const QoS& qos)
163 {
164 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
165 workerCtrl.lock.lock();
166 workerCtrl.executionNum--;
167 workerCtrl.lock.unlock();
168 }
169
CountBlockedNum(const QoS & qos)170 size_t CPUMonitor::CountBlockedNum(const QoS& qos)
171 {
172 struct wgcm_workergrp_data grp = {0};
173 grp.gid = static_cast<uint32_t>(qos);
174 grp.server_tid = monitorTid;
175 int ret = prctl(PR_WGCM_CTL, WGCM_CTL_GET, &grp, 0, 0);
176 if (ret) {
177 FFRT_LOGE("failed to get wgcm count");
178 } else {
179 return static_cast<size_t>(grp.blk_workers_sum);
180 }
181 return 0;
182 }
183
Notify(const QoS & qos,TaskNotifyType notifyType)184 void CPUMonitor::Notify(const QoS& qos, TaskNotifyType notifyType)
185 {
186 int taskCount = ops.GetTaskCount(qos);
187 FFRT_LOGD("qos[%d] task notify op[%d] cnt[%ld]", (int)qos, (int)notifyType, taskCount);
188 switch (notifyType) {
189 case TaskNotifyType::TASK_ADDED:
190 if (taskCount > 0) {
191 Poke(qos);
192 }
193 break;
194 case TaskNotifyType::TASK_PICKED:
195 if (taskCount > 0) {
196 Poke(qos);
197 }
198 break;
199 default:
200 break;
201 }
202 }
203
TimeoutCount(const QoS & qos)204 void CPUMonitor::TimeoutCount(const QoS& qos)
205 {
206 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
207 workerCtrl.lock.lock();
208 workerCtrl.sleepingWorkerNum--;
209 workerCtrl.lock.unlock();
210 }
211
WakeupCount(const QoS & qos)212 void CPUMonitor::WakeupCount(const QoS& qos)
213 {
214 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
215 workerCtrl.lock.lock();
216 workerCtrl.sleepingWorkerNum--;
217 workerCtrl.executionNum++;
218 workerCtrl.lock.unlock();
219 }
220
IntoSleep(const QoS & qos)221 void CPUMonitor::IntoSleep(const QoS& qos)
222 {
223 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
224 workerCtrl.lock.lock();
225 workerCtrl.sleepingWorkerNum++;
226 workerCtrl.executionNum--;
227 workerCtrl.lock.unlock();
228 }
229
/*
 * Ensures a runnable worker for `qos` when below maxConcurrency: spawns a new
 * worker if none is sleeping, otherwise wakes a sleeping one. The lock is
 * deliberately released before calling back into ops, so IncWorker /
 * WakeupWorkers never run while workerCtrl.lock is held.
 */
void CPUMonitor::Poke(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    FFRT_LOGD("qos[%d] exe num[%d] slp num[%d]", (int)qos, workerCtrl.executionNum, workerCtrl.sleepingWorkerNum);
    if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
        if (workerCtrl.sleepingWorkerNum == 0) {
            // No sleeper available: account for the new worker before
            // unlocking, then ask the pool to create it.
            workerCtrl.executionNum++;
            workerCtrl.lock.unlock();
            ops.IncWorker(qos);
        } else {
            // A sleeper exists; WakeupCount() will adjust the counters when
            // it actually wakes.
            workerCtrl.lock.unlock();
            ops.WakeupWorkers(qos);
        }
    } else {
        workerCtrl.lock.unlock();
    }
}
248 }
249