1'use strict'; 2 3const { 4 FunctionPrototypeBind, 5 FunctionPrototypeCall, 6 ObjectDefineProperties, 7 PromisePrototypeThen, 8 PromiseResolve, 9 ReflectConstruct, 10 SymbolToStringTag, 11 Symbol, 12} = primordials; 13 14const { 15 codes: { 16 ERR_ILLEGAL_CONSTRUCTOR, 17 ERR_INVALID_ARG_VALUE, 18 ERR_INVALID_STATE, 19 ERR_INVALID_THIS, 20 }, 21} = require('internal/errors'); 22 23const { 24 DOMException, 25} = internalBinding('messaging'); 26 27const { 28 createDeferredPromise, 29 customInspectSymbol: kInspect, 30 kEmptyObject, 31 kEnumerableProperty, 32} = require('internal/util'); 33 34const { 35 kDeserialize, 36 kTransfer, 37 kTransferList, 38 makeTransferable, 39} = require('internal/worker/js_transferable'); 40 41const { 42 customInspect, 43 ensureIsPromise, 44 extractHighWaterMark, 45 extractSizeAlgorithm, 46 isBrandCheck, 47 nonOpFlush, 48 kType, 49 kState, 50} = require('internal/webstreams/util'); 51 52const { 53 ReadableStream, 54 readableStreamDefaultControllerCanCloseOrEnqueue, 55 readableStreamDefaultControllerClose, 56 readableStreamDefaultControllerEnqueue, 57 readableStreamDefaultControllerError, 58 readableStreamDefaultControllerGetDesiredSize, 59 readableStreamDefaultControllerHasBackpressure, 60} = require('internal/webstreams/readablestream'); 61 62const { 63 WritableStream, 64 writableStreamDefaultControllerErrorIfNeeded, 65} = require('internal/webstreams/writablestream'); 66 67const assert = require('internal/assert'); 68 69const kSkipThrow = Symbol('kSkipThrow'); 70 71const getNonWritablePropertyDescriptor = (value) => { 72 return { 73 __proto__: null, 74 configurable: true, 75 value, 76 }; 77}; 78 79/** 80 * @typedef {import('./queuingstrategies').QueuingStrategy 81 * } QueuingStrategy 82 * @typedef {import('./queuingstrategies').QueuingStrategySize 83 * } QueuingStrategySize 84 */ 85 86/** 87 * @callback TransformerStartCallback 88 * @param {TransformStreamDefaultController} controller; 89 */ 90 91/** 92 * @callback TransformerFlushCallback 93 * @param {TransformStreamDefaultController} controller; 94 * @returns {Promise<void>} 95 */ 96 97/** 98 * @callback TransformerTransformCallback 99 * @param {any} chunk 100 * @param {TransformStreamDefaultController} controller 101 * @returns {Promise<void>} 102 */ 103 104/** 105 * @typedef {{ 106 * start? : TransformerStartCallback, 107 * transform? : TransformerTransformCallback, 108 * flush? : TransformerFlushCallback, 109 * readableType? : any, 110 * writableType? : any, 111 * }} Transformer 112 */ 113 114class TransformStream { 115 [kType] = 'TransformStream'; 116 117 /** 118 * @param {Transformer} [transformer] 119 * @param {QueuingStrategy} [writableStrategy] 120 * @param {QueuingStrategy} [readableStrategy] 121 */ 122 constructor( 123 transformer = null, 124 writableStrategy = kEmptyObject, 125 readableStrategy = kEmptyObject) { 126 const readableType = transformer?.readableType; 127 const writableType = transformer?.writableType; 128 const start = transformer?.start; 129 130 if (readableType !== undefined) { 131 throw new ERR_INVALID_ARG_VALUE.RangeError( 132 'transformer.readableType', 133 readableType); 134 } 135 if (writableType !== undefined) { 136 throw new ERR_INVALID_ARG_VALUE.RangeError( 137 'transformer.writableType', 138 writableType); 139 } 140 141 const readableHighWaterMark = readableStrategy?.highWaterMark; 142 const readableSize = readableStrategy?.size; 143 144 const writableHighWaterMark = writableStrategy?.highWaterMark; 145 const writableSize = writableStrategy?.size; 146 147 const actualReadableHighWaterMark = 148 extractHighWaterMark(readableHighWaterMark, 0); 149 const actualReadableSize = extractSizeAlgorithm(readableSize); 150 151 const actualWritableHighWaterMark = 152 extractHighWaterMark(writableHighWaterMark, 1); 153 const actualWritableSize = extractSizeAlgorithm(writableSize); 154 155 const startPromise = createDeferredPromise(); 156 157 initializeTransformStream( 158 this, 159 startPromise, 160 actualWritableHighWaterMark, 161 actualWritableSize, 162 actualReadableHighWaterMark, 163 actualReadableSize); 164 165 setupTransformStreamDefaultControllerFromTransformer(this, transformer); 166 167 if (start !== undefined) { 168 startPromise.resolve( 169 FunctionPrototypeCall( 170 start, 171 transformer, 172 this[kState].controller)); 173 } else { 174 startPromise.resolve(); 175 } 176 177 // eslint-disable-next-line no-constructor-return 178 return makeTransferable(this); 179 } 180 181 /** 182 * @readonly 183 * @type {ReadableStream} 184 */ 185 get readable() { 186 if (!isTransformStream(this)) 187 throw new ERR_INVALID_THIS('TransformStream'); 188 return this[kState].readable; 189 } 190 191 /** 192 * @readonly 193 * @type {WritableStream} 194 */ 195 get writable() { 196 if (!isTransformStream(this)) 197 throw new ERR_INVALID_THIS('TransformStream'); 198 return this[kState].writable; 199 } 200 201 [kInspect](depth, options) { 202 return customInspect(depth, options, this[kType], { 203 readable: this.readable, 204 writable: this.writable, 205 backpressure: this[kState].backpressure, 206 }); 207 } 208 209 [kTransfer]() { 210 if (!isTransformStream(this)) 211 throw new ERR_INVALID_THIS('TransformStream'); 212 const { 213 readable, 214 writable, 215 } = this[kState]; 216 if (readable.locked) { 217 throw new DOMException( 218 'Cannot transfer a locked ReadableStream', 219 'DataCloneError'); 220 } 221 if (writable.locked) { 222 throw new DOMException( 223 'Cannot transfer a locked WritableStream', 224 'DataCloneError'); 225 } 226 return { 227 data: { 228 readable, 229 writable, 230 }, 231 deserializeInfo: 232 'internal/webstreams/transformstream:TransferredTransformStream', 233 }; 234 } 235 236 [kTransferList]() { 237 return [ this[kState].readable, this[kState].writable ]; 238 } 239 240 [kDeserialize]({ readable, writable }) { 241 this[kState].readable = readable; 242 this[kState].writable = writable; 243 } 244} 245 246ObjectDefineProperties(TransformStream.prototype, { 247 readable: kEnumerableProperty, 248 writable: kEnumerableProperty, 249 [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name), 250}); 251 252function TransferredTransformStream() { 253 return makeTransferable(ReflectConstruct( 254 function() { 255 this[kType] = 'TransformStream'; 256 this[kState] = { 257 readable: undefined, 258 writable: undefined, 259 backpressure: undefined, 260 backpressureChange: { 261 promise: undefined, 262 resolve: undefined, 263 reject: undefined, 264 }, 265 controller: undefined, 266 }; 267 }, 268 [], TransformStream)); 269} 270TransferredTransformStream.prototype[kDeserialize] = () => {}; 271 272class TransformStreamDefaultController { 273 [kType] = 'TransformStreamDefaultController'; 274 275 constructor(skipThrowSymbol = undefined) { 276 if (skipThrowSymbol !== kSkipThrow) { 277 throw new ERR_ILLEGAL_CONSTRUCTOR(); 278 } 279 } 280 281 /** 282 * @readonly 283 * @type {number} 284 */ 285 get desiredSize() { 286 if (!isTransformStreamDefaultController(this)) 287 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 288 const { 289 stream, 290 } = this[kState]; 291 const { 292 readable, 293 } = stream[kState]; 294 const { 295 controller: readableController, 296 } = readable[kState]; 297 return readableStreamDefaultControllerGetDesiredSize(readableController); 298 } 299 300 /** 301 * @param {any} [chunk] 302 */ 303 enqueue(chunk = undefined) { 304 if (!isTransformStreamDefaultController(this)) 305 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 306 transformStreamDefaultControllerEnqueue(this, chunk); 307 } 308 309 /** 310 * @param {any} [reason] 311 */ 312 error(reason = undefined) { 313 if (!isTransformStreamDefaultController(this)) 314 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 315 transformStreamDefaultControllerError(this, reason); 316 } 317 318 terminate() { 319 if (!isTransformStreamDefaultController(this)) 320 throw new ERR_INVALID_THIS('TransformStreamDefaultController'); 321 transformStreamDefaultControllerTerminate(this); 322 } 323 324 [kInspect](depth, options) { 325 return customInspect(depth, options, this[kType], { 326 stream: this[kState].stream, 327 }); 328 } 329} 330 331ObjectDefineProperties(TransformStreamDefaultController.prototype, { 332 desiredSize: kEnumerableProperty, 333 enqueue: kEnumerableProperty, 334 error: kEnumerableProperty, 335 terminate: kEnumerableProperty, 336 [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStreamDefaultController.name), 337}); 338 339const isTransformStream = 340 isBrandCheck('TransformStream'); 341const isTransformStreamDefaultController = 342 isBrandCheck('TransformStreamDefaultController'); 343 344async function defaultTransformAlgorithm(chunk, controller) { 345 transformStreamDefaultControllerEnqueue(controller, chunk); 346} 347 348function initializeTransformStream( 349 stream, 350 startPromise, 351 writableHighWaterMark, 352 writableSizeAlgorithm, 353 readableHighWaterMark, 354 readableSizeAlgorithm) { 355 356 const writable = new WritableStream({ 357 __proto__: null, 358 start() { return startPromise.promise; }, 359 write(chunk) { 360 return transformStreamDefaultSinkWriteAlgorithm(stream, chunk); 361 }, 362 abort(reason) { 363 return transformStreamDefaultSinkAbortAlgorithm(stream, reason); 364 }, 365 close() { 366 return transformStreamDefaultSinkCloseAlgorithm(stream); 367 }, 368 }, { 369 highWaterMark: writableHighWaterMark, 370 size: writableSizeAlgorithm, 371 }); 372 373 const readable = new ReadableStream({ 374 __proto__: null, 375 start() { return startPromise.promise; }, 376 pull() { 377 return transformStreamDefaultSourcePullAlgorithm(stream); 378 }, 379 cancel(reason) { 380 transformStreamErrorWritableAndUnblockWrite(stream, reason); 381 return PromiseResolve(); 382 }, 383 }, { 384 highWaterMark: readableHighWaterMark, 385 size: readableSizeAlgorithm, 386 }); 387 388 stream[kState] = { 389 readable, 390 writable, 391 controller: undefined, 392 backpressure: undefined, 393 backpressureChange: { 394 promise: undefined, 395 resolve: undefined, 396 reject: undefined, 397 }, 398 }; 399 400 transformStreamSetBackpressure(stream, true); 401} 402 403function transformStreamError(stream, error) { 404 const { 405 readable, 406 } = stream[kState]; 407 const { 408 controller, 409 } = readable[kState]; 410 readableStreamDefaultControllerError(controller, error); 411 transformStreamErrorWritableAndUnblockWrite(stream, error); 412} 413 414function transformStreamErrorWritableAndUnblockWrite(stream, error) { 415 const { 416 controller, 417 writable, 418 } = stream[kState]; 419 transformStreamDefaultControllerClearAlgorithms(controller); 420 writableStreamDefaultControllerErrorIfNeeded( 421 writable[kState].controller, 422 error); 423 if (stream[kState].backpressure) 424 transformStreamSetBackpressure(stream, false); 425} 426 427function transformStreamSetBackpressure(stream, backpressure) { 428 assert(stream[kState].backpressure !== backpressure); 429 if (stream[kState].backpressureChange.promise !== undefined) 430 stream[kState].backpressureChange.resolve?.(); 431 stream[kState].backpressureChange = createDeferredPromise(); 432 stream[kState].backpressure = backpressure; 433} 434 435function setupTransformStreamDefaultController( 436 stream, 437 controller, 438 transformAlgorithm, 439 flushAlgorithm) { 440 assert(isTransformStream(stream)); 441 assert(stream[kState].controller === undefined); 442 controller[kState] = { 443 stream, 444 transformAlgorithm, 445 flushAlgorithm, 446 }; 447 stream[kState].controller = controller; 448} 449 450function setupTransformStreamDefaultControllerFromTransformer( 451 stream, 452 transformer) { 453 const controller = new TransformStreamDefaultController(kSkipThrow); 454 const transform = transformer?.transform || defaultTransformAlgorithm; 455 const flush = transformer?.flush || nonOpFlush; 456 const transformAlgorithm = 457 FunctionPrototypeBind(transform, transformer); 458 const flushAlgorithm = 459 FunctionPrototypeBind(flush, transformer); 460 461 setupTransformStreamDefaultController( 462 stream, 463 controller, 464 transformAlgorithm, 465 flushAlgorithm); 466} 467 468function transformStreamDefaultControllerClearAlgorithms(controller) { 469 controller[kState].transformAlgorithm = undefined; 470 controller[kState].flushAlgorithm = undefined; 471} 472 473function transformStreamDefaultControllerEnqueue(controller, chunk) { 474 const { 475 stream, 476 } = controller[kState]; 477 const { 478 readable, 479 } = stream[kState]; 480 const { 481 controller: readableController, 482 } = readable[kState]; 483 if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) 484 throw new ERR_INVALID_STATE.TypeError('Unable to enqueue'); 485 try { 486 readableStreamDefaultControllerEnqueue(readableController, chunk); 487 } catch (error) { 488 transformStreamErrorWritableAndUnblockWrite(stream, error); 489 throw readable[kState].storedError; 490 } 491 const backpressure = 492 readableStreamDefaultControllerHasBackpressure(readableController); 493 if (backpressure !== stream[kState].backpressure) { 494 assert(backpressure); 495 transformStreamSetBackpressure(stream, true); 496 } 497} 498 499function transformStreamDefaultControllerError(controller, error) { 500 transformStreamError(controller[kState].stream, error); 501} 502 503async function transformStreamDefaultControllerPerformTransform(controller, chunk) { 504 try { 505 return await ensureIsPromise( 506 controller[kState].transformAlgorithm, 507 controller, 508 chunk, 509 controller); 510 } catch (error) { 511 transformStreamError(controller[kState].stream, error); 512 throw error; 513 } 514} 515 516function transformStreamDefaultControllerTerminate(controller) { 517 const { 518 stream, 519 } = controller[kState]; 520 const { 521 readable, 522 } = stream[kState]; 523 assert(readable !== undefined); 524 const { 525 controller: readableController, 526 } = readable[kState]; 527 readableStreamDefaultControllerClose(readableController); 528 transformStreamErrorWritableAndUnblockWrite( 529 stream, 530 new ERR_INVALID_STATE.TypeError('TransformStream has been terminated')); 531} 532 533function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { 534 const { 535 writable, 536 controller, 537 } = stream[kState]; 538 assert(writable[kState].state === 'writable'); 539 if (stream[kState].backpressure) { 540 const backpressureChange = stream[kState].backpressureChange.promise; 541 return PromisePrototypeThen( 542 backpressureChange, 543 () => { 544 const { 545 writable, 546 } = stream[kState]; 547 if (writable[kState].state === 'erroring') 548 throw writable[kState].storedError; 549 assert(writable[kState].state === 'writable'); 550 return transformStreamDefaultControllerPerformTransform( 551 controller, 552 chunk); 553 }); 554 } 555 return transformStreamDefaultControllerPerformTransform(controller, chunk); 556} 557 558async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { 559 transformStreamError(stream, reason); 560} 561 562function transformStreamDefaultSinkCloseAlgorithm(stream) { 563 const { 564 readable, 565 controller, 566 } = stream[kState]; 567 568 const flushPromise = 569 ensureIsPromise( 570 controller[kState].flushAlgorithm, 571 controller, 572 controller); 573 transformStreamDefaultControllerClearAlgorithms(controller); 574 return PromisePrototypeThen( 575 flushPromise, 576 () => { 577 if (readable[kState].state === 'errored') 578 throw readable[kState].storedError; 579 readableStreamDefaultControllerClose(readable[kState].controller); 580 }, 581 (error) => { 582 transformStreamError(stream, error); 583 throw readable[kState].storedError; 584 }); 585} 586 587function transformStreamDefaultSourcePullAlgorithm(stream) { 588 assert(stream[kState].backpressure); 589 assert(stream[kState].backpressureChange.promise !== undefined); 590 transformStreamSetBackpressure(stream, false); 591 return stream[kState].backpressureChange.promise; 592} 593 594module.exports = { 595 TransformStream, 596 TransformStreamDefaultController, 597 TransferredTransformStream, 598 599 // Exported Brand Checks 600 isTransformStream, 601 isTransformStreamDefaultController, 602}; 603