// Test suite for duplexify, written against tape.
// Each case builds a duplex stream out of separate (or shared) writable and
// readable streams and checks the events / data observed on the duplex side.

var tape = require('tape')
var through = require('through2')
var concat = require('concat-stream')
var net = require('net')
var duplexify = require('./')

// The same passthrough stream is used as both the writable and readable side:
// whatever is written to the duplex should come back out of it.
tape('passthrough', function(t) {
  t.plan(2)

  var pt = through()
  var dup = duplexify(pt, pt)

  dup.end('hello world')
  dup.on('finish', function() {
    t.ok(true, 'should finish')
  })
  dup.pipe(concat(function(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }))
})

// Same as above but end() is called twice; t.plan(2) means a second 'finish'
// (or a second concat callback) would fail the test.
tape('passthrough + double end', function(t) {
  t.plan(2)

  var pt = through()
  var dup = duplexify(pt, pt)

  dup.end('hello world')
  dup.end()

  dup.on('finish', function() {
    t.ok(true, 'should finish')
  })
  dup.pipe(concat(function(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }))
})

// The underlying transform hands each chunk on only after a 100ms timer and is
// created with highWaterMark 1; all writes plus end() are issued up front.
tape('async passthrough + end', function(t) {
  t.plan(2)

  var pt = through.obj({highWaterMark:1}, function(data, enc, cb) {
    setTimeout(function() {
      cb(null, data)
    }, 100)
  })

  var dup = duplexify(pt, pt)

  dup.write('hello ')
  dup.write('world')
  dup.end()

  dup.on('finish', function() {
    t.ok(true, 'should finish')
  })
  dup.pipe(concat(function(data) {
    t.same(data.toString(), 'hello world', 'same in as out')
  }))
})

// Distinct readable and writable streams in object mode: data written to the
// duplex must reach `writable`, data written to `readable` must be emitted by
// the duplex, each in order.
tape('duplex', function(t) {
  var readExpected = ['read-a', 'read-b', 'read-c']
  var writeExpected = ['write-a', 'write-b', 'write-c']

  // one assertion per expected read and write, plus 'end' and 'finish'
  t.plan(readExpected.length+writeExpected.length+2)

  var readable = through.obj()
  var writable = through.obj(function(data, enc, cb) {
    t.same(data, writeExpected.shift(), 'onwrite should match')
    cb()
  })

  var dup = duplexify.obj(writable, readable)

  // iterate over copies because the assertions shift() the original arrays
  readExpected.slice().forEach(function(data) {
    readable.write(data)
  })
  readable.end()

  writeExpected.slice().forEach(function(data) {
    dup.write(data)
  })
  dup.end()

  dup.on('data', function(data) {
    t.same(data, readExpected.shift(), 'ondata should match')
  })
  dup.on('end', function() {
    t.ok(true, 'should end')
  })
  dup.on('finish', function() {
    t.ok(true, 'should finish')
  })
})

// The duplex is created empty; data is written and ended before the writable
// (after 50ms) and the readable (after a further 50ms) are attached.
tape('async', function(t) {
  var dup = duplexify()
  var pt = through()

  dup.pipe(concat(function(data) {
    t.same(data.toString(), 'i was async', 'same in as out')
    t.end()
  }))

  dup.write('i')
  dup.write(' was ')
  dup.end('async')

  setTimeout(function() {
    dup.setWritable(pt)
    setTimeout(function() {
      dup.setReadable(pt)
    }, 50)
  }, 50)
})

// destroy() is called twice; t.plan(2) requires that the writable's destroy
// and the 'close' event are each observed exactly once.
tape('destroy', function(t) {
  t.plan(2)

  var write = through()
  var read = through()
  var dup = duplexify(write, read)

  write.destroy = function() {
    t.ok(true, 'write destroyed')
  }

  dup.on('close', function() {
    t.ok(true, 'close emitted')
  })

  dup.destroy()
  dup.destroy() // should only work once
})

