// META: global=window,worker
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
// META: script=../resources/rs-test-templates.js
'use strict';

// Tests for ReadableStream's tee() method.

// tee() must return a real Array of length 2 whose elements both report
// ReadableStream as their constructor.
test(() => {

  const rs = new ReadableStream();
  const result = rs.tee();

  assert_true(Array.isArray(result), 'return value should be an array');
  assert_equals(result.length, 2, 'array should have length 2');
  assert_equals(result[0].constructor, ReadableStream, '0th element should be a ReadableStream');
  assert_equals(result[1].constructor, ReadableStream, '1st element should be a ReadableStream');

}, 'ReadableStream teeing: rs.tee() returns an array of two ReadableStreams');

// Reads branch1 all the way to the end (two chunks, then done) while branch2
// only consumes its first chunk. branch1 closing must not close branch2.
promise_test(t => {

  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.close();
    }
  });

  const [branch1, branch2] = rs.tee();
  const reader1 = branch1.getReader();
  const reader2 = branch2.getReader();

  // Only one of branch2's two chunks is read below, so its closed promise
  // is expected to stay pending for the duration of the test.
  reader2.closed.then(t.unreached_func('branch2 should not be closed'));

  return Promise.all([
    reader1.closed,
    reader1.read().then(r => {
      assert_object_equals(r, { value: 'a', done: false }, 'first chunk from branch1 should be correct');
    }),
    reader1.read().then(r => {
      assert_object_equals(r, { value: 'b', done: false }, 'second chunk from branch1 should be correct');
    }),
    reader1.read().then(r => {
      assert_object_equals(r, { value: undefined, done: true }, 'third read() from branch1 should be done');
    }),
    reader2.read().then(r => {
      assert_object_equals(r, { value: 'a', done: false }, 'first chunk from branch2 should be correct');
    })
  ]);

}, 'ReadableStream teeing: should be able to read one branch to the end without affecting the other');

// Enqueues a single object chunk and checks that the read results obtained
// from the two branches compare equal.
promise_test(() => {

  const theObject = { the: 'test object' };
  const rs = new ReadableStream({
    start(c) {
      c.enqueue(theObject);
    }
  });

  const [branch1, branch2] = rs.tee();
  const reader1 = branch1.getReader();
  const reader2 = branch2.getReader();

  return Promise.all([reader1.read(), reader2.read()]).then(values => {
    assert_object_equals(values[0], values[1], 'the values should be equal');
  });

}, 'ReadableStream teeing: values should be equal across each branch');

// The source enqueues two chunks in start() and throws from pull(). Both
// branches must reject their closed promises with that exact error, and once
// the queued chunks have been read, further reads must reject with it too.
promise_test(t => {

  const theError = { name: 'boo!' };
  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
    },
    pull() {
      throw theError;
    }
  });

  const [branch1, branch2] = rs.tee();
  const reader1 = branch1.getReader();
  const reader2 = branch2.getReader();

  // Labels are not read anywhere in this test; presumably a debugging aid.
  reader1.label = 'reader1';
  reader2.label = 'reader2';

  return Promise.all([
    promise_rejects_exactly(t, theError, reader1.closed),
    promise_rejects_exactly(t, theError, reader2.closed),
    reader1.read().then(r => {
      assert_object_equals(r, { value: 'a', done: false }, 'should be able to read the first chunk in branch1');
    }),
    reader1.read().then(r => {
      assert_object_equals(r, { value: 'b', done: false }, 'should be able to read the second chunk in branch1');

      return promise_rejects_exactly(t, theError, reader2.read());
    })
    .then(() => promise_rejects_exactly(t, theError, reader1.read()))
  ]);

}, 'ReadableStream teeing: errors in the source should propagate to both branches');

