• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const assert = require('assert');
5const { Readable } = require('stream');
6
7let ticks = 18;
8let expectedData = 19;
9
10const rs = new Readable({
11  objectMode: true,
12  read: () => {
13    if (ticks-- > 0)
14      return process.nextTick(() => rs.push({}));
15    rs.push({});
16    rs.push(null);
17  }
18});
19
20rs.on('end', common.mustCall());
21readAndPause();
22
23function readAndPause() {
24  // Does a on(data) -> pause -> wait -> resume -> on(data) ... loop.
25  // Expects on(data) to never fire if the stream is paused.
26  const ondata = common.mustCall((data) => {
27    rs.pause();
28
29    expectedData--;
30    if (expectedData <= 0)
31      return;
32
33    setImmediate(function() {
34      rs.removeListener('data', ondata);
35      readAndPause();
36      rs.resume();
37    });
38  }, 1); // Only call ondata once
39
40  rs.on('data', ondata);
41}
42
43{
44  const readable = new Readable({
45    read() {}
46  });
47
48  function read() {}
49
50  readable.setEncoding('utf8');
51  readable.on('readable', read);
52  readable.removeListener('readable', read);
53  readable.pause();
54
55  process.nextTick(function() {
56    assert(readable.isPaused());
57  });
58}
59
60{
61  const { PassThrough } = require('stream');
62
63  const source3 = new PassThrough();
64  const target3 = new PassThrough();
65
66  const chunk = Buffer.allocUnsafe(1000);
67  while (target3.write(chunk));
68
69  source3.pipe(target3);
70  target3.on('drain', common.mustCall(() => {
71    assert(!source3.isPaused());
72  }));
73  target3.on('data', () => {});
74}
75