1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue/queue_monitor.h"
16 #include "queue/queue_handler.h"
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "c/queue.h"
22 #include "internal_inc/osal.h"
23 #include "util/ffrt_facade.h"
24 #include "util/time_format.h"
25
namespace {
// Number of microseconds in one millisecond; every *_US constant below is in microseconds.
constexpr uint32_t US_PER_MS = 1000;
// Tolerance window: tasks that would reach the timeout within this many microseconds are
// reported in the same check (10 ms).
constexpr uint64_t ALLOW_ACC_ERROR_US = uint64_t{10} * US_PER_MS;
// Smallest watchdog timeout the monitor accepts; smaller configured values are replaced by it (1 s).
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = uint64_t{1000} * US_PER_MS;
// Bound on the number of timeout records kept in the history used for dumping.
constexpr uint32_t MAX_RECORD_LIMIT = 64;
// NOTE(review): not referenced anywhere else in this file; confirm whether it is still needed.
constexpr uint32_t INITIAL_RECORD_LIMIT = 16;
} // namespace
33
34 namespace ffrt {
QueueMonitor()35 QueueMonitor::QueueMonitor()
36 {
37 FFRT_LOGI("QueueMonitor ctor enter");
38 we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
39 uint64_t timeout = ffrt_task_timeout_get_threshold() * US_PER_MS;
40 timeoutUs_ = timeout;
41 if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
42 FFRT_LOGE("invalid watchdog timeout [%llu] us, using 1s instead", timeout);
43 timeoutUs_ = MIN_TIMEOUT_THRESHOLD_US;
44 }
45 FFRT_LOGI("QueueMonitor ctor leave, watchdog timeout of %llu us has been set", timeoutUs_.load());
46 }
47
~QueueMonitor()48 QueueMonitor::~QueueMonitor()
49 {
50 FFRT_LOGI("destruction of QueueMonitor");
51 DelayedRemove(we_->tp, we_);
52 SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
53 }
54
GetInstance()55 QueueMonitor& QueueMonitor::GetInstance()
56 {
57 static QueueMonitor instance;
58 return instance;
59 }
60
RegisterQueue(QueueHandler * queue)61 void QueueMonitor::RegisterQueue(QueueHandler* queue)
62 {
63 std::lock_guard lock(infoMutex_);
64 queuesInfo_.push_back(queue);
65 FFRT_LOGD("queue [%s] register in QueueMonitor", queue->GetName().c_str());
66 }
67
DeregisterQueue(QueueHandler * queue)68 void QueueMonitor::DeregisterQueue(QueueHandler* queue)
69 {
70 std::lock_guard lock(infoMutex_);
71 auto it = std::find(queuesInfo_.begin(), queuesInfo_.end(), queue);
72 if (it != queuesInfo_.end()) {
73 queuesInfo_.erase(it);
74 }
75 }
76
UpdateQueueInfo()77 void QueueMonitor::UpdateQueueInfo()
78 {
79 if (suspendAlarm_.exchange(false)) {
80 uint64_t alarmTime = static_cast<uint64_t>(std::chrono::time_point_cast<std::chrono::microseconds>(
81 std::chrono::steady_clock::now()).time_since_epoch().count()) + timeoutUs_;
82 SetAlarm(alarmTime);
83 }
84 }
85
SetAlarm(uint64_t steadyUs)86 void QueueMonitor::SetAlarm(uint64_t steadyUs)
87 {
88 we_->tp = std::chrono::steady_clock::time_point() + std::chrono::microseconds(steadyUs);
89 we_->cb = ([this](WaitEntry* we) { ScheduleAlarm(); });
90
91 // generally does not fail
92 if (!DelayedWakeup(we_->tp, we_, we_->cb, true)) {
93 FFRT_LOGW("failed to set delayedworker");
94 }
95 }
96
ScheduleAlarm()97 void QueueMonitor::ScheduleAlarm()
98 {
99 uint64_t nextTaskStart = UINT64_MAX;
100 CheckTimeout(nextTaskStart);
101 FFRT_LOGD("queue monitor checked, going next");
102 // 所有队列都没有任务,暂停定时器
103 if (nextTaskStart == UINT64_MAX) {
104 suspendAlarm_.exchange(true);
105 return;
106 }
107
108 SetAlarm(nextTaskStart + timeoutUs_);
109 }
110
CheckTimeout(uint64_t & nextTaskStart)111 void QueueMonitor::CheckTimeout(uint64_t& nextTaskStart)
112 {
113 // 未来ALLOW_ACC_ERROR_US可能超时的任务,一起上报
114 uint64_t now = TimeStampCntvct();
115 uint64_t minStart = now - ((timeoutUs_ - ALLOW_ACC_ERROR_US));
116 std::vector<std::pair<std::pair<std::vector<uint64_t>, uint64_t>, std::stringstream>> curTaskTimeInfoVec;
117
118 {
119 std::shared_lock lock(infoMutex_);
120 for (auto& queueInfo : queuesInfo_) {
121 auto curTaskTimeStamp = queueInfo->EvaluateTaskTimeout(minStart, timeoutUs_,
122 timeoutMSG_);
123 curTaskTimeInfoVec.emplace_back(std::make_pair(curTaskTimeStamp, timeoutMSG_.str()));
124 }
125 }
126
127 {
128 std::unique_lock lock(infoMutex_);
129 for (auto& curTaskTimeInfo : curTaskTimeInfoVec) {
130 // first为gid,second为下次触发超时的时间
131 for (size_t i = 0; i < curTaskTimeInfo.first.first.size(); i++) {
132 if (curTaskTimeInfo.first.second < UINT64_MAX && curTaskTimeInfo.first.first[i] != 0) {
133 ReportEventTimeout(curTaskTimeInfo.first.first[i], curTaskTimeInfo.second);
134 if (taskTimeoutInfo_.size() > MAX_RECORD_LIMIT) {
135 taskTimeoutInfo_.erase(taskTimeoutInfo_.begin());
136 }
137 taskTimeoutInfo_.emplace_back(std::make_pair(now, curTaskTimeInfo.second.str()));
138 }
139
140 if (curTaskTimeInfo.first.second < nextTaskStart) {
141 nextTaskStart = curTaskTimeInfo.first.second;
142 }
143 }
144 }
145 }
146 }
147
ReportEventTimeout(uint64_t curGid,const std::stringstream & ss)148 void QueueMonitor::ReportEventTimeout(uint64_t curGid, const std::stringstream& ss)
149 {
150 std::string ssStr = ss.str();
151 if (ffrt_task_timeout_get_cb()) {
152 FFRTFacade::GetDWInstance().SubmitAsyncTask([curGid, ssStr] {
153 ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
154 if (func) {
155 func(curGid, ssStr.c_str(), ssStr.size());
156 }
157 });
158 }
159 }
160
DumpQueueTimeoutInfo()161 std::string QueueMonitor::DumpQueueTimeoutInfo()
162 {
163 std::shared_lock<std::shared_mutex> lock(infoMutex_);
164 std::stringstream ss;
165 if (taskTimeoutInfo_.size() != 0) {
166 for (auto it = taskTimeoutInfo_.rbegin(); it != taskTimeoutInfo_.rend(); ++it) {
167 auto& record = *it;
168 ss << "{" << FormatDateString4SteadyClock(record.first) << ", " << record.second << "} \n";
169 }
170 } else {
171 ss << "Queue Timeout info Empty";
172 }
173 return ss.str();
174 }
175
UpdateTimeoutUs()176 void QueueMonitor::UpdateTimeoutUs()
177 {
178 timeoutUs_ = ffrt_task_timeout_get_threshold() * US_PER_MS;
179 }
180 } // namespace ffrt
181