1 #include "node_messaging.h"
2
3 #include "async_wrap-inl.h"
4 #include "debug_utils-inl.h"
5 #include "memory_tracker-inl.h"
6 #include "node_contextify.h"
7 #include "node_buffer.h"
8 #include "node_errors.h"
9 #include "node_process.h"
10 #include "util-inl.h"
11
12 using node::contextify::ContextifyContext;
13 using node::errors::TryCatchScope;
14 using v8::Array;
15 using v8::ArrayBuffer;
16 using v8::ArrayBufferCreationMode;
17 using v8::Context;
18 using v8::EscapableHandleScope;
19 using v8::Function;
20 using v8::FunctionCallbackInfo;
21 using v8::FunctionTemplate;
22 using v8::Global;
23 using v8::HandleScope;
24 using v8::Isolate;
25 using v8::Just;
26 using v8::Local;
27 using v8::Maybe;
28 using v8::MaybeLocal;
29 using v8::Nothing;
30 using v8::Object;
31 using v8::SharedArrayBuffer;
32 using v8::String;
33 using v8::Symbol;
34 using v8::Value;
35 using v8::ValueDeserializer;
36 using v8::ValueSerializer;
37 using v8::WasmModuleObject;
38
39 namespace node {
40
41 using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
42
GetTransferMode() const43 BaseObject::TransferMode BaseObject::GetTransferMode() const {
44 return BaseObject::TransferMode::kUntransferable;
45 }
46
TransferForMessaging()47 std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
48 return CloneForMessaging();
49 }
50
CloneForMessaging() const51 std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
52 return {};
53 }
54
NestedTransferables() const55 Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
56 return Just(BaseObjectList {});
57 }
58
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)59 Maybe<bool> BaseObject::FinalizeTransferRead(
60 Local<Context> context, ValueDeserializer* deserializer) {
61 return Just(true);
62 }
63
64 namespace worker {
65
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)66 Maybe<bool> TransferData::FinalizeTransferWrite(
67 Local<Context> context, ValueSerializer* serializer) {
68 return Just(true);
69 }
70
Message(MallocedBuffer<char> && buffer)71 Message::Message(MallocedBuffer<char>&& buffer)
72 : main_message_buf_(std::move(buffer)) {}
73
IsCloseMessage() const74 bool Message::IsCloseMessage() const {
75 return main_message_buf_.data == nullptr;
76 }
77
78 namespace {
79
80 // This is used to tell V8 how to read transferred host objects, like other
81 // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
82 class DeserializerDelegate : public ValueDeserializer::Delegate {
83 public:
DeserializerDelegate(Message * m,Environment * env,const std::vector<BaseObjectPtr<BaseObject>> & host_objects,const std::vector<Local<SharedArrayBuffer>> & shared_array_buffers,const std::vector<WasmModuleObject::TransferrableModule> & wasm_modules)84 DeserializerDelegate(
85 Message* m,
86 Environment* env,
87 const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
88 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
89 const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
90 : host_objects_(host_objects),
91 shared_array_buffers_(shared_array_buffers),
92 wasm_modules_(wasm_modules) {}
93
ReadHostObject(Isolate * isolate)94 MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
95 // Identifying the index in the message's BaseObject array is sufficient.
96 uint32_t id;
97 if (!deserializer->ReadUint32(&id))
98 return MaybeLocal<Object>();
99 CHECK_LE(id, host_objects_.size());
100 return host_objects_[id]->object(isolate);
101 }
102
GetSharedArrayBufferFromId(Isolate * isolate,uint32_t clone_id)103 MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
104 Isolate* isolate, uint32_t clone_id) override {
105 CHECK_LE(clone_id, shared_array_buffers_.size());
106 return shared_array_buffers_[clone_id];
107 }
108
GetWasmModuleFromId(Isolate * isolate,uint32_t transfer_id)109 MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
110 Isolate* isolate, uint32_t transfer_id) override {
111 CHECK_LE(transfer_id, wasm_modules_.size());
112 return WasmModuleObject::FromTransferrableModule(
113 isolate, wasm_modules_[transfer_id]);
114 }
115
116 ValueDeserializer* deserializer = nullptr;
117
118 private:
119 const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
120 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
121 const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
122 };
123
124 } // anonymous namespace
125
Deserialize(Environment * env,Local<Context> context)126 MaybeLocal<Value> Message::Deserialize(Environment* env,
127 Local<Context> context) {
128 CHECK(!IsCloseMessage());
129
130 EscapableHandleScope handle_scope(env->isolate());
131 Context::Scope context_scope(context);
132
133 // Create all necessary objects for transferables, e.g. MessagePort handles.
134 std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
135 auto cleanup = OnScopeLeave([&]() {
136 for (BaseObjectPtr<BaseObject> object : host_objects) {
137 if (!object) continue;
138
139 // If the function did not finish successfully, host_objects will contain
140 // a list of objects that will never be passed to JS. Therefore, we
141 // destroy them here.
142 object->Detach();
143 }
144 });
145
146 for (uint32_t i = 0; i < transferables_.size(); ++i) {
147 TransferData* data = transferables_[i].get();
148 host_objects[i] = data->Deserialize(
149 env, context, std::move(transferables_[i]));
150 if (!host_objects[i]) return {};
151 }
152 transferables_.clear();
153
154 std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
155 // Attach all transferred SharedArrayBuffers to their new Isolate.
156 for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
157 Local<SharedArrayBuffer> sab;
158 if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
159 .ToLocal(&sab))
160 return MaybeLocal<Value>();
161 shared_array_buffers.push_back(sab);
162 }
163 shared_array_buffers_.clear();
164
165 DeserializerDelegate delegate(
166 this, env, host_objects, shared_array_buffers, wasm_modules_);
167 ValueDeserializer deserializer(
168 env->isolate(),
169 reinterpret_cast<const uint8_t*>(main_message_buf_.data),
170 main_message_buf_.size,
171 &delegate);
172 delegate.deserializer = &deserializer;
173
174 // Attach all transferred ArrayBuffers to their new Isolate.
175 for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
176 if (!env->isolate_data()->uses_node_allocator()) {
177 // We don't use Node's allocator on the receiving side, so we have
178 // to create the ArrayBuffer from a copy of the memory.
179 AllocatedBuffer buf =
180 env->AllocateManaged(array_buffer_contents_[i].size);
181 memcpy(buf.data(),
182 array_buffer_contents_[i].data,
183 array_buffer_contents_[i].size);
184 deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
185 continue;
186 }
187
188 env->isolate_data()->node_allocator()->RegisterPointer(
189 array_buffer_contents_[i].data, array_buffer_contents_[i].size);
190
191 Local<ArrayBuffer> ab =
192 ArrayBuffer::New(env->isolate(),
193 array_buffer_contents_[i].release(),
194 array_buffer_contents_[i].size,
195 ArrayBufferCreationMode::kInternalized);
196 deserializer.TransferArrayBuffer(i, ab);
197 }
198 array_buffer_contents_.clear();
199
200 if (deserializer.ReadHeader(context).IsNothing())
201 return {};
202 Local<Value> return_value;
203 if (!deserializer.ReadValue(context).ToLocal(&return_value))
204 return {};
205
206 for (BaseObjectPtr<BaseObject> base_object : host_objects) {
207 if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
208 return {};
209 }
210
211 host_objects.clear();
212 return handle_scope.Escape(return_value);
213 }
214
AddSharedArrayBuffer(const SharedArrayBufferMetadataReference & reference)215 void Message::AddSharedArrayBuffer(
216 const SharedArrayBufferMetadataReference& reference) {
217 shared_array_buffers_.push_back(reference);
218 }
219
AddTransferable(std::unique_ptr<TransferData> && data)220 void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
221 transferables_.emplace_back(std::move(data));
222 }
223
AddWASMModule(WasmModuleObject::TransferrableModule && mod)224 uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
225 wasm_modules_.emplace_back(std::move(mod));
226 return wasm_modules_.size() - 1;
227 }
228
229 namespace {
230
GetEmitMessageFunction(Local<Context> context)231 MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
232 Isolate* isolate = context->GetIsolate();
233 Local<Object> per_context_bindings;
234 Local<Value> emit_message_val;
235 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
236 !per_context_bindings->Get(context,
237 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
238 .ToLocal(&emit_message_val)) {
239 return MaybeLocal<Function>();
240 }
241 CHECK(emit_message_val->IsFunction());
242 return emit_message_val.As<Function>();
243 }
244
GetDOMException(Local<Context> context)245 MaybeLocal<Function> GetDOMException(Local<Context> context) {
246 Isolate* isolate = context->GetIsolate();
247 Local<Object> per_context_bindings;
248 Local<Value> domexception_ctor_val;
249 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
250 !per_context_bindings->Get(context,
251 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
252 .ToLocal(&domexception_ctor_val)) {
253 return MaybeLocal<Function>();
254 }
255 CHECK(domexception_ctor_val->IsFunction());
256 Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
257 return domexception_ctor;
258 }
259
ThrowDataCloneException(Local<Context> context,Local<String> message)260 void ThrowDataCloneException(Local<Context> context, Local<String> message) {
261 Isolate* isolate = context->GetIsolate();
262 Local<Value> argv[] = {message,
263 FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
264 Local<Value> exception;
265 Local<Function> domexception_ctor;
266 if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
267 !domexception_ctor->NewInstance(context, arraysize(argv), argv)
268 .ToLocal(&exception)) {
269 return;
270 }
271 isolate->ThrowException(exception);
272 }
273
274 // This tells V8 how to serialize objects that it does not understand
275 // (e.g. C++ objects) into the output buffer, in a way that our own
276 // DeserializerDelegate understands how to unpack.
277 class SerializerDelegate : public ValueSerializer::Delegate {
278 public:
SerializerDelegate(Environment * env,Local<Context> context,Message * m)279 SerializerDelegate(Environment* env, Local<Context> context, Message* m)
280 : env_(env), context_(context), msg_(m) {}
281
ThrowDataCloneError(Local<String> message)282 void ThrowDataCloneError(Local<String> message) override {
283 ThrowDataCloneException(context_, message);
284 }
285
WriteHostObject(Isolate * isolate,Local<Object> object)286 Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
287 if (env_->base_object_ctor_template()->HasInstance(object)) {
288 return WriteHostObject(
289 BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
290 }
291
292 ThrowDataCloneError(env_->clone_unsupported_type_str());
293 return Nothing<bool>();
294 }
295
GetSharedArrayBufferId(Isolate * isolate,Local<SharedArrayBuffer> shared_array_buffer)296 Maybe<uint32_t> GetSharedArrayBufferId(
297 Isolate* isolate,
298 Local<SharedArrayBuffer> shared_array_buffer) override {
299 uint32_t i;
300 for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
301 if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
302 shared_array_buffer) {
303 return Just(i);
304 }
305 }
306
307 auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
308 env_,
309 context_,
310 shared_array_buffer);
311 if (!reference) {
312 return Nothing<uint32_t>();
313 }
314 seen_shared_array_buffers_.emplace_back(
315 Global<SharedArrayBuffer> { isolate, shared_array_buffer });
316 msg_->AddSharedArrayBuffer(reference);
317 return Just(i);
318 }
319
GetWasmModuleTransferId(Isolate * isolate,Local<WasmModuleObject> module)320 Maybe<uint32_t> GetWasmModuleTransferId(
321 Isolate* isolate, Local<WasmModuleObject> module) override {
322 return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
323 }
324
Finish(Local<Context> context)325 Maybe<bool> Finish(Local<Context> context) {
326 for (uint32_t i = 0; i < host_objects_.size(); i++) {
327 BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
328 std::unique_ptr<TransferData> data;
329 if (i < first_cloned_object_index_)
330 data = host_object->TransferForMessaging();
331 if (!data)
332 data = host_object->CloneForMessaging();
333 if (!data) return Nothing<bool>();
334 if (data->FinalizeTransferWrite(context, serializer).IsNothing())
335 return Nothing<bool>();
336 msg_->AddTransferable(std::move(data));
337 }
338 return Just(true);
339 }
340
AddHostObject(BaseObjectPtr<BaseObject> host_object)341 inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
342 // Make sure we have not started serializing the value itself yet.
343 CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
344 host_objects_.emplace_back(std::move(host_object));
345 }
346
347 // Some objects in the transfer list may register sub-objects that can be
348 // transferred. This could e.g. be a public JS wrapper object, such as a
349 // FileHandle, that is registering its C++ handle for transfer.
AddNestedHostObjects()350 inline Maybe<bool> AddNestedHostObjects() {
351 for (size_t i = 0; i < host_objects_.size(); i++) {
352 std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
353 if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
354 return Nothing<bool>();
355 for (auto nested_transferable : nested_transferables) {
356 if (std::find(host_objects_.begin(),
357 host_objects_.end(),
358 nested_transferable) == host_objects_.end()) {
359 AddHostObject(nested_transferable);
360 }
361 }
362 }
363 return Just(true);
364 }
365
366 ValueSerializer* serializer = nullptr;
367
368 private:
WriteHostObject(BaseObjectPtr<BaseObject> host_object)369 Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
370 BaseObject::TransferMode mode = host_object->GetTransferMode();
371 if (mode == BaseObject::TransferMode::kUntransferable) {
372 ThrowDataCloneError(env_->clone_unsupported_type_str());
373 return Nothing<bool>();
374 }
375
376 for (uint32_t i = 0; i < host_objects_.size(); i++) {
377 if (host_objects_[i] == host_object) {
378 serializer->WriteUint32(i);
379 return Just(true);
380 }
381 }
382
383 if (mode == BaseObject::TransferMode::kTransferable) {
384 // TODO(addaleax): This message code is too specific. Fix that in a
385 // semver-major follow-up.
386 THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
387 return Nothing<bool>();
388 }
389
390 CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
391 uint32_t index = host_objects_.size();
392 if (first_cloned_object_index_ == SIZE_MAX)
393 first_cloned_object_index_ = index;
394 serializer->WriteUint32(index);
395 host_objects_.push_back(host_object);
396 return Just(true);
397 }
398
399 Environment* env_;
400 Local<Context> context_;
401 Message* msg_;
402 std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
403 std::vector<BaseObjectPtr<BaseObject>> host_objects_;
404 size_t first_cloned_object_index_ = SIZE_MAX;
405
406 friend class worker::Message;
407 };
408
409 } // anonymous namespace
410
Serialize(Environment * env,Local<Context> context,Local<Value> input,const TransferList & transfer_list_v,Local<Object> source_port)411 Maybe<bool> Message::Serialize(Environment* env,
412 Local<Context> context,
413 Local<Value> input,
414 const TransferList& transfer_list_v,
415 Local<Object> source_port) {
416 HandleScope handle_scope(env->isolate());
417 Context::Scope context_scope(context);
418
419 // Verify that we're not silently overwriting an existing message.
420 CHECK(main_message_buf_.is_empty());
421
422 SerializerDelegate delegate(env, context, this);
423 ValueSerializer serializer(env->isolate(), &delegate);
424 delegate.serializer = &serializer;
425
426 std::vector<Local<ArrayBuffer>> array_buffers;
427 for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
428 Local<Value> entry = transfer_list_v[i];
429 if (entry->IsObject()) {
430 // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
431 // for details.
432 bool untransferable;
433 if (!entry.As<Object>()->HasPrivate(
434 context,
435 env->untransferable_object_private_symbol())
436 .To(&untransferable)) {
437 return Nothing<bool>();
438 }
439 if (untransferable) continue;
440 }
441
442 // Currently, we support ArrayBuffers and BaseObjects for which
443 // GetTransferMode() does not return kUntransferable.
444 if (entry->IsArrayBuffer()) {
445 Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
446 // If we cannot render the ArrayBuffer unusable in this Isolate and
447 // take ownership of its memory, copying the buffer will have to do.
448 if (!ab->IsDetachable() || ab->IsExternal() ||
449 !env->isolate_data()->uses_node_allocator()) {
450 continue;
451 }
452
453 if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
454 array_buffers.end()) {
455 ThrowDataCloneException(
456 context,
457 FIXED_ONE_BYTE_STRING(
458 env->isolate(),
459 "Transfer list contains duplicate ArrayBuffer"));
460 return Nothing<bool>();
461 }
462 // We simply use the array index in the `array_buffers` list as the
463 // ID that we write into the serialized buffer.
464 uint32_t id = array_buffers.size();
465 array_buffers.push_back(ab);
466 serializer.TransferArrayBuffer(id, ab);
467 continue;
468 } else if (env->base_object_ctor_template()->HasInstance(entry)) {
469 // Check if the source MessagePort is being transferred.
470 if (!source_port.IsEmpty() && entry == source_port) {
471 ThrowDataCloneException(
472 context,
473 FIXED_ONE_BYTE_STRING(env->isolate(),
474 "Transfer list contains source port"));
475 return Nothing<bool>();
476 }
477 BaseObjectPtr<BaseObject> host_object {
478 Unwrap<BaseObject>(entry.As<Object>()) };
479 if (env->message_port_constructor_template()->HasInstance(entry) &&
480 (!host_object ||
481 static_cast<MessagePort*>(host_object.get())->IsDetached())) {
482 ThrowDataCloneException(
483 context,
484 FIXED_ONE_BYTE_STRING(
485 env->isolate(),
486 "MessagePort in transfer list is already detached"));
487 return Nothing<bool>();
488 }
489 if (std::find(delegate.host_objects_.begin(),
490 delegate.host_objects_.end(),
491 host_object) != delegate.host_objects_.end()) {
492 ThrowDataCloneException(
493 context,
494 String::Concat(env->isolate(),
495 FIXED_ONE_BYTE_STRING(
496 env->isolate(),
497 "Transfer list contains duplicate "),
498 entry.As<Object>()->GetConstructorName()));
499 return Nothing<bool>();
500 }
501 if (host_object && host_object->GetTransferMode() !=
502 BaseObject::TransferMode::kUntransferable) {
503 delegate.AddHostObject(host_object);
504 continue;
505 }
506 }
507
508 THROW_ERR_INVALID_TRANSFER_OBJECT(env);
509 return Nothing<bool>();
510 }
511 if (delegate.AddNestedHostObjects().IsNothing())
512 return Nothing<bool>();
513
514 serializer.WriteHeader();
515 if (serializer.WriteValue(context, input).IsNothing()) {
516 return Nothing<bool>();
517 }
518
519 for (Local<ArrayBuffer> ab : array_buffers) {
520 // If serialization succeeded, we want to take ownership of
521 // (a.k.a. externalize) the underlying memory region and render
522 // it inaccessible in this Isolate.
523 ArrayBuffer::Contents contents = ab->Externalize();
524 ab->Detach();
525
526 CHECK(env->isolate_data()->uses_node_allocator());
527 env->isolate_data()->node_allocator()->UnregisterPointer(
528 contents.Data(), contents.ByteLength());
529
530 array_buffer_contents_.emplace_back(MallocedBuffer<char>{
531 static_cast<char*>(contents.Data()), contents.ByteLength()});
532 }
533
534 if (delegate.Finish(context).IsNothing())
535 return Nothing<bool>();
536
537 // The serializer gave us a buffer allocated using `malloc()`.
538 std::pair<uint8_t*, size_t> data = serializer.Release();
539 CHECK_NOT_NULL(data.first);
540 main_message_buf_ =
541 MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
542 return Just(true);
543 }
544
MemoryInfo(MemoryTracker * tracker) const545 void Message::MemoryInfo(MemoryTracker* tracker) const {
546 tracker->TrackField("array_buffer_contents", array_buffer_contents_);
547 tracker->TrackFieldWithSize("shared_array_buffers",
548 shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
549 tracker->TrackField("transferables", transferables_);
550 }
551
MessagePortData(MessagePort * owner)552 MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
553
~MessagePortData()554 MessagePortData::~MessagePortData() {
555 CHECK_NULL(owner_);
556 Disentangle();
557 }
558
MemoryInfo(MemoryTracker * tracker) const559 void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
560 Mutex::ScopedLock lock(mutex_);
561 tracker->TrackField("incoming_messages", incoming_messages_);
562 }
563
AddToIncomingQueue(Message && message)564 void MessagePortData::AddToIncomingQueue(Message&& message) {
565 // This function will be called by other threads.
566 Mutex::ScopedLock lock(mutex_);
567 incoming_messages_.emplace_back(std::move(message));
568
569 if (owner_ != nullptr) {
570 Debug(owner_, "Adding message to incoming queue");
571 owner_->TriggerAsync();
572 }
573 }
574
Entangle(MessagePortData * a,MessagePortData * b)575 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
576 CHECK_NULL(a->sibling_);
577 CHECK_NULL(b->sibling_);
578 a->sibling_ = b;
579 b->sibling_ = a;
580 a->sibling_mutex_ = b->sibling_mutex_;
581 }
582
Disentangle()583 void MessagePortData::Disentangle() {
584 // Grab a copy of the sibling mutex, then replace it so that each sibling
585 // has its own sibling_mutex_ now.
586 std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
587 Mutex::ScopedLock sibling_lock(*sibling_mutex);
588 sibling_mutex_ = std::make_shared<Mutex>();
589
590 MessagePortData* sibling = sibling_;
591 if (sibling_ != nullptr) {
592 sibling_->sibling_ = nullptr;
593 sibling_ = nullptr;
594 }
595
596 // We close MessagePorts after disentanglement, so we enqueue a corresponding
597 // message and trigger the corresponding uv_async_t to let them know that
598 // this happened.
599 AddToIncomingQueue(Message());
600 if (sibling != nullptr) {
601 sibling->AddToIncomingQueue(Message());
602 }
603 }
604
~MessagePort()605 MessagePort::~MessagePort() {
606 if (data_) Detach();
607 }
608
MessagePort(Environment * env,Local<Context> context,Local<Object> wrap)609 MessagePort::MessagePort(Environment* env,
610 Local<Context> context,
611 Local<Object> wrap)
612 : HandleWrap(env,
613 wrap,
614 reinterpret_cast<uv_handle_t*>(&async_),
615 AsyncWrap::PROVIDER_MESSAGEPORT),
616 data_(new MessagePortData(this)) {
617 auto onmessage = [](uv_async_t* handle) {
618 // Called when data has been put into the queue.
619 MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
620 channel->OnMessage();
621 };
622
623 CHECK_EQ(uv_async_init(env->event_loop(),
624 &async_,
625 onmessage), 0);
626 // Reset later to indicate success of the constructor.
627 bool succeeded = false;
628 auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
629
630 Local<Value> fn;
631 if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
632 return;
633
634 if (fn->IsFunction()) {
635 Local<Function> init = fn.As<Function>();
636 if (init->Call(context, wrap, 0, nullptr).IsEmpty())
637 return;
638 }
639
640 Local<Function> emit_message_fn;
641 if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
642 return;
643 emit_message_fn_.Reset(env->isolate(), emit_message_fn);
644
645 succeeded = true;
646 Debug(this, "Created message port");
647 }
648
IsDetached() const649 bool MessagePort::IsDetached() const {
650 return data_ == nullptr || IsHandleClosing();
651 }
652
TriggerAsync()653 void MessagePort::TriggerAsync() {
654 if (IsHandleClosing()) return;
655 CHECK_EQ(uv_async_send(&async_), 0);
656 }
657
Close(v8::Local<v8::Value> close_callback)658 void MessagePort::Close(v8::Local<v8::Value> close_callback) {
659 Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
660
661 if (data_) {
662 // Wrap this call with accessing the mutex, so that TriggerAsync()
663 // can check IsHandleClosing() without race conditions.
664 Mutex::ScopedLock sibling_lock(data_->mutex_);
665 HandleWrap::Close(close_callback);
666 } else {
667 HandleWrap::Close(close_callback);
668 }
669 }
670
New(const FunctionCallbackInfo<Value> & args)671 void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
672 // This constructor just throws an error. Unfortunately, we can’t use V8’s
673 // ConstructorBehavior::kThrow, as that also removes the prototype from the
674 // class (i.e. makes it behave like an arrow function).
675 Environment* env = Environment::GetCurrent(args);
676 THROW_ERR_CONSTRUCT_CALL_INVALID(env);
677 }
678
New(Environment * env,Local<Context> context,std::unique_ptr<MessagePortData> data)679 MessagePort* MessagePort::New(
680 Environment* env,
681 Local<Context> context,
682 std::unique_ptr<MessagePortData> data) {
683 Context::Scope context_scope(context);
684 Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
685
686 // Construct a new instance, then assign the listener instance and possibly
687 // the MessagePortData to it.
688 Local<Object> instance;
689 if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
690 return nullptr;
691 MessagePort* port = new MessagePort(env, context, instance);
692 CHECK_NOT_NULL(port);
693 if (port->IsHandleClosing()) {
694 // Construction failed with an exception.
695 return nullptr;
696 }
697
698 if (data) {
699 port->Detach();
700 port->data_ = std::move(data);
701
702 // This lock is here to avoid race conditions with the `owner_` read
703 // in AddToIncomingQueue(). (This would likely be unproblematic without it,
704 // but it's better to be safe than sorry.)
705 Mutex::ScopedLock lock(port->data_->mutex_);
706 port->data_->owner_ = port;
707 // If the existing MessagePortData object had pending messages, this is
708 // the easiest way to run that queue.
709 port->TriggerAsync();
710 }
711 return port;
712 }
713
ReceiveMessage(Local<Context> context,bool only_if_receiving)714 MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
715 bool only_if_receiving) {
716 Message received;
717 {
718 // Get the head of the message queue.
719 Mutex::ScopedLock lock(data_->mutex_);
720
721 Debug(this, "MessagePort has message");
722
723 bool wants_message = receiving_messages_ || !only_if_receiving;
724 // We have nothing to do if:
725 // - There are no pending messages
726 // - We are not intending to receive messages, and the message we would
727 // receive is not the final "close" message.
728 if (data_->incoming_messages_.empty() ||
729 (!wants_message &&
730 !data_->incoming_messages_.front().IsCloseMessage())) {
731 return env()->no_message_symbol();
732 }
733
734 received = std::move(data_->incoming_messages_.front());
735 data_->incoming_messages_.pop_front();
736 }
737
738 if (received.IsCloseMessage()) {
739 Close();
740 return env()->no_message_symbol();
741 }
742
743 if (!env()->can_call_into_js()) return MaybeLocal<Value>();
744
745 return received.Deserialize(env(), context);
746 }
747
OnMessage()748 void MessagePort::OnMessage() {
749 Debug(this, "Running MessagePort::OnMessage()");
750 HandleScope handle_scope(env()->isolate());
751 Local<Context> context = object(env()->isolate())->CreationContext();
752
753 size_t processing_limit;
754 {
755 Mutex::ScopedLock(data_->mutex_);
756 processing_limit = std::max(data_->incoming_messages_.size(),
757 static_cast<size_t>(1000));
758 }
759
760 // data_ can only ever be modified by the owner thread, so no need to lock.
761 // However, the message port may be transferred while it is processing
762 // messages, so we need to check that this handle still owns its `data_` field
763 // on every iteration.
764 while (data_) {
765 if (processing_limit-- == 0) {
766 // Prevent event loop starvation by only processing those messages without
767 // interruption that were already present when the OnMessage() call was
768 // first triggered, but at least 1000 messages because otherwise the
769 // overhead of repeatedly triggering the uv_async_t instance becomes
770 // noticable, at least on Windows.
771 // (That might require more investigation by somebody more familiar with
772 // Windows.)
773 TriggerAsync();
774 return;
775 }
776
777 HandleScope handle_scope(env()->isolate());
778 Context::Scope context_scope(context);
779 Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
780
781 Local<Value> payload;
782 Local<Value> message_error;
783 {
784 // Catch any exceptions from parsing the message itself (not from
785 // emitting it) as 'messageeror' events.
786 TryCatchScope try_catch(env());
787 if (!ReceiveMessage(context, true).ToLocal(&payload)) {
788 if (try_catch.HasCaught() && !try_catch.HasTerminated())
789 message_error = try_catch.Exception();
790 goto reschedule;
791 }
792 }
793 if (payload == env()->no_message_symbol()) break;
794
795 if (!env()->can_call_into_js()) {
796 Debug(this, "MessagePort drains queue because !can_call_into_js()");
797 // In this case there is nothing to do but to drain the current queue.
798 continue;
799 }
800
801 if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
802 reschedule:
803 if (!message_error.IsEmpty()) {
804 // This should become a `messageerror` event in the sense of the
805 // EventTarget API at some point.
806 Local<Value> argv[] = {
807 env()->messageerror_string(),
808 message_error
809 };
810 USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
811 }
812
813 // Re-schedule OnMessage() execution in case of failure.
814 if (data_)
815 TriggerAsync();
816 return;
817 }
818 }
819 }
820
OnClose()821 void MessagePort::OnClose() {
822 Debug(this, "MessagePort::OnClose()");
823 if (data_) {
824 // Detach() returns move(data_).
825 Detach()->Disentangle();
826 }
827 }
828
Detach()829 std::unique_ptr<MessagePortData> MessagePort::Detach() {
830 CHECK(data_);
831 Mutex::ScopedLock lock(data_->mutex_);
832 data_->owner_ = nullptr;
833 return std::move(data_);
834 }
835
GetTransferMode() const836 BaseObject::TransferMode MessagePort::GetTransferMode() const {
837 if (IsDetached())
838 return BaseObject::TransferMode::kUntransferable;
839 return BaseObject::TransferMode::kTransferable;
840 }
841
TransferForMessaging()842 std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
843 Close();
844 return Detach();
845 }
846
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)847 BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
848 Environment* env,
849 Local<Context> context,
850 std::unique_ptr<TransferData> self) {
851 return BaseObjectPtr<MessagePort> { MessagePort::New(
852 env, context,
853 static_unique_pointer_cast<MessagePortData>(std::move(self))) };
854 }
855
PostMessage(Environment * env,Local<Value> message_v,const TransferList & transfer_v)856 Maybe<bool> MessagePort::PostMessage(Environment* env,
857 Local<Value> message_v,
858 const TransferList& transfer_v) {
859 Isolate* isolate = env->isolate();
860 Local<Object> obj = object(isolate);
861 Local<Context> context = obj->CreationContext();
862
863 Message msg;
864
865 // Per spec, we need to both check if transfer list has the source port, and
866 // serialize the input message, even if the MessagePort is closed or detached.
867
868 Maybe<bool> serialization_maybe =
869 msg.Serialize(env, context, message_v, transfer_v, obj);
870 if (data_ == nullptr) {
871 return serialization_maybe;
872 }
873 if (serialization_maybe.IsNothing()) {
874 return Nothing<bool>();
875 }
876
877 Mutex::ScopedLock lock(*data_->sibling_mutex_);
878 bool doomed = false;
879
880 // Check if the target port is posted to itself.
881 if (data_->sibling_ != nullptr) {
882 for (const auto& transferable : msg.transferables()) {
883 if (data_->sibling_ == transferable.get()) {
884 doomed = true;
885 ProcessEmitWarning(env, "The target port was posted to itself, and "
886 "the communication channel was lost");
887 break;
888 }
889 }
890 }
891
892 if (data_->sibling_ == nullptr || doomed)
893 return Just(true);
894
895 data_->sibling_->AddToIncomingQueue(std::move(msg));
896 return Just(true);
897 }
898
ReadIterable(Environment * env,Local<Context> context,TransferList & transfer_list,Local<Value> object)899 static Maybe<bool> ReadIterable(Environment* env,
900 Local<Context> context,
901 // NOLINTNEXTLINE(runtime/references)
902 TransferList& transfer_list,
903 Local<Value> object) {
904 if (!object->IsObject()) return Just(false);
905
906 if (object->IsArray()) {
907 Local<Array> arr = object.As<Array>();
908 size_t length = arr->Length();
909 transfer_list.AllocateSufficientStorage(length);
910 for (size_t i = 0; i < length; i++) {
911 if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
912 return Nothing<bool>();
913 }
914 return Just(true);
915 }
916
917 Isolate* isolate = env->isolate();
918 Local<Value> iterator_method;
919 if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
920 .ToLocal(&iterator_method)) return Nothing<bool>();
921 if (!iterator_method->IsFunction()) return Just(false);
922
923 Local<Value> iterator;
924 if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
925 .ToLocal(&iterator)) return Nothing<bool>();
926 if (!iterator->IsObject()) return Just(false);
927
928 Local<Value> next;
929 if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
930 return Nothing<bool>();
931 if (!next->IsFunction()) return Just(false);
932
933 std::vector<Local<Value>> entries;
934 while (env->can_call_into_js()) {
935 Local<Value> result;
936 if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
937 .ToLocal(&result)) return Nothing<bool>();
938 if (!result->IsObject()) return Just(false);
939
940 Local<Value> done;
941 if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
942 return Nothing<bool>();
943 if (done->BooleanValue(isolate)) break;
944
945 Local<Value> val;
946 if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
947 return Nothing<bool>();
948 entries.push_back(val);
949 }
950
951 transfer_list.AllocateSufficientStorage(entries.size());
952 std::copy(entries.begin(), entries.end(), &transfer_list[0]);
953 return Just(true);
954 }
955
PostMessage(const FunctionCallbackInfo<Value> & args)956 void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
957 Environment* env = Environment::GetCurrent(args);
958 Local<Object> obj = args.This();
959 Local<Context> context = obj->CreationContext();
960
961 if (args.Length() == 0) {
962 return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
963 "MessagePort.postMessage");
964 }
965
966 if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
967 // Browsers ignore null or undefined, and otherwise accept an array or an
968 // options object.
969 return THROW_ERR_INVALID_ARG_TYPE(env,
970 "Optional transferList argument must be an iterable");
971 }
972
973 TransferList transfer_list;
974 if (args[1]->IsObject()) {
975 bool was_iterable;
976 if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
977 return;
978 if (!was_iterable) {
979 Local<Value> transfer_option;
980 if (!args[1].As<Object>()->Get(context, env->transfer_string())
981 .ToLocal(&transfer_option)) return;
982 if (!transfer_option->IsUndefined()) {
983 if (!ReadIterable(env, context, transfer_list, transfer_option)
984 .To(&was_iterable)) return;
985 if (!was_iterable) {
986 return THROW_ERR_INVALID_ARG_TYPE(env,
987 "Optional options.transfer argument must be an iterable");
988 }
989 }
990 }
991 }
992
993 MessagePort* port = Unwrap<MessagePort>(args.This());
994 // Even if the backing MessagePort object has already been deleted, we still
995 // want to serialize the message to ensure spec-compliant behavior w.r.t.
996 // transfers.
997 if (port == nullptr) {
998 Message msg;
999 USE(msg.Serialize(env, context, args[0], transfer_list, obj));
1000 return;
1001 }
1002
1003 port->PostMessage(env, args[0], transfer_list);
1004 }
1005
Start()1006 void MessagePort::Start() {
1007 Debug(this, "Start receiving messages");
1008 receiving_messages_ = true;
1009 Mutex::ScopedLock lock(data_->mutex_);
1010 if (!data_->incoming_messages_.empty())
1011 TriggerAsync();
1012 }
1013
Stop()1014 void MessagePort::Stop() {
1015 Debug(this, "Stop receiving messages");
1016 receiving_messages_ = false;
1017 }
1018
Start(const FunctionCallbackInfo<Value> & args)1019 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1020 MessagePort* port;
1021 ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1022 if (!port->data_) {
1023 return;
1024 }
1025 port->Start();
1026 }
1027
Stop(const FunctionCallbackInfo<Value> & args)1028 void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1029 MessagePort* port;
1030 CHECK(args[0]->IsObject());
1031 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1032 if (!port->data_) {
1033 return;
1034 }
1035 port->Stop();
1036 }
1037
Drain(const FunctionCallbackInfo<Value> & args)1038 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1039 MessagePort* port;
1040 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1041 port->OnMessage();
1042 }
1043
ReceiveMessage(const FunctionCallbackInfo<Value> & args)1044 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1045 Environment* env = Environment::GetCurrent(args);
1046 if (!args[0]->IsObject() ||
1047 !env->message_port_constructor_template()->HasInstance(args[0])) {
1048 return THROW_ERR_INVALID_ARG_TYPE(env,
1049 "First argument needs to be a MessagePort instance");
1050 }
1051 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1052 if (port == nullptr) {
1053 // Return 'no messages' for a closed port.
1054 args.GetReturnValue().Set(
1055 Environment::GetCurrent(args)->no_message_symbol());
1056 return;
1057 }
1058
1059 MaybeLocal<Value> payload =
1060 port->ReceiveMessage(port->object()->CreationContext(), false);
1061 if (!payload.IsEmpty())
1062 args.GetReturnValue().Set(payload.ToLocalChecked());
1063 }
1064
MoveToContext(const FunctionCallbackInfo<Value> & args)1065 void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1066 Environment* env = Environment::GetCurrent(args);
1067 if (!args[0]->IsObject() ||
1068 !env->message_port_constructor_template()->HasInstance(args[0])) {
1069 return THROW_ERR_INVALID_ARG_TYPE(env,
1070 "First argument needs to be a MessagePort instance");
1071 }
1072 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1073 CHECK_NOT_NULL(port);
1074
1075 Local<Value> context_arg = args[1];
1076 ContextifyContext* context_wrapper;
1077 if (!context_arg->IsObject() ||
1078 (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1079 env, context_arg.As<Object>())) == nullptr) {
1080 return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1081 }
1082
1083 std::unique_ptr<MessagePortData> data;
1084 if (!port->IsDetached())
1085 data = port->Detach();
1086
1087 Context::Scope context_scope(context_wrapper->context());
1088 MessagePort* target =
1089 MessagePort::New(env, context_wrapper->context(), std::move(data));
1090 if (target != nullptr)
1091 args.GetReturnValue().Set(target->object());
1092 }
1093
Entangle(MessagePort * a,MessagePort * b)1094 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1095 Entangle(a, b->data_.get());
1096 }
1097
Entangle(MessagePort * a,MessagePortData * b)1098 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1099 MessagePortData::Entangle(a->data_.get(), b);
1100 }
1101
MemoryInfo(MemoryTracker * tracker) const1102 void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1103 tracker->TrackField("data", data_);
1104 tracker->TrackField("emit_message_fn", emit_message_fn_);
1105 }
1106
GetMessagePortConstructorTemplate(Environment * env)1107 Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1108 // Factor generating the MessagePort JS constructor into its own piece
1109 // of code, because it is needed early on in the child environment setup.
1110 Local<FunctionTemplate> templ = env->message_port_constructor_template();
1111 if (!templ.IsEmpty())
1112 return templ;
1113
1114 {
1115 Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1116 m->SetClassName(env->message_port_constructor_string());
1117 m->InstanceTemplate()->SetInternalFieldCount(
1118 MessagePort::kInternalFieldCount);
1119 m->Inherit(HandleWrap::GetConstructorTemplate(env));
1120
1121 env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1122 env->SetProtoMethod(m, "start", MessagePort::Start);
1123
1124 env->set_message_port_constructor_template(m);
1125 }
1126
1127 return GetMessagePortConstructorTemplate(env);
1128 }
1129
JSTransferable(Environment * env,Local<Object> obj)1130 JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1131 : BaseObject(env, obj) {
1132 MakeWeak();
1133 }
1134
New(const FunctionCallbackInfo<Value> & args)1135 void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1136 CHECK(args.IsConstructCall());
1137 new JSTransferable(Environment::GetCurrent(args), args.This());
1138 }
1139
GetTransferMode() const1140 JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1141 // Implement `kClone in this ? kCloneable : kTransferable`.
1142 HandleScope handle_scope(env()->isolate());
1143 errors::TryCatchScope ignore_exceptions(env());
1144
1145 bool has_clone;
1146 if (!object()->Has(env()->context(),
1147 env()->messaging_clone_symbol()).To(&has_clone)) {
1148 return TransferMode::kUntransferable;
1149 }
1150
1151 return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1152 }
1153
TransferForMessaging()1154 std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1155 return TransferOrClone(TransferMode::kTransferable);
1156 }
1157
CloneForMessaging() const1158 std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1159 return TransferOrClone(TransferMode::kCloneable);
1160 }
1161
TransferOrClone(TransferMode mode) const1162 std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1163 TransferMode mode) const {
1164 // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1165 // which should return an object with `data` and `deserializeInfo` properties;
1166 // `data` is written to the serializer later, and `deserializeInfo` is stored
1167 // on the `TransferData` instance as a string.
1168 HandleScope handle_scope(env()->isolate());
1169 Local<Context> context = env()->isolate()->GetCurrentContext();
1170 Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1171 env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1172
1173 Local<Value> method;
1174 if (!object()->Get(context, method_name).ToLocal(&method)) {
1175 return {};
1176 }
1177 if (method->IsFunction()) {
1178 Local<Value> result_v;
1179 if (!method.As<Function>()->Call(
1180 context, object(), 0, nullptr).ToLocal(&result_v)) {
1181 return {};
1182 }
1183
1184 if (result_v->IsObject()) {
1185 Local<Object> result = result_v.As<Object>();
1186 Local<Value> data;
1187 Local<Value> deserialize_info;
1188 if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1189 !result->Get(context, env()->deserialize_info_string())
1190 .ToLocal(&deserialize_info)) {
1191 return {};
1192 }
1193 Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1194 if (*deserialize_info_str == nullptr) return {};
1195 return std::make_unique<Data>(
1196 *deserialize_info_str, Global<Value>(env()->isolate(), data));
1197 }
1198 }
1199
1200 if (mode == TransferMode::kTransferable)
1201 return TransferOrClone(TransferMode::kCloneable);
1202 else
1203 return {};
1204 }
1205
1206 Maybe<BaseObjectList>
NestedTransferables() const1207 JSTransferable::NestedTransferables() const {
1208 // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1209 HandleScope handle_scope(env()->isolate());
1210 Local<Context> context = env()->isolate()->GetCurrentContext();
1211 Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1212
1213 Local<Value> method;
1214 if (!object()->Get(context, method_name).ToLocal(&method)) {
1215 return Nothing<BaseObjectList>();
1216 }
1217 if (!method->IsFunction()) return Just(BaseObjectList {});
1218
1219 Local<Value> list_v;
1220 if (!method.As<Function>()->Call(
1221 context, object(), 0, nullptr).ToLocal(&list_v)) {
1222 return Nothing<BaseObjectList>();
1223 }
1224 if (!list_v->IsArray()) return Just(BaseObjectList {});
1225 Local<Array> list = list_v.As<Array>();
1226
1227 BaseObjectList ret;
1228 for (size_t i = 0; i < list->Length(); i++) {
1229 Local<Value> value;
1230 if (!list->Get(context, i).ToLocal(&value))
1231 return Nothing<BaseObjectList>();
1232 if (env()->base_object_ctor_template()->HasInstance(value))
1233 ret.emplace_back(Unwrap<BaseObject>(value.As<Object>()));
1234 }
1235 return Just(ret);
1236 }
1237
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)1238 Maybe<bool> JSTransferable::FinalizeTransferRead(
1239 Local<Context> context, ValueDeserializer* deserializer) {
1240 // Call `this[kDeserialize](data)` where `data` comes from the return value
1241 // of `this[kTransfer]()` or `this[kClone]()`.
1242 HandleScope handle_scope(env()->isolate());
1243 Local<Value> data;
1244 if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1245
1246 Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1247 Local<Value> method;
1248 if (!object()->Get(context, method_name).ToLocal(&method)) {
1249 return Nothing<bool>();
1250 }
1251 if (!method->IsFunction()) return Just(true);
1252
1253 if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1254 return Nothing<bool>();
1255 }
1256 return Just(true);
1257 }
1258
Data(std::string && deserialize_info,v8::Global<v8::Value> && data)1259 JSTransferable::Data::Data(std::string&& deserialize_info,
1260 v8::Global<v8::Value>&& data)
1261 : deserialize_info_(std::move(deserialize_info)),
1262 data_(std::move(data)) {}
1263
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)1264 BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1265 Environment* env,
1266 Local<Context> context,
1267 std::unique_ptr<TransferData> self) {
1268 // Create the JS wrapper object that will later be filled with data passed to
1269 // the `[kDeserialize]()` method on it. This split is necessary, because here
1270 // we need to create an object with the right prototype and internal fields,
1271 // but the actual JS data stored in the serialized data can only be read at
1272 // the end of the stream, after the main message has been read.
1273
1274 if (context != env->context()) {
1275 THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1276 return {};
1277 }
1278 HandleScope handle_scope(env->isolate());
1279 Local<Value> info;
1280 if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1281
1282 Local<Value> ret;
1283 CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1284 if (!env->messaging_deserialize_create_object()->Call(
1285 context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1286 !env->base_object_ctor_template()->HasInstance(ret)) {
1287 return {};
1288 }
1289
1290 return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret.As<Object>()) };
1291 }
1292
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)1293 Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1294 Local<Context> context, ValueSerializer* serializer) {
1295 HandleScope handle_scope(context->GetIsolate());
1296 auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1297 data_.Reset();
1298 return ret;
1299 }
1300
1301 namespace {
1302
SetDeserializerCreateObjectFunction(const FunctionCallbackInfo<Value> & args)1303 static void SetDeserializerCreateObjectFunction(
1304 const FunctionCallbackInfo<Value>& args) {
1305 Environment* env = Environment::GetCurrent(args);
1306 CHECK(args[0]->IsFunction());
1307 env->set_messaging_deserialize_create_object(args[0].As<Function>());
1308 }
1309
MessageChannel(const FunctionCallbackInfo<Value> & args)1310 static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1311 Environment* env = Environment::GetCurrent(args);
1312 if (!args.IsConstructCall()) {
1313 THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1314 return;
1315 }
1316
1317 Local<Context> context = args.This()->CreationContext();
1318 Context::Scope context_scope(context);
1319
1320 MessagePort* port1 = MessagePort::New(env, context);
1321 if (port1 == nullptr) return;
1322 MessagePort* port2 = MessagePort::New(env, context);
1323 if (port2 == nullptr) {
1324 port1->Close();
1325 return;
1326 }
1327
1328 MessagePort::Entangle(port1, port2);
1329
1330 args.This()->Set(context, env->port1_string(), port1->object())
1331 .Check();
1332 args.This()->Set(context, env->port2_string(), port2->object())
1333 .Check();
1334 }
1335
InitMessaging(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)1336 static void InitMessaging(Local<Object> target,
1337 Local<Value> unused,
1338 Local<Context> context,
1339 void* priv) {
1340 Environment* env = Environment::GetCurrent(context);
1341
1342 {
1343 Local<String> message_channel_string =
1344 FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
1345 Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
1346 templ->SetClassName(message_channel_string);
1347 target->Set(context,
1348 message_channel_string,
1349 templ->GetFunction(context).ToLocalChecked()).Check();
1350 }
1351
1352 {
1353 Local<String> js_transferable_string =
1354 FIXED_ONE_BYTE_STRING(env->isolate(), "JSTransferable");
1355 Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1356 t->Inherit(BaseObject::GetConstructorTemplate(env));
1357 t->SetClassName(js_transferable_string);
1358 t->InstanceTemplate()->SetInternalFieldCount(
1359 JSTransferable::kInternalFieldCount);
1360 target->Set(context,
1361 js_transferable_string,
1362 t->GetFunction(context).ToLocalChecked()).Check();
1363 }
1364
1365 target->Set(context,
1366 env->message_port_constructor_string(),
1367 GetMessagePortConstructorTemplate(env)
1368 ->GetFunction(context).ToLocalChecked()).Check();
1369
1370 // These are not methods on the MessagePort prototype, because
1371 // the browser equivalents do not provide them.
1372 env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1373 env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1374 env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1375 env->SetMethod(target, "moveMessagePortToContext",
1376 MessagePort::MoveToContext);
1377 env->SetMethod(target, "setDeserializerCreateObjectFunction",
1378 SetDeserializerCreateObjectFunction);
1379
1380 {
1381 Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1382 target
1383 ->Set(context,
1384 FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1385 domexception)
1386 .Check();
1387 }
1388 }
1389
1390 } // anonymous namespace
1391
1392 } // namespace worker
1393 } // namespace node
1394
1395 NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1396