• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 
21 import java.io.Closeable;
22 import java.util.zip.CRC32;
23 import java.util.zip.DataFormatException;
24 import java.util.zip.Inflater;
25 import java.util.zip.ZipException;
26 import javax.annotation.concurrent.NotThreadSafe;
27 
28 /**
29  * Processes gzip streams, delegating to {@link Inflater} to perform on-demand inflation of the
30  * deflated blocks. Like {@link java.util.zip.GZIPInputStream}, this handles concatenated gzip
31  * streams. Unlike {@link java.util.zip.GZIPInputStream}, this allows for incremental processing of
32  * gzip streams, allowing data to be inflated as it arrives over the wire.
33  *
34  * <p>This also frees the inflate context when the end of a gzip stream is reached without another
35  * concatenated stream available to inflate.
36  */
37 @NotThreadSafe
38 class GzipInflatingBuffer implements Closeable {
39 
40   private static final int INFLATE_BUFFER_SIZE = 512;
41   private static final int UNSIGNED_SHORT_SIZE = 2;
42 
43   private static final int GZIP_MAGIC = 0x8b1f;
44 
45   private static final int GZIP_HEADER_MIN_SIZE = 10;
46   private static final int GZIP_TRAILER_SIZE = 8;
47 
48   private static final int HEADER_CRC_FLAG = 2;
49   private static final int HEADER_EXTRA_FLAG = 4;
50   private static final int HEADER_NAME_FLAG = 8;
51   private static final int HEADER_COMMENT_FLAG = 16;
52 
53   /**
54    * Reads gzip header and trailer bytes from the inflater's buffer (if bytes beyond the inflate
55    * block were given to the inflater) and then from {@code gzippedData}, and handles updating the
56    * CRC and the count of gzipped bytes consumed.
57    */
58   private class GzipMetadataReader {
59 
60     /**
61      * Returns the next unsigned byte, adding it the CRC and incrementing {@code bytesConsumed}.
62      *
63      * <p>It is the responsibility of the caller to verify and reset the CRC as needed, as well as
64      * caching the current CRC value when necessary before invoking this method.
65      */
readUnsignedByte()66     private int readUnsignedByte() {
67       int bytesRemainingInInflaterInput = inflaterInputEnd - inflaterInputStart;
68       int b;
69       if (bytesRemainingInInflaterInput > 0) {
70         b = inflaterInput[inflaterInputStart] & 0xFF;
71         inflaterInputStart += 1;
72       } else {
73         b = gzippedData.readUnsignedByte();
74       }
75       crc.update(b);
76       bytesConsumed += 1;
77       return b;
78     }
79 
80     /**
81      * Skips {@code length} bytes, adding them to the CRC and adding {@code length} to {@code
82      * bytesConsumed}.
83      *
84      * <p>It is the responsibility of the caller to verify and reset the CRC as needed, as well as
85      * caching the current CRC value when necessary before invoking this method.
86      */
skipBytes(int length)87     private void skipBytes(int length) {
88       int bytesToSkip = length;
89       int bytesRemainingInInflaterInput = inflaterInputEnd - inflaterInputStart;
90 
91       if (bytesRemainingInInflaterInput > 0) {
92         int bytesToGetFromInflaterInput = Math.min(bytesRemainingInInflaterInput, bytesToSkip);
93         crc.update(inflaterInput, inflaterInputStart, bytesToGetFromInflaterInput);
94         inflaterInputStart += bytesToGetFromInflaterInput;
95         bytesToSkip -= bytesToGetFromInflaterInput;
96       }
97 
98       if (bytesToSkip > 0) {
99         byte[] buf = new byte[512];
100         int total = 0;
101         while (total < bytesToSkip) {
102           int toRead = Math.min(bytesToSkip - total, buf.length);
103           gzippedData.readBytes(buf, 0, toRead);
104           crc.update(buf, 0, toRead);
105           total += toRead;
106         }
107       }
108 
109       bytesConsumed += length;
110     }
111 
readableBytes()112     private int readableBytes() {
113       return (inflaterInputEnd - inflaterInputStart) + gzippedData.readableBytes();
114     }
115 
116     /** Skip over a zero-terminated byte sequence. Returns true when the zero byte is read. */
readBytesUntilZero()117     private boolean readBytesUntilZero() {
118       while (readableBytes() > 0) {
119         if (readUnsignedByte() == 0) {
120           return true;
121         }
122       }
123       return false;
124     }
125 
126     /** Reads unsigned short in Little-Endian byte order. */
readUnsignedShort()127     private int readUnsignedShort() {
128       return readUnsignedByte() | (readUnsignedByte() << 8);
129     }
130 
131     /** Reads unsigned integer in Little-Endian byte order. */
readUnsignedInt()132     private long readUnsignedInt() {
133       long s = readUnsignedShort();
134       return ((long) readUnsignedShort() << 16) | s;
135     }
136   }
137 
138   private enum State {
139     HEADER,
140     HEADER_EXTRA_LEN,
141     HEADER_EXTRA,
142     HEADER_NAME,
143     HEADER_COMMENT,
144     HEADER_CRC,
145     INITIALIZE_INFLATER,
146     INFLATING,
147     INFLATER_NEEDS_INPUT,
148     TRAILER
149   }
150 
151   /**
152    * This buffer holds all input gzipped data, consisting of blocks of deflated data and the
153    * surrounding gzip headers and trailers. All access to the Gzip headers and trailers must be made
154    * via {@link GzipMetadataReader}.
155    */
156   private final CompositeReadableBuffer gzippedData = new CompositeReadableBuffer();
157 
158   private final CRC32 crc = new CRC32();
159 
160   private final GzipMetadataReader gzipMetadataReader = new GzipMetadataReader();
161   private final byte[] inflaterInput = new byte[INFLATE_BUFFER_SIZE];
162   private int inflaterInputStart;
163   private int inflaterInputEnd;
164   private Inflater inflater;
165   private State state = State.HEADER;
166   private boolean closed = false;
167 
168   /** Extra state variables for parsing gzip header flags. */
169   private int gzipHeaderFlag;
170   private int headerExtraToRead;
171 
172   /* Number of inflated bytes per gzip stream, used to validate the gzip trailer. */
173   private long expectedGzipTrailerIsize;
174 
175   /**
176    * Tracks gzipped bytes (including gzip metadata and deflated blocks) consumed during {@link
177    * #inflateBytes} calls.
178    */
179   private int bytesConsumed = 0;
180 
181   /** Tracks deflated bytes (excluding gzip metadata) consumed by the inflater. */
182   private int deflatedBytesConsumed = 0;
183 
184   private boolean isStalled = true;
185 
186   /**
187    * Returns true when more bytes must be added via {@link #addGzippedBytes} to enable additional
188    * calls to {@link #inflateBytes} to make progress.
189    */
isStalled()190   boolean isStalled() {
191     checkState(!closed, "GzipInflatingBuffer is closed");
192     return isStalled;
193   }
194 
195   /**
196    * Returns true when there is gzippedData that has not been input to the inflater or the inflater
197    * has not consumed all of its input, or all data has been consumed but we are at not at the
198    * boundary between gzip streams.
199    */
hasPartialData()200   boolean hasPartialData() {
201     checkState(!closed, "GzipInflatingBuffer is closed");
202     return gzipMetadataReader.readableBytes() != 0 || state != State.HEADER;
203   }
204 
205   /**
206    * Adds more gzipped data, which will be consumed only when needed to fulfill requests made via
207    * {@link #inflateBytes}.
208    */
addGzippedBytes(ReadableBuffer buffer)209   void addGzippedBytes(ReadableBuffer buffer) {
210     checkState(!closed, "GzipInflatingBuffer is closed");
211     gzippedData.addBuffer(buffer);
212     isStalled = false;
213   }
214 
215   @Override
close()216   public void close() {
217     if (!closed) {
218       closed = true;
219       gzippedData.close();
220       if (inflater != null) {
221         inflater.end();
222         inflater = null;
223       }
224     }
225   }
226 
227   /**
228    * Reports bytes consumed by calls to {@link #inflateBytes} since the last invocation of this
229    * method, then resets the count to zero.
230    */
getAndResetBytesConsumed()231   int getAndResetBytesConsumed() {
232     int savedBytesConsumed = bytesConsumed;
233     bytesConsumed = 0;
234     return savedBytesConsumed;
235   }
236 
237   /**
238    * Reports bytes consumed by the inflater since the last invocation of this method, then resets
239    * the count to zero.
240    */
getAndResetDeflatedBytesConsumed()241   int getAndResetDeflatedBytesConsumed() {
242     int savedDeflatedBytesConsumed = deflatedBytesConsumed;
243     deflatedBytesConsumed = 0;
244     return savedDeflatedBytesConsumed;
245   }
246 
247   /**
248    * Attempts to inflate {@code length} bytes of data into {@code b}.
249    *
250    * <p>Any gzipped bytes consumed by this method will be added to the counter returned by {@link
251    * #getAndResetBytesConsumed()}. This method may consume gzipped bytes without writing any data to
252    * {@code b}, and may also write data to {@code b} without consuming additional gzipped bytes (if
253    * the inflater on an earlier call consumed the bytes necessary to produce output).
254    *
255    * @param b the destination array to receive the bytes.
256    * @param offset the starting offset in the destination array.
257    * @param length the number of bytes to be copied.
258    * @throws IndexOutOfBoundsException if {@code b} is too small to hold the requested bytes.
259    */
inflateBytes(byte[] b, int offset, int length)260   int inflateBytes(byte[] b, int offset, int length) throws DataFormatException, ZipException {
261     checkState(!closed, "GzipInflatingBuffer is closed");
262 
263     int bytesRead = 0;
264     int missingBytes;
265     boolean madeProgress = true;
266     while (madeProgress && (missingBytes = length - bytesRead) > 0) {
267       switch (state) {
268         case HEADER:
269           madeProgress = processHeader();
270           break;
271         case HEADER_EXTRA_LEN:
272           madeProgress = processHeaderExtraLen();
273           break;
274         case HEADER_EXTRA:
275           madeProgress = processHeaderExtra();
276           break;
277         case HEADER_NAME:
278           madeProgress = processHeaderName();
279           break;
280         case HEADER_COMMENT:
281           madeProgress = processHeaderComment();
282           break;
283         case HEADER_CRC:
284           madeProgress = processHeaderCrc();
285           break;
286         case INITIALIZE_INFLATER:
287           madeProgress = initializeInflater();
288           break;
289         case INFLATING:
290           bytesRead += inflate(b, offset + bytesRead, missingBytes);
291           if (state == State.TRAILER) {
292             // Eagerly process trailer, if available, to validate CRC.
293             madeProgress = processTrailer();
294           } else {
295             // Continue in INFLATING until we have the required bytes or we transition to
296             // INFLATER_NEEDS_INPUT
297             madeProgress = true;
298           }
299           break;
300         case INFLATER_NEEDS_INPUT:
301           madeProgress = fill();
302           break;
303         case TRAILER:
304           madeProgress = processTrailer();
305           break;
306         default:
307           throw new AssertionError("Invalid state: " + state);
308       }
309     }
310     // If we finished a gzip block, check if we have enough bytes to read another header
311     isStalled =
312         !madeProgress
313             || (state == State.HEADER && gzipMetadataReader.readableBytes() < GZIP_HEADER_MIN_SIZE);
314 
315     return bytesRead;
316   }
317 
processHeader()318   private boolean processHeader() throws ZipException {
319     if (gzipMetadataReader.readableBytes() < GZIP_HEADER_MIN_SIZE) {
320       return false;
321     }
322     if (gzipMetadataReader.readUnsignedShort() != GZIP_MAGIC) {
323       throw new ZipException("Not in GZIP format");
324     }
325     if (gzipMetadataReader.readUnsignedByte() != 8) {
326       throw new ZipException("Unsupported compression method");
327     }
328     gzipHeaderFlag = gzipMetadataReader.readUnsignedByte();
329     gzipMetadataReader.skipBytes(6 /* remaining header bytes */);
330     state = State.HEADER_EXTRA_LEN;
331     return true;
332   }
333 
processHeaderExtraLen()334   private boolean processHeaderExtraLen() {
335     if ((gzipHeaderFlag & HEADER_EXTRA_FLAG) != HEADER_EXTRA_FLAG) {
336       state = State.HEADER_NAME;
337       return true;
338     }
339     if (gzipMetadataReader.readableBytes() < UNSIGNED_SHORT_SIZE) {
340       return false;
341     }
342     headerExtraToRead = gzipMetadataReader.readUnsignedShort();
343     state = State.HEADER_EXTRA;
344     return true;
345   }
346 
processHeaderExtra()347   private boolean processHeaderExtra() {
348     if (gzipMetadataReader.readableBytes() < headerExtraToRead) {
349       return false;
350     }
351     gzipMetadataReader.skipBytes(headerExtraToRead);
352     state = State.HEADER_NAME;
353     return true;
354   }
355 
processHeaderName()356   private boolean processHeaderName() {
357     if ((gzipHeaderFlag & HEADER_NAME_FLAG) != HEADER_NAME_FLAG) {
358       state = State.HEADER_COMMENT;
359       return true;
360     }
361     if (!gzipMetadataReader.readBytesUntilZero()) {
362       return false;
363     }
364     state = State.HEADER_COMMENT;
365     return true;
366   }
367 
processHeaderComment()368   private boolean processHeaderComment() {
369     if ((gzipHeaderFlag & HEADER_COMMENT_FLAG) != HEADER_COMMENT_FLAG) {
370       state = State.HEADER_CRC;
371       return true;
372     }
373     if (!gzipMetadataReader.readBytesUntilZero()) {
374       return false;
375     }
376     state = State.HEADER_CRC;
377     return true;
378   }
379 
processHeaderCrc()380   private boolean processHeaderCrc() throws ZipException {
381     if ((gzipHeaderFlag & HEADER_CRC_FLAG) != HEADER_CRC_FLAG) {
382       state = State.INITIALIZE_INFLATER;
383       return true;
384     }
385     if (gzipMetadataReader.readableBytes() < UNSIGNED_SHORT_SIZE) {
386       return false;
387     }
388     int desiredCrc16 = (int) crc.getValue() & 0xffff;
389     if (desiredCrc16 != gzipMetadataReader.readUnsignedShort()) {
390       throw new ZipException("Corrupt GZIP header");
391     }
392     state = State.INITIALIZE_INFLATER;
393     return true;
394   }
395 
initializeInflater()396   private boolean initializeInflater() {
397     if (inflater == null) {
398       inflater = new Inflater(true);
399     } else {
400       inflater.reset();
401     }
402     crc.reset();
403     int bytesRemainingInInflaterInput = inflaterInputEnd - inflaterInputStart;
404     if (bytesRemainingInInflaterInput > 0) {
405       inflater.setInput(inflaterInput, inflaterInputStart, bytesRemainingInInflaterInput);
406       state = State.INFLATING;
407     } else {
408       state = State.INFLATER_NEEDS_INPUT;
409     }
410     return true;
411   }
412 
inflate(byte[] b, int off, int len)413   private int inflate(byte[] b, int off, int len) throws DataFormatException, ZipException {
414     checkState(inflater != null, "inflater is null");
415 
416     try {
417       int inflaterTotalIn = inflater.getTotalIn();
418       int n = inflater.inflate(b, off, len);
419       int bytesConsumedDelta = inflater.getTotalIn() - inflaterTotalIn;
420       bytesConsumed += bytesConsumedDelta;
421       deflatedBytesConsumed += bytesConsumedDelta;
422       inflaterInputStart += bytesConsumedDelta;
423       crc.update(b, off, n);
424 
425       if (inflater.finished()) {
426         // Save bytes written to check against the trailer ISIZE
427         expectedGzipTrailerIsize = (inflater.getBytesWritten() & 0xffffffffL);
428 
429         state = State.TRAILER;
430       } else if (inflater.needsInput()) {
431         state = State.INFLATER_NEEDS_INPUT;
432       }
433 
434       return n;
435     } catch (DataFormatException e) {
436       // Wrap the exception so tests can check for a specific prefix
437       throw new DataFormatException("Inflater data format exception: " + e.getMessage());
438     }
439   }
440 
fill()441   private boolean fill() {
442     checkState(inflater != null, "inflater is null");
443     checkState(inflaterInputStart == inflaterInputEnd, "inflaterInput has unconsumed bytes");
444     int bytesToAdd = Math.min(gzippedData.readableBytes(), INFLATE_BUFFER_SIZE);
445     if (bytesToAdd == 0) {
446       return false;
447     }
448     inflaterInputStart = 0;
449     inflaterInputEnd = bytesToAdd;
450     gzippedData.readBytes(inflaterInput, inflaterInputStart, bytesToAdd);
451     inflater.setInput(inflaterInput, inflaterInputStart, bytesToAdd);
452     state = State.INFLATING;
453     return true;
454   }
455 
processTrailer()456   private boolean processTrailer() throws ZipException {
457     if (inflater != null
458         && gzipMetadataReader.readableBytes() <= GZIP_HEADER_MIN_SIZE + GZIP_TRAILER_SIZE) {
459       // We don't have enough bytes to begin inflating a concatenated gzip stream, drop context
460       inflater.end();
461       inflater = null;
462     }
463     if (gzipMetadataReader.readableBytes() < GZIP_TRAILER_SIZE) {
464       return false;
465     }
466     if (crc.getValue() != gzipMetadataReader.readUnsignedInt()
467         || expectedGzipTrailerIsize != gzipMetadataReader.readUnsignedInt()) {
468       throw new ZipException("Corrupt GZIP trailer");
469     }
470     crc.reset();
471     state = State.HEADER;
472     return true;
473   }
474 }
475