1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <shared_mutex> 24 #include <unordered_map> 25 #include <unordered_set> 26 27 #include "task.h" 28 #include "task_queue.h" 29 #include "napi/native_api.h" 30 #include "worker.h" 31 32 namespace Commonlibrary::Concurrent::TaskPoolModule { 33 using namespace Commonlibrary::Concurrent::Common; 34 35 static constexpr char FUNCTION_STR[] = "function"; 36 static constexpr char ARGUMENTS_STR[] = "arguments"; 37 static constexpr char TASKID_STR[] = "taskId"; 38 static constexpr char TASKINFO_STR[] = "taskInfo"; 39 static constexpr char TRANSFERLIST_STR[] = "transferList"; 40 static constexpr char GROUP_ID_STR[] = "groupId"; 41 42 class TaskManager { 43 public: 44 TaskManager(); 45 ~TaskManager(); 46 47 static TaskManager& GetInstance(); 48 49 static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo); 50 51 uint32_t GenerateTaskId(); 52 uint32_t GenerateExecuteId(); 53 TaskInfo* GetTaskInfo(uint32_t executeId); 54 TaskInfo* PopTaskInfo(uint32_t executeId); 55 void StoreRunningInfo(uint32_t taskId, uint32_t executeId); 56 void AddExecuteState(uint32_t executeId); 57 bool UpdateExecuteState(uint32_t executeId, ExecuteState state); 58 void RemoveExecuteState(uint32_t executeId); 59 void PopRunningInfo(uint32_t taskId, uint32_t executeId); 60 void PopTaskEnvInfo(napi_env env); 61 void EnqueueExecuteId(uint32_t executeId, Priority priority = Priority::DEFAULT); 62 std::pair<uint32_t, Priority> DequeueExecuteId(); 63 void CancelTask(napi_env env, uint32_t taskId); 64 TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args, uint32_t taskId, uint32_t executeId, 65 napi_value transferList = nullptr); 66 TaskInfo* GenerateTaskInfoFromTask(napi_env env, napi_value task, uint32_t executeId); 67 void ReleaseTaskContent(TaskInfo* taskInfo); 68 69 // for worker state 70 void NotifyWorkerIdle(Worker* worker); 71 void NotifyWorkerCreated(Worker* worker); 72 void RemoveWorker(Worker* worker); 73 74 // for load balance 75 void InitTaskManager(napi_env env); 76 void TryTriggerLoadBalance(); 77 void UpdateExecutedInfo(uint64_t duration); 78 79 // for taskpool state 80 uint32_t GetTaskNum(); 81 uint32_t GetThreadNum(); 82 uint32_t GetIdleWorkers(); 83 uint32_t GetRunningWorkers(); 84 uint32_t GetTimeoutWorkers(); 85 86 // for get thread info 87 napi_value GetThreadInfos(napi_env env); 88 89 // for get task info 90 napi_value GetTaskInfos(napi_env env); 91 92 private: 93 TaskManager(const TaskManager &) = delete; 94 TaskManager& operator=(const TaskManager &) = delete; 95 TaskManager(TaskManager &&) = delete; 96 TaskManager& operator=(TaskManager &&) = delete; 97 98 ExecuteState QueryExecuteState(uint32_t executeId); 99 void CreateWorkers(napi_env env, uint32_t num = 1); 100 void NotifyExecuteTask(); 101 void NotifyWorkerAdded(Worker* worker); 102 void StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo); 103 bool MarkCanceledState(uint32_t executeId); 104 void CancelExecution(napi_env env, uint32_t executeId); 105 106 // for load balance 107 void RunTaskManager(); 108 void StoreTaskEnvInfo(napi_env env); 109 void CheckForBlockedWorkers(); 110 void CreateOrDeleteWorkers(uint32_t targetNum); 111 bool HasTaskEnvInfo(napi_env env); 112 uint32_t ComputeSuitableThreadNum(); 113 static void RestartTimer(const uv_async_t* req); 114 static void TriggerLoadBalance(const uv_timer_t* req = nullptr); 115 116 std::atomic<int32_t> currentExecuteId_ = 1; // 1: executeId begin from 1, 0 for exception 117 std::atomic<int32_t> currentTaskId_ = 1; // 1: task will begin from 1, 0 for func 118 119 // <executeId, TaskInfo> 120 std::unordered_map<uint32_t, TaskInfo*> taskInfos_ {}; 121 std::shared_mutex taskInfosMutex_; 122 123 // <executeId, executeState> 124 std::unordered_map<uint32_t, ExecuteState> executeStates_ {}; 125 std::shared_mutex executeStatesMutex_; 126 127 // <taskId, <executeId1, executeId2, ...>> 128 std::unordered_map<uint32_t, std::list<uint32_t>> runningInfos_ {}; 129 std::shared_mutex runningInfosMutex_; 130 131 std::unordered_map<napi_env, uint32_t> taskEnvInfo_ {}; 132 std::shared_mutex taskEnvInfoMutex_; 133 134 std::unordered_set<Worker*> workers_ {}; 135 std::unordered_set<Worker*> idleWorkers_ {}; 136 std::unordered_set<Worker*> timeoutWorkers_ {}; 137 std::recursive_mutex workersMutex_; 138 139 // for load balance 140 napi_env hostEnv_ = nullptr; 141 uv_loop_t* loop_ = nullptr; 142 uv_timer_t* timer_ = nullptr; 143 uv_async_t* notifyRestartTimer_ = nullptr; 144 std::atomic<bool> suspend_ = false; 145 std::atomic<uint32_t> retryCount_ = 0; 146 std::atomic<uint32_t> totalExecCount_ = 0; 147 std::atomic<uint64_t> totalExecTime_ = 0; 148 std::atomic<uint32_t> expandingCount_ = 0; 149 std::atomic<uint64_t> nextCheckTime_ = 0; 150 151 // for task priority 152 uint32_t highPrioExecuteCount_ = 0; 153 uint32_t mediumPrioExecuteCount_ = 0; 154 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 155 std::mutex taskQueuesMutex_; 156 157 std::atomic<bool> isInitialized_ = false; 158 159 friend class TaskGroupManager; 160 }; 161 162 class TaskGroupManager { 163 public: 164 TaskGroupManager() = default; 165 ~TaskGroupManager() = default; 166 167 static TaskGroupManager &GetInstance(); 168 169 uint32_t GenerateGroupId(); 170 uint32_t GenerateGroupExecuteId(); 171 void AddTask(uint32_t groupId, napi_ref task); 172 const std::list<napi_ref>& GetTasksByGroup(uint32_t groupId); 173 void ClearTasks(napi_env env, uint32_t groupId); 174 175 GroupInfo* GenerateGroupInfo(napi_env env, uint32_t taskNum, uint32_t groupId, uint32_t groupExecuteId); 176 void ClearGroupInfo(napi_env env, uint32_t groupExecuteId, GroupInfo* groupInfo); 177 void CancelGroup(napi_env env, uint32_t groupId); 178 void RemoveExecuteId(uint32_t groupId, uint32_t groupExecuteId); 179 void ClearExecuteId(uint32_t groupId); 180 bool IsRunning(uint32_t groupExecuteId); 181 GroupInfo* GetGroupInfoByExecutionId(uint32_t groupExecuteId); 182 183 private: 184 TaskGroupManager(const TaskGroupManager &) = delete; 185 TaskGroupManager& operator=(const TaskGroupManager &) = delete; 186 TaskGroupManager(TaskGroupManager &&) = delete; 187 TaskGroupManager& operator=(TaskGroupManager &&) = delete; 188 189 void StoreExecuteId(uint32_t groupId, uint32_t groupExecuteId); 190 191 void StoreRunningExecuteId(uint32_t groupExecuteId); 192 void RemoveRunningExecuteId(uint32_t groupExecuteId); 193 194 void AddGroupInfoById(uint32_t groupExecuteId, GroupInfo* info); 195 void RemoveGroupInfoById(uint32_t groupExecuteId); 196 197 void CancelGroupExecution(uint32_t executeId); 198 199 std::atomic<uint32_t> groupId_ = 0; 200 std::atomic<uint32_t> groupExecuteId_ = 1; // 1: 0 reserved for those tasks not in any group 201 202 // <groupId, <groupExecuteId1, groupExecuteId2, ...>> 203 std::unordered_map<uint32_t, std::list<uint32_t>> groupExecuteIds_ {}; 204 std::mutex groupExecuteIdsMutex_; 205 206 // <groupId, <task1, task2, ...>> 207 std::unordered_map<uint32_t, std::list<napi_ref>> tasks_ {}; 208 std::shared_mutex tasksMutex_; 209 210 // <groupExecuteId1, groupExecuteId2, ...> 211 std::unordered_set<uint32_t> runningGroupExecutions_ {}; 212 std::shared_mutex groupExecutionsMutex_; 213 214 // <<groupExecuteId1, GroupInfo1>, <groupExecuteId2, GroupInfo2>, ...> 215 std::unordered_map<uint32_t, GroupInfo*> groupInfoMap_ {}; 216 std::shared_mutex groupInfoMapMutex_; 217 }; 218 } // namespace Commonlibrary::Concurrent::TaskPoolModule 219 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H