• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CPU_MONITOR_H
17 #define CPU_MONITOR_H
18 
19 #include <atomic>
20 #include <vector>
21 #include <functional>
22 #include <mutex>
23 #include "qos.h"
24 #include "cpp/mutex.h"
25 #include "eu/cpu_manager_strategy.h"
26 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
27 #include "eu/blockaware.h"
28 #endif
29 #include "sync/sync.h"
30 
31 namespace ffrt {
32 struct WorkerCtrl {
33     alignas(cacheline_size) fast_mutex lock;
34     alignas(cacheline_size) int executionNum = 0;
35     alignas(cacheline_size) int sleepingWorkerNum = 0;
36     alignas(cacheline_size) bool irqEnable = false;
37     /* used for performance mode */
38     alignas(cacheline_size) bool fastWakeEnable = false; // directly wakeup first worker by futex
39     alignas(cacheline_size) int pendingWakeCnt = 0; // number of workers waking but not waked-up yet
40     alignas(cacheline_size) int pendingTaskCnt = 0; // number of tasks submitted to RTB but not picked-up yet
41     size_t hardLimit = 0;
42     size_t maxConcurrency = 0;
43     bool pollWaitFlag = false;
44     int deepSleepingWorkerNum = 0;
45     bool retryBeforeDeepSleep = true;
46 };
47 
48 class EscapeMgr {
49 public:
EscapeMgr(const std::function<void (int,void *)> & executeEscapeFunc)50     EscapeMgr(const std::function<void(int, void*)>& executeEscapeFunc) : executeEscapeFunc_(executeEscapeFunc)
51     {
52         for (int idx = 0; idx < QoS::MaxNum(); idx++) {
53             we_[idx] = new WaitUntilEntry;
54             we_[idx]->cb = nullptr;
55         }
56     }
57 
~EscapeMgr()58     ~EscapeMgr()
59     {
60         FFRT_LOGI("Destructor.");
61         for (int idx = 0; idx < QoS::MaxNum(); idx++) {
62             if (we_[idx] != nullptr) {
63                 delete we_[idx];
64                 we_[idx] = nullptr;
65             }
66         }
67     }
68 
SetEscapeEnable(uint64_t oneStageIntervalMs,uint64_t twoStageIntervalMs,uint64_t threeStageIntervalMs,uint64_t oneStageWorkerNum,uint64_t twoStageWorkerNum)69     int SetEscapeEnable(uint64_t oneStageIntervalMs, uint64_t twoStageIntervalMs,
70         uint64_t threeStageIntervalMs, uint64_t oneStageWorkerNum, uint64_t twoStageWorkerNum)
71     {
72         if (enableEscape_) {
73             FFRT_LOGW("Worker escape is enabled, the interface cannot be invoked repeatedly.");
74             return 1;
75         }
76 
77         if (oneStageIntervalMs < oneStageIntervalMs_ || twoStageIntervalMs < twoStageIntervalMs_ ||
78             threeStageIntervalMs < threeStageIntervalMs_ || oneStageWorkerNum > twoStageWorkerNum) {
79             FFRT_LOGE("Setting failed, each stage interval value [%lu, %lu, %lu] "
80                 "cannot be smaller than default value [%lu, %lu, %lu], "
81                 "and one-stage worker number [%lu] cannot be larger than two-stage worker number [%lu].",
82                 oneStageIntervalMs, twoStageIntervalMs, threeStageIntervalMs, oneStageIntervalMs_,
83                 twoStageIntervalMs_, threeStageIntervalMs_, oneStageWorkerNum, twoStageWorkerNum);
84             return 1;
85         }
86 
87         enableEscape_ = true;
88         oneStageIntervalMs_ = oneStageIntervalMs;
89         twoStageIntervalMs_ = twoStageIntervalMs;
90         threeStageIntervalMs_ = threeStageIntervalMs;
91         oneStageWorkerNum_ = oneStageWorkerNum;
92         twoStageWorkerNum_ = twoStageWorkerNum;
93         FFRT_LOGI("Enable worker escape success, one-stage interval ms %lu, two-stage interval ms %lu, "
94             "three-stage interval ms %lu, one-stage worker number %lu, two-stage worker number %lu.",
95             oneStageIntervalMs_, twoStageIntervalMs_, threeStageIntervalMs_, oneStageWorkerNum_, twoStageWorkerNum_);
96         return 0;
97     }
98 
SetEscapeDisable()99     void SetEscapeDisable()
100     {
101         enableEscape_ = false;
102         // after the escape function is disabled, parameters are restored to default values
103         oneStageIntervalMs_ = 10;
104         twoStageIntervalMs_ = 100;
105         threeStageIntervalMs_ = 1000;
106         oneStageWorkerNum_ = 128;
107         twoStageWorkerNum_ = 256;
108     }
109 
IsEscapeEnable()110     bool IsEscapeEnable()
111     {
112         return enableEscape_;
113     }
114 
CalEscapeInterval(uint64_t totalWorkerNum)115     uint64_t CalEscapeInterval(uint64_t totalWorkerNum)
116     {
117         if (totalWorkerNum < oneStageWorkerNum_) {
118             return oneStageIntervalMs_;
119         } else if (totalWorkerNum >= oneStageWorkerNum_ && totalWorkerNum < twoStageWorkerNum_) {
120             return twoStageIntervalMs_;
121         } else {
122             return threeStageIntervalMs_;
123         }
124     }
125 
SubmitEscape(int qos,uint64_t totalWorkerNum,void * monitor)126     void SubmitEscape(int qos, uint64_t totalWorkerNum, void* monitor)
127     {
128         // escape event has been triggered and will not be submitted repeatedly
129         if (submittedDelayedTask_[qos]) {
130             return;
131         }
132 
133         we_[qos]->tp = std::chrono::steady_clock::now() + std::chrono::milliseconds(CalEscapeInterval(totalWorkerNum));
134         if (we_[qos]->cb == nullptr) {
135             we_[qos]->cb = [this, qos, monitor] (WaitEntry* we) {
136                 (void)we;
137                 executeEscapeFunc_(qos, monitor);
138                 submittedDelayedTask_[qos] = false;
139             };
140         }
141 
142         if (!DelayedWakeup(we_[qos]->tp, we_[qos], we_[qos]->cb)) {
143             FFRT_LOGW("Failed to set qos %d escape task.", qos);
144             return;
145         }
146 
147         submittedDelayedTask_[qos] = true;
148     }
149 
150 private:
151     bool enableEscape_ = false;
152     uint64_t oneStageIntervalMs_ = 10;
153     uint64_t twoStageIntervalMs_ = 100;
154     uint64_t threeStageIntervalMs_ = 1000;
155     uint64_t oneStageWorkerNum_ = 128;
156     uint64_t twoStageWorkerNum_ = 256;
157 
158     bool submittedDelayedTask_[QoS::MaxNum()] = {0};
159     WaitUntilEntry* we_[QoS::MaxNum()] = {nullptr};
160 
161     std::function<void(int, void*)> executeEscapeFunc_;
162 };
163 
164 class CPUMonitor {
165 public:
166     CPUMonitor(CpuMonitorOps&& ops, const std::function<void(int, void*)>& executeEscapeFunc);
167     CPUMonitor(const CPUMonitor&) = delete;
168     CPUMonitor& operator=(const CPUMonitor&) = delete;
169     virtual ~CPUMonitor();
170     uint32_t GetMonitorTid() const;
171     int TotalCount(const QoS& qos);
172     virtual void IntoSleep(const QoS& qos) = 0;
173     virtual void IntoPollWait(const QoS& qos) = 0;
174 
175     void WakeupSleep(const QoS& qos, bool irqWake = false);
176     void IntoDeepSleep(const QoS& qos);
177     void WakeupDeepSleep(const QoS& qos, bool irqWake = false);
178     void TimeoutCount(const QoS& qos);
179     bool IsExceedDeepSleepThreshold();
180     void OutOfPollWait(const QoS& qos);
181     void RollbackDestroy(const QoS& qos, bool irqWake = false);
182     bool TryDestroy(const QoS& qos);
183 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
184     bool IsExceedRunningThreshold(const QoS& qos);
185     bool IsBlockAwareInit(void);
186     void MonitorMain();
187     BlockawareWakeupCond* WakeupCond(void);
188 #endif
189     virtual void Notify(const QoS& qos, TaskNotifyType notifyType) = 0;
190     virtual void WorkerInit() = 0;
191     int SetWorkerMaxNum(const QoS& qos, uint32_t num);
192     int WakedWorkerNum(const QoS& qos);
193     int SleepingWorkerNum(const QoS& qos);
194     void NotifyWorkers(const QoS& qos, int number);
195     void StartMonitor();
196     int SetEscapeEnable(uint64_t oneStageIntervalMs, uint64_t twoStageIntervalMs,
197         uint64_t threeStageIntervalMs, uint64_t oneStageWorkerNum, uint64_t twoStageWorkerNum);
198     void SetEscapeDisable();
199 
200     CpuMonitorOps ops;
201     std::thread* monitorThread = nullptr;
202     uint32_t monitorTid = 0;
203 
204 protected:
205     size_t GetRunningNum(const QoS& qos);
206     void ReportEscapeEvent(int qos, size_t totalNum);
207 
GetOps()208     CpuMonitorOps& GetOps()
209     {
210         return ops;
211     }
212 
213     WorkerCtrl ctrlQueue[QoS::MaxNum()];
214 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
215     bool blockAwareInit = false;
216     bool stopMonitor = false;
217     unsigned long keyPtr = 0;
218     int qosMonitorMaxNum = std::min(QoS::Max(), BLOCKAWARE_DOMAIN_ID_MAX + 1);
219     BlockawareWakeupCond wakeupCond;
220     BlockawareDomainInfoArea domainInfoMonitor;
221 #endif
222     EscapeMgr escapeMgr_;
223 
224 private:
225     void SetupMonitor();
226 
227     std::atomic<bool> setWorkerMaxNum[QoS::MaxNum()];
228 };
229 }
230 #endif /* CPU_MONITOR_H */
231