• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Array,
5  FunctionPrototypeBind,
6  MathMin,
7  ObjectDefineProperty,
8  ObjectSetPrototypeOf,
9  PromisePrototypeThen,
10  ReflectApply,
11  Symbol,
12} = primordials;
13
14const {
15  ERR_INVALID_ARG_TYPE,
16  ERR_METHOD_NOT_IMPLEMENTED,
17  ERR_OUT_OF_RANGE,
18  ERR_STREAM_DESTROYED,
19  ERR_SYSTEM_ERROR,
20} = require('internal/errors').codes;
21const {
22  deprecate,
23  kEmptyObject,
24} = require('internal/util');
25const {
26  validateFunction,
27  validateInteger,
28} = require('internal/validators');
29const { errorOrDestroy } = require('internal/streams/destroy');
30const fs = require('fs');
31const { kRef, kUnref, FileHandle } = require('internal/fs/promises');
32const { Buffer } = require('buffer');
33const {
34  copyObject,
35  getOptions,
36  getValidatedFd,
37  validatePath,
38} = require('internal/fs/utils');
39const { Readable, Writable, finished } = require('stream');
40const { toPathIfFileURL } = require('internal/url');
// Private keys for per-stream bookkeeping.
// kIsPerformingIO is set while a read/write issued by the stream is in
// flight; kIoDone is the event emitted when such an operation completes on a
// stream that has already been destroyed.
const kIsPerformingIO = Symbol('kIsPerformingIO');
const kIoDone = Symbol('kIoDone');

