'use strict';

// Tamper-resistant copies of JS built-ins, provided by the Node.js core
// bootstrap via the `primordials` global.
const {
  ArrayPrototypeForEach,
  ArrayPrototypeMap,
  ArrayPrototypePush,
  FunctionPrototypeBind,
  FunctionPrototypeCall,
  ObjectAssign,
  ObjectCreate,
  ObjectDefineProperty,
  ObjectDefineProperties,
  ObjectGetOwnPropertyDescriptors,
  ObjectGetPrototypeOf,
  ObjectSetPrototypeOf,
  ObjectValues,
  ReflectApply,
  Symbol,
  SymbolFor,
} = primordials;

const {
  kEmptyObject,
  kEnumerableProperty,
} = require('internal/util');

// Symbols under which the native side looks up JS callbacks on handles
// (used below to install `oninit` and `onclose` on MessagePort.prototype).
const {
  handle_onclose: handleOnCloseSymbol,
  oninit: onInitSymbol,
  no_message_symbol: noMessageSymbol,
} = internalBinding('symbols');
const {
  MessagePort,
  MessageChannel,
  broadcastChannel,
  drainMessagePort,
  moveMessagePortToContext,
  receiveMessageOnPort: receiveMessageOnPort_,
  stopMessagePort,
  checkMessagePort,
  DOMException,
} = internalBinding('messaging');
const {
  getEnvMessagePort,
} = internalBinding('worker');

