// META: global=window,worker
// META: script=../resources/recording-streams.js
// META: script=../resources/test-utils.js
'use strict';

// Backpressure tests for TransformStream.
//
// Helpers used below (recordingTransformStream, flushAsyncEvents, delay) and the assert_*/promise_* functions are
// not defined in this file; they are expected to come from the scripts listed in the META lines above and from the
// test harness.

// Shared error object; the cancel tests below check that exactly this object is what promises reject with.
const error1 = new Error('error1 message');
error1.name = 'error1';

// No transformer and no strategies are passed, and nobody reads: the write must not reach transform().
promise_test(() => {
  const ts = recordingTransformStream();
  const writer = ts.writable.getWriter();
  // This call never resolves.
  writer.write('a');
  return flushAsyncEvents().then(() => {
    assert_array_equals(ts.events, [], 'transform should not be called');
  });
}, 'backpressure allows no transforms with a default identity transform and no reader');

// Readable-side highWaterMark of 1 and no reader: only the first chunk is expected to be transformed.
promise_test(() => {
  const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  // This call to write() resolves asynchronously.
  writer.write('a');
  // This call to write() waits for backpressure that is never relieved and never calls transform().
  writer.write('b');
  return flushAsyncEvents().then(() => {
    assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once');
  });
}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader');

// A transform() that enqueues nothing never fills the readable side, so all four writes should be processed.
promise_test(() => {
  // Without a transform() implementation, recordingTransformStream() never enqueues anything.
  const ts = recordingTransformStream({
    transform() {
      // Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform()
      // keeps being called.
    }
  }, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const writePromises = [];
  for (let i = 0; i < 4; ++i) {
    writePromises.push(writer.write(i));
  }
  return Promise.all(writePromises).then(() => {
    assert_array_equals(ts.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3],
                        'all 4 events should be transformed');
  });
}, 'transform() should keep being called as long as there is no backpressure');

// Records the order in which write()/close() promises settle relative to reads on the readable side.
promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const events = [];
  const writerPromises = [
    writer.write('a').then(() => events.push('a')),
    writer.write('b').then(() => events.push('b')),
    writer.close().then(() => events.push('closed'))];
  return delay(0).then(() => {
    assert_array_equals(events, ['a'], 'the first write should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should not be true');
    // assert_equals takes (actual, expected, description).
    assert_equals(value, 'a', 'value should be "a"');
    return delay(0);
  }).then(() => {
    assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should still not be true');
    assert_equals(value, 'b', 'value should be "b"');
    return reader.read();
  }).then(({ done }) => {
    assert_true(done, 'done should be true');
    // Returning the bare array would not wait on its promises; Promise.all makes any rejection fail the test.
    return Promise.all(writerPromises);
  });
}, 'writes should resolve as soon as transform completes');

// The read() is issued before the write(); with a readable highWaterMark of 0 the chunk must still get through.
promise_test(() => {
  const ts = new TransformStream(undefined, undefined, { highWaterMark: 0 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const readPromise = reader.read();
  writer.write('a');
  return readPromise.then(({ value, done }) => {
    assert_false(done, 'not done');
    assert_equals(value, 'a', 'value should be "a"');
  });
}, 'calling pull() before the first write() with backpressure should work');

// transform() enqueues the chunk and then waits on a read of the same stream; the write must still complete.
promise_test(() => {
  // Assigned after the stream is constructed, because transform() closes over it.
  let reader;
  const ts = recordingTransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk);
      return reader.read();
    }
  }, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  reader = ts.readable.getReader();
  return writer.write('a');
}, 'transform() should be able to read the chunk it just enqueued');

// transform() returns a promise that the test resolves manually; writer.desiredSize is sampled before, during and
// after the pending transform.
promise_test(() => {
  let resolveTransform;
  const transformPromise = new Promise(resolve => {
    resolveTransform = resolve;
  });
  // The readable-side strategy has an infinite highWaterMark, presumably so that the readable queue itself is never
  // the source of backpressure in this test.
  const ts = recordingTransformStream({
    transform() {
      return transformPromise;
    }
  }, undefined, new CountQueuingStrategy({ highWaterMark: Infinity }));
  const writer = ts.writable.getWriter();
  assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
  return delay(0).then(() => {
    writer.write('a');
    assert_array_equals(ts.events, ['transform', 'a']);
    assert_equals(writer.desiredSize, 0, 'desiredSize should be 0');
    return flushAsyncEvents();
  }).then(() => {
    assert_equals(writer.desiredSize, 0, 'desiredSize should still be 0');
    resolveTransform();
    return delay(0);
  }).then(() => {
    assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
  });
}, 'blocking transform() should cause backpressure');

// NOTE(review): the titles of the next three tests say "resolve", but each assertion expects writer.closed to
// reject with error1. The title strings are left untouched; presumably they serve as the tests' identifiers.

// Cancel immediately after construction, without waiting for any async work.
promise_test(t => {
  const ts = new TransformStream();
  ts.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
}, 'writer.closed should resolve after readable is canceled during start');

// Cancel after a delay(0), with a readable highWaterMark of 0.
promise_test(t => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
  });
}, 'writer.closed should resolve after readable is canceled with backpressure');

// Cancel after a delay(0), with a readable highWaterMark of 1.
promise_test(t => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
  });
}, 'writer.closed should resolve after readable is canceled with no backpressure');

// The write is issued first and the readable is cancelled right after; the test passes if the write promise fulfills.
promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  return delay(0).then(() => {
    const writePromise = writer.write('a');
    ts.readable.cancel(error1);
    return writePromise;
  });
}, 'cancelling the readable should cause a pending write to resolve');

// Pipe from a ReadableStream that never enqueues anything; cancelling the transform's readable must reject pipeTo().
promise_test(t => {
  const rs = new ReadableStream();
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  ts.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
}, 'cancelling the readable side of a TransformStream should abort an empty pipe');

// Same as the previous test, but the cancel happens after a delay(0).
promise_test(t => {
  const rs = new ReadableStream();
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
  });
}, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup');

// The source enqueues three chunks in start(), so the pipe has data in it when the readable is cancelled.
promise_test(t => {
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.enqueue('c');
    }
  });
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  // Allow data to flow into the pipe.
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
  });
}, 'cancelling the readable side of a TransformStream should abort a full pipe');