• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2
3const Buffer = require('./buffer.js')
4
// A readable tar stream creator.
// Technically, this is a transform stream that you write paths into,
// and tar format comes out of.
// The `add()` method is like `write()` but returns `this`,
// and `end()` returns `this` as well, so you can
// do `new Pack(opt).add('files').add('dir').end().pipe(output)`.
// You could also do something like:
// `streamOfPaths().pipe(new Pack()).pipe(new fs.WriteStream('out.tar'))`
13
// Bookkeeping record for one item that has been queued for packing.
//
// path     - path to record for the item; defaults to './' when falsy
// absolute - path handed to the fs calls and to the entry options
class PackJob {
  constructor (path, absolute) {
    this.path = path || './'
    this.absolute = absolute
    // the entry stream producing this item's tar data, once created
    this.entry = null
    // result of the stat/lstat call, once known
    this.stat = null
    // list of child names, once a directory has been read
    this.readdir = null
    // true while an fs call for this job is in flight
    this.pending = false
    // true when the job should be dropped from the queue
    this.ignore = false
    // true once the entry has been hooked up to the output
    this.piped = false
  }
}
26
27const MiniPass = require('minipass')
28const zlib = require('minizlib')
29const ReadEntry = require('./read-entry.js')
30const WriteEntry = require('./write-entry.js')
31const WriteEntrySync = WriteEntry.Sync
32const WriteEntryTar = WriteEntry.Tar
33const Yallist = require('yallist')
// 1024 zero bytes, written once after the last entry (see [PROCESS])
const EOF = Buffer.alloc(1024)

// Symbol keys for the internal state and methods of Pack, so that they
// do not show up as ordinary string-named properties on the stream.

// queue / scheduling state
const ENDED = Symbol('ended')
const QUEUE = Symbol('queue')
const CURRENT = Symbol('current')
const PROCESS = Symbol('process')
const PROCESSING = Symbol('processing')
const PROCESSJOB = Symbol('processJob')
const JOBS = Symbol('jobs')
const JOBDONE = Symbol('jobDone')

// adding items
const ADDFSENTRY = Symbol('addFSEntry')
const ADDTARENTRY = Symbol('addTarEntry')

// filesystem lookups and their completion handlers
const STAT = Symbol('stat')
const ONSTAT = Symbol('onStat')
const READDIR = Symbol('readdir')
const ONREADDIR = Symbol('onreaddir')

