var stream = require('readable-stream')
var eos = require('end-of-stream')
var inherits = require('inherits')
var shift = require('stream-shift')

// Sentinel chunk. `end()` writes this exact Buffer instance and `_write`
// recognises it by identity (===) to trigger `_finish` instead of forwarding
// it to the wrapped writable.
// NOTE(review): the `Buffer.from !== Uint8Array.from` check presumably guards
// against runtimes where Buffer.from is only the inherited Uint8Array.from;
// confirm before simplifying.
var SIGNAL_FLUSH = (Buffer.from && Buffer.from !== Uint8Array.from)
  ? Buffer.from([0])
  : new Buffer([0])

// Runs `fn` immediately when `self` is not corked, otherwise once on the next
// 'uncork' event.
var onuncork = function(self, fn) {
  if (self._corked) self.once('uncork', fn)
  else fn()
}

// Destroys `self` with `err`, but only when auto-destroy has not been
// disabled through the constructor options.
var autoDestroy = function (self, err) {
  if (self._autoDestroy) self.destroy(err)
}

// Builds the end-of-stream callback for a wrapped stream.
//   self - the Duplexify instance
//   end  - when truthy, a clean finish of the wrapped stream also ends `self`
// On error the duplex is auto-destroyed; an error whose message is
// 'premature close' is replaced by null, so the destroy happens without an
// error being reported.
var destroyer = function(self, end) {
  return function(err) {
    if (err) autoDestroy(self, err.message === 'premature close' ? null : err)
    else if (end && !self._ended) self.end()
  }
}

// Ends the writable `ws` (if any) and then calls `fn`.
// Streams without a `_writableState` get a bare `end()` followed by a
// synchronous `fn()`; streams that already finished are not ended again.
var end = function(ws, fn) {
  if (!ws) return fn()
  if (ws._writableState && ws._writableState.finished) return fn()
  if (ws._writableState) return ws.end(fn)
  ws.end()
  fn()
}

// Wraps a stream that has no `_readableState` in an object-mode
// stream.Readable (highWaterMark 16) via `Readable#wrap`.
var toStreams2 = function(rs) {
  return new (stream.Readable)({objectMode:true, highWaterMark:16}).wrap(rs)
}

// Duplex stream that proxies to a separately supplied writable and readable,
// either of which may be set (or replaced) later with setWritable/setReadable.
//   writable - optional stream that receives everything written to the duplex
//   readable - optional stream whose data is pushed out of the duplex
//   opts     - passed to stream.Duplex; additionally understood here:
//              autoDestroy: false -> do not destroy the duplex on stream errors
//              destroy: false     -> do not forward destroy() to the streams
//              end: false         -> do not forward end() to the writable
// Callable with or without `new`.
var Duplexify = function(writable, readable, opts) {
  if (!(this instanceof Duplexify)) return new Duplexify(writable, readable, opts)
  stream.Duplex.call(this, opts)

  this._writable = null
  this._readable = null
  this._readable2 = null // `_readable` itself, or its toStreams2() wrapper

  this._autoDestroy = !opts || opts.autoDestroy !== false
  this._forwardDestroy = !opts || opts.destroy !== false
  this._forwardEnd = !opts || opts.end !== false
  this._corked = 1 // start corked
  this._ondrain = null // pending _write callback waiting for 'drain'
  this._drained = false // true while the consumer accepts more pushed data
  this._forwarding = false // re-entrancy guard for _forward
  this._unwrite = null // detaches the current writable's listeners
  this._unread = null // detaches the current readable's listeners
  this._ended = false

  this.destroyed = false

  if (writable) this.setWritable(writable)
  if (readable) this.setReadable(readable)
}

inherits(Duplexify, stream.Duplex)

// Convenience factory for an object-mode Duplexify (objectMode: true,
// highWaterMark: 16; both always override whatever `opts` carries).
// The options are copied first so the caller's `opts` object is never
// modified.
Duplexify.obj = function(writable, readable, opts) {
  var objOpts = {}
  if (opts) {
    // Plain for-in keeps every enumerable option, inherited ones included,
    // visible to the constructor exactly as it would see them on `opts`.
    for (var key in opts) objOpts[key] = opts[key]
  }
  objOpts.objectMode = true
  objOpts.highWaterMark = 16
  return new Duplexify(writable, readable, objOpts)
}

// Increments the cork counter; emits 'cork' when the counter reaches 1.
Duplexify.prototype.cork = function() {
  if (++this._corked === 1) this.emit('cork')
}

// Decrements the cork counter (never below 0); emits 'uncork' when it
// reaches 0, which releases callbacks queued through onuncork().
Duplexify.prototype.uncork = function() {
  if (this._corked && --this._corked === 0) this.emit('uncork')
}

// Sets or replaces the stream that receives written data.
//   writable - the target stream; null or false ends the writable side.
// If the duplex is already destroyed the given stream is destroyed instead.
Duplexify.prototype.setWritable = function(writable) {
  // Detach listeners from the previously set writable, if any.
  if (this._unwrite) this._unwrite()

  if (this.destroyed) {
    if (writable && writable.destroy) writable.destroy()
    return
  }

  if (writable === null || writable === false) {
    this.end()
    return
  }

  var self = this
  var unend = eos(writable, {writable:true, readable:false}, destroyer(this, this._forwardEnd))

  // Releases the _write callback that was parked while the target was full.
  var ondrain = function() {
    var ondrain = self._ondrain
    self._ondrain = null
    if (ondrain) ondrain()
  }

  // Runs at the start of the next setWritable call, while `self._writable`
  // still refers to this stream.
  var clear = function() {
    self._writable.removeListener('drain', ondrain)
    unend()
  }

  if (this._unwrite) process.nextTick(ondrain) // force a drain on stream reset to avoid livelocks

  this._writable = writable
  this._writable.on('drain', ondrain)
  this._unwrite = clear

  this.uncork() // always uncork setWritable
}

