• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  FunctionPrototypeBind,
5  FunctionPrototypeCall,
6  ObjectDefineProperties,
7  PromisePrototypeThen,
8  PromiseResolve,
9  ReflectConstruct,
10  SymbolToStringTag,
11} = primordials;
12
13const {
14  codes: {
15    ERR_ILLEGAL_CONSTRUCTOR,
16    ERR_INVALID_ARG_VALUE,
17    ERR_INVALID_STATE,
18    ERR_INVALID_THIS,
19  },
20} = require('internal/errors');
21
22const {
23  DOMException,
24} = internalBinding('messaging');
25
26const {
27  createDeferredPromise,
28  customInspectSymbol: kInspect,
29  kEmptyObject,
30  kEnumerableProperty,
31} = require('internal/util');
32
33const {
34  kDeserialize,
35  kTransfer,
36  kTransferList,
37  makeTransferable,
38} = require('internal/worker/js_transferable');
39
40const {
41  customInspect,
42  ensureIsPromise,
43  extractHighWaterMark,
44  extractSizeAlgorithm,
45  isBrandCheck,
46  nonOpFlush,
47  kType,
48  kState,
49} = require('internal/webstreams/util');
50
51const {
52  ReadableStream,
53  readableStreamDefaultControllerCanCloseOrEnqueue,
54  readableStreamDefaultControllerClose,
55  readableStreamDefaultControllerEnqueue,
56  readableStreamDefaultControllerError,
57  readableStreamDefaultControllerGetDesiredSize,
58  readableStreamDefaultControllerHasBackpressure,
59} = require('internal/webstreams/readablestream');
60
61const {
62  WritableStream,
63  writableStreamDefaultControllerErrorIfNeeded,
64} = require('internal/webstreams/writablestream');
65
66const assert = require('internal/assert');
67
/**
 * Builds a null-prototype property descriptor that carries `value` and is
 * configurable. `writable` and `enumerable` are deliberately left out, so a
 * property newly defined from it is neither writable nor enumerable.
 * @param {any} value
 * @returns {PropertyDescriptor}
 */
const getNonWritablePropertyDescriptor = (value) => ({
  __proto__: null,
  configurable: true,
  value,
});
75
76/**
77 * @typedef {import('./queuingstrategies').QueuingStrategy
78 * } QueuingStrategy
79 * @typedef {import('./queuingstrategies').QueuingStrategySize
80 * } QueuingStrategySize
81 */
82
/**
 * @callback TransformerStartCallback
 * @param {TransformStreamDefaultController} controller
 */
87
/**
 * @callback TransformerFlushCallback
 * @param {TransformStreamDefaultController} controller
 * @returns {Promise<void>}
 */
93
94/**
95 * @callback TransformerTransformCallback
96 * @param {any} chunk
97 * @param {TransformStreamDefaultController} controller
98 * @returns {Promise<void>}
99 */
100
101/**
102 * @typedef {{
103 *  start? : TransformerStartCallback,
104 *  transform? : TransformerTransformCallback,
105 *  flush? : TransformerFlushCallback,
106 *  readableType? : any,
107 *  writableType? : any,
108 * }} Transformer
109 */
110
/**
 * Couples a WritableStream and a ReadableStream through a
 * TransformStreamDefaultController: chunks written to `writable` are handed
 * to the transformer, and whatever it enqueues is exposed on `readable`.
 */
class TransformStream {
  [kType] = 'TransformStream';

