• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayBuffer,
5  ArrayBufferPrototypeGetByteLength,
6  ArrayBufferPrototypeSlice,
7  ArrayPrototypePush,
8  ArrayPrototypeShift,
9  DataView,
10  FunctionPrototypeBind,
11  FunctionPrototypeCall,
12  MathMin,
13  NumberIsInteger,
14  ObjectCreate,
15  ObjectDefineProperties,
16  ObjectSetPrototypeOf,
17  Promise,
18  PromisePrototypeThen,
19  PromiseResolve,
20  PromiseReject,
21  ReflectConstruct,
22  SafePromiseAll,
23  Symbol,
24  SymbolAsyncIterator,
25  SymbolDispose,
26  SymbolToStringTag,
27  Uint8Array,
28} = primordials;
29
30const {
31  AbortError,
32  codes: {
33    ERR_ILLEGAL_CONSTRUCTOR,
34    ERR_INVALID_ARG_VALUE,
35    ERR_INVALID_ARG_TYPE,
36    ERR_INVALID_STATE,
37    ERR_INVALID_THIS,
38  },
39} = require('internal/errors');
40
41const {
42  DOMException,
43} = internalBinding('messaging');
44
45const {
46  isArrayBufferView,
47  isDataView,
48} = require('internal/util/types');
49
50const {
51  createDeferredPromise,
52  customInspectSymbol: kInspect,
53  isArrayBufferDetached,
54  kEmptyObject,
55  kEnumerableProperty,
56  SideEffectFreeRegExpPrototypeSymbolReplace,
57} = require('internal/util');
58
59const {
60  validateAbortSignal,
61  validateBuffer,
62  validateObject,
63} = require('internal/validators');
64
65const {
66  MessageChannel,
67} = require('internal/worker/io');
68
69const {
70  kDeserialize,
71  kTransfer,
72  kTransferList,
73  makeTransferable,
74} = require('internal/worker/js_transferable');
75
76const {
77  queueMicrotask,
78} = require('internal/process/task_queues');
79
80const {
81  kIsDisturbed,
82  kIsErrored,
83  kIsReadable,
84  kIsClosedPromise,
85  kControllerErrorFunction,
86} = require('internal/streams/utils');
87
88const {
89  structuredClone,
90} = require('internal/structured_clone');
91
92const {
93  ArrayBufferViewGetBuffer,
94  ArrayBufferViewGetByteLength,
95  ArrayBufferViewGetByteOffset,
96  AsyncIterator,
97  cloneAsUint8Array,
98  copyArrayBuffer,
99  customInspect,
100  dequeueValue,
101  ensureIsPromise,
102  enqueueValueWithSize,
103  extractHighWaterMark,
104  extractSizeAlgorithm,
105  lazyTransfer,
106  isViewedArrayBufferDetached,
107  isBrandCheck,
108  resetQueue,
109  setPromiseHandled,
110  transferArrayBuffer,
111  nonOpCancel,
112  nonOpPull,
113  nonOpStart,
114  kType,
115  kState,
116} = require('internal/webstreams/util');
117
118const {
119  WritableStreamDefaultWriter,
120
121  isWritableStream,
122  isWritableStreamLocked,
123  isWritableStreamDefaultController,
124  isWritableStreamDefaultWriter,
125
126  writableStreamAbort,
127  writableStreamCloseQueuedOrInFlight,
128  writableStreamDefaultWriterCloseWithErrorPropagation,
129  writableStreamDefaultWriterRelease,
130  writableStreamDefaultWriterWrite,
131} = require('internal/webstreams/writablestream');
132
133const { Buffer } = require('buffer');
134
135const assert = require('internal/assert');
136
// Private symbol keys used for internal methods in this file.
// kChunk, kClose and kError name the callbacks implemented by read-request
// objects; kCancel, kPull and kRelease name internal controller hooks.
const kCancel = Symbol('kCancel');
const kClose = Symbol('kClose');
const kChunk = Symbol('kChunk');
const kError = Symbol('kError');
const kPull = Symbol('kPull');
const kRelease = Symbol('kRelease');
// Sentinel that must be passed to the constructors of classes that are not
// user-constructible; any other argument makes them throw
// ERR_ILLEGAL_CONSTRUCTOR.
const kSkipThrow = Symbol('kSkipThrow');

// Lazily created, cached error instances. They are populated on first use by
// lazyReadableReleasedError() and lazyReadableReleasingError().
let releasedError;
let releasingError;
// NOTE(review): not referenced in the visible part of this file; presumably
// a lazily loaded helper assigned further down - confirm.
let addAbortListener;

// Matches named stack frames ("    at <name> (<location>)") whose location is
// not a `node:` internal module. Used to strip those frames from the cached
// errors' stacks.
const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;
150
/**
 * Returns the shared 'Reader released' error, building and caching it in
 * `releasedError` the first time it is requested.
 * @returns {Error}
 */
function lazyReadableReleasedError() {
  if (!releasedError) {
    releasedError = new ERR_INVALID_STATE.TypeError('Reader released');
    // Avoid V8 leak and remove the userland stack trace: the instance is
    // shared, so frames matched by userModuleRegExp are dropped.
    const { stack } = releasedError;
    releasedError.stack =
      SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, stack, '');
  }
  return releasedError;
}
161
/**
 * Returns the shared 'Releasing reader' error, building and caching it in
 * `releasingError` the first time it is requested.
 * @returns {Error}
 */
function lazyReadableReleasingError() {
  if (!releasingError) {
    releasingError = new ERR_INVALID_STATE.TypeError('Releasing reader');
    // Avoid V8 leak and remove the userland stack trace: the instance is
    // shared, so frames matched by userModuleRegExp are dropped.
    const { stack } = releasingError;
    releasingError.stack =
      SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, stack, '');
  }
  return releasingError;
}
171
/**
 * Builds a property descriptor for a configurable data property holding
 * `value`. `writable` and `enumerable` are intentionally omitted, and the
 * descriptor has a null prototype so inherited keys cannot leak into it.
 * @param {any} value
 * @returns {{ configurable: boolean, value: any }}
 */
const getNonWritablePropertyDescriptor = (value) => ({
  __proto__: null,
  configurable: true,
  value,
});
179
180/**
181 * @typedef {import('../abort_controller').AbortSignal} AbortSignal
182 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
183 * @typedef {import('./queuingstrategies').QueuingStrategySize
184 * } QueuingStrategySize
185 * @typedef {import('./writablestream').WritableStream} WritableStream
186 */
187
188/**
189 * @typedef {ReadableStreamDefaultController | ReadableByteStreamController
190 * } ReadableStreamController
191 */
192
193/**
194 * @typedef {ReadableStreamDefaultReader | ReadableStreamBYOBReader
195 * } ReadableStreamReader
196 */
197
198/**
199 * @callback UnderlyingSourceStartCallback
200 * @param {ReadableStreamController} controller
201 * @returns { any | Promise<void> }
202 */
203
204/**
205 * @callback UnderlyingSourcePullCallback
206 * @param {ReadableStreamController} controller
207 * @returns { Promise<void> }
208 */
209
210/**
211 * @callback UnderlyingSourceCancelCallback
212 * @param {any} reason
213 * @returns { Promise<void> }
214 */
215
216/**
217 * @typedef {{
218 *   readable: ReadableStream,
219 *   writable: WritableStream,
220 * }} ReadableWritablePair
221 */
222
223/**
224 * @typedef {{
225 *   preventClose? : boolean,
226 *   preventAbort? : boolean,
227 *   preventCancel? : boolean,
228 *   signal? : AbortSignal,
229 * }} StreamPipeOptions
230 */
231
232/**
233 * @typedef {{
234 *   start? : UnderlyingSourceStartCallback,
235 *   pull? : UnderlyingSourcePullCallback,
236 *   cancel? : UnderlyingSourceCancelCallback,
237 *   type? : "bytes",
238 *   autoAllocateChunkSize? : number
239 * }} UnderlyingSource
240 */
241
/**
 * ReadableStream implementation. Instances keep their internal state under
 * `this[kState]` and are wrapped with makeTransferable() so they can be
 * moved through the [kTransfer] / [kTransferList] / [kDeserialize] hooks.
 */
class ReadableStream {
  [kType] = 'ReadableStream';

  /**
   * @param {UnderlyingSource} [source]
   * @param {QueuingStrategy} [strategy]
   * @throws {ERR_INVALID_ARG_VALUE} If `source` is null, if `source.type` is
   *   defined but does not stringify to 'bytes', or if a byte stream is given
   *   a `strategy.size`.
   */
  constructor(source = {}, strategy = kEmptyObject) {
    if (source === null)
      throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
    this[kState] = {
      disturbed: false,
      reader: undefined,
      state: 'readable',
      storedError: undefined,
      stream: undefined,
      transfer: {
        writable: undefined,
        port1: undefined,
        port2: undefined,
        promise: undefined,
      },
    };

    this[kIsClosedPromise] = createDeferredPromise();
    this[kControllerErrorFunction] = () => {};

    // The spec requires handling of the strategy first
    // here. Specifically, if getting the size and
    // highWaterMark from the strategy fail, that has
    // to trigger a throw before getting the details
    // from the source. So be sure to keep these in
    // this order.
    const size = strategy?.size;
    const highWaterMark = strategy?.highWaterMark;
    const type = source.type;

    if (`${type}` === 'bytes') {
      // Byte streams do not accept a size algorithm and default to a
      // high water mark of 0.
      if (size !== undefined)
        throw new ERR_INVALID_ARG_VALUE.RangeError('strategy.size', size);
      setupReadableByteStreamControllerFromSource(
        this,
        source,
        extractHighWaterMark(highWaterMark, 0));
    } else {
      if (type !== undefined)
        throw new ERR_INVALID_ARG_VALUE('source.type', type);
      setupReadableStreamDefaultControllerFromSource(
        this,
        source,
        extractHighWaterMark(highWaterMark, 1),
        extractSizeAlgorithm(size));
    }

    // eslint-disable-next-line no-constructor-return
    return makeTransferable(this);
  }

  // Internal state probes keyed by the symbols from internal/streams/utils.
  get [kIsDisturbed]() {
    return this[kState].disturbed;
  }

  get [kIsErrored]() {
    return this[kState].state === 'errored';
  }

  get [kIsReadable]() {
    return this[kState].state === 'readable';
  }

  /**
   * @readonly
   * @type {boolean}
   */
  get locked() {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    return isReadableStreamLocked(this);
  }

  /**
   * Cancels the stream. Failures are reported through the returned promise
   * rather than thrown: it rejects when `this` is not a ReadableStream or
   * when the stream is locked.
   * @param {any} [reason]
   * @returns { Promise<void> }
   */
  cancel(reason = undefined) {
    if (!isReadableStream(this))
      return PromiseReject(new ERR_INVALID_THIS('ReadableStream'));
    if (isReadableStreamLocked(this)) {
      return PromiseReject(
        new ERR_INVALID_STATE.TypeError('ReadableStream is locked'));
    }
    return readableStreamCancel(this, reason);
  }

  /**
   * Creates a reader for this stream: a default reader when `options.mode`
   * is undefined, a BYOB reader when it stringifies to 'byob'.
   * @param {{
   *   mode? : "byob"
   * }} [options]
   * @returns {ReadableStreamReader}
   * @throws {ERR_INVALID_ARG_VALUE} For any other `options.mode`.
   */
  getReader(options = kEmptyObject) {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    validateObject(options, 'options', { nullable: true, allowFunction: true });
    const mode = options?.mode;

    if (mode === undefined)
      // eslint-disable-next-line no-use-before-define
      return new ReadableStreamDefaultReader(this);

    if (`${mode}` !== 'byob')
      throw new ERR_INVALID_ARG_VALUE('options.mode', mode);
    // eslint-disable-next-line no-use-before-define
    return new ReadableStreamBYOBReader(this);
  }

  /**
   * Pipes this stream into `transform.writable` and returns
   * `transform.readable`. The pipe promise is marked as handled, so pipe
   * failures do not surface as unhandled rejections from this call.
   * @param {ReadableWritablePair} transform
   * @param {StreamPipeOptions} [options]
   * @returns {ReadableStream}
   */
  pipeThrough(transform, options = kEmptyObject) {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    const readable = transform?.readable;
    if (!isReadableStream(readable)) {
      throw new ERR_INVALID_ARG_TYPE(
        'transform.readable',
        'ReadableStream',
        readable);
    }
    const writable = transform?.writable;
    if (!isWritableStream(writable)) {
      throw new ERR_INVALID_ARG_TYPE(
        'transform.writable',
        'WritableStream',
        writable);
    }

    // The web platform tests require that these be handled one at a
    // time and in a specific order. options can be null or undefined.
    const preventAbort = options?.preventAbort;
    const preventCancel = options?.preventCancel;
    const preventClose = options?.preventClose;
    const signal = options?.signal;

    if (signal !== undefined) {
      validateAbortSignal(signal, 'options.signal');
    }

    if (isReadableStreamLocked(this))
      throw new ERR_INVALID_STATE.TypeError('The ReadableStream is locked');
    if (isWritableStreamLocked(writable))
      throw new ERR_INVALID_STATE.TypeError('The WritableStream is locked');

    const promise = readableStreamPipeTo(
      this,
      writable,
      !!preventClose,
      !!preventAbort,
      !!preventCancel,
      signal);
    setPromiseHandled(promise);

    return readable;
  }

  /**
   * Pipes this stream into `destination`. Every validation failure is
   * converted into a rejected promise by the surrounding try/catch, so this
   * method never throws synchronously.
   * @param {WritableStream} destination
   * @param {StreamPipeOptions} [options]
   * @returns {Promise<void>}
   */
  pipeTo(destination, options = kEmptyObject) {
    try {
      if (!isReadableStream(this))
        throw new ERR_INVALID_THIS('ReadableStream');
      if (!isWritableStream(destination)) {
        // Report the argument under its own name; there is no `transform`
        // object in this method.
        throw new ERR_INVALID_ARG_TYPE(
          'destination',
          'WritableStream',
          destination);
      }

      // Read the options one at a time, in the same order as pipeThrough().
      const preventAbort = options?.preventAbort;
      const preventCancel = options?.preventCancel;
      const preventClose = options?.preventClose;
      const signal = options?.signal;

      if (signal !== undefined) {
        validateAbortSignal(signal, 'options.signal');
      }

      if (isReadableStreamLocked(this))
        throw new ERR_INVALID_STATE.TypeError('The ReadableStream is locked');
      if (isWritableStreamLocked(destination))
        throw new ERR_INVALID_STATE.TypeError('The WritableStream is locked');

      return readableStreamPipeTo(
        this,
        destination,
        !!preventClose,
        !!preventAbort,
        !!preventCancel,
        signal);
    } catch (error) {
      return PromiseReject(error);
    }
  }

  /**
   * @returns {ReadableStream[]}
   */
  tee() {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    return readableStreamTee(this, false);
  }

