1'use strict' 2 3const { 4 Readable, 5 Duplex, 6 PassThrough 7} = require('stream') 8const { 9 InvalidArgumentError, 10 InvalidReturnValueError, 11 RequestAbortedError 12} = require('../core/errors') 13const util = require('../core/util') 14const { AsyncResource } = require('async_hooks') 15const { addSignal, removeSignal } = require('./abort-signal') 16const assert = require('assert') 17 18const kResume = Symbol('resume') 19 20class PipelineRequest extends Readable { 21 constructor () { 22 super({ autoDestroy: true }) 23 24 this[kResume] = null 25 } 26 27 _read () { 28 const { [kResume]: resume } = this 29 30 if (resume) { 31 this[kResume] = null 32 resume() 33 } 34 } 35 36 _destroy (err, callback) { 37 this._read() 38 39 callback(err) 40 } 41} 42 43class PipelineResponse extends Readable { 44 constructor (resume) { 45 super({ autoDestroy: true }) 46 this[kResume] = resume 47 } 48 49 _read () { 50 this[kResume]() 51 } 52 53 _destroy (err, callback) { 54 if (!err && !this._readableState.endEmitted) { 55 err = new RequestAbortedError() 56 } 57 58 callback(err) 59 } 60} 61 62class PipelineHandler extends AsyncResource { 63 constructor (opts, handler) { 64 if (!opts || typeof opts !== 'object') { 65 throw new InvalidArgumentError('invalid opts') 66 } 67 68 if (typeof handler !== 'function') { 69 throw new InvalidArgumentError('invalid handler') 70 } 71 72 const { signal, method, opaque, onInfo, responseHeaders } = opts 73 74 if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { 75 throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') 76 } 77 78 if (method === 'CONNECT') { 79 throw new InvalidArgumentError('invalid method') 80 } 81 82 if (onInfo && typeof onInfo !== 'function') { 83 throw new InvalidArgumentError('invalid onInfo callback') 84 } 85 86 super('UNDICI_PIPELINE') 87 88 this.opaque = opaque || null 89 this.responseHeaders = responseHeaders || null 90 this.handler = handler 91 this.abort = null 92 this.context = null 93 this.onInfo = onInfo || null 94 95 this.req = new PipelineRequest().on('error', util.nop) 96 97 this.ret = new Duplex({ 98 readableObjectMode: opts.objectMode, 99 autoDestroy: true, 100 read: () => { 101 const { body } = this 102 103 if (body && body.resume) { 104 body.resume() 105 } 106 }, 107 write: (chunk, encoding, callback) => { 108 const { req } = this 109 110 if (req.push(chunk, encoding) || req._readableState.destroyed) { 111 callback() 112 } else { 113 req[kResume] = callback 114 } 115 }, 116 destroy: (err, callback) => { 117 const { body, req, res, ret, abort } = this 118 119 if (!err && !ret._readableState.endEmitted) { 120 err = new RequestAbortedError() 121 } 122 123 if (abort && err) { 124 abort() 125 } 126 127 util.destroy(body, err) 128 util.destroy(req, err) 129 util.destroy(res, err) 130 131 removeSignal(this) 132 133 callback(err) 134 } 135 }).on('prefinish', () => { 136 const { req } = this 137 138 // Node < 15 does not call _final in same tick. 139 req.push(null) 140 }) 141 142 this.res = null 143 144 addSignal(this, signal) 145 } 146 147 onConnect (abort, context) { 148 const { ret, res } = this 149 150 assert(!res, 'pipeline cannot be retried') 151 152 if (ret.destroyed) { 153 throw new RequestAbortedError() 154 } 155 156 this.abort = abort 157 this.context = context 158 } 159 160 onHeaders (statusCode, rawHeaders, resume) { 161 const { opaque, handler, context } = this 162 163 if (statusCode < 200) { 164 if (this.onInfo) { 165 const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) 166 this.onInfo({ statusCode, headers }) 167 } 168 return 169 } 170 171 this.res = new PipelineResponse(resume) 172 173 let body 174 try { 175 this.handler = null 176 const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) 177 body = this.runInAsyncScope(handler, null, { 178 statusCode, 179 headers, 180 opaque, 181 body: this.res, 182 context 183 }) 184 } catch (err) { 185 this.res.on('error', util.nop) 186 throw err 187 } 188 189 if (!body || typeof body.on !== 'function') { 190 throw new InvalidReturnValueError('expected Readable') 191 } 192 193 body 194 .on('data', (chunk) => { 195 const { ret, body } = this 196 197 if (!ret.push(chunk) && body.pause) { 198 body.pause() 199 } 200 }) 201 .on('error', (err) => { 202 const { ret } = this 203 204 util.destroy(ret, err) 205 }) 206 .on('end', () => { 207 const { ret } = this 208 209 ret.push(null) 210 }) 211 .on('close', () => { 212 const { ret } = this 213 214 if (!ret._readableState.ended) { 215 util.destroy(ret, new RequestAbortedError()) 216 } 217 }) 218 219 this.body = body 220 } 221 222 onData (chunk) { 223 const { res } = this 224 return res.push(chunk) 225 } 226 227 onComplete (trailers) { 228 const { res } = this 229 res.push(null) 230 } 231 232 onError (err) { 233 const { ret } = this 234 this.handler = null 235 util.destroy(ret, err) 236 } 237} 238 239function pipeline (opts, handler) { 240 try { 241 const pipelineHandler = new PipelineHandler(opts, handler) 242 this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler) 243 return pipelineHandler.ret 244 } catch (err) { 245 return new PassThrough().destroy(err) 246 } 247} 248 249module.exports = pipeline 250