const Minipass = require('minipass')
const EE = require('events')

// Truthy when `s` is an EventEmitter that looks readable (has a pipe()
// method) or writable (has both write() and end() methods).
const isStream = s => s && s instanceof EE && (
  typeof s.pipe === 'function' || // readable
  (typeof s.write === 'function' && typeof s.end === 'function') // writable
)

// Symbol keys for the pipeline's internal state and helper methods.
const _head = Symbol('_head')
const _tail = Symbol('_tail')
const _linkStreams = Symbol('_linkStreams')
const _setHead = Symbol('_setHead')
const _setTail = Symbol('_setTail')
const _onError = Symbol('_onError')
const _onData = Symbol('_onData')
const _onEnd = Symbol('_onEnd')
const _onDrain = Symbol('_onDrain')
const _streams = Symbol('_streams')

/**
 * A Minipass stream that presents a chain of piped streams as one stream.
 *
 * Writes to the pipeline are forwarded to the first stream (the "head").
 * Data, end/finish and error events from the last stream (the "tail") are
 * surfaced on the pipeline itself.
 */
class Pipeline extends Minipass {
  /**
   * @param {Object} [opts] Options handed to the Minipass constructor. If
   *   a stream is passed in this position instead, it is treated as the
   *   first stream of the pipeline and the options default to `{}`.
   * @param {...Object} streams Streams to link together, in order.
   */
  constructor (opts, ...streams) {
    if (isStream(opts)) {
      streams.unshift(opts)
      opts = {}
    }

    super(opts)
    this[_streams] = []
    if (streams.length)
      this.push(...streams)
  }

  /**
   * Pipe each stream into the one after it, forwarding 'error' events
   * from every source to its destination.
   *
   * @param {Array} streams Non-empty list of streams to connect.
   * @returns {Object} The last stream in the list.
   */
  [_linkStreams] (streams) {
    // reduce takes (left,right), and we return right to make it the
    // new left value.
    return streams.reduce((src, dest) => {
      src.on('error', er => dest.emit('error', er))
      src.pipe(dest)
      return dest
    })
  }

  /**
   * Append streams to the end of the pipeline. The last one added
   * becomes the new tail. Calling with no arguments is a no-op.
   *
   * @param {...Object} streams Streams to append, in order.
   */
  push (...streams) {
    // Nothing to add. Without this guard an empty call would either throw
    // from reduce() (no tail yet) or register a second set of listeners
    // on the current tail, so every chunk would be forwarded twice.
    if (!streams.length)
      return

    this[_streams].push(...streams)
    // Link the current tail to the first of the new streams.
    if (this[_tail])
      streams.unshift(this[_tail])

    const linkRet = this[_linkStreams](streams)

    this[_setTail](linkRet)
    if (!this[_head])
      this[_setHead](streams[0])
  }

  /**
   * Prepend streams to the front of the pipeline. The first one added
   * becomes the new head. Calling with no arguments is a no-op.
   *
   * @param {...Object} streams Streams to prepend, in order.
   */
  unshift (...streams) {
    // Nothing to add. Without this guard an empty call would either throw
    // from reduce() (no head yet) or register a duplicate 'drain'
    // listener on the current head.
    if (!streams.length)
      return

    this[_streams].unshift(...streams)
    // Link the last of the new streams to the current head.
    if (this[_head])
      streams.push(this[_head])

    const linkRet = this[_linkStreams](streams)
    this[_setHead](streams[0])
    if (!this[_tail])
      this[_setTail](linkRet)
  }

  /**
   * Destroy every stream in the pipeline that has a destroy() method,
   * then destroy the pipeline itself.
   *
   * @param {Error} [er] Passed to the Minipass destroy(); it is not
   *   passed to the individual streams.
   * @returns {*} Whatever the Minipass destroy() returns.
   */
  destroy (er) {
    // set fire to the whole thing.
    this[_streams].forEach(s =>
      typeof s.destroy === 'function' && s.destroy())
    return super.destroy(er)
  }

  // readable interface -> tail
  // Listeners stay attached to a stream after it stops being the tail;
  // the handlers below ignore events from any stream that is not the
  // current tail.
  [_setTail] (stream) {
    this[_tail] = stream
    stream.on('error', er => this[_onError](stream, er))
    stream.on('data', chunk => this[_onData](stream, chunk))
    stream.on('end', () => this[_onEnd](stream))
    stream.on('finish', () => this[_onEnd](stream))
  }

  // errors proxied down the pipeline
  // they're considered part of the "read" interface
  [_onError] (stream, er) {
    if (stream === this[_tail])
      this.emit('error', er)
  }

  // Data from the tail goes into the underlying Minipass buffer, which is
  // why this calls super.write() and not the overridden write() below.
  [_onData] (stream, chunk) {
    if (stream === this[_tail])
      super.write(chunk)
  }

  // Ends the readable side when the tail emits 'end' or 'finish'.
  [_onEnd] (stream) {
    if (stream === this[_tail])
      super.end()
  }

  /**
   * Pause the pipeline and, when the tail has a pause() method, the tail.
   *
   * @returns {*} The result of the tail's pause(), or a falsy value when
   *   there is no tail or it has no pause() method.
   */
  pause () {
    super.pause()
    return this[_tail] && this[_tail].pause && this[_tail].pause()
  }

  // NB: Minipass calls its internal private [RESUME] method during
  // pipe drains, to avoid hazards where stream.resume() is overridden.
  // Thus, we need to listen to the resume *event*, not override the
  // resume() method, and proxy *that* to the tail.
  emit (ev, ...args) {
    if (ev === 'resume' && this[_tail] && this[_tail].resume)
      this[_tail].resume()
    return super.emit(ev, ...args)
  }

  // writable interface -> head
  // As with the tail, a 'drain' listener stays on a stream after it stops
  // being the head; _onDrain ignores anything but the current head.
  [_setHead] (stream) {
    this[_head] = stream
    stream.on('drain', () => this[_onDrain](stream))
  }

  [_onDrain] (stream) {
    if (stream === this[_head])
      this.emit('drain')
  }

  /**
   * Write a chunk to the head of the pipeline.
   *
   * Throws a TypeError if no streams have been added yet, because there
   * is no head to write to.
   *
   * @returns {boolean} Truthy only when the head accepted the chunk
   *   without signalling backpressure and the pipeline itself is either
   *   flowing or has an empty buffer.
   */
  write (chunk, enc, cb) {
    return this[_head].write(chunk, enc, cb) &&
      (this.flowing || this.buffer.length === 0)
  }

  /**
   * End the head of the pipeline, optionally writing a final chunk.
   *
   * Throws a TypeError if no streams have been added yet.
   *
   * @returns {Pipeline} This pipeline, for chaining.
   */
  end (chunk, enc, cb) {
    this[_head].end(chunk, enc, cb)
    return this
  }
}

module.exports = Pipeline