• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ObjectAssign,
5  ObjectCreate,
6  ObjectDefineProperty,
7  ObjectGetOwnPropertyDescriptors,
8  ObjectGetPrototypeOf,
9  ObjectSetPrototypeOf,
10  Symbol,
11} = primordials;
12
13const {
14  handle_onclose: handleOnCloseSymbol,
15  oninit: onInitSymbol,
16  no_message_symbol: noMessageSymbol
17} = internalBinding('symbols');
18const {
19  MessagePort,
20  MessageChannel,
21  drainMessagePort,
22  moveMessagePortToContext,
23  receiveMessageOnPort: receiveMessageOnPort_,
24  stopMessagePort
25} = internalBinding('messaging');
26const {
27  threadId,
28  getEnvMessagePort
29} = internalBinding('worker');
30
31const { Readable, Writable } = require('stream');
32const EventEmitter = require('events');
33const { inspect } = require('internal/util/inspect');
34let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
35  debug = fn;
36});
37
38const kIncrementsPortRef = Symbol('kIncrementsPortRef');
39const kName = Symbol('kName');
40const kOnMessageListener = Symbol('kOnMessageListener');
41const kPort = Symbol('kPort');
42const kWaitingStreams = Symbol('kWaitingStreams');
43const kWritableCallbacks = Symbol('kWritableCallbacks');
44const kStartedReading = Symbol('kStartedReading');
45const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
46
47const messageTypes = {
48  UP_AND_RUNNING: 'upAndRunning',
49  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
50  ERROR_MESSAGE: 'errorMessage',
51  STDIO_PAYLOAD: 'stdioPayload',
52  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
53  LOAD_SCRIPT: 'loadScript'
54};
55
56// We have to mess with the MessagePort prototype a bit, so that a) we can make
57// it inherit from EventEmitter, even though it is a C++ class, and b) we do
58// not provide methods that are not present in the Browser and not documented
59// on our side (e.g. hasRef).
60// Save a copy of the original set of methods as a shallow clone.
61const MessagePortPrototype = ObjectCreate(
62  ObjectGetPrototypeOf(MessagePort.prototype),
63  ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
64// Set up the new inheritance chain.
65ObjectSetPrototypeOf(MessagePort, EventEmitter);
66ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
67// Copy methods that are inherited from HandleWrap, because
68// changing the prototype of MessagePort.prototype implicitly removed them.
69MessagePort.prototype.ref = MessagePortPrototype.ref;
70MessagePort.prototype.unref = MessagePortPrototype.unref;
71
72// A communication channel consisting of a handle (that wraps around an
73// uv_async_t) which can receive information from other threads and emits
74// .onmessage events, and a function used for sending data to a MessagePort
75// in some other thread.
76MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
77  if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
78    debug(`[${threadId}] received message`, event);
79  // Emit the deserialized object to userland.
80  this.emit('message', event.data);
81};
82
83// This is for compatibility with the Web's MessagePort API. It makes sense to
84// provide it as an `EventEmitter` in Node.js, but if somebody overrides
85// `onmessage`, we'll switch over to the Web API model.
86ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
87  enumerable: true,
88  configurable: true,
89  get() {
90    return this[kOnMessageListener];
91  },
92  set(value) {
93    this[kOnMessageListener] = value;
94    if (typeof value === 'function') {
95      this.ref();
96      MessagePortPrototype.start.call(this);
97    } else {
98      this.unref();
99      stopMessagePort(this);
100    }
101  }
102});
103
104// This is called from inside the `MessagePort` constructor.
105function oninit() {
106  setupPortReferencing(this, this, 'message');
107}
108
109ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
110  enumerable: true,
111  writable: false,
112  value: oninit
113});
114
115// This is called after the underlying `uv_async_t` has been closed.
116function onclose() {
117  this.emit('close');
118}
119
120ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
121  enumerable: false,
122  writable: false,
123  value: onclose
124});
125
126MessagePort.prototype.close = function(cb) {
127  if (typeof cb === 'function')
128    this.once('close', cb);
129  MessagePortPrototype.close.call(this);
130};
131
132ObjectDefineProperty(MessagePort.prototype, inspect.custom, {
133  enumerable: false,
134  writable: false,
135  value: function inspect() {  // eslint-disable-line func-name-matching
136    let ref;
137    try {
138      // This may throw when `this` does not refer to a native object,
139      // e.g. when accessing the prototype directly.
140      ref = MessagePortPrototype.hasRef.call(this);
141    } catch { return this; }
142    return ObjectAssign(ObjectCreate(MessagePort.prototype),
143                        ref === undefined ? {
144                          active: false,
145                        } : {
146                          active: true,
147                          refed: ref
148                        },
149                        this);
150  }
151});
152
153function setupPortReferencing(port, eventEmitter, eventName) {
154  // Keep track of whether there are any workerMessage listeners:
155  // If there are some, ref() the channel so it keeps the event loop alive.
156  // If there are none or all are removed, unref() the channel so the worker
157  // can shutdown gracefully.
158  port.unref();
159  eventEmitter.on('newListener', (name) => {
160    if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
161      port.ref();
162      MessagePortPrototype.start.call(port);
163    }
164  });
165  eventEmitter.on('removeListener', (name) => {
166    if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
167      stopMessagePort(port);
168      port.unref();
169    }
170  });
171}
172
173
174class ReadableWorkerStdio extends Readable {
175  constructor(port, name) {
176    super();
177    this[kPort] = port;
178    this[kName] = name;
179    this[kIncrementsPortRef] = true;
180    this[kStartedReading] = false;
181    this.on('end', () => {
182      if (this[kStartedReading] && this[kIncrementsPortRef]) {
183        if (--this[kPort][kWaitingStreams] === 0)
184          this[kPort].unref();
185      }
186    });
187  }
188
189  _read() {
190    if (!this[kStartedReading] && this[kIncrementsPortRef]) {
191      this[kStartedReading] = true;
192      if (this[kPort][kWaitingStreams]++ === 0)
193        this[kPort].ref();
194    }
195
196    this[kPort].postMessage({
197      type: messageTypes.STDIO_WANTS_MORE_DATA,
198      stream: this[kName]
199    });
200  }
201}
202
203class WritableWorkerStdio extends Writable {
204  constructor(port, name) {
205    super({ decodeStrings: false });
206    this[kPort] = port;
207    this[kName] = name;
208    this[kWritableCallbacks] = [];
209  }
210
211  _writev(chunks, cb) {
212    this[kPort].postMessage({
213      type: messageTypes.STDIO_PAYLOAD,
214      stream: this[kName],
215      chunks: chunks.map(({ chunk, encoding }) => ({ chunk, encoding }))
216    });
217    this[kWritableCallbacks].push(cb);
218    if (this[kPort][kWaitingStreams]++ === 0)
219      this[kPort].ref();
220  }
221
222  _final(cb) {
223    this[kPort].postMessage({
224      type: messageTypes.STDIO_PAYLOAD,
225      stream: this[kName],
226      chunks: [ { chunk: null, encoding: '' } ]
227    });
228    cb();
229  }
230
231  [kStdioWantsMoreDataCallback]() {
232    const cbs = this[kWritableCallbacks];
233    this[kWritableCallbacks] = [];
234    for (const cb of cbs)
235      cb();
236    if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
237      this[kPort].unref();
238  }
239}
240
241function createWorkerStdio() {
242  const port = getEnvMessagePort();
243  port[kWaitingStreams] = 0;
244  return {
245    stdin: new ReadableWorkerStdio(port, 'stdin'),
246    stdout: new WritableWorkerStdio(port, 'stdout'),
247    stderr: new WritableWorkerStdio(port, 'stderr')
248  };
249}
250
251function receiveMessageOnPort(port) {
252  const message = receiveMessageOnPort_(port);
253  if (message === noMessageSymbol) return undefined;
254  return { message };
255}
256
257module.exports = {
258  drainMessagePort,
259  messageTypes,
260  kPort,
261  kIncrementsPortRef,
262  kWaitingStreams,
263  kStdioWantsMoreDataCallback,
264  moveMessagePortToContext,
265  MessagePort,
266  MessageChannel,
267  receiveMessageOnPort,
268  setupPortReferencing,
269  ReadableWorkerStdio,
270  WritableWorkerStdio,
271  createWorkerStdio
272};
273