• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_NODE_MESSAGING_H_
2 #define SRC_NODE_MESSAGING_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "env.h"
7 #include "node_mutex.h"
8 #include "v8.h"
9 #include <deque>
10 #include <string>
11 #include <unordered_map>
12 #include <set>
13 
14 namespace node {
15 namespace worker {
16 
17 class MessagePortData;
18 class MessagePort;
19 
20 typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
21 
22 // Used to represent the in-flight structure of an object that is being
23 // transferred or cloned using postMessage().
24 class TransferData : public MemoryRetainer {
25  public:
26   // Deserialize this object on the receiving end after a .postMessage() call.
27   // - `context` may not be the same as `env->context()`. This method should
28   //    not produce JS objects coming from Contexts other than `context`.
29   // - `self` is a unique_ptr for the object that this is being called on.
30   // - The return value is treated like a `Maybe`, i.e. if `nullptr` is
31   //   returned, any further deserialization of the message is stopped and
32   //   control is returned to the event loop or JS as soon as possible.
33   virtual BaseObjectPtr<BaseObject> Deserialize(
34       Environment* env,
35       v8::Local<v8::Context> context,
36       std::unique_ptr<TransferData> self) = 0;
37   // FinalizeTransferWrite() is the counterpart to
38   // BaseObject::FinalizeTransferRead(). It is called right after the transfer
39   // data was created, and defaults to doing nothing. After this function,
40   // this object should not hold any more Isolate-specific data.
41   virtual v8::Maybe<bool> FinalizeTransferWrite(
42       v8::Local<v8::Context> context, v8::ValueSerializer* serializer);
43 };
44 
45 // Represents a single communication message.
46 class Message : public MemoryRetainer {
47  public:
48   // Create a Message with a specific underlying payload, in the format of the
49   // V8 ValueSerializer API. If `payload` is empty, this message indicates
50   // that the receiving message port should close itself.
51   explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
52   ~Message() = default;
53 
54   Message(Message&& other) = default;
55   Message& operator=(Message&& other) = default;
56   Message& operator=(const Message&) = delete;
57   Message(const Message&) = delete;
58 
59   // Whether this is a message indicating that the port is to be closed.
60   // This is the last message to be received by a MessagePort.
61   bool IsCloseMessage() const;
62 
63   // Deserialize the contained JS value. May only be called once, and only
64   // after Serialize() has been called (e.g. by another thread).
65   v8::MaybeLocal<v8::Value> Deserialize(
66       Environment* env,
67       v8::Local<v8::Context> context,
68       v8::Local<v8::Value>* port_list = nullptr);
69 
70   // Serialize a JS value, and optionally transfer objects, into this message.
71   // The Message object retains ownership of all transferred objects until
72   // deserialization.
73   // The source_port parameter, if provided, will make Serialize() throw a
74   // "DataCloneError" DOMException if source_port is found in transfer_list.
75   v8::Maybe<bool> Serialize(Environment* env,
76                             v8::Local<v8::Context> context,
77                             v8::Local<v8::Value> input,
78                             const TransferList& transfer_list,
79                             v8::Local<v8::Object> source_port =
80                                 v8::Local<v8::Object>());
81 
82   // Internal method of Message that is called when a new SharedArrayBuffer
83   // object is encountered in the incoming value's structure.
84   void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
85   // Internal method of Message that is called once serialization finishes
86   // and that transfers ownership of `data` to this message.
87   void AddTransferable(std::unique_ptr<TransferData>&& data);
88   // Internal method of Message that is called when a new WebAssembly.Module
89   // object is encountered in the incoming value's structure.
90   uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);
91 
92   // The host objects that will be transferred, as recorded by Serialize()
93   // (e.g. MessagePorts).
94   // Used for warning user about posting the target MessagePort to itself,
95   // which will as a side effect destroy the communication channel.
transferables()96   const std::vector<std::unique_ptr<TransferData>>& transferables() const {
97     return transferables_;
98   }
has_transferables()99   bool has_transferables() const {
100     return !transferables_.empty() || !array_buffers_.empty();
101   }
102 
103   void MemoryInfo(MemoryTracker* tracker) const override;
104 
105   SET_MEMORY_INFO_NAME(Message)
106   SET_SELF_SIZE(Message)
107 
108  private:
109   MallocedBuffer<char> main_message_buf_;
110   // TODO(addaleax): Make this a std::variant to save storage size in the common
111   // case (which is that all of these vectors are empty) once that is available
112   // with C++17.
113   std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
114   std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
115   std::vector<std::unique_ptr<TransferData>> transferables_;
116   std::vector<v8::CompiledWasmModule> wasm_modules_;
117 
118   friend class MessagePort;
119 };
120 
121 class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
122  public:
123   // Named SiblingGroup, Used for one-to-many BroadcastChannels.
124   static std::shared_ptr<SiblingGroup> Get(const std::string& name);
125 
126   // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
127   SiblingGroup() = default;
128   explicit SiblingGroup(const std::string& name);
129   ~SiblingGroup();
130 
131   // Dispatches the Message to the collection of associated
132   // ports. If there is more than one destination port and
133   // the Message contains transferables, Dispatch will fail.
134   // Returns Just(true) if successful and the message was
135   // dispatched to at least one destination. Returns Just(false)
136   // if there were no destinations. Returns Nothing<bool>()
137   // if there was an error. If error is not nullptr, it will
138   // be set to an error message or warning message as appropriate.
139   v8::Maybe<bool> Dispatch(
140       MessagePortData* source,
141       std::shared_ptr<Message> message,
142       std::string* error = nullptr);
143 
144   void Entangle(MessagePortData* data);
145   void Entangle(std::initializer_list<MessagePortData*> data);
146   void Disentangle(MessagePortData* data);
147 
name()148   const std::string& name() const { return name_; }
149 
size()150   size_t size() const { return ports_.size(); }
151 
152  private:
153   const std::string name_;
154   RwLock group_mutex_;  // Protects ports_.
155   std::set<MessagePortData*> ports_;
156 
157   static void CheckSiblingGroup(const std::string& name);
158 
159   using Map =
160       std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
161 
162   static Mutex groups_mutex_;
163   static Map groups_;
164 };
165 
166 // This contains all data for a `MessagePort` instance that is not tied to
167 // a specific Environment/Isolate/event loop, for easier transfer between those.
168 class MessagePortData : public TransferData {
169  public:
170   explicit MessagePortData(MessagePort* owner);
171   ~MessagePortData() override;
172 
173   MessagePortData(MessagePortData&& other) = delete;
174   MessagePortData& operator=(MessagePortData&& other) = delete;
175   MessagePortData(const MessagePortData& other) = delete;
176   MessagePortData& operator=(const MessagePortData& other) = delete;
177 
178   // Add a message to the incoming queue and notify the receiver.
179   // This may be called from any thread.
180   void AddToIncomingQueue(std::shared_ptr<Message> message);
181   v8::Maybe<bool> Dispatch(
182       std::shared_ptr<Message> message,
183       std::string* error = nullptr);
184 
185   // Turns `a` and `b` into siblings, i.e. connects the sending side of one
186   // to the receiving side of the other. This is not thread-safe.
187   static void Entangle(MessagePortData* a, MessagePortData* b);
188 
189   // Removes any possible sibling. This is thread-safe (it acquires both
190   // `sibling_mutex_` and `mutex_`), and has to be because it is called once
191   // the corresponding JS handle handle wants to close
192   // which can happen on either side of a worker.
193   void Disentangle();
194 
195   void MemoryInfo(MemoryTracker* tracker) const override;
196   BaseObjectPtr<BaseObject> Deserialize(
197       Environment* env,
198       v8::Local<v8::Context> context,
199       std::unique_ptr<TransferData> self) override;
200 
201   SET_MEMORY_INFO_NAME(MessagePortData)
202   SET_SELF_SIZE(MessagePortData)
203 
204  private:
205   // This mutex protects all fields below it, with the exception of
206   // sibling_.
207   mutable Mutex mutex_;
208   // TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr>
209   // once that is available with C++17, because std::shared_ptr comes with
210   // overhead that is only necessary for BroadcastChannel.
211   std::deque<std::shared_ptr<Message>> incoming_messages_;
212   MessagePort* owner_ = nullptr;
213   std::shared_ptr<SiblingGroup> group_;
214   friend class MessagePort;
215   friend class SiblingGroup;
216 };
217 
218 // A message port that receives messages from other threads, including
219 // the uv_async_t handle that is used to notify the current event loop of
220 // new incoming messages.
221 class MessagePort : public HandleWrap {
222  private:
223   // Create a new MessagePort. The `context` argument specifies the Context
224   // instance that is used for creating the values emitted from this port.
225   // This is called by MessagePort::New(), which is the public API used for
226   // creating MessagePort instances.
227   MessagePort(Environment* env,
228               v8::Local<v8::Context> context,
229               v8::Local<v8::Object> wrap);
230 
231  public:
232   ~MessagePort() override;
233 
234   // Create a new message port instance, optionally over an existing
235   // `MessagePortData` object.
236   static MessagePort* New(Environment* env,
237                           v8::Local<v8::Context> context,
238                           std::unique_ptr<MessagePortData> data = {},
239                           std::shared_ptr<SiblingGroup> sibling_group = {});
240 
241   // Send a message, i.e. deliver it into the sibling's incoming queue.
242   // If this port is closed, or if there is no sibling, this message is
243   // serialized with transfers, then silently discarded.
244   v8::Maybe<bool> PostMessage(Environment* env,
245                               v8::Local<v8::Context> context,
246                               v8::Local<v8::Value> message,
247                               const TransferList& transfer);
248 
249   // Start processing messages on this port as a receiving end.
250   void Start();
251   // Stop processing messages on this port as a receiving end.
252   void Stop();
253 
254   /* constructor */
255   static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
256   /* prototype methods */
257   static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
258   static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
259   static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
260   static void CheckType(const v8::FunctionCallbackInfo<v8::Value>& args);
261   static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
262   static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
263 
264   /* static */
265   static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
266 
267   // Turns `a` and `b` into siblings, i.e. connects the sending side of one
268   // to the receiving side of the other. This is not thread-safe.
269   static void Entangle(MessagePort* a, MessagePort* b);
270   static void Entangle(MessagePort* a, MessagePortData* b);
271 
272   // Detach this port's data for transferring. After this, the MessagePortData
273   // is no longer associated with this handle, although it can still receive
274   // messages.
275   std::unique_ptr<MessagePortData> Detach();
276 
277   void Close(
278       v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
279 
280   // Returns true if either data_ has been freed, or if the handle is being
281   // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
282   //
283   // If checking if a JavaScript MessagePort object is detached, this method
284   // alone is often not enough, since the backing C++ MessagePort object may
285   // have been deleted already. For all intents and purposes, an object with a
286   // NULL pointer to the C++ MessagePort object is also detached.
287   inline bool IsDetached() const;
288 
289   TransferMode GetTransferMode() const override;
290   std::unique_ptr<TransferData> TransferForMessaging() override;
291 
292   void MemoryInfo(MemoryTracker* tracker) const override;
293   SET_MEMORY_INFO_NAME(MessagePort)
294   SET_SELF_SIZE(MessagePort)
295 
296  private:
297   enum class MessageProcessingMode {
298     kNormalOperation,
299     kForceReadMessages
300   };
301 
302   void OnClose() override;
303   void OnMessage(MessageProcessingMode mode);
304   void TriggerAsync();
305   v8::MaybeLocal<v8::Value> ReceiveMessage(
306       v8::Local<v8::Context> context,
307       MessageProcessingMode mode,
308       v8::Local<v8::Value>* port_list = nullptr);
309 
310   std::unique_ptr<MessagePortData> data_ = nullptr;
311   bool receiving_messages_ = false;
312   uv_async_t async_;
313   v8::Global<v8::Function> emit_message_fn_;
314 
315   friend class MessagePortData;
316 };
317 
318 // Provide a base class from which JS classes that should be transferable or
319 // cloneable by postMessage() can inherit.
320 // See e.g. FileHandle in internal/fs/promises.js for an example.
321 class JSTransferable : public BaseObject {
322  public:
323   JSTransferable(Environment* env, v8::Local<v8::Object> obj);
324   static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
325 
326   TransferMode GetTransferMode() const override;
327   std::unique_ptr<TransferData> TransferForMessaging() override;
328   std::unique_ptr<TransferData> CloneForMessaging() const override;
329   v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>>
330       NestedTransferables() const override;
331   v8::Maybe<bool> FinalizeTransferRead(
332       v8::Local<v8::Context> context,
333       v8::ValueDeserializer* deserializer) override;
334 
335   SET_NO_MEMORY_INFO()
336   SET_MEMORY_INFO_NAME(JSTransferable)
337   SET_SELF_SIZE(JSTransferable)
338 
339  private:
340   std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const;
341 
342   class Data : public TransferData {
343    public:
344     Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data);
345 
346     BaseObjectPtr<BaseObject> Deserialize(
347         Environment* env,
348         v8::Local<v8::Context> context,
349         std::unique_ptr<TransferData> self) override;
350     v8::Maybe<bool> FinalizeTransferWrite(
351         v8::Local<v8::Context> context,
352         v8::ValueSerializer* serializer) override;
353 
354     SET_NO_MEMORY_INFO()
355     SET_MEMORY_INFO_NAME(JSTransferableTransferData)
356     SET_SELF_SIZE(Data)
357 
358    private:
359     std::string deserialize_info_;
360     v8::Global<v8::Value> data_;
361   };
362 };
363 
364 v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate(
365     Environment* env);
366 
367 }  // namespace worker
368 }  // namespace node
369 
370 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
371 
372 
373 #endif  // SRC_NODE_MESSAGING_H_
374