• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  FunctionPrototypeBind,
5  FunctionPrototypeCall,
6  ObjectDefineProperties,
7  PromisePrototypeThen,
8  PromiseResolve,
9  ReflectConstruct,
10  SymbolToStringTag,
11  Symbol,
12} = primordials;
13
14const {
15  codes: {
16    ERR_ILLEGAL_CONSTRUCTOR,
17    ERR_INVALID_ARG_VALUE,
18    ERR_INVALID_STATE,
19    ERR_INVALID_THIS,
20  },
21} = require('internal/errors');
22
23const {
24  DOMException,
25} = internalBinding('messaging');
26
27const {
28  createDeferredPromise,
29  customInspectSymbol: kInspect,
30  kEmptyObject,
31  kEnumerableProperty,
32} = require('internal/util');
33
34const {
35  kDeserialize,
36  kTransfer,
37  kTransferList,
38  makeTransferable,
39} = require('internal/worker/js_transferable');
40
41const {
42  customInspect,
43  ensureIsPromise,
44  extractHighWaterMark,
45  extractSizeAlgorithm,
46  isBrandCheck,
47  nonOpFlush,
48  kType,
49  kState,
50} = require('internal/webstreams/util');
51
52const {
53  ReadableStream,
54  readableStreamDefaultControllerCanCloseOrEnqueue,
55  readableStreamDefaultControllerClose,
56  readableStreamDefaultControllerEnqueue,
57  readableStreamDefaultControllerError,
58  readableStreamDefaultControllerGetDesiredSize,
59  readableStreamDefaultControllerHasBackpressure,
60} = require('internal/webstreams/readablestream');
61
62const {
63  WritableStream,
64  writableStreamDefaultControllerErrorIfNeeded,
65} = require('internal/webstreams/writablestream');
66
67const assert = require('internal/assert');
68
69const kSkipThrow = Symbol('kSkipThrow');
70
/**
 * Builds a null-prototype property descriptor that carries `value` and is
 * configurable. `writable` and `enumerable` are deliberately left out of the
 * descriptor.
 * @param {any} value
 * @returns {{ configurable: boolean, value: any }}
 */
const getNonWritablePropertyDescriptor = (value) => ({
  __proto__: null,
  configurable: true,
  value,
});
78
79/**
80 * @typedef {import('./queuingstrategies').QueuingStrategy
81 * } QueuingStrategy
82 * @typedef {import('./queuingstrategies').QueuingStrategySize
83 * } QueuingStrategySize
84 */
85
86/**
87 * @callback TransformerStartCallback
 * @param {TransformStreamDefaultController} controller
89 */
90
91/**
92 * @callback TransformerFlushCallback
 * @param {TransformStreamDefaultController} controller
94 * @returns {Promise<void>}
95 */
96
97/**
98 * @callback TransformerTransformCallback
99 * @param {any} chunk
100 * @param {TransformStreamDefaultController} controller
101 * @returns {Promise<void>}
102 */
103
104/**
105 * @typedef {{
106 *  start? : TransformerStartCallback,
107 *  transform? : TransformerTransformCallback,
108 *  flush? : TransformerFlushCallback,
109 *  readableType? : any,
110 *  writableType? : any,
111 * }} Transformer
112 */
113
/**
 * A writable side and a readable side that are created together and linked
 * through a TransformStreamDefaultController. Internal state lives under
 * `this[kState]`.
 */
class TransformStream {
  [kType] = 'TransformStream';