  /**
   * @param {Transformer} [transformer]
   * @param {QueuingStrategy} [writableStrategy]
   * @param {QueuingStrategy} [readableStrategy]
   */
  constructor(
    transformer = null,
    writableStrategy = kEmptyObject,
    readableStrategy = kEmptyObject) {
    // Each transformer member below is read exactly once, in this order.
    const requestedReadableType = transformer?.readableType;
    const requestedWritableType = transformer?.writableType;
    const startCallback = transformer?.start;

    // Any explicit readableType/writableType is rejected.
    if (requestedReadableType !== undefined) {
      throw new ERR_INVALID_ARG_VALUE.RangeError(
        'transformer.readableType',
        requestedReadableType);
    }
    if (requestedWritableType !== undefined) {
      throw new ERR_INVALID_ARG_VALUE.RangeError(
        'transformer.writableType',
        requestedWritableType);
    }

    const rawReadableHighWaterMark = readableStrategy?.highWaterMark;
    const rawReadableSize = readableStrategy?.size;

    const rawWritableHighWaterMark = writableStrategy?.highWaterMark;
    const rawWritableSize = writableStrategy?.size;

    // 0 (readable side) and 1 (writable side) are handed to
    // extractHighWaterMark as its second argument.
    const readableHighWaterMark =
      extractHighWaterMark(rawReadableHighWaterMark, 0);
    const readableSizeAlgorithm = extractSizeAlgorithm(rawReadableSize);

    const writableHighWaterMark =
      extractHighWaterMark(rawWritableHighWaterMark, 1);
    const writableSizeAlgorithm = extractSizeAlgorithm(rawWritableSize);

    // Both underlying streams return this promise from their start(), and
    // it is only resolved after the transformer's start() has been invoked.
    const startPromise = createDeferredPromise();

    initializeTransformStream(
      this,
      startPromise,
      writableHighWaterMark,
      writableSizeAlgorithm,
      readableHighWaterMark,
      readableSizeAlgorithm);

    setupTransformStreamDefaultControllerFromTransformer(this, transformer);

    if (startCallback === undefined) {
      startPromise.resolve();
    } else {
      // A synchronous throw from start() propagates out of the constructor.
      startPromise.resolve(
        FunctionPrototypeCall(
          startCallback,
          transformer,
          this[kState].controller));
    }

    // eslint-disable-next-line no-constructor-return
    return makeTransferable(this);
  }

  /**
   * @readonly
   * @type {ReadableStream}
   */
  get readable() {
    if (!isTransformStream(this)) {
      throw new ERR_INVALID_THIS('TransformStream');
    }
    return this[kState].readable;
  }

  /**
   * @readonly
   * @type {WritableStream}
   */
  get writable() {
    if (!isTransformStream(this)) {
      throw new ERR_INVALID_THIS('TransformStream');
    }
    return this[kState].writable;
  }

  // Custom inspection: reports both sides plus the backpressure flag.
  [kInspect](depth, options) {
    return customInspect(depth, options, this[kType], {
      readable: this.readable,
      writable: this.writable,
      backpressure: this[kState].backpressure,
    });
  }

  // Transfer hook: refuses when either side is locked, otherwise hands
  // over both streams together with the identifier of the receiving
  // constructor.
  [kTransfer]() {
    if (!isTransformStream(this)) {
      throw new ERR_INVALID_THIS('TransformStream');
    }
    const state = this[kState];
    const readable = state.readable;
    const writable = state.writable;
    if (readable.locked) {
      throw new DOMException(
        'Cannot transfer a locked ReadableStream',
        'DataCloneError');
    }
    if (writable.locked) {
      throw new DOMException(
        'Cannot transfer a locked WritableStream',
        'DataCloneError');
    }
    const data = { readable, writable };
    return {
      data,
      deserializeInfo:
        'internal/webstreams/transformstream:TransferredTransformStream',
    };
  }

  // The two underlying streams, readable side first.
  [kTransferList]() {
    const state = this[kState];
    return [ state.readable, state.writable ];
  }

  // Installs the received streams into this instance's state.
  [kDeserialize]({ readable, writable }) {
    const state = this[kState];
    state.readable = readable;
    state.writable = writable;
  }
}
242
// Apply the shared kEnumerableProperty descriptor to the public accessors
// and install a Symbol.toStringTag whose value is the class name.
ObjectDefineProperties(
  TransformStream.prototype,
  {
    readable: kEnumerableProperty,
    writable: kEnumerableProperty,
    [SymbolToStringTag]:
      getNonWritablePropertyDescriptor(TransformStream.name),
  });
