1'use strict'; 2 3const { 4 Array, 5 MathMin, 6 ObjectDefineProperty, 7 ObjectSetPrototypeOf, 8 Symbol, 9} = primordials; 10 11const { 12 ERR_INVALID_ARG_TYPE, 13 ERR_OUT_OF_RANGE, 14 ERR_STREAM_DESTROYED 15} = require('internal/errors').codes; 16const { deprecate } = require('internal/util'); 17const { validateInteger } = require('internal/validators'); 18const fs = require('fs'); 19const { Buffer } = require('buffer'); 20const { 21 copyObject, 22 getOptions, 23} = require('internal/fs/utils'); 24const { Readable, Writable, finished } = require('stream'); 25const { toPathIfFileURL } = require('internal/url'); 26const kIoDone = Symbol('kIoDone'); 27const kIsPerformingIO = Symbol('kIsPerformingIO'); 28 29const kFs = Symbol('kFs'); 30 31function ReadStream(path, options) { 32 if (!(this instanceof ReadStream)) 33 return new ReadStream(path, options); 34 35 // A little bit bigger buffer and water marks by default 36 options = copyObject(getOptions(options, {})); 37 if (options.highWaterMark === undefined) 38 options.highWaterMark = 64 * 1024; 39 40 if (options.autoDestroy === undefined) { 41 options.autoDestroy = false; 42 } 43 44 this[kFs] = options.fs || fs; 45 46 if (typeof this[kFs].open !== 'function') { 47 throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', 48 this[kFs].open); 49 } 50 51 if (typeof this[kFs].read !== 'function') { 52 throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function', 53 this[kFs].read); 54 } 55 56 if (typeof this[kFs].close !== 'function') { 57 throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', 58 this[kFs].close); 59 } 60 61 Readable.call(this, options); 62 63 // Path will be ignored when fd is specified, so it can be falsy 64 this.path = toPathIfFileURL(path); 65 this.fd = options.fd === undefined ? null : options.fd; 66 this.flags = options.flags === undefined ? 'r' : options.flags; 67 this.mode = options.mode === undefined ? 0o666 : options.mode; 68 69 this.start = options.start; 70 this.end = options.end; 71 this.autoClose = options.autoClose === undefined ? true : options.autoClose; 72 this.pos = undefined; 73 this.bytesRead = 0; 74 this.closed = false; 75 this[kIsPerformingIO] = false; 76 77 if (this.start !== undefined) { 78 validateInteger(this.start, 'start', 0); 79 80 this.pos = this.start; 81 } 82 83 if (this.end === undefined) { 84 this.end = Infinity; 85 } else if (this.end !== Infinity) { 86 validateInteger(this.end, 'end', 0); 87 88 if (this.start !== undefined && this.start > this.end) { 89 throw new ERR_OUT_OF_RANGE( 90 'start', 91 `<= "end" (here: ${this.end})`, 92 this.start 93 ); 94 } 95 } 96 97 if (typeof this.fd !== 'number') 98 _openReadFs(this); 99 100 this.on('end', function() { 101 if (this.autoClose) { 102 this.destroy(); 103 } 104 }); 105} 106ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype); 107ObjectSetPrototypeOf(ReadStream, Readable); 108 109const openReadFs = deprecate(function() { 110 _openReadFs(this); 111}, 'ReadStream.prototype.open() is deprecated', 'DEP0135'); 112ReadStream.prototype.open = openReadFs; 113 114function _openReadFs(stream) { 115 // Backwards compat for overriden open. 116 if (stream.open !== openReadFs) { 117 stream.open(); 118 return; 119 } 120 121 stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { 122 if (er) { 123 if (stream.autoClose) { 124 stream.destroy(); 125 } 126 stream.emit('error', er); 127 return; 128 } 129 130 stream.fd = fd; 131 stream.emit('open', fd); 132 stream.emit('ready'); 133 // Start the flow of data. 134 stream.read(); 135 }); 136} 137 138ReadStream.prototype._read = function(n) { 139 if (typeof this.fd !== 'number') { 140 return this.once('open', function() { 141 this._read(n); 142 }); 143 } 144 145 if (this.destroyed) return; 146 147 n = this.pos !== undefined ? 148 MathMin(this.end - this.pos + 1, n) : 149 MathMin(this.end - this.bytesRead + 1, n); 150 151 if (n <= 0) { 152 this.push(null); 153 return; 154 } 155 156 const buf = Buffer.allocUnsafeSlow(n); 157 158 this[kIsPerformingIO] = true; 159 this[kFs] 160 .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => { 161 this[kIsPerformingIO] = false; 162 163 // Tell ._destroy() that it's safe to close the fd now. 164 if (this.destroyed) { 165 this.emit(kIoDone, er); 166 return; 167 } 168 169 if (er) { 170 if (this.autoClose) { 171 this.destroy(); 172 } 173 this.emit('error', er); 174 } else if (bytesRead > 0) { 175 if (this.pos !== undefined) { 176 this.pos += bytesRead; 177 } 178 179 this.bytesRead += bytesRead; 180 181 if (bytesRead !== buf.length) { 182 // Slow path. Shrink to fit. 183 // Copy instead of slice so that we don't retain 184 // large backing buffer for small reads. 185 const dst = Buffer.allocUnsafeSlow(bytesRead); 186 buf.copy(dst, 0, 0, bytesRead); 187 buf = dst; 188 } 189 190 this.push(buf); 191 } else { 192 this.push(null); 193 } 194 }); 195}; 196 197ReadStream.prototype._destroy = function(err, cb) { 198 if (typeof this.fd !== 'number') { 199 this.once('open', closeFsStream.bind(null, this, cb, err)); 200 return; 201 } 202 203 if (this[kIsPerformingIO]) { 204 this.once(kIoDone, (er) => closeFsStream(this, cb, err || er)); 205 return; 206 } 207 208 closeFsStream(this, cb, err); 209}; 210 211function closeFsStream(stream, cb, err) { 212 stream[kFs].close(stream.fd, (er) => { 213 stream.closed = true; 214 cb(er || err); 215 }); 216 217 stream.fd = null; 218} 219 220ReadStream.prototype.close = function(cb) { 221 if (typeof cb === 'function') finished(this, cb); 222 this.destroy(); 223}; 224 225ObjectDefineProperty(ReadStream.prototype, 'pending', { 226 get() { return this.fd === null; }, 227 configurable: true 228}); 229 230function WriteStream(path, options) { 231 if (!(this instanceof WriteStream)) 232 return new WriteStream(path, options); 233 234 options = copyObject(getOptions(options, {})); 235 236 // Only buffers are supported. 237 options.decodeStrings = true; 238 239 if (options.autoDestroy === undefined) { 240 options.autoDestroy = options.autoClose === undefined ? 241 true : (options.autoClose || false); 242 } 243 244 this[kFs] = options.fs || fs; 245 if (typeof this[kFs].open !== 'function') { 246 throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', 247 this[kFs].open); 248 } 249 250 if (!this[kFs].write && !this[kFs].writev) { 251 throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', 252 this[kFs].write); 253 } 254 255 if (this[kFs].write && typeof this[kFs].write !== 'function') { 256 throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', 257 this[kFs].write); 258 } 259 260 if (this[kFs].writev && typeof this[kFs].writev !== 'function') { 261 throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function', 262 this[kFs].writev); 263 } 264 265 if (typeof this[kFs].close !== 'function') { 266 throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', 267 this[kFs].close); 268 } 269 270 // It's enough to override either, in which case only one will be used. 271 if (!this[kFs].write) { 272 this._write = null; 273 } 274 if (!this[kFs].writev) { 275 this._writev = null; 276 } 277 278 Writable.call(this, options); 279 280 // Path will be ignored when fd is specified, so it can be falsy 281 this.path = toPathIfFileURL(path); 282 this.fd = options.fd === undefined ? null : options.fd; 283 this.flags = options.flags === undefined ? 'w' : options.flags; 284 this.mode = options.mode === undefined ? 0o666 : options.mode; 285 286 this.start = options.start; 287 this.autoClose = options.autoDestroy; 288 this.pos = undefined; 289 this.bytesWritten = 0; 290 this.closed = false; 291 this[kIsPerformingIO] = false; 292 293 if (this.start !== undefined) { 294 validateInteger(this.start, 'start', 0); 295 296 this.pos = this.start; 297 } 298 299 if (options.encoding) 300 this.setDefaultEncoding(options.encoding); 301 302 if (typeof this.fd !== 'number') 303 _openWriteFs(this); 304} 305ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype); 306ObjectSetPrototypeOf(WriteStream, Writable); 307 308WriteStream.prototype._final = function(callback) { 309 if (typeof this.fd !== 'number') { 310 return this.once('open', function() { 311 this._final(callback); 312 }); 313 } 314 315 callback(); 316}; 317 318const openWriteFs = deprecate(function() { 319 _openWriteFs(this); 320}, 'WriteStream.prototype.open() is deprecated', 'DEP0135'); 321WriteStream.prototype.open = openWriteFs; 322 323function _openWriteFs(stream) { 324 // Backwards compat for overriden open. 325 if (stream.open !== openWriteFs) { 326 stream.open(); 327 return; 328 } 329 330 stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { 331 if (er) { 332 if (stream.autoClose) { 333 stream.destroy(); 334 } 335 stream.emit('error', er); 336 return; 337 } 338 339 stream.fd = fd; 340 stream.emit('open', fd); 341 stream.emit('ready'); 342 }); 343} 344 345 346WriteStream.prototype._write = function(data, encoding, cb) { 347 if (typeof this.fd !== 'number') { 348 return this.once('open', function() { 349 this._write(data, encoding, cb); 350 }); 351 } 352 353 if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); 354 355 this[kIsPerformingIO] = true; 356 this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { 357 this[kIsPerformingIO] = false; 358 // Tell ._destroy() that it's safe to close the fd now. 359 if (this.destroyed) { 360 cb(er); 361 return this.emit(kIoDone, er); 362 } 363 364 if (er) { 365 return cb(er); 366 } 367 this.bytesWritten += bytes; 368 cb(); 369 }); 370 371 if (this.pos !== undefined) 372 this.pos += data.length; 373}; 374 375 376WriteStream.prototype._writev = function(data, cb) { 377 if (typeof this.fd !== 'number') { 378 return this.once('open', function() { 379 this._writev(data, cb); 380 }); 381 } 382 383 if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); 384 385 const len = data.length; 386 const chunks = new Array(len); 387 let size = 0; 388 389 for (let i = 0; i < len; i++) { 390 const chunk = data[i].chunk; 391 392 chunks[i] = chunk; 393 size += chunk.length; 394 } 395 396 this[kIsPerformingIO] = true; 397 this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { 398 this[kIsPerformingIO] = false; 399 // Tell ._destroy() that it's safe to close the fd now. 400 if (this.destroyed) { 401 cb(er); 402 return this.emit(kIoDone, er); 403 } 404 405 if (er) { 406 if (this.autoClose) { 407 this.destroy(er); 408 } 409 return cb(er); 410 } 411 this.bytesWritten += bytes; 412 cb(); 413 }); 414 415 if (this.pos !== undefined) 416 this.pos += size; 417}; 418 419 420WriteStream.prototype._destroy = ReadStream.prototype._destroy; 421WriteStream.prototype.close = function(cb) { 422 if (cb) { 423 if (this.closed) { 424 process.nextTick(cb); 425 return; 426 } 427 this.on('close', cb); 428 } 429 430 // If we are not autoClosing, we should call 431 // destroy on 'finish'. 432 if (!this.autoClose) { 433 this.on('finish', this.destroy); 434 } 435 436 // We use end() instead of destroy() because of 437 // https://github.com/nodejs/node/issues/2006 438 this.end(); 439}; 440 441// There is no shutdown() for files. 442WriteStream.prototype.destroySoon = WriteStream.prototype.end; 443 444ObjectDefineProperty(WriteStream.prototype, 'pending', { 445 get() { return this.fd === null; }, 446 configurable: true 447}); 448 449module.exports = { 450 ReadStream, 451 WriteStream 452}; 453