• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const { AbortController, AbortSignal } = require('internal/abort_controller');
4
5const {
6  codes: {
7    ERR_INVALID_ARG_VALUE,
8    ERR_INVALID_ARG_TYPE,
9    ERR_MISSING_ARGS,
10    ERR_OUT_OF_RANGE,
11  },
12  AbortError,
13} = require('internal/errors');
14const {
15  validateAbortSignal,
16  validateInteger,
17  validateObject,
18} = require('internal/validators');
19const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
20const { finished } = require('internal/streams/end-of-stream');
21const staticCompose = require('internal/streams/compose');
22const {
23  addAbortSignalNoValidate,
24} = require('internal/streams/add-abort-signal');
25const { isWritable, isNodeStream } = require('internal/streams/utils');
26const { deprecate } = require('internal/util');
27
28const {
29  ArrayPrototypePush,
30  Boolean,
31  MathFloor,
32  Number,
33  NumberIsNaN,
34  Promise,
35  PromiseReject,
36  PromiseResolve,
37  PromisePrototypeThen,
38  Symbol,
39} = primordials;
40
41const kEmpty = Symbol('kEmpty');
42const kEof = Symbol('kEof');
43
44function compose(stream, options) {
45  if (options != null) {
46    validateObject(options, 'options');
47  }
48  if (options?.signal != null) {
49    validateAbortSignal(options.signal, 'options.signal');
50  }
51
52  if (isNodeStream(stream) && !isWritable(stream)) {
53    throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
54  }
55
56  const composedStream = staticCompose(this, stream);
57
58  if (options?.signal) {
59    // Not validating as we already validated before
60    addAbortSignalNoValidate(
61      options.signal,
62      composedStream,
63    );
64  }
65
66  return composedStream;
67}
68
69function map(fn, options) {
70  if (typeof fn !== 'function') {
71    throw new ERR_INVALID_ARG_TYPE(
72      'fn', ['Function', 'AsyncFunction'], fn);
73  }
74  if (options != null) {
75    validateObject(options, 'options');
76  }
77  if (options?.signal != null) {
78    validateAbortSignal(options.signal, 'options.signal');
79  }
80
81  let concurrency = 1;
82  if (options?.concurrency != null) {
83    concurrency = MathFloor(options.concurrency);
84  }
85
86  let highWaterMark = concurrency - 1;
87  if (options?.highWaterMark != null) {
88    highWaterMark = MathFloor(options.highWaterMark);
89  }
90
91  validateInteger(concurrency, 'options.concurrency', 1);
92  validateInteger(highWaterMark, 'options.highWaterMark', 0);
93
94  highWaterMark += concurrency;
95
96  return async function* map() {
97    const signal = AbortSignal.any([options?.signal].filter(Boolean));
98    const stream = this;
99    const queue = [];
100    const signalOpt = { signal };
101
102    let next;
103    let resume;
104    let done = false;
105    let cnt = 0;
106
107    function onCatch() {
108      done = true;
109      afterItemProcessed();
110    }
111
112    function afterItemProcessed() {
113      cnt -= 1;
114      maybeResume();
115    }
116
117    function maybeResume() {
118      if (
119        resume &&
120        !done &&
121        cnt < concurrency &&
122        queue.length < highWaterMark
123      ) {
124        resume();
125        resume = null;
126      }
127    }
128
129    async function pump() {
130      try {
131        for await (let val of stream) {
132          if (done) {
133            return;
134          }
135
136          if (signal.aborted) {
137            throw new AbortError();
138          }
139
140          try {
141            val = fn(val, signalOpt);
142
143            if (val === kEmpty) {
144              continue;
145            }
146
147            val = PromiseResolve(val);
148          } catch (err) {
149            val = PromiseReject(err);
150          }
151
152          cnt += 1;
153
154          PromisePrototypeThen(val, afterItemProcessed, onCatch);
155
156          queue.push(val);
157          if (next) {
158            next();
159            next = null;
160          }
161
162          if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
163            await new Promise((resolve) => {
164              resume = resolve;
165            });
166          }
167        }
168        queue.push(kEof);
169      } catch (err) {
170        const val = PromiseReject(err);
171        PromisePrototypeThen(val, afterItemProcessed, onCatch);
172        queue.push(val);
173      } finally {
174        done = true;
175        if (next) {
176          next();
177          next = null;
178        }
179      }
180    }
181
182    pump();
183
184    try {
185      while (true) {
186        while (queue.length > 0) {
187          const val = await queue[0];
188
189          if (val === kEof) {
190            return;
191          }
192
193          if (signal.aborted) {
194            throw new AbortError();
195          }
196
197          if (val !== kEmpty) {
198            yield val;
199          }
200
201          queue.shift();
202          maybeResume();
203        }
204
205        await new Promise((resolve) => {
206          next = resolve;
207        });
208      }
209    } finally {
210      done = true;
211      if (resume) {
212        resume();
213        resume = null;
214      }
215    }
216  }.call(this);
217}
218
219function asIndexedPairs(options = undefined) {
220  if (options != null) {
221    validateObject(options, 'options');
222  }
223  if (options?.signal != null) {
224    validateAbortSignal(options.signal, 'options.signal');
225  }
226
227  return async function* asIndexedPairs() {
228    let index = 0;
229    for await (const val of this) {
230      if (options?.signal?.aborted) {
231        throw new AbortError({ cause: options.signal.reason });
232      }
233      yield [index++, val];
234    }
235  }.call(this);
236}
237
238async function some(fn, options = undefined) {
239  for await (const unused of filter.call(this, fn, options)) {
240    return true;
241  }
242  return false;
243}
244
245async function every(fn, options = undefined) {
246  if (typeof fn !== 'function') {
247    throw new ERR_INVALID_ARG_TYPE(
248      'fn', ['Function', 'AsyncFunction'], fn);
249  }
250  // https://en.wikipedia.org/wiki/De_Morgan%27s_laws
251  return !(await some.call(this, async (...args) => {
252    return !(await fn(...args));
253  }, options));
254}
255
256async function find(fn, options) {
257  for await (const result of filter.call(this, fn, options)) {
258    return result;
259  }
260  return undefined;
261}
262
263async function forEach(fn, options) {
264  if (typeof fn !== 'function') {
265    throw new ERR_INVALID_ARG_TYPE(
266      'fn', ['Function', 'AsyncFunction'], fn);
267  }
268  async function forEachFn(value, options) {
269    await fn(value, options);
270    return kEmpty;
271  }
272  // eslint-disable-next-line no-unused-vars
273  for await (const unused of map.call(this, forEachFn, options));
274}
275
276function filter(fn, options) {
277  if (typeof fn !== 'function') {
278    throw new ERR_INVALID_ARG_TYPE(
279      'fn', ['Function', 'AsyncFunction'], fn);
280  }
281  async function filterFn(value, options) {
282    if (await fn(value, options)) {
283      return value;
284    }
285    return kEmpty;
286  }
287  return map.call(this, filterFn, options);
288}
289
290// Specific to provide better error to reduce since the argument is only
291// missing if the stream has no items in it - but the code is still appropriate
292class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
293  constructor() {
294    super('reduce');
295    this.message = 'Reduce of an empty stream requires an initial value';
296  }
297}
298
299async function reduce(reducer, initialValue, options) {
300  if (typeof reducer !== 'function') {
301    throw new ERR_INVALID_ARG_TYPE(
302      'reducer', ['Function', 'AsyncFunction'], reducer);
303  }
304  if (options != null) {
305    validateObject(options, 'options');
306  }
307  if (options?.signal != null) {
308    validateAbortSignal(options.signal, 'options.signal');
309  }
310
311  let hasInitialValue = arguments.length > 1;
312  if (options?.signal?.aborted) {
313    const err = new AbortError(undefined, { cause: options.signal.reason });
314    this.once('error', () => {}); // The error is already propagated
315    await finished(this.destroy(err));
316    throw err;
317  }
318  const ac = new AbortController();
319  const signal = ac.signal;
320  if (options?.signal) {
321    const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true };
322    options.signal.addEventListener('abort', () => ac.abort(), opts);
323  }
324  let gotAnyItemFromStream = false;
325  try {
326    for await (const value of this) {
327      gotAnyItemFromStream = true;
328      if (options?.signal?.aborted) {
329        throw new AbortError();
330      }
331      if (!hasInitialValue) {
332        initialValue = value;
333        hasInitialValue = true;
334      } else {
335        initialValue = await reducer(initialValue, value, { signal });
336      }
337    }
338    if (!gotAnyItemFromStream && !hasInitialValue) {
339      throw new ReduceAwareErrMissingArgs();
340    }
341  } finally {
342    ac.abort();
343  }
344  return initialValue;
345}
346
347async function toArray(options) {
348  if (options != null) {
349    validateObject(options, 'options');
350  }
351  if (options?.signal != null) {
352    validateAbortSignal(options.signal, 'options.signal');
353  }
354
355  const result = [];
356  for await (const val of this) {
357    if (options?.signal?.aborted) {
358      throw new AbortError(undefined, { cause: options.signal.reason });
359    }
360    ArrayPrototypePush(result, val);
361  }
362  return result;
363}
364
365function flatMap(fn, options) {
366  const values = map.call(this, fn, options);
367  return async function* flatMap() {
368    for await (const val of values) {
369      yield* val;
370    }
371  }.call(this);
372}
373
374function toIntegerOrInfinity(number) {
375  // We coerce here to align with the spec
376  // https://github.com/tc39/proposal-iterator-helpers/issues/169
377  number = Number(number);
378  if (NumberIsNaN(number)) {
379    return 0;
380  }
381  if (number < 0) {
382    throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
383  }
384  return number;
385}
386
387function drop(number, options = undefined) {
388  if (options != null) {
389    validateObject(options, 'options');
390  }
391  if (options?.signal != null) {
392    validateAbortSignal(options.signal, 'options.signal');
393  }
394
395  number = toIntegerOrInfinity(number);
396  return async function* drop() {
397    if (options?.signal?.aborted) {
398      throw new AbortError();
399    }
400    for await (const val of this) {
401      if (options?.signal?.aborted) {
402        throw new AbortError();
403      }
404      if (number-- <= 0) {
405        yield val;
406      }
407    }
408  }.call(this);
409}
410
411function take(number, options = undefined) {
412  if (options != null) {
413    validateObject(options, 'options');
414  }
415  if (options?.signal != null) {
416    validateAbortSignal(options.signal, 'options.signal');
417  }
418
419  number = toIntegerOrInfinity(number);
420  return async function* take() {
421    if (options?.signal?.aborted) {
422      throw new AbortError();
423    }
424    for await (const val of this) {
425      if (options?.signal?.aborted) {
426        throw new AbortError();
427      }
428      if (number-- > 0) {
429        yield val;
430      }
431
432      // Don't get another item from iterator in case we reached the end
433      if (number <= 0) {
434        return;
435      }
436    }
437  }.call(this);
438}
439
440module.exports.streamReturningOperators = {
441  asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'),
442  drop,
443  filter,
444  flatMap,
445  map,
446  take,
447  compose,
448};
449
450module.exports.promiseReturningOperators = {
451  every,
452  forEach,
453  reduce,
454  toArray,
455  some,
456  find,
457};
458