1// Flags: --no-warnings --expose-internals 2'use strict'; 3 4const common = require('../common'); 5 6const assert = require('assert'); 7 8const { 9 newReadableStreamFromStreamReadable, 10} = require('internal/webstreams/adapters'); 11 12const { 13 Duplex, 14 Readable, 15} = require('stream'); 16 17const { 18 kState, 19} = require('internal/webstreams/util'); 20 21{ 22 // Canceling the readableStream closes the readable. 23 const readable = new Readable({ 24 read() { 25 readable.push('hello'); 26 readable.push(null); 27 } 28 }); 29 30 readable.on('close', common.mustCall()); 31 readable.on('end', common.mustNotCall()); 32 readable.on('pause', common.mustCall()); 33 readable.on('resume', common.mustNotCall()); 34 readable.on('error', common.mustCall((error) => { 35 assert.strictEqual(error.code, 'ABORT_ERR'); 36 })); 37 38 const readableStream = newReadableStreamFromStreamReadable(readable); 39 40 readableStream.cancel().then(common.mustCall()); 41} 42 43{ 44 // Prematurely destroying the stream.Readable without an error 45 // closes the ReadableStream with a premature close error but does 46 // not error the readable. 47 48 const readable = new Readable({ 49 read() { 50 readable.push('hello'); 51 readable.push(null); 52 } 53 }); 54 55 const readableStream = newReadableStreamFromStreamReadable(readable); 56 57 assert(!readableStream.locked); 58 59 const reader = readableStream.getReader(); 60 61 assert.rejects(reader.closed, { 62 code: 'ABORT_ERR', 63 }); 64 65 readable.on('end', common.mustNotCall()); 66 readable.on('error', common.mustNotCall()); 67 68 readable.on('close', common.mustCall(() => { 69 assert.strictEqual(readableStream[kState].state, 'errored'); 70 })); 71 72 readable.destroy(); 73} 74 75{ 76 // Ending the readable without an error just closes the 77 // readableStream without an error. 78 const readable = new Readable({ 79 read() { 80 readable.push('hello'); 81 readable.push(null); 82 } 83 }); 84 85 const readableStream = newReadableStreamFromStreamReadable(readable); 86 87 assert(!readableStream.locked); 88 89 const reader = readableStream.getReader(); 90 91 reader.closed.then(common.mustCall()); 92 93 readable.on('end', common.mustCall()); 94 readable.on('error', common.mustNotCall()); 95 96 readable.on('close', common.mustCall(() => { 97 assert.strictEqual(readableStream[kState].state, 'closed'); 98 })); 99 100 readable.push(null); 101} 102 103{ 104 // Destroying the readable with an error should error the readableStream 105 const error = new Error('boom'); 106 const readable = new Readable({ 107 read() { 108 readable.push('hello'); 109 readable.push(null); 110 } 111 }); 112 113 const readableStream = newReadableStreamFromStreamReadable(readable); 114 115 assert(!readableStream.locked); 116 117 const reader = readableStream.getReader(); 118 119 assert.rejects(reader.closed, error); 120 121 readable.on('end', common.mustNotCall()); 122 readable.on('error', common.mustCall((reason) => { 123 assert.strictEqual(reason, error); 124 })); 125 126 readable.on('close', common.mustCall(() => { 127 assert.strictEqual(readableStream[kState].state, 'errored'); 128 })); 129 130 readable.destroy(error); 131} 132 133{ 134 const readable = new Readable({ 135 encoding: 'utf8', 136 read() { 137 readable.push('hello'); 138 readable.push(null); 139 } 140 }); 141 142 const readableStream = newReadableStreamFromStreamReadable(readable); 143 const reader = readableStream.getReader(); 144 145 readable.on('data', common.mustCall()); 146 readable.on('end', common.mustCall()); 147 readable.on('close', common.mustCall()); 148 149 (async () => { 150 assert.deepStrictEqual( 151 await reader.read(), 152 { value: 'hello', done: false }); 153 assert.deepStrictEqual( 154 await reader.read(), 155 { value: undefined, done: true }); 156 157 })().then(common.mustCall()); 158} 159 160{ 161 const data = {}; 162 const readable = new Readable({ 163 objectMode: true, 164 read() { 165 readable.push(data); 166 readable.push(null); 167 } 168 }); 169 170 assert(readable.readableObjectMode); 171 172 const readableStream = newReadableStreamFromStreamReadable(readable); 173 const reader = readableStream.getReader(); 174 175 readable.on('data', common.mustCall()); 176 readable.on('end', common.mustCall()); 177 readable.on('close', common.mustCall()); 178 179 (async () => { 180 assert.deepStrictEqual( 181 await reader.read(), 182 { value: data, done: false }); 183 assert.deepStrictEqual( 184 await reader.read(), 185 { value: undefined, done: true }); 186 187 })().then(common.mustCall()); 188} 189 190{ 191 const readable = new Readable(); 192 readable.destroy(); 193 const readableStream = newReadableStreamFromStreamReadable(readable); 194 const reader = readableStream.getReader(); 195 reader.closed.then(common.mustCall()); 196} 197 198{ 199 const duplex = new Duplex({ readable: false }); 200 duplex.destroy(); 201 const readableStream = newReadableStreamFromStreamReadable(duplex); 202 const reader = readableStream.getReader(); 203 reader.closed.then(common.mustCall()); 204} 205