• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  isReadable,
5  isWritable,
6  isIterable,
7  isNodeStream,
8  isReadableNodeStream,
9  isWritableNodeStream,
10  isDuplexNodeStream,
11  isReadableStream,
12  isWritableStream,
13} = require('internal/streams/utils');
14const eos = require('internal/streams/end-of-stream');
15const {
16  AbortError,
17  codes: {
18    ERR_INVALID_ARG_TYPE,
19    ERR_INVALID_RETURN_VALUE,
20  },
21} = require('internal/errors');
22const { destroyer } = require('internal/streams/destroy');
23const Duplex = require('internal/streams/duplex');
24const Readable = require('internal/streams/readable');
25const Writable = require('internal/streams/writable');
26const { createDeferredPromise } = require('internal/util');
27const from = require('internal/streams/from');
28
29const {
30  isBlob,
31} = require('internal/blob');
32const { AbortController } = require('internal/abort_controller');
33
34const {
35  FunctionPrototypeCall,
36} = primordials;
37
38// This is needed for pre node 17.
/**
 * Duplex subclass whose constructor force-closes a side when the caller
 * passes `readable: false` or `writable: false`.
 *
 * The internal stream state of the disabled side is marked as already
 * ended/finished so that side never becomes active.
 * See https://github.com/nodejs/node/pull/34385.
 */
class Duplexify extends Duplex {
  /**
   * @param {object} [options] Duplex options; only `readable === false` and
   *   `writable === false` get extra handling here, everything else is
   *   forwarded untouched to the Duplex constructor.
   */
  constructor(options) {
    super(options);

    // Readable side explicitly disabled: flag it as ended and already
    // reported as ended.
    if (options != null && options.readable === false) {
      const rState = this._readableState;
      rState.readable = false;
      rState.ended = true;
      rState.endEmitted = true;
    }

    // Writable side explicitly disabled: flag it as ended and finished.
    if (options != null && options.writable === false) {
      const wState = this._writableState;
      wState.writable = false;
      wState.ending = true;
      wState.ended = true;
      wState.finished = true;
    }
  }
}
59
60module.exports = function duplexify(body, name) {
61  if (isDuplexNodeStream(body)) {
62    return body;
63  }
64
65  if (isReadableNodeStream(body)) {
66    return _duplexify({ readable: body });
67  }
68
69  if (isWritableNodeStream(body)) {
70    return _duplexify({ writable: body });
71  }
72
73  if (isNodeStream(body)) {
74    return _duplexify({ writable: false, readable: false });
75  }
76
77  if (isReadableStream(body)) {
78    return _duplexify({ readable: Readable.fromWeb(body) });
79  }
80
81  if (isWritableStream(body)) {
82    return _duplexify({ writable: Writable.fromWeb(body) });
83  }
84
85  if (typeof body === 'function') {
86    const { value, write, final, destroy } = fromAsyncGen(body);
87
88    if (isIterable(value)) {
89      return from(Duplexify, value, {
90        // TODO (ronag): highWaterMark?
91        objectMode: true,
92        write,
93        final,
94        destroy,
95      });
96    }
97
98    const then = value?.then;
99    if (typeof then === 'function') {
100      let d;
101
102      const promise = FunctionPrototypeCall(
103        then,
104        value,
105        (val) => {
106          if (val != null) {
107            throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
108          }
109        },
110        (err) => {
111          destroyer(d, err);
112        },
113      );
114
115      return d = new Duplexify({
116        // TODO (ronag): highWaterMark?
117        objectMode: true,
118        readable: false,
119        write,
120        final(cb) {
121          final(async () => {
122            try {
123              await promise;
124              process.nextTick(cb, null);
125            } catch (err) {
126              process.nextTick(cb, err);
127            }
128          });
129        },
130        destroy,
131      });
132    }
133
134    throw new ERR_INVALID_RETURN_VALUE(
135      'Iterable, AsyncIterable or AsyncFunction', name, value);
136  }
137
138  if (isBlob(body)) {
139    return duplexify(body.arrayBuffer());
140  }
141
142  if (isIterable(body)) {
143    return from(Duplexify, body, {
144      // TODO (ronag): highWaterMark?
145      objectMode: true,
146      writable: false,
147    });
148  }
149
150  if (
151    isReadableStream(body?.readable) &&
152    isWritableStream(body?.writable)
153  ) {
154    return Duplexify.fromWeb(body);
155  }
156
157  if (
158    typeof body?.writable === 'object' ||
159    typeof body?.readable === 'object'
160  ) {
161    const readable = body?.readable ?
162      isReadableNodeStream(body?.readable) ? body?.readable :
163        duplexify(body.readable) :
164      undefined;
165
166    const writable = body?.writable ?
167      isWritableNodeStream(body?.writable) ? body?.writable :
168        duplexify(body.writable) :
169      undefined;
170
171    return _duplexify({ readable, writable });
172  }
173
174  const then = body?.then;
175  if (typeof then === 'function') {
176    let d;
177
178    FunctionPrototypeCall(
179      then,
180      body,
181      (val) => {
182        if (val != null) {
183          d.push(val);
184        }
185        d.push(null);
186      },
187      (err) => {
188        destroyer(d, err);
189      },
190    );
191
192    return d = new Duplexify({
193      objectMode: true,
194      writable: false,
195      read() {},
196    });
197  }
198
199  throw new ERR_INVALID_ARG_TYPE(
200    name,
201    ['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable',
202     'AsyncIterable', 'Function', '{ readable, writable } pair', 'Promise'],
203    body);
204};
205
/**
 * Bridges writable-side callbacks to an async generator handed to `fn`.
 *
 * `fn` is invoked once with (1) an async generator that yields every chunk
 * passed to the returned `write()` and completes after `final()`, and
 * (2) an options object carrying an abort `signal`.
 *
 * @param {Function} fn Called as `fn(asyncGenerator, { signal })`.
 * @returns {{ value: *, write: Function, final: Function, destroy: Function }}
 *   `value` is whatever `fn` returned; the three functions have the shape of
 *   the stream `write` / `final` / `destroy` options.
 */