248
/**
 * Creates the receiving-side shell for a transferred TransformStream.
 * Reflect.construct is called with TransformStream as the new target, so the
 * result has TransformStream.prototype while the TransformStream constructor
 * itself never runs; the brand tag and an empty state record are therefore
 * assigned by hand.
 * @returns {TransformStream}
 */
function TransferredTransformStream() {
  const shell = ReflectConstruct(
    function() {
      this[kType] = 'TransformStream';
      this[kState] = {
        readable: undefined,
        writable: undefined,
        backpressure: undefined,
        backpressureChange: {
          promise: undefined,
          resolve: undefined,
          reject: undefined,
        },
        controller: undefined,
      };
    },
    [],
    TransformStream);
  return makeTransferable(shell);
}
// The deserialize hook on this function's own prototype does nothing.
TransferredTransformStream.prototype[kDeserialize] = () => {};
268
/**
 * Controller object passed to the transformer callbacks. It cannot be
 * constructed directly: the constructor always throws.
 */
class TransformStreamDefaultController {
  [kType] = 'TransformStreamDefaultController';

  constructor() {
    throw new ERR_ILLEGAL_CONSTRUCTOR();
  }

  /**
   * Desired size as reported by the readable side's controller.
   * @readonly
   * @type {number}
   */
  get desiredSize() {
    if (!isTransformStreamDefaultController(this)) {
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    }
    const transformStream = this[kState].stream;
    const readableSide = transformStream[kState].readable;
    const readableController = readableSide[kState].controller;
    return readableStreamDefaultControllerGetDesiredSize(readableController);
  }

  /**
   * @param {any} [chunk]
   */
  enqueue(chunk = undefined) {
    if (!isTransformStreamDefaultController(this)) {
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    }
    transformStreamDefaultControllerEnqueue(this, chunk);
  }

  /**
   * @param {any} [reason]
   */
  error(reason = undefined) {
    if (!isTransformStreamDefaultController(this)) {
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    }
    transformStreamDefaultControllerError(this, reason);
  }

  terminate() {
    if (!isTransformStreamDefaultController(this)) {
      throw new ERR_INVALID_THIS('TransformStreamDefaultController');
    }
    transformStreamDefaultControllerTerminate(this);
  }

  // Custom inspection: reports only the owning stream.
  [kInspect](depth, options) {
    return customInspect(depth, options, this[kType], {
      stream: this[kState].stream,
    });
  }
}
325
// Apply the shared kEnumerableProperty descriptor to the public members and
// install a Symbol.toStringTag whose value is the class name.
ObjectDefineProperties(
  TransformStreamDefaultController.prototype,
  {
    desiredSize: kEnumerableProperty,
    enqueue: kEnumerableProperty,
    error: kEnumerableProperty,
    terminate: kEnumerableProperty,
    [SymbolToStringTag]:
      getNonWritablePropertyDescriptor(TransformStreamDefaultController.name),
  });
333
/**
 * Internal factory for controllers. Uses Reflect.construct with
 * TransformStreamDefaultController as the new target, so the result has the
 * controller prototype without invoking the (always-throwing) public
 * constructor; only the brand tag is assigned here.
 * @returns {TransformStreamDefaultController}
 */
function createTransformStreamDefaultController() {
  const controller = ReflectConstruct(
    function() {
      this[kType] = 'TransformStreamDefaultController';
    },
    [],
    TransformStreamDefaultController);
  return controller;
}
342
// Brand-check predicates, each produced by isBrandCheck for one type tag.
const isTransformStream = isBrandCheck('TransformStream');
const isTransformStreamDefaultController = isBrandCheck(
  'TransformStreamDefaultController');
347
/**
 * Transform used when the transformer does not provide one: the chunk is
 * enqueued on the controller unchanged. Because the function is async, an
 * exception thrown while enqueuing surfaces as a rejected promise.
 * @param {any} chunk
 * @param {TransformStreamDefaultController} controller
 * @returns {Promise<void>}
 */
