• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeForEach,
5  ArrayPrototypeMap,
6  ArrayPrototypePush,
7  FunctionPrototypeBind,
8  FunctionPrototypeCall,
9  ObjectAssign,
10  ObjectCreate,
11  ObjectDefineProperty,
12  ObjectDefineProperties,
13  ObjectGetOwnPropertyDescriptors,
14  ObjectGetPrototypeOf,
15  ObjectSetPrototypeOf,
16  ObjectValues,
17  ReflectApply,
18  Symbol,
19  SymbolFor,
20} = primordials;
21
22const {
23  kEmptyObject,
24  kEnumerableProperty,
25  setOwnProperty,
26} = require('internal/util');
27
28const {
29  handle_onclose: handleOnCloseSymbol,
30  oninit: onInitSymbol,
31  no_message_symbol: noMessageSymbol,
32} = internalBinding('symbols');
33const {
34  MessagePort,
35  MessageChannel,
36  broadcastChannel,
37  drainMessagePort,
38  moveMessagePortToContext,
39  receiveMessageOnPort: receiveMessageOnPort_,
40  stopMessagePort,
41  checkMessagePort,
42  DOMException,
43} = internalBinding('messaging');
44const {
45  getEnvMessagePort,
46} = internalBinding('worker');
47
48const { Readable, Writable } = require('stream');
49const {
50  Event,
51  EventTarget,
52  NodeEventTarget,
53  defineEventHandler,
54  initNodeEventTarget,
55  kCreateEvent,
56  kNewListener,
57  kRemoveListener,
58} = require('internal/event_target');
59const { inspect } = require('internal/util/inspect');
60const {
61  codes: {
62    ERR_INVALID_ARG_TYPE,
63    ERR_INVALID_THIS,
64    ERR_MISSING_ARGS,
65  },
66} = require('internal/errors');
67
68const kData = Symbol('kData');
69const kHandle = Symbol('kHandle');
70const kIncrementsPortRef = Symbol('kIncrementsPortRef');
71const kLastEventId = Symbol('kLastEventId');
72const kName = Symbol('kName');
73const kOrigin = Symbol('kOrigin');
74const kOnMessage = Symbol('kOnMessage');
75const kOnMessageError = Symbol('kOnMessageError');
76const kPort = Symbol('kPort');
77const kPorts = Symbol('kPorts');
78const kWaitingStreams = Symbol('kWaitingStreams');
79const kWritableCallbacks = Symbol('kWritableCallbacks');
80const kSource = Symbol('kSource');
81const kStartedReading = Symbol('kStartedReading');
82const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
83const kCurrentlyReceivingPorts =
84  SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
85const kType = Symbol('kType');
86
87const messageTypes = {
88  UP_AND_RUNNING: 'upAndRunning',
89  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
90  ERROR_MESSAGE: 'errorMessage',
91  STDIO_PAYLOAD: 'stdioPayload',
92  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
93  LOAD_SCRIPT: 'loadScript',
94};
95
96// We have to mess with the MessagePort prototype a bit, so that a) we can make
97// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
98// not provide methods that are not present in the Browser and not documented
99// on our side (e.g. stopMessagePort).
100const messagePortPrototypePropertyDescriptors = ObjectGetOwnPropertyDescriptors(MessagePort.prototype);
101const propertiesValues = ObjectValues(messagePortPrototypePropertyDescriptors);
102for (let i = 0; i < propertiesValues.length; i++) {
103  // We want to use null-prototype objects to not rely on globally mutable
104  // %Object.prototype%.
105  ObjectSetPrototypeOf(propertiesValues[i], null);
106}
107// Save a copy of the original set of methods as a shallow clone.
108const MessagePortPrototype = ObjectCreate(
109  ObjectGetPrototypeOf(MessagePort.prototype),
110  messagePortPrototypePropertyDescriptors);
111// Set up the new inheritance chain.
112ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
113ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
114// Copy methods that are inherited from HandleWrap, because
115// changing the prototype of MessagePort.prototype implicitly removed them.
116MessagePort.prototype.ref = MessagePortPrototype.ref;
117MessagePort.prototype.unref = MessagePortPrototype.unref;
118MessagePort.prototype.hasRef = function() {
119  return !!FunctionPrototypeCall(MessagePortPrototype.hasRef, this);
120};
121
122function validateMessagePort(port, name) {
123  if (!checkMessagePort(port))
124    throw new ERR_INVALID_ARG_TYPE(name, 'MessagePort', port);
125}
126
127function isMessageEvent(value) {
128  return value != null && kData in value;
129}
130
131class MessageEvent extends Event {
132  constructor(type, {
133    data = null,
134    origin = '',
135    lastEventId = '',
136    source = null,
137    ports = [],
138  } = kEmptyObject) {
139    super(type);
140    this[kData] = data;
141    this[kOrigin] = `${origin}`;
142    this[kLastEventId] = `${lastEventId}`;
143    this[kSource] = source;
144    this[kPorts] = [...ports];
145
146    if (this[kSource] !== null)
147      validateMessagePort(this[kSource], 'init.source');
148    for (let i = 0; i < this[kPorts].length; i++)
149      validateMessagePort(this[kPorts][i], `init.ports[${i}]`);
150  }
151}
152
153ObjectDefineProperties(MessageEvent.prototype, {
154  data: {
155    __proto__: null,
156    get() {
157      if (!isMessageEvent(this))
158        throw new ERR_INVALID_THIS('MessageEvent');
159      return this[kData];
160    },
161    enumerable: true,
162    configurable: true,
163    set: undefined,
164  },
165  origin: {
166    __proto__: null,
167    get() {
168      if (!isMessageEvent(this))
169        throw new ERR_INVALID_THIS('MessageEvent');
170      return this[kOrigin];
171    },
172    enumerable: true,
173    configurable: true,
174    set: undefined,
175  },
176  lastEventId: {
177    __proto__: null,
178    get() {
179      if (!isMessageEvent(this))
180        throw new ERR_INVALID_THIS('MessageEvent');
181      return this[kLastEventId];
182    },
183    enumerable: true,
184    configurable: true,
185    set: undefined,
186  },
187  source: {
188    __proto__: null,
189    get() {
190      if (!isMessageEvent(this))
191        throw new ERR_INVALID_THIS('MessageEvent');
192      return this[kSource];
193    },
194    enumerable: true,
195    configurable: true,
196    set: undefined,
197  },
198  ports: {
199    __proto__: null,
200    get() {
201      if (!isMessageEvent(this))
202        throw new ERR_INVALID_THIS('MessageEvent');
203      return this[kPorts];
204    },
205    enumerable: true,
206    configurable: true,
207    set: undefined,
208  },
209});
210
211const originalCreateEvent = EventTarget.prototype[kCreateEvent];
212ObjectDefineProperty(
213  MessagePort.prototype,
214  kCreateEvent,
215  {
216    __proto__: null,
217    value: function(data, type) {
218      if (type !== 'message' && type !== 'messageerror') {
219        return ReflectApply(originalCreateEvent, this, arguments);
220      }
221      const ports = this[kCurrentlyReceivingPorts];
222      this[kCurrentlyReceivingPorts] = undefined;
223      return new MessageEvent(type, { data, ports });
224    },
225    configurable: false,
226    writable: false,
227    enumerable: false,
228  });
229
230// This is called from inside the `MessagePort` constructor.
231function oninit() {
232  initNodeEventTarget(this);
233  setupPortReferencing(this, this, 'message');
234  this[kCurrentlyReceivingPorts] = undefined;
235}
236
237defineEventHandler(MessagePort.prototype, 'message');
238defineEventHandler(MessagePort.prototype, 'messageerror');
239
240ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
241  __proto__: null,
242  enumerable: true,
243  writable: false,
244  value: oninit,
245});
246
247class MessagePortCloseEvent extends Event {
248  constructor() {
249    super('close');
250  }
251}
252
253// This is called after the underlying `uv_async_t` has been closed.
254function onclose() {
255  this.dispatchEvent(new MessagePortCloseEvent());
256}
257
258ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
259  __proto__: null,
260  enumerable: false,
261  writable: false,
262  value: onclose,
263});
264
265MessagePort.prototype.close = function(cb) {
266  if (typeof cb === 'function')
267    this.once('close', cb);
268  FunctionPrototypeCall(MessagePortPrototype.close, this);
269};
270
271ObjectDefineProperty(MessagePort.prototype, inspect.custom, {
272  __proto__: null,
273  enumerable: false,
274  writable: false,
275  value: function inspect() {  // eslint-disable-line func-name-matching
276    let ref;
277    try {
278      // This may throw when `this` does not refer to a native object,
279      // e.g. when accessing the prototype directly.
280      ref = FunctionPrototypeCall(MessagePortPrototype.hasRef, this);
281    } catch { return this; }
282    return ObjectAssign(ObjectCreate(MessagePort.prototype),
283                        ref === undefined ? {
284                          active: false,
285                        } : {
286                          active: true,
287                          refed: ref,
288                        },
289                        this);
290  },
291});
292
293function setupPortReferencing(port, eventEmitter, eventName) {
294  // Keep track of whether there are any workerMessage listeners:
295  // If there are some, ref() the channel so it keeps the event loop alive.
296  // If there are none or all are removed, unref() the channel so the worker
297  // can shutdown gracefully.
298  port.unref();
299  eventEmitter.on('newListener', function(name) {
300    if (name === eventName) newListener(eventEmitter.listenerCount(name));
301  });
302  eventEmitter.on('removeListener', function(name) {
303    if (name === eventName) removeListener(eventEmitter.listenerCount(name));
304  });
305  const origNewListener = eventEmitter[kNewListener];
306  setOwnProperty(eventEmitter, kNewListener, function(size, type, ...args) {
307    if (type === eventName) newListener(size - 1);
308    return ReflectApply(origNewListener, this, arguments);
309  });
310  const origRemoveListener = eventEmitter[kRemoveListener];
311  setOwnProperty(eventEmitter, kRemoveListener, function(size, type, ...args) {
312    if (type === eventName) removeListener(size);
313    return ReflectApply(origRemoveListener, this, arguments);
314  });
315
316  function newListener(size) {
317    if (size === 0) {
318      port.ref();
319      FunctionPrototypeCall(MessagePortPrototype.start, port);
320    }
321  }
322
323  function removeListener(size) {
324    if (size === 0) {
325      stopMessagePort(port);
326      port.unref();
327    }
328  }
329}
330
331
332class ReadableWorkerStdio extends Readable {
333  constructor(port, name) {
334    super();
335    this[kPort] = port;
336    this[kName] = name;
337    this[kIncrementsPortRef] = true;
338    this[kStartedReading] = false;
339    this.on('end', () => {
340      if (this[kStartedReading] && this[kIncrementsPortRef]) {
341        if (--this[kPort][kWaitingStreams] === 0)
342          this[kPort].unref();
343      }
344    });
345  }
346
347  _read() {
348    if (!this[kStartedReading] && this[kIncrementsPortRef]) {
349      this[kStartedReading] = true;
350      if (this[kPort][kWaitingStreams]++ === 0)
351        this[kPort].ref();
352    }
353
354    this[kPort].postMessage({
355      type: messageTypes.STDIO_WANTS_MORE_DATA,
356      stream: this[kName],
357    });
358  }
359}
360
361class WritableWorkerStdio extends Writable {
362  constructor(port, name) {
363    super({ decodeStrings: false });
364    this[kPort] = port;
365    this[kName] = name;
366    this[kWritableCallbacks] = [];
367  }
368
369  _writev(chunks, cb) {
370    this[kPort].postMessage({
371      type: messageTypes.STDIO_PAYLOAD,
372      stream: this[kName],
373      chunks: ArrayPrototypeMap(chunks,
374                                ({ chunk, encoding }) => ({ chunk, encoding })),
375    });
376    ArrayPrototypePush(this[kWritableCallbacks], cb);
377    if (this[kPort][kWaitingStreams]++ === 0)
378      this[kPort].ref();
379  }
380
381  _final(cb) {
382    this[kPort].postMessage({
383      type: messageTypes.STDIO_PAYLOAD,
384      stream: this[kName],
385      chunks: [ { chunk: null, encoding: '' } ],
386    });
387    cb();
388  }
389
390  [kStdioWantsMoreDataCallback]() {
391    const cbs = this[kWritableCallbacks];
392    this[kWritableCallbacks] = [];
393    ArrayPrototypeForEach(cbs, (cb) => cb());
394    if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
395      this[kPort].unref();
396  }
397}
398
399function createWorkerStdio() {
400  const port = getEnvMessagePort();
401  port[kWaitingStreams] = 0;
402  return {
403    stdin: new ReadableWorkerStdio(port, 'stdin'),
404    stdout: new WritableWorkerStdio(port, 'stdout'),
405    stderr: new WritableWorkerStdio(port, 'stderr'),
406  };
407}
408
409function receiveMessageOnPort(port) {
410  const message = receiveMessageOnPort_(port?.[kHandle] ?? port);
411  if (message === noMessageSymbol) return undefined;
412  return { message };
413}
414
415function onMessageEvent(type, data) {
416  this.dispatchEvent(new MessageEvent(type, { data }));
417}
418
419function isBroadcastChannel(value) {
420  return value?.[kType] === 'BroadcastChannel';
421}
422
423class BroadcastChannel extends EventTarget {
424  /**
425   * @param {string} name
426   */
427  constructor(name) {
428    if (arguments.length === 0)
429      throw new ERR_MISSING_ARGS('name');
430    super();
431    this[kType] = 'BroadcastChannel';
432    this[kName] = `${name}`;
433    this[kHandle] = broadcastChannel(this[kName]);
434    this[kOnMessage] = FunctionPrototypeBind(onMessageEvent, this, 'message');
435    this[kOnMessageError] =
436      FunctionPrototypeBind(onMessageEvent, this, 'messageerror');
437    this[kHandle].on('message', this[kOnMessage]);
438    this[kHandle].on('messageerror', this[kOnMessageError]);
439  }
440
441  [inspect.custom](depth, options) {
442    if (!isBroadcastChannel(this))
443      throw new ERR_INVALID_THIS('BroadcastChannel');
444    if (depth < 0)
445      return 'BroadcastChannel';
446
447    const opts = {
448      ...options,
449      depth: options.depth == null ? null : options.depth - 1,
450    };
451
452    return `BroadcastChannel ${inspect({
453      name: this[kName],
454      active: this[kHandle] !== undefined,
455    }, opts)}`;
456  }
457
458  /**
459   * @type {string}
460   */
461  get name() {
462    if (!isBroadcastChannel(this))
463      throw new ERR_INVALID_THIS('BroadcastChannel');
464    return this[kName];
465  }
466
467  /**
468   * @returns {void}
469   */
470  close() {
471    if (!isBroadcastChannel(this))
472      throw new ERR_INVALID_THIS('BroadcastChannel');
473    if (this[kHandle] === undefined)
474      return;
475    this[kHandle].off('message', this[kOnMessage]);
476    this[kHandle].off('messageerror', this[kOnMessageError]);
477    this[kOnMessage] = undefined;
478    this[kOnMessageError] = undefined;
479    this[kHandle].close();
480    this[kHandle] = undefined;
481  }
482
483  /**
484   *
485   * @param {any} message
486   * @returns {void}
487   */
488  postMessage(message) {
489    if (!isBroadcastChannel(this))
490      throw new ERR_INVALID_THIS('BroadcastChannel');
491    if (arguments.length === 0)
492      throw new ERR_MISSING_ARGS('message');
493    if (this[kHandle] === undefined)
494      throw new DOMException('BroadcastChannel is closed.');
495    if (this[kHandle].postMessage(message) === undefined)
496      throw new DOMException('Message could not be posted.');
497  }
498
499  // The ref() method is Node.js specific and not part of the standard
500  // BroadcastChannel API definition. Typically we shouldn't extend Web
501  // Platform APIs with Node.js specific methods but ref and unref
502  // are a bit special.
503  /**
504   * @returns {BroadcastChannel}
505   */
506  ref() {
507    if (!isBroadcastChannel(this))
508      throw new ERR_INVALID_THIS('BroadcastChannel');
509    if (this[kHandle])
510      this[kHandle].ref();
511    return this;
512  }
513
514  // The unref() method is Node.js specific and not part of the standard
515  // BroadcastChannel API definition. Typically we shouldn't extend Web
516  // Platform APIs with Node.js specific methods but ref and unref
517  // are a bit special.
518  /**
519   * @returns {BroadcastChannel}
520   */
521  unref() {
522    if (!isBroadcastChannel(this))
523      throw new ERR_INVALID_THIS('BroadcastChannel');
524    if (this[kHandle])
525      this[kHandle].unref();
526    return this;
527  }
528}
529
530ObjectDefineProperties(BroadcastChannel.prototype, {
531  name: kEnumerableProperty,
532  close: kEnumerableProperty,
533  postMessage: kEnumerableProperty,
534});
535
536defineEventHandler(BroadcastChannel.prototype, 'message');
537defineEventHandler(BroadcastChannel.prototype, 'messageerror');
538
539module.exports = {
540  drainMessagePort,
541  messageTypes,
542  kPort,
543  kIncrementsPortRef,
544  kWaitingStreams,
545  kStdioWantsMoreDataCallback,
546  moveMessagePortToContext,
547  MessagePort,
548  MessageChannel,
549  MessageEvent,
550  receiveMessageOnPort,
551  setupPortReferencing,
552  ReadableWorkerStdio,
553  WritableWorkerStdio,
554  createWorkerStdio,
555  BroadcastChannel,
556};
557