1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Preconditions.checkState; 20 import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.MoreObjects; 24 import com.google.common.base.Preconditions; 25 import com.google.common.base.Stopwatch; 26 import com.google.common.base.Supplier; 27 import com.google.common.util.concurrent.ListenableFuture; 28 import com.google.common.util.concurrent.SettableFuture; 29 import com.squareup.okhttp.Credentials; 30 import com.squareup.okhttp.HttpUrl; 31 import com.squareup.okhttp.Request; 32 import com.squareup.okhttp.internal.http.StatusLine; 33 import io.grpc.Attributes; 34 import io.grpc.CallOptions; 35 import io.grpc.Grpc; 36 import io.grpc.InternalChannelz; 37 import io.grpc.InternalChannelz.SocketStats; 38 import io.grpc.InternalLogId; 39 import io.grpc.Metadata; 40 import io.grpc.MethodDescriptor; 41 import io.grpc.MethodDescriptor.MethodType; 42 import io.grpc.SecurityLevel; 43 import io.grpc.Status; 44 import io.grpc.Status.Code; 45 import io.grpc.StatusException; 46 import io.grpc.internal.ClientStreamListener.RpcProgress; 47 import io.grpc.internal.ConnectionClientTransport; 48 import io.grpc.internal.GrpcAttributes; 49 import io.grpc.internal.GrpcUtil; 50 import io.grpc.internal.Http2Ping; 51 import io.grpc.internal.KeepAliveManager; 52 import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; 53 import io.grpc.internal.ProxyParameters; 54 import io.grpc.internal.SerializingExecutor; 55 import io.grpc.internal.SharedResourceHolder; 56 import io.grpc.internal.StatsTraceContext; 57 import io.grpc.internal.TransportTracer; 58 import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; 59 import io.grpc.okhttp.internal.ConnectionSpec; 60 import io.grpc.okhttp.internal.framed.ErrorCode; 61 import io.grpc.okhttp.internal.framed.FrameReader; 62 import io.grpc.okhttp.internal.framed.FrameWriter; 63 import io.grpc.okhttp.internal.framed.Header; 64 import io.grpc.okhttp.internal.framed.HeadersMode; 65 import io.grpc.okhttp.internal.framed.Http2; 66 import io.grpc.okhttp.internal.framed.Settings; 67 import io.grpc.okhttp.internal.framed.Variant; 68 import java.io.EOFException; 69 import java.io.IOException; 70 import java.net.InetSocketAddress; 71 import java.net.Socket; 72 import java.net.URI; 73 import java.util.Collections; 74 import java.util.EnumMap; 75 import java.util.HashMap; 76 import java.util.Iterator; 77 import java.util.LinkedList; 78 import java.util.List; 79 import java.util.Map; 80 import java.util.Random; 81 import java.util.concurrent.Executor; 82 import java.util.concurrent.ScheduledExecutorService; 83 import java.util.logging.Level; 84 import java.util.logging.Logger; 85 import javax.annotation.Nullable; 86 import javax.annotation.concurrent.GuardedBy; 87 import javax.net.ssl.HostnameVerifier; 88 import javax.net.ssl.SSLSession; 89 import javax.net.ssl.SSLSocket; 90 import javax.net.ssl.SSLSocketFactory; 91 import okio.Buffer; 92 import okio.BufferedSink; 93 import okio.BufferedSource; 94 import okio.ByteString; 95 import okio.Okio; 96 import okio.Source; 97 import okio.Timeout; 98 99 /** 100 * A okhttp-based {@link ConnectionClientTransport} implementation. 101 */ 102 class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler { 103 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap(); 104 private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); 105 private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0]; 106 buildErrorCodeToStatusMap()107 private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() { 108 Map<ErrorCode, Status> errorToStatus = new EnumMap<ErrorCode, Status>(ErrorCode.class); 109 errorToStatus.put(ErrorCode.NO_ERROR, 110 Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent")); 111 errorToStatus.put(ErrorCode.PROTOCOL_ERROR, 112 Status.INTERNAL.withDescription("Protocol error")); 113 errorToStatus.put(ErrorCode.INTERNAL_ERROR, 114 Status.INTERNAL.withDescription("Internal error")); 115 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR, 116 Status.INTERNAL.withDescription("Flow control error")); 117 errorToStatus.put(ErrorCode.STREAM_CLOSED, 118 Status.INTERNAL.withDescription("Stream closed")); 119 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE, 120 Status.INTERNAL.withDescription("Frame too large")); 121 errorToStatus.put(ErrorCode.REFUSED_STREAM, 122 Status.UNAVAILABLE.withDescription("Refused stream")); 123 errorToStatus.put(ErrorCode.CANCEL, 124 Status.CANCELLED.withDescription("Cancelled")); 125 errorToStatus.put(ErrorCode.COMPRESSION_ERROR, 126 Status.INTERNAL.withDescription("Compression error")); 127 errorToStatus.put(ErrorCode.CONNECT_ERROR, 128 Status.INTERNAL.withDescription("Connect error")); 129 errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM, 130 Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm")); 131 errorToStatus.put(ErrorCode.INADEQUATE_SECURITY, 132 Status.PERMISSION_DENIED.withDescription("Inadequate security")); 133 return Collections.unmodifiableMap(errorToStatus); 134 } 135 136 private final InetSocketAddress address; 137 private final String defaultAuthority; 138 private final String userAgent; 139 private final Random random = new Random(); 140 // Returns new unstarted stopwatches 141 private final Supplier<Stopwatch> stopwatchFactory; 142 private Listener listener; 143 private FrameReader testFrameReader; 144 private AsyncFrameWriter frameWriter; 145 private OutboundFlowController outboundFlow; 146 private final Object lock = new Object(); 147 private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); 148 @GuardedBy("lock") 149 private int nextStreamId; 150 @GuardedBy("lock") 151 private final Map<Integer, OkHttpClientStream> streams = 152 new HashMap<Integer, OkHttpClientStream>(); 153 private final Executor executor; 154 // Wrap on executor, to guarantee some operations be executed serially. 155 private final SerializingExecutor serializingExecutor; 156 private final int maxMessageSize; 157 private int connectionUnacknowledgedBytesRead; 158 private ClientFrameHandler clientFrameHandler; 159 // Caution: Not synchronized, new value can only be safely read after the connection is complete. 160 private Attributes attributes = Attributes.EMPTY; 161 /** 162 * Indicates the transport is in go-away state: no new streams will be processed, but existing 163 * streams may continue. 164 */ 165 @GuardedBy("lock") 166 private Status goAwayStatus; 167 @GuardedBy("lock") 168 private boolean goAwaySent; 169 @GuardedBy("lock") 170 private Http2Ping ping; 171 @GuardedBy("lock") 172 private boolean stopped; 173 @GuardedBy("lock") 174 private boolean inUse; 175 private SSLSocketFactory sslSocketFactory; 176 private HostnameVerifier hostnameVerifier; 177 private Socket socket; 178 @GuardedBy("lock") 179 private int maxConcurrentStreams = 0; 180 @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty 181 @GuardedBy("lock") 182 private LinkedList<OkHttpClientStream> pendingStreams = new LinkedList<OkHttpClientStream>(); 183 private final ConnectionSpec connectionSpec; 184 private FrameWriter testFrameWriter; 185 private ScheduledExecutorService scheduler; 186 private KeepAliveManager keepAliveManager; 187 private boolean enableKeepAlive; 188 private long keepAliveTimeNanos; 189 private long keepAliveTimeoutNanos; 190 private boolean keepAliveWithoutCalls; 191 private final Runnable tooManyPingsRunnable; 192 @GuardedBy("lock") 193 private final TransportTracer transportTracer; 194 @GuardedBy("lock") 195 private InternalChannelz.Security securityInfo; 196 197 @VisibleForTesting 198 @Nullable 199 final ProxyParameters proxy; 200 201 // The following fields should only be used for test. 202 Runnable connectingCallback; 203 SettableFuture<Void> connectedFuture; 204 205 OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent, Executor executor, @Nullable SSLSocketFactory sslSocketFactory, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable, TransportTracer transportTracer)206 OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent, 207 Executor executor, @Nullable SSLSocketFactory sslSocketFactory, 208 @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, 209 int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable, 210 TransportTracer transportTracer) { 211 this.address = Preconditions.checkNotNull(address, "address"); 212 this.defaultAuthority = authority; 213 this.maxMessageSize = maxMessageSize; 214 this.executor = Preconditions.checkNotNull(executor, "executor"); 215 serializingExecutor = new SerializingExecutor(executor); 216 // Client initiated streams are odd, server initiated ones are even. Server should not need to 217 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation. 218 nextStreamId = 3; 219 this.sslSocketFactory = sslSocketFactory; 220 this.hostnameVerifier = hostnameVerifier; 221 this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec"); 222 this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER; 223 this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); 224 this.proxy = proxy; 225 this.tooManyPingsRunnable = 226 Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); 227 this.transportTracer = Preconditions.checkNotNull(transportTracer); 228 initTransportTracer(); 229 } 230 231 /** 232 * Create a transport connected to a fake peer for test. 233 */ 234 @VisibleForTesting OkHttpClientTransport( String userAgent, Executor executor, FrameReader frameReader, FrameWriter testFrameWriter, int nextStreamId, Socket socket, Supplier<Stopwatch> stopwatchFactory, @Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture, int maxMessageSize, Runnable tooManyPingsRunnable, TransportTracer transportTracer)235 OkHttpClientTransport( 236 String userAgent, 237 Executor executor, 238 FrameReader frameReader, 239 FrameWriter testFrameWriter, 240 int nextStreamId, 241 Socket socket, 242 Supplier<Stopwatch> stopwatchFactory, 243 @Nullable Runnable connectingCallback, 244 SettableFuture<Void> connectedFuture, 245 int maxMessageSize, 246 Runnable tooManyPingsRunnable, 247 TransportTracer transportTracer) { 248 address = null; 249 this.maxMessageSize = maxMessageSize; 250 defaultAuthority = "notarealauthority:80"; 251 this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); 252 this.executor = Preconditions.checkNotNull(executor, "executor"); 253 serializingExecutor = new SerializingExecutor(executor); 254 this.testFrameReader = Preconditions.checkNotNull(frameReader, "frameReader"); 255 this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter"); 256 this.socket = Preconditions.checkNotNull(socket, "socket"); 257 this.nextStreamId = nextStreamId; 258 this.stopwatchFactory = stopwatchFactory; 259 this.connectionSpec = null; 260 this.connectingCallback = connectingCallback; 261 this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture"); 262 this.proxy = null; 263 this.tooManyPingsRunnable = 264 Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); 265 this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); 266 initTransportTracer(); 267 } 268 initTransportTracer()269 private void initTransportTracer() { 270 synchronized (lock) { // to make @GuardedBy linter happy 271 transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { 272 @Override 273 public TransportTracer.FlowControlWindows read() { 274 synchronized (lock) { 275 long local = -1; // okhttp does not track the local window size 276 long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0); 277 return new TransportTracer.FlowControlWindows(local, remote); 278 } 279 } 280 }); 281 } 282 } 283 284 /** 285 * Enable keepalive with custom delay and timeout. 286 */ enableKeepAlive(boolean enable, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls)287 void enableKeepAlive(boolean enable, long keepAliveTimeNanos, 288 long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) { 289 enableKeepAlive = enable; 290 this.keepAliveTimeNanos = keepAliveTimeNanos; 291 this.keepAliveTimeoutNanos = keepAliveTimeoutNanos; 292 this.keepAliveWithoutCalls = keepAliveWithoutCalls; 293 } 294 isForTest()295 private boolean isForTest() { 296 return address == null; 297 } 298 299 @Override ping(final PingCallback callback, Executor executor)300 public void ping(final PingCallback callback, Executor executor) { 301 checkState(frameWriter != null); 302 long data = 0; 303 Http2Ping p; 304 boolean writePing; 305 synchronized (lock) { 306 if (stopped) { 307 Http2Ping.notifyFailed(callback, executor, getPingFailure()); 308 return; 309 } 310 if (ping != null) { 311 // we only allow one outstanding ping at a time, so just add the callback to 312 // any outstanding operation 313 p = ping; 314 writePing = false; 315 } else { 316 // set outstanding operation and then write the ping after releasing lock 317 data = random.nextLong(); 318 Stopwatch stopwatch = stopwatchFactory.get(); 319 stopwatch.start(); 320 p = ping = new Http2Ping(data, stopwatch); 321 writePing = true; 322 transportTracer.reportKeepAliveSent(); 323 } 324 } 325 if (writePing) { 326 frameWriter.ping(false, (int) (data >>> 32), (int) data); 327 } 328 // If transport concurrently failed/stopped since we released the lock above, this could 329 // immediately invoke callback (which we shouldn't do while holding a lock) 330 p.addCallback(callback, executor); 331 } 332 333 @Override newStream(final MethodDescriptor<?, ?> method, final Metadata headers, CallOptions callOptions)334 public OkHttpClientStream newStream(final MethodDescriptor<?, ?> method, 335 final Metadata headers, CallOptions callOptions) { 336 Preconditions.checkNotNull(method, "method"); 337 Preconditions.checkNotNull(headers, "headers"); 338 StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); 339 return new OkHttpClientStream( 340 method, 341 headers, 342 frameWriter, 343 OkHttpClientTransport.this, 344 outboundFlow, 345 lock, 346 maxMessageSize, 347 defaultAuthority, 348 userAgent, 349 statsTraceCtx, 350 transportTracer); 351 } 352 353 @GuardedBy("lock") streamReadyToStart(OkHttpClientStream clientStream)354 void streamReadyToStart(OkHttpClientStream clientStream) { 355 if (goAwayStatus != null) { 356 clientStream.transportState().transportReportStatus( 357 goAwayStatus, RpcProgress.REFUSED, true, new Metadata()); 358 } else if (streams.size() >= maxConcurrentStreams) { 359 pendingStreams.add(clientStream); 360 setInUse(); 361 } else { 362 startStream(clientStream); 363 } 364 } 365 366 @GuardedBy("lock") startStream(OkHttpClientStream stream)367 private void startStream(OkHttpClientStream stream) { 368 Preconditions.checkState( 369 stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); 370 streams.put(nextStreamId, stream); 371 setInUse(); 372 stream.transportState().start(nextStreamId); 373 // For unary and server streaming, there will be a data frame soon, no need to flush the header. 374 if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING) 375 || stream.useGet()) { 376 frameWriter.flush(); 377 } 378 if (nextStreamId >= Integer.MAX_VALUE - 2) { 379 // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs 380 // correctly. 381 nextStreamId = Integer.MAX_VALUE; 382 startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, 383 Status.UNAVAILABLE.withDescription("Stream ids exhausted")); 384 } else { 385 nextStreamId += 2; 386 } 387 } 388 389 /** 390 * Starts pending streams, returns true if at least one pending stream is started. 391 */ 392 @GuardedBy("lock") startPendingStreams()393 private boolean startPendingStreams() { 394 boolean hasStreamStarted = false; 395 while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { 396 OkHttpClientStream stream = pendingStreams.poll(); 397 startStream(stream); 398 hasStreamStarted = true; 399 } 400 return hasStreamStarted; 401 } 402 403 /** 404 * Removes given pending stream, used when a pending stream is cancelled. 405 */ 406 @GuardedBy("lock") removePendingStream(OkHttpClientStream pendingStream)407 void removePendingStream(OkHttpClientStream pendingStream) { 408 pendingStreams.remove(pendingStream); 409 maybeClearInUse(); 410 } 411 412 @Override start(Listener listener)413 public Runnable start(Listener listener) { 414 this.listener = Preconditions.checkNotNull(listener, "listener"); 415 416 if (enableKeepAlive) { 417 scheduler = SharedResourceHolder.get(TIMER_SERVICE); 418 keepAliveManager = new KeepAliveManager( 419 new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos, 420 keepAliveWithoutCalls); 421 keepAliveManager.onTransportStarted(); 422 } 423 424 frameWriter = new AsyncFrameWriter(this, serializingExecutor); 425 outboundFlow = new OutboundFlowController(this, frameWriter); 426 // Connecting in the serializingExecutor, so that some stream operations like synStream 427 // will be executed after connected. 428 serializingExecutor.execute(new Runnable() { 429 @Override 430 public void run() { 431 if (isForTest()) { 432 if (connectingCallback != null) { 433 connectingCallback.run(); 434 } 435 clientFrameHandler = new ClientFrameHandler(testFrameReader); 436 executor.execute(clientFrameHandler); 437 synchronized (lock) { 438 maxConcurrentStreams = Integer.MAX_VALUE; 439 startPendingStreams(); 440 } 441 frameWriter.becomeConnected(testFrameWriter, socket); 442 connectedFuture.set(null); 443 return; 444 } 445 446 // Use closed source on failure so that the reader immediately shuts down. 447 BufferedSource source = Okio.buffer(new Source() { 448 @Override 449 public long read(Buffer sink, long byteCount) { 450 return -1; 451 } 452 453 @Override 454 public Timeout timeout() { 455 return Timeout.NONE; 456 } 457 458 @Override 459 public void close() {} 460 }); 461 Variant variant = new Http2(); 462 BufferedSink sink; 463 Socket sock; 464 SSLSession sslSession = null; 465 try { 466 if (proxy == null) { 467 sock = new Socket(address.getAddress(), address.getPort()); 468 } else { 469 sock = createHttpProxySocket( 470 address, proxy.proxyAddress, proxy.username, proxy.password); 471 } 472 473 if (sslSocketFactory != null) { 474 SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade( 475 sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(), 476 connectionSpec); 477 sslSession = sslSocket.getSession(); 478 sock = sslSocket; 479 } 480 sock.setTcpNoDelay(true); 481 source = Okio.buffer(Okio.source(sock)); 482 sink = Okio.buffer(Okio.sink(sock)); 483 // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info 484 attributes = Attributes 485 .newBuilder() 486 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) 487 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress()) 488 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession) 489 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, 490 sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY) 491 .build(); 492 } catch (StatusException e) { 493 startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus()); 494 return; 495 } catch (Exception e) { 496 onException(e); 497 return; 498 } finally { 499 clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); 500 executor.execute(clientFrameHandler); 501 } 502 503 FrameWriter rawFrameWriter; 504 synchronized (lock) { 505 socket = Preconditions.checkNotNull(sock, "socket"); 506 maxConcurrentStreams = Integer.MAX_VALUE; 507 startPendingStreams(); 508 if (sslSession != null) { 509 securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); 510 } 511 } 512 513 rawFrameWriter = variant.newWriter(sink, true); 514 frameWriter.becomeConnected(rawFrameWriter, socket); 515 516 try { 517 // Do these with the raw FrameWriter, so that they will be done in this thread, 518 // and before any possible pending stream operations. 519 rawFrameWriter.connectionPreface(); 520 Settings settings = new Settings(); 521 rawFrameWriter.settings(settings); 522 } catch (Exception e) { 523 onException(e); 524 return; 525 } 526 } 527 }); 528 return null; 529 } 530 createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword)531 private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, 532 String proxyUsername, String proxyPassword) throws IOException, StatusException { 533 try { 534 Socket sock; 535 // The proxy address may not be resolved 536 if (proxyAddress.getAddress() != null) { 537 sock = new Socket(proxyAddress.getAddress(), proxyAddress.getPort()); 538 } else { 539 sock = new Socket(proxyAddress.getHostName(), proxyAddress.getPort()); 540 } 541 sock.setTcpNoDelay(true); 542 543 Source source = Okio.source(sock); 544 BufferedSink sink = Okio.buffer(Okio.sink(sock)); 545 546 // Prepare headers and request method line 547 Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword); 548 HttpUrl url = proxyRequest.httpUrl(); 549 String requestLine = String.format("CONNECT %s:%d HTTP/1.1", url.host(), url.port()); 550 551 // Write request to socket 552 sink.writeUtf8(requestLine).writeUtf8("\r\n"); 553 for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) { 554 sink.writeUtf8(proxyRequest.headers().name(i)) 555 .writeUtf8(": ") 556 .writeUtf8(proxyRequest.headers().value(i)) 557 .writeUtf8("\r\n"); 558 } 559 sink.writeUtf8("\r\n"); 560 // Flush buffer (flushes socket and sends request) 561 sink.flush(); 562 563 // Read status line, check if 2xx was returned 564 StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source)); 565 // Drain rest of headers 566 while (!readUtf8LineStrictUnbuffered(source).equals("")) {} 567 if (statusLine.code < 200 || statusLine.code >= 300) { 568 Buffer body = new Buffer(); 569 try { 570 sock.shutdownOutput(); 571 source.read(body, 1024); 572 } catch (IOException ex) { 573 body.writeUtf8("Unable to read body: " + ex.toString()); 574 } 575 try { 576 sock.close(); 577 } catch (IOException ignored) { 578 // ignored 579 } 580 String message = String.format( 581 "Response returned from proxy was not successful (expected 2xx, got %d %s). " 582 + "Response body:\n%s", 583 statusLine.code, statusLine.message, body.readUtf8()); 584 throw Status.UNAVAILABLE.withDescription(message).asException(); 585 } 586 return sock; 587 } catch (IOException e) { 588 throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e) 589 .asException(); 590 } 591 } 592 createHttpProxyRequest(InetSocketAddress address, String proxyUsername, String proxyPassword)593 private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername, 594 String proxyPassword) { 595 HttpUrl tunnelUrl = new HttpUrl.Builder() 596 .scheme("https") 597 .host(address.getHostName()) 598 .port(address.getPort()) 599 .build(); 600 Request.Builder request = new Request.Builder() 601 .url(tunnelUrl) 602 .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port()) 603 .header("User-Agent", userAgent); 604 605 // If we have proxy credentials, set them right away 606 if (proxyUsername != null && proxyPassword != null) { 607 request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword)); 608 } 609 return request.build(); 610 } 611 readUtf8LineStrictUnbuffered(Source source)612 private static String readUtf8LineStrictUnbuffered(Source source) throws IOException { 613 Buffer buffer = new Buffer(); 614 while (true) { 615 if (source.read(buffer, 1) == -1) { 616 throw new EOFException("\\n not found: " + buffer.readByteString().hex()); 617 } 618 if (buffer.getByte(buffer.size() - 1) == '\n') { 619 return buffer.readUtf8LineStrict(); 620 } 621 } 622 } 623 624 @Override toString()625 public String toString() { 626 return MoreObjects.toStringHelper(this) 627 .add("logId", logId.getId()) 628 .add("address", address) 629 .toString(); 630 } 631 632 @Override getLogId()633 public InternalLogId getLogId() { 634 return logId; 635 } 636 637 /** 638 * Gets the overridden authority hostname. If the authority is overridden to be an invalid 639 * authority, uri.getHost() will (rightly) return null, since the authority is no longer 640 * an actual service. This method overrides the behavior for practical reasons. For example, 641 * if an authority is in the form "invalid_authority" (note the "_"), rather than return null, 642 * we return the input. This is because the return value, in conjunction with getOverridenPort, 643 * are used by the SSL library to reconstruct the actual authority. It /already/ has a 644 * connection to the port, independent of this function. 645 * 646 * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do 647 * the wrong thing. An example wrong behavior would be "invalid_host:443". Registry based 648 * authorities do not have ports, so this is even more wrong than before. Sorry. 649 */ 650 @VisibleForTesting getOverridenHost()651 String getOverridenHost() { 652 URI uri = GrpcUtil.authorityToUri(defaultAuthority); 653 if (uri.getHost() != null) { 654 return uri.getHost(); 655 } 656 657 return defaultAuthority; 658 } 659 660 @VisibleForTesting getOverridenPort()661 int getOverridenPort() { 662 URI uri = GrpcUtil.authorityToUri(defaultAuthority); 663 if (uri.getPort() != -1) { 664 return uri.getPort(); 665 } 666 667 return address.getPort(); 668 } 669 670 @Override shutdown(Status reason)671 public void shutdown(Status reason) { 672 synchronized (lock) { 673 if (goAwayStatus != null) { 674 return; 675 } 676 677 goAwayStatus = reason; 678 listener.transportShutdown(goAwayStatus); 679 stopIfNecessary(); 680 } 681 } 682 683 @Override shutdownNow(Status reason)684 public void shutdownNow(Status reason) { 685 shutdown(reason); 686 synchronized (lock) { 687 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator(); 688 while (it.hasNext()) { 689 Map.Entry<Integer, OkHttpClientStream> entry = it.next(); 690 it.remove(); 691 entry.getValue().transportState().transportReportStatus(reason, false, new Metadata()); 692 } 693 694 for (OkHttpClientStream stream : pendingStreams) { 695 stream.transportState().transportReportStatus(reason, true, new Metadata()); 696 } 697 pendingStreams.clear(); 698 maybeClearInUse(); 699 700 stopIfNecessary(); 701 } 702 } 703 704 @Override getAttributes()705 public Attributes getAttributes() { 706 return attributes; 707 } 708 709 /** 710 * Gets all active streams as an array. 711 */ getActiveStreams()712 OkHttpClientStream[] getActiveStreams() { 713 synchronized (lock) { 714 return streams.values().toArray(EMPTY_STREAM_ARRAY); 715 } 716 } 717 718 @VisibleForTesting getHandler()719 ClientFrameHandler getHandler() { 720 return clientFrameHandler; 721 } 722 723 @VisibleForTesting getPendingStreamSize()724 int getPendingStreamSize() { 725 synchronized (lock) { 726 return pendingStreams.size(); 727 } 728 } 729 730 /** 731 * Finish all active streams due to an IOException, then close the transport. 732 */ 733 @Override onException(Throwable failureCause)734 public void onException(Throwable failureCause) { 735 Preconditions.checkNotNull(failureCause, "failureCause"); 736 Status status = Status.UNAVAILABLE.withCause(failureCause); 737 startGoAway(0, ErrorCode.INTERNAL_ERROR, status); 738 } 739 740 /** 741 * Send GOAWAY to the server, then finish all active streams and close the transport. 742 */ onError(ErrorCode errorCode, String moreDetail)743 private void onError(ErrorCode errorCode, String moreDetail) { 744 startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail)); 745 } 746 startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status)747 private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) { 748 synchronized (lock) { 749 if (goAwayStatus == null) { 750 goAwayStatus = status; 751 listener.transportShutdown(status); 752 } 753 if (errorCode != null && !goAwaySent) { 754 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated 755 // streams. The GOAWAY is part of graceful shutdown. 756 goAwaySent = true; 757 frameWriter.goAway(0, errorCode, new byte[0]); 758 } 759 760 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator(); 761 while (it.hasNext()) { 762 Map.Entry<Integer, OkHttpClientStream> entry = it.next(); 763 if (entry.getKey() > lastKnownStreamId) { 764 it.remove(); 765 entry.getValue().transportState().transportReportStatus( 766 status, RpcProgress.REFUSED, false, new Metadata()); 767 } 768 } 769 770 for (OkHttpClientStream stream : pendingStreams) { 771 stream.transportState().transportReportStatus( 772 status, RpcProgress.REFUSED, true, new Metadata()); 773 } 774 pendingStreams.clear(); 775 maybeClearInUse(); 776 777 stopIfNecessary(); 778 } 779 } 780 781 /** 782 * Called when a stream is closed, we do things like: 783 * <ul> 784 * <li>Removing the stream from the map. 785 * <li>Optionally reporting the status. 786 * <li>Starting pending streams if we can. 787 * <li>Stopping the transport if this is the last live stream under a go-away status. 788 * </ul> 789 * 790 * @param streamId the Id of the stream. 791 * @param status the final status of this stream, null means no need to report. 792 * @param stopDelivery interrupt queued messages in the deframer 793 * @param errorCode reset the stream with this ErrorCode if not null. 794 * @param trailers the trailers received if not null 795 */ finishStream( int streamId, @Nullable Status status, RpcProgress rpcProgress, boolean stopDelivery, @Nullable ErrorCode errorCode, @Nullable Metadata trailers)796 void finishStream( 797 int streamId, 798 @Nullable Status status, 799 RpcProgress rpcProgress, 800 boolean stopDelivery, 801 @Nullable ErrorCode errorCode, 802 @Nullable Metadata trailers) { 803 synchronized (lock) { 804 OkHttpClientStream stream = streams.remove(streamId); 805 if (stream != null) { 806 if (errorCode != null) { 807 frameWriter.rstStream(streamId, ErrorCode.CANCEL); 808 } 809 if (status != null) { 810 stream 811 .transportState() 812 .transportReportStatus( 813 status, 814 rpcProgress, 815 stopDelivery, 816 trailers != null ? trailers : new Metadata()); 817 } 818 if (!startPendingStreams()) { 819 stopIfNecessary(); 820 maybeClearInUse(); 821 } 822 } 823 } 824 } 825 826 /** 827 * When the transport is in goAway state, we should stop it once all active streams finish. 828 */ 829 @GuardedBy("lock") stopIfNecessary()830 private void stopIfNecessary() { 831 if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) { 832 return; 833 } 834 if (stopped) { 835 return; 836 } 837 stopped = true; 838 839 if (keepAliveManager != null) { 840 keepAliveManager.onTransportTermination(); 841 // KeepAliveManager should stop using the scheduler after onTransportTermination gets called. 842 scheduler = SharedResourceHolder.release(TIMER_SERVICE, scheduler); 843 } 844 845 if (ping != null) { 846 ping.failed(getPingFailure()); 847 ping = null; 848 } 849 850 if (!goAwaySent) { 851 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated 852 // streams. The GOAWAY is part of graceful shutdown. 853 goAwaySent = true; 854 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); 855 } 856 857 // We will close the underlying socket in the writing thread to break out the reader 858 // thread, which will close the frameReader and notify the listener. 859 frameWriter.close(); 860 } 861 862 @GuardedBy("lock") maybeClearInUse()863 private void maybeClearInUse() { 864 if (inUse) { 865 if (pendingStreams.isEmpty() && streams.isEmpty()) { 866 inUse = false; 867 listener.transportInUse(false); 868 if (keepAliveManager != null) { 869 // We don't have any active streams. No need to do keepalives any more. 870 // Again, we have to call this inside the lock to avoid the race between onTransportIdle 871 // and onTransportActive. 872 keepAliveManager.onTransportIdle(); 873 } 874 } 875 } 876 } 877 878 @GuardedBy("lock") setInUse()879 private void setInUse() { 880 if (!inUse) { 881 inUse = true; 882 listener.transportInUse(true); 883 if (keepAliveManager != null) { 884 // We have a new stream. We might need to do keepalives now. 885 // Note that we have to do this inside the lock to avoid calling 886 // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong 887 // order. 888 keepAliveManager.onTransportActive(); 889 } 890 } 891 } 892 getPingFailure()893 private Throwable getPingFailure() { 894 synchronized (lock) { 895 if (goAwayStatus != null) { 896 return goAwayStatus.asException(); 897 } else { 898 return Status.UNAVAILABLE.withDescription("Connection closed").asException(); 899 } 900 } 901 } 902 mayHaveCreatedStream(int streamId)903 boolean mayHaveCreatedStream(int streamId) { 904 synchronized (lock) { 905 return streamId < nextStreamId && (streamId & 1) == 1; 906 } 907 } 908 getStream(int streamId)909 OkHttpClientStream getStream(int streamId) { 910 synchronized (lock) { 911 return streams.get(streamId); 912 } 913 } 914 915 /** 916 * Returns a Grpc status corresponding to the given ErrorCode. 917 */ 918 @VisibleForTesting toGrpcStatus(ErrorCode code)919 static Status toGrpcStatus(ErrorCode code) { 920 Status status = ERROR_CODE_TO_STATUS.get(code); 921 return status != null ? status : Status.UNKNOWN.withDescription( 922 "Unknown http2 error code: " + code.httpCode); 923 } 924 925 @Override getStats()926 public ListenableFuture<SocketStats> getStats() { 927 SettableFuture<SocketStats> ret = SettableFuture.create(); 928 synchronized (lock) { 929 if (socket == null) { 930 ret.set(new SocketStats( 931 transportTracer.getStats(), 932 /*local=*/ null, 933 /*remote=*/ null, 934 new InternalChannelz.SocketOptions.Builder().build(), 935 /*security=*/ null)); 936 } else { 937 ret.set(new SocketStats( 938 transportTracer.getStats(), 939 socket.getLocalSocketAddress(), 940 socket.getRemoteSocketAddress(), 941 Utils.getSocketOptions(socket), 942 securityInfo)); 943 } 944 return ret; 945 } 946 } 947 948 /** 949 * Runnable which reads frames and dispatches them to in flight calls. 950 */ 951 @VisibleForTesting 952 class ClientFrameHandler implements FrameReader.Handler, Runnable { 953 FrameReader frameReader; 954 boolean firstSettings = true; 955 ClientFrameHandler(FrameReader frameReader)956 ClientFrameHandler(FrameReader frameReader) { 957 this.frameReader = frameReader; 958 } 959 960 @Override run()961 public void run() { 962 String threadName = Thread.currentThread().getName(); 963 if (!GrpcUtil.IS_RESTRICTED_APPENGINE) { 964 Thread.currentThread().setName("OkHttpClientTransport"); 965 } 966 try { 967 // Read until the underlying socket closes. 968 while (frameReader.nextFrame(this)) { 969 if (keepAliveManager != null) { 970 keepAliveManager.onDataReceived(); 971 } 972 } 973 // frameReader.nextFrame() returns false when the underlying read encounters an IOException, 974 // it may be triggered by the socket closing, in such case, the startGoAway() will do 975 // nothing, otherwise, we finish all streams since it's a real IO issue. 976 startGoAway(0, ErrorCode.INTERNAL_ERROR, 977 Status.UNAVAILABLE.withDescription("End of stream or IOException")); 978 } catch (Throwable t) { 979 // TODO(madongfly): Send the exception message to the server. 980 startGoAway( 981 0, 982 ErrorCode.PROTOCOL_ERROR, 983 Status.UNAVAILABLE.withDescription("error in frame handler").withCause(t)); 984 } finally { 985 try { 986 frameReader.close(); 987 } catch (IOException ex) { 988 log.log(Level.INFO, "Exception closing frame reader", ex); 989 } 990 listener.transportTerminated(); 991 if (!GrpcUtil.IS_RESTRICTED_APPENGINE) { 992 // Restore the original thread name. 993 Thread.currentThread().setName(threadName); 994 } 995 } 996 } 997 998 /** 999 * Handle a HTTP2 DATA frame. 1000 */ 1001 @Override data(boolean inFinished, int streamId, BufferedSource in, int length)1002 public void data(boolean inFinished, int streamId, BufferedSource in, int length) 1003 throws IOException { 1004 OkHttpClientStream stream = getStream(streamId); 1005 if (stream == null) { 1006 if (mayHaveCreatedStream(streamId)) { 1007 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM); 1008 in.skip(length); 1009 } else { 1010 onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId); 1011 return; 1012 } 1013 } else { 1014 // Wait until the frame is complete. 1015 in.require(length); 1016 1017 Buffer buf = new Buffer(); 1018 buf.write(in.buffer(), length); 1019 synchronized (lock) { 1020 stream.transportState().transportDataReceived(buf, inFinished); 1021 } 1022 } 1023 1024 // connection window update 1025 connectionUnacknowledgedBytesRead += length; 1026 if (connectionUnacknowledgedBytesRead >= Utils.DEFAULT_WINDOW_SIZE / 2) { 1027 frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead); 1028 connectionUnacknowledgedBytesRead = 0; 1029 } 1030 } 1031 1032 /** 1033 * Handle HTTP2 HEADER and CONTINUATION frames. 1034 */ 1035 @Override headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)1036 public void headers(boolean outFinished, 1037 boolean inFinished, 1038 int streamId, 1039 int associatedStreamId, 1040 List<Header> headerBlock, 1041 HeadersMode headersMode) { 1042 boolean unknownStream = false; 1043 synchronized (lock) { 1044 OkHttpClientStream stream = streams.get(streamId); 1045 if (stream == null) { 1046 if (mayHaveCreatedStream(streamId)) { 1047 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM); 1048 } else { 1049 unknownStream = true; 1050 } 1051 } else { 1052 stream.transportState().transportHeadersReceived(headerBlock, inFinished); 1053 } 1054 } 1055 if (unknownStream) { 1056 // We don't expect any server-initiated streams. 1057 onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId); 1058 } 1059 } 1060 1061 @Override rstStream(int streamId, ErrorCode errorCode)1062 public void rstStream(int streamId, ErrorCode errorCode) { 1063 Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream"); 1064 boolean stopDelivery = 1065 (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED); 1066 finishStream( 1067 streamId, status, 1068 errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED, 1069 stopDelivery, null, null); 1070 } 1071 1072 @Override settings(boolean clearPrevious, Settings settings)1073 public void settings(boolean clearPrevious, Settings settings) { 1074 boolean outboundWindowSizeIncreased = false; 1075 synchronized (lock) { 1076 if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) { 1077 int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get( 1078 settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS); 1079 maxConcurrentStreams = receivedMaxConcurrentStreams; 1080 } 1081 1082 if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) { 1083 int initialWindowSize = OkHttpSettingsUtil.get( 1084 settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE); 1085 outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize); 1086 } 1087 if (firstSettings) { 1088 listener.transportReady(); 1089 firstSettings = false; 1090 } 1091 1092 // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any 1093 // writes due to update in settings must be sent after SETTINGS acknowledgment frame, 1094 // otherwise it will cause a stream error (RST_STREAM). 1095 frameWriter.ackSettings(settings); 1096 1097 // send any pending bytes / streams 1098 if (outboundWindowSizeIncreased) { 1099 outboundFlow.writeStreams(); 1100 } 1101 startPendingStreams(); 1102 } 1103 } 1104 1105 @Override ping(boolean ack, int payload1, int payload2)1106 public void ping(boolean ack, int payload1, int payload2) { 1107 if (!ack) { 1108 frameWriter.ping(true, payload1, payload2); 1109 } else { 1110 Http2Ping p = null; 1111 long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL); 1112 synchronized (lock) { 1113 if (ping != null) { 1114 if (ping.payload() == ackPayload) { 1115 p = ping; 1116 ping = null; 1117 } else { 1118 log.log(Level.WARNING, String.format("Received unexpected ping ack. " 1119 + "Expecting %d, got %d", ping.payload(), ackPayload)); 1120 } 1121 } else { 1122 log.warning("Received unexpected ping ack. No ping outstanding"); 1123 } 1124 } 1125 // don't complete it while holding lock since callbacks could run immediately 1126 if (p != null) { 1127 p.complete(); 1128 } 1129 } 1130 } 1131 1132 @Override ackSettings()1133 public void ackSettings() { 1134 // Do nothing currently. 1135 } 1136 1137 @Override goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)1138 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { 1139 if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) { 1140 String data = debugData.utf8(); 1141 log.log(Level.WARNING, String.format( 1142 "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data)); 1143 if ("too_many_pings".equals(data)) { 1144 tooManyPingsRunnable.run(); 1145 } 1146 } 1147 Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode) 1148 .augmentDescription("Received Goaway"); 1149 if (debugData.size() > 0) { 1150 // If a debug message was provided, use it. 1151 status = status.augmentDescription(debugData.utf8()); 1152 } 1153 startGoAway(lastGoodStreamId, null, status); 1154 } 1155 1156 @Override pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)1157 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) 1158 throws IOException { 1159 // We don't accept server initiated stream. 1160 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR); 1161 } 1162 1163 @Override windowUpdate(int streamId, long delta)1164 public void windowUpdate(int streamId, long delta) { 1165 if (delta == 0) { 1166 String errorMsg = "Received 0 flow control window increment."; 1167 if (streamId == 0) { 1168 onError(ErrorCode.PROTOCOL_ERROR, errorMsg); 1169 } else { 1170 finishStream( 1171 streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false, 1172 ErrorCode.PROTOCOL_ERROR, null); 1173 } 1174 return; 1175 } 1176 1177 boolean unknownStream = false; 1178 synchronized (lock) { 1179 if (streamId == Utils.CONNECTION_STREAM_ID) { 1180 outboundFlow.windowUpdate(null, (int) delta); 1181 return; 1182 } 1183 1184 OkHttpClientStream stream = streams.get(streamId); 1185 if (stream != null) { 1186 outboundFlow.windowUpdate(stream, (int) delta); 1187 } else if (!mayHaveCreatedStream(streamId)) { 1188 unknownStream = true; 1189 } 1190 } 1191 if (unknownStream) { 1192 onError(ErrorCode.PROTOCOL_ERROR, 1193 "Received window_update for unknown stream: " + streamId); 1194 } 1195 } 1196 1197 @Override priority(int streamId, int streamDependency, int weight, boolean exclusive)1198 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) { 1199 // Ignore priority change. 1200 // TODO(madongfly): log 1201 } 1202 1203 @Override alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)1204 public void alternateService(int streamId, String origin, ByteString protocol, String host, 1205 int port, long maxAge) { 1206 // TODO(madongfly): Deal with alternateService propagation 1207 } 1208 } 1209 } 1210