// Flags: --expose-internals --no-warnings
'use strict';

// Tests for newStreamReadableFromReadableStream(), the internal adapter that
// wraps a web ReadableStream in a Node.js stream.Readable.

const common = require('../common');

const assert = require('assert');

const {
  pipeline,
  finished,
  Writable,
} = require('stream');

const {
  ReadableStream,
  WritableStream,
} = require('stream/web');

const {
  newStreamReadableFromReadableStream,
} = require('internal/webstreams/adapters');

const {
  kState,
} = require('internal/webstreams/util');

// Underlying source used by the tests below. It records what happened to it
// (`started`, `controller`, `canceled`, `cancelReason`) so the tests can
// inspect it afterwards. Each pull enqueues `value` once and then closes the
// stream.
class MySource {
  constructor(value = new Uint8Array(10)) {
    this.value = value;
  }

  start(c) {
    this.started = true;
    this.controller = c;
  }

  pull(controller) {
    controller.enqueue(this.value);
    controller.close();
  }

  cancel(reason) {
    this.canceled = true;
    this.cancelReason = reason;
  }
}

{
  // Destroying the readable without an error closes
  // the readableStream.

  const readableStream = new ReadableStream();
  const readable = newStreamReadableFromReadableStream(readableStream);

  assert(readableStream.locked);

  // While the adapter holds the lock, every operation that needs the lock
  // must fail with ERR_INVALID_STATE. The promise-returning checks are chained
  // with common.mustCall() so they are tracked instead of being left as
  // floating promises that could silently never settle.
  assert.rejects(readableStream.cancel(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
  assert.rejects(readableStream.pipeTo(new WritableStream()), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
  assert.throws(() => readableStream.tee(), {
    code: 'ERR_INVALID_STATE',
  });
  assert.throws(() => readableStream.getReader(), {
    code: 'ERR_INVALID_STATE',
  });
  assert.throws(() => {
    readableStream.pipeThrough({
      readable: new ReadableStream(),
      writable: new WritableStream(),
    });
  }, {
    code: 'ERR_INVALID_STATE',
  });

  readable.destroy();

  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'closed');
  }));
}

{
  // Destroying the readable with an error closes the readableStream
  // without error but records the cancel reason in the source.
  const error = new Error('boom');
  const source = new MySource();
  const readableStream = new ReadableStream(source);
  const readable = newStreamReadableFromReadableStream(readableStream);

  assert(readableStream.locked);

  readable.destroy(error);

  readable.on('error', common.mustCall((reason) => {
    assert.strictEqual(reason, error);
  }));

  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'closed');
    assert.strictEqual(source.cancelReason, error);
  }));
}

{
  // An error in the source causes the readable to error.
  const error = new Error('boom');
  const source = new MySource();
  const readableStream = new ReadableStream(source);
  const readable = newStreamReadableFromReadableStream(readableStream);

  assert(readableStream.locked);

  source.controller.error(error);

  readable.on('error', common.mustCall((reason) => {
    assert.strictEqual(reason, error);
  }));

  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'errored');
  }));
}

{
  // With no options, the Uint8Array(10) enqueued by the source is expected
  // to arrive as a chunk deep-equal to a zero-filled 10-byte Buffer, followed
  // by 'end' and 'close' and no 'error'.
  const readableStream = new ReadableStream(new MySource());
  const readable = newStreamReadableFromReadableStream(readableStream);

  readable.on('data', common.mustCall((chunk) => {
    assert.deepStrictEqual(chunk, Buffer.alloc(10));
  }));
  readable.on('end', common.mustCall());
  readable.on('close', common.mustCall());
  readable.on('error', common.mustNotCall());
}

{
  // With `encoding: 'utf8'`, a string enqueued by the source is expected to
  // arrive as the same string.
  const readableStream = new ReadableStream(new MySource('hello'));
  const readable = newStreamReadableFromReadableStream(readableStream, {
    encoding: 'utf8',
  });

  readable.on('data', common.mustCall((chunk) => {
    assert.strictEqual(chunk, 'hello');
  }));
  readable.on('end', common.mustCall());
  readable.on('close', common.mustCall());
  readable.on('error', common.mustNotCall());
}

{
  // With `objectMode: true`, the chunk is expected to arrive deep-equal to
  // the Uint8Array that the source enqueued.
  const readableStream = new ReadableStream(new MySource());
  const readable = newStreamReadableFromReadableStream(readableStream, {
    objectMode: true,
  });

  readable.on('data', common.mustCall((chunk) => {
    assert.deepStrictEqual(chunk, new Uint8Array(10));
  }));
  readable.on('end', common.mustCall());
  readable.on('close', common.mustCall());
  readable.on('error', common.mustNotCall());
}

{
  // stream.finished() must invoke its callback for an adapted stream whose
  // source enqueues one chunk in start() and a second one, plus close(),
  // from a setImmediate() callback.
  const ec = new TextEncoder();
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(ec.encode('hello'));
      setImmediate(() => {
        controller.enqueue(ec.encode('there'));
        controller.close();
      });
    },
  });
  const streamReadable = newStreamReadableFromReadableStream(readable);

  finished(streamReadable, common.mustCall());

  streamReadable.resume();
}

{
  // NOTE(review): this scope repeats the previous finished() scenario
  // verbatim. It is kept so that no coverage is removed; confirm whether a
  // different variant was intended here.
  const ec = new TextEncoder();
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(ec.encode('hello'));
      setImmediate(() => {
        controller.enqueue(ec.encode('there'));
        controller.close();
      });
    },
  });
  const streamReadable = newStreamReadableFromReadableStream(readable);

  finished(streamReadable, common.mustCall());

  streamReadable.resume();
}

{
  // stream.pipeline() from the adapted stream into a Writable: write() must
  // be called exactly twice, receiving 'hello' then 'there' (after decoding)
  // with the 'buffer' encoding, and the pipeline callback must be invoked.
  const ec = new TextEncoder();
  const dc = new TextDecoder();
  const check = ['hello', 'there'];
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(ec.encode('hello'));
      setImmediate(() => {
        controller.enqueue(ec.encode('there'));
        controller.close();
      });
    },
  });
  const writable = new Writable({
    write: common.mustCall((chunk, encoding, callback) => {
      assert.strictEqual(dc.decode(chunk), check.shift());
      assert.strictEqual(encoding, 'buffer');
      callback();
    }, 2),
  });

  const streamReadable = newStreamReadableFromReadableStream(readable);

  pipeline(streamReadable, writable, common.mustCall());

  streamReadable.resume();
}