• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var eos = require('end-of-stream')
2var shift = require('stream-shift')
3
4module.exports = each
5
6function each (stream, fn, cb) {
7  var want = true
8  var error = null
9  var ended = false
10  var running = false
11  var calling = false
12
13  stream.on('readable', onreadable)
14  onreadable()
15
16  if (cb) eos(stream, {readable: true, writable: false}, done)
17  return stream
18
19  function done (err) {
20    if (!error) error = err
21    ended = true
22    if (!running) cb(error)
23  }
24
25  function onreadable () {
26    if (want) read()
27  }
28
29  function afterRead (err) {
30    running = false
31
32    if (err) {
33      error = err
34      if (ended) return cb(error)
35      stream.destroy(err)
36      return
37    }
38    if (ended) return cb(error)
39    if (!calling) read()
40  }
41
42  function read () {
43    while (!running && !ended) {
44      want = false
45
46      var data = shift(stream)
47      if (data === null) {
48        want = true
49        return
50      }
51
52      running = true
53      calling = true
54      fn(data, afterRead)
55      calling = false
56    }
57  }
58}
59