• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
'use strict'
const EE = require('events')
const Yallist = require('yallist')
const SD = require('string_decoder').StringDecoder
5
// Symbol keys for the stream's internal state and internal methods, so
// they cannot collide with (or be overwritten through) string-named
// properties on the instance.
const EOF = Symbol('EOF')
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
const EMITTED_END = Symbol('emittedEnd')
const EMITTING_END = Symbol('emittingEnd')
const CLOSED = Symbol('closed')
const READ = Symbol('read')
const FLUSH = Symbol('flush')
const FLUSHCHUNK = Symbol('flushChunk')
const ENCODING = Symbol('encoding')
const DECODER = Symbol('decoder')
const FLOWING = Symbol('flowing')
const PAUSED = Symbol('paused')
const RESUME = Symbol('resume')
const BUFFERLENGTH = Symbol('bufferLength')
const BUFFERPUSH = Symbol('bufferPush')
const BUFFERSHIFT = Symbol('bufferShift')
const OBJECTMODE = Symbol('objectMode')
const DESTROYED = Symbol('destroyed')

// TODO remove when Node v8 support drops
// Setting global._MP_NO_ITERATOR_SYMBOLS_ to '1' swaps the well-known
// iterator symbols for private placeholders, so the iterator methods
// are defined under keys the language never looks up.
const doIter = global._MP_NO_ITERATOR_SYMBOLS_  !== '1'
const ASYNCITERATOR = doIter && Symbol.asyncIterator
  || Symbol('asyncIterator not implemented')
const ITERATOR = doIter && Symbol.iterator
  || Symbol('iterator not implemented')
31
// Buffer in node 4.x < 4.5.0 doesn't have working Buffer.from
// or Buffer.alloc, and Buffer in node 10 deprecated the ctor.
// Use the built-in Buffer whenever it has alloc(), and only fall
// back to the safe-buffer package when it does not.
const B = Buffer.alloc ? Buffer
  : /* istanbul ignore next */ require('safe-buffer').Buffer
37
// events that mean 'the stream is over'
// these are treated specially, and re-emitted
// if they are listened for after emitting.
// Returns true only for 'end', 'finish', and 'prefinish'.
const isEndish = ev =>
  ev === 'end' ||
  ev === 'finish' ||
  ev === 'prefinish'
45
// Truthy when b is an ArrayBuffer, or an object that looks like one
// (constructor named 'ArrayBuffer' and a non-negative byteLength), which
// covers values that fail the instanceof check.
// The !!b guard matters: typeof null is 'object', so without it a null
// chunk would throw on the b.constructor access instead of being rejected.
const isArrayBuffer = b => b instanceof ArrayBuffer ||
  !!b &&
  typeof b === 'object' &&
  b.constructor &&
  b.constructor.name === 'ArrayBuffer' &&
  b.byteLength >= 0
