• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Ported from https://github.com/mafintosh/end-of-stream with
2// permission from the author, Mathias Buus (@mafintosh).
3
4'use strict';
5
6const {
7  ERR_INVALID_ARG_TYPE,
8  ERR_STREAM_PREMATURE_CLOSE
9} = require('internal/errors').codes;
10const { once } = require('internal/util');
11
12function isRequest(stream) {
13  return stream.setHeader && typeof stream.abort === 'function';
14}
15
16function isReadable(stream) {
17  return typeof stream.readable === 'boolean' ||
18    typeof stream.readableEnded === 'boolean' ||
19    !!stream._readableState;
20}
21
22function isWritable(stream) {
23  return typeof stream.writable === 'boolean' ||
24    typeof stream.writableEnded === 'boolean' ||
25    !!stream._writableState;
26}
27
28function isWritableFinished(stream) {
29  if (stream.writableFinished) return true;
30  const wState = stream._writableState;
31  if (!wState || wState.errored) return false;
32  return wState.finished || (wState.ended && wState.length === 0);
33}
34
35function nop() {}
36
37function isReadableEnded(stream) {
38  if (stream.readableEnded) return true;
39  const rState = stream._readableState;
40  if (!rState || rState.errored) return false;
41  return rState.endEmitted || (rState.ended && rState.length === 0);
42}
43
44function eos(stream, options, callback) {
45  if (arguments.length === 2) {
46    callback = options;
47    options = {};
48  } else if (options == null) {
49    options = {};
50  } else if (typeof options !== 'object') {
51    throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
52  }
53  if (typeof callback !== 'function') {
54    throw new ERR_INVALID_ARG_TYPE('callback', 'function', callback);
55  }
56
57  callback = once(callback);
58
59  const readable = options.readable ||
60    (options.readable !== false && isReadable(stream));
61  const writable = options.writable ||
62    (options.writable !== false && isWritable(stream));
63
64  const wState = stream._writableState;
65  const rState = stream._readableState;
66  const state = wState || rState;
67
68  const onlegacyfinish = () => {
69    if (!stream.writable) onfinish();
70  };
71
72  // TODO (ronag): Improve soft detection to include core modules and
73  // common ecosystem modules that do properly emit 'close' but fail
74  // this generic check.
75  let willEmitClose = (
76    state &&
77    state.autoDestroy &&
78    state.emitClose &&
79    state.closed === false &&
80    isReadable(stream) === readable &&
81    isWritable(stream) === writable
82  );
83
84  let writableFinished = stream.writableFinished ||
85    (wState && wState.finished);
86  const onfinish = () => {
87    writableFinished = true;
88    // Stream should not be destroyed here. If it is that
89    // means that user space is doing something differently and
90    // we cannot trust willEmitClose.
91    if (stream.destroyed) willEmitClose = false;
92
93    if (willEmitClose && (!stream.readable || readable)) return;
94    if (!readable || readableEnded) callback.call(stream);
95  };
96
97  let readableEnded = stream.readableEnded ||
98    (rState && rState.endEmitted);
99  const onend = () => {
100    readableEnded = true;
101    // Stream should not be destroyed here. If it is that
102    // means that user space is doing something differently and
103    // we cannot trust willEmitClose.
104    if (stream.destroyed) willEmitClose = false;
105
106    if (willEmitClose && (!stream.writable || writable)) return;
107    if (!writable || writableFinished) callback.call(stream);
108  };
109
110  const onerror = (err) => {
111    callback.call(stream, err);
112  };
113
114  const onclose = () => {
115    if (readable && !readableEnded) {
116      if (!isReadableEnded(stream))
117        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
118    }
119    if (writable && !writableFinished) {
120      if (!isWritableFinished(stream))
121        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
122    }
123    callback.call(stream);
124  };
125
126  const onrequest = () => {
127    stream.req.on('finish', onfinish);
128  };
129
130  if (isRequest(stream)) {
131    stream.on('complete', onfinish);
132    if (!willEmitClose) {
133      stream.on('abort', onclose);
134    }
135    if (stream.req) onrequest();
136    else stream.on('request', onrequest);
137  } else if (writable && !wState) { // legacy streams
138    stream.on('end', onlegacyfinish);
139    stream.on('close', onlegacyfinish);
140  }
141
142  // Not all streams will emit 'close' after 'aborted'.
143  if (!willEmitClose && typeof stream.aborted === 'boolean') {
144    stream.on('aborted', onclose);
145  }
146
147  stream.on('end', onend);
148  stream.on('finish', onfinish);
149  if (options.error !== false) stream.on('error', onerror);
150  stream.on('close', onclose);
151
152  const closed = (
153    (wState && wState.closed) ||
154    (rState && rState.closed) ||
155    (wState && wState.errorEmitted) ||
156    (rState && rState.errorEmitted) ||
157    (rState && stream.req && stream.aborted) ||
158    (
159      (!writable || (wState && wState.finished)) &&
160      (!readable || (rState && rState.endEmitted))
161    )
162  );
163
164  if (closed) {
165    // TODO(ronag): Re-throw error if errorEmitted?
166    // TODO(ronag): Throw premature close as if finished was called?
167    // before being closed? i.e. if closed but not errored, ended or finished.
168    // TODO(ronag): Throw some kind of error? Does it make sense
169    // to call finished() on a "finished" stream?
170    // TODO(ronag): willEmitClose?
171    process.nextTick(() => {
172      callback();
173    });
174  }
175
176  return function() {
177    callback = nop;
178    stream.removeListener('aborted', onclose);
179    stream.removeListener('complete', onfinish);
180    stream.removeListener('abort', onclose);
181    stream.removeListener('request', onrequest);
182    if (stream.req) stream.req.removeListener('finish', onfinish);
183    stream.removeListener('end', onlegacyfinish);
184    stream.removeListener('close', onlegacyfinish);
185    stream.removeListener('finish', onfinish);
186    stream.removeListener('end', onend);
187    stream.removeListener('error', onerror);
188    stream.removeListener('close', onclose);
189  };
190}
191
192module.exports = eos;
193