1 #include "node_messaging.h"
2
3 #include "async_wrap-inl.h"
4 #include "debug_utils-inl.h"
5 #include "memory_tracker-inl.h"
6 #include "node_buffer.h"
7 #include "node_contextify.h"
8 #include "node_errors.h"
9 #include "node_external_reference.h"
10 #include "node_process-inl.h"
11 #include "util-inl.h"
12
13 using node::contextify::ContextifyContext;
14 using node::errors::TryCatchScope;
15 using v8::Array;
16 using v8::ArrayBuffer;
17 using v8::BackingStore;
18 using v8::CompiledWasmModule;
19 using v8::Context;
20 using v8::EscapableHandleScope;
21 using v8::Function;
22 using v8::FunctionCallbackInfo;
23 using v8::FunctionTemplate;
24 using v8::Global;
25 using v8::HandleScope;
26 using v8::Isolate;
27 using v8::Just;
28 using v8::Local;
29 using v8::Maybe;
30 using v8::MaybeLocal;
31 using v8::Nothing;
32 using v8::Object;
33 using v8::SharedArrayBuffer;
34 using v8::String;
35 using v8::Symbol;
36 using v8::Value;
37 using v8::ValueDeserializer;
38 using v8::ValueSerializer;
39 using v8::WasmModuleObject;
40
41 namespace node {
42
43 using BaseObjectList = std::vector<BaseObjectPtr<BaseObject>>;
44
45 // Hack to have WriteHostObject inform ReadHostObject that the value
46 // should be treated as a regular JS object. Used to transfer process.env.
47 static const uint32_t kNormalObject = static_cast<uint32_t>(-1);
48
GetTransferMode() const49 BaseObject::TransferMode BaseObject::GetTransferMode() const {
50 return BaseObject::TransferMode::kUntransferable;
51 }
52
TransferForMessaging()53 std::unique_ptr<worker::TransferData> BaseObject::TransferForMessaging() {
54 return CloneForMessaging();
55 }
56
CloneForMessaging() const57 std::unique_ptr<worker::TransferData> BaseObject::CloneForMessaging() const {
58 return {};
59 }
60
NestedTransferables() const61 Maybe<BaseObjectList> BaseObject::NestedTransferables() const {
62 return Just(BaseObjectList {});
63 }
64
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)65 Maybe<bool> BaseObject::FinalizeTransferRead(
66 Local<Context> context, ValueDeserializer* deserializer) {
67 return Just(true);
68 }
69
70 namespace worker {
71
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)72 Maybe<bool> TransferData::FinalizeTransferWrite(
73 Local<Context> context, ValueSerializer* serializer) {
74 return Just(true);
75 }
76
Message(MallocedBuffer<char> && buffer)77 Message::Message(MallocedBuffer<char>&& buffer)
78 : main_message_buf_(std::move(buffer)) {}
79
IsCloseMessage() const80 bool Message::IsCloseMessage() const {
81 return main_message_buf_.data == nullptr;
82 }
83
84 namespace {
85
86 // This is used to tell V8 how to read transferred host objects, like other
87 // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
88 class DeserializerDelegate : public ValueDeserializer::Delegate {
89 public:
DeserializerDelegate(Message * m,Environment * env,const std::vector<BaseObjectPtr<BaseObject>> & host_objects,const std::vector<Local<SharedArrayBuffer>> & shared_array_buffers,const std::vector<CompiledWasmModule> & wasm_modules)90 DeserializerDelegate(
91 Message* m,
92 Environment* env,
93 const std::vector<BaseObjectPtr<BaseObject>>& host_objects,
94 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
95 const std::vector<CompiledWasmModule>& wasm_modules)
96 : host_objects_(host_objects),
97 shared_array_buffers_(shared_array_buffers),
98 wasm_modules_(wasm_modules) {}
99
ReadHostObject(Isolate * isolate)100 MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
101 // Identifying the index in the message's BaseObject array is sufficient.
102 uint32_t id;
103 if (!deserializer->ReadUint32(&id))
104 return MaybeLocal<Object>();
105 if (id != kNormalObject) {
106 CHECK_LT(id, host_objects_.size());
107 return host_objects_[id]->object(isolate);
108 }
109 EscapableHandleScope scope(isolate);
110 Local<Context> context = isolate->GetCurrentContext();
111 Local<Value> object;
112 if (!deserializer->ReadValue(context).ToLocal(&object))
113 return MaybeLocal<Object>();
114 CHECK(object->IsObject());
115 return scope.Escape(object.As<Object>());
116 }
117
GetSharedArrayBufferFromId(Isolate * isolate,uint32_t clone_id)118 MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
119 Isolate* isolate, uint32_t clone_id) override {
120 CHECK_LT(clone_id, shared_array_buffers_.size());
121 return shared_array_buffers_[clone_id];
122 }
123
GetWasmModuleFromId(Isolate * isolate,uint32_t transfer_id)124 MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
125 Isolate* isolate, uint32_t transfer_id) override {
126 CHECK_LT(transfer_id, wasm_modules_.size());
127 return WasmModuleObject::FromCompiledModule(
128 isolate, wasm_modules_[transfer_id]);
129 }
130
131 ValueDeserializer* deserializer = nullptr;
132
133 private:
134 const std::vector<BaseObjectPtr<BaseObject>>& host_objects_;
135 const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
136 const std::vector<CompiledWasmModule>& wasm_modules_;
137 };
138
139 } // anonymous namespace
140
Deserialize(Environment * env,Local<Context> context,Local<Value> * port_list)141 MaybeLocal<Value> Message::Deserialize(Environment* env,
142 Local<Context> context,
143 Local<Value>* port_list) {
144 Context::Scope context_scope(context);
145
146 CHECK(!IsCloseMessage());
147 if (port_list != nullptr && !transferables_.empty()) {
148 // Need to create this outside of the EscapableHandleScope, but inside
149 // the Context::Scope.
150 *port_list = Array::New(env->isolate());
151 }
152
153 EscapableHandleScope handle_scope(env->isolate());
154
155 // Create all necessary objects for transferables, e.g. MessagePort handles.
156 std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
157 auto cleanup = OnScopeLeave([&]() {
158 for (BaseObjectPtr<BaseObject> object : host_objects) {
159 if (!object) continue;
160
161 // If the function did not finish successfully, host_objects will contain
162 // a list of objects that will never be passed to JS. Therefore, we
163 // destroy them here.
164 object->Detach();
165 }
166 });
167
168 for (uint32_t i = 0; i < transferables_.size(); ++i) {
169 HandleScope handle_scope(env->isolate());
170 TransferData* data = transferables_[i].get();
171 host_objects[i] = data->Deserialize(
172 env, context, std::move(transferables_[i]));
173 if (!host_objects[i]) return {};
174 if (port_list != nullptr) {
175 // If we gather a list of all message ports, and this transferred object
176 // is a message port, add it to that list. This is a bit of an odd case
177 // of special handling for MessagePorts (as opposed to applying to all
178 // transferables), but it's required for spec compliance.
179 DCHECK((*port_list)->IsArray());
180 Local<Array> port_list_array = port_list->As<Array>();
181 Local<Object> obj = host_objects[i]->object();
182 if (env->message_port_constructor_template()->HasInstance(obj)) {
183 if (port_list_array->Set(context,
184 port_list_array->Length(),
185 obj).IsNothing()) {
186 return {};
187 }
188 }
189 }
190 }
191 transferables_.clear();
192
193 std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
194 // Attach all transferred SharedArrayBuffers to their new Isolate.
195 for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
196 Local<SharedArrayBuffer> sab =
197 SharedArrayBuffer::New(env->isolate(), shared_array_buffers_[i]);
198 shared_array_buffers.push_back(sab);
199 }
200
201 DeserializerDelegate delegate(
202 this, env, host_objects, shared_array_buffers, wasm_modules_);
203 ValueDeserializer deserializer(
204 env->isolate(),
205 reinterpret_cast<const uint8_t*>(main_message_buf_.data),
206 main_message_buf_.size,
207 &delegate);
208 delegate.deserializer = &deserializer;
209
210 // Attach all transferred ArrayBuffers to their new Isolate.
211 for (uint32_t i = 0; i < array_buffers_.size(); ++i) {
212 Local<ArrayBuffer> ab =
213 ArrayBuffer::New(env->isolate(), std::move(array_buffers_[i]));
214 deserializer.TransferArrayBuffer(i, ab);
215 }
216
217 if (deserializer.ReadHeader(context).IsNothing())
218 return {};
219 Local<Value> return_value;
220 if (!deserializer.ReadValue(context).ToLocal(&return_value))
221 return {};
222
223 for (BaseObjectPtr<BaseObject> base_object : host_objects) {
224 if (base_object->FinalizeTransferRead(context, &deserializer).IsNothing())
225 return {};
226 }
227
228 host_objects.clear();
229 return handle_scope.Escape(return_value);
230 }
231
AddSharedArrayBuffer(std::shared_ptr<BackingStore> backing_store)232 void Message::AddSharedArrayBuffer(
233 std::shared_ptr<BackingStore> backing_store) {
234 shared_array_buffers_.emplace_back(std::move(backing_store));
235 }
236
AddTransferable(std::unique_ptr<TransferData> && data)237 void Message::AddTransferable(std::unique_ptr<TransferData>&& data) {
238 transferables_.emplace_back(std::move(data));
239 }
240
AddWASMModule(CompiledWasmModule && mod)241 uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
242 wasm_modules_.emplace_back(std::move(mod));
243 return wasm_modules_.size() - 1;
244 }
245
246 namespace {
247
GetEmitMessageFunction(Local<Context> context)248 MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
249 Isolate* isolate = context->GetIsolate();
250 Local<Object> per_context_bindings;
251 Local<Value> emit_message_val;
252 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
253 !per_context_bindings->Get(context,
254 FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
255 .ToLocal(&emit_message_val)) {
256 return MaybeLocal<Function>();
257 }
258 CHECK(emit_message_val->IsFunction());
259 return emit_message_val.As<Function>();
260 }
261
GetDOMException(Local<Context> context)262 MaybeLocal<Function> GetDOMException(Local<Context> context) {
263 Isolate* isolate = context->GetIsolate();
264 Local<Object> per_context_bindings;
265 Local<Value> domexception_ctor_val;
266 if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
267 !per_context_bindings->Get(context,
268 FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
269 .ToLocal(&domexception_ctor_val)) {
270 return MaybeLocal<Function>();
271 }
272 CHECK(domexception_ctor_val->IsFunction());
273 Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
274 return domexception_ctor;
275 }
276
ThrowDataCloneException(Local<Context> context,Local<String> message)277 void ThrowDataCloneException(Local<Context> context, Local<String> message) {
278 Isolate* isolate = context->GetIsolate();
279 Local<Value> argv[] = {message,
280 FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
281 Local<Value> exception;
282 Local<Function> domexception_ctor;
283 if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
284 !domexception_ctor->NewInstance(context, arraysize(argv), argv)
285 .ToLocal(&exception)) {
286 return;
287 }
288 isolate->ThrowException(exception);
289 }
290
291 // This tells V8 how to serialize objects that it does not understand
292 // (e.g. C++ objects) into the output buffer, in a way that our own
293 // DeserializerDelegate understands how to unpack.
294 class SerializerDelegate : public ValueSerializer::Delegate {
295 public:
SerializerDelegate(Environment * env,Local<Context> context,Message * m)296 SerializerDelegate(Environment* env, Local<Context> context, Message* m)
297 : env_(env), context_(context), msg_(m) {}
298
ThrowDataCloneError(Local<String> message)299 void ThrowDataCloneError(Local<String> message) override {
300 ThrowDataCloneException(context_, message);
301 }
302
WriteHostObject(Isolate * isolate,Local<Object> object)303 Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
304 if (env_->base_object_ctor_template()->HasInstance(object)) {
305 return WriteHostObject(
306 BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(object) });
307 }
308
309 // Convert process.env to a regular object.
310 auto env_proxy_ctor_template = env_->env_proxy_ctor_template();
311 if (!env_proxy_ctor_template.IsEmpty() &&
312 env_proxy_ctor_template->HasInstance(object)) {
313 HandleScope scope(isolate);
314 // TODO(bnoordhuis) Prototype-less object in case process.env contains
315 // a "__proto__" key? process.env has a prototype with concomitant
316 // methods like toString(). It's probably confusing if that gets lost
317 // in transmission.
318 Local<Object> normal_object = Object::New(isolate);
319 env_->env_vars()->AssignToObject(isolate, env_->context(), normal_object);
320 serializer->WriteUint32(kNormalObject); // Instead of a BaseObject.
321 return serializer->WriteValue(env_->context(), normal_object);
322 }
323
324 ThrowDataCloneError(env_->clone_unsupported_type_str());
325 return Nothing<bool>();
326 }
327
GetSharedArrayBufferId(Isolate * isolate,Local<SharedArrayBuffer> shared_array_buffer)328 Maybe<uint32_t> GetSharedArrayBufferId(
329 Isolate* isolate,
330 Local<SharedArrayBuffer> shared_array_buffer) override {
331 uint32_t i;
332 for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
333 if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
334 shared_array_buffer) {
335 return Just(i);
336 }
337 }
338
339 seen_shared_array_buffers_.emplace_back(
340 Global<SharedArrayBuffer> { isolate, shared_array_buffer });
341 msg_->AddSharedArrayBuffer(shared_array_buffer->GetBackingStore());
342 return Just(i);
343 }
344
GetWasmModuleTransferId(Isolate * isolate,Local<WasmModuleObject> module)345 Maybe<uint32_t> GetWasmModuleTransferId(
346 Isolate* isolate, Local<WasmModuleObject> module) override {
347 return Just(msg_->AddWASMModule(module->GetCompiledModule()));
348 }
349
Finish(Local<Context> context)350 Maybe<bool> Finish(Local<Context> context) {
351 for (uint32_t i = 0; i < host_objects_.size(); i++) {
352 BaseObjectPtr<BaseObject> host_object = std::move(host_objects_[i]);
353 std::unique_ptr<TransferData> data;
354 if (i < first_cloned_object_index_)
355 data = host_object->TransferForMessaging();
356 if (!data)
357 data = host_object->CloneForMessaging();
358 if (!data) return Nothing<bool>();
359 if (data->FinalizeTransferWrite(context, serializer).IsNothing())
360 return Nothing<bool>();
361 msg_->AddTransferable(std::move(data));
362 }
363 return Just(true);
364 }
365
AddHostObject(BaseObjectPtr<BaseObject> host_object)366 inline void AddHostObject(BaseObjectPtr<BaseObject> host_object) {
367 // Make sure we have not started serializing the value itself yet.
368 CHECK_EQ(first_cloned_object_index_, SIZE_MAX);
369 host_objects_.emplace_back(std::move(host_object));
370 }
371
372 // Some objects in the transfer list may register sub-objects that can be
373 // transferred. This could e.g. be a public JS wrapper object, such as a
374 // FileHandle, that is registering its C++ handle for transfer.
AddNestedHostObjects()375 inline Maybe<bool> AddNestedHostObjects() {
376 for (size_t i = 0; i < host_objects_.size(); i++) {
377 std::vector<BaseObjectPtr<BaseObject>> nested_transferables;
378 if (!host_objects_[i]->NestedTransferables().To(&nested_transferables))
379 return Nothing<bool>();
380 for (auto& nested_transferable : nested_transferables) {
381 if (std::find(host_objects_.begin(),
382 host_objects_.end(),
383 nested_transferable) == host_objects_.end()) {
384 AddHostObject(nested_transferable);
385 }
386 }
387 }
388 return Just(true);
389 }
390
391 ValueSerializer* serializer = nullptr;
392
393 private:
WriteHostObject(BaseObjectPtr<BaseObject> host_object)394 Maybe<bool> WriteHostObject(BaseObjectPtr<BaseObject> host_object) {
395 BaseObject::TransferMode mode = host_object->GetTransferMode();
396 if (mode == BaseObject::TransferMode::kUntransferable) {
397 ThrowDataCloneError(env_->clone_unsupported_type_str());
398 return Nothing<bool>();
399 }
400
401 for (uint32_t i = 0; i < host_objects_.size(); i++) {
402 if (host_objects_[i] == host_object) {
403 serializer->WriteUint32(i);
404 return Just(true);
405 }
406 }
407
408 if (mode == BaseObject::TransferMode::kTransferable) {
409 THROW_ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST(env_);
410 return Nothing<bool>();
411 }
412
413 CHECK_EQ(mode, BaseObject::TransferMode::kCloneable);
414 uint32_t index = host_objects_.size();
415 if (first_cloned_object_index_ == SIZE_MAX)
416 first_cloned_object_index_ = index;
417 serializer->WriteUint32(index);
418 host_objects_.push_back(host_object);
419 return Just(true);
420 }
421
422 Environment* env_;
423 Local<Context> context_;
424 Message* msg_;
425 std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
426 std::vector<BaseObjectPtr<BaseObject>> host_objects_;
427 size_t first_cloned_object_index_ = SIZE_MAX;
428
429 friend class worker::Message;
430 };
431
432 } // anonymous namespace
433
Serialize(Environment * env,Local<Context> context,Local<Value> input,const TransferList & transfer_list_v,Local<Object> source_port)434 Maybe<bool> Message::Serialize(Environment* env,
435 Local<Context> context,
436 Local<Value> input,
437 const TransferList& transfer_list_v,
438 Local<Object> source_port) {
439 HandleScope handle_scope(env->isolate());
440 Context::Scope context_scope(context);
441
442 // Verify that we're not silently overwriting an existing message.
443 CHECK(main_message_buf_.is_empty());
444
445 SerializerDelegate delegate(env, context, this);
446 ValueSerializer serializer(env->isolate(), &delegate);
447 delegate.serializer = &serializer;
448
449 std::vector<Local<ArrayBuffer>> array_buffers;
450 for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
451 Local<Value> entry = transfer_list_v[i];
452 if (entry->IsObject()) {
453 // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
454 // for details.
455 bool untransferable;
456 if (!entry.As<Object>()->HasPrivate(
457 context,
458 env->untransferable_object_private_symbol())
459 .To(&untransferable)) {
460 return Nothing<bool>();
461 }
462 if (untransferable) continue;
463 }
464
465 // Currently, we support ArrayBuffers and BaseObjects for which
466 // GetTransferMode() does not return kUntransferable.
467 if (entry->IsArrayBuffer()) {
468 Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
469 // If we cannot render the ArrayBuffer unusable in this Isolate,
470 // copying the buffer will have to do.
471 // Note that we can currently transfer ArrayBuffers even if they were
472 // not allocated by Node’s ArrayBufferAllocator in the first place,
473 // because we pass the underlying v8::BackingStore around rather than
474 // raw data *and* an Isolate with a non-default ArrayBuffer allocator
475 // is always going to outlive any Workers it creates, and so will its
476 // allocator along with it.
477 if (!ab->IsDetachable()) continue;
478 if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
479 array_buffers.end()) {
480 ThrowDataCloneException(
481 context,
482 FIXED_ONE_BYTE_STRING(
483 env->isolate(),
484 "Transfer list contains duplicate ArrayBuffer"));
485 return Nothing<bool>();
486 }
487 // We simply use the array index in the `array_buffers` list as the
488 // ID that we write into the serialized buffer.
489 uint32_t id = array_buffers.size();
490 array_buffers.push_back(ab);
491 serializer.TransferArrayBuffer(id, ab);
492 continue;
493 } else if (env->base_object_ctor_template()->HasInstance(entry)) {
494 // Check if the source MessagePort is being transferred.
495 if (!source_port.IsEmpty() && entry == source_port) {
496 ThrowDataCloneException(
497 context,
498 FIXED_ONE_BYTE_STRING(env->isolate(),
499 "Transfer list contains source port"));
500 return Nothing<bool>();
501 }
502 BaseObjectPtr<BaseObject> host_object {
503 Unwrap<BaseObject>(entry.As<Object>()) };
504 if (env->message_port_constructor_template()->HasInstance(entry) &&
505 (!host_object ||
506 static_cast<MessagePort*>(host_object.get())->IsDetached())) {
507 ThrowDataCloneException(
508 context,
509 FIXED_ONE_BYTE_STRING(
510 env->isolate(),
511 "MessagePort in transfer list is already detached"));
512 return Nothing<bool>();
513 }
514 if (std::find(delegate.host_objects_.begin(),
515 delegate.host_objects_.end(),
516 host_object) != delegate.host_objects_.end()) {
517 ThrowDataCloneException(
518 context,
519 String::Concat(env->isolate(),
520 FIXED_ONE_BYTE_STRING(
521 env->isolate(),
522 "Transfer list contains duplicate "),
523 entry.As<Object>()->GetConstructorName()));
524 return Nothing<bool>();
525 }
526 if (host_object && host_object->GetTransferMode() !=
527 BaseObject::TransferMode::kUntransferable) {
528 delegate.AddHostObject(host_object);
529 continue;
530 }
531 }
532
533 THROW_ERR_INVALID_TRANSFER_OBJECT(env);
534 return Nothing<bool>();
535 }
536 if (delegate.AddNestedHostObjects().IsNothing())
537 return Nothing<bool>();
538
539 serializer.WriteHeader();
540 if (serializer.WriteValue(context, input).IsNothing()) {
541 return Nothing<bool>();
542 }
543
544 for (Local<ArrayBuffer> ab : array_buffers) {
545 // If serialization succeeded, we render it inaccessible in this Isolate.
546 std::shared_ptr<BackingStore> backing_store = ab->GetBackingStore();
547 ab->Detach();
548
549 array_buffers_.emplace_back(std::move(backing_store));
550 }
551
552 if (delegate.Finish(context).IsNothing())
553 return Nothing<bool>();
554
555 // The serializer gave us a buffer allocated using `malloc()`.
556 std::pair<uint8_t*, size_t> data = serializer.Release();
557 CHECK_NOT_NULL(data.first);
558 main_message_buf_ =
559 MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
560 return Just(true);
561 }
562
MemoryInfo(MemoryTracker * tracker) const563 void Message::MemoryInfo(MemoryTracker* tracker) const {
564 tracker->TrackField("array_buffers_", array_buffers_);
565 tracker->TrackField("shared_array_buffers", shared_array_buffers_);
566 tracker->TrackField("transferables", transferables_);
567 }
568
MessagePortData(MessagePort * owner)569 MessagePortData::MessagePortData(MessagePort* owner)
570 : owner_(owner) {
571 }
572
~MessagePortData()573 MessagePortData::~MessagePortData() {
574 CHECK_NULL(owner_);
575 Disentangle();
576 }
577
MemoryInfo(MemoryTracker * tracker) const578 void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
579 Mutex::ScopedLock lock(mutex_);
580 tracker->TrackField("incoming_messages", incoming_messages_);
581 }
582
AddToIncomingQueue(std::shared_ptr<Message> message)583 void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
584 // This function will be called by other threads.
585 Mutex::ScopedLock lock(mutex_);
586 incoming_messages_.emplace_back(std::move(message));
587
588 if (owner_ != nullptr) {
589 Debug(owner_, "Adding message to incoming queue");
590 owner_->TriggerAsync();
591 }
592 }
593
Entangle(MessagePortData * a,MessagePortData * b)594 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
595 auto group = std::make_shared<SiblingGroup>();
596 group->Entangle({a, b});
597 }
598
Disentangle()599 void MessagePortData::Disentangle() {
600 if (group_) {
601 group_->Disentangle(this);
602 }
603 }
604
~MessagePort()605 MessagePort::~MessagePort() {
606 if (data_) Detach();
607 }
608
MessagePort(Environment * env,Local<Context> context,Local<Object> wrap)609 MessagePort::MessagePort(Environment* env,
610 Local<Context> context,
611 Local<Object> wrap)
612 : HandleWrap(env,
613 wrap,
614 reinterpret_cast<uv_handle_t*>(&async_),
615 AsyncWrap::PROVIDER_MESSAGEPORT),
616 data_(new MessagePortData(this)) {
617 auto onmessage = [](uv_async_t* handle) {
618 // Called when data has been put into the queue.
619 MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
620 channel->OnMessage(MessageProcessingMode::kNormalOperation);
621 };
622
623 CHECK_EQ(uv_async_init(env->event_loop(),
624 &async_,
625 onmessage), 0);
626 // Reset later to indicate success of the constructor.
627 bool succeeded = false;
628 auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });
629
630 Local<Value> fn;
631 if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
632 return;
633
634 if (fn->IsFunction()) {
635 Local<Function> init = fn.As<Function>();
636 if (init->Call(context, wrap, 0, nullptr).IsEmpty())
637 return;
638 }
639
640 Local<Function> emit_message_fn;
641 if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
642 return;
643 emit_message_fn_.Reset(env->isolate(), emit_message_fn);
644
645 succeeded = true;
646 Debug(this, "Created message port");
647 }
648
IsDetached() const649 bool MessagePort::IsDetached() const {
650 return data_ == nullptr || IsHandleClosing();
651 }
652
TriggerAsync()653 void MessagePort::TriggerAsync() {
654 if (IsHandleClosing()) return;
655 CHECK_EQ(uv_async_send(&async_), 0);
656 }
657
Close(v8::Local<v8::Value> close_callback)658 void MessagePort::Close(v8::Local<v8::Value> close_callback) {
659 Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
660
661 if (data_) {
662 // Wrap this call with accessing the mutex, so that TriggerAsync()
663 // can check IsHandleClosing() without race conditions.
664 Mutex::ScopedLock sibling_lock(data_->mutex_);
665 HandleWrap::Close(close_callback);
666 } else {
667 HandleWrap::Close(close_callback);
668 }
669 }
670
New(const FunctionCallbackInfo<Value> & args)671 void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
672 // This constructor just throws an error. Unfortunately, we can’t use V8’s
673 // ConstructorBehavior::kThrow, as that also removes the prototype from the
674 // class (i.e. makes it behave like an arrow function).
675 Environment* env = Environment::GetCurrent(args);
676 THROW_ERR_CONSTRUCT_CALL_INVALID(env);
677 }
678
New(Environment * env,Local<Context> context,std::unique_ptr<MessagePortData> data,std::shared_ptr<SiblingGroup> sibling_group)679 MessagePort* MessagePort::New(
680 Environment* env,
681 Local<Context> context,
682 std::unique_ptr<MessagePortData> data,
683 std::shared_ptr<SiblingGroup> sibling_group) {
684 Context::Scope context_scope(context);
685 Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
686
687 // Construct a new instance, then assign the listener instance and possibly
688 // the MessagePortData to it.
689 Local<Object> instance;
690 if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
691 return nullptr;
692 MessagePort* port = new MessagePort(env, context, instance);
693 CHECK_NOT_NULL(port);
694 if (port->IsHandleClosing()) {
695 // Construction failed with an exception.
696 return nullptr;
697 }
698
699 if (data) {
700 CHECK(!sibling_group);
701 port->Detach();
702 port->data_ = std::move(data);
703
704 // This lock is here to avoid race conditions with the `owner_` read
705 // in AddToIncomingQueue(). (This would likely be unproblematic without it,
706 // but it's better to be safe than sorry.)
707 Mutex::ScopedLock lock(port->data_->mutex_);
708 port->data_->owner_ = port;
709 // If the existing MessagePortData object had pending messages, this is
710 // the easiest way to run that queue.
711 port->TriggerAsync();
712 } else if (sibling_group) {
713 sibling_group->Entangle(port->data_.get());
714 }
715 return port;
716 }
717
ReceiveMessage(Local<Context> context,MessageProcessingMode mode,Local<Value> * port_list)718 MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
719 MessageProcessingMode mode,
720 Local<Value>* port_list) {
721 std::shared_ptr<Message> received;
722 {
723 // Get the head of the message queue.
724 Mutex::ScopedLock lock(data_->mutex_);
725
726 Debug(this, "MessagePort has message");
727
728 bool wants_message =
729 receiving_messages_ ||
730 mode == MessageProcessingMode::kForceReadMessages;
731 // We have nothing to do if:
732 // - There are no pending messages
733 // - We are not intending to receive messages, and the message we would
734 // receive is not the final "close" message.
735 if (data_->incoming_messages_.empty() ||
736 (!wants_message &&
737 !data_->incoming_messages_.front()->IsCloseMessage())) {
738 return env()->no_message_symbol();
739 }
740
741 received = data_->incoming_messages_.front();
742 data_->incoming_messages_.pop_front();
743 }
744
745 if (received->IsCloseMessage()) {
746 Close();
747 return env()->no_message_symbol();
748 }
749
750 if (!env()->can_call_into_js()) return MaybeLocal<Value>();
751
752 return received->Deserialize(env(), context, port_list);
753 }
754
OnMessage(MessageProcessingMode mode)755 void MessagePort::OnMessage(MessageProcessingMode mode) {
756 Debug(this, "Running MessagePort::OnMessage()");
757 HandleScope handle_scope(env()->isolate());
758 Local<Context> context =
759 object(env()->isolate())->GetCreationContext().ToLocalChecked();
760
761 size_t processing_limit;
762 if (mode == MessageProcessingMode::kNormalOperation) {
763 Mutex::ScopedLock(data_->mutex_);
764 processing_limit = std::max(data_->incoming_messages_.size(),
765 static_cast<size_t>(1000));
766 } else {
767 processing_limit = std::numeric_limits<size_t>::max();
768 }
769
770 // data_ can only ever be modified by the owner thread, so no need to lock.
771 // However, the message port may be transferred while it is processing
772 // messages, so we need to check that this handle still owns its `data_` field
773 // on every iteration.
774 while (data_) {
775 if (processing_limit-- == 0) {
776 // Prevent event loop starvation by only processing those messages without
777 // interruption that were already present when the OnMessage() call was
778 // first triggered, but at least 1000 messages because otherwise the
779 // overhead of repeatedly triggering the uv_async_t instance becomes
780 // noticeable, at least on Windows.
781 // (That might require more investigation by somebody more familiar with
782 // Windows.)
783 TriggerAsync();
784 return;
785 }
786
787 HandleScope handle_scope(env()->isolate());
788 Context::Scope context_scope(context);
789 Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
790
791 Local<Value> payload;
792 Local<Value> port_list = Undefined(env()->isolate());
793 Local<Value> message_error;
794 Local<Value> argv[3];
795
796 {
797 // Catch any exceptions from parsing the message itself (not from
798 // emitting it) as 'messageeror' events.
799 TryCatchScope try_catch(env());
800 if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
801 if (try_catch.HasCaught() && !try_catch.HasTerminated())
802 message_error = try_catch.Exception();
803 goto reschedule;
804 }
805 }
806 if (payload == env()->no_message_symbol()) break;
807
808 if (!env()->can_call_into_js()) {
809 Debug(this, "MessagePort drains queue because !can_call_into_js()");
810 // In this case there is nothing to do but to drain the current queue.
811 continue;
812 }
813
814 argv[0] = payload;
815 argv[1] = port_list;
816 argv[2] = env()->message_string();
817
818 if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
819 reschedule:
820 if (!message_error.IsEmpty()) {
821 argv[0] = message_error;
822 argv[1] = Undefined(env()->isolate());
823 argv[2] = env()->messageerror_string();
824 USE(MakeCallback(emit_message, arraysize(argv), argv));
825 }
826
827 // Re-schedule OnMessage() execution in case of failure.
828 if (data_)
829 TriggerAsync();
830 return;
831 }
832 }
833 }
834
OnClose()835 void MessagePort::OnClose() {
836 Debug(this, "MessagePort::OnClose()");
837 if (data_) {
838 // Detach() returns move(data_).
839 Detach()->Disentangle();
840 }
841 }
842
Detach()843 std::unique_ptr<MessagePortData> MessagePort::Detach() {
844 CHECK(data_);
845 Mutex::ScopedLock lock(data_->mutex_);
846 data_->owner_ = nullptr;
847 return std::move(data_);
848 }
849
GetTransferMode() const850 BaseObject::TransferMode MessagePort::GetTransferMode() const {
851 if (IsDetached())
852 return BaseObject::TransferMode::kUntransferable;
853 return BaseObject::TransferMode::kTransferable;
854 }
855
TransferForMessaging()856 std::unique_ptr<TransferData> MessagePort::TransferForMessaging() {
857 Close();
858 return Detach();
859 }
860
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)861 BaseObjectPtr<BaseObject> MessagePortData::Deserialize(
862 Environment* env,
863 Local<Context> context,
864 std::unique_ptr<TransferData> self) {
865 return BaseObjectPtr<MessagePort> { MessagePort::New(
866 env, context,
867 static_unique_pointer_cast<MessagePortData>(std::move(self))) };
868 }
869
PostMessage(Environment * env,Local<Context> context,Local<Value> message_v,const TransferList & transfer_v)870 Maybe<bool> MessagePort::PostMessage(Environment* env,
871 Local<Context> context,
872 Local<Value> message_v,
873 const TransferList& transfer_v) {
874 Isolate* isolate = env->isolate();
875 Local<Object> obj = object(isolate);
876
877 std::shared_ptr<Message> msg = std::make_shared<Message>();
878
879 // Per spec, we need to both check if transfer list has the source port, and
880 // serialize the input message, even if the MessagePort is closed or detached.
881
882 Maybe<bool> serialization_maybe =
883 msg->Serialize(env, context, message_v, transfer_v, obj);
884 if (data_ == nullptr) {
885 return serialization_maybe;
886 }
887 if (serialization_maybe.IsNothing()) {
888 return Nothing<bool>();
889 }
890
891 std::string error;
892 Maybe<bool> res = data_->Dispatch(msg, &error);
893 if (res.IsNothing())
894 return res;
895
896 if (!error.empty())
897 ProcessEmitWarning(env, error.c_str());
898
899 return res;
900 }
901
Dispatch(std::shared_ptr<Message> message,std::string * error)902 Maybe<bool> MessagePortData::Dispatch(
903 std::shared_ptr<Message> message,
904 std::string* error) {
905 if (!group_) {
906 if (error != nullptr)
907 *error = "MessagePortData is not entangled.";
908 return Nothing<bool>();
909 }
910 return group_->Dispatch(this, message, error);
911 }
912
ReadIterable(Environment * env,Local<Context> context,TransferList & transfer_list,Local<Value> object)913 static Maybe<bool> ReadIterable(Environment* env,
914 Local<Context> context,
915 // NOLINTNEXTLINE(runtime/references)
916 TransferList& transfer_list,
917 Local<Value> object) {
918 if (!object->IsObject()) return Just(false);
919
920 if (object->IsArray()) {
921 Local<Array> arr = object.As<Array>();
922 size_t length = arr->Length();
923 transfer_list.AllocateSufficientStorage(length);
924 for (size_t i = 0; i < length; i++) {
925 if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
926 return Nothing<bool>();
927 }
928 return Just(true);
929 }
930
931 Isolate* isolate = env->isolate();
932 Local<Value> iterator_method;
933 if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
934 .ToLocal(&iterator_method)) return Nothing<bool>();
935 if (!iterator_method->IsFunction()) return Just(false);
936
937 Local<Value> iterator;
938 if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
939 .ToLocal(&iterator)) return Nothing<bool>();
940 if (!iterator->IsObject()) return Just(false);
941
942 Local<Value> next;
943 if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
944 return Nothing<bool>();
945 if (!next->IsFunction()) return Just(false);
946
947 std::vector<Local<Value>> entries;
948 while (env->can_call_into_js()) {
949 Local<Value> result;
950 if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
951 .ToLocal(&result)) return Nothing<bool>();
952 if (!result->IsObject()) return Just(false);
953
954 Local<Value> done;
955 if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
956 return Nothing<bool>();
957 if (done->BooleanValue(isolate)) break;
958
959 Local<Value> val;
960 if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
961 return Nothing<bool>();
962 entries.push_back(val);
963 }
964
965 transfer_list.AllocateSufficientStorage(entries.size());
966 std::copy(entries.begin(), entries.end(), &transfer_list[0]);
967 return Just(true);
968 }
969
PostMessage(const FunctionCallbackInfo<Value> & args)970 void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
971 Environment* env = Environment::GetCurrent(args);
972 Local<Object> obj = args.This();
973 Local<Context> context = obj->GetCreationContext().ToLocalChecked();
974
975 if (args.Length() == 0) {
976 return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
977 "MessagePort.postMessage");
978 }
979
980 if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
981 // Browsers ignore null or undefined, and otherwise accept an array or an
982 // options object.
983 return THROW_ERR_INVALID_ARG_TYPE(env,
984 "Optional transferList argument must be an iterable");
985 }
986
987 TransferList transfer_list;
988 if (args[1]->IsObject()) {
989 bool was_iterable;
990 if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
991 return;
992 if (!was_iterable) {
993 Local<Value> transfer_option;
994 if (!args[1].As<Object>()->Get(context, env->transfer_string())
995 .ToLocal(&transfer_option)) return;
996 if (!transfer_option->IsUndefined()) {
997 if (!ReadIterable(env, context, transfer_list, transfer_option)
998 .To(&was_iterable)) return;
999 if (!was_iterable) {
1000 return THROW_ERR_INVALID_ARG_TYPE(env,
1001 "Optional options.transfer argument must be an iterable");
1002 }
1003 }
1004 }
1005 }
1006
1007 MessagePort* port = Unwrap<MessagePort>(args.This());
1008 // Even if the backing MessagePort object has already been deleted, we still
1009 // want to serialize the message to ensure spec-compliant behavior w.r.t.
1010 // transfers.
1011 if (port == nullptr || port->IsHandleClosing()) {
1012 Message msg;
1013 USE(msg.Serialize(env, context, args[0], transfer_list, obj));
1014 return;
1015 }
1016
1017 Maybe<bool> res = port->PostMessage(env, context, args[0], transfer_list);
1018 if (res.IsJust())
1019 args.GetReturnValue().Set(res.FromJust());
1020 }
1021
Start()1022 void MessagePort::Start() {
1023 Debug(this, "Start receiving messages");
1024 receiving_messages_ = true;
1025 Mutex::ScopedLock lock(data_->mutex_);
1026 if (!data_->incoming_messages_.empty())
1027 TriggerAsync();
1028 }
1029
Stop()1030 void MessagePort::Stop() {
1031 Debug(this, "Stop receiving messages");
1032 receiving_messages_ = false;
1033 }
1034
Start(const FunctionCallbackInfo<Value> & args)1035 void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
1036 MessagePort* port;
1037 ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
1038 if (!port->data_) {
1039 return;
1040 }
1041 port->Start();
1042 }
1043
Stop(const FunctionCallbackInfo<Value> & args)1044 void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
1045 MessagePort* port;
1046 CHECK(args[0]->IsObject());
1047 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1048 if (!port->data_) {
1049 return;
1050 }
1051 port->Stop();
1052 }
1053
CheckType(const FunctionCallbackInfo<Value> & args)1054 void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
1055 Environment* env = Environment::GetCurrent(args);
1056 args.GetReturnValue().Set(
1057 GetMessagePortConstructorTemplate(env)->HasInstance(args[0]));
1058 }
1059
Drain(const FunctionCallbackInfo<Value> & args)1060 void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
1061 MessagePort* port;
1062 ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1063 port->OnMessage(MessageProcessingMode::kForceReadMessages);
1064 }
1065
ReceiveMessage(const FunctionCallbackInfo<Value> & args)1066 void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
1067 Environment* env = Environment::GetCurrent(args);
1068 if (!args[0]->IsObject() ||
1069 !env->message_port_constructor_template()->HasInstance(args[0])) {
1070 return THROW_ERR_INVALID_ARG_TYPE(env,
1071 "The \"port\" argument must be a MessagePort instance");
1072 }
1073 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1074 if (port == nullptr) {
1075 // Return 'no messages' for a closed port.
1076 args.GetReturnValue().Set(
1077 Environment::GetCurrent(args)->no_message_symbol());
1078 return;
1079 }
1080
1081 MaybeLocal<Value> payload = port->ReceiveMessage(
1082 port->object()->GetCreationContext().ToLocalChecked(),
1083 MessageProcessingMode::kForceReadMessages);
1084 if (!payload.IsEmpty())
1085 args.GetReturnValue().Set(payload.ToLocalChecked());
1086 }
1087
MoveToContext(const FunctionCallbackInfo<Value> & args)1088 void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
1089 Environment* env = Environment::GetCurrent(args);
1090 if (!args[0]->IsObject() ||
1091 !env->message_port_constructor_template()->HasInstance(args[0])) {
1092 return THROW_ERR_INVALID_ARG_TYPE(env,
1093 "The \"port\" argument must be a MessagePort instance");
1094 }
1095 MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
1096 if (port == nullptr || port->IsHandleClosing()) {
1097 Isolate* isolate = env->isolate();
1098 THROW_ERR_CLOSED_MESSAGE_PORT(isolate);
1099 return;
1100 }
1101
1102 Local<Value> context_arg = args[1];
1103 ContextifyContext* context_wrapper;
1104 if (!context_arg->IsObject() ||
1105 (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
1106 env, context_arg.As<Object>())) == nullptr) {
1107 return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
1108 }
1109
1110 std::unique_ptr<MessagePortData> data;
1111 if (!port->IsDetached())
1112 data = port->Detach();
1113
1114 Context::Scope context_scope(context_wrapper->context());
1115 MessagePort* target =
1116 MessagePort::New(env, context_wrapper->context(), std::move(data));
1117 if (target != nullptr)
1118 args.GetReturnValue().Set(target->object());
1119 }
1120
Entangle(MessagePort * a,MessagePort * b)1121 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1122 MessagePortData::Entangle(a->data_.get(), b->data_.get());
1123 }
1124
Entangle(MessagePort * a,MessagePortData * b)1125 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
1126 MessagePortData::Entangle(a->data_.get(), b);
1127 }
1128
MemoryInfo(MemoryTracker * tracker) const1129 void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
1130 tracker->TrackField("data", data_);
1131 tracker->TrackField("emit_message_fn", emit_message_fn_);
1132 }
1133
GetMessagePortConstructorTemplate(Environment * env)1134 Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
1135 // Factor generating the MessagePort JS constructor into its own piece
1136 // of code, because it is needed early on in the child environment setup.
1137 Local<FunctionTemplate> templ = env->message_port_constructor_template();
1138 if (!templ.IsEmpty())
1139 return templ;
1140
1141 {
1142 Isolate* isolate = env->isolate();
1143 Local<FunctionTemplate> m = NewFunctionTemplate(isolate, MessagePort::New);
1144 m->SetClassName(env->message_port_constructor_string());
1145 m->InstanceTemplate()->SetInternalFieldCount(
1146 MessagePort::kInternalFieldCount);
1147 m->Inherit(HandleWrap::GetConstructorTemplate(env));
1148
1149 SetProtoMethod(isolate, m, "postMessage", MessagePort::PostMessage);
1150 SetProtoMethod(isolate, m, "start", MessagePort::Start);
1151
1152 env->set_message_port_constructor_template(m);
1153 }
1154
1155 return GetMessagePortConstructorTemplate(env);
1156 }
1157
JSTransferable(Environment * env,Local<Object> obj)1158 JSTransferable::JSTransferable(Environment* env, Local<Object> obj)
1159 : BaseObject(env, obj) {
1160 MakeWeak();
1161 }
1162
New(const FunctionCallbackInfo<Value> & args)1163 void JSTransferable::New(const FunctionCallbackInfo<Value>& args) {
1164 CHECK(args.IsConstructCall());
1165 new JSTransferable(Environment::GetCurrent(args), args.This());
1166 }
1167
GetTransferMode() const1168 JSTransferable::TransferMode JSTransferable::GetTransferMode() const {
1169 // Implement `kClone in this ? kCloneable : kTransferable`.
1170 HandleScope handle_scope(env()->isolate());
1171 errors::TryCatchScope ignore_exceptions(env());
1172
1173 bool has_clone;
1174 if (!object()->Has(env()->context(),
1175 env()->messaging_clone_symbol()).To(&has_clone)) {
1176 return TransferMode::kUntransferable;
1177 }
1178
1179 return has_clone ? TransferMode::kCloneable : TransferMode::kTransferable;
1180 }
1181
TransferForMessaging()1182 std::unique_ptr<TransferData> JSTransferable::TransferForMessaging() {
1183 return TransferOrClone(TransferMode::kTransferable);
1184 }
1185
CloneForMessaging() const1186 std::unique_ptr<TransferData> JSTransferable::CloneForMessaging() const {
1187 return TransferOrClone(TransferMode::kCloneable);
1188 }
1189
TransferOrClone(TransferMode mode) const1190 std::unique_ptr<TransferData> JSTransferable::TransferOrClone(
1191 TransferMode mode) const {
1192 // Call `this[symbol]()` where `symbol` is `kClone` or `kTransfer`,
1193 // which should return an object with `data` and `deserializeInfo` properties;
1194 // `data` is written to the serializer later, and `deserializeInfo` is stored
1195 // on the `TransferData` instance as a string.
1196 HandleScope handle_scope(env()->isolate());
1197 Local<Context> context = env()->isolate()->GetCurrentContext();
1198 Local<Symbol> method_name = mode == TransferMode::kCloneable ?
1199 env()->messaging_clone_symbol() : env()->messaging_transfer_symbol();
1200
1201 Local<Value> method;
1202 if (!object()->Get(context, method_name).ToLocal(&method)) {
1203 return {};
1204 }
1205 if (method->IsFunction()) {
1206 Local<Value> result_v;
1207 if (!method.As<Function>()->Call(
1208 context, object(), 0, nullptr).ToLocal(&result_v)) {
1209 return {};
1210 }
1211
1212 if (result_v->IsObject()) {
1213 Local<Object> result = result_v.As<Object>();
1214 Local<Value> data;
1215 Local<Value> deserialize_info;
1216 if (!result->Get(context, env()->data_string()).ToLocal(&data) ||
1217 !result->Get(context, env()->deserialize_info_string())
1218 .ToLocal(&deserialize_info)) {
1219 return {};
1220 }
1221 Utf8Value deserialize_info_str(env()->isolate(), deserialize_info);
1222 if (*deserialize_info_str == nullptr) return {};
1223 return std::make_unique<Data>(
1224 *deserialize_info_str, Global<Value>(env()->isolate(), data));
1225 }
1226 }
1227
1228 if (mode == TransferMode::kTransferable)
1229 return TransferOrClone(TransferMode::kCloneable);
1230 else
1231 return {};
1232 }
1233
1234 Maybe<BaseObjectList>
NestedTransferables() const1235 JSTransferable::NestedTransferables() const {
1236 // Call `this[kTransferList]()` and return the resulting list of BaseObjects.
1237 HandleScope handle_scope(env()->isolate());
1238 Local<Context> context = env()->isolate()->GetCurrentContext();
1239 Local<Symbol> method_name = env()->messaging_transfer_list_symbol();
1240
1241 Local<Value> method;
1242 if (!object()->Get(context, method_name).ToLocal(&method)) {
1243 return Nothing<BaseObjectList>();
1244 }
1245 if (!method->IsFunction()) return Just(BaseObjectList {});
1246
1247 Local<Value> list_v;
1248 if (!method.As<Function>()->Call(
1249 context, object(), 0, nullptr).ToLocal(&list_v)) {
1250 return Nothing<BaseObjectList>();
1251 }
1252 if (!list_v->IsArray()) return Just(BaseObjectList {});
1253 Local<Array> list = list_v.As<Array>();
1254
1255 BaseObjectList ret;
1256 for (size_t i = 0; i < list->Length(); i++) {
1257 Local<Value> value;
1258 if (!list->Get(context, i).ToLocal(&value))
1259 return Nothing<BaseObjectList>();
1260 if (env()->base_object_ctor_template()->HasInstance(value))
1261 ret.emplace_back(Unwrap<BaseObject>(value));
1262 }
1263 return Just(ret);
1264 }
1265
FinalizeTransferRead(Local<Context> context,ValueDeserializer * deserializer)1266 Maybe<bool> JSTransferable::FinalizeTransferRead(
1267 Local<Context> context, ValueDeserializer* deserializer) {
1268 // Call `this[kDeserialize](data)` where `data` comes from the return value
1269 // of `this[kTransfer]()` or `this[kClone]()`.
1270 HandleScope handle_scope(env()->isolate());
1271 Local<Value> data;
1272 if (!deserializer->ReadValue(context).ToLocal(&data)) return Nothing<bool>();
1273
1274 Local<Symbol> method_name = env()->messaging_deserialize_symbol();
1275 Local<Value> method;
1276 if (!object()->Get(context, method_name).ToLocal(&method)) {
1277 return Nothing<bool>();
1278 }
1279 if (!method->IsFunction()) return Just(true);
1280
1281 if (method.As<Function>()->Call(context, object(), 1, &data).IsEmpty()) {
1282 return Nothing<bool>();
1283 }
1284 return Just(true);
1285 }
1286
Data(std::string && deserialize_info,v8::Global<v8::Value> && data)1287 JSTransferable::Data::Data(std::string&& deserialize_info,
1288 v8::Global<v8::Value>&& data)
1289 : deserialize_info_(std::move(deserialize_info)),
1290 data_(std::move(data)) {}
1291
Deserialize(Environment * env,Local<Context> context,std::unique_ptr<TransferData> self)1292 BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
1293 Environment* env,
1294 Local<Context> context,
1295 std::unique_ptr<TransferData> self) {
1296 // Create the JS wrapper object that will later be filled with data passed to
1297 // the `[kDeserialize]()` method on it. This split is necessary, because here
1298 // we need to create an object with the right prototype and internal fields,
1299 // but the actual JS data stored in the serialized data can only be read at
1300 // the end of the stream, after the main message has been read.
1301
1302 if (context != env->context()) {
1303 THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
1304 return {};
1305 }
1306 HandleScope handle_scope(env->isolate());
1307 Local<Value> info;
1308 if (!ToV8Value(context, deserialize_info_).ToLocal(&info)) return {};
1309
1310 Local<Value> ret;
1311 CHECK(!env->messaging_deserialize_create_object().IsEmpty());
1312 if (!env->messaging_deserialize_create_object()->Call(
1313 context, Null(env->isolate()), 1, &info).ToLocal(&ret) ||
1314 !env->base_object_ctor_template()->HasInstance(ret)) {
1315 return {};
1316 }
1317
1318 return BaseObjectPtr<BaseObject> { Unwrap<BaseObject>(ret) };
1319 }
1320
FinalizeTransferWrite(Local<Context> context,ValueSerializer * serializer)1321 Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
1322 Local<Context> context, ValueSerializer* serializer) {
1323 HandleScope handle_scope(context->GetIsolate());
1324 auto ret = serializer->WriteValue(context, PersistentToLocal::Strong(data_));
1325 data_.Reset();
1326 return ret;
1327 }
1328
Get(const std::string & name)1329 std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1330 Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1331 std::shared_ptr<SiblingGroup> group;
1332 auto i = groups_.find(name);
1333 if (i == groups_.end() || i->second.expired()) {
1334 group = std::make_shared<SiblingGroup>(name);
1335 groups_[name] = group;
1336 } else {
1337 group = i->second.lock();
1338 }
1339 return group;
1340 }
1341
CheckSiblingGroup(const std::string & name)1342 void SiblingGroup::CheckSiblingGroup(const std::string& name) {
1343 Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
1344 auto i = groups_.find(name);
1345 if (i != groups_.end() && i->second.expired())
1346 groups_.erase(name);
1347 }
1348
SiblingGroup(const std::string & name)1349 SiblingGroup::SiblingGroup(const std::string& name)
1350 : name_(name) { }
1351
~SiblingGroup()1352 SiblingGroup::~SiblingGroup() {
1353 // If this is a named group, check to see if we can remove the group
1354 if (!name_.empty())
1355 CheckSiblingGroup(name_);
1356 }
1357
Dispatch(MessagePortData * source,std::shared_ptr<Message> message,std::string * error)1358 Maybe<bool> SiblingGroup::Dispatch(
1359 MessagePortData* source,
1360 std::shared_ptr<Message> message,
1361 std::string* error) {
1362
1363 RwLock::ScopedReadLock lock(group_mutex_);
1364
1365 // The source MessagePortData is not part of this group.
1366 if (ports_.find(source) == ports_.end()) {
1367 if (error != nullptr)
1368 *error = "Source MessagePort is not entangled with this group.";
1369 return Nothing<bool>();
1370 }
1371
1372 // There are no destination ports.
1373 if (size() <= 1)
1374 return Just(false);
1375
1376 // Transferables cannot be used when there is more
1377 // than a single destination.
1378 if (size() > 2 && message->has_transferables()) {
1379 if (error != nullptr)
1380 *error = "Transferables cannot be used with multiple destinations.";
1381 return Nothing<bool>();
1382 }
1383
1384 for (MessagePortData* port : ports_) {
1385 if (port == source)
1386 continue;
1387 // This loop should only be entered if there's only a single destination
1388 for (const auto& transferable : message->transferables()) {
1389 if (port == transferable.get()) {
1390 if (error != nullptr) {
1391 *error = "The target port was posted to itself, and the "
1392 "communication channel was lost";
1393 }
1394 return Just(true);
1395 }
1396 }
1397 port->AddToIncomingQueue(message);
1398 }
1399
1400 return Just(true);
1401 }
1402
Entangle(MessagePortData * port)1403 void SiblingGroup::Entangle(MessagePortData* port) {
1404 Entangle({ port });
1405 }
1406
Entangle(std::initializer_list<MessagePortData * > ports)1407 void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
1408 RwLock::ScopedWriteLock lock(group_mutex_);
1409 for (MessagePortData* data : ports) {
1410 ports_.insert(data);
1411 CHECK(!data->group_);
1412 data->group_ = shared_from_this();
1413 }
1414 }
1415
Disentangle(MessagePortData * data)1416 void SiblingGroup::Disentangle(MessagePortData* data) {
1417 auto self = shared_from_this(); // Keep alive until end of function.
1418 RwLock::ScopedWriteLock lock(group_mutex_);
1419 ports_.erase(data);
1420 data->group_.reset();
1421
1422 data->AddToIncomingQueue(std::make_shared<Message>());
1423 // If this is an anonymous group and there's another port, close it.
1424 if (size() == 1 && name_.empty())
1425 (*(ports_.begin()))->AddToIncomingQueue(std::make_shared<Message>());
1426 }
1427
1428 SiblingGroup::Map SiblingGroup::groups_;
1429 Mutex SiblingGroup::groups_mutex_;
1430
1431 namespace {
1432
SetDeserializerCreateObjectFunction(const FunctionCallbackInfo<Value> & args)1433 static void SetDeserializerCreateObjectFunction(
1434 const FunctionCallbackInfo<Value>& args) {
1435 Environment* env = Environment::GetCurrent(args);
1436 CHECK(args[0]->IsFunction());
1437 env->set_messaging_deserialize_create_object(args[0].As<Function>());
1438 }
1439
MessageChannel(const FunctionCallbackInfo<Value> & args)1440 static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
1441 Environment* env = Environment::GetCurrent(args);
1442 if (!args.IsConstructCall()) {
1443 THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
1444 return;
1445 }
1446
1447 Local<Context> context = args.This()->GetCreationContext().ToLocalChecked();
1448 Context::Scope context_scope(context);
1449
1450 MessagePort* port1 = MessagePort::New(env, context);
1451 if (port1 == nullptr) return;
1452 MessagePort* port2 = MessagePort::New(env, context);
1453 if (port2 == nullptr) {
1454 port1->Close();
1455 return;
1456 }
1457
1458 MessagePort::Entangle(port1, port2);
1459
1460 args.This()->Set(context, env->port1_string(), port1->object())
1461 .Check();
1462 args.This()->Set(context, env->port2_string(), port2->object())
1463 .Check();
1464 }
1465
BroadcastChannel(const FunctionCallbackInfo<Value> & args)1466 static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
1467 CHECK(args[0]->IsString());
1468 Environment* env = Environment::GetCurrent(args);
1469 Context::Scope context_scope(env->context());
1470 Utf8Value name(env->isolate(), args[0]);
1471 MessagePort* port =
1472 MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1473 if (port != nullptr) {
1474 args.GetReturnValue().Set(port->object());
1475 }
1476 }
1477
InitMessaging(Local<Object> target,Local<Value> unused,Local<Context> context,void * priv)1478 static void InitMessaging(Local<Object> target,
1479 Local<Value> unused,
1480 Local<Context> context,
1481 void* priv) {
1482 Environment* env = Environment::GetCurrent(context);
1483 Isolate* isolate = env->isolate();
1484
1485 {
1486 SetConstructorFunction(context,
1487 target,
1488 "MessageChannel",
1489 NewFunctionTemplate(isolate, MessageChannel));
1490 }
1491
1492 {
1493 Local<FunctionTemplate> t =
1494 NewFunctionTemplate(isolate, JSTransferable::New);
1495 t->Inherit(BaseObject::GetConstructorTemplate(env));
1496 t->InstanceTemplate()->SetInternalFieldCount(
1497 JSTransferable::kInternalFieldCount);
1498 t->SetClassName(OneByteString(isolate, "JSTransferable"));
1499 SetConstructorFunction(
1500 context, target, "JSTransferable", t, SetConstructorFunctionFlag::NONE);
1501 }
1502
1503 SetConstructorFunction(context,
1504 target,
1505 env->message_port_constructor_string(),
1506 GetMessagePortConstructorTemplate(env),
1507 SetConstructorFunctionFlag::NONE);
1508
1509 // These are not methods on the MessagePort prototype, because
1510 // the browser equivalents do not provide them.
1511 SetMethod(context, target, "stopMessagePort", MessagePort::Stop);
1512 SetMethod(context, target, "checkMessagePort", MessagePort::CheckType);
1513 SetMethod(context, target, "drainMessagePort", MessagePort::Drain);
1514 SetMethod(
1515 context, target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
1516 SetMethod(
1517 context, target, "moveMessagePortToContext", MessagePort::MoveToContext);
1518 SetMethod(context,
1519 target,
1520 "setDeserializerCreateObjectFunction",
1521 SetDeserializerCreateObjectFunction);
1522 SetMethod(context, target, "broadcastChannel", BroadcastChannel);
1523
1524 {
1525 Local<Function> domexception = GetDOMException(context).ToLocalChecked();
1526 target
1527 ->Set(context,
1528 FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
1529 domexception)
1530 .Check();
1531 }
1532 }
1533
RegisterExternalReferences(ExternalReferenceRegistry * registry)1534 static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
1535 registry->Register(MessageChannel);
1536 registry->Register(BroadcastChannel);
1537 registry->Register(JSTransferable::New);
1538 registry->Register(MessagePort::New);
1539 registry->Register(MessagePort::PostMessage);
1540 registry->Register(MessagePort::Start);
1541 registry->Register(MessagePort::Stop);
1542 registry->Register(MessagePort::CheckType);
1543 registry->Register(MessagePort::Drain);
1544 registry->Register(MessagePort::ReceiveMessage);
1545 registry->Register(MessagePort::MoveToContext);
1546 registry->Register(SetDeserializerCreateObjectFunction);
1547 }
1548
1549 } // anonymous namespace
1550
1551 } // namespace worker
1552 } // namespace node
1553
1554 NODE_BINDING_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)
1555 NODE_BINDING_EXTERNAL_REFERENCE(messaging,
1556 node::worker::RegisterExternalReferences)
1557