• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2const MiniPass = require('minipass')
3const EE = require('events').EventEmitter
4const fs = require('fs')
5
6// for writev
// `binding` is Node's internal fs binding. In this file it is only used by
// the `writev` helper, which submits an array of buffers in one request.
7const binding = process.binding('fs')
// NOTE(review): `writeBuffers` is not referenced again in this file; the
// `writev` helper calls `binding.writeBuffers` directly.
8const writeBuffers = binding.writeBuffers
9/* istanbul ignore next */
// The request constructor is read from whichever of the two names the
// binding exposes.
10const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback
11
// Symbol keys for the internal state and internal methods of the stream
// classes below. Each one is used as a computed property or method name
// (for example `this[_fd]` or `[_open] () { ... }`).
12const _autoClose = Symbol('_autoClose')
13const _close = Symbol('_close')
14const _ended = Symbol('_ended')
15const _fd = Symbol('_fd')
16const _finished = Symbol('_finished')
17const _flags = Symbol('_flags')
18const _flush = Symbol('_flush')
19const _handleChunk = Symbol('_handleChunk')
20const _makeBuf = Symbol('_makeBuf')
21const _mode = Symbol('_mode')
22const _needDrain = Symbol('_needDrain')
23const _onerror = Symbol('_onerror')
24const _onopen = Symbol('_onopen')
25const _onread = Symbol('_onread')
26const _onwrite = Symbol('_onwrite')
27const _open = Symbol('_open')
28const _path = Symbol('_path')
29const _pos = Symbol('_pos')
30const _queue = Symbol('_queue')
31const _read = Symbol('_read')
32const _readSize = Symbol('_readSize')
33const _reading = Symbol('_reading')
34const _remain = Symbol('_remain')
35const _size = Symbol('_size')
36const _write = Symbol('_write')
37const _writing = Symbol('_writing')
38const _defaultFlag = Symbol('_defaultFlag')
39
/**
 * Readable file stream built on MiniPass.
 *
 * Opens `path` for reading (or uses `opt.fd` when a numeric descriptor is
 * supplied), reads it in chunks of at most `opt.readSize` bytes and pushes
 * each chunk into the MiniPass pipeline. Reading stops at end of file or
 * after `opt.size` bytes, whichever comes first.
 *
 * Options read by this class:
 *   - fd        {number}  already-open descriptor; skips the open step
 *   - readSize  {number}  maximum chunk size, default 16 * 1024 * 1024 bytes
 *   - size      {number}  total number of bytes to read, default Infinity
 *   - autoClose {boolean} close the descriptor when done/on error, default true
 *
 * Emits 'open' (with the fd) after a successful open, 'close' once the
 * descriptor has been closed, and 'error' on open/read failures.
 */
class ReadStream extends MiniPass {
  /**
   * @param {string} path file to read; must be a string even when opt.fd is set
   * @param {object} [opt] see the class description
   * @throws {TypeError} when `path` is not a string
   */
  constructor (path, opt) {
    const options = opt || {}
    super(options)

    this.writable = false

    if (typeof path !== 'string')
      throw new TypeError('path must be a string')

    const hasFd = typeof options.fd === 'number'
    const hasSize = typeof options.size === 'number'
    const hasAutoClose = typeof options.autoClose === 'boolean'

    this[_fd] = hasFd ? options.fd : null
    this[_path] = path
    this[_readSize] = options.readSize || 16*1024*1024
    this[_reading] = false
    this[_size] = hasSize ? options.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = hasAutoClose ? options.autoClose : true

    // With a descriptor in hand reading starts right away; otherwise the
    // file has to be opened first.
    if (hasFd)
      this[_read]()
    else
      this[_open]()
  }

  /** Current file descriptor, or null when not open. */
  get fd () { return this[_fd] }

  /** Path given to the constructor. */
  get path () { return this[_path] }

  /** Always throws: data cannot be written into this stream from outside. */
  write () {
    throw new TypeError('this is a readable stream')
  }

