• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
19 #include <array>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <shared_mutex>
24 #include <unordered_map>
25 #include <unordered_set>
26 
27 #include "task.h"
28 #include "task_queue.h"
29 #include "napi/native_api.h"
30 #include "worker.h"
31 
32 namespace Commonlibrary::Concurrent::TaskPoolModule {
33 using namespace Commonlibrary::Concurrent::Common;
34 
35 static constexpr char FUNCTION_STR[] = "function";
36 static constexpr char ARGUMENTS_STR[] = "arguments";
37 static constexpr char TASKID_STR[] = "taskId";
38 static constexpr char TASKINFO_STR[] = "taskInfo";
39 static constexpr char TRANSFERLIST_STR[] = "transferList";
40 static constexpr char GROUP_ID_STR[] = "groupId";
41 
42 class TaskManager {
43 public:
44     TaskManager();
45     ~TaskManager();
46 
47     static TaskManager& GetInstance();
48 
49     static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo);
50 
51     uint32_t GenerateTaskId();
52     uint32_t GenerateExecuteId();
53     TaskInfo* GetTaskInfo(uint32_t executeId);
54     TaskInfo* PopTaskInfo(uint32_t executeId);
55     void StoreRunningInfo(uint32_t taskId, uint32_t executeId);
56     void AddExecuteState(uint32_t executeId);
57     bool UpdateExecuteState(uint32_t executeId, ExecuteState state);
58     void RemoveExecuteState(uint32_t executeId);
59     void PopRunningInfo(uint32_t taskId, uint32_t executeId);
60     void PopTaskEnvInfo(napi_env env);
61     void EnqueueExecuteId(uint32_t executeId, Priority priority = Priority::DEFAULT);
62     std::pair<uint32_t, Priority> DequeueExecuteId();
63     void CancelTask(napi_env env, uint32_t taskId);
64     TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args, uint32_t taskId, uint32_t executeId,
65                                napi_value transferList = nullptr);
66     TaskInfo* GenerateTaskInfoFromTask(napi_env env, napi_value task, uint32_t executeId);
67     void ReleaseTaskContent(TaskInfo* taskInfo);
68 
69     // for worker state
70     void NotifyWorkerIdle(Worker* worker);
71     void NotifyWorkerCreated(Worker* worker);
72     void RemoveWorker(Worker* worker);
73 
74     // for load balance
75     void InitTaskManager(napi_env env);
76     void TryTriggerLoadBalance();
77     void UpdateExecutedInfo(uint64_t duration);
78 
79     // for taskpool state
80     uint32_t GetTaskNum();
81     uint32_t GetThreadNum();
82     uint32_t GetIdleWorkers();
83     uint32_t GetRunningWorkers();
84     uint32_t GetTimeoutWorkers();
85 
86     // for get thread info
87     napi_value GetThreadInfos(napi_env env);
88 
89     // for get task info
90     napi_value GetTaskInfos(napi_env env);
91 
92 private:
93     TaskManager(const TaskManager &) = delete;
94     TaskManager& operator=(const TaskManager &) = delete;
95     TaskManager(TaskManager &&) = delete;
96     TaskManager& operator=(TaskManager &&) = delete;
97 
98     ExecuteState QueryExecuteState(uint32_t executeId);
99     void CreateWorkers(napi_env env, uint32_t num = 1);
100     void NotifyExecuteTask();
101     void NotifyWorkerAdded(Worker* worker);
102     void StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo);
103     bool MarkCanceledState(uint32_t executeId);
104     void CancelExecution(napi_env env, uint32_t executeId);
105 
106     // for load balance
107     void RunTaskManager();
108     void StoreTaskEnvInfo(napi_env env);
109     void CheckForBlockedWorkers();
110     void CreateOrDeleteWorkers(uint32_t targetNum);
111     bool HasTaskEnvInfo(napi_env env);
112     uint32_t ComputeSuitableThreadNum();
113     static void RestartTimer(const uv_async_t* req);
114     static void TriggerLoadBalance(const uv_timer_t* req = nullptr);
115 
116     std::atomic<int32_t> currentExecuteId_ = 1; // 1: executeId begin from 1, 0 for exception
117     std::atomic<int32_t> currentTaskId_ = 1; // 1: task will begin from 1, 0 for func
118 
119     // <executeId, TaskInfo>
120     std::unordered_map<uint32_t, TaskInfo*> taskInfos_ {};
121     std::shared_mutex taskInfosMutex_;
122 
123     // <executeId, executeState>
124     std::unordered_map<uint32_t, ExecuteState> executeStates_ {};
125     std::shared_mutex executeStatesMutex_;
126 
127     // <taskId, <executeId1, executeId2, ...>>
128     std::unordered_map<uint32_t, std::list<uint32_t>> runningInfos_ {};
129     std::shared_mutex runningInfosMutex_;
130 
131     std::unordered_map<napi_env, uint32_t> taskEnvInfo_ {};
132     std::shared_mutex taskEnvInfoMutex_;
133 
134     std::unordered_set<Worker*> workers_ {};
135     std::unordered_set<Worker*> idleWorkers_ {};
136     std::unordered_set<Worker*> timeoutWorkers_ {};
137     std::recursive_mutex workersMutex_;
138 
139     // for load balance
140     napi_env hostEnv_ = nullptr;
141     uv_loop_t* loop_ = nullptr;
142     uv_timer_t* timer_ = nullptr;
143     uv_async_t* notifyRestartTimer_ = nullptr;
144     std::atomic<bool> suspend_ = false;
145     std::atomic<uint32_t> retryCount_ = 0;
146     std::atomic<uint32_t> totalExecCount_ = 0;
147     std::atomic<uint64_t> totalExecTime_ = 0;
148     std::atomic<uint32_t> expandingCount_ = 0;
149     std::atomic<uint64_t> nextCheckTime_ = 0;
150 
151     // for task priority
152     uint32_t highPrioExecuteCount_ = 0;
153     uint32_t mediumPrioExecuteCount_ = 0;
154     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
155     std::mutex taskQueuesMutex_;
156 
157     std::atomic<bool> isInitialized_ = false;
158 
159     friend class TaskGroupManager;
160 };
161 
162 class TaskGroupManager {
163 public:
164     TaskGroupManager() = default;
165     ~TaskGroupManager() = default;
166 
167     static TaskGroupManager &GetInstance();
168 
169     uint32_t GenerateGroupId();
170     uint32_t GenerateGroupExecuteId();
171     void AddTask(uint32_t groupId, napi_ref task);
172     const std::list<napi_ref>& GetTasksByGroup(uint32_t groupId);
173     void ClearTasks(napi_env env, uint32_t groupId);
174 
175     GroupInfo* GenerateGroupInfo(napi_env env, uint32_t taskNum, uint32_t groupId, uint32_t groupExecuteId);
176     void ClearGroupInfo(napi_env env, uint32_t groupExecuteId, GroupInfo* groupInfo);
177     void CancelGroup(napi_env env, uint32_t groupId);
178     void RemoveExecuteId(uint32_t groupId, uint32_t groupExecuteId);
179     void ClearExecuteId(uint32_t groupId);
180     bool IsRunning(uint32_t groupExecuteId);
181     GroupInfo* GetGroupInfoByExecutionId(uint32_t groupExecuteId);
182 
183 private:
184     TaskGroupManager(const TaskGroupManager &) = delete;
185     TaskGroupManager& operator=(const TaskGroupManager &) = delete;
186     TaskGroupManager(TaskGroupManager &&) = delete;
187     TaskGroupManager& operator=(TaskGroupManager &&) = delete;
188 
189     void StoreExecuteId(uint32_t groupId, uint32_t groupExecuteId);
190 
191     void StoreRunningExecuteId(uint32_t groupExecuteId);
192     void RemoveRunningExecuteId(uint32_t groupExecuteId);
193 
194     void AddGroupInfoById(uint32_t groupExecuteId, GroupInfo* info);
195     void RemoveGroupInfoById(uint32_t groupExecuteId);
196 
197     void CancelGroupExecution(uint32_t executeId);
198 
199     std::atomic<uint32_t> groupId_ = 0;
200     std::atomic<uint32_t> groupExecuteId_ = 1; // 1: 0 reserved for those tasks not in any group
201 
202     // <groupId, <groupExecuteId1, groupExecuteId2, ...>>
203     std::unordered_map<uint32_t, std::list<uint32_t>> groupExecuteIds_ {};
204     std::mutex groupExecuteIdsMutex_;
205 
206     // <groupId, <task1, task2, ...>>
207     std::unordered_map<uint32_t, std::list<napi_ref>> tasks_ {};
208     std::shared_mutex tasksMutex_;
209 
210     // <groupExecuteId1, groupExecuteId2, ...>
211     std::unordered_set<uint32_t> runningGroupExecutions_ {};
212     std::shared_mutex groupExecutionsMutex_;
213 
214     // <<groupExecuteId1, GroupInfo1>, <groupExecuteId2, GroupInfo2>, ...>
215     std::unordered_map<uint32_t, GroupInfo*> groupInfoMap_ {};
216     std::shared_mutex groupInfoMapMutex_;
217 };
218 } // namespace Commonlibrary::Concurrent::TaskPoolModule
219 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H