| Name | Date | Size | #Lines | LOC |
|---|---|---|---|---|
| LICENSE | 12-May-2024 | 756 B | 16 | 12 |
| README.md | 12-May-2024 | 20.5 KiB | 607 | 505 |
| index.js | 12-May-2024 | 13.6 KiB | 538 | 427 |
| package.json | 12-May-2024 | 1.7 KiB | 71 | 70 |

README.md
# minipass

A _very_ minimal implementation of a [PassThrough
stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough).

[It's very
fast](https://docs.google.com/spreadsheets/d/1oObKSrVwLX_7Ut4Z6g3fZW-AX1j1-k6w-cDsrkaSbHM/edit#gid=0)
for objects, strings, and buffers.

Supports `pipe()`ing (including multi-`pipe()` and backpressure
transmission), buffering data until either a `data` event handler or
`pipe()` is added (so you don't lose the first chunk), and most other
cases where PassThrough is a good idea.

There is a `read()` method, but it's much more efficient to consume
data from this stream via `'data'` events or by calling `pipe()` into
some other stream. Calling `read()` requires the buffer to be
flattened in some cases, which requires copying memory.

There is also no `unpipe()` method. Once you start piping, there is
no stopping it!

If you set `objectMode: true` in the options, then whatever is written
will be emitted as-is. Otherwise, it'll do a minimal amount of Buffer
copying to ensure proper Streams semantics when `read(n)` is called.

`objectMode` can also be set by doing `stream.objectMode = true`, or by
writing any non-string/non-buffer data. `objectMode` cannot be set to
false once it is set.

This is not a `through` or `through2` stream. It doesn't transform
the data; it just passes it right through. If you want to transform
the data, extend the class and override the `write()` method. Once
you're done transforming the data however you want, call
`super.write()` with the transform output.
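
The buffering and subclassing behavior described above can be pictured with a deliberately tiny toy built only on Node's `events` module. `TinyPass` and `Upper` are hypothetical names. This is a sketch of the idea under those assumptions, not how Minipass is implemented: it has no encodings, `end()`, piping, `read()`, or `drain` handling.

```js
const { EventEmitter } = require('events')

// Hypothetical toy, NOT Minipass's implementation: just enough to show
// "buffer until someone listens, then pass data straight through".
class TinyPass extends EventEmitter {
  constructor () {
    super()
    this.pending = []    // chunks written before any 'data' listener existed
    this.flowing = false
  }

  // The first 'data' listener switches on flowing mode and flushes,
  // synchronously, everything that was buffered so far.
  on (ev, fn) {
    super.on(ev, fn)
    if (ev === 'data' && !this.flowing) {
      this.flowing = true
      while (this.pending.length) this.emit('data', this.pending.shift())
    }
    return this
  }

  // While flowing, emit before returning (true); otherwise buffer (false).
  write (chunk) {
    if (!this.flowing) {
      this.pending.push(chunk)
      return false
    }
    this.emit('data', chunk)
    return true
  }
}

// A transform in the style described above: override write() and pass
// the transformed output to super.write().
class Upper extends TinyPass {
  write (chunk) {
    return super.write(String(chunk).toUpperCase())
  }
}

const up = new Upper()
const seen = []
const firstOk = up.write('a')      // false: no listener yet, 'A' is buffered
up.on('data', c => seen.push(c))   // the buffered 'A' is delivered right away
const secondOk = up.write('b')     // true: 'B' is emitted before write() returns
```

The toy's `write()` return value follows the convention this README describes for Minipass. It returns `true` when the data has already gone somewhere and `false` when the data is sitting in a buffer.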

For some examples of streams that extend Minipass in various ways, check
out:

- [minizlib](http://npm.im/minizlib)
- [fs-minipass](http://npm.im/fs-minipass)
- [tar](http://npm.im/tar)
- [minipass-collect](http://npm.im/minipass-collect)
- [minipass-flush](http://npm.im/minipass-flush)
- [minipass-pipeline](http://npm.im/minipass-pipeline)
- [tap](http://npm.im/tap)
- [tap-parser](http://npm.im/tap-parser)
- [treport](http://npm.im/treport)

## Differences from Node.js Streams

There are several things that make Minipass streams different from (and in
some ways superior to) Node.js core streams.

Please read these caveats if you are familiar with node-core streams and
intend to use Minipass streams in your programs.

### Timing

Minipass streams are designed to support synchronous use-cases. Thus, data
is emitted as soon as it is available, always. It is buffered until read,
but no longer. Another way to look at it is that Minipass streams are
exactly as synchronous as the logic that writes into them.

This can be surprising if your code relies on `PassThrough.write()` always
providing data on the next tick rather than the current one, or being able
to call `resume()` and not have the entire buffer disappear immediately.

However, without this synchronicity guarantee, there would be no way for
Minipass to achieve the speeds it does, or support the synchronous use
cases that it does. Simply put, waiting takes time.

This non-deferring approach makes Minipass streams much easier to reason
about, especially in the context of Promises and other flow-control
mechanisms.

### No High/Low Water Marks

Node.js core streams will optimistically fill up a buffer, returning `true`
on all writes until the limit is hit, even if the data has nowhere to go.
Then, they will not attempt to draw more data in until the buffer size dips
below a minimum value.

Minipass streams are much simpler.
The `write()` method will return `true`
if the data has somewhere to go (which is to say, given the timing
guarantees, that the data is already there by the time `write()` returns).

If the data has nowhere to go, then `write()` returns `false`, and the data
sits in a buffer, to be drained out immediately as soon as anyone consumes
it.

### Hazards of Buffering (or: Why Minipass Is So Fast)

Since data written to a Minipass stream is immediately written all the way
through the pipeline, and `write()` always returns true/false based on
whether the data was fully flushed, backpressure is communicated
immediately to the upstream caller. This minimizes buffering.

Consider this case:

```js
const {PassThrough} = require('stream')
const p1 = new PassThrough({ highWaterMark: 1024 })
const p2 = new PassThrough({ highWaterMark: 1024 })
const p3 = new PassThrough({ highWaterMark: 1024 })
const p4 = new PassThrough({ highWaterMark: 1024 })

p1.pipe(p2).pipe(p3).pipe(p4)
p4.on('data', () => console.log('made it through'))

// this returns false and buffers, then writes to p2 on next tick (1)
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
// on next tick (4)
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
// 'drain' on next tick (5)
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
// tick (7)

p1.write(Buffer.alloc(2048)) // returns false
```

Along the way, the data was buffered and deferred at each stage, and
multiple event deferrals happened, for an unblocked pipeline where it was
perfectly safe to write all the way through!
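
The first step of that trace can be checked with nothing but Node's built-in `stream` module. A lone `PassThrough` with a `highWaterMark` of 1024 reports backpressure for a 2048-byte write even when a consumer is already attached. This sketch only looks at the return value of `write()`. It does not reproduce the tick-by-tick deferrals listed in the comments.

```js
const { PassThrough } = require('stream')

// Node core Writables add the chunk to their buffered length first and then
// compare against highWaterMark, so a chunk at least as large as the limit
// makes write() return false even though a consumer is ready.
const small = new PassThrough({ highWaterMark: 1024 })
small.on('data', () => {})                      // consumer attached
const smallOk = small.write(Buffer.alloc(16))   // 16 < 1024

const big = new PassThrough({ highWaterMark: 1024 })
big.on('data', () => {})                        // consumer attached
const bigOk = big.write(Buffer.alloc(2048))     // 2048 >= 1024

console.log(smallOk, bigOk) // true false
```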

Furthermore, setting a `highWaterMark` of `1024` might lead someone reading
the code to think an advisory maximum of 1 KiB is being set for the
pipeline. However, the actual advisory buffering level is the _sum_ of
`highWaterMark` values, since each one has its own bucket.

Consider the Minipass case:

```js
const Minipass = require('minipass')
const m1 = new Minipass()
const m2 = new Minipass()
const m3 = new Minipass()
const m4 = new Minipass()

m1.pipe(m2).pipe(m3).pipe(m4)
m4.on('data', () => console.log('made it through'))

// m1 is flowing, so it writes the data to m2 immediately
// m2 is flowing, so it writes the data to m3 immediately
// m3 is flowing, so it writes the data to m4 immediately
// m4 is flowing, so it fires the 'data' event immediately, returns true
// m4's write returned true, so m3 is still flowing, returns true
// m3's write returned true, so m2 is still flowing, returns true
// m2's write returned true, so m1 is still flowing, returns true
// No event deferrals or buffering along the way!

m1.write(Buffer.alloc(2048)) // returns true
```

It is extremely unlikely that you _don't_ want to buffer any data written,
or _ever_ buffer data that can be flushed all the way through. Neither
node-core streams nor Minipass ever fail to buffer written data, but
node-core streams do a lot of unnecessary buffering and pausing.

As always, the faster implementation is the one that does less stuff and
waits less time to do it.

### Immediately emit `end` for empty streams (when not paused)

If a stream is not paused, and `end()` is called before writing any data
into it, then it will emit `end` immediately.

If you have logic that occurs on the `end` event which you don't want to
potentially happen immediately (for example, closing file descriptors,
moving on to the next entry in an archive parse stream, etc.)
then be sure to call `stream.pause()` on creation, and then
`stream.resume()` once you are ready to respond to the `end` event.

### Emit `end` When Asked

One hazard of immediately emitting `'end'` is that you may not yet have had
a chance to add a listener. In order to avoid this hazard, Minipass
streams safely re-emit the `'end'` event if a new listener is added after
`'end'` has been emitted.

I.e., if you do `stream.on('end', someFunction)`, and the stream has already
emitted `end`, then it will call the handler right away. (You can think of
this somewhat like attaching a new `.then(fn)` to a previously-resolved
Promise.)

To avoid calling handlers that would not expect multiple ends to occur
more than once, all listeners are removed from the `'end'` event whenever
it is emitted.

### Impact of "immediate flow" on Tee-streams

A "tee stream" is a stream piping to multiple destinations:

```js
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
tee.write('foo') // goes to both destinations
```

Since Minipass streams _immediately_ process any pending data through the
pipeline when a new pipe destination is added, this can have surprising
effects, especially when a stream comes in from some other function and may
or may not have data in its buffer.

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
src.pipe(dest2) // gets nothing!
```

The solution is to create a dedicated tee-stream junction that pipes to
both locations, and then pipe to _that_ instead.

```js
// Safe example: tee to both places
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
src.pipe(tee) // tee gets 'foo', pipes to both locations
```

The same caveat applies to `on('data')` event listeners. The first one
added will _immediately_ receive all of the data, leaving nothing for the
second:

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.on('data', handler1) // receives 'foo' right away
src.on('data', handler2) // nothing to see here!
```

A dedicated tee-stream works in this case as well:

```js
// Safe example: tee to both data handlers
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.on('data', handler1)
tee.on('data', handler2)
src.pipe(tee)
```

## USAGE

It's a stream! Use it like a stream and it'll most likely do what you want.

```js
const Minipass = require('minipass')
const mp = new Minipass(options) // optional: { encoding, objectMode }
mp.write('foo')
mp.pipe(someOtherStream)
mp.end('bar')
```

### OPTIONS

* `encoding` How would you like the data coming _out_ of the stream to be
  encoded? Accepts any values that can be passed to `Buffer.toString()`.
* `objectMode` Emit data exactly as it comes in. This will be flipped on
  by default if you `write()` something other than a string or Buffer at
  any point. Setting `objectMode: true` will prevent setting any encoding
  value.

### API

Implements the user-facing portions of Node.js's `Readable` and `Writable`
streams.

### Methods

* `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the
  base Minipass class, the same data will come out.) Returns `false` if
  the stream will buffer the next write, or `true` if it's still in
  "flowing" mode.
* `end([chunk, [encoding]], [callback])` - Signal that you have no more
  data to write. This will queue an `end` event to be fired when all the
  data has been consumed.
* `setEncoding(encoding)` - Set the encoding for data coming out of the
  stream. This can only be done once.
* `pause()` - No more data for a while, please. This also prevents `end`
  from being emitted for empty streams until the stream is resumed.
* `resume()` - Resume the stream. Buffered data and events are emitted
  immediately, so if no `data` listener or pipe destination is attached
  yet, the buffered data is effectively discarded.
* `pipe(dest)` - Send all output to the stream provided. There is no way
  to unpipe. When data is emitted, it is immediately written to any and
  all pipe destinations.
* `on(ev, fn)`, `emit(ev, ...data)` - Minipass streams are EventEmitters.
  Some events are given special treatment, however. (See "Events"
  below.)
* `promise()` - Returns a Promise that resolves when the stream emits
  `end`, or rejects if the stream emits `error`.
* `collect()` - Return a Promise that resolves on `end` with an array
  containing each chunk of data that was emitted, or rejects if the
  stream emits `error`. Note that this consumes the stream data.
* `concat()` - Same as `collect()`, but concatenates the data into a
  single Buffer (or a single string, if an encoding is set). Will reject
  the returned promise if the stream is in objectMode, or if it goes into
  objectMode by the end of the data.
* `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not
  provided, then consume all of it. If `n` bytes are not available, then
  it returns `null`. **Note:** consuming streams in this way is less
  efficient, and can lead to unnecessary Buffer copying.
* `destroy([er])` - Destroy the stream. If an error is provided, then an
  `'error'` event is emitted. If the stream has a `close()` method, and
  has not emitted a `'close'` event yet, then `stream.close()` will be
  called.
  Any Promises returned by `.promise()`, `.collect()`, or
  `.concat()` will be rejected. After being destroyed, writing to the
  stream will emit an error. No more data will be emitted if the stream is
  destroyed, even if it was previously buffered.

### Properties

* `bufferLength` Read-only. Total number of bytes buffered, or in the case
  of objectMode, the total number of objects.
* `encoding` The encoding that has been set. (Setting this is equivalent
  to calling `setEncoding(enc)` and has the same prohibition against
  setting multiple times.)
* `flowing` Read-only. Boolean indicating whether a chunk written to the
  stream will be immediately emitted.
* `emittedEnd` Read-only. Boolean indicating whether the end-ish events
  (i.e., `end`, `prefinish`, `finish`) have been emitted. Note that
  listening on any end-ish event will immediately re-emit it if it has
  already been emitted.
* `writable` Whether the stream is writable. Default `true`. Set to
  `false` when `end()` is called.
* `readable` Whether the stream is readable. Default `true`.
* `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written
  to the stream that have not yet been emitted. (It's probably a bad idea
  to mess with this.)
* `pipes` A [yallist](http://npm.im/yallist) linked list of streams that
  this stream is piping into. (It's probably a bad idea to mess with
  this.)
* `destroyed` A getter that indicates whether the stream was destroyed.
* `paused` True if the stream has been explicitly paused, otherwise false.
* `objectMode` Indicates whether the stream is in `objectMode`. Once set
  to `true`, it cannot be set to `false`.

### Events

* `data` Emitted when there's data to read. Argument is the data to read.
  This is never emitted while not flowing. If a listener is attached, that
  will resume the stream.
* `end` Emitted when there's no more data to read.
  This will be emitted
  immediately for empty streams when `end()` is called. If a listener is
  attached, and `end` was already emitted, then it will be emitted again.
  All listeners are removed when `end` is emitted.
* `prefinish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'end'`.
* `finish` An end-ish event that follows the same logic as `end` and is
  emitted in the same conditions where `end` is emitted. Emitted after
  `'prefinish'`.
* `close` An indication that an underlying resource has been released.
  Minipass does not emit this event itself, but if it is emitted before
  `end`, Minipass defers it until after `end` has been emitted, since an
  early `close` throws off some stream libraries.
* `drain` Emitted when the internal buffer empties, and it is again
  suitable to `write()` into the stream.
* `readable` Emitted when data is buffered and ready to be read by a
  consumer.
* `resume` Emitted when the stream changes state from buffering to flowing
  mode. (I.e., when `resume` is called, `pipe` is called, or a `data` event
  listener is added.)

### Static Methods

* `Minipass.isStream(stream)` Returns `true` if the argument is a stream,
  and `false` otherwise. To be considered a stream, the object must be
  either an instance of Minipass, or an EventEmitter that has either a
  `pipe()` method, or both `write()` and `end()` methods. (Pretty much any
  stream in node-land will return `true` for this.)

## EXAMPLES

Here are some examples of things you can do with Minipass streams.
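
### duck-typing check in the style of `Minipass.isStream()`

The rule described above under "Static Methods" is plain duck typing, so it can be sketched with only Node's built-in modules. `looksLikeStream` is a hypothetical helper written for illustration, not Minipass's own code. It leaves out the "instance of Minipass" branch so that it runs without the package installed.

```js
const { EventEmitter } = require('events')
const { PassThrough } = require('stream')

// Hypothetical helper (not Minipass's code): an EventEmitter that has
// pipe(), or that has both write() and end(), counts as a stream.
// The documented isStream() also accepts any Minipass instance; that
// branch is left out so this sketch runs without the package.
const looksLikeStream = s =>
  !!s &&
  s instanceof EventEmitter &&
  (typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function'))

console.log(looksLikeStream(new PassThrough()))  // true
console.log(looksLikeStream(new EventEmitter())) // false: no pipe/write/end
console.log(looksLikeStream({ pipe () {} }))     // false: not an EventEmitter
```

Because the check is structural, anything that inherits from `EventEmitter` and exposes the right methods passes. That is why nearly every core stream qualifies.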

### simple "are you done yet" promise

```js
mp.promise().then(() => {
  // stream is finished
}, er => {
  // stream emitted an error
})
```

### collecting

```js
mp.collect().then(all => {
  // all is an array of all the data emitted
  // encoding is supported in this case, so
  // the result will be a collection of strings if
  // an encoding is specified, or buffers/objects if not.
  //
  // In an async function, you may do
  // const data = await mp.collect()
})
```

### collecting into a single blob

This is a bit slower because it concatenates the data into one chunk for
you, but if you're going to do it yourself anyway, it's convenient this
way:

```js
mp.concat().then(onebigchunk => {
  // onebigchunk is a string if the stream
  // had an encoding set, or a buffer otherwise.
})
```

### iteration

You can iterate over streams synchronously or asynchronously in
platforms that support it.

Synchronous iteration will end when the currently available data is
consumed, even if the `end` event has not been reached. In string and
buffer mode, the data is concatenated, so unless multiple writes are
occurring in the same tick as the `read()`, sync iteration loops will
generally only have a single iteration.

To consume chunks in this way exactly as they have been written, with
no flattening, create the stream with the `{ objectMode: true }`
option.

```js
const mp = new Minipass({ objectMode: true })
mp.write('a')
mp.write('b')
for (let letter of mp) {
  console.log(letter) // a, b
}
mp.write('c')
mp.write('d')
for (let letter of mp) {
  console.log(letter) // c, d
}
mp.write('e')
mp.end()
for (let letter of mp) {
  console.log(letter) // e
}
for (let letter of mp) {
  console.log(letter) // nothing
}
```

Asynchronous iteration will continue until the `end` event is reached,
consuming all of the data.

```js
const mp = new Minipass({ encoding: 'utf8' })

// some source of some data
let i = 5
const inter = setInterval(() => {
  if (i-- > 0)
    mp.write(Buffer.from('foo\n', 'utf8'))
  else {
    mp.end()
    clearInterval(inter)
  }
}, 100)

// consume the data with asynchronous iteration
async function consume () {
  for await (let chunk of mp) {
    console.log(chunk)
  }
  return 'ok'
}

consume().then(res => console.log(res))
// logs `foo\n` 5 times, and then `ok`
```

### subclass that `console.log()`s everything written into it

```js
class Logger extends Minipass {
  write (chunk, encoding, callback) {
    console.log('WRITE', chunk, encoding)
    return super.write(chunk, encoding, callback)
  }
  end (chunk, encoding, callback) {
    console.log('END', chunk, encoding)
    return super.end(chunk, encoding, callback)
  }
}

someSource.pipe(new Logger()).pipe(someDest)
```

### same thing, but using an inline anonymous class

```js
// js classes are fun
someSource
  .pipe(new (class extends Minipass {
    emit (ev, ...data) {
      // let's also log events, because debugging some weird thing
      console.log('EMIT', ev)
      return super.emit(ev, ...data)
    }
    write (chunk, encoding, callback) {
      console.log('WRITE', chunk, encoding)
      return super.write(chunk, encoding, callback)
    }
    end (chunk, encoding, callback) {
      console.log('END', chunk, encoding)
      return super.end(chunk, encoding, callback)
    }
  }))
  .pipe(someDest)
```

### subclass that defers 'end' for some reason

```js
class SlowEnd extends Minipass {
  emit (ev, ...args) {
    if (ev === 'end') {
      console.log('going to end, hold on a sec')
      setTimeout(() => {
        console.log('ok, ready to end now')
        super.emit('end', ...args)
      }, 100)
    } else {
      return super.emit(ev, ...args)
    }
  }
}
```

### transform that creates newline-delimited JSON

```js
class NDJSONEncode extends Minipass {
  write (obj, cb) {
    try {
      // JSON.stringify can throw, emit an error on that
      return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
    } catch (er) {
      this.emit('error', er)
    }
  }
  end (obj, cb) {
    if (typeof obj === 'function') {
      cb = obj
      obj = undefined
    }
    if (obj !== undefined) {
      this.write(obj)
    }
    return super.end(cb)
  }
}
```

### transform that parses newline-delimited JSON

```js
class NDJSONDecode extends Minipass {
  constructor (options) {
    // always be in object mode, as far as Minipass is concerned
    super({ objectMode: true })
    this._jsonBuffer = ''
  }
  write (chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = undefined
    }
    if (typeof chunk === 'string' &&
        typeof encoding === 'string' &&
        encoding !== 'utf8') {
      chunk = Buffer.from(chunk, encoding).toString()
    } else if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    const jsonData = (this._jsonBuffer + chunk).split('\n')
    // the last piece may be an incomplete line; keep it for the next write
    this._jsonBuffer = jsonData.pop()
    for (let i = 0; i < jsonData.length; i++) {
      let parsed
      try {
        // JSON.parse can throw, emit an error on that
        parsed = JSON.parse(jsonData[i])
      } catch (er) {
        this.emit('error', er)
        continue
      }
      super.write(parsed)
    }
    if (cb)
      cb()
  }
}
```
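
The core of `NDJSONDecode` is its carry-over buffer. It splits on `'\n'` and holds back the last, possibly incomplete, piece until the next write. Pulled out as a plain function, that technique can be checked without any stream at all. `splitNDJSON` is a hypothetical name used only for this sketch, and it collects parse errors in an array where the class above emits `'error'`.

```js
// Hypothetical helper isolating the buffering technique used by
// NDJSONDecode: join the leftover text with the new chunk, split on '\n',
// and hold back the last piece because it may be an incomplete line.
function splitNDJSON (carry, chunk) {
  const lines = (carry + chunk).split('\n')
  const rest = lines.pop()           // '' when the chunk ended with '\n'
  const values = []
  const errors = []
  for (const line of lines) {
    try {
      values.push(JSON.parse(line))
    } catch (er) {
      errors.push(er)                // NDJSONDecode emits 'error' here instead
    }
  }
  return { values, errors, rest }
}

// A record split across two chunks is only parsed once its newline arrives.
const part1 = splitNDJSON('', '{"a":1}\n{"b":')
// part1.values is [{ a: 1 }], part1.rest is '{"b":'
const part2 = splitNDJSON(part1.rest, '2}\n')
// part2.values is [{ b: 2 }], part2.rest is ''
```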