• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/* replacement start */
2
3const process = require('process/')
4
5/* replacement end */
6// Ported from https://github.com/mafintosh/end-of-stream with
7// permission from the author, Mathias Buus (@mafintosh).
8
9;('use strict')
10const { AbortError, codes } = require('../../ours/errors')
11const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
12const { kEmptyObject, once } = require('../../ours/util')
13const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
14const { Promise, PromisePrototypeThen } = require('../../ours/primordials')
15const {
16  isClosed,
17  isReadable,
18  isReadableNodeStream,
19  isReadableStream,
20  isReadableFinished,
21  isReadableErrored,
22  isWritable,
23  isWritableNodeStream,
24  isWritableStream,
25  isWritableFinished,
26  isWritableErrored,
27  isNodeStream,
28  willEmitClose: _willEmitClose,
29  kIsClosedPromise
30} = require('./utils')
/**
 * Duck-type check for a request-like object: one that has a truthy
 * `setHeader` property and an `abort` method.
 *
 * @param {object} stream - Candidate object to inspect.
 * @returns {*} `true` when both conditions hold; otherwise a falsy value
 *   (either the falsy `stream.setHeader` itself or `false`). Callers should
 *   only rely on truthiness.
 */
function isRequest(stream) {
  return stream.setHeader && typeof stream.abort === 'function'
}
// Shared no-op: used as a placeholder callback and as the "nothing to clean up" return value.
const nop = () => {}
/**
 * Registers listeners on `stream` and invokes `callback` (with `this` set to
 * the stream) when the stream ends, finishes, errors, or closes.
 *
 * Web `ReadableStream` / `WritableStream` instances are delegated to `eosWeb`.
 *
 * @param {object} stream - A Node.js stream or a web stream.
 * @param {object} [options] - May be omitted (two-argument form) or nullish.
 * @param {boolean} [options.readable] - Whether to wait for the readable side;
 *   when nullish, falls back to `isReadableNodeStream(stream)`.
 * @param {boolean} [options.writable] - Whether to wait for the writable side;
 *   when nullish, falls back to `isWritableNodeStream(stream)`.
 * @param {boolean} [options.error] - Pass `false` to skip the `'error'` listener.
 * @param {AbortSignal} [options.signal] - When aborted, listeners are removed
 *   and `callback` receives an `AbortError` whose cause is `signal.reason`.
 * @param {Function} callback - Completion callback; receives an error or nothing.
 * @returns {Function} A cleanup function that removes the listeners registered
 *   on the stream (for web streams, a no-op).
 * @throws {ERR_INVALID_ARG_TYPE} If `stream` is neither a web stream nor a
 *   Node.js stream. The validators may also throw for bad `options`,
 *   `callback`, or `options.signal`.
 */