async function defaultTransformAlgorithm(chunk, controller) {
  transformStreamDefaultControllerEnqueue(controller, chunk);
}
351
/**
 * Creates the writable and readable sides of `stream`, stores the initial
 * state record on it, and switches backpressure on.
 *
 * The writable side is constructed before the readable side. Both return
 * `startPromise.promise` from their start() hook.
 * @param {TransformStream} stream
 * @param {{ promise: Promise<any> }} startPromise
 * @param {number} writableHighWaterMark
 * @param {QueuingStrategySize} writableSizeAlgorithm
 * @param {number} readableHighWaterMark
 * @param {QueuingStrategySize} readableSizeAlgorithm
 */
function initializeTransformStream(
  stream,
  startPromise,
  writableHighWaterMark,
  writableSizeAlgorithm,
  readableHighWaterMark,
  readableSizeAlgorithm) {

  // Underlying sink: every operation is forwarded to the transform-stream
  // level algorithm for this stream.
  const sink = {
    __proto__: null,
    start() { return startPromise.promise; },
    write(chunk) {
      return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
    },
    abort(reason) {
      return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
    },
    close() {
      return transformStreamDefaultSinkCloseAlgorithm(stream);
    },
  };
  const writable = new WritableStream(sink, {
    highWaterMark: writableHighWaterMark,
    size: writableSizeAlgorithm,
  });

  // Underlying source: cancelling the readable side errors the writable
  // side and releases any write waiting on backpressure.
  const source = {
    __proto__: null,
    start() { return startPromise.promise; },
    pull() {
      return transformStreamDefaultSourcePullAlgorithm(stream);
    },
    cancel(reason) {
      transformStreamErrorWritableAndUnblockWrite(stream, reason);
      return PromiseResolve();
    },
  };
  const readable = new ReadableStream(source, {
    highWaterMark: readableHighWaterMark,
    size: readableSizeAlgorithm,
  });

  stream[kState] = {
    readable,
    writable,
    controller: undefined,
    backpressure: undefined,
    backpressureChange: {
      promise: undefined,
      resolve: undefined,
      reject: undefined,
    },
  };

  // backpressure starts as undefined, so this first call moves it to true
  // and creates the first backpressureChange deferred.
  transformStreamSetBackpressure(stream, true);
}
406
/**
 * Errors both sides of the transform stream with `error`: first the
 * readable side's controller, then the writable side (which also releases a
 * write that is blocked on backpressure).
 * @param {TransformStream} stream
 * @param {any} error
 */
function transformStreamError(stream, error) {
  const readableController = stream[kState].readable[kState].controller;
  readableStreamDefaultControllerError(readableController, error);
  transformStreamErrorWritableAndUnblockWrite(stream, error);
}
417
/**
 * Drops the controller's transform/flush algorithms, errors the writable
 * side's controller if needed, and, when backpressure is currently on,
 * turns it off so that a write waiting on it can proceed.
 * @param {TransformStream} stream
 * @param {any} error
 */
function transformStreamErrorWritableAndUnblockWrite(stream, error) {
  const state = stream[kState];
  const { controller, writable } = state;
  transformStreamDefaultControllerClearAlgorithms(controller);
  const writableController = writable[kState].controller;
  writableStreamDefaultControllerErrorIfNeeded(writableController, error);
  if (!state.backpressure) {
    return;
  }
  transformStreamSetBackpressure(stream, false);
}
430
/**
 * Flips the stream's backpressure flag to `backpressure`. The previous
 * backpressureChange deferred, if one exists, is resolved, and a fresh
 * deferred replaces it. The new value must differ from the current one.
 * @param {TransformStream} stream
 * @param {boolean} backpressure
 */
function transformStreamSetBackpressure(stream, backpressure) {
  const state = stream[kState];
  assert(state.backpressure !== backpressure);
  const previousChange = state.backpressureChange;
  if (previousChange.promise !== undefined) {
    previousChange.resolve?.();
  }
  state.backpressureChange = createDeferredPromise();
  state.backpressure = backpressure;
}
438
/**
 * Links `controller` and `stream` to each other and records the two
 * algorithms on the controller. The stream must pass the brand check and
 * must not already have a controller.
 * @param {TransformStream} stream
 * @param {TransformStreamDefaultController} controller
 * @param {Function} transformAlgorithm
 * @param {Function} flushAlgorithm
 */
