• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.squareup.okhttp.internal.framed;
17 
18 import com.squareup.okhttp.Protocol;
19 import com.squareup.okhttp.internal.NamedRunnable;
20 import com.squareup.okhttp.internal.Util;
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.util.HashMap;
27 import java.util.LinkedHashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import okio.Buffer;
38 import okio.BufferedSource;
39 import okio.ByteString;
40 import okio.Okio;
41 
42 import static com.squareup.okhttp.internal.Internal.logger;
43 import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
44 
45 /**
46  * A socket connection to a remote peer. A connection hosts streams which can
47  * send and receive data.
48  *
49  * <p>Many methods in this API are <strong>synchronous:</strong> the call is
50  * completed before the method returns. This is typical for Java but atypical
51  * for SPDY. This is motivated by exception transparency: an IOException that
52  * was triggered by a certain caller can be caught and handled by that caller.
53  */
54 public final class FramedConnection implements Closeable {
55 
56   // Internal state of this connection is guarded by 'this'. No blocking
57   // operations may be performed while holding this lock!
58   //
59   // Socket writes are guarded by frameWriter.
60   //
61   // Socket reads are unguarded but are only made by the reader thread.
62   //
63   // Certain operations (like SYN_STREAM) need to synchronize on both the
64   // frameWriter (to do blocking I/O) and this (to create streams). Such
65   // operations must synchronize on 'this' last. This ensures that we never
66   // wait for a blocking operation while holding 'this'.
67 
68   private static final ExecutorService executor = new ThreadPoolExecutor(0,
69       Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
70       Util.threadFactory("OkHttp FramedConnection", true));
71 
72   /** The protocol variant, like {@link com.squareup.okhttp.internal.framed.Spdy3}. */
73   final Protocol protocol;
74 
75   /** True if this peer initiated the connection. */
76   final boolean client;
77 
78   /**
79    * User code to run in response to an incoming stream. Callbacks must not be
80    * run on the callback executor.
81    */
82   private final IncomingStreamHandler handler;
83   private final Map<Integer, FramedStream> streams = new HashMap<>();
84   private final String hostName;
85   private int lastGoodStreamId;
86   private int nextStreamId;
87   private boolean shutdown;
88   private long idleStartTimeNs = System.nanoTime();
89 
90   /** Ensures push promise callbacks events are sent in order per stream. */
91   private final ExecutorService pushExecutor;
92 
93   /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
94   private Map<Integer, Ping> pings;
95   /** User code to run in response to push promise events. */
96   private final PushObserver pushObserver;
97   private int nextPingId;
98 
99   /**
100    * The total number of bytes consumed by the application, but not yet
101    * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
102    */
103   // Visible for testing
104   long unacknowledgedBytesRead = 0;
105 
106   /**
107    * Count of bytes that can be written on the connection before receiving a
108    * window update.
109    */
110   // Visible for testing
111   long bytesLeftInWriteWindow;
112 
113   /** Settings we communicate to the peer. */
114   // TODO: Do we want to dynamically adjust settings, or KISS and only set once?
115   final Settings okHttpSettings = new Settings();
116       // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
117   private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
118 
119   /** Settings we receive from the peer. */
120   // TODO: MWS will need to guard on this setting before attempting to push.
121   final Settings peerSettings = new Settings();
122 
123   private boolean receivedInitialPeerSettings = false;
124   final Variant variant;
125   final Socket socket;
126   final FrameWriter frameWriter;
127 
128   // Visible for testing
129   final Reader readerRunnable;
130 
FramedConnection(Builder builder)131   private FramedConnection(Builder builder) throws IOException {
132     protocol = builder.protocol;
133     pushObserver = builder.pushObserver;
134     client = builder.client;
135     handler = builder.handler;
136     // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
137     nextStreamId = builder.client ? 1 : 2;
138     if (builder.client && protocol == Protocol.HTTP_2) {
139       nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
140     }
141 
142     nextPingId = builder.client ? 1 : 2;
143 
144     // Flow control was designed more for servers, or proxies than edge clients.
145     // If we are a client, set the flow control window to 16MiB.  This avoids
146     // thrashing window updates every 64KiB, yet small enough to avoid blowing
147     // up the heap.
148     if (builder.client) {
149       okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
150     }
151 
152     hostName = builder.hostName;
153 
154     if (protocol == Protocol.HTTP_2) {
155       variant = new Http2();
156       // Like newSingleThreadExecutor, except lazy creates the thread.
157       pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
158           new LinkedBlockingQueue<Runnable>(),
159           Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true));
160       // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2
161       peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
162       peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE);
163     } else if (protocol == Protocol.SPDY_3) {
164       variant = new Spdy3();
165       pushExecutor = null;
166     } else {
167       throw new AssertionError(protocol);
168     }
169     bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
170     socket = builder.socket;
171     frameWriter = variant.newWriter(Okio.buffer(Okio.sink(builder.socket)), client);
172 
173     readerRunnable = new Reader();
174     new Thread(readerRunnable).start(); // Not a daemon thread.
175   }
176 
177   /** The protocol as selected using ALPN. */
getProtocol()178   public Protocol getProtocol() {
179     return protocol;
180   }
181 
182   /**
183    * Returns the number of {@link FramedStream#isOpen() open streams} on this
184    * connection.
185    */
openStreamCount()186   public synchronized int openStreamCount() {
187     return streams.size();
188   }
189 
getStream(int id)190   synchronized FramedStream getStream(int id) {
191     return streams.get(id);
192   }
193 
removeStream(int streamId)194   synchronized FramedStream removeStream(int streamId) {
195     FramedStream stream = streams.remove(streamId);
196     if (stream != null && streams.isEmpty()) {
197       setIdle(true);
198     }
199     notifyAll(); // The removed stream may be blocked on a connection-wide window update.
200     return stream;
201   }
202 
setIdle(boolean value)203   private synchronized void setIdle(boolean value) {
204     idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE;
205   }
206 
207   /** Returns true if this connection is idle. */
isIdle()208   public synchronized boolean isIdle() {
209     return idleStartTimeNs != Long.MAX_VALUE;
210   }
211 
212   /**
213    * Returns the time in ns when this connection became idle or Long.MAX_VALUE
214    * if connection is not idle.
215    */
getIdleStartTimeNs()216   public synchronized long getIdleStartTimeNs() {
217     return idleStartTimeNs;
218   }
219 
220   /**
221    * Returns a new server-initiated stream.
222    *
223    * @param associatedStreamId the stream that triggered the sender to create
224    *     this stream.
225    * @param out true to create an output stream that we can use to send data
226    *     to the remote peer. Corresponds to {@code FLAG_FIN}.
227    */
pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)228   public FramedStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
229       throws IOException {
230     if (client) throw new IllegalStateException("Client cannot push requests.");
231     if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
232     return newStream(associatedStreamId, requestHeaders, out, false);
233   }
234 
235   /**
236    * Returns a new locally-initiated stream.
237    *
238    * @param out true to create an output stream that we can use to send data to the remote peer.
239    *     Corresponds to {@code FLAG_FIN}.
240    * @param in true to create an input stream that the remote peer can use to send data to us.
241    *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
242    */
newStream(List<Header> requestHeaders, boolean out, boolean in)243   public FramedStream newStream(List<Header> requestHeaders, boolean out, boolean in)
244       throws IOException {
245     return newStream(0, requestHeaders, out, in);
246   }
247 
newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, boolean in)248   private FramedStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
249       boolean in) throws IOException {
250     boolean outFinished = !out;
251     boolean inFinished = !in;
252     FramedStream stream;
253     int streamId;
254 
255     synchronized (frameWriter) {
256       synchronized (this) {
257         if (shutdown) {
258           throw new IOException("shutdown");
259         }
260         streamId = nextStreamId;
261         nextStreamId += 2;
262         stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
263         if (stream.isOpen()) {
264           streams.put(streamId, stream);
265           setIdle(false);
266         }
267       }
268       if (associatedStreamId == 0) {
269         frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId,
270             requestHeaders);
271       } else if (client) {
272         throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
273       } else { // HTTP/2 has a PUSH_PROMISE frame.
274         frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
275       }
276     }
277 
278     if (!out) {
279       frameWriter.flush();
280     }
281 
282     return stream;
283   }
284 
writeSynReply(int streamId, boolean outFinished, List<Header> alternating)285   void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
286       throws IOException {
287     frameWriter.synReply(outFinished, streamId, alternating);
288   }
289 
290   /**
291    * Callers of this method are not thread safe, and sometimes on application threads. Most often,
292    * this method will be called to send a buffer worth of data to the peer.
293    *
294    * <p>Writes are subject to the write window of the stream and the connection. Until there is a
295    * window sufficient to send {@code byteCount}, the caller will block. For example, a user of
296    * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's
297    * write window will block.
298    *
299    * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only
300    * use case for zero {@code byteCount} is closing a flushed output stream.
301    */
writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)302   public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
303       throws IOException {
304     if (byteCount == 0) { // Empty data frames are not flow-controlled.
305       frameWriter.data(outFinished, streamId, buffer, 0);
306       return;
307     }
308 
309     while (byteCount > 0) {
310       int toWrite;
311       synchronized (FramedConnection.this) {
312         try {
313           while (bytesLeftInWriteWindow <= 0) {
314             // Before blocking, confirm that the stream we're writing is still open. It's possible
315             // that the stream has since been closed (such as if this write timed out.)
316             if (!streams.containsKey(streamId)) {
317               throw new IOException("stream closed");
318             }
319             FramedConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
320           }
321         } catch (InterruptedException e) {
322           throw new InterruptedIOException();
323         }
324 
325         toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
326         toWrite = Math.min(toWrite, frameWriter.maxDataLength());
327         bytesLeftInWriteWindow -= toWrite;
328       }
329 
330       byteCount -= toWrite;
331       frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
332     }
333   }
334 
335   /**
336    * {@code delta} will be negative if a settings frame initial window is
337    * smaller than the last.
338    */
addBytesToWriteWindow(long delta)339   void addBytesToWriteWindow(long delta) {
340     bytesLeftInWriteWindow += delta;
341     if (delta > 0) FramedConnection.this.notifyAll();
342   }
343 
writeSynResetLater(final int streamId, final ErrorCode errorCode)344   void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
345     executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
346       @Override public void execute() {
347         try {
348           writeSynReset(streamId, errorCode);
349         } catch (IOException ignored) {
350         }
351       }
352     });
353   }
354 
writeSynReset(int streamId, ErrorCode statusCode)355   void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
356     frameWriter.rstStream(streamId, statusCode);
357   }
358 
writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead)359   void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
360     executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
361       @Override public void execute() {
362         try {
363           frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
364         } catch (IOException ignored) {
365         }
366       }
367     });
368   }
369 
370   /**
371    * Sends a ping frame to the peer. Use the returned object to await the
372    * ping's response and observe its round trip time.
373    */
ping()374   public Ping ping() throws IOException {
375     Ping ping = new Ping();
376     int pingId;
377     synchronized (this) {
378       if (shutdown) {
379         throw new IOException("shutdown");
380       }
381       pingId = nextPingId;
382       nextPingId += 2;
383       if (pings == null) pings = new HashMap<>();
384       pings.put(pingId, ping);
385     }
386     writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
387     return ping;
388   }
389 
writePingLater( final boolean reply, final int payload1, final int payload2, final Ping ping)390   private void writePingLater(
391       final boolean reply, final int payload1, final int payload2, final Ping ping) {
392     executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x",
393         hostName, payload1, payload2) {
394       @Override public void execute() {
395         try {
396           writePing(reply, payload1, payload2, ping);
397         } catch (IOException ignored) {
398         }
399       }
400     });
401   }
402 
writePing(boolean reply, int payload1, int payload2, Ping ping)403   private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
404     synchronized (frameWriter) {
405       // Observe the sent time immediately before performing I/O.
406       if (ping != null) ping.send();
407       frameWriter.ping(reply, payload1, payload2);
408     }
409   }
410 
removePing(int id)411   private synchronized Ping removePing(int id) {
412     return pings != null ? pings.remove(id) : null;
413   }
414 
flush()415   public void flush() throws IOException {
416     frameWriter.flush();
417   }
418 
419   /**
420    * Degrades this connection such that new streams can neither be created
421    * locally, nor accepted from the remote peer. Existing streams are not
422    * impacted. This is intended to permit an endpoint to gracefully stop
423    * accepting new requests without harming previously established streams.
424    */
shutdown(ErrorCode statusCode)425   public void shutdown(ErrorCode statusCode) throws IOException {
426     synchronized (frameWriter) {
427       int lastGoodStreamId;
428       synchronized (this) {
429         if (shutdown) {
430           return;
431         }
432         shutdown = true;
433         lastGoodStreamId = this.lastGoodStreamId;
434       }
435       // TODO: propagate exception message into debugData
436       frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
437     }
438   }
439 
440   /**
441    * Closes this connection. This cancels all open streams and unanswered
442    * pings. It closes the underlying input and output streams and shuts down
443    * internal executor services.
444    */
close()445   @Override public void close() throws IOException {
446     close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
447   }
448 
close(ErrorCode connectionCode, ErrorCode streamCode)449   private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
450     assert (!Thread.holdsLock(this));
451     IOException thrown = null;
452     try {
453       shutdown(connectionCode);
454     } catch (IOException e) {
455       thrown = e;
456     }
457 
458     FramedStream[] streamsToClose = null;
459     Ping[] pingsToCancel = null;
460     synchronized (this) {
461       if (!streams.isEmpty()) {
462         streamsToClose = streams.values().toArray(new FramedStream[streams.size()]);
463         streams.clear();
464         setIdle(false);
465       }
466       if (pings != null) {
467         pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
468         pings = null;
469       }
470     }
471 
472     if (streamsToClose != null) {
473       for (FramedStream stream : streamsToClose) {
474         try {
475           stream.close(streamCode);
476         } catch (IOException e) {
477           if (thrown != null) thrown = e;
478         }
479       }
480     }
481 
482     if (pingsToCancel != null) {
483       for (Ping ping : pingsToCancel) {
484         ping.cancel();
485       }
486     }
487 
488     // Close the writer to release its resources (such as deflaters).
489     try {
490       frameWriter.close();
491     } catch (IOException e) {
492       if (thrown == null) thrown = e;
493     }
494 
495     // Close the socket to break out the reader thread, which will clean up after itself.
496     try {
497       socket.close();
498     } catch (IOException e) {
499       thrown = e;
500     }
501 
502     if (thrown != null) throw thrown;
503   }
504 
505   /**
506    * Sends a connection header if the current variant requires it. This should
507    * be called after {@link Builder#build} for all new connections.
508    */
sendConnectionPreface()509   public void sendConnectionPreface() throws IOException {
510     frameWriter.connectionPreface();
511     frameWriter.settings(okHttpSettings);
512     int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
513     if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
514       frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
515     }
516   }
517 
518   public static class Builder {
519     private String hostName;
520     private Socket socket;
521     private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
522     private Protocol protocol = Protocol.SPDY_3;
523     private PushObserver pushObserver = PushObserver.CANCEL;
524     private boolean client;
525 
Builder(boolean client, Socket socket)526     public Builder(boolean client, Socket socket) throws IOException {
527       this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket);
528     }
529 
530     /**
531      * @param client true if this peer initiated the connection; false if this
532      *     peer accepted the connection.
533      */
Builder(String hostName, boolean client, Socket socket)534     public Builder(String hostName, boolean client, Socket socket) throws IOException {
535       this.hostName = hostName;
536       this.client = client;
537       this.socket = socket;
538     }
539 
handler(IncomingStreamHandler handler)540     public Builder handler(IncomingStreamHandler handler) {
541       this.handler = handler;
542       return this;
543     }
544 
protocol(Protocol protocol)545     public Builder protocol(Protocol protocol) {
546       this.protocol = protocol;
547       return this;
548     }
549 
pushObserver(PushObserver pushObserver)550     public Builder pushObserver(PushObserver pushObserver) {
551       this.pushObserver = pushObserver;
552       return this;
553     }
554 
build()555     public FramedConnection build() throws IOException {
556       return new FramedConnection(this);
557     }
558   }
559 
560   /**
561    * Methods in this class must not lock FrameWriter.  If a method needs to
562    * write a frame, create an async task to do so.
563    */
564   class Reader extends NamedRunnable implements FrameReader.Handler {
565     FrameReader frameReader;
566 
Reader()567     private Reader() {
568       super("OkHttp %s", hostName);
569     }
570 
execute()571     @Override protected void execute() {
572       ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
573       ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
574       try {
575         frameReader = variant.newReader(Okio.buffer(Okio.source(socket)), client);
576         if (!client) {
577           frameReader.readConnectionPreface();
578         }
579         while (frameReader.nextFrame(this)) {
580         }
581         connectionErrorCode = ErrorCode.NO_ERROR;
582         streamErrorCode = ErrorCode.CANCEL;
583       } catch (IOException e) {
584         connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
585         streamErrorCode = ErrorCode.PROTOCOL_ERROR;
586       } finally {
587         try {
588           close(connectionErrorCode, streamErrorCode);
589         } catch (IOException ignored) {
590         }
591         Util.closeQuietly(frameReader);
592       }
593     }
594 
data(boolean inFinished, int streamId, BufferedSource source, int length)595     @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
596         throws IOException {
597       if (pushedStream(streamId)) {
598         pushDataLater(streamId, source, length, inFinished);
599         return;
600       }
601       FramedStream dataStream = getStream(streamId);
602       if (dataStream == null) {
603         writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
604         source.skip(length);
605         return;
606       }
607       dataStream.receiveData(source, length);
608       if (inFinished) {
609         dataStream.receiveFin();
610       }
611     }
612 
headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)613     @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
614         int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
615       if (pushedStream(streamId)) {
616         pushHeadersLater(streamId, headerBlock, inFinished);
617         return;
618       }
619       FramedStream stream;
620       synchronized (FramedConnection.this) {
621         // If we're shutdown, don't bother with this stream.
622         if (shutdown) return;
623 
624         stream = getStream(streamId);
625 
626         if (stream == null) {
627           // The headers claim to be for an existing stream, but we don't have one.
628           if (headersMode.failIfStreamAbsent()) {
629             writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
630             return;
631           }
632 
633           // If the stream ID is less than the last created ID, assume it's already closed.
634           if (streamId <= lastGoodStreamId) return;
635 
636           // If the stream ID is in the client's namespace, assume it's already closed.
637           if (streamId % 2 == nextStreamId % 2) return;
638 
639           // Create a stream.
640           final FramedStream
641               newStream = new FramedStream(streamId, FramedConnection.this, outFinished,
642               inFinished, headerBlock);
643           lastGoodStreamId = streamId;
644           streams.put(streamId, newStream);
645           executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
646             @Override public void execute() {
647               try {
648                 handler.receive(newStream);
649               } catch (IOException e) {
650                 logger.log(Level.INFO, "StreamHandler failure for " + hostName, e);
651                 try {
652                   newStream.close(ErrorCode.PROTOCOL_ERROR);
653                 } catch (IOException ignored) {
654                 }
655               }
656             }
657           });
658           return;
659         }
660       }
661 
662       // The headers claim to be for a new stream, but we already have one.
663       if (headersMode.failIfStreamPresent()) {
664         stream.closeLater(ErrorCode.PROTOCOL_ERROR);
665         removeStream(streamId);
666         return;
667       }
668 
669       // Update an existing stream.
670       stream.receiveHeaders(headerBlock, headersMode);
671       if (inFinished) stream.receiveFin();
672     }
673 
rstStream(int streamId, ErrorCode errorCode)674     @Override public void rstStream(int streamId, ErrorCode errorCode) {
675       if (pushedStream(streamId)) {
676         pushResetLater(streamId, errorCode);
677         return;
678       }
679       FramedStream rstStream = removeStream(streamId);
680       if (rstStream != null) {
681         rstStream.receiveRstStream(errorCode);
682       }
683     }
684 
settings(boolean clearPrevious, Settings newSettings)685     @Override public void settings(boolean clearPrevious, Settings newSettings) {
686       long delta = 0;
687       FramedStream[] streamsToNotify = null;
688       synchronized (FramedConnection.this) {
689         int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
690         if (clearPrevious) peerSettings.clear();
691         peerSettings.merge(newSettings);
692         if (getProtocol() == Protocol.HTTP_2) {
693           ackSettingsLater(newSettings);
694         }
695         int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
696         if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
697           delta = peerInitialWindowSize - priorWriteWindowSize;
698           if (!receivedInitialPeerSettings) {
699             addBytesToWriteWindow(delta);
700             receivedInitialPeerSettings = true;
701           }
702           if (!streams.isEmpty()) {
703             streamsToNotify = streams.values().toArray(new FramedStream[streams.size()]);
704           }
705         }
706       }
707       if (streamsToNotify != null && delta != 0) {
708         for (FramedStream stream : streamsToNotify) {
709           synchronized (stream) {
710             stream.addBytesToWriteWindow(delta);
711           }
712         }
713       }
714     }
715 
ackSettingsLater(final Settings peerSettings)716     private void ackSettingsLater(final Settings peerSettings) {
717       executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) {
718         @Override public void execute() {
719           try {
720             frameWriter.ackSettings(peerSettings);
721           } catch (IOException ignored) {
722           }
723         }
724       });
725     }
726 
ackSettings()727     @Override public void ackSettings() {
728       // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
729     }
730 
ping(boolean reply, int payload1, int payload2)731     @Override public void ping(boolean reply, int payload1, int payload2) {
732       if (reply) {
733         Ping ping = removePing(payload1);
734         if (ping != null) {
735           ping.receive();
736         }
737       } else {
738         // Send a reply to a client ping if this is a server and vice versa.
739         writePingLater(true, payload1, payload2, null);
740       }
741     }
742 
goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)743     @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
744       if (debugData.size() > 0) { // TODO: log the debugData
745       }
746 
747       // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
748       FramedStream[] streamsCopy;
749       synchronized (FramedConnection.this) {
750         streamsCopy = streams.values().toArray(new FramedStream[streams.size()]);
751         shutdown = true;
752       }
753 
754       // Fail all streams created after the last good stream ID.
755       for (FramedStream framedStream : streamsCopy) {
756         if (framedStream.getId() > lastGoodStreamId && framedStream.isLocallyInitiated()) {
757           framedStream.receiveRstStream(ErrorCode.REFUSED_STREAM);
758           removeStream(framedStream.getId());
759         }
760       }
761     }
762 
windowUpdate(int streamId, long windowSizeIncrement)763     @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
764       if (streamId == 0) {
765         synchronized (FramedConnection.this) {
766           bytesLeftInWriteWindow += windowSizeIncrement;
767           FramedConnection.this.notifyAll();
768         }
769       } else {
770         FramedStream stream = getStream(streamId);
771         if (stream != null) {
772           synchronized (stream) {
773             stream.addBytesToWriteWindow(windowSizeIncrement);
774           }
775         }
776       }
777     }
778 
priority(int streamId, int streamDependency, int weight, boolean exclusive)779     @Override public void priority(int streamId, int streamDependency, int weight,
780         boolean exclusive) {
781       // TODO: honor priority.
782     }
783 
784     @Override
pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)785     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
786       pushRequestLater(promisedStreamId, requestHeaders);
787     }
788 
alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)789     @Override public void alternateService(int streamId, String origin, ByteString protocol,
790         String host, int port, long maxAge) {
791       // TODO: register alternate service.
792     }
793   }
794 
795   /** Even, positive numbered streams are pushed streams in HTTP/2. */
pushedStream(int streamId)796   private boolean pushedStream(int streamId) {
797     return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
798   }
799 
800   // Guarded by this.
801   private final Set<Integer> currentPushRequests = new LinkedHashSet<>();
802 
pushRequestLater(final int streamId, final List<Header> requestHeaders)803   private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
804     synchronized (this) {
805       if (currentPushRequests.contains(streamId)) {
806         writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
807         return;
808       }
809       currentPushRequests.add(streamId);
810     }
811     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
812       @Override public void execute() {
813         boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
814         try {
815           if (cancel) {
816             frameWriter.rstStream(streamId, ErrorCode.CANCEL);
817             synchronized (FramedConnection.this) {
818               currentPushRequests.remove(streamId);
819             }
820           }
821         } catch (IOException ignored) {
822         }
823       }
824     });
825   }
826 
pushHeadersLater(final int streamId, final List<Header> requestHeaders, final boolean inFinished)827   private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
828       final boolean inFinished) {
829     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
830       @Override public void execute() {
831         boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
832         try {
833           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
834           if (cancel || inFinished) {
835             synchronized (FramedConnection.this) {
836               currentPushRequests.remove(streamId);
837             }
838           }
839         } catch (IOException ignored) {
840         }
841       }
842     });
843   }
844 
845   /**
846    * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
847    * process the data.  This avoids corrupting the stream.
848    */
pushDataLater(final int streamId, final BufferedSource source, final int byteCount, final boolean inFinished)849   private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
850       final boolean inFinished) throws IOException {
851     final Buffer buffer = new Buffer();
852     source.require(byteCount); // Eagerly read the frame before firing client thread.
853     source.read(buffer, byteCount);
854     if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
855     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
856       @Override public void execute() {
857         try {
858           boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
859           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
860           if (cancel || inFinished) {
861             synchronized (FramedConnection.this) {
862               currentPushRequests.remove(streamId);
863             }
864           }
865         } catch (IOException ignored) {
866         }
867       }
868     });
869   }
870 
pushResetLater(final int streamId, final ErrorCode errorCode)871   private void pushResetLater(final int streamId, final ErrorCode errorCode) {
872     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
873       @Override public void execute() {
874         pushObserver.onReset(streamId, errorCode);
875         synchronized (FramedConnection.this) {
876           currentPushRequests.remove(streamId);
877         }
878       }
879     });
880   }
881 }
882