1'use strict'; 2 3const { 4 FunctionPrototypeBind, 5 FunctionPrototypeCall, 6 ObjectDefineProperties, 7 PromisePrototypeThen, 8 PromiseResolve, 9 ReflectConstruct, 10 SymbolToStringTag, 11} = primordials; 12 13const { 14 codes: { 15 ERR_ILLEGAL_CONSTRUCTOR, 16 ERR_INVALID_ARG_VALUE, 17 ERR_INVALID_STATE, 18 ERR_INVALID_THIS, 19 }, 20} = require('internal/errors'); 21 22const { 23 DOMException, 24} = internalBinding('messaging'); 25 26const { 27 createDeferredPromise, 28 customInspectSymbol: kInspect, 29 kEmptyObject, 30 kEnumerableProperty, 31} = require('internal/util'); 32 33const { 34 kDeserialize, 35 kTransfer, 36 kTransferList, 37 makeTransferable, 38} = require('internal/worker/js_transferable'); 39 40const { 41 customInspect, 42 ensureIsPromise, 43 extractHighWaterMark, 44 extractSizeAlgorithm, 45 isBrandCheck, 46 nonOpFlush, 47 kType, 48 kState, 49} = require('internal/webstreams/util'); 50 51const { 52 ReadableStream, 53 readableStreamDefaultControllerCanCloseOrEnqueue, 54 readableStreamDefaultControllerClose, 55 readableStreamDefaultControllerEnqueue, 56 readableStreamDefaultControllerError, 57 readableStreamDefaultControllerGetDesiredSize, 58 readableStreamDefaultControllerHasBackpressure, 59} = require('internal/webstreams/readablestream'); 60 61const { 62 WritableStream, 63 writableStreamDefaultControllerErrorIfNeeded, 64} = require('internal/webstreams/writablestream'); 65 66const assert = require('internal/assert'); 67 68const getNonWritablePropertyDescriptor = (value) => { 69 return { 70 __proto__: null, 71 configurable: true, 72 value, 73 }; 74}; 75 76/** 77 * @typedef {import('./queuingstrategies').QueuingStrategy 78 * } QueuingStrategy 79 * @typedef {import('./queuingstrategies').QueuingStrategySize 80 * } QueuingStrategySize 81 */ 82 83/** 84 * @callback TransformerStartCallback 85 * @param {TransformStreamDefaultController} controller; 86 */ 87 88/** 89 * @callback TransformerFlushCallback 90 * @param {TransformStreamDefaultController} controller; 91 * @returns {Promise<void>} 92 */ 93 94/** 95 * @callback TransformerTransformCallback 96 * @param {any} chunk 97 * @param {TransformStreamDefaultController} controller 98 * @returns {Promise<void>} 99 */ 100 101/** 102 * @typedef {{ 103 * start? : TransformerStartCallback, 104 * transform? : TransformerTransformCallback, 105 * flush? : TransformerFlushCallback, 106 * readableType? : any, 107 * writableType? : any, 108 * }} Transformer 109 */ 110 111class TransformStream { 112 [kType] = 'TransformStream'; 113 114 /** 115 * @param {Transformer} [transformer] 116 * @param {QueuingStrategy} [writableStrategy] 117 * @param {QueuingStrategy} [readableStrategy] 118 */ 119 constructor( 120 transformer = null, 121 writableStrategy = kEmptyObject, 122 readableStrategy = kEmptyObject) { 123 const readableType = transformer?.readableType; 124 const writableType = transformer?.writableType; 125 const start = transformer?.start; 126 127 if (readableType !== undefined) { 128 throw new ERR_INVALID_ARG_VALUE.RangeError( 129 'transformer.readableType', 130 readableType); 131 } 132 if (writableType !== undefined) { 133 throw new ERR_INVALID_ARG_VALUE.RangeError( 134 'transformer.writableType', 135 writableType); 136 } 137 138 const readableHighWaterMark = readableStrategy?.highWaterMark; 139 const readableSize = readableStrategy?.size; 140 141 const writableHighWaterMark = writableStrategy?.highWaterMark; 142 const writableSize = writableStrategy?.size; 143 144 const actualReadableHighWaterMark = 145 extractHighWaterMark(readableHighWaterMark, 0); 146 const actualReadableSize = extractSizeAlgorithm(readableSize); 147 148 const actualWritableHighWaterMark = 149 extractHighWaterMark(writableHighWaterMark, 1); 150 const actualWritableSize = extractSizeAlgorithm(writableSize); 151 152 const startPromise = createDeferredPromise(); 153 154 initializeTransformStream( 155 this, 156 startPromise, 157 actualWritableHighWaterMark, 158 actualWritableSize, 159 actualReadableHighWaterMark, 160 actualReadableSize); 161 162 setupTransformStreamDefaultControllerFromTransformer(this, transformer); 163 164 if (start !== undefined) { 165 startPromise.resolve( 166 FunctionPrototypeCall( 167 start, 168 transformer, 169 this[kState].controller)); 170 } else { 171 startPromise.resolve(); 172 } 173 174 // eslint-disable-next-line no-constructor-return 175 return makeTransferable(this); 176 } 177 178 /** 179 * @readonly 180 * @type {ReadableStream} 181 */ 182 get readable() { 183 if (!isTransformStream(this)) 184 throw new ERR_INVALID_THIS('TransformStream'); 185 return this[kState].readable; 186 } 187 188 /** 189 * @readonly 190 * @type {WritableStream} 191 */ 192 get writable() { 193 if (!isTransformStream(this)) 194 throw new ERR_INVALID_THIS('TransformStream'); 195 return this[kState].writable; 196 } 197 198 [kInspect](depth, options) { 199 return customInspect(depth, options, this[kType], { 200 readable: this.readable, 201 writable: this.writable, 202 backpressure: this[kState].backpressure, 203 }); 204 } 205 206 [kTransfer]() { 207 if (!isTransformStream(this)) 208 throw new ERR_INVALID_THIS('TransformStream'); 209 const { 210 readable, 211 writable, 212 } = this[kState]; 213 if (readable.locked) { 214 throw new DOMException( 215 'Cannot transfer a locked ReadableStream', 216 'DataCloneError'); 217 } 218 if (writable.locked) { 219 throw new DOMException( 220 'Cannot transfer a locked WritableStream', 221 'DataCloneError'); 222 } 223 return { 224 data: { 225 readable, 226 writable, 227 }, 228 deserializeInfo: 229 'internal/webstreams/transformstream:TransferredTransformStream', 230 }; 231 } 232 233 [kTransferList]() { 234 return [ this[kState].readable, this[kState].writable ]; 235 } 236 237 [kDeserialize]({ readable, writable }) { 238 this[kState].readable = readable; 239 this[kState].writable = writable; 240 } 241} 242 243ObjectDefineProperties(TransformStream.prototype, { 244 readable: kEnumerableProperty, 245 writable: kEnumerableProperty, 246 [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name), 247}); 248 249function TransferredTransformStream() { 250 return makeTransferable(ReflectConstruct( 251 function() { 252 this[kType] = 'TransformStream'; 253 this[kState] = { 254 readable: undefined, 255 writable: undefined, 256 backpressure: undefined, 257 backpressureChange: { 258 promise: undefined, 259 resolve: undefined, 260 reject: undefined, 261 }, 262 controller: undefined, 263 }; 264 }, 265 [], TransformStream)); 266} 267TransferredTransformStream.prototype[kDeserialize] = () => {}; 268 269class TransformStreamDefaultController { 270 [kType] = 'TransformStreamDefaultController'; 271 272 constructor() { 273 throw new ERR_ILLEGAL_CONSTRUCTOR(); 274 } 275 276 /** 277 * @readonly 278 * @type {number} 279 */ 280 get desiredSize() { 281 if (!isTransformStreamDefaultController(this)) 282 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 283 const { 284 stream, 285 } = this[kState]; 286 const { 287 readable, 288 } = stream[kState]; 289 const { 290 controller: readableController, 291 } = readable[kState]; 292 return readableStreamDefaultControllerGetDesiredSize(readableController); 293 } 294 295 /** 296 * @param {any} [chunk] 297 */ 298 enqueue(chunk = undefined) { 299 if (!isTransformStreamDefaultController(this)) 300 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 301 transformStreamDefaultControllerEnqueue(this, chunk); 302 } 303 304 /** 305 * @param {any} [reason] 306 */ 307 error(reason = undefined) { 308 if (!isTransformStreamDefaultController(this)) 309 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 310 transformStreamDefaultControllerError(this, reason); 311 } 312 313 terminate() { 314 if (!isTransformStreamDefaultController(this)) 315 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 316 transformStreamDefaultControllerTerminate(this); 317 } 318 319 [kInspect](depth, options) { 320 return customInspect(depth, options, this[kType], { 321 stream: this[kState].stream, 322 }); 323 } 324} 325 326ObjectDefineProperties(TransformStreamDefaultController.prototype, { 327 desiredSize: kEnumerableProperty, 328 enqueue: kEnumerableProperty, 329 error: kEnumerableProperty, 330 terminate: kEnumerableProperty, 331 [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStreamDefaultController.name), 332}); 333 334function createTransformStreamDefaultController() { 335 return ReflectConstruct( 336 function() { 337 this[kType] = 'TransformStreamDefaultController'; 338 }, 339 [], 340 TransformStreamDefaultController); 341} 342 343const isTransformStream = 344 isBrandCheck('TransformStream'); 345const isTransformStreamDefaultController = 346 isBrandCheck('TransformStreamDefaultController'); 347 348async function defaultTransformAlgorithm(chunk, controller) { 349 transformStreamDefaultControllerEnqueue(controller, chunk); 350} 351 352function initializeTransformStream( 353 stream, 354 startPromise, 355 writableHighWaterMark, 356 writableSizeAlgorithm, 357 readableHighWaterMark, 358 readableSizeAlgorithm) { 359 360 const writable = new WritableStream({ 361 __proto__: null, 362 start() { return startPromise.promise; }, 363 write(chunk) { 364 return transformStreamDefaultSinkWriteAlgorithm(stream, chunk); 365 }, 366 abort(reason) { 367 return transformStreamDefaultSinkAbortAlgorithm(stream, reason); 368 }, 369 close() { 370 return transformStreamDefaultSinkCloseAlgorithm(stream); 371 }, 372 }, { 373 highWaterMark: writableHighWaterMark, 374 size: writableSizeAlgorithm, 375 }); 376 377 const readable = new ReadableStream({ 378 __proto__: null, 379 start() { return startPromise.promise; }, 380 pull() { 381 return transformStreamDefaultSourcePullAlgorithm(stream); 382 }, 383 cancel(reason) { 384 transformStreamErrorWritableAndUnblockWrite(stream, reason); 385 return PromiseResolve(); 386 }, 387 }, { 388 highWaterMark: readableHighWaterMark, 389 size: readableSizeAlgorithm, 390 }); 391 392 stream[kState] = { 393 readable, 394 writable, 395 controller: undefined, 396 backpressure: undefined, 397 backpressureChange: { 398 promise: undefined, 399 resolve: undefined, 400 reject: undefined, 401 }, 402 }; 403 404 transformStreamSetBackpressure(stream, true); 405} 406 407function transformStreamError(stream, error) { 408 const { 409 readable, 410 } = stream[kState]; 411 const { 412 controller, 413 } = readable[kState]; 414 readableStreamDefaultControllerError(controller, error); 415 transformStreamErrorWritableAndUnblockWrite(stream, error); 416} 417 418function transformStreamErrorWritableAndUnblockWrite(stream, error) { 419 const { 420 controller, 421 writable, 422 } = stream[kState]; 423 transformStreamDefaultControllerClearAlgorithms(controller); 424 writableStreamDefaultControllerErrorIfNeeded( 425 writable[kState].controller, 426 error); 427 if (stream[kState].backpressure) 428 transformStreamSetBackpressure(stream, false); 429} 430 431function transformStreamSetBackpressure(stream, backpressure) { 432 assert(stream[kState].backpressure !== backpressure); 433 if (stream[kState].backpressureChange.promise !== undefined) 434 stream[kState].backpressureChange.resolve?.(); 435 stream[kState].backpressureChange = createDeferredPromise(); 436 stream[kState].backpressure = backpressure; 437} 438 439function setupTransformStreamDefaultController( 440 stream, 441 controller, 442 transformAlgorithm, 443 flushAlgorithm) { 444 assert(isTransformStream(stream)); 445 assert(stream[kState].controller === undefined); 446 controller[kState] = { 447 stream, 448 transformAlgorithm, 449 flushAlgorithm, 450 }; 451 stream[kState].controller = controller; 452} 453 454function setupTransformStreamDefaultControllerFromTransformer( 455 stream, 456 transformer) { 457 const controller = createTransformStreamDefaultController(); 458 const transform = transformer?.transform || defaultTransformAlgorithm; 459 const flush = transformer?.flush || nonOpFlush; 460 const transformAlgorithm = 461 FunctionPrototypeBind(transform, transformer); 462 const flushAlgorithm = 463 FunctionPrototypeBind(flush, transformer); 464 465 setupTransformStreamDefaultController( 466 stream, 467 controller, 468 transformAlgorithm, 469 flushAlgorithm); 470} 471 472function transformStreamDefaultControllerClearAlgorithms(controller) { 473 controller[kState].transformAlgorithm = undefined; 474 controller[kState].flushAlgorithm = undefined; 475} 476 477function transformStreamDefaultControllerEnqueue(controller, chunk) { 478 const { 479 stream, 480 } = controller[kState]; 481 const { 482 readable, 483 } = stream[kState]; 484 const { 485 controller: readableController, 486 } = readable[kState]; 487 if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) 488 throw new ERR_INVALID_STATE.TypeError('Unable to enqueue'); 489 try { 490 readableStreamDefaultControllerEnqueue(readableController, chunk); 491 } catch (error) { 492 transformStreamErrorWritableAndUnblockWrite(stream, error); 493 throw readable[kState].storedError; 494 } 495 const backpressure = 496 readableStreamDefaultControllerHasBackpressure(readableController); 497 if (backpressure !== stream[kState].backpressure) { 498 assert(backpressure); 499 transformStreamSetBackpressure(stream, true); 500 } 501} 502 503function transformStreamDefaultControllerError(controller, error) { 504 transformStreamError(controller[kState].stream, error); 505} 506 507async function transformStreamDefaultControllerPerformTransform(controller, chunk) { 508 try { 509 return await ensureIsPromise( 510 controller[kState].transformAlgorithm, 511 controller, 512 chunk, 513 controller); 514 } catch (error) { 515 transformStreamError(controller[kState].stream, error); 516 throw error; 517 } 518} 519 520function transformStreamDefaultControllerTerminate(controller) { 521 const { 522 stream, 523 } = controller[kState]; 524 const { 525 readable, 526 } = stream[kState]; 527 assert(readable !== undefined); 528 const { 529 controller: readableController, 530 } = readable[kState]; 531 readableStreamDefaultControllerClose(readableController); 532 transformStreamErrorWritableAndUnblockWrite( 533 stream, 534 new ERR_INVALID_STATE.TypeError('TransformStream has been terminated')); 535} 536 537function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { 538 const { 539 writable, 540 controller, 541 } = stream[kState]; 542 assert(writable[kState].state === 'writable'); 543 if (stream[kState].backpressure) { 544 const backpressureChange = stream[kState].backpressureChange.promise; 545 return PromisePrototypeThen( 546 backpressureChange, 547 () => { 548 const { 549 writable, 550 } = stream[kState]; 551 if (writable[kState].state === 'erroring') 552 throw writable[kState].storedError; 553 assert(writable[kState].state === 'writable'); 554 return transformStreamDefaultControllerPerformTransform( 555 controller, 556 chunk); 557 }); 558 } 559 return transformStreamDefaultControllerPerformTransform(controller, chunk); 560} 561 562async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { 563 transformStreamError(stream, reason); 564} 565 566function transformStreamDefaultSinkCloseAlgorithm(stream) { 567 const { 568 readable, 569 controller, 570 } = stream[kState]; 571 572 const flushPromise = 573 ensureIsPromise( 574 controller[kState].flushAlgorithm, 575 controller, 576 controller); 577 transformStreamDefaultControllerClearAlgorithms(controller); 578 return PromisePrototypeThen( 579 flushPromise, 580 () => { 581 if (readable[kState].state === 'errored') 582 throw readable[kState].storedError; 583 readableStreamDefaultControllerClose(readable[kState].controller); 584 }, 585 (error) => { 586 transformStreamError(stream, error); 587 throw readable[kState].storedError; 588 }); 589} 590 591function transformStreamDefaultSourcePullAlgorithm(stream) { 592 assert(stream[kState].backpressure); 593 assert(stream[kState].backpressureChange.promise !== undefined); 594 transformStreamSetBackpressure(stream, false); 595 return stream[kState].backpressureChange.promise; 596} 597 598module.exports = { 599 TransformStream, 600 TransformStreamDefaultController, 601 TransferredTransformStream, 602 603 // Exported Brand Checks 604 isTransformStream, 605 isTransformStreamDefaultController, 606}; 607