// Cancels branch1 right after teeing; branch1 must then yield no chunks while
// branch2 still delivers everything the source enqueued.
promise_test(() => {

  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.close();
    }
  });

  const [branch1, branch2] = rs.tee();
  // The promise returned by cancel() is intentionally not awaited here.
  branch1.cancel();

  return Promise.all([
    readableStreamToArray(branch1).then(chunks => {
      assert_array_equals(chunks, [], 'branch1 should have no chunks');
    }),
    readableStreamToArray(branch2).then(chunks => {
      assert_array_equals(chunks, ['a', 'b'], 'branch2 should have two chunks');
    })
  ]);

}, 'ReadableStream teeing: canceling branch1 should not impact branch2');

// Mirror of the branch1-cancel test: cancels branch2 right after teeing and
// checks that branch1 still receives both chunks while branch2 yields none.
promise_test(() => {

  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.close();
    }
  });

  const [branch1, branch2] = rs.tee();
  // The promise returned by cancel() is intentionally not awaited here.
  branch2.cancel();

  return Promise.all([
    readableStreamToArray(branch1).then(chunks => {
      assert_array_equals(chunks, ['a', 'b'], 'branch1 should have two chunks');
    }),
    readableStreamToArray(branch2).then(chunks => {
      assert_array_equals(chunks, [], 'branch2 should have no chunks');
    })
  ]);

}, 'ReadableStream teeing: canceling branch2 should not impact branch1');

// Runs the shared tee-cancel template against plain ReadableStreams. The
// factory builds each stream from a shallow copy of the `extras` object the
// template passes in.
templatedRSTeeCancel('ReadableStream teeing', (extras) => new ReadableStream({ ...extras }));

// Errors the original stream after chunks were enqueued and before either
// branch has a reader; readers acquired afterwards must still see the error
// through their closed promises.
promise_test(t => {

  let controller;
  const stream = new ReadableStream({ start(c) { controller = c; } });
  const [branch1, branch2] = stream.tee();

  const error = new Error();
  error.name = 'distinctive';

  // Ensure neither branch is waiting in ReadableStreamDefaultReaderRead().
  controller.enqueue();
  controller.enqueue();

  return delay(0).then(() => {
    // This error will have to be detected via [[closedPromise]].
    controller.error(error);

    const reader1 = branch1.getReader();
    const reader2 = branch2.getReader();

    return Promise.all([
      promise_rejects_exactly(t, error, reader1.closed, 'reader1.closed should reject'),
      promise_rejects_exactly(t, error, reader2.closed, 'reader2.closed should reject')
    ]);
  });

}, 'ReadableStream teeing: erroring a teed stream should error both branches');

// Closing the original stream (with readers already held on both branches)
// must fulfill both branches' closed promises.
promise_test(() => {

  let controller;
  const rs = new ReadableStream({
    start(c) {
      controller = c;
    }
  });

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());

  // Capture the closed promises before closing the source.
  const promise = Promise.all([reader1.closed, reader2.closed]);

  controller.close();
  return promise;

}, 'ReadableStream teeing: closing the original should immediately close the branches');

// Erroring the original stream (with readers already held on both branches)
// must reject both branches' closed promises with that exact error.
promise_test(t => {

  let controller;
  const rs = new ReadableStream({
    start(c) {
      controller = c;
    }
  });

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());

  const theError = { name: 'boo!' };
  // Attach the rejection expectations before erroring the source.
  const promise = Promise.all([
    promise_rejects_exactly(t, theError, reader1.closed),
    promise_rejects_exactly(t, theError, reader2.closed)
  ]);

  controller.error(theError);
  return promise;

}, 'ReadableStream teeing: erroring the original should immediately error the branches');

// One branch is canceled while the other keeps reading until the source
// closes; the pending cancel() promise must fulfill once that happens.
// NOTE(review): the code cancels reader2 and reads from reader1, i.e. the
// reverse of the branch numbering in the title. The title is left unchanged
// so the test keeps its existing name.
promise_test(async t => {

  let controller;
  const rs = new ReadableStream({
    start(c) {
      controller = c;
    }
  });

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
  const cancelPromise = reader2.cancel();

  controller.enqueue('a');

  const read1 = await reader1.read();
  assert_object_equals(read1, { value: 'a', done: false }, 'first read() from branch1 should fulfill with the chunk');

  controller.close();

  const read2 = await reader1.read();
  assert_object_equals(read2, { value: undefined, done: true }, 'second read() from branch1 should be done');

  await Promise.all([
    reader1.closed,
    cancelPromise
  ]);

}, 'ReadableStream teeing: canceling branch1 should finish when branch2 reads until end of stream');

