/*
 * Copyright (c) 1999, 2006, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package sun.net.www.http;

import java.io.*;
import java.util.*;

import sun.net.*;
import sun.net.www.*;

/**
 * A <code>ChunkedInputStream</code> provides a stream for reading a body of
 * an HTTP message that can be sent as a series of chunks, each with its own
 * size indicator. Optionally the last chunk can be followed by trailers
 * containing entity-header fields.
 * <p>
 * A <code>ChunkedInputStream</code> is also <code>Hurryable</code> so it
 * can be hurried to the end of the stream if the bytes are available on
 * the underlying stream.
 */
public
class ChunkedInputStream extends InputStream implements Hurryable {

    /**
     * The underlying stream
     */
    private InputStream in;

    /**
     * The <code>HttpClient</code> that should be notified when the chunked
     * stream has completed.
     */
    private HttpClient hc;

    /**
     * The <code>MessageHeader</code> that is populated with any optional
     * trailers that appear after the last chunk.
     */
    private MessageHeader responses;

    /**
     * The size, in bytes, of the chunk that is currently being read.
     * This size is only valid if the current position in the underlying
     * input stream is inside a chunk (ie: state == STATE_READING_CHUNK).
     */
    private int chunkSize;

    /**
     * The number of bytes read from the underlying stream for the current
     * chunk. This value is always in the range <code>0</code> through to
     * <code>chunkSize</code>.
     */
    private int chunkRead;

    /**
     * The internal buffer array where chunk data is available for the
     * application to read.
     */
    private byte chunkData[] = new byte[4096];

    /**
     * The current position in the buffer. It contains the index
     * of the next byte to read from <code>chunkData</code>.
     */
    private int chunkPos;

    /**
     * The index one greater than the index of the last valid byte in the
     * buffer. This value is always in the range <code>0</code> through
     * <code>chunkData.length</code>.
     */
    private int chunkCount;
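
    /*
     * Illustrative note on buffering: bytes are pulled off the underlying
     * stream into rawData (declared below), where the chunk framing is parsed
     * and stripped; only the decoded entity bytes are copied into chunkData
     * for the read methods. For example, the wire sequence
     *
     *     5\r\nhello\r\n0\r\n\r\n
     *
     * flows through rawData, while chunkData ends up holding just "hello".
     */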

    /**
     * The internal buffer where bytes from the underlying stream can be
     * read. It may contain bytes representing chunk-size, chunk-data, or
     * trailer fields.
     */
    private byte rawData[] = new byte[32];

    /**
     * The current position in the buffer. It contains the index
     * of the next byte to read from <code>rawData</code>.
     */
    private int rawPos;

    /**
     * The index one greater than the index of the last valid byte in the
     * buffer. This value is always in the range <code>0</code> through
     * <code>rawData.length</code>.
     */
    private int rawCount;

    /**
     * Indicates if an error was encountered when processing the chunked
     * stream.
     */
    private boolean error;

    /**
     * Indicates if the chunked stream has been closed using the
     * <code>close</code> method.
     */
    private boolean closed;

    /*
     * Maximum chunk header size of 2KB + 2 bytes for CRLF
     */
    private final static int MAX_CHUNK_HEADER_SIZE = 2050;

    /**
     * State to indicate that the next field should be:
     * chunk-size [ chunk-extension ] CRLF
     */
    static final int STATE_AWAITING_CHUNK_HEADER = 1;

    /**
     * State to indicate that we are currently reading the chunk-data.
     */
    static final int STATE_READING_CHUNK = 2;

    /**
     * Indicates that a chunk has been completely read and the next
     * fields to be examined should be CRLF.
     */
    static final int STATE_AWAITING_CHUNK_EOL = 3;

    /**
     * Indicates that all chunks have been read and the next field
     * should be optional trailers or an indication that the chunked
     * stream is complete.
     */
    static final int STATE_AWAITING_TRAILERS = 4;

    /**
     * State to indicate that the chunked stream is complete and
     * no further bytes should be read from the underlying stream.
     */
    static final int STATE_DONE = 5;

    /**
     * Indicates the current state.
     */
    private int state;


    /**
     * Check to make sure that this stream has not been closed.
     */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("stream is closed");
        }
    }


    /**
     * Ensures there are <code>size</code> bytes available in
     * <code>rawData</code>. This requires that we either
     * shift the bytes in use to the beginning of the buffer
     * or allocate a larger buffer with sufficient space available.
     */
    private void ensureRawAvailable(int size) {
        if (rawCount + size > rawData.length) {
            int used = rawCount - rawPos;
            if (used + size > rawData.length) {
                byte tmp[] = new byte[used + size];
                if (used > 0) {
                    System.arraycopy(rawData, rawPos, tmp, 0, used);
                }
                rawData = tmp;
            } else {
                if (used > 0) {
                    System.arraycopy(rawData, rawPos, rawData, 0, used);
                }
            }
            rawCount = used;
            rawPos = 0;
        }
    }


    /**
     * Close the underlying input stream by either returning it to the
     * keep alive cache or closing the stream.
     * <p>
     * As a chunked stream is inherently persistent (see the HTTP 1.1 RFC) the
     * underlying stream can be returned to the keep alive cache if the
     * stream can be completely read without error.
     */
    private void closeUnderlying() throws IOException {
        if (in == null) {
            return;
        }

        if (!error && state == STATE_DONE) {
            hc.finished();
        } else {
            if (!hurry()) {
                hc.closeServer();
            }
        }

        in = null;
    }
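
    /*
     * Illustrative note: fastRead() below is the copy-avoiding path used by
     * read(byte[], int, int) when chunkData is empty and the stream is in the
     * middle of a chunk; the remaining chunk bytes are read straight from the
     * underlying stream into the caller's buffer rather than passing through
     * rawData and chunkData.
     */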

    /**
     * Attempt to read the remainder of a chunk directly into the
     * caller's buffer.
     * <p>
     * Return the number of bytes read.
     */
    private int fastRead(byte[] b, int off, int len) throws IOException {

        // assert state == STATE_READING_CHUNK;

        int remaining = chunkSize - chunkRead;
        int cnt = (remaining < len) ? remaining : len;
        if (cnt > 0) {
            int nread;
            try {
                nread = in.read(b, off, cnt);
            } catch (IOException e) {
                error = true;
                throw e;
            }
            if (nread > 0) {
                chunkRead += nread;
                if (chunkRead >= chunkSize) {
                    state = STATE_AWAITING_CHUNK_EOL;
                }
                return nread;
            }
            error = true;
            throw new IOException("Premature EOF");
        } else {
            return 0;
        }
    }

    /**
     * Process any outstanding bytes that have already been read into
     * <code>rawData</code>.
     * <p>
     * The parsing of the chunked stream is performed as a state machine with
     * <code>state</code> representing the current state of the processing.
     * <p>
     * Returns when either all the outstanding bytes in rawData have been
     * processed or there are insufficient bytes available to continue
     * processing. When the latter occurs <code>rawPos</code> will not have
     * been updated and thus the processing can be restarted once further
     * bytes have been read into <code>rawData</code>.
     */
    private void processRaw() throws IOException {
        int pos;
        int i;

        while (state != STATE_DONE) {

            switch (state) {

                /**
                 * We are awaiting a line with a chunk header
                 */
                case STATE_AWAITING_CHUNK_HEADER:
                    /*
                     * Find \n to indicate the end of the chunk header. If it is
                     * not found then there are insufficient bytes in the raw
                     * buffer to parse a chunk header.
                     */
                    pos = rawPos;
                    while (pos < rawCount) {
                        if (rawData[pos] == '\n') {
                            break;
                        }
                        pos++;
                        if ((pos - rawPos) >= MAX_CHUNK_HEADER_SIZE) {
                            error = true;
                            throw new IOException("Chunk header too long");
                        }
                    }
                    if (pos >= rawCount) {
                        return;
                    }

                    /*
                     * Extract the chunk size from the header (ignoring extensions).
                     */
                    String header = new String(rawData, rawPos, pos-rawPos+1, "US-ASCII");
                    for (i=0; i < header.length(); i++) {
                        if (Character.digit(header.charAt(i), 16) == -1)
                            break;
                    }
                    try {
                        chunkSize = Integer.parseInt(header.substring(0, i), 16);
                    } catch (NumberFormatException e) {
                        error = true;
                        throw new IOException("Bogus chunk size");
                    }

                    /*
                     * The chunk header has been parsed so move rawPos to the
                     * first byte of chunk data.
                     */
                    rawPos = pos + 1;
                    chunkRead = 0;

                    /*
                     * A chunk size of 0 means EOF.
                     */
                    if (chunkSize > 0) {
                        state = STATE_READING_CHUNK;
                    } else {
                        state = STATE_AWAITING_TRAILERS;
                    }
                    break;


                /**
                 * We are awaiting raw entity data (some may have already been
                 * read). chunkSize is the size of the chunk; chunkRead is the
                 * total read from the underlying stream to date.
                 */
                case STATE_READING_CHUNK :
                    /* no data available yet */
                    if (rawPos >= rawCount) {
                        return;
                    }

                    /*
                     * Compute the number of bytes of chunk data available in the
                     * raw buffer.
                     */
                    int copyLen = Math.min( chunkSize-chunkRead, rawCount-rawPos );

                    /*
                     * Expand or compact chunkData if needed.
                     */
                    if (chunkData.length < chunkCount + copyLen) {
                        int cnt = chunkCount - chunkPos;
                        if (chunkData.length < cnt + copyLen) {
                            byte tmp[] = new byte[cnt + copyLen];
                            System.arraycopy(chunkData, chunkPos, tmp, 0, cnt);
                            chunkData = tmp;
                        } else {
                            System.arraycopy(chunkData, chunkPos, chunkData, 0, cnt);
                        }
                        chunkPos = 0;
                        chunkCount = cnt;
                    }

                    /*
                     * Copy the chunk data into chunkData so that it's available
                     * to the read methods.
                     */
                    System.arraycopy(rawData, rawPos, chunkData, chunkCount, copyLen);
                    rawPos += copyLen;
                    chunkCount += copyLen;
                    chunkRead += copyLen;

                    /*
                     * If all of the chunk has been copied into chunkData then the
                     * next token should be CRLF.
                     */
                    if (chunkSize - chunkRead <= 0) {
                        state = STATE_AWAITING_CHUNK_EOL;
                    } else {
                        return;
                    }
                    break;


                /**
                 * Awaiting CRLF after the chunk
                 */
                case STATE_AWAITING_CHUNK_EOL:
                    /* not available yet */
                    if (rawPos + 1 >= rawCount) {
                        return;
                    }

                    if (rawData[rawPos] != '\r') {
                        error = true;
                        throw new IOException("missing CR");
                    }
                    if (rawData[rawPos+1] != '\n') {
                        error = true;
                        throw new IOException("missing LF");
                    }
                    rawPos += 2;

                    /*
                     * Move onto the next chunk
                     */
                    state = STATE_AWAITING_CHUNK_HEADER;
                    break;


                /**
                 * The last chunk has been read so now we're waiting for optional
                 * trailers.
                 */
                case STATE_AWAITING_TRAILERS:

                    /*
                     * Do we have an entire line in the raw buffer?
                     */
                    pos = rawPos;
                    while (pos < rawCount) {
                        if (rawData[pos] == '\n') {
                            break;
                        }
                        pos++;
                    }
                    if (pos >= rawCount) {
                        return;
                    }

                    if (pos == rawPos) {
                        error = true;
                        throw new IOException("LF should be preceded by CR");
                    }
                    if (rawData[pos-1] != '\r') {
                        error = true;
                        throw new IOException("LF should be preceded by CR");
                    }

                    /*
                     * Stream is done so close the underlying stream.
                     */
                    if (pos == (rawPos + 1)) {

                        state = STATE_DONE;
                        closeUnderlying();

                        return;
                    }

                    /*
                     * Extract any trailers and append them to the message
                     * headers.
                     */
                    String trailer = new String(rawData, rawPos, pos-rawPos, "US-ASCII");
                    i = trailer.indexOf(':');
                    if (i == -1) {
                        throw new IOException("Malformed trailer - format should be key:value");
                    }
                    String key = (trailer.substring(0, i)).trim();
                    String value = (trailer.substring(i+1, trailer.length())).trim();

                    responses.add(key, value);

                    /*
                     * Move onto the next trailer.
                     */
                    rawPos = pos+1;
                    break;

            } /* switch */
        }
    }
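
    /*
     * Illustrative trace of processRaw() above (an assumed example, not
     * executed): given the raw bytes
     *
     *     3\r\nabc\r\n0\r\nX-Trailer: v\r\n\r\n
     *
     * the state machine moves through
     *
     *     STATE_AWAITING_CHUNK_HEADER -> STATE_READING_CHUNK
     *       -> STATE_AWAITING_CHUNK_EOL -> STATE_AWAITING_CHUNK_HEADER
     *       -> STATE_AWAITING_TRAILERS -> STATE_DONE
     *
     * leaving "abc" available in chunkData and the "X-Trailer" trailer added
     * to responses.
     */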

    /**
     * Reads any available bytes from the underlying stream into
     * <code>rawData</code> and returns the number of bytes of
     * chunk data available in <code>chunkData</code> that the
     * application can read.
     */
    private int readAheadNonBlocking() throws IOException {

        /*
         * If there's anything available on the underlying stream then we read
         * it into the raw buffer and process it. Processing ensures that any
         * available chunk data is made available in chunkData.
         */
        int avail = in.available();
        if (avail > 0) {

            /* ensure that there is space in rawData to read the available bytes */
            ensureRawAvailable(avail);

            int nread;
            try {
                nread = in.read(rawData, rawCount, avail);
            } catch (IOException e) {
                error = true;
                throw e;
            }
            if (nread < 0) {
                error = true;   /* premature EOF ? */
                return -1;
            }
            rawCount += nread;

            /*
             * Process the raw bytes that have been read.
             */
            processRaw();
        }

        /*
         * Return the number of chunked bytes available to read
         */
        return chunkCount - chunkPos;
    }

    /**
     * Reads from the underlying stream until there is chunk data
     * available in <code>chunkData</code> for the application to
     * read.
     */
    private int readAheadBlocking() throws IOException {

        do {
            /*
             * All of the chunked response has been read so return EOF.
             */
            if (state == STATE_DONE) {
                return -1;
            }

            /*
             * We must read into the raw buffer so make sure there is space
             * available. We use a size of 32 to avoid too much chunk data
             * being read into the raw buffer.
             */
            ensureRawAvailable(32);
            int nread;
            try {
                nread = in.read(rawData, rawCount, rawData.length-rawCount);
            } catch (IOException e) {
                error = true;
                throw e;
            }

            /**
             * If we hit EOF it means there's a problem as we should never
             * attempt to read once the last chunk and trailers have been
             * received.
             */
            if (nread < 0) {
                error = true;
                throw new IOException("Premature EOF");
            }

            /**
             * Process the bytes from the underlying stream
             */
            rawCount += nread;
            processRaw();

        } while (chunkCount <= 0);

        /*
         * Return the number of chunked bytes available to read
         */
        return chunkCount - chunkPos;
    }

    /**
     * Read ahead in either blocking or non-blocking mode. This method
     * is typically used when we run out of available bytes in
     * <code>chunkData</code> or we need to determine how many bytes
     * are available on the input stream.
     */
    private int readAhead(boolean allowBlocking) throws IOException {

        /*
         * Last chunk already received - return EOF
         */
        if (state == STATE_DONE) {
            return -1;
        }

        /*
         * Reset position/count if data in chunkData is exhausted.
         */
        if (chunkPos >= chunkCount) {
            chunkCount = 0;
            chunkPos = 0;
        }

        /*
         * Read ahead blocking or non-blocking
         */
        if (allowBlocking) {
            return readAheadBlocking();
        } else {
            return readAheadNonBlocking();
        }
    }

    /**
     * Creates a <code>ChunkedInputStream</code> and saves its arguments, for
     * later use.
     *
     * @param in the underlying input stream.
     * @param hc the HttpClient
     * @param responses the MessageHeader that should be populated with optional
     *        trailers.
     */
    public ChunkedInputStream(InputStream in, HttpClient hc, MessageHeader responses) throws IOException {

        /* save arguments */
        this.in = in;
        this.responses = responses;
        this.hc = hc;

        /*
         * Set our initial state to indicate that we are first starting to
         * look for a chunk header.
         */
        state = STATE_AWAITING_CHUNK_HEADER;
    }
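
    /*
     * Illustrative usage sketch (hypothetical names, not part of this class):
     * the HttpClient wires a ChunkedInputStream around the connection's input
     * stream roughly as follows, where "rawStream", "client" and "headers"
     * stand in for the real objects:
     *
     *     InputStream body = new ChunkedInputStream(rawStream, client, headers);
     *     byte[] buf = new byte[512];
     *     int n;
     *     while ((n = body.read(buf, 0, buf.length)) != -1) {
     *         // consume n bytes of decoded entity data
     *     }
     *     body.close();    // may return the connection to the keep-alive cache
     */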

    /**
     * See
     * the general contract of the <code>read</code>
     * method of <code>InputStream</code>.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if an I/O error occurs.
     * @see        java.io.FilterInputStream#in
     */
    public synchronized int read() throws IOException {
        ensureOpen();
        if (chunkPos >= chunkCount) {
            if (readAhead(true) <= 0) {
                return -1;
            }
        }
        return chunkData[chunkPos++] & 0xff;
    }


    /**
     * Reads bytes from this stream into the specified byte array, starting at
     * the given offset.
     *
     * @param      b     destination buffer.
     * @param      off   offset at which to start storing bytes.
     * @param      len   maximum number of bytes to read.
     * @return     the number of bytes read, or <code>-1</code> if the end of
     *             the stream has been reached.
     * @exception  IOException  if an I/O error occurs.
     */
    public synchronized int read(byte b[], int off, int len)
        throws IOException
    {
        ensureOpen();
        if ((off < 0) || (off > b.length) || (len < 0) ||
            ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int avail = chunkCount - chunkPos;
        if (avail <= 0) {
            /*
             * Optimization: if we're in the middle of the chunk read
             * directly from the underlying stream into the caller's
             * buffer
             */
            if (state == STATE_READING_CHUNK) {
                return fastRead( b, off, len );
            }

            /*
             * We're not in the middle of a chunk so we must read ahead
             * until there is some chunk data available.
             */
            avail = readAhead(true);
            if (avail < 0) {
                return -1;      /* EOF */
            }
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(chunkData, chunkPos, b, off, cnt);
        chunkPos += cnt;

        return cnt;
    }

    /**
     * Returns the number of bytes that can be read from this input
     * stream without blocking.
     *
     * @return     the number of bytes that can be read from this input
     *             stream without blocking.
     * @exception  IOException  if an I/O error occurs.
     * @see        java.io.FilterInputStream#in
     */
    public synchronized int available() throws IOException {
        ensureOpen();

        int avail = chunkCount - chunkPos;
        if (avail > 0) {
            return avail;
        }

        avail = readAhead(false);

        if (avail < 0) {
            return 0;
        } else {
            return avail;
        }
    }

    /**
     * Close the stream by either returning the connection to the
     * keep alive cache or closing the underlying stream.
     * <p>
     * If the chunked response hasn't been completely read we
     * try to "hurry" to the end of the response. If this is
     * possible (without blocking) then the connection can be
     * returned to the keep alive cache.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    public synchronized void close() throws IOException {
        if (closed) {
            return;
        }
        closeUnderlying();
        closed = true;
    }

    /**
     * Hurry the input stream by reading everything from the underlying
     * stream. If the last chunk (and optional trailers) can be read without
     * blocking then the stream is considered hurried.
     * <p>
     * Note that if an error has occurred or we can't get to the last chunk
     * without blocking then this stream can't be hurried and should be
     * closed.
     */
    public synchronized boolean hurry() {
        if (in == null || error) {
            return false;
        }

        try {
            readAhead(false);
        } catch (Exception e) {
            return false;
        }

        if (error) {
            return false;
        }

        return (state == STATE_DONE);
    }

}