• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Array,
5  MathMin,
6  ObjectDefineProperty,
7  ObjectSetPrototypeOf,
8  Symbol,
9} = primordials;
10
11const {
12  ERR_INVALID_ARG_TYPE,
13  ERR_OUT_OF_RANGE,
14  ERR_STREAM_DESTROYED
15} = require('internal/errors').codes;
16const { deprecate } = require('internal/util');
17const { validateInteger } = require('internal/validators');
18const fs = require('fs');
19const { Buffer } = require('buffer');
20const {
21  copyObject,
22  getOptions,
23} = require('internal/fs/utils');
24const { Readable, Writable, finished } = require('stream');
25const { toPathIfFileURL } = require('internal/url');
26const kIoDone = Symbol('kIoDone');
27const kIsPerformingIO = Symbol('kIsPerformingIO');
28
29const kFs = Symbol('kFs');
30
31function ReadStream(path, options) {
32  if (!(this instanceof ReadStream))
33    return new ReadStream(path, options);
34
35  // A little bit bigger buffer and water marks by default
36  options = copyObject(getOptions(options, {}));
37  if (options.highWaterMark === undefined)
38    options.highWaterMark = 64 * 1024;
39
40  if (options.autoDestroy === undefined) {
41    options.autoDestroy = false;
42  }
43
44  this[kFs] = options.fs || fs;
45
46  if (typeof this[kFs].open !== 'function') {
47    throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
48                                   this[kFs].open);
49  }
50
51  if (typeof this[kFs].read !== 'function') {
52    throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
53                                   this[kFs].read);
54  }
55
56  if (typeof this[kFs].close !== 'function') {
57    throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
58                                   this[kFs].close);
59  }
60
61  Readable.call(this, options);
62
63  // Path will be ignored when fd is specified, so it can be falsy
64  this.path = toPathIfFileURL(path);
65  this.fd = options.fd === undefined ? null : options.fd;
66  this.flags = options.flags === undefined ? 'r' : options.flags;
67  this.mode = options.mode === undefined ? 0o666 : options.mode;
68
69  this.start = options.start;
70  this.end = options.end;
71  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
72  this.pos = undefined;
73  this.bytesRead = 0;
74  this.closed = false;
75  this[kIsPerformingIO] = false;
76
77  if (this.start !== undefined) {
78    validateInteger(this.start, 'start', 0);
79
80    this.pos = this.start;
81  }
82
83  if (this.end === undefined) {
84    this.end = Infinity;
85  } else if (this.end !== Infinity) {
86    validateInteger(this.end, 'end', 0);
87
88    if (this.start !== undefined && this.start > this.end) {
89      throw new ERR_OUT_OF_RANGE(
90        'start',
91        `<= "end" (here: ${this.end})`,
92        this.start
93      );
94    }
95  }
96
97  if (typeof this.fd !== 'number')
98    _openReadFs(this);
99
100  this.on('end', function() {
101    if (this.autoClose) {
102      this.destroy();
103    }
104  });
105}
106ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
107ObjectSetPrototypeOf(ReadStream, Readable);
108
109const openReadFs = deprecate(function() {
110  _openReadFs(this);
111}, 'ReadStream.prototype.open() is deprecated', 'DEP0135');
112ReadStream.prototype.open = openReadFs;
113
114function _openReadFs(stream) {
115  // Backwards compat for overriden open.
116  if (stream.open !== openReadFs) {
117    stream.open();
118    return;
119  }
120
121  stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
122    if (er) {
123      if (stream.autoClose) {
124        stream.destroy();
125      }
126      stream.emit('error', er);
127      return;
128    }
129
130    stream.fd = fd;
131    stream.emit('open', fd);
132    stream.emit('ready');
133    // Start the flow of data.
134    stream.read();
135  });
136}
137
138ReadStream.prototype._read = function(n) {
139  if (typeof this.fd !== 'number') {
140    return this.once('open', function() {
141      this._read(n);
142    });
143  }
144
145  if (this.destroyed) return;
146
147  n = this.pos !== undefined ?
148    MathMin(this.end - this.pos + 1, n) :
149    MathMin(this.end - this.bytesRead + 1, n);
150
151  if (n <= 0) {
152    this.push(null);
153    return;
154  }
155
156  const buf = Buffer.allocUnsafeSlow(n);
157
158  this[kIsPerformingIO] = true;
159  this[kFs]
160    .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
161      this[kIsPerformingIO] = false;
162
163      // Tell ._destroy() that it's safe to close the fd now.
164      if (this.destroyed) {
165        this.emit(kIoDone, er);
166        return;
167      }
168
169      if (er) {
170        if (this.autoClose) {
171          this.destroy();
172        }
173        this.emit('error', er);
174      } else if (bytesRead > 0) {
175        if (this.pos !== undefined) {
176          this.pos += bytesRead;
177        }
178
179        this.bytesRead += bytesRead;
180
181        if (bytesRead !== buf.length) {
182          // Slow path. Shrink to fit.
183          // Copy instead of slice so that we don't retain
184          // large backing buffer for small reads.
185          const dst = Buffer.allocUnsafeSlow(bytesRead);
186          buf.copy(dst, 0, 0, bytesRead);
187          buf = dst;
188        }
189
190        this.push(buf);
191      } else {
192        this.push(null);
193      }
194    });
195};
196
197ReadStream.prototype._destroy = function(err, cb) {
198  if (typeof this.fd !== 'number') {
199    this.once('open', closeFsStream.bind(null, this, cb, err));
200    return;
201  }
202
203  if (this[kIsPerformingIO]) {
204    this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
205    return;
206  }
207
208  closeFsStream(this, cb, err);
209};
210
211function closeFsStream(stream, cb, err) {
212  stream[kFs].close(stream.fd, (er) => {
213    stream.closed = true;
214    cb(er || err);
215  });
216
217  stream.fd = null;
218}
219
220ReadStream.prototype.close = function(cb) {
221  if (typeof cb === 'function') finished(this, cb);
222  this.destroy();
223};
224
225ObjectDefineProperty(ReadStream.prototype, 'pending', {
226  get() { return this.fd === null; },
227  configurable: true
228});
229
230function WriteStream(path, options) {
231  if (!(this instanceof WriteStream))
232    return new WriteStream(path, options);
233
234  options = copyObject(getOptions(options, {}));
235
236  // Only buffers are supported.
237  options.decodeStrings = true;
238
239  if (options.autoDestroy === undefined) {
240    options.autoDestroy = options.autoClose === undefined ?
241      true : (options.autoClose || false);
242  }
243
244  this[kFs] = options.fs || fs;
245  if (typeof this[kFs].open !== 'function') {
246    throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
247                                   this[kFs].open);
248  }
249
250  if (!this[kFs].write && !this[kFs].writev) {
251    throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
252                                   this[kFs].write);
253  }
254
255  if (this[kFs].write && typeof this[kFs].write !== 'function') {
256    throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
257                                   this[kFs].write);
258  }
259
260  if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
261    throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
262                                   this[kFs].writev);
263  }
264
265  if (typeof this[kFs].close !== 'function') {
266    throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
267                                   this[kFs].close);
268  }
269
270  // It's enough to override either, in which case only one will be used.
271  if (!this[kFs].write) {
272    this._write = null;
273  }
274  if (!this[kFs].writev) {
275    this._writev = null;
276  }
277
278  Writable.call(this, options);
279
280  // Path will be ignored when fd is specified, so it can be falsy
281  this.path = toPathIfFileURL(path);
282  this.fd = options.fd === undefined ? null : options.fd;
283  this.flags = options.flags === undefined ? 'w' : options.flags;
284  this.mode = options.mode === undefined ? 0o666 : options.mode;
285
286  this.start = options.start;
287  this.autoClose = options.autoDestroy;
288  this.pos = undefined;
289  this.bytesWritten = 0;
290  this.closed = false;
291  this[kIsPerformingIO] = false;
292
293  if (this.start !== undefined) {
294    validateInteger(this.start, 'start', 0);
295
296    this.pos = this.start;
297  }
298
299  if (options.encoding)
300    this.setDefaultEncoding(options.encoding);
301
302  if (typeof this.fd !== 'number')
303    _openWriteFs(this);
304}
305ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
306ObjectSetPrototypeOf(WriteStream, Writable);
307
308WriteStream.prototype._final = function(callback) {
309  if (typeof this.fd !== 'number') {
310    return this.once('open', function() {
311      this._final(callback);
312    });
313  }
314
315  callback();
316};
317
318const openWriteFs = deprecate(function() {
319  _openWriteFs(this);
320}, 'WriteStream.prototype.open() is deprecated', 'DEP0135');
321WriteStream.prototype.open = openWriteFs;
322
323function _openWriteFs(stream) {
324  // Backwards compat for overriden open.
325  if (stream.open !== openWriteFs) {
326    stream.open();
327    return;
328  }
329
330  stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
331    if (er) {
332      if (stream.autoClose) {
333        stream.destroy();
334      }
335      stream.emit('error', er);
336      return;
337    }
338
339    stream.fd = fd;
340    stream.emit('open', fd);
341    stream.emit('ready');
342  });
343}
344
345
346WriteStream.prototype._write = function(data, encoding, cb) {
347  if (typeof this.fd !== 'number') {
348    return this.once('open', function() {
349      this._write(data, encoding, cb);
350    });
351  }
352
353  if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
354
355  this[kIsPerformingIO] = true;
356  this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
357    this[kIsPerformingIO] = false;
358    // Tell ._destroy() that it's safe to close the fd now.
359    if (this.destroyed) {
360      cb(er);
361      return this.emit(kIoDone, er);
362    }
363
364    if (er) {
365      return cb(er);
366    }
367    this.bytesWritten += bytes;
368    cb();
369  });
370
371  if (this.pos !== undefined)
372    this.pos += data.length;
373};
374
375
376WriteStream.prototype._writev = function(data, cb) {
377  if (typeof this.fd !== 'number') {
378    return this.once('open', function() {
379      this._writev(data, cb);
380    });
381  }
382
383  if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
384
385  const len = data.length;
386  const chunks = new Array(len);
387  let size = 0;
388
389  for (let i = 0; i < len; i++) {
390    const chunk = data[i].chunk;
391
392    chunks[i] = chunk;
393    size += chunk.length;
394  }
395
396  this[kIsPerformingIO] = true;
397  this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
398    this[kIsPerformingIO] = false;
399    // Tell ._destroy() that it's safe to close the fd now.
400    if (this.destroyed) {
401      cb(er);
402      return this.emit(kIoDone, er);
403    }
404
405    if (er) {
406      if (this.autoClose) {
407        this.destroy(er);
408      }
409      return cb(er);
410    }
411    this.bytesWritten += bytes;
412    cb();
413  });
414
415  if (this.pos !== undefined)
416    this.pos += size;
417};
418
419
420WriteStream.prototype._destroy = ReadStream.prototype._destroy;
421WriteStream.prototype.close = function(cb) {
422  if (cb) {
423    if (this.closed) {
424      process.nextTick(cb);
425      return;
426    }
427    this.on('close', cb);
428  }
429
430  // If we are not autoClosing, we should call
431  // destroy on 'finish'.
432  if (!this.autoClose) {
433    this.on('finish', this.destroy);
434  }
435
436  // We use end() instead of destroy() because of
437  // https://github.com/nodejs/node/issues/2006
438  this.end();
439};
440
441// There is no shutdown() for files.
442WriteStream.prototype.destroySoon = WriteStream.prototype.end;
443
444ObjectDefineProperty(WriteStream.prototype, 'pending', {
445  get() { return this.fd === null; },
446  configurable: true
447});
448
449module.exports = {
450  ReadStream,
451  WriteStream
452};
453