1'use strict'; 2 3const { 4 ArrayPrototypeForEach, 5 ArrayPrototypeMap, 6 ArrayPrototypePush, 7 FunctionPrototypeBind, 8 FunctionPrototypeCall, 9 ObjectAssign, 10 ObjectCreate, 11 ObjectDefineProperty, 12 ObjectDefineProperties, 13 ObjectGetOwnPropertyDescriptors, 14 ObjectGetPrototypeOf, 15 ObjectSetPrototypeOf, 16 ObjectValues, 17 ReflectApply, 18 Symbol, 19 SymbolFor, 20} = primordials; 21 22const { 23 kEmptyObject, 24 kEnumerableProperty, 25 setOwnProperty, 26} = require('internal/util'); 27 28const { 29 handle_onclose: handleOnCloseSymbol, 30 oninit: onInitSymbol, 31 no_message_symbol: noMessageSymbol, 32} = internalBinding('symbols'); 33const { 34 MessagePort, 35 MessageChannel, 36 broadcastChannel, 37 drainMessagePort, 38 moveMessagePortToContext, 39 receiveMessageOnPort: receiveMessageOnPort_, 40 stopMessagePort, 41 checkMessagePort, 42 DOMException, 43} = internalBinding('messaging'); 44const { 45 getEnvMessagePort, 46} = internalBinding('worker'); 47 48const { Readable, Writable } = require('stream'); 49const { 50 Event, 51 EventTarget, 52 NodeEventTarget, 53 defineEventHandler, 54 initNodeEventTarget, 55 kCreateEvent, 56 kNewListener, 57 kRemoveListener, 58} = require('internal/event_target'); 59const { inspect } = require('internal/util/inspect'); 60const { 61 codes: { 62 ERR_INVALID_ARG_TYPE, 63 ERR_INVALID_THIS, 64 ERR_MISSING_ARGS, 65 }, 66} = require('internal/errors'); 67 68const kData = Symbol('kData'); 69const kHandle = Symbol('kHandle'); 70const kIncrementsPortRef = Symbol('kIncrementsPortRef'); 71const kLastEventId = Symbol('kLastEventId'); 72const kName = Symbol('kName'); 73const kOrigin = Symbol('kOrigin'); 74const kOnMessage = Symbol('kOnMessage'); 75const kOnMessageError = Symbol('kOnMessageError'); 76const kPort = Symbol('kPort'); 77const kPorts = Symbol('kPorts'); 78const kWaitingStreams = Symbol('kWaitingStreams'); 79const kWritableCallbacks = Symbol('kWritableCallbacks'); 80const kSource = Symbol('kSource'); 81const kStartedReading = Symbol('kStartedReading'); 82const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback'); 83const kCurrentlyReceivingPorts = 84 SymbolFor('nodejs.internal.kCurrentlyReceivingPorts'); 85const kType = Symbol('kType'); 86 87const messageTypes = { 88 UP_AND_RUNNING: 'upAndRunning', 89 COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError', 90 ERROR_MESSAGE: 'errorMessage', 91 STDIO_PAYLOAD: 'stdioPayload', 92 STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData', 93 LOAD_SCRIPT: 'loadScript', 94}; 95 96// We have to mess with the MessagePort prototype a bit, so that a) we can make 97// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do 98// not provide methods that are not present in the Browser and not documented 99// on our side (e.g. stopMessagePort). 100const messagePortPrototypePropertyDescriptors = ObjectGetOwnPropertyDescriptors(MessagePort.prototype); 101const propertiesValues = ObjectValues(messagePortPrototypePropertyDescriptors); 102for (let i = 0; i < propertiesValues.length; i++) { 103 // We want to use null-prototype objects to not rely on globally mutable 104 // %Object.prototype%. 105 ObjectSetPrototypeOf(propertiesValues[i], null); 106} 107// Save a copy of the original set of methods as a shallow clone. 108const MessagePortPrototype = ObjectCreate( 109 ObjectGetPrototypeOf(MessagePort.prototype), 110 messagePortPrototypePropertyDescriptors); 111// Set up the new inheritance chain. 112ObjectSetPrototypeOf(MessagePort, NodeEventTarget); 113ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype); 114// Copy methods that are inherited from HandleWrap, because 115// changing the prototype of MessagePort.prototype implicitly removed them. 116MessagePort.prototype.ref = MessagePortPrototype.ref; 117MessagePort.prototype.unref = MessagePortPrototype.unref; 118MessagePort.prototype.hasRef = function() { 119 return !!FunctionPrototypeCall(MessagePortPrototype.hasRef, this); 120}; 121 122function validateMessagePort(port, name) { 123 if (!checkMessagePort(port)) 124 throw new ERR_INVALID_ARG_TYPE(name, 'MessagePort', port); 125} 126 127function isMessageEvent(value) { 128 return value != null && kData in value; 129} 130 131class MessageEvent extends Event { 132 constructor(type, { 133 data = null, 134 origin = '', 135 lastEventId = '', 136 source = null, 137 ports = [], 138 } = kEmptyObject) { 139 super(type); 140 this[kData] = data; 141 this[kOrigin] = `${origin}`; 142 this[kLastEventId] = `${lastEventId}`; 143 this[kSource] = source; 144 this[kPorts] = [...ports]; 145 146 if (this[kSource] !== null) 147 validateMessagePort(this[kSource], 'init.source'); 148 for (let i = 0; i < this[kPorts].length; i++) 149 validateMessagePort(this[kPorts][i], `init.ports[${i}]`); 150 } 151} 152 153ObjectDefineProperties(MessageEvent.prototype, { 154 data: { 155 __proto__: null, 156 get() { 157 if (!isMessageEvent(this)) 158 throw new ERR_INVALID_THIS('MessageEvent'); 159 return this[kData]; 160 }, 161 enumerable: true, 162 configurable: true, 163 set: undefined, 164 }, 165 origin: { 166 __proto__: null, 167 get() { 168 if (!isMessageEvent(this)) 169 throw new ERR_INVALID_THIS('MessageEvent'); 170 return this[kOrigin]; 171 }, 172 enumerable: true, 173 configurable: true, 174 set: undefined, 175 }, 176 lastEventId: { 177 __proto__: null, 178 get() { 179 if (!isMessageEvent(this)) 180 throw new ERR_INVALID_THIS('MessageEvent'); 181 return this[kLastEventId]; 182 }, 183 enumerable: true, 184 configurable: true, 185 set: undefined, 186 }, 187 source: { 188 __proto__: null, 189 get() { 190 if (!isMessageEvent(this)) 191 throw new ERR_INVALID_THIS('MessageEvent'); 192 return this[kSource]; 193 }, 194 enumerable: true, 195 configurable: true, 196 set: undefined, 197 }, 198 ports: { 199 __proto__: null, 200 get() { 201 if (!isMessageEvent(this)) 202 throw new ERR_INVALID_THIS('MessageEvent'); 203 return this[kPorts]; 204 }, 205 enumerable: true, 206 configurable: true, 207 set: undefined, 208 }, 209}); 210 211const originalCreateEvent = EventTarget.prototype[kCreateEvent]; 212ObjectDefineProperty( 213 MessagePort.prototype, 214 kCreateEvent, 215 { 216 __proto__: null, 217 value: function(data, type) { 218 if (type !== 'message' && type !== 'messageerror') { 219 return ReflectApply(originalCreateEvent, this, arguments); 220 } 221 const ports = this[kCurrentlyReceivingPorts]; 222 this[kCurrentlyReceivingPorts] = undefined; 223 return new MessageEvent(type, { data, ports }); 224 }, 225 configurable: false, 226 writable: false, 227 enumerable: false, 228 }); 229 230// This is called from inside the `MessagePort` constructor. 231function oninit() { 232 initNodeEventTarget(this); 233 setupPortReferencing(this, this, 'message'); 234 this[kCurrentlyReceivingPorts] = undefined; 235} 236 237defineEventHandler(MessagePort.prototype, 'message'); 238defineEventHandler(MessagePort.prototype, 'messageerror'); 239 240ObjectDefineProperty(MessagePort.prototype, onInitSymbol, { 241 __proto__: null, 242 enumerable: true, 243 writable: false, 244 value: oninit, 245}); 246 247class MessagePortCloseEvent extends Event { 248 constructor() { 249 super('close'); 250 } 251} 252 253// This is called after the underlying `uv_async_t` has been closed. 254function onclose() { 255 this.dispatchEvent(new MessagePortCloseEvent()); 256} 257 258ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, { 259 __proto__: null, 260 enumerable: false, 261 writable: false, 262 value: onclose, 263}); 264 265MessagePort.prototype.close = function(cb) { 266 if (typeof cb === 'function') 267 this.once('close', cb); 268 FunctionPrototypeCall(MessagePortPrototype.close, this); 269}; 270 271ObjectDefineProperty(MessagePort.prototype, inspect.custom, { 272 __proto__: null, 273 enumerable: false, 274 writable: false, 275 value: function inspect() { // eslint-disable-line func-name-matching 276 let ref; 277 try { 278 // This may throw when `this` does not refer to a native object, 279 // e.g. when accessing the prototype directly. 280 ref = FunctionPrototypeCall(MessagePortPrototype.hasRef, this); 281 } catch { return this; } 282 return ObjectAssign(ObjectCreate(MessagePort.prototype), 283 ref === undefined ? { 284 active: false, 285 } : { 286 active: true, 287 refed: ref, 288 }, 289 this); 290 }, 291}); 292 293function setupPortReferencing(port, eventEmitter, eventName) { 294 // Keep track of whether there are any workerMessage listeners: 295 // If there are some, ref() the channel so it keeps the event loop alive. 296 // If there are none or all are removed, unref() the channel so the worker 297 // can shutdown gracefully. 298 port.unref(); 299 eventEmitter.on('newListener', function(name) { 300 if (name === eventName) newListener(eventEmitter.listenerCount(name)); 301 }); 302 eventEmitter.on('removeListener', function(name) { 303 if (name === eventName) removeListener(eventEmitter.listenerCount(name)); 304 }); 305 const origNewListener = eventEmitter[kNewListener]; 306 setOwnProperty(eventEmitter, kNewListener, function(size, type, ...args) { 307 if (type === eventName) newListener(size - 1); 308 return ReflectApply(origNewListener, this, arguments); 309 }); 310 const origRemoveListener = eventEmitter[kRemoveListener]; 311 setOwnProperty(eventEmitter, kRemoveListener, function(size, type, ...args) { 312 if (type === eventName) removeListener(size); 313 return ReflectApply(origRemoveListener, this, arguments); 314 }); 315 316 function newListener(size) { 317 if (size === 0) { 318 port.ref(); 319 FunctionPrototypeCall(MessagePortPrototype.start, port); 320 } 321 } 322 323 function removeListener(size) { 324 if (size === 0) { 325 stopMessagePort(port); 326 port.unref(); 327 } 328 } 329} 330 331 332class ReadableWorkerStdio extends Readable { 333 constructor(port, name) { 334 super(); 335 this[kPort] = port; 336 this[kName] = name; 337 this[kIncrementsPortRef] = true; 338 this[kStartedReading] = false; 339 this.on('end', () => { 340 if (this[kStartedReading] && this[kIncrementsPortRef]) { 341 if (--this[kPort][kWaitingStreams] === 0) 342 this[kPort].unref(); 343 } 344 }); 345 } 346 347 _read() { 348 if (!this[kStartedReading] && this[kIncrementsPortRef]) { 349 this[kStartedReading] = true; 350 if (this[kPort][kWaitingStreams]++ === 0) 351 this[kPort].ref(); 352 } 353 354 this[kPort].postMessage({ 355 type: messageTypes.STDIO_WANTS_MORE_DATA, 356 stream: this[kName], 357 }); 358 } 359} 360 361class WritableWorkerStdio extends Writable { 362 constructor(port, name) { 363 super({ decodeStrings: false }); 364 this[kPort] = port; 365 this[kName] = name; 366 this[kWritableCallbacks] = []; 367 } 368 369 _writev(chunks, cb) { 370 this[kPort].postMessage({ 371 type: messageTypes.STDIO_PAYLOAD, 372 stream: this[kName], 373 chunks: ArrayPrototypeMap(chunks, 374 ({ chunk, encoding }) => ({ chunk, encoding })), 375 }); 376 ArrayPrototypePush(this[kWritableCallbacks], cb); 377 if (this[kPort][kWaitingStreams]++ === 0) 378 this[kPort].ref(); 379 } 380 381 _final(cb) { 382 this[kPort].postMessage({ 383 type: messageTypes.STDIO_PAYLOAD, 384 stream: this[kName], 385 chunks: [ { chunk: null, encoding: '' } ], 386 }); 387 cb(); 388 } 389 390 [kStdioWantsMoreDataCallback]() { 391 const cbs = this[kWritableCallbacks]; 392 this[kWritableCallbacks] = []; 393 ArrayPrototypeForEach(cbs, (cb) => cb()); 394 if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) 395 this[kPort].unref(); 396 } 397} 398 399function createWorkerStdio() { 400 const port = getEnvMessagePort(); 401 port[kWaitingStreams] = 0; 402 return { 403 stdin: new ReadableWorkerStdio(port, 'stdin'), 404 stdout: new WritableWorkerStdio(port, 'stdout'), 405 stderr: new WritableWorkerStdio(port, 'stderr'), 406 }; 407} 408 409function receiveMessageOnPort(port) { 410 const message = receiveMessageOnPort_(port?.[kHandle] ?? port); 411 if (message === noMessageSymbol) return undefined; 412 return { message }; 413} 414 415function onMessageEvent(type, data) { 416 this.dispatchEvent(new MessageEvent(type, { data })); 417} 418 419function isBroadcastChannel(value) { 420 return value?.[kType] === 'BroadcastChannel'; 421} 422 423class BroadcastChannel extends EventTarget { 424 /** 425 * @param {string} name 426 */ 427 constructor(name) { 428 if (arguments.length === 0) 429 throw new ERR_MISSING_ARGS('name'); 430 super(); 431 this[kType] = 'BroadcastChannel'; 432 this[kName] = `${name}`; 433 this[kHandle] = broadcastChannel(this[kName]); 434 this[kOnMessage] = FunctionPrototypeBind(onMessageEvent, this, 'message'); 435 this[kOnMessageError] = 436 FunctionPrototypeBind(onMessageEvent, this, 'messageerror'); 437 this[kHandle].on('message', this[kOnMessage]); 438 this[kHandle].on('messageerror', this[kOnMessageError]); 439 } 440 441 [inspect.custom](depth, options) { 442 if (!isBroadcastChannel(this)) 443 throw new ERR_INVALID_THIS('BroadcastChannel'); 444 if (depth < 0) 445 return 'BroadcastChannel'; 446 447 const opts = { 448 ...options, 449 depth: options.depth == null ? null : options.depth - 1, 450 }; 451 452 return `BroadcastChannel ${inspect({ 453 name: this[kName], 454 active: this[kHandle] !== undefined, 455 }, opts)}`; 456 } 457 458 /** 459 * @type {string} 460 */ 461 get name() { 462 if (!isBroadcastChannel(this)) 463 throw new ERR_INVALID_THIS('BroadcastChannel'); 464 return this[kName]; 465 } 466 467 /** 468 * @returns {void} 469 */ 470 close() { 471 if (!isBroadcastChannel(this)) 472 throw new ERR_INVALID_THIS('BroadcastChannel'); 473 if (this[kHandle] === undefined) 474 return; 475 this[kHandle].off('message', this[kOnMessage]); 476 this[kHandle].off('messageerror', this[kOnMessageError]); 477 this[kOnMessage] = undefined; 478 this[kOnMessageError] = undefined; 479 this[kHandle].close(); 480 this[kHandle] = undefined; 481 } 482 483 /** 484 * 485 * @param {any} message 486 * @returns {void} 487 */ 488 postMessage(message) { 489 if (!isBroadcastChannel(this)) 490 throw new ERR_INVALID_THIS('BroadcastChannel'); 491 if (arguments.length === 0) 492 throw new ERR_MISSING_ARGS('message'); 493 if (this[kHandle] === undefined) 494 throw new DOMException('BroadcastChannel is closed.'); 495 if (this[kHandle].postMessage(message) === undefined) 496 throw new DOMException('Message could not be posted.'); 497 } 498 499 // The ref() method is Node.js specific and not part of the standard 500 // BroadcastChannel API definition. Typically we shouldn't extend Web 501 // Platform APIs with Node.js specific methods but ref and unref 502 // are a bit special. 503 /** 504 * @returns {BroadcastChannel} 505 */ 506 ref() { 507 if (!isBroadcastChannel(this)) 508 throw new ERR_INVALID_THIS('BroadcastChannel'); 509 if (this[kHandle]) 510 this[kHandle].ref(); 511 return this; 512 } 513 514 // The unref() method is Node.js specific and not part of the standard 515 // BroadcastChannel API definition. Typically we shouldn't extend Web 516 // Platform APIs with Node.js specific methods but ref and unref 517 // are a bit special. 518 /** 519 * @returns {BroadcastChannel} 520 */ 521 unref() { 522 if (!isBroadcastChannel(this)) 523 throw new ERR_INVALID_THIS('BroadcastChannel'); 524 if (this[kHandle]) 525 this[kHandle].unref(); 526 return this; 527 } 528} 529 530ObjectDefineProperties(BroadcastChannel.prototype, { 531 name: kEnumerableProperty, 532 close: kEnumerableProperty, 533 postMessage: kEnumerableProperty, 534}); 535 536defineEventHandler(BroadcastChannel.prototype, 'message'); 537defineEventHandler(BroadcastChannel.prototype, 'messageerror'); 538 539module.exports = { 540 drainMessagePort, 541 messageTypes, 542 kPort, 543 kIncrementsPortRef, 544 kWaitingStreams, 545 kStdioWantsMoreDataCallback, 546 moveMessagePortToContext, 547 MessagePort, 548 MessageChannel, 549 MessageEvent, 550 receiveMessageOnPort, 551 setupPortReferencing, 552 ReadableWorkerStdio, 553 WritableWorkerStdio, 554 createWorkerStdio, 555 BroadcastChannel, 556}; 557