function eos(stream, options, callback) {
  // Two-argument form: eos(stream, callback).
  if (arguments.length === 2) {
    callback = options
    options = kEmptyObject
  } else if (options == null) {
    options = kEmptyObject
  } else {
    validateObject(options, 'options')
  }
  validateFunction(callback, 'callback')
  validateAbortSignal(options.signal, 'options.signal')
  callback = once(callback)
  if (isReadableStream(stream) || isWritableStream(stream)) {
    return eosWeb(stream, options, callback)
  }
  if (!isNodeStream(stream)) {
    throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream)
  }

  // An explicit (non-nullish) option wins; otherwise infer the side from the stream itself.
  const readableOption = options.readable
  const readable =
    readableOption !== null && readableOption !== undefined ? readableOption : isReadableNodeStream(stream)
  const writableOption = options.writable
  const writable =
    writableOption !== null && writableOption !== undefined ? writableOption : isWritableNodeStream(stream)
  const wState = stream._writableState
  const rState = stream._readableState

  // Legacy streams (no _writableState): treat 'end'/'close' as finish once
  // `stream.writable` is falsy.
  const onlegacyfinish = () => {
    if (!stream.writable) {
      onfinish()
    }
  }

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  let willEmitClose =
    _willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
  let writableFinished = isWritableFinished(stream, false)
  const onfinish = () => {
    writableFinished = true
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }
    // Defer to the upcoming 'close' event instead of calling back now.
    if (willEmitClose && (!stream.readable || readable)) {
      return
    }
    if (!readable || readableFinished) {
      callback.call(stream)
    }
  }
  let readableFinished = isReadableFinished(stream, false)
  const onend = () => {
    readableFinished = true
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }
    // Defer to the upcoming 'close' event instead of calling back now.
    if (willEmitClose && (!stream.writable || writable)) {
      return
    }
    if (!writable || writableFinished) {
      callback.call(stream)
    }
  }
  const onerror = (err) => {
    callback.call(stream, err)
  }
  let closed = isClosed(stream)

  // 'close' handler: report a stored error, a premature close, or success.
  const onclose = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
    // Only forward the value when it is an actual error value, not a bare boolean flag.
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }
    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
      if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }
    callback.call(stream)
  }

  // Like onclose, but without the premature-close checks; used for streams
  // that are already in a terminal state when eos() is called.
  const onclosed = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }
    callback.call(stream)
  }
  const onrequest = () => {
    stream.req.on('finish', onfinish)
  }
  if (isRequest(stream)) {
    stream.on('complete', onfinish)
    if (!willEmitClose) {
      stream.on('abort', onclose)
    }
    if (stream.req) {
      onrequest()
    } else {
      stream.on('request', onrequest)
    }
  } else if (writable && !wState) {
    // legacy streams
    stream.on('end', onlegacyfinish)
    stream.on('close', onlegacyfinish)
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose)
  }
  stream.on('end', onend)
  stream.on('finish', onfinish)
  if (options.error !== false) {
    stream.on('error', onerror)
  }
  stream.on('close', onclose)

  // The stream may already be in a terminal state, in which case no further
  // event will fire; schedule the matching handler on the next tick so the
  // callback is still invoked asynchronously.
  if (closed) {
    process.nextTick(onclose)
  } else if (
    (wState !== null && wState !== undefined && wState.errorEmitted) ||
    (rState !== null && rState !== undefined && rState.errorEmitted)
  ) {
    if (!willEmitClose) {
      process.nextTick(onclosed)
    }
  } else if (
    !readable &&
    (!willEmitClose || isReadable(stream)) &&
    (writableFinished || isWritable(stream) === false)
  ) {
    process.nextTick(onclosed)
  } else if (
    !writable &&
    (!willEmitClose || isWritable(stream)) &&
    (readableFinished || isReadable(stream) === false)
  ) {
    process.nextTick(onclosed)
  } else if (rState && stream.req && stream.aborted) {
    process.nextTick(onclosed)
  }

  // Detaches every listener this function may have registered and neutralises
  // the callback so late events become no-ops.
  const cleanup = () => {
    callback = nop
    stream.removeListener('aborted', onclose)
    stream.removeListener('complete', onfinish)
    stream.removeListener('abort', onclose)
    stream.removeListener('request', onrequest)
    if (stream.req) stream.req.removeListener('finish', onfinish)
    stream.removeListener('end', onlegacyfinish)
    stream.removeListener('close', onlegacyfinish)
    stream.removeListener('finish', onfinish)
    stream.removeListener('end', onend)
    stream.removeListener('error', onerror)
    stream.removeListener('close', onclose)
  }
  if (options.signal && !closed) {
    const abort = () => {
      // Keep it because cleanup removes it.
      const endCallback = callback
      cleanup()
      endCallback.call(
        stream,
        new AbortError(undefined, {
          cause: options.signal.reason
        })
      )
    }
    if (options.signal.aborted) {
      process.nextTick(abort)
    } else {
      // Wrap the callback so the abort listener is removed from the signal
      // as soon as the stream completes by any route.
      const originalCallback = callback
      callback = once((...args) => {
        options.signal.removeEventListener('abort', abort)
        originalCallback.apply(stream, args)
      })
      options.signal.addEventListener('abort', abort)
    }
  }
  return cleanup
}
/**
 * End-of-stream handling for web streams: invokes `callback` (with `this` set
 * to the stream) once the promise stored at `stream[kIsClosedPromise].promise`
 * settles, or with an `AbortError` if `options.signal` aborts.
 *
 * @param {object} stream - Web stream exposing `stream[kIsClosedPromise].promise`.
 * @param {object} options - Options object; only `options.signal` is read here.
 * @param {Function} callback - Completion callback; receives the settlement
 *   value/reason of the closed promise, or an `AbortError`.
 * @returns {Function} Always the shared no-op; nothing is detached on cleanup.
 */
function eosWeb(stream, options, callback) {
  let isAborted = false
  let abort = nop
  if (options.signal) {
    abort = () => {
      // Once aborted, a later settlement of the closed promise is ignored.
      isAborted = true
      callback.call(
        stream,
        new AbortError(undefined, {
          cause: options.signal.reason
        })
      )
    }
    if (options.signal.aborted) {
      process.nextTick(abort)
    } else {
      // Wrap the callback so the abort listener is removed from the signal
      // as soon as the callback fires.
      const originalCallback = callback
      callback = once((...args) => {
        options.signal.removeEventListener('abort', abort)
        originalCallback.apply(stream, args)
      })
      options.signal.addEventListener('abort', abort)
    }
  }

  // The same handler serves fulfilment and rejection; the callback is
  // scheduled with process.nextTick rather than run inside the promise reaction.
  const resolverFn = (...args) => {
    if (!isAborted) {
      process.nextTick(() => callback.apply(stream, args))
    }
  }
  PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn)
  return nop
}
/**
 * Promise-returning wrapper around `eos`.
 *
 * @param {object} stream - Stream to observe; passed straight to `eos`.
 * @param {object} [opts] - Options forwarded to `eos`. `null` is replaced by
 *   the shared empty options object.
 * @param {boolean} [opts.cleanup] - When truthy it must be a boolean
 *   (validated); the listeners registered by `eos` are then removed as soon
 *   as the callback fires.
 * @returns {Promise<void>} Resolves when `eos` calls back without an error;
 *   rejects with the error otherwise.
 */
function finished(stream, opts) {
  let autoCleanup = false
  if (opts === null) {
    opts = kEmptyObject
  }
  // Validation only runs for truthy values, so falsy `cleanup` values are
  // accepted as "no auto cleanup" without a type check.
  if (opts !== null && opts !== undefined && opts.cleanup) {
    validateBoolean(opts.cleanup, 'cleanup')
    autoCleanup = opts.cleanup
  }
  return new Promise((resolve, reject) => {
    // `cleanup` is referenced inside the callback below; this relies on
    // `eos` never invoking the callback before it has returned.
    const cleanup = eos(stream, opts, (err) => {
      if (autoCleanup) {
        cleanup()
      }
      if (err) {
        reject(err)
      } else {
        resolve()
      }
    })
  })
}
// Public API: the callback-style function is the module itself, with the
// promise-returning variant attached as `.finished`.
module.exports = eos
module.exports.finished = finished