// As 'destroy', but also checks that the readable side is destroyed.
tape('destroy both', function(t) {
  t.plan(3)

  var write = through()
  var read = through()
  var dup = duplexify(write, read)

  write.destroy = function() {
    t.ok(true, 'write destroyed')
  }

  read.destroy = function() {
    t.ok(true, 'read destroyed')
  }

  dup.on('close', function() {
    t.ok(true, 'close emitted')
  })

  dup.destroy()
  dup.destroy() // should only work once
})

// An error on the readable side must surface on the duplex, followed by
// 'close'; the later write error must not be reported (t.plan(2)).
tape('bubble read errors', function(t) {
  t.plan(2)

  var write = through()
  var read = through()
  var dup = duplexify(write, read)

  dup.on('error', function(err) {
    t.same(err.message, 'read-error', 'received read error')
  })
  dup.on('close', function() {
    t.ok(true, 'close emitted')
  })

  read.emit('error', new Error('read-error'))
  write.emit('error', new Error('write-error')) // only emit first error
})

// Mirror of the previous test with the write error emitted first.
tape('bubble write errors', function(t) {
  t.plan(2)

  var write = through()
  var read = through()
  var dup = duplexify(write, read)

  dup.on('error', function(err) {
    t.same(err.message, 'write-error', 'received write error')
  })
  dup.on('close', function() {
    t.ok(true, 'close emitted')
  })

  write.emit('error', new Error('write-error'))
  read.emit('error', new Error('read-error')) // only emit first error
})

// Swaps both sides of the duplex from a plain passthrough to an upper-casing
// transform after the first chunk; subsequent writes must come out upper-cased.
tape('reset writable / readable', function(t) {
  t.plan(3)

  var toUpperCase = function(data, enc, cb) {
    cb(null, data.toString().toUpperCase())
  }

  var passthrough = through()
  var upper = through(toUpperCase)
  var dup = duplexify(passthrough, passthrough)

  dup.once('data', function(data) {
    t.same(data.toString(), 'hello')
    dup.setWritable(upper)
    dup.setReadable(upper)
    dup.once('data', function(data) {
      t.same(data.toString(), 'HELLO')
      dup.once('data', function(data) {
        t.same(data.toString(), 'HI')
        t.end()
      })
    })
    dup.write('hello')
    dup.write('hi')
  })
  dup.write('hello')
})

// Corking inside 'prefinish' must hold back 'finish' until uncork(): `ok` is
// only set right before uncork() and is asserted in the 'finish' handler.
tape('cork', function(t) {
  var passthrough = through()
  var dup = duplexify(passthrough, passthrough)
  var ok = false

  dup.on('prefinish', function() {
    dup.cork()
    setTimeout(function() {
      ok = true
      dup.uncork()
    }, 100)
  })
  dup.on('finish', function() {
    t.ok(ok)
    t.end()
  })
  dup.end()
})

// 'prefinish' must be emitted at most once before 'finish'.
tape('prefinish not twice', function(t) {
  var passthrough = through()
  var dup = duplexify(passthrough, passthrough)
  var prefinished = false

  dup.on('prefinish', function() {
    t.ok(!prefinished, 'only prefinish once')
    prefinished = true
  })

  dup.on('finish', function() {
    t.end()
  })

  dup.end()
})

// A 'close' on the underlying stream must be forwarded by the duplex.
// Note the listener is attached after the emit, so this can only pass if the
// duplex forwards 'close' asynchronously.
tape('close', function(t) {
  var passthrough = through()
  var dup = duplexify(passthrough, passthrough)

  passthrough.emit('close')
  dup.on('close', function() {
    t.ok(true, 'should forward close')
    t.end()
  })
})

// Wraps both ends of a real TCP connection (ephemeral port) in a duplex and
// checks that a buffer written on the client arrives on the server.
tape('works with node native streams (net)', function(t) {
  t.plan(1)

  var server = net.createServer(function(socket) {
    var dup = duplexify(socket, socket)

    dup.once('data', function(chunk) {
      // Buffer.from replaces the deprecated Buffer(string) constructor
      t.same(chunk, Buffer.from('hello world'))
      server.close()
      socket.end()
      t.end()
    })
  })

  server.listen(0, function () {
    var socket = net.connect(server.address().port)
    var dup = duplexify(socket, socket)

    dup.write(Buffer.from('hello world'))
  })
})