• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Readable,
6} = require('stream');
7const assert = require('assert');
8const { once } = require('events');
9const { setTimeout } = require('timers/promises');
10
11{
12  // Map works on synchronous streams with a synchronous mapper
13  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
14  (async () => {
15    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
16  })().then(common.mustCall());
17}
18
19{
20  // Map works on synchronous streams with an asynchronous mapper
21  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
22    await Promise.resolve();
23    return x + x;
24  });
25  (async () => {
26    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
27  })().then(common.mustCall());
28}
29
30{
31  // Map works on asynchronous streams with a asynchronous mapper
32  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
33    return x + x;
34  }).map((x) => x + x);
35  (async () => {
36    assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
37  })().then(common.mustCall());
38}
39
40{
41  // Map works on an infinite stream
42  const stream = Readable.from(async function* () {
43    while (true) yield 1;
44  }()).map(common.mustCall(async (x) => {
45    return x + x;
46  }, 5));
47  (async () => {
48    let i = 1;
49    for await (const item of stream) {
50      assert.strictEqual(item, 2);
51      if (++i === 5) break;
52    }
53  })().then(common.mustCall());
54}
55
56{
57  // Map works on non-objectMode streams
58  const stream = new Readable({
59    read() {
60      this.push(Uint8Array.from([1]));
61      this.push(Uint8Array.from([2]));
62      this.push(null);
63    }
64  }).map(async ([x]) => {
65    return x + x;
66  }).map((x) => x + x);
67  const result = [4, 8];
68  (async () => {
69    for await (const item of stream) {
70      assert.strictEqual(item, result.shift());
71    }
72  })().then(common.mustCall());
73}
74
75{
76  // Does not care about data events
77  const source = new Readable({
78    read() {
79      this.push(Uint8Array.from([1]));
80      this.push(Uint8Array.from([2]));
81      this.push(null);
82    }
83  });
84  setImmediate(() => stream.emit('data', Uint8Array.from([1])));
85  const stream = source.map(async ([x]) => {
86    return x + x;
87  }).map((x) => x + x);
88  const result = [4, 8];
89  (async () => {
90    for await (const item of stream) {
91      assert.strictEqual(item, result.shift());
92    }
93  })().then(common.mustCall());
94}
95
96{
97  // Emitting an error during `map`
98  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
99    if (x === 3) {
100      stream.emit('error', new Error('boom'));
101    }
102    return x + x;
103  });
104  assert.rejects(
105    stream.map((x) => x + x).toArray(),
106    /boom/,
107  ).then(common.mustCall());
108}
109
110{
111  // Throwing an error during `map` (sync)
112  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
113    if (x === 3) {
114      throw new Error('boom');
115    }
116    return x + x;
117  });
118  assert.rejects(
119    stream.map((x) => x + x).toArray(),
120    /boom/,
121  ).then(common.mustCall());
122}
123
124
125{
126  // Throwing an error during `map` (async)
127  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
128    if (x === 3) {
129      throw new Error('boom');
130    }
131    return x + x;
132  });
133  assert.rejects(
134    stream.map((x) => x + x).toArray(),
135    /boom/,
136  ).then(common.mustCall());
137}
138
139{
140  // Concurrency + AbortSignal
141  const ac = new AbortController();
142  const range = Readable.from([1, 2, 3, 4, 5]);
143  const stream = range.map(common.mustCall(async (_, { signal }) => {
144    await once(signal, 'abort');
145    throw signal.reason;
146  }, 2), { signal: ac.signal, concurrency: 2 });
147  // pump
148  assert.rejects(async () => {
149    for await (const item of stream) {
150      assert.fail('should not reach here, got ' + item);
151    }
152  }, {
153    name: 'AbortError',
154  }).then(common.mustCall());
155
156  setImmediate(() => {
157    ac.abort();
158  });
159}
160
161{
162  // Concurrency result order
163  const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
164    await setTimeout(10 - item, { signal });
165    return item;
166  }, { concurrency: 2 });
167
168  (async () => {
169    const expected = [1, 2];
170    for await (const item of stream) {
171      assert.strictEqual(item, expected.shift());
172    }
173  })().then(common.mustCall());
174}
175
176{
177  // Error cases
178  assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
179  assert.throws(() => Readable.from([1]).map((x) => x, {
180    concurrency: 'Foo'
181  }), /ERR_OUT_OF_RANGE/);
182  assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
183  assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
184}
185{
186  // Test result is a Readable
187  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
188  assert.strictEqual(stream.readable, true);
189}
190