• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
18 
19 #include <mutex>
20 
21 #if defined(ENABLE_TASKPOOL_FFRT)
22 #include "cpp/task.h"
23 #endif
24 #include "helper/concurrent_helper.h"
25 #include "helper/error_helper.h"
26 #include "helper/napi_helper.h"
27 #include "helper/object_helper.h"
28 #include "message_queue.h"
29 #include "napi/native_api.h"
30 #include "napi/native_node_api.h"
31 #include "native_engine/native_engine.h"
32 #include "qos_helper.h"
33 #include "task.h"
34 #include "task_runner.h"
35 #include "tools/log.h"
36 
37 namespace Commonlibrary::Concurrent::TaskPoolModule {
38 using namespace Commonlibrary::Concurrent::Common;
39 using namespace Commonlibrary::Concurrent::Common::Helper;
40 using namespace Commonlibrary::Platform;
41 using MsgQueue = MessageQueue<TaskResultInfo*>;
42 
43 enum class WorkerState { IDLE, RUNNING, BLOCKED };
44 
45 #if defined(ENABLE_TASKPOOL_FFRT)
46 static const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = {
47     {Priority::IDLE, ffrt::qos_background},
48     {Priority::LOW, ffrt::qos_utility},
49     {Priority::MEDIUM, ffrt::qos_default},
50     {Priority::HIGH, ffrt::qos_user_initiated},
51 };
52 #endif
53 
54 class Worker {
55 public:
56     using DebuggerPostTask = std::function<void()>;
57 
58     static Worker* WorkerConstructor(napi_env env);
59 
60     void NotifyExecuteTask();
61 
Enqueue(napi_env env,TaskResultInfo * resultInfo)62     void Enqueue(napi_env env, TaskResultInfo* resultInfo)
63     {
64         std::lock_guard<std::mutex> lock(queueMutex_);
65         msgQueueMap_[env].EnQueue(resultInfo);
66     }
67 
Dequeue(napi_env env,MsgQueue * & queue)68     void Dequeue(napi_env env, MsgQueue*& queue)
69     {
70         std::lock_guard<std::mutex> lock(queueMutex_);
71         auto item = msgQueueMap_.find(env);
72         if (item != msgQueueMap_.end()) {
73             queue = &(item->second);
74         }
75     }
76 
77     void NotifyTaskBegin();
78     // the function will only be called when the task is finished or
79     // exits abnormally, so we can not put it in the scope directly
80     void NotifyTaskFinished();
81     static void NotifyTaskResult(napi_env env, Task* task, napi_value result);
82     static void NotifyHandleTaskResult(Task* task);
83 
84 #if defined(ENABLE_TASKPOOL_FFRT)
85     bool IsLoopActive();
86     uint64_t GetWaitTime();
87 #endif
88 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
89     bool IsNeedReport(uint64_t intervalTime);
90     void IncreaseReportCount();
91 #endif
92 
93 private:
Worker(napi_env env)94     explicit Worker(napi_env env) : hostEnv_(env) {};
95 
96     ~Worker() = default;
97 
98     Worker(const Worker &) = delete;
99     Worker& operator=(const Worker &) = delete;
100     Worker(Worker &&) = delete;
101     Worker& operator=(Worker &&) = delete;
102 
103     void NotifyIdle();
104     void NotifyWorkerCreated();
NotifyTaskRunning()105     void NotifyTaskRunning()
106     {
107         state_ = WorkerState::RUNNING;
108         startTime_ = ConcurrentHelper::GetMilliseconds();
109         runningCount_++;
110     }
111 
HasRunningTasks()112     bool HasRunningTasks() const
113     {
114         return runningCount_ != 0;
115     }
116 
117 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
118     static void HandleDebuggerTask(const uv_async_t* req);
119     void DebuggerOnPostTask(std::function<void()>&& task);
120 #endif
121 
GetWorkerLoop()122     uv_loop_t* GetWorkerLoop() const
123     {
124         if (workerEnv_ != nullptr) {
125             return NapiHelper::GetLibUV(workerEnv_);
126         }
127         return nullptr;
128     }
129 
RunLoop()130     void RunLoop() const
131     {
132         uv_loop_t* loop = GetWorkerLoop();
133         if (loop != nullptr) {
134             uv_run(loop, UV_RUN_DEFAULT);
135         } else {
136             HILOG_ERROR("taskpool:: Worker loop is nullptr when start worker loop");
137             return;
138         }
139     }
140 
141     // we will use the scope to manage resources automatically,
142     // including the HandleScope and NotifyRunning/NotifyIdle
143     class RunningScope {
144     public:
RunningScope(Worker * worker)145         explicit RunningScope(Worker* worker) : worker_(worker)
146         {
147             napi_open_handle_scope(worker_->workerEnv_, &scope_);
148             worker_->idleState_ = false;
149             worker->isExecutingLongTask_ = false;
150             worker_->NotifyTaskRunning();
151         }
152 
153         ~RunningScope();
154 
155     private:
156         Worker* worker_ = nullptr;
157         napi_handle_scope scope_ = nullptr;
158     };
159 
160     // use PriorityScope to manage the priority setting of workers
161     // reset qos_user_initiated when exit PriorityScope
162     class PriorityScope {
163     public:
164         PriorityScope(Worker* worker, Priority taskPriority);
~PriorityScope()165         ~PriorityScope()
166         {
167             worker_->ResetWorkerPriority();
168         }
169 
170     private:
171         Worker* worker_ = nullptr;
172     };
173 
174     void StartExecuteInThread();
175     static void ExecuteInThread(const void* data);
176     bool PrepareForWorkerInstance();
177     void ReleaseWorkerThreadContent();
178     void ResetWorkerPriority();
179     bool CheckFreeConditions();
180     bool UpdateWorkerState(WorkerState expect, WorkerState desired);
181     void StoreTaskId(uint32_t taskId);
182     bool InitTaskPoolFunc(napi_env env, napi_value func, Task* task);
183     void UpdateExecutedInfo();
184     void UpdateLongTaskInfo(Task* task);
185     bool IsExecutingLongTask();
186     bool HasLongTask();
187     void TerminateTask(uint32_t taskId);
188     void CloseHandles();
189     void PostReleaseSignal();
190     bool IsRunnable(uint64_t currTime) const;
191     void UpdateWorkerWakeUpTime();
192 
193     static void HandleFunctionException(napi_env env, Task* task);
194     static void PerformTask(const uv_async_t* req);
195     static void TaskResultCallback(napi_env env, napi_value result, bool success, void* data);
196     static void ReleaseWorkerHandles(const uv_async_t* req);
197     static void TriggerGCCheck(const uv_async_t* req);
198 
199 #if defined(ENABLE_TASKPOOL_FFRT)
200     void InitFfrtInfo();
201     void InitLoopHandleNum();
202 #endif
203 
204     napi_env hostEnv_ {nullptr};
205     napi_env workerEnv_ {nullptr};
206     uv_async_t* performTaskSignal_ {nullptr};
207     uv_async_t* clearWorkerSignal_ {nullptr};
208     uv_async_t* triggerGCCheckSignal_ {nullptr};
209 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
210     uv_async_t* debuggerOnPostTaskSignal_ {nullptr};
211     std::mutex debuggerMutex_;
212     std::queue<DebuggerPostTask> debuggerQueue_ {};
213 #endif
214     std::unique_ptr<TaskRunner> runner_ {nullptr};
215 
216     std::atomic<int32_t> runningCount_ = 0;
217     std::atomic<bool> idleState_ = true; // true means the worker is idle
218     std::atomic<uint64_t> idlePoint_ = ConcurrentHelper::GetMilliseconds();
219     std::atomic<uint64_t> startTime_ = ConcurrentHelper::GetMilliseconds();
220     std::atomic<uint64_t> wakeUpTime_ = ConcurrentHelper::GetMilliseconds();
221     std::atomic<WorkerState> state_ {WorkerState::IDLE};
222     std::atomic<bool> hasExecuted_ = false; // false means this worker hasn't execute any tasks
223     Priority priority_ {Priority::DEFAULT};
224     pid_t tid_ = 0;
225     std::vector<uint32_t> currentTaskId_ {};
226     std::mutex currentTaskIdMutex_;
227     MessageQueue<TaskResultInfo*> hostMessageQueue_ {};
228     uint64_t lastCpuTime_ = 0;
229     uint32_t idleCount_ = 0;
230     std::atomic<bool> hasLongTask_ = false;
231     std::atomic<bool> isExecutingLongTask_ = false;
232     std::mutex longMutex_;
233     std::unordered_set<uint32_t> longTasksSet_ {};
234     std::mutex queueMutex_; // for sendData
235     std::unordered_map<napi_env, MsgQueue> msgQueueMap_ {};
236     friend class TaskManager;
237     friend class NativeEngineTest;
238 
239 #if defined(ENABLE_TASKPOOL_FFRT)
240     void* ffrtTaskHandle_ = nullptr;
241     uint32_t initActiveHandleNum_ = 0;
242 #endif
243 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
244     std::atomic<int32_t> reportCount_ = 0;
245 #endif
246 };
247 } // namespace Commonlibrary::Concurrent::TaskPoolModule
248 #endif // JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H