1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_monitor.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "dfx/sysevent/sysevent.h"
22 #include "internal_inc/osal.h"
23 #include "util/ffrt_facade.h"
24
namespace {
// Task id stored in the running-info table while a queue has nothing executing.
constexpr uint32_t INVALID_TASK_ID = 0;
// Multiplier applied to the configured timeout threshold to obtain microseconds.
constexpr uint32_t TIME_CONVERT_UNIT = 1000;
// Initial reserve() size of the per-queue tables.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Slack (us) used when comparing task start times and when re-arming a timer whose deadline passed.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Smallest watchdog timeout (us) the monitor accepts; below it the watchdog is left disabled.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;

// Returns the steady-clock instant lying `delayUs` microseconds after the current time.
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const std::chrono::microseconds delay(delayUs);
    const auto current = std::chrono::steady_clock::now();
    return current + delay;
}
} // namespace
37
38 namespace ffrt {
QueueMonitor()39 QueueMonitor::QueueMonitor()
40 {
41 FFRT_LOGI("queue monitor ctor enter");
42 queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
43 queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44 lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
46 uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
47 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
48 timeoutUs_ = 0;
49 FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
50 return;
51 }
52 timeoutUs_ = timeout;
53 FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
54 }
55
~QueueMonitor()56 QueueMonitor::~QueueMonitor()
57 {
58 FFRT_LOGI("destruction of QueueMonitor");
59 DelayedRemove(we_->tp, we_);
60 SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
61 }
62
GetInstance()63 QueueMonitor& QueueMonitor::GetInstance()
64 {
65 static QueueMonitor instance;
66 return instance;
67 }
68
RegisterQueueId(uint32_t queueId,QueueHandler * queueStruct)69 void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
70 {
71 std::unique_lock lock(mutex_);
72 if (queueId == queuesRunningInfo_.size()) {
73 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
74 queuesStructInfo_.emplace_back(queueStruct);
75 lastReportedTask_.emplace_back(INVALID_TASK_ID);
76 FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
77 return;
78 }
79
80 // only need to ensure that the corresponding info index has been initialized after constructed.
81 if (queueId > queuesRunningInfo_.size()) {
82 for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
83 queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
84 queuesStructInfo_.emplace_back(nullptr);
85 lastReportedTask_.emplace_back(INVALID_TASK_ID);
86 }
87 queuesStructInfo_[queueId] = queueStruct;
88 }
89 if (queuesStructInfo_[queueId] == nullptr) {
90 queuesStructInfo_[queueId] = queueStruct;
91 }
92 FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
93 }
94
ResetQueueInfo(uint32_t queueId)95 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
96 {
97 std::shared_lock lock(mutex_);
98 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
99 "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
100 queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
101 lastReportedTask_[queueId] = INVALID_TASK_ID;
102 }
103
ResetQueueStruct(uint32_t queueId)104 void QueueMonitor::ResetQueueStruct(uint32_t queueId)
105 {
106 std::shared_lock lock(mutex_);
107 FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
108 "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
109 queuesStructInfo_[queueId] = nullptr;
110 }
111
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)112 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
113 {
114 std::shared_lock lock(mutex_);
115 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
116 "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
117 TimePoint now = std::chrono::steady_clock::now();
118 queuesRunningInfo_[queueId] = {taskId, now};
119 if (exit_.exchange(false)) {
120 SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
121 }
122 }
123
QueryQueueStatus(uint32_t queueId)124 uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
125 {
126 std::shared_lock lock(mutex_);
127 FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
128 "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
129 return queuesRunningInfo_[queueId].first;
130 }
131
SendDelayedWorker(TimePoint delay)132 void QueueMonitor::SendDelayedWorker(TimePoint delay)
133 {
134 we_->tp = delay;
135 we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
136
137 bool result = DelayedWakeup(we_->tp, we_, we_->cb);
138 // insurance mechanism, generally does not fail
139 while (!result) {
140 FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
141 we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
142 result = DelayedWakeup(we_->tp, we_, we_->cb);
143 }
144 }
145
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)146 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
147 {
148 std::unique_lock lock(mutex_);
149 if (queuesRunningInfo_[queueId].first == taskId) {
150 queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
151 }
152 }
153
CheckQueuesStatus()154 void QueueMonitor::CheckQueuesStatus()
155 {
156 {
157 std::unique_lock lock(mutex_);
158 auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
159 [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
160 if (iter == queuesRunningInfo_.cend()) {
161 exit_ = true;
162 return;
163 }
164 }
165
166 TimePoint oldestStartedTime = std::chrono::steady_clock::now();
167 TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
168 uint64_t taskId = 0;
169 uint32_t queueRunningInfoSize = 0;
170 TimePoint taskTimestamp = oldestStartedTime;
171 {
172 std::shared_lock lock(mutex_);
173 queueRunningInfoSize = queuesRunningInfo_.size();
174 }
175
176 // Displays information about queues whose tasks time out.
177 for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
178 {
179 std::unique_lock lock(mutex_);
180 taskId = queuesRunningInfo_[i].first;
181 taskTimestamp = queuesRunningInfo_[i].second;
182 }
183
184 if (taskId == INVALID_TASK_ID) {
185 continue;
186 }
187
188 if (taskTimestamp < startThreshold) {
189 std::stringstream ss;
190 std::string processNameStr = std::string(GetCurrentProcessName());
191 ss << "Serial_Queue_Timeout, process name:[" << processNameStr << "], serial queue qid:[" << i
192 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
193 {
194 std::shared_lock lock(mutex_);
195 if (queuesStructInfo_[i] != nullptr) {
196 ss << queuesStructInfo_[i]->GetDfxInfo();
197 }
198 }
199 FFRT_LOGE("%s", ss.str().c_str());
200 #ifdef FFRT_SEND_EVENT
201 if (lastReportedTask_[i] != taskId) {
202 lastReportedTask_[i] = taskId;
203
204 std::string senarioName = "Serial_Queue_Timeout";
205 TaskTimeoutReport(ss, processNameStr, senarioName);
206 }
207 #endif
208 std::string ssStr = ss.str();
209 if (ffrt_task_timeout_get_cb()) {
210 FFRTFacade::GetDWInstance().SubmitAsyncTask([taskId, ssStr] {
211 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
212 if (func) {
213 func(taskId, ssStr.c_str(), ssStr.size());
214 }
215 });
216 }
217 // reset timeout task timestamp for next warning
218 ResetTaskTimestampAfterWarning(i, taskId);
219 continue;
220 }
221
222 if (taskTimestamp < oldestStartedTime) {
223 oldestStartedTime = taskTimestamp;
224 }
225 }
226
227 SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
228 FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
229 }
230
HasQueueActive()231 bool QueueMonitor::HasQueueActive()
232 {
233 std::unique_lock lock(mutex_);
234 for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
235 if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
236 return true;
237 }
238 }
239 return false;
240 }
241 } // namespace ffrt
242