/* replacement start */

const process = require('process/')

/* replacement end */
// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

// NOTE(review): this string follows the injected `require` above and is
// parenthesized, so it is a plain expression statement rather than a
// directive; it does not switch this module into strict mode.
;('use strict')
const { AbortError, codes } = require('../../ours/errors')
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
const { kEmptyObject, once } = require('../../ours/util')
const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
const { Promise, PromisePrototypeThen } = require('../../ours/primordials')
const {
  isClosed,
  isReadable,
  isReadableNodeStream,
  isReadableStream,
  isReadableFinished,
  isReadableErrored,
  isWritable,
  isWritableNodeStream,
  isWritableStream,
  isWritableFinished,
  isWritableErrored,
  isNodeStream,
  willEmitClose: _willEmitClose,
  kIsClosedPromise
} = require('./utils')

/**
 * Duck-type check for a request-like object: something exposing `setHeader`
 * and an `abort()` method.
 *
 * @param {object} stream - Candidate object.
 * @returns {*} Truthy when both members are present (not coerced to boolean).
 */
function isRequest(stream) {
  return stream.setHeader && typeof stream.abort === 'function'
}

/** Shared no-op, used as the neutralized callback and the web-stream cleanup. */
const nop = () => {}

/**
 * Invokes `callback` at most once when `stream` has ended, finished, errored
 * or closed.
 *
 * May be called as `eos(stream, callback)` or `eos(stream, options, callback)`.
 * Options read by this function:
 *  - `readable`: wait for the readable side (defaults to whether `stream` is a
 *    readable Node stream).
 *  - `writable`: wait for the writable side (defaults to whether `stream` is a
 *    writable Node stream).
 *  - `error`: when exactly `false`, the 'error' event is not listened for.
 *  - `signal`: an AbortSignal; on abort the listeners are removed and the
 *    callback receives an `AbortError` whose `cause` is `signal.reason`.
 *
 * The callback is always invoked with `this` set to `stream` and with an
 * optional error as its only argument.
 *
 * @param {object} stream - A Node stream or a web ReadableStream/WritableStream.
 * @param {object|Function} [options] - Options object, or the callback when
 *   only two arguments are passed.
 * @param {Function} [callback] - Completion callback.
 * @returns {Function} A function that removes the listeners registered on
 *   `stream` and neutralizes the callback (a no-op for web streams).
 * @throws {ERR_INVALID_ARG_TYPE} When `stream` is neither a web stream nor a
 *   Node stream. The validators may also throw for invalid arguments.
 */
