1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <set> 24 #include <shared_mutex> 25 #include <unordered_map> 26 #include <unordered_set> 27 #include <vector> 28 29 #include "napi/native_api.h" 30 #include "sequence_runner.h" 31 #include "task.h" 32 #include "task_queue.h" 33 #include "task_group.h" 34 #include "worker.h" 35 36 37 namespace Commonlibrary::Concurrent::TaskPoolModule { 38 using namespace Commonlibrary::Concurrent::Common; 39 40 static constexpr char ARGUMENTS_STR[] = "arguments"; 41 static constexpr char NAME[] = "name"; 42 static constexpr char FUNCTION_STR[] = "function"; 43 static constexpr char GROUP_ID_STR[] = "groupId"; 44 static constexpr char TASKID_STR[] = "taskId"; 45 static constexpr char TASKINFO_STR[] = "taskInfo"; 46 static constexpr char TRANSFERLIST_STR[] = "transferList"; 47 static constexpr char CLONE_LIST_STR[] = "cloneList"; 48 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency"; 49 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency"; 50 static constexpr char TASK_CPU_TIME[] = "cpuDuration"; 51 static constexpr char TASK_IO_TIME[] = "ioDuration"; 52 static constexpr char TASK_TOTAL_TIME[] = "totalDuration"; 53 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer"; 54 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable"; 55 56 class TaskGroup; 57 58 class TaskManager { 59 public: 60 static TaskManager& GetInstance(); 61 62 void StoreTask(uint64_t taskId, Task* task); 63 void RemoveTask(uint64_t taskId); 64 Task* GetTask(uint64_t taskId); 65 void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT); 66 std::pair<uint64_t, Priority> DequeueTaskId(); 67 void CancelTask(napi_env env, uint64_t taskId); 68 void ReleaseTaskData(napi_env env, Task* task); 69 70 // for worker state 71 void NotifyWorkerIdle(Worker* worker); 72 void NotifyWorkerCreated(Worker* worker); 73 void NotifyWorkerRunning(Worker* worker); 74 void RemoveWorker(Worker* worker); 75 void RestoreWorker(Worker* worker); 76 77 // for load balance 78 void InitTaskManager(napi_env env); 79 void UpdateExecutedInfo(uint64_t duration); 80 void TryTriggerExpand(); 81 82 // for taskpool state 83 uint32_t GetTaskNum(); 84 uint32_t GetIdleWorkers(); 85 uint32_t GetThreadNum(); 86 uint32_t GetRunningWorkers(); 87 uint32_t GetTimeoutWorkers(); 88 void GetIdleWorkersList(uint32_t step); 89 bool ReadThreadInfo(Worker* worker, char* buf, uint32_t size); 90 91 // for get thread info 92 napi_value GetThreadInfos(napi_env env); 93 94 // for get task info 95 napi_value GetTaskInfos(napi_env env); 96 97 // for countTrace for worker 98 void CountTraceForWorker(); 99 100 std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId); 101 void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo); 102 void IncreaseRefCount(uint64_t taskId); 103 void DecreaseRefCount(napi_env env, uint64_t taskId); 104 napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task); 105 106 // for task dependency 107 bool IsDependendByTaskId(uint64_t taskId); 108 void NotifyDependencyTaskInfo(uint64_t taskId); 109 bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet); 110 bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId); 111 bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId); 112 void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority); 113 std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId); 114 void RemovePendingTaskInfo(uint64_t taskId); 115 void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId); 116 void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId); 117 118 // for duration 119 void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration); 120 uint64_t GetTaskDuration(uint64_t taskId, std::string durationType); 121 void RemoveTaskDuration(uint64_t taskId); 122 123 private: 124 TaskManager(); 125 ~TaskManager(); 126 TaskManager(const TaskManager &) = delete; 127 TaskManager& operator=(const TaskManager &) = delete; 128 TaskManager(TaskManager &&) = delete; 129 TaskManager& operator=(TaskManager &&) = delete; 130 131 void CreateWorkers(napi_env env, uint32_t num = 1); 132 void NotifyExecuteTask(); 133 void NotifyWorkerAdded(Worker* worker); 134 135 // for load balance 136 void RunTaskManager(); 137 void CheckForBlockedWorkers(); 138 void TryExpand(); 139 void NotifyShrink(uint32_t targetNum); 140 void TriggerShrink(uint32_t step); 141 uint32_t ComputeSuitableThreadNum(); 142 static void NotifyExpand(const uv_async_t* req); 143 static void TriggerLoadBalance(const uv_timer_t* req = nullptr); 144 145 // <taskId, Task> 146 std::unordered_map<uint64_t, Task*> tasks_ {}; 147 std::shared_mutex tasksMutex_; 148 149 // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask 150 std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {}; 151 std::shared_mutex dependTaskInfosMutex_; 152 153 // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask 154 std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {}; 155 156 // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...> 157 std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {}; 158 std::shared_mutex pendingTaskInfosMutex_; 159 160 // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...> 161 std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {}; 162 std::shared_mutex taskDurationInfosMutex_; 163 164 std::unordered_set<Worker*> workers_ {}; 165 std::unordered_set<Worker*> idleWorkers_ {}; 166 std::unordered_set<Worker*> timeoutWorkers_ {}; 167 std::recursive_mutex workersMutex_; 168 169 // for load balance 170 napi_env hostEnv_ = nullptr; 171 uv_loop_t* loop_ = nullptr; 172 uv_timer_t* timer_ = nullptr; 173 uv_async_t* expandHandle_ = nullptr; 174 std::atomic<bool> suspend_ = false; 175 std::atomic<uint32_t> retryCount_ = 0; 176 std::atomic<uint32_t> expandingCount_ = 0; 177 std::atomic<uint32_t> totalExecCount_ = 0; 178 std::atomic<uint64_t> totalExecTime_ = 0; 179 std::atomic<bool> needChecking_ = false; 180 181 // for task priority 182 uint32_t highPrioExecuteCount_ = 0; 183 uint32_t mediumPrioExecuteCount_ = 0; 184 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 185 std::mutex taskQueuesMutex_; 186 187 std::atomic<bool> isInitialized_ = false; 188 189 std::mutex callbackMutex_; 190 std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {}; 191 std::vector<Worker*> freeList_ {}; 192 friend class TaskGroupManager; 193 }; 194 195 class TaskGroupManager { 196 public: 197 TaskGroupManager() = default; 198 ~TaskGroupManager() = default; 199 200 static TaskGroupManager &GetInstance(); 201 202 void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId); 203 void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup); 204 void RemoveTaskGroup(uint64_t groupId); 205 TaskGroup* GetTaskGroup(uint64_t groupId); 206 void CancelGroup(napi_env env, uint64_t groupId); 207 void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group); 208 void ReleaseTaskGroupData(napi_env env, TaskGroup* group); 209 void UpdateGroupState(uint64_t groupId); 210 211 void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task); 212 bool TriggerSeqRunner(napi_env env, Task* lastTask); 213 void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner); 214 void RemoveSequenceRunner(uint64_t seqRunnerId); 215 SequenceRunner* GetSeqRunner(uint64_t seqRunnerId); 216 217 private: 218 TaskGroupManager(const TaskGroupManager &) = delete; 219 TaskGroupManager& operator=(const TaskGroupManager &) = delete; 220 TaskGroupManager(TaskGroupManager &&) = delete; 221 TaskGroupManager& operator=(TaskGroupManager &&) = delete; 222 223 // <groupId, TaskGroup> 224 std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {}; 225 std::mutex taskGroupsMutex_; 226 227 // <seqRunnerId, SequenceRunner> 228 std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {}; 229 std::mutex seqRunnersMutex_; 230 }; 231 } // namespace Commonlibrary::Concurrent::TaskPoolModule 232 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H