1'use strict'; 2 3const common = require('../common'); 4const { 5 Stream, 6 Writable, 7 Readable, 8 Transform, 9 pipeline, 10 PassThrough 11} = require('stream'); 12const assert = require('assert'); 13const http = require('http'); 14const { promisify } = require('util'); 15 16{ 17 let finished = false; 18 const processed = []; 19 const expected = [ 20 Buffer.from('a'), 21 Buffer.from('b'), 22 Buffer.from('c') 23 ]; 24 25 const read = new Readable({ 26 read() {} 27 }); 28 29 const write = new Writable({ 30 write(data, enc, cb) { 31 processed.push(data); 32 cb(); 33 } 34 }); 35 36 write.on('finish', () => { 37 finished = true; 38 }); 39 40 for (let i = 0; i < expected.length; i++) { 41 read.push(expected[i]); 42 } 43 read.push(null); 44 45 pipeline(read, write, common.mustCall((err) => { 46 assert.ok(!err, 'no error'); 47 assert.ok(finished); 48 assert.deepStrictEqual(processed, expected); 49 })); 50} 51 52{ 53 const read = new Readable({ 54 read() {} 55 }); 56 57 assert.throws(() => { 58 pipeline(read, () => {}); 59 }, /ERR_MISSING_ARGS/); 60 assert.throws(() => { 61 pipeline(() => {}); 62 }, /ERR_MISSING_ARGS/); 63 assert.throws(() => { 64 pipeline(); 65 }, /ERR_INVALID_CALLBACK/); 66} 67 68{ 69 const read = new Readable({ 70 read() {} 71 }); 72 73 const write = new Writable({ 74 write(data, enc, cb) { 75 cb(); 76 } 77 }); 78 79 read.push('data'); 80 setImmediate(() => read.destroy()); 81 82 pipeline(read, write, common.mustCall((err) => { 83 assert.ok(err, 'should have an error'); 84 })); 85} 86 87{ 88 const read = new Readable({ 89 read() {} 90 }); 91 92 const write = new Writable({ 93 write(data, enc, cb) { 94 cb(); 95 } 96 }); 97 98 read.push('data'); 99 setImmediate(() => read.destroy(new Error('kaboom'))); 100 101 const dst = pipeline(read, write, common.mustCall((err) => { 102 assert.deepStrictEqual(err, new Error('kaboom')); 103 })); 104 105 assert.strictEqual(dst, write); 106} 107 108{ 109 const read = new Readable({ 110 read() {} 111 }); 112 113 const transform = new Transform({ 114 transform(data, enc, cb) { 115 cb(new Error('kaboom')); 116 } 117 }); 118 119 const write = new Writable({ 120 write(data, enc, cb) { 121 cb(); 122 } 123 }); 124 125 read.on('close', common.mustCall()); 126 transform.on('close', common.mustCall()); 127 write.on('close', common.mustCall()); 128 129 [read, transform, write].forEach((stream) => { 130 stream.on('error', common.mustCall((err) => { 131 assert.deepStrictEqual(err, new Error('kaboom')); 132 })); 133 }); 134 135 const dst = pipeline(read, transform, write, common.mustCall((err) => { 136 assert.deepStrictEqual(err, new Error('kaboom')); 137 })); 138 139 assert.strictEqual(dst, write); 140 141 read.push('hello'); 142} 143 144{ 145 const server = http.createServer((req, res) => { 146 const rs = new Readable({ 147 read() { 148 rs.push('hello'); 149 rs.push(null); 150 } 151 }); 152 153 pipeline(rs, res, () => {}); 154 }); 155 156 server.listen(0, () => { 157 const req = http.request({ 158 port: server.address().port 159 }); 160 161 req.end(); 162 req.on('response', (res) => { 163 const buf = []; 164 res.on('data', (data) => buf.push(data)); 165 res.on('end', common.mustCall(() => { 166 assert.deepStrictEqual( 167 Buffer.concat(buf), 168 Buffer.from('hello') 169 ); 170 server.close(); 171 })); 172 }); 173 }); 174} 175 176{ 177 const server = http.createServer((req, res) => { 178 let sent = false; 179 const rs = new Readable({ 180 read() { 181 if (sent) { 182 return; 183 } 184 sent = true; 185 rs.push('hello'); 186 }, 187 destroy: common.mustCall((err, cb) => { 188 // Prevents fd leaks by destroying http pipelines 189 cb(); 190 }) 191 }); 192 193 pipeline(rs, res, () => {}); 194 }); 195 196 server.listen(0, () => { 197 const req = http.request({ 198 port: server.address().port 199 }); 200 201 req.end(); 202 req.on('response', (res) => { 203 setImmediate(() => { 204 res.destroy(); 205 server.close(); 206 }); 207 }); 208 }); 209} 210 211{ 212 const server = http.createServer((req, res) => { 213 let sent = 0; 214 const rs = new Readable({ 215 read() { 216 if (sent++ > 10) { 217 return; 218 } 219 rs.push('hello'); 220 }, 221 destroy: common.mustCall((err, cb) => { 222 cb(); 223 }) 224 }); 225 226 pipeline(rs, res, () => {}); 227 }); 228 229 let cnt = 10; 230 231 const badSink = new Writable({ 232 write(data, enc, cb) { 233 cnt--; 234 if (cnt === 0) cb(new Error('kaboom')); 235 else cb(); 236 } 237 }); 238 239 server.listen(0, () => { 240 const req = http.request({ 241 port: server.address().port 242 }); 243 244 req.end(); 245 req.on('response', (res) => { 246 pipeline(res, badSink, common.mustCall((err) => { 247 assert.deepStrictEqual(err, new Error('kaboom')); 248 server.close(); 249 })); 250 }); 251 }); 252} 253 254{ 255 const server = http.createServer((req, res) => { 256 pipeline(req, res, common.mustCall()); 257 }); 258 259 server.listen(0, () => { 260 const req = http.request({ 261 port: server.address().port 262 }); 263 264 let sent = 0; 265 const rs = new Readable({ 266 read() { 267 if (sent++ > 10) { 268 return; 269 } 270 rs.push('hello'); 271 } 272 }); 273 274 pipeline(rs, req, common.mustCall(() => { 275 server.close(); 276 })); 277 278 req.on('response', (res) => { 279 let cnt = 10; 280 res.on('data', () => { 281 cnt--; 282 if (cnt === 0) rs.destroy(); 283 }); 284 }); 285 }); 286} 287 288{ 289 const makeTransform = () => { 290 const tr = new Transform({ 291 transform(data, enc, cb) { 292 cb(null, data); 293 } 294 }); 295 296 tr.on('close', common.mustCall()); 297 return tr; 298 }; 299 300 const rs = new Readable({ 301 read() { 302 rs.push('hello'); 303 } 304 }); 305 306 let cnt = 10; 307 308 const ws = new Writable({ 309 write(data, enc, cb) { 310 cnt--; 311 if (cnt === 0) return cb(new Error('kaboom')); 312 cb(); 313 } 314 }); 315 316 rs.on('close', common.mustCall()); 317 ws.on('close', common.mustCall()); 318 319 pipeline( 320 rs, 321 makeTransform(), 322 makeTransform(), 323 makeTransform(), 324 makeTransform(), 325 makeTransform(), 326 makeTransform(), 327 ws, 328 common.mustCall((err) => { 329 assert.deepStrictEqual(err, new Error('kaboom')); 330 }) 331 ); 332} 333 334{ 335 const oldStream = new Stream(); 336 337 oldStream.pause = oldStream.resume = () => {}; 338 oldStream.write = (data) => { 339 oldStream.emit('data', data); 340 return true; 341 }; 342 oldStream.end = () => { 343 oldStream.emit('end'); 344 }; 345 346 const expected = [ 347 Buffer.from('hello'), 348 Buffer.from('world') 349 ]; 350 351 const rs = new Readable({ 352 read() { 353 for (let i = 0; i < expected.length; i++) { 354 rs.push(expected[i]); 355 } 356 rs.push(null); 357 } 358 }); 359 360 const ws = new Writable({ 361 write(data, enc, cb) { 362 assert.deepStrictEqual(data, expected.shift()); 363 cb(); 364 } 365 }); 366 367 let finished = false; 368 369 ws.on('finish', () => { 370 finished = true; 371 }); 372 373 pipeline( 374 rs, 375 oldStream, 376 ws, 377 common.mustCall((err) => { 378 assert(!err, 'no error'); 379 assert(finished, 'last stream finished'); 380 }) 381 ); 382} 383 384{ 385 const oldStream = new Stream(); 386 387 oldStream.pause = oldStream.resume = () => {}; 388 oldStream.write = (data) => { 389 oldStream.emit('data', data); 390 return true; 391 }; 392 oldStream.end = () => { 393 oldStream.emit('end'); 394 }; 395 396 const destroyableOldStream = new Stream(); 397 398 destroyableOldStream.pause = destroyableOldStream.resume = () => {}; 399 destroyableOldStream.destroy = common.mustCall(() => { 400 destroyableOldStream.emit('close'); 401 }); 402 destroyableOldStream.write = (data) => { 403 destroyableOldStream.emit('data', data); 404 return true; 405 }; 406 destroyableOldStream.end = () => { 407 destroyableOldStream.emit('end'); 408 }; 409 410 const rs = new Readable({ 411 read() { 412 rs.destroy(new Error('stop')); 413 } 414 }); 415 416 const ws = new Writable({ 417 write(data, enc, cb) { 418 cb(); 419 } 420 }); 421 422 let finished = false; 423 424 ws.on('finish', () => { 425 finished = true; 426 }); 427 428 pipeline( 429 rs, 430 oldStream, 431 destroyableOldStream, 432 ws, 433 common.mustCall((err) => { 434 assert.deepStrictEqual(err, new Error('stop')); 435 assert(!finished, 'should not finish'); 436 }) 437 ); 438} 439 440{ 441 const pipelinePromise = promisify(pipeline); 442 443 async function run() { 444 const read = new Readable({ 445 read() {} 446 }); 447 448 const write = new Writable({ 449 write(data, enc, cb) { 450 cb(); 451 } 452 }); 453 454 read.push('data'); 455 read.push(null); 456 457 let finished = false; 458 459 write.on('finish', () => { 460 finished = true; 461 }); 462 463 await pipelinePromise(read, write); 464 465 assert(finished); 466 } 467 468 run(); 469} 470 471{ 472 const read = new Readable({ 473 read() {} 474 }); 475 476 const transform = new Transform({ 477 transform(data, enc, cb) { 478 cb(new Error('kaboom')); 479 } 480 }); 481 482 const write = new Writable({ 483 write(data, enc, cb) { 484 cb(); 485 } 486 }); 487 488 assert.throws( 489 () => pipeline(read, transform, write), 490 { code: 'ERR_INVALID_CALLBACK' } 491 ); 492} 493 494{ 495 const server = http.Server(function(req, res) { 496 res.write('asd'); 497 }); 498 server.listen(0, function() { 499 http.get({ port: this.address().port }, (res) => { 500 const stream = new PassThrough(); 501 502 // NOTE: 2 because Node 12 streams can emit 'error' 503 // multiple times. 504 stream.on('error', common.mustCall(2)); 505 506 pipeline( 507 res, 508 stream, 509 common.mustCall((err) => { 510 assert.ok(err); 511 // TODO(ronag): 512 // assert.strictEqual(err.message, 'oh no'); 513 server.close(); 514 }) 515 ); 516 517 stream.destroy(new Error('oh no')); 518 }).on('error', common.mustNotCall()); 519 }); 520} 521 522{ 523 const r = new Readable({ 524 read() {} 525 }); 526 r.push('hello'); 527 r.push('world'); 528 r.push(null); 529 let res = ''; 530 const w = new Writable({ 531 write(chunk, encoding, callback) { 532 res += chunk; 533 callback(); 534 } 535 }); 536 pipeline([r, w], common.mustCall((err) => { 537 assert.ok(!err); 538 assert.strictEqual(res, 'helloworld'); 539 })); 540} 541