  /**
   * @param {Transformer} [transformer]
   * @param {QueuingStrategy} [writableStrategy]
   * @param {QueuingStrategy} [readableStrategy]
   */
  constructor(
    transformer = null,
    writableStrategy = kEmptyObject,
    readableStrategy = kEmptyObject) {
    // All transformer members are read before anything is validated.
    const readableType = transformer?.readableType;
    const writableType = transformer?.writableType;
    const startCallback = transformer?.start;

    // Only `undefined` is accepted for either type member.
    if (readableType !== undefined) {
      throw new ERR_INVALID_ARG_VALUE.RangeError(
        'transformer.readableType',
        readableType);
    }
    if (writableType !== undefined) {
      throw new ERR_INVALID_ARG_VALUE.RangeError(
        'transformer.writableType',
        writableType);
    }

    // Strategy members are read readable-side first, then writable-side,
    // and only afterwards normalized.
    const rawReadableHwm = readableStrategy?.highWaterMark;
    const rawReadableSize = readableStrategy?.size;
    const rawWritableHwm = writableStrategy?.highWaterMark;
    const rawWritableSize = writableStrategy?.size;

    // Default high water marks: 0 for the readable side, 1 for the writable.
    const readableHwm = extractHighWaterMark(rawReadableHwm, 0);
    const readableSizeAlgorithm = extractSizeAlgorithm(rawReadableSize);
    const writableHwm = extractHighWaterMark(rawWritableHwm, 1);
    const writableSizeAlgorithm = extractSizeAlgorithm(rawWritableSize);

    // Both sides return this promise from their own start(); it is settled
    // below with whatever the user's start() returns.
    const startPromise = createDeferredPromise();

    initializeTransformStream(
      this,
      startPromise,
      writableHwm,
      writableSizeAlgorithm,
      readableHwm,
      readableSizeAlgorithm);

    setupTransformStreamDefaultControllerFromTransformer(this, transformer);

    if (startCallback === undefined) {
      startPromise.resolve();
    } else {
      // Invoked with the transformer as receiver; a synchronous throw
      // propagates out of the constructor.
      const startResult = FunctionPrototypeCall(
        startCallback,
        transformer,
        this[kState].controller);
      startPromise.resolve(startResult);
    }

    // eslint-disable-next-line no-constructor-return
    return makeTransferable(this);
  }

  /**
   * @readonly
   * @type {ReadableStream}
   */
  get readable() {
    if (!isTransformStream(this)) {
      throw new ERR_INVALID_THIS('TransformStream');
    }
    const { readable } = this[kState];
    return readable;
  }

  /**
   * @readonly
   * @type {WritableStream}
   */
  get writable() {
    if (!isTransformStream(this)) {
      throw new ERR_INVALID_THIS('TransformStream');
    }
    const { writable } = this[kState];
    return writable;
  }

  // Custom inspection: reports both sides plus the current backpressure flag.
  [kInspect](depth, options) {
    const summary = {
      readable: this.readable,
      writable: this.writable,
      backpressure: this[kState].backpressure,
    };
    return customInspect(depth, options, this[kType], summary);
  }

  // Produces the transfer payload; refuses when either side is locked.
  [kTransfer]() {
    if (!isTransformStream(this)) {
      throw new ERR_INVALID_THIS('TransformStream');
    }
    const state = this[kState];
    const readable = state.readable;
    const writable = state.writable;
    if (readable.locked) {
      throw new DOMException(
        'Cannot transfer a locked ReadableStream',
        'DataCloneError');
    }
    if (writable.locked) {
      throw new DOMException(
        'Cannot transfer a locked WritableStream',
        'DataCloneError');
    }
    const data = { readable, writable };
    return {
      data,
      deserializeInfo:
        'internal/webstreams/transformstream:TransferredTransformStream',
    };
  }

  // Both sides are listed so that they travel together with the stream.
  [kTransferList]() {
    const { readable, writable } = this[kState];
    return [ readable, writable ];
  }

  // Installs the transferred sides on the receiving instance's state.
  [kDeserialize]({ readable, writable }) {
    const state = this[kState];
    state.readable = readable;
    state.writable = writable;
  }
}
245
// Mark the public accessors as enumerable and install a configurable,
// non-writable @@toStringTag carrying the class name.
ObjectDefineProperties(TransformStream.prototype, {
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name),
  readable: kEnumerableProperty,
  writable: kEnumerableProperty,
});
251
/**
 * Creates the empty shell used on the receiving end of a transfer. The object
 * gets TransformStream's prototype without running the TransformStream
 * constructor, so the type tag and a blank state record are set by hand.
 * @returns {TransformStream}
 */
