• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  SymbolAsyncIterator,
5  SymbolIterator
6} = primordials;
7const { Buffer } = require('buffer');
8
9const {
10  ERR_INVALID_ARG_TYPE,
11  ERR_STREAM_NULL_VALUES
12} = require('internal/errors').codes;
13
/**
 * Creates a readable stream that emits the values produced by `iterable`.
 *
 * A string or Buffer is not iterated: it is pushed as one single chunk,
 * immediately followed by end-of-stream. Any other value must implement the
 * async-iterator or the iterator protocol (the async one is preferred when
 * both are present).
 *
 * @param {Function} Readable Stream constructor used to build the result.
 * @param {string|Buffer|Iterable|AsyncIterable} iterable Source of chunks.
 * @param {object} [opts] Options spread over the defaults (`objectMode: true`
 *   and, for iterables, `highWaterMark: 1`) before reaching `Readable`.
 * @returns {object} The `Readable` instance wrapping `iterable`.
 * @throws {ERR_INVALID_ARG_TYPE} Thrown synchronously when `iterable` is
 *   neither a string, a Buffer, nor an (async) iterable.
 */
function from(Readable, iterable, opts) {
  const isSingleChunk =
    typeof iterable === 'string' || iterable instanceof Buffer;
  if (isSingleChunk) {
    const singleChunkOptions = {
      objectMode: true,
      ...opts,
      // Declared after the spread so it always wins over `opts.read`.
      read() {
        this.push(iterable);
        this.push(null);
      }
    };
    return new Readable(singleChunkOptions);
  }

  // Looks up an iteration-protocol method, tolerating a falsy `iterable`.
  const protocolMethod = (key) => (iterable ? iterable[key] : undefined);

  let source;
  if (protocolMethod(SymbolAsyncIterator)) {
    source = iterable[SymbolAsyncIterator]();
  } else if (protocolMethod(SymbolIterator)) {
    source = iterable[SymbolIterator]();
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
  }

  const streamOptions = {
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts
  };
  const readable = new Readable(streamOptions);

  // True while a pump() cycle is in flight, so that _read() cannot start a
  // second one before the previous iteration has completed.
  let pumping = false;

  // True once the iterator has produced a value without reporting `done`,
  // i.e. when destroying the stream must explicitly close the iterator.
  let mustCloseIterator = false;

  readable._read = function() {
    if (pumping) {
      return;
    }
    pumping = true;
    pump();
  };

  readable._destroy = function(error, cb) {
    if (!mustCloseIterator) {
      cb(error);
      return;
    }
    mustCloseIterator = false;
    // The callback is deferred to the next tick on both outcomes; the original
    // destroy error takes precedence over a failure while closing.
    closeIterator().then(
      () => process.nextTick(cb, error),
      (closeError) => process.nextTick(cb, error || closeError),
    );
  };

  // Invokes the iterator's optional return() and waits for the result value.
  async function closeIterator() {
    if (typeof source.return !== 'function') {
      return;
    }
    const { value: pending } = await source.return();
    await pending;
  }

  // Pulls values from the iterator and pushes them into the stream until the
  // iterator is done, the stream is destroyed, or push() signals backpressure.
  async function pump() {
    try {
      // Cleared while the request is pending: if the stream is destroyed in
      // the meantime, closing is handled below once next() has settled.
      mustCloseIterator = false;
      const { value, done } = await source.next();
      mustCloseIterator = !done;

      if (done) {
        readable.push(null);
        return;
      }

      if (readable.destroyed) {
        await closeIterator();
        return;
      }

      const chunk = await value;
      if (chunk === null) {
        // null is reserved for end-of-stream and may not be used as a value.
        pumping = false;
        throw new ERR_STREAM_NULL_VALUES();
      }

      if (readable.push(chunk)) {
        pump();
      } else {
        pumping = false;
      }
    } catch (err) {
      readable.destroy(err);
    }
  }

  return readable;
}
100
101module.exports = from;
102