• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Flags: --expose-internals --no-warnings
2'use strict';
3
4const common = require('../common');
5const assert = require('assert');
6
7const {
8  ReadableStream,
9  ReadableByteStreamController,
10  ReadableStreamDefaultReader,
11  ReadableStreamBYOBReader,
12  ReadableStreamBYOBRequest,
13} = require('stream/web');
14
15const {
16  kState,
17} = require('internal/webstreams/util');
18
19const {
20  open,
21} = require('fs/promises');
22
23const {
24  readFileSync,
25} = require('fs');
26
27const {
28  Buffer,
29} = require('buffer');
30
31const {
32  inspect,
33} = require('util');
34
// Surface checks for a byte-type ReadableStream: the controller class, the
// shape of the public API, getReader() argument validation, and the reader
// class returned for each mode.
{
  const byteStream = new ReadableStream({ type: 'bytes' });

  assert(
    byteStream[kState].controller instanceof ReadableByteStreamController);

  // `locked` is a boolean property; everything else listed is a method.
  assert.strictEqual(typeof byteStream.locked, 'boolean');
  const methodNames = ['cancel', 'getReader', 'pipeThrough', 'pipeTo', 'tee'];
  for (const methodName of methodNames) {
    assert.strictEqual(typeof byteStream[methodName], 'function');
  }

  // An unrecognized `mode` value is rejected.
  for (const mode of ['', null, 'asdf']) {
    assert.throws(() => byteStream.getReader({ mode }), {
      code: 'ERR_INVALID_ARG_VALUE',
    });
  }

  // A non-object options argument is rejected.
  for (const badOptions of [1, 'asdf']) {
    assert.throws(() => byteStream.getReader(badOptions), {
      code: 'ERR_INVALID_ARG_TYPE',
    });
  }

  // Acquiring a reader locks the stream.
  assert(!byteStream.locked);
  const plainReader = byteStream.getReader();
  assert(byteStream.locked);
  assert(plainReader instanceof ReadableStreamDefaultReader);

  // Release the default reader so that a BYOB reader can be acquired next.
  plainReader.releaseLock();
  const viewReader = byteStream.getReader({ mode: 'byob' });
  assert(viewReader instanceof ReadableStreamBYOBReader);

  const rendered = inspect(viewReader, { depth: 0 });
  assert.match(rendered, /ReadableStreamBYOBReader/);
}
72
/**
 * Underlying byte source that streams the contents of this test file.
 *
 * It advertises `type: 'bytes'` and an `autoAllocateChunkSize` of 1024 via
 * getters, fills every BYOB request straight from the file, and exercises the
 * argument/state validation of ReadableStreamBYOBRequest along the way.
 */
class Source {
  constructor() {
    // Not read anywhere in the visible part of this file.
    this.controllerClosed = false;
  }

  /**
   * Opens this file for reading and remembers the controller.
   * @param {ReadableByteStreamController} controller
   * @returns {Promise<void>}
   */
  async start(controller) {
    this.file = await open(__filename);
    this.controller = controller;
  }

  /**
   * Fills the pending BYOB request with the next bytes of the file. When the
   * file is exhausted (zero bytes read) the file handle and the controller
   * are closed before the request is answered with 0.
   * @param {ReadableByteStreamController} controller
   * @returns {Promise<void>}
   */
  async pull(controller) {
    const byobRequest = controller.byobRequest;
    assert.match(inspect(byobRequest), /ReadableStreamBYOBRequest/);

    const view = byobRequest.view;
    // `offset` is measured from the start of `buffer` (the view itself), not
    // from the start of the view's ArrayBuffer. Passing `view.byteOffset`
    // here would push the write past the end of the view whenever the
    // consumer supplies a view that does not begin at byte 0 of its buffer.
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: 0,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }

    // A plain object is not an acceptable replacement view.
    assert.throws(() => byobRequest.respondWithNewView({}), {
      code: 'ERR_INVALID_ARG_TYPE',
    });

    byobRequest.respond(bytesRead);

