1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_WORKER_WORKER_H 17 #define JS_CONCURRENT_MODULE_WORKER_WORKER_H 18 19 #include <condition_variable> 20 #include <list> 21 #include <map> 22 #include <mutex> 23 24 #include "helper/napi_helper.h" 25 #include "helper/object_helper.h" 26 #include "message_queue.h" 27 #include "napi/native_api.h" 28 #include "napi/native_node_api.h" 29 #include "native_engine/native_engine.h" 30 #include "native_engine/worker_manager.h" 31 #include "worker_runner.h" 32 #if defined(ENABLE_WORKER_EVENTHANDLER) 33 #include "event_handler.h" 34 #endif 35 36 namespace Commonlibrary::Concurrent::WorkerModule { 37 using namespace Commonlibrary::Concurrent::Common::Helper; 38 39 enum class WorkerPriority { INVALID = -1, HIGH = 0, MEDIUM, LOW, IDLE }; 40 41 class Worker { 42 public: 43 enum RunnerState { STARTING, RUNNING, TERMINATEING, TERMINATED }; 44 enum HostState { ACTIVE, INACTIVE }; 45 enum ListenerMode { ONCE, PERMANENT }; 46 enum ScriptMode { CLASSIC, MODULE }; 47 48 using DebuggerPostTask = std::function<void()>; 49 50 struct WorkerListener { WorkerListenerWorkerListener51 WorkerListener(napi_env env, napi_ref callback, ListenerMode mode) 52 : env_(env), callback_(callback), mode_(mode) 53 {} 54 ~WorkerListenerWorkerListener55 ~WorkerListener() 56 { 57 NapiHelper::DeleteReference(env_, callback_); 58 callback_ = nullptr; 59 } 60 NextIsAvailableWorkerListener61 bool NextIsAvailable() const 62 { 63 return mode_ != ONCE; 64 } 65 SetModeWorkerListener66 void SetMode(ListenerMode mode) 67 { 68 mode_ = mode; 69 } 70 71 bool operator==(const WorkerListener& listener) const; 72 73 napi_env env_ {NULL}; 74 napi_ref callback_ {NULL}; 75 ListenerMode mode_ {PERMANENT}; 76 }; 77 78 struct FindWorkerListener { FindWorkerListenerFindWorkerListener79 FindWorkerListener(napi_env env, napi_ref ref) : env_(env), ref_(ref) {} 80 operatorFindWorkerListener81 bool operator()(const WorkerListener* listener) const 82 { 83 napi_value compareObj = NapiHelper::GetReferenceValue(env_, listener->callback_); 84 napi_value obj = NapiHelper::GetReferenceValue(env_, ref_); 85 // the env of listener and cmp listener must be same env because of Synchronization method 86 return NapiHelper::StrictEqual(env_, compareObj, obj); 87 } 88 89 napi_env env_ {nullptr}; 90 napi_ref ref_ {nullptr}; 91 }; 92 93 struct WorkerParams { 94 std::string name_ {}; 95 ScriptMode type_ {CLASSIC}; 96 WorkerPriority workerPriority_ { WorkerPriority::INVALID }; 97 }; 98 99 struct WorkerWrapper { WorkerWrapperWorkerWrapper100 explicit WorkerWrapper(Worker* worker) : workerPtr_(worker) {} 101 GetWorkerWorkerWrapper102 Worker* GetWorker() const 103 { 104 return workerPtr_; 105 } 106 107 Worker* workerPtr_ {nullptr}; 108 }; 109 110 /** 111 * Creates a worker instance. 112 * 113 * @param env NAPI environment parameters. 114 * @param thisVar URL of the script to be executed by the worker. 115 */ 116 Worker(napi_env env, napi_ref thisVar); 117 118 /** 119 * The destructor of the Worker. 120 */ 121 ~Worker(); 122 123 /** 124 * The host thread receives the information. 125 * 126 * @param req The value of the object passed in by the js layer. 127 */ 128 static void HostOnMessage(const uv_async_t* req); 129 130 /** 131 * The host thread receives the information. 132 * 133 * @param req The value of the object passed in by the js layer. 134 */ 135 static void HostOnError(const uv_async_t* req); 136 137 /** 138 * The host thread receives the worker exception. 139 * 140 * @param req The value of the object passed in by the js layer. 141 */ 142 static void HostOnAllErrors(const uv_async_t* req); 143 144 /** 145 * The worker thread receives the information. 146 * 147 * @param req The value of the object passed in by the js layer. 148 */ 149 static void WorkerOnMessage(const uv_async_t* req); 150 151 /** 152 * ExecuteIn in thread. 153 * 154 * @param data The worker pointer. 155 */ 156 static void ExecuteInThread(const void* data); 157 158 /** 159 * Post a message. 160 * 161 * @param env NAPI environment parameters. 162 * @param thisVar The callback information of the js layer. 163 */ 164 static napi_value PostMessage(napi_env env, napi_callback_info cbinfo); 165 166 /** 167 * Post a message, if has sendable objects in it pass sendable objects' reference. 168 * 169 * @param env NAPI environment parameters. 170 * @param thisVar The callback information of the js layer. 171 */ 172 static napi_value PostMessageWithSharedSendable(napi_env env, napi_callback_info cbinfo); 173 174 /** 175 * postMessage implementation 176 * 177 * @param env NAPI environment parameters. 178 * @param thisVar The callback information of the js layer. 179 */ 180 static napi_value CommonPostMessage(napi_env env, napi_callback_info cbinfo, bool cloneSendable); 181 182 /** 183 * Add event listeners to host. 184 * 185 * @param env NAPI environment parameters. 186 * @param cbinfo The callback information of the js layer. 187 */ 188 static napi_value PostMessageToHost(napi_env env, napi_callback_info cbinfo); 189 190 /** 191 * Post a message, if has sendable objects in it pass sendable objects' reference. 192 * 193 * @param env NAPI environment parameters. 194 * @param thisVar The callback information of the js layer. 195 */ 196 static napi_value PostMessageWithSharedSendableToHost(napi_env env, napi_callback_info cbinfo); 197 198 /** 199 * postMessage implementation 200 * 201 * @param env NAPI environment parameters. 202 * @param thisVar The callback information of the js layer. 203 */ 204 static napi_value CommonPostMessageToHost(napi_env env, napi_callback_info cbinfo, bool cloneSendable); 205 206 /** 207 * Terminates the worker thread to stop the worker from receiving messages. 208 * 209 * @param env NAPI environment parameters. 210 * @param cbinfo The callback information of the js layer. 211 */ 212 static napi_value Terminate(napi_env env, napi_callback_info cbinfo); 213 214 /** 215 * Close the worker. 216 * 217 * @param env NAPI environment parameters. 218 * @param cbinfo The callback information of the js layer. 219 */ 220 static napi_value CloseWorker(napi_env env, napi_callback_info cbinfo); 221 222 /** 223 * Adds an event listener to the worker. 224 * 225 * @param env NAPI environment parameters. 226 * @param cbinfo The callback information of the js layer. 227 */ 228 static napi_value On(napi_env env, napi_callback_info cbinfo); 229 230 /** 231 * Adds an event listener to the worker and removes the event listener automatically after it is invoked once. 232 * 233 * @param env NAPI environment parameters. 234 * @param cbinfo The callback information of the js layer. 235 */ 236 static napi_value Once(napi_env env, napi_callback_info cbinfo); 237 238 /** 239 * Removes an event listener to the worker. 240 * 241 * @param env NAPI environment parameters. 242 * @param cbinfo The callback information of the js layer. 243 */ 244 static napi_value Off(napi_env env, napi_callback_info cbinfo); 245 246 /** 247 * Add event listeners. 248 * 249 * @param env NAPI environment parameters. 250 * @param cbinfo The callback information of the js layer. 251 */ 252 static napi_value AddEventListener(napi_env env, napi_callback_info cbinfo); 253 254 /** 255 * Dispatch the event. 256 * 257 * @param env NAPI environment parameters. 258 * @param cbinfo The callback information of the js layer. 259 */ 260 static napi_value DispatchEvent(napi_env env, napi_callback_info cbinfo); 261 262 /** 263 * Remove the event listener. 264 * 265 * @param env NAPI environment parameters. 266 * @param cbinfo The callback information of the js layer. 267 */ 268 static napi_value RemoveEventListener(napi_env env, napi_callback_info cbinfo); 269 270 /** 271 * Remove all listener. 272 * 273 * @param env NAPI environment parameters. 274 * @param cbinfo The callback information of the js layer. 275 */ 276 static napi_value RemoveAllListener(napi_env env, napi_callback_info cbinfo); 277 278 /** 279 * Add the listener. 280 * 281 * @param env NAPI environment parameters. 282 * @param cbinfo The callback information of the js layer. 283 */ 284 static napi_value AddListener(napi_env env, napi_callback_info cbinfo, ListenerMode mode); 285 286 /** 287 * Remove the listener. 288 * 289 * @param env NAPI environment parameters. 290 * @param cbinfo The callback information of the js layer. 291 */ 292 static napi_value RemoveListener(napi_env env, napi_callback_info cbinfo); 293 294 /** 295 * The constructor of worker. 296 * 297 * @param env NAPI environment parameters. 298 * @param cbinfo The callback information of the js layer. 299 */ 300 static napi_value LimitedWorkerConstructor(napi_env env, napi_callback_info cbinfo); 301 static napi_value ThreadWorkerConstructor(napi_env env, napi_callback_info cbinfo); 302 static napi_value WorkerConstructor(napi_env env, napi_callback_info cbinfo); 303 static napi_value Constructor(napi_env env, napi_callback_info cbinfo, bool limitSign = false, 304 WorkerVersion version = WorkerVersion::NONE); 305 306 /** 307 * Initialize the worker and port. 308 * 309 * @param env NAPI environment parameters. 310 * @param cbinfo The callback information of the js layer. 311 */ 312 static napi_value InitWorker(napi_env env, napi_value exports); 313 static void InitPriorityObject(napi_env env, napi_value exports); 314 static napi_value InitPort(napi_env env, napi_value exports); 315 316 /** 317 * Cancel the task. 318 * 319 * @param env NAPI environment parameters. 320 * @param cbinfo The callback information of the js layer. 321 */ 322 static napi_value CancelTask(napi_env env, napi_callback_info cbinfo); 323 324 /** 325 * The parent port cancels the task. 326 * 327 * @param env NAPI environment parameters. 328 * @param cbinfo The callback information of the js layer. 329 */ 330 static napi_value ParentPortCancelTask(napi_env env, napi_callback_info cbinfo); 331 332 /** 333 * The parent port adds an event listener. 334 * 335 * @param env NAPI environment parameters. 336 * @param cbinfo The callback information of the js layer. 337 */ 338 static napi_value ParentPortAddEventListener(napi_env env, napi_callback_info cbinfo); 339 340 /** 341 * The parent port removes all event listener. 342 * 343 * @param env NAPI environment parameters. 344 * @param cbinfo The callback information of the js layer. 345 */ 346 static napi_value ParentPortRemoveAllListener(napi_env env, napi_callback_info cbinfo); 347 348 /** 349 * The parent port dispatch the event listener. 350 * 351 * @param env NAPI environment parameters. 352 * @param cbinfo The callback information of the js layer. 353 */ 354 static napi_value ParentPortDispatchEvent(napi_env env, napi_callback_info cbinfo); 355 356 /** 357 * The parent port removes the event listener. 358 * 359 * @param env NAPI environment parameters. 360 * @param cbinfo The callback information of the js layer. 361 */ 362 static napi_value ParentPortRemoveEventListener(napi_env env, napi_callback_info cbinfo); 363 364 /** 365 * Register a globalCallObject on host side. 366 * 367 * @param env NAPI environment parameters. 368 * @param cbinfo The callback information of the js layer. 369 */ 370 static napi_value RegisterGlobalCallObject(napi_env env, napi_callback_info cbinfo); 371 372 /** 373 * Unregister the specific globalCallObject on host side. 374 * 375 * @param env NAPI environment parameters. 376 * @param cbinfo The callback information of the js layer. 377 */ 378 static napi_value UnregisterGlobalCallObject(napi_env env, napi_callback_info cbinfo); 379 380 /** 381 * Post a global synchronized call request to an object registered on host side. 382 * 383 * @param env NAPI environment parameters. 384 * @param cbinfo The callback information of the js layer. 385 */ 386 static napi_value GlobalCall(napi_env env, napi_callback_info cbinfo); 387 388 static void HostOnGlobalCall(const uv_async_t* req); 389 390 static bool CanCreateWorker(napi_env env, WorkerVersion target); 391 392 static WorkerParams* CheckWorkerArgs(napi_env env, napi_value argsValue, 393 WorkerVersion version = WorkerVersion::NONE); 394 395 static WorkerPriority GetPriorityArg(napi_env env, napi_value argsValue); 396 397 static void WorkerThrowError(napi_env env, int32_t errCode, const char* errMessage = nullptr); 398 399 static void WorkerDestructor(napi_env env, void* data, void* hint); 400 static void WorkerHostEnvCleanCallback(void* data); 401 static void LimitedWorkerHostEnvCleanCallback(void* data); 402 403 #if defined(ENABLE_WORKER_EVENTHANDLER) 404 static std::shared_ptr<OHOS::AppExecFwk::EventHandler> GetMainThreadHandler(); 405 #endif 406 407 void StartExecuteInThread(napi_env env, const char* script); 408 409 bool UpdateWorkerState(RunnerState state); 410 bool UpdateHostState(HostState state); 411 IsNotTerminate()412 bool IsNotTerminate() const 413 { 414 return runnerState_.load(std::memory_order_acquire) <= RUNNING; 415 } 416 IsRunning()417 bool IsRunning() const 418 { 419 return runnerState_.load(std::memory_order_acquire) == RUNNING; 420 } 421 IsTerminated()422 bool IsTerminated() const 423 { 424 return runnerState_.load(std::memory_order_acquire) >= TERMINATED; 425 } 426 IsTerminating()427 bool IsTerminating() const 428 { 429 return runnerState_.load(std::memory_order_acquire) == TERMINATEING; 430 } 431 SetScriptMode(ScriptMode mode)432 void SetScriptMode(ScriptMode mode) 433 { 434 scriptMode_ = mode; 435 } 436 437 void AddListenerInner(napi_env env, const char* type, const WorkerListener* listener); 438 void RemoveListenerInner(napi_env env, const char* type, napi_ref callback); 439 void RemoveAllListenerInner(); 440 void EraseWorker(); GetWorkerLoop()441 uv_loop_t* GetWorkerLoop() const 442 { 443 if (workerEnv_ != nullptr) { 444 return NapiHelper::GetLibUV(workerEnv_); 445 } 446 return nullptr; 447 } 448 SetWorkerEnv(napi_env workerEnv)449 void SetWorkerEnv(napi_env workerEnv) 450 { 451 workerEnv_ = workerEnv; 452 if (workerEnvCallback_) { 453 workerEnvCallback_(workerEnv_); 454 } 455 } 456 GetScript()457 std::string GetScript() const 458 { 459 return script_; 460 } 461 GetName()462 std::string GetName() const 463 { 464 return name_; 465 } 466 ClearWorkerTasks()467 bool ClearWorkerTasks() 468 { 469 if (hostEnv_ != nullptr) { 470 workerMessageQueue_.Clear(hostEnv_); 471 return true; 472 } 473 return false; 474 } 475 HostIsStop()476 bool HostIsStop() const 477 { 478 return hostState_.load(std::memory_order_acquire) == INACTIVE; 479 } 480 IsSameWorkerEnv(napi_env env)481 bool IsSameWorkerEnv(napi_env env) const 482 { 483 return workerEnv_ == env; 484 } 485 Loop()486 void Loop() const 487 { 488 uv_loop_t* loop = GetWorkerLoop(); 489 if (loop != nullptr) { 490 uv_run(loop, UV_RUN_DEFAULT); 491 } else { 492 return; 493 } 494 } 495 RegisterCallbackForWorkerEnv(std::function<void (napi_env)> callback)496 void RegisterCallbackForWorkerEnv(std::function<void (napi_env)> callback) 497 { 498 workerEnvCallback_ = callback; 499 if (workerEnv_ != nullptr) { 500 workerEnvCallback_(workerEnv_); 501 } 502 } 503 GetWorkerEnv()504 napi_env GetWorkerEnv() const 505 { 506 return workerEnv_; 507 } 508 GetHostEnv()509 napi_env GetHostEnv() const 510 { 511 return hostEnv_; 512 } 513 SetQOSLevelUpdatedCallback(std::function<void ()> callback)514 void SetQOSLevelUpdatedCallback(std::function<void()> callback) 515 { 516 qosUpdatedCallback_ = callback; 517 } 518 private: 519 void WorkerOnMessageInner(); 520 void HostOnMessageInner(); 521 void HostOnErrorInner(); 522 void HostOnAllErrorsInner(); 523 void HostOnMessageErrorInner(); 524 void HostOnGlobalCallInner(); 525 void WorkerOnMessageErrorInner(); 526 void WorkerOnErrorInner(napi_value error); 527 528 void HandleHostException() const; 529 void HandleException(); 530 void HandleWorkerUncaughtException(napi_env env, napi_value exception); 531 void HandleUncaughtException(napi_value exception); 532 bool CallWorkerFunction(size_t argc, const napi_value* argv, const char* methodName, bool tryCatch); 533 void CallHostFunction(size_t argc, const napi_value* argv, const char* methodName) const; 534 535 bool HandleEventListeners(napi_env env, napi_value recv, size_t argc, const napi_value* argv, const char* type); 536 void ParentPortHandleEventListeners(napi_env env, napi_value recv, size_t argc, 537 const napi_value* argv, const char* type, bool tryCatch); 538 void TerminateInner(); 539 540 void PostMessageInner(MessageDataType data); 541 void PostMessageToHostInner(MessageDataType data); 542 543 void TerminateWorker(); 544 545 void CloseInner(); 546 547 void PublishWorkerOverSignal(); 548 void CloseWorkerCallback(); 549 void CloseHostCallback(); 550 551 void PostWorkerOverTask(); 552 void PostWorkerErrorTask(); 553 void PostWorkerExceptionTask(); 554 void PostWorkerMessageTask(); 555 void PostWorkerGlobalCallTask(); 556 static bool IsValidWorker(Worker* worker); 557 static bool IsValidLimitedWorker(Worker* limitedWorker); 558 static void HostEnvCleanCallbackInner(Worker* worker); 559 560 void InitHostHandle(uv_loop_t* loop); 561 void CloseHostHandle(); 562 563 void ReleaseWorkerThreadContent(); 564 void ReleaseHostThreadContent(); 565 bool PrepareForWorkerInstance(); 566 void ApplyNameSetting(); 567 void ParentPortAddListenerInner(napi_env env, const char* type, const WorkerListener* listener); 568 void ParentPortRemoveAllListenerInner(); 569 void ParentPortRemoveListenerInner(napi_env env, const char* type, napi_ref callback); 570 void GetContainerScopeId(napi_env env); 571 572 void AddGlobalCallObject(const std::string &instanceName, napi_ref obj); 573 bool RemoveGlobalCallObject(const std::string &instanceName); 574 void ClearGlobalCallObject(); 575 void AddGlobalCallError(int32_t errCode, napi_value errData = nullptr); 576 void HandleGlobalCallError(napi_env env); 577 void ClearGlobalCallError(napi_env env); 578 void InitGlobalCallStatus(napi_env env); 579 void IncreaseGlobalCallId(); 580 581 void ClearHostMessage(napi_env env); 582 583 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 584 static void HandleDebuggerTask(const uv_async_t* req); 585 void DebuggerOnPostTask(std::function<void()>&& task); 586 #endif 587 588 #ifdef ENABLE_QOS 589 void SetQOSLevel(); 590 #endif 591 std::string script_ {}; 592 std::string fileName_ {}; 593 std::string name_ {}; 594 ScriptMode scriptMode_ {CLASSIC}; 595 WorkerType workerType_ {WorkerType::THREAD_WORKER}; 596 bool isLimitedWorker_ {false}; 597 bool isRelativePath_ {false}; 598 int32_t scopeId_ {-1}; 599 600 MessageQueue workerMessageQueue_ {}; 601 MessageQueue hostMessageQueue_ {}; 602 std::mutex globalCallMutex_; 603 MarkedMessageQueue hostGlobalCallQueue_ {}; 604 MessageQueue workerGlobalCallQueue_ {}; 605 MessageQueue errorQueue_ {}; 606 MessageQueue exceptionQueue_ {}; 607 608 uv_async_t* workerOnMessageSignal_ = nullptr; 609 uv_async_t* workerOnTerminateSignal_ = nullptr; 610 uv_async_t* hostOnMessageSignal_ = nullptr; 611 uv_async_t* hostOnErrorSignal_ = nullptr; 612 uv_async_t* hostOnAllErrorsSignal_ = nullptr; 613 uv_async_t* hostOnGlobalCallSignal_ = nullptr; 614 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 615 uv_async_t* debuggerOnPostTaskSignal_ = nullptr; 616 std::mutex debuggerMutex_; 617 std::queue<DebuggerPostTask> debuggerQueue_ {}; 618 #endif 619 620 std::atomic<RunnerState> runnerState_ {STARTING}; 621 std::atomic<HostState> hostState_ {ACTIVE}; 622 std::unique_ptr<WorkerRunner> runner_ {}; 623 624 std::atomic<bool> isErrorExit_ = false; 625 626 napi_env hostEnv_ {nullptr}; 627 napi_env workerEnv_ {nullptr}; 628 629 napi_ref workerRef_ {nullptr}; 630 napi_ref workerPort_ {nullptr}; 631 632 std::map<std::string, std::list<WorkerListener*>> eventListeners_ {}; 633 std::map<std::string, std::list<WorkerListener*>> parentPortEventListeners_ {}; 634 std::unordered_map<std::string, napi_ref> globalCallObjects_ {}; 635 std::queue<std::pair<int32_t, napi_value>> globalCallErrors_ {}; 636 std::atomic<uint32_t> globalCallId_ = 1; // 0: reserved for error check 637 638 std::recursive_mutex liveStatusLock_ {}; 639 std::mutex workerOnmessageMutex_ {}; 640 641 std::condition_variable cv_; 642 std::atomic<bool> globalCallSuccess_ = true; 643 std::function<void(napi_env)> workerEnvCallback_; 644 645 bool isMainThreadWorker_ = true; 646 bool isNewVersion_ = true; 647 std::atomic<bool> isTerminated_ = false; 648 std::atomic<bool> isHostEnvExited_ = false; 649 650 std::shared_ptr<WorkerWrapper> workerWrapper_ = nullptr; 651 WorkerPriority workerPriority_ = WorkerPriority::INVALID; 652 std::function<void()> qosUpdatedCallback_; 653 654 friend class WorkersTest; 655 }; 656 657 class ContainerScope { 658 public: ContainerScope(NativeEngine * engine,uint32_t scopeId)659 ContainerScope(NativeEngine* engine, uint32_t scopeId) 660 : engine_(engine), scopeId_(scopeId), initialized_(false) 661 { 662 if (engine_ != nullptr) { 663 initialized_ = engine_->InitContainerScopeFunc(scopeId_); 664 } 665 } 666 ~ContainerScope()667 ~ContainerScope() 668 { 669 if (engine_ != nullptr && initialized_) { 670 engine_->FinishContainerScopeFunc(scopeId_); 671 } 672 } 673 IsInitialized()674 bool IsInitialized() const 675 { 676 return initialized_; 677 } 678 679 private: 680 NativeEngine* engine_ {nullptr}; 681 int32_t scopeId_ {-1}; 682 bool initialized_ {false}; 683 }; 684 } // namespace Commonlibrary::Concurrent::WorkerModule 685 #endif // JS_CONCURRENT_MODULE_WORKER_WORKER_H 686