  /**
   * Returns an async iterator over the stream's chunks. A default reader is
   * acquired immediately, so the stream is locked as soon as this is called.
   * @param {{
   *   preventCancel? : boolean,
   * }} [options]
   * @returns {AsyncIterable}
   */
  values(options = kEmptyObject) {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    validateObject(options, 'options');
    const {
      preventCancel = false,
    } = options;

    // eslint-disable-next-line no-use-before-define
    const reader = new ReadableStreamDefaultReader(this);
    // `done` latches once the stream closed, errored, or return() ran.
    let done = false;
    let started = false;
    // Promise of the most recent next() call; later calls chain on it so
    // that reads are served strictly in order.
    let current;

    // The nextSteps function is not an async function in order
    // to make it more efficient. Because nextSteps explicitly
    // creates a Promise and returns it in the common case,
    // making it an async function just causes two additional
    // unnecessary Promise allocations to occur, which just add
    // cost.
    function nextSteps() {
      if (done)
        return PromiseResolve({ done: true, value: undefined });

      if (reader[kState].stream === undefined) {
        return PromiseReject(
          new ERR_INVALID_STATE.TypeError(
            'The reader is not bound to a ReadableStream'));
      }
      const promise = createDeferredPromise();

      readableStreamDefaultReaderRead(reader, {
        [kChunk](chunk) {
          current = undefined;
          promise.resolve({ value: chunk, done: false });
        },
        [kClose]() {
          current = undefined;
          done = true;
          readableStreamReaderGenericRelease(reader);
          promise.resolve({ done: true, value: undefined });
        },
        [kError](error) {
          current = undefined;
          done = true;
          readableStreamReaderGenericRelease(reader);
          promise.reject(error);
        },
      });
      return promise.promise;
    }

    async function returnSteps(value) {
      if (done)
        return { done: true, value };
      done = true;

      if (reader[kState].stream === undefined) {
        throw new ERR_INVALID_STATE.TypeError(
          'The reader is not bound to a ReadableStream');
      }
      assert(!reader[kState].readRequests.length);
      if (!preventCancel) {
        // Start the cancel, release the reader right away, and only then
        // wait for the cancel to settle.
        const result = readableStreamReaderGenericCancel(reader, value);
        readableStreamReaderGenericRelease(reader);
        await result;
        return { done: true, value };
      }

      readableStreamReaderGenericRelease(reader);
      return { done: true, value };
    }

    // TODO(@jasnell): Explore whether an async generator
    // can be used here instead of a custom iterator object.
    return ObjectSetPrototypeOf({
      // Changing either of these functions (next or return)
      // to async functions causes a failure in the streams
      // Web Platform Tests that check for use of a modified
      // Promise.prototype.then. Since the await keyword
      // uses Promise.prototype.then, it is open to prototype
      // pollution, which causes the test to fail. The other
      // await uses here do not trigger that failure because
      // the test that fails does not trigger those code paths.
      next() {
        // If this is the first read, delay by one microtask
        // to ensure that the controller has had an opportunity
        // to properly start and perform the initial pull.
        // TODO(@jasnell): The spec doesn't call this out so
        // need to investigate if it's a bug in our impl or
        // the spec.
        if (!started) {
          current = PromiseResolve();
          started = true;
        }
        current = current !== undefined ?
          PromisePrototypeThen(current, nextSteps, nextSteps) :
          nextSteps();
        return current;
      },

      return(error) {
        // Wait for any in-flight next() before running the return steps,
        // whether that read fulfilled or rejected.
        return current ?
          PromisePrototypeThen(
            current,
            () => returnSteps(error),
            () => returnSteps(error)) :
          returnSteps(error);
      },

      [SymbolAsyncIterator]() { return this; },
    }, AsyncIterator);
  }

  [kInspect](depth, options) {
    return customInspect(depth, options, this[kType], {
      locked: this.locked,
      state: this[kState].state,
      supportsBYOB:
        // eslint-disable-next-line no-use-before-define
        this[kState].controller instanceof ReadableByteStreamController,
    });
  }

  /**
   * Transfer hook. Uses the ports prepared by [kTransferList]() to create a
   * cross-realm writable sink for this stream and returns the data needed to
   * rebuild the stream on the receiving side.
   * @throws {DOMException} DataCloneError if the stream is locked.
   */
  [kTransfer]() {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    if (this.locked) {
      // Drop the ports created by [kTransferList]() before failing.
      this[kState].transfer.port1?.close();
      this[kState].transfer.port1 = undefined;
      this[kState].transfer.port2 = undefined;
      throw new DOMException(
        'Cannot transfer a locked ReadableStream',
        'DataCloneError');
    }

    const {
      writable,
      promise,
    } = lazyTransfer().newCrossRealmWritableSink(
      this,
      this[kState].transfer.port1);

    this[kState].transfer.writable = writable;
    this[kState].transfer.promise = promise;

    return {
      data: { port: this[kState].transfer.port2 },
      deserializeInfo:
        'internal/webstreams/readablestream:TransferredReadableStream',
    };
  }

  /**
   * Creates the MessageChannel used for the transfer, stores both ports in
   * the stream state, and lists port2 as the object to transfer.
   */
  [kTransferList]() {
    const { port1, port2 } = new MessageChannel();
    this[kState].transfer.port1 = port1;
    this[kState].transfer.port2 = port2;
    return [ port2 ];
  }

  /**
   * Receiving-side hook: attaches a default controller whose source reads
   * from the transferred port.
   */
  [kDeserialize]({ port }) {
    const transfer = lazyTransfer();
    setupReadableStreamDefaultControllerFromSource(
      this,
      new transfer.CrossRealmTransformReadableSource(port),
      0, () => 1);
  }
}
634
// Public surface of ReadableStream.prototype. Web IDL operations and
// attributes are enumerable, so every public member is marked as such;
// `values` is included because class methods are non-enumerable by default.
// The @@asyncIterator alias of values() stays non-enumerable.
ObjectDefineProperties(ReadableStream.prototype, {
  [SymbolAsyncIterator]: {
    __proto__: null,
    configurable: true,
    enumerable: false,
    writable: true,
    value: ReadableStream.prototype.values,
  },
  locked: kEnumerableProperty,
  cancel: kEnumerableProperty,
  getReader: kEnumerableProperty,
  pipeThrough: kEnumerableProperty,
  pipeTo: kEnumerableProperty,
  tee: kEnumerableProperty,
  values: kEnumerableProperty,
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name),
});
651
/**
 * Factory named by the `deserializeInfo` string returned from
 * ReadableStream's [kTransfer]() hook. It builds an object with
 * ReadableStream as new.target, without running the ReadableStream
 * constructor, gives it fresh internal state, and wraps it with
 * makeTransferable().
 */
function TransferredReadableStream() {
  // Plain function used as the construction target; `this` is the new object.
  function initializeState() {
    this[kType] = 'ReadableStream';
    this[kState] = {
      disturbed: false,
      state: 'readable',
      storedError: undefined,
      stream: undefined,
      transfer: {
        writable: undefined,
        port: undefined,
        promise: undefined,
      },
    };
    this[kIsClosedPromise] = createDeferredPromise();
  }

  const stream = ReflectConstruct(initializeState, [], ReadableStream);
  return makeTransferable(stream);
}
// No-op deserialize hook on the factory's own prototype.
TransferredReadableStream.prototype[kDeserialize] = () => {};
672
/**
 * Request object handed out by a byte stream controller. Its `view` and
 * owning `controller` live in `this[kState]`, which is populated by
 * createReadableStreamBYOBRequest().
 */
class ReadableStreamBYOBRequest {
  [kType] = 'ReadableStreamBYOBRequest';

  /**
   * Not user-constructible: anything other than the internal kSkipThrow
   * sentinel throws ERR_ILLEGAL_CONSTRUCTOR.
   */
  constructor(skipThrowSymbol = undefined) {
    if (skipThrowSymbol !== kSkipThrow)
      throw new ERR_ILLEGAL_CONSTRUCTOR();
  }

  /**
   * @readonly
   * @type {ArrayBufferView}
   */
  get view() {
    if (isReadableStreamBYOBRequest(this))
      return this[kState].view;
    throw new ERR_INVALID_THIS('ReadableStreamBYOBRequest');
  }

  /**
   * Reports that `bytesWritten` bytes were written into the request's view.
   * @param {number} bytesWritten
   * @throws {ERR_INVALID_STATE} If the request has no controller any more
   *   or the viewed ArrayBuffer is detached.
   */
  respond(bytesWritten) {
    if (!isReadableStreamBYOBRequest(this))
      throw new ERR_INVALID_THIS('ReadableStreamBYOBRequest');

    const { view: currentView, controller: owner } = this[kState];
    if (owner === undefined) {
      throw new ERR_INVALID_STATE.TypeError(
        'This BYOB request has been invalidated');
    }

    const byteLength = ArrayBufferViewGetByteLength(currentView);
    const buffer = ArrayBufferViewGetBuffer(currentView);
    const bufferByteLength = ArrayBufferPrototypeGetByteLength(buffer);

    if (isArrayBufferDetached(buffer)) {
      throw new ERR_INVALID_STATE.TypeError('Viewed ArrayBuffer is detached');
    }

    // Internal invariants: neither the view nor its buffer may be empty.
    assert(byteLength > 0);
    assert(bufferByteLength > 0);

    readableByteStreamControllerRespond(owner, bytesWritten);
  }

  /**
   * Responds to the request with a caller-supplied view.
   * @param {ArrayBufferView} view
   * @throws {ERR_INVALID_STATE} If the request has no controller any more
   *   or the viewed ArrayBuffer is detached.
   */
  respondWithNewView(view) {
    if (!isReadableStreamBYOBRequest(this))
      throw new ERR_INVALID_THIS('ReadableStreamBYOBRequest');

    const owner = this[kState].controller;
    if (owner === undefined) {
      throw new ERR_INVALID_STATE.TypeError(
        'This BYOB request has been invalidated');
    }

    // The invalidation check above deliberately runs before the view is
    // validated.
    validateBuffer(view, 'view');

    if (isViewedArrayBufferDetached(view)) {
      throw new ERR_INVALID_STATE.TypeError('Viewed ArrayBuffer is detached');
    }

    readableByteStreamControllerRespondWithNewView(owner, view);
  }

  [kInspect](depth, options) {
    const details = {
      view: this.view,
      controller: this[kState].controller,
    };
    return customInspect(depth, options, this[kType], details);
  }
}

// Mark the public members enumerable and install the toStringTag.
ObjectDefineProperties(ReadableStreamBYOBRequest.prototype, {
  view: kEnumerableProperty,
  respond: kEnumerableProperty,
  respondWithNewView: kEnumerableProperty,
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamBYOBRequest.name),
});
759
/**
 * Internal factory for ReadableStreamBYOBRequest: constructs the request
 * with the kSkipThrow sentinel and attaches its controller and view.
 * @param {ReadableByteStreamController} controller
 * @param {ArrayBufferView} view
 * @returns {ReadableStreamBYOBRequest}
 */
function createReadableStreamBYOBRequest(controller, view) {
  const request = new ReadableStreamBYOBRequest(kSkipThrow);
  request[kState] = { controller, view };
  return request;
}
770
/**
 * Read request used by ReadableStreamDefaultReader.read(). It wraps a
 * deferred promise that is settled through the kChunk / kClose / kError
 * callbacks.
 */
class DefaultReadRequest {
  constructor() {
    this[kState] = createDeferredPromise();
  }

  /** Promise handed back to the caller of read(). */
  get promise() {
    return this[kState].promise;
  }

  // A chunk is available: fulfill with a non-final result.
  [kChunk](value) {
    const deferred = this[kState];
    deferred.resolve?.({ value, done: false });
  }

  // The stream closed: fulfill with the final, value-less result.
  [kClose]() {
    const deferred = this[kState];
    deferred.resolve?.({ value: undefined, done: true });
  }

  // The stream errored: reject with the given error.
  [kError](error) {
    const deferred = this[kState];
    deferred.reject?.(error);
  }
}
790
/**
 * Read request used by ReadableStreamBYOBReader.read(). Unlike
 * DefaultReadRequest, the kClose callback receives a value and passes it
 * through in the final result.
 */
class ReadIntoRequest {
  constructor() {
    this[kState] = createDeferredPromise();
  }

  /** Promise handed back to the caller of read(). */
  get promise() {
    return this[kState].promise;
  }

  // A filled view is available: fulfill with a non-final result.
  [kChunk](value) {
    const deferred = this[kState];
    deferred.resolve?.({ value, done: false });
  }

  // The stream closed: fulfill with the final result carrying `value`.
  [kClose](value) {
    const deferred = this[kState];
    deferred.resolve?.({ value, done: true });
  }

  // The stream errored: reject with the given error.
  [kError](error) {
    const deferred = this[kState];
    deferred.reject?.(error);
  }
}
810
/**
 * Default reader for a ReadableStream. The constructor allocates the
 * reader state and setupReadableStreamDefaultReader() binds it to the
 * stream.
 */
class ReadableStreamDefaultReader {
  [kType] = 'ReadableStreamDefaultReader';

  /**
   * @param {ReadableStream} stream
   * @throws {ERR_INVALID_ARG_TYPE} If `stream` is not a ReadableStream.
   */
  constructor(stream) {
    if (!isReadableStream(stream))
      throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream);

    const close = {
      promise: undefined,
      resolve: undefined,
      reject: undefined,
    };
    this[kState] = {
      readRequests: [],
      stream: undefined,
      close,
    };
    setupReadableStreamDefaultReader(this, stream);
  }

  /**
   * Requests the next chunk. Failures are reported through the returned
   * promise rather than thrown.
   * @returns {Promise<{
   *   value : any,
   *   done : boolean
   * }>}
   */
  read() {
    if (!isReadableStreamDefaultReader(this)) {
      const brandError = new ERR_INVALID_THIS('ReadableStreamDefaultReader');
      return PromiseReject(brandError);
    }
    if (this[kState].stream === undefined) {
      const detachedError = new ERR_INVALID_STATE.TypeError(
        'The reader is not attached to a stream');
      return PromiseReject(detachedError);
    }

    const request = new DefaultReadRequest();
    readableStreamDefaultReaderRead(this, request);
    return request.promise;
  }

  /**
   * Releases the reader's lock; a no-op when the reader is not attached to
   * a stream.
   */
  releaseLock() {
    if (!isReadableStreamDefaultReader(this))
      throw new ERR_INVALID_THIS('ReadableStreamDefaultReader');
    if (this[kState].stream !== undefined)
      readableStreamDefaultReaderRelease(this);
  }

  /**
   * @readonly
   * @type {Promise<void>}
   */
  get closed() {
    if (isReadableStreamDefaultReader(this))
      return this[kState].close.promise;
    return PromiseReject(new ERR_INVALID_THIS('ReadableStreamDefaultReader'));
  }

  /**
   * Cancels the underlying stream. Rejects when `this` is not a default
   * reader or the reader is not attached to a stream.
   * @param {any} [reason]
   * @returns {Promise<void>}
   */
  cancel(reason = undefined) {
    if (!isReadableStreamDefaultReader(this)) {
      const brandError = new ERR_INVALID_THIS('ReadableStreamDefaultReader');
      return PromiseReject(brandError);
    }
    if (this[kState].stream === undefined) {
      const detachedError = new ERR_INVALID_STATE.TypeError(
        'The reader is not attached to a stream');
      return PromiseReject(detachedError);
    }
    return readableStreamReaderGenericCancel(this, reason);
  }

  [kInspect](depth, options) {
    const state = this[kState];
    return customInspect(depth, options, this[kType], {
      stream: state.stream,
      readRequests: state.readRequests.length,
      close: state.close.promise,
    });
  }
}

// Mark the public members enumerable and install the toStringTag.
ObjectDefineProperties(ReadableStreamDefaultReader.prototype, {
  closed: kEnumerableProperty,
  read: kEnumerableProperty,
  releaseLock: kEnumerableProperty,
  cancel: kEnumerableProperty,
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamDefaultReader.name),
});
899
/**
 * "Bring your own buffer" reader for a ReadableStream. The constructor
 * allocates the reader state and setupReadableStreamBYOBReader() binds it
 * to the stream.
 */
class ReadableStreamBYOBReader {
  [kType] = 'ReadableStreamBYOBReader';

  /**
   * @param {ReadableStream} stream
   * @throws {ERR_INVALID_ARG_TYPE} If `stream` is not a ReadableStream.
   */
  constructor(stream) {
    if (!isReadableStream(stream))
      throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream);

