1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.squareup.okhttp.internal.framed; 17 18 import com.squareup.okhttp.Protocol; 19 import com.squareup.okhttp.internal.NamedRunnable; 20 import com.squareup.okhttp.internal.Util; 21 import java.io.Closeable; 22 import java.io.IOException; 23 import java.io.InterruptedIOException; 24 import java.net.InetSocketAddress; 25 import java.net.Socket; 26 import java.util.HashMap; 27 import java.util.LinkedHashSet; 28 import java.util.List; 29 import java.util.Map; 30 import java.util.Set; 31 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.LinkedBlockingQueue; 33 import java.util.concurrent.SynchronousQueue; 34 import java.util.concurrent.ThreadPoolExecutor; 35 import java.util.concurrent.TimeUnit; 36 import java.util.logging.Level; 37 import okio.Buffer; 38 import okio.BufferedSink; 39 import okio.BufferedSource; 40 import okio.ByteString; 41 import okio.Okio; 42 43 import static com.squareup.okhttp.internal.Internal.logger; 44 import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE; 45 46 /** 47 * A socket connection to a remote peer. A connection hosts streams which can 48 * send and receive data. 49 * 50 * <p>Many methods in this API are <strong>synchronous:</strong> the call is 51 * completed before the method returns. This is typical for Java but atypical 52 * for SPDY. This is motivated by exception transparency: an IOException that 53 * was triggered by a certain caller can be caught and handled by that caller. 54 */ 55 public final class FramedConnection implements Closeable { 56 57 // Internal state of this connection is guarded by 'this'. No blocking 58 // operations may be performed while holding this lock! 59 // 60 // Socket writes are guarded by frameWriter. 61 // 62 // Socket reads are unguarded but are only made by the reader thread. 63 // 64 // Certain operations (like SYN_STREAM) need to synchronize on both the 65 // frameWriter (to do blocking I/O) and this (to create streams). Such 66 // operations must synchronize on 'this' last. This ensures that we never 67 // wait for a blocking operation while holding 'this'. 68 69 private static final ExecutorService executor = new ThreadPoolExecutor(0, 70 Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 71 Util.threadFactory("OkHttp FramedConnection", true)); 72 73 /** The protocol variant, like {@link com.squareup.okhttp.internal.framed.Spdy3}. */ 74 final Protocol protocol; 75 76 /** True if this peer initiated the connection. */ 77 final boolean client; 78 79 /** 80 * User code to run in response to incoming streams or settings. Calls to this are always invoked 81 * on {@link #executor}. 82 */ 83 private final Listener listener; 84 private final Map<Integer, FramedStream> streams = new HashMap<>(); 85 private final String hostName; 86 private int lastGoodStreamId; 87 private int nextStreamId; 88 private boolean shutdown; 89 private long idleStartTimeNs = System.nanoTime(); 90 91 /** Ensures push promise callbacks events are sent in order per stream. */ 92 private final ExecutorService pushExecutor; 93 94 /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ 95 private Map<Integer, Ping> pings; 96 /** User code to run in response to push promise events. */ 97 private final PushObserver pushObserver; 98 private int nextPingId; 99 100 /** 101 * The total number of bytes consumed by the application, but not yet 102 * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection. 103 */ 104 // Visible for testing 105 long unacknowledgedBytesRead = 0; 106 107 /** 108 * Count of bytes that can be written on the connection before receiving a 109 * window update. 110 */ 111 // Visible for testing 112 long bytesLeftInWriteWindow; 113 114 /** Settings we communicate to the peer. */ 115 Settings okHttpSettings = new Settings(); 116 117 private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024; 118 119 /** Settings we receive from the peer. */ 120 // TODO: MWS will need to guard on this setting before attempting to push. 121 final Settings peerSettings = new Settings(); 122 123 private boolean receivedInitialPeerSettings = false; 124 final Variant variant; 125 final Socket socket; 126 final FrameWriter frameWriter; 127 128 // Visible for testing 129 final Reader readerRunnable; 130 FramedConnection(Builder builder)131 private FramedConnection(Builder builder) throws IOException { 132 protocol = builder.protocol; 133 pushObserver = builder.pushObserver; 134 client = builder.client; 135 listener = builder.listener; 136 // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1 137 nextStreamId = builder.client ? 1 : 2; 138 if (builder.client && protocol == Protocol.HTTP_2) { 139 nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade. 140 } 141 142 nextPingId = builder.client ? 1 : 2; 143 144 // Flow control was designed more for servers, or proxies than edge clients. 145 // If we are a client, set the flow control window to 16MiB. This avoids 146 // thrashing window updates every 64KiB, yet small enough to avoid blowing 147 // up the heap. 148 if (builder.client) { 149 okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE); 150 } 151 152 hostName = builder.hostName; 153 154 if (protocol == Protocol.HTTP_2) { 155 variant = new Http2(); 156 // Like newSingleThreadExecutor, except lazy creates the thread. 157 pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, 158 new LinkedBlockingQueue<Runnable>(), 159 Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true)); 160 // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2 161 peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535); 162 peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE); 163 } else if (protocol == Protocol.SPDY_3) { 164 variant = new Spdy3(); 165 pushExecutor = null; 166 } else { 167 throw new AssertionError(protocol); 168 } 169 bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 170 socket = builder.socket; 171 frameWriter = variant.newWriter(builder.sink, client); 172 173 readerRunnable = new Reader(variant.newReader(builder.source, client)); 174 new Thread(readerRunnable).start(); // Not a daemon thread. 175 } 176 177 /** The protocol as selected using ALPN. */ getProtocol()178 public Protocol getProtocol() { 179 return protocol; 180 } 181 182 /** 183 * Returns the number of {@link FramedStream#isOpen() open streams} on this 184 * connection. 185 */ openStreamCount()186 public synchronized int openStreamCount() { 187 return streams.size(); 188 } 189 getStream(int id)190 synchronized FramedStream getStream(int id) { 191 return streams.get(id); 192 } 193 removeStream(int streamId)194 synchronized FramedStream removeStream(int streamId) { 195 FramedStream stream = streams.remove(streamId); 196 if (stream != null && streams.isEmpty()) { 197 setIdle(true); 198 } 199 notifyAll(); // The removed stream may be blocked on a connection-wide window update. 200 return stream; 201 } 202 setIdle(boolean value)203 private synchronized void setIdle(boolean value) { 204 idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE; 205 } 206 207 /** Returns true if this connection is idle. */ isIdle()208 public synchronized boolean isIdle() { 209 return idleStartTimeNs != Long.MAX_VALUE; 210 } 211 maxConcurrentStreams()212 public synchronized int maxConcurrentStreams() { 213 return peerSettings.getMaxConcurrentStreams(Integer.MAX_VALUE); 214 } 215 216 /** 217 * Returns the time in ns when this connection became idle or Long.MAX_VALUE 218 * if connection is not idle. 219 */ getIdleStartTimeNs()220 public synchronized long getIdleStartTimeNs() { 221 return idleStartTimeNs; 222 } 223 224 /** 225 * Returns a new server-initiated stream. 226 * 227 * @param associatedStreamId the stream that triggered the sender to create 228 * this stream. 229 * @param out true to create an output stream that we can use to send data 230 * to the remote peer. Corresponds to {@code FLAG_FIN}. 231 */ pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)232 public FramedStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out) 233 throws IOException { 234 if (client) throw new IllegalStateException("Client cannot push requests."); 235 if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2"); 236 return newStream(associatedStreamId, requestHeaders, out, false); 237 } 238 239 /** 240 * Returns a new locally-initiated stream. 241 * 242 * @param out true to create an output stream that we can use to send data to the remote peer. 243 * Corresponds to {@code FLAG_FIN}. 244 * @param in true to create an input stream that the remote peer can use to send data to us. 245 * Corresponds to {@code FLAG_UNIDIRECTIONAL}. 246 */ newStream(List<Header> requestHeaders, boolean out, boolean in)247 public FramedStream newStream(List<Header> requestHeaders, boolean out, boolean in) 248 throws IOException { 249 return newStream(0, requestHeaders, out, in); 250 } 251 newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, boolean in)252 private FramedStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, 253 boolean in) throws IOException { 254 boolean outFinished = !out; 255 boolean inFinished = !in; 256 FramedStream stream; 257 int streamId; 258 259 synchronized (frameWriter) { 260 synchronized (this) { 261 if (shutdown) { 262 throw new IOException("shutdown"); 263 } 264 streamId = nextStreamId; 265 nextStreamId += 2; 266 stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders); 267 if (stream.isOpen()) { 268 streams.put(streamId, stream); 269 setIdle(false); 270 } 271 } 272 if (associatedStreamId == 0) { 273 frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, 274 requestHeaders); 275 } else if (client) { 276 throw new IllegalArgumentException("client streams shouldn't have associated stream IDs"); 277 } else { // HTTP/2 has a PUSH_PROMISE frame. 278 frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders); 279 } 280 } 281 282 if (!out) { 283 frameWriter.flush(); 284 } 285 286 return stream; 287 } 288 writeSynReply(int streamId, boolean outFinished, List<Header> alternating)289 void writeSynReply(int streamId, boolean outFinished, List<Header> alternating) 290 throws IOException { 291 frameWriter.synReply(outFinished, streamId, alternating); 292 } 293 294 /** 295 * Callers of this method are not thread safe, and sometimes on application threads. Most often, 296 * this method will be called to send a buffer worth of data to the peer. 297 * 298 * <p>Writes are subject to the write window of the stream and the connection. Until there is a 299 * window sufficient to send {@code byteCount}, the caller will block. For example, a user of 300 * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's 301 * write window will block. 302 * 303 * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only 304 * use case for zero {@code byteCount} is closing a flushed output stream. 305 */ writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)306 public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount) 307 throws IOException { 308 if (byteCount == 0) { // Empty data frames are not flow-controlled. 309 frameWriter.data(outFinished, streamId, buffer, 0); 310 return; 311 } 312 313 while (byteCount > 0) { 314 int toWrite; 315 synchronized (FramedConnection.this) { 316 try { 317 while (bytesLeftInWriteWindow <= 0) { 318 // Before blocking, confirm that the stream we're writing is still open. It's possible 319 // that the stream has since been closed (such as if this write timed out.) 320 if (!streams.containsKey(streamId)) { 321 throw new IOException("stream closed"); 322 } 323 FramedConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE. 324 } 325 } catch (InterruptedException e) { 326 throw new InterruptedIOException(); 327 } 328 329 toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow); 330 toWrite = Math.min(toWrite, frameWriter.maxDataLength()); 331 bytesLeftInWriteWindow -= toWrite; 332 } 333 334 byteCount -= toWrite; 335 frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite); 336 } 337 } 338 339 /** 340 * {@code delta} will be negative if a settings frame initial window is 341 * smaller than the last. 342 */ addBytesToWriteWindow(long delta)343 void addBytesToWriteWindow(long delta) { 344 bytesLeftInWriteWindow += delta; 345 if (delta > 0) FramedConnection.this.notifyAll(); 346 } 347 writeSynResetLater(final int streamId, final ErrorCode errorCode)348 void writeSynResetLater(final int streamId, final ErrorCode errorCode) { 349 executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { 350 @Override public void execute() { 351 try { 352 writeSynReset(streamId, errorCode); 353 } catch (IOException ignored) { 354 } 355 } 356 }); 357 } 358 writeSynReset(int streamId, ErrorCode statusCode)359 void writeSynReset(int streamId, ErrorCode statusCode) throws IOException { 360 frameWriter.rstStream(streamId, statusCode); 361 } 362 writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead)363 void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) { 364 executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) { 365 @Override public void execute() { 366 try { 367 frameWriter.windowUpdate(streamId, unacknowledgedBytesRead); 368 } catch (IOException ignored) { 369 } 370 } 371 }); 372 } 373 374 /** 375 * Sends a ping frame to the peer. Use the returned object to await the 376 * ping's response and observe its round trip time. 377 */ ping()378 public Ping ping() throws IOException { 379 Ping ping = new Ping(); 380 int pingId; 381 synchronized (this) { 382 if (shutdown) { 383 throw new IOException("shutdown"); 384 } 385 pingId = nextPingId; 386 nextPingId += 2; 387 if (pings == null) pings = new HashMap<>(); 388 pings.put(pingId, ping); 389 } 390 writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping); 391 return ping; 392 } 393 writePingLater( final boolean reply, final int payload1, final int payload2, final Ping ping)394 private void writePingLater( 395 final boolean reply, final int payload1, final int payload2, final Ping ping) { 396 executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x", 397 hostName, payload1, payload2) { 398 @Override public void execute() { 399 try { 400 writePing(reply, payload1, payload2, ping); 401 } catch (IOException ignored) { 402 } 403 } 404 }); 405 } 406 writePing(boolean reply, int payload1, int payload2, Ping ping)407 private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException { 408 synchronized (frameWriter) { 409 // Observe the sent time immediately before performing I/O. 410 if (ping != null) ping.send(); 411 frameWriter.ping(reply, payload1, payload2); 412 } 413 } 414 removePing(int id)415 private synchronized Ping removePing(int id) { 416 return pings != null ? pings.remove(id) : null; 417 } 418 flush()419 public void flush() throws IOException { 420 frameWriter.flush(); 421 } 422 423 /** 424 * Degrades this connection such that new streams can neither be created 425 * locally, nor accepted from the remote peer. Existing streams are not 426 * impacted. This is intended to permit an endpoint to gracefully stop 427 * accepting new requests without harming previously established streams. 428 */ shutdown(ErrorCode statusCode)429 public void shutdown(ErrorCode statusCode) throws IOException { 430 synchronized (frameWriter) { 431 int lastGoodStreamId; 432 synchronized (this) { 433 if (shutdown) { 434 return; 435 } 436 shutdown = true; 437 lastGoodStreamId = this.lastGoodStreamId; 438 } 439 // TODO: propagate exception message into debugData 440 frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY); 441 } 442 } 443 444 /** 445 * Closes this connection. This cancels all open streams and unanswered 446 * pings. It closes the underlying input and output streams and shuts down 447 * internal executor services. 448 */ close()449 @Override public void close() throws IOException { 450 close(ErrorCode.NO_ERROR, ErrorCode.CANCEL); 451 } 452 close(ErrorCode connectionCode, ErrorCode streamCode)453 private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException { 454 assert (!Thread.holdsLock(this)); 455 IOException thrown = null; 456 try { 457 shutdown(connectionCode); 458 } catch (IOException e) { 459 thrown = e; 460 } 461 462 FramedStream[] streamsToClose = null; 463 Ping[] pingsToCancel = null; 464 synchronized (this) { 465 if (!streams.isEmpty()) { 466 streamsToClose = streams.values().toArray(new FramedStream[streams.size()]); 467 streams.clear(); 468 setIdle(false); 469 } 470 if (pings != null) { 471 pingsToCancel = pings.values().toArray(new Ping[pings.size()]); 472 pings = null; 473 } 474 } 475 476 if (streamsToClose != null) { 477 for (FramedStream stream : streamsToClose) { 478 try { 479 stream.close(streamCode); 480 } catch (IOException e) { 481 if (thrown != null) thrown = e; 482 } 483 } 484 } 485 486 if (pingsToCancel != null) { 487 for (Ping ping : pingsToCancel) { 488 ping.cancel(); 489 } 490 } 491 492 // Close the writer to release its resources (such as deflaters). 493 try { 494 frameWriter.close(); 495 } catch (IOException e) { 496 if (thrown == null) thrown = e; 497 } 498 499 // Close the socket to break out the reader thread, which will clean up after itself. 500 try { 501 socket.close(); 502 } catch (IOException e) { 503 thrown = e; 504 } 505 506 if (thrown != null) throw thrown; 507 } 508 509 /** 510 * Sends a connection header if the current variant requires it. This should 511 * be called after {@link Builder#build} for all new connections. 512 */ sendConnectionPreface()513 public void sendConnectionPreface() throws IOException { 514 frameWriter.connectionPreface(); 515 frameWriter.settings(okHttpSettings); 516 int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE); 517 if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) { 518 frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE); 519 } 520 } 521 522 /** Merges {@code settings} into this peer's settings and sends them to the remote peer. */ setSettings(Settings settings)523 public void setSettings(Settings settings) throws IOException { 524 synchronized (frameWriter) { 525 synchronized (this) { 526 if (shutdown) { 527 throw new IOException("shutdown"); 528 } 529 okHttpSettings.merge(settings); 530 frameWriter.settings(settings); 531 } 532 } 533 } 534 535 public static class Builder { 536 private Socket socket; 537 private String hostName; 538 private BufferedSource source; 539 private BufferedSink sink; 540 private Listener listener = Listener.REFUSE_INCOMING_STREAMS; 541 private Protocol protocol = Protocol.SPDY_3; 542 private PushObserver pushObserver = PushObserver.CANCEL; 543 private boolean client; 544 545 /** 546 * @param client true if this peer initiated the connection; false if this 547 * peer accepted the connection. 548 */ Builder(boolean client)549 public Builder(boolean client) throws IOException { 550 this.client = client; 551 } 552 socket(Socket socket)553 public Builder socket(Socket socket) throws IOException { 554 return socket(socket, ((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), 555 Okio.buffer(Okio.source(socket)), Okio.buffer(Okio.sink(socket))); 556 } 557 socket( Socket socket, String hostName, BufferedSource source, BufferedSink sink)558 public Builder socket( 559 Socket socket, String hostName, BufferedSource source, BufferedSink sink) { 560 this.socket = socket; 561 this.hostName = hostName; 562 this.source = source; 563 this.sink = sink; 564 return this; 565 } 566 listener(Listener listener)567 public Builder listener(Listener listener) { 568 this.listener = listener; 569 return this; 570 } 571 protocol(Protocol protocol)572 public Builder protocol(Protocol protocol) { 573 this.protocol = protocol; 574 return this; 575 } 576 pushObserver(PushObserver pushObserver)577 public Builder pushObserver(PushObserver pushObserver) { 578 this.pushObserver = pushObserver; 579 return this; 580 } 581 build()582 public FramedConnection build() throws IOException { 583 return new FramedConnection(this); 584 } 585 } 586 587 /** 588 * Methods in this class must not lock FrameWriter. If a method needs to 589 * write a frame, create an async task to do so. 590 */ 591 class Reader extends NamedRunnable implements FrameReader.Handler { 592 final FrameReader frameReader; 593 Reader(FrameReader frameReader)594 private Reader(FrameReader frameReader) { 595 super("OkHttp %s", hostName); 596 this.frameReader = frameReader; 597 } 598 execute()599 @Override protected void execute() { 600 ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR; 601 ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR; 602 try { 603 if (!client) { 604 frameReader.readConnectionPreface(); 605 } 606 while (frameReader.nextFrame(this)) { 607 } 608 connectionErrorCode = ErrorCode.NO_ERROR; 609 streamErrorCode = ErrorCode.CANCEL; 610 } catch (IOException e) { 611 connectionErrorCode = ErrorCode.PROTOCOL_ERROR; 612 streamErrorCode = ErrorCode.PROTOCOL_ERROR; 613 } finally { 614 try { 615 close(connectionErrorCode, streamErrorCode); 616 } catch (IOException ignored) { 617 } 618 Util.closeQuietly(frameReader); 619 } 620 } 621 data(boolean inFinished, int streamId, BufferedSource source, int length)622 @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) 623 throws IOException { 624 if (pushedStream(streamId)) { 625 pushDataLater(streamId, source, length, inFinished); 626 return; 627 } 628 FramedStream dataStream = getStream(streamId); 629 if (dataStream == null) { 630 writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); 631 source.skip(length); 632 return; 633 } 634 dataStream.receiveData(source, length); 635 if (inFinished) { 636 dataStream.receiveFin(); 637 } 638 } 639 headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)640 @Override public void headers(boolean outFinished, boolean inFinished, int streamId, 641 int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) { 642 if (pushedStream(streamId)) { 643 pushHeadersLater(streamId, headerBlock, inFinished); 644 return; 645 } 646 FramedStream stream; 647 synchronized (FramedConnection.this) { 648 // If we're shutdown, don't bother with this stream. 649 if (shutdown) return; 650 651 stream = getStream(streamId); 652 653 if (stream == null) { 654 // The headers claim to be for an existing stream, but we don't have one. 655 if (headersMode.failIfStreamAbsent()) { 656 writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); 657 return; 658 } 659 660 // If the stream ID is less than the last created ID, assume it's already closed. 661 if (streamId <= lastGoodStreamId) return; 662 663 // If the stream ID is in the client's namespace, assume it's already closed. 664 if (streamId % 2 == nextStreamId % 2) return; 665 666 // Create a stream. 667 final FramedStream 668 newStream = new FramedStream(streamId, FramedConnection.this, outFinished, 669 inFinished, headerBlock); 670 lastGoodStreamId = streamId; 671 streams.put(streamId, newStream); 672 executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { 673 @Override public void execute() { 674 try { 675 listener.onStream(newStream); 676 } catch (IOException e) { 677 logger.log(Level.INFO, "FramedConnection.Listener failure for " + hostName, e); 678 try { 679 newStream.close(ErrorCode.PROTOCOL_ERROR); 680 } catch (IOException ignored) { 681 } 682 } 683 } 684 }); 685 return; 686 } 687 } 688 689 // The headers claim to be for a new stream, but we already have one. 690 if (headersMode.failIfStreamPresent()) { 691 stream.closeLater(ErrorCode.PROTOCOL_ERROR); 692 removeStream(streamId); 693 return; 694 } 695 696 // Update an existing stream. 697 stream.receiveHeaders(headerBlock, headersMode); 698 if (inFinished) stream.receiveFin(); 699 } 700 rstStream(int streamId, ErrorCode errorCode)701 @Override public void rstStream(int streamId, ErrorCode errorCode) { 702 if (pushedStream(streamId)) { 703 pushResetLater(streamId, errorCode); 704 return; 705 } 706 FramedStream rstStream = removeStream(streamId); 707 if (rstStream != null) { 708 rstStream.receiveRstStream(errorCode); 709 } 710 } 711 settings(boolean clearPrevious, Settings newSettings)712 @Override public void settings(boolean clearPrevious, Settings newSettings) { 713 long delta = 0; 714 FramedStream[] streamsToNotify = null; 715 synchronized (FramedConnection.this) { 716 int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 717 if (clearPrevious) peerSettings.clear(); 718 peerSettings.merge(newSettings); 719 if (getProtocol() == Protocol.HTTP_2) { 720 ackSettingsLater(newSettings); 721 } 722 int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE); 723 if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) { 724 delta = peerInitialWindowSize - priorWriteWindowSize; 725 if (!receivedInitialPeerSettings) { 726 addBytesToWriteWindow(delta); 727 receivedInitialPeerSettings = true; 728 } 729 if (!streams.isEmpty()) { 730 streamsToNotify = streams.values().toArray(new FramedStream[streams.size()]); 731 } 732 } 733 executor.execute(new NamedRunnable("OkHttp %s settings", hostName) { 734 @Override public void execute() { 735 listener.onSettings(FramedConnection.this); 736 } 737 }); 738 } 739 if (streamsToNotify != null && delta != 0) { 740 for (FramedStream stream : streamsToNotify) { 741 synchronized (stream) { 742 stream.addBytesToWriteWindow(delta); 743 } 744 } 745 } 746 } 747 ackSettingsLater(final Settings peerSettings)748 private void ackSettingsLater(final Settings peerSettings) { 749 executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) { 750 @Override public void execute() { 751 try { 752 frameWriter.ackSettings(peerSettings); 753 } catch (IOException ignored) { 754 } 755 } 756 }); 757 } 758 ackSettings()759 @Override public void ackSettings() { 760 // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT. 761 } 762 ping(boolean reply, int payload1, int payload2)763 @Override public void ping(boolean reply, int payload1, int payload2) { 764 if (reply) { 765 Ping ping = removePing(payload1); 766 if (ping != null) { 767 ping.receive(); 768 } 769 } else { 770 // Send a reply to a client ping if this is a server and vice versa. 771 writePingLater(true, payload1, payload2, null); 772 } 773 } 774 goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)775 @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { 776 if (debugData.size() > 0) { // TODO: log the debugData 777 } 778 779 // Copy the streams first. We don't want to hold a lock when we call receiveRstStream(). 780 FramedStream[] streamsCopy; 781 synchronized (FramedConnection.this) { 782 streamsCopy = streams.values().toArray(new FramedStream[streams.size()]); 783 shutdown = true; 784 } 785 786 // Fail all streams created after the last good stream ID. 787 for (FramedStream framedStream : streamsCopy) { 788 if (framedStream.getId() > lastGoodStreamId && framedStream.isLocallyInitiated()) { 789 framedStream.receiveRstStream(ErrorCode.REFUSED_STREAM); 790 removeStream(framedStream.getId()); 791 } 792 } 793 } 794 windowUpdate(int streamId, long windowSizeIncrement)795 @Override public void windowUpdate(int streamId, long windowSizeIncrement) { 796 if (streamId == 0) { 797 synchronized (FramedConnection.this) { 798 bytesLeftInWriteWindow += windowSizeIncrement; 799 FramedConnection.this.notifyAll(); 800 } 801 } else { 802 FramedStream stream = getStream(streamId); 803 if (stream != null) { 804 synchronized (stream) { 805 stream.addBytesToWriteWindow(windowSizeIncrement); 806 } 807 } 808 } 809 } 810 priority(int streamId, int streamDependency, int weight, boolean exclusive)811 @Override public void priority(int streamId, int streamDependency, int weight, 812 boolean exclusive) { 813 // TODO: honor priority. 814 } 815 816 @Override pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)817 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) { 818 pushRequestLater(promisedStreamId, requestHeaders); 819 } 820 alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)821 @Override public void alternateService(int streamId, String origin, ByteString protocol, 822 String host, int port, long maxAge) { 823 // TODO: register alternate service. 824 } 825 } 826 827 /** Even, positive numbered streams are pushed streams in HTTP/2. */ pushedStream(int streamId)828 private boolean pushedStream(int streamId) { 829 return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0; 830 } 831 832 // Guarded by this. 833 private final Set<Integer> currentPushRequests = new LinkedHashSet<>(); 834 pushRequestLater(final int streamId, final List<Header> requestHeaders)835 private void pushRequestLater(final int streamId, final List<Header> requestHeaders) { 836 synchronized (this) { 837 if (currentPushRequests.contains(streamId)) { 838 writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR); 839 return; 840 } 841 currentPushRequests.add(streamId); 842 } 843 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) { 844 @Override public void execute() { 845 boolean cancel = pushObserver.onRequest(streamId, requestHeaders); 846 try { 847 if (cancel) { 848 frameWriter.rstStream(streamId, ErrorCode.CANCEL); 849 synchronized (FramedConnection.this) { 850 currentPushRequests.remove(streamId); 851 } 852 } 853 } catch (IOException ignored) { 854 } 855 } 856 }); 857 } 858 pushHeadersLater(final int streamId, final List<Header> requestHeaders, final boolean inFinished)859 private void pushHeadersLater(final int streamId, final List<Header> requestHeaders, 860 final boolean inFinished) { 861 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) { 862 @Override public void execute() { 863 boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished); 864 try { 865 if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); 866 if (cancel || inFinished) { 867 synchronized (FramedConnection.this) { 868 currentPushRequests.remove(streamId); 869 } 870 } 871 } catch (IOException ignored) { 872 } 873 } 874 }); 875 } 876 877 /** 878 * Eagerly reads {@code byteCount} bytes from the source before launching a background task to 879 * process the data. This avoids corrupting the stream. 880 */ pushDataLater(final int streamId, final BufferedSource source, final int byteCount, final boolean inFinished)881 private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount, 882 final boolean inFinished) throws IOException { 883 final Buffer buffer = new Buffer(); 884 source.require(byteCount); // Eagerly read the frame before firing client thread. 885 source.read(buffer, byteCount); 886 if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount); 887 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) { 888 @Override public void execute() { 889 try { 890 boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished); 891 if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL); 892 if (cancel || inFinished) { 893 synchronized (FramedConnection.this) { 894 currentPushRequests.remove(streamId); 895 } 896 } 897 } catch (IOException ignored) { 898 } 899 } 900 }); 901 } 902 pushResetLater(final int streamId, final ErrorCode errorCode)903 private void pushResetLater(final int streamId, final ErrorCode errorCode) { 904 pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) { 905 @Override public void execute() { 906 pushObserver.onReset(streamId, errorCode); 907 synchronized (FramedConnection.this) { 908 currentPushRequests.remove(streamId); 909 } 910 } 911 }); 912 } 913 914 /** Listener of streams and settings initiated by the peer. */ 915 public abstract static class Listener { 916 public static final Listener REFUSE_INCOMING_STREAMS = new Listener() { 917 @Override public void onStream(FramedStream stream) throws IOException { 918 stream.close(ErrorCode.REFUSED_STREAM); 919 } 920 }; 921 922 /** 923 * Handle a new stream from this connection's peer. Implementations should 924 * respond by either {@linkplain FramedStream#reply replying to the stream} 925 * or {@linkplain FramedStream#close closing it}. This response does not 926 * need to be synchronous. 927 */ onStream(FramedStream stream)928 public abstract void onStream(FramedStream stream) throws IOException; 929 930 /** 931 * Notification that the connection's peer's settings may have changed. 932 * Implementations should take appropriate action to handle the updated 933 * settings. 934 * 935 * <p>It is the implementation's responsibility to handle concurrent calls 936 * to this method. A remote peer that sends multiple settings frames will 937 * trigger multiple calls to this method, and those calls are not 938 * necessarily serialized. 939 */ onSettings(FramedConnection connection)940 public void onSettings(FramedConnection connection) { 941 } 942 } 943 } 944