• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Ported from https://github.com/mafintosh/pump with
2// permission from the author, Mathias Buus (@mafintosh).
3
4'use strict';
5
6const {
7  ArrayIsArray,
8  ReflectApply,
9  SymbolAsyncIterator,
10} = primordials;
11
12let eos;
13
14const { once } = require('internal/util');
15const destroyImpl = require('internal/streams/destroy');
16const {
17  ERR_INVALID_ARG_TYPE,
18  ERR_INVALID_RETURN_VALUE,
19  ERR_INVALID_CALLBACK,
20  ERR_MISSING_ARGS,
21  ERR_STREAM_DESTROYED
22} = require('internal/errors').codes;
23
24const {
25  isIterable,
26  isReadable,
27  isStream,
28} = require('internal/streams/utils');
29
30let EE;
31let PassThrough;
32let Readable;
33
// Wire `stream` into end-of-stream tracking and hand back a function that
// force-destroys it. `callback` fires exactly once: either when eos settles
// or when the returned destroy function is invoked.
function destroyer(stream, reading, writing, callback) {
  callback = once(callback);

  let done = false;
  stream.on('close', () => {
    done = true;
  });

  // Lazy-load to avoid a require cycle with end-of-stream.
  if (eos === undefined) eos = require('internal/streams/end-of-stream');

  eos(stream, { readable: reading, writable: writing }, (err) => {
    done = !err;

    const rState = stream._readableState;
    const endStillPending =
      err &&
      err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
      reading &&
      (rState && rState.ended && !rState.errored && !rState.errorEmitted);

    if (endStillPending) {
      // Some readable streams will emit 'close' before 'end'. However, since
      // this is on the readable side 'end' should still be emitted if the
      // stream has been ended and no error emitted. This should be allowed in
      // favor of backwards compatibility. Since the stream is piped to a
      // destination this should not result in any observable difference.
      // We don't need to check if this is a writable premature close since
      // eos will only fail with premature close on the reading side for
      // duplex streams.
      stream.once('end', callback);
      stream.once('error', callback);
    } else {
      callback(err);
    }
  });

  // Returned teardown: destroys the stream unless it already finished.
  return (err) => {
    if (done) return;
    done = true;
    destroyImpl.destroyer(stream, err);
    callback(err || new ERR_STREAM_DESTROYED('pipe'));
  };
}
76
// Remove and return the trailing completion callback from the argument list.
// `streams` always holds at least one entry here, so only the last element
// needs validating (no explicit empty-array check).
function popCallback(streams) {
  const last = streams[streams.length - 1];
  if (typeof last !== 'function')
    throw new ERR_INVALID_CALLBACK(last);
  return streams.pop();
}
85
// Coerce `val` into something usable by `for await`: iterables pass through
// untouched, legacy readables are wrapped, anything else is rejected.
function makeAsyncIterable(val) {
  if (isIterable(val)) return val;
  if (isReadable(val)) {
    // Legacy streams are not Iterable.
    return fromReadable(val);
  }
  throw new ERR_INVALID_ARG_TYPE(
    'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}
96
// Adapt a legacy readable into an async iterable by delegating to
// Readable's Symbol.asyncIterator implementation.
async function* fromReadable(val) {
  // Lazy-load to avoid a require cycle.
  if (Readable === undefined) {
    Readable = require('_stream_readable');
  }
  yield* Readable.prototype[SymbolAsyncIterator].call(val);
}
103
// Drain `iterable` into `writable`, honoring backpressure via 'drain'
// events, then end the destination. `finish` is always invoked exactly
// once with the error (or undefined) — even on early return.
async function pump(iterable, writable, finish) {
  if (!EE) {
    EE = require('events');
  }
  let failure;
  try {
    // If the destination is already above its high-water mark, wait for
    // it to drain before pushing anything new.
    if (writable.writableNeedDrain === true) {
      await EE.once(writable, 'drain');
    }

    for await (const chunk of iterable) {
      if (writable.write(chunk)) continue;
      if (writable.destroyed) return;
      await EE.once(writable, 'drain');
    }
    writable.end();
  } catch (err) {
    failure = err;
  } finally {
    finish(failure);
  }
}
127
// pipeline(...streams, callback) / pipeline(streamsArray, callback)
//
// Wires a chain of streams / iterables / async functions together, forwarding
// data left-to-right, and invokes `callback(error, value)` once the whole
// chain has settled. Returns the last stream in the chain so the result can
// be further `.pipe()`d.
function pipeline(...streams) {
  // The trailing argument must be the completion callback; wrap in once()
  // so multiple finish paths cannot invoke it twice.
  const callback = once(popCallback(streams));

  // stream.pipeline(streams, callback)
  if (ArrayIsArray(streams[0]) && streams.length === 1) {
    streams = streams[0];
  }

  // A pipeline needs at least a source and a destination.
  if (streams.length < 2) {
    throw new ERR_MISSING_ARGS('streams');
  }

  let error;            // first (or most relevant) error seen in the chain
  let value;            // resolved value when the destination is a Promise
  const destroys = [];  // per-stream teardown functions from destroyer()

  // Number of pending completions (one per stream/pump); the callback fires
  // when this drops back to zero inside finish().
  let finishCount = 0;

  // Shared completion handler: records the error, tears everything down on
  // failure, and invokes the user callback on the final completion.
  function finish(err) {
    const final = --finishCount === 0;

    // Keep the first error, but let a real error replace a mere
    // premature-close error, which is less informative.
    if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
      error = err;
    }

    // Nothing to do yet: no error and more completions still pending.
    if (!error && !final) {
      return;
    }

    // Destroy every remaining stream in the chain (no-ops for streams that
    // already finished cleanly).
    while (destroys.length) {
      destroys.shift()(error);
    }

    if (final) {
      callback(error, value);
    }
  }

  // `ret` threads the "current tail" of the chain through the loop: the
  // stream or async iterable that the next element consumes.
  let ret;
  for (let i = 0; i < streams.length; i++) {
    const stream = streams[i];
    const reading = i < streams.length - 1; // something downstream consumes it
    const writing = i > 0;                  // something upstream feeds it

    if (isStream(stream)) {
      finishCount++;
      destroys.push(destroyer(stream, reading, writing, finish));
    }

    if (i === 0) {
      // Source position: a function producing an iterable, or a
      // readable/iterable directly.
      if (typeof stream === 'function') {
        ret = stream();
        if (!isIterable(ret)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'Iterable, AsyncIterable or Stream', 'source', ret);
        }
      } else if (isIterable(stream) || isReadable(stream)) {
        ret = stream;
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
          stream);
      }
    } else if (typeof stream === 'function') {
      // Transform/destination position given as an async function: feed it
      // the current tail as an async iterable.
      ret = makeAsyncIterable(ret);
      ret = stream(ret);

      if (reading) {
        // Mid-chain transform must hand back something iterable for the
        // next element to consume.
        if (!isIterable(ret, true)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable', `transform[${i - 1}]`, ret);
        }
      } else {
        // Lazy-load to avoid a require cycle.
        if (!PassThrough) {
          PassThrough = require('internal/streams/passthrough');
        }

        // If the last argument to pipeline is not a stream
        // we must create a proxy stream so that pipeline(...)
        // always returns a stream which can be further
        // composed through `.pipe(stream)`.

        const pt = new PassThrough({
          objectMode: true
        });

        // Handle Promises/A+ spec, `then` could be a getter that throws on
        // second use.
        const then = ret?.then;
        if (typeof then === 'function') {
          // Promise result: resolve into `value` and end the proxy; reject
          // by destroying it.
          ReflectApply(then, ret, [
            (val) => {
              value = val;
              pt.end(val);
            }, (err) => {
              pt.destroy(err);
            },
          ]);
        } else if (isIterable(ret, true)) {
          // Async iterable result: pump it into the proxy stream.
          finishCount++;
          pump(ret, pt, finish);
        } else {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable or Promise', 'destination', ret);
        }

        ret = pt;

        // The proxy is write-only from the chain's perspective.
        finishCount++;
        destroys.push(destroyer(ret, false, true, finish));
      }
    } else if (isStream(stream)) {
      if (isReadable(ret)) {
        ret.pipe(stream);

        // Compat. Before node v10.12.0 stdio used to throw an error so
        // pipe() did/does not end() stdio destinations.
        // Now they allow it but "secretly" don't close the underlying fd.
        if (stream === process.stdout || stream === process.stderr) {
          ret.on('end', () => stream.end());
        }
      } else {
        // Current tail is a plain (async) iterable: pump it into the stream.
        ret = makeAsyncIterable(ret);

        finishCount++;
        pump(ret, stream, finish);
      }
      ret = stream;
    } else {
      const name = reading ? `transform[${i - 1}]` : 'destination';
      throw new ERR_INVALID_ARG_TYPE(
        name, ['Stream', 'Function'], stream);
    }
  }

  // TODO(ronag): Consider returning a Duplex proxy if the first argument
  // is a writable. Would improve composability.
  // See, https://github.com/nodejs/node/issues/32020
  return ret;
}

module.exports = pipeline;
270