// Flags: --expose-internals --no-warnings
'use strict';

const common = require('../common');
const { isDisturbed, isErrored, isReadable } = require('stream');
const assert = require('assert');
const {
  isPromise,
} = require('util/types');
const {
  setImmediate: delay
} = require('timers/promises');

const {
  ByteLengthQueuingStrategy,
  CountQueuingStrategy,
  ReadableStream,
  ReadableStreamDefaultReader,
  ReadableStreamDefaultController,
  ReadableByteStreamController,
  ReadableStreamBYOBReader,
  ReadableStreamBYOBRequest,
  WritableStream,
} = require('stream/web');

const {
  readableStreamPipeTo,
  readableStreamTee,
  readableByteStreamControllerConvertPullIntoDescriptor,
  readableStreamDefaultControllerEnqueue,
  readableByteStreamControllerEnqueue,
  readableStreamDefaultControllerCanCloseOrEnqueue,
  readableByteStreamControllerClose,
  readableByteStreamControllerRespond,
  readableStreamReaderGenericRelease,
} = require('internal/webstreams/readablestream');

const {
  kState
} = require('internal/webstreams/util');

const {
  createReadStream,
  readFileSync,
} = require('fs');
const {
  Buffer,
} = require('buffer');

const {
  kTransfer,
} = require('internal/worker/js_transferable');

const {
  inspect,
} = require('util');

{
  // Public surface of ReadableStream and argument validation of getReader().
  const r = new ReadableStream();
  assert.strictEqual(typeof r.locked, 'boolean');
  assert.strictEqual(typeof r.cancel, 'function');
  assert.strictEqual(typeof r.getReader, 'function');
  assert.strictEqual(typeof r.pipeThrough, 'function');
  assert.strictEqual(typeof r.pipeTo, 'function');
  assert.strictEqual(typeof r.tee, 'function');

  ['', null, 'asdf'].forEach((mode) => {
    assert.throws(() => r.getReader({ mode }), {
      code: 'ERR_INVALID_ARG_VALUE',
    });
  });

  [1, 'asdf'].forEach((options) => {
    assert.throws(() => r.getReader(options), {
      code: 'ERR_INVALID_ARG_TYPE',
    });
  });

  // Acquiring a reader locks the stream.
  assert(!r.locked);
  r.getReader();
  assert(r.locked);
}

{
  // Throwing from `cancel()` and returning a rejected promise from it must
  // both run the same cleanup code.
  //
  // The checks live in the rejection handler rather than in
  // `.finally(...).catch(() => {})`: an AssertionError thrown from a `finally`
  // callback rejects the derived promise, and a trailing empty `catch` would
  // swallow it, so the assertions could never fail the test.
  const r1 = new ReadableStream({
    cancel: () => {
      return Promise.reject('Cancel Error');
    },
  });
  r1.cancel().then(common.mustNotCall(), common.mustCall(() => {
    const controllerState = r1[kState].controller[kState];

    assert.strictEqual(controllerState.pullAlgorithm, undefined);
    assert.strictEqual(controllerState.cancelAlgorithm, undefined);
    assert.strictEqual(controllerState.sizeAlgorithm, undefined);
  }));

  const r2 = new ReadableStream({
    cancel() {
      throw new Error('Cancel Error');
    }
  });
  r2.cancel().then(common.mustNotCall(), common.mustCall(() => {
    const controllerState = r2[kState].controller[kState];

    assert.strictEqual(controllerState.pullAlgorithm, undefined);
    assert.strictEqual(controllerState.cancelAlgorithm, undefined);
    assert.strictEqual(controllerState.sizeAlgorithm, undefined);
  }));
}

{
  // Default source, sync callbacks: start() and pull() each run once with a
  // ReadableStreamDefaultController; cancel() is never invoked.
  const source = {
    start: common.mustCall((controller) => {
      assert(controller instanceof ReadableStreamDefaultController);
    }),
    pull: common.mustCall((controller) => {
      assert(controller instanceof ReadableStreamDefaultController);
    }),
    cancel: common.mustNotCall(),
  };

  new ReadableStream(source);
}

{
  // Same as above, with async start() and pull().
  const source = {
    start: common.mustCall(async (controller) => {
      assert(controller instanceof ReadableStreamDefaultController);
    }),
    pull: common.mustCall(async (controller) => {
      assert(controller instanceof ReadableStreamDefaultController);
    }),
    cancel: common.mustNotCall(),
  };

  new ReadableStream(source);
}

{
  // Byte source, sync start(): start() receives a
  // ReadableByteStreamController; pull() and cancel() are never invoked.
  const source = {
    start: common.mustCall((controller) => {
      assert(controller instanceof ReadableByteStreamController);
    }),
    pull: common.mustNotCall(),
    cancel: common.mustNotCall(),
    type: 'bytes',
  };

  new ReadableStream(source);
}

{
  // Byte source, async start(): pull() and cancel() are still never invoked.
  const source = {
    start: common.mustCall(async (controller) => {
      assert(controller instanceof ReadableByteStreamController);
    }),
    pull: common.mustNotCall(),
    cancel: common.mustNotCall(),
    type: 'bytes',
  };

  new ReadableStream(source);
}

{
  // Byte source with an explicit highWaterMark of 10: pull() runs once.
  const source = {
    start: common.mustCall(async (controller) => {
      assert(controller instanceof ReadableByteStreamController);
    }),
    pull: common.mustCall(async (controller) => {
      assert(controller instanceof ReadableByteStreamController);
    }),
    cancel: common.mustNotCall(),
    type: 'bytes',
  };

  new ReadableStream(source, { highWaterMark: 10 });
}

{
  // These are silly but they should all work per spec
  new ReadableStream(1);
  new ReadableStream('hello');
  new ReadableStream(false);
  new ReadableStream([]);
  new ReadableStream(1, 1);
  new ReadableStream(1, 'hello');
  new ReadableStream(1, false);
  new ReadableStream(1, []);
}

