# mississippi

a collection of useful stream utility modules. learn how the modules work using this and then pick the ones you want and use them individually

the goal of the modules included in mississippi is to make working with streams easy without sacrificing speed, error handling or composability.

## usage

```js
var miss = require('mississippi')
```

## methods

- [pipe](#pipe)
- [each](#each)
- [pipeline](#pipeline)
- [duplex](#duplex)
- [through](#through)
- [from](#from)
- [to](#to)
- [concat](#concat)
- [finished](#finished)
- [parallel](#parallel)

### pipe

##### `miss.pipe(stream1, stream2, stream3, ..., cb)`

Pipes streams together and destroys all of them if one of them closes. Calls `cb` with `(error)` if there was an error in any of the streams.

When using standard `source.pipe(destination)` the source will _not_ be destroyed if the destination emits close or error. You are also not able to provide a callback to tell when the pipe has finished.

`miss.pipe` does these two things for you, ensuring you handle stream errors 100% of the time (unhandled errors are probably the most common bug in most node streams code).

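To see what plain `.pipe` leaves behind, here is a small sketch that uses only node's core `stream` module. The `source` and `destination` streams are made up for the demo: the destination fails on its first write, and the source is still not destroyed afterwards.

```js
var stream = require('stream')

// a source that has one chunk buffered and never ends on its own
var source = new stream.Readable({ read: function () {} })
source.push('some data')

// a destination that fails on the first write
var destination = new stream.Writable({
  write: function (chunk, enc, cb) {
    cb(new Error('disk full'))
  }
})

destination.on('error', function (err) {
  console.log('destination failed:', err.message)
  // plain .pipe() only unpipes the source, it does not destroy it,
  // so whatever resource backs the source is left open
  console.log('source destroyed?', source.destroyed)
})

source.pipe(destination)
```

With `miss.pipe(source, destination, cb)` both streams would be destroyed and `cb` would be called with the error.
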
#### original module

`miss.pipe` is provided by [`require('pump')`](https://www.npmjs.com/package/pump)

#### example

```js
// let's do a simple file copy
var fs = require('fs')

var read = fs.createReadStream('./original.zip')
var write = fs.createWriteStream('./copy.zip')

// use miss.pipe instead of read.pipe(write)
miss.pipe(read, write, function (err) {
  if (err) return console.error('Copy error!', err)
  console.log('Copied successfully')
})
```

### each

##### `miss.each(stream, each, [done])`

Iterate the data in `stream` one chunk at a time. Your `each` function will be called with `(data, next)` where `data` is a data chunk and `next` is a callback. Call `next` when you are ready to consume the next chunk.

Optionally you can call `next` with an error to destroy the stream. You can also pass the optional third argument, `done`, which is a function that will be called with `(err)` when the stream ends. The `err` argument will be populated with an error if the stream emitted an error.

#### original module

`miss.each` is provided by [`require('stream-each')`](https://www.npmjs.com/package/stream-each)

#### example

```js
var fs = require('fs')
var split = require('split2')

var newLineSeparatedNumbers = fs.createReadStream('numbers.txt')
var sum = 0

// split the file into lines, then visit the lines one at a time
var pipeline = miss.pipeline(newLineSeparatedNumbers, split())
miss.each(pipeline, eachLine, done)

function eachLine (line, next) {
  sum += parseInt(line.toString(), 10)
  next()
}

function done (err) {
  if (err) throw err
  console.log('sum is', sum)
}
```

### pipeline

##### `var pipeline = miss.pipeline(stream1, stream2, stream3, ...)`

Builds a pipeline from all the transform streams passed in as arguments by piping them together and returning a single stream object that lets you write to the first stream and read from the last stream.

If you are pumping object streams together use `pipeline = miss.pipeline.obj(s1, s2, ...)`.

If any of the streams in the pipeline emits an error or gets destroyed, or you destroy the stream it returns, all of the streams will be destroyed and cleaned up for you.

#### original module

`miss.pipeline` is provided by [`require('pumpify')`](https://www.npmjs.com/package/pumpify)

#### example

```js
// first create some transform streams (note: these two modules are fictional)
var imageResize = require('image-resizer-stream')({width: 400})
var pngOptimizer = require('png-optimizer-stream')({quality: 60})

// instead of doing a.pipe(b), use pipeline
var resizeAndOptimize = miss.pipeline(imageResize, pngOptimizer)
// `resizeAndOptimize` is a transform stream. when you write to it, it writes
// to `imageResize`. when you read from it, it reads from `pngOptimizer`.
// it handles piping all the streams together for you

// use it like any other transform stream
var fs = require('fs')

var read = fs.createReadStream('./image.png')
var write = fs.createWriteStream('./resized-and-optimized.png')

miss.pipe(read, resizeAndOptimize, write, function (err) {
  if (err) return console.error('Image processing error!', err)
  console.log('Image processed successfully')
})
```

### duplex

##### `var duplex = miss.duplex([writable, readable, opts])`

Take two separate streams, a writable and a readable, and turn them into a single [duplex (readable and writable) stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex).

The returned stream will emit data from the readable. When you write to it, it writes to the writable.

You can either supply the writable and the readable at the time you create the stream, or you can do it later using the `.setWritable` and `.setReadable` methods. Data written to the stream in the meantime will be buffered for you.

#### original module

`miss.duplex` is provided by [`require('duplexify')`](https://www.npmjs.com/package/duplexify)

#### example

```js
// let's spawn a process and take its stdout and stdin and combine them into 1 stream
var child = require('child_process')

// @- tells it to read from stdin, --data-binary sets 'raw' binary mode
// spawn takes the command and its arguments separately
var curl = child.spawn('curl', ['-X', 'POST', '--data-binary', '@-', 'http://foo.com'])

// duplexCurl will write to stdin and read from stdout
var duplexCurl = miss.duplex(curl.stdin, curl.stdout)
```

### through

##### `var transformer = miss.through([options, transformFunction, flushFunction])`

Make a custom [transform stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_transform).

The `options` object is passed to the internal transform stream and can be used to create an `objectMode` stream (or use the shortcut `miss.through.obj([...])`).

The `transformFunction` is called when data is available for the writable side and has the signature `(chunk, encoding, cb)`. Within the function, add data to the readable side any number of times with `this.push(data)`. Call `cb()` to indicate processing of the `chunk` is complete. Alternatively, to emit a single error or chunk, call `cb(err, chunk)`.

The `flushFunction`, with signature `(cb)`, is called just before the stream is complete and should be used to wrap up stream processing.

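As a rough sketch of pushing several chunks per input chunk, here is a word splitter written against core `stream.Transform`, which accepts functions with the same `(chunk, encoding, cb)` and `(cb)` signatures. It assumes the same two functions could be handed to `miss.through` unchanged. The names `wordSplitter`, `leftover` and `collected` are made up for the demo.

```js
var stream = require('stream')

var leftover = ''

var wordSplitter = new stream.Transform({
  transform: function (chunk, encoding, cb) {
    // a word may be cut in half at a chunk boundary, so keep the tail for later
    var words = (leftover + chunk.toString()).split(' ')
    leftover = words.pop()
    // push once per complete word: one input chunk, many output chunks
    for (var i = 0; i < words.length; i++) this.push(words[i] + '\n')
    cb()
  },
  flush: function (cb) {
    // called just before the stream ends: emit whatever is still buffered
    if (leftover) this.push(leftover + '\n')
    cb()
  }
})

var collected = []
wordSplitter.on('data', function (line) { collected.push(line.toString()) })
wordSplitter.on('end', function () { console.log(collected.join('')) })

wordSplitter.write('hello wor')
wordSplitter.write('ld again')
wordSplitter.end()
```

Calling `cb()` with no arguments after the pushes only signals that the chunk has been handled; nothing extra is emitted.
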
#### original module

`miss.through` is provided by [`require('through2')`](https://www.npmjs.com/package/through2)

#### example

```js
var fs = require('fs')

var read = fs.createReadStream('./boring_lowercase.txt')
var write = fs.createWriteStream('./AWESOMECASE.TXT')

// Leaving out the options object
var uppercaser = miss.through(
  function (chunk, enc, cb) {
    cb(null, chunk.toString().toUpperCase())
  },
  function (cb) {
    cb(null, 'ONE LAST BIT OF UPPERCASE')
  }
)

miss.pipe(read, uppercaser, write, function (err) {
  if (err) return console.error('Trouble uppercasing!')
  console.log('Splendid uppercasing!')
})
```

### from

##### `miss.from([opts], read)`

Make a custom [readable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_readable).

`opts` contains the options to pass on to the ReadableStream constructor e.g. for creating a readable object stream (or use the shortcut `miss.from.obj([...])`).

Returns a readable stream that calls `read(size, next)` when data is requested from the stream.

- `size` is the recommended amount of data (in bytes) to retrieve.
- `next(err, chunk)` should be called when you're ready to emit more data.

#### original module

`miss.from` is provided by [`require('from2')`](https://www.npmjs.com/package/from2)

#### example

```js
function fromString(string) {
  return miss.from(function(size, next) {
    // if there's no more content
    // left in the string, close the stream.
    if (string.length <= 0) return next(null, null)

    // Pull in a new chunk of text,
    // removing it from the string.
    var chunk = string.slice(0, size)
    string = string.slice(size)

    // Emit "chunk" from the stream.
    next(null, chunk)
  })
}

// pipe "hello world" out
// to stdout.
fromString('hello world').pipe(process.stdout)
```

### to

##### `miss.to([options], write, [flush])`

Make a custom [writable stream](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_writable).

`options` contains the options to pass on to the WritableStream constructor e.g. for creating a writable object stream (or use the shortcut `miss.to.obj([...])`).

Returns a writable stream that calls `write(data, enc, cb)` when data is written to the stream.

- `data` is the received data to write to the destination.
- `enc` is the encoding of the piece of data received.
- `cb(err, data)` should be called when you're ready to write more data, or encountered an error.

`flush(cb)` is called before `finish` is emitted and allows for cleanup steps to occur.

#### original module

`miss.to` is provided by [`require('flush-write-stream')`](https://www.npmjs.com/package/flush-write-stream)

#### example

```js
var ws = miss.to(write, flush)

ws.on('finish', function () {
  console.log('finished')
})

ws.write('hello')
ws.write('world')
ws.end()

function write (data, enc, cb) {
  // i am your normal ._write method
  console.log('writing', data.toString())
  cb()
}

function flush (cb) {
  // i am called before finish is emitted
  setTimeout(cb, 1000) // wait 1 sec
}
```

If you run the above it will produce the following output:

```
writing hello
writing world
(nothing happens for 1 sec)
finished
```

### concat

##### `var concat = miss.concat(cb)`

Returns a writable stream that concatenates all data written to the stream and calls a callback with the single result.

`cb` is called once the writable stream has finished, i.e. when all data has been written to it. It receives a single argument, `(data)`, which contains the result of concatenating all the data written to the stream.

Note that `miss.concat` will not handle stream errors for you. To handle errors, use `miss.pipe` or handle the `error` event manually.

#### original module

`miss.concat` is provided by [`require('concat-stream')`](https://www.npmjs.com/package/concat-stream)

#### example

```js
var fs = require('fs')

var readStream = fs.createReadStream('cat.png')
var concatStream = miss.concat(gotPicture)

// miss.concat does not report errors itself, so let miss.pipe do it
miss.pipe(readStream, concatStream, handleError)

function gotPicture (imageBuffer) {
  // imageBuffer is all of `cat.png` as a node.js Buffer
}

function handleError (err) {
  if (!err) return // the pipe finished without error
  // handle your error appropriately here, e.g.:
  console.error(err) // print the error to STDERR
  process.exit(1) // exit program with non-zero exit code
}
```

### finished

##### `miss.finished(stream, cb)`

Waits for `stream` to finish or error and then calls `cb` with `(err)`. `cb` will only be called once. `err` will be null if the stream finished without error, or else it will be populated with the error from the stream's `error` event.

This function is useful for simplifying stream handling code as it lets you handle success or error conditions in a single code path. It's used internally by `miss.pipe`.

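To give a feel for the kind of code this replaces, here is a much simplified, hypothetical helper (`whenWritableDone` is not part of mississippi) that funnels a writable's `finish` and `error` events into one callback that fires at most once. The real module covers many more cases, such as readable streams and premature close.

```js
var stream = require('stream')

// hypothetical helper, far simpler than the real module: writables only
function whenWritableDone (ws, cb) {
  var called = false
  function once (err) {
    if (called) return // never report twice
    called = true
    cb(err || null)
  }
  ws.on('finish', function () { once() })
  ws.on('error', once)
}

var results = []
function report (name) {
  return function (err) {
    results.push(name + (err ? ' failed: ' + err.message : ' finished'))
    console.log(results[results.length - 1])
  }
}

// a writable that accepts everything
var ok = new stream.Writable({
  write: function (chunk, enc, cb) { cb() }
})
whenWritableDone(ok, report('ok'))
ok.end('data')

// a writable that fails on its first write
var broken = new stream.Writable({
  write: function (chunk, enc, cb) { cb(new Error('boom')) }
})
whenWritableDone(broken, report('broken'))
broken.end('data')
```

Both outcomes arrive through the same `(err)` callback, which is the single code path described above.
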
#### original module

`miss.finished` is provided by [`require('end-of-stream')`](https://www.npmjs.com/package/end-of-stream)

#### example

```js
var fs = require('fs')

var copySource = fs.createReadStream('./movie.mp4')
var copyDest = fs.createWriteStream('./movie-copy.mp4')

copySource.pipe(copyDest)

miss.finished(copyDest, function(err) {
  if (err) return console.log('write failed', err)
  console.log('write success')
})
```

### parallel

##### `miss.parallel(concurrency, each)`

This works like `through` except you can process items in parallel, while still preserving the original input order.

This is handy if you wanna take advantage of node's async I/O and process streams of items in batches. With this module you can build your very own streaming parallel job queue.

Note that `miss.parallel` preserves input ordering. If you don't need that, you can use [through2-concurrent](https://github.com/almost/through2-concurrent) instead, which is otherwise very similar.

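The ordering guarantee can be pictured without any streams at all. The sketch below is not how `parallel-transform` is implemented; it is a hypothetical `orderedParallel` helper that runs up to `concurrency` workers at once but releases results strictly in input order. Error handling is left out to keep it short.

```js
// hypothetical helper: run worker(item, cb) on up to `concurrency` items at once,
// but hand results to onResult strictly in input order
function orderedParallel (items, concurrency, worker, onResult, onDone) {
  var results = []
  var finished = []
  var nextToStart = 0
  var nextToEmit = 0
  var running = 0

  if (items.length === 0) return onDone()

  function emitReady () {
    // release results only while the oldest outstanding item is done
    while (nextToEmit < items.length && finished[nextToEmit] === true) {
      onResult(results[nextToEmit])
      nextToEmit++
    }
    if (nextToEmit === items.length) onDone()
  }

  function startMore () {
    while (running < concurrency && nextToStart < items.length) {
      start(nextToStart++)
    }
  }

  function start (index) {
    running++
    worker(items[index], function (err, result) {
      // errors are ignored in this sketch
      running--
      results[index] = result
      finished[index] = true
      emitReady()
      startMore()
    })
  }

  startMore()
}

// the first item is the slowest, so items complete out of order
var delays = {1: 50, 2: 10, 3: 10}
function slowDouble (n, cb) {
  setTimeout(function () { cb(null, n * 2) }, delays[n])
}

var seen = []
orderedParallel([1, 2, 3], 2, slowDouble, function (r) { seen.push(r) }, function () {
  console.log(seen) // logs [ 2, 4, 6 ]
})
```

The results for items 2 and 3 are held back until item 1 finishes, which is the price of keeping the input order.
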
#### original module

`miss.parallel` is provided by [`require('parallel-transform')`](https://npmjs.org/parallel-transform)

#### example

This example fetches the HTTP response headers for a stream of input URLs, 5 at a time in parallel.

```js
var fs = require('fs')
var split = require('split2')
var request = require('request')

function getResponse (url, cb) {
  var r = request(url)
  r.on('error', function (err) {
    cb(err)
  })
  r.on('response', function (re) {
    cb(null, {url: url, date: new Date(), status: re.statusCode, headers: re.headers})
    r.abort()
  })
}

miss.pipe(
  fs.createReadStream('./urls.txt'), // one url per line
  split(), // emits each line (a url) as a string
  miss.parallel(5, getResponse),
  // the rows are objects, so the last stream has to be in object mode
  miss.through.obj(function (row, enc, next) {
    console.log(JSON.stringify(row))
    next()
  }),
  function (err) {
    if (err) console.error('Fetching headers failed!', err)
  }
)
```

## see also

- [substack/stream-handbook](https://github.com/substack/stream-handbook)
- [nodejs.org/api/stream.html](https://nodejs.org/api/stream.html)
- [awesome-nodejs-streams](https://github.com/thejmazz/awesome-nodejs-streams)

## license

Licensed under the BSD 2-clause license.