• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_WORKER_WORKER_H
17 #define JS_CONCURRENT_MODULE_WORKER_WORKER_H
18 
19 #include <condition_variable>
20 #include <list>
21 #include <map>
22 #include <mutex>
23 
24 #include "helper/napi_helper.h"
25 #include "helper/object_helper.h"
26 #include "message_queue.h"
27 #include "napi/native_api.h"
28 #include "napi/native_node_api.h"
29 #include "native_engine/native_engine.h"
30 #include "utils/log.h"
31 #include "worker_runner.h"
32 
33 namespace Commonlibrary::Concurrent::WorkerModule {
34 using namespace Commonlibrary::Concurrent::Common::Helper;
35 
36 class Worker {
37 public:
38     enum RunnerState { STARTING, RUNNING, TERMINATEING, TERMINATED };
39     enum HostState { ACTIVE, INACTIVE };
40     enum ListenerMode { ONCE, PERMANENT };
41     enum ScriptMode { CLASSIC, MODULE };
42 
43     using DebuggerPostTask = std::function<void()>;
44 
45     struct WorkerListener {
WorkerListenerWorkerListener46         WorkerListener(napi_env env, napi_ref callback, ListenerMode mode)
47             : env_(env), callback_(callback), mode_(mode)
48         {}
49 
~WorkerListenerWorkerListener50         ~WorkerListener()
51         {
52             NapiHelper::DeleteReference(env_, callback_);
53             callback_ = nullptr;
54         }
55 
NextIsAvailableWorkerListener56         bool NextIsAvailable() const
57         {
58             return mode_ != ONCE;
59         }
60 
SetModeWorkerListener61         void SetMode(ListenerMode mode)
62         {
63             mode_ = mode;
64         }
65 
66         bool operator==(const WorkerListener& listener) const;
67 
68         napi_env env_ {NULL};
69         napi_ref callback_ {NULL};
70         ListenerMode mode_ {PERMANENT};
71     };
72 
73     struct FindWorkerListener {
FindWorkerListenerFindWorkerListener74         FindWorkerListener(napi_env env, napi_ref ref) : env_(env), ref_(ref) {}
75 
operatorFindWorkerListener76         bool operator()(const WorkerListener* listener) const
77         {
78             napi_value compareObj = NapiHelper::GetReferenceValue(env_, listener->callback_);
79             napi_value obj = NapiHelper::GetReferenceValue(env_, ref_);
80             // the env of listener and cmp listener must be same env because of Synchronization method
81             return NapiHelper::StrictEqual(env_, compareObj, obj);
82         }
83 
84         napi_env env_ {nullptr};
85         napi_ref ref_ {nullptr};
86     };
87 
88     struct WorkerParams {
89         std::string name_ {};
90         ScriptMode type_ {CLASSIC};
91     };
92 
93     /**
94     * Creates a worker instance.
95     *
96     * @param env NAPI environment parameters.
97     * @param thisVar URL of the script to be executed by the worker.
98     */
99     Worker(napi_env env, napi_ref thisVar);
100 
101     /**
102         * The destructor of the Worker.
103         */
104     ~Worker();
105 
106     /**
107      * The host thread receives the information.
108      *
109      * @param req The value of the object passed in by the js layer.
110      */
111     static void HostOnMessage(const uv_async_t* req);
112 
113     /**
114      * The host thread receives the information.
115      *
116      * @param req The value of the object passed in by the js layer.
117      */
118     static void HostOnError(const uv_async_t* req);
119 
120     /**
121      * The worker thread receives the information.
122      *
123      * @param req The value of the object passed in by the js layer.
124      */
125     static void WorkerOnMessage(const uv_async_t* req);
126 
127     /**
128      * ExecuteIn in thread.
129      *
130      * @param data The worker pointer.
131      */
132     static void ExecuteInThread(const void* data);
133 
134     /**
135     * Post a message.
136     *
137     * @param env NAPI environment parameters.
138     * @param thisVar The callback information of the js layer.
139     */
140     static napi_value PostMessage(napi_env env, napi_callback_info cbinfo);
141 
142     /**
143     * Post a message, if has sendable objects in it pass sendable objects' reference.
144     *
145     * @param env NAPI environment parameters.
146     * @param thisVar The callback information of the js layer.
147     */
148     static napi_value PostSendableMessage(napi_env env, napi_callback_info cbinfo);
149 
150     /**
151     * postMessage implementation
152     *
153     * @param env NAPI environment parameters.
154     * @param thisVar The callback information of the js layer.
155     */
156     static napi_value CommonPostMessage(napi_env env, napi_callback_info cbinfo, bool cloneSendable);
157 
158     /**
159      * Add event listeners to host.
160      *
161      * @param env NAPI environment parameters.
162      * @param cbinfo The callback information of the js layer.
163      */
164     static napi_value PostMessageToHost(napi_env env, napi_callback_info cbinfo);
165 
166     /**
167     * Post a message, if has sendable objects in it pass sendable objects' reference.
168     *
169     * @param env NAPI environment parameters.
170     * @param thisVar The callback information of the js layer.
171     */
172     static napi_value PostSendableMessageToHost(napi_env env, napi_callback_info cbinfo);
173 
174     /**
175     * postMessage implementation
176     *
177     * @param env NAPI environment parameters.
178     * @param thisVar The callback information of the js layer.
179     */
180     static napi_value CommonPostMessageToHost(napi_env env, napi_callback_info cbinfo, bool cloneSendable);
181 
182     /**
183      * Terminates the worker thread to stop the worker from receiving messages.
184      *
185      * @param env NAPI environment parameters.
186      * @param cbinfo The callback information of the js layer.
187      */
188     static napi_value Terminate(napi_env env, napi_callback_info cbinfo);
189 
190     /**
191      * Close the worker.
192      *
193      * @param env NAPI environment parameters.
194      * @param cbinfo The callback information of the js layer.
195      */
196     static napi_value CloseWorker(napi_env env, napi_callback_info cbinfo);
197 
198     /**
199      * Adds an event listener to the worker.
200      *
201      * @param env NAPI environment parameters.
202      * @param cbinfo The callback information of the js layer.
203      */
204     static napi_value On(napi_env env, napi_callback_info cbinfo);
205 
206     /**
207      * Adds an event listener to the worker and removes the event listener automatically after it is invoked once.
208      *
209      * @param env NAPI environment parameters.
210      * @param cbinfo The callback information of the js layer.
211      */
212     static napi_value Once(napi_env env, napi_callback_info cbinfo);
213 
214     /**
215      * Removes an event listener to the worker.
216      *
217      * @param env NAPI environment parameters.
218      * @param cbinfo The callback information of the js layer.
219      */
220     static napi_value Off(napi_env env, napi_callback_info cbinfo);
221 
222     /**
223      * Add event listeners.
224      *
225      * @param env NAPI environment parameters.
226      * @param cbinfo The callback information of the js layer.
227      */
228     static napi_value AddEventListener(napi_env env, napi_callback_info cbinfo);
229 
230     /**
231      * Dispatch the event.
232      *
233      * @param env NAPI environment parameters.
234      * @param cbinfo The callback information of the js layer.
235      */
236     static napi_value DispatchEvent(napi_env env, napi_callback_info cbinfo);
237 
238     /**
239      * Remove the event listener.
240      *
241      * @param env NAPI environment parameters.
242      * @param cbinfo The callback information of the js layer.
243      */
244     static napi_value RemoveEventListener(napi_env env, napi_callback_info cbinfo);
245 
246     /**
247      * Remove all listener.
248      *
249      * @param env NAPI environment parameters.
250      * @param cbinfo The callback information of the js layer.
251      */
252     static napi_value RemoveAllListener(napi_env env, napi_callback_info cbinfo);
253 
254     /**
255      * Add the listener.
256      *
257      * @param env NAPI environment parameters.
258      * @param cbinfo The callback information of the js layer.
259      */
260     static napi_value AddListener(napi_env env, napi_callback_info cbinfo, ListenerMode mode);
261 
262     /**
263      * Remove the listener.
264      *
265      * @param env NAPI environment parameters.
266      * @param cbinfo The callback information of the js layer.
267      */
268     static napi_value RemoveListener(napi_env env, napi_callback_info cbinfo);
269 
270     /**
271      * The constructor of worker.
272      *
273      * @param env NAPI environment parameters.
274      * @param cbinfo The callback information of the js layer.
275      */
276     static napi_value LimitedWorkerConstructor(napi_env env, napi_callback_info cbinfo);
277     static napi_value ThreadWorkerConstructor(napi_env env, napi_callback_info cbinfo);
278     static napi_value WorkerConstructor(napi_env env, napi_callback_info cbinfo);
279     static napi_value Constructor(napi_env env, napi_callback_info cbinfo, bool limitSign = false);
280 
281     /**
282      * Initialize the worker and port.
283      *
284      * @param env NAPI environment parameters.
285      * @param cbinfo The callback information of the js layer.
286      */
287     static napi_value InitWorker(napi_env env, napi_value exports);
288     static napi_value InitPort(napi_env env, napi_value exports);
289 
290     /**
291      * Cancel the task.
292      *
293      * @param env NAPI environment parameters.
294      * @param cbinfo The callback information of the js layer.
295      */
296     static napi_value CancelTask(napi_env env, napi_callback_info cbinfo);
297 
298     /**
299      * The parent port cancels the task.
300      *
301      * @param env NAPI environment parameters.
302      * @param cbinfo The callback information of the js layer.
303      */
304     static napi_value ParentPortCancelTask(napi_env env, napi_callback_info cbinfo);
305 
306     /**
307      * The parent port adds an event listener.
308      *
309      * @param env NAPI environment parameters.
310      * @param cbinfo The callback information of the js layer.
311      */
312     static napi_value ParentPortAddEventListener(napi_env env, napi_callback_info cbinfo);
313 
314     /**
315      * The parent port removes all event listener.
316      *
317      * @param env NAPI environment parameters.
318      * @param cbinfo The callback information of the js layer.
319      */
320     static napi_value ParentPortRemoveAllListener(napi_env env, napi_callback_info cbinfo);
321 
322     /**
323      * The parent port dispatch the event listener.
324      *
325      * @param env NAPI environment parameters.
326      * @param cbinfo The callback information of the js layer.
327      */
328     static napi_value ParentPortDispatchEvent(napi_env env, napi_callback_info cbinfo);
329 
330     /**
331      * The parent port removes the event listener.
332      *
333      * @param env NAPI environment parameters.
334      * @param cbinfo The callback information of the js layer.
335      */
336     static napi_value ParentPortRemoveEventListener(napi_env env, napi_callback_info cbinfo);
337 
338     /**
339      * Register a globalCallObject on host side.
340      *
341      * @param env NAPI environment parameters.
342      * @param cbinfo The callback information of the js layer.
343      */
344     static napi_value RegisterGlobalCallObject(napi_env env, napi_callback_info cbinfo);
345 
346     /**
347      * Unregister the specific globalCallObject on host side.
348      *
349      * @param env NAPI environment parameters.
350      * @param cbinfo The callback information of the js layer.
351      */
352     static napi_value UnregisterGlobalCallObject(napi_env env, napi_callback_info cbinfo);
353 
354     /**
355      * Post a global synchronized call request to an object registered on host side.
356      *
357      * @param env NAPI environment parameters.
358      * @param cbinfo The callback information of the js layer.
359      */
360     static napi_value GlobalCall(napi_env env, napi_callback_info cbinfo);
361 
362     static void HostOnGlobalCall(const uv_async_t* req);
363 
364     static bool CanCreateWorker(napi_env env, WorkerVersion target);
365 
366     static WorkerParams* CheckWorkerArgs(napi_env env, napi_value argsValue);
367 
368     static void WorkerThrowError(napi_env env, int32_t errCode, const char* errMessage = nullptr);
369 
370     void StartExecuteInThread(napi_env env, const char* script);
371 
372     bool UpdateWorkerState(RunnerState state);
373     bool UpdateHostState(HostState state);
374 
IsNotTerminate()375     bool IsNotTerminate() const
376     {
377         return runnerState_.load(std::memory_order_acquire) <= RUNNING;
378     }
379 
IsRunning()380     bool IsRunning() const
381     {
382         return runnerState_.load(std::memory_order_acquire) == RUNNING;
383     }
384 
IsTerminated()385     bool IsTerminated() const
386     {
387         return runnerState_.load(std::memory_order_acquire) >= TERMINATED;
388     }
389 
IsTerminating()390     bool IsTerminating() const
391     {
392         return runnerState_.load(std::memory_order_acquire) == TERMINATEING;
393     }
394 
SetScriptMode(ScriptMode mode)395     void SetScriptMode(ScriptMode mode)
396     {
397         scriptMode_ = mode;
398     }
399 
400     void AddListenerInner(napi_env env, const char* type, const WorkerListener* listener);
401     void RemoveListenerInner(napi_env env, const char* type, napi_ref callback);
402     void RemoveAllListenerInner();
403 
GetWorkerLoop()404     uv_loop_t* GetWorkerLoop() const
405     {
406         if (workerEnv_ != nullptr) {
407             return NapiHelper::GetLibUV(workerEnv_);
408         }
409         return nullptr;
410     }
411 
SetWorkerEnv(napi_env workerEnv)412     void SetWorkerEnv(napi_env workerEnv)
413     {
414         workerEnv_ = workerEnv;
415         if (workerEnvCallback_) {
416             workerEnvCallback_(workerEnv_);
417         }
418     }
419 
GetScript()420     std::string GetScript() const
421     {
422         return script_;
423     }
424 
GetName()425     std::string GetName() const
426     {
427         return name_;
428     }
429 
ClearWorkerTasks()430     bool ClearWorkerTasks()
431     {
432         if (hostEnv_ != nullptr) {
433             workerMessageQueue_.Clear(hostEnv_);
434             return true;
435         }
436         return false;
437     }
438 
HostIsStop()439     bool HostIsStop() const
440     {
441         return hostState_.load(std::memory_order_acquire) == INACTIVE;
442     }
443 
IsSameWorkerEnv(napi_env env)444     bool IsSameWorkerEnv(napi_env env) const
445     {
446         return workerEnv_ == env;
447     }
448 
Loop()449     void Loop() const
450     {
451         uv_loop_t* loop = GetWorkerLoop();
452         if (loop != nullptr) {
453             uv_run(loop, UV_RUN_DEFAULT);
454         } else {
455             HILOG_ERROR("worker:: Worker loop is nullptr when start worker loop");
456             return;
457         }
458     }
459 
RegisterCallbackForWorkerEnv(std::function<void (napi_env)> callback)460     void RegisterCallbackForWorkerEnv(std::function<void (napi_env)> callback)
461     {
462         workerEnvCallback_ = callback;
463         if (workerEnv_ != nullptr) {
464             workerEnvCallback_(workerEnv_);
465         }
466     }
467 
GetWorkerEnv()468     napi_env GetWorkerEnv() const
469     {
470         return workerEnv_;
471     }
472 
GetHostEnv()473     napi_env GetHostEnv() const
474     {
475         return hostEnv_;
476     }
477 
478 private:
479     void WorkerOnMessageInner();
480     void HostOnMessageInner();
481     void HostOnErrorInner();
482     void HostOnMessageErrorInner();
483     void HostOnGlobalCallInner();
484     void WorkerOnMessageErrorInner();
485     void WorkerOnErrorInner(napi_value error);
486 
487     void HandleHostException() const;
488     void HandleException();
489     void HandleUncaughtException(napi_value exception);
490     bool CallWorkerFunction(size_t argc, const napi_value* argv, const char* methodName, bool tryCatch);
491     void CallHostFunction(size_t argc, const napi_value* argv, const char* methodName) const;
492 
493     bool HandleEventListeners(napi_env env, napi_value recv, size_t argc, const napi_value* argv, const char* type);
494     void ParentPortHandleEventListeners(napi_env env, napi_value recv, size_t argc,
495                                         const napi_value* argv, const char* type, bool tryCatch);
496     void TerminateInner();
497 
498     void PostMessageInner(MessageDataType data);
499     void PostMessageToHostInner(MessageDataType data);
500 
501     void TerminateWorker();
502     void CloseInner();
503 
504     void PublishWorkerOverSignal();
505     void CloseWorkerCallback();
506     void CloseHostCallback();
507 
508     void ReleaseWorkerThreadContent();
509     void ReleaseHostThreadContent();
510     bool PrepareForWorkerInstance();
511     void ParentPortAddListenerInner(napi_env env, const char* type, const WorkerListener* listener);
512     void ParentPortRemoveAllListenerInner();
513     void ParentPortRemoveListenerInner(napi_env env, const char* type, napi_ref callback);
514     void GetContainerScopeId(napi_env env);
515 
516     void AddGlobalCallObject(const std::string &instanceName, napi_ref obj);
517     bool RemoveGlobalCallObject(const std::string &instanceName);
518     void ClearGlobalCallObject();
519     void AddGlobalCallError(int32_t errCode, napi_value errData = nullptr);
520     void HandleGlobalCallError(napi_env env);
521     void ClearGlobalCallError(napi_env env);
522     void InitGlobalCallStatus(napi_env env);
523     void IncreaseGlobalCallId();
524 
525 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
526     static void HandleDebuggerTask(const uv_async_t* req);
527     void DebuggerOnPostTask(std::function<void()>&& task);
528 #endif
529 
530     std::string script_ {};
531     std::string fileName_ {};
532     std::string name_ {};
533     ScriptMode scriptMode_ {CLASSIC};
534     bool isLimitedWorker_ {false};
535     bool isRelativePath_ {false};
536     int32_t scopeId_ {-1};
537 
538     MessageQueue workerMessageQueue_ {};
539     MessageQueue hostMessageQueue_ {};
540     std::mutex globalCallMutex_;
541     MarkedMessageQueue hostGlobalCallQueue_ {};
542     MessageQueue workerGlobalCallQueue_ {};
543     MessageQueue errorQueue_ {};
544 
545     uv_async_t* workerOnMessageSignal_ = nullptr;
546     uv_async_t* hostOnMessageSignal_ = nullptr;
547     uv_async_t* hostOnErrorSignal_ = nullptr;
548     uv_async_t* hostOnGlobalCallSignal_ = nullptr;
549 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
550     uv_async_t debuggerOnPostTaskSignal_ {};
551     std::mutex debuggerMutex_;
552     std::queue<DebuggerPostTask> debuggerQueue_ {};
553 #endif
554 
555     std::atomic<RunnerState> runnerState_ {STARTING};
556     std::atomic<HostState> hostState_ {ACTIVE};
557     std::unique_ptr<WorkerRunner> runner_ {};
558 
559     std::atomic<bool> isErrorExit_ = false;
560 
561     napi_env hostEnv_ {nullptr};
562     napi_env workerEnv_ {nullptr};
563 
564     napi_ref workerRef_ {nullptr};
565     napi_ref workerPort_ {nullptr};
566 
567     std::map<std::string, std::list<WorkerListener*>> eventListeners_ {};
568     std::map<std::string, std::list<WorkerListener*>> parentPortEventListeners_ {};
569     std::unordered_map<std::string, napi_ref> globalCallObjects_ {};
570     std::queue<std::pair<int32_t, napi_value>> globalCallErrors_ {};
571     std::atomic<uint32_t> globalCallId_ = 1; // 0: reserved for error check
572 
573     std::recursive_mutex liveStatusLock_ {};
574     std::mutex workerOnmessageMutex_ {};
575 
576     std::condition_variable cv_;
577     std::atomic<bool> globalCallSuccess_ = true;
578     std::function<void(napi_env)> workerEnvCallback_;
579 };
580 } // namespace Commonlibrary::Concurrent::WorkerModule
581 #endif // JS_CONCURRENT_MODULE_WORKER_WORKER_H
582