• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker_monitor.h"
17 #include <cstring>
18 #include <iostream>
19 #include <fstream>
20 #include <sstream>
21 #include <regex>
22 #ifdef FFRT_OH_TRACE_ENABLE
23 #include "backtrace_local.h"
24 #endif
25 
26 #include "dfx/sysevent/sysevent.h"
27 #include "eu/execute_unit.h"
28 #include "eu/worker_manager.h"
29 #include "eu/co_routine_factory.h"
30 #include "internal_inc/osal.h"
31 #include "sched/scheduler.h"
32 #include "util/ffrt_facade.h"
33 #include "dfx/bbox/bbox.h"
34 
namespace {
// ---- Sampling cadence --------------------------------------------------------------------------
// Delay between two runs of the watchdog callback (see SubmitSamplingTask).
constexpr int MONITOR_SAMPLING_CYCLE_US = 500 * 1000;
// Number of watchdog samples that make up one second of observed execution time.
constexpr int SAMPLING_TIMES_PER_SEC = 1000 * 1000 / MONITOR_SAMPLING_CYCLE_US;
// Delay between two runs of the memory-release callback (see SubmitMemReleaseTask).
constexpr uint64_t TIMEOUT_MEMSHRINK_CYCLE_US = 60 * 1000 * 1000;
// Poller statistics are logged once every this many watchdog samples.
constexpr unsigned int RECORD_POLLER_INFO_FREQ = 120;

// ---- Reporting thresholds (seconds of execution time) ------------------------------------------
// Execution time at which a timeout report is sent (only when FFRT_SEND_EVENT is defined).
constexpr int HISYSEVENT_TIMEOUT_SEC = 60;
// Execution time from which IPC transaction details are logged as well.
constexpr int RECORD_IPC_INFO_TIME_THRESHOLD = 600;
// Modulus applied to the execution time at each record level; a timeout is recorded when the
// execution time is a multiple of the entry selected by the current level.
const std::vector<int> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };

// ---- Miscellaneous -----------------------------------------------------------------------------
// Not referenced by the code visible in this file section.
constexpr int BACKTRACE_TASK_QOS = 7;
// Substring searched for in a backtrace dump before IPC information is collected.
constexpr char IPC_STACK_NAME[] = "libipc_common";
// File read line by line by RecordIpcInfo.
constexpr char TRANSACTION_PATH[] = "/proc/transaction_proc";
// Blacklist file read by the WorkerMonitor constructor, one entry per line.
constexpr char CONF_FILEPATH[] = "/etc/ffrt/worker_monitor.conf";

