'use strict';

// Built-in helpers taken from `primordials` and used throughout this file.
const {
  ArrayPrototypeForEach,
  ArrayPrototypeMap,
  ArrayPrototypePush,
  FunctionPrototypeCall,
  ObjectAssign,
  ObjectCreate,
  ObjectDefineProperty,
  ObjectGetOwnPropertyDescriptors,
  ObjectGetPrototypeOf,
  ObjectSetPrototypeOf,
  ReflectApply,
  Symbol,
} = primordials;

// Symbols exposed by the `symbols` binding, aliased to local camelCase names.
const {
  handle_onclose: handleOnCloseSymbol,
  oninit: onInitSymbol,
  no_message_symbol: noMessageSymbol
} = internalBinding('symbols');
// `receiveMessageOnPort` is aliased with a trailing underscore because this
// file defines its own wrapper function of the same name.
const {
  MessagePort,
  MessageChannel,
  drainMessagePort,
  moveMessagePortToContext,
  receiveMessageOnPort: receiveMessageOnPort_,
  stopMessagePort
} = internalBinding('messaging');
const {
  getEnvMessagePort
} = internalBinding('worker');

const { Readable, Writable } = require('stream');
const {
  Event,
  EventTarget,
  NodeEventTarget,
  defineEventHandler,
  initNodeEventTarget,
  kCreateEvent,
  kNewListener,
  kRemoveListener,
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');

// Private property keys used by the stdio stream classes in this file.
// Flag on readable stdio streams: whether reading should count towards the
// port's waiting-stream counter.
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
// Stream name ('stdin', 'stdout' or 'stderr') sent along with each message.
const kName = Symbol('kName');
// The message port a stdio stream posts its messages to.
const kPort = Symbol('kPort');
// Counter stored on the port; the port is ref()'d while it is non-zero.
const kWaitingStreams = Symbol('kWaitingStreams');
// Pending write callbacks held by a writable stdio stream.
const kWritableCallbacks = Symbol('kWritableCallbacks');
// Flag on readable stdio streams: set once the first read has been counted.
const kStartedReading = Symbol('kStartedReading');
// Method key on writable stdio streams that flushes the pending callbacks.
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');

// Values for the `type` field of messages. Within this file only
// STDIO_PAYLOAD (posted by the writable stdio streams) and
// STDIO_WANTS_MORE_DATA (posted by the readable stdio stream) are used; the
// remaining types are exported for use elsewhere.
const messageTypes = {
  UP_AND_RUNNING: 'upAndRunning',
  COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
  ERROR_MESSAGE: 'errorMessage',
  STDIO_PAYLOAD: 'stdioPayload',
  STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
  LOAD_SCRIPT: 'loadScript'
};

// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Snapshot the native prototype first: a shallow clone with the same parent
// and the same own property descriptors, so the original methods remain
// reachable after the prototype chain is rewired below.
const nativePortProto = MessagePort.prototype;
const MessagePortPrototype = ObjectCreate(
  ObjectGetPrototypeOf(nativePortProto),
  ObjectGetOwnPropertyDescriptors(nativePortProto));

// Rewire inheritance so that both the constructor and its prototype descend
// from NodeEventTarget.
ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
ObjectSetPrototypeOf(nativePortProto, NodeEventTarget.prototype);

// ref() and unref() were inherited from HandleWrap and therefore vanished
// when the prototype's parent was swapped; restore them from the snapshot.
ArrayPrototypeForEach(['ref', 'unref'], (method) => {
  nativePortProto[method] = MessagePortPrototype[method];
});

/**
 * Event created for `'message'` and `'messageerror'` on a port; the payload
 * is exposed as `data`.
 */
class MessageEvent extends Event {
  /**
   * @param {any} data Payload stored on the event as `data`.
   * @param {object} target Passed by the caller but not used by this
   *   constructor.
   * @param {string} type Event type handed to the `Event` constructor.
   */
  constructor(data, target, type) {
    super(type);
    this.data = data;
  }
}

// Override event creation on ports: message-like types get a MessageEvent,
// every other type is delegated to the stock EventTarget implementation.
const originalCreateEvent = EventTarget.prototype[kCreateEvent];
ObjectDefineProperty(nativePortProto, kCreateEvent, {
  configurable: false,
  enumerable: false,
  writable: false,
  value: function(data, type) {
    if (type === 'message' || type === 'messageerror') {
      return new MessageEvent(data, this, type);
    }
    return ReflectApply(originalCreateEvent, this, arguments);
  },
});

defineEventHandler(nativePortProto, 'message');
defineEventHandler(nativePortProto, 'messageerror');

// Runs from inside the `MessagePort` constructor: initializes the event
// target state and ties the port's ref state to its 'message' listeners.
function oninit() {
  initNodeEventTarget(this);
  setupPortReferencing(this, this, 'message');
}

ObjectDefineProperty(nativePortProto, onInitSymbol, {
  value: oninit,
  writable: false,
  enumerable: true,
});

/** The `'close'` event dispatched on a port by `onclose()`. */
class MessagePortCloseEvent extends Event {
  constructor() {
    super('close');
  }
}

// This is called after the underlying `uv_async_t` has been closed.
126function onclose() { 127 this.dispatchEvent(new MessagePortCloseEvent()); 128} 129 130ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, { 131 enumerable: false, 132 writable: false, 133 value: onclose 134}); 135 136MessagePort.prototype.close = function(cb) { 137 if (typeof cb === 'function') 138 this.once('close', cb); 139 FunctionPrototypeCall(MessagePortPrototype.close, this); 140}; 141 142ObjectDefineProperty(MessagePort.prototype, inspect.custom, { 143 enumerable: false, 144 writable: false, 145 value: function inspect() { // eslint-disable-line func-name-matching 146 let ref; 147 try { 148 // This may throw when `this` does not refer to a native object, 149 // e.g. when accessing the prototype directly. 150 ref = FunctionPrototypeCall(MessagePortPrototype.hasRef, this); 151 } catch { return this; } 152 return ObjectAssign(ObjectCreate(MessagePort.prototype), 153 ref === undefined ? { 154 active: false, 155 } : { 156 active: true, 157 refed: ref 158 }, 159 this); 160 } 161}); 162 163function setupPortReferencing(port, eventEmitter, eventName) { 164 // Keep track of whether there are any workerMessage listeners: 165 // If there are some, ref() the channel so it keeps the event loop alive. 166 // If there are none or all are removed, unref() the channel so the worker 167 // can shutdown gracefully. 
168 port.unref(); 169 eventEmitter.on('newListener', function(name) { 170 if (name === eventName) newListener(eventEmitter.listenerCount(name)); 171 }); 172 eventEmitter.on('removeListener', function(name) { 173 if (name === eventName) removeListener(eventEmitter.listenerCount(name)); 174 }); 175 const origNewListener = eventEmitter[kNewListener]; 176 eventEmitter[kNewListener] = function(size, type, ...args) { 177 if (type === eventName) newListener(size - 1); 178 return ReflectApply(origNewListener, this, arguments); 179 }; 180 const origRemoveListener = eventEmitter[kRemoveListener]; 181 eventEmitter[kRemoveListener] = function(size, type, ...args) { 182 if (type === eventName) removeListener(size); 183 return ReflectApply(origRemoveListener, this, arguments); 184 }; 185 186 function newListener(size) { 187 if (size === 0) { 188 port.ref(); 189 FunctionPrototypeCall(MessagePortPrototype.start, port); 190 } 191 } 192 193 function removeListener(size) { 194 if (size === 0) { 195 stopMessagePort(port); 196 port.unref(); 197 } 198 } 199} 200 201 202class ReadableWorkerStdio extends Readable { 203 constructor(port, name) { 204 super(); 205 this[kPort] = port; 206 this[kName] = name; 207 this[kIncrementsPortRef] = true; 208 this[kStartedReading] = false; 209 this.on('end', () => { 210 if (this[kStartedReading] && this[kIncrementsPortRef]) { 211 if (--this[kPort][kWaitingStreams] === 0) 212 this[kPort].unref(); 213 } 214 }); 215 } 216 217 _read() { 218 if (!this[kStartedReading] && this[kIncrementsPortRef]) { 219 this[kStartedReading] = true; 220 if (this[kPort][kWaitingStreams]++ === 0) 221 this[kPort].ref(); 222 } 223 224 this[kPort].postMessage({ 225 type: messageTypes.STDIO_WANTS_MORE_DATA, 226 stream: this[kName] 227 }); 228 } 229} 230 231class WritableWorkerStdio extends Writable { 232 constructor(port, name) { 233 super({ decodeStrings: false }); 234 this[kPort] = port; 235 this[kName] = name; 236 this[kWritableCallbacks] = []; 237 } 238 239 _writev(chunks, cb) 
{ 240 this[kPort].postMessage({ 241 type: messageTypes.STDIO_PAYLOAD, 242 stream: this[kName], 243 chunks: ArrayPrototypeMap(chunks, 244 ({ chunk, encoding }) => ({ chunk, encoding })), 245 }); 246 ArrayPrototypePush(this[kWritableCallbacks], cb); 247 if (this[kPort][kWaitingStreams]++ === 0) 248 this[kPort].ref(); 249 } 250 251 _final(cb) { 252 this[kPort].postMessage({ 253 type: messageTypes.STDIO_PAYLOAD, 254 stream: this[kName], 255 chunks: [ { chunk: null, encoding: '' } ] 256 }); 257 cb(); 258 } 259 260 [kStdioWantsMoreDataCallback]() { 261 const cbs = this[kWritableCallbacks]; 262 this[kWritableCallbacks] = []; 263 ArrayPrototypeForEach(cbs, (cb) => cb()); 264 if ((this[kPort][kWaitingStreams] -= cbs.length) === 0) 265 this[kPort].unref(); 266 } 267} 268 269function createWorkerStdio() { 270 const port = getEnvMessagePort(); 271 port[kWaitingStreams] = 0; 272 return { 273 stdin: new ReadableWorkerStdio(port, 'stdin'), 274 stdout: new WritableWorkerStdio(port, 'stdout'), 275 stderr: new WritableWorkerStdio(port, 'stderr') 276 }; 277} 278 279function receiveMessageOnPort(port) { 280 const message = receiveMessageOnPort_(port); 281 if (message === noMessageSymbol) return undefined; 282 return { message }; 283} 284 285module.exports = { 286 drainMessagePort, 287 messageTypes, 288 kPort, 289 kIncrementsPortRef, 290 kWaitingStreams, 291 kStdioWantsMoreDataCallback, 292 moveMessagePortToContext, 293 MessagePort, 294 MessageChannel, 295 receiveMessageOnPort, 296 setupPortReferencing, 297 ReadableWorkerStdio, 298 WritableWorkerStdio, 299 createWorkerStdio 300}; 301