1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_H 18 19 #include <list> 20 #include <map> 21 #include <mutex> 22 #include <set> 23 #include <shared_mutex> 24 #include <string> 25 #include <tuple> 26 #include <unordered_map> 27 #include <uv.h> 28 29 #include "helper/concurrent_helper.h" 30 #include "napi/native_api.h" 31 #include "napi/native_node_api.h" 32 #include "utils.h" 33 #include "tools/log.h" 34 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 35 #include "event_handler.h" 36 #endif 37 38 #if defined(ENABLE_TASKPOOL_FFRT) 39 #include "c/executor_task.h" 40 #include "ffrt_inner.h" 41 #endif 42 43 namespace Commonlibrary::Concurrent::TaskPoolModule { 44 using namespace Commonlibrary::Platform; 45 46 extern const std::unordered_map<Priority, napi_event_priority> g_napiPriorityMap; 47 enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED, ENDING}; 48 enum TaskType { 49 TASK, 50 FUNCTION_TASK, 51 SEQRUNNER_TASK, 52 COMMON_TASK, 53 GROUP_COMMON_TASK, 54 GROUP_FUNCTION_TASK, 55 ASYNCRUNNER_TASK 56 }; 57 58 struct GroupInfo; 59 class Worker; 60 struct TaskInfo { 61 napi_deferred deferred = nullptr; 62 Priority priority = Priority::DEFAULT; 63 void* serializationFunction = nullptr; 64 void* serializationArguments = nullptr; 65 }; 66 67 struct TaskCurrentInfo { 68 std::string name {}; 69 uint32_t taskId {}; 70 ExecuteState taskState {ExecuteState::NOT_FOUND}; 71 uint64_t startTime {}; 72 }; 73 74 struct ListenerCallBackInfo { ListenerCallBackInfoListenerCallBackInfo75 ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env), 76 callbackRef_(callbackRef), taskError_(taskError) {} ~ListenerCallBackInfoListenerCallBackInfo77 ~ListenerCallBackInfo() 78 { 79 napi_delete_reference(env_, callbackRef_); 80 } 81 napi_env env_; 82 napi_ref callbackRef_; 83 napi_value taskError_; 84 std::string type_; 85 }; 86 87 struct CancelTaskMessage { CancelTaskMessageCancelTaskMessage88 CancelTaskMessage(ExecuteState state, uint32_t taskId) : state(state), taskId(taskId) {} 89 ~CancelTaskMessage() = default; 90 91 ExecuteState state; 92 uint32_t taskId; 93 }; 94 95 struct DiscardTaskMessage { DiscardTaskMessageDiscardTaskMessage96 DiscardTaskMessage(napi_env env, uint32_t taskId, int32_t errCode, bool isWaiting) : env(env), 97 taskId(taskId), errCode(errCode), isWaiting(isWaiting) {} 98 ~DiscardTaskMessage() = default; 99 100 napi_env env; 101 uint32_t taskId; 102 int32_t errCode; 103 bool isWaiting; 104 }; 105 106 class Task { 107 public: 108 Task() = default; Task(napi_env env,TaskType taskType,const char * name)109 Task(napi_env env, TaskType taskType, const char* name) : env_(env), taskType_(taskType), name_(name) {} 110 111 ~Task() = default; 112 113 static napi_value TaskConstructor(napi_env env, napi_callback_info cbinfo); 114 static napi_value LongTaskConstructor(napi_env env, napi_callback_info cbinfo); 115 static napi_value SetTransferList(napi_env env, napi_callback_info cbinfo); 116 static napi_value SetCloneList(napi_env env, napi_callback_info cbinfo); 117 static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo); 118 static napi_value OnReceiveData(napi_env env, napi_callback_info cbinfo); 119 static napi_value SendData(napi_env env, napi_callback_info cbinfo); 120 static napi_value AddDependency(napi_env env, napi_callback_info cbinfo); 121 static napi_value RemoveDependency(napi_env env, napi_callback_info cbinfo); 122 static napi_value OnEnqueued(napi_env env, napi_callback_info cbinfo); 123 static napi_value OnStartExecution(napi_env env, napi_callback_info cbinfo); 124 static napi_value OnExecutionFailed(napi_env env, napi_callback_info cbinfo); 125 static napi_value OnExecutionSucceeded(napi_env env, napi_callback_info cbinfo); 126 static napi_value IsDone(napi_env env, napi_callback_info cbinfo); 127 static napi_value GetTotalDuration(napi_env env, napi_callback_info info); 128 static napi_value GetCPUDuration(napi_env env, napi_callback_info info); 129 static napi_value GetIODuration(napi_env env, napi_callback_info info); 130 static napi_value GetTaskDuration(napi_env env, napi_callback_info& info, std::string durationType); 131 static napi_value GetName(napi_env env, napi_callback_info info); 132 static napi_value GetTaskId(napi_env env, napi_callback_info info); 133 134 static Task* GenerateTask(napi_env env, napi_value task, napi_value func, 135 napi_value name, napi_value* args, size_t argc); 136 static Task* GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type); 137 static TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args, 138 napi_value transferList, napi_value cloneList, Priority priority, 139 bool defaultTransfer = true, bool defaultCloneSendable = false); 140 static void TaskDestructor(napi_env env, void* data, void* hint); 141 142 static void ThrowNoDependencyError(napi_env env); 143 static void StartExecutionCallback(const uv_async_t* req); 144 static void StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo); 145 static void ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo, uint32_t taskId); 146 static void CleanupHookFunc(void* arg); 147 static void Cancel(const uv_async_t* req); 148 static void DiscardTask(const uv_async_t* req); 149 static bool VerifyAndPostResult(Task* task, Priority priority); 150 151 void StoreTaskId(uint32_t taskId); 152 napi_value GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType = TaskType::COMMON_TASK, 153 Priority priority = Priority::DEFAULT); 154 TaskInfo* GetTaskInfo(napi_env env, napi_value task, Priority priority); 155 void UpdateTaskType(TaskType taskType); 156 void UpdatePeriodicTask(); 157 bool IsRepeatableTask() const; 158 bool IsGroupTask() const; 159 bool IsGroupCommonTask() const; 160 bool IsGroupFunctionTask() const; 161 bool IsCommonTask() const; 162 bool IsSeqRunnerTask() const; 163 bool IsFunctionTask() const; 164 bool IsLongTask() const; 165 bool IsPeriodicTask() const; 166 bool IsMainThreadTask() const; 167 bool IsExecuted() const; 168 void IncreaseRefCount(); 169 void DecreaseRefCount(); 170 bool IsReadyToHandle() const; 171 void NotifyPendingTask(); 172 void CancelPendingTask(napi_env env); 173 bool UpdateTask(uint64_t startTime, void* worker); 174 napi_value DeserializeValue(napi_env env, napi_value* func, napi_value* args); 175 void StoreTaskDuration(); 176 bool CanForSequenceRunner(napi_env env); 177 bool CanForTaskGroup(napi_env env); 178 bool CanExecute(napi_env env); 179 bool CanExecuteDelayed(napi_env env); 180 bool CanExecutePeriodically(napi_env env); 181 void SetHasDependency(bool hasDependency); 182 bool HasDependency() const; 183 void TryClearHasDependency(); 184 void ClearDelayedTimers(); 185 void IncreaseTaskLifecycleCount(); 186 void DecreaseTaskLifecycleCount(); 187 bool ShouldDeleteTask(bool needUnref = true); 188 bool CheckStartExecution(Priority priority); 189 bool IsValid(); 190 void SetValid(bool isValid); 191 bool CanForAsyncRunner(napi_env env); 192 bool IsAsyncRunnerTask(); 193 void SetTaskId(uint32_t taskId); 194 void TriggerCancel(CancelTaskMessage* message); 195 void CancelInner(ExecuteState state); 196 bool IsSameEnv(napi_env env); 197 void DiscardAsyncRunnerTask(DiscardTaskMessage* message); 198 void DiscardInner(DiscardTaskMessage* message); 199 void ReleaseData(); 200 void DisposeCanceledTask(); 201 Worker* GetWorker() const; 202 napi_env GetEnv() const; 203 uint32_t GetTaskId() const; 204 bool IsRealyCanceled(); 205 bool UpdateTaskStateToWaiting(); 206 bool UpdateTaskStateToRunning(); 207 bool UpdateTaskStateToCanceled(); 208 bool UpdateTaskStateToFinished(); 209 bool UpdateTaskStateToDelayed(); 210 bool UpdateTaskStateToEnding(); 211 static std::tuple<napi_value, napi_value, napi_value, napi_value> GetSerializeParams(napi_env env, 212 napi_value napiTask); 213 static std::tuple<void*, void*> GetSerializeResult(napi_env env, napi_value func, napi_value args, 214 std::tuple<napi_value, napi_value, bool, bool> transferAndCloneParams); 215 216 private: 217 Task(const Task &) = delete; 218 Task& operator=(const Task &) = delete; 219 Task(Task &&) = delete; 220 Task& operator=(Task &&) = delete; 221 222 void InitHandle(napi_env env); 223 224 public: 225 napi_env env_ = nullptr; 226 std::atomic<TaskType> taskType_ {TaskType::TASK}; 227 std::string name_ {}; 228 uint32_t taskId_ {}; 229 std::atomic<ExecuteState> taskState_ {ExecuteState::NOT_FOUND}; 230 uint64_t groupId_ {}; // 0 for task outside taskgroup 231 uint64_t seqRunnerId_ {}; // 0 for task without seqRunner 232 uint64_t asyncRunnerId_ {}; // 0 for task without asyncRunner 233 TaskInfo* currentTaskInfo_ {}; 234 std::list<TaskInfo*> pendingTaskInfos_ {}; // for a common task executes multiple times 235 void* result_ = nullptr; 236 std::atomic<bool> success_ {true}; 237 std::atomic<uint64_t> startTime_ {}; 238 std::atomic<uint64_t> cpuTime_ {}; 239 std::atomic<uint64_t> ioTime_ {}; 240 void* worker_ {nullptr}; 241 napi_ref taskRef_ {}; 242 std::atomic<uint32_t> taskRefCount_ {}; 243 std::recursive_mutex taskMutex_ {}; 244 bool hasDependency_ {false}; 245 bool isLongTask_ {false}; 246 bool defaultTransfer_ {true}; 247 bool defaultCloneSendable_ {false}; 248 std::atomic<bool> isValid_ {true}; 249 std::atomic<uint32_t> lifecycleCount_ {0}; // when lifecycleCount_ is 0, the task pointer can be deleted 250 uv_async_t* onStartExecutionSignal_ = nullptr; 251 uv_async_t* onStartCancelSignal_ = nullptr; 252 uv_async_t* onStartDiscardSignal_ = nullptr; 253 ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr; 254 ListenerCallBackInfo* onStartExecutionCallBackInfo_ = nullptr; 255 ListenerCallBackInfo* onExecutionFailedCallBackInfo_ = nullptr; 256 ListenerCallBackInfo* onExecutionSucceededCallBackInfo_ = nullptr; 257 258 // for periodic task 259 bool isPeriodicTask_ {false}; 260 uv_timer_t* timer_ {nullptr}; 261 Priority periodicTaskPriority_ {Priority::DEFAULT}; 262 263 std::set<uv_timer_t*> delayedTimers_ {}; // task delayed timer 264 265 bool isMainThreadTask_ {false}; 266 Priority asyncTaskPriority_ {Priority::DEFAULT}; 267 std::atomic<bool> isCancelToFinish_ {false}; 268 }; 269 270 struct CallbackInfo { CallbackInfoCallbackInfo271 CallbackInfo(napi_env env, uint32_t count, napi_ref ref) 272 : hostEnv(env), refCount(count), callbackRef(ref) {} ~CallbackInfoCallbackInfo273 ~CallbackInfo() 274 { 275 napi_delete_reference(hostEnv, callbackRef); 276 } 277 278 napi_env hostEnv; 279 uint32_t refCount; 280 napi_ref callbackRef; 281 std::string type; 282 }; 283 284 struct TaskResultInfo { TaskResultInfoTaskResultInfo285 TaskResultInfo(napi_env workerEnv, uint32_t taskId, void* args) 286 : workerEnv(workerEnv), taskId(taskId), serializationArgs(args) {} 287 ~TaskResultInfo() = default; 288 289 napi_env workerEnv; 290 uint32_t taskId; 291 void* serializationArgs; 292 }; 293 } // namespace Commonlibrary::Concurrent::TaskPoolModule 294 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_H