1// META: global=window,worker 2// META: script=../resources/rs-utils.js 3// META: script=../resources/test-utils.js 4// META: script=../resources/recording-streams.js 5// META: script=../resources/rs-test-templates.js 6'use strict'; 7 8test(() => { 9 10 const rs = new ReadableStream(); 11 const result = rs.tee(); 12 13 assert_true(Array.isArray(result), 'return value should be an array'); 14 assert_equals(result.length, 2, 'array should have length 2'); 15 assert_equals(result[0].constructor, ReadableStream, '0th element should be a ReadableStream'); 16 assert_equals(result[1].constructor, ReadableStream, '1st element should be a ReadableStream'); 17 18}, 'ReadableStream teeing: rs.tee() returns an array of two ReadableStreams'); 19 20promise_test(t => { 21 22 const rs = new ReadableStream({ 23 start(c) { 24 c.enqueue('a'); 25 c.enqueue('b'); 26 c.close(); 27 } 28 }); 29 30 const branch = rs.tee(); 31 const branch1 = branch[0]; 32 const branch2 = branch[1]; 33 const reader1 = branch1.getReader(); 34 const reader2 = branch2.getReader(); 35 36 reader2.closed.then(t.unreached_func('branch2 should not be closed')); 37 38 return Promise.all([ 39 reader1.closed, 40 reader1.read().then(r => { 41 assert_object_equals(r, { value: 'a', done: false }, 'first chunk from branch1 should be correct'); 42 }), 43 reader1.read().then(r => { 44 assert_object_equals(r, { value: 'b', done: false }, 'second chunk from branch1 should be correct'); 45 }), 46 reader1.read().then(r => { 47 assert_object_equals(r, { value: undefined, done: true }, 'third read() from branch1 should be done'); 48 }), 49 reader2.read().then(r => { 50 assert_object_equals(r, { value: 'a', done: false }, 'first chunk from branch2 should be correct'); 51 }) 52 ]); 53 54}, 'ReadableStream teeing: should be able to read one branch to the end without affecting the other'); 55 56promise_test(() => { 57 58 const theObject = { the: 'test object' }; 59 const rs = new ReadableStream({ 60 start(c) { 61 c.enqueue(theObject); 62 } 63 }); 64 65 const branch = rs.tee(); 66 const branch1 = branch[0]; 67 const branch2 = branch[1]; 68 const reader1 = branch1.getReader(); 69 const reader2 = branch2.getReader(); 70 71 return Promise.all([reader1.read(), reader2.read()]).then(values => { 72 assert_object_equals(values[0], values[1], 'the values should be equal'); 73 }); 74 75}, 'ReadableStream teeing: values should be equal across each branch'); 76 77promise_test(t => { 78 79 const theError = { name: 'boo!' }; 80 const rs = new ReadableStream({ 81 start(c) { 82 c.enqueue('a'); 83 c.enqueue('b'); 84 }, 85 pull() { 86 throw theError; 87 } 88 }); 89 90 const branches = rs.tee(); 91 const reader1 = branches[0].getReader(); 92 const reader2 = branches[1].getReader(); 93 94 reader1.label = 'reader1'; 95 reader2.label = 'reader2'; 96 97 return Promise.all([ 98 promise_rejects_exactly(t, theError, reader1.closed), 99 promise_rejects_exactly(t, theError, reader2.closed), 100 reader1.read().then(r => { 101 assert_object_equals(r, { value: 'a', done: false }, 'should be able to read the first chunk in branch1'); 102 }), 103 reader1.read().then(r => { 104 assert_object_equals(r, { value: 'b', done: false }, 'should be able to read the second chunk in branch1'); 105 106 return promise_rejects_exactly(t, theError, reader2.read()); 107 }) 108 .then(() => promise_rejects_exactly(t, theError, reader1.read())) 109 ]); 110 111}, 'ReadableStream teeing: errors in the source should propagate to both branches'); 112 113promise_test(() => { 114 115 const rs = new ReadableStream({ 116 start(c) { 117 c.enqueue('a'); 118 c.enqueue('b'); 119 c.close(); 120 } 121 }); 122 123 const branches = rs.tee(); 124 const branch1 = branches[0]; 125 const branch2 = branches[1]; 126 branch1.cancel(); 127 128 return Promise.all([ 129 readableStreamToArray(branch1).then(chunks => { 130 assert_array_equals(chunks, [], 'branch1 should have no chunks'); 131 }), 132 readableStreamToArray(branch2).then(chunks => { 133 assert_array_equals(chunks, ['a', 'b'], 'branch2 should have two chunks'); 134 }) 135 ]); 136 137}, 'ReadableStream teeing: canceling branch1 should not impact branch2'); 138 139promise_test(() => { 140 141 const rs = new ReadableStream({ 142 start(c) { 143 c.enqueue('a'); 144 c.enqueue('b'); 145 c.close(); 146 } 147 }); 148 149 const branches = rs.tee(); 150 const branch1 = branches[0]; 151 const branch2 = branches[1]; 152 branch2.cancel(); 153 154 return Promise.all([ 155 readableStreamToArray(branch1).then(chunks => { 156 assert_array_equals(chunks, ['a', 'b'], 'branch1 should have two chunks'); 157 }), 158 readableStreamToArray(branch2).then(chunks => { 159 assert_array_equals(chunks, [], 'branch2 should have no chunks'); 160 }) 161 ]); 162 163}, 'ReadableStream teeing: canceling branch2 should not impact branch1'); 164 165templatedRSTeeCancel('ReadableStream teeing', (extras) => { 166 return new ReadableStream({ ...extras }); 167}); 168 169promise_test(t => { 170 171 let controller; 172 const stream = new ReadableStream({ start(c) { controller = c; } }); 173 const [branch1, branch2] = stream.tee(); 174 175 const error = new Error(); 176 error.name = 'distinctive'; 177 178 // Ensure neither branch is waiting in ReadableStreamDefaultReaderRead(). 179 controller.enqueue(); 180 controller.enqueue(); 181 182 return delay(0).then(() => { 183 // This error will have to be detected via [[closedPromise]]. 184 controller.error(error); 185 186 const reader1 = branch1.getReader(); 187 const reader2 = branch2.getReader(); 188 189 return Promise.all([ 190 promise_rejects_exactly(t, error, reader1.closed, 'reader1.closed should reject'), 191 promise_rejects_exactly(t, error, reader2.closed, 'reader2.closed should reject') 192 ]); 193 }); 194 195}, 'ReadableStream teeing: erroring a teed stream should error both branches'); 196 197promise_test(() => { 198 199 let controller; 200 const rs = new ReadableStream({ 201 start(c) { 202 controller = c; 203 } 204 }); 205 206 const branches = rs.tee(); 207 const reader1 = branches[0].getReader(); 208 const reader2 = branches[1].getReader(); 209 210 const promise = Promise.all([reader1.closed, reader2.closed]); 211 212 controller.close(); 213 return promise; 214 215}, 'ReadableStream teeing: closing the original should immediately close the branches'); 216 217promise_test(t => { 218 219 let controller; 220 const rs = new ReadableStream({ 221 start(c) { 222 controller = c; 223 } 224 }); 225 226 const branches = rs.tee(); 227 const reader1 = branches[0].getReader(); 228 const reader2 = branches[1].getReader(); 229 230 const theError = { name: 'boo!' }; 231 const promise = Promise.all([ 232 promise_rejects_exactly(t, theError, reader1.closed), 233 promise_rejects_exactly(t, theError, reader2.closed) 234 ]); 235 236 controller.error(theError); 237 return promise; 238 239}, 'ReadableStream teeing: erroring the original should immediately error the branches'); 240 241promise_test(async t => { 242 243 let controller; 244 const rs = new ReadableStream({ 245 start(c) { 246 controller = c; 247 } 248 }); 249 250 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 251 const cancelPromise = reader2.cancel(); 252 253 controller.enqueue('a'); 254 255 const read1 = await reader1.read(); 256 assert_object_equals(read1, { value: 'a', done: false }, 'first read() from branch1 should fulfill with the chunk'); 257 258 controller.close(); 259 260 const read2 = await reader1.read(); 261 assert_object_equals(read2, { value: undefined, done: true }, 'second read() from branch1 should be done'); 262 263 await Promise.all([ 264 reader1.closed, 265 cancelPromise 266 ]); 267 268}, 'ReadableStream teeing: canceling branch1 should finish when branch2 reads until end of stream'); 269 270promise_test(async t => { 271 272 let controller; 273 const theError = { name: 'boo!' }; 274 const rs = new ReadableStream({ 275 start(c) { 276 controller = c; 277 } 278 }); 279 280 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 281 const cancelPromise = reader2.cancel(); 282 283 controller.error(theError); 284 285 await Promise.all([ 286 promise_rejects_exactly(t, theError, reader1.read()), 287 cancelPromise 288 ]); 289 290}, 'ReadableStream teeing: canceling branch1 should finish when original stream errors'); 291 292promise_test(async () => { 293 294 const rs = new ReadableStream({}); 295 296 const [branch1, branch2] = rs.tee(); 297 298 const cancel1 = branch1.cancel(); 299 await flushAsyncEvents(); 300 const cancel2 = branch2.cancel(); 301 302 await Promise.all([cancel1, cancel2]); 303 304}, 'ReadableStream teeing: canceling both branches in sequence with delay'); 305 306promise_test(async t => { 307 308 const theError = { name: 'boo!' }; 309 const rs = new ReadableStream({ 310 cancel() { 311 throw theError; 312 } 313 }); 314 315 const [branch1, branch2] = rs.tee(); 316 317 const cancel1 = branch1.cancel(); 318 await flushAsyncEvents(); 319 const cancel2 = branch2.cancel(); 320 321 await Promise.all([ 322 promise_rejects_exactly(t, theError, cancel1), 323 promise_rejects_exactly(t, theError, cancel2) 324 ]); 325 326}, 'ReadableStream teeing: failing to cancel when canceling both branches in sequence with delay'); 327 328test(t => { 329 330 // Copy original global. 331 const oldReadableStream = ReadableStream; 332 const getReader = ReadableStream.prototype.getReader; 333 334 const origRS = new ReadableStream(); 335 336 // Replace the global ReadableStream constructor with one that doesn't work. 337 ReadableStream = function() { 338 throw new Error('global ReadableStream constructor called'); 339 }; 340 t.add_cleanup(() => { 341 ReadableStream = oldReadableStream; 342 }); 343 344 // This will probably fail if the global ReadableStream constructor was used. 345 const [rs1, rs2] = origRS.tee(); 346 347 // These will definitely fail if the global ReadableStream constructor was used. 348 assert_not_equals(getReader.call(rs1), undefined, 'getReader should work on rs1'); 349 assert_not_equals(getReader.call(rs2), undefined, 'getReader should work on rs2'); 350 351}, 'ReadableStreamTee should not use a modified ReadableStream constructor from the global object'); 352 353promise_test(t => { 354 355 const rs = recordingReadableStream({}, { highWaterMark: 0 }); 356 357 // Create two branches, each with a HWM of 1. This should result in one 358 // chunk being pulled, not two. 359 rs.tee(); 360 return flushAsyncEvents().then(() => { 361 assert_array_equals(rs.events, ['pull'], 'pull should only be called once'); 362 }); 363 364}, 'ReadableStreamTee should not pull more chunks than can fit in the branch queue'); 365 366promise_test(t => { 367 368 const rs = recordingReadableStream({ 369 pull(controller) { 370 controller.enqueue('a'); 371 } 372 }, { highWaterMark: 0 }); 373 374 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 375 return Promise.all([reader1.read(), reader2.read()]) 376 .then(() => { 377 assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice'); 378 }); 379 380}, 'ReadableStreamTee should only pull enough to fill the emptiest queue'); 381 382promise_test(t => { 383 384 const rs = recordingReadableStream({}, { highWaterMark: 0 }); 385 const theError = { name: 'boo!' }; 386 387 rs.controller.error(theError); 388 389 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 390 391 return flushAsyncEvents().then(() => { 392 assert_array_equals(rs.events, [], 'pull should not be called'); 393 394 return Promise.all([ 395 promise_rejects_exactly(t, theError, reader1.closed), 396 promise_rejects_exactly(t, theError, reader2.closed) 397 ]); 398 }); 399 400}, 'ReadableStreamTee should not pull when original is already errored'); 401 402for (const branch of [1, 2]) { 403 promise_test(t => { 404 405 const rs = recordingReadableStream({}, { highWaterMark: 0 }); 406 const theError = { name: 'boo!' }; 407 408 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 409 410 return flushAsyncEvents().then(() => { 411 assert_array_equals(rs.events, ['pull'], 'pull should be called once'); 412 413 rs.controller.enqueue('a'); 414 415 const reader = (branch === 1) ? reader1 : reader2; 416 return reader.read(); 417 }).then(() => flushAsyncEvents()).then(() => { 418 assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice'); 419 420 rs.controller.error(theError); 421 422 return Promise.all([ 423 promise_rejects_exactly(t, theError, reader1.closed), 424 promise_rejects_exactly(t, theError, reader2.closed) 425 ]); 426 }).then(() => flushAsyncEvents()).then(() => { 427 assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice'); 428 }); 429 430 }, `ReadableStreamTee stops pulling when original stream errors while branch ${branch} is reading`); 431} 432 433promise_test(t => { 434 435 const rs = recordingReadableStream({}, { highWaterMark: 0 }); 436 const theError = { name: 'boo!' }; 437 438 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 439 440 return flushAsyncEvents().then(() => { 441 assert_array_equals(rs.events, ['pull'], 'pull should be called once'); 442 443 rs.controller.enqueue('a'); 444 445 return Promise.all([reader1.read(), reader2.read()]); 446 }).then(() => flushAsyncEvents()).then(() => { 447 assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice'); 448 449 rs.controller.error(theError); 450 451 return Promise.all([ 452 promise_rejects_exactly(t, theError, reader1.closed), 453 promise_rejects_exactly(t, theError, reader2.closed) 454 ]); 455 }).then(() => flushAsyncEvents()).then(() => { 456 assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice'); 457 }); 458 459}, 'ReadableStreamTee stops pulling when original stream errors while both branches are reading'); 460 461promise_test(async () => { 462 463 const rs = recordingReadableStream(); 464 465 const [reader1, reader2] = rs.tee().map(branch => branch.getReader()); 466 const branch1Reads = [reader1.read(), reader1.read()]; 467 const branch2Reads = [reader2.read(), reader2.read()]; 468 469 await flushAsyncEvents(); 470 rs.controller.enqueue('a'); 471 rs.controller.close(); 472 473 assert_object_equals(await branch1Reads[0], { value: 'a', done: false }, 'first chunk from branch1 should be correct'); 474 assert_object_equals(await branch2Reads[0], { value: 'a', done: false }, 'first chunk from branch2 should be correct'); 475 476 assert_object_equals(await branch1Reads[1], { value: undefined, done: true }, 'second read() from branch1 should be done'); 477 assert_object_equals(await branch2Reads[1], { value: undefined, done: true }, 'second read() from branch2 should be done'); 478 479}, 'ReadableStream teeing: enqueue() and close() while both branches are pulling'); 480