1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <set> 24 #include <shared_mutex> 25 #include <unordered_map> 26 #include <unordered_set> 27 #include <vector> 28 29 #include "dfx_hisys_event.h" 30 #include "napi/native_api.h" 31 #include "sequence_runner.h" 32 #include "task.h" 33 #include "task_queue.h" 34 #include "task_group.h" 35 #include "worker.h" 36 37 namespace Commonlibrary::Concurrent::TaskPoolModule { 38 using namespace Commonlibrary::Concurrent::Common; 39 40 static constexpr char ARGUMENTS_STR[] = "arguments"; 41 static constexpr char NAME[] = "name"; 42 static constexpr char FUNCTION_STR[] = "function"; 43 static constexpr char GROUP_ID_STR[] = "groupId"; 44 static constexpr char TASKID_STR[] = "taskId"; 45 static constexpr char TASKINFO_STR[] = "taskInfo"; 46 static constexpr char TRANSFERLIST_STR[] = "transferList"; 47 static constexpr char CLONE_LIST_STR[] = "cloneList"; 48 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency"; 49 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency"; 50 static constexpr char TASK_CPU_TIME[] = "cpuDuration"; 51 static constexpr char TASK_IO_TIME[] = "ioDuration"; 52 static constexpr char TASK_TOTAL_TIME[] = "totalDuration"; 53 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer"; 54 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable"; 55 56 class TaskGroup; 57 58 class TaskManager { 59 public: 60 static TaskManager& GetInstance(); 61 62 void StoreTask(Task* task); 63 void RemoveTask(uint32_t taskId); 64 Task* GetTask(uint32_t taskId); 65 void EnqueueTaskId(uint32_t taskId, Priority priority = Priority::DEFAULT); 66 bool EraseWaitingTaskId(uint32_t taskId, Priority priority); 67 std::pair<uint32_t, Priority> DequeueTaskId(); 68 void CancelTask(napi_env env, uint32_t taskId); 69 void CancelSeqRunnerTask(napi_env env, Task* task); 70 void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true); 71 72 // for worker state 73 void NotifyWorkerIdle(Worker* worker); 74 void NotifyWorkerCreated(Worker* worker); 75 void NotifyWorkerRunning(Worker* worker); 76 void RemoveWorker(Worker* worker); 77 void RestoreWorker(Worker* worker); 78 79 // for load balance 80 void InitTaskManager(napi_env env); 81 void UpdateExecutedInfo(uint64_t duration); 82 void TryTriggerExpand(); 83 84 // for taskpool state 85 uint32_t GetTaskNum(); 86 uint32_t GetIdleWorkers(); 87 uint32_t GetThreadNum(); 88 uint32_t GetRunningWorkers(); 89 uint32_t GetTimeoutWorkers(); 90 void GetIdleWorkersList(uint32_t step); 91 bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size); 92 93 // for get thread info 94 napi_value GetThreadInfos(napi_env env); 95 96 // for get task info 97 napi_value GetTaskInfos(napi_env env); 98 99 // for countTrace for worker 100 void CountTraceForWorker(bool needLog = false); 101 void CountTraceForWorkerWithoutLock(bool needLog = false); 102 103 void RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo, 104 const std::string& type); 105 void IncreaseSendDataRefCount(uint32_t taskId); 106 void DecreaseSendDataRefCount(napi_env env, uint32_t taskId, Task* task = nullptr); 107 void ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task* task); 108 109 // for task dependency 110 bool IsDependendByTaskId(uint32_t taskId); 111 bool IsDependentByTaskId(uint32_t dependentTaskId); 112 void NotifyDependencyTaskInfo(uint32_t taskId); 113 void RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId); 114 bool StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet); 115 bool RemoveTaskDependency(uint32_t taskId, uint32_t dependentId); 116 bool CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId); 117 void EnqueuePendingTaskInfo(uint32_t taskId, Priority priority); 118 std::pair<uint32_t, Priority> DequeuePendingTaskInfo(uint32_t taskId); 119 void RemovePendingTaskInfo(uint32_t taskId); 120 void StoreDependentTaskInfo(std::set<uint32_t> dependTaskIdSet, uint32_t taskId); 121 void RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId); 122 std::string GetTaskDependInfoToString(uint32_t taskId); 123 124 bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT); 125 126 // for duration 127 void StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration); 128 uint64_t GetTaskDuration(uint32_t taskId, std::string durationType); 129 void RemoveTaskDuration(uint32_t taskId); 130 void StoreLongTaskInfo(uint32_t taskId, Worker* worker); 131 void RemoveLongTaskInfo(uint32_t taskId); 132 void TerminateTask(uint32_t taskId); 133 Worker* GetLongTaskInfo(uint32_t taskId); 134 135 // for callback 136 void ReleaseCallBackInfo(Task* task); 137 138 void UpdateSystemAppFlag(); 139 IsSystemApp()140 bool IsSystemApp() const 141 { 142 return isSystemApp_; 143 } 144 EnableFfrt()145 bool EnableFfrt() const 146 { 147 return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_); 148 } 149 150 void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error); 151 uint32_t CalculateTaskId(uint64_t id); 152 void ClearDependentTask(uint32_t taskId); 153 void UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message, 154 int32_t code); 155 napi_value CancelError(napi_env env, int32_t errCode, const char* errMessage = nullptr, 156 napi_value result = nullptr, bool success = false); 157 void SetIsPerformIdle(bool performIdle); 158 bool IsPerformIdle() const; 159 uint32_t GetNonIdleTaskNum(); 160 uint32_t GetTotalTaskNum() const; 161 162 private: 163 TaskManager(); 164 ~TaskManager(); 165 TaskManager(const TaskManager &) = delete; 166 TaskManager& operator=(const TaskManager &) = delete; 167 TaskManager(TaskManager &&) = delete; 168 TaskManager& operator=(TaskManager &&) = delete; 169 170 void CreateWorkers(napi_env env, uint32_t num = 1); 171 void NotifyExecuteTask(); 172 void NotifyWorkerAdded(Worker* worker); 173 174 // for load balance 175 void RunTaskManager(); 176 void CheckForBlockedWorkers(); 177 template <bool needCheckIdle> void TryExpandWithCheckIdle(); 178 void NotifyShrink(uint32_t targetNum); 179 void TriggerShrink(uint32_t step); 180 uint32_t ComputeSuitableThreadNum(); 181 uint32_t ComputeSuitableIdleNum(); 182 void DispatchAndTryExpandInner(); 183 static void TryExpand(const uv_timer_t* req = nullptr); 184 static void DispatchAndTryExpand(const uv_async_t* req); 185 static void TriggerLoadBalance(const uv_timer_t* req); 186 187 bool IsChooseIdle(); 188 std::pair<uint32_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority); 189 void IncreaseTaskNum(Priority priority); 190 void DecreaseTaskNum(Priority priority); 191 void RemoveDependTaskByTaskId(uint32_t taskId); 192 void RemoveDependentTaskByTaskId(uint32_t taskId); 193 void CheckTasksAndReportHisysEvent(); 194 void WorkerAliveAndReport(Worker* worker); 195 void WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams); 196 void AddCountTraceForWorkerLog(bool needLog, int64_t threadNum, int64_t idleThreadNum, int64_t timeoutThreadNum); 197 198 // <taskId, Task> 199 std::unordered_map<uint32_t, Task*> tasks_ {}; 200 std::recursive_mutex tasksMutex_; 201 202 // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask 203 std::unordered_map<uint32_t, std::set<uint32_t>> dependTaskInfos_ {}; 204 std::shared_mutex dependTaskInfosMutex_; 205 206 // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask 207 std::unordered_map<uint32_t, std::set<uint32_t>> dependentTaskInfos_ {}; 208 std::shared_mutex dependentTaskInfosMutex_; 209 210 // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...> 211 std::unordered_map<uint32_t, Priority> pendingTaskInfos_ {}; 212 std::shared_mutex pendingTaskInfosMutex_; 213 214 // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...> 215 std::unordered_map<uint32_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {}; 216 std::shared_mutex taskDurationInfosMutex_; 217 218 // record the longTasks and workers for efficiency 219 std::unordered_map<uint32_t, Worker*> longTasksMap_ {}; 220 std::shared_mutex longTasksMutex_{}; 221 222 std::unordered_set<Worker*> workers_ {}; 223 std::unordered_set<Worker*> idleWorkers_ {}; 224 std::unordered_set<Worker*> timeoutWorkers_ {}; 225 std::recursive_mutex workersMutex_; 226 227 // for load balance 228 napi_env hostEnv_ = nullptr; 229 uv_loop_t* loop_ = nullptr; 230 uv_timer_t* balanceTimer_ = nullptr; 231 uv_timer_t* expandTimer_ = nullptr; 232 uv_async_t* dispatchHandle_ = nullptr; 233 std::atomic<bool> suspend_ = false; 234 std::atomic<uint32_t> retryCount_ = 0; 235 std::atomic<uint32_t> expandingCount_ = 0; 236 std::atomic<uint32_t> nonIdleTaskNum_ = 0; 237 std::atomic<uint32_t> totalTaskNum_ = 0; 238 std::atomic<uint32_t> totalExecCount_ = 0; 239 std::atomic<uint64_t> totalExecTime_ = 0; 240 std::atomic<bool> needChecking_ = false; 241 std::atomic<bool> isHandleInited_ = false; 242 std::atomic<uint32_t> timerTriggered_ = false; 243 std::atomic<uint64_t> preDequeneTime_ = 0; 244 std::atomic<uint64_t> reportTime_ = 0; 245 246 // for task priority 247 uint32_t highPrioExecuteCount_ = 0; 248 uint32_t mediumPrioExecuteCount_ = 0; 249 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 250 std::mutex taskQueuesMutex_; 251 252 std::atomic<bool> isInitialized_ = false; 253 std::atomic<bool> isSystemApp_ = false; 254 int disableFfrtFlag_ = 0; // 0 means enable ffrt 255 int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt 256 257 std::mutex callbackMutex_; 258 std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {}; 259 std::vector<Worker*> freeList_ {}; 260 uint32_t maxThreads_ = ConcurrentHelper::GetMaxThreads(); 261 262 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 263 std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {}; 264 #endif 265 std::atomic<bool> isPerformIdle_ = false; 266 267 friend class TaskGroupManager; 268 friend class NativeEngineTest; 269 }; 270 } // namespace Commonlibrary::Concurrent::TaskPoolModule 271 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H