1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const common = require('../common'); 25const R = require('_stream_readable'); 26const W = require('_stream_writable'); 27const assert = require('assert'); 28 29const EE = require('events').EventEmitter; 30 31class TestReader extends R { 32 constructor(n) { 33 super(); 34 this._buffer = Buffer.alloc(n || 100, 'x'); 35 this._pos = 0; 36 this._bufs = 10; 37 } 38 39 _read(n) { 40 const max = this._buffer.length - this._pos; 41 n = Math.max(n, 0); 42 const toRead = Math.min(n, max); 43 if (toRead === 0) { 44 // Simulate the read buffer filling up with some more bytes some time 45 // in the future. 46 setTimeout(() => { 47 this._pos = 0; 48 this._bufs -= 1; 49 if (this._bufs <= 0) { 50 // read them all! 51 if (!this.ended) 52 this.push(null); 53 } else { 54 // now we have more. 55 // kinda cheating by calling _read, but whatever, 56 // it's just fake anyway. 57 this._read(n); 58 } 59 }, 10); 60 return; 61 } 62 63 const ret = this._buffer.slice(this._pos, this._pos + toRead); 64 this._pos += toRead; 65 this.push(ret); 66 } 67} 68 69class TestWriter extends EE { 70 constructor() { 71 super(); 72 this.received = []; 73 this.flush = false; 74 } 75 76 write(c) { 77 this.received.push(c.toString()); 78 this.emit('write', c); 79 return true; 80 } 81 82 end(c) { 83 if (c) this.write(c); 84 this.emit('end', this.received); 85 } 86} 87 88{ 89 // Test basic functionality 90 const r = new TestReader(20); 91 92 const reads = []; 93 const expect = [ 'x', 94 'xx', 95 'xxx', 96 'xxxx', 97 'xxxxx', 98 'xxxxxxxxx', 99 'xxxxxxxxxx', 100 'xxxxxxxxxxxx', 101 'xxxxxxxxxxxxx', 102 'xxxxxxxxxxxxxxx', 103 'xxxxxxxxxxxxxxxxx', 104 'xxxxxxxxxxxxxxxxxxx', 105 'xxxxxxxxxxxxxxxxxxxxx', 106 'xxxxxxxxxxxxxxxxxxxxxxx', 107 'xxxxxxxxxxxxxxxxxxxxxxxxx', 108 'xxxxxxxxxxxxxxxxxxxxx' ]; 109 110 r.on('end', common.mustCall(function() { 111 assert.deepStrictEqual(reads, expect); 112 })); 113 114 let readSize = 1; 115 function flow() { 116 let res; 117 while (null !== (res = r.read(readSize++))) { 118 reads.push(res.toString()); 119 } 120 r.once('readable', flow); 121 } 122 123 flow(); 124} 125 126{ 127 // Verify pipe 128 const r = new TestReader(5); 129 130 const expect = [ 'xxxxx', 131 'xxxxx', 132 'xxxxx', 133 'xxxxx', 134 'xxxxx', 135 'xxxxx', 136 'xxxxx', 137 'xxxxx', 138 'xxxxx', 139 'xxxxx' ]; 140 141 const w = new TestWriter(); 142 143 w.on('end', common.mustCall(function(received) { 144 assert.deepStrictEqual(received, expect); 145 })); 146 147 r.pipe(w); 148} 149 150 151[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { 152 // Verify unpipe 153 const r = new TestReader(5); 154 155 // Unpipe after 3 writes, then write to another stream instead. 156 let expect = [ 'xxxxx', 157 'xxxxx', 158 'xxxxx', 159 'xxxxx', 160 'xxxxx', 161 'xxxxx', 162 'xxxxx', 163 'xxxxx', 164 'xxxxx', 165 'xxxxx' ]; 166 expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; 167 168 const w = [ new TestWriter(), new TestWriter() ]; 169 170 let writes = SPLIT; 171 w[0].on('write', function() { 172 if (--writes === 0) { 173 r.unpipe(); 174 assert.deepStrictEqual(r._readableState.pipes, []); 175 w[0].end(); 176 r.pipe(w[1]); 177 assert.deepStrictEqual(r._readableState.pipes, [w[1]]); 178 } 179 }); 180 181 let ended = 0; 182 183 w[0].on('end', common.mustCall(function(results) { 184 ended++; 185 assert.strictEqual(ended, 1); 186 assert.deepStrictEqual(results, expect[0]); 187 })); 188 189 w[1].on('end', common.mustCall(function(results) { 190 ended++; 191 assert.strictEqual(ended, 2); 192 assert.deepStrictEqual(results, expect[1]); 193 })); 194 195 r.pipe(w[0]); 196}); 197 198 199{ 200 // Verify both writers get the same data when piping to destinations 201 const r = new TestReader(5); 202 const w = [ new TestWriter(), new TestWriter() ]; 203 204 const expect = [ 'xxxxx', 205 'xxxxx', 206 'xxxxx', 207 'xxxxx', 208 'xxxxx', 209 'xxxxx', 210 'xxxxx', 211 'xxxxx', 212 'xxxxx', 213 'xxxxx' ]; 214 215 w[0].on('end', common.mustCall(function(received) { 216 assert.deepStrictEqual(received, expect); 217 })); 218 w[1].on('end', common.mustCall(function(received) { 219 assert.deepStrictEqual(received, expect); 220 })); 221 222 r.pipe(w[0]); 223 r.pipe(w[1]); 224} 225 226 227[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { 228 // Verify multi-unpipe 229 const r = new TestReader(5); 230 231 // Unpipe after 3 writes, then write to another stream instead. 232 let expect = [ 'xxxxx', 233 'xxxxx', 234 'xxxxx', 235 'xxxxx', 236 'xxxxx', 237 'xxxxx', 238 'xxxxx', 239 'xxxxx', 240 'xxxxx', 241 'xxxxx' ]; 242 expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; 243 244 const w = [ new TestWriter(), new TestWriter(), new TestWriter() ]; 245 246 let writes = SPLIT; 247 w[0].on('write', function() { 248 if (--writes === 0) { 249 r.unpipe(); 250 w[0].end(); 251 r.pipe(w[1]); 252 } 253 }); 254 255 let ended = 0; 256 257 w[0].on('end', common.mustCall(function(results) { 258 ended++; 259 assert.strictEqual(ended, 1); 260 assert.deepStrictEqual(results, expect[0]); 261 })); 262 263 w[1].on('end', common.mustCall(function(results) { 264 ended++; 265 assert.strictEqual(ended, 2); 266 assert.deepStrictEqual(results, expect[1]); 267 })); 268 269 r.pipe(w[0]); 270 r.pipe(w[2]); 271}); 272 273{ 274 // Verify that back pressure is respected 275 const r = new R({ objectMode: true }); 276 r._read = common.mustNotCall(); 277 let counter = 0; 278 r.push(['one']); 279 r.push(['two']); 280 r.push(['three']); 281 r.push(['four']); 282 r.push(null); 283 284 const w1 = new R(); 285 w1.write = function(chunk) { 286 assert.strictEqual(chunk[0], 'one'); 287 w1.emit('close'); 288 process.nextTick(function() { 289 r.pipe(w2); 290 r.pipe(w3); 291 }); 292 }; 293 w1.end = common.mustNotCall(); 294 295 r.pipe(w1); 296 297 const expected = ['two', 'two', 'three', 'three', 'four', 'four']; 298 299 const w2 = new R(); 300 w2.write = function(chunk) { 301 assert.strictEqual(chunk[0], expected.shift()); 302 assert.strictEqual(counter, 0); 303 304 counter++; 305 306 if (chunk[0] === 'four') { 307 return true; 308 } 309 310 setTimeout(function() { 311 counter--; 312 w2.emit('drain'); 313 }, 10); 314 315 return false; 316 }; 317 w2.end = common.mustCall(); 318 319 const w3 = new R(); 320 w3.write = function(chunk) { 321 assert.strictEqual(chunk[0], expected.shift()); 322 assert.strictEqual(counter, 1); 323 324 counter++; 325 326 if (chunk[0] === 'four') { 327 return true; 328 } 329 330 setTimeout(function() { 331 counter--; 332 w3.emit('drain'); 333 }, 50); 334 335 return false; 336 }; 337 w3.end = common.mustCall(function() { 338 assert.strictEqual(counter, 2); 339 assert.strictEqual(expected.length, 0); 340 }); 341} 342 343{ 344 // Verify read(0) behavior for ended streams 345 const r = new R(); 346 let written = false; 347 let ended = false; 348 r._read = common.mustNotCall(); 349 350 r.push(Buffer.from('foo')); 351 r.push(null); 352 353 const v = r.read(0); 354 355 assert.strictEqual(v, null); 356 357 const w = new R(); 358 w.write = function(buffer) { 359 written = true; 360 assert.strictEqual(ended, false); 361 assert.strictEqual(buffer.toString(), 'foo'); 362 }; 363 364 w.end = common.mustCall(function() { 365 ended = true; 366 assert.strictEqual(written, true); 367 }); 368 369 r.pipe(w); 370} 371 372{ 373 // Verify synchronous _read ending 374 const r = new R(); 375 let called = false; 376 r._read = function(n) { 377 r.push(null); 378 }; 379 380 r.once('end', function() { 381 // Verify that this is called before the next tick 382 called = true; 383 }); 384 385 r.read(); 386 387 process.nextTick(function() { 388 assert.strictEqual(called, true); 389 }); 390} 391 392{ 393 // Verify that adding readable listeners trigger data flow 394 const r = new R({ highWaterMark: 5 }); 395 let onReadable = false; 396 let readCalled = 0; 397 398 r._read = function(n) { 399 if (readCalled++ === 2) 400 r.push(null); 401 else 402 r.push(Buffer.from('asdf')); 403 }; 404 405 r.on('readable', function() { 406 onReadable = true; 407 r.read(); 408 }); 409 410 r.on('end', common.mustCall(function() { 411 assert.strictEqual(readCalled, 3); 412 assert.ok(onReadable); 413 })); 414} 415 416{ 417 // Verify that streams are chainable 418 const r = new R(); 419 r._read = common.mustCall(); 420 const r2 = r.setEncoding('utf8').pause().resume().pause(); 421 assert.strictEqual(r, r2); 422} 423 424{ 425 // Verify readableEncoding property 426 assert(R.prototype.hasOwnProperty('readableEncoding')); 427 428 const r = new R({ encoding: 'utf8' }); 429 assert.strictEqual(r.readableEncoding, 'utf8'); 430} 431 432{ 433 // Verify readableObjectMode property 434 assert(R.prototype.hasOwnProperty('readableObjectMode')); 435 436 const r = new R({ objectMode: true }); 437 assert.strictEqual(r.readableObjectMode, true); 438} 439 440{ 441 // Verify writableObjectMode property 442 assert(W.prototype.hasOwnProperty('writableObjectMode')); 443 444 const w = new W({ objectMode: true }); 445 assert.strictEqual(w.writableObjectMode, true); 446} 447