• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// META: global=window,worker
2// META: script=../resources/recording-streams.js
3// META: script=../resources/test-utils.js
4'use strict';
5
// Shared sentinel error: tests below pass it as a cancel reason and then check that the very same object
// comes back as the rejection value.
const error1 = Object.assign(new Error('error1 message'), { name: 'error1' });
8
// No reader is attached and no strategies are supplied; the recording stream is expected to log no
// transform() call at all after a single write.
promise_test(async () => {
  const stream = recordingTransformStream();
  const streamWriter = stream.writable.getWriter();
  // Deliberately not awaited: this write() never resolves.
  streamWriter.write('a');
  await flushAsyncEvents();
  assert_array_equals(stream.events, [], 'transform should not be called');
}, 'backpressure allows no transforms with a default identity transform and no reader');
18
// With a readable high water mark of 1 and nobody reading, exactly one chunk may be transformed; the
// second write must stay blocked.
promise_test(async () => {
  const readableStrategy = { highWaterMark: 1 };
  const stream = recordingTransformStream({}, undefined, readableStrategy);
  const streamWriter = stream.writable.getWriter();
  // First write: resolves asynchronously.
  streamWriter.write('a');
  // Second write: waits on backpressure that is never relieved, so transform() is never called for it.
  streamWriter.write('b');
  await flushAsyncEvents();
  assert_array_equals(stream.events, ['transform', 'a'], 'transform should be called once');
}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader');
30
// A transformer that never enqueues cannot fill the readable side, so every queued write is expected to
// reach transform() even though nothing is reading.
promise_test(async () => {
  const discardingTransformer = {
    transform() {
      // Intentionally empty: every chunk is dropped, so the readable side never becomes full enough to
      // exert backpressure and transform() keeps being called.
    }
  };
  const stream = recordingTransformStream(discardingTransformer, undefined, { highWaterMark: 1 });
  const streamWriter = stream.writable.getWriter();
  // Issue four writes back to back, in order 0..3, and wait for all of them.
  const pendingWrites = [0, 1, 2, 3].map(chunk => streamWriter.write(chunk));
  await Promise.all(pendingWrites);
  assert_array_equals(stream.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3],
                      'all 4 events should be transformed');
}, 'transform() should keep being called as long as there is no backpressure');
49
// Checks the order in which write()/close() promises settle relative to reads on an identity
// TransformStream whose readable side has a high water mark of 1:
//   - the first write resolves without any read,
//   - the second write and close() resolve only after the first chunk has been read.
// Each settled promise appends a marker to `events`, which is inspected between steps.
promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const events = [];
  const writerPromises = [
    writer.write('a').then(() => events.push('a')),
    writer.write('b').then(() => events.push('b')),
    writer.close().then(() => events.push('closed'))];
  return delay(0).then(() => {
    assert_array_equals(events, ['a'], 'the first write should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should not be true');
    // testharness.js signature is assert_equals(actual, expected, description).
    assert_equals(value, 'a', 'value should be "a"');
    return delay(0);
  }).then(() => {
    assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should still not be true');
    assert_equals(value, 'b', 'value should be "b"');
    return reader.read();
  }).then(({ done }) => {
    assert_true(done, 'done should be true');
    // A bare array is not a thenable, so returning it would not wait for (or surface rejections from)
    // the write/close promises. Promise.all() makes the test actually depend on them.
    return Promise.all(writerPromises);
  });
}, 'writes should resolve as soon as transform completes');
78
// Readable high water mark of 0: the read is issued before anything has been written, and the chunk
// written afterwards must still be delivered to that pending read.
promise_test(async () => {
  const stream = new TransformStream(undefined, undefined, { highWaterMark: 0 });
  const streamWriter = stream.writable.getWriter();
  const streamReader = stream.readable.getReader();
  // Start reading first; the write() result is intentionally not awaited.
  const pendingRead = streamReader.read();
  streamWriter.write('a');
  const result = await pendingRead;
  assert_false(result.done, 'not done');
  assert_equals(result.value, 'a', 'value should be "a"');
}, 'calling pull() before the first write() with backpressure should work');
90
// transform() enqueues the chunk and then returns a read() on the same stream's readable side; the
// test passes when the write that triggered it resolves.
promise_test(() => {
  // Assigned after the stream is built; transform() only dereferences it once a chunk is written.
  let readableReader;
  const selfReadingTransformer = {
    transform(chunk, controller) {
      controller.enqueue(chunk);
      return readableReader.read();
    }
  };
  const stream = recordingTransformStream(selfReadingTransformer, undefined, { highWaterMark: 1 });
  const streamWriter = stream.writable.getWriter();
  readableReader = stream.readable.getReader();
  return streamWriter.write('a');
}, 'transform() should be able to read the chunk it just enqueued');
103
// transform() returns a promise that the test controls. While it is pending, the writer's desiredSize
// must drop to 0 and stay there; once it is resolved, desiredSize must return to 1.
promise_test(async () => {
  let unblockTransform;
  const blocker = new Promise(resolve => {
    unblockTransform = resolve;
  });
  const blockingTransformer = {
    transform() {
      return blocker;
    }
  };
  const stream = recordingTransformStream(blockingTransformer, undefined,
                                          new CountQueuingStrategy({ highWaterMark: Infinity }));
  const streamWriter = stream.writable.getWriter();
  assert_equals(streamWriter.desiredSize, 1, 'desiredSize should be 1');

  await delay(0);
  // Not awaited: the write cannot finish until the blocker is released below.
  streamWriter.write('a');
  assert_array_equals(stream.events, ['transform', 'a']);
  assert_equals(streamWriter.desiredSize, 0, 'desiredSize should be 0');

  await flushAsyncEvents();
  assert_equals(streamWriter.desiredSize, 0, 'desiredSize should still be 0');
  unblockTransform();

  await delay(0);
  assert_equals(streamWriter.desiredSize, 1, 'desiredSize should be 1');
}, 'blocking transform() should cause backpressure');
129
// Cancels the readable side immediately after construction and checks the writer's `closed` promise.
// NOTE: the title says "resolve", but the assertion expects `closed` to reject with the cancel reason.
promise_test(t => {
  const stream = new TransformStream();
  // The promise returned by cancel() is intentionally ignored.
  stream.readable.cancel(error1);
  const closedPromise = stream.writable.getWriter().closed;
  return promise_rejects_exactly(t, error1, closedPromise, 'closed should reject');
}, 'writer.closed should resolve after readable is canceled during start');
135
// Readable high water mark of 0; the readable side is cancelled one task after construction.
// NOTE: the title says "resolve", but the assertion expects `closed` to reject with the cancel reason.
promise_test(async t => {
  const stream = new TransformStream({}, undefined, { highWaterMark: 0 });
  await delay(0);
  // The promise returned by cancel() is intentionally ignored.
  stream.readable.cancel(error1);
  const closedPromise = stream.writable.getWriter().closed;
  await promise_rejects_exactly(t, error1, closedPromise, 'closed should reject');
}, 'writer.closed should resolve after readable is canceled with backpressure');
143
// Readable high water mark of 1; the readable side is cancelled one task after construction.
// NOTE: the title says "resolve", but the assertion expects `closed` to reject with the cancel reason.
promise_test(async t => {
  const stream = new TransformStream({}, undefined, { highWaterMark: 1 });
  await delay(0);
  // The promise returned by cancel() is intentionally ignored.
  stream.readable.cancel(error1);
  const closedPromise = stream.writable.getWriter().closed;
  await promise_rejects_exactly(t, error1, closedPromise, 'closed should reject');
}, 'writer.closed should resolve after readable is canceled with no backpressure');
151
// Starts a write and then cancels the readable side in the same task; the test passes when the
// write promise fulfils.
promise_test(async () => {
  const stream = new TransformStream({}, undefined, { highWaterMark: 1 });
  const streamWriter = stream.writable.getWriter();
  await delay(0);
  const pendingWrite = streamWriter.write('a');
  // The promise returned by cancel() is intentionally ignored.
  stream.readable.cancel(error1);
  await pendingWrite;
}, 'cancelling the readable should cause a pending write to resolve');
161
// Pipes a source that never produces chunks into the transform, then cancels the transform's readable
// side right away; the pipeTo() promise must reject with the cancel reason.
promise_test(t => {
  const source = new ReadableStream();
  const transform = new TransformStream();
  const piping = source.pipeTo(transform.writable);
  // The promise returned by cancel() is intentionally ignored.
  transform.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, piping, 'promise returned from pipeTo() should be rejected');
}, 'cancelling the readable side of a TransformStream should abort an empty pipe');
169
// Same scenario as the empty-pipe case, except the cancel happens one task after the pipe is set up.
promise_test(async t => {
  const source = new ReadableStream();
  const transform = new TransformStream();
  const piping = source.pipeTo(transform.writable);
  await delay(0);
  // The promise returned by cancel() is intentionally ignored.
  transform.readable.cancel(error1);
  await promise_rejects_exactly(t, error1, piping, 'promise returned from pipeTo() should be rejected');
}, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup');
179
// The source has three chunks queued from start(); after giving the pipe one task to move data, the
// transform's readable side is cancelled and the pipeTo() promise must reject with the cancel reason.
promise_test(async t => {
  const queuedChunks = ['a', 'b', 'c'];
  const source = new ReadableStream({
    start(controller) {
      for (const chunk of queuedChunks) {
        controller.enqueue(chunk);
      }
    }
  });
  const transform = new TransformStream();
  const piping = source.pipeTo(transform.writable);
  // Allow data to flow into the pipe.
  await delay(0);
  // The promise returned by cancel() is intentionally ignored.
  transform.readable.cancel(error1);
  await promise_rejects_exactly(t, error1, piping, 'promise returned from pipeTo() should be rejected');
}, 'cancelling the readable side of a TransformStream should abort a full pipe');
196