• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Readable,
6} = require('stream');
7const assert = require('assert');
8const { once } = require('events');
9const { setTimeout } = require('timers/promises');
10
11{
12  // Filter works on synchronous streams with a synchronous predicate
13  const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
14  const result = [1, 2];
15  (async () => {
16    for await (const item of stream) {
17      assert.strictEqual(item, result.shift());
18    }
19  })().then(common.mustCall());
20}
21
22{
23  // Filter works on synchronous streams with an asynchronous predicate
24  const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
25    await Promise.resolve();
26    return x > 3;
27  });
28  const result = [4, 5];
29  (async () => {
30    for await (const item of stream) {
31      assert.strictEqual(item, result.shift());
32    }
33  })().then(common.mustCall());
34}
35
36{
37  // Map works on asynchronous streams with a asynchronous mapper
38  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
39    await Promise.resolve();
40    return x + x;
41  }).filter((x) => x > 5);
42  const result = [6, 8, 10];
43  (async () => {
44    for await (const item of stream) {
45      assert.strictEqual(item, result.shift());
46    }
47  })().then(common.mustCall());
48}
49
50{
51  // Filter works on an infinite stream
52  const stream = Readable.from(async function* () {
53    while (true) yield 1;
54  }()).filter(common.mustCall(async (x) => {
55    return x < 3;
56  }, 5));
57  (async () => {
58    let i = 1;
59    for await (const item of stream) {
60      assert.strictEqual(item, 1);
61      if (++i === 5) break;
62    }
63  })().then(common.mustCall());
64}
65
66{
67  // Filter works on constructor created streams
68  let i = 0;
69  const stream = new Readable({
70    read() {
71      if (i === 10) {
72        this.push(null);
73        return;
74      }
75      this.push(Uint8Array.from([i]));
76      i++;
77    },
78    highWaterMark: 0,
79  }).filter(common.mustCall(async ([x]) => {
80    return x !== 5;
81  }, 10));
82  (async () => {
83    const result = (await stream.toArray()).map((x) => x[0]);
84    const expected = [...Array(10).keys()].filter((x) => x !== 5);
85    assert.deepStrictEqual(result, expected);
86  })().then(common.mustCall());
87}
88
89{
90  // Throwing an error during `filter` (sync)
91  const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => {
92    if (x === 3) {
93      throw new Error('boom');
94    }
95    return true;
96  });
97  assert.rejects(
98    stream.map((x) => x + x).toArray(),
99    /boom/,
100  ).then(common.mustCall());
101}
102
103{
104  // Throwing an error during `filter` (async)
105  const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
106    if (x === 3) {
107      throw new Error('boom');
108    }
109    return true;
110  });
111  assert.rejects(
112    stream.filter(() => true).toArray(),
113    /boom/,
114  ).then(common.mustCall());
115}
116
117{
118  // Concurrency + AbortSignal
119  const ac = new AbortController();
120  let calls = 0;
121  const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
122    calls++;
123    await once(signal, 'abort');
124  }, { signal: ac.signal, concurrency: 2 });
125  // pump
126  assert.rejects(async () => {
127    for await (const item of stream) {
128      // nope
129      console.log(item);
130    }
131  }, {
132    name: 'AbortError',
133  }).then(common.mustCall());
134
135  setImmediate(() => {
136    ac.abort();
137    assert.strictEqual(calls, 2);
138  });
139}
140
141{
142  // Concurrency result order
143  const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
144    await setTimeout(10 - item, { signal });
145    return true;
146  }, { concurrency: 2 });
147
148  (async () => {
149    const expected = [1, 2];
150    for await (const item of stream) {
151      assert.strictEqual(item, expected.shift());
152    }
153  })().then(common.mustCall());
154}
155
156{
157  // Error cases
158  assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/);
159  assert.throws(() => Readable.from([1]).filter((x) => x, {
160    concurrency: 'Foo'
161  }), /ERR_OUT_OF_RANGE/);
162  assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
163}
164{
165  // Test result is a Readable
166  const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
167  assert.strictEqual(stream.readable, true);
168}
169{
170  const stream = Readable.from([1, 2, 3, 4, 5]);
171  Object.defineProperty(stream, 'map', {
172    value: common.mustNotCall(),
173  });
174  // Check that map isn't getting called.
175  stream.filter(() => true);
176}
177