'use strict';

const {
  ArrayIsArray,
  ObjectSetPrototypeOf,
} = primordials;

const EE = require('events');

/**
 * Stream constructor: initializes the instance as an event emitter.
 * The only behavior added on top of the emitter is `pipe()`.
 *
 * @param {object} [opts] Passed through unchanged to the `events` constructor.
 */
function Stream(opts) {
  EE.call(this, opts);
}
// Chain both the prototype and the constructor itself to the event emitter.
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

/**
 * Forwards every 'data' chunk emitted by this stream to `dest.write()`,
 * pausing this stream when `dest.write()` returns `false` and resuming it
 * when `dest` emits 'drain'.
 *
 * Unless `dest._isStdio` is truthy or `options.end === false`, `dest.end()`
 * is called when this stream emits 'end', or `dest.destroy()` (if present)
 * when it emits 'close' first. Only one of the two ever runs.
 *
 * All listeners installed here are removed again when this stream emits
 * 'end' or 'close', when `dest` emits 'close', or when either side emits
 * 'error'.
 *
 * @param {object} dest Destination; must provide `on`, `emit`,
 *   `removeListener`, `write` and `end`.
 * @param {object} [options]
 * @param {boolean} [options.end] Pass `false` to leave `dest` open.
 * @returns {object} `dest`, so calls can be chained: A.pipe(B).pipe(C).
 */
Stream.prototype.pipe = function(dest, options) {
  const source = this;

  // The inner functions are declarations (hoisted), so they are grouped here;
  // the statements that register them follow in their original order.

  // Hand a chunk to dest and pause the source if dest signals back-pressure.
  function ondata(chunk) {
    if (!dest.writable) return;
    if (dest.write(chunk) !== false) return;
    if (source.pause) {
      source.pause();
    }
  }

  // dest has drained: let the source flow again if it is still readable.
  function ondrain() {
    if (!source.readable) return;
    if (source.resume) {
      source.resume();
    }
  }

  // Returns true for the first caller only, so that dest is finished once.
  function claimFinish() {
    if (finishHandled) return false;
    finishHandled = true;
    return true;
  }

  function onend() {
    if (claimFinish()) {
      dest.end();
    }
  }

  function onclose() {
    if (!claimFinish()) return;
    if (typeof dest.destroy === 'function') {
      dest.destroy();
    }
  }

  // Don't leave dangling pipes when there are errors. `this` is whichever
  // emitter the listener was invoked on, so this must stay a plain function.
  function onerror(er) {
    cleanup();
    // With our own listener gone, re-emit if nobody else handles 'error'.
    if (EE.listenerCount(this, 'error') !== 0) return;
    this.emit('error', er);
  }

  // Remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  source.on('data', ondata);
  dest.on('drain', ondrain);

  // If the 'end' option is not supplied, dest.end() will be called when
  // source gets the 'end' or 'close' events. Only dest.end() once.
  const leaveDestOpen =
    dest._isStdio || (options && options.end === false);
  if (!leaveDestOpen) {
    source.on('end', onend);
    source.on('close', onclose);
  }

  let finishHandled = false;

  // Our error handler has to run ahead of any userland 'error' listeners.
  prependListener(source, 'error', onerror);
  prependListener(dest, 'error', onerror);

  source.on('end', cleanup);
  source.on('close', cleanup);

  dest.on('close', cleanup);
  dest.emit('pipe', source);

  // Allow for unix-like usage: A.pipe(B).pipe(C)
  return dest;
};

/**
 * Registers `fn` so that it is called before the listeners already attached
 * to `event` on `emitter`.
 *
 * @param {object} emitter Event emitter (possibly a third-party one).
 * @param {string} event Event name.
 * @param {Function} fn Listener to put first.
 * @returns {*} Whatever `emitter.prependListener()` returns when the emitter
 *   has that method; otherwise `undefined`.
 */
function prependListener(emitter, event, fn) {
  // Sadly this is not cacheable as some libraries bundle their own
  // event emitter implementation with them.
  if (typeof emitter.prependListener === 'function') {
    return emitter.prependListener(event, fn);
  }

  // This is a hack to make sure that our error handler is attached before any
  // userland ones. NEVER DO THIS. This is here only because this code needs
  // to continue to work with older versions of Node.js that do not include
  // the prependListener() method. The goal is to eventually remove this hack.
  const registry = emitter._events;
  const current = registry ? registry[event] : undefined;

  if (!current) {
    // Nothing registered yet: a plain `on()` already puts us first.
    emitter.on(event, fn);
  } else if (ArrayIsArray(current)) {
    current.unshift(fn);
  } else {
    // A single listener is stored bare; turn it into a list with ours first.
    registry[event] = [fn, current];
  }
}

module.exports = { Stream, prependListener };