• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
#include <array>
#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
28 
29 #include "dfx_hisys_event.h"
30 #include "napi/native_api.h"
31 #include "sequence_runner.h"
32 #include "task.h"
33 #include "task_queue.h"
34 #include "task_group.h"
35 #include "worker.h"
36 
37 namespace Commonlibrary::Concurrent::TaskPoolModule {
38 using namespace Commonlibrary::Concurrent::Common;
39 
// String constants shared by every translation unit that includes this header.
// They are declared `inline constexpr` (C++17) so that all translation units
// refer to a single definition instead of each receiving its own
// internal-linkage copy; names, types (char arrays) and contents are unchanged.
// NOTE(review): the values look like JS property names used by the taskpool
// bindings — confirm against the call sites.
inline constexpr char ARGUMENTS_STR[] = "arguments";
inline constexpr char NAME[] = "name";
inline constexpr char FUNCTION_STR[] = "function";
inline constexpr char GROUP_ID_STR[] = "groupId";
inline constexpr char TASKID_STR[] = "taskId";
inline constexpr char TASKINFO_STR[] = "taskInfo";
inline constexpr char TRANSFERLIST_STR[] = "transferList";
inline constexpr char CLONE_LIST_STR[] = "cloneList";
inline constexpr char ADD_DEPENDENCY_STR[] = "addDependency";
inline constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency";
inline constexpr char TASK_CPU_TIME[] = "cpuDuration";
inline constexpr char TASK_IO_TIME[] = "ioDuration";
inline constexpr char TASK_TOTAL_TIME[] = "totalDuration";
inline constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer";
inline constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable";
55 
56 class TaskGroup;
57 
58 class TaskManager {
59 public:
60     static TaskManager& GetInstance();
61 
62     void StoreTask(Task* task);
63     void RemoveTask(uint32_t taskId);
64     Task* GetTask(uint32_t taskId);
65     void EnqueueTaskId(uint32_t taskId, Priority priority = Priority::DEFAULT);
66     bool EraseWaitingTaskId(uint32_t taskId, Priority priority);
67     std::pair<uint32_t, Priority> DequeueTaskId();
68     void CancelTask(napi_env env, uint32_t taskId);
69     void CancelSeqRunnerTask(napi_env env, Task* task);
70     void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true);
71 
72     // for worker state
73     void NotifyWorkerIdle(Worker* worker);
74     void NotifyWorkerCreated(Worker* worker);
75     void NotifyWorkerRunning(Worker* worker);
76     void RemoveWorker(Worker* worker);
77     void RestoreWorker(Worker* worker);
78 
79     // for load balance
80     void InitTaskManager(napi_env env);
81     void UpdateExecutedInfo(uint64_t duration);
82     void TryTriggerExpand();
83 
84     // for taskpool state
85     uint32_t GetTaskNum();
86     uint32_t GetIdleWorkers();
87     uint32_t GetThreadNum();
88     uint32_t GetRunningWorkers();
89     uint32_t GetTimeoutWorkers();
90     void GetIdleWorkersList(uint32_t step);
91     bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size);
92 
93     // for get thread info
94     napi_value GetThreadInfos(napi_env env);
95 
96     // for get task info
97     napi_value GetTaskInfos(napi_env env);
98 
99     // for countTrace for worker
100     void CountTraceForWorker();
101 
102     std::shared_ptr<CallbackInfo> GetCallbackInfo(uint32_t taskId);
103     void RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo);
104     void IncreaseRefCount(uint32_t taskId);
105     void DecreaseRefCount(napi_env env, uint32_t taskId);
106     napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task);
107     MsgQueue* GetMessageQueue(const uv_async_t* req);
108     MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo);
109     void ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo);
110 
111     // for task dependency
112     bool IsDependendByTaskId(uint32_t taskId);
113     bool IsDependentByTaskId(uint32_t dependentTaskId);
114     void NotifyDependencyTaskInfo(uint32_t taskId);
115     void RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId);
116     bool StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet);
117     bool RemoveTaskDependency(uint32_t taskId, uint32_t dependentId);
118     bool CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId);
119     void EnqueuePendingTaskInfo(uint32_t taskId, Priority priority);
120     std::pair<uint32_t, Priority> DequeuePendingTaskInfo(uint32_t taskId);
121     void RemovePendingTaskInfo(uint32_t taskId);
122     void StoreDependentTaskInfo(std::set<uint32_t> dependTaskIdSet, uint32_t taskId);
123     void RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId);
124     std::string GetTaskDependInfoToString(uint32_t taskId);
125 
126     bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT);
127 
128     // for duration
129     void StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration);
130     uint64_t GetTaskDuration(uint32_t taskId, std::string durationType);
131     void RemoveTaskDuration(uint32_t taskId);
132     void StoreLongTaskInfo(uint32_t taskId, Worker* worker);
133     void RemoveLongTaskInfo(uint32_t taskId);
134     void TerminateTask(uint32_t taskId);
135     Worker* GetLongTaskInfo(uint32_t taskId);
136 
137     // for callback
138     void ReleaseCallBackInfo(Task* task);
139 
140     void UpdateSystemAppFlag();
IsSystemApp()141     bool IsSystemApp() const
142     {
143         return isSystemApp_;
144     }
EnableFfrt()145     bool EnableFfrt() const
146     {
147         return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_);
148     }
149 
150     bool CheckTask(uint32_t taskId);
151     void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error);
152     uint32_t CalculateTaskId(uint64_t id);
153     void ClearDependentTask(uint32_t taskId);
154     void UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message,
155                             int32_t code);
156 
157 private:
158     TaskManager();
159     ~TaskManager();
160     TaskManager(const TaskManager &) = delete;
161     TaskManager& operator=(const TaskManager &) = delete;
162     TaskManager(TaskManager &&) = delete;
163     TaskManager& operator=(TaskManager &&) = delete;
164 
165     void CreateWorkers(napi_env env, uint32_t num = 1);
166     void NotifyExecuteTask();
167     void NotifyWorkerAdded(Worker* worker);
168 
169     // for load balance
170     void RunTaskManager();
171     void CheckForBlockedWorkers();
172     template <bool needCheckIdle> void TryExpandWithCheckIdle();
173     void NotifyShrink(uint32_t targetNum);
174     void TriggerShrink(uint32_t step);
175     uint32_t ComputeSuitableThreadNum();
176     uint32_t ComputeSuitableIdleNum();
177     void DispatchAndTryExpandInner();
178     static void TryExpand(const uv_timer_t* req = nullptr);
179     static void DispatchAndTryExpand(const uv_async_t* req);
180     static void TriggerLoadBalance(const uv_timer_t* req);
181 
182     bool IsChooseIdle();
183     uint32_t GetNonIdleTaskNum();
184     std::pair<uint32_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority);
185     void IncreaseNumIfNoIdle(Priority priority);
186     void DecreaseNumIfNoIdle(Priority priority);
187     void RemoveDependTaskByTaskId(uint32_t taskId);
188     void RemoveDependentTaskByTaskId(uint32_t taskId);
189     void CheckTasksAndReportHisysEvent();
190     void WorkerAliveAndReport(Worker* worker);
191     void WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams);
192 
193     // <taskId, Task>
194     std::unordered_map<uint32_t, Task*> tasks_ {};
195     std::recursive_mutex tasksMutex_;
196 
197     // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask
198     std::unordered_map<uint32_t, std::set<uint32_t>> dependTaskInfos_ {};
199     std::shared_mutex dependTaskInfosMutex_;
200 
201     // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask
202     std::unordered_map<uint32_t, std::set<uint32_t>> dependentTaskInfos_ {};
203     std::shared_mutex dependentTaskInfosMutex_;
204 
205     // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...>
206     std::unordered_map<uint32_t, Priority> pendingTaskInfos_ {};
207     std::shared_mutex pendingTaskInfosMutex_;
208 
209     // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...>
210     std::unordered_map<uint32_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {};
211     std::shared_mutex taskDurationInfosMutex_;
212 
213     // record the longTasks and workers for efficiency
214     std::unordered_map<uint32_t, Worker*> longTasksMap_ {};
215     std::shared_mutex longTasksMutex_{};
216 
217     std::unordered_set<Worker*> workers_ {};
218     std::unordered_set<Worker*> idleWorkers_ {};
219     std::unordered_set<Worker*> timeoutWorkers_ {};
220     std::recursive_mutex workersMutex_;
221 
222     // for load balance
223     napi_env hostEnv_ = nullptr;
224     uv_loop_t* loop_ = nullptr;
225     uv_timer_t* balanceTimer_ = nullptr;
226     uv_timer_t* expandTimer_ = nullptr;
227     uv_async_t* dispatchHandle_ = nullptr;
228     std::atomic<bool> suspend_ = false;
229     std::atomic<uint32_t> retryCount_ = 0;
230     std::atomic<uint32_t> expandingCount_ = 0;
231     std::atomic<uint32_t> nonIdleTaskNum_ = 0;
232     std::atomic<uint32_t> totalExecCount_ = 0;
233     std::atomic<uint64_t> totalExecTime_ = 0;
234     std::atomic<bool> needChecking_ = false;
235     std::atomic<bool> isHandleInited_ = false;
236     std::atomic<uint32_t> timerTriggered_ = false;
237     std::atomic<uint64_t> preDequeneTime_ = 0;
238     std::atomic<uint64_t> reportTime_ = 0;
239 
240     // for task priority
241     uint32_t highPrioExecuteCount_ = 0;
242     uint32_t mediumPrioExecuteCount_ = 0;
243     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
244     std::mutex taskQueuesMutex_;
245 
246     std::atomic<bool> isInitialized_ = false;
247     std::atomic<bool> isSystemApp_ = false;
248     int disableFfrtFlag_ = 0; // 0 means enable ffrt
249     int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt
250 
251     std::mutex callbackMutex_;
252     std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {};
253     std::vector<Worker*> freeList_ {};
254     uint32_t maxThreads_ = ConcurrentHelper::GetMaxThreads();
255 
256 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
257     std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {};
258 #endif
259 
260     friend class TaskGroupManager;
261     friend class NativeEngineTest;
262 };
263 } // namespace Commonlibrary::Concurrent::TaskPoolModule
264 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H