// entry creation and output plumbing
const PIPE = Symbol('pipe')
const ENTRY = Symbol('entry')
const ENTRYOPT = Symbol('entryOpt')
const WRITEENTRYCLASS = Symbol('writeEntryClass')
const WRITE = Symbol('write')
const ONDRAIN = Symbol('ondrain')
55
56const fs = require('fs')
57const path = require('path')
58const warner = require('./warn-mixin.js')
59
// Streaming tar archive writer.
//
// Paths (strings) or ReadEntry objects are written in; archive bytes come
// out through the MiniPass base class, going through a gzip stream first
// when `opt.gzip` is set.
//
// Every written item becomes a PackJob on a queue.  While fewer than
// `this.jobs` jobs are active, queued jobs are advanced (stat, readdir,
// entry creation), but only the job at the head of the queue is hooked up
// to the output, so entries are written in queue order.
const Pack = warner(class Pack extends MiniPass {
  // opt is optional.  Fields read here: cwd, maxReadSize, preservePaths,
  // strict, noPax, prefix, linkCache, statCache, readdirCache, onwarn,
  // gzip, portable, noDirRecurse, follow, noMtime, mtime, filter, jobs.
  constructor (opt) {
    super(opt)
    opt = opt || Object.create(null)
    this.opt = opt
    this.cwd = opt.cwd || process.cwd()
    this.maxReadSize = opt.maxReadSize
    this.preservePaths = !!opt.preservePaths
    this.strict = !!opt.strict
    this.noPax = !!opt.noPax
    // drop any trailing slashes or backslashes from the prefix
    this.prefix = (opt.prefix || '').replace(/(\\|\/)+$/, '')
    this.linkCache = opt.linkCache || new Map()
    this.statCache = opt.statCache || new Map()
    this.readdirCache = opt.readdirCache || new Map()

    this[WRITEENTRYCLASS] = WriteEntry
    if (typeof opt.onwarn === 'function')
      this.on('warn', opt.onwarn)

    this.zip = null
    if (opt.gzip) {
      // a truthy non-object value (e.g. `true`) means "gzip with default
      // settings".  Use a local so the caller's options object is not
      // modified.
      const gzipOpt = typeof opt.gzip === 'object' ? opt.gzip : {}
      this.zip = new zlib.Gzip(gzipOpt)
      // entry data -> zip -> our own output
      this.zip.on('data', chunk => super.write(chunk))
      this.zip.on('end', _ => super.end())
      this.zip.on('drain', _ => this[ONDRAIN]())
      this.on('resume', _ => this.zip.resume())
    } else
      this.on('drain', this[ONDRAIN])

    this.portable = !!opt.portable
    this.noDirRecurse = !!opt.noDirRecurse
    this.follow = !!opt.follow
    this.noMtime = !!opt.noMtime
    this.mtime = opt.mtime || null

    // with no filter supplied, everything is included
    this.filter = typeof opt.filter === 'function' ? opt.filter : _ => true

    this[QUEUE] = new Yallist()
    // number of jobs currently counted as active
    this[JOBS] = 0
    // upper bound for this[JOBS]; 4 unless opt.jobs is a non-zero number
    this.jobs = +opt.jobs || 4
    this[PROCESSING] = false
    this[ENDED] = false
  }

  // Raw write to the underlying stream, bypassing the path-accepting
  // write() below.  Used by PackSync.
  [WRITE] (chunk) {
    return super.write(chunk)
  }

  // Same as write(), but chainable.
  add (path) {
    this.write(path)
    return this
  }

  // Optionally queue one last item, then mark the input as finished.
  // Returns `this`.
  end (path) {
    if (path)
      this.write(path)
    this[ENDED] = true
    this[PROCESS]()
    return this
  }

  // Queue a path (string) or a ReadEntry for packing.
  // Returns this.flowing.  Throws if called after end().
  write (path) {
    if (this[ENDED])
      throw new Error('write after end')

    if (path instanceof ReadEntry)
      this[ADDTARENTRY](path)
    else
      this[ADDFSENTRY](path)
    return this.flowing
  }

  // Queue an existing ReadEntry.  Note that p.path is rewritten in place
  // when a prefix is configured.
  [ADDTARENTRY] (p) {
    const absolute = path.resolve(this.cwd, p.path)
    if (this.prefix)
      p.path = this.prefix + '/' + p.path.replace(/^\.(\/+|$)/, '')

    // in this case, we don't have to wait for the stat
    if (!this.filter(p.path, p))
      p.resume()
    else {
      const job = new PackJob(p.path, absolute)
      job.entry = new WriteEntryTar(p, this[ENTRYOPT](job))
      job.entry.on('end', _ => this[JOBDONE](job))
      this[JOBS] += 1
      this[QUEUE].push(job)
    }

    this[PROCESS]()
  }

  // Queue a filesystem path.  The absolute path is resolved against cwd
  // before the prefix is applied to the recorded path.
  [ADDFSENTRY] (p) {
    const absolute = path.resolve(this.cwd, p)
    if (this.prefix)
      p = this.prefix + '/' + p.replace(/^\.(\/+|$)/, '')

    this[QUEUE].push(new PackJob(p, absolute))
    this[PROCESS]()
  }

  // Start an async stat (or lstat, unless `follow` is set) for the job.
  // The job counts as active while the call is outstanding.
  [STAT] (job) {
    job.pending = true
    this[JOBS] += 1
    const method = this.follow ? 'stat' : 'lstat'
    fs[method](job.absolute, (er, stat) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        this.emit('error', er)
      else
        this[ONSTAT](job, stat)
    })
  }

  // Record the stat result (also in the shared cache) and apply the filter.
  [ONSTAT] (job, stat) {
    this.statCache.set(job.absolute, stat)
    job.stat = stat

    // now we have the stat, we can filter it.
    if (!this.filter(job.path, stat))
      job.ignore = true

    this[PROCESS]()
  }

  // Start an async readdir for the job.  The job counts as active while
  // the call is outstanding.
  [READDIR] (job) {
    job.pending = true
    this[JOBS] += 1
    fs.readdir(job.absolute, (er, entries) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        return this.emit('error', er)
      this[ONREADDIR](job, entries)
    })
  }

  // Record the directory listing (also in the shared cache).
  [ONREADDIR] (job, entries) {
    this.readdirCache.set(job.absolute, entries)
    job.readdir = entries
    this[PROCESS]()
  }

  // Advance queued jobs from the head while there is capacity, drop the
  // ignored ones, and finish the archive once everything is done.
  [PROCESS] () {
    // the steps below call back into [PROCESS]; ignore those nested calls
    if (this[PROCESSING])
      return

    this[PROCESSING] = true
    for (let w = this[QUEUE].head;
         w !== null && this[JOBS] < this.jobs;
         w = w.next) {
      this[PROCESSJOB](w.value)
      if (w.value.ignore) {
        // keep the forward link on the removed node (removeNode
        // presumably clears it) so the loop step above can still advance
        const p = w.next
        this[QUEUE].removeNode(w)
        w.next = p
      }
    }

    this[PROCESSING] = false

    if (this[ENDED] && !this[QUEUE].length && this[JOBS] === 0) {
      if (this.zip)
        this.zip.end(EOF)
      else {
        super.write(EOF)
        super.end()
      }
    }
  }

  // The job at the head of the queue, or a falsy value if there is none.
  get [CURRENT] () {
    return this[QUEUE] && this[QUEUE].head && this[QUEUE].head.value
  }

  // An entry finished: drop the head of the queue, free its slot, and
  // carry on.
  [JOBDONE] (job) {
    this[QUEUE].shift()
    this[JOBS] -= 1
    this[PROCESS]()
  }

  // Move a single job one or more steps forward:
  // stat -> filter -> readdir (directories only) -> entry -> pipe.
  // Returns early whenever the next step has to wait for an fs callback.
  [PROCESSJOB] (job) {
    if (job.pending)
      return

    if (job.entry) {
      // entry already exists; it only needs piping once it reaches the head
      if (job === this[CURRENT] && !job.piped)
        this[PIPE](job)
      return
    }

    if (!job.stat) {
      if (this.statCache.has(job.absolute))
        this[ONSTAT](job, this.statCache.get(job.absolute))
      else
        this[STAT](job)
    }
    // still no stat: the async call is in flight
    if (!job.stat)
      return

    // filtered out!
    if (job.ignore)
      return

    if (!this.noDirRecurse && job.stat.isDirectory() && !job.readdir) {
      if (this.readdirCache.has(job.absolute))
        this[ONREADDIR](job, this.readdirCache.get(job.absolute))
      else
        this[READDIR](job)
      if (!job.readdir)
        return
    }

    // we know it doesn't have an entry, because that got checked above
    job.entry = this[ENTRY](job)
    if (!job.entry) {
      // entry creation failed; have [PROCESS] drop the job
      job.ignore = true
      return
    }

    if (job === this[CURRENT] && !job.piped)
      this[PIPE](job)
  }

  // Options object handed to the entry classes for the given job.
  [ENTRYOPT] (job) {
    return {
      onwarn: (msg, data) => {
        this.warn(msg, data)
      },
      noPax: this.noPax,
      cwd: this.cwd,
      absolute: job.absolute,
      preservePaths: this.preservePaths,
      maxReadSize: this.maxReadSize,
      strict: this.strict,
      portable: this.portable,
      linkCache: this.linkCache,
      statCache: this.statCache,
      noMtime: this.noMtime,
      mtime: this.mtime
    }
  }

  // Create the entry stream for a filesystem job and count it as active.
  // Returns undefined (after emitting 'error') if construction throws.
  [ENTRY] (job) {
    this[JOBS] += 1
    try {
      return new this[WRITEENTRYCLASS](job.path, this[ENTRYOPT](job))
        .on('end', () => this[JOBDONE](job))
        .on('error', er => this.emit('error', er))
    } catch (er) {
      // no entry exists, so no 'end' will ever release the slot taken
      // above; give it back here or the archive could never be finished
      this[JOBS] -= 1
      this.emit('error', er)
    }
  }

  // The output can take more data: resume the entry at the head, if any.
  [ONDRAIN] () {
    if (this[CURRENT] && this[CURRENT].entry)
      this[CURRENT].entry.resume()
  }

  // like .pipe() but using super, because our write() is special
  [PIPE] (job) {
    job.piped = true

    if (job.readdir) {
      // job.path already carries the prefix and [ADDFSENTRY] adds it
      // again, so strip it before building the child paths
      const p = this.prefix
        ? job.path.slice(this.prefix.length + 1) || './'
        : job.path
      const base = p === './' ? '' : p.replace(/\/*$/, '/')
      job.readdir.forEach(entry => this[ADDFSENTRY](base + entry))
    }

    const source = job.entry
    const zip = this.zip

    // pause the entry whenever the destination reports back-pressure;
    // [ONDRAIN] resumes it
    if (zip)
      source.on('data', chunk => {
        if (!zip.write(chunk))
          source.pause()
      })
    else
      source.on('data', chunk => {
        if (!super.write(chunk))
          source.pause()
      })
  }

  // Pause the gzip stream as well, when there is one.
  pause () {
    if (this.zip)
      this.zip.pause()
    return super.pause()
  }
})
356
// Synchronous flavour of Pack: uses WriteEntrySync for entries and the
// *Sync fs calls for stat and readdir, so fs errors are thrown by those
// calls instead of being emitted as 'error' events.
class PackSync extends Pack {
  constructor (opt) {
    super(opt)
    this[WRITEENTRYCLASS] = WriteEntrySync
  }

  // pause/resume are no-ops in sync streams.
  pause () {}
  resume () {}

  // Stat (or lstat, unless `follow` is set) and hand over the result
  // right away.
  [STAT] (job) {
    const stat = this.follow ? 'statSync' : 'lstatSync'
    this[ONSTAT](job, fs[stat](job.absolute))
  }

  // Read the directory and hand over the listing right away.
  [READDIR] (job) {
    this[ONREADDIR](job, fs.readdirSync(job.absolute))
  }

  // gotta get it all in this tick
  [PIPE] (job) {
    // same bookkeeping as Pack[PIPE]: a job must only be hooked up once,
    // otherwise its children would be queued again and a second data
    // listener attached
    job.piped = true

    const source = job.entry
    const zip = this.zip

    if (job.readdir) {
      // job.path already carries the prefix and [ADDFSENTRY] adds it
      // again, so strip it before building the child paths
      const p = this.prefix
        ? job.path.slice(this.prefix.length + 1) || './'
        : job.path
      const base = p === './' ? '' : p.replace(/\/*$/, '/')
      job.readdir.forEach(entry => this[ADDFSENTRY](base + entry))
    }

    // no back-pressure handling here: write results are ignored
    if (zip)
      source.on('data', chunk => {
        zip.write(chunk)
      })
    else
      source.on('data', chunk => {
        super[WRITE](chunk)
      })
  }
}
401
// the synchronous variant is reachable as a static on the exported class
Pack.Sync = PackSync

module.exports = Pack
405