1'use strict' 2const MiniPass = require('minipass') 3const EE = require('events').EventEmitter 4const fs = require('fs') 5 6// for writev 7const binding = process.binding('fs') 8const writeBuffers = binding.writeBuffers 9/* istanbul ignore next */ 10const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback 11 12const _autoClose = Symbol('_autoClose') 13const _close = Symbol('_close') 14const _ended = Symbol('_ended') 15const _fd = Symbol('_fd') 16const _finished = Symbol('_finished') 17const _flags = Symbol('_flags') 18const _flush = Symbol('_flush') 19const _handleChunk = Symbol('_handleChunk') 20const _makeBuf = Symbol('_makeBuf') 21const _mode = Symbol('_mode') 22const _needDrain = Symbol('_needDrain') 23const _onerror = Symbol('_onerror') 24const _onopen = Symbol('_onopen') 25const _onread = Symbol('_onread') 26const _onwrite = Symbol('_onwrite') 27const _open = Symbol('_open') 28const _path = Symbol('_path') 29const _pos = Symbol('_pos') 30const _queue = Symbol('_queue') 31const _read = Symbol('_read') 32const _readSize = Symbol('_readSize') 33const _reading = Symbol('_reading') 34const _remain = Symbol('_remain') 35const _size = Symbol('_size') 36const _write = Symbol('_write') 37const _writing = Symbol('_writing') 38const _defaultFlag = Symbol('_defaultFlag') 39 40class ReadStream extends MiniPass { 41 constructor (path, opt) { 42 opt = opt || {} 43 super(opt) 44 45 this.writable = false 46 47 if (typeof path !== 'string') 48 throw new TypeError('path must be a string') 49 50 this[_fd] = typeof opt.fd === 'number' ? opt.fd : null 51 this[_path] = path 52 this[_readSize] = opt.readSize || 16*1024*1024 53 this[_reading] = false 54 this[_size] = typeof opt.size === 'number' ? opt.size : Infinity 55 this[_remain] = this[_size] 56 this[_autoClose] = typeof opt.autoClose === 'boolean' ? 57 opt.autoClose : true 58 59 if (typeof this[_fd] === 'number') 60 this[_read]() 61 else 62 this[_open]() 63 } 64 65 get fd () { return this[_fd] } 66 get path () { return this[_path] } 67 68 write () { 69 throw new TypeError('this is a readable stream') 70 } 71 72 end () { 73 throw new TypeError('this is a readable stream') 74 } 75 76 [_open] () { 77 fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd)) 78 } 79 80 [_onopen] (er, fd) { 81 if (er) 82 this[_onerror](er) 83 else { 84 this[_fd] = fd 85 this.emit('open', fd) 86 this[_read]() 87 } 88 } 89 90 [_makeBuf] () { 91 return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain])) 92 } 93 94 [_read] () { 95 if (!this[_reading]) { 96 this[_reading] = true 97 const buf = this[_makeBuf]() 98 /* istanbul ignore if */ 99 if (buf.length === 0) return process.nextTick(() => this[_onread](null, 0, buf)) 100 fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) => 101 this[_onread](er, br, buf)) 102 } 103 } 104 105 [_onread] (er, br, buf) { 106 this[_reading] = false 107 if (er) 108 this[_onerror](er) 109 else if (this[_handleChunk](br, buf)) 110 this[_read]() 111 } 112 113 [_close] () { 114 if (this[_autoClose] && typeof this[_fd] === 'number') { 115 fs.close(this[_fd], _ => this.emit('close')) 116 this[_fd] = null 117 } 118 } 119 120 [_onerror] (er) { 121 this[_reading] = true 122 this[_close]() 123 this.emit('error', er) 124 } 125 126 [_handleChunk] (br, buf) { 127 let ret = false 128 // no effect if infinite 129 this[_remain] -= br 130 if (br > 0) 131 ret = super.write(br < buf.length ? buf.slice(0, br) : buf) 132 133 if (br === 0 || this[_remain] <= 0) { 134 ret = false 135 this[_close]() 136 super.end() 137 } 138 139 return ret 140 } 141 142 emit (ev, data) { 143 switch (ev) { 144 case 'prefinish': 145 case 'finish': 146 break 147 148 case 'drain': 149 if (typeof this[_fd] === 'number') 150 this[_read]() 151 break 152 153 default: 154 return super.emit(ev, data) 155 } 156 } 157} 158 159class ReadStreamSync extends ReadStream { 160 [_open] () { 161 let threw = true 162 try { 163 this[_onopen](null, fs.openSync(this[_path], 'r')) 164 threw = false 165 } finally { 166 if (threw) 167 this[_close]() 168 } 169 } 170 171 [_read] () { 172 let threw = true 173 try { 174 if (!this[_reading]) { 175 this[_reading] = true 176 do { 177 const buf = this[_makeBuf]() 178 /* istanbul ignore next */ 179 const br = buf.length === 0 ? 0 : fs.readSync(this[_fd], buf, 0, buf.length, null) 180 if (!this[_handleChunk](br, buf)) 181 break 182 } while (true) 183 this[_reading] = false 184 } 185 threw = false 186 } finally { 187 if (threw) 188 this[_close]() 189 } 190 } 191 192 [_close] () { 193 if (this[_autoClose] && typeof this[_fd] === 'number') { 194 try { 195 fs.closeSync(this[_fd]) 196 } catch (er) {} 197 this[_fd] = null 198 this.emit('close') 199 } 200 } 201} 202 203class WriteStream extends EE { 204 constructor (path, opt) { 205 opt = opt || {} 206 super(opt) 207 this.readable = false 208 this[_writing] = false 209 this[_ended] = false 210 this[_needDrain] = false 211 this[_queue] = [] 212 this[_path] = path 213 this[_fd] = typeof opt.fd === 'number' ? opt.fd : null 214 this[_mode] = opt.mode === undefined ? 0o666 : opt.mode 215 this[_pos] = typeof opt.start === 'number' ? opt.start : null 216 this[_autoClose] = typeof opt.autoClose === 'boolean' ? 217 opt.autoClose : true 218 219 // truncating makes no sense when writing into the middle 220 const defaultFlag = this[_pos] !== null ? 'r+' : 'w' 221 this[_defaultFlag] = opt.flags === undefined 222 this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags 223 224 if (this[_fd] === null) 225 this[_open]() 226 } 227 228 get fd () { return this[_fd] } 229 get path () { return this[_path] } 230 231 [_onerror] (er) { 232 this[_close]() 233 this[_writing] = true 234 this.emit('error', er) 235 } 236 237 [_open] () { 238 fs.open(this[_path], this[_flags], this[_mode], 239 (er, fd) => this[_onopen](er, fd)) 240 } 241 242 [_onopen] (er, fd) { 243 if (this[_defaultFlag] && 244 this[_flags] === 'r+' && 245 er && er.code === 'ENOENT') { 246 this[_flags] = 'w' 247 this[_open]() 248 } else if (er) 249 this[_onerror](er) 250 else { 251 this[_fd] = fd 252 this.emit('open', fd) 253 this[_flush]() 254 } 255 } 256 257 end (buf, enc) { 258 if (buf) 259 this.write(buf, enc) 260 261 this[_ended] = true 262 263 // synthetic after-write logic, where drain/finish live 264 if (!this[_writing] && !this[_queue].length && 265 typeof this[_fd] === 'number') 266 this[_onwrite](null, 0) 267 } 268 269 write (buf, enc) { 270 if (typeof buf === 'string') 271 buf = new Buffer(buf, enc) 272 273 if (this[_ended]) { 274 this.emit('error', new Error('write() after end()')) 275 return false 276 } 277 278 if (this[_fd] === null || this[_writing] || this[_queue].length) { 279 this[_queue].push(buf) 280 this[_needDrain] = true 281 return false 282 } 283 284 this[_writing] = true 285 this[_write](buf) 286 return true 287 } 288 289 [_write] (buf) { 290 fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) => 291 this[_onwrite](er, bw)) 292 } 293 294 [_onwrite] (er, bw) { 295 if (er) 296 this[_onerror](er) 297 else { 298 if (this[_pos] !== null) 299 this[_pos] += bw 300 if (this[_queue].length) 301 this[_flush]() 302 else { 303 this[_writing] = false 304 305 if (this[_ended] && !this[_finished]) { 306 this[_finished] = true 307 this[_close]() 308 this.emit('finish') 309 } else if (this[_needDrain]) { 310 this[_needDrain] = false 311 this.emit('drain') 312 } 313 } 314 } 315 } 316 317 [_flush] () { 318 if (this[_queue].length === 0) { 319 if (this[_ended]) 320 this[_onwrite](null, 0) 321 } else if (this[_queue].length === 1) 322 this[_write](this[_queue].pop()) 323 else { 324 const iovec = this[_queue] 325 this[_queue] = [] 326 writev(this[_fd], iovec, this[_pos], 327 (er, bw) => this[_onwrite](er, bw)) 328 } 329 } 330 331 [_close] () { 332 if (this[_autoClose] && typeof this[_fd] === 'number') { 333 fs.close(this[_fd], _ => this.emit('close')) 334 this[_fd] = null 335 } 336 } 337} 338 339class WriteStreamSync extends WriteStream { 340 [_open] () { 341 let fd 342 try { 343 fd = fs.openSync(this[_path], this[_flags], this[_mode]) 344 } catch (er) { 345 if (this[_defaultFlag] && 346 this[_flags] === 'r+' && 347 er && er.code === 'ENOENT') { 348 this[_flags] = 'w' 349 return this[_open]() 350 } else 351 throw er 352 } 353 this[_onopen](null, fd) 354 } 355 356 [_close] () { 357 if (this[_autoClose] && typeof this[_fd] === 'number') { 358 try { 359 fs.closeSync(this[_fd]) 360 } catch (er) {} 361 this[_fd] = null 362 this.emit('close') 363 } 364 } 365 366 [_write] (buf) { 367 try { 368 this[_onwrite](null, 369 fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])) 370 } catch (er) { 371 this[_onwrite](er, 0) 372 } 373 } 374} 375 376const writev = (fd, iovec, pos, cb) => { 377 const done = (er, bw) => cb(er, bw, iovec) 378 const req = new FSReqWrap() 379 req.oncomplete = done 380 binding.writeBuffers(fd, iovec, pos, req) 381} 382 383exports.ReadStream = ReadStream 384exports.ReadStreamSync = ReadStreamSync 385 386exports.WriteStream = WriteStream 387exports.WriteStreamSync = WriteStreamSync 388