• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var tape = require('tape')
2var through = require('through2')
3var pumpify = require('./')
4var stream = require('stream')
5var duplexify = require('duplexify')
6
7tape('basic', function(t) {
8  t.plan(3)
9
10  var pipeline = pumpify(
11    through(function(data, enc, cb) {
12      t.same(data.toString(), 'hello')
13      cb(null, data.toString().toUpperCase())
14    }),
15    through(function(data, enc, cb) {
16      t.same(data.toString(), 'HELLO')
17      cb(null, data.toString().toLowerCase())
18    })
19  )
20
21  pipeline.write('hello')
22  pipeline.on('data', function(data) {
23    t.same(data.toString(), 'hello')
24    t.end()
25  })
26})
27
28tape('3 times', function(t) {
29  t.plan(4)
30
31  var pipeline = pumpify(
32    through(function(data, enc, cb) {
33      t.same(data.toString(), 'hello')
34      cb(null, data.toString().toUpperCase())
35    }),
36    through(function(data, enc, cb) {
37      t.same(data.toString(), 'HELLO')
38      cb(null, data.toString().toLowerCase())
39    }),
40    through(function(data, enc, cb) {
41      t.same(data.toString(), 'hello')
42      cb(null, data.toString().toUpperCase())
43    })
44  )
45
46  pipeline.write('hello')
47  pipeline.on('data', function(data) {
48    t.same(data.toString(), 'HELLO')
49    t.end()
50  })
51})
52
53tape('destroy', function(t) {
54  var test = through()
55  test.destroy = function() {
56    t.ok(true)
57    t.end()
58  }
59
60  var pipeline = pumpify(through(), test)
61
62  pipeline.destroy()
63})
64
65tape('close', function(t) {
66  var test = through()
67  var pipeline = pumpify(through(), test)
68
69  pipeline.on('error', function(err) {
70    t.same(err.message, 'lol')
71    t.end()
72  })
73
74  test.emit('error', new Error('lol'))
75})
76
77tape('end waits for last one', function(t) {
78  var ran = false
79
80  var a = through()
81  var b = through()
82  var c = through(function(data, enc, cb) {
83    setTimeout(function() {
84      ran = true
85      cb()
86    }, 100)
87  })
88
89  var pipeline = pumpify(a, b, c)
90
91  pipeline.write('foo')
92  pipeline.end(function() {
93    t.ok(ran)
94    t.end()
95  })
96
97  t.ok(!ran)
98})
99
100tape('always wait for finish', function(t) {
101  var a = new stream.Readable()
102  a._read = function() {}
103  a.push('hello')
104
105  var pipeline = pumpify(a, through(), through())
106  var ran = false
107
108  pipeline.on('finish', function() {
109    t.ok(ran)
110    t.end()
111  })
112
113  setTimeout(function() {
114    ran = true
115    a.push(null)
116  }, 100)
117})
118
119tape('async', function(t) {
120  var pipeline = pumpify()
121
122  t.plan(4)
123
124  pipeline.write('hello')
125  pipeline.on('data', function(data) {
126    t.same(data.toString(), 'HELLO')
127    t.end()
128  })
129
130  setTimeout(function() {
131    pipeline.setPipeline(
132      through(function(data, enc, cb) {
133        t.same(data.toString(), 'hello')
134        cb(null, data.toString().toUpperCase())
135      }),
136      through(function(data, enc, cb) {
137        t.same(data.toString(), 'HELLO')
138        cb(null, data.toString().toLowerCase())
139      }),
140      through(function(data, enc, cb) {
141        t.same(data.toString(), 'hello')
142        cb(null, data.toString().toUpperCase())
143      })
144    )
145  }, 100)
146})
147
148tape('early destroy', function(t) {
149  var a = through()
150  var b = through()
151  var c = through()
152
153  b.destroy = function() {
154    t.ok(true)
155    t.end()
156  }
157
158  var pipeline = pumpify()
159
160  pipeline.destroy()
161  setTimeout(function() {
162    pipeline.setPipeline(a, b, c)
163  }, 100)
164})
165
166tape('preserves error', function (t) {
167  var a = through()
168  var b = through(function (data, enc, cb) {
169    cb(new Error('stop'))
170  })
171  var c = through()
172  var s = pumpify()
173
174  s.on('error', function (err) {
175    t.same(err.message, 'stop')
176    t.end()
177  })
178
179  s.setPipeline(a, b, c)
180  s.resume()
181  s.write('hi')
182})
183
184tape('preserves error again', function (t) {
185  var ws = new stream.Writable()
186  var rs = new stream.Readable({highWaterMark: 16})
187
188  ws._write = function (data, enc, cb) {
189    cb(null)
190  }
191
192  rs._read = function () {
193    process.nextTick(function () {
194      rs.push('hello world')
195    })
196  }
197
198  var pumpifyErr = pumpify(
199    through(),
200    through(function(chunk, _, cb) {
201      cb(new Error('test'))
202    }),
203    ws
204  )
205
206  rs.pipe(pumpifyErr)
207    .on('error', function (err) {
208      t.ok(err)
209      t.ok(err.message !== 'premature close', 'does not close with premature close')
210      t.end()
211    })
212})
213
214tape('returns error from duplexify', function (t) {
215  var a = through()
216  var b = duplexify()
217  var s = pumpify()
218
219  s.setPipeline(a, b)
220
221  s.on('error', function (err) {
222    t.same(err.message, 'stop')
223    t.end()
224  })
225
226  s.write('data')
227  // Test passes if `.end()` is not called
228  s.end()
229
230  b.setWritable(through())
231
232  setImmediate(function () {
233    b.destroy(new Error('stop'))
234  })
235})
236