1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <set> 24 #include <shared_mutex> 25 #include <unordered_map> 26 #include <unordered_set> 27 #include <vector> 28 29 #include "dfx_hisys_event.h" 30 #include "napi/native_api.h" 31 #include "sequence_runner.h" 32 #include "task.h" 33 #include "task_queue.h" 34 #include "task_group.h" 35 #include "worker.h" 36 37 namespace Commonlibrary::Concurrent::TaskPoolModule { 38 using namespace Commonlibrary::Concurrent::Common; 39 40 static constexpr char ARGUMENTS_STR[] = "arguments"; 41 static constexpr char NAME[] = "name"; 42 static constexpr char FUNCTION_STR[] = "function"; 43 static constexpr char GROUP_ID_STR[] = "groupId"; 44 static constexpr char TASKID_STR[] = "taskId"; 45 static constexpr char TASKINFO_STR[] = "taskInfo"; 46 static constexpr char TRANSFERLIST_STR[] = "transferList"; 47 static constexpr char CLONE_LIST_STR[] = "cloneList"; 48 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency"; 49 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency"; 50 static constexpr char TASK_CPU_TIME[] = "cpuDuration"; 51 static constexpr char TASK_IO_TIME[] = "ioDuration"; 52 static constexpr char TASK_TOTAL_TIME[] = "totalDuration"; 53 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer"; 54 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable"; 55 56 class TaskGroup; 57 58 class TaskManager { 59 public: 60 static TaskManager& GetInstance(); 61 62 void StoreTask(Task* task); 63 void RemoveTask(uint32_t taskId); 64 Task* GetTask(uint32_t taskId); 65 void EnqueueTaskId(uint32_t taskId, Priority priority = Priority::DEFAULT); 66 bool EraseWaitingTaskId(uint32_t taskId, Priority priority); 67 std::pair<uint32_t, Priority> DequeueTaskId(); 68 void CancelTask(napi_env env, uint32_t taskId); 69 void CancelSeqRunnerTask(napi_env env, Task* task); 70 void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true); 71 72 // for worker state 73 void NotifyWorkerIdle(Worker* worker); 74 void NotifyWorkerCreated(Worker* worker); 75 void NotifyWorkerRunning(Worker* worker); 76 void RemoveWorker(Worker* worker); 77 void RestoreWorker(Worker* worker); 78 79 // for load balance 80 void InitTaskManager(napi_env env); 81 void UpdateExecutedInfo(uint64_t duration); 82 void TryTriggerExpand(); 83 84 // for taskpool state 85 uint32_t GetTaskNum(); 86 uint32_t GetIdleWorkers(); 87 uint32_t GetThreadNum(); 88 uint32_t GetRunningWorkers(); 89 uint32_t GetTimeoutWorkers(); 90 void GetIdleWorkersList(uint32_t step); 91 bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size); 92 93 // for get thread info 94 napi_value GetThreadInfos(napi_env env); 95 96 // for get task info 97 napi_value GetTaskInfos(napi_env env); 98 99 // for countTrace for worker 100 void CountTraceForWorker(); 101 102 std::shared_ptr<CallbackInfo> GetCallbackInfo(uint32_t taskId); 103 void RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo); 104 void IncreaseRefCount(uint32_t taskId); 105 void DecreaseRefCount(napi_env env, uint32_t taskId); 106 napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task); 107 MsgQueue* GetMessageQueue(const uv_async_t* req); 108 MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo); 109 void ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo); 110 111 // for task dependency 112 bool IsDependendByTaskId(uint32_t taskId); 113 bool IsDependentByTaskId(uint32_t dependentTaskId); 114 void NotifyDependencyTaskInfo(uint32_t taskId); 115 void RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId); 116 bool StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet); 117 bool RemoveTaskDependency(uint32_t taskId, uint32_t dependentId); 118 bool CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId); 119 void EnqueuePendingTaskInfo(uint32_t taskId, Priority priority); 120 std::pair<uint32_t, Priority> DequeuePendingTaskInfo(uint32_t taskId); 121 void RemovePendingTaskInfo(uint32_t taskId); 122 void StoreDependentTaskInfo(std::set<uint32_t> dependTaskIdSet, uint32_t taskId); 123 void RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId); 124 std::string GetTaskDependInfoToString(uint32_t taskId); 125 126 bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT); 127 128 // for duration 129 void StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration); 130 uint64_t GetTaskDuration(uint32_t taskId, std::string durationType); 131 void RemoveTaskDuration(uint32_t taskId); 132 void StoreLongTaskInfo(uint32_t taskId, Worker* worker); 133 void RemoveLongTaskInfo(uint32_t taskId); 134 void TerminateTask(uint32_t taskId); 135 Worker* GetLongTaskInfo(uint32_t taskId); 136 137 // for callback 138 void ReleaseCallBackInfo(Task* task); 139 140 void UpdateSystemAppFlag(); IsSystemApp()141 bool IsSystemApp() const 142 { 143 return isSystemApp_; 144 } EnableFfrt()145 bool EnableFfrt() const 146 { 147 return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_); 148 } 149 150 bool CheckTask(uint32_t taskId); 151 void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error); 152 uint32_t CalculateTaskId(uint64_t id); 153 void ClearDependentTask(uint32_t taskId); 154 void UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message, 155 int32_t code); 156 157 private: 158 TaskManager(); 159 ~TaskManager(); 160 TaskManager(const TaskManager &) = delete; 161 TaskManager& operator=(const TaskManager &) = delete; 162 TaskManager(TaskManager &&) = delete; 163 TaskManager& operator=(TaskManager &&) = delete; 164 165 void CreateWorkers(napi_env env, uint32_t num = 1); 166 void NotifyExecuteTask(); 167 void NotifyWorkerAdded(Worker* worker); 168 169 // for load balance 170 void RunTaskManager(); 171 void CheckForBlockedWorkers(); 172 template <bool needCheckIdle> void TryExpandWithCheckIdle(); 173 void NotifyShrink(uint32_t targetNum); 174 void TriggerShrink(uint32_t step); 175 uint32_t ComputeSuitableThreadNum(); 176 uint32_t ComputeSuitableIdleNum(); 177 void DispatchAndTryExpandInner(); 178 static void TryExpand(const uv_timer_t* req = nullptr); 179 static void DispatchAndTryExpand(const uv_async_t* req); 180 static void TriggerLoadBalance(const uv_timer_t* req); 181 182 bool IsChooseIdle(); 183 uint32_t GetNonIdleTaskNum(); 184 std::pair<uint32_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority); 185 void IncreaseNumIfNoIdle(Priority priority); 186 void DecreaseNumIfNoIdle(Priority priority); 187 void RemoveDependTaskByTaskId(uint32_t taskId); 188 void RemoveDependentTaskByTaskId(uint32_t taskId); 189 void CheckTasksAndReportHisysEvent(); 190 void WorkerAliveAndReport(Worker* worker); 191 void WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams); 192 193 // <taskId, Task> 194 std::unordered_map<uint32_t, Task*> tasks_ {}; 195 std::recursive_mutex tasksMutex_; 196 197 // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask 198 std::unordered_map<uint32_t, std::set<uint32_t>> dependTaskInfos_ {}; 199 std::shared_mutex dependTaskInfosMutex_; 200 201 // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask 202 std::unordered_map<uint32_t, std::set<uint32_t>> dependentTaskInfos_ {}; 203 std::shared_mutex dependentTaskInfosMutex_; 204 205 // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...> 206 std::unordered_map<uint32_t, Priority> pendingTaskInfos_ {}; 207 std::shared_mutex pendingTaskInfosMutex_; 208 209 // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...> 210 std::unordered_map<uint32_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {}; 211 std::shared_mutex taskDurationInfosMutex_; 212 213 // record the longTasks and workers for efficiency 214 std::unordered_map<uint32_t, Worker*> longTasksMap_ {}; 215 std::shared_mutex longTasksMutex_{}; 216 217 std::unordered_set<Worker*> workers_ {}; 218 std::unordered_set<Worker*> idleWorkers_ {}; 219 std::unordered_set<Worker*> timeoutWorkers_ {}; 220 std::recursive_mutex workersMutex_; 221 222 // for load balance 223 napi_env hostEnv_ = nullptr; 224 uv_loop_t* loop_ = nullptr; 225 uv_timer_t* balanceTimer_ = nullptr; 226 uv_timer_t* expandTimer_ = nullptr; 227 uv_async_t* dispatchHandle_ = nullptr; 228 std::atomic<bool> suspend_ = false; 229 std::atomic<uint32_t> retryCount_ = 0; 230 std::atomic<uint32_t> expandingCount_ = 0; 231 std::atomic<uint32_t> nonIdleTaskNum_ = 0; 232 std::atomic<uint32_t> totalExecCount_ = 0; 233 std::atomic<uint64_t> totalExecTime_ = 0; 234 std::atomic<bool> needChecking_ = false; 235 std::atomic<bool> isHandleInited_ = false; 236 std::atomic<uint32_t> timerTriggered_ = false; 237 std::atomic<uint64_t> preDequeneTime_ = 0; 238 std::atomic<uint64_t> reportTime_ = 0; 239 240 // for task priority 241 uint32_t highPrioExecuteCount_ = 0; 242 uint32_t mediumPrioExecuteCount_ = 0; 243 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 244 std::mutex taskQueuesMutex_; 245 246 std::atomic<bool> isInitialized_ = false; 247 std::atomic<bool> isSystemApp_ = false; 248 int disableFfrtFlag_ = 0; // 0 means enable ffrt 249 int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt 250 251 std::mutex callbackMutex_; 252 std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {}; 253 std::vector<Worker*> freeList_ {}; 254 uint32_t maxThreads_ = ConcurrentHelper::GetMaxThreads(); 255 256 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 257 std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {}; 258 #endif 259 260 friend class TaskGroupManager; 261 friend class NativeEngineTest; 262 }; 263 } // namespace Commonlibrary::Concurrent::TaskPoolModule 264 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H