1// Ported from https://github.com/mafintosh/pump with 2// permission from the author, Mathias Buus (@mafintosh). 3 4'use strict'; 5 6const { 7 ArrayIsArray, 8 Promise, 9 SymbolAsyncIterator, 10 SymbolDispose, 11} = primordials; 12 13const eos = require('internal/streams/end-of-stream'); 14const { once } = require('internal/util'); 15const destroyImpl = require('internal/streams/destroy'); 16const Duplex = require('internal/streams/duplex'); 17const { 18 aggregateTwoErrors, 19 codes: { 20 ERR_INVALID_ARG_TYPE, 21 ERR_INVALID_RETURN_VALUE, 22 ERR_MISSING_ARGS, 23 ERR_STREAM_DESTROYED, 24 ERR_STREAM_PREMATURE_CLOSE, 25 }, 26 AbortError, 27} = require('internal/errors'); 28 29const { 30 validateFunction, 31 validateAbortSignal, 32} = require('internal/validators'); 33 34const { 35 isIterable, 36 isReadable, 37 isReadableNodeStream, 38 isNodeStream, 39 isTransformStream, 40 isWebStream, 41 isReadableStream, 42 isReadableFinished, 43} = require('internal/streams/utils'); 44const { AbortController } = require('internal/abort_controller'); 45 46let PassThrough; 47let Readable; 48let addAbortListener; 49 50function destroyer(stream, reading, writing) { 51 let finished = false; 52 stream.on('close', () => { 53 finished = true; 54 }); 55 56 const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => { 57 finished = !err; 58 }); 59 60 return { 61 destroy: (err) => { 62 if (finished) return; 63 finished = true; 64 destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe')); 65 }, 66 cleanup, 67 }; 68} 69 70function popCallback(streams) { 71 // Streams should never be an empty array. It should always contain at least 72 // a single stream. Therefore optimize for the average case instead of 73 // checking for length === 0 as well. 74 validateFunction(streams[streams.length - 1], 'streams[stream.length - 1]'); 75 return streams.pop(); 76} 77 78function makeAsyncIterable(val) { 79 if (isIterable(val)) { 80 return val; 81 } else if (isReadableNodeStream(val)) { 82 // Legacy streams are not Iterable. 83 return fromReadable(val); 84 } 85 throw new ERR_INVALID_ARG_TYPE( 86 'val', ['Readable', 'Iterable', 'AsyncIterable'], val); 87} 88 89async function* fromReadable(val) { 90 if (!Readable) { 91 Readable = require('internal/streams/readable'); 92 } 93 94 yield* Readable.prototype[SymbolAsyncIterator].call(val); 95} 96 97async function pumpToNode(iterable, writable, finish, { end }) { 98 let error; 99 let onresolve = null; 100 101 const resume = (err) => { 102 if (err) { 103 error = err; 104 } 105 106 if (onresolve) { 107 const callback = onresolve; 108 onresolve = null; 109 callback(); 110 } 111 }; 112 113 const wait = () => new Promise((resolve, reject) => { 114 if (error) { 115 reject(error); 116 } else { 117 onresolve = () => { 118 if (error) { 119 reject(error); 120 } else { 121 resolve(); 122 } 123 }; 124 } 125 }); 126 127 writable.on('drain', resume); 128 const cleanup = eos(writable, { readable: false }, resume); 129 130 try { 131 if (writable.writableNeedDrain) { 132 await wait(); 133 } 134 135 for await (const chunk of iterable) { 136 if (!writable.write(chunk)) { 137 await wait(); 138 } 139 } 140 141 if (end) { 142 writable.end(); 143 await wait(); 144 } 145 146 finish(); 147 } catch (err) { 148 finish(error !== err ? aggregateTwoErrors(error, err) : err); 149 } finally { 150 cleanup(); 151 writable.off('drain', resume); 152 } 153} 154 155async function pumpToWeb(readable, writable, finish, { end }) { 156 if (isTransformStream(writable)) { 157 writable = writable.writable; 158 } 159 // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure 160 const writer = writable.getWriter(); 161 try { 162 for await (const chunk of readable) { 163 await writer.ready; 164 writer.write(chunk).catch(() => {}); 165 } 166 167 await writer.ready; 168 169 if (end) { 170 await writer.close(); 171 } 172 173 finish(); 174 } catch (err) { 175 try { 176 await writer.abort(err); 177 finish(err); 178 } catch (err) { 179 finish(err); 180 } 181 } 182} 183 184function pipeline(...streams) { 185 return pipelineImpl(streams, once(popCallback(streams))); 186} 187 188function pipelineImpl(streams, callback, opts) { 189 if (streams.length === 1 && ArrayIsArray(streams[0])) { 190 streams = streams[0]; 191 } 192 193 if (streams.length < 2) { 194 throw new ERR_MISSING_ARGS('streams'); 195 } 196 197 const ac = new AbortController(); 198 const signal = ac.signal; 199 const outerSignal = opts?.signal; 200 201 // Need to cleanup event listeners if last stream is readable 202 // https://github.com/nodejs/node/issues/35452 203 const lastStreamCleanup = []; 204 205 validateAbortSignal(outerSignal, 'options.signal'); 206 207 function abort() { 208 finishImpl(new AbortError()); 209 } 210 211 addAbortListener ??= require('events').addAbortListener; 212 let disposable; 213 if (outerSignal) { 214 disposable = addAbortListener(outerSignal, abort); 215 } 216 217 let error; 218 let value; 219 const destroys = []; 220 221 let finishCount = 0; 222 223 function finish(err) { 224 finishImpl(err, --finishCount === 0); 225 } 226 227 function finishImpl(err, final) { 228 if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { 229 error = err; 230 } 231 232 if (!error && !final) { 233 return; 234 } 235 236 while (destroys.length) { 237 destroys.shift()(error); 238 } 239 240 disposable?.[SymbolDispose](); 241 ac.abort(); 242 243 if (final) { 244 if (!error) { 245 lastStreamCleanup.forEach((fn) => fn()); 246 } 247 process.nextTick(callback, error, value); 248 } 249 } 250 251 let ret; 252 for (let i = 0; i < streams.length; i++) { 253 const stream = streams[i]; 254 const reading = i < streams.length - 1; 255 const writing = i > 0; 256 const end = reading || opts?.end !== false; 257 const isLastStream = i === streams.length - 1; 258 259 if (isNodeStream(stream)) { 260 if (end) { 261 const { destroy, cleanup } = destroyer(stream, reading, writing); 262 destroys.push(destroy); 263 264 if (isReadable(stream) && isLastStream) { 265 lastStreamCleanup.push(cleanup); 266 } 267 } 268 269 // Catch stream errors that occur after pipe/pump has completed. 270 function onError(err) { 271 if ( 272 err && 273 err.name !== 'AbortError' && 274 err.code !== 'ERR_STREAM_PREMATURE_CLOSE' 275 ) { 276 finish(err); 277 } 278 } 279 stream.on('error', onError); 280 if (isReadable(stream) && isLastStream) { 281 lastStreamCleanup.push(() => { 282 stream.removeListener('error', onError); 283 }); 284 } 285 } 286 287 if (i === 0) { 288 if (typeof stream === 'function') { 289 ret = stream({ signal }); 290 if (!isIterable(ret)) { 291 throw new ERR_INVALID_RETURN_VALUE( 292 'Iterable, AsyncIterable or Stream', 'source', ret); 293 } 294 } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { 295 ret = stream; 296 } else { 297 ret = Duplex.from(stream); 298 } 299 } else if (typeof stream === 'function') { 300 if (isTransformStream(ret)) { 301 ret = makeAsyncIterable(ret?.readable); 302 } else { 303 ret = makeAsyncIterable(ret); 304 } 305 ret = stream(ret, { signal }); 306 307 if (reading) { 308 if (!isIterable(ret, true)) { 309 throw new ERR_INVALID_RETURN_VALUE( 310 'AsyncIterable', `transform[${i - 1}]`, ret); 311 } 312 } else { 313 if (!PassThrough) { 314 PassThrough = require('internal/streams/passthrough'); 315 } 316 317 // If the last argument to pipeline is not a stream 318 // we must create a proxy stream so that pipeline(...) 319 // always returns a stream which can be further 320 // composed through `.pipe(stream)`. 321 322 const pt = new PassThrough({ 323 objectMode: true, 324 }); 325 326 // Handle Promises/A+ spec, `then` could be a getter that throws on 327 // second use. 328 const then = ret?.then; 329 if (typeof then === 'function') { 330 finishCount++; 331 then.call(ret, 332 (val) => { 333 value = val; 334 if (val != null) { 335 pt.write(val); 336 } 337 if (end) { 338 pt.end(); 339 } 340 process.nextTick(finish); 341 }, (err) => { 342 pt.destroy(err); 343 process.nextTick(finish, err); 344 }, 345 ); 346 } else if (isIterable(ret, true)) { 347 finishCount++; 348 pumpToNode(ret, pt, finish, { end }); 349 } else if (isReadableStream(ret) || isTransformStream(ret)) { 350 const toRead = ret.readable || ret; 351 finishCount++; 352 pumpToNode(toRead, pt, finish, { end }); 353 } else { 354 throw new ERR_INVALID_RETURN_VALUE( 355 'AsyncIterable or Promise', 'destination', ret); 356 } 357 358 ret = pt; 359 360 const { destroy, cleanup } = destroyer(ret, false, true); 361 destroys.push(destroy); 362 if (isLastStream) { 363 lastStreamCleanup.push(cleanup); 364 } 365 } 366 } else if (isNodeStream(stream)) { 367 if (isReadableNodeStream(ret)) { 368 finishCount += 2; 369 const cleanup = pipe(ret, stream, finish, { end }); 370 if (isReadable(stream) && isLastStream) { 371 lastStreamCleanup.push(cleanup); 372 } 373 } else if (isTransformStream(ret) || isReadableStream(ret)) { 374 const toRead = ret.readable || ret; 375 finishCount++; 376 pumpToNode(toRead, stream, finish, { end }); 377 } else if (isIterable(ret)) { 378 finishCount++; 379 pumpToNode(ret, stream, finish, { end }); 380 } else { 381 throw new ERR_INVALID_ARG_TYPE( 382 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); 383 } 384 ret = stream; 385 } else if (isWebStream(stream)) { 386 if (isReadableNodeStream(ret)) { 387 finishCount++; 388 pumpToWeb(makeAsyncIterable(ret), stream, finish, { end }); 389 } else if (isReadableStream(ret) || isIterable(ret)) { 390 finishCount++; 391 pumpToWeb(ret, stream, finish, { end }); 392 } else if (isTransformStream(ret)) { 393 finishCount++; 394 pumpToWeb(ret.readable, stream, finish, { end }); 395 } else { 396 throw new ERR_INVALID_ARG_TYPE( 397 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); 398 } 399 ret = stream; 400 } else { 401 ret = Duplex.from(stream); 402 } 403 } 404 405 if (signal?.aborted || outerSignal?.aborted) { 406 process.nextTick(abort); 407 } 408 409 return ret; 410} 411 412function pipe(src, dst, finish, { end }) { 413 let ended = false; 414 dst.on('close', () => { 415 if (!ended) { 416 // Finish if the destination closes before the source has completed. 417 finish(new ERR_STREAM_PREMATURE_CLOSE()); 418 } 419 }); 420 421 src.pipe(dst, { end: false }); // If end is true we already will have a listener to end dst. 422 423 if (end) { 424 // Compat. Before node v10.12.0 stdio used to throw an error so 425 // pipe() did/does not end() stdio destinations. 426 // Now they allow it but "secretly" don't close the underlying fd. 427 428 function endFn() { 429 ended = true; 430 dst.end(); 431 } 432 433 if (isReadableFinished(src)) { // End the destination if the source has already ended. 434 process.nextTick(endFn); 435 } else { 436 src.once('end', endFn); 437 } 438 } else { 439 finish(); 440 } 441 442 eos(src, { readable: true, writable: false }, (err) => { 443 const rState = src._readableState; 444 if ( 445 err && 446 err.code === 'ERR_STREAM_PREMATURE_CLOSE' && 447 (rState && rState.ended && !rState.errored && !rState.errorEmitted) 448 ) { 449 // Some readable streams will emit 'close' before 'end'. However, since 450 // this is on the readable side 'end' should still be emitted if the 451 // stream has been ended and no error emitted. This should be allowed in 452 // favor of backwards compatibility. Since the stream is piped to a 453 // destination this should not result in any observable difference. 454 // We don't need to check if this is a writable premature close since 455 // eos will only fail with premature close on the reading side for 456 // duplex streams. 457 src 458 .once('end', finish) 459 .once('error', finish); 460 } else { 461 finish(err); 462 } 463 }); 464 return eos(dst, { readable: false, writable: true }, finish); 465} 466 467module.exports = { pipelineImpl, pipeline }; 468