1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <set> 24 #include <shared_mutex> 25 #include <unordered_map> 26 #include <unordered_set> 27 #include <vector> 28 29 #include "napi/native_api.h" 30 #include "sequence_runner.h" 31 #include "task.h" 32 #include "task_queue.h" 33 #include "task_group.h" 34 #include "worker.h" 35 36 namespace Commonlibrary::Concurrent::TaskPoolModule { 37 using namespace Commonlibrary::Concurrent::Common; 38 39 static constexpr char ARGUMENTS_STR[] = "arguments"; 40 static constexpr char NAME[] = "name"; 41 static constexpr char FUNCTION_STR[] = "function"; 42 static constexpr char GROUP_ID_STR[] = "groupId"; 43 static constexpr char TASKID_STR[] = "taskId"; 44 static constexpr char TASKINFO_STR[] = "taskInfo"; 45 static constexpr char TRANSFERLIST_STR[] = "transferList"; 46 static constexpr char CLONE_LIST_STR[] = "cloneList"; 47 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency"; 48 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency"; 49 static constexpr char TASK_CPU_TIME[] = "cpuDuration"; 50 static constexpr char TASK_IO_TIME[] = "ioDuration"; 51 static constexpr char TASK_TOTAL_TIME[] = "totalDuration"; 52 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer"; 53 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable"; 54 55 class TaskGroup; 56 57 class TaskManager { 58 public: 59 static TaskManager& GetInstance(); 60 61 void StoreTask(uint64_t taskId, Task* task); 62 void RemoveTask(uint64_t taskId); 63 Task* GetTask(uint64_t taskId); 64 void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT); 65 bool EraseWaitingTaskId(uint64_t taskId, Priority priority); 66 std::pair<uint64_t, Priority> DequeueTaskId(); 67 void CancelTask(napi_env env, uint64_t taskId); 68 void CancelSeqRunnerTask(napi_env env, Task* task); 69 void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true); 70 71 // for worker state 72 void NotifyWorkerIdle(Worker* worker); 73 void NotifyWorkerCreated(Worker* worker); 74 void NotifyWorkerRunning(Worker* worker); 75 void RemoveWorker(Worker* worker); 76 void RestoreWorker(Worker* worker); 77 78 // for load balance 79 void InitTaskManager(napi_env env); 80 void UpdateExecutedInfo(uint64_t duration); 81 void TryTriggerExpand(); 82 83 // for taskpool state 84 uint32_t GetTaskNum(); 85 uint32_t GetIdleWorkers(); 86 uint32_t GetThreadNum(); 87 uint32_t GetRunningWorkers(); 88 uint32_t GetTimeoutWorkers(); 89 void GetIdleWorkersList(uint32_t step); 90 bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size); 91 92 // for get thread info 93 napi_value GetThreadInfos(napi_env env); 94 95 // for get task info 96 napi_value GetTaskInfos(napi_env env); 97 98 // for get task name 99 std::string GetTaskName(uint64_t taskId); 100 101 // for countTrace for worker 102 void CountTraceForWorker(); 103 104 std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId); 105 void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo); 106 void IncreaseRefCount(uint64_t taskId); 107 void DecreaseRefCount(napi_env env, uint64_t taskId); 108 napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task); 109 MsgQueue* GetMessageQueue(const uv_async_t* req); 110 MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo); 111 void ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo); 112 113 // for task dependency 114 bool IsDependendByTaskId(uint64_t taskId); 115 bool IsDependentByTaskId(uint64_t dependentTaskId); 116 void NotifyDependencyTaskInfo(uint64_t taskId); 117 void RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId); 118 bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet); 119 bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId); 120 bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId); 121 void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority); 122 std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId); 123 void RemovePendingTaskInfo(uint64_t taskId); 124 void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId); 125 void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId); 126 std::string GetTaskDependInfoToString(uint64_t taskId); 127 128 bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT); 129 130 // for duration 131 void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration); 132 uint64_t GetTaskDuration(uint64_t taskId, std::string durationType); 133 void RemoveTaskDuration(uint64_t taskId); 134 void StoreLongTaskInfo(uint64_t taskId, Worker* worker); 135 void RemoveLongTaskInfo(uint64_t taskId); 136 void TerminateTask(uint64_t taskId); 137 Worker* GetLongTaskInfo(uint64_t taskId); 138 139 // for callback 140 void ReleaseCallBackInfo(Task* task); 141 142 void UpdateSystemAppFlag(); IsSystemApp()143 bool IsSystemApp() const 144 { 145 return isSystemApp_; 146 } EnableFfrt()147 bool EnableFfrt() const 148 { 149 return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_); 150 } 151 152 bool CheckTask(uint64_t taskId); 153 void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error); 154 155 private: 156 TaskManager(); 157 ~TaskManager(); 158 TaskManager(const TaskManager &) = delete; 159 TaskManager& operator=(const TaskManager &) = delete; 160 TaskManager(TaskManager &&) = delete; 161 TaskManager& operator=(TaskManager &&) = delete; 162 163 void CreateWorkers(napi_env env, uint32_t num = 1); 164 void NotifyExecuteTask(); 165 void NotifyWorkerAdded(Worker* worker); 166 167 // for load balance 168 void RunTaskManager(); 169 void CheckForBlockedWorkers(); 170 void TryExpand(); 171 void NotifyShrink(uint32_t targetNum); 172 void TriggerShrink(uint32_t step); 173 uint32_t ComputeSuitableThreadNum(); 174 uint32_t ComputeSuitableIdleNum(); 175 static void NotifyExpand(const uv_async_t* req); 176 static void TriggerLoadBalance(const uv_timer_t* req = nullptr); 177 178 bool IsChooseIdle(); 179 uint32_t GetNonIdleTaskNum(); 180 std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority); 181 void IncreaseNumIfNoIdle(Priority priority); 182 void DecreaseNumIfNoIdle(Priority priority); 183 184 // <taskId, Task> 185 std::unordered_map<uint64_t, Task*> tasks_ {}; 186 RECURSIVE_MUTEX tasksMutex_; 187 188 // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask 189 std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {}; 190 std::shared_mutex dependTaskInfosMutex_; 191 192 // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask 193 std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {}; 194 std::shared_mutex dependentTaskInfosMutex_; 195 196 // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...> 197 std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {}; 198 std::shared_mutex pendingTaskInfosMutex_; 199 200 // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...> 201 std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {}; 202 std::shared_mutex taskDurationInfosMutex_; 203 204 // record the longTasks and workers for efficiency 205 std::unordered_map<uint64_t, Worker*> longTasksMap_ {}; 206 std::shared_mutex longTasksMutex_{}; 207 208 std::unordered_set<Worker*> workers_ {}; 209 std::unordered_set<Worker*> idleWorkers_ {}; 210 std::unordered_set<Worker*> timeoutWorkers_ {}; 211 RECURSIVE_MUTEX workersMutex_; 212 213 // for load balance 214 napi_env hostEnv_ = nullptr; 215 uv_loop_t* loop_ = nullptr; 216 uv_timer_t* timer_ = nullptr; 217 uv_async_t* expandHandle_ = nullptr; 218 std::atomic<bool> suspend_ = false; 219 std::atomic<uint32_t> retryCount_ = 0; 220 std::atomic<uint32_t> nonIdleTaskNum_ = 0; 221 std::atomic<uint32_t> totalExecCount_ = 0; 222 std::atomic<uint64_t> totalExecTime_ = 0; 223 std::atomic<bool> needChecking_ = false; 224 std::atomic<bool> isHandleInited_ = false; 225 226 // for task priority 227 uint32_t highPrioExecuteCount_ = 0; 228 uint32_t mediumPrioExecuteCount_ = 0; 229 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 230 std::mutex taskQueuesMutex_; 231 232 std::atomic<bool> isInitialized_ = false; 233 std::atomic<bool> isSystemApp_ = false; 234 int disableFfrtFlag_ = 0; // 0 means enable ffrt 235 int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt 236 237 std::mutex callbackMutex_; 238 std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {}; 239 std::vector<Worker*> freeList_ {}; 240 241 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 242 std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {}; 243 #endif 244 245 friend class TaskGroupManager; 246 friend class NativeEngineTest; 247 }; 248 249 class TaskGroupManager { 250 public: 251 TaskGroupManager() = default; 252 ~TaskGroupManager() = default; 253 254 static TaskGroupManager &GetInstance(); 255 256 void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId); 257 void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup); 258 void RemoveTaskGroup(uint64_t groupId); 259 TaskGroup* GetTaskGroup(uint64_t groupId); 260 void CancelGroup(napi_env env, uint64_t groupId); 261 void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group); 262 void ReleaseTaskGroupData(napi_env env, TaskGroup* group); 263 bool UpdateGroupState(uint64_t groupId); 264 265 void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task); 266 bool TriggerSeqRunner(napi_env env, Task* lastTask); 267 void DisposeCanceledTask(napi_env env, Task* task); 268 void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner); 269 void RemoveSequenceRunner(uint64_t seqRunnerId); 270 SequenceRunner* GetSeqRunner(uint64_t seqRunnerId); 271 272 private: 273 TaskGroupManager(const TaskGroupManager &) = delete; 274 TaskGroupManager& operator=(const TaskGroupManager &) = delete; 275 TaskGroupManager(TaskGroupManager &&) = delete; 276 TaskGroupManager& operator=(TaskGroupManager &&) = delete; 277 278 // <groupId, TaskGroup> 279 std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {}; 280 std::mutex taskGroupsMutex_; 281 282 // <seqRunnerId, SequenceRunner> 283 std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {}; 284 std::mutex seqRunnersMutex_; 285 friend class NativeEngineTest; 286 }; 287 288 class SequenceRunnerManager { 289 public: 290 SequenceRunnerManager() = default; 291 ~SequenceRunnerManager() = default; 292 293 static SequenceRunnerManager &GetInstance(); 294 SequenceRunner* CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc, 295 const std::string &name, uint32_t priority); 296 uint64_t DecreaseSeqCount(SequenceRunner* seqRunner); 297 void RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner); 298 void RemoveSequenceRunner(const std::string &name); 299 bool TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner); 300 void GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner); 301 bool IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner); 302 void RemoveWaitingTask(Task* task); 303 304 private: 305 SequenceRunnerManager(const SequenceRunnerManager &) = delete; 306 SequenceRunnerManager& operator=(const SequenceRunnerManager &) = delete; 307 SequenceRunnerManager(SequenceRunnerManager &&) = delete; 308 SequenceRunnerManager& operator=(SequenceRunnerManager &&) = delete; 309 310 // <<name1, seqRunner>, <name2, seqRunner>, ...> 311 std::unordered_map<std::string, SequenceRunner*> globalSeqRunner_ {}; 312 std::mutex globalSeqRunnerMutex_; 313 }; 314 } // namespace Commonlibrary::Concurrent::TaskPoolModule 315 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 316