import { EventEmitter } from 'events';
import Stream from 'stream';
import { StringDecoder } from 'string_decoder';
// `process` may be absent outside of Node. Fall back to a stub so that the
// stdout/stderr identity checks in Minipass#pipe still evaluate safely.
const proc = typeof process === 'object' && process
    ? process
    : {
        stdout: null,
        stderr: null,
    };
/**
 * Return true if the argument is a Minipass stream, Node stream, or something
 * else that Minipass can interact with.
 */
export const isStream = (s) => !!s &&
    typeof s === 'object' &&
    (s instanceof Minipass ||
        s instanceof Stream ||
        isReadable(s) ||
        isWritable(s));
/**
 * Return true if the argument is a valid {@link Minipass.Readable}
 */
export const isReadable = (s) => !!s &&
    typeof s === 'object' &&
    s instanceof EventEmitter &&
    typeof s.pipe === 'function' &&
    // node core Writable streams have a pipe() method, but it throws
    s.pipe !== Stream.Writable.prototype.pipe;
/**
 * Return true if the argument is a valid {@link Minipass.Writable}
 */
export const isWritable = (s) => !!s &&
    typeof s === 'object' &&
    s instanceof EventEmitter &&
    typeof s.write === 'function' &&
    typeof s.end === 'function';
// Symbols used as keys for internal state and internal methods, so that
// they cannot collide with (or be overwritten through) string properties.
const EOF = Symbol('EOF');
const MAYBE_EMIT_END = Symbol('maybeEmitEnd');
const EMITTED_END = Symbol('emittedEnd');
const EMITTING_END = Symbol('emittingEnd');
const EMITTED_ERROR = Symbol('emittedError');
const CLOSED = Symbol('closed');
const READ = Symbol('read');
const FLUSH = Symbol('flush');
const FLUSHCHUNK = Symbol('flushChunk');
const ENCODING = Symbol('encoding');
const DECODER = Symbol('decoder');
const FLOWING = Symbol('flowing');
const PAUSED = Symbol('paused');
const RESUME = Symbol('resume');
const BUFFER = Symbol('buffer');
const PIPES = Symbol('pipes');
const BUFFERLENGTH = Symbol('bufferLength');
const BUFFERPUSH = Symbol('bufferPush');
const BUFFERSHIFT = Symbol('bufferShift');
const OBJECTMODE = Symbol('objectMode');
// internal event when stream is destroyed
const DESTROYED = Symbol('destroyed');
// internal event when stream has an error
const ERROR = Symbol('error');
const EMITDATA = Symbol('emitData');
const EMITEND = Symbol('emitEnd');
const EMITEND2 = Symbol('emitEnd2');
const ASYNC = Symbol('async');
const ABORT = Symbol('abort');
const ABORTED = Symbol('aborted');
const SIGNAL = Symbol('signal');
const DATALISTENERS = Symbol('dataListeners');
const DISCARDED = Symbol('discarded');
// Run `fn` in a later microtask (used by async streams).
const defer = (fn) => Promise.resolve().then(fn);
// Run `fn` immediately (used by sync streams).
const nodefer = (fn) => fn();
// Events that signal the end of the stream and are re-emitted for late
// listeners by Minipass#on.
const isEndish = (ev) => ev === 'end' || ev === 'finish' || ev === 'prefinish';
// True for a real ArrayBuffer, or an object whose constructor is named
// 'ArrayBuffer' and which has a non-negative byteLength.
const isArrayBufferLike = (b) => b instanceof ArrayBuffer ||
    (!!b &&
        typeof b === 'object' &&
        b.constructor &&
        b.constructor.name === 'ArrayBuffer' &&
        b.byteLength >= 0);
// True for typed arrays / DataViews that are not already a Buffer.
const isArrayBufferView = (b) => !Buffer.isBuffer(b) && ArrayBuffer.isView(b);
/**
 * Internal class representing a pipe to a destination stream.
 *
 * Resumes the source whenever the destination emits 'drain'.
 *
 * @internal
 */