    const close = {
      promise: undefined,
      resolve: undefined,
      reject: undefined,
    };
    this[kState] = {
      stream: undefined,
      requestIntoRequests: [],
      close,
    };
    setupReadableStreamBYOBReader(this, stream);
  }

  /**
   * Reads into the supplied view. Failures are reported through the
   * returned promise rather than thrown by the checks in this method.
   * @param {ArrayBufferView} view
   * @returns {Promise<{
   *   value : ArrayBufferView,
   *   done : boolean,
   * }>}
   */
  read(view) {
    if (!isReadableStreamBYOBReader(this)) {
      const brandError = new ERR_INVALID_THIS('ReadableStreamBYOBReader');
      return PromiseReject(brandError);
    }
    if (!isArrayBufferView(view)) {
      const expectedTypes = [
        'Buffer',
        'TypedArray',
        'DataView',
      ];
      return PromiseReject(
        new ERR_INVALID_ARG_TYPE('view', expectedTypes, view));
    }

    const byteLength = ArrayBufferViewGetByteLength(view);
    const buffer = ArrayBufferViewGetBuffer(view);
    const bufferByteLength = ArrayBufferPrototypeGetByteLength(buffer);

    // An empty view and an empty backing buffer are rejected together; the
    // message below also attributes a zero length to a detached buffer.
    const isEmpty = byteLength === 0 || bufferByteLength === 0;
    if (isEmpty) {
      return PromiseReject(
        new ERR_INVALID_STATE.TypeError(
          'View or Viewed ArrayBuffer is zero-length or detached',
        ),
      );
    }

    // The attachment check intentionally comes after the view validation.
    if (this[kState].stream === undefined) {
      const detachedError = new ERR_INVALID_STATE.TypeError(
        'The reader is not attached to a stream');
      return PromiseReject(detachedError);
    }

    const request = new ReadIntoRequest();
    readableStreamBYOBReaderRead(this, view, request);
    return request.promise;
  }

  /**
   * Releases the reader's lock; a no-op when the reader is not attached to
   * a stream.
   */
  releaseLock() {
    if (!isReadableStreamBYOBReader(this))
      throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
    if (this[kState].stream !== undefined)
      readableStreamBYOBReaderRelease(this);
  }

  /**
   * @readonly
   * @type {Promise<void>}
   */
  get closed() {
    if (isReadableStreamBYOBReader(this))
      return this[kState].close.promise;
    return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
  }

  /**
   * Cancels the underlying stream. Rejects when `this` is not a BYOB reader
   * or the reader is not attached to a stream.
   * @param {any} [reason]
   * @returns {Promise<void>}
   */
  cancel(reason = undefined) {
    if (!isReadableStreamBYOBReader(this)) {
      const brandError = new ERR_INVALID_THIS('ReadableStreamBYOBReader');
      return PromiseReject(brandError);
    }
    if (this[kState].stream === undefined) {
      const detachedError = new ERR_INVALID_STATE.TypeError(
        'The reader is not attached to a stream');
      return PromiseReject(detachedError);
    }
    return readableStreamReaderGenericCancel(this, reason);
  }

  [kInspect](depth, options) {
    const state = this[kState];
    return customInspect(depth, options, this[kType], {
      stream: state.stream,
      requestIntoRequests: state.requestIntoRequests.length,
      close: state.close.promise,
    });
  }
}

// Mark the public members enumerable and install the toStringTag.
ObjectDefineProperties(ReadableStreamBYOBReader.prototype, {
  closed: kEnumerableProperty,
  read: kEnumerableProperty,
  releaseLock: kEnumerableProperty,
  cancel: kEnumerableProperty,
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamBYOBReader.name),
});
1015
/**
 * Controller for non-byte ReadableStreams. The public methods delegate to
 * the readableStreamDefaultController* helpers; the symbol-keyed methods
 * are the internal cancel / pull / release hooks.
 */
class ReadableStreamDefaultController {
  [kType] = 'ReadableStreamDefaultController';
  [kState] = {};

  /**
   * Not user-constructible: anything other than the internal kSkipThrow
   * sentinel throws ERR_ILLEGAL_CONSTRUCTOR.
   */
  constructor(skipThrowSymbol = undefined) {
    if (skipThrowSymbol !== kSkipThrow)
      throw new ERR_ILLEGAL_CONSTRUCTOR();
  }

  /**
   * @readonly
   * @type {number}
   */
  get desiredSize() {
    return readableStreamDefaultControllerGetDesiredSize(this);
  }

  /**
   * Closes the controlled stream.
   * @throws {ERR_INVALID_STATE} If the controller can no longer close or
   *   enqueue.
   */
  close() {
    const canClose = readableStreamDefaultControllerCanCloseOrEnqueue(this);
    if (!canClose) {
      throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
    }
    readableStreamDefaultControllerClose(this);
  }

  /**
   * Enqueues a chunk on the controlled stream.
   * @param {any} [chunk]
   * @throws {ERR_INVALID_STATE} If the controller can no longer close or
   *   enqueue.
   */
  enqueue(chunk = undefined) {
    const canEnqueue = readableStreamDefaultControllerCanCloseOrEnqueue(this);
    if (!canEnqueue) {
      throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
    }
    readableStreamDefaultControllerEnqueue(this, chunk);
  }

  /**
   * Errors the controlled stream.
   * @param {any} [error]
   */
  error(error = undefined) {
    readableStreamDefaultControllerError(this, error);
  }

  // Internal hook: returns the result of the cancel steps.
  [kCancel](reason) {
    return readableStreamDefaultControllerCancelSteps(this, reason);
  }

  // Internal hook: runs the pull steps for the given read request.
  [kPull](readRequest) {
    readableStreamDefaultControllerPullSteps(this, readRequest);
  }

  // Internal hook: nothing to do on release for the default controller.
  [kRelease]() {}

  [kInspect](depth, options) {
    return customInspect(depth, options, this[kType], { });
  }
}

// Mark the public members enumerable and install the toStringTag.
ObjectDefineProperties(ReadableStreamDefaultController.prototype, {
  desiredSize: kEnumerableProperty,
  close: kEnumerableProperty,
  enqueue: kEnumerableProperty,
  error: kEnumerableProperty,
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamDefaultController.name),
});
1078
/**
 * Controller exposed to the underlying source of a byte ReadableStream.
 * User code cannot construct it: the constructor throws unless it is handed
 * the internal `kSkipThrow` token. Every public member brand-checks `this`.
 */
class ReadableByteStreamController {
  [kType] = 'ReadableByteStreamController';
  [kState] = {};

  constructor(skipThrowSymbol = undefined) {
    if (skipThrowSymbol === kSkipThrow) {
      return;
    }
    throw new ERR_ILLEGAL_CONSTRUCTOR();
  }

  /**
   * The current BYOB request. It is created lazily from the first pending
   * pull-into descriptor and cached in the controller state.
   * @readonly
   * @type {ReadableStreamBYOBRequest}
   */
  get byobRequest() {
    if (!isReadableByteStreamController(this)) {
      throw new ERR_INVALID_THIS('ReadableByteStreamController');
    }
    const state = this[kState];
    if (state.byobRequest !== null || !state.pendingPullIntos.length) {
      return state.byobRequest;
    }
    const head = state.pendingPullIntos[0];
    // The view spans only the part of the descriptor not yet filled.
    const unfilled = new Uint8Array(
      head.buffer,
      head.byteOffset + head.bytesFilled,
      head.byteLength - head.bytesFilled);
    state.byobRequest = createReadableStreamBYOBRequest(this, unfilled);
    return state.byobRequest;
  }

  /**
   * Value reported by readableByteStreamControllerGetDesiredSize().
   * @readonly
   * @type {number}
   */
  get desiredSize() {
    if (!isReadableByteStreamController(this)) {
      throw new ERR_INVALID_THIS('ReadableByteStreamController');
    }
    return readableByteStreamControllerGetDesiredSize(this);
  }

  /**
   * Closes the controller. Throws if a close was already requested or the
   * stream is no longer readable.
   */
  close() {
    if (!isReadableByteStreamController(this)) {
      throw new ERR_INVALID_THIS('ReadableByteStreamController');
    }
    const { closeRequested, stream } = this[kState];
    if (closeRequested) {
      throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
    }
    if (stream[kState].state !== 'readable') {
      throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
    }
    readableByteStreamControllerClose(this);
  }

  /**
   * Enqueues a chunk. Throws if the chunk (or its backing ArrayBuffer) has a
   * zero byte length, if a close was already requested, or if the stream is
   * no longer readable.
   * @param {ArrayBufferView} chunk
   */
  enqueue(chunk) {
    if (!isReadableByteStreamController(this)) {
      throw new ERR_INVALID_THIS('ReadableByteStreamController');
    }
    validateBuffer(chunk);
    const viewByteLength = ArrayBufferViewGetByteLength(chunk);
    const backingByteLength =
      ArrayBufferPrototypeGetByteLength(ArrayBufferViewGetBuffer(chunk));
    if (viewByteLength === 0 || backingByteLength === 0) {
      throw new ERR_INVALID_STATE.TypeError(
        'chunk ArrayBuffer is zero-length or detached');
    }
    const { closeRequested, stream } = this[kState];
    if (closeRequested) {
      throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
    }
    if (stream[kState].state !== 'readable') {
      throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
    }
    readableByteStreamControllerEnqueue(this, chunk);
  }

  /**
   * Errors the controller with the given reason.
   * @param {any} [error]
   */
  error(error = undefined) {
    if (!isReadableByteStreamController(this)) {
      throw new ERR_INVALID_THIS('ReadableByteStreamController');
    }
    readableByteStreamControllerError(this, error);
  }

  // Internal hook: forwards a cancel request to the controller's cancel steps.
  [kCancel](reason) {
    return readableByteStreamControllerCancelSteps(this, reason);
  }

  // Internal hook: forwards a read request to the controller's pull steps.
  [kPull](readRequest) {
    readableByteStreamControllerPullSteps(this, readRequest);
  }

  // Internal hook run on reader release: only the first pending pull-into
  // descriptor is kept, and it is re-tagged with reader type 'none'.
  [kRelease]() {
    const pending = this[kState].pendingPullIntos;
    if (pending.length <= 0) {
      return;
    }
    const head = pending[0];
    head.type = 'none';
    this[kState].pendingPullIntos = [head];
  }

  [kInspect](depth, options) {
    return customInspect(depth, options, this[kType], { });
  }
}

// Re-describe the public members with the shared kEnumerableProperty
// descriptor and install a non-writable Symbol.toStringTag on the prototype.
ObjectDefineProperties(ReadableByteStreamController.prototype, {
  byobRequest: kEnumerableProperty,
  desiredSize: kEnumerableProperty,
  close: kEnumerableProperty,
  enqueue: kEnumerableProperty,
  error: kEnumerableProperty,
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
});
1196
/**
 * Builds one branch stream for a tee: an object constructed with
 * ReadableStream as new.target, initialised to the 'readable' state and
 * wired to a default controller driven by the given algorithms
 * (high-water mark 1, every chunk sized 1).
 * @param {Function} start
 * @param {Function} pull
 * @param {Function} cancel
 * @returns {ReadableStream}
 */
function createTeeReadableStream(start, pull, cancel) {
  const initialize = function() {
    this[kType] = 'ReadableStream';
    this[kState] = {
      disturbed: false,
      state: 'readable',
      storedError: undefined,
      stream: undefined,
      transfer: {
        writable: undefined,
        port: undefined,
        promise: undefined,
      },
    };
    this[kIsClosedPromise] = createDeferredPromise();

    // Null-prototype underlying source carrying only the three algorithms.
    const underlyingSource = ObjectCreate(null, {
      start: { __proto__: null, value: start },
      pull: { __proto__: null, value: pull },
      cancel: { __proto__: null, value: cancel },
    });
    const sizeOfOne = () => 1;
    setupReadableStreamDefaultControllerFromSource(
      this, underlyingSource, 1, sizeOfOne);

    return makeTransferable(this);
  };

  return ReflectConstruct(initialize, [], ReadableStream);
}
1226
// Brand-check predicates built with isBrandCheck(), one per type-name string.
// They are used as guards elsewhere in this file, e.g. by the public
// ReadableByteStreamController members and by the tee implementations.
const isReadableStream =
  isBrandCheck('ReadableStream');
const isReadableByteStreamController =
  isBrandCheck('ReadableByteStreamController');
const isReadableStreamBYOBRequest =
  isBrandCheck('ReadableStreamBYOBRequest');
const isReadableStreamDefaultReader =
  isBrandCheck('ReadableStreamDefaultReader');
const isReadableStreamBYOBReader =
  isBrandCheck('ReadableStreamBYOBReader');
1237
1238// ---- ReadableStream Implementation
1239
/**
 * Pipes `source` into `dest`, reading with a default reader and writing with
 * a default writer until the source closes, either side errors, or `signal`
 * aborts.
 *
 * Never throws synchronously: setup failures are reported through a rejected
 * promise, and a failed setup leaves neither stream locked or disturbed.
 * @param {ReadableStream} source
 * @param {WritableStream} dest
 * @param {boolean} preventClose When true, `dest` is not closed once
 *   `source` closes.
 * @param {boolean} preventAbort When true, `dest` is not aborted when
 *   `source` errors or `signal` aborts.
 * @param {boolean} preventCancel When true, `source` is not canceled when
 *   `dest` errors/closes or `signal` aborts.
 * @param {AbortSignal} [signal]
 * @returns {Promise<void>} Settles once the pipe has shut down.
 */
function readableStreamPipeTo(
  source,
  dest,
  preventClose,
  preventAbort,
  preventCancel,
  signal) {

  let reader;
  let writer;
  let disposable;
  // Everything in this block can throw synchronously. We want to capture
  // the error and return a rejected promise instead. The signal is validated
  // before any lock is taken, and a reader that was acquired before the
  // writer constructor threw is released again, so that a failed call never
  // leaves `source` or `dest` locked.
  try {
    if (signal !== undefined)
      validateAbortSignal(signal, 'options.signal');
    reader = new ReadableStreamDefaultReader(source);
    writer = new WritableStreamDefaultWriter(dest);
  } catch (error) {
    if (reader !== undefined)
      readableStreamReaderGenericRelease(reader);
    return PromiseReject(error);
  }

  source[kState].disturbed = true;

  let shuttingDown = false;

  const promise = createDeferredPromise();

  let currentWrite = PromiseResolve();

  // The error here can be undefined. The rejected arg
  // tells us that the promise must be rejected even
  // when error is undefined.
  function finalize(rejected, error) {
    writableStreamDefaultWriterRelease(writer);
    readableStreamReaderGenericRelease(reader);
    if (signal !== undefined)
      disposable?.[SymbolDispose]();
    if (rejected)
      promise.reject(error);
    else
      promise.resolve();
  }

  // Waits for the most recent write; if another write was started while
  // waiting, waits again for that one.
  async function waitForCurrentWrite() {
    const write = currentWrite;
    await write;
    if (write !== currentWrite)
      await waitForCurrentWrite();
  }

  // One-shot shutdown that first runs `action`. When dest is still writable
  // with no close queued or in flight, pending writes are awaited first.
  // If `action` fulfills, the pipe settles per `rejected`/`originalError`;
  // if it rejects, the pipe rejects with that error.
  function shutdownWithAnAction(action, rejected, originalError) {
    if (shuttingDown) return;
    shuttingDown = true;
    if (dest[kState].state === 'writable' &&
        !writableStreamCloseQueuedOrInFlight(dest)) {
      PromisePrototypeThen(
        waitForCurrentWrite(),
        complete,
        (error) => finalize(true, error));
      return;
    }
    complete();

    function complete() {
      PromisePrototypeThen(
        action(),
        () => finalize(rejected, originalError),
        (error) => finalize(true, error));
    }
  }

  // One-shot shutdown without an action; same pending-write handling as
  // shutdownWithAnAction().
  function shutdown(rejected, error) {
    if (shuttingDown) return;
    shuttingDown = true;
    if (dest[kState].state === 'writable' &&
        !writableStreamCloseQueuedOrInFlight(dest)) {
      PromisePrototypeThen(
        waitForCurrentWrite(),
        () => finalize(rejected, error),
        (error) => finalize(true, error));
      return;
    }
    finalize(rejected, error);
  }

  // Runs when `signal` aborts: aborts dest and/or cancels source (subject to
  // preventAbort / preventCancel) and rejects the pipe with the abort reason.
  function abortAlgorithm() {
    let error;
    if (signal.reason instanceof AbortError) {
      // Cannot use the AbortError class here. It must be a DOMException.
      error = new DOMException(signal.reason.message, 'AbortError');
    } else {
      error = signal.reason;
    }

    const actions = [];
    if (!preventAbort) {
      ArrayPrototypePush(
        actions,
        () => {
          if (dest[kState].state === 'writable')
            return writableStreamAbort(dest, error);
          return PromiseResolve();
        });
    }
    if (!preventCancel) {
      ArrayPrototypePush(
        actions,
        () => {
          if (source[kState].state === 'readable')
            return readableStreamCancel(source, error);
          return PromiseResolve();
        });
    }

    shutdownWithAnAction(
      () => SafePromiseAll(actions, (action) => action()),
      true,
      error);
  }

  // Invokes `action` with the stored error right away if the stream is
  // already errored, otherwise when `promise` rejects.
  function watchErrored(stream, promise, action) {
    if (stream[kState].state === 'errored')
      action(stream[kState].storedError);
    else
      PromisePrototypeThen(promise, undefined, action);
  }

  // Invokes `action` right away if the stream is already closed, otherwise
  // when `promise` fulfills. Rejections are ignored here.
  function watchClosed(stream, promise, action) {
    if (stream[kState].state === 'closed')
      action();
    else
      PromisePrototypeThen(promise, action, () => {});
  }

  // Performs one read/write iteration. Resolves to true when the loop
  // should stop (shutting down, or the source signalled close).
  async function step() {
    if (shuttingDown)
      return true;
    await writer[kState].ready.promise;
    return new Promise((resolve, reject) => {
      readableStreamDefaultReaderRead(
        reader,
        {
          [kChunk](chunk) {
            currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
            setPromiseHandled(currentWrite);
            resolve(false);
          },
          [kClose]: () => resolve(true),
          [kError]: reject,
        });
    });
  }

  async function run() {
    // Run until step resolves as true
    while (!await step());
  }

  if (signal !== undefined) {
    if (signal.aborted) {
      abortAlgorithm();
      return promise.promise;
    }
    addAbortListener ??= require('events').addAbortListener;
    disposable = addAbortListener(signal, abortAlgorithm);
  }

  setPromiseHandled(run());

  // Source errored: abort dest unless preventAbort, then reject.
  watchErrored(source, reader[kState].close.promise, (error) => {
    if (!preventAbort) {
      return shutdownWithAnAction(
        () => writableStreamAbort(dest, error),
        true,
        error);
    }
    shutdown(true, error);
  });

  // Dest errored: cancel source unless preventCancel, then reject.
  watchErrored(dest, writer[kState].close.promise, (error) => {
    if (!preventCancel) {
      return shutdownWithAnAction(
        () => readableStreamCancel(source, error),
        true,
        error);
    }
    shutdown(true, error);
  });

  // Source closed: close dest unless preventClose, then resolve.
  watchClosed(source, reader[kState].close.promise, () => {
    if (!preventClose) {
      return shutdownWithAnAction(
        () => writableStreamDefaultWriterCloseWithErrorPropagation(writer));
    }
    shutdown();
  });

  // Dest is already closing or closed: nothing can be written.
  if (writableStreamCloseQueuedOrInFlight(dest) ||
      dest[kState].state === 'closed') {
    const error = new ERR_INVALID_STATE.TypeError(
      'Destination WritableStream is closed');
    if (!preventCancel) {
      shutdownWithAnAction(
        () => readableStreamCancel(source, error), true, error);
    } else {
      shutdown(true, error);
    }
  }

  return promise.promise;
}
1458
/**
 * Tees `stream`, dispatching on its controller type: byte streams go to
 * readableByteStreamTee(), everything else to readableStreamDefaultTee().
 * @param {ReadableStream} stream
 * @param {boolean} cloneForBranch2 Only forwarded to the default tee.
 * @returns {ReadableStream[]}
 */
