1'use strict'; 2 3const { 4 aggregateTwoErrors, 5 codes: { 6 ERR_MULTIPLE_CALLBACK, 7 }, 8 AbortError, 9} = require('internal/errors'); 10const { 11 Symbol, 12} = primordials; 13const { 14 kIsDestroyed, 15 isDestroyed, 16 isFinished, 17 isServerRequest, 18} = require('internal/streams/utils'); 19 20const kDestroy = Symbol('kDestroy'); 21const kConstruct = Symbol('kConstruct'); 22 23function checkError(err, w, r) { 24 if (err) { 25 // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 26 err.stack; // eslint-disable-line no-unused-expressions 27 28 if (w && !w.errored) { 29 w.errored = err; 30 } 31 if (r && !r.errored) { 32 r.errored = err; 33 } 34 } 35} 36 37// Backwards compat. cb() is undocumented and unused in core but 38// unfortunately might be used by modules. 39function destroy(err, cb) { 40 const r = this._readableState; 41 const w = this._writableState; 42 // With duplex streams we use the writable side for state. 43 const s = w || r; 44 45 if (w?.destroyed || r?.destroyed) { 46 if (typeof cb === 'function') { 47 cb(); 48 } 49 50 return this; 51 } 52 53 54 // We set destroyed to true before firing error callbacks in order 55 // to make it re-entrance safe in case destroy() is called within callbacks 56 checkError(err, w, r); 57 58 if (w) { 59 w.destroyed = true; 60 } 61 if (r) { 62 r.destroyed = true; 63 } 64 65 // If still constructing then defer calling _destroy. 66 if (!s.constructed) { 67 this.once(kDestroy, function(er) { 68 _destroy(this, aggregateTwoErrors(er, err), cb); 69 }); 70 } else { 71 _destroy(this, err, cb); 72 } 73 74 return this; 75} 76 77function _destroy(self, err, cb) { 78 let called = false; 79 80 function onDestroy(err) { 81 if (called) { 82 return; 83 } 84 called = true; 85 86 const r = self._readableState; 87 const w = self._writableState; 88 89 checkError(err, w, r); 90 91 if (w) { 92 w.closed = true; 93 } 94 if (r) { 95 r.closed = true; 96 } 97 98 if (typeof cb === 'function') { 99 cb(err); 100 } 101 102 if (err) { 103 process.nextTick(emitErrorCloseNT, self, err); 104 } else { 105 process.nextTick(emitCloseNT, self); 106 } 107 } 108 try { 109 self._destroy(err || null, onDestroy); 110 } catch (err) { 111 onDestroy(err); 112 } 113} 114 115function emitErrorCloseNT(self, err) { 116 emitErrorNT(self, err); 117 emitCloseNT(self); 118} 119 120function emitCloseNT(self) { 121 const r = self._readableState; 122 const w = self._writableState; 123 124 if (w) { 125 w.closeEmitted = true; 126 } 127 if (r) { 128 r.closeEmitted = true; 129 } 130 131 if (w?.emitClose || r?.emitClose) { 132 self.emit('close'); 133 } 134} 135 136function emitErrorNT(self, err) { 137 const r = self._readableState; 138 const w = self._writableState; 139 140 if (w?.errorEmitted || r?.errorEmitted) { 141 return; 142 } 143 144 if (w) { 145 w.errorEmitted = true; 146 } 147 if (r) { 148 r.errorEmitted = true; 149 } 150 151 self.emit('error', err); 152} 153 154function undestroy() { 155 const r = this._readableState; 156 const w = this._writableState; 157 158 if (r) { 159 r.constructed = true; 160 r.closed = false; 161 r.closeEmitted = false; 162 r.destroyed = false; 163 r.errored = null; 164 r.errorEmitted = false; 165 r.reading = false; 166 r.ended = r.readable === false; 167 r.endEmitted = r.readable === false; 168 } 169 170 if (w) { 171 w.constructed = true; 172 w.destroyed = false; 173 w.closed = false; 174 w.closeEmitted = false; 175 w.errored = null; 176 w.errorEmitted = false; 177 w.finalCalled = false; 178 w.prefinished = false; 179 w.ended = w.writable === false; 180 w.ending = w.writable === false; 181 w.finished = w.writable === false; 182 } 183} 184 185function errorOrDestroy(stream, err, sync) { 186 // We have tests that rely on errors being emitted 187 // in the same tick, so changing this is semver major. 188 // For now when you opt-in to autoDestroy we allow 189 // the error to be emitted nextTick. In a future 190 // semver major update we should change the default to this. 191 192 const r = stream._readableState; 193 const w = stream._writableState; 194 195 if (w?.destroyed || r?.destroyed) { 196 return this; 197 } 198 199 if (r?.autoDestroy || w?.autoDestroy) 200 stream.destroy(err); 201 else if (err) { 202 // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 203 err.stack; // eslint-disable-line no-unused-expressions 204 205 if (w && !w.errored) { 206 w.errored = err; 207 } 208 if (r && !r.errored) { 209 r.errored = err; 210 } 211 if (sync) { 212 process.nextTick(emitErrorNT, stream, err); 213 } else { 214 emitErrorNT(stream, err); 215 } 216 } 217} 218 219function construct(stream, cb) { 220 if (typeof stream._construct !== 'function') { 221 return; 222 } 223 224 const r = stream._readableState; 225 const w = stream._writableState; 226 227 if (r) { 228 r.constructed = false; 229 } 230 if (w) { 231 w.constructed = false; 232 } 233 234 stream.once(kConstruct, cb); 235 236 if (stream.listenerCount(kConstruct) > 1) { 237 // Duplex 238 return; 239 } 240 241 process.nextTick(constructNT, stream); 242} 243 244function constructNT(stream) { 245 let called = false; 246 247 function onConstruct(err) { 248 if (called) { 249 errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK()); 250 return; 251 } 252 called = true; 253 254 const r = stream._readableState; 255 const w = stream._writableState; 256 const s = w || r; 257 258 if (r) { 259 r.constructed = true; 260 } 261 if (w) { 262 w.constructed = true; 263 } 264 265 if (s.destroyed) { 266 stream.emit(kDestroy, err); 267 } else if (err) { 268 errorOrDestroy(stream, err, true); 269 } else { 270 stream.emit(kConstruct); 271 } 272 } 273 274 try { 275 stream._construct((err) => { 276 process.nextTick(onConstruct, err); 277 }); 278 } catch (err) { 279 process.nextTick(onConstruct, err); 280 } 281} 282 283function isRequest(stream) { 284 return stream?.setHeader && typeof stream.abort === 'function'; 285} 286 287function emitCloseLegacy(stream) { 288 stream.emit('close'); 289} 290 291function emitErrorCloseLegacy(stream, err) { 292 stream.emit('error', err); 293 process.nextTick(emitCloseLegacy, stream); 294} 295 296// Normalize destroy for legacy. 297function destroyer(stream, err) { 298 if (!stream || isDestroyed(stream)) { 299 return; 300 } 301 302 if (!err && !isFinished(stream)) { 303 err = new AbortError(); 304 } 305 306 // TODO: Remove isRequest branches. 307 if (isServerRequest(stream)) { 308 stream.socket = null; 309 stream.destroy(err); 310 } else if (isRequest(stream)) { 311 stream.abort(); 312 } else if (isRequest(stream.req)) { 313 stream.req.abort(); 314 } else if (typeof stream.destroy === 'function') { 315 stream.destroy(err); 316 } else if (typeof stream.close === 'function') { 317 // TODO: Don't lose err? 318 stream.close(); 319 } else if (err) { 320 process.nextTick(emitErrorCloseLegacy, stream, err); 321 } else { 322 process.nextTick(emitCloseLegacy, stream); 323 } 324 325 if (!stream.destroyed) { 326 stream[kIsDestroyed] = true; 327 } 328} 329 330module.exports = { 331 construct, 332 destroyer, 333 destroy, 334 undestroy, 335 errorOrDestroy, 336}; 337