var eos = require('end-of-stream')
var shift = require('stream-shift')

module.exports = each

/**
 * Feed every chunk of a readable stream to `fn`, one chunk at a time.
 *
 * The next chunk is only pulled once `fn` has invoked the callback it was
 * given for the previous chunk, so at most one `fn` call is in flight.
 *
 * @param {Object} stream - Readable stream to consume. It must support
 *   `on('readable', ...)`, and `destroy(err)` is called on it when `fn`
 *   reports an error before the stream has finished.
 * @param {Function} fn - Called as `fn(data, next)`. Call `next()` to receive
 *   the following chunk, or `next(err)` to abort with an error.
 * @param {Function} [cb] - Optional. Called once as `cb(err)` when the stream
 *   has finished (as reported by end-of-stream) and no `fn` call is pending.
 *   `err` is the first error recorded, or a falsy value on success.
 * @returns {Object} The `stream` that was passed in.
 */
function each (stream, fn, cb) {
  var want = true // true while we are waiting for a 'readable' event
  var error = null // first error seen, from fn or from the stream
  var ended = false // set once end-of-stream has reported completion
  var running = false // true while an fn call has not called back yet
  var calling = false // true only during the synchronous part of fn(...)

  stream.on('readable', onreadable)
  // Kick off immediately in case data is already buffered.
  onreadable()

  // Completion tracking is only set up when the caller wants to be notified.
  if (cb) eos(stream, {readable: true, writable: false}, done)
  return stream

  // end-of-stream handler: remember the first error and finish, unless an
  // fn call is still pending (afterRead finishes in that case).
  function done (err) {
    if (!error) error = err
    ended = true
    if (!running) cb(error)
  }

  // Only react to 'readable' when the previous read attempt came up empty.
  function onreadable () {
    if (want) read()
  }

  // Callback handed to fn; runs when the consumer is done with a chunk.
  function afterRead (err) {
    running = false

    if (err) {
      error = err
      if (ended) return cb(error)
      // Not finished yet: tear the stream down with the consumer's error.
      stream.destroy(err)
      return
    }
    if (ended) return cb(error)
    // If fn called back synchronously we are still inside read()'s loop,
    // which will pick up the next chunk itself; avoid re-entering it.
    if (!calling) read()
  }

  // Pull chunks and hand them to fn until one is pending, nothing is
  // available, or the stream has finished.
  function read () {
    while (!running && !ended) {
      want = false

      var data = shift(stream)
      // If the completion handler ran while shift() was executing, cb has
      // already been invoked (nothing was running). Handing the chunk to fn
      // now would make afterRead invoke cb a second time, so stop here.
      if (ended) return
      if (data === null) {
        // Nothing available right now; wait for the next 'readable' event.
        want = true
        return
      }

      running = true
      calling = true
      fn(data, afterRead)
      calling = false
    }
  }
}