// One branch is canceled and then the source errors; the other branch's read
// must reject with that error and the pending cancel() promise must fulfill.
// NOTE(review): the code cancels reader2 and reads from reader1, i.e. the
// reverse of the branch numbering in the title. The title is left unchanged
// so the test keeps its existing name.
promise_test(async t => {

  let controller;
  const theError = { name: 'boo!' };
  const rs = new ReadableStream({
    start(c) {
      controller = c;
    }
  });

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
  const cancelPromise = reader2.cancel();

  controller.error(theError);

  await Promise.all([
    promise_rejects_exactly(t, theError, reader1.read()),
    cancelPromise
  ]);

}, 'ReadableStream teeing: canceling branch1 should finish when original stream errors');

// Cancels branch1, lets pending async work flush, then cancels branch2; both
// cancel() promises must fulfill.
promise_test(async () => {

  const rs = new ReadableStream({});

  const [branch1, branch2] = rs.tee();

  const cancel1 = branch1.cancel();
  // Separate the two cancellations so they do not happen in the same turn.
  await flushAsyncEvents();
  const cancel2 = branch2.cancel();

  await Promise.all([cancel1, cancel2]);

}, 'ReadableStream teeing: canceling both branches in sequence with delay');

// Same sequence as the delayed double-cancel test, but the source's cancel()
// throws; both branches' cancel() promises must reject with that exact error.
promise_test(async t => {

  const theError = { name: 'boo!' };
  const rs = new ReadableStream({
    cancel() {
      throw theError;
    }
  });

  const [branch1, branch2] = rs.tee();

  const cancel1 = branch1.cancel();
  // Separate the two cancellations so they do not happen in the same turn.
  await flushAsyncEvents();
  const cancel2 = branch2.cancel();

  await Promise.all([
    promise_rejects_exactly(t, theError, cancel1),
    promise_rejects_exactly(t, theError, cancel2)
  ]);

}, 'ReadableStream teeing: failing to cancel when canceling both branches in sequence with delay');

// tee() must create its branches without going through whatever the global
// `ReadableStream` binding currently points at. The global is swapped for a
// throwing function for the duration of the test and restored in a cleanup.
test(t => {

  // Copy original global.
  const oldReadableStream = ReadableStream;
  const getReader = ReadableStream.prototype.getReader;

  // Must be constructed before the global is replaced below.
  const origRS = new ReadableStream();

  // Replace the global ReadableStream constructor with one that doesn't work.
  ReadableStream = function() {
    throw new Error('global ReadableStream constructor called');
  };
  t.add_cleanup(() => {
    ReadableStream = oldReadableStream;
  });

  // This will probably fail if the global ReadableStream constructor was used.
  const [rs1, rs2] = origRS.tee();

  // These will definitely fail if the global ReadableStream constructor was used.
  assert_not_equals(getReader.call(rs1), undefined, 'getReader should work on rs1');
  assert_not_equals(getReader.call(rs2), undefined, 'getReader should work on rs2');

}, 'ReadableStreamTee should not use a modified ReadableStream constructor from the global object');

// Teeing a source whose own high water mark is 0 must trigger exactly one
// pull on the source once pending async work has flushed.
promise_test(t => {

  const rs = recordingReadableStream({}, { highWaterMark: 0 });

  // Create two branches, each with a HWM of 1. This should result in one
  // chunk being pulled, not two.
  rs.tee();
  return flushAsyncEvents().then(() => {
    assert_array_equals(rs.events, ['pull'], 'pull should only be called once');
  });

}, 'ReadableStreamTee should not pull more chunks than can fit in the branch queue');

