/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.squareup.okhttp.internal.framed;

import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
import okio.Okio;

import static com.squareup.okhttp.internal.Internal.logger;
import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE;

/**
 * A socket connection to a remote peer. A connection hosts streams which can
 * send and receive data.
 *
 * <p>Many methods in this API are <strong>synchronous:</strong> the call is
 * completed before the method returns. This is typical for Java but atypical
 * for SPDY. This is motivated by exception transparency: an IOException that
 * was triggered by a certain caller can be caught and handled by that caller.
 */
public final class FramedConnection implements Closeable {

  // Internal state of this connection is guarded by 'this'. No blocking
  // operations may be performed while holding this lock!
  //
  // Socket writes are guarded by frameWriter.
  //
  // Socket reads are unguarded but are only made by the reader thread.
  //
  // Certain operations (like SYN_STREAM) need to synchronize on both the
  // frameWriter (to do blocking I/O) and this (to create streams). Such
  // operations must synchronize on 'this' last. This ensures that we never
  // wait for a blocking operation while holding 'this'.

  /** Shared executor used for the asynchronous frame writes and stream callbacks below. */
  private static final ExecutorService executor = new ThreadPoolExecutor(0,
      Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
      Util.threadFactory("OkHttp FramedConnection", true));

  /** The protocol variant, like {@link com.squareup.okhttp.internal.framed.Spdy3}. */
  final Protocol protocol;

  /** True if this peer initiated the connection. */
  final boolean client;

  /**
   * User code to run in response to an incoming stream. Callbacks must not be
   * run on the callback executor.
   */
  private final IncomingStreamHandler handler;
  private final Map<Integer, FramedStream> streams = new HashMap<>();
  private final String hostName;
  private int lastGoodStreamId;
  private int nextStreamId;
  private boolean shutdown;
  private long idleStartTimeNs = System.nanoTime();

  /** Ensures push promise callbacks events are sent in order per stream. */
  private final ExecutorService pushExecutor;

  /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
  private Map<Integer, Ping> pings;
  /** User code to run in response to push promise events. */
  private final PushObserver pushObserver;
  private int nextPingId;

  /**
   * The total number of bytes consumed by the application, but not yet
   * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
   */
  // Visible for testing
  long unacknowledgedBytesRead = 0;

  /**
   * Count of bytes that can be written on the connection before receiving a
   * window update.
   */
  // Visible for testing
  long bytesLeftInWriteWindow;

  /** Settings we communicate to the peer. */
  // TODO: Do we want to dynamically adjust settings, or KISS and only set once?
  final Settings okHttpSettings = new Settings();
      // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
  private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;

  /** Settings we receive from the peer. */
  // TODO: MWS will need to guard on this setting before attempting to push.
  final Settings peerSettings = new Settings();

  private boolean receivedInitialPeerSettings = false;
  final Variant variant;
  final Socket socket;
  final FrameWriter frameWriter;

  // Visible for testing
  final Reader readerRunnable;

