1var Readable = require('readable-stream').Readable 2var shift = require('stream-shift') 3 4var stream2 = function (stream) { 5 if (stream._readableState) return stream 6 return new Readable({objectMode: true, highWaterMark: 16}).wrap(stream) 7} 8 9module.exports = function (stream) { 10 stream = stream2(stream) 11 12 var ended = false 13 var data = null 14 var err = null 15 var destroyed = false 16 var fn = null 17 18 var consume = function (e) { 19 if (e) { 20 destroyed = true 21 if (stream.destroy) stream.destroy(e) 22 return 23 } 24 25 data = null 26 err = null 27 } 28 29 var onresult = function () { 30 if (!fn) return 31 var tmp = fn 32 fn = undefined 33 tmp(err, data, consume) 34 } 35 36 var update = function () { 37 if (!fn) return 38 data = shift(stream) 39 if (data === null && !ended) return 40 onresult() 41 } 42 43 var onend = function () { 44 ended = true 45 onresult() 46 } 47 48 stream.on('readable', update) 49 50 stream.on('error', function (e) { 51 err = e 52 onresult() 53 }) 54 55 stream.on('close', function () { 56 if (stream._readableState.ended) return 57 onend() 58 }) 59 60 stream.on('end', onend) 61 62 return function (callback) { 63 if (destroyed) return 64 if (err) return callback(err, null, consume) 65 if (data) return callback(null, data, consume) 66 if (ended) return callback(null, null, consume) 67 fn = callback 68 update() 69 } 70} 71