1// Flags: --expose-internals --no-warnings 2'use strict'; 3 4const common = require('../common'); 5const assert = require('assert'); 6 7const { 8 ReadableStream, 9 ReadableByteStreamController, 10 ReadableStreamDefaultReader, 11 ReadableStreamBYOBReader, 12 ReadableStreamBYOBRequest, 13} = require('stream/web'); 14 15const { 16 kState, 17} = require('internal/webstreams/util'); 18 19const { 20 open, 21} = require('fs/promises'); 22 23const { 24 readFileSync, 25} = require('fs'); 26 27const { 28 Buffer, 29} = require('buffer'); 30 31const { 32 inspect, 33} = require('util'); 34 35{ 36 const r = new ReadableStream({ 37 type: 'bytes', 38 }); 39 40 assert(r[kState].controller instanceof ReadableByteStreamController); 41 42 assert.strictEqual(typeof r.locked, 'boolean'); 43 assert.strictEqual(typeof r.cancel, 'function'); 44 assert.strictEqual(typeof r.getReader, 'function'); 45 assert.strictEqual(typeof r.pipeThrough, 'function'); 46 assert.strictEqual(typeof r.pipeTo, 'function'); 47 assert.strictEqual(typeof r.tee, 'function'); 48 49 ['', null, 'asdf'].forEach((mode) => { 50 assert.throws(() => r.getReader({ mode }), { 51 code: 'ERR_INVALID_ARG_VALUE', 52 }); 53 }); 54 55 [1, 'asdf'].forEach((options) => { 56 assert.throws(() => r.getReader(options), { 57 code: 'ERR_INVALID_ARG_TYPE', 58 }); 59 }); 60 61 assert(!r.locked); 62 const defaultReader = r.getReader(); 63 assert(r.locked); 64 assert(defaultReader instanceof ReadableStreamDefaultReader); 65 defaultReader.releaseLock(); 66 const byobReader = r.getReader({ mode: 'byob' }); 67 assert(byobReader instanceof ReadableStreamBYOBReader); 68 assert.match( 69 inspect(byobReader, { depth: 0 }), 70 /ReadableStreamBYOBReader/); 71} 72 73class Source { 74 constructor() { 75 this.controllerClosed = false; 76 } 77 78 async start(controller) { 79 this.file = await open(__filename); 80 this.controller = controller; 81 } 82 83 async pull(controller) { 84 const byobRequest = controller.byobRequest; 85 assert.match(inspect(byobRequest), /ReadableStreamBYOBRequest/); 86 87 const view = byobRequest.view; 88 const { 89 bytesRead, 90 } = await this.file.read({ 91 buffer: view, 92 offset: view.byteOffset, 93 length: view.byteLength 94 }); 95 96 if (bytesRead === 0) { 97 await this.file.close(); 98 this.controller.close(); 99 } 100 101 assert.throws(() => byobRequest.respondWithNewView({}), { 102 code: 'ERR_INVALID_ARG_TYPE', 103 }); 104 105 byobRequest.respond(bytesRead); 106 107 assert.throws(() => byobRequest.respond(bytesRead), { 108 code: 'ERR_INVALID_STATE', 109 }); 110 assert.throws(() => byobRequest.respondWithNewView(view), { 111 code: 'ERR_INVALID_STATE', 112 }); 113 } 114 115 get type() { return 'bytes'; } 116 117 get autoAllocateChunkSize() { return 1024; } 118} 119 120{ 121 const stream = new ReadableStream(new Source()); 122 assert(stream[kState].controller instanceof ReadableByteStreamController); 123 124 async function read(stream) { 125 const reader = stream.getReader({ mode: 'byob' }); 126 127 const chunks = []; 128 let result; 129 do { 130 result = await reader.read(Buffer.alloc(100)); 131 if (result.value !== undefined) 132 chunks.push(Buffer.from(result.value)); 133 } while (!result.done); 134 135 return Buffer.concat(chunks); 136 } 137 138 read(stream).then(common.mustCall((data) => { 139 const check = readFileSync(__filename); 140 assert.deepStrictEqual(check, data); 141 })); 142} 143 144{ 145 const stream = new ReadableStream(new Source()); 146 assert(stream[kState].controller instanceof ReadableByteStreamController); 147 148 async function read(stream) { 149 const chunks = []; 150 for await (const chunk of stream) 151 chunks.push(chunk); 152 153 return Buffer.concat(chunks); 154 } 155 156 read(stream).then(common.mustCall((data) => { 157 const check = readFileSync(__filename); 158 assert.deepStrictEqual(check, data); 159 })); 160} 161 162{ 163 const stream = new ReadableStream(new Source()); 164 assert(stream[kState].controller instanceof ReadableByteStreamController); 165 166 async function read(stream) { 167 // eslint-disable-next-line no-unused-vars 168 for await (const _ of stream) 169 break; 170 } 171 172 read(stream).then(common.mustCall()); 173} 174 175{ 176 const stream = new ReadableStream(new Source()); 177 assert(stream[kState].controller instanceof ReadableByteStreamController); 178 179 const error = new Error('boom'); 180 181 async function read(stream) { 182 // eslint-disable-next-line no-unused-vars 183 for await (const _ of stream) 184 throw error; 185 } 186 187 assert.rejects(read(stream), error); 188} 189 190{ 191 assert.throws(() => { 192 Reflect.get(ReadableStreamBYOBRequest.prototype, 'view', {}); 193 }, { 194 code: 'ERR_INVALID_THIS', 195 }); 196 197 assert.throws(() => ReadableStreamBYOBRequest.prototype.respond.call({}), { 198 code: 'ERR_INVALID_THIS', 199 }); 200 201 assert.throws(() => { 202 ReadableStreamBYOBRequest.prototype.respondWithNewView.call({}); 203 }, { 204 code: 'ERR_INVALID_THIS', 205 }); 206} 207 208{ 209 const readable = new ReadableStream({ type: 'bytes' }); 210 const reader = readable.getReader({ mode: 'byob' }); 211 reader.releaseLock(); 212 reader.releaseLock(); 213 assert.rejects(reader.read(new Uint8Array(10)), { 214 code: 'ERR_INVALID_STATE', 215 }); 216 assert.rejects(reader.cancel(), { 217 code: 'ERR_INVALID_STATE', 218 }); 219} 220 221{ 222 let controller; 223 new ReadableStream({ 224 type: 'bytes', 225 start(c) { controller = c; } 226 }); 227 assert.throws(() => controller.enqueue(1), { 228 code: 'ERR_INVALID_ARG_TYPE', 229 }); 230 controller.close(); 231 assert.throws(() => controller.enqueue(new Uint8Array(10)), { 232 code: 'ERR_INVALID_STATE', 233 }); 234 assert.throws(() => controller.close(), { 235 code: 'ERR_INVALID_STATE', 236 }); 237} 238 239{ 240 let controller; 241 new ReadableStream({ 242 type: 'bytes', 243 start(c) { controller = c; } 244 }); 245 controller.enqueue(new Uint8Array(10)); 246 controller.close(); 247 assert.throws(() => controller.enqueue(new Uint8Array(10)), { 248 code: 'ERR_INVALID_STATE', 249 }); 250} 251 252{ 253 const stream = new ReadableStream({ 254 type: 'bytes', 255 pull(c) { 256 const v = new Uint8Array(c.byobRequest.view.buffer, 0, 3); 257 v.set([20, 21, 22]); 258 c.byobRequest.respondWithNewView(v); 259 }, 260 }); 261 const buffer = new ArrayBuffer(10); 262 const view = new Uint8Array(buffer, 0, 3); 263 view.set([10, 11, 12]); 264 const reader = stream.getReader({ mode: 'byob' }); 265 reader.read(view); 266} 267 268{ 269 const stream = new ReadableStream({ 270 type: 'bytes', 271 autoAllocateChunkSize: 10, 272 pull(c) { 273 const v = new Uint8Array(c.byobRequest.view.buffer, 0, 3); 274 v.set([20, 21, 22]); 275 c.byobRequest.respondWithNewView(v); 276 }, 277 }); 278 const reader = stream.getReader(); 279 reader.read(); 280} 281