• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_monitor.h"
16 #include <cstdint>
17 #include <sstream>
18 #include <iomanip>
19 #include "dfx/log/ffrt_log_api.h"
20 #include "internal_inc/osal.h"
21 #include "sync/sync.h"
22 #include "c/ffrt_watchdog.h"
23 
namespace {
// Value stored in a queue's slot when no task is currently tracked for that queue.
constexpr uint32_t INVALID_TASK_ID{0};
// Multiplier applied to the watchdog timeout to obtain microseconds.
constexpr uint32_t TIME_CONVERT_UNIT{1000};
// Number of queue slots reserved up front by the monitor.
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY{64};
// Timing tolerance (us) subtracted from the timeout when deciding a task is overdue,
// and used as the retry delay when scheduling the checker fails.
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US{500};
// Smallest timeout (us) for which the monitor will arm its checker.
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US{1000};

/**
 * Computes an absolute steady-clock deadline.
 *
 * @param delayUs offset from the current instant, in microseconds.
 * @return steady_clock::now() advanced by delayUs microseconds.
 */
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
    using Clock = std::chrono::steady_clock;
    const Clock::time_point current = Clock::now();
    const std::chrono::microseconds offset(delayUs);
    return current + offset;
}
} // namespace
36 
37 namespace ffrt {
QueueMonitor()38 QueueMonitor::QueueMonitor()
39 {
40 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
41     QueuesRunningInfo.reserve(QUEUE_INFO_INITIAL_CAPACITY);
42     uint64_t timeout = ffrt_watchdog_get_timeout() * TIME_CONVERT_UNIT;
43     if (timeout < MIN_TIMEOUT_THRESHOLD_US) {
44         timeoutUs_ = 0;
45         FFRT_LOGE("failed to setup watchdog because [%llu] us less than precision threshold", timeout);
46         return;
47     }
48     timeoutUs_ = timeout;
49     SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
50     FFRT_LOGD("send delayedworker with %llu us", timeoutUs_);
51 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
52 }
53 
GetInstance()54 QueueMonitor& QueueMonitor::GetInstance()
55 {
56     static QueueMonitor instance;
57     return instance;
58 }
59 
RegisterQueueId(uint32_t queueId)60 void QueueMonitor::RegisterQueueId(uint32_t queueId)
61 {
62 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
63     std::unique_lock lock(mutex_);
64     if (queueId == QueuesRunningInfo.size()) {
65         QueuesRunningInfo.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
66         FFRT_LOGD("queue registration in monitor gid=%u in turn succ", queueId);
67         return;
68     }
69 
70     // only need to ensure that the corresponding info index has been initialized after constructed.
71     if (queueId > QueuesRunningInfo.size()) {
72         for (uint32_t i = QueuesRunningInfo.size(); i <= queueId; ++i) {
73             QueuesRunningInfo.emplace_back(std::make_pair(INVALID_TASK_ID, std::chrono::steady_clock::now()));
74         }
75     }
76     FFRT_LOGD("queue registration in monitor gid=%u by skip succ", queueId);
77 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
78 }
79 
ResetQueueInfo(uint32_t queueId)80 void QueueMonitor::ResetQueueInfo(uint32_t queueId)
81 {
82 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
83     std::shared_lock lock(mutex_);
84     QueuesRunningInfo[queueId].first = INVALID_TASK_ID;
85 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
86 }
87 
UpdateQueueInfo(uint32_t queueId,const uint64_t & taskId)88 void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
89 {
90 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
91     std::shared_lock lock(mutex_);
92     QueuesRunningInfo[queueId] = {taskId, std::chrono::steady_clock::now()};
93 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
94 }
95 
SendDelayedWorker(time_point_t delay)96 void QueueMonitor::SendDelayedWorker(time_point_t delay)
97 {
98 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
99     static WaitUntilEntry we;
100     we.tp = delay;
101     we.cb = ([this](WaitEntry* we) { CheckQueuesStatus(); });
102 
103     bool result = DelayedWakeup(we.tp, &we, we.cb);
104     // insurance mechanism, generally does not fail
105     while (!result) {
106         FFRT_LOGW("failed to set delayedworker because the given timestamp has passed");
107         we.tp = GetDelayedTimeStamp(ALLOW_TIME_ACC_ERROR_US);
108         result = DelayedWakeup(we.tp, &we, we.cb);
109     }
110 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
111 }
112 
ResetTaskTimestampAfterWarning(uint32_t queueId,const uint64_t & taskId)113 void QueueMonitor::ResetTaskTimestampAfterWarning(uint32_t queueId, const uint64_t &taskId)
114 {
115     std::unique_lock lock(mutex_);
116     if (QueuesRunningInfo[queueId].first == taskId) {
117         QueuesRunningInfo[queueId].second += std::chrono::microseconds(timeoutUs_);
118     }
119 }
120 
CheckQueuesStatus()121 void QueueMonitor::CheckQueuesStatus()
122 {
123 #ifdef FFRT_CO_BACKTRACE_OH_ENABLE
124     time_point_t oldestStartedTime = std::chrono::steady_clock::now();
125     time_point_t startThreshold = oldestStartedTime - std::chrono::microseconds(timeoutUs_ - ALLOW_TIME_ACC_ERROR_US);
126 
127     uint64_t taskId = 0;
128     time_point_t taskTimestamp = oldestStartedTime;
129     for (uint32_t i = 0; i < QueuesRunningInfo.size(); ++i) {
130         {
131             std::unique_lock lock(mutex_);
132             taskId = QueuesRunningInfo[i].first;
133             taskTimestamp = QueuesRunningInfo[i].second;
134         }
135 
136         if (taskId == INVALID_TASK_ID) {
137             continue;
138         }
139 
140         if (taskTimestamp < startThreshold) {
141             std::stringstream ss;
142             ss << "SERIAL_TASK_TIMEOUT: serial queue qid=" << i << ", serial task gid=" << taskId << " execution " <<
143                 timeoutUs_ << " us.";
144             FFRT_LOGE("%s", ss.str().c_str());
145 
146             ffrt_watchdog_cb func = ffrt_watchdog_get_cb();
147             if (func) {
148                 func(taskId, ss.str().c_str(), ss.str().size());
149             }
150             // reset timeout task timestamp for next warning
151             ResetTaskTimestampAfterWarning(i, taskId);
152             continue;
153         }
154 
155         if (taskTimestamp < oldestStartedTime) {
156             oldestStartedTime = taskTimestamp;
157         }
158     }
159 
160     time_point_t nextCheckTime = oldestStartedTime + std::chrono::microseconds(timeoutUs_);
161     SendDelayedWorker(nextCheckTime);
162     FFRT_LOGD("global watchdog completed queue status check and scheduled the next");
163 #endif // FFRT_CO_BACKTRACE_OH_ENABLE
164 }
165 } // namespace ffrt
166