• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const common = require('../common');
25const R = require('_stream_readable');
26const W = require('_stream_writable');
27const assert = require('assert');
28
29const EE = require('events').EventEmitter;
30
31class TestReader extends R {
32  constructor(n) {
33    super();
34    this._buffer = Buffer.alloc(n || 100, 'x');
35    this._pos = 0;
36    this._bufs = 10;
37  }
38
39  _read(n) {
40    const max = this._buffer.length - this._pos;
41    n = Math.max(n, 0);
42    const toRead = Math.min(n, max);
43    if (toRead === 0) {
44      // Simulate the read buffer filling up with some more bytes some time
45      // in the future.
46      setTimeout(() => {
47        this._pos = 0;
48        this._bufs -= 1;
49        if (this._bufs <= 0) {
50          // read them all!
51          if (!this.ended)
52            this.push(null);
53        } else {
54          // now we have more.
55          // kinda cheating by calling _read, but whatever,
56          // it's just fake anyway.
57          this._read(n);
58        }
59      }, 10);
60      return;
61    }
62
63    const ret = this._buffer.slice(this._pos, this._pos + toRead);
64    this._pos += toRead;
65    this.push(ret);
66  }
67}
68
69class TestWriter extends EE {
70  constructor() {
71    super();
72    this.received = [];
73    this.flush = false;
74  }
75
76  write(c) {
77    this.received.push(c.toString());
78    this.emit('write', c);
79    return true;
80  }
81
82  end(c) {
83    if (c) this.write(c);
84    this.emit('end', this.received);
85  }
86}
87
88{
89  // Test basic functionality
90  const r = new TestReader(20);
91
92  const reads = [];
93  const expect = [ 'x',
94                   'xx',
95                   'xxx',
96                   'xxxx',
97                   'xxxxx',
98                   'xxxxxxxxx',
99                   'xxxxxxxxxx',
100                   'xxxxxxxxxxxx',
101                   'xxxxxxxxxxxxx',
102                   'xxxxxxxxxxxxxxx',
103                   'xxxxxxxxxxxxxxxxx',
104                   'xxxxxxxxxxxxxxxxxxx',
105                   'xxxxxxxxxxxxxxxxxxxxx',
106                   'xxxxxxxxxxxxxxxxxxxxxxx',
107                   'xxxxxxxxxxxxxxxxxxxxxxxxx',
108                   'xxxxxxxxxxxxxxxxxxxxx' ];
109
110  r.on('end', common.mustCall(function() {
111    assert.deepStrictEqual(reads, expect);
112  }));
113
114  let readSize = 1;
115  function flow() {
116    let res;
117    while (null !== (res = r.read(readSize++))) {
118      reads.push(res.toString());
119    }
120    r.once('readable', flow);
121  }
122
123  flow();
124}
125
126{
127  // Verify pipe
128  const r = new TestReader(5);
129
130  const expect = [ 'xxxxx',
131                   'xxxxx',
132                   'xxxxx',
133                   'xxxxx',
134                   'xxxxx',
135                   'xxxxx',
136                   'xxxxx',
137                   'xxxxx',
138                   'xxxxx',
139                   'xxxxx' ];
140
141  const w = new TestWriter();
142
143  w.on('end', common.mustCall(function(received) {
144    assert.deepStrictEqual(received, expect);
145  }));
146
147  r.pipe(w);
148}
149
150
151[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) {
152  // Verify unpipe
153  const r = new TestReader(5);
154
155  // Unpipe after 3 writes, then write to another stream instead.
156  let expect = [ 'xxxxx',
157                 'xxxxx',
158                 'xxxxx',
159                 'xxxxx',
160                 'xxxxx',
161                 'xxxxx',
162                 'xxxxx',
163                 'xxxxx',
164                 'xxxxx',
165                 'xxxxx' ];
166  expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
167
168  const w = [ new TestWriter(), new TestWriter() ];
169
170  let writes = SPLIT;
171  w[0].on('write', function() {
172    if (--writes === 0) {
173      r.unpipe();
174      assert.deepStrictEqual(r._readableState.pipes, []);
175      w[0].end();
176      r.pipe(w[1]);
177      assert.deepStrictEqual(r._readableState.pipes, [w[1]]);
178    }
179  });
180
181  let ended = 0;
182
183  w[0].on('end', common.mustCall(function(results) {
184    ended++;
185    assert.strictEqual(ended, 1);
186    assert.deepStrictEqual(results, expect[0]);
187  }));
188
189  w[1].on('end', common.mustCall(function(results) {
190    ended++;
191    assert.strictEqual(ended, 2);
192    assert.deepStrictEqual(results, expect[1]);
193  }));
194
195  r.pipe(w[0]);
196});
197
198
199{
200  // Verify both writers get the same data when piping to destinations
201  const r = new TestReader(5);
202  const w = [ new TestWriter(), new TestWriter() ];
203
204  const expect = [ 'xxxxx',
205                   'xxxxx',
206                   'xxxxx',
207                   'xxxxx',
208                   'xxxxx',
209                   'xxxxx',
210                   'xxxxx',
211                   'xxxxx',
212                   'xxxxx',
213                   'xxxxx' ];
214
215  w[0].on('end', common.mustCall(function(received) {
216    assert.deepStrictEqual(received, expect);
217  }));
218  w[1].on('end', common.mustCall(function(received) {
219    assert.deepStrictEqual(received, expect);
220  }));
221
222  r.pipe(w[0]);
223  r.pipe(w[1]);
224}
225
226
227[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) {
228  // Verify multi-unpipe
229  const r = new TestReader(5);
230
231  // Unpipe after 3 writes, then write to another stream instead.
232  let expect = [ 'xxxxx',
233                 'xxxxx',
234                 'xxxxx',
235                 'xxxxx',
236                 'xxxxx',
237                 'xxxxx',
238                 'xxxxx',
239                 'xxxxx',
240                 'xxxxx',
241                 'xxxxx' ];
242  expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
243
244  const w = [ new TestWriter(), new TestWriter(), new TestWriter() ];
245
246  let writes = SPLIT;
247  w[0].on('write', function() {
248    if (--writes === 0) {
249      r.unpipe();
250      w[0].end();
251      r.pipe(w[1]);
252    }
253  });
254
255  let ended = 0;
256
257  w[0].on('end', common.mustCall(function(results) {
258    ended++;
259    assert.strictEqual(ended, 1);
260    assert.deepStrictEqual(results, expect[0]);
261  }));
262
263  w[1].on('end', common.mustCall(function(results) {
264    ended++;
265    assert.strictEqual(ended, 2);
266    assert.deepStrictEqual(results, expect[1]);
267  }));
268
269  r.pipe(w[0]);
270  r.pipe(w[2]);
271});
272
273{
274  // Verify that back pressure is respected
275  const r = new R({ objectMode: true });
276  r._read = common.mustNotCall();
277  let counter = 0;
278  r.push(['one']);
279  r.push(['two']);
280  r.push(['three']);
281  r.push(['four']);
282  r.push(null);
283
284  const w1 = new R();
285  w1.write = function(chunk) {
286    assert.strictEqual(chunk[0], 'one');
287    w1.emit('close');
288    process.nextTick(function() {
289      r.pipe(w2);
290      r.pipe(w3);
291    });
292  };
293  w1.end = common.mustNotCall();
294
295  r.pipe(w1);
296
297  const expected = ['two', 'two', 'three', 'three', 'four', 'four'];
298
299  const w2 = new R();
300  w2.write = function(chunk) {
301    assert.strictEqual(chunk[0], expected.shift());
302    assert.strictEqual(counter, 0);
303
304    counter++;
305
306    if (chunk[0] === 'four') {
307      return true;
308    }
309
310    setTimeout(function() {
311      counter--;
312      w2.emit('drain');
313    }, 10);
314
315    return false;
316  };
317  w2.end = common.mustCall();
318
319  const w3 = new R();
320  w3.write = function(chunk) {
321    assert.strictEqual(chunk[0], expected.shift());
322    assert.strictEqual(counter, 1);
323
324    counter++;
325
326    if (chunk[0] === 'four') {
327      return true;
328    }
329
330    setTimeout(function() {
331      counter--;
332      w3.emit('drain');
333    }, 50);
334
335    return false;
336  };
337  w3.end = common.mustCall(function() {
338    assert.strictEqual(counter, 2);
339    assert.strictEqual(expected.length, 0);
340  });
341}
342
343{
344  // Verify read(0) behavior for ended streams
345  const r = new R();
346  let written = false;
347  let ended = false;
348  r._read = common.mustNotCall();
349
350  r.push(Buffer.from('foo'));
351  r.push(null);
352
353  const v = r.read(0);
354
355  assert.strictEqual(v, null);
356
357  const w = new R();
358  w.write = function(buffer) {
359    written = true;
360    assert.strictEqual(ended, false);
361    assert.strictEqual(buffer.toString(), 'foo');
362  };
363
364  w.end = common.mustCall(function() {
365    ended = true;
366    assert.strictEqual(written, true);
367  });
368
369  r.pipe(w);
370}
371
372{
373  // Verify synchronous _read ending
374  const r = new R();
375  let called = false;
376  r._read = function(n) {
377    r.push(null);
378  };
379
380  r.once('end', function() {
381    // Verify that this is called before the next tick
382    called = true;
383  });
384
385  r.read();
386
387  process.nextTick(function() {
388    assert.strictEqual(called, true);
389  });
390}
391
392{
393  // Verify that adding readable listeners trigger data flow
394  const r = new R({ highWaterMark: 5 });
395  let onReadable = false;
396  let readCalled = 0;
397
398  r._read = function(n) {
399    if (readCalled++ === 2)
400      r.push(null);
401    else
402      r.push(Buffer.from('asdf'));
403  };
404
405  r.on('readable', function() {
406    onReadable = true;
407    r.read();
408  });
409
410  r.on('end', common.mustCall(function() {
411    assert.strictEqual(readCalled, 3);
412    assert.ok(onReadable);
413  }));
414}
415
416{
417  // Verify that streams are chainable
418  const r = new R();
419  r._read = common.mustCall();
420  const r2 = r.setEncoding('utf8').pause().resume().pause();
421  assert.strictEqual(r, r2);
422}
423
424{
425  // Verify readableEncoding property
426  assert(R.prototype.hasOwnProperty('readableEncoding'));
427
428  const r = new R({ encoding: 'utf8' });
429  assert.strictEqual(r.readableEncoding, 'utf8');
430}
431
432{
433  // Verify readableObjectMode property
434  assert(R.prototype.hasOwnProperty('readableObjectMode'));
435
436  const r = new R({ objectMode: true });
437  assert.strictEqual(r.readableObjectMode, true);
438}
439
440{
441  // Verify writableObjectMode property
442  assert(W.prototype.hasOwnProperty('writableObjectMode'));
443
444  const w = new W({ objectMode: true });
445  assert.strictEqual(w.writableObjectMode, true);
446}
447