1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Preconditions.checkState; 20 import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.MoreObjects; 24 import com.google.common.base.Preconditions; 25 import com.google.common.base.Stopwatch; 26 import com.google.common.base.Supplier; 27 import com.google.common.util.concurrent.ListenableFuture; 28 import com.google.common.util.concurrent.SettableFuture; 29 import com.squareup.okhttp.Credentials; 30 import com.squareup.okhttp.HttpUrl; 31 import com.squareup.okhttp.Request; 32 import com.squareup.okhttp.internal.http.StatusLine; 33 import io.grpc.Attributes; 34 import io.grpc.CallCredentials; 35 import io.grpc.CallOptions; 36 import io.grpc.Grpc; 37 import io.grpc.InternalChannelz; 38 import io.grpc.InternalChannelz.SocketStats; 39 import io.grpc.InternalLogId; 40 import io.grpc.Metadata; 41 import io.grpc.MethodDescriptor; 42 import io.grpc.MethodDescriptor.MethodType; 43 import io.grpc.SecurityLevel; 44 import io.grpc.Status; 45 import io.grpc.Status.Code; 46 import io.grpc.StatusException; 47 import io.grpc.internal.ClientStreamListener.RpcProgress; 48 import io.grpc.internal.ConnectionClientTransport; 49 import io.grpc.internal.GrpcUtil; 50 import io.grpc.internal.Http2Ping; 51 import io.grpc.internal.KeepAliveManager; 52 import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; 53 import io.grpc.internal.ProxyParameters; 54 import io.grpc.internal.SerializingExecutor; 55 import io.grpc.internal.SharedResourceHolder; 56 import io.grpc.internal.StatsTraceContext; 57 import io.grpc.internal.TransportTracer; 58 import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; 59 import io.grpc.okhttp.internal.ConnectionSpec; 60 import io.grpc.okhttp.internal.framed.ErrorCode; 61 import io.grpc.okhttp.internal.framed.FrameReader; 62 import io.grpc.okhttp.internal.framed.FrameWriter; 63 import io.grpc.okhttp.internal.framed.Header; 64 import io.grpc.okhttp.internal.framed.HeadersMode; 65 import io.grpc.okhttp.internal.framed.Http2; 66 import io.grpc.okhttp.internal.framed.Settings; 67 import io.grpc.okhttp.internal.framed.Variant; 68 import java.io.EOFException; 69 import java.io.IOException; 70 import java.net.InetSocketAddress; 71 import java.net.Socket; 72 import java.net.URI; 73 import java.util.Collections; 74 import java.util.EnumMap; 75 import java.util.HashMap; 76 import java.util.Iterator; 77 import java.util.LinkedList; 78 import java.util.List; 79 import java.util.Map; 80 import java.util.Random; 81 import java.util.concurrent.Executor; 82 import java.util.concurrent.ScheduledExecutorService; 83 import java.util.logging.Level; 84 import java.util.logging.Logger; 85 import javax.annotation.Nullable; 86 import javax.annotation.concurrent.GuardedBy; 87 import javax.net.ssl.HostnameVerifier; 88 import javax.net.ssl.SSLSession; 89 import javax.net.ssl.SSLSocket; 90 import javax.net.ssl.SSLSocketFactory; 91 import okio.Buffer; 92 import okio.BufferedSink; 93 import okio.BufferedSource; 94 import okio.ByteString; 95 import okio.Okio; 96 import okio.Source; 97 import okio.Timeout; 98 99 /** 100 * A okhttp-based {@link ConnectionClientTransport} implementation. 101 */ 102 class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler { 103 private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap(); 104 private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); 105 private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0]; 106 buildErrorCodeToStatusMap()107 private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() { 108 Map<ErrorCode, Status> errorToStatus = new EnumMap<ErrorCode, Status>(ErrorCode.class); 109 errorToStatus.put(ErrorCode.NO_ERROR, 110 Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent")); 111 errorToStatus.put(ErrorCode.PROTOCOL_ERROR, 112 Status.INTERNAL.withDescription("Protocol error")); 113 errorToStatus.put(ErrorCode.INTERNAL_ERROR, 114 Status.INTERNAL.withDescription("Internal error")); 115 errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR, 116 Status.INTERNAL.withDescription("Flow control error")); 117 errorToStatus.put(ErrorCode.STREAM_CLOSED, 118 Status.INTERNAL.withDescription("Stream closed")); 119 errorToStatus.put(ErrorCode.FRAME_TOO_LARGE, 120 Status.INTERNAL.withDescription("Frame too large")); 121 errorToStatus.put(ErrorCode.REFUSED_STREAM, 122 Status.UNAVAILABLE.withDescription("Refused stream")); 123 errorToStatus.put(ErrorCode.CANCEL, 124 Status.CANCELLED.withDescription("Cancelled")); 125 errorToStatus.put(ErrorCode.COMPRESSION_ERROR, 126 Status.INTERNAL.withDescription("Compression error")); 127 errorToStatus.put(ErrorCode.CONNECT_ERROR, 128 Status.INTERNAL.withDescription("Connect error")); 129 errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM, 130 Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm")); 131 errorToStatus.put(ErrorCode.INADEQUATE_SECURITY, 132 Status.PERMISSION_DENIED.withDescription("Inadequate security")); 133 return Collections.unmodifiableMap(errorToStatus); 134 } 135 136 private final InetSocketAddress address; 137 private final String defaultAuthority; 138 private final String userAgent; 139 private final Random random = new Random(); 140 // Returns new unstarted stopwatches 141 private final Supplier<Stopwatch> stopwatchFactory; 142 private Listener listener; 143 private FrameReader testFrameReader; 144 private AsyncFrameWriter frameWriter; 145 private OutboundFlowController outboundFlow; 146 private final Object lock = new Object(); 147 private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); 148 @GuardedBy("lock") 149 private int nextStreamId; 150 @GuardedBy("lock") 151 private final Map<Integer, OkHttpClientStream> streams = 152 new HashMap<Integer, OkHttpClientStream>(); 153 private final Executor executor; 154 // Wrap on executor, to guarantee some operations be executed serially. 155 private final SerializingExecutor serializingExecutor; 156 private final int maxMessageSize; 157 private int connectionUnacknowledgedBytesRead; 158 private ClientFrameHandler clientFrameHandler; 159 // Caution: Not synchronized, new value can only be safely read after the connection is complete. 160 private Attributes attributes = Attributes.EMPTY; 161 /** 162 * Indicates the transport is in go-away state: no new streams will be processed, but existing 163 * streams may continue. 164 */ 165 @GuardedBy("lock") 166 private Status goAwayStatus; 167 @GuardedBy("lock") 168 private boolean goAwaySent; 169 @GuardedBy("lock") 170 private Http2Ping ping; 171 @GuardedBy("lock") 172 private boolean stopped; 173 @GuardedBy("lock") 174 private boolean inUse; 175 private SSLSocketFactory sslSocketFactory; 176 private HostnameVerifier hostnameVerifier; 177 private Socket socket; 178 @GuardedBy("lock") 179 private int maxConcurrentStreams = 0; 180 @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty 181 @GuardedBy("lock") 182 private LinkedList<OkHttpClientStream> pendingStreams = new LinkedList<OkHttpClientStream>(); 183 private final ConnectionSpec connectionSpec; 184 private FrameWriter testFrameWriter; 185 private ScheduledExecutorService scheduler; 186 private KeepAliveManager keepAliveManager; 187 private boolean enableKeepAlive; 188 private long keepAliveTimeNanos; 189 private long keepAliveTimeoutNanos; 190 private boolean keepAliveWithoutCalls; 191 private final Runnable tooManyPingsRunnable; 192 @GuardedBy("lock") 193 private final TransportTracer transportTracer; 194 @GuardedBy("lock") 195 private InternalChannelz.Security securityInfo; 196 197 @VisibleForTesting 198 @Nullable 199 final ProxyParameters proxy; 200 201 // The following fields should only be used for test. 202 Runnable connectingCallback; 203 SettableFuture<Void> connectedFuture; 204 205 OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent, Executor executor, @Nullable SSLSocketFactory sslSocketFactory, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable, TransportTracer transportTracer)206 OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent, 207 Executor executor, @Nullable SSLSocketFactory sslSocketFactory, 208 @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, 209 int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable, 210 TransportTracer transportTracer) { 211 this.address = Preconditions.checkNotNull(address, "address"); 212 this.defaultAuthority = authority; 213 this.maxMessageSize = maxMessageSize; 214 this.executor = Preconditions.checkNotNull(executor, "executor"); 215 serializingExecutor = new SerializingExecutor(executor); 216 // Client initiated streams are odd, server initiated ones are even. Server should not need to 217 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation. 218 nextStreamId = 3; 219 this.sslSocketFactory = sslSocketFactory; 220 this.hostnameVerifier = hostnameVerifier; 221 this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec"); 222 this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER; 223 this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); 224 this.proxy = proxy; 225 this.tooManyPingsRunnable = 226 Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); 227 this.transportTracer = Preconditions.checkNotNull(transportTracer); 228 initTransportTracer(); 229 } 230 231 /** 232 * Create a transport connected to a fake peer for test. 233 */ 234 @VisibleForTesting OkHttpClientTransport( String userAgent, Executor executor, FrameReader frameReader, FrameWriter testFrameWriter, int nextStreamId, Socket socket, Supplier<Stopwatch> stopwatchFactory, @Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture, int maxMessageSize, Runnable tooManyPingsRunnable, TransportTracer transportTracer)235 OkHttpClientTransport( 236 String userAgent, 237 Executor executor, 238 FrameReader frameReader, 239 FrameWriter testFrameWriter, 240 int nextStreamId, 241 Socket socket, 242 Supplier<Stopwatch> stopwatchFactory, 243 @Nullable Runnable connectingCallback, 244 SettableFuture<Void> connectedFuture, 245 int maxMessageSize, 246 Runnable tooManyPingsRunnable, 247 TransportTracer transportTracer) { 248 address = null; 249 this.maxMessageSize = maxMessageSize; 250 defaultAuthority = "notarealauthority:80"; 251 this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); 252 this.executor = Preconditions.checkNotNull(executor, "executor"); 253 serializingExecutor = new SerializingExecutor(executor); 254 this.testFrameReader = Preconditions.checkNotNull(frameReader, "frameReader"); 255 this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter"); 256 this.socket = Preconditions.checkNotNull(socket, "socket"); 257 this.nextStreamId = nextStreamId; 258 this.stopwatchFactory = stopwatchFactory; 259 this.connectionSpec = null; 260 this.connectingCallback = connectingCallback; 261 this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture"); 262 this.proxy = null; 263 this.tooManyPingsRunnable = 264 Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); 265 this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); 266 initTransportTracer(); 267 } 268 initTransportTracer()269 private void initTransportTracer() { 270 synchronized (lock) { // to make @GuardedBy linter happy 271 transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { 272 @Override 273 public TransportTracer.FlowControlWindows read() { 274 synchronized (lock) { 275 long local = -1; // okhttp does not track the local window size 276 long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0); 277 return new TransportTracer.FlowControlWindows(local, remote); 278 } 279 } 280 }); 281 } 282 } 283 284 /** 285 * Enable keepalive with custom delay and timeout. 286 */ enableKeepAlive(boolean enable, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls)287 void enableKeepAlive(boolean enable, long keepAliveTimeNanos, 288 long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) { 289 enableKeepAlive = enable; 290 this.keepAliveTimeNanos = keepAliveTimeNanos; 291 this.keepAliveTimeoutNanos = keepAliveTimeoutNanos; 292 this.keepAliveWithoutCalls = keepAliveWithoutCalls; 293 } 294 isForTest()295 private boolean isForTest() { 296 return address == null; 297 } 298 299 @Override ping(final PingCallback callback, Executor executor)300 public void ping(final PingCallback callback, Executor executor) { 301 checkState(frameWriter != null); 302 long data = 0; 303 Http2Ping p; 304 boolean writePing; 305 synchronized (lock) { 306 if (stopped) { 307 Http2Ping.notifyFailed(callback, executor, getPingFailure()); 308 return; 309 } 310 if (ping != null) { 311 // we only allow one outstanding ping at a time, so just add the callback to 312 // any outstanding operation 313 p = ping; 314 writePing = false; 315 } else { 316 // set outstanding operation and then write the ping after releasing lock 317 data = random.nextLong(); 318 Stopwatch stopwatch = stopwatchFactory.get(); 319 stopwatch.start(); 320 p = ping = new Http2Ping(data, stopwatch); 321 writePing = true; 322 transportTracer.reportKeepAliveSent(); 323 } 324 } 325 if (writePing) { 326 frameWriter.ping(false, (int) (data >>> 32), (int) data); 327 } 328 // If transport concurrently failed/stopped since we released the lock above, this could 329 // immediately invoke callback (which we shouldn't do while holding a lock) 330 p.addCallback(callback, executor); 331 } 332 333 @Override newStream(final MethodDescriptor<?, ?> method, final Metadata headers, CallOptions callOptions)334 public OkHttpClientStream newStream(final MethodDescriptor<?, ?> method, 335 final Metadata headers, CallOptions callOptions) { 336 Preconditions.checkNotNull(method, "method"); 337 Preconditions.checkNotNull(headers, "headers"); 338 StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); 339 return new OkHttpClientStream( 340 method, 341 headers, 342 frameWriter, 343 OkHttpClientTransport.this, 344 outboundFlow, 345 lock, 346 maxMessageSize, 347 defaultAuthority, 348 userAgent, 349 statsTraceCtx, 350 transportTracer); 351 } 352 353 @GuardedBy("lock") streamReadyToStart(OkHttpClientStream clientStream)354 void streamReadyToStart(OkHttpClientStream clientStream) { 355 if (goAwayStatus != null) { 356 clientStream.transportState().transportReportStatus( 357 goAwayStatus, RpcProgress.REFUSED, true, new Metadata()); 358 } else if (streams.size() >= maxConcurrentStreams) { 359 pendingStreams.add(clientStream); 360 setInUse(); 361 } else { 362 startStream(clientStream); 363 } 364 } 365 366 @GuardedBy("lock") startStream(OkHttpClientStream stream)367 private void startStream(OkHttpClientStream stream) { 368 Preconditions.checkState( 369 stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); 370 streams.put(nextStreamId, stream); 371 setInUse(); 372 stream.transportState().start(nextStreamId); 373 // For unary and server streaming, there will be a data frame soon, no need to flush the header. 374 if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING) 375 || stream.useGet()) { 376 frameWriter.flush(); 377 } 378 if (nextStreamId >= Integer.MAX_VALUE - 2) { 379 // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs 380 // correctly. 381 nextStreamId = Integer.MAX_VALUE; 382 startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, 383 Status.UNAVAILABLE.withDescription("Stream ids exhausted")); 384 } else { 385 nextStreamId += 2; 386 } 387 } 388 389 /** 390 * Starts pending streams, returns true if at least one pending stream is started. 391 */ 392 @GuardedBy("lock") startPendingStreams()393 private boolean startPendingStreams() { 394 boolean hasStreamStarted = false; 395 while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { 396 OkHttpClientStream stream = pendingStreams.poll(); 397 startStream(stream); 398 hasStreamStarted = true; 399 } 400 return hasStreamStarted; 401 } 402 403 /** 404 * Removes given pending stream, used when a pending stream is cancelled. 405 */ 406 @GuardedBy("lock") removePendingStream(OkHttpClientStream pendingStream)407 void removePendingStream(OkHttpClientStream pendingStream) { 408 pendingStreams.remove(pendingStream); 409 maybeClearInUse(); 410 } 411 412 @Override start(Listener listener)413 public Runnable start(Listener listener) { 414 this.listener = Preconditions.checkNotNull(listener, "listener"); 415 416 if (enableKeepAlive) { 417 scheduler = SharedResourceHolder.get(TIMER_SERVICE); 418 keepAliveManager = new KeepAliveManager( 419 new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos, 420 keepAliveWithoutCalls); 421 keepAliveManager.onTransportStarted(); 422 } 423 424 frameWriter = new AsyncFrameWriter(this, serializingExecutor); 425 outboundFlow = new OutboundFlowController(this, frameWriter); 426 // Connecting in the serializingExecutor, so that some stream operations like synStream 427 // will be executed after connected. 428 serializingExecutor.execute(new Runnable() { 429 @Override 430 public void run() { 431 if (isForTest()) { 432 if (connectingCallback != null) { 433 connectingCallback.run(); 434 } 435 clientFrameHandler = new ClientFrameHandler(testFrameReader); 436 executor.execute(clientFrameHandler); 437 synchronized (lock) { 438 maxConcurrentStreams = Integer.MAX_VALUE; 439 startPendingStreams(); 440 } 441 frameWriter.becomeConnected(testFrameWriter, socket); 442 connectedFuture.set(null); 443 return; 444 } 445 446 // Use closed source on failure so that the reader immediately shuts down. 447 BufferedSource source = Okio.buffer(new Source() { 448 @Override 449 public long read(Buffer sink, long byteCount) { 450 return -1; 451 } 452 453 @Override 454 public Timeout timeout() { 455 return Timeout.NONE; 456 } 457 458 @Override 459 public void close() {} 460 }); 461 Variant variant = new Http2(); 462 BufferedSink sink; 463 Socket sock; 464 SSLSession sslSession = null; 465 try { 466 if (proxy == null) { 467 sock = new Socket(address.getAddress(), address.getPort()); 468 } else { 469 sock = createHttpProxySocket( 470 address, proxy.proxyAddress, proxy.username, proxy.password); 471 } 472 473 if (sslSocketFactory != null) { 474 SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade( 475 sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(), 476 connectionSpec); 477 sslSession = sslSocket.getSession(); 478 sock = sslSocket; 479 } 480 sock.setTcpNoDelay(true); 481 source = Okio.buffer(Okio.source(sock)); 482 sink = Okio.buffer(Okio.sink(sock)); 483 // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info 484 attributes = Attributes 485 .newBuilder() 486 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) 487 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession) 488 .set(CallCredentials.ATTR_SECURITY_LEVEL, 489 sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY) 490 .build(); 491 } catch (StatusException e) { 492 startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus()); 493 return; 494 } catch (Exception e) { 495 onException(e); 496 return; 497 } finally { 498 clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); 499 executor.execute(clientFrameHandler); 500 } 501 502 FrameWriter rawFrameWriter; 503 synchronized (lock) { 504 socket = Preconditions.checkNotNull(sock, "socket"); 505 maxConcurrentStreams = Integer.MAX_VALUE; 506 startPendingStreams(); 507 if (sslSession != null) { 508 securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); 509 } 510 } 511 512 rawFrameWriter = variant.newWriter(sink, true); 513 frameWriter.becomeConnected(rawFrameWriter, socket); 514 515 try { 516 // Do these with the raw FrameWriter, so that they will be done in this thread, 517 // and before any possible pending stream operations. 518 rawFrameWriter.connectionPreface(); 519 Settings settings = new Settings(); 520 rawFrameWriter.settings(settings); 521 } catch (Exception e) { 522 onException(e); 523 return; 524 } 525 } 526 }); 527 return null; 528 } 529 createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword)530 private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, 531 String proxyUsername, String proxyPassword) throws IOException, StatusException { 532 try { 533 Socket sock; 534 // The proxy address may not be resolved 535 if (proxyAddress.getAddress() != null) { 536 sock = new Socket(proxyAddress.getAddress(), proxyAddress.getPort()); 537 } else { 538 sock = new Socket(proxyAddress.getHostName(), proxyAddress.getPort()); 539 } 540 sock.setTcpNoDelay(true); 541 542 Source source = Okio.source(sock); 543 BufferedSink sink = Okio.buffer(Okio.sink(sock)); 544 545 // Prepare headers and request method line 546 Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword); 547 HttpUrl url = proxyRequest.httpUrl(); 548 String requestLine = String.format("CONNECT %s:%d HTTP/1.1", url.host(), url.port()); 549 550 // Write request to socket 551 sink.writeUtf8(requestLine).writeUtf8("\r\n"); 552 for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) { 553 sink.writeUtf8(proxyRequest.headers().name(i)) 554 .writeUtf8(": ") 555 .writeUtf8(proxyRequest.headers().value(i)) 556 .writeUtf8("\r\n"); 557 } 558 sink.writeUtf8("\r\n"); 559 // Flush buffer (flushes socket and sends request) 560 sink.flush(); 561 562 // Read status line, check if 2xx was returned 563 StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source)); 564 // Drain rest of headers 565 while (!readUtf8LineStrictUnbuffered(source).equals("")) {} 566 if (statusLine.code < 200 || statusLine.code >= 300) { 567 Buffer body = new Buffer(); 568 try { 569 sock.shutdownOutput(); 570 source.read(body, 1024); 571 } catch (IOException ex) { 572 body.writeUtf8("Unable to read body: " + ex.toString()); 573 } 574 try { 575 sock.close(); 576 } catch (IOException ignored) { 577 // ignored 578 } 579 String message = String.format( 580 "Response returned from proxy was not successful (expected 2xx, got %d %s). " 581 + "Response body:\n%s", 582 statusLine.code, statusLine.message, body.readUtf8()); 583 throw Status.UNAVAILABLE.withDescription(message).asException(); 584 } 585 return sock; 586 } catch (IOException e) { 587 throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e) 588 .asException(); 589 } 590 } 591 createHttpProxyRequest(InetSocketAddress address, String proxyUsername, String proxyPassword)592 private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername, 593 String proxyPassword) { 594 HttpUrl tunnelUrl = new HttpUrl.Builder() 595 .scheme("https") 596 .host(address.getHostName()) 597 .port(address.getPort()) 598 .build(); 599 Request.Builder request = new Request.Builder() 600 .url(tunnelUrl) 601 .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port()) 602 .header("User-Agent", userAgent); 603 604 // If we have proxy credentials, set them right away 605 if (proxyUsername != null && proxyPassword != null) { 606 request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword)); 607 } 608 return request.build(); 609 } 610 readUtf8LineStrictUnbuffered(Source source)611 private static String readUtf8LineStrictUnbuffered(Source source) throws IOException { 612 Buffer buffer = new Buffer(); 613 while (true) { 614 if (source.read(buffer, 1) == -1) { 615 throw new EOFException("\\n not found: " + buffer.readByteString().hex()); 616 } 617 if (buffer.getByte(buffer.size() - 1) == '\n') { 618 return buffer.readUtf8LineStrict(); 619 } 620 } 621 } 622 623 @Override toString()624 public String toString() { 625 return MoreObjects.toStringHelper(this) 626 .add("logId", logId.getId()) 627 .add("address", address) 628 .toString(); 629 } 630 631 @Override getLogId()632 public InternalLogId getLogId() { 633 return logId; 634 } 635 636 /** 637 * Gets the overridden authority hostname. If the authority is overridden to be an invalid 638 * authority, uri.getHost() will (rightly) return null, since the authority is no longer 639 * an actual service. This method overrides the behavior for practical reasons. For example, 640 * if an authority is in the form "invalid_authority" (note the "_"), rather than return null, 641 * we return the input. This is because the return value, in conjunction with getOverridenPort, 642 * are used by the SSL library to reconstruct the actual authority. It /already/ has a 643 * connection to the port, independent of this function. 644 * 645 * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do 646 * the wrong thing. An example wrong behavior would be "invalid_host:443". Registry based 647 * authorities do not have ports, so this is even more wrong than before. Sorry. 648 */ 649 @VisibleForTesting getOverridenHost()650 String getOverridenHost() { 651 URI uri = GrpcUtil.authorityToUri(defaultAuthority); 652 if (uri.getHost() != null) { 653 return uri.getHost(); 654 } 655 656 return defaultAuthority; 657 } 658 659 @VisibleForTesting getOverridenPort()660 int getOverridenPort() { 661 URI uri = GrpcUtil.authorityToUri(defaultAuthority); 662 if (uri.getPort() != -1) { 663 return uri.getPort(); 664 } 665 666 return address.getPort(); 667 } 668 669 @Override shutdown(Status reason)670 public void shutdown(Status reason) { 671 synchronized (lock) { 672 if (goAwayStatus != null) { 673 return; 674 } 675 676 goAwayStatus = reason; 677 listener.transportShutdown(goAwayStatus); 678 stopIfNecessary(); 679 } 680 } 681 682 @Override shutdownNow(Status reason)683 public void shutdownNow(Status reason) { 684 shutdown(reason); 685 synchronized (lock) { 686 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator(); 687 while (it.hasNext()) { 688 Map.Entry<Integer, OkHttpClientStream> entry = it.next(); 689 it.remove(); 690 entry.getValue().transportState().transportReportStatus(reason, false, new Metadata()); 691 } 692 693 for (OkHttpClientStream stream : pendingStreams) { 694 stream.transportState().transportReportStatus(reason, true, new Metadata()); 695 } 696 pendingStreams.clear(); 697 maybeClearInUse(); 698 699 stopIfNecessary(); 700 } 701 } 702 703 @Override getAttributes()704 public Attributes getAttributes() { 705 return attributes; 706 } 707 708 /** 709 * Gets all active streams as an array. 710 */ getActiveStreams()711 OkHttpClientStream[] getActiveStreams() { 712 synchronized (lock) { 713 return streams.values().toArray(EMPTY_STREAM_ARRAY); 714 } 715 } 716 717 @VisibleForTesting getHandler()718 ClientFrameHandler getHandler() { 719 return clientFrameHandler; 720 } 721 722 @VisibleForTesting getPendingStreamSize()723 int getPendingStreamSize() { 724 synchronized (lock) { 725 return pendingStreams.size(); 726 } 727 } 728 729 /** 730 * Finish all active streams due to an IOException, then close the transport. 731 */ 732 @Override onException(Throwable failureCause)733 public void onException(Throwable failureCause) { 734 Preconditions.checkNotNull(failureCause, "failureCause"); 735 Status status = Status.UNAVAILABLE.withCause(failureCause); 736 startGoAway(0, ErrorCode.INTERNAL_ERROR, status); 737 } 738 739 /** 740 * Send GOAWAY to the server, then finish all active streams and close the transport. 741 */ onError(ErrorCode errorCode, String moreDetail)742 private void onError(ErrorCode errorCode, String moreDetail) { 743 startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail)); 744 } 745 startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status)746 private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) { 747 synchronized (lock) { 748 if (goAwayStatus == null) { 749 goAwayStatus = status; 750 listener.transportShutdown(status); 751 } 752 if (errorCode != null && !goAwaySent) { 753 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated 754 // streams. The GOAWAY is part of graceful shutdown. 755 goAwaySent = true; 756 frameWriter.goAway(0, errorCode, new byte[0]); 757 } 758 759 Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator(); 760 while (it.hasNext()) { 761 Map.Entry<Integer, OkHttpClientStream> entry = it.next(); 762 if (entry.getKey() > lastKnownStreamId) { 763 it.remove(); 764 entry.getValue().transportState().transportReportStatus( 765 status, RpcProgress.REFUSED, false, new Metadata()); 766 } 767 } 768 769 for (OkHttpClientStream stream : pendingStreams) { 770 stream.transportState().transportReportStatus( 771 status, RpcProgress.REFUSED, true, new Metadata()); 772 } 773 pendingStreams.clear(); 774 maybeClearInUse(); 775 776 stopIfNecessary(); 777 } 778 } 779 780 /** 781 * Called when a stream is closed, we do things like: 782 * <ul> 783 * <li>Removing the stream from the map. 784 * <li>Optionally reporting the status. 785 * <li>Starting pending streams if we can. 786 * <li>Stopping the transport if this is the last live stream under a go-away status. 787 * </ul> 788 * 789 * @param streamId the Id of the stream. 790 * @param status the final status of this stream, null means no need to report. 791 * @param stopDelivery interrupt queued messages in the deframer 792 * @param errorCode reset the stream with this ErrorCode if not null. 793 * @param trailers the trailers received if not null 794 */ finishStream( int streamId, @Nullable Status status, RpcProgress rpcProgress, boolean stopDelivery, @Nullable ErrorCode errorCode, @Nullable Metadata trailers)795 void finishStream( 796 int streamId, 797 @Nullable Status status, 798 RpcProgress rpcProgress, 799 boolean stopDelivery, 800 @Nullable ErrorCode errorCode, 801 @Nullable Metadata trailers) { 802 synchronized (lock) { 803 OkHttpClientStream stream = streams.remove(streamId); 804 if (stream != null) { 805 if (errorCode != null) { 806 frameWriter.rstStream(streamId, ErrorCode.CANCEL); 807 } 808 if (status != null) { 809 stream 810 .transportState() 811 .transportReportStatus( 812 status, 813 rpcProgress, 814 stopDelivery, 815 trailers != null ? trailers : new Metadata()); 816 } 817 if (!startPendingStreams()) { 818 stopIfNecessary(); 819 maybeClearInUse(); 820 } 821 } 822 } 823 } 824 825 /** 826 * When the transport is in goAway state, we should stop it once all active streams finish. 827 */ 828 @GuardedBy("lock") stopIfNecessary()829 private void stopIfNecessary() { 830 if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) { 831 return; 832 } 833 if (stopped) { 834 return; 835 } 836 stopped = true; 837 838 if (keepAliveManager != null) { 839 keepAliveManager.onTransportTermination(); 840 // KeepAliveManager should stop using the scheduler after onTransportTermination gets called. 841 scheduler = SharedResourceHolder.release(TIMER_SERVICE, scheduler); 842 } 843 844 if (ping != null) { 845 ping.failed(getPingFailure()); 846 ping = null; 847 } 848 849 if (!goAwaySent) { 850 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated 851 // streams. The GOAWAY is part of graceful shutdown. 852 goAwaySent = true; 853 frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); 854 } 855 856 // We will close the underlying socket in the writing thread to break out the reader 857 // thread, which will close the frameReader and notify the listener. 858 frameWriter.close(); 859 } 860 861 @GuardedBy("lock") maybeClearInUse()862 private void maybeClearInUse() { 863 if (inUse) { 864 if (pendingStreams.isEmpty() && streams.isEmpty()) { 865 inUse = false; 866 listener.transportInUse(false); 867 if (keepAliveManager != null) { 868 // We don't have any active streams. No need to do keepalives any more. 869 // Again, we have to call this inside the lock to avoid the race between onTransportIdle 870 // and onTransportActive. 871 keepAliveManager.onTransportIdle(); 872 } 873 } 874 } 875 } 876 877 @GuardedBy("lock") setInUse()878 private void setInUse() { 879 if (!inUse) { 880 inUse = true; 881 listener.transportInUse(true); 882 if (keepAliveManager != null) { 883 // We have a new stream. We might need to do keepalives now. 884 // Note that we have to do this inside the lock to avoid calling 885 // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong 886 // order. 887 keepAliveManager.onTransportActive(); 888 } 889 } 890 } 891 getPingFailure()892 private Throwable getPingFailure() { 893 synchronized (lock) { 894 if (goAwayStatus != null) { 895 return goAwayStatus.asException(); 896 } else { 897 return Status.UNAVAILABLE.withDescription("Connection closed").asException(); 898 } 899 } 900 } 901 mayHaveCreatedStream(int streamId)902 boolean mayHaveCreatedStream(int streamId) { 903 synchronized (lock) { 904 return streamId < nextStreamId && (streamId & 1) == 1; 905 } 906 } 907 getStream(int streamId)908 OkHttpClientStream getStream(int streamId) { 909 synchronized (lock) { 910 return streams.get(streamId); 911 } 912 } 913 914 /** 915 * Returns a Grpc status corresponding to the given ErrorCode. 916 */ 917 @VisibleForTesting toGrpcStatus(ErrorCode code)918 static Status toGrpcStatus(ErrorCode code) { 919 Status status = ERROR_CODE_TO_STATUS.get(code); 920 return status != null ? status : Status.UNKNOWN.withDescription( 921 "Unknown http2 error code: " + code.httpCode); 922 } 923 924 @Override getStats()925 public ListenableFuture<SocketStats> getStats() { 926 SettableFuture<SocketStats> ret = SettableFuture.create(); 927 synchronized (lock) { 928 if (socket == null) { 929 ret.set(new SocketStats( 930 transportTracer.getStats(), 931 /*local=*/ null, 932 /*remote=*/ null, 933 new InternalChannelz.SocketOptions.Builder().build(), 934 /*security=*/ null)); 935 } else { 936 ret.set(new SocketStats( 937 transportTracer.getStats(), 938 socket.getLocalSocketAddress(), 939 socket.getRemoteSocketAddress(), 940 Utils.getSocketOptions(socket), 941 securityInfo)); 942 } 943 return ret; 944 } 945 } 946 947 /** 948 * Runnable which reads frames and dispatches them to in flight calls. 949 */ 950 @VisibleForTesting 951 class ClientFrameHandler implements FrameReader.Handler, Runnable { 952 FrameReader frameReader; 953 boolean firstSettings = true; 954 ClientFrameHandler(FrameReader frameReader)955 ClientFrameHandler(FrameReader frameReader) { 956 this.frameReader = frameReader; 957 } 958 959 @Override run()960 public void run() { 961 String threadName = Thread.currentThread().getName(); 962 if (!GrpcUtil.IS_RESTRICTED_APPENGINE) { 963 Thread.currentThread().setName("OkHttpClientTransport"); 964 } 965 try { 966 // Read until the underlying socket closes. 967 while (frameReader.nextFrame(this)) { 968 if (keepAliveManager != null) { 969 keepAliveManager.onDataReceived(); 970 } 971 } 972 // frameReader.nextFrame() returns false when the underlying read encounters an IOException, 973 // it may be triggered by the socket closing, in such case, the startGoAway() will do 974 // nothing, otherwise, we finish all streams since it's a real IO issue. 975 startGoAway(0, ErrorCode.INTERNAL_ERROR, 976 Status.UNAVAILABLE.withDescription("End of stream or IOException")); 977 } catch (Throwable t) { 978 // TODO(madongfly): Send the exception message to the server. 979 startGoAway( 980 0, 981 ErrorCode.PROTOCOL_ERROR, 982 Status.UNAVAILABLE.withDescription("error in frame handler").withCause(t)); 983 } finally { 984 try { 985 frameReader.close(); 986 } catch (IOException ex) { 987 log.log(Level.INFO, "Exception closing frame reader", ex); 988 } 989 listener.transportTerminated(); 990 if (!GrpcUtil.IS_RESTRICTED_APPENGINE) { 991 // Restore the original thread name. 992 Thread.currentThread().setName(threadName); 993 } 994 } 995 } 996 997 /** 998 * Handle a HTTP2 DATA frame. 999 */ 1000 @Override data(boolean inFinished, int streamId, BufferedSource in, int length)1001 public void data(boolean inFinished, int streamId, BufferedSource in, int length) 1002 throws IOException { 1003 OkHttpClientStream stream = getStream(streamId); 1004 if (stream == null) { 1005 if (mayHaveCreatedStream(streamId)) { 1006 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM); 1007 in.skip(length); 1008 } else { 1009 onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId); 1010 return; 1011 } 1012 } else { 1013 // Wait until the frame is complete. 1014 in.require(length); 1015 1016 Buffer buf = new Buffer(); 1017 buf.write(in.buffer(), length); 1018 synchronized (lock) { 1019 stream.transportState().transportDataReceived(buf, inFinished); 1020 } 1021 } 1022 1023 // connection window update 1024 connectionUnacknowledgedBytesRead += length; 1025 if (connectionUnacknowledgedBytesRead >= Utils.DEFAULT_WINDOW_SIZE / 2) { 1026 frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead); 1027 connectionUnacknowledgedBytesRead = 0; 1028 } 1029 } 1030 1031 /** 1032 * Handle HTTP2 HEADER and CONTINUATION frames. 1033 */ 1034 @Override headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)1035 public void headers(boolean outFinished, 1036 boolean inFinished, 1037 int streamId, 1038 int associatedStreamId, 1039 List<Header> headerBlock, 1040 HeadersMode headersMode) { 1041 boolean unknownStream = false; 1042 synchronized (lock) { 1043 OkHttpClientStream stream = streams.get(streamId); 1044 if (stream == null) { 1045 if (mayHaveCreatedStream(streamId)) { 1046 frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM); 1047 } else { 1048 unknownStream = true; 1049 } 1050 } else { 1051 stream.transportState().transportHeadersReceived(headerBlock, inFinished); 1052 } 1053 } 1054 if (unknownStream) { 1055 // We don't expect any server-initiated streams. 1056 onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId); 1057 } 1058 } 1059 1060 @Override rstStream(int streamId, ErrorCode errorCode)1061 public void rstStream(int streamId, ErrorCode errorCode) { 1062 Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream"); 1063 boolean stopDelivery = 1064 (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED); 1065 finishStream( 1066 streamId, status, 1067 errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED, 1068 stopDelivery, null, null); 1069 } 1070 1071 @Override settings(boolean clearPrevious, Settings settings)1072 public void settings(boolean clearPrevious, Settings settings) { 1073 boolean outboundWindowSizeIncreased = false; 1074 synchronized (lock) { 1075 if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) { 1076 int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get( 1077 settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS); 1078 maxConcurrentStreams = receivedMaxConcurrentStreams; 1079 } 1080 1081 if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) { 1082 int initialWindowSize = OkHttpSettingsUtil.get( 1083 settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE); 1084 outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize); 1085 } 1086 if (firstSettings) { 1087 listener.transportReady(); 1088 firstSettings = false; 1089 } 1090 1091 // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any 1092 // writes due to update in settings must be sent after SETTINGS acknowledgment frame, 1093 // otherwise it will cause a stream error (RST_STREAM). 1094 frameWriter.ackSettings(settings); 1095 1096 // send any pending bytes / streams 1097 if (outboundWindowSizeIncreased) { 1098 outboundFlow.writeStreams(); 1099 } 1100 startPendingStreams(); 1101 } 1102 } 1103 1104 @Override ping(boolean ack, int payload1, int payload2)1105 public void ping(boolean ack, int payload1, int payload2) { 1106 if (!ack) { 1107 frameWriter.ping(true, payload1, payload2); 1108 } else { 1109 Http2Ping p = null; 1110 long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL); 1111 synchronized (lock) { 1112 if (ping != null) { 1113 if (ping.payload() == ackPayload) { 1114 p = ping; 1115 ping = null; 1116 } else { 1117 log.log(Level.WARNING, String.format("Received unexpected ping ack. " 1118 + "Expecting %d, got %d", ping.payload(), ackPayload)); 1119 } 1120 } else { 1121 log.warning("Received unexpected ping ack. No ping outstanding"); 1122 } 1123 } 1124 // don't complete it while holding lock since callbacks could run immediately 1125 if (p != null) { 1126 p.complete(); 1127 } 1128 } 1129 } 1130 1131 @Override ackSettings()1132 public void ackSettings() { 1133 // Do nothing currently. 1134 } 1135 1136 @Override goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)1137 public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { 1138 if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) { 1139 String data = debugData.utf8(); 1140 log.log(Level.WARNING, String.format( 1141 "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data)); 1142 if ("too_many_pings".equals(data)) { 1143 tooManyPingsRunnable.run(); 1144 } 1145 } 1146 Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode) 1147 .augmentDescription("Received Goaway"); 1148 if (debugData.size() > 0) { 1149 // If a debug message was provided, use it. 1150 status = status.augmentDescription(debugData.utf8()); 1151 } 1152 startGoAway(lastGoodStreamId, null, status); 1153 } 1154 1155 @Override pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)1156 public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) 1157 throws IOException { 1158 // We don't accept server initiated stream. 1159 frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR); 1160 } 1161 1162 @Override windowUpdate(int streamId, long delta)1163 public void windowUpdate(int streamId, long delta) { 1164 if (delta == 0) { 1165 String errorMsg = "Received 0 flow control window increment."; 1166 if (streamId == 0) { 1167 onError(ErrorCode.PROTOCOL_ERROR, errorMsg); 1168 } else { 1169 finishStream( 1170 streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false, 1171 ErrorCode.PROTOCOL_ERROR, null); 1172 } 1173 return; 1174 } 1175 1176 boolean unknownStream = false; 1177 synchronized (lock) { 1178 if (streamId == Utils.CONNECTION_STREAM_ID) { 1179 outboundFlow.windowUpdate(null, (int) delta); 1180 return; 1181 } 1182 1183 OkHttpClientStream stream = streams.get(streamId); 1184 if (stream != null) { 1185 outboundFlow.windowUpdate(stream, (int) delta); 1186 } else if (!mayHaveCreatedStream(streamId)) { 1187 unknownStream = true; 1188 } 1189 } 1190 if (unknownStream) { 1191 onError(ErrorCode.PROTOCOL_ERROR, 1192 "Received window_update for unknown stream: " + streamId); 1193 } 1194 } 1195 1196 @Override priority(int streamId, int streamDependency, int weight, boolean exclusive)1197 public void priority(int streamId, int streamDependency, int weight, boolean exclusive) { 1198 // Ignore priority change. 1199 // TODO(madongfly): log 1200 } 1201 1202 @Override alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)1203 public void alternateService(int streamId, String origin, ByteString protocol, String host, 1204 int port, long maxAge) { 1205 // TODO(madongfly): Deal with alternateService propagation 1206 } 1207 } 1208 } 1209