• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Writable,
6  Readable,
7  Transform,
8  finished,
9  Duplex,
10  PassThrough
11} = require('stream');
12const assert = require('assert');
13const EE = require('events');
14const fs = require('fs');
15const { promisify } = require('util');
16
17{
18  const rs = new Readable({
19    read() {}
20  });
21
22  finished(rs, common.mustSucceed());
23
24  rs.push(null);
25  rs.resume();
26}
27
28{
29  const ws = new Writable({
30    write(data, enc, cb) {
31      cb();
32    }
33  });
34
35  finished(ws, common.mustSucceed());
36
37  ws.end();
38}
39
40{
41  const tr = new Transform({
42    transform(data, enc, cb) {
43      cb();
44    }
45  });
46
47  let finish = false;
48  let ended = false;
49
50  tr.on('end', () => {
51    ended = true;
52  });
53
54  tr.on('finish', () => {
55    finish = true;
56  });
57
58  finished(tr, common.mustSucceed(() => {
59    assert(finish);
60    assert(ended);
61  }));
62
63  tr.end();
64  tr.resume();
65}
66
67{
68  const rs = fs.createReadStream(__filename);
69
70  rs.resume();
71  finished(rs, common.mustCall());
72}
73
74{
75  const finishedPromise = promisify(finished);
76
77  async function run() {
78    const rs = fs.createReadStream(__filename);
79    const done = common.mustCall();
80
81    let ended = false;
82    rs.resume();
83    rs.on('end', () => {
84      ended = true;
85    });
86    await finishedPromise(rs);
87    assert(ended);
88    done();
89  }
90
91  run();
92}
93
94{
95  const rs = fs.createReadStream('file-does-not-exist');
96
97  finished(rs, common.expectsError({
98    code: 'ENOENT'
99  }));
100}
101
102{
103  const rs = new Readable();
104
105  finished(rs, common.mustSucceed());
106
107  rs.push(null);
108  rs.emit('close'); // Should not trigger an error
109  rs.resume();
110}
111
112{
113  const rs = new Readable();
114
115  finished(rs, common.mustCall((err) => {
116    assert(err, 'premature close error');
117  }));
118
119  rs.emit('close'); // Should trigger error
120  rs.push(null);
121  rs.resume();
122}
123
124// Test faulty input values and options.
125{
126  const rs = new Readable({
127    read() {}
128  });
129
130  assert.throws(
131    () => finished(rs, 'foo'),
132    {
133      code: 'ERR_INVALID_ARG_TYPE',
134      message: /callback/
135    }
136  );
137  assert.throws(
138    () => finished(rs, 'foo', () => {}),
139    {
140      code: 'ERR_INVALID_ARG_TYPE',
141      message: /options/
142    }
143  );
144  assert.throws(
145    () => finished(rs, {}, 'foo'),
146    {
147      code: 'ERR_INVALID_ARG_TYPE',
148      message: /callback/
149    }
150  );
151
152  finished(rs, null, common.mustCall());
153
154  rs.push(null);
155  rs.resume();
156}
157
158// Test that calling returned function removes listeners
159{
160  const ws = new Writable({
161    write(data, env, cb) {
162      cb();
163    }
164  });
165  const removeListener = finished(ws, common.mustNotCall());
166  removeListener();
167  ws.end();
168}
169
170{
171  const rs = new Readable();
172  const removeListeners = finished(rs, common.mustNotCall());
173  removeListeners();
174
175  rs.emit('close');
176  rs.push(null);
177  rs.resume();
178}
179
180{
181  const streamLike = new EE();
182  streamLike.readableEnded = true;
183  streamLike.readable = true;
184  finished(streamLike, common.mustCall());
185  streamLike.emit('close');
186}
187
188{
189  const writable = new Writable({ write() {} });
190  writable.writable = false;
191  writable.destroy();
192  finished(writable, common.mustCall((err) => {
193    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
194  }));
195}
196
197{
198  const readable = new Readable();
199  readable.readable = false;
200  readable.destroy();
201  finished(readable, common.mustCall((err) => {
202    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
203  }));
204}
205
206{
207  const w = new Writable({
208    write(chunk, encoding, callback) {
209      setImmediate(callback);
210    }
211  });
212  finished(w, common.mustCall((err) => {
213    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
214  }));
215  w.end('asd');
216  w.destroy();
217}
218
219function testClosed(factory) {
220  {
221    // If already destroyed but finished is cancelled in same tick
222    // don't invoke the callback,
223
224    const s = factory();
225    s.destroy();
226    const dispose = finished(s, common.mustNotCall());
227    dispose();
228  }
229
230  {
231    // If already destroyed invoked callback.
232
233    const s = factory();
234    s.destroy();
235    finished(s, common.mustCall());
236  }
237
238  {
239    // Don't invoke until destroy has completed.
240
241    let destroyed = false;
242    const s = factory({
243      destroy(err, cb) {
244        setImmediate(() => {
245          destroyed = true;
246          cb();
247        });
248      }
249    });
250    s.destroy();
251    finished(s, common.mustCall(() => {
252      assert.strictEqual(destroyed, true);
253    }));
254  }
255
256  {
257    // Invoke callback even if close is inhibited.
258
259    const s = factory({
260      emitClose: false,
261      destroy(err, cb) {
262        cb();
263        finished(s, common.mustCall());
264      }
265    });
266    s.destroy();
267  }
268
269  {
270    // Invoke with deep async.
271
272    const s = factory({
273      destroy(err, cb) {
274        setImmediate(() => {
275          cb();
276          setImmediate(() => {
277            finished(s, common.mustCall());
278          });
279        });
280      }
281    });
282    s.destroy();
283  }
284}
285
286testClosed((opts) => new Readable({ ...opts }));
287testClosed((opts) => new Writable({ write() {}, ...opts }));
288
289{
290  const w = new Writable({
291    write(chunk, encoding, cb) {
292      cb();
293    },
294    autoDestroy: false
295  });
296  w.end('asd');
297  process.nextTick(() => {
298    finished(w, common.mustCall());
299  });
300}
301
302{
303  const w = new Writable({
304    write(chunk, encoding, cb) {
305      cb(new Error());
306    },
307    autoDestroy: false
308  });
309  w.write('asd');
310  w.on('error', common.mustCall(() => {
311    finished(w, common.mustCall());
312  }));
313}
314
315{
316  const r = new Readable({
317    autoDestroy: false
318  });
319  r.push(null);
320  r.resume();
321  r.on('end', common.mustCall(() => {
322    finished(r, common.mustCall());
323  }));
324}
325
326{
327  const rs = fs.createReadStream(__filename, { autoClose: false });
328  rs.resume();
329  rs.on('close', common.mustNotCall());
330  rs.on('end', common.mustCall(() => {
331    finished(rs, common.mustCall());
332  }));
333}
334
335{
336  const d = new EE();
337  d._writableState = {};
338  d._writableState.finished = true;
339  finished(d, { readable: false, writable: true }, common.mustCall((err) => {
340    assert.strictEqual(err, undefined);
341  }));
342  d._writableState.errored = true;
343  d.emit('close');
344}
345
346{
347  const r = new Readable();
348  finished(r, common.mustCall((err) => {
349    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
350  }));
351  r.push('asd');
352  r.push(null);
353  r.destroy();
354}
355
356{
357  const d = new Duplex({
358    final(cb) { }, // Never close writable side for test purpose
359    read() {
360      this.push(null);
361    }
362  });
363
364  d.on('end', common.mustCall());
365
366  finished(d, { readable: true, writable: false }, common.mustCall());
367
368  d.end();
369  d.resume();
370}
371
372{
373  const d = new Duplex({
374    final(cb) { }, // Never close writable side for test purpose
375    read() {
376      this.push(null);
377    }
378  });
379
380  d.on('end', common.mustCall());
381
382  d.end();
383  finished(d, { readable: true, writable: false }, common.mustCall());
384
385  d.resume();
386}
387
388{
389  // Test for compat for e.g. fd-slicer which implements
390  // non standard destroy behavior which might not emit
391  // 'close'.
392  const r = new Readable();
393  finished(r, common.mustCall());
394  r.resume();
395  r.push('asd');
396  r.destroyed = true;
397  r.push(null);
398}
399
400{
401  // Regression https://github.com/nodejs/node/issues/33130
402  const response = new PassThrough();
403
404  class HelloWorld extends Duplex {
405    constructor(response) {
406      super({
407        autoDestroy: false
408      });
409
410      this.response = response;
411      this.readMore = false;
412
413      response.once('end', () => {
414        this.push(null);
415      });
416
417      response.on('readable', () => {
418        if (this.readMore) {
419          this._read();
420        }
421      });
422    }
423
424    _read() {
425      const { response } = this;
426
427      this.readMore = true;
428
429      if (response.readableLength) {
430        this.readMore = false;
431      }
432
433      let data;
434      while ((data = response.read()) !== null) {
435        this.push(data);
436      }
437    }
438  }
439
440  const instance = new HelloWorld(response);
441  instance.setEncoding('utf8');
442  instance.end();
443
444  (async () => {
445    await EE.once(instance, 'finish');
446
447    setImmediate(() => {
448      response.write('chunk 1');
449      response.write('chunk 2');
450      response.write('chunk 3');
451      response.end();
452    });
453
454    let res = '';
455    for await (const data of instance) {
456      res += data;
457    }
458
459    assert.strictEqual(res, 'chunk 1chunk 2chunk 3');
460  })().then(common.mustCall());
461}
462
463{
464  const p = new PassThrough();
465  p.end();
466  finished(p, common.mustNotCall());
467}
468
469{
470  const p = new PassThrough();
471  p.end();
472  p.on('finish', common.mustCall(() => {
473    finished(p, common.mustNotCall());
474  }));
475}
476
477{
478  const w = new Writable({
479    write(chunk, encoding, callback) {
480      process.nextTick(callback);
481    }
482  });
483  w.aborted = false;
484  w.end();
485  let closed = false;
486  w.on('finish', () => {
487    assert.strictEqual(closed, false);
488    w.emit('aborted');
489  });
490  w.on('close', common.mustCall(() => {
491    closed = true;
492  }));
493
494  finished(w, common.mustCall(() => {
495    assert.strictEqual(closed, true);
496  }));
497}
498