'use strict';

// Tests for `Readable.prototype.filter`.
//
// Each `{ ... }` scope below is an independent test case. Asynchronous cases
// chain `.then(common.mustCall())` so the harness fails the run if the
// promise never fulfils.

const common = require('../common');
const {
  Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

{
  // Filter works on synchronous streams with a synchronous predicate
  const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
  const result = [1, 2];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
    // Guard against a vacuous pass: every expected item must have been seen.
    assert.strictEqual(result.length, 0);
  })().then(common.mustCall());
}

{
  // Filter works on synchronous streams with an asynchronous predicate
  const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
    await Promise.resolve();
    return x > 3;
  });
  const result = [4, 5];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
    // Guard against a vacuous pass: every expected item must have been seen.
    assert.strictEqual(result.length, 0);
  })().then(common.mustCall());
}

{
  // Filter works on the output of an asynchronous `map`
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    await Promise.resolve();
    return x + x;
  }).filter((x) => x > 5);
  const result = [6, 8, 10];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
    // Guard against a vacuous pass: every expected item must have been seen.
    assert.strictEqual(result.length, 0);
  })().then(common.mustCall());
}

{
  // Filter works on an infinite stream
  // The predicate is wrapped in `common.mustCall(fn, 5)`, which pins the
  // number of predicate invocations the harness expects to 5.
  const stream = Readable.from(async function* () {
    while (true) yield 1;
  }()).filter(common.mustCall(async (x) => {
    return x < 3;
  }, 5));
  (async () => {
    let i = 1;
    for await (const item of stream) {
      assert.strictEqual(item, 1);
      // Leave the loop early; the source never ends on its own.
      if (++i === 5) break;
    }
  })().then(common.mustCall());
}

{
  // Filter works on constructor created streams
  let i = 0;
  const stream = new Readable({
    read() {
      if (i === 10) {
        this.push(null);
        return;
      }
      // Each chunk is a one-byte Uint8Array holding the current counter.
      this.push(Uint8Array.from([i]));
      i++;
    },
    highWaterMark: 0,
  }).filter(common.mustCall(async ([x]) => {
    return x !== 5;
  }, 10));
  (async () => {
    const result = (await stream.toArray()).map((x) => x[0]);
    const expected = [...Array(10).keys()].filter((x) => x !== 5);
    assert.deepStrictEqual(result, expected);
  })().then(common.mustCall());
}

{
  // Throwing an error during `filter` (sync)
  const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => {
    if (x === 3) {
      throw new Error('boom');
    }
    return true;
  });
  // The error must surface through a downstream operator as a rejection.
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // Throwing an error during `filter` (async)
  const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
    if (x === 3) {
      throw new Error('boom');
    }
    return true;
  });
  // The error must surface through a downstream operator as a rejection.
  assert.rejects(
    stream.filter(() => true).toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  let calls = 0;
  const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
    calls++;
    // Park each predicate until the signal handed to it is aborted.
    await once(signal, 'abort');
  }, { signal: ac.signal, concurrency: 2 });
  // pump
  assert.rejects(async () => {
    for await (const item of stream) {
      // nope
      console.log(item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  setImmediate(() => {
    ac.abort();
    // With `concurrency: 2`, only two predicates may have started by now.
    assert.strictEqual(calls, 2);
  });
}

{
  // Concurrency result order
  // Item 1 waits longer (9ms) than item 2 (8ms), yet output order must
  // still follow input order.
  const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
    await setTimeout(10 - item, { signal });
    return true;
  }, { concurrency: 2 });

  (async () => {
    const expected = [1, 2];
    for await (const item of stream) {
      assert.strictEqual(item, expected.shift());
    }
    // Guard against a vacuous pass: every expected item must have been seen.
    assert.strictEqual(expected.length, 0);
  })().then(common.mustCall());
}

{
  // Error cases
  // Non-function predicate.
  assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/);
  // Invalid `concurrency` option.
  assert.throws(() => Readable.from([1]).filter((x) => x, {
    concurrency: 'Foo'
  }), /ERR_OUT_OF_RANGE/);
  // Non-object options argument.
  assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
}

{
  // Test result is a Readable
  const stream = Readable.from([1, 2, 3, 4, 5]).filter(() => true);
  assert.strictEqual(stream.readable, true);
}

{
  // Filter does not go through a user-visible `map` property on the stream
  const stream = Readable.from([1, 2, 3, 4, 5]);
  Object.defineProperty(stream, 'map', {
    value: common.mustNotCall(),
  });
  // Check that map isn't getting called.
  stream.filter(() => true);
}