function readableStreamTee(stream, cloneForBranch2) {
  const { controller } = stream[kState];
  return isReadableByteStreamController(controller) ?
    readableByteStreamTee(stream) :
    readableStreamDefaultTee(stream, cloneForBranch2);
}
1465
/**
 * Tees a default (non-byte) stream into two branches fed from one default
 * reader on `stream`.
 *
 * Each chunk read from the source is enqueued into every branch that has not
 * been canceled. When `cloneForBranch2` is true the second branch receives a
 * structured clone; if cloning fails, both branches are errored and the
 * source is canceled with the clone error (mirroring the byte tee's handling
 * of a failed clone) instead of throwing out of the microtask.
 *
 * The source is only canceled once both branches are canceled, with the
 * composite reason `[reason1, reason2]`.
 * @param {ReadableStream} stream
 * @param {boolean} cloneForBranch2
 * @returns {ReadableStream[]} `[branch1, branch2]`
 */
function readableStreamDefaultTee(stream, cloneForBranch2) {
  const reader = new ReadableStreamDefaultReader(stream);
  let reading = false;
  let canceled1 = false;
  let canceled2 = false;
  let reason1;
  let reason2;
  let branch1;
  let branch2;
  const cancelPromise = createDeferredPromise();

  // Shared pull algorithm for both branches; at most one source read is in
  // flight at a time.
  async function pullAlgorithm() {
    if (reading) return;
    reading = true;
    const readRequest = {
      [kChunk](value) {
        queueMicrotask(() => {
          reading = false;
          const value1 = value;
          let value2 = value;
          if (!canceled2 && cloneForBranch2) {
            try {
              value2 = structuredClone(value2);
            } catch (error) {
              // A non-cloneable chunk must not escape as an uncaught
              // exception: fail both branches and cancel the source.
              readableStreamDefaultControllerError(
                branch1[kState].controller,
                error);
              readableStreamDefaultControllerError(
                branch2[kState].controller,
                error);
              cancelPromise.resolve(readableStreamCancel(stream, error));
              return;
            }
          }
          if (!canceled1) {
            readableStreamDefaultControllerEnqueue(
              branch1[kState].controller,
              value1);
          }
          if (!canceled2) {
            readableStreamDefaultControllerEnqueue(
              branch2[kState].controller,
              value2);
          }
        });
      },
      [kClose]() {
        // The `process.nextTick()` is not part of the spec.
        // This approach was needed to avoid a race condition working with esm
        // Further information, see: https://github.com/nodejs/node/issues/39758
        process.nextTick(() => {
          reading = false;
          if (!canceled1)
            readableStreamDefaultControllerClose(branch1[kState].controller);
          if (!canceled2)
            readableStreamDefaultControllerClose(branch2[kState].controller);
          if (!canceled1 || !canceled2)
            cancelPromise.resolve();
        });
      },
      [kError]() {
        reading = false;
      },
    };
    readableStreamDefaultReaderRead(reader, readRequest);
  }

  // Cancel algorithm of branch 1: records the reason and cancels the source
  // only when branch 2 is canceled as well.
  function cancel1Algorithm(reason) {
    canceled1 = true;
    reason1 = reason;
    if (canceled2) {
      const compositeReason = [reason1, reason2];
      cancelPromise.resolve(readableStreamCancel(stream, compositeReason));
    }
    return cancelPromise.promise;
  }

  // Cancel algorithm of branch 2; mirror image of cancel1Algorithm().
  function cancel2Algorithm(reason) {
    canceled2 = true;
    reason2 = reason;
    if (canceled1) {
      const compositeReason = [reason1, reason2];
      cancelPromise.resolve(readableStreamCancel(stream, compositeReason));
    }
    return cancelPromise.promise;
  }

  branch1 =
    createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
  branch2 =
    createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);

  // A source error (reader close promise rejecting) errors both branches.
  PromisePrototypeThen(
    reader[kState].close.promise,
    undefined,
    (error) => {
      readableStreamDefaultControllerError(branch1[kState].controller, error);
      readableStreamDefaultControllerError(branch2[kState].controller, error);
      if (!canceled1 || !canceled2)
        cancelPromise.resolve();
    });

  return [branch1, branch2];
}
1559
/**
 * Tees a byte stream into two byte-stream branches.
 *
 * A single reader on `stream` is shared by both branches. It starts out as a
 * default reader and is swapped for a BYOB reader (and back) depending on
 * whether the pulling branch currently has a BYOB request. Only one source
 * read is in flight at a time; pulls arriving meanwhile are remembered in
 * `readAgainForBranch1` / `readAgainForBranch2` and replayed afterwards.
 *
 * The source is only canceled once both branches are canceled, with the
 * composite reason `[reason1, reason2]`.
 * @param {ReadableStream} stream
 * @returns {ReadableStream[]} `[branch1, branch2]`
 */
function readableByteStreamTee(stream) {
  assert(isReadableStream(stream));
  assert(isReadableByteStreamController(stream[kState].controller));

  let reader = new ReadableStreamDefaultReader(stream);
  let reading = false;
  let readAgainForBranch1 = false;
  let readAgainForBranch2 = false;
  let canceled1 = false;
  let canceled2 = false;
  let reason1;
  let reason2;
  let branch1;
  let branch2;
  const cancelDeferred = createDeferredPromise();

  // Errors both branches when `thisReader`'s close promise rejects, unless
  // `thisReader` has since been replaced as the current reader.
  function forwardReaderError(thisReader) {
    PromisePrototypeThen(
      thisReader[kState].close.promise,
      undefined,
      (error) => {
        if (thisReader !== reader) {
          return;
        }
        readableStreamDefaultControllerError(branch1[kState].controller, error);
        readableStreamDefaultControllerError(branch2[kState].controller, error);
        if (!canceled1 || !canceled2) {
          cancelDeferred.resolve();
        }
      },
    );
  }

  // Reads one chunk from the source with a default reader (replacing a BYOB
  // reader first if necessary) and enqueues it into every live branch.
  function pullWithDefaultReader() {
    if (isReadableStreamBYOBReader(reader)) {
      readableStreamBYOBReaderRelease(reader);
      reader = new ReadableStreamDefaultReader(stream);
      forwardReaderError(reader);
    }

    const readRequest = {
      [kChunk](chunk) {
        queueMicrotask(() => {
          readAgainForBranch1 = false;
          readAgainForBranch2 = false;
          const chunk1 = chunk;
          let chunk2 = chunk;

          // Only when both branches are live does branch 2 need its own
          // copy. A failed copy errors both branches and cancels the source.
          if (!canceled1 && !canceled2) {
            try {
              chunk2 = cloneAsUint8Array(chunk);
            } catch (error) {
              readableByteStreamControllerError(
                branch1[kState].controller,
                error,
              );
              readableByteStreamControllerError(
                branch2[kState].controller,
                error,
              );
              cancelDeferred.resolve(readableStreamCancel(stream, error));
              return;
            }
          }
          if (!canceled1) {
            readableByteStreamControllerEnqueue(
              branch1[kState].controller,
              chunk1,
            );
          }
          if (!canceled2) {
            readableByteStreamControllerEnqueue(
              branch2[kState].controller,
              chunk2,
            );
          }
          reading = false;

          // Replay a pull that was requested while this read was in flight.
          if (readAgainForBranch1) {
            pull1Algorithm();
          } else if (readAgainForBranch2) {
            pull2Algorithm();
          }
        });
      },
      [kClose]() {
        reading = false;

        if (!canceled1) {
          readableByteStreamControllerClose(branch1[kState].controller);
        }
        if (!canceled2) {
          readableByteStreamControllerClose(branch2[kState].controller);
        }
        // Answer any outstanding pull-into descriptors with zero bytes.
        if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
          readableByteStreamControllerRespond(branch1[kState].controller, 0);
        }
        if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
          readableByteStreamControllerRespond(branch2[kState].controller, 0);
        }
        if (!canceled1 || !canceled2) {
          cancelDeferred.resolve();
        }
      },
      [kError]() {
        reading = false;
      },
    };

    readableStreamDefaultReaderRead(reader, readRequest);
  }

  // Reads into `view` with a BYOB reader (replacing a default reader first
  // if necessary). The branch selected by `forBranch2` is the "byob" branch
  // and is answered with the filled view; the other branch gets a copy.
  function pullWithBYOBReader(view, forBranch2) {
    if (isReadableStreamDefaultReader(reader)) {
      readableStreamDefaultReaderRelease(reader);
      reader = new ReadableStreamBYOBReader(stream);
      forwardReaderError(reader);
    }

    const byobBranch = forBranch2 === true ? branch2 : branch1;
    const otherBranch = forBranch2 === false ? branch2 : branch1;
    const readIntoRequest = {
      [kChunk](chunk) {
        queueMicrotask(() => {
          readAgainForBranch1 = false;
          readAgainForBranch2 = false;
          const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
          const otherCanceled = forBranch2 === false ? canceled2 : canceled1;

          if (!otherCanceled) {
            let clonedChunk;

            // The copy for the other branch is taken before the byob branch
            // is answered with `chunk`. A failed copy errors both branches
            // and cancels the source.
            try {
              clonedChunk = cloneAsUint8Array(chunk);
            } catch (error) {
              readableByteStreamControllerError(
                byobBranch[kState].controller,
                error,
              );
              readableByteStreamControllerError(
                otherBranch[kState].controller,
                error,
              );
              cancelDeferred.resolve(readableStreamCancel(stream, error));
              return;
            }
            if (!byobCanceled) {
              readableByteStreamControllerRespondWithNewView(
                byobBranch[kState].controller,
                chunk,
              );
            }

            readableByteStreamControllerEnqueue(
              otherBranch[kState].controller,
              clonedChunk,
            );
          } else if (!byobCanceled) {
            readableByteStreamControllerRespondWithNewView(
              byobBranch[kState].controller,
              chunk,
            );
          }
          reading = false;

          // Replay a pull that was requested while this read was in flight.
          if (readAgainForBranch1) {
            pull1Algorithm();
          } else if (readAgainForBranch2) {
            pull2Algorithm();
          }
        });
      },
      [kClose](chunk) {
        reading = false;

        const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
        const otherCanceled = forBranch2 === false ? canceled2 : canceled1;

        if (!byobCanceled) {
          readableByteStreamControllerClose(byobBranch[kState].controller);
        }
        if (!otherCanceled) {
          readableByteStreamControllerClose(otherBranch[kState].controller);
        }
        // `chunk` is undefined when the close steps are invoked without a
        // view (readableStreamCancel() in this file calls them with no
        // argument).
        if (chunk !== undefined) {
          if (!byobCanceled) {
            readableByteStreamControllerRespondWithNewView(
              byobBranch[kState].controller,
              chunk,
            );
          }
          if (
            !otherCanceled &&
            otherBranch[kState].controller[kState].pendingPullIntos.length > 0
          ) {
            readableByteStreamControllerRespond(
              otherBranch[kState].controller,
              0,
            );
          }
        }
        if (!byobCanceled || !otherCanceled) {
          cancelDeferred.resolve();
        }
      },
      [kError]() {
        reading = false;
      },
    };
    readableStreamBYOBReaderRead(reader, view, readIntoRequest);
  }

  // Pull algorithm of branch 1. If a read is already in flight the pull is
  // only recorded; otherwise the read mode follows branch 1's byobRequest.
  function pull1Algorithm() {
    if (reading) {
      readAgainForBranch1 = true;
      return PromiseResolve();
    }
    reading = true;

    const byobRequest = branch1[kState].controller.byobRequest;
    if (byobRequest === null) {
      pullWithDefaultReader();
    } else {
      pullWithBYOBReader(byobRequest[kState].view, false);
    }
    return PromiseResolve();
  }

  // Pull algorithm of branch 2; mirror image of pull1Algorithm().
  function pull2Algorithm() {
    if (reading) {
      readAgainForBranch2 = true;
      return PromiseResolve();
    }
    reading = true;

    const byobRequest = branch2[kState].controller.byobRequest;
    if (byobRequest === null) {
      pullWithDefaultReader();
    } else {
      pullWithBYOBReader(byobRequest[kState].view, true);
    }
    return PromiseResolve();
  }

  // Cancel algorithm of branch 1: records the reason and cancels the source
  // only when branch 2 is canceled as well.
  function cancel1Algorithm(reason) {
    canceled1 = true;
    reason1 = reason;
    if (canceled2) {
      cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
    }
    return cancelDeferred.promise;
  }

  // Cancel algorithm of branch 2; mirror image of cancel1Algorithm().
  function cancel2Algorithm(reason) {
    canceled2 = true;
    reason2 = reason;
    if (canceled1) {
      cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
    }
    return cancelDeferred.promise;
  }

  branch1 = new ReadableStream({
    type: 'bytes',
    pull: pull1Algorithm,
    cancel: cancel1Algorithm,
  });
  branch2 = new ReadableStream({
    type: 'bytes',
    pull: pull2Algorithm,
    cancel: cancel2Algorithm,
  });

  forwardReaderError(reader);

  return [branch1, branch2];
}
1837
/**
 * Turns a pull-into descriptor into the view handed back to the reader: the
 * descriptor's buffer is passed through transferArrayBuffer() and wrapped in
 * a view of the descriptor's constructor covering the filled elements.
 * @param {object} desc Pull-into descriptor.
 * @returns {ArrayBufferView}
 * @throws {RangeError} If more bytes were filled than the descriptor holds.
 */
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
  const {
    buffer: sourceBuffer,
    bytesFilled: filled,
    byteLength: capacity,
    byteOffset: offset,
    ctor: ViewCtor,
    elementSize: unit,
  } = desc;

  if (filled > capacity) {
    throw new ERR_INVALID_STATE.RangeError('The buffer size is invalid');
  }
  // Only whole elements may have been filled.
  assert(!(filled % unit));

  const transferred = transferArrayBuffer(sourceBuffer);
  const elementCount = filled / unit;

  // Buffer views are created through Buffer.from() rather than `new`.
  return ViewCtor === Buffer ?
    Buffer.from(transferred, offset, elementCount) :
    new ViewCtor(transferred, offset, elementCount);
}
1858
/**
 * Reports whether a reader is currently attached to `stream`.
 * @param {ReadableStream} stream
 * @returns {boolean}
 */