51
// True for typed arrays and DataViews, but not for Buffers (which are
// themselves ArrayBuffer views and are handled as-is by the stream).
const isArrayBufferView = b => !B.isBuffer(b) && ArrayBuffer.isView(b)
53
54module.exports = class Minipass extends EE {
55  constructor (options) {
56    super()
57    this[FLOWING] = false
58    // whether we're explicitly paused
59    this[PAUSED] = false
60    this.pipes = new Yallist()
61    this.buffer = new Yallist()
62    this[OBJECTMODE] = options && options.objectMode || false
63    if (this[OBJECTMODE])
64      this[ENCODING] = null
65    else
66      this[ENCODING] = options && options.encoding || null
67    if (this[ENCODING] === 'buffer')
68      this[ENCODING] = null
69    this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
70    this[EOF] = false
71    this[EMITTED_END] = false
72    this[EMITTING_END] = false
73    this[CLOSED] = false
74    this.writable = true
75    this.readable = true
76    this[BUFFERLENGTH] = 0
77    this[DESTROYED] = false
78  }
79
80  get bufferLength () { return this[BUFFERLENGTH] }
81
82  get encoding () { return this[ENCODING] }
83  set encoding (enc) {
84    if (this[OBJECTMODE])
85      throw new Error('cannot set encoding in objectMode')
86
87    if (this[ENCODING] && enc !== this[ENCODING] &&
88        (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
89      throw new Error('cannot change encoding')
90
91    if (this[ENCODING] !== enc) {
92      this[DECODER] = enc ? new SD(enc) : null
93      if (this.buffer.length)
94        this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
95    }
96
97    this[ENCODING] = enc
98  }
99
100  setEncoding (enc) {
101    this.encoding = enc
102  }
103
104  get objectMode () { return this[OBJECTMODE] }
105  set objectMode ( ) { this[OBJECTMODE] = this[OBJECTMODE] || !!  }
106
107  write (chunk, encoding, cb) {
108    if (this[EOF])
109      throw new Error('write after end')
110
111    if (this[DESTROYED]) {
112      this.emit('error', Object.assign(
113        new Error('Cannot call write after a stream was destroyed'),
114        { code: 'ERR_STREAM_DESTROYED' }
115      ))
116      return true
117    }
118
119    if (typeof encoding === 'function')
120      cb = encoding, encoding = 'utf8'
121
122    if (!encoding)
123      encoding = 'utf8'
124
125    // convert array buffers and typed array views into buffers
126    // at some point in the future, we may want to do the opposite!
127    // leave strings and buffers as-is
128    // anything else switches us into object mode
129    if (!this[OBJECTMODE] && !B.isBuffer(chunk)) {
130      if (isArrayBufferView(chunk))
131        chunk = B.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
132      else if (isArrayBuffer(chunk))
133        chunk = B.from(chunk)
134      else if (typeof chunk !== 'string')
135        // use the setter so we throw if we have encoding set
136        this.objectMode = true
137    }
138
139    // this ensures at this point that the chunk is a buffer or string
140    // don't buffer it up or send it to the decoder
141    if (!this.objectMode && !chunk.length) {
142      const ret = this.flowing
143      if (this[BUFFERLENGTH] !== 0)
144        this.emit('readable')
145      if (cb)
146        cb()
147      return ret
148    }
149
150    // fast-path writing strings of same encoding to a stream with
151    // an empty buffer, skipping the buffer/decoder dance
152    if (typeof chunk === 'string' && !this[OBJECTMODE] &&
153        // unless it is a string already ready for us to use
154        !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
155      chunk = B.from(chunk, encoding)
156    }
157
158    if (B.isBuffer(chunk) && this[ENCODING])
159      chunk = this[DECODER].write(chunk)
160
161    try {
162      return this.flowing
163        ? (this.emit('data', chunk), this.flowing)
164        : (this[BUFFERPUSH](chunk), false)
165    } finally {
166      if (this[BUFFERLENGTH] !== 0)
167        this.emit('readable')
168      if (cb)
169        cb()
170    }
171  }
172
173  read (n) {
174    if (this[DESTROYED])
175      return null
176
177    try {
178      if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
179        return null
180
181      if (this[OBJECTMODE])
182        n = null
183
184      if (this.buffer.length > 1 && !this[OBJECTMODE]) {
185        if (this.encoding)
186          this.buffer = new Yallist([
187            Array.from(this.buffer).join('')
188          ])
189        else
190          this.buffer = new Yallist([
191            B.concat(Array.from(this.buffer), this[BUFFERLENGTH])
192          ])
193      }
194
195      return this[READ](n || null, this.buffer.head.value)
196    } finally {
197      this[MAYBE_EMIT_END]()
198    }
199  }
200
201  [READ] (n, chunk) {
202    if (n === chunk.length || n === null)
203      this[BUFFERSHIFT]()
204    else {
205      this.buffer.head.value = chunk.slice(n)
206      chunk = chunk.slice(0, n)
207      this[BUFFERLENGTH] -= n
208    }
209
210    this.emit('data', chunk)
211
212    if (!this.buffer.length && !this[EOF])
213      this.emit('drain')
214
215    return chunk
216  }
217
218  end (chunk, encoding, cb) {
219    if (typeof chunk === 'function')
220      cb = chunk, chunk = null
221    if (typeof encoding === 'function')
222      cb = encoding, encoding = 'utf8'
223    if (chunk)
224      this.write(chunk, encoding)
225    if (cb)
226      this.once('end', cb)
227    this[EOF] = true
228    this.writable = false
229
230    // if we haven't written anything, then go ahead and emit,
231    // even if we're not reading.
232    // we'll re-emit if a new 'end' listener is added anyway.
233    // This makes MP more suitable to write-only use cases.
234    if (this.flowing || !this[PAUSED])
235      this[MAYBE_EMIT_END]()
236    return this
237  }
238
239  // don't let the internal resume be overwritten
240  [RESUME] () {
241    if (this[DESTROYED])
242      return
243
244    this[PAUSED] = false
245    this[FLOWING] = true
246    this.emit('resume')
247    if (this.buffer.length)
248      this[FLUSH]()
249    else if (this[EOF])
250      this[MAYBE_EMIT_END]()
251    else
252      this.emit('drain')
253  }
254
255  resume () {
256    return this[RESUME]()
257  }
258
259  pause () {
260    this[FLOWING] = false
261    this[PAUSED] = true
262  }
263
264  get destroyed () {
265    return this[DESTROYED]
266  }
267
268  get flowing () {
269    return this[FLOWING]
270  }
271
272  get paused () {
273    return this[PAUSED]
274  }
275
276  [BUFFERPUSH] (chunk) {
277    if (this[OBJECTMODE])
278      this[BUFFERLENGTH] += 1
279    else
280      this[BUFFERLENGTH] += chunk.length
281    return this.buffer.push(chunk)
282  }
283
284  [BUFFERSHIFT] () {
285    if (this.buffer.length) {
286      if (this[OBJECTMODE])
287        this[BUFFERLENGTH] -= 1
288      else
289        this[BUFFERLENGTH] -= this.buffer.head.value.length
290    }
291    return this.buffer.shift()
292  }
293
294  [FLUSH] () {
295    do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
296
297    if (!this.buffer.length && !this[EOF])
298      this.emit('drain')
299  }
300
301  [FLUSHCHUNK] (chunk) {
302    return chunk ? (this.emit('data', chunk), this.flowing) : false
303  }
304
305  pipe (dest, opts) {
306    if (this[DESTROYED])
307      return
308
309    const ended = this[EMITTED_END]
310    opts = opts || {}
311    if (dest === process.stdout || dest === process.stderr)
312      opts.end = false
313    else
314      opts.end = opts.end !== false
315
316    const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
317    this.pipes.push(p)
318
319    dest.on('drain', p.ondrain)
320    this[RESUME]()
321    // piping an ended stream ends immediately
322    if (ended && p.opts.end)
323      p.dest.end()
324    return dest
325  }
326
327  addListener (ev, fn) {
328    return this.on(ev, fn)
329  }
330
331  on (ev, fn) {
332    try {
333      return super.on(ev, fn)
334    } finally {
335      if (ev === 'data' && !this.pipes.length && !this.flowing)
336        this[RESUME]()
337      else if (isEndish(ev) && this[EMITTED_END]) {
338        super.emit(ev)
339        this.removeAllListeners(ev)
340      }
341    }
342  }
343
344  get emittedEnd () {
345    return this[EMITTED_END]
346  }
347
348  [MAYBE_EMIT_END] () {
349    if (!this[EMITTING_END] &&
350        !this[EMITTED_END] &&
351        !this[DESTROYED] &&
352        this.buffer.length === 0 &&
353        this[EOF]) {
354      this[EMITTING_END] = true
355      this.emit('end')
356      this.emit('prefinish')
357      this.emit('finish')
358      if (this[CLOSED])
359        this.emit('close')
360      this[EMITTING_END] = false
361    }
362  }
363
364  emit (ev, data) {
365    // error and close are only events allowed after calling destroy()
366    if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
367      return
368    else if (ev === 'data') {
369      if (!data)
370        return
371
372      if (this.pipes.length)
373        this.pipes.forEach(p =>
374          p.dest.write(data) === false && this.pause())
375    } else if (ev === 'end') {
376      // only actual end gets this treatment
377      if (this[EMITTED_END] === true)
378        return
379
380      this[EMITTED_END] = true
381      this.readable = false
382
383      if (this[DECODER]) {
384        data = this[DECODER].end()
385        if (data) {
386          this.pipes.forEach(p => p.dest.write(data))
387          super.emit('data', data)
388        }
389      }
390
391      this.pipes.forEach(p => {
392        p.dest.removeListener('drain', p.ondrain)
393        if (p.opts.end)
394          p.dest.end()
395      })
396    } else if (ev === 'close') {
397      this[CLOSED] = true
398      // don't emit close before 'end' and 'finish'
399      if (!this[EMITTED_END] && !this[DESTROYED])
400        return
401    }
402
403    // TODO: replace with a spread operator when Node v4 support drops
404    const args = new Array(arguments.length)
405    args[0] = ev
406    args[1] = data
407    if (arguments.length > 2) {
408      for (let i = 2; i < arguments.length; i++) {
409        args[i] = arguments[i]
410      }
411    }
412
413    try {
414      return super.emit.apply(this, args)
415    } finally {
416      if (!isEndish(ev))
417        this[MAYBE_EMIT_END]()
418      else
419        this.removeAllListeners(ev)
420    }
421  }
422
423  // const all = await stream.collect()
424  collect () {
425    const buf = []
426    buf.dataLength = 0
427    this.on('data', c => {
428      buf.push(c)
429      buf.dataLength += c.length
430    })
431    return this.promise().then(() => buf)
432  }
433
434  // const data = await stream.concat()
435  concat () {
436    return this[OBJECTMODE]
437      ? Promise.reject(new Error('cannot concat in objectMode'))
438      : this.collect().then(buf =>
439          this[OBJECTMODE]
440            ? Promise.reject(new Error('cannot concat in objectMode'))
441            : this[ENCODING] ? buf.join('') : B.concat(buf, buf.dataLength))
442  }
443
444  // stream.promise().then(() => done, er => emitted error)
445  promise () {
446    return new Promise((resolve, reject) => {
447      this.on(DESTROYED, () => reject(new Error('stream destroyed')))
448      this.on('end', () => resolve())
449      this.on('error', er => reject(er))
450    })
451  }
452
453  // for await (let chunk of stream)
454  [ASYNCITERATOR] () {
455    const next = () => {
456      const res = this.read()
457      if (res !== null)
458        return Promise.resolve({ done: false, value: res })
459
460      if (this[EOF])
461        return Promise.resolve({ done: true })
462
463      let resolve = null
464      let reject = null
465      const onerr = er => {
466        this.removeListener('data', ondata)
467        this.removeListener('end', onend)
468        reject(er)
469      }
470      const ondata = value => {
471        this.removeListener('error', onerr)
472        this.removeListener('end', onend)
473        this.pause()
474        resolve({ value: value, done: !!this[EOF] })
475      }
476      const onend = () => {
477        this.removeListener('error', onerr)
478        this.removeListener('data', ondata)
479        resolve({ done: true })
480      }
481      const ondestroy = () => onerr(new Error('stream destroyed'))
482      return new Promise((res, rej) => {
483        reject = rej
484        resolve = res
485        this.once(DESTROYED, ondestroy)
486        this.once('error', onerr)
487        this.once('end', onend)
488        this.once('data', ondata)
489      })
490    }
491
492    return { next }
493  }
494
495  // for (let chunk of stream)
496  [ITERATOR] () {
497    const next = () => {
498      const value = this.read()
499      const done = value === null
500      return { value, done }
501    }
502    return { next }
503  }
504
505  destroy (er) {
506    if (this[DESTROYED]) {
507      if (er)
508        this.emit('error', er)
509      else
510        this.emit(DESTROYED)
511      return this
512    }
513
514    this[DESTROYED] = true
515
516    // throw away all buffered data, it's never coming out
517    this.buffer = new Yallist()
518    this[BUFFERLENGTH] = 0
519
520    if (typeof this.close === 'function' && !this[CLOSED])
521      this.close()
522
523    if (er)
524      this.emit('error', er)
525    else // if no error to emit, still reject pending promises
526      this.emit(DESTROYED)
527
528    return this
529  }
530
531  static isStream (s) {
532    return !!s && (s instanceof Minipass || s instanceof EE && (
533      typeof s.pipe === 'function' || // readable
534      (typeof s.write === 'function' && typeof s.end === 'function') // writable
535    ))
536  }
537}
538