// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';
require('../common');
const assert = require('assert');

const stream = require('stream');
const Readable = stream.Readable;
const Writable = stream.Writable;

// Size of the data set: the source pushes `totalChunks` buffers of
// `chunkSize` bytes each before ending.
const totalChunks = 100;
const chunkSize = 99;
const expectTotalData = totalChunks * chunkSize;

// Bytes expected to reach the final pipe stage. It starts at the full total
// and each earlier stage subtracts the number of bytes it consumes.
let expectEndingData = expectTotalData;

// Number of chunks still to be handed to push(); also used below to pick
// how the next push is scheduled.
let chunks = totalChunks;

// Source stream under test. Each read request delivers the next chunk via one
// of three paths chosen from the remaining-chunk counter: setImmediate when
// the counter is even, process.nextTick when it is an odd multiple of 3, and
// a synchronous call otherwise.
const r = new Readable({
  highWaterMark: 1000,
  read(n) {
    console.log('_read called', chunks);
    if (chunks % 2 === 0) {
      setImmediate(push);
    } else if (chunks % 3 === 0) {
      process.nextTick(push);
    } else {
      push();
    }
  },
});

// Running total of bytes handed to the readable through push().
let totalPushed = 0;

// Pushes the next chunk into `r`: a `chunkSize`-byte buffer of 'x' while the
// chunk counter is positive, or null once it is not. The counter is
// decremented on every call, including the call that pushes null.
function push() {
  const hasMore = chunks > 0;
  chunks--;
  const chunk = hasMore ? Buffer.alloc(chunkSize, 'x') : null;
  if (chunk !== null) {
    totalPushed += chunk.length;
  }
  console.log('chunks', chunks);
  r.push(chunk);
}

// Kick off the sequence; each stage below hands off to the next one.
read100();

// First we read 100 bytes with read(n), then move on to 'data' events.
function read100() {
  readn(100, onData);
}

// Reads exactly `n` bytes from `r` with read(n) and then calls `then()`.
// While read(n) returns null, the attempt is retried on the next 'readable'
// event. `n` is subtracted from expectEndingData immediately, before any
// data has actually been read.
function readn(n, then) {
  console.error(`read ${n}`);
  expectEndingData -= n;

  const attempt = () => {
    const c = r.read(n);
    console.error('c', c);
    if (c === null) {
      // Not enough buffered yet; try again once more data is readable.
      r.once('readable', attempt);
      return;
    }
    assert.strictEqual(c.length, n);
    // Reading with read(n) must not have switched the stream to flowing mode.
    assert(!r.readableFlowing);
    then();
  };
  attempt();
}

// Then we listen to some data events.
// Consumes 100 bytes through a 'data' listener, then removes the listener and
// pauses. If the last chunk overshot 100 bytes, the surplus tail is unshifted
// back onto the stream before handing off to pipeLittle().
function onData() {
  expectEndingData -= 100;
  console.error('onData');
  let seen = 0;
  r.on('data', function od(c) {
    seen += c.length;
    if (seen < 100) {
      return;
    }

    // Seen enough
    r.removeListener('data', od);
    r.pause();
    if (seen > 100) {
      // Oh no, seen too much!
      // Put the extra back.
      const diff = seen - 100;
      // subarray() replaces the deprecated Buffer#slice(); both return a view
      // over the same memory.
      r.unshift(c.subarray(c.length - diff));
      console.error('seen too much', seen, diff);
    }

    // Nothing should be lost in-between.
    setImmediate(pipeLittle);
  });
}

// Just pipe 200 bytes, then unshift the extra and unpipe.
// A throwaway Writable counts the bytes it is given. Once the count reaches
// 200 it unpipes, ends itself, and returns any overshoot to `r`. Its 'finish'
// handler checks that exactly 200 bytes were kept, then schedules read1234().
function pipeLittle() {
  expectEndingData -= 200;
  console.error('pipe a little');
  let written = 0;
  const w = new Writable({
    write(chunk, encoding, cb) {
      written += chunk.length;
      if (written < 200) {
        setImmediate(cb);
        return;
      }

      r.unpipe(w);
      w.end();
      cb();
      if (written > 200) {
        const diff = written - 200;
        written -= diff;
        // subarray() replaces the deprecated Buffer#slice(); both return a
        // view over the same memory.
        r.unshift(chunk.subarray(chunk.length - diff));
      }
    },
  });
  w.on('finish', () => {
    assert.strictEqual(written, 200);
    setImmediate(read1234);
  });
  r.pipe(w);
}

// Now read 1234 more bytes with read(n), then move on to resumePause().
function read1234() {
  readn(1234, resumePause);
}

// Toggles the stream between resume() and pause() five times in a row without
// reading anything in between, then schedules the final pipe() stage.
function resumePause() {
  console.error('resumePause');
  // Don't read anything, just resume and re-pause a whole bunch.
  const toggles = 5;
  for (let i = 0; i < toggles; i++) {
    r.resume();
    r.pause();
  }
  setImmediate(pipe);
}


// Final stage: pipe whatever is left in `r` into a counting Writable. On
// 'finish', the byte count must equal expectEndingData and the total pushed
// by the source must equal expectTotalData.
function pipe() {
  console.error('pipe the rest');
  let written = 0;
  const w = new Writable({
    write(chunk, encoding, cb) {
      written += chunk.length;
      cb();
    },
  });
  w.on('finish', () => {
    console.error('written', written, totalPushed);
    assert.strictEqual(written, expectEndingData);
    assert.strictEqual(totalPushed, expectTotalData);
    console.log('ok');
  });
  r.pipe(w);
}
