• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Ported from https://github.com/mafintosh/end-of-stream with
2// permission from the author, Mathias Buus (@mafintosh).
3
4'use strict';
5
6const {
7  ERR_INVALID_ARG_TYPE,
8  ERR_STREAM_PREMATURE_CLOSE
9} = require('internal/errors').codes;
10const { once } = require('internal/util');
11
12function isRequest(stream) {
13  return stream.setHeader && typeof stream.abort === 'function';
14}
15
16function isReadable(stream) {
17  return typeof stream.readable === 'boolean' ||
18    typeof stream.readableEnded === 'boolean' ||
19    !!stream._readableState;
20}
21
22function isWritable(stream) {
23  return typeof stream.writable === 'boolean' ||
24    typeof stream.writableEnded === 'boolean' ||
25    !!stream._writableState;
26}
27
28function eos(stream, opts, callback) {
29  if (arguments.length === 2) {
30    callback = opts;
31    opts = {};
32  } else if (opts == null) {
33    opts = {};
34  } else if (typeof opts !== 'object') {
35    throw new ERR_INVALID_ARG_TYPE('opts', 'object', opts);
36  }
37  if (typeof callback !== 'function') {
38    throw new ERR_INVALID_ARG_TYPE('callback', 'function', callback);
39  }
40
41  callback = once(callback);
42
43  let readable = opts.readable ||
44    (opts.readable !== false && isReadable(stream));
45  let writable = opts.writable ||
46    (opts.writable !== false && isWritable(stream));
47
48  const onlegacyfinish = () => {
49    if (!stream.writable) onfinish();
50  };
51
52  let writableEnded = stream._writableState && stream._writableState.finished;
53  const onfinish = () => {
54    writable = false;
55    writableEnded = true;
56    if (!readable) callback.call(stream);
57  };
58
59  let readableEnded = stream.readableEnded ||
60    (stream._readableState && stream._readableState.endEmitted);
61  const onend = () => {
62    readable = false;
63    readableEnded = true;
64    if (!writable) callback.call(stream);
65  };
66
67  const onerror = (err) => {
68    callback.call(stream, err);
69  };
70
71  const onclose = () => {
72    let err;
73    if (readable && !readableEnded) {
74      if (!stream._readableState || !stream._readableState.ended)
75        err = new ERR_STREAM_PREMATURE_CLOSE();
76      return callback.call(stream, err);
77    }
78    if (writable && !writableEnded) {
79      if (!stream._writableState || !stream._writableState.ended)
80        err = new ERR_STREAM_PREMATURE_CLOSE();
81      return callback.call(stream, err);
82    }
83  };
84
85  const onrequest = () => {
86    stream.req.on('finish', onfinish);
87  };
88
89  if (isRequest(stream)) {
90    stream.on('complete', onfinish);
91    stream.on('abort', onclose);
92    if (stream.req) onrequest();
93    else stream.on('request', onrequest);
94  } else if (writable && !stream._writableState) { // legacy streams
95    stream.on('end', onlegacyfinish);
96    stream.on('close', onlegacyfinish);
97  }
98
99  // Not all streams will emit 'close' after 'aborted'.
100  if (typeof stream.aborted === 'boolean') {
101    stream.on('aborted', onclose);
102  }
103
104  stream.on('end', onend);
105  stream.on('finish', onfinish);
106  if (opts.error !== false) stream.on('error', onerror);
107  stream.on('close', onclose);
108
109  return function() {
110    stream.removeListener('aborted', onclose);
111    stream.removeListener('complete', onfinish);
112    stream.removeListener('abort', onclose);
113    stream.removeListener('request', onrequest);
114    if (stream.req) stream.req.removeListener('finish', onfinish);
115    stream.removeListener('end', onlegacyfinish);
116    stream.removeListener('close', onlegacyfinish);
117    stream.removeListener('finish', onfinish);
118    stream.removeListener('end', onend);
119    stream.removeListener('error', onerror);
120    stream.removeListener('close', onclose);
121  };
122}
123
124module.exports = eos;
125