1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "util/worker_monitor.h"

#include <algorithm>
#include <cstring>
#include <fstream>
#include <iostream>
#include <regex>
#include <sstream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#ifdef FFRT_OH_TRACE_ENABLE
#include "backtrace_local.h"
#endif

#include "dfx/sysevent/sysevent.h"
#include "eu/execute_unit.h"
#include "eu/execute_unit.h"
#include "eu/co_routine_factory.h"
#include "internal_inc/osal.h"
#include "sched/scheduler.h"
#include "util/ffrt_facade.h"
#include "util/white_list.h"
#include "dfx/bbox/bbox.h"
#include "tm/task_factory.h"
36
namespace {
// Worker sampling cadence: the watchdog samples workers every MONITOR_SAMPLING_CYCLE_US,
// so SAMPLING_TIMES_PER_SEC samples add up to one second of observed execution.
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Worker start/exit statistics are logged once every this many samples.
constexpr unsigned int RECORD_WORKER_STATUS_INFO_FREQ = 120;

// Long-running-worker reporting: a report is emitted when the execution time (seconds) is a
// multiple of the current entry; the index advances after each report.
const std::vector<int> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };
// A sysevent is sent when a worker has been occupied for exactly this many seconds.
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
// At or beyond this many seconds, transaction info is dumped if the stack is inside IPC code.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
constexpr int BACKTRACE_TASK_QOS = 7;
constexpr char IPC_STACK_NAME[] = "libipc_common";
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";

// Periodic coroutine memory release interval.
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60 * 1000 * 1000;

