• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeForEach,
5  ArrayPrototypeMap,
6  ArrayPrototypePush,
7  FunctionPrototypeCall,
8  ObjectAssign,
9  ObjectCreate,
10  ObjectDefineProperty,
11  ObjectGetOwnPropertyDescriptors,
12  ObjectGetPrototypeOf,
13  ObjectSetPrototypeOf,
14  ReflectApply,
15  Symbol,
16} = primordials;
17
18const {
19  handle_onclose: handleOnCloseSymbol,
20  oninit: onInitSymbol,
21  no_message_symbol: noMessageSymbol
22} = internalBinding('symbols');
23const {
24  MessagePort,
25  MessageChannel,
26  drainMessagePort,
27  moveMessagePortToContext,
28  receiveMessageOnPort: receiveMessageOnPort_,
29  stopMessagePort
30} = internalBinding('messaging');
31const {
32  getEnvMessagePort
33} = internalBinding('worker');
34
35const { Readable, Writable } = require('stream');
36const {
37  Event,
38  EventTarget,
39  NodeEventTarget,
40  defineEventHandler,
41  initNodeEventTarget,
42  kCreateEvent,
43  kNewListener,
44  kRemoveListener,
45} = require('internal/event_target');
46const { inspect } = require('internal/util/inspect');
47
// Symbol keys used to attach internal state to ports and stdio streams.

