• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  aggregateTwoErrors,
5  codes: {
6    ERR_MULTIPLE_CALLBACK,
7  },
8  AbortError,
9} = require('internal/errors');
10const {
11  Symbol,
12} = primordials;
13const {
14  kIsDestroyed,
15  isDestroyed,
16  isFinished,
17  isServerRequest,
18} = require('internal/streams/utils');
19
20const kDestroy = Symbol('kDestroy');
21const kConstruct = Symbol('kConstruct');
22
// Records `err` on the writable state `w` and/or the readable state `r`.
// A state that already has a truthy `errored` value keeps it, and a falsy
// `err` leaves both states untouched.
function checkError(err, w, r) {
  if (!err) {
    return;
  }

  // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
  err.stack; // eslint-disable-line no-unused-expressions

  if (w && !w.errored) {
    w.errored = err;
  }

  if (r && !r.errored) {
    r.errored = err;
  }
}
36
37// Backwards compat. cb() is undocumented and unused in core but
38// unfortunately might be used by modules.
39function destroy(err, cb) {
40  const r = this._readableState;
41  const w = this._writableState;
42  // With duplex streams we use the writable side for state.
43  const s = w || r;
44
45  if (w?.destroyed || r?.destroyed) {
46    if (typeof cb === 'function') {
47      cb();
48    }
49
50    return this;
51  }
52
53
54  // We set destroyed to true before firing error callbacks in order
55  // to make it re-entrance safe in case destroy() is called within callbacks
56  checkError(err, w, r);
57
58  if (w) {
59    w.destroyed = true;
60  }
61  if (r) {
62    r.destroyed = true;
63  }
64
65  // If still constructing then defer calling _destroy.
66  if (!s.constructed) {
67    this.once(kDestroy, function(er) {
68      _destroy(this, aggregateTwoErrors(er, err), cb);
69    });
70  } else {
71    _destroy(this, err, cb);
72  }
73
74  return this;
75}
76
// Runs the stream's `_destroy()` hook with `err` (or `null` when `err` is
// falsy) and finishes the teardown once the hook reports back, or
// immediately with the thrown error if the hook throws.
function _destroy(self, err, cb) {
  let finished = false;

  // Completion callback handed to `self._destroy()`; only the first
  // invocation has any effect.
  function onDestroy(error) {
    if (finished) {
      return;
    }
    finished = true;

    const readableState = self._readableState;
    const writableState = self._writableState;

    checkError(error, writableState, readableState);

    if (writableState) {
      writableState.closed = true;
    }
    if (readableState) {
      readableState.closed = true;
    }

    if (typeof cb === 'function') {
      cb(error);
    }

    // The events themselves are emitted on a later tick.
    if (error) {
      process.nextTick(emitErrorCloseNT, self, error);
    } else {
      process.nextTick(emitCloseNT, self);
    }
  }

  try {
    self._destroy(err || null, onDestroy);
  } catch (thrown) {
    onDestroy(thrown);
  }
}
114
// Tick callback used after a failed destroy: reports the error first and
// the close second.
function emitErrorCloseNT(self, err) {
  emitErrorNT(self, err);
  emitCloseNT(self);
}
119
// Flags 'close' as emitted on every state the stream has, then emits the
// event itself if either state has `emitClose` set.
function emitCloseNT(self) {
  const readableState = self._readableState;
  const writableState = self._writableState;

  if (writableState) {
    writableState.closeEmitted = true;
  }
  if (readableState) {
    readableState.closeEmitted = true;
  }

  const wantsCloseEvent = writableState?.emitClose || readableState?.emitClose;
  if (wantsCloseEvent) {
    self.emit('close');
  }
}
135
// Emits 'error' on the stream unless either state says an error was already
// emitted. Both states are flagged before the event is emitted.
function emitErrorNT(self, err) {
  const readableState = self._readableState;
  const writableState = self._writableState;

  const alreadyEmitted =
    writableState?.errorEmitted || readableState?.errorEmitted;
  if (alreadyEmitted) {
    return;
  }

  if (writableState) {
    writableState.errorEmitted = true;
  }
  if (readableState) {
    readableState.errorEmitted = true;
  }

  self.emit('error', err);
}
153
// Invoked with the stream as `this`. Resets the lifecycle flags on whichever
// of the readable/writable states exist, so the stream no longer looks
// destroyed, closed or errored.
function undestroy() {
  const readableState = this._readableState;
  const writableState = this._writableState;

  if (readableState) {
    // Lifecycle flags.
    readableState.constructed = true;
    readableState.closed = false;
    readableState.closeEmitted = false;
    readableState.destroyed = false;
    readableState.errored = null;
    readableState.errorEmitted = false;
    readableState.reading = false;
    // Counts as ended only when `readable` is explicitly false.
    readableState.ended = readableState.readable === false;
    readableState.endEmitted = readableState.readable === false;
  }

  if (writableState) {
    // Lifecycle flags.
    writableState.constructed = true;
    writableState.destroyed = false;
    writableState.closed = false;
    writableState.closeEmitted = false;
    writableState.errored = null;
    writableState.errorEmitted = false;
    writableState.finalCalled = false;
    writableState.prefinished = false;
    // Counts as ended/finished only when `writable` is explicitly false.
    writableState.ended = writableState.writable === false;
    writableState.ending = writableState.writable === false;
    writableState.finished = writableState.writable === false;
  }
}
184
// Reports `err` for `stream`.
//
// - Does nothing when either side of the stream is already destroyed.
// - When either state has `autoDestroy` set, delegates to
//   `stream.destroy(err)`.
// - Otherwise, for a truthy `err`, records it on each state that has no
//   error yet and emits 'error': on the next tick when `sync` is truthy,
//   immediately otherwise.
//
// Always returns undefined.
function errorOrDestroy(stream, err, sync) {
  // We have tests that rely on errors being emitted
  // in the same tick, so changing this is semver major.
  // For now when you opt-in to autoDestroy we allow
  // the error to be emitted nextTick. In a future
  // semver major update we should change the default to this.

  const r = stream._readableState;
  const w = stream._writableState;

  if (w?.destroyed || r?.destroyed) {
    // This is a plain function, not a method: returning `this` here would
    // leak whatever receiver the caller happened to use (e.g. the module's
    // exports object) on this path only.
    return;
  }

  if (r?.autoDestroy || w?.autoDestroy) {
    stream.destroy(err);
  } else if (err) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    err.stack; // eslint-disable-line no-unused-expressions

    if (w && !w.errored) {
      w.errored = err;
    }
    if (r && !r.errored) {
      r.errored = err;
    }
    if (sync) {
      process.nextTick(emitErrorNT, stream, err);
    } else {
      emitErrorNT(stream, err);
    }
  }
}
218
// Sets up asynchronous construction for streams that implement
// `_construct()`; other streams are left untouched. `cb` is registered as a
// one-time kConstruct listener.
function construct(stream, cb) {
  if (typeof stream._construct !== 'function') {
    return;
  }

  const { _readableState: readableState, _writableState: writableState } = stream;

  // Mark every available side as not yet constructed.
  if (readableState) {
    readableState.constructed = false;
  }
  if (writableState) {
    writableState.constructed = false;
  }

  stream.once(kConstruct, cb);

  const alreadyScheduled = stream.listenerCount(kConstruct) > 1;
  if (alreadyScheduled) {
    // Duplex: an earlier construct() call on this stream has already
    // queued constructNT.
    return;
  }

  process.nextTick(constructNT, stream);
}
243
// Calls the stream's `_construct()` hook and routes its outcome. The hook's
// result (or an error it throws) is always handled on a later tick.
function constructNT(stream) {
  let settled = false;

  function onConstruct(err) {
    if (settled) {
      // The hook reported more than once.
      errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
      return;
    }
    settled = true;

    const readableState = stream._readableState;
    const writableState = stream._writableState;

    if (readableState) {
      readableState.constructed = true;
    }
    if (writableState) {
      writableState.constructed = true;
    }

    // The writable side is consulted first when both states exist.
    const state = writableState || readableState;

    if (state.destroyed) {
      // destroy() was called during construction; let it proceed now.
      stream.emit(kDestroy, err);
      return;
    }

    if (err) {
      errorOrDestroy(stream, err, true);
      return;
    }

    stream.emit(kConstruct);
  }

  const deferOnConstruct = (err) => {
    process.nextTick(onConstruct, err);
  };

  try {
    stream._construct(deferOnConstruct);
  } catch (err) {
    deferOnConstruct(err);
  }
}
282
// Duck-type check: truthy for objects that have a truthy `setHeader` and an
// `abort` method. Like the `&&` expression it stands for, a falsy
// `setHeader` lookup is returned as-is rather than coerced to a boolean.
function isRequest(stream) {
  const setHeader = stream?.setHeader;
  if (!setHeader) {
    return setHeader;
  }

  return typeof stream.abort === 'function';
}
286
// Emits a bare 'close' event on `stream`, without touching any stream state.
function emitCloseLegacy(stream) {
  stream.emit('close');
}
290
// Emits 'error' with `err` right away and queues the matching 'close' for
// the next tick.
function emitErrorCloseLegacy(stream, err) {
  stream.emit('error', err);
  process.nextTick(emitCloseLegacy, stream);
}
295
// Normalize destroy for legacy.
//
// Tears down `stream` using the first mechanism it supports, in this order:
// server request, client request, object with a client request in `.req`,
// `destroy()`, `close()`, and finally plain 'error'/'close' events. A missing
// or already destroyed stream is ignored. When no error is given and the
// stream is not finished, an AbortError is used.
function destroyer(stream, err) {
  if (!stream || isDestroyed(stream)) {
    return;
  }

  let reason = err;
  if (!(reason || isFinished(stream))) {
    reason = new AbortError();
  }

  // TODO: Remove isRequest branches.
  if (isServerRequest(stream)) {
    stream.socket = null;
    stream.destroy(reason);
  } else if (isRequest(stream)) {
    stream.abort();
  } else if (isRequest(stream.req)) {
    stream.req.abort();
  } else if (typeof stream.destroy === 'function') {
    stream.destroy(reason);
  } else if (typeof stream.close === 'function') {
    // TODO: Don't lose err?
    stream.close();
  } else if (reason) {
    process.nextTick(emitErrorCloseLegacy, stream, reason);
  } else {
    process.nextTick(emitCloseLegacy, stream);
  }

  // Flag the stream ourselves when it does not report `destroyed` itself.
  if (!stream.destroyed) {
    stream[kIsDestroyed] = true;
  }
}
329
330module.exports = {
331  construct,
332  destroyer,
333  destroy,
334  undestroy,
335  errorOrDestroy,
336};
337