function fromAsyncGen(fn) {
  const controller = new AbortController();
  const { signal } = controller;

  // One deferred per hand-off: `deliver` is consumed by write()/final(),
  // `incoming` is consumed by the generator, then both are replaced.
  let { promise: incoming, resolve: deliver } = createDeferredPromise();

  async function* chunks() {
    for (;;) {
      const awaited = incoming;
      incoming = null;
      const message = await awaited;

      // Acknowledge the write()/final() call on the next tick.
      process.nextTick(message.cb);

      if (message.done) {
        return;
      }
      if (signal.aborted) {
        throw new AbortError(undefined, { cause: signal.reason });
      }

      // Arm the next hand-off before handing the chunk to the consumer.
      ({ promise: incoming, resolve: deliver } = createDeferredPromise());
      yield message.chunk;
    }
  }

  // Resolves the pending deferred with `message`, clearing the resolver
  // first so it can only be used once.
  function handOff(message) {
    const settle = deliver;
    deliver = null;
    settle(message);
  }

  return {
    value: fn(chunks(), { signal }),
    write(chunk, encoding, cb) {
      handOff({ chunk, done: false, cb });
    },
    final(cb) {
      handOff({ done: true, cb });
    },
    destroy(err, cb) {
      controller.abort();
      cb(err);
    },
  };
}
242
/**
 * Combines an optional readable and an optional writable stream into a
 * single Duplexify instance.
 *
 * Writes to the returned stream are forwarded to `pair.writable`; data read
 * from `pair.readable` is pushed into the returned stream. Destroying the
 * returned stream destroys both underlying streams, and an error on either
 * underlying stream destroys the other one and the returned stream.
 *
 * @param {{ readable?: *, writable?: * }} pair The two halves. A readable
 *   without a `read()` function is first passed through `Readable.wrap()`.
 * @returns {Duplexify}
 */
function _duplexify(pair) {
  const r = pair.readable && typeof pair.readable.read !== 'function' ?
    Readable.wrap(pair.readable) : pair.readable;
  const w = pair.writable;

  let readable = !!isReadable(r);
  let writable = !!isWritable(w);

  // Pending callbacks parked until the matching event of the underlying
  // stream fires.
  let ondrain;
  let onfinish;
  let onreadable;
  // `undefined` until something happens; `null` once an underlying stream
  // has finished or errored; the destroy callback while a user-initiated
  // destroy is waiting for the underlying streams to close.
  let onclose;
  let d;

  function onfinished(err) {
    const cb = onclose;
    onclose = null;

    if (cb) {
      cb(err);
    } else if (err) {
      d.destroy(err);
    }
  }

  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplexify({
    // TODO (ronag): highWaterMark?
    readableObjectMode: !!r?.readableObjectMode,
    writableObjectMode: !!w?.writableObjectMode,
    readable,
    writable,
  });

  if (writable) {
    eos(w, (err) => {
      writable = false;
      if (err) {
        // The writable half failed: tear down its counterpart.
        destroyer(r, err);
      }
      onfinished(err);
    });

    d._write = function(chunk, encoding, callback) {
      if (w.write(chunk, encoding)) {
        callback();
      } else {
        // Backpressure: complete the write once `w` drains.
        ondrain = callback;
      }
    };

    d._final = function(callback) {
      // Register the callback before ending `w` so that a writable which
      // emits 'finish' synchronously from end() still completes the final
      // step instead of leaving it pending.
      onfinish = callback;
      w.end();
    };

    w.on('drain', function() {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });

    w.on('finish', function() {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }

  if (readable) {
    eos(r, (err) => {
      readable = false;
      if (err) {
        // The readable half failed: tear down the writable half as well.
        // onfinished() below nulls `onclose`, so d._destroy() would take its
        // short-circuit path and never reach `w`, leaving it open.
        destroyer(w, err);
        destroyer(r, err);
      }
      onfinished(err);
    });

    r.on('readable', function() {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });

    r.on('end', function() {
      d.push(null);
    });

    d._read = function() {
      while (true) {
        const buf = r.read();

        if (buf === null) {
          // Nothing buffered: resume when `r` signals 'readable' again.
          onreadable = d._read;
          return;
        }

        if (!d.push(buf)) {
          return;
        }
      }
    };
  }

  d._destroy = function(err, callback) {
    // Destroyed without an error while the underlying streams have not yet
    // completed: treat it as an abort.
    if (!err && onclose !== null) {
      err = new AbortError();
    }

    onreadable = null;
    ondrain = null;
    onfinish = null;

    if (onclose === null) {
      // An underlying stream already finished or errored; nothing to wait on.
      callback(err);
    } else {
      // Defer the callback until onfinished() runs for an underlying stream.
      onclose = callback;
      destroyer(w, err);
      destroyer(r, err);
    }
  };

  return d;
}
375