1'use strict'; 2 3const { 4 SymbolAsyncIterator, 5 SymbolIterator 6} = primordials; 7const { Buffer } = require('buffer'); 8 9const { 10 ERR_INVALID_ARG_TYPE, 11 ERR_STREAM_NULL_VALUES 12} = require('internal/errors').codes; 13 14function from(Readable, iterable, opts) { 15 let iterator; 16 if (typeof iterable === 'string' || iterable instanceof Buffer) { 17 return new Readable({ 18 objectMode: true, 19 ...opts, 20 read() { 21 this.push(iterable); 22 this.push(null); 23 } 24 }); 25 } 26 27 if (iterable && iterable[SymbolAsyncIterator]) 28 iterator = iterable[SymbolAsyncIterator](); 29 else if (iterable && iterable[SymbolIterator]) 30 iterator = iterable[SymbolIterator](); 31 else 32 throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); 33 34 const readable = new Readable({ 35 objectMode: true, 36 highWaterMark: 1, 37 // TODO(ronag): What options should be allowed? 38 ...opts 39 }); 40 41 // Flag to protect against _read 42 // being called before last iteration completion. 43 let reading = false; 44 45 // Flag for when iterator needs to be explicitly closed. 46 let needToClose = false; 47 48 readable._read = function() { 49 if (!reading) { 50 reading = true; 51 next(); 52 } 53 }; 54 55 readable._destroy = function(error, cb) { 56 if (needToClose) { 57 needToClose = false; 58 close().then( 59 () => process.nextTick(cb, error), 60 (e) => process.nextTick(cb, error || e), 61 ); 62 } else { 63 cb(error); 64 } 65 }; 66 67 async function close() { 68 if (typeof iterator.return === 'function') { 69 const { value } = await iterator.return(); 70 await value; 71 } 72 } 73 74 async function next() { 75 try { 76 needToClose = false; 77 const { value, done } = await iterator.next(); 78 needToClose = !done; 79 if (done) { 80 readable.push(null); 81 } else if (readable.destroyed) { 82 await close(); 83 } else { 84 const res = await value; 85 if (res === null) { 86 reading = false; 87 throw new ERR_STREAM_NULL_VALUES(); 88 } else if (readable.push(res)) { 89 next(); 90 } else { 91 reading = false; 92 } 93 } 94 } catch (err) { 95 readable.destroy(err); 96 } 97 } 98 return readable; 99} 100 101module.exports = from; 102