• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "queue/traffic_record.h"
17 #include <future>
18 #include <sstream>
19 #include "queue/queue_handler.h"
20 #include "dfx/sysevent/sysevent.h"
21 #include "util/time_format.h"
22 
23 namespace {
24 constexpr uint64_t DEFAULT_TIME_INTERVAL = 6000000;
25 constexpr uint32_t REPORT_INTERVAL = 60000000; // 60s
26 constexpr uint32_t MIN_OVERLOAD_INTERVAL = 15;
27 constexpr uint32_t INITIAL_RECORD_LIMIT = 16;
28 constexpr uint32_t MAX_RECORD_LIMIT = 128;
29 constexpr uint32_t US_TO_MS = 1000;
30 }
31 
32 namespace ffrt {
33 std::vector<std::pair<uint64_t, std::string>> TrafficRecord::trafficRecordInfo =
__anon0d78244f0202() 34     std::async(std::launch::deferred, []() {
35         std::vector<std::pair<uint64_t, std::string>> trafficInfo;
36         trafficInfo.reserve(INITIAL_RECORD_LIMIT);
37         return trafficInfo;
38     }).get();
39 std::mutex TrafficRecord::mtx_;
40 
TrafficRecord()41 TrafficRecord::TrafficRecord() {}
42 
43 #ifdef FFRT_ENABLE_TRAFFIC_MONITOR
SetTimeInterval(const uint64_t timeInterval)44 void TrafficRecord::SetTimeInterval(const uint64_t timeInterval)
45 {
46     const uint64_t& time = TimeStampCntvct();
47     timeInterval_ = timeInterval;
48     nextUpdateTime_ = time + timeInterval_;
49 }
50 
SubmitTraffic(QueueHandler * handler)51 void TrafficRecord::SubmitTraffic(QueueHandler* handler)
52 {
53     submitCnt_.fetch_add(1, std::memory_order_relaxed);
54 
55     if (detectCnt_.fetch_add(1, std::memory_order_relaxed) < MIN_OVERLOAD_INTERVAL) {
56         return;
57     }
58     detectCnt_ = 0;
59     const uint64_t& time = TimeStampCntvct();
60     uint64_t oldVal = nextUpdateTime_.load();
61     if (likely(time < oldVal) || timeInterval_ == 0) {
62         return;
63     }
64 
65     uint64_t nextTime = time + timeInterval_;
66 
67     if (nextUpdateTime_.compare_exchange_strong(oldVal, nextTime)) {
68         CalculateTraffic(handler, time);
69     }
70 }
71 
DoneTraffic()72 void TrafficRecord::DoneTraffic()
73 {
74     doneCnt_.fetch_add(1, std::memory_order_relaxed);
75 }
76 
DoneTraffic(uint32_t count)77 void TrafficRecord::DoneTraffic(uint32_t count)
78 {
79     doneCnt_.fetch_add(count, std::memory_order_relaxed);
80 }
81 #else
SetTimeInterval(const uint64_t timeInterval)82 void TrafficRecord::SetTimeInterval(const uint64_t timeInterval) {}
SubmitTraffic(QueueHandler * handler)83 void TrafficRecord::SubmitTraffic(QueueHandler* handler) {}
DoneTraffic()84 void TrafficRecord::DoneTraffic() {}
DoneTraffic(uint32_t count)85 void TrafficRecord::DoneTraffic(uint32_t count) {}
86 #endif
87 
CalculateTraffic(QueueHandler * handler,const uint64_t & time)88 void TrafficRecord::CalculateTraffic(QueueHandler* handler, const uint64_t& time)
89 {
90     uint32_t inflows = submitCnt_ - submitCntOld_;
91     uint32_t outflows = doneCnt_ - doneCntOld_;
92     DetectCountThreshold();
93     submitCntOld_.store(submitCnt_.load());
94     doneCntOld_.store(doneCnt_.load());
95 
96     uint64_t reportInterval = lastCheckTime_ != 0 ? time - lastCheckTime_ : timeInterval_;
97     lastCheckTime_ = time;
98 
99     if (inflows > outflows && (submitCnt_ - doneCnt_ > MIN_OVERLOAD_INTERVAL * (reportInterval / timeInterval_))
100         && handler->GetTaskCnt() > MIN_OVERLOAD_INTERVAL) {
101         std::stringstream ss;
102         ss << "[" << handler->GetName().c_str() << "]overload over[" << reportInterval / US_TO_MS <<
103             "]ms, in[" << inflows << "], out[" << outflows << "], subcnt[" << submitCnt_.load() << "], doncnt[" <<
104             doneCnt_.load() << "]";
105         FFRT_LOGW("%s", ss.str().c_str());
106 
107         {
108             std::lock_guard lock(mtx_);
109             if (trafficRecordInfo.size() > MAX_RECORD_LIMIT) {
110                 trafficRecordInfo.erase(trafficRecordInfo.begin());
111             }
112             trafficRecordInfo.emplace_back(std::make_pair(time, ss.str()));
113         }
114 
115         if (submitCnt_ - doneCnt_ > outflows && handler->GetQueueDueCount() > outflows) {
116             if (time - lastReportTime_ > REPORT_INTERVAL) {
117                 ReportOverload(ss);
118                 lastReportTime_ = time;
119             }
120             FFRT_LOGE("Queue overload syswarning is triggerred, duecnt %llu", handler->GetQueueDueCount());
121         }
122     }
123 }
124 
DetectCountThreshold()125 void TrafficRecord::DetectCountThreshold()
126 {
127     if (doneCnt_.load() > 0x00FFFFFF) {
128         FFRT_LOGW("Traffic Record exceed threshold, trigger remove");
129         uint32_t oldTemp = doneCnt_.load();
130         doneCnt_.fetch_sub(oldTemp);
131         submitCnt_.fetch_sub(oldTemp);
132     }
133 }
134 
ReportOverload(std::stringstream & ss)135 void TrafficRecord::ReportOverload(std::stringstream& ss)
136 {
137 #ifdef FFRT_SEND_EVENT
138     std::string senarioName = "QUEUE_TASK_OVERLOAD";
139     TrafficOverloadReport(ss, senarioName);
140 #endif
141 }
142 
DumpTrafficInfo(bool withLock)143 std::string TrafficRecord::DumpTrafficInfo(bool withLock)
144 {
145     std::stringstream ss;
146     if (withLock) {
147         mtx_.lock();
148     }
149     if (trafficRecordInfo.size() != 0) {
150         for (auto it = trafficRecordInfo.rbegin(); it != trafficRecordInfo.rend(); ++it) {
151             auto& record = *it;
152             ss << "{" << FormatDateString4SteadyClock(record.first) << ", " << record.second << "} \n";
153         }
154     } else {
155         ss << "Queue Traffic Record Empty";
156     }
157     if (withLock) {
158         mtx_.unlock();
159     }
160     return ss.str();
161 }
162 } // namespace ffrt
163