'use strict'

// this[BUFFER] is the remainder of a chunk if we're waiting for
// the full 512 bytes of a header to come in. We will Buffer.concat()
// it to the next write(), which is a mem copy, but a small one.
//
// this[QUEUE] is a Yallist of entries that haven't been emitted
// yet. This can only get filled up if the user keeps write()ing after
// a write() returns false, or does a write() with more than one entry
//
// We don't buffer chunks, we always parse them and either create an
// entry, or push it into the active entry. The ReadEntry class knows
// to throw data away if .ignore=true
//
// Shift entry off the buffer when it emits 'end', and emit 'entry' for
// the next one in the list.
//
// At any time, we're pushing body chunks into the entry at WRITEENTRY,
// and waiting for 'end' on the entry at READENTRY
//
// ignored entries get .resume() called on them straight away
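//
// A rough usage sketch (illustrative only; the filename and the callbacks
// below are assumptions, not part of this module):
//
//   const Parser = require('./parse.js')
//   const fs = require('fs')
//   const parser = new Parser({
//     filter: (path, entry) => true,   // return false to skip an entry
//     onentry: entry => {
//       console.log('entry:', entry.path)
//       entry.resume()                 // discard the body, or pipe it somewhere
//     },
//   })
//   fs.createReadStream('my-archive.tar').pipe(parser)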

const warner = require('./warn-mixin.js')
const path = require('path')
const Header = require('./header.js')
const EE = require('events')
const Yallist = require('yallist')
const maxMetaEntrySize = 1024 * 1024
const Entry = require('./read-entry.js')
const Pax = require('./pax.js')
const zlib = require('minizlib')
const Buffer = require('./buffer.js')

const gzipHeader = Buffer.from([0x1f, 0x8b])
const STATE = Symbol('state')
const WRITEENTRY = Symbol('writeEntry')
const READENTRY = Symbol('readEntry')
const NEXTENTRY = Symbol('nextEntry')
const PROCESSENTRY = Symbol('processEntry')
const EX = Symbol('extendedHeader')
const GEX = Symbol('globalExtendedHeader')
const META = Symbol('meta')
const EMITMETA = Symbol('emitMeta')
const BUFFER = Symbol('buffer')
const QUEUE = Symbol('queue')
const ENDED = Symbol('ended')
const EMITTEDEND = Symbol('emittedEnd')
const EMIT = Symbol('emit')
const UNZIP = Symbol('unzip')
const CONSUMECHUNK = Symbol('consumeChunk')
const CONSUMECHUNKSUB = Symbol('consumeChunkSub')
const CONSUMEBODY = Symbol('consumeBody')
const CONSUMEMETA = Symbol('consumeMeta')
const CONSUMEHEADER = Symbol('consumeHeader')
const CONSUMING = Symbol('consuming')
const BUFFERCONCAT = Symbol('bufferConcat')
const MAYBEEND = Symbol('maybeEnd')
const WRITING = Symbol('writing')
const ABORTED = Symbol('aborted')
const DONE = Symbol('onDone')

const noop = _ => true

module.exports = warner(class Parser extends EE {
  constructor (opt) {
    opt = opt || {}
    super(opt)

    if (opt.ondone)
      this.on(DONE, opt.ondone)
    else
      this.on(DONE, _ => {
        this.emit('prefinish')
        this.emit('finish')
        this.emit('end')
        this.emit('close')
      })

    this.strict = !!opt.strict
    this.maxMetaEntrySize = opt.maxMetaEntrySize || maxMetaEntrySize
    this.filter = typeof opt.filter === 'function' ? opt.filter : noop

    // have to set this so that streams are ok piping into it
    this.writable = true
    this.readable = false

    this[QUEUE] = new Yallist()
    this[BUFFER] = null
    this[READENTRY] = null
    this[WRITEENTRY] = null
    this[STATE] = 'begin'
    this[META] = ''
    this[EX] = null
    this[GEX] = null
    this[ENDED] = false
    this[UNZIP] = null
    this[ABORTED] = false
    if (typeof opt.onwarn === 'function')
      this.on('warn', opt.onwarn)
    if (typeof opt.onentry === 'function')
      this.on('entry', opt.onentry)
  }
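
  // Summary of the [STATE] values used by the methods below:
  //   'begin'  - expecting the next 512-byte header block
  //   'meta'   - collecting the body of a metadata entry (pax extended
  //              headers, gnu long path/linkpath) into this[META]
  //   'body'   - pushing file data blocks into the entry at WRITEENTRY
  //   'ignore' - consuming and discarding the blocks of an entry that was
  //              filtered out, or whose metadata exceeded maxMetaEntrySize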

  [CONSUMEHEADER] (chunk, position) {
    let header
    try {
      header = new Header(chunk, position, this[EX], this[GEX])
    } catch (er) {
      return this.warn('invalid entry', er)
    }

    if (header.nullBlock)
      this[EMIT]('nullBlock')
    else if (!header.cksumValid)
      this.warn('invalid entry', header)
    else if (!header.path)
      this.warn('invalid: path is required', header)
    else {
      const type = header.type
      if (/^(Symbolic)?Link$/.test(type) && !header.linkpath)
        this.warn('invalid: linkpath required', header)
      else if (!/^(Symbolic)?Link$/.test(type) && header.linkpath)
        this.warn('invalid: linkpath forbidden', header)
      else {
        const entry = this[WRITEENTRY] = new Entry(header, this[EX], this[GEX])

        if (entry.meta) {
          if (entry.size > this.maxMetaEntrySize) {
            entry.ignore = true
            this[EMIT]('ignoredEntry', entry)
            this[STATE] = 'ignore'
          } else if (entry.size > 0) {
            this[META] = ''
            entry.on('data', c => this[META] += c)
            this[STATE] = 'meta'
          }
        } else {
          this[EX] = null
          entry.ignore = entry.ignore || !this.filter(entry.path, entry)
          if (entry.ignore) {
            this[EMIT]('ignoredEntry', entry)
            this[STATE] = entry.remain ? 'ignore' : 'begin'
          } else {
            if (entry.remain)
              this[STATE] = 'body'
            else {
              this[STATE] = 'begin'
              entry.end()
            }

            if (!this[READENTRY]) {
              this[QUEUE].push(entry)
              this[NEXTENTRY]()
            } else
              this[QUEUE].push(entry)
          }
        }
      }
    }
  }

  [PROCESSENTRY] (entry) {
    let go = true

    if (!entry) {
      this[READENTRY] = null
      go = false
    } else if (Array.isArray(entry))
      this.emit.apply(this, entry)
    else {
      this[READENTRY] = entry
      this.emit('entry', entry)
      if (!entry.emittedEnd) {
        entry.on('end', _ => this[NEXTENTRY]())
        go = false
      }
    }

    return go
  }

  [NEXTENTRY] () {
    do {} while (this[PROCESSENTRY](this[QUEUE].shift()))

    if (!this[QUEUE].length) {
      // At this point, there's nothing in the queue, but we may have an
      // entry which is being consumed (readEntry).
      // If we don't, then we definitely can handle more data.
      // If we do, and either it's flowing, or it has never had any data
      // written to it, then it needs more.
      // The only other possibility is that it has returned false from a
      // write() call, so we wait for the next drain to continue.
      const re = this[READENTRY]
      const drainNow = !re || re.flowing || re.size === re.remain
      if (drainNow) {
        if (!this[WRITING])
          this.emit('drain')
      } else
        re.once('drain', _ => this.emit('drain'))
    }
  }

  [CONSUMEBODY] (chunk, position) {
    // write up to but no more than writeEntry.blockRemain
    const entry = this[WRITEENTRY]
    const br = entry.blockRemain
    const c = (br >= chunk.length && position === 0) ? chunk
      : chunk.slice(position, position + br)

    entry.write(c)

    if (!entry.blockRemain) {
      this[STATE] = 'begin'
      this[WRITEENTRY] = null
      entry.end()
    }

    return c.length
  }

  [CONSUMEMETA] (chunk, position) {
    const entry = this[WRITEENTRY]
    const ret = this[CONSUMEBODY](chunk, position)

    // if we finished, then the entry is reset
    if (!this[WRITEENTRY])
      this[EMITMETA](entry)

    return ret
  }

  [EMIT] (ev, data, extra) {
    if (!this[QUEUE].length && !this[READENTRY])
      this.emit(ev, data, extra)
    else
      this[QUEUE].push([ev, data, extra])
  }

  [EMITMETA] (entry) {
    this[EMIT]('meta', this[META])
    switch (entry.type) {
      case 'ExtendedHeader':
      case 'OldExtendedHeader':
        this[EX] = Pax.parse(this[META], this[EX], false)
        break

      case 'GlobalExtendedHeader':
        this[GEX] = Pax.parse(this[META], this[GEX], true)
        break

      case 'NextFileHasLongPath':
      case 'OldGnuLongPath':
        this[EX] = this[EX] || Object.create(null)
        this[EX].path = this[META].replace(/\0.*/, '')
        break

      case 'NextFileHasLongLinkpath':
        this[EX] = this[EX] || Object.create(null)
        this[EX].linkpath = this[META].replace(/\0.*/, '')
        break

      /* istanbul ignore next */
      default: throw new Error('unknown meta: ' + entry.type)
    }
  }

  abort (msg, error) {
    this[ABORTED] = true
    this.warn(msg, error)
    this.emit('abort', error)
    this.emit('error', error)
  }

  write (chunk) {
    if (this[ABORTED])
      return

    // first write, might be gzipped
    if (this[UNZIP] === null && chunk) {
      if (this[BUFFER]) {
        chunk = Buffer.concat([this[BUFFER], chunk])
        this[BUFFER] = null
      }
      if (chunk.length < gzipHeader.length) {
        this[BUFFER] = chunk
        return true
      }
      for (let i = 0; this[UNZIP] === null && i < gzipHeader.length; i++) {
        if (chunk[i] !== gzipHeader[i])
          this[UNZIP] = false
      }
      if (this[UNZIP] === null) {
        const ended = this[ENDED]
        this[ENDED] = false
        this[UNZIP] = new zlib.Unzip()
        this[UNZIP].on('data', chunk => this[CONSUMECHUNK](chunk))
        this[UNZIP].on('error', er =>
          this.abort(er.message, er))
        this[UNZIP].on('end', _ => {
          this[ENDED] = true
          this[CONSUMECHUNK]()
        })
        this[WRITING] = true
        const ret = this[UNZIP][ended ? 'end' : 'write'](chunk)
        this[WRITING] = false
        return ret
      }
    }

    this[WRITING] = true
    if (this[UNZIP])
      this[UNZIP].write(chunk)
    else
      this[CONSUMECHUNK](chunk)
    this[WRITING] = false

    // return false if there's a queue, or if the current entry isn't flowing
    const ret =
      this[QUEUE].length ? false :
      this[READENTRY] ? this[READENTRY].flowing :
      true

    // if we have no queue, then that means a clogged READENTRY
    if (!ret && !this[QUEUE].length)
      this[READENTRY].once('drain', _ => this.emit('drain'))

    return ret
  }
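
  // Backpressure sketch for callers driving write() by hand rather than
  // through pipe() (illustrative only; 'chunks' and 'writeMore' are
  // assumed names, not part of this module):
  //
  //   const writeMore = () => {
  //     while (chunks.length) {
  //       if (!parser.write(chunks.shift()))
  //         return parser.once('drain', writeMore)
  //     }
  //     parser.end()
  //   }
  //   writeMore()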

  [BUFFERCONCAT] (c) {
    if (c && !this[ABORTED])
      this[BUFFER] = this[BUFFER] ? Buffer.concat([this[BUFFER], c]) : c
  }

  [MAYBEEND] () {
    if (this[ENDED] &&
        !this[EMITTEDEND] &&
        !this[ABORTED] &&
        !this[CONSUMING]) {
      this[EMITTEDEND] = true
      const entry = this[WRITEENTRY]
      if (entry && entry.blockRemain) {
        const have = this[BUFFER] ? this[BUFFER].length : 0
        this.warn('Truncated input (needed ' + entry.blockRemain +
          ' more bytes, only ' + have + ' available)', entry)
        if (this[BUFFER])
          entry.write(this[BUFFER])
        entry.end()
      }
      this[EMIT](DONE)
    }
  }

  [CONSUMECHUNK] (chunk) {
    if (this[CONSUMING]) {
      this[BUFFERCONCAT](chunk)
    } else if (!chunk && !this[BUFFER]) {
      this[MAYBEEND]()
    } else {
      this[CONSUMING] = true
      if (this[BUFFER]) {
        this[BUFFERCONCAT](chunk)
        const c = this[BUFFER]
        this[BUFFER] = null
        this[CONSUMECHUNKSUB](c)
      } else {
        this[CONSUMECHUNKSUB](chunk)
      }

      while (this[BUFFER] && this[BUFFER].length >= 512 && !this[ABORTED]) {
        const c = this[BUFFER]
        this[BUFFER] = null
        this[CONSUMECHUNKSUB](c)
      }
      this[CONSUMING] = false
    }

    if (!this[BUFFER] || this[ENDED])
      this[MAYBEEND]()
  }

  [CONSUMECHUNKSUB] (chunk) {
    // we know that we are in CONSUMING mode, so anything written goes into
    // the buffer. Advance the position and put any remainder in the buffer.
    let position = 0
    let length = chunk.length
    while (position + 512 <= length && !this[ABORTED]) {
      switch (this[STATE]) {
        case 'begin':
          this[CONSUMEHEADER](chunk, position)
          position += 512
          break

        case 'ignore':
        case 'body':
          position += this[CONSUMEBODY](chunk, position)
          break

        case 'meta':
          position += this[CONSUMEMETA](chunk, position)
          break

        /* istanbul ignore next */
        default:
          throw new Error('invalid state: ' + this[STATE])
      }
    }

    if (position < length) {
      if (this[BUFFER])
        this[BUFFER] = Buffer.concat([chunk.slice(position), this[BUFFER]])
      else
        this[BUFFER] = chunk.slice(position)
    }
  }

  end (chunk) {
    if (!this[ABORTED]) {
      if (this[UNZIP])
        this[UNZIP].end(chunk)
      else {
        this[ENDED] = true
        this.write(chunk)
      }
    }
  }
})
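
// Note on gzipped input: the first write() compares the leading bytes against
// the gzip magic number (0x1f, 0x8b) and, if they match, routes all input
// through minizlib's Unzip stream before parsing, so a compressed archive can
// be piped in directly (illustrative filename, not part of this module):
//
//   fs.createReadStream('my-archive.tar.gz').pipe(parser)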