'use strict';

// Scenarios exercising stream.finished(). Each scenario lives in its own
// block scope so that local names never leak into the next one.

const common = require('../common');
const {
  Writable,
  Readable,
  Transform,
  finished,
  Duplex,
  PassThrough,
  Stream,
} = require('stream');
const assert = require('assert');
const EE = require('events');
const fs = require('fs');
const { promisify } = require('util');
const http = require('http');

{
  // Readable: finished() is attached first, then end-of-stream is pushed
  // and the stream is resumed.
  const source = new Readable({
    read() {}
  });

  finished(source, common.mustSucceed());

  source.push(null);
  source.resume();
}

{
  // Writable: finished() is attached first, then the stream is ended.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });

  finished(sink, common.mustSucceed());

  sink.end();
}

{
  // Transform: both 'finish' and 'end' must have been observed by the time
  // the finished() callback runs.
  const xform = new Transform({
    transform(chunk, encoding, callback) {
      callback();
    }
  });

  let sawEnd = false;
  let sawFinish = false;

  xform.on('end', () => {
    sawEnd = true;
  });

  xform.on('finish', () => {
    sawFinish = true;
  });

  finished(xform, common.mustSucceed(() => {
    assert(sawFinish);
    assert(sawEnd);
  }));

  xform.end();
  xform.resume();
}

{
  // File read stream over this very file, resumed before finished() is
  // attached.
  const fileStream = fs.createReadStream(__filename);

  fileStream.resume();
  finished(fileStream, common.mustCall());
}

{
  // Promisified finished(): the promise must not settle before 'end' was
  // emitted on the file stream.
  const finishedAsync = promisify(finished);

  const run = async () => {
    const fileStream = fs.createReadStream(__filename);
    const done = common.mustCall();

    let sawEnd = false;
    fileStream.resume();
    fileStream.on('end', () => {
      sawEnd = true;
    });
    await finishedAsync(fileStream);
    assert(sawEnd);
    done();
  };

  run();
}

{
  // Signal that is already marked as aborted when finished() is called.
  const abortedSignal = new EventTarget();
  abortedSignal.aborted = true;

  const emptySource = Readable.from((function* () {})());
  const onDone = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  });
  finished(emptySource, { signal: abortedSignal }, onDone);
}

{
  // Abort synchronously, right after finished() was attached.
  const controller = new AbortController();
  const { signal } = controller;

  const emptySource = Readable.from((function* () {})());
  const onDone = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  });
  finished(emptySource, { signal }, onDone);

  controller.abort();
}

{
  // Abort from a 1 ms timer that is scheduled before finished() is attached.
  const controller = new AbortController();
  const { signal } = controller;

  const emptySource = Readable.from((function* () {})());
  setTimeout(() => controller.abort(), 1);
  const onDone = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  });
  finished(emptySource, { signal }, onDone);
}

{
  // The abort is only scheduled (setImmediate) from inside the generator
  // after it has yielded its single value; the callback is wrapped in
  // common.mustSucceed().
  const controller = new AbortController();
  const { signal } = controller;

  const source = Readable.from((function* () {
    yield 5;
    setImmediate(() => controller.abort());
  })());
  source.resume();
  finished(source, { signal }, common.mustSucceed());
}

{
  // Promisified finished() rejects with an AbortError when the signal is
  // aborted from a setImmediate callback.
  const finishedAsync = promisify(finished);
  const run = async () => {
    const controller = new AbortController();
    const { signal } = controller;
    const emptySource = Readable.from((function* () {})());
    setImmediate(() => controller.abort());
    await finishedAsync(emptySource, { signal });
  };

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}

{
  // Promisified finished() rejects with an AbortError when the signal is
  // already marked as aborted.
  const finishedAsync = promisify(finished);
  const run = async () => {
    const abortedSignal = new EventTarget();
    abortedSignal.aborted = true;
    const emptySource = Readable.from((function* () {})());
    await finishedAsync(emptySource, { signal: abortedSignal });
  };

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}


{
  // Opening a path that does not exist: the callback expects an ENOENT
  // error.
  const missingFile = fs.createReadStream('file-does-not-exist');

  finished(missingFile, common.expectsError({
    code: 'ENOENT'
  }));
}

