/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
#include <array>
#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "dfx_hisys_event.h"
#include "napi/native_api.h"
#include "sequence_runner.h"
#include "task.h"
#include "task_queue.h"
#include "task_group.h"
#include "worker.h"
36 
37 namespace Commonlibrary::Concurrent::TaskPoolModule {
38 using namespace Commonlibrary::Concurrent::Common;
39 
// String constants shared by the taskpool sources that include this header.
// NOTE(review): presumably these are the property/method names exchanged with the JS side
// (task object fields, dependency methods, duration kinds) - confirm against the callers,
// which are not visible from this header.
// `static` gives every translation unit its own copy (internal linkage).
static constexpr char ARGUMENTS_STR[] = "arguments";
static constexpr char NAME[] = "name";
static constexpr char FUNCTION_STR[] = "function";
static constexpr char GROUP_ID_STR[] = "groupId";
static constexpr char TASKID_STR[] = "taskId";
static constexpr char TASKINFO_STR[] = "taskInfo";
static constexpr char TRANSFERLIST_STR[] = "transferList";
static constexpr char CLONE_LIST_STR[] = "cloneList";
static constexpr char ADD_DEPENDENCY_STR[] = "addDependency";
static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency";
static constexpr char TASK_CPU_TIME[] = "cpuDuration";
static constexpr char TASK_IO_TIME[] = "ioDuration";
static constexpr char TASK_TOTAL_TIME[] = "totalDuration";
static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer";
static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable";
55 
56 class TaskGroup;
57 
58 class TaskManager {
59 public:
60     static TaskManager& GetInstance();
61 
62     void StoreTask(Task* task);
63     void RemoveTask(uint32_t taskId);
64     Task* GetTask(uint32_t taskId);
65     void EnqueueTaskId(uint32_t taskId, Priority priority = Priority::DEFAULT);
66     bool EraseWaitingTaskId(uint32_t taskId, Priority priority);
67     std::pair<uint32_t, Priority> DequeueTaskId();
68     void CancelTask(napi_env env, uint32_t taskId);
69     void CancelSeqRunnerTask(napi_env env, Task* task);
70     void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true);
71 
72     // for worker state
73     void NotifyWorkerIdle(Worker* worker);
74     void NotifyWorkerCreated(Worker* worker);
75     void NotifyWorkerRunning(Worker* worker);
76     void RemoveWorker(Worker* worker);
77     void RestoreWorker(Worker* worker);
78 
79     // for load balance
80     void InitTaskManager(napi_env env);
81     void UpdateExecutedInfo(uint64_t duration);
82     void TryTriggerExpand();
83 
84     // for taskpool state
85     uint32_t GetTaskNum();
86     uint32_t GetIdleWorkers();
87     uint32_t GetThreadNum();
88     uint32_t GetRunningWorkers();
89     uint32_t GetTimeoutWorkers();
90     void GetIdleWorkersList(uint32_t step);
91     bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size);
92 
93     // for get thread info
94     napi_value GetThreadInfos(napi_env env);
95 
96     // for get task info
97     napi_value GetTaskInfos(napi_env env);
98 
99     // for countTrace for worker
100     void CountTraceForWorker(bool needLog = false);
101     void CountTraceForWorkerWithoutLock(bool needLog = false);
102 
103     void RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo,
104         const std::string& type);
105     void IncreaseSendDataRefCount(uint32_t taskId);
106     void DecreaseSendDataRefCount(napi_env env, uint32_t taskId, Task* task = nullptr);
107     void ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task* task);
108 
109     // for task dependency
110     bool IsDependendByTaskId(uint32_t taskId);
111     bool IsDependentByTaskId(uint32_t dependentTaskId);
112     void NotifyDependencyTaskInfo(uint32_t taskId);
113     void RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId);
114     bool StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet);
115     bool RemoveTaskDependency(uint32_t taskId, uint32_t dependentId);
116     bool CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId);
117     void EnqueuePendingTaskInfo(uint32_t taskId, Priority priority);
118     std::pair<uint32_t, Priority> DequeuePendingTaskInfo(uint32_t taskId);
119     void RemovePendingTaskInfo(uint32_t taskId);
120     void StoreDependentTaskInfo(std::set<uint32_t> dependTaskIdSet, uint32_t taskId);
121     void RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId);
122     std::string GetTaskDependInfoToString(uint32_t taskId);
123 
124     bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT);
125 
126     // for duration
127     void StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration);
128     uint64_t GetTaskDuration(uint32_t taskId, std::string durationType);
129     void RemoveTaskDuration(uint32_t taskId);
130     void StoreLongTaskInfo(uint32_t taskId, Worker* worker);
131     void RemoveLongTaskInfo(uint32_t taskId);
132     void TerminateTask(uint32_t taskId);
133     Worker* GetLongTaskInfo(uint32_t taskId);
134 
135     // for callback
136     void ReleaseCallBackInfo(Task* task);
137 
138     void UpdateSystemAppFlag();
139 
IsSystemApp()140     bool IsSystemApp() const
141     {
142         return isSystemApp_;
143     }
144 
EnableFfrt()145     bool EnableFfrt() const
146     {
147         return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_);
148     }
149 
150     void BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error);
151     uint32_t CalculateTaskId(uint64_t id);
152     void ClearDependentTask(uint32_t taskId);
153     void UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message,
154                             int32_t code);
155     napi_value CancelError(napi_env env, int32_t errCode, const char* errMessage = nullptr,
156                            napi_value result = nullptr, bool success = false);
157     void SetIsPerformIdle(bool performIdle);
158     bool IsPerformIdle() const;
159     uint32_t GetNonIdleTaskNum();
160     uint32_t GetTotalTaskNum() const;
161 
162 private:
163     TaskManager();
164     ~TaskManager();
165     TaskManager(const TaskManager &) = delete;
166     TaskManager& operator=(const TaskManager &) = delete;
167     TaskManager(TaskManager &&) = delete;
168     TaskManager& operator=(TaskManager &&) = delete;
169 
170     void CreateWorkers(napi_env env, uint32_t num = 1);
171     void NotifyExecuteTask();
172     void NotifyWorkerAdded(Worker* worker);
173 
174     // for load balance
175     void RunTaskManager();
176     void CheckForBlockedWorkers();
177     template <bool needCheckIdle> void TryExpandWithCheckIdle();
178     void NotifyShrink(uint32_t targetNum);
179     void TriggerShrink(uint32_t step);
180     uint32_t ComputeSuitableThreadNum();
181     uint32_t ComputeSuitableIdleNum();
182     void DispatchAndTryExpandInner();
183     static void TryExpand(const uv_timer_t* req = nullptr);
184     static void DispatchAndTryExpand(const uv_async_t* req);
185     static void TriggerLoadBalance(const uv_timer_t* req);
186 
187     bool IsChooseIdle();
188     std::pair<uint32_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority);
189     void IncreaseTaskNum(Priority priority);
190     void DecreaseTaskNum(Priority priority);
191     void RemoveDependTaskByTaskId(uint32_t taskId);
192     void RemoveDependentTaskByTaskId(uint32_t taskId);
193     void CheckTasksAndReportHisysEvent();
194     void WorkerAliveAndReport(Worker* worker);
195     void WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams);
196     void AddCountTraceForWorkerLog(bool needLog, int64_t threadNum, int64_t idleThreadNum, int64_t timeoutThreadNum);
197 
198     // <taskId, Task>
199     std::unordered_map<uint32_t, Task*> tasks_ {};
200     std::recursive_mutex tasksMutex_;
201 
202     // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask
203     std::unordered_map<uint32_t, std::set<uint32_t>> dependTaskInfos_ {};
204     std::shared_mutex dependTaskInfosMutex_;
205 
206     // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask
207     std::unordered_map<uint32_t, std::set<uint32_t>> dependentTaskInfos_ {};
208     std::shared_mutex dependentTaskInfosMutex_;
209 
210     // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...>
211     std::unordered_map<uint32_t, Priority> pendingTaskInfos_ {};
212     std::shared_mutex pendingTaskInfosMutex_;
213 
214     // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...>
215     std::unordered_map<uint32_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {};
216     std::shared_mutex taskDurationInfosMutex_;
217 
218     // record the longTasks and workers for efficiency
219     std::unordered_map<uint32_t, Worker*> longTasksMap_ {};
220     std::shared_mutex longTasksMutex_{};
221 
222     std::unordered_set<Worker*> workers_ {};
223     std::unordered_set<Worker*> idleWorkers_ {};
224     std::unordered_set<Worker*> timeoutWorkers_ {};
225     std::recursive_mutex workersMutex_;
226 
227     // for load balance
228     napi_env hostEnv_ = nullptr;
229     uv_loop_t* loop_ = nullptr;
230     uv_timer_t* balanceTimer_ = nullptr;
231     uv_timer_t* expandTimer_ = nullptr;
232     uv_async_t* dispatchHandle_ = nullptr;
233     std::atomic<bool> suspend_ = false;
234     std::atomic<uint32_t> retryCount_ = 0;
235     std::atomic<uint32_t> expandingCount_ = 0;
236     std::atomic<uint32_t> nonIdleTaskNum_ = 0;
237     std::atomic<uint32_t> totalTaskNum_ = 0;
238     std::atomic<uint32_t> totalExecCount_ = 0;
239     std::atomic<uint64_t> totalExecTime_ = 0;
240     std::atomic<bool> needChecking_ = false;
241     std::atomic<bool> isHandleInited_ = false;
242     std::atomic<uint32_t> timerTriggered_ = false;
243     std::atomic<uint64_t> preDequeneTime_ = 0;
244     std::atomic<uint64_t> reportTime_ = 0;
245 
246     // for task priority
247     uint32_t highPrioExecuteCount_ = 0;
248     uint32_t mediumPrioExecuteCount_ = 0;
249     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
250     std::mutex taskQueuesMutex_;
251 
252     std::atomic<bool> isInitialized_ = false;
253     std::atomic<bool> isSystemApp_ = false;
254     int disableFfrtFlag_ = 0; // 0 means enable ffrt
255     int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt
256 
257     std::mutex callbackMutex_;
258     std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {};
259     std::vector<Worker*> freeList_ {};
260     uint32_t maxThreads_ = ConcurrentHelper::GetMaxThreads();
261 
262 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
263     std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {};
264 #endif
265     std::atomic<bool> isPerformIdle_ = false;
266 
267     friend class TaskGroupManager;
268     friend class NativeEngineTest;
269 };
270 } // namespace Commonlibrary::Concurrent::TaskPoolModule
271 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H