// Task-timeout monitoring.
constexpr uint32_t US_PER_MS = 1000;
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000 * US_PER_MS; // 1s
constexpr uint64_t ALLOW_ACC_ERROR_US = 10 * US_PER_MS; // 10ms slack subtracted from the timeout window
constexpr uint32_t INITIAL_RECORD_LIMIT = 16; // initial capacity of the timeout history
constexpr uint32_t MAX_RECORD_LIMIT = 64;     // cap on the timeout history
// Report throttling by timeout count: every report below FIRST_THRESHOLD, then every
// FIRST_THRESHOLD-th below SECOND_THRESHOLD, then every SECOND_THRESHOLD-th.
constexpr int FIRST_THRESHOLD = 10;
constexpr int SECOND_THRESHOLD = 100;
}
56
57 namespace ffrt {
WorkerMonitor()58 WorkerMonitor::WorkerMonitor()
59 {
60 if (WhiteList::GetInstance().IsEnabled("worker_monitor", false)) {
61 FFRT_SYSEVENT_LOGW("Skip worker monitor.");
62 skipSampling_ = true;
63 return;
64 }
65 taskTimeoutInfo_.reserve(INITIAL_RECORD_LIMIT);
66 uint64_t timeout = ffrt_task_timeout_get_threshold() * US_PER_MS;
67 timeoutUs_ = timeout;
68 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
69 FFRT_LOGE("invalid watchdog timeout [%llu] us, using 1s instead", timeout);
70 timeoutUs_ = MIN_TIMEOUT_THRESHOLD_US;
71 }
72
73 watchdogWaitEntry_.cb = ([this](WaitEntry* we) {
74 (void)we;
75 CheckWorkerStatus();
76 FFRTFacade::GetPPInstance().MonitTimeOut();
77 });
78 tskMonitorWaitEntry_.cb = ([this](WaitEntry* we) { CheckTaskStatus(); });
79 memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
80 std::lock_guard lock(mutex_);
81 if (skipSampling_) {
82 return;
83 }
84
85 {
86 bool noWorkerThreads = true;
87 std::lock_guard submitTaskLock(submitTaskMutex_);
88 for (int i = 0; i < QoS::MaxNum(); i++) {
89 CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
90 std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
91 if (!workerGroup.threads.empty()) {
92 noWorkerThreads = false;
93 break;
94 }
95 }
96 if (noWorkerThreads) {
97 CoRoutineReleaseMem();
98 memReleaseTaskExit_ = true;
99 return;
100 }
101 }
102
103 CoRoutineReleaseMem();
104 SubmitMemReleaseTask();
105 });
106 }
107
~WorkerMonitor()108 WorkerMonitor::~WorkerMonitor()
109 {
110 FFRT_LOGW("WorkerMonitor destruction enter");
111 std::lock_guard lock(mutex_);
112 skipSampling_ = true;
113 }
114
GetInstance()115 WorkerMonitor& WorkerMonitor::GetInstance()
116 {
117 static WorkerMonitor instance;
118 return instance;
119 }
120
SubmitTask()121 void WorkerMonitor::SubmitTask()
122 {
123 if (skipSampling_) {
124 return;
125 }
126
127 std::lock_guard submitTaskLock(submitTaskMutex_);
128 if (samplingTaskExit_) {
129 SubmitSamplingTask();
130 samplingTaskExit_ = false;
131 }
132 if (taskMonitorExit_) {
133 SubmitTaskMonitor(timeoutUs_);
134 taskMonitorExit_ = false;
135 }
136 if (memReleaseTaskExit_) {
137 SubmitMemReleaseTask();
138 memReleaseTaskExit_ = false;
139 }
140 }
141
SubmitSamplingTask()142 void WorkerMonitor::SubmitSamplingTask()
143 {
144 watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
145 if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb, true)) {
146 FFRT_LOGW("Set delayed worker failed.");
147 }
148 }
149
SubmitTaskMonitor(uint64_t nextTimeoutUs)150 void WorkerMonitor::SubmitTaskMonitor(uint64_t nextTimeoutUs)
151 {
152 tskMonitorWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(nextTimeoutUs);
153 if (!DelayedWakeup(tskMonitorWaitEntry_.tp, &tskMonitorWaitEntry_, tskMonitorWaitEntry_.cb, true)) {
154 FFRT_LOGW("Set delayed worker failed.");
155 }
156 }
157
SubmitMemReleaseTask()158 void WorkerMonitor::SubmitMemReleaseTask()
159 {
160 memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
161 if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb, true)) {
162 FFRT_LOGW("Set delayed worker failed.");
163 }
164 }
165
CheckWorkerStatus()166 void WorkerMonitor::CheckWorkerStatus()
167 {
168 std::lock_guard lock(mutex_);
169 if (skipSampling_) {
170 return;
171 }
172
173 {
174 bool noWorkerThreads = true;
175 std::lock_guard submitTaskLock(submitTaskMutex_);
176 for (int i = 0; i < QoS::MaxNum(); i++) {
177 CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
178 std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
179 if (!workerGroup.threads.empty()) {
180 noWorkerThreads = false;
181 break;
182 }
183 }
184 if (noWorkerThreads) {
185 samplingTaskExit_ = true;
186 return;
187 }
188 }
189
190 if (samplingTaskCount_++ % RECORD_WORKER_STATUS_INFO_FREQ == 0) {
191 RecordWorkerStatusInfo();
192 }
193
194 std::vector<TimeoutFunctionInfo> timeoutFunctions;
195 for (int i = 0; i < QoS::MaxNum(); i++) {
196 CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
197 std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
198 CoWorkerInfo coWorkerInfo(i, workerGroup.threads.size(), workerGroup.executingNum, workerGroup.sleepingNum);
199 for (auto& thread : workerGroup.threads) {
200 CPUWorker* worker = thread.first;
201 if (!worker->Monitor()) {
202 continue;
203 }
204
205 TaskBase* workerTask = worker->curTask.load(std::memory_order_relaxed);
206 if (workerTask == nullptr) {
207 workerStatus_.erase(worker);
208 continue;
209 }
210
211 RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
212 }
213 }
214
215 if (timeoutFunctions.size() > 0) {
216 FFRTFacade::GetDWInstance().SubmitAsyncTask([this, timeoutFunctions] {
217 for (const auto& timeoutFunction : timeoutFunctions) {
218 RecordSymbolAndBacktrace(timeoutFunction);
219 }
220 });
221 }
222
223 SubmitSamplingTask();
224 }
225
CheckTaskStatus()226 void WorkerMonitor::CheckTaskStatus()
227 {
228 std::lock_guard lock(mutex_);
229 if (skipSampling_) {
230 return;
231 }
232
233 std::vector<CPUEUTask*> activeTask;
234 auto unfree = TaskFactory<CPUEUTask>::GetUnfreedTasksFiltered();
235 for (auto task : unfree) {
236 auto t = reinterpret_cast<CPUEUTask*>(task);
237 if (t->type == ffrt_normal_task && t->aliveStatus.load(std::memory_order_relaxed) == AliveStatus::INITED) {
238 activeTask.emplace_back(t);
239 }
240 }
241
242 {
243 bool noWorkerThreads = true;
244 std::lock_guard submitTaskLock(submitTaskMutex_);
245
246 for (int i = 0; i < QoS::MaxNum(); i++) {
247 CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
248 std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
249 if (!workerGroup.threads.empty()) {
250 noWorkerThreads = false;
251 break;
252 }
253 }
254
255 // no active worker, no active normal task, no need to monitor
256 if (noWorkerThreads && activeTask.empty()) {
257 taskMonitorExit_ = true;
258 for (auto& task : unfree) {
259 reinterpret_cast<CPUEUTask*>(task)->DecDeleteRef();
260 }
261 return;
262 }
263 }
264
265 uint64_t now = TimeStampCntvct();
266 uint64_t minStart = now - ((timeoutUs_ - ALLOW_ACC_ERROR_US));
267 uint64_t curMinTimeStamp = UINT64_MAX;
268
269 for (auto task : activeTask) {
270 uint64_t curTimeStamp = CalculateTaskTimeout(task, minStart);
271 curMinTimeStamp = curTimeStamp < curMinTimeStamp ? curTimeStamp : curMinTimeStamp;
272 }
273 for (auto& task : unfree) {
274 reinterpret_cast<CPUEUTask*>(task)->DecDeleteRef();
275 }
276
277 // 下次检查时间为所有当前任务中的最小状态时间
278 uint64_t nextTimeout = (curMinTimeStamp == UINT64_MAX) ? timeoutUs_ :
279 std::min(timeoutUs_, timeoutUs_ - (now - curMinTimeStamp));
280
281 SubmitTaskMonitor(nextTimeout);
282 }
283
CalculateTaskTimeout(CPUEUTask * task,uint64_t timeoutThreshold)284 uint64_t WorkerMonitor::CalculateTaskTimeout(CPUEUTask* task, uint64_t timeoutThreshold)
285 {
286 // 主动延时的任务不检测
287 if (!task->monitorTimeout_ || isDelayingTask(task) ||
288 (task->delayTime > 0 && task->curStatus == TaskStatus::SUBMITTED)) {
289 return UINT64_MAX;
290 }
291
292 uint64_t curTaskTime = task->statusTime.load(std::memory_order_relaxed);
293 uint64_t timeoutCount = task->timeoutTask.timeoutCnt;
294
295 if (curTaskTime + timeoutCount * timeoutUs_ < timeoutThreshold) {
296 RecordTimeoutTaskInfo(task);
297 return UINT64_MAX;
298 }
299 return curTaskTime;
300 }
301
ControlTimeoutFreq(CPUEUTask * task)302 bool WorkerMonitor::ControlTimeoutFreq(CPUEUTask* task)
303 {
304 uint64_t timoutCnt = task->timeoutTask.timeoutCnt;
305 return (timoutCnt < FIRST_THRESHOLD) || (timoutCnt < SECOND_THRESHOLD && timoutCnt % FIRST_THRESHOLD == 0) ||
306 (timoutCnt % SECOND_THRESHOLD == 0);
307 }
308
RecordTimeoutTaskInfo(CPUEUTask * task)309 void WorkerMonitor::RecordTimeoutTaskInfo(CPUEUTask* task)
310 {
311 TaskStatus curTaskStatus = task->curStatus;
312 uint64_t curTaskTime = task->statusTime.load(std::memory_order_relaxed);
313 TaskStatus preTaskStatus = task->preStatus.load(std::memory_order_relaxed);
314
315 TimeoutTask& timeoutTskInfo = task->timeoutTask;
316 if (task->gid == timeoutTskInfo.taskGid && curTaskStatus == timeoutTskInfo.taskStatus) {
317 timeoutTskInfo.timeoutCnt += 1;
318 } else {
319 timeoutTskInfo.timeoutCnt = 1;
320 timeoutTskInfo.taskStatus = curTaskStatus;
321 timeoutTskInfo.taskGid = task->gid;
322 }
323
324 if (!ControlTimeoutFreq(task)) {
325 return;
326 }
327
328 std::stringstream ss;
329 uint64_t time = TimeStampCntvct();
330
331 ss << "Normal task[" << task->label.c_str() << "], gid[" << task->gid << "], qos[" << task->GetQos() <<
332 "], delay[" << task->delayTime << "]us, current status[" << StatusToString(curTaskStatus) <<
333 "], start at[" << FormatDateString4SteadyClock(curTaskTime) << "]";
334
335 if (taskTimeoutInfo_.size() > MAX_RECORD_LIMIT) {
336 taskTimeoutInfo_.erase(taskTimeoutInfo_.begin());
337 }
338 taskTimeoutInfo_.emplace_back(std::make_pair(time, ss.str()));
339
340 ss << ", last status[" << StatusToString(preTaskStatus) << "], timeout for[" <<
341 timeoutUs_ / MIN_TIMEOUT_THRESHOLD_US << "]s, reported count: " << timeoutTskInfo.timeoutCnt;
342 FFRT_LOGW("%s", ss.str().c_str());
343 return;
344 }
345
DumpTimeoutInfo()346 std::string WorkerMonitor::DumpTimeoutInfo()
347 {
348 std::lock_guard lock(mutex_);
349 std::stringstream ss;
350 if (taskTimeoutInfo_.size() != 0) {
351 for (auto it = taskTimeoutInfo_.rbegin(); it != taskTimeoutInfo_.rend(); ++it) {
352 auto& record = *it;
353 ss << "{" << FormatDateString4SteadyClock(record.first) << ", " << record.second << "} \n";
354 }
355 } else {
356 ss << "Timeout info Empty";
357 }
358 return ss.str();
359 }
360
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,CPUWorker * worker,TaskBase * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)361 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, CPUWorker* worker,
362 TaskBase* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
363 {
364 auto workerIter = workerStatus_.find(worker);
365 if (workerIter == workerStatus_.end()) {
366 workerStatus_[worker] = TaskTimeoutInfo(workerTask);
367 return;
368 }
369
370 TaskTimeoutInfo& taskInfo = workerIter->second;
371 if (taskInfo.task_ == workerTask) {
372 if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
373 return;
374 }
375
376 taskInfo.sampledTimes_ = 0;
377 if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
378 WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_,
379 worker->curTaskType_.load(std::memory_order_relaxed), worker->curTaskLabel_);
380 timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
381 if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
382 taskInfo.recordLevel_++;
383 }
384 }
385
386 return;
387 }
388
389 if (taskInfo.executionTime_ > 0) {
390 FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
391 worker->Id(), taskInfo.executionTime_);
392 }
393 workerIter->second = TaskTimeoutInfo(workerTask);
394 }
395
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)396 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
397 {
398 std::stringstream ss;
399 std::string processNameStr = std::string(GetCurrentProcessName());
400 ss << "Task_Sch_Timeout: process name:[" << processNameStr << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
401 "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
402 timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
403 timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
404 timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
405 timeoutFunction.workerInfo_.workerTaskType_ << "], ";
406
407 #ifdef WORKER_CACHE_TASKNAMEID
408 if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
409 timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
410 ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
411 "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
412 }
413 #endif
414
415 ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
416 FFRT_LOGW("%s", ss.str().c_str());
417
418 #ifdef FFRT_OH_TRACE_ENABLE
419 std::string dumpInfo;
420 if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
421 FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
422 if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
423 RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
424 }
425 }
426 #endif
427 #ifdef FFRT_SEND_EVENT
428 if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
429 std::string senarioName = "Task_Sch_Timeout";
430 TaskTimeoutReport(ss, processNameStr, senarioName);
431 }
432 #endif
433 }
434
RecordIpcInfo(const std::string & dumpInfo,int tid)435 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
436 {
437 if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
438 return;
439 }
440
441 std::ifstream transactionFile(TRANSACTION_PATH);
442 FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
443
444 FFRT_LOGW("transaction_proc:");
445 std::string line;
446 std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
447 while (getline(transactionFile, line)) {
448 if (std::regex_match(line, std::regex(regexStr))) {
449 FFRT_LOGW("%s", line.c_str());
450 }
451 }
452
453 transactionFile.close();
454 }
455
RecordKeyInfo(const std::string & dumpInfo)456 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
457 {
458 if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
459 return;
460 }
461
462 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
463 std::string keyInfo = SaveKeyInfo();
464 FFRT_LOGW("%s", keyInfo.c_str());
465 #endif
466 }
467
ProcessWorkerInfo(std::ostringstream & oss,bool & firstQos,int qos,unsigned int cnt,const std::deque<pid_t> & tids)468 void WorkerMonitor::ProcessWorkerInfo(std::ostringstream& oss, bool& firstQos, int qos, unsigned int cnt,
469 const std::deque<pid_t>& tids)
470 {
471 if (cnt == 0) {
472 return;
473 }
474
475 if (!firstQos) {
476 oss << " ";
477 }
478 firstQos = false;
479
480 oss << "qos:" << qos << " cnt:" << cnt << " tids:";
481 bool firstTid = true;
482 for (const auto& tid : tids) {
483 if (!firstTid) {
484 oss << ",";
485 }
486 firstTid = false;
487 oss << tid;
488 }
489 }
490
RecordWorkerStatusInfo()491 void WorkerMonitor::RecordWorkerStatusInfo()
492 {
493 std::ostringstream startedOss;
494 std::ostringstream exitedOss;
495 bool startedFirstQos = true;
496 bool exitedFirstQos = true;
497
498 for (int qos = 0; qos < QoS::MaxNum(); qos++) {
499 auto workerStatusInfo = FFRTFacade::GetEUInstance().GetWorkerStatusInfoAndReset(qos);
500 ProcessWorkerInfo(startedOss, startedFirstQos, qos, workerStatusInfo.startedCnt, workerStatusInfo.startedTids);
501 ProcessWorkerInfo(exitedOss, exitedFirstQos, qos, workerStatusInfo.exitedCnt, workerStatusInfo.exitedTids);
502 }
503
504 if (!startedOss.str().empty()) {
505 FFRT_LOGW("worker start: %s", startedOss.str().c_str());
506 }
507 if (!exitedOss.str().empty()) {
508 FFRT_LOGW("worker exit: %s", exitedOss.str().c_str());
509 }
510 }
511 }
512