1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_H 18 19 #include <list> 20 #include <map> 21 #include <mutex> 22 #include <set> 23 #include <shared_mutex> 24 #include <string> 25 #include <uv.h> 26 27 #include "helper/concurrent_helper.h" 28 #include "napi/native_api.h" 29 #include "utils.h" 30 #include "tools/log.h" 31 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 32 #include "event_handler.h" 33 #endif 34 35 #if defined(ENABLE_TASKPOOL_FFRT) 36 #include "c/executor_task.h" 37 #include "ffrt_inner.h" 38 #endif 39 40 namespace Commonlibrary::Concurrent::TaskPoolModule { 41 using namespace Commonlibrary::Platform; 42 43 enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED, ENDING}; 44 enum TaskType { 45 TASK, 46 FUNCTION_TASK, 47 SEQRUNNER_TASK, 48 COMMON_TASK, 49 GROUP_COMMON_TASK, 50 GROUP_FUNCTION_TASK, 51 ASYNCRUNNER_TASK 52 }; 53 54 struct GroupInfo; 55 class Worker; 56 struct TaskInfo { 57 napi_deferred deferred = nullptr; 58 Priority priority {Priority::DEFAULT}; 59 void* serializationFunction = nullptr; 60 void* serializationArguments = nullptr; 61 }; 62 63 struct ListenerCallBackInfo { ListenerCallBackInfoListenerCallBackInfo64 ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env), 65 callbackRef_(callbackRef), taskError_(taskError) {} ~ListenerCallBackInfoListenerCallBackInfo66 ~ListenerCallBackInfo() 67 { 68 napi_delete_reference(env_, callbackRef_); 69 } 70 napi_env env_; 71 napi_ref callbackRef_; 72 napi_value taskError_; 73 }; 74 75 struct CancelTaskMessage { CancelTaskMessageCancelTaskMessage76 CancelTaskMessage(ExecuteState state, uint32_t taskId) : state(state), taskId(taskId) {} 77 ~CancelTaskMessage() = default; 78 79 ExecuteState state; 80 uint32_t taskId; 81 }; 82 83 struct DiscardTaskMessage { DiscardTaskMessageDiscardTaskMessage84 DiscardTaskMessage(napi_env env, uint32_t taskId, int32_t errCode, bool isWaiting) : env(env), 85 taskId(taskId), errCode(errCode), isWaiting(isWaiting) {} 86 ~DiscardTaskMessage() = default; 87 88 napi_env env; 89 uint32_t taskId; 90 int32_t errCode; 91 bool isWaiting; 92 }; 93 94 class Task { 95 public: 96 Task(napi_env env, TaskType taskType, std::string name); 97 Task() = default; 98 ~Task() = default; 99 100 static napi_value TaskConstructor(napi_env env, napi_callback_info cbinfo); 101 static napi_value LongTaskConstructor(napi_env env, napi_callback_info cbinfo); 102 static napi_value SetTransferList(napi_env env, napi_callback_info cbinfo); 103 static napi_value SetCloneList(napi_env env, napi_callback_info cbinfo); 104 static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo); 105 static napi_value OnReceiveData(napi_env env, napi_callback_info cbinfo); 106 static napi_value SendData(napi_env env, napi_callback_info cbinfo); 107 static napi_value AddDependency(napi_env env, napi_callback_info cbinfo); 108 static napi_value RemoveDependency(napi_env env, napi_callback_info cbinfo); 109 static napi_value OnEnqueued(napi_env env, napi_callback_info cbinfo); 110 static napi_value OnStartExecution(napi_env env, napi_callback_info cbinfo); 111 static napi_value OnExecutionFailed(napi_env env, napi_callback_info cbinfo); 112 static napi_value OnExecutionSucceeded(napi_env env, napi_callback_info cbinfo); 113 static napi_value IsDone(napi_env env, napi_callback_info cbinfo); 114 static napi_value GetTotalDuration(napi_env env, napi_callback_info info); 115 static napi_value GetCPUDuration(napi_env env, napi_callback_info info); 116 static napi_value GetIODuration(napi_env env, napi_callback_info info); 117 static napi_value GetTaskDuration(napi_env env, napi_callback_info& info, std::string durationType); 118 static napi_value GetName(napi_env env, napi_callback_info info); 119 static napi_value GetTaskId(napi_env env, napi_callback_info info); 120 121 static Task* GenerateTask(napi_env env, napi_value task, napi_value func, 122 napi_value name, napi_value* args, size_t argc); 123 static Task* GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type); 124 static TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args, 125 napi_value transferList, napi_value cloneList, Priority priority, 126 bool defaultTransfer = true, bool defaultCloneSendable = false); 127 static void TaskDestructor(napi_env env, void* data, void* hint); 128 129 static void ThrowNoDependencyError(napi_env env); 130 static void StartExecutionCallback(const uv_async_t* req); 131 static void StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo); 132 static void ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo); 133 static void CleanupHookFunc(void* arg); 134 static void Cancel(const uv_async_t* req); 135 static void DiscardTask(const uv_async_t* req); 136 137 void StoreTaskId(uint32_t taskId); 138 napi_value GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType = TaskType::COMMON_TASK, 139 Priority priority = Priority::DEFAULT); 140 TaskInfo* GetTaskInfo(napi_env env, napi_value task, Priority priority); 141 void UpdateTaskType(TaskType taskType); 142 void UpdatePeriodicTask(); 143 bool IsRepeatableTask() const; 144 bool IsGroupTask() const; 145 bool IsGroupCommonTask() const; 146 bool IsGroupFunctionTask() const; 147 bool IsCommonTask() const; 148 bool IsSeqRunnerTask() const; 149 bool IsFunctionTask() const; 150 bool IsLongTask() const; 151 bool IsPeriodicTask() const; 152 bool IsMainThreadTask() const; 153 bool IsExecuted() const; 154 void IncreaseRefCount(); 155 void DecreaseRefCount(); 156 bool IsReadyToHandle() const; 157 void NotifyPendingTask(); 158 void CancelPendingTask(napi_env env); 159 bool UpdateTask(uint64_t startTime, void* worker); 160 napi_value DeserializeValue(napi_env env, napi_value* func, napi_value* args); 161 void StoreTaskDuration(); 162 bool CanForSequenceRunner(napi_env env); 163 bool CanForTaskGroup(napi_env env); 164 bool CanExecute(napi_env env); 165 bool CanExecuteDelayed(napi_env env); 166 bool CanExecutePeriodically(napi_env env); 167 void SetHasDependency(bool hasDependency); 168 bool HasDependency() const; 169 void TryClearHasDependency(); 170 void ClearDelayedTimers(); 171 void IncreaseTaskRefCount(); 172 void DecreaseTaskRefCount(); 173 bool ShouldDeleteTask(bool needUnref = true); 174 bool VerifyAndPostResult(Priority priority); 175 bool CheckStartExecution(Priority priority); 176 bool IsValid(); 177 void SetValid(bool isValid); 178 bool CanForAsyncRunner(napi_env env); 179 bool IsAsyncRunnerTask(); 180 void SetTaskId(uint32_t taskId); 181 void TriggerCancel(CancelTaskMessage* message); 182 void CancelInner(ExecuteState state); 183 bool IsSameEnv(napi_env env); 184 void DiscardAsyncRunnerTask(DiscardTaskMessage* message); 185 void DiscardInner(DiscardTaskMessage* message); 186 void ReleaseData(); 187 void DisposeCanceledTask(); 188 189 private: 190 Task(const Task &) = delete; 191 Task& operator=(const Task &) = delete; 192 Task(Task &&) = delete; 193 Task& operator=(Task &&) = delete; 194 195 void InitHandle(napi_env env); 196 197 public: 198 napi_env env_ = nullptr; 199 std::atomic<TaskType> taskType_ {TaskType::TASK}; 200 std::string name_ {}; 201 uint32_t taskId_ {}; 202 std::atomic<ExecuteState> taskState_ {ExecuteState::NOT_FOUND}; 203 uint64_t groupId_ {}; // 0 for task outside taskgroup 204 uint64_t seqRunnerId_ {}; // 0 for task without seqRunner 205 uint64_t asyncRunnerId_ {}; // 0 for task without asyncRunner 206 TaskInfo* currentTaskInfo_ {}; 207 std::list<TaskInfo*> pendingTaskInfos_ {}; // for a common task executes multiple times 208 void* result_ = nullptr; 209 uv_async_t* onResultSignal_ = nullptr; 210 std::atomic<bool> success_ {true}; 211 std::atomic<uint64_t> startTime_ {}; 212 std::atomic<uint64_t> cpuTime_ {}; 213 std::atomic<uint64_t> ioTime_ {}; 214 void* worker_ {nullptr}; 215 napi_ref taskRef_ {}; 216 std::atomic<uint32_t> taskRefCount_ {}; 217 std::recursive_mutex taskMutex_ {}; 218 bool hasDependency_ {false}; 219 bool isLongTask_ {false}; 220 bool defaultTransfer_ {true}; 221 bool defaultCloneSendable_ {false}; 222 std::atomic<bool> isValid_ {true}; 223 std::atomic<uint32_t> refCount_ {false}; // when refCount_ is 0, the task pointer can be deleted 224 uv_async_t* onStartExecutionSignal_ = nullptr; 225 uv_async_t* onStartCancelSignal_ = nullptr; 226 uv_async_t* onStartDiscardSignal_ = nullptr; 227 ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr; 228 ListenerCallBackInfo* onStartExecutionCallBackInfo_ = nullptr; 229 ListenerCallBackInfo* onExecutionFailedCallBackInfo_ = nullptr; 230 ListenerCallBackInfo* onExecutionSucceededCallBackInfo_ = nullptr; 231 232 // for periodic task 233 bool isPeriodicTask_ {false}; 234 // periodic task first Generate TaskInfo 235 std::atomic<bool> isFirstTaskInfo_ {false}; 236 uv_timer_t* timer_ {nullptr}; 237 Priority periodicTaskPriority_ {Priority::DEFAULT}; 238 239 std::set<uv_timer_t*> delayedTimers_ {}; // task delayed timer 240 241 bool isMainThreadTask_ {false}; 242 Priority asyncTaskPriority_ {Priority::DEFAULT}; 243 std::atomic<bool> isCancelToFinish_ {false}; 244 }; 245 246 struct CallbackInfo { CallbackInfoCallbackInfo247 CallbackInfo(napi_env env, uint32_t count, napi_ref ref, Task* task) 248 : hostEnv(env), refCount(count), callbackRef(ref), task(task), onCallbackSignal(nullptr), worker(nullptr) {} ~CallbackInfoCallbackInfo249 ~CallbackInfo() 250 { 251 napi_delete_reference(hostEnv, callbackRef); 252 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 253 if (task == nullptr) { 254 return; 255 } 256 if (!task->IsMainThreadTask() && onCallbackSignal != nullptr) { 257 Common::Helper::ConcurrentHelper::UvHandleClose(onCallbackSignal); 258 } 259 #else 260 if (onCallbackSignal != nullptr) { 261 Common::Helper::ConcurrentHelper::UvHandleClose(onCallbackSignal); 262 } 263 #endif 264 } 265 266 napi_env hostEnv; 267 uint32_t refCount; 268 napi_ref callbackRef; 269 Task* task; 270 uv_async_t* onCallbackSignal; 271 Worker* worker; 272 }; 273 274 struct TaskResultInfo { TaskResultInfoTaskResultInfo275 TaskResultInfo(napi_env env, napi_env curEnv, uint32_t id, void* args) : hostEnv(env), workerEnv(curEnv), 276 taskId(id), serializationArgs(args) {} 277 ~TaskResultInfo() = default; 278 279 napi_env hostEnv; 280 napi_env workerEnv; 281 uint32_t taskId; 282 void* serializationArgs; 283 }; 284 } // namespace Commonlibrary::Concurrent::TaskPoolModule 285 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_H