1'use strict'; 2 3const common = require('../common'); 4const { 5 Stream, 6 Readable, 7 Transform, 8 PassThrough, 9 pipeline 10} = require('stream'); 11const assert = require('assert'); 12const http = require('http'); 13 14async function tests() { 15 { 16 // v1 stream 17 18 const stream = new Stream(); 19 stream.destroy = common.mustCall(); 20 process.nextTick(() => { 21 stream.emit('data', 'hello'); 22 stream.emit('data', 'world'); 23 stream.emit('end'); 24 }); 25 26 let res = ''; 27 stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; 28 for await (const d of stream) { 29 res += d; 30 } 31 assert.strictEqual(res, 'helloworld'); 32 } 33 34 { 35 // v1 stream error 36 37 const stream = new Stream(); 38 stream.close = common.mustCall(); 39 process.nextTick(() => { 40 stream.emit('data', 0); 41 stream.emit('data', 1); 42 stream.emit('error', new Error('asd')); 43 }); 44 45 const iter = Readable.prototype[Symbol.asyncIterator].call(stream); 46 await iter.next() 47 .then(common.mustNotCall()) 48 .catch(common.mustCall((err) => { 49 assert.strictEqual(err.message, 'asd'); 50 })); 51 } 52 53 { 54 // Non standard stream cleanup 55 56 const readable = new Readable({ autoDestroy: false, read() {} }); 57 readable.push('asd'); 58 readable.push('asd'); 59 readable.destroy = null; 60 readable.close = common.mustCall(() => { 61 readable.emit('close'); 62 }); 63 64 await (async () => { 65 for await (const d of readable) { // eslint-disable-line no-unused-vars 66 return; 67 } 68 })(); 69 } 70 71 { 72 const readable = new Readable({ objectMode: true, read() {} }); 73 readable.push(0); 74 readable.push(1); 75 readable.push(null); 76 77 const iter = readable[Symbol.asyncIterator](); 78 assert.strictEqual((await iter.next()).value, 0); 79 for await (const d of iter) { 80 assert.strictEqual(d, 1); 81 } 82 } 83 84 { 85 console.log('read without for..await'); 86 const max = 5; 87 const readable = new Readable({ 88 objectMode: true, 89 read() {} 90 }); 91 92 const iter = readable[Symbol.asyncIterator](); 93 assert.strictEqual(iter.stream, readable); 94 const values = []; 95 for (let i = 0; i < max; i++) { 96 values.push(iter.next()); 97 } 98 Promise.all(values).then(common.mustCall((values) => { 99 values.forEach(common.mustCall( 100 (item, i) => assert.strictEqual(item.value, 'hello-' + i), 5)); 101 })); 102 103 readable.push('hello-0'); 104 readable.push('hello-1'); 105 readable.push('hello-2'); 106 readable.push('hello-3'); 107 readable.push('hello-4'); 108 readable.push(null); 109 110 const last = await iter.next(); 111 assert.strictEqual(last.done, true); 112 } 113 114 { 115 console.log('read without for..await deferred'); 116 const readable = new Readable({ 117 objectMode: true, 118 read() {} 119 }); 120 121 const iter = readable[Symbol.asyncIterator](); 122 assert.strictEqual(iter.stream, readable); 123 let values = []; 124 for (let i = 0; i < 3; i++) { 125 values.push(iter.next()); 126 } 127 128 readable.push('hello-0'); 129 readable.push('hello-1'); 130 readable.push('hello-2'); 131 132 let k = 0; 133 const results1 = await Promise.all(values); 134 results1.forEach(common.mustCall( 135 (item) => assert.strictEqual(item.value, 'hello-' + k++), 3)); 136 137 values = []; 138 for (let i = 0; i < 2; i++) { 139 values.push(iter.next()); 140 } 141 142 readable.push('hello-3'); 143 readable.push('hello-4'); 144 readable.push(null); 145 146 const results2 = await Promise.all(values); 147 results2.forEach(common.mustCall( 148 (item) => assert.strictEqual(item.value, 'hello-' + k++), 2)); 149 150 const last = await iter.next(); 151 assert.strictEqual(last.done, true); 152 } 153 154 { 155 console.log('read without for..await with errors'); 156 const max = 3; 157 const readable = new Readable({ 158 objectMode: true, 159 read() {} 160 }); 161 162 const iter = readable[Symbol.asyncIterator](); 163 assert.strictEqual(iter.stream, readable); 164 const values = []; 165 const errors = []; 166 let i; 167 for (i = 0; i < max; i++) { 168 values.push(iter.next()); 169 } 170 for (i = 0; i < 2; i++) { 171 errors.push(iter.next()); 172 } 173 174 readable.push('hello-0'); 175 readable.push('hello-1'); 176 readable.push('hello-2'); 177 178 const resolved = await Promise.all(values); 179 180 resolved.forEach(common.mustCall( 181 (item, i) => assert.strictEqual(item.value, 'hello-' + i), max)); 182 183 errors.slice(0, 1).forEach((promise) => { 184 promise.catch(common.mustCall((err) => { 185 assert.strictEqual(err.message, 'kaboom'); 186 })); 187 }); 188 189 errors.slice(1).forEach((promise) => { 190 promise.then(common.mustCall(({ done, value }) => { 191 assert.strictEqual(done, true); 192 assert.strictEqual(value, undefined); 193 })); 194 }); 195 196 readable.destroy(new Error('kaboom')); 197 } 198 199 { 200 console.log('call next() after error'); 201 const readable = new Readable({ 202 read() {} 203 }); 204 const iterator = readable[Symbol.asyncIterator](); 205 206 const err = new Error('kaboom'); 207 readable.destroy(new Error('kaboom')); 208 await assert.rejects(iterator.next.bind(iterator), err); 209 } 210 211 { 212 console.log('read object mode'); 213 const max = 42; 214 let readed = 0; 215 let received = 0; 216 const readable = new Readable({ 217 objectMode: true, 218 read() { 219 this.push('hello'); 220 if (++readed === max) { 221 this.push(null); 222 } 223 } 224 }); 225 226 for await (const k of readable) { 227 received++; 228 assert.strictEqual(k, 'hello'); 229 } 230 231 assert.strictEqual(readed, received); 232 } 233 234 { 235 console.log('destroy sync'); 236 const readable = new Readable({ 237 objectMode: true, 238 read() { 239 this.destroy(new Error('kaboom from read')); 240 } 241 }); 242 243 let err; 244 try { 245 // eslint-disable-next-line no-unused-vars 246 for await (const k of readable) {} 247 } catch (e) { 248 err = e; 249 } 250 assert.strictEqual(err.message, 'kaboom from read'); 251 } 252 253 { 254 console.log('destroy async'); 255 const readable = new Readable({ 256 objectMode: true, 257 read() { 258 if (!this.pushed) { 259 this.push('hello'); 260 this.pushed = true; 261 262 setImmediate(() => { 263 this.destroy(new Error('kaboom')); 264 }); 265 } 266 } 267 }); 268 269 let received = 0; 270 271 let err = null; 272 try { 273 // eslint-disable-next-line no-unused-vars 274 for await (const k of readable) { 275 received++; 276 } 277 } catch (e) { 278 err = e; 279 } 280 281 assert.strictEqual(err.message, 'kaboom'); 282 assert.strictEqual(received, 1); 283 } 284 285 { 286 console.log('destroyed by throw'); 287 const readable = new Readable({ 288 objectMode: true, 289 read() { 290 this.push('hello'); 291 } 292 }); 293 294 let err = null; 295 try { 296 for await (const k of readable) { 297 assert.strictEqual(k, 'hello'); 298 throw new Error('kaboom'); 299 } 300 } catch (e) { 301 err = e; 302 } 303 304 assert.strictEqual(err.message, 'kaboom'); 305 assert.strictEqual(readable.destroyed, true); 306 } 307 308 { 309 console.log('destroyed sync after push'); 310 const readable = new Readable({ 311 objectMode: true, 312 read() { 313 this.push('hello'); 314 this.destroy(new Error('kaboom')); 315 } 316 }); 317 318 let received = 0; 319 320 let err = null; 321 try { 322 for await (const k of readable) { 323 assert.strictEqual(k, 'hello'); 324 received++; 325 } 326 } catch (e) { 327 err = e; 328 } 329 330 assert.strictEqual(err.message, 'kaboom'); 331 assert.strictEqual(received, 1); 332 } 333 334 { 335 console.log('destroyed will not deadlock'); 336 const readable = new Readable(); 337 readable.destroy(); 338 process.nextTick(async () => { 339 readable.on('close', common.mustNotCall()); 340 let received = 0; 341 for await (const k of readable) { 342 // Just make linting pass. This should never run. 343 assert.strictEqual(k, 'hello'); 344 received++; 345 } 346 assert.strictEqual(received, 0); 347 }); 348 } 349 350 { 351 console.log('push async'); 352 const max = 42; 353 let readed = 0; 354 let received = 0; 355 const readable = new Readable({ 356 objectMode: true, 357 read() { 358 setImmediate(() => { 359 this.push('hello'); 360 if (++readed === max) { 361 this.push(null); 362 } 363 }); 364 } 365 }); 366 367 for await (const k of readable) { 368 received++; 369 assert.strictEqual(k, 'hello'); 370 } 371 372 assert.strictEqual(readed, received); 373 } 374 375 { 376 console.log('push binary async'); 377 const max = 42; 378 let readed = 0; 379 const readable = new Readable({ 380 read() { 381 setImmediate(() => { 382 this.push('hello'); 383 if (++readed === max) { 384 this.push(null); 385 } 386 }); 387 } 388 }); 389 390 let expected = ''; 391 readable.setEncoding('utf8'); 392 readable.pause(); 393 readable.on('data', (chunk) => { 394 expected += chunk; 395 }); 396 397 let data = ''; 398 for await (const k of readable) { 399 data += k; 400 } 401 402 assert.strictEqual(data, expected); 403 } 404 405 { 406 console.log('.next() on destroyed stream'); 407 const readable = new Readable({ 408 read() { 409 // no-op 410 } 411 }); 412 413 readable.destroy(); 414 415 const { done } = await readable[Symbol.asyncIterator]().next(); 416 assert.strictEqual(done, true); 417 } 418 419 { 420 console.log('.next() on pipelined stream'); 421 const readable = new Readable({ 422 read() { 423 // no-op 424 } 425 }); 426 427 const passthrough = new PassThrough(); 428 const err = new Error('kaboom'); 429 pipeline(readable, passthrough, common.mustCall((e) => { 430 assert.strictEqual(e, err); 431 })); 432 readable.destroy(err); 433 await assert.rejects( 434 readable[Symbol.asyncIterator]().next(), 435 (e) => { 436 assert.strictEqual(e, err); 437 return true; 438 } 439 ); 440 } 441 442 { 443 console.log('iterating on an ended stream completes'); 444 const r = new Readable({ 445 objectMode: true, 446 read() { 447 this.push('asdf'); 448 this.push('hehe'); 449 this.push(null); 450 } 451 }); 452 // eslint-disable-next-line no-unused-vars 453 for await (const a of r) { 454 } 455 // eslint-disable-next-line no-unused-vars 456 for await (const b of r) { 457 } 458 } 459 460 { 461 console.log('destroy mid-stream does not error'); 462 const r = new Readable({ 463 objectMode: true, 464 read() { 465 this.push('asdf'); 466 this.push('hehe'); 467 } 468 }); 469 470 // eslint-disable-next-line no-unused-vars 471 for await (const a of r) { 472 r.destroy(null); 473 } 474 } 475 476 { 477 console.log('readable side of a transform stream pushes null'); 478 const transform = new Transform({ 479 objectMode: true, 480 transform: (chunk, enc, cb) => { cb(null, chunk); } 481 }); 482 transform.push(0); 483 transform.push(1); 484 process.nextTick(() => { 485 transform.push(null); 486 }); 487 488 const mustReach = [ common.mustCall(), common.mustCall() ]; 489 490 const iter = transform[Symbol.asyncIterator](); 491 assert.strictEqual((await iter.next()).value, 0); 492 493 for await (const d of iter) { 494 assert.strictEqual(d, 1); 495 mustReach[0](); 496 } 497 mustReach[1](); 498 } 499 500 { 501 console.log('all next promises must be resolved on end'); 502 const r = new Readable({ 503 objectMode: true, 504 read() { 505 } 506 }); 507 508 const b = r[Symbol.asyncIterator](); 509 const c = b.next(); 510 const d = b.next(); 511 r.push(null); 512 assert.deepStrictEqual(await c, { done: true, value: undefined }); 513 assert.deepStrictEqual(await d, { done: true, value: undefined }); 514 } 515 516 { 517 console.log('all next promises must be resolved on destroy'); 518 const r = new Readable({ 519 objectMode: true, 520 read() { 521 } 522 }); 523 524 const b = r[Symbol.asyncIterator](); 525 const c = b.next(); 526 const d = b.next(); 527 r.destroy(); 528 assert.deepStrictEqual(await c, { done: true, value: undefined }); 529 assert.deepStrictEqual(await d, { done: true, value: undefined }); 530 } 531 532 { 533 console.log('all next promises must be resolved on destroy with error'); 534 const r = new Readable({ 535 objectMode: true, 536 read() { 537 } 538 }); 539 540 const b = r[Symbol.asyncIterator](); 541 const c = b.next(); 542 const d = b.next(); 543 const err = new Error('kaboom'); 544 r.destroy(err); 545 546 await Promise.all([(async () => { 547 let e; 548 try { 549 await c; 550 } catch (_e) { 551 e = _e; 552 } 553 assert.strictEqual(e, err); 554 })(), (async () => { 555 let e; 556 let x; 557 try { 558 x = await d; 559 } catch (_e) { 560 e = _e; 561 } 562 assert.strictEqual(e, undefined); 563 assert.strictEqual(x.done, true); 564 assert.strictEqual(x.value, undefined); 565 })()]); 566 } 567 568 { 569 const _err = new Error('asd'); 570 const r = new Readable({ 571 read() { 572 }, 573 destroy(err, callback) { 574 setTimeout(() => callback(_err), 1); 575 } 576 }); 577 578 r.destroy(); 579 const it = r[Symbol.asyncIterator](); 580 it.next().catch(common.mustCall((err) => { 581 assert.strictEqual(err, _err); 582 })); 583 } 584 585 { 586 // Don't destroy if no auto destroy. 587 // https://github.com/nodejs/node/issues/35116 588 589 const r = new Readable({ 590 autoDestroy: false, 591 read() { 592 this.push('asd'); 593 this.push(null); 594 } 595 }); 596 597 for await (const chunk of r) {} // eslint-disable-line no-unused-vars 598 assert.strictEqual(r.destroyed, false); 599 } 600 601 { 602 // Destroy if no auto destroy and premature break. 603 // https://github.com/nodejs/node/pull/35122/files#r485678318 604 605 const r = new Readable({ 606 autoDestroy: false, 607 read() { 608 this.push('asd'); 609 } 610 }); 611 612 for await (const chunk of r) { // eslint-disable-line no-unused-vars 613 break; 614 } 615 assert.strictEqual(r.destroyed, true); 616 } 617 618 { 619 // Don't destroy before 'end'. 620 621 const r = new Readable({ 622 read() { 623 this.push('asd'); 624 this.push(null); 625 } 626 }).on('end', () => { 627 assert.strictEqual(r.destroyed, false); 628 }); 629 630 for await (const chunk of r) {} // eslint-disable-line no-unused-vars 631 632 assert.strictEqual(r.destroyed, true); 633 } 634} 635 636{ 637 // AsyncIterator return should end even when destroy 638 // does not implement the callback API. 639 640 const r = new Readable({ 641 objectMode: true, 642 read() { 643 } 644 }); 645 646 const originalDestroy = r.destroy; 647 r.destroy = (err) => { 648 originalDestroy.call(r, err); 649 }; 650 const it = r[Symbol.asyncIterator](); 651 const p = it.return(); 652 r.push(null); 653 p.then(common.mustCall()); 654} 655 656 657{ 658 // AsyncIterator return should not error with 659 // premature close. 660 661 const r = new Readable({ 662 objectMode: true, 663 read() { 664 } 665 }); 666 667 const originalDestroy = r.destroy; 668 r.destroy = (err) => { 669 originalDestroy.call(r, err); 670 }; 671 const it = r[Symbol.asyncIterator](); 672 const p = it.return(); 673 r.emit('close'); 674 p.then(common.mustCall()).catch(common.mustNotCall()); 675} 676 677{ 678 // AsyncIterator should finish correctly if destroyed. 679 680 const r = new Readable({ 681 objectMode: true, 682 read() { 683 } 684 }); 685 686 r.destroy(); 687 r.on('close', () => { 688 const it = r[Symbol.asyncIterator](); 689 const next = it.next(); 690 next 691 .then(common.mustCall(({ done }) => assert.strictEqual(done, true))) 692 .catch(common.mustNotCall()); 693 }); 694} 695 696{ 697 let _req; 698 const server = http.createServer((request, response) => { 699 response.statusCode = 404; 700 response.write('never ends'); 701 }); 702 703 server.listen(() => { 704 _req = http.request(`http://localhost:${server.address().port}`) 705 .on('response', common.mustCall(async (res) => { 706 setTimeout(() => { 707 _req.destroy(new Error('something happened')); 708 }, 100); 709 710 res.on('aborted', () => { 711 const err = new Error(); 712 err.code = 'ECONNRESET'; 713 res.emit('error', err); 714 }); 715 716 res.on('error', common.mustCall()); 717 718 let _err; 719 try { 720 // eslint-disable-next-line no-unused-vars 721 for await (const chunk of res) {} 722 } catch (err) { 723 _err = err; 724 } 725 726 assert.strictEqual(_err.code, 'ECONNRESET'); 727 server.close(); 728 })) 729 .on('error', common.mustCall()) 730 .end(); 731 }); 732} 733 734{ 735 async function getParsedBody(request) { 736 let body = ''; 737 738 for await (const data of request) { 739 body += data; 740 } 741 742 try { 743 return JSON.parse(body); 744 } catch { 745 return {}; 746 } 747 } 748 749 const str = JSON.stringify({ asd: true }); 750 const server = http.createServer(async (request, response) => { 751 const body = await getParsedBody(request); 752 response.statusCode = 200; 753 assert.strictEqual(JSON.stringify(body), str); 754 response.end(JSON.stringify(body)); 755 }).listen(() => { 756 http 757 .request({ 758 method: 'POST', 759 hostname: 'localhost', 760 port: server.address().port, 761 }) 762 .end(str) 763 .on('response', async (res) => { 764 let body = ''; 765 for await (const chunk of res) { 766 body += chunk; 767 } 768 assert.strictEqual(body, str); 769 server.close(); 770 }); 771 }); 772} 773 774// To avoid missing some tests if a promise does not resolve 775tests().then(common.mustCall()); 776