• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// META: global=window,worker
2// META: script=../resources/test-utils.js
3// META: script=../resources/rs-utils.js
4// META: script=../resources/recording-streams.js
5'use strict';
6
7const error1 = new Error('error1!');
8error1.name = 'error1';
9
10promise_test(t => {
11
12  const rs = recordingReadableStream({
13    start(controller) {
14      controller.enqueue('a');
15      controller.enqueue('b');
16      controller.close();
17    }
18  });
19
20  const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
21
22  const pipePromise = rs.pipeTo(ws, { preventCancel: true });
23
24  // Wait and make sure it doesn't do any reading.
25  return flushAsyncEvents().then(() => {
26    ws.controller.error(error1);
27  })
28  .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error'))
29  .then(() => {
30    assert_array_equals(rs.eventsWithoutPulls, []);
31    assert_array_equals(ws.events, []);
32  })
33  .then(() => readableStreamToArray(rs))
34  .then(chunksNotPreviouslyRead => {
35    assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
36  });
37
38}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');
39
40promise_test(() => {
41
42  const rs = recordingReadableStream({
43    start(controller) {
44      controller.enqueue('b');
45      controller.close();
46    }
47  });
48
49  let resolveWritePromise;
50  const ws = recordingWritableStream({
51    write() {
52      if (!resolveWritePromise) {
53        // first write
54        return new Promise(resolve => {
55          resolveWritePromise = resolve;
56        });
57      }
58      return undefined;
59    }
60  });
61
62  const writer = ws.getWriter();
63  const firstWritePromise = writer.write('a');
64  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
65  writer.releaseLock();
66
67  // firstWritePromise won't settle until we call resolveWritePromise.
68
69  const pipePromise = rs.pipeTo(ws);
70
71  return flushAsyncEvents().then(() => resolveWritePromise())
72    .then(() => Promise.all([firstWritePromise, pipePromise]))
73    .then(() => {
74      assert_array_equals(rs.eventsWithoutPulls, []);
75      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
76    });
77
78}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');
79
80promise_test(() => {
81
82  const rs = recordingReadableStream();
83
84  let resolveWritePromise;
85  const ws = recordingWritableStream({
86    write() {
87      if (!resolveWritePromise) {
88        // first write
89        return new Promise(resolve => {
90          resolveWritePromise = resolve;
91        });
92      }
93      return undefined;
94    }
95  });
96
97  const writer = ws.getWriter();
98  writer.write('a');
99
100  return flushAsyncEvents().then(() => {
101    assert_array_equals(ws.events, ['write', 'a']);
102    assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
103    writer.releaseLock();
104
105    const pipePromise = rs.pipeTo(ws);
106
107    rs.controller.enqueue('b');
108    resolveWritePromise();
109    rs.controller.close();
110
111    return pipePromise.then(() => {
112      assert_array_equals(rs.eventsWithoutPulls, []);
113      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
114    });
115  });
116
117}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
118   'stream becomes non-empty and the writable stream starts desiring chunks');
119
120promise_test(() => {
121  const unreadChunks = ['b', 'c', 'd'];
122
123  const rs = recordingReadableStream({
124    pull(controller) {
125      controller.enqueue(unreadChunks.shift());
126      if (unreadChunks.length === 0) {
127        controller.close();
128      }
129    }
130  }, new CountQueuingStrategy({ highWaterMark: 0 }));
131
132  let resolveWritePromise;
133  const ws = recordingWritableStream({
134    write() {
135      if (!resolveWritePromise) {
136        // first write
137        return new Promise(resolve => {
138          resolveWritePromise = resolve;
139        });
140      }
141      return undefined;
142    }
143  }, new CountQueuingStrategy({ highWaterMark: 3 }));
144
145  const writer = ws.getWriter();
146  const firstWritePromise = writer.write('a');
147  assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
148  writer.releaseLock();
149
150  // firstWritePromise won't settle until we call resolveWritePromise.
151
152  const pipePromise = rs.pipeTo(ws);
153
154  return flushAsyncEvents().then(() => {
155    assert_array_equals(ws.events, ['write', 'a']);
156    assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
157  }).then(() => resolveWritePromise())
158    .then(() => Promise.all([firstWritePromise, pipePromise]))
159    .then(() => {
160      assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
161      assert_array_equals(ws.events, ['write', 'a', 'write', 'b','write', 'c','write', 'd', 'close']);
162    });
163
164}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');
165
166class StepTracker {
167  constructor() {
168    this.waiters = [];
169    this.wakers = [];
170  }
171
172  // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
173  // promise is resolved.
174  waitThenAdvance(n) {
175    if (this.waiters[n] === undefined) {
176      this.waiters[n] = new Promise(resolve => {
177        this.wakers[n] = resolve;
178      });
179      this.waiters[n]
180          .then(() => flushAsyncEvents())
181          .then(() => {
182            if (this.wakers[n + 1] !== undefined) {
183              this.wakers[n + 1]();
184            }
185          });
186    }
187    if (n == 0) {
188      this.wakers[0]();
189    }
190    return this.waiters[n];
191  }
192}
193
194promise_test(() => {
195  const steps = new StepTracker();
196  const desiredSizes = [];
197  const rs = recordingReadableStream({
198    start(controller) {
199      steps.waitThenAdvance(1).then(() => enqueue('a'));
200      steps.waitThenAdvance(3).then(() => enqueue('b'));
201      steps.waitThenAdvance(5).then(() => enqueue('c'));
202      steps.waitThenAdvance(7).then(() => enqueue('d'));
203      steps.waitThenAdvance(11).then(() => controller.close());
204
205      function enqueue(chunk) {
206        controller.enqueue(chunk);
207        desiredSizes.push(controller.desiredSize);
208      }
209    }
210  });
211
212  const chunksFinishedWriting = [];
213  const writableStartPromise = Promise.resolve();
214  let writeCalled = false;
215  const ws = recordingWritableStream({
216    start() {
217      return writableStartPromise;
218    },
219    write(chunk) {
220      const waitForStep = writeCalled ? 12 : 9;
221      writeCalled = true;
222      return steps.waitThenAdvance(waitForStep).then(() => {
223        chunksFinishedWriting.push(chunk);
224      });
225    }
226  });
227
228  return writableStartPromise.then(() => {
229    const pipePromise = rs.pipeTo(ws);
230    steps.waitThenAdvance(0);
231
232    return Promise.all([
233      steps.waitThenAdvance(2).then(() => {
234        assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
235        assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');
236
237        // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
238        // promise, leaving the queue empty.
239        assert_array_equals(desiredSizes, [1],
240                            'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
241        assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
242      }),
243
244      steps.waitThenAdvance(4).then(() => {
245        assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
246        assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');
247
248        // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
249        // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
250        // had size 1 (thus desiredSize of 0).
251        assert_array_equals(desiredSizes, [1, 0],
252                            'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
253        assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
254      }),
255
256      steps.waitThenAdvance(6).then(() => {
257        assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
258        assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');
259
260        // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
261        // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
262        // -1.
263        assert_array_equals(desiredSizes, [1, 0, -1],
264                            'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
265        assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
266      }),
267
268      steps.waitThenAdvance(8).then(() => {
269        assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
270        assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');
271
272        // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
273        // and 'd'.
274        assert_array_equals(desiredSizes, [1, 0, -1, -2],
275                            'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
276        assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
277      }),
278
279      steps.waitThenAdvance(10).then(() => {
280        assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
281        assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
282                            'at step 10, two chunks must have been written');
283
284        assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
285      }),
286
287      pipePromise.then(() => {
288        assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
289        assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');
290
291        assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
292        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
293                            'all chunks were written (and the WritableStream closed)');
294      })
295    ]);
296  });
297}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');
298