1'use strict'; 2 3const { 4 Array, 5 FunctionPrototypeBind, 6 MathMin, 7 ObjectDefineProperty, 8 ObjectSetPrototypeOf, 9 PromisePrototypeThen, 10 ReflectApply, 11 Symbol, 12} = primordials; 13 14const { 15 ERR_INVALID_ARG_TYPE, 16 ERR_METHOD_NOT_IMPLEMENTED, 17 ERR_OUT_OF_RANGE, 18 ERR_STREAM_DESTROYED, 19 ERR_SYSTEM_ERROR, 20} = require('internal/errors').codes; 21const { 22 deprecate, 23 kEmptyObject, 24} = require('internal/util'); 25const { 26 validateFunction, 27 validateInteger, 28} = require('internal/validators'); 29const { errorOrDestroy } = require('internal/streams/destroy'); 30const fs = require('fs'); 31const { kRef, kUnref, FileHandle } = require('internal/fs/promises'); 32const { Buffer } = require('buffer'); 33const { 34 copyObject, 35 getOptions, 36 getValidatedFd, 37 validatePath, 38} = require('internal/fs/utils'); 39const { Readable, Writable, finished } = require('stream'); 40const { toPathIfFileURL } = require('internal/url'); 41const kIoDone = Symbol('kIoDone'); 42const kIsPerformingIO = Symbol('kIsPerformingIO'); 43 44const kFs = Symbol('kFs'); 45const kHandle = Symbol('kHandle'); 46 47function _construct(callback) { 48 const stream = this; 49 if (typeof stream.fd === 'number') { 50 callback(); 51 return; 52 } 53 54 if (stream.open !== openWriteFs && stream.open !== openReadFs) { 55 // Backwards compat for monkey patching open(). 56 const orgEmit = stream.emit; 57 stream.emit = function(...args) { 58 if (args[0] === 'open') { 59 this.emit = orgEmit; 60 callback(); 61 ReflectApply(orgEmit, this, args); 62 } else if (args[0] === 'error') { 63 this.emit = orgEmit; 64 callback(args[1]); 65 } else { 66 ReflectApply(orgEmit, this, args); 67 } 68 }; 69 stream.open(); 70 } else { 71 stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { 72 if (er) { 73 callback(er); 74 } else { 75 stream.fd = fd; 76 callback(); 77 stream.emit('open', stream.fd); 78 stream.emit('ready'); 79 } 80 }); 81 } 82} 83 84// This generates an fs operations structure for a FileHandle 85const FileHandleOperations = (handle) => { 86 return { 87 open: (path, flags, mode, cb) => { 88 throw new ERR_METHOD_NOT_IMPLEMENTED('open()'); 89 }, 90 close: (fd, cb) => { 91 handle[kUnref](); 92 PromisePrototypeThen(handle.close(), 93 () => cb(), cb); 94 }, 95 read: (fd, buf, offset, length, pos, cb) => { 96 PromisePrototypeThen(handle.read(buf, offset, length, pos), 97 (r) => cb(null, r.bytesRead, r.buffer), 98 (err) => cb(err, 0, buf)); 99 }, 100 write: (fd, buf, offset, length, pos, cb) => { 101 PromisePrototypeThen(handle.write(buf, offset, length, pos), 102 (r) => cb(null, r.bytesWritten, r.buffer), 103 (err) => cb(err, 0, buf)); 104 }, 105 writev: (fd, buffers, pos, cb) => { 106 PromisePrototypeThen(handle.writev(buffers, pos), 107 (r) => cb(null, r.bytesWritten, r.buffers), 108 (err) => cb(err, 0, buffers)); 109 }, 110 }; 111}; 112 113function close(stream, err, cb) { 114 if (!stream.fd) { 115 cb(err); 116 } else { 117 stream[kFs].close(stream.fd, (er) => { 118 cb(er || err); 119 }); 120 stream.fd = null; 121 } 122} 123 124function importFd(stream, options) { 125 if (typeof options.fd === 'number') { 126 // When fd is a raw descriptor, we must keep our fingers crossed 127 // that the descriptor won't get closed, or worse, replaced with 128 // another one 129 // https://github.com/nodejs/node/issues/35862 130 stream[kFs] = options.fs || fs; 131 return options.fd; 132 } else if (typeof options.fd === 'object' && 133 options.fd instanceof FileHandle) { 134 // When fd is a FileHandle we can listen for 'close' events 135 if (options.fs) { 136 // FileHandle is not supported with custom fs operations 137 throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs'); 138 } 139 stream[kHandle] = options.fd; 140 stream[kFs] = FileHandleOperations(stream[kHandle]); 141 stream[kHandle][kRef](); 142 options.fd.on('close', FunctionPrototypeBind(stream.close, stream)); 143 return options.fd.fd; 144 } 145 146 throw ERR_INVALID_ARG_TYPE('options.fd', 147 ['number', 'FileHandle'], options.fd); 148} 149 150function ReadStream(path, options) { 151 if (!(this instanceof ReadStream)) 152 return new ReadStream(path, options); 153 154 // A little bit bigger buffer and water marks by default 155 options = copyObject(getOptions(options, kEmptyObject)); 156 if (options.highWaterMark === undefined) 157 options.highWaterMark = 64 * 1024; 158 159 if (options.autoDestroy === undefined) { 160 options.autoDestroy = false; 161 } 162 163 if (options.fd == null) { 164 this.fd = null; 165 this[kFs] = options.fs || fs; 166 validateFunction(this[kFs].open, 'options.fs.open'); 167 168 // Path will be ignored when fd is specified, so it can be falsy 169 this.path = toPathIfFileURL(path); 170 this.flags = options.flags === undefined ? 'r' : options.flags; 171 this.mode = options.mode === undefined ? 0o666 : options.mode; 172 173 validatePath(this.path); 174 } else { 175 this.fd = getValidatedFd(importFd(this, options)); 176 } 177 178 options.autoDestroy = options.autoClose === undefined ? 179 true : options.autoClose; 180 181 validateFunction(this[kFs].read, 'options.fs.read'); 182 183 if (options.autoDestroy) { 184 validateFunction(this[kFs].close, 'options.fs.close'); 185 } 186 187 this.start = options.start; 188 this.end = options.end; 189 this.pos = undefined; 190 this.bytesRead = 0; 191 this[kIsPerformingIO] = false; 192 193 if (this.start !== undefined) { 194 validateInteger(this.start, 'start', 0); 195 196 this.pos = this.start; 197 } 198 199 200 if (this.end === undefined) { 201 this.end = Infinity; 202 } else if (this.end !== Infinity) { 203 validateInteger(this.end, 'end', 0); 204 205 if (this.start !== undefined && this.start > this.end) { 206 throw new ERR_OUT_OF_RANGE( 207 'start', 208 `<= "end" (here: ${this.end})`, 209 this.start, 210 ); 211 } 212 } 213 214 ReflectApply(Readable, this, [options]); 215} 216ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype); 217ObjectSetPrototypeOf(ReadStream, Readable); 218 219ObjectDefineProperty(ReadStream.prototype, 'autoClose', { 220 __proto__: null, 221 get() { 222 return this._readableState.autoDestroy; 223 }, 224 set(val) { 225 this._readableState.autoDestroy = val; 226 }, 227}); 228 229const openReadFs = deprecate(function() { 230 // Noop. 231}, 'ReadStream.prototype.open() is deprecated', 'DEP0135'); 232ReadStream.prototype.open = openReadFs; 233 234ReadStream.prototype._construct = _construct; 235 236ReadStream.prototype._read = function(n) { 237 n = this.pos !== undefined ? 238 MathMin(this.end - this.pos + 1, n) : 239 MathMin(this.end - this.bytesRead + 1, n); 240 241 if (n <= 0) { 242 this.push(null); 243 return; 244 } 245 246 const buf = Buffer.allocUnsafeSlow(n); 247 248 this[kIsPerformingIO] = true; 249 this[kFs] 250 .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => { 251 this[kIsPerformingIO] = false; 252 253 // Tell ._destroy() that it's safe to close the fd now. 254 if (this.destroyed) { 255 this.emit(kIoDone, er); 256 return; 257 } 258 259 if (er) { 260 errorOrDestroy(this, er); 261 } else if (bytesRead > 0) { 262 if (this.pos !== undefined) { 263 this.pos += bytesRead; 264 } 265 266 this.bytesRead += bytesRead; 267 268 if (bytesRead !== buf.length) { 269 // Slow path. Shrink to fit. 270 // Copy instead of slice so that we don't retain 271 // large backing buffer for small reads. 272 const dst = Buffer.allocUnsafeSlow(bytesRead); 273 buf.copy(dst, 0, 0, bytesRead); 274 buf = dst; 275 } 276 277 this.push(buf); 278 } else { 279 this.push(null); 280 } 281 }); 282}; 283 284ReadStream.prototype._destroy = function(err, cb) { 285 // Usually for async IO it is safe to close a file descriptor 286 // even when there are pending operations. However, due to platform 287 // differences file IO is implemented using synchronous operations 288 // running in a thread pool. Therefore, file descriptors are not safe 289 // to close while used in a pending read or write operation. Wait for 290 // any pending IO (kIsPerformingIO) to complete (kIoDone). 291 if (this[kIsPerformingIO]) { 292 this.once(kIoDone, (er) => close(this, err || er, cb)); 293 } else { 294 close(this, err, cb); 295 } 296}; 297 298ReadStream.prototype.close = function(cb) { 299 if (typeof cb === 'function') finished(this, cb); 300 this.destroy(); 301}; 302 303ObjectDefineProperty(ReadStream.prototype, 'pending', { 304 __proto__: null, 305 get() { return this.fd === null; }, 306 configurable: true, 307}); 308 309function WriteStream(path, options) { 310 if (!(this instanceof WriteStream)) 311 return new WriteStream(path, options); 312 313 options = copyObject(getOptions(options, kEmptyObject)); 314 315 // Only buffers are supported. 316 options.decodeStrings = true; 317 318 if (options.fd == null) { 319 this.fd = null; 320 this[kFs] = options.fs || fs; 321 validateFunction(this[kFs].open, 'options.fs.open'); 322 323 // Path will be ignored when fd is specified, so it can be falsy 324 this.path = toPathIfFileURL(path); 325 this.flags = options.flags === undefined ? 'w' : options.flags; 326 this.mode = options.mode === undefined ? 0o666 : options.mode; 327 328 validatePath(this.path); 329 } else { 330 this.fd = getValidatedFd(importFd(this, options)); 331 } 332 333 options.autoDestroy = options.autoClose === undefined ? 334 true : options.autoClose; 335 336 if (!this[kFs].write && !this[kFs].writev) { 337 throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', 338 this[kFs].write); 339 } 340 341 if (this[kFs].write) { 342 validateFunction(this[kFs].write, 'options.fs.write'); 343 } 344 345 if (this[kFs].writev) { 346 validateFunction(this[kFs].writev, 'options.fs.writev'); 347 } 348 349 if (options.autoDestroy) { 350 validateFunction(this[kFs].close, 'options.fs.close'); 351 } 352 353 // It's enough to override either, in which case only one will be used. 354 if (!this[kFs].write) { 355 this._write = null; 356 } 357 if (!this[kFs].writev) { 358 this._writev = null; 359 } 360 361 this.start = options.start; 362 this.pos = undefined; 363 this.bytesWritten = 0; 364 this[kIsPerformingIO] = false; 365 366 if (this.start !== undefined) { 367 validateInteger(this.start, 'start', 0); 368 369 this.pos = this.start; 370 } 371 372 ReflectApply(Writable, this, [options]); 373 374 if (options.encoding) 375 this.setDefaultEncoding(options.encoding); 376} 377ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype); 378ObjectSetPrototypeOf(WriteStream, Writable); 379 380ObjectDefineProperty(WriteStream.prototype, 'autoClose', { 381 __proto__: null, 382 get() { 383 return this._writableState.autoDestroy; 384 }, 385 set(val) { 386 this._writableState.autoDestroy = val; 387 }, 388}); 389 390const openWriteFs = deprecate(function() { 391 // Noop. 392}, 'WriteStream.prototype.open() is deprecated', 'DEP0135'); 393WriteStream.prototype.open = openWriteFs; 394 395WriteStream.prototype._construct = _construct; 396 397function writeAll(data, size, pos, cb, retries = 0) { 398 this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => { 399 // No data currently available and operation should be retried later. 400 if (er?.code === 'EAGAIN') { 401 er = null; 402 bytesWritten = 0; 403 } 404 405 if (this.destroyed || er) { 406 return cb(er || new ERR_STREAM_DESTROYED('write')); 407 } 408 409 this.bytesWritten += bytesWritten; 410 411 retries = bytesWritten ? 0 : retries + 1; 412 size -= bytesWritten; 413 pos += bytesWritten; 414 415 // Try writing non-zero number of bytes up to 5 times. 416 if (retries > 5) { 417 cb(new ERR_SYSTEM_ERROR('write failed')); 418 } else if (size) { 419 writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries); 420 } else { 421 cb(); 422 } 423 }); 424} 425 426function writevAll(chunks, size, pos, cb, retries = 0) { 427 this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => { 428 // No data currently available and operation should be retried later. 429 if (er?.code === 'EAGAIN') { 430 er = null; 431 bytesWritten = 0; 432 } 433 434 if (this.destroyed || er) { 435 return cb(er || new ERR_STREAM_DESTROYED('writev')); 436 } 437 438 this.bytesWritten += bytesWritten; 439 440 retries = bytesWritten ? 0 : retries + 1; 441 size -= bytesWritten; 442 pos += bytesWritten; 443 444 // Try writing non-zero number of bytes up to 5 times. 445 if (retries > 5) { 446 cb(new ERR_SYSTEM_ERROR('writev failed')); 447 } else if (size) { 448 writevAll.call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries); 449 } else { 450 cb(); 451 } 452 }); 453} 454 455WriteStream.prototype._write = function(data, encoding, cb) { 456 this[kIsPerformingIO] = true; 457 writeAll.call(this, data, data.length, this.pos, (er) => { 458 this[kIsPerformingIO] = false; 459 if (this.destroyed) { 460 // Tell ._destroy() that it's safe to close the fd now. 461 cb(er); 462 return this.emit(kIoDone, er); 463 } 464 465 cb(er); 466 }); 467 468 if (this.pos !== undefined) 469 this.pos += data.length; 470}; 471 472WriteStream.prototype._writev = function(data, cb) { 473 const len = data.length; 474 const chunks = new Array(len); 475 let size = 0; 476 477 for (let i = 0; i < len; i++) { 478 const chunk = data[i].chunk; 479 480 chunks[i] = chunk; 481 size += chunk.length; 482 } 483 484 this[kIsPerformingIO] = true; 485 writevAll.call(this, chunks, size, this.pos, (er) => { 486 this[kIsPerformingIO] = false; 487 if (this.destroyed) { 488 // Tell ._destroy() that it's safe to close the fd now. 489 cb(er); 490 return this.emit(kIoDone, er); 491 } 492 493 cb(er); 494 }); 495 496 if (this.pos !== undefined) 497 this.pos += size; 498}; 499 500WriteStream.prototype._destroy = function(err, cb) { 501 // Usually for async IO it is safe to close a file descriptor 502 // even when there are pending operations. However, due to platform 503 // differences file IO is implemented using synchronous operations 504 // running in a thread pool. Therefore, file descriptors are not safe 505 // to close while used in a pending read or write operation. Wait for 506 // any pending IO (kIsPerformingIO) to complete (kIoDone). 507 if (this[kIsPerformingIO]) { 508 this.once(kIoDone, (er) => close(this, err || er, cb)); 509 } else { 510 close(this, err, cb); 511 } 512}; 513 514WriteStream.prototype.close = function(cb) { 515 if (cb) { 516 if (this.closed) { 517 process.nextTick(cb); 518 return; 519 } 520 this.on('close', cb); 521 } 522 523 // If we are not autoClosing, we should call 524 // destroy on 'finish'. 525 if (!this.autoClose) { 526 this.on('finish', this.destroy); 527 } 528 529 // We use end() instead of destroy() because of 530 // https://github.com/nodejs/node/issues/2006 531 this.end(); 532}; 533 534// There is no shutdown() for files. 535WriteStream.prototype.destroySoon = WriteStream.prototype.end; 536 537ObjectDefineProperty(WriteStream.prototype, 'pending', { 538 __proto__: null, 539 get() { return this.fd === null; }, 540 configurable: true, 541}); 542 543module.exports = { 544 ReadStream, 545 WriteStream, 546}; 547