• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
18 
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <set>
23 #include <shared_mutex>
24 #include <string>
25 #include <tuple>
26 #include <unordered_map>
27 #include <uv.h>
28 
29 #include "helper/concurrent_helper.h"
30 #include "napi/native_api.h"
31 #include "napi/native_node_api.h"
32 #include "utils.h"
33 #include "tools/log.h"
34 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
35 #include "event_handler.h"
36 #endif
37 
38 #if defined(ENABLE_TASKPOOL_FFRT)
39 #include "c/executor_task.h"
40 #include "ffrt_inner.h"
41 #endif
42 
43 namespace Commonlibrary::Concurrent::TaskPoolModule {
44 using namespace Commonlibrary::Platform;
45 
46 extern const std::unordered_map<Priority, napi_event_priority> g_napiPriorityMap;
47 enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED, ENDING};
48 enum TaskType {
49     TASK,
50     FUNCTION_TASK,
51     SEQRUNNER_TASK,
52     COMMON_TASK,
53     GROUP_COMMON_TASK,
54     GROUP_FUNCTION_TASK,
55     ASYNCRUNNER_TASK
56 };
57 
58 struct GroupInfo;
59 class Worker;
60 struct TaskInfo {
61     napi_deferred deferred = nullptr;
62     Priority priority = Priority::DEFAULT;
63     void* serializationFunction = nullptr;
64     void* serializationArguments = nullptr;
65 };
66 
67 struct TaskCurrentInfo {
68     std::string name {};
69     uint32_t taskId {};
70     ExecuteState taskState {ExecuteState::NOT_FOUND};
71     uint64_t startTime {};
72 };
73 
74 struct ListenerCallBackInfo {
ListenerCallBackInfoListenerCallBackInfo75     ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env),
76         callbackRef_(callbackRef), taskError_(taskError) {}
~ListenerCallBackInfoListenerCallBackInfo77     ~ListenerCallBackInfo()
78     {
79         napi_delete_reference(env_, callbackRef_);
80     }
81     napi_env env_;
82     napi_ref callbackRef_;
83     napi_value taskError_;
84     std::string type_;
85 };
86 
87 struct CancelTaskMessage {
CancelTaskMessageCancelTaskMessage88     CancelTaskMessage(ExecuteState state, uint32_t taskId) : state(state), taskId(taskId) {}
89     ~CancelTaskMessage() = default;
90 
91     ExecuteState state;
92     uint32_t taskId;
93 };
94 
95 struct DiscardTaskMessage {
DiscardTaskMessageDiscardTaskMessage96     DiscardTaskMessage(napi_env env, uint32_t taskId, int32_t errCode, bool isWaiting) : env(env),
97         taskId(taskId), errCode(errCode), isWaiting(isWaiting) {}
98     ~DiscardTaskMessage() = default;
99 
100     napi_env env;
101     uint32_t taskId;
102     int32_t errCode;
103     bool isWaiting;
104 };
105 
106 class Task {
107 public:
108     Task() = default;
Task(napi_env env,TaskType taskType,const char * name)109     Task(napi_env env, TaskType taskType, const char* name) : env_(env), taskType_(taskType), name_(name) {}
110 
111     ~Task() = default;
112 
113     static napi_value TaskConstructor(napi_env env, napi_callback_info cbinfo);
114     static napi_value LongTaskConstructor(napi_env env, napi_callback_info cbinfo);
115     static napi_value SetTransferList(napi_env env, napi_callback_info cbinfo);
116     static napi_value SetCloneList(napi_env env, napi_callback_info cbinfo);
117     static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo);
118     static napi_value OnReceiveData(napi_env env, napi_callback_info cbinfo);
119     static napi_value SendData(napi_env env, napi_callback_info cbinfo);
120     static napi_value AddDependency(napi_env env, napi_callback_info cbinfo);
121     static napi_value RemoveDependency(napi_env env, napi_callback_info cbinfo);
122     static napi_value OnEnqueued(napi_env env, napi_callback_info cbinfo);
123     static napi_value OnStartExecution(napi_env env, napi_callback_info cbinfo);
124     static napi_value OnExecutionFailed(napi_env env, napi_callback_info cbinfo);
125     static napi_value OnExecutionSucceeded(napi_env env, napi_callback_info cbinfo);
126     static napi_value IsDone(napi_env env, napi_callback_info cbinfo);
127     static napi_value GetTotalDuration(napi_env env, napi_callback_info info);
128     static napi_value GetCPUDuration(napi_env env, napi_callback_info info);
129     static napi_value GetIODuration(napi_env env, napi_callback_info info);
130     static napi_value GetTaskDuration(napi_env env, napi_callback_info& info, std::string durationType);
131     static napi_value GetName(napi_env env, napi_callback_info info);
132     static napi_value GetTaskId(napi_env env, napi_callback_info info);
133 
134     static Task* GenerateTask(napi_env env, napi_value task, napi_value func,
135                               napi_value name, napi_value* args, size_t argc);
136     static Task* GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type);
137     static TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
138                                       napi_value transferList, napi_value cloneList, Priority priority,
139                                       bool defaultTransfer = true, bool defaultCloneSendable = false);
140     static void TaskDestructor(napi_env env, void* data, void* hint);
141 
142     static void ThrowNoDependencyError(napi_env env);
143     static void StartExecutionCallback(const uv_async_t* req);
144     static void StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo);
145     static void ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo, uint32_t taskId);
146     static void CleanupHookFunc(void* arg);
147     static void Cancel(const uv_async_t* req);
148     static void DiscardTask(const uv_async_t* req);
149     static bool VerifyAndPostResult(Task* task, Priority priority);
150 
151     void StoreTaskId(uint32_t taskId);
152     napi_value GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType = TaskType::COMMON_TASK,
153                                   Priority priority = Priority::DEFAULT);
154     TaskInfo* GetTaskInfo(napi_env env, napi_value task, Priority priority);
155     void UpdateTaskType(TaskType taskType);
156     void UpdatePeriodicTask();
157     bool IsRepeatableTask() const;
158     bool IsGroupTask() const;
159     bool IsGroupCommonTask() const;
160     bool IsGroupFunctionTask() const;
161     bool IsCommonTask() const;
162     bool IsSeqRunnerTask() const;
163     bool IsFunctionTask() const;
164     bool IsLongTask() const;
165     bool IsPeriodicTask() const;
166     bool IsMainThreadTask() const;
167     bool IsExecuted() const;
168     void IncreaseRefCount();
169     void DecreaseRefCount();
170     bool IsReadyToHandle() const;
171     void NotifyPendingTask();
172     void CancelPendingTask(napi_env env);
173     bool UpdateTask(uint64_t startTime, void* worker);
174     napi_value DeserializeValue(napi_env env, napi_value* func, napi_value* args);
175     void StoreTaskDuration();
176     bool CanForSequenceRunner(napi_env env);
177     bool CanForTaskGroup(napi_env env);
178     bool CanExecute(napi_env env);
179     bool CanExecuteDelayed(napi_env env);
180     bool CanExecutePeriodically(napi_env env);
181     void SetHasDependency(bool hasDependency);
182     bool HasDependency() const;
183     void TryClearHasDependency();
184     void ClearDelayedTimers();
185     void IncreaseTaskLifecycleCount();
186     void DecreaseTaskLifecycleCount();
187     bool ShouldDeleteTask(bool needUnref = true);
188     bool CheckStartExecution(Priority priority);
189     bool IsValid();
190     void SetValid(bool isValid);
191     bool CanForAsyncRunner(napi_env env);
192     bool IsAsyncRunnerTask();
193     void SetTaskId(uint32_t taskId);
194     void TriggerCancel(CancelTaskMessage* message);
195     void CancelInner(ExecuteState state);
196     bool IsSameEnv(napi_env env);
197     void DiscardAsyncRunnerTask(DiscardTaskMessage* message);
198     void DiscardInner(DiscardTaskMessage* message);
199     void ReleaseData();
200     void DisposeCanceledTask();
201     Worker* GetWorker() const;
202     napi_env GetEnv() const;
203     uint32_t GetTaskId() const;
204     bool IsRealyCanceled();
205     bool UpdateTaskStateToWaiting();
206     bool UpdateTaskStateToRunning();
207     bool UpdateTaskStateToCanceled();
208     bool UpdateTaskStateToFinished();
209     bool UpdateTaskStateToDelayed();
210     bool UpdateTaskStateToEnding();
211     static std::tuple<napi_value, napi_value, napi_value, napi_value> GetSerializeParams(napi_env env,
212                                                                                          napi_value napiTask);
213     static std::tuple<void*, void*> GetSerializeResult(napi_env env, napi_value func, napi_value args,
214         std::tuple<napi_value, napi_value, bool, bool> transferAndCloneParams);
215 
216 private:
217     Task(const Task &) = delete;
218     Task& operator=(const Task &) = delete;
219     Task(Task &&) = delete;
220     Task& operator=(Task &&) = delete;
221 
222     void InitHandle(napi_env env);
223 
224 public:
225     napi_env env_ = nullptr;
226     std::atomic<TaskType> taskType_ {TaskType::TASK};
227     std::string name_ {};
228     uint32_t taskId_ {};
229     std::atomic<ExecuteState> taskState_ {ExecuteState::NOT_FOUND};
230     uint64_t groupId_ {}; // 0 for task outside taskgroup
231     uint64_t seqRunnerId_ {}; // 0 for task without seqRunner
232     uint64_t asyncRunnerId_ {}; // 0 for task without asyncRunner
233     TaskInfo* currentTaskInfo_ {};
234     std::list<TaskInfo*> pendingTaskInfos_ {}; // for a common task executes multiple times
235     void* result_ = nullptr;
236     std::atomic<bool> success_ {true};
237     std::atomic<uint64_t> startTime_ {};
238     std::atomic<uint64_t> cpuTime_ {};
239     std::atomic<uint64_t> ioTime_ {};
240     void* worker_ {nullptr};
241     napi_ref taskRef_ {};
242     std::atomic<uint32_t> taskRefCount_ {};
243     std::recursive_mutex taskMutex_ {};
244     bool hasDependency_ {false};
245     bool isLongTask_ {false};
246     bool defaultTransfer_ {true};
247     bool defaultCloneSendable_ {false};
248     std::atomic<bool> isValid_ {true};
249     std::atomic<uint32_t> lifecycleCount_ {0}; // when lifecycleCount_ is 0, the task pointer can be deleted
250     uv_async_t* onStartExecutionSignal_ = nullptr;
251     uv_async_t* onStartCancelSignal_ = nullptr;
252     uv_async_t* onStartDiscardSignal_ = nullptr;
253     ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr;
254     ListenerCallBackInfo* onStartExecutionCallBackInfo_ = nullptr;
255     ListenerCallBackInfo* onExecutionFailedCallBackInfo_ = nullptr;
256     ListenerCallBackInfo* onExecutionSucceededCallBackInfo_ = nullptr;
257 
258     // for periodic task
259     bool isPeriodicTask_ {false};
260     uv_timer_t* timer_ {nullptr};
261     Priority periodicTaskPriority_ {Priority::DEFAULT};
262 
263     std::set<uv_timer_t*> delayedTimers_ {}; // task delayed timer
264 
265     bool isMainThreadTask_ {false};
266     Priority asyncTaskPriority_ {Priority::DEFAULT};
267     std::atomic<bool> isCancelToFinish_ {false};
268 };
269 
270 struct CallbackInfo {
CallbackInfoCallbackInfo271     CallbackInfo(napi_env env, uint32_t count, napi_ref ref)
272         : hostEnv(env), refCount(count), callbackRef(ref) {}
~CallbackInfoCallbackInfo273     ~CallbackInfo()
274     {
275         napi_delete_reference(hostEnv, callbackRef);
276     }
277 
278     napi_env hostEnv;
279     uint32_t refCount;
280     napi_ref callbackRef;
281     std::string type;
282 };
283 
284 struct TaskResultInfo {
TaskResultInfoTaskResultInfo285     TaskResultInfo(napi_env workerEnv, uint32_t taskId, void* args)
286         : workerEnv(workerEnv), taskId(taskId), serializationArgs(args) {}
287     ~TaskResultInfo() = default;
288 
289     napi_env workerEnv;
290     uint32_t taskId;
291     void* serializationArgs;
292 };
293 } // namespace Commonlibrary::Concurrent::TaskPoolModule
294 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_H