var tape = require('tape')
var through = require('through2')
var pumpify = require('./')
var stream = require('stream')
var duplexify = require('duplexify')

// A chunk written to a two-stage pipeline is seen by each transform in order
// and comes out the readable side carrying the second transform's output.
tape('basic', function(t) {
  t.plan(3)

  function upperCase(chunk, enc, done) {
    var text = chunk.toString()
    t.same(text, 'hello')
    done(null, text.toUpperCase())
  }

  function lowerCase(chunk, enc, done) {
    var text = chunk.toString()
    t.same(text, 'HELLO')
    done(null, text.toLowerCase())
  }

  var pipeline = pumpify(through(upperCase), through(lowerCase))

  pipeline.write('hello')
  pipeline.on('data', function(chunk) {
    t.same(chunk.toString(), 'hello')
    t.end()
  })
})

// Same as 'basic' but with three chained transforms.
tape('3 times', function(t) {
  t.plan(4)

  // Builds a transform that asserts the incoming text and forwards the result
  // of calling the named String method on it.
  function stage(expected, method) {
    return through(function(chunk, enc, done) {
      var text = chunk.toString()
      t.same(text, expected)
      done(null, text[method]())
    })
  }

  var pipeline = pumpify(
    stage('hello', 'toUpperCase'),
    stage('HELLO', 'toLowerCase'),
    stage('hello', 'toUpperCase')
  )

  pipeline.write('hello')
  pipeline.on('data', function(chunk) {
    t.same(chunk.toString(), 'HELLO')
    t.end()
  })
})

// The test only completes if pipeline.destroy() ends up invoking destroy()
// on the last stream of the pipeline.
tape('destroy', function(t) {
  var tail = through()
  tail.destroy = function() {
    t.ok(true)
    t.end()
  }

  pumpify(through(), tail).destroy()
})

// An 'error' emitted on an inner stream must surface on the pipeline with the
// same message.
tape('close', function(t) {
  var tail = through()
  var pipeline = pumpify(through(), tail)

  function onError(err) {
    t.same(err.message, 'lol')
    t.end()
  }

  pipeline.on('error', onError)
  tail.emit('error', new Error('lol'))
})

// The end() callback must not fire before the last transform has completed
// its delayed callback.
tape('end waits for last one', function(t) {
  var finished = false

  var first = through()
  var middle = through()
  var slow = through(function(chunk, enc, done) {
    // Delay completion so an early end() callback would observe finished === false.
    setTimeout(function() {
      finished = true
      done()
    }, 100)
  })

  var pipeline = pumpify(first, middle, slow)

  pipeline.write('foo')
  pipeline.end(function() {
    t.ok(finished)
    t.end()
  })

  // Runs synchronously, before the 100ms timer above can fire.
  t.ok(!finished)
})

// 'finish' on the pipeline must not be emitted before the readable source has
// been ended (null pushed) by the timer below.
tape('always wait for finish', function(t) {
  var source = new stream.Readable()
  source._read = function() {}
  source.push('hello')

  var pipeline = pumpify(source, through(), through())
  var sourceEnded = false

  pipeline.on('finish', function() {
    t.ok(sourceEnded)
    t.end()
  })

  setTimeout(function() {
    sourceEnded = true
    source.push(null)
  }, 100)
})

// Data written before setPipeline() is called must still travel through the
// streams once they are attached later.
tape('async', function(t) {
  var pipeline = pumpify()

  t.plan(4)

  // Builds a transform that asserts the incoming text and forwards the result
  // of calling the named String method on it.
  function stage(expected, method) {
    return through(function(chunk, enc, done) {
      var text = chunk.toString()
      t.same(text, expected)
      done(null, text[method]())
    })
  }

  pipeline.write('hello')
  pipeline.on('data', function(chunk) {
    t.same(chunk.toString(), 'HELLO')
    t.end()
  })

  setTimeout(function() {
    pipeline.setPipeline(
      stage('hello', 'toUpperCase'),
      stage('HELLO', 'toLowerCase'),
      stage('hello', 'toUpperCase')
    )
  }, 100)
})

// Destroying an empty pipeline first and attaching streams afterwards: the
// test only completes if destroy() is then called on the middle stream.
tape('early destroy', function(t) {
  var head = through()
  var middle = through()
  var tail = through()

  middle.destroy = function() {
    t.ok(true)
    t.end()
  }

  var pipeline = pumpify()

  pipeline.destroy()
  setTimeout(function() {
    pipeline.setPipeline(head, middle, tail)
  }, 100)
})

// An error passed to a transform callback in the middle of the pipeline must
// reach the pipeline's 'error' listener with its original message.
tape('preserves error', function (t) {
  var head = through()
  var failing = through(function (chunk, enc, done) {
    done(new Error('stop'))
  })
  var tail = through()
  var pipeline = pumpify()

  function onError(err) {
    t.same(err.message, 'stop')
    t.end()
  }

  pipeline.on('error', onError)

  pipeline.setPipeline(head, failing, tail)
  pipeline.resume()
  pipeline.write('hi')
})

// Same idea with a readable piped into a pipeline that ends in a plain
// Writable: the reported error must not be 'premature close'.
tape('preserves error again', function (t) {
  var sink = new stream.Writable()
  var source = new stream.Readable({highWaterMark: 16})

  sink._write = function (chunk, enc, done) {
    done(null)
  }

  source._read = function () {
    process.nextTick(function () {
      source.push('hello world')
    })
  }

  var failingStage = through(function(chunk, _, done) {
    done(new Error('test'))
  })

  var pumpifyErr = pumpify(through(), failingStage, sink)

  // pipe() returns its destination, so the listener is attached to pumpifyErr.
  var piped = source.pipe(pumpifyErr)
  piped.on('error', function (err) {
    t.ok(err)
    t.ok(err.message !== 'premature close', 'does not close with premature close')
    t.end()
  })
})

// An error used to destroy an inner duplexify stream must be the error that
// the pipeline emits.
tape('returns error from duplexify', function (t) {
  var head = through()
  var duplex = duplexify()
  var pipeline = pumpify()

  pipeline.setPipeline(head, duplex)

  pipeline.on('error', function (err) {
    t.same(err.message, 'stop')
    t.end()
  })

  pipeline.write('data')
  // Test passes if `.end()` is not called
  pipeline.end()

  duplex.setWritable(through())

  setImmediate(function () {
    duplex.destroy(new Error('stop'))
  })
})