    // Once answered, the same request must refuse any further response.
    assert.throws(() => byobRequest.respond(bytesRead), {
      code: 'ERR_INVALID_STATE',
    });
    assert.throws(() => byobRequest.respondWithNewView(view), {
      code: 'ERR_INVALID_STATE',
    });
  }

  /**
   * Releases the file handle when the consumer cancels the stream before the
   * end of the file is reached (e.g. leaving a `for await` loop early), so
   * the descriptor is not left open.
   * @returns {Promise<void>}
   */
  async cancel() {
    // `file` is only assigned once start() has opened it.
    if (this.file !== undefined) {
      await this.file.close();
    }
  }

  get type() { return 'bytes'; }

  get autoAllocateChunkSize() { return 1024; }
}
119
// Drain a Source-backed stream through a BYOB reader, 100 bytes per request,
// and check that the concatenated chunks equal the contents of this file.
{
  const readable = new ReadableStream(new Source());
  assert(readable[kState].controller instanceof ReadableByteStreamController);

  const drain = async (input) => {
    const byobReader = input.getReader({ mode: 'byob' });
    const pieces = [];

    for (;;) {
      const { value, done } = await byobReader.read(Buffer.alloc(100));
      if (value !== undefined) {
        pieces.push(Buffer.from(value));
      }
      if (done) {
        return Buffer.concat(pieces);
      }
    }
  };

  drain(readable).then(common.mustCall((actual) => {
    const expected = readFileSync(__filename);
    assert.deepStrictEqual(expected, actual);
  }));
}
143
// Drain a Source-backed stream with async iteration and check that the
// concatenated chunks equal the contents of this file.
{
  const readable = new ReadableStream(new Source());
  assert(readable[kState].controller instanceof ReadableByteStreamController);

  const collect = async (input) => {
    const pieces = [];
    for await (const piece of input) {
      pieces.push(piece);
    }
    return Buffer.concat(pieces);
  };

  collect(readable).then(common.mustCall((actual) => {
    const expected = readFileSync(__filename);
    assert.deepStrictEqual(expected, actual);
  }));
}
161
// Leaving the async iteration of a byte stream after the first chunk must
// still let the surrounding async function complete successfully.
{
  const readable = new ReadableStream(new Source());
  assert(readable[kState].controller instanceof ReadableByteStreamController);

  const readOneChunk = async (input) => {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of input) {
      break;
    }
  };

  readOneChunk(readable).then(common.mustCall());
}
174
// An error thrown from inside the async-iteration loop body must surface as
// the rejection reason of the surrounding async function.
{
  const stream = new ReadableStream(new Source());
  assert(stream[kState].controller instanceof ReadableByteStreamController);

  const error = new Error('boom');

  async function read(stream) {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of stream)
      throw error;
  }

  // Chain onto mustCall() so the test fails if the assertion promise never
  // settles, instead of passing without having checked anything.
  assert.rejects(read(stream), error).then(common.mustCall());
}
189
// The `view` getter, respond() and respondWithNewView() of
// ReadableStreamBYOBRequest must reject a receiver that is not a real
// request object.
{
  const proto = ReadableStreamBYOBRequest.prototype;
  const invalidThis = { code: 'ERR_INVALID_THIS' };
  const impostor = {};

  assert.throws(
    () => Reflect.get(proto, 'view', impostor),
    invalidThis);

  assert.throws(
    () => proto.respond.call(impostor),
    invalidThis);

  assert.throws(
    () => proto.respondWithNewView.call(impostor),
    invalidThis);
}
207
// A BYOB reader whose lock has been released must reject read() and
// cancel(). releaseLock() is called twice on purpose: the second call must
// not throw.
{
  const readable = new ReadableStream({ type: 'bytes' });
  const reader = readable.getReader({ mode: 'byob' });
  reader.releaseLock();
  reader.releaseLock();

  // Chain onto mustCall() so the test fails if either assertion promise
  // never settles, instead of passing without having checked anything.
  assert.rejects(reader.read(new Uint8Array(10)), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
  assert.rejects(reader.cancel(), {
    code: 'ERR_INVALID_STATE',
  }).then(common.mustCall());
}
220
// Byte controller validation: enqueue() rejects a non-view chunk, and both
// enqueue() and close() are refused once the controller has been closed.
{
  let byteController;
  new ReadableStream({
    type: 'bytes',
    start(c) { byteController = c; },
  });

  const invalidState = { code: 'ERR_INVALID_STATE' };

  assert.throws(() => byteController.enqueue(1), {
    code: 'ERR_INVALID_ARG_TYPE',
  });

  byteController.close();

  assert.throws(() => byteController.enqueue(new Uint8Array(10)), invalidState);
  assert.throws(() => byteController.close(), invalidState);
}
238
// With a chunk already queued, close() is accepted, but a later enqueue()
// must still be refused.
{
  let byteController;
  new ReadableStream({
    type: 'bytes',
    start(c) { byteController = c; },
  });

  byteController.enqueue(new Uint8Array(10));
  byteController.close();

  const enqueueAgain = () => byteController.enqueue(new Uint8Array(10));
  assert.throws(enqueueAgain, {
    code: 'ERR_INVALID_STATE',
  });
}
251
// respondWithNewView() with a BYOB reader: the source answers the request
// with a fresh 3-byte view over the request's buffer.
{
  const stream = new ReadableStream({
    type: 'bytes',
    pull(c) {
      const v = new Uint8Array(c.byobRequest.view.buffer, 0, 3);
      v.set([20, 21, 22]);
      c.byobRequest.respondWithNewView(v);
    },
  });
  const buffer = new ArrayBuffer(10);
  const view = new Uint8Array(buffer, 0, 3);
  view.set([10, 11, 12]);
  const reader = stream.getReader({ mode: 'byob' });

  // Verify the outcome instead of discarding the promise: the read has to
  // settle, and it must deliver the bytes written through the new view.
  reader.read(view).then(common.mustCall(({ value, done }) => {
    assert.strictEqual(done, false);
    assert.deepStrictEqual([...value], [20, 21, 22]);
  }));
}
267
// respondWithNewView() with a default reader and autoAllocateChunkSize: the
// source answers the auto-allocated request with a 3-byte view over the
// request's buffer.
{
  const stream = new ReadableStream({
    type: 'bytes',
    autoAllocateChunkSize: 10,
    pull(c) {
      const v = new Uint8Array(c.byobRequest.view.buffer, 0, 3);
      v.set([20, 21, 22]);
      c.byobRequest.respondWithNewView(v);
    },
  });
  const reader = stream.getReader();

  // Verify the outcome instead of discarding the promise: the read has to
  // settle, and it must deliver the bytes written through the new view.
  reader.read().then(common.mustCall(({ value, done }) => {
    assert.strictEqual(done, false);
    assert.deepStrictEqual([...value], [20, 21, 22]);
  }));
}
281