'use strict'

const Buffer = require('./buffer.js')

// A readable tar stream creator
// Technically, this is a transform stream that you write paths into,
// and tar format comes out of.
// The `add()` method is like `write()` but returns this,
// and end() return `this` as well, so you can
// do `new Pack(opt).add('files').add('dir').end().pipe(output)
// You could also do something like:
// streamOfPaths().pipe(new Pack()).pipe(new fs.WriteStream('out.tar'))

// Per-path bookkeeping record.  One PackJob is queued for every path
// written into the Pack stream; it tracks the stat/readdir results and
// the WriteEntry once one has been created for it.
class PackJob {
  constructor (path, absolute) {
    this.path = path || './'
    this.absolute = absolute
    this.entry = null     // WriteEntry/WriteEntryTar once created
    this.stat = null      // fs.Stats once [STAT]/[ONSTAT] has run
    this.readdir = null   // directory entries once [READDIR]/[ONREADDIR] has run
    this.pending = false  // true while an async stat/readdir is in flight
    this.ignore = false   // set when the filter rejects this path
    this.piped = false    // set once [PIPE] has wired the entry's data events
  }
}

const MiniPass = require('minipass')
const zlib = require('minizlib')
const ReadEntry = require('./read-entry.js')
const WriteEntry = require('./write-entry.js')
const WriteEntrySync = WriteEntry.Sync
const WriteEntryTar = WriteEntry.Tar
const Yallist = require('yallist')
// 1024 zero bytes, written when the archive completes — tar's
// end-of-archive marker is two 512-byte blocks of zeros.
const EOF = Buffer.alloc(1024)
// Symbols keep these members off the public API surface of the stream.
const ONSTAT = Symbol('onStat')
const ENDED = Symbol('ended')
const QUEUE = Symbol('queue')
const CURRENT = Symbol('current')
const PROCESS = Symbol('process')
const PROCESSING = Symbol('processing')
const PROCESSJOB = Symbol('processJob')
const JOBS = Symbol('jobs')
const JOBDONE = Symbol('jobDone')
const ADDFSENTRY = Symbol('addFSEntry')
const ADDTARENTRY = Symbol('addTarEntry')
const STAT = Symbol('stat')
const READDIR = Symbol('readdir')
const ONREADDIR = Symbol('onreaddir')
const PIPE = Symbol('pipe')
const ENTRY = Symbol('entry')
const ENTRYOPT = Symbol('entryOpt')
const WRITEENTRYCLASS = Symbol('writeEntryClass')
const WRITE = Symbol('write')
const ONDRAIN = Symbol('ondrain')

const fs = require('fs')
const path = require('path')
const warner = require('./warn-mixin.js')

