#ifndef SRC_NODE_MESSAGING_H_
#define SRC_NODE_MESSAGING_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "env.h"
#include "node_mutex.h"
// Standard headers for the std:: facilities named directly in this file
// (uint32_t, std::list, std::shared_ptr/std::unique_ptr, std::string,
// std::vector), so that this header does not rely on transitive includes.
#include <cstdint>
#include <list>
#include <memory>
#include <string>
#include <vector>

namespace node {
namespace worker {

class MessagePortData;
class MessagePort;

// List of values that are passed as the transfer list of a postMessage() call.
using TransferList = MaybeStackBuffer<v8::Local<v8::Value>, 8>;

// Used to represent the in-flight structure of an object that is being
// transferred or cloned using postMessage().
class TransferData : public MemoryRetainer {
 public:
  // Deserialize this object on the receiving end after a .postMessage() call.
  // - `context` may not be the same as `env->context()`. This method should
  //   not produce JS objects coming from Contexts other than `context`.
  // - `self` is a unique_ptr for the object that this is being called on.
  // - The return value is treated like a `Maybe`, i.e. if `nullptr` is
  //   returned, any further deserialization of the message is stopped and
  //   control is returned to the event loop or JS as soon as possible.
  virtual BaseObjectPtr<BaseObject> Deserialize(
      Environment* env,
      v8::Local<v8::Context> context,
      std::unique_ptr<TransferData> self) = 0;
  // FinalizeTransferWrite() is the counterpart to
  // BaseObject::FinalizeTransferRead(). It is called right after the transfer
  // data was created, and defaults to doing nothing. After this function,
  // this object should not hold any more Isolate-specific data.
  virtual v8::Maybe<bool> FinalizeTransferWrite(
      v8::Local<v8::Context> context, v8::ValueSerializer* serializer);
};

// Represents a single communication message.
class Message : public MemoryRetainer {
 public:
  // Create a Message with a specific underlying payload, in the format of the
  // V8 ValueSerializer API. If `payload` is empty, this message indicates
  // that the receiving message port should close itself.
  explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());

  // Messages are move-only.
  Message(Message&& other) = default;
  Message& operator=(Message&& other) = default;
  Message& operator=(const Message&) = delete;
  Message(const Message&) = delete;

  // Whether this is a message indicating that the port is to be closed.
  // This is the last message to be received by a MessagePort.
  bool IsCloseMessage() const;

  // Deserialize the contained JS value. May only be called once, and only
  // after Serialize() has been called (e.g. by another thread).
  v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
                                        v8::Local<v8::Context> context);

  // Serialize a JS value, and optionally transfer objects, into this message.
  // The Message object retains ownership of all transferred objects until
  // deserialization.
  // The source_port parameter, if provided, will make Serialize() throw a
  // "DataCloneError" DOMException if source_port is found in transfer_list.
  v8::Maybe<bool> Serialize(Environment* env,
                            v8::Local<v8::Context> context,
                            v8::Local<v8::Value> input,
                            const TransferList& transfer_list,
                            v8::Local<v8::Object> source_port =
                                v8::Local<v8::Object>());

  // Internal method of Message that is called when a new SharedArrayBuffer
  // object is encountered in the incoming value's structure.
  void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
  // Internal method of Message that is called once serialization finishes
  // and that transfers ownership of `data` to this message.
  void AddTransferable(std::unique_ptr<TransferData>&& data);
  // Internal method of Message that is called when a new WebAssembly.Module
  // object is encountered in the incoming value's structure.
  uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);

  // The host objects that will be transferred, as recorded by Serialize()
  // (e.g. MessagePorts).
  // Used for warning the user about posting the target MessagePort to itself,
  // which will as a side effect destroy the communication channel.
  const std::vector<std::unique_ptr<TransferData>>& transferables() const {
    return transferables_;
  }

  void MemoryInfo(MemoryTracker* tracker) const override;

  SET_MEMORY_INFO_NAME(Message)
  SET_SELF_SIZE(Message)

 private:
  MallocedBuffer<char> main_message_buf_;
  std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
  std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
  // Backing storage for the transferables() accessor.
  std::vector<std::unique_ptr<TransferData>> transferables_;
  std::vector<v8::CompiledWasmModule> wasm_modules_;

  friend class MessagePort;
};

// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
 public:
  explicit MessagePortData(MessagePort* owner);
  ~MessagePortData() override;

  // Instances can be neither moved nor copied.
  MessagePortData(MessagePortData&& other) = delete;
  MessagePortData& operator=(MessagePortData&& other) = delete;
  MessagePortData(const MessagePortData& other) = delete;
  MessagePortData& operator=(const MessagePortData& other) = delete;

  // Add a message to the incoming queue and notify the receiver.
  // This may be called from any thread.
  void AddToIncomingQueue(Message&& message);

  // Turns `a` and `b` into siblings, i.e. connects the sending side of one
  // to the receiving side of the other. This is not thread-safe.
  static void Entangle(MessagePortData* a, MessagePortData* b);

  // Removes any possible sibling. This is thread-safe (it acquires both
  // `sibling_mutex_` and `mutex_`), and has to be because it is called once
  // the corresponding JS handle wants to close, which can happen on either
  // side of a worker.
  void Disentangle();

  void MemoryInfo(MemoryTracker* tracker) const override;
  BaseObjectPtr<BaseObject> Deserialize(
      Environment* env,
      v8::Local<v8::Context> context,
      std::unique_ptr<TransferData> self) override;

  SET_MEMORY_INFO_NAME(MessagePortData)
  SET_SELF_SIZE(MessagePortData)

 private:
  // This mutex protects all fields below it, with the exception of
  // sibling_.
  mutable Mutex mutex_;
  std::list<Message> incoming_messages_;
  MessagePort* owner_ = nullptr;
  // This mutex protects the sibling_ field and is shared between two entangled
  // MessagePorts. If both mutexes are acquired, this one needs to be
  // acquired first.
  std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
  MessagePortData* sibling_ = nullptr;

  friend class MessagePort;
};

