1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static io.grpc.ConnectivityState.CONNECTING; 20 import static io.grpc.ConnectivityState.IDLE; 21 import static io.grpc.ConnectivityState.READY; 22 import static io.grpc.ConnectivityState.SHUTDOWN; 23 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.base.MoreObjects; 27 import com.google.common.base.Preconditions; 28 import com.google.common.base.Stopwatch; 29 import com.google.common.base.Supplier; 30 import com.google.common.util.concurrent.ListenableFuture; 31 import com.google.common.util.concurrent.SettableFuture; 32 import com.google.errorprone.annotations.ForOverride; 33 import io.grpc.Attributes; 34 import io.grpc.CallOptions; 35 import io.grpc.ChannelLogger; 36 import io.grpc.ChannelLogger.ChannelLogLevel; 37 import io.grpc.ClientStreamTracer; 38 import io.grpc.ConnectivityState; 39 import io.grpc.ConnectivityStateInfo; 40 import io.grpc.EquivalentAddressGroup; 41 import io.grpc.HttpConnectProxiedSocketAddress; 42 import io.grpc.InternalChannelz; 43 import io.grpc.InternalChannelz.ChannelStats; 44 import io.grpc.InternalInstrumented; 45 import io.grpc.InternalLogId; 46 import io.grpc.InternalWithLogId; 47 import io.grpc.Metadata; 48 import io.grpc.MethodDescriptor; 49 import io.grpc.Status; 50 import io.grpc.SynchronizationContext; 51 import io.grpc.SynchronizationContext.ScheduledHandle; 52 import java.net.SocketAddress; 53 import java.util.ArrayList; 54 import java.util.Collection; 55 import java.util.Collections; 56 import java.util.List; 57 import java.util.concurrent.ScheduledExecutorService; 58 import java.util.concurrent.TimeUnit; 59 import javax.annotation.Nullable; 60 import javax.annotation.concurrent.ThreadSafe; 61 62 /** 63 * Transports for a single {@link SocketAddress}. 64 */ 65 @ThreadSafe 66 final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider { 67 68 private final InternalLogId logId; 69 private final String authority; 70 private final String userAgent; 71 private final BackoffPolicy.Provider backoffPolicyProvider; 72 private final Callback callback; 73 private final ClientTransportFactory transportFactory; 74 private final ScheduledExecutorService scheduledExecutor; 75 private final InternalChannelz channelz; 76 private final CallTracer callsTracer; 77 private final ChannelTracer channelTracer; 78 private final ChannelLogger channelLogger; 79 80 /** 81 * All field must be mutated in the syncContext. 82 */ 83 private final SynchronizationContext syncContext; 84 85 /** 86 * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if 87 * both are null. 88 * 89 * <p>Note: any {@link Index#updateAddresses(List)} should also update {@link #addressGroups}. 90 */ 91 private final Index addressIndex; 92 93 /** 94 * A volatile accessor to {@link Index#getAddressGroups()}. There are few methods ({@link 95 * #getAddressGroups()} and {@link #toString()} access this value where they supposed to access 96 * in the {@link #syncContext}. Ideally {@link Index#getAddressGroups()} can be volatile, so we 97 * don't need to maintain this volatile accessor. Although, having this accessor can reduce 98 * unnecessary volatile reads while it delivers clearer intention of why . 99 */ 100 private volatile List<EquivalentAddressGroup> addressGroups; 101 102 /** 103 * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is 104 * scheduled. 105 */ 106 private BackoffPolicy reconnectPolicy; 107 108 /** 109 * Timer monitoring duration since entering CONNECTING state. 110 */ 111 private final Stopwatch connectingTimer; 112 113 @Nullable 114 private ScheduledHandle reconnectTask; 115 @Nullable 116 private ScheduledHandle shutdownDueToUpdateTask; 117 @Nullable 118 private ManagedClientTransport shutdownDueToUpdateTransport; 119 120 /** 121 * All transports that are not terminated. At the very least the value of {@link #activeTransport} 122 * will be present, but previously used transports that still have streams or are stopping may 123 * also be present. 124 */ 125 private final Collection<ConnectionClientTransport> transports = new ArrayList<>(); 126 127 // Must only be used from syncContext 128 private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator = 129 new InUseStateAggregator<ConnectionClientTransport>() { 130 @Override 131 protected void handleInUse() { 132 callback.onInUse(InternalSubchannel.this); 133 } 134 135 @Override 136 protected void handleNotInUse() { 137 callback.onNotInUse(InternalSubchannel.this); 138 } 139 }; 140 141 /** 142 * The to-be active transport, which is not ready yet. 143 */ 144 @Nullable 145 private ConnectionClientTransport pendingTransport; 146 147 /** 148 * The transport for new outgoing requests. Non-null only in READY state. 149 */ 150 @Nullable 151 private volatile ManagedClientTransport activeTransport; 152 153 private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); 154 155 private Status shutdownReason; 156 InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback, InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, InternalLogId logId, ChannelLogger channelLogger)157 InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent, 158 BackoffPolicy.Provider backoffPolicyProvider, 159 ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, 160 Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback, 161 InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, 162 InternalLogId logId, ChannelLogger channelLogger) { 163 Preconditions.checkNotNull(addressGroups, "addressGroups"); 164 Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); 165 checkListHasNoNulls(addressGroups, "addressGroups contains null entry"); 166 List<EquivalentAddressGroup> unmodifiableAddressGroups = 167 Collections.unmodifiableList(new ArrayList<>(addressGroups)); 168 this.addressGroups = unmodifiableAddressGroups; 169 this.addressIndex = new Index(unmodifiableAddressGroups); 170 this.authority = authority; 171 this.userAgent = userAgent; 172 this.backoffPolicyProvider = backoffPolicyProvider; 173 this.transportFactory = transportFactory; 174 this.scheduledExecutor = scheduledExecutor; 175 this.connectingTimer = stopwatchSupplier.get(); 176 this.syncContext = syncContext; 177 this.callback = callback; 178 this.channelz = channelz; 179 this.callsTracer = callsTracer; 180 this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer"); 181 this.logId = Preconditions.checkNotNull(logId, "logId"); 182 this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); 183 } 184 getChannelLogger()185 ChannelLogger getChannelLogger() { 186 return channelLogger; 187 } 188 189 @Override obtainActiveTransport()190 public ClientTransport obtainActiveTransport() { 191 ClientTransport savedTransport = activeTransport; 192 if (savedTransport != null) { 193 return savedTransport; 194 } 195 syncContext.execute(new Runnable() { 196 @Override 197 public void run() { 198 if (state.getState() == IDLE) { 199 channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested"); 200 gotoNonErrorState(CONNECTING); 201 startNewTransport(); 202 } 203 } 204 }); 205 return null; 206 } 207 208 /** 209 * Returns a READY transport if there is any, without trying to connect. 210 */ 211 @Nullable getTransport()212 ClientTransport getTransport() { 213 return activeTransport; 214 } 215 216 /** 217 * Returns the authority string associated with this Subchannel. 218 */ getAuthority()219 String getAuthority() { 220 return authority; 221 } 222 startNewTransport()223 private void startNewTransport() { 224 syncContext.throwIfNotInThisSynchronizationContext(); 225 226 Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); 227 228 if (addressIndex.isAtBeginning()) { 229 connectingTimer.reset().start(); 230 } 231 SocketAddress address = addressIndex.getCurrentAddress(); 232 233 HttpConnectProxiedSocketAddress proxiedAddr = null; 234 if (address instanceof HttpConnectProxiedSocketAddress) { 235 proxiedAddr = (HttpConnectProxiedSocketAddress) address; 236 address = proxiedAddr.getTargetAddress(); 237 } 238 239 Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes(); 240 String eagChannelAuthority = currentEagAttributes 241 .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE); 242 ClientTransportFactory.ClientTransportOptions options = 243 new ClientTransportFactory.ClientTransportOptions() 244 .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority) 245 .setEagAttributes(currentEagAttributes) 246 .setUserAgent(userAgent) 247 .setHttpConnectProxiedSocketAddress(proxiedAddr); 248 TransportLogger transportLogger = new TransportLogger(); 249 // In case the transport logs in the constructor, use the subchannel logId 250 transportLogger.logId = getLogId(); 251 ConnectionClientTransport transport = 252 new CallTracingTransport( 253 transportFactory 254 .newClientTransport(address, options, transportLogger), callsTracer); 255 transportLogger.logId = transport.getLogId(); 256 channelz.addClientSocket(transport); 257 pendingTransport = transport; 258 transports.add(transport); 259 Runnable runnable = transport.start(new TransportListener(transport)); 260 if (runnable != null) { 261 syncContext.executeLater(runnable); 262 } 263 channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId); 264 } 265 266 /** 267 * Only called after all addresses attempted and failed (TRANSIENT_FAILURE). 268 * @param status the causal status when the channel begins transition to 269 * TRANSIENT_FAILURE. 270 */ scheduleBackoff(final Status status)271 private void scheduleBackoff(final Status status) { 272 syncContext.throwIfNotInThisSynchronizationContext(); 273 274 class EndOfCurrentBackoff implements Runnable { 275 @Override 276 public void run() { 277 reconnectTask = null; 278 channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff"); 279 gotoNonErrorState(CONNECTING); 280 startNewTransport(); 281 } 282 } 283 284 gotoState(ConnectivityStateInfo.forTransientFailure(status)); 285 if (reconnectPolicy == null) { 286 reconnectPolicy = backoffPolicyProvider.get(); 287 } 288 long delayNanos = 289 reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS); 290 channelLogger.log( 291 ChannelLogLevel.INFO, 292 "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns", 293 printShortStatus(status), delayNanos); 294 Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); 295 reconnectTask = syncContext.schedule( 296 new EndOfCurrentBackoff(), 297 delayNanos, 298 TimeUnit.NANOSECONDS, 299 scheduledExecutor); 300 } 301 302 /** 303 * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this 304 * method has no effect. 305 */ resetConnectBackoff()306 void resetConnectBackoff() { 307 syncContext.execute(new Runnable() { 308 @Override 309 public void run() { 310 if (state.getState() != TRANSIENT_FAILURE) { 311 return; 312 } 313 cancelReconnectTask(); 314 channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted"); 315 gotoNonErrorState(CONNECTING); 316 startNewTransport(); 317 } 318 }); 319 } 320 gotoNonErrorState(final ConnectivityState newState)321 private void gotoNonErrorState(final ConnectivityState newState) { 322 syncContext.throwIfNotInThisSynchronizationContext(); 323 324 gotoState(ConnectivityStateInfo.forNonError(newState)); 325 } 326 gotoState(final ConnectivityStateInfo newState)327 private void gotoState(final ConnectivityStateInfo newState) { 328 syncContext.throwIfNotInThisSynchronizationContext(); 329 330 if (state.getState() != newState.getState()) { 331 Preconditions.checkState(state.getState() != SHUTDOWN, 332 "Cannot transition out of SHUTDOWN to " + newState); 333 state = newState; 334 callback.onStateChange(InternalSubchannel.this, newState); 335 } 336 } 337 338 /** Replaces the existing addresses, avoiding unnecessary reconnects. */ updateAddresses(final List<EquivalentAddressGroup> newAddressGroups)339 public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) { 340 Preconditions.checkNotNull(newAddressGroups, "newAddressGroups"); 341 checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry"); 342 Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty"); 343 final List<EquivalentAddressGroup> newImmutableAddressGroups = 344 Collections.unmodifiableList(new ArrayList<>(newAddressGroups)); 345 346 syncContext.execute(new Runnable() { 347 @Override 348 public void run() { 349 ManagedClientTransport savedTransport = null; 350 SocketAddress previousAddress = addressIndex.getCurrentAddress(); 351 addressIndex.updateGroups(newImmutableAddressGroups); 352 addressGroups = newImmutableAddressGroups; 353 if (state.getState() == READY || state.getState() == CONNECTING) { 354 if (!addressIndex.seekTo(previousAddress)) { 355 // Forced to drop the connection 356 if (state.getState() == READY) { 357 savedTransport = activeTransport; 358 activeTransport = null; 359 addressIndex.reset(); 360 gotoNonErrorState(IDLE); 361 } else { 362 pendingTransport.shutdown( 363 Status.UNAVAILABLE.withDescription( 364 "InternalSubchannel closed pending transport due to address change")); 365 pendingTransport = null; 366 addressIndex.reset(); 367 startNewTransport(); 368 } 369 } 370 } 371 if (savedTransport != null) { 372 if (shutdownDueToUpdateTask != null) { 373 // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be 374 // necessary. This transport has probably already had plenty of time. 375 shutdownDueToUpdateTransport.shutdown( 376 Status.UNAVAILABLE.withDescription( 377 "InternalSubchannel closed transport early due to address change")); 378 shutdownDueToUpdateTask.cancel(); 379 shutdownDueToUpdateTask = null; 380 shutdownDueToUpdateTransport = null; 381 } 382 // Avoid needless RPC failures by delaying the shutdown. See 383 // https://github.com/grpc/grpc-java/issues/2562 384 shutdownDueToUpdateTransport = savedTransport; 385 shutdownDueToUpdateTask = syncContext.schedule( 386 new Runnable() { 387 @Override public void run() { 388 ManagedClientTransport transport = shutdownDueToUpdateTransport; 389 shutdownDueToUpdateTask = null; 390 shutdownDueToUpdateTransport = null; 391 transport.shutdown( 392 Status.UNAVAILABLE.withDescription( 393 "InternalSubchannel closed transport due to address change")); 394 } 395 }, 396 ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, 397 TimeUnit.SECONDS, 398 scheduledExecutor); 399 } 400 } 401 }); 402 } 403 shutdown(final Status reason)404 public void shutdown(final Status reason) { 405 syncContext.execute(new Runnable() { 406 @Override 407 public void run() { 408 ManagedClientTransport savedActiveTransport; 409 ConnectionClientTransport savedPendingTransport; 410 if (state.getState() == SHUTDOWN) { 411 return; 412 } 413 shutdownReason = reason; 414 savedActiveTransport = activeTransport; 415 savedPendingTransport = pendingTransport; 416 activeTransport = null; 417 pendingTransport = null; 418 gotoNonErrorState(SHUTDOWN); 419 addressIndex.reset(); 420 if (transports.isEmpty()) { 421 handleTermination(); 422 } // else: the callback will be run once all transports have been terminated 423 cancelReconnectTask(); 424 if (shutdownDueToUpdateTask != null) { 425 shutdownDueToUpdateTask.cancel(); 426 shutdownDueToUpdateTransport.shutdown(reason); 427 shutdownDueToUpdateTask = null; 428 shutdownDueToUpdateTransport = null; 429 } 430 if (savedActiveTransport != null) { 431 savedActiveTransport.shutdown(reason); 432 } 433 if (savedPendingTransport != null) { 434 savedPendingTransport.shutdown(reason); 435 } 436 } 437 }); 438 } 439 440 @Override toString()441 public String toString() { 442 // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock 443 // since there may be many addresses. 444 return MoreObjects.toStringHelper(this) 445 .add("logId", logId.getId()) 446 .add("addressGroups", addressGroups) 447 .toString(); 448 } 449 handleTermination()450 private void handleTermination() { 451 syncContext.execute(new Runnable() { 452 @Override 453 public void run() { 454 channelLogger.log(ChannelLogLevel.INFO, "Terminated"); 455 callback.onTerminated(InternalSubchannel.this); 456 } 457 }); 458 } 459 handleTransportInUseState( final ConnectionClientTransport transport, final boolean inUse)460 private void handleTransportInUseState( 461 final ConnectionClientTransport transport, final boolean inUse) { 462 syncContext.execute(new Runnable() { 463 @Override 464 public void run() { 465 inUseStateAggregator.updateObjectInUse(transport, inUse); 466 } 467 }); 468 } 469 shutdownNow(final Status reason)470 void shutdownNow(final Status reason) { 471 shutdown(reason); 472 syncContext.execute(new Runnable() { 473 @Override 474 public void run() { 475 Collection<ManagedClientTransport> transportsCopy = 476 new ArrayList<ManagedClientTransport>(transports); 477 478 for (ManagedClientTransport transport : transportsCopy) { 479 transport.shutdownNow(reason); 480 } 481 } 482 }); 483 } 484 getAddressGroups()485 List<EquivalentAddressGroup> getAddressGroups() { 486 return addressGroups; 487 } 488 cancelReconnectTask()489 private void cancelReconnectTask() { 490 syncContext.throwIfNotInThisSynchronizationContext(); 491 492 if (reconnectTask != null) { 493 reconnectTask.cancel(); 494 reconnectTask = null; 495 reconnectPolicy = null; 496 } 497 } 498 499 @Override getLogId()500 public InternalLogId getLogId() { 501 return logId; 502 } 503 504 @Override getStats()505 public ListenableFuture<ChannelStats> getStats() { 506 final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create(); 507 syncContext.execute(new Runnable() { 508 @Override 509 public void run() { 510 ChannelStats.Builder builder = new ChannelStats.Builder(); 511 List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups(); 512 List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports); 513 builder.setTarget(addressGroupsSnapshot.toString()).setState(getState()); 514 builder.setSockets(transportsSnapshot); 515 callsTracer.updateBuilder(builder); 516 channelTracer.updateBuilder(builder); 517 channelStatsFuture.set(builder.build()); 518 } 519 }); 520 return channelStatsFuture; 521 } 522 getState()523 ConnectivityState getState() { 524 return state.getState(); 525 } 526 checkListHasNoNulls(List<?> list, String msg)527 private static void checkListHasNoNulls(List<?> list, String msg) { 528 for (Object item : list) { 529 Preconditions.checkNotNull(item, msg); 530 } 531 } 532 533 /** Listener for real transports. */ 534 private class TransportListener implements ManagedClientTransport.Listener { 535 final ConnectionClientTransport transport; 536 boolean shutdownInitiated = false; 537 TransportListener(ConnectionClientTransport transport)538 TransportListener(ConnectionClientTransport transport) { 539 this.transport = transport; 540 } 541 542 @Override transportReady()543 public void transportReady() { 544 channelLogger.log(ChannelLogLevel.INFO, "READY"); 545 syncContext.execute(new Runnable() { 546 @Override 547 public void run() { 548 reconnectPolicy = null; 549 if (shutdownReason != null) { 550 // activeTransport should have already been set to null by shutdown(). We keep it null. 551 Preconditions.checkState(activeTransport == null, 552 "Unexpected non-null activeTransport"); 553 transport.shutdown(shutdownReason); 554 } else if (pendingTransport == transport) { 555 activeTransport = transport; 556 pendingTransport = null; 557 gotoNonErrorState(READY); 558 } 559 } 560 }); 561 } 562 563 @Override transportInUse(boolean inUse)564 public void transportInUse(boolean inUse) { 565 handleTransportInUseState(transport, inUse); 566 } 567 568 @Override transportShutdown(final Status s)569 public void transportShutdown(final Status s) { 570 channelLogger.log( 571 ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); 572 shutdownInitiated = true; 573 syncContext.execute(new Runnable() { 574 @Override 575 public void run() { 576 if (state.getState() == SHUTDOWN) { 577 return; 578 } 579 if (activeTransport == transport) { 580 activeTransport = null; 581 addressIndex.reset(); 582 gotoNonErrorState(IDLE); 583 } else if (pendingTransport == transport) { 584 Preconditions.checkState(state.getState() == CONNECTING, 585 "Expected state is CONNECTING, actual state is %s", state.getState()); 586 addressIndex.increment(); 587 // Continue reconnect if there are still addresses to try. 588 if (!addressIndex.isValid()) { 589 pendingTransport = null; 590 addressIndex.reset(); 591 // Initiate backoff 592 // Transition to TRANSIENT_FAILURE 593 scheduleBackoff(s); 594 } else { 595 startNewTransport(); 596 } 597 } 598 } 599 }); 600 } 601 602 @Override transportTerminated()603 public void transportTerminated() { 604 Preconditions.checkState( 605 shutdownInitiated, "transportShutdown() must be called before transportTerminated()."); 606 607 channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId()); 608 channelz.removeClientSocket(transport); 609 handleTransportInUseState(transport, false); 610 syncContext.execute(new Runnable() { 611 @Override 612 public void run() { 613 transports.remove(transport); 614 if (state.getState() == SHUTDOWN && transports.isEmpty()) { 615 handleTermination(); 616 } 617 } 618 }); 619 } 620 } 621 622 // All methods are called in syncContext 623 abstract static class Callback { 624 /** 625 * Called when the subchannel is terminated, which means it's shut down and all transports 626 * have been terminated. 627 */ 628 @ForOverride onTerminated(InternalSubchannel is)629 void onTerminated(InternalSubchannel is) { } 630 631 /** 632 * Called when the subchannel's connectivity state has changed. 633 */ 634 @ForOverride onStateChange(InternalSubchannel is, ConnectivityStateInfo newState)635 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { } 636 637 /** 638 * Called when the subchannel's in-use state has changed to true, which means at least one 639 * transport is in use. 640 */ 641 @ForOverride onInUse(InternalSubchannel is)642 void onInUse(InternalSubchannel is) { } 643 644 /** 645 * Called when the subchannel's in-use state has changed to false, which means no transport is 646 * in use. 647 */ 648 @ForOverride onNotInUse(InternalSubchannel is)649 void onNotInUse(InternalSubchannel is) { } 650 } 651 652 @VisibleForTesting 653 static final class CallTracingTransport extends ForwardingConnectionClientTransport { 654 private final ConnectionClientTransport delegate; 655 private final CallTracer callTracer; 656 CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer)657 private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) { 658 this.delegate = delegate; 659 this.callTracer = callTracer; 660 } 661 662 @Override delegate()663 protected ConnectionClientTransport delegate() { 664 return delegate; 665 } 666 667 @Override newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, ClientStreamTracer[] tracers)668 public ClientStream newStream( 669 MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, 670 ClientStreamTracer[] tracers) { 671 final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers); 672 return new ForwardingClientStream() { 673 @Override 674 protected ClientStream delegate() { 675 return streamDelegate; 676 } 677 678 @Override 679 public void start(final ClientStreamListener listener) { 680 callTracer.reportCallStarted(); 681 super.start(new ForwardingClientStreamListener() { 682 @Override 683 protected ClientStreamListener delegate() { 684 return listener; 685 } 686 687 @Override 688 public void closed( 689 Status status, RpcProgress rpcProgress, Metadata trailers) { 690 callTracer.reportCallEnded(status.isOk()); 691 super.closed(status, rpcProgress, trailers); 692 } 693 }); 694 } 695 }; 696 } 697 } 698 699 /** Index as in 'i', the pointer to an entry. Not a "search index." */ 700 @VisibleForTesting 701 static final class Index { 702 private List<EquivalentAddressGroup> addressGroups; 703 private int groupIndex; 704 private int addressIndex; 705 706 public Index(List<EquivalentAddressGroup> groups) { 707 this.addressGroups = groups; 708 } 709 710 public boolean isValid() { 711 // addressIndex will never be invalid 712 return groupIndex < addressGroups.size(); 713 } 714 715 public boolean isAtBeginning() { 716 return groupIndex == 0 && addressIndex == 0; 717 } 718 719 public void increment() { 720 EquivalentAddressGroup group = addressGroups.get(groupIndex); 721 addressIndex++; 722 if (addressIndex >= group.getAddresses().size()) { 723 groupIndex++; 724 addressIndex = 0; 725 } 726 } 727 728 public void reset() { 729 groupIndex = 0; 730 addressIndex = 0; 731 } 732 733 public SocketAddress getCurrentAddress() { 734 return addressGroups.get(groupIndex).getAddresses().get(addressIndex); 735 } 736 737 public Attributes getCurrentEagAttributes() { 738 return addressGroups.get(groupIndex).getAttributes(); 739 } 740 741 public List<EquivalentAddressGroup> getGroups() { 742 return addressGroups; 743 } 744 745 /** Update to new groups, resetting the current index. */ 746 public void updateGroups(List<EquivalentAddressGroup> newGroups) { 747 addressGroups = newGroups; 748 reset(); 749 } 750 751 /** Returns false if the needle was not found and the current index was left unchanged. */ 752 public boolean seekTo(SocketAddress needle) { 753 for (int i = 0; i < addressGroups.size(); i++) { 754 EquivalentAddressGroup group = addressGroups.get(i); 755 int j = group.getAddresses().indexOf(needle); 756 if (j == -1) { 757 continue; 758 } 759 this.groupIndex = i; 760 this.addressIndex = j; 761 return true; 762 } 763 return false; 764 } 765 } 766 767 private String printShortStatus(Status status) { 768 StringBuilder buffer = new StringBuilder(); 769 buffer.append(status.getCode()); 770 if (status.getDescription() != null) { 771 buffer.append("(").append(status.getDescription()).append(")"); 772 } 773 if (status.getCause() != null) { 774 buffer.append("[").append(status.getCause()).append("]"); 775 } 776 return buffer.toString(); 777 } 778 779 @VisibleForTesting 780 static final class TransportLogger extends ChannelLogger { 781 // Changed just after construction to break a cyclic dependency. 782 InternalLogId logId; 783 784 @Override 785 public void log(ChannelLogLevel level, String message) { 786 ChannelLoggerImpl.logOnly(logId, level, message); 787 } 788 789 @Override 790 public void log(ChannelLogLevel level, String messageFormat, Object... args) { 791 ChannelLoggerImpl.logOnly(logId, level, messageFormat, args); 792 } 793 } 794 } 795