1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23require('../common'); 24const assert = require('assert'); 25const { Readable, Writable } = require('stream'); 26 27const EE = require('events').EventEmitter; 28 29 30// A mock thing a bit like the net.Socket/tcp_wrap.handle interaction 31 32const stream = new Readable({ 33 highWaterMark: 16, 34 encoding: 'utf8' 35}); 36 37const source = new EE(); 38 39stream._read = function() { 40 console.error('stream._read'); 41 readStart(); 42}; 43 44let ended = false; 45stream.on('end', function() { 46 ended = true; 47}); 48 49source.on('data', function(chunk) { 50 const ret = stream.push(chunk); 51 console.error('data', stream.readableLength); 52 if (!ret) 53 readStop(); 54}); 55 56source.on('end', function() { 57 stream.push(null); 58}); 59 60let reading = false; 61 62function readStart() { 63 console.error('readStart'); 64 reading = true; 65} 66 67function readStop() { 68 console.error('readStop'); 69 reading = false; 70 process.nextTick(function() { 71 const r = stream.read(); 72 if (r !== null) 73 writer.write(r); 74 }); 75} 76 77const writer = new Writable({ 78 decodeStrings: false 79}); 80 81const written = []; 82 83const expectWritten = 84 [ 'asdfgasdfgasdfgasdfg', 85 'asdfgasdfgasdfgasdfg', 86 'asdfgasdfgasdfgasdfg', 87 'asdfgasdfgasdfgasdfg', 88 'asdfgasdfgasdfgasdfg', 89 'asdfgasdfgasdfgasdfg' ]; 90 91writer._write = function(chunk, encoding, cb) { 92 console.error(`WRITE ${chunk}`); 93 written.push(chunk); 94 process.nextTick(cb); 95}; 96 97writer.on('finish', finish); 98 99 100// Now emit some chunks. 101 102const chunk = 'asdfg'; 103 104let set = 0; 105readStart(); 106data(); 107function data() { 108 assert(reading); 109 source.emit('data', chunk); 110 assert(reading); 111 source.emit('data', chunk); 112 assert(reading); 113 source.emit('data', chunk); 114 assert(reading); 115 source.emit('data', chunk); 116 assert(!reading); 117 if (set++ < 5) 118 setTimeout(data, 10); 119 else 120 end(); 121} 122 123function finish() { 124 console.error('finish'); 125 assert.deepStrictEqual(written, expectWritten); 126 console.log('ok'); 127} 128 129function end() { 130 source.emit('end'); 131 assert(!reading); 132 writer.end(stream.read()); 133 setImmediate(function() { 134 assert(ended); 135 }); 136} 137