1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_EXECUTE_UNIT_HPP 17 #define FFRT_EXECUTE_UNIT_HPP 18 19 #include <memory> 20 #include <atomic> 21 #include <deque> 22 #include <vector> 23 #include <functional> 24 #include <mutex> 25 #include <shared_mutex> 26 #include <condition_variable> 27 #include <unordered_map> 28 #include <set> 29 #include <map> 30 #include <array> 31 #include "cpp/mutex.h" 32 #include "sched/workgroup_internal.h" 33 #include "eu/thread_group.h" 34 #include "eu/cpu_worker.h" 35 #include "sync/sync.h" 36 #include "internal_inc/osal.h" 37 #include "util/cb_func.h" 38 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 39 #include "eu/blockaware.h" 40 #endif 41 42 namespace { 43 constexpr uint64_t ONE_STAGE_INTERVAL = 10; 44 constexpr uint64_t TWO_STAGE_INTERVAL = 100; 45 constexpr uint64_t THREE_STAGE_INTERVAL = 1000; 46 constexpr uint64_t ONE_STAGE_WORKER_NUM = 128; 47 constexpr uint64_t TWO_STAGE_WORKER_NUM = 256; 48 constexpr int DEEP_SLEEP_NUM_DOUBLE = 2; 49 } 50 51 namespace ffrt { 52 enum class TaskNotifyType { 53 TASK_PICKED = 0, 54 TASK_ADDED, 55 TASK_LOCAL, 56 TASK_ESCAPED, 57 TASK_ADDED_RTQ, 58 }; 59 60 struct WorkerStatusInfo { 61 unsigned int startedCnt = 0; 62 unsigned int exitedCnt = 0; 63 std::deque<pid_t> startedTids; 64 std::deque<pid_t> exitedTids; 65 }; 66 67 struct CPUWorkerGroup { 68 // rtg parameters 69 std::unique_ptr<ThreadGroup> tg; 70 uint64_t tgRefCount = 0; 71 mutable std::shared_mutex tgMutex; 72 73 // worker manage parameters 74 size_t hardLimit{0}; 75 size_t maxConcurrency{0}; 76 size_t workerStackSize{0}; 77 bool setWorkerMaxNum{false}; 78 std::unordered_map<CPUWorker *, std::unique_ptr<CPUWorker>> threads; 79 std::mutex mutex; 80 std::condition_variable cv; 81 82 // group status parameters 83 alignas(cacheline_size) fast_mutex lock; 84 alignas(cacheline_size) int executingNum{0}; 85 alignas(cacheline_size) int sleepingNum{0}; 86 alignas(cacheline_size) bool irqEnable{false}; 87 /* used for performance mode */ 88 alignas(cacheline_size) bool fastWakeEnable = false; // directly wakeup first worker by futex 89 alignas(cacheline_size) int pendingWakeCnt = 0; // number of workers waking but not waked-up yet 90 alignas(cacheline_size) int pendingTaskCnt = 0; // number of tasks submitted to RTB but not picked-up yet 91 92 // used for worker share 93 std::vector<std::pair<QoS, bool>> workerShareConfig; 94 int deepSleepingWorkerNum{0}; 95 bool retryBeforeDeepSleep{true}; 96 WorkerCreateCPUWorkerGroup97 inline void WorkerCreate() 98 { 99 executingNum++; 100 } 101 RollBackCreateCPUWorkerGroup102 inline void RollBackCreate() 103 { 104 std::lock_guard lk(lock); 105 executingNum--; 106 } 107 IntoDeepSleepCPUWorkerGroup108 inline void IntoDeepSleep() 109 { 110 std::lock_guard lk(lock); 111 deepSleepingWorkerNum++; 112 } 113 114 inline void OutOfDeepSleep(bool irqWake = false) 115 { 116 std::lock_guard lk(lock); 117 if (irqWake) { 118 irqEnable = false; 119 } 120 sleepingNum--; 121 deepSleepingWorkerNum--; 122 executingNum++; 123 } 124 125 inline void OutOfSleep(bool irqWake = false) 126 { 127 std::lock_guard lk(lock); 128 if (irqWake) { 129 irqEnable = false; 130 } 131 if (pendingWakeCnt > 0) { 132 pendingWakeCnt--; 133 } 134 sleepingNum--; 135 executingNum++; 136 } 137 WorkerDestroyCPUWorkerGroup138 inline void WorkerDestroy() 139 { 140 std::lock_guard lk(lock); 141 sleepingNum--; 142 } 143 TryDestroyCPUWorkerGroup144 inline bool TryDestroy() 145 { 146 std::lock_guard lk(lock); 147 sleepingNum--; 148 return sleepingNum > 0; 149 } 150 151 inline void RollbackDestroy(bool irqWake = false) 152 { 153 std::lock_guard lk(lock); 154 if (irqWake) { 155 irqEnable = false; 156 } 157 executingNum++; 158 } 159 SetTearDownCPUWorkerGroup160 inline void SetTearDown() 161 { 162 std::shared_lock<std::shared_mutex> lck(tgMutex); 163 for (const auto& pair : threads) { 164 pair.second->SetExited(); 165 } 166 } 167 }; 168 169 struct EscapeConfig { 170 bool enableEscape_ = false; 171 uint64_t oneStageIntervalMs_ = ONE_STAGE_INTERVAL; 172 uint64_t twoStageIntervalMs_ = TWO_STAGE_INTERVAL; 173 uint64_t threeStageIntervalMs_ = THREE_STAGE_INTERVAL; 174 uint64_t oneStageWorkerNum_ = ONE_STAGE_WORKER_NUM; 175 uint64_t twoStageWorkerNum_ = TWO_STAGE_WORKER_NUM; 176 }; 177 178 class ExecuteUnit { 179 public: 180 static ExecuteUnit &Instance(); 181 182 static void RegistInsCb(SingleInsCB<ExecuteUnit>::Instance &&cb); 183 184 ThreadGroup *BindTG(QoS& qos); 185 void UnbindTG(QoS& qos); 186 void BindWG(QoS& qos); 187 188 // event notify 189 template <TaskNotifyType TYPE> 190 void NotifyTask(const QoS &qos, bool isPollWait = false, bool isRisingEdge = false) 191 { 192 if constexpr (TYPE == TaskNotifyType::TASK_ADDED) { 193 PokeAdd(qos); 194 } else if constexpr (TYPE == TaskNotifyType::TASK_PICKED) { 195 PokePick(qos); 196 } else if constexpr (TYPE == TaskNotifyType::TASK_ESCAPED) { 197 PokeEscape(qos, isPollWait); 198 } else if constexpr (TYPE == TaskNotifyType::TASK_LOCAL) { 199 PokeLocal(qos); 200 } else if constexpr (TYPE == TaskNotifyType::TASK_ADDED_RTQ) { 201 PokeAddRtq(qos, isRisingEdge); 202 } 203 } 204 205 // dfx op 206 virtual void WorkerInit() = 0; GetWorkerGroup(int qos)207 CPUWorkerGroup &GetWorkerGroup(int qos) 208 { 209 return workerGroup[qos]; 210 } 211 SetWorkerMaxNum(const QoS & qos,uint32_t num)212 inline int SetWorkerMaxNum(const QoS &qos, uint32_t num) 213 { 214 CPUWorkerGroup &group = workerGroup[qos]; 215 std::lock_guard lk(group.lock); 216 if (group.setWorkerMaxNum) { 217 FFRT_SYSEVENT_LOGE("qos[%d] worker num can only been setup once", qos()); 218 return -1; 219 } 220 group.hardLimit = static_cast<size_t>(num); 221 group.setWorkerMaxNum = true; 222 return 0; 223 } 224 225 int SetWorkerStackSize(const QoS &qos, size_t stack_size); 226 227 // worker escape 228 int SetEscapeEnable(uint64_t oneStageIntervalMs, uint64_t twoStageIntervalMs, uint64_t threeStageIntervalMs, 229 uint64_t oneStageWorkerNum, uint64_t twoStageWorkerNum); 230 SetEscapeDisable()231 inline void SetEscapeDisable() 232 { 233 escapeConfig.enableEscape_ = false; 234 // after the escape function is disabled, parameters are restored to default values 235 escapeConfig.oneStageIntervalMs_ = ONE_STAGE_INTERVAL; 236 escapeConfig.twoStageIntervalMs_ = TWO_STAGE_INTERVAL; 237 escapeConfig.threeStageIntervalMs_ = THREE_STAGE_INTERVAL; 238 escapeConfig.oneStageWorkerNum_ = ONE_STAGE_WORKER_NUM; 239 escapeConfig.twoStageWorkerNum_ = TWO_STAGE_WORKER_NUM; 240 } 241 IsEscapeEnable()242 inline bool IsEscapeEnable() 243 { 244 return escapeConfig.enableEscape_; 245 } 246 247 void SubmitEscape(int qos, uint64_t totalWorkerNum); 248 GetWorkerNum()249 inline uint64_t GetWorkerNum() 250 { 251 return workerNum.load(); 252 } 253 SetSchedMode(const QoS qos,const sched_mode_type mode)254 inline void SetSchedMode(const QoS qos, const sched_mode_type mode) 255 { 256 schedMode[qos].store(mode); 257 } 258 GetSchedMode(const QoS qos)259 inline sched_mode_type GetSchedMode(const QoS qos) 260 { 261 return schedMode[qos].load(); 262 } 263 SetWorkerShare(const std::map<QoS,std::vector<std::pair<QoS,bool>>> workerShareConfig)264 inline void SetWorkerShare(const std::map<QoS, std::vector<std::pair<QoS, bool>>> workerShareConfig) 265 { 266 for (const auto& item : workerShareConfig) { 267 workerGroup[item.first].workerShareConfig = item.second; 268 } 269 } 270 SetTaskBacklog(const std::set<QoS> userTaskBacklogConfig)271 inline void SetTaskBacklog(const std::set<QoS> userTaskBacklogConfig) 272 { 273 for (const QoS& qos : userTaskBacklogConfig) { 274 taskBacklogConfig[qos] = true; 275 } 276 } 277 278 void DisableWorkerMonitor(const QoS& qos, int tid); 279 void RestoreThreadConfig(); 280 281 void NotifyWorkers(const QoS &qos, int number); 282 // used for worker sharing 283 bool WorkerShare(CPUWorker* worker, std::function<bool(int, CPUWorker*)> taskFunction); 284 // worker dynamic scaling 285 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 286 void MonitorMain(); 287 BlockawareWakeupCond *WakeupCond(void); 288 #endif 289 void WorkerStart(int qos); 290 void WorkerExit(int qos); 291 WorkerStatusInfo GetWorkerStatusInfoAndReset(int qos); 292 293 protected: 294 virtual void WakeupWorkers(const QoS &qos) = 0; 295 296 // worker manipulate op 297 bool IncWorker(const QoS &qos); 298 virtual void WorkerPrepare(CPUWorker *thread) = 0; 299 virtual WorkerAction WorkerIdleAction(CPUWorker *thread) = 0; 300 void WorkerRetired(CPUWorker *thread); 301 302 // worker rtg config 303 void WorkerJoinTg(const QoS &qos, pid_t pid); 304 void WorkerLeaveTg(const QoS &qos, pid_t pid); 305 306 // worker group info IsExceedDeepSleepThreshold()307 inline bool IsExceedDeepSleepThreshold() 308 { 309 int totalWorker = 0; 310 int deepSleepingWorkerNum = 0; 311 for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) { 312 CPUWorkerGroup &group = workerGroup[i]; 313 std::lock_guard lk(group.lock); 314 deepSleepingWorkerNum += group.deepSleepingWorkerNum; 315 totalWorker += group.executingNum + group.sleepingNum; 316 } 317 return deepSleepingWorkerNum * DEEP_SLEEP_NUM_DOUBLE > totalWorker; 318 } 319 320 // worker group state 321 virtual void IntoSleep(const QoS &qos) = 0; 322 323 ExecuteUnit(); 324 virtual ~ExecuteUnit(); 325 326 size_t GetRunningNum(const QoS &qos); 327 void ReportEscapeEvent(int qos, size_t totalNum); 328 329 CPUWorkerGroup workerGroup[QoS::MaxNum()]; 330 std::atomic_uint64_t workerNum = 0; 331 std::atomic_bool tearDown{false}; 332 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 333 bool blockAwareInit{false}; 334 bool stopMonitor{false}; 335 unsigned long keyPtr{0}; 336 int qosMonitorMaxNum{std::min(QoS::Max(), BLOCKAWARE_DOMAIN_ID_MAX + 1)}; 337 BlockawareWakeupCond wakeupCond; 338 BlockawareDomainInfoArea domainInfoMonitor; 339 #endif 340 341 // eu sched task bacllog array 342 bool taskBacklogConfig[QoS::MaxNum()] = {}; 343 private: 344 CPUWorker *CreateCPUWorker(const QoS &qos); 345 346 virtual void PokeAdd(const QoS &qos) = 0; 347 virtual void PokePick(const QoS &qos) = 0; 348 virtual void PokeLocal(const QoS &qos) = 0; 349 virtual void PokeEscape(const QoS &qos, bool isPollWait) = 0; 350 virtual void PokeAddRtq(const QoS &qos, bool isRisingEdge) = 0; 351 352 // eu sched mode array 353 static std::array<std::atomic<sched_mode_type>, QoS::MaxNum()> schedMode; 354 355 // worker escape 356 EscapeConfig escapeConfig; 357 std::atomic<bool> submittedDelayedTask_[QoS::MaxNum()] = {0}; 358 WaitUntilEntry *we_[QoS::MaxNum()] = {nullptr}; 359 virtual void ExecuteEscape(int qos) = 0; 360 CalEscapeInterval(uint64_t totalWorkerNum)361 inline uint64_t CalEscapeInterval(uint64_t totalWorkerNum) 362 { 363 if (totalWorkerNum < escapeConfig.oneStageWorkerNum_) { 364 return escapeConfig.oneStageIntervalMs_; 365 } else if (totalWorkerNum >= escapeConfig.oneStageWorkerNum_ && 366 totalWorkerNum < escapeConfig.twoStageWorkerNum_) { 367 return escapeConfig.twoStageIntervalMs_; 368 } else { 369 return escapeConfig.threeStageIntervalMs_; 370 } 371 } 372 373 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 374 bool IsBlockAwareInit(void); 375 #endif 376 }; 377 } // namespace ffrt 378 #endif 379