const Pack = warner(class Pack extends MiniPass {
  constructor (opt) {
    super(opt)
    opt = opt || Object.create(null)
    this.opt = opt
    this.cwd = opt.cwd || process.cwd()
    this.maxReadSize = opt.maxReadSize
    this.preservePaths = !!opt.preservePaths
    this.strict = !!opt.strict
    this.noPax = !!opt.noPax
    // Normalize the prefix by stripping any trailing slashes/backslashes;
    // it is re-joined with '/' when entries are added below.
    this.prefix = (opt.prefix || '').replace(/(\\|\/)+$/, '')
    // Caches may be shared across Pack instances by passing them in.
    this.linkCache = opt.linkCache || new Map()
    this.statCache = opt.statCache || new Map()
    this.readdirCache = opt.readdirCache || new Map()

    // PackSync overrides this with WriteEntrySync.
    this[WRITEENTRYCLASS] = WriteEntry
    if (typeof opt.onwarn === 'function')
      this.on('warn', opt.onwarn)

    this.zip = null
    if (opt.gzip) {
      if (typeof opt.gzip !== 'object')
        opt.gzip = {}
      // When gzipping, entry data is routed through the Gzip stream and
      // its output is forwarded to the underlying MiniPass buffer.
      this.zip = new zlib.Gzip(opt.gzip)
      this.zip.on('data', chunk => super.write(chunk))
      this.zip.on('end', _ => super.end())
      this.zip.on('drain', _ => this[ONDRAIN]())
      this.on('resume', _ => this.zip.resume())
    } else
      this.on('drain', this[ONDRAIN])

    this.portable = !!opt.portable
    this.noDirRecurse = !!opt.noDirRecurse
    this.follow = !!opt.follow
    this.noMtime = !!opt.noMtime
    this.mtime = opt.mtime || null

    // Default filter accepts everything.
    this.filter = typeof opt.filter === 'function' ? opt.filter : _ => true

    this[QUEUE] = new Yallist
    this[JOBS] = 0
    // Maximum number of jobs (stat/readdir/entry) in flight at once.
    this.jobs = +opt.jobs || 4
    this[PROCESSING] = false
    this[ENDED] = false
  }

  // Raw write to the underlying MiniPass buffer, bypassing the
  // path-queueing write() below.  Used by PackSync[PIPE].
  [WRITE] (chunk) {
    return super.write(chunk)
  }

  // Chainable alias for write(): pack.add('a').add('b').end()
  add (path) {
    this.write(path)
    return this
  }

  // Optionally add one last path, then mark the stream ended.  The EOF
  // blocks are only emitted once the queue fully drains (see [PROCESS]).
  end (path) {
    if (path)
      this.write(path)
    this[ENDED] = true
    this[PROCESS]()
    return this
  }

  // Accepts either a path string or a ReadEntry (for tar-to-tar
  // transfers).  Returns the stream's flowing state, like a normal
  // stream write returns backpressure status.
  write (path) {
    if (this[ENDED])
      throw new Error('write after end')

    if (path instanceof ReadEntry)
      this[ADDTARENTRY](path)
    else
      this[ADDFSENTRY](path)
    return this.flowing
  }

  // Queue a job for an existing tar ReadEntry (no fs access needed).
  [ADDTARENTRY] (p) {
    const absolute = path.resolve(this.cwd, p.path)
    if (this.prefix)
      p.path = this.prefix + '/' + p.path.replace(/^\.(\/+|$)/, '')

    // in this case, we don't have to wait for the stat
    if (!this.filter(p.path, p))
      p.resume()  // filtered out: drain the entry so the source can proceed
    else {
      // NOTE(review): PackJob's constructor takes (path, absolute); the
      // third argument here is ignored — confirm against PackJob.
      const job = new PackJob(p.path, absolute, false)
      job.entry = new WriteEntryTar(p, this[ENTRYOPT](job))
      job.entry.on('end', _ => this[JOBDONE](job))
      this[JOBS] += 1
      this[QUEUE].push(job)
    }

    this[PROCESS]()
  }

  // Queue a job for a filesystem path; stat/readdir happen lazily in
  // [PROCESSJOB].
  [ADDFSENTRY] (p) {
    const absolute = path.resolve(this.cwd, p)
    if (this.prefix)
      p = this.prefix + '/' + p.replace(/^\.(\/+|$)/, '')

    this[QUEUE].push(new PackJob(p, absolute))
    this[PROCESS]()
  }

  // Async stat of the job's absolute path.  Uses stat when following
  // symlinks, lstat otherwise.
  [STAT] (job) {
    job.pending = true
    this[JOBS] += 1
    // NOTE(review): the callback's `stat` result shadows this method-name
    // string; harmless, but easy to misread.
    const stat = this.follow ? 'stat' : 'lstat'
    fs[stat](job.absolute, (er, stat) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        this.emit('error', er)
      else
        this[ONSTAT](job, stat)
    })
  }

  [ONSTAT] (job, stat) {
    this.statCache.set(job.absolute, stat)
    job.stat = stat

    // now we have the stat, we can filter it.
    if (!this.filter(job.path, stat))
      job.ignore = true

    this[PROCESS]()
  }

  // Async readdir for directory jobs; entries feed back in via
  // [ADDFSENTRY] when the job is piped.
  [READDIR] (job) {
    job.pending = true
    this[JOBS] += 1
    fs.readdir(job.absolute, (er, entries) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        return this.emit('error', er)
      this[ONREADDIR](job, entries)
    })
  }

  [ONREADDIR] (job, entries) {
    this.readdirCache.set(job.absolute, entries)
    job.readdir = entries
    this[PROCESS]()
  }

  // Walk the queue and advance jobs, up to the concurrency limit.
  // Re-entrant calls (from [ONSTAT], [JOBDONE], etc.) are flattened by
  // the PROCESSING guard.
  [PROCESS] () {
    if (this[PROCESSING])
      return

    this[PROCESSING] = true
    for (let w = this[QUEUE].head;
      w !== null && this[JOBS] < this.jobs;
      w = w.next) {
      this[PROCESSJOB](w.value)
      if (w.value.ignore) {
        // Drop the filtered node from the list, but restore w.next so the
        // for-loop's `w = w.next` step still reaches the following node
        // (Yallist's removeNode nulls out the removed node's links).
        const p = w.next
        this[QUEUE].removeNode(w)
        w.next = p
      }
    }

    this[PROCESSING] = false

    // Once ended and fully drained, write the tar end-of-archive blocks
    // and close out the stream (through gzip if enabled).
    if (this[ENDED] && !this[QUEUE].length && this[JOBS] === 0) {
      if (this.zip)
        this.zip.end(EOF)
      else {
        super.write(EOF)
        super.end()
      }
    }
  }

  // The job at the head of the queue — the only one allowed to pipe its
  // entry data out, so archive output stays in order.
  get [CURRENT] () {
    return this[QUEUE] && this[QUEUE].head && this[QUEUE].head.value
  }

  // Entry finished: drop it from the head of the queue and keep going.
  [JOBDONE] (job) {
    this[QUEUE].shift()
    this[JOBS] -= 1
    this[PROCESS]()
  }

  // Advance a single job through its lifecycle:
  //   stat -> (filter) -> readdir (for dirs) -> create entry -> pipe.
  // Each async step returns early; [PROCESS] is re-run when it lands.
  [PROCESSJOB] (job) {
    if (job.pending)
      return

    if (job.entry) {
      // Only the head-of-queue entry may pipe (ordering guarantee).
      if (job === this[CURRENT] && !job.piped)
        this[PIPE](job)
      return
    }

    if (!job.stat) {
      if (this.statCache.has(job.absolute))
        this[ONSTAT](job, this.statCache.get(job.absolute))
      else
        this[STAT](job)
    }
    // Still no stat: the async [STAT] is in flight.
    if (!job.stat)
      return

    // filtered out!
    if (job.ignore)
      return

    if (!this.noDirRecurse && job.stat.isDirectory() && !job.readdir) {
      if (this.readdirCache.has(job.absolute))
        this[ONREADDIR](job, this.readdirCache.get(job.absolute))
      else
        this[READDIR](job)
      // Still no readdir: the async [READDIR] is in flight.
      if (!job.readdir)
        return
    }

    // we know it doesn't have an entry, because that got checked above
    job.entry = this[ENTRY](job)
    if (!job.entry) {
      // [ENTRY] threw and emitted 'error'; treat the job as skipped.
      job.ignore = true
      return
    }

    if (job === this[CURRENT] && !job.piped)
      this[PIPE](job)
  }

  // Options bag passed into each WriteEntry/WriteEntryTar, forwarding the
  // Pack-level settings and shared caches.
  [ENTRYOPT] (job) {
    return {
      onwarn: (msg, data) => {
        this.warn(msg, data)
      },
      noPax: this.noPax,
      cwd: this.cwd,
      absolute: job.absolute,
      preservePaths: this.preservePaths,
      maxReadSize: this.maxReadSize,
      strict: this.strict,
      portable: this.portable,
      linkCache: this.linkCache,
      statCache: this.statCache,
      noMtime: this.noMtime,
      mtime: this.mtime
    }
  }

  // Create the WriteEntry for a job.  Returns undefined (and emits
  // 'error') if construction throws — callers check for a falsy result.
  [ENTRY] (job) {
    this[JOBS] += 1
    try {
      return new this[WRITEENTRYCLASS](job.path, this[ENTRYOPT](job))
        .on('end', () => this[JOBDONE](job))
        .on('error', er => this.emit('error', er))
    } catch (er) {
      this.emit('error', er)
    }
  }

  // Downstream drained: resume the current entry that [PIPE]'s data
  // handler may have paused for backpressure.
  [ONDRAIN] () {
    if (this[CURRENT] && this[CURRENT].entry)
      this[CURRENT].entry.resume()
  }

  // like .pipe() but using super, because our write() is special
  [PIPE] (job) {
    job.piped = true

    // Recurse into directories: queue each child under the job's path.
    // The prefix is stripped before computing the base so [ADDFSENTRY]
    // doesn't apply it a second time.
    if (job.readdir)
      job.readdir.forEach(entry => {
        const p = this.prefix ?
          job.path.slice(this.prefix.length + 1) || './'
          : job.path

        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })

    const source = job.entry
    const zip = this.zip

    // Manual data piping with backpressure: pause the entry whenever the
    // sink (gzip or the MiniPass buffer) reports it is full; [ONDRAIN]
    // resumes it.
    if (zip)
      source.on('data', chunk => {
        if (!zip.write(chunk))
          source.pause()
      })
    else
      source.on('data', chunk => {
        if (!super.write(chunk))
          source.pause()
      })
  }

  pause () {
    if (this.zip)
      this.zip.pause()
    return super.pause()
  }
})

// Synchronous variant: same lifecycle, but stat/readdir/piping all happen
// inline in the current tick using the *Sync fs calls.
class PackSync extends Pack {
  constructor (opt) {
    super(opt)
    this[WRITEENTRYCLASS] = WriteEntrySync
  }

  // pause/resume are no-ops in sync streams.
  pause () {}
  resume () {}

  [STAT] (job) {
    const stat = this.follow ? 'statSync' : 'lstatSync'
    this[ONSTAT](job, fs[stat](job.absolute))
  }

  [READDIR] (job, stat) {
    this[ONREADDIR](job, fs.readdirSync(job.absolute))
  }

  // gotta get it all in this tick
  [PIPE] (job) {
    const source = job.entry
    const zip = this.zip

    if (job.readdir)
      job.readdir.forEach(entry => {
        const p = this.prefix ?
          job.path.slice(this.prefix.length + 1) || './'
          : job.path

        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })

    // No backpressure handling: sync entries emit everything immediately,
    // so data is forwarded unconditionally.
    if (zip)
      source.on('data', chunk => {
        zip.write(chunk)
      })
    else
      source.on('data', chunk => {
        super[WRITE](chunk)
      })
  }
}

Pack.Sync = PackSync

module.exports = Pack