1 #include "node_worker.h"
2 #include "debug_utils-inl.h"
3 #include "memory_tracker-inl.h"
4 #include "node_errors.h"
5 #include "node_buffer.h"
6 #include "node_options-inl.h"
7 #include "node_perf.h"
8 #include "util-inl.h"
9 #include "async_wrap-inl.h"
10
11 #include <memory>
12 #include <string>
13 #include <vector>
14
15 using node::kAllowedInEnvironment;
16 using node::kDisallowedInEnvironment;
17 using v8::Array;
18 using v8::ArrayBuffer;
19 using v8::Boolean;
20 using v8::Context;
21 using v8::Float64Array;
22 using v8::FunctionCallbackInfo;
23 using v8::FunctionTemplate;
24 using v8::HandleScope;
25 using v8::Integer;
26 using v8::Isolate;
27 using v8::Local;
28 using v8::Locker;
29 using v8::MaybeLocal;
30 using v8::Null;
31 using v8::Number;
32 using v8::Object;
33 using v8::ResourceConstraints;
34 using v8::SealHandleScope;
35 using v8::String;
36 using v8::TryCatch;
37 using v8::Value;
38
39 namespace node {
40 namespace worker {
41
// Number of bytes in one megabyte (2^20). Declared as a double because it is
// multiplied with / divided into the floating-point resource limit values.
constexpr double kMB = 1024.0 * 1024.0;
43
Worker(Environment * env,Local<Object> wrap,const std::string & url,std::shared_ptr<PerIsolateOptions> per_isolate_opts,std::vector<std::string> && exec_argv,std::shared_ptr<KVStore> env_vars)44 Worker::Worker(Environment* env,
45 Local<Object> wrap,
46 const std::string& url,
47 std::shared_ptr<PerIsolateOptions> per_isolate_opts,
48 std::vector<std::string>&& exec_argv,
49 std::shared_ptr<KVStore> env_vars)
50 : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
51 per_isolate_opts_(per_isolate_opts),
52 exec_argv_(exec_argv),
53 platform_(env->isolate_data()->platform()),
54 array_buffer_allocator_(ArrayBufferAllocator::Create()),
55 thread_id_(AllocateEnvironmentThreadId()),
56 env_vars_(env_vars) {
57 Debug(this, "Creating new worker instance with thread id %llu",
58 thread_id_.id);
59
60 // Set up everything that needs to be set up in the parent environment.
61 parent_port_ = MessagePort::New(env, env->context());
62 if (parent_port_ == nullptr) {
63 // This can happen e.g. because execution is terminating.
64 return;
65 }
66
67 child_port_data_ = std::make_unique<MessagePortData>(nullptr);
68 MessagePort::Entangle(parent_port_, child_port_data_.get());
69
70 object()->Set(env->context(),
71 env->message_port_string(),
72 parent_port_->object()).Check();
73
74 object()->Set(env->context(),
75 env->thread_id_string(),
76 Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
77 .Check();
78
79 inspector_parent_handle_ = GetInspectorParentHandle(
80 env, thread_id_, url.c_str());
81
82 argv_ = std::vector<std::string>{env->argv()[0]};
83 // Mark this Worker object as weak until we actually start the thread.
84 MakeWeak();
85
86 Debug(this, "Preparation for worker %llu finished", thread_id_.id);
87 }
88
is_stopped() const89 bool Worker::is_stopped() const {
90 Mutex::ScopedLock lock(mutex_);
91 if (env_ != nullptr)
92 return env_->is_stopping();
93 return stopped_;
94 }
95
array_buffer_allocator()96 std::shared_ptr<ArrayBufferAllocator> Worker::array_buffer_allocator() {
97 return array_buffer_allocator_;
98 }
99
UpdateResourceConstraints(ResourceConstraints * constraints)100 void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
101 constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
102
103 if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
104 constraints->set_max_young_generation_size_in_bytes(
105 resource_limits_[kMaxYoungGenerationSizeMb] * kMB);
106 } else {
107 resource_limits_[kMaxYoungGenerationSizeMb] =
108 constraints->max_young_generation_size_in_bytes() / kMB;
109 }
110
111 if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
112 constraints->set_max_old_generation_size_in_bytes(
113 resource_limits_[kMaxOldGenerationSizeMb] * kMB);
114 } else {
115 resource_limits_[kMaxOldGenerationSizeMb] =
116 constraints->max_old_generation_size_in_bytes() / kMB;
117 }
118
119 if (resource_limits_[kCodeRangeSizeMb] > 0) {
120 constraints->set_code_range_size_in_bytes(
121 resource_limits_[kCodeRangeSizeMb] * kMB);
122 } else {
123 resource_limits_[kCodeRangeSizeMb] =
124 constraints->code_range_size_in_bytes() / kMB;
125 }
126 }
127
128 // This class contains data that is only relevant to the child thread itself,
129 // and only while it is running.
130 // (Eventually, the Environment instance should probably also be moved here.)
131 class WorkerThreadData {
132 public:
WorkerThreadData(Worker * w)133 explicit WorkerThreadData(Worker* w)
134 : w_(w) {
135 int ret = uv_loop_init(&loop_);
136 if (ret != 0) {
137 char err_buf[128];
138 uv_err_name_r(ret, err_buf, sizeof(err_buf));
139 w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
140 return;
141 }
142 loop_init_failed_ = false;
143 uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);
144
145 Isolate::CreateParams params;
146 SetIsolateCreateParamsForNode(¶ms);
147 params.array_buffer_allocator = w->array_buffer_allocator_.get();
148
149 w->UpdateResourceConstraints(¶ms.constraints);
150
151 Isolate* isolate = Isolate::Allocate();
152 if (isolate == nullptr) {
153 // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
154 // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
155 w->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Isolate");
156 return;
157 }
158
159 w->platform_->RegisterIsolate(isolate, &loop_);
160 Isolate::Initialize(isolate, params);
161 SetIsolateUpForNode(isolate);
162
163 isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);
164
165 {
166 Locker locker(isolate);
167 Isolate::Scope isolate_scope(isolate);
168 // V8 computes its stack limit the first time a `Locker` is used based on
169 // --stack-size. Reset it to the correct value.
170 isolate->SetStackLimit(w->stack_base_);
171
172 HandleScope handle_scope(isolate);
173 isolate_data_.reset(CreateIsolateData(isolate,
174 &loop_,
175 w_->platform_,
176 w->array_buffer_allocator_.get()));
177 CHECK(isolate_data_);
178 if (w_->per_isolate_opts_)
179 isolate_data_->set_options(std::move(w_->per_isolate_opts_));
180 isolate_data_->set_worker_context(w_);
181 }
182
183 Mutex::ScopedLock lock(w_->mutex_);
184 w_->isolate_ = isolate;
185 }
186
~WorkerThreadData()187 ~WorkerThreadData() {
188 Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
189 Isolate* isolate;
190 {
191 Mutex::ScopedLock lock(w_->mutex_);
192 isolate = w_->isolate_;
193 w_->isolate_ = nullptr;
194 }
195
196 if (isolate != nullptr) {
197 CHECK(!loop_init_failed_);
198 bool platform_finished = false;
199
200 isolate_data_.reset();
201
202 w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
203 *static_cast<bool*>(data) = true;
204 }, &platform_finished);
205
206 // The order of these calls is important; if the Isolate is first disposed
207 // and then unregistered, there is a race condition window in which no
208 // new Isolate at the same address can successfully be registered with
209 // the platform.
210 // (Refs: https://github.com/nodejs/node/issues/30846)
211 w_->platform_->UnregisterIsolate(isolate);
212 isolate->Dispose();
213
214 // Wait until the platform has cleaned up all relevant resources.
215 while (!platform_finished) {
216 uv_run(&loop_, UV_RUN_ONCE);
217 }
218 }
219 if (!loop_init_failed_) {
220 CheckedUvLoopClose(&loop_);
221 }
222 }
223
loop_is_usable() const224 bool loop_is_usable() const { return !loop_init_failed_; }
225
226 private:
227 Worker* const w_;
228 uv_loop_t loop_;
229 bool loop_init_failed_ = true;
230 DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
231
232 friend class Worker;
233 };
234
NearHeapLimit(void * data,size_t current_heap_limit,size_t initial_heap_limit)235 size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
236 size_t initial_heap_limit) {
237 Worker* worker = static_cast<Worker*>(data);
238 worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
239 // Give the current GC some extra leeway to let it finish rather than
240 // crash hard. We are not going to perform further allocations anyway.
241 constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
242 return current_heap_limit + kExtraHeapAllowance;
243 }
244
Run()245 void Worker::Run() {
246 std::string name = "WorkerThread ";
247 name += std::to_string(thread_id_.id);
248 TRACE_EVENT_METADATA1(
249 "__metadata", "thread_name", "name",
250 TRACE_STR_COPY(name.c_str()));
251 CHECK_NOT_NULL(platform_);
252
253 Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
254
255 WorkerThreadData data(this);
256 if (isolate_ == nullptr) return;
257 CHECK(data.loop_is_usable());
258
259 Debug(this, "Starting worker with id %llu", thread_id_.id);
260 {
261 Locker locker(isolate_);
262 Isolate::Scope isolate_scope(isolate_);
263 SealHandleScope outer_seal(isolate_);
264
265 DeleteFnPtr<Environment, FreeEnvironment> env_;
266 auto cleanup_env = OnScopeLeave([&]() {
267 // TODO(addaleax): This call is harmless but should not be necessary.
268 // Figure out why V8 is raising a DCHECK() here without it
269 // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
270 isolate_->CancelTerminateExecution();
271
272 if (!env_) return;
273 env_->set_can_call_into_js(false);
274
275 {
276 Mutex::ScopedLock lock(mutex_);
277 stopped_ = true;
278 this->env_ = nullptr;
279 }
280
281 // TODO(addaleax): Try moving DisallowJavascriptExecutionScope into
282 // FreeEnvironment().
283 Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
284 Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
285 env_.reset();
286 });
287
288 if (is_stopped()) return;
289 {
290 HandleScope handle_scope(isolate_);
291 Local<Context> context;
292 {
293 // We create the Context object before we have an Environment* in place
294 // that we could use for error handling. If creation fails due to
295 // resource constraints, we need something in place to handle it,
296 // though.
297 TryCatch try_catch(isolate_);
298 context = NewContext(isolate_);
299 if (context.IsEmpty()) {
300 // TODO(addaleax): This should be ERR_WORKER_INIT_FAILED,
301 // ERR_WORKER_OUT_OF_MEMORY is for reaching the per-Worker heap limit.
302 Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "Failed to create new Context");
303 return;
304 }
305 }
306
307 if (is_stopped()) return;
308 CHECK(!context.IsEmpty());
309 Context::Scope context_scope(context);
310 {
311 env_.reset(CreateEnvironment(
312 data.isolate_data_.get(),
313 context,
314 std::move(argv_),
315 std::move(exec_argv_),
316 static_cast<EnvironmentFlags::Flags>(environment_flags_),
317 thread_id_,
318 std::move(inspector_parent_handle_)));
319 if (is_stopped()) return;
320 CHECK_NOT_NULL(env_);
321 env_->set_env_vars(std::move(env_vars_));
322 SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
323 Exit(exit_code);
324 });
325 }
326 {
327 Mutex::ScopedLock lock(mutex_);
328 if (stopped_) return;
329 this->env_ = env_.get();
330 }
331 Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
332 if (is_stopped()) return;
333 {
334 CreateEnvMessagePort(env_.get());
335 Debug(this, "Created message port for worker %llu", thread_id_.id);
336 if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
337 return;
338
339 Debug(this, "Loaded environment for worker %llu", thread_id_.id);
340 }
341
342 if (is_stopped()) return;
343 {
344 SealHandleScope seal(isolate_);
345 bool more;
346 env_->performance_state()->Mark(
347 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
348 do {
349 if (is_stopped()) break;
350 uv_run(&data.loop_, UV_RUN_DEFAULT);
351 if (is_stopped()) break;
352
353 platform_->DrainTasks(isolate_);
354
355 more = uv_loop_alive(&data.loop_);
356 if (more && !is_stopped()) continue;
357
358 EmitBeforeExit(env_.get());
359
360 // Emit `beforeExit` if the loop became alive either after emitting
361 // event, or after running some callbacks.
362 more = uv_loop_alive(&data.loop_);
363 } while (more == true && !is_stopped());
364 env_->performance_state()->Mark(
365 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
366 }
367 }
368
369 {
370 int exit_code;
371 bool stopped = is_stopped();
372 if (!stopped)
373 exit_code = EmitExit(env_.get());
374 Mutex::ScopedLock lock(mutex_);
375 if (exit_code_ == 0 && !stopped)
376 exit_code_ = exit_code;
377
378 Debug(this, "Exiting thread for worker %llu with exit code %d",
379 thread_id_.id, exit_code_);
380 }
381 }
382
383 Debug(this, "Worker %llu thread stops", thread_id_.id);
384 }
385
CreateEnvMessagePort(Environment * env)386 void Worker::CreateEnvMessagePort(Environment* env) {
387 HandleScope handle_scope(isolate_);
388 Mutex::ScopedLock lock(mutex_);
389 // Set up the message channel for receiving messages in the child.
390 MessagePort* child_port = MessagePort::New(env,
391 env->context(),
392 std::move(child_port_data_));
393 // MessagePort::New() may return nullptr if execution is terminated
394 // within it.
395 if (child_port != nullptr)
396 env->set_message_port(child_port->object(isolate_));
397 }
398
JoinThread()399 void Worker::JoinThread() {
400 if (thread_joined_)
401 return;
402 CHECK_EQ(uv_thread_join(&tid_), 0);
403 thread_joined_ = true;
404
405 env()->remove_sub_worker_context(this);
406
407 {
408 HandleScope handle_scope(env()->isolate());
409 Context::Scope context_scope(env()->context());
410
411 // Reset the parent port as we're closing it now anyway.
412 object()->Set(env()->context(),
413 env()->message_port_string(),
414 Undefined(env()->isolate())).Check();
415
416 Local<Value> args[] = {
417 Integer::New(env()->isolate(), exit_code_),
418 custom_error_ != nullptr
419 ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420 : Null(env()->isolate()).As<Value>(),
421 !custom_error_str_.empty()
422 ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423 .As<Value>()
424 : Null(env()->isolate()).As<Value>(),
425 };
426
427 MakeCallback(env()->onexit_string(), arraysize(args), args);
428 }
429
430 // If we get here, the !thread_joined_ condition at the top of the function
431 // implies that the thread was running. In that case, its final action will
432 // be to schedule a callback on the parent thread which will delete this
433 // object, so there's nothing more to do here.
434 }
435
~Worker()436 Worker::~Worker() {
437 Mutex::ScopedLock lock(mutex_);
438
439 CHECK(stopped_);
440 CHECK_NULL(env_);
441 CHECK(thread_joined_);
442
443 Debug(this, "Worker %llu destroyed", thread_id_.id);
444 }
445
New(const FunctionCallbackInfo<Value> & args)446 void Worker::New(const FunctionCallbackInfo<Value>& args) {
447 Environment* env = Environment::GetCurrent(args);
448 Isolate* isolate = args.GetIsolate();
449
450 CHECK(args.IsConstructCall());
451
452 if (env->isolate_data()->platform() == nullptr) {
453 THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
454 return;
455 }
456
457 std::string url;
458 std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
459 std::shared_ptr<KVStore> env_vars = nullptr;
460
461 std::vector<std::string> exec_argv_out;
462
463 // Argument might be a string or URL
464 if (!args[0]->IsNullOrUndefined()) {
465 Utf8Value value(
466 isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
467 url.append(value.out(), value.length());
468 }
469
470 if (args[1]->IsNull()) {
471 // Means worker.env = { ...process.env }.
472 env_vars = env->env_vars()->Clone(isolate);
473 } else if (args[1]->IsObject()) {
474 // User provided env.
475 env_vars = KVStore::CreateMapKVStore();
476 env_vars->AssignFromObject(isolate->GetCurrentContext(),
477 args[1].As<Object>());
478 } else {
479 // Env is shared.
480 env_vars = env->env_vars();
481 }
482
483 if (args[1]->IsObject() || args[2]->IsArray()) {
484 per_isolate_opts.reset(new PerIsolateOptions());
485
486 HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
487 return env_vars->Get(name).FromMaybe("");
488 });
489
490 #ifndef NODE_WITHOUT_NODE_OPTIONS
491 MaybeLocal<String> maybe_node_opts =
492 env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
493 Local<String> node_opts;
494 if (maybe_node_opts.ToLocal(&node_opts)) {
495 std::string node_options(*String::Utf8Value(isolate, node_opts));
496 std::vector<std::string> errors{};
497 std::vector<std::string> env_argv =
498 ParseNodeOptionsEnvVar(node_options, &errors);
499 // [0] is expected to be the program name, add dummy string.
500 env_argv.insert(env_argv.begin(), "");
501 std::vector<std::string> invalid_args{};
502 options_parser::Parse(&env_argv,
503 nullptr,
504 &invalid_args,
505 per_isolate_opts.get(),
506 kAllowedInEnvironment,
507 &errors);
508 if (!errors.empty() && args[1]->IsObject()) {
509 // Only fail for explicitly provided env, this protects from failures
510 // when NODE_OPTIONS from parent's env is used (which is the default).
511 Local<Value> error;
512 if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
513 Local<String> key =
514 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
515 // Ignore the return value of Set() because exceptions bubble up to JS
516 // when we return anyway.
517 USE(args.This()->Set(env->context(), key, error));
518 return;
519 }
520 }
521 #endif
522 }
523
524 if (args[2]->IsArray()) {
525 Local<Array> array = args[2].As<Array>();
526 // The first argument is reserved for program name, but we don't need it
527 // in workers.
528 std::vector<std::string> exec_argv = {""};
529 uint32_t length = array->Length();
530 for (uint32_t i = 0; i < length; i++) {
531 Local<Value> arg;
532 if (!array->Get(env->context(), i).ToLocal(&arg)) {
533 return;
534 }
535 Local<String> arg_v8;
536 if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
537 return;
538 }
539 Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
540 std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
541 exec_argv.push_back(arg_string);
542 }
543
544 std::vector<std::string> invalid_args{};
545 std::vector<std::string> errors{};
546 // Using invalid_args as the v8_args argument as it stores unknown
547 // options for the per isolate parser.
548 options_parser::Parse(
549 &exec_argv,
550 &exec_argv_out,
551 &invalid_args,
552 per_isolate_opts.get(),
553 kDisallowedInEnvironment,
554 &errors);
555
556 // The first argument is program name.
557 invalid_args.erase(invalid_args.begin());
558 if (errors.size() > 0 || invalid_args.size() > 0) {
559 Local<Value> error;
560 if (!ToV8Value(env->context(),
561 errors.size() > 0 ? errors : invalid_args)
562 .ToLocal(&error)) {
563 return;
564 }
565 Local<String> key =
566 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
567 // Ignore the return value of Set() because exceptions bubble up to JS
568 // when we return anyway.
569 USE(args.This()->Set(env->context(), key, error));
570 return;
571 }
572 } else {
573 exec_argv_out = env->exec_argv();
574 }
575
576 Worker* worker = new Worker(env,
577 args.This(),
578 url,
579 per_isolate_opts,
580 std::move(exec_argv_out),
581 env_vars);
582
583 CHECK(args[3]->IsFloat64Array());
584 Local<Float64Array> limit_info = args[3].As<Float64Array>();
585 CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
586 limit_info->CopyContents(worker->resource_limits_,
587 sizeof(worker->resource_limits_));
588
589 CHECK(args[4]->IsBoolean());
590 if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
591 worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
592 }
593
StartThread(const FunctionCallbackInfo<Value> & args)594 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
595 Worker* w;
596 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
597 Mutex::ScopedLock lock(w->mutex_);
598
599 w->stopped_ = false;
600
601 if (w->resource_limits_[kStackSizeMb] > 0) {
602 if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
603 w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
604 w->stack_size_ = kStackBufferSize;
605 } else {
606 w->stack_size_ = w->resource_limits_[kStackSizeMb] * kMB;
607 }
608 } else {
609 w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
610 }
611
612 uv_thread_options_t thread_options;
613 thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
614 thread_options.stack_size = w->stack_size_;
615 int ret = uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
616 // XXX: This could become a std::unique_ptr, but that makes at least
617 // gcc 6.3 detect undefined behaviour when there shouldn't be any.
618 // gcc 7+ handles this well.
619 Worker* w = static_cast<Worker*>(arg);
620 const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
621
622 // Leave a few kilobytes just to make sure we're within limits and have
623 // some space to do work in C++ land.
624 w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
625
626 w->Run();
627
628 Mutex::ScopedLock lock(w->mutex_);
629 w->env()->SetImmediateThreadsafe(
630 [w = std::unique_ptr<Worker>(w)](Environment* env) {
631 if (w->has_ref_)
632 env->add_refs(-1);
633 w->JoinThread();
634 // implicitly delete w
635 });
636 }, static_cast<void*>(w));
637
638 if (ret == 0) {
639 // The object now owns the created thread and should not be garbage
640 // collected until that finishes.
641 w->ClearWeak();
642 w->thread_joined_ = false;
643
644 if (w->has_ref_)
645 w->env()->add_refs(1);
646
647 w->env()->add_sub_worker_context(w);
648 } else {
649 w->stopped_ = true;
650
651 char err_buf[128];
652 uv_err_name_r(ret, err_buf, sizeof(err_buf));
653 {
654 Isolate* isolate = w->env()->isolate();
655 HandleScope handle_scope(isolate);
656 THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
657 }
658 }
659 }
660
StopThread(const FunctionCallbackInfo<Value> & args)661 void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
662 Worker* w;
663 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
664
665 Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
666 w->Exit(1);
667 }
668
Ref(const FunctionCallbackInfo<Value> & args)669 void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
670 Worker* w;
671 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
672 if (!w->has_ref_ && !w->thread_joined_) {
673 w->has_ref_ = true;
674 w->env()->add_refs(1);
675 }
676 }
677
Unref(const FunctionCallbackInfo<Value> & args)678 void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
679 Worker* w;
680 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
681 if (w->has_ref_ && !w->thread_joined_) {
682 w->has_ref_ = false;
683 w->env()->add_refs(-1);
684 }
685 }
686
GetResourceLimits(const FunctionCallbackInfo<Value> & args)687 void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
688 Worker* w;
689 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
690 args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
691 }
692
GetResourceLimits(Isolate * isolate) const693 Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
694 Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
695 memcpy(ab->GetContents().Data(), resource_limits_, sizeof(resource_limits_));
696 return Float64Array::New(ab, 0, kTotalResourceLimitCount);
697 }
698
Exit(int code,const char * error_code,const char * error_message)699 void Worker::Exit(int code, const char* error_code, const char* error_message) {
700 Mutex::ScopedLock lock(mutex_);
701 Debug(this, "Worker %llu called Exit(%d, %s, %s)",
702 thread_id_.id, code, error_code, error_message);
703
704 if (error_code != nullptr) {
705 custom_error_ = error_code;
706 custom_error_str_ = error_message;
707 }
708
709 if (env_ != nullptr) {
710 exit_code_ = code;
711 Stop(env_);
712 } else {
713 stopped_ = true;
714 }
715 }
716
MemoryInfo(MemoryTracker * tracker) const717 void Worker::MemoryInfo(MemoryTracker* tracker) const {
718 tracker->TrackField("parent_port", parent_port_);
719 }
720
721 class WorkerHeapSnapshotTaker : public AsyncWrap {
722 public:
WorkerHeapSnapshotTaker(Environment * env,Local<Object> obj)723 WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
724 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}
725
726 SET_NO_MEMORY_INFO()
727 SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
728 SET_SELF_SIZE(WorkerHeapSnapshotTaker)
729 };
730
TakeHeapSnapshot(const FunctionCallbackInfo<Value> & args)731 void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
732 Worker* w;
733 ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
734
735 Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
736
737 Environment* env = w->env();
738 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
739 Local<Object> wrap;
740 if (!env->worker_heap_snapshot_taker_template()
741 ->NewInstance(env->context()).ToLocal(&wrap)) {
742 return;
743 }
744 BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
745 MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);
746
747 // Interrupt the worker thread and take a snapshot, then schedule a call
748 // on the parent thread that turns that snapshot into a readable stream.
749 bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
750 heap::HeapSnapshotPointer snapshot {
751 worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
752 CHECK(snapshot);
753 env->SetImmediateThreadsafe(
754 [taker, snapshot = std::move(snapshot)](Environment* env) mutable {
755 HandleScope handle_scope(env->isolate());
756 Context::Scope context_scope(env->context());
757
758 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
759 BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
760 env, std::move(snapshot));
761 Local<Value> args[] = { stream->object() };
762 taker->MakeCallback(env->ondone_string(), arraysize(args), args);
763 }, CallbackFlags::kUnrefed);
764 });
765 args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
766 }
767
768 namespace {
769
770 // Return the MessagePort that is global for this Environment and communicates
771 // with the internal [kPort] port of the JS Worker class in the parent thread.
GetEnvMessagePort(const FunctionCallbackInfo<Value> & args)772 void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
773 Environment* env = Environment::GetCurrent(args);
774 Local<Object> port = env->message_port();
775 CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
776 if (!port.IsEmpty()) {
777 CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
778 args.GetReturnValue().Set(port);
779 }
780 }
781
InitWorker(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)782 void InitWorker(Local<Object> target,
783 Local<Value> unused,
784 Local<Context> context,
785 void* priv) {
786 Environment* env = Environment::GetCurrent(context);
787
788 {
789 Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
790
791 w->InstanceTemplate()->SetInternalFieldCount(
792 Worker::kInternalFieldCount);
793 w->Inherit(AsyncWrap::GetConstructorTemplate(env));
794
795 env->SetProtoMethod(w, "startThread", Worker::StartThread);
796 env->SetProtoMethod(w, "stopThread", Worker::StopThread);
797 env->SetProtoMethod(w, "ref", Worker::Ref);
798 env->SetProtoMethod(w, "unref", Worker::Unref);
799 env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
800 env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
801
802 Local<String> workerString =
803 FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
804 w->SetClassName(workerString);
805 target->Set(env->context(),
806 workerString,
807 w->GetFunction(env->context()).ToLocalChecked()).Check();
808 }
809
810 {
811 Local<FunctionTemplate> wst = FunctionTemplate::New(env->isolate());
812
813 wst->InstanceTemplate()->SetInternalFieldCount(
814 WorkerHeapSnapshotTaker::kInternalFieldCount);
815 wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
816
817 Local<String> wst_string =
818 FIXED_ONE_BYTE_STRING(env->isolate(), "WorkerHeapSnapshotTaker");
819 wst->SetClassName(wst_string);
820 env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
821 }
822
823 env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
824
825 target
826 ->Set(env->context(),
827 env->thread_id_string(),
828 Number::New(env->isolate(), static_cast<double>(env->thread_id())))
829 .Check();
830
831 target
832 ->Set(env->context(),
833 FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
834 Boolean::New(env->isolate(), env->is_main_thread()))
835 .Check();
836
837 target
838 ->Set(env->context(),
839 FIXED_ONE_BYTE_STRING(env->isolate(), "ownsProcessState"),
840 Boolean::New(env->isolate(), env->owns_process_state()))
841 .Check();
842
843 if (!env->is_main_thread()) {
844 target
845 ->Set(env->context(),
846 FIXED_ONE_BYTE_STRING(env->isolate(), "resourceLimits"),
847 env->worker_context()->GetResourceLimits(env->isolate()))
848 .Check();
849 }
850
851 NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
852 NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
853 NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
854 NODE_DEFINE_CONSTANT(target, kStackSizeMb);
855 NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
856 }
857
858 } // anonymous namespace
859
860 } // namespace worker
861 } // namespace node
862
863 NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
864