• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Ported from https://github.com/mafintosh/pump with
2// permission from the author, Mathias Buus (@mafintosh).
3
4'use strict';
5
6const {
7  ArrayIsArray,
8  Promise,
9  SymbolAsyncIterator,
10  SymbolDispose,
11} = primordials;
12
13const eos = require('internal/streams/end-of-stream');
14const { once } = require('internal/util');
15const destroyImpl = require('internal/streams/destroy');
16const Duplex = require('internal/streams/duplex');
17const {
18  aggregateTwoErrors,
19  codes: {
20    ERR_INVALID_ARG_TYPE,
21    ERR_INVALID_RETURN_VALUE,
22    ERR_MISSING_ARGS,
23    ERR_STREAM_DESTROYED,
24    ERR_STREAM_PREMATURE_CLOSE,
25  },
26  AbortError,
27} = require('internal/errors');
28
29const {
30  validateFunction,
31  validateAbortSignal,
32} = require('internal/validators');
33
34const {
35  isIterable,
36  isReadable,
37  isReadableNodeStream,
38  isNodeStream,
39  isTransformStream,
40  isWebStream,
41  isReadableStream,
42  isReadableFinished,
43} = require('internal/streams/utils');
44const { AbortController } = require('internal/abort_controller');
45
46let PassThrough;
47let Readable;
48let addAbortListener;
49
/**
 * Wraps a stream with a { destroy, cleanup } pair used while tearing down a
 * pipeline. `destroy(err)` force-destroys the stream unless it has already
 * finished on its own; `cleanup()` detaches the end-of-stream listeners
 * installed here.
 *
 * @param {object} stream - Node stream to guard.
 * @param {boolean} reading - Whether the readable side is expected to end.
 * @param {boolean} writing - Whether the writable side is expected to end.
 * @returns {{ destroy: (err?: Error) => void, cleanup: () => void }}
 */
function destroyer(stream, reading, writing) {
  let done = false;

  stream.on('close', () => {
    done = true;
  });

  // eos() resets `done` if the stream ends with an error, so a later
  // destroy() call can still propagate a replacement error.
  const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
    done = !err;
  });

  function destroy(err) {
    if (done) {
      return;
    }
    done = true;
    destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
  }

  return { destroy, cleanup };
}
69
/**
 * Removes and returns the trailing completion callback from a pipeline
 * argument list. Streams should never be an empty array: it always contains
 * at least one stream, so we optimize for the average case instead of also
 * checking for length === 0.
 *
 * @param {Array} streams - Pipeline arguments; mutated in place (pop).
 * @returns {Function} The validated trailing callback.
 */
function popCallback(streams) {
  const last = streams[streams.length - 1];
  validateFunction(last, 'streams[stream.length - 1]');
  return streams.pop();
}
77
/**
 * Normalizes `val` into something consumable with `for await`. Iterables
 * pass through untouched; legacy readable Node streams (which are not
 * themselves iterable) are adapted via fromReadable(). Anything else is
 * rejected.
 *
 * @param {object} val - Candidate source.
 * @returns {AsyncIterable|Iterable}
 * @throws {ERR_INVALID_ARG_TYPE} If `val` is neither iterable nor a
 *   readable Node stream.
 */
