1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "histogram-inl.h"
4 #include "memory_tracker-inl.h"
5 #include "node_errors.h"
6 #include "node_buffer.h"
7 #include "node_options-inl.h"
8 #include "node_perf.h"
9 #include "util-inl.h"
10 #include "async_wrap-inl.h"
11
12 #include <memory>
13 #include <string>
14 #include <vector>
15
16 using node::kAllowedInEnvironment;
17 using node::kDisallowedInEnvironment;
18 using v8::Array;
19 using v8::ArrayBuffer;
20 using v8::Boolean;
21 using v8::Context;
22 using v8::Float64Array;
23 using v8::FunctionCallbackInfo;
24 using v8::FunctionTemplate;
25 using v8::HandleScope;
26 using v8::Integer;
27 using v8::Isolate;
28 using v8::Local;
29 using v8::Locker;
30 using v8::MaybeLocal;
31 using v8::Null;
32 using v8::Number;
33 using v8::Object;
34 using v8::ResourceConstraints;
35 using v8::SealHandleScope;
36 using v8::String;
37 using v8::TryCatch;
38 using v8::Value;
39
40 namespace node {
41 namespace worker {
42
43 constexpr double kMB = 1024 * 1024;
44
Worker(Environment * env,Local<Object> wrap,const std::string & url,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars)45 Worker::Worker(Environment* env,
46 Local<Object> wrap,
47 const std::string& url,
48 std::shared_ptr<PerIsolateOptions> per_isolate_opts,
49 std::vector<std::string>&& exec_argv,
50 std::shared_ptr<KVStore> env_vars)
51 : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
52 per_isolate_opts_(per_isolate_opts),
53 exec_argv_(exec_argv),
54 platform_(env->isolate_data()->platform()),
55 thread_id_(AllocateEnvironmentThreadId()),
56 env_vars_(env_vars) {
57 Debug(this, "Creating new worker instance with thread id %llu",
58 thread_id_.id);
59
60 // Set up everything that needs to be set up in the parent environment.
61 parent_port_ = MessagePort::New(env, env->context());
62 if (parent_port_ == nullptr) {
63 // This can happen e.g. because execution is terminating.
64 return;
65 }
66
67 child_port_data_ = std::make_unique<MessagePortData>(nullptr);
68 MessagePort::Entangle(parent_port_, child_port_data_.get());
69
70 object()->Set(env->context(),
71 env->message_port_string(),
72 parent_port_->object()).Check();
73
74 object()->Set(env->context(),
75 env->thread_id_string(),
76 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
77 .Check();
78
79 inspector_parent_handle_ = GetInspectorParentHandle(
80 env, thread_id_, url.c_str());
81
82 argv_ = std::vector<std::string>{env->argv()[0]};
83 // Mark this Worker object as weak until we actually start the thread.
84 MakeWeak();
85
86 Debug(this, "Preparation for worker %llu finished", thread_id_.id);
87 }
88
is_stopped() const89 bool Worker::is_stopped() const {
90 Mutex::ScopedLock lock(mutex_);
91 if (env_ != nullptr)
92 return env_->is_stopping();
93 return stopped_;
94 }
95
UpdateResourceConstraints(ResourceConstraints * constraints)96 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
97 constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
98
99 if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
100 constraints->set_max_young_generation_size_in_bytes(
101 resource_limits_[kMaxYoungGenerationSizeMb] * kMB);
102 } else {
103 resource_limits_[kMaxYoungGenerationSizeMb] =
104 constraints->max_young_generation_size_in_bytes() / kMB;
105 }
106
107 if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
108 constraints->set_max_old_generation_size_in_bytes(
109 resource_limits_[kMaxOldGenerationSizeMb] * kMB);
110 } else {
111 resource_limits_[kMaxOldGenerationSizeMb] =
112 constraints->max_old_generation_size_in_bytes() / kMB;
113 }
114
115 if (resource_limits_[kCodeRangeSizeMb] > 0) {
116 constraints->set_code_range_size_in_bytes(
117 resource_limits_[kCodeRangeSizeMb] * kMB);
118 } else {
119 resource_limits_[kCodeRangeSizeMb] =
120 constraints->code_range_size_in_bytes() / kMB;
121 }
122 }
123
124 // This class contains data that is only relevant to the child thread itself,
125 // and only while it is running.
126 // (Eventually, the Environment instance should probably also be moved here.)
127 class WorkerThreadData {
128 public:
WorkerThreadData(Worker * w)129 explicit WorkerThreadData(Worker* w)
130 : w_(w) {
131 int ret = uv_loop_init(&loop_);
132 if (ret != 0) {
133 char err_buf[128];
134 uv_err_name_r(ret, err_buf, sizeof(err_buf));
135 w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
136 return;
137 }
138 loop_init_failed_ = false;
139 uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
140
141 std::shared_ptr<ArrayBufferAllocator> allocator =
142 ArrayBufferAllocator::Create();
143 Isolate::CreateParams params;
144 SetIsolateCreateParamsForNode(¶ms);
145 params.array_buffer_allocator_shared = allocator;
146
147 w->UpdateResourceConstraints(¶ms.constraints);
148
149 Isolate* isolate = Isolate::Allocate();
150 if (isolate == nullptr) {
151 // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
152 // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
153 w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
154 return;
155 }
156
157 w->platform_->RegisterIsolate(isolate, &loop_);
158 Isolate::Initialize(isolate, params);
159 SetIsolateUpForNode(isolate);
160
161 // Be sure it's called before Environment::InitializeDiagnostics()
162 // so that this callback stays when the callback of
163 // --heapsnapshot-near-heap-limit gets is popped.
164 isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
165
166 {
167 Locker locker(isolate);
168 Isolate::Scope isolate_scope(isolate);
169 // V8 computes its stack limit the first time a `Locker` is used based on
170 // --stack-size. Reset it to the correct value.
171 isolate->SetStackLimit(w->stack_base_);
172
173 HandleScope handle_scope(isolate);
174 isolate_data_.reset(CreateIsolateData(isolate,
175 &loop_,
176 w_->platform_,
177 allocator.get()));
178 CHECK(isolate_data_);
179 if (w_->per_isolate_opts_)
180 isolate_data_->set_options(std::move(w_->per_isolate_opts_));
181 isolate_data_->set_worker_context(w_);
182 isolate_data_->max_young_gen_size =
183 params.constraints.max_young_generation_size_in_bytes();
184 }
185
186 Mutex::ScopedLock lock(w_->mutex_);
187 w_->isolate_ = isolate;
188 }
189
~WorkerThreadData()190 ~WorkerThreadData() {
191 Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
192 Isolate* isolate;
193 {
194 Mutex::ScopedLock lock(w_->mutex_);
195 isolate = w_->isolate_;
196 w_->isolate_ = nullptr;
197 }
198
199 if (isolate != nullptr) {
200 CHECK(!loop_init_failed_);
201 bool platform_finished = false;
202
203 isolate_data_.reset();
204
205 w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
206 *static_cast<bool*>(data) = true;
207 }, &platform_finished);
208
209 // The order of these calls is important; if the Isolate is first disposed
210 // and then unregistered, there is a race condition window in which no
211 // new Isolate at the same address can successfully be registered with
212 // the platform.
213 // (Refs: https://github.com/nodejs/node/issues/30846)
214 w_->platform_->UnregisterIsolate(isolate);
215 isolate->Dispose();
216
217 // Wait until the platform has cleaned up all relevant resources.
218 while (!platform_finished) {
219 uv_run(&loop_, UV_RUN_ONCE);
220 }
221 }
222 if (!loop_init_failed_) {
223 CheckedUvLoopClose(&loop_);
224 }
225 }
226
loop_is_usable() const227 bool loop_is_usable() const { return !loop_init_failed_; }
228
229 private:
230 Worker* const w_;
231 uv_loop_t loop_;
232 bool loop_init_failed_ = true;
233 DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
234
235 friend class Worker;
236 };
237
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)238 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
239 size_t initial_heap_limit) {
240 Worker* worker = static_cast<Worker*>(data);
241 worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
242 // Give the current GC some extra leeway to let it finish rather than
243 // crash hard. We are not going to perform further allocations anyway.
244 constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
245 return current_heap_limit + kExtraHeapAllowance;
246 }
247
Run()248 void Worker::Run() {
249 std::string name = "WorkerThread ";
250 name += std::to_string(thread_id_.id);
251 TRACE_EVENT_METADATA1(
252 "__metadata", "thread_name", "name",
253 TRACE_STR_COPY(name.c_str()));
254 CHECK_NOT_NULL(platform_);
255
256 Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
257
258 WorkerThreadData data(this);
259 if (isolate_ == nullptr) return;
260 CHECK(data.loop_is_usable());
261
262 Debug(this, "Starting worker with id %llu", thread_id_.id);
263 {
264 Locker locker(isolate_);
265 Isolate::Scope isolate_scope(isolate_);
266 SealHandleScope outer_seal(isolate_);
267
268 DeleteFnPtr<Environment, FreeEnvironment> env_;
269 auto cleanup_env = OnScopeLeave([&]() {
270 // TODO(addaleax): This call is harmless but should not be necessary.
271 // Figure out why V8 is raising a DCHECK() here without it
272 // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
273 isolate_->CancelTerminateExecution();
274
275 if (!env_) return;
276 env_->set_can_call_into_js(false);
277
278 {
279 Mutex::ScopedLock lock(mutex_);
280 stopped_ = true;
281 this->env_ = nullptr;
282 }
283
284 // TODO(addaleax): Try moving DisallowJavascriptExecutionScope into
285 // FreeEnvironment().
286 Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
287 Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
288 env_.reset();
289 });
290
291 if (is_stopped()) return;
292 {
293 HandleScope handle_scope(isolate_);
294 Local<Context> context;
295 {
296 // We create the Context object before we have an Environment* in place
297 // that we could use for error handling. If creation fails due to
298 // resource constraints, we need something in place to handle it,
299 // though.
300 TryCatch try_catch(isolate_);
301 context = NewContext(isolate_);
302 if (context.IsEmpty()) {
303 // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
304 // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
305 Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
306 return;
307 }
308 }
309
310 if (is_stopped()) return;
311 CHECK(!context.IsEmpty());
312 Context::Scope context_scope(context);
313 {
314 env_.reset(CreateEnvironment(
315 data.isolate_data_.get(),
316 context,
317 std::move(argv_),
318 std::move(exec_argv_),
319 static_cast<EnvironmentFlags::Flags>(environment_flags_),
320 thread_id_,
321 std::move(inspector_parent_handle_)));
322 if (is_stopped()) return;
323 CHECK_NOT_NULL(env_);
324 env_->set_env_vars(std::move(env_vars_));
325 SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
326 Exit(exit_code);
327 });
328 }
329 {
330 Mutex::ScopedLock lock(mutex_);
331 if (stopped_) return;
332 this->env_ = env_.get();
333 }
334 Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
335 if (is_stopped()) return;
336 {
337 if (!CreateEnvMessagePort(env_.get())) {
338 return;
339 }
340
341 Debug(this, "Created message port for worker %llu", thread_id_.id);
342 if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
343 return;
344
345 Debug(this, "Loaded environment for worker %llu", thread_id_.id);
346 }
347
348 if (is_stopped()) return;
349 {
350 SealHandleScope seal(isolate_);
351 bool more;
352 env_->performance_state()->Mark(
353 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
354 do {
355 if (is_stopped()) break;
356 uv_run(&data.loop_, UV_RUN_DEFAULT);
357 if (is_stopped()) break;
358
359 platform_->DrainTasks(isolate_);
360
361 more = uv_loop_alive(&data.loop_);
362 if (more && !is_stopped()) continue;
363
364 if (EmitProcessBeforeExit(env_.get()).IsNothing())
365 break;
366
367 // Emit `beforeExit` if the loop became alive either after emitting
368 // event, or after running some callbacks.
369 more = uv_loop_alive(&data.loop_);
370 } while (more == true && !is_stopped());
371 env_->performance_state()->Mark(
372 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
373 }
374 }
375
376 {
377 int exit_code;
378 bool stopped = is_stopped();
379 if (!stopped) {
380 env_->VerifyNoStrongBaseObjects();
381 exit_code = EmitProcessExit(env_.get()).FromMaybe(1);
382 }
383 Mutex::ScopedLock lock(mutex_);
384 if (exit_code_ == 0 && !stopped)
385 exit_code_ = exit_code;
386
387 Debug(this, "Exiting thread for worker %llu with exit code %d",
388 thread_id_.id, exit_code_);
389 }
390 }
391
392 Debug(this, "Worker %llu thread stops", thread_id_.id);
393 }
394
CreateEnvMessagePort(Environment * env)395 bool Worker::CreateEnvMessagePort(Environment* env) {
396 HandleScope handle_scope(isolate_);
397 std::unique_ptr<MessagePortData> data;
398 {
399 Mutex::ScopedLock lock(mutex_);
400 data = std::move(child_port_data_);
401 }
402
403 // Set up the message channel for receiving messages in the child.
404 MessagePort* child_port = MessagePort::New(env,
405 env->context(),
406 std::move(data));
407 // MessagePort::New() may return nullptr if execution is terminated
408 // within it.
409 if (child_port != nullptr)
410 env->set_message_port(child_port->object(isolate_));
411
412 return child_port;
413 }
414
JoinThread()415 void Worker::JoinThread() {
416 if (thread_joined_)
417 return;
418 CHECK_EQ(uv_thread_join(&tid_), 0);
419 thread_joined_ = true;
420
421 env()->remove_sub_worker_context(this);
422
423 {
424 HandleScope handle_scope(env()->isolate());
425 Context::Scope context_scope(env()->context());
426
427 // Reset the parent port as we're closing it now anyway.
428 object()->Set(env()->context(),
429 env()->message_port_string(),
430 Undefined(env()->isolate())).Check();
431
432 Local<Value> args[] = {
433 Integer::New(env()->isolate(), exit_code_),
434 custom_error_ != nullptr
435 ? OneByteString(env()->isolate(), custom_error_).As<Value>()
436 : Null(env()->isolate()).As<Value>(),
437 !custom_error_str_.empty()
438 ? OneByteString(env()->isolate(), custom_error_str_.c_str())
439 .As<Value>()
440 : Null(env()->isolate()).As<Value>(),
441 };
442
443 MakeCallback(env()->onexit_string(), arraysize(args), args);
444 }
445
446 // If we get here, the !thread_joined_ condition at the top of the function
447 // implies that the thread was running. In that case, its final action will
448 // be to schedule a callback on the parent thread which will delete this
449 // object, so there's nothing more to do here.
450 }
451
~Worker()452 Worker::~Worker() {
453 Mutex::ScopedLock lock(mutex_);
454
455 CHECK(stopped_);
456 CHECK_NULL(env_);
457 CHECK(thread_joined_);
458
459 Debug(this, "Worker %llu destroyed", thread_id_.id);
460 }
461
New(const FunctionCallbackInfo<Value> & args)462 void Worker::New(const FunctionCallbackInfo<Value>& args) {
463 Environment* env = Environment::GetCurrent(args);
464 Isolate* isolate = args.GetIsolate();
465
466 CHECK(args.IsConstructCall());
467
468 if (env->isolate_data()->platform() == nullptr) {
469 THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
470 return;
471 }
472
473 std::string url;
474 std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
475 std::shared_ptr<KVStore> env_vars = nullptr;
476
477 std::vector<std::string> exec_argv_out;
478
479 // Argument might be a string or URL
480 if (!args[0]->IsNullOrUndefined()) {
481 Utf8Value value(
482 isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
483 url.append(value.out(), value.length());
484 }
485
486 if (args[1]->IsNull()) {
487 // Means worker.env = { ...process.env }.
488 env_vars = env->env_vars()->Clone(isolate);
489 } else if (args[1]->IsObject()) {
490 // User provided env.
491 env_vars = KVStore::CreateMapKVStore();
492 env_vars->AssignFromObject(isolate->GetCurrentContext(),
493 args[1].As<Object>());
494 } else {
495 // Env is shared.
496 env_vars = env->env_vars();
497 }
498
499 if (args[1]->IsObject() || args[2]->IsArray()) {
500 per_isolate_opts.reset(new PerIsolateOptions());
501
502 HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
503 return env_vars->Get(name).FromMaybe("");
504 });
505
506 #ifndef NODE_WITHOUT_NODE_OPTIONS
507 MaybeLocal<String> maybe_node_opts =
508 env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
509 Local<String> node_opts;
510 if (maybe_node_opts.ToLocal(&node_opts)) {
511 std::string node_options(*String::Utf8Value(isolate, node_opts));
512 std::vector<std::string> errors{};
513 std::vector<std::string> env_argv =
514 ParseNodeOptionsEnvVar(node_options, &errors);
515 // [0] is expected to be the program name, add dummy string.
516 env_argv.insert(env_argv.begin(), "");
517 std::vector<std::string> invalid_args{};
518 options_parser::Parse(&env_argv,
519 nullptr,
520 &invalid_args,
521 per_isolate_opts.get(),
522 kAllowedInEnvironment,
523 &errors);
524 if (!errors.empty() && args[1]->IsObject()) {
525 // Only fail for explicitly provided env, this protects from failures
526 // when NODE_OPTIONS from parent's env is used (which is the default).
527 Local<Value> error;
528 if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
529 Local<String> key =
530 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
531 // Ignore the return value of Set() because exceptions bubble up to JS
532 // when we return anyway.
533 USE(args.This()->Set(env->context(), key, error));
534 return;
535 }
536 }
537 #endif
538 }
539
540 if (args[2]->IsArray()) {
541 Local<Array> array = args[2].As<Array>();
542 // The first argument is reserved for program name, but we don't need it
543 // in workers.
544 std::vector<std::string> exec_argv = {""};
545 uint32_t length = array->Length();
546 for (uint32_t i = 0; i < length; i++) {
547 Local<Value> arg;
548 if (!array->Get(env->context(), i).ToLocal(&arg)) {
549 return;
550 }
551 Local<String> arg_v8;
552 if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
553 return;
554 }
555 Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
556 std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
557 exec_argv.push_back(arg_string);
558 }
559
560 std::vector<std::string> invalid_args{};
561 std::vector<std::string> errors{};
562 // Using invalid_args as the v8_args argument as it stores unknown
563 // options for the per isolate parser.
564 options_parser::Parse(
565 &exec_argv,
566 &exec_argv_out,
567 &invalid_args,
568 per_isolate_opts.get(),
569 kDisallowedInEnvironment,
570 &errors);
571
572 // The first argument is program name.
573 invalid_args.erase(invalid_args.begin());
574 if (errors.size() > 0 || invalid_args.size() > 0) {
575 Local<Value> error;
576 if (!ToV8Value(env->context(),
577 errors.size() > 0 ? errors : invalid_args)
578 .ToLocal(&error)) {
579 return;
580 }
581 Local<String> key =
582 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
583 // Ignore the return value of Set() because exceptions bubble up to JS
584 // when we return anyway.
585 USE(args.This()->Set(env->context(), key, error));
586 return;
587 }
588 } else {
589 exec_argv_out = env->exec_argv();
590 }
591
592 Worker* worker = new Worker(env,
593 args.This(),
594 url,
595 per_isolate_opts,
596 std::move(exec_argv_out),
597 env_vars);
598
599 CHECK(args[3]->IsFloat64Array());
600 Local<Float64Array> limit_info = args[3].As<Float64Array>();
601 CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
602 limit_info->CopyContents(worker->resource_limits_,
603 sizeof(worker->resource_limits_));
604
605 CHECK(args[4]->IsBoolean());
606 if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
607 worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
608 if (env->hide_console_windows())
609 worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
610 if (env->no_native_addons())
611 worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
612 }
613
StartThread(const FunctionCallbackInfo<Value> & args)614 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
615 Worker* w;
616 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
617 Mutex::ScopedLock lock(w->mutex_);
618
619 w->stopped_ = false;
620
621 if (w->resource_limits_[kStackSizeMb] > 0) {
622 if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
623 w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
624 w->stack_size_ = kStackBufferSize;
625 } else {
626 w->stack_size_ = w->resource_limits_[kStackSizeMb] * kMB;
627 }
628 } else {
629 w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
630 }
631
632 uv_thread_options_t thread_options;
633 thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
634 thread_options.stack_size = w->stack_size_;
635 int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
636 // XXX: This could become a std::unique_ptr, but that makes at least
637 // gcc 6.3 detect undefined behaviour when there shouldn't be any.
638 // gcc 7+ handles this well.
639 Worker* w = static_cast<Worker*>(arg);
640 const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
641
642 // Leave a few kilobytes just to make sure we're within limits and have
643 // some space to do work in C++ land.
644 w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
645
646 w->Run();
647
648 Mutex::ScopedLock lock(w->mutex_);
649 w->env()->SetImmediateThreadsafe(
650 [w = std::unique_ptr<Worker>(w)](Environment* env) {
651 if (w->has_ref_)
652 env->add_refs(-1);
653 w->JoinThread();
654 // implicitly delete w
655 });
656 }, static_cast<void*>(w));
657
658 if (ret == 0) {
659 // The object now owns the created thread and should not be garbage
660 // collected until that finishes.
661 w->ClearWeak();
662 w->thread_joined_ = false;
663
664 if (w->has_ref_)
665 w->env()->add_refs(1);
666
667 w->env()->add_sub_worker_context(w);
668 } else {
669 w->stopped_ = true;
670
671 char err_buf[128];
672 uv_err_name_r(ret, err_buf, sizeof(err_buf));
673 {
674 Isolate* isolate = w->env()->isolate();
675 HandleScope handle_scope(isolate);
676 THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
677 }
678 }
679 }
680
StopThread(const FunctionCallbackInfo<Value> & args)681 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
682 Worker* w;
683 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
684
685 Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
686 w->Exit(1);
687 }
688
Ref(const FunctionCallbackInfo<Value> & args)689 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
690 Worker* w;
691 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
692 if (!w->has_ref_ && !w->thread_joined_) {
693 w->has_ref_ = true;
694 w->env()->add_refs(1);
695 }
696 }
697
Unref(const FunctionCallbackInfo<Value> & args)698 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
699 Worker* w;
700 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
701 if (w->has_ref_ && !w->thread_joined_) {
702 w->has_ref_ = false;
703 w->env()->add_refs(-1);
704 }
705 }
706
GetResourceLimits(const FunctionCallbackInfo<Value> & args)707 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
708 Worker* w;
709 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
710 args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
711 }
712
GetResourceLimits(Isolate * isolate) const713 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
714 Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
715
716 memcpy(ab->GetBackingStore()->Data(),
717 resource_limits_,
718 sizeof(resource_limits_));
719 return Float64Array::New(ab, 0, kTotalResourceLimitCount);
720 }
721
Exit(int code,const char * error_code,const char * error_message)722 void Worker::Exit(int code, const char* error_code, const char* error_message) {
723 Mutex::ScopedLock lock(mutex_);
724 Debug(this, "Worker %llu called Exit(%d, %s, %s)",
725 thread_id_.id, code, error_code, error_message);
726
727 if (error_code != nullptr) {
728 custom_error_ = error_code;
729 custom_error_str_ = error_message;
730 }
731
732 if (env_ != nullptr) {
733 exit_code_ = code;
734 Stop(env_);
735 } else {
736 stopped_ = true;
737 }
738 }
739
MemoryInfo(MemoryTracker * tracker) const740 void Worker::MemoryInfo(MemoryTracker* tracker) const {
741 tracker->TrackField("parent_port", parent_port_);
742 }
743
IsNotIndicativeOfMemoryLeakAtExit() const744 bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
745 // Worker objects always stay alive as long as the child thread, regardless
746 // of whether they are being referenced in the parent thread.
747 return true;
748 }
749
750 class WorkerHeapSnapshotTaker : public AsyncWrap {
751 public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)752 WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
753 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
754
755 SET_NO_MEMORY_INFO()
756 SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
757 SET_SELF_SIZE(WorkerHeapSnapshotTaker)
758 };
759
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)760 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
761 Worker* w;
762 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
763
764 Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
765
766 Environment* env = w->env();
767 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
768 Local<Object> wrap;
769 if (!env->worker_heap_snapshot_taker_template()
770 ->NewInstance(env->context()).ToLocal(&wrap)) {
771 return;
772 }
773 BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
774 MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
775
776 // Interrupt the worker thread and take a snapshot, then schedule a call
777 // on the parent thread that turns that snapshot into a readable stream.
778 bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
779 heap::HeapSnapshotPointer snapshot {
780 worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
781 CHECK(snapshot);
782 env->SetImmediateThreadsafe(
783 [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
784 HandleScope handle_scope(env->isolate());
785 Context::Scope context_scope(env->context());
786
787 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
788 BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
789 env, std::move(snapshot));
790 Local<Value> args[] = { stream->object() };
791 taker->MakeCallback(env->ondone_string(), arraysize(args), args);
792 }, CallbackFlags::kUnrefed);
793 });
794 args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
795 }
796
LoopIdleTime(const FunctionCallbackInfo<Value> & args)797 void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
798 Worker* w;
799 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
800
801 Mutex::ScopedLock lock(w->mutex_);
802 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
803 // before locking the mutex is a race condition. So manually do the same
804 // check.
805 if (w->stopped_ || w->env_ == nullptr)
806 return args.GetReturnValue().Set(-1);
807
808 uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
809 args.GetReturnValue().Set(1.0 * idle_time / 1e6);
810 }
811
LoopStartTime(const FunctionCallbackInfo<Value> & args)812 void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
813 Worker* w;
814 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
815
816 Mutex::ScopedLock lock(w->mutex_);
817 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
818 // before locking the mutex is a race condition. So manually do the same
819 // check.
820 if (w->stopped_ || w->env_ == nullptr)
821 return args.GetReturnValue().Set(-1);
822
823 double loop_start_time = w->env_->performance_state()->milestones[
824 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
825 CHECK_GE(loop_start_time, 0);
826 args.GetReturnValue().Set(
827 (loop_start_time - node::performance::timeOrigin) / 1e6);
828 }
829
830 namespace {
831
832 // Return the MessagePort that is global for this Environment and communicates
833 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)834 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
835 Environment* env = Environment::GetCurrent(args);
836 Local<Object> port = env->message_port();
837 CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
838 if (!port.IsEmpty()) {
839 CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
840 args.GetReturnValue().Set(port);
841 }
842 }
843
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)844 void InitWorker(Local<Object> target,
845 Local<Value> unused,
846 Local<Context> context,
847 void* priv) {
848 Environment* env = Environment::GetCurrent(context);
849
850 {
851 Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
852
853 w->InstanceTemplate()->SetInternalFieldCount(
854 Worker::kInternalFieldCount);
855 w->Inherit(AsyncWrap::GetConstructorTemplate(env));
856
857 env->SetProtoMethod(w, "startThread", Worker::StartThread);
858 env->SetProtoMethod(w, "stopThread", Worker::StopThread);
859 env->SetProtoMethod(w, "ref", Worker::Ref);
860 env->SetProtoMethod(w, "unref", Worker::Unref);
861 env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
862 env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
863 env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
864 env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
865
866 env->SetConstructorFunction(target, "Worker", w);
867 }
868
869 {
870 Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
871
872 wst->InstanceTemplate()->SetInternalFieldCount(
873 WorkerHeapSnapshotTaker::kInternalFieldCount);
874 wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
875
876 Local<String> wst_string =
877 FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
878 wst->SetClassName(wst_string);
879 env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
880 }
881
882 env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
883
884 target
885 ->Set(env->context(),
886 env->thread_id_string(),
887 Number::New(env->isolate(), static_cast<double>(env->thread_id())))
888 .Check();
889
890 target
891 ->Set(env->context(),
892 FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
893 Boolean::New(env->isolate(), env->is_main_thread()))
894 .Check();
895
896 target
897 ->Set(env->context(),
898 FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
899 Boolean::New(env->isolate(), env->owns_process_state()))
900 .Check();
901
902 if (!env->is_main_thread()) {
903 target
904 ->Set(env->context(),
905 FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
906 env->worker_context()->GetResourceLimits(env->isolate()))
907 .Check();
908 }
909
910 NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
911 NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
912 NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
913 NODE_DEFINE_CONSTANT(target, kStackSizeMb);
914 NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
915 }
916
917 } // anonymous namespace
918
919 } // namespace worker
920 } // namespace node
921
922 NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
923