'use strict';

// Tamper-proof copies of the built-ins used in this file.
const {
  ObjectAssign,
  ObjectCreate,
  ObjectDefineProperty,
  ObjectGetOwnPropertyDescriptors,
  ObjectGetPrototypeOf,
  ObjectSetPrototypeOf,
  Symbol,
} = primordials;

// Well-known symbols shared with the native layer.
const {
  handle_onclose: handleOnCloseSymbol,
  oninit: onInitSymbol,
  no_message_symbol: noMessageSymbol
} = internalBinding('symbols');

// Native messaging primitives. `receiveMessageOnPort_` is the raw binding;
// this module exports a wrapper around it under the unsuffixed name.
const {
  MessagePort,
  MessageChannel,
  drainMessagePort,
  moveMessagePortToContext,
  receiveMessageOnPort: receiveMessageOnPort_,
  stopMessagePort
} = internalBinding('messaging');

const {
  threadId,
  getEnvMessagePort
} = internalBinding('worker');

const { Readable, Writable } = require('stream');
const EventEmitter = require('events');
const { inspect } = require('internal/util/inspect');

// `debug` rebinds itself to the function handed to the callback below.
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
  debug = fn;
});

// Private keys for state kept on the stdio stream wrappers.
const kPort = Symbol('kPort');
const kName = Symbol('kName');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kStartedReading = Symbol('kStartedReading');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');

// Private keys for state kept on the message port itself.
const kWaitingStreams = Symbol('kWaitingStreams');
const kOnMessageListener = Symbol('kOnMessageListener');

// Values of the `type` field carried by internal messages.
const messageTypes = {
  UP_AND_RUNNING: 'upAndRunning',
  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
  ERROR_MESSAGE: 'errorMessage',
  // Posted by the writable stdio wrappers together with the written chunks.
  STDIO_PAYLOAD: 'stdioPayload',
  // Posted by the readable stdio wrapper whenever it is asked to read.
  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
  LOAD_SCRIPT: 'loadScript'
};

// The MessagePort prototype needs some surgery so that a) it can inherit
// from EventEmitter despite being a C++ class, and b) methods that are
// neither present in the Browser nor documented on our side (e.g. hasRef)
// are not exposed.
// First, keep a shallow clone of the original set of methods.
const MessagePortPrototype = ObjectCreate(
  ObjectGetPrototypeOf(MessagePort.prototype),
  ObjectGetOwnPropertyDescriptors(MessagePort.prototype)
);

// Re-parent both the constructor and its prototype onto EventEmitter.
ObjectSetPrototypeOf(MessagePort, EventEmitter);
ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);

// `ref` and `unref` were inherited from HandleWrap and therefore vanished
// when the prototype chain was swapped above; restore them from the clone.
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;

// A MessagePort is a communication channel made of a handle (wrapping an
// uv_async_t) that can receive information from other threads and emits
// .onmessage events, plus a function for sending data to a MessagePort in
// some other thread.
//
// Default handler: forwards the payload of every incoming event to
// 'message' listeners. Everything except "stdio wants more data" requests
// is also written to the debug log.
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
  const { data } = event;
  const isLoggable =
    data && data.type !== messageTypes.STDIO_WANTS_MORE_DATA;
  if (isLoggable) {
    debug(`[${threadId}] received message`, event);
  }
  // Hand the deserialized object over to userland.
  this.emit('message', data);
};

// Compatibility with the Web's MessagePort API. In Node.js the port is
// primarily an `EventEmitter`, but once somebody overrides `onmessage` the
// port follows the Web API model instead.
ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
  enumerable: true,
  configurable: true,
  get() {
    return this[kOnMessageListener];
  },
  set(value) {
    this[kOnMessageListener] = value;
    if (typeof value !== 'function') {
      // Not a usable handler: release the event loop and stop the port.
      this.unref();
      stopMessagePort(this);
      return;
    }
    // A real handler: keep the event loop alive and start the port.
    this.ref();
    MessagePortPrototype.start.call(this);
  }
});

