'use strict'
// Fall back to a stub when there is no `process` global, so that the
// stdout/stderr identity checks in pipe() still work.
const proc =
  typeof process === 'object' && process
    ? process
    : {
        stdout: null,
        stderr: null,
      }
const EE = require('events')
const Stream = require('stream')
const stringdecoder = require('string_decoder')
const SD = stringdecoder.StringDecoder

// Symbols are used as keys for all internal state and internal methods,
// so that they cannot be overwritten through ordinary property names.
const EOF = Symbol('EOF')
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
const EMITTED_END = Symbol('emittedEnd')
const EMITTING_END = Symbol('emittingEnd')
const EMITTED_ERROR = Symbol('emittedError')
const CLOSED = Symbol('closed')
const READ = Symbol('read')
const FLUSH = Symbol('flush')
const FLUSHCHUNK = Symbol('flushChunk')
const ENCODING = Symbol('encoding')
const DECODER = Symbol('decoder')
const FLOWING = Symbol('flowing')
const PAUSED = Symbol('paused')
const RESUME = Symbol('resume')
const BUFFER = Symbol('buffer')
const PIPES = Symbol('pipes')
const BUFFERLENGTH = Symbol('bufferLength')
const BUFFERPUSH = Symbol('bufferPush')
const BUFFERSHIFT = Symbol('bufferShift')
const OBJECTMODE = Symbol('objectMode')
// internal event when stream is destroyed
const DESTROYED = Symbol('destroyed')
// internal event when stream has an error
const ERROR = Symbol('error')
const EMITDATA = Symbol('emitData')
const EMITEND = Symbol('emitEnd')
const EMITEND2 = Symbol('emitEnd2')
const ASYNC = Symbol('async')
const ABORT = Symbol('abort')
const ABORTED = Symbol('aborted')
const SIGNAL = Symbol('signal')

// Run fn on a later microtask.
const defer = fn => Promise.resolve().then(fn)

// TODO remove when Node v8 support drops
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
const ASYNCITERATOR =
  (doIter && Symbol.asyncIterator) || Symbol('asyncIterator not implemented')
const ITERATOR =
  (doIter && Symbol.iterator) || Symbol('iterator not implemented')

// events that mean 'the stream is over'
// these are treated specially, and re-emitted
// if they are listened for after emitting.
const isEndish = ev => ev === 'end' || ev === 'finish' || ev === 'prefinish'

// True for ArrayBuffer instances, and for objects whose constructor is
// named 'ArrayBuffer' and which report a non-negative byteLength.
const isArrayBuffer = b =>
  b instanceof ArrayBuffer ||
  (typeof b === 'object' &&
    b.constructor &&
    b.constructor.name === 'ArrayBuffer' &&
    b.byteLength >= 0)

// True for typed arrays / DataViews that are not Node Buffers.
const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)

// One source -> destination link created by Minipass#pipe().
// Resumes the source whenever the destination emits 'drain'.
class Pipe {
  constructor(src, dest, opts) {
    this.src = src
    this.dest = dest
    this.opts = opts
    this.ondrain = () => src[RESUME]()
    dest.on('drain', this.ondrain)
  }
  // Stop listening for 'drain' on the destination.
  unpipe() {
    this.dest.removeListener('drain', this.ondrain)
  }
  // istanbul ignore next - only here for the prototype
  proxyErrors() {}
  // Detach from the destination, and end it if opts.end is set.
  end() {
    this.unpipe()
    if (this.opts.end) this.dest.end()
  }
}

// A Pipe that also re-emits the source's 'error' events on the destination.
class PipeProxyErrors extends Pipe {
  unpipe() {
    this.src.removeListener('error', this.proxyErrors)
    super.unpipe()
  }
  constructor(src, dest, opts) {
    super(src, dest, opts)
    this.proxyErrors = er => dest.emit('error', er)
    src.on('error', this.proxyErrors)
  }
}

