• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "memory_tracker-inl.h"
4 #include "node_errors.h"
5 #include "node_buffer.h"
6 #include "node_options-inl.h"
7 #include "node_perf.h"
8 #include "util-inl.h"
9 #include "async_wrap-inl.h"
10 
11 #include <memory>
12 #include <string>
13 #include <vector>
14 
15 using node::kAllowedInEnvironment;
16 using node::kDisallowedInEnvironment;
17 using v8::Array;
18 using v8::ArrayBuffer;
19 using v8::Boolean;
20 using v8::Context;
21 using v8::Float64Array;
22 using v8::FunctionCallbackInfo;
23 using v8::FunctionTemplate;
24 using v8::HandleScope;
25 using v8::Integer;
26 using v8::Isolate;
27 using v8::Local;
28 using v8::Locker;
29 using v8::MaybeLocal;
30 using v8::Null;
31 using v8::Number;
32 using v8::Object;
33 using v8::ResourceConstraints;
34 using v8::SealHandleScope;
35 using v8::String;
36 using v8::TryCatch;
37 using v8::Value;
38 
39 namespace node {
40 namespace worker {
41 
// Number of bytes in one mebibyte. Used to convert the per-Worker resource
// limits, which are expressed in MB, to and from byte counts.
constexpr double kMB = 1024.0 * 1024.0;
43 
Worker(Environment * env,Local<Object> wrap,const std::string & url,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars)44 Worker::Worker(Environment* env,
45                Local<Object> wrap,
46                const std::string& url,
47                std::shared_ptr<PerIsolateOptions> per_isolate_opts,
48                std::vector<std::string>&& exec_argv,
49                std::shared_ptr<KVStore> env_vars)
50     : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
51       per_isolate_opts_(per_isolate_opts),
52       exec_argv_(exec_argv),
53       platform_(env->isolate_data()->platform()),
54       array_buffer_allocator_(ArrayBufferAllocator::Create()),
55       thread_id_(AllocateEnvironmentThreadId()),
56       env_vars_(env_vars) {
57   Debug(this, "Creating new worker instance with thread id %llu",
58         thread_id_.id);
59 
60   // Set up everything that needs to be set up in the parent environment.
61   parent_port_ = MessagePort::New(env, env->context());
62   if (parent_port_ == nullptr) {
63     // This can happen e.g. because execution is terminating.
64     return;
65   }
66 
67   child_port_data_ = std::make_unique<MessagePortData>(nullptr);
68   MessagePort::Entangle(parent_port_, child_port_data_.get());
69 
70   object()->Set(env->context(),
71                 env->message_port_string(),
72                 parent_port_->object()).Check();
73 
74   object()->Set(env->context(),
75                 env->thread_id_string(),
76                 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
77       .Check();
78 
79   inspector_parent_handle_ = GetInspectorParentHandle(
80       env, thread_id_, url.c_str());
81 
82   argv_ = std::vector<std::string>{env->argv()[0]};
83   // Mark this Worker object as weak until we actually start the thread.
84   MakeWeak();
85 
86   Debug(this, "Preparation for worker %llu finished", thread_id_.id);
87 }
88 
is_stopped() const89 bool Worker::is_stopped() const {
90   Mutex::ScopedLock lock(mutex_);
91   if (env_ != nullptr)
92     return env_->is_stopping();
93   return stopped_;
94 }
95 
array_buffer_allocator()96 std::shared_ptr<ArrayBufferAllocator> Worker::array_buffer_allocator() {
97   return array_buffer_allocator_;
98 }
99 
UpdateResourceConstraints(ResourceConstraints * constraints)100 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
101   constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
102 
103   if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
104     constraints->set_max_young_generation_size_in_bytes(
105         resource_limits_[kMaxYoungGenerationSizeMb] * kMB);
106   } else {
107     resource_limits_[kMaxYoungGenerationSizeMb] =
108         constraints->max_young_generation_size_in_bytes() / kMB;
109   }
110 
111   if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
112     constraints->set_max_old_generation_size_in_bytes(
113         resource_limits_[kMaxOldGenerationSizeMb] * kMB);
114   } else {
115     resource_limits_[kMaxOldGenerationSizeMb] =
116         constraints->max_old_generation_size_in_bytes() / kMB;
117   }
118 
119   if (resource_limits_[kCodeRangeSizeMb] > 0) {
120     constraints->set_code_range_size_in_bytes(
121         resource_limits_[kCodeRangeSizeMb] * kMB);
122   } else {
123     resource_limits_[kCodeRangeSizeMb] =
124         constraints->code_range_size_in_bytes() / kMB;
125   }
126 }
127 
128 // This class contains data that is only relevant to the child thread itself,
129 // and only while it is running.
130 // (Eventually, the Environment instance should probably also be moved here.)
131 class WorkerThreadData {
132  public:
WorkerThreadData(Worker * w)133   explicit WorkerThreadData(Worker* w)
134     : w_(w) {
135     int ret = uv_loop_init(&loop_);
136     if (ret != 0) {
137       char err_buf[128];
138       uv_err_name_r(ret, err_buf, sizeof(err_buf));
139       w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
140       return;
141     }
142     loop_init_failed_ = false;
143     uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
144 
145     Isolate::CreateParams params;
146     SetIsolateCreateParamsForNode(&params);
147     params.array_buffer_allocator = w->array_buffer_allocator_.get();
148 
149     w->UpdateResourceConstraints(&params.constraints);
150 
151     Isolate* isolate = Isolate::Allocate();
152     if (isolate == nullptr) {
153       // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
154       // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
155       w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
156       return;
157     }
158 
159     w->platform_->RegisterIsolate(isolate, &loop_);
160     Isolate::Initialize(isolate, params);
161     SetIsolateUpForNode(isolate);
162 
163     isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
164 
165     {
166       Locker locker(isolate);
167       Isolate::Scope isolate_scope(isolate);
168       // V8 computes its stack limit the first time a `Locker` is used based on
169       // --stack-size. Reset it to the correct value.
170       isolate->SetStackLimit(w->stack_base_);
171 
172       HandleScope handle_scope(isolate);
173       isolate_data_.reset(CreateIsolateData(isolate,
174                                             &loop_,
175                                             w_->platform_,
176                                             w->array_buffer_allocator_.get()));
177       CHECK(isolate_data_);
178       if (w_->per_isolate_opts_)
179         isolate_data_->set_options(std::move(w_->per_isolate_opts_));
180       isolate_data_->set_worker_context(w_);
181     }
182 
183     Mutex::ScopedLock lock(w_->mutex_);
184     w_->isolate_ = isolate;
185   }
186 
~WorkerThreadData()187   ~WorkerThreadData() {
188     Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
189     Isolate* isolate;
190     {
191       Mutex::ScopedLock lock(w_->mutex_);
192       isolate = w_->isolate_;
193       w_->isolate_ = nullptr;
194     }
195 
196     if (isolate != nullptr) {
197       CHECK(!loop_init_failed_);
198       bool platform_finished = false;
199 
200       isolate_data_.reset();
201 
202       w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
203         *static_cast<bool*>(data) = true;
204       }, &platform_finished);
205 
206       // The order of these calls is important; if the Isolate is first disposed
207       // and then unregistered, there is a race condition window in which no
208       // new Isolate at the same address can successfully be registered with
209       // the platform.
210       // (Refs: https://github.com/nodejs/node/issues/30846)
211       w_->platform_->UnregisterIsolate(isolate);
212       isolate->Dispose();
213 
214       // Wait until the platform has cleaned up all relevant resources.
215       while (!platform_finished) {
216         uv_run(&loop_, UV_RUN_ONCE);
217       }
218     }
219     if (!loop_init_failed_) {
220       CheckedUvLoopClose(&loop_);
221     }
222   }
223 
loop_is_usable() const224   bool loop_is_usable() const { return !loop_init_failed_; }
225 
226  private:
227   Worker* const w_;
228   uv_loop_t loop_;
229   bool loop_init_failed_ = true;
230   DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
231 
232   friend class Worker;
233 };
234 
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)235 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
236                              size_t initial_heap_limit) {
237   Worker* worker = static_cast<Worker*>(data);
238   worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
239   // Give the current GC some extra leeway to let it finish rather than
240   // crash hard. We are not going to perform further allocations anyway.
241   constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
242   return current_heap_limit + kExtraHeapAllowance;
243 }
244 
Run()245 void Worker::Run() {
246   std::string name = "WorkerThread ";
247   name += std::to_string(thread_id_.id);
248   TRACE_EVENT_METADATA1(
249       "__metadata", "thread_name", "name",
250       TRACE_STR_COPY(name.c_str()));
251   CHECK_NOT_NULL(platform_);
252 
253   Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
254 
255   WorkerThreadData data(this);
256   if (isolate_ == nullptr) return;
257   CHECK(data.loop_is_usable());
258 
259   Debug(this, "Starting worker with id %llu", thread_id_.id);
260   {
261     Locker locker(isolate_);
262     Isolate::Scope isolate_scope(isolate_);
263     SealHandleScope outer_seal(isolate_);
264 
265     DeleteFnPtr<Environment, FreeEnvironment> env_;
266     auto cleanup_env = OnScopeLeave([&]() {
267       // TODO(addaleax): This call is harmless but should not be necessary.
268       // Figure out why V8 is raising a DCHECK() here without it
269       // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
270       isolate_->CancelTerminateExecution();
271 
272       if (!env_) return;
273       env_->set_can_call_into_js(false);
274 
275       {
276         Mutex::ScopedLock lock(mutex_);
277         stopped_ = true;
278         this->env_ = nullptr;
279       }
280 
281       // TODO(addaleax): Try moving DisallowJavascriptExecutionScope into
282       // FreeEnvironment().
283       Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
284           Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
285       env_.reset();
286     });
287 
288     if (is_stopped()) return;
289     {
290       HandleScope handle_scope(isolate_);
291       Local<Context> context;
292       {
293         // We create the Context object before we have an Environment* in place
294         // that we could use for error handling. If creation fails due to
295         // resource constraints, we need something in place to handle it,
296         // though.
297         TryCatch try_catch(isolate_);
298         context = NewContext(isolate_);
299         if (context.IsEmpty()) {
300           // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
301           // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
302           Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
303           return;
304         }
305       }
306 
307       if (is_stopped()) return;
308       CHECK(!context.IsEmpty());
309       Context::Scope context_scope(context);
310       {
311         env_.reset(CreateEnvironment(
312             data.isolate_data_.get(),
313             context,
314             std::move(argv_),
315             std::move(exec_argv_),
316             static_cast<EnvironmentFlags::Flags>(environment_flags_),
317             thread_id_,
318             std::move(inspector_parent_handle_)));
319         if (is_stopped()) return;
320         CHECK_NOT_NULL(env_);
321         env_->set_env_vars(std::move(env_vars_));
322         SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
323           Exit(exit_code);
324         });
325       }
326       {
327         Mutex::ScopedLock lock(mutex_);
328         if (stopped_) return;
329         this->env_ = env_.get();
330       }
331       Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
332       if (is_stopped()) return;
333       {
334         CreateEnvMessagePort(env_.get());
335         Debug(this, "Created message port for worker %llu", thread_id_.id);
336         if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
337           return;
338 
339         Debug(this, "Loaded environment for worker %llu", thread_id_.id);
340       }
341 
342       if (is_stopped()) return;
343       {
344         SealHandleScope seal(isolate_);
345         bool more;
346         env_->performance_state()->Mark(
347             node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
348         do {
349           if (is_stopped()) break;
350           uv_run(&data.loop_, UV_RUN_DEFAULT);
351           if (is_stopped()) break;
352 
353           platform_->DrainTasks(isolate_);
354 
355           more = uv_loop_alive(&data.loop_);
356           if (more && !is_stopped()) continue;
357 
358           EmitBeforeExit(env_.get());
359 
360           // Emit `beforeExit` if the loop became alive either after emitting
361           // event, or after running some callbacks.
362           more = uv_loop_alive(&data.loop_);
363         } while (more == true && !is_stopped());
364         env_->performance_state()->Mark(
365             node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
366       }
367     }
368 
369     {
370       int exit_code;
371       bool stopped = is_stopped();
372       if (!stopped)
373         exit_code = EmitExit(env_.get());
374       Mutex::ScopedLock lock(mutex_);
375       if (exit_code_ == 0 && !stopped)
376         exit_code_ = exit_code;
377 
378       Debug(this, "Exiting thread for worker %llu with exit code %d",
379             thread_id_.id, exit_code_);
380     }
381   }
382 
383   Debug(this, "Worker %llu thread stops", thread_id_.id);
384 }
385 
CreateEnvMessagePort(Environment * env)386 void Worker::CreateEnvMessagePort(Environment* env) {
387   HandleScope handle_scope(isolate_);
388   Mutex::ScopedLock lock(mutex_);
389   // Set up the message channel for receiving messages in the child.
390   MessagePort* child_port = MessagePort::New(env,
391                                              env->context(),
392                                              std::move(child_port_data_));
393   // MessagePort::New() may return nullptr if execution is terminated
394   // within it.
395   if (child_port != nullptr)
396     env->set_message_port(child_port->object(isolate_));
397 }
398 
JoinThread()399 void Worker::JoinThread() {
400   if (thread_joined_)
401     return;
402   CHECK_EQ(uv_thread_join(&tid_), 0);
403   thread_joined_ = true;
404 
405   env()->remove_sub_worker_context(this);
406 
407   {
408     HandleScope handle_scope(env()->isolate());
409     Context::Scope context_scope(env()->context());
410 
411     // Reset the parent port as we're closing it now anyway.
412     object()->Set(env()->context(),
413                   env()->message_port_string(),
414                   Undefined(env()->isolate())).Check();
415 
416     Local<Value> args[] = {
417         Integer::New(env()->isolate(), exit_code_),
418         custom_error_ != nullptr
419             ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420             : Null(env()->isolate()).As<Value>(),
421         !custom_error_str_.empty()
422             ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423                   .As<Value>()
424             : Null(env()->isolate()).As<Value>(),
425     };
426 
427     MakeCallback(env()->onexit_string(), arraysize(args), args);
428   }
429 
430   // If we get here, the !thread_joined_ condition at the top of the function
431   // implies that the thread was running. In that case, its final action will
432   // be to schedule a callback on the parent thread which will delete this
433   // object, so there's nothing more to do here.
434 }
435 
~Worker()436 Worker::~Worker() {
437   Mutex::ScopedLock lock(mutex_);
438 
439   CHECK(stopped_);
440   CHECK_NULL(env_);
441   CHECK(thread_joined_);
442 
443   Debug(this, "Worker %llu destroyed", thread_id_.id);
444 }
445 
New(const FunctionCallbackInfo<Value> & args)446 void Worker::New(const FunctionCallbackInfo<Value>& args) {
447   Environment* env = Environment::GetCurrent(args);
448   Isolate* isolate = args.GetIsolate();
449 
450   CHECK(args.IsConstructCall());
451 
452   if (env->isolate_data()->platform() == nullptr) {
453     THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
454     return;
455   }
456 
457   std::string url;
458   std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
459   std::shared_ptr<KVStore> env_vars = nullptr;
460 
461   std::vector<std::string> exec_argv_out;
462 
463   // Argument might be a string or URL
464   if (!args[0]->IsNullOrUndefined()) {
465     Utf8Value value(
466         isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
467     url.append(value.out(), value.length());
468   }
469 
470   if (args[1]->IsNull()) {
471     // Means worker.env = { ...process.env }.
472     env_vars = env->env_vars()->Clone(isolate);
473   } else if (args[1]->IsObject()) {
474     // User provided env.
475     env_vars = KVStore::CreateMapKVStore();
476     env_vars->AssignFromObject(isolate->GetCurrentContext(),
477                                args[1].As<Object>());
478   } else {
479     // Env is shared.
480     env_vars = env->env_vars();
481   }
482 
483   if (args[1]->IsObject() || args[2]->IsArray()) {
484     per_isolate_opts.reset(new PerIsolateOptions());
485 
486     HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
487       return env_vars->Get(name).FromMaybe("");
488     });
489 
490 #ifndef NODE_WITHOUT_NODE_OPTIONS
491     MaybeLocal<String> maybe_node_opts =
492         env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
493     Local<String> node_opts;
494     if (maybe_node_opts.ToLocal(&node_opts)) {
495       std::string node_options(*String::Utf8Value(isolate, node_opts));
496       std::vector<std::string> errors{};
497       std::vector<std::string> env_argv =
498           ParseNodeOptionsEnvVar(node_options, &errors);
499       // [0] is expected to be the program name, add dummy string.
500       env_argv.insert(env_argv.begin(), "");
501       std::vector<std::string> invalid_args{};
502       options_parser::Parse(&env_argv,
503                             nullptr,
504                             &invalid_args,
505                             per_isolate_opts.get(),
506                             kAllowedInEnvironment,
507                             &errors);
508       if (!errors.empty() && args[1]->IsObject()) {
509         // Only fail for explicitly provided env, this protects from failures
510         // when NODE_OPTIONS from parent's env is used (which is the default).
511         Local<Value> error;
512         if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
513         Local<String> key =
514             FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
515         // Ignore the return value of Set() because exceptions bubble up to JS
516         // when we return anyway.
517         USE(args.This()->Set(env->context(), key, error));
518         return;
519       }
520     }
521 #endif
522   }
523 
524   if (args[2]->IsArray()) {
525     Local<Array> array = args[2].As<Array>();
526     // The first argument is reserved for program name, but we don't need it
527     // in workers.
528     std::vector<std::string> exec_argv = {""};
529     uint32_t length = array->Length();
530     for (uint32_t i = 0; i < length; i++) {
531       Local<Value> arg;
532       if (!array->Get(env->context(), i).ToLocal(&arg)) {
533         return;
534       }
535       Local<String> arg_v8;
536       if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
537         return;
538       }
539       Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
540       std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
541       exec_argv.push_back(arg_string);
542     }
543 
544     std::vector<std::string> invalid_args{};
545     std::vector<std::string> errors{};
546     // Using invalid_args as the v8_args argument as it stores unknown
547     // options for the per isolate parser.
548     options_parser::Parse(
549         &exec_argv,
550         &exec_argv_out,
551         &invalid_args,
552         per_isolate_opts.get(),
553         kDisallowedInEnvironment,
554         &errors);
555 
556     // The first argument is program name.
557     invalid_args.erase(invalid_args.begin());
558     if (errors.size() > 0 || invalid_args.size() > 0) {
559       Local<Value> error;
560       if (!ToV8Value(env->context(),
561                      errors.size() > 0 ? errors : invalid_args)
562                          .ToLocal(&error)) {
563         return;
564       }
565       Local<String> key =
566           FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
567       // Ignore the return value of Set() because exceptions bubble up to JS
568       // when we return anyway.
569       USE(args.This()->Set(env->context(), key, error));
570       return;
571     }
572   } else {
573     exec_argv_out = env->exec_argv();
574   }
575 
576   Worker* worker = new Worker(env,
577                               args.This(),
578                               url,
579                               per_isolate_opts,
580                               std::move(exec_argv_out),
581                               env_vars);
582 
583   CHECK(args[3]->IsFloat64Array());
584   Local<Float64Array> limit_info = args[3].As<Float64Array>();
585   CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
586   limit_info->CopyContents(worker->resource_limits_,
587                            sizeof(worker->resource_limits_));
588 
589   CHECK(args[4]->IsBoolean());
590   if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
591     worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
592 }
593 
StartThread(const FunctionCallbackInfo<Value> & args)594 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
595   Worker* w;
596   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
597   Mutex::ScopedLock lock(w->mutex_);
598 
599   w->stopped_ = false;
600 
601   if (w->resource_limits_[kStackSizeMb] > 0) {
602     if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
603       w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
604       w->stack_size_ = kStackBufferSize;
605     } else {
606       w->stack_size_ = w->resource_limits_[kStackSizeMb] * kMB;
607     }
608   } else {
609     w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
610   }
611 
612   uv_thread_options_t thread_options;
613   thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
614   thread_options.stack_size = w->stack_size_;
615   int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
616     // XXX: This could become a std::unique_ptr, but that makes at least
617     // gcc 6.3 detect undefined behaviour when there shouldn't be any.
618     // gcc 7+ handles this well.
619     Worker* w = static_cast<Worker*>(arg);
620     const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
621 
622     // Leave a few kilobytes just to make sure we're within limits and have
623     // some space to do work in C++ land.
624     w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
625 
626     w->Run();
627 
628     Mutex::ScopedLock lock(w->mutex_);
629     w->env()->SetImmediateThreadsafe(
630         [w = std::unique_ptr<Worker>(w)](Environment* env) {
631           if (w->has_ref_)
632             env->add_refs(-1);
633           w->JoinThread();
634           // implicitly delete w
635         });
636   }, static_cast<void*>(w));
637 
638   if (ret == 0) {
639     // The object now owns the created thread and should not be garbage
640     // collected until that finishes.
641     w->ClearWeak();
642     w->thread_joined_ = false;
643 
644     if (w->has_ref_)
645       w->env()->add_refs(1);
646 
647     w->env()->add_sub_worker_context(w);
648   } else {
649     w->stopped_ = true;
650 
651     char err_buf[128];
652     uv_err_name_r(ret, err_buf, sizeof(err_buf));
653     {
654       Isolate* isolate = w->env()->isolate();
655       HandleScope handle_scope(isolate);
656       THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
657     }
658   }
659 }
660 
StopThread(const FunctionCallbackInfo<Value> & args)661 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
662   Worker* w;
663   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
664 
665   Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
666   w->Exit(1);
667 }
668 
Ref(const FunctionCallbackInfo<Value> & args)669 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
670   Worker* w;
671   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
672   if (!w->has_ref_ && !w->thread_joined_) {
673     w->has_ref_ = true;
674     w->env()->add_refs(1);
675   }
676 }
677 
Unref(const FunctionCallbackInfo<Value> & args)678 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
679   Worker* w;
680   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
681   if (w->has_ref_ && !w->thread_joined_) {
682     w->has_ref_ = false;
683     w->env()->add_refs(-1);
684   }
685 }
686 
GetResourceLimits(const FunctionCallbackInfo<Value> & args)687 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
688   Worker* w;
689   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
690   args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
691 }
692 
GetResourceLimits(Isolate * isolate) const693 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
694   Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
695   memcpy(ab->GetContents().Data(), resource_limits_, sizeof(resource_limits_));
696   return Float64Array::New(ab, 0, kTotalResourceLimitCount);
697 }
698 
Exit(int code,const char * error_code,const char * error_message)699 void Worker::Exit(int code, const char* error_code, const char* error_message) {
700   Mutex::ScopedLock lock(mutex_);
701   Debug(this, "Worker %llu called Exit(%d, %s, %s)",
702         thread_id_.id, code, error_code, error_message);
703 
704   if (error_code != nullptr) {
705     custom_error_ = error_code;
706     custom_error_str_ = error_message;
707   }
708 
709   if (env_ != nullptr) {
710     exit_code_ = code;
711     Stop(env_);
712   } else {
713     stopped_ = true;
714   }
715 }
716 
MemoryInfo(MemoryTracker * tracker) const717 void Worker::MemoryInfo(MemoryTracker* tracker) const {
718   tracker->TrackField("parent_port", parent_port_);
719 }
720 
721 class WorkerHeapSnapshotTaker : public AsyncWrap {
722  public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)723   WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
724     : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
725 
726   SET_NO_MEMORY_INFO()
727   SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
728   SET_SELF_SIZE(WorkerHeapSnapshotTaker)
729 };
730 
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)731 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
732   Worker* w;
733   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
734 
735   Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
736 
737   Environment* env = w->env();
738   AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
739   Local<Object> wrap;
740   if (!env->worker_heap_snapshot_taker_template()
741       ->NewInstance(env->context()).ToLocal(&wrap)) {
742     return;
743   }
744   BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
745       MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
746 
747   // Interrupt the worker thread and take a snapshot, then schedule a call
748   // on the parent thread that turns that snapshot into a readable stream.
749   bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
750     heap::HeapSnapshotPointer snapshot {
751         worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
752     CHECK(snapshot);
753     env->SetImmediateThreadsafe(
754         [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
755           HandleScope handle_scope(env->isolate());
756           Context::Scope context_scope(env->context());
757 
758           AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
759           BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
760               env, std::move(snapshot));
761           Local<Value> args[] = { stream->object() };
762           taker->MakeCallback(env->ondone_string(), arraysize(args), args);
763         }, CallbackFlags::kUnrefed);
764   });
765   args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
766 }
767 
768 namespace {
769 
770 // Return the MessagePort that is global for this Environment and communicates
771 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)772 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
773   Environment* env = Environment::GetCurrent(args);
774   Local<Object> port = env->message_port();
775   CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
776   if (!port.IsEmpty()) {
777     CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
778     args.GetReturnValue().Set(port);
779   }
780 }
781 
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)782 void InitWorker(Local<Object> target,
783                 Local<Value> unused,
784                 Local<Context> context,
785                 void* priv) {
786   Environment* env = Environment::GetCurrent(context);
787 
788   {
789     Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
790 
791     w->InstanceTemplate()->SetInternalFieldCount(
792         Worker::kInternalFieldCount);
793     w->Inherit(AsyncWrap::GetConstructorTemplate(env));
794 
795     env->SetProtoMethod(w, "startThread", Worker::StartThread);
796     env->SetProtoMethod(w, "stopThread", Worker::StopThread);
797     env->SetProtoMethod(w, "ref", Worker::Ref);
798     env->SetProtoMethod(w, "unref", Worker::Unref);
799     env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
800     env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
801 
802     Local<String> workerString =
803         FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
804     w->SetClassName(workerString);
805     target->Set(env->context(),
806                 workerString,
807                 w->GetFunction(env->context()).ToLocalChecked()).Check();
808   }
809 
810   {
811     Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
812 
813     wst->InstanceTemplate()->SetInternalFieldCount(
814         WorkerHeapSnapshotTaker::kInternalFieldCount);
815     wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
816 
817     Local<String> wst_string =
818         FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
819     wst->SetClassName(wst_string);
820     env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
821   }
822 
823   env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
824 
825   target
826       ->Set(env->context(),
827             env->thread_id_string(),
828             Number::New(env->isolate(), static_cast<double>(env->thread_id())))
829       .Check();
830 
831   target
832       ->Set(env->context(),
833             FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
834             Boolean::New(env->isolate(), env->is_main_thread()))
835       .Check();
836 
837   target
838       ->Set(env->context(),
839             FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
840             Boolean::New(env->isolate(), env->owns_process_state()))
841       .Check();
842 
843   if (!env->is_main_thread()) {
844     target
845         ->Set(env->context(),
846               FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
847               env->worker_context()->GetResourceLimits(env->isolate()))
848         .Check();
849   }
850 
851   NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
852   NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
853   NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
854   NODE_DEFINE_CONSTANT(target, kStackSizeMb);
855   NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
856 }
857 
858 }  // anonymous namespace
859 
860 }  // namespace worker
861 }  // namespace node
862 
863 NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
864