1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 18 19 #include <mutex> 20 21 #if defined(ENABLE_TASKPOOL_FFRT) 22 #include "cpp/task.h" 23 #endif 24 #include "helper/concurrent_helper.h" 25 #include "helper/error_helper.h" 26 #include "helper/napi_helper.h" 27 #include "helper/object_helper.h" 28 #include "message_queue.h" 29 #include "napi/native_api.h" 30 #include "napi/native_node_api.h" 31 #include "native_engine/native_engine.h" 32 #include "qos_helper.h" 33 #include "task.h" 34 #include "task_runner.h" 35 #include "tools/log.h" 36 37 namespace Commonlibrary::Concurrent::TaskPoolModule { 38 using namespace Commonlibrary::Concurrent::Common; 39 using namespace Commonlibrary::Concurrent::Common::Helper; 40 using namespace Commonlibrary::Platform; 41 using MsgQueue = MessageQueue<TaskResultInfo*>; 42 43 enum class WorkerState { IDLE, RUNNING, BLOCKED }; 44 45 #if defined(ENABLE_TASKPOOL_FFRT) 46 static const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = { 47 {Priority::IDLE, ffrt::qos_background}, 48 {Priority::LOW, ffrt::qos_utility}, 49 {Priority::MEDIUM, ffrt::qos_default}, 50 {Priority::HIGH, ffrt::qos_user_initiated}, 51 }; 52 #endif 53 54 class Worker { 55 public: 56 using DebuggerPostTask = std::function<void()>; 57 58 static Worker* WorkerConstructor(napi_env env); 59 60 void NotifyExecuteTask(); 61 Enqueue(napi_env env,TaskResultInfo * resultInfo)62 void Enqueue(napi_env env, TaskResultInfo* resultInfo) 63 { 64 std::lock_guard<std::mutex> lock(queueMutex_); 65 msgQueueMap_[env].EnQueue(resultInfo); 66 } 67 Dequeue(napi_env env,MsgQueue * & queue)68 void Dequeue(napi_env env, MsgQueue*& queue) 69 { 70 std::lock_guard<std::mutex> lock(queueMutex_); 71 auto item = msgQueueMap_.find(env); 72 if (item != msgQueueMap_.end()) { 73 queue = &(item->second); 74 } 75 } 76 77 void NotifyTaskBegin(); 78 // the function will only be called when the task is finished or 79 // exits abnormally, so we can not put it in the scope directly 80 void NotifyTaskFinished(); 81 static void NotifyTaskResult(napi_env env, Task* task, napi_value result); 82 static void NotifyHandleTaskResult(Task* task); 83 84 #if defined(ENABLE_TASKPOOL_FFRT) 85 bool IsLoopActive(); 86 uint64_t GetWaitTime(); 87 #endif 88 #if defined(ENABLE_TASKPOOL_HISYSEVENT) 89 bool IsNeedReport(uint64_t intervalTime); 90 void IncreaseReportCount(); 91 #endif 92 93 private: Worker(napi_env env)94 explicit Worker(napi_env env) : hostEnv_(env) {}; 95 96 ~Worker() = default; 97 98 Worker(const Worker &) = delete; 99 Worker& operator=(const Worker &) = delete; 100 Worker(Worker &&) = delete; 101 Worker& operator=(Worker &&) = delete; 102 103 void NotifyIdle(); 104 void NotifyWorkerCreated(); NotifyTaskRunning()105 void NotifyTaskRunning() 106 { 107 state_ = WorkerState::RUNNING; 108 startTime_ = ConcurrentHelper::GetMilliseconds(); 109 runningCount_++; 110 } 111 HasRunningTasks()112 bool HasRunningTasks() const 113 { 114 return runningCount_ != 0; 115 } 116 117 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 118 static void HandleDebuggerTask(const uv_async_t* req); 119 void DebuggerOnPostTask(std::function<void()>&& task); 120 #endif 121 GetWorkerLoop()122 uv_loop_t* GetWorkerLoop() const 123 { 124 if (workerEnv_ != nullptr) { 125 return NapiHelper::GetLibUV(workerEnv_); 126 } 127 return nullptr; 128 } 129 RunLoop()130 void RunLoop() const 131 { 132 uv_loop_t* loop = GetWorkerLoop(); 133 if (loop != nullptr) { 134 uv_run(loop, UV_RUN_DEFAULT); 135 } else { 136 HILOG_ERROR("taskpool:: Worker loop is nullptr when start worker loop"); 137 return; 138 } 139 } 140 141 // we will use the scope to manage resources automatically, 142 // including the HandleScope and NotifyRunning/NotifyIdle 143 class RunningScope { 144 public: RunningScope(Worker * worker)145 explicit RunningScope(Worker* worker) : worker_(worker) 146 { 147 napi_open_handle_scope(worker_->workerEnv_, &scope_); 148 worker_->idleState_ = false; 149 worker->isExecutingLongTask_ = false; 150 worker_->NotifyTaskRunning(); 151 } 152 153 ~RunningScope(); 154 155 private: 156 Worker* worker_ = nullptr; 157 napi_handle_scope scope_ = nullptr; 158 }; 159 160 // use PriorityScope to manage the priority setting of workers 161 // reset qos_user_initiated when exit PriorityScope 162 class PriorityScope { 163 public: 164 PriorityScope(Worker* worker, Priority taskPriority); ~PriorityScope()165 ~PriorityScope() 166 { 167 worker_->ResetWorkerPriority(); 168 } 169 170 private: 171 Worker* worker_ = nullptr; 172 }; 173 174 void StartExecuteInThread(); 175 static void ExecuteInThread(const void* data); 176 bool PrepareForWorkerInstance(); 177 void ReleaseWorkerThreadContent(); 178 void ResetWorkerPriority(); 179 bool CheckFreeConditions(); 180 bool UpdateWorkerState(WorkerState expect, WorkerState desired); 181 void StoreTaskId(uint32_t taskId); 182 bool InitTaskPoolFunc(napi_env env, napi_value func, Task* task); 183 void UpdateExecutedInfo(); 184 void UpdateLongTaskInfo(Task* task); 185 bool IsExecutingLongTask(); 186 bool HasLongTask(); 187 void TerminateTask(uint32_t taskId); 188 void CloseHandles(); 189 void PostReleaseSignal(); 190 bool IsRunnable(uint64_t currTime) const; 191 void UpdateWorkerWakeUpTime(); 192 193 static void HandleFunctionException(napi_env env, Task* task); 194 static void PerformTask(const uv_async_t* req); 195 static void TaskResultCallback(napi_env env, napi_value result, bool success, void* data); 196 static void ReleaseWorkerHandles(const uv_async_t* req); 197 static void TriggerGCCheck(const uv_async_t* req); 198 199 #if defined(ENABLE_TASKPOOL_FFRT) 200 void InitFfrtInfo(); 201 void InitLoopHandleNum(); 202 #endif 203 204 napi_env hostEnv_ {nullptr}; 205 napi_env workerEnv_ {nullptr}; 206 uv_async_t* performTaskSignal_ {nullptr}; 207 uv_async_t* clearWorkerSignal_ {nullptr}; 208 uv_async_t* triggerGCCheckSignal_ {nullptr}; 209 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 210 uv_async_t* debuggerOnPostTaskSignal_ {nullptr}; 211 std::mutex debuggerMutex_; 212 std::queue<DebuggerPostTask> debuggerQueue_ {}; 213 #endif 214 std::unique_ptr<TaskRunner> runner_ {nullptr}; 215 216 std::atomic<int32_t> runningCount_ = 0; 217 std::atomic<bool> idleState_ = true; // true means the worker is idle 218 std::atomic<uint64_t> idlePoint_ = ConcurrentHelper::GetMilliseconds(); 219 std::atomic<uint64_t> startTime_ = ConcurrentHelper::GetMilliseconds(); 220 std::atomic<uint64_t> wakeUpTime_ = ConcurrentHelper::GetMilliseconds(); 221 std::atomic<WorkerState> state_ {WorkerState::IDLE}; 222 std::atomic<bool> hasExecuted_ = false; // false means this worker hasn't execute any tasks 223 Priority priority_ {Priority::DEFAULT}; 224 pid_t tid_ = 0; 225 std::vector<uint32_t> currentTaskId_ {}; 226 std::mutex currentTaskIdMutex_; 227 MessageQueue<TaskResultInfo*> hostMessageQueue_ {}; 228 uint64_t lastCpuTime_ = 0; 229 uint32_t idleCount_ = 0; 230 std::atomic<bool> hasLongTask_ = false; 231 std::atomic<bool> isExecutingLongTask_ = false; 232 std::mutex longMutex_; 233 std::unordered_set<uint32_t> longTasksSet_ {}; 234 std::mutex queueMutex_; // for sendData 235 std::unordered_map<napi_env, MsgQueue> msgQueueMap_ {}; 236 friend class TaskManager; 237 friend class NativeEngineTest; 238 239 #if defined(ENABLE_TASKPOOL_FFRT) 240 void* ffrtTaskHandle_ = nullptr; 241 uint32_t initActiveHandleNum_ = 0; 242 #endif 243 #if defined(ENABLE_TASKPOOL_HISYSEVENT) 244 std::atomic<int32_t> reportCount_ = 0; 245 #endif 246 }; 247 } // namespace Commonlibrary::Concurrent::TaskPoolModule 248 #endif // JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H