• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.squareup.okhttp.internal.framed;
17 
18 import com.squareup.okhttp.Protocol;
19 import com.squareup.okhttp.internal.NamedRunnable;
20 import com.squareup.okhttp.internal.Util;
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.util.HashMap;
27 import java.util.LinkedHashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import okio.Buffer;
38 import okio.BufferedSink;
39 import okio.BufferedSource;
40 import okio.ByteString;
41 import okio.Okio;
42 
43 import static com.squareup.okhttp.internal.Internal.logger;
44 import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
45 
46 /**
47  * A socket connection to a remote peer. A connection hosts streams which can
48  * send and receive data.
49  *
50  * <p>Many methods in this API are <strong>synchronous:</strong> the call is
51  * completed before the method returns. This is typical for Java but atypical
52  * for SPDY. This is motivated by exception transparency: an IOException that
53  * was triggered by a certain caller can be caught and handled by that caller.
54  */
55 public final class FramedConnection implements Closeable {
56 
57   // Internal state of this connection is guarded by 'this'. No blocking
58   // operations may be performed while holding this lock!
59   //
60   // Socket writes are guarded by frameWriter.
61   //
62   // Socket reads are unguarded but are only made by the reader thread.
63   //
64   // Certain operations (like SYN_STREAM) need to synchronize on both the
65   // frameWriter (to do blocking I/O) and this (to create streams). Such
66   // operations must synchronize on 'this' last. This ensures that we never
67   // wait for a blocking operation while holding 'this'.
68 
69   private static final ExecutorService executor = new ThreadPoolExecutor(0,
70       Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
71       Util.threadFactory("OkHttp FramedConnection", true));
72 
73   /** The protocol variant, like {@link com.squareup.okhttp.internal.framed.Spdy3}. */
74   final Protocol protocol;
75 
76   /** True if this peer initiated the connection. */
77   final boolean client;
78 
79   /**
80    * User code to run in response to incoming streams or settings. Calls to this are always invoked
81    * on {@link #executor}.
82    */
83   private final Listener listener;
84   private final Map<Integer, FramedStream> streams = new HashMap<>();
85   private final String hostName;
86   private int lastGoodStreamId;
87   private int nextStreamId;
88   private boolean shutdown;
89   private long idleStartTimeNs = System.nanoTime();
90 
91   /** Ensures push promise callbacks events are sent in order per stream. */
92   private final ExecutorService pushExecutor;
93 
94   /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
95   private Map<Integer, Ping> pings;
96   /** User code to run in response to push promise events. */
97   private final PushObserver pushObserver;
98   private int nextPingId;
99 
100   /**
101    * The total number of bytes consumed by the application, but not yet
102    * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
103    */
104   // Visible for testing
105   long unacknowledgedBytesRead = 0;
106 
107   /**
108    * Count of bytes that can be written on the connection before receiving a
109    * window update.
110    */
111   // Visible for testing
112   long bytesLeftInWriteWindow;
113 
114   /** Settings we communicate to the peer. */
115   Settings okHttpSettings = new Settings();
116 
117   private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
118 
119   /** Settings we receive from the peer. */
120   // TODO: MWS will need to guard on this setting before attempting to push.
121   final Settings peerSettings = new Settings();
122 
123   private boolean receivedInitialPeerSettings = false;
124   final Variant variant;
125   final Socket socket;
126   final FrameWriter frameWriter;
127 
128   // Visible for testing
129   final Reader readerRunnable;
130 
FramedConnection(Builder builder)131   private FramedConnection(Builder builder) throws IOException {
132     protocol = builder.protocol;
133     pushObserver = builder.pushObserver;
134     client = builder.client;
135     listener = builder.listener;
136     // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
137     nextStreamId = builder.client ? 1 : 2;
138     if (builder.client && protocol == Protocol.HTTP_2) {
139       nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
140     }
141 
142     nextPingId = builder.client ? 1 : 2;
143 
144     // Flow control was designed more for servers, or proxies than edge clients.
145     // If we are a client, set the flow control window to 16MiB.  This avoids
146     // thrashing window updates every 64KiB, yet small enough to avoid blowing
147     // up the heap.
148     if (builder.client) {
149       okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
150     }
151 
152     hostName = builder.hostName;
153 
154     if (protocol == Protocol.HTTP_2) {
155       variant = new Http2();
156       // Like newSingleThreadExecutor, except lazy creates the thread.
157       pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
158           new LinkedBlockingQueue<Runnable>(),
159           Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true));
160       // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2
161       peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
162       peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE);
163     } else if (protocol == Protocol.SPDY_3) {
164       variant = new Spdy3();
165       pushExecutor = null;
166     } else {
167       throw new AssertionError(protocol);
168     }
169     bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
170     socket = builder.socket;
171     frameWriter = variant.newWriter(builder.sink, client);
172 
173     readerRunnable = new Reader(variant.newReader(builder.source, client));
174     new Thread(readerRunnable).start(); // Not a daemon thread.
175   }
176 
177   /** The protocol as selected using ALPN. */
getProtocol()178   public Protocol getProtocol() {
179     return protocol;
180   }
181 
182   /**
183    * Returns the number of {@link FramedStream#isOpen() open streams} on this
184    * connection.
185    */
openStreamCount()186   public synchronized int openStreamCount() {
187     return streams.size();
188   }
189 
getStream(int id)190   synchronized FramedStream getStream(int id) {
191     return streams.get(id);
192   }
193 
removeStream(int streamId)194   synchronized FramedStream removeStream(int streamId) {
195     FramedStream stream = streams.remove(streamId);
196     if (stream != null && streams.isEmpty()) {
197       setIdle(true);
198     }
199     notifyAll(); // The removed stream may be blocked on a connection-wide window update.
200     return stream;
201   }
202 
setIdle(boolean value)203   private synchronized void setIdle(boolean value) {
204     idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE;
205   }
206 
207   /** Returns true if this connection is idle. */
isIdle()208   public synchronized boolean isIdle() {
209     return idleStartTimeNs != Long.MAX_VALUE;
210   }
211 
maxConcurrentStreams()212   public synchronized int maxConcurrentStreams() {
213     return peerSettings.getMaxConcurrentStreams(Integer.MAX_VALUE);
214   }
215 
216   /**
217    * Returns the time in ns when this connection became idle or Long.MAX_VALUE
218    * if connection is not idle.
219    */
getIdleStartTimeNs()220   public synchronized long getIdleStartTimeNs() {
221     return idleStartTimeNs;
222   }
223 
224   /**
225    * Returns a new server-initiated stream.
226    *
227    * @param associatedStreamId the stream that triggered the sender to create
228    *     this stream.
229    * @param out true to create an output stream that we can use to send data
230    *     to the remote peer. Corresponds to {@code FLAG_FIN}.
231    */
pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)232   public FramedStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
233       throws IOException {
234     if (client) throw new IllegalStateException("Client cannot push requests.");
235     if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
236     return newStream(associatedStreamId, requestHeaders, out, false);
237   }
238 
239   /**
240    * Returns a new locally-initiated stream.
241    *
242    * @param out true to create an output stream that we can use to send data to the remote peer.
243    *     Corresponds to {@code FLAG_FIN}.
244    * @param in true to create an input stream that the remote peer can use to send data to us.
245    *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
246    */
newStream(List<Header> requestHeaders, boolean out, boolean in)247   public FramedStream newStream(List<Header> requestHeaders, boolean out, boolean in)
248       throws IOException {
249     return newStream(0, requestHeaders, out, in);
250   }
251 
newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, boolean in)252   private FramedStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
253       boolean in) throws IOException {
254     boolean outFinished = !out;
255     boolean inFinished = !in;
256     FramedStream stream;
257     int streamId;
258 
259     synchronized (frameWriter) {
260       synchronized (this) {
261         if (shutdown) {
262           throw new IOException("shutdown");
263         }
264         streamId = nextStreamId;
265         nextStreamId += 2;
266         stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
267         if (stream.isOpen()) {
268           streams.put(streamId, stream);
269           setIdle(false);
270         }
271       }
272       if (associatedStreamId == 0) {
273         frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId,
274             requestHeaders);
275       } else if (client) {
276         throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
277       } else { // HTTP/2 has a PUSH_PROMISE frame.
278         frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
279       }
280     }
281 
282     if (!out) {
283       frameWriter.flush();
284     }
285 
286     return stream;
287   }
288 
writeSynReply(int streamId, boolean outFinished, List<Header> alternating)289   void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
290       throws IOException {
291     frameWriter.synReply(outFinished, streamId, alternating);
292   }
293 
294   /**
295    * Callers of this method are not thread safe, and sometimes on application threads. Most often,
296    * this method will be called to send a buffer worth of data to the peer.
297    *
298    * <p>Writes are subject to the write window of the stream and the connection. Until there is a
299    * window sufficient to send {@code byteCount}, the caller will block. For example, a user of
300    * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's
301    * write window will block.
302    *
303    * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only
304    * use case for zero {@code byteCount} is closing a flushed output stream.
305    */
writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)306   public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
307       throws IOException {
308     if (byteCount == 0) { // Empty data frames are not flow-controlled.
309       frameWriter.data(outFinished, streamId, buffer, 0);
310       return;
311     }
312 
313     while (byteCount > 0) {
314       int toWrite;
315       synchronized (FramedConnection.this) {
316         try {
317           while (bytesLeftInWriteWindow <= 0) {
318             // Before blocking, confirm that the stream we're writing is still open. It's possible
319             // that the stream has since been closed (such as if this write timed out.)
320             if (!streams.containsKey(streamId)) {
321               throw new IOException("stream closed");
322             }
323             FramedConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
324           }
325         } catch (InterruptedException e) {
326           throw new InterruptedIOException();
327         }
328 
329         toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
330         toWrite = Math.min(toWrite, frameWriter.maxDataLength());
331         bytesLeftInWriteWindow -= toWrite;
332       }
333 
334       byteCount -= toWrite;
335       frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
336     }
337   }
338 
339   /**
340    * {@code delta} will be negative if a settings frame initial window is
341    * smaller than the last.
342    */
addBytesToWriteWindow(long delta)343   void addBytesToWriteWindow(long delta) {
344     bytesLeftInWriteWindow += delta;
345     if (delta > 0) FramedConnection.this.notifyAll();
346   }
347 
writeSynResetLater(final int streamId, final ErrorCode errorCode)348   void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
349     executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
350       @Override public void execute() {
351         try {
352           writeSynReset(streamId, errorCode);
353         } catch (IOException ignored) {
354         }
355       }
356     });
357   }
358 
writeSynReset(int streamId, ErrorCode statusCode)359   void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
360     frameWriter.rstStream(streamId, statusCode);
361   }
362 
writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead)363   void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
364     executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
365       @Override public void execute() {
366         try {
367           frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
368         } catch (IOException ignored) {
369         }
370       }
371     });
372   }
373 
374   /**
375    * Sends a ping frame to the peer. Use the returned object to await the
376    * ping's response and observe its round trip time.
377    */
ping()378   public Ping ping() throws IOException {
379     Ping ping = new Ping();
380     int pingId;
381     synchronized (this) {
382       if (shutdown) {
383         throw new IOException("shutdown");
384       }
385       pingId = nextPingId;
386       nextPingId += 2;
387       if (pings == null) pings = new HashMap<>();
388       pings.put(pingId, ping);
389     }
390     writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
391     return ping;
392   }
393 
writePingLater( final boolean reply, final int payload1, final int payload2, final Ping ping)394   private void writePingLater(
395       final boolean reply, final int payload1, final int payload2, final Ping ping) {
396     executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x",
397         hostName, payload1, payload2) {
398       @Override public void execute() {
399         try {
400           writePing(reply, payload1, payload2, ping);
401         } catch (IOException ignored) {
402         }
403       }
404     });
405   }
406 
writePing(boolean reply, int payload1, int payload2, Ping ping)407   private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
408     synchronized (frameWriter) {
409       // Observe the sent time immediately before performing I/O.
410       if (ping != null) ping.send();
411       frameWriter.ping(reply, payload1, payload2);
412     }
413   }
414 
removePing(int id)415   private synchronized Ping removePing(int id) {
416     return pings != null ? pings.remove(id) : null;
417   }
418 
flush()419   public void flush() throws IOException {
420     frameWriter.flush();
421   }
422 
423   /**
424    * Degrades this connection such that new streams can neither be created
425    * locally, nor accepted from the remote peer. Existing streams are not
426    * impacted. This is intended to permit an endpoint to gracefully stop
427    * accepting new requests without harming previously established streams.
428    */
shutdown(ErrorCode statusCode)429   public void shutdown(ErrorCode statusCode) throws IOException {
430     synchronized (frameWriter) {
431       int lastGoodStreamId;
432       synchronized (this) {
433         if (shutdown) {
434           return;
435         }
436         shutdown = true;
437         lastGoodStreamId = this.lastGoodStreamId;
438       }
439       // TODO: propagate exception message into debugData
440       frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
441     }
442   }
443 
444   /**
445    * Closes this connection. This cancels all open streams and unanswered
446    * pings. It closes the underlying input and output streams and shuts down
447    * internal executor services.
448    */
close()449   @Override public void close() throws IOException {
450     close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
451   }
452 
close(ErrorCode connectionCode, ErrorCode streamCode)453   private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
454     assert (!Thread.holdsLock(this));
455     IOException thrown = null;
456     try {
457       shutdown(connectionCode);
458     } catch (IOException e) {
459       thrown = e;
460     }
461 
462     FramedStream[] streamsToClose = null;
463     Ping[] pingsToCancel = null;
464     synchronized (this) {
465       if (!streams.isEmpty()) {
466         streamsToClose = streams.values().toArray(new FramedStream[streams.size()]);
467         streams.clear();
468         setIdle(false);
469       }
470       if (pings != null) {
471         pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
472         pings = null;
473       }
474     }
475 
476     if (streamsToClose != null) {
477       for (FramedStream stream : streamsToClose) {
478         try {
479           stream.close(streamCode);
480         } catch (IOException e) {
481           if (thrown != null) thrown = e;
482         }
483       }
484     }
485 
486     if (pingsToCancel != null) {
487       for (Ping ping : pingsToCancel) {
488         ping.cancel();
489       }
490     }
491 
492     // Close the writer to release its resources (such as deflaters).
493     try {
494       frameWriter.close();
495     } catch (IOException e) {
496       if (thrown == null) thrown = e;
497     }
498 
499     // Close the socket to break out the reader thread, which will clean up after itself.
500     try {
501       socket.close();
502     } catch (IOException e) {
503       thrown = e;
504     }
505 
506     if (thrown != null) throw thrown;
507   }
508 
509   /**
510    * Sends a connection header if the current variant requires it. This should
511    * be called after {@link Builder#build} for all new connections.
512    */
sendConnectionPreface()513   public void sendConnectionPreface() throws IOException {
514     frameWriter.connectionPreface();
515     frameWriter.settings(okHttpSettings);
516     int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
517     if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
518       frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
519     }
520   }
521 
522   /** Merges {@code settings} into this peer's settings and sends them to the remote peer. */
setSettings(Settings settings)523   public void setSettings(Settings settings) throws IOException {
524     synchronized (frameWriter) {
525       synchronized (this) {
526         if (shutdown) {
527           throw new IOException("shutdown");
528         }
529         okHttpSettings.merge(settings);
530         frameWriter.settings(settings);
531       }
532     }
533   }
534 
535   public static class Builder {
536     private Socket socket;
537     private String hostName;
538     private BufferedSource source;
539     private BufferedSink sink;
540     private Listener listener = Listener.REFUSE_INCOMING_STREAMS;
541     private Protocol protocol = Protocol.SPDY_3;
542     private PushObserver pushObserver = PushObserver.CANCEL;
543     private boolean client;
544 
545     /**
546      * @param client true if this peer initiated the connection; false if this
547      *     peer accepted the connection.
548      */
Builder(boolean client)549     public Builder(boolean client) throws IOException {
550       this.client = client;
551     }
552 
socket(Socket socket)553     public Builder socket(Socket socket) throws IOException {
554       return socket(socket, ((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(),
555           Okio.buffer(Okio.source(socket)), Okio.buffer(Okio.sink(socket)));
556     }
557 
socket( Socket socket, String hostName, BufferedSource source, BufferedSink sink)558     public Builder socket(
559         Socket socket, String hostName, BufferedSource source, BufferedSink sink) {
560       this.socket = socket;
561       this.hostName = hostName;
562       this.source = source;
563       this.sink = sink;
564       return this;
565     }
566 
listener(Listener listener)567     public Builder listener(Listener listener) {
568       this.listener = listener;
569       return this;
570     }
571 
protocol(Protocol protocol)572     public Builder protocol(Protocol protocol) {
573       this.protocol = protocol;
574       return this;
575     }
576 
pushObserver(PushObserver pushObserver)577     public Builder pushObserver(PushObserver pushObserver) {
578       this.pushObserver = pushObserver;
579       return this;
580     }
581 
build()582     public FramedConnection build() throws IOException {
583       return new FramedConnection(this);
584     }
585   }
586 
587   /**
588    * Methods in this class must not lock FrameWriter.  If a method needs to
589    * write a frame, create an async task to do so.
590    */
591   class Reader extends NamedRunnable implements FrameReader.Handler {
592     final FrameReader frameReader;
593 
Reader(FrameReader frameReader)594     private Reader(FrameReader frameReader) {
595       super("OkHttp %s", hostName);
596       this.frameReader = frameReader;
597     }
598 
execute()599     @Override protected void execute() {
600       ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
601       ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
602       try {
603         if (!client) {
604           frameReader.readConnectionPreface();
605         }
606         while (frameReader.nextFrame(this)) {
607         }
608         connectionErrorCode = ErrorCode.NO_ERROR;
609         streamErrorCode = ErrorCode.CANCEL;
610       } catch (IOException e) {
611         connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
612         streamErrorCode = ErrorCode.PROTOCOL_ERROR;
613       } finally {
614         try {
615           close(connectionErrorCode, streamErrorCode);
616         } catch (IOException ignored) {
617         }
618         Util.closeQuietly(frameReader);
619       }
620     }
621 
data(boolean inFinished, int streamId, BufferedSource source, int length)622     @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
623         throws IOException {
624       if (pushedStream(streamId)) {
625         pushDataLater(streamId, source, length, inFinished);
626         return;
627       }
628       FramedStream dataStream = getStream(streamId);
629       if (dataStream == null) {
630         writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
631         source.skip(length);
632         return;
633       }
634       dataStream.receiveData(source, length);
635       if (inFinished) {
636         dataStream.receiveFin();
637       }
638     }
639 
headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)640     @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
641         int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
642       if (pushedStream(streamId)) {
643         pushHeadersLater(streamId, headerBlock, inFinished);
644         return;
645       }
646       FramedStream stream;
647       synchronized (FramedConnection.this) {
648         // If we're shutdown, don't bother with this stream.
649         if (shutdown) return;
650 
651         stream = getStream(streamId);
652 
653         if (stream == null) {
654           // The headers claim to be for an existing stream, but we don't have one.
655           if (headersMode.failIfStreamAbsent()) {
656             writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
657             return;
658           }
659 
660           // If the stream ID is less than the last created ID, assume it's already closed.
661           if (streamId <= lastGoodStreamId) return;
662 
663           // If the stream ID is in the client's namespace, assume it's already closed.
664           if (streamId % 2 == nextStreamId % 2) return;
665 
666           // Create a stream.
667           final FramedStream
668               newStream = new FramedStream(streamId, FramedConnection.this, outFinished,
669               inFinished, headerBlock);
670           lastGoodStreamId = streamId;
671           streams.put(streamId, newStream);
672           executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
673             @Override public void execute() {
674               try {
675                 listener.onStream(newStream);
676               } catch (IOException e) {
677                 logger.log(Level.INFO, "FramedConnection.Listener failure for " + hostName, e);
678                 try {
679                   newStream.close(ErrorCode.PROTOCOL_ERROR);
680                 } catch (IOException ignored) {
681                 }
682               }
683             }
684           });
685           return;
686         }
687       }
688 
689       // The headers claim to be for a new stream, but we already have one.
690       if (headersMode.failIfStreamPresent()) {
691         stream.closeLater(ErrorCode.PROTOCOL_ERROR);
692         removeStream(streamId);
693         return;
694       }
695 
696       // Update an existing stream.
697       stream.receiveHeaders(headerBlock, headersMode);
698       if (inFinished) stream.receiveFin();
699     }
700 
rstStream(int streamId, ErrorCode errorCode)701     @Override public void rstStream(int streamId, ErrorCode errorCode) {
702       if (pushedStream(streamId)) {
703         pushResetLater(streamId, errorCode);
704         return;
705       }
706       FramedStream rstStream = removeStream(streamId);
707       if (rstStream != null) {
708         rstStream.receiveRstStream(errorCode);
709       }
710     }
711 
settings(boolean clearPrevious, Settings newSettings)712     @Override public void settings(boolean clearPrevious, Settings newSettings) {
713       long delta = 0;
714       FramedStream[] streamsToNotify = null;
715       synchronized (FramedConnection.this) {
716         int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
717         if (clearPrevious) peerSettings.clear();
718         peerSettings.merge(newSettings);
719         if (getProtocol() == Protocol.HTTP_2) {
720           ackSettingsLater(newSettings);
721         }
722         int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
723         if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
724           delta = peerInitialWindowSize - priorWriteWindowSize;
725           if (!receivedInitialPeerSettings) {
726             addBytesToWriteWindow(delta);
727             receivedInitialPeerSettings = true;
728           }
729           if (!streams.isEmpty()) {
730             streamsToNotify = streams.values().toArray(new FramedStream[streams.size()]);
731           }
732         }
733         executor.execute(new NamedRunnable("OkHttp %s settings", hostName) {
734           @Override public void execute() {
735             listener.onSettings(FramedConnection.this);
736           }
737         });
738       }
739       if (streamsToNotify != null && delta != 0) {
740         for (FramedStream stream : streamsToNotify) {
741           synchronized (stream) {
742             stream.addBytesToWriteWindow(delta);
743           }
744         }
745       }
746     }
747 
ackSettingsLater(final Settings peerSettings)748     private void ackSettingsLater(final Settings peerSettings) {
749       executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) {
750         @Override public void execute() {
751           try {
752             frameWriter.ackSettings(peerSettings);
753           } catch (IOException ignored) {
754           }
755         }
756       });
757     }
758 
ackSettings()759     @Override public void ackSettings() {
760       // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
761     }
762 
ping(boolean reply, int payload1, int payload2)763     @Override public void ping(boolean reply, int payload1, int payload2) {
764       if (reply) {
765         Ping ping = removePing(payload1);
766         if (ping != null) {
767           ping.receive();
768         }
769       } else {
770         // Send a reply to a client ping if this is a server and vice versa.
771         writePingLater(true, payload1, payload2, null);
772       }
773     }
774 
goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)775     @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
776       if (debugData.size() > 0) { // TODO: log the debugData
777       }
778 
779       // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
780       FramedStream[] streamsCopy;
781       synchronized (FramedConnection.this) {
782         streamsCopy = streams.values().toArray(new FramedStream[streams.size()]);
783         shutdown = true;
784       }
785 
786       // Fail all streams created after the last good stream ID.
787       for (FramedStream framedStream : streamsCopy) {
788         if (framedStream.getId() > lastGoodStreamId && framedStream.isLocallyInitiated()) {
789           framedStream.receiveRstStream(ErrorCode.REFUSED_STREAM);
790           removeStream(framedStream.getId());
791         }
792       }
793     }
794 
windowUpdate(int streamId, long windowSizeIncrement)795     @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
796       if (streamId == 0) {
797         synchronized (FramedConnection.this) {
798           bytesLeftInWriteWindow += windowSizeIncrement;
799           FramedConnection.this.notifyAll();
800         }
801       } else {
802         FramedStream stream = getStream(streamId);
803         if (stream != null) {
804           synchronized (stream) {
805             stream.addBytesToWriteWindow(windowSizeIncrement);
806           }
807         }
808       }
809     }
810 
priority(int streamId, int streamDependency, int weight, boolean exclusive)811     @Override public void priority(int streamId, int streamDependency, int weight,
812         boolean exclusive) {
813       // TODO: honor priority.
814     }
815 
816     @Override
pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)817     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
818       pushRequestLater(promisedStreamId, requestHeaders);
819     }
820 
alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)821     @Override public void alternateService(int streamId, String origin, ByteString protocol,
822         String host, int port, long maxAge) {
823       // TODO: register alternate service.
824     }
825   }
826 
827   /** Even, positive numbered streams are pushed streams in HTTP/2. */
pushedStream(int streamId)828   private boolean pushedStream(int streamId) {
829     return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
830   }
831 
// Stream IDs of push requests currently being tracked: an ID is added by pushRequestLater()
// and removed by the push tasks once the request is cancelled, finished, or reset.
// Guarded by this.
private final Set<Integer> currentPushRequests = new LinkedHashSet<>();
834 
pushRequestLater(final int streamId, final List<Header> requestHeaders)835   private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
836     synchronized (this) {
837       if (currentPushRequests.contains(streamId)) {
838         writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
839         return;
840       }
841       currentPushRequests.add(streamId);
842     }
843     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
844       @Override public void execute() {
845         boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
846         try {
847           if (cancel) {
848             frameWriter.rstStream(streamId, ErrorCode.CANCEL);
849             synchronized (FramedConnection.this) {
850               currentPushRequests.remove(streamId);
851             }
852           }
853         } catch (IOException ignored) {
854         }
855       }
856     });
857   }
858 
pushHeadersLater(final int streamId, final List<Header> requestHeaders, final boolean inFinished)859   private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
860       final boolean inFinished) {
861     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
862       @Override public void execute() {
863         boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
864         try {
865           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
866           if (cancel || inFinished) {
867             synchronized (FramedConnection.this) {
868               currentPushRequests.remove(streamId);
869             }
870           }
871         } catch (IOException ignored) {
872         }
873       }
874     });
875   }
876 
877   /**
878    * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
879    * process the data.  This avoids corrupting the stream.
880    */
pushDataLater(final int streamId, final BufferedSource source, final int byteCount, final boolean inFinished)881   private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
882       final boolean inFinished) throws IOException {
883     final Buffer buffer = new Buffer();
884     source.require(byteCount); // Eagerly read the frame before firing client thread.
885     source.read(buffer, byteCount);
886     if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
887     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
888       @Override public void execute() {
889         try {
890           boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
891           if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
892           if (cancel || inFinished) {
893             synchronized (FramedConnection.this) {
894               currentPushRequests.remove(streamId);
895             }
896           }
897         } catch (IOException ignored) {
898         }
899       }
900     });
901   }
902 
pushResetLater(final int streamId, final ErrorCode errorCode)903   private void pushResetLater(final int streamId, final ErrorCode errorCode) {
904     pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
905       @Override public void execute() {
906         pushObserver.onReset(streamId, errorCode);
907         synchronized (FramedConnection.this) {
908           currentPushRequests.remove(streamId);
909         }
910       }
911     });
912   }
913 
914   /** Listener of streams and settings initiated by the peer. */
915   public abstract static class Listener {
916     public static final Listener REFUSE_INCOMING_STREAMS = new Listener() {
917       @Override public void onStream(FramedStream stream) throws IOException {
918         stream.close(ErrorCode.REFUSED_STREAM);
919       }
920     };
921 
922     /**
923      * Handle a new stream from this connection's peer. Implementations should
924      * respond by either {@linkplain FramedStream#reply replying to the stream}
925      * or {@linkplain FramedStream#close closing it}. This response does not
926      * need to be synchronous.
927      */
onStream(FramedStream stream)928     public abstract void onStream(FramedStream stream) throws IOException;
929 
930     /**
931      * Notification that the connection's peer's settings may have changed.
932      * Implementations should take appropriate action to handle the updated
933      * settings.
934      *
935      * <p>It is the implementation's responsibility to handle concurrent calls
936      * to this method. A remote peer that sends multiple settings frames will
937      * trigger multiple calls to this method, and those calls are not
938      * necessarily serialized.
939      */
onSettings(FramedConnection connection)940     public void onSettings(FramedConnection connection) {
941     }
942   }
943 }
944