// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';
require('../common');
const assert = require('assert');

const stream = require('stream');
const Readable = stream.Readable;
const Writable = stream.Writable;

// The source produces `totalChunks` buffers of `chunkSize` bytes each.
const totalChunks = 100;
const chunkSize = 99;
const expectTotalData = totalChunks * chunkSize;
// Every stage subtracts the number of bytes it consumes, so what is left is
// the byte count the last stage is expected to receive.
let expectEndingData = expectTotalData;

const r = new Readable({ highWaterMark: 1000 });
let chunks = totalChunks;
// Vary how the next chunk is delivered depending on the remaining chunk
// count: via setImmediate when it is even, via process.nextTick when it is
// odd and divisible by 3, and synchronously otherwise.
r._read = function(n) {
  console.log('_read called', chunks);
  if (!(chunks % 2))
    setImmediate(push);
  else if (!(chunks % 3))
    process.nextTick(push);
  else
    push();
};

let totalPushed = 0;
// Push one `chunkSize`-byte buffer filled with 'x' while chunks remain, and
// `null` once the counter has run out. `totalPushed` only counts real data.
function push() {
  const chunk = chunks-- > 0 ? Buffer.alloc(chunkSize, 'x') : null;
  if (chunk) {
    totalPushed += chunk.length;
  }
  console.log('chunks', chunks);
  r.push(chunk);
}

read100();

// First we read 100 bytes.
function read100() {
  readn(100, onData);
}

// Read exactly `n` bytes from `r` with read(n), then call `then`.
// When read(n) returns nothing, retry on the next 'readable' event.
// Also checks that the stream is not in flowing mode at that point.
function readn(n, then) {
  console.error(`read ${n}`);
  expectEndingData -= n;
  (function read() {
    const c = r.read(n);
    console.error('c', c);
    if (!c)
      r.once('readable', read);
    else {
      assert.strictEqual(c.length, n);
      assert(!r.readableFlowing);
      then();
    }
  })();
}

// Then we listen to some data events.
// Consumes 100 bytes through a 'data' listener, then detaches the listener,
// pauses the stream and hands over to pipeLittle().
function onData() {
  expectEndingData -= 100;
  console.error('onData');
  let seen = 0;
  r.on('data', function od(c) {
    seen += c.length;
    if (seen >= 100) {
      // Seen enough
      r.removeListener('data', od);
      r.pause();
      if (seen > 100) {
        // Oh no, seen too much!
        // Put the extra back.
        const diff = seen - 100;
        // subarray() replaces the deprecated Buffer#slice(); it returns the
        // same view over the trailing `diff` bytes.
        r.unshift(c.subarray(c.length - diff));
        console.error('seen too much', seen, diff);
      }

      // Nothing should be lost in-between.
      setImmediate(pipeLittle);
    }
  });
}

// Just pipe 200 bytes, then unshift the extra and unpipe.
function pipeLittle() {
  expectEndingData -= 200;
  console.error('pipe a little');
  const w = new Writable();
  let written = 0;
  w.on('finish', () => {
    assert.strictEqual(written, 200);
    setImmediate(read1234);
  });
  w._write = function(chunk, encoding, cb) {
    written += chunk.length;
    if (written >= 200) {
      // Enough data: detach from the source and end the writer before
      // acknowledging this chunk.
      r.unpipe(w);
      w.end();
      cb();
      if (written > 200) {
        // Give the surplus bytes back to the source and count only 200.
        const diff = written - 200;
        written -= diff;
        // subarray() replaces the deprecated Buffer#slice().
        r.unshift(chunk.subarray(chunk.length - diff));
      }
    } else {
      setImmediate(cb);
    }
  };
  r.pipe(w);
}

// Now read 1234 more bytes.
// Pulls 1234 bytes through readn(), then continues with resumePause().
function read1234() {
  readn(1234, resumePause);
}

// Switches the stream between resumed and paused several times in a row
// without attaching a consumer, then schedules pipe() with setImmediate.
function resumePause() {
  console.error('resumePause');
  // Don't read anything, just resume and re-pause a whole bunch.
  const toggles = 5;
  for (let i = 0; i < toggles; i++) {
    r.resume();
    r.pause();
  }
  setImmediate(pipe);
}


// Pipes the rest of `r` into a Writable that only counts bytes and, once
// that writer emits 'finish', checks the totals against the expected values.
function pipe() {
  console.error('pipe the rest');
  const w = new Writable();
  let written = 0;
  let finished = false;
  w._write = function(chunk, encoding, cb) {
    written += chunk.length;
    cb();
  };
  w.on('finish', () => {
    // Record the event before asserting, so a failing assertion below is not
    // additionally reported as a missing 'finish' by the exit check.
    finished = true;
    console.error('written', written, totalPushed);
    assert.strictEqual(written, expectEndingData);
    assert.strictEqual(totalPushed, expectTotalData);
    console.log('ok');
  });
  // The assertions above run only if 'finish' is emitted. Without this check
  // a stream that stalls here would let the process exit with nothing
  // verified.
  process.on('exit', () => {
    assert(finished, "final writable never emitted 'finish'");
  });
  r.pipe(w);
}