• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2
3const Buffer = require('./buffer.js')
4
5// A readable tar stream creator
6// Technically, this is a transform stream that you write paths into,
7// and tar format comes out of.
8// The `add()` method is like `write()` but returns this,
9// and end() returns `this` as well, so you can
10// do `new Pack(opt).add('files').add('dir').end().pipe(output)`
11// You could also do something like:
12// streamOfPaths().pipe(new Pack()).pipe(new fs.WriteStream('out.tar'))
13
// Bookkeeping record for one item waiting in the pack queue.
//   path     - path as it was added; any falsy value becomes './'
//   absolute - resolved location of the item, stored exactly as given
// The remaining fields start out empty and are filled in while the job
// is being processed:
//   entry    - the entry stream for this item, once one has been created
//   stat     - stat result, once known
//   readdir  - directory listing, once known
//   pending  - true while an asynchronous fs call for this job is running
//   ignore   - true when the job should be dropped from the queue
//   piped    - true once the entry's output has been hooked up
class PackJob {
  constructor (path, absolute) {
    Object.assign(this, {
      path: path || './',
      absolute: absolute,
      entry: null,
      stat: null,
      readdir: null,
      pending: false,
      ignore: false,
      piped: false,
    })
  }
}
26
27const MiniPass = require('minipass')
28const zlib = require('minizlib')
29const ReadEntry = require('./read-entry.js')
30const WriteEntry = require('./write-entry.js')
31const WriteEntrySync = WriteEntry.Sync
32const WriteEntryTar = WriteEntry.Tar
33const Yallist = require('yallist')
// Trailer written once after the last entry, just before the stream ends
// (see the end of Pack's [PROCESS] method).
// NOTE(review): Buffer comes from ./buffer.js; presumably alloc(1024) yields
// 1024 zero bytes as Node's Buffer.alloc does -- confirm against that shim.
34const EOF = Buffer.alloc(1024)
// Symbol keys for Pack's internal methods and state.  Using symbols keeps
// them off the string-named surface of the class while still letting
// PackSync override [STAT], [READDIR] and [PIPE].
35const ONSTAT = Symbol('onStat')
36const ENDED = Symbol('ended')
37const QUEUE = Symbol('queue')
38const CURRENT = Symbol('current')
39const PROCESS = Symbol('process')
40const PROCESSING = Symbol('processing')
41const PROCESSJOB = Symbol('processJob')
42const JOBS = Symbol('jobs')
43const JOBDONE = Symbol('jobDone')
44const ADDFSENTRY = Symbol('addFSEntry')
45const ADDTARENTRY = Symbol('addTarEntry')
46const STAT = Symbol('stat')
47const READDIR = Symbol('readdir')
48const ONREADDIR = Symbol('onreaddir')
49const PIPE = Symbol('pipe')
50const ENTRY = Symbol('entry')
51const ENTRYOPT = Symbol('entryOpt')
52const WRITEENTRYCLASS = Symbol('writeEntryClass')
53const WRITE = Symbol('write')
54const ONDRAIN = Symbol('ondrain')
55
56const fs = require('fs')
57const path = require('path')
58const warner = require('./warn-mixin.js')
59const normPath = require('./normalize-windows-path.js')
60
// Streaming tar archive creator.
//
// Write filesystem paths (strings) or ReadEntry objects into it; archive
// bytes come out of the readable side.  Jobs are kept in a queue and are
// emitted strictly in queue order: only the job at the head of the queue
// is ever piped to the output, while stat/readdir work for later jobs may
// already be in flight (bounded by `jobs`).
//
// Options read from `opt` (all optional):
//   cwd            base directory for resolving paths (default process.cwd())
//   maxReadSize, preservePaths, strict, noPax, portable, noMtime, mtime,
//   prefix, linkCache, statCache
//                  stored on the instance and forwarded to every entry
//   readdirCache   Map used to remember directory listings
//   onwarn         function attached as a 'warn' listener
//   gzip           truthy to gzip the output; an object is passed to
//                  zlib.Gzip as its options
//   noDirRecurse   when true, directories are not listed/recursed into
//   follow         when true use stat instead of lstat
//   filter         function (path, statOrEntry) -> boolean; falsy drops it
//   jobs           maximum number of concurrent jobs (default 4)
const Pack = warner(class Pack extends MiniPass {
  constructor (opt) {
    super(opt)
    opt = opt || Object.create(null)
    this.opt = opt
    this.cwd = opt.cwd || process.cwd()
    this.maxReadSize = opt.maxReadSize
    this.preservePaths = !!opt.preservePaths
    this.strict = !!opt.strict
    this.noPax = !!opt.noPax
    this.prefix = normPath(opt.prefix || '')
    this.linkCache = opt.linkCache || new Map()
    this.statCache = opt.statCache || new Map()
    this.readdirCache = opt.readdirCache || new Map()

    // PackSync swaps this for the synchronous entry class.
    this[WRITEENTRYCLASS] = WriteEntry
    if (typeof opt.onwarn === 'function')
      this.on('warn', opt.onwarn)

    this.zip = null
    if (opt.gzip) {
      // `gzip: true` (or any other non-object truthy value) means "gzip with
      // default settings".  Build a fresh options object for that case
      // instead of overwriting opt.gzip, so the caller's options object is
      // left exactly as it was passed in.
      const gzipOpt = typeof opt.gzip === 'object' ? opt.gzip : {}
      this.zip = new zlib.Gzip(gzipOpt)
      // Compressed output is forwarded to our own readable side.
      this.zip.on('data', chunk => super.write(chunk))
      this.zip.on('end', _ => super.end())
      this.zip.on('drain', _ => this[ONDRAIN]())
      this.on('resume', _ => this.zip.resume())
    } else
      this.on('drain', this[ONDRAIN])

    this.portable = !!opt.portable
    this.noDirRecurse = !!opt.noDirRecurse
    this.follow = !!opt.follow
    this.noMtime = !!opt.noMtime
    this.mtime = opt.mtime || null

    this.filter = typeof opt.filter === 'function' ? opt.filter : _ => true

    this[QUEUE] = new Yallist()
    this[JOBS] = 0
    this.jobs = +opt.jobs || 4
    this[PROCESSING] = false
    this[ENDED] = false
  }

  // Write a raw chunk straight to the underlying MiniPass, bypassing the
  // path-accepting write() below.  Returns whatever MiniPass#write returns.
  [WRITE] (chunk) {
    return super.write(chunk)
  }

  // Chainable form of write(): queue `path` and return this.
  add (path) {
    this.write(path)
    return this
  }

  // Optionally queue one last `path`, then mark the input side as finished.
  // The EOF trailer is emitted by [PROCESS] once every queued job is done.
  // Returns this.
  end (path) {
    if (path)
      this.write(path)
    this[ENDED] = true
    this[PROCESS]()
    return this
  }

  // Queue something to be packed: a ReadEntry instance is re-packed as a
  // tar entry, anything else is treated as a filesystem path.
  // Throws Error('write after end') once end() has been called.
  // Returns the current value of this.flowing.
  write (path) {
    if (this[ENDED])
      throw new Error('write after end')

    if (path instanceof ReadEntry)
      this[ADDTARENTRY](path)
    else
      this[ADDFSENTRY](path)
    return this.flowing
  }

  // Queue an existing tar entry `p`, unless the filter rejects it.
  [ADDTARENTRY] (p) {
    const absolute = normPath(path.resolve(this.cwd, p.path))
    // in this case, we don't have to wait for the stat
    if (!this.filter(p.path, p))
      // rejected: resume the entry (presumably so its data is discarded
      // and the source can move on -- confirm against ReadEntry)
      p.resume()
    else {
      const job = new PackJob(p.path, absolute)
      job.entry = new WriteEntryTar(p, this[ENTRYOPT](job))
      job.entry.on('end', _ => this[JOBDONE](job))
      this[JOBS] += 1
      this[QUEUE].push(job)
    }

    this[PROCESS]()
  }

  // Queue the filesystem path `p` (resolved against this.cwd).
  [ADDFSENTRY] (p) {
    const absolute = normPath(path.resolve(this.cwd, p))
    this[QUEUE].push(new PackJob(p, absolute))
    this[PROCESS]()
  }

  // Start an asynchronous stat (or lstat when not following links) for the
  // job.  The job counts against the concurrency limit while it is pending.
  [STAT] (job) {
    job.pending = true
    this[JOBS] += 1
    const stat = this.follow ? 'stat' : 'lstat'
    fs[stat](job.absolute, (er, stat) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        this.emit('error', er)
      else
        this[ONSTAT](job, stat)
    })
  }

  // Record a stat result on the job and in the shared cache, apply the
  // filter, then continue processing the queue.
  [ONSTAT] (job, stat) {
    this.statCache.set(job.absolute, stat)
    job.stat = stat

    // now we have the stat, we can filter it.
    if (!this.filter(job.path, stat))
      job.ignore = true

    this[PROCESS]()
  }

  // Start an asynchronous directory listing for the job.
  [READDIR] (job) {
    job.pending = true
    this[JOBS] += 1
    fs.readdir(job.absolute, (er, entries) => {
      job.pending = false
      this[JOBS] -= 1
      if (er)
        return this.emit('error', er)
      this[ONREADDIR](job, entries)
    })
  }

  // Record a directory listing on the job and in the shared cache, then
  // continue processing the queue.
  [ONREADDIR] (job, entries) {
    this.readdirCache.set(job.absolute, entries)
    job.readdir = entries
    this[PROCESS]()
  }

  // Advance the queue.  Walks jobs from the head while fewer than
  // `this.jobs` are active, dropping any that end up ignored, and writes
  // the EOF trailer once the input has ended and nothing is left to do.
  [PROCESS] () {
    // Not re-entrant: the handlers called below call back into here.
    if (this[PROCESSING])
      return

    this[PROCESSING] = true
    for (let w = this[QUEUE].head;
         w !== null && this[JOBS] < this.jobs;
         w = w.next) {
      this[PROCESSJOB](w.value)
      if (w.value.ignore) {
        // Put `next` back on the removed node so the loop's `w = w.next`
        // still advances (presumably removeNode clears the node's links).
        const p = w.next
        this[QUEUE].removeNode(w)
        w.next = p
      }
    }

    this[PROCESSING] = false

    if (this[ENDED] && !this[QUEUE].length && this[JOBS] === 0) {
      if (this.zip)
        // our own end() is triggered by the zip stream's 'end' event
        this.zip.end(EOF)
      else {
        super.write(EOF)
        super.end()
      }
    }
  }

  // The job at the head of the queue -- the only one allowed to emit data.
  get [CURRENT] () {
    return this[QUEUE] && this[QUEUE].head && this[QUEUE].head.value
  }

  // Called when a job's entry emits 'end'.  Removes the head of the queue
  // (the `job` argument itself is not consulted), frees a concurrency slot
  // and continues processing.
  [JOBDONE] (job) {
    this[QUEUE].shift()
    this[JOBS] -= 1
    this[PROCESS]()
  }

  // Move one job as far along as it can go right now:
  // stat -> filter -> readdir (directories only) -> create entry -> pipe.
  // Each asynchronous step returns early; [PROCESS] calls back in later.
  [PROCESSJOB] (job) {
    if (job.pending)
      return

    if (job.entry) {
      if (job === this[CURRENT] && !job.piped)
        this[PIPE](job)
      return
    }

    if (!job.stat) {
      if (this.statCache.has(job.absolute))
        this[ONSTAT](job, this.statCache.get(job.absolute))
      else
        this[STAT](job)
    }
    // still waiting on an async stat
    if (!job.stat)
      return

    // filtered out!
    if (job.ignore)
      return

    if (!this.noDirRecurse && job.stat.isDirectory() && !job.readdir) {
      if (this.readdirCache.has(job.absolute))
        this[ONREADDIR](job, this.readdirCache.get(job.absolute))
      else
        this[READDIR](job)
      // still waiting on an async readdir
      if (!job.readdir)
        return
    }

    // we know it doesn't have an entry, because that got checked above
    job.entry = this[ENTRY](job)
    if (!job.entry) {
      // entry creation failed (error already emitted); drop the job
      job.ignore = true
      return
    }

    if (job === this[CURRENT] && !job.piped)
      this[PIPE](job)
  }

  // Build the options object handed to the entry class for `job`.
  // Warnings from the entry are forwarded to this.warn.
  [ENTRYOPT] (job) {
    return {
      onwarn: (msg, data) => {
        this.warn(msg, data)
      },
      noPax: this.noPax,
      cwd: this.cwd,
      absolute: job.absolute,
      preservePaths: this.preservePaths,
      maxReadSize: this.maxReadSize,
      strict: this.strict,
      portable: this.portable,
      linkCache: this.linkCache,
      statCache: this.statCache,
      noMtime: this.noMtime,
      mtime: this.mtime,
      prefix: this.prefix,
    }
  }

  // Create the entry stream for a filesystem job.  Returns the entry, or
  // undefined if the entry class threw (the error is emitted on this).
  // NOTE(review): the job counter is not decremented when construction
  // throws, so after a handled error the end-of-archive condition in
  // [PROCESS] is never met -- confirm this is intended.
  [ENTRY] (job) {
    this[JOBS] += 1
    try {
      return new this[WRITEENTRYCLASS](job.path, this[ENTRYOPT](job))
        .on('end', () => this[JOBDONE](job))
        .on('error', er => this.emit('error', er))
    } catch (er) {
      this.emit('error', er)
    }
  }

  // The output side has drained: let the current entry produce more data.
  [ONDRAIN] () {
    if (this[CURRENT] && this[CURRENT].entry)
      this[CURRENT].entry.resume()
  }

  // like .pipe() but using super, because our write() is special
  [PIPE] (job) {
    job.piped = true

    // A directory's children are queued right after the directory itself.
    if (job.readdir)
      job.readdir.forEach(entry => {
        const p = job.path
        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })

    const source = job.entry
    const zip = this.zip

    // Forward entry data to the output, pausing the entry whenever the
    // destination reports back-pressure ([ONDRAIN] resumes it).
    if (zip)
      source.on('data', chunk => {
        if (!zip.write(chunk))
          source.pause()
      })
    else
      source.on('data', chunk => {
        if (!super.write(chunk))
          source.pause()
      })
  }

  // Pause the gzip stream (when there is one) along with this stream.
  pause () {
    if (this.zip)
      this.zip.pause()
    return super.pause()
  }
})
349
// Synchronous variant of Pack.  Entries are created with the synchronous
// entry class and all filesystem calls use the *Sync APIs, so the handlers
// that Pack runs from callbacks run inline here.
class PackSync extends Pack {
  constructor (opt) {
    super(opt)
    this[WRITEENTRYCLASS] = WriteEntrySync
  }

