1'use strict'; 2 3const { 4 PromisePrototypeThen, 5 SymbolAsyncIterator, 6 SymbolIterator, 7} = primordials; 8const { Buffer } = require('buffer'); 9 10const { 11 ERR_INVALID_ARG_TYPE, 12 ERR_STREAM_NULL_VALUES, 13} = require('internal/errors').codes; 14 15function from(Readable, iterable, opts) { 16 let iterator; 17 if (typeof iterable === 'string' || iterable instanceof Buffer) { 18 return new Readable({ 19 objectMode: true, 20 ...opts, 21 read() { 22 this.push(iterable); 23 this.push(null); 24 }, 25 }); 26 } 27 28 let isAsync; 29 if (iterable && iterable[SymbolAsyncIterator]) { 30 isAsync = true; 31 iterator = iterable[SymbolAsyncIterator](); 32 } else if (iterable && iterable[SymbolIterator]) { 33 isAsync = false; 34 iterator = iterable[SymbolIterator](); 35 } else { 36 throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); 37 } 38 39 const readable = new Readable({ 40 objectMode: true, 41 highWaterMark: 1, 42 // TODO(ronag): What options should be allowed? 43 ...opts, 44 }); 45 46 // Flag to protect against _read 47 // being called before last iteration completion. 48 let reading = false; 49 50 readable._read = function() { 51 if (!reading) { 52 reading = true; 53 next(); 54 } 55 }; 56 57 readable._destroy = function(error, cb) { 58 PromisePrototypeThen( 59 close(error), 60 () => process.nextTick(cb, error), // nextTick is here in case cb throws 61 (e) => process.nextTick(cb, e || error), 62 ); 63 }; 64 65 async function close(error) { 66 const hadError = (error !== undefined) && (error !== null); 67 const hasThrow = typeof iterator.throw === 'function'; 68 if (hadError && hasThrow) { 69 const { value, done } = await iterator.throw(error); 70 await value; 71 if (done) { 72 return; 73 } 74 } 75 if (typeof iterator.return === 'function') { 76 const { value } = await iterator.return(); 77 await value; 78 } 79 } 80 81 async function next() { 82 for (;;) { 83 try { 84 const { value, done } = isAsync ? 85 await iterator.next() : 86 iterator.next(); 87 88 if (done) { 89 readable.push(null); 90 } else { 91 const res = (value && 92 typeof value.then === 'function') ? 93 await value : 94 value; 95 if (res === null) { 96 reading = false; 97 throw new ERR_STREAM_NULL_VALUES(); 98 } else if (readable.push(res)) { 99 continue; 100 } else { 101 reading = false; 102 } 103 } 104 } catch (err) { 105 readable.destroy(err); 106 } 107 break; 108 } 109 } 110 return readable; 111} 112 113module.exports = from; 114