• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  PromisePrototypeThen,
5  SymbolAsyncIterator,
6  SymbolIterator,
7} = primordials;
8const { Buffer } = require('buffer');
9
10const {
11  ERR_INVALID_ARG_TYPE,
12  ERR_STREAM_NULL_VALUES,
13} = require('internal/errors').codes;
14
15function from(Readable, iterable, opts) {
16  let iterator;
17  if (typeof iterable === 'string' || iterable instanceof Buffer) {
18    return new Readable({
19      objectMode: true,
20      ...opts,
21      read() {
22        this.push(iterable);
23        this.push(null);
24      },
25    });
26  }
27
28  let isAsync;
29  if (iterable && iterable[SymbolAsyncIterator]) {
30    isAsync = true;
31    iterator = iterable[SymbolAsyncIterator]();
32  } else if (iterable && iterable[SymbolIterator]) {
33    isAsync = false;
34    iterator = iterable[SymbolIterator]();
35  } else {
36    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
37  }
38
39  const readable = new Readable({
40    objectMode: true,
41    highWaterMark: 1,
42    // TODO(ronag): What options should be allowed?
43    ...opts,
44  });
45
46  // Flag to protect against _read
47  // being called before last iteration completion.
48  let reading = false;
49
50  readable._read = function() {
51    if (!reading) {
52      reading = true;
53      next();
54    }
55  };
56
57  readable._destroy = function(error, cb) {
58    PromisePrototypeThen(
59      close(error),
60      () => process.nextTick(cb, error), // nextTick is here in case cb throws
61      (e) => process.nextTick(cb, e || error),
62    );
63  };
64
65  async function close(error) {
66    const hadError = (error !== undefined) && (error !== null);
67    const hasThrow = typeof iterator.throw === 'function';
68    if (hadError && hasThrow) {
69      const { value, done } = await iterator.throw(error);
70      await value;
71      if (done) {
72        return;
73      }
74    }
75    if (typeof iterator.return === 'function') {
76      const { value } = await iterator.return();
77      await value;
78    }
79  }
80
81  async function next() {
82    for (;;) {
83      try {
84        const { value, done } = isAsync ?
85          await iterator.next() :
86          iterator.next();
87
88        if (done) {
89          readable.push(null);
90        } else {
91          const res = (value &&
92            typeof value.then === 'function') ?
93            await value :
94            value;
95          if (res === null) {
96            reading = false;
97            throw new ERR_STREAM_NULL_VALUES();
98          } else if (readable.push(res)) {
99            continue;
100          } else {
101            reading = false;
102          }
103        }
104      } catch (err) {
105        readable.destroy(err);
106      }
107      break;
108    }
109  }
110  return readable;
111}
112
113module.exports = from;
114