• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "histogram-inl.h"
4 #include "memory_tracker-inl.h"
5 #include "node_errors.h"
6 #include "node_external_reference.h"
7 #include "node_buffer.h"
8 #include "node_options-inl.h"
9 #include "node_perf.h"
10 #include "node_snapshot_builder.h"
11 #include "util-inl.h"
12 #include "async_wrap-inl.h"
13 
14 #include <memory>
15 #include <string>
16 #include <vector>
17 
18 using node::kAllowedInEnvvar;
19 using node::kDisallowedInEnvvar;
20 using v8::Array;
21 using v8::ArrayBuffer;
22 using v8::Boolean;
23 using v8::Context;
24 using v8::Float64Array;
25 using v8::FunctionCallbackInfo;
26 using v8::FunctionTemplate;
27 using v8::HandleScope;
28 using v8::Integer;
29 using v8::Isolate;
30 using v8::Local;
31 using v8::Locker;
32 using v8::Maybe;
33 using v8::MaybeLocal;
34 using v8::Null;
35 using v8::Number;
36 using v8::Object;
37 using v8::ResourceConstraints;
38 using v8::SealHandleScope;
39 using v8::String;
40 using v8::TryCatch;
41 using v8::Value;
42 
43 namespace node {
44 namespace worker {
45 
// Number of bytes in one mebibyte. Kept as a double so that conversions
// between megabyte-valued resource limits and byte counts are carried out in
// floating point.
constexpr double kMB = 1024.0 * 1024.0;
47 
Worker(Environment * env,Local<Object> wrap,const std::string & url,const std::string & name,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars,const SnapshotData * snapshot_data)48 Worker::Worker(Environment* env,
49                Local<Object> wrap,
50                const std::string& url,
51                const std::string& name,
52                std::shared_ptr<PerIsolateOptions> per_isolate_opts,
53                std::vector<std::string>&& exec_argv,
54                std::shared_ptr<KVStore> env_vars,
55                const SnapshotData* snapshot_data)
56     : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
57       per_isolate_opts_(per_isolate_opts),
58       exec_argv_(exec_argv),
59       platform_(env->isolate_data()->platform()),
60       thread_id_(AllocateEnvironmentThreadId()),
61       name_(name),
62       env_vars_(env_vars),
63       snapshot_data_(snapshot_data) {
64   Debug(this, "Creating new worker instance with thread id %llu",
65         thread_id_.id);
66 
67   // Set up everything that needs to be set up in the parent environment.
68   MessagePort* parent_port = MessagePort::New(env, env->context());
69   if (parent_port == nullptr) {
70     // This can happen e.g. because execution is terminating.
71     return;
72   }
73 
74   child_port_data_ = std::make_unique<MessagePortData>(nullptr);
75   MessagePort::Entangle(parent_port, child_port_data_.get());
76 
77   object()
78       ->Set(env->context(), env->message_port_string(), parent_port->object())
79       .Check();
80 
81   object()->Set(env->context(),
82                 env->thread_id_string(),
83                 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
84       .Check();
85 
86   inspector_parent_handle_ =
87       GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str());
88 
89   argv_ = std::vector<std::string>{env->argv()[0]};
90   // Mark this Worker object as weak until we actually start the thread.
91   MakeWeak();
92 
93   Debug(this, "Preparation for worker %llu finished", thread_id_.id);
94 }
95 
is_stopped() const96 bool Worker::is_stopped() const {
97   Mutex::ScopedLock lock(mutex_);
98   if (env_ != nullptr)
99     return env_->is_stopping();
100   return stopped_;
101 }
102 
UpdateResourceConstraints(ResourceConstraints * constraints)103 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
104   constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
105 
106   if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
107     constraints->set_max_young_generation_size_in_bytes(
108         static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
109   } else {
110     resource_limits_[kMaxYoungGenerationSizeMb] =
111         constraints->max_young_generation_size_in_bytes() / kMB;
112   }
113 
114   if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
115     constraints->set_max_old_generation_size_in_bytes(
116         static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
117   } else {
118     resource_limits_[kMaxOldGenerationSizeMb] =
119         constraints->max_old_generation_size_in_bytes() / kMB;
120   }
121 
122   if (resource_limits_[kCodeRangeSizeMb] > 0) {
123     constraints->set_code_range_size_in_bytes(
124         static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
125   } else {
126     resource_limits_[kCodeRangeSizeMb] =
127         constraints->code_range_size_in_bytes() / kMB;
128   }
129 }
130 
131 // This class contains data that is only relevant to the child thread itself,
132 // and only while it is running.
133 // (Eventually, the Environment instance should probably also be moved here.)
134 class WorkerThreadData {
135  public:
WorkerThreadData(Worker * w)136   explicit WorkerThreadData(Worker* w)
137     : w_(w) {
138     int ret = uv_loop_init(&loop_);
139     if (ret != 0) {
140       char err_buf[128];
141       uv_err_name_r(ret, err_buf, sizeof(err_buf));
142       w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
143       return;
144     }
145     loop_init_failed_ = false;
146     uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
147 
148     std::shared_ptr<ArrayBufferAllocator> allocator =
149         ArrayBufferAllocator::Create();
150     Isolate::CreateParams params;
151     SetIsolateCreateParamsForNode(&params);
152     w->UpdateResourceConstraints(&params.constraints);
153     params.array_buffer_allocator_shared = allocator;
154     Isolate* isolate =
155         NewIsolate(&params, &loop_, w->platform_, w->snapshot_data());
156     if (isolate == nullptr) {
157       w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
158       return;
159     }
160 
161     SetIsolateUpForNode(isolate);
162 
163     // Be sure it's called before Environment::InitializeDiagnostics()
164     // so that this callback stays when the callback of
165     // --heapsnapshot-near-heap-limit gets is popped.
166     isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
167 
168     {
169       Locker locker(isolate);
170       Isolate::Scope isolate_scope(isolate);
171       // V8 computes its stack limit the first time a `Locker` is used based on
172       // --stack-size. Reset it to the correct value.
173       isolate->SetStackLimit(w->stack_base_);
174 
175       HandleScope handle_scope(isolate);
176       isolate_data_.reset(CreateIsolateData(isolate,
177                                             &loop_,
178                                             w_->platform_,
179                                             allocator.get()));
180       CHECK(isolate_data_);
181       if (w_->per_isolate_opts_)
182         isolate_data_->set_options(std::move(w_->per_isolate_opts_));
183       isolate_data_->set_worker_context(w_);
184       isolate_data_->max_young_gen_size =
185           params.constraints.max_young_generation_size_in_bytes();
186     }
187 
188     Mutex::ScopedLock lock(w_->mutex_);
189     w_->isolate_ = isolate;
190   }
191 
~WorkerThreadData()192   ~WorkerThreadData() {
193     Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
194     Isolate* isolate;
195     {
196       Mutex::ScopedLock lock(w_->mutex_);
197       isolate = w_->isolate_;
198       w_->isolate_ = nullptr;
199     }
200 
201     if (isolate != nullptr) {
202       CHECK(!loop_init_failed_);
203       bool platform_finished = false;
204 
205       isolate_data_.reset();
206 
207       w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
208         *static_cast<bool*>(data) = true;
209       }, &platform_finished);
210 
211       // The order of these calls is important; if the Isolate is first disposed
212       // and then unregistered, there is a race condition window in which no
213       // new Isolate at the same address can successfully be registered with
214       // the platform.
215       // (Refs: https://github.com/nodejs/node/issues/30846)
216       w_->platform_->UnregisterIsolate(isolate);
217       isolate->Dispose();
218 
219       // Wait until the platform has cleaned up all relevant resources.
220       while (!platform_finished) {
221         uv_run(&loop_, UV_RUN_ONCE);
222       }
223     }
224     if (!loop_init_failed_) {
225       CheckedUvLoopClose(&loop_);
226     }
227   }
228 
loop_is_usable() const229   bool loop_is_usable() const { return !loop_init_failed_; }
230 
231  private:
232   Worker* const w_;
233   uv_loop_t loop_;
234   bool loop_init_failed_ = true;
235   DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
236   const SnapshotData* snapshot_data_ = nullptr;
237   friend class Worker;
238 };
239 
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)240 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
241                              size_t initial_heap_limit) {
242   Worker* worker = static_cast<Worker*>(data);
243   // Give the current GC some extra leeway to let it finish rather than
244   // crash hard. We are not going to perform further allocations anyway.
245   constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
246   size_t new_limit = current_heap_limit + kExtraHeapAllowance;
247   Environment* env = worker->env();
248   if (env != nullptr) {
249     DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
250     Debug(env,
251           DebugCategory::DIAGNOSTICS,
252           "Throwing ERR_WORKER_OUT_OF_MEMORY, "
253           "new_limit=%" PRIu64 "\n",
254           static_cast<uint64_t>(new_limit));
255   }
256   worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
257   return new_limit;
258 }
259 
Run()260 void Worker::Run() {
261   std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" +
262                            (name_ == "" ? "" : " " + name_);
263   TRACE_EVENT_METADATA1(
264       "__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str()));
265   CHECK_NOT_NULL(platform_);
266 
267   Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
268 
269   WorkerThreadData data(this);
270   if (isolate_ == nullptr) return;
271   CHECK(data.loop_is_usable());
272 
273   Debug(this, "Starting worker with id %llu", thread_id_.id);
274   {
275     Locker locker(isolate_);
276     Isolate::Scope isolate_scope(isolate_);
277     SealHandleScope outer_seal(isolate_);
278 
279     DeleteFnPtr<Environment, FreeEnvironment> env_;
280     auto cleanup_env = OnScopeLeave([&]() {
281       // TODO(addaleax): This call is harmless but should not be necessary.
282       // Figure out why V8 is raising a DCHECK() here without it
283       // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
284       isolate_->CancelTerminateExecution();
285 
286       if (!env_) return;
287       env_->set_can_call_into_js(false);
288 
289       {
290         Mutex::ScopedLock lock(mutex_);
291         stopped_ = true;
292         this->env_ = nullptr;
293       }
294 
295       env_.reset();
296     });
297 
298     if (is_stopped()) return;
299     {
300       HandleScope handle_scope(isolate_);
301       Local<Context> context;
302       {
303         // We create the Context object before we have an Environment* in place
304         // that we could use for error handling. If creation fails due to
305         // resource constraints, we need something in place to handle it,
306         // though.
307         TryCatch try_catch(isolate_);
308         if (snapshot_data_ != nullptr) {
309           context = Context::FromSnapshot(isolate_,
310                                           SnapshotData::kNodeBaseContextIndex)
311                         .ToLocalChecked();
312           if (!context.IsEmpty() &&
313               !InitializeContextRuntime(context).IsJust()) {
314             context = Local<Context>();
315           }
316         } else {
317           context = NewContext(isolate_);
318         }
319         if (context.IsEmpty()) {
320           Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
321           return;
322         }
323       }
324 
325       if (is_stopped()) return;
326       CHECK(!context.IsEmpty());
327       Context::Scope context_scope(context);
328       {
329         env_.reset(CreateEnvironment(
330             data.isolate_data_.get(),
331             context,
332             std::move(argv_),
333             std::move(exec_argv_),
334             static_cast<EnvironmentFlags::Flags>(environment_flags_),
335             thread_id_,
336             std::move(inspector_parent_handle_)));
337         if (is_stopped()) return;
338         CHECK_NOT_NULL(env_);
339         env_->set_env_vars(std::move(env_vars_));
340         SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
341           Exit(exit_code);
342         });
343       }
344       {
345         Mutex::ScopedLock lock(mutex_);
346         if (stopped_) return;
347         this->env_ = env_.get();
348       }
349       Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
350       if (is_stopped()) return;
351       {
352         if (!CreateEnvMessagePort(env_.get())) {
353           return;
354         }
355 
356         Debug(this, "Created message port for worker %llu", thread_id_.id);
357         if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
358           return;
359 
360         Debug(this, "Loaded environment for worker %llu", thread_id_.id);
361       }
362     }
363 
364     {
365       Maybe<int> exit_code = SpinEventLoop(env_.get());
366       Mutex::ScopedLock lock(mutex_);
367       if (exit_code_ == 0 && exit_code.IsJust()) {
368         exit_code_ = exit_code.FromJust();
369       }
370 
371       Debug(this, "Exiting thread for worker %llu with exit code %d",
372             thread_id_.id, exit_code_);
373     }
374   }
375 
376   Debug(this, "Worker %llu thread stops", thread_id_.id);
377 }
378 
CreateEnvMessagePort(Environment * env)379 bool Worker::CreateEnvMessagePort(Environment* env) {
380   HandleScope handle_scope(isolate_);
381   std::unique_ptr<MessagePortData> data;
382   {
383     Mutex::ScopedLock lock(mutex_);
384     data = std::move(child_port_data_);
385   }
386 
387   // Set up the message channel for receiving messages in the child.
388   MessagePort* child_port = MessagePort::New(env,
389                                              env->context(),
390                                              std::move(data));
391   // MessagePort::New() may return nullptr if execution is terminated
392   // within it.
393   if (child_port != nullptr)
394     env->set_message_port(child_port->object(isolate_));
395 
396   return child_port;
397 }
398 
JoinThread()399 void Worker::JoinThread() {
400   if (!tid_.has_value())
401     return;
402   CHECK_EQ(uv_thread_join(&tid_.value()), 0);
403   tid_.reset();
404 
405   env()->remove_sub_worker_context(this);
406 
407   {
408     HandleScope handle_scope(env()->isolate());
409     Context::Scope context_scope(env()->context());
410 
411     // Reset the parent port as we're closing it now anyway.
412     object()->Set(env()->context(),
413                   env()->message_port_string(),
414                   Undefined(env()->isolate())).Check();
415 
416     Local<Value> args[] = {
417         Integer::New(env()->isolate(), exit_code_),
418         custom_error_ != nullptr
419             ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420             : Null(env()->isolate()).As<Value>(),
421         !custom_error_str_.empty()
422             ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423                   .As<Value>()
424             : Null(env()->isolate()).As<Value>(),
425     };
426 
427     MakeCallback(env()->onexit_string(), arraysize(args), args);
428   }
429 
430   // If we get here, the tid_.has_value() condition at the top of the function
431   // implies that the thread was running. In that case, its final action will
432   // be to schedule a callback on the parent thread which will delete this
433   // object, so there's nothing more to do here.
434 }
435 
~Worker()436 Worker::~Worker() {
437   Mutex::ScopedLock lock(mutex_);
438 
439   CHECK(stopped_);
440   CHECK_NULL(env_);
441   CHECK(!tid_.has_value());
442 
443   Debug(this, "Worker %llu destroyed", thread_id_.id);
444 }
445 
New(const FunctionCallbackInfo<Value> & args)446 void Worker::New(const FunctionCallbackInfo<Value>& args) {
447   Environment* env = Environment::GetCurrent(args);
448   auto is_internal = args[5];
449   CHECK(is_internal->IsBoolean());
450   Isolate* isolate = args.GetIsolate();
451 
452   CHECK(args.IsConstructCall());
453 
454   if (env->isolate_data()->platform() == nullptr) {
455     THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
456     return;
457   }
458 
459   std::string url;
460   std::string name;
461   std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
462   std::shared_ptr<KVStore> env_vars = nullptr;
463 
464   std::vector<std::string> exec_argv_out;
465 
466   // Argument might be a string or URL
467   if (!args[0]->IsNullOrUndefined()) {
468     Utf8Value value(
469         isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
470     url.append(value.out(), value.length());
471   }
472 
473   if (!args[6]->IsNullOrUndefined()) {
474     Utf8Value value(
475         isolate, args[6]->ToString(env->context()).FromMaybe(Local<String>()));
476     name.append(value.out(), value.length());
477   }
478 
479   if (args[1]->IsNull()) {
480     // Means worker.env = { ...process.env }.
481     env_vars = env->env_vars()->Clone(isolate);
482   } else if (args[1]->IsObject()) {
483     // User provided env.
484     env_vars = KVStore::CreateMapKVStore();
485     env_vars->AssignFromObject(isolate->GetCurrentContext(),
486                                args[1].As<Object>());
487   } else {
488     // Env is shared.
489     env_vars = env->env_vars();
490   }
491 
492   if (args[1]->IsObject() || args[2]->IsArray()) {
493     per_isolate_opts.reset(new PerIsolateOptions());
494 
495     HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
496       return env_vars->Get(name).FromMaybe("");
497     });
498 
499 #ifndef NODE_WITHOUT_NODE_OPTIONS
500     MaybeLocal<String> maybe_node_opts =
501         env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
502     Local<String> node_opts;
503     if (maybe_node_opts.ToLocal(&node_opts)) {
504       std::string node_options(*String::Utf8Value(isolate, node_opts));
505       std::vector<std::string> errors{};
506       std::vector<std::string> env_argv =
507           ParseNodeOptionsEnvVar(node_options, &errors);
508       // [0] is expected to be the program name, add dummy string.
509       env_argv.insert(env_argv.begin(), "");
510       std::vector<std::string> invalid_args{};
511       options_parser::Parse(&env_argv,
512                             nullptr,
513                             &invalid_args,
514                             per_isolate_opts.get(),
515                             kAllowedInEnvvar,
516                             &errors);
517       if (!errors.empty() && args[1]->IsObject()) {
518         // Only fail for explicitly provided env, this protects from failures
519         // when NODE_OPTIONS from parent's env is used (which is the default).
520         Local<Value> error;
521         if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
522         Local<String> key =
523             FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
524         // Ignore the return value of Set() because exceptions bubble up to JS
525         // when we return anyway.
526         USE(args.This()->Set(env->context(), key, error));
527         return;
528       }
529     }
530 #endif  // NODE_WITHOUT_NODE_OPTIONS
531   }
532 
533   if (args[2]->IsArray()) {
534     Local<Array> array = args[2].As<Array>();
535     // The first argument is reserved for program name, but we don't need it
536     // in workers.
537     std::vector<std::string> exec_argv = {""};
538     uint32_t length = array->Length();
539     for (uint32_t i = 0; i < length; i++) {
540       Local<Value> arg;
541       if (!array->Get(env->context(), i).ToLocal(&arg)) {
542         return;
543       }
544       Local<String> arg_v8;
545       if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
546         return;
547       }
548       Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
549       std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
550       exec_argv.push_back(arg_string);
551     }
552 
553     std::vector<std::string> invalid_args{};
554     std::vector<std::string> errors{};
555     // Using invalid_args as the v8_args argument as it stores unknown
556     // options for the per isolate parser.
557     options_parser::Parse(&exec_argv,
558                           &exec_argv_out,
559                           &invalid_args,
560                           per_isolate_opts.get(),
561                           kDisallowedInEnvvar,
562                           &errors);
563 
564     // The first argument is program name.
565     invalid_args.erase(invalid_args.begin());
566     if (errors.size() > 0 || invalid_args.size() > 0) {
567       Local<Value> error;
568       if (!ToV8Value(env->context(),
569                      errors.size() > 0 ? errors : invalid_args)
570                          .ToLocal(&error)) {
571         return;
572       }
573       Local<String> key =
574           FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
575       // Ignore the return value of Set() because exceptions bubble up to JS
576       // when we return anyway.
577       USE(args.This()->Set(env->context(), key, error));
578       return;
579     }
580   } else {
581     exec_argv_out = env->exec_argv();
582   }
583 
584   bool use_node_snapshot = per_process::cli_options->node_snapshot;
585   const SnapshotData* snapshot_data =
586       use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
587 
588   Worker* worker = new Worker(env,
589                               args.This(),
590                               url,
591                               name,
592                               per_isolate_opts,
593                               std::move(exec_argv_out),
594                               env_vars,
595                               snapshot_data);
596 
597   CHECK(args[3]->IsFloat64Array());
598   Local<Float64Array> limit_info = args[3].As<Float64Array>();
599   CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
600   limit_info->CopyContents(worker->resource_limits_,
601                            sizeof(worker->resource_limits_));
602 
603   CHECK(args[4]->IsBoolean());
604   if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
605     worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
606   if (env->hide_console_windows())
607     worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
608   if (env->no_native_addons())
609     worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
610   if (env->no_global_search_paths())
611     worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
612   if (env->no_browser_globals())
613     worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
614 }
615 
StartThread(const FunctionCallbackInfo<Value> & args)616 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
617   Worker* w;
618   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
619   Mutex::ScopedLock lock(w->mutex_);
620 
621   w->stopped_ = false;
622 
623   if (w->resource_limits_[kStackSizeMb] > 0) {
624     if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
625       w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
626       w->stack_size_ = kStackBufferSize;
627     } else {
628       w->stack_size_ =
629           static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
630     }
631   } else {
632     w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
633   }
634 
635   uv_thread_options_t thread_options;
636   thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
637   thread_options.stack_size = w->stack_size_;
638 
639   uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
640   int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
641     // XXX: This could become a std::unique_ptr, but that makes at least
642     // gcc 6.3 detect undefined behaviour when there shouldn't be any.
643     // gcc 7+ handles this well.
644     Worker* w = static_cast<Worker*>(arg);
645     const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
646 
647     // Leave a few kilobytes just to make sure we're within limits and have
648     // some space to do work in C++ land.
649     w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
650 
651     w->Run();
652 
653     Mutex::ScopedLock lock(w->mutex_);
654     w->env()->SetImmediateThreadsafe(
655         [w = std::unique_ptr<Worker>(w)](Environment* env) {
656           if (w->has_ref_)
657             env->add_refs(-1);
658           w->JoinThread();
659           // implicitly delete w
660         });
661   }, static_cast<void*>(w));
662 
663   if (ret == 0) {
664     // The object now owns the created thread and should not be garbage
665     // collected until that finishes.
666     w->ClearWeak();
667 
668     if (w->has_ref_)
669       w->env()->add_refs(1);
670 
671     w->env()->add_sub_worker_context(w);
672   } else {
673     w->stopped_ = true;
674     w->tid_.reset();
675 
676     char err_buf[128];
677     uv_err_name_r(ret, err_buf, sizeof(err_buf));
678     {
679       Isolate* isolate = w->env()->isolate();
680       HandleScope handle_scope(isolate);
681       THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
682     }
683   }
684 }
685 
StopThread(const FunctionCallbackInfo<Value> & args)686 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
687   Worker* w;
688   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
689 
690   Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
691   w->Exit(1);
692 }
693 
Ref(const FunctionCallbackInfo<Value> & args)694 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
695   Worker* w;
696   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
697   if (!w->has_ref_ && w->tid_.has_value()) {
698     w->has_ref_ = true;
699     w->env()->add_refs(1);
700   }
701 }
702 
HasRef(const FunctionCallbackInfo<Value> & args)703 void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
704   Worker* w;
705   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
706   args.GetReturnValue().Set(w->has_ref_);
707 }
708 
Unref(const FunctionCallbackInfo<Value> & args)709 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
710   Worker* w;
711   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
712   if (w->has_ref_ && w->tid_.has_value()) {
713     w->has_ref_ = false;
714     w->env()->add_refs(-1);
715   }
716 }
717 
GetResourceLimits(const FunctionCallbackInfo<Value> & args)718 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
719   Worker* w;
720   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
721   args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
722 }
723 
GetResourceLimits(Isolate * isolate) const724 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
725   Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
726 
727   memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
728   return Float64Array::New(ab, 0, kTotalResourceLimitCount);
729 }
730 
Exit(int code,const char * error_code,const char * error_message)731 void Worker::Exit(int code, const char* error_code, const char* error_message) {
732   Mutex::ScopedLock lock(mutex_);
733   Debug(this, "Worker %llu called Exit(%d, %s, %s)",
734         thread_id_.id, code, error_code, error_message);
735 
736   if (error_code != nullptr) {
737     custom_error_ = error_code;
738     custom_error_str_ = error_message;
739   }
740 
741   if (env_ != nullptr) {
742     exit_code_ = code;
743     Stop(env_);
744   } else {
745     stopped_ = true;
746   }
747 }
748 
IsNotIndicativeOfMemoryLeakAtExit() const749 bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
750   // Worker objects always stay alive as long as the child thread, regardless
751   // of whether they are being referenced in the parent thread.
752   return true;
753 }
754 
755 class WorkerHeapSnapshotTaker : public AsyncWrap {
756  public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)757   WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
758     : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
759 
760   SET_NO_MEMORY_INFO()
761   SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
762   SET_SELF_SIZE(WorkerHeapSnapshotTaker)
763 };
764 
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)765 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
766   Worker* w;
767   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
768 
769   Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
770 
771   Environment* env = w->env();
772   AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
773   Local<Object> wrap;
774   if (!env->worker_heap_snapshot_taker_template()
775       ->NewInstance(env->context()).ToLocal(&wrap)) {
776     return;
777   }
778 
779   // The created WorkerHeapSnapshotTaker is an object owned by main
780   // thread's Isolate, it can not be accessed by worker thread
781   std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
782       std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
783           MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
784 
785   // Interrupt the worker thread and take a snapshot, then schedule a call
786   // on the parent thread that turns that snapshot into a readable stream.
787   bool scheduled = w->RequestInterrupt([taker = std::move(taker),
788                                         env](Environment* worker_env) mutable {
789     heap::HeapSnapshotPointer snapshot{
790         worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()};
791     CHECK(snapshot);
792 
793     // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
794     // object.
795 
796     env->SetImmediateThreadsafe(
797         [taker = std::move(taker),
798          snapshot = std::move(snapshot)](Environment* env) mutable {
799           HandleScope handle_scope(env->isolate());
800           Context::Scope context_scope(env->context());
801 
802           AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
803           BaseObjectPtr<AsyncWrap> stream =
804               heap::CreateHeapSnapshotStream(env, std::move(snapshot));
805           Local<Value> args[] = {stream->object()};
806           taker->get()->MakeCallback(
807               env->ondone_string(), arraysize(args), args);
808           // implicitly delete `taker`
809         },
810         CallbackFlags::kUnrefed);
811 
812     // Now, the lambda is delivered to the main thread, as a result, the
813     // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
814   });
815 
816   if (scheduled) {
817     args.GetReturnValue().Set(wrap);
818   } else {
819     args.GetReturnValue().Set(Local<Object>());
820   }
821 }
822 
LoopIdleTime(const FunctionCallbackInfo<Value> & args)823 void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
824   Worker* w;
825   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
826 
827   Mutex::ScopedLock lock(w->mutex_);
828   // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
829   // before locking the mutex is a race condition. So manually do the same
830   // check.
831   if (w->stopped_ || w->env_ == nullptr)
832     return args.GetReturnValue().Set(-1);
833 
834   uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
835   args.GetReturnValue().Set(1.0 * idle_time / 1e6);
836 }
837 
LoopStartTime(const FunctionCallbackInfo<Value> & args)838 void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
839   Worker* w;
840   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
841 
842   Mutex::ScopedLock lock(w->mutex_);
843   // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
844   // before locking the mutex is a race condition. So manually do the same
845   // check.
846   if (w->stopped_ || w->env_ == nullptr)
847     return args.GetReturnValue().Set(-1);
848 
849   double loop_start_time = w->env_->performance_state()->milestones[
850       node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
851   CHECK_GE(loop_start_time, 0);
852   args.GetReturnValue().Set(
853       (loop_start_time - node::performance::timeOrigin) / 1e6);
854 }
855 
856 namespace {
857 
858 // Return the MessagePort that is global for this Environment and communicates
859 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)860 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
861   Environment* env = Environment::GetCurrent(args);
862   Local<Object> port = env->message_port();
863   CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
864   if (!port.IsEmpty()) {
865     CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
866              args.GetIsolate());
867     args.GetReturnValue().Set(port);
868   }
869 }
870 
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)871 void InitWorker(Local<Object> target,
872                 Local<Value> unused,
873                 Local<Context> context,
874                 void* priv) {
875   Environment* env = Environment::GetCurrent(context);
876   Isolate* isolate = env->isolate();
877 
878   {
879     Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
880 
881     w->InstanceTemplate()->SetInternalFieldCount(
882         Worker::kInternalFieldCount);
883     w->Inherit(AsyncWrap::GetConstructorTemplate(env));
884 
885     SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
886     SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
887     SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
888     SetProtoMethod(isolate, w, "ref", Worker::Ref);
889     SetProtoMethod(isolate, w, "unref", Worker::Unref);
890     SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
891     SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
892     SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
893     SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
894 
895     SetConstructorFunction(context, target, "Worker", w);
896   }
897 
898   {
899     Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
900 
901     wst->InstanceTemplate()->SetInternalFieldCount(
902         WorkerHeapSnapshotTaker::kInternalFieldCount);
903     wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
904 
905     Local<String> wst_string =
906         FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
907     wst->SetClassName(wst_string);
908     env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
909   }
910 
911   SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
912 
913   target
914       ->Set(env->context(),
915             env->thread_id_string(),
916             Number::New(isolate, static_cast<double>(env->thread_id())))
917       .Check();
918 
919   target
920       ->Set(env->context(),
921             FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
922             Boolean::New(isolate, env->is_main_thread()))
923       .Check();
924 
925   target
926       ->Set(env->context(),
927             FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
928             Boolean::New(isolate, env->owns_process_state()))
929       .Check();
930 
931   if (!env->is_main_thread()) {
932     target
933         ->Set(env->context(),
934               FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
935               env->worker_context()->GetResourceLimits(isolate))
936         .Check();
937   }
938 
939   NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
940   NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
941   NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
942   NODE_DEFINE_CONSTANT(target, kStackSizeMb);
943   NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
944 }
945 
RegisterExternalReferences(ExternalReferenceRegistry * registry)946 void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
947   registry->Register(GetEnvMessagePort);
948   registry->Register(Worker::New);
949   registry->Register(Worker::StartThread);
950   registry->Register(Worker::StopThread);
951   registry->Register(Worker::HasRef);
952   registry->Register(Worker::Ref);
953   registry->Register(Worker::Unref);
954   registry->Register(Worker::GetResourceLimits);
955   registry->Register(Worker::TakeHeapSnapshot);
956   registry->Register(Worker::LoopIdleTime);
957   registry->Register(Worker::LoopStartTime);
958 }
959 
960 }  // anonymous namespace
961 }  // namespace worker
962 }  // namespace node
963 
964 NODE_BINDING_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
965 NODE_BINDING_EXTERNAL_REFERENCE(worker,
966                                 node::worker::RegisterExternalReferences)
967