// Ported from https://github.com/mafintosh/pump with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const {
  ArrayIsArray,
  ReflectApply,
  SymbolAsyncIterator,
} = primordials;

// Lazily required inside destroyer() on first use.
let eos;

const { once } = require('internal/util');
const destroyImpl = require('internal/streams/destroy');
const {
  ERR_INVALID_ARG_TYPE,
  ERR_INVALID_RETURN_VALUE,
  ERR_INVALID_CALLBACK,
  ERR_MISSING_ARGS,
  ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

const {
  isIterable,
  isReadable,
  isStream,
} = require('internal/streams/utils');

// Lazily required on first use (see pump(), pipeline() and fromReadable()).
let EE;
let PassThrough;
let Readable;

// Watches `stream` for completion and returns a function that destroys it.
//
// - stream:   the stream to observe.
// - reading:  passed to end-of-stream as the `readable` option.
// - writing:  passed to end-of-stream as the `writable` option.
// - callback: invoked at most once (wrapped with once()) with the error, if
//             any, reported by end-of-stream or supplied to the returned
//             function.
//
// Returns a function `(err) => void`. Unless the stream has already been
// marked finished, it destroys the stream via destroyImpl.destroyer() and
// calls `callback` with `err`, or with ERR_STREAM_DESTROYED('pipe') when no
// error is given.
function destroyer(stream, reading, writing, callback) {
  callback = once(callback);

  // Once set, the returned destroy function becomes a no-op.
  let finished = false;
  stream.on('close', () => {
    finished = true;
  });

  if (eos === undefined) eos = require('internal/streams/end-of-stream');
  eos(stream, { readable: reading, writable: writing }, (err) => {
    // Only a completion without error counts as finished here; after an
    // error the returned function must still be able to destroy the stream.
    finished = !err;

    const rState = stream._readableState;
    if (
      err &&
      err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
      reading &&
      (rState && rState.ended && !rState.errored && !rState.errorEmitted)
    ) {
      // Some readable streams will emit 'close' before 'end'. However, since
      // this is on the readable side 'end' should still be emitted if the
      // stream has been ended and no error emitted. This should be allowed in
      // favor of backwards compatibility. Since the stream is piped to a
      // destination this should not result in any observable difference.
      // We don't need to check if this is a writable premature close since
      // eos will only fail with premature close on the reading side for
      // duplex streams.
      stream
        .once('end', callback)
        .once('error', callback);
    } else {
      callback(err);
    }
  });

  return (err) => {
    if (finished) return;
    finished = true;
    destroyImpl.destroyer(stream, err);
    callback(err || new ERR_STREAM_DESTROYED('pipe'));
  };
}

// Removes and returns the trailing callback from `streams`.
// Throws ERR_INVALID_CALLBACK when the last element is not a function.
function popCallback(streams) {
  // Streams should never be an empty array. It should always contain at least
  // a single stream. Therefore optimize for the average case instead of
  // checking for length === 0 as well.
  if (typeof streams[streams.length - 1] !== 'function')
    throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
  return streams.pop();
}

// Returns `val` unchanged when isIterable(val) is true, wraps it with
// fromReadable() when isReadable(val) is true, and otherwise throws
// ERR_INVALID_ARG_TYPE.
function makeAsyncIterable(val) {
  if (isIterable(val)) {
    return val;
  } else if (isReadable(val)) {
    // Legacy streams are not Iterable.
    return fromReadable(val);
  }
  throw new ERR_INVALID_ARG_TYPE(
    'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}

// Async generator that iterates `val` by borrowing
// Readable.prototype[SymbolAsyncIterator] and invoking it with `val` as the
// receiver.
async function* fromReadable(val) {
  if (!Readable) {
    Readable = require('_stream_readable');
  }
  yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

// Writes every chunk produced by `iterable` into `writable`, waiting for
// 'drain' whenever the writable signals backpressure, then ends `writable`.
//
// `finish` is always called exactly once from the `finally` clause: with the
// caught error if iteration or writing threw, otherwise with `undefined`
// (including the early return taken when `writable` is already destroyed).
async function pump(iterable, writable, finish) {
  if (!EE) {
    EE = require('events');
  }
  let error;
  try {
    // NOTE(review): events.once() settles only on the awaited event or on
    // 'error'. If `writable` is destroyed without an error while one of the
    // 'drain' waits below is pending, that await may never settle and
    // `finish` would not be called from here — confirm.
    if (writable.writableNeedDrain === true) {
      await EE.once(writable, 'drain');
    }

    for await (const chunk of iterable) {
      if (!writable.write(chunk)) {
        if (writable.destroyed) return;
        await EE.once(writable, 'drain');
      }
    }
    writable.end();
  } catch (err) {
    error = err;
  } finally {
    finish(error);
  }
}

// Connects the given streams, iterables and functions in order and invokes
// the trailing callback once every tracked participant has finished.
//
// Accepted call shapes:
//   pipeline(a, b, ..., callback)
//   pipeline([a, b, ...], callback)
//
// - The first element may be a stream, an iterable, or a function returning
//   an iterable.
// - Later elements may be streams or functions. A function is called with the
//   previous stage converted through makeAsyncIterable().
// - A function in the last position may return a thenable or an iterable
//   accepted by isIterable(ret, true); its output is routed through an
//   objectMode PassThrough so that a stream is always returned.
//
// Throws synchronously:
// - ERR_INVALID_CALLBACK     if the last argument is not a function.
// - ERR_MISSING_ARGS         if fewer than two stages are supplied.
// - ERR_INVALID_ARG_TYPE     if a stage has an unsupported type.
// - ERR_INVALID_RETURN_VALUE if a function stage returns an unsupported value.
//
// Returns the last stage of the chain (`ret`).
function pipeline(...streams) {
  const callback = once(popCallback(streams));

  // stream.pipeline(streams, callback)
  if (ArrayIsArray(streams[0]) && streams.length === 1) {
    streams = streams[0];
  }

  if (streams.length < 2) {
    throw new ERR_MISSING_ARGS('streams');
  }

  // First meaningful error observed; passed to destroyers and the callback.
  let error;
  // Fulfillment value of a thenable returned by a trailing function stage.
  let value;
  // Destroy functions returned by destroyer(), drained in finish().
  const destroys = [];

  // Number of outstanding participants (destroyers and pumps) that still
  // have to report through finish().
  let finishCount = 0;

  function finish(err) {
    const final = --finishCount === 0;

    // Keep the first error, but let any other error replace a premature
    // close error recorded earlier.
    if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
      error = err;
    }

    if (!error && !final) {
      return;
    }

    // Either something failed or everything is done: tear down every stream
    // that is still registered.
    while (destroys.length) {
      destroys.shift()(error);
    }

    if (final) {
      callback(error, value);
    }
  }

  // Output of the previous stage; the final value is returned to the caller.
  let ret;
  for (let i = 0; i < streams.length; i++) {
    const stream = streams[i];
    // Every stage but the last is read from; every stage but the first is
    // written to.
    const reading = i < streams.length - 1;
    const writing = i > 0;

    if (isStream(stream)) {
      finishCount++;
      destroys.push(destroyer(stream, reading, writing, finish));
    }

    if (i === 0) {
      if (typeof stream === 'function') {
        ret = stream();
        if (!isIterable(ret)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'Iterable, AsyncIterable or Stream', 'source', ret);
        }
      } else if (isIterable(stream) || isReadable(stream)) {
        ret = stream;
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
          stream);
      }
    } else if (typeof stream === 'function') {
      ret = makeAsyncIterable(ret);
      ret = stream(ret);

      if (reading) {
        if (!isIterable(ret, true)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable', `transform[${i - 1}]`, ret);
        }
      } else {
        if (!PassThrough) {
          PassThrough = require('internal/streams/passthrough');
        }

        // If the last argument to pipeline is not a stream
        // we must create a proxy stream so that pipeline(...)
        // always returns a stream which can be further
        // composed through `.pipe(stream)`.

        const pt = new PassThrough({
          objectMode: true
        });

        // Handle Promises/A+ spec, `then` could be a getter that throws on
        // second use.
        const then = ret?.then;
        if (typeof then === 'function') {
          ReflectApply(then, ret, [
            (val) => {
              value = val;
              pt.end(val);
            }, (err) => {
              pt.destroy(err);
            },
          ]);
        } else if (isIterable(ret, true)) {
          finishCount++;
          pump(ret, pt, finish);
        } else {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable or Promise', 'destination', ret);
        }

        ret = pt;

        finishCount++;
        destroys.push(destroyer(ret, false, true, finish));
      }
    } else if (isStream(stream)) {
      if (isReadable(ret)) {
        ret.pipe(stream);

        // Compat. Before node v10.12.0 stdio used to throw an error so
        // pipe() did/does not end() stdio destinations.
        // Now they allow it but "secretly" don't close the underlying fd.
        if (stream === process.stdout || stream === process.stderr) {
          ret.on('end', () => stream.end());
        }
      } else {
        ret = makeAsyncIterable(ret);

        finishCount++;
        pump(ret, stream, finish);
      }
      ret = stream;
    } else {
      const name = reading ? `transform[${i - 1}]` : 'destination';
      throw new ERR_INVALID_ARG_TYPE(
        name, ['Stream', 'Function'], stream);
    }
  }

  // TODO(ronag): Consider returning a Duplex proxy if the first argument
  // is a writable. Would improve composability.
  // See, https://github.com/nodejs/node/issues/32020
  return ret;
}

module.exports = pipeline;