'use strict';

const {
  ObjectSetPrototypeOf,
} = primordials;

const EE = require('events');

/**
 * Legacy stream base constructor.
 *
 * Runs the EventEmitter constructor against `this`, forwarding `opts`
 * untouched, so every Stream instance is also an event emitter.
 *
 * @param {object} [opts] Passed straight through to the EventEmitter
 *   constructor.
 */
function Stream(opts) {
  EE.call(this, opts);
}
// Instances inherit from EE.prototype; the constructor itself inherits from
// EE so properties defined on EE are reachable through Stream as well.
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

/**
 * Forwards every 'data' chunk emitted by this stream to `dest`.
 *
 * Flow control: when `dest.write()` returns exactly `false` the source is
 * paused (if it has a `pause` method) and resumed again on dest's 'drain'
 * (if it is still `readable` and has a `resume` method).
 *
 * Termination: unless `dest._isStdio` is truthy or `options.end` is exactly
 * `false`, the first of source 'end' / source 'close' finishes `dest` -
 * 'end' calls `dest.end()`, 'close' calls `dest.destroy()` when that is a
 * function. Only one of the two ever acts.
 *
 * Errors: an 'error' on either side tears the pipe down; if the emitting
 * stream then has no 'error' listeners left, the error is rethrown.
 *
 * @param {object} dest Destination; must provide `on`, `removeListener`,
 *   `emit` and `write`.
 * @param {object} [options]
 * @param {boolean} [options.end] Pass `false` to leave `dest` open when the
 *   source finishes.
 * @returns {object} `dest`, so calls can be chained.
 */
Stream.prototype.pipe = function(dest, options) {
  const source = this;

  // Set by whichever of onend/onclose fires first; makes the other a no-op.
  let finished = false;

  // Runs `action` at most once across both termination handlers. The flag
  // is raised before `action` runs, so a throwing action still counts.
  function finishOnce(action) {
    if (finished) return;
    finished = true;

    action();
  }

  // Hand the chunk to dest; pause the source if dest reports it is full.
  function ondata(chunk) {
    if (!dest.writable) return;

    const accepted = dest.write(chunk);
    if (accepted === false && source.pause) {
      source.pause();
    }
  }

  // dest has room again: restart the source if it can still produce data.
  function ondrain() {
    if (!source.readable || !source.resume) return;

    source.resume();
  }

  function onend() {
    finishOnce(() => {
      dest.end();
    });
  }

  function onclose() {
    finishOnce(() => {
      if (typeof dest.destroy === 'function') dest.destroy();
    });
  }

  // Don't leave dangling pipes when there are errors. `this` is whichever
  // stream emitted the error, hence a regular function rather than an arrow.
  function onerror(er) {
    cleanup();

    const unhandled = EE.listenerCount(this, 'error') === 0;
    if (unhandled) {
      throw er; // Unhandled stream error in pipe.
    }
  }

  // Detach every listener this call attached, including cleanup itself.
  function cleanup() {
    // Data flow.
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    // Termination.
    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    // Error propagation.
    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    // The triggers for cleanup itself.
    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  // Wiring. The registration order below determines the order in which
  // listeners for the same event run.
  source.on('data', ondata);
  dest.on('drain', ondrain);

  // dest is finished automatically when the source ends or closes, unless
  // it is flagged `_isStdio` or the caller passed `{ end: false }`.
  const keepDestOpen = dest._isStdio || (options && options.end === false);
  if (!keepDestOpen) {
    source.on('end', onend);
    source.on('close', onclose);
  }

  source.on('error', onerror);
  dest.on('error', onerror);

  source.on('end', cleanup);
  source.on('close', cleanup);

  dest.on('close', cleanup);
  dest.emit('pipe', source);

  // Allow for unix-like usage: A.pipe(B).pipe(C)
  return dest;
};

module.exports = Stream;