• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "eu/cpu_monitor.h"
17 #include <iostream>
18 #include <thread>
19 #include <unistd.h>
20 #include <sys/prctl.h>
21 #include <sys/syscall.h>
22 #include "sched/scheduler.h"
23 #include "eu/wgcm.h"
24 #include "eu/execute_unit.h"
25 #include "dfx/log/ffrt_log_api.h"
26 #include "internal_inc/config.h"
27 namespace ffrt {
HandleBlocked(const QoS & qos)28 void CPUMonitor::HandleBlocked(const QoS& qos)
29 {
30     int taskCount = ops.GetTaskCount(qos);
31     if (taskCount == 0) {
32         return;
33     }
34     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
35     workerCtrl.lock.lock();
36     size_t exeValue = static_cast<uint32_t>(workerCtrl.executionNum);
37     workerCtrl.lock.unlock();
38     size_t blockedNum = CountBlockedNum(qos);
39     if (blockedNum > 0 && (exeValue - blockedNum < workerCtrl.maxConcurrency) && exeValue < workerCtrl.hardLimit) {
40         Poke(qos);
41     }
42 }
43 
MonitorMain(CPUMonitor * monitor)44 void MonitorMain(CPUMonitor* monitor)
45 {
46     (void)pthread_setname_np(pthread_self(), "ffrt_moniotor");
47     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_SERVER_REG, 0, 0, 0);
48     if (ret) {
49         FFRT_LOGE("[SERVER] wgcm register server failed ret is %{public}d", ret);
50     }
51 
52     ret = syscall(SYS_gettid);
53     if (ret == -1) {
54         monitor->monitorTid = 0;
55         FFRT_LOGE("syscall(SYS_gettid) failed");
56     } else {
57         monitor->monitorTid = static_cast<uint32_t>(ret);
58     }
59 
60     for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
61         struct wgcm_workergrp_data grp = {0};
62         grp.gid = i;
63         grp.min_concur_workers = DEFAULT_MINCONCURRENCY;
64         grp.max_workers_sum = DEFAULT_HARDLIMIT;
65         ret = prctl(PR_WGCM_CTL, WGCM_CTL_SET_GRP, &grp, 0, 0);
66         if (ret) {
67             FFRT_LOGE("[SERVER] wgcm group %u register failed\n ret is %{public}d", i, ret);
68         }
69     }
70 
71     while (true) {
72         struct wgcm_workergrp_data data = {0};
73         ret = prctl(PR_WGCM_CTL, WGCM_CTL_WAIT, &data, 0, 0);
74         if (ret) {
75             FFRT_LOGE("[SERVER] wgcm server wait failed ret is %{public}d", ret);
76             sleep(1);
77             continue;
78         }
79         if (data.woken_flag == WGCM_ACTIVELY_WAKE) {
80             break;
81         }
82 
83         for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
84             monitor->HandleBlocked(qos);
85         }
86     }
87     ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
88     if (ret) {
89         FFRT_LOGE("[SERVER] wgcm server unregister failed ret is %{public}d.", ret);
90     }
91 }
92 
SetupMonitor()93 void CPUMonitor::SetupMonitor()
94 {
95     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
96         ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
97         ctrlQueue[qos].workerManagerID = static_cast<uint32_t>(qos);
98         ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
99     }
100 }
101 
RegWorker(const QoS & qos)102 void CPUMonitor::RegWorker(const QoS& qos)
103 {
104     struct wgcm_workergrp_data grp;
105     grp.gid = static_cast<uint32_t>(qos);
106     grp.server_tid = monitorTid;
107     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_WORKER_REG, &grp, 0, 0);
108     if (ret) {
109         FFRT_LOGE("[WORKER] Register failed! error=%d\n", ret);
110     }
111 }
112 
UnRegWorker()113 void CPUMonitor::UnRegWorker()
114 {
115     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_UNREGISTER, 0, 0, 0);
116     if (ret) {
117         FFRT_LOGE("leave workgroup failed  error=%d\n", ret);
118     }
119 }
120 
CPUMonitor(CpuMonitorOps && ops)121 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
122 {
123     SetupMonitor();
124     monitorThread = nullptr;
125 }
126 
~CPUMonitor()127 CPUMonitor::~CPUMonitor()
128 {
129     if (monitorThread != nullptr) {
130         monitorThread->join();
131     }
132     delete monitorThread;
133     monitorThread = nullptr;
134 }
135 
StartMonitor()136 void CPUMonitor::StartMonitor()
137 {
138     monitorThread = new std::thread(MonitorMain, this);
139 }
140 
GetMonitorTid() const141 uint32_t CPUMonitor::GetMonitorTid() const
142 {
143     return monitorTid;
144 }
145 
IncSleepingRef(const QoS & qos)146 void CPUMonitor::IncSleepingRef(const QoS& qos)
147 {
148     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
149     workerCtrl.lock.lock();
150     workerCtrl.sleepingWorkerNum++;
151     workerCtrl.lock.unlock();
152 }
153 
DecSleepingRef(const QoS & qos)154 void CPUMonitor::DecSleepingRef(const QoS& qos)
155 {
156     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
157     workerCtrl.lock.lock();
158     workerCtrl.sleepingWorkerNum--;
159     workerCtrl.lock.unlock();
160 }
161 
DecExeNumRef(const QoS & qos)162 void CPUMonitor::DecExeNumRef(const QoS& qos)
163 {
164     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
165     workerCtrl.lock.lock();
166     workerCtrl.executionNum--;
167     workerCtrl.lock.unlock();
168 }
169 
CountBlockedNum(const QoS & qos)170 size_t CPUMonitor::CountBlockedNum(const QoS& qos)
171 {
172     struct wgcm_workergrp_data grp = {0};
173     grp.gid = static_cast<uint32_t>(qos);
174     grp.server_tid = monitorTid;
175     int ret = prctl(PR_WGCM_CTL, WGCM_CTL_GET, &grp, 0, 0);
176     if (ret) {
177         FFRT_LOGE("failed to get wgcm count");
178     } else {
179         return static_cast<size_t>(grp.blk_workers_sum);
180     }
181     return 0;
182 }
183 
Notify(const QoS & qos,TaskNotifyType notifyType)184 void CPUMonitor::Notify(const QoS& qos, TaskNotifyType notifyType)
185 {
186     int taskCount = ops.GetTaskCount(qos);
187     FFRT_LOGD("qos[%d] task notify op[%d] cnt[%ld]", (int)qos, (int)notifyType, taskCount);
188     switch (notifyType) {
189         case TaskNotifyType::TASK_ADDED:
190             if (taskCount > 0) {
191                 Poke(qos);
192             }
193             break;
194         case TaskNotifyType::TASK_PICKED:
195             if (taskCount > 0) {
196                 Poke(qos);
197             }
198             break;
199         default:
200             break;
201     }
202 }
203 
TimeoutCount(const QoS & qos)204 void CPUMonitor::TimeoutCount(const QoS& qos)
205 {
206     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
207     workerCtrl.lock.lock();
208     workerCtrl.sleepingWorkerNum--;
209     workerCtrl.lock.unlock();
210 }
211 
WakeupCount(const QoS & qos)212 void CPUMonitor::WakeupCount(const QoS& qos)
213 {
214     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
215     workerCtrl.lock.lock();
216     workerCtrl.sleepingWorkerNum--;
217     workerCtrl.executionNum++;
218     workerCtrl.lock.unlock();
219 }
220 
IntoSleep(const QoS & qos)221 void CPUMonitor::IntoSleep(const QoS& qos)
222 {
223     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
224     workerCtrl.lock.lock();
225     workerCtrl.sleepingWorkerNum++;
226     workerCtrl.executionNum--;
227     workerCtrl.lock.unlock();
228 }
229 
// Ensure progress for a QoS level: if we are under the concurrency target,
// either spawn a brand-new worker (when none are sleeping) or wake a sleeper.
// The lock is deliberately released BEFORE calling into ops.IncWorker /
// ops.WakeupWorkers — those callbacks may re-enter monitor accounting, so
// holding the lock across them could deadlock. Do not reorder.
void CPUMonitor::Poke(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    FFRT_LOGD("qos[%d] exe num[%d] slp num[%d]", (int)qos, workerCtrl.executionNum, workerCtrl.sleepingWorkerNum);
    if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
        if (workerCtrl.sleepingWorkerNum == 0) {
            // No sleeper to wake: reserve the execution slot while still
            // holding the lock, then create the worker outside it.
            workerCtrl.executionNum++;
            workerCtrl.lock.unlock();
            ops.IncWorker(qos);
        } else {
            // A sleeper exists; WakeupCount() will do the exe/sleep counter
            // transfer when the woken worker resumes.
            workerCtrl.lock.unlock();
            ops.WakeupWorkers(qos);
        }
    } else {
        // Already at max concurrency: nothing to do.
        workerCtrl.lock.unlock();
    }
}
248 }
249