  /** Always throws: the stream ends itself when the file is exhausted. */
  end () {
    throw new TypeError('this is a readable stream')
  }

  // Open the file read-only and hand the outcome to _onopen.
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // Record the descriptor, announce it, and start reading; errors are
  // routed to _onerror.
  [_onopen] (er, fd) {
    if (er) {
      this[_onerror](er)
      return
    }
    this[_fd] = fd
    this.emit('open', fd)
    this[_read]()
  }

  // Allocate the next chunk buffer: never larger than the configured read
  // size nor than the number of bytes still wanted.
  [_makeBuf] () {
    const wanted = Math.min(this[_readSize], this[_remain])
    return Buffer.allocUnsafe(wanted)
  }

  // Start one read unless one is already in flight.
  [_read] () {
    if (this[_reading])
      return

    this[_reading] = true
    const chunk = this[_makeBuf]()
    /* istanbul ignore if */
    if (chunk.length === 0) return process.nextTick(() => this[_onread](null, 0, chunk))
    fs.read(this[_fd], chunk, 0, chunk.length, null, (er, bytesRead, filled) =>
      this[_onread](er, bytesRead, filled))
  }

  // Read completion: report errors, otherwise pass the data on and keep
  // reading for as long as _handleChunk asks for more.
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er) {
      this[_onerror](er)
      return
    }
    if (this[_handleChunk](br, buf))
      this[_read]()
  }

  // Close the descriptor when auto-closing is enabled and one is open;
  // 'close' is emitted from the fs.close callback.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number')
      return
    fs.close(this[_fd], _ => this.emit('close'))
    this[_fd] = null
  }

  // Error path: the reading flag is left set so that no further read is
  // started, then the descriptor is closed and the error emitted.
  [_onerror] (er) {
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  // Push `br` bytes of `buf` downstream. Returns the value of the
  // underlying write when more data should be read, or false once the
  // stream has been ended (zero-byte read or size limit reached).
  [_handleChunk] (br, buf) {
    // no effect if infinite
    this[_remain] -= br

    const wantsMore = br > 0
      ? super.write(br < buf.length ? buf.slice(0, br) : buf)
      : false

    const exhausted = br === 0 || this[_remain] <= 0
    if (!exhausted)
      return wantsMore

    this[_close]()
    super.end()
    return false
  }

  // Event filter: 'prefinish' and 'finish' are never forwarded, 'drain' is
  // turned into a request for the next read while a descriptor is open,
  // and everything else goes to MiniPass unchanged.
  emit (ev, data) {
    if (ev === 'prefinish' || ev === 'finish')
      return

    if (ev === 'drain') {
      if (typeof this[_fd] === 'number')
        this[_read]()
      return
    }

    return super.emit(ev, data)
  }
}
158
/**
 * Synchronous variant of ReadStream: the file is opened, read and closed
 * with the blocking fs calls. If opening or reading throws, the descriptor
 * is closed (subject to autoClose) and the original exception propagates to
 * the caller.
 */
class ReadStreamSync extends ReadStream {
  // Open synchronously and feed the descriptor to the inherited _onopen.
  [_open] () {
    try {
      this[_onopen](null, fs.openSync(this[_path], 'r'))
    } catch (er) {
      // clean up, then let the caller see the original failure
      this[_close]()
      throw er
    }
  }

  // Read chunk after chunk until _handleChunk reports that no more data is
  // wanted. Does nothing when a read loop is already running.
  [_read] () {
    try {
      if (!this[_reading]) {
        this[_reading] = true
        let keepGoing = true
        while (keepGoing) {
          const chunk = this[_makeBuf]()
          /* istanbul ignore next */
          const bytesRead = chunk.length === 0 ? 0 : fs.readSync(this[_fd], chunk, 0, chunk.length, null)
          keepGoing = this[_handleChunk](bytesRead, chunk)
        }
        this[_reading] = false
      }
    } catch (er) {
      // clean up, then let the caller see the original failure
      this[_close]()
      throw er
    }
  }