// Stream flag: a ReadableWorkerStdio only takes part in the port's
// waiting-stream count while this is truthy (its constructor sets it to true).
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
// Stream name; sent as the `stream` field of messages posted by stdio streams.
const kName = Symbol('kName');
// The port a stdio stream posts its messages to.
const kPort = Symbol('kPort');
// Counter stored on the port: the port is ref()'d when the count leaves 0 and
// unref()'d when it returns to 0.
const kWaitingStreams = Symbol('kWaitingStreams');
// Array of pending `_writev` callbacks on a WritableWorkerStdio.
const kWritableCallbacks = Symbol('kWritableCallbacks');
// Stream flag: set once a ReadableWorkerStdio has been counted in `_read()`.
const kStartedReading = Symbol('kStartedReading');
// Key of the WritableWorkerStdio method that flushes pending write callbacks.
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
55
// String tags for internal messages. Within this file only STDIO_PAYLOAD and
// STDIO_WANTS_MORE_DATA are used (as the `type` field of messages posted by
// the stdio streams); the other tags are merely exported from here.
const messageTypes = {
  UP_AND_RUNNING: 'upAndRunning',
  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
  ERROR_MESSAGE: 'errorMessage',
  STDIO_PAYLOAD: 'stdioPayload',
  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
  LOAD_SCRIPT: 'loadScript'
};
64
// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone. The clone
// keeps the original parent prototype, so the native methods stay reachable
// through `MessagePortPrototype` after the chain is rewired below.
const MessagePortPrototype = ObjectCreate(
  ObjectGetPrototypeOf(MessagePort.prototype),
  ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain, for both the constructor and its
// prototype object.
ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
// Copy methods that are inherited from HandleWrap, because
// changing the prototype of MessagePort.prototype implicitly removed them.
// Only `ref` and `unref` are restored here.
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;
80
// Event created for 'message' and 'messageerror' on a MessagePort. The payload
// is exposed as the `data` property.
class MessageEvent extends Event {
  // `target` is accepted but not used by this constructor; `type` is forwarded
  // to the Event constructor.
  constructor(data, target, type) {
    super(type);
    this.data = data;
  }
}
87
// Override the event factory on MessagePort: 'message' and 'messageerror' are
// wrapped in a MessageEvent, every other type is delegated to the factory
// that EventTarget provides. The property is locked down so it cannot be
// replaced or reconfigured afterwards.
const originalCreateEvent = EventTarget.prototype[kCreateEvent];
ObjectDefineProperty(MessagePort.prototype, kCreateEvent, {
  configurable: false,
  enumerable: false,
  writable: false,
  value: function(data, type) {
    if (type === 'message' || type === 'messageerror') {
      return new MessageEvent(data, this, type);
    }
    // Forward the untouched argument list to the original factory.
    return ReflectApply(originalCreateEvent, this, arguments);
  },
});
103
// This is called from inside the `MessagePort` constructor.
// `this` is the port being initialized.
function oninit() {
  // Initialize the event-target state first; setupPortReferencing() registers
  // listeners on the port.
  initNodeEventTarget(this);
  // The port is both the handle to ref()/unref() and the emitter whose
  // 'message' listeners are tracked.
  setupPortReferencing(this, this, 'message');
}
109
// Define event handler properties for the two message-related event types
// (presumably `onmessage` / `onmessageerror`; see defineEventHandler).
defineEventHandler(MessagePort.prototype, 'message');
defineEventHandler(MessagePort.prototype, 'messageerror');

// Install oninit() under the symbol obtained from the 'symbols' binding.
// The property is non-writable and, since `configurable` is not given,
// non-configurable as well.
ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
  enumerable: true,
  writable: false,
  value: oninit
});
118
// Event whose type is always 'close'; dispatched by onclose().
class MessagePortCloseEvent extends Event {
  constructor() {
    super('close');
  }
}
124
// This is called after the underlying `uv_async_t` has been closed.
// Dispatches a 'close' event on the port (`this`).
function onclose() {
  this.dispatchEvent(new MessagePortCloseEvent());
}
129
// Install onclose() under the symbol obtained from the 'symbols' binding, as a
// non-enumerable, non-writable property.
ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
  enumerable: false,
  writable: false,
  value: onclose
});
135
// Closes the port through the saved native `close` method. If `cb` is a
// function it is registered as a one-time 'close' listener first; any other
// value is ignored.
MessagePort.prototype.close = function(cb) {
  const hasCallback = typeof cb === 'function';
  if (hasCallback) {
    this.once('close', cb);
  }
  FunctionPrototypeCall(MessagePortPrototype.close, this);
};
141
// Custom inspection: returns a plain object that inherits from
// MessagePort.prototype and carries an `active` flag (plus `refed` when the
// port is active), followed by the port's own enumerable properties.
ObjectDefineProperty(MessagePort.prototype, inspect.custom, {
  enumerable: false,
  writable: false,
  value: function inspect() {  // eslint-disable-line func-name-matching
    let ref;
    try {
      // This may throw when `this` does not refer to a native object,
      // e.g. when accessing the prototype directly.
      ref = FunctionPrototypeCall(MessagePortPrototype.hasRef, this);
    } catch {
      return this;
    }

    const view = ObjectCreate(MessagePort.prototype);
    // An `undefined` result from hasRef() is reported as an inactive port.
    let status;
    if (ref === undefined) {
      status = { active: false };
    } else {
      status = { active: true, refed: ref };
    }
    return ObjectAssign(view, status, this);
  }
});
162
// Ties the ref state of `port` to the presence of `eventName` listeners on
// `eventEmitter`:
// - while at least one such listener exists, the port is ref()'d and started,
//   so it keeps the event loop alive;
// - once there are none left, the port is stopped and unref()'d, so the
//   worker can shut down gracefully.
// Both listener-tracking mechanisms are hooked: the 'newListener' /
// 'removeListener' events and the kNewListener / kRemoveListener hooks.
function setupPortReferencing(port, eventEmitter, eventName) {
  // `countBefore` is the number of matching listeners before the addition.
  function onListenerAdded(countBefore) {
    if (countBefore !== 0) return;
    port.ref();
    FunctionPrototypeCall(MessagePortPrototype.start, port);
  }

  // `countAfter` is the number of matching listeners left after the removal.
  function onListenerRemoved(countAfter) {
    if (countAfter !== 0) return;
    stopMessagePort(port);
    port.unref();
  }

  // Start out unreferenced: there are no listeners yet.
  port.unref();

  eventEmitter.on('newListener', function(name) {
    if (name !== eventName) return;
    onListenerAdded(eventEmitter.listenerCount(name));
  });
  eventEmitter.on('removeListener', function(name) {
    if (name !== eventName) return;
    onListenerRemoved(eventEmitter.listenerCount(name));
  });

  // Wrap the existing hooks, always delegating to them with the original
  // receiver and argument list.
  const baseNewListener = eventEmitter[kNewListener];
  eventEmitter[kNewListener] = function(size, type, ...args) {
    if (type === eventName) {
      // Presumably `size` already counts the listener being added, hence -1.
      onListenerAdded(size - 1);
    }
    return ReflectApply(baseNewListener, this, arguments);
  };

  const baseRemoveListener = eventEmitter[kRemoveListener];
  eventEmitter[kRemoveListener] = function(size, type, ...args) {
    if (type === eventName) {
      onListenerRemoved(size);
    }
    return ReflectApply(baseRemoveListener, this, arguments);
  };
}
200
201
// Readable stream whose data is requested over a message port. Every
// `_read()` posts a STDIO_WANTS_MORE_DATA message naming this stream.
// Unless kIncrementsPortRef has been cleared, the stream is counted in the
// port's kWaitingStreams from its first read until it emits 'end'; the port
// is ref()'d while that count is non-zero.
class ReadableWorkerStdio extends Readable {
  constructor(port, name) {
    super();
    this[kPort] = port;
    this[kName] = name;
    this[kIncrementsPortRef] = true;
    this[kStartedReading] = false;

    this.on('end', () => {
      // Only undo the count if this stream actually contributed to it.
      const wasCounted = this[kStartedReading] && this[kIncrementsPortRef];
      if (!wasCounted) return;
      const owner = this[kPort];
      if (--owner[kWaitingStreams] === 0) {
        owner.unref();
      }
    });
  }

