'use strict';

// Tests for `stream.pipeline()`.
//
// Each top-level `{ ... }` scope below is an independent test case. The cases
// cover plain Readable/Writable/Transform chains, error and destroy
// propagation, HTTP and net sockets, legacy (v1) `Stream` objects,
// generator / async-generator / async-function stages, array and Buffer
// sources, and argument validation.
//
// `common.mustCall()`, `common.mustSucceed()` and `common.mustNotCall()` come
// from the test harness in `../common` and wrap callbacks whose invocation is
// part of the expectation.

const common = require('../common');
const {
  Stream,
  Writable,
  Readable,
  Transform,
  pipeline,
  PassThrough,
  Duplex
} = require('stream');
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');

{
  // Every pushed chunk reaches the writable, and the writable has emitted
  // 'finish' by the time the pipeline callback runs.
  let finished = false;
  const processed = [];
  const expected = [
    Buffer.from('a'),
    Buffer.from('b'),
    Buffer.from('c'),
  ];

  const read = new Readable({
    read() {}
  });

  const write = new Writable({
    write(data, enc, cb) {
      processed.push(data);
      cb();
    }
  });

  write.on('finish', () => {
    finished = true;
  });

  for (let i = 0; i < expected.length; i++) {
    read.push(expected[i]);
  }
  read.push(null);

  pipeline(read, write, common.mustSucceed(() => {
    assert.ok(finished);
    assert.deepStrictEqual(processed, expected);
  }));
}

{
  // Argument validation: too few streams or a missing callback must throw.
  const read = new Readable({
    read() {}
  });

  assert.throws(() => {
    pipeline(read, () => {});
  }, /ERR_MISSING_ARGS/);
  assert.throws(() => {
    pipeline(() => {});
  }, /ERR_MISSING_ARGS/);
  assert.throws(() => {
    pipeline();
  }, /ERR_INVALID_CALLBACK/);
}

