• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_messaging.h"
2 
3 #include "async_wrap-inl.h"
4 #include "debug_utils-inl.h"
5 #include "memory_tracker-inl.h"
6 #include "node_contextify.h"
7 #include "node_buffer.h"
8 #include "node_errors.h"
9 #include "node_process-inl.h"
10 #include "util-inl.h"
11 
12 using node::contextify::ContextifyContext;
13 using node::errors::TryCatchScope;
14 using v8::Array;
15 using v8::ArrayBuffer;
16 using v8::BackingStore;
17 using v8::CompiledWasmModule;
18 using v8::Context;
19 using v8::EscapableHandleScope;
20 using v8::Function;
21 using v8::FunctionCallbackInfo;
22 using v8::FunctionTemplate;
23 using v8::Global;
24 using v8::HandleScope;
25 using v8::Isolate;
26 using v8::Just;
27 using v8::Local;
28 using v8::Maybe;
29 using v8::MaybeLocal;
30 using v8::Nothing;
31 using v8::Object;
32 using v8::SharedArrayBuffer;
33 using v8::String;
34 using v8::Symbol;
35 using v8::Value;
36 using v8::ValueDeserializer;
37 using v8::ValueSerializer;
38 using v8::WasmModuleObject;
39 
40 namespace node {
41 
42 using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
43 
GetTransferMode() const44 BaseObject::TransferMode BaseObject::GetTransferMode() const {
45   return BaseObject::TransferMode::kUntransferable;
46 }
47 
TransferForMessaging()48 std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
49   return CloneForMessaging();
50 }
51 
CloneForMessaging() const52 std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
53   return {};
54 }
55 
NestedTransferables() const56 Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
57   return Just(BaseObjectList {});
58 }
59 
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)60 Maybe<bool> BaseObject::FinalizeTransferRead(
61     Local<Context> context, ValueDeserializer* deserializer) {
62   return Just(true);
63 }
64 
65 namespace worker {
66 
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)67 Maybe<bool> TransferData::FinalizeTransferWrite(
68     Local<Context> context, ValueSerializer* serializer) {
69   return Just(true);
70 }
71 
Message(MallocedBuffer<char> && buffer)72 Message::Message(MallocedBuffer<char>&& buffer)
73     : main_message_buf_(std::move(buffer)) {}
74 
IsCloseMessage() const75 bool Message::IsCloseMessage() const {
76   return main_message_buf_.data == nullptr;
77 }
78 
79 namespace {
80 
81 // This is used to tell V8 how to read transferred host objects, like other
82 // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
83 class DeserializerDelegate : public ValueDeserializer::Delegate {
84  public:
DeserializerDelegate(Message * m,Environment * env,const std::vector<BaseObjectPtr<BaseObject>> & host_objects,const std::vector<Local<SharedArrayBuffer>> & shared_array_buffers,const std::vector<CompiledWasmModule> & wasm_modules)85   DeserializerDelegate(
86       Message* m,
87       Environment* env,
88       const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
89       const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
90       const std::vector<CompiledWasmModule>& wasm_modules)
91       : host_objects_(host_objects),
92         shared_array_buffers_(shared_array_buffers),
93         wasm_modules_(wasm_modules) {}
94 
ReadHostObject(Isolate * isolate)95   MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
96     // Identifying the index in the message's BaseObject array is sufficient.
97     uint32_t id;
98     if (!deserializer->ReadUint32(&id))
99       return MaybeLocal<Object>();
100     CHECK_LE(id, host_objects_.size());
101     return host_objects_[id]->object(isolate);
102   }
103 
GetSharedArrayBufferFromId(Isolate * isolate,uint32_t clone_id)104   MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
105       Isolate* isolate, uint32_t clone_id) override {
106     CHECK_LE(clone_id, shared_array_buffers_.size());
107     return shared_array_buffers_[clone_id];
108   }
109 
GetWasmModuleFromId(Isolate * isolate,uint32_t transfer_id)110   MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
111       Isolate* isolate, uint32_t transfer_id) override {
112     CHECK_LE(transfer_id, wasm_modules_.size());
113     return WasmModuleObject::FromCompiledModule(
114         isolate, wasm_modules_[transfer_id]);
115   }
116 
117   ValueDeserializer* deserializer = nullptr;
118 
119  private:
120   const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
121   const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
122   const std::vector<CompiledWasmModule>& wasm_modules_;
123 };
124 
125 }  // anonymous namespace
126 
Deserialize(Environment * env,Local<Context> context)127 MaybeLocal<Value> Message::Deserialize(Environment* env,
128                                        Local<Context> context) {
129   CHECK(!IsCloseMessage());
130 
131   EscapableHandleScope handle_scope(env->isolate());
132   Context::Scope context_scope(context);
133 
134   // Create all necessary objects for transferables, e.g. MessagePort handles.
135   std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
136   auto cleanup = OnScopeLeave([&]() {
137     for (BaseObjectPtr<BaseObject> object : host_objects) {
138       if (!object) continue;
139 
140       // If the function did not finish successfully, host_objects will contain
141       // a list of objects that will never be passed to JS. Therefore, we
142       // destroy them here.
143       object->Detach();
144     }
145   });
146 
147   for (uint32_t i = 0; i < transferables_.size(); ++i) {
148     TransferData* data = transferables_[i].get();
149     host_objects[i] = data->Deserialize(
150         env, context, std::move(transferables_[i]));
151     if (!host_objects[i]) return {};
152   }
153   transferables_.clear();
154 
155   std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
156   // Attach all transferred SharedArrayBuffers to their new Isolate.
157   for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
158     Local<SharedArrayBuffer> sab =
159         SharedArrayBuffer::New(env->isolate(),
160                                std::move(shared_array_buffers_[i]));
161     shared_array_buffers.push_back(sab);
162   }
163   shared_array_buffers_.clear();
164 
165   DeserializerDelegate delegate(
166       this, env, host_objects, shared_array_buffers, wasm_modules_);
167   ValueDeserializer deserializer(
168       env->isolate(),
169       reinterpret_cast<const uint8_t*>(main_message_buf_.data),
170       main_message_buf_.size,
171       &delegate);
172   delegate.deserializer = &deserializer;
173 
174   // Attach all transferred ArrayBuffers to their new Isolate.
175   for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
176     Local<ArrayBuffer> ab =
177         ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
178     deserializer.TransferArrayBuffer(i, ab);
179   }
180   array_buffers_.clear();
181 
182   if (deserializer.ReadHeader(context).IsNothing())
183     return {};
184   Local<Value> return_value;
185   if (!deserializer.ReadValue(context).ToLocal(&return_value))
186     return {};
187 
188   for (BaseObjectPtr<BaseObject> base_object : host_objects) {
189     if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
190       return {};
191   }
192 
193   host_objects.clear();
194   return handle_scope.Escape(return_value);
195 }
196 
AddSharedArrayBuffer(std::shared_ptr<BackingStore> backing_store)197 void Message::AddSharedArrayBuffer(
198     std::shared_ptr<BackingStore> backing_store) {
199   shared_array_buffers_.emplace_back(std::move(backing_store));
200 }
201 
AddTransferable(std::unique_ptr<TransferData> && data)202 void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
203   transferables_.emplace_back(std::move(data));
204 }
205 
AddWASMModule(CompiledWasmModule && mod)206 uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
207   wasm_modules_.emplace_back(std::move(mod));
208   return wasm_modules_.size() - 1;
209 }
210 
211 namespace {
212 
GetEmitMessageFunction(Local<Context> context)213 MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
214   Isolate* isolate = context->GetIsolate();
215   Local<Object> per_context_bindings;
216   Local<Value> emit_message_val;
217   if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
218       !per_context_bindings->Get(context,
219                                 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
220           .ToLocal(&emit_message_val)) {
221     return MaybeLocal<Function>();
222   }
223   CHECK(emit_message_val->IsFunction());
224   return emit_message_val.As<Function>();
225 }
226 
GetDOMException(Local<Context> context)227 MaybeLocal<Function> GetDOMException(Local<Context> context) {
228   Isolate* isolate = context->GetIsolate();
229   Local<Object> per_context_bindings;
230   Local<Value> domexception_ctor_val;
231   if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
232       !per_context_bindings->Get(context,
233                                 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
234           .ToLocal(&domexception_ctor_val)) {
235     return MaybeLocal<Function>();
236   }
237   CHECK(domexception_ctor_val->IsFunction());
238   Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
239   return domexception_ctor;
240 }
241 
ThrowDataCloneException(Local<Context> context,Local<String> message)242 void ThrowDataCloneException(Local<Context> context, Local<String> message) {
243   Isolate* isolate = context->GetIsolate();
244   Local<Value> argv[] = {message,
245                          FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
246   Local<Value> exception;
247   Local<Function> domexception_ctor;
248   if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
249       !domexception_ctor->NewInstance(context, arraysize(argv), argv)
250            .ToLocal(&exception)) {
251     return;
252   }
253   isolate->ThrowException(exception);
254 }
255 
// This tells V8 how to serialize objects that it does not understand
// (e.g. C++ objects) into the output buffer, in a way that our own
// DeserializerDelegate understands how to unpack.
//
// Host objects are written to the stream as a uint32 index into
// `host_objects_`. Entries added through AddHostObject() come first; entries
// appended while the value itself is being serialized start at
// `first_cloned_object_index_`. The public `serializer` member must be set
// before any host object is written.
class SerializerDelegate : public ValueSerializer::Delegate {
 public:
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
      : env_(env), context_(context), msg_(m) {}