// The source enqueues one chunk per pull. After each branch has completed a
// single read, the source must have been pulled exactly twice.
promise_test(t => {

  const rs = recordingReadableStream({
    pull(controller) {
      controller.enqueue('a');
    }
  }, { highWaterMark: 0 });

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
  return Promise.all([reader1.read(), reader2.read()]).then(() => {
    assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
  });

}, 'ReadableStreamTee should only pull enough to fill the emptiest queue');

// Teeing a source that was errored beforehand must not pull from it, and
// both branches must reject their closed promises with the source's error.
promise_test(t => {

  const rs = recordingReadableStream({}, { highWaterMark: 0 });
  const theError = { name: 'boo!' };

  // Error the source before it is teed.
  rs.controller.error(theError);

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());

  return flushAsyncEvents().then(() => {
    assert_array_equals(rs.events, [], 'pull should not be called');

    return Promise.all([
      promise_rejects_exactly(t, theError, reader1.closed),
      promise_rejects_exactly(t, theError, reader2.closed)
    ]);
  });

}, 'ReadableStreamTee should not pull when original is already errored');

// Registers one test per branch number. In each, only the selected branch
// reads a chunk; the source is then errored and must not be pulled again.
for (const branch of [1, 2]) {
  promise_test(t => {

    const rs = recordingReadableStream({}, { highWaterMark: 0 });
    const theError = { name: 'boo!' };

    // The callback parameter is named so it does not shadow the loop's `branch`.
    const [reader1, reader2] = rs.tee().map(teed => teed.getReader());

    return flushAsyncEvents().then(() => {
      assert_array_equals(rs.events, ['pull'], 'pull should be called once');

      rs.controller.enqueue('a');

      // `branch` selects which of the two readers performs the read.
      const reader = (branch === 1) ? reader1 : reader2;
      return reader.read();
    }).then(() => flushAsyncEvents()).then(() => {
      assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');

      rs.controller.error(theError);

      return Promise.all([
        promise_rejects_exactly(t, theError, reader1.closed),
        promise_rejects_exactly(t, theError, reader2.closed)
      ]);
    }).then(() => flushAsyncEvents()).then(() => {
      // No further pull may have been recorded after the error.
      assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
    });

  }, `ReadableStreamTee stops pulling when original stream errors while branch ${branch} is reading`);
}

// Both branches read one chunk, then the source is errored; the recorded
// pull count must stay at two afterwards.
promise_test(t => {

  const rs = recordingReadableStream({}, { highWaterMark: 0 });
  const theError = { name: 'boo!' };

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());

  return flushAsyncEvents().then(() => {
    assert_array_equals(rs.events, ['pull'], 'pull should be called once');

    rs.controller.enqueue('a');

    return Promise.all([reader1.read(), reader2.read()]);
  }).then(() => flushAsyncEvents()).then(() => {
    assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');

    rs.controller.error(theError);

    return Promise.all([
      promise_rejects_exactly(t, theError, reader1.closed),
      promise_rejects_exactly(t, theError, reader2.closed)
    ]);
  }).then(() => flushAsyncEvents()).then(() => {
    // No further pull may have been recorded after the error.
    assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
  });

}, 'ReadableStreamTee stops pulling when original stream errors while both branches are reading');

// Each branch has two reads outstanding when the source enqueues one chunk
// and closes; every branch must get the chunk first and then a done result.
promise_test(async () => {

  const rs = recordingReadableStream();

  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
  // Issue all four reads up front, before the source has produced anything.
  const branch1Reads = [reader1.read(), reader1.read()];
  const branch2Reads = [reader2.read(), reader2.read()];

  await flushAsyncEvents();
  rs.controller.enqueue('a');
  rs.controller.close();

  assert_object_equals(await branch1Reads[0], { value: 'a', done: false }, 'first chunk from branch1 should be correct');
  assert_object_equals(await branch2Reads[0], { value: 'a', done: false }, 'first chunk from branch2 should be correct');

  assert_object_equals(await branch1Reads[1], { value: undefined, done: true }, 'second read() from branch1 should be done');
  assert_object_equals(await branch2Reads[1], { value: undefined, done: true }, 'second read() from branch2 should be done');

}, 'ReadableStream teeing: enqueue() and close() while both branches are pulling');