function TransferredTransformStream() {
  // Stand-in constructor body, run with `TransformStream` as new.target.
  function initializeBlankState() {
    this[kType] = 'TransformStream';
    this[kState] = {
      readable: undefined,
      writable: undefined,
      backpressure: undefined,
      backpressureChange: {
        promise: undefined,
        resolve: undefined,
        reject: undefined,
      },
      controller: undefined,
    };
  }

  const shell = ReflectConstruct(initializeBlankState, [], TransformStream);
  return makeTransferable(shell);
}
// No-op kDeserialize on this function's own prototype object.
TransferredTransformStream.prototype[kDeserialize] = () => {};
271
/**
 * Controller handed to the transformer's callbacks. It cannot be constructed
 * by user code: the constructor throws unless it receives the module-private
 * `kSkipThrow` symbol. Its state (`this[kState]`) is attached separately by
 * setupTransformStreamDefaultController().
 */
class TransformStreamDefaultController {
  [kType] = 'TransformStreamDefaultController';

  /**
   * @param {symbol} [skipThrowSymbol] Must be `kSkipThrow`; any other value
   *   results in ERR_ILLEGAL_CONSTRUCTOR.
   */
  constructor(skipThrowSymbol = undefined) {
    if (skipThrowSymbol !== kSkipThrow) {
      throw new ERR_ILLEGAL_CONSTRUCTOR();
    }
  }

  /**
   * Desired size as reported by the readable side's controller.
   * @readonly
   * @type {number}
   */
  get desiredSize() {
    if (!isTransformStreamDefaultController(this))
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    const {
      stream,
    } = this[kState];
    const {
      readable,
    } = stream[kState];
    const {
      controller: readableController,
    } = readable[kState];
    return readableStreamDefaultControllerGetDesiredSize(readableController);
  }

  /**
   * Queues `chunk` on the readable side.
   * @param {any} [chunk]
   */
  enqueue(chunk = undefined) {
    if (!isTransformStreamDefaultController(this))
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    transformStreamDefaultControllerEnqueue(this, chunk);
  }

  /**
   * Errors both sides of the owning stream with `reason`.
   * @param {any} [reason]
   */
  error(reason = undefined) {
    if (!isTransformStreamDefaultController(this))
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    transformStreamDefaultControllerError(this, reason);
  }

  /**
   * Closes the readable side and errors the writable side.
   */
  terminate() {
    if (!isTransformStreamDefaultController(this))
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    transformStreamDefaultControllerTerminate(this);
  }

  [kInspect](depth, options) {
    // Same brand check as every other member: without it a foreign receiver
    // fails on `this[kState].stream` with an unrelated TypeError instead of
    // ERR_INVALID_THIS.
    if (!isTransformStreamDefaultController(this))
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    return customInspect(depth, options, this[kType], {
      stream: this[kState].stream,
    });
  }
}
330
// Mark the controller's public members as enumerable and install a
// configurable, non-writable @@toStringTag carrying the class name.
ObjectDefineProperties(TransformStreamDefaultController.prototype, {
  [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStreamDefaultController.name),
  desiredSize: kEnumerableProperty,
  enqueue: kEnumerableProperty,
  error: kEnumerableProperty,
  terminate: kEnumerableProperty,
});
338
// Brand-check predicates produced by isBrandCheck(), keyed by the same
// strings the two classes store under kType.
const isTransformStream = isBrandCheck('TransformStream');
const isTransformStreamDefaultController = isBrandCheck(
  'TransformStreamDefaultController',
);
343
/**
 * Fallback transform used when the transformer supplies none: the chunk is
 * passed through to the readable side unchanged. Being `async`, a throw from
 * the enqueue surfaces as a rejected promise.
 * @param {any} chunk
 * @param {TransformStreamDefaultController} controller
 * @returns {Promise<void>}
 */
