// Source-viewer navigation (not part of the test): Home | Line# | Scopes# | Navigate# | Raw | Download
// Flags: --expose-internals --no-warnings
'use strict';

// Tests the adapter that exposes a WHATWG ReadableStream as a Node.js
// stream.Readable. --expose-internals is needed because the adapter and the
// kState symbol are loaded from `internal/*` modules below.

const common = require('../common');

const assert = require('assert');

const {
  pipeline,
  finished,
  Writable,
} = require('stream');

const {
  ReadableStream,
  WritableStream,
} = require('stream/web');

const {
  newStreamReadableFromReadableStream,
} = require('internal/webstreams/adapters');

// kState is used by the tests to inspect the web stream's internal state.
const {
  kState,
} = require('internal/webstreams/util');

/**
 * Minimal underlying source used by the tests below.
 *
 * It emits a single chunk (`value`) on the first pull and then closes the
 * stream. It also records what happened to it (`started`, `controller`,
 * `canceled`, `cancelReason`) so tests can assert on those fields.
 */
class MySource {
  /**
   * @param {*} [value] Chunk to enqueue on pull. Defaults to a 10-byte
   *   Uint8Array.
   */
  constructor(value = new Uint8Array(10)) {
    this.value = value;
  }

  /**
   * Remembers the controller so a test can drive the stream from outside
   * (for example by calling `source.controller.error(...)`).
   * @param {object} controller Controller passed to `start()`.
   */
  start(controller) {
    this.started = true;
    this.controller = controller;
  }

  /**
   * Enqueues the configured value and immediately closes the stream.
   * @param {object} controller Controller passed to `pull()`.
   */
  pull(controller) {
    controller.enqueue(this.value);
    controller.close();
  }

  /**
   * Records that the source was canceled and with which reason.
   * @param {*} reason Cancellation reason.
   */
  cancel(reason) {
    this.canceled = true;
    this.cancelReason = reason;
  }
}

{
  // Destroying the readable without an error closes
  // the readableStream.

  const readableStream = new ReadableStream();
  const readable = newStreamReadableFromReadableStream(readableStream);

  // Once wrapped, the web stream is locked, so every operation that needs
  // to acquire it is expected to fail with ERR_INVALID_STATE.
  assert(readableStream.locked);

  // assert.rejects() returns a promise; chain mustCall() so the test fails
  // if either assertion never settles instead of leaving it floating.
  assert.rejects(readableStream.cancel(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
  assert.rejects(readableStream.pipeTo(new WritableStream()), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
  assert.throws(() => readableStream.tee(), {
    code: 'ERR_INVALID_STATE',
  });
  assert.throws(() => readableStream.getReader(), {
    code: 'ERR_INVALID_STATE',
  });
  assert.throws(() => {
    readableStream.pipeThrough({
      readable: new ReadableStream(),
      writable: new WritableStream(),
    });
  }, {
    code: 'ERR_INVALID_STATE',
  });

  readable.destroy();

  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'closed');
  }));
}

{
  // Destroying the readable with an error closes the readableStream
  // without error but records the cancel reason in the source.
  const error = new Error('boom');
  const source = new MySource();
  const readableStream = new ReadableStream(source);
  const readable = newStreamReadableFromReadableStream(readableStream);

  assert(readableStream.locked);

  readable.destroy(error);

  // The Node.js readable itself must surface the very same error object.
  readable.on('error', common.mustCall((reason) => {
    assert.strictEqual(reason, error);
  }));

  // The web stream ends up 'closed' (not 'errored'), and the source's
  // cancel() hook must have received the destroy error as its reason.
  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'closed');
    assert.strictEqual(source.cancelReason, error);
  }));
}

{
  // An error in the source causes the readable to error.
  const error = new Error('boom');
  const source = new MySource();
  const readableStream = new ReadableStream(source);
  const readable = newStreamReadableFromReadableStream(readableStream);

  assert(readableStream.locked);

  // Error the web stream through the controller MySource saved in start().
  source.controller.error(error);

  readable.on('error', common.mustCall((reason) => {
    assert.strictEqual(reason, error);
  }));

  // Unlike the destroy() cases, the web stream is expected to be 'errored'.
  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'errored');
  }));
}

