• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "histogram-inl.h"
4 #include "memory_tracker-inl.h"
5 #include "node_errors.h"
6 #include "node_buffer.h"
7 #include "node_options-inl.h"
8 #include "node_perf.h"
9 #include "util-inl.h"
10 #include "async_wrap-inl.h"
11 
12 #include <memory>
13 #include <string>
14 #include <vector>
15 
16 using node::kAllowedInEnvironment;
17 using node::kDisallowedInEnvironment;
18 using v8::Array;
19 using v8::ArrayBuffer;
20 using v8::Boolean;
21 using v8::Context;
22 using v8::Float64Array;
23 using v8::FunctionCallbackInfo;
24 using v8::FunctionTemplate;
25 using v8::HandleScope;
26 using v8::Integer;
27 using v8::Isolate;
28 using v8::Local;
29 using v8::Locker;
30 using v8::MaybeLocal;
31 using v8::Null;
32 using v8::Number;
33 using v8::Object;
34 using v8::ResourceConstraints;
35 using v8::SealHandleScope;
36 using v8::String;
37 using v8::TryCatch;
38 using v8::Value;
39 
40 namespace node {
41 namespace worker {
42 
// Number of bytes in one mebibyte. Kept as a double because it is only used
// in floating-point arithmetic that converts between the megabyte-valued
// resource limits and byte counts.
constexpr double kMB = 1024.0 * 1024.0;
44 
Worker(Environment * env,Local<Object> wrap,const std::string & url,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars)45 Worker::Worker(Environment* env,
46                Local<Object> wrap,
47                const std::string& url,
48                std::shared_ptr<PerIsolateOptions> per_isolate_opts,
49                std::vector<std::string>&& exec_argv,
50                std::shared_ptr<KVStore> env_vars)
51     : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
52       per_isolate_opts_(per_isolate_opts),
53       exec_argv_(exec_argv),
54       platform_(env->isolate_data()->platform()),
55       thread_id_(AllocateEnvironmentThreadId()),
56       env_vars_(env_vars) {
57   Debug(this, "Creating new worker instance with thread id %llu",
58         thread_id_.id);
59 
60   // Set up everything that needs to be set up in the parent environment.
61   parent_port_ = MessagePort::New(env, env->context());
62   if (parent_port_ == nullptr) {
63     // This can happen e.g. because execution is terminating.
64     return;
65   }
66 
67   child_port_data_ = std::make_unique<MessagePortData>(nullptr);
68   MessagePort::Entangle(parent_port_, child_port_data_.get());
69 
70   object()->Set(env->context(),
71                 env->message_port_string(),
72                 parent_port_->object()).Check();
73 
74   object()->Set(env->context(),
75                 env->thread_id_string(),
76                 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
77       .Check();
78 
79   inspector_parent_handle_ = GetInspectorParentHandle(
80       env, thread_id_, url.c_str());
81 
82   argv_ = std::vector<std::string>{env->argv()[0]};
83   // Mark this Worker object as weak until we actually start the thread.
84   MakeWeak();
85 
86   Debug(this, "Preparation for worker %llu finished", thread_id_.id);
87 }
88 
is_stopped() const89 bool Worker::is_stopped() const {
90   Mutex::ScopedLock lock(mutex_);
91   if (env_ != nullptr)
92     return env_->is_stopping();
93   return stopped_;
94 }
95 
UpdateResourceConstraints(ResourceConstraints * constraints)96 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
97   constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
98 
99   if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
100     constraints->set_max_young_generation_size_in_bytes(
101         resource_limits_[kMaxYoungGenerationSizeMb] * kMB);
102   } else {
103     resource_limits_[kMaxYoungGenerationSizeMb] =
104         constraints->max_young_generation_size_in_bytes() / kMB;
105   }
106 
107   if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
108     constraints->set_max_old_generation_size_in_bytes(
109         resource_limits_[kMaxOldGenerationSizeMb] * kMB);
110   } else {
111     resource_limits_[kMaxOldGenerationSizeMb] =
112         constraints->max_old_generation_size_in_bytes() / kMB;
113   }
114 
115   if (resource_limits_[kCodeRangeSizeMb] > 0) {
116     constraints->set_code_range_size_in_bytes(
117         resource_limits_[kCodeRangeSizeMb] * kMB);
118   } else {
119     resource_limits_[kCodeRangeSizeMb] =
120         constraints->code_range_size_in_bytes() / kMB;
121   }
122 }
123 
124 // This class contains data that is only relevant to the child thread itself,
125 // and only while it is running.
126 // (Eventually, the Environment instance should probably also be moved here.)
127 class WorkerThreadData {
128  public:
WorkerThreadData(Worker * w)129   explicit WorkerThreadData(Worker* w)
130     : w_(w) {
131     int ret = uv_loop_init(&loop_);
132     if (ret != 0) {
133       char err_buf[128];
134       uv_err_name_r(ret, err_buf, sizeof(err_buf));
135       w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
136       return;
137     }
138     loop_init_failed_ = false;
139     uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
140 
141     std::shared_ptr<ArrayBufferAllocator> allocator =
142         ArrayBufferAllocator::Create();
143     Isolate::CreateParams params;
144     SetIsolateCreateParamsForNode(&params);
145     params.array_buffer_allocator_shared = allocator;
146 
147     w->UpdateResourceConstraints(&params.constraints);
148 
149     Isolate* isolate = Isolate::Allocate();
150     if (isolate == nullptr) {
151       // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
152       // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
153       w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
154       return;
155     }
156 
157     w->platform_->RegisterIsolate(isolate, &loop_);
158     Isolate::Initialize(isolate, params);
159     SetIsolateUpForNode(isolate);
160 
161     // Be sure it's called before Environment::InitializeDiagnostics()
162     // so that this callback stays when the callback of
163     // --heapsnapshot-near-heap-limit gets is popped.
164     isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
165 
166     {
167       Locker locker(isolate);
168       Isolate::Scope isolate_scope(isolate);
169       // V8 computes its stack limit the first time a `Locker` is used based on
170       // --stack-size. Reset it to the correct value.
171       isolate->SetStackLimit(w->stack_base_);
172 
173       HandleScope handle_scope(isolate);
174       isolate_data_.reset(CreateIsolateData(isolate,
175                                             &loop_,
176                                             w_->platform_,
177                                             allocator.get()));
178       CHECK(isolate_data_);
179       if (w_->per_isolate_opts_)
180         isolate_data_->set_options(std::move(w_->per_isolate_opts_));
181       isolate_data_->set_worker_context(w_);
182       isolate_data_->max_young_gen_size =
183           params.constraints.max_young_generation_size_in_bytes();
184     }
185 
186     Mutex::ScopedLock lock(w_->mutex_);
187     w_->isolate_ = isolate;
188   }
189 
~WorkerThreadData()190   ~WorkerThreadData() {
191     Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
192     Isolate* isolate;
193     {
194       Mutex::ScopedLock lock(w_->mutex_);
195       isolate = w_->isolate_;
196       w_->isolate_ = nullptr;
197     }
198 
199     if (isolate != nullptr) {
200       CHECK(!loop_init_failed_);
201       bool platform_finished = false;
202 
203       isolate_data_.reset();
204 
205       w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
206         *static_cast<bool*>(data) = true;
207       }, &platform_finished);
208 
209       // The order of these calls is important; if the Isolate is first disposed
210       // and then unregistered, there is a race condition window in which no
211       // new Isolate at the same address can successfully be registered with
212       // the platform.
213       // (Refs: https://github.com/nodejs/node/issues/30846)
214       w_->platform_->UnregisterIsolate(isolate);
215       isolate->Dispose();
216 
217       // Wait until the platform has cleaned up all relevant resources.
218       while (!platform_finished) {
219         uv_run(&loop_, UV_RUN_ONCE);
220       }
221     }
222     if (!loop_init_failed_) {
223       CheckedUvLoopClose(&loop_);
224     }
225   }
226 
loop_is_usable() const227   bool loop_is_usable() const { return !loop_init_failed_; }
228 
229  private:
230   Worker* const w_;
231   uv_loop_t loop_;
232   bool loop_init_failed_ = true;
233   DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
234 
235   friend class Worker;
236 };
237 
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)238 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
239                              size_t initial_heap_limit) {
240   Worker* worker = static_cast<Worker*>(data);
241   worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
242   // Give the current GC some extra leeway to let it finish rather than
243   // crash hard. We are not going to perform further allocations anyway.
244   constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
245   return current_heap_limit + kExtraHeapAllowance;
246 }
247 
Run()248 void Worker::Run() {
249   std::string name = "WorkerThread ";
250   name += std::to_string(thread_id_.id);
251   TRACE_EVENT_METADATA1(
252       "__metadata", "thread_name", "name",
253       TRACE_STR_COPY(name.c_str()));
254   CHECK_NOT_NULL(platform_);
255 
256   Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
257 
258   WorkerThreadData data(this);
259   if (isolate_ == nullptr) return;
260   CHECK(data.loop_is_usable());
261 
262   Debug(this, "Starting worker with id %llu", thread_id_.id);
263   {
264     Locker locker(isolate_);
265     Isolate::Scope isolate_scope(isolate_);
266     SealHandleScope outer_seal(isolate_);
267 
268     DeleteFnPtr<Environment, FreeEnvironment> env_;
269     auto cleanup_env = OnScopeLeave([&]() {
270       // TODO(addaleax): This call is harmless but should not be necessary.
271       // Figure out why V8 is raising a DCHECK() here without it
272       // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
273       isolate_->CancelTerminateExecution();
274 
275       if (!env_) return;
276       env_->set_can_call_into_js(false);
277 
278       {
279         Mutex::ScopedLock lock(mutex_);
280         stopped_ = true;
281         this->env_ = nullptr;
282       }
283 
284       // TODO(addaleax): Try moving DisallowJavascriptExecutionScope into
285       // FreeEnvironment().
286       Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
287           Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
288       env_.reset();
289     });
290 
291     if (is_stopped()) return;
292     {
293       HandleScope handle_scope(isolate_);
294       Local<Context> context;
295       {
296         // We create the Context object before we have an Environment* in place
297         // that we could use for error handling. If creation fails due to
298         // resource constraints, we need something in place to handle it,
299         // though.
300         TryCatch try_catch(isolate_);
301         context = NewContext(isolate_);
302         if (context.IsEmpty()) {
303           // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
304           // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
305           Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
306           return;
307         }
308       }
309 
310       if (is_stopped()) return;
311       CHECK(!context.IsEmpty());
312       Context::Scope context_scope(context);
313       {
314         env_.reset(CreateEnvironment(
315             data.isolate_data_.get(),
316             context,
317             std::move(argv_),
318             std::move(exec_argv_),
319             static_cast<EnvironmentFlags::Flags>(environment_flags_),
320             thread_id_,
321             std::move(inspector_parent_handle_)));
322         if (is_stopped()) return;
323         CHECK_NOT_NULL(env_);
324         env_->set_env_vars(std::move(env_vars_));
325         SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
326           Exit(exit_code);
327         });
328       }
329       {
330         Mutex::ScopedLock lock(mutex_);
331         if (stopped_) return;
332         this->env_ = env_.get();
333       }
334       Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
335       if (is_stopped()) return;
336       {
337         if (!CreateEnvMessagePort(env_.get())) {
338           return;
339         }
340 
341         Debug(this, "Created message port for worker %llu", thread_id_.id);
342         if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
343           return;
344 
345         Debug(this, "Loaded environment for worker %llu", thread_id_.id);
346       }
347 
348       if (is_stopped()) return;
349       {
350         SealHandleScope seal(isolate_);
351         bool more;
352         env_->performance_state()->Mark(
353             node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
354         do {
355           if (is_stopped()) break;
356           uv_run(&data.loop_, UV_RUN_DEFAULT);
357           if (is_stopped()) break;
358 
359           platform_->DrainTasks(isolate_);
360 
361           more = uv_loop_alive(&data.loop_);
362           if (more && !is_stopped()) continue;
363 
364           if (EmitProcessBeforeExit(env_.get()).IsNothing())
365             break;
366 
367           // Emit `beforeExit` if the loop became alive either after emitting
368           // event, or after running some callbacks.
369           more = uv_loop_alive(&data.loop_);
370         } while (more == true && !is_stopped());
371         env_->performance_state()->Mark(
372             node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
373       }
374     }
375 
376     {
377       int exit_code;
378       bool stopped = is_stopped();
379       if (!stopped) {
380         env_->VerifyNoStrongBaseObjects();
381         exit_code = EmitProcessExit(env_.get()).FromMaybe(1);
382       }
383       Mutex::ScopedLock lock(mutex_);
384       if (exit_code_ == 0 && !stopped)
385         exit_code_ = exit_code;
386 
387       Debug(this, "Exiting thread for worker %llu with exit code %d",
388             thread_id_.id, exit_code_);
389     }
390   }
391 
392   Debug(this, "Worker %llu thread stops", thread_id_.id);
393 }
394 
CreateEnvMessagePort(Environment * env)395 bool Worker::CreateEnvMessagePort(Environment* env) {
396   HandleScope handle_scope(isolate_);
397   std::unique_ptr<MessagePortData> data;
398   {
399     Mutex::ScopedLock lock(mutex_);
400     data = std::move(child_port_data_);
401   }
402 
403   // Set up the message channel for receiving messages in the child.
404   MessagePort* child_port = MessagePort::New(env,
405                                              env->context(),
406                                              std::move(data));
407   // MessagePort::New() may return nullptr if execution is terminated
408   // within it.
409   if (child_port != nullptr)
410     env->set_message_port(child_port->object(isolate_));
411 
412   return child_port;
413 }
414 
JoinThread()415 void Worker::JoinThread() {
416   if (thread_joined_)
417     return;
418   CHECK_EQ(uv_thread_join(&tid_), 0);
419   thread_joined_ = true;
420 
421   env()->remove_sub_worker_context(this);
422 
423   {
424     HandleScope handle_scope(env()->isolate());
425     Context::Scope context_scope(env()->context());
426 
427     // Reset the parent port as we're closing it now anyway.
428     object()->Set(env()->context(),
429                   env()->message_port_string(),
430                   Undefined(env()->isolate())).Check();
431 
432     Local<Value> args[] = {
433         Integer::New(env()->isolate(), exit_code_),
434         custom_error_ != nullptr
435             ? OneByteString(env()->isolate(), custom_error_).As<Value>()
436             : Null(env()->isolate()).As<Value>(),
437         !custom_error_str_.empty()
438             ? OneByteString(env()->isolate(), custom_error_str_.c_str())
439                   .As<Value>()
440             : Null(env()->isolate()).As<Value>(),
441     };
442 
443     MakeCallback(env()->onexit_string(), arraysize(args), args);
444   }
445 
446   // If we get here, the !thread_joined_ condition at the top of the function
447   // implies that the thread was running. In that case, its final action will
448   // be to schedule a callback on the parent thread which will delete this
449   // object, so there's nothing more to do here.
450 }
451 
~Worker()452 Worker::~Worker() {
453   Mutex::ScopedLock lock(mutex_);
454 
455   CHECK(stopped_);
456   CHECK_NULL(env_);
457   CHECK(thread_joined_);
458 
459   Debug(this, "Worker %llu destroyed", thread_id_.id);
460 }
461 
New(const FunctionCallbackInfo<Value> & args)462 void Worker::New(const FunctionCallbackInfo<Value>& args) {
463   Environment* env = Environment::GetCurrent(args);
464   Isolate* isolate = args.GetIsolate();
465 
466   CHECK(args.IsConstructCall());
467 
468   if (env->isolate_data()->platform() == nullptr) {
469     THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
470     return;
471   }
472 
473   std::string url;
474   std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
475   std::shared_ptr<KVStore> env_vars = nullptr;
476 
477   std::vector<std::string> exec_argv_out;
478 
479   // Argument might be a string or URL
480   if (!args[0]->IsNullOrUndefined()) {
481     Utf8Value value(
482         isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
483     url.append(value.out(), value.length());
484   }
485 
486   if (args[1]->IsNull()) {
487     // Means worker.env = { ...process.env }.
488     env_vars = env->env_vars()->Clone(isolate);
489   } else if (args[1]->IsObject()) {
490     // User provided env.
491     env_vars = KVStore::CreateMapKVStore();
492     env_vars->AssignFromObject(isolate->GetCurrentContext(),
493                                args[1].As<Object>());
494   } else {
495     // Env is shared.
496     env_vars = env->env_vars();
497   }
498 
499   if (args[1]->IsObject() || args[2]->IsArray()) {
500     per_isolate_opts.reset(new PerIsolateOptions());
501 
502     HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
503       return env_vars->Get(name).FromMaybe("");
504     });
505 
506 #ifndef NODE_WITHOUT_NODE_OPTIONS
507     MaybeLocal<String> maybe_node_opts =
508         env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
509     Local<String> node_opts;
510     if (maybe_node_opts.ToLocal(&node_opts)) {
511       std::string node_options(*String::Utf8Value(isolate, node_opts));
512       std::vector<std::string> errors{};
513       std::vector<std::string> env_argv =
514           ParseNodeOptionsEnvVar(node_options, &errors);
515       // [0] is expected to be the program name, add dummy string.
516       env_argv.insert(env_argv.begin(), "");
517       std::vector<std::string> invalid_args{};
518       options_parser::Parse(&env_argv,
519                             nullptr,
520                             &invalid_args,
521                             per_isolate_opts.get(),
522                             kAllowedInEnvironment,
523                             &errors);
524       if (!errors.empty() && args[1]->IsObject()) {
525         // Only fail for explicitly provided env, this protects from failures
526         // when NODE_OPTIONS from parent's env is used (which is the default).
527         Local<Value> error;
528         if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
529         Local<String> key =
530             FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
531         // Ignore the return value of Set() because exceptions bubble up to JS
532         // when we return anyway.
533         USE(args.This()->Set(env->context(), key, error));
534         return;
535       }
536     }
537 #endif
538   }
539 
540   if (args[2]->IsArray()) {
541     Local<Array> array = args[2].As<Array>();
542     // The first argument is reserved for program name, but we don't need it
543     // in workers.
544     std::vector<std::string> exec_argv = {""};
545     uint32_t length = array->Length();
546     for (uint32_t i = 0; i < length; i++) {
547       Local<Value> arg;
548       if (!array->Get(env->context(), i).ToLocal(&arg)) {
549         return;
550       }
551       Local<String> arg_v8;
552       if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
553         return;
554       }
555       Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
556       std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
557       exec_argv.push_back(arg_string);
558     }
559 
560     std::vector<std::string> invalid_args{};
561     std::vector<std::string> errors{};
562     // Using invalid_args as the v8_args argument as it stores unknown
563     // options for the per isolate parser.
564     options_parser::Parse(
565         &exec_argv,
566         &exec_argv_out,
567         &invalid_args,
568         per_isolate_opts.get(),
569         kDisallowedInEnvironment,
570         &errors);
571 
572     // The first argument is program name.
573     invalid_args.erase(invalid_args.begin());
574     if (errors.size() > 0 || invalid_args.size() > 0) {
575       Local<Value> error;
576       if (!ToV8Value(env->context(),
577                      errors.size() > 0 ? errors : invalid_args)
578                          .ToLocal(&error)) {
579         return;
580       }
581       Local<String> key =
582           FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
583       // Ignore the return value of Set() because exceptions bubble up to JS
584       // when we return anyway.
585       USE(args.This()->Set(env->context(), key, error));
586       return;
587     }
588   } else {
589     exec_argv_out = env->exec_argv();
590   }
591 
592   Worker* worker = new Worker(env,
593                               args.This(),
594                               url,
595                               per_isolate_opts,
596                               std::move(exec_argv_out),
597                               env_vars);
598 
599   CHECK(args[3]->IsFloat64Array());
600   Local<Float64Array> limit_info = args[3].As<Float64Array>();
601   CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
602   limit_info->CopyContents(worker->resource_limits_,
603                            sizeof(worker->resource_limits_));
604 
605   CHECK(args[4]->IsBoolean());
606   if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
607     worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
608   if (env->hide_console_windows())
609     worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
610   if (env->no_native_addons())
611     worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
612 }
613 
StartThread(const FunctionCallbackInfo<Value> & args)614 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
615   Worker* w;
616   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
617   Mutex::ScopedLock lock(w->mutex_);
618 
619   w->stopped_ = false;
620 
621   if (w->resource_limits_[kStackSizeMb] > 0) {
622     if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
623       w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
624       w->stack_size_ = kStackBufferSize;
625     } else {
626       w->stack_size_ = w->resource_limits_[kStackSizeMb] * kMB;
627     }
628   } else {
629     w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
630   }
631 
632   uv_thread_options_t thread_options;
633   thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
634   thread_options.stack_size = w->stack_size_;
635   int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
636     // XXX: This could become a std::unique_ptr, but that makes at least
637     // gcc 6.3 detect undefined behaviour when there shouldn't be any.
638     // gcc 7+ handles this well.
639     Worker* w = static_cast<Worker*>(arg);
640     const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
641 
642     // Leave a few kilobytes just to make sure we're within limits and have
643     // some space to do work in C++ land.
644     w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
645 
646     w->Run();
647 
648     Mutex::ScopedLock lock(w->mutex_);
649     w->env()->SetImmediateThreadsafe(
650         [w = std::unique_ptr<Worker>(w)](Environment* env) {
651           if (w->has_ref_)
652             env->add_refs(-1);
653           w->JoinThread();
654           // implicitly delete w
655         });
656   }, static_cast<void*>(w));
657 
658   if (ret == 0) {
659     // The object now owns the created thread and should not be garbage
660     // collected until that finishes.
661     w->ClearWeak();
662     w->thread_joined_ = false;
663 
664     if (w->has_ref_)
665       w->env()->add_refs(1);
666 
667     w->env()->add_sub_worker_context(w);
668   } else {
669     w->stopped_ = true;
670 
671     char err_buf[128];
672     uv_err_name_r(ret, err_buf, sizeof(err_buf));
673     {
674       Isolate* isolate = w->env()->isolate();
675       HandleScope handle_scope(isolate);
676       THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
677     }
678   }
679 }
680 
StopThread(const FunctionCallbackInfo<Value> & args)681 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
682   Worker* w;
683   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
684 
685   Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
686   w->Exit(1);
687 }
688 
Ref(const FunctionCallbackInfo<Value> & args)689 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
690   Worker* w;
691   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
692   if (!w->has_ref_ && !w->thread_joined_) {
693     w->has_ref_ = true;
694     w->env()->add_refs(1);
695   }
696 }
697 
Unref(const FunctionCallbackInfo<Value> & args)698 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
699   Worker* w;
700   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
701   if (w->has_ref_ && !w->thread_joined_) {
702     w->has_ref_ = false;
703     w->env()->add_refs(-1);
704   }
705 }
706 
GetResourceLimits(const FunctionCallbackInfo<Value> & args)707 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
708   Worker* w;
709   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
710   args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
711 }
712 
GetResourceLimits(Isolate * isolate) const713 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
714   Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
715 
716   memcpy(ab->GetBackingStore()->Data(),
717          resource_limits_,
718          sizeof(resource_limits_));
719   return Float64Array::New(ab, 0, kTotalResourceLimitCount);
720 }
721 
Exit(int code,const char * error_code,const char * error_message)722 void Worker::Exit(int code, const char* error_code, const char* error_message) {
723   Mutex::ScopedLock lock(mutex_);
724   Debug(this, "Worker %llu called Exit(%d, %s, %s)",
725         thread_id_.id, code, error_code, error_message);
726 
727   if (error_code != nullptr) {
728     custom_error_ = error_code;
729     custom_error_str_ = error_message;
730   }
731 
732   if (env_ != nullptr) {
733     exit_code_ = code;
734     Stop(env_);
735   } else {
736     stopped_ = true;
737   }
738 }
739 
MemoryInfo(MemoryTracker * tracker) const740 void Worker::MemoryInfo(MemoryTracker* tracker) const {
741   tracker->TrackField("parent_port", parent_port_);
742 }
743 
IsNotIndicativeOfMemoryLeakAtExit() const744 bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
745   // Worker objects always stay alive as long as the child thread, regardless
746   // of whether they are being referenced in the parent thread.
747   return true;
748 }
749 
750 class WorkerHeapSnapshotTaker : public AsyncWrap {
751  public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)752   WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
753     : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
754 
755   SET_NO_MEMORY_INFO()
756   SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
757   SET_SELF_SIZE(WorkerHeapSnapshotTaker)
758 };
759 
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)760 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
761   Worker* w;
762   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
763 
764   Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
765 
766   Environment* env = w->env();
767   AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
768   Local<Object> wrap;
769   if (!env->worker_heap_snapshot_taker_template()
770       ->NewInstance(env->context()).ToLocal(&wrap)) {
771     return;
772   }
773   BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
774       MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
775 
776   // Interrupt the worker thread and take a snapshot, then schedule a call
777   // on the parent thread that turns that snapshot into a readable stream.
778   bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
779     heap::HeapSnapshotPointer snapshot {
780         worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
781     CHECK(snapshot);
782     env->SetImmediateThreadsafe(
783         [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
784           HandleScope handle_scope(env->isolate());
785           Context::Scope context_scope(env->context());
786 
787           AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
788           BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
789               env, std::move(snapshot));
790           Local<Value> args[] = { stream->object() };
791           taker->MakeCallback(env->ondone_string(), arraysize(args), args);
792         }, CallbackFlags::kUnrefed);
793   });
794   args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
795 }
796 
LoopIdleTime(const FunctionCallbackInfo<Value> & args)797 void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
798   Worker* w;
799   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
800 
801   Mutex::ScopedLock lock(w->mutex_);
802   // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
803   // before locking the mutex is a race condition. So manually do the same
804   // check.
805   if (w->stopped_ || w->env_ == nullptr)
806     return args.GetReturnValue().Set(-1);
807 
808   uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
809   args.GetReturnValue().Set(1.0 * idle_time / 1e6);
810 }
811 
LoopStartTime(const FunctionCallbackInfo<Value> & args)812 void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
813   Worker* w;
814   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
815 
816   Mutex::ScopedLock lock(w->mutex_);
817   // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
818   // before locking the mutex is a race condition. So manually do the same
819   // check.
820   if (w->stopped_ || w->env_ == nullptr)
821     return args.GetReturnValue().Set(-1);
822 
823   double loop_start_time = w->env_->performance_state()->milestones[
824       node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
825   CHECK_GE(loop_start_time, 0);
826   args.GetReturnValue().Set(
827       (loop_start_time - node::performance::timeOrigin) / 1e6);
828 }
829 
830 namespace {
831 
832 // Return the MessagePort that is global for this Environment and communicates
833 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)834 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
835   Environment* env = Environment::GetCurrent(args);
836   Local<Object> port = env->message_port();
837   CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
838   if (!port.IsEmpty()) {
839     CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
840     args.GetReturnValue().Set(port);
841   }
842 }
843 
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)844 void InitWorker(Local<Object> target,
845                 Local<Value> unused,
846                 Local<Context> context,
847                 void* priv) {
848   Environment* env = Environment::GetCurrent(context);
849 
850   {
851     Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
852 
853     w->InstanceTemplate()->SetInternalFieldCount(
854         Worker::kInternalFieldCount);
855     w->Inherit(AsyncWrap::GetConstructorTemplate(env));
856 
857     env->SetProtoMethod(w, "startThread", Worker::StartThread);
858     env->SetProtoMethod(w, "stopThread", Worker::StopThread);
859     env->SetProtoMethod(w, "ref", Worker::Ref);
860     env->SetProtoMethod(w, "unref", Worker::Unref);
861     env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
862     env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
863     env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
864     env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
865 
866     env->SetConstructorFunction(target, "Worker", w);
867   }
868 
869   {
870     Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
871 
872     wst->InstanceTemplate()->SetInternalFieldCount(
873         WorkerHeapSnapshotTaker::kInternalFieldCount);
874     wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
875 
876     Local<String> wst_string =
877         FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
878     wst->SetClassName(wst_string);
879     env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
880   }
881 
882   env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
883 
884   target
885       ->Set(env->context(),
886             env->thread_id_string(),
887             Number::New(env->isolate(), static_cast<double>(env->thread_id())))
888       .Check();
889 
890   target
891       ->Set(env->context(),
892             FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
893             Boolean::New(env->isolate(), env->is_main_thread()))
894       .Check();
895 
896   target
897       ->Set(env->context(),
898             FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
899             Boolean::New(env->isolate(), env->owns_process_state()))
900       .Check();
901 
902   if (!env->is_main_thread()) {
903     target
904         ->Set(env->context(),
905               FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
906               env->worker_context()->GetResourceLimits(env->isolate()))
907         .Check();
908   }
909 
910   NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
911   NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
912   NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
913   NODE_DEFINE_CONSTANT(target, kStackSizeMb);
914   NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
915 }
916 
917 }  // anonymous namespace
918 
919 }  // namespace worker
920 }  // namespace node
921 
922 NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
923