1'use strict' 2 3/* replacement start */ 4 5const process = require('process/') 6 7/* replacement end */ 8 9const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') 10const { Buffer } = require('buffer') 11const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes 12function from(Readable, iterable, opts) { 13 let iterator 14 if (typeof iterable === 'string' || iterable instanceof Buffer) { 15 return new Readable({ 16 objectMode: true, 17 ...opts, 18 read() { 19 this.push(iterable) 20 this.push(null) 21 } 22 }) 23 } 24 let isAsync 25 if (iterable && iterable[SymbolAsyncIterator]) { 26 isAsync = true 27 iterator = iterable[SymbolAsyncIterator]() 28 } else if (iterable && iterable[SymbolIterator]) { 29 isAsync = false 30 iterator = iterable[SymbolIterator]() 31 } else { 32 throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable) 33 } 34 const readable = new Readable({ 35 objectMode: true, 36 highWaterMark: 1, 37 // TODO(ronag): What options should be allowed? 38 ...opts 39 }) 40 41 // Flag to protect against _read 42 // being called before last iteration completion. 43 let reading = false 44 readable._read = function () { 45 if (!reading) { 46 reading = true 47 next() 48 } 49 } 50 readable._destroy = function (error, cb) { 51 PromisePrototypeThen( 52 close(error), 53 () => process.nextTick(cb, error), 54 // nextTick is here in case cb throws 55 (e) => process.nextTick(cb, e || error) 56 ) 57 } 58 async function close(error) { 59 const hadError = error !== undefined && error !== null 60 const hasThrow = typeof iterator.throw === 'function' 61 if (hadError && hasThrow) { 62 const { value, done } = await iterator.throw(error) 63 await value 64 if (done) { 65 return 66 } 67 } 68 if (typeof iterator.return === 'function') { 69 const { value } = await iterator.return() 70 await value 71 } 72 } 73 async function next() { 74 for (;;) { 75 try { 76 const { value, done } = isAsync ? await iterator.next() : iterator.next() 77 if (done) { 78 readable.push(null) 79 } else { 80 const res = value && typeof value.then === 'function' ? await value : value 81 if (res === null) { 82 reading = false 83 throw new ERR_STREAM_NULL_VALUES() 84 } else if (readable.push(res)) { 85 continue 86 } else { 87 reading = false 88 } 89 } 90 } catch (err) { 91 readable.destroy(err) 92 } 93 break 94 } 95 } 96 return readable 97} 98module.exports = from 99