1'use strict'; 2 3const { 4 isReadable, 5 isWritable, 6 isIterable, 7 isNodeStream, 8 isReadableNodeStream, 9 isWritableNodeStream, 10 isDuplexNodeStream, 11 isReadableStream, 12 isWritableStream, 13} = require('internal/streams/utils'); 14const eos = require('internal/streams/end-of-stream'); 15const { 16 AbortError, 17 codes: { 18 ERR_INVALID_ARG_TYPE, 19 ERR_INVALID_RETURN_VALUE, 20 }, 21} = require('internal/errors'); 22const { destroyer } = require('internal/streams/destroy'); 23const Duplex = require('internal/streams/duplex'); 24const Readable = require('internal/streams/readable'); 25const Writable = require('internal/streams/writable'); 26const { createDeferredPromise } = require('internal/util'); 27const from = require('internal/streams/from'); 28 29const { 30 isBlob, 31} = require('internal/blob'); 32const { AbortController } = require('internal/abort_controller'); 33 34const { 35 FunctionPrototypeCall, 36} = primordials; 37 38// This is needed for pre node 17. 39class Duplexify extends Duplex { 40 constructor(options) { 41 super(options); 42 43 // https://github.com/nodejs/node/pull/34385 44 45 if (options?.readable === false) { 46 this._readableState.readable = false; 47 this._readableState.ended = true; 48 this._readableState.endEmitted = true; 49 } 50 51 if (options?.writable === false) { 52 this._writableState.writable = false; 53 this._writableState.ending = true; 54 this._writableState.ended = true; 55 this._writableState.finished = true; 56 } 57 } 58} 59 60module.exports = function duplexify(body, name) { 61 if (isDuplexNodeStream(body)) { 62 return body; 63 } 64 65 if (isReadableNodeStream(body)) { 66 return _duplexify({ readable: body }); 67 } 68 69 if (isWritableNodeStream(body)) { 70 return _duplexify({ writable: body }); 71 } 72 73 if (isNodeStream(body)) { 74 return _duplexify({ writable: false, readable: false }); 75 } 76 77 if (isReadableStream(body)) { 78 return _duplexify({ readable: Readable.fromWeb(body) }); 79 } 80 81 if (isWritableStream(body)) { 82 return _duplexify({ writable: Writable.fromWeb(body) }); 83 } 84 85 if (typeof body === 'function') { 86 const { value, write, final, destroy } = fromAsyncGen(body); 87 88 if (isIterable(value)) { 89 return from(Duplexify, value, { 90 // TODO (ronag): highWaterMark? 91 objectMode: true, 92 write, 93 final, 94 destroy, 95 }); 96 } 97 98 const then = value?.then; 99 if (typeof then === 'function') { 100 let d; 101 102 const promise = FunctionPrototypeCall( 103 then, 104 value, 105 (val) => { 106 if (val != null) { 107 throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); 108 } 109 }, 110 (err) => { 111 destroyer(d, err); 112 }, 113 ); 114 115 return d = new Duplexify({ 116 // TODO (ronag): highWaterMark? 117 objectMode: true, 118 readable: false, 119 write, 120 final(cb) { 121 final(async () => { 122 try { 123 await promise; 124 process.nextTick(cb, null); 125 } catch (err) { 126 process.nextTick(cb, err); 127 } 128 }); 129 }, 130 destroy, 131 }); 132 } 133 134 throw new ERR_INVALID_RETURN_VALUE( 135 'Iterable, AsyncIterable or AsyncFunction', name, value); 136 } 137 138 if (isBlob(body)) { 139 return duplexify(body.arrayBuffer()); 140 } 141 142 if (isIterable(body)) { 143 return from(Duplexify, body, { 144 // TODO (ronag): highWaterMark? 145 objectMode: true, 146 writable: false, 147 }); 148 } 149 150 if ( 151 isReadableStream(body?.readable) && 152 isWritableStream(body?.writable) 153 ) { 154 return Duplexify.fromWeb(body); 155 } 156 157 if ( 158 typeof body?.writable === 'object' || 159 typeof body?.readable === 'object' 160 ) { 161 const readable = body?.readable ? 162 isReadableNodeStream(body?.readable) ? body?.readable : 163 duplexify(body.readable) : 164 undefined; 165 166 const writable = body?.writable ? 167 isWritableNodeStream(body?.writable) ? body?.writable : 168 duplexify(body.writable) : 169 undefined; 170 171 return _duplexify({ readable, writable }); 172 } 173 174 const then = body?.then; 175 if (typeof then === 'function') { 176 let d; 177 178 FunctionPrototypeCall( 179 then, 180 body, 181 (val) => { 182 if (val != null) { 183 d.push(val); 184 } 185 d.push(null); 186 }, 187 (err) => { 188 destroyer(d, err); 189 }, 190 ); 191 192 return d = new Duplexify({ 193 objectMode: true, 194 writable: false, 195 read() {}, 196 }); 197 } 198 199 throw new ERR_INVALID_ARG_TYPE( 200 name, 201 ['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable', 202 'AsyncIterable', 'Function', '{ readable, writable } pair', 'Promise'], 203 body); 204}; 205 206function fromAsyncGen(fn) { 207 let { promise, resolve } = createDeferredPromise(); 208 const ac = new AbortController(); 209 const signal = ac.signal; 210 const value = fn(async function*() { 211 while (true) { 212 const _promise = promise; 213 promise = null; 214 const { chunk, done, cb } = await _promise; 215 process.nextTick(cb); 216 if (done) return; 217 if (signal.aborted) 218 throw new AbortError(undefined, { cause: signal.reason }); 219 ({ promise, resolve } = createDeferredPromise()); 220 yield chunk; 221 } 222 }(), { signal }); 223 224 return { 225 value, 226 write(chunk, encoding, cb) { 227 const _resolve = resolve; 228 resolve = null; 229 _resolve({ chunk, done: false, cb }); 230 }, 231 final(cb) { 232 const _resolve = resolve; 233 resolve = null; 234 _resolve({ done: true, cb }); 235 }, 236 destroy(err, cb) { 237 ac.abort(); 238 cb(err); 239 }, 240 }; 241} 242 243function _duplexify(pair) { 244 const r = pair.readable && typeof pair.readable.read !== 'function' ? 245 Readable.wrap(pair.readable) : pair.readable; 246 const w = pair.writable; 247 248 let readable = !!isReadable(r); 249 let writable = !!isWritable(w); 250 251 let ondrain; 252 let onfinish; 253 let onreadable; 254 let onclose; 255 let d; 256 257 function onfinished(err) { 258 const cb = onclose; 259 onclose = null; 260 261 if (cb) { 262 cb(err); 263 } else if (err) { 264 d.destroy(err); 265 } 266 } 267 268 // TODO(ronag): Avoid double buffering. 269 // Implement Writable/Readable/Duplex traits. 270 // See, https://github.com/nodejs/node/pull/33515. 271 d = new Duplexify({ 272 // TODO (ronag): highWaterMark? 273 readableObjectMode: !!r?.readableObjectMode, 274 writableObjectMode: !!w?.writableObjectMode, 275 readable, 276 writable, 277 }); 278 279 if (writable) { 280 eos(w, (err) => { 281 writable = false; 282 if (err) { 283 destroyer(r, err); 284 } 285 onfinished(err); 286 }); 287 288 d._write = function(chunk, encoding, callback) { 289 if (w.write(chunk, encoding)) { 290 callback(); 291 } else { 292 ondrain = callback; 293 } 294 }; 295 296 d._final = function(callback) { 297 w.end(); 298 onfinish = callback; 299 }; 300 301 w.on('drain', function() { 302 if (ondrain) { 303 const cb = ondrain; 304 ondrain = null; 305 cb(); 306 } 307 }); 308 309 w.on('finish', function() { 310 if (onfinish) { 311 const cb = onfinish; 312 onfinish = null; 313 cb(); 314 } 315 }); 316 } 317 318 if (readable) { 319 eos(r, (err) => { 320 readable = false; 321 if (err) { 322 destroyer(r, err); 323 } 324 onfinished(err); 325 }); 326 327 r.on('readable', function() { 328 if (onreadable) { 329 const cb = onreadable; 330 onreadable = null; 331 cb(); 332 } 333 }); 334 335 r.on('end', function() { 336 d.push(null); 337 }); 338 339 d._read = function() { 340 while (true) { 341 const buf = r.read(); 342 343 if (buf === null) { 344 onreadable = d._read; 345 return; 346 } 347 348 if (!d.push(buf)) { 349 return; 350 } 351 } 352 }; 353 } 354 355 d._destroy = function(err, callback) { 356 if (!err && onclose !== null) { 357 err = new AbortError(); 358 } 359 360 onreadable = null; 361 ondrain = null; 362 onfinish = null; 363 364 if (onclose === null) { 365 callback(err); 366 } else { 367 onclose = callback; 368 destroyer(w, err); 369 destroyer(r, err); 370 } 371 }; 372 373 return d; 374} 375