// A strategy `size` that is not a function is rejected.
['a', {}, false].forEach((size) => {
  assert.throws(() => {
    new ReadableStream({}, { size });
  }, {
    code: 'ERR_INVALID_ARG_TYPE',
  });
});

// A non-numeric highWaterMark is rejected for default and byte streams.
['a', {}].forEach((highWaterMark) => {
  assert.throws(() => {
    new ReadableStream({}, { highWaterMark });
  }, {
    code: 'ERR_INVALID_ARG_VALUE',
  });

  assert.throws(() => {
    new ReadableStream({ type: 'bytes' }, { highWaterMark });
  }, {
    code: 'ERR_INVALID_ARG_VALUE',
  });
});

// A negative or NaN highWaterMark is rejected for default and byte streams.
[-1, NaN].forEach((highWaterMark) => {
  assert.throws(() => {
    new ReadableStream({}, { highWaterMark });
  }, {
    code: 'ERR_INVALID_ARG_VALUE',
  });

  assert.throws(() => {
    new ReadableStream({ type: 'bytes' }, { highWaterMark });
  }, {
    code: 'ERR_INVALID_ARG_VALUE',
  });
});

{
  // The built-in queuing strategies are accepted as the strategy argument.
  new ReadableStream({}, new ByteLengthQueuingStrategy({ highWaterMark: 1 }));
  new ReadableStream({}, new CountQueuingStrategy({ highWaterMark: 1 }));
}

{
  // ByteLengthQueuingStrategy#size reports the chunk's byteLength and keeps
  // working when detached from the strategy instance.
  const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1 });
  assert.strictEqual(strategy.highWaterMark, 1);
  assert.strictEqual(strategy.size(new ArrayBuffer(10)), 10);

  const { size } = strategy;
  assert.strictEqual(size(new ArrayBuffer(10)), 10);
}

{
  // CountQueuingStrategy#size is always 1 and keeps working when detached
  // from the strategy instance.
  const strategy = new CountQueuingStrategy({ highWaterMark: 1 });
  assert.strictEqual(strategy.highWaterMark, 1);
  assert.strictEqual(strategy.size(new ArrayBuffer(10)), 1);

  const { size } = strategy;
  assert.strictEqual(size(new ArrayBuffer(10)), 1);
}

{
  // A rejection from async start() errors the stream with that reason.
  const r = new ReadableStream({
    async start() {
      throw new Error('boom');
    }
  });

  setImmediate(() => {
    assert.strictEqual(r[kState].state, 'errored');
    assert.match(r[kState].storedError?.message, /boom/);
  });
}

{
  // A chunk enqueued before close() is delivered first, then done is
  // reported.
  const data = Buffer.from('hello');
  const r = new ReadableStream({
    start(controller) {
      controller.enqueue(data);
      controller.close();
    },
  });

  (async function read() {
    const reader = r.getReader();
    let res = await reader.read();
    if (res.done) return;
    const buf = Buffer.from(res.value);
    assert.strictEqual(buf.toString(), data.toString());
    res = await reader.read();
    assert(res.done);
  })().then(common.mustCall());
}

{
  // Reading from a stream closed in start() reports done immediately.
  const r = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });

  (async function read() {
    const reader = r.getReader();
    const res = await reader.read();
    assert(res.done);
  })().then(common.mustCall());
}

// The strategy's `size` getter throws ('boom2') and that is the error that
// surfaces, not the one from the source's `start` getter ('boom1').
assert.throws(() => {
  new ReadableStream({
    get start() { throw new Error('boom1'); }
  }, {
    get size() { throw new Error('boom2'); }
  });
}, /boom2/);

{
  // Default reader: stream/reader wiring, public surface, and pending reads
  // being rejected when the lock is released.
  const stream = new ReadableStream();
  const reader = stream.getReader();

  assert(stream.locked);
  assert.strictEqual(reader[kState].stream, stream);
  assert.strictEqual(stream[kState].reader, reader);

  assert.throws(() => stream.getReader(), {
    code: 'ERR_INVALID_STATE',
  });

  assert(reader instanceof ReadableStreamDefaultReader);

  assert(isPromise(reader.closed));
  assert.strictEqual(typeof reader.cancel, 'function');
  assert.strictEqual(typeof reader.read, 'function');
  assert.strictEqual(typeof reader.releaseLock, 'function');

  const read1 = reader.read();
  const read2 = reader.read();

  read1.then(common.mustNotCall(), common.mustCall());
  read2.then(common.mustNotCall(), common.mustCall());

  assert.notStrictEqual(read1, read2);

  assert.strictEqual(reader[kState].readRequests.length, 2);

  delay().then(common.mustCall());

  assert(stream.locked);
  reader.releaseLock();
  assert(!stream.locked);
}

{
  // After releaseLock() on a still-readable stream, `closed` is the same
  // promise object and both it and read() reject with ERR_INVALID_STATE.
  const stream = new ReadableStream();
  const reader = stream.getReader();
  const closedBefore = reader.closed;
  assert(stream.locked);
  reader.releaseLock();
  assert(!stream.locked);
  const closedAfter = reader.closed;

  assert.strictEqual(closedBefore, closedAfter);

  assert.rejects(reader.read(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());

  assert.rejects(closedBefore, {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}

{
  // next() on an async iterator whose reader was released rejects.
  const stream = new ReadableStream();
  const iterable = stream.values();
  readableStreamReaderGenericRelease(stream[kState].reader);
  assert.rejects(iterable.next(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}

{
  // return() on an async iterator whose reader was released rejects.
  const stream = new ReadableStream();
  const iterable = stream.values();
  readableStreamReaderGenericRelease(stream[kState].reader);
  assert.rejects(iterable.return(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}

{
  // stream.cancel() rejects while the stream is locked; cancelling through
  // the reader works, after which read() reports done without a value.
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(Buffer.from('hello'));
    }
  });

  const reader = stream.getReader();

  assert.rejects(stream.cancel(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());

  reader.cancel();

  reader.read().then(common.mustCall(({ value, done }) => {
    assert.strictEqual(value, undefined);
    assert(done);
  }));
}

{
  // cancel() on a closed stream returns a fresh promise on each call, and
  // each fulfills with undefined.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    }
  });
  assert(!stream.locked);

  const cancel1 = stream.cancel();
  const cancel2 = stream.cancel();

  assert.notStrictEqual(cancel1, cancel2);

  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
    assert.deepStrictEqual(res, [undefined, undefined]);
  }));
}

