• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.squareup.okhttp.internal.http;
18 
19 import com.squareup.okhttp.Connection;
20 import com.squareup.okhttp.ConnectionPool;
21 import com.squareup.okhttp.Headers;
22 import com.squareup.okhttp.Response;
23 import com.squareup.okhttp.internal.Internal;
24 import com.squareup.okhttp.internal.Util;
25 import java.io.EOFException;
26 import java.io.IOException;
27 import java.net.ProtocolException;
28 import java.net.Socket;
29 import java.net.SocketTimeoutException;
30 import okio.Buffer;
31 import okio.BufferedSink;
32 import okio.BufferedSource;
33 import okio.ForwardingTimeout;
34 import okio.Okio;
35 import okio.Sink;
36 import okio.Source;
37 import okio.Timeout;
38 
39 import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
40 import static com.squareup.okhttp.internal.http.StatusLine.HTTP_CONTINUE;
41 import static com.squareup.okhttp.internal.http.Transport.DISCARD_STREAM_TIMEOUT_MILLIS;
42 import static java.util.concurrent.TimeUnit.MILLISECONDS;
43 
44 /**
45  * A socket connection that can be used to send HTTP/1.1 messages. This class
46  * strictly enforces the following lifecycle:
47  * <ol>
48  *   <li>{@link #writeRequest Send request headers}.
49  *   <li>Open a sink to write the request body. Either {@link
50  *       #newFixedLengthSink fixed-length} or {@link #newChunkedSink chunked}.
51  *   <li>Write to and then close that sink.
52  *   <li>{@link #readResponse Read response headers}.
53  *   <li>Open a source to read the response body. Either {@link
54  *       #newFixedLengthSource fixed-length}, {@link #newChunkedSource chunked}
55  *       or {@link #newUnknownLengthSource unknown length}.
56  *   <li>Read from and close that source.
57  * </ol>
58  * <p>Exchanges that do not have a request body may skip creating and closing
59  * the request body. Exchanges that do not have a response body can call {@link
60  * #newFixedLengthSource(long) newFixedLengthSource(0)} and may skip reading and
61  * closing that source.
62  */
63 public final class HttpConnection {
64   private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
65   private static final int STATE_OPEN_REQUEST_BODY = 1;
66   private static final int STATE_WRITING_REQUEST_BODY = 2;
67   private static final int STATE_READ_RESPONSE_HEADERS = 3;
68   private static final int STATE_OPEN_RESPONSE_BODY = 4;
69   private static final int STATE_READING_RESPONSE_BODY = 5;
70   private static final int STATE_CLOSED = 6;
71 
72   private static final int ON_IDLE_HOLD = 0;
73   private static final int ON_IDLE_POOL = 1;
74   private static final int ON_IDLE_CLOSE = 2;
75 
76   private final ConnectionPool pool;
77   private final Connection connection;
78   private final Socket socket;
79   private final BufferedSource source;
80   private final BufferedSink sink;
81 
82   private int state = STATE_IDLE;
83   private int onIdle = ON_IDLE_HOLD;
84 
HttpConnection(ConnectionPool pool, Connection connection, Socket socket)85   public HttpConnection(ConnectionPool pool, Connection connection, Socket socket)
86       throws IOException {
87     this.pool = pool;
88     this.connection = connection;
89     this.socket = socket;
90     this.source = Okio.buffer(Okio.source(socket));
91     this.sink = Okio.buffer(Okio.sink(socket));
92   }
93 
setTimeouts(int readTimeoutMillis, int writeTimeoutMillis)94   public void setTimeouts(int readTimeoutMillis, int writeTimeoutMillis) {
95     if (readTimeoutMillis != 0) {
96       source.timeout().timeout(readTimeoutMillis, MILLISECONDS);
97     }
98     if (writeTimeoutMillis != 0) {
99       sink.timeout().timeout(writeTimeoutMillis, MILLISECONDS);
100     }
101   }
102 
103   /**
104    * Configure this connection to put itself back into the connection pool when
105    * the HTTP response body is exhausted.
106    */
poolOnIdle()107   public void poolOnIdle() {
108     onIdle = ON_IDLE_POOL;
109 
110     // If we're already idle, go to the pool immediately.
111     if (state == STATE_IDLE) {
112       onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default.
113       Internal.instance.recycle(pool, connection);
114     }
115   }
116 
117   /**
118    * Configure this connection to close itself when the HTTP response body is
119    * exhausted.
120    */
closeOnIdle()121   public void closeOnIdle() throws IOException {
122     onIdle = ON_IDLE_CLOSE;
123 
124     // If we're already idle, close immediately.
125     if (state == STATE_IDLE) {
126       state = STATE_CLOSED;
127       connection.getSocket().close();
128     }
129   }
130 
131   /** Returns true if this connection is closed. */
isClosed()132   public boolean isClosed() {
133     return state == STATE_CLOSED;
134   }
135 
closeIfOwnedBy(Object owner)136   public void closeIfOwnedBy(Object owner) throws IOException {
137     Internal.instance.closeIfOwnedBy(connection, owner);
138   }
139 
flush()140   public void flush() throws IOException {
141     sink.flush();
142   }
143 
144   /** Returns the number of buffered bytes immediately readable. */
bufferSize()145   public long bufferSize() {
146     return source.buffer().size();
147   }
148 
149   /** Test for a stale socket. */
isReadable()150   public boolean isReadable() {
151     try {
152       int readTimeout = socket.getSoTimeout();
153       try {
154         socket.setSoTimeout(1);
155         if (source.exhausted()) {
156           return false; // Stream is exhausted; socket is closed.
157         }
158         return true;
159       } finally {
160         socket.setSoTimeout(readTimeout);
161       }
162     } catch (SocketTimeoutException ignored) {
163       return true; // Read timed out; socket is good.
164     } catch (IOException e) {
165       return false; // Couldn't read; socket is closed.
166     }
167   }
168 
169   /** Returns bytes of a request header for sending on an HTTP transport. */
writeRequest(Headers headers, String requestLine)170   public void writeRequest(Headers headers, String requestLine) throws IOException {
171     if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
172     sink.writeUtf8(requestLine).writeUtf8("\r\n");
173     for (int i = 0, size = headers.size(); i < size; i ++) {
174       sink.writeUtf8(headers.name(i))
175           .writeUtf8(": ")
176           .writeUtf8(headers.value(i))
177           .writeUtf8("\r\n");
178     }
179     sink.writeUtf8("\r\n");
180     state = STATE_OPEN_REQUEST_BODY;
181   }
182 
183   /** Parses bytes of a response header from an HTTP transport. */
readResponse()184   public Response.Builder readResponse() throws IOException {
185     if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
186       throw new IllegalStateException("state: " + state);
187     }
188 
189     try {
190       while (true) {
191         StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
192 
193         Response.Builder responseBuilder = new Response.Builder()
194             .protocol(statusLine.protocol)
195             .code(statusLine.code)
196             .message(statusLine.message);
197 
198         Headers.Builder headersBuilder = new Headers.Builder();
199         readHeaders(headersBuilder);
200         headersBuilder.add(OkHeaders.SELECTED_PROTOCOL, statusLine.protocol.toString());
201         responseBuilder.headers(headersBuilder.build());
202 
203         if (statusLine.code != HTTP_CONTINUE) {
204           state = STATE_OPEN_RESPONSE_BODY;
205           return responseBuilder;
206         }
207       }
208     } catch (EOFException e) {
209       // Provide more context if the server ends the stream before sending a response.
210       IOException exception = new IOException("unexpected end of stream on " + connection
211           + " (recycle count=" + Internal.instance.recycleCount(connection) + ")");
212       exception.initCause(e);
213       throw exception;
214     }
215   }
216 
217   /** Reads headers or trailers into {@code builder}. */
readHeaders(Headers.Builder builder)218   public void readHeaders(Headers.Builder builder) throws IOException {
219     // parse the result headers until the first blank line
220     for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
221       Internal.instance.addLenient(builder, line);
222     }
223   }
224 
newChunkedSink()225   public Sink newChunkedSink() {
226     if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
227     state = STATE_WRITING_REQUEST_BODY;
228     return new ChunkedSink();
229   }
230 
newFixedLengthSink(long contentLength)231   public Sink newFixedLengthSink(long contentLength) {
232     if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
233     state = STATE_WRITING_REQUEST_BODY;
234     return new FixedLengthSink(contentLength);
235   }
236 
writeRequestBody(RetryableSink requestBody)237   public void writeRequestBody(RetryableSink requestBody) throws IOException {
238     if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
239     state = STATE_READ_RESPONSE_HEADERS;
240     requestBody.writeToSocket(sink);
241   }
242 
newFixedLengthSource(long length)243   public Source newFixedLengthSource(long length) throws IOException {
244     if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
245     state = STATE_READING_RESPONSE_BODY;
246     return new FixedLengthSource(length);
247   }
248 
newChunkedSource(HttpEngine httpEngine)249   public Source newChunkedSource(HttpEngine httpEngine) throws IOException {
250     if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
251     state = STATE_READING_RESPONSE_BODY;
252     return new ChunkedSource(httpEngine);
253   }
254 
newUnknownLengthSource()255   public Source newUnknownLengthSource() throws IOException {
256     if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
257     state = STATE_READING_RESPONSE_BODY;
258     return new UnknownLengthSource();
259   }
260 
rawSink()261   public BufferedSink rawSink() {
262     return sink;
263   }
264 
rawSource()265   public BufferedSource rawSource() {
266     return source;
267   }
268 
269   /**
270    * Sets the delegate of {@code timeout} to {@link Timeout#NONE} and resets its underlying timeout
271    * to the default configuration. Use this to avoid unexpected sharing of timeouts between pooled
272    * connections.
273    */
detachTimeout(ForwardingTimeout timeout)274   private void detachTimeout(ForwardingTimeout timeout) {
275     Timeout oldDelegate = timeout.delegate();
276     timeout.setDelegate(Timeout.NONE);
277     oldDelegate.clearDeadline();
278     oldDelegate.clearTimeout();
279   }
280 
281   /** An HTTP body with a fixed length known in advance. */
282   private final class FixedLengthSink implements Sink {
283     private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
284     private boolean closed;
285     private long bytesRemaining;
286 
FixedLengthSink(long bytesRemaining)287     private FixedLengthSink(long bytesRemaining) {
288       this.bytesRemaining = bytesRemaining;
289     }
290 
timeout()291     @Override public Timeout timeout() {
292       return timeout;
293     }
294 
write(Buffer source, long byteCount)295     @Override public void write(Buffer source, long byteCount) throws IOException {
296       if (closed) throw new IllegalStateException("closed");
297       checkOffsetAndCount(source.size(), 0, byteCount);
298       if (byteCount > bytesRemaining) {
299         throw new ProtocolException("expected " + bytesRemaining
300             + " bytes but received " + byteCount);
301       }
302       sink.write(source, byteCount);
303       bytesRemaining -= byteCount;
304     }
305 
flush()306     @Override public void flush() throws IOException {
307       if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
308       sink.flush();
309     }
310 
close()311     @Override public void close() throws IOException {
312       if (closed) return;
313       closed = true;
314       if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
315       detachTimeout(timeout);
316       state = STATE_READ_RESPONSE_HEADERS;
317     }
318   }
319 
320   /**
321    * An HTTP body with alternating chunk sizes and chunk bodies. It is the
322    * caller's responsibility to buffer chunks; typically by using a buffered
323    * sink with this sink.
324    */
325   private final class ChunkedSink implements Sink {
326     private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
327     private boolean closed;
328 
timeout()329     @Override public Timeout timeout() {
330       return timeout;
331     }
332 
write(Buffer source, long byteCount)333     @Override public void write(Buffer source, long byteCount) throws IOException {
334       if (closed) throw new IllegalStateException("closed");
335       if (byteCount == 0) return;
336 
337       sink.writeHexadecimalUnsignedLong(byteCount);
338       sink.writeUtf8("\r\n");
339       sink.write(source, byteCount);
340       sink.writeUtf8("\r\n");
341     }
342 
flush()343     @Override public synchronized void flush() throws IOException {
344       if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
345       sink.flush();
346     }
347 
close()348     @Override public synchronized void close() throws IOException {
349       if (closed) return;
350       closed = true;
351       sink.writeUtf8("0\r\n\r\n");
352       detachTimeout(timeout);
353       state = STATE_READ_RESPONSE_HEADERS;
354     }
355   }
356 
357   private abstract class AbstractSource implements Source {
358     protected final ForwardingTimeout timeout = new ForwardingTimeout(source.timeout());
359     protected boolean closed;
360 
timeout()361     @Override public Timeout timeout() {
362       return timeout;
363     }
364 
365     /**
366      * Closes the cache entry and makes the socket available for reuse. This
367      * should be invoked when the end of the body has been reached.
368      */
endOfInput(boolean recyclable)369     protected final void endOfInput(boolean recyclable) throws IOException {
370       if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
371 
372       detachTimeout(timeout);
373 
374       state = STATE_IDLE;
375       if (recyclable && onIdle == ON_IDLE_POOL) {
376         onIdle = ON_IDLE_HOLD; // Set the on idle policy back to the default.
377         Internal.instance.recycle(pool, connection);
378       } else if (onIdle == ON_IDLE_CLOSE) {
379         state = STATE_CLOSED;
380         connection.getSocket().close();
381       }
382     }
383 
384     /**
385      * Calls abort on the cache entry and disconnects the socket. This
386      * should be invoked when the connection is closed unexpectedly to
387      * invalidate the cache entry and to prevent the HTTP connection from
388      * being reused. HTTP messages are sent in serial so whenever a message
389      * cannot be read to completion, subsequent messages cannot be read
390      * either and the connection must be discarded.
391      *
392      * <p>An earlier implementation skipped the remaining bytes, but this
393      * requires that the entire transfer be completed. If the intention was
394      * to cancel the transfer, closing the connection is the only solution.
395      */
unexpectedEndOfInput()396     protected final void unexpectedEndOfInput() {
397       Util.closeQuietly(connection.getSocket());
398       state = STATE_CLOSED;
399     }
400   }
401 
402   /** An HTTP body with a fixed length specified in advance. */
403   private class FixedLengthSource extends AbstractSource {
404     private long bytesRemaining;
405 
FixedLengthSource(long length)406     public FixedLengthSource(long length) throws IOException {
407       bytesRemaining = length;
408       if (bytesRemaining == 0) {
409         endOfInput(true);
410       }
411     }
412 
read(Buffer sink, long byteCount)413     @Override public long read(Buffer sink, long byteCount) throws IOException {
414       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
415       if (closed) throw new IllegalStateException("closed");
416       if (bytesRemaining == 0) return -1;
417 
418       long read = source.read(sink, Math.min(bytesRemaining, byteCount));
419       if (read == -1) {
420         unexpectedEndOfInput(); // The server didn't supply the promised content length.
421         throw new ProtocolException("unexpected end of stream");
422       }
423 
424       bytesRemaining -= read;
425       if (bytesRemaining == 0) {
426         endOfInput(true);
427       }
428       return read;
429     }
430 
close()431     @Override public void close() throws IOException {
432       if (closed) return;
433 
434       if (bytesRemaining != 0
435           && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
436         unexpectedEndOfInput();
437       }
438 
439       closed = true;
440     }
441   }
442 
443   /** An HTTP body with alternating chunk sizes and chunk bodies. */
444   private class ChunkedSource extends AbstractSource {
445     private static final long NO_CHUNK_YET = -1L;
446     private long bytesRemainingInChunk = NO_CHUNK_YET;
447     private boolean hasMoreChunks = true;
448     private final HttpEngine httpEngine;
449 
ChunkedSource(HttpEngine httpEngine)450     ChunkedSource(HttpEngine httpEngine) throws IOException {
451       this.httpEngine = httpEngine;
452     }
453 
read(Buffer sink, long byteCount)454     @Override public long read(Buffer sink, long byteCount) throws IOException {
455       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
456       if (closed) throw new IllegalStateException("closed");
457       if (!hasMoreChunks) return -1;
458 
459       if (bytesRemainingInChunk == 0 || bytesRemainingInChunk == NO_CHUNK_YET) {
460         readChunkSize();
461         if (!hasMoreChunks) return -1;
462       }
463 
464       long read = source.read(sink, Math.min(byteCount, bytesRemainingInChunk));
465       if (read == -1) {
466         unexpectedEndOfInput(); // The server didn't supply the promised chunk length.
467         throw new ProtocolException("unexpected end of stream");
468       }
469       bytesRemainingInChunk -= read;
470       return read;
471     }
472 
readChunkSize()473     private void readChunkSize() throws IOException {
474       // Read the suffix of the previous chunk.
475       if (bytesRemainingInChunk != NO_CHUNK_YET) {
476         source.readUtf8LineStrict();
477       }
478       try {
479         bytesRemainingInChunk = source.readHexadecimalUnsignedLong();
480         String extensions = source.readUtf8LineStrict().trim();
481         if (bytesRemainingInChunk < 0 || (!extensions.isEmpty() && !extensions.startsWith(";"))) {
482           throw new ProtocolException("expected chunk size and optional extensions but was \""
483               + bytesRemainingInChunk + extensions + "\"");
484         }
485       } catch (NumberFormatException e) {
486         throw new ProtocolException(e.getMessage());
487       }
488       if (bytesRemainingInChunk == 0L) {
489         hasMoreChunks = false;
490         Headers.Builder trailersBuilder = new Headers.Builder();
491         readHeaders(trailersBuilder);
492         httpEngine.receiveHeaders(trailersBuilder.build());
493         endOfInput(true);
494       }
495     }
496 
close()497     @Override public void close() throws IOException {
498       if (closed) return;
499       if (hasMoreChunks && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
500         unexpectedEndOfInput();
501       }
502       closed = true;
503     }
504   }
505 
506   /** An HTTP message body terminated by the end of the underlying stream. */
507   private class UnknownLengthSource extends AbstractSource {
508     private boolean inputExhausted;
509 
read(Buffer sink, long byteCount)510     @Override public long read(Buffer sink, long byteCount)
511         throws IOException {
512       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
513       if (closed) throw new IllegalStateException("closed");
514       if (inputExhausted) return -1;
515 
516       long read = source.read(sink, byteCount);
517       if (read == -1) {
518         inputExhausted = true;
519         endOfInput(false);
520         return -1;
521       }
522       return read;
523     }
524 
close()525     @Override public void close() throws IOException {
526       if (closed) return;
527       if (!inputExhausted) {
528         unexpectedEndOfInput();
529       }
530       closed = true;
531     }
532   }
533 }
534