• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var Readable = require('readable-stream').Readable
2var shift = require('stream-shift')
3
4var stream2 = function (stream) {
5  if (stream._readableState) return stream
6  return new Readable({objectMode: true, highWaterMark: 16}).wrap(stream)
7}
8
9module.exports = function (stream) {
10  stream = stream2(stream)
11
12  var ended = false
13  var data = null
14  var err = null
15  var destroyed = false
16  var fn = null
17
18  var consume = function (e) {
19    if (e) {
20      destroyed = true
21      if (stream.destroy) stream.destroy(e)
22      return
23    }
24
25    data = null
26    err = null
27  }
28
29  var onresult = function () {
30    if (!fn) return
31    var tmp = fn
32    fn = undefined
33    tmp(err, data, consume)
34  }
35
36  var update = function () {
37    if (!fn) return
38    data = shift(stream)
39    if (data === null && !ended) return
40    onresult()
41  }
42
43  var onend = function () {
44    ended = true
45    onresult()
46  }
47
48  stream.on('readable', update)
49
50  stream.on('error', function (e) {
51    err = e
52    onresult()
53  })
54
55  stream.on('close', function () {
56    if (stream._readableState.ended) return
57    onend()
58  })
59
60  stream.on('end', onend)
61
62  return function (callback) {
63    if (destroyed) return
64    if (err) return callback(err, null, consume)
65    if (data) return callback(null, data, consume)
66    if (ended) return callback(null, null, consume)
67    fn = callback
68    update()
69  }
70}
71