1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_monitor.h"
16 #include <cstdint>
17 #include <sstream>
18 #include <iomanip>
19 #include "dfx/log/ffrt_log_api.h"
20 #include "internal_inc/osal.h"
21 #include "sync/sync.h"
22 #include "c/ffrt_watchdog.h"
23
namespace {
// Task id recorded in a queue's slot when no task is running on it. 64-bit so it has the same width as the
// task ids the monitor stores (UpdateQueueInfo receives a uint64_t task id).
constexpr uint64_t INVALID_TASK_ID = 0;
// Multiplier applied to the watchdog timeout to obtain microseconds (the result is stored as timeoutUs_).
// Declared 64-bit so `timeout * TIME_CONVERT_UNIT` is evaluated in 64-bit arithmetic and cannot wrap at
// 2^32 when the timeout operand is a 32-bit integer.
constexpr uint64_t TIME_CONVERT_UNIT = 1000;
// Number of per-queue running-info slots reserved up front.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Timer-accuracy slack, in microseconds.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Smallest watchdog timeout, in microseconds, for which monitoring is armed.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;

/**
 * Returns the steady-clock time point `delayUs` microseconds from now.
 * `inline` is deliberate: every caller sits inside `#ifdef FFRT_CO_BACKTRACE_OH_ENABLE`, and marking the
 * function inline keeps compilers from flagging it as unused when that macro is not defined.
 */
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    return std::chrono::steady_clock::now() + std::chrono::microseconds(delayUs);
}
}
36
37 namespace ffrt {
QueueMonitor()38 QueueMonitor::QueueMonitor()
39 {
40 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
41 QueuesRunningInfo.reserve(QUEUE_INFO_INITIAL_CAPACITY);
42 uint64_t timeout = ffrt_watchdog_get_timeout() * TIME_CONVERT_UNIT;
43 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
44 timeoutUs_ = 0;
45 FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
46 return;
47 }
48 timeoutUs_ = timeout;
49 SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
50 FFRT_LOGD("send delayedworker with %llu us", timeoutUs_);
51 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
52 }
53
GetInstance()54 QueueMonitor& QueueMonitor::GetInstance()
55 {
56 static QueueMonitor instance;
57 return instance;
58 }
59
RegisterQueueId(uint32_t queueId)60 void QueueMonitor::RegisterQueueId(uint32_t queueId)
61 {
62 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
63 std::unique_lock lock(mutex_);
64 if (queueId == QueuesRunningInfo.size()) {
65 QueuesRunningInfo.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
66 FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
67 return;
68 }
69
70 // only need to ensure that the corresponding info index has been initialized after constructed.
71 if (queueId > QueuesRunningInfo.size()) {
72 for (uint32_t i = QueuesRunningInfo.size(); i <= queueId; ++i) {
73 QueuesRunningInfo.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
74 }
75 }
76 FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
77 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
78 }
79
ResetQueueInfo(uint32_t queueId)80 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
81 {
82 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
83 std::shared_lock lock(mutex_);
84 QueuesRunningInfo[queueId].first = INVALID_TASK_ID;
85 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
86 }
87
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)88 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
89 {
90 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
91 std::shared_lock lock(mutex_);
92 QueuesRunningInfo[queueId] = {taskId, std::chrono::steady_clock::now()};
93 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
94 }
95
SendDelayedWorker(time_point_t delay)96 void QueueMonitor::SendDelayedWorker(time_point_t delay)
97 {
98 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
99 static WaitUntilEntry we;
100 we.tp = delay;
101 we.cb = ([this](WaitEntry* we) { CheckQueuesStatus(); });
102
103 bool result = DelayedWakeup(we.tp, &we, we.cb);
104 // insurance mechanism, generally does not fail
105 while (!result) {
106 FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
107 we.tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
108 result = DelayedWakeup(we.tp, &we, we.cb);
109 }
110 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
111 }
112
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)113 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
114 {
115 std::unique_lock lock(mutex_);
116 if (QueuesRunningInfo[queueId].first == taskId) {
117 QueuesRunningInfo[queueId].second += std::chrono::microseconds(timeoutUs_);
118 }
119 }
120
CheckQueuesStatus()121 void QueueMonitor::CheckQueuesStatus()
122 {
123 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
124 time_point_t oldestStartedTime = std::chrono::steady_clock::now();
125 time_point_t startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
126
127 uint64_t taskId = 0;
128 time_point_t taskTimestamp = oldestStartedTime;
129 for (uint32_t i = 0; i < QueuesRunningInfo.size(); ++i) {
130 {
131 std::unique_lock lock(mutex_);
132 taskId = QueuesRunningInfo[i].first;
133 taskTimestamp = QueuesRunningInfo[i].second;
134 }
135
136 if (taskId == INVALID_TASK_ID) {
137 continue;
138 }
139
140 if (taskTimestamp < startThreshold) {
141 std::stringstream ss;
142 ss << "SERIAL_TASK_TIMEOUT: serial queue qid=" << i << ", serial task gid=" << taskId << " execution " <<
143 timeoutUs_ << " us.";
144 FFRT_LOGE("%s", ss.str().c_str());
145
146 ffrt_watchdog_cb func = ffrt_watchdog_get_cb();
147 if (func) {
148 func(taskId, ss.str().c_str(), ss.str().size());
149 }
150 // reset timeout task timestamp for next warning
151 ResetTaskTimestampAfterWarning(i, taskId);
152 continue;
153 }
154
155 if (taskTimestamp < oldestStartedTime) {
156 oldestStartedTime = taskTimestamp;
157 }
158 }
159
160 time_point_t nextCheckTime = oldestStartedTime + std::chrono::microseconds(timeoutUs_);
161 SendDelayedWorker(nextCheckTime);
162 FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
163 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
164 }
165 } // namespace ffrt
166