1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "worker_monitor.h"
17 #include <cstring>
18 #include <iostream>
19 #include <fstream>
20 #include <sstream>
21 #include <regex>
22 #ifdef FFRT_OH_TRACE_ENABLE
23 #include "backtrace_local.h"
24 #endif
25
26 #include "dfx/sysevent/sysevent.h"
27 #include "eu/execute_unit.h"
28 #include "eu/worker_manager.h"
29 #include "eu/co_routine_factory.h"
30 #include "internal_inc/osal.h"
31 #include "sched/scheduler.h"
32 #include "util/ffrt_facade.h"
33 #include "dfx/bbox/bbox.h"
34
namespace {
// ---- Sampling cadence -------------------------------------------------------
// Interval between two watchdog samples, in microseconds.
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
// Number of watchdog samples that add up to one second of task execution.
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Poller statistics are logged once every this many watchdog samples.
constexpr unsigned int RECORD_POLLER_INFO_FREQ = 120;
// Interval between two memory-release passes, in microseconds.
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60 * 1000 * 1000;

// ---- Timeout reporting ------------------------------------------------------
// Execution time (seconds) at which a timeout is additionally reported as a system event.
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
// Execution time (seconds) from which IPC transaction info is logged with the backtrace.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
// Reporting periods in seconds, indexed by a task's record level: a timeout is reported
// whenever the task's execution time is a multiple of the entry for its current level.
const std::vector<int> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };

// ---- Names and paths --------------------------------------------------------
// Size of the stack buffer that receives the process name.
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Substring searched for in a backtrace to detect IPC frames.
constexpr char IPC_STACK_NAME[] = "libipc_common";
// File read for IPC transaction records.
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";
// Block-list file: one process-name pattern per line.
constexpr char CONF_FILEPATH[] = "/etc/ffrt/worker_monitor.conf";