function setupTransformStreamDefaultController(
  stream,
  controller,
  transformAlgorithm,
  flushAlgorithm) {
  assert(isTransformStream(stream));
  const streamState = stream[kState];
  assert(streamState.controller === undefined);
  controller[kState] = { stream, transformAlgorithm, flushAlgorithm };
  streamState.controller = controller;
}
453
/**
 * Creates a controller for `stream` and wires it up with the transformer's
 * `transform` and `flush` members, each bound to the transformer. A falsy
 * member is replaced by defaultTransformAlgorithm / nonOpFlush respectively.
 * `transform` is read before `flush`, each exactly once.
 * @param {TransformStream} stream
 * @param {Transformer | null} transformer
 */
function setupTransformStreamDefaultControllerFromTransformer(
  stream,
  transformer) {
  const controller = createTransformStreamDefaultController();

  const transformFn = transformer?.transform || defaultTransformAlgorithm;
  const flushFn = transformer?.flush || nonOpFlush;

  const boundTransform = FunctionPrototypeBind(transformFn, transformer);
  const boundFlush = FunctionPrototypeBind(flushFn, transformer);

  setupTransformStreamDefaultController(
    stream,
    controller,
    boundTransform,
    boundFlush);
}
471
/**
 * Drops the controller's references to its transform and flush algorithms.
 * @param {TransformStreamDefaultController} controller
 */
function transformStreamDefaultControllerClearAlgorithms(controller) {
  const state = controller[kState];
  state.transformAlgorithm = undefined;
  state.flushAlgorithm = undefined;
}
476
/**
 * Enqueues `chunk` on the readable side and, if that puts the readable side
 * under backpressure while the stream's flag is still off, turns the flag
 * on.
 *
 * Throws ERR_INVALID_STATE when the readable side can no longer accept
 * chunks. If the enqueue itself throws, the writable side is errored with
 * that exception and the readable side's stored error is thrown instead.
 * @param {TransformStreamDefaultController} controller
 * @param {any} chunk
 */
function transformStreamDefaultControllerEnqueue(controller, chunk) {
  const { stream } = controller[kState];
  const streamState = stream[kState];
  const { readable } = streamState;
  const readableController = readable[kState].controller;

  if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
    throw new ERR_INVALID_STATE.TypeError('Unable to enqueue');
  }

  try {
    readableStreamDefaultControllerEnqueue(readableController, chunk);
  } catch (error) {
    transformStreamErrorWritableAndUnblockWrite(stream, error);
    throw readable[kState].storedError;
  }

  const hasBackpressure =
    readableStreamDefaultControllerHasBackpressure(readableController);
  if (hasBackpressure === streamState.backpressure) {
    return;
  }
  // Enqueuing can only switch backpressure on, never off.
  assert(hasBackpressure);
  transformStreamSetBackpressure(stream, true);
}
502
/**
 * Errors the transform stream that owns `controller`.
 * @param {TransformStreamDefaultController} controller
 * @param {any} error
 */
function transformStreamDefaultControllerError(controller, error) {
  const { stream } = controller[kState];
  transformStreamError(stream, error);
}
506
/**
 * Runs the controller's transform algorithm on `chunk` via ensureIsPromise,
 * passing the controller both as its second argument and as the last one,
 * and awaits the outcome. On failure the whole transform stream is errored
 * with the same reason, which is then rethrown to the caller.
 * @param {TransformStreamDefaultController} controller
 * @param {any} chunk
 * @returns {Promise<any>}
 */
