• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1const Minipass = require('minipass')
2const EE = require('events')
3const isStream = s => s && s instanceof EE && (
4  typeof s.pipe === 'function' || // readable
5  (typeof s.write === 'function' && typeof s.end === 'function') // writable
6)
7
8const _head = Symbol('_head')
9const _tail = Symbol('_tail')
10const _linkStreams = Symbol('_linkStreams')
11const _setHead = Symbol('_setHead')
12const _setTail = Symbol('_setTail')
13const _onError = Symbol('_onError')
14const _onData = Symbol('_onData')
15const _onEnd = Symbol('_onEnd')
16const _onDrain = Symbol('_onDrain')
17const _streams = Symbol('_streams')
18class Pipeline extends Minipass {
19  constructor (opts, ...streams) {
20    if (isStream(opts)) {
21      streams.unshift(opts)
22      opts = {}
23    }
24
25    super(opts)
26    this[_streams] = []
27    if (streams.length)
28      this.push(...streams)
29  }
30
31  [_linkStreams] (streams) {
32    // reduce takes (left,right), and we return right to make it the
33    // new left value.
34    return streams.reduce((src, dest) => {
35      src.on('error', er => dest.emit('error', er))
36      src.pipe(dest)
37      return dest
38    })
39  }
40
41  push (...streams) {
42    this[_streams].push(...streams)
43    if (this[_tail])
44      streams.unshift(this[_tail])
45
46    const linkRet = this[_linkStreams](streams)
47
48    this[_setTail](linkRet)
49    if (!this[_head])
50      this[_setHead](streams[0])
51  }
52
53  unshift (...streams) {
54    this[_streams].unshift(...streams)
55    if (this[_head])
56      streams.push(this[_head])
57
58    const linkRet = this[_linkStreams](streams)
59    this[_setHead](streams[0])
60    if (!this[_tail])
61      this[_setTail](linkRet)
62  }
63
64  destroy (er) {
65    // set fire to the whole thing.
66    this[_streams].forEach(s =>
67      typeof s.destroy === 'function' && s.destroy())
68    return super.destroy(er)
69  }
70
71  // readable interface -> tail
72  [_setTail] (stream) {
73    this[_tail] = stream
74    stream.on('error', er => this[_onError](stream, er))
75    stream.on('data', chunk => this[_onData](stream, chunk))
76    stream.on('end', () => this[_onEnd](stream))
77    stream.on('finish', () => this[_onEnd](stream))
78  }
79
80  // errors proxied down the pipeline
81  // they're considered part of the "read" interface
82  [_onError] (stream, er) {
83    if (stream === this[_tail])
84      this.emit('error', er)
85  }
86  [_onData] (stream, chunk) {
87    if (stream === this[_tail])
88      super.write(chunk)
89  }
90  [_onEnd] (stream) {
91    if (stream === this[_tail])
92      super.end()
93  }
94  pause () {
95    super.pause()
96    return this[_tail] && this[_tail].pause && this[_tail].pause()
97  }
98
99  // NB: Minipass calls its internal private [RESUME] method during
100  // pipe drains, to avoid hazards where stream.resume() is overridden.
101  // Thus, we need to listen to the resume *event*, not override the
102  // resume() method, and proxy *that* to the tail.
103  emit (ev, ...args) {
104    if (ev === 'resume' && this[_tail] && this[_tail].resume)
105      this[_tail].resume()
106    return super.emit(ev, ...args)
107  }
108
109  // writable interface -> head
110  [_setHead] (stream) {
111    this[_head] = stream
112    stream.on('drain', () => this[_onDrain](stream))
113  }
114  [_onDrain] (stream) {
115    if (stream === this[_head])
116      this.emit('drain')
117  }
118  write (chunk, enc, cb) {
119    return this[_head].write(chunk, enc, cb) &&
120      (this.flowing || this.buffer.length === 0)
121  }
122  end (chunk, enc, cb) {
123    this[_head].end(chunk, enc, cb)
124    return this
125  }
126}
127
128module.exports = Pipeline
129