async function defaultTransformAlgorithm(chunk, controller) {
  transformStreamDefaultControllerEnqueue(controller, chunk);
}
347
/**
 * Creates the writable and readable sides of `stream`, stores them in a fresh
 * state record on `stream[kState]`, and switches backpressure on.
 * @param {TransformStream} stream
 * @param {{ promise: Promise<any> }} startPromise Deferred whose promise both
 *   sides return from their start().
 * @param {number} writableHighWaterMark
 * @param {QueuingStrategySize} writableSizeAlgorithm
 * @param {number} readableHighWaterMark
 * @param {QueuingStrategySize} readableSizeAlgorithm
 */
function initializeTransformStream(
  stream,
  startPromise,
  writableHighWaterMark,
  writableSizeAlgorithm,
  readableHighWaterMark,
  readableSizeAlgorithm) {

  // Sink: every operation is forwarded to the matching sink algorithm.
  const sink = {
    __proto__: null,
    start() { return startPromise.promise; },
    write(chunk) {
      return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
    },
    abort(reason) {
      return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
    },
    close() {
      return transformStreamDefaultSinkCloseAlgorithm(stream);
    },
  };
  const sinkStrategy = {
    highWaterMark: writableHighWaterMark,
    size: writableSizeAlgorithm,
  };
  // The writable side is constructed before the readable side.
  const writable = new WritableStream(sink, sinkStrategy);

  // Source: pull lifts backpressure; cancel errors the writable side.
  const source = {
    __proto__: null,
    start() { return startPromise.promise; },
    pull() {
      return transformStreamDefaultSourcePullAlgorithm(stream);
    },
    cancel(reason) {
      transformStreamErrorWritableAndUnblockWrite(stream, reason);
      return PromiseResolve();
    },
  };
  const sourceStrategy = {
    highWaterMark: readableHighWaterMark,
    size: readableSizeAlgorithm,
  };
  const readable = new ReadableStream(source, sourceStrategy);

  stream[kState] = {
    readable,
    writable,
    controller: undefined,
    backpressure: undefined,
    backpressureChange: {
      promise: undefined,
      resolve: undefined,
      reject: undefined,
    },
  };

  // `backpressure` starts as undefined, so this first call always passes the
  // "value must change" assertion and creates the first deferred.
  transformStreamSetBackpressure(stream, true);
}
402
/**
 * Errors both sides of `stream`: first the readable side's controller, then
 * the writable side (which also releases any write waiting on backpressure).
 * @param {TransformStream} stream
 * @param {any} error
 */
function transformStreamError(stream, error) {
  const readableController = stream[kState].readable[kState].controller;
  readableStreamDefaultControllerError(readableController, error);
  transformStreamErrorWritableAndUnblockWrite(stream, error);
}
413
/**
 * Drops the controller's algorithms, errors the writable side if needed, and
 * turns backpressure off when it is currently on.
 * @param {TransformStream} stream
 * @param {any} error
 */
function transformStreamErrorWritableAndUnblockWrite(stream, error) {
  const state = stream[kState];
  const writableController = state.writable[kState].controller;

  transformStreamDefaultControllerClearAlgorithms(state.controller);
  writableStreamDefaultControllerErrorIfNeeded(writableController, error);

  // Flipping the flag settles the current backpressureChange promise.
  if (state.backpressure) {
    transformStreamSetBackpressure(stream, false);
  }
}
426
/**
 * Changes the stream's backpressure flag. The deferred for the previous
 * state, if one exists, is resolved and replaced by a fresh one.
 * @param {TransformStream} stream
 * @param {boolean} backpressure Must differ from the current value.
 */