const { Readable, Writable } = require('stream');
const {
  Event,
  EventTarget,
  NodeEventTarget,
  defineEventHandler,
  initNodeEventTarget,
  kCreateEvent,
  kNewListener,
  kRemoveListener,
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
const {
  codes: {
    ERR_INVALID_ARG_TYPE,
    ERR_INVALID_THIS,
    ERR_MISSING_ARGS,
  },
} = require('internal/errors');

// Private property keys used by the classes in this file to store internal
// state on their instances.
const kData = Symbol('kData');
const kHandle = Symbol('kHandle');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kLastEventId = Symbol('kLastEventId');
const kName = Symbol('kName');
const kOrigin = Symbol('kOrigin');
const kOnMessage = Symbol('kOnMessage');
const kOnMessageError = Symbol('kOnMessageError');
const kPort = Symbol('kPort');
const kPorts = Symbol('kPorts');
const kWaitingStreams = Symbol('kWaitingStreams');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kSource = Symbol('kSource');
const kStartedReading = Symbol('kStartedReading');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
// Unlike the keys above, this one lives in the global symbol registry
// (`Symbol.for`), so it can be obtained by name outside this module.
// NOTE(review): presumably so the native MessagePort code can set it before
// emitting a message -- confirm against the C++ side.
const kCurrentlyReceivingPorts =
  SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
const kType = Symbol('kType');

// Values for the `type` field of control messages. In this file only
// STDIO_PAYLOAD and STDIO_WANTS_MORE_DATA are used (by the worker stdio
// streams); the remaining entries are consumed by other modules.
const messageTypes = {
  UP_AND_RUNNING: 'upAndRunning',
  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
  ERROR_MESSAGE: 'errorMessage',
  STDIO_PAYLOAD: 'stdioPayload',
  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
  LOAD_SCRIPT: 'loadScript',
};

// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. stopMessagePort).
const messagePortPrototypePropertyDescriptors = ObjectGetOwnPropertyDescriptors(MessagePort.prototype);
const propertiesValues = ObjectValues(messagePortPrototypePropertyDescriptors);
for (let i = 0; i < propertiesValues.length; i++) {
  // We want to use null-prototype objects to not rely on globally mutable
  // %Object.prototype%.
  ObjectSetPrototypeOf(propertiesValues[i], null);
}
// Save a copy of the original set of methods as a shallow clone.
// The clone keeps the *original* parent prototype, so methods inherited from
// it stay reachable through `MessagePortPrototype` after the re-parenting
// below.
const MessagePortPrototype = ObjectCreate(
  ObjectGetPrototypeOf(MessagePort.prototype),
  messagePortPrototypePropertyDescriptors);
// Set up the new inheritance chain.
// Both the constructor (for statics) and its prototype (for instance
// methods) are re-parented onto NodeEventTarget.
ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
// Copy methods that are inherited from HandleWrap, because
// changing the prototype of MessagePort.prototype implicitly removed them.
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;
// The native `hasRef` result is coerced so that the public method always
// returns a boolean.
MessagePort.prototype.hasRef = function() {
  return !!FunctionPrototypeCall(MessagePortPrototype.hasRef, this);
};

/**
 * Asserts that `port` is accepted by the native `checkMessagePort()` check.
 * @param {any} port Value to validate.
 * @param {string} name Argument name reported in the error.
 * @throws {ERR_INVALID_ARG_TYPE} If the check fails.
 */
function validateMessagePort(port, name) {
  if (!checkMessagePort(port))
    throw new ERR_INVALID_ARG_TYPE(name, 'MessagePort', port);
}

/**
 * Brand check for `MessageEvent` instances: true when `value` is an object
 * carrying the private `kData` slot.
 *
 * The `typeof` guard matters: applying the `in` operator to a primitive
 * throws a generic TypeError, which would pre-empt the ERR_INVALID_THIS the
 * accessors are supposed to raise for a bad receiver (e.g. `getter.call(1)`).
 * @param {any} value
 * @returns {boolean}
 */
function isMessageEvent(value) {
  return typeof value === 'object' && value !== null && kData in value;
}

/**
 * Event type dispatched for 'message' and 'messageerror'.
 */
class MessageEvent extends Event {
  /**
   * @param {string} type Event type, forwarded to `Event`.
   * @param {object} [init]
   * @param {any} [init.data] Payload; defaults to `null`.
   * @param {string} [init.origin] Stringified on assignment; defaults to ''.
   * @param {string} [init.lastEventId] Stringified on assignment;
   *   defaults to ''.
   * @param {MessagePort|null} [init.source] Defaults to `null`.
   * @param {Iterable<MessagePort>} [init.ports] Copied into a fresh array;
   *   defaults to an empty list.
   * @throws {ERR_INVALID_ARG_TYPE} If `source` is non-null and not a
   *   MessagePort, or if any entry of `ports` is not a MessagePort.
   */
  constructor(type, {
    data = null,
    origin = '',
    lastEventId = '',
    source = null,
    ports = [],
  } = kEmptyObject) {
    super(type);
    this[kData] = data;
    this[kOrigin] = `${origin}`;
    this[kLastEventId] = `${lastEventId}`;
    this[kSource] = source;
    this[kPorts] = [...ports];

    if (this[kSource] !== null)
      validateMessagePort(this[kSource], 'init.source');
    for (let i = 0; i < this[kPorts].length; i++)
      validateMessagePort(this[kPorts][i], `init.ports[${i}]`);
  }
}

/**
 * Builds the descriptor for a read-only `MessageEvent.prototype` accessor
 * that returns the private slot `slot` after brand-checking the receiver.
 * @param {symbol} slot Private key holding the value.
 * @returns {PropertyDescriptor}
 */
function createMessageEventAccessor(slot) {
  return {
    __proto__: null,
    get() {
      if (!isMessageEvent(this))
        throw new ERR_INVALID_THIS('MessageEvent');
      return this[slot];
    },
    enumerable: true,
    configurable: true,
    set: undefined,
  };
}

ObjectDefineProperties(MessageEvent.prototype, {
  data: createMessageEventAccessor(kData),
  origin: createMessageEventAccessor(kOrigin),
  lastEventId: createMessageEventAccessor(kLastEventId),
  source: createMessageEventAccessor(kSource),
  ports: createMessageEventAccessor(kPorts),
});

const originalCreateEvent = EventTarget.prototype[kCreateEvent];
// Override event creation on MessagePort: 'message' and 'messageerror'
// become MessageEvent instances carrying the ports stashed under
// `kCurrentlyReceivingPorts` (the slot is cleared once consumed). Every other
// event type defers to the stock EventTarget implementation.
ObjectDefineProperty(
  MessagePort.prototype,
  kCreateEvent,
  {
    __proto__: null,
    value: function(data, type) {
      if (type !== 'message' && type !== 'messageerror') {
        return ReflectApply(originalCreateEvent, this, arguments);
      }
      const ports = this[kCurrentlyReceivingPorts];
      this[kCurrentlyReceivingPorts] = undefined;
      return new MessageEvent(type, { data, ports });
    },
    configurable: false,
    writable: false,
    enumerable: false,
  });

// This is called from inside the `MessagePort` constructor.
function oninit() {
  initNodeEventTarget(this);
  // The port acts as its own emitter: it is ref'ed only while it has
  // 'message' listeners.
  setupPortReferencing(this, this, 'message');
  this[kCurrentlyReceivingPorts] = undefined;
}

defineEventHandler(MessagePort.prototype, 'message');
defineEventHandler(MessagePort.prototype, 'messageerror');

ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
  __proto__: null,
  enumerable: true,
  writable: false,
  value: oninit,
});