{
  // 'close' is emitted by hand after end-of-stream was pushed.
  const source = new Readable();

  finished(source, common.mustSucceed());

  source.push(null);
  source.emit('close'); // Should not trigger an error
  source.resume();
}

{
  // 'close' is emitted by hand before end-of-stream is pushed.
  const source = new Readable();

  finished(source, common.mustCall((err) => {
    assert(err, 'premature close error');
  }));

  source.emit('close'); // Should trigger error
  source.push(null);
  source.resume();
}

// Test faulty input values and options.
{
  const source = new Readable({
    read() {}
  });

  // Each entry pairs an invalid call with the argument name that the error
  // message is expected to mention.
  const invalidCalls = [
    [() => finished(source, 'foo'), /callback/],
    [() => finished(source, 'foo', () => {}), /options/],
    [() => finished(source, {}, 'foo'), /callback/],
  ];
  for (const [invalidCall, message] of invalidCalls) {
    assert.throws(invalidCall, {
      code: 'ERR_INVALID_ARG_TYPE',
      message
    });
  }

  // A null options argument is accepted.
  finished(source, null, common.mustCall());

  source.push(null);
  source.resume();
}

// Test that calling returned function removes listeners
{
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });
  const cleanup = finished(sink, common.mustNotCall());
  cleanup();
  sink.end();
}

{
  // Same as above for a Readable, with a hand-emitted 'close'.
  const source = new Readable();
  const cleanup = finished(source, common.mustNotCall());
  cleanup();

  source.emit('close');
  source.push(null);
  source.resume();
}

{
  // A plain EventEmitter carrying readable-looking properties is rejected
  // with ERR_INVALID_ARG_TYPE.
  const fakeStream = new EE();
  fakeStream.readableEnded = true;
  fakeStream.readable = true;
  assert.throws(
    () => {
      finished(fakeStream, () => {});
    },
    { code: 'ERR_INVALID_ARG_TYPE' }
  );
  fakeStream.emit('close');
}

{
  // Writable flagged as not writable and destroyed before finished() is
  // attached.
  const sink = new Writable({ write() {} });
  sink.writable = false;
  sink.destroy();
  finished(sink, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  }));
}

{
  // Readable flagged as not readable and destroyed before finished() is
  // attached.
  const source = new Readable();
  source.readable = false;
  source.destroy();
  finished(source, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  }));
}

{
  // Writable destroyed right after end() while its write callback is still
  // deferred through setImmediate.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      setImmediate(callback);
    }
  });
  finished(sink, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  }));
  sink.end('asd');
  sink.destroy();
}

// Runs the "already destroyed" scenarios against streams built by `factory`,
// which is called with an optional options object and must return a stream.
function testClosed(factory) {
  {
    // Already destroyed, but finished() is cancelled again in the same
    // tick: the callback must not be invoked.

    const stream = factory();
    stream.destroy();
    const cancel = finished(stream, common.mustNotCall());
    cancel();
  }

  {
    // Already destroyed: the callback is still invoked.

    const stream = factory();
    stream.destroy();
    finished(stream, common.mustCall());
  }

  {
    // Don't invoke until destroy has completed.

    let destroyCompleted = false;
    const stream = factory({
      destroy(err, done) {
        setImmediate(() => {
          destroyCompleted = true;
          done();
        });
      }
    });
    stream.destroy();
    finished(stream, common.mustCall(() => {
      assert.strictEqual(destroyCompleted, true);
    }));
  }

  {
    // Invoke callback even if close is inhibited.

    const stream = factory({
      emitClose: false,
      destroy(err, done) {
        done();
        finished(stream, common.mustCall());
      }
    });
    stream.destroy();
  }

  {
    // Invoke with deep async.

    const stream = factory({
      destroy(err, done) {
        setImmediate(() => {
          done();
          setImmediate(() => {
            finished(stream, common.mustCall());
          });
        });
      }
    });
    stream.destroy();
  }
}

testClosed((opts) => new Readable({ ...opts }));
testClosed((opts) => new Writable({ write() {}, ...opts }));