  // Reports a serialization failure as a DataCloneError DOMException.
  void ThrowDataCloneError(Local<String> message) override {
    ThrowDataCloneException(context_, message);
  }

  // Called by V8 for objects it cannot serialize itself. Only BaseObject
  // instances are handled; anything else raises a DataCloneError.
  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
    if (env_->base_object_ctor_template()->HasInstance(object)) {
      return WriteHostObject(
          BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
    }

    ThrowDataCloneError(env_->clone_unsupported_type_str());
    return Nothing<bool>();
  }

  // Returns the id of `shared_array_buffer`. A buffer that was seen before
  // gets its existing id back; otherwise it is remembered, its backing store
  // is added to the message, and the new id is returned.
  Maybe<uint32_t> GetSharedArrayBufferId(
      Isolate* isolate,
      Local<SharedArrayBuffer> shared_array_buffer) override {
    uint32_t i;
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
          shared_array_buffer) {
        return Just(i);
      }
    }

    // Not found: `i` now equals the index the new entry is stored at.
    seen_shared_array_buffers_.emplace_back(
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
    msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
    return Just(i);
  }

  // Stores the compiled module in the message and returns its index there.
  Maybe<uint32_t> GetWasmModuleTransferId(
      Isolate* isolate, Local<WasmModuleObject> module) override {
    return Just(msg_->AddWASMModule(module->GetCompiledModule()));
  }

  // Converts every collected host object into TransferData and adds it to the
  // message. Objects below `first_cloned_object_index_` are transferred first
  // and only cloned if that yields nothing; the remaining objects are cloned.
  // Returns Nothing if an object yields no data or finalizing its write fails.
  Maybe<bool> Finish(Local<Context> context) {
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
      BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
      std::unique_ptr<TransferData> data;
      if (i < first_cloned_object_index_)
        data = host_object->TransferForMessaging();
      if (!data)
        data = host_object->CloneForMessaging();
      if (!data) return Nothing<bool>();
      if (data->FinalizeTransferWrite(context, serializer).IsNothing())
        return Nothing<bool>();
      msg_->AddTransferable(std::move(data));
    }
    return Just(true);
  }

  // Registers a host object from the transfer list. Only valid before the
  // first cloned object has been recorded.
  inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
    // Make sure we have not started serializing the value itself yet.
    CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
    host_objects_.emplace_back(std::move(host_object));
  }

  // Some objects in the transfer list may register sub-objects that can be
  // transferred. This could e.g. be a public JS wrapper object, such as a
  // FileHandle, that is registering its C++ handle for transfer.
  inline Maybe<bool> AddNestedHostObjects() {
    // Index-based loop: `host_objects_` grows while it is being walked, so
    // newly added nested objects are visited as well.
    for (size_t i = 0; i < host_objects_.size(); i++) {
      std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
      if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
        return Nothing<bool>();
      for (auto nested_transferable : nested_transferables) {
        // Skip objects that are already registered.
        if (std::find(host_objects_.begin(),
                      host_objects_.end(),
                      nested_transferable) == host_objects_.end()) {
          AddHostObject(nested_transferable);
        }
      }
    }
    return Just(true);
  }

  ValueSerializer* serializer = nullptr;

 private:
  // Writes the index of `host_object` to the stream. Untransferable objects
  // raise a DataCloneError. An object that is not registered yet is an error
  // if it is merely transferable, and is appended if it is cloneable.
  Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
    BaseObject::TransferMode mode = host_object->GetTransferMode();
    if (mode == BaseObject::TransferMode::kUntransferable) {
      ThrowDataCloneError(env_->clone_unsupported_type_str());
      return Nothing<bool>();
    }

    // Already registered: refer to it by its existing index.
    for (uint32_t i = 0; i < host_objects_.size(); i++) {
      if (host_objects_[i] == host_object) {
        serializer->WriteUint32(i);
        return Just(true);
      }
    }

    if (mode == BaseObject::TransferMode::kTransferable) {
      // TODO(addaleax): This message code is too specific. Fix that in a
      // semver-major follow-up.
      THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
      return Nothing<bool>();
    }

    CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
    uint32_t index = host_objects_.size();
    // Remember where the cloned objects start; Finish() relies on this.
    if (first_cloned_object_index_ == SIZE_MAX)
      first_cloned_object_index_ = index;
    serializer->WriteUint32(index);
    host_objects_.push_back(host_object);
    return Just(true);
  }

  Environment* env_;
  Local<Context> context_;
  Message* msg_;
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
  std::vector<BaseObjectPtr<BaseObject>> host_objects_;
  // Index of the first object added by cloning; SIZE_MAX while there is none.
  size_t first_cloned_object_index_ = SIZE_MAX;

  friend class worker::Message;
};
383 
384 }  // anonymous namespace
385 
// Serializes `input` into this message.
//
// `transfer_list_v` names the objects to be transferred rather than copied:
// ArrayBuffers and BaseObjects whose GetTransferMode() is not
// kUntransferable. `source_port`, if non-empty, must not appear in the
// transfer list. Returns Nothing (usually with a pending exception) on
// failure, and Just(true) once the message owns the serialized buffer.
// The message must not already contain a serialized payload.
Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               const TransferList& transfer_list_v,
                               Local<Object> source_port) {
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(env->isolate(), &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
    Local<Value> entry = transfer_list_v[i];
    if (entry->IsObject()) {
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
      // for details.
      bool untransferable;
      if (!entry.As<Object>()->HasPrivate(
              context,
              env->untransferable_object_private_symbol())
              .To(&untransferable)) {
        return Nothing<bool>();
      }
      // Objects marked as untransferable are skipped, not rejected.
      if (untransferable) continue;
    }

    // Currently, we support ArrayBuffers and BaseObjects for which
    // GetTransferMode() does not return kUntransferable.
    if (entry->IsArrayBuffer()) {
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
      // If we cannot render the ArrayBuffer unusable in this Isolate,
      // copying the buffer will have to do.
      // Note that we can currently transfer ArrayBuffers even if they were
      // not allocated by Node’s ArrayBufferAllocator in the first place,
      // because we pass the underlying v8::BackingStore around rather than
      // raw data *and* an Isolate with a non-default ArrayBuffer allocator
      // is always going to outlive any Workers it creates, and so will its
      // allocator along with it.
      if (!ab->IsDetachable()) continue;
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
          array_buffers.end()) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "Transfer list contains duplicate ArrayBuffer"));
        return Nothing<bool>();
      }
      // We simply use the array index in the `array_buffers` list as the
      // ID that we write into the serialized buffer.
      uint32_t id = array_buffers.size();
      array_buffers.push_back(ab);
      serializer.TransferArrayBuffer(id, ab);
      continue;
    } else if (env->base_object_ctor_template()->HasInstance(entry)) {
      // Check if the source MessagePort is being transferred.
      if (!source_port.IsEmpty() && entry == source_port) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(env->isolate(),
                                  "Transfer list contains source port"));
        return Nothing<bool>();
      }
      BaseObjectPtr<BaseObject> host_object {
          Unwrap<BaseObject>(entry.As<Object>()) };
      // A MessagePort without a native object, or one that is already
      // detached, cannot be transferred.
      if (env->message_port_constructor_template()->HasInstance(entry) &&
          (!host_object ||
           static_cast<MessagePort*>(host_object.get())->IsDetached())) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "MessagePort in transfer list is already detached"));
        return Nothing<bool>();
      }
      // Reject an object that is listed more than once.
      if (std::find(delegate.host_objects_.begin(),
                    delegate.host_objects_.end(),
                    host_object) != delegate.host_objects_.end()) {
        ThrowDataCloneException(
            context,
            String::Concat(env->isolate(),
                FIXED_ONE_BYTE_STRING(
                  env->isolate(),
                  "Transfer list contains duplicate "),
                entry.As<Object>()->GetConstructorName()));
        return Nothing<bool>();
      }
      if (host_object && host_object->GetTransferMode() !=
              BaseObject::TransferMode::kUntransferable) {
        delegate.AddHostObject(host_object);
        continue;
      }
    }

    // Every supported entry has hit a `continue` above; whatever reaches
    // this point is not a valid transfer object.
    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
    return Nothing<bool>();
  }
  if (delegate.AddNestedHostObjects().IsNothing())
    return Nothing<bool>();

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing()) {
    return Nothing<bool>();
  }

  for (Local<ArrayBuffer> ab : array_buffers) {
    // If serialization succeeded, we render it inaccessible in this Isolate.
    std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
    ab->Detach();

    array_buffers_.emplace_back(std::move(backing_store));
  }

  if (delegate.Finish(context).IsNothing())
    return Nothing<bool>();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> data = serializer.Release();
  CHECK_NOT_NULL(data.first);
  main_message_buf_ =
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
  return Just(true);
}
514 
MemoryInfo(MemoryTracker * tracker) const515 void Message::MemoryInfo(MemoryTracker* tracker) const {
516   tracker->TrackField("array_buffers_", array_buffers_);
517   tracker->TrackField("shared_array_buffers", shared_array_buffers_);
518   tracker->TrackField("transferables", transferables_);
519 }
520 
MessagePortData(MessagePort * owner)521 MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
522 
~MessagePortData()523 MessagePortData::~MessagePortData() {
524   CHECK_NULL(owner_);
525   Disentangle();
526 }
527 
MemoryInfo(MemoryTracker * tracker) const528 void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
529   Mutex::ScopedLock lock(mutex_);
530   tracker->TrackField("incoming_messages", incoming_messages_);
531 }
532 
AddToIncomingQueue(Message && message)533 void MessagePortData::AddToIncomingQueue(Message&& message) {
534   // This function will be called by other threads.
535   Mutex::ScopedLock lock(mutex_);
536   incoming_messages_.emplace_back(std::move(message));
537 
538   if (owner_ != nullptr) {
539     Debug(owner_, "Adding message to incoming queue");
540     owner_->TriggerAsync();
541   }
542 }
543 
Entangle(MessagePortData * a,MessagePortData * b)544 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
545   CHECK_NULL(a->sibling_);
546   CHECK_NULL(b->sibling_);
547   a->sibling_ = b;
548   b->sibling_ = a;
549   a->sibling_mutex_ = b->sibling_mutex_;
550 }
551 
Disentangle()552 void MessagePortData::Disentangle() {
553   // Grab a copy of the sibling mutex, then replace it so that each sibling
554   // has its own sibling_mutex_ now.
555   std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
556   Mutex::ScopedLock sibling_lock(*sibling_mutex);
557   sibling_mutex_ = std::make_shared<Mutex>();
558 
559   MessagePortData* sibling = sibling_;
560   if (sibling_ != nullptr) {
561     sibling_->sibling_ = nullptr;
562     sibling_ = nullptr;
563   }
564 
565   // We close MessagePorts after disentanglement, so we enqueue a corresponding
566   // message and trigger the corresponding uv_async_t to let them know that
567   // this happened.
568   AddToIncomingQueue(Message());
569   if (sibling != nullptr) {
570     sibling->AddToIncomingQueue(Message());
571   }
572 }
573 
~MessagePort()574 MessagePort::~MessagePort() {
575   if (data_) Detach();
576 }
577 
MessagePort(Environment * env,Local<Context> context,Local<Object> wrap)578 MessagePort::MessagePort(Environment* env,
579                          Local<Context> context,
580                          Local<Object> wrap)
581   : HandleWrap(env,
582                wrap,
583                reinterpret_cast<uv_handle_t*>(&async_),
584                AsyncWrap::PROVIDER_MESSAGEPORT),
585     data_(new MessagePortData(this)) {
586   auto onmessage = [](uv_async_t* handle) {
587     // Called when data has been put into the queue.
588     MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
589     channel->OnMessage();
590   };
591 
592   CHECK_EQ(uv_async_init(env->event_loop(),
593                          &async_,
594                          onmessage), 0);
595   // Reset later to indicate success of the constructor.
596   bool succeeded = false;
597   auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
598 
599   Local<Value> fn;
600   if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
601     return;
602 
603   if (fn->IsFunction()) {
604     Local<Function> init = fn.As<Function>();
605     if (init->Call(context, wrap, 0, nullptr).IsEmpty())
606       return;
607   }
608 
609   Local<Function> emit_message_fn;
610   if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
611     return;
612   emit_message_fn_.Reset(env->isolate(), emit_message_fn);
613 
614   succeeded = true;
615   Debug(this, "Created message port");
616 }
617 
IsDetached() const618 bool MessagePort::IsDetached() const {
619   return data_ == nullptr || IsHandleClosing();
620 }
621 
TriggerAsync()622 void MessagePort::TriggerAsync() {
623   if (IsHandleClosing()) return;
624   CHECK_EQ(uv_async_send(&async_), 0);
625 }
626 
Close(v8::Local<v8::Value> close_callback)627 void MessagePort::Close(v8::Local<v8::Value> close_callback) {
628   Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
629 
630   if (data_) {
631     // Wrap this call with accessing the mutex, so that TriggerAsync()
632     // can check IsHandleClosing() without race conditions.
633     Mutex::ScopedLock sibling_lock(data_->mutex_);
634     HandleWrap::Close(close_callback);
635   } else {
636     HandleWrap::Close(close_callback);
637   }
638 }
639 
New(const FunctionCallbackInfo<Value> & args)640 void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
641   // This constructor just throws an error. Unfortunately, we can’t use V8’s
642   // ConstructorBehavior::kThrow, as that also removes the prototype from the
643   // class (i.e. makes it behave like an arrow function).
644   Environment* env = Environment::GetCurrent(args);
645   THROW_ERR_CONSTRUCT_CALL_INVALID(env);
646 }
647 
New(Environment * env,Local<Context> context,std::unique_ptr<MessagePortData> data)648 MessagePort* MessagePort::New(
649     Environment* env,
650     Local<Context> context,
651     std::unique_ptr<MessagePortData> data) {
652   Context::Scope context_scope(context);
653   Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
654 
655   // Construct a new instance, then assign the listener instance and possibly
656   // the MessagePortData to it.
657   Local<Object> instance;
658   if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
659     return nullptr;
660   MessagePort* port = new MessagePort(env, context, instance);
661   CHECK_NOT_NULL(port);
662   if (port->IsHandleClosing()) {
663     // Construction failed with an exception.
664     return nullptr;
665   }
666 
667   if (data) {
668     port->Detach();
669     port->data_ = std::move(data);
670 
671     // This lock is here to avoid race conditions with the `owner_` read
672     // in AddToIncomingQueue(). (This would likely be unproblematic without it,
673     // but it's better to be safe than sorry.)
674     Mutex::ScopedLock lock(port->data_->mutex_);
675     port->data_->owner_ = port;
676     // If the existing MessagePortData object had pending messages, this is
677     // the easiest way to run that queue.
678     port->TriggerAsync();
679   }
680   return port;
681 }
682 
ReceiveMessage(Local<Context> context,bool only_if_receiving)683 MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
684                                               bool only_if_receiving) {
685   Message received;
686   {
687     // Get the head of the message queue.
688     Mutex::ScopedLock lock(data_->mutex_);
689 
690     Debug(this, "MessagePort has message");
691 
692     bool wants_message = receiving_messages_ || !only_if_receiving;
693     // We have nothing to do if:
694     // - There are no pending messages
695     // - We are not intending to receive messages, and the message we would
696     //   receive is not the final "close" message.
697     if (data_->incoming_messages_.empty() ||
698         (!wants_message &&
699          !data_->incoming_messages_.front().IsCloseMessage())) {
700       return env()->no_message_symbol();
701     }
702 
703     received = std::move(data_->incoming_messages_.front());
704     data_->incoming_messages_.pop_front();
705   }
706 
707   if (received.IsCloseMessage()) {
708     Close();
709     return env()->no_message_symbol();
710   }
711 
712   if (!env()->can_call_into_js()) return MaybeLocal<Value>();
713 
714   return received.Deserialize(env(), context);
715 }
716 
OnMessage()717 void MessagePort::OnMessage() {
718   Debug(this, "Running MessagePort::OnMessage()");
719   HandleScope handle_scope(env()->isolate());
720   Local<Context> context = object(env()->isolate())->CreationContext();
721 
722   size_t processing_limit;
723   {
724     Mutex::ScopedLock(data_->mutex_);
725     processing_limit = std::max(data_->incoming_messages_.size(),
726                                 static_cast<size_t>(1000));
727   }
728 
729   // data_ can only ever be modified by the owner thread, so no need to lock.
730   // However, the message port may be transferred while it is processing
731   // messages, so we need to check that this handle still owns its `data_` field
732   // on every iteration.
733   while (data_) {
734     if (processing_limit-- == 0) {
735       // Prevent event loop starvation by only processing those messages without
736       // interruption that were already present when the OnMessage() call was
737       // first triggered, but at least 1000 messages because otherwise the
738       // overhead of repeatedly triggering the uv_async_t instance becomes
739       // noticeable, at least on Windows.
740       // (That might require more investigation by somebody more familiar with
741       // Windows.)
742       TriggerAsync();
743       return;
744     }
745 
746     HandleScope handle_scope(env()->isolate());
747     Context::Scope context_scope(context);
748     Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
749 
750     Local<Value> payload;
751     Local<Value> message_error;
752     Local<Value> argv[2];
753 
754     {
755       // Catch any exceptions from parsing the message itself (not from
756       // emitting it) as 'messageeror' events.
757       TryCatchScope try_catch(env());
758       if (!ReceiveMessage(context, true).ToLocal(&payload)) {
759         if (try_catch.HasCaught() && !try_catch.HasTerminated())
760           message_error = try_catch.Exception();
761         goto reschedule;
762       }
763     }
764     if (payload == env()->no_message_symbol()) break;
765 
766     if (!env()->can_call_into_js()) {
767       Debug(this, "MessagePort drains queue because !can_call_into_js()");
768       // In this case there is nothing to do but to drain the current queue.
769       continue;
770     }
771 
772     argv[0] = payload;
773     argv[1] = env()->message_string();
774 
775     if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
776     reschedule:
777       if (!message_error.IsEmpty()) {
778         argv[0] = message_error;
779         argv[1] = env()->messageerror_string();
780         USE(MakeCallback(emit_message, arraysize(argv), argv));
781       }
782 
783       // Re-schedule OnMessage() execution in case of failure.
784       if (data_)
785         TriggerAsync();
786       return;
787     }
788   }
789 }
790 
OnClose()791 void MessagePort::OnClose() {
792   Debug(this, "MessagePort::OnClose()");
793   if (data_) {
794     // Detach() returns move(data_).
795     Detach()->Disentangle();
796   }
797 }
798 
Detach()799 std::unique_ptr<MessagePortData> MessagePort::Detach() {
800   CHECK(data_);
801   Mutex::ScopedLock lock(data_->mutex_);
802   data_->owner_ = nullptr;
803   return std::move(data_);
804 }
805 
GetTransferMode() const806 BaseObject::TransferMode MessagePort::GetTransferMode() const {
807   if (IsDetached())
808     return BaseObject::TransferMode::kUntransferable;
809   return BaseObject::TransferMode::kTransferable;
810 }
811 
TransferForMessaging()812 std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
813   Close();
814   return Detach();
815 }
816 
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)817 BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
818     Environment* env,
819     Local<Context> context,
820     std::unique_ptr<TransferData> self) {
821   return BaseObjectPtr<MessagePort> { MessagePort::New(
822       env, context,
823       static_unique_pointer_cast<MessagePortData>(std::move(self))) };
824 }
825 
PostMessage(Environment * env,Local<Context> context,Local<Value> message_v,const TransferList & transfer_v)826 Maybe<bool> MessagePort::PostMessage(Environment* env,
827                                      Local<Context> context,
828                                      Local<Value> message_v,
829                                      const TransferList& transfer_v) {
830   Isolate* isolate = env->isolate();
831   Local<Object> obj = object(isolate);
832 
833   Message msg;
834 
835   // Per spec, we need to both check if transfer list has the source port, and
836   // serialize the input message, even if the MessagePort is closed or detached.
837 
838   Maybe<bool> serialization_maybe =
839       msg.Serialize(env, context, message_v, transfer_v, obj);
840   if (data_ == nullptr) {
841     return serialization_maybe;
842   }
843   if (serialization_maybe.IsNothing()) {
844     return Nothing<bool>();
845   }
846 
847   Mutex::ScopedLock lock(*data_->sibling_mutex_);
848   bool doomed = false;
849 
850   // Check if the target port is posted to itself.
851   if (data_->sibling_ != nullptr) {
852     for (const auto& transferable : msg.transferables()) {
853       if (data_->sibling_ == transferable.get()) {
854         doomed = true;
855         ProcessEmitWarning(env, "The target port was posted to itself, and "
856                                 "the communication channel was lost");
857         break;
858       }
859     }
860   }
861 
862   if (data_->sibling_ == nullptr || doomed)
863     return Just(true);
864 
865   data_->sibling_->AddToIncomingQueue(std::move(msg));
866   return Just(true);
867 }
868 
ReadIterable(Environment * env,Local<Context> context,TransferList & transfer_list,Local<Value> object)869 static Maybe<bool> ReadIterable(Environment* env,
870                                 Local<Context> context,
871                                 // NOLINTNEXTLINE(runtime/references)
872                                 TransferList& transfer_list,
873                                 Local<Value> object) {
874   if (!object->IsObject()) return Just(false);
875 
876   if (object->IsArray()) {
877     Local<Array> arr = object.As<Array>();
878     size_t length = arr->Length();
879     transfer_list.AllocateSufficientStorage(length);
880     for (size_t i = 0; i < length; i++) {
881       if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
882         return Nothing<bool>();
883     }
884     return Just(true);
885   }
886 
887   Isolate* isolate = env->isolate();
888   Local<Value> iterator_method;
889   if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
890       .ToLocal(&iterator_method)) return Nothing<bool>();
891   if (!iterator_method->IsFunction()) return Just(false);
892 
893   Local<Value> iterator;
894   if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
895       .ToLocal(&iterator)) return Nothing<bool>();
896   if (!iterator->IsObject()) return Just(false);
897 
898   Local<Value> next;
899   if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
900     return Nothing<bool>();
901   if (!next->IsFunction()) return Just(false);
902 
903   std::vector<Local<Value>> entries;
904   while (env->can_call_into_js()) {
905     Local<Value> result;
906     if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
907         .ToLocal(&result)) return Nothing<bool>();
908     if (!result->IsObject()) return Just(false);
909 
910     Local<Value> done;
911     if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
912       return Nothing<bool>();
913     if (done->BooleanValue(isolate)) break;
914 
915     Local<Value> val;
916     if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
917       return Nothing<bool>();
918     entries.push_back(val);
919   }
920 
921   transfer_list.AllocateSufficientStorage(entries.size());
922   std::copy(entries.begin(), entries.end(), &transfer_list[0]);
923   return Just(true);
924 }
925 
PostMessage(const FunctionCallbackInfo<Value> & args)926 void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
927   Environment* env = Environment::GetCurrent(args);
928   Local<Object> obj = args.This();
929   Local<Context> context = obj->CreationContext();
930 
931   if (args.Length() == 0) {
932     return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
933                                        "MessagePort.postMessage");
934   }
935 
936   if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
937     // Browsers ignore null or undefined, and otherwise accept an array or an
938     // options object.
939     return THROW_ERR_INVALID_ARG_TYPE(env,
940         "Optional transferList argument must be an iterable");
941   }
942 
943   TransferList transfer_list;
944   if (args[1]->IsObject()) {
945     bool was_iterable;
946     if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
947       return;
948     if (!was_iterable) {
949       Local<Value> transfer_option;
950       if (!args[1].As<Object>()->Get(context, env->transfer_string())
951           .ToLocal(&transfer_option)) return;
952       if (!transfer_option->IsUndefined()) {
953         if (!ReadIterable(env, context, transfer_list, transfer_option)
954             .To(&was_iterable)) return;
955         if (!was_iterable) {
956           return THROW_ERR_INVALID_ARG_TYPE(env,
957               "Optional options.transfer argument must be an iterable");
958         }
959       }
960     }
961   }
962 
963   MessagePort* port = Unwrap<MessagePort>(args.This());
964   // Even if the backing MessagePort object has already been deleted, we still
965   // want to serialize the message to ensure spec-compliant behavior w.r.t.
966   // transfers.
967   if (port == nullptr || port->IsHandleClosing()) {
968     Message msg;
969     USE(msg.Serialize(env, context, args[0], transfer_list, obj));
970     return;
971   }
972 
973   Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
974   if (res.IsJust())
975     args.GetReturnValue().Set(res.FromJust());
976 }
977 
Start()978 void MessagePort::Start() {
979   Debug(this, "Start receiving messages");
980   receiving_messages_ = true;
981   Mutex::ScopedLock lock(data_->mutex_);
982   if (!data_->incoming_messages_.empty())
983     TriggerAsync();
984 }
985 
Stop()986 void MessagePort::Stop() {
987   Debug(this, "Stop receiving messages");
988   receiving_messages_ = false;
989 }
990 
Start(const FunctionCallbackInfo<Value> & args)991 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
992   MessagePort* port;
993   ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
994   if (!port->data_) {
995     return;
996   }
997   port->Start();
998 }
999 
Stop(const FunctionCallbackInfo<Value> & args)1000 void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1001   MessagePort* port;
1002   CHECK(args[0]->IsObject());
1003   ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1004   if (!port->data_) {
1005     return;
1006   }
1007   port->Stop();
1008 }
1009 
Drain(const FunctionCallbackInfo<Value> & args)1010 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1011   MessagePort* port;
1012   ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1013   port->OnMessage();
1014 }
1015 
ReceiveMessage(const FunctionCallbackInfo<Value> & args)1016 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1017   Environment* env = Environment::GetCurrent(args);
1018   if (!args[0]->IsObject() ||
1019       !env->message_port_constructor_template()->HasInstance(args[0])) {
1020     return THROW_ERR_INVALID_ARG_TYPE(env,
1021         "The \"port\" argument must be a MessagePort instance");
1022   }
1023   MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1024   if (port == nullptr) {
1025     // Return 'no messages' for a closed port.
1026     args.GetReturnValue().Set(
1027         Environment::GetCurrent(args)->no_message_symbol());
1028     return;
1029   }
1030 
1031   MaybeLocal<Value> payload =
1032       port->ReceiveMessage(port->object()->CreationContext(), false);
1033   if (!payload.IsEmpty())
1034     args.GetReturnValue().Set(payload.ToLocalChecked());
1035 }
1036 
MoveToContext(const FunctionCallbackInfo<Value> & args)1037 void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1038   Environment* env = Environment::GetCurrent(args);
1039   if (!args[0]->IsObject() ||
1040       !env->message_port_constructor_template()->HasInstance(args[0])) {
1041     return THROW_ERR_INVALID_ARG_TYPE(env,
1042         "The \"port\" argument must be a MessagePort instance");
1043   }
1044   MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1045   if (port == nullptr || port->IsHandleClosing()) {
1046     Isolate* isolate = env->isolate();
1047     THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1048     return;
1049   }
1050 
1051   Local<Value> context_arg = args[1];
1052   ContextifyContext* context_wrapper;
1053   if (!context_arg->IsObject() ||
1054       (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1055           env, context_arg.As<Object>())) == nullptr) {
1056     return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1057   }
1058 
1059   std::unique_ptr<MessagePortData> data;
1060   if (!port->IsDetached())
1061     data = port->Detach();
1062 
1063   Context::Scope context_scope(context_wrapper->context());
1064   MessagePort* target =
1065       MessagePort::New(env, context_wrapper->context(), std::move(data));
1066   if (target != nullptr)
1067     args.GetReturnValue().Set(target->object());
1068 }
1069 
Entangle(MessagePort * a,MessagePort * b)1070 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1071   Entangle(a, b->data_.get());
1072 }
1073 
Entangle(MessagePort * a,MessagePortData * b)1074 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1075   MessagePortData::Entangle(a->data_.get(), b);
1076 }
1077 
MemoryInfo(MemoryTracker * tracker) const1078 void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1079   tracker->TrackField("data", data_);
1080   tracker->TrackField("emit_message_fn", emit_message_fn_);
1081 }
1082 
GetMessagePortConstructorTemplate(Environment * env)1083 Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1084   // Factor generating the MessagePort JS constructor into its own piece
1085   // of code, because it is needed early on in the child environment setup.
1086   Local<FunctionTemplate> templ = env->message_port_constructor_template();
1087   if (!templ.IsEmpty())
1088     return templ;
1089 
1090   {
1091     Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1092     m->SetClassName(env->message_port_constructor_string());
1093     m->InstanceTemplate()->SetInternalFieldCount(
1094         MessagePort::kInternalFieldCount);
1095     m->Inherit(HandleWrap::GetConstructorTemplate(env));
1096 
1097     env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1098     env->SetProtoMethod(m, "start", MessagePort::Start);
1099 
1100     env->set_message_port_constructor_template(m);
1101   }
1102 
1103   return GetMessagePortConstructorTemplate(env);
1104 }
1105 
JSTransferable(Environment * env,Local<Object> obj)1106 JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1107     : BaseObject(env, obj) {
1108   MakeWeak();
1109 }
1110 
New(const FunctionCallbackInfo<Value> & args)1111 void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1112   CHECK(args.IsConstructCall());
1113   new JSTransferable(Environment::GetCurrent(args), args.This());
1114 }
1115 
GetTransferMode() const1116 JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1117   // Implement `kClone in this ? kCloneable : kTransferable`.
1118   HandleScope handle_scope(env()->isolate());
1119   errors::TryCatchScope ignore_exceptions(env());
1120 
1121   bool has_clone;
1122   if (!object()->Has(env()->context(),
1123                      env()->messaging_clone_symbol()).To(&has_clone)) {
1124     return TransferMode::kUntransferable;
1125   }
1126 
1127   return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1128 }
1129 
TransferForMessaging()1130 std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1131   return TransferOrClone(TransferMode::kTransferable);
1132 }
1133 
CloneForMessaging() const1134 std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1135   return TransferOrClone(TransferMode::kCloneable);
1136 }
1137 
TransferOrClone(TransferMode mode) const1138 std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1139     TransferMode mode) const {
1140   // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1141   // which should return an object with `data` and `deserializeInfo` properties;
1142   // `data` is written to the serializer later, and `deserializeInfo` is stored
1143   // on the `TransferData` instance as a string.
1144   HandleScope handle_scope(env()->isolate());
1145   Local<Context> context = env()->isolate()->GetCurrentContext();
1146   Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1147       env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1148 
1149   Local<Value> method;
1150   if (!object()->Get(context, method_name).ToLocal(&method)) {
1151     return {};
1152   }
1153   if (method->IsFunction()) {
1154     Local<Value> result_v;
1155     if (!method.As<Function>()->Call(
1156             context, object(), 0, nullptr).ToLocal(&result_v)) {
1157       return {};
1158     }
1159 
1160     if (result_v->IsObject()) {
1161       Local<Object> result = result_v.As<Object>();
1162       Local<Value> data;
1163       Local<Value> deserialize_info;
1164       if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1165           !result->Get(context, env()->deserialize_info_string())
1166               .ToLocal(&deserialize_info)) {
1167         return {};
1168       }
1169       Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1170       if (*deserialize_info_str == nullptr) return {};
1171       return std::make_unique<Data>(
1172           *deserialize_info_str, Global<Value>(env()->isolate(), data));
1173     }
1174   }
1175 
1176   if (mode == TransferMode::kTransferable)
1177     return TransferOrClone(TransferMode::kCloneable);
1178   else
1179     return {};
1180 }
1181 
1182 Maybe<BaseObjectList>
NestedTransferables() const1183 JSTransferable::NestedTransferables() const {
1184   // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1185   HandleScope handle_scope(env()->isolate());
1186   Local<Context> context = env()->isolate()->GetCurrentContext();
1187   Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1188 
1189   Local<Value> method;
1190   if (!object()->Get(context, method_name).ToLocal(&method)) {
1191     return Nothing<BaseObjectList>();
1192   }
1193   if (!method->IsFunction()) return Just(BaseObjectList {});
1194 
1195   Local<Value> list_v;
1196   if (!method.As<Function>()->Call(
1197           context, object(), 0, nullptr).ToLocal(&list_v)) {
1198     return Nothing<BaseObjectList>();
1199   }
1200   if (!list_v->IsArray()) return Just(BaseObjectList {});
1201   Local<Array> list = list_v.As<Array>();
1202 
1203   BaseObjectList ret;
1204   for (size_t i = 0; i < list->Length(); i++) {
1205     Local<Value> value;
1206     if (!list->Get(context, i).ToLocal(&value))
1207       return Nothing<BaseObjectList>();
1208     if (env()->base_object_ctor_template()->HasInstance(value))
1209       ret.emplace_back(Unwrap<BaseObject>(value));
1210   }
1211   return Just(ret);
1212 }
1213 
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)1214 Maybe<bool> JSTransferable::FinalizeTransferRead(
1215     Local<Context> context, ValueDeserializer* deserializer) {
1216   // Call `this[kDeserialize](data)` where `data` comes from the return value
1217   // of `this[kTransfer]()` or `this[kClone]()`.
1218   HandleScope handle_scope(env()->isolate());
1219   Local<Value> data;
1220   if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1221 
1222   Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1223   Local<Value> method;
1224   if (!object()->Get(context, method_name).ToLocal(&method)) {
1225     return Nothing<bool>();
1226   }
1227   if (!method->IsFunction()) return Just(true);
1228 
1229   if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1230     return Nothing<bool>();
1231   }
1232   return Just(true);
1233 }
1234 
Data(std::string && deserialize_info,v8::Global<v8::Value> && data)1235 JSTransferable::Data::Data(std::string&& deserialize_info,
1236                            v8::Global<v8::Value>&& data)
1237     : deserialize_info_(std::move(deserialize_info)),
1238       data_(std::move(data)) {}
1239 
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)1240 BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1241     Environment* env,
1242     Local<Context> context,
1243     std::unique_ptr<TransferData> self) {
1244   // Create the JS wrapper object that will later be filled with data passed to
1245   // the `[kDeserialize]()` method on it. This split is necessary, because here
1246   // we need to create an object with the right prototype and internal fields,
1247   // but the actual JS data stored in the serialized data can only be read at
1248   // the end of the stream, after the main message has been read.
1249 
1250   if (context != env->context()) {
1251     THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1252     return {};
1253   }
1254   HandleScope handle_scope(env->isolate());
1255   Local<Value> info;
1256   if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1257 
1258   Local<Value> ret;
1259   CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1260   if (!env->messaging_deserialize_create_object()->Call(
1261           context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1262       !env->base_object_ctor_template()->HasInstance(ret)) {
1263     return {};
1264   }
1265 
1266   return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1267 }
1268 
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)1269 Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1270     Local<Context> context, ValueSerializer* serializer) {
1271   HandleScope handle_scope(context->GetIsolate());
1272   auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1273   data_.Reset();
1274   return ret;
1275 }
1276 
1277 namespace {
1278 
SetDeserializerCreateObjectFunction(const FunctionCallbackInfo<Value> & args)1279 static void SetDeserializerCreateObjectFunction(
1280     const FunctionCallbackInfo<Value>& args) {
1281   Environment* env = Environment::GetCurrent(args);
1282   CHECK(args[0]->IsFunction());
1283   env->set_messaging_deserialize_create_object(args[0].As<Function>());
1284 }
1285 
MessageChannel(const FunctionCallbackInfo<Value> & args)1286 static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1287   Environment* env = Environment::GetCurrent(args);
1288   if (!args.IsConstructCall()) {
1289     THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1290     return;
1291   }
1292 
1293   Local<Context> context = args.This()->CreationContext();
1294   Context::Scope context_scope(context);
1295 
1296   MessagePort* port1 = MessagePort::New(env, context);
1297   if (port1 == nullptr) return;
1298   MessagePort* port2 = MessagePort::New(env, context);
1299   if (port2 == nullptr) {
1300     port1->Close();
1301     return;
1302   }
1303 
1304   MessagePort::Entangle(port1, port2);
1305 
1306   args.This()->Set(context, env->port1_string(), port1->object())
1307       .Check();
1308   args.This()->Set(context, env->port2_string(), port2->object())
1309       .Check();
1310 }
1311 
InitMessaging(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)1312 static void InitMessaging(Local<Object> target,
1313                           Local<Value> unused,
1314                           Local<Context> context,
1315                           void* priv) {
1316   Environment* env = Environment::GetCurrent(context);
1317 
1318   {
1319     env->SetConstructorFunction(
1320         target,
1321         "MessageChannel",
1322         env->NewFunctionTemplate(MessageChannel));
1323   }
1324 
1325   {
1326     Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1327     t->Inherit(BaseObject::GetConstructorTemplate(env));
1328     t->InstanceTemplate()->SetInternalFieldCount(
1329         JSTransferable::kInternalFieldCount);
1330     env->SetConstructorFunction(target, "JSTransferable", t);
1331   }
1332 
1333   env->SetConstructorFunction(
1334       target,
1335       env->message_port_constructor_string(),
1336       GetMessagePortConstructorTemplate(env));
1337 
1338   // These are not methods on the MessagePort prototype, because
1339   // the browser equivalents do not provide them.
1340   env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1341   env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1342   env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1343   env->SetMethod(target, "moveMessagePortToContext",
1344                  MessagePort::MoveToContext);
1345   env->SetMethod(target, "setDeserializerCreateObjectFunction",
1346                  SetDeserializerCreateObjectFunction);
1347 
1348   {
1349     Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1350     target
1351         ->Set(context,
1352               FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1353               domexception)
1354         .Check();
1355   }
1356 }
1357 
1358 }  // anonymous namespace
1359 
1360 }  // namespace worker
1361 }  // namespace node
1362 
1363 NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1364