1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Preconditions.checkState; 20 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; 21 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.MoreObjects; 25 import com.google.common.base.Preconditions; 26 import com.google.common.base.Stopwatch; 27 import com.google.common.base.Supplier; 28 import com.google.common.util.concurrent.ListenableFuture; 29 import com.google.common.util.concurrent.SettableFuture; 30 import io.grpc.Attributes; 31 import io.grpc.CallOptions; 32 import io.grpc.ClientStreamTracer; 33 import io.grpc.Grpc; 34 import io.grpc.HttpConnectProxiedSocketAddress; 35 import io.grpc.InternalChannelz; 36 import io.grpc.InternalChannelz.SocketStats; 37 import io.grpc.InternalLogId; 38 import io.grpc.Metadata; 39 import io.grpc.MethodDescriptor; 40 import io.grpc.MethodDescriptor.MethodType; 41 import io.grpc.SecurityLevel; 42 import io.grpc.Status; 43 import io.grpc.Status.Code; 44 import io.grpc.StatusException; 45 import io.grpc.internal.ClientStreamListener.RpcProgress; 46 import io.grpc.internal.ConnectionClientTransport; 47 import io.grpc.internal.GrpcAttributes; 48 import io.grpc.internal.GrpcUtil; 49 import io.grpc.internal.Http2Ping; 50 import 
io.grpc.internal.InUseStateAggregator; 51 import io.grpc.internal.KeepAliveManager; 52 import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; 53 import io.grpc.internal.SerializingExecutor; 54 import io.grpc.internal.StatsTraceContext; 55 import io.grpc.internal.TransportTracer; 56 import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler; 57 import io.grpc.okhttp.internal.ConnectionSpec; 58 import io.grpc.okhttp.internal.Credentials; 59 import io.grpc.okhttp.internal.StatusLine; 60 import io.grpc.okhttp.internal.framed.ErrorCode; 61 import io.grpc.okhttp.internal.framed.FrameReader; 62 import io.grpc.okhttp.internal.framed.FrameWriter; 63 import io.grpc.okhttp.internal.framed.Header; 64 import io.grpc.okhttp.internal.framed.HeadersMode; 65 import io.grpc.okhttp.internal.framed.Http2; 66 import io.grpc.okhttp.internal.framed.Settings; 67 import io.grpc.okhttp.internal.framed.Variant; 68 import io.grpc.okhttp.internal.proxy.HttpUrl; 69 import io.grpc.okhttp.internal.proxy.Request; 70 import io.perfmark.PerfMark; 71 import java.io.EOFException; 72 import java.io.IOException; 73 import java.net.InetSocketAddress; 74 import java.net.Socket; 75 import java.net.URI; 76 import java.util.Collections; 77 import java.util.Deque; 78 import java.util.EnumMap; 79 import java.util.HashMap; 80 import java.util.Iterator; 81 import java.util.LinkedList; 82 import java.util.List; 83 import java.util.Locale; 84 import java.util.Map; 85 import java.util.Random; 86 import java.util.concurrent.CountDownLatch; 87 import java.util.concurrent.Executor; 88 import java.util.concurrent.ScheduledExecutorService; 89 import java.util.logging.Level; 90 import java.util.logging.Logger; 91 import javax.annotation.Nullable; 92 import javax.annotation.concurrent.GuardedBy; 93 import javax.net.SocketFactory; 94 import javax.net.ssl.HostnameVerifier; 95 import javax.net.ssl.SSLSession; 96 import javax.net.ssl.SSLSocket; 97 import javax.net.ssl.SSLSocketFactory; 98 import 
okio.Buffer; 99 import okio.BufferedSink; 100 import okio.BufferedSource; 101 import okio.ByteString; 102 import okio.Okio; 103 import okio.Source; 104 import okio.Timeout; 105 106 /** 107 * A okhttp-based {@link ConnectionClientTransport} implementation. 108 */ 109 class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler, 110 OutboundFlowController.Transport { 111 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap(); 112 private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); 113 buildErrorCodeToStatusMap()114 private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() { 115 Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class); 116 errorToStatus.put(ErrorCode.NO_ERROR, 117 Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent")); 118 errorToStatus.put(ErrorCode.PROTOCOL_ERROR, 119 Status.INTERNAL.withDescription("Protocol error")); 120 errorToStatus.put(ErrorCode.INTERNAL_ERROR, 121 Status.INTERNAL.withDescription("Internal error")); 122 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR, 123 Status.INTERNAL.withDescription("Flow control error")); 124 errorToStatus.put(ErrorCode.STREAM_CLOSED, 125 Status.INTERNAL.withDescription("Stream closed")); 126 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE, 127 Status.INTERNAL.withDescription("Frame too large")); 128 errorToStatus.put(ErrorCode.REFUSED_STREAM, 129 Status.UNAVAILABLE.withDescription("Refused stream")); 130 errorToStatus.put(ErrorCode.CANCEL, 131 Status.CANCELLED.withDescription("Cancelled")); 132 errorToStatus.put(ErrorCode.COMPRESSION_ERROR, 133 Status.INTERNAL.withDescription("Compression error")); 134 errorToStatus.put(ErrorCode.CONNECT_ERROR, 135 Status.INTERNAL.withDescription("Connect error")); 136 errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM, 137 Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm")); 138 
errorToStatus.put(ErrorCode.INADEQUATE_SECURITY, 139 Status.PERMISSION_DENIED.withDescription("Inadequate security")); 140 return Collections.unmodifiableMap(errorToStatus); 141 } 142 143 private final InetSocketAddress address; 144 private final String defaultAuthority; 145 private final String userAgent; 146 private final Random random = new Random(); 147 // Returns new unstarted stopwatches 148 private final Supplier<Stopwatch> stopwatchFactory; 149 private final int initialWindowSize; 150 private final Variant variant; 151 private Listener listener; 152 @GuardedBy("lock") 153 private ExceptionHandlingFrameWriter frameWriter; 154 private OutboundFlowController outboundFlow; 155 private final Object lock = new Object(); 156 private final InternalLogId logId; 157 @GuardedBy("lock") 158 private int nextStreamId; 159 @GuardedBy("lock") 160 private final Map<Integer, OkHttpClientStream> streams = new HashMap<>(); 161 private final Executor executor; 162 // Wrap on executor, to guarantee some operations be executed serially. 163 private final SerializingExecutor serializingExecutor; 164 private final ScheduledExecutorService scheduler; 165 private final int maxMessageSize; 166 private int connectionUnacknowledgedBytesRead; 167 private ClientFrameHandler clientFrameHandler; 168 // Caution: Not synchronized, new value can only be safely read after the connection is complete. 169 private Attributes attributes; 170 /** 171 * Indicates the transport is in go-away state: no new streams will be processed, but existing 172 * streams may continue. 
173 */ 174 @GuardedBy("lock") 175 private Status goAwayStatus; 176 @GuardedBy("lock") 177 private boolean goAwaySent; 178 @GuardedBy("lock") 179 private Http2Ping ping; 180 @GuardedBy("lock") 181 private boolean stopped; 182 @GuardedBy("lock") 183 private boolean hasStream; 184 private final SocketFactory socketFactory; 185 private SSLSocketFactory sslSocketFactory; 186 private HostnameVerifier hostnameVerifier; 187 private Socket socket; 188 @GuardedBy("lock") 189 private int maxConcurrentStreams = 0; 190 @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty 191 @GuardedBy("lock") 192 private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>(); 193 private final ConnectionSpec connectionSpec; 194 private KeepAliveManager keepAliveManager; 195 private boolean enableKeepAlive; 196 private long keepAliveTimeNanos; 197 private long keepAliveTimeoutNanos; 198 private boolean keepAliveWithoutCalls; 199 private final Runnable tooManyPingsRunnable; 200 private final int maxInboundMetadataSize; 201 private final boolean useGetForSafeMethods; 202 @GuardedBy("lock") 203 private final TransportTracer transportTracer; 204 @GuardedBy("lock") 205 private final InUseStateAggregator<OkHttpClientStream> inUseState = 206 new InUseStateAggregator<OkHttpClientStream>() { 207 @Override 208 protected void handleInUse() { 209 listener.transportInUse(true); 210 } 211 212 @Override 213 protected void handleNotInUse() { 214 listener.transportInUse(false); 215 } 216 }; 217 @GuardedBy("lock") 218 private InternalChannelz.Security securityInfo; 219 220 @VisibleForTesting 221 @Nullable 222 final HttpConnectProxiedSocketAddress proxiedAddr; 223 224 @VisibleForTesting 225 int proxySocketTimeout = 30000; 226 227 // The following fields should only be used for test. 
228 Runnable connectingCallback; 229 SettableFuture<Void> connectedFuture; 230 OkHttpClientTransport( OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, InetSocketAddress address, String authority, @Nullable String userAgent, Attributes eagAttrs, @Nullable HttpConnectProxiedSocketAddress proxiedAddr, Runnable tooManyPingsRunnable)231 public OkHttpClientTransport( 232 OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, 233 InetSocketAddress address, 234 String authority, 235 @Nullable String userAgent, 236 Attributes eagAttrs, 237 @Nullable HttpConnectProxiedSocketAddress proxiedAddr, 238 Runnable tooManyPingsRunnable) { 239 this( 240 transportFactory, 241 address, 242 authority, 243 userAgent, 244 eagAttrs, 245 GrpcUtil.STOPWATCH_SUPPLIER, 246 new Http2(), 247 proxiedAddr, 248 tooManyPingsRunnable); 249 } 250 OkHttpClientTransport( OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, InetSocketAddress address, String authority, @Nullable String userAgent, Attributes eagAttrs, Supplier<Stopwatch> stopwatchFactory, Variant variant, @Nullable HttpConnectProxiedSocketAddress proxiedAddr, Runnable tooManyPingsRunnable)251 private OkHttpClientTransport( 252 OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, 253 InetSocketAddress address, 254 String authority, 255 @Nullable String userAgent, 256 Attributes eagAttrs, 257 Supplier<Stopwatch> stopwatchFactory, 258 Variant variant, 259 @Nullable HttpConnectProxiedSocketAddress proxiedAddr, 260 Runnable tooManyPingsRunnable) { 261 this.address = Preconditions.checkNotNull(address, "address"); 262 this.defaultAuthority = authority; 263 this.maxMessageSize = transportFactory.maxMessageSize; 264 this.initialWindowSize = transportFactory.flowControlWindow; 265 this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor"); 266 serializingExecutor = new SerializingExecutor(transportFactory.executor); 267 this.scheduler = Preconditions.checkNotNull( 268 
transportFactory.scheduledExecutorService, "scheduledExecutorService"); 269 // Client initiated streams are odd, server initiated ones are even. Server should not need to 270 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation. 271 nextStreamId = 3; 272 this.socketFactory = transportFactory.socketFactory == null 273 ? SocketFactory.getDefault() : transportFactory.socketFactory; 274 this.sslSocketFactory = transportFactory.sslSocketFactory; 275 this.hostnameVerifier = transportFactory.hostnameVerifier; 276 this.connectionSpec = Preconditions.checkNotNull( 277 transportFactory.connectionSpec, "connectionSpec"); 278 this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory"); 279 this.variant = Preconditions.checkNotNull(variant, "variant"); 280 this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); 281 this.proxiedAddr = proxiedAddr; 282 this.tooManyPingsRunnable = 283 Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); 284 this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize; 285 this.transportTracer = transportFactory.transportTracerFactory.create(); 286 this.logId = InternalLogId.allocate(getClass(), address.toString()); 287 this.attributes = Attributes.newBuilder() 288 .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build(); 289 this.useGetForSafeMethods = transportFactory.useGetForSafeMethods; 290 initTransportTracer(); 291 } 292 293 /** 294 * Create a transport connected to a fake peer for test. 
295 */ 296 @VisibleForTesting OkHttpClientTransport( OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, String userAgent, Supplier<Stopwatch> stopwatchFactory, Variant variant, @Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture, Runnable tooManyPingsRunnable)297 OkHttpClientTransport( 298 OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, 299 String userAgent, 300 Supplier<Stopwatch> stopwatchFactory, 301 Variant variant, 302 @Nullable Runnable connectingCallback, 303 SettableFuture<Void> connectedFuture, 304 Runnable tooManyPingsRunnable) { 305 this( 306 transportFactory, 307 new InetSocketAddress("127.0.0.1", 80), 308 "notarealauthority:80", 309 userAgent, 310 Attributes.EMPTY, 311 stopwatchFactory, 312 variant, 313 null, 314 tooManyPingsRunnable); 315 this.connectingCallback = connectingCallback; 316 this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture"); 317 } 318 319 // sslSocketFactory is set to null when use plaintext. isUsingPlaintext()320 boolean isUsingPlaintext() { 321 return sslSocketFactory == null; 322 } 323 initTransportTracer()324 private void initTransportTracer() { 325 synchronized (lock) { // to make @GuardedBy linter happy 326 transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { 327 @Override 328 public TransportTracer.FlowControlWindows read() { 329 synchronized (lock) { 330 long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0); 331 // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we 332 // provide a lower bound. 333 long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO); 334 return new TransportTracer.FlowControlWindows(local, remote); 335 } 336 } 337 }); 338 } 339 } 340 341 /** 342 * Enable keepalive with custom delay and timeout. 
343 */ enableKeepAlive(boolean enable, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls)344 void enableKeepAlive(boolean enable, long keepAliveTimeNanos, 345 long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) { 346 enableKeepAlive = enable; 347 this.keepAliveTimeNanos = keepAliveTimeNanos; 348 this.keepAliveTimeoutNanos = keepAliveTimeoutNanos; 349 this.keepAliveWithoutCalls = keepAliveWithoutCalls; 350 } 351 352 @Override ping(final PingCallback callback, Executor executor)353 public void ping(final PingCallback callback, Executor executor) { 354 long data = 0; 355 Http2Ping p; 356 boolean writePing; 357 synchronized (lock) { 358 checkState(frameWriter != null); 359 if (stopped) { 360 Http2Ping.notifyFailed(callback, executor, getPingFailure()); 361 return; 362 } 363 if (ping != null) { 364 // we only allow one outstanding ping at a time, so just add the callback to 365 // any outstanding operation 366 p = ping; 367 writePing = false; 368 } else { 369 // set outstanding operation and then write the ping after releasing lock 370 data = random.nextLong(); 371 Stopwatch stopwatch = stopwatchFactory.get(); 372 stopwatch.start(); 373 p = ping = new Http2Ping(data, stopwatch); 374 writePing = true; 375 transportTracer.reportKeepAliveSent(); 376 } 377 if (writePing) { 378 frameWriter.ping(false, (int) (data >>> 32), (int) data); 379 } 380 } 381 // If transport concurrently failed/stopped since we released the lock above, this could 382 // immediately invoke callback (which we shouldn't do while holding a lock) 383 p.addCallback(callback, executor); 384 } 385 386 @Override newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, ClientStreamTracer[] tracers)387 public OkHttpClientStream newStream( 388 MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, 389 ClientStreamTracer[] tracers) { 390 Preconditions.checkNotNull(method, "method"); 391 Preconditions.checkNotNull(headers, 
"headers"); 392 StatsTraceContext statsTraceContext = 393 StatsTraceContext.newClientContext(tracers, getAttributes(), headers); 394 // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope 395 synchronized (lock) { // to make @GuardedBy linter happy 396 return new OkHttpClientStream( 397 method, 398 headers, 399 frameWriter, 400 OkHttpClientTransport.this, 401 outboundFlow, 402 lock, 403 maxMessageSize, 404 initialWindowSize, 405 defaultAuthority, 406 userAgent, 407 statsTraceContext, 408 transportTracer, 409 callOptions, 410 useGetForSafeMethods); 411 } 412 } 413 414 @GuardedBy("lock") streamReadyToStart(OkHttpClientStream clientStream)415 void streamReadyToStart(OkHttpClientStream clientStream) { 416 if (goAwayStatus != null) { 417 clientStream.transportState().transportReportStatus( 418 goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata()); 419 } else if (streams.size() >= maxConcurrentStreams) { 420 pendingStreams.add(clientStream); 421 setInUse(clientStream); 422 } else { 423 startStream(clientStream); 424 } 425 } 426 427 @SuppressWarnings("GuardedBy") 428 @GuardedBy("lock") startStream(OkHttpClientStream stream)429 private void startStream(OkHttpClientStream stream) { 430 Preconditions.checkState( 431 stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); 432 streams.put(nextStreamId, stream); 433 setInUse(stream); 434 // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead 435 // found: 'this.lock' 436 stream.transportState().start(nextStreamId); 437 // For unary and server streaming, there will be a data frame soon, no need to flush the header. 
438 if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING) 439 || stream.useGet()) { 440 frameWriter.flush(); 441 } 442 if (nextStreamId >= Integer.MAX_VALUE - 2) { 443 // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs 444 // correctly. 445 nextStreamId = Integer.MAX_VALUE; 446 startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, 447 Status.UNAVAILABLE.withDescription("Stream ids exhausted")); 448 } else { 449 nextStreamId += 2; 450 } 451 } 452 453 /** 454 * Starts pending streams, returns true if at least one pending stream is started. 455 */ 456 @GuardedBy("lock") startPendingStreams()457 private boolean startPendingStreams() { 458 boolean hasStreamStarted = false; 459 while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { 460 OkHttpClientStream stream = pendingStreams.poll(); 461 startStream(stream); 462 hasStreamStarted = true; 463 } 464 return hasStreamStarted; 465 } 466 467 /** 468 * Removes given pending stream, used when a pending stream is cancelled. 
469 */ 470 @GuardedBy("lock") removePendingStream(OkHttpClientStream pendingStream)471 void removePendingStream(OkHttpClientStream pendingStream) { 472 pendingStreams.remove(pendingStream); 473 maybeClearInUse(pendingStream); 474 } 475 476 @Override start(Listener listener)477 public Runnable start(Listener listener) { 478 this.listener = Preconditions.checkNotNull(listener, "listener"); 479 480 if (enableKeepAlive) { 481 keepAliveManager = new KeepAliveManager( 482 new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos, 483 keepAliveWithoutCalls); 484 keepAliveManager.onTransportStarted(); 485 } 486 487 int maxQueuedControlFrames = 10000; 488 final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames); 489 FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter( 490 variant.newWriter(Okio.buffer(asyncSink), true)); 491 492 synchronized (lock) { 493 // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors 494 // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not 495 // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with 496 // the same TransportExceptionHandler instance so it is all mixed back together. 497 frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter); 498 outboundFlow = new OutboundFlowController(this, frameWriter); 499 } 500 final CountDownLatch latch = new CountDownLatch(1); 501 // Connecting in the serializingExecutor, so that some stream operations like synStream 502 // will be executed after connected. 503 serializingExecutor.execute(new Runnable() { 504 @Override 505 public void run() { 506 // This is a hack to make sure the connection preface and initial settings to be sent out 507 // without blocking the start. 
By doing this essentially prevents potential deadlock when 508 // network is not available during startup while another thread holding lock to send the 509 // initial preface. 510 try { 511 latch.await(); 512 } catch (InterruptedException e) { 513 Thread.currentThread().interrupt(); 514 } 515 // Use closed source on failure so that the reader immediately shuts down. 516 BufferedSource source = Okio.buffer(new Source() { 517 @Override 518 public long read(Buffer sink, long byteCount) { 519 return -1; 520 } 521 522 @Override 523 public Timeout timeout() { 524 return Timeout.NONE; 525 } 526 527 @Override 528 public void close() { 529 } 530 }); 531 Socket sock; 532 SSLSession sslSession = null; 533 try { 534 if (proxiedAddr == null) { 535 sock = socketFactory.createSocket(address.getAddress(), address.getPort()); 536 } else { 537 if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) { 538 sock = createHttpProxySocket( 539 proxiedAddr.getTargetAddress(), 540 (InetSocketAddress) proxiedAddr.getProxyAddress(), 541 proxiedAddr.getUsername(), 542 proxiedAddr.getPassword() 543 ); 544 } else { 545 throw Status.INTERNAL.withDescription( 546 "Unsupported SocketAddress implementation " 547 + proxiedAddr.getProxyAddress().getClass()).asException(); 548 } 549 } 550 if (sslSocketFactory != null) { 551 SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade( 552 sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(), 553 connectionSpec); 554 sslSession = sslSocket.getSession(); 555 sock = sslSocket; 556 } 557 sock.setTcpNoDelay(true); 558 source = Okio.buffer(Okio.source(sock)); 559 asyncSink.becomeConnected(Okio.sink(sock), sock); 560 561 // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info 562 attributes = attributes.toBuilder() 563 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) 564 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress()) 565 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession) 
566 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, 567 sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY) 568 .build(); 569 } catch (StatusException e) { 570 startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus()); 571 return; 572 } catch (Exception e) { 573 onException(e); 574 return; 575 } finally { 576 clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); 577 } 578 synchronized (lock) { 579 socket = Preconditions.checkNotNull(sock, "socket"); 580 if (sslSession != null) { 581 securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); 582 } 583 } 584 } 585 }); 586 // Schedule to send connection preface & settings before any other write. 587 try { 588 sendConnectionPrefaceAndSettings(); 589 } finally { 590 latch.countDown(); 591 } 592 593 serializingExecutor.execute(new Runnable() { 594 @Override 595 public void run() { 596 if (connectingCallback != null) { 597 connectingCallback.run(); 598 } 599 // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it 600 // may send goAway immediately. 601 executor.execute(clientFrameHandler); 602 synchronized (lock) { 603 maxConcurrentStreams = Integer.MAX_VALUE; 604 startPendingStreams(); 605 } 606 if (connectedFuture != null) { 607 connectedFuture.set(null); 608 } 609 } 610 }); 611 return null; 612 } 613 614 /** 615 * Should only be called once when the transport is first established. 
616 */ sendConnectionPrefaceAndSettings()617 private void sendConnectionPrefaceAndSettings() { 618 synchronized (lock) { 619 frameWriter.connectionPreface(); 620 Settings settings = new Settings(); 621 OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize); 622 frameWriter.settings(settings); 623 if (initialWindowSize > DEFAULT_WINDOW_SIZE) { 624 frameWriter.windowUpdate( 625 Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE); 626 } 627 } 628 } 629 createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword)630 private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, 631 String proxyUsername, String proxyPassword) throws StatusException { 632 Socket sock = null; 633 try { 634 // The proxy address may not be resolved 635 if (proxyAddress.getAddress() != null) { 636 sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort()); 637 } else { 638 sock = 639 socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort()); 640 } 641 sock.setTcpNoDelay(true); 642 // A socket timeout is needed because lost network connectivity while reading from the proxy, 643 // can cause reading from the socket to hang. 
644 sock.setSoTimeout(proxySocketTimeout); 645 646 Source source = Okio.source(sock); 647 BufferedSink sink = Okio.buffer(Okio.sink(sock)); 648 649 // Prepare headers and request method line 650 Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword); 651 HttpUrl url = proxyRequest.httpUrl(); 652 String requestLine = 653 String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port()); 654 655 // Write request to socket 656 sink.writeUtf8(requestLine).writeUtf8("\r\n"); 657 for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) { 658 sink.writeUtf8(proxyRequest.headers().name(i)) 659 .writeUtf8(": ") 660 .writeUtf8(proxyRequest.headers().value(i)) 661 .writeUtf8("\r\n"); 662 } 663 sink.writeUtf8("\r\n"); 664 // Flush buffer (flushes socket and sends request) 665 sink.flush(); 666 667 // Read status line, check if 2xx was returned 668 StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source)); 669 // Drain rest of headers 670 while (!readUtf8LineStrictUnbuffered(source).equals("")) {} 671 if (statusLine.code < 200 || statusLine.code >= 300) { 672 Buffer body = new Buffer(); 673 try { 674 sock.shutdownOutput(); 675 source.read(body, 1024); 676 } catch (IOException ex) { 677 body.writeUtf8("Unable to read body: " + ex.toString()); 678 } 679 try { 680 sock.close(); 681 } catch (IOException ignored) { 682 // ignored 683 } 684 String message = String.format( 685 Locale.US, 686 "Response returned from proxy was not successful (expected 2xx, got %d %s). " 687 + "Response body:\n%s", 688 statusLine.code, statusLine.message, body.readUtf8()); 689 throw Status.UNAVAILABLE.withDescription(message).asException(); 690 } 691 // As the socket will be used for RPCs from here on, we want the socket timeout back to zero. 
692 sock.setSoTimeout(0); 693 return sock; 694 } catch (IOException e) { 695 if (sock != null) { 696 GrpcUtil.closeQuietly(sock); 697 } 698 throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e) 699 .asException(); 700 } 701 } 702 createHttpProxyRequest(InetSocketAddress address, String proxyUsername, String proxyPassword)703 private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername, 704 String proxyPassword) { 705 HttpUrl tunnelUrl = new HttpUrl.Builder() 706 .scheme("https") 707 .host(address.getHostName()) 708 .port(address.getPort()) 709 .build(); 710 711 Request.Builder request = new Request.Builder() 712 .url(tunnelUrl) 713 .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port()) 714 .header("User-Agent", userAgent); 715 716 // If we have proxy credentials, set them right away 717 if (proxyUsername != null && proxyPassword != null) { 718 request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword)); 719 } 720 return request.build(); 721 } 722 readUtf8LineStrictUnbuffered(Source source)723 private static String readUtf8LineStrictUnbuffered(Source source) throws IOException { 724 Buffer buffer = new Buffer(); 725 while (true) { 726 if (source.read(buffer, 1) == -1) { 727 throw new EOFException("\\n not found: " + buffer.readByteString().hex()); 728 } 729 if (buffer.getByte(buffer.size() - 1) == '\n') { 730 return buffer.readUtf8LineStrict(); 731 } 732 } 733 } 734 735 @Override toString()736 public String toString() { 737 return MoreObjects.toStringHelper(this) 738 .add("logId", logId.getId()) 739 .add("address", address) 740 .toString(); 741 } 742 743 @Override getLogId()744 public InternalLogId getLogId() { 745 return logId; 746 } 747 748 /** 749 * Gets the overridden authority hostname. If the authority is overridden to be an invalid 750 * authority, uri.getHost() will (rightly) return null, since the authority is no longer 751 * an actual service. 
This method overrides the behavior for practical reasons. For example, 752 * if an authority is in the form "invalid_authority" (note the "_"), rather than return null, 753 * we return the input. This is because the return value, in conjunction with getOverridenPort, 754 * are used by the SSL library to reconstruct the actual authority. It /already/ has a 755 * connection to the port, independent of this function. 756 * 757 * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do 758 * the wrong thing. An example wrong behavior would be "invalid_host:443". Registry based 759 * authorities do not have ports, so this is even more wrong than before. Sorry. 760 */ 761 @VisibleForTesting getOverridenHost()762 String getOverridenHost() { 763 URI uri = GrpcUtil.authorityToUri(defaultAuthority); 764 if (uri.getHost() != null) { 765 return uri.getHost(); 766 } 767 768 return defaultAuthority; 769 } 770 771 @VisibleForTesting getOverridenPort()772 int getOverridenPort() { 773 URI uri = GrpcUtil.authorityToUri(defaultAuthority); 774 if (uri.getPort() != -1) { 775 return uri.getPort(); 776 } 777 778 return address.getPort(); 779 } 780 781 @Override shutdown(Status reason)782 public void shutdown(Status reason) { 783 synchronized (lock) { 784 if (goAwayStatus != null) { 785 return; 786 } 787 788 goAwayStatus = reason; 789 listener.transportShutdown(goAwayStatus); 790 stopIfNecessary(); 791 } 792 } 793 794 @Override shutdownNow(Status reason)795 public void shutdownNow(Status reason) { 796 shutdown(reason); 797 synchronized (lock) { 798 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator(); 799 while (it.hasNext()) { 800 Map.Entry<Integer, OkHttpClientStream> entry = it.next(); 801 it.remove(); 802 entry.getValue().transportState().transportReportStatus(reason, false, new Metadata()); 803 maybeClearInUse(entry.getValue()); 804 } 805 806 for (OkHttpClientStream stream : pendingStreams) { 807 // in cases such as 
the connection fails to ACK keep-alive, pending streams should have a 808 // chance to retry and be routed to another connection. 809 stream.transportState().transportReportStatus( 810 reason, RpcProgress.MISCARRIED, true, new Metadata()); 811 maybeClearInUse(stream); 812 } 813 pendingStreams.clear(); 814 815 stopIfNecessary(); 816 } 817 } 818 819 @Override getAttributes()820 public Attributes getAttributes() { 821 return attributes; 822 } 823 824 /** 825 * Gets all active streams as an array. 826 */ 827 @Override getActiveStreams()828 public OutboundFlowController.StreamState[] getActiveStreams() { 829 synchronized (lock) { 830 OutboundFlowController.StreamState[] flowStreams = 831 new OutboundFlowController.StreamState[streams.size()]; 832 int i = 0; 833 for (OkHttpClientStream stream : streams.values()) { 834 flowStreams[i++] = stream.transportState().getOutboundFlowState(); 835 } 836 return flowStreams; 837 } 838 } 839 840 @VisibleForTesting getHandler()841 ClientFrameHandler getHandler() { 842 return clientFrameHandler; 843 } 844 845 @VisibleForTesting getSocketFactory()846 SocketFactory getSocketFactory() { 847 return socketFactory; 848 } 849 850 @VisibleForTesting getPendingStreamSize()851 int getPendingStreamSize() { 852 synchronized (lock) { 853 return pendingStreams.size(); 854 } 855 } 856 857 @VisibleForTesting setNextStreamId(int nextStreamId)858 void setNextStreamId(int nextStreamId) { 859 synchronized (lock) { 860 this.nextStreamId = nextStreamId; 861 } 862 } 863 864 /** 865 * Finish all active streams due to an IOException, then close the transport. 866 */ 867 @Override onException(Throwable failureCause)868 public void onException(Throwable failureCause) { 869 Preconditions.checkNotNull(failureCause, "failureCause"); 870 Status status = Status.UNAVAILABLE.withCause(failureCause); 871 startGoAway(0, ErrorCode.INTERNAL_ERROR, status); 872 } 873 874 /** 875 * Send GOAWAY to the server, then finish all active streams and close the transport. 
876 */ onError(ErrorCode errorCode, String moreDetail)877 private void onError(ErrorCode errorCode, String moreDetail) { 878 startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail)); 879 } 880 startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status)881 private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) { 882 synchronized (lock) { 883 if (goAwayStatus == null) { 884 goAwayStatus = status; 885 listener.transportShutdown(status); 886 } 887 if (errorCode != null && !goAwaySent) { 888 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated 889 // streams. The GOAWAY is part of graceful shutdown. 890 goAwaySent = true; 891 frameWriter.goAway(0, errorCode, new byte[0]); 892 } 893 894 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator(); 895 while (it.hasNext()) { 896 Map.Entry<Integer, OkHttpClientStream> entry = it.next(); 897 if (entry.getKey() > lastKnownStreamId) { 898 it.remove(); 899 entry.getValue().transportState().transportReportStatus( 900 status, RpcProgress.REFUSED, false, new Metadata()); 901 maybeClearInUse(entry.getValue()); 902 } 903 } 904 905 for (OkHttpClientStream stream : pendingStreams) { 906 stream.transportState().transportReportStatus( 907 status, RpcProgress.MISCARRIED, true, new Metadata()); 908 maybeClearInUse(stream); 909 } 910 pendingStreams.clear(); 911 912 stopIfNecessary(); 913 } 914 } 915 916 /** 917 * Called when a stream is closed. We do things like: 918 * <ul> 919 * <li>Removing the stream from the map. 920 * <li>Optionally reporting the status. 921 * <li>Starting pending streams if we can. 922 * <li>Stopping the transport if this is the last live stream under a go-away status. 923 * </ul> 924 * 925 * @param streamId the Id of the stream. 926 * @param status the final status of this stream, null means no need to report. 
927 * @param stopDelivery interrupt queued messages in the deframer 928 * @param errorCode reset the stream with this ErrorCode if not null. 929 * @param trailers the trailers received if not null 930 */ finishStream( int streamId, @Nullable Status status, RpcProgress rpcProgress, boolean stopDelivery, @Nullable ErrorCode errorCode, @Nullable Metadata trailers)931 void finishStream( 932 int streamId, 933 @Nullable Status status, 934 RpcProgress rpcProgress, 935 boolean stopDelivery, 936 @Nullable ErrorCode errorCode, 937 @Nullable Metadata trailers) { 938 synchronized (lock) { 939 OkHttpClientStream stream = streams.remove(streamId); 940 if (stream != null) { 941 if (errorCode != null) { 942 frameWriter.rstStream(streamId, ErrorCode.CANCEL); 943 } 944 if (status != null) { 945 stream 946 .transportState() 947 .transportReportStatus( 948 status, 949 rpcProgress, 950 stopDelivery, 951 trailers != null ? trailers : new Metadata()); 952 } 953 if (!startPendingStreams()) { 954 stopIfNecessary(); 955 maybeClearInUse(stream); 956 } 957 } 958 } 959 } 960 961 /** 962 * When the transport is in goAway state, we should stop it once all active streams finish. 963 */ 964 @GuardedBy("lock") stopIfNecessary()965 private void stopIfNecessary() { 966 if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) { 967 return; 968 } 969 if (stopped) { 970 return; 971 } 972 stopped = true; 973 974 if (keepAliveManager != null) { 975 keepAliveManager.onTransportTermination(); 976 } 977 978 if (ping != null) { 979 ping.failed(getPingFailure()); 980 ping = null; 981 } 982 983 if (!goAwaySent) { 984 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated 985 // streams. The GOAWAY is part of graceful shutdown. 
986 goAwaySent = true; 987 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); 988 } 989 990 // We will close the underlying socket in the writing thread to break out the reader 991 // thread, which will close the frameReader and notify the listener. 992 frameWriter.close(); 993 } 994 995 @GuardedBy("lock") maybeClearInUse(OkHttpClientStream stream)996 private void maybeClearInUse(OkHttpClientStream stream) { 997 if (hasStream) { 998 if (pendingStreams.isEmpty() && streams.isEmpty()) { 999 hasStream = false; 1000 if (keepAliveManager != null) { 1001 // We don't have any active streams. No need to do keepalives any more. 1002 // Again, we have to call this inside the lock to avoid the race between onTransportIdle 1003 // and onTransportActive. 1004 keepAliveManager.onTransportIdle(); 1005 } 1006 } 1007 } 1008 if (stream.shouldBeCountedForInUse()) { 1009 inUseState.updateObjectInUse(stream, false); 1010 } 1011 } 1012 1013 @GuardedBy("lock") setInUse(OkHttpClientStream stream)1014 private void setInUse(OkHttpClientStream stream) { 1015 if (!hasStream) { 1016 hasStream = true; 1017 if (keepAliveManager != null) { 1018 // We have a new stream. We might need to do keepalives now. 1019 // Note that we have to do this inside the lock to avoid calling 1020 // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong 1021 // order. 
1022 keepAliveManager.onTransportActive(); 1023 } 1024 } 1025 if (stream.shouldBeCountedForInUse()) { 1026 inUseState.updateObjectInUse(stream, true); 1027 } 1028 } 1029 getPingFailure()1030 private Throwable getPingFailure() { 1031 synchronized (lock) { 1032 if (goAwayStatus != null) { 1033 return goAwayStatus.asException(); 1034 } else { 1035 return Status.UNAVAILABLE.withDescription("Connection closed").asException(); 1036 } 1037 } 1038 } 1039 mayHaveCreatedStream(int streamId)1040 boolean mayHaveCreatedStream(int streamId) { 1041 synchronized (lock) { 1042 return streamId < nextStreamId && (streamId & 1) == 1; 1043 } 1044 } 1045 getStream(int streamId)1046 OkHttpClientStream getStream(int streamId) { 1047 synchronized (lock) { 1048 return streams.get(streamId); 1049 } 1050 } 1051 1052 /** 1053 * Returns a Grpc status corresponding to the given ErrorCode. 1054 */ 1055 @VisibleForTesting toGrpcStatus(ErrorCode code)1056 static Status toGrpcStatus(ErrorCode code) { 1057 Status status = ERROR_CODE_TO_STATUS.get(code); 1058 return status != null ? status : Status.UNKNOWN.withDescription( 1059 "Unknown http2 error code: " + code.httpCode); 1060 } 1061 1062 @Override getStats()1063 public ListenableFuture<SocketStats> getStats() { 1064 SettableFuture<SocketStats> ret = SettableFuture.create(); 1065 synchronized (lock) { 1066 if (socket == null) { 1067 ret.set(new SocketStats( 1068 transportTracer.getStats(), 1069 /*local=*/ null, 1070 /*remote=*/ null, 1071 new InternalChannelz.SocketOptions.Builder().build(), 1072 /*security=*/ null)); 1073 } else { 1074 ret.set(new SocketStats( 1075 transportTracer.getStats(), 1076 socket.getLocalSocketAddress(), 1077 socket.getRemoteSocketAddress(), 1078 Utils.getSocketOptions(socket), 1079 securityInfo)); 1080 } 1081 return ret; 1082 } 1083 } 1084 1085 /** 1086 * Runnable which reads frames and dispatches them to in flight calls. 
1087 */ 1088 class ClientFrameHandler implements FrameReader.Handler, Runnable { 1089 1090 private final OkHttpFrameLogger logger = 1091 new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class); 1092 FrameReader frameReader; 1093 boolean firstSettings = true; 1094 ClientFrameHandler(FrameReader frameReader)1095 ClientFrameHandler(FrameReader frameReader) { 1096 this.frameReader = frameReader; 1097 } 1098 1099 @Override run()1100 public void run() { 1101 String threadName = Thread.currentThread().getName(); 1102 Thread.currentThread().setName("OkHttpClientTransport"); 1103 try { 1104 // Read until the underlying socket closes. 1105 while (frameReader.nextFrame(this)) { 1106 if (keepAliveManager != null) { 1107 keepAliveManager.onDataReceived(); 1108 } 1109 } 1110 // frameReader.nextFrame() returns false when the underlying read encounters an IOException, 1111 // it may be triggered by the socket closing, in such case, the startGoAway() will do 1112 // nothing, otherwise, we finish all streams since it's a real IO issue. 1113 Status status; 1114 synchronized (lock) { 1115 status = goAwayStatus; 1116 } 1117 if (status == null) { 1118 status = Status.UNAVAILABLE.withDescription("End of stream or IOException"); 1119 } 1120 startGoAway(0, ErrorCode.INTERNAL_ERROR, status); 1121 } catch (Throwable t) { 1122 // TODO(madongfly): Send the exception message to the server. 1123 startGoAway( 1124 0, 1125 ErrorCode.PROTOCOL_ERROR, 1126 Status.INTERNAL.withDescription("error in frame handler").withCause(t)); 1127 } finally { 1128 try { 1129 frameReader.close(); 1130 } catch (IOException ex) { 1131 log.log(Level.INFO, "Exception closing frame reader", ex); 1132 } 1133 listener.transportTerminated(); 1134 Thread.currentThread().setName(threadName); 1135 } 1136 } 1137 1138 /** 1139 * Handle an HTTP2 DATA frame. 
1140 */ 1141 @SuppressWarnings("GuardedBy") 1142 @Override data(boolean inFinished, int streamId, BufferedSource in, int length)1143 public void data(boolean inFinished, int streamId, BufferedSource in, int length) 1144 throws IOException { 1145 logger.logData(OkHttpFrameLogger.Direction.INBOUND, 1146 streamId, in.getBuffer(), length, inFinished); 1147 OkHttpClientStream stream = getStream(streamId); 1148 if (stream == null) { 1149 if (mayHaveCreatedStream(streamId)) { 1150 synchronized (lock) { 1151 frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED); 1152 } 1153 in.skip(length); 1154 } else { 1155 onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId); 1156 return; 1157 } 1158 } else { 1159 // Wait until the frame is complete. 1160 in.require(length); 1161 1162 Buffer buf = new Buffer(); 1163 buf.write(in.getBuffer(), length); 1164 PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data", 1165 stream.transportState().tag()); 1166 synchronized (lock) { 1167 // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; 1168 // instead found: 'OkHttpClientTransport.this.lock' 1169 stream.transportState().transportDataReceived(buf, inFinished); 1170 } 1171 } 1172 1173 // connection window update 1174 connectionUnacknowledgedBytesRead += length; 1175 if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) { 1176 synchronized (lock) { 1177 frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead); 1178 } 1179 connectionUnacknowledgedBytesRead = 0; 1180 } 1181 } 1182 1183 /** 1184 * Handle HTTP2 HEADER and CONTINUATION frames. 
*/
    @SuppressWarnings("GuardedBy")
    @Override
    public void headers(boolean outFinished,
        boolean inFinished,
        int streamId,
        int associatedStreamId,
        List<Header> headerBlock,
        HeadersMode headersMode) {
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
      // Set when the header block exceeds the configured inbound metadata limit.
      Status failedStatus = null;
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
        int actualSize = headerBlockSize(headerBlock);
        if (actualSize > maxInboundMetadataSize) {
          String description = String.format(
              Locale.US,
              "Response %s metadata larger than %d: %d",
              inFinished ? "trailer" : "header",
              maxInboundMetadataSize,
              actualSize);
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(description);
        }
      }

      boolean unknownStream = false;
      synchronized (lock) {
        OkHttpClientStream target = streams.get(streamId);
        if (target != null && failedStatus == null) {
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
              target.transportState().tag());
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
          // instead found: 'OkHttpClientTransport.this.lock'
          target.transportState().transportHeadersReceived(headerBlock, inFinished);
        } else if (target != null) {
          // Oversized metadata: fail the stream, resetting it first if the peer is not done.
          if (!inFinished) {
            frameWriter.rstStream(streamId, ErrorCode.CANCEL);
          }
          target.transportState().transportReportStatus(failedStatus, false, new Metadata());
        } else if (mayHaveCreatedStream(streamId)) {
          frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
        } else {
          unknownStream = true;
        }
      }
      if (unknownStream) {
        // We don't expect any server-initiated streams.
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
      }
    }

    /**
     * Returns the size of a header block, counting 32 bytes of overhead per entry plus the name
     * and value lengths, capped at {@code Integer.MAX_VALUE}.
     */
    private int headerBlockSize(List<Header> headerBlock) {
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
      long total = 0;
      for (Header entry : headerBlock) {
        total += 32 + entry.name.size() + entry.value.size();
      }
      return (int) Math.min(total, Integer.MAX_VALUE);
    }

    /**
     * Handles an inbound RST_STREAM by finishing the named stream, if it is still active, with
     * the status mapped from {@code errorCode}.
     */
    @Override
    public void rstStream(int streamId, ErrorCode errorCode) {
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
      Code code = status.getCode();
      boolean stopDelivery = code == Code.CANCELLED || code == Code.DEADLINE_EXCEEDED;
      synchronized (lock) {
        OkHttpClientStream target = streams.get(streamId);
        if (target == null) {
          return;
        }
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
            target.transportState().tag());
        RpcProgress progress =
            errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED;
        finishStream(streamId, status, progress, stopDelivery, null, null);
      }
    }

    /**
     * Applies an inbound SETTINGS frame, acknowledges it, and then flushes any writes and
     * pending streams the new settings allow.
     */
    @Override
    public void settings(boolean clearPrevious, Settings settings) {
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
      synchronized (lock) {
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
          maxConcurrentStreams =
              OkHttpSettingsUtil.get(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
        }

        boolean outboundWindowGrew = false;
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
          // Named differently from the transport's initialWindowSize field to avoid shadowing.
          int peerInitialWindowSize =
              OkHttpSettingsUtil.get(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
          outboundWindowGrew = outboundFlow.initialOutboundWindowSize(peerInitialWindowSize);
        }
        if (firstSettings) {
          listener.transportReady();
          firstSettings = false;
        }

        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
        // otherwise it will cause a stream error (RST_STREAM).
        frameWriter.ackSettings(settings);

        // send any pending bytes / streams
        if (outboundWindowGrew) {
          outboundFlow.writeStreams();
        }
        startPendingStreams();
      }
    }

    /**
     * Answers a ping request with an ack, or completes the outstanding ping when an ack with a
     * matching payload arrives.
     */
    @Override
    public void ping(boolean ack, int payload1, int payload2) {
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
      if (!ack) {
        synchronized (lock) {
          frameWriter.ping(true, payload1, payload2);
        }
        return;
      }

      Http2Ping acknowledged = null;
      synchronized (lock) {
        if (ping == null) {
          log.warning("Received unexpected ping ack. No ping outstanding");
        } else if (ping.payload() == ackPayload) {
          acknowledged = ping;
          ping = null;
        } else {
          log.log(Level.WARNING, String.format(
              Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
              ping.payload(), ackPayload));
        }
      }
      // don't complete it while holding lock since callbacks could run immediately
      if (acknowledged != null) {
        acknowledged.complete();
      }
    }

    @Override
    public void ackSettings() {
      // Do nothing currently.
    }

    /**
     * Handles an inbound GOAWAY by starting go-away with a status built from the error code and
     * any debug data. An ENHANCE_YOUR_CALM code is logged, and runs
     * {@code tooManyPingsRunnable} when the debug data is "too_many_pings".
     */
    @Override
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
        String data = debugData.utf8();
        log.log(Level.WARNING, String.format(
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
        if ("too_many_pings".equals(data)) {
          tooManyPingsRunnable.run();
        }
      }
      Status goAwayReason = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
          .augmentDescription("Received Goaway");
      if (debugData.size() > 0) {
        // If a debug message was provided, use it.
        goAwayReason = goAwayReason.augmentDescription(debugData.utf8());
      }
      // No error code is passed on, so startGoAway does not send a GOAWAY of its own here.
      startGoAway(lastGoodStreamId, null, goAwayReason);
    }

    /** Rejects a PUSH_PROMISE by resetting {@code streamId} with PROTOCOL_ERROR. */
    @Override
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
        throws IOException {
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
          streamId, promisedStreamId, requestHeaders);
      // We don't accept server initiated stream.
      synchronized (lock) {
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
      }
    }

    /**
     * Applies an inbound WINDOW_UPDATE to the connection or to the named stream. A zero
     * increment is treated as an error, as is an update for a stream this transport could not
     * have created.
     */
    @Override
    public void windowUpdate(int streamId, long delta) {
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
      if (delta == 0) {
        String errorMsg = "Received 0 flow control window increment.";
        if (streamId == 0) {
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
        } else {
          finishStream(
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
              ErrorCode.PROTOCOL_ERROR, null);
        }
        return;
      }

      boolean unknownStream;
      synchronized (lock) {
        if (streamId == Utils.CONNECTION_STREAM_ID) {
          outboundFlow.windowUpdate(null, (int) delta);
          return;
        }

        OkHttpClientStream target = streams.get(streamId);
        if (target != null) {
          outboundFlow.windowUpdate(target.transportState().getOutboundFlowState(), (int) delta);
        }
        // An update for a stream we may have created and already finished is ignored.
        unknownStream = target == null && !mayHaveCreatedStream(streamId);
      }
      if (unknownStream) {
        onError(ErrorCode.PROTOCOL_ERROR,
            "Received window_update for unknown stream: " + streamId);
      }
    }

    @Override
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
      // Ignore priority change.
      // TODO(madongfly): log
    }

    @Override
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
        int port, long maxAge) {
      // TODO(madongfly): Deal with alternateService propagation
    }
  }
}