• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
19 #include <array>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <shared_mutex>
25 #include <unordered_map>
26 #include <unordered_set>
27 #include <vector>
28 
29 #include "napi/native_api.h"
30 #include "sequence_runner.h"
31 #include "task.h"
32 #include "task_queue.h"
33 #include "task_group.h"
34 #include "worker.h"
35 
36 namespace Commonlibrary::Concurrent::TaskPoolModule {
37 using namespace Commonlibrary::Concurrent::Common;
38 
39 static constexpr char ARGUMENTS_STR[] = "arguments";
40 static constexpr char NAME[] = "name";
41 static constexpr char FUNCTION_STR[] = "function";
42 static constexpr char GROUP_ID_STR[] = "groupId";
43 static constexpr char TASKID_STR[] = "taskId";
44 static constexpr char TASKINFO_STR[] = "taskInfo";
45 static constexpr char TRANSFERLIST_STR[] = "transferList";
46 static constexpr char CLONE_LIST_STR[] = "cloneList";
47 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency";
48 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency";
49 static constexpr char TASK_CPU_TIME[] = "cpuDuration";
50 static constexpr char TASK_IO_TIME[] = "ioDuration";
51 static constexpr char TASK_TOTAL_TIME[] = "totalDuration";
52 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer";
53 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable";
54 
55 class TaskGroup;
56 
57 class TaskManager {
58 public:
59     static TaskManager& GetInstance();
60 
61     void StoreTask(uint64_t taskId, Task* task);
62     void RemoveTask(uint64_t taskId);
63     Task* GetTask(uint64_t taskId);
64     void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT);
65     bool EraseWaitingTaskId(uint64_t taskId, Priority priority);
66     std::pair<uint64_t, Priority> DequeueTaskId();
67     void CancelTask(napi_env env, uint64_t taskId);
68     void CancelSeqRunnerTask(napi_env env, Task* task);
69     void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true);
70 
71     // for worker state
72     void NotifyWorkerIdle(Worker* worker);
73     void NotifyWorkerCreated(Worker* worker);
74     void NotifyWorkerRunning(Worker* worker);
75     void RemoveWorker(Worker* worker);
76     void RestoreWorker(Worker* worker);
77 
78     // for load balance
79     void InitTaskManager(napi_env env);
80     void UpdateExecutedInfo(uint64_t duration);
81     void TryTriggerExpand();
82 
83     // for taskpool state
84     uint32_t GetTaskNum();
85     uint32_t GetIdleWorkers();
86     uint32_t GetThreadNum();
87     uint32_t GetRunningWorkers();
88     uint32_t GetTimeoutWorkers();
89     void GetIdleWorkersList(uint32_t step);
90     bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size);
91 
92     // for get thread info
93     napi_value GetThreadInfos(napi_env env);
94 
95     // for get task info
96     napi_value GetTaskInfos(napi_env env);
97 
98     // for get task name
99     std::string GetTaskName(uint64_t taskId);
100 
101     // for countTrace for worker
102     void CountTraceForWorker();
103 
104     std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId);
105     void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo);
106     void IncreaseRefCount(uint64_t taskId);
107     void DecreaseRefCount(napi_env env, uint64_t taskId);
108     napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task);
109     MsgQueue* GetMessageQueue(const uv_async_t* req);
110     MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo);
111     void ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo);
112 
113     // for task dependency
114     bool IsDependendByTaskId(uint64_t taskId);
115     bool IsDependentByTaskId(uint64_t dependentTaskId);
116     void NotifyDependencyTaskInfo(uint64_t taskId);
117     void RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId);
118     bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet);
119     bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId);
120     bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId);
121     void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority);
122     std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId);
123     void RemovePendingTaskInfo(uint64_t taskId);
124     void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId);
125     void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId);
126     std::string GetTaskDependInfoToString(uint64_t taskId);
127 
128     bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT);
129 
130     // for duration
131     void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration);
132     uint64_t GetTaskDuration(uint64_t taskId, std::string durationType);
133     void RemoveTaskDuration(uint64_t taskId);
134     void StoreLongTaskInfo(uint64_t taskId, Worker* worker);
135     void RemoveLongTaskInfo(uint64_t taskId);
136     void TerminateTask(uint64_t taskId);
137     Worker* GetLongTaskInfo(uint64_t taskId);
138 
139     // for callback
140     void ReleaseCallBackInfo(Task* task);
141 
142     void UpdateSystemAppFlag();
IsSystemApp()143     bool IsSystemApp() const
144     {
145         return isSystemApp_;
146     }
EnableFfrt()147     bool EnableFfrt() const
148     {
149         return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_);
150     }
151 
152     bool CheckTask(uint64_t taskId);
153     void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error);
154 
155 private:
156     TaskManager();
157     ~TaskManager();
158     TaskManager(const TaskManager &) = delete;
159     TaskManager& operator=(const TaskManager &) = delete;
160     TaskManager(TaskManager &&) = delete;
161     TaskManager& operator=(TaskManager &&) = delete;
162 
163     void CreateWorkers(napi_env env, uint32_t num = 1);
164     void NotifyExecuteTask();
165     void NotifyWorkerAdded(Worker* worker);
166 
167     // for load balance
168     void RunTaskManager();
169     void CheckForBlockedWorkers();
170     void TryExpand();
171     void NotifyShrink(uint32_t targetNum);
172     void TriggerShrink(uint32_t step);
173     uint32_t ComputeSuitableThreadNum();
174     uint32_t ComputeSuitableIdleNum();
175     static void NotifyExpand(const uv_async_t* req);
176     static void TriggerLoadBalance(const uv_timer_t* req = nullptr);
177 
178     bool IsChooseIdle();
179     uint32_t GetNonIdleTaskNum();
180     std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority);
181     void IncreaseNumIfNoIdle(Priority priority);
182     void DecreaseNumIfNoIdle(Priority priority);
183 
184     // <taskId, Task>
185     std::unordered_map<uint64_t, Task*> tasks_ {};
186     RECURSIVE_MUTEX tasksMutex_;
187 
188     // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask
189     std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {};
190     std::shared_mutex dependTaskInfosMutex_;
191 
192     // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask
193     std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {};
194     std::shared_mutex dependentTaskInfosMutex_;
195 
196     // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...>
197     std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {};
198     std::shared_mutex pendingTaskInfosMutex_;
199 
200     // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...>
201     std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {};
202     std::shared_mutex taskDurationInfosMutex_;
203 
204     // record the longTasks and workers for efficiency
205     std::unordered_map<uint64_t, Worker*> longTasksMap_ {};
206     std::shared_mutex longTasksMutex_{};
207 
208     std::unordered_set<Worker*> workers_ {};
209     std::unordered_set<Worker*> idleWorkers_ {};
210     std::unordered_set<Worker*> timeoutWorkers_ {};
211     RECURSIVE_MUTEX workersMutex_;
212 
213     // for load balance
214     napi_env hostEnv_ = nullptr;
215     uv_loop_t* loop_ = nullptr;
216     uv_timer_t* timer_ = nullptr;
217     uv_async_t* expandHandle_ = nullptr;
218     std::atomic<bool> suspend_ = false;
219     std::atomic<uint32_t> retryCount_ = 0;
220     std::atomic<uint32_t> nonIdleTaskNum_ = 0;
221     std::atomic<uint32_t> totalExecCount_ = 0;
222     std::atomic<uint64_t> totalExecTime_ = 0;
223     std::atomic<bool> needChecking_ = false;
224     std::atomic<bool> isHandleInited_ = false;
225 
226     // for task priority
227     uint32_t highPrioExecuteCount_ = 0;
228     uint32_t mediumPrioExecuteCount_ = 0;
229     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
230     std::mutex taskQueuesMutex_;
231 
232     std::atomic<bool> isInitialized_ = false;
233     std::atomic<bool> isSystemApp_ = false;
234     int disableFfrtFlag_ = 0; // 0 means enable ffrt
235     int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt
236 
237     std::mutex callbackMutex_;
238     std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {};
239     std::vector<Worker*> freeList_ {};
240 
241 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
242     std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {};
243 #endif
244 
245     friend class TaskGroupManager;
246     friend class NativeEngineTest;
247 };
248 
249 class TaskGroupManager {
250 public:
251     TaskGroupManager() = default;
252     ~TaskGroupManager() = default;
253 
254     static TaskGroupManager &GetInstance();
255 
256     void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId);
257     void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup);
258     void RemoveTaskGroup(uint64_t groupId);
259     TaskGroup* GetTaskGroup(uint64_t groupId);
260     void CancelGroup(napi_env env, uint64_t groupId);
261     void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group);
262     void ReleaseTaskGroupData(napi_env env, TaskGroup* group);
263     bool UpdateGroupState(uint64_t groupId);
264 
265     void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task);
266     bool TriggerSeqRunner(napi_env env, Task* lastTask);
267     void DisposeCanceledTask(napi_env env, Task* task);
268     void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner);
269     void RemoveSequenceRunner(uint64_t seqRunnerId);
270     SequenceRunner* GetSeqRunner(uint64_t seqRunnerId);
271 
272 private:
273     TaskGroupManager(const TaskGroupManager &) = delete;
274     TaskGroupManager& operator=(const TaskGroupManager &) = delete;
275     TaskGroupManager(TaskGroupManager &&) = delete;
276     TaskGroupManager& operator=(TaskGroupManager &&) = delete;
277 
278     // <groupId, TaskGroup>
279     std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {};
280     std::mutex taskGroupsMutex_;
281 
282     // <seqRunnerId, SequenceRunner>
283     std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {};
284     std::mutex seqRunnersMutex_;
285     friend class NativeEngineTest;
286 };
287 
288 class SequenceRunnerManager {
289 public:
290     SequenceRunnerManager() = default;
291     ~SequenceRunnerManager() = default;
292 
293     static SequenceRunnerManager &GetInstance();
294     SequenceRunner* CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
295                                             const std::string &name, uint32_t priority);
296     uint64_t DecreaseSeqCount(SequenceRunner* seqRunner);
297     void RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner);
298     void RemoveSequenceRunner(const std::string &name);
299     bool TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner);
300     void GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner);
301     bool IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner);
302     void RemoveWaitingTask(Task* task);
303 
304 private:
305     SequenceRunnerManager(const SequenceRunnerManager &) = delete;
306     SequenceRunnerManager& operator=(const SequenceRunnerManager &) = delete;
307     SequenceRunnerManager(SequenceRunnerManager &&) = delete;
308     SequenceRunnerManager& operator=(SequenceRunnerManager &&) = delete;
309 
310     // <<name1, seqRunner>, <name2, seqRunner>, ...>
311     std::unordered_map<std::string, SequenceRunner*> globalSeqRunner_ {};
312     std::mutex globalSeqRunnerMutex_;
313 };
314 } // namespace Commonlibrary::Concurrent::TaskPoolModule
315 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
316