function transformStreamSetBackpressure(stream, backpressure) {
  const state = stream[kState];
  assert(state.backpressure !== backpressure);

  const previousChange = state.backpressureChange;
  if (previousChange.promise !== undefined) {
    previousChange.resolve?.();
  }

  state.backpressureChange = createDeferredPromise();
  state.backpressure = backpressure;
}
434
/**
 * Links `controller` and `stream` to each other and stores the two
 * algorithms on the controller's state.
 * @param {TransformStream} stream Must not have a controller yet.
 * @param {TransformStreamDefaultController} controller
 * @param {Function} transformAlgorithm
 * @param {Function} flushAlgorithm
 */
function setupTransformStreamDefaultController(
  stream,
  controller,
  transformAlgorithm,
  flushAlgorithm) {
  assert(isTransformStream(stream));
  const streamState = stream[kState];
  assert(streamState.controller === undefined);

  controller[kState] = { stream, transformAlgorithm, flushAlgorithm };
  streamState.controller = controller;
}
449
/**
 * Creates the controller for `stream` and wires it up with the transformer's
 * `transform` and `flush` callbacks, each bound to the transformer. A falsy
 * member is replaced by the module's default algorithm.
 * @param {TransformStream} stream
 * @param {Transformer} [transformer]
 */
function setupTransformStreamDefaultControllerFromTransformer(
  stream,
  transformer) {
  const controller = new TransformStreamDefaultController(kSkipThrow);

  // Both members are read before either one is bound.
  const userTransform = transformer?.transform;
  const userFlush = transformer?.flush;

  const transformAlgorithm = FunctionPrototypeBind(
    userTransform || defaultTransformAlgorithm,
    transformer);
  const flushAlgorithm = FunctionPrototypeBind(
    userFlush || nonOpFlush,
    transformer);

  setupTransformStreamDefaultController(
    stream,
    controller,
    transformAlgorithm,
    flushAlgorithm);
}
467
/**
 * Resets both stored algorithms on the controller's state to undefined.
 * @param {TransformStreamDefaultController} controller
 */
function transformStreamDefaultControllerClearAlgorithms(controller) {
  const state = controller[kState];
  state.transformAlgorithm = undefined;
  state.flushAlgorithm = undefined;
}
472
/**
 * Enqueues `chunk` on the readable side and, when that side now reports
 * backpressure that the stream has not recorded yet, records it.
 * @param {TransformStreamDefaultController} controller
 * @param {any} chunk
 * @throws {TypeError} ERR_INVALID_STATE when the readable side cannot accept
 *   chunks; the readable side's stored error when the enqueue itself throws.
 */
function transformStreamDefaultControllerEnqueue(controller, chunk) {
  const { stream } = controller[kState];
  const { readable } = stream[kState];
  const readableController = readable[kState].controller;

  if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
    throw new ERR_INVALID_STATE.TypeError('Unable to enqueue');
  }

  try {
    readableStreamDefaultControllerEnqueue(readableController, chunk);
  } catch (enqueueError) {
    // Error the writable side as well, then surface the readable side's
    // stored error rather than the caught one.
    transformStreamErrorWritableAndUnblockWrite(stream, enqueueError);
    throw readable[kState].storedError;
  }

  const hasBackpressure =
    readableStreamDefaultControllerHasBackpressure(readableController);
  if (hasBackpressure === stream[kState].backpressure) {
    return;
  }
  // The flag may only move from off to on here.
  assert(hasBackpressure);
  transformStreamSetBackpressure(stream, true);
}
498
/**
 * Errors the stream that owns `controller`.
 * @param {TransformStreamDefaultController} controller
 * @param {any} error
 */
function transformStreamDefaultControllerError(controller, error) {
  const { stream } = controller[kState];
  transformStreamError(stream, error);
}
502
/**
 * Runs the controller's transform algorithm for `chunk`. If it throws or
 * rejects, the owning stream is errored and the same error is rethrown.
 * @param {TransformStreamDefaultController} controller
 * @param {any} chunk
 * @returns {Promise<any>}
 */
