• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Flags: --expose-internals --no-warnings
2'use strict';
3
4const common = require('../common');
5const { isDisturbed, isErrored, isReadable } = require('stream');
6const assert = require('assert');
7const {
8  isPromise,
9} = require('util/types');
10const {
11  setImmediate: delay
12} = require('timers/promises');
13
14const {
15  ByteLengthQueuingStrategy,
16  CountQueuingStrategy,
17  ReadableStream,
18  ReadableStreamDefaultReader,
19  ReadableStreamDefaultController,
20  ReadableByteStreamController,
21  ReadableStreamBYOBReader,
22  ReadableStreamBYOBRequest,
23  WritableStream,
24} = require('stream/web');
25
26const {
27  readableStreamPipeTo,
28  readableStreamTee,
29  readableByteStreamControllerConvertPullIntoDescriptor,
30  readableStreamDefaultControllerEnqueue,
31  readableByteStreamControllerEnqueue,
32  readableStreamDefaultControllerCanCloseOrEnqueue,
33  readableByteStreamControllerClose,
34  readableByteStreamControllerRespond,
35  readableStreamReaderGenericRelease,
36} = require('internal/webstreams/readablestream');
37
38const {
39  kState
40} = require('internal/webstreams/util');
41
42const {
43  createReadStream,
44  readFileSync,
45} = require('fs');
46const {
47  Buffer,
48} = require('buffer');
49
50const {
51  kTransfer,
52} = require('internal/worker/js_transferable');
53
54const {
55  inspect,
56} = require('util');
57
58{
59  const r = new ReadableStream();
60  assert.strictEqual(typeof r.locked, 'boolean');
61  assert.strictEqual(typeof r.cancel, 'function');
62  assert.strictEqual(typeof r.getReader, 'function');
63  assert.strictEqual(typeof r.pipeThrough, 'function');
64  assert.strictEqual(typeof r.pipeTo, 'function');
65  assert.strictEqual(typeof r.tee, 'function');
66
67  ['', null, 'asdf'].forEach((mode) => {
68    assert.throws(() => r.getReader({ mode }), {
69      code: 'ERR_INVALID_ARG_VALUE',
70    });
71  });
72
73  [1, 'asdf'].forEach((options) => {
74    assert.throws(() => r.getReader(options), {
75      code: 'ERR_INVALID_ARG_TYPE',
76    });
77  });
78
79  assert(!r.locked);
80  r.getReader();
81  assert(r.locked);
82}
83
{
  // A `cancel()` algorithm that returns a rejected promise and one that
  // throws synchronously must both trigger the same controller cleanup.
  //
  // The checks run directly in the rejection handler. A `.finally()` followed
  // by a blanket `.catch(() => {})` would swallow any AssertionError raised
  // by the checks, so a regression could never fail the test.
  const assertAlgorithmsCleared = (stream) => {
    const controllerState = stream[kState].controller[kState];

    assert.strictEqual(controllerState.pullAlgorithm, undefined);
    assert.strictEqual(controllerState.cancelAlgorithm, undefined);
    assert.strictEqual(controllerState.sizeAlgorithm, undefined);
  };

  const r1 = new ReadableStream({
    cancel: () => {
      return Promise.reject('Cancel Error');
    },
  });
  r1.cancel().then(common.mustNotCall(), common.mustCall((reason) => {
    assert.strictEqual(reason, 'Cancel Error');
    assertAlgorithmsCleared(r1);
  }));

  const r2 = new ReadableStream({
    cancel() {
      throw new Error('Cancel Error');
    }
  });
  r2.cancel().then(common.mustNotCall(), common.mustCall((reason) => {
    assert.strictEqual(reason.message, 'Cancel Error');
    assertAlgorithmsCleared(r2);
  }));
}
113
114{
115  const source = {
116    start: common.mustCall((controller) => {
117      assert(controller instanceof ReadableStreamDefaultController);
118    }),
119    pull: common.mustCall((controller) => {
120      assert(controller instanceof ReadableStreamDefaultController);
121    }),
122    cancel: common.mustNotCall(),
123  };
124
125  new ReadableStream(source);
126}
127
128{
129  const source = {
130    start: common.mustCall(async (controller) => {
131      assert(controller instanceof ReadableStreamDefaultController);
132    }),
133    pull: common.mustCall(async (controller) => {
134      assert(controller instanceof ReadableStreamDefaultController);
135    }),
136    cancel: common.mustNotCall(),
137  };
138
139  new ReadableStream(source);
140}
141
142{
143  const source = {
144    start: common.mustCall((controller) => {
145      assert(controller instanceof ReadableByteStreamController);
146    }),
147    pull: common.mustNotCall(),
148    cancel: common.mustNotCall(),
149    type: 'bytes',
150  };
151
152  new ReadableStream(source);
153}
154
155{
156  const source = {
157    start: common.mustCall(async (controller) => {
158      assert(controller instanceof ReadableByteStreamController);
159    }),
160    pull: common.mustNotCall(),
161    cancel: common.mustNotCall(),
162    type: 'bytes',
163  };
164
165  new ReadableStream(source);
166}
167
168{
169  const source = {
170    start: common.mustCall(async (controller) => {
171      assert(controller instanceof ReadableByteStreamController);
172    }),
173    pull: common.mustCall(async (controller) => {
174      assert(controller instanceof ReadableByteStreamController);
175    }),
176    cancel: common.mustNotCall(),
177    type: 'bytes',
178  };
179
180  new ReadableStream(source, { highWaterMark: 10 });
181}
182
183{
184  // These are silly but they should all work per spec
185  new ReadableStream(1);
186  new ReadableStream('hello');
187  new ReadableStream(false);
188  new ReadableStream([]);
189  new ReadableStream(1, 1);
190  new ReadableStream(1, 'hello');
191  new ReadableStream(1, false);
192  new ReadableStream(1, []);
193}
194
195['a', {}, false].forEach((size) => {
196  assert.throws(() => {
197    new ReadableStream({}, { size });
198  }, {
199    code: 'ERR_INVALID_ARG_TYPE',
200  });
201});
202
203['a', {}].forEach((highWaterMark) => {
204  assert.throws(() => {
205    new ReadableStream({}, { highWaterMark });
206  }, {
207    code: 'ERR_INVALID_ARG_VALUE',
208  });
209
210  assert.throws(() => {
211    new ReadableStream({ type: 'bytes' }, { highWaterMark });
212  }, {
213    code: 'ERR_INVALID_ARG_VALUE',
214  });
215});
216
217[-1, NaN].forEach((highWaterMark) => {
218  assert.throws(() => {
219    new ReadableStream({}, { highWaterMark });
220  }, {
221    code: 'ERR_INVALID_ARG_VALUE',
222  });
223
224  assert.throws(() => {
225    new ReadableStream({ type: 'bytes' }, { highWaterMark });
226  }, {
227    code: 'ERR_INVALID_ARG_VALUE',
228  });
229});
230
231{
232  new ReadableStream({}, new ByteLengthQueuingStrategy({ highWaterMark: 1 }));
233  new ReadableStream({}, new CountQueuingStrategy({ highWaterMark: 1 }));
234}
235
236{
237  const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1 });
238  assert.strictEqual(strategy.highWaterMark, 1);
239  assert.strictEqual(strategy.size(new ArrayBuffer(10)), 10);
240
241  const { size } = strategy;
242  assert.strictEqual(size(new ArrayBuffer(10)), 10);
243}
244
245{
246  const strategy = new CountQueuingStrategy({ highWaterMark: 1 });
247  assert.strictEqual(strategy.highWaterMark, 1);
248  assert.strictEqual(strategy.size(new ArrayBuffer(10)), 1);
249
250  const { size } = strategy;
251  assert.strictEqual(size(new ArrayBuffer(10)), 1);
252}
253
254{
255  const r = new ReadableStream({
256    async start() {
257      throw new Error('boom');
258    }
259  });
260
261  setImmediate(() => {
262    assert.strictEqual(r[kState].state, 'errored');
263    assert.match(r[kState].storedError?.message, /boom/);
264  });
265}
266
{
  // A chunk enqueued before `close()` must still be delivered to the reader,
  // followed by a `done` result.
  const data = Buffer.from('hello');
  const r = new ReadableStream({
    start(controller) {
      controller.enqueue(data);
      controller.close();
    },
  });

  (async function read() {
    const reader = r.getReader();
    let res = await reader.read();
    // Fail loudly if the chunk was dropped; an early `return` here would let
    // the test pass without checking anything.
    assert(!res.done);
    const buf = Buffer.from(res.value);
    assert.strictEqual(buf.toString(), data.toString());
    res = await reader.read();
    assert(res.done);
  })().then(common.mustCall());
}
286
287{
288  const r = new ReadableStream({
289    start(controller) {
290      controller.close();
291    },
292  });
293
294  (async function read() {
295    const reader = r.getReader();
296    const res = await reader.read();
297    assert(res.done);
298  })().then(common.mustCall());
299}
300
301assert.throws(() => {
302  new ReadableStream({
303    get start() { throw new Error('boom1'); }
304  }, {
305    get size() { throw new Error('boom2'); }
306  });
307}, /boom2/);
308
309{
310  const stream = new ReadableStream();
311  const reader = stream.getReader();
312
313  assert(stream.locked);
314  assert.strictEqual(reader[kState].stream, stream);
315  assert.strictEqual(stream[kState].reader, reader);
316
317  assert.throws(() => stream.getReader(), {
318    code: 'ERR_INVALID_STATE',
319  });
320
321  assert(reader instanceof ReadableStreamDefaultReader);
322
323  assert(isPromise(reader.closed));
324  assert.strictEqual(typeof reader.cancel, 'function');
325  assert.strictEqual(typeof reader.read, 'function');
326  assert.strictEqual(typeof reader.releaseLock, 'function');
327
328  const read1 = reader.read();
329  const read2 = reader.read();
330
331  read1.then(common.mustNotCall(), common.mustCall());
332  read2.then(common.mustNotCall(), common.mustCall());
333
334  assert.notStrictEqual(read1, read2);
335
336  assert.strictEqual(reader[kState].readRequests.length, 2);
337
338  delay().then(common.mustCall());
339
340  assert(stream.locked);
341  reader.releaseLock();
342  assert(!stream.locked);
343}
344
{
  // Releasing the lock on a still-readable stream keeps the same `closed`
  // promise object, and both it and any later read() reject with
  // ERR_INVALID_STATE.
  const stream = new ReadableStream();
  const reader = stream.getReader();
  const closedBefore = reader.closed;
  assert(stream.locked);
  reader.releaseLock();
  assert(!stream.locked);
  const closedAfter = reader.closed;

  assert.strictEqual(closedBefore, closedAfter);

  // mustCall() guarantees each rejection check actually settles; a promise
  // that stays pending forever would otherwise go unnoticed.
  assert.rejects(reader.read(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());

  assert.rejects(closedBefore, {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}
364
365{
366  const stream = new ReadableStream();
367  const iterable = stream.values();
368  readableStreamReaderGenericRelease(stream[kState].reader);
369  assert.rejects(iterable.next(), {
370    code: 'ERR_INVALID_STATE',
371  }).then(common.mustCall());
372}
373
374{
375  const stream = new ReadableStream();
376  const iterable = stream.values();
377  readableStreamReaderGenericRelease(stream[kState].reader);
378  assert.rejects(iterable.return(), {
379    code: 'ERR_INVALID_STATE',
380  }).then(common.mustCall());
381}
382
{
  // stream.cancel() on a locked stream rejects, while cancelling through the
  // reader succeeds and causes later reads to report `done`.
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(Buffer.from('hello'));
    }
  });

  const reader = stream.getReader();

  // mustCall() guarantees the rejection check settles instead of floating.
  assert.rejects(stream.cancel(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());

  reader.cancel().then(common.mustCall());

  reader.read().then(common.mustCall(({ value, done }) => {
    assert.strictEqual(value, undefined);
    assert(done);
  }));
}
403
404{
405  const stream = new ReadableStream({
406    start(controller) {
407      controller.close();
408    }
409  });
410  assert(!stream.locked);
411
412  const cancel1 = stream.cancel();
413  const cancel2 = stream.cancel();
414
415  assert.notStrictEqual(cancel1, cancel2);
416
417  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
418    assert.deepStrictEqual(res, [undefined, undefined]);
419  }));
420}
421
422{
423  const stream = new ReadableStream({
424    start(controller) {
425      controller.close();
426    }
427  });
428
429  stream.getReader().releaseLock();
430  stream.getReader().releaseLock();
431  stream.getReader();
432}
433
434{
435  const stream = new ReadableStream({
436    start(controller) {
437      controller.close();
438    }
439  });
440
441  stream.getReader();
442
443  assert.throws(() => stream.getReader(), {
444    code: 'ERR_INVALID_STATE',
445  });
446}
447
448{
449  const stream = new ReadableStream({
450    start(controller) {
451      controller.close();
452    },
453  });
454
455  const reader = stream.getReader();
456
457  reader.closed.then(common.mustCall());
458
459  reader.read().then(common.mustCall(({ value, done }) => {
460    assert.strictEqual(value, undefined);
461    assert(done);
462    reader.read().then(common.mustCall(({ value, done }) => {
463      assert.strictEqual(value, undefined);
464      assert(done);
465    }));
466  }));
467}
468
{
  // When the stream is already closed (via start()), releasing the lock
  // replaces `reader.closed` with a new promise: the old one fulfills, the
  // new one rejects with ERR_INVALID_STATE.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });

  const reader = stream.getReader();

  const closedBefore = reader.closed;
  reader.releaseLock();
  const closedAfter = reader.closed;
  assert.notStrictEqual(closedBefore, closedAfter);

  closedBefore.then(common.mustCall());
  // mustCall() guarantees the rejection check settles instead of floating.
  assert.rejects(closedAfter, {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}
488
{
  // Same as the previous scenario, but the controller is closed after the
  // reader has been acquired rather than inside start().
  let c;
  const stream = new ReadableStream({
    start(controller) {
      c = controller;
    },
  });

  const reader = stream.getReader();
  c.close();

  const closedBefore = reader.closed;
  reader.releaseLock();
  const closedAfter = reader.closed;
  assert.notStrictEqual(closedBefore, closedAfter);

  closedBefore.then(common.mustCall());
  // mustCall() guarantees the rejection check settles instead of floating.
  assert.rejects(closedAfter, {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}
510
511{
512  const stream = new ReadableStream({
513    start(controller) {
514      controller.close();
515    },
516  });
517
518  const reader = stream.getReader();
519
520  const cancel1 = reader.cancel();
521  const cancel2 = reader.cancel();
522  const closed = reader.closed;
523
524  assert.notStrictEqual(cancel1, cancel2);
525  assert.notStrictEqual(cancel1, closed);
526  assert.notStrictEqual(cancel2, closed);
527
528  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
529    assert.deepStrictEqual(res, [undefined, undefined]);
530  }));
531}
532
533{
534  let c;
535  const stream = new ReadableStream({
536    start(controller) {
537      c = controller;
538    },
539  });
540
541  const reader = stream.getReader();
542  c.close();
543
544  const cancel1 = reader.cancel();
545  const cancel2 = reader.cancel();
546  const closed = reader.closed;
547
548  assert.notStrictEqual(cancel1, cancel2);
549  assert.notStrictEqual(cancel1, closed);
550  assert.notStrictEqual(cancel2, closed);
551
552  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
553    assert.deepStrictEqual(res, [undefined, undefined]);
554  }));
555}
556
557{
558  const stream = new ReadableStream();
559  const cancel1 = stream.cancel();
560  const cancel2 = stream.cancel();
561  assert.notStrictEqual(cancel1, cancel2);
562
563  Promise.all([cancel1, cancel2]).then(common.mustCall((res) => {
564    assert.deepStrictEqual(res, [undefined, undefined]);
565  }));
566
567  stream.getReader().read().then(common.mustCall(({ value, done }) => {
568    assert.strictEqual(value, undefined);
569    assert(done);
570  }));
571}
572
{
  // A stream errored synchronously in start() rejects `closed` and every
  // read() with the stored error, even for a reader acquired after an
  // earlier reader was released.
  const error = new Error('boom');
  const stream = new ReadableStream({
    start(controller) {
      controller.error(error);
    }
  });
  stream.getReader().releaseLock();
  const reader = stream.getReader();
  // mustCall() guarantees each rejection check settles instead of floating.
  assert.rejects(reader.closed, error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
}
586
{
  // Cancelling an errored stream returns a distinct promise per call, each
  // rejecting with the stored error.
  const error = new Error('boom');
  const stream = new ReadableStream({
    start(controller) {
      controller.error(error);
    }
  });
  const reader = stream.getReader();
  const cancel1 = reader.cancel();
  const cancel2 = reader.cancel();
  assert.notStrictEqual(cancel1, cancel2);
  // mustCall() guarantees each rejection check settles instead of floating.
  assert.rejects(cancel1, error).then(common.mustCall());
  assert.rejects(cancel2, error).then(common.mustCall());
}
601
{
  // An async start() that throws errors the stream; `closed` and every
  // read() on a subsequently acquired reader reject with that error.
  const error = new Error('boom');
  const stream = new ReadableStream({
    async start(controller) {
      throw error;
    }
  });
  stream.getReader().releaseLock();
  const reader = stream.getReader();
  // mustCall() guarantees each rejection check settles instead of floating.
  assert.rejects(reader.closed, error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
  assert.rejects(reader.read(), error).then(common.mustCall());
}
615
616{
617  const buf1 = Buffer.from('hello');
618  const buf2 = Buffer.from('there');
619  let doClose;
620  const stream = new ReadableStream({
621    start(controller) {
622      controller.enqueue(buf1);
623      controller.enqueue(buf2);
624      doClose = controller.close.bind(controller);
625    }
626  });
627  const reader = stream.getReader();
628  doClose();
629  reader.read().then(common.mustCall(({ value, done }) => {
630    assert.deepStrictEqual(value, buf1);
631    assert(!done);
632    reader.read().then(common.mustCall(({ value, done }) => {
633      assert.deepStrictEqual(value, buf2);
634      assert(!done);
635      reader.read().then(common.mustCall(({ value, done }) => {
636        assert.strictEqual(value, undefined);
637        assert(done);
638      }));
639    }));
640  }));
641}
642
643{
644  const buf1 = Buffer.from('hello');
645  const buf2 = Buffer.from('there');
646  const stream = new ReadableStream({
647    start(controller) {
648      controller.enqueue(buf1);
649      controller.enqueue(buf2);
650    }
651  });
652  const reader = stream.getReader();
653  reader.read().then(common.mustCall(({ value, done }) => {
654    assert.deepStrictEqual(value, buf1);
655    assert(!done);
656    reader.read().then(common.mustCall(({ value, done }) => {
657      assert.deepStrictEqual(value, buf2);
658      assert(!done);
659      reader.read().then(common.mustNotCall());
660      delay().then(common.mustCall());
661    }));
662  }));
663}
664
665{
666  const stream = new ReadableStream({
667    start(controller) {
668      controller.enqueue('a');
669      controller.enqueue('b');
670      controller.close();
671    }
672  });
673
674  const { 0: s1, 1: s2 } = stream.tee();
675
676  assert(s1 instanceof ReadableStream);
677  assert(s2 instanceof ReadableStream);
678
679  async function read(stream) {
680    const reader = stream.getReader();
681    assert.deepStrictEqual(
682      await reader.read(), { value: 'a', done: false });
683    assert.deepStrictEqual(
684      await reader.read(), { value: 'b', done: false });
685    assert.deepStrictEqual(
686      await reader.read(), { value: undefined, done: true });
687  }
688
689  Promise.all([
690    read(s1),
691    read(s2),
692  ]).then(common.mustCall());
693}
694
695{
696  const error = new Error('boom');
697  const stream = new ReadableStream({
698    start(controller) {
699      controller.enqueue('a');
700      controller.enqueue('b');
701    },
702    pull() { throw error; }
703  });
704
705  const { 0: s1, 1: s2 } = stream.tee();
706
707  assert(stream.locked);
708
709  assert(s1 instanceof ReadableStream);
710  assert(s2 instanceof ReadableStream);
711
712  const reader1 = s1.getReader();
713  const reader2 = s2.getReader();
714
715  const closed1 = reader1.closed;
716  const closed2 = reader2.closed;
717
718  assert.notStrictEqual(closed1, closed2);
719
720  assert.rejects(closed1, error);
721  assert.rejects(closed2, error);
722}
723
724{
725  const stream = new ReadableStream({
726    start(controller) {
727      controller.enqueue('a');
728      controller.enqueue('b');
729      controller.close();
730    }
731  });
732
733  const { 0: s1, 1: s2 } = stream.tee();
734
735  assert(s1 instanceof ReadableStream);
736  assert(s2 instanceof ReadableStream);
737
738  s2.cancel();
739
740  async function read(stream, canceled = false) {
741    const reader = stream.getReader();
742    if (!canceled) {
743      assert.deepStrictEqual(
744        await reader.read(), { value: 'a', done: false });
745      assert.deepStrictEqual(
746        await reader.read(), { value: 'b', done: false });
747    }
748    assert.deepStrictEqual(
749      await reader.read(), { value: undefined, done: true });
750  }
751
752  Promise.all([
753    read(s1),
754    read(s2, true),
755  ]).then(common.mustCall());
756}
757
{
  // Cancelling both tee() branches cancels the source with the composite
  // reason [reason1, reason2].
  const error1 = new Error('boom1');
  const error2 = new Error('boom2');

  const stream = new ReadableStream({
    // mustCall() makes the test fail if the source is never cancelled;
    // otherwise the only assertion here could be skipped silently.
    cancel: common.mustCall((reason) => {
      assert.deepStrictEqual(reason, [error1, error2]);
    }),
  });

  const { 0: s1, 1: s2 } = stream.tee();
  s1.cancel(error1);
  s2.cancel(error2);
}
772
{
  // The composite reason keeps branch order ([reason1, reason2]) even when
  // the second branch is cancelled first.
  const error1 = new Error('boom1');
  const error2 = new Error('boom2');

  const stream = new ReadableStream({
    // mustCall() makes the test fail if the source is never cancelled;
    // otherwise the only assertion here could be skipped silently.
    cancel: common.mustCall((reason) => {
      assert.deepStrictEqual(reason, [error1, error2]);
    }),
  });

  const { 0: s1, 1: s2 } = stream.tee();
  s2.cancel(error2);
  s1.cancel(error1);
}
787
{
  // If the source's cancel() throws, cancelling both tee() branches rejects
  // both returned promises with that error.
  const error = new Error('boom1');

  const stream = new ReadableStream({
    cancel() {
      throw error;
    }
  });

  const { 0: s1, 1: s2 } = stream.tee();

  // mustCall() guarantees each rejection check settles instead of floating.
  assert.rejects(s1.cancel(), error).then(common.mustCall());
  assert.rejects(s2.cancel(), error).then(common.mustCall());
}
802
{
  // Erroring the source after tee() makes cancelling either branch reject
  // with the source's error.
  const error = new Error('boom1');
  let c;
  const stream = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const { 0: s1, 1: s2 } = stream.tee();
  c.error(error);

  // mustCall() guarantees each rejection check settles instead of floating.
  assert.rejects(s1.cancel(), error).then(common.mustCall());
  assert.rejects(s2.cancel(), error).then(common.mustCall());
}
818
{
  // An error raised on the source after the branches already have pending
  // reads propagates to both branches' `closed` promises and reads.
  const error = new Error('boom1');
  let c;
  const stream = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const { 0: s1, 1: s2 } = stream.tee();

  const reader1 = s1.getReader();
  const reader2 = s2.getReader();

  // mustCall() guarantees each rejection check settles instead of floating.
  assert.rejects(reader1.closed, error).then(common.mustCall());
  assert.rejects(reader2.closed, error).then(common.mustCall());

  assert.rejects(reader1.read(), error).then(common.mustCall());
  assert.rejects(reader2.read(), error).then(common.mustCall());

  // Error the source only after the reads above are pending.
  setImmediate(() => c.error(error));
}
841
842{
843  let pullCount = 0;
844  const stream = new ReadableStream({
845    pull(controller) {
846      if (pullCount)
847        controller.enqueue(pullCount);
848      pullCount++;
849    },
850  });
851
852  const reader = stream.getReader();
853
854  queueMicrotask(common.mustCall(() => {
855    assert.strictEqual(pullCount, 1);
856    reader.read().then(common.mustCall(({ value, done }) => {
857      assert.strictEqual(value, 1);
858      assert(!done);
859
860      reader.read().then(common.mustCall(({ value, done }) => {
861        assert.strictEqual(value, 2);
862        assert(!done);
863      }));
864
865    }));
866  }));
867}
868
869{
870  const stream = new ReadableStream({
871    start(controller) {
872      controller.enqueue('a');
873    },
874    pull: common.mustCall(),
875  });
876
877  stream.getReader().read().then(common.mustCall(({ value, done }) => {
878    assert.strictEqual(value, 'a');
879    assert(!done);
880  }));
881}
882
883{
884  const stream = new ReadableStream({
885    start(controller) {
886      controller.enqueue('a');
887      controller.enqueue('b');
888    },
889    pull: common.mustCall(),
890  });
891
892  const reader = stream.getReader();
893  reader.read().then(common.mustCall(({ value, done }) => {
894    assert.strictEqual(value, 'a');
895    assert(!done);
896
897    reader.read().then(common.mustCall(({ value, done }) => {
898      assert.strictEqual(value, 'b');
899      assert(!done);
900    }));
901  }));
902}
903
904{
905  const stream = new ReadableStream({
906    start(controller) {
907      controller.enqueue('a');
908      controller.enqueue('b');
909      controller.close();
910    },
911    pull: common.mustNotCall(),
912  });
913
914  const reader = stream.getReader();
915  reader.read().then(common.mustCall(({ value, done }) => {
916    assert.strictEqual(value, 'a');
917    assert(!done);
918
919    reader.read().then(common.mustCall(({ value, done }) => {
920      assert.strictEqual(value, 'b');
921      assert(!done);
922
923      reader.read().then(common.mustCall(({ value, done }) => {
924        assert.strictEqual(value, undefined);
925        assert(done);
926      }));
927
928    }));
929  }));
930}
931
932{
933  let res;
934  let promise;
935  let calls = 0;
936  const stream = new ReadableStream({
937    pull(controller) {
938      controller.enqueue(++calls);
939      promise = new Promise((resolve) => res = resolve);
940      return promise;
941    }
942  });
943
944  const reader = stream.getReader();
945
946  (async () => {
947    await reader.read();
948    assert.strictEqual(calls, 1);
949    await delay();
950    assert.strictEqual(calls, 1);
951    res();
952    await delay();
953    assert.strictEqual(calls, 2);
954  })().then(common.mustCall());
955}
956
957{
958  const stream = new ReadableStream({
959    start(controller) {
960      controller.enqueue('a');
961      controller.enqueue('b');
962      controller.enqueue('c');
963    },
964    pull: common.mustCall(4),
965  }, {
966    highWaterMark: Infinity,
967    size() { return 1; }
968  });
969
970  const reader = stream.getReader();
971  (async () => {
972    await delay();
973    await reader.read();
974    await reader.read();
975    await reader.read();
976  })().then(common.mustCall());
977}
978
979{
980  const stream = new ReadableStream({
981    start(controller) {
982      controller.enqueue('a');
983      controller.enqueue('b');
984      controller.enqueue('c');
985      controller.close();
986    },
987    pull: common.mustNotCall(),
988  }, {
989    highWaterMark: Infinity,
990    size() { return 1; }
991  });
992
993  const reader = stream.getReader();
994  (async () => {
995    await delay();
996    await reader.read();
997    await reader.read();
998    await reader.read();
999  })().then(common.mustCall());
1000}
1001
1002{
1003  let calls = 0;
1004  let res;
1005  const ready = new Promise((resolve) => res = resolve);
1006
1007  new ReadableStream({
1008    pull(controller) {
1009      controller.enqueue(++calls);
1010      if (calls === 4)
1011        res();
1012    }
1013  }, {
1014    size() { return 1; },
1015    highWaterMark: 4
1016  });
1017
1018  ready.then(common.mustCall(() => {
1019    assert.strictEqual(calls, 4);
1020  }));
1021}
1022
1023{
1024  const stream = new ReadableStream({
1025    pull: common.mustCall((controller) => controller.close())
1026  });
1027
1028  const reader = stream.getReader();
1029
1030  reader.closed.then(common.mustCall());
1031}
1032
{
  // Calling controller.error() from pull() rejects the reader's `closed`
  // promise with that error.
  const error = new Error('boom');
  const stream = new ReadableStream({
    pull: common.mustCall((controller) => controller.error(error))
  });

  const reader = stream.getReader();

  // mustCall() guarantees the rejection check settles instead of floating.
  assert.rejects(reader.closed, error).then(common.mustCall());
}
1043
{
  // When pull() calls controller.error() and then throws a different error,
  // the first error wins: `closed` rejects with `error`, not `error2`.
  const error = new Error('boom');
  const error2 = new Error('boom2');
  const stream = new ReadableStream({
    pull: common.mustCall((controller) => {
      controller.error(error);
      throw error2;
    })
  });

  const reader = stream.getReader();

  // mustCall() guarantees the rejection check settles instead of floating.
  assert.rejects(reader.closed, error).then(common.mustCall());
}
1058
1059{
1060  let startCalled = false;
1061  new ReadableStream({
1062    start: common.mustCall((controller) => {
1063      controller.enqueue('a');
1064      controller.close();
1065      assert.throws(() => controller.enqueue('b'), {
1066        code: 'ERR_INVALID_STATE'
1067      });
1068      startCalled = true;
1069    })
1070  });
1071  assert(startCalled);
1072}
1073
1074{
1075  let startCalled = false;
1076  new ReadableStream({
1077    start: common.mustCall((controller) => {
1078      controller.close();
1079      assert.throws(() => controller.enqueue('b'), {
1080        code: 'ERR_INVALID_STATE'
1081      });
1082      startCalled = true;
1083    })
1084  });
1085  assert(startCalled);
1086}
1087
1088{
1089  class Source {
1090    startCalled = false;
1091    pullCalled = false;
1092    cancelCalled = false;
1093
1094    start(controller) {
1095      assert.strictEqual(this, source);
1096      this.startCalled = true;
1097      controller.enqueue('a');
1098    }
1099
1100    pull() {
1101      assert.strictEqual(this, source);
1102      this.pullCalled = true;
1103    }
1104
1105    cancel() {
1106      assert.strictEqual(this, source);
1107      this.cancelCalled = true;
1108    }
1109  }
1110
1111  const source = new Source();
1112
1113  const stream = new ReadableStream(source);
1114  const reader = stream.getReader();
1115
1116  (async () => {
1117    await reader.read();
1118    reader.releaseLock();
1119    stream.cancel();
1120    assert(source.startCalled);
1121    assert(source.pullCalled);
1122    assert(source.cancelCalled);
1123  })().then(common.mustCall());
1124}
1125
1126{
1127  let startCalled = false;
1128  new ReadableStream({
1129    start(controller) {
1130      assert.strictEqual(controller.desiredSize, 10);
1131      controller.close();
1132      assert.strictEqual(controller.desiredSize, 0);
1133      startCalled = true;
1134    }
1135  }, {
1136    highWaterMark: 10
1137  });
1138  assert(startCalled);
1139}
1140
1141{
1142  let startCalled = false;
1143  new ReadableStream({
1144    start(controller) {
1145      assert.strictEqual(controller.desiredSize, 10);
1146      controller.error();
1147      assert.strictEqual(controller.desiredSize, null);
1148      startCalled = true;
1149    }
1150  }, {
1151    highWaterMark: 10
1152  });
1153  assert(startCalled);
1154}
1155
1156{
1157  class Foo extends ReadableStream {}
1158  const foo = new Foo();
1159  foo.getReader();
1160}
1161
1162{
1163  let startCalled = false;
1164  new ReadableStream({
1165    start(controller) {
1166      assert.strictEqual(controller.desiredSize, 1);
1167      controller.enqueue('a');
1168      assert.strictEqual(controller.desiredSize, 0);
1169      controller.enqueue('a');
1170      assert.strictEqual(controller.desiredSize, -1);
1171      controller.enqueue('a');
1172      assert.strictEqual(controller.desiredSize, -2);
1173      controller.enqueue('a');
1174      assert.strictEqual(controller.desiredSize, -3);
1175      startCalled = true;
1176    }
1177  });
1178  assert(startCalled);
1179}
1180
1181{
1182  let c;
1183  const stream = new ReadableStream({
1184    start(controller) {
1185      c = controller;
1186    }
1187  });
1188
1189  const reader = stream.getReader();
1190
1191  (async () => {
1192    assert.strictEqual(c.desiredSize, 1);
1193    c.enqueue(1);
1194    assert.strictEqual(c.desiredSize, 0);
1195    await reader.read();
1196    assert.strictEqual(c.desiredSize, 1);
1197    c.enqueue(1);
1198    c.enqueue(1);
1199    assert.strictEqual(c.desiredSize, -1);
1200    await reader.read();
1201    assert.strictEqual(c.desiredSize, 0);
1202    await reader.read();
1203    assert.strictEqual(c.desiredSize, 1);
1204  })().then(common.mustCall());
1205}
1206
1207{
1208  let c;
1209  new ReadableStream({
1210    start(controller) {
1211      c = controller;
1212    }
1213  });
1214  assert(c instanceof ReadableStreamDefaultController);
1215  assert.strictEqual(typeof c.desiredSize, 'number');
1216  assert.strictEqual(typeof c.enqueue, 'function');
1217  assert.strictEqual(typeof c.close, 'function');
1218  assert.strictEqual(typeof c.error, 'function');
1219}
1220
/**
 * Underlying source that feeds a ReadableStream with the contents of this
 * test file, read through an `fs.ReadStream`.
 */
class Source {
  constructor() {
    // Set once the consumer cancels the ReadableStream.
    this.cancelCalled = false;
  }

  /**
   * Opens the file stream and forwards its events to the controller.
   * @param {ReadableStreamDefaultController} controller
   */
  start(controller) {
    this.stream = createReadStream(__filename);
    this.stream.on('data', (chunk) => {
      controller.enqueue(chunk);
    });
    this.stream.once('end', () => {
      // A cancelled ReadableStream is already closed; closing again throws.
      if (!this.cancelCalled)
        controller.close();
    });
    this.stream.once('error', (error) => {
      controller.error(error);
    });
  }

  /**
   * Records the cancellation and stops the file stream. Without the
   * destroy() call the file stream keeps emitting 'data', and enqueueing
   * into the already-closed controller would throw.
   */
  cancel() {
    this.cancelCalled = true;
    // `stream` is only set once start() has run.
    this.stream?.destroy();
  }
}
1244
{
  // Drain the stream chunk by chunk through a default reader and compare the
  // concatenated result with the file read synchronously.
  const stream = new ReadableStream(new Source());

  async function read(readable) {
    const reader = readable.getReader();
    const parts = [];
    for (;;) {
      const { done, value } = await reader.read();
      if (done)
        break;
      parts.push(Buffer.from(value));
    }
    return Buffer.concat(parts);
  }

  read(stream).then(common.mustCall((contents) => {
    assert.deepStrictEqual(contents, readFileSync(__filename));
  }));
}
1265
{
  // Drain the stream with async iteration; once the loop has finished the
  // stream must be closed and no longer locked.
  const stream = new ReadableStream(new Source());

  const collect = async (readable) => {
    const parts = [];
    for await (const part of readable)
      parts.push(part);
    return Buffer.concat(parts);
  };

  collect(stream).then(common.mustCall((contents) => {
    assert.deepStrictEqual(contents, readFileSync(__filename));

    const { state } = stream[kState];
    assert.strictEqual(state, 'closed');
    assert(!stream.locked);
  }));
}
1285
{
  const stream = new ReadableStream(new Source());

  // Non-object options passed to values() are rejected.
  for (const options of [1, false, '']) {
    assert.throws(() => stream.values(options), {
      code: 'ERR_INVALID_ARG_TYPE',
    });
  }

  // Leaving the loop on the first chunk with preventCancel set is expected to
  // leave the stream readable.
  const takeFirst = async (readable) => {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of readable.values({ preventCancel: true }))
      return;
  };

  takeFirst(stream).then(common.mustCall(() => {
    assert.strictEqual(stream[kState].state, 'readable');
  }));
}
1305
{
  // Leaving the loop on the first chunk without preventCancel is expected to
  // leave the stream closed.
  const stream = new ReadableStream(new Source());

  const takeFirst = async (readable) => {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of readable.values({ preventCancel: false }))
      return;
  };

  takeFirst(stream).then(common.mustCall(() => {
    assert.strictEqual(stream[kState].state, 'closed');
  }));
}
1319
{
  // Throwing from the loop body with preventCancel set: the error reaches
  // the caller and the stream is expected to stay readable.
  const stream = new ReadableStream(new Source());
  const boom = new Error('boom');

  const failOnFirstChunk = async (readable) => {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of readable.values({ preventCancel: true }))
      throw boom;
  };

  assert.rejects(failOnFirstChunk(stream), boom).then(common.mustCall(() => {
    assert.strictEqual(stream[kState].state, 'readable');
  }));
}
1336
{
  // Throwing from the loop body without preventCancel: the error reaches the
  // caller and the stream is expected to end up closed.
  const stream = new ReadableStream(new Source());
  const boom = new Error('boom');

  const failOnFirstChunk = async (readable) => {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of readable.values({ preventCancel: false }))
      throw boom;
  };

  assert.rejects(failOnFirstChunk(stream), boom).then(common.mustCall(() => {
    assert.strictEqual(stream[kState].state, 'closed');
  }));
}
1353
{
  // Each entry pairs the assert method to use ('throws' for members expected
  // to fail synchronously, 'rejects' for members expected to return a
  // rejected promise) with a call made on a receiver that is not a genuine
  // instance. Entries run in the listed order.
  const invalidThis = { code: 'ERR_INVALID_THIS' };
  const streamProto = ReadableStream.prototype;
  const defaultReaderProto = ReadableStreamDefaultReader.prototype;
  const byobReaderProto = ReadableStreamBYOBReader.prototype;
  const byteControllerProto = ReadableByteStreamController.prototype;

  const receiverChecks = [
    ['throws', () => Reflect.get(streamProto, 'locked', {})],
    ['rejects', () => streamProto.cancel.call({})],
    ['throws', () => streamProto.getReader.call({})],
    ['throws', () => streamProto.tee.call({})],
    ['throws', () => streamProto.values.call({})],
    ['throws', () => streamProto[kTransfer].call({})],
    ['rejects', () => defaultReaderProto.read.call({})],
    ['rejects', () => defaultReaderProto.cancel.call({})],
    ['rejects', () => Reflect.get(defaultReaderProto, 'closed')],
    ['throws', () => defaultReaderProto.releaseLock.call({})],
    ['rejects', () => byobReaderProto.read.call({})],
    ['throws', () => byobReaderProto.releaseLock.call({})],
    ['rejects', () => Reflect.get(byobReaderProto, 'closed')],
    ['rejects', () => byobReaderProto.cancel.call({})],
    ['throws', () => Reflect.get(byteControllerProto, 'byobRequest', {})],
    ['throws', () => Reflect.get(byteControllerProto, 'desiredSize', {})],
    ['throws', () => byteControllerProto.close.call({})],
    ['throws', () => byteControllerProto.enqueue.call({})],
    ['throws', () => byteControllerProto.error.call({})],
  ];
  for (const [assertion, attempt] of receiverChecks)
    assert[assertion](attempt, invalidThis);

  // These classes cannot be instantiated directly with `new`.
  const illegalConstructor = { code: 'ERR_ILLEGAL_CONSTRUCTOR' };
  const nonConstructible = [
    ReadableStreamBYOBRequest,
    ReadableStreamDefaultController,
    ReadableByteStreamController,
  ];
  for (const Ctor of nonConstructible)
    assert.throws(() => new Ctor(), illegalConstructor);
}
1444
{
  let controller;
  const readable = new ReadableStream({
    start(c) { controller = c; }
  });

  // Option sets shared by the inspect() checks below.
  const unlimitedDepth = { depth: null };
  const zeroDepth = { depth: 0 };

  // The stream prints its summary fields unless the depth is exhausted.
  const streamText =
    'ReadableStream { locked: false, state: \'readable\', ' +
    'supportsBYOB: false }';
  assert.strictEqual(inspect(readable), streamText);
  assert.strictEqual(inspect(readable, unlimitedDepth), streamText);
  assert.strictEqual(
    inspect(readable, zeroDepth),
    'ReadableStream [Object]');

  // The controller prints the same text for every depth checked here.
  const controllerText = 'ReadableStreamDefaultController {}';
  assert.strictEqual(inspect(controller), controllerText);
  for (const options of [unlimitedDepth, zeroDepth])
    assert.strictEqual(inspect(controller, options), controllerText);

  const reader = readable.getReader();

  // For the reader only the class name is checked.
  const readerPattern = /ReadableStreamDefaultReader/;
  assert.match(inspect(reader), readerPattern);
  for (const options of [unlimitedDepth, zeroDepth])
    assert.match(inspect(reader, options), readerPattern);

  // readableStreamPipeTo() reports invalid arguments through a rejected
  // promise.
  const invalidArgType = { code: 'ERR_INVALID_ARG_TYPE' };

  assert.rejects(readableStreamPipeTo(1), invalidArgType);

  const badDestination = readableStreamPipeTo(new ReadableStream(), 1);
  assert.rejects(badDestination, invalidArgType);

  const badSixthArgument = readableStreamPipeTo(
    new ReadableStream(),
    new WritableStream(),
    false,
    false,
    false,
    {});
  assert.rejects(badSixthArgument, invalidArgType);
}
1505
{
  // Releasing the lock a second time must not throw; afterwards both read()
  // and cancel() on the released reader reject.
  const reader = new ReadableStream().getReader();
  reader.releaseLock();
  reader.releaseLock();

  const invalidState = { code: 'ERR_INVALID_STATE' };
  assert.rejects(reader.read(), invalidState);
  assert.rejects(reader.cancel(), invalidState);
}
1518
{
  // Test tee() cloneForBranch2 argument: both branches must deliver the
  // enqueued chunk.
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue('hello');
    }
  });
  const [first, second] = readableStreamTee(readable, true);
  const expectHello = ({ value }) => assert.strictEqual(value, 'hello');
  for (const branch of [first, second])
    branch.getReader().read().then(common.mustCall(expectHello));
}
1532
{
  // bytesFilled exceeds byteLength in this descriptor; the conversion is
  // expected to throw.
  const descriptor = { bytesFilled: 10, byteLength: 5 };
  assert.throws(
    () => readableByteStreamControllerConvertPullIntoDescriptor(descriptor),
    { code: 'ERR_INVALID_STATE' });
}
1543
{
  // Calls internal controller helpers directly with the controller of a
  // stream created without a `type`, before and after the stream is
  // cancelled.
  let controller;
  const readable = new ReadableStream({
    start(c) { controller = c; }
  });

  // Plant a single empty entry in the controller's internal pendingPullIntos
  // list so the respond helper has something to look at.
  controller[kState].pendingPullIntos = [{}];
  // Responding with 0 bytes at this point is expected to throw.
  assert.throws(() => readableByteStreamControllerRespond(controller, 0), {
    code: 'ERR_INVALID_ARG_VALUE',
  });

  // Start cancelling; the returned promise must fulfill.
  readable.cancel().then(common.mustCall());

  // Once cancel() has been called, responding with 1 byte is expected to
  // throw as well.
  assert.throws(() => readableByteStreamControllerRespond(controller, 1), {
    code: 'ERR_INVALID_ARG_VALUE',
  });

  // After cancel() the controller must report that it can neither close nor
  // enqueue.
  assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller));
  // The remaining calls carry no assertion of their own: they only have to
  // return without throwing, since an exception here would fail the test.
  readableStreamDefaultControllerEnqueue(controller);
  readableByteStreamControllerClose(controller);
  readableByteStreamControllerEnqueue(controller, new Uint8Array(1));
}
1566
{
  // isDisturbed() takes a single argument and returns a boolean, so its
  // result has to be asserted explicitly: a stream is undisturbed until the
  // first read() is issued.
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.close();
    },
    pull: common.mustNotCall(),
  });

  const reader = stream.getReader();
  (async () => {
    assert.strictEqual(isDisturbed(stream), false);
    await reader.read();
    assert.strictEqual(isDisturbed(stream), true);
  })().then(common.mustCall());
}
1583
{
  // Reading from a stream that was closed in start() still marks it as
  // disturbed. isDisturbed() only returns a boolean, so the result is
  // asserted explicitly.
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    },
    pull: common.mustNotCall(),
  });

  const reader = stream.getReader();
  (async () => {
    assert.strictEqual(isDisturbed(stream), false);
    await reader.read();
    assert.strictEqual(isDisturbed(stream), true);
  })().then(common.mustCall());
}
1599
{
  const stream = new ReadableStream({
    start(controller) {
    },
    pull: common.mustNotCall(),
  });

  // cancel() marks the stream as disturbed synchronously, so the untouched
  // state can only be observed before it is called. isDisturbed() only
  // returns a boolean; the result is asserted explicitly.
  assert.strictEqual(isDisturbed(stream), false);
  stream.cancel().then(common.mustCall());
  assert.strictEqual(isDisturbed(stream), true);

  const reader = stream.getReader();
  (async () => {
    // A read on the cancelled stream must leave it disturbed.
    await reader.read();
    assert.strictEqual(isDisturbed(stream), true);
  })().then(common.mustCall());
}
1615
{
  // The stream becomes errored once pull() calls controller.error().
  // isErrored() takes a single argument and returns a boolean, so its result
  // has to be asserted explicitly.
  const stream = new ReadableStream({
    pull: common.mustCall((controller) => {
      controller.error(new Error());
    }),
  });

  const reader = stream.getReader();
  (async () => {
    assert.strictEqual(isErrored(stream), false);
    await reader.read().catch(common.mustCall());
    assert.strictEqual(isErrored(stream), true);
  })().then(common.mustCall());
}
1630
{
  // The stream stops being readable once pull() calls controller.error().
  // isReadable() takes a single argument and returns a boolean, so its
  // result has to be asserted explicitly.
  const stream = new ReadableStream({
    pull: common.mustCall((controller) => {
      controller.error(new Error());
    }),
  });

  const reader = stream.getReader();
  (async () => {
    assert.strictEqual(isReadable(stream), true);
    await reader.read().catch(common.mustCall());
    assert.strictEqual(isReadable(stream), false);
  })().then(common.mustCall());
}
1645
{
  // A BYOB read into a DataView must fulfill on a byte stream that was
  // closed in start().
  const stream = new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.close();
    }
  });

  const reader = stream.getReader({ mode: 'byob' });
  const view = new DataView(new ArrayBuffer(1024));

  reader.read(view).then(common.mustCall());
}
1660
{
  // pull() fills the region described by the BYOB request's view with ones
  // and responds with a new view over that same region; the reader must
  // then receive three bytes set to 1.
  const stream = new ReadableStream({
    type: 'bytes',
    autoAllocateChunkSize: 128,
    pull: common.mustCall((controller) => {
      const { buffer, byteOffset, byteLength } = controller.byobRequest.view;
      const filled = new Uint8Array(buffer, byteOffset, byteLength);
      filled.fill(1);
      controller.byobRequest.respondWithNewView(filled);
    }),
  });

  const reader = stream.getReader({ mode: 'byob' });

  // Three-byte window starting at offset 1 of a ten-byte buffer.
  const target = new Uint8Array(new ArrayBuffer(10), 1, 3);

  reader.read(target).then(common.mustCall(({ value }) => {
    assert.deepStrictEqual(value, new Uint8Array([1, 1, 1]));
  }));
}
1690