// Sets or replaces the stream whose data is exposed on the readable side.
//   readable - the source stream; null or false ends the readable side.
// If the duplex is already destroyed the given stream is destroyed instead.
Duplexify.prototype.setReadable = function(readable) {
  // Detach listeners from the previously set readable, if any.
  if (this._unread) this._unread()

  if (this.destroyed) {
    if (readable && readable.destroy) readable.destroy()
    return
  }

  if (readable === null || readable === false) {
    this.push(null)
    this.resume()
    return
  }

  var self = this
  var unend = eos(readable, {writable:false, readable:true}, destroyer(this))

  var onreadable = function() {
    self._forward()
  }

  var onend = function() {
    self.push(null)
  }

  // Runs at the start of the next setReadable call, while `self._readable2`
  // still refers to this stream (or its wrapper).
  var clear = function() {
    self._readable2.removeListener('readable', onreadable)
    self._readable2.removeListener('end', onend)
    unend()
  }

  this._drained = true
  this._readable = readable
  // Sources without a `_readableState` are wrapped so they can be read from
  // with stream-shift in _forward.
  this._readable2 = readable._readableState ? readable : toStreams2(readable)
  this._readable2.on('readable', onreadable)
  this._readable2.on('end', onend)
  this._unread = clear

  this._forward()
}

// Readable implementation hook: the consumer wants data, so mark the duplex
// as drained and pull from the source.
Duplexify.prototype._read = function() {
  this._drained = true
  this._forward()
}

// Moves data from the source into the duplex until push() reports
// back-pressure or the source has nothing buffered. Does nothing when already
// running, when no readable is set, or when the consumer is not drained.
Duplexify.prototype._forward = function() {
  if (this._forwarding || !this._readable2 || !this._drained) return
  this._forwarding = true

  var data

  while (this._drained && (data = shift(this._readable2)) !== null) {
    // After destroy the source is still shifted, but the data is discarded.
    if (this.destroyed) continue
    this._drained = this.push(data)
  }

  this._forwarding = false
}

// Marks the duplex as destroyed right away and runs _destroy on the next
// tick. Repeated calls are ignored.
//   err - optional error to report
Duplexify.prototype.destroy = function(err) {
  if (this.destroyed) return
  this.destroyed = true

  var self = this
  process.nextTick(function() {
    self._destroy(err)
  })
}

// Performs the actual teardown: reports `err` (to the parked write callback
// if there is one, otherwise as an 'error' event), destroys the wrapped
// streams unless that was disabled with `destroy: false`, then emits 'close'.
Duplexify.prototype._destroy = function(err) {
  if (err) {
    var ondrain = this._ondrain
    this._ondrain = null
    if (ondrain) ondrain(err)
    else this.emit('error', err)
  }

  if (this._forwardDestroy) {
    if (this._readable && this._readable.destroy) this._readable.destroy()
    if (this._writable && this._writable.destroy) this._writable.destroy()
  }

  this.emit('close')
}

// Writable implementation hook.
//   data - chunk to forward, or SIGNAL_FLUSH to finish the writable side
//   enc  - encoding supplied by the stream machinery (not passed to the target)
//   cb   - called once the chunk has been accepted
Duplexify.prototype._write = function(data, enc, cb) {
  if (this.destroyed) return cb()
  // While corked, retry this exact write once the duplex is uncorked.
  if (this._corked) return onuncork(this, this._write.bind(this, data, enc, cb))
  if (data === SIGNAL_FLUSH) return this._finish(cb)
  // No target set: the chunk is acknowledged without being forwarded.
  if (!this._writable) return cb()

  // Target is full: park the callback until its 'drain' event.
  if (this._writable.write(data) === false) this._ondrain = cb
  else cb()
}


// Finishes the writable side: emits 'preend', ends the wrapped writable
// (unless `end: false` was given), emits 'prefinish' and finally calls `cb`.
// Each step waits for the duplex to be uncorked.
Duplexify.prototype._finish = function(cb) {
  var self = this
  this.emit('preend')
  onuncork(this, function() {
    end(self._forwardEnd && self._writable, function() {
      // haxx to not emit prefinish twice
      if (self._writableState.prefinished === false) self._writableState.prefinished = true
      self.emit('prefinish')
      onuncork(self, cb)
    })
  })
}

// Ends the writable side of the duplex.
//   data - optional final chunk; any value other than null/undefined is
//          written, so falsy chunks such as 0, false or '' are not dropped
//   enc  - optional encoding for a string `data`, passed on to write()
//   cb   - optional callback handed to stream.Writable.prototype.end
// `data` and/or `enc` may be omitted, with the callback in their place.
// Returns whatever stream.Writable.prototype.end returns.
Duplexify.prototype.end = function(data, enc, cb) {
  if (typeof data === 'function') return this.end(null, null, data)
  if (typeof enc === 'function') return this.end(data, null, enc)
  this._ended = true
  if (data !== null && data !== undefined) this.write(data, enc)
  // Queue the flush sentinel after all pending writes, unless end() has
  // already been called on the underlying writable state.
  if (!this._writableState.ending) this.write(SIGNAL_FLUSH)
  return stream.Writable.prototype.end.call(this, cb)
}

module.exports = Duplexify