# mississippi

a collection of useful stream utility modules. learn how the modules work using this readme, then pick the ones you want and use them individually

the goal of the modules included in mississippi is to make working with streams easy without sacrificing speed, error handling or composability.

## usage

```js
var miss = require('mississippi')
```

## methods

- [pipe](#pipe)
- [each](#each)
- [pipeline](#pipeline)
- [duplex](#duplex)
- [through](#through)
- [from](#from)
- [to](#to)
- [concat](#concat)
- [finished](#finished)
- [parallel](#parallel)

### pipe

##### `miss.pipe(stream1, stream2, stream3, ..., cb)`

Pipes streams together and destroys all of them if one of them closes. Calls `cb` with `(error)` if there was an error in any of the streams.

When using standard `source.pipe(destination)` the source will _not_ be destroyed if the destination emits close or error. You are also not able to provide a callback to tell when the pipe has finished.

`miss.pipe` does these two things for you, ensuring you handle stream errors 100% of the time (unhandled errors are probably the most common bug in most node streams code).

#### original module

`miss.pipe` is provided by [`require('pump')`](https://www.npmjs.com/package/pump)

#### example

```js
// let's do a simple file copy
var fs = require('fs')

var read = fs.createReadStream('./original.zip')
var write = fs.createWriteStream('./copy.zip')

// use miss.pipe instead of read.pipe(write)
miss.pipe(read, write, function (err) {
  if (err) return console.error('Copy error!', err)
  console.log('Copied successfully')
})
```

### each

##### `miss.each(stream, each, [done])`

Iterate the data in `stream` one chunk at a time. Your `each` function will be called with `(data, next)` where data is a data chunk and next is a callback. Call `next` when you are ready to consume the next chunk.
Optionally you can call `next` with an error to destroy the stream. You can also pass the optional third argument, `done`, which is a function that will be called with `(err)` when the stream ends. The `err` argument will be populated with an error if the stream emitted an error.

#### original module

`miss.each` is provided by [`require('stream-each')`](https://www.npmjs.com/package/stream-each)

#### example

```js
var fs = require('fs')
var split = require('split2')

var newLineSeparatedNumbers = fs.createReadStream('numbers.txt')

var pipeline = miss.pipeline(newLineSeparatedNumbers, split())
var sum = 0
miss.each(pipeline, eachLine, done)

function eachLine (line, next) {
  sum += parseInt(line.toString(), 10)
  next()
}

function done (err) {
  if (err) throw err
  console.log('sum is', sum)
}
```

### pipeline

##### `var pipeline = miss.pipeline(stream1, stream2, stream3, ...)`

Builds a pipeline from all the transform streams passed in as arguments by piping them together and returning a single stream object that lets you write to the first stream and read from the last stream.

If you are pumping object streams together use `pipeline = miss.pipeline.obj(s1, s2, ...)`.

If any of the streams in the pipeline emits an error or gets destroyed, or you destroy the stream it returns, all of the streams will be destroyed and cleaned up for you.

#### original module

`miss.pipeline` is provided by [`require('pumpify')`](https://www.npmjs.com/package/pumpify)

#### example

```js
// first create some transform streams (note: these two modules are fictional)
var imageResize = require('image-resizer-stream')({width: 400})
var pngOptimizer = require('png-optimizer-stream')({quality: 60})

// instead of doing a.pipe(b), use pipeline
var resizeAndOptimize = miss.pipeline(imageResize, pngOptimizer)
// `resizeAndOptimize` is a transform stream. when you write to it, it writes
// to `imageResize`. when you read from it, it reads from `pngOptimizer`.
// it handles piping all the streams together for you

// use it like any other transform stream
var fs = require('fs')

var read = fs.createReadStream('./image.png')
var write = fs.createWriteStream('./resized-and-optimized.png')

miss.pipe(read, resizeAndOptimize, write, function (err) {
  if (err) return console.error('Image processing error!', err)
  console.log('Image processed successfully')
})
```

### duplex

##### `var duplex = miss.duplex([writable, readable, opts])`

Take two separate streams, a writable and a readable, and turn them into a single [duplex (readable and writable) stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex).

The returned stream will emit data from the readable. When you write to it, it writes to the writable.

You can either choose to supply the writable and the readable at the time you create the stream, or you can do it later using the `.setWritable` and `.setReadable` methods, and data written to the stream in the meantime will be buffered for you.

#### original module

`miss.duplex` is provided by [`require('duplexify')`](https://www.npmjs.com/package/duplexify)

#### example

```js
// let's spawn a process and take its stdout and stdin and combine them into 1 stream
var child = require('child_process')

// @- tells curl to read from stdin, --data-binary sets 'raw' binary mode
var curl = child.spawn('curl', ['-X', 'POST', '--data-binary', '@-', 'http://foo.com'])

// duplexCurl will write to stdin and read from stdout
var duplexCurl = miss.duplex(curl.stdin, curl.stdout)
```

### through

##### `var transformer = miss.through([options, transformFunction, flushFunction])`

Make a custom [transform stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_transform).
The `options` object is passed to the internal transform stream and can be used to create an `objectMode` stream (or use the shortcut `miss.through.obj([...])`).

The `transformFunction` is called when data is available for the writable side and has the signature `(chunk, encoding, cb)`. Within the function, add data to the readable side any number of times with `this.push(data)`. Call `cb()` to indicate processing of the `chunk` is complete. Or, to easily emit a single error or chunk, call `cb(err, chunk)`.

The `flushFunction`, with signature `(cb)`, is called just before the stream is complete and should be used to wrap up stream processing.

#### original module

`miss.through` is provided by [`require('through2')`](https://www.npmjs.com/package/through2)

#### example

```js
var fs = require('fs')

var read = fs.createReadStream('./boring_lowercase.txt')
var write = fs.createWriteStream('./AWESOMECASE.TXT')

// Leaving out the options object
var uppercaser = miss.through(
  function (chunk, enc, cb) {
    cb(null, chunk.toString().toUpperCase())
  },
  function (cb) {
    cb(null, 'ONE LAST BIT OF UPPERCASE')
  }
)

miss.pipe(read, uppercaser, write, function (err) {
  if (err) return console.error('Trouble uppercasing!')
  console.log('Splendid uppercasing!')
})
```

### from

##### `miss.from([opts], read)`

Make a custom [readable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_readable).

`opts` contains the options to pass on to the Readable stream constructor, e.g. for creating a readable object stream (or use the shortcut `miss.from.obj([...])`).

Returns a readable stream that calls `read(size, next)` when data is requested from the stream.

- `size` is the recommended amount of data (in bytes) to retrieve.
- `next(err, chunk)` should be called when you're ready to emit more data.
#### original module

`miss.from` is provided by [`require('from2')`](https://www.npmjs.com/package/from2)

#### example

```js
function fromString (string) {
  return miss.from(function (size, next) {
    // if there's no more content
    // left in the string, close the stream.
    if (string.length <= 0) return next(null, null)

    // Pull in a new chunk of text,
    // removing it from the string.
    var chunk = string.slice(0, size)
    string = string.slice(size)

    // Emit "chunk" from the stream.
    next(null, chunk)
  })
}

// pipe "hello world" out
// to stdout.
fromString('hello world').pipe(process.stdout)
```

### to

##### `miss.to([options], write, [flush])`

Make a custom [writable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_writable).

`options` contains the options to pass on to the Writable stream constructor, e.g. for creating a writable object stream (or use the shortcut `miss.to.obj([...])`).

Returns a writable stream that calls `write(data, enc, cb)` when data is written to the stream.

- `data` is the received data to write to the destination.
- `enc` is the encoding of the piece of data received.
- `cb(err, data)` should be called when you're ready to write more data, or when you encountered an error.

`flush(cb)` is called before `finish` is emitted and allows for cleanup steps to occur.
#### original module

`miss.to` is provided by [`require('flush-write-stream')`](https://www.npmjs.com/package/flush-write-stream)

#### example

```js
var ws = miss.to(write, flush)

ws.on('finish', function () {
  console.log('finished')
})

ws.write('hello')
ws.write('world')
ws.end()

function write (data, enc, cb) {
  // i am your normal ._write method
  console.log('writing', data.toString())
  cb()
}

function flush (cb) {
  // i am called before finish is emitted
  setTimeout(cb, 1000) // wait 1 sec
}
```

If you run the above it will produce the following output:

```
writing hello
writing world
(nothing happens for 1 sec)
finished
```

### concat

##### `var concat = miss.concat(cb)`

Returns a writable stream that concatenates all data written to the stream and calls a callback with the single result.

Calling `miss.concat(cb)` returns a writable stream. `cb` is called when the writable stream is finished, e.g. when all data is done being written to it. `cb` is called with a single argument, `(data)`, which will contain the result of concatenating all the data written to the stream.

Note that `miss.concat` will not handle stream errors for you. To handle errors, use `miss.pipe` or handle the `error` event manually.
#### original module

`miss.concat` is provided by [`require('concat-stream')`](https://www.npmjs.com/package/concat-stream)

#### example

```js
var fs = require('fs')

var readStream = fs.createReadStream('cat.png')
var concatStream = miss.concat(gotPicture)

miss.pipe(readStream, concatStream, handleError)

function gotPicture (imageBuffer) {
  // imageBuffer is all of `cat.png` as a node.js Buffer
}

function handleError (err) {
  if (err) {
    // handle your error appropriately here, e.g.:
    console.error(err) // print the error to STDERR
    process.exit(1) // exit program with non-zero exit code
  }
}
```

### finished

##### `miss.finished(stream, cb)`

Waits for `stream` to finish or error and then calls `cb` with `(err)`. `cb` will only be called once. `err` will be null if the stream finished without error, or else it will be populated with the error from the stream's `error` event.

This function is useful for simplifying stream handling code as it lets you handle success or error conditions in a single code path. It's used internally by `miss.pipe`.

#### original module

`miss.finished` is provided by [`require('end-of-stream')`](https://www.npmjs.com/package/end-of-stream)

#### example

```js
var fs = require('fs')

var copySource = fs.createReadStream('./movie.mp4')
var copyDest = fs.createWriteStream('./movie-copy.mp4')

copySource.pipe(copyDest)

miss.finished(copyDest, function (err) {
  if (err) return console.log('write failed', err)
  console.log('write success')
})
```

### parallel

##### `miss.parallel(concurrency, each)`

This works like `through` except you can process items in parallel, while still preserving the original input order.
This is handy if you wanna take advantage of node's async I/O and process streams of items in batches. With this module you can build your very own streaming parallel job queue.

Note that `miss.parallel` preserves input ordering; if you don't need that, you can use [through2-concurrent](https://github.com/almost/through2-concurrent) instead, which is otherwise very similar.

#### original module

`miss.parallel` is provided by [`require('parallel-transform')`](https://www.npmjs.com/package/parallel-transform)

#### example

This example fetches the GET HTTP headers for a stream of input URLs 5 at a time in parallel.

```js
var fs = require('fs')
var split = require('split2')
var request = require('request')

function getResponse (url, cb) {
  var r = request(url)
  r.on('error', function (err) {
    cb(err)
  })
  r.on('response', function (re) {
    cb(null, {url: url, date: new Date(), status: re.statusCode, headers: re.headers})
    r.abort()
  })
}

miss.pipe(
  fs.createReadStream('./urls.txt'), // one url per line
  split(),
  miss.parallel(5, getResponse),
  miss.through.obj(function (row, enc, next) {
    console.log(JSON.stringify(row))
    next()
  })
)
```

## see also

- [substack/stream-handbook](https://github.com/substack/stream-handbook)
- [nodejs.org/api/stream.html](https://nodejs.org/api/stream.html)
- [awesome-nodejs-streams](https://github.com/thejmazz/awesome-nodejs-streams)

## license

Licensed under the BSD 2-clause license.