1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_TASK_SCHEDULER_HPP 17 #define FFRT_TASK_SCHEDULER_HPP 18 #include "sched/task_runqueue.h" 19 #include "tm/task_base.h" 20 #include "util/spmc_queue.h" 21 22 namespace ffrt { 23 extern int PLACE_HOLDER; 24 25 enum class TaskSchedMode : uint8_t { 26 DEFAULT_TASK_SCHED_MODE = 0, // only use global queue 27 LOCAL_TASK_SCHED_MODE, // only use local queue and priority slot 28 }; 29 30 class TaskScheduler { 31 public: 32 TaskScheduler() = default; ~TaskScheduler()33 virtual ~TaskScheduler() {} 34 PushTask(TaskBase * task)35 void PushTask(TaskBase *task) 36 { 37 if (GetTaskSchedMode() == TaskSchedMode::DEFAULT_TASK_SCHED_MODE) { 38 PushTaskGlobal(task); 39 } else if (GetTaskSchedMode() == TaskSchedMode::LOCAL_TASK_SCHED_MODE) { 40 PushTaskLocalOrPriority(task); 41 } 42 } 43 44 TaskBase* PopTask(); 45 46 virtual void SetQos(QoS &q) = 0; 47 48 int qos {0}; 49 50 int StealTask(); 51 void RemoveLocalQueue(SpmcQueue* localQueue); 52 SpmcQueue* GetLocalQueue(); 53 void** GetPriorityTask(); 54 unsigned int** GetWorkerTick(); 55 56 // global_queue.size + totalLocalTaskCnt, not include the PriorityTaskCnt GetTotalTaskCnt()57 uint64_t GetTotalTaskCnt() 58 { 59 uint64_t totalTaskCnt = GetGlobalTaskCnt(); 60 for (auto &localQueue : localQueues) { 61 totalTaskCnt += localQueue.second->GetLength(); 62 } 63 return totalTaskCnt; 64 } 65 // global_queue.size 66 virtual uint64_t GetGlobalTaskCnt() = 0; 67 68 // thread_local local_queue.size, not totalLocalTaskCnt GetLocalTaskCnt()69 inline uint64_t GetLocalTaskCnt() 70 { 71 return GetLocalQueue()->GetLength(); 72 } 73 // thread_local priority.size, not totalPriorityTaskCnt 74 uint64_t GetPriorityTaskCnt(); 75 SetTaskSchedMode(const TaskSchedMode & mode)76 inline void SetTaskSchedMode(const TaskSchedMode& mode) 77 { 78 taskSchedMode = mode; 79 } 80 GetTaskSchedMode()81 inline const TaskSchedMode& GetTaskSchedMode() 82 { 83 return taskSchedMode; 84 } 85 GetWorkerLocalQueue(pid_t pid)86 inline SpmcQueue* GetWorkerLocalQueue(pid_t pid) 87 { 88 std::lock_guard lg(*GetMutex()); 89 return localQueues[pid]; 90 } 91 92 virtual bool PushTaskGlobal(TaskBase* task, bool rtb = true) = 0; 93 virtual TaskBase* PopTaskGlobal() = 0; 94 95 bool CancelUVWork(ffrt_executor_task_t* uvWork); 96 bool PushUVTaskToWaitingQueue(UVTask* task); 97 bool CheckUVTaskConcurrency(UVTask* task); 98 UVTask* PickWaitingUVTask(); 99 100 std::mutex* GetMutex(); 101 IsStealerActive()102 inline bool IsStealerActive() 103 { 104 return stealingInProgress.load(std::memory_order_relaxed); 105 } 106 107 protected: 108 std::unordered_map<pid_t, SpmcQueue*> localQueues; 109 TaskSchedMode taskSchedMode = TaskSchedMode::DEFAULT_TASK_SCHED_MODE; 110 111 void PushTaskLocalOrPriority(TaskBase* task); 112 TaskBase* PopTaskLocalOrPriority(); 113 114 // global queue -> local queue -> priority slot 115 virtual TaskBase* PopTaskHybridProcess() = 0; 116 bool PushTaskToPriorityStack(TaskBase *executorTask); 117 GetUVTask(TaskBase * task)118 TaskBase* GetUVTask(TaskBase* task) 119 { 120 std::lock_guard<std::mutex> lg(uvMtx); 121 UVTask* uvTask = static_cast<UVTask*>(task); 122 auto it = cancelMap_.find(uvTask->uvWork); 123 if (it != cancelMap_.end()) { 124 uvTask->FreeMem(); 125 // the task has been canceled, remove it 126 if (it->second == 1) 127 cancelMap_.erase(it); 128 else 129 it->second--; 130 return nullptr; 131 } 132 133 uvTask->SetDequeued(); 134 return task; 135 } 136 137 private: 138 std::atomic<std::mutex*> mtx {nullptr}; 139 std::mutex uvMtx; 140 std::unordered_map<ffrt_executor_task_t*, uint32_t> cancelMap_; 141 int uvTaskConcurrency_ = 0; 142 std::deque<UVTask*> uvTaskWaitingQueue_; 143 std::atomic<bool> stealingInProgress { false }; /* indicates whether a stealer is in progress or not */ 144 }; 145 146 class SchedulerFactory { 147 public: 148 using AllocCB = std::function<TaskScheduler*()>; 149 using RecycleCB = std::function<void (TaskScheduler*)>; 150 151 static SchedulerFactory& Instance(); 152 Alloc()153 static TaskScheduler *Alloc() 154 { 155 return Instance().alloc_(); 156 } 157 Recycle(TaskScheduler * schd)158 static void Recycle(TaskScheduler *schd) 159 { 160 Instance().recycle_(schd); 161 } 162 RegistCb(const AllocCB & alloc,const RecycleCB & recycle)163 static void RegistCb(const AllocCB &alloc, const RecycleCB &recycle) 164 { 165 Instance().alloc_ = alloc; 166 Instance().recycle_ = recycle; 167 } 168 169 private: 170 AllocCB alloc_; 171 RecycleCB recycle_; 172 }; 173 174 struct LocalQueue { 175 explicit LocalQueue(int qos, std::unordered_map<pid_t, SpmcQueue*> localQueues); 176 ~LocalQueue(); 177 int qos {0}; 178 SpmcQueue* localQueue; 179 }; 180 } // namespace ffrt 181 182 #endif 183