• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker_monitor.h"
17 #include <cstring>
18 #include <iostream>
19 #include <fstream>
20 #include <sstream>
21 #include <regex>
22 #ifdef FFRT_OH_TRACE_ENABLE
23 #include "backtrace_local.h"
24 #endif
25 
26 #include "dfx/sysevent/sysevent.h"
27 #include "eu/execute_unit.h"
28 #include "eu/worker_manager.h"
29 #include "eu/co_routine_factory.h"
30 #include "internal_inc/osal.h"
31 #include "sched/scheduler.h"
32 #include "util/ffrt_facade.h"
33 #include "dfx/bbox/bbox.h"
34 
namespace {
// --- Sampling cadence -------------------------------------------------------
// Delay between two consecutive watchdog sampling callbacks.
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
// Number of sampling callbacks that add up to one second of execution time.
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Poller statistics are logged once every this many sampling callbacks.
constexpr unsigned int RECORD_POLLER_INFO_FREQ = 120;
// Delay between two consecutive coroutine memory-release callbacks.
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60 * 1000 * 1000;

// --- Timeout reporting ------------------------------------------------------
// Execution time (seconds) at which a timeout system event is reported.
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
// Execution time (seconds) from which IPC transaction info is also logged.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
// Reporting periods (seconds); indexed by a task's current record level.
const std::vector<int> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };

// --- Names and paths --------------------------------------------------------
// Size of the stack buffer that receives the process name.
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
// Substring searched for in a backtrace to detect IPC frames.
constexpr char IPC_STACK_NAME[] = "libipc_common";
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";
constexpr char CONF_FILEPATH[] = "/etc/ffrt/worker_monitor.conf";

