• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var fs = require('fs')
2var polyfills = require('./polyfills.js')
3var legacy = require('./legacy-streams.js')
4var clone = require('./clone.js')
5
6var util = require('util')
7
// Property keys used to attach the shared retry queue (gracefulQueue) and the
// previously-installed function (previousSymbol) to other objects.
/* istanbul ignore next - node 0.x polyfill */
var gracefulQueue
var previousSymbol

// Runtimes without the global Symbol registry fall back to plain string keys.
/* istanbul ignore if - node 0.x polyfill */
if (typeof Symbol !== 'function' || typeof Symbol.for !== 'function') {
  gracefulQueue = '___graceful-fs.queue'
  previousSymbol = '___graceful-fs.previous'
} else {
  gracefulQueue = Symbol.for('graceful-fs.queue')
  // This is used in testing by future versions
  previousSymbol = Symbol.for('graceful-fs.previous')
}
21
// Placeholder function: accepts anything, does nothing, returns undefined.
function noop () {
  // intentionally empty
}
23
// Expose `queue` on `context` under the gracefulQueue key as a getter-only
// property. Only `get` is supplied, so every other descriptor attribute
// takes its Object.defineProperty default.
function publishQueue (context, queue) {
  var descriptor = {
    get: function () {
      return queue
    }
  }

  Object.defineProperty(context, gracefulQueue, descriptor)
}
31
// Pick the debug logger:
//  - util.debuglog('gfs4') when util.debuglog exists,
//  - otherwise a console.error based logger when NODE_DEBUG mentions gfs4,
//  - otherwise the do-nothing default.
var debug = noop
if (util.debuglog) {
  debug = util.debuglog('gfs4')
} else if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
  debug = function () {
    var formatted = util.format.apply(util, arguments)
    // prefix every line of the message, not only the first one
    var prefixed = 'GFS4: ' + formatted.split(/\n/).join('\nGFS4: ')
    console.error(prefixed)
  }
}
41
// One-time initialization: skipped when this fs already carries a queue.
if (!fs[gracefulQueue]) {
  // This queue can be shared by multiple loaded instances, so reuse the one
  // already published on `global` when there is one.
  var queue = global[gracefulQueue] || []
  publishQueue(fs, queue)

  // fs.close and fs.closeSync are swapped for versions that use the shared
  // queue, because we need to retry() whenever a close happens *anywhere*
  // in the program. This is essential when multiple graceful-fs instances
  // are in play at the same time.
  fs.close = (function (previousClose) {
    function close (fd, cb) {
      return previousClose.call(fs, fd, function (err) {
        // This function uses the graceful-fs shared queue
        if (!err) {
          resetQueue()
        }

        if (typeof cb !== 'function') {
          return
        }
        cb.apply(this, arguments)
      })
    }

    // keep the function that was replaced reachable from the wrapper
    Object.defineProperty(close, previousSymbol, { value: previousClose })
    return close
  })(fs.close)

  fs.closeSync = (function (previousCloseSync) {
    function closeSync (fd) {
      // This function uses the graceful-fs shared queue.
      // resetQueue() is only reached when the close did not throw.
      previousCloseSync.apply(fs, arguments)
      resetQueue()
    }

    // keep the function that was replaced reachable from the wrapper
    Object.defineProperty(closeSync, previousSymbol, { value: previousCloseSync })
    return closeSync
  })(fs.closeSync)

  // With NODE_DEBUG=gfs4, log the queue at exit and assert that it is empty.
  if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
    process.on('exit', function () {
      var leftover = fs[gracefulQueue]
      debug(leftover)
      require('assert').equal(leftover.length, 0)
    })
  }
}
91
// Publish the queue on `global` as well, unless one is already there.
if (!global[gracefulQueue]) {
  publishQueue(global, fs[gracefulQueue])
}

// By default a patched clone of fs is exported.
module.exports = patch(clone(fs))