{
  // autoDestroy disabled; finished() is attached on the next tick after
  // end().
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
    autoDestroy: false
  });
  sink.end('asd');
  process.nextTick(() => {
    finished(sink, common.mustCall());
  });
}

{
  // autoDestroy disabled; finished() is attached from the 'error' handler
  // after a failing write.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback(new Error());
    },
    autoDestroy: false
  });
  sink.write('asd');
  sink.on('error', common.mustCall(() => {
    finished(sink, common.mustCall());
  }));
}

{
  // autoDestroy disabled; finished() is attached from the 'end' handler.
  const source = new Readable({
    autoDestroy: false
  });
  source.push(null);
  source.resume();
  source.on('end', common.mustCall(() => {
    finished(source, common.mustCall());
  }));
}

{
  // File stream with autoClose disabled: 'close' must not fire, and
  // finished() is attached from the 'end' handler.
  const fileStream = fs.createReadStream(__filename, { autoClose: false });
  fileStream.resume();
  fileStream.on('close', common.mustNotCall());
  fileStream.on('end', common.mustCall(() => {
    finished(fileStream, common.mustCall());
  }));
}

{
  // EventEmitter with a hand-made _writableState that is flagged as errored
  // just before 'close' is emitted.
  const emitter = new EE();
  emitter._writableState = { finished: true };
  const onDone = common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  });
  finished(emitter, { readable: false, writable: true }, onDone);
  emitter._writableState.errored = true;
  emitter.emit('close');
}

{
  // Readable destroyed after data and end-of-stream were pushed but never
  // consumed.
  const source = new Readable();
  finished(source, common.mustCall((err) => {
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  }));
  source.push('asd');
  source.push(null);
  source.destroy();
}

{
  // Duplex watched for its readable side only; finished() is attached
  // before end().
  const duplex = new Duplex({
    final(callback) { }, // Never close writable side for test purpose
    read() {
      this.push(null);
    }
  });

  duplex.on('end', common.mustCall());

  finished(duplex, { readable: true, writable: false }, common.mustCall());

  duplex.end();
  duplex.resume();
}

{
  // Same as above, but finished() is attached after end().
  const duplex = new Duplex({
    final(callback) { }, // Never close writable side for test purpose
    read() {
      this.push(null);
    }
  });

  duplex.on('end', common.mustCall());

  duplex.end();
  finished(duplex, { readable: true, writable: false }, common.mustCall());

  duplex.resume();
}

{
  // Test for compat for e.g. fd-slicer which implements
  // non standard destroy behavior which might not emit
  // 'close'.
  const source = new Readable();
  finished(source, common.mustCall());
  source.resume();
  source.push('asd');
  source.destroyed = true;
  source.push(null);
}

{
  // Regression https://github.com/nodejs/node/issues/33130
  const response = new PassThrough();

  // Duplex whose readable side forwards whatever `response` produces.
  class HelloWorld extends Duplex {
    constructor(response) {
      super({
        autoDestroy: false
      });

      this.response = response;
      this.readMore = false;

      response.once('end', () => {
        this.push(null);
      });

      response.on('readable', () => {
        if (this.readMore) {
          this._read();
        }
      });
    }

    _read() {
      const { response } = this;

      // Ask to be called again on 'readable' only when nothing is buffered.
      this.readMore = !response.readableLength;

      // Drain everything that is currently buffered in `response`.
      for (
        let chunk = response.read();
        chunk !== null;
        chunk = response.read()
      ) {
        this.push(chunk);
      }
    }
  }

  const instance = new HelloWorld(response);
  instance.setEncoding('utf8');
  instance.end();

  (async () => {
    await EE.once(instance, 'finish');

    setImmediate(() => {
      response.write('chunk 1');
      response.write('chunk 2');
      response.write('chunk 3');
      response.end();
    });

    let received = '';
    for await (const chunk of instance) {
      received += chunk;
    }

    assert.strictEqual(received, 'chunk 1chunk 2chunk 3');
  })().then(common.mustCall());
}

{
  // PassThrough that is ended but never read from.
  const passThrough = new PassThrough();
  passThrough.end();
  finished(passThrough, common.mustNotCall());
}

