1 #include "node_messaging.h"
2
3 #include "async_wrap-inl.h"
4 #include "debug_utils-inl.h"
5 #include "memory_tracker-inl.h"
6 #include "node_contextify.h"
7 #include "node_buffer.h"
8 #include "node_errors.h"
9 #include "node_process-inl.h"
10 #include "util-inl.h"
11
12 using node::contextify::ContextifyContext;
13 using node::errors::TryCatchScope;
14 using v8::Array;
15 using v8::ArrayBuffer;
16 using v8::BackingStore;
17 using v8::CompiledWasmModule;
18 using v8::Context;
19 using v8::EscapableHandleScope;
20 using v8::Function;
21 using v8::FunctionCallbackInfo;
22 using v8::FunctionTemplate;
23 using v8::Global;
24 using v8::HandleScope;
25 using v8::Isolate;
26 using v8::Just;
27 using v8::Local;
28 using v8::Maybe;
29 using v8::MaybeLocal;
30 using v8::Nothing;
31 using v8::Object;
32 using v8::SharedArrayBuffer;
33 using v8::String;
34 using v8::Symbol;
35 using v8::Value;
36 using v8::ValueDeserializer;
37 using v8::ValueSerializer;
38 using v8::WasmModuleObject;
39
40 namespace node {
41
42 using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
43
GetTransferMode() const44 BaseObject::TransferMode BaseObject::GetTransferMode() const {
45 return BaseObject::TransferMode::kUntransferable;
46 }
47
TransferForMessaging()48 std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
49 return CloneForMessaging();
50 }
51
CloneForMessaging() const52 std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
53 return {};
54 }
55
NestedTransferables() const56 Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
57 return Just(BaseObjectList {});
58 }
59
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)60 Maybe<bool> BaseObject::FinalizeTransferRead(
61 Local<Context> context, ValueDeserializer* deserializer) {
62 return Just(true);
63 }
64
65 namespace worker {
66
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)67 Maybe<bool> TransferData::FinalizeTransferWrite(
68 Local<Context> context, ValueSerializer* serializer) {
69 return Just(true);
70 }
71
Message(MallocedBuffer<char> && buffer)72 Message::Message(MallocedBuffer<char>&& buffer)
73 : main_message_buf_(std::move(buffer)) {}
74
IsCloseMessage() const75 bool Message::IsCloseMessage() const {
76 return main_message_buf_.data == nullptr;
77 }
78
79 namespace {
80
81 // This is used to tell V8 how to read transferred host objects, like other
82 // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
83 class DeserializerDelegate : public ValueDeserializer::Delegate {
84 public:
DeserializerDelegate(Message * m,Environment * env,const std::vector<BaseObjectPtr<BaseObject>> & host_objects,const std::vector<Local<SharedArrayBuffer>> & shared_array_buffers,const std::vector<CompiledWasmModule> & wasm_modules)85 DeserializerDelegate(
86 Message* m,
87 Environment* env,
88 const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
89 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
90 const std::vector<CompiledWasmModule>& wasm_modules)
91 : host_objects_(host_objects),
92 shared_array_buffers_(shared_array_buffers),
93 wasm_modules_(wasm_modules) {}
94
ReadHostObject(Isolate * isolate)95 MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
96 // Identifying the index in the message's BaseObject array is sufficient.
97 uint32_t id;
98 if (!deserializer->ReadUint32(&id))
99 return MaybeLocal<Object>();
100 CHECK_LE(id, host_objects_.size());
101 return host_objects_[id]->object(isolate);
102 }
103
GetSharedArrayBufferFromId(Isolate * isolate,uint32_t clone_id)104 MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
105 Isolate* isolate, uint32_t clone_id) override {
106 CHECK_LE(clone_id, shared_array_buffers_.size());
107 return shared_array_buffers_[clone_id];
108 }
109
GetWasmModuleFromId(Isolate * isolate,uint32_t transfer_id)110 MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
111 Isolate* isolate, uint32_t transfer_id) override {
112 CHECK_LE(transfer_id, wasm_modules_.size());
113 return WasmModuleObject::FromCompiledModule(
114 isolate, wasm_modules_[transfer_id]);
115 }
116
117 ValueDeserializer* deserializer = nullptr;
118
119 private:
120 const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
121 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
122 const std::vector<CompiledWasmModule>& wasm_modules_;
123 };
124
125 } // anonymous namespace
126
Deserialize(Environment * env,Local<Context> context)127 MaybeLocal<Value> Message::Deserialize(Environment* env,
128 Local<Context> context) {
129 CHECK(!IsCloseMessage());
130
131 EscapableHandleScope handle_scope(env->isolate());
132 Context::Scope context_scope(context);
133
134 // Create all necessary objects for transferables, e.g. MessagePort handles.
135 std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
136 auto cleanup = OnScopeLeave([&]() {
137 for (BaseObjectPtr<BaseObject> object : host_objects) {
138 if (!object) continue;
139
140 // If the function did not finish successfully, host_objects will contain
141 // a list of objects that will never be passed to JS. Therefore, we
142 // destroy them here.
143 object->Detach();
144 }
145 });
146
147 for (uint32_t i = 0; i < transferables_.size(); ++i) {
148 TransferData* data = transferables_[i].get();
149 host_objects[i] = data->Deserialize(
150 env, context, std::move(transferables_[i]));
151 if (!host_objects[i]) return {};
152 }
153 transferables_.clear();
154
155 std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
156 // Attach all transferred SharedArrayBuffers to their new Isolate.
157 for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
158 Local<SharedArrayBuffer> sab =
159 SharedArrayBuffer::New(env->isolate(),
160 std::move(shared_array_buffers_[i]));
161 shared_array_buffers.push_back(sab);
162 }
163 shared_array_buffers_.clear();
164
165 DeserializerDelegate delegate(
166 this, env, host_objects, shared_array_buffers, wasm_modules_);
167 ValueDeserializer deserializer(
168 env->isolate(),
169 reinterpret_cast<const uint8_t*>(main_message_buf_.data),
170 main_message_buf_.size,
171 &delegate);
172 delegate.deserializer = &deserializer;
173
174 // Attach all transferred ArrayBuffers to their new Isolate.
175 for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
176 Local<ArrayBuffer> ab =
177 ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
178 deserializer.TransferArrayBuffer(i, ab);
179 }
180 array_buffers_.clear();
181
182 if (deserializer.ReadHeader(context).IsNothing())
183 return {};
184 Local<Value> return_value;
185 if (!deserializer.ReadValue(context).ToLocal(&return_value))
186 return {};
187
188 for (BaseObjectPtr<BaseObject> base_object : host_objects) {
189 if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
190 return {};
191 }
192
193 host_objects.clear();
194 return handle_scope.Escape(return_value);
195 }
196
AddSharedArrayBuffer(std::shared_ptr<BackingStore> backing_store)197 void Message::AddSharedArrayBuffer(
198 std::shared_ptr<BackingStore> backing_store) {
199 shared_array_buffers_.emplace_back(std::move(backing_store));
200 }
201
AddTransferable(std::unique_ptr<TransferData> && data)202 void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
203 transferables_.emplace_back(std::move(data));
204 }
205
AddWASMModule(CompiledWasmModule && mod)206 uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
207 wasm_modules_.emplace_back(std::move(mod));
208 return wasm_modules_.size() - 1;
209 }
210
211 namespace {
212
GetEmitMessageFunction(Local<Context> context)213 MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
214 Isolate* isolate = context->GetIsolate();
215 Local<Object> per_context_bindings;
216 Local<Value> emit_message_val;
217 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
218 !per_context_bindings->Get(context,
219 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
220 .ToLocal(&emit_message_val)) {
221 return MaybeLocal<Function>();
222 }
223 CHECK(emit_message_val->IsFunction());
224 return emit_message_val.As<Function>();
225 }
226
GetDOMException(Local<Context> context)227 MaybeLocal<Function> GetDOMException(Local<Context> context) {
228 Isolate* isolate = context->GetIsolate();
229 Local<Object> per_context_bindings;
230 Local<Value> domexception_ctor_val;
231 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
232 !per_context_bindings->Get(context,
233 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
234 .ToLocal(&domexception_ctor_val)) {
235 return MaybeLocal<Function>();
236 }
237 CHECK(domexception_ctor_val->IsFunction());
238 Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
239 return domexception_ctor;
240 }
241
ThrowDataCloneException(Local<Context> context,Local<String> message)242 void ThrowDataCloneException(Local<Context> context, Local<String> message) {
243 Isolate* isolate = context->GetIsolate();
244 Local<Value> argv[] = {message,
245 FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
246 Local<Value> exception;
247 Local<Function> domexception_ctor;
248 if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
249 !domexception_ctor->NewInstance(context, arraysize(argv), argv)
250 .ToLocal(&exception)) {
251 return;
252 }
253 isolate->ThrowException(exception);
254 }
255
256 // This tells V8 how to serialize objects that it does not understand
257 // (e.g. C++ objects) into the output buffer, in a way that our own
258 // DeserializerDelegate understands how to unpack.
259 class SerializerDelegate : public ValueSerializer::Delegate {
260 public:
SerializerDelegate(Environment * env,Local<Context> context,Message * m)261 SerializerDelegate(Environment* env, Local<Context> context, Message* m)
262 : env_(env), context_(context), msg_(m) {}
263
ThrowDataCloneError(Local<String> message)264 void ThrowDataCloneError(Local<String> message) override {
265 ThrowDataCloneException(context_, message);
266 }
267
WriteHostObject(Isolate * isolate,Local<Object> object)268 Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
269 if (env_->base_object_ctor_template()->HasInstance(object)) {
270 return WriteHostObject(
271 BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
272 }
273
274 ThrowDataCloneError(env_->clone_unsupported_type_str());
275 return Nothing<bool>();
276 }
277
GetSharedArrayBufferId(Isolate * isolate,Local<SharedArrayBuffer> shared_array_buffer)278 Maybe<uint32_t> GetSharedArrayBufferId(
279 Isolate* isolate,
280 Local<SharedArrayBuffer> shared_array_buffer) override {
281 uint32_t i;
282 for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
283 if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
284 shared_array_buffer) {
285 return Just(i);
286 }
287 }
288
289 seen_shared_array_buffers_.emplace_back(
290 Global<SharedArrayBuffer> { isolate, shared_array_buffer });
291 msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
292 return Just(i);
293 }
294
GetWasmModuleTransferId(Isolate * isolate,Local<WasmModuleObject> module)295 Maybe<uint32_t> GetWasmModuleTransferId(
296 Isolate* isolate, Local<WasmModuleObject> module) override {
297 return Just(msg_->AddWASMModule(module->GetCompiledModule()));
298 }
299
Finish(Local<Context> context)300 Maybe<bool> Finish(Local<Context> context) {
301 for (uint32_t i = 0; i < host_objects_.size(); i++) {
302 BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
303 std::unique_ptr<TransferData> data;
304 if (i < first_cloned_object_index_)
305 data = host_object->TransferForMessaging();
306 if (!data)
307 data = host_object->CloneForMessaging();
308 if (!data) return Nothing<bool>();
309 if (data->FinalizeTransferWrite(context, serializer).IsNothing())
310 return Nothing<bool>();
311 msg_->AddTransferable(std::move(data));
312 }
313 return Just(true);
314 }
315
AddHostObject(BaseObjectPtr<BaseObject> host_object)316 inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
317 // Make sure we have not started serializing the value itself yet.
318 CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
319 host_objects_.emplace_back(std::move(host_object));
320 }
321
322 // Some objects in the transfer list may register sub-objects that can be
323 // transferred. This could e.g. be a public JS wrapper object, such as a
324 // FileHandle, that is registering its C++ handle for transfer.
AddNestedHostObjects()325 inline Maybe<bool> AddNestedHostObjects() {
326 for (size_t i = 0; i < host_objects_.size(); i++) {
327 std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
328 if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
329 return Nothing<bool>();
330 for (auto nested_transferable : nested_transferables) {
331 if (std::find(host_objects_.begin(),
332 host_objects_.end(),
333 nested_transferable) == host_objects_.end()) {
334 AddHostObject(nested_transferable);
335 }
336 }
337 }
338 return Just(true);
339 }
340
341 ValueSerializer* serializer = nullptr;
342
343 private:
WriteHostObject(BaseObjectPtr<BaseObject> host_object)344 Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
345 BaseObject::TransferMode mode = host_object->GetTransferMode();
346 if (mode == BaseObject::TransferMode::kUntransferable) {
347 ThrowDataCloneError(env_->clone_unsupported_type_str());
348 return Nothing<bool>();
349 }
350
351 for (uint32_t i = 0; i < host_objects_.size(); i++) {
352 if (host_objects_[i] == host_object) {
353 serializer->WriteUint32(i);
354 return Just(true);
355 }
356 }
357
358 if (mode == BaseObject::TransferMode::kTransferable) {
359 // TODO(addaleax): This message code is too specific. Fix that in a
360 // semver-major follow-up.
361 THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
362 return Nothing<bool>();
363 }
364
365 CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
366 uint32_t index = host_objects_.size();
367 if (first_cloned_object_index_ == SIZE_MAX)
368 first_cloned_object_index_ = index;
369 serializer->WriteUint32(index);
370 host_objects_.push_back(host_object);
371 return Just(true);
372 }
373
374 Environment* env_;
375 Local<Context> context_;
376 Message* msg_;
377 std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
378 std::vector<BaseObjectPtr<BaseObject>> host_objects_;
379 size_t first_cloned_object_index_ = SIZE_MAX;
380
381 friend class worker::Message;
382 };
383
384 } // anonymous namespace
385
Serialize(Environment * env,Local<Context> context,Local<Value> input,const TransferList & transfer_list_v,Local<Object> source_port)386 Maybe<bool> Message::Serialize(Environment* env,
387 Local<Context> context,
388 Local<Value> input,
389 const TransferList& transfer_list_v,
390 Local<Object> source_port) {
391 HandleScope handle_scope(env->isolate());
392 Context::Scope context_scope(context);
393
394 // Verify that we're not silently overwriting an existing message.
395 CHECK(main_message_buf_.is_empty());
396
397 SerializerDelegate delegate(env, context, this);
398 ValueSerializer serializer(env->isolate(), &delegate);
399 delegate.serializer = &serializer;
400
401 std::vector<Local<ArrayBuffer>> array_buffers;
402 for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
403 Local<Value> entry = transfer_list_v[i];
404 if (entry->IsObject()) {
405 // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
406 // for details.
407 bool untransferable;
408 if (!entry.As<Object>()->HasPrivate(
409 context,
410 env->untransferable_object_private_symbol())
411 .To(&untransferable)) {
412 return Nothing<bool>();
413 }
414 if (untransferable) continue;
415 }
416
417 // Currently, we support ArrayBuffers and BaseObjects for which
418 // GetTransferMode() does not return kUntransferable.
419 if (entry->IsArrayBuffer()) {
420 Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
421 // If we cannot render the ArrayBuffer unusable in this Isolate,
422 // copying the buffer will have to do.
423 // Note that we can currently transfer ArrayBuffers even if they were
424 // not allocated by Node’s ArrayBufferAllocator in the first place,
425 // because we pass the underlying v8::BackingStore around rather than
426 // raw data *and* an Isolate with a non-default ArrayBuffer allocator
427 // is always going to outlive any Workers it creates, and so will its
428 // allocator along with it.
429 if (!ab->IsDetachable()) continue;
430 if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
431 array_buffers.end()) {
432 ThrowDataCloneException(
433 context,
434 FIXED_ONE_BYTE_STRING(
435 env->isolate(),
436 "Transfer list contains duplicate ArrayBuffer"));
437 return Nothing<bool>();
438 }
439 // We simply use the array index in the `array_buffers` list as the
440 // ID that we write into the serialized buffer.
441 uint32_t id = array_buffers.size();
442 array_buffers.push_back(ab);
443 serializer.TransferArrayBuffer(id, ab);
444 continue;
445 } else if (env->base_object_ctor_template()->HasInstance(entry)) {
446 // Check if the source MessagePort is being transferred.
447 if (!source_port.IsEmpty() && entry == source_port) {
448 ThrowDataCloneException(
449 context,
450 FIXED_ONE_BYTE_STRING(env->isolate(),
451 "Transfer list contains source port"));
452 return Nothing<bool>();
453 }
454 BaseObjectPtr<BaseObject> host_object {
455 Unwrap<BaseObject>(entry.As<Object>()) };
456 if (env->message_port_constructor_template()->HasInstance(entry) &&
457 (!host_object ||
458 static_cast<MessagePort*>(host_object.get())->IsDetached())) {
459 ThrowDataCloneException(
460 context,
461 FIXED_ONE_BYTE_STRING(
462 env->isolate(),
463 "MessagePort in transfer list is already detached"));
464 return Nothing<bool>();
465 }
466 if (std::find(delegate.host_objects_.begin(),
467 delegate.host_objects_.end(),
468 host_object) != delegate.host_objects_.end()) {
469 ThrowDataCloneException(
470 context,
471 String::Concat(env->isolate(),
472 FIXED_ONE_BYTE_STRING(
473 env->isolate(),
474 "Transfer list contains duplicate "),
475 entry.As<Object>()->GetConstructorName()));
476 return Nothing<bool>();
477 }
478 if (host_object && host_object->GetTransferMode() !=
479 BaseObject::TransferMode::kUntransferable) {
480 delegate.AddHostObject(host_object);
481 continue;
482 }
483 }
484
485 THROW_ERR_INVALID_TRANSFER_OBJECT(env);
486 return Nothing<bool>();
487 }
488 if (delegate.AddNestedHostObjects().IsNothing())
489 return Nothing<bool>();
490
491 serializer.WriteHeader();
492 if (serializer.WriteValue(context, input).IsNothing()) {
493 return Nothing<bool>();
494 }
495
496 for (Local<ArrayBuffer> ab : array_buffers) {
497 // If serialization succeeded, we render it inaccessible in this Isolate.
498 std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
499 ab->Detach();
500
501 array_buffers_.emplace_back(std::move(backing_store));
502 }
503
504 if (delegate.Finish(context).IsNothing())
505 return Nothing<bool>();
506
507 // The serializer gave us a buffer allocated using `malloc()`.
508 std::pair<uint8_t*, size_t> data = serializer.Release();
509 CHECK_NOT_NULL(data.first);
510 main_message_buf_ =
511 MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
512 return Just(true);
513 }
514
MemoryInfo(MemoryTracker * tracker) const515 void Message::MemoryInfo(MemoryTracker* tracker) const {
516 tracker->TrackField("array_buffers_", array_buffers_);
517 tracker->TrackField("shared_array_buffers", shared_array_buffers_);
518 tracker->TrackField("transferables", transferables_);
519 }
520
MessagePortData(MessagePort * owner)521 MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
522
~MessagePortData()523 MessagePortData::~MessagePortData() {
524 CHECK_NULL(owner_);
525 Disentangle();
526 }
527
MemoryInfo(MemoryTracker * tracker) const528 void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
529 Mutex::ScopedLock lock(mutex_);
530 tracker->TrackField("incoming_messages", incoming_messages_);
531 }
532
AddToIncomingQueue(Message && message)533 void MessagePortData::AddToIncomingQueue(Message&& message) {
534 // This function will be called by other threads.
535 Mutex::ScopedLock lock(mutex_);
536 incoming_messages_.emplace_back(std::move(message));
537
538 if (owner_ != nullptr) {
539 Debug(owner_, "Adding message to incoming queue");
540 owner_->TriggerAsync();
541 }
542 }
543
Entangle(MessagePortData * a,MessagePortData * b)544 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
545 CHECK_NULL(a->sibling_);
546 CHECK_NULL(b->sibling_);
547 a->sibling_ = b;
548 b->sibling_ = a;
549 a->sibling_mutex_ = b->sibling_mutex_;
550 }
551
Disentangle()552 void MessagePortData::Disentangle() {
553 // Grab a copy of the sibling mutex, then replace it so that each sibling
554 // has its own sibling_mutex_ now.
555 std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
556 Mutex::ScopedLock sibling_lock(*sibling_mutex);
557 sibling_mutex_ = std::make_shared<Mutex>();
558
559 MessagePortData* sibling = sibling_;
560 if (sibling_ != nullptr) {
561 sibling_->sibling_ = nullptr;
562 sibling_ = nullptr;
563 }
564
565 // We close MessagePorts after disentanglement, so we enqueue a corresponding
566 // message and trigger the corresponding uv_async_t to let them know that
567 // this happened.
568 AddToIncomingQueue(Message());
569 if (sibling != nullptr) {
570 sibling->AddToIncomingQueue(Message());
571 }
572 }
573
~MessagePort()574 MessagePort::~MessagePort() {
575 if (data_) Detach();
576 }
577
MessagePort(Environment * env,Local<Context> context,Local<Object> wrap)578 MessagePort::MessagePort(Environment* env,
579 Local<Context> context,
580 Local<Object> wrap)
581 : HandleWrap(env,
582 wrap,
583 reinterpret_cast<uv_handle_t*>(&async_),
584 AsyncWrap::PROVIDER_MESSAGEPORT),
585 data_(new MessagePortData(this)) {
586 auto onmessage = [](uv_async_t* handle) {
587 // Called when data has been put into the queue.
588 MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
589 channel->OnMessage();
590 };
591
592 CHECK_EQ(uv_async_init(env->event_loop(),
593 &async_,
594 onmessage), 0);
595 // Reset later to indicate success of the constructor.
596 bool succeeded = false;
597 auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
598
599 Local<Value> fn;
600 if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
601 return;
602
603 if (fn->IsFunction()) {
604 Local<Function> init = fn.As<Function>();
605 if (init->Call(context, wrap, 0, nullptr).IsEmpty())
606 return;
607 }
608
609 Local<Function> emit_message_fn;
610 if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
611 return;
612 emit_message_fn_.Reset(env->isolate(), emit_message_fn);
613
614 succeeded = true;
615 Debug(this, "Created message port");
616 }
617
IsDetached() const618 bool MessagePort::IsDetached() const {
619 return data_ == nullptr || IsHandleClosing();
620 }
621
TriggerAsync()622 void MessagePort::TriggerAsync() {
623 if (IsHandleClosing()) return;
624 CHECK_EQ(uv_async_send(&async_), 0);
625 }
626
Close(v8::Local<v8::Value> close_callback)627 void MessagePort::Close(v8::Local<v8::Value> close_callback) {
628 Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
629
630 if (data_) {
631 // Wrap this call with accessing the mutex, so that TriggerAsync()
632 // can check IsHandleClosing() without race conditions.
633 Mutex::ScopedLock sibling_lock(data_->mutex_);
634 HandleWrap::Close(close_callback);
635 } else {
636 HandleWrap::Close(close_callback);
637 }
638 }
639
New(const FunctionCallbackInfo<Value> & args)640 void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
641 // This constructor just throws an error. Unfortunately, we can’t use V8’s
642 // ConstructorBehavior::kThrow, as that also removes the prototype from the
643 // class (i.e. makes it behave like an arrow function).
644 Environment* env = Environment::GetCurrent(args);
645 THROW_ERR_CONSTRUCT_CALL_INVALID(env);
646 }
647
New(Environment * env,Local<Context> context,std::unique_ptr<MessagePortData> data)648 MessagePort* MessagePort::New(
649 Environment* env,
650 Local<Context> context,
651 std::unique_ptr<MessagePortData> data) {
652 Context::Scope context_scope(context);
653 Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
654
655 // Construct a new instance, then assign the listener instance and possibly
656 // the MessagePortData to it.
657 Local<Object> instance;
658 if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
659 return nullptr;
660 MessagePort* port = new MessagePort(env, context, instance);
661 CHECK_NOT_NULL(port);
662 if (port->IsHandleClosing()) {
663 // Construction failed with an exception.
664 return nullptr;
665 }
666
667 if (data) {
668 port->Detach();
669 port->data_ = std::move(data);
670
671 // This lock is here to avoid race conditions with the `owner_` read
672 // in AddToIncomingQueue(). (This would likely be unproblematic without it,
673 // but it's better to be safe than sorry.)
674 Mutex::ScopedLock lock(port->data_->mutex_);
675 port->data_->owner_ = port;
676 // If the existing MessagePortData object had pending messages, this is
677 // the easiest way to run that queue.
678 port->TriggerAsync();
679 }
680 return port;
681 }
682
ReceiveMessage(Local<Context> context,bool only_if_receiving)683 MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
684 bool only_if_receiving) {
685 Message received;
686 {
687 // Get the head of the message queue.
688 Mutex::ScopedLock lock(data_->mutex_);
689
690 Debug(this, "MessagePort has message");
691
692 bool wants_message = receiving_messages_ || !only_if_receiving;
693 // We have nothing to do if:
694 // - There are no pending messages
695 // - We are not intending to receive messages, and the message we would
696 // receive is not the final "close" message.
697 if (data_->incoming_messages_.empty() ||
698 (!wants_message &&
699 !data_->incoming_messages_.front().IsCloseMessage())) {
700 return env()->no_message_symbol();
701 }
702
703 received = std::move(data_->incoming_messages_.front());
704 data_->incoming_messages_.pop_front();
705 }
706
707 if (received.IsCloseMessage()) {
708 Close();
709 return env()->no_message_symbol();
710 }
711
712 if (!env()->can_call_into_js()) return MaybeLocal<Value>();
713
714 return received.Deserialize(env(), context);
715 }
716
OnMessage()717 void MessagePort::OnMessage() {
718 Debug(this, "Running MessagePort::OnMessage()");
719 HandleScope handle_scope(env()->isolate());
720 Local<Context> context = object(env()->isolate())->CreationContext();
721
722 size_t processing_limit;
723 {
724 Mutex::ScopedLock(data_->mutex_);
725 processing_limit = std::max(data_->incoming_messages_.size(),
726 static_cast<size_t>(1000));
727 }
728
729 // data_ can only ever be modified by the owner thread, so no need to lock.
730 // However, the message port may be transferred while it is processing
731 // messages, so we need to check that this handle still owns its `data_` field
732 // on every iteration.
733 while (data_) {
734 if (processing_limit-- == 0) {
735 // Prevent event loop starvation by only processing those messages without
736 // interruption that were already present when the OnMessage() call was
737 // first triggered, but at least 1000 messages because otherwise the
738 // overhead of repeatedly triggering the uv_async_t instance becomes
739 // noticeable, at least on Windows.
740 // (That might require more investigation by somebody more familiar with
741 // Windows.)
742 TriggerAsync();
743 return;
744 }
745
746 HandleScope handle_scope(env()->isolate());
747 Context::Scope context_scope(context);
748 Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
749
750 Local<Value> payload;
751 Local<Value> message_error;
752 Local<Value> argv[2];
753
754 {
755 // Catch any exceptions from parsing the message itself (not from
756 // emitting it) as 'messageeror' events.
757 TryCatchScope try_catch(env());
758 if (!ReceiveMessage(context, true).ToLocal(&payload)) {
759 if (try_catch.HasCaught() && !try_catch.HasTerminated())
760 message_error = try_catch.Exception();
761 goto reschedule;
762 }
763 }
764 if (payload == env()->no_message_symbol()) break;
765
766 if (!env()->can_call_into_js()) {
767 Debug(this, "MessagePort drains queue because !can_call_into_js()");
768 // In this case there is nothing to do but to drain the current queue.
769 continue;
770 }
771
772 argv[0] = payload;
773 argv[1] = env()->message_string();
774
775 if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
776 reschedule:
777 if (!message_error.IsEmpty()) {
778 argv[0] = message_error;
779 argv[1] = env()->messageerror_string();
780 USE(MakeCallback(emit_message, arraysize(argv), argv));
781 }
782
783 // Re-schedule OnMessage() execution in case of failure.
784 if (data_)
785 TriggerAsync();
786 return;
787 }
788 }
789 }
790
OnClose()791 void MessagePort::OnClose() {
792 Debug(this, "MessagePort::OnClose()");
793 if (data_) {
794 // Detach() returns move(data_).
795 Detach()->Disentangle();
796 }
797 }
798
Detach()799 std::unique_ptr<MessagePortData> MessagePort::Detach() {
800 CHECK(data_);
801 Mutex::ScopedLock lock(data_->mutex_);
802 data_->owner_ = nullptr;
803 return std::move(data_);
804 }
805
GetTransferMode() const806 BaseObject::TransferMode MessagePort::GetTransferMode() const {
807 if (IsDetached())
808 return BaseObject::TransferMode::kUntransferable;
809 return BaseObject::TransferMode::kTransferable;
810 }
811
TransferForMessaging()812 std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
813 Close();
814 return Detach();
815 }
816
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)817 BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
818 Environment* env,
819 Local<Context> context,
820 std::unique_ptr<TransferData> self) {
821 return BaseObjectPtr<MessagePort> { MessagePort::New(
822 env, context,
823 static_unique_pointer_cast<MessagePortData>(std::move(self))) };
824 }
825
PostMessage(Environment * env,Local<Context> context,Local<Value> message_v,const TransferList & transfer_v)826 Maybe<bool> MessagePort::PostMessage(Environment* env,
827 Local<Context> context,
828 Local<Value> message_v,
829 const TransferList& transfer_v) {
830 Isolate* isolate = env->isolate();
831 Local<Object> obj = object(isolate);
832
833 Message msg;
834
835 // Per spec, we need to both check if transfer list has the source port, and
836 // serialize the input message, even if the MessagePort is closed or detached.
837
838 Maybe<bool> serialization_maybe =
839 msg.Serialize(env, context, message_v, transfer_v, obj);
840 if (data_ == nullptr) {
841 return serialization_maybe;
842 }
843 if (serialization_maybe.IsNothing()) {
844 return Nothing<bool>();
845 }
846
847 Mutex::ScopedLock lock(*data_->sibling_mutex_);
848 bool doomed = false;
849
850 // Check if the target port is posted to itself.
851 if (data_->sibling_ != nullptr) {
852 for (const auto& transferable : msg.transferables()) {
853 if (data_->sibling_ == transferable.get()) {
854 doomed = true;
855 ProcessEmitWarning(env, "The target port was posted to itself, and "
856 "the communication channel was lost");
857 break;
858 }
859 }
860 }
861
862 if (data_->sibling_ == nullptr || doomed)
863 return Just(true);
864
865 data_->sibling_->AddToIncomingQueue(std::move(msg));
866 return Just(true);
867 }
868
ReadIterable(Environment * env,Local<Context> context,TransferList & transfer_list,Local<Value> object)869 static Maybe<bool> ReadIterable(Environment* env,
870 Local<Context> context,
871 // NOLINTNEXTLINE(runtime/references)
872 TransferList& transfer_list,
873 Local<Value> object) {
874 if (!object->IsObject()) return Just(false);
875
876 if (object->IsArray()) {
877 Local<Array> arr = object.As<Array>();
878 size_t length = arr->Length();
879 transfer_list.AllocateSufficientStorage(length);
880 for (size_t i = 0; i < length; i++) {
881 if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
882 return Nothing<bool>();
883 }
884 return Just(true);
885 }
886
887 Isolate* isolate = env->isolate();
888 Local<Value> iterator_method;
889 if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
890 .ToLocal(&iterator_method)) return Nothing<bool>();
891 if (!iterator_method->IsFunction()) return Just(false);
892
893 Local<Value> iterator;
894 if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
895 .ToLocal(&iterator)) return Nothing<bool>();
896 if (!iterator->IsObject()) return Just(false);
897
898 Local<Value> next;
899 if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
900 return Nothing<bool>();
901 if (!next->IsFunction()) return Just(false);
902
903 std::vector<Local<Value>> entries;
904 while (env->can_call_into_js()) {
905 Local<Value> result;
906 if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
907 .ToLocal(&result)) return Nothing<bool>();
908 if (!result->IsObject()) return Just(false);
909
910 Local<Value> done;
911 if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
912 return Nothing<bool>();
913 if (done->BooleanValue(isolate)) break;
914
915 Local<Value> val;
916 if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
917 return Nothing<bool>();
918 entries.push_back(val);
919 }
920
921 transfer_list.AllocateSufficientStorage(entries.size());
922 std::copy(entries.begin(), entries.end(), &transfer_list[0]);
923 return Just(true);
924 }
925
PostMessage(const FunctionCallbackInfo<Value> & args)926 void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
927 Environment* env = Environment::GetCurrent(args);
928 Local<Object> obj = args.This();
929 Local<Context> context = obj->CreationContext();
930
931 if (args.Length() == 0) {
932 return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
933 "MessagePort.postMessage");
934 }
935
936 if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
937 // Browsers ignore null or undefined, and otherwise accept an array or an
938 // options object.
939 return THROW_ERR_INVALID_ARG_TYPE(env,
940 "Optional transferList argument must be an iterable");
941 }
942
943 TransferList transfer_list;
944 if (args[1]->IsObject()) {
945 bool was_iterable;
946 if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
947 return;
948 if (!was_iterable) {
949 Local<Value> transfer_option;
950 if (!args[1].As<Object>()->Get(context, env->transfer_string())
951 .ToLocal(&transfer_option)) return;
952 if (!transfer_option->IsUndefined()) {
953 if (!ReadIterable(env, context, transfer_list, transfer_option)
954 .To(&was_iterable)) return;
955 if (!was_iterable) {
956 return THROW_ERR_INVALID_ARG_TYPE(env,
957 "Optional options.transfer argument must be an iterable");
958 }
959 }
960 }
961 }
962
963 MessagePort* port = Unwrap<MessagePort>(args.This());
964 // Even if the backing MessagePort object has already been deleted, we still
965 // want to serialize the message to ensure spec-compliant behavior w.r.t.
966 // transfers.
967 if (port == nullptr || port->IsHandleClosing()) {
968 Message msg;
969 USE(msg.Serialize(env, context, args[0], transfer_list, obj));
970 return;
971 }
972
973 Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
974 if (res.IsJust())
975 args.GetReturnValue().Set(res.FromJust());
976 }
977
Start()978 void MessagePort::Start() {
979 Debug(this, "Start receiving messages");
980 receiving_messages_ = true;
981 Mutex::ScopedLock lock(data_->mutex_);
982 if (!data_->incoming_messages_.empty())
983 TriggerAsync();
984 }
985
Stop()986 void MessagePort::Stop() {
987 Debug(this, "Stop receiving messages");
988 receiving_messages_ = false;
989 }
990
Start(const FunctionCallbackInfo<Value> & args)991 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
992 MessagePort* port;
993 ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
994 if (!port->data_) {
995 return;
996 }
997 port->Start();
998 }
999
Stop(const FunctionCallbackInfo<Value> & args)1000 void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1001 MessagePort* port;
1002 CHECK(args[0]->IsObject());
1003 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1004 if (!port->data_) {
1005 return;
1006 }
1007 port->Stop();
1008 }
1009
Drain(const FunctionCallbackInfo<Value> & args)1010 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1011 MessagePort* port;
1012 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1013 port->OnMessage();
1014 }
1015
ReceiveMessage(const FunctionCallbackInfo<Value> & args)1016 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1017 Environment* env = Environment::GetCurrent(args);
1018 if (!args[0]->IsObject() ||
1019 !env->message_port_constructor_template()->HasInstance(args[0])) {
1020 return THROW_ERR_INVALID_ARG_TYPE(env,
1021 "The \"port\" argument must be a MessagePort instance");
1022 }
1023 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1024 if (port == nullptr) {
1025 // Return 'no messages' for a closed port.
1026 args.GetReturnValue().Set(
1027 Environment::GetCurrent(args)->no_message_symbol());
1028 return;
1029 }
1030
1031 MaybeLocal<Value> payload =
1032 port->ReceiveMessage(port->object()->CreationContext(), false);
1033 if (!payload.IsEmpty())
1034 args.GetReturnValue().Set(payload.ToLocalChecked());
1035 }
1036
MoveToContext(const FunctionCallbackInfo<Value> & args)1037 void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1038 Environment* env = Environment::GetCurrent(args);
1039 if (!args[0]->IsObject() ||
1040 !env->message_port_constructor_template()->HasInstance(args[0])) {
1041 return THROW_ERR_INVALID_ARG_TYPE(env,
1042 "The \"port\" argument must be a MessagePort instance");
1043 }
1044 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1045 if (port == nullptr || port->IsHandleClosing()) {
1046 Isolate* isolate = env->isolate();
1047 THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1048 return;
1049 }
1050
1051 Local<Value> context_arg = args[1];
1052 ContextifyContext* context_wrapper;
1053 if (!context_arg->IsObject() ||
1054 (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1055 env, context_arg.As<Object>())) == nullptr) {
1056 return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1057 }
1058
1059 std::unique_ptr<MessagePortData> data;
1060 if (!port->IsDetached())
1061 data = port->Detach();
1062
1063 Context::Scope context_scope(context_wrapper->context());
1064 MessagePort* target =
1065 MessagePort::New(env, context_wrapper->context(), std::move(data));
1066 if (target != nullptr)
1067 args.GetReturnValue().Set(target->object());
1068 }
1069
Entangle(MessagePort * a,MessagePort * b)1070 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1071 Entangle(a, b->data_.get());
1072 }
1073
Entangle(MessagePort * a,MessagePortData * b)1074 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1075 MessagePortData::Entangle(a->data_.get(), b);
1076 }
1077
MemoryInfo(MemoryTracker * tracker) const1078 void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1079 tracker->TrackField("data", data_);
1080 tracker->TrackField("emit_message_fn", emit_message_fn_);
1081 }
1082
GetMessagePortConstructorTemplate(Environment * env)1083 Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1084 // Factor generating the MessagePort JS constructor into its own piece
1085 // of code, because it is needed early on in the child environment setup.
1086 Local<FunctionTemplate> templ = env->message_port_constructor_template();
1087 if (!templ.IsEmpty())
1088 return templ;
1089
1090 {
1091 Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1092 m->SetClassName(env->message_port_constructor_string());
1093 m->InstanceTemplate()->SetInternalFieldCount(
1094 MessagePort::kInternalFieldCount);
1095 m->Inherit(HandleWrap::GetConstructorTemplate(env));
1096
1097 env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1098 env->SetProtoMethod(m, "start", MessagePort::Start);
1099
1100 env->set_message_port_constructor_template(m);
1101 }
1102
1103 return GetMessagePortConstructorTemplate(env);
1104 }
1105
JSTransferable(Environment * env,Local<Object> obj)1106 JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1107 : BaseObject(env, obj) {
1108 MakeWeak();
1109 }
1110
New(const FunctionCallbackInfo<Value> & args)1111 void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1112 CHECK(args.IsConstructCall());
1113 new JSTransferable(Environment::GetCurrent(args), args.This());
1114 }
1115
GetTransferMode() const1116 JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1117 // Implement `kClone in this ? kCloneable : kTransferable`.
1118 HandleScope handle_scope(env()->isolate());
1119 errors::TryCatchScope ignore_exceptions(env());
1120
1121 bool has_clone;
1122 if (!object()->Has(env()->context(),
1123 env()->messaging_clone_symbol()).To(&has_clone)) {
1124 return TransferMode::kUntransferable;
1125 }
1126
1127 return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1128 }
1129
TransferForMessaging()1130 std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1131 return TransferOrClone(TransferMode::kTransferable);
1132 }
1133
CloneForMessaging() const1134 std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1135 return TransferOrClone(TransferMode::kCloneable);
1136 }
1137
TransferOrClone(TransferMode mode) const1138 std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1139 TransferMode mode) const {
1140 // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1141 // which should return an object with `data` and `deserializeInfo` properties;
1142 // `data` is written to the serializer later, and `deserializeInfo` is stored
1143 // on the `TransferData` instance as a string.
1144 HandleScope handle_scope(env()->isolate());
1145 Local<Context> context = env()->isolate()->GetCurrentContext();
1146 Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1147 env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1148
1149 Local<Value> method;
1150 if (!object()->Get(context, method_name).ToLocal(&method)) {
1151 return {};
1152 }
1153 if (method->IsFunction()) {
1154 Local<Value> result_v;
1155 if (!method.As<Function>()->Call(
1156 context, object(), 0, nullptr).ToLocal(&result_v)) {
1157 return {};
1158 }
1159
1160 if (result_v->IsObject()) {
1161 Local<Object> result = result_v.As<Object>();
1162 Local<Value> data;
1163 Local<Value> deserialize_info;
1164 if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1165 !result->Get(context, env()->deserialize_info_string())
1166 .ToLocal(&deserialize_info)) {
1167 return {};
1168 }
1169 Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1170 if (*deserialize_info_str == nullptr) return {};
1171 return std::make_unique<Data>(
1172 *deserialize_info_str, Global<Value>(env()->isolate(), data));
1173 }
1174 }
1175
1176 if (mode == TransferMode::kTransferable)
1177 return TransferOrClone(TransferMode::kCloneable);
1178 else
1179 return {};
1180 }
1181
1182 Maybe<BaseObjectList>
NestedTransferables() const1183 JSTransferable::NestedTransferables() const {
1184 // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1185 HandleScope handle_scope(env()->isolate());
1186 Local<Context> context = env()->isolate()->GetCurrentContext();
1187 Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1188
1189 Local<Value> method;
1190 if (!object()->Get(context, method_name).ToLocal(&method)) {
1191 return Nothing<BaseObjectList>();
1192 }
1193 if (!method->IsFunction()) return Just(BaseObjectList {});
1194
1195 Local<Value> list_v;
1196 if (!method.As<Function>()->Call(
1197 context, object(), 0, nullptr).ToLocal(&list_v)) {
1198 return Nothing<BaseObjectList>();
1199 }
1200 if (!list_v->IsArray()) return Just(BaseObjectList {});
1201 Local<Array> list = list_v.As<Array>();
1202
1203 BaseObjectList ret;
1204 for (size_t i = 0; i < list->Length(); i++) {
1205 Local<Value> value;
1206 if (!list->Get(context, i).ToLocal(&value))
1207 return Nothing<BaseObjectList>();
1208 if (env()->base_object_ctor_template()->HasInstance(value))
1209 ret.emplace_back(Unwrap<BaseObject>(value));
1210 }
1211 return Just(ret);
1212 }
1213
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)1214 Maybe<bool> JSTransferable::FinalizeTransferRead(
1215 Local<Context> context, ValueDeserializer* deserializer) {
1216 // Call `this[kDeserialize](data)` where `data` comes from the return value
1217 // of `this[kTransfer]()` or `this[kClone]()`.
1218 HandleScope handle_scope(env()->isolate());
1219 Local<Value> data;
1220 if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1221
1222 Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1223 Local<Value> method;
1224 if (!object()->Get(context, method_name).ToLocal(&method)) {
1225 return Nothing<bool>();
1226 }
1227 if (!method->IsFunction()) return Just(true);
1228
1229 if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1230 return Nothing<bool>();
1231 }
1232 return Just(true);
1233 }
1234
Data(std::string && deserialize_info,v8::Global<v8::Value> && data)1235 JSTransferable::Data::Data(std::string&& deserialize_info,
1236 v8::Global<v8::Value>&& data)
1237 : deserialize_info_(std::move(deserialize_info)),
1238 data_(std::move(data)) {}
1239
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)1240 BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1241 Environment* env,
1242 Local<Context> context,
1243 std::unique_ptr<TransferData> self) {
1244 // Create the JS wrapper object that will later be filled with data passed to
1245 // the `[kDeserialize]()` method on it. This split is necessary, because here
1246 // we need to create an object with the right prototype and internal fields,
1247 // but the actual JS data stored in the serialized data can only be read at
1248 // the end of the stream, after the main message has been read.
1249
1250 if (context != env->context()) {
1251 THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1252 return {};
1253 }
1254 HandleScope handle_scope(env->isolate());
1255 Local<Value> info;
1256 if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1257
1258 Local<Value> ret;
1259 CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1260 if (!env->messaging_deserialize_create_object()->Call(
1261 context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1262 !env->base_object_ctor_template()->HasInstance(ret)) {
1263 return {};
1264 }
1265
1266 return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1267 }
1268
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)1269 Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1270 Local<Context> context, ValueSerializer* serializer) {
1271 HandleScope handle_scope(context->GetIsolate());
1272 auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1273 data_.Reset();
1274 return ret;
1275 }
1276
1277 namespace {
1278
SetDeserializerCreateObjectFunction(const FunctionCallbackInfo<Value> & args)1279 static void SetDeserializerCreateObjectFunction(
1280 const FunctionCallbackInfo<Value>& args) {
1281 Environment* env = Environment::GetCurrent(args);
1282 CHECK(args[0]->IsFunction());
1283 env->set_messaging_deserialize_create_object(args[0].As<Function>());
1284 }
1285
MessageChannel(const FunctionCallbackInfo<Value> & args)1286 static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1287 Environment* env = Environment::GetCurrent(args);
1288 if (!args.IsConstructCall()) {
1289 THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1290 return;
1291 }
1292
1293 Local<Context> context = args.This()->CreationContext();
1294 Context::Scope context_scope(context);
1295
1296 MessagePort* port1 = MessagePort::New(env, context);
1297 if (port1 == nullptr) return;
1298 MessagePort* port2 = MessagePort::New(env, context);
1299 if (port2 == nullptr) {
1300 port1->Close();
1301 return;
1302 }
1303
1304 MessagePort::Entangle(port1, port2);
1305
1306 args.This()->Set(context, env->port1_string(), port1->object())
1307 .Check();
1308 args.This()->Set(context, env->port2_string(), port2->object())
1309 .Check();
1310 }
1311
InitMessaging(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)1312 static void InitMessaging(Local<Object> target,
1313 Local<Value> unused,
1314 Local<Context> context,
1315 void* priv) {
1316 Environment* env = Environment::GetCurrent(context);
1317
1318 {
1319 env->SetConstructorFunction(
1320 target,
1321 "MessageChannel",
1322 env->NewFunctionTemplate(MessageChannel));
1323 }
1324
1325 {
1326 Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1327 t->Inherit(BaseObject::GetConstructorTemplate(env));
1328 t->InstanceTemplate()->SetInternalFieldCount(
1329 JSTransferable::kInternalFieldCount);
1330 env->SetConstructorFunction(target, "JSTransferable", t);
1331 }
1332
1333 env->SetConstructorFunction(
1334 target,
1335 env->message_port_constructor_string(),
1336 GetMessagePortConstructorTemplate(env));
1337
1338 // These are not methods on the MessagePort prototype, because
1339 // the browser equivalents do not provide them.
1340 env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1341 env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1342 env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1343 env->SetMethod(target, "moveMessagePortToContext",
1344 MessagePort::MoveToContext);
1345 env->SetMethod(target, "setDeserializerCreateObjectFunction",
1346 SetDeserializerCreateObjectFunction);
1347
1348 {
1349 Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1350 target
1351 ->Set(context,
1352 FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1353 domexception)
1354 .Check();
1355 }
1356 }
1357
1358 } // anonymous namespace
1359
1360 } // namespace worker
1361 } // namespace node
1362
1363 NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1364