var fs = require('graceful-fs')
var Writable = require('readable-stream').Writable
var util = require('util')
var MurmurHash3 = require('imurmurhash')
var iferr = require('iferr')
var crypto = require('crypto')

// Feeds the string form of every argument, in order, into one MurmurHash3
// state and returns whatever `hash.result()` yields for the combined input.
function murmurhex () {
  var hash = MurmurHash3('')
  for (var ii = 0; ii < arguments.length; ++ii) {
    hash.hash('' + arguments[ii])
  }
  return hash.result()
}

// Counts calls to getTmpname so that each call in this process hashes a
// different input.
var invocations = 0

// Builds the temp-file path for `filename`: the target path, a dot, and a
// hash of this module's filename, the process id and the invocation counter.
function getTmpname (filename) {
  return filename + '.' + murmurhex(__filename, process.pid, ++invocations)
}

// Fall back to setTimeout where setImmediate is not available.
var setImmediate = global.setImmediate || setTimeout

module.exports = WriteStreamAtomic

// Requirements:
//   1. Write everything written to the stream to a temp file.
//   2. If there are no errors:
//      a. moves the temp file into its final destination
//      b. emits `finish` & `close` ONLY after the file is
//         fully flushed and renamed.
//   3. If there's an error, removes the temp file.

util.inherits(WriteStreamAtomic, Writable)

// Writable stream that writes to a temp file beside `path` and renames it
// onto `path` once the underlying file stream has closed.
//
//   path    - final destination of the data.
//   options - passed to both Writable and fs.WriteStream. Also read here:
//               options.chown - { uid, gid } applied to the temp file before
//                               it is renamed.
//               options.isWin - overrides the `process.platform === 'win32'`
//                               check used by the rename EPERM handling.
//
// May be called with or without `new`.
function WriteStreamAtomic (path, options) {
  if (!(this instanceof WriteStreamAtomic)) {
    return new WriteStreamAtomic(path, options)
  }
  Writable.call(this, options)

  // Borrow hasOwnProperty from Object.prototype so that an options object
  // created without a prototype (Object.create(null)) does not throw.
  var hasIsWin = options && Object.prototype.hasOwnProperty.call(options, 'isWin')
  this.__isWin = hasIsWin ? options.isWin : process.platform === 'win32'

  this.__atomicTarget = path
  this.__atomicTmp = getTmpname(path)

  this.__atomicChown = options && options.chown

  // Set once close/error handling has started, so it only runs once.
  this.__atomicClosed = false

  this.__atomicStream = fs.WriteStream(this.__atomicTmp, options)

  this.__atomicStream.once('open', handleOpen(this))
  this.__atomicStream.once('close', handleClose(this))
  this.__atomicStream.once('error', handleError(this))
}

// We have to suppress default finish emitting, because ordinarily it
// would happen as soon as `end` is called on us and all of the
// data has been written to our target stream. So we suppress
// finish from being emitted here, and only emit it after our
// target stream is closed and we've moved everything around.
WriteStreamAtomic.prototype.emit = function (event) {
  // Our own `finish` is turned into "end the temp-file stream"; the real
  // `finish` is emitted later from handleClose's end().
  if (event === 'finish') return this.__atomicStream.end()
  return Writable.prototype.emit.apply(this, arguments)
}

// Forwards each chunk to the temp-file stream. When that stream reports
// back-pressure, the callback is held until it emits `drain`.
WriteStreamAtomic.prototype._write = function (buffer, encoding, cb) {
  var flushed = this.__atomicStream.write(buffer, encoding)
  if (flushed) return cb()
  this.__atomicStream.once('drain', cb)
}

// Returns a listener that re-emits the temp-file stream's `open` event (with
// its fd) on `writeStream`.
function handleOpen (writeStream) {
  return function (fd) {
    writeStream.emit('open', fd)
  }
}