// When TEST_GRACEFUL_FS_GLOBAL_PATCH is set, the fs module itself is patched
// and exported instead; fs.__patched makes sure that happens only once.
if (process.env.TEST_GRACEFUL_FS_GLOBAL_PATCH) {
  if (!fs.__patched) {
    module.exports = patch(fs)
    fs.__patched = true
  }
}
101
/**
 * Install the retrying wrappers on an fs-like object and return that object.
 *
 * readFile, writeFile, appendFile, copyFile, readdir and open are replaced by
 * versions that put the call on the shared queue (via enqueue) when it fails
 * with EMFILE or ENFILE, instead of reporting the error to the callback.
 * ReadStream and WriteStream are replaced by constructors whose open() goes
 * through the wrapped open().
 *
 * @param {object} fs - the object to patch (modified in place)
 * @returns {object} the same `fs` object
 */
function patch (fs) {
  // Everything that references the open() function needs to be in here
  polyfills(fs)
  fs.gracefulify = patch

  fs.createReadStream = createReadStream
  fs.createWriteStream = createWriteStream

  // True for the two "too many open files" error codes that trigger a retry.
  function isFileLimitError (err) {
    return !!err && (err.code === 'EMFILE' || err.code === 'ENFILE')
  }

  var fs$readFile = fs.readFile
  fs.readFile = readFile
  function readFile (path, options, cb) {
    // (path, cb) call form: shift the callback into place
    if (typeof options === 'function') {
      cb = options
      options = null
    }

    return go$readFile(path, options, cb)

    // startTime is only supplied when retry() re-runs a queued call
    function go$readFile (path, options, cb, startTime) {
      return fs$readFile(path, options, function (err) {
        if (isFileLimitError(err)) {
          enqueue([go$readFile, [path, options, cb], err, startTime || Date.now(), Date.now()])
          return
        }

        if (typeof cb === 'function') {
          cb.apply(this, arguments)
        }
      })
    }
  }

  var fs$writeFile = fs.writeFile
  fs.writeFile = writeFile
  function writeFile (path, data, options, cb) {
    // (path, data, cb) call form: shift the callback into place
    if (typeof options === 'function') {
      cb = options
      options = null
    }

    return go$writeFile(path, data, options, cb)

    // startTime is only supplied when retry() re-runs a queued call
    function go$writeFile (path, data, options, cb, startTime) {
      return fs$writeFile(path, data, options, function (err) {
        if (isFileLimitError(err)) {
          enqueue([go$writeFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
          return
        }

        if (typeof cb === 'function') {
          cb.apply(this, arguments)
        }
      })
    }
  }

  // appendFile is only wrapped when the underlying fs provides it
  var fs$appendFile = fs.appendFile
  if (fs$appendFile) {
    fs.appendFile = appendFile
  }
  function appendFile (path, data, options, cb) {
    // (path, data, cb) call form: shift the callback into place
    if (typeof options === 'function') {
      cb = options
      options = null
    }

    return go$appendFile(path, data, options, cb)

    // startTime is only supplied when retry() re-runs a queued call
    function go$appendFile (path, data, options, cb, startTime) {
      return fs$appendFile(path, data, options, function (err) {
        if (isFileLimitError(err)) {
          enqueue([go$appendFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
          return
        }

        if (typeof cb === 'function') {
          cb.apply(this, arguments)
        }
      })
    }
  }

  // copyFile is only wrapped when the underlying fs provides it
  var fs$copyFile = fs.copyFile
  if (fs$copyFile) {
    fs.copyFile = copyFile
  }
  function copyFile (src, dest, flags, cb) {
    // (src, dest, cb) call form: flags falls back to 0
    if (typeof flags === 'function') {
      cb = flags
      flags = 0
    }

    return go$copyFile(src, dest, flags, cb)

    // startTime is only supplied when retry() re-runs a queued call
    function go$copyFile (src, dest, flags, cb, startTime) {
      return fs$copyFile(src, dest, flags, function (err) {
        if (isFileLimitError(err)) {
          enqueue([go$copyFile, [src, dest, flags, cb], err, startTime || Date.now(), Date.now()])
          return
        }

        if (typeof cb === 'function') {
          cb.apply(this, arguments)
        }
      })
    }
  }

  var fs$readdir = fs.readdir
  fs.readdir = readdir
  // process.version values (v0 - v5) for which readdir is called without options
  var noReaddirOptionVersions = /^v[0-5]\./
  function readdir (path, options, cb) {
    // (path, cb) call form: shift the callback into place
    if (typeof options === 'function') {
      cb = options
      options = null
    }

    var go$readdir
    if (noReaddirOptionVersions.test(process.version)) {
      // options are not forwarded to the underlying readdir here
      go$readdir = function go$readdir (path, options, cb, startTime) {
        return fs$readdir(path, fs$readdirCallback(path, options, cb, startTime))
      }
    } else {
      go$readdir = function go$readdir (path, options, cb, startTime) {
        return fs$readdir(path, options, fs$readdirCallback(path, options, cb, startTime))
      }
    }

    return go$readdir(path, options, cb)

    // Builds the callback handed to the underlying readdir for one attempt.
    function fs$readdirCallback (path, options, cb, startTime) {
      return function (err, files) {
        if (isFileLimitError(err)) {
          enqueue([
            go$readdir,
            [path, options, cb],
            err,
            startTime || Date.now(),
            Date.now()
          ])
          return
        }

        // the listing is sorted in place before it reaches the caller
        if (files && files.sort) {
          files.sort()
        }

        if (typeof cb === 'function') {
          cb.call(this, err, files)
        }
      }
    }
  }

  // On node v0.8 the stream constructors come from ./legacy-streams.js
  if (process.version.substr(0, 4) === 'v0.8') {
    var legStreams = legacy(fs)
    ReadStream = legStreams.ReadStream
    WriteStream = legStreams.WriteStream
  }

  // Inherit from the underlying stream classes, but route open() through
  // the wrapped open() defined further down.
  var fs$ReadStream = fs.ReadStream
  if (fs$ReadStream) {
    ReadStream.prototype = Object.create(fs$ReadStream.prototype)
    ReadStream.prototype.open = ReadStream$open
  }

  var fs$WriteStream = fs.WriteStream
  if (fs$WriteStream) {
    WriteStream.prototype = Object.create(fs$WriteStream.prototype)
    WriteStream.prototype.open = WriteStream$open
  }

  // The stream constructors are exposed as accessors, so assigning to
  // fs.ReadStream / fs.WriteStream also changes what the code in here uses.
  Object.defineProperty(fs, 'ReadStream', {
    enumerable: true,
    configurable: true,
    get: function () {
      return ReadStream
    },
    set: function (val) {
      ReadStream = val
    }
  })
  Object.defineProperty(fs, 'WriteStream', {
    enumerable: true,
    configurable: true,
    get: function () {
      return WriteStream
    },
    set: function (val) {
      WriteStream = val
    }
  })

  // legacy names
  var FileReadStream = ReadStream
  Object.defineProperty(fs, 'FileReadStream', {
    enumerable: true,
    configurable: true,
    get: function () {
      return FileReadStream
    },
    set: function (val) {
      FileReadStream = val
    }
  })
  var FileWriteStream = WriteStream
  Object.defineProperty(fs, 'FileWriteStream', {
    enumerable: true,
    configurable: true,
    get: function () {
      return FileWriteStream
    },
    set: function (val) {
      FileWriteStream = val
    }
  })

  // Usable both with and without `new`.
  function ReadStream (path, options) {
    if (!(this instanceof ReadStream)) {
      return ReadStream.apply(Object.create(ReadStream.prototype), arguments)
    }

    fs$ReadStream.apply(this, arguments)
    return this
  }

  // open() for ReadStream: opens through the wrapped open(), then starts reading.
  function ReadStream$open () {
    var stream = this
    open(stream.path, stream.flags, stream.mode, function (err, fd) {
      if (err) {
        if (stream.autoClose) {
          stream.destroy()
        }

        stream.emit('error', err)
        return
      }

      stream.fd = fd
      stream.emit('open', fd)
      stream.read()
    })
  }

  // Usable both with and without `new`.
  function WriteStream (path, options) {
    if (!(this instanceof WriteStream)) {
      return WriteStream.apply(Object.create(WriteStream.prototype), arguments)
    }

    fs$WriteStream.apply(this, arguments)
    return this
  }

  // open() for WriteStream: opens through the wrapped open().
  function WriteStream$open () {
    var stream = this
    open(stream.path, stream.flags, stream.mode, function (err, fd) {
      if (err) {
        stream.destroy()
        stream.emit('error', err)
        return
      }

      stream.fd = fd
      stream.emit('open', fd)
    })
  }

  // Both factories read the constructor from fs at call time, so a
  // replaced fs.ReadStream / fs.WriteStream is honoured.
  function createReadStream (path, options) {
    var StreamCtor = fs.ReadStream
    return new StreamCtor(path, options)
  }

  function createWriteStream (path, options) {
    var StreamCtor = fs.WriteStream
    return new StreamCtor(path, options)
  }

  var fs$open = fs.open
  fs.open = open
  function open (path, flags, mode, cb) {
    // (path, flags, cb) call form: shift the callback into place
    if (typeof mode === 'function') {
      cb = mode
      mode = null
    }

    return go$open(path, flags, mode, cb)

    // startTime is only supplied when retry() re-runs a queued call
    function go$open (path, flags, mode, cb, startTime) {
      return fs$open(path, flags, mode, function (err, fd) {
        if (isFileLimitError(err)) {
          enqueue([go$open, [path, flags, mode, cb], err, startTime || Date.now(), Date.now()])
          return
        }

        if (typeof cb === 'function') {
          cb.apply(this, arguments)
        }
      })
    }
  }

  return fs
}
370
// Add one job to the shared queue and kick off processing.
// elem is [fn, args, err, startTime, lastTime]; only fn and args are read here.
function enqueue (elem) {
  var job = elem[0]
  var jobArgs = elem[1]

  debug('ENQUEUE', job.name, jobArgs)
  fs[gracefulQueue].push(elem)
  retry()
}
376
// handle of the timeout scheduled between retry() calls, if any
var retryTimer

// Stamp every queued job with the current time as both its startTime and
// its lastTime. This restarts the 60 second overall timeout and the delay
// between attempts, so that we'll retry these jobs sooner.
function resetQueue () {
  var now = Date.now()
  var pending = fs[gracefulQueue]

  for (var idx = 0; idx < pending.length; idx++) {
    var entry = pending[idx]
    // entries that are only a length of 2 are from an older version, don't
    // bother modifying those since they'll be retried anyway.
    if (entry.length > 2) {
      entry[3] = now // startTime
      entry[4] = now // lastTime
    }
  }

  // call retry to make sure we're actively processing the queue
  retry()
}
396
// Process one entry from the front of the shared queue: run it again, give
// up on it after 60 seconds, or put it back at the end if its backoff delay
// has not passed yet. Another pass is scheduled unless the queue was empty.
function retry () {
  // clear the timer and remove it to help prevent unintended concurrency
  clearTimeout(retryTimer)
  retryTimer = undefined

  var pending = fs[gracefulQueue]
  if (pending.length === 0) {
    return
  }

  var entry = pending.shift()
  var job = entry[0]
  var jobArgs = entry[1]
  // these items may be unset if they were added by an older graceful-fs
  var originalErr = entry[2]
  var firstAttemptAt = entry[3]
  var lastAttemptAt = entry[4]

  if (firstAttemptAt === undefined) {
    // without a start time we have no way of knowing if we've waited long
    // enough, so go ahead and retry this item now
    debug('RETRY', job.name, jobArgs)
    job.apply(null, jobArgs)
  } else if (Date.now() - firstAttemptAt >= 60000) {
    // it's been more than 60 seconds total, bail now: hand the original
    // error to the job's callback (the last argument), if it has one
    debug('TIMEOUT', job.name, jobArgs)
    var done = jobArgs.pop()
    if (typeof done === 'function') {
      done.call(null, originalErr)
    }
  } else {
    // the amount of time between the last attempt and right now
    var sinceAttempt = Date.now() - lastAttemptAt
    // the amount of time between when we first tried, and when we last tried
    // rounded up to at least 1
    var sinceStart = Math.max(lastAttemptAt - firstAttemptAt, 1)
    // backoff. wait longer than the total time we've been retrying, but only
    // up to a maximum of 100ms
    var desiredDelay = Math.min(sinceStart * 1.2, 100)

    if (sinceAttempt >= desiredDelay) {
      // it's been long enough since the last retry, do it again; the
      // original start time rides along as an extra trailing argument
      debug('RETRY', job.name, jobArgs)
      job.apply(null, jobArgs.concat([firstAttemptAt]))
    } else {
      // if we can't do this job yet, push it to the end of the queue
      // and let the next iteration check again
      pending.push(entry)
    }
  }

  // schedule our next run if one isn't already scheduled
  if (retryTimer === undefined) {
    retryTimer = setTimeout(retry, 0)
  }
}
449