// META: global=window,worker
// META: script=../resources/test-utils.js
// META: script=../resources/rs-utils.js
// META: script=../resources/recording-streams.js
'use strict';

// Flow-control (backpressure) tests for ReadableStream.prototype.pipeTo().
//
// NOTE(review): recordingReadableStream, recordingWritableStream, flushAsyncEvents and readableStreamToArray are
// not defined in this file; presumably they come from the scripts listed in the META lines above.

const error1 = new Error('error1!');
error1.name = 'error1';

// The destination has a highWaterMark of 0, so it never asks for chunks. The pipe must therefore not read
// anything, and because preventCancel is set, both chunks must still be readable from the source after the
// destination is errored.
promise_test(t => {

  const rs = recordingReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    }
  });

  const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));

  const pipePromise = rs.pipeTo(ws, { preventCancel: true });

  // Wait and make sure it doesn't do any reading.
  return flushAsyncEvents().then(() => {
    ws.controller.error(error1);
  })
  .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error'))
  .then(() => {
    assert_array_equals(rs.eventsWithoutPulls, []);
    assert_array_equals(ws.events, []);
  })
  .then(() => readableStreamToArray(rs))
  .then(chunksNotPreviouslyRead => {
    assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
  });

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');

// A first write ('a') is issued directly through a writer and left pending, so the destination's desiredSize
// is 0 when the pipe starts. Once that write is allowed to finish, the pipe must deliver 'b' and close.
promise_test(() => {

  const rs = recordingReadableStream({
    start(controller) {
      controller.enqueue('b');
      controller.close();
    }
  });

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  });

  const writer = ws.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const pipePromise = rs.pipeTo(ws);

  return flushAsyncEvents().then(() => resolveWritePromise())
    .then(() => Promise.all([firstWritePromise, pipePromise]))
    .then(() => {
      assert_array_equals(rs.eventsWithoutPulls, []);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
    });

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');

// Same pending-first-write setup as the previous test, but the source starts out empty: 'b' is enqueued and
// the source closed only after the pipe has been started, in the same turn in which the pending write is
// resolved.
promise_test(() => {

  const rs = recordingReadableStream();

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  });

  const writer = ws.getWriter();
  writer.write('a');

  return flushAsyncEvents().then(() => {
    assert_array_equals(ws.events, ['write', 'a']);
    assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
    writer.releaseLock();

    const pipePromise = rs.pipeTo(ws);

    rs.controller.enqueue('b');
    resolveWritePromise();
    rs.controller.close();

    return pipePromise.then(() => {
      assert_array_equals(rs.eventsWithoutPulls, []);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
    });
  });

}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
   'stream becomes non-empty and the writable stream starts desiring chunks');

// The destination has a highWaterMark of 3 and one write ('a') pending, leaving a desiredSize of 2. The pipe
// is expected to keep pulling from the source while the first write is still outstanding, stopping once the
// destination's queue is full (two of the three source chunks consumed).
promise_test(() => {
  const unreadChunks = ['b', 'c', 'd'];

  const rs = recordingReadableStream({
    pull(controller) {
      controller.enqueue(unreadChunks.shift());
      if (unreadChunks.length === 0) {
        controller.close();
      }
    }
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  }, new CountQueuingStrategy({ highWaterMark: 3 }));

  const writer = ws.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const pipePromise = rs.pipeTo(ws);

  return flushAsyncEvents().then(() => {
    assert_array_equals(ws.events, ['write', 'a']);
    assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
  }).then(() => resolveWritePromise())
    .then(() => Promise.all([firstWritePromise, pipePromise]))
    .then(() => {
      assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close']);
    });

}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');

/**
 * Sequences test actions into numbered steps that run one after another.
 *
 * `waiters[n]` holds the promise for step `n` and `wakers[n]` holds the function that resolves it. Both are
 * created lazily the first time step `n` is requested.
 */
class StepTracker {
  constructor() {
    this.waiters = [];
    this.wakers = [];
  }

  /**
   * Returns a promise which resolves when step `n` is reached. Also schedules step `n + 1` to happen shortly
   * after the promise is resolved.
   *
   * Step `n + 1` is only woken if it has already been registered by the time step `n` has resolved and
   * flushAsyncEvents() has settled; otherwise the chain stops there. Requesting step 0 starts the sequence
   * immediately.
   *
   * @param {number} n - The step number to wait for.
   * @returns {Promise<undefined>} The (shared) promise for step `n`; repeated calls return the same promise.
   */
  waitThenAdvance(n) {
    if (this.waiters[n] === undefined) {
      this.waiters[n] = new Promise(resolve => {
        this.wakers[n] = resolve;
      });
      this.waiters[n]
          .then(() => flushAsyncEvents())
          .then(() => {
            if (this.wakers[n + 1] !== undefined) {
              this.wakers[n + 1]();
            }
          });
    }
    // Step 0 has no predecessor to wake it, so it is resolved as soon as it is requested.
    if (n === 0) {
      this.wakers[0]();
    }
    return this.waiters[n];
  }
}

// Chunks are enqueued into the source at odd steps while the destination's first write stays pending until
// step 9. The even steps check that the source's desiredSize keeps dropping (backpressure) while nothing
// further is written.
promise_test(() => {
  const steps = new StepTracker();
  const desiredSizes = [];
  const rs = recordingReadableStream({
    start(controller) {
      steps.waitThenAdvance(1).then(() => enqueue('a'));
      steps.waitThenAdvance(3).then(() => enqueue('b'));
      steps.waitThenAdvance(5).then(() => enqueue('c'));
      steps.waitThenAdvance(7).then(() => enqueue('d'));
      steps.waitThenAdvance(11).then(() => controller.close());

      // Enqueues the chunk and records the desiredSize observed immediately afterwards.
      function enqueue(chunk) {
        controller.enqueue(chunk);
        desiredSizes.push(controller.desiredSize);
      }
    }
  });

  const chunksFinishedWriting = [];
  const writableStartPromise = Promise.resolve();
  let writeCalled = false;
  const ws = recordingWritableStream({
    start() {
      return writableStartPromise;
    },
    write(chunk) {
      // The first write completes at step 9; every later write waits for step 12.
      const waitForStep = writeCalled ? 12 : 9;
      writeCalled = true;
      return steps.waitThenAdvance(waitForStep).then(() => {
        chunksFinishedWriting.push(chunk);
      });
    }
  });

  return writableStartPromise.then(() => {
    const pipePromise = rs.pipeTo(ws);
    steps.waitThenAdvance(0);

    return Promise.all([
      steps.waitThenAdvance(2).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');

        // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read
        // request promise, leaving the queue empty.
        assert_array_equals(desiredSizes, [1],
          'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
        assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
      }),

      steps.waitThenAdvance(4).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');

        // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
        // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus, after
        // enqueuing 'b', the queue had size 1 (and so a desiredSize of 0).
        assert_array_equals(desiredSizes, [1, 0],
          'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
        assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
      }),

      steps.waitThenAdvance(6).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');

        // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read
        // until the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a
        // desiredSize of -1.
        assert_array_equals(desiredSizes, [1, 0, -1],
          'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
        assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
      }),

      steps.waitThenAdvance(8).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');

        // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing
        // 'b', 'c', and 'd'.
        assert_array_equals(desiredSizes, [1, 0, -1, -2],
          'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
        assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
      }),

      steps.waitThenAdvance(10).then(() => {
        assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
          'at step 10, two chunks must have been written');

        assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
      }),

      pipePromise.then(() => {
        assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
        assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');

        assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
          'all chunks were written (and the WritableStream closed)');
      })
    ]);
  });
}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');