// ---- Mutable state ----------------------------------------------------------
// Count of watchdog samples that reached the poller-logging check.
unsigned int g_samplingTaskCount = 0;
}
49
50 namespace ffrt {
WorkerMonitor()51 WorkerMonitor::WorkerMonitor()
52 {
53 // 获取当前进程名称
54 char processName[PROCESS_NAME_BUFFER_LENGTH];
55 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
56
57 // 从配置文件读取黑名单比对
58 std::string skipProcess;
59 std::ifstream file(CONF_FILEPATH);
60 if (file.is_open()) {
61 while (std::getline(file, skipProcess)) {
62 if (strstr(processName, skipProcess.c_str()) != nullptr) {
63 skipSampling_ = true;
64 return;
65 }
66 }
67 } else {
68 FFRT_LOGW("worker_monitor.conf does not exist or file permission denied");
69 }
70
71 watchdogWaitEntry_.cb = ([this](WaitEntry* we) { CheckWorkerStatus(); });
72 memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
73 std::lock_guard lock(mutex_);
74 if (skipSampling_) {
75 return;
76 }
77
78 WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
79 {
80 bool noWorkerThreads = true;
81 std::lock_guard submitTaskLock(submitTaskMutex_);
82 for (int i = 0; i < QoS::MaxNum(); i++) {
83 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
84 if (!workerGroup[i].threads.empty()) {
85 noWorkerThreads = false;
86 break;
87 }
88 }
89 if (noWorkerThreads) {
90 CoRoutineReleaseMem();
91 samplingTaskExit_ = true;
92 return;
93 }
94 }
95
96 CoRoutineReleaseMem();
97 SubmitMemReleaseTask();
98 });
99 }
100
~WorkerMonitor()101 WorkerMonitor::~WorkerMonitor()
102 {
103 FFRT_LOGW("WorkerMonitor destruction enter");
104 std::lock_guard lock(mutex_);
105 skipSampling_ = true;
106 }
107
GetInstance()108 WorkerMonitor& WorkerMonitor::GetInstance()
109 {
110 static WorkerMonitor instance;
111 return instance;
112 }
113
SubmitTask()114 void WorkerMonitor::SubmitTask()
115 {
116 if (skipSampling_) {
117 return;
118 }
119
120 std::lock_guard submitTaskLock(submitTaskMutex_);
121 if (samplingTaskExit_) {
122 SubmitSamplingTask();
123 samplingTaskExit_ = false;
124 }
125 if (memReleaseTaskExit_) {
126 SubmitMemReleaseTask();
127 memReleaseTaskExit_ = false;
128 }
129 }
130
SubmitSamplingTask()131 void WorkerMonitor::SubmitSamplingTask()
132 {
133 watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
134 if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb)) {
135 FFRT_LOGW("Set delayed worker failed.");
136 }
137 }
138
SubmitMemReleaseTask()139 void WorkerMonitor::SubmitMemReleaseTask()
140 {
141 memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
142 if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb)) {
143 FFRT_LOGW("Set delayed worker failed.");
144 }
145 }
146
CheckWorkerStatus()147 void WorkerMonitor::CheckWorkerStatus()
148 {
149 std::lock_guard lock(mutex_);
150 if (skipSampling_) {
151 return;
152 }
153
154 WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
155 {
156 bool noWorkerThreads = true;
157 std::lock_guard submitTaskLock(submitTaskMutex_);
158 for (int i = 0; i < QoS::MaxNum(); i++) {
159 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
160 if (!workerGroup[i].threads.empty()) {
161 noWorkerThreads = false;
162 break;
163 }
164 }
165 if (noWorkerThreads) {
166 samplingTaskExit_ = true;
167 return;
168 }
169 }
170
171 if (g_samplingTaskCount++ % RECORD_POLLER_INFO_FREQ == 0) {
172 RecordPollerInfo();
173 }
174
175 std::vector<TimeoutFunctionInfo> timeoutFunctions;
176 for (int i = 0; i < QoS::MaxNum(); i++) {
177 int executionNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->WakedWorkerNum(i);
178 int sleepingWorkerNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->SleepingWorkerNum(i);
179
180 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
181 CoWorkerInfo coWorkerInfo(i, workerGroup[i].threads.size(), executionNum, sleepingWorkerNum);
182 for (auto& thread : workerGroup[i].threads) {
183 WorkerThread* worker = thread.first;
184 CPUEUTask* workerTask = worker->curTask;
185 if (workerTask == nullptr) {
186 workerStatus_.erase(worker);
187 continue;
188 }
189
190 RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
191 }
192 }
193
194 for (const auto& timeoutFunction : timeoutFunctions) {
195 RecordSymbolAndBacktrace(timeoutFunction);
196 }
197
198 SubmitSamplingTask();
199 }
200
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,WorkerThread * worker,CPUEUTask * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)201 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, WorkerThread* worker,
202 CPUEUTask* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
203 {
204 auto workerIter = workerStatus_.find(worker);
205 if (workerIter == workerStatus_.end()) {
206 workerStatus_[worker] = TaskTimeoutInfo(workerTask);
207 return;
208 }
209
210 TaskTimeoutInfo& taskInfo = workerIter->second;
211 if (taskInfo.task_ == workerTask) {
212 if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
213 return;
214 }
215
216 taskInfo.sampledTimes_ = 0;
217 if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
218 WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_, worker->curTaskType_, worker->curTaskLabel_);
219 timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
220 if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
221 taskInfo.recordLevel_++;
222 }
223 }
224
225 return;
226 }
227
228 if (taskInfo.executionTime_ > 0) {
229 FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
230 worker->Id(), taskInfo.executionTime_);
231 }
232 workerIter->second = TaskTimeoutInfo(workerTask);
233 }
234
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)235 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
236 {
237 std::stringstream ss;
238 char processName[PROCESS_NAME_BUFFER_LENGTH];
239 GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
240 ss << "Task_Sch_Timeout: process name:[" << processName << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
241 "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
242 timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
243 timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
244 timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
245 timeoutFunction.workerInfo_.workerTaskType_ << "], ";
246
247 #ifdef WORKER_CACHE_TASKNAMEID
248 if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
249 timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
250 ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
251 "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
252 }
253 #endif
254
255 ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
256 FFRT_LOGW("%s", ss.str().c_str());
257
258 #ifdef FFRT_OH_TRACE_ENABLE
259 std::string dumpInfo;
260 if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
261 FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
262 if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
263 RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
264 }
265 }
266
267 RecordKeyInfo(dumpInfo);
268 #endif
269 #ifdef FFRT_SEND_EVENT
270 if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
271 std::string processNameStr = std::string(processName);
272 std::string senarioName = "Task_Sch_Timeout";
273 TaskTimeoutReport(ss, processNameStr, senarioName);
274 }
275 #endif
276 }
277
RecordIpcInfo(const std::string & dumpInfo,int tid)278 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
279 {
280 if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
281 return;
282 }
283
284 std::ifstream transactionFile(TRANSACTION_PATH);
285 FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
286
287 FFRT_LOGW("transaction_proc:");
288 std::string line;
289 std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
290 while (getline(transactionFile, line)) {
291 if (std::regex_match(line, std::regex(regexStr))) {
292 FFRT_LOGW("%s", line.c_str());
293 }
294 }
295
296 transactionFile.close();
297 }
298
RecordKeyInfo(const std::string & dumpInfo)299 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
300 {
301 if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
302 return;
303 }
304
305 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
306 std::string keyInfo = SaveKeyInfo();
307 FFRT_LOGW("%s", keyInfo.c_str());
308 #endif
309 }
310
RecordPollerInfo()311 void WorkerMonitor::RecordPollerInfo()
312 {
313 std::stringstream ss;
314 for (int qos = 0; qos < QoS::MaxNum(); qos++) {
315 uint64_t pollCount = FFRTFacade::GetPPInstance().GetPoller(qos).GetPollCount();
316 if (pollCount > 0) {
317 ss << qos << ":" << pollCount << ";";
318 }
319 }
320
321 std::string result = ss.str();
322 if (!result.empty()) {
323 FFRT_LOGW("%s", result.c_str());
324 }
325 }
326 }
327