  /**
   * Creates the connection from {@code builder} and starts the reader thread.
   *
   * @throws AssertionError if the builder's protocol is neither HTTP_2 nor SPDY_3.
   */
  private FramedConnection(Builder builder) throws IOException {
    protocol = builder.protocol;
    pushObserver = builder.pushObserver;
    client = builder.client;
    handler = builder.handler;
    // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
    nextStreamId = builder.client ? 1 : 2;
    if (builder.client && protocol == Protocol.HTTP_2) {
      nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
    }

    nextPingId = builder.client ? 1 : 2;

    // Flow control was designed more for servers, or proxies than edge clients.
    // If we are a client, set the flow control window to 16MiB.  This avoids
    // thrashing window updates every 64KiB, yet small enough to avoid blowing
    // up the heap.
    if (builder.client) {
      okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
    }

    hostName = builder.hostName;

    if (protocol == Protocol.HTTP_2) {
      variant = new Http2();
      // Like newSingleThreadExecutor, except lazy creates the thread.
      pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>(),
          Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true));
      // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2
      peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
      peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE);
    } else if (protocol == Protocol.SPDY_3) {
      variant = new Spdy3();
      pushExecutor = null;
    } else {
      throw new AssertionError(protocol);
    }
    bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
    socket = builder.socket;
    frameWriter = variant.newWriter(Okio.buffer(Okio.sink(builder.socket)), client);

    readerRunnable = new Reader();
    new Thread(readerRunnable).start(); // Not a daemon thread.
  }

  /** The protocol as selected using ALPN. */
  public Protocol getProtocol() {
    return protocol;
  }

  /**
   * Returns the number of {@link FramedStream#isOpen() open streams} on this
   * connection.
   */
  public synchronized int openStreamCount() {
    return streams.size();
  }

  /** Returns the stream registered under {@code id}, or null if there is none. */
  synchronized FramedStream getStream(int id) {
    return streams.get(id);
  }

  /**
   * Removes and returns the stream registered under {@code streamId}, or null
   * if there is none. Removing the last stream marks this connection idle.
   * Threads waiting on this connection's monitor are always notified.
   */
  synchronized FramedStream removeStream(int streamId) {
    FramedStream stream = streams.remove(streamId);
    if (stream != null && streams.isEmpty()) {
      setIdle(true);
    }
    notifyAll(); // The removed stream may be blocked on a connection-wide window update.
    return stream;
  }

  /**
   * Records the current time as the idle start when {@code value} is true;
   * otherwise stores {@code Long.MAX_VALUE}, the "not idle" marker.
   */
  private synchronized void setIdle(boolean value) {
    idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE;
  }

  /** Returns true if this connection is idle. */
  public synchronized boolean isIdle() {
    return idleStartTimeNs != Long.MAX_VALUE;
  }

  /**
   * Returns the time in ns when this connection became idle or Long.MAX_VALUE
   * if connection is not idle.
   */
  public synchronized long getIdleStartTimeNs() {
    return idleStartTimeNs;
  }

  /**
   * Returns a new server-initiated stream.
   *
   * @param associatedStreamId the stream that triggered the sender to create
   *     this stream.
   * @param out true to create an output stream that we can use to send data
   *     to the remote peer. Corresponds to {@code FLAG_FIN}.
   * @throws IllegalStateException if this peer is the client, or the protocol
   *     is not HTTP_2.
   */
  public FramedStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
      throws IOException {
    if (client) throw new IllegalStateException("Client cannot push requests.");
    if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
    return newStream(associatedStreamId, requestHeaders, out, false);
  }

  /**
   * Returns a new locally-initiated stream.
   *
   * @param out true to create an output stream that we can use to send data to the remote peer.
   *     Corresponds to {@code FLAG_FIN}.
   * @param in true to create an input stream that the remote peer can use to send data to us.
   *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
   */
  public FramedStream newStream(List<Header> requestHeaders, boolean out, boolean in)
      throws IOException {
    return newStream(0, requestHeaders, out, in);
  }

  /**
   * Allocates the next local stream ID, registers the stream if it is open, and
   * writes either a SYN_STREAM (no associated stream) or a PUSH_PROMISE frame.
   *
   * @throws IOException if this connection has been shut down, or the write fails.
   * @throws IllegalArgumentException if a client passes a non-zero
   *     {@code associatedStreamId}.
   */
  private FramedStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
      boolean in) throws IOException {
    boolean outFinished = !out;
    boolean inFinished = !in;
    FramedStream stream;
    int streamId;

    // Lock order: frameWriter first, then 'this' (see the locking notes at the top of the class).
    synchronized (frameWriter) {
      synchronized (this) {
        if (shutdown) {
          throw new IOException("shutdown");
        }
        streamId = nextStreamId;
        nextStreamId += 2;
        stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
        if (stream.isOpen()) {
          streams.put(streamId, stream);
          setIdle(false);
        }
      }
      if (associatedStreamId == 0) {
        frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId,
            requestHeaders);
      } else if (client) {
        throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
      } else { // HTTP/2 has a PUSH_PROMISE frame.
        frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
      }
    }

    if (!out) {
      frameWriter.flush();
    }

    return stream;
  }

  /** Writes a SYN_REPLY frame for {@code streamId} through the frame writer. */
  void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
      throws IOException {
    frameWriter.synReply(outFinished, streamId, alternating);
  }

  /**
   * Callers of this method are not thread safe, and sometimes on application threads. Most often,
   * this method will be called to send a buffer worth of data to the peer.
   *
   * <p>Writes are subject to the write window of the stream and the connection. Until there is a
   * window sufficient to send {@code byteCount}, the caller will block. For example, a user of
   * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's
   * write window will block.
   *
   * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only
   * use case for zero {@code byteCount} is closing a flushed output stream.
   *
   * @throws InterruptedIOException if the calling thread is interrupted while waiting for the
   *     write window; the thread's interrupt status is retained.
   * @throws IOException if the stream is no longer registered while waiting for the write
   *     window, or the write fails.
   */
  public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
      throws IOException {
    if (byteCount == 0) { // Empty data frames are not flow-controlled.
      frameWriter.data(outFinished, streamId, buffer, 0);
      return;
    }

    while (byteCount > 0) {
      int toWrite;
      synchronized (FramedConnection.this) {
        try {
          while (bytesLeftInWriteWindow <= 0) {
            // Before blocking, confirm that the stream we're writing is still open. It's possible
            // that the stream has since been closed (such as if this write timed out.)
            if (!streams.containsKey(streamId)) {
              throw new IOException("stream closed");
            }
            FramedConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
          }
        } catch (InterruptedException e) {
          // Catching InterruptedException clears the flag; restore it so code further up the
          // stack can still observe the interruption.
          Thread.currentThread().interrupt();
          throw new InterruptedIOException();
        }

        // Write no more than the remaining request, the connection window and the frame limit.
        toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
        toWrite = Math.min(toWrite, frameWriter.maxDataLength());
        bytesLeftInWriteWindow -= toWrite;
      }

      byteCount -= toWrite;
      frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
    }
  }

  /**
   * {@code delta} will be negative if a settings frame initial window is
   * smaller than the last.
   *
   * <p>A positive {@code delta} calls {@code notifyAll()} on this connection,
   * so the caller must hold this connection's monitor.
   */
  void addBytesToWriteWindow(long delta) {
    bytesLeftInWriteWindow += delta;
    if (delta > 0) FramedConnection.this.notifyAll();
  }

  /**
   * Schedules a RST_STREAM frame for {@code streamId} on the shared executor.
   * An IOException raised by the write is ignored.
   */
  void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
    executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
      @Override public void execute() {
        try {
          writeSynReset(streamId, errorCode);
        } catch (IOException ignored) {
        }
      }
    });
  }

  /** Writes a RST_STREAM frame for {@code streamId} through the frame writer. */
  void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
    frameWriter.rstStream(streamId, statusCode);
  }

  /**
   * Schedules a WINDOW_UPDATE frame for {@code streamId} on the shared
   * executor. An IOException raised by the write is ignored.
   */
  void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
    executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
      @Override public void execute() {
        try {
          frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
        } catch (IOException ignored) {
        }
      }
    });
  }

  /**
   * Sends a ping frame to the peer. Use the returned object to await the
   * ping's response and observe its round trip time.
   *
   * @throws IOException if this connection has been shut down, or the write fails.
   */
  public Ping ping() throws IOException {
    Ping ping = new Ping();
    int pingId;
    synchronized (this) {
      if (shutdown) {
        throw new IOException("shutdown");
      }
      pingId = nextPingId;
      nextPingId += 2;
      if (pings == null) pings = new HashMap<>();
      pings.put(pingId, ping);
    }
    writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
    return ping;
  }

  /**
   * Schedules a ping frame on the shared executor. An IOException raised by
   * the write is ignored.
   */
  private void writePingLater(
      final boolean reply, final int payload1, final int payload2, final Ping ping) {
    executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x",
        hostName, payload1, payload2) {
      @Override public void execute() {
        try {
          writePing(reply, payload1, payload2, ping);
        } catch (IOException ignored) {
        }
      }
    });
  }

  /**
   * Writes a ping frame while holding the frame writer's lock.
   *
   * @param ping if non-null, {@code ping.send()} is called just before the frame is written.
   */
  private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
    synchronized (frameWriter) {
      // Observe the sent time immediately before performing I/O.
      if (ping != null) ping.send();
      frameWriter.ping(reply, payload1, payload2);
    }
  }

  /** Removes and returns the in-flight ping registered under {@code id}, or null if none. */
  private synchronized Ping removePing(int id) {
    return pings != null ? pings.remove(id) : null;
  }

  /** Flushes the frame writer. */
  public void flush() throws IOException {
    frameWriter.flush();
  }

  /**
   * Degrades this connection such that new streams can neither be created
   * locally, nor accepted from the remote peer. Existing streams are not
   * impacted. This is intended to permit an endpoint to gracefully stop
   * accepting new requests without harming previously established streams.
   *
   * <p>Does nothing if this connection is already shut down.
   */
  public void shutdown(ErrorCode statusCode) throws IOException {
    synchronized (frameWriter) {
      int lastGoodStreamId;
      synchronized (this) {
        if (shutdown) {
          return;
        }
        shutdown = true;
        lastGoodStreamId = this.lastGoodStreamId;
      }
      // TODO: propagate exception message into debugData
      frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
    }
  }

  /**
   * Closes this connection. This cancels all open streams and unanswered
   * pings. It closes the underlying input and output streams and shuts down
   * internal executor services.
   */
  @Override public void close() throws IOException {
    close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
  }

  /**
   * Shuts the connection down with {@code connectionCode}, closes every
   * registered stream with {@code streamCode}, cancels outstanding pings, and
   * closes the frame writer and the socket. Every step is attempted even if an
   * earlier one fails.
   *
   * @throws IOException the first failure from shutdown, stream close or writer
   *     close; a failure closing the socket replaces any earlier one.
   */
  private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
    assert (!Thread.holdsLock(this));
    IOException thrown = null;
    try {
      shutdown(connectionCode);
    } catch (IOException e) {
      thrown = e;
    }

    // Snapshot and clear shared state under the lock; the close calls below run outside it.
    FramedStream[] streamsToClose = null;
    Ping[] pingsToCancel = null;
    synchronized (this) {
      if (!streams.isEmpty()) {
        streamsToClose = streams.values().toArray(new FramedStream[streams.size()]);
        streams.clear();
        setIdle(false);
      }
      if (pings != null) {
        pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
        pings = null;
      }
    }

    if (streamsToClose != null) {
      for (FramedStream stream : streamsToClose) {
        try {
          stream.close(streamCode);
        } catch (IOException e) {
          // Keep the first failure. The previous check was inverted ('!= null'), which dropped
          // this exception when nothing had failed yet and overwrote the shutdown failure
          // when something had.
          if (thrown == null) thrown = e;
        }
      }
    }

    if (pingsToCancel != null) {
      for (Ping ping : pingsToCancel) {
        ping.cancel();
      }
    }

    // Close the writer to release its resources (such as deflaters).
    try {
      frameWriter.close();
    } catch (IOException e) {
      if (thrown == null) thrown = e;
    }

    // Close the socket to break out the reader thread, which will clean up after itself.
    try {
      socket.close();
    } catch (IOException e) {
      thrown = e;
    }

    if (thrown != null) throw thrown;
  }

  /**
   * Sends a connection header if the current variant requires it. This should
   * be called after {@link Builder#build} for all new connections.
   */
  public void sendConnectionPreface() throws IOException {
    frameWriter.connectionPreface();
    frameWriter.settings(okHttpSettings);
    int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
    if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
      // Announce the difference from the default on stream 0.
      frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
    }
  }

  /**
   * Collects the options for a {@link FramedConnection}. Defaults: SPDY_3,
   * {@code IncomingStreamHandler.REFUSE_INCOMING_STREAMS} and
   * {@code PushObserver.CANCEL}.
   */
  public static class Builder {
    private String hostName;
    private Socket socket;
    private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
    private Protocol protocol = Protocol.SPDY_3;
    private PushObserver pushObserver = PushObserver.CANCEL;
    private boolean client;

    /** Uses the host name of {@code socket}'s remote address as the connection's host name. */
    public Builder(boolean client, Socket socket) throws IOException {
      this(((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(), client, socket);
    }

    /**
     * @param client true if this peer initiated the connection; false if this
     *     peer accepted the connection.
     */
    public Builder(String hostName, boolean client, Socket socket) throws IOException {
      this.hostName = hostName;
      this.client = client;
      this.socket = socket;
    }

    public Builder handler(IncomingStreamHandler handler) {
      this.handler = handler;
      return this;
    }

    public Builder protocol(Protocol protocol) {
      this.protocol = protocol;
      return this;
    }

    public Builder pushObserver(PushObserver pushObserver) {
      this.pushObserver = pushObserver;
      return this;
    }

    public FramedConnection build() throws IOException {
      return new FramedConnection(this);
    }
  }

  /**
   * Methods in this class must not lock FrameWriter.  If a method needs to
   * write a frame, create an async task to do so.
   */
  class Reader extends NamedRunnable implements FrameReader.Handler {
    FrameReader frameReader;

    private Reader() {
      super("OkHttp %s", hostName);
    }

    /**
     * Reads frames until the reader reports no more frames or an IOException
     * occurs, then closes the connection with error codes describing how the
     * loop ended.
     */
    @Override protected void execute() {
      ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
      ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
      try {
        frameReader = variant.newReader(Okio.buffer(Okio.source(socket)), client);
        if (!client) {
          frameReader.readConnectionPreface();
        }
        while (frameReader.nextFrame(this)) {
        }
        connectionErrorCode = ErrorCode.NO_ERROR;
        streamErrorCode = ErrorCode.CANCEL;
      } catch (IOException e) {
        connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
        streamErrorCode = ErrorCode.PROTOCOL_ERROR;
      } finally {
        try {
          close(connectionErrorCode, streamErrorCode);
        } catch (IOException ignored) {
        }
        Util.closeQuietly(frameReader);
      }
    }

    @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
        throws IOException {
      if (pushedStream(streamId)) {
        pushDataLater(streamId, source, length, inFinished);
        return;
      }
      FramedStream dataStream = getStream(streamId);
      if (dataStream == null) {
        // Unknown stream: reset it and skip the payload so the next frame can be read.
        writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
        source.skip(length);
        return;
      }
      dataStream.receiveData(source, length);
      if (inFinished) {
        dataStream.receiveFin();
      }
    }

    @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
        int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
      if (pushedStream(streamId)) {
        pushHeadersLater(streamId, headerBlock, inFinished);
        return;
      }
      FramedStream stream;
      synchronized (FramedConnection.this) {
        // If we're shutdown, don't bother with this stream.
        if (shutdown) return;

        stream = getStream(streamId);

        if (stream == null) {
          // The headers claim to be for an existing stream, but we don't have one.
          if (headersMode.failIfStreamAbsent()) {
            writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
            return;
          }

          // If the stream ID is less than the last created ID, assume it's already closed.
          if (streamId <= lastGoodStreamId) return;

          // If the stream ID is in the client's namespace, assume it's already closed.
          if (streamId % 2 == nextStreamId % 2) return;

          // Create a stream.
          final FramedStream
              newStream = new FramedStream(streamId, FramedConnection.this, outFinished,
              inFinished, headerBlock);
          lastGoodStreamId = streamId;
          streams.put(streamId, newStream);
          executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
            @Override public void execute() {
              try {
                handler.receive(newStream);
              } catch (IOException e) {
                logger.log(Level.INFO, "StreamHandler failure for " + hostName, e);
                try {
                  newStream.close(ErrorCode.PROTOCOL_ERROR);
                } catch (IOException ignored) {
                }
              }
            }
          });
          return;
        }
      }

      // The headers claim to be for a new stream, but we already have one.
      if (headersMode.failIfStreamPresent()) {
        stream.closeLater(ErrorCode.PROTOCOL_ERROR);
        removeStream(streamId);
        return;
      }

      // Update an existing stream.
      stream.receiveHeaders(headerBlock, headersMode);
      if (inFinished) stream.receiveFin();
    }

    @Override public void rstStream(int streamId, ErrorCode errorCode) {
      if (pushedStream(streamId)) {
        pushResetLater(streamId, errorCode);
        return;
      }
      FramedStream rstStream = removeStream(streamId);
      if (rstStream != null) {
        rstStream.receiveRstStream(errorCode);
      }
    }

    @Override public void settings(boolean clearPrevious, Settings newSettings) {
      long delta = 0;
      FramedStream[] streamsToNotify = null;
      synchronized (FramedConnection.this) {
        int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
        if (clearPrevious) peerSettings.clear();
        peerSettings.merge(newSettings);
        if (getProtocol() == Protocol.HTTP_2) {
          ackSettingsLater(newSettings);
        }
        int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
        if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
          delta = peerInitialWindowSize - priorWriteWindowSize;
          // The connection-level window is adjusted only for the first change to the peer's
          // initial window size.
          if (!receivedInitialPeerSettings) {
            addBytesToWriteWindow(delta);
            receivedInitialPeerSettings = true;
          }
          if (!streams.isEmpty()) {
            streamsToNotify = streams.values().toArray(new FramedStream[streams.size()]);
          }
        }
      }
      // Apply the delta to each stream outside the connection lock.
      if (streamsToNotify != null && delta != 0) {
        for (FramedStream stream : streamsToNotify) {
          synchronized (stream) {
            stream.addBytesToWriteWindow(delta);
          }
        }
      }
    }

    /**
     * Schedules a settings ACK on the shared executor. An IOException raised
     * by the write is ignored.
     */
    private void ackSettingsLater(final Settings peerSettings) {
      executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) {
        @Override public void execute() {
          try {
            frameWriter.ackSettings(peerSettings);
          } catch (IOException ignored) {
          }
        }
      });
    }

    @Override public void ackSettings() {
      // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
    }

    @Override public void ping(boolean reply, int payload1, int payload2) {
      if (reply) {
        Ping ping = removePing(payload1);
        if (ping != null) {
          ping.receive();
        }
      } else {
        // Send a reply to a client ping if this is a server and vice versa.
        writePingLater(true, payload1, payload2, null);
      }
    }

    @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
      if (debugData.size() > 0) { // TODO: log the debugData
      }

      // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
      FramedStream[] streamsCopy;
      synchronized (FramedConnection.this) {
        streamsCopy = streams.values().toArray(new FramedStream[streams.size()]);
        shutdown = true;
      }

      // Fail all streams created after the last good stream ID.
      for (FramedStream framedStream : streamsCopy) {
        if (framedStream.getId() > lastGoodStreamId && framedStream.isLocallyInitiated()) {
          framedStream.receiveRstStream(ErrorCode.REFUSED_STREAM);
          removeStream(framedStream.getId());
        }
      }
    }

    @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
      if (streamId == 0) {
        // Stream 0 addresses the connection-level window; wake any writer waiting on it.
        synchronized (FramedConnection.this) {
          bytesLeftInWriteWindow += windowSizeIncrement;
          FramedConnection.this.notifyAll();
        }
      } else {
        FramedStream stream = getStream(streamId);
        if (stream != null) {
          synchronized (stream) {
            stream.addBytesToWriteWindow(windowSizeIncrement);
          }
        }
      }
    }

    @Override public void priority(int streamId, int streamDependency, int weight,
        boolean exclusive) {
      // TODO: honor priority.
    }

    @Override
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
      pushRequestLater(promisedStreamId, requestHeaders);
    }

    @Override public void alternateService(int streamId, String origin, ByteString protocol,
        String host, int port, long maxAge) {
      // TODO: register alternate service.
    }
  }

  /** Even, positive numbered streams are pushed streams in HTTP/2. */
  private boolean pushedStream(int streamId) {
    return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
  }

  // Guarded by this.
  private final Set<Integer> currentPushRequests = new LinkedHashSet<>();

  /**
   * Registers {@code streamId} as a push request and hands it to the push
   * observer on the push executor. A stream ID that is already registered is
   * reset with PROTOCOL_ERROR instead. If the observer cancels, the stream is
   * reset with CANCEL and unregistered.
   */
  private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
    synchronized (this) {
      if (currentPushRequests.contains(streamId)) {
        writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
        return;
      }
      currentPushRequests.add(streamId);
    }
    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
      @Override public void execute() {
        boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
        try {
          if (cancel) {
            frameWriter.rstStream(streamId, ErrorCode.CANCEL);
            synchronized (FramedConnection.this) {
              currentPushRequests.remove(streamId);
            }
          }
        } catch (IOException ignored) {
        }
      }
    });
  }

  /**
   * Hands pushed headers to the push observer on the push executor. If the
   * observer cancels, the stream is reset with CANCEL. The stream is
   * unregistered when cancelled or when {@code inFinished} is true.
   */
  private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
      final boolean inFinished) {
    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
      @Override public void execute() {
        boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
        try {
          if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
          if (cancel || inFinished) {
            synchronized (FramedConnection.this) {
              currentPushRequests.remove(streamId);
            }
          }
        } catch (IOException ignored) {
        }
      }
    });
  }

  /**
   * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
   * process the data.  This avoids corrupting the stream.
   *
   * @throws IOException if {@code byteCount} bytes cannot be read from {@code source}.
   */
  private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
      final boolean inFinished) throws IOException {
    final Buffer buffer = new Buffer();
    source.require(byteCount); // Eagerly read the frame before firing client thread.
    source.read(buffer, byteCount);
    if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
      @Override public void execute() {
        try {
          boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
          if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
          if (cancel || inFinished) {
            synchronized (FramedConnection.this) {
              currentPushRequests.remove(streamId);
            }
          }
        } catch (IOException ignored) {
        }
      }
    });
  }

  /**
   * Reports a reset of a pushed stream to the push observer on the push
   * executor, then unregisters the stream.
   */
  private void pushResetLater(final int streamId, final ErrorCode errorCode) {
    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
      @Override public void execute() {
        pushObserver.onReset(streamId, errorCode);
        synchronized (FramedConnection.this) {
          currentPushRequests.remove(streamId);
        }
      }
    });
  }
}