'use strict'

const Buffer = require('./buffer.js')

// A readable tar stream creator
// Technically, this is a transform stream that you write paths into,
// and tar format comes out of.
// The `add()` method is like `write()` but returns this,
// and end() return `this` as well, so you can
// do `new Pack(opt).add('files').add('dir').end().pipe(output)
// You could also do something like:
// streamOfPaths().pipe(new Pack()).pipe(new fs.WriteStream('out.tar'))

// Per-path unit of work.  Tracks everything learned about one queued
// path as it moves through the pipeline: stat result, directory
// listing, the WriteEntry once created, and flags for whether an async
// fs call is in flight (pending), the filter rejected it (ignore), or
// its entry stream has been wired to the output (piped).
class PackJob {
  constructor (path, absolute) {
    this.path = path || './'
    this.absolute = absolute
    this.entry = null
    this.stat = null
    this.readdir = null
    this.pending = false
    this.ignore = false
    this.piped = false
  }
}

const MiniPass = require('minipass')
const zlib = require('minizlib')
const ReadEntry = require('./read-entry.js')
const WriteEntry = require('./write-entry.js')
const WriteEntrySync = WriteEntry.Sync
const WriteEntryTar = WriteEntry.Tar
const Yallist = require('yallist')

// two 512-byte blocks of zeroes: the end-of-archive marker
const EOF = Buffer.alloc(1024)

// private method/state keys, kept off the public surface via Symbols
const ONSTAT = Symbol('onStat')
const ENDED = Symbol('ended')
const QUEUE = Symbol('queue')
const CURRENT = Symbol('current')
const PROCESS = Symbol('process')
const PROCESSING = Symbol('processing')
const PROCESSJOB = Symbol('processJob')
const JOBS = Symbol('jobs')
const JOBDONE = Symbol('jobDone')
const ADDFSENTRY = Symbol('addFSEntry')
const ADDTARENTRY = Symbol('addTarEntry')
const STAT = Symbol('stat')
const READDIR = Symbol('readdir')
const ONREADDIR = Symbol('onreaddir')
const PIPE = Symbol('pipe')
const ENTRY = Symbol('entry')
const ENTRYOPT = Symbol('entryOpt')
const WRITEENTRYCLASS = Symbol('writeEntryClass')
const WRITE = Symbol('write')
const ONDRAIN = Symbol('ondrain')

const fs = require('fs')
const path = require('path')
const warner = require('./warn-mixin.js')
const normPath = require('./normalize-windows-path.js')

