/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
#define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H

#include <array>
#include <atomic>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>

#include "task.h"
#include "task_queue.h"
#include "napi/native_api.h"
#include "worker.h"

namespace Commonlibrary::Concurrent::TaskPoolModule {
using namespace Commonlibrary::Concurrent::Common;

static constexpr char FUNCTION_STR[] = "function";
static constexpr char ARGUMENTS_STR[] = "arguments";
static constexpr char TASKID_STR[] = "taskId";
static constexpr char TASKINFO_STR[] = "taskInfo";
static constexpr char TRANSFERLIST_STR[] = "transferList";
static constexpr char GROUP_ID_STR[] = "groupId";

class TaskManager {
public:
    TaskManager();
    ~TaskManager();

    static TaskManager& GetInstance();
    static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo);

    uint32_t GenerateTaskId();
    uint32_t GenerateExecuteId();
    TaskInfo* GetTaskInfo(uint32_t executeId);
    TaskInfo* PopTaskInfo(uint32_t executeId);
    void StoreRunningInfo(uint32_t taskId, uint32_t executeId);
    void AddExecuteState(uint32_t executeId);
    bool UpdateExecuteState(uint32_t executeId, ExecuteState state);
    void RemoveExecuteState(uint32_t executeId);
    void PopRunningInfo(uint32_t taskId, uint32_t executeId);
    void PopTaskEnvInfo(napi_env env);
    void EnqueueExecuteId(uint32_t executeId, Priority priority = Priority::DEFAULT);
    std::pair<uint32_t, Priority> DequeueExecuteId();
    void CancelTask(napi_env env, uint32_t taskId);
    TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args, uint32_t taskId, uint32_t executeId,
                               napi_value transferList = nullptr);
    TaskInfo* GenerateTaskInfoFromTask(napi_env env, napi_value task, uint32_t executeId);
    void ReleaseTaskContent(TaskInfo* taskInfo);

    // for worker state
    void NotifyWorkerIdle(Worker* worker);
    void NotifyWorkerCreated(Worker* worker);
    void RemoveWorker(Worker* worker);

    // for load balance
    void InitTaskManager(napi_env env);
    void TryTriggerLoadBalance();
    void UpdateExecutedInfo(uint64_t duration);

    // for taskpool state
    uint32_t GetTaskNum();
    uint32_t GetThreadNum();
    uint32_t GetIdleWorkers();
    uint32_t GetRunningWorkers();
    uint32_t GetTimeoutWorkers();

    // for get thread info
    napi_value GetThreadInfos(napi_env env);

    // for get task info
    napi_value GetTaskInfos(napi_env env);

private:
    TaskManager(const TaskManager &) = delete;
    TaskManager& operator=(const TaskManager &) = delete;
    TaskManager(TaskManager &&) = delete;
    TaskManager& operator=(TaskManager &&) = delete;

    ExecuteState QueryExecuteState(uint32_t executeId);
    void CreateWorkers(napi_env env, uint32_t num = 1);
    void NotifyExecuteTask();
    void NotifyWorkerAdded(Worker* worker);
    void StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo);
    bool MarkCanceledState(uint32_t executeId);
    void CancelExecution(napi_env env, uint32_t executeId);

    // for load balance
    void RunTaskManager();
    void StoreTaskEnvInfo(napi_env env);
    void CheckForBlockedWorkers();
    void CreateOrDeleteWorkers(uint32_t targetNum);
    bool HasTaskEnvInfo(napi_env env);
    uint32_t ComputeSuitableThreadNum();
    static void RestartTimer(const uv_async_t* req);
    static void TriggerLoadBalance(const uv_timer_t* req = nullptr);

    std::atomic<uint32_t> currentExecuteId_ = 1; // 1: executeId begin from 1, 0 for exception
    std::atomic<uint32_t> currentTaskId_ = 1; // 1: task will begin from 1, 0 for func

    // <executeId, TaskInfo>
    std::unordered_map<uint32_t, TaskInfo*> taskInfos_ {};
    std::shared_mutex taskInfosMutex_;

    // <executeId, executeState>
    std::unordered_map<uint32_t, ExecuteState> executeStates_ {};
    std::shared_mutex executeStatesMutex_;

    // <taskId, <executeId1, executeId2, ...>>
    std::unordered_map<uint32_t, std::list<uint32_t>> runningInfos_ {};
    std::shared_mutex runningInfosMutex_;

    // <env, count of tasks created in that env> (value type assumed from usage)
    std::unordered_map<napi_env, uint32_t> taskEnvInfo_ {};
    std::shared_mutex taskEnvInfoMutex_;

    std::unordered_set<Worker*> workers_ {};
    std::unordered_set<Worker*> idleWorkers_ {};
    std::unordered_set<Worker*> timeoutWorkers_ {};
    std::recursive_mutex workersMutex_;

    // for load balance
    napi_env hostEnv_ = nullptr;
    uv_loop_t* loop_ = nullptr;
    uv_timer_t* timer_ = nullptr;
    uv_async_t* notifyRestartTimer_ = nullptr;
    std::atomic<bool> suspend_ = false;
    std::atomic<uint32_t> retryCount_ = 0;
    std::atomic<uint32_t> totalExecCount_ = 0;
    std::atomic<uint64_t> totalExecTime_ = 0;
    std::atomic<uint32_t> expandingCount_ = 0;
    std::atomic<uint64_t> nextCheckTime_ = 0;

    // for task priority
    uint32_t highPrioExecuteCount_ = 0;
    uint32_t mediumPrioExecuteCount_ = 0;
    std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
    std::mutex taskQueuesMutex_;

    std::atomic<bool> isInitialized_ = false;

    friend class TaskGroupManager;
};