{
  // Destroying the source (without an error) before it ends reports an error
  // to the pipeline callback.
  const read = new Readable({
    read() {}
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  read.push('data');
  setImmediate(() => read.destroy());

  pipeline(read, write, common.mustCall((err) => {
    assert.ok(err, 'should have an error');
  }));
}

{
  // Destroying the source with an error forwards that error to the callback;
  // pipeline() returns the last stream.
  const read = new Readable({
    read() {}
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  read.push('data');
  setImmediate(() => read.destroy(new Error('kaboom')));

  const dst = pipeline(read, write, common.mustCall((err) => {
    assert.deepStrictEqual(err, new Error('kaboom'));
  }));

  assert.strictEqual(dst, write);
}

{
  // An error raised by a middle Transform: every stream is expected to emit
  // 'close' and 'error', and the callback receives the same error.
  const read = new Readable({
    read() {}
  });

  const transform = new Transform({
    transform(data, enc, cb) {
      cb(new Error('kaboom'));
    }
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  read.on('close', common.mustCall());
  transform.on('close', common.mustCall());
  write.on('close', common.mustCall());

  [read, transform, write].forEach((stream) => {
    stream.on('error', common.mustCall((err) => {
      assert.deepStrictEqual(err, new Error('kaboom'));
    }));
  });

  const dst = pipeline(read, transform, write, common.mustCall((err) => {
    assert.deepStrictEqual(err, new Error('kaboom'));
  }));

  assert.strictEqual(dst, write);

  read.push('hello');
}

{
  // HTTP server response used as the pipeline destination; the client must
  // receive the full body.
  const server = http.createServer((req, res) => {
    const rs = new Readable({
      read() {
        rs.push('hello');
        rs.push(null);
      }
    });

    pipeline(rs, res, () => {});
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    req.end();
    req.on('response', (res) => {
      const buf = [];
      res.on('data', (data) => buf.push(data));
      res.on('end', common.mustCall(() => {
        assert.deepStrictEqual(
          Buffer.concat(buf),
          Buffer.from('hello')
        );
        server.close();
      }));
    });
  });
}

{
  // The client destroys the response early; the server-side source's
  // destroy() hook must be invoked.
  const server = http.createServer((req, res) => {
    let sent = false;
    const rs = new Readable({
      read() {
        if (sent) {
          return;
        }
        sent = true;
        rs.push('hello');
      },
      destroy: common.mustCall((err, cb) => {
        // Prevents fd leaks by destroying http pipelines
        cb();
      })
    });

    pipeline(rs, res, () => {});
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    req.end();
    req.on('response', (res) => {
      setImmediate(() => {
        res.destroy();
        server.close();
      });
    });
  });
}

{
  // The client-side sink fails on its 10th write; the client pipeline reports
  // the error and the server-side source's destroy() hook must be invoked.
  const server = http.createServer((req, res) => {
    let sent = 0;
    const rs = new Readable({
      read() {
        if (sent++ > 10) {
          return;
        }
        rs.push('hello');
      },
      destroy: common.mustCall((err, cb) => {
        cb();
      })
    });

    pipeline(rs, res, () => {});
  });

  let cnt = 10;

  const badSink = new Writable({
    write(data, enc, cb) {
      cnt--;
      if (cnt === 0) cb(new Error('kaboom'));
      else cb();
    }
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    req.end();
    req.on('response', (res) => {
      pipeline(res, badSink, common.mustCall((err) => {
        assert.deepStrictEqual(err, new Error('kaboom'));
        server.close();
      }));
    });
  });
}

{
  // Echo server (req -> res). The client pipes a source into the request and
  // destroys that source after receiving 10 'data' events.
  const server = http.createServer((req, res) => {
    pipeline(req, res, common.mustSucceed());
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    let sent = 0;
    const rs = new Readable({
      read() {
        if (sent++ > 10) {
          return;
        }
        rs.push('hello');
      }
    });

    pipeline(rs, req, common.mustCall(() => {
      server.close();
    }));

    req.on('response', (res) => {
      let cnt = 10;
      res.on('data', () => {
        cnt--;
        if (cnt === 0) rs.destroy();
      });
    });
  });
}

{
  // Long chain of pass-through Transforms; the sink fails on its 10th write
  // and every stream in the chain is expected to emit 'close'.
  const makeTransform = () => {
    const tr = new Transform({
      transform(data, enc, cb) {
        cb(null, data);
      }
    });

    tr.on('close', common.mustCall());
    return tr;
  };

  const rs = new Readable({
    read() {
      rs.push('hello');
    }
  });

  let cnt = 10;

  const ws = new Writable({
    write(data, enc, cb) {
      cnt--;
      if (cnt === 0) return cb(new Error('kaboom'));
      cb();
    }
  });

  rs.on('close', common.mustCall());
  ws.on('close', common.mustCall());

  pipeline(
    rs,
    makeTransform(),
    makeTransform(),
    makeTransform(),
    makeTransform(),
    makeTransform(),
    makeTransform(),
    ws,
    common.mustCall((err) => {
      assert.deepStrictEqual(err, new Error('kaboom'));
    })
  );
}

{
  // A legacy `Stream` (hand-rolled write/end/pause/resume) in the middle of
  // the pipeline forwards data and lets the last stream finish.
  const oldStream = new Stream();

  oldStream.pause = oldStream.resume = () => {};
  oldStream.write = (data) => {
    oldStream.emit('data', data);
    return true;
  };
  oldStream.end = () => {
    oldStream.emit('end');
  };

  const expected = [
    Buffer.from('hello'),
    Buffer.from('world'),
  ];

  const rs = new Readable({
    read() {
      for (let i = 0; i < expected.length; i++) {
        rs.push(expected[i]);
      }
      rs.push(null);
    }
  });

  const ws = new Writable({
    write(data, enc, cb) {
      assert.deepStrictEqual(data, expected.shift());
      cb();
    }
  });

  let finished = false;

  ws.on('finish', () => {
    finished = true;
  });

  pipeline(
    rs,
    oldStream,
    ws,
    common.mustSucceed(() => {
      assert(finished, 'last stream finished');
    })
  );
}

{
  // Legacy streams with and without destroy(): when the source errors, the
  // legacy stream that has destroy() must have it called, and the sink must
  // not finish.
  const oldStream = new Stream();

  oldStream.pause = oldStream.resume = () => {};
  oldStream.write = (data) => {
    oldStream.emit('data', data);
    return true;
  };
  oldStream.end = () => {
    oldStream.emit('end');
  };

  const destroyableOldStream = new Stream();

  destroyableOldStream.pause = destroyableOldStream.resume = () => {};
  destroyableOldStream.destroy = common.mustCall(() => {
    destroyableOldStream.emit('close');
  });
  destroyableOldStream.write = (data) => {
    destroyableOldStream.emit('data', data);
    return true;
  };
  destroyableOldStream.end = () => {
    destroyableOldStream.emit('end');
  };

  const rs = new Readable({
    read() {
      rs.destroy(new Error('stop'));
    }
  });

  const ws = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  let finished = false;

  ws.on('finish', () => {
    finished = true;
  });

  pipeline(
    rs,
    oldStream,
    destroyableOldStream,
    ws,
    common.mustCall((err) => {
      assert.deepStrictEqual(err, new Error('stop'));
      assert(!finished, 'should not finish');
    })
  );
}

{
  // pipeline() works through util.promisify().
  const pipelinePromise = promisify(pipeline);

  async function run() {
    const read = new Readable({
      read() {}
    });

    const write = new Writable({
      write(data, enc, cb) {
        cb();
      }
    });

    read.push('data');
    read.push(null);

    let finished = false;

    write.on('finish', () => {
      finished = true;
    });

    await pipelinePromise(read, write);

    assert(finished);
  }

  // Do not leave the promise floating: require that the async body actually
  // runs to completion.
  run().then(common.mustCall());
}

{
  // Omitting the callback throws ERR_INVALID_CALLBACK.
  const read = new Readable({
    read() {}
  });

  const transform = new Transform({
    transform(data, enc, cb) {
      cb(new Error('kaboom'));
    }
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  assert.throws(
    () => pipeline(read, transform, write),
    { code: 'ERR_INVALID_CALLBACK' }
  );
}

{
  // Destroying the destination with an error reports that error to the
  // callback; the HTTP client request itself must not emit 'error'.
  const server = http.Server(function(req, res) {
    res.write('asd');
  });
  server.listen(0, function() {
    http.get({ port: this.address().port }, (res) => {
      const stream = new PassThrough();

      stream.on('error', common.mustCall());

      pipeline(
        res,
        stream,
        common.mustCall((err) => {
          assert.strictEqual(err.message, 'oh no');
          server.close();
        })
      );

      stream.destroy(new Error('oh no'));
    }).on('error', common.mustNotCall());
  });
}

{
  // Source is a sync generator object.
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(function*() {
    yield 'hello';
    yield 'world';
  }(), w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}

{
  // Source is an async generator object.
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }(), w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}

{
  // Source is a sync generator function (not yet invoked).
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(function*() {
    yield 'hello';
    yield 'world';
  }, w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}

{
  // Source is an async generator function (not yet invoked).
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}

{
  // Async generator source -> async generator transform -> async function
  // sink.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, async function*(source) {
    for await (const chunk of source) {
      yield chunk.toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustSucceed(() => {
    assert.strictEqual(res, 'HELLOWORLD');
  }));
}

{
  // The value returned by a trailing async function is passed to the callback
  // as its second argument.
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, async function*(source) {
    for await (const chunk of source) {
      yield chunk.toUpperCase();
    }
  }, async function(source) {
    let ret = '';
    for await (const chunk of source) {
      ret += chunk;
    }
    return ret;
  }, common.mustSucceed((val) => {
    assert.strictEqual(val, 'HELLOWORLD');
  }));
}

{
  // AsyncIterable destination is returned and finalizes.

  const ret = pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
  }, async function*(source) {
    for await (const chunk of source) {}
  }, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
  }));
  ret.resume();
  assert.strictEqual(typeof ret.pipe, 'function');
}

{
  // AsyncIterable destination is returned (it exposes `pipe`) and the
  // source's error is propagated to the callback.

  const ret = pipeline(async function*() {
    await Promise.resolve();
    throw new Error('kaboom');
  }, async function*(source) {
    for await (const chunk of source) {}
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
  }));
  ret.resume();
  assert.strictEqual(typeof ret.pipe, 'function');
}

{
  // Async generator function source that throws: the destination stream is
  // destroyed.
  const s = new PassThrough();
  pipeline(async function*() {
    throw new Error('kaboom');
  }, s, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}

{
  // Same as above, with an async generator object as the source.
  const s = new PassThrough();
  pipeline(async function*() {
    throw new Error('kaboom');
  }(), s, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}

{
  // Same as above, with a sync generator function as the source.
  const s = new PassThrough();
  pipeline(function*() {
    throw new Error('kaboom');
  }, s, common.mustCall((err, val) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}

{
  // Same as above, with a sync generator object as the source.
  const s = new PassThrough();
  pipeline(function*() {
    throw new Error('kaboom');
  }(), s, common.mustCall((err, val) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}

{
  // A trailing async function that throws while consuming: the intermediate
  // stream is destroyed.
  const s = new PassThrough();
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, s, async function(source) {
    for await (const chunk of source) {
      throw new Error('kaboom');
    }
  }, common.mustCall((err, val) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}

{
  // Source is a plain function returning an array; a trailing async generator
  // that throws destroys the intermediate stream.
  const s = new PassThrough();
  const ret = pipeline(function() {
    return ['hello', 'world'];
  }, s, async function*(source) {
    for await (const chunk of source) {
      throw new Error('kaboom');
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
  ret.resume();
  assert.strictEqual(typeof ret.pipe, 'function');
}

{
  // Legacy streams without async iterator.

  const s = new PassThrough();
  s.push('asd');
  s.push(null);
  s[Symbol.asyncIterator] = null;
  let ret = '';
  pipeline(s, async function(source) {
    for await (const chunk of source) {
      ret += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
    assert.strictEqual(ret, 'asd');
  }));
}

{
  // v1 streams without read().

  const s = new Stream();
  process.nextTick(() => {
    s.emit('data', 'asd');
    s.emit('end');
  });
  // 'destroyer' can be called multiple times,
  // once from stream wrapper and
  // once from iterator wrapper.
  s.close = common.mustCallAtLeast(1);
  let ret = '';
  pipeline(s, async function(source) {
    for await (const chunk of source) {
      ret += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
    assert.strictEqual(ret, 'asd');
  }));
}

{
  // v1 error streams without read().

  const s = new Stream();
  process.nextTick(() => {
    s.emit('error', new Error('kaboom'));
  });
  s.destroy = common.mustCall();
  pipeline(s, async function(source) {
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
  }));
}

{
  // A source function that returns nothing throws ERR_INVALID_RETURN_VALUE
  // and leaves the stream undestroyed.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(function(source) {
    }, s, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}

{
  // Same check for a middle function that returns nothing.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(s, function(source) {
    }, s, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}

{
  // Same check for a trailing plain function that returns nothing.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(s, function(source) {
    }, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}

{
  // Same check for a trailing sync generator function.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(s, function*(source) {
    }, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}

{
  // A Transform that errors synchronously between an async generator source
  // and an async function sink: the sink receives no data.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, new Transform({
    transform(chunk, encoding, cb) {
      cb(new Error('kaboom'));
    }
  }), async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}

{
  // Same as above, but the Transform reports its error on the next tick.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, new Transform({
    transform(chunk, encoding, cb) {
      process.nextTick(cb, new Error('kaboom'));
    }
  }), async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}

{
  // A working Transform (strings kept as strings via decodeStrings: false)
  // between an async generator source and an async function sink.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, new Transform({
    decodeStrings: false,
    transform(chunk, encoding, cb) {
      cb(null, chunk.toUpperCase());
    }
  }), async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustSucceed(() => {
    assert.strictEqual(res, 'HELLOWORLD');
  }));
}

{
  // Ensure no unhandled rejection from async function.

  pipeline(async function*() {
    yield 'hello';
  }, async function(source) {
    throw new Error('kaboom');
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
  }));
}