function isReadableStreamLocked(stream) {
  const { reader } = stream[kState];
  return reader !== undefined;
}
1862
/**
 * Cancels `stream`: marks it disturbed, closes it, runs the close steps of
 * any pending BYOB read-into requests, then invokes the controller's cancel
 * hook with `reason`.
 * @param {ReadableStream} stream
 * @param {any} reason
 * @returns {Promise<void>} Fulfilled with undefined once the cancel hook
 *   fulfills (or immediately if already closed); rejected with the stored
 *   error if the stream is errored.
 */
function readableStreamCancel(stream, reason) {
  stream[kState].disturbed = true;

  const currentState = stream[kState].state;
  if (currentState === 'closed') {
    return PromiseResolve();
  }
  if (currentState === 'errored') {
    return PromiseReject(stream[kState].storedError);
  }

  readableStreamClose(stream);

  const { reader } = stream[kState];
  if (reader !== undefined && readableStreamHasBYOBReader(stream)) {
    let index = 0;
    while (index < reader[kState].readIntoRequests.length) {
      reader[kState].readIntoRequests[index][kClose]();
      index++;
    }
    reader[kState].readIntoRequests = [];
  }

  const cancelResult = ensureIsPromise(
    stream[kState].controller[kCancel],
    stream[kState].controller,
    reason);
  // Whatever the cancel hook fulfills with is mapped to undefined.
  return PromisePrototypeThen(cancelResult, () => {});
}
1888
/**
 * Moves a readable `stream` to the 'closed' state, resolves the stream's and
 * the reader's close promises, and runs the close steps of every pending
 * read request when a default reader is attached.
 * @param {ReadableStream} stream Must be in the 'readable' state.
 */
function readableStreamClose(stream) {
  assert(stream[kState].state === 'readable');
  stream[kState].state = 'closed';
  stream[kIsClosedPromise].resolve();

  const { reader } = stream[kState];
  if (reader === undefined) {
    return;
  }

  reader[kState].close.resolve();

  if (!readableStreamHasDefaultReader(stream)) {
    return;
  }
  let index = 0;
  while (index < reader[kState].readRequests.length) {
    reader[kState].readRequests[index][kClose]();
    index++;
  }
  reader[kState].readRequests = [];
}
1908
/**
 * Moves a readable `stream` to the 'errored' state, rejects the stream's and
 * the reader's close promises (marking both as handled), and runs the error
 * steps of every pending read or read-into request.
 * @param {ReadableStream} stream Must be in the 'readable' state.
 * @param {any} error
 */
function readableStreamError(stream, error) {
  assert(stream[kState].state === 'readable');
  stream[kState].state = 'errored';
  stream[kState].storedError = error;

  const streamClosed = stream[kIsClosedPromise];
  streamClosed.reject(error);
  setPromiseHandled(streamClosed.promise);

  const { reader } = stream[kState];
  if (reader === undefined) {
    return;
  }

  reader[kState].close.reject(error);
  setPromiseHandled(reader[kState].close.promise);

  if (readableStreamHasDefaultReader(stream)) {
    let index = 0;
    while (index < reader[kState].readRequests.length) {
      reader[kState].readRequests[index][kError](error);
      index++;
    }
    reader[kState].readRequests = [];
    return;
  }

  assert(readableStreamHasBYOBReader(stream));
  let index = 0;
  while (index < reader[kState].readIntoRequests.length) {
    reader[kState].readIntoRequests[index][kError](error);
    index++;
  }
  reader[kState].readIntoRequests = [];
}
1937
/**
 * Reports whether the reader attached to `stream` (if any) carries internal
 * state and is tagged as a ReadableStreamDefaultReader.
 * @param {ReadableStream} stream
 * @returns {boolean}
 */
function readableStreamHasDefaultReader(stream) {
  const attached = stream[kState].reader;
  return attached !== undefined &&
         attached[kState] !== undefined &&
         attached[kType] === 'ReadableStreamDefaultReader';
}
1949
/**
 * Number of read requests pending on the stream's default reader.
 * @param {ReadableStream} stream Must have a default reader attached.
 * @returns {number}
 */
function readableStreamGetNumReadRequests(stream) {
  assert(readableStreamHasDefaultReader(stream));
  const { readRequests } = stream[kState].reader[kState];
  return readRequests.length;
}
1954
/**
 * Reports whether the reader attached to `stream` (if any) carries internal
 * state and is tagged as a ReadableStreamBYOBReader.
 * @param {ReadableStream} stream
 * @returns {boolean}
 */
function readableStreamHasBYOBReader(stream) {
  const attached = stream[kState].reader;
  return attached !== undefined &&
         attached[kState] !== undefined &&
         attached[kType] === 'ReadableStreamBYOBReader';
}
1966
/**
 * Number of read-into requests pending on the stream's BYOB reader.
 * @param {ReadableStream} stream Must have a BYOB reader attached.
 * @returns {number}
 */
function readableStreamGetNumReadIntoRequests(stream) {
  assert(readableStreamHasBYOBReader(stream));
  const { readIntoRequests } = stream[kState].reader[kState];
  return readIntoRequests.length;
}
1971
/**
 * Removes the oldest pending read request from the stream's default reader
 * and settles it: close steps when `done` is truthy, chunk steps otherwise.
 * @param {ReadableStream} stream Must have a default reader with at least
 *   one pending read request.
 * @param {any} chunk
 * @param {boolean} done
 */
function readableStreamFulfillReadRequest(stream, chunk, done) {
  assert(readableStreamHasDefaultReader(stream));
  const readerState = stream[kState].reader[kState];
  assert(readerState.readRequests.length);
  const oldest = ArrayPrototypeShift(readerState.readRequests);

  // TODO(@jasnell): It's not clear under what exact conditions done
  // will be true here. The spec requires this check but none of the
  // WPT's or other tests trigger it. Will need to investigate how to
  // get coverage for this.
  if (!done) {
    oldest[kChunk](chunk);
    return;
  }
  oldest[kClose]();
}
1989
/**
 * Removes the oldest pending read-into request from the stream's BYOB reader
 * and settles it with `chunk`: close steps when `done` is truthy, chunk
 * steps otherwise.
 * @param {ReadableStream} stream Must have a BYOB reader with at least one
 *   pending read-into request.
 * @param {any} chunk
 * @param {boolean} done
 */
function readableStreamFulfillReadIntoRequest(stream, chunk, done) {
  assert(readableStreamHasBYOBReader(stream));
  const readerState = stream[kState].reader[kState];
  assert(readerState.readIntoRequests.length);
  const oldest = ArrayPrototypeShift(readerState.readIntoRequests);
  if (!done) {
    oldest[kChunk](chunk);
    return;
  }
  oldest[kClose](chunk);
}
2002
/**
 * Appends `readRequest` to the pending read requests of the stream's
 * default reader.
 * @param {ReadableStream} stream Must be readable with a default reader.
 * @param {object} readRequest
 */
function readableStreamAddReadRequest(stream, readRequest) {
  assert(readableStreamHasDefaultReader(stream));
  const { state, reader } = stream[kState];
  assert(state === 'readable');
  ArrayPrototypePush(reader[kState].readRequests, readRequest);
}
2008
/**
 * Appends `readIntoRequest` to the pending read-into requests of the
 * stream's BYOB reader.
 * @param {ReadableStream} stream Must not be errored and must have a BYOB
 *   reader attached.
 * @param {object} readIntoRequest
 */
function readableStreamAddReadIntoRequest(stream, readIntoRequest) {
  assert(readableStreamHasBYOBReader(stream));
  const { state, reader } = stream[kState];
  assert(state !== 'errored');
  ArrayPrototypePush(reader[kState].readIntoRequests, readIntoRequest);
}
2016
/**
 * Cancels the stream that `reader` is attached to.
 * @param {object} reader Must still be attached to a stream.
 * @param {any} reason
 * @returns {Promise<void>} The result of readableStreamCancel().
 */
function readableStreamReaderGenericCancel(reader, reason) {
  const owner = reader[kState].stream;
  assert(owner !== undefined);
  return readableStreamCancel(owner, reason);
}
2024
/**
 * Attaches `reader` to `stream` in both directions and gives the reader a
 * close promise matching the stream's current state: pending for 'readable',
 * already resolved for 'closed', already rejected (and marked handled) with
 * the stored error for 'errored'.
 * @param {object} reader
 * @param {ReadableStream} stream
 */
function readableStreamReaderGenericInitialize(reader, stream) {
  reader[kState].stream = stream;
  stream[kState].reader = reader;

  const currentState = stream[kState].state;
  if (currentState === 'readable') {
    reader[kState].close = createDeferredPromise();
  } else if (currentState === 'closed') {
    // Already settled, so there is nothing left to resolve or reject.
    reader[kState].close = {
      promise: PromiseResolve(),
      resolve: undefined,
      reject: undefined,
    };
  } else if (currentState === 'errored') {
    reader[kState].close = {
      promise: PromiseReject(stream[kState].storedError),
      resolve: undefined,
      reject: undefined,
    };
    setPromiseHandled(reader[kState].close.promise);
  }
}
2049
/**
 * Releases a default reader from its stream, then fails every read request
 * still pending on it with the "releasing" error.
 * @param {object} reader
 */
function readableStreamDefaultReaderRelease(reader) {
  readableStreamReaderGenericRelease(reader);
  const releasingError = lazyReadableReleasingError();
  readableStreamDefaultReaderErrorReadRequests(reader, releasingError);
}
2057
/**
 * Runs the error steps of every read request pending on `reader` with `e`,
 * then empties the reader's read-request list.
 * @param {object} reader
 * @param {any} e
 */
function readableStreamDefaultReaderErrorReadRequests(reader, e) {
  let index = 0;
  while (index < reader[kState].readRequests.length) {
    reader[kState].readRequests[index][kError](e);
    index += 1;
  }
  reader[kState].readRequests = [];
}
2064
/**
 * Releases a BYOB reader from its stream, then fails every read-into request
 * still pending on it with the "releasing" error.
 * @param {object} reader
 */
function readableStreamBYOBReaderRelease(reader) {
  readableStreamReaderGenericRelease(reader);
  const releasingError = lazyReadableReleasingError();
  readableStreamBYOBReaderErrorReadIntoRequests(reader, releasingError);
}
2072
/**
 * Runs the error steps of every read-into request pending on `reader` with
 * `e`, then empties the reader's read-into-request list.
 * @param {object} reader
 * @param {any} e
 */
function readableStreamBYOBReaderErrorReadIntoRequests(reader, e) {
  let index = 0;
  while (index < reader[kState].readIntoRequests.length) {
    reader[kState].readIntoRequests[index][kError](e);
    index += 1;
  }
  reader[kState].readIntoRequests = [];
}
2079
/**
 * Detaches `reader` from its stream. The reader's close promise ends up
 * rejected with the "released" error and marked as handled, the controller's
 * release hook runs, and both sides drop their reference to each other.
 * @param {object} reader Must be the reader currently attached to its stream.
 */
function readableStreamReaderGenericRelease(reader) {
  const owner = reader[kState].stream;
  assert(owner !== undefined);
  assert(owner[kState].reader === reader);

  const releasedError = lazyReadableReleasedError();
  if (owner[kState].state !== 'readable') {
    // The existing close promise is replaced by a fresh, already rejected
    // one.
    reader[kState].close = {
      promise: PromiseReject(releasedError),
      resolve: undefined,
      reject: undefined,
    };
  } else {
    reader[kState].close.reject?.(releasedError);
  }
  setPromiseHandled(reader[kState].close.promise);

  owner[kState].controller[kRelease]();

  owner[kState].reader = undefined;
  reader[kState].stream = undefined;
}
2104
/**
 * Performs a BYOB read on the reader's stream.
 *
 * Marks the stream as disturbed. An errored stream fails the request
 * immediately with the stored error; otherwise the request is passed to
 * the byte controller's pull-into machinery.
 * @param {object} reader
 * @param {ArrayBufferView} view Destination view supplied by the caller.
 * @param {object} readIntoRequest Request object exposing `[kError]`.
 */
function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
  const ownerStream = reader[kState].stream;
  assert(ownerStream !== undefined);
  const streamState = ownerStream[kState];
  streamState.disturbed = true;
  if (streamState.state !== 'errored') {
    readableByteStreamControllerPullInto(
      streamState.controller,
      view,
      readIntoRequest);
    return;
  }
  readIntoRequest[kError](streamState.storedError);
}
2120
/**
 * Performs a default read on the reader's stream.
 *
 * Marks the stream as disturbed, then dispatches on the stream state:
 * 'closed' runs the request's close steps, 'errored' runs its error steps
 * with the stored error, and 'readable' delegates to the controller's
 * `[kPull]()` steps.
 * @param {object} reader
 * @param {object} readRequest Request exposing `[kClose]` / `[kError]`.
 */
function readableStreamDefaultReaderRead(reader, readRequest) {
  const ownerStream = reader[kState].stream;
  assert(ownerStream !== undefined);
  const streamState = ownerStream[kState];
  streamState.disturbed = true;

  const { state } = streamState;
  if (state === 'closed') {
    readRequest[kClose]();
  } else if (state === 'errored') {
    readRequest[kError](streamState.storedError);
  } else if (state === 'readable') {
    streamState.controller[kPull](readRequest);
  }
}
2138
/**
 * Attaches a BYOB reader to `stream`.
 * @param {object} reader
 * @param {object} stream
 * @throws {TypeError} ERR_INVALID_STATE when the stream is already locked.
 * @throws ERR_INVALID_ARG_VALUE when the stream's controller is not a
 *   byte stream controller.
 */
function setupReadableStreamBYOBReader(reader, stream) {
  if (isReadableStreamLocked(stream)) {
    throw new ERR_INVALID_STATE.TypeError('ReadableStream is locked');
  }
  if (!isReadableByteStreamController(stream[kState].controller)) {
    throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be a byte stream');
  }
  readableStreamReaderGenericInitialize(reader, stream);
  reader[kState].readIntoRequests = [];
}
2150
/**
 * Attaches a default reader to `stream` and gives it an empty list of
 * pending read requests.
 * @param {object} reader
 * @param {object} stream
 * @throws {TypeError} ERR_INVALID_STATE when the stream is already locked.
 */
function setupReadableStreamDefaultReader(reader, stream) {
  if (isReadableStreamLocked(stream)) {
    throw new ERR_INVALID_STATE.TypeError('ReadableStream is locked');
  }
  readableStreamReaderGenericInitialize(reader, stream);
  reader[kState].readRequests = [];
}
2157
/**
 * Requests that the default controller close its stream.
 *
 * Does nothing when the controller can no longer close or enqueue.
 * Otherwise `closeRequested` is set; the stream is closed right away only
 * when the queue is already empty.
 * @param {object} controller
 */
function readableStreamDefaultControllerClose(controller) {
  if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller))
    return;
  controller[kState].closeRequested = true;
  if (controller[kState].queue.length)
    return;
  readableStreamDefaultControllerClearAlgorithms(controller);
  readableStreamClose(controller[kState].stream);
}
2167
/**
 * Enqueues `chunk` on a default controller.
 *
 * A chunk is handed straight to a waiting read request when the stream is
 * locked and has pending reads; otherwise it is sized with the controller's
 * size algorithm and appended to the queue. A failure while sizing or
 * queueing errors the controller and is rethrown. Finally the controller
 * pulls again if needed.
 * @param {object} controller
 * @param {any} chunk
 */