// Counts watchdog samples; drives the RECORD_POLLER_INFO_FREQ cadence.
unsigned int g_samplingTaskCount = 0;
}
49 
50 namespace ffrt {
WorkerMonitor()51 WorkerMonitor::WorkerMonitor()
52 {
53     // 获取当前进程名称
54     const char* processName = GetCurrentProcessName();
55     if (strlen(processName) == 0) {
56         FFRT_LOGW("Get process name failed, skip worker monitor.");
57         skipSampling_ = true;
58         return;
59     }
60 
61     // 从配置文件读取黑名单比对
62     std::string skipProcess;
63     std::ifstream file(CONF_FILEPATH);
64     if (file.is_open()) {
65         while (std::getline(file, skipProcess)) {
66             if (strstr(processName, skipProcess.c_str()) != nullptr) {
67                 skipSampling_ = true;
68                 return;
69             }
70         }
71     } else {
72         FFRT_LOGW("worker_monitor.conf does not exist or file permission denied");
73     }
74 
75     watchdogWaitEntry_.cb = ([this](WaitEntry* we) { CheckWorkerStatus(); });
76     memReleaseWaitEntry_.cb = ([this](WaitEntry* we) {
77         std::lock_guard lock(mutex_);
78         if (skipSampling_) {
79             return;
80         }
81 
82         WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
83         {
84             bool noWorkerThreads = true;
85             std::lock_guard submitTaskLock(submitTaskMutex_);
86             for (int i = 0; i < QoS::MaxNum(); i++) {
87                 std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
88                 if (!workerGroup[i].threads.empty()) {
89                     noWorkerThreads = false;
90                     break;
91                 }
92             }
93             if (noWorkerThreads) {
94                 CoRoutineReleaseMem();
95                 memReleaseTaskExit_ = true;
96                 return;
97             }
98         }
99 
100         CoRoutineReleaseMem();
101         SubmitMemReleaseTask();
102     });
103 }
104 
~WorkerMonitor()105 WorkerMonitor::~WorkerMonitor()
106 {
107     FFRT_LOGW("WorkerMonitor destruction enter");
108     std::lock_guard lock(mutex_);
109     skipSampling_ = true;
110 }
111 
GetInstance()112 WorkerMonitor& WorkerMonitor::GetInstance()
113 {
114     static WorkerMonitor instance;
115     return instance;
116 }
117 
SubmitTask()118 void WorkerMonitor::SubmitTask()
119 {
120     if (skipSampling_) {
121         return;
122     }
123 
124     std::lock_guard submitTaskLock(submitTaskMutex_);
125     if (samplingTaskExit_) {
126         SubmitSamplingTask();
127         samplingTaskExit_ = false;
128     }
129     if (memReleaseTaskExit_) {
130         SubmitMemReleaseTask();
131         memReleaseTaskExit_ = false;
132     }
133 }
134 
SubmitSamplingTask()135 void WorkerMonitor::SubmitSamplingTask()
136 {
137     watchdogWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(MONITOR_SAMPLING_CYCLE_US);
138     if (!DelayedWakeup(watchdogWaitEntry_.tp, &watchdogWaitEntry_, watchdogWaitEntry_.cb)) {
139         FFRT_LOGW("Set delayed worker failed.");
140     }
141 }
142 
SubmitMemReleaseTask()143 void WorkerMonitor::SubmitMemReleaseTask()
144 {
145     if (skipSampling_) {
146         return;
147     }
148     memReleaseWaitEntry_.tp = std::chrono::steady_clock::now() + std::chrono::microseconds(TIMEOUT_MEMSHRINK_CYCLE_US);
149     if (!DelayedWakeup(memReleaseWaitEntry_.tp, &memReleaseWaitEntry_, memReleaseWaitEntry_.cb)) {
150         FFRT_LOGW("Set delayed worker failed.");
151     }
152 }
153 
CheckWorkerStatus()154 void WorkerMonitor::CheckWorkerStatus()
155 {
156     std::lock_guard lock(mutex_);
157     if (skipSampling_) {
158         return;
159     }
160 
161     WorkerGroupCtl* workerGroup = FFRTFacade::GetEUInstance().GetGroupCtl();
162     {
163         bool noWorkerThreads = true;
164         std::lock_guard submitTaskLock(submitTaskMutex_);
165         for (int i = 0; i < QoS::MaxNum(); i++) {
166             std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
167             if (!workerGroup[i].threads.empty()) {
168                 noWorkerThreads = false;
169                 break;
170             }
171         }
172         if (noWorkerThreads) {
173             samplingTaskExit_ = true;
174             return;
175         }
176     }
177 
178     if (g_samplingTaskCount++ % RECORD_POLLER_INFO_FREQ == 0) {
179         RecordPollerInfo();
180     }
181 
182     std::vector<TimeoutFunctionInfo> timeoutFunctions;
183     for (int i = 0; i < QoS::MaxNum(); i++) {
184         int executionNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->WakedWorkerNum(i);
185         int sleepingWorkerNum = FFRTFacade::GetEUInstance().GetCPUMonitor()->SleepingWorkerNum(i);
186 
187         std::shared_lock<std::shared_mutex> lck(workerGroup[i].tgMutex);
188         CoWorkerInfo coWorkerInfo(i, workerGroup[i].threads.size(), executionNum, sleepingWorkerNum);
189         for (auto& thread : workerGroup[i].threads) {
190             WorkerThread* worker = thread.first;
191             CPUEUTask* workerTask = static_cast<CPUEUTask*>(worker->curTask);
192             if (workerTask == nullptr) {
193                 workerStatus_.erase(worker);
194                 continue;
195             }
196 
197             RecordTimeoutFunctionInfo(coWorkerInfo, worker, workerTask, timeoutFunctions);
198         }
199     }
200 
201     if (timeoutFunctions.size() > 0) {
202         FFRTFacade::GetDWInstance().SubmitAsyncTask([this, timeoutFunctions] {
203             for (const auto& timeoutFunction : timeoutFunctions) {
204                 RecordSymbolAndBacktrace(timeoutFunction);
205             }
206         });
207     }
208 
209     SubmitSamplingTask();
210 }
211 
RecordTimeoutFunctionInfo(const CoWorkerInfo & coWorkerInfo,WorkerThread * worker,CPUEUTask * workerTask,std::vector<TimeoutFunctionInfo> & timeoutFunctions)212 void WorkerMonitor::RecordTimeoutFunctionInfo(const CoWorkerInfo& coWorkerInfo, WorkerThread* worker,
213     CPUEUTask* workerTask, std::vector<TimeoutFunctionInfo>& timeoutFunctions)
214 {
215     auto workerIter = workerStatus_.find(worker);
216     if (workerIter == workerStatus_.end()) {
217         workerStatus_[worker] = TaskTimeoutInfo(workerTask);
218         return;
219     }
220 
221     TaskTimeoutInfo& taskInfo = workerIter->second;
222     if (taskInfo.task_ == workerTask) {
223         if (++taskInfo.sampledTimes_ < SAMPLING_TIMES_PER_SEC) {
224             return;
225         }
226 
227         taskInfo.sampledTimes_ = 0;
228         if (++taskInfo.executionTime_ % TIMEOUT_RECORD_CYCLE_LIST[taskInfo.recordLevel_] == 0) {
229             WorkerInfo workerInfo(worker->Id(), worker->curTaskGid_, worker->curTaskType_, worker->curTaskLabel_);
230             timeoutFunctions.emplace_back(coWorkerInfo, workerInfo, taskInfo.executionTime_);
231             if (taskInfo.recordLevel_ < static_cast<int>(TIMEOUT_RECORD_CYCLE_LIST.size()) - 1) {
232                 taskInfo.recordLevel_++;
233             }
234         }
235 
236         return;
237     }
238 
239     if (taskInfo.executionTime_ > 0) {
240         FFRT_LOGI("Tid[%d] function is executed, which occupies worker for [%d]s.",
241             worker->Id(), taskInfo.executionTime_);
242     }
243     workerIter->second = TaskTimeoutInfo(workerTask);
244 }
245 
RecordSymbolAndBacktrace(const TimeoutFunctionInfo & timeoutFunction)246 void WorkerMonitor::RecordSymbolAndBacktrace(const TimeoutFunctionInfo& timeoutFunction)
247 {
248     std::stringstream ss;
249     std::string processNameStr = std::string(GetCurrentProcessName());
250     ss << "Task_Sch_Timeout: process name:[" << processNameStr << "], Tid:[" << timeoutFunction.workerInfo_.tid_ <<
251         "], Worker QoS Level:[" << timeoutFunction.coWorkerInfo_.qosLevel_ << "], Concurrent Worker Count:[" <<
252         timeoutFunction.coWorkerInfo_.coWorkerCount_ << "], Execution Worker Number:[" <<
253         timeoutFunction.coWorkerInfo_.executionNum_ << "], Sleeping Worker Number:[" <<
254         timeoutFunction.coWorkerInfo_.sleepingWorkerNum_ << "], Task Type:[" <<
255         timeoutFunction.workerInfo_.workerTaskType_ << "], ";
256 
257 #ifdef WORKER_CACHE_TASKNAMEID
258     if (timeoutFunction.workerInfo_.workerTaskType_ == ffrt_normal_task ||
259         timeoutFunction.workerInfo_.workerTaskType_ == ffrt_queue_task) {
260         ss << "Task Name:[" << timeoutFunction.workerInfo_.label_ <<
261             "], Task Id:[" << timeoutFunction.workerInfo_.gid_ << "], ";
262     }
263 #endif
264 
265     ss << "occupies worker for more than [" << timeoutFunction.executionTime_ << "]s";
266     FFRT_LOGW("%s", ss.str().c_str());
267 
268 #ifdef FFRT_OH_TRACE_ENABLE
269     std::string dumpInfo;
270     if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, timeoutFunction.workerInfo_.tid_, 0, false)) {
271         FFRT_LOGW("Backtrace:\n%s", dumpInfo.c_str());
272         if (timeoutFunction.executionTime_ >= RECORD_IPC_INFO_TIME_THRESHOLD) {
273             RecordIpcInfo(dumpInfo, timeoutFunction.workerInfo_.tid_);
274         }
275     }
276 #endif
277 #ifdef FFRT_SEND_EVENT
278     if (timeoutFunction.executionTime_ == HISYSEVENT_TIMEOUT_SEC) {
279         std::string senarioName = "Task_Sch_Timeout";
280         TaskTimeoutReport(ss, processNameStr, senarioName);
281     }
282 #endif
283 }
284 
RecordIpcInfo(const std::string & dumpInfo,int tid)285 void WorkerMonitor::RecordIpcInfo(const std::string& dumpInfo, int tid)
286 {
287     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos) {
288         return;
289     }
290 
291     std::ifstream transactionFile(TRANSACTION_PATH);
292     FFRT_COND_DO_ERR(!transactionFile.is_open(), return, "open transaction_proc failed");
293 
294     FFRT_LOGW("transaction_proc:");
295     std::string line;
296     std::string regexStr = ".*" + std::to_string(tid) + ".*to.*code.*";
297     while (getline(transactionFile, line)) {
298         if (std::regex_match(line, std::regex(regexStr))) {
299             FFRT_LOGW("%s", line.c_str());
300         }
301     }
302 
303     transactionFile.close();
304 }
305 
RecordKeyInfo(const std::string & dumpInfo)306 void WorkerMonitor::RecordKeyInfo(const std::string& dumpInfo)
307 {
308     if (dumpInfo.find(IPC_STACK_NAME) == std::string::npos || dumpInfo.find("libpower") == std::string::npos) {
309         return;
310     }
311 
312 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
313     std::string keyInfo = SaveKeyInfo();
314     FFRT_LOGW("%s", keyInfo.c_str());
315 #endif
316 }
317 
RecordPollerInfo()318 void WorkerMonitor::RecordPollerInfo()
319 {
320     std::stringstream ss;
321     for (int qos = 0; qos < QoS::MaxNum(); qos++) {
322         uint64_t pollCount = FFRTFacade::GetPPInstance().GetPoller(qos).GetPollCount();
323         if (pollCount > 0) {
324             ss << qos << ":" << pollCount << ";";
325         }
326     }
327 
328     std::string result = ss.str();
329     if (!result.empty()) {
330         FFRT_LOGW("%s", result.c_str());
331     }
332 }
333 }
334