{
  // Streams created with autoDestroy: false are not destroyed by the time
  // the callback runs.
  const src = new PassThrough({ autoDestroy: false });
  const dst = new PassThrough({ autoDestroy: false });
  pipeline(src, dst, common.mustCall(() => {
    assert.strictEqual(src.destroyed, false);
    assert.strictEqual(dst.destroyed, false);
  }));
  src.end();
}

{
  // Make sure 'close' before 'end' finishes without error
  // if readable has received eof.
  // Ref: https://github.com/nodejs/node/issues/29699
  const r = new Readable();
  const w = new Writable({
    write(chunk, encoding, cb) {
      cb();
    }
  });
  pipeline(r, w, (err) => {
    assert.strictEqual(err, undefined);
  });
  r.push('asd');
  r.push(null);
  r.emit('close');
}

{
  // An HTTP client request as destination: the pipeline succeeds once the
  // body has ended, even though the server never responds.
  const server = http.createServer((req, res) => {
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    const body = new PassThrough();
    pipeline(
      body,
      req,
      common.mustSucceed(() => {
        assert(!req.res);
        assert(!req.aborted);
        req.abort();
        server.close();
      })
    );
    body.end();
  });
}

{
  // The destination is not destroyed when the callback runs.
  const src = new PassThrough();
  const dst = new PassThrough();
  pipeline(src, dst, common.mustSucceed(() => {
    assert.strictEqual(dst.destroyed, false);
  }));
  src.end();
}

