1"use strict"; 2 3var stream = require("stream"); 4 5function DuplexWrapper(options, writable, readable) { 6 if (typeof readable === "undefined") { 7 readable = writable; 8 writable = options; 9 options = null; 10 } 11 12 stream.Duplex.call(this, options); 13 14 if (typeof readable.read !== "function") { 15 readable = (new stream.Readable(options)).wrap(readable); 16 } 17 18 this._writable = writable; 19 this._readable = readable; 20 this._waiting = false; 21 22 var self = this; 23 24 writable.once("finish", function() { 25 self.end(); 26 }); 27 28 this.once("finish", function() { 29 writable.end(); 30 }); 31 32 readable.on("readable", function() { 33 if (self._waiting) { 34 self._waiting = false; 35 self._read(); 36 } 37 }); 38 39 readable.once("end", function() { 40 self.push(null); 41 }); 42 43 if (!options || typeof options.bubbleErrors === "undefined" || options.bubbleErrors) { 44 writable.on("error", function(err) { 45 self.emit("error", err); 46 }); 47 48 readable.on("error", function(err) { 49 self.emit("error", err); 50 }); 51 } 52} 53 54DuplexWrapper.prototype = Object.create(stream.Duplex.prototype, {constructor: {value: DuplexWrapper}}); 55 56DuplexWrapper.prototype._write = function _write(input, encoding, done) { 57 this._writable.write(input, encoding, done); 58}; 59 60DuplexWrapper.prototype._read = function _read() { 61 var buf; 62 var reads = 0; 63 while ((buf = this._readable.read()) !== null) { 64 this.push(buf); 65 reads++; 66 } 67 if (reads === 0) { 68 this._waiting = true; 69 } 70}; 71 72module.exports = function duplex2(options, writable, readable) { 73 return new DuplexWrapper(options, writable, readable); 74}; 75 76module.exports.DuplexWrapper = DuplexWrapper; 77