// This is called from inside the `MessagePort` constructor.
105function oninit() { 106 setupPortReferencing(this, this, 'message'); 107} 108 109ObjectDefineProperty(MessagePort.prototype, onInitSymbol, { 110 enumerable: true, 111 writable: false, 112 value: oninit 113}); 114 115// This is called after the underlying `uv_async_t` has been closed. 116function onclose() { 117 this.emit('close'); 118} 119 120ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, { 121 enumerable: false, 122 writable: false, 123 value: onclose 124}); 125 126MessagePort.prototype.close = function(cb) { 127 if (typeof cb === 'function') 128 this.once('close', cb); 129 MessagePortPrototype.close.call(this); 130}; 131 132ObjectDefineProperty(MessagePort.prototype, inspect.custom, { 133 enumerable: false, 134 writable: false, 135 value: function inspect() { // eslint-disable-line func-name-matching 136 let ref; 137 try { 138 // This may throw when `this` does not refer to a native object, 139 // e.g. when accessing the prototype directly. 140 ref = MessagePortPrototype.hasRef.call(this); 141 } catch { return this; } 142 return ObjectAssign(ObjectCreate(MessagePort.prototype), 143 ref === undefined ? { 144 active: false, 145 } : { 146 active: true, 147 refed: ref 148 }, 149 this); 150 } 151}); 152 153function setupPortReferencing(port, eventEmitter, eventName) { 154 // Keep track of whether there are any workerMessage listeners: 155 // If there are some, ref() the channel so it keeps the event loop alive. 156 // If there are none or all are removed, unref() the channel so the worker 157 // can shutdown gracefully. 
158 port.unref(); 159 eventEmitter.on('newListener', (name) => { 160 if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { 161 port.ref(); 162 MessagePortPrototype.start.call(port); 163 } 164 }); 165 eventEmitter.on('removeListener', (name) => { 166 if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { 167 stopMessagePort(port); 168 port.unref(); 169 } 170 }); 171} 172 173 174class ReadableWorkerStdio extends Readable { 175 constructor(port, name) { 176 super(); 177 this[kPort] = port; 178 this[kName] = name; 179 this[kIncrementsPortRef] = true; 180 this[kStartedReading] = false; 181 this.on('end', () => { 182 if (this[kStartedReading] && this[kIncrementsPortRef]) { 183 if (--this[kPort][kWaitingStreams] === 0) 184 this[kPort].unref(); 185 } 186 }); 187 } 188 189 _read() { 190 if (!this[kStartedReading] && this[kIncrementsPortRef]) { 191 this[kStartedReading] = true; 192 if (this[kPort][kWaitingStreams]++ === 0) 193 this[kPort].ref(); 194 } 195 196 this[kPort].postMessage({ 197 type: messageTypes.STDIO_WANTS_MORE_DATA, 198 stream: this[kName] 199 }); 200 } 201} 202 203class WritableWorkerStdio extends Writable { 204 constructor(port, name) { 205 super({ decodeStrings: false }); 206 this[kPort] = port; 207 this[kName] = name; 208 this[kWritableCallbacks] = []; 209 } 210 211 _writev(chunks, cb) { 212 this[kPort].postMessage({ 213 type: messageTypes.STDIO_PAYLOAD, 214 stream: this[kName], 215 chunks: chunks.map(({ chunk, encoding }) => ({ chunk, encoding })) 216 }); 217 this[kWritableCallbacks].push(cb); 218 if (this[kPort][kWaitingStreams]++ === 0) 219 this[kPort].ref(); 220 } 221 222 _final(cb) { 223 this[kPort].postMessage({ 224 type: messageTypes.STDIO_PAYLOAD, 225 stream: this[kName], 226 chunks: [ { chunk: null, encoding: '' } ] 227 }); 228 cb(); 229 } 230 231 [kStdioWantsMoreDataCallback]() { 232 const cbs = this[kWritableCallbacks]; 233 this[kWritableCallbacks] = []; 234 for (const cb of cbs) 235 cb(); 236 if 
((this[kPort][kWaitingStreams] -= cbs.length) === 0) 237 this[kPort].unref(); 238 } 239} 240 241function createWorkerStdio() { 242 const port = getEnvMessagePort(); 243 port[kWaitingStreams] = 0; 244 return { 245 stdin: new ReadableWorkerStdio(port, 'stdin'), 246 stdout: new WritableWorkerStdio(port, 'stdout'), 247 stderr: new WritableWorkerStdio(port, 'stderr') 248 }; 249} 250 251function receiveMessageOnPort(port) { 252 const message = receiveMessageOnPort_(port); 253 if (message === noMessageSymbol) return undefined; 254 return { message }; 255} 256 257module.exports = { 258 drainMessagePort, 259 messageTypes, 260 kPort, 261 kIncrementsPortRef, 262 kWaitingStreams, 263 kStdioWantsMoreDataCallback, 264 moveMessagePortToContext, 265 MessagePort, 266 MessageChannel, 267 receiveMessageOnPort, 268 setupPortReferencing, 269 ReadableWorkerStdio, 270 WritableWorkerStdio, 271 createWorkerStdio 272}; 273