class Pipe {
    src;
    dest;
    opts;
    ondrain;
    constructor(src, dest, opts) {
        this.src = src;
        this.dest = dest;
        this.opts = opts;
        this.ondrain = () => src[RESUME]();
        this.dest.on('drain', this.ondrain);
    }
    /** Stop listening for 'drain' on the destination. */
    unpipe() {
        this.dest.removeListener('drain', this.ondrain);
    }
    // only here for the prototype
    /* c8 ignore start */
    proxyErrors(_er) { }
    /* c8 ignore stop */
    /** Detach from the destination, ending it if `opts.end` is set. */
    end() {
        this.unpipe();
        if (this.opts.end)
            this.dest.end();
    }
}
/**
 * Internal class representing a pipe to a destination stream where
 * errors are proxied.
 *
 * @internal
 */
class PipeProxyErrors extends Pipe {
    unpipe() {
        this.src.removeListener('error', this.proxyErrors);
        super.unpipe();
    }
    constructor(src, dest, opts) {
        super(src, dest, opts);
        // instance property shadows the no-op prototype method
        this.proxyErrors = er => dest.emit('error', er);
        src.on('error', this.proxyErrors);
    }
}
const isObjectModeOptions = (o) => !!o.objectMode;
const isEncodingOptions = (o) => !o.objectMode && !!o.encoding && o.encoding !== 'buffer';
/**
 * Main export, the Minipass class
 *
 * `RType` is the type of data emitted, defaults to Buffer
 *
 * `WType` is the type of data to be written, if RType is buffer or string,
 * then any {@link Minipass.ContiguousData} is allowed.
 *
 * `Events` is the set of event handler signatures that this object
 * will emit, see {@link Minipass.Events}
 */