/**
 * Event dispatched on a MessagePort from its close hook; always of type
 * 'close'.
 */
class MessagePortCloseEvent extends Event {
  constructor() {
    super('close');
  }
}

// This is called after the underlying `uv_async_t` has been closed.
253function onclose() { 254 this.dispatchEvent(new MessagePortCloseEvent()); 255} 256 257ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, { 258 __proto__: null, 259 enumerable: false, 260 writable: false, 261 value: onclose, 262}); 263 264MessagePort.prototype.close = function(cb) { 265 if (typeof cb === 'function') 266 this.once('close', cb); 267 FunctionPrototypeCall(MessagePortPrototype.close, this); 268}; 269 270ObjectDefineProperty(MessagePort.prototype, inspect.custom, { 271 __proto__: null, 272 enumerable: false, 273 writable: false, 274 value: function inspect() { // eslint-disable-line func-name-matching 275 let ref; 276 try { 277 // This may throw when `this` does not refer to a native object, 278 // e.g. when accessing the prototype directly. 279 ref = FunctionPrototypeCall(MessagePortPrototype.hasRef, this); 280 } catch { return this; } 281 return ObjectAssign(ObjectCreate(MessagePort.prototype), 282 ref === undefined ? { 283 active: false, 284 } : { 285 active: true, 286 refed: ref, 287 }, 288 this); 289 }, 290}); 291 292function setupPortReferencing(port, eventEmitter, eventName) { 293 // Keep track of whether there are any workerMessage listeners: 294 // If there are some, ref() the channel so it keeps the event loop alive. 295 // If there are none or all are removed, unref() the channel so the worker 296 // can shutdown gracefully. 
297 port.unref(); 298 eventEmitter.on('newListener', function(name) { 299 if (name === eventName) newListener(eventEmitter.listenerCount(name)); 300 }); 301 eventEmitter.on('removeListener', function(name) { 302 if (name === eventName) removeListener(eventEmitter.listenerCount(name)); 303 }); 304 const origNewListener = eventEmitter[kNewListener]; 305 eventEmitter[kNewListener] = function(size, type, ...args) { 306 if (type === eventName) newListener(size - 1); 307 return ReflectApply(origNewListener, this, arguments); 308 }; 309 const origRemoveListener = eventEmitter[kRemoveListener]; 310 eventEmitter[kRemoveListener] = function(size, type, ...args) { 311 if (type === eventName) removeListener(size); 312 return ReflectApply(origRemoveListener, this, arguments); 313 }; 314 315 function newListener(size) { 316 if (size === 0) { 317 port.ref(); 318 FunctionPrototypeCall(MessagePortPrototype.start, port); 319 } 320 } 321 322 function removeListener(size) { 323 if (size === 0) { 324 stopMessagePort(port); 325 port.unref(); 326 } 327 } 328} 329 330 331class ReadableWorkerStdio extends Readable { 332 constructor(port, name) { 333 super(); 334 this[kPort] = port; 335 this[kName] = name; 336 this[kIncrementsPortRef] = true; 337 this[kStartedReading] = false; 338 this.on('end', () => { 339 if (this[kStartedReading] && this[kIncrementsPortRef]) { 340 if (--this[kPort][kWaitingStreams] === 0) 341 this[kPort].unref(); 342 } 343 }); 344 } 345 346 _read() { 347 if (!this[kStartedReading] && this[kIncrementsPortRef]) { 348 this[kStartedReading] = true; 349 if (this[kPort][kWaitingStreams]++ === 0) 350 this[kPort].ref(); 351 } 352 353 this[kPort].postMessage({ 354 type: messageTypes.STDIO_WANTS_MORE_DATA, 355 stream: this[kName], 356 }); 357 } 358} 359 360class WritableWorkerStdio extends Writable { 361 constructor(port, name) { 362 super({ decodeStrings: false }); 363 this[kPort] = port; 364 this[kName] = name; 365 this[kWritableCallbacks] = []; 366 } 367 368 _writev(chunks, cb) 
{ 369 this[kPort].postMessage({ 370 type: messageTypes.STDIO_PAYLOAD, 371 stream: this[kName], 372 chunks: ArrayPrototypeMap(chunks, 373 ({ chunk, encoding }) => ({ chunk, encoding })), 374 }); 375 ArrayPrototypePush(this[kWritableCallbacks], cb); 376 if (this[kPort][kWaitingStreams]++ === 0) 377 this[kPort].ref(); 378 } 379 380 _final(cb) { 381 this[kPort].postMessage({ 382 type: messageTypes.STDIO_PAYLOAD, 383 stream: this[kName], 384 chunks: [ { chunk: null, encoding: '' } ], 385 }); 386 cb(); 387 } 388 389 [kStdioWantsMoreDataCallback]() { 390 const cbs = this[kWritableCallbacks]; 391 this[kWritableCallbacks] = []; 392 ArrayPrototypeForEach(cbs, (cb) => cb()); 393 if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) 394 this[kPort].unref(); 395 } 396} 397 398function createWorkerStdio() { 399 const port = getEnvMessagePort(); 400 port[kWaitingStreams] = 0; 401 return { 402 stdin: new ReadableWorkerStdio(port, 'stdin'), 403 stdout: new WritableWorkerStdio(port, 'stdout'), 404 stderr: new WritableWorkerStdio(port, 'stderr'), 405 }; 406} 407 408function receiveMessageOnPort(port) { 409 const message = receiveMessageOnPort_(port?.[kHandle] ?? 
port); 410 if (message === noMessageSymbol) return undefined; 411 return { message }; 412} 413 414function onMessageEvent(type, data) { 415 this.dispatchEvent(new MessageEvent(type, { data })); 416} 417 418function isBroadcastChannel(value) { 419 return value?.[kType] === 'BroadcastChannel'; 420} 421 422class BroadcastChannel extends EventTarget { 423 /** 424 * @param {string} name 425 */ 426 constructor(name) { 427 if (arguments.length === 0) 428 throw new ERR_MISSING_ARGS('name'); 429 super(); 430 this[kType] = 'BroadcastChannel'; 431 this[kName] = `${name}`; 432 this[kHandle] = broadcastChannel(this[kName]); 433 this[kOnMessage] = FunctionPrototypeBind(onMessageEvent, this, 'message'); 434 this[kOnMessageError] = 435 FunctionPrototypeBind(onMessageEvent, this, 'messageerror'); 436 this[kHandle].on('message', this[kOnMessage]); 437 this[kHandle].on('messageerror', this[kOnMessageError]); 438 } 439 440 [inspect.custom](depth, options) { 441 if (!isBroadcastChannel(this)) 442 throw new ERR_INVALID_THIS('BroadcastChannel'); 443 if (depth < 0) 444 return 'BroadcastChannel'; 445 446 const opts = { 447 ...options, 448 depth: options.depth == null ? 
null : options.depth - 1, 449 }; 450 451 return `BroadcastChannel ${inspect({ 452 name: this[kName], 453 active: this[kHandle] !== undefined, 454 }, opts)}`; 455 } 456 457 /** 458 * @type {string} 459 */ 460 get name() { 461 if (!isBroadcastChannel(this)) 462 throw new ERR_INVALID_THIS('BroadcastChannel'); 463 return this[kName]; 464 } 465 466 /** 467 * @returns {void} 468 */ 469 close() { 470 if (!isBroadcastChannel(this)) 471 throw new ERR_INVALID_THIS('BroadcastChannel'); 472 if (this[kHandle] === undefined) 473 return; 474 this[kHandle].off('message', this[kOnMessage]); 475 this[kHandle].off('messageerror', this[kOnMessageError]); 476 this[kOnMessage] = undefined; 477 this[kOnMessageError] = undefined; 478 this[kHandle].close(); 479 this[kHandle] = undefined; 480 } 481 482 /** 483 * 484 * @param {any} message 485 * @returns {void} 486 */ 487 postMessage(message) { 488 if (!isBroadcastChannel(this)) 489 throw new ERR_INVALID_THIS('BroadcastChannel'); 490 if (arguments.length === 0) 491 throw new ERR_MISSING_ARGS('message'); 492 if (this[kHandle] === undefined) 493 throw new DOMException('BroadcastChannel is closed.'); 494 if (this[kHandle].postMessage(message) === undefined) 495 throw new DOMException('Message could not be posted.'); 496 } 497 498 // The ref() method is Node.js specific and not part of the standard 499 // BroadcastChannel API definition. Typically we shouldn't extend Web 500 // Platform APIs with Node.js specific methods but ref and unref 501 // are a bit special. 502 /** 503 * @returns {BroadcastChannel} 504 */ 505 ref() { 506 if (!isBroadcastChannel(this)) 507 throw new ERR_INVALID_THIS('BroadcastChannel'); 508 if (this[kHandle]) 509 this[kHandle].ref(); 510 return this; 511 } 512 513 // The unref() method is Node.js specific and not part of the standard 514 // BroadcastChannel API definition. Typically we shouldn't extend Web 515 // Platform APIs with Node.js specific methods but ref and unref 516 // are a bit special. 
517 /** 518 * @returns {BroadcastChannel} 519 */ 520 unref() { 521 if (!isBroadcastChannel(this)) 522 throw new ERR_INVALID_THIS('BroadcastChannel'); 523 if (this[kHandle]) 524 this[kHandle].unref(); 525 return this; 526 } 527} 528 529ObjectDefineProperties(BroadcastChannel.prototype, { 530 name: kEnumerableProperty, 531 close: kEnumerableProperty, 532 postMessage: kEnumerableProperty, 533}); 534 535defineEventHandler(BroadcastChannel.prototype, 'message'); 536defineEventHandler(BroadcastChannel.prototype, 'messageerror'); 537 538module.exports = { 539 drainMessagePort, 540 messageTypes, 541 kPort, 542 kIncrementsPortRef, 543 kWaitingStreams, 544 kStdioWantsMoreDataCallback, 545 moveMessagePortToContext, 546 MessagePort, 547 MessageChannel, 548 MessageEvent, 549 receiveMessageOnPort, 550 setupPortReferencing, 551 ReadableWorkerStdio, 552 WritableWorkerStdio, 553 createWorkerStdio, 554 BroadcastChannel, 555}; 556