// Entry point: exports `patch(clone(fs))`, an fs-like object whose
// open-style calls (open, readFile, writeFile, appendFile, copyFile, readdir
// and the stream constructors) are queued and retried when they fail with
// EMFILE or ENFILE instead of reporting the error right away.
var fs = require('fs')
var polyfills = require('./polyfills.js')
var legacy = require('./legacy-streams.js')
var clone = require('./clone.js')

var util = require('util')

// Property keys used to publish the shared retry queue (on `fs` and `global`)
// and to remember the function that a patched close/closeSync replaced.
/* istanbul ignore next - node 0.x polyfill */
var gracefulQueue
var previousSymbol

/* istanbul ignore else - node 0.x polyfill */
if (typeof Symbol === 'function' && typeof Symbol.for === 'function') {
  gracefulQueue = Symbol.for('graceful-fs.queue')
  // This is used in testing by future versions
  previousSymbol = Symbol.for('graceful-fs.previous')
} else {
  // No Symbol.for available: fall back to plain string keys.
  gracefulQueue = '___graceful-fs.queue'
  previousSymbol = '___graceful-fs.previous'
}

// Default `debug` implementation: does nothing.
function noop () {}

// Expose `queue` on `context` under the gracefulQueue key through a
// getter-only property. defineProperty defaults apply, so the property is
// non-enumerable and non-configurable.
function publishQueue(context, queue) {
  Object.defineProperty(context, gracefulQueue, {
    get: function() {
      return queue
    }
  })
}

// Debug logger. Prefer util.debuglog('gfs4') when it exists; otherwise only
// log (to stderr, each line prefixed with "GFS4: ") when NODE_DEBUG
// mentions gfs4.
var debug = noop
if (util.debuglog)
  debug = util.debuglog('gfs4')
else if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || ''))
  debug = function() {
    var m = util.format.apply(util, arguments)
    m = 'GFS4: ' + m.split(/\n/).join('\nGFS4: ')
    console.error(m)
  }

// One-time initialization
if (!fs[gracefulQueue]) {
  // This queue can be shared by multiple loaded instances
  var queue = global[gracefulQueue] || []
  publishQueue(fs, queue)

  // Patch fs.close/closeSync to shared queue version, because we need
  // to retry() whenever a close happens *anywhere* in the program.
  // This is essential when multiple graceful-fs instances are
  // in play at the same time.
  fs.close = (function (fs$close) {
    function close (fd, cb) {
      return fs$close.call(fs, fd, function (err) {
        // This function uses the graceful-fs shared queue
        // A successful close is the signal to re-examine queued work.
        if (!err) {
          resetQueue()
        }

        if (typeof cb === 'function')
          cb.apply(this, arguments)
      })
    }

    // Keep a handle on the function this wrapper replaced.
    Object.defineProperty(close, previousSymbol, {
      value: fs$close
    })
    return close
  })(fs.close)

  fs.closeSync = (function (fs$closeSync) {
    function closeSync (fd) {
      // This function uses the graceful-fs shared queue
      // If the underlying closeSync throws, resetQueue() is not reached.
      fs$closeSync.apply(fs, arguments)
      resetQueue()
    }

    // Keep a handle on the function this wrapper replaced.
    Object.defineProperty(closeSync, previousSymbol, {
      value: fs$closeSync
    })
    return closeSync
  })(fs.closeSync)

  // In gfs4 debug mode, dump the queue at exit and assert that it is empty.
  if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
    process.on('exit', function() {
      debug(fs[gracefulQueue])
      require('assert').strictEqual(fs[gracefulQueue].length, 0)
    })
  }
}

// Make the queue reachable from `global` too, so that the initialization
// above can pick it up via global[gracefulQueue].
if (!global[gracefulQueue]) {
  publishQueue(global, fs[gracefulQueue])
}

module.exports = patch(clone(fs))
// When TEST_GRACEFUL_FS_GLOBAL_PATCH is set, patch the real `fs` module in
// place (at most once, tracked by fs.__patched) and export that instead.
if (process.env.TEST_GRACEFUL_FS_GLOBAL_PATCH && !fs.__patched) {
  module.exports = patch(fs)
  fs.__patched = true
}

