'use strict';

const {
  PromisePrototypeThen,
  PromiseResolve,
  SafePromiseAll,
  SafePromisePrototypeFinally,
  TypedArrayPrototypeGetBuffer,
  TypedArrayPrototypeGetByteOffset,
  TypedArrayPrototypeGetByteLength,
  Uint8Array,
} = primordials;

const { TextEncoder } = require('internal/encoding');

const {
  ReadableStream,
  isReadableStream,
} = require('internal/webstreams/readablestream');

const {
  WritableStream,
  isWritableStream,
} = require('internal/webstreams/writablestream');

const {
  CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
  Writable,
  Readable,
  Duplex,
  destroy,
} = require('stream');

const {
  isDestroyed,
  isReadable,
  isWritable,
  isWritableEnded,
} = require('internal/streams/utils');

const {
  Buffer,
} = require('buffer');

const {
  errnoException,
  codes: {
    ERR_INVALID_ARG_TYPE,
    ERR_INVALID_ARG_VALUE,
    ERR_INVALID_STATE,
    ERR_STREAM_PREMATURE_CLOSE,
  },
  AbortError,
} = require('internal/errors');

const {
  createDeferredPromise,
  kEmptyObject,
  normalizeEncoding,
} = require('internal/util');

const {
  validateBoolean,
  validateFunction,
  validateObject,
} = require('internal/validators');

const {
  WriteWrap,
  ShutdownWrap,
  kReadBytesOrError,
  kLastWriteWasAsync,
  streamBaseState,
} = internalBinding('stream_wrap');

const finished = require('internal/streams/end-of-stream');

const { UV_EOF } = internalBinding('uv');

// Shared encoder used to turn utf8 string chunks into Uint8Array chunks.
const encoder = new TextEncoder();

/**
 * @typedef {import('../../stream').Writable} Writable
 * @typedef {import('../../stream').Readable} Readable
 * @typedef {import('./writablestream').WritableStream} WritableStream
 * @typedef {import('./readablestream').ReadableStream} ReadableStream
 */

/**
 * @typedef {import('../abort_controller').AbortSignal} AbortSignal
 */

/**
 * Creates a WritableStream whose sink forwards chunks to the given
 * stream.Writable. If the given stream is already destroyed or is not
 * writable, an already-closed WritableStream is returned instead.
 * @param {Writable} streamWritable
 * @returns {WritableStream}
 * @throws {ERR_INVALID_ARG_TYPE} If `streamWritable` does not expose
 *   `write()` and `on()` functions.
 */