function readableStreamDefaultControllerEnqueue(controller, chunk) {
  if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller))
    return;

  const ownerStream = controller[kState].stream;
  const hasWaitingRead =
    isReadableStreamLocked(ownerStream) &&
    readableStreamGetNumReadRequests(ownerStream);

  if (hasWaitingRead) {
    readableStreamFulfillReadRequest(ownerStream, chunk, false);
  } else {
    try {
      const measuredSize = FunctionPrototypeCall(
        controller[kState].sizeAlgorithm,
        undefined,
        chunk);
      enqueueValueWithSize(controller, chunk, measuredSize);
    } catch (error) {
      readableStreamDefaultControllerError(controller, error);
      throw error;
    }
  }
  readableStreamDefaultControllerCallPullIfNeeded(controller);
}
2194
/**
 * Reports backpressure as the logical negation of
 * `readableStreamDefaultControllerShouldCallPull()`.
 * @param {object} controller
 * @returns {boolean}
 */
function readableStreamDefaultControllerHasBackpressure(controller) {
  const wantsPull = readableStreamDefaultControllerShouldCallPull(controller);
  return !wantsPull;
}
2198
/**
 * Tells whether the controller may still close or enqueue: no close has
 * been requested and the stream is in the 'readable' state.
 * @param {object} controller
 * @returns {boolean}
 */
function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
  const controllerState = controller[kState];
  const ownerStream = controllerState.stream;
  if (controllerState.closeRequested)
    return false;
  return ownerStream[kState].state === 'readable';
}
2206
/**
 * Computes the controller's desired size.
 * @param {object} controller
 * @returns {number|null} `null` for an errored stream, `0` for a closed
 *   one, otherwise `highWaterMark - queueTotalSize`.
 */
function readableStreamDefaultControllerGetDesiredSize(controller) {
  const {
    stream,
    highWaterMark,
    queueTotalSize,
  } = controller[kState];
  const { state } = stream[kState];
  if (state === 'errored')
    return null;
  if (state === 'closed')
    return 0;
  return highWaterMark - queueTotalSize;
}
2220
/**
 * Decides whether the pull algorithm should run.
 *
 * Never pulls when the controller cannot close/enqueue or has not started.
 * Pulls when a locked stream has pending read requests, and otherwise only
 * while the desired size is positive.
 * @param {object} controller
 * @returns {boolean}
 */
function readableStreamDefaultControllerShouldCallPull(controller) {
  const ownerStream = controller[kState].stream;
  const active =
    readableStreamDefaultControllerCanCloseOrEnqueue(controller) &&
    controller[kState].started;
  if (!active)
    return false;

  const hasWaitingRead =
    isReadableStreamLocked(ownerStream) &&
    readableStreamGetNumReadRequests(ownerStream);
  if (hasWaitingRead)
    return true;

  const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
  assert(desiredSize !== null);

  return desiredSize > 0;
}
2239
/**
 * Runs the pull algorithm when the controller wants more data.
 *
 * While a pull is in flight, further requests only set `pullAgain`; when
 * the in-flight pull settles successfully and `pullAgain` is set, another
 * pull is attempted. A rejected pull errors the controller.
 * @param {object} controller
 */
function readableStreamDefaultControllerCallPullIfNeeded(controller) {
  if (!readableStreamDefaultControllerShouldCallPull(controller))
    return;
  if (controller[kState].pulling) {
    controller[kState].pullAgain = true;
    return;
  }
  assert(!controller[kState].pullAgain);
  controller[kState].pulling = true;

  const onPullSettled = () => {
    const controllerState = controller[kState];
    controllerState.pulling = false;
    if (!controllerState.pullAgain)
      return;
    controllerState.pullAgain = false;
    readableStreamDefaultControllerCallPullIfNeeded(controller);
  };
  const onPullFailed =
    (error) => readableStreamDefaultControllerError(controller, error);

  PromisePrototypeThen(
    ensureIsPromise(controller[kState].pullAlgorithm, controller),
    onPullSettled,
    onPullFailed);
}
2260
/**
 * Drops the controller's pull, cancel and size algorithms by setting each
 * of them to `undefined`.
 * @param {object} controller
 */
function readableStreamDefaultControllerClearAlgorithms(controller) {
  const controllerState = controller[kState];
  controllerState.pullAlgorithm = undefined;
  controllerState.cancelAlgorithm = undefined;
  controllerState.sizeAlgorithm = undefined;
}
2266
/**
 * Errors the controller's stream, but only while it is still 'readable':
 * the queue is reset, the algorithms are cleared, and the stream is put
 * into the errored state with `error`.
 * @param {object} controller
 * @param {any} error
 */
function readableStreamDefaultControllerError(controller, error) {
  const ownerStream = controller[kState].stream;
  if (ownerStream[kState].state !== 'readable')
    return;
  resetQueue(controller);
  readableStreamDefaultControllerClearAlgorithms(controller);
  readableStreamError(ownerStream, error);
}
2277
/**
 * Cancel steps for a default controller: resets the queue and invokes the
 * cancel algorithm with `reason`. The algorithms are cleared afterwards
 * even when the cancel algorithm throws.
 * @param {object} controller
 * @param {any} reason
 * @returns {any} The cancel algorithm's return value.
 */
function readableStreamDefaultControllerCancelSteps(controller, reason) {
  resetQueue(controller);
  try {
    return controller[kState].cancelAlgorithm(reason);
  } finally {
    readableStreamDefaultControllerClearAlgorithms(controller);
  }
}
2287
/**
 * Pull steps for a default controller.
 *
 * With an empty queue the request is parked on the stream and a pull is
 * attempted. Otherwise the head chunk is dequeued; if that drained the
 * queue after a close request the stream is closed, else another pull is
 * attempted. The dequeued chunk is then delivered to the request.
 * @param {object} controller
 * @param {object} readRequest Request exposing `[kChunk]`.
 */
function readableStreamDefaultControllerPullSteps(controller, readRequest) {
  const {
    stream,
    queue,
  } = controller[kState];

  if (!queue.length) {
    readableStreamAddReadRequest(stream, readRequest);
    readableStreamDefaultControllerCallPullIfNeeded(controller);
    return;
  }

  const chunk = dequeueValue(controller);
  const drainedAfterClose =
    controller[kState].closeRequested && !queue.length;
  if (drainedAfterClose) {
    readableStreamDefaultControllerClearAlgorithms(controller);
    readableStreamClose(stream);
  } else {
    readableStreamDefaultControllerCallPullIfNeeded(controller);
  }
  readRequest[kChunk](chunk);
}
2307
/**
 * Wires a default controller to `stream`.
 *
 * Installs the controller's initial state, registers the controller on the
 * stream (including a bound `controller.error` under
 * `kControllerErrorFunction`), and calls the start algorithm. Once the
 * start result settles, the controller is marked started and a first pull
 * is attempted; a rejected start errors the controller.
 * @param {object} stream
 * @param {object} controller
 * @param {Function} startAlgorithm
 * @param {Function} pullAlgorithm
 * @param {Function} cancelAlgorithm
 * @param {number} highWaterMark
 * @param {Function} sizeAlgorithm
 */
function setupReadableStreamDefaultController(
  stream,
  controller,
  startAlgorithm,
  pullAlgorithm,
  cancelAlgorithm,
  highWaterMark,
  sizeAlgorithm) {
  assert(stream[kState].controller === undefined);

  const initialState = {
    cancelAlgorithm,
    closeRequested: false,
    highWaterMark,
    pullAgain: false,
    pullAlgorithm,
    pulling: false,
    queue: [],
    queueTotalSize: 0,
    started: false,
    sizeAlgorithm,
    stream,
  };
  controller[kState] = initialState;
  stream[kState].controller = controller;
  stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);

  const startResult = startAlgorithm();

  const onStarted = () => {
    controller[kState].started = true;
    assert(!controller[kState].pulling);
    assert(!controller[kState].pullAgain);
    readableStreamDefaultControllerCallPullIfNeeded(controller);
  };
  const onStartFailed =
    (error) => readableStreamDefaultControllerError(controller, error);

  PromisePrototypeThen(PromiseResolve(startResult), onStarted, onStartFailed);
}
2345
/**
 * Creates a default controller for `stream` from an underlying source.
 *
 * `start`, `pull` and `cancel` are read from `source` (in that order).
 * Each present method is bound to `source` (`start`/`pull` also receive the
 * controller as first argument); missing ones fall back to the no-op
 * algorithms.
 * @param {object} stream
 * @param {object} [source] Underlying source; may be null or undefined.
 * @param {number} highWaterMark
 * @param {Function} sizeAlgorithm
 */
function setupReadableStreamDefaultControllerFromSource(
  stream,
  source,
  highWaterMark,
  sizeAlgorithm) {
  const controller = new ReadableStreamDefaultController(kSkipThrow);
  const userStart = source?.start;
  const userPull = source?.pull;
  const userCancel = source?.cancel;

  let startAlgorithm = nonOpStart;
  if (userStart)
    startAlgorithm = FunctionPrototypeBind(userStart, source, controller);

  let pullAlgorithm = nonOpPull;
  if (userPull)
    pullAlgorithm = FunctionPrototypeBind(userPull, source, controller);

  let cancelAlgorithm = nonOpCancel;
  if (userCancel)
    cancelAlgorithm = FunctionPrototypeBind(userCancel, source);

  setupReadableStreamDefaultController(
    stream,
    controller,
    startAlgorithm,
    pullAlgorithm,
    cancelAlgorithm,
    highWaterMark,
    sizeAlgorithm);
}
2375
/**
 * Requests that the byte controller close its stream.
 *
 * No-op when a close was already requested or the stream is not
 * 'readable'. With bytes still queued only `closeRequested` is recorded.
 * A head pull-into descriptor that is partially filled makes the close
 * fail: the controller is errored and the error is thrown.
 * @param {object} controller
 * @throws {TypeError} ERR_INVALID_STATE ('Partial read').
 */
function readableByteStreamControllerClose(controller) {
  const controllerState = controller[kState];
  const {
    closeRequested,
    pendingPullIntos,
    queueTotalSize,
    stream,
  } = controllerState;

  const alreadyDone = closeRequested || stream[kState].state !== 'readable';
  if (alreadyDone)
    return;

  if (queueTotalSize) {
    controllerState.closeRequested = true;
    return;
  }

  if (pendingPullIntos.length && pendingPullIntos[0].bytesFilled > 0) {
    const error = new ERR_INVALID_STATE.TypeError('Partial read');
    readableByteStreamControllerError(controller, error);
    throw error;
  }

  readableByteStreamControllerClearAlgorithms(controller);
  readableStreamClose(stream);
}
2404
/**
 * Delivers a pull-into descriptor to the reader request it belongs to.
 *
 * For a closed stream the descriptor's `bytesFilled` is reset to 0 and the
 * request is fulfilled with `done = true`. The descriptor is converted to
 * a view and handed to a read request ('default') or a read-into request
 * ('byob').
 * @param {object} stream Must not be errored.
 * @param {object} desc Pull-into descriptor whose type is not 'none'.
 */
function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
  assert(stream[kState].state !== 'errored');
  assert(desc.type !== 'none');

  const done = stream[kState].state === 'closed';
  if (done)
    desc.bytesFilled = 0;

  const filledView =
    readableByteStreamControllerConvertPullIntoDescriptor(desc);

  if (desc.type !== 'default') {
    assert(desc.type === 'byob');
    readableStreamFulfillReadIntoRequest(stream, filledView, done);
    return;
  }
  readableStreamFulfillReadRequest(stream, filledView, done);
}
2425
/**
 * Invalidates the controller's current BYOB request, if there is one: the
 * request loses its controller and view, and the controller forgets the
 * request.
 * @param {object} controller
 */
function readableByteStreamControllerInvalidateBYOBRequest(controller) {
  const controllerState = controller[kState];
  const request = controllerState.byobRequest;
  if (request === null)
    return;
  request[kState].controller = undefined;
  request[kState].view = null;
  controllerState.byobRequest = null;
}
2433
/**
 * Drops the byte controller's pull and cancel algorithms by setting both
 * to `undefined`.
 * @param {object} controller
 */
function readableByteStreamControllerClearAlgorithms(controller) {
  const controllerState = controller[kState];
  controllerState.pullAlgorithm = undefined;
  controllerState.cancelAlgorithm = undefined;
}
2438
/**
 * Invalidates the current BYOB request and replaces the list of pending
 * pull-into descriptors with a new empty array.
 * @param {object} controller
 */
function readableByteStreamControllerClearPendingPullIntos(controller) {
  readableByteStreamControllerInvalidateBYOBRequest(controller);
  const emptyList = [];
  controller[kState].pendingPullIntos = emptyList;
}
2443
/**
 * Computes the byte controller's desired size.
 * @param {object} controller
 * @returns {number|null} `null` for an errored stream, `0` for a closed
 *   one, otherwise `highWaterMark - queueTotalSize`.
 */
function readableByteStreamControllerGetDesiredSize(controller) {
  const {
    stream,
    highWaterMark,
    queueTotalSize,
  } = controller[kState];
  const { state } = stream[kState];
  if (state === 'errored')
    return null;
  if (state === 'closed')
    return 0;
  return highWaterMark - queueTotalSize;
}
2456
/**
 * Decides whether the byte controller's pull algorithm should run.
 *
 * Never pulls when the stream is not 'readable', a close was requested, or
 * the controller has not started. Pulls when a default reader has pending
 * read requests or a BYOB reader has pending read-into requests, and
 * otherwise only while the desired size is positive.
 * @param {object} controller
 * @returns {boolean}
 */
function readableByteStreamControllerShouldCallPull(controller) {
  const ownerStream = controller[kState].stream;

  const inactive =
    ownerStream[kState].state !== 'readable' ||
    controller[kState].closeRequested ||
    !controller[kState].started;
  if (inactive)
    return false;

  const defaultReaderWaiting =
    readableStreamHasDefaultReader(ownerStream) &&
    readableStreamGetNumReadRequests(ownerStream) > 0;
  if (defaultReaderWaiting)
    return true;

  const byobReaderWaiting =
    readableStreamHasBYOBReader(ownerStream) &&
    readableStreamGetNumReadIntoRequests(ownerStream) > 0;
  if (byobReaderWaiting)
    return true;

  const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
  assert(desiredSize !== null);

  return desiredSize > 0;
}
2481
/**
 * Reacts to bytes having been taken out of the queue: closes the stream
 * when the queue is empty and a close was requested, otherwise attempts
 * another pull.
 * @param {object} controller Controller of a 'readable' stream.
 */
function readableByteStreamControllerHandleQueueDrain(controller) {
  const {
    closeRequested,
    queueTotalSize,
    stream,
  } = controller[kState];
  assert(stream[kState].state === 'readable');

  if (queueTotalSize || !closeRequested) {
    readableByteStreamControllerCallPullIfNeeded(controller);
    return;
  }
  readableByteStreamControllerClearAlgorithms(controller);
  readableStreamClose(stream);
}
2496
/**
 * Services a BYOB read by filling `view` from the controller.
 *
 * The view's buffer is transferred and described by a 'byob' pull-into
 * descriptor. Depending on the controller's state the request is:
 *  - failed, when transferring the buffer throws;
 *  - queued behind earlier pending pull-intos;
 *  - closed with an empty view, when the stream is closed;
 *  - fulfilled immediately from queued bytes;
 *  - failed (and the controller errored), when queued bytes could not fill
 *    a whole element and a close was requested;
 *  - otherwise parked until more bytes arrive, after attempting a pull.
 * @param {object} controller
 * @param {ArrayBufferView} view
 * @param {object} readIntoRequest Exposes `[kChunk]`, `[kClose]`, `[kError]`.
 */
