1'use strict'; 2 3const { AbortController, AbortSignal } = require('internal/abort_controller'); 4 5const { 6 codes: { 7 ERR_INVALID_ARG_VALUE, 8 ERR_INVALID_ARG_TYPE, 9 ERR_MISSING_ARGS, 10 ERR_OUT_OF_RANGE, 11 }, 12 AbortError, 13} = require('internal/errors'); 14const { 15 validateAbortSignal, 16 validateInteger, 17 validateObject, 18} = require('internal/validators'); 19const { kWeakHandler, kResistStopPropagation } = require('internal/event_target'); 20const { finished } = require('internal/streams/end-of-stream'); 21const staticCompose = require('internal/streams/compose'); 22const { 23 addAbortSignalNoValidate, 24} = require('internal/streams/add-abort-signal'); 25const { isWritable, isNodeStream } = require('internal/streams/utils'); 26const { deprecate } = require('internal/util'); 27 28const { 29 ArrayPrototypePush, 30 Boolean, 31 MathFloor, 32 Number, 33 NumberIsNaN, 34 Promise, 35 PromiseReject, 36 PromiseResolve, 37 PromisePrototypeThen, 38 Symbol, 39} = primordials; 40 41const kEmpty = Symbol('kEmpty'); 42const kEof = Symbol('kEof'); 43 44function compose(stream, options) { 45 if (options != null) { 46 validateObject(options, 'options'); 47 } 48 if (options?.signal != null) { 49 validateAbortSignal(options.signal, 'options.signal'); 50 } 51 52 if (isNodeStream(stream) && !isWritable(stream)) { 53 throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable'); 54 } 55 56 const composedStream = staticCompose(this, stream); 57 58 if (options?.signal) { 59 // Not validating as we already validated before 60 addAbortSignalNoValidate( 61 options.signal, 62 composedStream, 63 ); 64 } 65 66 return composedStream; 67} 68 69function map(fn, options) { 70 if (typeof fn !== 'function') { 71 throw new ERR_INVALID_ARG_TYPE( 72 'fn', ['Function', 'AsyncFunction'], fn); 73 } 74 if (options != null) { 75 validateObject(options, 'options'); 76 } 77 if (options?.signal != null) { 78 validateAbortSignal(options.signal, 'options.signal'); 79 } 80 81 let concurrency = 1; 82 if (options?.concurrency != null) { 83 concurrency = MathFloor(options.concurrency); 84 } 85 86 let highWaterMark = concurrency - 1; 87 if (options?.highWaterMark != null) { 88 highWaterMark = MathFloor(options.highWaterMark); 89 } 90 91 validateInteger(concurrency, 'options.concurrency', 1); 92 validateInteger(highWaterMark, 'options.highWaterMark', 0); 93 94 highWaterMark += concurrency; 95 96 return async function* map() { 97 const signal = AbortSignal.any([options?.signal].filter(Boolean)); 98 const stream = this; 99 const queue = []; 100 const signalOpt = { signal }; 101 102 let next; 103 let resume; 104 let done = false; 105 let cnt = 0; 106 107 function onCatch() { 108 done = true; 109 afterItemProcessed(); 110 } 111 112 function afterItemProcessed() { 113 cnt -= 1; 114 maybeResume(); 115 } 116 117 function maybeResume() { 118 if ( 119 resume && 120 !done && 121 cnt < concurrency && 122 queue.length < highWaterMark 123 ) { 124 resume(); 125 resume = null; 126 } 127 } 128 129 async function pump() { 130 try { 131 for await (let val of stream) { 132 if (done) { 133 return; 134 } 135 136 if (signal.aborted) { 137 throw new AbortError(); 138 } 139 140 try { 141 val = fn(val, signalOpt); 142 143 if (val === kEmpty) { 144 continue; 145 } 146 147 val = PromiseResolve(val); 148 } catch (err) { 149 val = PromiseReject(err); 150 } 151 152 cnt += 1; 153 154 PromisePrototypeThen(val, afterItemProcessed, onCatch); 155 156 queue.push(val); 157 if (next) { 158 next(); 159 next = null; 160 } 161 162 if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) { 163 await new Promise((resolve) => { 164 resume = resolve; 165 }); 166 } 167 } 168 queue.push(kEof); 169 } catch (err) { 170 const val = PromiseReject(err); 171 PromisePrototypeThen(val, afterItemProcessed, onCatch); 172 queue.push(val); 173 } finally { 174 done = true; 175 if (next) { 176 next(); 177 next = null; 178 } 179 } 180 } 181 182 pump(); 183 184 try { 185 while (true) { 186 while (queue.length > 0) { 187 const val = await queue[0]; 188 189 if (val === kEof) { 190 return; 191 } 192 193 if (signal.aborted) { 194 throw new AbortError(); 195 } 196 197 if (val !== kEmpty) { 198 yield val; 199 } 200 201 queue.shift(); 202 maybeResume(); 203 } 204 205 await new Promise((resolve) => { 206 next = resolve; 207 }); 208 } 209 } finally { 210 done = true; 211 if (resume) { 212 resume(); 213 resume = null; 214 } 215 } 216 }.call(this); 217} 218 219function asIndexedPairs(options = undefined) { 220 if (options != null) { 221 validateObject(options, 'options'); 222 } 223 if (options?.signal != null) { 224 validateAbortSignal(options.signal, 'options.signal'); 225 } 226 227 return async function* asIndexedPairs() { 228 let index = 0; 229 for await (const val of this) { 230 if (options?.signal?.aborted) { 231 throw new AbortError({ cause: options.signal.reason }); 232 } 233 yield [index++, val]; 234 } 235 }.call(this); 236} 237 238async function some(fn, options = undefined) { 239 for await (const unused of filter.call(this, fn, options)) { 240 return true; 241 } 242 return false; 243} 244 245async function every(fn, options = undefined) { 246 if (typeof fn !== 'function') { 247 throw new ERR_INVALID_ARG_TYPE( 248 'fn', ['Function', 'AsyncFunction'], fn); 249 } 250 // https://en.wikipedia.org/wiki/De_Morgan%27s_laws 251 return !(await some.call(this, async (...args) => { 252 return !(await fn(...args)); 253 }, options)); 254} 255 256async function find(fn, options) { 257 for await (const result of filter.call(this, fn, options)) { 258 return result; 259 } 260 return undefined; 261} 262 263async function forEach(fn, options) { 264 if (typeof fn !== 'function') { 265 throw new ERR_INVALID_ARG_TYPE( 266 'fn', ['Function', 'AsyncFunction'], fn); 267 } 268 async function forEachFn(value, options) { 269 await fn(value, options); 270 return kEmpty; 271 } 272 // eslint-disable-next-line no-unused-vars 273 for await (const unused of map.call(this, forEachFn, options)); 274} 275 276function filter(fn, options) { 277 if (typeof fn !== 'function') { 278 throw new ERR_INVALID_ARG_TYPE( 279 'fn', ['Function', 'AsyncFunction'], fn); 280 } 281 async function filterFn(value, options) { 282 if (await fn(value, options)) { 283 return value; 284 } 285 return kEmpty; 286 } 287 return map.call(this, filterFn, options); 288} 289 290// Specific to provide better error to reduce since the argument is only 291// missing if the stream has no items in it - but the code is still appropriate 292class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { 293 constructor() { 294 super('reduce'); 295 this.message = 'Reduce of an empty stream requires an initial value'; 296 } 297} 298 299async function reduce(reducer, initialValue, options) { 300 if (typeof reducer !== 'function') { 301 throw new ERR_INVALID_ARG_TYPE( 302 'reducer', ['Function', 'AsyncFunction'], reducer); 303 } 304 if (options != null) { 305 validateObject(options, 'options'); 306 } 307 if (options?.signal != null) { 308 validateAbortSignal(options.signal, 'options.signal'); 309 } 310 311 let hasInitialValue = arguments.length > 1; 312 if (options?.signal?.aborted) { 313 const err = new AbortError(undefined, { cause: options.signal.reason }); 314 this.once('error', () => {}); // The error is already propagated 315 await finished(this.destroy(err)); 316 throw err; 317 } 318 const ac = new AbortController(); 319 const signal = ac.signal; 320 if (options?.signal) { 321 const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true }; 322 options.signal.addEventListener('abort', () => ac.abort(), opts); 323 } 324 let gotAnyItemFromStream = false; 325 try { 326 for await (const value of this) { 327 gotAnyItemFromStream = true; 328 if (options?.signal?.aborted) { 329 throw new AbortError(); 330 } 331 if (!hasInitialValue) { 332 initialValue = value; 333 hasInitialValue = true; 334 } else { 335 initialValue = await reducer(initialValue, value, { signal }); 336 } 337 } 338 if (!gotAnyItemFromStream && !hasInitialValue) { 339 throw new ReduceAwareErrMissingArgs(); 340 } 341 } finally { 342 ac.abort(); 343 } 344 return initialValue; 345} 346 347async function toArray(options) { 348 if (options != null) { 349 validateObject(options, 'options'); 350 } 351 if (options?.signal != null) { 352 validateAbortSignal(options.signal, 'options.signal'); 353 } 354 355 const result = []; 356 for await (const val of this) { 357 if (options?.signal?.aborted) { 358 throw new AbortError(undefined, { cause: options.signal.reason }); 359 } 360 ArrayPrototypePush(result, val); 361 } 362 return result; 363} 364 365function flatMap(fn, options) { 366 const values = map.call(this, fn, options); 367 return async function* flatMap() { 368 for await (const val of values) { 369 yield* val; 370 } 371 }.call(this); 372} 373 374function toIntegerOrInfinity(number) { 375 // We coerce here to align with the spec 376 // https://github.com/tc39/proposal-iterator-helpers/issues/169 377 number = Number(number); 378 if (NumberIsNaN(number)) { 379 return 0; 380 } 381 if (number < 0) { 382 throw new ERR_OUT_OF_RANGE('number', '>= 0', number); 383 } 384 return number; 385} 386 387function drop(number, options = undefined) { 388 if (options != null) { 389 validateObject(options, 'options'); 390 } 391 if (options?.signal != null) { 392 validateAbortSignal(options.signal, 'options.signal'); 393 } 394 395 number = toIntegerOrInfinity(number); 396 return async function* drop() { 397 if (options?.signal?.aborted) { 398 throw new AbortError(); 399 } 400 for await (const val of this) { 401 if (options?.signal?.aborted) { 402 throw new AbortError(); 403 } 404 if (number-- <= 0) { 405 yield val; 406 } 407 } 408 }.call(this); 409} 410 411function take(number, options = undefined) { 412 if (options != null) { 413 validateObject(options, 'options'); 414 } 415 if (options?.signal != null) { 416 validateAbortSignal(options.signal, 'options.signal'); 417 } 418 419 number = toIntegerOrInfinity(number); 420 return async function* take() { 421 if (options?.signal?.aborted) { 422 throw new AbortError(); 423 } 424 for await (const val of this) { 425 if (options?.signal?.aborted) { 426 throw new AbortError(); 427 } 428 if (number-- > 0) { 429 yield val; 430 } 431 432 // Don't get another item from iterator in case we reached the end 433 if (number <= 0) { 434 return; 435 } 436 } 437 }.call(this); 438} 439 440module.exports.streamReturningOperators = { 441 asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'), 442 drop, 443 filter, 444 flatMap, 445 map, 446 take, 447 compose, 448}; 449 450module.exports.promiseReturningOperators = { 451 every, 452 forEach, 453 reduce, 454 toArray, 455 some, 456 find, 457}; 458