• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2
3/* replacement start */
4
5const process = require('process/')
6
7/* replacement end */
8
9const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
10const { Buffer } = require('buffer')
11const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes
/**
 * Builds a readable stream out of a string, a Buffer, or a sync/async iterable.
 *
 * - A string or Buffer is emitted whole as one chunk, followed by end-of-stream.
 * - Any other value must expose `Symbol.asyncIterator` (checked first) or
 *   `Symbol.iterator`; each item the iterator produces is pushed as one chunk.
 *
 * The stream is created with `objectMode: true` (and, for iterables,
 * `highWaterMark: 1`); keys in `opts` are spread afterwards and override
 * those defaults.
 *
 * @param {Function} Readable - Stream constructor; called as `new Readable(options)`.
 * @param {string|Buffer|Iterable|AsyncIterable} iterable - Source of the chunks.
 * @param {object} [opts] - Extra options forwarded to the `Readable` constructor.
 * @returns {object} The newly constructed `Readable` instance.
 * @throws {ERR_INVALID_ARG_TYPE} If `iterable` is neither a string, a Buffer,
 *   nor an object implementing one of the iteration protocols.
 */
function from(Readable, iterable, opts) {
  // Strings and Buffers are iterable too, but are delivered as a single chunk
  // rather than being walked element by element.
  if (typeof iterable === 'string' || iterable instanceof Buffer) {
    return new Readable({
      objectMode: true,
      ...opts,
      // Declared after the spread, so this `read` wins over any `opts.read`.
      read() {
        this.push(iterable)
        this.push(null)
      }
    })
  }

  let iterator
  let isAsync
  // The async protocol takes precedence when an object implements both.
  if (iterable && iterable[SymbolAsyncIterator]) {
    isAsync = true
    iterator = iterable[SymbolAsyncIterator]()
  } else if (iterable && iterable[SymbolIterator]) {
    isAsync = false
    iterator = iterable[SymbolIterator]()
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable)
  }

  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts
  })

  // Flag to protect against _read
  // being called before last iteration completion.
  let reading = false

  readable._read = function () {
    if (!reading) {
      reading = true
      next()
    }
  }

  readable._destroy = function (error, cb) {
    PromisePrototypeThen(
      close(error),
      () => process.nextTick(cb, error),
      // nextTick is here in case cb throws
      (e) => process.nextTick(cb, e || error)
    )
  }

  /**
   * Gives the iterator a chance to clean up when the stream is destroyed.
   *
   * When destroyed with an error and the iterator has a `throw` method, the
   * error is forwarded through it; if that reports `done`, nothing else is
   * called. Otherwise `return` is invoked when the iterator provides it.
   * Results are awaited so both sync and async iterators are handled.
   *
   * @param {*} error - The value the stream was destroyed with, if any.
   * @returns {Promise<void>}
   */
  async function close(error) {
    const hadError = error !== undefined && error !== null
    const hasThrow = typeof iterator.throw === 'function'
    if (hadError && hasThrow) {
      const { value, done } = await iterator.throw(error)
      await value
      if (done) {
        return
      }
    }
    if (typeof iterator.return === 'function') {
      const { value } = await iterator.return()
      await value
    }
  }

  /**
   * Pulls items from the iterator and pushes them into the stream until the
   * iterator is exhausted, `push` returns a falsy value, or an error occurs.
   * Any error thrown while pulling destroys the stream with that error.
   *
   * @returns {Promise<void>}
   */
  async function next() {
    for (;;) {
      try {
        // Only await for async iterators.
        const { value, done } = isAsync ? await iterator.next() : iterator.next()
        if (done) {
          readable.push(null)
        } else {
          // An iterator may yield thenables; unwrap them before pushing.
          const res = value && typeof value.then === 'function' ? await value : value
          if (res === null) {
            // `push(null)` is the end-of-stream signal (see the `done` branch
            // above), so a null item is reported as an error instead.
            reading = false
            throw new ERR_STREAM_NULL_VALUES()
          } else if (readable.push(res)) {
            continue
          } else {
            // `push` returned falsy: stop pulling and let the next `_read`
            // call restart the loop.
            reading = false
          }
        }
      } catch (err) {
        readable.destroy(err)
      }
      break
    }
  }

  return readable
}
98module.exports = from
99