1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkState; 20 21 import java.io.Closeable; 22 import java.util.zip.CRC32; 23 import java.util.zip.DataFormatException; 24 import java.util.zip.Inflater; 25 import java.util.zip.ZipException; 26 import javax.annotation.concurrent.NotThreadSafe; 27 28 /** 29 * Processes gzip streams, delegating to {@link Inflater} to perform on-demand inflation of the 30 * deflated blocks. Like {@link java.util.zip.GZIPInputStream}, this handles concatenated gzip 31 * streams. Unlike {@link java.util.zip.GZIPInputStream}, this allows for incremental processing of 32 * gzip streams, allowing data to be inflated as it arrives over the wire. 33 * 34 * <p>This also frees the inflate context when the end of a gzip stream is reached without another 35 * concatenated stream available to inflate. 36 */ 37 @NotThreadSafe 38 class GzipInflatingBuffer implements Closeable { 39 40 private static final int INFLATE_BUFFER_SIZE = 512; 41 private static final int UNSIGNED_SHORT_SIZE = 2; 42 43 private static final int GZIP_MAGIC = 0x8b1f; 44 45 private static final int GZIP_HEADER_MIN_SIZE = 10; 46 private static final int GZIP_TRAILER_SIZE = 8; 47 48 private static final int HEADER_CRC_FLAG = 2; 49 private static final int HEADER_EXTRA_FLAG = 4; 50 private static final int HEADER_NAME_FLAG = 8; 51 private static final int HEADER_COMMENT_FLAG = 16; 52 53 /** 54 * Reads gzip header and trailer bytes from the inflater's buffer (if bytes beyond the inflate 55 * block were given to the inflater) and then from {@code gzippedData}, and handles updating the 56 * CRC and the count of gzipped bytes consumed. 57 */ 58 private class GzipMetadataReader { 59 60 /** 61 * Returns the next unsigned byte, adding it the CRC and incrementing {@code bytesConsumed}. 62 * 63 * <p>It is the responsibility of the caller to verify and reset the CRC as needed, as well as 64 * caching the current CRC value when necessary before invoking this method. 65 */ readUnsignedByte()66 private int readUnsignedByte() { 67 int bytesRemainingInInflaterInput = inflaterInputEnd - inflaterInputStart; 68 int b; 69 if (bytesRemainingInInflaterInput > 0) { 70 b = inflaterInput[inflaterInputStart] & 0xFF; 71 inflaterInputStart += 1; 72 } else { 73 b = gzippedData.readUnsignedByte(); 74 } 75 crc.update(b); 76 bytesConsumed += 1; 77 return b; 78 } 79 80 /** 81 * Skips {@code length} bytes, adding them to the CRC and adding {@code length} to {@code 82 * bytesConsumed}. 83 * 84 * <p>It is the responsibility of the caller to verify and reset the CRC as needed, as well as 85 * caching the current CRC value when necessary before invoking this method. 86 */ skipBytes(int length)87 private void skipBytes(int length) { 88 int bytesToSkip = length; 89 int bytesRemainingInInflaterInput = inflaterInputEnd - inflaterInputStart; 90 91 if (bytesRemainingInInflaterInput > 0) { 92 int bytesToGetFromInflaterInput = Math.min(bytesRemainingInInflaterInput, bytesToSkip); 93 crc.update(inflaterInput, inflaterInputStart, bytesToGetFromInflaterInput); 94 inflaterInputStart += bytesToGetFromInflaterInput; 95 bytesToSkip -= bytesToGetFromInflaterInput; 96 } 97 98 if (bytesToSkip > 0) { 99 byte[] buf = new byte[512]; 100 int total = 0; 101 while (total < bytesToSkip) { 102 int toRead = Math.min(bytesToSkip - total, buf.length); 103 gzippedData.readBytes(buf, 0, toRead); 104 crc.update(buf, 0, toRead); 105 total += toRead; 106 } 107 } 108 109 bytesConsumed += length; 110 } 111 readableBytes()112 private int readableBytes() { 113 return (inflaterInputEnd - inflaterInputStart) + gzippedData.readableBytes(); 114 } 115 116 /** Skip over a zero-terminated byte sequence. Returns true when the zero byte is read. */ readBytesUntilZero()117 private boolean readBytesUntilZero() { 118 while (readableBytes() > 0) { 119 if (readUnsignedByte() == 0) { 120 return true; 121 } 122 } 123 return false; 124 } 125 126 /** Reads unsigned short in Little-Endian byte order. */ readUnsignedShort()127 private int readUnsignedShort() { 128 return readUnsignedByte() | (readUnsignedByte() << 8); 129 } 130 131 /** Reads unsigned integer in Little-Endian byte order. */ readUnsignedInt()132 private long readUnsignedInt() { 133 long s = readUnsignedShort(); 134 return ((long) readUnsignedShort() << 16) | s; 135 } 136 } 137 138 private enum State { 139 HEADER, 140 HEADER_EXTRA_LEN, 141 HEADER_EXTRA, 142 HEADER_NAME, 143 HEADER_COMMENT, 144 HEADER_CRC, 145 INITIALIZE_INFLATER, 146 INFLATING, 147 INFLATER_NEEDS_INPUT, 148 TRAILER 149 } 150 151 /** 152 * This buffer holds all input gzipped data, consisting of blocks of deflated data and the 153 * surrounding gzip headers and trailers. All access to the Gzip headers and trailers must be made 154 * via {@link GzipMetadataReader}. 155 */ 156 private final CompositeReadableBuffer gzippedData = new CompositeReadableBuffer(); 157 158 private final CRC32 crc = new CRC32(); 159 160 private final GzipMetadataReader gzipMetadataReader = new GzipMetadataReader(); 161 private final byte[] inflaterInput = new byte[INFLATE_BUFFER_SIZE]; 162 private int inflaterInputStart; 163 private int inflaterInputEnd; 164 private Inflater inflater; 165 private State state = State.HEADER; 166 private boolean closed = false; 167 168 /** Extra state variables for parsing gzip header flags. */ 169 private int gzipHeaderFlag; 170 private int headerExtraToRead; 171 172 /* Number of inflated bytes per gzip stream, used to validate the gzip trailer. */ 173 private long expectedGzipTrailerIsize; 174 175 /** 176 * Tracks gzipped bytes (including gzip metadata and deflated blocks) consumed during {@link 177 * #inflateBytes} calls. 178 */ 179 private int bytesConsumed = 0; 180 181 /** Tracks deflated bytes (excluding gzip metadata) consumed by the inflater. */ 182 private int deflatedBytesConsumed = 0; 183 184 private boolean isStalled = true; 185 186 /** 187 * Returns true when more bytes must be added via {@link #addGzippedBytes} to enable additional 188 * calls to {@link #inflateBytes} to make progress. 189 */ isStalled()190 boolean isStalled() { 191 checkState(!closed, "GzipInflatingBuffer is closed"); 192 return isStalled; 193 } 194 195 /** 196 * Returns true when there is gzippedData that has not been input to the inflater or the inflater 197 * has not consumed all of its input, or all data has been consumed but we are at not at the 198 * boundary between gzip streams. 199 */ hasPartialData()200 boolean hasPartialData() { 201 checkState(!closed, "GzipInflatingBuffer is closed"); 202 return gzipMetadataReader.readableBytes() != 0 || state != State.HEADER; 203 } 204 205 /** 206 * Adds more gzipped data, which will be consumed only when needed to fulfill requests made via 207 * {@link #inflateBytes}. 208 */ addGzippedBytes(ReadableBuffer buffer)209 void addGzippedBytes(ReadableBuffer buffer) { 210 checkState(!closed, "GzipInflatingBuffer is closed"); 211 gzippedData.addBuffer(buffer); 212 isStalled = false; 213 } 214 215 @Override close()216 public void close() { 217 if (!closed) { 218 closed = true; 219 gzippedData.close(); 220 if (inflater != null) { 221 inflater.end(); 222 inflater = null; 223 } 224 } 225 } 226 227 /** 228 * Reports bytes consumed by calls to {@link #inflateBytes} since the last invocation of this 229 * method, then resets the count to zero. 230 */ getAndResetBytesConsumed()231 int getAndResetBytesConsumed() { 232 int savedBytesConsumed = bytesConsumed; 233 bytesConsumed = 0; 234 return savedBytesConsumed; 235 } 236 237 /** 238 * Reports bytes consumed by the inflater since the last invocation of this method, then resets 239 * the count to zero. 240 */ getAndResetDeflatedBytesConsumed()241 int getAndResetDeflatedBytesConsumed() { 242 int savedDeflatedBytesConsumed = deflatedBytesConsumed; 243 deflatedBytesConsumed = 0; 244 return savedDeflatedBytesConsumed; 245 } 246 247 /** 248 * Attempts to inflate {@code length} bytes of data into {@code b}. 249 * 250 * <p>Any gzipped bytes consumed by this method will be added to the counter returned by {@link 251 * #getAndResetBytesConsumed()}. This method may consume gzipped bytes without writing any data to 252 * {@code b}, and may also write data to {@code b} without consuming additional gzipped bytes (if 253 * the inflater on an earlier call consumed the bytes necessary to produce output). 254 * 255 * @param b the destination array to receive the bytes. 256 * @param offset the starting offset in the destination array. 257 * @param length the number of bytes to be copied. 258 * @throws IndexOutOfBoundsException if {@code b} is too small to hold the requested bytes. 259 */ inflateBytes(byte[] b, int offset, int length)260 int inflateBytes(byte[] b, int offset, int length) throws DataFormatException, ZipException { 261 checkState(!closed, "GzipInflatingBuffer is closed"); 262 263 int bytesRead = 0; 264 int missingBytes; 265 boolean madeProgress = true; 266 while (madeProgress && (missingBytes = length - bytesRead) > 0) { 267 switch (state) { 268 case HEADER: 269 madeProgress = processHeader(); 270 break; 271 case HEADER_EXTRA_LEN: 272 madeProgress = processHeaderExtraLen(); 273 break; 274 case HEADER_EXTRA: 275 madeProgress = processHeaderExtra(); 276 break; 277 case HEADER_NAME: 278 madeProgress = processHeaderName(); 279 break; 280 case HEADER_COMMENT: 281 madeProgress = processHeaderComment(); 282 break; 283 case HEADER_CRC: 284 madeProgress = processHeaderCrc(); 285 break; 286 case INITIALIZE_INFLATER: 287 madeProgress = initializeInflater(); 288 break; 289 case INFLATING: 290 bytesRead += inflate(b, offset + bytesRead, missingBytes); 291 if (state == State.TRAILER) { 292 // Eagerly process trailer, if available, to validate CRC. 293 madeProgress = processTrailer(); 294 } else { 295 // Continue in INFLATING until we have the required bytes or we transition to 296 // INFLATER_NEEDS_INPUT 297 madeProgress = true; 298 } 299 break; 300 case INFLATER_NEEDS_INPUT: 301 madeProgress = fill(); 302 break; 303 case TRAILER: 304 madeProgress = processTrailer(); 305 break; 306 default: 307 throw new AssertionError("Invalid state: " + state); 308 } 309 } 310 // If we finished a gzip block, check if we have enough bytes to read another header 311 isStalled = 312 !madeProgress 313 || (state == State.HEADER && gzipMetadataReader.readableBytes() < GZIP_HEADER_MIN_SIZE); 314 315 return bytesRead; 316 } 317 processHeader()318 private boolean processHeader() throws ZipException { 319 if (gzipMetadataReader.readableBytes() < GZIP_HEADER_MIN_SIZE) { 320 return false; 321 } 322 if (gzipMetadataReader.readUnsignedShort() != GZIP_MAGIC) { 323 throw new ZipException("Not in GZIP format"); 324 } 325 if (gzipMetadataReader.readUnsignedByte() != 8) { 326 throw new ZipException("Unsupported compression method"); 327 } 328 gzipHeaderFlag = gzipMetadataReader.readUnsignedByte(); 329 gzipMetadataReader.skipBytes(6 /* remaining header bytes */); 330 state = State.HEADER_EXTRA_LEN; 331 return true; 332 } 333 processHeaderExtraLen()334 private boolean processHeaderExtraLen() { 335 if ((gzipHeaderFlag & HEADER_EXTRA_FLAG) != HEADER_EXTRA_FLAG) { 336 state = State.HEADER_NAME; 337 return true; 338 } 339 if (gzipMetadataReader.readableBytes() < UNSIGNED_SHORT_SIZE) { 340 return false; 341 } 342 headerExtraToRead = gzipMetadataReader.readUnsignedShort(); 343 state = State.HEADER_EXTRA; 344 return true; 345 } 346 processHeaderExtra()347 private boolean processHeaderExtra() { 348 if (gzipMetadataReader.readableBytes() < headerExtraToRead) { 349 return false; 350 } 351 gzipMetadataReader.skipBytes(headerExtraToRead); 352 state = State.HEADER_NAME; 353 return true; 354 } 355 processHeaderName()356 private boolean processHeaderName() { 357 if ((gzipHeaderFlag & HEADER_NAME_FLAG) != HEADER_NAME_FLAG) { 358 state = State.HEADER_COMMENT; 359 return true; 360 } 361 if (!gzipMetadataReader.readBytesUntilZero()) { 362 return false; 363 } 364 state = State.HEADER_COMMENT; 365 return true; 366 } 367 processHeaderComment()368 private boolean processHeaderComment() { 369 if ((gzipHeaderFlag & HEADER_COMMENT_FLAG) != HEADER_COMMENT_FLAG) { 370 state = State.HEADER_CRC; 371 return true; 372 } 373 if (!gzipMetadataReader.readBytesUntilZero()) { 374 return false; 375 } 376 state = State.HEADER_CRC; 377 return true; 378 } 379 processHeaderCrc()380 private boolean processHeaderCrc() throws ZipException { 381 if ((gzipHeaderFlag & HEADER_CRC_FLAG) != HEADER_CRC_FLAG) { 382 state = State.INITIALIZE_INFLATER; 383 return true; 384 } 385 if (gzipMetadataReader.readableBytes() < UNSIGNED_SHORT_SIZE) { 386 return false; 387 } 388 int desiredCrc16 = (int) crc.getValue() & 0xffff; 389 if (desiredCrc16 != gzipMetadataReader.readUnsignedShort()) { 390 throw new ZipException("Corrupt GZIP header"); 391 } 392 state = State.INITIALIZE_INFLATER; 393 return true; 394 } 395 initializeInflater()396 private boolean initializeInflater() { 397 if (inflater == null) { 398 inflater = new Inflater(true); 399 } else { 400 inflater.reset(); 401 } 402 crc.reset(); 403 int bytesRemainingInInflaterInput = inflaterInputEnd - inflaterInputStart; 404 if (bytesRemainingInInflaterInput > 0) { 405 inflater.setInput(inflaterInput, inflaterInputStart, bytesRemainingInInflaterInput); 406 state = State.INFLATING; 407 } else { 408 state = State.INFLATER_NEEDS_INPUT; 409 } 410 return true; 411 } 412 inflate(byte[] b, int off, int len)413 private int inflate(byte[] b, int off, int len) throws DataFormatException, ZipException { 414 checkState(inflater != null, "inflater is null"); 415 416 try { 417 int inflaterTotalIn = inflater.getTotalIn(); 418 int n = inflater.inflate(b, off, len); 419 int bytesConsumedDelta = inflater.getTotalIn() - inflaterTotalIn; 420 bytesConsumed += bytesConsumedDelta; 421 deflatedBytesConsumed += bytesConsumedDelta; 422 inflaterInputStart += bytesConsumedDelta; 423 crc.update(b, off, n); 424 425 if (inflater.finished()) { 426 // Save bytes written to check against the trailer ISIZE 427 expectedGzipTrailerIsize = (inflater.getBytesWritten() & 0xffffffffL); 428 429 state = State.TRAILER; 430 } else if (inflater.needsInput()) { 431 state = State.INFLATER_NEEDS_INPUT; 432 } 433 434 return n; 435 } catch (DataFormatException e) { 436 // Wrap the exception so tests can check for a specific prefix 437 throw new DataFormatException("Inflater data format exception: " + e.getMessage()); 438 } 439 } 440 fill()441 private boolean fill() { 442 checkState(inflater != null, "inflater is null"); 443 checkState(inflaterInputStart == inflaterInputEnd, "inflaterInput has unconsumed bytes"); 444 int bytesToAdd = Math.min(gzippedData.readableBytes(), INFLATE_BUFFER_SIZE); 445 if (bytesToAdd == 0) { 446 return false; 447 } 448 inflaterInputStart = 0; 449 inflaterInputEnd = bytesToAdd; 450 gzippedData.readBytes(inflaterInput, inflaterInputStart, bytesToAdd); 451 inflater.setInput(inflaterInput, inflaterInputStart, bytesToAdd); 452 state = State.INFLATING; 453 return true; 454 } 455 processTrailer()456 private boolean processTrailer() throws ZipException { 457 if (inflater != null 458 && gzipMetadataReader.readableBytes() <= GZIP_HEADER_MIN_SIZE + GZIP_TRAILER_SIZE) { 459 // We don't have enough bytes to begin inflating a concatenated gzip stream, drop context 460 inflater.end(); 461 inflater = null; 462 } 463 if (gzipMetadataReader.readableBytes() < GZIP_TRAILER_SIZE) { 464 return false; 465 } 466 if (crc.getValue() != gzipMetadataReader.readUnsignedInt() 467 || expectedGzipTrailerIsize != gzipMetadataReader.readUnsignedInt()) { 468 throw new ZipException("Corrupt GZIP trailer"); 469 } 470 crc.reset(); 471 state = State.HEADER; 472 return true; 473 } 474 } 475