1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "queue/traffic_record.h"
17 #include <future>
18 #include <sstream>
19 #include "queue/queue_handler.h"
20 #include "dfx/sysevent/sysevent.h"
21 #include "util/time_format.h"
22
23 namespace {
24 constexpr uint64_t DEFAULT_TIME_INTERVAL = 6000000;
25 constexpr uint32_t REPORT_INTERVAL = 60000000; // 60s
26 constexpr uint32_t MIN_OVERLOAD_INTERVAL = 15;
27 constexpr uint32_t INITIAL_RECORD_LIMIT = 16;
28 constexpr uint32_t MAX_RECORD_LIMIT = 128;
29 constexpr uint32_t US_TO_MS = 1000;
30 }
31
32 namespace ffrt {
33 std::vector<std::pair<uint64_t, std::string>> TrafficRecord::trafficRecordInfo =
__anon0d78244f0202() 34 std::async(std::launch::deferred, []() {
35 std::vector<std::pair<uint64_t, std::string>> trafficInfo;
36 trafficInfo.reserve(INITIAL_RECORD_LIMIT);
37 return trafficInfo;
38 }).get();
39 std::mutex TrafficRecord::mtx_;
40
TrafficRecord()41 TrafficRecord::TrafficRecord() {}
42
43 #ifdef FFRT_ENABLE_TRAFFIC_MONITOR
SetTimeInterval(const uint64_t timeInterval)44 void TrafficRecord::SetTimeInterval(const uint64_t timeInterval)
45 {
46 const uint64_t& time = TimeStampCntvct();
47 timeInterval_ = timeInterval;
48 nextUpdateTime_ = time + timeInterval_;
49 }
50
SubmitTraffic(QueueHandler * handler)51 void TrafficRecord::SubmitTraffic(QueueHandler* handler)
52 {
53 submitCnt_.fetch_add(1, std::memory_order_relaxed);
54
55 if (detectCnt_.fetch_add(1, std::memory_order_relaxed) < MIN_OVERLOAD_INTERVAL) {
56 return;
57 }
58 detectCnt_ = 0;
59 const uint64_t& time = TimeStampCntvct();
60 uint64_t oldVal = nextUpdateTime_.load();
61 if (likely(time < oldVal) || timeInterval_ == 0) {
62 return;
63 }
64
65 uint64_t nextTime = time + timeInterval_;
66
67 if (nextUpdateTime_.compare_exchange_strong(oldVal, nextTime)) {
68 CalculateTraffic(handler, time);
69 }
70 }
71
DoneTraffic()72 void TrafficRecord::DoneTraffic()
73 {
74 doneCnt_.fetch_add(1, std::memory_order_relaxed);
75 }
76
DoneTraffic(uint32_t count)77 void TrafficRecord::DoneTraffic(uint32_t count)
78 {
79 doneCnt_.fetch_add(count, std::memory_order_relaxed);
80 }
81 #else
SetTimeInterval(const uint64_t timeInterval)82 void TrafficRecord::SetTimeInterval(const uint64_t timeInterval) {}
SubmitTraffic(QueueHandler * handler)83 void TrafficRecord::SubmitTraffic(QueueHandler* handler) {}
DoneTraffic()84 void TrafficRecord::DoneTraffic() {}
DoneTraffic(uint32_t count)85 void TrafficRecord::DoneTraffic(uint32_t count) {}
86 #endif
87
CalculateTraffic(QueueHandler * handler,const uint64_t & time)88 void TrafficRecord::CalculateTraffic(QueueHandler* handler, const uint64_t& time)
89 {
90 uint32_t inflows = submitCnt_ - submitCntOld_;
91 uint32_t outflows = doneCnt_ - doneCntOld_;
92 DetectCountThreshold();
93 submitCntOld_.store(submitCnt_.load());
94 doneCntOld_.store(doneCnt_.load());
95
96 uint64_t reportInterval = lastCheckTime_ != 0 ? time - lastCheckTime_ : timeInterval_;
97 lastCheckTime_ = time;
98
99 if (inflows > outflows && (submitCnt_ - doneCnt_ > MIN_OVERLOAD_INTERVAL * (reportInterval / timeInterval_))
100 && handler->GetTaskCnt() > MIN_OVERLOAD_INTERVAL) {
101 std::stringstream ss;
102 ss << "[" << handler->GetName().c_str() << "]overload over[" << reportInterval / US_TO_MS <<
103 "]ms, in[" << inflows << "], out[" << outflows << "], subcnt[" << submitCnt_.load() << "], doncnt[" <<
104 doneCnt_.load() << "]";
105 FFRT_LOGW("%s", ss.str().c_str());
106
107 {
108 std::lock_guard lock(mtx_);
109 if (trafficRecordInfo.size() > MAX_RECORD_LIMIT) {
110 trafficRecordInfo.erase(trafficRecordInfo.begin());
111 }
112 trafficRecordInfo.emplace_back(std::make_pair(time, ss.str()));
113 }
114
115 if (submitCnt_ - doneCnt_ > outflows && handler->GetQueueDueCount() > outflows) {
116 if (time - lastReportTime_ > REPORT_INTERVAL) {
117 ReportOverload(ss);
118 lastReportTime_ = time;
119 }
120 FFRT_LOGE("Queue overload syswarning is triggerred, duecnt %llu", handler->GetQueueDueCount());
121 }
122 }
123 }
124
DetectCountThreshold()125 void TrafficRecord::DetectCountThreshold()
126 {
127 if (doneCnt_.load() > 0x00FFFFFF) {
128 FFRT_LOGW("Traffic Record exceed threshold, trigger remove");
129 uint32_t oldTemp = doneCnt_.load();
130 doneCnt_.fetch_sub(oldTemp);
131 submitCnt_.fetch_sub(oldTemp);
132 }
133 }
134
ReportOverload(std::stringstream & ss)135 void TrafficRecord::ReportOverload(std::stringstream& ss)
136 {
137 #ifdef FFRT_SEND_EVENT
138 std::string senarioName = "QUEUE_TASK_OVERLOAD";
139 TrafficOverloadReport(ss, senarioName);
140 #endif
141 }
142
DumpTrafficInfo(bool withLock)143 std::string TrafficRecord::DumpTrafficInfo(bool withLock)
144 {
145 std::stringstream ss;
146 if (withLock) {
147 mtx_.lock();
148 }
149 if (trafficRecordInfo.size() != 0) {
150 for (auto it = trafficRecordInfo.rbegin(); it != trafficRecordInfo.rend(); ++it) {
151 auto& record = *it;
152 ss << "{" << FormatDateString4SteadyClock(record.first) << ", " << record.second << "} \n";
153 }
154 } else {
155 ss << "Queue Traffic Record Empty";
156 }
157 if (withLock) {
158 mtx_.unlock();
159 }
160 return ss.str();
161 }
162 } // namespace ffrt
163