• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Ported from https://github.com/mafintosh/pump with
2// permission from the author, Mathias Buus (@mafintosh).
3
4'use strict';
5
6const {
7  ArrayIsArray,
8} = primordials;
9
10let eos;
11
12const { once } = require('internal/util');
13const {
14  ERR_INVALID_CALLBACK,
15  ERR_MISSING_ARGS,
16  ERR_STREAM_DESTROYED
17} = require('internal/errors').codes;
18
// Duck-type check for a request-like object: the value must be truthy, carry
// a truthy `setHeader` property and expose `abort` as a function.
// Like the `&&` chain it stands for, this yields the first falsy operand
// unchanged (not coerced to `false`), or a boolean from the `abort` check.
function isRequest(stream) {
  if (!stream) return stream;

  const { setHeader } = stream;
  if (!setHeader) return setHeader;

  return typeof stream.abort === 'function';
}
22
// Watches `stream` and returns a function that tears it down.
//
// `reading` / `writing` are forwarded to the end-of-stream helper as its
// `readable` / `writable` options. `callback` is wrapped with `once` (from
// internal/util; presumably limits it to a single invocation) and receives
// the error reported by the end-of-stream helper, or no argument on success.
//
// The returned function does nothing when the stream has already closed or
// when it has been invoked before.
function destroyer(stream, reading, writing, callback) {
  const done = once(callback);

  // Flipped by a 'close' event or by an error-free end-of-stream report.
  let closed = false;
  const markClosed = () => {
    closed = true;
  };
  stream.on('close', markClosed);

  // The end-of-stream module is loaded on first use and cached at file level.
  if (eos === undefined) eos = require('internal/streams/end-of-stream');
  eos(stream, { readable: reading, writable: writing }, (err) => {
    if (!err) {
      closed = true;
      done();
      return;
    }
    return done(err);
  });

  // Guards against tearing the same stream down twice.
  let destroyed = false;
  return (err) => {
    if (closed || destroyed) return;
    destroyed = true;

    // For request objects .destroy() only performs .end(), so prefer .abort().
    if (isRequest(stream)) {
      return stream.abort();
    }
    if (isRequest(stream.req)) {
      return stream.req.abort();
    }
    if (typeof stream.destroy === 'function') {
      return stream.destroy(err);
    }

    // Nothing on the stream can tear it down; report through the callback.
    const reason = err || new ERR_STREAM_DESTROYED('pipe');
    done(reason);
  };
}
52
// Reducer used to chain streams: pipes `from` into `to` and hands back
// whatever `from.pipe()` returned so it can feed the next step.
function pipe(from, to) {
  const downstream = from.pipe(to);
  return downstream;
}
56
// Removes the trailing element of `streams` and returns it, provided it is a
// function; otherwise throws ERR_INVALID_CALLBACK with the offending value.
function popCallback(streams) {
  // `streams` is expected to hold at least one entry, so the empty-array
  // case is intentionally not special-cased: the common path stays cheap and
  // an empty array simply fails the function check below.
  const isCallback = typeof streams[streams.length - 1] === 'function';
  if (isCallback) {
    return streams.pop();
  }
  throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
}
65
// Pipes the given streams together in order and invokes the trailing
// callback once the last stream reports completion.
//
// Accepted call forms: pipeline(a, b, ..., cb) and pipeline([a, b, ...], cb).
// Throws ERR_INVALID_CALLBACK (via popCallback) when the last argument is not
// a function, and ERR_MISSING_ARGS when fewer than two streams are supplied.
// Returns the result of the final `pipe` step.
function pipeline(...streams) {
  const callback = popCallback(streams);

  // Unwrap the array call form.
  if (ArrayIsArray(streams[0])) streams = streams[0];

  if (streams.length < 2) throw new ERR_MISSING_ARGS('streams');

  // First error reported by any stream; this is what the callback receives.
  let error;

  // Runs every stream's teardown function, forwarding `reason` (if any).
  // Only invoked from stream callbacks, i.e. after `destroys` is assigned.
  const destroyAll = (reason) => {
    for (const destroy of destroys) {
      destroy(reason);
    }
  };

  const destroys = streams.map((stream, index) => {
    // Every stream except the last is read from; every one except the first
    // is written to.
    const reading = index < streams.length - 1;
    const writing = index > 0;

    const onFinished = (err) => {
      if (!error) error = err;
      if (err) destroyAll(err);

      // Only the final stream (the one not being read from) completes the
      // pipeline; earlier streams stop here.
      if (reading) return;

      destroyAll();
      callback(error);
    };

    return destroyer(stream, reading, writing, onFinished);
  });

  return streams.reduce(pipe);
}
96
97module.exports = pipeline;
98