  // Close synchronously when auto-closing is enabled and a descriptor is
  // open. A failing closeSync is ignored; 'close' is emitted either way.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number')
      return

    try {
      fs.closeSync(this[_fd])
    } catch (er) {}
    this[_fd] = null
    this.emit('close')
  }
}
202
/**
 * Writable file stream implemented directly on EventEmitter.
 *
 * Chunks passed to write() are written to `path` (or to `opt.fd` when a
 * numeric descriptor is supplied). While the file is still being opened, or
 * while a write is in flight, chunks are queued and later flushed with a
 * single write (one chunk) or a single writev (several chunks).
 *
 * Options read by this class:
 *   - fd        {number}  already-open descriptor; skips the open step
 *   - mode      {number}  mode passed to fs.open, default 0o666
 *   - start     {number}  byte offset to start writing at; when absent the
 *                         position argument passed to fs.write is null
 *   - flags     {string}  open flags; default 'r+' when `start` is given
 *                         (falling back to 'w' if the file does not exist),
 *                         otherwise 'w'
 *   - autoClose {boolean} close the descriptor on finish/error, default true
 *
 * Emits 'open' (with the fd), 'drain' after queued data has been written,
 * 'finish' once end() was called and everything is written, 'close' after
 * the descriptor is closed, and 'error' on failures.
 */
class WriteStream extends EE {
  /**
   * @param {string} path file to write
   * @param {object} [opt] see the class description
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)
    this.readable = false
    this[_writing] = false
    this[_ended] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
    this[_pos] = typeof opt.start === 'number' ? opt.start : null
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    // truncating makes no sense when writing into the middle
    const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
    this[_defaultFlag] = opt.flags === undefined
    this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

    if (this[_fd] === null)
      this[_open]()
  }

  /** Current file descriptor, or null when not open. */
  get fd () { return this[_fd] }

  /** Path given to the constructor. */
  get path () { return this[_path] }

  // Error path: close the descriptor, then leave the writing flag set so
  // that later write() calls only queue and never touch the closed fd.
  [_onerror] (er) {
    this[_close]()
    this[_writing] = true
    this.emit('error', er)
  }

  // Open the file with the configured flags/mode; result goes to _onopen.
  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  // Open completion. When the implicit 'r+' flag fails because the file
  // does not exist, retry once with 'w' so the file gets created.
  [_onopen] (er, fd) {
    if (this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT') {
      this[_flags] = 'w'
      this[_open]()
    } else if (er)
      this[_onerror](er)
    else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_flush]()
    }
  }

  /**
   * Mark the stream as ended, optionally writing one last chunk first.
   * 'finish' is emitted once all pending data has been written.
   *
   * @param {Buffer|string} [buf] final chunk
   * @param {string} [enc] encoding used when `buf` is a string
   */
  end (buf, enc) {
    if (buf)
      this.write(buf, enc)

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    if (!this[_writing] && !this[_queue].length &&
        typeof this[_fd] === 'number')
      this[_onwrite](null, 0)
  }

  /**
   * Write a chunk.
   *
   * @param {Buffer|string} buf data to write
   * @param {string} [enc] encoding used when `buf` is a string
   * @returns {boolean} true when the chunk was handed to the file system
   *   immediately; false when it was queued (wait for 'drain') or when the
   *   stream has already ended (an 'error' event is emitted in that case)
   */
  write (buf, enc) {
    // Buffer.from replaces the deprecated `new Buffer(string, encoding)`.
    if (typeof buf === 'string')
      buf = Buffer.from(buf, enc)

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(buf)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](buf)
    return true
  }

  // Issue a single positional write; completion goes to _onwrite.
  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // Write completion: advance the position, flush anything queued in the
  // meantime, and otherwise emit 'finish' (after end()) or 'drain'.
  [_onwrite] (er, bw) {
    if (er)
      this[_onerror](er)
    else {
      if (this[_pos] !== null)
        this[_pos] += bw
      if (this[_queue].length)
        this[_flush]()
      else {
        this[_writing] = false

        if (this[_ended] && !this[_finished]) {
          this[_finished] = true
          this[_close]()
          this.emit('finish')
        } else if (this[_needDrain]) {
          this[_needDrain] = false
          this.emit('drain')
        }
      }
    }
  }

  // Write out everything that has been queued. With an empty queue this
  // only completes a pending end().
  [_flush] () {
    if (this[_queue].length === 0) {
      if (this[_ended])
        this[_onwrite](null, 0)
      return
    }

    // A write is now in flight. Without this flag a flush triggered from
    // _onopen left `_writing` false, so a write() arriving before the
    // callback could start a second, concurrent write at the same position,
    // and an end() could close the descriptor and emit 'finish' while data
    // was still pending.
    this[_writing] = true

    if (this[_queue].length === 1)
      this[_write](this[_queue].pop())
    else {
      const iovec = this[_queue]
      this[_queue] = []
      writev(this[_fd], iovec, this[_pos],
        (er, bw) => this[_onwrite](er, bw))
    }
  }

  // Close the descriptor when auto-closing is enabled and one is open;
  // 'close' is emitted from the fs.close callback.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      fs.close(this[_fd], _ => this.emit('close'))
      this[_fd] = null
    }
  }
}
338
/**
 * Synchronous variant of WriteStream: opening, writing and closing use the
 * blocking fs calls, so every write has completed by the time write()
 * returns.
 */