function makeAsyncIterable(val) {
  if (isIterable(val)) {
    return val;
  }
  if (isReadableNodeStream(val)) {
    // Legacy streams are not Iterable.
    return fromReadable(val);
  }
  throw new ERR_INVALID_ARG_TYPE(
    'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}
88
/**
 * Adapts a readable Node stream into an async generator by delegating to
 * Readable.prototype[SymbolAsyncIterator].
 *
 * @param {object} val - Readable Node stream.
 */
async function* fromReadable(val) {
  // Lazy-load to avoid pulling in the readable module until needed.
  Readable ??= require('internal/streams/readable');
  yield* Readable.prototype[SymbolAsyncIterator].call(val);
}
96
/**
 * Pumps an async/sync iterable into a Node writable stream while honoring
 * backpressure: whenever write() returns false the loop parks on a promise
 * that is released by the next 'drain' event, or rejected if the writable
 * errors/closes first (via eos).
 *
 * @param {AsyncIterable|Iterable} iterable - Source of chunks.
 * @param {object} writable - Destination Node stream.
 * @param {Function} finish - Completion callback; invoked with an error on
 *   failure or with no argument on success.
 * @param {{ end: boolean }} options - When `end` is true, end the writable
 *   (and wait for it to finish) after the source is exhausted.
 */
async function pumpToNode(iterable, writable, finish, { end }) {
  let error;
  let onresolve = null;

  // Shared wake-up used by both 'drain' and end-of-stream: record a terminal
  // error (if any) and release the currently parked wait(), if one exists.
  const resume = (err) => {
    if (err) {
      error = err;
    }

    if (onresolve) {
      const callback = onresolve;
      onresolve = null;
      callback();
    }
  };

  // Park until resume() fires. If an error was already recorded, reject
  // immediately; otherwise settle according to `error` at wake-up time.
  const wait = () => new Promise((resolve, reject) => {
    if (error) {
      reject(error);
    } else {
      onresolve = () => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      };
    }
  });

  writable.on('drain', resume);
  // readable: false — only the writable side's completion/errors matter here.
  const cleanup = eos(writable, { readable: false }, resume);

  try {
    // The writable may already be saturated before we write anything.
    if (writable.writableNeedDrain) {
      await wait();
    }

    for await (const chunk of iterable) {
      if (!writable.write(chunk)) {
        await wait();
      }
    }

    if (end) {
      writable.end();
      // Wait for eos to fire resume() so 'finish'/errors are observed.
      await wait();
    }

    finish();
  } catch (err) {
    // Prefer the stream's own error over the one thrown at the await site,
    // combining them when they differ.
    finish(error !== err ? aggregateTwoErrors(error, err) : err);
  } finally {
    cleanup();
    writable.off('drain', resume);
  }
}
154
/**
 * Pumps an async-iterable source into a WHATWG writable stream (or the
 * writable side of a TransformStream) through a writer, awaiting
 * `writer.ready` for backpressure between writes.
 *
 * @param {AsyncIterable} readable - Source of chunks.
 * @param {object} writable - WHATWG WritableStream or TransformStream.
 * @param {Function} finish - Completion callback; invoked with an error on
 *   failure or with no argument on success.
 * @param {{ end: boolean }} options - When `end` is true, close the writer
 *   after the source is exhausted.
 */
async function pumpToWeb(readable, writable, finish, { end }) {
  if (isTransformStream(writable)) {
    writable = writable.writable;
  }
  // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
  const writer = writable.getWriter();
  try {
    for await (const chunk of readable) {
      await writer.ready;
      // Per-write rejections are intentionally swallowed: failures surface
      // through `writer.ready` rejecting on the next iteration instead.
      writer.write(chunk).catch(() => {});
    }

    await writer.ready;

    if (end) {
      await writer.close();
    }

    finish();
  } catch (err) {
    try {
      await writer.abort(err);
      finish(err);
    } catch (err) {
      // NOTE: the inner `err` shadows the outer one, so if abort() itself
      // rejects, finish() receives the abort rejection rather than the
      // original error.
      finish(err);
    }
  }
}
183
/**
 * Public callback-style pipeline(...streams, callback) entry point.
 * Pops the trailing callback (wrapped with once() so it fires at most one
 * time) and delegates to pipelineImpl().
 *
 * @param {...any} streams - Streams/functions followed by a callback.
 * @returns {any} The tail of the pipeline (see pipelineImpl).
 */
function pipeline(...streams) {
  const cb = once(popCallback(streams));
  return pipelineImpl(streams, cb);
}
187
/**
 * Core pipeline implementation. Walks `streams` left to right, wiring each
 * element (Node stream, web stream, iterable, async function, or promise)
 * to the accumulated tail `ret`, tracking in-flight pump/pipe operations in
 * `finishCount` and destroy hooks in `destroys`. The user `callback` is
 * invoked on process.nextTick once every operation has finished or the
 * pipeline errored/aborted.
 *
 * @param {Array} streams - Pipeline elements (a single array argument is
 *   unwrapped); at least two are required.
 * @param {Function} callback - Invoked as callback(error, value).
 * @param {{ signal?: AbortSignal, end?: boolean }} [opts] - `signal` aborts
 *   the pipeline; `end: false` keeps the final destination open.
 * @returns {any} The last stream (or PassThrough proxy) in the chain.
 */
function pipelineImpl(streams, callback, opts) {
  // pipeline([a, b, c], cb) is equivalent to pipeline(a, b, c, cb).
  if (streams.length === 1 && ArrayIsArray(streams[0])) {
    streams = streams[0];
  }

  if (streams.length < 2) {
    throw new ERR_MISSING_ARGS('streams');
  }

  // `ac.signal` is handed to transform functions so they can observe
  // pipeline teardown; `outerSignal` is the caller-provided abort signal.
  const ac = new AbortController();
  const signal = ac.signal;
  const outerSignal = opts?.signal;

  // Need to cleanup event listeners if last stream is readable
  // https://github.com/nodejs/node/issues/35452
  const lastStreamCleanup = [];

  validateAbortSignal(outerSignal, 'options.signal');

  function abort() {
    finishImpl(new AbortError());
  }

  addAbortListener ??= require('events').addAbortListener;
  let disposable;
  if (outerSignal) {
    disposable = addAbortListener(outerSignal, abort);
  }

  let error;
  let value;
  const destroys = [];

  // Number of in-flight pump/pipe operations; the user callback fires only
  // once each of them has called finish().
  let finishCount = 0;

  function finish(err) {
    finishImpl(err, --finishCount === 0);
  }

  function finishImpl(err, final) {
    // Keep the first meaningful error; a premature-close error may still be
    // replaced by a more specific one arriving later.
    if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
      error = err;
    }

    if (!error && !final) {
      return;
    }

    // Tear down every stream registered via destroyer().
    while (destroys.length) {
      destroys.shift()(error);
    }

    disposable?.[SymbolDispose]();
    ac.abort();

    if (final) {
      if (!error) {
        lastStreamCleanup.forEach((fn) => fn());
      }
      process.nextTick(callback, error, value);
    }
  }

  // `ret` accumulates the tail of the pipeline built so far.
  let ret;
  for (let i = 0; i < streams.length; i++) {
    const stream = streams[i];
    const reading = i < streams.length - 1;
    const writing = i > 0;
    // `opts.end === false` only exempts the final destination from end().
    const end = reading || opts?.end !== false;
    const isLastStream = i === streams.length - 1;

    if (isNodeStream(stream)) {
      if (end) {
        const { destroy, cleanup } = destroyer(stream, reading, writing);
        destroys.push(destroy);

        if (isReadable(stream) && isLastStream) {
          lastStreamCleanup.push(cleanup);
        }
      }

      // Catch stream errors that occur after pipe/pump has completed.
      function onError(err) {
        if (
          err &&
          err.name !== 'AbortError' &&
          err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
        ) {
          finish(err);
        }
      }
      stream.on('error', onError);
      if (isReadable(stream) && isLastStream) {
        lastStreamCleanup.push(() => {
          stream.removeListener('error', onError);
        });
      }
    }

    if (i === 0) {
      // Source position: may be a factory function, an iterable/stream,
      // or anything Duplex.from() can adapt.
      if (typeof stream === 'function') {
        ret = stream({ signal });
        if (!isIterable(ret)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'Iterable, AsyncIterable or Stream', 'source', ret);
        }
      } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
        ret = stream;
      } else {
        ret = Duplex.from(stream);
      }
    } else if (typeof stream === 'function') {
      // Transform/destination function: feed it the tail as an async
      // iterable and take its return value as the new tail.
      if (isTransformStream(ret)) {
        ret = makeAsyncIterable(ret?.readable);
      } else {
        ret = makeAsyncIterable(ret);
      }
      ret = stream(ret, { signal });

      if (reading) {
        if (!isIterable(ret, true)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable', `transform[${i - 1}]`, ret);
        }
      } else {
        if (!PassThrough) {
          PassThrough = require('internal/streams/passthrough');
        }

        // If the last argument to pipeline is not a stream
        // we must create a proxy stream so that pipeline(...)
        // always returns a stream which can be further
        // composed through `.pipe(stream)`.

        const pt = new PassThrough({
          objectMode: true,
        });

        // Handle Promises/A+ spec, `then` could be a getter that throws on
        // second use.
        const then = ret?.then;
        if (typeof then === 'function') {
          finishCount++;
          then.call(ret,
                    (val) => {
                      value = val;
                      if (val != null) {
                        pt.write(val);
                      }
                      if (end) {
                        pt.end();
                      }
                      process.nextTick(finish);
                    }, (err) => {
                      pt.destroy(err);
                      process.nextTick(finish, err);
                    },
          );
        } else if (isIterable(ret, true)) {
          finishCount++;
          pumpToNode(ret, pt, finish, { end });
        } else if (isReadableStream(ret) || isTransformStream(ret)) {
          const toRead = ret.readable || ret;
          finishCount++;
          pumpToNode(toRead, pt, finish, { end });
        } else {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable or Promise', 'destination', ret);
        }

        ret = pt;

        const { destroy, cleanup } = destroyer(ret, false, true);
        destroys.push(destroy);
        if (isLastStream) {
          lastStreamCleanup.push(cleanup);
        }
      }
    } else if (isNodeStream(stream)) {
      if (isReadableNodeStream(ret)) {
        // pipe() reports completion separately for its readable and
        // writable sides, hence the increment by 2.
        finishCount += 2;
        const cleanup = pipe(ret, stream, finish, { end });
        if (isReadable(stream) && isLastStream) {
          lastStreamCleanup.push(cleanup);
        }
      } else if (isTransformStream(ret) || isReadableStream(ret)) {
        const toRead = ret.readable || ret;
        finishCount++;
        pumpToNode(toRead, stream, finish, { end });
      } else if (isIterable(ret)) {
        finishCount++;
        pumpToNode(ret, stream, finish, { end });
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
      }
      ret = stream;
    } else if (isWebStream(stream)) {
      if (isReadableNodeStream(ret)) {
        finishCount++;
        pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
      } else if (isReadableStream(ret) || isIterable(ret)) {
        finishCount++;
        pumpToWeb(ret, stream, finish, { end });
      } else if (isTransformStream(ret)) {
        finishCount++;
        pumpToWeb(ret.readable, stream, finish, { end });
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
      }
      ret = stream;
    } else {
      ret = Duplex.from(stream);
    }
  }

  // Either signal may already be aborted by the time wiring completes.
  if (signal?.aborted || outerSignal?.aborted) {
    process.nextTick(abort);
  }

  return ret;
}
411
/**
 * Connects a readable Node stream to a writable Node stream for
 * pipelineImpl(). Completion is reported through `finish` twice — once for
 * the source side and once for the destination side (the caller increments
 * finishCount by 2 accordingly).
 *
 * @param {object} src - Readable source stream.
 * @param {object} dst - Writable destination stream.
 * @param {Function} finish - Per-side completion callback.
 * @param {{ end: boolean }} options - When `end` is true, end `dst` once
 *   `src` has ended.
 * @returns {Function} Cleanup function from eos() on the destination.
 */
