• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "util/worker_monitor.h"
#include <array>
#include <cstring>
#include <iostream>
#include <fstream>
#include <sstream>
#include <regex>
#ifdef FFRT_OH_TRACE_ENABLE
#include "backtrace_local.h"
#endif

#include "dfx/sysevent/sysevent.h"
#include "eu/execute_unit.h"
#include "eu/execute_unit.h"
#include "eu/co_routine_factory.h"
#include "internal_inc/osal.h"
#include "sched/scheduler.h"
#include "util/ffrt_facade.h"
#include "util/white_list.h"
#include "dfx/bbox/bbox.h"
#include "tm/task_factory.h"
36 
namespace {
// Worker occupancy (seconds) at which RecordSymbolAndBacktrace also reports a system event.
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
// Delay between two worker sampling passes (CheckWorkerStatus), in microseconds.
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
// Worker start/exit statistics are logged once every this many sampling passes.
constexpr unsigned int RECORD_WORKER_STATUS_INFO_FREQ = 120;
// Number of sampling passes that make up one second of observed execution time.
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Delay between two memory release passes, in microseconds.
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60 * 1000 * 1000;
// Worker occupancy (seconds) from which IPC transaction info is dumped as well.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
constexpr int BACKTRACE_TASK_QOS = 7;
// Substring searched in a backtrace to decide whether the worker is inside IPC code.
constexpr char IPC_STACK_NAME[] = "libipc_common";
// File scanned by RecordIpcInfo for transactions of the blocked thread.
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";
// Reporting cadence (seconds) indexed by record level. A constexpr std::array is
// constant-initialized and trivially destructible, so it stays valid for monitor
// callbacks that may still run during static destruction (a std::vector would not).
constexpr std::array<int, 8> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };
constexpr uint32_t US_PER_MS = 1000;
// Lower bound for the task timeout threshold.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000 * US_PER_MS; // 1s
// Tolerance subtracted from the timeout when deciding whether a task has timed out.
constexpr uint64_t ALLOW_ACC_ERROR_US = 10 * US_PER_MS; // 10ms
// Initial capacity reserved for the timeout history.
constexpr uint32_t INITIAL_RECORD_LIMIT = 16;
// Maximum number of entries kept in the timeout history.
constexpr uint32_t MAX_RECORD_LIMIT = 64;
// Thresholds used by ControlTimeoutFreq to thin out repeated timeout reports.
constexpr int FIRST_THRESHOLD = 10;
constexpr int SECOND_THRESHOLD = 100;
}
56 
57 namespace ffrt {
WorkerMonitor()58 WorkerMonitor::WorkerMonitor()
59 {
60     if (WhiteList::GetInstance().IsEnabled("worker_monitor", false)) {
61         FFRT_SYSEVENT_LOGW("Skip worker monitor.");
62         skipSampling_ = true;
63         return;
64     }
65     taskTimeoutInfo_.reserve(INITIAL_RECORD_LIMIT);
66     uint64_t timeout = ffrt_task_timeout_get_threshold() * US_PER_MS;
67     timeoutUs_ = timeout;
68     if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
69         FFRT_LOGE("invalid watchdog timeout [%llu] us, using 1s instead", timeout);
70         timeoutUs_ = MIN_TIMEOUT_THRESHOLD_US;
71     }
72 
73     watchdogWaitEntry_.cb = ([this](WaitEntry* we) {
74         (void)we;
75         CheckWorkerStatus();
76         FFRTFacade::GetPPInstance().MonitTimeOut();
77     });
78     tskMonitorWaitEntry_.cb = ([this](WaitEntry* we) { CheckTaskStatus(); });
79     memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
80         std::lock_guard lock(mutex_);
81         if (skipSampling_) {
82             return;
83         }
84 
85         {
86             bool noWorkerThreads = true;
87             std::lock_guard submitTaskLock(submitTaskMutex_);
88             for (int i = 0; i < QoS::MaxNum(); i++) {
89                 CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
90                 std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
91                 if (!workerGroup.threads.empty()) {
92                     noWorkerThreads = false;
93                     break;
94                 }
95             }
96             if (noWorkerThreads) {
97                 CoRoutineReleaseMem();
98                 memReleaseTaskExit_ = true;
99                 return;
100             }
101         }
102 
103         CoRoutineReleaseMem();
104         SubmitMemReleaseTask();
105     });
106 }
107 
~WorkerMonitor()108 WorkerMonitor::~WorkerMonitor()
109 {
110     FFRT_LOGW("WorkerMonitor destruction enter");
111     std::lock_guard lock(mutex_);
112     skipSampling_ = true;
113 }
114 
GetInstance()115 WorkerMonitor& WorkerMonitor::GetInstance()
116 {
117     static WorkerMonitor instance;
118     return instance;
119 }
120 
SubmitTask()121 void WorkerMonitor::SubmitTask()
122 {
123     if (skipSampling_) {
124         return;
125     }
126 
127     std::lock_guard submitTaskLock(submitTaskMutex_);
128     if (samplingTaskExit_) {
129         SubmitSamplingTask();
130         samplingTaskExit_ = false;
131     }
132     if (taskMonitorExit_) {
133         SubmitTaskMonitor(timeoutUs_);
134         taskMonitorExit_ = false;
135     }
136     if (memReleaseTaskExit_) {
137         SubmitMemReleaseTask();
138         memReleaseTaskExit_ = false;
139     }
140 }
141 
SubmitSamplingTask()142 void WorkerMonitor::SubmitSamplingTask()
143 {
144     watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
145     if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb, true)) {
146         FFRT_LOGW("Set delayed worker failed.");
147     }
148 }
149 
SubmitTaskMonitor(uint64_t nextTimeoutUs)150 void WorkerMonitor::SubmitTaskMonitor(uint64_t nextTimeoutUs)
151 {
152     tskMonitorWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(nextTimeoutUs);
153     if (!DelayedWakeup(tskMonitorWaitEntry_.tp, &tskMonitorWaitEntry_, tskMonitorWaitEntry_.cb, true)) {
154         FFRT_LOGW("Set delayed worker failed.");
155     }
156 }
157 
SubmitMemReleaseTask()158 void WorkerMonitor::SubmitMemReleaseTask()
159 {
160     memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
161     if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb, true)) {
162         FFRT_LOGW("Set delayed worker failed.");
163     }
164 }
165 
CheckWorkerStatus()166 void WorkerMonitor::CheckWorkerStatus()
167 {
168     std::lock_guard lock(mutex_);
169     if (skipSampling_) {
170         return;
171     }
172 
173     {
174         bool noWorkerThreads = true;
175         std::lock_guard submitTaskLock(submitTaskMutex_);
176         for (int i = 0; i < QoS::MaxNum(); i++) {
177             CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
178             std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
179             if (!workerGroup.threads.empty()) {
180                 noWorkerThreads = false;
181                 break;
182             }
183         }
184         if (noWorkerThreads) {
185             samplingTaskExit_ = true;
186             return;
187         }
188     }
189 
190     if (samplingTaskCount_++ % RECORD_WORKER_STATUS_INFO_FREQ == 0) {
191         RecordWorkerStatusInfo();
192     }
193 
194     std::vector<TimeoutFunctionInfo> timeoutFunctions;
195     for (int i = 0; i < QoS::MaxNum(); i++) {
196         CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
197         std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
198         CoWorkerInfo coWorkerInfo(i, workerGroup.threads.size(), workerGroup.executingNum, workerGroup.sleepingNum);
199         for (auto& thread : workerGroup.threads) {
200             CPUWorker* worker = thread.first;
201             if (!worker->Monitor()) {
202                 continue;
203             }
204 
205             TaskBase* workerTask = worker->curTask.load(std::memory_order_relaxed);
206             if (workerTask == nullptr) {
207                 workerStatus_.erase(worker);
208                 continue;
209             }
210 
211             RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
212         }
213     }
214 
215     if (timeoutFunctions.size() > 0) {
216         FFRTFacade::GetDWInstance().SubmitAsyncTask([this, timeoutFunctions] {
217             for (const auto& timeoutFunction : timeoutFunctions) {
218                 RecordSymbolAndBacktrace(timeoutFunction);
219             }
220         });
221     }
222 
223     SubmitSamplingTask();
224 }
225 
CheckTaskStatus()226 void WorkerMonitor::CheckTaskStatus()
227 {
228     std::lock_guard lock(mutex_);
229     if (skipSampling_) {
230         return;
231     }
232 
233     std::vector<CPUEUTask*> activeTask;
234     auto unfree = TaskFactory<CPUEUTask>::GetUnfreedTasksFiltered();
235     for (auto task : unfree) {
236         auto t = reinterpret_cast<CPUEUTask*>(task);
237         if (t->type == ffrt_normal_task && t->aliveStatus.load(std::memory_order_relaxed) == AliveStatus::INITED) {
238             activeTask.emplace_back(t);
239         }
240     }
241 
242     {
243         bool noWorkerThreads = true;
244         std::lock_guard submitTaskLock(submitTaskMutex_);
245 
246         for (int i = 0; i < QoS::MaxNum(); i++) {
247             CPUWorkerGroup& workerGroup = FFRTFacade::GetEUInstance().GetWorkerGroup(i);
248             std::shared_lock<std::shared_mutex> lck(workerGroup.tgMutex);
249             if (!workerGroup.threads.empty()) {
250                 noWorkerThreads = false;
251                 break;
252             }
253         }
254 
255         // no active worker, no active normal task, no need to monitor
256         if (noWorkerThreads && activeTask.empty()) {
257             taskMonitorExit_ = true;
258             for (auto& task : unfree) {
259                 reinterpret_cast<CPUEUTask*>(task)->DecDeleteRef();
260             }
261             return;
262         }
263     }
264 
265     uint64_t now = TimeStampCntvct();
266     uint64_t minStart = now - ((timeoutUs_ - ALLOW_ACC_ERROR_US));
267     uint64_t curMinTimeStamp = UINT64_MAX;
268 
269     for (auto task : activeTask) {
270         uint64_t curTimeStamp = CalculateTaskTimeout(task, minStart);
271         curMinTimeStamp = curTimeStamp < curMinTimeStamp ? curTimeStamp : curMinTimeStamp;
272     }
273     for (auto& task : unfree) {
274         reinterpret_cast<CPUEUTask*>(task)->DecDeleteRef();
275     }
276 
277     // 下次检查时间为所有当前任务中的最小状态时间
278     uint64_t nextTimeout = (curMinTimeStamp == UINT64_MAX) ? timeoutUs_ :
279         std::min(timeoutUs_, timeoutUs_ - (now - curMinTimeStamp));
280 
281     SubmitTaskMonitor(nextTimeout);
282 }
283 
CalculateTaskTimeout(CPUEUTask * task,uint64_t timeoutThreshold)284 uint64_t WorkerMonitor::CalculateTaskTimeout(CPUEUTask* task, uint64_t timeoutThreshold)
285 {
286     // 主动延时的任务不检测
287     if (!task->monitorTimeout_ || isDelayingTask(task) ||
288         (task->delayTime > 0 && task->curStatus == TaskStatus::SUBMITTED)) {
289         return UINT64_MAX;
290     }
291 
292     uint64_t curTaskTime = task->statusTime.load(std::memory_order_relaxed);
293     uint64_t timeoutCount = task->timeoutTask.timeoutCnt;
294 
295     if (curTaskTime + timeoutCount * timeoutUs_ < timeoutThreshold) {
296         RecordTimeoutTaskInfo(task);
297         return UINT64_MAX;
298     }
299     return curTaskTime;
300 }
301 
ControlTimeoutFreq(CPUEUTask * task)302 bool WorkerMonitor::ControlTimeoutFreq(CPUEUTask* task)
303 {
304     uint64_t timoutCnt = task->timeoutTask.timeoutCnt;
305     return (timoutCnt < FIRST_THRESHOLD) || (timoutCnt < SECOND_THRESHOLD && timoutCnt % FIRST_THRESHOLD == 0) ||
306         (timoutCnt % SECOND_THRESHOLD == 0);
307 }
308 
RecordTimeoutTaskInfo(CPUEUTask * task)309 void WorkerMonitor::RecordTimeoutTaskInfo(CPUEUTask* task)
310 {
311     TaskStatus curTaskStatus = task->curStatus;
312     uint64_t curTaskTime = task->statusTime.load(std::memory_order_relaxed);
313     TaskStatus preTaskStatus = task->preStatus.load(std::memory_order_relaxed);
314 
315     TimeoutTask& timeoutTskInfo = task->timeoutTask;
316     if (task->gid == timeoutTskInfo.taskGid && curTaskStatus == timeoutTskInfo.taskStatus) {
317         timeoutTskInfo.timeoutCnt += 1;
318     } else {
319         timeoutTskInfo.timeoutCnt = 1;
320         timeoutTskInfo.taskStatus = curTaskStatus;
321         timeoutTskInfo.taskGid = task->gid;
322     }
323 
324     if (!ControlTimeoutFreq(task)) {
325         return;
326     }
327 
328     std::stringstream ss;
329     uint64_t time = TimeStampCntvct();
330 
331     ss << "Normal task[" << task->label.c_str() << "], gid[" << task->gid << "], qos[" << task->GetQos() <<
332         "], delay[" << task->delayTime << "]us, current status[" << StatusToString(curTaskStatus) <<
333         "], start at[" << FormatDateString4SteadyClock(curTaskTime) << "]";
334 
335     if (taskTimeoutInfo_.size() > MAX_RECORD_LIMIT) {
336         taskTimeoutInfo_.erase(taskTimeoutInfo_.begin());
337     }
338     taskTimeoutInfo_.emplace_back(std::make_pair(time, ss.str()));
339 
340     ss << ", last status[" << StatusToString(preTaskStatus) << "], timeout for[" <<
341         timeoutUs_ / MIN_TIMEOUT_THRESHOLD_US << "]s, reported count: " << timeoutTskInfo.timeoutCnt;
342     FFRT_LOGW("%s", ss.str().c_str());
343     return;
344 }
345 
DumpTimeoutInfo()346 std::string WorkerMonitor::DumpTimeoutInfo()
347 {
348     std::lock_guard lock(mutex_);
349     std::stringstream ss;
350     if (taskTimeoutInfo_.size() != 0) {
351         for (auto it = taskTimeoutInfo_.rbegin(); it != taskTimeoutInfo_.rend(); ++it) {
352             auto& record = *it;
353             ss << "{" << FormatDateString4SteadyClock(record.first) << ", " << record.second << "} \n";
354         }
355     } else {
356         ss << "Timeout info Empty";
357     }
358     return ss.str();
359 }
360 
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,CPUWorker * worker,TaskBase * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)361 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, CPUWorker* worker,
362     TaskBase* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
363 {
364     auto workerIter = workerStatus_.find(worker);
365     if (workerIter == workerStatus_.end()) {
366         workerStatus_[worker] = TaskTimeoutInfo(workerTask);
367         return;
368     }
369 
370     TaskTimeoutInfo& taskInfo = workerIter->second;
371     if (taskInfo.task_ == workerTask) {
372         if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
373             return;
374         }
375 
376         taskInfo.sampledTimes_ = 0;
377         if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
378             WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_,
379                 worker->curTaskType_.load(std::memory_order_relaxed), worker->curTaskLabel_);
380             timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
381             if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
382                 taskInfo.recordLevel_++;
383             }
384         }
385 
386         return;
387     }
388 
389     if (taskInfo.executionTime_ > 0) {
390         FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
391             worker->Id(), taskInfo.executionTime_);
392     }
393     workerIter->second = TaskTimeoutInfo(workerTask);
394 }
395 
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)396 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
397 {
398     std::stringstream ss;
399     std::string processNameStr = std::string(GetCurrentProcessName());
400     ss << "Task_Sch_Timeout: process name:[" << processNameStr << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
401         "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
402         timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
403         timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
404         timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
405         timeoutFunction.workerInfo_.workerTaskType_ << "], ";
406 
407 #ifdef WORKER_CACHE_TASKNAMEID
408     if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
409         timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
410         ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
411             "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
412     }
413 #endif
414 
415     ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
416     FFRT_LOGW("%s", ss.str().c_str());
417 
418 #ifdef FFRT_OH_TRACE_ENABLE
419     std::string dumpInfo;
420     if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
421         FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
422         if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
423             RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
424         }
425     }
426 #endif
427 #ifdef FFRT_SEND_EVENT
428     if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
429         std::string senarioName = "Task_Sch_Timeout";
430         TaskTimeoutReport(ss, processNameStr, senarioName);
431     }
432 #endif
433 }
434 
RecordIpcInfo(const std::string & dumpInfo,int tid)435 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
436 {
437     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
438         return;
439     }
440 
441     std::ifstream transactionFile(TRANSACTION_PATH);
442     FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
443 
444     FFRT_LOGW("transaction_proc:");
445     std::string line;
446     std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
447     while (getline(transactionFile, line)) {
448         if (std::regex_match(line, std::regex(regexStr))) {
449             FFRT_LOGW("%s", line.c_str());
450         }
451     }
452 
453     transactionFile.close();
454 }
455 
RecordKeyInfo(const std::string & dumpInfo)456 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
457 {
458     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
459         return;
460     }
461 
462 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
463     std::string keyInfo = SaveKeyInfo();
464     FFRT_LOGW("%s", keyInfo.c_str());
465 #endif
466 }
467 
ProcessWorkerInfo(std::ostringstream & oss,bool & firstQos,int qos,unsigned int cnt,const std::deque<pid_t> & tids)468 void WorkerMonitor::ProcessWorkerInfo(std::ostringstream& oss, bool& firstQos, int qos, unsigned int cnt,
469     const std::deque<pid_t>& tids)
470 {
471     if (cnt == 0) {
472         return;
473     }
474 
475     if (!firstQos) {
476         oss << " ";
477     }
478     firstQos = false;
479 
480     oss << "qos:" << qos << " cnt:" << cnt << " tids:";
481     bool firstTid = true;
482     for (const auto& tid : tids) {
483         if (!firstTid) {
484             oss << ",";
485         }
486         firstTid = false;
487         oss << tid;
488     }
489 }
490 
RecordWorkerStatusInfo()491 void WorkerMonitor::RecordWorkerStatusInfo()
492 {
493     std::ostringstream startedOss;
494     std::ostringstream exitedOss;
495     bool startedFirstQos = true;
496     bool exitedFirstQos = true;
497 
498     for (int qos = 0; qos < QoS::MaxNum(); qos++) {
499         auto workerStatusInfo = FFRTFacade::GetEUInstance().GetWorkerStatusInfoAndReset(qos);
500         ProcessWorkerInfo(startedOss, startedFirstQos, qos, workerStatusInfo.startedCnt, workerStatusInfo.startedTids);
501         ProcessWorkerInfo(exitedOss, exitedFirstQos, qos, workerStatusInfo.exitedCnt, workerStatusInfo.exitedTids);
502     }
503 
504     if (!startedOss.str().empty()) {
505         FFRT_LOGW("worker start: %s", startedOss.str().c_str());
506     }
507     if (!exitedOss.str().empty()) {
508         FFRT_LOGW("worker exit: %s", exitedOss.str().c_str());
509     }
510 }
511 }
512