• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 'use strict';
2 
3 const {
4   Array,
5   MathMin,
6   ObjectDefineProperty,
7   ObjectSetPrototypeOf,
8   Symbol,
9 } = primordials;
10 
11 const {
12   ERR_INVALID_ARG_TYPE,
13   ERR_OUT_OF_RANGE,
14   ERR_STREAM_DESTROYED
15 } = require('internal/errors').codes;
16 const { deprecate } = require('internal/util');
17 const { validateInteger } = require('internal/validators');
18 const fs = require('fs');
19 const { Buffer } = require('buffer');
20 const {
21   copyObject,
22   getOptions,
23 } = require('internal/fs/utils');
24 const { Readable, Writable, finished } = require('stream');
25 const { toPathIfFileURL } = require('internal/url');
26 const kIoDone = Symbol('kIoDone');
27 const kIsPerformingIO = Symbol('kIsPerformingIO');
28 
29 const kFs = Symbol('kFs');
30 
31 function ReadStream(path, options) {
32   if (!(this instanceof ReadStream))
33     return new ReadStream(path, options);
34 
35   // A little bit bigger buffer and water marks by default
36   options = copyObject(getOptions(options, {}));
37   if (options.highWaterMark === undefined)
38     options.highWaterMark = 64 * 1024;
39 
40   if (options.autoDestroy === undefined) {
41     options.autoDestroy = false;
42   }
43 
44   this[kFs] = options.fs || fs;
45 
46   if (typeof this[kFs].open !== 'function') {
47     throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
48                                    this[kFs].open);
49   }
50 
51   if (typeof this[kFs].read !== 'function') {
52     throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
53                                    this[kFs].read);
54   }
55 
56   if (typeof this[kFs].close !== 'function') {
57     throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
58                                    this[kFs].close);
59   }
60 
61   Readable.call(this, options);
62 
63   // Path will be ignored when fd is specified, so it can be falsy
64   this.path = toPathIfFileURL(path);
65   this.fd = options.fd === undefined ? null : options.fd;
66   this.flags = options.flags === undefined ? 'r' : options.flags;
67   this.mode = options.mode === undefined ? 0o666 : options.mode;
68 
69   this.start = options.start;
70   this.end = options.end;
71   this.autoClose = options.autoClose === undefined ? true : options.autoClose;
72   this.pos = undefined;
73   this.bytesRead = 0;
74   this.closed = false;
75   this[kIsPerformingIO] = false;
76 
77   if (this.start !== undefined) {
78     validateInteger(this.start, 'start', 0);
79 
80     this.pos = this.start;
81   }
82 
83   if (this.end === undefined) {
84     this.end = Infinity;
85   } else if (this.end !== Infinity) {
86     validateInteger(this.end, 'end', 0);
87 
88     if (this.start !== undefined && this.start > this.end) {
89       throw new ERR_OUT_OF_RANGE(
90         'start',
91         `<= "end" (here: ${this.end})`,
92         this.start
93       );
94     }
95   }
96 
97   if (typeof this.fd !== 'number')
98     _openReadFs(this);
99 
100   this.on('end', function() {
101     if (this.autoClose) {
102       this.destroy();
103     }
104   });
105 }
106 ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
107 ObjectSetPrototypeOf(ReadStream, Readable);
108 
109 const openReadFs = deprecate(function() {
110   _openReadFs(this);
111 }, 'ReadStream.prototype.open() is deprecated', 'DEP0135');
112 ReadStream.prototype.open = openReadFs;
113 
114 function _openReadFs(stream) {
115   // Backwards compat for overriden open.
116   if (stream.open !== openReadFs) {
117     stream.open();
118     return;
119   }
120 
121   stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
122     if (er) {
123       if (stream.autoClose) {
124         stream.destroy();
125       }
126       stream.emit('error', er);
127       return;
128     }
129 
130     stream.fd = fd;
131     stream.emit('open', fd);
132     stream.emit('ready');
133     // Start the flow of data.
134     stream.read();
135   });
136 }
137 
138 ReadStream.prototype._read = function(n) {
139   if (typeof this.fd !== 'number') {
140     return this.once('open', function() {
141       this._read(n);
142     });
143   }
144 
145   if (this.destroyed) return;
146 
147   n = this.pos !== undefined ?
148     MathMin(this.end - this.pos + 1, n) :
149     MathMin(this.end - this.bytesRead + 1, n);
150 
151   if (n <= 0) {
152     this.push(null);
153     return;
154   }
155 
156   const buf = Buffer.allocUnsafeSlow(n);
157 
158   this[kIsPerformingIO] = true;
159   this[kFs]
160     .read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
161       this[kIsPerformingIO] = false;
162 
163       // Tell ._destroy() that it's safe to close the fd now.
164       if (this.destroyed) {
165         this.emit(kIoDone, er);
166         return;
167       }
168 
169       if (er) {
170         if (this.autoClose) {
171           this.destroy();
172         }
173         this.emit('error', er);
174       } else if (bytesRead > 0) {
175         if (this.pos !== undefined) {
176           this.pos += bytesRead;
177         }
178 
179         this.bytesRead += bytesRead;
180 
181         if (bytesRead !== buf.length) {
182           // Slow path. Shrink to fit.
183           // Copy instead of slice so that we don't retain
184           // large backing buffer for small reads.
185           const dst = Buffer.allocUnsafeSlow(bytesRead);
186           buf.copy(dst, 0, 0, bytesRead);
187           buf = dst;
188         }
189 
190         this.push(buf);
191       } else {
192         this.push(null);
193       }
194     });
195 };
196 
197 ReadStream.prototype._destroy = function(err, cb) {
198   if (typeof this.fd !== 'number') {
199     this.once('open', closeFsStream.bind(null, this, cb, err));
200     return;
201   }
202 
203   if (this[kIsPerformingIO]) {
204     this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
205     return;
206   }
207 
208   closeFsStream(this, cb, err);
209 };
210 
211 function closeFsStream(stream, cb, err) {
212   stream[kFs].close(stream.fd, (er) => {
213     stream.closed = true;
214     cb(er || err);
215   });
216 
217   stream.fd = null;
218 }
219 
220 ReadStream.prototype.close = function(cb) {
221   if (typeof cb === 'function') finished(this, cb);
222   this.destroy();
223 };
224 
225 ObjectDefineProperty(ReadStream.prototype, 'pending', {
226   get() { return this.fd === null; },
227   configurable: true
228 });
229 
230 function WriteStream(path, options) {
231   if (!(this instanceof WriteStream))
232     return new WriteStream(path, options);
233 
234   options = copyObject(getOptions(options, {}));
235 
236   // Only buffers are supported.
237   options.decodeStrings = true;
238 
239   if (options.autoDestroy === undefined) {
240     options.autoDestroy = options.autoClose === undefined ?
241       true : (options.autoClose || false);
242   }
243 
244   this[kFs] = options.fs || fs;
245   if (typeof this[kFs].open !== 'function') {
246     throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
247                                    this[kFs].open);
248   }
249 
250   if (!this[kFs].write && !this[kFs].writev) {
251     throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
252                                    this[kFs].write);
253   }
254 
255   if (this[kFs].write && typeof this[kFs].write !== 'function') {
256     throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
257                                    this[kFs].write);
258   }
259 
260   if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
261     throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
262                                    this[kFs].writev);
263   }
264 
265   if (typeof this[kFs].close !== 'function') {
266     throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
267                                    this[kFs].close);
268   }
269 
270   // It's enough to override either, in which case only one will be used.
271   if (!this[kFs].write) {
272     this._write = null;
273   }
274   if (!this[kFs].writev) {
275     this._writev = null;
276   }
277 
278   Writable.call(this, options);
279 
280   // Path will be ignored when fd is specified, so it can be falsy
281   this.path = toPathIfFileURL(path);
282   this.fd = options.fd === undefined ? null : options.fd;
283   this.flags = options.flags === undefined ? 'w' : options.flags;
284   this.mode = options.mode === undefined ? 0o666 : options.mode;
285 
286   this.start = options.start;
287   this.autoClose = options.autoDestroy;
288   this.pos = undefined;
289   this.bytesWritten = 0;
290   this.closed = false;
291   this[kIsPerformingIO] = false;
292 
293   if (this.start !== undefined) {
294     validateInteger(this.start, 'start', 0);
295 
296     this.pos = this.start;
297   }
298 
299   if (options.encoding)
300     this.setDefaultEncoding(options.encoding);
301 
302   if (typeof this.fd !== 'number')
303     _openWriteFs(this);
304 }
305 ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
306 ObjectSetPrototypeOf(WriteStream, Writable);
307 
308 WriteStream.prototype._final = function(callback) {
309   if (typeof this.fd !== 'number') {
310     return this.once('open', function() {
311       this._final(callback);
312     });
313   }
314 
315   callback();
316 };
317 
318 const openWriteFs = deprecate(function() {
319   _openWriteFs(this);
320 }, 'WriteStream.prototype.open() is deprecated', 'DEP0135');
321 WriteStream.prototype.open = openWriteFs;
322 
323 function _openWriteFs(stream) {
324   // Backwards compat for overriden open.
325   if (stream.open !== openWriteFs) {
326     stream.open();
327     return;
328   }
329 
330   stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
331     if (er) {
332       if (stream.autoClose) {
333         stream.destroy();
334       }
335       stream.emit('error', er);
336       return;
337     }
338 
339     stream.fd = fd;
340     stream.emit('open', fd);
341     stream.emit('ready');
342   });
343 }
344 
345 
346 WriteStream.prototype._write = function(data, encoding, cb) {
347   if (typeof this.fd !== 'number') {
348     return this.once('open', function() {
349       this._write(data, encoding, cb);
350     });
351   }
352 
353   if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
354 
355   this[kIsPerformingIO] = true;
356   this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
357     this[kIsPerformingIO] = false;
358     // Tell ._destroy() that it's safe to close the fd now.
359     if (this.destroyed) {
360       cb(er);
361       return this.emit(kIoDone, er);
362     }
363 
364     if (er) {
365       return cb(er);
366     }
367     this.bytesWritten += bytes;
368     cb();
369   });
370 
371   if (this.pos !== undefined)
372     this.pos += data.length;
373 };
374 
375 
376 WriteStream.prototype._writev = function(data, cb) {
377   if (typeof this.fd !== 'number') {
378     return this.once('open', function() {
379       this._writev(data, cb);
380     });
381   }
382 
383   if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
384 
385   const len = data.length;
386   const chunks = new Array(len);
387   let size = 0;
388 
389   for (let i = 0; i < len; i++) {
390     const chunk = data[i].chunk;
391 
392     chunks[i] = chunk;
393     size += chunk.length;
394   }
395 
396   this[kIsPerformingIO] = true;
397   this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
398     this[kIsPerformingIO] = false;
399     // Tell ._destroy() that it's safe to close the fd now.
400     if (this.destroyed) {
401       cb(er);
402       return this.emit(kIoDone, er);
403     }
404 
405     if (er) {
406       if (this.autoClose) {
407         this.destroy(er);
408       }
409       return cb(er);
410     }
411     this.bytesWritten += bytes;
412     cb();
413   });
414 
415   if (this.pos !== undefined)
416     this.pos += size;
417 };
418 
419 
420 WriteStream.prototype._destroy = ReadStream.prototype._destroy;
421 WriteStream.prototype.close = function(cb) {
422   if (cb) {
423     if (this.closed) {
424       process.nextTick(cb);
425       return;
426     }
427     this.on('close', cb);
428   }
429 
430   // If we are not autoClosing, we should call
431   // destroy on 'finish'.
432   if (!this.autoClose) {
433     this.on('finish', this.destroy);
434   }
435 
436   // We use end() instead of destroy() because of
437   // https://github.com/nodejs/node/issues/2006
438   this.end();
439 };
440 
441 // There is no shutdown() for files.
442 WriteStream.prototype.destroySoon = WriteStream.prototype.end;
443 
444 ObjectDefineProperty(WriteStream.prototype, 'pending', {
445   get() { return this.fd === null; },
446   configurable: true
447 });
448 
449 module.exports = {
450   ReadStream,
451   WriteStream
452 };
453