class TaskGroupManager {
public:
    TaskGroupManager() = default;
    ~TaskGroupManager() = default;

    static TaskGroupManager &GetInstance();

    uint32_t GenerateGroupId();
    uint32_t GenerateGroupExecuteId();
    void AddTask(uint32_t groupId, napi_ref task);
    const std::list<napi_ref>& GetTasksByGroup(uint32_t groupId);
    void ClearTasks(napi_env env, uint32_t groupId);
    GroupInfo* GenerateGroupInfo(napi_env env, uint32_t taskNum, uint32_t groupId, uint32_t groupExecuteId);
    void ClearGroupInfo(napi_env env, uint32_t groupExecuteId, GroupInfo* groupInfo);
    void CancelGroup(napi_env env, uint32_t groupId);
    void RemoveExecuteId(uint32_t groupId, uint32_t groupExecuteId);
    void ClearExecuteId(uint32_t groupId);
    bool IsRunning(uint32_t groupExecuteId);
    GroupInfo* GetGroupInfoByExecutionId(uint32_t groupExecuteId);

private:
    TaskGroupManager(const TaskGroupManager &) = delete;
    TaskGroupManager& operator=(const TaskGroupManager &) = delete;
    TaskGroupManager(TaskGroupManager &&) = delete;
    TaskGroupManager& operator=(TaskGroupManager &&) = delete;

    void StoreExecuteId(uint32_t groupId, uint32_t groupExecuteId);
    void StoreRunningExecuteId(uint32_t groupExecuteId);
    void RemoveRunningExecuteId(uint32_t groupExecuteId);
    void AddGroupInfoById(uint32_t groupExecuteId, GroupInfo* info);
    void RemoveGroupInfoById(uint32_t groupExecuteId);
    void CancelGroupExecution(uint32_t executeId);

    std::atomic<uint32_t> groupId_ = 0;
    std::atomic<uint32_t> groupExecuteId_ = 1; // 1: 0 reserved for those tasks not in any group

    // <groupId, <groupExecuteId1, groupExecuteId2, ...>>
    std::unordered_map<uint32_t, std::list<uint32_t>> groupExecuteIds_ {};
    std::mutex groupExecuteIdsMutex_;

    // <groupId, <task1, task2, ...>>
    std::unordered_map<uint32_t, std::list<napi_ref>> tasks_ {};
    std::shared_mutex tasksMutex_;

    // <groupExecuteId>
    std::unordered_set<uint32_t> runningGroupExecutions_ {};
    std::shared_mutex groupExecutionsMutex_;

    // <groupExecuteId, GroupInfo>
    std::unordered_map<uint32_t, GroupInfo*> groupInfoMap_ {};
    std::shared_mutex groupInfoMapMutex_;
};
} // namespace Commonlibrary::Concurrent::TaskPoolModule
#endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H