• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Stream,
6  Writable,
7  Readable,
8  Transform,
9  pipeline,
10  PassThrough,
11  Duplex
12} = require('stream');
13const assert = require('assert');
14const http = require('http');
15const { promisify } = require('util');
16const net = require('net');
17
18{
19  let finished = false;
20  const processed = [];
21  const expected = [
22    Buffer.from('a'),
23    Buffer.from('b'),
24    Buffer.from('c'),
25  ];
26
27  const read = new Readable({
28    read() {}
29  });
30
31  const write = new Writable({
32    write(data, enc, cb) {
33      processed.push(data);
34      cb();
35    }
36  });
37
38  write.on('finish', () => {
39    finished = true;
40  });
41
42  for (let i = 0; i < expected.length; i++) {
43    read.push(expected[i]);
44  }
45  read.push(null);
46
47  pipeline(read, write, common.mustSucceed(() => {
48    assert.ok(finished);
49    assert.deepStrictEqual(processed, expected);
50  }));
51}
52
53{
54  const read = new Readable({
55    read() {}
56  });
57
58  assert.throws(() => {
59    pipeline(read, () => {});
60  }, /ERR_MISSING_ARGS/);
61  assert.throws(() => {
62    pipeline(() => {});
63  }, /ERR_MISSING_ARGS/);
64  assert.throws(() => {
65    pipeline();
66  }, /ERR_INVALID_CALLBACK/);
67}
68
69{
70  const read = new Readable({
71    read() {}
72  });
73
74  const write = new Writable({
75    write(data, enc, cb) {
76      cb();
77    }
78  });
79
80  read.push('data');
81  setImmediate(() => read.destroy());
82
83  pipeline(read, write, common.mustCall((err) => {
84    assert.ok(err, 'should have an error');
85  }));
86}
87
88{
89  const read = new Readable({
90    read() {}
91  });
92
93  const write = new Writable({
94    write(data, enc, cb) {
95      cb();
96    }
97  });
98
99  read.push('data');
100  setImmediate(() => read.destroy(new Error('kaboom')));
101
102  const dst = pipeline(read, write, common.mustCall((err) => {
103    assert.deepStrictEqual(err, new Error('kaboom'));
104  }));
105
106  assert.strictEqual(dst, write);
107}
108
109{
110  const read = new Readable({
111    read() {}
112  });
113
114  const transform = new Transform({
115    transform(data, enc, cb) {
116      cb(new Error('kaboom'));
117    }
118  });
119
120  const write = new Writable({
121    write(data, enc, cb) {
122      cb();
123    }
124  });
125
126  read.on('close', common.mustCall());
127  transform.on('close', common.mustCall());
128  write.on('close', common.mustCall());
129
130  [read, transform, write].forEach((stream) => {
131    stream.on('error', common.mustCall((err) => {
132      assert.deepStrictEqual(err, new Error('kaboom'));
133    }));
134  });
135
136  const dst = pipeline(read, transform, write, common.mustCall((err) => {
137    assert.deepStrictEqual(err, new Error('kaboom'));
138  }));
139
140  assert.strictEqual(dst, write);
141
142  read.push('hello');
143}
144
145{
146  const server = http.createServer((req, res) => {
147    const rs = new Readable({
148      read() {
149        rs.push('hello');
150        rs.push(null);
151      }
152    });
153
154    pipeline(rs, res, () => {});
155  });
156
157  server.listen(0, () => {
158    const req = http.request({
159      port: server.address().port
160    });
161
162    req.end();
163    req.on('response', (res) => {
164      const buf = [];
165      res.on('data', (data) => buf.push(data));
166      res.on('end', common.mustCall(() => {
167        assert.deepStrictEqual(
168          Buffer.concat(buf),
169          Buffer.from('hello')
170        );
171        server.close();
172      }));
173    });
174  });
175}
176
177{
178  const server = http.createServer((req, res) => {
179    let sent = false;
180    const rs = new Readable({
181      read() {
182        if (sent) {
183          return;
184        }
185        sent = true;
186        rs.push('hello');
187      },
188      destroy: common.mustCall((err, cb) => {
189        // Prevents fd leaks by destroying http pipelines
190        cb();
191      })
192    });
193
194    pipeline(rs, res, () => {});
195  });
196
197  server.listen(0, () => {
198    const req = http.request({
199      port: server.address().port
200    });
201
202    req.end();
203    req.on('response', (res) => {
204      setImmediate(() => {
205        res.destroy();
206        server.close();
207      });
208    });
209  });
210}
211
212{
213  const server = http.createServer((req, res) => {
214    let sent = 0;
215    const rs = new Readable({
216      read() {
217        if (sent++ > 10) {
218          return;
219        }
220        rs.push('hello');
221      },
222      destroy: common.mustCall((err, cb) => {
223        cb();
224      })
225    });
226
227    pipeline(rs, res, () => {});
228  });
229
230  let cnt = 10;
231
232  const badSink = new Writable({
233    write(data, enc, cb) {
234      cnt--;
235      if (cnt === 0) cb(new Error('kaboom'));
236      else cb();
237    }
238  });
239
240  server.listen(0, () => {
241    const req = http.request({
242      port: server.address().port
243    });
244
245    req.end();
246    req.on('response', (res) => {
247      pipeline(res, badSink, common.mustCall((err) => {
248        assert.deepStrictEqual(err, new Error('kaboom'));
249        server.close();
250      }));
251    });
252  });
253}
254
255{
256  const server = http.createServer((req, res) => {
257    pipeline(req, res, common.mustSucceed());
258  });
259
260  server.listen(0, () => {
261    const req = http.request({
262      port: server.address().port
263    });
264
265    let sent = 0;
266    const rs = new Readable({
267      read() {
268        if (sent++ > 10) {
269          return;
270        }
271        rs.push('hello');
272      }
273    });
274
275    pipeline(rs, req, common.mustCall(() => {
276      server.close();
277    }));
278
279    req.on('response', (res) => {
280      let cnt = 10;
281      res.on('data', () => {
282        cnt--;
283        if (cnt === 0) rs.destroy();
284      });
285    });
286  });
287}
288
289{
290  const makeTransform = () => {
291    const tr = new Transform({
292      transform(data, enc, cb) {
293        cb(null, data);
294      }
295    });
296
297    tr.on('close', common.mustCall());
298    return tr;
299  };
300
301  const rs = new Readable({
302    read() {
303      rs.push('hello');
304    }
305  });
306
307  let cnt = 10;
308
309  const ws = new Writable({
310    write(data, enc, cb) {
311      cnt--;
312      if (cnt === 0) return cb(new Error('kaboom'));
313      cb();
314    }
315  });
316
317  rs.on('close', common.mustCall());
318  ws.on('close', common.mustCall());
319
320  pipeline(
321    rs,
322    makeTransform(),
323    makeTransform(),
324    makeTransform(),
325    makeTransform(),
326    makeTransform(),
327    makeTransform(),
328    ws,
329    common.mustCall((err) => {
330      assert.deepStrictEqual(err, new Error('kaboom'));
331    })
332  );
333}
334
335{
336  const oldStream = new Stream();
337
338  oldStream.pause = oldStream.resume = () => {};
339  oldStream.write = (data) => {
340    oldStream.emit('data', data);
341    return true;
342  };
343  oldStream.end = () => {
344    oldStream.emit('end');
345  };
346
347  const expected = [
348    Buffer.from('hello'),
349    Buffer.from('world'),
350  ];
351
352  const rs = new Readable({
353    read() {
354      for (let i = 0; i < expected.length; i++) {
355        rs.push(expected[i]);
356      }
357      rs.push(null);
358    }
359  });
360
361  const ws = new Writable({
362    write(data, enc, cb) {
363      assert.deepStrictEqual(data, expected.shift());
364      cb();
365    }
366  });
367
368  let finished = false;
369
370  ws.on('finish', () => {
371    finished = true;
372  });
373
374  pipeline(
375    rs,
376    oldStream,
377    ws,
378    common.mustSucceed(() => {
379      assert(finished, 'last stream finished');
380    })
381  );
382}
383
384{
385  const oldStream = new Stream();
386
387  oldStream.pause = oldStream.resume = () => {};
388  oldStream.write = (data) => {
389    oldStream.emit('data', data);
390    return true;
391  };
392  oldStream.end = () => {
393    oldStream.emit('end');
394  };
395
396  const destroyableOldStream = new Stream();
397
398  destroyableOldStream.pause = destroyableOldStream.resume = () => {};
399  destroyableOldStream.destroy = common.mustCall(() => {
400    destroyableOldStream.emit('close');
401  });
402  destroyableOldStream.write = (data) => {
403    destroyableOldStream.emit('data', data);
404    return true;
405  };
406  destroyableOldStream.end = () => {
407    destroyableOldStream.emit('end');
408  };
409
410  const rs = new Readable({
411    read() {
412      rs.destroy(new Error('stop'));
413    }
414  });
415
416  const ws = new Writable({
417    write(data, enc, cb) {
418      cb();
419    }
420  });
421
422  let finished = false;
423
424  ws.on('finish', () => {
425    finished = true;
426  });
427
428  pipeline(
429    rs,
430    oldStream,
431    destroyableOldStream,
432    ws,
433    common.mustCall((err) => {
434      assert.deepStrictEqual(err, new Error('stop'));
435      assert(!finished, 'should not finish');
436    })
437  );
438}
439
440{
441  const pipelinePromise = promisify(pipeline);
442
443  async function run() {
444    const read = new Readable({
445      read() {}
446    });
447
448    const write = new Writable({
449      write(data, enc, cb) {
450        cb();
451      }
452    });
453
454    read.push('data');
455    read.push(null);
456
457    let finished = false;
458
459    write.on('finish', () => {
460      finished = true;
461    });
462
463    await pipelinePromise(read, write);
464
465    assert(finished);
466  }
467
468  run();
469}
470
471{
472  const read = new Readable({
473    read() {}
474  });
475
476  const transform = new Transform({
477    transform(data, enc, cb) {
478      cb(new Error('kaboom'));
479    }
480  });
481
482  const write = new Writable({
483    write(data, enc, cb) {
484      cb();
485    }
486  });
487
488  assert.throws(
489    () => pipeline(read, transform, write),
490    { code: 'ERR_INVALID_CALLBACK' }
491  );
492}
493
494{
495  const server = http.Server(function(req, res) {
496    res.write('asd');
497  });
498  server.listen(0, function() {
499    http.get({ port: this.address().port }, (res) => {
500      const stream = new PassThrough();
501
502      stream.on('error', common.mustCall());
503
504      pipeline(
505        res,
506        stream,
507        common.mustCall((err) => {
508          assert.strictEqual(err.message, 'oh no');
509          server.close();
510        })
511      );
512
513      stream.destroy(new Error('oh no'));
514    }).on('error', common.mustNotCall());
515  });
516}
517
518{
519  let res = '';
520  const w = new Writable({
521    write(chunk, encoding, callback) {
522      res += chunk;
523      callback();
524    }
525  });
526  pipeline(function*() {
527    yield 'hello';
528    yield 'world';
529  }(), w, common.mustSucceed(() => {
530    assert.strictEqual(res, 'helloworld');
531  }));
532}
533
534{
535  let res = '';
536  const w = new Writable({
537    write(chunk, encoding, callback) {
538      res += chunk;
539      callback();
540    }
541  });
542  pipeline(async function*() {
543    await Promise.resolve();
544    yield 'hello';
545    yield 'world';
546  }(), w, common.mustSucceed(() => {
547    assert.strictEqual(res, 'helloworld');
548  }));
549}
550
551{
552  let res = '';
553  const w = new Writable({
554    write(chunk, encoding, callback) {
555      res += chunk;
556      callback();
557    }
558  });
559  pipeline(function*() {
560    yield 'hello';
561    yield 'world';
562  }, w, common.mustSucceed(() => {
563    assert.strictEqual(res, 'helloworld');
564  }));
565}
566
567{
568  let res = '';
569  const w = new Writable({
570    write(chunk, encoding, callback) {
571      res += chunk;
572      callback();
573    }
574  });
575  pipeline(async function*() {
576    await Promise.resolve();
577    yield 'hello';
578    yield 'world';
579  }, w, common.mustSucceed(() => {
580    assert.strictEqual(res, 'helloworld');
581  }));
582}
583
584{
585  let res = '';
586  pipeline(async function*() {
587    await Promise.resolve();
588    yield 'hello';
589    yield 'world';
590  }, async function*(source) {
591    for await (const chunk of source) {
592      yield chunk.toUpperCase();
593    }
594  }, async function(source) {
595    for await (const chunk of source) {
596      res += chunk;
597    }
598  }, common.mustSucceed(() => {
599    assert.strictEqual(res, 'HELLOWORLD');
600  }));
601}
602
603{
604  pipeline(async function*() {
605    await Promise.resolve();
606    yield 'hello';
607    yield 'world';
608  }, async function*(source) {
609    for await (const chunk of source) {
610      yield chunk.toUpperCase();
611    }
612  }, async function(source) {
613    let ret = '';
614    for await (const chunk of source) {
615      ret += chunk;
616    }
617    return ret;
618  }, common.mustSucceed((val) => {
619    assert.strictEqual(val, 'HELLOWORLD');
620  }));
621}
622
623{
624  // AsyncIterable destination is returned and finalizes.
625
626  const ret = pipeline(async function*() {
627    await Promise.resolve();
628    yield 'hello';
629  }, async function*(source) {
630    for await (const chunk of source) {}
631  }, common.mustCall((err) => {
632    assert.strictEqual(err, undefined);
633  }));
634  ret.resume();
635  assert.strictEqual(typeof ret.pipe, 'function');
636}
637
638{
639  // AsyncFunction destination is not returned and error is
640  // propagated.
641
642  const ret = pipeline(async function*() {
643    await Promise.resolve();
644    throw new Error('kaboom');
645  }, async function*(source) {
646    for await (const chunk of source) {}
647  }, common.mustCall((err) => {
648    assert.strictEqual(err.message, 'kaboom');
649  }));
650  ret.resume();
651  assert.strictEqual(typeof ret.pipe, 'function');
652}
653
654{
655  const s = new PassThrough();
656  pipeline(async function*() {
657    throw new Error('kaboom');
658  }, s, common.mustCall((err) => {
659    assert.strictEqual(err.message, 'kaboom');
660    assert.strictEqual(s.destroyed, true);
661  }));
662}
663
664{
665  const s = new PassThrough();
666  pipeline(async function*() {
667    throw new Error('kaboom');
668  }(), s, common.mustCall((err) => {
669    assert.strictEqual(err.message, 'kaboom');
670    assert.strictEqual(s.destroyed, true);
671  }));
672}
673
674{
675  const s = new PassThrough();
676  pipeline(function*() {
677    throw new Error('kaboom');
678  }, s, common.mustCall((err, val) => {
679    assert.strictEqual(err.message, 'kaboom');
680    assert.strictEqual(s.destroyed, true);
681  }));
682}
683
684{
685  const s = new PassThrough();
686  pipeline(function*() {
687    throw new Error('kaboom');
688  }(), s, common.mustCall((err, val) => {
689    assert.strictEqual(err.message, 'kaboom');
690    assert.strictEqual(s.destroyed, true);
691  }));
692}
693
694{
695  const s = new PassThrough();
696  pipeline(async function*() {
697    await Promise.resolve();
698    yield 'hello';
699    yield 'world';
700  }, s, async function(source) {
701    for await (const chunk of source) {
702      throw new Error('kaboom');
703    }
704  }, common.mustCall((err, val) => {
705    assert.strictEqual(err.message, 'kaboom');
706    assert.strictEqual(s.destroyed, true);
707  }));
708}
709
710{
711  const s = new PassThrough();
712  const ret = pipeline(function() {
713    return ['hello', 'world'];
714  }, s, async function*(source) {
715    for await (const chunk of source) {
716      throw new Error('kaboom');
717    }
718  }, common.mustCall((err) => {
719    assert.strictEqual(err.message, 'kaboom');
720    assert.strictEqual(s.destroyed, true);
721  }));
722  ret.resume();
723  assert.strictEqual(typeof ret.pipe, 'function');
724}
725
726{
727  // Legacy streams without async iterator.
728
729  const s = new PassThrough();
730  s.push('asd');
731  s.push(null);
732  s[Symbol.asyncIterator] = null;
733  let ret = '';
734  pipeline(s, async function(source) {
735    for await (const chunk of source) {
736      ret += chunk;
737    }
738  }, common.mustCall((err) => {
739    assert.strictEqual(err, undefined);
740    assert.strictEqual(ret, 'asd');
741  }));
742}
743
744{
745  // v1 streams without read().
746
747  const s = new Stream();
748  process.nextTick(() => {
749    s.emit('data', 'asd');
750    s.emit('end');
751  });
752  // 'destroyer' can be called multiple times,
753  // once from stream wrapper and
754  // once from iterator wrapper.
755  s.close = common.mustCallAtLeast(1);
756  let ret = '';
757  pipeline(s, async function(source) {
758    for await (const chunk of source) {
759      ret += chunk;
760    }
761  }, common.mustCall((err) => {
762    assert.strictEqual(err, undefined);
763    assert.strictEqual(ret, 'asd');
764  }));
765}
766
767{
768  // v1 error streams without read().
769
770  const s = new Stream();
771  process.nextTick(() => {
772    s.emit('error', new Error('kaboom'));
773  });
774  s.destroy = common.mustCall();
775  pipeline(s, async function(source) {
776  }, common.mustCall((err) => {
777    assert.strictEqual(err.message, 'kaboom');
778  }));
779}
780
781{
782  const s = new PassThrough();
783  assert.throws(() => {
784    pipeline(function(source) {
785    }, s, () => {});
786  }, (err) => {
787    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
788    assert.strictEqual(s.destroyed, false);
789    return true;
790  });
791}
792
793{
794  const s = new PassThrough();
795  assert.throws(() => {
796    pipeline(s, function(source) {
797    }, s, () => {});
798  }, (err) => {
799    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
800    assert.strictEqual(s.destroyed, false);
801    return true;
802  });
803}
804
805{
806  const s = new PassThrough();
807  assert.throws(() => {
808    pipeline(s, function(source) {
809    }, () => {});
810  }, (err) => {
811    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
812    assert.strictEqual(s.destroyed, false);
813    return true;
814  });
815}
816
817{
818  const s = new PassThrough();
819  assert.throws(() => {
820    pipeline(s, function*(source) {
821    }, () => {});
822  }, (err) => {
823    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
824    assert.strictEqual(s.destroyed, false);
825    return true;
826  });
827}
828
829{
830  let res = '';
831  pipeline(async function*() {
832    await Promise.resolve();
833    yield 'hello';
834    yield 'world';
835  }, new Transform({
836    transform(chunk, encoding, cb) {
837      cb(new Error('kaboom'));
838    }
839  }), async function(source) {
840    for await (const chunk of source) {
841      res += chunk;
842    }
843  }, common.mustCall((err) => {
844    assert.strictEqual(err.message, 'kaboom');
845    assert.strictEqual(res, '');
846  }));
847}
848
849{
850  let res = '';
851  pipeline(async function*() {
852    await Promise.resolve();
853    yield 'hello';
854    yield 'world';
855  }, new Transform({
856    transform(chunk, encoding, cb) {
857      process.nextTick(cb, new Error('kaboom'));
858    }
859  }), async function(source) {
860    for await (const chunk of source) {
861      res += chunk;
862    }
863  }, common.mustCall((err) => {
864    assert.strictEqual(err.message, 'kaboom');
865    assert.strictEqual(res, '');
866  }));
867}
868
869{
870  let res = '';
871  pipeline(async function*() {
872    await Promise.resolve();
873    yield 'hello';
874    yield 'world';
875  }, new Transform({
876    decodeStrings: false,
877    transform(chunk, encoding, cb) {
878      cb(null, chunk.toUpperCase());
879    }
880  }), async function(source) {
881    for await (const chunk of source) {
882      res += chunk;
883    }
884  }, common.mustSucceed(() => {
885    assert.strictEqual(res, 'HELLOWORLD');
886  }));
887}
888
889{
890  // Ensure no unhandled rejection from async function.
891
892  pipeline(async function*() {
893    yield 'hello';
894  }, async function(source) {
895    throw new Error('kaboom');
896  }, common.mustCall((err) => {
897    assert.strictEqual(err.message, 'kaboom');
898  }));
899}
900
901{
902  const src = new PassThrough({ autoDestroy: false });
903  const dst = new PassThrough({ autoDestroy: false });
904  pipeline(src, dst, common.mustCall(() => {
905    assert.strictEqual(src.destroyed, false);
906    assert.strictEqual(dst.destroyed, false);
907  }));
908  src.end();
909}
910
911{
912  // Make sure 'close' before 'end' finishes without error
913  // if readable has received eof.
914  // Ref: https://github.com/nodejs/node/issues/29699
915  const r = new Readable();
916  const w = new Writable({
917    write(chunk, encoding, cb) {
918      cb();
919    }
920  });
921  pipeline(r, w, (err) => {
922    assert.strictEqual(err, undefined);
923  });
924  r.push('asd');
925  r.push(null);
926  r.emit('close');
927}
928
929{
930  const server = http.createServer((req, res) => {
931  });
932
933  server.listen(0, () => {
934    const req = http.request({
935      port: server.address().port
936    });
937
938    const body = new PassThrough();
939    pipeline(
940      body,
941      req,
942      common.mustSucceed(() => {
943        assert(!req.res);
944        assert(!req.aborted);
945        req.abort();
946        server.close();
947      })
948    );
949    body.end();
950  });
951}
952
953{
954  const src = new PassThrough();
955  const dst = new PassThrough();
956  pipeline(src, dst, common.mustSucceed(() => {
957    assert.strictEqual(dst.destroyed, false);
958  }));
959  src.end();
960}
961
962{
963  const src = new PassThrough();
964  const dst = new PassThrough();
965  dst.readable = false;
966  pipeline(src, dst, common.mustSucceed(() => {
967    assert.strictEqual(dst.destroyed, false);
968  }));
969  src.end();
970}
971
972{
973  let res = '';
974  const rs = new Readable({
975    read() {
976      setImmediate(() => {
977        rs.push('hello');
978      });
979    }
980  });
981  const ws = new Writable({
982    write: common.mustNotCall()
983  });
984  pipeline(rs, async function*(stream) {
985    /* eslint no-unused-vars: off */
986    for await (const chunk of stream) {
987      throw new Error('kaboom');
988    }
989  }, async function *(source) {
990    for await (const chunk of source) {
991      res += chunk;
992    }
993  }, ws, common.mustCall((err) => {
994    assert.strictEqual(err.message, 'kaboom');
995    assert.strictEqual(res, '');
996  }));
997}
998
999{
1000  const server = http.createServer((req, res) => {
1001    req.socket.on('error', common.mustNotCall());
1002    pipeline(req, new PassThrough(), (err) => {
1003      assert.ifError(err);
1004      res.end();
1005      server.close();
1006    });
1007  });
1008
1009  server.listen(0, () => {
1010    const req = http.request({
1011      method: 'PUT',
1012      port: server.address().port
1013    });
1014    req.end('asd123');
1015    req.on('response', common.mustCall());
1016    req.on('error', common.mustNotCall());
1017  });
1018}
1019
1020{
1021  // Might still want to be able to use the writable side
1022  // of src. This is in the case where e.g. the Duplex input
1023  // is not directly connected to its output. Such a case could
1024  // happen when the Duplex is reading from a socket and then echos
1025  // the data back on the same socket.
1026  const src = new PassThrough();
1027  assert.strictEqual(src.writable, true);
1028  const dst = new PassThrough();
1029  pipeline(src, dst, common.mustCall((err) => {
1030    assert.strictEqual(src.writable, true);
1031    assert.strictEqual(src.destroyed, false);
1032  }));
1033  src.push(null);
1034}
1035
1036{
1037  const src = new PassThrough();
1038  const dst = pipeline(
1039    src,
1040    async function * (source) {
1041      for await (const chunk of source) {
1042        yield chunk;
1043      }
1044    },
1045    common.mustCall((err) => {
1046      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
1047    })
1048  );
1049  src.push('asd');
1050  dst.destroy();
1051}
1052
1053{
1054  pipeline(async function * () {
1055    yield 'asd';
1056  }, async function * (source) {
1057    for await (const chunk of source) {
1058      yield { chunk };
1059    }
1060  }, common.mustSucceed());
1061}
1062
1063{
1064  let closed = false;
1065  const src = new Readable({
1066    read() {},
1067    destroy(err, cb) {
1068      process.nextTick(cb);
1069    }
1070  });
1071  const dst = new Writable({
1072    write(chunk, encoding, callback) {
1073      callback();
1074    }
1075  });
1076  src.on('close', () => {
1077    closed = true;
1078  });
1079  src.push(null);
1080  pipeline(src, dst, common.mustCall((err) => {
1081    assert.strictEqual(closed, true);
1082  }));
1083}
1084
1085{
1086  let closed = false;
1087  const src = new Readable({
1088    read() {},
1089    destroy(err, cb) {
1090      process.nextTick(cb);
1091    }
1092  });
1093  const dst = new Duplex({});
1094  src.on('close', common.mustCall(() => {
1095    closed = true;
1096  }));
1097  src.push(null);
1098  pipeline(src, dst, common.mustCall((err) => {
1099    assert.strictEqual(closed, true);
1100  }));
1101}
1102
1103{
1104  const server = net.createServer(common.mustCall((socket) => {
1105    // echo server
1106    pipeline(socket, socket, common.mustSucceed());
1107    // 13 force destroys the socket before it has a chance to emit finish
1108    socket.on('finish', common.mustCall(() => {
1109      server.close();
1110    }));
1111  })).listen(0, common.mustCall(() => {
1112    const socket = net.connect(server.address().port);
1113    socket.end();
1114  }));
1115}
1116
1117{
1118  const d = new Duplex({
1119    autoDestroy: false,
1120    write: common.mustCall((data, enc, cb) => {
1121      d.push(data);
1122      cb();
1123    }),
1124    read: common.mustCall(() => {
1125      d.push(null);
1126    }),
1127    final: common.mustCall((cb) => {
1128      setTimeout(() => {
1129        assert.strictEqual(d.destroyed, false);
1130        cb();
1131      }, 1000);
1132    }),
1133    destroy: common.mustNotCall()
1134  });
1135
1136  const sink = new Writable({
1137    write: common.mustCall((data, enc, cb) => {
1138      cb();
1139    })
1140  });
1141
1142  pipeline(d, sink, common.mustSucceed());
1143
1144  d.write('test');
1145  d.end();
1146}
1147
1148{
1149  const server = net.createServer(common.mustCall((socket) => {
1150    // echo server
1151    pipeline(socket, socket, common.mustSucceed());
1152    socket.on('finish', common.mustCall(() => {
1153      server.close();
1154    }));
1155  })).listen(0, common.mustCall(() => {
1156    const socket = net.connect(server.address().port);
1157    socket.end();
1158  }));
1159}
1160
1161{
1162  const d = new Duplex({
1163    autoDestroy: false,
1164    write: common.mustCall((data, enc, cb) => {
1165      d.push(data);
1166      cb();
1167    }),
1168    read: common.mustCall(() => {
1169      d.push(null);
1170    }),
1171    final: common.mustCall((cb) => {
1172      setTimeout(() => {
1173        assert.strictEqual(d.destroyed, false);
1174        cb();
1175      }, 1000);
1176    }),
1177    // `destroy()` won't be invoked by pipeline since
1178    // the writable side has not completed when
1179    // the pipeline has completed.
1180    destroy: common.mustNotCall()
1181  });
1182
1183  const sink = new Writable({
1184    write: common.mustCall((data, enc, cb) => {
1185      cb();
1186    })
1187  });
1188
1189  pipeline(d, sink, common.mustSucceed());
1190
1191  d.write('test');
1192  d.end();
1193}
1194
1195{
1196  const r = new Readable({
1197    read() {}
1198  });
1199  r.push('hello');
1200  r.push('world');
1201  r.push(null);
1202  let res = '';
1203  const w = new Writable({
1204    write(chunk, encoding, callback) {
1205      res += chunk;
1206      callback();
1207    }
1208  });
1209  pipeline([r, w], common.mustSucceed(() => {
1210    assert.strictEqual(res, 'helloworld');
1211  }));
1212}
1213
1214{
1215  pipeline([1, 2, 3], PassThrough({ objectMode: true }),
1216           common.mustSucceed(() => {}));
1217
1218  let res = '';
1219  const w = new Writable({
1220    write(chunk, encoding, callback) {
1221      res += chunk;
1222      callback();
1223    },
1224  });
1225  pipeline(['1', '2', '3'], w, common.mustSucceed(() => {
1226    assert.strictEqual(res, '123');
1227  }));
1228}
1229{
1230  function createThenable() {
1231    let counter = 0;
1232    return {
1233      get then() {
1234        if (counter++) {
1235          throw new Error('Cannot access `then` more than once');
1236        }
1237        return Function.prototype;
1238      },
1239    };
1240  }
1241
1242  pipeline(
1243    function* () {
1244      yield 0;
1245    },
1246    createThenable,
1247    () => common.mustNotCall(),
1248  );
1249}
1250
1251{
1252  const content = 'abc';
1253  pipeline(Buffer.from(content), PassThrough({ objectMode: true }),
1254           common.mustSucceed(() => {}));
1255
1256  let res = '';
1257  pipeline(Buffer.from(content), async function*(previous) {
1258    for await (const val of previous) {
1259      res += String.fromCharCode(val);
1260      yield val;
1261    }
1262  }, common.mustSucceed(() => {
1263    assert.strictEqual(res, content);
1264  }));
1265}
1266