• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeForEach,
5  ArrayPrototypeMap,
6  ArrayPrototypePush,
7  FunctionPrototypeBind,
8  FunctionPrototypeCall,
9  ObjectAssign,
10  ObjectCreate,
11  ObjectDefineProperty,
12  ObjectDefineProperties,
13  ObjectGetOwnPropertyDescriptors,
14  ObjectGetPrototypeOf,
15  ObjectSetPrototypeOf,
16  ObjectValues,
17  ReflectApply,
18  Symbol,
19  SymbolFor,
20} = primordials;
21
22const {
23  kEmptyObject,
24  kEnumerableProperty,
25} = require('internal/util');
26
27const {
28  handle_onclose: handleOnCloseSymbol,
29  oninit: onInitSymbol,
30  no_message_symbol: noMessageSymbol,
31} = internalBinding('symbols');
32const {
33  MessagePort,
34  MessageChannel,
35  broadcastChannel,
36  drainMessagePort,
37  moveMessagePortToContext,
38  receiveMessageOnPort: receiveMessageOnPort_,
39  stopMessagePort,
40  checkMessagePort,
41  DOMException,
42} = internalBinding('messaging');
43const {
44  getEnvMessagePort,
45} = internalBinding('worker');
46
47const { Readable, Writable } = require('stream');
48const {
49  Event,
50  EventTarget,
51  NodeEventTarget,
52  defineEventHandler,
53  initNodeEventTarget,
54  kCreateEvent,
55  kNewListener,
56  kRemoveListener,
57} = require('internal/event_target');
58const { inspect } = require('internal/util/inspect');
59const {
60  codes: {
61    ERR_INVALID_ARG_TYPE,
62    ERR_INVALID_THIS,
63    ERR_MISSING_ARGS,
64  },
65} = require('internal/errors');
66
67const kData = Symbol('kData');
68const kHandle = Symbol('kHandle');
69const kIncrementsPortRef = Symbol('kIncrementsPortRef');
70const kLastEventId = Symbol('kLastEventId');
71const kName = Symbol('kName');
72const kOrigin = Symbol('kOrigin');
73const kOnMessage = Symbol('kOnMessage');
74const kOnMessageError = Symbol('kOnMessageError');
75const kPort = Symbol('kPort');
76const kPorts = Symbol('kPorts');
77const kWaitingStreams = Symbol('kWaitingStreams');
78const kWritableCallbacks = Symbol('kWritableCallbacks');
79const kSource = Symbol('kSource');
80const kStartedReading = Symbol('kStartedReading');
81const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
82const kCurrentlyReceivingPorts =
83  SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
84const kType = Symbol('kType');
85
86const messageTypes = {
87  UP_AND_RUNNING: 'upAndRunning',
88  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
89  ERROR_MESSAGE: 'errorMessage',
90  STDIO_PAYLOAD: 'stdioPayload',
91  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
92  LOAD_SCRIPT: 'loadScript',
93};
94
95// We have to mess with the MessagePort prototype a bit, so that a) we can make
96// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
97// not provide methods that are not present in the Browser and not documented
98// on our side (e.g. stopMessagePort).
99const messagePortPrototypePropertyDescriptors = ObjectGetOwnPropertyDescriptors(MessagePort.prototype);
100const propertiesValues = ObjectValues(messagePortPrototypePropertyDescriptors);
101for (let i = 0; i < propertiesValues.length; i++) {
102  // We want to use null-prototype objects to not rely on globally mutable
103  // %Object.prototype%.
104  ObjectSetPrototypeOf(propertiesValues[i], null);
105}
106// Save a copy of the original set of methods as a shallow clone.
107const MessagePortPrototype = ObjectCreate(
108  ObjectGetPrototypeOf(MessagePort.prototype),
109  messagePortPrototypePropertyDescriptors);
110// Set up the new inheritance chain.
111ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
112ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
113// Copy methods that are inherited from HandleWrap, because
114// changing the prototype of MessagePort.prototype implicitly removed them.
115MessagePort.prototype.ref = MessagePortPrototype.ref;
116MessagePort.prototype.unref = MessagePortPrototype.unref;
117MessagePort.prototype.hasRef = function() {
118  return !!FunctionPrototypeCall(MessagePortPrototype.hasRef, this);
119};
120
121function validateMessagePort(port, name) {
122  if (!checkMessagePort(port))
123    throw new ERR_INVALID_ARG_TYPE(name, 'MessagePort', port);
124}
125
126function isMessageEvent(value) {
127  return value != null && kData in value;
128}
129
130class MessageEvent extends Event {
131  constructor(type, {
132    data = null,
133    origin = '',
134    lastEventId = '',
135    source = null,
136    ports = [],
137  } = kEmptyObject) {
138    super(type);
139    this[kData] = data;
140    this[kOrigin] = `${origin}`;
141    this[kLastEventId] = `${lastEventId}`;
142    this[kSource] = source;
143    this[kPorts] = [...ports];
144
145    if (this[kSource] !== null)
146      validateMessagePort(this[kSource], 'init.source');
147    for (let i = 0; i < this[kPorts].length; i++)
148      validateMessagePort(this[kPorts][i], `init.ports[${i}]`);
149  }
150}
151
152ObjectDefineProperties(MessageEvent.prototype, {
153  data: {
154    __proto__: null,
155    get() {
156      if (!isMessageEvent(this))
157        throw new ERR_INVALID_THIS('MessageEvent');
158      return this[kData];
159    },
160    enumerable: true,
161    configurable: true,
162    set: undefined,
163  },
164  origin: {
165    __proto__: null,
166    get() {
167      if (!isMessageEvent(this))
168        throw new ERR_INVALID_THIS('MessageEvent');
169      return this[kOrigin];
170    },
171    enumerable: true,
172    configurable: true,
173    set: undefined,
174  },
175  lastEventId: {
176    __proto__: null,
177    get() {
178      if (!isMessageEvent(this))
179        throw new ERR_INVALID_THIS('MessageEvent');
180      return this[kLastEventId];
181    },
182    enumerable: true,
183    configurable: true,
184    set: undefined,
185  },
186  source: {
187    __proto__: null,
188    get() {
189      if (!isMessageEvent(this))
190        throw new ERR_INVALID_THIS('MessageEvent');
191      return this[kSource];
192    },
193    enumerable: true,
194    configurable: true,
195    set: undefined,
196  },
197  ports: {
198    __proto__: null,
199    get() {
200      if (!isMessageEvent(this))
201        throw new ERR_INVALID_THIS('MessageEvent');
202      return this[kPorts];
203    },
204    enumerable: true,
205    configurable: true,
206    set: undefined,
207  },
208});
209
210const originalCreateEvent = EventTarget.prototype[kCreateEvent];
211ObjectDefineProperty(
212  MessagePort.prototype,
213  kCreateEvent,
214  {
215    __proto__: null,
216    value: function(data, type) {
217      if (type !== 'message' && type !== 'messageerror') {
218        return ReflectApply(originalCreateEvent, this, arguments);
219      }
220      const ports = this[kCurrentlyReceivingPorts];
221      this[kCurrentlyReceivingPorts] = undefined;
222      return new MessageEvent(type, { data, ports });
223    },
224    configurable: false,
225    writable: false,
226    enumerable: false,
227  });
228
229// This is called from inside the `MessagePort` constructor.
230function oninit() {
231  initNodeEventTarget(this);
232  setupPortReferencing(this, this, 'message');
233  this[kCurrentlyReceivingPorts] = undefined;
234}
235
236defineEventHandler(MessagePort.prototype, 'message');
237defineEventHandler(MessagePort.prototype, 'messageerror');
238
239ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
240  __proto__: null,
241  enumerable: true,
242  writable: false,
243  value: oninit,
244});
245
246class MessagePortCloseEvent extends Event {
247  constructor() {
248    super('close');
249  }
250}
251
252// This is called after the underlying `uv_async_t` has been closed.
253function onclose() {
254  this.dispatchEvent(new MessagePortCloseEvent());
255}
256
257ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
258  __proto__: null,
259  enumerable: false,
260  writable: false,
261  value: onclose,
262});
263
264MessagePort.prototype.close = function(cb) {
265  if (typeof cb === 'function')
266    this.once('close', cb);
267  FunctionPrototypeCall(MessagePortPrototype.close, this);
268};
269
270ObjectDefineProperty(MessagePort.prototype, inspect.custom, {
271  __proto__: null,
272  enumerable: false,
273  writable: false,
274  value: function inspect() {  // eslint-disable-line func-name-matching
275    let ref;
276    try {
277      // This may throw when `this` does not refer to a native object,
278      // e.g. when accessing the prototype directly.
279      ref = FunctionPrototypeCall(MessagePortPrototype.hasRef, this);
280    } catch { return this; }
281    return ObjectAssign(ObjectCreate(MessagePort.prototype),
282                        ref === undefined ? {
283                          active: false,
284                        } : {
285                          active: true,
286                          refed: ref,
287                        },
288                        this);
289  },
290});
291
292function setupPortReferencing(port, eventEmitter, eventName) {
293  // Keep track of whether there are any workerMessage listeners:
294  // If there are some, ref() the channel so it keeps the event loop alive.
295  // If there are none or all are removed, unref() the channel so the worker
296  // can shutdown gracefully.
297  port.unref();
298  eventEmitter.on('newListener', function(name) {
299    if (name === eventName) newListener(eventEmitter.listenerCount(name));
300  });
301  eventEmitter.on('removeListener', function(name) {
302    if (name === eventName) removeListener(eventEmitter.listenerCount(name));
303  });
304  const origNewListener = eventEmitter[kNewListener];
305  eventEmitter[kNewListener] = function(size, type, ...args) {
306    if (type === eventName) newListener(size - 1);
307    return ReflectApply(origNewListener, this, arguments);
308  };
309  const origRemoveListener = eventEmitter[kRemoveListener];
310  eventEmitter[kRemoveListener] = function(size, type, ...args) {
311    if (type === eventName) removeListener(size);
312    return ReflectApply(origRemoveListener, this, arguments);
313  };
314
315  function newListener(size) {
316    if (size === 0) {
317      port.ref();
318      FunctionPrototypeCall(MessagePortPrototype.start, port);
319    }
320  }
321
322  function removeListener(size) {
323    if (size === 0) {
324      stopMessagePort(port);
325      port.unref();
326    }
327  }
328}
329
330
331class ReadableWorkerStdio extends Readable {
332  constructor(port, name) {
333    super();
334    this[kPort] = port;
335    this[kName] = name;
336    this[kIncrementsPortRef] = true;
337    this[kStartedReading] = false;
338    this.on('end', () => {
339      if (this[kStartedReading] && this[kIncrementsPortRef]) {
340        if (--this[kPort][kWaitingStreams] === 0)
341          this[kPort].unref();
342      }
343    });
344  }
345
346  _read() {
347    if (!this[kStartedReading] && this[kIncrementsPortRef]) {
348      this[kStartedReading] = true;
349      if (this[kPort][kWaitingStreams]++ === 0)
350        this[kPort].ref();
351    }
352
353    this[kPort].postMessage({
354      type: messageTypes.STDIO_WANTS_MORE_DATA,
355      stream: this[kName],
356    });
357  }
358}
359
360class WritableWorkerStdio extends Writable {
361  constructor(port, name) {
362    super({ decodeStrings: false });
363    this[kPort] = port;
364    this[kName] = name;
365    this[kWritableCallbacks] = [];
366  }
367
368  _writev(chunks, cb) {
369    this[kPort].postMessage({
370      type: messageTypes.STDIO_PAYLOAD,
371      stream: this[kName],
372      chunks: ArrayPrototypeMap(chunks,
373                                ({ chunk, encoding }) => ({ chunk, encoding })),
374    });
375    ArrayPrototypePush(this[kWritableCallbacks], cb);
376    if (this[kPort][kWaitingStreams]++ === 0)
377      this[kPort].ref();
378  }
379
380  _final(cb) {
381    this[kPort].postMessage({
382      type: messageTypes.STDIO_PAYLOAD,
383      stream: this[kName],
384      chunks: [ { chunk: null, encoding: '' } ],
385    });
386    cb();
387  }
388
389  [kStdioWantsMoreDataCallback]() {
390    const cbs = this[kWritableCallbacks];
391    this[kWritableCallbacks] = [];
392    ArrayPrototypeForEach(cbs, (cb) => cb());
393    if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
394      this[kPort].unref();
395  }
396}
397
398function createWorkerStdio() {
399  const port = getEnvMessagePort();
400  port[kWaitingStreams] = 0;
401  return {
402    stdin: new ReadableWorkerStdio(port, 'stdin'),
403    stdout: new WritableWorkerStdio(port, 'stdout'),
404    stderr: new WritableWorkerStdio(port, 'stderr'),
405  };
406}
407
408function receiveMessageOnPort(port) {
409  const message = receiveMessageOnPort_(port?.[kHandle] ?? port);
410  if (message === noMessageSymbol) return undefined;
411  return { message };
412}
413
414function onMessageEvent(type, data) {
415  this.dispatchEvent(new MessageEvent(type, { data }));
416}
417
418function isBroadcastChannel(value) {
419  return value?.[kType] === 'BroadcastChannel';
420}
421
422class BroadcastChannel extends EventTarget {
423  /**
424   * @param {string} name
425   */
426  constructor(name) {
427    if (arguments.length === 0)
428      throw new ERR_MISSING_ARGS('name');
429    super();
430    this[kType] = 'BroadcastChannel';
431    this[kName] = `${name}`;
432    this[kHandle] = broadcastChannel(this[kName]);
433    this[kOnMessage] = FunctionPrototypeBind(onMessageEvent, this, 'message');
434    this[kOnMessageError] =
435      FunctionPrototypeBind(onMessageEvent, this, 'messageerror');
436    this[kHandle].on('message', this[kOnMessage]);
437    this[kHandle].on('messageerror', this[kOnMessageError]);
438  }
439
440  [inspect.custom](depth, options) {
441    if (!isBroadcastChannel(this))
442      throw new ERR_INVALID_THIS('BroadcastChannel');
443    if (depth < 0)
444      return 'BroadcastChannel';
445
446    const opts = {
447      ...options,
448      depth: options.depth == null ? null : options.depth - 1,
449    };
450
451    return `BroadcastChannel ${inspect({
452      name: this[kName],
453      active: this[kHandle] !== undefined,
454    }, opts)}`;
455  }
456
457  /**
458   * @type {string}
459   */
460  get name() {
461    if (!isBroadcastChannel(this))
462      throw new ERR_INVALID_THIS('BroadcastChannel');
463    return this[kName];
464  }
465
466  /**
467   * @returns {void}
468   */
469  close() {
470    if (!isBroadcastChannel(this))
471      throw new ERR_INVALID_THIS('BroadcastChannel');
472    if (this[kHandle] === undefined)
473      return;
474    this[kHandle].off('message', this[kOnMessage]);
475    this[kHandle].off('messageerror', this[kOnMessageError]);
476    this[kOnMessage] = undefined;
477    this[kOnMessageError] = undefined;
478    this[kHandle].close();
479    this[kHandle] = undefined;
480  }
481
482  /**
483   *
484   * @param {any} message
485   * @returns {void}
486   */
487  postMessage(message) {
488    if (!isBroadcastChannel(this))
489      throw new ERR_INVALID_THIS('BroadcastChannel');
490    if (arguments.length === 0)
491      throw new ERR_MISSING_ARGS('message');
492    if (this[kHandle] === undefined)
493      throw new DOMException('BroadcastChannel is closed.');
494    if (this[kHandle].postMessage(message) === undefined)
495      throw new DOMException('Message could not be posted.');
496  }
497
498  // The ref() method is Node.js specific and not part of the standard
499  // BroadcastChannel API definition. Typically we shouldn't extend Web
500  // Platform APIs with Node.js specific methods but ref and unref
501  // are a bit special.
502  /**
503   * @returns {BroadcastChannel}
504   */
505  ref() {
506    if (!isBroadcastChannel(this))
507      throw new ERR_INVALID_THIS('BroadcastChannel');
508    if (this[kHandle])
509      this[kHandle].ref();
510    return this;
511  }
512
513  // The unref() method is Node.js specific and not part of the standard
514  // BroadcastChannel API definition. Typically we shouldn't extend Web
515  // Platform APIs with Node.js specific methods but ref and unref
516  // are a bit special.
517  /**
518   * @returns {BroadcastChannel}
519   */
520  unref() {
521    if (!isBroadcastChannel(this))
522      throw new ERR_INVALID_THIS('BroadcastChannel');
523    if (this[kHandle])
524      this[kHandle].unref();
525    return this;
526  }
527}
528
529ObjectDefineProperties(BroadcastChannel.prototype, {
530  name: kEnumerableProperty,
531  close: kEnumerableProperty,
532  postMessage: kEnumerableProperty,
533});
534
535defineEventHandler(BroadcastChannel.prototype, 'message');
536defineEventHandler(BroadcastChannel.prototype, 'messageerror');
537
538module.exports = {
539  drainMessagePort,
540  messageTypes,
541  kPort,
542  kIncrementsPortRef,
543  kWaitingStreams,
544  kStdioWantsMoreDataCallback,
545  moveMessagePortToContext,
546  MessagePort,
547  MessageChannel,
548  MessageEvent,
549  receiveMessageOnPort,
550  setupPortReferencing,
551  ReadableWorkerStdio,
552  WritableWorkerStdio,
553  createWorkerStdio,
554  BroadcastChannel,
555};
556