// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const {
  ERR_INVALID_ARG_TYPE,
  ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;
const { once } = require('internal/util');

// Duck-type check: truthy when `stream` has a `setHeader` member and an
// `abort()` method. The result is only used in boolean context.
function isRequest(stream) {
  return stream.setHeader && typeof stream.abort === 'function';
}

// True when `stream` exposes a boolean `readable` or `readableEnded`
// property, or carries a `_readableState`.
function isReadable(stream) {
  return typeof stream.readable === 'boolean' ||
    typeof stream.readableEnded === 'boolean' ||
    !!stream._readableState;
}

// True when `stream` exposes a boolean `writable` or `writableEnded`
// property, or carries a `_writableState`.
function isWritable(stream) {
  return typeof stream.writable === 'boolean' ||
    typeof stream.writableEnded === 'boolean' ||
    !!stream._writableState;
}

// Truthy when the writable side has finished: either `writableFinished` is
// set, or the (non-errored) writable state is finished or has ended with
// nothing left buffered.
function isWritableFinished(stream) {
  if (stream.writableFinished) return true;
  const wState = stream._writableState;
  if (!wState || wState.errored) return false;
  return wState.finished || (wState.ended && wState.length === 0);
}

// No-op used to neutralize the callback once the caller has cleaned up.
function nop() {}

// Truthy when the readable side has ended: either `readableEnded` is set,
// or the (non-errored) readable state has emitted 'end' or has ended with
// nothing left buffered.
function isReadableEnded(stream) {
  if (stream.readableEnded) return true;
  const rState = stream._readableState;
  if (!rState || rState.errored) return false;
  return rState.endEmitted || (rState.ended && rState.length === 0);
}

/**
 * Invokes `callback` when `stream` is done: ended, finished, closed,
 * aborted or errored. The callback is invoked with `this` set to `stream`
 * and receives an error as its first argument on 'error' or on a premature
 * close.
 *
 * @param {object} stream The stream (or request-like object) to watch.
 * @param {object} [options] May be omitted, in which case the second
 *   argument is taken as `callback`.
 * @param {boolean} [options.readable] Truthy forces waiting for the
 *   readable side, `false` disables it; otherwise detected from `stream`.
 * @param {boolean} [options.writable] Truthy forces waiting for the
 *   writable side, `false` disables it; otherwise detected from `stream`.
 * @param {boolean} [options.error] When exactly `false`, no 'error'
 *   listener is attached.
 * @param {Function} callback Completion callback.
 * @returns {Function} Cleanup function that removes every listener added
 *   here and replaces the pending callback with a no-op.
 * @throws {ERR_INVALID_ARG_TYPE} If `options` is neither nullish nor an
 *   object, or if `callback` is not a function.
 */
function eos(stream, options, callback) {
  if (arguments.length === 2) {
    callback = options;
    options = {};
  } else if (options == null) {
    options = {};
  } else if (typeof options !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
  }
  if (typeof callback !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('callback', 'function', callback);
  }

  // Several of the events below can fire for the same stream; `once` is
  // presumably what keeps the user callback from running more than one time.
  callback = once(callback);

  const readable = options.readable ||
    (options.readable !== false && isReadable(stream));
  const writable = options.writable ||
    (options.writable !== false && isWritable(stream));

  const wState = stream._writableState;
  const rState = stream._readableState;
  const state = wState || rState;

  // Legacy streams have no 'finish' event; treat 'end'/'close' as finish
  // once the stream is no longer writable.
  const onlegacyfinish = () => {
    if (!stream.writable) onfinish();
  };

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  let willEmitClose = (
    state &&
    state.autoDestroy &&
    state.emitClose &&
    state.closed === false &&
    isReadable(stream) === readable &&
    isWritable(stream) === writable
  );

  let writableFinished = stream.writableFinished ||
    (wState && wState.finished);
  const onfinish = () => {
    writableFinished = true;
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) willEmitClose = false;

    // A 'close' event is expected; leave completion to the 'close' handler.
    if (willEmitClose && (!stream.readable || readable)) return;
    if (!readable || readableEnded) callback.call(stream);
  };

  let readableEnded = stream.readableEnded ||
    (rState && rState.endEmitted);
  const onend = () => {
    readableEnded = true;
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) willEmitClose = false;

    // A 'close' event is expected; leave completion to the 'close' handler.
    if (willEmitClose && (!stream.writable || writable)) return;
    if (!writable || writableFinished) callback.call(stream);
  };

  const onerror = (err) => {
    callback.call(stream, err);
  };

  // Reports a premature close when a side we are waiting for has neither
  // signalled completion through its event nor according to its state.
  const onclose = () => {
    if (readable && !readableEnded) {
      if (!isReadableEnded(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    callback.call(stream);
  };

  const onrequest = () => {
    stream.req.on('finish', onfinish);
  };

  if (isRequest(stream)) {
    stream.on('complete', onfinish);
    if (!willEmitClose) {
      stream.on('abort', onclose);
    }
    if (stream.req) onrequest();
    else stream.on('request', onrequest);
  } else if (writable && !wState) { // legacy streams
    stream.on('end', onlegacyfinish);
    stream.on('close', onlegacyfinish);
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose);
  }

  stream.on('end', onend);
  stream.on('finish', onfinish);
  if (options.error !== false) stream.on('error', onerror);
  stream.on('close', onclose);

  // The stream already reached a terminal state before we attached any
  // listeners, so none of the events above can be relied upon to fire.
  const closed = (
    (wState && wState.closed) ||
    (rState && rState.closed) ||
    (wState && wState.errorEmitted) ||
    (rState && rState.errorEmitted) ||
    (rState && stream.req && stream.aborted) ||
    (
      (!writable || (wState && wState.finished)) &&
      (!readable || (rState && rState.endEmitted))
    )
  );

  if (closed) {
    // TODO(ronag): Re-throw error if errorEmitted?
    // TODO(ronag): Throw premature close as if finished was called?
    // before being closed? i.e. if closed but not errored, ended or finished.
    // TODO(ronag): Throw some kind of error? Does it make sense
    // to call finished() on a "finished" stream?
    // TODO(ronag): willEmitClose?
    // Keep the closure: `callback` must be read when the tick runs so that
    // a cleanup call made in the meantime (which swaps in `nop`) is honored.
    // Use the same `this` (the stream) as every other completion path.
    process.nextTick(() => {
      callback.call(stream);
    });
  }

  return function() {
    callback = nop;
    stream.removeListener('aborted', onclose);
    stream.removeListener('complete', onfinish);
    stream.removeListener('abort', onclose);
    stream.removeListener('request', onrequest);
    if (stream.req) stream.req.removeListener('finish', onfinish);
    stream.removeListener('end', onlegacyfinish);
    stream.removeListener('close', onlegacyfinish);
    stream.removeListener('finish', onfinish);
    stream.removeListener('end', onend);
    stream.removeListener('error', onerror);
    stream.removeListener('close', onclose);
  };
}

module.exports = eos;