function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  // We check if the given stream is a stream.Writable or http.OutgoingMessage
  const checkIfWritableOrOutgoingMessage =
    streamWritable &&
    typeof streamWritable?.write === 'function' &&
    typeof streamWritable?.on === 'function';
  if (!checkIfWritableOrOutgoingMessage) {
    throw new ERR_INVALID_ARG_TYPE(
      'streamWritable',
      'stream.Writable',
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy =
    streamWritable.writableObjectMode ?
      new CountQueuingStrategy({ highWaterMark }) :
      { highWaterMark };

  // Set by the underlying sink's start(); cleared once the adapter no
  // longer needs to signal errors to the WritableStream.
  let controller;
  // Deferred promise pending while we wait for a 'drain' event.
  let backpressurePromise;
  // Deferred promise pending while we wait for end() to complete.
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined)
      backpressurePromise.resolve();
  }

  const cleanup = finished(streamWritable, (error) => {
    if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on('error', () => {});
    if (error != null) {
      if (backpressurePromise !== undefined)
        backpressurePromise.reject(error);
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      // The controller may already have been released by close() (when
      // the stream.Writable had been ended before close() ran), so guard
      // against dereferencing undefined.
      controller?.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    // Same guard as above: close() may have cleared the controller.
    controller?.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on('drain', onDrain);

  return new WritableStream({
    start(c) { controller = c; },

    async write(chunk) {
      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
        backpressurePromise = createDeferredPromise();
        return SafePromisePrototypeFinally(
          backpressurePromise.promise, () => {
            backpressurePromise = undefined;
          });
      }
    },

    abort(reason) {
      destroy(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = createDeferredPromise();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return PromiseResolve();
    },
  }, strategy);
}

/**
 * Creates a stream.Writable that forwards everything written to it to the
 * given WritableStream through a writer acquired with `getWriter()`.
 * @param {WritableStream} writableStream
 * @param {{
 *   decodeStrings? : boolean,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Writable}
 * @throws {ERR_INVALID_ARG_TYPE} If `writableStream` is not a WritableStream.
 */
function newStreamWritableFromWritableStream(writableStream, options = kEmptyObject) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'writableStream',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  validateBoolean(decodeStrings, 'options.decodeStrings');

  const writer = writableStream.getWriter();
  // Becomes true once writer.closed settles (fulfilled or rejected).
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            // On success the fulfillment value is the array of write()
            // results, not an error, so it must not reach the callback.
            () => done(),
            // On failure the rejection reason is a single error value.
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          // Re-wrap the Buffer's memory as a plain Uint8Array view.
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(writable, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      if (!closed) {
        if (error != null) {
          PromisePrototypeThen(
            writer.abort(error),
            done,
            done);
        } else {
          PromisePrototypeThen(
            writer.close(),
            done,
            done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      if (!closed) {
        PromisePrototypeThen(
          writer.close(),
          done,
          done);
      }
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      // If the WritableStream closes before the stream.Writable has been
      // ended, we signal an error on the stream.Writable.
      closed = true;
      if (!isWritableEnded(writable))
        destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      // If the WritableStream errors before the stream.Writable has been
      // destroyed, signal an error on the stream.Writable.
      closed = true;
      destroy(writable, error);
    });

  return writable;
}

/**
 * Creates a ReadableStream that is fed from the 'data' events of the given
 * stream.Readable. If the given stream is already destroyed or is not
 * readable, an already-canceled ReadableStream is returned instead.
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @param {Readable} streamReadable
 * @param {{
 *  strategy : QueuingStrategy
 * }} [options]
 * @returns {ReadableStream}
 * @throws {ERR_INVALID_ARG_TYPE} If `streamReadable` has no object-valued
 *   `_readableState`.
 */
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
  // Not using the internal/streams/utils isReadableNodeStream utility
  // here because it will return false if streamReadable is a Duplex
  // whose readable option is false. For a Duplex that is not readable,
  // we want it to pass this check but return a closed ReadableStream.
  if (typeof streamReadable?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE(
      'streamReadable',
      'stream.Readable',
      streamReadable);
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const readable = new ReadableStream();
    readable.cancel();
    return readable;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  const evaluateStrategyOrFallback = (strategy) => {
    // If there is a strategy available, use it
    if (strategy)
      return strategy;

    if (objectMode) {
      // When running in objectMode explicitly but no strategy, we just fall
      // back to CountQueuingStrategy
      return new CountQueuingStrategy({ highWaterMark });
    }

    // When not running in objectMode explicitly, we just fall
    // back to a minimal strategy that just specifies the highWaterMark
    // and no size algorithm. Using a ByteLengthQueuingStrategy here
    // is unnecessary.
    return { highWaterMark };
  };

  const strategy = evaluateStrategyOrFallback(options?.strategy);

  // Set by the underlying source's start().
  let controller;

  function onData(chunk) {
    // Copy the Buffer to detach it from the pool.
    if (Buffer.isBuffer(chunk) && !objectMode)
      chunk = new Uint8Array(chunk);
    controller.enqueue(chunk);
    // Stop the flow once the ReadableStream's queue is full; pull()
    // resumes it.
    if (controller.desiredSize <= 0)
      streamReadable.pause();
  }

  streamReadable.pause();

  const cleanup = finished(streamReadable, (error) => {
    if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamReadable.on('error', () => {});
    if (error)
      return controller.error(error);
    controller.close();
  });

  streamReadable.on('data', onData);

  return new ReadableStream({
    start(c) { controller = c; },

    pull() { streamReadable.resume(); },

    cancel(reason) {
      destroy(streamReadable, reason);
    },
  }, strategy);
}

/**
 * Creates a stream.Readable that pulls its data from the given
 * ReadableStream through a reader acquired with `getReader()`.
 * @param {ReadableStream} readableStream
 * @param {{
 *   highWaterMark? : number,
 *   encoding? : string,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Readable}
 * @throws {ERR_INVALID_ARG_TYPE} If `readableStream` is not a ReadableStream.
 * @throws {ERR_INVALID_ARG_VALUE} If `options.encoding` is given but is not
 *   a supported Buffer encoding.
 */
function newStreamReadableFromReadableStream(readableStream, options = kEmptyObject) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'readableStream',
      'ReadableStream',
      readableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;

  if (encoding !== undefined && !Buffer.isEncoding(encoding))
    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
  validateBoolean(objectMode, 'options.objectMode');

  const reader = readableStream.getReader();
  // Becomes true once reader.closed settles (fulfilled or rejected).
  let closed = false;

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      PromisePrototypeThen(
        reader.read(),
        (chunk) => {
          if (chunk.done) {
            // Value should always be undefined here.
            readable.push(null);
          } else {
            readable.push(chunk.value);
          }
        },
        (error) => destroy(readable, error));
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      if (!closed) {
        PromisePrototypeThen(
          reader.cancel(error),
          done,
          done);
        return;
      }
      done();
    },
  });

  PromisePrototypeThen(
    reader.closed,
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy(readable, error);
    });

  return readable;
}

