1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "worker_monitor.h"
#include <array>
#include <cstring>
#include <fstream>
#include <iostream>
#include <regex>
#include <sstream>
#include <utility>
#ifdef FFRT_OH_TRACE_ENABLE
#include "backtrace_local.h"
#endif

#include "dfx/sysevent/sysevent.h"
#include "eu/execute_unit.h"
#include "eu/worker_manager.h"
#include "eu/co_routine_factory.h"
#include "internal_inc/osal.h"
#include "sched/scheduler.h"
#include "util/ffrt_facade.h"
#include "dfx/bbox/bbox.h"
34
namespace {
// Execution time (seconds) at which RecordSymbolAndBacktrace sends a timeout report
// (only when FFRT_SEND_EVENT is defined).
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
// Delay between two consecutive worker sampling wakeups, in microseconds.
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
// Poller statistics are logged once every this many sampling rounds.
constexpr unsigned int RECORD_POLLER_INFO_FREQ = 120;
// Number of sampling rounds that make up one second of task execution time.
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Delay between two consecutive memory release wakeups, in microseconds.
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60 * 1000 * 1000;
// Execution time (seconds) from which IPC transaction info is logged along with the backtrace.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
// NOTE(review): not referenced by the code visible in this file - confirm whether it is still needed.
constexpr int BACKTRACE_TASK_QOS = 7;
// Substring searched for in a backtrace to decide whether IPC info is worth recording.
constexpr char IPC_STACK_NAME[] = "libipc_common";
// File scanned by RecordIpcInfo for transaction lines mentioning the blocked thread.
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";
// Configuration file listing, one pattern per line, the processes that must not be monitored.
constexpr char CONF_FILEPATH[] = "/etc/ffrt/worker_monitor.conf";
// Per record level, the modulus (seconds) the execution time is tested against before a timeout is recorded.
// Kept as a constexpr array so it needs neither dynamic initialization nor destruction at process exit.
constexpr std::array TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };
// Number of sampling rounds performed so far; incremented by CheckWorkerStatus.
unsigned int g_samplingTaskCount = 0;
}
49
50 namespace ffrt {
WorkerMonitor()51 WorkerMonitor::WorkerMonitor()
52 {
53 // 获取当前进程名称
54 const char* processName = GetCurrentProcessName();
55 if (strlen(processName) == 0) {
56 FFRT_LOGW("Get process name failed, skip worker monitor.");
57 skipSampling_ = true;
58 return;
59 }
60
61 // 从配置文件读取黑名单比对
62 std::string skipProcess;
63 std::ifstream file(CONF_FILEPATH);
64 if (file.is_open()) {
65 while (std::getline(file, skipProcess)) {
66 if (strstr(processName, skipProcess.c_str()) != nullptr) {
67 skipSampling_ = true;
68 return;
69 }
70 }
71 } else {
72 FFRT_LOGW("worker_monitor.conf does not exist or file permission denied");
73 }
74
75 watchdogWaitEntry_.cb = ([this](WaitEntry* we) { CheckWorkerStatus(); });
76 memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
77 std::lock_guard lock(mutex_);
78 if (skipSampling_) {
79 return;
80 }
81
82 WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
83 {
84 bool noWorkerThreads = true;
85 std::lock_guard submitTaskLock(submitTaskMutex_);
86 for (int i = 0; i < QoS::MaxNum(); i++) {
87 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
88 if (!workerGroup[i].threads.empty()) {
89 noWorkerThreads = false;
90 break;
91 }
92 }
93 if (noWorkerThreads) {
94 CoRoutineReleaseMem();
95 memReleaseTaskExit_ = true;
96 return;
97 }
98 }
99
100 CoRoutineReleaseMem();
101 SubmitMemReleaseTask();
102 });
103 }
104
~WorkerMonitor()105 WorkerMonitor::~WorkerMonitor()
106 {
107 FFRT_LOGW("WorkerMonitor destruction enter");
108 std::lock_guard lock(mutex_);
109 skipSampling_ = true;
110 }
111
GetInstance()112 WorkerMonitor& WorkerMonitor::GetInstance()
113 {
114 static WorkerMonitor instance;
115 return instance;
116 }
117
SubmitTask()118 void WorkerMonitor::SubmitTask()
119 {
120 if (skipSampling_) {
121 return;
122 }
123
124 std::lock_guard submitTaskLock(submitTaskMutex_);
125 if (samplingTaskExit_) {
126 SubmitSamplingTask();
127 samplingTaskExit_ = false;
128 }
129 if (memReleaseTaskExit_) {
130 SubmitMemReleaseTask();
131 memReleaseTaskExit_ = false;
132 }
133 }
134
SubmitSamplingTask()135 void WorkerMonitor::SubmitSamplingTask()
136 {
137 watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
138 if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb)) {
139 FFRT_LOGW("Set delayed worker failed.");
140 }
141 }
142
SubmitMemReleaseTask()143 void WorkerMonitor::SubmitMemReleaseTask()
144 {
145 if (skipSampling_) {
146 return;
147 }
148 memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
149 if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb)) {
150 FFRT_LOGW("Set delayed worker failed.");
151 }
152 }
153
CheckWorkerStatus()154 void WorkerMonitor::CheckWorkerStatus()
155 {
156 std::lock_guard lock(mutex_);
157 if (skipSampling_) {
158 return;
159 }
160
161 WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
162 {
163 bool noWorkerThreads = true;
164 std::lock_guard submitTaskLock(submitTaskMutex_);
165 for (int i = 0; i < QoS::MaxNum(); i++) {
166 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
167 if (!workerGroup[i].threads.empty()) {
168 noWorkerThreads = false;
169 break;
170 }
171 }
172 if (noWorkerThreads) {
173 samplingTaskExit_ = true;
174 return;
175 }
176 }
177
178 if (g_samplingTaskCount++ % RECORD_POLLER_INFO_FREQ == 0) {
179 RecordPollerInfo();
180 }
181
182 std::vector<TimeoutFunctionInfo> timeoutFunctions;
183 for (int i = 0; i < QoS::MaxNum(); i++) {
184 int executionNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->WakedWorkerNum(i);
185 int sleepingWorkerNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->SleepingWorkerNum(i);
186
187 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
188 CoWorkerInfo coWorkerInfo(i, workerGroup[i].threads.size(), executionNum, sleepingWorkerNum);
189 for (auto& thread : workerGroup[i].threads) {
190 WorkerThread* worker = thread.first;
191 CPUEUTask* workerTask = static_cast<CPUEUTask*>(worker->curTask);
192 if (workerTask == nullptr) {
193 workerStatus_.erase(worker);
194 continue;
195 }
196
197 RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
198 }
199 }
200
201 if (timeoutFunctions.size() > 0) {
202 FFRTFacade::GetDWInstance().SubmitAsyncTask([this, timeoutFunctions] {
203 for (const auto& timeoutFunction : timeoutFunctions) {
204 RecordSymbolAndBacktrace(timeoutFunction);
205 }
206 });
207 }
208
209 SubmitSamplingTask();
210 }
211
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,WorkerThread * worker,CPUEUTask * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)212 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, WorkerThread* worker,
213 CPUEUTask* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
214 {
215 auto workerIter = workerStatus_.find(worker);
216 if (workerIter == workerStatus_.end()) {
217 workerStatus_[worker] = TaskTimeoutInfo(workerTask);
218 return;
219 }
220
221 TaskTimeoutInfo& taskInfo = workerIter->second;
222 if (taskInfo.task_ == workerTask) {
223 if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
224 return;
225 }
226
227 taskInfo.sampledTimes_ = 0;
228 if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
229 WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_, worker->curTaskType_, worker->curTaskLabel_);
230 timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
231 if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
232 taskInfo.recordLevel_++;
233 }
234 }
235
236 return;
237 }
238
239 if (taskInfo.executionTime_ > 0) {
240 FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
241 worker->Id(), taskInfo.executionTime_);
242 }
243 workerIter->second = TaskTimeoutInfo(workerTask);
244 }
245
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)246 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
247 {
248 std::stringstream ss;
249 std::string processNameStr = std::string(GetCurrentProcessName());
250 ss << "Task_Sch_Timeout: process name:[" << processNameStr << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
251 "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
252 timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
253 timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
254 timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
255 timeoutFunction.workerInfo_.workerTaskType_ << "], ";
256
257 #ifdef WORKER_CACHE_TASKNAMEID
258 if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
259 timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
260 ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
261 "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
262 }
263 #endif
264
265 ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
266 FFRT_LOGW("%s", ss.str().c_str());
267
268 #ifdef FFRT_OH_TRACE_ENABLE
269 std::string dumpInfo;
270 if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
271 FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
272 if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
273 RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
274 }
275 }
276 #endif
277 #ifdef FFRT_SEND_EVENT
278 if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
279 std::string senarioName = "Task_Sch_Timeout";
280 TaskTimeoutReport(ss, processNameStr, senarioName);
281 }
282 #endif
283 }
284
RecordIpcInfo(const std::string & dumpInfo,int tid)285 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
286 {
287 if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
288 return;
289 }
290
291 std::ifstream transactionFile(TRANSACTION_PATH);
292 FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
293
294 FFRT_LOGW("transaction_proc:");
295 std::string line;
296 std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
297 while (getline(transactionFile, line)) {
298 if (std::regex_match(line, std::regex(regexStr))) {
299 FFRT_LOGW("%s", line.c_str());
300 }
301 }
302
303 transactionFile.close();
304 }
305
RecordKeyInfo(const std::string & dumpInfo)306 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
307 {
308 if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
309 return;
310 }
311
312 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
313 std::string keyInfo = SaveKeyInfo();
314 FFRT_LOGW("%s", keyInfo.c_str());
315 #endif
316 }
317
RecordPollerInfo()318 void WorkerMonitor::RecordPollerInfo()
319 {
320 std::stringstream ss;
321 for (int qos = 0; qos < QoS::MaxNum(); qos++) {
322 uint64_t pollCount = FFRTFacade::GetPPInstance().GetPoller(qos).GetPollCount();
323 if (pollCount > 0) {
324 ss << qos << ":" << pollCount << ";";
325 }
326 }
327
328 std::string result = ss.str();
329 if (!result.empty()) {
330 FFRT_LOGW("%s", result.c_str());
331 }
332 }
333 }
334