function readableByteStreamControllerPullInto(
  controller,
  view,
  readIntoRequest) {
  const {
    closeRequested,
    stream,
    pendingPullIntos,
  } = controller[kState];

  // A DataView is treated as a byte-wise view; typed arrays keep their
  // own constructor and element size.
  const isTypedArray = isArrayBufferView(view) && !isDataView(view);
  const elementSize = isTypedArray ? view.constructor.BYTES_PER_ELEMENT : 1;
  const ctor = isTypedArray ? view.constructor : DataView;

  const buffer = ArrayBufferViewGetBuffer(view);
  const byteOffset = ArrayBufferViewGetByteOffset(view);
  const byteLength = ArrayBufferViewGetByteLength(view);
  const bufferByteLength = ArrayBufferPrototypeGetByteLength(buffer);

  let transferredBuffer;
  try {
    transferredBuffer = transferArrayBuffer(buffer);
  } catch (error) {
    readIntoRequest[kError](error);
    return;
  }

  const desc = {
    buffer: transferredBuffer,
    bufferByteLength,
    byteOffset,
    byteLength,
    bytesFilled: 0,
    elementSize,
    ctor,
    type: 'byob',
  };

  if (pendingPullIntos.length) {
    ArrayPrototypePush(pendingPullIntos, desc);
    readableStreamAddReadIntoRequest(stream, readIntoRequest);
    return;
  }

  if (stream[kState].state === 'closed') {
    readIntoRequest[kClose](new ctor(desc.buffer, byteOffset, 0));
    return;
  }

  if (controller[kState].queueTotalSize) {
    const filled =
      readableByteStreamControllerFillPullIntoDescriptorFromQueue(
        controller,
        desc);
    if (filled) {
      const filledView =
        readableByteStreamControllerConvertPullIntoDescriptor(desc);
      readableByteStreamControllerHandleQueueDrain(controller);
      readIntoRequest[kChunk](filledView);
      return;
    }
    if (closeRequested) {
      const error = new ERR_INVALID_STATE.TypeError('ReadableStream closed');
      readableByteStreamControllerError(controller, error);
      readIntoRequest[kError](error);
      return;
    }
  }

  ArrayPrototypePush(pendingPullIntos, desc);
  readableStreamAddReadIntoRequest(stream, readIntoRequest);
  readableByteStreamControllerCallPullIfNeeded(controller);
}
2565
/**
 * Shared tail of the respond operations: invalidates the BYOB request,
 * applies `bytesWritten` to the head pull-into descriptor according to the
 * stream state, then attempts another pull.
 * @param {object} controller
 * @param {number} bytesWritten
 * @throws {TypeError} ERR_INVALID_STATE when bytes were written for a
 *   closed stream, or none were written for a readable one.
 */
function readableByteStreamControllerRespondInternal(controller, bytesWritten) {
  const {
    stream,
    pendingPullIntos,
  } = controller[kState];
  const headDescriptor = pendingPullIntos[0];
  readableByteStreamControllerInvalidateBYOBRequest(controller);

  const { state } = stream[kState];
  if (state !== 'closed') {
    assert(state === 'readable');
    if (!bytesWritten)
      throw new ERR_INVALID_STATE.TypeError('View cannot be zero-length');
    readableByteStreamControllerRespondInReadableState(
      controller,
      bytesWritten,
      headDescriptor);
  } else {
    if (bytesWritten)
      throw new ERR_INVALID_STATE.TypeError(
        'Controller is closed but view is not zero-length');
    readableByteStreamControllerRespondInClosedState(controller, headDescriptor);
  }
  readableByteStreamControllerCallPullIfNeeded(controller);
}
2589
/**
 * Handles a BYOB `respond(bytesWritten)` call: validates `bytesWritten`
 * against the stream state and the head pull-into descriptor, re-transfers
 * the descriptor's buffer, and continues with the shared respond steps.
 * @param {object} controller Must have at least one pending pull-into.
 * @param {number} bytesWritten
 * @throws ERR_INVALID_ARG_VALUE when `bytesWritten` is non-zero for a
 *   closed stream, zero for a readable one, or (as a RangeError) exceeds
 *   the space left in the descriptor.
 */
function readableByteStreamControllerRespond(controller, bytesWritten) {
  const {
    pendingPullIntos,
    stream,
  } = controller[kState];
  assert(pendingPullIntos.length);
  const headDescriptor = pendingPullIntos[0];

  if (stream[kState].state !== 'closed') {
    assert(stream[kState].state === 'readable');

    if (!bytesWritten)
      throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten);

    const filledAfter = headDescriptor.bytesFilled + bytesWritten;
    if (filledAfter > headDescriptor.byteLength)
      throw new ERR_INVALID_ARG_VALUE.RangeError('bytesWritten', bytesWritten);
  } else if (bytesWritten !== 0) {
    throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten);
  }

  headDescriptor.buffer = transferArrayBuffer(headDescriptor.buffer);

  readableByteStreamControllerRespondInternal(controller, bytesWritten);
}
2615
/**
 * Respond steps for a closed stream: a detached ('none') head descriptor
 * is dropped, and when a BYOB reader is attached every outstanding
 * read-into request is committed with the next pending descriptor.
 * @param {object} controller
 * @param {object} desc Head pull-into descriptor; must have no bytes filled.
 */
function readableByteStreamControllerRespondInClosedState(controller, desc) {
  assert(!desc.bytesFilled);
  if (desc.type === 'none') {
    readableByteStreamControllerShiftPendingPullInto(controller);
  }

  const ownerStream = controller[kState].stream;
  if (!readableStreamHasBYOBReader(ownerStream))
    return;

  while (readableStreamGetNumReadIntoRequests(ownerStream) > 0) {
    const nextDescriptor =
      readableByteStreamControllerShiftPendingPullInto(controller);
    readableByteStreamControllerCommitPullIntoDescriptor(
      ownerStream,
      nextDescriptor);
  }
}
2632
/**
 * Records that `size` more bytes were written into `desc`.
 *
 * Asserts that `desc` is the head pending pull-into (when any are pending)
 * and that no BYOB request is currently outstanding.
 * @param {object} controller
 * @param {number} size Number of bytes to add to `desc.bytesFilled`.
 * @param {object} desc
 */
function readableByteStreamControllerFillHeadPullIntoDescriptor(
  controller,
  size,
  desc) {
  const controllerState = controller[kState];
  const pending = controllerState.pendingPullIntos;
  const currentRequest = controllerState.byobRequest;
  assert(!pending.length || pending[0] === desc);
  assert(currentRequest === null);
  desc.bytesFilled += size;
}
2645
/**
 * Enqueues the bytes viewed by `chunk` on a byte controller.
 *
 * Ignored when a close was requested or the stream is not 'readable'.
 * The chunk's buffer is transferred; an outstanding head pull-into has its
 * BYOB request invalidated and its buffer re-transferred (a detached,
 * 'none' descriptor is flushed into the queue). The chunk is then routed
 * by reader kind: straight to a waiting default read request, into the
 * queue followed by pull-into processing for a BYOB reader, or just into
 * the queue for an unlocked stream. Finally a pull is attempted.
 * @param {object} controller
 * @param {ArrayBufferView} chunk
 * @throws {TypeError} ERR_INVALID_STATE when the head pull-into's buffer
 *   is detached.
 */
function readableByteStreamControllerEnqueue(controller, chunk) {
  const {
    closeRequested,
    pendingPullIntos,
    queue,
    stream,
  } = controller[kState];

  const buffer = ArrayBufferViewGetBuffer(chunk);
  const byteOffset = ArrayBufferViewGetByteOffset(chunk);
  const byteLength = ArrayBufferViewGetByteLength(chunk);

  if (closeRequested || stream[kState].state !== 'readable')
    return;

  const transferredBuffer = transferArrayBuffer(buffer);

  // Appends the transferred chunk to the controller's byte queue.
  const queueChunk = () => {
    readableByteStreamControllerEnqueueChunkToQueue(
      controller,
      transferredBuffer,
      byteOffset,
      byteLength);
  };

  if (pendingPullIntos.length) {
    const headDescriptor = pendingPullIntos[0];

    if (isArrayBufferDetached(headDescriptor.buffer)) {
      throw new ERR_INVALID_STATE.TypeError(
        'Destination ArrayBuffer is detached',
      );
    }

    readableByteStreamControllerInvalidateBYOBRequest(controller);

    headDescriptor.buffer = transferArrayBuffer(headDescriptor.buffer);

    if (headDescriptor.type === 'none') {
      readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
        controller,
        headDescriptor,
      );
    }
  }

  if (readableStreamHasDefaultReader(stream)) {
    readableByteStreamControllerProcessReadRequestsUsingQueue(controller);
    if (readableStreamGetNumReadRequests(stream)) {
      assert(!queue.length);
      if (pendingPullIntos.length) {
        assert(pendingPullIntos[0].type === 'default');
        readableByteStreamControllerShiftPendingPullInto(controller);
      }
      const transferredView =
        new Uint8Array(transferredBuffer, byteOffset, byteLength);
      readableStreamFulfillReadRequest(stream, transferredView, false);
    } else {
      queueChunk();
    }
  } else if (readableStreamHasBYOBReader(stream)) {
    queueChunk();
    readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
      controller);
  } else {
    assert(!isReadableStreamLocked(stream));
    queueChunk();
  }
  readableByteStreamControllerCallPullIfNeeded(controller);
}
2722
/**
 * Copies `byteLength` bytes of `buffer`, starting at `byteOffset`, and
 * appends the copy to the controller's queue. If the copy fails, the
 * controller is errored and the failure is rethrown.
 * @param {object} controller
 * @param {ArrayBuffer} buffer
 * @param {number} byteOffset
 * @param {number} byteLength
 */
function readableByteStreamControllerEnqueueClonedChunkToQueue(
  controller,
  buffer,
  byteOffset,
  byteLength,
) {
  let clonedBytes;
  try {
    const sliceEnd = byteOffset + byteLength;
    clonedBytes = ArrayBufferPrototypeSlice(buffer, byteOffset, sliceEnd);
  } catch (error) {
    readableByteStreamControllerError(controller, error);
    throw error;
  }
  readableByteStreamControllerEnqueueChunkToQueue(
    controller,
    clonedBytes,
    0,
    byteLength,
  );
}
2747
/**
 * Appends a `{ buffer, byteOffset, byteLength }` entry to the controller's
 * queue and adds `byteLength` to `queueTotalSize`.
 * @param {object} controller
 * @param {ArrayBuffer} buffer
 * @param {number} byteOffset
 * @param {number} byteLength
 */
function readableByteStreamControllerEnqueueChunkToQueue(
  controller,
  buffer,
  byteOffset,
  byteLength) {
  const queueEntry = {
    buffer,
    byteOffset,
    byteLength,
  };
  ArrayPrototypePush(controller[kState].queue, queueEntry);
  controller[kState].queueTotalSize += byteLength;
}
2762
/**
 * Flushes a detached ('none') pull-into descriptor: any bytes already
 * filled are cloned into the queue, then the head pending pull-into is
 * removed.
 * @param {object} controller
 * @param {object} desc Descriptor whose `type` must be 'none'.
 */
function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
  controller,
  desc,
) {
  assert(desc.type === 'none');

  const filledBytes = desc.bytesFilled;
  if (filledBytes > 0) {
    readableByteStreamControllerEnqueueClonedChunkToQueue(
      controller,
      desc.buffer,
      desc.byteOffset,
      filledBytes,
    );
  }
  readableByteStreamControllerShiftPendingPullInto(controller);
}
2785
/**
 * Copies queued bytes into the pull-into descriptor `desc`.
 *
 * At most `min(queueTotalSize, byteLength - bytesFilled)` bytes are
 * copied. When that is enough to complete at least one more whole element
 * the copy stops on an element boundary and the descriptor is reported
 * ready; otherwise everything available is copied and the descriptor stays
 * pending. Fully consumed queue entries are removed, partially consumed
 * ones are trimmed in place.
 * @param {object} controller
 * @param {object} desc
 * @returns {boolean} Whether the descriptor is ready to be committed.
 * @throws {RangeError} ERR_INVALID_STATE when the destination buffer is
 *   too small for the bytes about to be copied.
 */
function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
  controller,
  desc) {
  const {
    buffer,
    byteLength,
    byteOffset,
    bytesFilled,
    elementSize,
  } = desc;
  const controllerState = controller[kState];

  const alignedBefore = bytesFilled - (bytesFilled % elementSize);
  const copyLimit = MathMin(
    controllerState.queueTotalSize,
    byteLength - bytesFilled);
  const filledLimit = bytesFilled + copyLimit;
  const alignedAfter = filledLimit - (filledLimit % elementSize);

  const ready = alignedAfter > alignedBefore;
  let remaining = ready ? alignedAfter - bytesFilled : copyLimit;

  const { queue } = controllerState;

  while (remaining) {
    const head = queue[0];
    const bytesToCopy = MathMin(remaining, head.byteLength);
    const destStart = byteOffset + desc.bytesFilled;
    const destCapacity = ArrayBufferPrototypeGetByteLength(buffer) - destStart;
    if (destCapacity < bytesToCopy) {
      throw new ERR_INVALID_STATE.RangeError(
        'view ArrayBuffer size is invalid');
    }
    assert(destCapacity >= bytesToCopy);
    copyArrayBuffer(
      buffer,
      destStart,
      head.buffer,
      head.byteOffset,
      bytesToCopy);
    if (head.byteLength !== bytesToCopy) {
      head.byteOffset += bytesToCopy;
      head.byteLength -= bytesToCopy;
    } else {
      ArrayPrototypeShift(queue);
    }
    controllerState.queueTotalSize -= bytesToCopy;
    readableByteStreamControllerFillHeadPullIntoDescriptor(
      controller,
      bytesToCopy,
      desc);
    remaining -= bytesToCopy;
  }

  if (!ready) {
    assert(!controllerState.queueTotalSize);
    assert(desc.bytesFilled > 0);
    assert(desc.bytesFilled < elementSize);
  }
  return ready;
}
2851
/**
 * Serves pending pull-into descriptors from the queue for as long as both
 * descriptors and queued bytes remain. A descriptor that becomes ready is
 * removed from the pending list and committed to its request.
 * @param {object} controller Must not have a close requested.
 */
function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
  controller) {
  const {
    closeRequested,
    pendingPullIntos,
    stream,
  } = controller[kState];
  assert(!closeRequested);

  while (pendingPullIntos.length && controller[kState].queueTotalSize) {
    const headDescriptor = pendingPullIntos[0];
    const ready = readableByteStreamControllerFillPullIntoDescriptorFromQueue(
      controller,
      headDescriptor);
    if (!ready)
      continue;
    readableByteStreamControllerShiftPendingPullInto(controller);
    readableByteStreamControllerCommitPullIntoDescriptor(stream, headDescriptor);
  }
}
2872
/**
 * Respond steps for a readable stream.
 *
 * Adds `bytesWritten` to the head descriptor. A detached ('none')
 * descriptor is flushed into the queue. A descriptor that still holds less
 * than one element stays pending. Otherwise the descriptor is removed from
 * the pending list, any trailing bytes that do not form a whole element
 * are copied back into the queue, and the descriptor is committed. Further
 * pending descriptors are then served from the queue.
 * @param {object} controller
 * @param {number} bytesWritten
 * @param {object} desc Head pull-into descriptor.
 * @throws {RangeError} ERR_INVALID_STATE when `bytesWritten` overflows the
 *   descriptor.
 */
function readableByteStreamControllerRespondInReadableState(
  controller,
  bytesWritten,
  desc) {
  const {
    buffer,
    bytesFilled,
    byteLength,
    type,
  } = desc;

  const filledAfter = bytesFilled + bytesWritten;
  if (filledAfter > byteLength)
    throw new ERR_INVALID_STATE.RangeError('The buffer size is invalid');

  readableByteStreamControllerFillHeadPullIntoDescriptor(
    controller,
    bytesWritten,
    desc);

  if (type === 'none') {
    readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
      controller,
      desc,
    );
    readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
      controller,
    );
    return;
  }

  if (desc.bytesFilled < desc.elementSize)
    return;

  readableByteStreamControllerShiftPendingPullInto(controller);

  const remainderSize = desc.bytesFilled % desc.elementSize;

  if (remainderSize) {
    // Bytes past the last whole element go back to the queue as a copy.
    const remainderEnd = desc.byteOffset + desc.bytesFilled;
    const remainderStart = remainderEnd - remainderSize;
    const remainder =
      ArrayBufferPrototypeSlice(buffer, remainderStart, remainderEnd);
    readableByteStreamControllerEnqueueChunkToQueue(
      controller,
      remainder,
      0,
      ArrayBufferPrototypeGetByteLength(remainder));
  }
  desc.bytesFilled -= remainderSize;
  readableByteStreamControllerCommitPullIntoDescriptor(
    controller[kState].stream,
    desc);
  readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
}
2930
/**
 * Handles a BYOB `respondWithNewView(view)` call.
 *
 * The view must be zero-length exactly when the stream is closed, must
 * start where the head descriptor's filled region ends, must fit into the
 * descriptor's remaining space, and must be backed by a buffer of the
 * descriptor's original byte length. The view's buffer is then transferred
 * into the descriptor and the shared respond steps run with the view's
 * byte length.
 * @param {object} controller Must have at least one pending pull-into.
 * @param {ArrayBufferView} view
 * @throws {TypeError} ERR_INVALID_STATE for a zero-length mismatch.
 * @throws {RangeError} ERR_INVALID_ARG_VALUE for an offset, length or
 *   buffer-size mismatch.
 */
