1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_monitor.h"
16 #include <cstdint>
17 #include <sstream>
18 #include <iomanip>
19 #include "dfx/log/ffrt_log_api.h"
20 #include "internal_inc/osal.h"
21 #include "sync/sync.h"
22 #include "c/ffrt_watchdog.h"
23
namespace {
// Task id stored in a queue slot while no task is being tracked for that queue.
constexpr uint32_t INVAILD_TASK_ID{0};
// Factor applied to the watchdog timeout to obtain microseconds
// (presumably the watchdog reports milliseconds -- confirm against ffrt_watchdog_get_timeout).
constexpr uint32_t TIME_CONVERT_UNIT{1000};
// Number of queue slots reserved up front by the monitor constructor.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY{64};
// Tolerance, in microseconds, used when comparing timestamps and when re-arming the check.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US{500};
// Smallest timeout, in microseconds, for which the monitor is armed.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US{1000};

// Returns the steady-clock time point lying delayUs microseconds after the current instant.
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const std::chrono::microseconds offset(delayUs);
    return std::chrono::steady_clock::now() + offset;
}
} // namespace
36
37 namespace ffrt {
QueueMonitor()38 QueueMonitor::QueueMonitor()
39 {
40 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
41 QueuesRunningInfo.reserve(QUEUE_INFO_INITIAL_CAPACITY);
42 uint64_t timeout = ffrt_watchdog_get_timeout() * TIME_CONVERT_UNIT;
43 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
44 timeoutUs_ = 0;
45 FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
46 return;
47 }
48 timeoutUs_ = timeout;
49 SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
50 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
51 }
52
GetInstance()53 QueueMonitor& QueueMonitor::GetInstance()
54 {
55 static QueueMonitor instance;
56 return instance;
57 }
58
RegisterQueueId(uint32_t queueId)59 void QueueMonitor::RegisterQueueId(uint32_t queueId)
60 {
61 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
62 if (queueId == QueuesRunningInfo.size()) {
63 QueuesRunningInfo.emplace_back(std::make_pair(INVAILD_TASK_ID, std::chrono::steady_clock::now()));
64 FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
65 return;
66 }
67
68 // only need to ensure that the corresponding info index has been initialized after constructed.
69 if (queueId > QueuesRunningInfo.size()) {
70 for (uint32_t i = QueuesRunningInfo.size(); i <= queueId; ++i) {
71 QueuesRunningInfo.emplace_back(std::make_pair(INVAILD_TASK_ID, std::chrono::steady_clock::now()));
72 }
73 }
74 FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
75 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
76 }
77
ResetQueueInfo(uint32_t queueId)78 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
79 {
80 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
81 std::shared_lock lock(mutex_);
82 QueuesRunningInfo[queueId].first = INVAILD_TASK_ID;
83 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
84 }
85
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)86 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
87 {
88 #ifdef FRRT_CO_BACKTRACE_OH_ENABLE
89 std::shared_lock lock(mutex_);
90 QueuesRunningInfo[queueId] = {taskId, std::chrono::steady_clock::now()};
91 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
92 }
93
SendDelayedWorker(time_point_t delay)94 void QueueMonitor::SendDelayedWorker(time_point_t delay)
95 {
96 #ifdef FRRT_CO_BACKTRACE_OH_ENABLE
97 static WaitUntilEntry we;
98 we.tp = delay;
99 we.cb = ([this](WaitEntry* we) { CheckQueuesStatus(); });
100
101 bool result = DelayedWakeup(we.tp, &we, we.cb);
102 // insurance mechanism, generally does not fail
103 while (!result) {
104 FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
105 we.tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
106 result = DelayedWakeup(we.tp, &we, we.cb);
107 }
108 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
109 }
110
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)111 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
112 {
113 #ifdef FRRT_CO_BACKTRACE_OH_ENABLE
114 std::unique_lock lock(mutex_);
115 if (QueuesRunningInfo[i].first == taskId) {
116 QueuesRunningInfo[i].second += std::chrono::microseconds(timeoutUs_);
117 }
118 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
119 }
120
CheckQueuesStatus()121 void QueueMonitor::CheckQueuesStatus()
122 {
123 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
124 time_point_t oldestStartedTime = std::chrono::steady_clock::now();
125 time_point_t startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
126
127 uint64_t taskId = 0;
128 time_point_t taskTimestamp = oldestStartedTime;
129 for (uint32_t i = 0; i < QueuesRunningInfo.size(); ++i) {
130 {
131 std::unique_lock lock(mutex_);
132 taskId = QueuesRunningInfo[i].first;
133 time_point_t taskTimestamp = QueuesRunningInfo[i].second;
134 }
135
136 if (taskId == INVAILD_TASK_ID) {
137 continue;
138 }
139
140 if (taskTimestamp < startThreshold) {
141 std::stringstream ss;
142 ss << "SERIAL_TASK_TIMEOUT: serial queue qid=" << i <<
143 ", serial task gid=" << taskId << " execution " << timeoutUs_ << "us.";
144 FFRT_LOGE("%s", ss.str().c_str());
145
146 ffrt_watchdog_cb func = ffrt_watchdog_get_cb();
147 if (func) {
148 func(taskId, ss.str().c_str(), ss.str().size());
149 }
150 // reset timeout task timestampe for next warning
151 ResetTaskTimestampAfterWarning(i, taskId);
152 continue;
153 }
154
155 if (taskTimestamp < oldestStartedTime) {
156 oldestStartedTime = taskTimestamp;
157 }
158 }
159
160 time_point_t nextCheckTime = oldestStartedTime + std::chrono::microseconds(timeoutUs_);
161 SendDelayedWorker(nextCheckTime);
162 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
163 }
164 } // namespace ffrt
165