• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Flags: --no-warnings --expose-internals
2'use strict';
3
4const common = require('../common');
5
6const assert = require('assert');
7
8const {
9  newReadableStreamFromStreamReadable,
10} = require('internal/webstreams/adapters');
11
12const {
13  Duplex,
14  Readable,
15} = require('stream');
16
17const {
18  kState,
19} = require('internal/webstreams/util');
20
{
  // Cancelling the adapted ReadableStream tears down the underlying
  // stream.Readable: it is paused, never resumed, never ends, reports an
  // ABORT_ERR error and finally closes.
  const source = new Readable({
    read() {
      this.push('hello');
      this.push(null);
    },
  });

  // Listeners must be in place before the adapter is created so that
  // events emitted during adaptation are observed.
  source
    .on('close', common.mustCall())
    .on('end', common.mustNotCall())
    .on('pause', common.mustCall())
    .on('resume', common.mustNotCall())
    .on('error', common.mustCall((err) => {
      assert.strictEqual(err.code, 'ABORT_ERR');
    }));

  const webStream = newReadableStreamFromStreamReadable(source);

  webStream.cancel().then(common.mustCall());
}
42
{
  // Prematurely destroying the stream.Readable without an error
  // errors the ReadableStream with an ABORT_ERR (premature close) error
  // but does not emit 'error' on the readable itself.

  const readable = new Readable({
    read() {
      readable.push('hello');
      readable.push(null);
    }
  });

  const readableStream = newReadableStreamFromStreamReadable(readable);

  assert(!readableStream.locked);

  const reader = readableStream.getReader();

  // Chain common.mustCall() so the test fails if reader.closed never
  // settles; a bare assert.rejects() would leave a floating promise whose
  // assertion might silently never run.
  assert.rejects(reader.closed, {
    code: 'ABORT_ERR',
  }).then(common.mustCall());

  readable.on('end', common.mustNotCall());
  readable.on('error', common.mustNotCall());

  // By the time the readable has closed, the web stream must be errored.
  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'errored');
  }));

  readable.destroy();
}
74
{
  // A readable that ends normally (no error) leaves the adapted
  // ReadableStream in the 'closed' state and fulfills reader.closed.
  const source = new Readable({
    read() {
      this.push('hello');
      this.push(null);
    },
  });

  const webStream = newReadableStreamFromStreamReadable(source);
  assert(!webStream.locked);

  const webReader = webStream.getReader();
  webReader.closed.then(common.mustCall());

  source
    .on('end', common.mustCall())
    .on('error', common.mustNotCall())
    .on('close', common.mustCall(() => {
      assert.strictEqual(webStream[kState].state, 'closed');
    }));

  // Signal end-of-stream directly.
  source.push(null);
}
102
{
  // Destroying the readable with an error should error the readableStream
  // with that very same error object.
  const error = new Error('boom');
  const readable = new Readable({
    read() {
      readable.push('hello');
      readable.push(null);
    }
  });

  const readableStream = newReadableStreamFromStreamReadable(readable);

  assert(!readableStream.locked);

  const reader = readableStream.getReader();

  // Chain common.mustCall() so the test fails if reader.closed never
  // settles; a bare assert.rejects() would leave a floating promise whose
  // assertion might silently never run.
  assert.rejects(reader.closed, error).then(common.mustCall());

  readable.on('end', common.mustNotCall());
  readable.on('error', common.mustCall((reason) => {
    assert.strictEqual(reason, error);
  }));

  // By the time the readable has closed, the web stream must be errored.
  readable.on('close', common.mustCall(() => {
    assert.strictEqual(readableStream[kState].state, 'errored');
  }));

  readable.destroy(error);
}
132
{
  // A readable created with an encoding hands its chunk to the web
  // reader as a string, followed by the end-of-stream result.
  const source = new Readable({
    encoding: 'utf8',
    read() {
      this.push('hello');
      this.push(null);
    },
  });

  const webReader = newReadableStreamFromStreamReadable(source).getReader();

  source
    .on('data', common.mustCall())
    .on('end', common.mustCall())
    .on('close', common.mustCall());

  // Read the single chunk, then the terminating { done: true } result.
  const drain = async () => {
    const first = await webReader.read();
    assert.deepStrictEqual(first, { value: 'hello', done: false });

    const last = await webReader.read();
    assert.deepStrictEqual(last, { value: undefined, done: true });
  };

  drain().then(common.mustCall());
}
159
{
  // An object-mode readable hands the pushed object to the web reader
  // as the chunk value, followed by the end-of-stream result.
  const payload = {};
  const source = new Readable({
    objectMode: true,
    read() {
      this.push(payload);
      this.push(null);
    },
  });

  assert(source.readableObjectMode);

  const webReader = newReadableStreamFromStreamReadable(source).getReader();

  source
    .on('data', common.mustCall())
    .on('end', common.mustCall())
    .on('close', common.mustCall());

  // Read the single chunk, then the terminating { done: true } result.
  const drain = async () => {
    const first = await webReader.read();
    assert.deepStrictEqual(first, { value: payload, done: false });

    const last = await webReader.read();
    assert.deepStrictEqual(last, { value: undefined, done: true });
  };

  drain().then(common.mustCall());
}
189
{
  // Adapting a Readable that was destroyed beforehand: the reader's
  // closed promise is expected to fulfill.
  const destroyed = new Readable();
  destroyed.destroy();

  newReadableStreamFromStreamReadable(destroyed)
    .getReader()
    .closed
    .then(common.mustCall());
}
197
{
  // Adapting a Duplex constructed with `readable: false` and destroyed
  // beforehand: the reader's closed promise is expected to fulfill.
  const destroyed = new Duplex({ readable: false });
  destroyed.destroy();

  newReadableStreamFromStreamReadable(destroyed)
    .getReader()
    .closed
    .then(common.mustCall());
}
205