export class Minipass extends EventEmitter {
    [FLOWING] = false;
    [PAUSED] = false;
    [PIPES] = [];
    [BUFFER] = [];
    [OBJECTMODE];
    [ENCODING];
    [ASYNC];
    [DECODER];
    [EOF] = false;
    [EMITTED_END] = false;
    [EMITTING_END] = false;
    [CLOSED] = false;
    [EMITTED_ERROR] = null;
    [BUFFERLENGTH] = 0;
    [DESTROYED] = false;
    [SIGNAL];
    [ABORTED] = false;
    [DATALISTENERS] = 0;
    [DISCARDED] = false;
    /**
     * true if the stream can be written
     */
    writable = true;
    /**
     * true if the stream can be read
     */
    readable = true;
    /**
     * If `RType` is Buffer, then options do not need to be provided.
     * Otherwise, an options object must be provided to specify either
     * {@link Minipass.SharedOptions.objectMode} or
     * {@link Minipass.SharedOptions.encoding}, as appropriate.
     *
     * @throws {TypeError} if both `objectMode` and a string `encoding`
     * are specified.
     */
    constructor(...args) {
        const options = (args[0] ||
            {});
        super();
        if (options.objectMode && typeof options.encoding === 'string') {
            throw new TypeError('Encoding and objectMode may not be used together');
        }
        if (isObjectModeOptions(options)) {
            this[OBJECTMODE] = true;
            this[ENCODING] = null;
        }
        else if (isEncodingOptions(options)) {
            this[ENCODING] = options.encoding;
            this[OBJECTMODE] = false;
        }
        else {
            this[OBJECTMODE] = false;
            this[ENCODING] = null;
        }
        this[ASYNC] = !!options.async;
        this[DECODER] = this[ENCODING]
            ? new StringDecoder(this[ENCODING])
            : null;
        //@ts-ignore - private option for debugging and testing
        if (options && options.debugExposeBuffer === true) {
            Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] });
        }
        //@ts-ignore - private option for debugging and testing
        if (options && options.debugExposePipes === true) {
            Object.defineProperty(this, 'pipes', { get: () => this[PIPES] });
        }
        const { signal } = options;
        if (signal) {
            this[SIGNAL] = signal;
            if (signal.aborted) {
                this[ABORT]();
            }
            else {
                signal.addEventListener('abort', () => this[ABORT]());
            }
        }
    }
    /**
     * The amount of data stored in the buffer waiting to be read.
     *
     * For Buffer streams, this will be the total byte length.
     * For string encoding streams, this will be the string character length,
     * according to JavaScript's `string.length` logic.
     * For objectMode streams, this is a count of the items waiting to be
     * emitted.
     */
    get bufferLength() {
        return this[BUFFERLENGTH];
    }
    /**
     * The `BufferEncoding` currently in use, or `null`
     */
    get encoding() {
        return this[ENCODING];
    }
    /**
     * @deprecated - This is a read only property
     */
    set encoding(_enc) {
        throw new Error('Encoding must be set at instantiation time');
    }
    /**
     * @deprecated - Encoding may only be set at instantiation time
     */
    setEncoding(_enc) {
        throw new Error('Encoding must be set at instantiation time');
    }
    /**
     * True if this is an objectMode stream
     */
    get objectMode() {
        return this[OBJECTMODE];
    }
    /**
     * @deprecated - This is a read-only property
     */
    set objectMode(_om) {
        throw new Error('objectMode must be set at instantiation time');
    }
    /**
     * true if this is an async stream
     */
    get ['async']() {
        return this[ASYNC];
    }
    /**
     * Set to true to make this stream async.
     *
     * Once set, it cannot be unset, as this would potentially cause incorrect
     * behavior.  Ie, a sync stream can be made async, but an async stream
     * cannot be safely made sync.
     */
    set ['async'](a) {
        this[ASYNC] = this[ASYNC] || !!a;
    }
    // drop everything and get out of the flow completely
    [ABORT]() {
        this[ABORTED] = true;
        this.emit('abort', this[SIGNAL]?.reason);
        this.destroy(this[SIGNAL]?.reason);
    }
    /**
     * True if the stream has been aborted.
     */
    get aborted() {
        return this[ABORTED];
    }
    /**
     * No-op setter. Stream aborted status is set via the AbortSignal provided
     * in the constructor options.
     */
    set aborted(_) { }
    /**
     * Write a chunk of data to the stream.
     *
     * Outside of objectMode, the chunk must be a string, Buffer, ArrayBuffer
     * or ArrayBuffer view. If the stream is flowing the chunk is emitted as a
     * 'data' event, otherwise it is buffered.
     *
     * @param chunk - the data to write
     * @param encoding - encoding of a string chunk (default 'utf8'); may be
     * omitted and the callback passed in its place
     * @param cb - called after the write (in a later microtask when the
     * stream is async)
     * @returns the flowing state of the stream after the write; `false` if
     * the stream was aborted, `true` if it was destroyed
     * @throws {Error} on write after end, or when non-contiguous data is
     * written to a non-objectMode stream
     */
    write(chunk, encoding, cb) {
        if (this[ABORTED])
            return false;
        if (this[EOF])
            throw new Error('write after end');
        if (this[DESTROYED]) {
            this.emit('error', Object.assign(new Error('Cannot call write after a stream was destroyed'), { code: 'ERR_STREAM_DESTROYED' }));
            return true;
        }
        if (typeof encoding === 'function') {
            cb = encoding;
            encoding = 'utf8';
        }
        if (!encoding)
            encoding = 'utf8';
        const fn = this[ASYNC] ? defer : nodefer;
        // convert array buffers and typed array views into buffers
        // at some point in the future, we may want to do the opposite!
        // leave strings and buffers as-is
        // anything is only allowed if in object mode, so throw
        if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
            if (isArrayBufferView(chunk)) {
                //@ts-ignore - sinful unsafe type changing
                chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
            }
            else if (isArrayBufferLike(chunk)) {
                //@ts-ignore - sinful unsafe type changing
                chunk = Buffer.from(chunk);
            }
            else if (typeof chunk !== 'string') {
                throw new Error('Non-contiguous data written to non-objectMode stream');
            }
        }
        // handle object mode up front, since it's simpler
        // this yields better performance, fewer checks later.
        if (this[OBJECTMODE]) {
            // maybe impossible?
            /* c8 ignore start */
            if (this[FLOWING] && this[BUFFERLENGTH] !== 0)
                this[FLUSH](true);
            /* c8 ignore stop */
            if (this[FLOWING])
                this.emit('data', chunk);
            else
                this[BUFFERPUSH](chunk);
            if (this[BUFFERLENGTH] !== 0)
                this.emit('readable');
            if (cb)
                fn(cb);
            return this[FLOWING];
        }
        // at this point the chunk is a buffer or string
        // don't buffer it up or send it to the decoder
        if (!chunk.length) {
            if (this[BUFFERLENGTH] !== 0)
                this.emit('readable');
            if (cb)
                fn(cb);
            return this[FLOWING];
        }
        // fast-path writing strings of same encoding to a stream with
        // an empty buffer, skipping the buffer/decoder dance
        if (typeof chunk === 'string' &&
            // unless it is a string already ready for us to use
            !(encoding === this[ENCODING] && !this[DECODER]?.lastNeed)) {
            //@ts-ignore - sinful unsafe type change
            chunk = Buffer.from(chunk, encoding);
        }
        if (Buffer.isBuffer(chunk) && this[ENCODING]) {
            //@ts-ignore - sinful unsafe type change
            chunk = this[DECODER].write(chunk);
        }
        // Note: flushing CAN potentially switch us into not-flowing mode
        if (this[FLOWING] && this[BUFFERLENGTH] !== 0)
            this[FLUSH](true);
        if (this[FLOWING])
            this.emit('data', chunk);
        else
            this[BUFFERPUSH](chunk);
        if (this[BUFFERLENGTH] !== 0)
            this.emit('readable');
        if (cb)
            fn(cb);
        return this[FLOWING];
    }
    /**
     * Low-level explicit read method.
     *
     * In objectMode, the argument is ignored, and one item is returned if
     * available.
     *
     * `n` is the number of bytes (or in the case of encoding streams,
     * characters) to consume. If `n` is not provided, then the entire buffer
     * is returned, or `null` is returned if no data is available.
     *
     * If `n` is greater than the amount of data in the internal buffer,
     * then `null` is returned.
     */
    read(n) {
        if (this[DESTROYED])
            return null;
        this[DISCARDED] = false;
        if (this[BUFFERLENGTH] === 0 ||
            n === 0 ||
            (n && n > this[BUFFERLENGTH])) {
            this[MAYBE_EMIT_END]();
            return null;
        }
        if (this[OBJECTMODE])
            n = null;
        if (this[BUFFER].length > 1 && !this[OBJECTMODE]) {
            // not object mode, so if we have an encoding, then RType is string
            // otherwise, must be Buffer
            this[BUFFER] = [
                (this[ENCODING]
                    ? this[BUFFER].join('')
                    : Buffer.concat(this[BUFFER], this[BUFFERLENGTH])),
            ];
        }
        const ret = this[READ](n || null, this[BUFFER][0]);
        this[MAYBE_EMIT_END]();
        return ret;
    }
    // Consume `n` units (or everything when `n` is null) from the head chunk,
    // emit the consumed part as 'data', and emit 'drain' once the buffer is
    // empty and the stream has not been ended.
    [READ](n, chunk) {
        if (this[OBJECTMODE])
            this[BUFFERSHIFT]();
        else {
            const c = chunk;
            if (n === c.length || n === null)
                this[BUFFERSHIFT]();
            else if (typeof c === 'string') {
                this[BUFFER][0] = c.slice(n);
                chunk = c.slice(0, n);
                this[BUFFERLENGTH] -= n;
            }
            else {
                this[BUFFER][0] = c.subarray(n);
                chunk = c.subarray(0, n);
                this[BUFFERLENGTH] -= n;
            }
        }
        this.emit('data', chunk);
        if (!this[BUFFER].length && !this[EOF])
            this.emit('drain');
        return chunk;
    }
    /**
     * End the stream, optionally writing a final chunk first.
     *
     * `chunk` and `encoding` may each be omitted, with the callback passed in
     * their place. The callback is attached as a one-time 'end' listener.
     *
     * @returns this stream
     */
    end(chunk, encoding, cb) {
        if (typeof chunk === 'function') {
            cb = chunk;
            chunk = undefined;
        }
        if (typeof encoding === 'function') {
            cb = encoding;
            encoding = 'utf8';
        }
        if (chunk !== undefined)
            this.write(chunk, encoding);
        if (cb)
            this.once('end', cb);
        this[EOF] = true;
        this.writable = false;
        // if we haven't written anything, then go ahead and emit,
        // even if we're not reading.
        // we'll re-emit if a new 'end' listener is added anyway.
        // This makes MP more suitable to write-only use cases.
        if (this[FLOWING] || !this[PAUSED])
            this[MAYBE_EMIT_END]();
        return this;
    }
    // don't let the internal resume be overwritten
    [RESUME]() {
        if (this[DESTROYED])
            return;
        if (!this[DATALISTENERS] && !this[PIPES].length) {
            this[DISCARDED] = true;
        }
        this[PAUSED] = false;
        this[FLOWING] = true;
        this.emit('resume');
        if (this[BUFFER].length)
            this[FLUSH]();
        else if (this[EOF])
            this[MAYBE_EMIT_END]();
        else
            this.emit('drain');
    }
    /**
     * Resume the stream if it is currently in a paused state
     *
     * If called when there are no pipe destinations or `data` event listeners,
     * this will place the stream in a "discarded" state, where all data will
     * be thrown away. The discarded state is removed if a pipe destination or
     * data handler is added, if pause() is called, or if any synchronous or
     * asynchronous iteration is started.
     */
    resume() {
        return this[RESUME]();
    }
    /**
     * Pause the stream
     */
    pause() {
        this[FLOWING] = false;
        this[PAUSED] = true;
        this[DISCARDED] = false;
    }
    /**
     * true if the stream has been forcibly destroyed
     */
    get destroyed() {
        return this[DESTROYED];
    }
    /**
     * true if the stream is currently in a flowing state, meaning that
     * any writes will be immediately emitted.
     */
    get flowing() {
        return this[FLOWING];
    }
    /**
     * true if the stream is currently in a paused state
     */
    get paused() {
        return this[PAUSED];
    }
    // Append a chunk to the buffer, keeping BUFFERLENGTH in sync
    // (item count in objectMode, chunk.length otherwise).
    [BUFFERPUSH](chunk) {
        if (this[OBJECTMODE])
            this[BUFFERLENGTH] += 1;
        else
            this[BUFFERLENGTH] += chunk.length;
        this[BUFFER].push(chunk);
    }
    // Remove and return the head of the buffer, keeping BUFFERLENGTH in sync.
    [BUFFERSHIFT]() {
        if (this[OBJECTMODE])
            this[BUFFERLENGTH] -= 1;
        else
            this[BUFFERLENGTH] -= this[BUFFER][0].length;
        return this[BUFFER].shift();
    }
    // Emit buffered chunks until the buffer is empty or the stream stops
    // flowing; then emit 'drain' unless suppressed or already ended.
    [FLUSH](noDrain = false) {
        do { } while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) &&
            this[BUFFER].length);
        if (!noDrain && !this[BUFFER].length && !this[EOF])
            this.emit('drain');
    }
    [FLUSHCHUNK](chunk) {
        this.emit('data', chunk);
        return this[FLOWING];
    }
    /**
     * Pipe all data emitted by this stream into the destination provided.
     *
     * Triggers the flow of data.
     *
     * The `opts` object is never modified; it is normalized into a private
     * copy, so frozen or shared option objects are safe to pass.
     *
     * @param dest - the destination stream
     * @param opts - `end` (default true; always false for process.stdout and
     * process.stderr) and `proxyErrors` (default false)
     * @returns the destination stream
     */
    pipe(dest, opts) {
        if (this[DESTROYED])
            return dest;
        this[DISCARDED] = false;
        const ended = this[EMITTED_END];
        const given = opts || {};
        const pipeOpts = {
            ...given,
            end: dest === proc.stdout || dest === proc.stderr
                ? false
                : given.end !== false,
            proxyErrors: !!given.proxyErrors,
        };
        // piping an ended stream ends immediately
        if (ended) {
            if (pipeOpts.end)
                dest.end();
        }
        else {
            // pipes only consume from us and write to the dest
            this[PIPES].push(!pipeOpts.proxyErrors
                ? new Pipe(this, dest, pipeOpts)
                : new PipeProxyErrors(this, dest, pipeOpts));
            if (this[ASYNC])
                defer(() => this[RESUME]());
            else
                this[RESUME]();
        }
        return dest;
    }
    /**
     * Fully unhook a piped destination stream.
     *
     * If the destination stream was the only consumer of this stream (ie,
     * there are no other piped destinations or `'data'` event listeners)
     * then the flow of data will stop until there is another consumer or
     * {@link Minipass#resume} is explicitly called.
     */
    unpipe(dest) {
        const p = this[PIPES].find(p => p.dest === dest);
        if (p) {
            if (this[PIPES].length === 1) {
                if (this[FLOWING] && this[DATALISTENERS] === 0) {
                    this[FLOWING] = false;
                }
                this[PIPES] = [];
            }
            else
                this[PIPES].splice(this[PIPES].indexOf(p), 1);
            p.unpipe();
        }
    }
    /**
     * Alias for {@link Minipass#on}
     */
    addListener(ev, handler) {
        return this.on(ev, handler);
    }
    /**
     * Mostly identical to `EventEmitter.on`, with the following
     * behavior differences to prevent data loss and unnecessary hangs:
     *
     * - Adding a 'data' event handler will trigger the flow of data
     *
     * - Adding a 'readable' event handler when there is data waiting to be read
     *   will cause 'readable' to be emitted immediately.
     *
     * - Adding an 'endish' event handler ('end', 'finish', etc.) which has
     *   already passed will cause the event to be emitted immediately and all
     *   handlers removed.
     *
     * - Adding an 'error' event handler after an error has been emitted will
     *   cause the event to be re-emitted immediately with the error previously
     *   raised.
     */
    on(ev, handler) {
        const ret = super.on(ev, handler);
        if (ev === 'data') {
            this[DISCARDED] = false;
            this[DATALISTENERS]++;
            if (!this[PIPES].length && !this[FLOWING]) {
                this[RESUME]();
            }
        }
        else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) {
            super.emit('readable');
        }
        else if (isEndish(ev) && this[EMITTED_END]) {
            super.emit(ev);
            this.removeAllListeners(ev);
        }
        else if (ev === 'error' && this[EMITTED_ERROR]) {
            const h = handler;
            if (this[ASYNC])
                defer(() => h.call(this, this[EMITTED_ERROR]));
            else
                h.call(this, this[EMITTED_ERROR]);
        }
        return ret;
    }
    /**
     * Alias for {@link Minipass#off}
     */
    removeListener(ev, handler) {
        return this.off(ev, handler);
    }
    /**
     * Mostly identical to `EventEmitter.off`
     *
     * If a 'data' event handler is removed, and it was the last consumer
     * (ie, there are no pipe destinations or other 'data' event listeners),
     * then the flow of data will stop until there is another consumer or
     * {@link Minipass#resume} is explicitly called.
     */
    off(ev, handler) {
        const ret = super.off(ev, handler);
        // if we previously had listeners, and now we don't, and we don't
        // have any pipes, then stop the flow, unless it's been explicitly
        // put in a discarded flowing state via stream.resume().
        if (ev === 'data') {
            this[DATALISTENERS] = this.listeners('data').length;
            if (this[DATALISTENERS] === 0 &&
                !this[DISCARDED] &&
                !this[PIPES].length) {
                this[FLOWING] = false;
            }
        }
        return ret;
    }
    /**
     * Mostly identical to `EventEmitter.removeAllListeners`
     *
     * If all 'data' event handlers are removed, and they were the last consumer
     * (ie, there are no pipe destinations), then the flow of data will stop
     * until there is another consumer or {@link Minipass#resume} is explicitly
     * called.
     */
    removeAllListeners(ev) {
        const ret = super.removeAllListeners(ev);
        if (ev === 'data' || ev === undefined) {
            this[DATALISTENERS] = 0;
            if (!this[DISCARDED] && !this[PIPES].length) {
                this[FLOWING] = false;
            }
        }
        return ret;
    }
    /**
     * true if the 'end' event has been emitted
     */
    get emittedEnd() {
        return this[EMITTED_END];
    }
    // Emit 'end', 'prefinish' and 'finish' (and a pending 'close') once the
    // stream has been ended, the buffer is empty, and it has not already
    // ended or been destroyed. EMITTING_END guards against re-entry.
    [MAYBE_EMIT_END]() {
        if (!this[EMITTING_END] &&
            !this[EMITTED_END] &&
            !this[DESTROYED] &&
            this[BUFFER].length === 0 &&
            this[EOF]) {
            this[EMITTING_END] = true;
            this.emit('end');
            this.emit('prefinish');
            this.emit('finish');
            if (this[CLOSED])
                this.emit('close');
            this[EMITTING_END] = false;
        }
    }
    /**
     * Mostly identical to `EventEmitter.emit`, with the following
     * behavior differences to prevent data loss and unnecessary hangs:
     *
     * If the stream has been destroyed, and the event is something other
     * than 'close' or 'error', then `false` is returned and no handlers
     * are called.
     *
     * If the event is 'end', and has already been emitted, then the event
     * is ignored. If the stream is in a paused or non-flowing state, then
     * the event will be deferred until data flow resumes. If the stream is
     * async, then handlers will be called on the next tick rather than
     * immediately.
     *
     * If the event is 'close', and 'end' has not yet been emitted, then
     * the event will be deferred until after 'end' is emitted.
     *
     * If the event is 'error', and an AbortSignal was provided for the stream,
     * and there are no listeners, then the event is ignored, matching the
     * behavior of node core streams in the presence of an AbortSignal.
     *
     * If the event is 'finish' or 'prefinish', then all listeners will be
     * removed after emitting the event, to prevent double-firing.
     */
    emit(ev, ...args) {
        const data = args[0];
        // error and close are only events allowed after calling destroy()
        if (ev !== 'error' &&
            ev !== 'close' &&
            ev !== DESTROYED &&
            this[DESTROYED]) {
            return false;
        }
        else if (ev === 'data') {
            return !this[OBJECTMODE] && !data
                ? false
                : this[ASYNC]
                    ? (defer(() => this[EMITDATA](data)), true)
                    : this[EMITDATA](data);
        }
        else if (ev === 'end') {
            return this[EMITEND]();
        }
        else if (ev === 'close') {
            this[CLOSED] = true;
            // don't emit close before 'end' and 'finish'
            if (!this[EMITTED_END] && !this[DESTROYED])
                return false;
            const ret = super.emit('close');
            this.removeAllListeners('close');
            return ret;
        }
        else if (ev === 'error') {
            this[EMITTED_ERROR] = data;
            super.emit(ERROR, data);
            const ret = !this[SIGNAL] || this.listeners('error').length
                ? super.emit('error', data)
                : false;
            this[MAYBE_EMIT_END]();
            return ret;
        }
        else if (ev === 'resume') {
            const ret = super.emit('resume');
            this[MAYBE_EMIT_END]();
            return ret;
        }
        else if (ev === 'finish' || ev === 'prefinish') {
            const ret = super.emit(ev);
            this.removeAllListeners(ev);
            return ret;
        }
        // Some other unknown event
        const ret = super.emit(ev, ...args);
        this[MAYBE_EMIT_END]();
        return ret;
    }
    // Write the chunk to every pipe destination (pausing on backpressure),
    // then hand it to 'data' listeners unless the stream is discarding.
    [EMITDATA](data) {
        for (const p of this[PIPES]) {
            if (p.dest.write(data) === false)
                this.pause();
        }
        const ret = this[DISCARDED] ? false : super.emit('data', data);
        this[MAYBE_EMIT_END]();
        return ret;
    }
    // First half of emitting 'end': mark the stream ended exactly once, then
    // run the second half immediately or deferred for async streams.
    [EMITEND]() {
        if (this[EMITTED_END])
            return false;
        this[EMITTED_END] = true;
        this.readable = false;
        return this[ASYNC]
            ? (defer(() => this[EMITEND2]()), true)
            : this[EMITEND2]();
    }
    // Second half of emitting 'end': flush any data left in the decoder,
    // end the pipes, emit 'end', and drop the 'end' listeners.
    [EMITEND2]() {
        if (this[DECODER]) {
            const data = this[DECODER].end();
            if (data) {
                for (const p of this[PIPES]) {
                    p.dest.write(data);
                }
                if (!this[DISCARDED])
                    super.emit('data', data);
            }
        }
        for (const p of this[PIPES]) {
            p.end();
        }
        const ret = super.emit('end');
        this.removeAllListeners('end');
        return ret;
    }
    /**
     * Return a Promise that resolves to an array of all emitted data once
     * the stream ends.
     *
     * The array carries a `dataLength` property, which is the sum of the
     * `length` of every chunk (left at 0 for objectMode streams).
     */
    async collect() {
        const buf = Object.assign([], {
            dataLength: 0,
        });
        // set the promise first, in case an error is raised
        // by triggering the flow here.
        const p = this.promise();
        this.on('data', c => {
            buf.push(c);
            if (!this[OBJECTMODE])
                buf.dataLength += c.length;
        });
        await p;
        return buf;
    }
    /**
     * Return a Promise that resolves to the concatenation of all emitted data
     * once the stream ends.
     *
     * Not allowed on objectMode streams.
     */
    async concat() {
        if (this[OBJECTMODE]) {
            throw new Error('cannot concat in objectMode');
        }
        const buf = await this.collect();
        return (this[ENCODING]
            ? buf.join('')
            : Buffer.concat(buf, buf.dataLength));
    }
    /**
     * Return a void Promise that resolves once the stream ends.
     *
     * Rejects if the stream emits 'error' or is destroyed.
     */
    async promise() {
        return new Promise((resolve, reject) => {
            this.on(DESTROYED, () => reject(new Error('stream destroyed')));
            this.on('error', er => reject(er));
            this.on('end', () => resolve());
        });
    }
    /**
     * Asynchronous `for await of` iteration.
     *
     * This will continue emitting all chunks until the stream terminates.
     */
    [Symbol.asyncIterator]() {
        // set this up front, in case the consumer doesn't call next()
        // right away.
        this[DISCARDED] = false;
        let stopped = false;
        const stop = async () => {
            this.pause();
            stopped = true;
            return { value: undefined, done: true };
        };
        const next = () => {
            if (stopped)
                return stop();
            const res = this.read();
            if (res !== null)
                return Promise.resolve({ done: false, value: res });
            if (this[EOF])
                return stop();
            let resolve;
            let reject;
            const onerr = (er) => {
                this.off('data', ondata);
                this.off('end', onend);
                this.off(DESTROYED, ondestroy);
                stop();
                reject(er);
            };
            const ondata = (value) => {
                this.off('error', onerr);
                this.off('end', onend);
                this.off(DESTROYED, ondestroy);
                this.pause();
                // Always report the chunk with done: false. A chunk can be
                // delivered after EOF has been set (deferred emit on an async
                // stream, or the decoder flush at end), and consumers such as
                // `for await` discard the value of a done: true result. The
                // following next() call sees the empty buffer plus EOF and
                // terminates the iteration.
                resolve({ value, done: false });
            };
            const onend = () => {
                this.off('error', onerr);
                this.off('data', ondata);
                this.off(DESTROYED, ondestroy);
                stop();
                resolve({ done: true, value: undefined });
            };
            const ondestroy = () => onerr(new Error('stream destroyed'));
            return new Promise((res, rej) => {
                reject = rej;
                resolve = res;
                this.once(DESTROYED, ondestroy);
                this.once('error', onerr);
                this.once('end', onend);
                this.once('data', ondata);
            });
        };
        return {
            next,
            throw: stop,
            return: stop,
            [Symbol.asyncIterator]() {
                return this;
            },
        };
    }
    /**
     * Synchronous `for of` iteration.
     *
     * The iteration will terminate when the internal buffer runs out, even
     * if the stream has not yet terminated.
     */
    [Symbol.iterator]() {
        // set this up front, in case the consumer doesn't call next()
        // right away.
        this[DISCARDED] = false;
        let stopped = false;
        const stop = () => {
            this.pause();
            this.off(ERROR, stop);
            this.off(DESTROYED, stop);
            this.off('end', stop);
            stopped = true;
            return { done: true, value: undefined };
        };
        const next = () => {
            if (stopped)
                return stop();
            const value = this.read();
            return value === null ? stop() : { done: false, value };
        };
        this.once('end', stop);
        this.once(ERROR, stop);
        this.once(DESTROYED, stop);
        return {
            next,
            throw: stop,
            return: stop,
            [Symbol.iterator]() {
                return this;
            },
        };
    }
    /**
     * Destroy a stream, preventing it from being used for any further purpose.
     *
     * If the stream has a `close()` method, then it will be called on
     * destruction.
     *
     * After destruction, any attempt to write data, read data, or emit most
     * events will be ignored.
     *
     * If an error argument is provided, then it will be emitted in an
     * 'error' event.
     */
    destroy(er) {
        if (this[DESTROYED]) {
            if (er)
                this.emit('error', er);
            else
                this.emit(DESTROYED);
            return this;
        }
        this[DESTROYED] = true;
        this[DISCARDED] = true;
        // throw away all buffered data, it's never coming out
        this[BUFFER].length = 0;
        this[BUFFERLENGTH] = 0;
        const wc = this;
        if (typeof wc.close === 'function' && !this[CLOSED])
            wc.close();
        if (er)
            this.emit('error', er);
        // if no error to emit, still reject pending promises
        else
            this.emit(DESTROYED);
        return this;
    }
    /**
     * Alias for {@link isStream}
     *
     * Former export location, maintained for backwards compatibility.
     *
     * @deprecated
     */
    static get isStream() {
        return isStream;
    }
}
//# sourceMappingURL=index.js.map