• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue/queue_monitor.h"
16 #include "queue/queue_handler.h"
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "c/queue.h"
22 #include "internal_inc/osal.h"
23 #include "util/ffrt_facade.h"
24 #include "util/time_format.h"
25 
namespace {
// Conversion factor from milliseconds to microseconds.
constexpr uint32_t US_PER_MS = 1000;
// Tolerance (10 ms): tasks that will reach the timeout within this margin are reported in the
// same check as the ones that already reached it.
constexpr uint64_t ALLOW_ACC_ERROR_US = US_PER_MS * 10;
// Smallest watchdog timeout the monitor accepts (1 s); smaller configured values are replaced by it.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = US_PER_MS * 1000;
// Upper bound on the number of stored timeout records.
constexpr uint32_t MAX_RECORD_LIMIT = 64;
// NOTE(review): not referenced in the code shown here — confirm whether it is still needed.
constexpr uint32_t INITIAL_RECORD_LIMIT = 16;
} // namespace
33 
34 namespace ffrt {
QueueMonitor()35 QueueMonitor::QueueMonitor()
36 {
37     FFRT_LOGI("QueueMonitor ctor enter");
38     we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
39     uint64_t timeout = ffrt_task_timeout_get_threshold() * US_PER_MS;
40     timeoutUs_ = timeout;
41     if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
42         FFRT_LOGE("invalid watchdog timeout [%llu] us, using 1s instead", timeout);
43         timeoutUs_ = MIN_TIMEOUT_THRESHOLD_US;
44     }
45     FFRT_LOGI("QueueMonitor ctor leave, watchdog timeout of %llu us has been set", timeoutUs_.load());
46 }
47 
~QueueMonitor()48 QueueMonitor::~QueueMonitor()
49 {
50     FFRT_LOGI("destruction of QueueMonitor");
51     DelayedRemove(we_->tp, we_);
52     SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
53 }
54 
GetInstance()55 QueueMonitor& QueueMonitor::GetInstance()
56 {
57     static QueueMonitor instance;
58     return instance;
59 }
60 
RegisterQueue(QueueHandler * queue)61 void QueueMonitor::RegisterQueue(QueueHandler* queue)
62 {
63     std::lock_guard lock(infoMutex_);
64     queuesInfo_.push_back(queue);
65     FFRT_LOGD("queue [%s] register in QueueMonitor", queue->GetName().c_str());
66 }
67 
DeregisterQueue(QueueHandler * queue)68 void QueueMonitor::DeregisterQueue(QueueHandler* queue)
69 {
70     std::lock_guard lock(infoMutex_);
71     auto it = std::find(queuesInfo_.begin(), queuesInfo_.end(), queue);
72     if (it != queuesInfo_.end()) {
73         queuesInfo_.erase(it);
74     }
75 }
76 
UpdateQueueInfo()77 void QueueMonitor::UpdateQueueInfo()
78 {
79     if (suspendAlarm_.exchange(false)) {
80         uint64_t alarmTime = static_cast<uint64_t>(std::chrono::time_point_cast<std::chrono::microseconds>(
81             std::chrono::steady_clock::now()).time_since_epoch().count()) + timeoutUs_;
82         SetAlarm(alarmTime);
83     }
84 }
85 
SetAlarm(uint64_t steadyUs)86 void QueueMonitor::SetAlarm(uint64_t steadyUs)
87 {
88     we_->tp = std::chrono::steady_clock::time_point() + std::chrono::microseconds(steadyUs);
89     we_->cb = ([this](WaitEntry* we) { ScheduleAlarm(); });
90 
91     // generally does not fail
92     if (!DelayedWakeup(we_->tp, we_, we_->cb, true)) {
93         FFRT_LOGW("failed to set delayedworker");
94     }
95 }
96 
ScheduleAlarm()97 void QueueMonitor::ScheduleAlarm()
98 {
99     uint64_t nextTaskStart = UINT64_MAX;
100     CheckTimeout(nextTaskStart);
101     FFRT_LOGD("queue monitor checked, going next");
102     // 所有队列都没有任务,暂停定时器
103     if (nextTaskStart == UINT64_MAX) {
104         suspendAlarm_.exchange(true);
105         return;
106     }
107 
108     SetAlarm(nextTaskStart + timeoutUs_);
109 }
110 
CheckTimeout(uint64_t & nextTaskStart)111 void QueueMonitor::CheckTimeout(uint64_t& nextTaskStart)
112 {
113     // 未来ALLOW_ACC_ERROR_US可能超时的任务,一起上报
114     uint64_t now = TimeStampCntvct();
115     uint64_t minStart = now - ((timeoutUs_ - ALLOW_ACC_ERROR_US));
116     std::vector<std::pair<std::pair<std::vector<uint64_t>, uint64_t>, std::stringstream>> curTaskTimeInfoVec;
117 
118     {
119         std::shared_lock lock(infoMutex_);
120         for (auto& queueInfo : queuesInfo_) {
121             auto curTaskTimeStamp = queueInfo->EvaluateTaskTimeout(minStart, timeoutUs_,
122             timeoutMSG_);
123             curTaskTimeInfoVec.emplace_back(std::make_pair(curTaskTimeStamp, timeoutMSG_.str()));
124         }
125     }
126 
127     {
128         std::unique_lock lock(infoMutex_);
129         for (auto& curTaskTimeInfo : curTaskTimeInfoVec) {
130             // first为gid,second为下次触发超时的时间
131             for (size_t i = 0; i < curTaskTimeInfo.first.first.size(); i++) {
132                 if (curTaskTimeInfo.first.second < UINT64_MAX && curTaskTimeInfo.first.first[i] != 0) {
133                     ReportEventTimeout(curTaskTimeInfo.first.first[i], curTaskTimeInfo.second);
134                     if (taskTimeoutInfo_.size() > MAX_RECORD_LIMIT) {
135                         taskTimeoutInfo_.erase(taskTimeoutInfo_.begin());
136                     }
137                     taskTimeoutInfo_.emplace_back(std::make_pair(now, curTaskTimeInfo.second.str()));
138                 }
139 
140                 if (curTaskTimeInfo.first.second < nextTaskStart) {
141                     nextTaskStart = curTaskTimeInfo.first.second;
142                 }
143             }
144         }
145     }
146 }
147 
ReportEventTimeout(uint64_t curGid,const std::stringstream & ss)148 void QueueMonitor::ReportEventTimeout(uint64_t curGid, const std::stringstream& ss)
149 {
150     std::string ssStr = ss.str();
151     if (ffrt_task_timeout_get_cb()) {
152         FFRTFacade::GetDWInstance().SubmitAsyncTask([curGid, ssStr] {
153             ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
154             if (func) {
155                 func(curGid, ssStr.c_str(), ssStr.size());
156             }
157         });
158     }
159 }
160 
DumpQueueTimeoutInfo()161 std::string QueueMonitor::DumpQueueTimeoutInfo()
162 {
163     std::shared_lock<std::shared_mutex> lock(infoMutex_);
164     std::stringstream ss;
165     if (taskTimeoutInfo_.size() != 0) {
166         for (auto it = taskTimeoutInfo_.rbegin(); it != taskTimeoutInfo_.rend(); ++it) {
167             auto& record = *it;
168             ss << "{" << FormatDateString4SteadyClock(record.first) << ", " << record.second << "} \n";
169         }
170     } else {
171         ss << "Queue Timeout info Empty";
172     }
173     return ss.str();
174 }
175 
UpdateTimeoutUs()176 void QueueMonitor::UpdateTimeoutUs()
177 {
178     timeoutUs_ = ffrt_task_timeout_get_threshold() * US_PER_MS;
179 }
180 } // namespace ffrt
181