'use strict';

// Tests for `Readable.prototype.map`: synchronous and asynchronous mappers,
// chained maps, error propagation, the `concurrency` and `signal` options,
// and argument validation. Each scenario lives in its own block scope so the
// `stream` bindings do not collide.

const common = require('../common');
const {
  Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

{
  // Map works on synchronous streams with a synchronous mapper
  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
  (async () => {
    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
  })().then(common.mustCall());
}

{
  // Map works on synchronous streams with an asynchronous mapper
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    await Promise.resolve();
    return x + x;
  });
  (async () => {
    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
  })().then(common.mustCall());
}

{
  // Map works on asynchronous streams with an asynchronous mapper
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    return x + x;
  }).map((x) => x + x);
  (async () => {
    assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
  })().then(common.mustCall());
}

{
  // Map works on an infinite stream
  const stream = Readable.from(async function* () {
    while (true) yield 1;
  }()).map(common.mustCall(async (x) => {
    return x + x;
  }, 5));
  (async () => {
    let i = 1;
    for await (const item of stream) {
      assert.strictEqual(item, 2);
      // Stop consuming; the source itself never ends.
      if (++i === 5) break;
    }
  })().then(common.mustCall());
}

{
  // Map works on non-objectMode streams
  const stream = new Readable({
    read() {
      this.push(Uint8Array.from([1]));
      this.push(Uint8Array.from([2]));
      this.push(null);
    }
  }).map(async ([x]) => {
    return x + x;
  }).map((x) => x + x);
  const result = [4, 8];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
  })().then(common.mustCall());
}

{
  // Does not care about data events
  const source = new Readable({
    read() {
      this.push(Uint8Array.from([1]));
      this.push(Uint8Array.from([2]));
      this.push(null);
    }
  });
  // `stream` is declared below; the callback only runs on a later turn of the
  // event loop, after the binding has been initialized.
  setImmediate(() => stream.emit('data', Uint8Array.from([1])));
  const stream = source.map(async ([x]) => {
    return x + x;
  }).map((x) => x + x);
  const result = [4, 8];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
  })().then(common.mustCall());
}

{
  // Emitting an error during `map`
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    if (x === 3) {
      stream.emit('error', new Error('boom'));
    }
    return x + x;
  });
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // Throwing an error during `map` (sync)
  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
    if (x === 3) {
      throw new Error('boom');
    }
    return x + x;
  });
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // Throwing an error during `map` (async)
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    if (x === 3) {
      throw new Error('boom');
    }
    return x + x;
  });
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  const range = Readable.from([1, 2, 3, 4, 5]);
  // Each mapper call blocks until the signal aborts, then rethrows the abort
  // reason; with `concurrency: 2` exactly two calls are expected.
  const stream = range.map(common.mustCall(async (_, { signal }) => {
    await once(signal, 'abort');
    throw signal.reason;
  }, 2), { signal: ac.signal, concurrency: 2 });
  // Pump the stream; no item should ever be produced.
  assert.rejects(async () => {
    for await (const item of stream) {
      assert.fail('should not reach here, got ' + item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  setImmediate(() => {
    ac.abort();
  });
}

{
  // Concurrency result order
  const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
    // The first item waits longer than the second, so the second mapper call
    // settles first; results must still be emitted in source order.
    // `timers/promises` setTimeout is (delay, value, options): the options
    // object goes third, otherwise `{ signal }` is merely the resolved value
    // and the timer is not tied to the signal.
    await setTimeout(10 - item, undefined, { signal });
    return item;
  }, { concurrency: 2 });

  (async () => {
    const expected = [1, 2];
    for await (const item of stream) {
      assert.strictEqual(item, expected.shift());
    }
  })().then(common.mustCall());
}

{
  // Error cases
  assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
  assert.throws(() => Readable.from([1]).map((x) => x, {
    concurrency: 'Foo'
  }), /ERR_OUT_OF_RANGE/);
  assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
  assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
  // Test result is a Readable
  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
  assert.strictEqual(stream.readable, true);
}