• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23// Test that having a bunch of streams piping in parallel
24// doesn't break anything.
25
26require('../common');
27const assert = require('assert');
28const Stream = require('stream').Stream;
29const rr = [];
30const ww = [];
31const cnt = 100;
32const chunks = 1000;
33const chunkSize = 250;
34const data = Buffer.allocUnsafe(chunkSize);
35let wclosed = 0;
36let rclosed = 0;
37
/**
 * Minimal stand-in stream built on the legacy `Stream` base class.
 *
 * Every instance is flagged as both readable and writable, so the same type
 * can be used as the source or the destination of `pipe()`. Callers may set
 * an `ID` property on the instance; it is only used to label log output.
 */
class FakeStream extends Stream {
  constructor() {
    super();
    // When true, the next write() returns false and schedules a 'drain'.
    this.wait = false;
    this.writable = true;
    this.readable = true;
  }

  /**
   * Pretends to accept a chunk. The payload is discarded; only the return
   * value matters.
   *
   * @param {*} chunk - Ignored.
   * @returns {boolean} `true` and `false` on alternating calls, starting
   *   with `true`. Each `false` is followed by a deferred 'drain' event.
   */
  write(chunk) {
    console.error(this.ID, 'write', this.wait);
    if (this.wait) {
      // This call is about to return false, so queue the matching 'drain'.
      process.nextTick(this.emit.bind(this, 'drain'));
    }
    this.wait = !this.wait;
    return this.wait;
  }

  /**
   * Emits 'end' synchronously, then closes the stream on the next tick.
   */
  end() {
    this.emit('end');
    process.nextTick(this.close.bind(this));
  }

  /**
   * Emits 'close'. end() schedules this automatically, so callers do not
   * normally need to invoke it themselves.
   */
  close() {
    this.emit('close');
  }
}
65
66
// At process exit, the number of writers and the number of readers that
// emitted 'close' must each equal the number of pairs created.
process.on('exit', () => {
  for (const closedCount of [wclosed, rclosed]) {
    assert.strictEqual(closedCount, cnt);
  }
});
72
// Overwrite the uninitialized buffer so each byte holds its own offset.
for (const offset of data.keys()) {
  data[offset] = offset;
}
76
// Create `cnt` reader/writer pairs. Both halves of a pair share one ID, each
// counts its own 'close' event, and the reader is piped into the writer.
for (let id = 0; id < cnt; id += 1) {
  const reader = new FakeStream();
  const writer = new FakeStream();
  reader.ID = id;
  writer.ID = id;

  reader.on('close', function onReadClose() {
    console.error(this.ID, 'read close');
    rclosed += 1;
  });
  writer.on('close', function onWriteClose() {
    console.error(this.ID, 'write close');
    wclosed += 1;
  });

  rr.push(reader);
  ww.push(writer);

  reader.pipe(writer);
}
95
// Drive every reader: emit one chunk per tick until `chunks` chunks have been
// sent, then end the reader. A 'pause' event suspends the loop and a 'resume'
// event restarts it immediately.
for (const reader of rr) {
  let remaining = chunks;
  let isPaused = false;

  const pump = () => {
    reader.emit('data', data);
    remaining -= 1;
    if (remaining === 0) {
      reader.end();
    } else if (!isPaused) {
      process.nextTick(pump);
    }
  };

  reader.on('pause', () => {
    isPaused = true;
  });

  reader.on('resume', () => {
    isPaused = false;
    pump();
  });

  process.nextTick(pump);
}
123