{
  // With no options, the single 10-byte Uint8Array chunk from MySource is
  // expected to arrive as a Buffer with the same contents, followed by a
  // clean 'end' and 'close' and no 'error'.
  const readableStream = new ReadableStream(new MySource());
  const readable = newStreamReadableFromReadableStream(readableStream);

  readable.on('data', common.mustCall((chunk) => {
    assert.deepStrictEqual(chunk, Buffer.alloc(10));
  }));
  readable.on('end', common.mustCall());
  readable.on('close', common.mustCall());
  readable.on('error', common.mustNotCall());
}

{
  // With `encoding: 'utf8'`, a string chunk enqueued by the source is
  // expected to be delivered to 'data' listeners as the same string.
  const readableStream = new ReadableStream(new MySource('hello'));
  const readable = newStreamReadableFromReadableStream(readableStream, {
    encoding: 'utf8',
  });

  readable.on('data', common.mustCall((chunk) => {
    assert.strictEqual(chunk, 'hello');
  }));
  readable.on('end', common.mustCall());
  readable.on('close', common.mustCall());
  readable.on('error', common.mustNotCall());
}

{
  // With `objectMode: true`, the chunk is expected to be passed through
  // as-is: a plain Uint8Array rather than a Buffer.
  const readableStream = new ReadableStream(new MySource());
  const readable = newStreamReadableFromReadableStream(readableStream, {
    objectMode: true,
  });

  readable.on('data', common.mustCall((chunk) => {
    assert.deepStrictEqual(chunk, new Uint8Array(10));
  }));
  readable.on('end', common.mustCall());
  readable.on('close', common.mustCall());
  readable.on('error', common.mustNotCall());
}

{
  // The source enqueues one chunk synchronously in start() and a second
  // one (plus close) from a setImmediate callback. finished() must still
  // be invoked exactly once, and without an error.
  const ec = new TextEncoder();
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(ec.encode('hello'));
      setImmediate(() => {
        controller.enqueue(ec.encode('there'));
        controller.close();
      });
    },
  });
  const streamReadable = newStreamReadableFromReadableStream(readable);

  // mustSucceed() additionally asserts that no error is passed to the
  // callback, which a bare mustCall() would silently accept.
  finished(streamReadable, common.mustSucceed());

  // Switch to flowing mode so the data is consumed and the stream can end.
  streamReadable.resume();
}

{
  // NOTE(review): this case is identical to the one directly above it.
  // It is kept so coverage is unchanged; consider removing it or making
  // it exercise something different.
  //
  // The source enqueues one chunk synchronously in start() and a second
  // one (plus close) from a setImmediate callback. finished() must still
  // be invoked exactly once, and without an error.
  const ec = new TextEncoder();
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(ec.encode('hello'));
      setImmediate(() => {
        controller.enqueue(ec.encode('there'));
        controller.close();
      });
    },
  });
  const streamReadable = newStreamReadableFromReadableStream(readable);

  // mustSucceed() additionally asserts that no error is passed to the
  // callback, which a bare mustCall() would silently accept.
  finished(streamReadable, common.mustSucceed());

  // Switch to flowing mode so the data is consumed and the stream can end.
  streamReadable.resume();
}

{
  // The adapted readable can be used as the source of stream.pipeline():
  // both chunks must reach the Writable, in order, as binary chunks.
  const ec = new TextEncoder();
  const dc = new TextDecoder();
  const check = ['hello', 'there'];
  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(ec.encode('hello'));
      setImmediate(() => {
        controller.enqueue(ec.encode('there'));
        controller.close();
      });
    },
  });
  const writable = new Writable({
    // Expected to run exactly twice, once per enqueued chunk.
    write: common.mustCall((chunk, encoding, callback) => {
      assert.strictEqual(dc.decode(chunk), check.shift());
      assert.strictEqual(encoding, 'buffer');
      callback();
    }, 2),
  });

  const streamReadable = newStreamReadableFromReadableStream(readable);

  // mustSucceed() additionally asserts that the pipeline completed without
  // an error, which a bare mustCall() would silently accept.
  pipeline(streamReadable, writable, common.mustSucceed());

  // NOTE(review): presumably redundant once pipeline() is attached; kept
  // to preserve the original test's behavior.
  streamReadable.resume();
}