1'use strict' 2const EE = require('events') 3const Yallist = require('yallist') 4const SD = require('string_decoder').StringDecoder 5 6const EOF = Symbol('EOF') 7const MAYBE_EMIT_END = Symbol('maybeEmitEnd') 8const EMITTED_END = Symbol('emittedEnd') 9const EMITTING_END = Symbol('emittingEnd') 10const CLOSED = Symbol('closed') 11const READ = Symbol('read') 12const FLUSH = Symbol('flush') 13const FLUSHCHUNK = Symbol('flushChunk') 14const ENCODING = Symbol('encoding') 15const DECODER = Symbol('decoder') 16const FLOWING = Symbol('flowing') 17const PAUSED = Symbol('paused') 18const RESUME = Symbol('resume') 19const BUFFERLENGTH = Symbol('bufferLength') 20const BUFFERPUSH = Symbol('bufferPush') 21const BUFFERSHIFT = Symbol('bufferShift') 22const OBJECTMODE = Symbol('objectMode') 23const DESTROYED = Symbol('destroyed') 24 25// TODO remove when Node v8 support drops 26const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1' 27const ASYNCITERATOR = doIter && Symbol.asyncIterator 28 || Symbol('asyncIterator not implemented') 29const ITERATOR = doIter && Symbol.iterator 30 || Symbol('iterator not implemented') 31 32// Buffer in node 4.x < 4.5.0 doesn't have working Buffer.from 33// or Buffer.alloc, and Buffer in node 10 deprecated the ctor. 34// .M, this is fine .\^/M.. 35const B = Buffer.alloc ? Buffer 36 : /* istanbul ignore next */ require('safe-buffer').Buffer 37 38// events that mean 'the stream is over' 39// these are treated specially, and re-emitted 40// if they are listened for after emitting. 41const isEndish = ev => 42 ev === 'end' || 43 ev === 'finish' || 44 ev === 'prefinish' 45 46const isArrayBuffer = b => b instanceof ArrayBuffer || 47 typeof b === 'object' && 48 b.constructor && 49 b.constructor.name === 'ArrayBuffer' && 50 b.byteLength >= 0 51 52const isArrayBufferView = b => !B.isBuffer(b) && ArrayBuffer.isView(b) 53 54module.exports = class Minipass extends EE { 55 constructor (options) { 56 super() 57 this[FLOWING] = false 58 // whether we're explicitly paused 59 this[PAUSED] = false 60 this.pipes = new Yallist() 61 this.buffer = new Yallist() 62 this[OBJECTMODE] = options && options.objectMode || false 63 if (this[OBJECTMODE]) 64 this[ENCODING] = null 65 else 66 this[ENCODING] = options && options.encoding || null 67 if (this[ENCODING] === 'buffer') 68 this[ENCODING] = null 69 this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null 70 this[EOF] = false 71 this[EMITTED_END] = false 72 this[EMITTING_END] = false 73 this[CLOSED] = false 74 this.writable = true 75 this.readable = true 76 this[BUFFERLENGTH] = 0 77 this[DESTROYED] = false 78 } 79 80 get bufferLength () { return this[BUFFERLENGTH] } 81 82 get encoding () { return this[ENCODING] } 83 set encoding (enc) { 84 if (this[OBJECTMODE]) 85 throw new Error('cannot set encoding in objectMode') 86 87 if (this[ENCODING] && enc !== this[ENCODING] && 88 (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH])) 89 throw new Error('cannot change encoding') 90 91 if (this[ENCODING] !== enc) { 92 this[DECODER] = enc ? new SD(enc) : null 93 if (this.buffer.length) 94 this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk)) 95 } 96 97 this[ENCODING] = enc 98 } 99 100 setEncoding (enc) { 101 this.encoding = enc 102 } 103 104 get objectMode () { return this[OBJECTMODE] } 105 set objectMode (ॐ ) { this[OBJECTMODE] = this[OBJECTMODE] || !!ॐ } 106 107 write (chunk, encoding, cb) { 108 if (this[EOF]) 109 throw new Error('write after end') 110 111 if (this[DESTROYED]) { 112 this.emit('error', Object.assign( 113 new Error('Cannot call write after a stream was destroyed'), 114 { code: 'ERR_STREAM_DESTROYED' } 115 )) 116 return true 117 } 118 119 if (typeof encoding === 'function') 120 cb = encoding, encoding = 'utf8' 121 122 if (!encoding) 123 encoding = 'utf8' 124 125 // convert array buffers and typed array views into buffers 126 // at some point in the future, we may want to do the opposite! 127 // leave strings and buffers as-is 128 // anything else switches us into object mode 129 if (!this[OBJECTMODE] && !B.isBuffer(chunk)) { 130 if (isArrayBufferView(chunk)) 131 chunk = B.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) 132 else if (isArrayBuffer(chunk)) 133 chunk = B.from(chunk) 134 else if (typeof chunk !== 'string') 135 // use the setter so we throw if we have encoding set 136 this.objectMode = true 137 } 138 139 // this ensures at this point that the chunk is a buffer or string 140 // don't buffer it up or send it to the decoder 141 if (!this.objectMode && !chunk.length) { 142 const ret = this.flowing 143 if (this[BUFFERLENGTH] !== 0) 144 this.emit('readable') 145 if (cb) 146 cb() 147 return ret 148 } 149 150 // fast-path writing strings of same encoding to a stream with 151 // an empty buffer, skipping the buffer/decoder dance 152 if (typeof chunk === 'string' && !this[OBJECTMODE] && 153 // unless it is a string already ready for us to use 154 !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) { 155 chunk = B.from(chunk, encoding) 156 } 157 158 if (B.isBuffer(chunk) && this[ENCODING]) 159 chunk = this[DECODER].write(chunk) 160 161 try { 162 return this.flowing 163 ? (this.emit('data', chunk), this.flowing) 164 : (this[BUFFERPUSH](chunk), false) 165 } finally { 166 if (this[BUFFERLENGTH] !== 0) 167 this.emit('readable') 168 if (cb) 169 cb() 170 } 171 } 172 173 read (n) { 174 if (this[DESTROYED]) 175 return null 176 177 try { 178 if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) 179 return null 180 181 if (this[OBJECTMODE]) 182 n = null 183 184 if (this.buffer.length > 1 && !this[OBJECTMODE]) { 185 if (this.encoding) 186 this.buffer = new Yallist([ 187 Array.from(this.buffer).join('') 188 ]) 189 else 190 this.buffer = new Yallist([ 191 B.concat(Array.from(this.buffer), this[BUFFERLENGTH]) 192 ]) 193 } 194 195 return this[READ](n || null, this.buffer.head.value) 196 } finally { 197 this[MAYBE_EMIT_END]() 198 } 199 } 200 201 [READ] (n, chunk) { 202 if (n === chunk.length || n === null) 203 this[BUFFERSHIFT]() 204 else { 205 this.buffer.head.value = chunk.slice(n) 206 chunk = chunk.slice(0, n) 207 this[BUFFERLENGTH] -= n 208 } 209 210 this.emit('data', chunk) 211 212 if (!this.buffer.length && !this[EOF]) 213 this.emit('drain') 214 215 return chunk 216 } 217 218 end (chunk, encoding, cb) { 219 if (typeof chunk === 'function') 220 cb = chunk, chunk = null 221 if (typeof encoding === 'function') 222 cb = encoding, encoding = 'utf8' 223 if (chunk) 224 this.write(chunk, encoding) 225 if (cb) 226 this.once('end', cb) 227 this[EOF] = true 228 this.writable = false 229 230 // if we haven't written anything, then go ahead and emit, 231 // even if we're not reading. 232 // we'll re-emit if a new 'end' listener is added anyway. 233 // This makes MP more suitable to write-only use cases. 234 if (this.flowing || !this[PAUSED]) 235 this[MAYBE_EMIT_END]() 236 return this 237 } 238 239 // don't let the internal resume be overwritten 240 [RESUME] () { 241 if (this[DESTROYED]) 242 return 243 244 this[PAUSED] = false 245 this[FLOWING] = true 246 this.emit('resume') 247 if (this.buffer.length) 248 this[FLUSH]() 249 else if (this[EOF]) 250 this[MAYBE_EMIT_END]() 251 else 252 this.emit('drain') 253 } 254 255 resume () { 256 return this[RESUME]() 257 } 258 259 pause () { 260 this[FLOWING] = false 261 this[PAUSED] = true 262 } 263 264 get destroyed () { 265 return this[DESTROYED] 266 } 267 268 get flowing () { 269 return this[FLOWING] 270 } 271 272 get paused () { 273 return this[PAUSED] 274 } 275 276 [BUFFERPUSH] (chunk) { 277 if (this[OBJECTMODE]) 278 this[BUFFERLENGTH] += 1 279 else 280 this[BUFFERLENGTH] += chunk.length 281 return this.buffer.push(chunk) 282 } 283 284 [BUFFERSHIFT] () { 285 if (this.buffer.length) { 286 if (this[OBJECTMODE]) 287 this[BUFFERLENGTH] -= 1 288 else 289 this[BUFFERLENGTH] -= this.buffer.head.value.length 290 } 291 return this.buffer.shift() 292 } 293 294 [FLUSH] () { 295 do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]())) 296 297 if (!this.buffer.length && !this[EOF]) 298 this.emit('drain') 299 } 300 301 [FLUSHCHUNK] (chunk) { 302 return chunk ? (this.emit('data', chunk), this.flowing) : false 303 } 304 305 pipe (dest, opts) { 306 if (this[DESTROYED]) 307 return 308 309 const ended = this[EMITTED_END] 310 opts = opts || {} 311 if (dest === process.stdout || dest === process.stderr) 312 opts.end = false 313 else 314 opts.end = opts.end !== false 315 316 const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() } 317 this.pipes.push(p) 318 319 dest.on('drain', p.ondrain) 320 this[RESUME]() 321 // piping an ended stream ends immediately 322 if (ended && p.opts.end) 323 p.dest.end() 324 return dest 325 } 326 327 addListener (ev, fn) { 328 return this.on(ev, fn) 329 } 330 331 on (ev, fn) { 332 try { 333 return super.on(ev, fn) 334 } finally { 335 if (ev === 'data' && !this.pipes.length && !this.flowing) 336 this[RESUME]() 337 else if (isEndish(ev) && this[EMITTED_END]) { 338 super.emit(ev) 339 this.removeAllListeners(ev) 340 } 341 } 342 } 343 344 get emittedEnd () { 345 return this[EMITTED_END] 346 } 347 348 [MAYBE_EMIT_END] () { 349 if (!this[EMITTING_END] && 350 !this[EMITTED_END] && 351 !this[DESTROYED] && 352 this.buffer.length === 0 && 353 this[EOF]) { 354 this[EMITTING_END] = true 355 this.emit('end') 356 this.emit('prefinish') 357 this.emit('finish') 358 if (this[CLOSED]) 359 this.emit('close') 360 this[EMITTING_END] = false 361 } 362 } 363 364 emit (ev, data) { 365 // error and close are only events allowed after calling destroy() 366 if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED]) 367 return 368 else if (ev === 'data') { 369 if (!data) 370 return 371 372 if (this.pipes.length) 373 this.pipes.forEach(p => 374 p.dest.write(data) === false && this.pause()) 375 } else if (ev === 'end') { 376 // only actual end gets this treatment 377 if (this[EMITTED_END] === true) 378 return 379 380 this[EMITTED_END] = true 381 this.readable = false 382 383 if (this[DECODER]) { 384 data = this[DECODER].end() 385 if (data) { 386 this.pipes.forEach(p => p.dest.write(data)) 387 super.emit('data', data) 388 } 389 } 390 391 this.pipes.forEach(p => { 392 p.dest.removeListener('drain', p.ondrain) 393 if (p.opts.end) 394 p.dest.end() 395 }) 396 } else if (ev === 'close') { 397 this[CLOSED] = true 398 // don't emit close before 'end' and 'finish' 399 if (!this[EMITTED_END] && !this[DESTROYED]) 400 return 401 } 402 403 // TODO: replace with a spread operator when Node v4 support drops 404 const args = new Array(arguments.length) 405 args[0] = ev 406 args[1] = data 407 if (arguments.length > 2) { 408 for (let i = 2; i < arguments.length; i++) { 409 args[i] = arguments[i] 410 } 411 } 412 413 try { 414 return super.emit.apply(this, args) 415 } finally { 416 if (!isEndish(ev)) 417 this[MAYBE_EMIT_END]() 418 else 419 this.removeAllListeners(ev) 420 } 421 } 422 423 // const all = await stream.collect() 424 collect () { 425 const buf = [] 426 buf.dataLength = 0 427 this.on('data', c => { 428 buf.push(c) 429 buf.dataLength += c.length 430 }) 431 return this.promise().then(() => buf) 432 } 433 434 // const data = await stream.concat() 435 concat () { 436 return this[OBJECTMODE] 437 ? Promise.reject(new Error('cannot concat in objectMode')) 438 : this.collect().then(buf => 439 this[OBJECTMODE] 440 ? Promise.reject(new Error('cannot concat in objectMode')) 441 : this[ENCODING] ? buf.join('') : B.concat(buf, buf.dataLength)) 442 } 443 444 // stream.promise().then(() => done, er => emitted error) 445 promise () { 446 return new Promise((resolve, reject) => { 447 this.on(DESTROYED, () => reject(new Error('stream destroyed'))) 448 this.on('end', () => resolve()) 449 this.on('error', er => reject(er)) 450 }) 451 } 452 453 // for await (let chunk of stream) 454 [ASYNCITERATOR] () { 455 const next = () => { 456 const res = this.read() 457 if (res !== null) 458 return Promise.resolve({ done: false, value: res }) 459 460 if (this[EOF]) 461 return Promise.resolve({ done: true }) 462 463 let resolve = null 464 let reject = null 465 const onerr = er => { 466 this.removeListener('data', ondata) 467 this.removeListener('end', onend) 468 reject(er) 469 } 470 const ondata = value => { 471 this.removeListener('error', onerr) 472 this.removeListener('end', onend) 473 this.pause() 474 resolve({ value: value, done: !!this[EOF] }) 475 } 476 const onend = () => { 477 this.removeListener('error', onerr) 478 this.removeListener('data', ondata) 479 resolve({ done: true }) 480 } 481 const ondestroy = () => onerr(new Error('stream destroyed')) 482 return new Promise((res, rej) => { 483 reject = rej 484 resolve = res 485 this.once(DESTROYED, ondestroy) 486 this.once('error', onerr) 487 this.once('end', onend) 488 this.once('data', ondata) 489 }) 490 } 491 492 return { next } 493 } 494 495 // for (let chunk of stream) 496 [ITERATOR] () { 497 const next = () => { 498 const value = this.read() 499 const done = value === null 500 return { value, done } 501 } 502 return { next } 503 } 504 505 destroy (er) { 506 if (this[DESTROYED]) { 507 if (er) 508 this.emit('error', er) 509 else 510 this.emit(DESTROYED) 511 return this 512 } 513 514 this[DESTROYED] = true 515 516 // throw away all buffered data, it's never coming out 517 this.buffer = new Yallist() 518 this[BUFFERLENGTH] = 0 519 520 if (typeof this.close === 'function' && !this[CLOSED]) 521 this.close() 522 523 if (er) 524 this.emit('error', er) 525 else // if no error to emit, still reject pending promises 526 this.emit(DESTROYED) 527 528 return this 529 } 530 531 static isStream (s) { 532 return !!s && (s instanceof Minipass || s instanceof EE && ( 533 typeof s.pipe === 'function' || // readable 534 (typeof s.write === 'function' && typeof s.end === 'function') // writable 535 )) 536 } 537} 538