async function transformStreamDefaultControllerPerformTransform(controller, chunk) {
  try {
    const { transformAlgorithm } = controller[kState];
    return await ensureIsPromise(
      transformAlgorithm,
      controller,
      chunk,
      controller);
  } catch (reason) {
    transformStreamError(controller[kState].stream, reason);
    throw reason;
  }
}
519
/**
 * Closes the readable side, then errors the writable side with an
 * ERR_INVALID_STATE TypeError and releases any write blocked on
 * backpressure.
 * @param {TransformStreamDefaultController} controller
 */
function transformStreamDefaultControllerTerminate(controller) {
  const { stream } = controller[kState];
  const { readable } = stream[kState];
  assert(readable !== undefined);
  readableStreamDefaultControllerClose(readable[kState].controller);
  const reason =
    new ERR_INVALID_STATE.TypeError('TransformStream has been terminated');
  transformStreamErrorWritableAndUnblockWrite(stream, reason);
}
536
/**
 * Write algorithm of the writable side. Without backpressure the chunk is
 * transformed right away. With backpressure the transform waits for the
 * current backpressureChange promise; if the writable side has moved to
 * 'erroring' by then, its stored error is thrown instead.
 * @param {TransformStream} stream
 * @param {any} chunk
 * @returns {Promise<any>}
 */
function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
  const state = stream[kState];
  const { writable, controller } = state;
  assert(writable[kState].state === 'writable');

  if (!state.backpressure) {
    return transformStreamDefaultControllerPerformTransform(controller, chunk);
  }

  const waitForRelease = state.backpressureChange.promise;
  return PromisePrototypeThen(
    waitForRelease,
    () => {
      // Re-read the writable side: its state may have changed while waiting.
      const currentWritable = stream[kState].writable;
      if (currentWritable[kState].state === 'erroring') {
        throw currentWritable[kState].storedError;
      }
      assert(currentWritable[kState].state === 'writable');
      return transformStreamDefaultControllerPerformTransform(
        controller,
        chunk);
    });
}
561
/**
 * Abort algorithm of the writable side: errors the whole transform stream
 * with `reason`. Being async, it always returns a promise.
 * @param {TransformStream} stream
 * @param {any} reason
 * @returns {Promise<void>}
 */
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
  transformStreamError(stream, reason);
}
565
/**
 * Close algorithm of the writable side. Starts the flush algorithm, clears
 * the controller's algorithms, and then:
 *  - on success, closes the readable side unless it is already errored, in
 *    which case its stored error is thrown;
 *  - on failure, errors the transform stream and throws the readable side's
 *    stored error.
 * @param {TransformStream} stream
 * @returns {Promise<void>}
 */
function transformStreamDefaultSinkCloseAlgorithm(stream) {
  const { readable, controller } = stream[kState];

  // The flush algorithm must be started before it is cleared below.
  const { flushAlgorithm } = controller[kState];
  const flushPromise = ensureIsPromise(flushAlgorithm, controller, controller);
  transformStreamDefaultControllerClearAlgorithms(controller);

  const onFlushed = () => {
    const readableState = readable[kState];
    if (readableState.state === 'errored') {
      throw readableState.storedError;
    }
    readableStreamDefaultControllerClose(readableState.controller);
  };
  const onFlushFailed = (error) => {
    transformStreamError(stream, error);
    throw readable[kState].storedError;
  };

  return PromisePrototypeThen(flushPromise, onFlushed, onFlushFailed);
}
590
/**
 * Pull algorithm of the readable side: switches backpressure off and
 * returns the backpressureChange promise that exists after the switch.
 * Backpressure must be on when this is called.
 * @param {TransformStream} stream
 * @returns {Promise<void>}
 */
function transformStreamDefaultSourcePullAlgorithm(stream) {
  const state = stream[kState];
  assert(state.backpressure);
  assert(state.backpressureChange.promise !== undefined);
  transformStreamSetBackpressure(stream, false);
  // Read again: the call above installs a new backpressureChange record.
  return state.backpressureChange.promise;
}
597
598module.exports = {
599  TransformStream,
600  TransformStreamDefaultController,
601  TransferredTransformStream,
602
603  // Exported Brand Checks
604  isTransformStream,
605  isTransformStreamDefaultController,
606};
607