• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
var Stream = require('stream')
2
// through
//
// a stream that does nothing but re-emit the input.
// (useful for aggregating a series of changing but not ending streams into one stream)
7
// Export the factory both as the module itself and as its own `through`
// property, so `require(...)` and `require(...).through` resolve to the
// same function.
exports = module.exports = through
through.through = through
10
// create a readable, writable stream.
12
/**
 * Create a stream that is both readable and writable and that re-emits
 * whatever the `write` / `end` handlers queue.
 *
 * @param {Function} [write] Called as `write.call(stream, data)` for every
 *   `stream.write(data)`. Defaults to queueing `data` unchanged.
 * @param {Function} [end] Called as `end.call(stream)` when `stream.end()`
 *   is invoked. Defaults to queueing `null`, which emits 'end'.
 * @param {Object} [opts]
 * @param {boolean} [opts.autoDestroy=true] Unless explicitly `false`, the
 *   stream destroys itself once it is neither readable nor writable.
 * @returns {Stream} The stream, extended with `write`, `queue` / `push`,
 *   `end`, `destroy`, `pause` and `resume`. `queue`, `end`, `destroy`,
 *   `pause` and `resume` always return the stream so calls can be chained.
 */
function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  var ended = false     // set by end() and destroy(); makes a second end() a no-op
  var destroyed = false // set by destroy(); 'close' is emitted only once
  var buffer = []       // queued chunks not yet emitted (held back while paused)
  var _ended = false    // set once null has been queued; later queue() calls are ignored
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  // Hand the chunk to the user's write handler. The boolean result tells the
  // writer whether it may keep writing (false while paused).
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  // Emit buffered chunks until the buffer is empty or a listener pauses us.
  // A queued null is the end marker: it emits 'end' and stops draining.
  function drain () {
    while (buffer.length && !stream.paused) {
      var data = buffer.shift()
      if (null === data) {
        return stream.emit('end')
      } else {
        stream.emit('data', data)
      }
    }
  }

  // Queue a chunk for emission; queueing null ends the readable side.
  stream.queue = stream.push = function (data) {
    if (_ended) return stream
    if (data === null) _ended = true
    buffer.push(data)
    drain()
    return stream
  }

  //this will be registered as the first 'end' listener
  //must call destroy next tick, to make sure we're after any
  //stream piped from here.
  //this is only a problem if end is not emitted synchronously.
  //a nicer way to do this is to make sure this is the last listener for 'end'

  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable && stream.autoDestroy) {
      process.nextTick(function () {
        stream.destroy()
      })
    }
  })

  // Close the writable side and run the user's end handler. If the readable
  // side is already finished as well, destroy right away.
  function _end () {
    stream.writable = false
    end.call(stream)
    if (!stream.readable && stream.autoDestroy) {
      stream.destroy()
    }
  }

  // End the writable side, optionally writing one last chunk first.
  // Repeated calls are ignored but still return the stream.
  stream.end = function (data) {
    if (ended) return stream
    ended = true
    if (arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

  // Drop anything still buffered, mark both sides closed and emit 'close'.
  // Repeated calls are ignored but still return the stream.
  stream.destroy = function () {
    if (destroyed) return stream
    destroyed = true
    ended = true
    buffer.length = 0
    stream.writable = stream.readable = false
    stream.emit('close')
    return stream
  }

  // Stop emitting; chunks queued from now on stay in the buffer.
  stream.pause = function () {
    if (stream.paused) return stream
    stream.paused = true
    return stream
  }

  // Resume emission and flush whatever was buffered while paused.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('resume')
    }
    drain()
    //may have become paused again,
    //as drain emits 'data'.
    if (!stream.paused) {
      stream.emit('drain')
    }
    return stream
  }

  return stream
}
108
109