/**
 * @typedef {import('./readablestream').ReadableWritablePair
 * } ReadableWritablePair
 * @typedef {import('../../stream').Duplex} Duplex
 */

/**
 * Creates a { readable, writable } pair of web streams backed by the given
 * stream.Duplex. A side of the duplex that is not usable (or a destroyed
 * duplex) is represented by an already-closed WritableStream and/or an
 * already-canceled ReadableStream.
 * @param {Duplex} duplex
 * @returns {ReadableWritablePair}
 * @throws {ERR_INVALID_ARG_TYPE} If `duplex` lacks an object-valued
 *   `_writableState` or `_readableState`.
 */
function newReadableWritablePairFromDuplex(duplex) {
  // Not using the internal/streams/utils isWritableNodeStream and
  // isReadableNodeStream utilities here because they will return false
  // if the duplex was created with writable or readable options set to
  // false. Instead, we'll check the readable and writable state after
  // and return closed WritableStream or closed ReadableStream as
  // necessary.
  if (typeof duplex?._writableState !== 'object' ||
      typeof duplex?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
  }

  if (isDestroyed(duplex)) {
    const writable = new WritableStream();
    const readable = new ReadableStream();
    writable.close();
    readable.cancel();
    return { readable, writable };
  }

  const writable =
    isWritable(duplex) ?
      newWritableStreamFromStreamWritable(duplex) :
      new WritableStream();

  if (!isWritable(duplex))
    writable.close();

  const readable =
    isReadable(duplex) ?
      newReadableStreamFromStreamReadable(duplex) :
      new ReadableStream();

  if (!isReadable(duplex))
    readable.cancel();

  return { writable, readable };
}

/**
 * Creates a stream.Duplex whose writable side forwards to `pair.writable`
 * and whose readable side pulls from `pair.readable`.
 * @param {ReadableWritablePair} pair
 * @param {{
 *   allowHalfOpen? : boolean,
 *   decodeStrings? : boolean,
 *   encoding? : string,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Duplex}
 * @throws {ERR_INVALID_ARG_TYPE} If `pair.readable` is not a ReadableStream
 *   or `pair.writable` is not a WritableStream.
 * @throws {ERR_INVALID_ARG_VALUE} If `options.encoding` is given but is not
 *   a supported Buffer encoding.
 */
function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = kEmptyObject) {
  validateObject(pair, 'pair');
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;

  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'pair.readable',
      'ReadableStream',
      readableStream);
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'pair.writable',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  if (encoding !== undefined && !Buffer.isEncoding(encoding))
    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  // Each flag becomes true once the corresponding `closed` promise settles.
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            // On success the fulfillment value is the array of write()
            // results, not an error, so it must not reach the callback.
            () => done(),
            // On failure the rejection reason is a single error value.
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          // Re-wrap the Buffer's memory as a plain Uint8Array view.
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(duplex, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      if (!writableClosed) {
        PromisePrototypeThen(
          writer.close(),
          done,
          done);
      }
    },

    read() {
      PromisePrototypeThen(
        reader.read(),
        (chunk) => {
          if (chunk.done) {
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy(duplex, error));
    },

    destroy(error, callback) {
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      async function closeWriter() {
        if (!writableClosed)
          await writer.abort(error);
      }

      async function closeReader() {
        if (!readableClosed)
          await reader.cancel(error);
      }

      if (!writableClosed || !readableClosed) {
        PromisePrototypeThen(
          SafePromiseAll([
            closeWriter(),
            closeReader(),
          ]),
          done,
          done);
        return;
      }

      done();
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      writableClosed = true;
      if (!isWritableEnded(duplex))
        destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    });

  PromisePrototypeThen(
    reader.closed,
    () => {
      readableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    });

  return duplex;
}

