1// Ported from https://github.com/mafintosh/pump with 2// permission from the author, Mathias Buus (@mafintosh). 3 4'use strict'; 5 6const { 7 ArrayIsArray, 8} = primordials; 9 10let eos; 11 12const { once } = require('internal/util'); 13const { 14 ERR_INVALID_CALLBACK, 15 ERR_MISSING_ARGS, 16 ERR_STREAM_DESTROYED 17} = require('internal/errors').codes; 18 19function isRequest(stream) { 20 return stream && stream.setHeader && typeof stream.abort === 'function'; 21} 22 23function destroyer(stream, reading, writing, callback) { 24 callback = once(callback); 25 26 let closed = false; 27 stream.on('close', () => { 28 closed = true; 29 }); 30 31 if (eos === undefined) eos = require('internal/streams/end-of-stream'); 32 eos(stream, { readable: reading, writable: writing }, (err) => { 33 if (err) return callback(err); 34 closed = true; 35 callback(); 36 }); 37 38 let destroyed = false; 39 return (err) => { 40 if (closed) return; 41 if (destroyed) return; 42 destroyed = true; 43 44 // request.destroy just do .end - .abort is what we want 45 if (isRequest(stream)) return stream.abort(); 46 if (isRequest(stream.req)) return stream.req.abort(); 47 if (typeof stream.destroy === 'function') return stream.destroy(err); 48 49 callback(err || new ERR_STREAM_DESTROYED('pipe')); 50 }; 51} 52 53function pipe(from, to) { 54 return from.pipe(to); 55} 56 57function popCallback(streams) { 58 // Streams should never be an empty array. It should always contain at least 59 // a single stream. Therefore optimize for the average case instead of 60 // checking for length === 0 as well. 61 if (typeof streams[streams.length - 1] !== 'function') 62 throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]); 63 return streams.pop(); 64} 65 66function pipeline(...streams) { 67 const callback = popCallback(streams); 68 69 if (ArrayIsArray(streams[0])) streams = streams[0]; 70 71 if (streams.length < 2) { 72 throw new ERR_MISSING_ARGS('streams'); 73 } 74 75 let error; 76 const destroys = streams.map(function(stream, i) { 77 const reading = i < streams.length - 1; 78 const writing = i > 0; 79 return destroyer(stream, reading, writing, function(err) { 80 if (!error) error = err; 81 if (err) { 82 for (const destroy of destroys) { 83 destroy(err); 84 } 85 } 86 if (reading) return; 87 for (const destroy of destroys) { 88 destroy(); 89 } 90 callback(error); 91 }); 92 }); 93 94 return streams.reduce(pipe); 95} 96 97module.exports = pipeline; 98