1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "histogram-inl.h"
4 #include "memory_tracker-inl.h"
5 #include "node_errors.h"
6 #include "node_external_reference.h"
7 #include "node_buffer.h"
8 #include "node_options-inl.h"
9 #include "node_perf.h"
10 #include "node_snapshot_builder.h"
11 #include "util-inl.h"
12 #include "async_wrap-inl.h"
13
14 #include <memory>
15 #include <string>
16 #include <vector>
17
18 using node::kAllowedInEnvvar;
19 using node::kDisallowedInEnvvar;
20 using v8::Array;
21 using v8::ArrayBuffer;
22 using v8::Boolean;
23 using v8::Context;
24 using v8::Float64Array;
25 using v8::FunctionCallbackInfo;
26 using v8::FunctionTemplate;
27 using v8::HandleScope;
28 using v8::Integer;
29 using v8::Isolate;
30 using v8::Local;
31 using v8::Locker;
32 using v8::Maybe;
33 using v8::MaybeLocal;
34 using v8::Null;
35 using v8::Number;
36 using v8::Object;
37 using v8::ResourceConstraints;
38 using v8::SealHandleScope;
39 using v8::String;
40 using v8::TryCatch;
41 using v8::Value;
42
43 namespace node {
44 namespace worker {
45
// Number of bytes in one megabyte (2^20). It is a double because it is used
// below to convert between byte counts and the floating-point resource limit
// values (which are expressed in MB).
constexpr double kMB = 1024 * 1024;
47
Worker(Environment * env,Local<Object> wrap,const std::string & url,const std::string & name,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars,const SnapshotData * snapshot_data)48 Worker::Worker(Environment* env,
49 Local<Object> wrap,
50 const std::string& url,
51 const std::string& name,
52 std::shared_ptr<PerIsolateOptions> per_isolate_opts,
53 std::vector<std::string>&& exec_argv,
54 std::shared_ptr<KVStore> env_vars,
55 const SnapshotData* snapshot_data)
56 : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
57 per_isolate_opts_(per_isolate_opts),
58 exec_argv_(exec_argv),
59 platform_(env->isolate_data()->platform()),
60 thread_id_(AllocateEnvironmentThreadId()),
61 name_(name),
62 env_vars_(env_vars),
63 snapshot_data_(snapshot_data) {
64 Debug(this, "Creating new worker instance with thread id %llu",
65 thread_id_.id);
66
67 // Set up everything that needs to be set up in the parent environment.
68 MessagePort* parent_port = MessagePort::New(env, env->context());
69 if (parent_port == nullptr) {
70 // This can happen e.g. because execution is terminating.
71 return;
72 }
73
74 child_port_data_ = std::make_unique<MessagePortData>(nullptr);
75 MessagePort::Entangle(parent_port, child_port_data_.get());
76
77 object()
78 ->Set(env->context(), env->message_port_string(), parent_port->object())
79 .Check();
80
81 object()->Set(env->context(),
82 env->thread_id_string(),
83 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
84 .Check();
85
86 inspector_parent_handle_ =
87 GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str());
88
89 argv_ = std::vector<std::string>{env->argv()[0]};
90 // Mark this Worker object as weak until we actually start the thread.
91 MakeWeak();
92
93 Debug(this, "Preparation for worker %llu finished", thread_id_.id);
94 }
95
is_stopped() const96 bool Worker::is_stopped() const {
97 Mutex::ScopedLock lock(mutex_);
98 if (env_ != nullptr)
99 return env_->is_stopping();
100 return stopped_;
101 }
102
UpdateResourceConstraints(ResourceConstraints * constraints)103 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
104 constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
105
106 if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
107 constraints->set_max_young_generation_size_in_bytes(
108 static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
109 } else {
110 resource_limits_[kMaxYoungGenerationSizeMb] =
111 constraints->max_young_generation_size_in_bytes() / kMB;
112 }
113
114 if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
115 constraints->set_max_old_generation_size_in_bytes(
116 static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
117 } else {
118 resource_limits_[kMaxOldGenerationSizeMb] =
119 constraints->max_old_generation_size_in_bytes() / kMB;
120 }
121
122 if (resource_limits_[kCodeRangeSizeMb] > 0) {
123 constraints->set_code_range_size_in_bytes(
124 static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
125 } else {
126 resource_limits_[kCodeRangeSizeMb] =
127 constraints->code_range_size_in_bytes() / kMB;
128 }
129 }
130
131 // This class contains data that is only relevant to the child thread itself,
132 // and only while it is running.
133 // (Eventually, the Environment instance should probably also be moved here.)
134 class WorkerThreadData {
135 public:
WorkerThreadData(Worker * w)136 explicit WorkerThreadData(Worker* w)
137 : w_(w) {
138 int ret = uv_loop_init(&loop_);
139 if (ret != 0) {
140 char err_buf[128];
141 uv_err_name_r(ret, err_buf, sizeof(err_buf));
142 w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
143 return;
144 }
145 loop_init_failed_ = false;
146 uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
147
148 std::shared_ptr<ArrayBufferAllocator> allocator =
149 ArrayBufferAllocator::Create();
150 Isolate::CreateParams params;
151 SetIsolateCreateParamsForNode(¶ms);
152 w->UpdateResourceConstraints(¶ms.constraints);
153 params.array_buffer_allocator_shared = allocator;
154 Isolate* isolate =
155 NewIsolate(¶ms, &loop_, w->platform_, w->snapshot_data());
156 if (isolate == nullptr) {
157 w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
158 return;
159 }
160
161 SetIsolateUpForNode(isolate);
162
163 // Be sure it's called before Environment::InitializeDiagnostics()
164 // so that this callback stays when the callback of
165 // --heapsnapshot-near-heap-limit gets is popped.
166 isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
167
168 {
169 Locker locker(isolate);
170 Isolate::Scope isolate_scope(isolate);
171 // V8 computes its stack limit the first time a `Locker` is used based on
172 // --stack-size. Reset it to the correct value.
173 isolate->SetStackLimit(w->stack_base_);
174
175 HandleScope handle_scope(isolate);
176 isolate_data_.reset(CreateIsolateData(isolate,
177 &loop_,
178 w_->platform_,
179 allocator.get()));
180 CHECK(isolate_data_);
181 if (w_->per_isolate_opts_)
182 isolate_data_->set_options(std::move(w_->per_isolate_opts_));
183 isolate_data_->set_worker_context(w_);
184 isolate_data_->max_young_gen_size =
185 params.constraints.max_young_generation_size_in_bytes();
186 }
187
188 Mutex::ScopedLock lock(w_->mutex_);
189 w_->isolate_ = isolate;
190 }
191
~WorkerThreadData()192 ~WorkerThreadData() {
193 Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
194 Isolate* isolate;
195 {
196 Mutex::ScopedLock lock(w_->mutex_);
197 isolate = w_->isolate_;
198 w_->isolate_ = nullptr;
199 }
200
201 if (isolate != nullptr) {
202 CHECK(!loop_init_failed_);
203 bool platform_finished = false;
204
205 isolate_data_.reset();
206
207 w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
208 *static_cast<bool*>(data) = true;
209 }, &platform_finished);
210
211 // The order of these calls is important; if the Isolate is first disposed
212 // and then unregistered, there is a race condition window in which no
213 // new Isolate at the same address can successfully be registered with
214 // the platform.
215 // (Refs: https://github.com/nodejs/node/issues/30846)
216 w_->platform_->UnregisterIsolate(isolate);
217 isolate->Dispose();
218
219 // Wait until the platform has cleaned up all relevant resources.
220 while (!platform_finished) {
221 uv_run(&loop_, UV_RUN_ONCE);
222 }
223 }
224 if (!loop_init_failed_) {
225 CheckedUvLoopClose(&loop_);
226 }
227 }
228
loop_is_usable() const229 bool loop_is_usable() const { return !loop_init_failed_; }
230
231 private:
232 Worker* const w_;
233 uv_loop_t loop_;
234 bool loop_init_failed_ = true;
235 DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
236 const SnapshotData* snapshot_data_ = nullptr;
237 friend class Worker;
238 };
239
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)240 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
241 size_t initial_heap_limit) {
242 Worker* worker = static_cast<Worker*>(data);
243 // Give the current GC some extra leeway to let it finish rather than
244 // crash hard. We are not going to perform further allocations anyway.
245 constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
246 size_t new_limit = current_heap_limit + kExtraHeapAllowance;
247 Environment* env = worker->env();
248 if (env != nullptr) {
249 DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
250 Debug(env,
251 DebugCategory::DIAGNOSTICS,
252 "Throwing ERR_WORKER_OUT_OF_MEMORY, "
253 "new_limit=%" PRIu64 "\n",
254 static_cast<uint64_t>(new_limit));
255 }
256 worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
257 return new_limit;
258 }
259
Run()260 void Worker::Run() {
261 std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" +
262 (name_ == "" ? "" : " " + name_);
263 TRACE_EVENT_METADATA1(
264 "__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str()));
265 CHECK_NOT_NULL(platform_);
266
267 Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
268
269 WorkerThreadData data(this);
270 if (isolate_ == nullptr) return;
271 CHECK(data.loop_is_usable());
272
273 Debug(this, "Starting worker with id %llu", thread_id_.id);
274 {
275 Locker locker(isolate_);
276 Isolate::Scope isolate_scope(isolate_);
277 SealHandleScope outer_seal(isolate_);
278
279 DeleteFnPtr<Environment, FreeEnvironment> env_;
280 auto cleanup_env = OnScopeLeave([&]() {
281 // TODO(addaleax): This call is harmless but should not be necessary.
282 // Figure out why V8 is raising a DCHECK() here without it
283 // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
284 isolate_->CancelTerminateExecution();
285
286 if (!env_) return;
287 env_->set_can_call_into_js(false);
288
289 {
290 Mutex::ScopedLock lock(mutex_);
291 stopped_ = true;
292 this->env_ = nullptr;
293 }
294
295 env_.reset();
296 });
297
298 if (is_stopped()) return;
299 {
300 HandleScope handle_scope(isolate_);
301 Local<Context> context;
302 {
303 // We create the Context object before we have an Environment* in place
304 // that we could use for error handling. If creation fails due to
305 // resource constraints, we need something in place to handle it,
306 // though.
307 TryCatch try_catch(isolate_);
308 if (snapshot_data_ != nullptr) {
309 context = Context::FromSnapshot(isolate_,
310 SnapshotData::kNodeBaseContextIndex)
311 .ToLocalChecked();
312 if (!context.IsEmpty() &&
313 !InitializeContextRuntime(context).IsJust()) {
314 context = Local<Context>();
315 }
316 } else {
317 context = NewContext(isolate_);
318 }
319 if (context.IsEmpty()) {
320 Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
321 return;
322 }
323 }
324
325 if (is_stopped()) return;
326 CHECK(!context.IsEmpty());
327 Context::Scope context_scope(context);
328 {
329 env_.reset(CreateEnvironment(
330 data.isolate_data_.get(),
331 context,
332 std::move(argv_),
333 std::move(exec_argv_),
334 static_cast<EnvironmentFlags::Flags>(environment_flags_),
335 thread_id_,
336 std::move(inspector_parent_handle_)));
337 if (is_stopped()) return;
338 CHECK_NOT_NULL(env_);
339 env_->set_env_vars(std::move(env_vars_));
340 SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
341 Exit(exit_code);
342 });
343 }
344 {
345 Mutex::ScopedLock lock(mutex_);
346 if (stopped_) return;
347 this->env_ = env_.get();
348 }
349 Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
350 if (is_stopped()) return;
351 {
352 if (!CreateEnvMessagePort(env_.get())) {
353 return;
354 }
355
356 Debug(this, "Created message port for worker %llu", thread_id_.id);
357 if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
358 return;
359
360 Debug(this, "Loaded environment for worker %llu", thread_id_.id);
361 }
362 }
363
364 {
365 Maybe<int> exit_code = SpinEventLoop(env_.get());
366 Mutex::ScopedLock lock(mutex_);
367 if (exit_code_ == 0 && exit_code.IsJust()) {
368 exit_code_ = exit_code.FromJust();
369 }
370
371 Debug(this, "Exiting thread for worker %llu with exit code %d",
372 thread_id_.id, exit_code_);
373 }
374 }
375
376 Debug(this, "Worker %llu thread stops", thread_id_.id);
377 }
378
CreateEnvMessagePort(Environment * env)379 bool Worker::CreateEnvMessagePort(Environment* env) {
380 HandleScope handle_scope(isolate_);
381 std::unique_ptr<MessagePortData> data;
382 {
383 Mutex::ScopedLock lock(mutex_);
384 data = std::move(child_port_data_);
385 }
386
387 // Set up the message channel for receiving messages in the child.
388 MessagePort* child_port = MessagePort::New(env,
389 env->context(),
390 std::move(data));
391 // MessagePort::New() may return nullptr if execution is terminated
392 // within it.
393 if (child_port != nullptr)
394 env->set_message_port(child_port->object(isolate_));
395
396 return child_port;
397 }
398
JoinThread()399 void Worker::JoinThread() {
400 if (!tid_.has_value())
401 return;
402 CHECK_EQ(uv_thread_join(&tid_.value()), 0);
403 tid_.reset();
404
405 env()->remove_sub_worker_context(this);
406
407 {
408 HandleScope handle_scope(env()->isolate());
409 Context::Scope context_scope(env()->context());
410
411 // Reset the parent port as we're closing it now anyway.
412 object()->Set(env()->context(),
413 env()->message_port_string(),
414 Undefined(env()->isolate())).Check();
415
416 Local<Value> args[] = {
417 Integer::New(env()->isolate(), exit_code_),
418 custom_error_ != nullptr
419 ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420 : Null(env()->isolate()).As<Value>(),
421 !custom_error_str_.empty()
422 ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423 .As<Value>()
424 : Null(env()->isolate()).As<Value>(),
425 };
426
427 MakeCallback(env()->onexit_string(), arraysize(args), args);
428 }
429
430 // If we get here, the tid_.has_value() condition at the top of the function
431 // implies that the thread was running. In that case, its final action will
432 // be to schedule a callback on the parent thread which will delete this
433 // object, so there's nothing more to do here.
434 }
435
// Destroys the Worker. With mutex_ held, verifies that the worker has been
// stopped, that no Environment is attached anymore and that the thread handle
// has been reset before the object goes away.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  CHECK(!tid_.has_value());

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
445
New(const FunctionCallbackInfo<Value> & args)446 void Worker::New(const FunctionCallbackInfo<Value>& args) {
447 Environment* env = Environment::GetCurrent(args);
448 Isolate* isolate = args.GetIsolate();
449
450 CHECK(args.IsConstructCall());
451
452 if (env->isolate_data()->platform() == nullptr) {
453 THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
454 return;
455 }
456
457 std::string url;
458 std::string name;
459 std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
460 std::shared_ptr<KVStore> env_vars = nullptr;
461
462 std::vector<std::string> exec_argv_out;
463
464 // Argument might be a string or URL
465 if (!args[0]->IsNullOrUndefined()) {
466 Utf8Value value(
467 isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
468 url.append(value.out(), value.length());
469 }
470
471 if (!args[5]->IsNullOrUndefined()) {
472 Utf8Value value(
473 isolate, args[5]->ToString(env->context()).FromMaybe(Local<String>()));
474 name.append(value.out(), value.length());
475 }
476
477 if (args[1]->IsNull()) {
478 // Means worker.env = { ...process.env }.
479 env_vars = env->env_vars()->Clone(isolate);
480 } else if (args[1]->IsObject()) {
481 // User provided env.
482 env_vars = KVStore::CreateMapKVStore();
483 env_vars->AssignFromObject(isolate->GetCurrentContext(),
484 args[1].As<Object>());
485 } else {
486 // Env is shared.
487 env_vars = env->env_vars();
488 }
489
490 if (args[1]->IsObject() || args[2]->IsArray()) {
491 per_isolate_opts.reset(new PerIsolateOptions());
492
493 HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
494 return env_vars->Get(name).FromMaybe("");
495 });
496
497 #ifndef NODE_WITHOUT_NODE_OPTIONS
498 MaybeLocal<String> maybe_node_opts =
499 env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
500 Local<String> node_opts;
501 if (maybe_node_opts.ToLocal(&node_opts)) {
502 std::string node_options(*String::Utf8Value(isolate, node_opts));
503 std::vector<std::string> errors{};
504 std::vector<std::string> env_argv =
505 ParseNodeOptionsEnvVar(node_options, &errors);
506 // [0] is expected to be the program name, add dummy string.
507 env_argv.insert(env_argv.begin(), "");
508 std::vector<std::string> invalid_args{};
509 options_parser::Parse(&env_argv,
510 nullptr,
511 &invalid_args,
512 per_isolate_opts.get(),
513 kAllowedInEnvvar,
514 &errors);
515 if (!errors.empty() && args[1]->IsObject()) {
516 // Only fail for explicitly provided env, this protects from failures
517 // when NODE_OPTIONS from parent's env is used (which is the default).
518 Local<Value> error;
519 if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
520 Local<String> key =
521 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
522 // Ignore the return value of Set() because exceptions bubble up to JS
523 // when we return anyway.
524 USE(args.This()->Set(env->context(), key, error));
525 return;
526 }
527 }
528 #endif // NODE_WITHOUT_NODE_OPTIONS
529 }
530
531 if (args[2]->IsArray()) {
532 Local<Array> array = args[2].As<Array>();
533 // The first argument is reserved for program name, but we don't need it
534 // in workers.
535 std::vector<std::string> exec_argv = {""};
536 uint32_t length = array->Length();
537 for (uint32_t i = 0; i < length; i++) {
538 Local<Value> arg;
539 if (!array->Get(env->context(), i).ToLocal(&arg)) {
540 return;
541 }
542 Local<String> arg_v8;
543 if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
544 return;
545 }
546 Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
547 std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
548 exec_argv.push_back(arg_string);
549 }
550
551 std::vector<std::string> invalid_args{};
552 std::vector<std::string> errors{};
553 // Using invalid_args as the v8_args argument as it stores unknown
554 // options for the per isolate parser.
555 options_parser::Parse(&exec_argv,
556 &exec_argv_out,
557 &invalid_args,
558 per_isolate_opts.get(),
559 kDisallowedInEnvvar,
560 &errors);
561
562 // The first argument is program name.
563 invalid_args.erase(invalid_args.begin());
564 if (errors.size() > 0 || invalid_args.size() > 0) {
565 Local<Value> error;
566 if (!ToV8Value(env->context(),
567 errors.size() > 0 ? errors : invalid_args)
568 .ToLocal(&error)) {
569 return;
570 }
571 Local<String> key =
572 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
573 // Ignore the return value of Set() because exceptions bubble up to JS
574 // when we return anyway.
575 USE(args.This()->Set(env->context(), key, error));
576 return;
577 }
578 } else {
579 exec_argv_out = env->exec_argv();
580 }
581
582 bool use_node_snapshot = per_process::cli_options->node_snapshot;
583 const SnapshotData* snapshot_data =
584 use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
585
586 Worker* worker = new Worker(env,
587 args.This(),
588 url,
589 name,
590 per_isolate_opts,
591 std::move(exec_argv_out),
592 env_vars,
593 snapshot_data);
594
595 CHECK(args[3]->IsFloat64Array());
596 Local<Float64Array> limit_info = args[3].As<Float64Array>();
597 CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
598 limit_info->CopyContents(worker->resource_limits_,
599 sizeof(worker->resource_limits_));
600
601 CHECK(args[4]->IsBoolean());
602 if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
603 worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
604 if (env->hide_console_windows())
605 worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
606 if (env->no_native_addons())
607 worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
608 if (env->no_global_search_paths())
609 worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
610 if (env->no_browser_globals())
611 worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
612 }
613
StartThread(const FunctionCallbackInfo<Value> & args)614 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
615 Worker* w;
616 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
617 Mutex::ScopedLock lock(w->mutex_);
618
619 w->stopped_ = false;
620
621 if (w->resource_limits_[kStackSizeMb] > 0) {
622 if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
623 w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
624 w->stack_size_ = kStackBufferSize;
625 } else {
626 w->stack_size_ =
627 static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
628 }
629 } else {
630 w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
631 }
632
633 uv_thread_options_t thread_options;
634 thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
635 thread_options.stack_size = w->stack_size_;
636
637 uv_thread_t* tid = &w->tid_.emplace(); // Create uv_thread_t instance
638 int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
639 // XXX: This could become a std::unique_ptr, but that makes at least
640 // gcc 6.3 detect undefined behaviour when there shouldn't be any.
641 // gcc 7+ handles this well.
642 Worker* w = static_cast<Worker*>(arg);
643 const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
644
645 // Leave a few kilobytes just to make sure we're within limits and have
646 // some space to do work in C++ land.
647 w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
648
649 w->Run();
650
651 Mutex::ScopedLock lock(w->mutex_);
652 w->env()->SetImmediateThreadsafe(
653 [w = std::unique_ptr<Worker>(w)](Environment* env) {
654 if (w->has_ref_)
655 env->add_refs(-1);
656 w->JoinThread();
657 // implicitly delete w
658 });
659 }, static_cast<void*>(w));
660
661 if (ret == 0) {
662 // The object now owns the created thread and should not be garbage
663 // collected until that finishes.
664 w->ClearWeak();
665
666 if (w->has_ref_)
667 w->env()->add_refs(1);
668
669 w->env()->add_sub_worker_context(w);
670 } else {
671 w->stopped_ = true;
672 w->tid_.reset();
673
674 char err_buf[128];
675 uv_err_name_r(ret, err_buf, sizeof(err_buf));
676 {
677 Isolate* isolate = w->env()->isolate();
678 HandleScope handle_scope(isolate);
679 THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
680 }
681 }
682 }
683
StopThread(const FunctionCallbackInfo<Value> & args)684 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
685 Worker* w;
686 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
687
688 Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
689 w->Exit(1);
690 }
691
Ref(const FunctionCallbackInfo<Value> & args)692 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
693 Worker* w;
694 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
695 if (!w->has_ref_ && w->tid_.has_value()) {
696 w->has_ref_ = true;
697 w->env()->add_refs(1);
698 }
699 }
700
HasRef(const FunctionCallbackInfo<Value> & args)701 void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
702 Worker* w;
703 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
704 args.GetReturnValue().Set(w->has_ref_);
705 }
706
Unref(const FunctionCallbackInfo<Value> & args)707 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
708 Worker* w;
709 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
710 if (w->has_ref_ && w->tid_.has_value()) {
711 w->has_ref_ = false;
712 w->env()->add_refs(-1);
713 }
714 }
715
GetResourceLimits(const FunctionCallbackInfo<Value> & args)716 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
717 Worker* w;
718 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
719 args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
720 }
721
GetResourceLimits(Isolate * isolate) const722 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
723 Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
724
725 memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
726 return Float64Array::New(ab, 0, kTotalResourceLimitCount);
727 }
728
Exit(int code,const char * error_code,const char * error_message)729 void Worker::Exit(int code, const char* error_code, const char* error_message) {
730 Mutex::ScopedLock lock(mutex_);
731 Debug(this, "Worker %llu called Exit(%d, %s, %s)",
732 thread_id_.id, code, error_code, error_message);
733
734 if (error_code != nullptr) {
735 custom_error_ = error_code;
736 custom_error_str_ = error_message;
737 }
738
739 if (env_ != nullptr) {
740 exit_code_ = code;
741 Stop(env_);
742 } else {
743 stopped_ = true;
744 }
745 }
746
// Always returns true: a Worker object that still exists at exit is not to
// be treated as a sign of a memory leak.
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
  // Worker objects always stay alive as long as the child thread, regardless
  // of whether they are being referenced in the parent thread.
  return true;
}
752
// AsyncWrap (provider PROVIDER_WORKERHEAPSNAPSHOT) that represents one
// pending heap snapshot request. Worker::TakeHeapSnapshot() creates an
// instance per request and invokes its `ondone` callback once the snapshot
// stream is available. The class carries no state of its own.
class WorkerHeapSnapshotTaker : public AsyncWrap {
 public:
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}

  // Memory tracking boilerplate.
  SET_NO_MEMORY_INFO()
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
};
762
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)763 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
764 Worker* w;
765 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
766
767 Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
768
769 Environment* env = w->env();
770 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
771 Local<Object> wrap;
772 if (!env->worker_heap_snapshot_taker_template()
773 ->NewInstance(env->context()).ToLocal(&wrap)) {
774 return;
775 }
776
777 // The created WorkerHeapSnapshotTaker is an object owned by main
778 // thread's Isolate, it can not be accessed by worker thread
779 std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
780 std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
781 MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
782
783 // Interrupt the worker thread and take a snapshot, then schedule a call
784 // on the parent thread that turns that snapshot into a readable stream.
785 bool scheduled = w->RequestInterrupt([taker = std::move(taker),
786 env](Environment* worker_env) mutable {
787 heap::HeapSnapshotPointer snapshot{
788 worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()};
789 CHECK(snapshot);
790
791 // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
792 // object.
793
794 env->SetImmediateThreadsafe(
795 [taker = std::move(taker),
796 snapshot = std::move(snapshot)](Environment* env) mutable {
797 HandleScope handle_scope(env->isolate());
798 Context::Scope context_scope(env->context());
799
800 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
801 BaseObjectPtr<AsyncWrap> stream =
802 heap::CreateHeapSnapshotStream(env, std::move(snapshot));
803 Local<Value> args[] = {stream->object()};
804 taker->get()->MakeCallback(
805 env->ondone_string(), arraysize(args), args);
806 // implicitly delete `taker`
807 },
808 CallbackFlags::kUnrefed);
809
810 // Now, the lambda is delivered to the main thread, as a result, the
811 // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
812 });
813
814 if (scheduled) {
815 args.GetReturnValue().Set(wrap);
816 } else {
817 args.GetReturnValue().Set(Local<Object>());
818 }
819 }
820
LoopIdleTime(const FunctionCallbackInfo<Value> & args)821 void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
822 Worker* w;
823 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
824
825 Mutex::ScopedLock lock(w->mutex_);
826 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
827 // before locking the mutex is a race condition. So manually do the same
828 // check.
829 if (w->stopped_ || w->env_ == nullptr)
830 return args.GetReturnValue().Set(-1);
831
832 uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
833 args.GetReturnValue().Set(1.0 * idle_time / 1e6);
834 }
835
LoopStartTime(const FunctionCallbackInfo<Value> & args)836 void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
837 Worker* w;
838 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
839
840 Mutex::ScopedLock lock(w->mutex_);
841 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
842 // before locking the mutex is a race condition. So manually do the same
843 // check.
844 if (w->stopped_ || w->env_ == nullptr)
845 return args.GetReturnValue().Set(-1);
846
847 double loop_start_time = w->env_->performance_state()->milestones[
848 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
849 CHECK_GE(loop_start_time, 0);
850 args.GetReturnValue().Set(
851 (loop_start_time - node::performance::timeOrigin) / 1e6);
852 }
853
854 namespace {
855
856 // Return the MessagePort that is global for this Environment and communicates
857 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)858 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
859 Environment* env = Environment::GetCurrent(args);
860 Local<Object> port = env->message_port();
861 CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
862 if (!port.IsEmpty()) {
863 CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
864 args.GetIsolate());
865 args.GetReturnValue().Set(port);
866 }
867 }
868
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)869 void InitWorker(Local<Object> target,
870 Local<Value> unused,
871 Local<Context> context,
872 void* priv) {
873 Environment* env = Environment::GetCurrent(context);
874 Isolate* isolate = env->isolate();
875
876 {
877 Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
878
879 w->InstanceTemplate()->SetInternalFieldCount(
880 Worker::kInternalFieldCount);
881 w->Inherit(AsyncWrap::GetConstructorTemplate(env));
882
883 SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
884 SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
885 SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
886 SetProtoMethod(isolate, w, "ref", Worker::Ref);
887 SetProtoMethod(isolate, w, "unref", Worker::Unref);
888 SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
889 SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
890 SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
891 SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
892
893 SetConstructorFunction(context, target, "Worker", w);
894 }
895
896 {
897 Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
898
899 wst->InstanceTemplate()->SetInternalFieldCount(
900 WorkerHeapSnapshotTaker::kInternalFieldCount);
901 wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
902
903 Local<String> wst_string =
904 FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
905 wst->SetClassName(wst_string);
906 env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
907 }
908
909 SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
910
911 target
912 ->Set(env->context(),
913 env->thread_id_string(),
914 Number::New(isolate, static_cast<double>(env->thread_id())))
915 .Check();
916
917 target
918 ->Set(env->context(),
919 FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
920 Boolean::New(isolate, env->is_main_thread()))
921 .Check();
922
923 target
924 ->Set(env->context(),
925 FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
926 Boolean::New(isolate, env->owns_process_state()))
927 .Check();
928
929 if (!env->is_main_thread()) {
930 target
931 ->Set(env->context(),
932 FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
933 env->worker_context()->GetResourceLimits(isolate))
934 .Check();
935 }
936
937 NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
938 NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
939 NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
940 NODE_DEFINE_CONSTANT(target, kStackSizeMb);
941 NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
942 }
943
RegisterExternalReferences(ExternalReferenceRegistry * registry)944 void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
945 registry->Register(GetEnvMessagePort);
946 registry->Register(Worker::New);
947 registry->Register(Worker::StartThread);
948 registry->Register(Worker::StopThread);
949 registry->Register(Worker::HasRef);
950 registry->Register(Worker::Ref);
951 registry->Register(Worker::Unref);
952 registry->Register(Worker::GetResourceLimits);
953 registry->Register(Worker::TakeHeapSnapshot);
954 registry->Register(Worker::LoopIdleTime);
955 registry->Register(Worker::LoopStartTime);
956 }
957
958 } // anonymous namespace
959 } // namespace worker
960 } // namespace node
961
// Register the `worker` internal binding: InitWorker() is its initializer and
// RegisterExternalReferences() is its external reference registration hook.
NODE_BINDING_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
NODE_BINDING_EXTERNAL_REFERENCE(worker,
                                node::worker::RegisterExternalReferences)
965