// Patch the given fs-like object in place and return it.
//
// Each wrapped async function follows the same pattern: a public wrapper
// normalizes the optional argument, then delegates to an inner `go$<name>`
// function. When the underlying call fails with EMFILE or ENFILE the call is
// pushed onto the shared queue as
//   [fn, args, err, startTime, lastTime]
// and re-invoked later by retry(), which appends `startTime` to `args`.
// Any other outcome is forwarded to the caller's callback unchanged.
function patch (fs) {
  // Everything that references the open() function needs to be in here
  polyfills(fs)
  fs.gracefulify = patch

  // These function declarations are hoisted from further down in patch().
  fs.createReadStream = createReadStream
  fs.createWriteStream = createWriteStream
  var fs$readFile = fs.readFile
  fs.readFile = readFile
  function readFile (path, options, cb) {
    if (typeof options === 'function')
      cb = options, options = null

    return go$readFile(path, options, cb)

    function go$readFile (path, options, cb, startTime) {
      return fs$readFile(path, options, function (err) {
        if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
          enqueue([go$readFile, [path, options, cb], err, startTime || Date.now(), Date.now()])
        else {
          if (typeof cb === 'function')
            cb.apply(this, arguments)
        }
      })
    }
  }

  var fs$writeFile = fs.writeFile
  fs.writeFile = writeFile
  function writeFile (path, data, options, cb) {
    if (typeof options === 'function')
      cb = options, options = null

    return go$writeFile(path, data, options, cb)

    function go$writeFile (path, data, options, cb, startTime) {
      return fs$writeFile(path, data, options, function (err) {
        if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
          enqueue([go$writeFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
        else {
          if (typeof cb === 'function')
            cb.apply(this, arguments)
        }
      })
    }
  }

  // appendFile is only wrapped when the underlying fs provides it.
  var fs$appendFile = fs.appendFile
  if (fs$appendFile)
    fs.appendFile = appendFile
  function appendFile (path, data, options, cb) {
    if (typeof options === 'function')
      cb = options, options = null

    return go$appendFile(path, data, options, cb)

    function go$appendFile (path, data, options, cb, startTime) {
      return fs$appendFile(path, data, options, function (err) {
        if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
          enqueue([go$appendFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
        else {
          if (typeof cb === 'function')
            cb.apply(this, arguments)
        }
      })
    }
  }

  // copyFile is only wrapped when the underlying fs provides it.
  var fs$copyFile = fs.copyFile
  if (fs$copyFile)
    fs.copyFile = copyFile
  function copyFile (src, dest, flags, cb) {
    if (typeof flags === 'function') {
      cb = flags
      flags = 0
    }
    return go$copyFile(src, dest, flags, cb)

    function go$copyFile (src, dest, flags, cb, startTime) {
      return fs$copyFile(src, dest, flags, function (err) {
        if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
          enqueue([go$copyFile, [src, dest, flags, cb], err, startTime || Date.now(), Date.now()])
        else {
          if (typeof cb === 'function')
            cb.apply(this, arguments)
        }
      })
    }
  }

  var fs$readdir = fs.readdir
  fs.readdir = readdir
  // On Node versions matching this pattern (v0 - v5) the `options` argument
  // is not forwarded to the underlying readdir.
  var noReaddirOptionVersions = /^v[0-5]\./
  function readdir (path, options, cb) {
    if (typeof options === 'function')
      cb = options, options = null

    var go$readdir = noReaddirOptionVersions.test(process.version)
      ? function go$readdir (path, options, cb, startTime) {
        return fs$readdir(path, fs$readdirCallback(
          path, options, cb, startTime
        ))
      }
      : function go$readdir (path, options, cb, startTime) {
        return fs$readdir(path, options, fs$readdirCallback(
          path, options, cb, startTime
        ))
      }

    return go$readdir(path, options, cb)

    // Build the callback handed to the underlying readdir: queue a retry on
    // EMFILE/ENFILE, otherwise sort the listing (when it is sortable) and
    // forward (err, files) to the caller.
    function fs$readdirCallback (path, options, cb, startTime) {
      return function (err, files) {
        if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
          enqueue([
            go$readdir,
            [path, options, cb],
            err,
            startTime || Date.now(),
            Date.now()
          ])
        else {
          if (files && files.sort)
            files.sort()

          if (typeof cb === 'function')
            cb.call(this, err, files)
        }
      }
    }
  }

  // On node v0.8 the stream constructors come from the legacy-streams
  // helper; these assignments replace the hoisted ReadStream/WriteStream
  // functions declared below.
  if (process.version.slice(0, 4) === 'v0.8') {
    var legStreams = legacy(fs)
    ReadStream = legStreams.ReadStream
    WriteStream = legStreams.WriteStream
  }

  // Inherit from the underlying stream classes, but route open() through the
  // retrying open() defined at the bottom of patch().
  var fs$ReadStream = fs.ReadStream
  if (fs$ReadStream) {
    ReadStream.prototype = Object.create(fs$ReadStream.prototype)
    ReadStream.prototype.open = ReadStream$open
  }

  var fs$WriteStream = fs.WriteStream
  if (fs$WriteStream) {
    WriteStream.prototype = Object.create(fs$WriteStream.prototype)
    WriteStream.prototype.open = WriteStream$open
  }

  // Accessor properties, so that assigning fs.ReadStream / fs.WriteStream
  // also changes what createReadStream / createWriteStream instantiate.
  Object.defineProperty(fs, 'ReadStream', {
    get: function () {
      return ReadStream
    },
    set: function (val) {
      ReadStream = val
    },
    enumerable: true,
    configurable: true
  })
  Object.defineProperty(fs, 'WriteStream', {
    get: function () {
      return WriteStream
    },
    set: function (val) {
      WriteStream = val
    },
    enumerable: true,
    configurable: true
  })

  // legacy names
  // These capture the constructors as they are at this point; later
  // assignments to fs.ReadStream / fs.WriteStream do not update them.
  var FileReadStream = ReadStream
  Object.defineProperty(fs, 'FileReadStream', {
    get: function () {
      return FileReadStream
    },
    set: function (val) {
      FileReadStream = val
    },
    enumerable: true,
    configurable: true
  })
  var FileWriteStream = WriteStream
  Object.defineProperty(fs, 'FileWriteStream', {
    get: function () {
      return FileWriteStream
    },
    set: function (val) {
      FileWriteStream = val
    },
    enumerable: true,
    configurable: true
  })

  // Constructor that also works when called without `new`.
  function ReadStream (path, options) {
    if (this instanceof ReadStream)
      return fs$ReadStream.apply(this, arguments), this
    else
      return ReadStream.apply(Object.create(ReadStream.prototype), arguments)
  }

  // Replacement for ReadStream.prototype.open that goes through the
  // retrying open().
  function ReadStream$open () {
    var that = this
    open(that.path, that.flags, that.mode, function (err, fd) {
      if (err) {
        if (that.autoClose)
          that.destroy()

        that.emit('error', err)
      } else {
        that.fd = fd
        that.emit('open', fd)
        that.read()
      }
    })
  }

  // Constructor that also works when called without `new`.
  function WriteStream (path, options) {
    if (this instanceof WriteStream)
      return fs$WriteStream.apply(this, arguments), this
    else
      return WriteStream.apply(Object.create(WriteStream.prototype), arguments)
  }

  // Replacement for WriteStream.prototype.open that goes through the
  // retrying open().
  // NOTE(review): unlike ReadStream$open, this destroys the stream on error
  // without checking `autoClose` - confirm that the difference is intended.
  function WriteStream$open () {
    var that = this
    open(that.path, that.flags, that.mode, function (err, fd) {
      if (err) {
        that.destroy()
        that.emit('error', err)
      } else {
        that.fd = fd
        that.emit('open', fd)
      }
    })
  }

  // Instantiate through the fs.ReadStream / fs.WriteStream accessors so that
  // a constructor assigned by the user is honoured.
  function createReadStream (path, options) {
    return new fs.ReadStream(path, options)
  }

  function createWriteStream (path, options) {
    return new fs.WriteStream(path, options)
  }

  var fs$open = fs.open
  fs.open = open
  function open (path, flags, mode, cb) {
    if (typeof mode === 'function')
      cb = mode, mode = null

    return go$open(path, flags, mode, cb)

    function go$open (path, flags, mode, cb, startTime) {
      return fs$open(path, flags, mode, function (err, fd) {
        if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
          enqueue([go$open, [path, flags, mode, cb], err, startTime || Date.now(), Date.now()])
        else {
          if (typeof cb === 'function')
            cb.apply(this, arguments)
        }
      })
    }
  }

  return fs
}

// Add a failed call to the shared queue and kick the retry loop.
// `elem` is [fn, args, err, startTime, lastTime].
function enqueue (elem) {
  debug('ENQUEUE', elem[0].name, elem[1])
  fs[gracefulQueue].push(elem)
  retry()
}

// keep track of the timeout between retry() calls
var retryTimer

// reset the startTime and lastTime to now
// this resets the start of the 60 second overall timeout as well as the
// delay between attempts so that we'll retry these jobs sooner
function resetQueue () {
  var now = Date.now()
  for (var i = 0; i < fs[gracefulQueue].length; ++i) {
    // entries that are only a length of 2 are from an older version, don't
    // bother modifying those since they'll be retried anyway.
    if (fs[gracefulQueue][i].length > 2) {
      fs[gracefulQueue][i][3] = now // startTime
      fs[gracefulQueue][i][4] = now // lastTime
    }
  }
  // call retry to make sure we're actively processing the queue
  retry()
}

// Process one entry from the front of the shared queue, then schedule the
// next run. The entry is retried immediately, failed with its original
// error after 60 seconds, re-invoked once its backoff delay has elapsed, or
// pushed back to the end of the queue.
function retry () {
  // clear the timer and remove it to help prevent unintended concurrency
  clearTimeout(retryTimer)
  retryTimer = undefined

  if (fs[gracefulQueue].length === 0)
    return

  var elem = fs[gracefulQueue].shift()
  var fn = elem[0]
  var args = elem[1]
  // these items may be unset if they were added by an older graceful-fs
  var err = elem[2]
  var startTime = elem[3]
  var lastTime = elem[4]

  // if we don't have a startTime we have no way of knowing if we've waited
  // long enough, so go ahead and retry this item now
  if (startTime === undefined) {
    debug('RETRY', fn.name, args)
    fn.apply(null, args)
  } else if (Date.now() - startTime >= 60000) {
    // it's been more than 60 seconds total, bail now
    debug('TIMEOUT', fn.name, args)
    // the callback is always the last queued argument
    var cb = args.pop()
    if (typeof cb === 'function')
      cb.call(null, err)
  } else {
    // the amount of time between the last attempt and right now
    var sinceAttempt = Date.now() - lastTime
    // the amount of time between when we first tried, and when we last tried
    // rounded up to at least 1
    var sinceStart = Math.max(lastTime - startTime, 1)
    // backoff. wait longer than the total time we've been retrying, but only
    // up to a maximum of 100ms
    var desiredDelay = Math.min(sinceStart * 1.2, 100)
    // it's been long enough since the last retry, do it again
    if (sinceAttempt >= desiredDelay) {
      debug('RETRY', fn.name, args)
      // pass startTime along so the 60 second budget spans all attempts
      fn.apply(null, args.concat([startTime]))
    } else {
      // if we can't do this job yet, push it to the end of the queue
      // and let the next iteration check again
      fs[gracefulQueue].push(elem)
    }
  }

  // schedule our next run if one isn't already scheduled
  if (retryTimer === undefined) {
    retryTimer = setTimeout(retry, 0)
  }
}