1 #ifndef SRC_NODE_MESSAGING_H_ 2 #define SRC_NODE_MESSAGING_H_ 3 4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 5 6 #include "env.h" 7 #include "node_mutex.h" 8 #include "v8.h" 9 #include <deque> 10 #include <string> 11 #include <unordered_map> 12 #include <set> 13 14 namespace node { 15 namespace worker { 16 17 class MessagePortData; 18 class MessagePort; 19 20 typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList; 21 22 // Used to represent the in-flight structure of an object that is being 23 // transferred or cloned using postMessage(). 24 class TransferData : public MemoryRetainer { 25 public: 26 // Deserialize this object on the receiving end after a .postMessage() call. 27 // - `context` may not be the same as `env->context()`. This method should 28 // not produce JS objects coming from Contexts other than `context`. 29 // - `self` is a unique_ptr for the object that this is being called on. 30 // - The return value is treated like a `Maybe`, i.e. if `nullptr` is 31 // returned, any further deserialization of the message is stopped and 32 // control is returned to the event loop or JS as soon as possible. 33 virtual BaseObjectPtr<BaseObject> Deserialize( 34 Environment* env, 35 v8::Local<v8::Context> context, 36 std::unique_ptr<TransferData> self) = 0; 37 // FinalizeTransferWrite() is the counterpart to 38 // BaseObject::FinalizeTransferRead(). It is called right after the transfer 39 // data was created, and defaults to doing nothing. After this function, 40 // this object should not hold any more Isolate-specific data. 41 virtual v8::Maybe<bool> FinalizeTransferWrite( 42 v8::Local<v8::Context> context, v8::ValueSerializer* serializer); 43 }; 44 45 // Represents a single communication message. 46 class Message : public MemoryRetainer { 47 public: 48 // Create a Message with a specific underlying payload, in the format of the 49 // V8 ValueSerializer API. If `payload` is empty, this message indicates 50 // that the receiving message port should close itself. 51 explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>()); 52 ~Message() = default; 53 54 Message(Message&& other) = default; 55 Message& operator=(Message&& other) = default; 56 Message& operator=(const Message&) = delete; 57 Message(const Message&) = delete; 58 59 // Whether this is a message indicating that the port is to be closed. 60 // This is the last message to be received by a MessagePort. 61 bool IsCloseMessage() const; 62 63 // Deserialize the contained JS value. May only be called once, and only 64 // after Serialize() has been called (e.g. by another thread). 65 v8::MaybeLocal<v8::Value> Deserialize( 66 Environment* env, 67 v8::Local<v8::Context> context, 68 v8::Local<v8::Value>* port_list = nullptr); 69 70 // Serialize a JS value, and optionally transfer objects, into this message. 71 // The Message object retains ownership of all transferred objects until 72 // deserialization. 73 // The source_port parameter, if provided, will make Serialize() throw a 74 // "DataCloneError" DOMException if source_port is found in transfer_list. 75 v8::Maybe<bool> Serialize(Environment* env, 76 v8::Local<v8::Context> context, 77 v8::Local<v8::Value> input, 78 const TransferList& transfer_list, 79 v8::Local<v8::Object> source_port = 80 v8::Local<v8::Object>()); 81 82 // Internal method of Message that is called when a new SharedArrayBuffer 83 // object is encountered in the incoming value's structure. 84 void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store); 85 // Internal method of Message that is called once serialization finishes 86 // and that transfers ownership of `data` to this message. 87 void AddTransferable(std::unique_ptr<TransferData>&& data); 88 // Internal method of Message that is called when a new WebAssembly.Module 89 // object is encountered in the incoming value's structure. 90 uint32_t AddWASMModule(v8::CompiledWasmModule&& mod); 91 92 // The host objects that will be transferred, as recorded by Serialize() 93 // (e.g. MessagePorts). 94 // Used for warning user about posting the target MessagePort to itself, 95 // which will as a side effect destroy the communication channel. transferables()96 const std::vector<std::unique_ptr<TransferData>>& transferables() const { 97 return transferables_; 98 } has_transferables()99 bool has_transferables() const { 100 return !transferables_.empty() || !array_buffers_.empty(); 101 } 102 103 void MemoryInfo(MemoryTracker* tracker) const override; 104 105 SET_MEMORY_INFO_NAME(Message) 106 SET_SELF_SIZE(Message) 107 108 private: 109 MallocedBuffer<char> main_message_buf_; 110 // TODO(addaleax): Make this a std::variant to save storage size in the common 111 // case (which is that all of these vectors are empty) once that is available 112 // with C++17. 113 std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_; 114 std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_; 115 std::vector<std::unique_ptr<TransferData>> transferables_; 116 std::vector<v8::CompiledWasmModule> wasm_modules_; 117 118 friend class MessagePort; 119 }; 120 121 class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> { 122 public: 123 // Named SiblingGroup, Used for one-to-many BroadcastChannels. 124 static std::shared_ptr<SiblingGroup> Get(const std::string& name); 125 126 // Anonymous SiblingGroup, Used for one-to-one MessagePort pairs. 127 SiblingGroup() = default; 128 explicit SiblingGroup(const std::string& name); 129 ~SiblingGroup(); 130 131 // Dispatches the Message to the collection of associated 132 // ports. If there is more than one destination port and 133 // the Message contains transferables, Dispatch will fail. 134 // Returns Just(true) if successful and the message was 135 // dispatched to at least one destination. Returns Just(false) 136 // if there were no destinations. Returns Nothing<bool>() 137 // if there was an error. If error is not nullptr, it will 138 // be set to an error message or warning message as appropriate. 139 v8::Maybe<bool> Dispatch( 140 MessagePortData* source, 141 std::shared_ptr<Message> message, 142 std::string* error = nullptr); 143 144 void Entangle(MessagePortData* data); 145 void Entangle(std::initializer_list<MessagePortData*> data); 146 void Disentangle(MessagePortData* data); 147 name()148 const std::string& name() const { return name_; } 149 size()150 size_t size() const { return ports_.size(); } 151 152 private: 153 const std::string name_; 154 RwLock group_mutex_; // Protects ports_. 155 std::set<MessagePortData*> ports_; 156 157 static void CheckSiblingGroup(const std::string& name); 158 159 using Map = 160 std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>; 161 162 static Mutex groups_mutex_; 163 static Map groups_; 164 }; 165 166 // This contains all data for a `MessagePort` instance that is not tied to 167 // a specific Environment/Isolate/event loop, for easier transfer between those. 168 class MessagePortData : public TransferData { 169 public: 170 explicit MessagePortData(MessagePort* owner); 171 ~MessagePortData() override; 172 173 MessagePortData(MessagePortData&& other) = delete; 174 MessagePortData& operator=(MessagePortData&& other) = delete; 175 MessagePortData(const MessagePortData& other) = delete; 176 MessagePortData& operator=(const MessagePortData& other) = delete; 177 178 // Add a message to the incoming queue and notify the receiver. 179 // This may be called from any thread. 180 void AddToIncomingQueue(std::shared_ptr<Message> message); 181 v8::Maybe<bool> Dispatch( 182 std::shared_ptr<Message> message, 183 std::string* error = nullptr); 184 185 // Turns `a` and `b` into siblings, i.e. connects the sending side of one 186 // to the receiving side of the other. This is not thread-safe. 187 static void Entangle(MessagePortData* a, MessagePortData* b); 188 189 // Removes any possible sibling. This is thread-safe (it acquires both 190 // `sibling_mutex_` and `mutex_`), and has to be because it is called once 191 // the corresponding JS handle handle wants to close 192 // which can happen on either side of a worker. 193 void Disentangle(); 194 195 void MemoryInfo(MemoryTracker* tracker) const override; 196 BaseObjectPtr<BaseObject> Deserialize( 197 Environment* env, 198 v8::Local<v8::Context> context, 199 std::unique_ptr<TransferData> self) override; 200 201 SET_MEMORY_INFO_NAME(MessagePortData) 202 SET_SELF_SIZE(MessagePortData) 203 204 private: 205 // This mutex protects all fields below it, with the exception of 206 // sibling_. 207 mutable Mutex mutex_; 208 // TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr> 209 // once that is available with C++17, because std::shared_ptr comes with 210 // overhead that is only necessary for BroadcastChannel. 211 std::deque<std::shared_ptr<Message>> incoming_messages_; 212 MessagePort* owner_ = nullptr; 213 std::shared_ptr<SiblingGroup> group_; 214 friend class MessagePort; 215 friend class SiblingGroup; 216 }; 217 218 // A message port that receives messages from other threads, including 219 // the uv_async_t handle that is used to notify the current event loop of 220 // new incoming messages. 221 class MessagePort : public HandleWrap { 222 private: 223 // Create a new MessagePort. The `context` argument specifies the Context 224 // instance that is used for creating the values emitted from this port. 225 // This is called by MessagePort::New(), which is the public API used for 226 // creating MessagePort instances. 227 MessagePort(Environment* env, 228 v8::Local<v8::Context> context, 229 v8::Local<v8::Object> wrap); 230 231 public: 232 ~MessagePort() override; 233 234 // Create a new message port instance, optionally over an existing 235 // `MessagePortData` object. 236 static MessagePort* New(Environment* env, 237 v8::Local<v8::Context> context, 238 std::unique_ptr<MessagePortData> data = {}, 239 std::shared_ptr<SiblingGroup> sibling_group = {}); 240 241 // Send a message, i.e. deliver it into the sibling's incoming queue. 242 // If this port is closed, or if there is no sibling, this message is 243 // serialized with transfers, then silently discarded. 244 v8::Maybe<bool> PostMessage(Environment* env, 245 v8::Local<v8::Context> context, 246 v8::Local<v8::Value> message, 247 const TransferList& transfer); 248 249 // Start processing messages on this port as a receiving end. 250 void Start(); 251 // Stop processing messages on this port as a receiving end. 252 void Stop(); 253 254 /* constructor */ 255 static void New(const v8::FunctionCallbackInfo<v8::Value>& args); 256 /* prototype methods */ 257 static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); 258 static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); 259 static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); 260 static void CheckType(const v8::FunctionCallbackInfo<v8::Value>& args); 261 static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); 262 static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args); 263 264 /* static */ 265 static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args); 266 267 // Turns `a` and `b` into siblings, i.e. connects the sending side of one 268 // to the receiving side of the other. This is not thread-safe. 269 static void Entangle(MessagePort* a, MessagePort* b); 270 static void Entangle(MessagePort* a, MessagePortData* b); 271 272 // Detach this port's data for transferring. After this, the MessagePortData 273 // is no longer associated with this handle, although it can still receive 274 // messages. 275 std::unique_ptr<MessagePortData> Detach(); 276 277 void Close( 278 v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override; 279 280 // Returns true if either data_ has been freed, or if the handle is being 281 // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard. 282 // 283 // If checking if a JavaScript MessagePort object is detached, this method 284 // alone is often not enough, since the backing C++ MessagePort object may 285 // have been deleted already. For all intents and purposes, an object with a 286 // NULL pointer to the C++ MessagePort object is also detached. 287 inline bool IsDetached() const; 288 289 TransferMode GetTransferMode() const override; 290 std::unique_ptr<TransferData> TransferForMessaging() override; 291 292 void MemoryInfo(MemoryTracker* tracker) const override; 293 SET_MEMORY_INFO_NAME(MessagePort) 294 SET_SELF_SIZE(MessagePort) 295 296 private: 297 enum class MessageProcessingMode { 298 kNormalOperation, 299 kForceReadMessages 300 }; 301 302 void OnClose() override; 303 void OnMessage(MessageProcessingMode mode); 304 void TriggerAsync(); 305 v8::MaybeLocal<v8::Value> ReceiveMessage( 306 v8::Local<v8::Context> context, 307 MessageProcessingMode mode, 308 v8::Local<v8::Value>* port_list = nullptr); 309 310 std::unique_ptr<MessagePortData> data_ = nullptr; 311 bool receiving_messages_ = false; 312 uv_async_t async_; 313 v8::Global<v8::Function> emit_message_fn_; 314 315 friend class MessagePortData; 316 }; 317 318 // Provide a base class from which JS classes that should be transferable or 319 // cloneable by postMessage() can inherit. 320 // See e.g. FileHandle in internal/fs/promises.js for an example. 321 class JSTransferable : public BaseObject { 322 public: 323 JSTransferable(Environment* env, v8::Local<v8::Object> obj); 324 static void New(const v8::FunctionCallbackInfo<v8::Value>& args); 325 326 TransferMode GetTransferMode() const override; 327 std::unique_ptr<TransferData> TransferForMessaging() override; 328 std::unique_ptr<TransferData> CloneForMessaging() const override; 329 v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>> 330 NestedTransferables() const override; 331 v8::Maybe<bool> FinalizeTransferRead( 332 v8::Local<v8::Context> context, 333 v8::ValueDeserializer* deserializer) override; 334 335 SET_NO_MEMORY_INFO() 336 SET_MEMORY_INFO_NAME(JSTransferable) 337 SET_SELF_SIZE(JSTransferable) 338 339 private: 340 std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const; 341 342 class Data : public TransferData { 343 public: 344 Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data); 345 346 BaseObjectPtr<BaseObject> Deserialize( 347 Environment* env, 348 v8::Local<v8::Context> context, 349 std::unique_ptr<TransferData> self) override; 350 v8::Maybe<bool> FinalizeTransferWrite( 351 v8::Local<v8::Context> context, 352 v8::ValueSerializer* serializer) override; 353 354 SET_NO_MEMORY_INFO() 355 SET_MEMORY_INFO_NAME(JSTransferableTransferData) 356 SET_SELF_SIZE(Data) 357 358 private: 359 std::string deserialize_info_; 360 v8::Global<v8::Value> data_; 361 }; 362 }; 363 364 v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate( 365 Environment* env); 366 367 } // namespace worker 368 } // namespace node 369 370 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 371 372 373 #endif // SRC_NODE_MESSAGING_H_ 374