/**
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @typedef {{}} StreamBase
 * @param {StreamBase} streamBase
 * @param {QueuingStrategy} strategy
 * @returns {WritableStream}
 */
840function newWritableStreamFromStreamBase(streamBase, strategy) { 841 validateObject(streamBase, 'streamBase'); 842 843 let current; 844 845 function createWriteWrap(controller, promise) { 846 const req = new WriteWrap(); 847 req.handle = streamBase; 848 req.oncomplete = onWriteComplete; 849 req.async = false; 850 req.bytes = 0; 851 req.buffer = null; 852 req.controller = controller; 853 req.promise = promise; 854 return req; 855 } 856 857 function onWriteComplete(status) { 858 if (status < 0) { 859 const error = errnoException(status, 'write', this.error); 860 this.promise.reject(error); 861 this.controller.error(error); 862 return; 863 } 864 this.promise.resolve(); 865 } 866 867 function doWrite(chunk, controller) { 868 const promise = createDeferredPromise(); 869 let ret; 870 let req; 871 try { 872 req = createWriteWrap(controller, promise); 873 ret = streamBase.writeBuffer(req, chunk); 874 if (streamBaseState[kLastWriteWasAsync]) 875 req.buffer = chunk; 876 req.async = !!streamBaseState[kLastWriteWasAsync]; 877 } catch (error) { 878 promise.reject(error); 879 } 880 881 if (ret !== 0) 882 promise.reject(errnoException(ret, 'write', req)); 883 else if (!req.async) 884 promise.resolve(); 885 886 return promise.promise; 887 } 888 889 return new WritableStream({ 890 write(chunk, controller) { 891 current = current !== undefined ? 
892 PromisePrototypeThen( 893 current, 894 () => doWrite(chunk, controller), 895 (error) => controller.error(error)) : 896 doWrite(chunk, controller); 897 return current; 898 }, 899 900 close() { 901 const promise = createDeferredPromise(); 902 const req = new ShutdownWrap(); 903 req.oncomplete = () => promise.resolve(); 904 const err = streamBase.shutdown(req); 905 if (err === 1) 906 promise.resolve(); 907 return promise.promise; 908 }, 909 }, strategy); 910} 911 912/** 913 * @param {StreamBase} streamBase 914 * @param {QueuingStrategy} strategy 915 * @returns {ReadableStream} 916 */ 917function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) { 918 validateObject(streamBase, 'streamBase'); 919 validateObject(options, 'options'); 920 921 const { 922 ondone = () => {}, 923 } = options; 924 925 if (typeof streamBase.onread === 'function') 926 throw new ERR_INVALID_STATE('StreamBase already has a consumer'); 927 928 validateFunction(ondone, 'options.ondone'); 929 930 let controller; 931 932 streamBase.onread = (arrayBuffer) => { 933 const nread = streamBaseState[kReadBytesOrError]; 934 935 if (nread === 0) 936 return; 937 938 try { 939 if (nread === UV_EOF) { 940 controller.close(); 941 streamBase.readStop(); 942 try { 943 ondone(); 944 } catch (error) { 945 controller.error(error); 946 } 947 return; 948 } 949 950 controller.enqueue(arrayBuffer); 951 952 if (controller.desiredSize <= 0) 953 streamBase.readStop(); 954 } catch (error) { 955 controller.error(error); 956 streamBase.readStop(); 957 } 958 }; 959 960 return new ReadableStream({ 961 start(c) { controller = c; }, 962 963 pull() { 964 streamBase.readStart(); 965 }, 966 967 cancel() { 968 const promise = createDeferredPromise(); 969 try { 970 ondone(); 971 } catch (error) { 972 promise.reject(error); 973 return promise.promise; 974 } 975 const req = new ShutdownWrap(); 976 req.oncomplete = () => promise.resolve(); 977 const err = streamBase.shutdown(req); 978 if (err === 1) 979 
promise.resolve(); 980 return promise.promise; 981 }, 982 }, strategy); 983} 984 985module.exports = { 986 newWritableStreamFromStreamWritable, 987 newReadableStreamFromStreamReadable, 988 newStreamWritableFromWritableStream, 989 newStreamReadableFromReadableStream, 990 newReadableWritablePairFromDuplex, 991 newStreamDuplexFromReadableWritablePair, 992 newWritableStreamFromStreamBase, 993 newReadableStreamFromStreamBase, 994}; 995