• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_EXECUTE_UNIT_HPP
17 #define FFRT_EXECUTE_UNIT_HPP
18 
19 #include <memory>
20 #include <atomic>
21 #include <deque>
22 #include <vector>
23 #include <functional>
24 #include <mutex>
25 #include <shared_mutex>
26 #include <condition_variable>
27 #include <unordered_map>
28 #include <set>
29 #include <map>
30 #include <array>
31 #include "cpp/mutex.h"
32 #include "sched/workgroup_internal.h"
33 #include "eu/thread_group.h"
34 #include "eu/cpu_worker.h"
35 #include "sync/sync.h"
36 #include "internal_inc/osal.h"
37 #include "util/cb_func.h"
38 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
39 #include "eu/blockaware.h"
40 #endif
41 
// Default worker-escape tunables (consumed by EscapeConfig) and the factor used by
// ExecuteUnit::IsExceedDeepSleepThreshold.
// NOTE(review): an unnamed namespace in a header gives every including TU its own
// copy of these constants; kept as-is because the names are reachable unqualified
// at global scope and other code may rely on that.
namespace {
// Escape interval (ms) for the first, second and third stage.
constexpr uint64_t ONE_STAGE_INTERVAL{10};
constexpr uint64_t TWO_STAGE_INTERVAL{100};
constexpr uint64_t THREE_STAGE_INTERVAL{1000};
// Total-worker-count thresholds that separate the stages above.
constexpr uint64_t ONE_STAGE_WORKER_NUM{128};
constexpr uint64_t TWO_STAGE_WORKER_NUM{256};
// Multiplier in the check `deepSleeping * DEEP_SLEEP_NUM_DOUBLE > totalWorkers`.
constexpr int DEEP_SLEEP_NUM_DOUBLE{2};
} // namespace
50 
51 namespace ffrt {
// Kind of scheduling event passed as the template argument of ExecuteUnit::NotifyTask.
// Enumerator values are spelled out explicitly; they match the implicit numbering.
enum class TaskNotifyType {
    TASK_PICKED = 0,
    TASK_ADDED = 1,
    TASK_LOCAL = 2,
    TASK_ESCAPED = 3,
    TASK_ADDED_RTQ = 4,
};
59 
// Snapshot of worker start/exit activity for one QoS level: running counts plus
// the thread ids recorded for started and exited workers.
struct WorkerStatusInfo {
    unsigned int startedCnt{0};
    unsigned int exitedCnt{0};
    std::deque<pid_t> startedTids{};
    std::deque<pid_t> exitedTids{};
};
66 
67 struct CPUWorkerGroup {
68     // rtg parameters
69     std::unique_ptr<ThreadGroup> tg;
70     uint64_t tgRefCount = 0;
71     mutable std::shared_mutex tgMutex;
72 
73     // worker manage parameters
74     size_t hardLimit{0};
75     size_t maxConcurrency{0};
76     size_t workerStackSize{0};
77     bool setWorkerMaxNum{false};
78     std::unordered_map<CPUWorker *, std::unique_ptr<CPUWorker>> threads;
79     std::mutex mutex;
80     std::condition_variable cv;
81 
82     // group status parameters
83     alignas(cacheline_size) fast_mutex lock;
84     alignas(cacheline_size) int executingNum{0};
85     alignas(cacheline_size) int sleepingNum{0};
86     alignas(cacheline_size) bool irqEnable{false};
87     /* used for performance mode */
88     alignas(cacheline_size) bool fastWakeEnable = false; // directly wakeup first worker by futex
89     alignas(cacheline_size) int pendingWakeCnt = 0;      // number of workers waking but not waked-up yet
90     alignas(cacheline_size) int pendingTaskCnt = 0;      // number of tasks submitted to RTB but not picked-up yet
91 
92     // used for worker share
93     std::vector<std::pair<QoS, bool>> workerShareConfig;
94     int deepSleepingWorkerNum{0};
95     bool retryBeforeDeepSleep{true};
96 
WorkerCreateCPUWorkerGroup97     inline void WorkerCreate()
98     {
99         executingNum++;
100     }
101 
RollBackCreateCPUWorkerGroup102     inline void RollBackCreate()
103     {
104         std::lock_guard lk(lock);
105         executingNum--;
106     }
107 
IntoDeepSleepCPUWorkerGroup108     inline void IntoDeepSleep()
109     {
110         std::lock_guard lk(lock);
111         deepSleepingWorkerNum++;
112     }
113 
114     inline void OutOfDeepSleep(bool irqWake = false)
115     {
116         std::lock_guard lk(lock);
117         if (irqWake) {
118             irqEnable = false;
119         }
120         sleepingNum--;
121         deepSleepingWorkerNum--;
122         executingNum++;
123     }
124 
125     inline void OutOfSleep(bool irqWake = false)
126     {
127         std::lock_guard lk(lock);
128         if (irqWake) {
129             irqEnable = false;
130         }
131         if (pendingWakeCnt > 0) {
132             pendingWakeCnt--;
133         }
134         sleepingNum--;
135         executingNum++;
136     }
137 
WorkerDestroyCPUWorkerGroup138     inline void WorkerDestroy()
139     {
140         std::lock_guard lk(lock);
141         sleepingNum--;
142     }
143 
TryDestroyCPUWorkerGroup144     inline bool TryDestroy()
145     {
146         std::lock_guard lk(lock);
147         sleepingNum--;
148         return sleepingNum > 0;
149     }
150 
151     inline void RollbackDestroy(bool irqWake = false)
152     {
153         std::lock_guard lk(lock);
154         if (irqWake) {
155             irqEnable = false;
156         }
157         executingNum++;
158     }
159 
SetTearDownCPUWorkerGroup160     inline void SetTearDown()
161     {
162         std::shared_lock<std::shared_mutex> lck(tgMutex);
163         for (const auto& pair : threads) {
164             pair.second->SetExited();
165         }
166     }
167 };
168 
169 struct EscapeConfig {
170     bool enableEscape_ = false;
171     uint64_t oneStageIntervalMs_ = ONE_STAGE_INTERVAL;
172     uint64_t twoStageIntervalMs_ = TWO_STAGE_INTERVAL;
173     uint64_t threeStageIntervalMs_ = THREE_STAGE_INTERVAL;
174     uint64_t oneStageWorkerNum_ = ONE_STAGE_WORKER_NUM;
175     uint64_t twoStageWorkerNum_ = TWO_STAGE_WORKER_NUM;
176 };
177 
178 class ExecuteUnit {
179 public:
180     static ExecuteUnit &Instance();
181 
182     static void RegistInsCb(SingleInsCB<ExecuteUnit>::Instance &&cb);
183 
184     ThreadGroup *BindTG(QoS& qos);
185     void UnbindTG(QoS& qos);
186     void BindWG(QoS& qos);
187 
188     // event notify
189     template <TaskNotifyType TYPE>
190     void NotifyTask(const QoS &qos, bool isPollWait = false, bool isRisingEdge = false)
191     {
192         if constexpr (TYPE == TaskNotifyType::TASK_ADDED) {
193             PokeAdd(qos);
194         } else if constexpr (TYPE == TaskNotifyType::TASK_PICKED) {
195             PokePick(qos);
196         } else if constexpr (TYPE == TaskNotifyType::TASK_ESCAPED) {
197             PokeEscape(qos, isPollWait);
198         } else if constexpr (TYPE == TaskNotifyType::TASK_LOCAL) {
199             PokeLocal(qos);
200         } else if constexpr (TYPE == TaskNotifyType::TASK_ADDED_RTQ) {
201             PokeAddRtq(qos, isRisingEdge);
202         }
203     }
204 
205     // dfx op
206     virtual void WorkerInit() = 0;
GetWorkerGroup(int qos)207     CPUWorkerGroup &GetWorkerGroup(int qos)
208     {
209         return workerGroup[qos];
210     }
211 
SetWorkerMaxNum(const QoS & qos,uint32_t num)212     inline int SetWorkerMaxNum(const QoS &qos, uint32_t num)
213     {
214         CPUWorkerGroup &group = workerGroup[qos];
215         std::lock_guard lk(group.lock);
216         if (group.setWorkerMaxNum) {
217             FFRT_SYSEVENT_LOGE("qos[%d] worker num can only been setup once", qos());
218             return -1;
219         }
220         group.hardLimit = static_cast<size_t>(num);
221         group.setWorkerMaxNum = true;
222         return 0;
223     }
224 
225     int SetWorkerStackSize(const QoS &qos, size_t stack_size);
226 
227     // worker escape
228     int SetEscapeEnable(uint64_t oneStageIntervalMs, uint64_t twoStageIntervalMs, uint64_t threeStageIntervalMs,
229         uint64_t oneStageWorkerNum, uint64_t twoStageWorkerNum);
230 
SetEscapeDisable()231     inline void SetEscapeDisable()
232     {
233         escapeConfig.enableEscape_ = false;
234         // after the escape function is disabled, parameters are restored to default values
235         escapeConfig.oneStageIntervalMs_ = ONE_STAGE_INTERVAL;
236         escapeConfig.twoStageIntervalMs_ = TWO_STAGE_INTERVAL;
237         escapeConfig.threeStageIntervalMs_ = THREE_STAGE_INTERVAL;
238         escapeConfig.oneStageWorkerNum_ = ONE_STAGE_WORKER_NUM;
239         escapeConfig.twoStageWorkerNum_ = TWO_STAGE_WORKER_NUM;
240     }
241 
IsEscapeEnable()242     inline bool IsEscapeEnable()
243     {
244         return escapeConfig.enableEscape_;
245     }
246 
247     void SubmitEscape(int qos, uint64_t totalWorkerNum);
248 
GetWorkerNum()249     inline uint64_t GetWorkerNum()
250     {
251         return workerNum.load();
252     }
253 
SetSchedMode(const QoS qos,const sched_mode_type mode)254     inline void SetSchedMode(const QoS qos, const sched_mode_type mode)
255     {
256         schedMode[qos].store(mode);
257     }
258 
GetSchedMode(const QoS qos)259     inline sched_mode_type GetSchedMode(const QoS qos)
260     {
261         return schedMode[qos].load();
262     }
263 
SetWorkerShare(const std::map<QoS,std::vector<std::pair<QoS,bool>>> workerShareConfig)264     inline void SetWorkerShare(const std::map<QoS, std::vector<std::pair<QoS, bool>>> workerShareConfig)
265     {
266         for (const auto& item : workerShareConfig) {
267             workerGroup[item.first].workerShareConfig = item.second;
268         }
269     }
270 
SetTaskBacklog(const std::set<QoS> userTaskBacklogConfig)271     inline void SetTaskBacklog(const std::set<QoS> userTaskBacklogConfig)
272     {
273         for (const QoS& qos : userTaskBacklogConfig) {
274             taskBacklogConfig[qos] = true;
275         }
276     }
277 
278     void DisableWorkerMonitor(const QoS& qos, int tid);
279     void RestoreThreadConfig();
280 
281     void NotifyWorkers(const QoS &qos, int number);
282     // used for worker sharing
283     bool WorkerShare(CPUWorker* worker, std::function<bool(int, CPUWorker*)> taskFunction);
284 // worker dynamic scaling
285 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
286     void MonitorMain();
287     BlockawareWakeupCond *WakeupCond(void);
288 #endif
289     void WorkerStart(int qos);
290     void WorkerExit(int qos);
291     WorkerStatusInfo GetWorkerStatusInfoAndReset(int qos);
292 
293 protected:
294     virtual void WakeupWorkers(const QoS &qos) = 0;
295 
296     // worker manipulate op
297     bool IncWorker(const QoS &qos);
298     virtual void WorkerPrepare(CPUWorker *thread) = 0;
299     virtual WorkerAction WorkerIdleAction(CPUWorker *thread) = 0;
300     void WorkerRetired(CPUWorker *thread);
301 
302     // worker rtg config
303     void WorkerJoinTg(const QoS &qos, pid_t pid);
304     void WorkerLeaveTg(const QoS &qos, pid_t pid);
305 
306     // worker group info
IsExceedDeepSleepThreshold()307     inline bool IsExceedDeepSleepThreshold()
308     {
309         int totalWorker = 0;
310         int deepSleepingWorkerNum = 0;
311         for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
312             CPUWorkerGroup &group = workerGroup[i];
313             std::lock_guard lk(group.lock);
314             deepSleepingWorkerNum += group.deepSleepingWorkerNum;
315             totalWorker += group.executingNum + group.sleepingNum;
316         }
317         return deepSleepingWorkerNum * DEEP_SLEEP_NUM_DOUBLE > totalWorker;
318     }
319 
320     // worker group state
321     virtual void IntoSleep(const QoS &qos) = 0;
322 
323     ExecuteUnit();
324     virtual ~ExecuteUnit();
325 
326     size_t GetRunningNum(const QoS &qos);
327     void ReportEscapeEvent(int qos, size_t totalNum);
328 
329     CPUWorkerGroup workerGroup[QoS::MaxNum()];
330     std::atomic_uint64_t workerNum = 0;
331     std::atomic_bool tearDown{false};
332 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
333     bool blockAwareInit{false};
334     bool stopMonitor{false};
335     unsigned long keyPtr{0};
336     int qosMonitorMaxNum{std::min(QoS::Max(), BLOCKAWARE_DOMAIN_ID_MAX + 1)};
337     BlockawareWakeupCond wakeupCond;
338     BlockawareDomainInfoArea domainInfoMonitor;
339 #endif
340 
341     // eu sched task bacllog array
342     bool taskBacklogConfig[QoS::MaxNum()] = {};
343 private:
344     CPUWorker *CreateCPUWorker(const QoS &qos);
345 
346     virtual void PokeAdd(const QoS &qos) = 0;
347     virtual void PokePick(const QoS &qos) = 0;
348     virtual void PokeLocal(const QoS &qos) = 0;
349     virtual void PokeEscape(const QoS &qos, bool isPollWait) = 0;
350     virtual void PokeAddRtq(const QoS &qos, bool isRisingEdge) = 0;
351 
352     // eu sched mode array
353     static std::array<std::atomic<sched_mode_type>, QoS::MaxNum()> schedMode;
354 
355     // worker escape
356     EscapeConfig escapeConfig;
357     std::atomic<bool> submittedDelayedTask_[QoS::MaxNum()] = {0};
358     WaitUntilEntry *we_[QoS::MaxNum()] = {nullptr};
359     virtual void ExecuteEscape(int qos) = 0;
360 
CalEscapeInterval(uint64_t totalWorkerNum)361     inline uint64_t CalEscapeInterval(uint64_t totalWorkerNum)
362     {
363         if (totalWorkerNum < escapeConfig.oneStageWorkerNum_) {
364             return escapeConfig.oneStageIntervalMs_;
365         } else if (totalWorkerNum >= escapeConfig.oneStageWorkerNum_ &&
366             totalWorkerNum < escapeConfig.twoStageWorkerNum_) {
367             return escapeConfig.twoStageIntervalMs_;
368         } else {
369             return escapeConfig.threeStageIntervalMs_;
370         }
371     }
372 
373 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
374     bool IsBlockAwareInit(void);
375 #endif
376 };
377 } // namespace ffrt
378 #endif
379