• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.squareup.okhttp.internal.framed;
18 
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.SocketTimeoutException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import okio.AsyncTimeout;
26 import okio.Buffer;
27 import okio.BufferedSource;
28 import okio.Sink;
29 import okio.Source;
30 import okio.Timeout;
31 
32 import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
33 
34 /** A logical bidirectional stream. */
35 public final class FramedStream {
36   // Internal state is guarded by this. No long-running or potentially
37   // blocking operations are performed while the lock is held.
38 
39   /**
40    * The total number of bytes consumed by the application (with {@link
41    * FramedDataSource#read}), but not yet acknowledged by sending a {@code
42    * WINDOW_UPDATE} frame on this stream.
43    */
44   // Visible for testing
45   long unacknowledgedBytesRead = 0;
46 
47   /**
48    * Count of bytes that can be written on the stream before receiving a
49    * window update. Even if this is positive, writes will block until there
50    * available bytes in {@code connection.bytesLeftInWriteWindow}.
51    */
52   // guarded by this
53   long bytesLeftInWriteWindow;
54 
55   private final int id;
56   private final FramedConnection connection;
57 
58   /** Headers sent by the stream initiator. Immutable and non null. */
59   private final List<Header> requestHeaders;
60 
61   /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
62   private List<Header> responseHeaders;
63 
64   private final FramedDataSource source;
65   final FramedDataSink sink;
66   private final StreamTimeout readTimeout = new StreamTimeout();
67   private final StreamTimeout writeTimeout = new StreamTimeout();
68 
69   /**
70    * The reason why this stream was abnormally closed. If there are multiple
71    * reasons to abnormally close this stream (such as both peers closing it
72    * near-simultaneously) then this is the first reason known to this peer.
73    */
74   private ErrorCode errorCode = null;
75 
FramedStream(int id, FramedConnection connection, boolean outFinished, boolean inFinished, List<Header> requestHeaders)76   FramedStream(int id, FramedConnection connection, boolean outFinished, boolean inFinished,
77       List<Header> requestHeaders) {
78     if (connection == null) throw new NullPointerException("connection == null");
79     if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
80     this.id = id;
81     this.connection = connection;
82     this.bytesLeftInWriteWindow =
83         connection.peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
84     this.source = new FramedDataSource(
85         connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE));
86     this.sink = new FramedDataSink();
87     this.source.finished = inFinished;
88     this.sink.finished = outFinished;
89     this.requestHeaders = requestHeaders;
90   }
91 
getId()92   public int getId() {
93     return id;
94   }
95 
96   /**
97    * Returns true if this stream is open. A stream is open until either:
98    * <ul>
99    * <li>A {@code SYN_RESET} frame abnormally terminates the stream.
100    * <li>Both input and output streams have transmitted all data and
101    * headers.
102    * </ul>
103    * Note that the input stream may continue to yield data even after a stream
104    * reports itself as not open. This is because input data is buffered.
105    */
isOpen()106   public synchronized boolean isOpen() {
107     if (errorCode != null) {
108       return false;
109     }
110     if ((source.finished || source.closed)
111         && (sink.finished || sink.closed)
112         && responseHeaders != null) {
113       return false;
114     }
115     return true;
116   }
117 
118   /** Returns true if this stream was created by this peer. */
isLocallyInitiated()119   public boolean isLocallyInitiated() {
120     boolean streamIsClient = ((id & 1) == 1);
121     return connection.client == streamIsClient;
122   }
123 
getConnection()124   public FramedConnection getConnection() {
125     return connection;
126   }
127 
getRequestHeaders()128   public List<Header> getRequestHeaders() {
129     return requestHeaders;
130   }
131 
132   /**
133    * Returns the stream's response headers, blocking if necessary if they
134    * have not been received yet.
135    */
getResponseHeaders()136   public synchronized List<Header> getResponseHeaders() throws IOException {
137     readTimeout.enter();
138     try {
139       while (responseHeaders == null && errorCode == null) {
140         waitForIo();
141       }
142     } finally {
143       readTimeout.exitAndThrowIfTimedOut();
144     }
145     if (responseHeaders != null) return responseHeaders;
146     throw new IOException("stream was reset: " + errorCode);
147   }
148 
149   /**
150    * Returns the reason why this stream was closed, or null if it closed
151    * normally or has not yet been closed.
152    */
getErrorCode()153   public synchronized ErrorCode getErrorCode() {
154     return errorCode;
155   }
156 
157   /**
158    * Sends a reply to an incoming stream.
159    *
160    * @param out true to create an output stream that we can use to send data
161    * to the remote peer. Corresponds to {@code FLAG_FIN}.
162    */
reply(List<Header> responseHeaders, boolean out)163   public void reply(List<Header> responseHeaders, boolean out) throws IOException {
164     assert (!Thread.holdsLock(FramedStream.this));
165     boolean outFinished = false;
166     synchronized (this) {
167       if (responseHeaders == null) {
168         throw new NullPointerException("responseHeaders == null");
169       }
170       if (this.responseHeaders != null) {
171         throw new IllegalStateException("reply already sent");
172       }
173       this.responseHeaders = responseHeaders;
174       if (!out) {
175         this.sink.finished = true;
176         outFinished = true;
177       }
178     }
179     connection.writeSynReply(id, outFinished, responseHeaders);
180 
181     if (outFinished) {
182       connection.flush();
183     }
184   }
185 
readTimeout()186   public Timeout readTimeout() {
187     return readTimeout;
188   }
189 
writeTimeout()190   public Timeout writeTimeout() {
191     return writeTimeout;
192   }
193 
194   /** Returns a source that reads data from the peer. */
getSource()195   public Source getSource() {
196     return source;
197   }
198 
199   /**
200    * Returns a sink that can be used to write data to the peer.
201    *
202    * @throws IllegalStateException if this stream was initiated by the peer
203    *     and a {@link #reply} has not yet been sent.
204    */
getSink()205   public Sink getSink() {
206     synchronized (this) {
207       if (responseHeaders == null && !isLocallyInitiated()) {
208         throw new IllegalStateException("reply before requesting the sink");
209       }
210     }
211     return sink;
212   }
213 
214   /**
215    * Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
216    * frame has been transmitted.
217    */
close(ErrorCode rstStatusCode)218   public void close(ErrorCode rstStatusCode) throws IOException {
219     if (!closeInternal(rstStatusCode)) {
220       return; // Already closed.
221     }
222     connection.writeSynReset(id, rstStatusCode);
223   }
224 
225   /**
226    * Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
227    * frame and returns immediately.
228    */
closeLater(ErrorCode errorCode)229   public void closeLater(ErrorCode errorCode) {
230     if (!closeInternal(errorCode)) {
231       return; // Already closed.
232     }
233     connection.writeSynResetLater(id, errorCode);
234   }
235 
236   /** Returns true if this stream was closed. */
closeInternal(ErrorCode errorCode)237   private boolean closeInternal(ErrorCode errorCode) {
238     assert (!Thread.holdsLock(this));
239     synchronized (this) {
240       if (this.errorCode != null) {
241         return false;
242       }
243       if (source.finished && sink.finished) {
244         return false;
245       }
246       this.errorCode = errorCode;
247       notifyAll();
248     }
249     connection.removeStream(id);
250     return true;
251   }
252 
receiveHeaders(List<Header> headers, HeadersMode headersMode)253   void receiveHeaders(List<Header> headers, HeadersMode headersMode) {
254     assert (!Thread.holdsLock(FramedStream.this));
255     ErrorCode errorCode = null;
256     boolean open = true;
257     synchronized (this) {
258       if (responseHeaders == null) {
259         if (headersMode.failIfHeadersAbsent()) {
260           errorCode = ErrorCode.PROTOCOL_ERROR;
261         } else {
262           responseHeaders = headers;
263           open = isOpen();
264           notifyAll();
265         }
266       } else {
267         if (headersMode.failIfHeadersPresent()) {
268           errorCode = ErrorCode.STREAM_IN_USE;
269         } else {
270           List<Header> newHeaders = new ArrayList<>();
271           newHeaders.addAll(responseHeaders);
272           newHeaders.addAll(headers);
273           this.responseHeaders = newHeaders;
274         }
275       }
276     }
277     if (errorCode != null) {
278       closeLater(errorCode);
279     } else if (!open) {
280       connection.removeStream(id);
281     }
282   }
283 
receiveData(BufferedSource in, int length)284   void receiveData(BufferedSource in, int length) throws IOException {
285     assert (!Thread.holdsLock(FramedStream.this));
286     this.source.receive(in, length);
287   }
288 
receiveFin()289   void receiveFin() {
290     assert (!Thread.holdsLock(FramedStream.this));
291     boolean open;
292     synchronized (this) {
293       this.source.finished = true;
294       open = isOpen();
295       notifyAll();
296     }
297     if (!open) {
298       connection.removeStream(id);
299     }
300   }
301 
receiveRstStream(ErrorCode errorCode)302   synchronized void receiveRstStream(ErrorCode errorCode) {
303     if (this.errorCode == null) {
304       this.errorCode = errorCode;
305       notifyAll();
306     }
307   }
308 
309   /**
310    * A source that reads the incoming data frames of a stream. Although this
311    * class uses synchronization to safely receive incoming data frames, it is
312    * not intended for use by multiple readers.
313    */
314   private final class FramedDataSource implements Source {
315     /** Buffer to receive data from the network into. Only accessed by the reader thread. */
316     private final Buffer receiveBuffer = new Buffer();
317 
318     /** Buffer with readable data. Guarded by FramedStream.this. */
319     private final Buffer readBuffer = new Buffer();
320 
321     /** Maximum number of bytes to buffer before reporting a flow control error. */
322     private final long maxByteCount;
323 
324     /** True if the caller has closed this stream. */
325     private boolean closed;
326 
327     /**
328      * True if either side has cleanly shut down this stream. We will
329      * receive no more bytes beyond those already in the buffer.
330      */
331     private boolean finished;
332 
FramedDataSource(long maxByteCount)333     private FramedDataSource(long maxByteCount) {
334       this.maxByteCount = maxByteCount;
335     }
336 
read(Buffer sink, long byteCount)337     @Override public long read(Buffer sink, long byteCount)
338         throws IOException {
339       if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
340 
341       long read;
342       synchronized (FramedStream.this) {
343         waitUntilReadable();
344         checkNotClosed();
345         if (readBuffer.size() == 0) return -1; // This source is exhausted.
346 
347         // Move bytes from the read buffer into the caller's buffer.
348         read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
349 
350         // Flow control: notify the peer that we're ready for more data!
351         unacknowledgedBytesRead += read;
352         if (unacknowledgedBytesRead
353             >= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
354           connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
355           unacknowledgedBytesRead = 0;
356         }
357       }
358 
359       // Update connection.unacknowledgedBytesRead outside the stream lock.
360       synchronized (connection) { // Multiple application threads may hit this section.
361         connection.unacknowledgedBytesRead += read;
362         if (connection.unacknowledgedBytesRead
363             >= connection.okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
364           connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
365           connection.unacknowledgedBytesRead = 0;
366         }
367       }
368 
369       return read;
370     }
371 
372     /** Returns once the source is either readable or finished. */
waitUntilReadable()373     private void waitUntilReadable() throws IOException {
374       readTimeout.enter();
375       try {
376         while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) {
377           waitForIo();
378         }
379       } finally {
380         readTimeout.exitAndThrowIfTimedOut();
381       }
382     }
383 
receive(BufferedSource in, long byteCount)384     void receive(BufferedSource in, long byteCount) throws IOException {
385       assert (!Thread.holdsLock(FramedStream.this));
386 
387       while (byteCount > 0) {
388         boolean finished;
389         boolean flowControlError;
390         synchronized (FramedStream.this) {
391           finished = this.finished;
392           flowControlError = byteCount + readBuffer.size() > maxByteCount;
393         }
394 
395         // If the peer sends more data than we can handle, discard it and close the connection.
396         if (flowControlError) {
397           in.skip(byteCount);
398           closeLater(ErrorCode.FLOW_CONTROL_ERROR);
399           return;
400         }
401 
402         // Discard data received after the stream is finished. It's probably a benign race.
403         if (finished) {
404           in.skip(byteCount);
405           return;
406         }
407 
408         // Fill the receive buffer without holding any locks.
409         long read = in.read(receiveBuffer, byteCount);
410         if (read == -1) throw new EOFException();
411         byteCount -= read;
412 
413         // Move the received data to the read buffer to the reader can read it.
414         synchronized (FramedStream.this) {
415           boolean wasEmpty = readBuffer.size() == 0;
416           readBuffer.writeAll(receiveBuffer);
417           if (wasEmpty) {
418             FramedStream.this.notifyAll();
419           }
420         }
421       }
422     }
423 
timeout()424     @Override public Timeout timeout() {
425       return readTimeout;
426     }
427 
close()428     @Override public void close() throws IOException {
429       synchronized (FramedStream.this) {
430         closed = true;
431         readBuffer.clear();
432         FramedStream.this.notifyAll();
433       }
434       cancelStreamIfNecessary();
435     }
436 
checkNotClosed()437     private void checkNotClosed() throws IOException {
438       if (closed) {
439         throw new IOException("stream closed");
440       }
441       if (errorCode != null) {
442         throw new IOException("stream was reset: " + errorCode);
443       }
444     }
445   }
446 
cancelStreamIfNecessary()447   private void cancelStreamIfNecessary() throws IOException {
448     assert (!Thread.holdsLock(FramedStream.this));
449     boolean open;
450     boolean cancel;
451     synchronized (this) {
452       cancel = !source.finished && source.closed && (sink.finished || sink.closed);
453       open = isOpen();
454     }
455     if (cancel) {
456       // RST this stream to prevent additional data from being sent. This
457       // is safe because the input stream is closed (we won't use any
458       // further bytes) and the output stream is either finished or closed
459       // (so RSTing both streams doesn't cause harm).
460       FramedStream.this.close(ErrorCode.CANCEL);
461     } else if (!open) {
462       connection.removeStream(id);
463     }
464   }
465 
466   /**
467    * A sink that writes outgoing data frames of a stream. This class is not
468    * thread safe.
469    */
470   final class FramedDataSink implements Sink {
471     private static final long EMIT_BUFFER_SIZE = 16384;
472 
473     /**
474      * Buffer of outgoing data. This batches writes of small writes into this sink as larges
475      * frames written to the outgoing connection. Batching saves the (small) framing overhead.
476      */
477     private final Buffer sendBuffer = new Buffer();
478 
479     private boolean closed;
480 
481     /**
482      * True if either side has cleanly shut down this stream. We shall send
483      * no more bytes.
484      */
485     private boolean finished;
486 
write(Buffer source, long byteCount)487     @Override public void write(Buffer source, long byteCount) throws IOException {
488       assert (!Thread.holdsLock(FramedStream.this));
489       sendBuffer.write(source, byteCount);
490       while (sendBuffer.size() >= EMIT_BUFFER_SIZE) {
491         emitDataFrame(false);
492       }
493     }
494 
495     /**
496      * Emit a single data frame to the connection. The frame's size be limited by this stream's
497      * write window. This method will block until the write window is nonempty.
498      */
emitDataFrame(boolean outFinished)499     private void emitDataFrame(boolean outFinished) throws IOException {
500       long toWrite;
501       synchronized (FramedStream.this) {
502         writeTimeout.enter();
503         try {
504           while (bytesLeftInWriteWindow <= 0 && !finished && !closed && errorCode == null) {
505             waitForIo(); // Wait until we receive a WINDOW_UPDATE for this stream.
506           }
507         } finally {
508           writeTimeout.exitAndThrowIfTimedOut();
509         }
510 
511         checkOutNotClosed(); // Kick out if the stream was reset or closed while waiting.
512         toWrite = Math.min(bytesLeftInWriteWindow, sendBuffer.size());
513         bytesLeftInWriteWindow -= toWrite;
514       }
515 
516       writeTimeout.enter();
517       try {
518         connection.writeData(id, outFinished && toWrite == sendBuffer.size(), sendBuffer, toWrite);
519       } finally {
520         writeTimeout.exitAndThrowIfTimedOut();
521       }
522     }
523 
flush()524     @Override public void flush() throws IOException {
525       assert (!Thread.holdsLock(FramedStream.this));
526       synchronized (FramedStream.this) {
527         checkOutNotClosed();
528       }
529       while (sendBuffer.size() > 0) {
530         emitDataFrame(false);
531         connection.flush();
532       }
533     }
534 
timeout()535     @Override public Timeout timeout() {
536       return writeTimeout;
537     }
538 
close()539     @Override public void close() throws IOException {
540       assert (!Thread.holdsLock(FramedStream.this));
541       synchronized (FramedStream.this) {
542         if (closed) return;
543       }
544       if (!sink.finished) {
545         // Emit the remaining data, setting the END_STREAM flag on the last frame.
546         if (sendBuffer.size() > 0) {
547           while (sendBuffer.size() > 0) {
548             emitDataFrame(true);
549           }
550         } else {
551           // Send an empty frame just so we can set the END_STREAM flag.
552           connection.writeData(id, true, null, 0);
553         }
554       }
555       synchronized (FramedStream.this) {
556         closed = true;
557       }
558       connection.flush();
559       cancelStreamIfNecessary();
560     }
561   }
562 
563   /**
564    * {@code delta} will be negative if a settings frame initial window is
565    * smaller than the last.
566    */
addBytesToWriteWindow(long delta)567   void addBytesToWriteWindow(long delta) {
568     bytesLeftInWriteWindow += delta;
569     if (delta > 0) FramedStream.this.notifyAll();
570   }
571 
checkOutNotClosed()572   private void checkOutNotClosed() throws IOException {
573     if (sink.closed) {
574       throw new IOException("stream closed");
575     } else if (sink.finished) {
576       throw new IOException("stream finished");
577     } else if (errorCode != null) {
578       throw new IOException("stream was reset: " + errorCode);
579     }
580   }
581 
582   /**
583    * Like {@link #wait}, but throws an {@code InterruptedIOException} when
584    * interrupted instead of the more awkward {@link InterruptedException}.
585    */
waitForIo()586   private void waitForIo() throws InterruptedIOException {
587     try {
588       wait();
589     } catch (InterruptedException e) {
590       throw new InterruptedIOException();
591     }
592   }
593 
594   /**
595    * The Okio timeout watchdog will call {@link #timedOut} if the timeout is
596    * reached. In that case we close the stream (asynchronously) which will
597    * notify the waiting thread.
598    */
599   class StreamTimeout extends AsyncTimeout {
timedOut()600     @Override protected void timedOut() {
601       closeLater(ErrorCode.CANCEL);
602     }
603 
newTimeoutException(IOException cause)604     @Override protected IOException newTimeoutException(IOException cause) {
605       SocketTimeoutException socketTimeoutException = new SocketTimeoutException("timeout");
606       if (cause != null) {
607         socketTimeoutException.initCause(cause);
608       }
609       return socketTimeoutException;
610     }
611 
exitAndThrowIfTimedOut()612     public void exitAndThrowIfTimedOut() throws IOException {
613       if (exit()) throw newTimeoutException(null /* cause */);
614     }
615   }
616 }
617