  _read() {
    const owner = this[kPort];

    // Count this stream once, on its first read.
    const isFirstCountedRead =
      !this[kStartedReading] && this[kIncrementsPortRef];
    if (isFirstCountedRead) {
      this[kStartedReading] = true;
      if (owner[kWaitingStreams]++ === 0) {
        owner.ref();
      }
    }

    const request = {
      type: messageTypes.STDIO_WANTS_MORE_DATA,
      stream: this[kName],
    };
    owner.postMessage(request);
  }
}
230
// Writable stream that forwards its chunks over a message port as
// STDIO_PAYLOAD messages. Write callbacks are held back until
// [kStdioWantsMoreDataCallback]() is invoked; each held-back write counts
// towards the port's kWaitingStreams, and the port is ref()'d while that
// count is non-zero.
class WritableWorkerStdio extends Writable {
  constructor(port, name) {
    // Strings are passed through as-is, together with their encoding.
    super({ decodeStrings: false });
    this[kPort] = port;
    this[kName] = name;
    this[kWritableCallbacks] = [];
  }

  _writev(chunks, cb) {
    const owner = this[kPort];
    // Only `chunk` and `encoding` of each entry are sent.
    const payload = ArrayPrototypeMap(
      chunks,
      (entry) => ({ chunk: entry.chunk, encoding: entry.encoding }));

    owner.postMessage({
      type: messageTypes.STDIO_PAYLOAD,
      stream: this[kName],
      chunks: payload,
    });

    // Defer completion of this write until the callbacks are flushed.
    ArrayPrototypePush(this[kWritableCallbacks], cb);
    if (owner[kWaitingStreams]++ === 0) {
      owner.ref();
    }
  }

  _final(cb) {
    // A single `null` chunk is posted when the stream is finished
    // (presumably the end-of-stream marker for the receiver).
    const terminator = { chunk: null, encoding: '' };
    this[kPort].postMessage({
      type: messageTypes.STDIO_PAYLOAD,
      stream: this[kName],
      chunks: [ terminator ],
    });
    cb();
  }

  [kStdioWantsMoreDataCallback]() {
    // Swap in a fresh list before running the callbacks, so writes started
    // from within a callback are queued for the next flush.
    const pending = this[kWritableCallbacks];
    this[kWritableCallbacks] = [];
    ArrayPrototypeForEach(pending, (done) => done());

    const owner = this[kPort];
    owner[kWaitingStreams] -= pending.length;
    if (owner[kWaitingStreams] === 0) {
      owner.unref();
    }
  }
}
268
// Creates the three stdio streams on top of the environment's message port
// and resets the port's waiting-stream counter to 0.
// Returns `{ stdin, stdout, stderr }`, where stdin is readable and
// stdout/stderr are writable.
function createWorkerStdio() {
  const port = getEnvMessagePort();
  port[kWaitingStreams] = 0;

  const stdin = new ReadableWorkerStdio(port, 'stdin');
  const stdout = new WritableWorkerStdio(port, 'stdout');
  const stderr = new WritableWorkerStdio(port, 'stderr');
  return { stdin, stdout, stderr };
}
278
// Receives one message from `port` via the native binding.
// Returns `undefined` when the binding reports that no message is available,
// otherwise `{ message }`. The wrapper object lets callers tell "no message"
// apart from a message whose value is `undefined`.
function receiveMessageOnPort(port) {
  const received = receiveMessageOnPort_(port);
  return received === noMessageSymbol ? undefined : { message: received };
}
284
// Exports: items taken from the native bindings (drainMessagePort,
// moveMessagePortToContext, MessagePort, MessageChannel), the shared symbols
// and message tags, and the helpers and stream classes defined in this file.
module.exports = {
  drainMessagePort,
  messageTypes,
  kPort,
  kIncrementsPortRef,
  kWaitingStreams,
  kStdioWantsMoreDataCallback,
  moveMessagePortToContext,
  MessagePort,
  MessageChannel,
  receiveMessageOnPort,
  setupPortReferencing,
  ReadableWorkerStdio,
  WritableWorkerStdio,
  createWorkerStdio
};
301