async function transformStreamDefaultControllerPerformTransform(controller, chunk) {
  try {
    const { transformAlgorithm } = controller[kState];
    const pending = ensureIsPromise(
      transformAlgorithm,
      controller,
      chunk,
      controller);
    return await pending;
  } catch (reason) {
    transformStreamError(controller[kState].stream, reason);
    throw reason;
  }
}
515
/**
 * Closes the readable side, then errors the writable side with an
 * ERR_INVALID_STATE TypeError stating that the stream was terminated.
 * @param {TransformStreamDefaultController} controller
 */
function transformStreamDefaultControllerTerminate(controller) {
  const { stream } = controller[kState];
  const { readable } = stream[kState];
  assert(readable !== undefined);

  const readableController = readable[kState].controller;
  readableStreamDefaultControllerClose(readableController);

  const terminated =
    new ERR_INVALID_STATE.TypeError('TransformStream has been terminated');
  transformStreamErrorWritableAndUnblockWrite(stream, terminated);
}
532
/**
 * Write algorithm of the writable side. Without backpressure the chunk is
 * transformed right away; otherwise the transform waits for the current
 * backpressureChange promise to be fulfilled.
 * @param {TransformStream} stream
 * @param {any} chunk
 * @returns {Promise<any>}
 */
function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
  const { writable, controller } = stream[kState];
  assert(writable[kState].state === 'writable');

  if (!stream[kState].backpressure) {
    return transformStreamDefaultControllerPerformTransform(controller, chunk);
  }

  // Runs after the wait; the writable side is looked up again because its
  // state may have changed in the meantime.
  const transformAfterWait = () => {
    const writableState = stream[kState].writable[kState];
    if (writableState.state === 'erroring') {
      throw writableState.storedError;
    }
    assert(writableState.state === 'writable');
    return transformStreamDefaultControllerPerformTransform(controller, chunk);
  };

  const backpressureChange = stream[kState].backpressureChange.promise;
  return PromisePrototypeThen(backpressureChange, transformAfterWait);
}
557
/**
 * Abort algorithm of the writable side: errors the whole stream with
 * `reason`. Being `async`, it always hands back a promise.
 * @param {TransformStream} stream
 * @param {any} reason
 * @returns {Promise<void>}
 */
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
  transformStreamError(stream, reason);
}
561
/**
 * Close algorithm of the writable side. Runs the flush algorithm, clears the
 * controller's algorithms, and then either closes the readable side or
 * errors the stream, depending on how the flush settles.
 * @param {TransformStream} stream
 * @returns {Promise<void>}
 */
function transformStreamDefaultSinkCloseAlgorithm(stream) {
  const { readable, controller } = stream[kState];

  // Flush is started before the algorithms are cleared.
  const flushPromise = ensureIsPromise(
    controller[kState].flushAlgorithm,
    controller,
    controller);
  transformStreamDefaultControllerClearAlgorithms(controller);

  const onFlushed = () => {
    const readableState = readable[kState];
    if (readableState.state === 'errored') {
      throw readableState.storedError;
    }
    readableStreamDefaultControllerClose(readableState.controller);
  };

  const onFlushFailed = (reason) => {
    transformStreamError(stream, reason);
    // Report the readable side's stored error, not the flush rejection.
    throw readable[kState].storedError;
  };

  return PromisePrototypeThen(flushPromise, onFlushed, onFlushFailed);
}
586
/**
 * Pull algorithm of the readable side: switches backpressure off and returns
 * the promise of the deferred created by that switch.
 * @param {TransformStream} stream
 * @returns {Promise<void>}
 */
function transformStreamDefaultSourcePullAlgorithm(stream) {
  const state = stream[kState];
  assert(state.backpressure);
  assert(state.backpressureChange.promise !== undefined);

  transformStreamSetBackpressure(stream, false);

  // Read after the switch: this is the newly created deferred.
  return state.backpressureChange.promise;
}
593
594module.exports = {
595  TransformStream,
596  TransformStreamDefaultController,
597  TransferredTransformStream,
598
599  // Exported Brand Checks
600  isTransformStream,
601  isTransformStreamDefaultController,
602};
603