• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_monitor.h"
16 #include <sstream>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "util/slab.h"
19 #include "sync/sync.h"
20 #include "c/ffrt_dump.h"
21 #include "dfx/sysevent/sysevent.h"
22 #include "internal_inc/osal.h"
23 #include "util/ffrt_facade.h"
24 
namespace {
// Task id stored in a queue slot when no task is recorded for that queue.
constexpr uint32_t INVALID_TASK_ID = 0;
// Factor applied to the configured timeout threshold to obtain microseconds.
constexpr uint32_t TIME_CONVERT_UNIT = 1000;
// Number of queue slots reserved up front by the monitor's bookkeeping vectors.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
// Tolerance (us) subtracted from the timeout when classifying a task as timed out;
// also used as the retry delay when arming the delayed wakeup fails.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
// Smallest timeout (us) for which the watchdog is set up.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;

/**
 * @brief Builds a steady-clock time point lying delayUs microseconds after "now".
 * @param delayUs Offset from the current steady-clock time, in microseconds.
 * @return std::chrono::steady_clock::now() shifted forward by delayUs.
 */
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    const auto current = std::chrono::steady_clock::now();
    const std::chrono::microseconds offset(delayUs);
    return current + offset;
}
}
37 
38 namespace ffrt {
QueueMonitor()39 QueueMonitor::QueueMonitor()
40 {
41     FFRT_LOGI("queue monitor ctor enter");
42     queuesRunningInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
43     queuesStructInfo_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
44     lastReportedTask_.reserve(QUEUE_INFO_INITIAL_CAPACITY);
45     we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
46     uint64_t timeout = ffrt_task_timeout_get_threshold() * TIME_CONVERT_UNIT;
47     if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
48         timeoutUs_ = 0;
49         FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
50         return;
51     }
52     timeoutUs_ = timeout;
53     FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
54 }
55 
~QueueMonitor()56 QueueMonitor::~QueueMonitor()
57 {
58     FFRT_LOGI("destruction of QueueMonitor");
59     DelayedRemove(we_->tp, we_);
60     SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
61 }
62 
GetInstance()63 QueueMonitor& QueueMonitor::GetInstance()
64 {
65     static QueueMonitor instance;
66     return instance;
67 }
68 
RegisterQueueId(uint32_t queueId,QueueHandler * queueStruct)69 void QueueMonitor::RegisterQueueId(uint32_t queueId, QueueHandler* queueStruct)
70 {
71     std::unique_lock lock(mutex_);
72     if (queueId == queuesRunningInfo_.size()) {
73         queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
74         queuesStructInfo_.emplace_back(queueStruct);
75         lastReportedTask_.emplace_back(INVALID_TASK_ID);
76         FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
77         return;
78     }
79 
80     // only need to ensure that the corresponding info index has been initialized after constructed.
81     if (queueId > queuesRunningInfo_.size()) {
82         for (uint32_t i = queuesRunningInfo_.size(); i <= queueId; ++i) {
83             queuesRunningInfo_.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
84             queuesStructInfo_.emplace_back(nullptr);
85             lastReportedTask_.emplace_back(INVALID_TASK_ID);
86         }
87         queuesStructInfo_[queueId] = queueStruct;
88     }
89     if (queuesStructInfo_[queueId] == nullptr) {
90         queuesStructInfo_[queueId] = queueStruct;
91     }
92     FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
93 }
94 
ResetQueueInfo(uint32_t queueId)95 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
96 {
97     std::shared_lock lock(mutex_);
98     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
99         "ResetQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
100     queuesRunningInfo_[queueId].first = INVALID_TASK_ID;
101     lastReportedTask_[queueId] = INVALID_TASK_ID;
102 }
103 
ResetQueueStruct(uint32_t queueId)104 void QueueMonitor::ResetQueueStruct(uint32_t queueId)
105 {
106     std::shared_lock lock(mutex_);
107     FFRT_COND_DO_ERR((queuesStructInfo_.size() <= queueId), return,
108         "ResetQueueStruct queueId=%u access violation, StructInfo_.size=%u", queueId, queuesStructInfo_.size());
109     queuesStructInfo_[queueId] = nullptr;
110 }
111 
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)112 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
113 {
114     std::shared_lock lock(mutex_);
115     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return,
116         "UpdateQueueInfo queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
117     TimePoint now = std::chrono::steady_clock::now();
118     queuesRunningInfo_[queueId] = {taskId, now};
119     if (exit_.exchange(false)) {
120         SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
121     }
122 }
123 
QueryQueueStatus(uint32_t queueId)124 uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
125 {
126     std::shared_lock lock(mutex_);
127     FFRT_COND_DO_ERR((queuesRunningInfo_.size() <= queueId), return INVALID_TASK_ID,
128         "QueryQueueStatus queueId=%u access violation, RunningInfo_.size=%u", queueId, queuesRunningInfo_.size());
129     return queuesRunningInfo_[queueId].first;
130 }
131 
SendDelayedWorker(TimePoint delay)132 void QueueMonitor::SendDelayedWorker(TimePoint delay)
133 {
134     we_->tp = delay;
135     we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });
136 
137     bool result = DelayedWakeup(we_->tp, we_, we_->cb);
138     // insurance mechanism, generally does not fail
139     while (!result) {
140         FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
141         we_->tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
142         result = DelayedWakeup(we_->tp, we_, we_->cb);
143     }
144 }
145 
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)146 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
147 {
148     std::unique_lock lock(mutex_);
149     if (queuesRunningInfo_[queueId].first == taskId) {
150         queuesRunningInfo_[queueId].second += std::chrono::microseconds(timeoutUs_);
151     }
152 }
153 
CheckQueuesStatus()154 void QueueMonitor::CheckQueuesStatus()
155 {
156     {
157         std::unique_lock lock(mutex_);
158         auto iter = std::find_if(queuesRunningInfo_.cbegin(), queuesRunningInfo_.cend(),
159             [](const auto& pair) { return pair.first != INVALID_TASK_ID; });
160         if (iter == queuesRunningInfo_.cend()) {
161             exit_ = true;
162             return;
163         }
164     }
165 
166     TimePoint oldestStartedTime = std::chrono::steady_clock::now();
167     TimePoint startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
168     uint64_t taskId = 0;
169     uint32_t queueRunningInfoSize = 0;
170     TimePoint taskTimestamp = oldestStartedTime;
171     {
172         std::shared_lock lock(mutex_);
173         queueRunningInfoSize = queuesRunningInfo_.size();
174     }
175 
176     // Displays information about queues whose tasks time out.
177     for (uint32_t i = 0; i < queueRunningInfoSize; ++i) {
178         {
179             std::unique_lock lock(mutex_);
180             taskId = queuesRunningInfo_[i].first;
181             taskTimestamp = queuesRunningInfo_[i].second;
182         }
183 
184         if (taskId == INVALID_TASK_ID) {
185             continue;
186         }
187 
188         if (taskTimestamp < startThreshold) {
189             std::stringstream ss;
190             std::string processNameStr = std::string(GetCurrentProcessName());
191             ss << "Serial_Queue_Timeout, process name:[" << processNameStr << "], serial queue qid:[" << i
192                 << "], serial task gid:[" << taskId << "], execution:[" << timeoutUs_ << "] us.";
193             {
194                 std::shared_lock lock(mutex_);
195                 if (queuesStructInfo_[i] != nullptr) {
196                     ss << queuesStructInfo_[i]->GetDfxInfo();
197                 }
198             }
199             FFRT_LOGE("%s", ss.str().c_str());
200 #ifdef FFRT_SEND_EVENT
201             if (lastReportedTask_[i] != taskId) {
202                 lastReportedTask_[i] = taskId;
203 
204                 std::string senarioName = "Serial_Queue_Timeout";
205                 TaskTimeoutReport(ss, processNameStr, senarioName);
206             }
207 #endif
208             std::string ssStr = ss.str();
209             if (ffrt_task_timeout_get_cb()) {
210                 FFRTFacade::GetDWInstance().SubmitAsyncTask([taskId, ssStr] {
211                     ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
212                     if (func) {
213                         func(taskId, ssStr.c_str(), ssStr.size());
214                     }
215                 });
216             }
217             // reset timeout task timestamp for next warning
218             ResetTaskTimestampAfterWarning(i, taskId);
219             continue;
220         }
221 
222         if (taskTimestamp < oldestStartedTime) {
223             oldestStartedTime = taskTimestamp;
224         }
225     }
226 
227     SendDelayedWorker(oldestStartedTime + std::chrono::microseconds(timeoutUs_));
228     FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
229 }
230 
HasQueueActive()231 bool QueueMonitor::HasQueueActive()
232 {
233     std::unique_lock lock(mutex_);
234     for (uint32_t i = 0; i < queuesRunningInfo_.size(); ++i) {
235         if (queuesRunningInfo_[i].first != INVALID_TASK_ID) {
236             return true;
237         }
238     }
239     return false;
240 }
241 } // namespace ffrt
242