• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Use the real `process` global when there is one. Otherwise fall back to a
// stand-in whose stdout/stderr are null, so the identity checks made against
// `proc.stdout` / `proc.stderr` elsewhere in this file simply never match.
const proc = (typeof process !== 'object' || !process)
    ? { stdout: null, stderr: null }
    : process;
7import { EventEmitter } from 'events';
8import Stream from 'stream';
9import { StringDecoder } from 'string_decoder';
/**
 * Report whether `s` is something Minipass can interoperate with: a
 * Minipass instance, a Node core Stream, or any object accepted by
 * {@link isReadable} or {@link isWritable}.
 */
export const isStream = (s) => {
    // primitives, null and undefined are never streams
    if (!s || typeof s !== 'object') {
        return false;
    }
    return (s instanceof Minipass ||
        s instanceof Stream ||
        isReadable(s) ||
        isWritable(s));
};
/**
 * Report whether `s` is a valid {@link Minipass.Readable}: an EventEmitter
 * object with a usable `pipe()` method.
 */
export const isReadable = (s) => {
    if (!s || typeof s !== 'object' || !(s instanceof EventEmitter)) {
        return false;
    }
    const { pipe } = s;
    // node core Writable streams have a pipe() method, but it throws,
    // so that particular implementation does not count
    return typeof pipe === 'function' &&
        pipe !== Stream.Writable.prototype.pipe;
};
/**
 * Report whether `s` is a valid {@link Minipass.Writable}: an EventEmitter
 * object exposing both `write()` and `end()` methods.
 */
export const isWritable = (s) => {
    if (!s || typeof s !== 'object' || !(s instanceof EventEmitter)) {
        return false;
    }
    return typeof s.write === 'function' && typeof s.end === 'function';
};
// Private symbol keys. Using symbols keeps the stream's internals from
// colliding with (or being overwritten through) ordinary string properties.

// --- keys of per-instance state ---
const EOF = Symbol('EOF');
const EMITTED_END = Symbol('emittedEnd');
const EMITTING_END = Symbol('emittingEnd');
const EMITTED_ERROR = Symbol('emittedError');
const CLOSED = Symbol('closed');
const ENCODING = Symbol('encoding');
const DECODER = Symbol('decoder');
const FLOWING = Symbol('flowing');
const PAUSED = Symbol('paused');
const BUFFER = Symbol('buffer');
const PIPES = Symbol('pipes');
const BUFFERLENGTH = Symbol('bufferLength');
const OBJECTMODE = Symbol('objectMode');
const ASYNC = Symbol('async');
const ABORTED = Symbol('aborted');
const SIGNAL = Symbol('signal');
const DATALISTENERS = Symbol('dataListeners');
const DISCARDED = Symbol('discarded');

// --- keys that are also used as internal event names ---
// internal event when stream is destroyed (also the key of the state flag)
const DESTROYED = Symbol('destroyed');
// internal event when stream has an error
const ERROR = Symbol('error');

// --- keys of internal methods ---
const MAYBE_EMIT_END = Symbol('maybeEmitEnd');
const READ = Symbol('read');
const FLUSH = Symbol('flush');
const FLUSHCHUNK = Symbol('flushChunk');
const RESUME = Symbol('resume');
const BUFFERPUSH = Symbol('bufferPush');
const BUFFERSHIFT = Symbol('bufferShift');
const EMITDATA = Symbol('emitData');
const EMITEND = Symbol('emitEnd');
const EMITEND2 = Symbol('emitEnd2');
const ABORT = Symbol('abort');
// Run `fn` on a later microtask; returns the promise for its result.
const defer = (fn) => {
    const settled = Promise.resolve();
    return settled.then(fn);
};
// Synchronous counterpart of `defer`: call `fn` right now and hand back
// whatever it returns.
const nodefer = (fn) => {
    return fn();
};
// True for the event names that mark the end of the stream.
const isEndish = (ev) => {
    switch (ev) {
        case 'end':
        case 'finish':
        case 'prefinish':
            return true;
        default:
            return false;
    }
};
// Truthy for real ArrayBuffers, and for objects that merely look like one:
// a constructor named 'ArrayBuffer' plus a non-negative byteLength.
// The name-based check is there for values that fail `instanceof`.
const isArrayBufferLike = (b) => {
    if (b instanceof ArrayBuffer) {
        return true;
    }
    if (!b || typeof b !== 'object') {
        return false;
    }
    const ctor = b.constructor;
    return ctor && ctor.name === 'ArrayBuffer' && b.byteLength >= 0;
};
// True for typed arrays and DataViews, but not for Buffers (which are
// views too, and are handled as-is by the caller).
const isArrayBufferView = (b) => ArrayBuffer.isView(b) && !Buffer.isBuffer(b);
/**
 * Internal record of a single `src.pipe(dest)` connection.
 *
 * Constructing one subscribes to the destination's 'drain' event so the
 * source is resumed whenever the destination is ready for more data.
 *
 * @internal
 */
