• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2const proc =
3  typeof process === 'object' && process
4    ? process
5    : {
6        stdout: null,
7        stderr: null,
8      }
9const EE = require('events')
10const Stream = require('stream')
11const stringdecoder = require('string_decoder')
12const SD = stringdecoder.StringDecoder
13
14const EOF = Symbol('EOF')
15const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
16const EMITTED_END = Symbol('emittedEnd')
17const EMITTING_END = Symbol('emittingEnd')
18const EMITTED_ERROR = Symbol('emittedError')
19const CLOSED = Symbol('closed')
20const READ = Symbol('read')
21const FLUSH = Symbol('flush')
22const FLUSHCHUNK = Symbol('flushChunk')
23const ENCODING = Symbol('encoding')
24const DECODER = Symbol('decoder')
25const FLOWING = Symbol('flowing')
26const PAUSED = Symbol('paused')
27const RESUME = Symbol('resume')
28const BUFFER = Symbol('buffer')
29const PIPES = Symbol('pipes')
30const BUFFERLENGTH = Symbol('bufferLength')
31const BUFFERPUSH = Symbol('bufferPush')
32const BUFFERSHIFT = Symbol('bufferShift')
33const OBJECTMODE = Symbol('objectMode')
34// internal event when stream is destroyed
35const DESTROYED = Symbol('destroyed')
36// internal event when stream has an error
37const ERROR = Symbol('error')
38const EMITDATA = Symbol('emitData')
39const EMITEND = Symbol('emitEnd')
40const EMITEND2 = Symbol('emitEnd2')
41const ASYNC = Symbol('async')
42const ABORT = Symbol('abort')
43const ABORTED = Symbol('aborted')
44const SIGNAL = Symbol('signal')
45
46const defer = fn => Promise.resolve().then(fn)
47
48// TODO remove when Node v8 support drops
49const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
50const ASYNCITERATOR =
51  (doIter && Symbol.asyncIterator) || Symbol('asyncIterator not implemented')
52const ITERATOR =
53  (doIter && Symbol.iterator) || Symbol('iterator not implemented')
54
55// events that mean 'the stream is over'
56// these are treated specially, and re-emitted
57// if they are listened for after emitting.
58const isEndish = ev => ev === 'end' || ev === 'finish' || ev === 'prefinish'
59
60const isArrayBuffer = b =>
61  b instanceof ArrayBuffer ||
62  (typeof b === 'object' &&
63    b.constructor &&
64    b.constructor.name === 'ArrayBuffer' &&
65    b.byteLength >= 0)
66
67const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
68
69class Pipe {
70  constructor(src, dest, opts) {
71    this.src = src
72    this.dest = dest
73    this.opts = opts
74    this.ondrain = () => src[RESUME]()
75    dest.on('drain', this.ondrain)
76  }
77  unpipe() {
78    this.dest.removeListener('drain', this.ondrain)
79  }
80  // istanbul ignore next - only here for the prototype
81  proxyErrors() {}
82  end() {
83    this.unpipe()
84    if (this.opts.end) this.dest.end()
85  }
86}
87
88class PipeProxyErrors extends Pipe {
89  unpipe() {
90    this.src.removeListener('error', this.proxyErrors)
91    super.unpipe()
92  }
93  constructor(src, dest, opts) {
94    super(src, dest, opts)
95    this.proxyErrors = er => dest.emit('error', er)
96    src.on('error', this.proxyErrors)
97  }
98}
99
100class Minipass extends Stream {
101  constructor(options) {
102    super()
103    this[FLOWING] = false
104    // whether we're explicitly paused
105    this[PAUSED] = false
106    this[PIPES] = []
107    this[BUFFER] = []
108    this[OBJECTMODE] = (options && options.objectMode) || false
109    if (this[OBJECTMODE]) this[ENCODING] = null
110    else this[ENCODING] = (options && options.encoding) || null
111    if (this[ENCODING] === 'buffer') this[ENCODING] = null
112    this[ASYNC] = (options && !!options.async) || false
113    this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
114    this[EOF] = false
115    this[EMITTED_END] = false
116    this[EMITTING_END] = false
117    this[CLOSED] = false
118    this[EMITTED_ERROR] = null
119    this.writable = true
120    this.readable = true
121    this[BUFFERLENGTH] = 0
122    this[DESTROYED] = false
123    if (options && options.debugExposeBuffer === true) {
124      Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] })
125    }
126    if (options && options.debugExposePipes === true) {
127      Object.defineProperty(this, 'pipes', { get: () => this[PIPES] })
128    }
129    this[SIGNAL] = options && options.signal
130    this[ABORTED] = false
131    if (this[SIGNAL]) {
132      this[SIGNAL].addEventListener('abort', () => this[ABORT]())
133      if (this[SIGNAL].aborted) {
134        this[ABORT]()
135      }
136    }
137  }
138
139  get bufferLength() {
140    return this[BUFFERLENGTH]
141  }
142
143  get encoding() {
144    return this[ENCODING]
145  }
146  set encoding(enc) {
147    if (this[OBJECTMODE]) throw new Error('cannot set encoding in objectMode')
148
149    if (
150      this[ENCODING] &&
151      enc !== this[ENCODING] &&
152      ((this[DECODER] && this[DECODER].lastNeed) || this[BUFFERLENGTH])
153    )
154      throw new Error('cannot change encoding')
155
156    if (this[ENCODING] !== enc) {
157      this[DECODER] = enc ? new SD(enc) : null
158      if (this[BUFFER].length)
159        this[BUFFER] = this[BUFFER].map(chunk => this[DECODER].write(chunk))
160    }
161
162    this[ENCODING] = enc
163  }
164
165  setEncoding(enc) {
166    this.encoding = enc
167  }
168
169  get objectMode() {
170    return this[OBJECTMODE]
171  }
172  set objectMode(om) {
173    this[OBJECTMODE] = this[OBJECTMODE] || !!om
174  }
175
176  get ['async']() {
177    return this[ASYNC]
178  }
179  set ['async'](a) {
180    this[ASYNC] = this[ASYNC] || !!a
181  }
182
183  // drop everything and get out of the flow completely
184  [ABORT]() {
185    this[ABORTED] = true
186    this.emit('abort', this[SIGNAL].reason)
187    this.destroy(this[SIGNAL].reason)
188  }
189
190  get aborted() {
191    return this[ABORTED]
192  }
193  set aborted(_) {}
194
195  write(chunk, encoding, cb) {
196    if (this[ABORTED]) return false
197    if (this[EOF]) throw new Error('write after end')
198
199    if (this[DESTROYED]) {
200      this.emit(
201        'error',
202        Object.assign(
203          new Error('Cannot call write after a stream was destroyed'),
204          { code: 'ERR_STREAM_DESTROYED' }
205        )
206      )
207      return true
208    }
209
210    if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8')
211
212    if (!encoding) encoding = 'utf8'
213
214    const fn = this[ASYNC] ? defer : f => f()
215
216    // convert array buffers and typed array views into buffers
217    // at some point in the future, we may want to do the opposite!
218    // leave strings and buffers as-is
219    // anything else switches us into object mode
220    if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
221      if (isArrayBufferView(chunk))
222        chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
223      else if (isArrayBuffer(chunk)) chunk = Buffer.from(chunk)
224      else if (typeof chunk !== 'string')
225        // use the setter so we throw if we have encoding set
226        this.objectMode = true
227    }
228
229    // handle object mode up front, since it's simpler
230    // this yields better performance, fewer checks later.
231    if (this[OBJECTMODE]) {
232      /* istanbul ignore if - maybe impossible? */
233      if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true)
234
235      if (this.flowing) this.emit('data', chunk)
236      else this[BUFFERPUSH](chunk)
237
238      if (this[BUFFERLENGTH] !== 0) this.emit('readable')
239
240      if (cb) fn(cb)
241
242      return this.flowing
243    }
244
245    // at this point the chunk is a buffer or string
246    // don't buffer it up or send it to the decoder
247    if (!chunk.length) {
248      if (this[BUFFERLENGTH] !== 0) this.emit('readable')
249      if (cb) fn(cb)
250      return this.flowing
251    }
252
253    // fast-path writing strings of same encoding to a stream with
254    // an empty buffer, skipping the buffer/decoder dance
255    if (
256      typeof chunk === 'string' &&
257      // unless it is a string already ready for us to use
258      !(encoding === this[ENCODING] && !this[DECODER].lastNeed)
259    ) {
260      chunk = Buffer.from(chunk, encoding)
261    }
262
263    if (Buffer.isBuffer(chunk) && this[ENCODING])
264      chunk = this[DECODER].write(chunk)
265
266    // Note: flushing CAN potentially switch us into not-flowing mode
267    if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true)
268
269    if (this.flowing) this.emit('data', chunk)
270    else this[BUFFERPUSH](chunk)
271
272    if (this[BUFFERLENGTH] !== 0) this.emit('readable')
273
274    if (cb) fn(cb)
275
276    return this.flowing
277  }
278
279  read(n) {
280    if (this[DESTROYED]) return null
281
282    if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) {
283      this[MAYBE_EMIT_END]()
284      return null
285    }
286
287    if (this[OBJECTMODE]) n = null
288
289    if (this[BUFFER].length > 1 && !this[OBJECTMODE]) {
290      if (this.encoding) this[BUFFER] = [this[BUFFER].join('')]
291      else this[BUFFER] = [Buffer.concat(this[BUFFER], this[BUFFERLENGTH])]
292    }
293
294    const ret = this[READ](n || null, this[BUFFER][0])
295    this[MAYBE_EMIT_END]()
296    return ret
297  }
298
299  [READ](n, chunk) {
300    if (n === chunk.length || n === null) this[BUFFERSHIFT]()
301    else {
302      this[BUFFER][0] = chunk.slice(n)
303      chunk = chunk.slice(0, n)
304      this[BUFFERLENGTH] -= n
305    }
306
307    this.emit('data', chunk)
308
309    if (!this[BUFFER].length && !this[EOF]) this.emit('drain')
310
311    return chunk
312  }
313
314  end(chunk, encoding, cb) {
315    if (typeof chunk === 'function') (cb = chunk), (chunk = null)
316    if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8')
317    if (chunk) this.write(chunk, encoding)
318    if (cb) this.once('end', cb)
319    this[EOF] = true
320    this.writable = false
321
322    // if we haven't written anything, then go ahead and emit,
323    // even if we're not reading.
324    // we'll re-emit if a new 'end' listener is added anyway.
325    // This makes MP more suitable to write-only use cases.
326    if (this.flowing || !this[PAUSED]) this[MAYBE_EMIT_END]()
327    return this
328  }
329
330  // don't let the internal resume be overwritten
331  [RESUME]() {
332    if (this[DESTROYED]) return
333
334    this[PAUSED] = false
335    this[FLOWING] = true
336    this.emit('resume')
337    if (this[BUFFER].length) this[FLUSH]()
338    else if (this[EOF]) this[MAYBE_EMIT_END]()
339    else this.emit('drain')
340  }
341
342  resume() {
343    return this[RESUME]()
344  }
345
346  pause() {
347    this[FLOWING] = false
348    this[PAUSED] = true
349  }
350
351  get destroyed() {
352    return this[DESTROYED]
353  }
354
355  get flowing() {
356    return this[FLOWING]
357  }
358
359  get paused() {
360    return this[PAUSED]
361  }
362
363  [BUFFERPUSH](chunk) {
364    if (this[OBJECTMODE]) this[BUFFERLENGTH] += 1
365    else this[BUFFERLENGTH] += chunk.length
366    this[BUFFER].push(chunk)
367  }
368
369  [BUFFERSHIFT]() {
370    if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1
371    else this[BUFFERLENGTH] -= this[BUFFER][0].length
372    return this[BUFFER].shift()
373  }
374
375  [FLUSH](noDrain) {
376    do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) && this[BUFFER].length)
377
378    if (!noDrain && !this[BUFFER].length && !this[EOF]) this.emit('drain')
379  }
380
381  [FLUSHCHUNK](chunk) {
382    this.emit('data', chunk)
383    return this.flowing
384  }
385
386  pipe(dest, opts) {
387    if (this[DESTROYED]) return
388
389    const ended = this[EMITTED_END]
390    opts = opts || {}
391    if (dest === proc.stdout || dest === proc.stderr) opts.end = false
392    else opts.end = opts.end !== false
393    opts.proxyErrors = !!opts.proxyErrors
394
395    // piping an ended stream ends immediately
396    if (ended) {
397      if (opts.end) dest.end()
398    } else {
399      this[PIPES].push(
400        !opts.proxyErrors
401          ? new Pipe(this, dest, opts)
402          : new PipeProxyErrors(this, dest, opts)
403      )
404      if (this[ASYNC]) defer(() => this[RESUME]())
405      else this[RESUME]()
406    }
407
408    return dest
409  }
410
411  unpipe(dest) {
412    const p = this[PIPES].find(p => p.dest === dest)
413    if (p) {
414      this[PIPES].splice(this[PIPES].indexOf(p), 1)
415      p.unpipe()
416    }
417  }
418
419  addListener(ev, fn) {
420    return this.on(ev, fn)
421  }
422
423  on(ev, fn) {
424    const ret = super.on(ev, fn)
425    if (ev === 'data' && !this[PIPES].length && !this.flowing) this[RESUME]()
426    else if (ev === 'readable' && this[BUFFERLENGTH] !== 0)
427      super.emit('readable')
428    else if (isEndish(ev) && this[EMITTED_END]) {
429      super.emit(ev)
430      this.removeAllListeners(ev)
431    } else if (ev === 'error' && this[EMITTED_ERROR]) {
432      if (this[ASYNC]) defer(() => fn.call(this, this[EMITTED_ERROR]))
433      else fn.call(this, this[EMITTED_ERROR])
434    }
435    return ret
436  }
437
438  get emittedEnd() {
439    return this[EMITTED_END]
440  }
441
442  [MAYBE_EMIT_END]() {
443    if (
444      !this[EMITTING_END] &&
445      !this[EMITTED_END] &&
446      !this[DESTROYED] &&
447      this[BUFFER].length === 0 &&
448      this[EOF]
449    ) {
450      this[EMITTING_END] = true
451      this.emit('end')
452      this.emit('prefinish')
453      this.emit('finish')
454      if (this[CLOSED]) this.emit('close')
455      this[EMITTING_END] = false
456    }
457  }
458
459  emit(ev, data, ...extra) {
460    // error and close are only events allowed after calling destroy()
461    if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
462      return
463    else if (ev === 'data') {
464      return !this[OBJECTMODE] && !data
465        ? false
466        : this[ASYNC]
467        ? defer(() => this[EMITDATA](data))
468        : this[EMITDATA](data)
469    } else if (ev === 'end') {
470      return this[EMITEND]()
471    } else if (ev === 'close') {
472      this[CLOSED] = true
473      // don't emit close before 'end' and 'finish'
474      if (!this[EMITTED_END] && !this[DESTROYED]) return
475      const ret = super.emit('close')
476      this.removeAllListeners('close')
477      return ret
478    } else if (ev === 'error') {
479      this[EMITTED_ERROR] = data
480      super.emit(ERROR, data)
481      const ret =
482        !this[SIGNAL] || this.listeners('error').length
483          ? super.emit('error', data)
484          : false
485      this[MAYBE_EMIT_END]()
486      return ret
487    } else if (ev === 'resume') {
488      const ret = super.emit('resume')
489      this[MAYBE_EMIT_END]()
490      return ret
491    } else if (ev === 'finish' || ev === 'prefinish') {
492      const ret = super.emit(ev)
493      this.removeAllListeners(ev)
494      return ret
495    }
496
497    // Some other unknown event
498    const ret = super.emit(ev, data, ...extra)
499    this[MAYBE_EMIT_END]()
500    return ret
501  }
502
503  [EMITDATA](data) {
504    for (const p of this[PIPES]) {
505      if (p.dest.write(data) === false) this.pause()
506    }
507    const ret = super.emit('data', data)
508    this[MAYBE_EMIT_END]()
509    return ret
510  }
511
512  [EMITEND]() {
513    if (this[EMITTED_END]) return
514
515    this[EMITTED_END] = true
516    this.readable = false
517    if (this[ASYNC]) defer(() => this[EMITEND2]())
518    else this[EMITEND2]()
519  }
520
521  [EMITEND2]() {
522    if (this[DECODER]) {
523      const data = this[DECODER].end()
524      if (data) {
525        for (const p of this[PIPES]) {
526          p.dest.write(data)
527        }
528        super.emit('data', data)
529      }
530    }
531
532    for (const p of this[PIPES]) {
533      p.end()
534    }
535    const ret = super.emit('end')
536    this.removeAllListeners('end')
537    return ret
538  }
539
540  // const all = await stream.collect()
541  collect() {
542    const buf = []
543    if (!this[OBJECTMODE]) buf.dataLength = 0
544    // set the promise first, in case an error is raised
545    // by triggering the flow here.
546    const p = this.promise()
547    this.on('data', c => {
548      buf.push(c)
549      if (!this[OBJECTMODE]) buf.dataLength += c.length
550    })
551    return p.then(() => buf)
552  }
553
554  // const data = await stream.concat()
555  concat() {
556    return this[OBJECTMODE]
557      ? Promise.reject(new Error('cannot concat in objectMode'))
558      : this.collect().then(buf =>
559          this[OBJECTMODE]
560            ? Promise.reject(new Error('cannot concat in objectMode'))
561            : this[ENCODING]
562            ? buf.join('')
563            : Buffer.concat(buf, buf.dataLength)
564        )
565  }
566
567  // stream.promise().then(() => done, er => emitted error)
568  promise() {
569    return new Promise((resolve, reject) => {
570      this.on(DESTROYED, () => reject(new Error('stream destroyed')))
571      this.on('error', er => reject(er))
572      this.on('end', () => resolve())
573    })
574  }
575
576  // for await (let chunk of stream)
577  [ASYNCITERATOR]() {
578    let stopped = false
579    const stop = () => {
580      this.pause()
581      stopped = true
582      return Promise.resolve({ done: true })
583    }
584    const next = () => {
585      if (stopped) return stop()
586      const res = this.read()
587      if (res !== null) return Promise.resolve({ done: false, value: res })
588
589      if (this[EOF]) return stop()
590
591      let resolve = null
592      let reject = null
593      const onerr = er => {
594        this.removeListener('data', ondata)
595        this.removeListener('end', onend)
596        this.removeListener(DESTROYED, ondestroy)
597        stop()
598        reject(er)
599      }
600      const ondata = value => {
601        this.removeListener('error', onerr)
602        this.removeListener('end', onend)
603        this.removeListener(DESTROYED, ondestroy)
604        this.pause()
605        resolve({ value: value, done: !!this[EOF] })
606      }
607      const onend = () => {
608        this.removeListener('error', onerr)
609        this.removeListener('data', ondata)
610        this.removeListener(DESTROYED, ondestroy)
611        stop()
612        resolve({ done: true })
613      }
614      const ondestroy = () => onerr(new Error('stream destroyed'))
615      return new Promise((res, rej) => {
616        reject = rej
617        resolve = res
618        this.once(DESTROYED, ondestroy)
619        this.once('error', onerr)
620        this.once('end', onend)
621        this.once('data', ondata)
622      })
623    }
624
625    return {
626      next,
627      throw: stop,
628      return: stop,
629      [ASYNCITERATOR]() {
630        return this
631      },
632    }
633  }
634
635  // for (let chunk of stream)
636  [ITERATOR]() {
637    let stopped = false
638    const stop = () => {
639      this.pause()
640      this.removeListener(ERROR, stop)
641      this.removeListener(DESTROYED, stop)
642      this.removeListener('end', stop)
643      stopped = true
644      return { done: true }
645    }
646
647    const next = () => {
648      if (stopped) return stop()
649      const value = this.read()
650      return value === null ? stop() : { value }
651    }
652    this.once('end', stop)
653    this.once(ERROR, stop)
654    this.once(DESTROYED, stop)
655
656    return {
657      next,
658      throw: stop,
659      return: stop,
660      [ITERATOR]() {
661        return this
662      },
663    }
664  }
665
666  destroy(er) {
667    if (this[DESTROYED]) {
668      if (er) this.emit('error', er)
669      else this.emit(DESTROYED)
670      return this
671    }
672
673    this[DESTROYED] = true
674
675    // throw away all buffered data, it's never coming out
676    this[BUFFER].length = 0
677    this[BUFFERLENGTH] = 0
678
679    if (typeof this.close === 'function' && !this[CLOSED]) this.close()
680
681    if (er) this.emit('error', er)
682    // if no error to emit, still reject pending promises
683    else this.emit(DESTROYED)
684
685    return this
686  }
687
688  static isStream(s) {
689    return (
690      !!s &&
691      (s instanceof Minipass ||
692        s instanceof Stream ||
693        (s instanceof EE &&
694          // readable
695          (typeof s.pipe === 'function' ||
696            // writable
697            (typeof s.write === 'function' && typeof s.end === 'function'))))
698    )
699  }
700}
701
702exports.Minipass = Minipass
703