class WriteStreamSync extends WriteStream {
  // Open synchronously. When the implicit 'r+' flag fails because the file
  // does not exist, retry with 'w'; any other failure is thrown to the
  // caller.
  [_open] () {
    let descriptor
    try {
      descriptor = fs.openSync(this[_path], this[_flags], this[_mode])
    } catch (er) {
      const missingWithImplicitFlag = this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT'

      if (!missingWithImplicitFlag)
        throw er

      this[_flags] = 'w'
      return this[_open]()
    }
    this[_onopen](null, descriptor)
  }

  // Close synchronously when auto-closing is enabled and a descriptor is
  // open. A failing closeSync is ignored; 'close' is emitted either way.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number')
      return

    try {
      fs.closeSync(this[_fd])
    } catch (er) {}
    this[_fd] = null
    this.emit('close')
  }

  // Write synchronously and report the outcome through _onwrite. Anything
  // thrown inside the try block, including by _onwrite itself, is reported
  // to _onwrite as an error with zero bytes written.
  [_write] (buf) {
    try {
      const written = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      this[_onwrite](null, written)
    } catch (er) {
      this[_onwrite](er, 0)
    }
  }
}
375
/**
 * Write an array of buffers to `fd` in a single vectored write.
 *
 * Uses the public `fs.writev` API when the running Node.js provides it and
 * only falls back to the internal fs binding (deprecated `process.binding`)
 * on runtimes that lack it.
 *
 * @param {number} fd open file descriptor
 * @param {Buffer[]} iovec buffers to write, in order
 * @param {number|null} pos file offset, or null for the current position
 * @param {function(Error|null, number, Buffer[])} cb called with the error
 *   (if any), the number of bytes written, and the `iovec` that was passed
 */
const writev = (fd, iovec, pos, cb) => {
  const done = (er, bw) => cb(er, bw, iovec)

  if (typeof fs.writev === 'function') {
    fs.writev(fd, iovec, pos, done)
    return
  }

  // Legacy path for runtimes without fs.writev.
  const req = new FSReqWrap()
  req.oncomplete = done
  binding.writeBuffers(fd, iovec, pos, req)
}
382
383exports.ReadStream = ReadStream
384exports.ReadStreamSync = ReadStreamSync
385
386exports.WriteStream = WriteStream
387exports.WriteStreamSync = WriteStreamSync
388