1'use strict'; 2 3const { 4 ArrayBuffer, 5 ArrayBufferPrototypeGetByteLength, 6 ArrayBufferPrototypeSlice, 7 ArrayPrototypePush, 8 ArrayPrototypeShift, 9 DataView, 10 FunctionPrototypeBind, 11 FunctionPrototypeCall, 12 MathMin, 13 NumberIsInteger, 14 ObjectCreate, 15 ObjectDefineProperties, 16 ObjectSetPrototypeOf, 17 Promise, 18 PromisePrototypeThen, 19 PromiseResolve, 20 PromiseReject, 21 ReflectConstruct, 22 SafePromiseAll, 23 Symbol, 24 SymbolAsyncIterator, 25 SymbolDispose, 26 SymbolToStringTag, 27 Uint8Array, 28} = primordials; 29 30const { 31 AbortError, 32 codes: { 33 ERR_ILLEGAL_CONSTRUCTOR, 34 ERR_INVALID_ARG_VALUE, 35 ERR_INVALID_ARG_TYPE, 36 ERR_INVALID_STATE, 37 ERR_INVALID_THIS, 38 }, 39} = require('internal/errors'); 40 41const { 42 DOMException, 43} = internalBinding('messaging'); 44 45const { 46 isArrayBufferView, 47 isDataView, 48} = require('internal/util/types'); 49 50const { 51 createDeferredPromise, 52 customInspectSymbol: kInspect, 53 isArrayBufferDetached, 54 kEmptyObject, 55 kEnumerableProperty, 56 SideEffectFreeRegExpPrototypeSymbolReplace, 57} = require('internal/util'); 58 59const { 60 validateAbortSignal, 61 validateBuffer, 62 validateObject, 63} = require('internal/validators'); 64 65const { 66 MessageChannel, 67} = require('internal/worker/io'); 68 69const { 70 kDeserialize, 71 kTransfer, 72 kTransferList, 73 makeTransferable, 74} = require('internal/worker/js_transferable'); 75 76const { 77 queueMicrotask, 78} = require('internal/process/task_queues'); 79 80const { 81 kIsDisturbed, 82 kIsErrored, 83 kIsReadable, 84 kIsClosedPromise, 85 kControllerErrorFunction, 86} = require('internal/streams/utils'); 87 88const { 89 structuredClone, 90} = require('internal/structured_clone'); 91 92const { 93 ArrayBufferViewGetBuffer, 94 ArrayBufferViewGetByteLength, 95 ArrayBufferViewGetByteOffset, 96 AsyncIterator, 97 cloneAsUint8Array, 98 copyArrayBuffer, 99 customInspect, 100 dequeueValue, 101 ensureIsPromise, 102 enqueueValueWithSize, 103 extractHighWaterMark, 104 extractSizeAlgorithm, 105 lazyTransfer, 106 isViewedArrayBufferDetached, 107 isBrandCheck, 108 resetQueue, 109 setPromiseHandled, 110 transferArrayBuffer, 111 nonOpCancel, 112 nonOpPull, 113 nonOpStart, 114 kType, 115 kState, 116} = require('internal/webstreams/util'); 117 118const { 119 WritableStreamDefaultWriter, 120 121 isWritableStream, 122 isWritableStreamLocked, 123 isWritableStreamDefaultController, 124 isWritableStreamDefaultWriter, 125 126 writableStreamAbort, 127 writableStreamCloseQueuedOrInFlight, 128 writableStreamDefaultWriterCloseWithErrorPropagation, 129 writableStreamDefaultWriterRelease, 130 writableStreamDefaultWriterWrite, 131} = require('internal/webstreams/writablestream'); 132 133const { Buffer } = require('buffer'); 134 135const assert = require('internal/assert'); 136 137const kCancel = Symbol('kCancel'); 138const kClose = Symbol('kClose'); 139const kChunk = Symbol('kChunk'); 140const kError = Symbol('kError'); 141const kPull = Symbol('kPull'); 142const kRelease = Symbol('kRelease'); 143const kSkipThrow = Symbol('kSkipThrow'); 144 145let releasedError; 146let releasingError; 147let addAbortListener; 148 149const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm; 150 151function lazyReadableReleasedError() { 152 if (releasedError) { 153 return releasedError; 154 } 155 156 releasedError = new ERR_INVALID_STATE.TypeError('Reader released'); 157 // Avoid V8 leak and remove userland stackstrace 158 releasedError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasedError.stack, ''); 159 return releasedError; 160} 161 162function lazyReadableReleasingError() { 163 if (releasingError) { 164 return releasingError; 165 } 166 releasingError = new ERR_INVALID_STATE.TypeError('Releasing reader'); 167 // Avoid V8 leak and remove userland stackstrace 168 releasingError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasingError.stack, ''); 169 return releasingError; 170} 171 172const getNonWritablePropertyDescriptor = (value) => { 173 return { 174 __proto__: null, 175 configurable: true, 176 value, 177 }; 178}; 179 180/** 181 * @typedef {import('../abort_controller').AbortSignal} AbortSignal 182 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy 183 * @typedef {import('./queuingstrategies').QueuingStrategySize 184 * } QueuingStrategySize 185 * @typedef {import('./writablestream').WritableStream} WritableStream 186 */ 187 188/** 189 * @typedef {ReadableStreamDefaultController | ReadableByteStreamController 190 * } ReadableStreamController 191 */ 192 193/** 194 * @typedef {ReadableStreamDefaultReader | ReadableStreamBYOBReader 195 * } ReadableStreamReader 196 */ 197 198/** 199 * @callback UnderlyingSourceStartCallback 200 * @param {ReadableStreamController} controller 201 * @returns { any | Promise<void> } 202 */ 203 204/** 205 * @callback UnderlyingSourcePullCallback 206 * @param {ReadableStreamController} controller 207 * @returns { Promise<void> } 208 */ 209 210/** 211 * @callback UnderlyingSourceCancelCallback 212 * @param {any} reason 213 * @returns { Promise<void> } 214 */ 215 216/** 217 * @typedef {{ 218 * readable: ReadableStream, 219 * writable: WritableStream, 220 * }} ReadableWritablePair 221 */ 222 223/** 224 * @typedef {{ 225 * preventClose? : boolean, 226 * preventAbort? : boolean, 227 * preventCancel? : boolean, 228 * signal? : AbortSignal, 229 * }} StreamPipeOptions 230 */ 231 232/** 233 * @typedef {{ 234 * start? : UnderlyingSourceStartCallback, 235 * pull? : UnderlyingSourcePullCallback, 236 * cancel? : UnderlyingSourceCancelCallback, 237 * type? : "bytes", 238 * autoAllocateChunkSize? : number 239 * }} UnderlyingSource 240 */ 241 242class ReadableStream { 243 [kType] = 'ReadableStream'; 244 245 /** 246 * @param {UnderlyingSource} [source] 247 * @param {QueuingStrategy} [strategy] 248 */ 249 constructor(source = {}, strategy = kEmptyObject) { 250 if (source === null) 251 throw new ERR_INVALID_ARG_VALUE('source', 'Object', source); 252 this[kState] = { 253 disturbed: false, 254 reader: undefined, 255 state: 'readable', 256 storedError: undefined, 257 stream: undefined, 258 transfer: { 259 writable: undefined, 260 port1: undefined, 261 port2: undefined, 262 promise: undefined, 263 }, 264 }; 265 266 this[kIsClosedPromise] = createDeferredPromise(); 267 this[kControllerErrorFunction] = () => {}; 268 269 // The spec requires handling of the strategy first 270 // here. Specifically, if getting the size and 271 // highWaterMark from the strategy fail, that has 272 // to trigger a throw before getting the details 273 // from the source. So be sure to keep these in 274 // this order. 275 const size = strategy?.size; 276 const highWaterMark = strategy?.highWaterMark; 277 const type = source.type; 278 279 if (`${type}` === 'bytes') { 280 if (size !== undefined) 281 throw new ERR_INVALID_ARG_VALUE.RangeError('strategy.size', size); 282 setupReadableByteStreamControllerFromSource( 283 this, 284 source, 285 extractHighWaterMark(highWaterMark, 0)); 286 } else { 287 if (type !== undefined) 288 throw new ERR_INVALID_ARG_VALUE('source.type', type); 289 setupReadableStreamDefaultControllerFromSource( 290 this, 291 source, 292 extractHighWaterMark(highWaterMark, 1), 293 extractSizeAlgorithm(size)); 294 } 295 296 // eslint-disable-next-line no-constructor-return 297 return makeTransferable(this); 298 } 299 300 get [kIsDisturbed]() { 301 return this[kState].disturbed; 302 } 303 304 get [kIsErrored]() { 305 return this[kState].state === 'errored'; 306 } 307 308 get [kIsReadable]() { 309 return this[kState].state === 'readable'; 310 } 311 312 /** 313 * @readonly 314 * @type {boolean} 315 */ 316 get locked() { 317 if (!isReadableStream(this)) 318 throw new ERR_INVALID_THIS('ReadableStream'); 319 return isReadableStreamLocked(this); 320 } 321 322 /** 323 * @param {any} [reason] 324 * @returns { Promise<void> } 325 */ 326 cancel(reason = undefined) { 327 if (!isReadableStream(this)) 328 return PromiseReject(new ERR_INVALID_THIS('ReadableStream')); 329 if (isReadableStreamLocked(this)) { 330 return PromiseReject( 331 new ERR_INVALID_STATE.TypeError('ReadableStream is locked')); 332 } 333 return readableStreamCancel(this, reason); 334 } 335 336 /** 337 * @param {{ 338 * mode? : "byob" 339 * }} [options] 340 * @returns {ReadableStreamReader} 341 */ 342 getReader(options = kEmptyObject) { 343 if (!isReadableStream(this)) 344 throw new ERR_INVALID_THIS('ReadableStream'); 345 validateObject(options, 'options', { nullable: true, allowFunction: true }); 346 const mode = options?.mode; 347 348 if (mode === undefined) 349 // eslint-disable-next-line no-use-before-define 350 return new ReadableStreamDefaultReader(this); 351 352 if (`${mode}` !== 'byob') 353 throw new ERR_INVALID_ARG_VALUE('options.mode', mode); 354 // eslint-disable-next-line no-use-before-define 355 return new ReadableStreamBYOBReader(this); 356 } 357 358 /** 359 * @param {ReadableWritablePair} transform 360 * @param {StreamPipeOptions} [options] 361 * @returns {ReadableStream} 362 */ 363 pipeThrough(transform, options = kEmptyObject) { 364 if (!isReadableStream(this)) 365 throw new ERR_INVALID_THIS('ReadableStream'); 366 const readable = transform?.readable; 367 if (!isReadableStream(readable)) { 368 throw new ERR_INVALID_ARG_TYPE( 369 'transform.readable', 370 'ReadableStream', 371 readable); 372 } 373 const writable = transform?.writable; 374 if (!isWritableStream(writable)) { 375 throw new ERR_INVALID_ARG_TYPE( 376 'transform.writable', 377 'WritableStream', 378 writable); 379 } 380 381 // The web platform tests require that these be handled one at a 382 // time and in a specific order. options can be null or undefined. 383 const preventAbort = options?.preventAbort; 384 const preventCancel = options?.preventCancel; 385 const preventClose = options?.preventClose; 386 const signal = options?.signal; 387 388 if (signal !== undefined) { 389 validateAbortSignal(signal, 'options.signal'); 390 } 391 392 if (isReadableStreamLocked(this)) 393 throw new ERR_INVALID_STATE.TypeError('The ReadableStream is locked'); 394 if (isWritableStreamLocked(writable)) 395 throw new ERR_INVALID_STATE.TypeError('The WritableStream is locked'); 396 397 const promise = readableStreamPipeTo( 398 this, 399 writable, 400 !!preventClose, 401 !!preventAbort, 402 !!preventCancel, 403 signal); 404 setPromiseHandled(promise); 405 406 return readable; 407 } 408 409 /** 410 * @param {WritableStream} destination 411 * @param {StreamPipeOptions} [options] 412 * @returns {Promise<void>} 413 */ 414 pipeTo(destination, options = kEmptyObject) { 415 try { 416 if (!isReadableStream(this)) 417 throw new ERR_INVALID_THIS('ReadableStream'); 418 if (!isWritableStream(destination)) { 419 throw new ERR_INVALID_ARG_TYPE( 420 'transform.writable', 421 'WritableStream', 422 destination); 423 } 424 425 const preventAbort = options?.preventAbort; 426 const preventCancel = options?.preventCancel; 427 const preventClose = options?.preventClose; 428 const signal = options?.signal; 429 430 if (signal !== undefined) { 431 validateAbortSignal(signal, 'options.signal'); 432 } 433 434 if (isReadableStreamLocked(this)) 435 throw new ERR_INVALID_STATE.TypeError('The ReadableStream is locked'); 436 if (isWritableStreamLocked(destination)) 437 throw new ERR_INVALID_STATE.TypeError('The WritableStream is locked'); 438 439 return readableStreamPipeTo( 440 this, 441 destination, 442 !!preventClose, 443 !!preventAbort, 444 !!preventCancel, 445 signal); 446 } catch (error) { 447 return PromiseReject(error); 448 } 449 } 450 451 /** 452 * @returns {ReadableStream[]} 453 */ 454 tee() { 455 if (!isReadableStream(this)) 456 throw new ERR_INVALID_THIS('ReadableStream'); 457 return readableStreamTee(this, false); 458 } 459 460 /** 461 * @param {{ 462 * preventCancel? : boolean, 463 * }} [options] 464 * @returns {AsyncIterable} 465 */ 466 values(options = kEmptyObject) { 467 if (!isReadableStream(this)) 468 throw new ERR_INVALID_THIS('ReadableStream'); 469 validateObject(options, 'options'); 470 const { 471 preventCancel = false, 472 } = options; 473 474 // eslint-disable-next-line no-use-before-define 475 const reader = new ReadableStreamDefaultReader(this); 476 let done = false; 477 let started = false; 478 let current; 479 480 // The nextSteps function is not an async function in order 481 // to make it more efficient. Because nextSteps explicitly 482 // creates a Promise and returns it in the common case, 483 // making it an async function just causes two additional 484 // unnecessary Promise allocations to occur, which just add 485 // cost. 486 function nextSteps() { 487 if (done) 488 return PromiseResolve({ done: true, value: undefined }); 489 490 if (reader[kState].stream === undefined) { 491 return PromiseReject( 492 new ERR_INVALID_STATE.TypeError( 493 'The reader is not bound to a ReadableStream')); 494 } 495 const promise = createDeferredPromise(); 496 497 readableStreamDefaultReaderRead(reader, { 498 [kChunk](chunk) { 499 current = undefined; 500 promise.resolve({ value: chunk, done: false }); 501 }, 502 [kClose]() { 503 current = undefined; 504 done = true; 505 readableStreamReaderGenericRelease(reader); 506 promise.resolve({ done: true, value: undefined }); 507 }, 508 [kError](error) { 509 current = undefined; 510 done = true; 511 readableStreamReaderGenericRelease(reader); 512 promise.reject(error); 513 }, 514 }); 515 return promise.promise; 516 } 517 518 async function returnSteps(value) { 519 if (done) 520 return { done: true, value }; 521 done = true; 522 523 if (reader[kState].stream === undefined) { 524 throw new ERR_INVALID_STATE.TypeError( 525 'The reader is not bound to a ReadableStream'); 526 } 527 assert(!reader[kState].readRequests.length); 528 if (!preventCancel) { 529 const result = readableStreamReaderGenericCancel(reader, value); 530 readableStreamReaderGenericRelease(reader); 531 await result; 532 return { done: true, value }; 533 } 534 535 readableStreamReaderGenericRelease(reader); 536 return { done: true, value }; 537 } 538 539 // TODO(@jasnell): Explore whether an async generator 540 // can be used here instead of a custom iterator object. 541 return ObjectSetPrototypeOf({ 542 // Changing either of these functions (next or return) 543 // to async functions causes a failure in the streams 544 // Web Platform Tests that check for use of a modified 545 // Promise.prototype.then. Since the await keyword 546 // uses Promise.prototype.then, it is open to prototype 547 // pollution, which causes the test to fail. The other 548 // await uses here do not trigger that failure because 549 // the test that fails does not trigger those code paths. 550 next() { 551 // If this is the first read, delay by one microtask 552 // to ensure that the controller has had an opportunity 553 // to properly start and perform the initial pull. 554 // TODO(@jasnell): The spec doesn't call this out so 555 // need to investigate if it's a bug in our impl or 556 // the spec. 557 if (!started) { 558 current = PromiseResolve(); 559 started = true; 560 } 561 current = current !== undefined ? 562 PromisePrototypeThen(current, nextSteps, nextSteps) : 563 nextSteps(); 564 return current; 565 }, 566 567 return(error) { 568 return current ? 569 PromisePrototypeThen( 570 current, 571 () => returnSteps(error), 572 () => returnSteps(error)) : 573 returnSteps(error); 574 }, 575 576 [SymbolAsyncIterator]() { return this; }, 577 }, AsyncIterator); 578 } 579 580 [kInspect](depth, options) { 581 return customInspect(depth, options, this[kType], { 582 locked: this.locked, 583 state: this[kState].state, 584 supportsBYOB: 585 // eslint-disable-next-line no-use-before-define 586 this[kState].controller instanceof ReadableByteStreamController, 587 }); 588 } 589 590 [kTransfer]() { 591 if (!isReadableStream(this)) 592 throw new ERR_INVALID_THIS('ReadableStream'); 593 if (this.locked) { 594 this[kState].transfer.port1?.close(); 595 this[kState].transfer.port1 = undefined; 596 this[kState].transfer.port2 = undefined; 597 throw new DOMException( 598 'Cannot transfer a locked ReadableStream', 599 'DataCloneError'); 600 } 601 602 const { 603 writable, 604 promise, 605 } = lazyTransfer().newCrossRealmWritableSink( 606 this, 607 this[kState].transfer.port1); 608 609 this[kState].transfer.writable = writable; 610 this[kState].transfer.promise = promise; 611 612 return { 613 data: { port: this[kState].transfer.port2 }, 614 deserializeInfo: 615 'internal/webstreams/readablestream:TransferredReadableStream', 616 }; 617 } 618 619 [kTransferList]() { 620 const { port1, port2 } = new MessageChannel(); 621 this[kState].transfer.port1 = port1; 622 this[kState].transfer.port2 = port2; 623 return [ port2 ]; 624 } 625 626 [kDeserialize]({ port }) { 627 const transfer = lazyTransfer(); 628 setupReadableStreamDefaultControllerFromSource( 629 this, 630 new transfer.CrossRealmTransformReadableSource(port), 631 0, () => 1); 632 } 633} 634 635ObjectDefineProperties(ReadableStream.prototype, { 636 [SymbolAsyncIterator]: { 637 __proto__: null, 638 configurable: true, 639 enumerable: false, 640 writable: true, 641 value: ReadableStream.prototype.values, 642 }, 643 locked: kEnumerableProperty, 644 cancel: kEnumerableProperty, 645 getReader: kEnumerableProperty, 646 pipeThrough: kEnumerableProperty, 647 pipeTo: kEnumerableProperty, 648 tee: kEnumerableProperty, 649 [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name), 650}); 651 652function TransferredReadableStream() { 653 return makeTransferable(ReflectConstruct( 654 function() { 655 this[kType] = 'ReadableStream'; 656 this[kState] = { 657 disturbed: false, 658 state: 'readable', 659 storedError: undefined, 660 stream: undefined, 661 transfer: { 662 writable: undefined, 663 port: undefined, 664 promise: undefined, 665 }, 666 }; 667 this[kIsClosedPromise] = createDeferredPromise(); 668 }, 669 [], ReadableStream)); 670} 671TransferredReadableStream.prototype[kDeserialize] = () => {}; 672 673class ReadableStreamBYOBRequest { 674 [kType] = 'ReadableStreamBYOBRequest'; 675 676 constructor(skipThrowSymbol = undefined) { 677 if (skipThrowSymbol !== kSkipThrow) { 678 throw new ERR_ILLEGAL_CONSTRUCTOR(); 679 } 680 } 681 682 /** 683 * @readonly 684 * @type {ArrayBufferView} 685 */ 686 get view() { 687 if (!isReadableStreamBYOBRequest(this)) 688 throw new ERR_INVALID_THIS('ReadableStreamBYOBRequest'); 689 return this[kState].view; 690 } 691 692 /** 693 * @param {number} bytesWritten 694 */ 695 respond(bytesWritten) { 696 if (!isReadableStreamBYOBRequest(this)) 697 throw new ERR_INVALID_THIS('ReadableStreamBYOBRequest'); 698 const { 699 view, 700 controller, 701 } = this[kState]; 702 if (controller === undefined) { 703 throw new ERR_INVALID_STATE.TypeError( 704 'This BYOB request has been invalidated'); 705 } 706 707 const viewByteLength = ArrayBufferViewGetByteLength(view); 708 const viewBuffer = ArrayBufferViewGetBuffer(view); 709 const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer); 710 711 if (isArrayBufferDetached(viewBuffer)) { 712 throw new ERR_INVALID_STATE.TypeError('Viewed ArrayBuffer is detached'); 713 } 714 715 assert(viewByteLength > 0); 716 assert(viewBufferByteLength > 0); 717 718 readableByteStreamControllerRespond(controller, bytesWritten); 719 } 720 721 /** 722 * @param {ArrayBufferView} view 723 */ 724 respondWithNewView(view) { 725 if (!isReadableStreamBYOBRequest(this)) 726 throw new ERR_INVALID_THIS('ReadableStreamBYOBRequest'); 727 const { 728 controller, 729 } = this[kState]; 730 731 if (controller === undefined) { 732 throw new ERR_INVALID_STATE.TypeError( 733 'This BYOB request has been invalidated'); 734 } 735 736 validateBuffer(view, 'view'); 737 738 if (isViewedArrayBufferDetached(view)) { 739 throw new ERR_INVALID_STATE.TypeError('Viewed ArrayBuffer is detached'); 740 } 741 742 readableByteStreamControllerRespondWithNewView(controller, view); 743 } 744 745 [kInspect](depth, options) { 746 return customInspect(depth, options, this[kType], { 747 view: this.view, 748 controller: this[kState].controller, 749 }); 750 } 751} 752 753ObjectDefineProperties(ReadableStreamBYOBRequest.prototype, { 754 view: kEnumerableProperty, 755 respond: kEnumerableProperty, 756 respondWithNewView: kEnumerableProperty, 757 [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamBYOBRequest.name), 758}); 759 760function createReadableStreamBYOBRequest(controller, view) { 761 const stream = new ReadableStreamBYOBRequest(kSkipThrow); 762 763 stream[kState] = { 764 controller, 765 view, 766 }; 767 768 return stream; 769} 770 771class DefaultReadRequest { 772 constructor() { 773 this[kState] = createDeferredPromise(); 774 } 775 776 [kChunk](value) { 777 this[kState].resolve?.({ value, done: false }); 778 } 779 780 [kClose]() { 781 this[kState].resolve?.({ value: undefined, done: true }); 782 } 783 784 [kError](error) { 785 this[kState].reject?.(error); 786 } 787 788 get promise() { return this[kState].promise; } 789} 790 791class ReadIntoRequest { 792 constructor() { 793 this[kState] = createDeferredPromise(); 794 } 795 796 [kChunk](value) { 797 this[kState].resolve?.({ value, done: false }); 798 } 799 800 [kClose](value) { 801 this[kState].resolve?.({ value, done: true }); 802 } 803 804 [kError](error) { 805 this[kState].reject?.(error); 806 } 807 808 get promise() { return this[kState].promise; } 809} 810 811class ReadableStreamDefaultReader { 812 [kType] = 'ReadableStreamDefaultReader'; 813 814 /** 815 * @param {ReadableStream} stream 816 */ 817 constructor(stream) { 818 if (!isReadableStream(stream)) 819 throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); 820 this[kState] = { 821 readRequests: [], 822 stream: undefined, 823 close: { 824 promise: undefined, 825 resolve: undefined, 826 reject: undefined, 827 }, 828 }; 829 setupReadableStreamDefaultReader(this, stream); 830 } 831 832 /** 833 * @returns {Promise<{ 834 * value : any, 835 * done : boolean 836 * }>} 837 */ 838 read() { 839 if (!isReadableStreamDefaultReader(this)) 840 return PromiseReject(new ERR_INVALID_THIS('ReadableStreamDefaultReader')); 841 if (this[kState].stream === undefined) { 842 return PromiseReject( 843 new ERR_INVALID_STATE.TypeError( 844 'The reader is not attached to a stream')); 845 } 846 const readRequest = new DefaultReadRequest(); 847 readableStreamDefaultReaderRead(this, readRequest); 848 return readRequest.promise; 849 } 850 851 releaseLock() { 852 if (!isReadableStreamDefaultReader(this)) 853 throw new ERR_INVALID_THIS('ReadableStreamDefaultReader'); 854 if (this[kState].stream === undefined) 855 return; 856 readableStreamDefaultReaderRelease(this); 857 } 858 859 /** 860 * @readonly 861 * @type {Promise<void>} 862 */ 863 get closed() { 864 if (!isReadableStreamDefaultReader(this)) 865 return PromiseReject(new ERR_INVALID_THIS('ReadableStreamDefaultReader')); 866 return this[kState].close.promise; 867 } 868 869 /** 870 * @param {any} [reason] 871 * @returns {Promise<void>} 872 */ 873 cancel(reason = undefined) { 874 if (!isReadableStreamDefaultReader(this)) 875 return PromiseReject(new ERR_INVALID_THIS('ReadableStreamDefaultReader')); 876 if (this[kState].stream === undefined) { 877 return PromiseReject(new ERR_INVALID_STATE.TypeError( 878 'The reader is not attached to a stream')); 879 } 880 return readableStreamReaderGenericCancel(this, reason); 881 } 882 883 [kInspect](depth, options) { 884 return customInspect(depth, options, this[kType], { 885 stream: this[kState].stream, 886 readRequests: this[kState].readRequests.length, 887 close: this[kState].close.promise, 888 }); 889 } 890} 891 892ObjectDefineProperties(ReadableStreamDefaultReader.prototype, { 893 closed: kEnumerableProperty, 894 read: kEnumerableProperty, 895 releaseLock: kEnumerableProperty, 896 cancel: kEnumerableProperty, 897 [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamDefaultReader.name), 898}); 899 900class ReadableStreamBYOBReader { 901 [kType] = 'ReadableStreamBYOBReader'; 902 903 /** 904 * @param {ReadableStream} stream 905 */ 906 constructor(stream) { 907 if (!isReadableStream(stream)) 908 throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream); 909 this[kState] = { 910 stream: undefined, 911 requestIntoRequests: [], 912 close: { 913 promise: undefined, 914 resolve: undefined, 915 reject: undefined, 916 }, 917 }; 918 setupReadableStreamBYOBReader(this, stream); 919 } 920 921 /** 922 * @param {ArrayBufferView} view 923 * @returns {Promise<{ 924 * view : ArrayBufferView, 925 * done : boolean, 926 * }>} 927 */ 928 read(view) { 929 if (!isReadableStreamBYOBReader(this)) 930 return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader')); 931 if (!isArrayBufferView(view)) { 932 return PromiseReject( 933 new ERR_INVALID_ARG_TYPE( 934 'view', 935 [ 936 'Buffer', 937 'TypedArray', 938 'DataView', 939 ], 940 view)); 941 } 942 943 const viewByteLength = ArrayBufferViewGetByteLength(view); 944 const viewBuffer = ArrayBufferViewGetBuffer(view); 945 const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer); 946 947 if (viewByteLength === 0 || viewBufferByteLength === 0) { 948 return PromiseReject( 949 new ERR_INVALID_STATE.TypeError( 950 'View or Viewed ArrayBuffer is zero-length or detached', 951 ), 952 ); 953 } 954 955 // Supposed to assert here that the view's buffer is not 956 // detached, but there's no API available to use to check that. 957 if (this[kState].stream === undefined) { 958 return PromiseReject( 959 new ERR_INVALID_STATE.TypeError( 960 'The reader is not attached to a stream')); 961 } 962 const readIntoRequest = new ReadIntoRequest(); 963 readableStreamBYOBReaderRead(this, view, readIntoRequest); 964 return readIntoRequest.promise; 965 } 966 967 releaseLock() { 968 if (!isReadableStreamBYOBReader(this)) 969 throw new ERR_INVALID_THIS('ReadableStreamBYOBReader'); 970 if (this[kState].stream === undefined) 971 return; 972 readableStreamBYOBReaderRelease(this); 973 } 974 975 /** 976 * @readonly 977 * @type {Promise<void>} 978 */ 979 get closed() { 980 if (!isReadableStreamBYOBReader(this)) 981 return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader')); 982 return this[kState].close.promise; 983 } 984 985 /** 986 * @param {any} [reason] 987 * @returns {Promise<void>} 988 */ 989 cancel(reason = undefined) { 990 if (!isReadableStreamBYOBReader(this)) 991 return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader')); 992 if (this[kState].stream === undefined) { 993 return PromiseReject(new ERR_INVALID_STATE.TypeError( 994 'The reader is not attached to a stream')); 995 } 996 return readableStreamReaderGenericCancel(this, reason); 997 } 998 999 [kInspect](depth, options) { 1000 return customInspect(depth, options, this[kType], { 1001 stream: this[kState].stream, 1002 requestIntoRequests: this[kState].requestIntoRequests.length, 1003 close: this[kState].close.promise, 1004 }); 1005 } 1006} 1007 1008ObjectDefineProperties(ReadableStreamBYOBReader.prototype, { 1009 closed: kEnumerableProperty, 1010 read: kEnumerableProperty, 1011 releaseLock: kEnumerableProperty, 1012 cancel: kEnumerableProperty, 1013 [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamBYOBReader.name), 1014}); 1015 1016class ReadableStreamDefaultController { 1017 [kType] = 'ReadableStreamDefaultController'; 1018 [kState] = {}; 1019 1020 constructor(skipThrowSymbol = undefined) { 1021 if (skipThrowSymbol !== kSkipThrow) { 1022 throw new ERR_ILLEGAL_CONSTRUCTOR(); 1023 } 1024 } 1025 1026 /** 1027 * @readonly 1028 * @type {number} 1029 */ 1030 get desiredSize() { 1031 return readableStreamDefaultControllerGetDesiredSize(this); 1032 } 1033 1034 close() { 1035 if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) 1036 throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); 1037 readableStreamDefaultControllerClose(this); 1038 } 1039 1040 /** 1041 * @param {any} [chunk] 1042 */ 1043 enqueue(chunk = undefined) { 1044 if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) 1045 throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); 1046 readableStreamDefaultControllerEnqueue(this, chunk); 1047 } 1048 1049 /** 1050 * @param {any} [error] 1051 */ 1052 error(error = undefined) { 1053 readableStreamDefaultControllerError(this, error); 1054 } 1055 1056 [kCancel](reason) { 1057 return readableStreamDefaultControllerCancelSteps(this, reason); 1058 } 1059 1060 [kPull](readRequest) { 1061 readableStreamDefaultControllerPullSteps(this, readRequest); 1062 } 1063 1064 [kRelease]() {} 1065 1066 [kInspect](depth, options) { 1067 return customInspect(depth, options, this[kType], { }); 1068 } 1069} 1070 1071ObjectDefineProperties(ReadableStreamDefaultController.prototype, { 1072 desiredSize: kEnumerableProperty, 1073 close: kEnumerableProperty, 1074 enqueue: kEnumerableProperty, 1075 error: kEnumerableProperty, 1076 [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamDefaultController.name), 1077}); 1078 1079class ReadableByteStreamController { 1080 [kType] = 'ReadableByteStreamController'; 1081 [kState] = {}; 1082 1083 constructor(skipThrowSymbol = undefined) { 1084 if (skipThrowSymbol !== kSkipThrow) { 1085 throw new ERR_ILLEGAL_CONSTRUCTOR(); 1086 } 1087 } 1088 1089 /** 1090 * @readonly 1091 * @type {ReadableStreamBYOBRequest} 1092 */ 1093 get byobRequest() { 1094 if (!isReadableByteStreamController(this)) 1095 throw new ERR_INVALID_THIS('ReadableByteStreamController'); 1096 if (this[kState].byobRequest === null && 1097 this[kState].pendingPullIntos.length) { 1098 const { 1099 buffer, 1100 byteOffset, 1101 bytesFilled, 1102 byteLength, 1103 } = this[kState].pendingPullIntos[0]; 1104 const view = 1105 new Uint8Array( 1106 buffer, 1107 byteOffset + bytesFilled, 1108 byteLength - bytesFilled); 1109 this[kState].byobRequest = createReadableStreamBYOBRequest(this, view); 1110 } 1111 return this[kState].byobRequest; 1112 } 1113 1114 /** 1115 * @readonly 1116 * @type {number} 1117 */ 1118 get desiredSize() { 1119 if (!isReadableByteStreamController(this)) 1120 throw new ERR_INVALID_THIS('ReadableByteStreamController'); 1121 return readableByteStreamControllerGetDesiredSize(this); 1122 } 1123 1124 close() { 1125 if (!isReadableByteStreamController(this)) 1126 throw new ERR_INVALID_THIS('ReadableByteStreamController'); 1127 if (this[kState].closeRequested) 1128 throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); 1129 if (this[kState].stream[kState].state !== 'readable') 1130 throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed'); 1131 readableByteStreamControllerClose(this); 1132 } 1133 1134 /** 1135 * @param {ArrayBufferView} chunk 1136 */ 1137 enqueue(chunk) { 1138 if (!isReadableByteStreamController(this)) 1139 throw new ERR_INVALID_THIS('ReadableByteStreamController'); 1140 validateBuffer(chunk); 1141 const chunkByteLength = ArrayBufferViewGetByteLength(chunk); 1142 const chunkBuffer = ArrayBufferViewGetBuffer(chunk); 1143 const chunkBufferByteLength = ArrayBufferPrototypeGetByteLength(chunkBuffer); 1144 if (chunkByteLength === 0 || chunkBufferByteLength === 0) { 1145 throw new ERR_INVALID_STATE.TypeError( 1146 'chunk ArrayBuffer is zero-length or detached'); 1147 } 1148 if (this[kState].closeRequested) 1149 throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); 1150 if (this[kState].stream[kState].state !== 'readable') 1151 throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed'); 1152 readableByteStreamControllerEnqueue(this, chunk); 1153 } 1154 1155 /** 1156 * @param {any} [error] 1157 */ 1158 error(error = undefined) { 1159 if (!isReadableByteStreamController(this)) 1160 throw new ERR_INVALID_THIS('ReadableByteStreamController'); 1161 readableByteStreamControllerError(this, error); 1162 } 1163 1164 [kCancel](reason) { 1165 return readableByteStreamControllerCancelSteps(this, reason); 1166 } 1167 1168 [kPull](readRequest) { 1169 readableByteStreamControllerPullSteps(this, readRequest); 1170 } 1171 1172 [kRelease]() { 1173 const { 1174 pendingPullIntos, 1175 } = this[kState]; 1176 if (pendingPullIntos.length > 0) { 1177 const firstPendingPullInto = pendingPullIntos[0]; 1178 firstPendingPullInto.type = 'none'; 1179 this[kState].pendingPullIntos = [firstPendingPullInto]; 1180 } 1181 } 1182 1183 [kInspect](depth, options) { 1184 return customInspect(depth, options, this[kType], { }); 1185 } 1186} 1187 1188ObjectDefineProperties(ReadableByteStreamController.prototype, { 1189 byobRequest: kEnumerableProperty, 1190 desiredSize: kEnumerableProperty, 1191 close: kEnumerableProperty, 1192 enqueue: kEnumerableProperty, 1193 error: kEnumerableProperty, 1194 [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name), 1195}); 1196 1197function createTeeReadableStream(start, pull, cancel) { 1198 return ReflectConstruct( 1199 function() { 1200 this[kType] = 'ReadableStream'; 1201 this[kState] = { 1202 disturbed: false, 1203 state: 'readable', 1204 storedError: undefined, 1205 stream: undefined, 1206 transfer: { 1207 writable: undefined, 1208 port: undefined, 1209 promise: undefined, 1210 }, 1211 }; 1212 this[kIsClosedPromise] = createDeferredPromise(); 1213 setupReadableStreamDefaultControllerFromSource( 1214 this, 1215 ObjectCreate(null, { 1216 start: { __proto__: null, value: start }, 1217 pull: { __proto__: null, value: pull }, 1218 cancel: { __proto__: null, value: cancel }, 1219 }), 1220 1, 1221 () => 1); 1222 return makeTransferable(this); 1223 }, [], ReadableStream, 1224 ); 1225} 1226 1227const isReadableStream = 1228 isBrandCheck('ReadableStream'); 1229const isReadableByteStreamController = 1230 isBrandCheck('ReadableByteStreamController'); 1231const isReadableStreamBYOBRequest = 1232 isBrandCheck('ReadableStreamBYOBRequest'); 1233const isReadableStreamDefaultReader = 1234 isBrandCheck('ReadableStreamDefaultReader'); 1235const isReadableStreamBYOBReader = 1236 isBrandCheck('ReadableStreamBYOBReader'); 1237 1238// ---- ReadableStream Implementation 1239 1240function readableStreamPipeTo( 1241 source, 1242 dest, 1243 preventClose, 1244 preventAbort, 1245 preventCancel, 1246 signal) { 1247 1248 let reader; 1249 let writer; 1250 let disposable; 1251 // Both of these can throw synchronously. We want to capture 1252 // the error and return a rejected promise instead. 1253 try { 1254 reader = new ReadableStreamDefaultReader(source); 1255 writer = new WritableStreamDefaultWriter(dest); 1256 } catch (error) { 1257 return PromiseReject(error); 1258 } 1259 1260 source[kState].disturbed = true; 1261 1262 let shuttingDown = false; 1263 1264 if (signal !== undefined) { 1265 try { 1266 validateAbortSignal(signal, 'options.signal'); 1267 } catch (error) { 1268 return PromiseReject(error); 1269 } 1270 } 1271 1272 const promise = createDeferredPromise(); 1273 1274 let currentWrite = PromiseResolve(); 1275 1276 // The error here can be undefined. The rejected arg 1277 // tells us that the promise must be rejected even 1278 // when error is undefine. 1279 function finalize(rejected, error) { 1280 writableStreamDefaultWriterRelease(writer); 1281 readableStreamReaderGenericRelease(reader); 1282 if (signal !== undefined) 1283 disposable?.[SymbolDispose](); 1284 if (rejected) 1285 promise.reject(error); 1286 else 1287 promise.resolve(); 1288 } 1289 1290 async function waitForCurrentWrite() { 1291 const write = currentWrite; 1292 await write; 1293 if (write !== currentWrite) 1294 await waitForCurrentWrite(); 1295 } 1296 1297 function shutdownWithAnAction(action, rejected, originalError) { 1298 if (shuttingDown) return; 1299 shuttingDown = true; 1300 if (dest[kState].state === 'writable' && 1301 !writableStreamCloseQueuedOrInFlight(dest)) { 1302 PromisePrototypeThen( 1303 waitForCurrentWrite(), 1304 complete, 1305 (error) => finalize(true, error)); 1306 return; 1307 } 1308 complete(); 1309 1310 function complete() { 1311 PromisePrototypeThen( 1312 action(), 1313 () => finalize(rejected, originalError), 1314 (error) => finalize(true, error)); 1315 } 1316 } 1317 1318 function shutdown(rejected, error) { 1319 if (shuttingDown) return; 1320 shuttingDown = true; 1321 if (dest[kState].state === 'writable' && 1322 !writableStreamCloseQueuedOrInFlight(dest)) { 1323 PromisePrototypeThen( 1324 waitForCurrentWrite(), 1325 () => finalize(rejected, error), 1326 (error) => finalize(true, error)); 1327 return; 1328 } 1329 finalize(rejected, error); 1330 } 1331 1332 function abortAlgorithm() { 1333 let error; 1334 if (signal.reason instanceof AbortError) { 1335 // Cannot use the AbortError class here. It must be a DOMException. 1336 error = new DOMException(signal.reason.message, 'AbortError'); 1337 } else { 1338 error = signal.reason; 1339 } 1340 1341 const actions = []; 1342 if (!preventAbort) { 1343 ArrayPrototypePush( 1344 actions, 1345 () => { 1346 if (dest[kState].state === 'writable') 1347 return writableStreamAbort(dest, error); 1348 return PromiseResolve(); 1349 }); 1350 } 1351 if (!preventCancel) { 1352 ArrayPrototypePush( 1353 actions, 1354 () => { 1355 if (source[kState].state === 'readable') 1356 return readableStreamCancel(source, error); 1357 return PromiseResolve(); 1358 }); 1359 } 1360 1361 shutdownWithAnAction( 1362 () => SafePromiseAll(actions, (action) => action()), 1363 true, 1364 error); 1365 } 1366 1367 function watchErrored(stream, promise, action) { 1368 if (stream[kState].state === 'errored') 1369 action(stream[kState].storedError); 1370 else 1371 PromisePrototypeThen(promise, undefined, action); 1372 } 1373 1374 function watchClosed(stream, promise, action) { 1375 if (stream[kState].state === 'closed') 1376 action(); 1377 else 1378 PromisePrototypeThen(promise, action, () => {}); 1379 } 1380 1381 async function step() { 1382 if (shuttingDown) 1383 return true; 1384 await writer[kState].ready.promise; 1385 return new Promise((resolve, reject) => { 1386 readableStreamDefaultReaderRead( 1387 reader, 1388 { 1389 [kChunk](chunk) { 1390 currentWrite = writableStreamDefaultWriterWrite(writer, chunk); 1391 setPromiseHandled(currentWrite); 1392 resolve(false); 1393 }, 1394 [kClose]: () => resolve(true), 1395 [kError]: reject, 1396 }); 1397 }); 1398 } 1399 1400 async function run() { 1401 // Run until step resolves as true 1402 while (!await step()); 1403 } 1404 1405 if (signal !== undefined) { 1406 if (signal.aborted) { 1407 abortAlgorithm(); 1408 return promise.promise; 1409 } 1410 addAbortListener ??= require('events').addAbortListener; 1411 disposable = addAbortListener(signal, abortAlgorithm); 1412 } 1413 1414 setPromiseHandled(run()); 1415 1416 watchErrored(source, reader[kState].close.promise, (error) => { 1417 if (!preventAbort) { 1418 return shutdownWithAnAction( 1419 () => writableStreamAbort(dest, error), 1420 true, 1421 error); 1422 } 1423 shutdown(true, error); 1424 }); 1425 1426 watchErrored(dest, writer[kState].close.promise, (error) => { 1427 if (!preventCancel) { 1428 return shutdownWithAnAction( 1429 () => readableStreamCancel(source, error), 1430 true, 1431 error); 1432 } 1433 shutdown(true, error); 1434 }); 1435 1436 watchClosed(source, reader[kState].close.promise, () => { 1437 if (!preventClose) { 1438 return shutdownWithAnAction( 1439 () => writableStreamDefaultWriterCloseWithErrorPropagation(writer)); 1440 } 1441 shutdown(); 1442 }); 1443 1444 if (writableStreamCloseQueuedOrInFlight(dest) || 1445 dest[kState].state === 'closed') { 1446 const error = new ERR_INVALID_STATE.TypeError( 1447 'Destination WritableStream is closed'); 1448 if (!preventCancel) { 1449 shutdownWithAnAction( 1450 () => readableStreamCancel(source, error), true, error); 1451 } else { 1452 shutdown(true, error); 1453 } 1454 } 1455 1456 return promise.promise; 1457} 1458 1459function readableStreamTee(stream, cloneForBranch2) { 1460 if (isReadableByteStreamController(stream[kState].controller)) { 1461 return readableByteStreamTee(stream); 1462 } 1463 return readableStreamDefaultTee(stream, cloneForBranch2); 1464} 1465 1466function readableStreamDefaultTee(stream, cloneForBranch2) { 1467 const reader = new ReadableStreamDefaultReader(stream); 1468 let reading = false; 1469 let canceled1 = false; 1470 let canceled2 = false; 1471 let reason1; 1472 let reason2; 1473 let branch1; 1474 let branch2; 1475 const cancelPromise = createDeferredPromise(); 1476 1477 async function pullAlgorithm() { 1478 if (reading) return; 1479 reading = true; 1480 const readRequest = { 1481 [kChunk](value) { 1482 queueMicrotask(() => { 1483 reading = false; 1484 const value1 = value; 1485 let value2 = value; 1486 if (!canceled2 && cloneForBranch2) { 1487 value2 = structuredClone(value2); 1488 } 1489 if (!canceled1) { 1490 readableStreamDefaultControllerEnqueue( 1491 branch1[kState].controller, 1492 value1); 1493 } 1494 if (!canceled2) { 1495 readableStreamDefaultControllerEnqueue( 1496 branch2[kState].controller, 1497 value2); 1498 } 1499 }); 1500 }, 1501 [kClose]() { 1502 // The `process.nextTick()` is not part of the spec. 1503 // This approach was needed to avoid a race condition working with esm 1504 // Further information, see: https://github.com/nodejs/node/issues/39758 1505 process.nextTick(() => { 1506 reading = false; 1507 if (!canceled1) 1508 readableStreamDefaultControllerClose(branch1[kState].controller); 1509 if (!canceled2) 1510 readableStreamDefaultControllerClose(branch2[kState].controller); 1511 if (!canceled1 || !canceled2) 1512 cancelPromise.resolve(); 1513 }); 1514 }, 1515 [kError]() { 1516 reading = false; 1517 }, 1518 }; 1519 readableStreamDefaultReaderRead(reader, readRequest); 1520 } 1521 1522 function cancel1Algorithm(reason) { 1523 canceled1 = true; 1524 reason1 = reason; 1525 if (canceled2) { 1526 const compositeReason = [reason1, reason2]; 1527 cancelPromise.resolve(readableStreamCancel(stream, compositeReason)); 1528 } 1529 return cancelPromise.promise; 1530 } 1531 1532 function cancel2Algorithm(reason) { 1533 canceled2 = true; 1534 reason2 = reason; 1535 if (canceled1) { 1536 const compositeReason = [reason1, reason2]; 1537 cancelPromise.resolve(readableStreamCancel(stream, compositeReason)); 1538 } 1539 return cancelPromise.promise; 1540 } 1541 1542 branch1 = 1543 createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm); 1544 branch2 = 1545 createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm); 1546 1547 PromisePrototypeThen( 1548 reader[kState].close.promise, 1549 undefined, 1550 (error) => { 1551 readableStreamDefaultControllerError(branch1[kState].controller, error); 1552 readableStreamDefaultControllerError(branch2[kState].controller, error); 1553 if (!canceled1 || !canceled2) 1554 cancelPromise.resolve(); 1555 }); 1556 1557 return [branch1, branch2]; 1558} 1559 1560function readableByteStreamTee(stream) { 1561 assert(isReadableStream(stream)); 1562 assert(isReadableByteStreamController(stream[kState].controller)); 1563 1564 let reader = new ReadableStreamDefaultReader(stream); 1565 let reading = false; 1566 let readAgainForBranch1 = false; 1567 let readAgainForBranch2 = false; 1568 let canceled1 = false; 1569 let canceled2 = false; 1570 let reason1; 1571 let reason2; 1572 let branch1; 1573 let branch2; 1574 const cancelDeferred = createDeferredPromise(); 1575 1576 function forwardReaderError(thisReader) { 1577 PromisePrototypeThen( 1578 thisReader[kState].close.promise, 1579 undefined, 1580 (error) => { 1581 if (thisReader !== reader) { 1582 return; 1583 } 1584 readableStreamDefaultControllerError(branch1[kState].controller, error); 1585 readableStreamDefaultControllerError(branch2[kState].controller, error); 1586 if (!canceled1 || !canceled2) { 1587 cancelDeferred.resolve(); 1588 } 1589 }, 1590 ); 1591 } 1592 1593 function pullWithDefaultReader() { 1594 if (isReadableStreamBYOBReader(reader)) { 1595 readableStreamBYOBReaderRelease(reader); 1596 reader = new ReadableStreamDefaultReader(stream); 1597 forwardReaderError(reader); 1598 } 1599 1600 const readRequest = { 1601 [kChunk](chunk) { 1602 queueMicrotask(() => { 1603 readAgainForBranch1 = false; 1604 readAgainForBranch2 = false; 1605 const chunk1 = chunk; 1606 let chunk2 = chunk; 1607 1608 if (!canceled1 && !canceled2) { 1609 try { 1610 chunk2 = cloneAsUint8Array(chunk); 1611 } catch (error) { 1612 readableByteStreamControllerError( 1613 branch1[kState].controller, 1614 error, 1615 ); 1616 readableByteStreamControllerError( 1617 branch2[kState].controller, 1618 error, 1619 ); 1620 cancelDeferred.resolve(readableStreamCancel(stream, error)); 1621 return; 1622 } 1623 } 1624 if (!canceled1) { 1625 readableByteStreamControllerEnqueue( 1626 branch1[kState].controller, 1627 chunk1, 1628 ); 1629 } 1630 if (!canceled2) { 1631 readableByteStreamControllerEnqueue( 1632 branch2[kState].controller, 1633 chunk2, 1634 ); 1635 } 1636 reading = false; 1637 1638 if (readAgainForBranch1) { 1639 pull1Algorithm(); 1640 } else if (readAgainForBranch2) { 1641 pull2Algorithm(); 1642 } 1643 }); 1644 }, 1645 [kClose]() { 1646 reading = false; 1647 1648 if (!canceled1) { 1649 readableByteStreamControllerClose(branch1[kState].controller); 1650 } 1651 if (!canceled2) { 1652 readableByteStreamControllerClose(branch2[kState].controller); 1653 } 1654 if (branch1[kState].controller[kState].pendingPullIntos.length > 0) { 1655 readableByteStreamControllerRespond(branch1[kState].controller, 0); 1656 } 1657 if (branch2[kState].controller[kState].pendingPullIntos.length > 0) { 1658 readableByteStreamControllerRespond(branch2[kState].controller, 0); 1659 } 1660 if (!canceled1 || !canceled2) { 1661 cancelDeferred.resolve(); 1662 } 1663 }, 1664 [kError]() { 1665 reading = false; 1666 }, 1667 }; 1668 1669 readableStreamDefaultReaderRead(reader, readRequest); 1670 } 1671 1672 function pullWithBYOBReader(view, forBranch2) { 1673 if (isReadableStreamDefaultReader(reader)) { 1674 readableStreamDefaultReaderRelease(reader); 1675 reader = new ReadableStreamBYOBReader(stream); 1676 forwardReaderError(reader); 1677 } 1678 1679 const byobBranch = forBranch2 === true ? branch2 : branch1; 1680 const otherBranch = forBranch2 === false ? branch2 : branch1; 1681 const readIntoRequest = { 1682 [kChunk](chunk) { 1683 queueMicrotask(() => { 1684 readAgainForBranch1 = false; 1685 readAgainForBranch2 = false; 1686 const byobCanceled = forBranch2 === true ? canceled2 : canceled1; 1687 const otherCanceled = forBranch2 === false ? canceled2 : canceled1; 1688 1689 if (!otherCanceled) { 1690 let clonedChunk; 1691 1692 try { 1693 clonedChunk = cloneAsUint8Array(chunk); 1694 } catch (error) { 1695 readableByteStreamControllerError( 1696 byobBranch[kState].controller, 1697 error, 1698 ); 1699 readableByteStreamControllerError( 1700 otherBranch[kState].controller, 1701 error, 1702 ); 1703 cancelDeferred.resolve(readableStreamCancel(stream, error)); 1704 return; 1705 } 1706 if (!byobCanceled) { 1707 readableByteStreamControllerRespondWithNewView( 1708 byobBranch[kState].controller, 1709 chunk, 1710 ); 1711 } 1712 1713 readableByteStreamControllerEnqueue( 1714 otherBranch[kState].controller, 1715 clonedChunk, 1716 ); 1717 } else if (!byobCanceled) { 1718 readableByteStreamControllerRespondWithNewView( 1719 byobBranch[kState].controller, 1720 chunk, 1721 ); 1722 } 1723 reading = false; 1724 1725 if (readAgainForBranch1) { 1726 pull1Algorithm(); 1727 } else if (readAgainForBranch2) { 1728 pull2Algorithm(); 1729 } 1730 }); 1731 }, 1732 [kClose](chunk) { 1733 reading = false; 1734 1735 const byobCanceled = forBranch2 === true ? canceled2 : canceled1; 1736 const otherCanceled = forBranch2 === false ? canceled2 : canceled1; 1737 1738 if (!byobCanceled) { 1739 readableByteStreamControllerClose(byobBranch[kState].controller); 1740 } 1741 if (!otherCanceled) { 1742 readableByteStreamControllerClose(otherBranch[kState].controller); 1743 } 1744 if (chunk !== undefined) { 1745 if (!byobCanceled) { 1746 readableByteStreamControllerRespondWithNewView( 1747 byobBranch[kState].controller, 1748 chunk, 1749 ); 1750 } 1751 if ( 1752 !otherCanceled && 1753 otherBranch[kState].controller[kState].pendingPullIntos.length > 0 1754 ) { 1755 readableByteStreamControllerRespond( 1756 otherBranch[kState].controller, 1757 0, 1758 ); 1759 } 1760 } 1761 if (!byobCanceled || !otherCanceled) { 1762 cancelDeferred.resolve(); 1763 } 1764 }, 1765 [kError]() { 1766 reading = false; 1767 }, 1768 }; 1769 readableStreamBYOBReaderRead(reader, view, readIntoRequest); 1770 } 1771 1772 function pull1Algorithm() { 1773 if (reading) { 1774 readAgainForBranch1 = true; 1775 return PromiseResolve(); 1776 } 1777 reading = true; 1778 1779 const byobRequest = branch1[kState].controller.byobRequest; 1780 if (byobRequest === null) { 1781 pullWithDefaultReader(); 1782 } else { 1783 pullWithBYOBReader(byobRequest[kState].view, false); 1784 } 1785 return PromiseResolve(); 1786 } 1787 1788 function pull2Algorithm() { 1789 if (reading) { 1790 readAgainForBranch2 = true; 1791 return PromiseResolve(); 1792 } 1793 reading = true; 1794 1795 const byobRequest = branch2[kState].controller.byobRequest; 1796 if (byobRequest === null) { 1797 pullWithDefaultReader(); 1798 } else { 1799 pullWithBYOBReader(byobRequest[kState].view, true); 1800 } 1801 return PromiseResolve(); 1802 } 1803 1804 function cancel1Algorithm(reason) { 1805 canceled1 = true; 1806 reason1 = reason; 1807 if (canceled2) { 1808 cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2])); 1809 } 1810 return cancelDeferred.promise; 1811 } 1812 1813 function cancel2Algorithm(reason) { 1814 canceled2 = true; 1815 reason2 = reason; 1816 if (canceled1) { 1817 cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2])); 1818 } 1819 return cancelDeferred.promise; 1820 } 1821 1822 branch1 = new ReadableStream({ 1823 type: 'bytes', 1824 pull: pull1Algorithm, 1825 cancel: cancel1Algorithm, 1826 }); 1827 branch2 = new ReadableStream({ 1828 type: 'bytes', 1829 pull: pull2Algorithm, 1830 cancel: cancel2Algorithm, 1831 }); 1832 1833 forwardReaderError(reader); 1834 1835 return [branch1, branch2]; 1836} 1837 1838function readableByteStreamControllerConvertPullIntoDescriptor(desc) { 1839 const { 1840 buffer, 1841 bytesFilled, 1842 byteLength, 1843 byteOffset, 1844 ctor, 1845 elementSize, 1846 } = desc; 1847 if (bytesFilled > byteLength) 1848 throw new ERR_INVALID_STATE.RangeError('The buffer size is invalid'); 1849 assert(!(bytesFilled % elementSize)); 1850 const transferredBuffer = transferArrayBuffer(buffer); 1851 1852 if (ctor === Buffer) { 1853 return Buffer.from(transferredBuffer, byteOffset, bytesFilled / elementSize); 1854 } 1855 1856 return new ctor(transferredBuffer, byteOffset, bytesFilled / elementSize); 1857} 1858 1859function isReadableStreamLocked(stream) { 1860 return stream[kState].reader !== undefined; 1861} 1862 1863function readableStreamCancel(stream, reason) { 1864 stream[kState].disturbed = true; 1865 switch (stream[kState].state) { 1866 case 'closed': 1867 return PromiseResolve(); 1868 case 'errored': 1869 return PromiseReject(stream[kState].storedError); 1870 } 1871 readableStreamClose(stream); 1872 const { 1873 reader, 1874 } = stream[kState]; 1875 if (reader !== undefined && readableStreamHasBYOBReader(stream)) { 1876 for (let n = 0; n < reader[kState].readIntoRequests.length; n++) 1877 reader[kState].readIntoRequests[n][kClose](); 1878 reader[kState].readIntoRequests = []; 1879 } 1880 1881 return PromisePrototypeThen( 1882 ensureIsPromise( 1883 stream[kState].controller[kCancel], 1884 stream[kState].controller, 1885 reason), 1886 () => {}); 1887} 1888 1889function readableStreamClose(stream) { 1890 assert(stream[kState].state === 'readable'); 1891 stream[kState].state = 'closed'; 1892 stream[kIsClosedPromise].resolve(); 1893 const { 1894 reader, 1895 } = stream[kState]; 1896 1897 if (reader === undefined) 1898 return; 1899 1900 reader[kState].close.resolve(); 1901 1902 if (readableStreamHasDefaultReader(stream)) { 1903 for (let n = 0; n < reader[kState].readRequests.length; n++) 1904 reader[kState].readRequests[n][kClose](); 1905 reader[kState].readRequests = []; 1906 } 1907} 1908 1909function readableStreamError(stream, error) { 1910 assert(stream[kState].state === 'readable'); 1911 stream[kState].state = 'errored'; 1912 stream[kState].storedError = error; 1913 stream[kIsClosedPromise].reject(error); 1914 setPromiseHandled(stream[kIsClosedPromise].promise); 1915 1916 const { 1917 reader, 1918 } = stream[kState]; 1919 1920 if (reader === undefined) 1921 return; 1922 1923 reader[kState].close.reject(error); 1924 setPromiseHandled(reader[kState].close.promise); 1925 1926 if (readableStreamHasDefaultReader(stream)) { 1927 for (let n = 0; n < reader[kState].readRequests.length; n++) 1928 reader[kState].readRequests[n][kError](error); 1929 reader[kState].readRequests = []; 1930 } else { 1931 assert(readableStreamHasBYOBReader(stream)); 1932 for (let n = 0; n < reader[kState].readIntoRequests.length; n++) 1933 reader[kState].readIntoRequests[n][kError](error); 1934 reader[kState].readIntoRequests = []; 1935 } 1936} 1937 1938function readableStreamHasDefaultReader(stream) { 1939 const { 1940 reader, 1941 } = stream[kState]; 1942 1943 if (reader === undefined) 1944 return false; 1945 1946 return reader[kState] !== undefined && 1947 reader[kType] === 'ReadableStreamDefaultReader'; 1948} 1949 1950function readableStreamGetNumReadRequests(stream) { 1951 assert(readableStreamHasDefaultReader(stream)); 1952 return stream[kState].reader[kState].readRequests.length; 1953} 1954 1955function readableStreamHasBYOBReader(stream) { 1956 const { 1957 reader, 1958 } = stream[kState]; 1959 1960 if (reader === undefined) 1961 return false; 1962 1963 return reader[kState] !== undefined && 1964 reader[kType] === 'ReadableStreamBYOBReader'; 1965} 1966 1967function readableStreamGetNumReadIntoRequests(stream) { 1968 assert(readableStreamHasBYOBReader(stream)); 1969 return stream[kState].reader[kState].readIntoRequests.length; 1970} 1971 1972function readableStreamFulfillReadRequest(stream, chunk, done) { 1973 assert(readableStreamHasDefaultReader(stream)); 1974 const { 1975 reader, 1976 } = stream[kState]; 1977 assert(reader[kState].readRequests.length); 1978 const readRequest = ArrayPrototypeShift(reader[kState].readRequests); 1979 1980 // TODO(@jasnell): It's not clear under what exact conditions done 1981 // will be true here. The spec requires this check but none of the 1982 // WPT's or other tests trigger it. Will need to investigate how to 1983 // get coverage for this. 1984 if (done) 1985 readRequest[kClose](); 1986 else 1987 readRequest[kChunk](chunk); 1988} 1989 1990function readableStreamFulfillReadIntoRequest(stream, chunk, done) { 1991 assert(readableStreamHasBYOBReader(stream)); 1992 const { 1993 reader, 1994 } = stream[kState]; 1995 assert(reader[kState].readIntoRequests.length); 1996 const readIntoRequest = ArrayPrototypeShift(reader[kState].readIntoRequests); 1997 if (done) 1998 readIntoRequest[kClose](chunk); 1999 else 2000 readIntoRequest[kChunk](chunk); 2001} 2002 2003function readableStreamAddReadRequest(stream, readRequest) { 2004 assert(readableStreamHasDefaultReader(stream)); 2005 assert(stream[kState].state === 'readable'); 2006 ArrayPrototypePush(stream[kState].reader[kState].readRequests, readRequest); 2007} 2008 2009function readableStreamAddReadIntoRequest(stream, readIntoRequest) { 2010 assert(readableStreamHasBYOBReader(stream)); 2011 assert(stream[kState].state !== 'errored'); 2012 ArrayPrototypePush( 2013 stream[kState].reader[kState].readIntoRequests, 2014 readIntoRequest); 2015} 2016 2017function readableStreamReaderGenericCancel(reader, reason) { 2018 const { 2019 stream, 2020 } = reader[kState]; 2021 assert(stream !== undefined); 2022 return readableStreamCancel(stream, reason); 2023} 2024 2025function readableStreamReaderGenericInitialize(reader, stream) { 2026 reader[kState].stream = stream; 2027 stream[kState].reader = reader; 2028 switch (stream[kState].state) { 2029 case 'readable': 2030 reader[kState].close = createDeferredPromise(); 2031 break; 2032 case 'closed': 2033 reader[kState].close = { 2034 promise: PromiseResolve(), 2035 resolve: undefined, 2036 reject: undefined, 2037 }; 2038 break; 2039 case 'errored': 2040 reader[kState].close = { 2041 promise: PromiseReject(stream[kState].storedError), 2042 resolve: undefined, 2043 reject: undefined, 2044 }; 2045 setPromiseHandled(reader[kState].close.promise); 2046 break; 2047 } 2048} 2049 2050function readableStreamDefaultReaderRelease(reader) { 2051 readableStreamReaderGenericRelease(reader); 2052 readableStreamDefaultReaderErrorReadRequests( 2053 reader, 2054 lazyReadableReleasingError(), 2055 ); 2056} 2057 2058function readableStreamDefaultReaderErrorReadRequests(reader, e) { 2059 for (let n = 0; n < reader[kState].readRequests.length; ++n) { 2060 reader[kState].readRequests[n][kError](e); 2061 } 2062 reader[kState].readRequests = []; 2063} 2064 2065function readableStreamBYOBReaderRelease(reader) { 2066 readableStreamReaderGenericRelease(reader); 2067 readableStreamBYOBReaderErrorReadIntoRequests( 2068 reader, 2069 lazyReadableReleasingError(), 2070 ); 2071} 2072 2073function readableStreamBYOBReaderErrorReadIntoRequests(reader, e) { 2074 for (let n = 0; n < reader[kState].readIntoRequests.length; ++n) { 2075 reader[kState].readIntoRequests[n][kError](e); 2076 } 2077 reader[kState].readIntoRequests = []; 2078} 2079 2080function readableStreamReaderGenericRelease(reader) { 2081 const { 2082 stream, 2083 } = reader[kState]; 2084 assert(stream !== undefined); 2085 assert(stream[kState].reader === reader); 2086 2087 const releasedStateError = lazyReadableReleasedError(); 2088 if (stream[kState].state === 'readable') { 2089 reader[kState].close.reject?.(releasedStateError); 2090 } else { 2091 reader[kState].close = { 2092 promise: PromiseReject(releasedStateError), 2093 resolve: undefined, 2094 reject: undefined, 2095 }; 2096 } 2097 setPromiseHandled(reader[kState].close.promise); 2098 2099 stream[kState].controller[kRelease](); 2100 2101 stream[kState].reader = undefined; 2102 reader[kState].stream = undefined; 2103} 2104 2105function readableStreamBYOBReaderRead(reader, view, readIntoRequest) { 2106 const { 2107 stream, 2108 } = reader[kState]; 2109 assert(stream !== undefined); 2110 stream[kState].disturbed = true; 2111 if (stream[kState].state === 'errored') { 2112 readIntoRequest[kError](stream[kState].storedError); 2113 return; 2114 } 2115 readableByteStreamControllerPullInto( 2116 stream[kState].controller, 2117 view, 2118 readIntoRequest); 2119} 2120 2121function readableStreamDefaultReaderRead(reader, readRequest) { 2122 const { 2123 stream, 2124 } = reader[kState]; 2125 assert(stream !== undefined); 2126 stream[kState].disturbed = true; 2127 switch (stream[kState].state) { 2128 case 'closed': 2129 readRequest[kClose](); 2130 break; 2131 case 'errored': 2132 readRequest[kError](stream[kState].storedError); 2133 break; 2134 case 'readable': 2135 stream[kState].controller[kPull](readRequest); 2136 } 2137} 2138 2139function setupReadableStreamBYOBReader(reader, stream) { 2140 if (isReadableStreamLocked(stream)) 2141 throw new ERR_INVALID_STATE.TypeError('ReadableStream is locked'); 2142 const { 2143 controller, 2144 } = stream[kState]; 2145 if (!isReadableByteStreamController(controller)) 2146 throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be a byte stream'); 2147 readableStreamReaderGenericInitialize(reader, stream); 2148 reader[kState].readIntoRequests = []; 2149} 2150 2151function setupReadableStreamDefaultReader(reader, stream) { 2152 if (isReadableStreamLocked(stream)) 2153 throw new ERR_INVALID_STATE.TypeError('ReadableStream is locked'); 2154 readableStreamReaderGenericInitialize(reader, stream); 2155 reader[kState].readRequests = []; 2156} 2157 2158function readableStreamDefaultControllerClose(controller) { 2159 if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) 2160 return; 2161 controller[kState].closeRequested = true; 2162 if (!controller[kState].queue.length) { 2163 readableStreamDefaultControllerClearAlgorithms(controller); 2164 readableStreamClose(controller[kState].stream); 2165 } 2166} 2167 2168function readableStreamDefaultControllerEnqueue(controller, chunk) { 2169 if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) 2170 return; 2171 2172 const { 2173 stream, 2174 } = controller[kState]; 2175 2176 if (isReadableStreamLocked(stream) && 2177 readableStreamGetNumReadRequests(stream)) { 2178 readableStreamFulfillReadRequest(stream, chunk, false); 2179 } else { 2180 try { 2181 const chunkSize = 2182 FunctionPrototypeCall( 2183 controller[kState].sizeAlgorithm, 2184 undefined, 2185 chunk); 2186 enqueueValueWithSize(controller, chunk, chunkSize); 2187 } catch (error) { 2188 readableStreamDefaultControllerError(controller, error); 2189 throw error; 2190 } 2191 } 2192 readableStreamDefaultControllerCallPullIfNeeded(controller); 2193} 2194 2195function readableStreamDefaultControllerHasBackpressure(controller) { 2196 return !readableStreamDefaultControllerShouldCallPull(controller); 2197} 2198 2199function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { 2200 const { 2201 stream, 2202 } = controller[kState]; 2203 return !controller[kState].closeRequested && 2204 stream[kState].state === 'readable'; 2205} 2206 2207function readableStreamDefaultControllerGetDesiredSize(controller) { 2208 const { 2209 stream, 2210 highWaterMark, 2211 queueTotalSize, 2212 } = controller[kState]; 2213 switch (stream[kState].state) { 2214 case 'errored': return null; 2215 case 'closed': return 0; 2216 default: 2217 return highWaterMark - queueTotalSize; 2218 } 2219} 2220 2221function readableStreamDefaultControllerShouldCallPull(controller) { 2222 const { 2223 stream, 2224 } = controller[kState]; 2225 if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller) || 2226 !controller[kState].started) 2227 return false; 2228 2229 if (isReadableStreamLocked(stream) && 2230 readableStreamGetNumReadRequests(stream)) { 2231 return true; 2232 } 2233 2234 const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); 2235 assert(desiredSize !== null); 2236 2237 return desiredSize > 0; 2238} 2239 2240function readableStreamDefaultControllerCallPullIfNeeded(controller) { 2241 if (!readableStreamDefaultControllerShouldCallPull(controller)) 2242 return; 2243 if (controller[kState].pulling) { 2244 controller[kState].pullAgain = true; 2245 return; 2246 } 2247 assert(!controller[kState].pullAgain); 2248 controller[kState].pulling = true; 2249 PromisePrototypeThen( 2250 ensureIsPromise(controller[kState].pullAlgorithm, controller), 2251 () => { 2252 controller[kState].pulling = false; 2253 if (controller[kState].pullAgain) { 2254 controller[kState].pullAgain = false; 2255 readableStreamDefaultControllerCallPullIfNeeded(controller); 2256 } 2257 }, 2258 (error) => readableStreamDefaultControllerError(controller, error)); 2259} 2260 2261function readableStreamDefaultControllerClearAlgorithms(controller) { 2262 controller[kState].pullAlgorithm = undefined; 2263 controller[kState].cancelAlgorithm = undefined; 2264 controller[kState].sizeAlgorithm = undefined; 2265} 2266 2267function readableStreamDefaultControllerError(controller, error) { 2268 const { 2269 stream, 2270 } = controller[kState]; 2271 if (stream[kState].state === 'readable') { 2272 resetQueue(controller); 2273 readableStreamDefaultControllerClearAlgorithms(controller); 2274 readableStreamError(stream, error); 2275 } 2276} 2277 2278function readableStreamDefaultControllerCancelSteps(controller, reason) { 2279 resetQueue(controller); 2280 try { 2281 const result = controller[kState].cancelAlgorithm(reason); 2282 return result; 2283 } finally { 2284 readableStreamDefaultControllerClearAlgorithms(controller); 2285 } 2286} 2287 2288function readableStreamDefaultControllerPullSteps(controller, readRequest) { 2289 const { 2290 stream, 2291 queue, 2292 } = controller[kState]; 2293 if (queue.length) { 2294 const chunk = dequeueValue(controller); 2295 if (controller[kState].closeRequested && !queue.length) { 2296 readableStreamDefaultControllerClearAlgorithms(controller); 2297 readableStreamClose(stream); 2298 } else { 2299 readableStreamDefaultControllerCallPullIfNeeded(controller); 2300 } 2301 readRequest[kChunk](chunk); 2302 return; 2303 } 2304 readableStreamAddReadRequest(stream, readRequest); 2305 readableStreamDefaultControllerCallPullIfNeeded(controller); 2306} 2307 2308function setupReadableStreamDefaultController( 2309 stream, 2310 controller, 2311 startAlgorithm, 2312 pullAlgorithm, 2313 cancelAlgorithm, 2314 highWaterMark, 2315 sizeAlgorithm) { 2316 assert(stream[kState].controller === undefined); 2317 controller[kState] = { 2318 cancelAlgorithm, 2319 closeRequested: false, 2320 highWaterMark, 2321 pullAgain: false, 2322 pullAlgorithm, 2323 pulling: false, 2324 queue: [], 2325 queueTotalSize: 0, 2326 started: false, 2327 sizeAlgorithm, 2328 stream, 2329 }; 2330 stream[kState].controller = controller; 2331 stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); 2332 2333 const startResult = startAlgorithm(); 2334 2335 PromisePrototypeThen( 2336 PromiseResolve(startResult), 2337 () => { 2338 controller[kState].started = true; 2339 assert(!controller[kState].pulling); 2340 assert(!controller[kState].pullAgain); 2341 readableStreamDefaultControllerCallPullIfNeeded(controller); 2342 }, 2343 (error) => readableStreamDefaultControllerError(controller, error)); 2344} 2345 2346function setupReadableStreamDefaultControllerFromSource( 2347 stream, 2348 source, 2349 highWaterMark, 2350 sizeAlgorithm) { 2351 const controller = new ReadableStreamDefaultController(kSkipThrow); 2352 const start = source?.start; 2353 const pull = source?.pull; 2354 const cancel = source?.cancel; 2355 const startAlgorithm = start ? 2356 FunctionPrototypeBind(start, source, controller) : 2357 nonOpStart; 2358 const pullAlgorithm = pull ? 2359 FunctionPrototypeBind(pull, source, controller) : 2360 nonOpPull; 2361 2362 const cancelAlgorithm = cancel ? 2363 FunctionPrototypeBind(cancel, source) : 2364 nonOpCancel; 2365 2366 setupReadableStreamDefaultController( 2367 stream, 2368 controller, 2369 startAlgorithm, 2370 pullAlgorithm, 2371 cancelAlgorithm, 2372 highWaterMark, 2373 sizeAlgorithm); 2374} 2375 2376function readableByteStreamControllerClose(controller) { 2377 const { 2378 closeRequested, 2379 pendingPullIntos, 2380 queueTotalSize, 2381 stream, 2382 } = controller[kState]; 2383 2384 if (closeRequested || stream[kState].state !== 'readable') 2385 return; 2386 2387 if (queueTotalSize) { 2388 controller[kState].closeRequested = true; 2389 return; 2390 } 2391 2392 if (pendingPullIntos.length) { 2393 const firstPendingPullInto = pendingPullIntos[0]; 2394 if (firstPendingPullInto.bytesFilled > 0) { 2395 const error = new ERR_INVALID_STATE.TypeError('Partial read'); 2396 readableByteStreamControllerError(controller, error); 2397 throw error; 2398 } 2399 } 2400 2401 readableByteStreamControllerClearAlgorithms(controller); 2402 readableStreamClose(stream); 2403} 2404 2405function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) { 2406 assert(stream[kState].state !== 'errored'); 2407 assert(desc.type !== 'none'); 2408 2409 let done = false; 2410 if (stream[kState].state === 'closed') { 2411 desc.bytesFilled = 0; 2412 done = true; 2413 } 2414 2415 const filledView = 2416 readableByteStreamControllerConvertPullIntoDescriptor(desc); 2417 2418 if (desc.type === 'default') { 2419 readableStreamFulfillReadRequest(stream, filledView, done); 2420 } else { 2421 assert(desc.type === 'byob'); 2422 readableStreamFulfillReadIntoRequest(stream, filledView, done); 2423 } 2424} 2425 2426function readableByteStreamControllerInvalidateBYOBRequest(controller) { 2427 if (controller[kState].byobRequest === null) 2428 return; 2429 controller[kState].byobRequest[kState].controller = undefined; 2430 controller[kState].byobRequest[kState].view = null; 2431 controller[kState].byobRequest = null; 2432} 2433 2434function readableByteStreamControllerClearAlgorithms(controller) { 2435 controller[kState].pullAlgorithm = undefined; 2436 controller[kState].cancelAlgorithm = undefined; 2437} 2438 2439function readableByteStreamControllerClearPendingPullIntos(controller) { 2440 readableByteStreamControllerInvalidateBYOBRequest(controller); 2441 controller[kState].pendingPullIntos = []; 2442} 2443 2444function readableByteStreamControllerGetDesiredSize(controller) { 2445 const { 2446 stream, 2447 highWaterMark, 2448 queueTotalSize, 2449 } = controller[kState]; 2450 switch (stream[kState].state) { 2451 case 'errored': return null; 2452 case 'closed': return 0; 2453 default: return highWaterMark - queueTotalSize; 2454 } 2455} 2456 2457function readableByteStreamControllerShouldCallPull(controller) { 2458 const { 2459 stream, 2460 } = controller[kState]; 2461 if (stream[kState].state !== 'readable' || 2462 controller[kState].closeRequested || 2463 !controller[kState].started) { 2464 return false; 2465 } 2466 if (readableStreamHasDefaultReader(stream) && 2467 readableStreamGetNumReadRequests(stream) > 0) { 2468 return true; 2469 } 2470 2471 if (readableStreamHasBYOBReader(stream) && 2472 readableStreamGetNumReadIntoRequests(stream) > 0) { 2473 return true; 2474 } 2475 2476 const desiredSize = readableByteStreamControllerGetDesiredSize(controller); 2477 assert(desiredSize !== null); 2478 2479 return desiredSize > 0; 2480} 2481 2482function readableByteStreamControllerHandleQueueDrain(controller) { 2483 const { 2484 closeRequested, 2485 queueTotalSize, 2486 stream, 2487 } = controller[kState]; 2488 assert(stream[kState].state === 'readable'); 2489 if (!queueTotalSize && closeRequested) { 2490 readableByteStreamControllerClearAlgorithms(controller); 2491 readableStreamClose(stream); 2492 return; 2493 } 2494 readableByteStreamControllerCallPullIfNeeded(controller); 2495} 2496 2497function readableByteStreamControllerPullInto( 2498 controller, 2499 view, 2500 readIntoRequest) { 2501 const { 2502 closeRequested, 2503 stream, 2504 pendingPullIntos, 2505 } = controller[kState]; 2506 let elementSize = 1; 2507 let ctor = DataView; 2508 if (isArrayBufferView(view) && !isDataView(view)) { 2509 elementSize = view.constructor.BYTES_PER_ELEMENT; 2510 ctor = view.constructor; 2511 } 2512 const buffer = ArrayBufferViewGetBuffer(view); 2513 const byteOffset = ArrayBufferViewGetByteOffset(view); 2514 const byteLength = ArrayBufferViewGetByteLength(view); 2515 const bufferByteLength = ArrayBufferPrototypeGetByteLength(buffer); 2516 2517 let transferredBuffer; 2518 try { 2519 transferredBuffer = transferArrayBuffer(buffer); 2520 } catch (error) { 2521 readIntoRequest[kError](error); 2522 return; 2523 } 2524 const desc = { 2525 buffer: transferredBuffer, 2526 bufferByteLength, 2527 byteOffset, 2528 byteLength, 2529 bytesFilled: 0, 2530 elementSize, 2531 ctor, 2532 type: 'byob', 2533 }; 2534 if (pendingPullIntos.length) { 2535 ArrayPrototypePush(pendingPullIntos, desc); 2536 readableStreamAddReadIntoRequest(stream, readIntoRequest); 2537 return; 2538 } 2539 if (stream[kState].state === 'closed') { 2540 const emptyView = new ctor(desc.buffer, byteOffset, 0); 2541 readIntoRequest[kClose](emptyView); 2542 return; 2543 } 2544 if (controller[kState].queueTotalSize) { 2545 if (readableByteStreamControllerFillPullIntoDescriptorFromQueue( 2546 controller, 2547 desc)) { 2548 const filledView = 2549 readableByteStreamControllerConvertPullIntoDescriptor(desc); 2550 readableByteStreamControllerHandleQueueDrain(controller); 2551 readIntoRequest[kChunk](filledView); 2552 return; 2553 } 2554 if (closeRequested) { 2555 const error = new ERR_INVALID_STATE.TypeError('ReadableStream closed'); 2556 readableByteStreamControllerError(controller, error); 2557 readIntoRequest[kError](error); 2558 return; 2559 } 2560 } 2561 ArrayPrototypePush(pendingPullIntos, desc); 2562 readableStreamAddReadIntoRequest(stream, readIntoRequest); 2563 readableByteStreamControllerCallPullIfNeeded(controller); 2564} 2565 2566function readableByteStreamControllerRespondInternal(controller, bytesWritten) { 2567 const { 2568 stream, 2569 pendingPullIntos, 2570 } = controller[kState]; 2571 const desc = pendingPullIntos[0]; 2572 readableByteStreamControllerInvalidateBYOBRequest(controller); 2573 if (stream[kState].state === 'closed') { 2574 if (bytesWritten) 2575 throw new ERR_INVALID_STATE.TypeError( 2576 'Controller is closed but view is not zero-length'); 2577 readableByteStreamControllerRespondInClosedState(controller, desc); 2578 } else { 2579 assert(stream[kState].state === 'readable'); 2580 if (!bytesWritten) 2581 throw new ERR_INVALID_STATE.TypeError('View cannot be zero-length'); 2582 readableByteStreamControllerRespondInReadableState( 2583 controller, 2584 bytesWritten, 2585 desc); 2586 } 2587 readableByteStreamControllerCallPullIfNeeded(controller); 2588} 2589 2590function readableByteStreamControllerRespond(controller, bytesWritten) { 2591 const { 2592 pendingPullIntos, 2593 stream, 2594 } = controller[kState]; 2595 assert(pendingPullIntos.length); 2596 const desc = pendingPullIntos[0]; 2597 2598 if (stream[kState].state === 'closed') { 2599 if (bytesWritten !== 0) 2600 throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten); 2601 } else { 2602 assert(stream[kState].state === 'readable'); 2603 2604 if (!bytesWritten) 2605 throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten); 2606 2607 if ((desc.bytesFilled + bytesWritten) > desc.byteLength) 2608 throw new ERR_INVALID_ARG_VALUE.RangeError('bytesWritten', bytesWritten); 2609 } 2610 2611 desc.buffer = transferArrayBuffer(desc.buffer); 2612 2613 readableByteStreamControllerRespondInternal(controller, bytesWritten); 2614} 2615 2616function readableByteStreamControllerRespondInClosedState(controller, desc) { 2617 assert(!desc.bytesFilled); 2618 if (desc.type === 'none') { 2619 readableByteStreamControllerShiftPendingPullInto(controller); 2620 } 2621 const { 2622 stream, 2623 } = controller[kState]; 2624 if (readableStreamHasBYOBReader(stream)) { 2625 while (readableStreamGetNumReadIntoRequests(stream) > 0) { 2626 readableByteStreamControllerCommitPullIntoDescriptor( 2627 stream, 2628 readableByteStreamControllerShiftPendingPullInto(controller)); 2629 } 2630 } 2631} 2632 2633function readableByteStreamControllerFillHeadPullIntoDescriptor( 2634 controller, 2635 size, 2636 desc) { 2637 const { 2638 pendingPullIntos, 2639 byobRequest, 2640 } = controller[kState]; 2641 assert(!pendingPullIntos.length || pendingPullIntos[0] === desc); 2642 assert(byobRequest === null); 2643 desc.bytesFilled += size; 2644} 2645 2646function readableByteStreamControllerEnqueue(controller, chunk) { 2647 const { 2648 closeRequested, 2649 pendingPullIntos, 2650 queue, 2651 stream, 2652 } = controller[kState]; 2653 2654 const buffer = ArrayBufferViewGetBuffer(chunk); 2655 const byteOffset = ArrayBufferViewGetByteOffset(chunk); 2656 const byteLength = ArrayBufferViewGetByteLength(chunk); 2657 2658 if (closeRequested || stream[kState].state !== 'readable') 2659 return; 2660 2661 const transferredBuffer = transferArrayBuffer(buffer); 2662 2663 if (pendingPullIntos.length) { 2664 const firstPendingPullInto = pendingPullIntos[0]; 2665 2666 if (isArrayBufferDetached(firstPendingPullInto.buffer)) { 2667 throw new ERR_INVALID_STATE.TypeError( 2668 'Destination ArrayBuffer is detached', 2669 ); 2670 } 2671 2672 readableByteStreamControllerInvalidateBYOBRequest(controller); 2673 2674 firstPendingPullInto.buffer = transferArrayBuffer( 2675 firstPendingPullInto.buffer, 2676 ); 2677 2678 if (firstPendingPullInto.type === 'none') { 2679 readableByteStreamControllerEnqueueDetachedPullIntoToQueue( 2680 controller, 2681 firstPendingPullInto, 2682 ); 2683 } 2684 } 2685 2686 if (readableStreamHasDefaultReader(stream)) { 2687 readableByteStreamControllerProcessReadRequestsUsingQueue(controller); 2688 if (!readableStreamGetNumReadRequests(stream)) { 2689 readableByteStreamControllerEnqueueChunkToQueue( 2690 controller, 2691 transferredBuffer, 2692 byteOffset, 2693 byteLength); 2694 } else { 2695 assert(!queue.length); 2696 if (pendingPullIntos.length) { 2697 assert(pendingPullIntos[0].type === 'default'); 2698 readableByteStreamControllerShiftPendingPullInto(controller); 2699 } 2700 const transferredView = 2701 new Uint8Array(transferredBuffer, byteOffset, byteLength); 2702 readableStreamFulfillReadRequest(stream, transferredView, false); 2703 } 2704 } else if (readableStreamHasBYOBReader(stream)) { 2705 readableByteStreamControllerEnqueueChunkToQueue( 2706 controller, 2707 transferredBuffer, 2708 byteOffset, 2709 byteLength); 2710 readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( 2711 controller); 2712 } else { 2713 assert(!isReadableStreamLocked(stream)); 2714 readableByteStreamControllerEnqueueChunkToQueue( 2715 controller, 2716 transferredBuffer, 2717 byteOffset, 2718 byteLength); 2719 } 2720 readableByteStreamControllerCallPullIfNeeded(controller); 2721} 2722 2723function readableByteStreamControllerEnqueueClonedChunkToQueue( 2724 controller, 2725 buffer, 2726 byteOffset, 2727 byteLength, 2728) { 2729 let cloneResult; 2730 try { 2731 cloneResult = ArrayBufferPrototypeSlice( 2732 buffer, 2733 byteOffset, 2734 byteOffset + byteLength, 2735 ); 2736 } catch (error) { 2737 readableByteStreamControllerError(controller, error); 2738 throw error; 2739 } 2740 readableByteStreamControllerEnqueueChunkToQueue( 2741 controller, 2742 cloneResult, 2743 0, 2744 byteLength, 2745 ); 2746} 2747 2748function readableByteStreamControllerEnqueueChunkToQueue( 2749 controller, 2750 buffer, 2751 byteOffset, 2752 byteLength) { 2753 ArrayPrototypePush( 2754 controller[kState].queue, 2755 { 2756 buffer, 2757 byteOffset, 2758 byteLength, 2759 }); 2760 controller[kState].queueTotalSize += byteLength; 2761} 2762 2763function readableByteStreamControllerEnqueueDetachedPullIntoToQueue( 2764 controller, 2765 desc, 2766) { 2767 const { 2768 buffer, 2769 byteOffset, 2770 bytesFilled, 2771 type, 2772 } = desc; 2773 assert(type === 'none'); 2774 2775 if (bytesFilled > 0) { 2776 readableByteStreamControllerEnqueueClonedChunkToQueue( 2777 controller, 2778 buffer, 2779 byteOffset, 2780 bytesFilled, 2781 ); 2782 } 2783 readableByteStreamControllerShiftPendingPullInto(controller); 2784} 2785 2786function readableByteStreamControllerFillPullIntoDescriptorFromQueue( 2787 controller, 2788 desc) { 2789 const { 2790 buffer, 2791 byteLength, 2792 byteOffset, 2793 bytesFilled, 2794 elementSize, 2795 } = desc; 2796 const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize); 2797 const maxBytesToCopy = MathMin( 2798 controller[kState].queueTotalSize, 2799 byteLength - bytesFilled); 2800 const maxBytesFilled = bytesFilled + maxBytesToCopy; 2801 const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize); 2802 let totalBytesToCopyRemaining = maxBytesToCopy; 2803 let ready = false; 2804 if (maxAlignedBytes > currentAlignedBytes) { 2805 totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled; 2806 ready = true; 2807 } 2808 const { 2809 queue, 2810 } = controller[kState]; 2811 2812 while (totalBytesToCopyRemaining) { 2813 const headOfQueue = queue[0]; 2814 const bytesToCopy = MathMin( 2815 totalBytesToCopyRemaining, 2816 headOfQueue.byteLength); 2817 const destStart = byteOffset + desc.bytesFilled; 2818 const arrayBufferByteLength = ArrayBufferPrototypeGetByteLength(buffer); 2819 if (arrayBufferByteLength - destStart < bytesToCopy) { 2820 throw new ERR_INVALID_STATE.RangeError( 2821 'view ArrayBuffer size is invalid'); 2822 } 2823 assert(arrayBufferByteLength - destStart >= bytesToCopy); 2824 copyArrayBuffer( 2825 buffer, 2826 destStart, 2827 headOfQueue.buffer, 2828 headOfQueue.byteOffset, 2829 bytesToCopy); 2830 if (headOfQueue.byteLength === bytesToCopy) { 2831 ArrayPrototypeShift(queue); 2832 } else { 2833 headOfQueue.byteOffset += bytesToCopy; 2834 headOfQueue.byteLength -= bytesToCopy; 2835 } 2836 controller[kState].queueTotalSize -= bytesToCopy; 2837 readableByteStreamControllerFillHeadPullIntoDescriptor( 2838 controller, 2839 bytesToCopy, 2840 desc); 2841 totalBytesToCopyRemaining -= bytesToCopy; 2842 } 2843 2844 if (!ready) { 2845 assert(!controller[kState].queueTotalSize); 2846 assert(desc.bytesFilled > 0); 2847 assert(desc.bytesFilled < elementSize); 2848 } 2849 return ready; 2850} 2851 2852function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( 2853 controller) { 2854 const { 2855 closeRequested, 2856 pendingPullIntos, 2857 stream, 2858 } = controller[kState]; 2859 assert(!closeRequested); 2860 while (pendingPullIntos.length) { 2861 if (!controller[kState].queueTotalSize) 2862 return; 2863 const desc = pendingPullIntos[0]; 2864 if (readableByteStreamControllerFillPullIntoDescriptorFromQueue( 2865 controller, 2866 desc)) { 2867 readableByteStreamControllerShiftPendingPullInto(controller); 2868 readableByteStreamControllerCommitPullIntoDescriptor(stream, desc); 2869 } 2870 } 2871} 2872 2873function readableByteStreamControllerRespondInReadableState( 2874 controller, 2875 bytesWritten, 2876 desc) { 2877 const { 2878 buffer, 2879 bytesFilled, 2880 byteLength, 2881 type, 2882 } = desc; 2883 2884 if (bytesFilled + bytesWritten > byteLength) 2885 throw new ERR_INVALID_STATE.RangeError('The buffer size is invalid'); 2886 2887 readableByteStreamControllerFillHeadPullIntoDescriptor( 2888 controller, 2889 bytesWritten, 2890 desc); 2891 2892 if (type === 'none') { 2893 readableByteStreamControllerEnqueueDetachedPullIntoToQueue( 2894 controller, 2895 desc, 2896 ); 2897 readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( 2898 controller, 2899 ); 2900 return; 2901 } 2902 2903 if (desc.bytesFilled < desc.elementSize) 2904 return; 2905 2906 readableByteStreamControllerShiftPendingPullInto(controller); 2907 2908 const remainderSize = desc.bytesFilled % desc.elementSize; 2909 2910 if (remainderSize) { 2911 const end = desc.byteOffset + desc.bytesFilled; 2912 const start = end - remainderSize; 2913 const remainder = 2914 ArrayBufferPrototypeSlice( 2915 buffer, 2916 start, 2917 end); 2918 readableByteStreamControllerEnqueueChunkToQueue( 2919 controller, 2920 remainder, 2921 0, 2922 ArrayBufferPrototypeGetByteLength(remainder)); 2923 } 2924 desc.bytesFilled -= remainderSize; 2925 readableByteStreamControllerCommitPullIntoDescriptor( 2926 controller[kState].stream, 2927 desc); 2928 readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); 2929} 2930 2931function readableByteStreamControllerRespondWithNewView(controller, view) { 2932 const { 2933 stream, 2934 pendingPullIntos, 2935 } = controller[kState]; 2936 assert(pendingPullIntos.length); 2937 2938 const desc = pendingPullIntos[0]; 2939 assert(stream[kState].state !== 'errored'); 2940 2941 const viewByteLength = ArrayBufferViewGetByteLength(view); 2942 const viewByteOffset = ArrayBufferViewGetByteOffset(view); 2943 const viewBuffer = ArrayBufferViewGetBuffer(view); 2944 const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer); 2945 2946 if (stream[kState].state === 'closed') { 2947 if (viewByteLength !== 0) 2948 throw new ERR_INVALID_STATE.TypeError('View is not zero-length'); 2949 } else { 2950 assert(stream[kState].state === 'readable'); 2951 if (viewByteLength === 0) 2952 throw new ERR_INVALID_STATE.TypeError('View is zero-length'); 2953 } 2954 2955 const { 2956 byteOffset, 2957 byteLength, 2958 bytesFilled, 2959 bufferByteLength, 2960 } = desc; 2961 2962 if (byteOffset + bytesFilled !== viewByteOffset) 2963 throw new ERR_INVALID_ARG_VALUE.RangeError('view', view); 2964 2965 if (bytesFilled + viewByteLength > byteLength) 2966 throw new ERR_INVALID_ARG_VALUE.RangeError('view', view); 2967 2968 if (bufferByteLength !== viewBufferByteLength) 2969 throw new ERR_INVALID_ARG_VALUE.RangeError('view', view); 2970 2971 desc.buffer = transferArrayBuffer(viewBuffer); 2972 2973 readableByteStreamControllerRespondInternal(controller, viewByteLength); 2974} 2975 2976function readableByteStreamControllerShiftPendingPullInto(controller) { 2977 assert(controller[kState].byobRequest === null); 2978 return ArrayPrototypeShift(controller[kState].pendingPullIntos); 2979} 2980 2981function readableByteStreamControllerCallPullIfNeeded(controller) { 2982 if (!readableByteStreamControllerShouldCallPull(controller)) 2983 return; 2984 if (controller[kState].pulling) { 2985 controller[kState].pullAgain = true; 2986 return; 2987 } 2988 assert(!controller[kState].pullAgain); 2989 controller[kState].pulling = true; 2990 PromisePrototypeThen( 2991 ensureIsPromise(controller[kState].pullAlgorithm, controller), 2992 () => { 2993 controller[kState].pulling = false; 2994 if (controller[kState].pullAgain) { 2995 controller[kState].pullAgain = false; 2996 readableByteStreamControllerCallPullIfNeeded(controller); 2997 } 2998 }, 2999 (error) => readableByteStreamControllerError(controller, error)); 3000} 3001 3002function readableByteStreamControllerError(controller, error) { 3003 const { 3004 stream, 3005 } = controller[kState]; 3006 if (stream[kState].state !== 'readable') 3007 return; 3008 readableByteStreamControllerClearPendingPullIntos(controller); 3009 resetQueue(controller); 3010 readableByteStreamControllerClearAlgorithms(controller); 3011 readableStreamError(stream, error); 3012} 3013 3014function readableByteStreamControllerCancelSteps(controller, reason) { 3015 readableByteStreamControllerClearPendingPullIntos(controller); 3016 resetQueue(controller); 3017 const result = controller[kState].cancelAlgorithm(reason); 3018 readableByteStreamControllerClearAlgorithms(controller); 3019 return result; 3020} 3021 3022function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) { 3023 const { 3024 queue, 3025 queueTotalSize, 3026 } = controller[kState]; 3027 assert(queueTotalSize > 0); 3028 const { 3029 buffer, 3030 byteOffset, 3031 byteLength, 3032 } = ArrayPrototypeShift(queue); 3033 3034 controller[kState].queueTotalSize -= byteLength; 3035 readableByteStreamControllerHandleQueueDrain(controller); 3036 const view = new Uint8Array(buffer, byteOffset, byteLength); 3037 readRequest[kChunk](view); 3038} 3039 3040function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) { 3041 const { 3042 stream, 3043 queueTotalSize, 3044 } = controller[kState]; 3045 const { reader } = stream[kState]; 3046 assert(isReadableStreamDefaultReader(reader)); 3047 3048 while (reader[kState].readRequests.length > 0) { 3049 if (queueTotalSize === 0) { 3050 return; 3051 } 3052 readableByteStreamControllerFillReadRequestFromQueue( 3053 controller, 3054 ArrayPrototypeShift(reader[kState].readRequests), 3055 ); 3056 } 3057} 3058 3059function readableByteStreamControllerPullSteps(controller, readRequest) { 3060 const { 3061 pendingPullIntos, 3062 queueTotalSize, 3063 stream, 3064 } = controller[kState]; 3065 assert(readableStreamHasDefaultReader(stream)); 3066 if (queueTotalSize) { 3067 assert(!readableStreamGetNumReadRequests(stream)); 3068 readableByteStreamControllerFillReadRequestFromQueue( 3069 controller, 3070 readRequest, 3071 ); 3072 return; 3073 } 3074 const { 3075 autoAllocateChunkSize, 3076 } = controller[kState]; 3077 if (autoAllocateChunkSize !== undefined) { 3078 try { 3079 const buffer = new ArrayBuffer(autoAllocateChunkSize); 3080 ArrayPrototypePush( 3081 pendingPullIntos, 3082 { 3083 buffer, 3084 bufferByteLength: autoAllocateChunkSize, 3085 byteOffset: 0, 3086 byteLength: autoAllocateChunkSize, 3087 bytesFilled: 0, 3088 elementSize: 1, 3089 ctor: Uint8Array, 3090 type: 'default', 3091 }); 3092 } catch (error) { 3093 readRequest[kError](error); 3094 return; 3095 } 3096 } 3097 3098 readableStreamAddReadRequest(stream, readRequest); 3099 readableByteStreamControllerCallPullIfNeeded(controller); 3100} 3101 3102function setupReadableByteStreamController( 3103 stream, 3104 controller, 3105 startAlgorithm, 3106 pullAlgorithm, 3107 cancelAlgorithm, 3108 highWaterMark, 3109 autoAllocateChunkSize) { 3110 assert(stream[kState].controller === undefined); 3111 if (autoAllocateChunkSize !== undefined) { 3112 assert(NumberIsInteger(autoAllocateChunkSize)); 3113 assert(autoAllocateChunkSize > 0); 3114 } 3115 controller[kState] = { 3116 byobRequest: null, 3117 closeRequested: false, 3118 pullAgain: false, 3119 pulling: false, 3120 started: false, 3121 stream, 3122 queue: [], 3123 queueTotalSize: 0, 3124 highWaterMark, 3125 pullAlgorithm, 3126 cancelAlgorithm, 3127 autoAllocateChunkSize, 3128 pendingPullIntos: [], 3129 }; 3130 stream[kState].controller = controller; 3131 3132 const startResult = startAlgorithm(); 3133 3134 PromisePrototypeThen( 3135 PromiseResolve(startResult), 3136 () => { 3137 controller[kState].started = true; 3138 assert(!controller[kState].pulling); 3139 assert(!controller[kState].pullAgain); 3140 readableByteStreamControllerCallPullIfNeeded(controller); 3141 }, 3142 (error) => readableByteStreamControllerError(controller, error)); 3143} 3144 3145function setupReadableByteStreamControllerFromSource( 3146 stream, 3147 source, 3148 highWaterMark) { 3149 const controller = new ReadableByteStreamController(kSkipThrow); 3150 const start = source?.start; 3151 const pull = source?.pull; 3152 const cancel = source?.cancel; 3153 const autoAllocateChunkSize = source?.autoAllocateChunkSize; 3154 const startAlgorithm = start ? 3155 FunctionPrototypeBind(start, source, controller) : 3156 nonOpStart; 3157 const pullAlgorithm = pull ? 3158 FunctionPrototypeBind(pull, source, controller) : 3159 nonOpPull; 3160 const cancelAlgorithm = cancel ? 3161 FunctionPrototypeBind(cancel, source) : 3162 nonOpCancel; 3163 3164 if (autoAllocateChunkSize === 0) { 3165 throw new ERR_INVALID_ARG_VALUE( 3166 'source.autoAllocateChunkSize', 3167 autoAllocateChunkSize); 3168 } 3169 setupReadableByteStreamController( 3170 stream, 3171 controller, 3172 startAlgorithm, 3173 pullAlgorithm, 3174 cancelAlgorithm, 3175 highWaterMark, 3176 autoAllocateChunkSize); 3177} 3178 3179module.exports = { 3180 ReadableStream, 3181 ReadableStreamDefaultReader, 3182 ReadableStreamBYOBReader, 3183 ReadableStreamBYOBRequest, 3184 ReadableByteStreamController, 3185 ReadableStreamDefaultController, 3186 TransferredReadableStream, 3187 3188 // Exported Brand Checks 3189 isReadableStream, 3190 isReadableByteStreamController, 3191 isReadableStreamBYOBRequest, 3192 isReadableStreamDefaultReader, 3193 isReadableStreamBYOBReader, 3194 isWritableStreamDefaultWriter, 3195 isWritableStreamDefaultController, 3196 3197 readableStreamPipeTo, 3198 readableStreamTee, 3199 readableByteStreamControllerConvertPullIntoDescriptor, 3200 isReadableStreamLocked, 3201 readableStreamCancel, 3202 readableStreamClose, 3203 readableStreamError, 3204 readableStreamHasDefaultReader, 3205 readableStreamGetNumReadRequests, 3206 readableStreamHasBYOBReader, 3207 readableStreamGetNumReadIntoRequests, 3208 readableStreamFulfillReadRequest, 3209 readableStreamFulfillReadIntoRequest, 3210 readableStreamAddReadRequest, 3211 readableStreamAddReadIntoRequest, 3212 readableStreamReaderGenericCancel, 3213 readableStreamReaderGenericInitialize, 3214 readableStreamReaderGenericRelease, 3215 readableStreamBYOBReaderRead, 3216 readableStreamDefaultReaderRead, 3217 setupReadableStreamBYOBReader, 3218 setupReadableStreamDefaultReader, 3219 readableStreamDefaultControllerClose, 3220 readableStreamDefaultControllerEnqueue, 3221 readableStreamDefaultControllerHasBackpressure, 3222 readableStreamDefaultControllerCanCloseOrEnqueue, 3223 readableStreamDefaultControllerGetDesiredSize, 3224 readableStreamDefaultControllerShouldCallPull, 3225 readableStreamDefaultControllerCallPullIfNeeded, 3226 readableStreamDefaultControllerClearAlgorithms, 3227 readableStreamDefaultControllerError, 3228 readableStreamDefaultControllerCancelSteps, 3229 readableStreamDefaultControllerPullSteps, 3230 setupReadableStreamDefaultController, 3231 setupReadableStreamDefaultControllerFromSource, 3232 readableByteStreamControllerClose, 3233 readableByteStreamControllerCommitPullIntoDescriptor, 3234 readableByteStreamControllerInvalidateBYOBRequest, 3235 readableByteStreamControllerClearAlgorithms, 3236 readableByteStreamControllerClearPendingPullIntos, 3237 readableByteStreamControllerGetDesiredSize, 3238 readableByteStreamControllerShouldCallPull, 3239 readableByteStreamControllerHandleQueueDrain, 3240 readableByteStreamControllerPullInto, 3241 readableByteStreamControllerRespondInternal, 3242 readableByteStreamControllerRespond, 3243 readableByteStreamControllerRespondInClosedState, 3244 readableByteStreamControllerFillHeadPullIntoDescriptor, 3245 readableByteStreamControllerEnqueue, 3246 readableByteStreamControllerEnqueueChunkToQueue, 3247 readableByteStreamControllerFillPullIntoDescriptorFromQueue, 3248 readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue, 3249 readableByteStreamControllerRespondInReadableState, 3250 readableByteStreamControllerRespondWithNewView, 3251 readableByteStreamControllerShiftPendingPullInto, 3252 readableByteStreamControllerCallPullIfNeeded, 3253 readableByteStreamControllerError, 3254 readableByteStreamControllerCancelSteps, 3255 readableByteStreamControllerPullSteps, 3256 setupReadableByteStreamController, 3257 setupReadableByteStreamControllerFromSource, 3258}; 3259