'use strict';

const common = require('../common');
const { Readable, Transform, PassThrough, pipeline } = require('stream');
const assert = require('assert');

/**
 * Runs the sequential async-iterator scenarios for `Readable` streams.
 *
 * Each scenario lives in its own block scope so local names cannot leak into
 * the next one. The scenarios are awaited in order; the statement order inside
 * every block (when `next()` is called relative to `push()`/`destroy()`) is
 * part of what is being tested and must not be rearranged.
 *
 * @returns {Promise<void>} Resolves once every awaited scenario has passed;
 *   rejects with the first failed assertion.
 */
async function tests() {
  {
    // The object returned by Symbol.asyncIterator must have the same
    // grand-prototype as an async generator's prototype object.
    const AsyncIteratorPrototype = Object.getPrototypeOf(
      Object.getPrototypeOf(async function* () {}).prototype);
    const rs = new Readable({});
    assert.strictEqual(
      Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
      AsyncIteratorPrototype);
  }

  {
    // Mixing a manual next() with for-await on the same iterator: the loop
    // continues after the value already consumed by next().
    const readable = new Readable({ objectMode: true, read() {} });
    readable.push(0);
    readable.push(1);
    readable.push(null);

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);
    for await (const d of iter) {
      assert.strictEqual(d, 1);
    }
  }

  {
    console.log('read without for..await');
    const max = 5;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);
    const values = [];
    // Request all values up front, before anything has been pushed.
    for (let i = 0; i < max; i++) {
      values.push(iter.next());
    }
    Promise.all(values).then(common.mustCall((values) => {
      // Expect one callback per requested value (was a hard-coded 5).
      values.forEach(common.mustCall(
        (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));
    }));

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');
    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await deferred');
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);
    let values = [];
    for (let i = 0; i < 3; i++) {
      values.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    // `k` keeps counting across both batches so the second batch is checked
    // against 'hello-3' and 'hello-4'.
    let k = 0;
    const results1 = await Promise.all(values);
    results1.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 3));

    values = [];
    for (let i = 0; i < 2; i++) {
      values.push(iter.next());
    }

    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const results2 = await Promise.all(values);
    results2.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 2));

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await with errors');
    const max = 3;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);
    const values = [];
    const errors = [];
    let i;
    // The first `max` requests are satisfied by pushed data; the two extra
    // requests are still outstanding when the stream is destroyed.
    for (i = 0; i < max; i++) {
      values.push(iter.next());
    }
    for (i = 0; i < 2; i++) {
      errors.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    const resolved = await Promise.all(values);

    resolved.forEach(common.mustCall(
      (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));

    // Handlers are attached before destroy() so each outstanding request is
    // expected to reject with the destroy error.
    errors.forEach((promise) => {
      promise.catch(common.mustCall((err) => {
        assert.strictEqual(err.message, 'kaboom');
      }));
    });

    readable.destroy(new Error('kaboom'));
  }

  {
    console.log('call next() after error');
    const readable = new Readable({
      read() {}
    });
    const iterator = readable[Symbol.asyncIterator]();

    const err = new Error('kaboom');
    // Destroy with `err` itself instead of allocating a second,
    // identical-looking Error that is never referenced again.
    readable.destroy(err);
    await assert.rejects(iterator.next.bind(iterator), err);
  }

  {
    console.log('read object mode');
    const max = 42;
    let readed = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        if (++readed === max) {
          this.push(null);
        }
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    // Every chunk pushed by read() must have been seen by the loop.
    assert.strictEqual(readed, received);
  }

  {
    console.log('destroy sync');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.destroy(new Error('kaboom from read'));
      }
    });

    let err;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const k of readable) {}
    } catch (e) {
      err = e;
    }
    assert.strictEqual(err.message, 'kaboom from read');
  }

  {
    console.log('destroy async');
    const readable = new Readable({
      objectMode: true,
      read() {
        // Push exactly one chunk, then destroy on a later tick.
        if (!this.pushed) {
          this.push('hello');
          this.pushed = true;

          setImmediate(() => {
            this.destroy(new Error('kaboom'));
          });
        }
      }
    });

    let received = 0;

    let err = null;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const k of readable) {
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('destroyed by throw');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
      }
    });

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        throw new Error('kaboom');
      }
    } catch (e) {
      err = e;
    }

    // Throwing out of the loop body must surface the error and leave the
    // stream destroyed.
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(readable.destroyed, true);
  }

  {
    console.log('destroyed sync after push');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        this.destroy(new Error('kaboom'));
      }
    });

    let received = 0;

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        received++;
      }
    } catch (e) {
      err = e;
    }

    // The chunk pushed before destroy() is still delivered once.
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('push async');
    const max = 42;
    let readed = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++readed === max) {
            this.push(null);
          }
        });
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(readed, received);
  }

  {
    console.log('push binary async');
    const max = 42;
    let readed = 0;
    const readable = new Readable({
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++readed === max) {
            this.push(null);
          }
        });
      }
    });

    // Collect the same stream through a 'data' listener (attached while
    // paused) and compare it with what the async iterator yields.
    let expected = '';
    readable.setEncoding('utf8');
    readable.pause();
    readable.on('data', (chunk) => {
      expected += chunk;
    });

    let data = '';
    for await (const k of readable) {
      data += k;
    }

    assert.strictEqual(data, expected);
  }

  {
    console.log('.next() on destroyed stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    readable.destroy();

    const { done } = await readable[Symbol.asyncIterator]().next();
    assert.strictEqual(done, true);
  }

  {
    console.log('.next() on pipelined stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    const passthrough = new PassThrough();
    const err = new Error('kaboom');
    pipeline(readable, passthrough, common.mustCall((e) => {
      assert.strictEqual(e, err);
    }));
    readable.destroy(err);
    // Both the pipeline callback and the iterator must see the same error
    // instance.
    await assert.rejects(
      readable[Symbol.asyncIterator]().next(),
      (e) => {
        assert.strictEqual(e, err);
        return true;
      }
    );
  }

  {
    console.log('iterating on an ended stream completes');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
        this.push(null);
      }
    });
    // The second loop runs over the already-ended stream and must finish.
    // eslint-disable-next-line no-unused-vars
    for await (const a of r) {
    }
    // eslint-disable-next-line no-unused-vars
    for await (const b of r) {
    }
  }

  {
    console.log('destroy mid-stream does not error');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
      }
    });

    // eslint-disable-next-line no-unused-vars
    for await (const a of r) {
      r.destroy(null);
    }
  }

  {
    console.log('readable side of a transform stream pushes null');
    const transform = new Transform({
      objectMode: true,
      transform: (chunk, enc, cb) => { cb(null, chunk); }
    });
    transform.push(0);
    transform.push(1);
    process.nextTick(() => {
      transform.push(null);
    });

    // mustReach[0] guards the loop body, mustReach[1] guards loop completion.
    const mustReach = [ common.mustCall(), common.mustCall() ];

    const iter = transform[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);

    for await (const d of iter) {
      assert.strictEqual(d, 1);
      mustReach[0]();
    }
    mustReach[1]();
  }

  {
    console.log('all next promises must be resolved on end');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.push(null);
    assert.deepStrictEqual(await c, { done: true, value: undefined });
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be resolved on destroy');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.destroy();
    assert.deepStrictEqual(await c, { done: true, value: undefined });
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be resolved on destroy with error');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    const err = new Error('kaboom');
    r.destroy(err);

    // Both outstanding requests must reject with the very same error object.
    await Promise.all([(async () => {
      let e;
      try {
        await c;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, err);
    })(), (async () => {
      let e;
      try {
        await d;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, err);
    })()]);
  }
}

{
  // AsyncIterator return should end even when destroy
  // does not implement the callback API.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that only forwards the error argument.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  r.push(null);
  p.then(common.mustCall());
}


{
  // AsyncIterator return should not error with
  // premature close.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  // Emit 'close' by hand, without the stream having ended.
  r.emit('close');
  p.then(common.mustCall()).catch(common.mustNotCall());
}

{
  // AsyncIterator should finish correctly if destroyed.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  r.destroy();
  // The iterator is only created after 'close' has been emitted.
  r.on('close', () => {
    const it = r[Symbol.asyncIterator]();
    const next = it.next();
    next
      .then(common.mustCall(({ done }) => assert.strictEqual(done, true)))
      .catch(common.mustNotCall());
  });
}

// To avoid missing some tests if a promise does not resolve
tests().then(common.mustCall());