function eos(stream, options, callback) {
  // `arguments.length` (not a null check) distinguishes eos(stream, cb) from
  // eos(stream, undefined, cb).
  if (arguments.length === 2) {
    callback = options
    options = kEmptyObject
  } else if (options == null) {
    options = kEmptyObject
  } else {
    validateObject(options, 'options')
  }
  validateFunction(callback, 'callback')
  validateAbortSignal(options.signal, 'options.signal')
  callback = once(callback)
  if (isReadableStream(stream) || isWritableStream(stream)) {
    return eosWeb(stream, options, callback)
  }
  if (!isNodeStream(stream)) {
    throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream)
  }

  // Each option is read exactly once; an explicit null/undefined falls back
  // to detection based on the stream itself.
  const readableOption = options.readable
  const readable =
    readableOption !== null && readableOption !== undefined ? readableOption : isReadableNodeStream(stream)
  const writableOption = options.writable
  const writable =
    writableOption !== null && writableOption !== undefined ? writableOption : isWritableNodeStream(stream)
  const wState = stream._writableState
  const rState = stream._readableState

  // Legacy streams (no `_writableState`): treat 'end'/'close' as 'finish'
  // once the stream no longer reports itself as writable.
  const onlegacyfinish = () => {
    if (!stream.writable) {
      onfinish()
    }
  }

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  let willEmitClose =
    _willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
  let writableFinished = isWritableFinished(stream, false)
  const onfinish = () => {
    writableFinished = true
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }
    // When 'close' is expected to follow, let `onclose` report completion.
    if (willEmitClose && (!stream.readable || readable)) {
      return
    }
    if (!readable || readableFinished) {
      callback.call(stream)
    }
  }
  let readableFinished = isReadableFinished(stream, false)
  const onend = () => {
    readableFinished = true
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }
    // When 'close' is expected to follow, let `onclose` report completion.
    if (willEmitClose && (!stream.writable || writable)) {
      return
    }
    if (!writable || writableFinished) {
      callback.call(stream)
    }
  }
  const onerror = (err) => {
    callback.call(stream, err)
  }
  let closed = isClosed(stream)

  // Final outcome on 'close' (also used for 'abort'/'aborted'): a stored
  // error wins, then a premature-close error if a side being waited on has
  // not finished, otherwise success.
  const onclose = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
    // A non-boolean truthy value is passed to the callback as the error.
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }
    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
      if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }
    callback.call(stream)
  }

  // Same as `onclose` but without the premature-close checks; scheduled
  // below for streams that were already in a terminal state when eos() was
  // called.
  const onclosed = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }
    callback.call(stream)
  }
  const onrequest = () => {
    stream.req.on('finish', onfinish)
  }
  if (isRequest(stream)) {
    stream.on('complete', onfinish)
    if (!willEmitClose) {
      stream.on('abort', onclose)
    }
    if (stream.req) {
      onrequest()
    } else {
      stream.on('request', onrequest)
    }
  } else if (writable && !wState) {
    // legacy streams
    stream.on('end', onlegacyfinish)
    stream.on('close', onlegacyfinish)
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose)
  }
  stream.on('end', onend)
  stream.on('finish', onfinish)
  if (options.error !== false) {
    stream.on('error', onerror)
  }
  stream.on('close', onclose)

  // The stream may already be in a terminal state, in which case none of the
  // events above will fire. Report on the next tick so the callback is never
  // invoked synchronously from within eos().
  if (closed) {
    process.nextTick(onclose)
  } else if (
    (wState !== null && wState !== undefined && wState.errorEmitted) ||
    (rState !== null && rState !== undefined && rState.errorEmitted)
  ) {
    if (!willEmitClose) {
      process.nextTick(onclosed)
    }
  } else if (
    !readable &&
    (!willEmitClose || isReadable(stream)) &&
    (writableFinished || isWritable(stream) === false)
  ) {
    process.nextTick(onclosed)
  } else if (
    !writable &&
    (!willEmitClose || isWritable(stream)) &&
    (readableFinished || isReadable(stream) === false)
  ) {
    process.nextTick(onclosed)
  } else if (rState && stream.req && stream.aborted) {
    process.nextTick(onclosed)
  }

  // Removes every listener this function may have registered and swaps the
  // callback for a no-op so late events are ignored.
  const cleanup = () => {
    callback = nop
    stream.removeListener('aborted', onclose)
    stream.removeListener('complete', onfinish)
    stream.removeListener('abort', onclose)
    stream.removeListener('request', onrequest)
    if (stream.req) stream.req.removeListener('finish', onfinish)
    stream.removeListener('end', onlegacyfinish)
    stream.removeListener('close', onlegacyfinish)
    stream.removeListener('finish', onfinish)
    stream.removeListener('end', onend)
    stream.removeListener('error', onerror)
    stream.removeListener('close', onclose)
  }
  if (options.signal && !closed) {
    const abort = () => {
      // Keep it because cleanup removes it.
      const endCallback = callback
      cleanup()
      endCallback.call(
        stream,
        new AbortError(undefined, {
          cause: options.signal.reason
        })
      )
    }
    if (options.signal.aborted) {
      process.nextTick(abort)
    } else {
      // Wrap the callback so the abort listener is detached as soon as the
      // stream completes by any other path.
      const originalCallback = callback
      callback = once((...args) => {
        options.signal.removeEventListener('abort', abort)
        originalCallback.apply(stream, args)
      })
      options.signal.addEventListener('abort', abort)
    }
  }
  return cleanup
}

/**
 * Web-stream variant of `eos`: waits on the promise stored under
 * `stream[kIsClosedPromise]` and forwards its settlement value to `callback`
 * on a later tick, unless `options.signal` aborted first.
 *
 * @param {object} stream - A web ReadableStream or WritableStream.
 * @param {object} options - Options object; only `signal` is read here.
 * @param {Function} callback - Completion callback (already wrapped in `once`
 *   by `eos`).
 * @returns {Function} A no-op cleanup function.
 */
function eosWeb(stream, options, callback) {
  let isAborted = false
  let abort = nop
  if (options.signal) {
    abort = () => {
      isAborted = true
      callback.call(
        stream,
        new AbortError(undefined, {
          cause: options.signal.reason
        })
      )
    }
    if (options.signal.aborted) {
      process.nextTick(abort)
    } else {
      // Detach the abort listener once the callback has been delivered.
      const originalCallback = callback
      callback = once((...args) => {
        options.signal.removeEventListener('abort', abort)
        originalCallback.apply(stream, args)
      })
      options.signal.addEventListener('abort', abort)
    }
  }
  // Used for both fulfilment and rejection: the settlement value (if any) is
  // passed through as the callback's argument.
  const resolverFn = (...args) => {
    if (!isAborted) {
      process.nextTick(() => callback.apply(stream, args))
    }
  }
  PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn)
  return nop
}

/**
 * Promise-returning wrapper around `eos`.
 *
 * @param {object} stream - Stream to observe.
 * @param {object|null} [opts] - Options forwarded to `eos`. A truthy
 *   `cleanup` must be a boolean (checked with `validateBoolean`) and makes
 *   the listeners get removed when the callback fires.
 * @returns {Promise<void>} Resolves when `eos` reports completion without an
 *   error; rejects with the reported error otherwise.
 */
function finished(stream, opts) {
  let autoCleanup = false
  if (opts === null) {
    opts = kEmptyObject
  }
  if (opts !== null && opts !== undefined && opts.cleanup) {
    validateBoolean(opts.cleanup, 'cleanup')
    autoCleanup = opts.cleanup
  }
  return new Promise((resolve, reject) => {
    const cleanup = eos(stream, opts, (err) => {
      if (autoCleanup) {
        cleanup()
      }
      if (err) {
        reject(err)
      } else {
        resolve()
      }
    })
  })
}
module.exports = eos
module.exports.finished = finished