// Returns the listener for the temp-file stream's `close` event. It optionally
// chowns the temp file, renames it onto the target, and then emits `finish`
// and `close` on `writeStream`; on failure it removes the temp file and emits
// `error` followed by `close`.
function handleClose (writeStream) {
  return function () {
    // Nothing to do if the error handler (or an earlier close) already ran.
    if (writeStream.__atomicClosed) return
    writeStream.__atomicClosed = true
    if (writeStream.__atomicChown) {
      var uid = writeStream.__atomicChown.uid
      var gid = writeStream.__atomicChown.gid
      // iferr(onError, onSuccess): a chown failure goes to cleanup, otherwise
      // continue with the rename.
      return fs.chown(writeStream.__atomicTmp, uid, gid, iferr(cleanup, moveIntoPlace))
    } else {
      moveIntoPlace()
    }
  }

  // Rename the temp file onto the target path.
  function moveIntoPlace () {
    fs.rename(writeStream.__atomicTmp, writeStream.__atomicTarget, iferr(trapWindowsEPERM, end))
  }

  // A rename that fails with EPERM on Windows is not reported straight away:
  // the two files are compared first. Every other failure goes to cleanup.
  function trapWindowsEPERM (err) {
    if (writeStream.__isWin &&
        err.syscall && err.syscall === 'rename' &&
        err.code && err.code === 'EPERM'
    ) {
      checkFileHashes(err)
    } else {
      cleanup(err)
    }
  }

  // Hashes the temp file and the target file with SHA-512. If both can be
  // read and the digests match, the write is treated as successful;
  // otherwise the original EPERM error (`eperm`) is reported.
  function checkFileHashes (eperm) {
    // Number of read streams still running; 0 also marks "already failed".
    var inprocess = 2
    var tmpFileHash = crypto.createHash('sha512')
    var targetFileHash = crypto.createHash('sha512')

    fs.createReadStream(writeStream.__atomicTmp)
      .on('data', function (data, enc) { tmpFileHash.update(data, enc) })
      .on('error', fileHashError)
      .on('end', fileHashComplete)
    fs.createReadStream(writeStream.__atomicTarget)
      .on('data', function (data, enc) { targetFileHash.update(data, enc) })
      .on('error', fileHashError)
      .on('end', fileHashComplete)

    // First read error wins: report the original EPERM exactly once.
    function fileHashError () {
      if (inprocess === 0) return
      inprocess = 0
      cleanup(eperm)
    }

    // Runs the comparison only after both read streams have ended.
    function fileHashComplete () {
      if (inprocess === 0) return
      if (--inprocess) return
      if (tmpFileHash.digest('hex') === targetFileHash.digest('hex')) {
        return cleanup()
      } else {
        return cleanup(eperm)
      }
    }
  }

  // Removes the temp file, then either reports `err` (`error` followed by
  // `close`) or, when called without an error, finishes normally. The result
  // of the unlink itself is deliberately ignored.
  function cleanup (err) {
    fs.unlink(writeStream.__atomicTmp, function () {
      if (err) {
        writeStream.emit('error', err)
        writeStream.emit('close')
      } else {
        end()
      }
    })
  }

  function end () {
    // We have to use our parent class directly because we suppress `finish`
    // events fired via our own emit method.
    Writable.prototype.emit.call(writeStream, 'finish')

    // Delay the close to provide the same temporal separation a physical
    // file operation would have -- that is, the close event is emitted only
    // after the async close operation completes.
    setImmediate(function () {
      writeStream.emit('close')
    })
  }
}

// Returns the listener for the temp-file stream's `error` event: removes the
// temp file, then emits `error` and `close` on `writeStream`.
function handleError (writeStream) {
  return function (er) {
    // Mark the stream closed before emitting anything: if emitting `error`
    // throws (no listener, or a listener that throws), a later `close` from
    // the temp-file stream must not try to chown/rename the file that was
    // just removed.
    writeStream.__atomicClosed = true
    cleanupSync()
    writeStream.emit('error', er)
    writeStream.emit('close')
  }

  // Best-effort removal of the temp file; it may never have been created.
  function cleanupSync () {
    try {
      fs.unlinkSync(writeStream.__atomicTmp)
    } catch (unlinkErr) {
      // Deliberately ignored: the error being reported is the stream error
      // that got us here, not a failure to remove the temp file.
    }
  }
}