• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Flags: --expose-internals --no-warnings
2'use strict';
3
4const common = require('../common');
5const assert = require('assert');
6
7const {
8  ReadableStream,
9  TransformStream,
10  TransformStreamDefaultController,
11} = require('stream/web');
12
13const {
14  createReadStream,
15  readFileSync,
16} = require('fs');
17
18const {
19  kTransfer,
20} = require('internal/worker/js_transferable');
21
22const {
23  inspect,
24} = require('util');
25
// Passing `1` for either `readableType` or `writableType` in the transformer
// object must make the constructor throw ERR_INVALID_ARG_VALUE.
for (const optionName of ['readableType', 'writableType']) {
  const construct = () => new TransformStream({ [optionName]: 1 });
  assert.throws(construct, { code: 'ERR_INVALID_ARG_VALUE' });
}
32
33
{
  // A TransformStream built without a transformer: a chunk written to the
  // writable side is expected to be read back unchanged from the readable
  // side.
  const roundTrip = async ({ writable, readable }) => {
    const writer = writable.getWriter();
    const reader = readable.getReader();

    // Start the write and the read together and wait for both to settle;
    // only the read result (second entry) is inspected.
    const [, readResult] = await Promise.all([
      writer.write('hello'),
      reader.read(),
    ]);

    assert.strictEqual(readResult.value, 'hello');
  };

  roundTrip(new TransformStream()).then(common.mustCall());
}
51
/**
 * Transformer object used by the tests in this file: it upper-cases every
 * chunk and records which lifecycle hooks have been invoked.
 */
class Transform {
  // Declared up front so both flags are real booleans (`false`) before the
  // corresponding hook runs, instead of being absent/undefined.
  started = false;
  flushed = false;

  /**
   * Records that the start hook ran.
   * @param {TransformStreamDefaultController} controller Unused here.
   */
  start(controller) {
    this.started = true;
  }

  /**
   * Enqueues the upper-cased form of the chunk on the controller.
   * @param {string} chunk Value providing `toUpperCase()`.
   * @param {TransformStreamDefaultController} controller Receives the result
   *   through `enqueue()`.
   * @returns {Promise<void>}
   */
  async transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }

  /**
   * Records that the flush hook ran.
   * @returns {Promise<void>}
   */
  async flush() {
    this.flushed = true;
  }
}
65
{
  const transformer = new Transform();
  const upperCaser = new TransformStream(transformer);
  // The start hook is expected to have run by the time the constructor
  // returns.
  assert(transformer.started);

  // Writes one chunk, checks it comes out upper-cased, then closes the
  // writable side.
  const exercise = async ({ writable, readable }) => {
    const writer = writable.getWriter();
    const reader = readable.getReader();

    const [, readResult] = await Promise.all([
      writer.write('hello'),
      reader.read(),
    ]);

    assert.strictEqual(readResult.value, 'HELLO');

    await writer.close();
  };

  exercise(upperCaser).then(common.mustCall(() => {
    // Checked only after the writer has been closed.
    assert(transformer.flushed);
  }));
}
89
/**
 * Underlying source for a ReadableStream that feeds it the contents of this
 * test file, one string chunk per 'data' event of an fs read stream.
 */
class Source {
  constructor() {
    // Set by cancel(); consulted by the 'end' handler so the controller is
    // not closed after the consumer has cancelled.
    this.cancelCalled = false;
  }

  /**
   * Opens a read stream on this file and forwards its events to the
   * controller: 'data' -> enqueue (as a string), 'end' -> close,
   * 'error' -> error.
   * @param {ReadableStreamDefaultController} controller
   */
  start(controller) {
    this.stream = createReadStream(__filename);
    this.stream.on('data', (chunk) => {
      controller.enqueue(chunk.toString());
    });
    this.stream.once('end', () => {
      if (!this.cancelCalled)
        controller.close();
    });
    this.stream.once('error', (error) => {
      controller.error(error);
    });
  }

  /**
   * Marks the source as cancelled and tears down the file stream so it stops
   * reading and emitting 'data' once the consumer is gone.
   */
  cancel() {
    this.cancelCalled = true;
    // `stream` is only assigned in start(); tolerate cancel() being called
    // before that.
    this.stream?.destroy();
  }
}
113
{
  // Pipe this file's contents (via Source) through the upper-casing
  // Transform and compare the concatenated output with the file read
  // synchronously from disk.
  const instream = new ReadableStream(new Source());
  const tstream = new TransformStream(new Transform());
  const r = instream.pipeThrough(tstream);

  // Collects every chunk of an async-iterable stream into a single string.
  async function read(stream) {
    let res = '';
    for await (const chunk of stream)
      res += chunk;
    return res;
  }

  read(r).then(common.mustCall((data) => {
    const expected = readFileSync(__filename).toString().toUpperCase();
    // assert.strictEqual takes (actual, expected); keeping that order makes
    // a failure report label the piped data as the actual value.
    assert.strictEqual(data, expected);
  }));
}
131
{
  // Each entry invokes an accessor or method of TransformStream /
  // TransformStreamDefaultController with a receiver that is either a plain
  // object or the prototype itself; all are expected to throw
  // ERR_INVALID_THIS.
  const streamProto = TransformStream.prototype;
  const controllerProto = TransformStreamDefaultController.prototype;
  const invalidThis = { code: 'ERR_INVALID_THIS' };

  const badReceiverCalls = [
    () => Reflect.get(streamProto, 'readable', {}),
    () => Reflect.get(streamProto, 'writable', {}),
    () => streamProto[kTransfer]({}),
    () => Reflect.get(controllerProto, 'desiredSize', {}),
    () => controllerProto.enqueue({}),
    () => controllerProto.error({}),
    () => controllerProto.terminate({}),
  ];

  for (const call of badReceiverCalls)
    assert.throws(call, invalidThis);

  // The controller class cannot be constructed directly.
  assert.throws(() => new TransformStreamDefaultController(), {
    code: 'ERR_ILLEGAL_CONSTRUCTOR',
  });
}
168
{
  // Capture the controller handed to the transformer's start hook so its
  // inspect output can be checked alongside the stream's.
  let controller;
  const transform = new TransformStream({
    start(c) {
      controller = c;
    }
  });

  // For each object: the class name must appear with default options and
  // with unlimited depth, and the `Name [` form must appear at depth 0.
  const expectations = [
    {
      target: transform,
      named: /TransformStream/,
      shallow: /TransformStream \[/,
    },
    {
      target: controller,
      named: /TransformStreamDefaultController/,
      shallow: /TransformStreamDefaultController \[/,
    },
  ];

  for (const { target, named, shallow } of expectations) {
    assert.match(inspect(target), named);
    assert.match(inspect(target, { depth: null }), named);
    assert.match(inspect(target, { depth: 0 }), shallow);
  }
}
189
{
  // Install a `type` accessor on Object.prototype whose getter and setter
  // both fail the test if invoked, then construct a TransformStream from a
  // transformer that has no own `type` property: construction must not touch
  // the inherited accessor.
  Object.defineProperty(Object.prototype, 'type', {
    get: common.mustNotCall('get %Object.prototype%.type'),
    set: common.mustNotCall('set %Object.prototype%.type'),
    configurable: true,
  });

  try {
    new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(chunk);
      },
      flush(controller) {
        controller.terminate();
      }
    });
  } finally {
    // Always remove the accessor, even if construction throws, so the
    // polluted prototype cannot leak into anything that runs afterwards.
    delete Object.prototype.type;
  }
}
208