{
  // Same as above, with the destination's `readable` flag forced to false.
  const src = new PassThrough();
  const dst = new PassThrough();
  dst.readable = false;
  pipeline(src, dst, common.mustSucceed(() => {
    assert.strictEqual(dst.destroyed, false);
  }));
  src.end();
}

{
  // An async generator stage that throws on the first chunk: later stages
  // receive nothing and the final writable's write() is never called.
  let res = '';
  const rs = new Readable({
    read() {
      setImmediate(() => {
        rs.push('hello');
      });
    }
  });
  const ws = new Writable({
    write: common.mustNotCall()
  });
  pipeline(rs, async function*(stream) {
    /* eslint no-unused-vars: off */
    for await (const chunk of stream) {
      throw new Error('kaboom');
    }
  }, async function *(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, ws, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}

{
  // Piping an incoming HTTP request into a PassThrough must not produce an
  // error on the socket or the client request, and the response is still
  // delivered.
  const server = http.createServer((req, res) => {
    req.socket.on('error', common.mustNotCall());
    pipeline(req, new PassThrough(), (err) => {
      assert.ifError(err);
      res.end();
      server.close();
    });
  });

  server.listen(0, () => {
    const req = http.request({
      method: 'PUT',
      port: server.address().port
    });
    req.end('asd123');
    req.on('response', common.mustCall());
    req.on('error', common.mustNotCall());
  });
}

{
  // Might still want to be able to use the writable side
  // of src. This is in the case where e.g. the Duplex input
  // is not directly connected to its output. Such a case could
  // happen when the Duplex is reading from a socket and then echos
  // the data back on the same socket.
  const src = new PassThrough();
  assert.strictEqual(src.writable, true);
  const dst = new PassThrough();
  pipeline(src, dst, common.mustCall((err) => {
    assert.strictEqual(src.writable, true);
    assert.strictEqual(src.destroyed, false);
  }));
  src.push(null);
}

{
  // Destroying the stream returned for an async generator destination before
  // completion yields ERR_STREAM_PREMATURE_CLOSE.
  const src = new PassThrough();
  const dst = pipeline(
    src,
    async function * (source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    common.mustCall((err) => {
      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
    })
  );
  src.push('asd');
  dst.destroy();
}

{
  // An async generator destination may yield non-string/Buffer values
  // (objects).
  pipeline(async function * () {
    yield 'asd';
  }, async function * (source) {
    for await (const chunk of source) {
      yield { chunk };
    }
  }, common.mustSucceed());
}

{
  // The source's destroy() completes asynchronously; 'close' must have been
  // emitted on the source before the callback runs.
  let closed = false;
  const src = new Readable({
    read() {},
    destroy(err, cb) {
      process.nextTick(cb);
    }
  });
  const dst = new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });
  src.on('close', () => {
    closed = true;
  });
  src.push(null);
  pipeline(src, dst, common.mustCall((err) => {
    assert.strictEqual(closed, true);
  }));
}

{
  // Same as above, with a bare Duplex as the destination.
  let closed = false;
  const src = new Readable({
    read() {},
    destroy(err, cb) {
      process.nextTick(cb);
    }
  });
  const dst = new Duplex({});
  src.on('close', common.mustCall(() => {
    closed = true;
  }));
  src.push(null);
  pipeline(src, dst, common.mustCall((err) => {
    assert.strictEqual(closed, true);
  }));
}

{
  const server = net.createServer(common.mustCall((socket) => {
    // echo server
    pipeline(socket, socket, common.mustSucceed());
    // 13 force destroys the socket before it has a chance to emit finish
    socket.on('finish', common.mustCall(() => {
      server.close();
    }));
  })).listen(0, common.mustCall(() => {
    const socket = net.connect(server.address().port);
    socket.end();
  }));
}

{
  // Duplex source whose final() completes after a 1s timer: the Duplex must
  // not be destroyed in the meantime, and its destroy() hook is never called.
  const d = new Duplex({
    autoDestroy: false,
    write: common.mustCall((data, enc, cb) => {
      d.push(data);
      cb();
    }),
    read: common.mustCall(() => {
      d.push(null);
    }),
    final: common.mustCall((cb) => {
      setTimeout(() => {
        assert.strictEqual(d.destroyed, false);
        cb();
      }, 1000);
    }),
    destroy: common.mustNotCall()
  });

  const sink = new Writable({
    write: common.mustCall((data, enc, cb) => {
      cb();
    })
  });

  pipeline(d, sink, common.mustSucceed());

  d.write('test');
  d.end();
}

{
  // Echo server: the same socket is both source and destination.
  const server = net.createServer(common.mustCall((socket) => {
    // echo server
    pipeline(socket, socket, common.mustSucceed());
    socket.on('finish', common.mustCall(() => {
      server.close();
    }));
  })).listen(0, common.mustCall(() => {
    const socket = net.connect(server.address().port);
    socket.end();
  }));
}

{
  const d = new Duplex({
    autoDestroy: false,
    write: common.mustCall((data, enc, cb) => {
      d.push(data);
      cb();
    }),
    read: common.mustCall(() => {
      d.push(null);
    }),
    final: common.mustCall((cb) => {
      setTimeout(() => {
        assert.strictEqual(d.destroyed, false);
        cb();
      }, 1000);
    }),
    // `destroy()` won't be invoked by pipeline since
    // the writable side has not completed when
    // the pipeline has completed.
    destroy: common.mustNotCall()
  });

  const sink = new Writable({
    write: common.mustCall((data, enc, cb) => {
      cb();
    })
  });

  pipeline(d, sink, common.mustSucceed());

  d.write('test');
  d.end();
}

{
  // The streams may be passed as a single array argument.
  const r = new Readable({
    read() {}
  });
  r.push('hello');
  r.push('world');
  r.push(null);
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline([r, w], common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}

{
  // A plain array can be used as the source.
  pipeline([1, 2, 3], PassThrough({ objectMode: true }),
           common.mustSucceed(() => {}));

  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    },
  });
  pipeline(['1', '2', '3'], w, common.mustSucceed(() => {
    assert.strictEqual(res, '123');
  }));
}

{
  // A stage returning a thenable: the `then` getter throws if it is read more
  // than once.
  function createThenable() {
    let counter = 0;
    return {
      get then() {
        if (counter++) {
          throw new Error('Cannot access `then` more than once');
        }
        return Function.prototype;
      },
    };
  }

  pipeline(
    function* () {
      yield 0;
    },
    createThenable,
    // NOTE(review): this callback only creates a mustNotCall guard and never
    // invokes it, so by itself it would not fail if the callback ran.
    // Confirm whether `common.mustNotCall()` was intended instead.
    () => common.mustNotCall(),
  );
}

{
  // A Buffer can be used as the source; an async generator stage sees it as
  // numeric values that `String.fromCharCode` maps back to the text.
  const content = 'abc';
  pipeline(Buffer.from(content), PassThrough({ objectMode: true }),
           common.mustSucceed(() => {}));

  let res = '';
  pipeline(Buffer.from(content), async function*(previous) {
    for await (const val of previous) {
      res += String.fromCharCode(val);
      yield val;
    }
  }, common.mustSucceed(() => {
    assert.strictEqual(res, content);
  }));
}