1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.squareup.okhttp.internal.http; 18 19 import com.squareup.okhttp.Headers; 20 import com.squareup.okhttp.Request; 21 import com.squareup.okhttp.Response; 22 import com.squareup.okhttp.ResponseBody; 23 import com.squareup.okhttp.internal.Internal; 24 import com.squareup.okhttp.internal.Util; 25 import com.squareup.okhttp.internal.io.RealConnection; 26 import java.io.EOFException; 27 import java.io.IOException; 28 import java.net.ProtocolException; 29 import okio.Buffer; 30 import okio.BufferedSink; 31 import okio.BufferedSource; 32 import okio.ForwardingTimeout; 33 import okio.Okio; 34 import okio.Sink; 35 import okio.Source; 36 import okio.Timeout; 37 38 import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; 39 import static com.squareup.okhttp.internal.http.StatusLine.HTTP_CONTINUE; 40 import static java.util.concurrent.TimeUnit.MILLISECONDS; 41 42 /** 43 * A socket connection that can be used to send HTTP/1.1 messages. This class 44 * strictly enforces the following lifecycle: 45 * <ol> 46 * <li>{@link #writeRequest Send request headers}. 47 * <li>Open a sink to write the request body. Either {@link 48 * #newFixedLengthSink fixed-length} or {@link #newChunkedSink chunked}. 49 * <li>Write to and then close that sink. 50 * <li>{@link #readResponse Read response headers}. 51 * <li>Open a source to read the response body. Either {@link 52 * #newFixedLengthSource fixed-length}, {@link #newChunkedSource chunked} 53 * or {@link #newUnknownLengthSource unknown length}. 54 * <li>Read from and close that source. 55 * </ol> 56 * <p>Exchanges that do not have a request body may skip creating and closing 57 * the request body. Exchanges that do not have a response body can call {@link 58 * #newFixedLengthSource(long) newFixedLengthSource(0)} and may skip reading and 59 * closing that source. 60 */ 61 public final class Http1xStream implements HttpStream { 62 private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers. 63 private static final int STATE_OPEN_REQUEST_BODY = 1; 64 private static final int STATE_WRITING_REQUEST_BODY = 2; 65 private static final int STATE_READ_RESPONSE_HEADERS = 3; 66 private static final int STATE_OPEN_RESPONSE_BODY = 4; 67 private static final int STATE_READING_RESPONSE_BODY = 5; 68 private static final int STATE_CLOSED = 6; 69 70 /** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */ 71 private final StreamAllocation streamAllocation; 72 private final BufferedSource source; 73 private final BufferedSink sink; 74 private HttpEngine httpEngine; 75 private int state = STATE_IDLE; 76 Http1xStream(StreamAllocation streamAllocation, BufferedSource source, BufferedSink sink)77 public Http1xStream(StreamAllocation streamAllocation, BufferedSource source, BufferedSink sink) { 78 this.streamAllocation = streamAllocation; 79 this.source = source; 80 this.sink = sink; 81 } 82 setHttpEngine(HttpEngine httpEngine)83 @Override public void setHttpEngine(HttpEngine httpEngine) { 84 this.httpEngine = httpEngine; 85 } 86 createRequestBody(Request request, long contentLength)87 @Override public Sink createRequestBody(Request request, long contentLength) throws IOException { 88 if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) { 89 // Stream a request body of unknown length. 90 return newChunkedSink(); 91 } 92 93 if (contentLength != -1) { 94 // Stream a request body of a known length. 95 return newFixedLengthSink(contentLength); 96 } 97 98 throw new IllegalStateException( 99 "Cannot stream a request body without chunked encoding or a known content length!"); 100 } 101 cancel()102 @Override public void cancel() { 103 RealConnection connection = streamAllocation.connection(); 104 if (connection != null) connection.cancel(); 105 } 106 107 /** 108 * Prepares the HTTP headers and sends them to the server. 109 * 110 * <p>For streaming requests with a body, headers must be prepared 111 * <strong>before</strong> the output stream has been written to. Otherwise 112 * the body would need to be buffered! 113 * 114 * <p>For non-streaming requests with a body, headers must be prepared 115 * <strong>after</strong> the output stream has been written to and closed. 116 * This ensures that the {@code Content-Length} header field receives the 117 * proper value. 118 */ writeRequestHeaders(Request request)119 @Override public void writeRequestHeaders(Request request) throws IOException { 120 httpEngine.writingRequestHeaders(); 121 String requestLine = RequestLine.get( 122 request, httpEngine.getConnection().getRoute().getProxy().type()); 123 writeRequest(request.headers(), requestLine); 124 } 125 readResponseHeaders()126 @Override public Response.Builder readResponseHeaders() throws IOException { 127 return readResponse(); 128 } 129 openResponseBody(Response response)130 @Override public ResponseBody openResponseBody(Response response) throws IOException { 131 Source source = getTransferStream(response); 132 return new RealResponseBody(response.headers(), Okio.buffer(source)); 133 } 134 getTransferStream(Response response)135 private Source getTransferStream(Response response) throws IOException { 136 if (!HttpEngine.hasBody(response)) { 137 return newFixedLengthSource(0); 138 } 139 140 if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) { 141 return newChunkedSource(httpEngine); 142 } 143 144 long contentLength = OkHeaders.contentLength(response); 145 if (contentLength != -1) { 146 return newFixedLengthSource(contentLength); 147 } 148 149 // Wrap the input stream from the connection (rather than just returning 150 // "socketIn" directly here), so that we can control its use after the 151 // reference escapes. 152 return newUnknownLengthSource(); 153 } 154 155 /** Returns true if this connection is closed. */ isClosed()156 public boolean isClosed() { 157 return state == STATE_CLOSED; 158 } 159 finishRequest()160 @Override public void finishRequest() throws IOException { 161 sink.flush(); 162 } 163 164 /** Returns bytes of a request header for sending on an HTTP transport. */ writeRequest(Headers headers, String requestLine)165 public void writeRequest(Headers headers, String requestLine) throws IOException { 166 if (state != STATE_IDLE) throw new IllegalStateException("state: " + state); 167 sink.writeUtf8(requestLine).writeUtf8("\r\n"); 168 for (int i = 0, size = headers.size(); i < size; i ++) { 169 sink.writeUtf8(headers.name(i)) 170 .writeUtf8(": ") 171 .writeUtf8(headers.value(i)) 172 .writeUtf8("\r\n"); 173 } 174 sink.writeUtf8("\r\n"); 175 state = STATE_OPEN_REQUEST_BODY; 176 } 177 178 /** Parses bytes of a response header from an HTTP transport. */ readResponse()179 public Response.Builder readResponse() throws IOException { 180 if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) { 181 throw new IllegalStateException("state: " + state); 182 } 183 184 try { 185 while (true) { 186 StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict()); 187 188 Response.Builder responseBuilder = new Response.Builder() 189 .protocol(statusLine.protocol) 190 .code(statusLine.code) 191 .message(statusLine.message) 192 .headers(readHeaders()); 193 194 if (statusLine.code != HTTP_CONTINUE) { 195 state = STATE_OPEN_RESPONSE_BODY; 196 return responseBuilder; 197 } 198 } 199 } catch (EOFException e) { 200 // Provide more context if the server ends the stream before sending a response. 201 IOException exception = new IOException("unexpected end of stream on " + streamAllocation); 202 exception.initCause(e); 203 throw exception; 204 } 205 } 206 207 /** Reads headers or trailers. */ readHeaders()208 public Headers readHeaders() throws IOException { 209 Headers.Builder headers = new Headers.Builder(); 210 // parse the result headers until the first blank line 211 for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) { 212 Internal.instance.addLenient(headers, line); 213 } 214 return headers.build(); 215 } 216 newChunkedSink()217 public Sink newChunkedSink() { 218 if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state); 219 state = STATE_WRITING_REQUEST_BODY; 220 return new ChunkedSink(); 221 } 222 newFixedLengthSink(long contentLength)223 public Sink newFixedLengthSink(long contentLength) { 224 if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state); 225 state = STATE_WRITING_REQUEST_BODY; 226 return new FixedLengthSink(contentLength); 227 } 228 writeRequestBody(RetryableSink requestBody)229 @Override public void writeRequestBody(RetryableSink requestBody) throws IOException { 230 if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state); 231 state = STATE_READ_RESPONSE_HEADERS; 232 requestBody.writeToSocket(sink); 233 } 234 newFixedLengthSource(long length)235 public Source newFixedLengthSource(long length) throws IOException { 236 if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); 237 state = STATE_READING_RESPONSE_BODY; 238 return new FixedLengthSource(length); 239 } 240 newChunkedSource(HttpEngine httpEngine)241 public Source newChunkedSource(HttpEngine httpEngine) throws IOException { 242 if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); 243 state = STATE_READING_RESPONSE_BODY; 244 return new ChunkedSource(httpEngine); 245 } 246 newUnknownLengthSource()247 public Source newUnknownLengthSource() throws IOException { 248 if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state); 249 if (streamAllocation == null) throw new IllegalStateException("streamAllocation == null"); 250 state = STATE_READING_RESPONSE_BODY; 251 streamAllocation.noNewStreams(); 252 return new UnknownLengthSource(); 253 } 254 255 /** 256 * Sets the delegate of {@code timeout} to {@link Timeout#NONE} and resets its underlying timeout 257 * to the default configuration. Use this to avoid unexpected sharing of timeouts between pooled 258 * connections. 259 */ detachTimeout(ForwardingTimeout timeout)260 private void detachTimeout(ForwardingTimeout timeout) { 261 Timeout oldDelegate = timeout.delegate(); 262 timeout.setDelegate(Timeout.NONE); 263 oldDelegate.clearDeadline(); 264 oldDelegate.clearTimeout(); 265 } 266 267 /** An HTTP body with a fixed length known in advance. */ 268 private final class FixedLengthSink implements Sink { 269 private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout()); 270 private boolean closed; 271 private long bytesRemaining; 272 FixedLengthSink(long bytesRemaining)273 private FixedLengthSink(long bytesRemaining) { 274 this.bytesRemaining = bytesRemaining; 275 } 276 timeout()277 @Override public Timeout timeout() { 278 return timeout; 279 } 280 write(Buffer source, long byteCount)281 @Override public void write(Buffer source, long byteCount) throws IOException { 282 if (closed) throw new IllegalStateException("closed"); 283 checkOffsetAndCount(source.size(), 0, byteCount); 284 if (byteCount > bytesRemaining) { 285 throw new ProtocolException("expected " + bytesRemaining 286 + " bytes but received " + byteCount); 287 } 288 sink.write(source, byteCount); 289 bytesRemaining -= byteCount; 290 } 291 flush()292 @Override public void flush() throws IOException { 293 if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf. 294 sink.flush(); 295 } 296 close()297 @Override public void close() throws IOException { 298 if (closed) return; 299 closed = true; 300 if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream"); 301 detachTimeout(timeout); 302 state = STATE_READ_RESPONSE_HEADERS; 303 } 304 } 305 306 /** 307 * An HTTP body with alternating chunk sizes and chunk bodies. It is the 308 * caller's responsibility to buffer chunks; typically by using a buffered 309 * sink with this sink. 310 */ 311 private final class ChunkedSink implements Sink { 312 private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout()); 313 private boolean closed; 314 timeout()315 @Override public Timeout timeout() { 316 return timeout; 317 } 318 write(Buffer source, long byteCount)319 @Override public void write(Buffer source, long byteCount) throws IOException { 320 if (closed) throw new IllegalStateException("closed"); 321 if (byteCount == 0) return; 322 323 sink.writeHexadecimalUnsignedLong(byteCount); 324 sink.writeUtf8("\r\n"); 325 sink.write(source, byteCount); 326 sink.writeUtf8("\r\n"); 327 } 328 flush()329 @Override public synchronized void flush() throws IOException { 330 if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf. 331 sink.flush(); 332 } 333 close()334 @Override public synchronized void close() throws IOException { 335 if (closed) return; 336 closed = true; 337 sink.writeUtf8("0\r\n\r\n"); 338 detachTimeout(timeout); 339 state = STATE_READ_RESPONSE_HEADERS; 340 } 341 } 342 343 private abstract class AbstractSource implements Source { 344 protected final ForwardingTimeout timeout = new ForwardingTimeout(source.timeout()); 345 protected boolean closed; 346 timeout()347 @Override public Timeout timeout() { 348 return timeout; 349 } 350 351 /** 352 * Closes the cache entry and makes the socket available for reuse. This 353 * should be invoked when the end of the body has been reached. 354 */ endOfInput()355 protected final void endOfInput() throws IOException { 356 if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state); 357 358 detachTimeout(timeout); 359 360 state = STATE_CLOSED; 361 if (streamAllocation != null) { 362 streamAllocation.streamFinished(Http1xStream.this); 363 } 364 } 365 unexpectedEndOfInput()366 protected final void unexpectedEndOfInput() { 367 if (state == STATE_CLOSED) return; 368 369 state = STATE_CLOSED; 370 if (streamAllocation != null) { 371 streamAllocation.noNewStreams(); 372 streamAllocation.streamFinished(Http1xStream.this); 373 } 374 } 375 } 376 377 /** An HTTP body with a fixed length specified in advance. */ 378 private class FixedLengthSource extends AbstractSource { 379 private long bytesRemaining; 380 FixedLengthSource(long length)381 public FixedLengthSource(long length) throws IOException { 382 bytesRemaining = length; 383 if (bytesRemaining == 0) { 384 endOfInput(); 385 } 386 } 387 read(Buffer sink, long byteCount)388 @Override public long read(Buffer sink, long byteCount) throws IOException { 389 if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); 390 if (closed) throw new IllegalStateException("closed"); 391 if (bytesRemaining == 0) return -1; 392 393 long read = source.read(sink, Math.min(bytesRemaining, byteCount)); 394 if (read == -1) { 395 unexpectedEndOfInput(); // The server didn't supply the promised content length. 396 throw new ProtocolException("unexpected end of stream"); 397 } 398 399 bytesRemaining -= read; 400 if (bytesRemaining == 0) { 401 endOfInput(); 402 } 403 return read; 404 } 405 close()406 @Override public void close() throws IOException { 407 if (closed) return; 408 409 if (bytesRemaining != 0 410 && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) { 411 unexpectedEndOfInput(); 412 } 413 414 closed = true; 415 } 416 } 417 418 /** An HTTP body with alternating chunk sizes and chunk bodies. */ 419 private class ChunkedSource extends AbstractSource { 420 private static final long NO_CHUNK_YET = -1L; 421 private long bytesRemainingInChunk = NO_CHUNK_YET; 422 private boolean hasMoreChunks = true; 423 private final HttpEngine httpEngine; 424 ChunkedSource(HttpEngine httpEngine)425 ChunkedSource(HttpEngine httpEngine) throws IOException { 426 this.httpEngine = httpEngine; 427 } 428 read(Buffer sink, long byteCount)429 @Override public long read(Buffer sink, long byteCount) throws IOException { 430 if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); 431 if (closed) throw new IllegalStateException("closed"); 432 if (!hasMoreChunks) return -1; 433 434 if (bytesRemainingInChunk == 0 || bytesRemainingInChunk == NO_CHUNK_YET) { 435 readChunkSize(); 436 if (!hasMoreChunks) return -1; 437 } 438 439 long read = source.read(sink, Math.min(byteCount, bytesRemainingInChunk)); 440 if (read == -1) { 441 unexpectedEndOfInput(); // The server didn't supply the promised chunk length. 442 throw new ProtocolException("unexpected end of stream"); 443 } 444 bytesRemainingInChunk -= read; 445 return read; 446 } 447 readChunkSize()448 private void readChunkSize() throws IOException { 449 // Read the suffix of the previous chunk. 450 if (bytesRemainingInChunk != NO_CHUNK_YET) { 451 source.readUtf8LineStrict(); 452 } 453 try { 454 bytesRemainingInChunk = source.readHexadecimalUnsignedLong(); 455 String extensions = source.readUtf8LineStrict().trim(); 456 if (bytesRemainingInChunk < 0 || (!extensions.isEmpty() && !extensions.startsWith(";"))) { 457 throw new ProtocolException("expected chunk size and optional extensions but was \"" 458 + bytesRemainingInChunk + extensions + "\""); 459 } 460 } catch (NumberFormatException e) { 461 throw new ProtocolException(e.getMessage()); 462 } 463 if (bytesRemainingInChunk == 0L) { 464 hasMoreChunks = false; 465 httpEngine.receiveHeaders(readHeaders()); 466 endOfInput(); 467 } 468 } 469 close()470 @Override public void close() throws IOException { 471 if (closed) return; 472 if (hasMoreChunks && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) { 473 unexpectedEndOfInput(); 474 } 475 closed = true; 476 } 477 } 478 479 /** An HTTP message body terminated by the end of the underlying stream. */ 480 private class UnknownLengthSource extends AbstractSource { 481 private boolean inputExhausted; 482 read(Buffer sink, long byteCount)483 @Override public long read(Buffer sink, long byteCount) 484 throws IOException { 485 if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); 486 if (closed) throw new IllegalStateException("closed"); 487 if (inputExhausted) return -1; 488 489 long read = source.read(sink, byteCount); 490 if (read == -1) { 491 inputExhausted = true; 492 endOfInput(); 493 return -1; 494 } 495 return read; 496 } 497 close()498 @Override public void close() throws IOException { 499 if (closed) return; 500 if (!inputExhausted) { 501 unexpectedEndOfInput(); 502 } 503 closed = true; 504 } 505 } 506 } 507