// kFs stores the fs-like operations object a stream uses; kHandle stores the
// FileHandle when the stream was created from one.
const kHandle = Symbol('kHandle');
const kFs = Symbol('kFs');
46
// Shared `_construct` hook for ReadStream and WriteStream.
//
// Invoked with the stream as `this`. `callback` is called with no argument
// once a descriptor is available, or with the error that prevented opening.
function _construct(callback) {
  const stream = this;

  // A numeric fd is already present: nothing to open.
  if (typeof stream.fd === 'number') {
    callback();
    return;
  }

  const usesBuiltinOpen =
    stream.open === openWriteFs || stream.open === openReadFs;

  if (usesBuiltinOpen) {
    stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
      if (er) {
        callback(er);
        return;
      }

      stream.fd = fd;
      callback();
      stream.emit('open', stream.fd);
      stream.emit('ready');
    });
    return;
  }

  // Backwards compat for monkey patching open(): temporarily wrap emit() so
  // the first 'open' or 'error' event coming out of the user's open()
  // completes construction, then put the original emit() back.
  const originalEmit = stream.emit;
  stream.emit = function(...args) {
    switch (args[0]) {
      case 'open':
        this.emit = originalEmit;
        callback();
        ReflectApply(originalEmit, this, args);
        break;
      case 'error':
        // The error is handed to the callback instead of being emitted here.
        this.emit = originalEmit;
        callback(args[1]);
        break;
      default:
        ReflectApply(originalEmit, this, args);
    }
  };
  stream.open();
}
83
// Builds an fs-style operations object (open/close/read/write/writev taking
// a trailing callback) on top of a FileHandle's promise-returning methods.
// The `fd` argument each operation receives is ignored; `handle` is used.
const FileHandleOperations = (handle) => {
  return {
    // A stream created from a FileHandle never opens anything itself.
    open(path, flags, mode, cb) {
      throw new ERR_METHOD_NOT_IMPLEMENTED('open()');
    },

    close(fd, cb) {
      handle[kUnref]();
      PromisePrototypeThen(handle.close(), () => cb(), cb);
    },

    read(fd, buf, offset, length, pos, cb) {
      const pending = handle.read(buf, offset, length, pos);
      PromisePrototypeThen(
        pending,
        (result) => cb(null, result.bytesRead, result.buffer),
        (err) => cb(err, 0, buf),
      );
    },

    write(fd, buf, offset, length, pos, cb) {
      const pending = handle.write(buf, offset, length, pos);
      PromisePrototypeThen(
        pending,
        (result) => cb(null, result.bytesWritten, result.buffer),
        (err) => cb(err, 0, buf),
      );
    },

    writev(fd, buffers, pos, cb) {
      const pending = handle.writev(buffers, pos);
      PromisePrototypeThen(
        pending,
        (result) => cb(null, result.bytesWritten, result.buffers),
        (err) => cb(err, 0, buffers),
      );
    },
  };
};
112
// Closes the stream's descriptor through its fs operations and reports the
// outcome via `cb`. A close error takes precedence over the incoming `err`.
// When `stream.fd` is falsy there is nothing to close and `cb(err)` is
// called directly.
function close(stream, err, cb) {
  if (stream.fd) {
    stream[kFs].close(stream.fd, (closeErr) => {
      cb(closeErr || err);
    });
    // Cleared right after the close request is issued.
    stream.fd = null;
    return;
  }

  cb(err);
}
123
// Adopts the descriptor passed as `options.fd` for `stream`, selects the fs
// operations the stream will use (stored under kFs), and returns the
// numeric descriptor.
//
// `options.fd` must be a number or a FileHandle; anything else throws
// ERR_INVALID_ARG_TYPE. Combining a FileHandle with a custom `options.fs`
// throws ERR_METHOD_NOT_IMPLEMENTED.
function importFd(stream, options) {
  if (typeof options.fd === 'number') {
    // When fd is a raw descriptor, we must keep our fingers crossed
    // that the descriptor won't get closed, or worse, replaced with
    // another one
    // https://github.com/nodejs/node/issues/35862
    stream[kFs] = options.fs || fs;
    return options.fd;
  }

  if (typeof options.fd === 'object' && options.fd instanceof FileHandle) {
    // When fd is a FileHandle we can listen for 'close' events
    if (options.fs) {
      // FileHandle is not supported with custom fs operations
      throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs');
    }
    stream[kHandle] = options.fd;
    stream[kFs] = FileHandleOperations(stream[kHandle]);
    stream[kHandle][kRef]();
    // Closing the handle closes the stream as well.
    options.fd.on('close', FunctionPrototypeBind(stream.close, stream));
    return options.fd.fd;
  }

  // Construct the error with `new`, like every other error in this file;
  // calling the constructor as a plain function fails when it is a class.
  throw new ERR_INVALID_ARG_TYPE('options.fd',
                                 ['number', 'FileHandle'], options.fd);
}
149
// Readable stream over a file.
//
// `path` is the file to open; it is only used when `options.fd` is not
// given. `options` is normalized through getOptions() and copied before
// being modified. Options read here: highWaterMark, fd, fs, flags, mode,
// autoClose, start, end. Callable with or without `new`.
//
// Throws ERR_OUT_OF_RANGE when `start` is greater than `end`; the validate*
// helpers throw for their respective arguments.
function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  // A little bit bigger buffer and water marks by default
  options = copyObject(getOptions(options, kEmptyObject));
  if (options.highWaterMark === undefined)
    options.highWaterMark = 64 * 1024;

  if (options.fd == null) {
    this.fd = null;
    this[kFs] = options.fs || fs;
    validateFunction(this[kFs].open, 'options.fs.open');

    // Path will be ignored when fd is specified, so it can be falsy
    this.path = toPathIfFileURL(path);
    this.flags = options.flags === undefined ? 'r' : options.flags;
    this.mode = options.mode === undefined ? 0o666 : options.mode;

    validatePath(this.path);
  } else {
    this.fd = getValidatedFd(importFd(this, options));
  }

  // `autoClose` alone decides the stream's autoDestroy setting (default
  // true); a caller-supplied `autoDestroy` is always overwritten here.
  options.autoDestroy = options.autoClose === undefined ?
    true : options.autoClose;

  validateFunction(this[kFs].read, 'options.fs.read');

  // close() is only needed when the stream closes the fd itself.
  if (options.autoDestroy) {
    validateFunction(this[kFs].close, 'options.fs.close');
  }

  this.start = options.start;
  this.end = options.end;
  // `pos` stays undefined unless `start` is given; _read() then passes it to
  // the fs read operation as-is.
  this.pos = undefined;
  this.bytesRead = 0;
  this[kIsPerformingIO] = false;

  if (this.start !== undefined) {
    validateInteger(this.start, 'start', 0);

    this.pos = this.start;
  }

  if (this.end === undefined) {
    this.end = Infinity;
  } else if (this.end !== Infinity) {
    validateInteger(this.end, 'end', 0);

    if (this.start !== undefined && this.start > this.end) {
      throw new ERR_OUT_OF_RANGE(
        'start',
        `<= "end" (here: ${this.end})`,
        this.start,
      );
    }
  }

  ReflectApply(Readable, this, [options]);
}
// ReadStream inherits from Readable, both for static members and for
// prototype methods.
ObjectSetPrototypeOf(ReadStream, Readable);
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);