  // pause/resume are no-ops in sync streams.
  pause () {}
  resume () {}

  // Stat (or lstat when not following links) right away and hand the
  // result to [ONSTAT].  A failing fs call throws to the caller.
  [STAT] (job) {
    const stat = this.follow ? 'statSync' : 'lstatSync'
    this[ONSTAT](job, fs[stat](job.absolute))
  }

  // List the directory right away and hand the result to [ONREADDIR].
  [READDIR] (job) {
    this[ONREADDIR](job, fs.readdirSync(job.absolute))
  }

  // gotta get it all in this tick
  [PIPE] (job) {
    // Mark the job as piped, as Pack's [PIPE] does, so a later [PROCESS]
    // pass that still finds this job at the head of the queue does not
    // attach a second 'data' listener or queue the directory's children
    // a second time.
    job.piped = true

    const source = job.entry
    const zip = this.zip

    // A directory's children are queued right after the directory itself.
    if (job.readdir)
      job.readdir.forEach(entry => {
        const p = job.path
        const base = p === './' ? '' : p.replace(/\/*$/, '/')
        this[ADDFSENTRY](base + entry)
      })

    // No back-pressure handling here: the write's return value is ignored.
    if (zip)
      source.on('data', chunk => {
        zip.write(chunk)
      })
    else
      source.on('data', chunk => {
        super[WRITE](chunk)
      })
  }
}
391
// Expose the synchronous packer as a static property of Pack.
392Pack.Sync = PackSync
393
// Pack (with Pack.Sync attached) is this module's export.
394module.exports = Pack
395