{
  // Same as above, with finished() attached from the 'finish' handler.
  const passThrough = new PassThrough();
  passThrough.end();
  passThrough.on('finish', common.mustCall(() => {
    finished(passThrough, common.mustNotCall());
  }));
}

{
  // HTTP server response: finished() is attached from its 'close' handler.
  const server = http.createServer(common.mustCall((req, res) => {
    res.on('close', common.mustCall(() => {
      finished(res, common.mustCall(() => {
        server.close();
      }));
    }));
    res.end();
  }))
  .listen(0, function() {
    const { port } = this.address();
    const clientRequest = http.request({ method: 'GET', port });
    clientRequest.end().on('response', common.mustCall());
  });
}

{
  // HTTP server request destroyed by the handler: finished() is attached
  // from its 'close' handler.
  const server = http.createServer(common.mustCall((req, res) => {
    req.on('close', common.mustCall(() => {
      finished(req, common.mustCall(() => {
        server.close();
      }));
    }));
    req.destroy();
  })).listen(0, function() {
    const { port } = this.address();
    const clientRequest = http.request({ method: 'GET', port });
    clientRequest.end().on('error', common.mustCall());
  });
}

{
  // Writable carrying an `aborted` property that emits 'aborted' from its
  // 'finish' handler: the finished() callback must run after 'close'.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      process.nextTick(callback);
    }
  });
  sink.aborted = false;
  sink.end();
  let sawClose = false;
  sink.on('finish', () => {
    assert.strictEqual(sawClose, false);
    sink.emit('aborted');
  });
  sink.on('close', common.mustCall(() => {
    sawClose = true;
  }));

  finished(sink, common.mustCall(() => {
    assert.strictEqual(sawClose, true);
  }));
}

{
  // Destroyed with an error: the same error object is reported, both on the
  // first call and when finished() is called again from inside the callback.
  const sink = new Writable();
  const destroyError = new Error();
  sink.destroy(destroyError);
  assert.strictEqual(sink.errored, destroyError);
  finished(sink, common.mustCall((firstErr) => {
    assert.strictEqual(destroyError, firstErr);
    assert.strictEqual(sink.closed, true);
    finished(sink, common.mustCall((secondErr) => {
      assert.strictEqual(destroyError, secondErr);
    }));
  }));
}

{
  // Destroyed without an error: a premature close is reported, both on the
  // first call and when finished() is called again from inside the callback.
  const sink = new Writable();
  sink.destroy();
  assert.strictEqual(sink.errored, null);
  finished(sink, common.mustCall((firstErr) => {
    assert.strictEqual(sink.closed, true);
    assert.strictEqual(firstErr.code, 'ERR_STREAM_PREMATURE_CLOSE');
    finished(sink, common.mustCall((secondErr) => {
      assert.strictEqual(secondErr.code, 'ERR_STREAM_PREMATURE_CLOSE');
    }));
  }));
}

{
  // Legacy Streams do not inherit from Readable or Writable.
  // We cannot really assume anything about them, so we cannot close them
  // automatically.
  const legacy = new Stream();
  finished(legacy, common.mustNotCall());
}

{
  // This file is piped into an HTTP response; finished() is attached on both
  // the server-side response and the client-side response.
  const server = http.createServer(common.mustCall(function(req, res) {
    fs.createReadStream(__filename).pipe(res);
    finished(res, common.mustCall(function(err) {
      assert.strictEqual(err, undefined);
    }));
  })).listen(0, function() {
    const { port } = this.address();
    const onResponse = common.mustCall(function(res) {
      res.resume();
      finished(res, common.mustCall(() => {
        server.close();
      }));
    });
    http.request({ method: 'GET', port }, onResponse).end();
  });
}

{
  // Duplex watched with `readable: false`, ended with a chunk whose write
  // callback is deferred through setImmediate.
  const duplex = new Duplex({
    write(chunk, encoding, callback) {
      setImmediate(callback);
    }
  });

  duplex.end('foo');

  finished(duplex, { readable: false }, common.mustCall((err) => {
    assert(!err);
  }));
}