function readableByteStreamControllerRespondWithNewView(controller, view) {
  const {
    stream,
    pendingPullIntos,
  } = controller[kState];
  assert(pendingPullIntos.length);

  const headDescriptor = pendingPullIntos[0];
  assert(stream[kState].state !== 'errored');

  const viewByteLength = ArrayBufferViewGetByteLength(view);
  const viewByteOffset = ArrayBufferViewGetByteOffset(view);
  const viewBuffer = ArrayBufferViewGetBuffer(view);
  const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);

  if (stream[kState].state !== 'closed') {
    assert(stream[kState].state === 'readable');
    if (viewByteLength === 0)
      throw new ERR_INVALID_STATE.TypeError('View is zero-length');
  } else if (viewByteLength !== 0) {
    throw new ERR_INVALID_STATE.TypeError('View is not zero-length');
  }

  const {
    byteOffset,
    byteLength,
    bytesFilled,
    bufferByteLength,
  } = headDescriptor;

  const startsAtFilledEnd = byteOffset + bytesFilled === viewByteOffset;
  if (!startsAtFilledEnd)
    throw new ERR_INVALID_ARG_VALUE.RangeError('view', view);

  const overflows = bytesFilled + viewByteLength > byteLength;
  if (overflows)
    throw new ERR_INVALID_ARG_VALUE.RangeError('view', view);

  const sameBufferSize = bufferByteLength === viewBufferByteLength;
  if (!sameBufferSize)
    throw new ERR_INVALID_ARG_VALUE.RangeError('view', view);

  headDescriptor.buffer = transferArrayBuffer(viewBuffer);

  readableByteStreamControllerRespondInternal(controller, viewByteLength);
}
2975
/**
 * Removes and returns the head pending pull-into descriptor. Asserts that
 * no BYOB request is outstanding.
 * @param {object} controller
 * @returns {object|undefined} The removed descriptor, or `undefined` when
 *   the list was empty.
 */
function readableByteStreamControllerShiftPendingPullInto(controller) {
  const controllerState = controller[kState];
  assert(controllerState.byobRequest === null);
  const headDescriptor = ArrayPrototypeShift(controllerState.pendingPullIntos);
  return headDescriptor;
}
2980
/**
 * Runs the byte controller's pull algorithm when more data is wanted.
 *
 * While a pull is in flight, further requests only set `pullAgain`; when
 * the in-flight pull settles successfully and `pullAgain` is set, another
 * pull is attempted. A rejected pull errors the controller.
 * @param {object} controller
 */
function readableByteStreamControllerCallPullIfNeeded(controller) {
  if (!readableByteStreamControllerShouldCallPull(controller))
    return;
  if (controller[kState].pulling) {
    controller[kState].pullAgain = true;
    return;
  }
  assert(!controller[kState].pullAgain);
  controller[kState].pulling = true;

  const onPullSettled = () => {
    const controllerState = controller[kState];
    controllerState.pulling = false;
    if (!controllerState.pullAgain)
      return;
    controllerState.pullAgain = false;
    readableByteStreamControllerCallPullIfNeeded(controller);
  };
  const onPullFailed =
    (error) => readableByteStreamControllerError(controller, error);

  PromisePrototypeThen(
    ensureIsPromise(controller[kState].pullAlgorithm, controller),
    onPullSettled,
    onPullFailed);
}
3001
/**
 * Errors the byte controller's stream, but only while it is still
 * 'readable': pending pull-intos and the queue are discarded, the
 * algorithms are cleared, and the stream is errored with `error`.
 * @param {object} controller
 * @param {any} error
 */
function readableByteStreamControllerError(controller, error) {
  const ownerStream = controller[kState].stream;
  if (ownerStream[kState].state === 'readable') {
    readableByteStreamControllerClearPendingPullIntos(controller);
    resetQueue(controller);
    readableByteStreamControllerClearAlgorithms(controller);
    readableStreamError(ownerStream, error);
  }
}
3013
/**
 * Cancel steps for a byte controller: discards pending pull-intos and the
 * queue, then invokes the cancel algorithm with `reason`.
 *
 * The algorithms are cleared in a `finally` block so that they are
 * released even when the cancel algorithm throws synchronously, matching
 * `readableStreamDefaultControllerCancelSteps()`.
 * @param {object} controller
 * @param {any} reason
 * @returns {any} The cancel algorithm's return value.
 */
function readableByteStreamControllerCancelSteps(controller, reason) {
  readableByteStreamControllerClearPendingPullIntos(controller);
  resetQueue(controller);
  try {
    return controller[kState].cancelAlgorithm(reason);
  } finally {
    readableByteStreamControllerClearAlgorithms(controller);
  }
}
3021
/**
 * Fulfills a default read request with the head entry of the queue.
 *
 * The entry is removed, `queueTotalSize` is reduced by its length, queue
 * drain handling runs, and the request receives a `Uint8Array` over the
 * entry's bytes.
 * @param {object} controller Must have a non-empty queue.
 * @param {object} readRequest Request exposing `[kChunk]`.
 */
function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) {
  const controllerState = controller[kState];
  const pendingQueue = controllerState.queue;
  assert(controllerState.queueTotalSize > 0);

  const headEntry = ArrayPrototypeShift(pendingQueue);
  const {
    buffer,
    byteOffset,
    byteLength,
  } = headEntry;

  controllerState.queueTotalSize -= byteLength;
  readableByteStreamControllerHandleQueueDrain(controller);
  readRequest[kChunk](new Uint8Array(buffer, byteOffset, byteLength));
}
3039
/**
 * Fulfills the default reader's pending read requests from the queue,
 * oldest first, until either the requests or the queued bytes run out.
 *
 * `queueTotalSize` is re-read on every iteration because fulfilling a
 * request drains the queue. Testing a value captured before the loop would
 * never observe the queue becoming empty and would try to serve further
 * requests from an empty queue.
 * @param {object} controller Controller whose stream has a default reader.
 */
function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) {
  const { stream } = controller[kState];
  const { reader } = stream[kState];
  assert(isReadableStreamDefaultReader(reader));

  while (reader[kState].readRequests.length > 0) {
    if (controller[kState].queueTotalSize === 0) {
      return;
    }
    readableByteStreamControllerFillReadRequestFromQueue(
      controller,
      ArrayPrototypeShift(reader[kState].readRequests),
    );
  }
}
3058
/**
 * Pull steps for a byte controller serving a default reader.
 *
 * Queued bytes fulfill the request immediately. Otherwise, when
 * `autoAllocateChunkSize` is configured, a 'default' pull-into descriptor
 * backed by a freshly allocated buffer is added to the pending list (an
 * allocation failure fails the request). The request is then parked on the
 * stream and a pull is attempted.
 * @param {object} controller
 * @param {object} readRequest Request exposing `[kError]`.
 */
function readableByteStreamControllerPullSteps(controller, readRequest) {
  const {
    pendingPullIntos,
    queueTotalSize,
    stream,
  } = controller[kState];
  assert(readableStreamHasDefaultReader(stream));

  if (queueTotalSize) {
    assert(!readableStreamGetNumReadRequests(stream));
    readableByteStreamControllerFillReadRequestFromQueue(
      controller,
      readRequest,
    );
    return;
  }

  const chunkSize = controller[kState].autoAllocateChunkSize;
  if (chunkSize !== undefined) {
    try {
      const autoDescriptor = {
        buffer: new ArrayBuffer(chunkSize),
        bufferByteLength: chunkSize,
        byteOffset: 0,
        byteLength: chunkSize,
        bytesFilled: 0,
        elementSize: 1,
        ctor: Uint8Array,
        type: 'default',
      };
      ArrayPrototypePush(pendingPullIntos, autoDescriptor);
    } catch (error) {
      readRequest[kError](error);
      return;
    }
  }

  readableStreamAddReadRequest(stream, readRequest);
  readableByteStreamControllerCallPullIfNeeded(controller);
}
3101
/**
 * Installs a byte stream controller on a stream and kicks off its start
 * algorithm.
 *
 * The controller's kState is initialised, the controller is attached to the
 * stream, and `startAlgorithm` is invoked synchronously. Once its result
 * settles, the controller is either marked as started (and a pull is
 * requested if needed) or put into the errored state.
 * @param {object} stream Stream whose kState must not yet have a controller.
 * @param {object} controller Controller to initialise.
 * @param {Function} startAlgorithm Called with no arguments; may return a
 *   promise.
 * @param {Function} pullAlgorithm Stored on the controller state.
 * @param {Function} cancelAlgorithm Stored on the controller state.
 * @param {number} highWaterMark Stored on the controller state.
 * @param {number} [autoAllocateChunkSize] When given, must be a positive
 *   integer.
 */
function setupReadableByteStreamController(
  stream,
  controller,
  startAlgorithm,
  pullAlgorithm,
  cancelAlgorithm,
  highWaterMark,
  autoAllocateChunkSize) {
  const streamState = stream[kState];
  assert(streamState.controller === undefined);
  if (autoAllocateChunkSize !== undefined) {
    assert(NumberIsInteger(autoAllocateChunkSize));
    assert(autoAllocateChunkSize > 0);
  }

  controller[kState] = {
    byobRequest: null,
    closeRequested: false,
    pullAgain: false,
    pulling: false,
    started: false,
    stream,
    queue: [],
    queueTotalSize: 0,
    highWaterMark,
    pullAlgorithm,
    cancelAlgorithm,
    autoAllocateChunkSize,
    pendingPullIntos: [],
  };
  streamState.controller = controller;

  const onStarted = () => {
    const state = controller[kState];
    state.started = true;
    assert(!state.pulling);
    assert(!state.pullAgain);
    readableByteStreamControllerCallPullIfNeeded(controller);
  };
  const onStartFailed =
    (error) => readableByteStreamControllerError(controller, error);

  // The controller is fully wired up before user start code runs.
  const startResult = startAlgorithm();
  PromisePrototypeThen(PromiseResolve(startResult), onStarted, onStartFailed);
}
3144
/**
 * Creates a ReadableByteStreamController for `stream` from an underlying
 * source object and sets it up.
 *
 * `start`, `pull`, `cancel` and `autoAllocateChunkSize` are read from
 * `source` (which may be nullish). Truthy callbacks are bound to `source`,
 * with `start` and `pull` also pre-supplied the controller, while missing
 * ones fall back to the no-op algorithms.
 * @param {object} stream Stream that receives the controller.
 * @param {object} [source] Underlying source.
 * @param {number} highWaterMark Forwarded to the controller setup.
 * @throws {ERR_INVALID_ARG_VALUE} If `source.autoAllocateChunkSize` is 0.
 */
function setupReadableByteStreamControllerFromSource(
  stream,
  source,
  highWaterMark) {
  const controller = new ReadableByteStreamController(kSkipThrow);

  // Read every property up front, in this order, before binding anything.
  const start = source?.start;
  const pull = source?.pull;
  const cancel = source?.cancel;
  const autoAllocateChunkSize = source?.autoAllocateChunkSize;

  let startAlgorithm = nonOpStart;
  if (start) {
    startAlgorithm = FunctionPrototypeBind(start, source, controller);
  }

  let pullAlgorithm = nonOpPull;
  if (pull) {
    pullAlgorithm = FunctionPrototypeBind(pull, source, controller);
  }

  let cancelAlgorithm = nonOpCancel;
  if (cancel) {
    cancelAlgorithm = FunctionPrototypeBind(cancel, source);
  }

  if (autoAllocateChunkSize === 0) {
    throw new ERR_INVALID_ARG_VALUE(
      'source.autoAllocateChunkSize',
      autoAllocateChunkSize);
  }

  setupReadableByteStreamController(
    stream,
    controller,
    startAlgorithm,
    pullAlgorithm,
    cancelAlgorithm,
    highWaterMark,
    autoAllocateChunkSize);
}
3178
// Everything this module exposes: the stream, reader, request and controller
// classes first, then the brand-check predicates, then the individual helper
// functions for streams, readers and the default / byte stream controllers.
module.exports = {
  ReadableStream,
  ReadableStreamDefaultReader,
  ReadableStreamBYOBReader,
  ReadableStreamBYOBRequest,
  ReadableByteStreamController,
  ReadableStreamDefaultController,
  TransferredReadableStream,

  // Exported Brand Checks
  isReadableStream,
  isReadableByteStreamController,
  isReadableStreamBYOBRequest,
  isReadableStreamDefaultReader,
  isReadableStreamBYOBReader,
  isWritableStreamDefaultWriter,
  isWritableStreamDefaultController,

  readableStreamPipeTo,
  readableStreamTee,
  readableByteStreamControllerConvertPullIntoDescriptor,
  isReadableStreamLocked,
  readableStreamCancel,
  readableStreamClose,
  readableStreamError,
  readableStreamHasDefaultReader,
  readableStreamGetNumReadRequests,
  readableStreamHasBYOBReader,
  readableStreamGetNumReadIntoRequests,
  readableStreamFulfillReadRequest,
  readableStreamFulfillReadIntoRequest,
  readableStreamAddReadRequest,
  readableStreamAddReadIntoRequest,
  readableStreamReaderGenericCancel,
  readableStreamReaderGenericInitialize,
  readableStreamReaderGenericRelease,
  readableStreamBYOBReaderRead,
  readableStreamDefaultReaderRead,
  setupReadableStreamBYOBReader,
  setupReadableStreamDefaultReader,
  readableStreamDefaultControllerClose,
  readableStreamDefaultControllerEnqueue,
  readableStreamDefaultControllerHasBackpressure,
  readableStreamDefaultControllerCanCloseOrEnqueue,
  readableStreamDefaultControllerGetDesiredSize,
  readableStreamDefaultControllerShouldCallPull,
  readableStreamDefaultControllerCallPullIfNeeded,
  readableStreamDefaultControllerClearAlgorithms,
  readableStreamDefaultControllerError,
  readableStreamDefaultControllerCancelSteps,
  readableStreamDefaultControllerPullSteps,
  setupReadableStreamDefaultController,
  setupReadableStreamDefaultControllerFromSource,
  readableByteStreamControllerClose,
  readableByteStreamControllerCommitPullIntoDescriptor,
  readableByteStreamControllerInvalidateBYOBRequest,
  readableByteStreamControllerClearAlgorithms,
  readableByteStreamControllerClearPendingPullIntos,
  readableByteStreamControllerGetDesiredSize,
  readableByteStreamControllerShouldCallPull,
  readableByteStreamControllerHandleQueueDrain,
  readableByteStreamControllerPullInto,
  readableByteStreamControllerRespondInternal,
  readableByteStreamControllerRespond,
  readableByteStreamControllerRespondInClosedState,
  readableByteStreamControllerFillHeadPullIntoDescriptor,
  readableByteStreamControllerEnqueue,
  readableByteStreamControllerEnqueueChunkToQueue,
  readableByteStreamControllerFillPullIntoDescriptorFromQueue,
  readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue,
  readableByteStreamControllerRespondInReadableState,
  readableByteStreamControllerRespondWithNewView,
  readableByteStreamControllerShiftPendingPullInto,
  readableByteStreamControllerCallPullIfNeeded,
  readableByteStreamControllerError,
  readableByteStreamControllerCancelSteps,
  readableByteStreamControllerPullSteps,
  setupReadableByteStreamController,
  setupReadableByteStreamControllerFromSource,
};
3259