• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  PromisePrototypeThen,
5  PromiseResolve,
6  SafePromiseAll,
7  SafePromisePrototypeFinally,
8  TypedArrayPrototypeGetBuffer,
9  TypedArrayPrototypeGetByteOffset,
10  TypedArrayPrototypeGetByteLength,
11  Uint8Array,
12} = primordials;
13
14const { TextEncoder } = require('internal/encoding');
15
16const {
17  ReadableStream,
18  isReadableStream,
19} = require('internal/webstreams/readablestream');
20
21const {
22  WritableStream,
23  isWritableStream,
24} = require('internal/webstreams/writablestream');
25
26const {
27  CountQueuingStrategy,
28} = require('internal/webstreams/queuingstrategies');
29
30const {
31  Writable,
32  Readable,
33  Duplex,
34  destroy,
35} = require('stream');
36
37const {
38  isDestroyed,
39  isReadable,
40  isWritable,
41  isWritableEnded,
42} = require('internal/streams/utils');
43
44const {
45  Buffer,
46} = require('buffer');
47
48const {
49  errnoException,
50  codes: {
51    ERR_INVALID_ARG_TYPE,
52    ERR_INVALID_ARG_VALUE,
53    ERR_INVALID_STATE,
54    ERR_STREAM_PREMATURE_CLOSE,
55  },
56  AbortError,
57} = require('internal/errors');
58
59const {
60  createDeferredPromise,
61  kEmptyObject,
62  normalizeEncoding,
63} = require('internal/util');
64
65const {
66  validateBoolean,
67  validateFunction,
68  validateObject,
69} = require('internal/validators');
70
71const {
72  WriteWrap,
73  ShutdownWrap,
74  kReadBytesOrError,
75  kLastWriteWasAsync,
76  streamBaseState,
77} = internalBinding('stream_wrap');
78
79const finished = require('internal/streams/end-of-stream');
80
81const { UV_EOF } = internalBinding('uv');
82
83const encoder = new TextEncoder();
84
85/**
86 * @typedef {import('../../stream').Writable} Writable
87 * @typedef {import('../../stream').Readable} Readable
88 * @typedef {import('./writablestream').WritableStream} WritableStream
89 * @typedef {import('./readablestream').ReadableStream} ReadableStream
90 */
91
92/**
93 * @typedef {import('../abort_controller').AbortSignal} AbortSignal
94 */
95
96/**
97 * @param {Writable} streamWritable
98 * @returns {WritableStream}
99 */
100function newWritableStreamFromStreamWritable(streamWritable) {
101  // Not using the internal/streams/utils isWritableNodeStream utility
102  // here because it will return false if streamWritable is a Duplex
103  // whose writable option is false. For a Duplex that is not writable,
104  // we want it to pass this check but return a closed WritableStream.
105  // We check if the given stream is a stream.Writable or http.OutgoingMessage
106  const checkIfWritableOrOutgoingMessage =
107    streamWritable &&
108    typeof streamWritable?.write === 'function' &&
109    typeof streamWritable?.on === 'function';
110  if (!checkIfWritableOrOutgoingMessage) {
111    throw new ERR_INVALID_ARG_TYPE(
112      'streamWritable',
113      'stream.Writable',
114      streamWritable,
115    );
116  }
117
118  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
119    const writable = new WritableStream();
120    writable.close();
121    return writable;
122  }
123
124  const highWaterMark = streamWritable.writableHighWaterMark;
125  const strategy =
126    streamWritable.writableObjectMode ?
127      new CountQueuingStrategy({ highWaterMark }) :
128      { highWaterMark };
129
130  let controller;
131  let backpressurePromise;
132  let closed;
133
134  function onDrain() {
135    if (backpressurePromise !== undefined)
136      backpressurePromise.resolve();
137  }
138
139  const cleanup = finished(streamWritable, (error) => {
140    if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
141      const err = new AbortError(undefined, { cause: error });
142      error = err;
143    }
144
145    cleanup();
146    // This is a protection against non-standard, legacy streams
147    // that happen to emit an error event again after finished is called.
148    streamWritable.on('error', () => {});
149    if (error != null) {
150      if (backpressurePromise !== undefined)
151        backpressurePromise.reject(error);
152      // If closed is not undefined, the error is happening
153      // after the WritableStream close has already started.
154      // We need to reject it here.
155      if (closed !== undefined) {
156        closed.reject(error);
157        closed = undefined;
158      }
159      controller.error(error);
160      controller = undefined;
161      return;
162    }
163
164    if (closed !== undefined) {
165      closed.resolve();
166      closed = undefined;
167      return;
168    }
169    controller.error(new AbortError());
170    controller = undefined;
171  });
172
173  streamWritable.on('drain', onDrain);
174
175  return new WritableStream({
176    start(c) { controller = c; },
177
178    async write(chunk) {
179      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
180        backpressurePromise = createDeferredPromise();
181        return SafePromisePrototypeFinally(
182          backpressurePromise.promise, () => {
183            backpressurePromise = undefined;
184          });
185      }
186    },
187
188    abort(reason) {
189      destroy(streamWritable, reason);
190    },
191
192    close() {
193      if (closed === undefined && !isWritableEnded(streamWritable)) {
194        closed = createDeferredPromise();
195        streamWritable.end();
196        return closed.promise;
197      }
198
199      controller = undefined;
200      return PromiseResolve();
201    },
202  }, strategy);
203}
204
205/**
206 * @param {WritableStream} writableStream
207 * @param {{
208 *   decodeStrings? : boolean,
209 *   highWaterMark? : number,
210 *   objectMode? : boolean,
211 *   signal? : AbortSignal,
212 * }} [options]
213 * @returns {Writable}
214 */
215function newStreamWritableFromWritableStream(writableStream, options = kEmptyObject) {
216  if (!isWritableStream(writableStream)) {
217    throw new ERR_INVALID_ARG_TYPE(
218      'writableStream',
219      'WritableStream',
220      writableStream);
221  }
222
223  validateObject(options, 'options');
224  const {
225    highWaterMark,
226    decodeStrings = true,
227    objectMode = false,
228    signal,
229  } = options;
230
231  validateBoolean(objectMode, 'options.objectMode');
232  validateBoolean(decodeStrings, 'options.decodeStrings');
233
234  const writer = writableStream.getWriter();
235  let closed = false;
236
237  const writable = new Writable({
238    highWaterMark,
239    objectMode,
240    decodeStrings,
241    signal,
242
243    writev(chunks, callback) {
244      function done(error) {
245        error = error.filter((e) => e);
246        try {
247          callback(error.length === 0 ? undefined : error);
248        } catch (error) {
249          // In a next tick because this is happening within
250          // a promise context, and if there are any errors
251          // thrown we don't want those to cause an unhandled
252          // rejection. Let's just escape the promise and
253          // handle it separately.
254          process.nextTick(() => destroy(writable, error));
255        }
256      }
257
258      PromisePrototypeThen(
259        writer.ready,
260        () => {
261          return PromisePrototypeThen(
262            SafePromiseAll(
263              chunks,
264              (data) => writer.write(data.chunk)),
265            done,
266            done);
267        },
268        done);
269    },
270
271    write(chunk, encoding, callback) {
272      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
273        const enc = normalizeEncoding(encoding);
274
275        if (enc === 'utf8') {
276          chunk = encoder.encode(chunk);
277        } else {
278          chunk = Buffer.from(chunk, encoding);
279          chunk = new Uint8Array(
280            TypedArrayPrototypeGetBuffer(chunk),
281            TypedArrayPrototypeGetByteOffset(chunk),
282            TypedArrayPrototypeGetByteLength(chunk),
283          );
284        }
285      }
286
287      function done(error) {
288        try {
289          callback(error);
290        } catch (error) {
291          destroy(writable, error);
292        }
293      }
294
295      PromisePrototypeThen(
296        writer.ready,
297        () => {
298          return PromisePrototypeThen(
299            writer.write(chunk),
300            done,
301            done);
302        },
303        done);
304    },
305
306    destroy(error, callback) {
307      function done() {
308        try {
309          callback(error);
310        } catch (error) {
311          // In a next tick because this is happening within
312          // a promise context, and if there are any errors
313          // thrown we don't want those to cause an unhandled
314          // rejection. Let's just escape the promise and
315          // handle it separately.
316          process.nextTick(() => { throw error; });
317        }
318      }
319
320      if (!closed) {
321        if (error != null) {
322          PromisePrototypeThen(
323            writer.abort(error),
324            done,
325            done);
326        } else {
327          PromisePrototypeThen(
328            writer.close(),
329            done,
330            done);
331        }
332        return;
333      }
334
335      done();
336    },
337
338    final(callback) {
339      function done(error) {
340        try {
341          callback(error);
342        } catch (error) {
343          // In a next tick because this is happening within
344          // a promise context, and if there are any errors
345          // thrown we don't want those to cause an unhandled
346          // rejection. Let's just escape the promise and
347          // handle it separately.
348          process.nextTick(() => destroy(writable, error));
349        }
350      }
351
352      if (!closed) {
353        PromisePrototypeThen(
354          writer.close(),
355          done,
356          done);
357      }
358    },
359  });
360
361  PromisePrototypeThen(
362    writer.closed,
363    () => {
364      // If the WritableStream closes before the stream.Writable has been
365      // ended, we signal an error on the stream.Writable.
366      closed = true;
367      if (!isWritableEnded(writable))
368        destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
369    },
370    (error) => {
371      // If the WritableStream errors before the stream.Writable has been
372      // destroyed, signal an error on the stream.Writable.
373      closed = true;
374      destroy(writable, error);
375    });
376
377  return writable;
378}
379
380/**
381 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
382 * @param {Readable} streamReadable
383 * @param {{
384 *  strategy : QueuingStrategy
385 * }} [options]
386 * @returns {ReadableStream}
387 */
388function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
389  // Not using the internal/streams/utils isReadableNodeStream utility
390  // here because it will return false if streamReadable is a Duplex
391  // whose readable option is false. For a Duplex that is not readable,
392  // we want it to pass this check but return a closed ReadableStream.
393  if (typeof streamReadable?._readableState !== 'object') {
394    throw new ERR_INVALID_ARG_TYPE(
395      'streamReadable',
396      'stream.Readable',
397      streamReadable);
398  }
399
400  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
401    const readable = new ReadableStream();
402    readable.cancel();
403    return readable;
404  }
405
406  const objectMode = streamReadable.readableObjectMode;
407  const highWaterMark = streamReadable.readableHighWaterMark;
408
409  const evaluateStrategyOrFallback = (strategy) => {
410    // If there is a strategy available, use it
411    if (strategy)
412      return strategy;
413
414    if (objectMode) {
415      // When running in objectMode explicitly but no strategy, we just fall
416      // back to CountQueuingStrategy
417      return new CountQueuingStrategy({ highWaterMark });
418    }
419
420    // When not running in objectMode explicitly, we just fall
421    // back to a minimal strategy that just specifies the highWaterMark
422    // and no size algorithm. Using a ByteLengthQueuingStrategy here
423    // is unnecessary.
424    return { highWaterMark };
425  };
426
427  const strategy = evaluateStrategyOrFallback(options?.strategy);
428
429  let controller;
430
431  function onData(chunk) {
432    // Copy the Buffer to detach it from the pool.
433    if (Buffer.isBuffer(chunk) && !objectMode)
434      chunk = new Uint8Array(chunk);
435    controller.enqueue(chunk);
436    if (controller.desiredSize <= 0)
437      streamReadable.pause();
438  }
439
440  streamReadable.pause();
441
442  const cleanup = finished(streamReadable, (error) => {
443    if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
444      const err = new AbortError(undefined, { cause: error });
445      error = err;
446    }
447
448    cleanup();
449    // This is a protection against non-standard, legacy streams
450    // that happen to emit an error event again after finished is called.
451    streamReadable.on('error', () => {});
452    if (error)
453      return controller.error(error);
454    controller.close();
455  });
456
457  streamReadable.on('data', onData);
458
459  return new ReadableStream({
460    start(c) { controller = c; },
461
462    pull() { streamReadable.resume(); },
463
464    cancel(reason) {
465      destroy(streamReadable, reason);
466    },
467  }, strategy);
468}
469
470/**
471 * @param {ReadableStream} readableStream
472 * @param {{
473 *   highWaterMark? : number,
474 *   encoding? : string,
475 *   objectMode? : boolean,
476 *   signal? : AbortSignal,
477 * }} [options]
478 * @returns {Readable}
479 */
480function newStreamReadableFromReadableStream(readableStream, options = kEmptyObject) {
481  if (!isReadableStream(readableStream)) {
482    throw new ERR_INVALID_ARG_TYPE(
483      'readableStream',
484      'ReadableStream',
485      readableStream);
486  }
487
488  validateObject(options, 'options');
489  const {
490    highWaterMark,
491    encoding,
492    objectMode = false,
493    signal,
494  } = options;
495
496  if (encoding !== undefined && !Buffer.isEncoding(encoding))
497    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
498  validateBoolean(objectMode, 'options.objectMode');
499
500  const reader = readableStream.getReader();
501  let closed = false;
502
503  const readable = new Readable({
504    objectMode,
505    highWaterMark,
506    encoding,
507    signal,
508
509    read() {
510      PromisePrototypeThen(
511        reader.read(),
512        (chunk) => {
513          if (chunk.done) {
514            // Value should always be undefined here.
515            readable.push(null);
516          } else {
517            readable.push(chunk.value);
518          }
519        },
520        (error) => destroy(readable, error));
521    },
522
523    destroy(error, callback) {
524      function done() {
525        try {
526          callback(error);
527        } catch (error) {
528          // In a next tick because this is happening within
529          // a promise context, and if there are any errors
530          // thrown we don't want those to cause an unhandled
531          // rejection. Let's just escape the promise and
532          // handle it separately.
533          process.nextTick(() => { throw error; });
534        }
535      }
536
537      if (!closed) {
538        PromisePrototypeThen(
539          reader.cancel(error),
540          done,
541          done);
542        return;
543      }
544      done();
545    },
546  });
547
548  PromisePrototypeThen(
549    reader.closed,
550    () => {
551      closed = true;
552    },
553    (error) => {
554      closed = true;
555      destroy(readable, error);
556    });
557
558  return readable;
559}
560
561/**
562 * @typedef {import('./readablestream').ReadableWritablePair
563 * } ReadableWritablePair
564 * @typedef {import('../../stream').Duplex} Duplex
565 */
566
567/**
568 * @param {Duplex} duplex
569 * @returns {ReadableWritablePair}
570 */
571function newReadableWritablePairFromDuplex(duplex) {
572  // Not using the internal/streams/utils isWritableNodeStream and
573  // isReadableNodeStream utilities here because they will return false
574  // if the duplex was created with writable or readable options set to
575  // false. Instead, we'll check the readable and writable state after
576  // and return closed WritableStream or closed ReadableStream as
577  // necessary.
578  if (typeof duplex?._writableState !== 'object' ||
579      typeof duplex?._readableState !== 'object') {
580    throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
581  }
582
583  if (isDestroyed(duplex)) {
584    const writable = new WritableStream();
585    const readable = new ReadableStream();
586    writable.close();
587    readable.cancel();
588    return { readable, writable };
589  }
590
591  const writable =
592    isWritable(duplex) ?
593      newWritableStreamFromStreamWritable(duplex) :
594      new WritableStream();
595
596  if (!isWritable(duplex))
597    writable.close();
598
599  const readable =
600    isReadable(duplex) ?
601      newReadableStreamFromStreamReadable(duplex) :
602      new ReadableStream();
603
604  if (!isReadable(duplex))
605    readable.cancel();
606
607  return { writable, readable };
608}
609
610/**
611 * @param {ReadableWritablePair} pair
612 * @param {{
613 *   allowHalfOpen? : boolean,
614 *   decodeStrings? : boolean,
615 *   encoding? : string,
616 *   highWaterMark? : number,
617 *   objectMode? : boolean,
618 *   signal? : AbortSignal,
619 * }} [options]
620 * @returns {Duplex}
621 */
622function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = kEmptyObject) {
623  validateObject(pair, 'pair');
624  const {
625    readable: readableStream,
626    writable: writableStream,
627  } = pair;
628
629  if (!isReadableStream(readableStream)) {
630    throw new ERR_INVALID_ARG_TYPE(
631      'pair.readable',
632      'ReadableStream',
633      readableStream);
634  }
635  if (!isWritableStream(writableStream)) {
636    throw new ERR_INVALID_ARG_TYPE(
637      'pair.writable',
638      'WritableStream',
639      writableStream);
640  }
641
642  validateObject(options, 'options');
643  const {
644    allowHalfOpen = false,
645    objectMode = false,
646    encoding,
647    decodeStrings = true,
648    highWaterMark,
649    signal,
650  } = options;
651
652  validateBoolean(objectMode, 'options.objectMode');
653  if (encoding !== undefined && !Buffer.isEncoding(encoding))
654    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
655
656  const writer = writableStream.getWriter();
657  const reader = readableStream.getReader();
658  let writableClosed = false;
659  let readableClosed = false;
660
661  const duplex = new Duplex({
662    allowHalfOpen,
663    highWaterMark,
664    objectMode,
665    encoding,
666    decodeStrings,
667    signal,
668
669    writev(chunks, callback) {
670      function done(error) {
671        error = error.filter((e) => e);
672        try {
673          callback(error.length === 0 ? undefined : error);
674        } catch (error) {
675          // In a next tick because this is happening within
676          // a promise context, and if there are any errors
677          // thrown we don't want those to cause an unhandled
678          // rejection. Let's just escape the promise and
679          // handle it separately.
680          process.nextTick(() => destroy(duplex, error));
681        }
682      }
683
684      PromisePrototypeThen(
685        writer.ready,
686        () => {
687          return PromisePrototypeThen(
688            SafePromiseAll(
689              chunks,
690              (data) => writer.write(data.chunk)),
691            done,
692            done);
693        },
694        done);
695    },
696
697    write(chunk, encoding, callback) {
698      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
699        const enc = normalizeEncoding(encoding);
700
701        if (enc === 'utf8') {
702          chunk = encoder.encode(chunk);
703        } else {
704          chunk = Buffer.from(chunk, encoding);
705          chunk = new Uint8Array(
706            TypedArrayPrototypeGetBuffer(chunk),
707            TypedArrayPrototypeGetByteOffset(chunk),
708            TypedArrayPrototypeGetByteLength(chunk),
709          );
710        }
711      }
712
713      function done(error) {
714        try {
715          callback(error);
716        } catch (error) {
717          destroy(duplex, error);
718        }
719      }
720
721      PromisePrototypeThen(
722        writer.ready,
723        () => {
724          return PromisePrototypeThen(
725            writer.write(chunk),
726            done,
727            done);
728        },
729        done);
730    },
731
732    final(callback) {
733      function done(error) {
734        try {
735          callback(error);
736        } catch (error) {
737          // In a next tick because this is happening within
738          // a promise context, and if there are any errors
739          // thrown we don't want those to cause an unhandled
740          // rejection. Let's just escape the promise and
741          // handle it separately.
742          process.nextTick(() => destroy(duplex, error));
743        }
744      }
745
746      if (!writableClosed) {
747        PromisePrototypeThen(
748          writer.close(),
749          done,
750          done);
751      }
752    },
753
754    read() {
755      PromisePrototypeThen(
756        reader.read(),
757        (chunk) => {
758          if (chunk.done) {
759            duplex.push(null);
760          } else {
761            duplex.push(chunk.value);
762          }
763        },
764        (error) => destroy(duplex, error));
765    },
766
767    destroy(error, callback) {
768      function done() {
769        try {
770          callback(error);
771        } catch (error) {
772          // In a next tick because this is happening within
773          // a promise context, and if there are any errors
774          // thrown we don't want those to cause an unhandled
775          // rejection. Let's just escape the promise and
776          // handle it separately.
777          process.nextTick(() => { throw error; });
778        }
779      }
780
781      async function closeWriter() {
782        if (!writableClosed)
783          await writer.abort(error);
784      }
785
786      async function closeReader() {
787        if (!readableClosed)
788          await reader.cancel(error);
789      }
790
791      if (!writableClosed || !readableClosed) {
792        PromisePrototypeThen(
793          SafePromiseAll([
794            closeWriter(),
795            closeReader(),
796          ]),
797          done,
798          done);
799        return;
800      }
801
802      done();
803    },
804  });
805
806  PromisePrototypeThen(
807    writer.closed,
808    () => {
809      writableClosed = true;
810      if (!isWritableEnded(duplex))
811        destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
812    },
813    (error) => {
814      writableClosed = true;
815      readableClosed = true;
816      destroy(duplex, error);
817    });
818
819  PromisePrototypeThen(
820    reader.closed,
821    () => {
822      readableClosed = true;
823    },
824    (error) => {
825      writableClosed = true;
826      readableClosed = true;
827      destroy(duplex, error);
828    });
829
830  return duplex;
831}
832
833/**
834 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
835 * @typedef {{}} StreamBase
836 * @param {StreamBase} streamBase
837 * @param {QueuingStrategy} strategy
838 * @returns {WritableStream}
839 */
840function newWritableStreamFromStreamBase(streamBase, strategy) {
841  validateObject(streamBase, 'streamBase');
842
843  let current;
844
845  function createWriteWrap(controller, promise) {
846    const req = new WriteWrap();
847    req.handle = streamBase;
848    req.oncomplete = onWriteComplete;
849    req.async = false;
850    req.bytes = 0;
851    req.buffer = null;
852    req.controller = controller;
853    req.promise = promise;
854    return req;
855  }
856
857  function onWriteComplete(status) {
858    if (status < 0) {
859      const error = errnoException(status, 'write', this.error);
860      this.promise.reject(error);
861      this.controller.error(error);
862      return;
863    }
864    this.promise.resolve();
865  }
866
867  function doWrite(chunk, controller) {
868    const promise = createDeferredPromise();
869    let ret;
870    let req;
871    try {
872      req = createWriteWrap(controller, promise);
873      ret = streamBase.writeBuffer(req, chunk);
874      if (streamBaseState[kLastWriteWasAsync])
875        req.buffer = chunk;
876      req.async = !!streamBaseState[kLastWriteWasAsync];
877    } catch (error) {
878      promise.reject(error);
879    }
880
881    if (ret !== 0)
882      promise.reject(errnoException(ret, 'write', req));
883    else if (!req.async)
884      promise.resolve();
885
886    return promise.promise;
887  }
888
889  return new WritableStream({
890    write(chunk, controller) {
891      current = current !== undefined ?
892        PromisePrototypeThen(
893          current,
894          () => doWrite(chunk, controller),
895          (error) => controller.error(error)) :
896        doWrite(chunk, controller);
897      return current;
898    },
899
900    close() {
901      const promise = createDeferredPromise();
902      const req = new ShutdownWrap();
903      req.oncomplete = () => promise.resolve();
904      const err = streamBase.shutdown(req);
905      if (err === 1)
906        promise.resolve();
907      return promise.promise;
908    },
909  }, strategy);
910}
911
912/**
913 * @param {StreamBase} streamBase
914 * @param {QueuingStrategy} strategy
915 * @returns {ReadableStream}
916 */
917function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) {
918  validateObject(streamBase, 'streamBase');
919  validateObject(options, 'options');
920
921  const {
922    ondone = () => {},
923  } = options;
924
925  if (typeof streamBase.onread === 'function')
926    throw new ERR_INVALID_STATE('StreamBase already has a consumer');
927
928  validateFunction(ondone, 'options.ondone');
929
930  let controller;
931
932  streamBase.onread = (arrayBuffer) => {
933    const nread = streamBaseState[kReadBytesOrError];
934
935    if (nread === 0)
936      return;
937
938    try {
939      if (nread === UV_EOF) {
940        controller.close();
941        streamBase.readStop();
942        try {
943          ondone();
944        } catch (error) {
945          controller.error(error);
946        }
947        return;
948      }
949
950      controller.enqueue(arrayBuffer);
951
952      if (controller.desiredSize <= 0)
953        streamBase.readStop();
954    } catch (error) {
955      controller.error(error);
956      streamBase.readStop();
957    }
958  };
959
960  return new ReadableStream({
961    start(c) { controller = c; },
962
963    pull() {
964      streamBase.readStart();
965    },
966
967    cancel() {
968      const promise = createDeferredPromise();
969      try {
970        ondone();
971      } catch (error) {
972        promise.reject(error);
973        return promise.promise;
974      }
975      const req = new ShutdownWrap();
976      req.oncomplete = () => promise.resolve();
977      const err = streamBase.shutdown(req);
978      if (err === 1)
979        promise.resolve();
980      return promise.promise;
981    },
982  }, strategy);
983}
984
985module.exports = {
986  newWritableStreamFromStreamWritable,
987  newReadableStreamFromStreamReadable,
988  newStreamWritableFromWritableStream,
989  newStreamReadableFromReadableStream,
990  newReadableWritablePairFromDuplex,
991  newStreamDuplexFromReadableWritablePair,
992  newWritableStreamFromStreamBase,
993  newReadableStreamFromStreamBase,
994};
995