{
  // A reader can be acquired again after each releaseLock().
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    }
  });

  stream.getReader().releaseLock();
  stream.getReader().releaseLock();
  stream.getReader();
}

{
  // A second getReader() on a locked (closed) stream throws.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    }
  });

  stream.getReader();

  assert.throws(() => stream.getReader(), {
    code: 'ERR_INVALID_STATE',
  });
}

{
  // On a closed stream `closed` fulfills and repeated reads report done.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });

  const reader = stream.getReader();

  reader.closed.then(common.mustCall());

  reader.read().then(common.mustCall(({ value, done }) => {
    assert.strictEqual(value, undefined);
    assert(done);
    reader.read().then(common.mustCall(({ value, done }) => {
      assert.strictEqual(value, undefined);
      assert(done);
    }));
  }));
}

{
  // Stream closed in start(): releaseLock() swaps `closed` for a new promise
  // that rejects, while the promise obtained before the release fulfills.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });

  const reader = stream.getReader();

  const closedBefore = reader.closed;
  reader.releaseLock();
  const closedAfter = reader.closed;
  assert.notStrictEqual(closedBefore, closedAfter);

  closedBefore.then(common.mustCall());
  assert.rejects(closedAfter, {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}

{
  // Same as above, but the stream is closed after the reader was acquired.
  let c;
  const stream = new ReadableStream({
    start(controller) {
      c = controller;
    },
  });

  const reader = stream.getReader();
  c.close();

  const closedBefore = reader.closed;
  reader.releaseLock();
  const closedAfter = reader.closed;
  assert.notStrictEqual(closedBefore, closedAfter);

  closedBefore.then(common.mustCall());
  assert.rejects(closedAfter, {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}

{
  // reader.cancel() on a stream closed in start(): every call returns a
  // distinct promise (also distinct from `closed`) fulfilling with undefined.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });

  const reader = stream.getReader();

  const cancel1 = reader.cancel();
  const cancel2 = reader.cancel();
  const closed = reader.closed;

  assert.notStrictEqual(cancel1, cancel2);
  assert.notStrictEqual(cancel1, closed);
  assert.notStrictEqual(cancel2, closed);

  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
    assert.deepStrictEqual(res, [undefined, undefined]);
  }));
}

{
  // Same as above, but the stream is closed after the reader was acquired.
  let c;
  const stream = new ReadableStream({
    start(controller) {
      c = controller;
    },
  });

  const reader = stream.getReader();
  c.close();

  const cancel1 = reader.cancel();
  const cancel2 = reader.cancel();
  const closed = reader.closed;

  assert.notStrictEqual(cancel1, cancel2);
  assert.notStrictEqual(cancel1, closed);
  assert.notStrictEqual(cancel2, closed);

  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
    assert.deepStrictEqual(res, [undefined, undefined]);
  }));
}

{
  // Cancelling a readable stream twice fulfills both promises with
  // undefined; a read afterwards reports done.
  const stream = new ReadableStream();
  const cancel1 = stream.cancel();
  const cancel2 = stream.cancel();
  assert.notStrictEqual(cancel1, cancel2);

  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
    assert.deepStrictEqual(res, [undefined, undefined]);
  }));

  stream.getReader().read().then(common.mustCall(({ value, done }) => {
    assert.strictEqual(value, undefined);
    assert(done);
  }));
}

{
  // Stream errored synchronously in start(): a reader acquired after a
  // release still sees `closed` and every read() reject with that error.
  const error = new Error('boom');
  const stream = new ReadableStream({
    start(controller) {
      controller.error(error);
    }
  });
  stream.getReader().releaseLock();
  const reader = stream.getReader();
  assert.rejects(reader.closed, error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
}

{
  // reader.cancel() on an errored stream: distinct promises, each rejecting
  // with the stream's error.
  const error = new Error('boom');
  const stream = new ReadableStream({
    start(controller) {
      controller.error(error);
    }
  });
  const reader = stream.getReader();
  const cancel1 = reader.cancel();
  const cancel2 = reader.cancel();
  assert.notStrictEqual(cancel1, cancel2);
  assert.rejects(cancel1, error).then(common.mustCall());
  assert.rejects(cancel2, error).then(common.mustCall());
}

{
  // Stream errored by a rejecting async start(): `closed` and read() reject
  // with that error.
  const error = new Error('boom');
  const stream = new ReadableStream({
    async start(controller) {
      throw error;
    }
  });
  stream.getReader().releaseLock();
  const reader = stream.getReader();
  assert.rejects(reader.closed, error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
}

{
  // Chunks queued before close() are still delivered in order, followed by
  // done.
  const buf1 = Buffer.from('hello');
  const buf2 = Buffer.from('there');
  let doClose;
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(buf1);
      controller.enqueue(buf2);
      doClose = controller.close.bind(controller);
    }
  });
  const reader = stream.getReader();
  doClose();
  reader.read().then(common.mustCall(({ value, done }) => {
    assert.deepStrictEqual(value, buf1);
    assert(!done);
    reader.read().then(common.mustCall(({ value, done }) => {
      assert.deepStrictEqual(value, buf2);
      assert(!done);
      reader.read().then(common.mustCall(({ value, done }) => {
        assert.strictEqual(value, undefined);
        assert(done);
      }));
    }));
  }));
}

{
  // Without close(), the read issued after the queue is drained must not
  // settle.
  const buf1 = Buffer.from('hello');
  const buf2 = Buffer.from('there');
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(buf1);
      controller.enqueue(buf2);
    }
  });
  const reader = stream.getReader();
  reader.read().then(common.mustCall(({ value, done }) => {
    assert.deepStrictEqual(value, buf1);
    assert(!done);
    reader.read().then(common.mustCall(({ value, done }) => {
      assert.deepStrictEqual(value, buf2);
      assert(!done);
      reader.read().then(common.mustNotCall());
      delay().then(common.mustCall());
    }));
  }));
}