// `autoClose` is an accessor backed by the readable state's `autoDestroy`
// flag, so reading or writing it reads or writes that flag.
ObjectDefineProperty(ReadStream.prototype, 'autoClose', {
  __proto__: null,
  set(val) {
    this._readableState.autoDestroy = val;
  },
  get() {
    return this._readableState.autoDestroy;
  },
});
228
// The built-in open() does nothing and is deprecated (DEP0135). _construct
// compares `stream.open` against this function to detect a user-replaced
// open().
const openReadFs = deprecate(
  function() {
    // Noop.
  },
  'ReadStream.prototype.open() is deprecated',
  'DEP0135',
);

ReadStream.prototype._construct = _construct;
ReadStream.prototype.open = openReadFs;
235
// Reads up to `n` bytes through the stream's fs operations and pushes the
// result; pushes null once the `end` offset has been passed or the read
// returns no bytes.
ReadStream.prototype._read = function(n) {
  // Bytes consumed so far are measured from `pos` when the stream is
  // positional, otherwise from the running `bytesRead` count.
  const consumed = this.pos !== undefined ? this.pos : this.bytesRead;
  n = MathMin(this.end - consumed + 1, n);

  if (n <= 0) {
    this.push(null);
    return;
  }

  const target = Buffer.allocUnsafeSlow(n);

  this[kIsPerformingIO] = true;
  this[kFs].read(this.fd, target, 0, n, this.pos, (er, bytesRead, filled) => {
    this[kIsPerformingIO] = false;

    // Tell ._destroy() that it's safe to close the fd now.
    if (this.destroyed) {
      this.emit(kIoDone, er);
      return;
    }

    if (er) {
      errorOrDestroy(this, er);
      return;
    }

    if (!(bytesRead > 0)) {
      this.push(null);
      return;
    }

    if (this.pos !== undefined) {
      this.pos += bytesRead;
    }
    this.bytesRead += bytesRead;

    if (bytesRead === filled.length) {
      this.push(filled);
      return;
    }

    // Slow path. Shrink to fit.
    // Copy instead of slice so that we don't retain
    // large backing buffer for small reads.
    const exact = Buffer.allocUnsafeSlow(bytesRead);
    filled.copy(exact, 0, 0, bytesRead);
    this.push(exact);
  });
};
283
// Closes the descriptor as part of destroying the stream.
//
// Usually for async IO it is safe to close a file descriptor even when
// there are pending operations. However, due to platform differences file
// IO is implemented using synchronous operations running in a thread pool.
// Therefore, file descriptors are not safe to close while used in a pending
// read or write operation. When a read is in flight (kIsPerformingIO),
// closing is postponed until its completion is signalled (kIoDone).
ReadStream.prototype._destroy = function(err, cb) {
  if (!this[kIsPerformingIO]) {
    close(this, err, cb);
    return;
  }

  this.once(kIoDone, (ioErr) => close(this, err || ioErr, cb));
};
297
// Destroys the stream. A function `cb` is registered through finished()
// before the stream is destroyed; non-function values are ignored.
ReadStream.prototype.close = function(cb) {
  if (typeof cb === 'function') {
    finished(this, cb);
  }

  this.destroy();
};
302
// `pending` is true while the stream has no descriptor yet (`fd` is null).
ObjectDefineProperty(ReadStream.prototype, 'pending', {
  __proto__: null,
  configurable: true,
  get() {
    return this.fd === null;
  },
});
308
// Writable stream over a file.
//
// `path` is the file to open; it is only used when `options.fd` is not
// given. `options` is normalized through getOptions() and copied before
// being modified. Options read here: fd, fs, flags, mode, autoClose, start,
// encoding. Callable with or without `new`.
function WriteStream(path, options) {
  if (!(this instanceof WriteStream)) {
    return new WriteStream(path, options);
  }

  options = copyObject(getOptions(options, kEmptyObject));

  // Only buffers are supported.
  options.decodeStrings = true;

  if (options.fd != null) {
    this.fd = getValidatedFd(importFd(this, options));
  } else {
    this.fd = null;
    this[kFs] = options.fs || fs;
    validateFunction(this[kFs].open, 'options.fs.open');

    // Path will be ignored when fd is specified, so it can be falsy
    this.path = toPathIfFileURL(path);
    this.flags = options.flags !== undefined ? options.flags : 'w';
    this.mode = options.mode !== undefined ? options.mode : 0o666;

    validatePath(this.path);
  }

  // `autoClose` decides autoDestroy; it defaults to true.
  options.autoDestroy =
    options.autoClose !== undefined ? options.autoClose : true;

  const ops = this[kFs];

  // At least one of write()/writev() has to be provided.
  if (!(ops.write || ops.writev)) {
    throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
                                   ops.write);
  }

  if (ops.write) {
    validateFunction(ops.write, 'options.fs.write');
  }

  if (ops.writev) {
    validateFunction(ops.writev, 'options.fs.writev');
  }

  // close() is only needed when the stream closes the fd itself.
  if (options.autoDestroy) {
    validateFunction(ops.close, 'options.fs.close');
  }

  // It's enough to override either, in which case only one will be used.
  if (!ops.write) {
    this._write = null;
  }
  if (!ops.writev) {
    this._writev = null;
  }

  this.start = options.start;
  // `pos` stays undefined unless `start` is given.
  this.pos = undefined;
  this.bytesWritten = 0;
  this[kIsPerformingIO] = false;

  if (this.start !== undefined) {
    validateInteger(this.start, 'start', 0);

    this.pos = this.start;
  }

  ReflectApply(Writable, this, [options]);

  if (options.encoding) {
    this.setDefaultEncoding(options.encoding);
  }
}
// WriteStream inherits from Writable, both for static members and for
// prototype methods.
ObjectSetPrototypeOf(WriteStream, Writable);
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);