const Pack = warner(class Pack extends MiniPass {
  constructor (opt) {
    super(opt)
    opt = opt || Object.create(null)
    this.opt = opt
    this.cwd = opt.cwd || process.cwd()
    this.maxReadSize = opt.maxReadSize
    this.preservePaths = !!opt.preservePaths
    this.strict = !!opt.strict
    this.noPax = !!opt.noPax
    this.prefix = normPath(opt.prefix || '')
    this.linkCache = opt.linkCache || new Map()
    this.statCache = opt.statCache || new Map()
    this.readdirCache = opt.readdirCache || new Map()

    this[WRITEENTRYCLASS] = WriteEntry
    if (typeof opt.onwarn === 'function')
      this.on('warn', opt.onwarn)

    this.zip = null
    if (opt.gzip) {
      if (typeof opt.gzip !== 'object')
        opt.gzip = {}
      // interpose a gzip stream: entry data flows into this.zip, and
      // the compressed output is forwarded to the underlying MiniPass
      this.zip = new zlib.Gzip(opt.gzip)
      this.zip.on('data', chunk => super.write(chunk))
      this.zip.on('end', _ => super.end())
      this.zip.on('drain', _ => this[ONDRAIN]())
      this.on('resume', _ => this.zip.resume())
    } else
      // unbound listener is fine: the emitter invokes it with
      // `this` set to the stream instance
      this.on('drain', this[ONDRAIN])

    this.portable = !!opt.portable
    this.noDirRecurse = !!opt.noDirRecurse
    this.follow = !!opt.follow
    this.noMtime = !!opt.noMtime
    this.mtime = opt.mtime || null

    this.filter = typeof opt.filter === 'function' ? opt.filter : _ => true

    this[QUEUE] = new Yallist()
    this[JOBS] = 0            // count of jobs with work in flight
    this.jobs = +opt.jobs || 4 // max concurrent fs/entry operations
    this[PROCESSING] = false
    this[ENDED] = false
  }

  // raw passthrough to the underlying stream; used by subclasses
  // whose own write() is reserved for paths
  [WRITE] (chunk) {
    return super.write(chunk)
  }

  // chainable alias for write()
  add (path) {
    this.write(path)
    return this
  }

  // optionally add one last path, then finish the archive once all
  // queued jobs have drained
  end (path) {
    if (path)
      this.write(path)
    this[ENDED] = true
    this[PROCESS]()
    return this
  }

  // accepts either a path string or a ReadEntry (for re-packing an
  // existing archive's entries)
  write (path) {
    if (this[ENDED])
      throw new Error('write after end')

    if (path instanceof ReadEntry)
      this[ADDTARENTRY](path)
    else
      this[ADDFSENTRY](path)
    return this.flowing
  }

  [ADDTARENTRY] (p) {
    const absolute = normPath(path.resolve(this.cwd, p.path))
    // in this case, we don't have to wait for the stat
    if (!this.filter(p.path, p))
      p.resume()
    else {
      const job = new PackJob(p.path, absolute)
      job.entry = new WriteEntryTar(p, this[ENTRYOPT](job))
      job.entry.on('end', _ => this[JOBDONE](job))
      this[JOBS] += 1
      this[QUEUE].push(job)
    }

    this[PROCESS]()
  }

  [ADDFSENTRY] (p) {
    const absolute = normPath(path.resolve(this.cwd, p))
    this[QUEUE].push(new PackJob(p, absolute))
    this[PROCESS]()
  }

  // async stat (or lstat when not following symlinks); counts against
  // the concurrency budget while in flight
  [STAT] (job) {
    job.pending = true
    this[JOBS] += 1
    const stat = this.follow ? 'stat' : 'lstat'
    fs[stat](job.absolute, (er, stat) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        this.emit('error', er)
      else
        this[ONSTAT](job, stat)
    })
  }

  [ONSTAT] (job, stat) {
    this.statCache.set(job.absolute, stat)
    job.stat = stat

    // now we have the stat, we can filter it.
    if (!this.filter(job.path, stat))
      job.ignore = true

    this[PROCESS]()
  }

  [READDIR] (job) {
    job.pending = true
    this[JOBS] += 1
    fs.readdir(job.absolute, (er, entries) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        return this.emit('error', er)
      this[ONREADDIR](job, entries)
    })
  }

  [ONREADDIR] (job, entries) {
    this.readdirCache.set(job.absolute, entries)
    job.readdir = entries
    this[PROCESS]()
  }

  // main scheduler: advance every queued job up to the concurrency
  // limit, drop filtered jobs, and close the archive once everything
  // is done after end() was called
  [PROCESS] () {
    if (this[PROCESSING])
      return

    this[PROCESSING] = true
    for (let w = this[QUEUE].head;
      w !== null && this[JOBS] < this.jobs;
      w = w.next) {
      this[PROCESSJOB](w.value)
      if (w.value.ignore) {
        // removeNode nulls out w.next; stash and restore it so the
        // for-loop can keep walking past the removed node
        const p = w.next
        this[QUEUE].removeNode(w)
        w.next = p
      }
    }

    this[PROCESSING] = false

    if (this[ENDED] && !this[QUEUE].length && this[JOBS] === 0) {
      // archive terminator: 1024 zero bytes, then end the stream
      if (this.zip)
        this.zip.end(EOF)
      else {
        super.write(EOF)
        super.end()
      }
    }
  }

  // the head of the queue is the job currently being written out;
  // later jobs may stat/readdir in parallel but pipe strictly in order
  get [CURRENT] () {
    return this[QUEUE] && this[QUEUE].head && this[QUEUE].head.value
  }

  [JOBDONE] (job) {
    this[QUEUE].shift()
    this[JOBS] -= 1
    this[PROCESS]()
  }

  // advance a single job one step; returns early whenever it has to
  // wait on an async fs result
  [PROCESSJOB] (job) {
    if (job.pending)
      return

    if (job.entry) {
      if (job === this[CURRENT] && !job.piped)
        this[PIPE](job)
      return
    }

    if (!job.stat) {
      if (this.statCache.has(job.absolute))
        this[ONSTAT](job, this.statCache.get(job.absolute))
      else
        this[STAT](job)
    }
    // still no stat means the async lookup is in flight
    if (!job.stat)
      return

    // filtered out!
    if (job.ignore)
      return

    if (!this.noDirRecurse && job.stat.isDirectory() && !job.readdir) {
      if (this.readdirCache.has(job.absolute))
        this[ONREADDIR](job, this.readdirCache.get(job.absolute))
      else
        this[READDIR](job)
      if (!job.readdir)
        return
    }

    // we know it doesn't have an entry, because that got checked above
    job.entry = this[ENTRY](job)
    if (!job.entry) {
      job.ignore = true
      return
    }

    if (job === this[CURRENT] && !job.piped)
      this[PIPE](job)
  }

  // options forwarded to each WriteEntry, mirroring this stream's
  // configuration and caches
  [ENTRYOPT] (job) {
    return {
      onwarn: (msg, data) => {
        this.warn(msg, data)
      },
      noPax: this.noPax,
      cwd: this.cwd,
      absolute: job.absolute,
      preservePaths: this.preservePaths,
      maxReadSize: this.maxReadSize,
      strict: this.strict,
      portable: this.portable,
      linkCache: this.linkCache,
      statCache: this.statCache,
      noMtime: this.noMtime,
      mtime: this.mtime,
      prefix: this.prefix,
    }
  }

  // construct the WriteEntry for a job; returns undefined (and emits
  // 'error') if construction throws
  [ENTRY] (job) {
    this[JOBS] += 1
    try {
      return new this[WRITEENTRYCLASS](job.path, this[ENTRYOPT](job))
        .on('end', () => this[JOBDONE](job))
        .on('error', er => this.emit('error', er))
    } catch (er) {
      this.emit('error', er)
    }
  }

  // downstream has drained; resume the current entry's data flow
  [ONDRAIN] () {
    if (this[CURRENT] && this[CURRENT].entry)
      this[CURRENT].entry.resume()
  }

  // like .pipe() but using super, because our write() is special
  [PIPE] (job) {
    job.piped = true

    // a directory's children become new fs jobs, prefixed with the
    // directory's path
    if (job.readdir)
      job.readdir.forEach(entry => {
        const p = job.path
        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })

    const source = job.entry
    const zip = this.zip

    // manual pipe with backpressure: pause the entry whenever the
    // destination signals its buffer is full
    if (zip)
      source.on('data', chunk => {
        if (!zip.write(chunk))
          source.pause()
      })
    else
      source.on('data', chunk => {
        if (!super.write(chunk))
          source.pause()
      })
  }

  pause () {
    if (this.zip)
      this.zip.pause()
    return super.pause()
  }
})

// Synchronous variant: stats and readdirs happen inline, and entry
// data is pushed through without backpressure so everything lands in
// the same tick.
class PackSync extends Pack {
  constructor (opt) {
    super(opt)
    this[WRITEENTRYCLASS] = WriteEntrySync
  }

  // pause/resume are no-ops in sync streams.
  pause () {}
  resume () {}

  [STAT] (job) {
    const stat = this.follow ? 'statSync' : 'lstatSync'
    this[ONSTAT](job, fs[stat](job.absolute))
  }

  [READDIR] (job) {
    this[ONREADDIR](job, fs.readdirSync(job.absolute))
  }

  // gotta get it all in this tick
  [PIPE] (job) {
    const source = job.entry
    const zip = this.zip

    if (job.readdir)
      job.readdir.forEach(entry => {
        const p = job.path
        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })

    if (zip)
      source.on('data', chunk => {
        zip.write(chunk)
      })
    else
      source.on('data', chunk => {
        super[WRITE](chunk)
      })
  }
}

Pack.Sync = PackSync

module.exports = Pack