function pipe(src, dst, finish, { end }) {
  let ended = false;
  dst.on('close', () => {
    if (!ended) {
      // Finish if the destination closes before the source has completed.
      finish(new ERR_STREAM_PREMATURE_CLOSE());
    }
  });

  src.pipe(dst, { end: false }); // If end is true we already will have a listener to end dst.

  if (end) {
    // Compat. Before node v10.12.0 stdio used to throw an error so
    // pipe() did/does not end() stdio destinations.
    // Now they allow it but "secretly" don't close the underlying fd.

    function endFn() {
      ended = true;
      dst.end();
    }

    if (isReadableFinished(src)) { // End the destination if the source has already ended.
      process.nextTick(endFn);
    } else {
      src.once('end', endFn);
    }
  } else {
    finish();
  }

  eos(src, { readable: true, writable: false }, (err) => {
    const rState = src._readableState;
    if (
      err &&
      err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
      (rState && rState.ended && !rState.errored && !rState.errorEmitted)
    ) {
      // Some readable streams will emit 'close' before 'end'. However, since
      // this is on the readable side 'end' should still be emitted if the
      // stream has been ended and no error emitted. This should be allowed in
      // favor of backwards compatibility. Since the stream is piped to a
      // destination this should not result in any observable difference.
      // We don't need to check if this is a writable premature close since
      // eos will only fail with premature close on the reading side for
      // duplex streams.
      src
        .once('end', finish)
        .once('error', finish);
    } else {
      finish(err);
    }
  });
  return eos(dst, { readable: false, writable: true }, finish);
}
466
// Expose the public callback-style pipeline() and the lower-level
// pipelineImpl() (which additionally accepts an options object).
module.exports = { pipelineImpl, pipeline };
468