• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef SRC_NODE_MESSAGING_H_
2 #define SRC_NODE_MESSAGING_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "env.h"
7 #include "node_mutex.h"
8 #include <list>
9 
10 namespace node {
11 namespace worker {
12 
13 class MessagePortData;
14 class MessagePort;
15 
16 typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
17 
18 // Used to represent the in-flight structure of an object that is being
19 // transferred or cloned using postMessage().
20 class TransferData : public MemoryRetainer {
21  public:
22   // Deserialize this object on the receiving end after a .postMessage() call.
23   // - `context` may not be the same as `env->context()`. This method should
24   //    not produce JS objects coming from Contexts other than `context`.
25   // - `self` is a unique_ptr for the object that this is being called on.
26   // - The return value is treated like a `Maybe`, i.e. if `nullptr` is
27   //   returned, any further deserialization of the message is stopped and
28   //   control is returned to the event loop or JS as soon as possible.
29   virtual BaseObjectPtr<BaseObject> Deserialize(
30       Environment* env,
31       v8::Local<v8::Context> context,
32       std::unique_ptr<TransferData> self) = 0;
33   // FinalizeTransferWrite() is the counterpart to
34   // BaseObject::FinalizeTransferRead(). It is called right after the transfer
35   // data was created, and defaults to doing nothing. After this function,
36   // this object should not hold any more Isolate-specific data.
37   virtual v8::Maybe<bool> FinalizeTransferWrite(
38       v8::Local<v8::Context> context, v8::ValueSerializer* serializer);
39 };
40 
41 // Represents a single communication message.
42 class Message : public MemoryRetainer {
43  public:
44   // Create a Message with a specific underlying payload, in the format of the
45   // V8 ValueSerializer API. If `payload` is empty, this message indicates
46   // that the receiving message port should close itself.
47   explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
48 
49   Message(Message&& other) = default;
50   Message& operator=(Message&& other) = default;
51   Message& operator=(const Message&) = delete;
52   Message(const Message&) = delete;
53 
54   // Whether this is a message indicating that the port is to be closed.
55   // This is the last message to be received by a MessagePort.
56   bool IsCloseMessage() const;
57 
58   // Deserialize the contained JS value. May only be called once, and only
59   // after Serialize() has been called (e.g. by another thread).
60   v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
61                                         v8::Local<v8::Context> context);
62 
63   // Serialize a JS value, and optionally transfer objects, into this message.
64   // The Message object retains ownership of all transferred objects until
65   // deserialization.
66   // The source_port parameter, if provided, will make Serialize() throw a
67   // "DataCloneError" DOMException if source_port is found in transfer_list.
68   v8::Maybe<bool> Serialize(Environment* env,
69                             v8::Local<v8::Context> context,
70                             v8::Local<v8::Value> input,
71                             const TransferList& transfer_list,
72                             v8::Local<v8::Object> source_port =
73                                 v8::Local<v8::Object>());
74 
75   // Internal method of Message that is called when a new SharedArrayBuffer
76   // object is encountered in the incoming value's structure.
77   void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
78   // Internal method of Message that is called once serialization finishes
79   // and that transfers ownership of `data` to this message.
80   void AddTransferable(std::unique_ptr<TransferData>&& data);
81   // Internal method of Message that is called when a new WebAssembly.Module
82   // object is encountered in the incoming value's structure.
83   uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);
84 
85   // The host objects that will be transferred, as recorded by Serialize()
86   // (e.g. MessagePorts).
87   // Used for warning user about posting the target MessagePort to itself,
88   // which will as a side effect destroy the communication channel.
transferables()89   const std::vector<std::unique_ptr<TransferData>>& transferables() const {
90     return transferables_;
91   }
92 
93   void MemoryInfo(MemoryTracker* tracker) const override;
94 
95   SET_MEMORY_INFO_NAME(Message)
96   SET_SELF_SIZE(Message)
97 
98  private:
99   MallocedBuffer<char> main_message_buf_;
100   std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
101   std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
102   std::vector<std::unique_ptr<TransferData>> transferables_;
103   std::vector<v8::CompiledWasmModule> wasm_modules_;
104 
105   friend class MessagePort;
106 };
107 
108 // This contains all data for a `MessagePort` instance that is not tied to
109 // a specific Environment/Isolate/event loop, for easier transfer between those.
110 class MessagePortData : public TransferData {
111  public:
112   explicit MessagePortData(MessagePort* owner);
113   ~MessagePortData() override;
114 
115   MessagePortData(MessagePortData&& other) = delete;
116   MessagePortData& operator=(MessagePortData&& other) = delete;
117   MessagePortData(const MessagePortData& other) = delete;
118   MessagePortData& operator=(const MessagePortData& other) = delete;
119 
120   // Add a message to the incoming queue and notify the receiver.
121   // This may be called from any thread.
122   void AddToIncomingQueue(Message&& message);
123 
124   // Turns `a` and `b` into siblings, i.e. connects the sending side of one
125   // to the receiving side of the other. This is not thread-safe.
126   static void Entangle(MessagePortData* a, MessagePortData* b);
127 
128   // Removes any possible sibling. This is thread-safe (it acquires both
129   // `sibling_mutex_` and `mutex_`), and has to be because it is called once
130   // the corresponding JS handle handle wants to close
131   // which can happen on either side of a worker.
132   void Disentangle();
133 
134   void MemoryInfo(MemoryTracker* tracker) const override;
135   BaseObjectPtr<BaseObject> Deserialize(
136       Environment* env,
137       v8::Local<v8::Context> context,
138       std::unique_ptr<TransferData> self) override;
139 
140   SET_MEMORY_INFO_NAME(MessagePortData)
141   SET_SELF_SIZE(MessagePortData)
142 
143  private:
144   // This mutex protects all fields below it, with the exception of
145   // sibling_.
146   mutable Mutex mutex_;
147   std::list<Message> incoming_messages_;
148   MessagePort* owner_ = nullptr;
149   // This mutex protects the sibling_ field and is shared between two entangled
150   // MessagePorts. If both mutexes are acquired, this one needs to be
151   // acquired first.
152   std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
153   MessagePortData* sibling_ = nullptr;
154 
155   friend class MessagePort;
156 };
157 
158 // A message port that receives messages from other threads, including
159 // the uv_async_t handle that is used to notify the current event loop of
160 // new incoming messages.
161 class MessagePort : public HandleWrap {
162  private:
163   // Create a new MessagePort. The `context` argument specifies the Context
164   // instance that is used for creating the values emitted from this port.
165   // This is called by MessagePort::New(), which is the public API used for
166   // creating MessagePort instances.
167   MessagePort(Environment* env,
168               v8::Local<v8::Context> context,
169               v8::Local<v8::Object> wrap);
170 
171  public:
172   ~MessagePort() override;
173 
174   // Create a new message port instance, optionally over an existing
175   // `MessagePortData` object.
176   static MessagePort* New(Environment* env,
177                           v8::Local<v8::Context> context,
178                           std::unique_ptr<MessagePortData> data = nullptr);
179 
180   // Send a message, i.e. deliver it into the sibling's incoming queue.
181   // If this port is closed, or if there is no sibling, this message is
182   // serialized with transfers, then silently discarded.
183   v8::Maybe<bool> PostMessage(Environment* env,
184                               v8::Local<v8::Context> context,
185                               v8::Local<v8::Value> message,
186                               const TransferList& transfer);
187 
188   // Start processing messages on this port as a receiving end.
189   void Start();
190   // Stop processing messages on this port as a receiving end.
191   void Stop();
192 
193   /* constructor */
194   static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
195   /* prototype methods */
196   static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
197   static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
198   static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
199   static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
200   static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
201 
202   /* static */
203   static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
204 
205   // Turns `a` and `b` into siblings, i.e. connects the sending side of one
206   // to the receiving side of the other. This is not thread-safe.
207   static void Entangle(MessagePort* a, MessagePort* b);
208   static void Entangle(MessagePort* a, MessagePortData* b);
209 
210   // Detach this port's data for transferring. After this, the MessagePortData
211   // is no longer associated with this handle, although it can still receive
212   // messages.
213   std::unique_ptr<MessagePortData> Detach();
214 
215   void Close(
216       v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
217 
218   // Returns true if either data_ has been freed, or if the handle is being
219   // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
220   //
221   // If checking if a JavaScript MessagePort object is detached, this method
222   // alone is often not enough, since the backing C++ MessagePort object may
223   // have been deleted already. For all intents and purposes, an object with a
224   // NULL pointer to the C++ MessagePort object is also detached.
225   inline bool IsDetached() const;
226 
227   TransferMode GetTransferMode() const override;
228   std::unique_ptr<TransferData> TransferForMessaging() override;
229 
230   void MemoryInfo(MemoryTracker* tracker) const override;
231   SET_MEMORY_INFO_NAME(MessagePort)
232   SET_SELF_SIZE(MessagePort)
233 
234  private:
235   void OnClose() override;
236   void OnMessage();
237   void TriggerAsync();
238   v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
239                                            bool only_if_receiving);
240 
241   std::unique_ptr<MessagePortData> data_ = nullptr;
242   bool receiving_messages_ = false;
243   uv_async_t async_;
244   v8::Global<v8::Function> emit_message_fn_;
245 
246   friend class MessagePortData;
247 };
248 
249 // Provide a base class from which JS classes that should be transferable or
250 // cloneable by postMesssage() can inherit.
251 // See e.g. FileHandle in internal/fs/promises.js for an example.
252 class JSTransferable : public BaseObject {
253  public:
254   JSTransferable(Environment* env, v8::Local<v8::Object> obj);
255   static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
256 
257   TransferMode GetTransferMode() const override;
258   std::unique_ptr<TransferData> TransferForMessaging() override;
259   std::unique_ptr<TransferData> CloneForMessaging() const override;
260   v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>>
261       NestedTransferables() const override;
262   v8::Maybe<bool> FinalizeTransferRead(
263       v8::Local<v8::Context> context,
264       v8::ValueDeserializer* deserializer) override;
265 
266   SET_NO_MEMORY_INFO()
267   SET_MEMORY_INFO_NAME(JSTransferable)
268   SET_SELF_SIZE(JSTransferable)
269 
270  private:
271   std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const;
272 
273   class Data : public TransferData {
274    public:
275     Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data);
276 
277     BaseObjectPtr<BaseObject> Deserialize(
278         Environment* env,
279         v8::Local<v8::Context> context,
280         std::unique_ptr<TransferData> self) override;
281     v8::Maybe<bool> FinalizeTransferWrite(
282         v8::Local<v8::Context> context,
283         v8::ValueSerializer* serializer) override;
284 
285     SET_NO_MEMORY_INFO()
286     SET_MEMORY_INFO_NAME(JSTransferableTransferData)
287     SET_SELF_SIZE(Data)
288 
289    private:
290     std::string deserialize_info_;
291     v8::Global<v8::Value> data_;
292   };
293 };
294 
295 v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate(
296     Environment* env);
297 
298 }  // namespace worker
299 }  // namespace node
300 
301 #endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
302 
303 
304 #endif  // SRC_NODE_MESSAGING_H_
305