1'use strict'; 2 3const common = require('../common'); 4const { 5 Writable, 6 Readable, 7 Transform, 8 finished, 9 Duplex, 10 PassThrough 11} = require('stream'); 12const assert = require('assert'); 13const EE = require('events'); 14const fs = require('fs'); 15const { promisify } = require('util'); 16 17{ 18 const rs = new Readable({ 19 read() {} 20 }); 21 22 finished(rs, common.mustSucceed()); 23 24 rs.push(null); 25 rs.resume(); 26} 27 28{ 29 const ws = new Writable({ 30 write(data, enc, cb) { 31 cb(); 32 } 33 }); 34 35 finished(ws, common.mustSucceed()); 36 37 ws.end(); 38} 39 40{ 41 const tr = new Transform({ 42 transform(data, enc, cb) { 43 cb(); 44 } 45 }); 46 47 let finish = false; 48 let ended = false; 49 50 tr.on('end', () => { 51 ended = true; 52 }); 53 54 tr.on('finish', () => { 55 finish = true; 56 }); 57 58 finished(tr, common.mustSucceed(() => { 59 assert(finish); 60 assert(ended); 61 })); 62 63 tr.end(); 64 tr.resume(); 65} 66 67{ 68 const rs = fs.createReadStream(__filename); 69 70 rs.resume(); 71 finished(rs, common.mustCall()); 72} 73 74{ 75 const finishedPromise = promisify(finished); 76 77 async function run() { 78 const rs = fs.createReadStream(__filename); 79 const done = common.mustCall(); 80 81 let ended = false; 82 rs.resume(); 83 rs.on('end', () => { 84 ended = true; 85 }); 86 await finishedPromise(rs); 87 assert(ended); 88 done(); 89 } 90 91 run(); 92} 93 94{ 95 const rs = fs.createReadStream('file-does-not-exist'); 96 97 finished(rs, common.expectsError({ 98 code: 'ENOENT' 99 })); 100} 101 102{ 103 const rs = new Readable(); 104 105 finished(rs, common.mustSucceed()); 106 107 rs.push(null); 108 rs.emit('close'); // Should not trigger an error 109 rs.resume(); 110} 111 112{ 113 const rs = new Readable(); 114 115 finished(rs, common.mustCall((err) => { 116 assert(err, 'premature close error'); 117 })); 118 119 rs.emit('close'); // Should trigger error 120 rs.push(null); 121 rs.resume(); 122} 123 124// Test faulty input values and options. 125{ 126 const rs = new Readable({ 127 read() {} 128 }); 129 130 assert.throws( 131 () => finished(rs, 'foo'), 132 { 133 code: 'ERR_INVALID_ARG_TYPE', 134 message: /callback/ 135 } 136 ); 137 assert.throws( 138 () => finished(rs, 'foo', () => {}), 139 { 140 code: 'ERR_INVALID_ARG_TYPE', 141 message: /options/ 142 } 143 ); 144 assert.throws( 145 () => finished(rs, {}, 'foo'), 146 { 147 code: 'ERR_INVALID_ARG_TYPE', 148 message: /callback/ 149 } 150 ); 151 152 finished(rs, null, common.mustCall()); 153 154 rs.push(null); 155 rs.resume(); 156} 157 158// Test that calling returned function removes listeners 159{ 160 const ws = new Writable({ 161 write(data, env, cb) { 162 cb(); 163 } 164 }); 165 const removeListener = finished(ws, common.mustNotCall()); 166 removeListener(); 167 ws.end(); 168} 169 170{ 171 const rs = new Readable(); 172 const removeListeners = finished(rs, common.mustNotCall()); 173 removeListeners(); 174 175 rs.emit('close'); 176 rs.push(null); 177 rs.resume(); 178} 179 180{ 181 const streamLike = new EE(); 182 streamLike.readableEnded = true; 183 streamLike.readable = true; 184 finished(streamLike, common.mustCall()); 185 streamLike.emit('close'); 186} 187 188{ 189 const writable = new Writable({ write() {} }); 190 writable.writable = false; 191 writable.destroy(); 192 finished(writable, common.mustCall((err) => { 193 assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); 194 })); 195} 196 197{ 198 const readable = new Readable(); 199 readable.readable = false; 200 readable.destroy(); 201 finished(readable, common.mustCall((err) => { 202 assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); 203 })); 204} 205 206{ 207 const w = new Writable({ 208 write(chunk, encoding, callback) { 209 setImmediate(callback); 210 } 211 }); 212 finished(w, common.mustCall((err) => { 213 assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); 214 })); 215 w.end('asd'); 216 w.destroy(); 217} 218 219function testClosed(factory) { 220 { 221 // If already destroyed but finished is cancelled in same tick 222 // don't invoke the callback, 223 224 const s = factory(); 225 s.destroy(); 226 const dispose = finished(s, common.mustNotCall()); 227 dispose(); 228 } 229 230 { 231 // If already destroyed invoked callback. 232 233 const s = factory(); 234 s.destroy(); 235 finished(s, common.mustCall()); 236 } 237 238 { 239 // Don't invoke until destroy has completed. 240 241 let destroyed = false; 242 const s = factory({ 243 destroy(err, cb) { 244 setImmediate(() => { 245 destroyed = true; 246 cb(); 247 }); 248 } 249 }); 250 s.destroy(); 251 finished(s, common.mustCall(() => { 252 assert.strictEqual(destroyed, true); 253 })); 254 } 255 256 { 257 // Invoke callback even if close is inhibited. 258 259 const s = factory({ 260 emitClose: false, 261 destroy(err, cb) { 262 cb(); 263 finished(s, common.mustCall()); 264 } 265 }); 266 s.destroy(); 267 } 268 269 { 270 // Invoke with deep async. 271 272 const s = factory({ 273 destroy(err, cb) { 274 setImmediate(() => { 275 cb(); 276 setImmediate(() => { 277 finished(s, common.mustCall()); 278 }); 279 }); 280 } 281 }); 282 s.destroy(); 283 } 284} 285 286testClosed((opts) => new Readable({ ...opts })); 287testClosed((opts) => new Writable({ write() {}, ...opts })); 288 289{ 290 const w = new Writable({ 291 write(chunk, encoding, cb) { 292 cb(); 293 }, 294 autoDestroy: false 295 }); 296 w.end('asd'); 297 process.nextTick(() => { 298 finished(w, common.mustCall()); 299 }); 300} 301 302{ 303 const w = new Writable({ 304 write(chunk, encoding, cb) { 305 cb(new Error()); 306 }, 307 autoDestroy: false 308 }); 309 w.write('asd'); 310 w.on('error', common.mustCall(() => { 311 finished(w, common.mustCall()); 312 })); 313} 314 315{ 316 const r = new Readable({ 317 autoDestroy: false 318 }); 319 r.push(null); 320 r.resume(); 321 r.on('end', common.mustCall(() => { 322 finished(r, common.mustCall()); 323 })); 324} 325 326{ 327 const rs = fs.createReadStream(__filename, { autoClose: false }); 328 rs.resume(); 329 rs.on('close', common.mustNotCall()); 330 rs.on('end', common.mustCall(() => { 331 finished(rs, common.mustCall()); 332 })); 333} 334 335{ 336 const d = new EE(); 337 d._writableState = {}; 338 d._writableState.finished = true; 339 finished(d, { readable: false, writable: true }, common.mustCall((err) => { 340 assert.strictEqual(err, undefined); 341 })); 342 d._writableState.errored = true; 343 d.emit('close'); 344} 345 346{ 347 const r = new Readable(); 348 finished(r, common.mustCall((err) => { 349 assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); 350 })); 351 r.push('asd'); 352 r.push(null); 353 r.destroy(); 354} 355 356{ 357 const d = new Duplex({ 358 final(cb) { }, // Never close writable side for test purpose 359 read() { 360 this.push(null); 361 } 362 }); 363 364 d.on('end', common.mustCall()); 365 366 finished(d, { readable: true, writable: false }, common.mustCall()); 367 368 d.end(); 369 d.resume(); 370} 371 372{ 373 const d = new Duplex({ 374 final(cb) { }, // Never close writable side for test purpose 375 read() { 376 this.push(null); 377 } 378 }); 379 380 d.on('end', common.mustCall()); 381 382 d.end(); 383 finished(d, { readable: true, writable: false }, common.mustCall()); 384 385 d.resume(); 386} 387 388{ 389 // Test for compat for e.g. fd-slicer which implements 390 // non standard destroy behavior which might not emit 391 // 'close'. 392 const r = new Readable(); 393 finished(r, common.mustCall()); 394 r.resume(); 395 r.push('asd'); 396 r.destroyed = true; 397 r.push(null); 398} 399 400{ 401 // Regression https://github.com/nodejs/node/issues/33130 402 const response = new PassThrough(); 403 404 class HelloWorld extends Duplex { 405 constructor(response) { 406 super({ 407 autoDestroy: false 408 }); 409 410 this.response = response; 411 this.readMore = false; 412 413 response.once('end', () => { 414 this.push(null); 415 }); 416 417 response.on('readable', () => { 418 if (this.readMore) { 419 this._read(); 420 } 421 }); 422 } 423 424 _read() { 425 const { response } = this; 426 427 this.readMore = true; 428 429 if (response.readableLength) { 430 this.readMore = false; 431 } 432 433 let data; 434 while ((data = response.read()) !== null) { 435 this.push(data); 436 } 437 } 438 } 439 440 const instance = new HelloWorld(response); 441 instance.setEncoding('utf8'); 442 instance.end(); 443 444 (async () => { 445 await EE.once(instance, 'finish'); 446 447 setImmediate(() => { 448 response.write('chunk 1'); 449 response.write('chunk 2'); 450 response.write('chunk 3'); 451 response.end(); 452 }); 453 454 let res = ''; 455 for await (const data of instance) { 456 res += data; 457 } 458 459 assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); 460 })().then(common.mustCall()); 461} 462 463{ 464 const p = new PassThrough(); 465 p.end(); 466 finished(p, common.mustNotCall()); 467} 468 469{ 470 const p = new PassThrough(); 471 p.end(); 472 p.on('finish', common.mustCall(() => { 473 finished(p, common.mustNotCall()); 474 })); 475} 476 477{ 478 const w = new Writable({ 479 write(chunk, encoding, callback) { 480 process.nextTick(callback); 481 } 482 }); 483 w.aborted = false; 484 w.end(); 485 let closed = false; 486 w.on('finish', () => { 487 assert.strictEqual(closed, false); 488 w.emit('aborted'); 489 }); 490 w.on('close', common.mustCall(() => { 491 closed = true; 492 })); 493 494 finished(w, common.mustCall(() => { 495 assert.strictEqual(closed, true); 496 })); 497} 498