// Number of sampling callbacks that got past the early-exit checks so far.
unsigned int g_samplingTaskCount = 0;
}
49 
50 namespace ffrt {
WorkerMonitor()51 WorkerMonitor::WorkerMonitor()
52 {
53     // 获取当前进程名称
54     char processName[PROCESS_NAME_BUFFER_LENGTH];
55     GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
56 
57     // 从配置文件读取黑名单比对
58     std::string skipProcess;
59     std::ifstream file(CONF_FILEPATH);
60     if (file.is_open()) {
61         while (std::getline(file, skipProcess)) {
62             if (strstr(processName, skipProcess.c_str()) != nullptr) {
63                 skipSampling_ = true;
64                 return;
65             }
66         }
67     } else {
68         FFRT_LOGW("worker_monitor.conf does not exist or file permission denied");
69     }
70 
71     watchdogWaitEntry_.cb = ([this](WaitEntry* we) { CheckWorkerStatus(); });
72     memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
73         std::lock_guard lock(mutex_);
74         if (skipSampling_) {
75             return;
76         }
77 
78         WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
79         {
80             bool noWorkerThreads = true;
81             std::lock_guard submitTaskLock(submitTaskMutex_);
82             for (int i = 0; i < QoS::MaxNum(); i++) {
83                 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
84                 if (!workerGroup[i].threads.empty()) {
85                     noWorkerThreads = false;
86                     break;
87                 }
88             }
89             if (noWorkerThreads) {
90                 CoRoutineReleaseMem();
91                 samplingTaskExit_ = true;
92                 return;
93             }
94         }
95 
96         CoRoutineReleaseMem();
97         SubmitMemReleaseTask();
98     });
99 }
100 
~WorkerMonitor()101 WorkerMonitor::~WorkerMonitor()
102 {
103     FFRT_LOGW("WorkerMonitor destruction enter");
104     std::lock_guard lock(mutex_);
105     skipSampling_ = true;
106 }
107 
GetInstance()108 WorkerMonitor& WorkerMonitor::GetInstance()
109 {
110     static WorkerMonitor instance;
111     return instance;
112 }
113 
SubmitTask()114 void WorkerMonitor::SubmitTask()
115 {
116     if (skipSampling_) {
117         return;
118     }
119 
120     std::lock_guard submitTaskLock(submitTaskMutex_);
121     if (samplingTaskExit_) {
122         SubmitSamplingTask();
123         samplingTaskExit_ = false;
124     }
125     if (memReleaseTaskExit_) {
126         SubmitMemReleaseTask();
127         memReleaseTaskExit_ = false;
128     }
129 }
130 
SubmitSamplingTask()131 void WorkerMonitor::SubmitSamplingTask()
132 {
133     watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
134     if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb)) {
135         FFRT_LOGW("Set delayed worker failed.");
136     }
137 }
138 
SubmitMemReleaseTask()139 void WorkerMonitor::SubmitMemReleaseTask()
140 {
141     memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
142     if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb)) {
143         FFRT_LOGW("Set delayed worker failed.");
144     }
145 }
146 
CheckWorkerStatus()147 void WorkerMonitor::CheckWorkerStatus()
148 {
149     std::lock_guard lock(mutex_);
150     if (skipSampling_) {
151         return;
152     }
153 
154     WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
155     {
156         bool noWorkerThreads = true;
157         std::lock_guard submitTaskLock(submitTaskMutex_);
158         for (int i = 0; i < QoS::MaxNum(); i++) {
159             std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
160             if (!workerGroup[i].threads.empty()) {
161                 noWorkerThreads = false;
162                 break;
163             }
164         }
165         if (noWorkerThreads) {
166             samplingTaskExit_ = true;
167             return;
168         }
169     }
170 
171     if (g_samplingTaskCount++ % RECORD_POLLER_INFO_FREQ == 0) {
172         RecordPollerInfo();
173     }
174 
175     std::vector<TimeoutFunctionInfo> timeoutFunctions;
176     for (int i = 0; i < QoS::MaxNum(); i++) {
177         int executionNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->WakedWorkerNum(i);
178         int sleepingWorkerNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->SleepingWorkerNum(i);
179 
180         std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
181         CoWorkerInfo coWorkerInfo(i, workerGroup[i].threads.size(), executionNum, sleepingWorkerNum);
182         for (auto& thread : workerGroup[i].threads) {
183             WorkerThread* worker = thread.first;
184             CPUEUTask* workerTask = worker->curTask;
185             if (workerTask == nullptr) {
186                 workerStatus_.erase(worker);
187                 continue;
188             }
189 
190             RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
191         }
192     }
193 
194     for (const auto& timeoutFunction : timeoutFunctions) {
195         RecordSymbolAndBacktrace(timeoutFunction);
196     }
197 
198     SubmitSamplingTask();
199 }
200 
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,WorkerThread * worker,CPUEUTask * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)201 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, WorkerThread* worker,
202     CPUEUTask* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
203 {
204     auto workerIter = workerStatus_.find(worker);
205     if (workerIter == workerStatus_.end()) {
206         workerStatus_[worker] = TaskTimeoutInfo(workerTask);
207         return;
208     }
209 
210     TaskTimeoutInfo& taskInfo = workerIter->second;
211     if (taskInfo.task_ == workerTask) {
212         if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
213             return;
214         }
215 
216         taskInfo.sampledTimes_ = 0;
217         if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
218             WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_, worker->curTaskType_, worker->curTaskLabel_);
219             timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
220             if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
221                 taskInfo.recordLevel_++;
222             }
223         }
224 
225         return;
226     }
227 
228     if (taskInfo.executionTime_ > 0) {
229         FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
230             worker->Id(), taskInfo.executionTime_);
231     }
232     workerIter->second = TaskTimeoutInfo(workerTask);
233 }
234 
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)235 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
236 {
237     std::stringstream ss;
238     char processName[PROCESS_NAME_BUFFER_LENGTH];
239     GetProcessName(processName, PROCESS_NAME_BUFFER_LENGTH);
240     ss << "Task_Sch_Timeout: process name:[" << processName << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
241         "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
242         timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
243         timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
244         timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
245         timeoutFunction.workerInfo_.workerTaskType_ << "], ";
246 
247 #ifdef WORKER_CACHE_TASKNAMEID
248     if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
249         timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
250         ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
251             "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
252     }
253 #endif
254 
255     ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
256     FFRT_LOGW("%s", ss.str().c_str());
257 
258 #ifdef FFRT_OH_TRACE_ENABLE
259     std::string dumpInfo;
260     if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
261         FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
262         if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
263             RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
264         }
265     }
266 
267     RecordKeyInfo(dumpInfo);
268 #endif
269 #ifdef FFRT_SEND_EVENT
270     if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
271         std::string processNameStr = std::string(processName);
272         std::string senarioName = "Task_Sch_Timeout";
273         TaskTimeoutReport(ss, processNameStr, senarioName);
274     }
275 #endif
276 }
277 
RecordIpcInfo(const std::string & dumpInfo,int tid)278 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
279 {
280     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
281         return;
282     }
283 
284     std::ifstream transactionFile(TRANSACTION_PATH);
285     FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
286 
287     FFRT_LOGW("transaction_proc:");
288     std::string line;
289     std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
290     while (getline(transactionFile, line)) {
291         if (std::regex_match(line, std::regex(regexStr))) {
292             FFRT_LOGW("%s", line.c_str());
293         }
294     }
295 
296     transactionFile.close();
297 }
298 
RecordKeyInfo(const std::string & dumpInfo)299 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
300 {
301     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
302         return;
303     }
304 
305 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
306     std::string keyInfo = SaveKeyInfo();
307     FFRT_LOGW("%s", keyInfo.c_str());
308 #endif
309 }
310 
RecordPollerInfo()311 void WorkerMonitor::RecordPollerInfo()
312 {
313     std::stringstream ss;
314     for (int qos = 0; qos < QoS::MaxNum(); qos++) {
315         uint64_t pollCount = FFRTFacade::GetPPInstance().GetPoller(qos).GetPollCount();
316         if (pollCount > 0) {
317             ss << qos << ":" << pollCount << ";";
318         }
319     }
320 
321     std::string result = ss.str();
322     if (!result.empty()) {
323         FFRT_LOGW("%s", result.c_str());
324     }
325 }
326 }
327