• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "histogram-inl.h"
4 #include "memory_tracker-inl.h"
5 #include "node_errors.h"
6 #include "node_external_reference.h"
7 #include "node_buffer.h"
8 #include "node_options-inl.h"
9 #include "node_perf.h"
10 #include "node_snapshot_builder.h"
11 #include "util-inl.h"
12 #include "async_wrap-inl.h"
13 
14 #include <memory>
15 #include <string>
16 #include <vector>
17 
18 using node::kAllowedInEnvvar;
19 using node::kDisallowedInEnvvar;
20 using v8::Array;
21 using v8::ArrayBuffer;
22 using v8::Boolean;
23 using v8::Context;
24 using v8::Float64Array;
25 using v8::FunctionCallbackInfo;
26 using v8::FunctionTemplate;
27 using v8::HandleScope;
28 using v8::Integer;
29 using v8::Isolate;
30 using v8::Local;
31 using v8::Locker;
32 using v8::Maybe;
33 using v8::MaybeLocal;
34 using v8::Null;
35 using v8::Number;
36 using v8::Object;
37 using v8::ResourceConstraints;
38 using v8::SealHandleScope;
39 using v8::String;
40 using v8::TryCatch;
41 using v8::Value;
42 
43 namespace node {
44 namespace worker {
45 
46 constexpr double kMB = 1024 * 1024;
47 
Worker(Environment * env,Local<Object> wrap,const std::string & url,const std::string & name,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars,const SnapshotData * snapshot_data)48 Worker::Worker(Environment* env,
49                Local<Object> wrap,
50                const std::string& url,
51                const std::string& name,
52                std::shared_ptr<PerIsolateOptions> per_isolate_opts,
53                std::vector<std::string>&& exec_argv,
54                std::shared_ptr<KVStore> env_vars,
55                const SnapshotData* snapshot_data)
56     : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
57       per_isolate_opts_(per_isolate_opts),
58       exec_argv_(exec_argv),
59       platform_(env->isolate_data()->platform()),
60       thread_id_(AllocateEnvironmentThreadId()),
61       name_(name),
62       env_vars_(env_vars),
63       snapshot_data_(snapshot_data) {
64   Debug(this, "Creating new worker instance with thread id %llu",
65         thread_id_.id);
66 
67   // Set up everything that needs to be set up in the parent environment.
68   MessagePort* parent_port = MessagePort::New(env, env->context());
69   if (parent_port == nullptr) {
70     // This can happen e.g. because execution is terminating.
71     return;
72   }
73 
74   child_port_data_ = std::make_unique<MessagePortData>(nullptr);
75   MessagePort::Entangle(parent_port, child_port_data_.get());
76 
77   object()
78       ->Set(env->context(), env->message_port_string(), parent_port->object())
79       .Check();
80 
81   object()->Set(env->context(),
82                 env->thread_id_string(),
83                 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
84       .Check();
85 
86   inspector_parent_handle_ =
87       GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str());
88 
89   argv_ = std::vector<std::string>{env->argv()[0]};
90   // Mark this Worker object as weak until we actually start the thread.
91   MakeWeak();
92 
93   Debug(this, "Preparation for worker %llu finished", thread_id_.id);
94 }
95 
is_stopped() const96 bool Worker::is_stopped() const {
97   Mutex::ScopedLock lock(mutex_);
98   if (env_ != nullptr)
99     return env_->is_stopping();
100   return stopped_;
101 }
102 
UpdateResourceConstraints(ResourceConstraints * constraints)103 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
104   constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
105 
106   if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
107     constraints->set_max_young_generation_size_in_bytes(
108         static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
109   } else {
110     resource_limits_[kMaxYoungGenerationSizeMb] =
111         constraints->max_young_generation_size_in_bytes() / kMB;
112   }
113 
114   if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
115     constraints->set_max_old_generation_size_in_bytes(
116         static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
117   } else {
118     resource_limits_[kMaxOldGenerationSizeMb] =
119         constraints->max_old_generation_size_in_bytes() / kMB;
120   }
121 
122   if (resource_limits_[kCodeRangeSizeMb] > 0) {
123     constraints->set_code_range_size_in_bytes(
124         static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
125   } else {
126     resource_limits_[kCodeRangeSizeMb] =
127         constraints->code_range_size_in_bytes() / kMB;
128   }
129 }
130 
131 // This class contains data that is only relevant to the child thread itself,
132 // and only while it is running.
133 // (Eventually, the Environment instance should probably also be moved here.)
134 class WorkerThreadData {
135  public:
WorkerThreadData(Worker * w)136   explicit WorkerThreadData(Worker* w)
137     : w_(w) {
138     int ret = uv_loop_init(&loop_);
139     if (ret != 0) {
140       char err_buf[128];
141       uv_err_name_r(ret, err_buf, sizeof(err_buf));
142       w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
143       return;
144     }
145     loop_init_failed_ = false;
146     uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
147 
148     std::shared_ptr<ArrayBufferAllocator> allocator =
149         ArrayBufferAllocator::Create();
150     Isolate::CreateParams params;
151     SetIsolateCreateParamsForNode(&params);
152     w->UpdateResourceConstraints(&params.constraints);
153     params.array_buffer_allocator_shared = allocator;
154     Isolate* isolate =
155         NewIsolate(&params, &loop_, w->platform_, w->snapshot_data());
156     if (isolate == nullptr) {
157       w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
158       return;
159     }
160 
161     SetIsolateUpForNode(isolate);
162 
163     // Be sure it's called before Environment::InitializeDiagnostics()
164     // so that this callback stays when the callback of
165     // --heapsnapshot-near-heap-limit gets is popped.
166     isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
167 
168     {
169       Locker locker(isolate);
170       Isolate::Scope isolate_scope(isolate);
171       // V8 computes its stack limit the first time a `Locker` is used based on
172       // --stack-size. Reset it to the correct value.
173       isolate->SetStackLimit(w->stack_base_);
174 
175       HandleScope handle_scope(isolate);
176       isolate_data_.reset(CreateIsolateData(isolate,
177                                             &loop_,
178                                             w_->platform_,
179                                             allocator.get()));
180       CHECK(isolate_data_);
181       if (w_->per_isolate_opts_)
182         isolate_data_->set_options(std::move(w_->per_isolate_opts_));
183       isolate_data_->set_worker_context(w_);
184       isolate_data_->max_young_gen_size =
185           params.constraints.max_young_generation_size_in_bytes();
186     }
187 
188     Mutex::ScopedLock lock(w_->mutex_);
189     w_->isolate_ = isolate;
190   }
191 
~WorkerThreadData()192   ~WorkerThreadData() {
193     Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
194     Isolate* isolate;
195     {
196       Mutex::ScopedLock lock(w_->mutex_);
197       isolate = w_->isolate_;
198       w_->isolate_ = nullptr;
199     }
200 
201     if (isolate != nullptr) {
202       CHECK(!loop_init_failed_);
203       bool platform_finished = false;
204 
205       isolate_data_.reset();
206 
207       w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
208         *static_cast<bool*>(data) = true;
209       }, &platform_finished);
210 
211       // The order of these calls is important; if the Isolate is first disposed
212       // and then unregistered, there is a race condition window in which no
213       // new Isolate at the same address can successfully be registered with
214       // the platform.
215       // (Refs: https://github.com/nodejs/node/issues/30846)
216       w_->platform_->UnregisterIsolate(isolate);
217       isolate->Dispose();
218 
219       // Wait until the platform has cleaned up all relevant resources.
220       while (!platform_finished) {
221         uv_run(&loop_, UV_RUN_ONCE);
222       }
223     }
224     if (!loop_init_failed_) {
225       CheckedUvLoopClose(&loop_);
226     }
227   }
228 
loop_is_usable() const229   bool loop_is_usable() const { return !loop_init_failed_; }
230 
231  private:
232   Worker* const w_;
233   uv_loop_t loop_;
234   bool loop_init_failed_ = true;
235   DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
236   const SnapshotData* snapshot_data_ = nullptr;
237   friend class Worker;
238 };
239 
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)240 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
241                              size_t initial_heap_limit) {
242   Worker* worker = static_cast<Worker*>(data);
243   // Give the current GC some extra leeway to let it finish rather than
244   // crash hard. We are not going to perform further allocations anyway.
245   constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
246   size_t new_limit = current_heap_limit + kExtraHeapAllowance;
247   Environment* env = worker->env();
248   if (env != nullptr) {
249     DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
250     Debug(env,
251           DebugCategory::DIAGNOSTICS,
252           "Throwing ERR_WORKER_OUT_OF_MEMORY, "
253           "new_limit=%" PRIu64 "\n",
254           static_cast<uint64_t>(new_limit));
255   }
256   worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
257   return new_limit;
258 }
259 
Run()260 void Worker::Run() {
261   std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" +
262                            (name_ == "" ? "" : " " + name_);
263   TRACE_EVENT_METADATA1(
264       "__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str()));
265   CHECK_NOT_NULL(platform_);
266 
267   Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
268 
269   WorkerThreadData data(this);
270   if (isolate_ == nullptr) return;
271   CHECK(data.loop_is_usable());
272 
273   Debug(this, "Starting worker with id %llu", thread_id_.id);
274   {
275     Locker locker(isolate_);
276     Isolate::Scope isolate_scope(isolate_);
277     SealHandleScope outer_seal(isolate_);
278 
279     DeleteFnPtr<Environment, FreeEnvironment> env_;
280     auto cleanup_env = OnScopeLeave([&]() {
281       // TODO(addaleax): This call is harmless but should not be necessary.
282       // Figure out why V8 is raising a DCHECK() here without it
283       // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
284       isolate_->CancelTerminateExecution();
285 
286       if (!env_) return;
287       env_->set_can_call_into_js(false);
288 
289       {
290         Mutex::ScopedLock lock(mutex_);
291         stopped_ = true;
292         this->env_ = nullptr;
293       }
294 
295       env_.reset();
296     });
297 
298     if (is_stopped()) return;
299     {
300       HandleScope handle_scope(isolate_);
301       Local<Context> context;
302       {
303         // We create the Context object before we have an Environment* in place
304         // that we could use for error handling. If creation fails due to
305         // resource constraints, we need something in place to handle it,
306         // though.
307         TryCatch try_catch(isolate_);
308         if (snapshot_data_ != nullptr) {
309           context = Context::FromSnapshot(isolate_,
310                                           SnapshotData::kNodeBaseContextIndex)
311                         .ToLocalChecked();
312           if (!context.IsEmpty() &&
313               !InitializeContextRuntime(context).IsJust()) {
314             context = Local<Context>();
315           }
316         } else {
317           context = NewContext(isolate_);
318         }
319         if (context.IsEmpty()) {
320           Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
321           return;
322         }
323       }
324 
325       if (is_stopped()) return;
326       CHECK(!context.IsEmpty());
327       Context::Scope context_scope(context);
328       {
329         env_.reset(CreateEnvironment(
330             data.isolate_data_.get(),
331             context,
332             std::move(argv_),
333             std::move(exec_argv_),
334             static_cast<EnvironmentFlags::Flags>(environment_flags_),
335             thread_id_,
336             std::move(inspector_parent_handle_)));
337         if (is_stopped()) return;
338         CHECK_NOT_NULL(env_);
339         env_->set_env_vars(std::move(env_vars_));
340         SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
341           Exit(exit_code);
342         });
343       }
344       {
345         Mutex::ScopedLock lock(mutex_);
346         if (stopped_) return;
347         this->env_ = env_.get();
348       }
349       Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
350       if (is_stopped()) return;
351       {
352         if (!CreateEnvMessagePort(env_.get())) {
353           return;
354         }
355 
356         Debug(this, "Created message port for worker %llu", thread_id_.id);
357         if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
358           return;
359 
360         Debug(this, "Loaded environment for worker %llu", thread_id_.id);
361       }
362     }
363 
364     {
365       Maybe<int> exit_code = SpinEventLoop(env_.get());
366       Mutex::ScopedLock lock(mutex_);
367       if (exit_code_ == 0 && exit_code.IsJust()) {
368         exit_code_ = exit_code.FromJust();
369       }
370 
371       Debug(this, "Exiting thread for worker %llu with exit code %d",
372             thread_id_.id, exit_code_);
373     }
374   }
375 
376   Debug(this, "Worker %llu thread stops", thread_id_.id);
377 }
378 
CreateEnvMessagePort(Environment * env)379 bool Worker::CreateEnvMessagePort(Environment* env) {
380   HandleScope handle_scope(isolate_);
381   std::unique_ptr<MessagePortData> data;
382   {
383     Mutex::ScopedLock lock(mutex_);
384     data = std::move(child_port_data_);
385   }
386 
387   // Set up the message channel for receiving messages in the child.
388   MessagePort* child_port = MessagePort::New(env,
389                                              env->context(),
390                                              std::move(data));
391   // MessagePort::New() may return nullptr if execution is terminated
392   // within it.
393   if (child_port != nullptr)
394     env->set_message_port(child_port->object(isolate_));
395 
396   return child_port;
397 }
398 
JoinThread()399 void Worker::JoinThread() {
400   if (!tid_.has_value())
401     return;
402   CHECK_EQ(uv_thread_join(&tid_.value()), 0);
403   tid_.reset();
404 
405   env()->remove_sub_worker_context(this);
406 
407   {
408     HandleScope handle_scope(env()->isolate());
409     Context::Scope context_scope(env()->context());
410 
411     // Reset the parent port as we're closing it now anyway.
412     object()->Set(env()->context(),
413                   env()->message_port_string(),
414                   Undefined(env()->isolate())).Check();
415 
416     Local<Value> args[] = {
417         Integer::New(env()->isolate(), exit_code_),
418         custom_error_ != nullptr
419             ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420             : Null(env()->isolate()).As<Value>(),
421         !custom_error_str_.empty()
422             ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423                   .As<Value>()
424             : Null(env()->isolate()).As<Value>(),
425     };
426 
427     MakeCallback(env()->onexit_string(), arraysize(args), args);
428   }
429 
430   // If we get here, the tid_.has_value() condition at the top of the function
431   // implies that the thread was running. In that case, its final action will
432   // be to schedule a callback on the parent thread which will delete this
433   // object, so there's nothing more to do here.
434 }
435 
~Worker()436 Worker::~Worker() {
437   Mutex::ScopedLock lock(mutex_);
438 
439   CHECK(stopped_);
440   CHECK_NULL(env_);
441   CHECK(!tid_.has_value());
442 
443   Debug(this, "Worker %llu destroyed", thread_id_.id);
444 }
445 
New(const FunctionCallbackInfo<Value> & args)446 void Worker::New(const FunctionCallbackInfo<Value>& args) {
447   Environment* env = Environment::GetCurrent(args);
448   Isolate* isolate = args.GetIsolate();
449 
450   CHECK(args.IsConstructCall());
451 
452   if (env->isolate_data()->platform() == nullptr) {
453     THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
454     return;
455   }
456 
457   std::string url;
458   std::string name;
459   std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
460   std::shared_ptr<KVStore> env_vars = nullptr;
461 
462   std::vector<std::string> exec_argv_out;
463 
464   // Argument might be a string or URL
465   if (!args[0]->IsNullOrUndefined()) {
466     Utf8Value value(
467         isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
468     url.append(value.out(), value.length());
469   }
470 
471   if (!args[5]->IsNullOrUndefined()) {
472     Utf8Value value(
473         isolate, args[5]->ToString(env->context()).FromMaybe(Local<String>()));
474     name.append(value.out(), value.length());
475   }
476 
477   if (args[1]->IsNull()) {
478     // Means worker.env = { ...process.env }.
479     env_vars = env->env_vars()->Clone(isolate);
480   } else if (args[1]->IsObject()) {
481     // User provided env.
482     env_vars = KVStore::CreateMapKVStore();
483     env_vars->AssignFromObject(isolate->GetCurrentContext(),
484                                args[1].As<Object>());
485   } else {
486     // Env is shared.
487     env_vars = env->env_vars();
488   }
489 
490   if (args[1]->IsObject() || args[2]->IsArray()) {
491     per_isolate_opts.reset(new PerIsolateOptions());
492 
493     HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
494       return env_vars->Get(name).FromMaybe("");
495     });
496 
497 #ifndef NODE_WITHOUT_NODE_OPTIONS
498     MaybeLocal<String> maybe_node_opts =
499         env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
500     Local<String> node_opts;
501     if (maybe_node_opts.ToLocal(&node_opts)) {
502       std::string node_options(*String::Utf8Value(isolate, node_opts));
503       std::vector<std::string> errors{};
504       std::vector<std::string> env_argv =
505           ParseNodeOptionsEnvVar(node_options, &errors);
506       // [0] is expected to be the program name, add dummy string.
507       env_argv.insert(env_argv.begin(), "");
508       std::vector<std::string> invalid_args{};
509       options_parser::Parse(&env_argv,
510                             nullptr,
511                             &invalid_args,
512                             per_isolate_opts.get(),
513                             kAllowedInEnvvar,
514                             &errors);
515       if (!errors.empty() && args[1]->IsObject()) {
516         // Only fail for explicitly provided env, this protects from failures
517         // when NODE_OPTIONS from parent's env is used (which is the default).
518         Local<Value> error;
519         if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
520         Local<String> key =
521             FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
522         // Ignore the return value of Set() because exceptions bubble up to JS
523         // when we return anyway.
524         USE(args.This()->Set(env->context(), key, error));
525         return;
526       }
527     }
528 #endif  // NODE_WITHOUT_NODE_OPTIONS
529   }
530 
531   if (args[2]->IsArray()) {
532     Local<Array> array = args[2].As<Array>();
533     // The first argument is reserved for program name, but we don't need it
534     // in workers.
535     std::vector<std::string> exec_argv = {""};
536     uint32_t length = array->Length();
537     for (uint32_t i = 0; i < length; i++) {
538       Local<Value> arg;
539       if (!array->Get(env->context(), i).ToLocal(&arg)) {
540         return;
541       }
542       Local<String> arg_v8;
543       if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
544         return;
545       }
546       Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
547       std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
548       exec_argv.push_back(arg_string);
549     }
550 
551     std::vector<std::string> invalid_args{};
552     std::vector<std::string> errors{};
553     // Using invalid_args as the v8_args argument as it stores unknown
554     // options for the per isolate parser.
555     options_parser::Parse(&exec_argv,
556                           &exec_argv_out,
557                           &invalid_args,
558                           per_isolate_opts.get(),
559                           kDisallowedInEnvvar,
560                           &errors);
561 
562     // The first argument is program name.
563     invalid_args.erase(invalid_args.begin());
564     if (errors.size() > 0 || invalid_args.size() > 0) {
565       Local<Value> error;
566       if (!ToV8Value(env->context(),
567                      errors.size() > 0 ? errors : invalid_args)
568                          .ToLocal(&error)) {
569         return;
570       }
571       Local<String> key =
572           FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
573       // Ignore the return value of Set() because exceptions bubble up to JS
574       // when we return anyway.
575       USE(args.This()->Set(env->context(), key, error));
576       return;
577     }
578   } else {
579     exec_argv_out = env->exec_argv();
580   }
581 
582   bool use_node_snapshot = per_process::cli_options->node_snapshot;
583   const SnapshotData* snapshot_data =
584       use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
585 
586   Worker* worker = new Worker(env,
587                               args.This(),
588                               url,
589                               name,
590                               per_isolate_opts,
591                               std::move(exec_argv_out),
592                               env_vars,
593                               snapshot_data);
594 
595   CHECK(args[3]->IsFloat64Array());
596   Local<Float64Array> limit_info = args[3].As<Float64Array>();
597   CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
598   limit_info->CopyContents(worker->resource_limits_,
599                            sizeof(worker->resource_limits_));
600 
601   CHECK(args[4]->IsBoolean());
602   if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
603     worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
604   if (env->hide_console_windows())
605     worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
606   if (env->no_native_addons())
607     worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
608   if (env->no_global_search_paths())
609     worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
610   if (env->no_browser_globals())
611     worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
612 }
613 
StartThread(const FunctionCallbackInfo<Value> & args)614 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
615   Worker* w;
616   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
617   Mutex::ScopedLock lock(w->mutex_);
618 
619   w->stopped_ = false;
620 
621   if (w->resource_limits_[kStackSizeMb] > 0) {
622     if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
623       w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
624       w->stack_size_ = kStackBufferSize;
625     } else {
626       w->stack_size_ =
627           static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
628     }
629   } else {
630     w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
631   }
632 
633   uv_thread_options_t thread_options;
634   thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
635   thread_options.stack_size = w->stack_size_;
636 
637   uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
638   int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
639     // XXX: This could become a std::unique_ptr, but that makes at least
640     // gcc 6.3 detect undefined behaviour when there shouldn't be any.
641     // gcc 7+ handles this well.
642     Worker* w = static_cast<Worker*>(arg);
643     const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
644 
645     // Leave a few kilobytes just to make sure we're within limits and have
646     // some space to do work in C++ land.
647     w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
648 
649     w->Run();
650 
651     Mutex::ScopedLock lock(w->mutex_);
652     w->env()->SetImmediateThreadsafe(
653         [w = std::unique_ptr<Worker>(w)](Environment* env) {
654           if (w->has_ref_)
655             env->add_refs(-1);
656           w->JoinThread();
657           // implicitly delete w
658         });
659   }, static_cast<void*>(w));
660 
661   if (ret == 0) {
662     // The object now owns the created thread and should not be garbage
663     // collected until that finishes.
664     w->ClearWeak();
665 
666     if (w->has_ref_)
667       w->env()->add_refs(1);
668 
669     w->env()->add_sub_worker_context(w);
670   } else {
671     w->stopped_ = true;
672     w->tid_.reset();
673 
674     char err_buf[128];
675     uv_err_name_r(ret, err_buf, sizeof(err_buf));
676     {
677       Isolate* isolate = w->env()->isolate();
678       HandleScope handle_scope(isolate);
679       THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
680     }
681   }
682 }
683 
StopThread(const FunctionCallbackInfo<Value> & args)684 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
685   Worker* w;
686   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
687 
688   Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
689   w->Exit(1);
690 }
691 
Ref(const FunctionCallbackInfo<Value> & args)692 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
693   Worker* w;
694   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
695   if (!w->has_ref_ && w->tid_.has_value()) {
696     w->has_ref_ = true;
697     w->env()->add_refs(1);
698   }
699 }
700 
HasRef(const FunctionCallbackInfo<Value> & args)701 void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
702   Worker* w;
703   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
704   args.GetReturnValue().Set(w->has_ref_);
705 }
706 
Unref(const FunctionCallbackInfo<Value> & args)707 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
708   Worker* w;
709   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
710   if (w->has_ref_ && w->tid_.has_value()) {
711     w->has_ref_ = false;
712     w->env()->add_refs(-1);
713   }
714 }
715 
GetResourceLimits(const FunctionCallbackInfo<Value> & args)716 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
717   Worker* w;
718   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
719   args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
720 }
721 
GetResourceLimits(Isolate * isolate) const722 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
723   Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
724 
725   memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
726   return Float64Array::New(ab, 0, kTotalResourceLimitCount);
727 }
728 
Exit(int code,const char * error_code,const char * error_message)729 void Worker::Exit(int code, const char* error_code, const char* error_message) {
730   Mutex::ScopedLock lock(mutex_);
731   Debug(this, "Worker %llu called Exit(%d, %s, %s)",
732         thread_id_.id, code, error_code, error_message);
733 
734   if (error_code != nullptr) {
735     custom_error_ = error_code;
736     custom_error_str_ = error_message;
737   }
738 
739   if (env_ != nullptr) {
740     exit_code_ = code;
741     Stop(env_);
742   } else {
743     stopped_ = true;
744   }
745 }
746 
IsNotIndicativeOfMemoryLeakAtExit() const747 bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
748   // Worker objects always stay alive as long as the child thread, regardless
749   // of whether they are being referenced in the parent thread.
750   return true;
751 }
752 
753 class WorkerHeapSnapshotTaker : public AsyncWrap {
754  public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)755   WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
756     : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
757 
758   SET_NO_MEMORY_INFO()
759   SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
760   SET_SELF_SIZE(WorkerHeapSnapshotTaker)
761 };
762 
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)763 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
764   Worker* w;
765   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
766 
767   Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
768 
769   Environment* env = w->env();
770   AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
771   Local<Object> wrap;
772   if (!env->worker_heap_snapshot_taker_template()
773       ->NewInstance(env->context()).ToLocal(&wrap)) {
774     return;
775   }
776 
777   // The created WorkerHeapSnapshotTaker is an object owned by main
778   // thread's Isolate, it can not be accessed by worker thread
779   std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
780       std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
781           MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
782 
783   // Interrupt the worker thread and take a snapshot, then schedule a call
784   // on the parent thread that turns that snapshot into a readable stream.
785   bool scheduled = w->RequestInterrupt([taker = std::move(taker),
786                                         env](Environment* worker_env) mutable {
787     heap::HeapSnapshotPointer snapshot{
788         worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()};
789     CHECK(snapshot);
790 
791     // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
792     // object.
793 
794     env->SetImmediateThreadsafe(
795         [taker = std::move(taker),
796          snapshot = std::move(snapshot)](Environment* env) mutable {
797           HandleScope handle_scope(env->isolate());
798           Context::Scope context_scope(env->context());
799 
800           AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
801           BaseObjectPtr<AsyncWrap> stream =
802               heap::CreateHeapSnapshotStream(env, std::move(snapshot));
803           Local<Value> args[] = {stream->object()};
804           taker->get()->MakeCallback(
805               env->ondone_string(), arraysize(args), args);
806           // implicitly delete `taker`
807         },
808         CallbackFlags::kUnrefed);
809 
810     // Now, the lambda is delivered to the main thread, as a result, the
811     // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
812   });
813 
814   if (scheduled) {
815     args.GetReturnValue().Set(wrap);
816   } else {
817     args.GetReturnValue().Set(Local<Object>());
818   }
819 }
820 
LoopIdleTime(const FunctionCallbackInfo<Value> & args)821 void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
822   Worker* w;
823   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
824 
825   Mutex::ScopedLock lock(w->mutex_);
826   // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
827   // before locking the mutex is a race condition. So manually do the same
828   // check.
829   if (w->stopped_ || w->env_ == nullptr)
830     return args.GetReturnValue().Set(-1);
831 
832   uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
833   args.GetReturnValue().Set(1.0 * idle_time / 1e6);
834 }
835 
LoopStartTime(const FunctionCallbackInfo<Value> & args)836 void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
837   Worker* w;
838   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
839 
840   Mutex::ScopedLock lock(w->mutex_);
841   // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
842   // before locking the mutex is a race condition. So manually do the same
843   // check.
844   if (w->stopped_ || w->env_ == nullptr)
845     return args.GetReturnValue().Set(-1);
846 
847   double loop_start_time = w->env_->performance_state()->milestones[
848       node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
849   CHECK_GE(loop_start_time, 0);
850   args.GetReturnValue().Set(
851       (loop_start_time - node::performance::timeOrigin) / 1e6);
852 }
853 
854 namespace {
855 
856 // Return the MessagePort that is global for this Environment and communicates
857 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)858 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
859   Environment* env = Environment::GetCurrent(args);
860   Local<Object> port = env->message_port();
861   CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
862   if (!port.IsEmpty()) {
863     CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
864              args.GetIsolate());
865     args.GetReturnValue().Set(port);
866   }
867 }
868 
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)869 void InitWorker(Local<Object> target,
870                 Local<Value> unused,
871                 Local<Context> context,
872                 void* priv) {
873   Environment* env = Environment::GetCurrent(context);
874   Isolate* isolate = env->isolate();
875 
876   {
877     Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
878 
879     w->InstanceTemplate()->SetInternalFieldCount(
880         Worker::kInternalFieldCount);
881     w->Inherit(AsyncWrap::GetConstructorTemplate(env));
882 
883     SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
884     SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
885     SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
886     SetProtoMethod(isolate, w, "ref", Worker::Ref);
887     SetProtoMethod(isolate, w, "unref", Worker::Unref);
888     SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
889     SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
890     SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
891     SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
892 
893     SetConstructorFunction(context, target, "Worker", w);
894   }
895 
896   {
897     Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
898 
899     wst->InstanceTemplate()->SetInternalFieldCount(
900         WorkerHeapSnapshotTaker::kInternalFieldCount);
901     wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
902 
903     Local<String> wst_string =
904         FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
905     wst->SetClassName(wst_string);
906     env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
907   }
908 
909   SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
910 
911   target
912       ->Set(env->context(),
913             env->thread_id_string(),
914             Number::New(isolate, static_cast<double>(env->thread_id())))
915       .Check();
916 
917   target
918       ->Set(env->context(),
919             FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
920             Boolean::New(isolate, env->is_main_thread()))
921       .Check();
922 
923   target
924       ->Set(env->context(),
925             FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
926             Boolean::New(isolate, env->owns_process_state()))
927       .Check();
928 
929   if (!env->is_main_thread()) {
930     target
931         ->Set(env->context(),
932               FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
933               env->worker_context()->GetResourceLimits(isolate))
934         .Check();
935   }
936 
937   NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
938   NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
939   NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
940   NODE_DEFINE_CONSTANT(target, kStackSizeMb);
941   NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
942 }
943 
RegisterExternalReferences(ExternalReferenceRegistry * registry)944 void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
945   registry->Register(GetEnvMessagePort);
946   registry->Register(Worker::New);
947   registry->Register(Worker::StartThread);
948   registry->Register(Worker::StopThread);
949   registry->Register(Worker::HasRef);
950   registry->Register(Worker::Ref);
951   registry->Register(Worker::Unref);
952   registry->Register(Worker::GetResourceLimits);
953   registry->Register(Worker::TakeHeapSnapshot);
954   registry->Register(Worker::LoopIdleTime);
955   registry->Register(Worker::LoopStartTime);
956 }
957 
958 }  // anonymous namespace
959 }  // namespace worker
960 }  // namespace node
961 
962 NODE_BINDING_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
963 NODE_BINDING_EXTERNAL_REFERENCE(worker,
964                                 node::worker::RegisterExternalReferences)
965