// A stream that is both writable and readable: chunks passed to write()
// are emitted as 'data' right away while flowing, otherwise they are held
// in an internal buffer until read(), resume(), pipe() or a 'data'
// listener drains them.
//
// Options read by the constructor:
//   objectMode        - treat every chunk as an opaque item
//   encoding          - decode output to strings ('buffer' means no decoding)
//   async             - defer 'data'/'end' emission and write callbacks
//   signal            - an abort signal; aborting destroys the stream
//   debugExposeBuffer - expose the internal buffer as `this.buffer`
//   debugExposePipes  - expose the internal pipe list as `this.pipes`
class Minipass extends Stream {
  constructor(options) {
    super()
    this[FLOWING] = false
    // whether we're explicitly paused
    this[PAUSED] = false
    this[PIPES] = []
    this[BUFFER] = []
    this[OBJECTMODE] = (options && options.objectMode) || false
    if (this[OBJECTMODE]) this[ENCODING] = null
    else this[ENCODING] = (options && options.encoding) || null
    if (this[ENCODING] === 'buffer') this[ENCODING] = null
    this[ASYNC] = (options && !!options.async) || false
    this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
    this[EOF] = false
    this[EMITTED_END] = false
    this[EMITTING_END] = false
    this[CLOSED] = false
    this[EMITTED_ERROR] = null
    this.writable = true
    this.readable = true
    this[BUFFERLENGTH] = 0
    this[DESTROYED] = false
    if (options && options.debugExposeBuffer === true) {
      Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] })
    }
    if (options && options.debugExposePipes === true) {
      Object.defineProperty(this, 'pipes', { get: () => this[PIPES] })
    }
    this[SIGNAL] = options && options.signal
    this[ABORTED] = false
    if (this[SIGNAL]) {
      this[SIGNAL].addEventListener('abort', () => this[ABORT]())
      // a signal that was aborted before we got it will not fire again
      if (this[SIGNAL].aborted) {
        this[ABORT]()
      }
    }
  }

  // Number of buffered items (objectMode) or total buffered chunk length.
  get bufferLength() {
    return this[BUFFERLENGTH]
  }

  get encoding() {
    return this[ENCODING]
  }
  // Throws in objectMode, and when an encoding is already set and the
  // decoder holds a partial character or data is buffered.
  set encoding(enc) {
    if (this[OBJECTMODE]) throw new Error('cannot set encoding in objectMode')

    if (
      this[ENCODING] &&
      enc !== this[ENCODING] &&
      ((this[DECODER] && this[DECODER].lastNeed) || this[BUFFERLENGTH])
    )
      throw new Error('cannot change encoding')

    if (this[ENCODING] !== enc) {
      this[DECODER] = enc ? new SD(enc) : null
      // run whatever is already buffered through the new decoder
      if (this[BUFFER].length)
        this[BUFFER] = this[BUFFER].map(chunk => this[DECODER].write(chunk))
    }

    this[ENCODING] = enc
  }

  setEncoding(enc) {
    this.encoding = enc
  }

  get objectMode() {
    return this[OBJECTMODE]
  }
  // objectMode can be switched on, but never back off
  set objectMode(om) {
    this[OBJECTMODE] = this[OBJECTMODE] || !!om
  }

  get ['async']() {
    return this[ASYNC]
  }
  // async can be switched on, but never back off
  set ['async'](a) {
    this[ASYNC] = this[ASYNC] || !!a
  }

  // drop everything and get out of the flow completely
  [ABORT]() {
    this[ABORTED] = true
    this.emit('abort', this[SIGNAL].reason)
    this.destroy(this[SIGNAL].reason)
  }

  get aborted() {
    return this[ABORTED]
  }
  // read-only from the outside: assignments are silently ignored
  set aborted(_) {}

  // Accept a chunk. Returns this.flowing in the normal case, false when
  // the stream was aborted, and true (after emitting an
  // ERR_STREAM_DESTROYED error) when the stream was destroyed.
  // Throws 'write after end' once end() has been called.
  // `encoding` is optional and defaults to 'utf8'; `cb` is called once the
  // chunk has been handled (deferred when the stream is async).
  write(chunk, encoding, cb) {
    if (this[ABORTED]) return false
    if (this[EOF]) throw new Error('write after end')

    if (this[DESTROYED]) {
      this.emit(
        'error',
        Object.assign(
          new Error('Cannot call write after a stream was destroyed'),
          { code: 'ERR_STREAM_DESTROYED' }
        )
      )
      return true
    }

    if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8')

    if (!encoding) encoding = 'utf8'

    const fn = this[ASYNC] ? defer : f => f()

    // convert array buffers and typed array views into buffers
    // at some point in the future, we may want to do the opposite!
    // leave strings and buffers as-is
    // anything else switches us into object mode
    if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
      if (isArrayBufferView(chunk))
        chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
      else if (isArrayBuffer(chunk)) chunk = Buffer.from(chunk)
      else if (typeof chunk !== 'string')
        // use the setter so we throw if we have encoding set
        this.objectMode = true
    }

    // handle object mode up front, since it's simpler
    // this yields better performance, fewer checks later.
    if (this[OBJECTMODE]) {
      /* istanbul ignore if - maybe impossible? */
      if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true)

      if (this.flowing) this.emit('data', chunk)
      else this[BUFFERPUSH](chunk)

      if (this[BUFFERLENGTH] !== 0) this.emit('readable')

      if (cb) fn(cb)

      return this.flowing
    }

    // at this point the chunk is a buffer or string
    // don't buffer it up or send it to the decoder
    if (!chunk.length) {
      if (this[BUFFERLENGTH] !== 0) this.emit('readable')
      if (cb) fn(cb)
      return this.flowing
    }

    // fast-path writing strings of same encoding to a stream with
    // an empty buffer, skipping the buffer/decoder dance
    if (
      typeof chunk === 'string' &&
      // unless it is a string already ready for us to use
      !(encoding === this[ENCODING] && !this[DECODER].lastNeed)
    ) {
      chunk = Buffer.from(chunk, encoding)
    }

    if (Buffer.isBuffer(chunk) && this[ENCODING])
      chunk = this[DECODER].write(chunk)

    // Note: flushing CAN potentially switch us into not-flowing mode
    if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true)

    if (this.flowing) this.emit('data', chunk)
    else this[BUFFERPUSH](chunk)

    if (this[BUFFERLENGTH] !== 0) this.emit('readable')

    if (cb) fn(cb)

    return this.flowing
  }

  // Take data out of the buffer. Returns null when the stream is
  // destroyed, the buffer is empty, n is 0, or n exceeds what is buffered.
  // Without n (or in objectMode) the whole first entry is returned; in
  // non-object mode multiple buffered chunks are merged into one first.
  read(n) {
    if (this[DESTROYED]) return null

    if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) {
      this[MAYBE_EMIT_END]()
      return null
    }

    if (this[OBJECTMODE]) n = null

    if (this[BUFFER].length > 1 && !this[OBJECTMODE]) {
      if (this.encoding) this[BUFFER] = [this[BUFFER].join('')]
      else this[BUFFER] = [Buffer.concat(this[BUFFER], this[BUFFERLENGTH])]
    }

    const ret = this[READ](n || null, this[BUFFER][0])
    this[MAYBE_EMIT_END]()
    return ret
  }

  // Remove either the whole first chunk (n null or equal to its length)
  // or its first n units, emit it as 'data', and return it.
  [READ](n, chunk) {
    if (n === chunk.length || n === null) this[BUFFERSHIFT]()
    else {
      this[BUFFER][0] = chunk.slice(n)
      chunk = chunk.slice(0, n)
      this[BUFFERLENGTH] -= n
    }

    this.emit('data', chunk)

    if (!this[BUFFER].length && !this[EOF]) this.emit('drain')

    return chunk
  }

  // Optionally write a final chunk, then mark the stream as finished.
  // `cb` is registered as a one-time 'end' listener. Returns this.
  end(chunk, encoding, cb) {
    if (typeof chunk === 'function') (cb = chunk), (chunk = null)
    if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8')
    if (chunk) this.write(chunk, encoding)
    if (cb) this.once('end', cb)
    this[EOF] = true
    this.writable = false

    // if we haven't written anything, then go ahead and emit,
    // even if we're not reading.
    // we'll re-emit if a new 'end' listener is added anyway.
    // This makes MP more suitable to write-only use cases.
    if (this.flowing || !this[PAUSED]) this[MAYBE_EMIT_END]()
    return this
  }

  // don't let the internal resume be overwritten
  [RESUME]() {
    if (this[DESTROYED]) return

    this[PAUSED] = false
    this[FLOWING] = true
    this.emit('resume')
    if (this[BUFFER].length) this[FLUSH]()
    else if (this[EOF]) this[MAYBE_EMIT_END]()
    else this.emit('drain')
  }

  resume() {
    return this[RESUME]()
  }

  pause() {
    this[FLOWING] = false
    this[PAUSED] = true
  }

  get destroyed() {
    return this[DESTROYED]
  }

  get flowing() {
    return this[FLOWING]
  }

  get paused() {
    return this[PAUSED]
  }

  // Append a chunk to the buffer, counting 1 per item in objectMode
  // and chunk.length otherwise.
  [BUFFERPUSH](chunk) {
    if (this[OBJECTMODE]) this[BUFFERLENGTH] += 1
    else this[BUFFERLENGTH] += chunk.length
    this[BUFFER].push(chunk)
  }

  // Remove and return the first buffered chunk, adjusting the count.
  [BUFFERSHIFT]() {
    if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1
    else this[BUFFERLENGTH] -= this[BUFFER][0].length
    return this[BUFFER].shift()
  }

  // Emit buffered chunks until the buffer is empty or we stop flowing.
  // Emits 'drain' afterwards unless noDrain is set or the stream ended.
  [FLUSH](noDrain) {
    do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) && this[BUFFER].length)

    if (!noDrain && !this[BUFFER].length && !this[EOF]) this.emit('drain')
  }

  [FLUSHCHUNK](chunk) {
    this.emit('data', chunk)
    return this.flowing
  }

  // Send all data to dest and start flowing. Returns dest, or undefined
  // when this stream has been destroyed.
  // opts.end (default true) ends dest when this stream ends; it is forced
  // to false for process.stdout / process.stderr.
  // opts.proxyErrors (default false) forwards 'error' events to dest.
  pipe(dest, opts) {
    if (this[DESTROYED]) return

    const ended = this[EMITTED_END]
    // Normalize on a private copy: writing the computed flags back onto
    // the caller's object would leak one pipe's settings (for example the
    // forced end=false for stdout) into any later pipe() call that
    // reuses the same options object.
    opts = Object.assign({}, opts)
    if (dest === proc.stdout || dest === proc.stderr) opts.end = false
    else opts.end = opts.end !== false
    opts.proxyErrors = !!opts.proxyErrors

    // piping an ended stream ends immediately
    if (ended) {
      if (opts.end) dest.end()
    } else {
      this[PIPES].push(
        !opts.proxyErrors
          ? new Pipe(this, dest, opts)
          : new PipeProxyErrors(this, dest, opts)
      )
      if (this[ASYNC]) defer(() => this[RESUME]())
      else this[RESUME]()
    }

    return dest
  }

  // Remove the first pipe whose destination is dest, if there is one.
  unpipe(dest) {
    const p = this[PIPES].find(p => p.dest === dest)
    if (p) {
      this[PIPES].splice(this[PIPES].indexOf(p), 1)
      p.unpipe()
    }
  }

  addListener(ev, fn) {
    return this.on(ev, fn)
  }

  // Register a listener, with these extra effects:
  // - 'data' starts the flow when there are no pipes and we're not flowing
  // - 'readable' fires immediately when data is already buffered
  // - end-ish events are re-emitted when the stream has already ended
  // - 'error' listeners are called with a previously emitted error
  on(ev, fn) {
    const ret = super.on(ev, fn)
    if (ev === 'data' && !this[PIPES].length && !this.flowing) this[RESUME]()
    else if (ev === 'readable' && this[BUFFERLENGTH] !== 0)
      super.emit('readable')
    else if (isEndish(ev) && this[EMITTED_END]) {
      super.emit(ev)
      this.removeAllListeners(ev)
    } else if (ev === 'error' && this[EMITTED_ERROR]) {
      if (this[ASYNC]) defer(() => fn.call(this, this[EMITTED_ERROR]))
      else fn.call(this, this[EMITTED_ERROR])
    }
    return ret
  }

  get emittedEnd() {
    return this[EMITTED_END]
  }

  // Emit 'end', 'prefinish' and 'finish' (plus 'close' if one was held
  // back) once end() was called, the buffer is empty, and the stream is
  // neither destroyed nor already ending/ended.
  [MAYBE_EMIT_END]() {
    if (
      !this[EMITTING_END] &&
      !this[EMITTED_END] &&
      !this[DESTROYED] &&
      this[BUFFER].length === 0 &&
      this[EOF]
    ) {
      this[EMITTING_END] = true
      this.emit('end')
      this.emit('prefinish')
      this.emit('finish')
      if (this[CLOSED]) this.emit('close')
      this[EMITTING_END] = false
    }
  }

  // Event dispatch with per-event handling; see the branches below.
  emit(ev, data, ...extra) {
    // error and close are only events allowed after calling destroy()
    if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
      return
    else if (ev === 'data') {
      // falsy chunks are dropped unless we're in objectMode
      return !this[OBJECTMODE] && !data
        ? false
        : this[ASYNC]
        ? defer(() => this[EMITDATA](data))
        : this[EMITDATA](data)
    } else if (ev === 'end') {
      return this[EMITEND]()
    } else if (ev === 'close') {
      this[CLOSED] = true
      // don't emit close before 'end' and 'finish'
      if (!this[EMITTED_END] && !this[DESTROYED]) return
      const ret = super.emit('close')
      this.removeAllListeners('close')
      return ret
    } else if (ev === 'error') {
      // remembered so that late 'error' listeners still receive it
      this[EMITTED_ERROR] = data
      super.emit(ERROR, data)
      // with a signal attached, skip the emit when nobody is listening
      const ret =
        !this[SIGNAL] || this.listeners('error').length
          ? super.emit('error', data)
          : false
      this[MAYBE_EMIT_END]()
      return ret
    } else if (ev === 'resume') {
      const ret = super.emit('resume')
      this[MAYBE_EMIT_END]()
      return ret
    } else if (ev === 'finish' || ev === 'prefinish') {
      const ret = super.emit(ev)
      this.removeAllListeners(ev)
      return ret
    }

    // Some other unknown event
    const ret = super.emit(ev, data, ...extra)
    this[MAYBE_EMIT_END]()
    return ret
  }

  // Write the chunk to every pipe destination (pausing when one of them
  // returns false), then emit it to 'data' listeners.
  [EMITDATA](data) {
    for (const p of this[PIPES]) {
      if (p.dest.write(data) === false) this.pause()
    }
    const ret = super.emit('data', data)
    this[MAYBE_EMIT_END]()
    return ret
  }

  // Mark the stream as ended exactly once, then run the actual emission
  // (deferred when the stream is async).
  [EMITEND]() {
    if (this[EMITTED_END]) return

    this[EMITTED_END] = true
    this.readable = false
    if (this[ASYNC]) defer(() => this[EMITEND2]())
    else this[EMITEND2]()
  }

  // Flush whatever the decoder still holds, end all pipes, emit 'end'.
  [EMITEND2]() {
    if (this[DECODER]) {
      const data = this[DECODER].end()
      if (data) {
        for (const p of this[PIPES]) {
          p.dest.write(data)
        }
        super.emit('data', data)
      }
    }

    for (const p of this[PIPES]) {
      p.end()
    }
    const ret = super.emit('end')
    this.removeAllListeners('end')
    return ret
  }

  // const all = await stream.collect()
  // Resolves to an array of every chunk; outside objectMode the array
  // also carries a dataLength property with the summed chunk lengths.
  collect() {
    const buf = []
    if (!this[OBJECTMODE]) buf.dataLength = 0
    // set the promise first, in case an error is raised
    // by triggering the flow here.
    const p = this.promise()
    this.on('data', c => {
      buf.push(c)
      if (!this[OBJECTMODE]) buf.dataLength += c.length
    })
    return p.then(() => buf)
  }

  // const data = await stream.concat()
  // Resolves to one string (when an encoding is set) or one Buffer.
  // Rejects in objectMode, which is checked again after collecting.
  concat() {
    return this[OBJECTMODE]
      ? Promise.reject(new Error('cannot concat in objectMode'))
      : this.collect().then(buf =>
          this[OBJECTMODE]
            ? Promise.reject(new Error('cannot concat in objectMode'))
            : this[ENCODING]
            ? buf.join('')
            : Buffer.concat(buf, buf.dataLength)
        )
  }

  // stream.promise().then(() => done, er => emitted error)
  promise() {
    return new Promise((resolve, reject) => {
      this.on(DESTROYED, () => reject(new Error('stream destroyed')))
      this.on('error', er => reject(er))
      this.on('end', () => resolve())
    })
  }

  // for await (let chunk of stream)
  [ASYNCITERATOR]() {
    let stopped = false
    const stop = () => {
      this.pause()
      stopped = true
      return Promise.resolve({ done: true })
    }
    const next = () => {
      if (stopped) return stop()
      const res = this.read()
      if (res !== null) return Promise.resolve({ done: false, value: res })

      if (this[EOF]) return stop()

      // nothing buffered yet: wait for whichever event happens first,
      // and remove the other listeners once it does
      let resolve = null
      let reject = null
      const onerr = er => {
        this.removeListener('data', ondata)
        this.removeListener('end', onend)
        this.removeListener(DESTROYED, ondestroy)
        stop()
        reject(er)
      }
      const ondata = value => {
        this.removeListener('error', onerr)
        this.removeListener('end', onend)
        this.removeListener(DESTROYED, ondestroy)
        this.pause()
        resolve({ value: value, done: !!this[EOF] })
      }
      const onend = () => {
        this.removeListener('error', onerr)
        this.removeListener('data', ondata)
        this.removeListener(DESTROYED, ondestroy)
        stop()
        resolve({ done: true })
      }
      const ondestroy = () => onerr(new Error('stream destroyed'))
      return new Promise((res, rej) => {
        reject = rej
        resolve = res
        this.once(DESTROYED, ondestroy)
        this.once('error', onerr)
        this.once('end', onend)
        this.once('data', ondata)
      })
    }

    return {
      next,
      throw: stop,
      return: stop,
      [ASYNCITERATOR]() {
        return this
      },
    }
  }

  // for (let chunk of stream)
  // Yields only what read() can return synchronously; iteration stops at
  // the first null read, or on end, error or destroy.
  [ITERATOR]() {
    let stopped = false
    const stop = () => {
      this.pause()
      this.removeListener(ERROR, stop)
      this.removeListener(DESTROYED, stop)
      this.removeListener('end', stop)
      stopped = true
      return { done: true }
    }

    const next = () => {
      if (stopped) return stop()
      const value = this.read()
      return value === null ? stop() : { value }
    }
    this.once('end', stop)
    this.once(ERROR, stop)
    this.once(DESTROYED, stop)

    return {
      next,
      throw: stop,
      return: stop,
      [ITERATOR]() {
        return this
      },
    }
  }

  // Discard buffered data and mark the stream destroyed. Emits 'error'
  // when er is given, otherwise the internal DESTROYED event. Calling it
  // again only repeats that emission. Returns this.
  destroy(er) {
    if (this[DESTROYED]) {
      if (er) this.emit('error', er)
      else this.emit(DESTROYED)
      return this
    }

    this[DESTROYED] = true

    // throw away all buffered data, it's never coming out
    this[BUFFER].length = 0
    this[BUFFERLENGTH] = 0

    // close() is not defined in this class; call it when something else
    // (e.g. a subclass) provides one
    if (typeof this.close === 'function' && !this[CLOSED]) this.close()

    if (er) this.emit('error', er)
    // if no error to emit, still reject pending promises
    else this.emit(DESTROYED)

    return this
  }

  // True for Minipass or Stream instances, and for event emitters that
  // have a pipe() method or both write() and end() methods.
  static isStream(s) {
    return (
      !!s &&
      (s instanceof Minipass ||
        s instanceof Stream ||
        (s instanceof EE &&
          // readable
          (typeof s.pipe === 'function' ||
            // writable
            (typeof s.write === 'function' && typeof s.end === 'function'))))
    )
  }
}

exports.Minipass = Minipass