// `autoClose` is an accessor backed by the writable state's `autoDestroy`
// flag, so reading or writing it reads or writes that flag.
ObjectDefineProperty(WriteStream.prototype, 'autoClose', {
  __proto__: null,
  set(val) {
    this._writableState.autoDestroy = val;
  },
  get() {
    return this._writableState.autoDestroy;
  },
});
389
// The built-in open() does nothing and is deprecated (DEP0135). _construct
// compares `stream.open` against this function to detect a user-replaced
// open().
const openWriteFs = deprecate(
  function() {
    // Noop.
  },
  'WriteStream.prototype.open() is deprecated',
  'DEP0135',
);

WriteStream.prototype._construct = _construct;
WriteStream.prototype.open = openWriteFs;
396
// Writes `size` bytes of `data` through the stream's fs `write` operation,
// re-issuing the write for the unwritten remainder until everything has
// been written. Called with the stream as `this`.
//
// `pos` is the file position for the write, or undefined when the stream
// has no explicit position. `cb` is called with no argument on success or
// with the error otherwise. `retries` counts consecutive writes that made
// no progress and is only passed by the recursive calls.
function writeAll(data, size, pos, cb, retries = 0) {
  this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
    // No data currently available and operation should be retried later.
    if (er?.code === 'EAGAIN') {
      er = null;
      bytesWritten = 0;
    }

    if (this.destroyed || er) {
      return cb(er || new ERR_STREAM_DESTROYED('write'));
    }

    this.bytesWritten += bytesWritten;

    retries = bytesWritten ? 0 : retries + 1;
    size -= bytesWritten;
    // Only advance an explicit position. Adding to an undefined position
    // would produce NaN and hand that to the next write() call.
    if (typeof pos === 'number') {
      pos += bytesWritten;
    }

    // Try writing non-zero number of bytes up to 5 times.
    if (retries > 5) {
      cb(new ERR_SYSTEM_ERROR('write failed'));
    } else if (size) {
      writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
    } else {
      cb();
    }
  });
}
425
// Writes `size` bytes held in the `chunks` array through the stream's fs
// `writev` operation, re-issuing the write for the unwritten remainder until
// everything has been written. Called with the stream as `this`.
//
// `pos` is the file position for the write, or undefined when the stream
// has no explicit position. `cb` is called with no argument on success or
// with the error otherwise. `retries` counts consecutive writes that made
// no progress and is only passed by the recursive calls.
function writevAll(chunks, size, pos, cb, retries = 0) {
  // Use the `pos` argument, not `this.pos`: _writev() advances `this.pos` by
  // the full size right after issuing the first call, so a retry after a
  // partial write would otherwise land past the unwritten remainder.
  this[kFs].writev(this.fd, chunks, pos, (er, bytesWritten, buffers) => {
    // No data currently available and operation should be retried later.
    if (er?.code === 'EAGAIN') {
      er = null;
      bytesWritten = 0;
    }

    if (this.destroyed || er) {
      return cb(er || new ERR_STREAM_DESTROYED('writev'));
    }

    this.bytesWritten += bytesWritten;

    retries = bytesWritten ? 0 : retries + 1;
    size -= bytesWritten;
    // Only advance an explicit position. Adding to an undefined position
    // would produce NaN.
    if (typeof pos === 'number') {
      pos += bytesWritten;
    }

    // Try writing non-zero number of bytes up to 5 times.
    if (retries > 5) {
      cb(new ERR_SYSTEM_ERROR('writev failed'));
    } else if (size) {
      // Retry with the remainder flattened into a single buffer.
      const remainder = Buffer.concat(buffers).slice(bytesWritten);
      writevAll.call(this, [remainder], size, pos, cb, retries);
    } else {
      cb();
    }
  });
}
454
// Writes a single chunk via writeAll() and advances `pos` by the chunk
// length when the stream is positional.
WriteStream.prototype._write = function(data, encoding, cb) {
  this[kIsPerformingIO] = true;

  const onWritten = (er) => {
    this[kIsPerformingIO] = false;

    // Sampled before `cb` runs; decides whether kIoDone is emitted.
    const wasDestroyed = this.destroyed;
    cb(er);

    if (wasDestroyed) {
      // Tell ._destroy() that it's safe to close the fd now.
      return this.emit(kIoDone, er);
    }
  };
  writeAll.call(this, data, data.length, this.pos, onWritten);

  // The position is advanced as soon as the write has been issued, without
  // waiting for it to complete.
  if (this.pos !== undefined) {
    this.pos += data.length;
  }
};
471
// Writes a batch of buffered chunks via writevAll() and advances `pos` by
// their combined length when the stream is positional. Each entry of `data`
// carries its buffer in `.chunk`.
WriteStream.prototype._writev = function(data, cb) {
  const count = data.length;
  const chunks = new Array(count);
  let size = 0;

  let i = 0;
  while (i < count) {
    const { chunk } = data[i];
    size += chunk.length;
    chunks[i] = chunk;
    i++;
  }

  this[kIsPerformingIO] = true;

  const onWritten = (er) => {
    this[kIsPerformingIO] = false;

    // Sampled before `cb` runs; decides whether kIoDone is emitted.
    const wasDestroyed = this.destroyed;
    cb(er);

    if (wasDestroyed) {
      // Tell ._destroy() that it's safe to close the fd now.
      return this.emit(kIoDone, er);
    }
  };
  writevAll.call(this, chunks, size, this.pos, onWritten);

  // The position is advanced as soon as the write has been issued, without
  // waiting for it to complete.
  if (this.pos !== undefined) {
    this.pos += size;
  }
};
499
// Closes the descriptor as part of destroying the stream.
//
// Usually for async IO it is safe to close a file descriptor even when
// there are pending operations. However, due to platform differences file
// IO is implemented using synchronous operations running in a thread pool.
// Therefore, file descriptors are not safe to close while used in a pending
// read or write operation. When a write is in flight (kIsPerformingIO),
// closing is postponed until its completion is signalled (kIoDone).
WriteStream.prototype._destroy = function(err, cb) {
  if (!this[kIsPerformingIO]) {
    close(this, err, cb);
    return;
  }

  this.once(kIoDone, (ioErr) => close(this, err || ioErr, cb));
};
513
// Ends the stream so that it closes after pending writes. When `cb` is
// given it is scheduled with process.nextTick() if the stream is already
// closed (and nothing else happens); otherwise it is added as a 'close'
// listener.
WriteStream.prototype.close = function(cb) {
  if (cb && this.closed) {
    process.nextTick(cb);
    return;
  }

  if (cb) {
    this.on('close', cb);
  }

  // If we are not autoClosing, we should call
  // destroy on 'finish'.
  if (!this.autoClose) {
    this.on('finish', this.destroy);
  }

  // We use end() instead of destroy() because of
  // https://github.com/nodejs/node/issues/2006
  this.end();
};
533
// There is no shutdown() for files, so destroySoon() is simply end().
WriteStream.prototype.destroySoon = WriteStream.prototype.end;

// `pending` is true while the stream has no descriptor yet (`fd` is null).
ObjectDefineProperty(WriteStream.prototype, 'pending', {
  __proto__: null,
  configurable: true,
  get() {
    return this.fd === null;
  },
});
542
// Public surface of this module: the two file stream constructors.
module.exports = { ReadStream, WriteStream };
547