• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Writable,
6  Readable,
7  Transform,
8  finished,
9  Duplex,
10  PassThrough,
11  Stream,
12} = require('stream');
13const assert = require('assert');
14const EE = require('events');
15const fs = require('fs');
16const { promisify } = require('util');
17const http = require('http');
18
19{
20  const rs = new Readable({
21    read() {}
22  });
23
24  finished(rs, common.mustSucceed());
25
26  rs.push(null);
27  rs.resume();
28}
29
30{
31  const ws = new Writable({
32    write(data, enc, cb) {
33      cb();
34    }
35  });
36
37  finished(ws, common.mustSucceed());
38
39  ws.end();
40}
41
42{
43  const tr = new Transform({
44    transform(data, enc, cb) {
45      cb();
46    }
47  });
48
49  let finish = false;
50  let ended = false;
51
52  tr.on('end', () => {
53    ended = true;
54  });
55
56  tr.on('finish', () => {
57    finish = true;
58  });
59
60  finished(tr, common.mustSucceed(() => {
61    assert(finish);
62    assert(ended);
63  }));
64
65  tr.end();
66  tr.resume();
67}
68
69{
70  const rs = fs.createReadStream(__filename);
71
72  rs.resume();
73  finished(rs, common.mustCall());
74}
75
76{
77  const finishedPromise = promisify(finished);
78
79  async function run() {
80    const rs = fs.createReadStream(__filename);
81    const done = common.mustCall();
82
83    let ended = false;
84    rs.resume();
85    rs.on('end', () => {
86      ended = true;
87    });
88    await finishedPromise(rs);
89    assert(ended);
90    done();
91  }
92
93  run();
94}
95
96{
97  // Check pre-cancelled
98  const signal = new EventTarget();
99  signal.aborted = true;
100
101  const rs = Readable.from((function* () {})());
102  finished(rs, { signal }, common.mustCall((err) => {
103    assert.strictEqual(err.name, 'AbortError');
104  }));
105}
106
107{
108  // Check cancelled before the stream ends sync.
109  const ac = new AbortController();
110  const { signal } = ac;
111
112  const rs = Readable.from((function* () {})());
113  finished(rs, { signal }, common.mustCall((err) => {
114    assert.strictEqual(err.name, 'AbortError');
115  }));
116
117  ac.abort();
118}
119
120{
121  // Check cancelled before the stream ends async.
122  const ac = new AbortController();
123  const { signal } = ac;
124
125  const rs = Readable.from((function* () {})());
126  setTimeout(() => ac.abort(), 1);
127  finished(rs, { signal }, common.mustCall((err) => {
128    assert.strictEqual(err.name, 'AbortError');
129  }));
130}
131
132{
133  // Check cancelled after doesn't throw.
134  const ac = new AbortController();
135  const { signal } = ac;
136
137  const rs = Readable.from((function* () {
138    yield 5;
139    setImmediate(() => ac.abort());
140  })());
141  rs.resume();
142  finished(rs, { signal }, common.mustSucceed());
143}
144
145{
146  // Promisified abort works
147  const finishedPromise = promisify(finished);
148  async function run() {
149    const ac = new AbortController();
150    const { signal } = ac;
151    const rs = Readable.from((function* () {})());
152    setImmediate(() => ac.abort());
153    await finishedPromise(rs, { signal });
154  }
155
156  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
157}
158
159{
160  // Promisified pre-aborted works
161  const finishedPromise = promisify(finished);
162  async function run() {
163    const signal = new EventTarget();
164    signal.aborted = true;
165    const rs = Readable.from((function* () {})());
166    await finishedPromise(rs, { signal });
167  }
168
169  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
170}
171
172
173{
174  const rs = fs.createReadStream('file-does-not-exist');
175
176  finished(rs, common.expectsError({
177    code: 'ENOENT'
178  }));
179}
180
181{
182  const rs = new Readable();
183
184  finished(rs, common.mustSucceed());
185
186  rs.push(null);
187  rs.emit('close'); // Should not trigger an error
188  rs.resume();
189}
190
191{
192  const rs = new Readable();
193
194  finished(rs, common.mustCall((err) => {
195    assert(err, 'premature close error');
196  }));
197
198  rs.emit('close'); // Should trigger error
199  rs.push(null);
200  rs.resume();
201}
202
203// Test faulty input values and options.
204{
205  const rs = new Readable({
206    read() {}
207  });
208
209  assert.throws(
210    () => finished(rs, 'foo'),
211    {
212      code: 'ERR_INVALID_ARG_TYPE',
213      message: /callback/
214    }
215  );
216  assert.throws(
217    () => finished(rs, 'foo', () => {}),
218    {
219      code: 'ERR_INVALID_ARG_TYPE',
220      message: /options/
221    }
222  );
223  assert.throws(
224    () => finished(rs, {}, 'foo'),
225    {
226      code: 'ERR_INVALID_ARG_TYPE',
227      message: /callback/
228    }
229  );
230
231  finished(rs, null, common.mustCall());
232
233  rs.push(null);
234  rs.resume();
235}
236
237// Test that calling returned function removes listeners
238{
239  const ws = new Writable({
240    write(data, env, cb) {
241      cb();
242    }
243  });
244  const removeListener = finished(ws, common.mustNotCall());
245  removeListener();
246  ws.end();
247}
248
249{
250  const rs = new Readable();
251  const removeListeners = finished(rs, common.mustNotCall());
252  removeListeners();
253
254  rs.emit('close');
255  rs.push(null);
256  rs.resume();
257}
258
259{
260  const streamLike = new EE();
261  streamLike.readableEnded = true;
262  streamLike.readable = true;
263  assert.throws(
264    () => {
265      finished(streamLike, () => {});
266    },
267    { code: 'ERR_INVALID_ARG_TYPE' }
268  );
269  streamLike.emit('close');
270}
271
272{
273  const writable = new Writable({ write() {} });
274  writable.writable = false;
275  writable.destroy();
276  finished(writable, common.mustCall((err) => {
277    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
278  }));
279}
280
281{
282  const readable = new Readable();
283  readable.readable = false;
284  readable.destroy();
285  finished(readable, common.mustCall((err) => {
286    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
287  }));
288}
289
290{
291  const w = new Writable({
292    write(chunk, encoding, callback) {
293      setImmediate(callback);
294    }
295  });
296  finished(w, common.mustCall((err) => {
297    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
298  }));
299  w.end('asd');
300  w.destroy();
301}
302
303function testClosed(factory) {
304  {
305    // If already destroyed but finished is cancelled in same tick
306    // don't invoke the callback,
307
308    const s = factory();
309    s.destroy();
310    const dispose = finished(s, common.mustNotCall());
311    dispose();
312  }
313
314  {
315    // If already destroyed invoked callback.
316
317    const s = factory();
318    s.destroy();
319    finished(s, common.mustCall());
320  }
321
322  {
323    // Don't invoke until destroy has completed.
324
325    let destroyed = false;
326    const s = factory({
327      destroy(err, cb) {
328        setImmediate(() => {
329          destroyed = true;
330          cb();
331        });
332      }
333    });
334    s.destroy();
335    finished(s, common.mustCall(() => {
336      assert.strictEqual(destroyed, true);
337    }));
338  }
339
340  {
341    // Invoke callback even if close is inhibited.
342
343    const s = factory({
344      emitClose: false,
345      destroy(err, cb) {
346        cb();
347        finished(s, common.mustCall());
348      }
349    });
350    s.destroy();
351  }
352
353  {
354    // Invoke with deep async.
355
356    const s = factory({
357      destroy(err, cb) {
358        setImmediate(() => {
359          cb();
360          setImmediate(() => {
361            finished(s, common.mustCall());
362          });
363        });
364      }
365    });
366    s.destroy();
367  }
368}
369
370testClosed((opts) => new Readable({ ...opts }));
371testClosed((opts) => new Writable({ write() {}, ...opts }));
372
373{
374  const w = new Writable({
375    write(chunk, encoding, cb) {
376      cb();
377    },
378    autoDestroy: false
379  });
380  w.end('asd');
381  process.nextTick(() => {
382    finished(w, common.mustCall());
383  });
384}
385
386{
387  const w = new Writable({
388    write(chunk, encoding, cb) {
389      cb(new Error());
390    },
391    autoDestroy: false
392  });
393  w.write('asd');
394  w.on('error', common.mustCall(() => {
395    finished(w, common.mustCall());
396  }));
397}
398
399{
400  const r = new Readable({
401    autoDestroy: false
402  });
403  r.push(null);
404  r.resume();
405  r.on('end', common.mustCall(() => {
406    finished(r, common.mustCall());
407  }));
408}
409
410{
411  const rs = fs.createReadStream(__filename, { autoClose: false });
412  rs.resume();
413  rs.on('close', common.mustNotCall());
414  rs.on('end', common.mustCall(() => {
415    finished(rs, common.mustCall());
416  }));
417}
418
419{
420  const d = new EE();
421  d._writableState = {};
422  d._writableState.finished = true;
423  finished(d, { readable: false, writable: true }, common.mustCall((err) => {
424    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
425  }));
426  d._writableState.errored = true;
427  d.emit('close');
428}
429
430{
431  const r = new Readable();
432  finished(r, common.mustCall((err) => {
433    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
434  }));
435  r.push('asd');
436  r.push(null);
437  r.destroy();
438}
439
440{
441  const d = new Duplex({
442    final(cb) { }, // Never close writable side for test purpose
443    read() {
444      this.push(null);
445    }
446  });
447
448  d.on('end', common.mustCall());
449
450  finished(d, { readable: true, writable: false }, common.mustCall());
451
452  d.end();
453  d.resume();
454}
455
456{
457  const d = new Duplex({
458    final(cb) { }, // Never close writable side for test purpose
459    read() {
460      this.push(null);
461    }
462  });
463
464  d.on('end', common.mustCall());
465
466  d.end();
467  finished(d, { readable: true, writable: false }, common.mustCall());
468
469  d.resume();
470}
471
472{
473  // Test for compat for e.g. fd-slicer which implements
474  // non standard destroy behavior which might not emit
475  // 'close'.
476  const r = new Readable();
477  finished(r, common.mustCall());
478  r.resume();
479  r.push('asd');
480  r.destroyed = true;
481  r.push(null);
482}
483
484{
485  // Regression https://github.com/nodejs/node/issues/33130
486  const response = new PassThrough();
487
488  class HelloWorld extends Duplex {
489    constructor(response) {
490      super({
491        autoDestroy: false
492      });
493
494      this.response = response;
495      this.readMore = false;
496
497      response.once('end', () => {
498        this.push(null);
499      });
500
501      response.on('readable', () => {
502        if (this.readMore) {
503          this._read();
504        }
505      });
506    }
507
508    _read() {
509      const { response } = this;
510
511      this.readMore = true;
512
513      if (response.readableLength) {
514        this.readMore = false;
515      }
516
517      let data;
518      while ((data = response.read()) !== null) {
519        this.push(data);
520      }
521    }
522  }
523
524  const instance = new HelloWorld(response);
525  instance.setEncoding('utf8');
526  instance.end();
527
528  (async () => {
529    await EE.once(instance, 'finish');
530
531    setImmediate(() => {
532      response.write('chunk 1');
533      response.write('chunk 2');
534      response.write('chunk 3');
535      response.end();
536    });
537
538    let res = '';
539    for await (const data of instance) {
540      res += data;
541    }
542
543    assert.strictEqual(res, 'chunk 1chunk 2chunk 3');
544  })().then(common.mustCall());
545}
546
547{
548  const p = new PassThrough();
549  p.end();
550  finished(p, common.mustNotCall());
551}
552
553{
554  const p = new PassThrough();
555  p.end();
556  p.on('finish', common.mustCall(() => {
557    finished(p, common.mustNotCall());
558  }));
559}
560
561{
562  const server = http.createServer(common.mustCall((req, res) => {
563    res.on('close', common.mustCall(() => {
564      finished(res, common.mustCall(() => {
565        server.close();
566      }));
567    }));
568    res.end();
569  }))
570  .listen(0, function() {
571    http.request({
572      method: 'GET',
573      port: this.address().port
574    }).end()
575      .on('response', common.mustCall());
576  });
577}
578
579{
580  const server = http.createServer(common.mustCall((req, res) => {
581    req.on('close', common.mustCall(() => {
582      finished(req, common.mustCall(() => {
583        server.close();
584      }));
585    }));
586    req.destroy();
587  })).listen(0, function() {
588    http.request({
589      method: 'GET',
590      port: this.address().port
591    }).end().on('error', common.mustCall());
592  });
593}
594
595{
596  const w = new Writable({
597    write(chunk, encoding, callback) {
598      process.nextTick(callback);
599    }
600  });
601  w.aborted = false;
602  w.end();
603  let closed = false;
604  w.on('finish', () => {
605    assert.strictEqual(closed, false);
606    w.emit('aborted');
607  });
608  w.on('close', common.mustCall(() => {
609    closed = true;
610  }));
611
612  finished(w, common.mustCall(() => {
613    assert.strictEqual(closed, true);
614  }));
615}
616
617{
618  const w = new Writable();
619  const _err = new Error();
620  w.destroy(_err);
621  assert.strictEqual(w.errored, _err);
622  finished(w, common.mustCall((err) => {
623    assert.strictEqual(_err, err);
624    assert.strictEqual(w.closed, true);
625    finished(w, common.mustCall((err) => {
626      assert.strictEqual(_err, err);
627    }));
628  }));
629}
630
631{
632  const w = new Writable();
633  w.destroy();
634  assert.strictEqual(w.errored, null);
635  finished(w, common.mustCall((err) => {
636    assert.strictEqual(w.closed, true);
637    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
638    finished(w, common.mustCall((err) => {
639      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
640    }));
641  }));
642}
643
644{
645  // Legacy Streams do not inherit from Readable or Writable.
646  // We cannot really assume anything about them, so we cannot close them
647  // automatically.
648  const s = new Stream();
649  finished(s, common.mustNotCall());
650}
651
652{
653  const server = http.createServer(common.mustCall(function(req, res) {
654    fs.createReadStream(__filename).pipe(res);
655    finished(res, common.mustCall(function(err) {
656      assert.strictEqual(err, undefined);
657    }));
658  })).listen(0, function() {
659    http.request(
660      { method: 'GET', port: this.address().port },
661      common.mustCall(function(res) {
662        res.resume();
663        finished(res, common.mustCall(() => {
664          server.close();
665        }));
666      })
667    ).end();
668  });
669}
670
671{
672  const stream = new Duplex({
673    write(chunk, enc, cb) {
674      setImmediate(cb);
675    }
676  });
677
678  stream.end('foo');
679
680  finished(stream, { readable: false }, common.mustCall((err) => {
681    assert(!err);
682  }));
683}
684