• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_messaging.h"
2 
3 #include "async_wrap-inl.h"
4 #include "debug_utils-inl.h"
5 #include "memory_tracker-inl.h"
6 #include "node_contextify.h"
7 #include "node_buffer.h"
8 #include "node_errors.h"
9 #include "node_process.h"
10 #include "util-inl.h"
11 
12 using node::contextify::ContextifyContext;
13 using node::errors::TryCatchScope;
14 using v8::Array;
15 using v8::ArrayBuffer;
16 using v8::ArrayBufferCreationMode;
17 using v8::Context;
18 using v8::EscapableHandleScope;
19 using v8::Function;
20 using v8::FunctionCallbackInfo;
21 using v8::FunctionTemplate;
22 using v8::Global;
23 using v8::HandleScope;
24 using v8::Isolate;
25 using v8::Just;
26 using v8::Local;
27 using v8::Maybe;
28 using v8::MaybeLocal;
29 using v8::Nothing;
30 using v8::Object;
31 using v8::SharedArrayBuffer;
32 using v8::String;
33 using v8::Symbol;
34 using v8::Value;
35 using v8::ValueDeserializer;
36 using v8::ValueSerializer;
37 using v8::WasmModuleObject;
38 
39 namespace node {
40 
41 using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
42 
GetTransferMode() const43 BaseObject::TransferMode BaseObject::GetTransferMode() const {
44   return BaseObject::TransferMode::kUntransferable;
45 }
46 
TransferForMessaging()47 std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
48   return CloneForMessaging();
49 }
50 
CloneForMessaging() const51 std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
52   return {};
53 }
54 
NestedTransferables() const55 Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
56   return Just(BaseObjectList {});
57 }
58 
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)59 Maybe<bool> BaseObject::FinalizeTransferRead(
60     Local<Context> context, ValueDeserializer* deserializer) {
61   return Just(true);
62 }
63 
64 namespace worker {
65 
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)66 Maybe<bool> TransferData::FinalizeTransferWrite(
67     Local<Context> context, ValueSerializer* serializer) {
68   return Just(true);
69 }
70 
Message(MallocedBuffer<char> && buffer)71 Message::Message(MallocedBuffer<char>&& buffer)
72     : main_message_buf_(std::move(buffer)) {}
73 
IsCloseMessage() const74 bool Message::IsCloseMessage() const {
75   return main_message_buf_.data == nullptr;
76 }
77 
78 namespace {
79 
80 // This is used to tell V8 how to read transferred host objects, like other
81 // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
82 class DeserializerDelegate : public ValueDeserializer::Delegate {
83  public:
DeserializerDelegate(Message * m,Environment * env,const std::vector<BaseObjectPtr<BaseObject>> & host_objects,const std::vector<Local<SharedArrayBuffer>> & shared_array_buffers,const std::vector<WasmModuleObject::TransferrableModule> & wasm_modules)84   DeserializerDelegate(
85       Message* m,
86       Environment* env,
87       const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
88       const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
89       const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
90       : host_objects_(host_objects),
91         shared_array_buffers_(shared_array_buffers),
92         wasm_modules_(wasm_modules) {}
93 
ReadHostObject(Isolate * isolate)94   MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
95     // Identifying the index in the message's BaseObject array is sufficient.
96     uint32_t id;
97     if (!deserializer->ReadUint32(&id))
98       return MaybeLocal<Object>();
99     CHECK_LE(id, host_objects_.size());
100     return host_objects_[id]->object(isolate);
101   }
102 
GetSharedArrayBufferFromId(Isolate * isolate,uint32_t clone_id)103   MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
104       Isolate* isolate, uint32_t clone_id) override {
105     CHECK_LE(clone_id, shared_array_buffers_.size());
106     return shared_array_buffers_[clone_id];
107   }
108 
GetWasmModuleFromId(Isolate * isolate,uint32_t transfer_id)109   MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
110       Isolate* isolate, uint32_t transfer_id) override {
111     CHECK_LE(transfer_id, wasm_modules_.size());
112     return WasmModuleObject::FromTransferrableModule(
113         isolate, wasm_modules_[transfer_id]);
114   }
115 
116   ValueDeserializer* deserializer = nullptr;
117 
118  private:
119   const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
120   const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
121   const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
122 };
123 
124 }  // anonymous namespace
125 
Deserialize(Environment * env,Local<Context> context)126 MaybeLocal<Value> Message::Deserialize(Environment* env,
127                                        Local<Context> context) {
128   CHECK(!IsCloseMessage());
129 
130   EscapableHandleScope handle_scope(env->isolate());
131   Context::Scope context_scope(context);
132 
133   // Create all necessary objects for transferables, e.g. MessagePort handles.
134   std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
135   auto cleanup = OnScopeLeave([&]() {
136     for (BaseObjectPtr<BaseObject> object : host_objects) {
137       if (!object) continue;
138 
139       // If the function did not finish successfully, host_objects will contain
140       // a list of objects that will never be passed to JS. Therefore, we
141       // destroy them here.
142       object->Detach();
143     }
144   });
145 
146   for (uint32_t i = 0; i < transferables_.size(); ++i) {
147     TransferData* data = transferables_[i].get();
148     host_objects[i] = data->Deserialize(
149         env, context, std::move(transferables_[i]));
150     if (!host_objects[i]) return {};
151   }
152   transferables_.clear();
153 
154   std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
155   // Attach all transferred SharedArrayBuffers to their new Isolate.
156   for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
157     Local<SharedArrayBuffer> sab;
158     if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
159             .ToLocal(&sab))
160       return MaybeLocal<Value>();
161     shared_array_buffers.push_back(sab);
162   }
163   shared_array_buffers_.clear();
164 
165   DeserializerDelegate delegate(
166       this, env, host_objects, shared_array_buffers, wasm_modules_);
167   ValueDeserializer deserializer(
168       env->isolate(),
169       reinterpret_cast<const uint8_t*>(main_message_buf_.data),
170       main_message_buf_.size,
171       &delegate);
172   delegate.deserializer = &deserializer;
173 
174   // Attach all transferred ArrayBuffers to their new Isolate.
175   for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
176     if (!env->isolate_data()->uses_node_allocator()) {
177       // We don't use Node's allocator on the receiving side, so we have
178       // to create the ArrayBuffer from a copy of the memory.
179       AllocatedBuffer buf =
180           env->AllocateManaged(array_buffer_contents_[i].size);
181       memcpy(buf.data(),
182              array_buffer_contents_[i].data,
183              array_buffer_contents_[i].size);
184       deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
185       continue;
186     }
187 
188     env->isolate_data()->node_allocator()->RegisterPointer(
189         array_buffer_contents_[i].data, array_buffer_contents_[i].size);
190 
191     Local<ArrayBuffer> ab =
192         ArrayBuffer::New(env->isolate(),
193                          array_buffer_contents_[i].release(),
194                          array_buffer_contents_[i].size,
195                          ArrayBufferCreationMode::kInternalized);
196     deserializer.TransferArrayBuffer(i, ab);
197   }
198   array_buffer_contents_.clear();
199 
200   if (deserializer.ReadHeader(context).IsNothing())
201     return {};
202   Local<Value> return_value;
203   if (!deserializer.ReadValue(context).ToLocal(&return_value))
204     return {};
205 
206   for (BaseObjectPtr<BaseObject> base_object : host_objects) {
207     if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
208       return {};
209   }
210 
211   host_objects.clear();
212   return handle_scope.Escape(return_value);
213 }
214 
AddSharedArrayBuffer(const SharedArrayBufferMetadataReference & reference)215 void Message::AddSharedArrayBuffer(
216     const SharedArrayBufferMetadataReference& reference) {
217   shared_array_buffers_.push_back(reference);
218 }
219 
AddTransferable(std::unique_ptr<TransferData> && data)220 void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
221   transferables_.emplace_back(std::move(data));
222 }
223 
AddWASMModule(WasmModuleObject::TransferrableModule && mod)224 uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
225   wasm_modules_.emplace_back(std::move(mod));
226   return wasm_modules_.size() - 1;
227 }
228 
229 namespace {
230 
GetEmitMessageFunction(Local<Context> context)231 MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
232   Isolate* isolate = context->GetIsolate();
233   Local<Object> per_context_bindings;
234   Local<Value> emit_message_val;
235   if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
236       !per_context_bindings->Get(context,
237                                 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
238           .ToLocal(&emit_message_val)) {
239     return MaybeLocal<Function>();
240   }
241   CHECK(emit_message_val->IsFunction());
242   return emit_message_val.As<Function>();
243 }
244 
GetDOMException(Local<Context> context)245 MaybeLocal<Function> GetDOMException(Local<Context> context) {
246   Isolate* isolate = context->GetIsolate();
247   Local<Object> per_context_bindings;
248   Local<Value> domexception_ctor_val;
249   if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
250       !per_context_bindings->Get(context,
251                                 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
252           .ToLocal(&domexception_ctor_val)) {
253     return MaybeLocal<Function>();
254   }
255   CHECK(domexception_ctor_val->IsFunction());
256   Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
257   return domexception_ctor;
258 }
259 
ThrowDataCloneException(Local<Context> context,Local<String> message)260 void ThrowDataCloneException(Local<Context> context, Local<String> message) {
261   Isolate* isolate = context->GetIsolate();
262   Local<Value> argv[] = {message,
263                          FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
264   Local<Value> exception;
265   Local<Function> domexception_ctor;
266   if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
267       !domexception_ctor->NewInstance(context, arraysize(argv), argv)
268            .ToLocal(&exception)) {
269     return;
270   }
271   isolate->ThrowException(exception);
272 }
273 
274 // This tells V8 how to serialize objects that it does not understand
275 // (e.g. C++ objects) into the output buffer, in a way that our own
276 // DeserializerDelegate understands how to unpack.
277 class SerializerDelegate : public ValueSerializer::Delegate {
278  public:
SerializerDelegate(Environment * env,Local<Context> context,Message * m)279   SerializerDelegate(Environment* env, Local<Context> context, Message* m)
280       : env_(env), context_(context), msg_(m) {}
281 
ThrowDataCloneError(Local<String> message)282   void ThrowDataCloneError(Local<String> message) override {
283     ThrowDataCloneException(context_, message);
284   }
285 
WriteHostObject(Isolate * isolate,Local<Object> object)286   Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
287     if (env_->base_object_ctor_template()->HasInstance(object)) {
288       return WriteHostObject(
289           BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
290     }
291 
292     ThrowDataCloneError(env_->clone_unsupported_type_str());
293     return Nothing<bool>();
294   }
295 
GetSharedArrayBufferId(Isolate * isolate,Local<SharedArrayBuffer> shared_array_buffer)296   Maybe<uint32_t> GetSharedArrayBufferId(
297       Isolate* isolate,
298       Local<SharedArrayBuffer> shared_array_buffer) override {
299     uint32_t i;
300     for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
301       if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
302           shared_array_buffer) {
303         return Just(i);
304       }
305     }
306 
307     auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
308         env_,
309         context_,
310         shared_array_buffer);
311     if (!reference) {
312       return Nothing<uint32_t>();
313     }
314     seen_shared_array_buffers_.emplace_back(
315       Global<SharedArrayBuffer> { isolate, shared_array_buffer });
316     msg_->AddSharedArrayBuffer(reference);
317     return Just(i);
318   }
319 
GetWasmModuleTransferId(Isolate * isolate,Local<WasmModuleObject> module)320   Maybe<uint32_t> GetWasmModuleTransferId(
321       Isolate* isolate, Local<WasmModuleObject> module) override {
322     return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
323   }
324 
Finish(Local<Context> context)325   Maybe<bool> Finish(Local<Context> context) {
326     for (uint32_t i = 0; i < host_objects_.size(); i++) {
327       BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
328       std::unique_ptr<TransferData> data;
329       if (i < first_cloned_object_index_)
330         data = host_object->TransferForMessaging();
331       if (!data)
332         data = host_object->CloneForMessaging();
333       if (!data) return Nothing<bool>();
334       if (data->FinalizeTransferWrite(context, serializer).IsNothing())
335         return Nothing<bool>();
336       msg_->AddTransferable(std::move(data));
337     }
338     return Just(true);
339   }
340 
AddHostObject(BaseObjectPtr<BaseObject> host_object)341   inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
342     // Make sure we have not started serializing the value itself yet.
343     CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
344     host_objects_.emplace_back(std::move(host_object));
345   }
346 
347   // Some objects in the transfer list may register sub-objects that can be
348   // transferred. This could e.g. be a public JS wrapper object, such as a
349   // FileHandle, that is registering its C++ handle for transfer.
AddNestedHostObjects()350   inline Maybe<bool> AddNestedHostObjects() {
351     for (size_t i = 0; i < host_objects_.size(); i++) {
352       std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
353       if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
354         return Nothing<bool>();
355       for (auto nested_transferable : nested_transferables) {
356         if (std::find(host_objects_.begin(),
357                       host_objects_.end(),
358                       nested_transferable) == host_objects_.end()) {
359           AddHostObject(nested_transferable);
360         }
361       }
362     }
363     return Just(true);
364   }
365 
366   ValueSerializer* serializer = nullptr;
367 
368  private:
WriteHostObject(BaseObjectPtr<BaseObject> host_object)369   Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
370     BaseObject::TransferMode mode = host_object->GetTransferMode();
371     if (mode == BaseObject::TransferMode::kUntransferable) {
372       ThrowDataCloneError(env_->clone_unsupported_type_str());
373       return Nothing<bool>();
374     }
375 
376     for (uint32_t i = 0; i < host_objects_.size(); i++) {
377       if (host_objects_[i] == host_object) {
378         serializer->WriteUint32(i);
379         return Just(true);
380       }
381     }
382 
383     if (mode == BaseObject::TransferMode::kTransferable) {
384       // TODO(addaleax): This message code is too specific. Fix that in a
385       // semver-major follow-up.
386       THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
387       return Nothing<bool>();
388     }
389 
390     CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
391     uint32_t index = host_objects_.size();
392     if (first_cloned_object_index_ == SIZE_MAX)
393       first_cloned_object_index_ = index;
394     serializer->WriteUint32(index);
395     host_objects_.push_back(host_object);
396     return Just(true);
397   }
398 
399   Environment* env_;
400   Local<Context> context_;
401   Message* msg_;
402   std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
403   std::vector<BaseObjectPtr<BaseObject>> host_objects_;
404   size_t first_cloned_object_index_ = SIZE_MAX;
405 
406   friend class worker::Message;
407 };
408 
409 }  // anonymous namespace
410 
Serialize(Environment * env,Local<Context> context,Local<Value> input,const TransferList & transfer_list_v,Local<Object> source_port)411 Maybe<bool> Message::Serialize(Environment* env,
412                                Local<Context> context,
413                                Local<Value> input,
414                                const TransferList& transfer_list_v,
415                                Local<Object> source_port) {
416   HandleScope handle_scope(env->isolate());
417   Context::Scope context_scope(context);
418 
419   // Verify that we're not silently overwriting an existing message.
420   CHECK(main_message_buf_.is_empty());
421 
422   SerializerDelegate delegate(env, context, this);
423   ValueSerializer serializer(env->isolate(), &delegate);
424   delegate.serializer = &serializer;
425 
426   std::vector<Local<ArrayBuffer>> array_buffers;
427   for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
428     Local<Value> entry = transfer_list_v[i];
429     if (entry->IsObject()) {
430       // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
431       // for details.
432       bool untransferable;
433       if (!entry.As<Object>()->HasPrivate(
434               context,
435               env->untransferable_object_private_symbol())
436               .To(&untransferable)) {
437         return Nothing<bool>();
438       }
439       if (untransferable) continue;
440     }
441 
442     // Currently, we support ArrayBuffers and BaseObjects for which
443     // GetTransferMode() does not return kUntransferable.
444     if (entry->IsArrayBuffer()) {
445       Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
446       // If we cannot render the ArrayBuffer unusable in this Isolate and
447       // take ownership of its memory, copying the buffer will have to do.
448       if (!ab->IsDetachable() || ab->IsExternal() ||
449           !env->isolate_data()->uses_node_allocator()) {
450         continue;
451       }
452 
453       if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
454           array_buffers.end()) {
455         ThrowDataCloneException(
456             context,
457             FIXED_ONE_BYTE_STRING(
458                 env->isolate(),
459                 "Transfer list contains duplicate ArrayBuffer"));
460         return Nothing<bool>();
461       }
462       // We simply use the array index in the `array_buffers` list as the
463       // ID that we write into the serialized buffer.
464       uint32_t id = array_buffers.size();
465       array_buffers.push_back(ab);
466       serializer.TransferArrayBuffer(id, ab);
467       continue;
468     } else if (env->base_object_ctor_template()->HasInstance(entry)) {
469       // Check if the source MessagePort is being transferred.
470       if (!source_port.IsEmpty() && entry == source_port) {
471         ThrowDataCloneException(
472             context,
473             FIXED_ONE_BYTE_STRING(env->isolate(),
474                                   "Transfer list contains source port"));
475         return Nothing<bool>();
476       }
477       BaseObjectPtr<BaseObject> host_object {
478           Unwrap<BaseObject>(entry.As<Object>()) };
479       if (env->message_port_constructor_template()->HasInstance(entry) &&
480           (!host_object ||
481            static_cast<MessagePort*>(host_object.get())->IsDetached())) {
482         ThrowDataCloneException(
483             context,
484             FIXED_ONE_BYTE_STRING(
485                 env->isolate(),
486                 "MessagePort in transfer list is already detached"));
487         return Nothing<bool>();
488       }
489       if (std::find(delegate.host_objects_.begin(),
490                     delegate.host_objects_.end(),
491                     host_object) != delegate.host_objects_.end()) {
492         ThrowDataCloneException(
493             context,
494             String::Concat(env->isolate(),
495                 FIXED_ONE_BYTE_STRING(
496                   env->isolate(),
497                   "Transfer list contains duplicate "),
498                 entry.As<Object>()->GetConstructorName()));
499         return Nothing<bool>();
500       }
501       if (host_object && host_object->GetTransferMode() !=
502               BaseObject::TransferMode::kUntransferable) {
503         delegate.AddHostObject(host_object);
504         continue;
505       }
506     }
507 
508     THROW_ERR_INVALID_TRANSFER_OBJECT(env);
509     return Nothing<bool>();
510   }
511   if (delegate.AddNestedHostObjects().IsNothing())
512     return Nothing<bool>();
513 
514   serializer.WriteHeader();
515   if (serializer.WriteValue(context, input).IsNothing()) {
516     return Nothing<bool>();
517   }
518 
519   for (Local<ArrayBuffer> ab : array_buffers) {
520     // If serialization succeeded, we want to take ownership of
521     // (a.k.a. externalize) the underlying memory region and render
522     // it inaccessible in this Isolate.
523     ArrayBuffer::Contents contents = ab->Externalize();
524     ab->Detach();
525 
526     CHECK(env->isolate_data()->uses_node_allocator());
527     env->isolate_data()->node_allocator()->UnregisterPointer(
528         contents.Data(), contents.ByteLength());
529 
530     array_buffer_contents_.emplace_back(MallocedBuffer<char>{
531         static_cast<char*>(contents.Data()), contents.ByteLength()});
532   }
533 
534   if (delegate.Finish(context).IsNothing())
535     return Nothing<bool>();
536 
537   // The serializer gave us a buffer allocated using `malloc()`.
538   std::pair<uint8_t*, size_t> data = serializer.Release();
539   CHECK_NOT_NULL(data.first);
540   main_message_buf_ =
541       MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
542   return Just(true);
543 }
544 
MemoryInfo(MemoryTracker * tracker) const545 void Message::MemoryInfo(MemoryTracker* tracker) const {
546   tracker->TrackField("array_buffer_contents", array_buffer_contents_);
547   tracker->TrackFieldWithSize("shared_array_buffers",
548       shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
549   tracker->TrackField("transferables", transferables_);
550 }
551 
MessagePortData(MessagePort * owner)552 MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
553 
~MessagePortData()554 MessagePortData::~MessagePortData() {
555   CHECK_NULL(owner_);
556   Disentangle();
557 }
558 
MemoryInfo(MemoryTracker * tracker) const559 void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
560   Mutex::ScopedLock lock(mutex_);
561   tracker->TrackField("incoming_messages", incoming_messages_);
562 }
563 
AddToIncomingQueue(Message && message)564 void MessagePortData::AddToIncomingQueue(Message&& message) {
565   // This function will be called by other threads.
566   Mutex::ScopedLock lock(mutex_);
567   incoming_messages_.emplace_back(std::move(message));
568 
569   if (owner_ != nullptr) {
570     Debug(owner_, "Adding message to incoming queue");
571     owner_->TriggerAsync();
572   }
573 }
574 
Entangle(MessagePortData * a,MessagePortData * b)575 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
576   CHECK_NULL(a->sibling_);
577   CHECK_NULL(b->sibling_);
578   a->sibling_ = b;
579   b->sibling_ = a;
580   a->sibling_mutex_ = b->sibling_mutex_;
581 }
582 
Disentangle()583 void MessagePortData::Disentangle() {
584   // Grab a copy of the sibling mutex, then replace it so that each sibling
585   // has its own sibling_mutex_ now.
586   std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
587   Mutex::ScopedLock sibling_lock(*sibling_mutex);
588   sibling_mutex_ = std::make_shared<Mutex>();
589 
590   MessagePortData* sibling = sibling_;
591   if (sibling_ != nullptr) {
592     sibling_->sibling_ = nullptr;
593     sibling_ = nullptr;
594   }
595 
596   // We close MessagePorts after disentanglement, so we enqueue a corresponding
597   // message and trigger the corresponding uv_async_t to let them know that
598   // this happened.
599   AddToIncomingQueue(Message());
600   if (sibling != nullptr) {
601     sibling->AddToIncomingQueue(Message());
602   }
603 }
604 
~MessagePort()605 MessagePort::~MessagePort() {
606   if (data_) Detach();
607 }
608 
MessagePort(Environment * env,Local<Context> context,Local<Object> wrap)609 MessagePort::MessagePort(Environment* env,
610                          Local<Context> context,
611                          Local<Object> wrap)
612   : HandleWrap(env,
613                wrap,
614                reinterpret_cast<uv_handle_t*>(&async_),
615                AsyncWrap::PROVIDER_MESSAGEPORT),
616     data_(new MessagePortData(this)) {
617   auto onmessage = [](uv_async_t* handle) {
618     // Called when data has been put into the queue.
619     MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
620     channel->OnMessage();
621   };
622 
623   CHECK_EQ(uv_async_init(env->event_loop(),
624                          &async_,
625                          onmessage), 0);
626   // Reset later to indicate success of the constructor.
627   bool succeeded = false;
628   auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
629 
630   Local<Value> fn;
631   if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
632     return;
633 
634   if (fn->IsFunction()) {
635     Local<Function> init = fn.As<Function>();
636     if (init->Call(context, wrap, 0, nullptr).IsEmpty())
637       return;
638   }
639 
640   Local<Function> emit_message_fn;
641   if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
642     return;
643   emit_message_fn_.Reset(env->isolate(), emit_message_fn);
644 
645   succeeded = true;
646   Debug(this, "Created message port");
647 }
648 
IsDetached() const649 bool MessagePort::IsDetached() const {
650   return data_ == nullptr || IsHandleClosing();
651 }
652 
TriggerAsync()653 void MessagePort::TriggerAsync() {
654   if (IsHandleClosing()) return;
655   CHECK_EQ(uv_async_send(&async_), 0);
656 }
657 
Close(v8::Local<v8::Value> close_callback)658 void MessagePort::Close(v8::Local<v8::Value> close_callback) {
659   Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
660 
661   if (data_) {
662     // Wrap this call with accessing the mutex, so that TriggerAsync()
663     // can check IsHandleClosing() without race conditions.
664     Mutex::ScopedLock sibling_lock(data_->mutex_);
665     HandleWrap::Close(close_callback);
666   } else {
667     HandleWrap::Close(close_callback);
668   }
669 }
670 
New(const FunctionCallbackInfo<Value> & args)671 void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
672   // This constructor just throws an error. Unfortunately, we can’t use V8’s
673   // ConstructorBehavior::kThrow, as that also removes the prototype from the
674   // class (i.e. makes it behave like an arrow function).
675   Environment* env = Environment::GetCurrent(args);
676   THROW_ERR_CONSTRUCT_CALL_INVALID(env);
677 }
678 
New(Environment * env,Local<Context> context,std::unique_ptr<MessagePortData> data)679 MessagePort* MessagePort::New(
680     Environment* env,
681     Local<Context> context,
682     std::unique_ptr<MessagePortData> data) {
683   Context::Scope context_scope(context);
684   Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
685 
686   // Construct a new instance, then assign the listener instance and possibly
687   // the MessagePortData to it.
688   Local<Object> instance;
689   if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
690     return nullptr;
691   MessagePort* port = new MessagePort(env, context, instance);
692   CHECK_NOT_NULL(port);
693   if (port->IsHandleClosing()) {
694     // Construction failed with an exception.
695     return nullptr;
696   }
697 
698   if (data) {
699     port->Detach();
700     port->data_ = std::move(data);
701 
702     // This lock is here to avoid race conditions with the `owner_` read
703     // in AddToIncomingQueue(). (This would likely be unproblematic without it,
704     // but it's better to be safe than sorry.)
705     Mutex::ScopedLock lock(port->data_->mutex_);
706     port->data_->owner_ = port;
707     // If the existing MessagePortData object had pending messages, this is
708     // the easiest way to run that queue.
709     port->TriggerAsync();
710   }
711   return port;
712 }
713 
ReceiveMessage(Local<Context> context,bool only_if_receiving)714 MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
715                                               bool only_if_receiving) {
716   Message received;
717   {
718     // Get the head of the message queue.
719     Mutex::ScopedLock lock(data_->mutex_);
720 
721     Debug(this, "MessagePort has message");
722 
723     bool wants_message = receiving_messages_ || !only_if_receiving;
724     // We have nothing to do if:
725     // - There are no pending messages
726     // - We are not intending to receive messages, and the message we would
727     //   receive is not the final "close" message.
728     if (data_->incoming_messages_.empty() ||
729         (!wants_message &&
730          !data_->incoming_messages_.front().IsCloseMessage())) {
731       return env()->no_message_symbol();
732     }
733 
734     received = std::move(data_->incoming_messages_.front());
735     data_->incoming_messages_.pop_front();
736   }
737 
738   if (received.IsCloseMessage()) {
739     Close();
740     return env()->no_message_symbol();
741   }
742 
743   if (!env()->can_call_into_js()) return MaybeLocal<Value>();
744 
745   return received.Deserialize(env(), context);
746 }
747 
OnMessage()748 void MessagePort::OnMessage() {
749   Debug(this, "Running MessagePort::OnMessage()");
750   HandleScope handle_scope(env()->isolate());
751   Local<Context> context = object(env()->isolate())->CreationContext();
752 
753   size_t processing_limit;
754   {
755     Mutex::ScopedLock(data_->mutex_);
756     processing_limit = std::max(data_->incoming_messages_.size(),
757                                 static_cast<size_t>(1000));
758   }
759 
760   // data_ can only ever be modified by the owner thread, so no need to lock.
761   // However, the message port may be transferred while it is processing
762   // messages, so we need to check that this handle still owns its `data_` field
763   // on every iteration.
764   while (data_) {
765     if (processing_limit-- == 0) {
766       // Prevent event loop starvation by only processing those messages without
767       // interruption that were already present when the OnMessage() call was
768       // first triggered, but at least 1000 messages because otherwise the
769       // overhead of repeatedly triggering the uv_async_t instance becomes
770       // noticable, at least on Windows.
771       // (That might require more investigation by somebody more familiar with
772       // Windows.)
773       TriggerAsync();
774       return;
775     }
776 
777     HandleScope handle_scope(env()->isolate());
778     Context::Scope context_scope(context);
779     Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
780 
781     Local<Value> payload;
782     Local<Value> message_error;
783     {
784       // Catch any exceptions from parsing the message itself (not from
785       // emitting it) as 'messageeror' events.
786       TryCatchScope try_catch(env());
787       if (!ReceiveMessage(context, true).ToLocal(&payload)) {
788         if (try_catch.HasCaught() && !try_catch.HasTerminated())
789           message_error = try_catch.Exception();
790         goto reschedule;
791       }
792     }
793     if (payload == env()->no_message_symbol()) break;
794 
795     if (!env()->can_call_into_js()) {
796       Debug(this, "MessagePort drains queue because !can_call_into_js()");
797       // In this case there is nothing to do but to drain the current queue.
798       continue;
799     }
800 
801     if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
802     reschedule:
803       if (!message_error.IsEmpty()) {
804         // This should become a `messageerror` event in the sense of the
805         // EventTarget API at some point.
806         Local<Value> argv[] = {
807           env()->messageerror_string(),
808           message_error
809         };
810         USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
811       }
812 
813       // Re-schedule OnMessage() execution in case of failure.
814       if (data_)
815         TriggerAsync();
816       return;
817     }
818   }
819 }
820 
OnClose()821 void MessagePort::OnClose() {
822   Debug(this, "MessagePort::OnClose()");
823   if (data_) {
824     // Detach() returns move(data_).
825     Detach()->Disentangle();
826   }
827 }
828 
Detach()829 std::unique_ptr<MessagePortData> MessagePort::Detach() {
830   CHECK(data_);
831   Mutex::ScopedLock lock(data_->mutex_);
832   data_->owner_ = nullptr;
833   return std::move(data_);
834 }
835 
GetTransferMode() const836 BaseObject::TransferMode MessagePort::GetTransferMode() const {
837   if (IsDetached())
838     return BaseObject::TransferMode::kUntransferable;
839   return BaseObject::TransferMode::kTransferable;
840 }
841 
TransferForMessaging()842 std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
843   Close();
844   return Detach();
845 }
846 
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)847 BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
848     Environment* env,
849     Local<Context> context,
850     std::unique_ptr<TransferData> self) {
851   return BaseObjectPtr<MessagePort> { MessagePort::New(
852       env, context,
853       static_unique_pointer_cast<MessagePortData>(std::move(self))) };
854 }
855 
PostMessage(Environment * env,Local<Value> message_v,const TransferList & transfer_v)856 Maybe<bool> MessagePort::PostMessage(Environment* env,
857                                      Local<Value> message_v,
858                                      const TransferList& transfer_v) {
859   Isolate* isolate = env->isolate();
860   Local<Object> obj = object(isolate);
861   Local<Context> context = obj->CreationContext();
862 
863   Message msg;
864 
865   // Per spec, we need to both check if transfer list has the source port, and
866   // serialize the input message, even if the MessagePort is closed or detached.
867 
868   Maybe<bool> serialization_maybe =
869       msg.Serialize(env, context, message_v, transfer_v, obj);
870   if (data_ == nullptr) {
871     return serialization_maybe;
872   }
873   if (serialization_maybe.IsNothing()) {
874     return Nothing<bool>();
875   }
876 
877   Mutex::ScopedLock lock(*data_->sibling_mutex_);
878   bool doomed = false;
879 
880   // Check if the target port is posted to itself.
881   if (data_->sibling_ != nullptr) {
882     for (const auto& transferable : msg.transferables()) {
883       if (data_->sibling_ == transferable.get()) {
884         doomed = true;
885         ProcessEmitWarning(env, "The target port was posted to itself, and "
886                                 "the communication channel was lost");
887         break;
888       }
889     }
890   }
891 
892   if (data_->sibling_ == nullptr || doomed)
893     return Just(true);
894 
895   data_->sibling_->AddToIncomingQueue(std::move(msg));
896   return Just(true);
897 }
898 
ReadIterable(Environment * env,Local<Context> context,TransferList & transfer_list,Local<Value> object)899 static Maybe<bool> ReadIterable(Environment* env,
900                                 Local<Context> context,
901                                 // NOLINTNEXTLINE(runtime/references)
902                                 TransferList& transfer_list,
903                                 Local<Value> object) {
904   if (!object->IsObject()) return Just(false);
905 
906   if (object->IsArray()) {
907     Local<Array> arr = object.As<Array>();
908     size_t length = arr->Length();
909     transfer_list.AllocateSufficientStorage(length);
910     for (size_t i = 0; i < length; i++) {
911       if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
912         return Nothing<bool>();
913     }
914     return Just(true);
915   }
916 
917   Isolate* isolate = env->isolate();
918   Local<Value> iterator_method;
919   if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
920       .ToLocal(&iterator_method)) return Nothing<bool>();
921   if (!iterator_method->IsFunction()) return Just(false);
922 
923   Local<Value> iterator;
924   if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
925       .ToLocal(&iterator)) return Nothing<bool>();
926   if (!iterator->IsObject()) return Just(false);
927 
928   Local<Value> next;
929   if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
930     return Nothing<bool>();
931   if (!next->IsFunction()) return Just(false);
932 
933   std::vector<Local<Value>> entries;
934   while (env->can_call_into_js()) {
935     Local<Value> result;
936     if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
937         .ToLocal(&result)) return Nothing<bool>();
938     if (!result->IsObject()) return Just(false);
939 
940     Local<Value> done;
941     if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
942       return Nothing<bool>();
943     if (done->BooleanValue(isolate)) break;
944 
945     Local<Value> val;
946     if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
947       return Nothing<bool>();
948     entries.push_back(val);
949   }
950 
951   transfer_list.AllocateSufficientStorage(entries.size());
952   std::copy(entries.begin(), entries.end(), &transfer_list[0]);
953   return Just(true);
954 }
955 
PostMessage(const FunctionCallbackInfo<Value> & args)956 void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
957   Environment* env = Environment::GetCurrent(args);
958   Local<Object> obj = args.This();
959   Local<Context> context = obj->CreationContext();
960 
961   if (args.Length() == 0) {
962     return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
963                                        "MessagePort.postMessage");
964   }
965 
966   if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
967     // Browsers ignore null or undefined, and otherwise accept an array or an
968     // options object.
969     return THROW_ERR_INVALID_ARG_TYPE(env,
970         "Optional transferList argument must be an iterable");
971   }
972 
973   TransferList transfer_list;
974   if (args[1]->IsObject()) {
975     bool was_iterable;
976     if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
977       return;
978     if (!was_iterable) {
979       Local<Value> transfer_option;
980       if (!args[1].As<Object>()->Get(context, env->transfer_string())
981           .ToLocal(&transfer_option)) return;
982       if (!transfer_option->IsUndefined()) {
983         if (!ReadIterable(env, context, transfer_list, transfer_option)
984             .To(&was_iterable)) return;
985         if (!was_iterable) {
986           return THROW_ERR_INVALID_ARG_TYPE(env,
987               "Optional options.transfer argument must be an iterable");
988         }
989       }
990     }
991   }
992 
993   MessagePort* port = Unwrap<MessagePort>(args.This());
994   // Even if the backing MessagePort object has already been deleted, we still
995   // want to serialize the message to ensure spec-compliant behavior w.r.t.
996   // transfers.
997   if (port == nullptr) {
998     Message msg;
999     USE(msg.Serialize(env, context, args[0], transfer_list, obj));
1000     return;
1001   }
1002 
1003   port->PostMessage(env, args[0], transfer_list);
1004 }
1005 
Start()1006 void MessagePort::Start() {
1007   Debug(this, "Start receiving messages");
1008   receiving_messages_ = true;
1009   Mutex::ScopedLock lock(data_->mutex_);
1010   if (!data_->incoming_messages_.empty())
1011     TriggerAsync();
1012 }
1013 
Stop()1014 void MessagePort::Stop() {
1015   Debug(this, "Stop receiving messages");
1016   receiving_messages_ = false;
1017 }
1018 
Start(const FunctionCallbackInfo<Value> & args)1019 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1020   MessagePort* port;
1021   ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1022   if (!port->data_) {
1023     return;
1024   }
1025   port->Start();
1026 }
1027 
Stop(const FunctionCallbackInfo<Value> & args)1028 void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1029   MessagePort* port;
1030   CHECK(args[0]->IsObject());
1031   ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1032   if (!port->data_) {
1033     return;
1034   }
1035   port->Stop();
1036 }
1037 
Drain(const FunctionCallbackInfo<Value> & args)1038 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1039   MessagePort* port;
1040   ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1041   port->OnMessage();
1042 }
1043 
ReceiveMessage(const FunctionCallbackInfo<Value> & args)1044 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1045   Environment* env = Environment::GetCurrent(args);
1046   if (!args[0]->IsObject() ||
1047       !env->message_port_constructor_template()->HasInstance(args[0])) {
1048     return THROW_ERR_INVALID_ARG_TYPE(env,
1049         "First argument needs to be a MessagePort instance");
1050   }
1051   MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1052   if (port == nullptr) {
1053     // Return 'no messages' for a closed port.
1054     args.GetReturnValue().Set(
1055         Environment::GetCurrent(args)->no_message_symbol());
1056     return;
1057   }
1058 
1059   MaybeLocal<Value> payload =
1060       port->ReceiveMessage(port->object()->CreationContext(), false);
1061   if (!payload.IsEmpty())
1062     args.GetReturnValue().Set(payload.ToLocalChecked());
1063 }
1064 
MoveToContext(const FunctionCallbackInfo<Value> & args)1065 void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1066   Environment* env = Environment::GetCurrent(args);
1067   if (!args[0]->IsObject() ||
1068       !env->message_port_constructor_template()->HasInstance(args[0])) {
1069     return THROW_ERR_INVALID_ARG_TYPE(env,
1070         "First argument needs to be a MessagePort instance");
1071   }
1072   MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1073   CHECK_NOT_NULL(port);
1074 
1075   Local<Value> context_arg = args[1];
1076   ContextifyContext* context_wrapper;
1077   if (!context_arg->IsObject() ||
1078       (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1079           env, context_arg.As<Object>())) == nullptr) {
1080     return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1081   }
1082 
1083   std::unique_ptr<MessagePortData> data;
1084   if (!port->IsDetached())
1085     data = port->Detach();
1086 
1087   Context::Scope context_scope(context_wrapper->context());
1088   MessagePort* target =
1089       MessagePort::New(env, context_wrapper->context(), std::move(data));
1090   if (target != nullptr)
1091     args.GetReturnValue().Set(target->object());
1092 }
1093 
Entangle(MessagePort * a,MessagePort * b)1094 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1095   Entangle(a, b->data_.get());
1096 }
1097 
Entangle(MessagePort * a,MessagePortData * b)1098 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1099   MessagePortData::Entangle(a->data_.get(), b);
1100 }
1101 
MemoryInfo(MemoryTracker * tracker) const1102 void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1103   tracker->TrackField("data", data_);
1104   tracker->TrackField("emit_message_fn", emit_message_fn_);
1105 }
1106 
GetMessagePortConstructorTemplate(Environment * env)1107 Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1108   // Factor generating the MessagePort JS constructor into its own piece
1109   // of code, because it is needed early on in the child environment setup.
1110   Local<FunctionTemplate> templ = env->message_port_constructor_template();
1111   if (!templ.IsEmpty())
1112     return templ;
1113 
1114   {
1115     Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
1116     m->SetClassName(env->message_port_constructor_string());
1117     m->InstanceTemplate()->SetInternalFieldCount(
1118         MessagePort::kInternalFieldCount);
1119     m->Inherit(HandleWrap::GetConstructorTemplate(env));
1120 
1121     env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
1122     env->SetProtoMethod(m, "start", MessagePort::Start);
1123 
1124     env->set_message_port_constructor_template(m);
1125   }
1126 
1127   return GetMessagePortConstructorTemplate(env);
1128 }
1129 
JSTransferable(Environment * env,Local<Object> obj)1130 JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1131     : BaseObject(env, obj) {
1132   MakeWeak();
1133 }
1134 
New(const FunctionCallbackInfo<Value> & args)1135 void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1136   CHECK(args.IsConstructCall());
1137   new JSTransferable(Environment::GetCurrent(args), args.This());
1138 }
1139 
GetTransferMode() const1140 JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1141   // Implement `kClone in this ? kCloneable : kTransferable`.
1142   HandleScope handle_scope(env()->isolate());
1143   errors::TryCatchScope ignore_exceptions(env());
1144 
1145   bool has_clone;
1146   if (!object()->Has(env()->context(),
1147                      env()->messaging_clone_symbol()).To(&has_clone)) {
1148     return TransferMode::kUntransferable;
1149   }
1150 
1151   return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1152 }
1153 
TransferForMessaging()1154 std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1155   return TransferOrClone(TransferMode::kTransferable);
1156 }
1157 
CloneForMessaging() const1158 std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1159   return TransferOrClone(TransferMode::kCloneable);
1160 }
1161 
TransferOrClone(TransferMode mode) const1162 std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1163     TransferMode mode) const {
1164   // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1165   // which should return an object with `data` and `deserializeInfo` properties;
1166   // `data` is written to the serializer later, and `deserializeInfo` is stored
1167   // on the `TransferData` instance as a string.
1168   HandleScope handle_scope(env()->isolate());
1169   Local<Context> context = env()->isolate()->GetCurrentContext();
1170   Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1171       env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1172 
1173   Local<Value> method;
1174   if (!object()->Get(context, method_name).ToLocal(&method)) {
1175     return {};
1176   }
1177   if (method->IsFunction()) {
1178     Local<Value> result_v;
1179     if (!method.As<Function>()->Call(
1180             context, object(), 0, nullptr).ToLocal(&result_v)) {
1181       return {};
1182     }
1183 
1184     if (result_v->IsObject()) {
1185       Local<Object> result = result_v.As<Object>();
1186       Local<Value> data;
1187       Local<Value> deserialize_info;
1188       if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1189           !result->Get(context, env()->deserialize_info_string())
1190               .ToLocal(&deserialize_info)) {
1191         return {};
1192       }
1193       Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1194       if (*deserialize_info_str == nullptr) return {};
1195       return std::make_unique<Data>(
1196           *deserialize_info_str, Global<Value>(env()->isolate(), data));
1197     }
1198   }
1199 
1200   if (mode == TransferMode::kTransferable)
1201     return TransferOrClone(TransferMode::kCloneable);
1202   else
1203     return {};
1204 }
1205 
1206 Maybe<BaseObjectList>
NestedTransferables() const1207 JSTransferable::NestedTransferables() const {
1208   // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1209   HandleScope handle_scope(env()->isolate());
1210   Local<Context> context = env()->isolate()->GetCurrentContext();
1211   Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1212 
1213   Local<Value> method;
1214   if (!object()->Get(context, method_name).ToLocal(&method)) {
1215     return Nothing<BaseObjectList>();
1216   }
1217   if (!method->IsFunction()) return Just(BaseObjectList {});
1218 
1219   Local<Value> list_v;
1220   if (!method.As<Function>()->Call(
1221           context, object(), 0, nullptr).ToLocal(&list_v)) {
1222     return Nothing<BaseObjectList>();
1223   }
1224   if (!list_v->IsArray()) return Just(BaseObjectList {});
1225   Local<Array> list = list_v.As<Array>();
1226 
1227   BaseObjectList ret;
1228   for (size_t i = 0; i < list->Length(); i++) {
1229     Local<Value> value;
1230     if (!list->Get(context, i).ToLocal(&value))
1231       return Nothing<BaseObjectList>();
1232     if (env()->base_object_ctor_template()->HasInstance(value))
1233       ret.emplace_back(Unwrap<BaseObject>(value.As<Object>()));
1234   }
1235   return Just(ret);
1236 }
1237 
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)1238 Maybe<bool> JSTransferable::FinalizeTransferRead(
1239     Local<Context> context, ValueDeserializer* deserializer) {
1240   // Call `this[kDeserialize](data)` where `data` comes from the return value
1241   // of `this[kTransfer]()` or `this[kClone]()`.
1242   HandleScope handle_scope(env()->isolate());
1243   Local<Value> data;
1244   if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1245 
1246   Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1247   Local<Value> method;
1248   if (!object()->Get(context, method_name).ToLocal(&method)) {
1249     return Nothing<bool>();
1250   }
1251   if (!method->IsFunction()) return Just(true);
1252 
1253   if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1254     return Nothing<bool>();
1255   }
1256   return Just(true);
1257 }
1258 
Data(std::string && deserialize_info,v8::Global<v8::Value> && data)1259 JSTransferable::Data::Data(std::string&& deserialize_info,
1260                            v8::Global<v8::Value>&& data)
1261     : deserialize_info_(std::move(deserialize_info)),
1262       data_(std::move(data)) {}
1263 
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)1264 BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1265     Environment* env,
1266     Local<Context> context,
1267     std::unique_ptr<TransferData> self) {
1268   // Create the JS wrapper object that will later be filled with data passed to
1269   // the `[kDeserialize]()` method on it. This split is necessary, because here
1270   // we need to create an object with the right prototype and internal fields,
1271   // but the actual JS data stored in the serialized data can only be read at
1272   // the end of the stream, after the main message has been read.
1273 
1274   if (context != env->context()) {
1275     THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1276     return {};
1277   }
1278   HandleScope handle_scope(env->isolate());
1279   Local<Value> info;
1280   if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1281 
1282   Local<Value> ret;
1283   CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1284   if (!env->messaging_deserialize_create_object()->Call(
1285           context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1286       !env->base_object_ctor_template()->HasInstance(ret)) {
1287     return {};
1288   }
1289 
1290   return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret.As<Object>()) };
1291 }
1292 
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)1293 Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1294     Local<Context> context, ValueSerializer* serializer) {
1295   HandleScope handle_scope(context->GetIsolate());
1296   auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1297   data_.Reset();
1298   return ret;
1299 }
1300 
1301 namespace {
1302 
SetDeserializerCreateObjectFunction(const FunctionCallbackInfo<Value> & args)1303 static void SetDeserializerCreateObjectFunction(
1304     const FunctionCallbackInfo<Value>& args) {
1305   Environment* env = Environment::GetCurrent(args);
1306   CHECK(args[0]->IsFunction());
1307   env->set_messaging_deserialize_create_object(args[0].As<Function>());
1308 }
1309 
MessageChannel(const FunctionCallbackInfo<Value> & args)1310 static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1311   Environment* env = Environment::GetCurrent(args);
1312   if (!args.IsConstructCall()) {
1313     THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1314     return;
1315   }
1316 
1317   Local<Context> context = args.This()->CreationContext();
1318   Context::Scope context_scope(context);
1319 
1320   MessagePort* port1 = MessagePort::New(env, context);
1321   if (port1 == nullptr) return;
1322   MessagePort* port2 = MessagePort::New(env, context);
1323   if (port2 == nullptr) {
1324     port1->Close();
1325     return;
1326   }
1327 
1328   MessagePort::Entangle(port1, port2);
1329 
1330   args.This()->Set(context, env->port1_string(), port1->object())
1331       .Check();
1332   args.This()->Set(context, env->port2_string(), port2->object())
1333       .Check();
1334 }
1335 
InitMessaging(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)1336 static void InitMessaging(Local<Object> target,
1337                           Local<Value> unused,
1338                           Local<Context> context,
1339                           void* priv) {
1340   Environment* env = Environment::GetCurrent(context);
1341 
1342   {
1343     Local<String> message_channel_string =
1344         FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
1345     Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
1346     templ->SetClassName(message_channel_string);
1347     target->Set(context,
1348                 message_channel_string,
1349                 templ->GetFunction(context).ToLocalChecked()).Check();
1350   }
1351 
1352   {
1353     Local<String> js_transferable_string =
1354         FIXED_ONE_BYTE_STRING(env->isolate(), "JSTransferable");
1355     Local<FunctionTemplate> t = env->NewFunctionTemplate(JSTransferable::New);
1356     t->Inherit(BaseObject::GetConstructorTemplate(env));
1357     t->SetClassName(js_transferable_string);
1358     t->InstanceTemplate()->SetInternalFieldCount(
1359         JSTransferable::kInternalFieldCount);
1360     target->Set(context,
1361                 js_transferable_string,
1362                 t->GetFunction(context).ToLocalChecked()).Check();
1363   }
1364 
1365   target->Set(context,
1366               env->message_port_constructor_string(),
1367               GetMessagePortConstructorTemplate(env)
1368                   ->GetFunction(context).ToLocalChecked()).Check();
1369 
1370   // These are not methods on the MessagePort prototype, because
1371   // the browser equivalents do not provide them.
1372   env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
1373   env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
1374   env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1375   env->SetMethod(target, "moveMessagePortToContext",
1376                  MessagePort::MoveToContext);
1377   env->SetMethod(target, "setDeserializerCreateObjectFunction",
1378                  SetDeserializerCreateObjectFunction);
1379 
1380   {
1381     Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1382     target
1383         ->Set(context,
1384               FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1385               domexception)
1386         .Check();
1387   }
1388 }
1389 
1390 }  // anonymous namespace
1391 
1392 }  // namespace worker
1393 }  // namespace node
1394 
1395 NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1396