1// Ported from https://github.com/mafintosh/end-of-stream with 2// permission from the author, Mathias Buus (@mafintosh). 3 4'use strict'; 5 6const { 7 ERR_INVALID_ARG_TYPE, 8 ERR_STREAM_PREMATURE_CLOSE 9} = require('internal/errors').codes; 10const { once } = require('internal/util'); 11 12function isRequest(stream) { 13 return stream.setHeader && typeof stream.abort === 'function'; 14} 15 16function isReadable(stream) { 17 return typeof stream.readable === 'boolean' || 18 typeof stream.readableEnded === 'boolean' || 19 !!stream._readableState; 20} 21 22function isWritable(stream) { 23 return typeof stream.writable === 'boolean' || 24 typeof stream.writableEnded === 'boolean' || 25 !!stream._writableState; 26} 27 28function eos(stream, opts, callback) { 29 if (arguments.length === 2) { 30 callback = opts; 31 opts = {}; 32 } else if (opts == null) { 33 opts = {}; 34 } else if (typeof opts !== 'object') { 35 throw new ERR_INVALID_ARG_TYPE('opts', 'object', opts); 36 } 37 if (typeof callback !== 'function') { 38 throw new ERR_INVALID_ARG_TYPE('callback', 'function', callback); 39 } 40 41 callback = once(callback); 42 43 let readable = opts.readable || 44 (opts.readable !== false && isReadable(stream)); 45 let writable = opts.writable || 46 (opts.writable !== false && isWritable(stream)); 47 48 const onlegacyfinish = () => { 49 if (!stream.writable) onfinish(); 50 }; 51 52 let writableEnded = stream._writableState && stream._writableState.finished; 53 const onfinish = () => { 54 writable = false; 55 writableEnded = true; 56 if (!readable) callback.call(stream); 57 }; 58 59 let readableEnded = stream.readableEnded || 60 (stream._readableState && stream._readableState.endEmitted); 61 const onend = () => { 62 readable = false; 63 readableEnded = true; 64 if (!writable) callback.call(stream); 65 }; 66 67 const onerror = (err) => { 68 callback.call(stream, err); 69 }; 70 71 const onclose = () => { 72 let err; 73 if (readable && !readableEnded) { 74 if (!stream._readableState || !stream._readableState.ended) 75 err = new ERR_STREAM_PREMATURE_CLOSE(); 76 return callback.call(stream, err); 77 } 78 if (writable && !writableEnded) { 79 if (!stream._writableState || !stream._writableState.ended) 80 err = new ERR_STREAM_PREMATURE_CLOSE(); 81 return callback.call(stream, err); 82 } 83 }; 84 85 const onrequest = () => { 86 stream.req.on('finish', onfinish); 87 }; 88 89 if (isRequest(stream)) { 90 stream.on('complete', onfinish); 91 stream.on('abort', onclose); 92 if (stream.req) onrequest(); 93 else stream.on('request', onrequest); 94 } else if (writable && !stream._writableState) { // legacy streams 95 stream.on('end', onlegacyfinish); 96 stream.on('close', onlegacyfinish); 97 } 98 99 // Not all streams will emit 'close' after 'aborted'. 100 if (typeof stream.aborted === 'boolean') { 101 stream.on('aborted', onclose); 102 } 103 104 stream.on('end', onend); 105 stream.on('finish', onfinish); 106 if (opts.error !== false) stream.on('error', onerror); 107 stream.on('close', onclose); 108 109 return function() { 110 stream.removeListener('aborted', onclose); 111 stream.removeListener('complete', onfinish); 112 stream.removeListener('abort', onclose); 113 stream.removeListener('request', onrequest); 114 if (stream.req) stream.req.removeListener('finish', onfinish); 115 stream.removeListener('end', onlegacyfinish); 116 stream.removeListener('close', onlegacyfinish); 117 stream.removeListener('finish', onfinish); 118 stream.removeListener('end', onend); 119 stream.removeListener('error', onerror); 120 stream.removeListener('close', onclose); 121 }; 122} 123 124module.exports = eos; 125