• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "node_messaging.h"
2 
3 #include "async_wrap-inl.h"
4 #include "debug_utils-inl.h"
5 #include "memory_tracker-inl.h"
6 #include "node_buffer.h"
7 #include "node_contextify.h"
8 #include "node_errors.h"
9 #include "node_external_reference.h"
10 #include "node_process-inl.h"
11 #include "util-inl.h"
12 
13 using node::contextify::ContextifyContext;
14 using node::errors::TryCatchScope;
15 using v8::Array;
16 using v8::ArrayBuffer;
17 using v8::BackingStore;
18 using v8::CompiledWasmModule;
19 using v8::Context;
20 using v8::EscapableHandleScope;
21 using v8::Function;
22 using v8::FunctionCallbackInfo;
23 using v8::FunctionTemplate;
24 using v8::Global;
25 using v8::HandleScope;
26 using v8::Isolate;
27 using v8::Just;
28 using v8::Local;
29 using v8::Maybe;
30 using v8::MaybeLocal;
31 using v8::Nothing;
32 using v8::Object;
33 using v8::SharedArrayBuffer;
34 using v8::String;
35 using v8::Symbol;
36 using v8::Value;
37 using v8::ValueDeserializer;
38 using v8::ValueSerializer;
39 using v8::WasmModuleObject;
40 
41 namespace node {
42 
43 using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44 
45 // Hack to have WriteHostObject inform ReadHostObject that the value
46 // should be treated as a regular JS object. Used to transfer process.env.
47 static const uint32_t kNormalObject = static_cast<uint32_t>(-1);
48 
GetTransferMode() const49 BaseObject::TransferMode BaseObject::GetTransferMode() const {
50   return BaseObject::TransferMode::kUntransferable;
51 }
52 
TransferForMessaging()53 std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
54   return CloneForMessaging();
55 }
56 
CloneForMessaging() const57 std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
58   return {};
59 }
60 
NestedTransferables() const61 Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
62   return Just(BaseObjectList {});
63 }
64 
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)65 Maybe<bool> BaseObject::FinalizeTransferRead(
66     Local<Context> context, ValueDeserializer* deserializer) {
67   return Just(true);
68 }
69 
70 namespace worker {
71 
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)72 Maybe<bool> TransferData::FinalizeTransferWrite(
73     Local<Context> context, ValueSerializer* serializer) {
74   return Just(true);
75 }
76 
Message(MallocedBuffer<char> && buffer)77 Message::Message(MallocedBuffer<char>&& buffer)
78     : main_message_buf_(std::move(buffer)) {}
79 
IsCloseMessage() const80 bool Message::IsCloseMessage() const {
81   return main_message_buf_.data == nullptr;
82 }
83 
84 namespace {
85 
86 // This is used to tell V8 how to read transferred host objects, like other
87 // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
88 class DeserializerDelegate : public ValueDeserializer::Delegate {
89  public:
DeserializerDelegate(Message * m,Environment * env,const std::vector<BaseObjectPtr<BaseObject>> & host_objects,const std::vector<Local<SharedArrayBuffer>> & shared_array_buffers,const std::vector<CompiledWasmModule> & wasm_modules)90   DeserializerDelegate(
91       Message* m,
92       Environment* env,
93       const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
94       const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
95       const std::vector<CompiledWasmModule>& wasm_modules)
96       : host_objects_(host_objects),
97         shared_array_buffers_(shared_array_buffers),
98         wasm_modules_(wasm_modules) {}
99 
ReadHostObject(Isolate * isolate)100   MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
101     // Identifying the index in the message's BaseObject array is sufficient.
102     uint32_t id;
103     if (!deserializer->ReadUint32(&id))
104       return MaybeLocal<Object>();
105     if (id != kNormalObject) {
106       CHECK_LT(id, host_objects_.size());
107       return host_objects_[id]->object(isolate);
108     }
109     EscapableHandleScope scope(isolate);
110     Local<Context> context = isolate->GetCurrentContext();
111     Local<Value> object;
112     if (!deserializer->ReadValue(context).ToLocal(&object))
113       return MaybeLocal<Object>();
114     CHECK(object->IsObject());
115     return scope.Escape(object.As<Object>());
116   }
117 
GetSharedArrayBufferFromId(Isolate * isolate,uint32_t clone_id)118   MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
119       Isolate* isolate, uint32_t clone_id) override {
120     CHECK_LT(clone_id, shared_array_buffers_.size());
121     return shared_array_buffers_[clone_id];
122   }
123 
GetWasmModuleFromId(Isolate * isolate,uint32_t transfer_id)124   MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
125       Isolate* isolate, uint32_t transfer_id) override {
126     CHECK_LT(transfer_id, wasm_modules_.size());
127     return WasmModuleObject::FromCompiledModule(
128         isolate, wasm_modules_[transfer_id]);
129   }
130 
131   ValueDeserializer* deserializer = nullptr;
132 
133  private:
134   const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
135   const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
136   const std::vector<CompiledWasmModule>& wasm_modules_;
137 };
138 
139 }  // anonymous namespace
140 
Deserialize(Environment * env,Local<Context> context,Local<Value> * port_list)141 MaybeLocal<Value> Message::Deserialize(Environment* env,
142                                        Local<Context> context,
143                                        Local<Value>* port_list) {
144   Context::Scope context_scope(context);
145 
146   CHECK(!IsCloseMessage());
147   if (port_list != nullptr && !transferables_.empty()) {
148     // Need to create this outside of the EscapableHandleScope, but inside
149     // the Context::Scope.
150     *port_list = Array::New(env->isolate());
151   }
152 
153   EscapableHandleScope handle_scope(env->isolate());
154 
155   // Create all necessary objects for transferables, e.g. MessagePort handles.
156   std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
157   auto cleanup = OnScopeLeave([&]() {
158     for (BaseObjectPtr<BaseObject> object : host_objects) {
159       if (!object) continue;
160 
161       // If the function did not finish successfully, host_objects will contain
162       // a list of objects that will never be passed to JS. Therefore, we
163       // destroy them here.
164       object->Detach();
165     }
166   });
167 
168   for (uint32_t i = 0; i < transferables_.size(); ++i) {
169     HandleScope handle_scope(env->isolate());
170     TransferData* data = transferables_[i].get();
171     host_objects[i] = data->Deserialize(
172         env, context, std::move(transferables_[i]));
173     if (!host_objects[i]) return {};
174     if (port_list != nullptr) {
175       // If we gather a list of all message ports, and this transferred object
176       // is a message port, add it to that list. This is a bit of an odd case
177       // of special handling for MessagePorts (as opposed to applying to all
178       // transferables), but it's required for spec compliance.
179       DCHECK((*port_list)->IsArray());
180       Local<Array> port_list_array = port_list->As<Array>();
181       Local<Object> obj = host_objects[i]->object();
182       if (env->message_port_constructor_template()->HasInstance(obj)) {
183         if (port_list_array->Set(context,
184                                  port_list_array->Length(),
185                                  obj).IsNothing()) {
186           return {};
187         }
188       }
189     }
190   }
191   transferables_.clear();
192 
193   std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
194   // Attach all transferred SharedArrayBuffers to their new Isolate.
195   for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
196     Local<SharedArrayBuffer> sab =
197         SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
198     shared_array_buffers.push_back(sab);
199   }
200 
201   DeserializerDelegate delegate(
202       this, env, host_objects, shared_array_buffers, wasm_modules_);
203   ValueDeserializer deserializer(
204       env->isolate(),
205       reinterpret_cast<const uint8_t*>(main_message_buf_.data),
206       main_message_buf_.size,
207       &delegate);
208   delegate.deserializer = &deserializer;
209 
210   // Attach all transferred ArrayBuffers to their new Isolate.
211   for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
212     Local<ArrayBuffer> ab =
213         ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
214     deserializer.TransferArrayBuffer(i, ab);
215   }
216 
217   if (deserializer.ReadHeader(context).IsNothing())
218     return {};
219   Local<Value> return_value;
220   if (!deserializer.ReadValue(context).ToLocal(&return_value))
221     return {};
222 
223   for (BaseObjectPtr<BaseObject> base_object : host_objects) {
224     if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
225       return {};
226   }
227 
228   host_objects.clear();
229   return handle_scope.Escape(return_value);
230 }
231 
AddSharedArrayBuffer(std::shared_ptr<BackingStore> backing_store)232 void Message::AddSharedArrayBuffer(
233     std::shared_ptr<BackingStore> backing_store) {
234   shared_array_buffers_.emplace_back(std::move(backing_store));
235 }
236 
AddTransferable(std::unique_ptr<TransferData> && data)237 void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
238   transferables_.emplace_back(std::move(data));
239 }
240 
AddWASMModule(CompiledWasmModule && mod)241 uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
242   wasm_modules_.emplace_back(std::move(mod));
243   return wasm_modules_.size() - 1;
244 }
245 
246 namespace {
247 
GetEmitMessageFunction(Local<Context> context)248 MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
249   Isolate* isolate = context->GetIsolate();
250   Local<Object> per_context_bindings;
251   Local<Value> emit_message_val;
252   if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
253       !per_context_bindings->Get(context,
254                                 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
255           .ToLocal(&emit_message_val)) {
256     return MaybeLocal<Function>();
257   }
258   CHECK(emit_message_val->IsFunction());
259   return emit_message_val.As<Function>();
260 }
261 
GetDOMException(Local<Context> context)262 MaybeLocal<Function> GetDOMException(Local<Context> context) {
263   Isolate* isolate = context->GetIsolate();
264   Local<Object> per_context_bindings;
265   Local<Value> domexception_ctor_val;
266   if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
267       !per_context_bindings->Get(context,
268                                 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
269           .ToLocal(&domexception_ctor_val)) {
270     return MaybeLocal<Function>();
271   }
272   CHECK(domexception_ctor_val->IsFunction());
273   Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
274   return domexception_ctor;
275 }
276 
ThrowDataCloneException(Local<Context> context,Local<String> message)277 void ThrowDataCloneException(Local<Context> context, Local<String> message) {
278   Isolate* isolate = context->GetIsolate();
279   Local<Value> argv[] = {message,
280                          FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
281   Local<Value> exception;
282   Local<Function> domexception_ctor;
283   if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
284       !domexception_ctor->NewInstance(context, arraysize(argv), argv)
285            .ToLocal(&exception)) {
286     return;
287   }
288   isolate->ThrowException(exception);
289 }
290 
291 // This tells V8 how to serialize objects that it does not understand
292 // (e.g. C++ objects) into the output buffer, in a way that our own
293 // DeserializerDelegate understands how to unpack.
294 class SerializerDelegate : public ValueSerializer::Delegate {
295  public:
SerializerDelegate(Environment * env,Local<Context> context,Message * m)296   SerializerDelegate(Environment* env, Local<Context> context, Message* m)
297       : env_(env), context_(context), msg_(m) {}
298 
ThrowDataCloneError(Local<String> message)299   void ThrowDataCloneError(Local<String> message) override {
300     ThrowDataCloneException(context_, message);
301   }
302 
WriteHostObject(Isolate * isolate,Local<Object> object)303   Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
304     if (env_->base_object_ctor_template()->HasInstance(object)) {
305       return WriteHostObject(
306           BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
307     }
308 
309     // Convert process.env to a regular object.
310     auto env_proxy_ctor_template = env_->env_proxy_ctor_template();
311     if (!env_proxy_ctor_template.IsEmpty() &&
312         env_proxy_ctor_template->HasInstance(object)) {
313       HandleScope scope(isolate);
314       // TODO(bnoordhuis) Prototype-less object in case process.env contains
315       // a "__proto__" key? process.env has a prototype with concomitant
316       // methods like toString(). It's probably confusing if that gets lost
317       // in transmission.
318       Local<Object> normal_object = Object::New(isolate);
319       env_->env_vars()->AssignToObject(isolate, env_->context(), normal_object);
320       serializer->WriteUint32(kNormalObject);  // Instead of a BaseObject.
321       return serializer->WriteValue(env_->context(), normal_object);
322     }
323 
324     ThrowDataCloneError(env_->clone_unsupported_type_str());
325     return Nothing<bool>();
326   }
327 
GetSharedArrayBufferId(Isolate * isolate,Local<SharedArrayBuffer> shared_array_buffer)328   Maybe<uint32_t> GetSharedArrayBufferId(
329       Isolate* isolate,
330       Local<SharedArrayBuffer> shared_array_buffer) override {
331     uint32_t i;
332     for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
333       if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
334           shared_array_buffer) {
335         return Just(i);
336       }
337     }
338 
339     seen_shared_array_buffers_.emplace_back(
340       Global<SharedArrayBuffer> { isolate, shared_array_buffer });
341     msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
342     return Just(i);
343   }
344 
GetWasmModuleTransferId(Isolate * isolate,Local<WasmModuleObject> module)345   Maybe<uint32_t> GetWasmModuleTransferId(
346       Isolate* isolate, Local<WasmModuleObject> module) override {
347     return Just(msg_->AddWASMModule(module->GetCompiledModule()));
348   }
349 
Finish(Local<Context> context)350   Maybe<bool> Finish(Local<Context> context) {
351     for (uint32_t i = 0; i < host_objects_.size(); i++) {
352       BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
353       std::unique_ptr<TransferData> data;
354       if (i < first_cloned_object_index_)
355         data = host_object->TransferForMessaging();
356       if (!data)
357         data = host_object->CloneForMessaging();
358       if (!data) return Nothing<bool>();
359       if (data->FinalizeTransferWrite(context, serializer).IsNothing())
360         return Nothing<bool>();
361       msg_->AddTransferable(std::move(data));
362     }
363     return Just(true);
364   }
365 
AddHostObject(BaseObjectPtr<BaseObject> host_object)366   inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
367     // Make sure we have not started serializing the value itself yet.
368     CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
369     host_objects_.emplace_back(std::move(host_object));
370   }
371 
372   // Some objects in the transfer list may register sub-objects that can be
373   // transferred. This could e.g. be a public JS wrapper object, such as a
374   // FileHandle, that is registering its C++ handle for transfer.
AddNestedHostObjects()375   inline Maybe<bool> AddNestedHostObjects() {
376     for (size_t i = 0; i < host_objects_.size(); i++) {
377       std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
378       if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
379         return Nothing<bool>();
380       for (auto& nested_transferable : nested_transferables) {
381         if (std::find(host_objects_.begin(),
382                       host_objects_.end(),
383                       nested_transferable) == host_objects_.end()) {
384           AddHostObject(nested_transferable);
385         }
386       }
387     }
388     return Just(true);
389   }
390 
391   ValueSerializer* serializer = nullptr;
392 
393  private:
WriteHostObject(BaseObjectPtr<BaseObject> host_object)394   Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
395     BaseObject::TransferMode mode = host_object->GetTransferMode();
396     if (mode == BaseObject::TransferMode::kUntransferable) {
397       ThrowDataCloneError(env_->clone_unsupported_type_str());
398       return Nothing<bool>();
399     }
400 
401     for (uint32_t i = 0; i < host_objects_.size(); i++) {
402       if (host_objects_[i] == host_object) {
403         serializer->WriteUint32(i);
404         return Just(true);
405       }
406     }
407 
408     if (mode == BaseObject::TransferMode::kTransferable) {
409       THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
410       return Nothing<bool>();
411     }
412 
413     CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
414     uint32_t index = host_objects_.size();
415     if (first_cloned_object_index_ == SIZE_MAX)
416       first_cloned_object_index_ = index;
417     serializer->WriteUint32(index);
418     host_objects_.push_back(host_object);
419     return Just(true);
420   }
421 
422   Environment* env_;
423   Local<Context> context_;
424   Message* msg_;
425   std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
426   std::vector<BaseObjectPtr<BaseObject>> host_objects_;
427   size_t first_cloned_object_index_ = SIZE_MAX;
428 
429   friend class worker::Message;
430 };
431 
432 }  // anonymous namespace
433 
Serialize(Environment * env,Local<Context> context,Local<Value> input,const TransferList & transfer_list_v,Local<Object> source_port)434 Maybe<bool> Message::Serialize(Environment* env,
435                                Local<Context> context,
436                                Local<Value> input,
437                                const TransferList& transfer_list_v,
438                                Local<Object> source_port) {
439   HandleScope handle_scope(env->isolate());
440   Context::Scope context_scope(context);
441 
442   // Verify that we're not silently overwriting an existing message.
443   CHECK(main_message_buf_.is_empty());
444 
445   SerializerDelegate delegate(env, context, this);
446   ValueSerializer serializer(env->isolate(), &delegate);
447   delegate.serializer = &serializer;
448 
449   std::vector<Local<ArrayBuffer>> array_buffers;
450   for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
451     Local<Value> entry = transfer_list_v[i];
452     if (entry->IsObject()) {
453       // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
454       // for details.
455       bool untransferable;
456       if (!entry.As<Object>()->HasPrivate(
457               context,
458               env->untransferable_object_private_symbol())
459               .To(&untransferable)) {
460         return Nothing<bool>();
461       }
462       if (untransferable) continue;
463     }
464 
465     // Currently, we support ArrayBuffers and BaseObjects for which
466     // GetTransferMode() does not return kUntransferable.
467     if (entry->IsArrayBuffer()) {
468       Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
469       // If we cannot render the ArrayBuffer unusable in this Isolate,
470       // copying the buffer will have to do.
471       // Note that we can currently transfer ArrayBuffers even if they were
472       // not allocated by Node’s ArrayBufferAllocator in the first place,
473       // because we pass the underlying v8::BackingStore around rather than
474       // raw data *and* an Isolate with a non-default ArrayBuffer allocator
475       // is always going to outlive any Workers it creates, and so will its
476       // allocator along with it.
477       if (!ab->IsDetachable()) continue;
478       if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
479           array_buffers.end()) {
480         ThrowDataCloneException(
481             context,
482             FIXED_ONE_BYTE_STRING(
483                 env->isolate(),
484                 "Transfer list contains duplicate ArrayBuffer"));
485         return Nothing<bool>();
486       }
487       // We simply use the array index in the `array_buffers` list as the
488       // ID that we write into the serialized buffer.
489       uint32_t id = array_buffers.size();
490       array_buffers.push_back(ab);
491       serializer.TransferArrayBuffer(id, ab);
492       continue;
493     } else if (env->base_object_ctor_template()->HasInstance(entry)) {
494       // Check if the source MessagePort is being transferred.
495       if (!source_port.IsEmpty() && entry == source_port) {
496         ThrowDataCloneException(
497             context,
498             FIXED_ONE_BYTE_STRING(env->isolate(),
499                                   "Transfer list contains source port"));
500         return Nothing<bool>();
501       }
502       BaseObjectPtr<BaseObject> host_object {
503           Unwrap<BaseObject>(entry.As<Object>()) };
504       if (env->message_port_constructor_template()->HasInstance(entry) &&
505           (!host_object ||
506            static_cast<MessagePort*>(host_object.get())->IsDetached())) {
507         ThrowDataCloneException(
508             context,
509             FIXED_ONE_BYTE_STRING(
510                 env->isolate(),
511                 "MessagePort in transfer list is already detached"));
512         return Nothing<bool>();
513       }
514       if (std::find(delegate.host_objects_.begin(),
515                     delegate.host_objects_.end(),
516                     host_object) != delegate.host_objects_.end()) {
517         ThrowDataCloneException(
518             context,
519             String::Concat(env->isolate(),
520                 FIXED_ONE_BYTE_STRING(
521                   env->isolate(),
522                   "Transfer list contains duplicate "),
523                 entry.As<Object>()->GetConstructorName()));
524         return Nothing<bool>();
525       }
526       if (host_object && host_object->GetTransferMode() !=
527               BaseObject::TransferMode::kUntransferable) {
528         delegate.AddHostObject(host_object);
529         continue;
530       }
531     }
532 
533     THROW_ERR_INVALID_TRANSFER_OBJECT(env);
534     return Nothing<bool>();
535   }
536   if (delegate.AddNestedHostObjects().IsNothing())
537     return Nothing<bool>();
538 
539   serializer.WriteHeader();
540   if (serializer.WriteValue(context, input).IsNothing()) {
541     return Nothing<bool>();
542   }
543 
544   for (Local<ArrayBuffer> ab : array_buffers) {
545     // If serialization succeeded, we render it inaccessible in this Isolate.
546     std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
547     ab->Detach();
548 
549     array_buffers_.emplace_back(std::move(backing_store));
550   }
551 
552   if (delegate.Finish(context).IsNothing())
553     return Nothing<bool>();
554 
555   // The serializer gave us a buffer allocated using `malloc()`.
556   std::pair<uint8_t*, size_t> data = serializer.Release();
557   CHECK_NOT_NULL(data.first);
558   main_message_buf_ =
559       MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
560   return Just(true);
561 }
562 
MemoryInfo(MemoryTracker * tracker) const563 void Message::MemoryInfo(MemoryTracker* tracker) const {
564   tracker->TrackField("array_buffers_", array_buffers_);
565   tracker->TrackField("shared_array_buffers", shared_array_buffers_);
566   tracker->TrackField("transferables", transferables_);
567 }
568 
MessagePortData(MessagePort * owner)569 MessagePortData::MessagePortData(MessagePort* owner)
570     : owner_(owner) {
571 }
572 
~MessagePortData()573 MessagePortData::~MessagePortData() {
574   CHECK_NULL(owner_);
575   Disentangle();
576 }
577 
MemoryInfo(MemoryTracker * tracker) const578 void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
579   Mutex::ScopedLock lock(mutex_);
580   tracker->TrackField("incoming_messages", incoming_messages_);
581 }
582 
AddToIncomingQueue(std::shared_ptr<Message> message)583 void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
584   // This function will be called by other threads.
585   Mutex::ScopedLock lock(mutex_);
586   incoming_messages_.emplace_back(std::move(message));
587 
588   if (owner_ != nullptr) {
589     Debug(owner_, "Adding message to incoming queue");
590     owner_->TriggerAsync();
591   }
592 }
593 
Entangle(MessagePortData * a,MessagePortData * b)594 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
595   auto group = std::make_shared<SiblingGroup>();
596   group->Entangle({a, b});
597 }
598 
Disentangle()599 void MessagePortData::Disentangle() {
600   if (group_) {
601     group_->Disentangle(this);
602   }
603 }
604 
~MessagePort()605 MessagePort::~MessagePort() {
606   if (data_) Detach();
607 }
608 
MessagePort(Environment * env,Local<Context> context,Local<Object> wrap)609 MessagePort::MessagePort(Environment* env,
610                          Local<Context> context,
611                          Local<Object> wrap)
612   : HandleWrap(env,
613                wrap,
614                reinterpret_cast<uv_handle_t*>(&async_),
615                AsyncWrap::PROVIDER_MESSAGEPORT),
616     data_(new MessagePortData(this)) {
617   auto onmessage = [](uv_async_t* handle) {
618     // Called when data has been put into the queue.
619     MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
620     channel->OnMessage(MessageProcessingMode::kNormalOperation);
621   };
622 
623   CHECK_EQ(uv_async_init(env->event_loop(),
624                          &async_,
625                          onmessage), 0);
626   // Reset later to indicate success of the constructor.
627   bool succeeded = false;
628   auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
629 
630   Local<Value> fn;
631   if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
632     return;
633 
634   if (fn->IsFunction()) {
635     Local<Function> init = fn.As<Function>();
636     if (init->Call(context, wrap, 0, nullptr).IsEmpty())
637       return;
638   }
639 
640   Local<Function> emit_message_fn;
641   if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
642     return;
643   emit_message_fn_.Reset(env->isolate(), emit_message_fn);
644 
645   succeeded = true;
646   Debug(this, "Created message port");
647 }
648 
IsDetached() const649 bool MessagePort::IsDetached() const {
650   return data_ == nullptr || IsHandleClosing();
651 }
652 
TriggerAsync()653 void MessagePort::TriggerAsync() {
654   if (IsHandleClosing()) return;
655   CHECK_EQ(uv_async_send(&async_), 0);
656 }
657 
Close(v8::Local<v8::Value> close_callback)658 void MessagePort::Close(v8::Local<v8::Value> close_callback) {
659   Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
660 
661   if (data_) {
662     // Wrap this call with accessing the mutex, so that TriggerAsync()
663     // can check IsHandleClosing() without race conditions.
664     Mutex::ScopedLock sibling_lock(data_->mutex_);
665     HandleWrap::Close(close_callback);
666   } else {
667     HandleWrap::Close(close_callback);
668   }
669 }
670 
New(const FunctionCallbackInfo<Value> & args)671 void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
672   // This constructor just throws an error. Unfortunately, we can’t use V8’s
673   // ConstructorBehavior::kThrow, as that also removes the prototype from the
674   // class (i.e. makes it behave like an arrow function).
675   Environment* env = Environment::GetCurrent(args);
676   THROW_ERR_CONSTRUCT_CALL_INVALID(env);
677 }
678 
New(Environment * env,Local<Context> context,std::unique_ptr<MessagePortData> data,std::shared_ptr<SiblingGroup> sibling_group)679 MessagePort* MessagePort::New(
680     Environment* env,
681     Local<Context> context,
682     std::unique_ptr<MessagePortData> data,
683     std::shared_ptr<SiblingGroup> sibling_group) {
684   Context::Scope context_scope(context);
685   Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
686 
687   // Construct a new instance, then assign the listener instance and possibly
688   // the MessagePortData to it.
689   Local<Object> instance;
690   if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
691     return nullptr;
692   MessagePort* port = new MessagePort(env, context, instance);
693   CHECK_NOT_NULL(port);
694   if (port->IsHandleClosing()) {
695     // Construction failed with an exception.
696     return nullptr;
697   }
698 
699   if (data) {
700     CHECK(!sibling_group);
701     port->Detach();
702     port->data_ = std::move(data);
703 
704     // This lock is here to avoid race conditions with the `owner_` read
705     // in AddToIncomingQueue(). (This would likely be unproblematic without it,
706     // but it's better to be safe than sorry.)
707     Mutex::ScopedLock lock(port->data_->mutex_);
708     port->data_->owner_ = port;
709     // If the existing MessagePortData object had pending messages, this is
710     // the easiest way to run that queue.
711     port->TriggerAsync();
712   } else if (sibling_group) {
713     sibling_group->Entangle(port->data_.get());
714   }
715   return port;
716 }
717 
ReceiveMessage(Local<Context> context,MessageProcessingMode mode,Local<Value> * port_list)718 MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
719                                               MessageProcessingMode mode,
720                                               Local<Value>* port_list) {
721   std::shared_ptr<Message> received;
722   {
723     // Get the head of the message queue.
724     Mutex::ScopedLock lock(data_->mutex_);
725 
726     Debug(this, "MessagePort has message");
727 
728     bool wants_message =
729         receiving_messages_ ||
730         mode == MessageProcessingMode::kForceReadMessages;
731     // We have nothing to do if:
732     // - There are no pending messages
733     // - We are not intending to receive messages, and the message we would
734     //   receive is not the final "close" message.
735     if (data_->incoming_messages_.empty() ||
736         (!wants_message &&
737          !data_->incoming_messages_.front()->IsCloseMessage())) {
738       return env()->no_message_symbol();
739     }
740 
741     received = data_->incoming_messages_.front();
742     data_->incoming_messages_.pop_front();
743   }
744 
745   if (received->IsCloseMessage()) {
746     Close();
747     return env()->no_message_symbol();
748   }
749 
750   if (!env()->can_call_into_js()) return MaybeLocal<Value>();
751 
752   return received->Deserialize(env(), context, port_list);
753 }
754 
OnMessage(MessageProcessingMode mode)755 void MessagePort::OnMessage(MessageProcessingMode mode) {
756   Debug(this, "Running MessagePort::OnMessage()");
757   HandleScope handle_scope(env()->isolate());
758   Local<Context> context =
759       object(env()->isolate())->GetCreationContext().ToLocalChecked();
760 
761   size_t processing_limit;
762   if (mode == MessageProcessingMode::kNormalOperation) {
763     Mutex::ScopedLock(data_->mutex_);
764     processing_limit = std::max(data_->incoming_messages_.size(),
765                                 static_cast<size_t>(1000));
766   } else {
767     processing_limit = std::numeric_limits<size_t>::max();
768   }
769 
770   // data_ can only ever be modified by the owner thread, so no need to lock.
771   // However, the message port may be transferred while it is processing
772   // messages, so we need to check that this handle still owns its `data_` field
773   // on every iteration.
774   while (data_) {
775     if (processing_limit-- == 0) {
776       // Prevent event loop starvation by only processing those messages without
777       // interruption that were already present when the OnMessage() call was
778       // first triggered, but at least 1000 messages because otherwise the
779       // overhead of repeatedly triggering the uv_async_t instance becomes
780       // noticeable, at least on Windows.
781       // (That might require more investigation by somebody more familiar with
782       // Windows.)
783       TriggerAsync();
784       return;
785     }
786 
787     HandleScope handle_scope(env()->isolate());
788     Context::Scope context_scope(context);
789     Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
790 
791     Local<Value> payload;
792     Local<Value> port_list = Undefined(env()->isolate());
793     Local<Value> message_error;
794     Local<Value> argv[3];
795 
796     {
797       // Catch any exceptions from parsing the message itself (not from
798       // emitting it) as 'messageeror' events.
799       TryCatchScope try_catch(env());
800       if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
801         if (try_catch.HasCaught() && !try_catch.HasTerminated())
802           message_error = try_catch.Exception();
803         goto reschedule;
804       }
805     }
806     if (payload == env()->no_message_symbol()) break;
807 
808     if (!env()->can_call_into_js()) {
809       Debug(this, "MessagePort drains queue because !can_call_into_js()");
810       // In this case there is nothing to do but to drain the current queue.
811       continue;
812     }
813 
814     argv[0] = payload;
815     argv[1] = port_list;
816     argv[2] = env()->message_string();
817 
818     if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
819     reschedule:
820       if (!message_error.IsEmpty()) {
821         argv[0] = message_error;
822         argv[1] = Undefined(env()->isolate());
823         argv[2] = env()->messageerror_string();
824         USE(MakeCallback(emit_message, arraysize(argv), argv));
825       }
826 
827       // Re-schedule OnMessage() execution in case of failure.
828       if (data_)
829         TriggerAsync();
830       return;
831     }
832   }
833 }
834 
OnClose()835 void MessagePort::OnClose() {
836   Debug(this, "MessagePort::OnClose()");
837   if (data_) {
838     // Detach() returns move(data_).
839     Detach()->Disentangle();
840   }
841 }
842 
Detach()843 std::unique_ptr<MessagePortData> MessagePort::Detach() {
844   CHECK(data_);
845   Mutex::ScopedLock lock(data_->mutex_);
846   data_->owner_ = nullptr;
847   return std::move(data_);
848 }
849 
GetTransferMode() const850 BaseObject::TransferMode MessagePort::GetTransferMode() const {
851   if (IsDetached())
852     return BaseObject::TransferMode::kUntransferable;
853   return BaseObject::TransferMode::kTransferable;
854 }
855 
TransferForMessaging()856 std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
857   Close();
858   return Detach();
859 }
860 
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)861 BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
862     Environment* env,
863     Local<Context> context,
864     std::unique_ptr<TransferData> self) {
865   return BaseObjectPtr<MessagePort> { MessagePort::New(
866       env, context,
867       static_unique_pointer_cast<MessagePortData>(std::move(self))) };
868 }
869 
PostMessage(Environment * env,Local<Context> context,Local<Value> message_v,const TransferList & transfer_v)870 Maybe<bool> MessagePort::PostMessage(Environment* env,
871                                      Local<Context> context,
872                                      Local<Value> message_v,
873                                      const TransferList& transfer_v) {
874   Isolate* isolate = env->isolate();
875   Local<Object> obj = object(isolate);
876 
877   std::shared_ptr<Message> msg = std::make_shared<Message>();
878 
879   // Per spec, we need to both check if transfer list has the source port, and
880   // serialize the input message, even if the MessagePort is closed or detached.
881 
882   Maybe<bool> serialization_maybe =
883       msg->Serialize(env, context, message_v, transfer_v, obj);
884   if (data_ == nullptr) {
885     return serialization_maybe;
886   }
887   if (serialization_maybe.IsNothing()) {
888     return Nothing<bool>();
889   }
890 
891   std::string error;
892   Maybe<bool> res = data_->Dispatch(msg, &error);
893   if (res.IsNothing())
894     return res;
895 
896   if (!error.empty())
897     ProcessEmitWarning(env, error.c_str());
898 
899   return res;
900 }
901 
Dispatch(std::shared_ptr<Message> message,std::string * error)902 Maybe<bool> MessagePortData::Dispatch(
903     std::shared_ptr<Message> message,
904     std::string* error) {
905   if (!group_) {
906     if (error != nullptr)
907       *error = "MessagePortData is not entangled.";
908     return Nothing<bool>();
909   }
910   return group_->Dispatch(this, message, error);
911 }
912 
ReadIterable(Environment * env,Local<Context> context,TransferList & transfer_list,Local<Value> object)913 static Maybe<bool> ReadIterable(Environment* env,
914                                 Local<Context> context,
915                                 // NOLINTNEXTLINE(runtime/references)
916                                 TransferList& transfer_list,
917                                 Local<Value> object) {
918   if (!object->IsObject()) return Just(false);
919 
920   if (object->IsArray()) {
921     Local<Array> arr = object.As<Array>();
922     size_t length = arr->Length();
923     transfer_list.AllocateSufficientStorage(length);
924     for (size_t i = 0; i < length; i++) {
925       if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
926         return Nothing<bool>();
927     }
928     return Just(true);
929   }
930 
931   Isolate* isolate = env->isolate();
932   Local<Value> iterator_method;
933   if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
934       .ToLocal(&iterator_method)) return Nothing<bool>();
935   if (!iterator_method->IsFunction()) return Just(false);
936 
937   Local<Value> iterator;
938   if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
939       .ToLocal(&iterator)) return Nothing<bool>();
940   if (!iterator->IsObject()) return Just(false);
941 
942   Local<Value> next;
943   if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
944     return Nothing<bool>();
945   if (!next->IsFunction()) return Just(false);
946 
947   std::vector<Local<Value>> entries;
948   while (env->can_call_into_js()) {
949     Local<Value> result;
950     if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
951         .ToLocal(&result)) return Nothing<bool>();
952     if (!result->IsObject()) return Just(false);
953 
954     Local<Value> done;
955     if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
956       return Nothing<bool>();
957     if (done->BooleanValue(isolate)) break;
958 
959     Local<Value> val;
960     if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
961       return Nothing<bool>();
962     entries.push_back(val);
963   }
964 
965   transfer_list.AllocateSufficientStorage(entries.size());
966   std::copy(entries.begin(), entries.end(), &transfer_list[0]);
967   return Just(true);
968 }
969 
PostMessage(const FunctionCallbackInfo<Value> & args)970 void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
971   Environment* env = Environment::GetCurrent(args);
972   Local<Object> obj = args.This();
973   Local<Context> context = obj->GetCreationContext().ToLocalChecked();
974 
975   if (args.Length() == 0) {
976     return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
977                                        "MessagePort.postMessage");
978   }
979 
980   if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
981     // Browsers ignore null or undefined, and otherwise accept an array or an
982     // options object.
983     return THROW_ERR_INVALID_ARG_TYPE(env,
984         "Optional transferList argument must be an iterable");
985   }
986 
987   TransferList transfer_list;
988   if (args[1]->IsObject()) {
989     bool was_iterable;
990     if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
991       return;
992     if (!was_iterable) {
993       Local<Value> transfer_option;
994       if (!args[1].As<Object>()->Get(context, env->transfer_string())
995           .ToLocal(&transfer_option)) return;
996       if (!transfer_option->IsUndefined()) {
997         if (!ReadIterable(env, context, transfer_list, transfer_option)
998             .To(&was_iterable)) return;
999         if (!was_iterable) {
1000           return THROW_ERR_INVALID_ARG_TYPE(env,
1001               "Optional options.transfer argument must be an iterable");
1002         }
1003       }
1004     }
1005   }
1006 
1007   MessagePort* port = Unwrap<MessagePort>(args.This());
1008   // Even if the backing MessagePort object has already been deleted, we still
1009   // want to serialize the message to ensure spec-compliant behavior w.r.t.
1010   // transfers.
1011   if (port == nullptr || port->IsHandleClosing()) {
1012     Message msg;
1013     USE(msg.Serialize(env, context, args[0], transfer_list, obj));
1014     return;
1015   }
1016 
1017   Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
1018   if (res.IsJust())
1019     args.GetReturnValue().Set(res.FromJust());
1020 }
1021 
Start()1022 void MessagePort::Start() {
1023   Debug(this, "Start receiving messages");
1024   receiving_messages_ = true;
1025   Mutex::ScopedLock lock(data_->mutex_);
1026   if (!data_->incoming_messages_.empty())
1027     TriggerAsync();
1028 }
1029 
Stop()1030 void MessagePort::Stop() {
1031   Debug(this, "Stop receiving messages");
1032   receiving_messages_ = false;
1033 }
1034 
Start(const FunctionCallbackInfo<Value> & args)1035 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1036   MessagePort* port;
1037   ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1038   if (!port->data_) {
1039     return;
1040   }
1041   port->Start();
1042 }
1043 
Stop(const FunctionCallbackInfo<Value> & args)1044 void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1045   MessagePort* port;
1046   CHECK(args[0]->IsObject());
1047   ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1048   if (!port->data_) {
1049     return;
1050   }
1051   port->Stop();
1052 }
1053 
CheckType(const FunctionCallbackInfo<Value> & args)1054 void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1055   Environment* env = Environment::GetCurrent(args);
1056   args.GetReturnValue().Set(
1057       GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1058 }
1059 
Drain(const FunctionCallbackInfo<Value> & args)1060 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1061   MessagePort* port;
1062   ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1063   port->OnMessage(MessageProcessingMode::kForceReadMessages);
1064 }
1065 
ReceiveMessage(const FunctionCallbackInfo<Value> & args)1066 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1067   Environment* env = Environment::GetCurrent(args);
1068   if (!args[0]->IsObject() ||
1069       !env->message_port_constructor_template()->HasInstance(args[0])) {
1070     return THROW_ERR_INVALID_ARG_TYPE(env,
1071         "The \"port\" argument must be a MessagePort instance");
1072   }
1073   MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1074   if (port == nullptr) {
1075     // Return 'no messages' for a closed port.
1076     args.GetReturnValue().Set(
1077         Environment::GetCurrent(args)->no_message_symbol());
1078     return;
1079   }
1080 
1081   MaybeLocal<Value> payload = port->ReceiveMessage(
1082       port->object()->GetCreationContext().ToLocalChecked(),
1083       MessageProcessingMode::kForceReadMessages);
1084   if (!payload.IsEmpty())
1085     args.GetReturnValue().Set(payload.ToLocalChecked());
1086 }
1087 
MoveToContext(const FunctionCallbackInfo<Value> & args)1088 void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1089   Environment* env = Environment::GetCurrent(args);
1090   if (!args[0]->IsObject() ||
1091       !env->message_port_constructor_template()->HasInstance(args[0])) {
1092     return THROW_ERR_INVALID_ARG_TYPE(env,
1093         "The \"port\" argument must be a MessagePort instance");
1094   }
1095   MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1096   if (port == nullptr || port->IsHandleClosing()) {
1097     Isolate* isolate = env->isolate();
1098     THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1099     return;
1100   }
1101 
1102   Local<Value> context_arg = args[1];
1103   ContextifyContext* context_wrapper;
1104   if (!context_arg->IsObject() ||
1105       (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1106           env, context_arg.As<Object>())) == nullptr) {
1107     return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1108   }
1109 
1110   std::unique_ptr<MessagePortData> data;
1111   if (!port->IsDetached())
1112     data = port->Detach();
1113 
1114   Context::Scope context_scope(context_wrapper->context());
1115   MessagePort* target =
1116       MessagePort::New(env, context_wrapper->context(), std::move(data));
1117   if (target != nullptr)
1118     args.GetReturnValue().Set(target->object());
1119 }
1120 
Entangle(MessagePort * a,MessagePort * b)1121 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1122   MessagePortData::Entangle(a->data_.get(), b->data_.get());
1123 }
1124 
Entangle(MessagePort * a,MessagePortData * b)1125 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1126   MessagePortData::Entangle(a->data_.get(), b);
1127 }
1128 
MemoryInfo(MemoryTracker * tracker) const1129 void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1130   tracker->TrackField("data", data_);
1131   tracker->TrackField("emit_message_fn", emit_message_fn_);
1132 }
1133 
GetMessagePortConstructorTemplate(Environment * env)1134 Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1135   // Factor generating the MessagePort JS constructor into its own piece
1136   // of code, because it is needed early on in the child environment setup.
1137   Local<FunctionTemplate> templ = env->message_port_constructor_template();
1138   if (!templ.IsEmpty())
1139     return templ;
1140 
1141   {
1142     Isolate* isolate = env->isolate();
1143     Local<FunctionTemplate> m = NewFunctionTemplate(isolate, MessagePort::New);
1144     m->SetClassName(env->message_port_constructor_string());
1145     m->InstanceTemplate()->SetInternalFieldCount(
1146         MessagePort::kInternalFieldCount);
1147     m->Inherit(HandleWrap::GetConstructorTemplate(env));
1148 
1149     SetProtoMethod(isolate, m, "postMessage", MessagePort::PostMessage);
1150     SetProtoMethod(isolate, m, "start", MessagePort::Start);
1151 
1152     env->set_message_port_constructor_template(m);
1153   }
1154 
1155   return GetMessagePortConstructorTemplate(env);
1156 }
1157 
JSTransferable(Environment * env,Local<Object> obj)1158 JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1159     : BaseObject(env, obj) {
1160   MakeWeak();
1161 }
1162 
New(const FunctionCallbackInfo<Value> & args)1163 void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1164   CHECK(args.IsConstructCall());
1165   new JSTransferable(Environment::GetCurrent(args), args.This());
1166 }
1167 
GetTransferMode() const1168 JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1169   // Implement `kClone in this ? kCloneable : kTransferable`.
1170   HandleScope handle_scope(env()->isolate());
1171   errors::TryCatchScope ignore_exceptions(env());
1172 
1173   bool has_clone;
1174   if (!object()->Has(env()->context(),
1175                      env()->messaging_clone_symbol()).To(&has_clone)) {
1176     return TransferMode::kUntransferable;
1177   }
1178 
1179   return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1180 }
1181 
TransferForMessaging()1182 std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1183   return TransferOrClone(TransferMode::kTransferable);
1184 }
1185 
CloneForMessaging() const1186 std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1187   return TransferOrClone(TransferMode::kCloneable);
1188 }
1189 
TransferOrClone(TransferMode mode) const1190 std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1191     TransferMode mode) const {
1192   // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1193   // which should return an object with `data` and `deserializeInfo` properties;
1194   // `data` is written to the serializer later, and `deserializeInfo` is stored
1195   // on the `TransferData` instance as a string.
1196   HandleScope handle_scope(env()->isolate());
1197   Local<Context> context = env()->isolate()->GetCurrentContext();
1198   Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1199       env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1200 
1201   Local<Value> method;
1202   if (!object()->Get(context, method_name).ToLocal(&method)) {
1203     return {};
1204   }
1205   if (method->IsFunction()) {
1206     Local<Value> result_v;
1207     if (!method.As<Function>()->Call(
1208             context, object(), 0, nullptr).ToLocal(&result_v)) {
1209       return {};
1210     }
1211 
1212     if (result_v->IsObject()) {
1213       Local<Object> result = result_v.As<Object>();
1214       Local<Value> data;
1215       Local<Value> deserialize_info;
1216       if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1217           !result->Get(context, env()->deserialize_info_string())
1218               .ToLocal(&deserialize_info)) {
1219         return {};
1220       }
1221       Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1222       if (*deserialize_info_str == nullptr) return {};
1223       return std::make_unique<Data>(
1224           *deserialize_info_str, Global<Value>(env()->isolate(), data));
1225     }
1226   }
1227 
1228   if (mode == TransferMode::kTransferable)
1229     return TransferOrClone(TransferMode::kCloneable);
1230   else
1231     return {};
1232 }
1233 
1234 Maybe<BaseObjectList>
NestedTransferables() const1235 JSTransferable::NestedTransferables() const {
1236   // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1237   HandleScope handle_scope(env()->isolate());
1238   Local<Context> context = env()->isolate()->GetCurrentContext();
1239   Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1240 
1241   Local<Value> method;
1242   if (!object()->Get(context, method_name).ToLocal(&method)) {
1243     return Nothing<BaseObjectList>();
1244   }
1245   if (!method->IsFunction()) return Just(BaseObjectList {});
1246 
1247   Local<Value> list_v;
1248   if (!method.As<Function>()->Call(
1249           context, object(), 0, nullptr).ToLocal(&list_v)) {
1250     return Nothing<BaseObjectList>();
1251   }
1252   if (!list_v->IsArray()) return Just(BaseObjectList {});
1253   Local<Array> list = list_v.As<Array>();
1254 
1255   BaseObjectList ret;
1256   for (size_t i = 0; i < list->Length(); i++) {
1257     Local<Value> value;
1258     if (!list->Get(context, i).ToLocal(&value))
1259       return Nothing<BaseObjectList>();
1260     if (env()->base_object_ctor_template()->HasInstance(value))
1261       ret.emplace_back(Unwrap<BaseObject>(value));
1262   }
1263   return Just(ret);
1264 }
1265 
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)1266 Maybe<bool> JSTransferable::FinalizeTransferRead(
1267     Local<Context> context, ValueDeserializer* deserializer) {
1268   // Call `this[kDeserialize](data)` where `data` comes from the return value
1269   // of `this[kTransfer]()` or `this[kClone]()`.
1270   HandleScope handle_scope(env()->isolate());
1271   Local<Value> data;
1272   if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1273 
1274   Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1275   Local<Value> method;
1276   if (!object()->Get(context, method_name).ToLocal(&method)) {
1277     return Nothing<bool>();
1278   }
1279   if (!method->IsFunction()) return Just(true);
1280 
1281   if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1282     return Nothing<bool>();
1283   }
1284   return Just(true);
1285 }
1286 
Data(std::string && deserialize_info,v8::Global<v8::Value> && data)1287 JSTransferable::Data::Data(std::string&& deserialize_info,
1288                            v8::Global<v8::Value>&& data)
1289     : deserialize_info_(std::move(deserialize_info)),
1290       data_(std::move(data)) {}
1291 
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)1292 BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1293     Environment* env,
1294     Local<Context> context,
1295     std::unique_ptr<TransferData> self) {
1296   // Create the JS wrapper object that will later be filled with data passed to
1297   // the `[kDeserialize]()` method on it. This split is necessary, because here
1298   // we need to create an object with the right prototype and internal fields,
1299   // but the actual JS data stored in the serialized data can only be read at
1300   // the end of the stream, after the main message has been read.
1301 
1302   if (context != env->context()) {
1303     THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1304     return {};
1305   }
1306   HandleScope handle_scope(env->isolate());
1307   Local<Value> info;
1308   if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1309 
1310   Local<Value> ret;
1311   CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1312   if (!env->messaging_deserialize_create_object()->Call(
1313           context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1314       !env->base_object_ctor_template()->HasInstance(ret)) {
1315     return {};
1316   }
1317 
1318   return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1319 }
1320 
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)1321 Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1322     Local<Context> context, ValueSerializer* serializer) {
1323   HandleScope handle_scope(context->GetIsolate());
1324   auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1325   data_.Reset();
1326   return ret;
1327 }
1328 
Get(const std::string & name)1329 std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1330   Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1331   std::shared_ptr<SiblingGroup> group;
1332   auto i = groups_.find(name);
1333   if (i == groups_.end() || i->second.expired()) {
1334     group = std::make_shared<SiblingGroup>(name);
1335     groups_[name] = group;
1336   } else {
1337     group = i->second.lock();
1338   }
1339   return group;
1340 }
1341 
CheckSiblingGroup(const std::string & name)1342 void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1343   Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1344   auto i = groups_.find(name);
1345   if (i != groups_.end() && i->second.expired())
1346     groups_.erase(name);
1347 }
1348 
SiblingGroup(const std::string & name)1349 SiblingGroup::SiblingGroup(const std::string& name)
1350     : name_(name) { }
1351 
~SiblingGroup()1352 SiblingGroup::~SiblingGroup() {
1353   // If this is a named group, check to see if we can remove the group
1354   if (!name_.empty())
1355     CheckSiblingGroup(name_);
1356 }
1357 
Dispatch(MessagePortData * source,std::shared_ptr<Message> message,std::string * error)1358 Maybe<bool> SiblingGroup::Dispatch(
1359     MessagePortData* source,
1360     std::shared_ptr<Message> message,
1361     std::string* error) {
1362 
1363   RwLock::ScopedReadLock lock(group_mutex_);
1364 
1365   // The source MessagePortData is not part of this group.
1366   if (ports_.find(source) == ports_.end()) {
1367     if (error != nullptr)
1368       *error = "Source MessagePort is not entangled with this group.";
1369     return Nothing<bool>();
1370   }
1371 
1372   // There are no destination ports.
1373   if (size() <= 1)
1374     return Just(false);
1375 
1376   // Transferables cannot be used when there is more
1377   // than a single destination.
1378   if (size() > 2 && message->has_transferables()) {
1379     if (error != nullptr)
1380       *error = "Transferables cannot be used with multiple destinations.";
1381     return Nothing<bool>();
1382   }
1383 
1384   for (MessagePortData* port : ports_) {
1385     if (port == source)
1386       continue;
1387     // This loop should only be entered if there's only a single destination
1388     for (const auto& transferable : message->transferables()) {
1389       if (port == transferable.get()) {
1390         if (error != nullptr) {
1391           *error = "The target port was posted to itself, and the "
1392                    "communication channel was lost";
1393         }
1394         return Just(true);
1395       }
1396     }
1397     port->AddToIncomingQueue(message);
1398   }
1399 
1400   return Just(true);
1401 }
1402 
Entangle(MessagePortData * port)1403 void SiblingGroup::Entangle(MessagePortData* port) {
1404   Entangle({ port });
1405 }
1406 
Entangle(std::initializer_list<MessagePortData * > ports)1407 void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1408   RwLock::ScopedWriteLock lock(group_mutex_);
1409   for (MessagePortData* data : ports) {
1410     ports_.insert(data);
1411     CHECK(!data->group_);
1412     data->group_ = shared_from_this();
1413   }
1414 }
1415 
Disentangle(MessagePortData * data)1416 void SiblingGroup::Disentangle(MessagePortData* data) {
1417   auto self = shared_from_this();  // Keep alive until end of function.
1418   RwLock::ScopedWriteLock lock(group_mutex_);
1419   ports_.erase(data);
1420   data->group_.reset();
1421 
1422   data->AddToIncomingQueue(std::make_shared<Message>());
1423   // If this is an anonymous group and there's another port, close it.
1424   if (size() == 1 && name_.empty())
1425     (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1426 }
1427 
1428 SiblingGroup::Map SiblingGroup::groups_;
1429 Mutex SiblingGroup::groups_mutex_;
1430 
1431 namespace {
1432 
SetDeserializerCreateObjectFunction(const FunctionCallbackInfo<Value> & args)1433 static void SetDeserializerCreateObjectFunction(
1434     const FunctionCallbackInfo<Value>& args) {
1435   Environment* env = Environment::GetCurrent(args);
1436   CHECK(args[0]->IsFunction());
1437   env->set_messaging_deserialize_create_object(args[0].As<Function>());
1438 }
1439 
MessageChannel(const FunctionCallbackInfo<Value> & args)1440 static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1441   Environment* env = Environment::GetCurrent(args);
1442   if (!args.IsConstructCall()) {
1443     THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1444     return;
1445   }
1446 
1447   Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1448   Context::Scope context_scope(context);
1449 
1450   MessagePort* port1 = MessagePort::New(env, context);
1451   if (port1 == nullptr) return;
1452   MessagePort* port2 = MessagePort::New(env, context);
1453   if (port2 == nullptr) {
1454     port1->Close();
1455     return;
1456   }
1457 
1458   MessagePort::Entangle(port1, port2);
1459 
1460   args.This()->Set(context, env->port1_string(), port1->object())
1461       .Check();
1462   args.This()->Set(context, env->port2_string(), port2->object())
1463       .Check();
1464 }
1465 
BroadcastChannel(const FunctionCallbackInfo<Value> & args)1466 static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1467   CHECK(args[0]->IsString());
1468   Environment* env = Environment::GetCurrent(args);
1469   Context::Scope context_scope(env->context());
1470   Utf8Value name(env->isolate(), args[0]);
1471   MessagePort* port =
1472       MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1473   if (port != nullptr) {
1474     args.GetReturnValue().Set(port->object());
1475   }
1476 }
1477 
InitMessaging(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)1478 static void InitMessaging(Local<Object> target,
1479                           Local<Value> unused,
1480                           Local<Context> context,
1481                           void* priv) {
1482   Environment* env = Environment::GetCurrent(context);
1483   Isolate* isolate = env->isolate();
1484 
1485   {
1486     SetConstructorFunction(context,
1487                            target,
1488                            "MessageChannel",
1489                            NewFunctionTemplate(isolate, MessageChannel));
1490   }
1491 
1492   {
1493     Local<FunctionTemplate> t =
1494         NewFunctionTemplate(isolate, JSTransferable::New);
1495     t->Inherit(BaseObject::GetConstructorTemplate(env));
1496     t->InstanceTemplate()->SetInternalFieldCount(
1497         JSTransferable::kInternalFieldCount);
1498     t->SetClassName(OneByteString(isolate, "JSTransferable"));
1499     SetConstructorFunction(
1500         context, target, "JSTransferable", t, SetConstructorFunctionFlag::NONE);
1501   }
1502 
1503   SetConstructorFunction(context,
1504                          target,
1505                          env->message_port_constructor_string(),
1506                          GetMessagePortConstructorTemplate(env),
1507                          SetConstructorFunctionFlag::NONE);
1508 
1509   // These are not methods on the MessagePort prototype, because
1510   // the browser equivalents do not provide them.
1511   SetMethod(context, target, "stopMessagePort", MessagePort::Stop);
1512   SetMethod(context, target, "checkMessagePort", MessagePort::CheckType);
1513   SetMethod(context, target, "drainMessagePort", MessagePort::Drain);
1514   SetMethod(
1515       context, target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1516   SetMethod(
1517       context, target, "moveMessagePortToContext", MessagePort::MoveToContext);
1518   SetMethod(context,
1519             target,
1520             "setDeserializerCreateObjectFunction",
1521             SetDeserializerCreateObjectFunction);
1522   SetMethod(context, target, "broadcastChannel", BroadcastChannel);
1523 
1524   {
1525     Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1526     target
1527         ->Set(context,
1528               FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1529               domexception)
1530         .Check();
1531   }
1532 }
1533 
RegisterExternalReferences(ExternalReferenceRegistry * registry)1534 static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1535   registry->Register(MessageChannel);
1536   registry->Register(BroadcastChannel);
1537   registry->Register(JSTransferable::New);
1538   registry->Register(MessagePort::New);
1539   registry->Register(MessagePort::PostMessage);
1540   registry->Register(MessagePort::Start);
1541   registry->Register(MessagePort::Stop);
1542   registry->Register(MessagePort::CheckType);
1543   registry->Register(MessagePort::Drain);
1544   registry->Register(MessagePort::ReceiveMessage);
1545   registry->Register(MessagePort::MoveToContext);
1546   registry->Register(SetDeserializerCreateObjectFunction);
1547 }
1548 
1549 }  // anonymous namespace
1550 
1551 }  // namespace worker
1552 }  // namespace node
1553 
1554 NODE_BINDING_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1555 NODE_BINDING_EXTERNAL_REFERENCE(messaging,
1556                                 node::worker::RegisterExternalReferences)
1557