{
  // tee(): both branches receive every chunk and then done.
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    }
  });

  const { 0: s1, 1: s2 } = stream.tee();

  assert(s1 instanceof ReadableStream);
  assert(s2 instanceof ReadableStream);

  async function read(stream) {
    const reader = stream.getReader();
    assert.deepStrictEqual(
      await reader.read(), { value: 'a', done: false });
    assert.deepStrictEqual(
      await reader.read(), { value: 'b', done: false });
    assert.deepStrictEqual(
      await reader.read(), { value: undefined, done: true });
  }

  Promise.all([
    read(s1),
    read(s2),
  ]).then(common.mustCall());
}

{
  // tee(): an error thrown by the source's pull() rejects `closed` on both
  // branches; the original stream is locked by tee().
  const error = new Error('boom');
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
    },
    pull() { throw error; }
  });

  const { 0: s1, 1: s2 } = stream.tee();

  assert(stream.locked);

  assert(s1 instanceof ReadableStream);
  assert(s2 instanceof ReadableStream);

  const reader1 = s1.getReader();
  const reader2 = s2.getReader();

  const closed1 = reader1.closed;
  const closed2 = reader2.closed;

  assert.notStrictEqual(closed1, closed2);

  assert.rejects(closed1, error).then(common.mustCall());
  assert.rejects(closed2, error).then(common.mustCall());
}

