1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "histogram-inl.h"
4 #include "memory_tracker-inl.h"
5 #include "node_errors.h"
6 #include "node_external_reference.h"
7 #include "node_buffer.h"
8 #include "node_options-inl.h"
9 #include "node_perf.h"
10 #include "node_snapshot_builder.h"
11 #include "util-inl.h"
12 #include "async_wrap-inl.h"
13
14 #include <memory>
15 #include <string>
16 #include <vector>
17
18 using node::kAllowedInEnvvar;
19 using node::kDisallowedInEnvvar;
20 using v8::Array;
21 using v8::ArrayBuffer;
22 using v8::Boolean;
23 using v8::Context;
24 using v8::Float64Array;
25 using v8::FunctionCallbackInfo;
26 using v8::FunctionTemplate;
27 using v8::HandleScope;
28 using v8::Integer;
29 using v8::Isolate;
30 using v8::Local;
31 using v8::Locker;
32 using v8::Maybe;
33 using v8::MaybeLocal;
34 using v8::Null;
35 using v8::Number;
36 using v8::Object;
37 using v8::ResourceConstraints;
38 using v8::SealHandleScope;
39 using v8::String;
40 using v8::TryCatch;
41 using v8::Value;
42
43 namespace node {
44 namespace worker {
45
46 constexpr double kMB = 1024 * 1024;
47
Worker(Environment * env,Local<Object> wrap,const std::string & url,const std::string & name,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars,const SnapshotData * snapshot_data)48 Worker::Worker(Environment* env,
49 Local<Object> wrap,
50 const std::string& url,
51 const std::string& name,
52 std::shared_ptr<PerIsolateOptions> per_isolate_opts,
53 std::vector<std::string>&& exec_argv,
54 std::shared_ptr<KVStore> env_vars,
55 const SnapshotData* snapshot_data)
56 : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
57 per_isolate_opts_(per_isolate_opts),
58 exec_argv_(exec_argv),
59 platform_(env->isolate_data()->platform()),
60 thread_id_(AllocateEnvironmentThreadId()),
61 name_(name),
62 env_vars_(env_vars),
63 snapshot_data_(snapshot_data) {
64 Debug(this, "Creating new worker instance with thread id %llu",
65 thread_id_.id);
66
67 // Set up everything that needs to be set up in the parent environment.
68 MessagePort* parent_port = MessagePort::New(env, env->context());
69 if (parent_port == nullptr) {
70 // This can happen e.g. because execution is terminating.
71 return;
72 }
73
74 child_port_data_ = std::make_unique<MessagePortData>(nullptr);
75 MessagePort::Entangle(parent_port, child_port_data_.get());
76
77 object()
78 ->Set(env->context(), env->message_port_string(), parent_port->object())
79 .Check();
80
81 object()->Set(env->context(),
82 env->thread_id_string(),
83 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
84 .Check();
85
86 inspector_parent_handle_ =
87 GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str());
88
89 argv_ = std::vector<std::string>{env->argv()[0]};
90 // Mark this Worker object as weak until we actually start the thread.
91 MakeWeak();
92
93 Debug(this, "Preparation for worker %llu finished", thread_id_.id);
94 }
95
is_stopped() const96 bool Worker::is_stopped() const {
97 Mutex::ScopedLock lock(mutex_);
98 if (env_ != nullptr)
99 return env_->is_stopping();
100 return stopped_;
101 }
102
UpdateResourceConstraints(ResourceConstraints * constraints)103 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
104 constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
105
106 if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
107 constraints->set_max_young_generation_size_in_bytes(
108 static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
109 } else {
110 resource_limits_[kMaxYoungGenerationSizeMb] =
111 constraints->max_young_generation_size_in_bytes() / kMB;
112 }
113
114 if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
115 constraints->set_max_old_generation_size_in_bytes(
116 static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
117 } else {
118 resource_limits_[kMaxOldGenerationSizeMb] =
119 constraints->max_old_generation_size_in_bytes() / kMB;
120 }
121
122 if (resource_limits_[kCodeRangeSizeMb] > 0) {
123 constraints->set_code_range_size_in_bytes(
124 static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
125 } else {
126 resource_limits_[kCodeRangeSizeMb] =
127 constraints->code_range_size_in_bytes() / kMB;
128 }
129 }
130
131 // This class contains data that is only relevant to the child thread itself,
132 // and only while it is running.
133 // (Eventually, the Environment instance should probably also be moved here.)
134 class WorkerThreadData {
135 public:
WorkerThreadData(Worker * w)136 explicit WorkerThreadData(Worker* w)
137 : w_(w) {
138 int ret = uv_loop_init(&loop_);
139 if (ret != 0) {
140 char err_buf[128];
141 uv_err_name_r(ret, err_buf, sizeof(err_buf));
142 w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
143 return;
144 }
145 loop_init_failed_ = false;
146 uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
147
148 std::shared_ptr<ArrayBufferAllocator> allocator =
149 ArrayBufferAllocator::Create();
150 Isolate::CreateParams params;
151 SetIsolateCreateParamsForNode(¶ms);
152 w->UpdateResourceConstraints(¶ms.constraints);
153 params.array_buffer_allocator_shared = allocator;
154 Isolate* isolate =
155 NewIsolate(¶ms, &loop_, w->platform_, w->snapshot_data());
156 if (isolate == nullptr) {
157 w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
158 return;
159 }
160
161 SetIsolateUpForNode(isolate);
162
163 // Be sure it's called before Environment::InitializeDiagnostics()
164 // so that this callback stays when the callback of
165 // --heapsnapshot-near-heap-limit gets is popped.
166 isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
167
168 {
169 Locker locker(isolate);
170 Isolate::Scope isolate_scope(isolate);
171 // V8 computes its stack limit the first time a `Locker` is used based on
172 // --stack-size. Reset it to the correct value.
173 isolate->SetStackLimit(w->stack_base_);
174
175 HandleScope handle_scope(isolate);
176 isolate_data_.reset(CreateIsolateData(isolate,
177 &loop_,
178 w_->platform_,
179 allocator.get()));
180 CHECK(isolate_data_);
181 if (w_->per_isolate_opts_)
182 isolate_data_->set_options(std::move(w_->per_isolate_opts_));
183 isolate_data_->set_worker_context(w_);
184 isolate_data_->max_young_gen_size =
185 params.constraints.max_young_generation_size_in_bytes();
186 }
187
188 Mutex::ScopedLock lock(w_->mutex_);
189 w_->isolate_ = isolate;
190 }
191
~WorkerThreadData()192 ~WorkerThreadData() {
193 Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
194 Isolate* isolate;
195 {
196 Mutex::ScopedLock lock(w_->mutex_);
197 isolate = w_->isolate_;
198 w_->isolate_ = nullptr;
199 }
200
201 if (isolate != nullptr) {
202 CHECK(!loop_init_failed_);
203 bool platform_finished = false;
204
205 isolate_data_.reset();
206
207 w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
208 *static_cast<bool*>(data) = true;
209 }, &platform_finished);
210
211 // The order of these calls is important; if the Isolate is first disposed
212 // and then unregistered, there is a race condition window in which no
213 // new Isolate at the same address can successfully be registered with
214 // the platform.
215 // (Refs: https://github.com/nodejs/node/issues/30846)
216 w_->platform_->UnregisterIsolate(isolate);
217 isolate->Dispose();
218
219 // Wait until the platform has cleaned up all relevant resources.
220 while (!platform_finished) {
221 uv_run(&loop_, UV_RUN_ONCE);
222 }
223 }
224 if (!loop_init_failed_) {
225 CheckedUvLoopClose(&loop_);
226 }
227 }
228
loop_is_usable() const229 bool loop_is_usable() const { return !loop_init_failed_; }
230
231 private:
232 Worker* const w_;
233 uv_loop_t loop_;
234 bool loop_init_failed_ = true;
235 DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
236 const SnapshotData* snapshot_data_ = nullptr;
237 friend class Worker;
238 };
239
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)240 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
241 size_t initial_heap_limit) {
242 Worker* worker = static_cast<Worker*>(data);
243 // Give the current GC some extra leeway to let it finish rather than
244 // crash hard. We are not going to perform further allocations anyway.
245 constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
246 size_t new_limit = current_heap_limit + kExtraHeapAllowance;
247 Environment* env = worker->env();
248 if (env != nullptr) {
249 DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
250 Debug(env,
251 DebugCategory::DIAGNOSTICS,
252 "Throwing ERR_WORKER_OUT_OF_MEMORY, "
253 "new_limit=%" PRIu64 "\n",
254 static_cast<uint64_t>(new_limit));
255 }
256 worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
257 return new_limit;
258 }
259
Run()260 void Worker::Run() {
261 std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" +
262 (name_ == "" ? "" : " " + name_);
263 TRACE_EVENT_METADATA1(
264 "__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str()));
265 CHECK_NOT_NULL(platform_);
266
267 Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
268
269 WorkerThreadData data(this);
270 if (isolate_ == nullptr) return;
271 CHECK(data.loop_is_usable());
272
273 Debug(this, "Starting worker with id %llu", thread_id_.id);
274 {
275 Locker locker(isolate_);
276 Isolate::Scope isolate_scope(isolate_);
277 SealHandleScope outer_seal(isolate_);
278
279 DeleteFnPtr<Environment, FreeEnvironment> env_;
280 auto cleanup_env = OnScopeLeave([&]() {
281 // TODO(addaleax): This call is harmless but should not be necessary.
282 // Figure out why V8 is raising a DCHECK() here without it
283 // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
284 isolate_->CancelTerminateExecution();
285
286 if (!env_) return;
287 env_->set_can_call_into_js(false);
288
289 {
290 Mutex::ScopedLock lock(mutex_);
291 stopped_ = true;
292 this->env_ = nullptr;
293 }
294
295 env_.reset();
296 });
297
298 if (is_stopped()) return;
299 {
300 HandleScope handle_scope(isolate_);
301 Local<Context> context;
302 {
303 // We create the Context object before we have an Environment* in place
304 // that we could use for error handling. If creation fails due to
305 // resource constraints, we need something in place to handle it,
306 // though.
307 TryCatch try_catch(isolate_);
308 if (snapshot_data_ != nullptr) {
309 context = Context::FromSnapshot(isolate_,
310 SnapshotData::kNodeBaseContextIndex)
311 .ToLocalChecked();
312 if (!context.IsEmpty() &&
313 !InitializeContextRuntime(context).IsJust()) {
314 context = Local<Context>();
315 }
316 } else {
317 context = NewContext(isolate_);
318 }
319 if (context.IsEmpty()) {
320 Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
321 return;
322 }
323 }
324
325 if (is_stopped()) return;
326 CHECK(!context.IsEmpty());
327 Context::Scope context_scope(context);
328 {
329 env_.reset(CreateEnvironment(
330 data.isolate_data_.get(),
331 context,
332 std::move(argv_),
333 std::move(exec_argv_),
334 static_cast<EnvironmentFlags::Flags>(environment_flags_),
335 thread_id_,
336 std::move(inspector_parent_handle_)));
337 if (is_stopped()) return;
338 CHECK_NOT_NULL(env_);
339 env_->set_env_vars(std::move(env_vars_));
340 SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
341 Exit(exit_code);
342 });
343 }
344 {
345 Mutex::ScopedLock lock(mutex_);
346 if (stopped_) return;
347 this->env_ = env_.get();
348 }
349 Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
350 if (is_stopped()) return;
351 {
352 if (!CreateEnvMessagePort(env_.get())) {
353 return;
354 }
355
356 Debug(this, "Created message port for worker %llu", thread_id_.id);
357 if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
358 return;
359
360 Debug(this, "Loaded environment for worker %llu", thread_id_.id);
361 }
362 }
363
364 {
365 Maybe<int> exit_code = SpinEventLoop(env_.get());
366 Mutex::ScopedLock lock(mutex_);
367 if (exit_code_ == 0 && exit_code.IsJust()) {
368 exit_code_ = exit_code.FromJust();
369 }
370
371 Debug(this, "Exiting thread for worker %llu with exit code %d",
372 thread_id_.id, exit_code_);
373 }
374 }
375
376 Debug(this, "Worker %llu thread stops", thread_id_.id);
377 }
378
CreateEnvMessagePort(Environment * env)379 bool Worker::CreateEnvMessagePort(Environment* env) {
380 HandleScope handle_scope(isolate_);
381 std::unique_ptr<MessagePortData> data;
382 {
383 Mutex::ScopedLock lock(mutex_);
384 data = std::move(child_port_data_);
385 }
386
387 // Set up the message channel for receiving messages in the child.
388 MessagePort* child_port = MessagePort::New(env,
389 env->context(),
390 std::move(data));
391 // MessagePort::New() may return nullptr if execution is terminated
392 // within it.
393 if (child_port != nullptr)
394 env->set_message_port(child_port->object(isolate_));
395
396 return child_port;
397 }
398
JoinThread()399 void Worker::JoinThread() {
400 if (!tid_.has_value())
401 return;
402 CHECK_EQ(uv_thread_join(&tid_.value()), 0);
403 tid_.reset();
404
405 env()->remove_sub_worker_context(this);
406
407 {
408 HandleScope handle_scope(env()->isolate());
409 Context::Scope context_scope(env()->context());
410
411 // Reset the parent port as we're closing it now anyway.
412 object()->Set(env()->context(),
413 env()->message_port_string(),
414 Undefined(env()->isolate())).Check();
415
416 Local<Value> args[] = {
417 Integer::New(env()->isolate(), exit_code_),
418 custom_error_ != nullptr
419 ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420 : Null(env()->isolate()).As<Value>(),
421 !custom_error_str_.empty()
422 ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423 .As<Value>()
424 : Null(env()->isolate()).As<Value>(),
425 };
426
427 MakeCallback(env()->onexit_string(), arraysize(args), args);
428 }
429
430 // If we get here, the tid_.has_value() condition at the top of the function
431 // implies that the thread was running. In that case, its final action will
432 // be to schedule a callback on the parent thread which will delete this
433 // object, so there's nothing more to do here.
434 }
435
~Worker()436 Worker::~Worker() {
437 Mutex::ScopedLock lock(mutex_);
438
439 CHECK(stopped_);
440 CHECK_NULL(env_);
441 CHECK(!tid_.has_value());
442
443 Debug(this, "Worker %llu destroyed", thread_id_.id);
444 }
445
New(const FunctionCallbackInfo<Value> & args)446 void Worker::New(const FunctionCallbackInfo<Value>& args) {
447 Environment* env = Environment::GetCurrent(args);
448 auto is_internal = args[5];
449 CHECK(is_internal->IsBoolean());
450 Isolate* isolate = args.GetIsolate();
451
452 CHECK(args.IsConstructCall());
453
454 if (env->isolate_data()->platform() == nullptr) {
455 THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
456 return;
457 }
458
459 std::string url;
460 std::string name;
461 std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
462 std::shared_ptr<KVStore> env_vars = nullptr;
463
464 std::vector<std::string> exec_argv_out;
465
466 // Argument might be a string or URL
467 if (!args[0]->IsNullOrUndefined()) {
468 Utf8Value value(
469 isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
470 url.append(value.out(), value.length());
471 }
472
473 if (!args[6]->IsNullOrUndefined()) {
474 Utf8Value value(
475 isolate, args[6]->ToString(env->context()).FromMaybe(Local<String>()));
476 name.append(value.out(), value.length());
477 }
478
479 if (args[1]->IsNull()) {
480 // Means worker.env = { ...process.env }.
481 env_vars = env->env_vars()->Clone(isolate);
482 } else if (args[1]->IsObject()) {
483 // User provided env.
484 env_vars = KVStore::CreateMapKVStore();
485 env_vars->AssignFromObject(isolate->GetCurrentContext(),
486 args[1].As<Object>());
487 } else {
488 // Env is shared.
489 env_vars = env->env_vars();
490 }
491
492 if (args[1]->IsObject() || args[2]->IsArray()) {
493 per_isolate_opts.reset(new PerIsolateOptions());
494
495 HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
496 return env_vars->Get(name).FromMaybe("");
497 });
498
499 #ifndef NODE_WITHOUT_NODE_OPTIONS
500 MaybeLocal<String> maybe_node_opts =
501 env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
502 Local<String> node_opts;
503 if (maybe_node_opts.ToLocal(&node_opts)) {
504 std::string node_options(*String::Utf8Value(isolate, node_opts));
505 std::vector<std::string> errors{};
506 std::vector<std::string> env_argv =
507 ParseNodeOptionsEnvVar(node_options, &errors);
508 // [0] is expected to be the program name, add dummy string.
509 env_argv.insert(env_argv.begin(), "");
510 std::vector<std::string> invalid_args{};
511 options_parser::Parse(&env_argv,
512 nullptr,
513 &invalid_args,
514 per_isolate_opts.get(),
515 kAllowedInEnvvar,
516 &errors);
517 if (!errors.empty() && args[1]->IsObject()) {
518 // Only fail for explicitly provided env, this protects from failures
519 // when NODE_OPTIONS from parent's env is used (which is the default).
520 Local<Value> error;
521 if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
522 Local<String> key =
523 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
524 // Ignore the return value of Set() because exceptions bubble up to JS
525 // when we return anyway.
526 USE(args.This()->Set(env->context(), key, error));
527 return;
528 }
529 }
530 #endif // NODE_WITHOUT_NODE_OPTIONS
531 }
532
533 if (args[2]->IsArray()) {
534 Local<Array> array = args[2].As<Array>();
535 // The first argument is reserved for program name, but we don't need it
536 // in workers.
537 std::vector<std::string> exec_argv = {""};
538 uint32_t length = array->Length();
539 for (uint32_t i = 0; i < length; i++) {
540 Local<Value> arg;
541 if (!array->Get(env->context(), i).ToLocal(&arg)) {
542 return;
543 }
544 Local<String> arg_v8;
545 if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
546 return;
547 }
548 Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
549 std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
550 exec_argv.push_back(arg_string);
551 }
552
553 std::vector<std::string> invalid_args{};
554 std::vector<std::string> errors{};
555 // Using invalid_args as the v8_args argument as it stores unknown
556 // options for the per isolate parser.
557 options_parser::Parse(&exec_argv,
558 &exec_argv_out,
559 &invalid_args,
560 per_isolate_opts.get(),
561 kDisallowedInEnvvar,
562 &errors);
563
564 // The first argument is program name.
565 invalid_args.erase(invalid_args.begin());
566 if (errors.size() > 0 || invalid_args.size() > 0) {
567 Local<Value> error;
568 if (!ToV8Value(env->context(),
569 errors.size() > 0 ? errors : invalid_args)
570 .ToLocal(&error)) {
571 return;
572 }
573 Local<String> key =
574 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
575 // Ignore the return value of Set() because exceptions bubble up to JS
576 // when we return anyway.
577 USE(args.This()->Set(env->context(), key, error));
578 return;
579 }
580 } else {
581 exec_argv_out = env->exec_argv();
582 }
583
584 bool use_node_snapshot = per_process::cli_options->node_snapshot;
585 const SnapshotData* snapshot_data =
586 use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
587
588 Worker* worker = new Worker(env,
589 args.This(),
590 url,
591 name,
592 per_isolate_opts,
593 std::move(exec_argv_out),
594 env_vars,
595 snapshot_data);
596
597 CHECK(args[3]->IsFloat64Array());
598 Local<Float64Array> limit_info = args[3].As<Float64Array>();
599 CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
600 limit_info->CopyContents(worker->resource_limits_,
601 sizeof(worker->resource_limits_));
602
603 CHECK(args[4]->IsBoolean());
604 if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
605 worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
606 if (env->hide_console_windows())
607 worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
608 if (env->no_native_addons())
609 worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
610 if (env->no_global_search_paths())
611 worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
612 if (env->no_browser_globals())
613 worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
614 }
615
StartThread(const FunctionCallbackInfo<Value> & args)616 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
617 Worker* w;
618 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
619 Mutex::ScopedLock lock(w->mutex_);
620
621 w->stopped_ = false;
622
623 if (w->resource_limits_[kStackSizeMb] > 0) {
624 if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
625 w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
626 w->stack_size_ = kStackBufferSize;
627 } else {
628 w->stack_size_ =
629 static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
630 }
631 } else {
632 w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
633 }
634
635 uv_thread_options_t thread_options;
636 thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
637 thread_options.stack_size = w->stack_size_;
638
639 uv_thread_t* tid = &w->tid_.emplace(); // Create uv_thread_t instance
640 int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
641 // XXX: This could become a std::unique_ptr, but that makes at least
642 // gcc 6.3 detect undefined behaviour when there shouldn't be any.
643 // gcc 7+ handles this well.
644 Worker* w = static_cast<Worker*>(arg);
645 const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
646
647 // Leave a few kilobytes just to make sure we're within limits and have
648 // some space to do work in C++ land.
649 w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
650
651 w->Run();
652
653 Mutex::ScopedLock lock(w->mutex_);
654 w->env()->SetImmediateThreadsafe(
655 [w = std::unique_ptr<Worker>(w)](Environment* env) {
656 if (w->has_ref_)
657 env->add_refs(-1);
658 w->JoinThread();
659 // implicitly delete w
660 });
661 }, static_cast<void*>(w));
662
663 if (ret == 0) {
664 // The object now owns the created thread and should not be garbage
665 // collected until that finishes.
666 w->ClearWeak();
667
668 if (w->has_ref_)
669 w->env()->add_refs(1);
670
671 w->env()->add_sub_worker_context(w);
672 } else {
673 w->stopped_ = true;
674 w->tid_.reset();
675
676 char err_buf[128];
677 uv_err_name_r(ret, err_buf, sizeof(err_buf));
678 {
679 Isolate* isolate = w->env()->isolate();
680 HandleScope handle_scope(isolate);
681 THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
682 }
683 }
684 }
685
StopThread(const FunctionCallbackInfo<Value> & args)686 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
687 Worker* w;
688 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
689
690 Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
691 w->Exit(1);
692 }
693
Ref(const FunctionCallbackInfo<Value> & args)694 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
695 Worker* w;
696 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
697 if (!w->has_ref_ && w->tid_.has_value()) {
698 w->has_ref_ = true;
699 w->env()->add_refs(1);
700 }
701 }
702
HasRef(const FunctionCallbackInfo<Value> & args)703 void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
704 Worker* w;
705 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
706 args.GetReturnValue().Set(w->has_ref_);
707 }
708
Unref(const FunctionCallbackInfo<Value> & args)709 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
710 Worker* w;
711 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
712 if (w->has_ref_ && w->tid_.has_value()) {
713 w->has_ref_ = false;
714 w->env()->add_refs(-1);
715 }
716 }
717
GetResourceLimits(const FunctionCallbackInfo<Value> & args)718 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
719 Worker* w;
720 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
721 args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
722 }
723
GetResourceLimits(Isolate * isolate) const724 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
725 Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
726
727 memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
728 return Float64Array::New(ab, 0, kTotalResourceLimitCount);
729 }
730
Exit(int code,const char * error_code,const char * error_message)731 void Worker::Exit(int code, const char* error_code, const char* error_message) {
732 Mutex::ScopedLock lock(mutex_);
733 Debug(this, "Worker %llu called Exit(%d, %s, %s)",
734 thread_id_.id, code, error_code, error_message);
735
736 if (error_code != nullptr) {
737 custom_error_ = error_code;
738 custom_error_str_ = error_message;
739 }
740
741 if (env_ != nullptr) {
742 exit_code_ = code;
743 Stop(env_);
744 } else {
745 stopped_ = true;
746 }
747 }
748
IsNotIndicativeOfMemoryLeakAtExit() const749 bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
750 // Worker objects always stay alive as long as the child thread, regardless
751 // of whether they are being referenced in the parent thread.
752 return true;
753 }
754
755 class WorkerHeapSnapshotTaker : public AsyncWrap {
756 public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)757 WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
758 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
759
760 SET_NO_MEMORY_INFO()
761 SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
762 SET_SELF_SIZE(WorkerHeapSnapshotTaker)
763 };
764
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)765 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
766 Worker* w;
767 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
768
769 Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
770
771 Environment* env = w->env();
772 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
773 Local<Object> wrap;
774 if (!env->worker_heap_snapshot_taker_template()
775 ->NewInstance(env->context()).ToLocal(&wrap)) {
776 return;
777 }
778
779 // The created WorkerHeapSnapshotTaker is an object owned by main
780 // thread's Isolate, it can not be accessed by worker thread
781 std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
782 std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
783 MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
784
785 // Interrupt the worker thread and take a snapshot, then schedule a call
786 // on the parent thread that turns that snapshot into a readable stream.
787 bool scheduled = w->RequestInterrupt([taker = std::move(taker),
788 env](Environment* worker_env) mutable {
789 heap::HeapSnapshotPointer snapshot{
790 worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()};
791 CHECK(snapshot);
792
793 // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
794 // object.
795
796 env->SetImmediateThreadsafe(
797 [taker = std::move(taker),
798 snapshot = std::move(snapshot)](Environment* env) mutable {
799 HandleScope handle_scope(env->isolate());
800 Context::Scope context_scope(env->context());
801
802 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
803 BaseObjectPtr<AsyncWrap> stream =
804 heap::CreateHeapSnapshotStream(env, std::move(snapshot));
805 Local<Value> args[] = {stream->object()};
806 taker->get()->MakeCallback(
807 env->ondone_string(), arraysize(args), args);
808 // implicitly delete `taker`
809 },
810 CallbackFlags::kUnrefed);
811
812 // Now, the lambda is delivered to the main thread, as a result, the
813 // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
814 });
815
816 if (scheduled) {
817 args.GetReturnValue().Set(wrap);
818 } else {
819 args.GetReturnValue().Set(Local<Object>());
820 }
821 }
822
LoopIdleTime(const FunctionCallbackInfo<Value> & args)823 void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
824 Worker* w;
825 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
826
827 Mutex::ScopedLock lock(w->mutex_);
828 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
829 // before locking the mutex is a race condition. So manually do the same
830 // check.
831 if (w->stopped_ || w->env_ == nullptr)
832 return args.GetReturnValue().Set(-1);
833
834 uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
835 args.GetReturnValue().Set(1.0 * idle_time / 1e6);
836 }
837
LoopStartTime(const FunctionCallbackInfo<Value> & args)838 void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
839 Worker* w;
840 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
841
842 Mutex::ScopedLock lock(w->mutex_);
843 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
844 // before locking the mutex is a race condition. So manually do the same
845 // check.
846 if (w->stopped_ || w->env_ == nullptr)
847 return args.GetReturnValue().Set(-1);
848
849 double loop_start_time = w->env_->performance_state()->milestones[
850 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
851 CHECK_GE(loop_start_time, 0);
852 args.GetReturnValue().Set(
853 (loop_start_time - node::performance::timeOrigin) / 1e6);
854 }
855
856 namespace {
857
858 // Return the MessagePort that is global for this Environment and communicates
859 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)860 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
861 Environment* env = Environment::GetCurrent(args);
862 Local<Object> port = env->message_port();
863 CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
864 if (!port.IsEmpty()) {
865 CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
866 args.GetIsolate());
867 args.GetReturnValue().Set(port);
868 }
869 }
870
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)871 void InitWorker(Local<Object> target,
872 Local<Value> unused,
873 Local<Context> context,
874 void* priv) {
875 Environment* env = Environment::GetCurrent(context);
876 Isolate* isolate = env->isolate();
877
878 {
879 Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
880
881 w->InstanceTemplate()->SetInternalFieldCount(
882 Worker::kInternalFieldCount);
883 w->Inherit(AsyncWrap::GetConstructorTemplate(env));
884
885 SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
886 SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
887 SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
888 SetProtoMethod(isolate, w, "ref", Worker::Ref);
889 SetProtoMethod(isolate, w, "unref", Worker::Unref);
890 SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
891 SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
892 SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
893 SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
894
895 SetConstructorFunction(context, target, "Worker", w);
896 }
897
898 {
899 Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
900
901 wst->InstanceTemplate()->SetInternalFieldCount(
902 WorkerHeapSnapshotTaker::kInternalFieldCount);
903 wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
904
905 Local<String> wst_string =
906 FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
907 wst->SetClassName(wst_string);
908 env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
909 }
910
911 SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
912
913 target
914 ->Set(env->context(),
915 env->thread_id_string(),
916 Number::New(isolate, static_cast<double>(env->thread_id())))
917 .Check();
918
919 target
920 ->Set(env->context(),
921 FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
922 Boolean::New(isolate, env->is_main_thread()))
923 .Check();
924
925 target
926 ->Set(env->context(),
927 FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
928 Boolean::New(isolate, env->owns_process_state()))
929 .Check();
930
931 if (!env->is_main_thread()) {
932 target
933 ->Set(env->context(),
934 FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
935 env->worker_context()->GetResourceLimits(isolate))
936 .Check();
937 }
938
939 NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
940 NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
941 NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
942 NODE_DEFINE_CONSTANT(target, kStackSizeMb);
943 NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
944 }
945
RegisterExternalReferences(ExternalReferenceRegistry * registry)946 void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
947 registry->Register(GetEnvMessagePort);
948 registry->Register(Worker::New);
949 registry->Register(Worker::StartThread);
950 registry->Register(Worker::StopThread);
951 registry->Register(Worker::HasRef);
952 registry->Register(Worker::Ref);
953 registry->Register(Worker::Unref);
954 registry->Register(Worker::GetResourceLimits);
955 registry->Register(Worker::TakeHeapSnapshot);
956 registry->Register(Worker::LoopIdleTime);
957 registry->Register(Worker::LoopStartTime);
958 }
959
960 } // anonymous namespace
961 } // namespace worker
962 } // namespace node
963
964 NODE_BINDING_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
965 NODE_BINDING_EXTERNAL_REFERENCE(worker,
966 node::worker::RegisterExternalReferences)
967