class Pipe {
    src;
    dest;
    opts;
    ondrain;
    constructor(src, dest, opts) {
        this.src = src;
        this.dest = dest;
        this.opts = opts;
        // kept on the instance so unpipe() can remove this exact listener
        this.ondrain = () => src[RESUME]();
        dest.on('drain', this.ondrain);
    }
    /** Stop reacting to the destination's 'drain' events. */
    unpipe() {
        const { dest, ondrain } = this;
        dest.removeListener('drain', ondrain);
    }
    // only here for the prototype
    /* c8 ignore start */
    proxyErrors(_er) { }
    /* c8 ignore stop */
    /** Detach from the destination, ending it too when `opts.end` is set. */
    end() {
        this.unpipe();
        if (this.opts.end) {
            this.dest.end();
        }
    }
}
/**
 * Internal pipe record that additionally forwards every 'error' event of
 * the source on to the destination.
 *
 * @internal
 */
class PipeProxyErrors extends Pipe {
    constructor(src, dest, opts) {
        super(src, dest, opts);
        // instance-level handler, shadowing the no-op on Pipe.prototype
        const forwardError = er => dest.emit('error', er);
        this.proxyErrors = forwardError;
        src.on('error', forwardError);
    }
    /** Stop forwarding errors, then drop the 'drain' subscription. */
    unpipe() {
        this.src.removeListener('error', this.proxyErrors);
        super.unpipe();
    }
}
// True when the constructor options ask for an objectMode stream.
const isObjectModeOptions = (o) => Boolean(o.objectMode);
// True when the constructor options name a string encoding: not
// objectMode, and an encoding that is set to something other than 'buffer'.
const isEncodingOptions = (o) => {
    if (o.objectMode) {
        return false;
    }
    const { encoding } = o;
    return Boolean(encoding) && encoding !== 'buffer';
};
129/**
130 * Main export, the Minipass class
131 *
132 * `RType` is the type of data emitted, defaults to Buffer
133 *
134 * `WType` is the type of data to be written, if RType is buffer or string,
135 * then any {@link Minipass.ContiguousData} is allowed.
136 *
137 * `Events` is the set of event handler signatures that this object
138 * will emit, see {@link Minipass.Events}
139 */
140export class Minipass extends EventEmitter {
141    [FLOWING] = false;
142    [PAUSED] = false;
143    [PIPES] = [];
144    [BUFFER] = [];
145    [OBJECTMODE];
146    [ENCODING];
147    [ASYNC];
148    [DECODER];
149    [EOF] = false;
150    [EMITTED_END] = false;
151    [EMITTING_END] = false;
152    [CLOSED] = false;
153    [EMITTED_ERROR] = null;
154    [BUFFERLENGTH] = 0;
155    [DESTROYED] = false;
156    [SIGNAL];
157    [ABORTED] = false;
158    [DATALISTENERS] = 0;
159    [DISCARDED] = false;
160    /**
161     * true if the stream can be written
162     */
163    writable = true;
164    /**
165     * true if the stream can be read
166     */
167    readable = true;
168    /**
169     * If `RType` is Buffer, then options do not need to be provided.
170     * Otherwise, an options object must be provided to specify either
171     * {@link Minipass.SharedOptions.objectMode} or
172     * {@link Minipass.SharedOptions.encoding}, as appropriate.
173     */
174    constructor(...args) {
175        const options = (args[0] ||
176            {});
177        super();
178        if (options.objectMode && typeof options.encoding === 'string') {
179            throw new TypeError('Encoding and objectMode may not be used together');
180        }
181        if (isObjectModeOptions(options)) {
182            this[OBJECTMODE] = true;
183            this[ENCODING] = null;
184        }
185        else if (isEncodingOptions(options)) {
186            this[ENCODING] = options.encoding;
187            this[OBJECTMODE] = false;
188        }
189        else {
190            this[OBJECTMODE] = false;
191            this[ENCODING] = null;
192        }
193        this[ASYNC] = !!options.async;
194        this[DECODER] = this[ENCODING]
195            ? new StringDecoder(this[ENCODING])
196            : null;
197        //@ts-ignore - private option for debugging and testing
198        if (options && options.debugExposeBuffer === true) {
199            Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] });
200        }
201        //@ts-ignore - private option for debugging and testing
202        if (options && options.debugExposePipes === true) {
203            Object.defineProperty(this, 'pipes', { get: () => this[PIPES] });
204        }
205        const { signal } = options;
206        if (signal) {
207            this[SIGNAL] = signal;
208            if (signal.aborted) {
209                this[ABORT]();
210            }
211            else {
212                signal.addEventListener('abort', () => this[ABORT]());
213            }
214        }
215    }
216    /**
217     * The amount of data stored in the buffer waiting to be read.
218     *
219     * For Buffer strings, this will be the total byte length.
220     * For string encoding streams, this will be the string character length,
221     * according to JavaScript's `string.length` logic.
222     * For objectMode streams, this is a count of the items waiting to be
223     * emitted.
224     */
225    get bufferLength() {
226        return this[BUFFERLENGTH];
227    }
228    /**
229     * The `BufferEncoding` currently in use, or `null`
230     */
231    get encoding() {
232        return this[ENCODING];
233    }
234    /**
235     * @deprecated - This is a read only property
236     */
237    set encoding(_enc) {
238        throw new Error('Encoding must be set at instantiation time');
239    }
240    /**
241     * @deprecated - Encoding may only be set at instantiation time
242     */
243    setEncoding(_enc) {
244        throw new Error('Encoding must be set at instantiation time');
245    }
246    /**
247     * True if this is an objectMode stream
248     */
249    get objectMode() {
250        return this[OBJECTMODE];
251    }
252    /**
253     * @deprecated - This is a read-only property
254     */
255    set objectMode(_om) {
256        throw new Error('objectMode must be set at instantiation time');
257    }
258    /**
259     * true if this is an async stream
260     */
261    get ['async']() {
262        return this[ASYNC];
263    }
264    /**
265     * Set to true to make this stream async.
266     *
267     * Once set, it cannot be unset, as this would potentially cause incorrect
268     * behavior.  Ie, a sync stream can be made async, but an async stream
269     * cannot be safely made sync.
270     */
271    set ['async'](a) {
272        this[ASYNC] = this[ASYNC] || !!a;
273    }
274    // drop everything and get out of the flow completely
275    [ABORT]() {
276        this[ABORTED] = true;
277        this.emit('abort', this[SIGNAL]?.reason);
278        this.destroy(this[SIGNAL]?.reason);
279    }
280    /**
281     * True if the stream has been aborted.
282     */
283    get aborted() {
284        return this[ABORTED];
285    }
286    /**
287     * No-op setter. Stream aborted status is set via the AbortSignal provided
288     * in the constructor options.
289     */
290    set aborted(_) { }
291    write(chunk, encoding, cb) {
292        if (this[ABORTED])
293            return false;
294        if (this[EOF])
295            throw new Error('write after end');
296        if (this[DESTROYED]) {
297            this.emit('error', Object.assign(new Error('Cannot call write after a stream was destroyed'), { code: 'ERR_STREAM_DESTROYED' }));
298            return true;
299        }
300        if (typeof encoding === 'function') {
301            cb = encoding;
302            encoding = 'utf8';
303        }
304        if (!encoding)
305            encoding = 'utf8';
306        const fn = this[ASYNC] ? defer : nodefer;
307        // convert array buffers and typed array views into buffers
308        // at some point in the future, we may want to do the opposite!
309        // leave strings and buffers as-is
310        // anything is only allowed if in object mode, so throw
311        if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
312            if (isArrayBufferView(chunk)) {
313                //@ts-ignore - sinful unsafe type changing
314                chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
315            }
316            else if (isArrayBufferLike(chunk)) {
317                //@ts-ignore - sinful unsafe type changing
318                chunk = Buffer.from(chunk);
319            }
320            else if (typeof chunk !== 'string') {
321                throw new Error('Non-contiguous data written to non-objectMode stream');
322            }
323        }
324        // handle object mode up front, since it's simpler
325        // this yields better performance, fewer checks later.
326        if (this[OBJECTMODE]) {
327            // maybe impossible?
328            /* c8 ignore start */
329            if (this[FLOWING] && this[BUFFERLENGTH] !== 0)
330                this[FLUSH](true);
331            /* c8 ignore stop */
332            if (this[FLOWING])
333                this.emit('data', chunk);
334            else
335                this[BUFFERPUSH](chunk);
336            if (this[BUFFERLENGTH] !== 0)
337                this.emit('readable');
338            if (cb)
339                fn(cb);
340            return this[FLOWING];
341        }
342        // at this point the chunk is a buffer or string
343        // don't buffer it up or send it to the decoder
344        if (!chunk.length) {
345            if (this[BUFFERLENGTH] !== 0)
346                this.emit('readable');
347            if (cb)
348                fn(cb);
349            return this[FLOWING];
350        }
351        // fast-path writing strings of same encoding to a stream with
352        // an empty buffer, skipping the buffer/decoder dance
353        if (typeof chunk === 'string' &&
354            // unless it is a string already ready for us to use
355            !(encoding === this[ENCODING] && !this[DECODER]?.lastNeed)) {
356            //@ts-ignore - sinful unsafe type change
357            chunk = Buffer.from(chunk, encoding);
358        }
359        if (Buffer.isBuffer(chunk) && this[ENCODING]) {
360            //@ts-ignore - sinful unsafe type change
361            chunk = this[DECODER].write(chunk);
362        }
363        // Note: flushing CAN potentially switch us into not-flowing mode
364        if (this[FLOWING] && this[BUFFERLENGTH] !== 0)
365            this[FLUSH](true);
366        if (this[FLOWING])
367            this.emit('data', chunk);
368        else
369            this[BUFFERPUSH](chunk);
370        if (this[BUFFERLENGTH] !== 0)
371            this.emit('readable');
372        if (cb)
373            fn(cb);
374        return this[FLOWING];
375    }
376    /**
377     * Low-level explicit read method.
378     *
379     * In objectMode, the argument is ignored, and one item is returned if
380     * available.
381     *
382     * `n` is the number of bytes (or in the case of encoding streams,
383     * characters) to consume. If `n` is not provided, then the entire buffer
384     * is returned, or `null` is returned if no data is available.
385     *
386     * If `n` is greater that the amount of data in the internal buffer,
387     * then `null` is returned.
388     */
389    read(n) {
390        if (this[DESTROYED])
391            return null;
392        this[DISCARDED] = false;
393        if (this[BUFFERLENGTH] === 0 ||
394            n === 0 ||
395            (n && n > this[BUFFERLENGTH])) {
396            this[MAYBE_EMIT_END]();
397            return null;
398        }
399        if (this[OBJECTMODE])
400            n = null;
401        if (this[BUFFER].length > 1 && !this[OBJECTMODE]) {
402            // not object mode, so if we have an encoding, then RType is string
403            // otherwise, must be Buffer
404            this[BUFFER] = [
405                (this[ENCODING]
406                    ? this[BUFFER].join('')
407                    : Buffer.concat(this[BUFFER], this[BUFFERLENGTH])),
408            ];
409        }
410        const ret = this[READ](n || null, this[BUFFER][0]);
411        this[MAYBE_EMIT_END]();
412        return ret;
413    }
414    [READ](n, chunk) {
415        if (this[OBJECTMODE])
416            this[BUFFERSHIFT]();
417        else {
418            const c = chunk;
419            if (n === c.length || n === null)
420                this[BUFFERSHIFT]();
421            else if (typeof c === 'string') {
422                this[BUFFER][0] = c.slice(n);
423                chunk = c.slice(0, n);
424                this[BUFFERLENGTH] -= n;
425            }
426            else {
427                this[BUFFER][0] = c.subarray(n);
428                chunk = c.subarray(0, n);
429                this[BUFFERLENGTH] -= n;
430            }
431        }
432        this.emit('data', chunk);
433        if (!this[BUFFER].length && !this[EOF])
434            this.emit('drain');
435        return chunk;
436    }
437    end(chunk, encoding, cb) {
438        if (typeof chunk === 'function') {
439            cb = chunk;
440            chunk = undefined;
441        }
442        if (typeof encoding === 'function') {
443            cb = encoding;
444            encoding = 'utf8';
445        }
446        if (chunk !== undefined)
447            this.write(chunk, encoding);
448        if (cb)
449            this.once('end', cb);
450        this[EOF] = true;
451        this.writable = false;
452        // if we haven't written anything, then go ahead and emit,
453        // even if we're not reading.
454        // we'll re-emit if a new 'end' listener is added anyway.
455        // This makes MP more suitable to write-only use cases.
456        if (this[FLOWING] || !this[PAUSED])
457            this[MAYBE_EMIT_END]();
458        return this;
459    }
460    // don't let the internal resume be overwritten
461    [RESUME]() {
462        if (this[DESTROYED])
463            return;
464        if (!this[DATALISTENERS] && !this[PIPES].length) {
465            this[DISCARDED] = true;
466        }
467        this[PAUSED] = false;
468        this[FLOWING] = true;
469        this.emit('resume');
470        if (this[BUFFER].length)
471            this[FLUSH]();
472        else if (this[EOF])
473            this[MAYBE_EMIT_END]();
474        else
475            this.emit('drain');
476    }
477    /**
478     * Resume the stream if it is currently in a paused state
479     *
480     * If called when there are no pipe destinations or `data` event listeners,
481     * this will place the stream in a "discarded" state, where all data will
482     * be thrown away. The discarded state is removed if a pipe destination or
483     * data handler is added, if pause() is called, or if any synchronous or
484     * asynchronous iteration is started.
485     */
486    resume() {
487        return this[RESUME]();
488    }
489    /**
490     * Pause the stream
491     */
492    pause() {
493        this[FLOWING] = false;
494        this[PAUSED] = true;
495        this[DISCARDED] = false;
496    }
497    /**
498     * true if the stream has been forcibly destroyed
499     */
500    get destroyed() {
501        return this[DESTROYED];
502    }
503    /**
504     * true if the stream is currently in a flowing state, meaning that
505     * any writes will be immediately emitted.
506     */
507    get flowing() {
508        return this[FLOWING];
509    }
510    /**
511     * true if the stream is currently in a paused state
512     */
513    get paused() {
514        return this[PAUSED];
515    }
516    [BUFFERPUSH](chunk) {
517        if (this[OBJECTMODE])
518            this[BUFFERLENGTH] += 1;
519        else
520            this[BUFFERLENGTH] += chunk.length;
521        this[BUFFER].push(chunk);
522    }
523    [BUFFERSHIFT]() {
524        if (this[OBJECTMODE])
525            this[BUFFERLENGTH] -= 1;
526        else
527            this[BUFFERLENGTH] -= this[BUFFER][0].length;
528        return this[BUFFER].shift();
529    }
530    [FLUSH](noDrain = false) {
531        do { } while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) &&
532            this[BUFFER].length);
533        if (!noDrain && !this[BUFFER].length && !this[EOF])
534            this.emit('drain');
535    }
536    [FLUSHCHUNK](chunk) {
537        this.emit('data', chunk);
538        return this[FLOWING];
539    }
540    /**
541     * Pipe all data emitted by this stream into the destination provided.
542     *
543     * Triggers the flow of data.
544     */
545    pipe(dest, opts) {
546        if (this[DESTROYED])
547            return dest;
548        this[DISCARDED] = false;
549        const ended = this[EMITTED_END];
550        opts = opts || {};
551        if (dest === proc.stdout || dest === proc.stderr)
552            opts.end = false;
553        else
554            opts.end = opts.end !== false;
555        opts.proxyErrors = !!opts.proxyErrors;
556        // piping an ended stream ends immediately
557        if (ended) {
558            if (opts.end)
559                dest.end();
560        }
561        else {
562            // "as" here just ignores the WType, which pipes don't care about,
563            // since they're only consuming from us, and writing to the dest
564            this[PIPES].push(!opts.proxyErrors
565                ? new Pipe(this, dest, opts)
566                : new PipeProxyErrors(this, dest, opts));
567            if (this[ASYNC])
568                defer(() => this[RESUME]());
569            else
570                this[RESUME]();
571        }
572        return dest;
573    }
574    /**
575     * Fully unhook a piped destination stream.
576     *
577     * If the destination stream was the only consumer of this stream (ie,
578     * there are no other piped destinations or `'data'` event listeners)
579     * then the flow of data will stop until there is another consumer or
580     * {@link Minipass#resume} is explicitly called.
581     */
582    unpipe(dest) {
583        const p = this[PIPES].find(p => p.dest === dest);
584        if (p) {
585            if (this[PIPES].length === 1) {
586                if (this[FLOWING] && this[DATALISTENERS] === 0) {
587                    this[FLOWING] = false;
588                }
589                this[PIPES] = [];
590            }
591            else
592                this[PIPES].splice(this[PIPES].indexOf(p), 1);
593            p.unpipe();
594        }
595    }
596    /**
597     * Alias for {@link Minipass#on}
598     */
599    addListener(ev, handler) {
600        return this.on(ev, handler);
601    }
602    /**
603     * Mostly identical to `EventEmitter.on`, with the following
604     * behavior differences to prevent data loss and unnecessary hangs:
605     *
606     * - Adding a 'data' event handler will trigger the flow of data
607     *
608     * - Adding a 'readable' event handler when there is data waiting to be read
609     *   will cause 'readable' to be emitted immediately.
610     *
611     * - Adding an 'endish' event handler ('end', 'finish', etc.) which has
612     *   already passed will cause the event to be emitted immediately and all
613     *   handlers removed.
614     *
615     * - Adding an 'error' event handler after an error has been emitted will
616     *   cause the event to be re-emitted immediately with the error previously
617     *   raised.
618     */
619    on(ev, handler) {
620        const ret = super.on(ev, handler);
621        if (ev === 'data') {
622            this[DISCARDED] = false;
623            this[DATALISTENERS]++;
624            if (!this[PIPES].length && !this[FLOWING]) {
625                this[RESUME]();
626            }
627        }
628        else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) {
629            super.emit('readable');
630        }
631        else if (isEndish(ev) && this[EMITTED_END]) {
632            super.emit(ev);
633            this.removeAllListeners(ev);
634        }
635        else if (ev === 'error' && this[EMITTED_ERROR]) {
636            const h = handler;
637            if (this[ASYNC])
638                defer(() => h.call(this, this[EMITTED_ERROR]));
639            else
640                h.call(this, this[EMITTED_ERROR]);
641        }
642        return ret;
643    }
644    /**
645     * Alias for {@link Minipass#off}
646     */
647    removeListener(ev, handler) {
648        return this.off(ev, handler);
649    }
650    /**
651     * Mostly identical to `EventEmitter.off`
652     *
653     * If a 'data' event handler is removed, and it was the last consumer
654     * (ie, there are no pipe destinations or other 'data' event listeners),
655     * then the flow of data will stop until there is another consumer or
656     * {@link Minipass#resume} is explicitly called.
657     */
658    off(ev, handler) {
659        const ret = super.off(ev, handler);
660        // if we previously had listeners, and now we don't, and we don't
661        // have any pipes, then stop the flow, unless it's been explicitly
662        // put in a discarded flowing state via stream.resume().
663        if (ev === 'data') {
664            this[DATALISTENERS] = this.listeners('data').length;
665            if (this[DATALISTENERS] === 0 &&
666                !this[DISCARDED] &&
667                !this[PIPES].length) {
668                this[FLOWING] = false;
669            }
670        }
671        return ret;
672    }
673    /**
674     * Mostly identical to `EventEmitter.removeAllListeners`
675     *
676     * If all 'data' event handlers are removed, and they were the last consumer
677     * (ie, there are no pipe destinations), then the flow of data will stop
678     * until there is another consumer or {@link Minipass#resume} is explicitly
679     * called.
680     */
681    removeAllListeners(ev) {
682        const ret = super.removeAllListeners(ev);
683        if (ev === 'data' || ev === undefined) {
684            this[DATALISTENERS] = 0;
685            if (!this[DISCARDED] && !this[PIPES].length) {
686                this[FLOWING] = false;
687            }
688        }
689        return ret;
690    }
691    /**
692     * true if the 'end' event has been emitted
693     */
694    get emittedEnd() {
695        return this[EMITTED_END];
696    }
697    [MAYBE_EMIT_END]() {
698        if (!this[EMITTING_END] &&
699            !this[EMITTED_END] &&
700            !this[DESTROYED] &&
701            this[BUFFER].length === 0 &&
702            this[EOF]) {
703            this[EMITTING_END] = true;
704            this.emit('end');
705            this.emit('prefinish');
706            this.emit('finish');
707            if (this[CLOSED])
708                this.emit('close');
709            this[EMITTING_END] = false;
710        }
711    }
712    /**
713     * Mostly identical to `EventEmitter.emit`, with the following
714     * behavior differences to prevent data loss and unnecessary hangs:
715     *
716     * If the stream has been destroyed, and the event is something other
717     * than 'close' or 'error', then `false` is returned and no handlers
718     * are called.
719     *
720     * If the event is 'end', and has already been emitted, then the event
721     * is ignored. If the stream is in a paused or non-flowing state, then
722     * the event will be deferred until data flow resumes. If the stream is
723     * async, then handlers will be called on the next tick rather than
724     * immediately.
725     *
726     * If the event is 'close', and 'end' has not yet been emitted, then
727     * the event will be deferred until after 'end' is emitted.
728     *
729     * If the event is 'error', and an AbortSignal was provided for the stream,
730     * and there are no listeners, then the event is ignored, matching the
731     * behavior of node core streams in the presense of an AbortSignal.
732     *
733     * If the event is 'finish' or 'prefinish', then all listeners will be
734     * removed after emitting the event, to prevent double-firing.
735     */
736    emit(ev, ...args) {
737        const data = args[0];
738        // error and close are only events allowed after calling destroy()
739        if (ev !== 'error' &&
740            ev !== 'close' &&
741            ev !== DESTROYED &&
742            this[DESTROYED]) {
743            return false;
744        }
745        else if (ev === 'data') {
746            return !this[OBJECTMODE] && !data
747                ? false
748                : this[ASYNC]
749                    ? (defer(() => this[EMITDATA](data)), true)
750                    : this[EMITDATA](data);
751        }
752        else if (ev === 'end') {
753            return this[EMITEND]();
754        }
755        else if (ev === 'close') {
756            this[CLOSED] = true;
757            // don't emit close before 'end' and 'finish'
758            if (!this[EMITTED_END] && !this[DESTROYED])
759                return false;
760            const ret = super.emit('close');
761            this.removeAllListeners('close');
762            return ret;
763        }
764        else if (ev === 'error') {
765            this[EMITTED_ERROR] = data;
766            super.emit(ERROR, data);
767            const ret = !this[SIGNAL] || this.listeners('error').length
768                ? super.emit('error', data)
769                : false;
770            this[MAYBE_EMIT_END]();
771            return ret;
772        }
773        else if (ev === 'resume') {
774            const ret = super.emit('resume');
775            this[MAYBE_EMIT_END]();
776            return ret;
777        }
778        else if (ev === 'finish' || ev === 'prefinish') {
779            const ret = super.emit(ev);
780            this.removeAllListeners(ev);
781            return ret;
782        }
783        // Some other unknown event
784        const ret = super.emit(ev, ...args);
785        this[MAYBE_EMIT_END]();
786        return ret;
787    }
788    [EMITDATA](data) {
789        for (const p of this[PIPES]) {
790            if (p.dest.write(data) === false)
791                this.pause();
792        }
793        const ret = this[DISCARDED] ? false : super.emit('data', data);
794        this[MAYBE_EMIT_END]();
795        return ret;
796    }
797    [EMITEND]() {
798        if (this[EMITTED_END])
799            return false;
800        this[EMITTED_END] = true;
801        this.readable = false;
802        return this[ASYNC]
803            ? (defer(() => this[EMITEND2]()), true)
804            : this[EMITEND2]();
805    }
806    [EMITEND2]() {
807        if (this[DECODER]) {
808            const data = this[DECODER].end();
809            if (data) {
810                for (const p of this[PIPES]) {
811                    p.dest.write(data);
812                }
813                if (!this[DISCARDED])
814                    super.emit('data', data);
815            }
816        }
817        for (const p of this[PIPES]) {
818            p.end();
819        }
820        const ret = super.emit('end');
821        this.removeAllListeners('end');
822        return ret;
823    }
824    /**
825     * Return a Promise that resolves to an array of all emitted data once
826     * the stream ends.
827     */
828    async collect() {
829        const buf = Object.assign([], {
830            dataLength: 0,
831        });
832        if (!this[OBJECTMODE])
833            buf.dataLength = 0;
834        // set the promise first, in case an error is raised
835        // by triggering the flow here.
836        const p = this.promise();
837        this.on('data', c => {
838            buf.push(c);
839            if (!this[OBJECTMODE])
840                buf.dataLength += c.length;
841        });
842        await p;
843        return buf;
844    }
845    /**
846     * Return a Promise that resolves to the concatenation of all emitted data
847     * once the stream ends.
848     *
849     * Not allowed on objectMode streams.
850     */
851    async concat() {
852        if (this[OBJECTMODE]) {
853            throw new Error('cannot concat in objectMode');
854        }
855        const buf = await this.collect();
856        return (this[ENCODING]
857            ? buf.join('')
858            : Buffer.concat(buf, buf.dataLength));
859    }
860    /**
861     * Return a void Promise that resolves once the stream ends.
862     */
863    async promise() {
864        return new Promise((resolve, reject) => {
865            this.on(DESTROYED, () => reject(new Error('stream destroyed')));
866            this.on('error', er => reject(er));
867            this.on('end', () => resolve());
868        });
869    }
870    /**
871     * Asynchronous `for await of` iteration.
872     *
873     * This will continue emitting all chunks until the stream terminates.
874     */
875    [Symbol.asyncIterator]() {
876        // set this up front, in case the consumer doesn't call next()
877        // right away.
878        this[DISCARDED] = false;
879        let stopped = false;
880        const stop = async () => {
881            this.pause();
882            stopped = true;
883            return { value: undefined, done: true };
884        };
885        const next = () => {
886            if (stopped)
887                return stop();
888            const res = this.read();
889            if (res !== null)
890                return Promise.resolve({ done: false, value: res });
891            if (this[EOF])
892                return stop();
893            let resolve;
894            let reject;
895            const onerr = (er) => {
896                this.off('data', ondata);
897                this.off('end', onend);
898                this.off(DESTROYED, ondestroy);
899                stop();
900                reject(er);
901            };
902            const ondata = (value) => {
903                this.off('error', onerr);
904                this.off('end', onend);
905                this.off(DESTROYED, ondestroy);
906                this.pause();
907                resolve({ value, done: !!this[EOF] });
908            };
909            const onend = () => {
910                this.off('error', onerr);
911                this.off('data', ondata);
912                this.off(DESTROYED, ondestroy);
913                stop();
914                resolve({ done: true, value: undefined });
915            };
916            const ondestroy = () => onerr(new Error('stream destroyed'));
917            return new Promise((res, rej) => {
918                reject = rej;
919                resolve = res;
920                this.once(DESTROYED, ondestroy);
921                this.once('error', onerr);
922                this.once('end', onend);
923                this.once('data', ondata);
924            });
925        };
926        return {
927            next,
928            throw: stop,
929            return: stop,
930            [Symbol.asyncIterator]() {
931                return this;
932            },
933        };
934    }
935    /**
936     * Synchronous `for of` iteration.
937     *
938     * The iteration will terminate when the internal buffer runs out, even
939     * if the stream has not yet terminated.
940     */
941    [Symbol.iterator]() {
942        // set this up front, in case the consumer doesn't call next()
943        // right away.
944        this[DISCARDED] = false;
945        let stopped = false;
946        const stop = () => {
947            this.pause();
948            this.off(ERROR, stop);
949            this.off(DESTROYED, stop);
950            this.off('end', stop);
951            stopped = true;
952            return { done: true, value: undefined };
953        };
954        const next = () => {
955            if (stopped)
956                return stop();
957            const value = this.read();
958            return value === null ? stop() : { done: false, value };
959        };
960        this.once('end', stop);
961        this.once(ERROR, stop);
962        this.once(DESTROYED, stop);
963        return {
964            next,
965            throw: stop,
966            return: stop,
967            [Symbol.iterator]() {
968                return this;
969            },
970        };
971    }
972    /**
973     * Destroy a stream, preventing it from being used for any further purpose.
974     *
975     * If the stream has a `close()` method, then it will be called on
976     * destruction.
977     *
978     * After destruction, any attempt to write data, read data, or emit most
979     * events will be ignored.
980     *
981     * If an error argument is provided, then it will be emitted in an
982     * 'error' event.
983     */
984    destroy(er) {
985        if (this[DESTROYED]) {
986            if (er)
987                this.emit('error', er);
988            else
989                this.emit(DESTROYED);
990            return this;
991        }
992        this[DESTROYED] = true;
993        this[DISCARDED] = true;
994        // throw away all buffered data, it's never coming out
995        this[BUFFER].length = 0;
996        this[BUFFERLENGTH] = 0;
997        const wc = this;
998        if (typeof wc.close === 'function' && !this[CLOSED])
999            wc.close();
1000        if (er)
1001            this.emit('error', er);
1002        // if no error to emit, still reject pending promises
1003        else
1004            this.emit(DESTROYED);
1005        return this;
1006    }
1007    /**
1008     * Alias for {@link isStream}
1009     *
1010     * Former export location, maintained for backwards compatibility.
1011     *
1012     * @deprecated
1013     */
1014    static get isStream() {
1015        return isStream;
1016    }
1017}
1018//# sourceMappingURL=index.js.map