// A message port that receives messages from other threads, including
// the uv_async_t handle that is used to notify the current event loop of
// new incoming messages.
class MessagePort : public HandleWrap {
 private:
  // Create a new MessagePort. The `context` argument specifies the Context
  // instance that is used for creating the values emitted from this port.
  // This is called by MessagePort::New(), which is the public API used for
  // creating MessagePort instances.
  MessagePort(Environment* env,
              v8::Local<v8::Context> context,
              v8::Local<v8::Object> wrap);

 public:
  ~MessagePort() override;

  // Create a new message port instance, optionally over an existing
  // `MessagePortData` object.
  static MessagePort* New(Environment* env,
                          v8::Local<v8::Context> context,
                          std::unique_ptr<MessagePortData> data = nullptr);

  // Send a message, i.e. deliver it into the sibling's incoming queue.
  // If this port is closed, or if there is no sibling, this message is
  // serialized with transfers, then silently discarded.
  v8::Maybe<bool> PostMessage(Environment* env,
                              v8::Local<v8::Context> context,
                              v8::Local<v8::Value> message,
                              const TransferList& transfer);

  // Start processing messages on this port as a receiving end.
  void Start();
  // Stop processing messages on this port as a receiving end.
  void Stop();

  /* constructor */
  static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
  /* prototype methods */
  static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
  static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);

  /* static */
  static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);

  // Turns `a` and `b` into siblings, i.e. connects the sending side of one
  // to the receiving side of the other. This is not thread-safe.
  static void Entangle(MessagePort* a, MessagePort* b);
  static void Entangle(MessagePort* a, MessagePortData* b);

  // Detach this port's data for transferring. After this, the MessagePortData
  // is no longer associated with this handle, although it can still receive
  // messages.
  std::unique_ptr<MessagePortData> Detach();

  void Close(
      v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;

  // Returns true if either data_ has been freed, or if the handle is being
  // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
  //
  // If checking if a JavaScript MessagePort object is detached, this method
  // alone is often not enough, since the backing C++ MessagePort object may
  // have been deleted already. For all intents and purposes, an object with a
  // NULL pointer to the C++ MessagePort object is also detached.
  inline bool IsDetached() const;

  TransferMode GetTransferMode() const override;
  std::unique_ptr<TransferData> TransferForMessaging() override;

  void MemoryInfo(MemoryTracker* tracker) const override;
  SET_MEMORY_INFO_NAME(MessagePort)
  SET_SELF_SIZE(MessagePort)

 private:
  void OnClose() override;
  void OnMessage();
  void TriggerAsync();
  v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
                                           bool only_if_receiving);

  // The Environment-independent state of this port; see Detach() and
  // IsDetached() for how it relates to this handle.
  std::unique_ptr<MessagePortData> data_ = nullptr;
  bool receiving_messages_ = false;
  uv_async_t async_;
  v8::Global<v8::Function> emit_message_fn_;

  friend class MessagePortData;
};

// Provide a base class from which JS classes that should be transferable or
// cloneable by postMessage() can inherit.
// See e.g. FileHandle in internal/fs/promises.js for an example.
class JSTransferable : public BaseObject {
 public:
  JSTransferable(Environment* env, v8::Local<v8::Object> obj);
  static void New(const v8::FunctionCallbackInfo<v8::Value>& args);

  TransferMode GetTransferMode() const override;
  std::unique_ptr<TransferData> TransferForMessaging() override;
  std::unique_ptr<TransferData> CloneForMessaging() const override;
  v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>>
      NestedTransferables() const override;
  v8::Maybe<bool> FinalizeTransferRead(
      v8::Local<v8::Context> context,
      v8::ValueDeserializer* deserializer) override;

  SET_NO_MEMORY_INFO()
  SET_MEMORY_INFO_NAME(JSTransferable)
  SET_SELF_SIZE(JSTransferable)

 private:
  // NOTE(review): presumably the shared implementation behind
  // TransferForMessaging() and CloneForMessaging() -- confirm in the .cc file.
  std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const;

  // The TransferData implementation used for JSTransferable objects.
  class Data : public TransferData {
   public:
    Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data);

    BaseObjectPtr<BaseObject> Deserialize(
        Environment* env,
        v8::Local<v8::Context> context,
        std::unique_ptr<TransferData> self) override;
    v8::Maybe<bool> FinalizeTransferWrite(
        v8::Local<v8::Context> context,
        v8::ValueSerializer* serializer) override;

    SET_NO_MEMORY_INFO()
    SET_MEMORY_INFO_NAME(JSTransferableTransferData)
    SET_SELF_SIZE(Data)

   private:
    std::string deserialize_info_;
    v8::Global<v8::Value> data_;
  };
};

v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate(
    Environment* env);

}  // namespace worker
}  // namespace node

#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS


#endif  // SRC_NODE_MESSAGING_H_