724{ 725 const stream = new ReadableStream({ 726 start(controller) { 727 controller.enqueue('a'); 728 controller.enqueue('b'); 729 controller.close(); 730 } 731 }); 732 733 const { 0: s1, 1: s2 } = stream.tee(); 734 735 assert(s1 instanceof ReadableStream); 736 assert(s2 instanceof ReadableStream); 737 738 s2.cancel(); 739 740 async function read(stream, canceled = false) { 741 const reader = stream.getReader(); 742 if (!canceled) { 743 assert.deepStrictEqual( 744 await reader.read(), { value: 'a', done: false }); 745 assert.deepStrictEqual( 746 await reader.read(), { value: 'b', done: false }); 747 } 748 assert.deepStrictEqual( 749 await reader.read(), { value: undefined, done: true }); 750 } 751 752 Promise.all([ 753 read(s1), 754 read(s2, true), 755 ]).then(common.mustCall()); 756} 757 758{ 759 const error1 = new Error('boom1'); 760 const error2 = new Error('boom2'); 761 762 const stream = new ReadableStream({ 763 cancel(reason) { 764 assert.deepStrictEqual(reason, [error1, error2]); 765 } 766 }); 767 768 const { 0: s1, 1: s2 } = stream.tee(); 769 s1.cancel(error1); 770 s2.cancel(error2); 771} 772 773{ 774 const error1 = new Error('boom1'); 775 const error2 = new Error('boom2'); 776 777 const stream = new ReadableStream({ 778 cancel(reason) { 779 assert.deepStrictEqual(reason, [error1, error2]); 780 } 781 }); 782 783 const { 0: s1, 1: s2 } = stream.tee(); 784 s2.cancel(error2); 785 s1.cancel(error1); 786} 787 788{ 789 const error = new Error('boom1'); 790 791 const stream = new ReadableStream({ 792 cancel() { 793 throw error; 794 } 795 }); 796 797 const { 0: s1, 1: s2 } = stream.tee(); 798 799 assert.rejects(s1.cancel(), error); 800 assert.rejects(s2.cancel(), error); 801} 802 803{ 804 const error = new Error('boom1'); 805 let c; 806 const stream = new ReadableStream({ 807 start(controller) { 808 c = controller; 809 } 810 }); 811 812 const { 0: s1, 1: s2 } = stream.tee(); 813 c.error(error); 814 815 assert.rejects(s1.cancel(), error); 816 
assert.rejects(s2.cancel(), error); 817} 818 819{ 820 const error = new Error('boom1'); 821 let c; 822 const stream = new ReadableStream({ 823 start(controller) { 824 c = controller; 825 } 826 }); 827 828 const { 0: s1, 1: s2 } = stream.tee(); 829 830 const reader1 = s1.getReader(); 831 const reader2 = s2.getReader(); 832 833 assert.rejects(reader1.closed, error); 834 assert.rejects(reader2.closed, error); 835 836 assert.rejects(reader1.read(), error); 837 assert.rejects(reader2.read(), error); 838 839 setImmediate(() => c.error(error)); 840} 841 842{ 843 let pullCount = 0; 844 const stream = new ReadableStream({ 845 pull(controller) { 846 if (pullCount) 847 controller.enqueue(pullCount); 848 pullCount++; 849 }, 850 }); 851 852 const reader = stream.getReader(); 853 854 queueMicrotask(common.mustCall(() => { 855 assert.strictEqual(pullCount, 1); 856 reader.read().then(common.mustCall(({ value, done }) => { 857 assert.strictEqual(value, 1); 858 assert(!done); 859 860 reader.read().then(common.mustCall(({ value, done }) => { 861 assert.strictEqual(value, 2); 862 assert(!done); 863 })); 864 865 })); 866 })); 867} 868 869{ 870 const stream = new ReadableStream({ 871 start(controller) { 872 controller.enqueue('a'); 873 }, 874 pull: common.mustCall(), 875 }); 876 877 stream.getReader().read().then(common.mustCall(({ value, done }) => { 878 assert.strictEqual(value, 'a'); 879 assert(!done); 880 })); 881} 882 883{ 884 const stream = new ReadableStream({ 885 start(controller) { 886 controller.enqueue('a'); 887 controller.enqueue('b'); 888 }, 889 pull: common.mustCall(), 890 }); 891 892 const reader = stream.getReader(); 893 reader.read().then(common.mustCall(({ value, done }) => { 894 assert.strictEqual(value, 'a'); 895 assert(!done); 896 897 reader.read().then(common.mustCall(({ value, done }) => { 898 assert.strictEqual(value, 'b'); 899 assert(!done); 900 })); 901 })); 902} 903 904{ 905 const stream = new ReadableStream({ 906 start(controller) { 907 
controller.enqueue('a'); 908 controller.enqueue('b'); 909 controller.close(); 910 }, 911 pull: common.mustNotCall(), 912 }); 913 914 const reader = stream.getReader(); 915 reader.read().then(common.mustCall(({ value, done }) => { 916 assert.strictEqual(value, 'a'); 917 assert(!done); 918 919 reader.read().then(common.mustCall(({ value, done }) => { 920 assert.strictEqual(value, 'b'); 921 assert(!done); 922 923 reader.read().then(common.mustCall(({ value, done }) => { 924 assert.strictEqual(value, undefined); 925 assert(done); 926 })); 927 928 })); 929 })); 930} 931 932{ 933 let res; 934 let promise; 935 let calls = 0; 936 const stream = new ReadableStream({ 937 pull(controller) { 938 controller.enqueue(++calls); 939 promise = new Promise((resolve) => res = resolve); 940 return promise; 941 } 942 }); 943 944 const reader = stream.getReader(); 945 946 (async () => { 947 await reader.read(); 948 assert.strictEqual(calls, 1); 949 await delay(); 950 assert.strictEqual(calls, 1); 951 res(); 952 await delay(); 953 assert.strictEqual(calls, 2); 954 })().then(common.mustCall()); 955} 956 957{ 958 const stream = new ReadableStream({ 959 start(controller) { 960 controller.enqueue('a'); 961 controller.enqueue('b'); 962 controller.enqueue('c'); 963 }, 964 pull: common.mustCall(4), 965 }, { 966 highWaterMark: Infinity, 967 size() { return 1; } 968 }); 969 970 const reader = stream.getReader(); 971 (async () => { 972 await delay(); 973 await reader.read(); 974 await reader.read(); 975 await reader.read(); 976 })().then(common.mustCall()); 977} 978 979{ 980 const stream = new ReadableStream({ 981 start(controller) { 982 controller.enqueue('a'); 983 controller.enqueue('b'); 984 controller.enqueue('c'); 985 controller.close(); 986 }, 987 pull: common.mustNotCall(), 988 }, { 989 highWaterMark: Infinity, 990 size() { return 1; } 991 }); 992 993 const reader = stream.getReader(); 994 (async () => { 995 await delay(); 996 await reader.read(); 997 await reader.read(); 998 await 
reader.read(); 999 })().then(common.mustCall()); 1000} 1001 1002{ 1003 let calls = 0; 1004 let res; 1005 const ready = new Promise((resolve) => res = resolve); 1006 1007 new ReadableStream({ 1008 pull(controller) { 1009 controller.enqueue(++calls); 1010 if (calls === 4) 1011 res(); 1012 } 1013 }, { 1014 size() { return 1; }, 1015 highWaterMark: 4 1016 }); 1017 1018 ready.then(common.mustCall(() => { 1019 assert.strictEqual(calls, 4); 1020 })); 1021} 1022 1023{ 1024 const stream = new ReadableStream({ 1025 pull: common.mustCall((controller) => controller.close()) 1026 }); 1027 1028 const reader = stream.getReader(); 1029 1030 reader.closed.then(common.mustCall()); 1031} 1032 1033{ 1034 const error = new Error('boom'); 1035 const stream = new ReadableStream({ 1036 pull: common.mustCall((controller) => controller.error(error)) 1037 }); 1038 1039 const reader = stream.getReader(); 1040 1041 assert.rejects(reader.closed, error); 1042} 1043 1044{ 1045 const error = new Error('boom'); 1046 const error2 = new Error('boom2'); 1047 const stream = new ReadableStream({ 1048 pull: common.mustCall((controller) => { 1049 controller.error(error); 1050 throw error2; 1051 }) 1052 }); 1053 1054 const reader = stream.getReader(); 1055 1056 assert.rejects(reader.closed, error); 1057} 1058 1059{ 1060 let startCalled = false; 1061 new ReadableStream({ 1062 start: common.mustCall((controller) => { 1063 controller.enqueue('a'); 1064 controller.close(); 1065 assert.throws(() => controller.enqueue('b'), { 1066 code: 'ERR_INVALID_STATE' 1067 }); 1068 startCalled = true; 1069 }) 1070 }); 1071 assert(startCalled); 1072} 1073 1074{ 1075 let startCalled = false; 1076 new ReadableStream({ 1077 start: common.mustCall((controller) => { 1078 controller.close(); 1079 assert.throws(() => controller.enqueue('b'), { 1080 code: 'ERR_INVALID_STATE' 1081 }); 1082 startCalled = true; 1083 }) 1084 }); 1085 assert(startCalled); 1086} 1087 1088{ 1089 class Source { 1090 startCalled = false; 1091 pullCalled = 
false; 1092 cancelCalled = false; 1093 1094 start(controller) { 1095 assert.strictEqual(this, source); 1096 this.startCalled = true; 1097 controller.enqueue('a'); 1098 } 1099 1100 pull() { 1101 assert.strictEqual(this, source); 1102 this.pullCalled = true; 1103 } 1104 1105 cancel() { 1106 assert.strictEqual(this, source); 1107 this.cancelCalled = true; 1108 } 1109 } 1110 1111 const source = new Source(); 1112 1113 const stream = new ReadableStream(source); 1114 const reader = stream.getReader(); 1115 1116 (async () => { 1117 await reader.read(); 1118 reader.releaseLock(); 1119 stream.cancel(); 1120 assert(source.startCalled); 1121 assert(source.pullCalled); 1122 assert(source.cancelCalled); 1123 })().then(common.mustCall()); 1124} 1125 1126{ 1127 let startCalled = false; 1128 new ReadableStream({ 1129 start(controller) { 1130 assert.strictEqual(controller.desiredSize, 10); 1131 controller.close(); 1132 assert.strictEqual(controller.desiredSize, 0); 1133 startCalled = true; 1134 } 1135 }, { 1136 highWaterMark: 10 1137 }); 1138 assert(startCalled); 1139} 1140 1141{ 1142 let startCalled = false; 1143 new ReadableStream({ 1144 start(controller) { 1145 assert.strictEqual(controller.desiredSize, 10); 1146 controller.error(); 1147 assert.strictEqual(controller.desiredSize, null); 1148 startCalled = true; 1149 } 1150 }, { 1151 highWaterMark: 10 1152 }); 1153 assert(startCalled); 1154} 1155 1156{ 1157 class Foo extends ReadableStream {} 1158 const foo = new Foo(); 1159 foo.getReader(); 1160} 1161 1162{ 1163 let startCalled = false; 1164 new ReadableStream({ 1165 start(controller) { 1166 assert.strictEqual(controller.desiredSize, 1); 1167 controller.enqueue('a'); 1168 assert.strictEqual(controller.desiredSize, 0); 1169 controller.enqueue('a'); 1170 assert.strictEqual(controller.desiredSize, -1); 1171 controller.enqueue('a'); 1172 assert.strictEqual(controller.desiredSize, -2); 1173 controller.enqueue('a'); 1174 assert.strictEqual(controller.desiredSize, -3); 1175 startCalled 
= true; 1176 } 1177 }); 1178 assert(startCalled); 1179} 1180 1181{ 1182 let c; 1183 const stream = new ReadableStream({ 1184 start(controller) { 1185 c = controller; 1186 } 1187 }); 1188 1189 const reader = stream.getReader(); 1190 1191 (async () => { 1192 assert.strictEqual(c.desiredSize, 1); 1193 c.enqueue(1); 1194 assert.strictEqual(c.desiredSize, 0); 1195 await reader.read(); 1196 assert.strictEqual(c.desiredSize, 1); 1197 c.enqueue(1); 1198 c.enqueue(1); 1199 assert.strictEqual(c.desiredSize, -1); 1200 await reader.read(); 1201 assert.strictEqual(c.desiredSize, 0); 1202 await reader.read(); 1203 assert.strictEqual(c.desiredSize, 1); 1204 })().then(common.mustCall()); 1205} 1206 1207{ 1208 let c; 1209 new ReadableStream({ 1210 start(controller) { 1211 c = controller; 1212 } 1213 }); 1214 assert(c instanceof ReadableStreamDefaultController); 1215 assert.strictEqual(typeof c.desiredSize, 'number'); 1216 assert.strictEqual(typeof c.enqueue, 'function'); 1217 assert.strictEqual(typeof c.close, 'function'); 1218 assert.strictEqual(typeof c.error, 'function'); 1219} 1220 1221class Source { 1222 constructor() { 1223 this.cancelCalled = false; 1224 } 1225 1226 start(controller) { 1227 this.stream = createReadStream(__filename); 1228 this.stream.on('data', (chunk) => { 1229 controller.enqueue(chunk); 1230 }); 1231 this.stream.once('end', () => { 1232 if (!this.cancelCalled) 1233 controller.close(); 1234 }); 1235 this.stream.once('error', (error) => { 1236 controller.error(error); 1237 }); 1238 } 1239 1240 cancel() { 1241 this.cancelCalled = true; 1242 } 1243} 1244 1245{ 1246 const source = new Source(); 1247 const stream = new ReadableStream(source); 1248 1249 async function read(stream) { 1250 const reader = stream.getReader(); 1251 const chunks = []; 1252 let read = await reader.read(); 1253 while (!read.done) { 1254 chunks.push(Buffer.from(read.value)); 1255 read = await reader.read(); 1256 } 1257 return Buffer.concat(chunks); 1258 } 1259 1260 
read(stream).then(common.mustCall((data) => { 1261 const check = readFileSync(__filename); 1262 assert.deepStrictEqual(data, check); 1263 })); 1264} 1265 1266{ 1267 const source = new Source(); 1268 const stream = new ReadableStream(source); 1269 1270 async function read(stream) { 1271 const chunks = []; 1272 for await (const chunk of stream) 1273 chunks.push(chunk); 1274 return Buffer.concat(chunks); 1275 } 1276 1277 read(stream).then(common.mustCall((data) => { 1278 const check = readFileSync(__filename); 1279 assert.deepStrictEqual(data, check); 1280 1281 assert.strictEqual(stream[kState].state, 'closed'); 1282 assert(!stream.locked); 1283 })); 1284} 1285 1286{ 1287 const source = new Source(); 1288 const stream = new ReadableStream(source); 1289 1290 [1, false, ''].forEach((options) => { 1291 assert.throws(() => stream.values(options), { 1292 code: 'ERR_INVALID_ARG_TYPE', 1293 }); 1294 }); 1295 1296 async function read(stream) { 1297 for await (const _ of stream.values({ preventCancel: true })) 1298 return; 1299 } 1300 1301 read(stream).then(common.mustCall((data) => { 1302 assert.strictEqual(stream[kState].state, 'readable'); 1303 })); 1304} 1305 1306{ 1307 const source = new Source(); 1308 const stream = new ReadableStream(source); 1309 1310 async function read(stream) { 1311 for await (const _ of stream.values({ preventCancel: false })) 1312 return; 1313 } 1314 1315 read(stream).then(common.mustCall((data) => { 1316 assert.strictEqual(stream[kState].state, 'closed'); 1317 })); 1318} 1319 1320{ 1321 const source = new Source(); 1322 const stream = new ReadableStream(source); 1323 1324 const error = new Error('boom'); 1325 1326 async function read(stream) { 1327 // eslint-disable-next-line no-unused-vars 1328 for await (const _ of stream.values({ preventCancel: true })) 1329 throw error; 1330 } 1331 1332 assert.rejects(read(stream), error).then(common.mustCall(() => { 1333 assert.strictEqual(stream[kState].state, 'readable'); 1334 })); 1335} 1336 1337{ 1338 
const source = new Source(); 1339 const stream = new ReadableStream(source); 1340 1341 const error = new Error('boom'); 1342 1343 async function read(stream) { 1344 // eslint-disable-next-line no-unused-vars 1345 for await (const _ of stream.values({ preventCancel: false })) 1346 throw error; 1347 } 1348 1349 assert.rejects(read(stream), error).then(common.mustCall(() => { 1350 assert.strictEqual(stream[kState].state, 'closed'); 1351 })); 1352} 1353 1354{ 1355 assert.throws(() => Reflect.get(ReadableStream.prototype, 'locked', {}), { 1356 code: 'ERR_INVALID_THIS', 1357 }); 1358 assert.rejects(() => ReadableStream.prototype.cancel.call({}), { 1359 code: 'ERR_INVALID_THIS', 1360 }); 1361 assert.throws(() => ReadableStream.prototype.getReader.call({}), { 1362 code: 'ERR_INVALID_THIS', 1363 }); 1364 assert.throws(() => ReadableStream.prototype.tee.call({}), { 1365 code: 'ERR_INVALID_THIS', 1366 }); 1367 assert.throws(() => ReadableStream.prototype.values.call({}), { 1368 code: 'ERR_INVALID_THIS', 1369 }); 1370 assert.throws(() => ReadableStream.prototype[kTransfer].call({}), { 1371 code: 'ERR_INVALID_THIS', 1372 }); 1373 assert.rejects(() => ReadableStreamDefaultReader.prototype.read.call({}), { 1374 code: 'ERR_INVALID_THIS', 1375 }); 1376 assert.rejects(() => ReadableStreamDefaultReader.prototype.cancel.call({}), { 1377 code: 'ERR_INVALID_THIS', 1378 }); 1379 assert.rejects(() => { 1380 return Reflect.get(ReadableStreamDefaultReader.prototype, 'closed'); 1381 }, { 1382 code: 'ERR_INVALID_THIS', 1383 }); 1384 assert.throws(() => { 1385 ReadableStreamDefaultReader.prototype.releaseLock.call({}); 1386 }, { 1387 code: 'ERR_INVALID_THIS', 1388 }); 1389 assert.rejects(() => ReadableStreamBYOBReader.prototype.read.call({}), { 1390 code: 'ERR_INVALID_THIS', 1391 }); 1392 assert.throws(() => { 1393 ReadableStreamBYOBReader.prototype.releaseLock.call({}); 1394 }, { 1395 code: 'ERR_INVALID_THIS', 1396 }); 1397 assert.rejects(() => { 1398 return 
Reflect.get(ReadableStreamBYOBReader.prototype, 'closed'); 1399 }, { 1400 code: 'ERR_INVALID_THIS', 1401 }); 1402 assert.rejects(() => ReadableStreamBYOBReader.prototype.cancel.call({}), { 1403 code: 'ERR_INVALID_THIS', 1404 }); 1405 1406 assert.throws(() => { 1407 Reflect.get(ReadableByteStreamController.prototype, 'byobRequest', {}); 1408 }, { 1409 code: 'ERR_INVALID_THIS', 1410 }); 1411 assert.throws(() => { 1412 Reflect.get(ReadableByteStreamController.prototype, 'desiredSize', {}); 1413 }, { 1414 code: 'ERR_INVALID_THIS', 1415 }); 1416 assert.throws(() => { 1417 ReadableByteStreamController.prototype.close.call({}); 1418 }, { 1419 code: 'ERR_INVALID_THIS', 1420 }); 1421 assert.throws(() => { 1422 ReadableByteStreamController.prototype.enqueue.call({}); 1423 }, { 1424 code: 'ERR_INVALID_THIS', 1425 }); 1426 assert.throws(() => { 1427 ReadableByteStreamController.prototype.error.call({}); 1428 }, { 1429 code: 'ERR_INVALID_THIS', 1430 }); 1431 1432 assert.throws(() => new ReadableStreamBYOBRequest(), { 1433 code: 'ERR_ILLEGAL_CONSTRUCTOR', 1434 }); 1435 1436 assert.throws(() => new ReadableStreamDefaultController(), { 1437 code: 'ERR_ILLEGAL_CONSTRUCTOR', 1438 }); 1439 1440 assert.throws(() => new ReadableByteStreamController(), { 1441 code: 'ERR_ILLEGAL_CONSTRUCTOR', 1442 }); 1443} 1444 1445{ 1446 let controller; 1447 const readable = new ReadableStream({ 1448 start(c) { controller = c; } 1449 }); 1450 1451 assert.strictEqual( 1452 inspect(readable), 1453 'ReadableStream { locked: false, state: \'readable\', ' + 1454 'supportsBYOB: false }'); 1455 assert.strictEqual( 1456 inspect(readable, { depth: null }), 1457 'ReadableStream { locked: false, state: \'readable\', ' + 1458 'supportsBYOB: false }'); 1459 assert.strictEqual( 1460 inspect(readable, { depth: 0 }), 1461 'ReadableStream [Object]'); 1462 1463 assert.strictEqual( 1464 inspect(controller), 1465 'ReadableStreamDefaultController {}'); 1466 assert.strictEqual( 1467 inspect(controller, { depth: null }), 
1468 'ReadableStreamDefaultController {}'); 1469 assert.strictEqual( 1470 inspect(controller, { depth: 0 }), 1471 'ReadableStreamDefaultController {}'); 1472 1473 const reader = readable.getReader(); 1474 1475 assert.match( 1476 inspect(reader), 1477 /ReadableStreamDefaultReader/); 1478 assert.match( 1479 inspect(reader, { depth: null }), 1480 /ReadableStreamDefaultReader/); 1481 assert.match( 1482 inspect(reader, { depth: 0 }), 1483 /ReadableStreamDefaultReader/); 1484 1485 assert.rejects(readableStreamPipeTo(1), { 1486 code: 'ERR_INVALID_ARG_TYPE', 1487 }); 1488 1489 assert.rejects(readableStreamPipeTo(new ReadableStream(), 1), { 1490 code: 'ERR_INVALID_ARG_TYPE', 1491 }); 1492 1493 assert.rejects( 1494 readableStreamPipeTo( 1495 new ReadableStream(), 1496 new WritableStream(), 1497 false, 1498 false, 1499 false, 1500 {}), 1501 { 1502 code: 'ERR_INVALID_ARG_TYPE', 1503 }); 1504} 1505 1506{ 1507 const readable = new ReadableStream(); 1508 const reader = readable.getReader(); 1509 reader.releaseLock(); 1510 reader.releaseLock(); 1511 assert.rejects(reader.read(), { 1512 code: 'ERR_INVALID_STATE', 1513 }); 1514 assert.rejects(reader.cancel(), { 1515 code: 'ERR_INVALID_STATE', 1516 }); 1517} 1518 1519{ 1520 // Test tee() cloneForBranch2 argument 1521 const readable = new ReadableStream({ 1522 start(controller) { 1523 controller.enqueue('hello'); 1524 } 1525 }); 1526 const [r1, r2] = readableStreamTee(readable, true); 1527 r1.getReader().read().then( 1528 common.mustCall(({ value }) => assert.strictEqual(value, 'hello'))); 1529 r2.getReader().read().then( 1530 common.mustCall(({ value }) => assert.strictEqual(value, 'hello'))); 1531} 1532 1533{ 1534 assert.throws(() => { 1535 readableByteStreamControllerConvertPullIntoDescriptor({ 1536 bytesFilled: 10, 1537 byteLength: 5 1538 }); 1539 }, { 1540 code: 'ERR_INVALID_STATE', 1541 }); 1542} 1543 1544{ 1545 let controller; 1546 const readable = new ReadableStream({ 1547 start(c) { controller = c; } 1548 }); 1549 1550 
controller[kState].pendingPullIntos = [{}]; 1551 assert.throws(() => readableByteStreamControllerRespond(controller, 0), { 1552 code: 'ERR_INVALID_ARG_VALUE', 1553 }); 1554 1555 readable.cancel().then(common.mustCall()); 1556 1557 assert.throws(() => readableByteStreamControllerRespond(controller, 1), { 1558 code: 'ERR_INVALID_ARG_VALUE', 1559 }); 1560 1561 assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller)); 1562 readableStreamDefaultControllerEnqueue(controller); 1563 readableByteStreamControllerClose(controller); 1564 readableByteStreamControllerEnqueue(controller, new Uint8Array(1)); 1565} 1566 1567{ 1568 const stream = new ReadableStream({ 1569 start(controller) { 1570 controller.enqueue('a'); 1571 controller.close(); 1572 }, 1573 pull: common.mustNotCall(), 1574 }); 1575 1576 const reader = stream.getReader(); 1577 (async () => { 1578 isDisturbed(stream, false); 1579 await reader.read(); 1580 isDisturbed(stream, true); 1581 })().then(common.mustCall()); 1582} 1583 1584{ 1585 const stream = new ReadableStream({ 1586 start(controller) { 1587 controller.close(); 1588 }, 1589 pull: common.mustNotCall(), 1590 }); 1591 1592 const reader = stream.getReader(); 1593 (async () => { 1594 isDisturbed(stream, false); 1595 await reader.read(); 1596 isDisturbed(stream, true); 1597 })().then(common.mustCall()); 1598} 1599 1600{ 1601 const stream = new ReadableStream({ 1602 start(controller) { 1603 }, 1604 pull: common.mustNotCall(), 1605 }); 1606 stream.cancel(); 1607 1608 const reader = stream.getReader(); 1609 (async () => { 1610 isDisturbed(stream, false); 1611 await reader.read(); 1612 isDisturbed(stream, true); 1613 })().then(common.mustCall()); 1614} 1615 1616{ 1617 const stream = new ReadableStream({ 1618 pull: common.mustCall((controller) => { 1619 controller.error(new Error()); 1620 }), 1621 }); 1622 1623 const reader = stream.getReader(); 1624 (async () => { 1625 isErrored(stream, false); 1626 await reader.read().catch(common.mustCall()); 1627 
isErrored(stream, true); 1628 })().then(common.mustCall()); 1629} 1630 1631{ 1632 const stream = new ReadableStream({ 1633 pull: common.mustCall((controller) => { 1634 controller.error(new Error()); 1635 }), 1636 }); 1637 1638 const reader = stream.getReader(); 1639 (async () => { 1640 isReadable(stream, true); 1641 await reader.read().catch(common.mustCall()); 1642 isReadable(stream, false); 1643 })().then(common.mustCall()); 1644} 1645 1646{ 1647 const stream = new ReadableStream({ 1648 type: 'bytes', 1649 start(controller) { 1650 controller.close(); 1651 } 1652 }); 1653 1654 const buffer = new ArrayBuffer(1024); 1655 const reader = stream.getReader({ mode: 'byob' }); 1656 1657 reader.read(new DataView(buffer)) 1658 .then(common.mustCall()); 1659} 1660 1661{ 1662 const stream = new ReadableStream({ 1663 type: 'bytes', 1664 autoAllocateChunkSize: 128, 1665 pull: common.mustCall((controller) => { 1666 const view = controller.byobRequest.view; 1667 const dest = new Uint8Array( 1668 view.buffer, 1669 view.byteOffset, 1670 view.byteLength 1671 ); 1672 dest.fill(1); 1673 controller.byobRequest.respondWithNewView(dest); 1674 }), 1675 }); 1676 1677 const reader = stream.getReader({ mode: 'byob' }); 1678 1679 const buffer = new ArrayBuffer(10); 1680 const view = new Uint8Array( 1681 buffer, 1682 1, 1683 3 1684 ); 1685 1686 reader.read(view).then(common.mustCall(({ value }) => { 1687 assert.deepStrictEqual(value, new Uint8Array([1, 1, 1])); 1688 })); 1689} 1690