• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// META: global=window,worker
2// META: script=../resources/recording-streams.js
3// META: script=../resources/rs-utils.js
4// META: script=../resources/test-utils.js
5'use strict';
6
7// The size() function of readableStrategy can re-entrantly call back into the TransformStream implementation. This
8// makes it risky to cache state across the call to ReadableStreamDefaultControllerEnqueue. These tests attempt to catch
9// such errors. They are separated from the other strategy tests because no real user code should ever do anything like
10// this.
11//
12// There is no such issue with writableStrategy size() because it is never called from within TransformStream
13// algorithms.
14
15const error1 = new Error('error1');
16error1.name = 'error1';
17
18promise_test(() => {
19  let controller;
20  let calls = 0;
21  const ts = new TransformStream({
22    start(c) {
23      controller = c;
24    }
25  }, undefined, {
26    size() {
27      ++calls;
28      if (calls < 2) {
29        controller.enqueue('b');
30      }
31      return 1;
32    },
33    highWaterMark: Infinity
34  });
35  const writer = ts.writable.getWriter();
36  return Promise.all([writer.write('a'), writer.close()])
37      .then(() => readableStreamToArray(ts.readable))
38      .then(array => assert_array_equals(array, ['b', 'a'], 'array should contain two chunks'));
39}, 'enqueue() inside size() should work');
40
41promise_test(() => {
42  let controller;
43  const ts = new TransformStream({
44    start(c) {
45      controller = c;
46    }
47  }, undefined, {
48    size() {
49      // The readable queue is empty.
50      controller.terminate();
51      // The readable state has gone from "readable" to "closed".
52      return 1;
53      // This chunk will be enqueued, but will be impossible to read because the state is already "closed".
54    },
55    highWaterMark: Infinity
56  });
57  const writer = ts.writable.getWriter();
58  return writer.write('a')
59      .then(() => readableStreamToArray(ts.readable))
60      .then(array => assert_array_equals(array, [], 'array should contain no chunks'));
61  // The chunk 'a' is still in readable's queue. readable is closed so 'a' cannot be read. writable's queue is empty and
62  // it is still writable.
63}, 'terminate() inside size() should work');
64
65promise_test(t => {
66  let controller;
67  const ts = new TransformStream({
68    start(c) {
69      controller = c;
70    }
71  }, undefined, {
72    size() {
73      controller.error(error1);
74      return 1;
75    },
76    highWaterMark: Infinity
77  });
78  const writer = ts.writable.getWriter();
79  return writer.write('a')
80      .then(() => promise_rejects_exactly(t, error1, ts.readable.getReader().read(), 'read() should reject'));
81}, 'error() inside size() should work');
82
83promise_test(() => {
84  let controller;
85  const ts = new TransformStream({
86    start(c) {
87      controller = c;
88    }
89  }, undefined, {
90    size() {
91      assert_equals(controller.desiredSize, 1, 'desiredSize should be 1');
92      return 1;
93    },
94    highWaterMark: 1
95  });
96  const writer = ts.writable.getWriter();
97  return Promise.all([writer.write('a'), writer.close()])
98      .then(() => readableStreamToArray(ts.readable))
99      .then(array => assert_array_equals(array, ['a'], 'array should contain one chunk'));
100}, 'desiredSize inside size() should work');
101
102promise_test(t => {
103  let cancelPromise;
104  const ts = new TransformStream({}, undefined, {
105    size() {
106      cancelPromise = ts.readable.cancel(error1);
107      return 1;
108    },
109    highWaterMark: Infinity
110  });
111  const writer = ts.writable.getWriter();
112  return writer.write('a')
113      .then(() => {
114        promise_rejects_exactly(t, error1, writer.closed, 'writer.closed should reject');
115        return cancelPromise;
116      });
117}, 'readable cancel() inside size() should work');
118
119promise_test(() => {
120  let controller;
121  let pipeToPromise;
122  const ws = recordingWritableStream();
123  const ts = new TransformStream({
124    start(c) {
125      controller = c;
126    }
127  }, undefined, {
128    size() {
129      if (!pipeToPromise) {
130        pipeToPromise = ts.readable.pipeTo(ws);
131      }
132      return 1;
133    },
134    highWaterMark: 1
135  });
136  // Allow promise returned by start() to resolve so that enqueue() will happen synchronously.
137  return delay(0).then(() => {
138    controller.enqueue('a');
139    assert_not_equals(pipeToPromise, undefined);
140
141    // Some pipeTo() implementations need an additional chunk enqueued in order for the first one to be processed. See
142    // https://github.com/whatwg/streams/issues/794 for background.
143    controller.enqueue('a');
144
145    // Give pipeTo() a chance to process the queued chunks.
146    return delay(0);
147  }).then(() => {
148    assert_array_equals(ws.events, ['write', 'a', 'write', 'a'], 'ws should contain two chunks');
149    controller.terminate();
150    return pipeToPromise;
151  }).then(() => {
152    assert_array_equals(ws.events, ['write', 'a', 'write', 'a', 'close'], 'target should have been closed');
153  });
154}, 'pipeTo() inside size() should work');
155
156promise_test(() => {
157  let controller;
158  let readPromise;
159  let calls = 0;
160  let reader;
161  const ts = new TransformStream({
162    start(c) {
163      controller = c;
164    }
165  }, undefined, {
166    size() {
167      // This is triggered by controller.enqueue(). The queue is empty and there are no pending reads. pull() is called
168      // synchronously, allowing transform() to proceed asynchronously. This results in a second call to enqueue(),
169      // which resolves this pending read() without calling size() again.
170      readPromise = reader.read();
171      ++calls;
172      return 1;
173    },
174    highWaterMark: 0
175  });
176  reader = ts.readable.getReader();
177  const writer = ts.writable.getWriter();
178  let writeResolved = false;
179  const writePromise = writer.write('b').then(() => {
180    writeResolved = true;
181  });
182  return flushAsyncEvents().then(() => {
183    assert_false(writeResolved);
184    controller.enqueue('a');
185    assert_equals(calls, 1, 'size() should have been called once');
186    return delay(0);
187  }).then(() => {
188    assert_true(writeResolved);
189    assert_equals(calls, 1, 'size() should only be called once');
190    return readPromise;
191  }).then(({ value, done }) => {
192    assert_false(done, 'done should be false');
193    // See https://github.com/whatwg/streams/issues/794 for why this chunk is not 'a'.
194    assert_equals(value, 'b', 'chunk should have been read');
195    assert_equals(calls, 1, 'calls should still be 1');
196    return writePromise;
197  });
198}, 'read() inside of size() should work');
199
200promise_test(() => {
201  let writer;
202  let writePromise1;
203  let calls = 0;
204  const ts = new TransformStream({}, undefined, {
205    size() {
206      ++calls;
207      if (calls < 2) {
208        writePromise1 = writer.write('a');
209      }
210      return 1;
211    },
212    highWaterMark: Infinity
213  });
214  writer = ts.writable.getWriter();
215  // Give pull() a chance to be called.
216  return delay(0).then(() => {
217    // This write results in a synchronous call to transform(), enqueue(), and size().
218    const writePromise2 = writer.write('b');
219    assert_equals(calls, 1, 'size() should have been called once');
220    return Promise.all([writePromise1, writePromise2, writer.close()]);
221  }).then(() => {
222    assert_equals(calls, 2, 'size() should have been called twice');
223    return readableStreamToArray(ts.readable);
224  }).then(array => {
225    assert_array_equals(array, ['b', 'a'], 'both chunks should have been enqueued');
226    assert_equals(calls, 2, 'calls should still be 2');
227  });
228}, 'writer.write() inside size() should work');
229
230promise_test(() => {
231  let controller;
232  let writer;
233  let writePromise;
234  let calls = 0;
235  const ts = new TransformStream({
236    start(c) {
237      controller = c;
238    }
239  }, undefined, {
240    size() {
241      ++calls;
242      if (calls < 2) {
243        writePromise = writer.write('a');
244      }
245      return 1;
246    },
247    highWaterMark: Infinity
248  });
249  writer = ts.writable.getWriter();
250  // Give pull() a chance to be called.
251  return delay(0).then(() => {
252    // This enqueue results in synchronous calls to size(), write(), transform() and enqueue().
253    controller.enqueue('b');
254    assert_equals(calls, 2, 'size() should have been called twice');
255    return Promise.all([writePromise, writer.close()]);
256  }).then(() => {
257    return readableStreamToArray(ts.readable);
258  }).then(array => {
259    // Because one call to enqueue() is nested inside the other, they finish in the opposite order that they were
260    // called, so the chunks end up reverse order.
261    assert_array_equals(array, ['a', 'b'], 'both chunks should have been enqueued');
262    assert_equals(calls, 2, 'calls should still be 2');
263  });
264}, 'synchronous writer.write() inside size() should work');
265
266promise_test(() => {
267  let writer;
268  let closePromise;
269  let controller;
270  const ts = new TransformStream({
271    start(c) {
272      controller = c;
273    }
274  }, undefined, {
275    size() {
276      closePromise = writer.close();
277      return 1;
278    },
279    highWaterMark: 1
280  });
281  writer = ts.writable.getWriter();
282  const reader = ts.readable.getReader();
283  // Wait for the promise returned by start() to be resolved so that the call to close() will result in a synchronous
284  // call to TransformStreamDefaultSink.
285  return delay(0).then(() => {
286    controller.enqueue('a');
287    return reader.read();
288  }).then(({ value, done }) => {
289    assert_false(done, 'done should be false');
290    assert_equals(value, 'a', 'value should be correct');
291    return reader.read();
292  }).then(({ done }) => {
293    assert_true(done, 'done should be true');
294    return closePromise;
295  });
296}, 'writer.close() inside size() should work');
297
298promise_test(t => {
299  let abortPromise;
300  let controller;
301  const ts = new TransformStream({
302    start(c) {
303      controller = c;
304    }
305  }, undefined, {
306    size() {
307      abortPromise = ts.writable.abort(error1);
308      return 1;
309    },
310    highWaterMark: 1
311  });
312  const reader = ts.readable.getReader();
313  // Wait for the promise returned by start() to be resolved so that the call to abort() will result in a synchronous
314  // call to TransformStreamDefaultSink.
315  return delay(0).then(() => {
316    controller.enqueue('a');
317    return Promise.all([promise_rejects_exactly(t, error1, reader.read(), 'read() should reject'), abortPromise]);
318  });
319}, 'writer.abort() inside size() should work');
320