// Source-viewer navigation residue (not part of the script): Home, Line#, Scopes#, Navigate#, Raw, Download
'use strict';
// Checks that a single Readable piped into several Writables hands the same
// chunk to every destination, and that a destination receives nothing further
// once it has been unpiped.
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// Source stream with a no-op read(); data is supplied only through push().
const readable = new stream.Readable({
  read: () => {}
});

// Every destination created below, kept so the assertions can visit each one.
const writables = [];

for (let i = 0; i < 5; i++) {
  const target = new stream.Writable({
    // Records each received chunk on the destination itself.
    // NOTE(review): common.mustCall(fn, 1) presumably fails the test unless
    // write() runs exactly once per destination — confirm against the
    // project's common helper.
    write: common.mustCall((chunk, encoding, callback) => {
      target.output.push(chunk);
      callback();
    }, 1)
  });
  target.output = [];

  target.on('pipe', common.mustCall());
  readable.pipe(target);

  writables.push(target);
}

const input = Buffer.from([1, 2, 3, 4, 5]);

readable.push(input);

// The pipe() calls will postpone emission of the 'resume' event using nextTick,
// so no data will be available to the writable streams until then.
process.nextTick(common.mustCall(() => {
  for (const target of writables) {
    // Each destination must have seen the pushed buffer, and only that.
    assert.deepStrictEqual(target.output, [input]);

    target.on('unpipe', common.mustCall());
    readable.unpipe(target);
  }

  readable.push('something else'); // This does not get through.
  readable.push(null);
  readable.resume(); // Make sure the 'end' event gets emitted.
}));

readable.on('end', common.mustCall(() => {
  // Once the source has ended, the data pushed after unpipe() must not have
  // reached any destination.
  for (const target of writables) {
    assert.deepStrictEqual(target.output, [input]);
  }
}));