• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
19 #include <array>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <shared_mutex>
25 #include <unordered_map>
26 #include <unordered_set>
27 #include <vector>
28 
29 #include "napi/native_api.h"
30 #include "sequence_runner.h"
31 #include "task.h"
32 #include "task_queue.h"
33 #include "task_group.h"
34 #include "worker.h"
35 
36 
37 namespace Commonlibrary::Concurrent::TaskPoolModule {
38 using namespace Commonlibrary::Concurrent::Common;
39 
40 static constexpr char ARGUMENTS_STR[] = "arguments";
41 static constexpr char NAME[] = "name";
42 static constexpr char FUNCTION_STR[] = "function";
43 static constexpr char GROUP_ID_STR[] = "groupId";
44 static constexpr char TASKID_STR[] = "taskId";
45 static constexpr char TASKINFO_STR[] = "taskInfo";
46 static constexpr char TRANSFERLIST_STR[] = "transferList";
47 static constexpr char CLONE_LIST_STR[] = "cloneList";
48 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency";
49 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency";
50 static constexpr char TASK_CPU_TIME[] = "cpuDuration";
51 static constexpr char TASK_IO_TIME[] = "ioDuration";
52 static constexpr char TASK_TOTAL_TIME[] = "totalDuration";
53 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer";
54 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable";
55 
56 class TaskGroup;
57 
58 class TaskManager {
59 public:
60     static TaskManager& GetInstance();
61 
62     void StoreTask(uint64_t taskId, Task* task);
63     void RemoveTask(uint64_t taskId);
64     Task* GetTask(uint64_t taskId);
65     void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT);
66     std::pair<uint64_t, Priority> DequeueTaskId();
67     void CancelTask(napi_env env, uint64_t taskId);
68     void ReleaseTaskData(napi_env env, Task* task);
69 
70     // for worker state
71     void NotifyWorkerIdle(Worker* worker);
72     void NotifyWorkerCreated(Worker* worker);
73     void NotifyWorkerRunning(Worker* worker);
74     void RemoveWorker(Worker* worker);
75     void RestoreWorker(Worker* worker);
76 
77     // for load balance
78     void InitTaskManager(napi_env env);
79     void UpdateExecutedInfo(uint64_t duration);
80     void TryTriggerExpand();
81 
82     // for taskpool state
83     uint32_t GetTaskNum();
84     uint32_t GetIdleWorkers();
85     uint32_t GetThreadNum();
86     uint32_t GetRunningWorkers();
87     uint32_t GetTimeoutWorkers();
88     void GetIdleWorkersList(uint32_t step);
89     bool ReadThreadInfo(Worker* worker, char* buf, uint32_t size);
90 
91     // for get thread info
92     napi_value GetThreadInfos(napi_env env);
93 
94     // for get task info
95     napi_value GetTaskInfos(napi_env env);
96 
97     // for countTrace for worker
98     void CountTraceForWorker();
99 
100     std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId);
101     void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo);
102     void IncreaseRefCount(uint64_t taskId);
103     void DecreaseRefCount(napi_env env, uint64_t taskId);
104     napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task);
105 
106     // for task dependency
107     bool IsDependendByTaskId(uint64_t taskId);
108     void NotifyDependencyTaskInfo(uint64_t taskId);
109     bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet);
110     bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId);
111     bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId);
112     void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority);
113     std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId);
114     void RemovePendingTaskInfo(uint64_t taskId);
115     void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId);
116     void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId);
117 
118     // for duration
119     void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration);
120     uint64_t GetTaskDuration(uint64_t taskId, std::string durationType);
121     void RemoveTaskDuration(uint64_t taskId);
122 
123 private:
124     TaskManager();
125     ~TaskManager();
126     TaskManager(const TaskManager &) = delete;
127     TaskManager& operator=(const TaskManager &) = delete;
128     TaskManager(TaskManager &&) = delete;
129     TaskManager& operator=(TaskManager &&) = delete;
130 
131     void CreateWorkers(napi_env env, uint32_t num = 1);
132     void NotifyExecuteTask();
133     void NotifyWorkerAdded(Worker* worker);
134 
135     // for load balance
136     void RunTaskManager();
137     void CheckForBlockedWorkers();
138     void TryExpand();
139     void NotifyShrink(uint32_t targetNum);
140     void TriggerShrink(uint32_t step);
141     uint32_t ComputeSuitableThreadNum();
142     static void NotifyExpand(const uv_async_t* req);
143     static void TriggerLoadBalance(const uv_timer_t* req = nullptr);
144 
145     // <taskId, Task>
146     std::unordered_map<uint64_t, Task*> tasks_ {};
147     std::shared_mutex tasksMutex_;
148 
149     // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask
150     std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {};
151     std::shared_mutex dependTaskInfosMutex_;
152 
153     // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask
154     std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {};
155 
156     // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...>
157     std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {};
158     std::shared_mutex pendingTaskInfosMutex_;
159 
160     // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...>
161     std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {};
162     std::shared_mutex taskDurationInfosMutex_;
163 
164     std::unordered_set<Worker*> workers_ {};
165     std::unordered_set<Worker*> idleWorkers_ {};
166     std::unordered_set<Worker*> timeoutWorkers_ {};
167     std::recursive_mutex workersMutex_;
168 
169     // for load balance
170     napi_env hostEnv_ = nullptr;
171     uv_loop_t* loop_ = nullptr;
172     uv_timer_t* timer_ = nullptr;
173     uv_async_t* expandHandle_ = nullptr;
174     std::atomic<bool> suspend_ = false;
175     std::atomic<uint32_t> retryCount_ = 0;
176     std::atomic<uint32_t> expandingCount_ = 0;
177     std::atomic<uint32_t> totalExecCount_ = 0;
178     std::atomic<uint64_t> totalExecTime_ = 0;
179     std::atomic<bool> needChecking_ = false;
180 
181     // for task priority
182     uint32_t highPrioExecuteCount_ = 0;
183     uint32_t mediumPrioExecuteCount_ = 0;
184     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
185     std::mutex taskQueuesMutex_;
186 
187     std::atomic<bool> isInitialized_ = false;
188 
189     std::mutex callbackMutex_;
190     std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {};
191     std::vector<Worker*> freeList_ {};
192     friend class TaskGroupManager;
193 };
194 
195 class TaskGroupManager {
196 public:
197     TaskGroupManager() = default;
198     ~TaskGroupManager() = default;
199 
200     static TaskGroupManager &GetInstance();
201 
202     void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId);
203     void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup);
204     void RemoveTaskGroup(uint64_t groupId);
205     TaskGroup* GetTaskGroup(uint64_t groupId);
206     void CancelGroup(napi_env env, uint64_t groupId);
207     void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group);
208     void ReleaseTaskGroupData(napi_env env, TaskGroup* group);
209     void UpdateGroupState(uint64_t groupId);
210 
211     void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task);
212     bool TriggerSeqRunner(napi_env env, Task* lastTask);
213     void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner);
214     void RemoveSequenceRunner(uint64_t seqRunnerId);
215     SequenceRunner* GetSeqRunner(uint64_t seqRunnerId);
216 
217 private:
218     TaskGroupManager(const TaskGroupManager &) = delete;
219     TaskGroupManager& operator=(const TaskGroupManager &) = delete;
220     TaskGroupManager(TaskGroupManager &&) = delete;
221     TaskGroupManager& operator=(TaskGroupManager &&) = delete;
222 
223     // <groupId, TaskGroup>
224     std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {};
225     std::mutex taskGroupsMutex_;
226 
227     // <seqRunnerId, SequenceRunner>
228     std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {};
229     std::mutex seqRunnersMutex_;
230 };
231 } // namespace Commonlibrary::Concurrent::TaskPoolModule
232 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H