• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.squareup.okhttp.internal.http;
18 
19 import com.squareup.okhttp.Headers;
20 import com.squareup.okhttp.Request;
21 import com.squareup.okhttp.Response;
22 import com.squareup.okhttp.ResponseBody;
23 import com.squareup.okhttp.internal.Internal;
24 import com.squareup.okhttp.internal.Util;
25 import com.squareup.okhttp.internal.io.RealConnection;
26 import java.io.EOFException;
27 import java.io.IOException;
28 import java.net.ProtocolException;
29 import okio.Buffer;
30 import okio.BufferedSink;
31 import okio.BufferedSource;
32 import okio.ForwardingTimeout;
33 import okio.Okio;
34 import okio.Sink;
35 import okio.Source;
36 import okio.Timeout;
37 
38 import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
39 import static com.squareup.okhttp.internal.http.StatusLine.HTTP_CONTINUE;
40 import static java.util.concurrent.TimeUnit.MILLISECONDS;
41 
42 /**
43  * A socket connection that can be used to send HTTP/1.1 messages. This class
44  * strictly enforces the following lifecycle:
45  * <ol>
46  *   <li>{@link #writeRequest Send request headers}.
47  *   <li>Open a sink to write the request body. Either {@link
48  *       #newFixedLengthSink fixed-length} or {@link #newChunkedSink chunked}.
49  *   <li>Write to and then close that sink.
50  *   <li>{@link #readResponse Read response headers}.
51  *   <li>Open a source to read the response body. Either {@link
52  *       #newFixedLengthSource fixed-length}, {@link #newChunkedSource chunked}
53  *       or {@link #newUnknownLengthSource unknown length}.
54  *   <li>Read from and close that source.
55  * </ol>
56  * <p>Exchanges that do not have a request body may skip creating and closing
57  * the request body. Exchanges that do not have a response body can call {@link
58  * #newFixedLengthSource(long) newFixedLengthSource(0)} and may skip reading and
59  * closing that source.
60  */
61 public final class Http1xStream implements HttpStream {
62   private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
63   private static final int STATE_OPEN_REQUEST_BODY = 1;
64   private static final int STATE_WRITING_REQUEST_BODY = 2;
65   private static final int STATE_READ_RESPONSE_HEADERS = 3;
66   private static final int STATE_OPEN_RESPONSE_BODY = 4;
67   private static final int STATE_READING_RESPONSE_BODY = 5;
68   private static final int STATE_CLOSED = 6;
69 
70   /** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
71   private final StreamAllocation streamAllocation;
72   private final BufferedSource source;
73   private final BufferedSink sink;
74   private HttpEngine httpEngine;
75   private int state = STATE_IDLE;
76 
Http1xStream(StreamAllocation streamAllocation, BufferedSource source, BufferedSink sink)77   public Http1xStream(StreamAllocation streamAllocation, BufferedSource source, BufferedSink sink) {
78     this.streamAllocation = streamAllocation;
79     this.source = source;
80     this.sink = sink;
81   }
82 
setHttpEngine(HttpEngine httpEngine)83   @Override public void setHttpEngine(HttpEngine httpEngine) {
84     this.httpEngine = httpEngine;
85   }
86 
createRequestBody(Request request, long contentLength)87   @Override public Sink createRequestBody(Request request, long contentLength) throws IOException {
88     if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
89       // Stream a request body of unknown length.
90       return newChunkedSink();
91     }
92 
93     if (contentLength != -1) {
94       // Stream a request body of a known length.
95       return newFixedLengthSink(contentLength);
96     }
97 
98     throw new IllegalStateException(
99         "Cannot stream a request body without chunked encoding or a known content length!");
100   }
101 
cancel()102   @Override public void cancel() {
103     RealConnection connection = streamAllocation.connection();
104     if (connection != null) connection.cancel();
105   }
106 
107   /**
108    * Prepares the HTTP headers and sends them to the server.
109    *
110    * <p>For streaming requests with a body, headers must be prepared
111    * <strong>before</strong> the output stream has been written to. Otherwise
112    * the body would need to be buffered!
113    *
114    * <p>For non-streaming requests with a body, headers must be prepared
115    * <strong>after</strong> the output stream has been written to and closed.
116    * This ensures that the {@code Content-Length} header field receives the
117    * proper value.
118    */
writeRequestHeaders(Request request)119   @Override public void writeRequestHeaders(Request request) throws IOException {
120     httpEngine.writingRequestHeaders();
121     String requestLine = RequestLine.get(
122         request, httpEngine.getConnection().getRoute().getProxy().type());
123     writeRequest(request.headers(), requestLine);
124   }
125 
readResponseHeaders()126   @Override public Response.Builder readResponseHeaders() throws IOException {
127     return readResponse();
128   }
129 
openResponseBody(Response response)130   @Override public ResponseBody openResponseBody(Response response) throws IOException {
131     Source source = getTransferStream(response);
132     return new RealResponseBody(response.headers(), Okio.buffer(source));
133   }
134 
getTransferStream(Response response)135   private Source getTransferStream(Response response) throws IOException {
136     if (!HttpEngine.hasBody(response)) {
137       return newFixedLengthSource(0);
138     }
139 
140     if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
141       return newChunkedSource(httpEngine);
142     }
143 
144     long contentLength = OkHeaders.contentLength(response);
145     if (contentLength != -1) {
146       return newFixedLengthSource(contentLength);
147     }
148 
149     // Wrap the input stream from the connection (rather than just returning
150     // "socketIn" directly here), so that we can control its use after the
151     // reference escapes.
152     return newUnknownLengthSource();
153   }
154 
155   /** Returns true if this connection is closed. */
isClosed()156   public boolean isClosed() {
157     return state == STATE_CLOSED;
158   }
159 
finishRequest()160   @Override public void finishRequest() throws IOException {
161     sink.flush();
162   }
163 
164   /** Returns bytes of a request header for sending on an HTTP transport. */
writeRequest(Headers headers, String requestLine)165   public void writeRequest(Headers headers, String requestLine) throws IOException {
166     if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
167     sink.writeUtf8(requestLine).writeUtf8("\r\n");
168     for (int i = 0, size = headers.size(); i < size; i ++) {
169       sink.writeUtf8(headers.name(i))
170           .writeUtf8(": ")
171           .writeUtf8(headers.value(i))
172           .writeUtf8("\r\n");
173     }
174     sink.writeUtf8("\r\n");
175     state = STATE_OPEN_REQUEST_BODY;
176   }
177 
178   /** Parses bytes of a response header from an HTTP transport. */
readResponse()179   public Response.Builder readResponse() throws IOException {
180     if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
181       throw new IllegalStateException("state: " + state);
182     }
183 
184     try {
185       while (true) {
186         StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
187 
188         Response.Builder responseBuilder = new Response.Builder()
189             .protocol(statusLine.protocol)
190             .code(statusLine.code)
191             .message(statusLine.message)
192             .headers(readHeaders());
193 
194         if (statusLine.code != HTTP_CONTINUE) {
195           state = STATE_OPEN_RESPONSE_BODY;
196           return responseBuilder;
197         }
198       }
199     } catch (EOFException e) {
200       // Provide more context if the server ends the stream before sending a response.
201       IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
202       exception.initCause(e);
203       throw exception;
204     }
205   }
206 
207   /** Reads headers or trailers. */
readHeaders()208   public Headers readHeaders() throws IOException {
209     Headers.Builder headers = new Headers.Builder();
210     // parse the result headers until the first blank line
211     for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
212       Internal.instance.addLenient(headers, line);
213     }
214     return headers.build();
215   }
216 
newChunkedSink()217   public Sink newChunkedSink() {
218     if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
219     state = STATE_WRITING_REQUEST_BODY;
220     return new ChunkedSink();
221   }
222 
newFixedLengthSink(long contentLength)223   public Sink newFixedLengthSink(long contentLength) {
224     if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
225     state = STATE_WRITING_REQUEST_BODY;
226     return new FixedLengthSink(contentLength);
227   }
228 
writeRequestBody(RetryableSink requestBody)229   @Override public void writeRequestBody(RetryableSink requestBody) throws IOException {
230     if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
231     state = STATE_READ_RESPONSE_HEADERS;
232     requestBody.writeToSocket(sink);
233   }
234 
newFixedLengthSource(long length)235   public Source newFixedLengthSource(long length) throws IOException {
236     if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
237     state = STATE_READING_RESPONSE_BODY;
238     return new FixedLengthSource(length);
239   }
240 
newChunkedSource(HttpEngine httpEngine)241   public Source newChunkedSource(HttpEngine httpEngine) throws IOException {
242     if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
243     state = STATE_READING_RESPONSE_BODY;
244     return new ChunkedSource(httpEngine);
245   }
246 
newUnknownLengthSource()247   public Source newUnknownLengthSource() throws IOException {
248     if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
249     if (streamAllocation == null) throw new IllegalStateException("streamAllocation == null");
250     state = STATE_READING_RESPONSE_BODY;
251     streamAllocation.noNewStreams();
252     return new UnknownLengthSource();
253   }
254 
255   /**
256    * Sets the delegate of {@code timeout} to {@link Timeout#NONE} and resets its underlying timeout
257    * to the default configuration. Use this to avoid unexpected sharing of timeouts between pooled
258    * connections.
259    */
detachTimeout(ForwardingTimeout timeout)260   private void detachTimeout(ForwardingTimeout timeout) {
261     Timeout oldDelegate = timeout.delegate();
262     timeout.setDelegate(Timeout.NONE);
263     oldDelegate.clearDeadline();
264     oldDelegate.clearTimeout();
265   }
266 
267   /** An HTTP body with a fixed length known in advance. */
268   private final class FixedLengthSink implements Sink {
269     private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
270     private boolean closed;
271     private long bytesRemaining;
272 
FixedLengthSink(long bytesRemaining)273     private FixedLengthSink(long bytesRemaining) {
274       this.bytesRemaining = bytesRemaining;
275     }
276 
timeout()277     @Override public Timeout timeout() {
278       return timeout;
279     }
280 
write(Buffer source, long byteCount)281     @Override public void write(Buffer source, long byteCount) throws IOException {
282       if (closed) throw new IllegalStateException("closed");
283       checkOffsetAndCount(source.size(), 0, byteCount);
284       if (byteCount > bytesRemaining) {
285         throw new ProtocolException("expected " + bytesRemaining
286             + " bytes but received " + byteCount);
287       }
288       sink.write(source, byteCount);
289       bytesRemaining -= byteCount;
290     }
291 
flush()292     @Override public void flush() throws IOException {
293       if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
294       sink.flush();
295     }
296 
close()297     @Override public void close() throws IOException {
298       if (closed) return;
299       closed = true;
300       if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
301       detachTimeout(timeout);
302       state = STATE_READ_RESPONSE_HEADERS;
303     }
304   }
305 
306   /**
307    * An HTTP body with alternating chunk sizes and chunk bodies. It is the
308    * caller's responsibility to buffer chunks; typically by using a buffered
309    * sink with this sink.
310    */
311   private final class ChunkedSink implements Sink {
312     private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
313     private boolean closed;
314 
timeout()315     @Override public Timeout timeout() {
316       return timeout;
317     }
318 
write(Buffer source, long byteCount)319     @Override public void write(Buffer source, long byteCount) throws IOException {
320       if (closed) throw new IllegalStateException("closed");
321       if (byteCount == 0) return;
322 
323       sink.writeHexadecimalUnsignedLong(byteCount);
324       sink.writeUtf8("\r\n");
325       sink.write(source, byteCount);
326       sink.writeUtf8("\r\n");
327     }
328 
flush()329     @Override public synchronized void flush() throws IOException {
330       if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
331       sink.flush();
332     }
333 
close()334     @Override public synchronized void close() throws IOException {
335       if (closed) return;
336       closed = true;
337       sink.writeUtf8("0\r\n\r\n");
338       detachTimeout(timeout);
339       state = STATE_READ_RESPONSE_HEADERS;
340     }
341   }
342 
343   private abstract class AbstractSource implements Source {
344     protected final ForwardingTimeout timeout = new ForwardingTimeout(source.timeout());
345     protected boolean closed;
346 
timeout()347     @Override public Timeout timeout() {
348       return timeout;
349     }
350 
351     /**
352      * Closes the cache entry and makes the socket available for reuse. This
353      * should be invoked when the end of the body has been reached.
354      */
endOfInput()355     protected final void endOfInput() throws IOException {
356       if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
357 
358       detachTimeout(timeout);
359 
360       state = STATE_CLOSED;
361       if (streamAllocation != null) {
362         streamAllocation.streamFinished(Http1xStream.this);
363       }
364     }
365 
unexpectedEndOfInput()366     protected final void unexpectedEndOfInput() {
367       if (state == STATE_CLOSED) return;
368 
369       state = STATE_CLOSED;
370       if (streamAllocation != null) {
371         streamAllocation.noNewStreams();
372         streamAllocation.streamFinished(Http1xStream.this);
373       }
374     }
375   }
376 
377   /** An HTTP body with a fixed length specified in advance. */
378   private class FixedLengthSource extends AbstractSource {
379     private long bytesRemaining;
380 
FixedLengthSource(long length)381     public FixedLengthSource(long length) throws IOException {
382       bytesRemaining = length;
383       if (bytesRemaining == 0) {
384         endOfInput();
385       }
386     }
387 
read(Buffer sink, long byteCount)388     @Override public long read(Buffer sink, long byteCount) throws IOException {
389       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
390       if (closed) throw new IllegalStateException("closed");
391       if (bytesRemaining == 0) return -1;
392 
393       long read = source.read(sink, Math.min(bytesRemaining, byteCount));
394       if (read == -1) {
395         unexpectedEndOfInput(); // The server didn't supply the promised content length.
396         throw new ProtocolException("unexpected end of stream");
397       }
398 
399       bytesRemaining -= read;
400       if (bytesRemaining == 0) {
401         endOfInput();
402       }
403       return read;
404     }
405 
close()406     @Override public void close() throws IOException {
407       if (closed) return;
408 
409       if (bytesRemaining != 0
410           && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
411         unexpectedEndOfInput();
412       }
413 
414       closed = true;
415     }
416   }
417 
418   /** An HTTP body with alternating chunk sizes and chunk bodies. */
419   private class ChunkedSource extends AbstractSource {
420     private static final long NO_CHUNK_YET = -1L;
421     private long bytesRemainingInChunk = NO_CHUNK_YET;
422     private boolean hasMoreChunks = true;
423     private final HttpEngine httpEngine;
424 
ChunkedSource(HttpEngine httpEngine)425     ChunkedSource(HttpEngine httpEngine) throws IOException {
426       this.httpEngine = httpEngine;
427     }
428 
read(Buffer sink, long byteCount)429     @Override public long read(Buffer sink, long byteCount) throws IOException {
430       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
431       if (closed) throw new IllegalStateException("closed");
432       if (!hasMoreChunks) return -1;
433 
434       if (bytesRemainingInChunk == 0 || bytesRemainingInChunk == NO_CHUNK_YET) {
435         readChunkSize();
436         if (!hasMoreChunks) return -1;
437       }
438 
439       long read = source.read(sink, Math.min(byteCount, bytesRemainingInChunk));
440       if (read == -1) {
441         unexpectedEndOfInput(); // The server didn't supply the promised chunk length.
442         throw new ProtocolException("unexpected end of stream");
443       }
444       bytesRemainingInChunk -= read;
445       return read;
446     }
447 
readChunkSize()448     private void readChunkSize() throws IOException {
449       // Read the suffix of the previous chunk.
450       if (bytesRemainingInChunk != NO_CHUNK_YET) {
451         source.readUtf8LineStrict();
452       }
453       try {
454         bytesRemainingInChunk = source.readHexadecimalUnsignedLong();
455         String extensions = source.readUtf8LineStrict().trim();
456         if (bytesRemainingInChunk < 0 || (!extensions.isEmpty() && !extensions.startsWith(";"))) {
457           throw new ProtocolException("expected chunk size and optional extensions but was \""
458               + bytesRemainingInChunk + extensions + "\"");
459         }
460       } catch (NumberFormatException e) {
461         throw new ProtocolException(e.getMessage());
462       }
463       if (bytesRemainingInChunk == 0L) {
464         hasMoreChunks = false;
465         httpEngine.receiveHeaders(readHeaders());
466         endOfInput();
467       }
468     }
469 
close()470     @Override public void close() throws IOException {
471       if (closed) return;
472       if (hasMoreChunks && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
473         unexpectedEndOfInput();
474       }
475       closed = true;
476     }
477   }
478 
479   /** An HTTP message body terminated by the end of the underlying stream. */
480   private class UnknownLengthSource extends AbstractSource {
481     private boolean inputExhausted;
482 
read(Buffer sink, long byteCount)483     @Override public long read(Buffer sink, long byteCount)
484         throws IOException {
485       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
486       if (closed) throw new IllegalStateException("closed");
487       if (inputExhausted) return -1;
488 
489       long read = source.read(sink, byteCount);
490       if (read == -1) {
491         inputExhausted = true;
492         endOfInput();
493         return -1;
494       }
495       return read;
496     }
497 
close()498     @Override public void close() throws IOException {
499       if (closed) return;
500       if (!inputExhausted) {
501         unexpectedEndOfInput();
502       }
503       closed = true;
504     }
505   }
506 }
507