1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23// Test that having a bunch of streams piping in parallel 24// doesn't break anything. 25 26require('../common'); 27const assert = require('assert'); 28const Stream = require('stream').Stream; 29const rr = []; 30const ww = []; 31const cnt = 100; 32const chunks = 1000; 33const chunkSize = 250; 34const data = Buffer.allocUnsafe(chunkSize); 35let wclosed = 0; 36let rclosed = 0; 37 38function FakeStream() { 39 Stream.apply(this); 40 this.wait = false; 41 this.writable = true; 42 this.readable = true; 43} 44 45FakeStream.prototype = Object.create(Stream.prototype); 46 47FakeStream.prototype.write = function(chunk) { 48 console.error(this.ID, 'write', this.wait); 49 if (this.wait) { 50 process.nextTick(this.emit.bind(this, 'drain')); 51 } 52 this.wait = !this.wait; 53 return this.wait; 54}; 55 56FakeStream.prototype.end = function() { 57 this.emit('end'); 58 process.nextTick(this.close.bind(this)); 59}; 60 61// noop - closes happen automatically on end. 62FakeStream.prototype.close = function() { 63 this.emit('close'); 64}; 65 66 67// Expect all streams to close properly. 68process.on('exit', function() { 69 assert.strictEqual(wclosed, cnt); 70 assert.strictEqual(rclosed, cnt); 71}); 72 73for (let i = 0; i < chunkSize; i++) { 74 data[i] = i; 75} 76 77for (let i = 0; i < cnt; i++) { 78 const r = new FakeStream(); 79 r.on('close', function() { 80 console.error(this.ID, 'read close'); 81 rclosed++; 82 }); 83 rr.push(r); 84 85 const w = new FakeStream(); 86 w.on('close', function() { 87 console.error(this.ID, 'write close'); 88 wclosed++; 89 }); 90 ww.push(w); 91 92 r.ID = w.ID = i; 93 r.pipe(w); 94} 95 96// Now start passing through data. 97// Simulate a relatively fast async stream. 98rr.forEach(function(r) { 99 let cnt = chunks; 100 let paused = false; 101 102 r.on('pause', function() { 103 paused = true; 104 }); 105 106 r.on('resume', function() { 107 paused = false; 108 step(); 109 }); 110 111 function step() { 112 r.emit('data', data); 113 if (--cnt === 0) { 114 r.end(); 115 return; 116 } 117 if (paused) return; 118 process.nextTick(step); 119 } 120 121 process.nextTick(step); 122}); 123