1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; 23 import static io.grpc.ConnectivityState.IDLE; 24 import static io.grpc.ConnectivityState.SHUTDOWN; 25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 26 import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Stopwatch; 31 import com.google.common.base.Supplier; 32 import com.google.common.util.concurrent.ListenableFuture; 33 import com.google.common.util.concurrent.SettableFuture; 34 import io.grpc.Attributes; 35 import io.grpc.CallCredentials; 36 import io.grpc.CallOptions; 37 import io.grpc.Channel; 38 import io.grpc.ChannelCredentials; 39 import io.grpc.ChannelLogger; 40 import io.grpc.ChannelLogger.ChannelLogLevel; 41 import io.grpc.ClientCall; 42 import io.grpc.ClientInterceptor; 43 import io.grpc.ClientInterceptors; 44 import io.grpc.ClientStreamTracer; 45 import io.grpc.CompressorRegistry; 46 import io.grpc.ConnectivityState; 47 import io.grpc.ConnectivityStateInfo; 48 import io.grpc.Context; 49 import io.grpc.DecompressorRegistry; 50 import io.grpc.EquivalentAddressGroup; 51 import io.grpc.ForwardingChannelBuilder; 52 import io.grpc.ForwardingClientCall; 53 import io.grpc.Grpc; 54 import io.grpc.InternalChannelz; 55 import io.grpc.InternalChannelz.ChannelStats; 56 import io.grpc.InternalChannelz.ChannelTrace; 57 import io.grpc.InternalConfigSelector; 58 import io.grpc.InternalInstrumented; 59 import io.grpc.InternalLogId; 60 import io.grpc.InternalWithLogId; 61 import io.grpc.LoadBalancer; 62 import io.grpc.LoadBalancer.CreateSubchannelArgs; 63 import io.grpc.LoadBalancer.PickResult; 64 import io.grpc.LoadBalancer.PickSubchannelArgs; 65 import io.grpc.LoadBalancer.ResolvedAddresses; 66 import io.grpc.LoadBalancer.SubchannelPicker; 67 import io.grpc.LoadBalancer.SubchannelStateListener; 68 import io.grpc.ManagedChannel; 69 import io.grpc.ManagedChannelBuilder; 70 import io.grpc.Metadata; 71 import io.grpc.MethodDescriptor; 72 import io.grpc.NameResolver; 73 import io.grpc.NameResolver.ConfigOrError; 74 import io.grpc.NameResolver.ResolutionResult; 75 import io.grpc.NameResolverRegistry; 76 import io.grpc.ProxyDetector; 77 import io.grpc.Status; 78 import io.grpc.SynchronizationContext; 79 import io.grpc.SynchronizationContext.ScheduledHandle; 80 import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; 81 import io.grpc.internal.ClientCallImpl.ClientStreamProvider; 82 import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult; 83 import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder; 84 import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider; 85 import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo; 86 import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector; 87 import io.grpc.internal.RetriableStream.ChannelBufferMeter; 88 import io.grpc.internal.RetriableStream.Throttle; 89 import io.grpc.internal.RetryingNameResolver.ResolutionResultListener; 90 import java.net.URI; 91 import java.net.URISyntaxException; 92 import java.util.ArrayList; 93 import java.util.Collection; 94 import java.util.Collections; 95 import java.util.HashSet; 96 import java.util.LinkedHashSet; 97 import java.util.List; 98 import java.util.Map; 99 import java.util.Set; 100 import java.util.concurrent.Callable; 101 import java.util.concurrent.CountDownLatch; 102 import java.util.concurrent.ExecutionException; 103 import java.util.concurrent.Executor; 104 import java.util.concurrent.Future; 105 import java.util.concurrent.ScheduledExecutorService; 106 import java.util.concurrent.ScheduledFuture; 107 import java.util.concurrent.TimeUnit; 108 import java.util.concurrent.TimeoutException; 109 import java.util.concurrent.atomic.AtomicBoolean; 110 import java.util.concurrent.atomic.AtomicReference; 111 import java.util.logging.Level; 112 import java.util.logging.Logger; 113 import java.util.regex.Pattern; 114 import javax.annotation.Nullable; 115 import javax.annotation.concurrent.GuardedBy; 116 import javax.annotation.concurrent.ThreadSafe; 117 118 /** A communication channel for making outgoing RPCs. */ 119 @ThreadSafe 120 final class ManagedChannelImpl extends ManagedChannel implements 121 InternalInstrumented<ChannelStats> { 122 @VisibleForTesting 123 static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName()); 124 125 // Matching this pattern means the target string is a URI target or at least intended to be one. 126 // A URI target must be an absolute hierarchical URI. 127 // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." ) 128 @VisibleForTesting 129 static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*"); 130 131 static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1; 132 133 static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5; 134 135 @VisibleForTesting 136 static final Status SHUTDOWN_NOW_STATUS = 137 Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked"); 138 139 @VisibleForTesting 140 static final Status SHUTDOWN_STATUS = 141 Status.UNAVAILABLE.withDescription("Channel shutdown invoked"); 142 143 @VisibleForTesting 144 static final Status SUBCHANNEL_SHUTDOWN_STATUS = 145 Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked"); 146 147 private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG = 148 ManagedChannelServiceConfig.empty(); 149 private static final InternalConfigSelector INITIAL_PENDING_SELECTOR = 150 new InternalConfigSelector() { 151 @Override 152 public Result selectConfig(PickSubchannelArgs args) { 153 throw new IllegalStateException("Resolution is pending"); 154 } 155 }; 156 157 private final InternalLogId logId; 158 private final String target; 159 @Nullable 160 private final String authorityOverride; 161 private final NameResolverRegistry nameResolverRegistry; 162 private final NameResolver.Factory nameResolverFactory; 163 private final NameResolver.Args nameResolverArgs; 164 private final AutoConfiguredLoadBalancerFactory loadBalancerFactory; 165 private final ClientTransportFactory originalTransportFactory; 166 @Nullable 167 private final ChannelCredentials originalChannelCreds; 168 private final ClientTransportFactory transportFactory; 169 private final ClientTransportFactory oobTransportFactory; 170 private final RestrictedScheduledExecutor scheduledExecutor; 171 private final Executor executor; 172 private final ObjectPool<? extends Executor> executorPool; 173 private final ObjectPool<? extends Executor> balancerRpcExecutorPool; 174 private final ExecutorHolder balancerRpcExecutorHolder; 175 private final ExecutorHolder offloadExecutorHolder; 176 private final TimeProvider timeProvider; 177 private final int maxTraceEvents; 178 179 @VisibleForTesting 180 final SynchronizationContext syncContext = new SynchronizationContext( 181 new Thread.UncaughtExceptionHandler() { 182 @Override 183 public void uncaughtException(Thread t, Throwable e) { 184 logger.log( 185 Level.SEVERE, 186 "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!", 187 e); 188 panic(e); 189 } 190 }); 191 192 private boolean fullStreamDecompression; 193 194 private final DecompressorRegistry decompressorRegistry; 195 private final CompressorRegistry compressorRegistry; 196 197 private final Supplier<Stopwatch> stopwatchSupplier; 198 /** The timout before entering idle mode. */ 199 private final long idleTimeoutMillis; 200 201 private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager(); 202 private final BackoffPolicy.Provider backoffPolicyProvider; 203 204 /** 205 * We delegate to this channel, so that we can have interceptors as necessary. If there aren't 206 * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a 207 * {@link RealChannel}. 208 */ 209 private final Channel interceptorChannel; 210 @Nullable private final String userAgent; 211 212 // Only null after channel is terminated. Must be assigned from the syncContext. 213 private NameResolver nameResolver; 214 215 // Must be accessed from the syncContext. 216 private boolean nameResolverStarted; 217 218 // null when channel is in idle mode. Must be assigned from syncContext. 219 @Nullable 220 private LbHelperImpl lbHelper; 221 222 // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext. 223 // null if channel is in idle mode. 224 @Nullable 225 private volatile SubchannelPicker subchannelPicker; 226 227 // Must be accessed from the syncContext 228 private boolean panicMode; 229 230 // Must be mutated from syncContext 231 // If any monitoring hook to be added later needs to get a snapshot of this Set, we could 232 // switch to a ConcurrentHashMap. 233 private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f); 234 235 // Must be accessed from syncContext 236 @Nullable 237 private Collection<RealChannel.PendingCall<?, ?>> pendingCalls; 238 private final Object pendingCallsInUseObject = new Object(); 239 240 // Must be mutated from syncContext 241 private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f); 242 243 // reprocess() must be run from syncContext 244 private final DelayedClientTransport delayedTransport; 245 private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry 246 = new UncommittedRetriableStreamsRegistry(); 247 248 // Shutdown states. 249 // 250 // Channel's shutdown process: 251 // 1. shutdown(): stop accepting new calls from applications 252 // 1a shutdown <- true 253 // 1b subchannelPicker <- null 254 // 1c delayedTransport.shutdown() 255 // 2. delayedTransport terminated: stop stream-creation functionality 256 // 2a terminating <- true 257 // 2b loadBalancer.shutdown() 258 // * LoadBalancer will shutdown subchannels and OOB channels 259 // 2c loadBalancer <- null 260 // 2d nameResolver.shutdown() 261 // 2e nameResolver <- null 262 // 3. All subchannels and OOB channels terminated: Channel considered terminated 263 264 private final AtomicBoolean shutdown = new AtomicBoolean(false); 265 // Must only be mutated and read from syncContext 266 private boolean shutdownNowed; 267 // Must only be mutated from syncContext 268 private boolean terminating; 269 // Must be mutated from syncContext 270 private volatile boolean terminated; 271 private final CountDownLatch terminatedLatch = new CountDownLatch(1); 272 273 private final CallTracer.Factory callTracerFactory; 274 private final CallTracer channelCallTracer; 275 private final ChannelTracer channelTracer; 276 private final ChannelLogger channelLogger; 277 private final InternalChannelz channelz; 278 private final RealChannel realChannel; 279 // Must be mutated and read from syncContext 280 // a flag for doing channel tracing when flipped 281 private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION; 282 // Must be mutated and read from constructor or syncContext 283 // used for channel tracing when value changed 284 private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG; 285 286 @Nullable 287 private final ManagedChannelServiceConfig defaultServiceConfig; 288 // Must be mutated and read from constructor or syncContext 289 private boolean serviceConfigUpdated = false; 290 private final boolean lookUpServiceConfig; 291 292 // One instance per channel. 293 private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); 294 295 private final long perRpcBufferLimit; 296 private final long channelBufferLimit; 297 298 // Temporary false flag that can skip the retry code path. 299 private final boolean retryEnabled; 300 301 // Called from syncContext 302 private final ManagedClientTransport.Listener delayedTransportListener = 303 new DelayedTransportListener(); 304 305 // Must be called from syncContext maybeShutdownNowSubchannels()306 private void maybeShutdownNowSubchannels() { 307 if (shutdownNowed) { 308 for (InternalSubchannel subchannel : subchannels) { 309 subchannel.shutdownNow(SHUTDOWN_NOW_STATUS); 310 } 311 for (OobChannel oobChannel : oobChannels) { 312 oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS); 313 } 314 } 315 } 316 317 // Must be accessed from syncContext 318 @VisibleForTesting 319 final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator(); 320 321 @Override getStats()322 public ListenableFuture<ChannelStats> getStats() { 323 final SettableFuture<ChannelStats> ret = SettableFuture.create(); 324 final class StatsFetcher implements Runnable { 325 @Override 326 public void run() { 327 ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder(); 328 channelCallTracer.updateBuilder(builder); 329 channelTracer.updateBuilder(builder); 330 builder.setTarget(target).setState(channelStateManager.getState()); 331 List<InternalWithLogId> children = new ArrayList<>(); 332 children.addAll(subchannels); 333 children.addAll(oobChannels); 334 builder.setSubchannels(children); 335 ret.set(builder.build()); 336 } 337 } 338 339 // subchannels and oobchannels can only be accessed from syncContext 340 syncContext.execute(new StatsFetcher()); 341 return ret; 342 } 343 344 @Override getLogId()345 public InternalLogId getLogId() { 346 return logId; 347 } 348 349 // Run from syncContext 350 private class IdleModeTimer implements Runnable { 351 352 @Override run()353 public void run() { 354 // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after 355 // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too 356 // subtle to change rapidly to resolve the channel panic. See #8714 357 if (lbHelper == null) { 358 return; 359 } 360 enterIdleMode(); 361 } 362 } 363 364 // Must be called from syncContext shutdownNameResolverAndLoadBalancer(boolean channelIsActive)365 private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) { 366 syncContext.throwIfNotInThisSynchronizationContext(); 367 if (channelIsActive) { 368 checkState(nameResolverStarted, "nameResolver is not started"); 369 checkState(lbHelper != null, "lbHelper is null"); 370 } 371 if (nameResolver != null) { 372 nameResolver.shutdown(); 373 nameResolverStarted = false; 374 if (channelIsActive) { 375 nameResolver = getNameResolver( 376 target, authorityOverride, nameResolverFactory, nameResolverArgs); 377 } else { 378 nameResolver = null; 379 } 380 } 381 if (lbHelper != null) { 382 lbHelper.lb.shutdown(); 383 lbHelper = null; 384 } 385 subchannelPicker = null; 386 } 387 388 /** 389 * Make the channel exit idle mode, if it's in it. 390 * 391 * <p>Must be called from syncContext 392 */ 393 @VisibleForTesting exitIdleMode()394 void exitIdleMode() { 395 syncContext.throwIfNotInThisSynchronizationContext(); 396 if (shutdown.get() || panicMode) { 397 return; 398 } 399 if (inUseStateAggregator.isInUse()) { 400 // Cancel the timer now, so that a racing due timer will not put Channel on idleness 401 // when the caller of exitIdleMode() is about to use the returned loadBalancer. 402 cancelIdleTimer(false); 403 } else { 404 // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while 405 // isInUse() == false, in which case we still need to schedule the timer. 406 rescheduleIdleTimer(); 407 } 408 if (lbHelper != null) { 409 return; 410 } 411 channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode"); 412 LbHelperImpl lbHelper = new LbHelperImpl(); 413 lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper); 414 // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and 415 // may throw. We don't want to confuse our state, even if we will enter panic mode. 416 this.lbHelper = lbHelper; 417 418 NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver); 419 nameResolver.start(listener); 420 nameResolverStarted = true; 421 } 422 423 // Must be run from syncContext enterIdleMode()424 private void enterIdleMode() { 425 // nameResolver and loadBalancer are guaranteed to be non-null. If any of them were null, 426 // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown() 427 // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of 428 // which are bugs. 429 shutdownNameResolverAndLoadBalancer(true); 430 delayedTransport.reprocess(null); 431 channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state"); 432 channelStateManager.gotoState(IDLE); 433 // If the inUseStateAggregator still considers pending calls to be queued up or the delayed 434 // transport to be holding some we need to exit idle mode to give these calls a chance to 435 // be processed. 436 if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) { 437 exitIdleMode(); 438 } 439 } 440 441 // Must be run from syncContext cancelIdleTimer(boolean permanent)442 private void cancelIdleTimer(boolean permanent) { 443 idleTimer.cancel(permanent); 444 } 445 446 // Always run from syncContext rescheduleIdleTimer()447 private void rescheduleIdleTimer() { 448 if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { 449 return; 450 } 451 idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS); 452 } 453 454 /** 455 * Force name resolution refresh to happen immediately. Must be run 456 * from syncContext. 457 */ refreshNameResolution()458 private void refreshNameResolution() { 459 syncContext.throwIfNotInThisSynchronizationContext(); 460 if (nameResolverStarted) { 461 nameResolver.refresh(); 462 } 463 } 464 465 private final class ChannelStreamProvider implements ClientStreamProvider { 466 volatile Throttle throttle; 467 getTransport(PickSubchannelArgs args)468 private ClientTransport getTransport(PickSubchannelArgs args) { 469 SubchannelPicker pickerCopy = subchannelPicker; 470 if (shutdown.get()) { 471 // If channel is shut down, delayedTransport is also shut down which will fail the stream 472 // properly. 473 return delayedTransport; 474 } 475 if (pickerCopy == null) { 476 final class ExitIdleModeForTransport implements Runnable { 477 @Override 478 public void run() { 479 exitIdleMode(); 480 } 481 } 482 483 syncContext.execute(new ExitIdleModeForTransport()); 484 return delayedTransport; 485 } 486 // There is no need to reschedule the idle timer here. 487 // 488 // pickerCopy != null, which means idle timer has not expired when this method starts. 489 // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer 490 // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after 491 // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it. 492 // 493 // In most cases the idle timer is scheduled to fire after the transport has created the 494 // stream, which would have reported in-use state to the channel that would have cancelled 495 // the idle timer. 496 PickResult pickResult = pickerCopy.pickSubchannel(args); 497 ClientTransport transport = GrpcUtil.getTransportFromPickResult( 498 pickResult, args.getCallOptions().isWaitForReady()); 499 if (transport != null) { 500 return transport; 501 } 502 return delayedTransport; 503 } 504 505 @Override newStream( final MethodDescriptor<?, ?> method, final CallOptions callOptions, final Metadata headers, final Context context)506 public ClientStream newStream( 507 final MethodDescriptor<?, ?> method, 508 final CallOptions callOptions, 509 final Metadata headers, 510 final Context context) { 511 if (!retryEnabled) { 512 ClientTransport transport = 513 getTransport(new PickSubchannelArgsImpl(method, headers, callOptions)); 514 Context origContext = context.attach(); 515 ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( 516 callOptions, headers, 0, /* isTransparentRetry= */ false); 517 try { 518 return transport.newStream(method, headers, callOptions, tracers); 519 } finally { 520 context.detach(origContext); 521 } 522 } else { 523 MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY); 524 final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy; 525 final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy; 526 final class RetryStream<ReqT> extends RetriableStream<ReqT> { 527 @SuppressWarnings("unchecked") 528 RetryStream() { 529 super( 530 (MethodDescriptor<ReqT, ?>) method, 531 headers, 532 channelBufferUsed, 533 perRpcBufferLimit, 534 channelBufferLimit, 535 getCallExecutor(callOptions), 536 transportFactory.getScheduledExecutorService(), 537 retryPolicy, 538 hedgingPolicy, 539 throttle); 540 } 541 542 @Override 543 Status prestart() { 544 return uncommittedRetriableStreamsRegistry.add(this); 545 } 546 547 @Override 548 void postCommit() { 549 uncommittedRetriableStreamsRegistry.remove(this); 550 } 551 552 @Override 553 ClientStream newSubstream( 554 Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts, 555 boolean isTransparentRetry) { 556 CallOptions newOptions = callOptions.withStreamTracerFactory(factory); 557 ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( 558 newOptions, newHeaders, previousAttempts, isTransparentRetry); 559 ClientTransport transport = 560 getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); 561 Context origContext = context.attach(); 562 try { 563 return transport.newStream(method, newHeaders, newOptions, tracers); 564 } finally { 565 context.detach(origContext); 566 } 567 } 568 } 569 570 return new RetryStream<>(); 571 } 572 } 573 } 574 575 private final ChannelStreamProvider transportProvider = new ChannelStreamProvider(); 576 577 private final Rescheduler idleTimer; 578 ManagedChannelImpl( ManagedChannelImplBuilder builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, ObjectPool<? extends Executor> balancerRpcExecutorPool, Supplier<Stopwatch> stopwatchSupplier, List<ClientInterceptor> interceptors, final TimeProvider timeProvider)579 ManagedChannelImpl( 580 ManagedChannelImplBuilder builder, 581 ClientTransportFactory clientTransportFactory, 582 BackoffPolicy.Provider backoffPolicyProvider, 583 ObjectPool<? extends Executor> balancerRpcExecutorPool, 584 Supplier<Stopwatch> stopwatchSupplier, 585 List<ClientInterceptor> interceptors, 586 final TimeProvider timeProvider) { 587 this.target = checkNotNull(builder.target, "target"); 588 this.logId = InternalLogId.allocate("Channel", target); 589 this.timeProvider = checkNotNull(timeProvider, "timeProvider"); 590 this.executorPool = checkNotNull(builder.executorPool, "executorPool"); 591 this.executor = checkNotNull(executorPool.getObject(), "executor"); 592 this.originalChannelCreds = builder.channelCredentials; 593 this.originalTransportFactory = clientTransportFactory; 594 this.offloadExecutorHolder = 595 new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool")); 596 this.transportFactory = new CallCredentialsApplyingTransportFactory( 597 clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder); 598 this.oobTransportFactory = new CallCredentialsApplyingTransportFactory( 599 clientTransportFactory, null, this.offloadExecutorHolder); 600 this.scheduledExecutor = 601 new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService()); 602 maxTraceEvents = builder.maxTraceEvents; 603 channelTracer = new ChannelTracer( 604 logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(), 605 "Channel for '" + target + "'"); 606 channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider); 607 ProxyDetector proxyDetector = 608 builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR; 609 this.retryEnabled = builder.retryEnabled; 610 this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy); 611 this.nameResolverRegistry = builder.nameResolverRegistry; 612 ScParser serviceConfigParser = 613 new ScParser( 614 retryEnabled, 615 builder.maxRetryAttempts, 616 builder.maxHedgedAttempts, 617 loadBalancerFactory); 618 this.authorityOverride = builder.authorityOverride; 619 this.nameResolverArgs = 620 NameResolver.Args.newBuilder() 621 .setDefaultPort(builder.getDefaultPort()) 622 .setProxyDetector(proxyDetector) 623 .setSynchronizationContext(syncContext) 624 .setScheduledExecutorService(scheduledExecutor) 625 .setServiceConfigParser(serviceConfigParser) 626 .setChannelLogger(channelLogger) 627 .setOffloadExecutor(this.offloadExecutorHolder) 628 .setOverrideAuthority(this.authorityOverride) 629 .build(); 630 this.nameResolverFactory = builder.nameResolverFactory; 631 this.nameResolver = getNameResolver( 632 target, authorityOverride, nameResolverFactory, nameResolverArgs); 633 this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); 634 this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); 635 this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext); 636 this.delayedTransport.start(delayedTransportListener); 637 this.backoffPolicyProvider = backoffPolicyProvider; 638 639 if (builder.defaultServiceConfig != null) { 640 ConfigOrError parsedDefaultServiceConfig = 641 serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig); 642 checkState( 643 parsedDefaultServiceConfig.getError() == null, 644 "Default config is invalid: %s", 645 parsedDefaultServiceConfig.getError()); 646 this.defaultServiceConfig = 647 (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig(); 648 this.lastServiceConfig = this.defaultServiceConfig; 649 } else { 650 this.defaultServiceConfig = null; 651 } 652 this.lookUpServiceConfig = builder.lookUpServiceConfig; 653 realChannel = new RealChannel(nameResolver.getServiceAuthority()); 654 Channel channel = realChannel; 655 if (builder.binlog != null) { 656 channel = builder.binlog.wrapChannel(channel); 657 } 658 this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors); 659 this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); 660 if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { 661 this.idleTimeoutMillis = builder.idleTimeoutMillis; 662 } else { 663 checkArgument( 664 builder.idleTimeoutMillis 665 >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS, 666 "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis); 667 this.idleTimeoutMillis = builder.idleTimeoutMillis; 668 } 669 670 idleTimer = new Rescheduler( 671 new IdleModeTimer(), 672 syncContext, 673 transportFactory.getScheduledExecutorService(), 674 stopwatchSupplier.get()); 675 this.fullStreamDecompression = builder.fullStreamDecompression; 676 this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry"); 677 this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry"); 678 this.userAgent = builder.userAgent; 679 680 this.channelBufferLimit = builder.retryBufferSize; 681 this.perRpcBufferLimit = builder.perRpcBufferLimit; 682 final class ChannelCallTracerFactory implements CallTracer.Factory { 683 @Override 684 public CallTracer create() { 685 return new CallTracer(timeProvider); 686 } 687 } 688 689 this.callTracerFactory = new ChannelCallTracerFactory(); 690 channelCallTracer = callTracerFactory.create(); 691 this.channelz = checkNotNull(builder.channelz); 692 channelz.addRootChannel(this); 693 694 if (!lookUpServiceConfig) { 695 if (defaultServiceConfig != null) { 696 channelLogger.log( 697 ChannelLogLevel.INFO, "Service config look-up disabled, using default service config"); 698 } 699 serviceConfigUpdated = true; 700 } 701 } 702 getNameResolver( String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs)703 private static NameResolver getNameResolver( 704 String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) { 705 // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending 706 // "dns:///". 707 URI targetUri = null; 708 StringBuilder uriSyntaxErrors = new StringBuilder(); 709 try { 710 targetUri = new URI(target); 711 // For "localhost:8080" this would likely cause newNameResolver to return null, because 712 // "localhost" is parsed as the scheme. Will fall into the next branch and try 713 // "dns:///localhost:8080". 714 } catch (URISyntaxException e) { 715 // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234. 716 uriSyntaxErrors.append(e.getMessage()); 717 } 718 if (targetUri != null) { 719 NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs); 720 if (resolver != null) { 721 return resolver; 722 } 723 // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an 724 // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080" 725 } 726 727 // If we reached here, the targetUri couldn't be used. 728 if (!URI_PATTERN.matcher(target).matches()) { 729 // It doesn't look like a URI target. Maybe it's an authority string. Try with the default 730 // scheme from the factory. 731 try { 732 targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null); 733 } catch (URISyntaxException e) { 734 // Should not be possible. 735 throw new IllegalArgumentException(e); 736 } 737 NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs); 738 if (resolver != null) { 739 return resolver; 740 } 741 } 742 throw new IllegalArgumentException(String.format( 743 "cannot find a NameResolver for %s%s", 744 target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : "")); 745 } 746 747 @VisibleForTesting getNameResolver( String target, @Nullable final String overrideAuthority, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs)748 static NameResolver getNameResolver( 749 String target, @Nullable final String overrideAuthority, 750 NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) { 751 NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); 752 if (overrideAuthority == null) { 753 return resolver; 754 } 755 756 // If the nameResolver is not already a RetryingNameResolver, then wrap it with it. 757 // This helps guarantee that name resolution retry remains supported even as it has been 758 // removed from ManagedChannelImpl. 759 // TODO: After a transition period, all NameResolver implementations that need retry should use 760 // RetryingNameResolver directly and this step can be removed. 761 NameResolver usedNameResolver; 762 if (resolver instanceof RetryingNameResolver) { 763 usedNameResolver = resolver; 764 } else { 765 usedNameResolver = new RetryingNameResolver(resolver, 766 new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(), 767 nameResolverArgs.getScheduledExecutorService(), 768 nameResolverArgs.getSynchronizationContext()), 769 nameResolverArgs.getSynchronizationContext()); 770 } 771 772 return new ForwardingNameResolver(usedNameResolver) { 773 @Override 774 public String getServiceAuthority() { 775 return overrideAuthority; 776 } 777 }; 778 } 779 780 @VisibleForTesting 781 InternalConfigSelector getConfigSelector() { 782 return realChannel.configSelector.get(); 783 } 784 785 /** 786 * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately 787 * cancelled. 788 */ 789 @Override 790 public ManagedChannelImpl shutdown() { 791 channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called"); 792 if (!shutdown.compareAndSet(false, true)) { 793 return this; 794 } 795 final class Shutdown implements Runnable { 796 @Override 797 public void run() { 798 channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state"); 799 channelStateManager.gotoState(SHUTDOWN); 800 } 801 } 802 803 syncContext.execute(new Shutdown()); 804 realChannel.shutdown(); 805 final class CancelIdleTimer implements Runnable { 806 @Override 807 public void run() { 808 cancelIdleTimer(/* permanent= */ true); 809 } 810 } 811 812 syncContext.execute(new CancelIdleTimer()); 813 return this; 814 } 815 816 /** 817 * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although 818 * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely 819 * return {@code false} immediately after this method returns. 820 */ 821 @Override 822 public ManagedChannelImpl shutdownNow() { 823 channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called"); 824 shutdown(); 825 realChannel.shutdownNow(); 826 final class ShutdownNow implements Runnable { 827 @Override 828 public void run() { 829 if (shutdownNowed) { 830 return; 831 } 832 shutdownNowed = true; 833 maybeShutdownNowSubchannels(); 834 } 835 } 836 837 syncContext.execute(new ShutdownNow()); 838 return this; 839 } 840 841 // Called from syncContext 842 @VisibleForTesting 843 void panic(final Throwable t) { 844 if (panicMode) { 845 // Preserve the first panic information 846 return; 847 } 848 panicMode = true; 849 cancelIdleTimer(/* permanent= */ true); 850 shutdownNameResolverAndLoadBalancer(false); 851 final class PanicSubchannelPicker extends SubchannelPicker { 852 private final PickResult panicPickResult = 853 PickResult.withDrop( 854 Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t)); 855 856 @Override 857 public PickResult pickSubchannel(PickSubchannelArgs args) { 858 return panicPickResult; 859 } 860 861 @Override 862 public String toString() { 863 return MoreObjects.toStringHelper(PanicSubchannelPicker.class) 864 .add("panicPickResult", panicPickResult) 865 .toString(); 866 } 867 } 868 869 updateSubchannelPicker(new PanicSubchannelPicker()); 870 realChannel.updateConfigSelector(null); 871 channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE"); 872 channelStateManager.gotoState(TRANSIENT_FAILURE); 873 } 874 875 @VisibleForTesting 876 boolean isInPanicMode() { 877 return panicMode; 878 } 879 880 // Called from syncContext 881 private void updateSubchannelPicker(SubchannelPicker newPicker) { 882 subchannelPicker = newPicker; 883 delayedTransport.reprocess(newPicker); 884 } 885 886 @Override 887 public boolean isShutdown() { 888 return shutdown.get(); 889 } 890 891 @Override 892 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 893 return terminatedLatch.await(timeout, unit); 894 } 895 896 @Override 897 public boolean isTerminated() { 898 return terminated; 899 } 900 901 /* 902 * Creates a new outgoing call on the channel. 903 */ 904 @Override 905 public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, 906 CallOptions callOptions) { 907 return interceptorChannel.newCall(method, callOptions); 908 } 909 910 @Override 911 public String authority() { 912 return interceptorChannel.authority(); 913 } 914 915 private Executor getCallExecutor(CallOptions callOptions) { 916 Executor executor = callOptions.getExecutor(); 917 if (executor == null) { 918 executor = this.executor; 919 } 920 return executor; 921 } 922 923 private class RealChannel extends Channel { 924 // Reference to null if no config selector is available from resolution result 925 // Reference must be set() from syncContext 926 private final AtomicReference<InternalConfigSelector> configSelector = 927 new AtomicReference<>(INITIAL_PENDING_SELECTOR); 928 // Set when the NameResolver is initially created. When we create a new NameResolver for the 929 // same target, the new instance must have the same value. 930 private final String authority; 931 932 private final Channel clientCallImplChannel = new Channel() { 933 @Override 934 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( 935 MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) { 936 return new ClientCallImpl<>( 937 method, 938 getCallExecutor(callOptions), 939 callOptions, 940 transportProvider, 941 terminated ? null : transportFactory.getScheduledExecutorService(), 942 channelCallTracer, 943 null) 944 .setFullStreamDecompression(fullStreamDecompression) 945 .setDecompressorRegistry(decompressorRegistry) 946 .setCompressorRegistry(compressorRegistry); 947 } 948 949 @Override 950 public String authority() { 951 return authority; 952 } 953 }; 954 955 private RealChannel(String authority) { 956 this.authority = checkNotNull(authority, "authority"); 957 } 958 959 @Override 960 public <ReqT, RespT> ClientCall<ReqT, RespT> newCall( 961 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) { 962 if (configSelector.get() != INITIAL_PENDING_SELECTOR) { 963 return newClientCall(method, callOptions); 964 } 965 syncContext.execute(new Runnable() { 966 @Override 967 public void run() { 968 exitIdleMode(); 969 } 970 }); 971 if (configSelector.get() != INITIAL_PENDING_SELECTOR) { 972 // This is an optimization for the case (typically with InProcessTransport) when name 973 // resolution result is immediately available at this point. Otherwise, some users' 974 // tests might observe slight behavior difference from earlier grpc versions. 975 return newClientCall(method, callOptions); 976 } 977 if (shutdown.get()) { 978 // Return a failing ClientCall. 979 return new ClientCall<ReqT, RespT>() { 980 @Override 981 public void start(Listener<RespT> responseListener, Metadata headers) { 982 responseListener.onClose(SHUTDOWN_STATUS, new Metadata()); 983 } 984 985 @Override public void request(int numMessages) {} 986 987 @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {} 988 989 @Override public void halfClose() {} 990 991 @Override public void sendMessage(ReqT message) {} 992 }; 993 } 994 Context context = Context.current(); 995 final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions); 996 syncContext.execute(new Runnable() { 997 @Override 998 public void run() { 999 if (configSelector.get() == INITIAL_PENDING_SELECTOR) { 1000 if (pendingCalls == null) { 1001 pendingCalls = new LinkedHashSet<>(); 1002 inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true); 1003 } 1004 pendingCalls.add(pendingCall); 1005 } else { 1006 pendingCall.reprocess(); 1007 } 1008 } 1009 }); 1010 return pendingCall; 1011 } 1012 1013 // Must run in SynchronizationContext. 1014 void updateConfigSelector(@Nullable InternalConfigSelector config) { 1015 InternalConfigSelector prevConfig = configSelector.get(); 1016 configSelector.set(config); 1017 if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) { 1018 for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) { 1019 pendingCall.reprocess(); 1020 } 1021 } 1022 } 1023 1024 // Must run in SynchronizationContext. 1025 void onConfigError() { 1026 if (configSelector.get() == INITIAL_PENDING_SELECTOR) { 1027 updateConfigSelector(null); 1028 } 1029 } 1030 1031 void shutdown() { 1032 final class RealChannelShutdown implements Runnable { 1033 @Override 1034 public void run() { 1035 if (pendingCalls == null) { 1036 if (configSelector.get() == INITIAL_PENDING_SELECTOR) { 1037 configSelector.set(null); 1038 } 1039 uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); 1040 } 1041 } 1042 } 1043 1044 syncContext.execute(new RealChannelShutdown()); 1045 } 1046 1047 void shutdownNow() { 1048 final class RealChannelShutdownNow implements Runnable { 1049 @Override 1050 public void run() { 1051 if (configSelector.get() == INITIAL_PENDING_SELECTOR) { 1052 configSelector.set(null); 1053 } 1054 if (pendingCalls != null) { 1055 for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) { 1056 pendingCall.cancel("Channel is forcefully shutdown", null); 1057 } 1058 } 1059 uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); 1060 } 1061 } 1062 1063 syncContext.execute(new RealChannelShutdownNow()); 1064 } 1065 1066 @Override 1067 public String authority() { 1068 return authority; 1069 } 1070 1071 private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> { 1072 final Context context; 1073 final MethodDescriptor<ReqT, RespT> method; 1074 final CallOptions callOptions; 1075 1076 PendingCall( 1077 Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) { 1078 super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline()); 1079 this.context = context; 1080 this.method = method; 1081 this.callOptions = callOptions; 1082 } 1083 1084 /** Called when it's ready to create a real call and reprocess the pending call. */ 1085 void reprocess() { 1086 ClientCall<ReqT, RespT> realCall; 1087 Context previous = context.attach(); 1088 try { 1089 CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true); 1090 realCall = newClientCall(method, delayResolutionOption); 1091 } finally { 1092 context.detach(previous); 1093 } 1094 Runnable toRun = setCall(realCall); 1095 if (toRun == null) { 1096 syncContext.execute(new PendingCallRemoval()); 1097 } else { 1098 getCallExecutor(callOptions).execute(new Runnable() { 1099 @Override 1100 public void run() { 1101 toRun.run(); 1102 syncContext.execute(new PendingCallRemoval()); 1103 } 1104 }); 1105 } 1106 } 1107 1108 @Override 1109 protected void callCancelled() { 1110 super.callCancelled(); 1111 syncContext.execute(new PendingCallRemoval()); 1112 } 1113 1114 final class PendingCallRemoval implements Runnable { 1115 @Override 1116 public void run() { 1117 if (pendingCalls != null) { 1118 pendingCalls.remove(PendingCall.this); 1119 if (pendingCalls.isEmpty()) { 1120 inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false); 1121 pendingCalls = null; 1122 if (shutdown.get()) { 1123 uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); 1124 } 1125 } 1126 } 1127 } 1128 } 1129 } 1130 1131 private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall( 1132 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) { 1133 InternalConfigSelector selector = configSelector.get(); 1134 if (selector == null) { 1135 return clientCallImplChannel.newCall(method, callOptions); 1136 } 1137 if (selector instanceof ServiceConfigConvertedSelector) { 1138 MethodInfo methodInfo = 1139 ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method); 1140 if (methodInfo != null) { 1141 callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo); 1142 } 1143 return clientCallImplChannel.newCall(method, callOptions); 1144 } 1145 return new ConfigSelectingClientCall<>( 1146 selector, clientCallImplChannel, executor, method, callOptions); 1147 } 1148 } 1149 1150 /** 1151 * A client call for a given channel that applies a given config selector when it starts. 1152 */ 1153 static final class ConfigSelectingClientCall<ReqT, RespT> 1154 extends ForwardingClientCall<ReqT, RespT> { 1155 1156 private final InternalConfigSelector configSelector; 1157 private final Channel channel; 1158 private final Executor callExecutor; 1159 private final MethodDescriptor<ReqT, RespT> method; 1160 private final Context context; 1161 private CallOptions callOptions; 1162 1163 private ClientCall<ReqT, RespT> delegate; 1164 1165 ConfigSelectingClientCall( 1166 InternalConfigSelector configSelector, Channel channel, Executor channelExecutor, 1167 MethodDescriptor<ReqT, RespT> method, 1168 CallOptions callOptions) { 1169 this.configSelector = configSelector; 1170 this.channel = channel; 1171 this.method = method; 1172 this.callExecutor = 1173 callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor(); 1174 this.callOptions = callOptions.withExecutor(callExecutor); 1175 this.context = Context.current(); 1176 } 1177 1178 @Override 1179 protected ClientCall<ReqT, RespT> delegate() { 1180 return delegate; 1181 } 1182 1183 @SuppressWarnings("unchecked") 1184 @Override 1185 public void start(Listener<RespT> observer, Metadata headers) { 1186 PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); 1187 InternalConfigSelector.Result result = configSelector.selectConfig(args); 1188 Status status = result.getStatus(); 1189 if (!status.isOk()) { 1190 executeCloseObserverInContext(observer, 1191 GrpcUtil.replaceInappropriateControlPlaneStatus(status)); 1192 delegate = (ClientCall<ReqT, RespT>) NOOP_CALL; 1193 return; 1194 } 1195 ClientInterceptor interceptor = result.getInterceptor(); 1196 ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig(); 1197 MethodInfo methodInfo = config.getMethodConfig(method); 1198 if (methodInfo != null) { 1199 callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo); 1200 } 1201 if (interceptor != null) { 1202 delegate = interceptor.interceptCall(method, callOptions, channel); 1203 } else { 1204 delegate = channel.newCall(method, callOptions); 1205 } 1206 delegate.start(observer, headers); 1207 } 1208 1209 private void executeCloseObserverInContext( 1210 final Listener<RespT> observer, final Status status) { 1211 class CloseInContext extends ContextRunnable { 1212 CloseInContext() { 1213 super(context); 1214 } 1215 1216 @Override 1217 public void runInContext() { 1218 observer.onClose(status, new Metadata()); 1219 } 1220 } 1221 1222 callExecutor.execute(new CloseInContext()); 1223 } 1224 1225 @Override 1226 public void cancel(@Nullable String message, @Nullable Throwable cause) { 1227 if (delegate != null) { 1228 delegate.cancel(message, cause); 1229 } 1230 } 1231 } 1232 1233 private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() { 1234 @Override 1235 public void start(Listener<Object> responseListener, Metadata headers) {} 1236 1237 @Override 1238 public void request(int numMessages) {} 1239 1240 @Override 1241 public void cancel(String message, Throwable cause) {} 1242 1243 @Override 1244 public void halfClose() {} 1245 1246 @Override 1247 public void sendMessage(Object message) {} 1248 1249 // Always returns {@code false}, since this is only used when the startup of the call fails. 1250 @Override 1251 public boolean isReady() { 1252 return false; 1253 } 1254 }; 1255 1256 /** 1257 * Terminate the channel if termination conditions are met. 1258 */ 1259 // Must be run from syncContext 1260 private void maybeTerminateChannel() { 1261 if (terminated) { 1262 return; 1263 } 1264 if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) { 1265 channelLogger.log(ChannelLogLevel.INFO, "Terminated"); 1266 channelz.removeRootChannel(this); 1267 executorPool.returnObject(executor); 1268 balancerRpcExecutorHolder.release(); 1269 offloadExecutorHolder.release(); 1270 // Release the transport factory so that it can deallocate any resources. 1271 transportFactory.close(); 1272 1273 terminated = true; 1274 terminatedLatch.countDown(); 1275 } 1276 } 1277 1278 // Must be called from syncContext 1279 private void handleInternalSubchannelState(ConnectivityStateInfo newState) { 1280 if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { 1281 refreshNameResolution(); 1282 } 1283 } 1284 1285 @Override 1286 @SuppressWarnings("deprecation") 1287 public ConnectivityState getState(boolean requestConnection) { 1288 ConnectivityState savedChannelState = channelStateManager.getState(); 1289 if (requestConnection && savedChannelState == IDLE) { 1290 final class RequestConnection implements Runnable { 1291 @Override 1292 public void run() { 1293 exitIdleMode(); 1294 if (subchannelPicker != null) { 1295 subchannelPicker.requestConnection(); 1296 } 1297 if (lbHelper != null) { 1298 lbHelper.lb.requestConnection(); 1299 } 1300 } 1301 } 1302 1303 syncContext.execute(new RequestConnection()); 1304 } 1305 return savedChannelState; 1306 } 1307 1308 @Override 1309 public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) { 1310 final class NotifyStateChanged implements Runnable { 1311 @Override 1312 public void run() { 1313 channelStateManager.notifyWhenStateChanged(callback, executor, source); 1314 } 1315 } 1316 1317 syncContext.execute(new NotifyStateChanged()); 1318 } 1319 1320 @Override 1321 public void resetConnectBackoff() { 1322 final class ResetConnectBackoff implements Runnable { 1323 @Override 1324 public void run() { 1325 if (shutdown.get()) { 1326 return; 1327 } 1328 if (nameResolverStarted) { 1329 refreshNameResolution(); 1330 } 1331 for (InternalSubchannel subchannel : subchannels) { 1332 subchannel.resetConnectBackoff(); 1333 } 1334 for (OobChannel oobChannel : oobChannels) { 1335 oobChannel.resetConnectBackoff(); 1336 } 1337 } 1338 } 1339 1340 syncContext.execute(new ResetConnectBackoff()); 1341 } 1342 1343 @Override 1344 public void enterIdle() { 1345 final class PrepareToLoseNetworkRunnable implements Runnable { 1346 @Override 1347 public void run() { 1348 if (shutdown.get() || lbHelper == null) { 1349 return; 1350 } 1351 cancelIdleTimer(/* permanent= */ false); 1352 enterIdleMode(); 1353 } 1354 } 1355 1356 syncContext.execute(new PrepareToLoseNetworkRunnable()); 1357 } 1358 1359 /** 1360 * A registry that prevents channel shutdown from killing existing retry attempts that are in 1361 * backoff. 1362 */ 1363 private final class UncommittedRetriableStreamsRegistry { 1364 // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream, 1365 // it's worthwhile to look for a lock-free approach. 1366 final Object lock = new Object(); 1367 1368 @GuardedBy("lock") 1369 Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>(); 1370 1371 @GuardedBy("lock") 1372 Status shutdownStatus; 1373 1374 void onShutdown(Status reason) { 1375 boolean shouldShutdownDelayedTransport = false; 1376 synchronized (lock) { 1377 if (shutdownStatus != null) { 1378 return; 1379 } 1380 shutdownStatus = reason; 1381 // Keep the delayedTransport open until there is no more uncommitted streams, b/c those 1382 // retriable streams, which may be in backoff and not using any transport, are already 1383 // started RPCs. 1384 if (uncommittedRetriableStreams.isEmpty()) { 1385 shouldShutdownDelayedTransport = true; 1386 } 1387 } 1388 1389 if (shouldShutdownDelayedTransport) { 1390 delayedTransport.shutdown(reason); 1391 } 1392 } 1393 1394 void onShutdownNow(Status reason) { 1395 onShutdown(reason); 1396 Collection<ClientStream> streams; 1397 1398 synchronized (lock) { 1399 streams = new ArrayList<>(uncommittedRetriableStreams); 1400 } 1401 1402 for (ClientStream stream : streams) { 1403 stream.cancel(reason); 1404 } 1405 delayedTransport.shutdownNow(reason); 1406 } 1407 1408 /** 1409 * Registers a RetriableStream and return null if not shutdown, otherwise just returns the 1410 * shutdown Status. 1411 */ 1412 @Nullable 1413 Status add(RetriableStream<?> retriableStream) { 1414 synchronized (lock) { 1415 if (shutdownStatus != null) { 1416 return shutdownStatus; 1417 } 1418 uncommittedRetriableStreams.add(retriableStream); 1419 return null; 1420 } 1421 } 1422 1423 void remove(RetriableStream<?> retriableStream) { 1424 Status shutdownStatusCopy = null; 1425 1426 synchronized (lock) { 1427 uncommittedRetriableStreams.remove(retriableStream); 1428 if (uncommittedRetriableStreams.isEmpty()) { 1429 shutdownStatusCopy = shutdownStatus; 1430 // Because retriable transport is long-lived, we take this opportunity to down-size the 1431 // hashmap. 1432 uncommittedRetriableStreams = new HashSet<>(); 1433 } 1434 } 1435 1436 if (shutdownStatusCopy != null) { 1437 delayedTransport.shutdown(shutdownStatusCopy); 1438 } 1439 } 1440 } 1441 1442 private final class LbHelperImpl extends LoadBalancer.Helper { 1443 AutoConfiguredLoadBalancer lb; 1444 1445 @Override 1446 public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) { 1447 syncContext.throwIfNotInThisSynchronizationContext(); 1448 // No new subchannel should be created after load balancer has been shutdown. 1449 checkState(!terminating, "Channel is being terminated"); 1450 return new SubchannelImpl(args); 1451 } 1452 1453 @Override 1454 public void updateBalancingState( 1455 final ConnectivityState newState, final SubchannelPicker newPicker) { 1456 syncContext.throwIfNotInThisSynchronizationContext(); 1457 checkNotNull(newState, "newState"); 1458 checkNotNull(newPicker, "newPicker"); 1459 final class UpdateBalancingState implements Runnable { 1460 @Override 1461 public void run() { 1462 if (LbHelperImpl.this != lbHelper) { 1463 return; 1464 } 1465 updateSubchannelPicker(newPicker); 1466 // It's not appropriate to report SHUTDOWN state from lb. 1467 // Ignore the case of newState == SHUTDOWN for now. 1468 if (newState != SHUTDOWN) { 1469 channelLogger.log( 1470 ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker); 1471 channelStateManager.gotoState(newState); 1472 } 1473 } 1474 } 1475 1476 syncContext.execute(new UpdateBalancingState()); 1477 } 1478 1479 @Override 1480 public void refreshNameResolution() { 1481 syncContext.throwIfNotInThisSynchronizationContext(); 1482 final class LoadBalancerRefreshNameResolution implements Runnable { 1483 @Override 1484 public void run() { 1485 ManagedChannelImpl.this.refreshNameResolution(); 1486 } 1487 } 1488 1489 syncContext.execute(new LoadBalancerRefreshNameResolution()); 1490 } 1491 1492 @Override 1493 public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) { 1494 return createOobChannel(Collections.singletonList(addressGroup), authority); 1495 } 1496 1497 @Override 1498 public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup, 1499 String authority) { 1500 // TODO(ejona): can we be even stricter? Like terminating? 1501 checkState(!terminated, "Channel is terminated"); 1502 long oobChannelCreationTime = timeProvider.currentTimeNanos(); 1503 InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null); 1504 InternalLogId subchannelLogId = 1505 InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority); 1506 ChannelTracer oobChannelTracer = 1507 new ChannelTracer( 1508 oobLogId, maxTraceEvents, oobChannelCreationTime, 1509 "OobChannel for " + addressGroup); 1510 final OobChannel oobChannel = new OobChannel( 1511 authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(), 1512 syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider); 1513 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1514 .setDescription("Child OobChannel created") 1515 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1516 .setTimestampNanos(oobChannelCreationTime) 1517 .setChannelRef(oobChannel) 1518 .build()); 1519 ChannelTracer subchannelTracer = 1520 new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime, 1521 "Subchannel for " + addressGroup); 1522 ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider); 1523 final class ManagedOobChannelCallback extends InternalSubchannel.Callback { 1524 @Override 1525 void onTerminated(InternalSubchannel is) { 1526 oobChannels.remove(oobChannel); 1527 channelz.removeSubchannel(is); 1528 oobChannel.handleSubchannelTerminated(); 1529 maybeTerminateChannel(); 1530 } 1531 1532 @Override 1533 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 1534 // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's 1535 // state and refresh name resolution if necessary. 1536 handleInternalSubchannelState(newState); 1537 oobChannel.handleSubchannelStateChange(newState); 1538 } 1539 } 1540 1541 final InternalSubchannel internalSubchannel = new InternalSubchannel( 1542 addressGroup, 1543 authority, userAgent, backoffPolicyProvider, oobTransportFactory, 1544 oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext, 1545 // All callback methods are run from syncContext 1546 new ManagedOobChannelCallback(), 1547 channelz, 1548 callTracerFactory.create(), 1549 subchannelTracer, 1550 subchannelLogId, 1551 subchannelLogger); 1552 oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() 1553 .setDescription("Child Subchannel created") 1554 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1555 .setTimestampNanos(oobChannelCreationTime) 1556 .setSubchannelRef(internalSubchannel) 1557 .build()); 1558 channelz.addSubchannel(oobChannel); 1559 channelz.addSubchannel(internalSubchannel); 1560 oobChannel.setSubchannel(internalSubchannel); 1561 final class AddOobChannel implements Runnable { 1562 @Override 1563 public void run() { 1564 if (terminating) { 1565 oobChannel.shutdown(); 1566 } 1567 if (!terminated) { 1568 // If channel has not terminated, it will track the subchannel and block termination 1569 // for it. 1570 oobChannels.add(oobChannel); 1571 } 1572 } 1573 } 1574 1575 syncContext.execute(new AddOobChannel()); 1576 return oobChannel; 1577 } 1578 1579 @Deprecated 1580 @Override 1581 public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) { 1582 return createResolvingOobChannelBuilder(target, new DefaultChannelCreds()) 1583 // Override authority to keep the old behavior. 1584 // createResolvingOobChannelBuilder(String target) will be deleted soon. 1585 .overrideAuthority(getAuthority()); 1586 } 1587 1588 // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated 1589 // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz. 1590 @Override 1591 public ManagedChannelBuilder<?> createResolvingOobChannelBuilder( 1592 final String target, final ChannelCredentials channelCreds) { 1593 checkNotNull(channelCreds, "channelCreds"); 1594 1595 final class ResolvingOobChannelBuilder 1596 extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> { 1597 final ManagedChannelBuilder<?> delegate; 1598 1599 ResolvingOobChannelBuilder() { 1600 final ClientTransportFactory transportFactory; 1601 CallCredentials callCredentials; 1602 if (channelCreds instanceof DefaultChannelCreds) { 1603 transportFactory = originalTransportFactory; 1604 callCredentials = null; 1605 } else { 1606 SwapChannelCredentialsResult swapResult = 1607 originalTransportFactory.swapChannelCredentials(channelCreds); 1608 if (swapResult == null) { 1609 delegate = Grpc.newChannelBuilder(target, channelCreds); 1610 return; 1611 } else { 1612 transportFactory = swapResult.transportFactory; 1613 callCredentials = swapResult.callCredentials; 1614 } 1615 } 1616 ClientTransportFactoryBuilder transportFactoryBuilder = 1617 new ClientTransportFactoryBuilder() { 1618 @Override 1619 public ClientTransportFactory buildClientTransportFactory() { 1620 return transportFactory; 1621 } 1622 }; 1623 delegate = new ManagedChannelImplBuilder( 1624 target, 1625 channelCreds, 1626 callCredentials, 1627 transportFactoryBuilder, 1628 new FixedPortProvider(nameResolverArgs.getDefaultPort())); 1629 } 1630 1631 @Override 1632 protected ManagedChannelBuilder<?> delegate() { 1633 return delegate; 1634 } 1635 } 1636 1637 checkState(!terminated, "Channel is terminated"); 1638 1639 @SuppressWarnings("deprecation") 1640 ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder() 1641 .nameResolverFactory(nameResolverFactory); 1642 1643 return builder 1644 // TODO(zdapeng): executors should not outlive the parent channel. 1645 .executor(executor) 1646 .offloadExecutor(offloadExecutorHolder.getExecutor()) 1647 .maxTraceEvents(maxTraceEvents) 1648 .proxyDetector(nameResolverArgs.getProxyDetector()) 1649 .userAgent(userAgent); 1650 } 1651 1652 @Override 1653 public ChannelCredentials getUnsafeChannelCredentials() { 1654 if (originalChannelCreds == null) { 1655 return new DefaultChannelCreds(); 1656 } 1657 return originalChannelCreds; 1658 } 1659 1660 @Override 1661 public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { 1662 updateOobChannelAddresses(channel, Collections.singletonList(eag)); 1663 } 1664 1665 @Override 1666 public void updateOobChannelAddresses(ManagedChannel channel, 1667 List<EquivalentAddressGroup> eag) { 1668 checkArgument(channel instanceof OobChannel, 1669 "channel must have been returned from createOobChannel"); 1670 ((OobChannel) channel).updateAddresses(eag); 1671 } 1672 1673 @Override 1674 public String getAuthority() { 1675 return ManagedChannelImpl.this.authority(); 1676 } 1677 1678 @Override 1679 public SynchronizationContext getSynchronizationContext() { 1680 return syncContext; 1681 } 1682 1683 @Override 1684 public ScheduledExecutorService getScheduledExecutorService() { 1685 return scheduledExecutor; 1686 } 1687 1688 @Override 1689 public ChannelLogger getChannelLogger() { 1690 return channelLogger; 1691 } 1692 1693 @Override 1694 public NameResolver.Args getNameResolverArgs() { 1695 return nameResolverArgs; 1696 } 1697 1698 @Override 1699 public NameResolverRegistry getNameResolverRegistry() { 1700 return nameResolverRegistry; 1701 } 1702 1703 /** 1704 * A placeholder for channel creds if user did not specify channel creds for the channel. 1705 */ 1706 // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null 1707 // channel creds. 1708 final class DefaultChannelCreds extends ChannelCredentials { 1709 @Override 1710 public ChannelCredentials withoutBearerTokens() { 1711 return this; 1712 } 1713 } 1714 } 1715 1716 final class NameResolverListener extends NameResolver.Listener2 { 1717 final LbHelperImpl helper; 1718 final NameResolver resolver; 1719 1720 NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) { 1721 this.helper = checkNotNull(helperImpl, "helperImpl"); 1722 this.resolver = checkNotNull(resolver, "resolver"); 1723 } 1724 1725 @Override 1726 public void onResult(final ResolutionResult resolutionResult) { 1727 final class NamesResolved implements Runnable { 1728 1729 @SuppressWarnings("ReferenceEquality") 1730 @Override 1731 public void run() { 1732 if (ManagedChannelImpl.this.nameResolver != resolver) { 1733 return; 1734 } 1735 1736 List<EquivalentAddressGroup> servers = resolutionResult.getAddresses(); 1737 channelLogger.log( 1738 ChannelLogLevel.DEBUG, 1739 "Resolved address: {0}, config={1}", 1740 servers, 1741 resolutionResult.getAttributes()); 1742 1743 if (lastResolutionState != ResolutionState.SUCCESS) { 1744 channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers); 1745 lastResolutionState = ResolutionState.SUCCESS; 1746 } 1747 1748 ConfigOrError configOrError = resolutionResult.getServiceConfig(); 1749 ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes() 1750 .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY); 1751 InternalConfigSelector resolvedConfigSelector = 1752 resolutionResult.getAttributes().get(InternalConfigSelector.KEY); 1753 ManagedChannelServiceConfig validServiceConfig = 1754 configOrError != null && configOrError.getConfig() != null 1755 ? (ManagedChannelServiceConfig) configOrError.getConfig() 1756 : null; 1757 Status serviceConfigError = configOrError != null ? configOrError.getError() : null; 1758 1759 ManagedChannelServiceConfig effectiveServiceConfig; 1760 if (!lookUpServiceConfig) { 1761 if (validServiceConfig != null) { 1762 channelLogger.log( 1763 ChannelLogLevel.INFO, 1764 "Service config from name resolver discarded by channel settings"); 1765 } 1766 effectiveServiceConfig = 1767 defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig; 1768 if (resolvedConfigSelector != null) { 1769 channelLogger.log( 1770 ChannelLogLevel.INFO, 1771 "Config selector from name resolver discarded by channel settings"); 1772 } 1773 realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector()); 1774 } else { 1775 // Try to use config if returned from name resolver 1776 // Otherwise, try to use the default config if available 1777 if (validServiceConfig != null) { 1778 effectiveServiceConfig = validServiceConfig; 1779 if (resolvedConfigSelector != null) { 1780 realChannel.updateConfigSelector(resolvedConfigSelector); 1781 if (effectiveServiceConfig.getDefaultConfigSelector() != null) { 1782 channelLogger.log( 1783 ChannelLogLevel.DEBUG, 1784 "Method configs in service config will be discarded due to presence of" 1785 + "config-selector"); 1786 } 1787 } else { 1788 realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector()); 1789 } 1790 } else if (defaultServiceConfig != null) { 1791 effectiveServiceConfig = defaultServiceConfig; 1792 realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector()); 1793 channelLogger.log( 1794 ChannelLogLevel.INFO, 1795 "Received no service config, using default service config"); 1796 } else if (serviceConfigError != null) { 1797 if (!serviceConfigUpdated) { 1798 // First DNS lookup has invalid service config, and cannot fall back to default 1799 channelLogger.log( 1800 ChannelLogLevel.INFO, 1801 "Fallback to error due to invalid first service config without default config"); 1802 // This error could be an "inappropriate" control plane error that should not bleed 1803 // through to client code using gRPC. We let them flow through here to the LB as 1804 // we later check for these error codes when investigating pick results in 1805 // GrpcUtil.getTransportFromPickResult(). 1806 onError(configOrError.getError()); 1807 if (resolutionResultListener != null) { 1808 resolutionResultListener.resolutionAttempted(false); 1809 } 1810 return; 1811 } else { 1812 effectiveServiceConfig = lastServiceConfig; 1813 } 1814 } else { 1815 effectiveServiceConfig = EMPTY_SERVICE_CONFIG; 1816 realChannel.updateConfigSelector(null); 1817 } 1818 if (!effectiveServiceConfig.equals(lastServiceConfig)) { 1819 channelLogger.log( 1820 ChannelLogLevel.INFO, 1821 "Service config changed{0}", 1822 effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : ""); 1823 lastServiceConfig = effectiveServiceConfig; 1824 transportProvider.throttle = effectiveServiceConfig.getRetryThrottling(); 1825 } 1826 1827 try { 1828 // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS 1829 // and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But, 1830 // lbNeedAddress is not deterministic 1831 serviceConfigUpdated = true; 1832 } catch (RuntimeException re) { 1833 logger.log( 1834 Level.WARNING, 1835 "[" + getLogId() + "] Unexpected exception from parsing service config", 1836 re); 1837 } 1838 } 1839 1840 Attributes effectiveAttrs = resolutionResult.getAttributes(); 1841 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 1842 if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) { 1843 Attributes.Builder attrBuilder = 1844 effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY); 1845 Map<String, ?> healthCheckingConfig = 1846 effectiveServiceConfig.getHealthCheckingConfig(); 1847 if (healthCheckingConfig != null) { 1848 attrBuilder 1849 .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig) 1850 .build(); 1851 } 1852 Attributes attributes = attrBuilder.build(); 1853 1854 boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses( 1855 ResolvedAddresses.newBuilder() 1856 .setAddresses(servers) 1857 .setAttributes(attributes) 1858 .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig()) 1859 .build()); 1860 // If a listener is provided, let it know if the addresses were accepted. 1861 if (resolutionResultListener != null) { 1862 resolutionResultListener.resolutionAttempted(lastAddressesAccepted); 1863 } 1864 } 1865 } 1866 } 1867 1868 syncContext.execute(new NamesResolved()); 1869 } 1870 1871 @Override 1872 public void onError(final Status error) { 1873 checkArgument(!error.isOk(), "the error status must not be OK"); 1874 final class NameResolverErrorHandler implements Runnable { 1875 @Override 1876 public void run() { 1877 handleErrorInSyncContext(error); 1878 } 1879 } 1880 1881 syncContext.execute(new NameResolverErrorHandler()); 1882 } 1883 1884 private void handleErrorInSyncContext(Status error) { 1885 logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}", 1886 new Object[] {getLogId(), error}); 1887 realChannel.onConfigError(); 1888 if (lastResolutionState != ResolutionState.ERROR) { 1889 channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error); 1890 lastResolutionState = ResolutionState.ERROR; 1891 } 1892 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 1893 if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) { 1894 return; 1895 } 1896 1897 helper.lb.handleNameResolutionError(error); 1898 } 1899 } 1900 1901 private final class SubchannelImpl extends AbstractSubchannel { 1902 final CreateSubchannelArgs args; 1903 final InternalLogId subchannelLogId; 1904 final ChannelLoggerImpl subchannelLogger; 1905 final ChannelTracer subchannelTracer; 1906 List<EquivalentAddressGroup> addressGroups; 1907 InternalSubchannel subchannel; 1908 boolean started; 1909 boolean shutdown; 1910 ScheduledHandle delayedShutdownTask; 1911 1912 SubchannelImpl(CreateSubchannelArgs args) { 1913 checkNotNull(args, "args"); 1914 addressGroups = args.getAddresses(); 1915 if (authorityOverride != null) { 1916 List<EquivalentAddressGroup> eagsWithoutOverrideAttr = 1917 stripOverrideAuthorityAttributes(args.getAddresses()); 1918 args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build(); 1919 } 1920 this.args = args; 1921 subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority()); 1922 subchannelTracer = new ChannelTracer( 1923 subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(), 1924 "Subchannel for " + args.getAddresses()); 1925 subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider); 1926 } 1927 1928 @Override 1929 public void start(final SubchannelStateListener listener) { 1930 syncContext.throwIfNotInThisSynchronizationContext(); 1931 checkState(!started, "already started"); 1932 checkState(!shutdown, "already shutdown"); 1933 checkState(!terminating, "Channel is being terminated"); 1934 started = true; 1935 final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback { 1936 // All callbacks are run in syncContext 1937 @Override 1938 void onTerminated(InternalSubchannel is) { 1939 subchannels.remove(is); 1940 channelz.removeSubchannel(is); 1941 maybeTerminateChannel(); 1942 } 1943 1944 @Override 1945 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 1946 checkState(listener != null, "listener is null"); 1947 listener.onSubchannelState(newState); 1948 } 1949 1950 @Override 1951 void onInUse(InternalSubchannel is) { 1952 inUseStateAggregator.updateObjectInUse(is, true); 1953 } 1954 1955 @Override 1956 void onNotInUse(InternalSubchannel is) { 1957 inUseStateAggregator.updateObjectInUse(is, false); 1958 } 1959 } 1960 1961 final InternalSubchannel internalSubchannel = new InternalSubchannel( 1962 args.getAddresses(), 1963 authority(), 1964 userAgent, 1965 backoffPolicyProvider, 1966 transportFactory, 1967 transportFactory.getScheduledExecutorService(), 1968 stopwatchSupplier, 1969 syncContext, 1970 new ManagedInternalSubchannelCallback(), 1971 channelz, 1972 callTracerFactory.create(), 1973 subchannelTracer, 1974 subchannelLogId, 1975 subchannelLogger); 1976 1977 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1978 .setDescription("Child Subchannel started") 1979 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1980 .setTimestampNanos(timeProvider.currentTimeNanos()) 1981 .setSubchannelRef(internalSubchannel) 1982 .build()); 1983 1984 this.subchannel = internalSubchannel; 1985 channelz.addSubchannel(internalSubchannel); 1986 subchannels.add(internalSubchannel); 1987 } 1988 1989 @Override 1990 InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() { 1991 checkState(started, "not started"); 1992 return subchannel; 1993 } 1994 1995 @Override 1996 public void shutdown() { 1997 syncContext.throwIfNotInThisSynchronizationContext(); 1998 if (subchannel == null) { 1999 // start() was not successful 2000 shutdown = true; 2001 return; 2002 } 2003 if (shutdown) { 2004 if (terminating && delayedShutdownTask != null) { 2005 // shutdown() was previously called when terminating == false, thus a delayed shutdown() 2006 // was scheduled. Now since terminating == true, We should expedite the shutdown. 2007 delayedShutdownTask.cancel(); 2008 delayedShutdownTask = null; 2009 // Will fall through to the subchannel.shutdown() at the end. 2010 } else { 2011 return; 2012 } 2013 } else { 2014 shutdown = true; 2015 } 2016 // Add a delay to shutdown to deal with the race between 1) a transport being picked and 2017 // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g., 2018 // because of address change, or because LoadBalancer is shutdown by Channel entering idle 2019 // mode). If (2) wins, the app will see a spurious error. We work around this by delaying 2020 // shutdown of Subchannel for a few seconds here. 2021 // 2022 // TODO(zhangkun83): consider a better approach 2023 // (https://github.com/grpc/grpc-java/issues/2562). 2024 if (!terminating) { 2025 final class ShutdownSubchannel implements Runnable { 2026 @Override 2027 public void run() { 2028 subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); 2029 } 2030 } 2031 2032 delayedShutdownTask = syncContext.schedule( 2033 new LogExceptionRunnable(new ShutdownSubchannel()), 2034 SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS, 2035 transportFactory.getScheduledExecutorService()); 2036 return; 2037 } 2038 // When terminating == true, no more real streams will be created. It's safe and also 2039 // desirable to shutdown timely. 2040 subchannel.shutdown(SHUTDOWN_STATUS); 2041 } 2042 2043 @Override 2044 public void requestConnection() { 2045 syncContext.throwIfNotInThisSynchronizationContext(); 2046 checkState(started, "not started"); 2047 subchannel.obtainActiveTransport(); 2048 } 2049 2050 @Override 2051 public List<EquivalentAddressGroup> getAllAddresses() { 2052 syncContext.throwIfNotInThisSynchronizationContext(); 2053 checkState(started, "not started"); 2054 return addressGroups; 2055 } 2056 2057 @Override 2058 public Attributes getAttributes() { 2059 return args.getAttributes(); 2060 } 2061 2062 @Override 2063 public String toString() { 2064 return subchannelLogId.toString(); 2065 } 2066 2067 @Override 2068 public Channel asChannel() { 2069 checkState(started, "not started"); 2070 return new SubchannelChannel( 2071 subchannel, balancerRpcExecutorHolder.getExecutor(), 2072 transportFactory.getScheduledExecutorService(), 2073 callTracerFactory.create(), 2074 new AtomicReference<InternalConfigSelector>(null)); 2075 } 2076 2077 @Override 2078 public Object getInternalSubchannel() { 2079 checkState(started, "Subchannel is not started"); 2080 return subchannel; 2081 } 2082 2083 @Override 2084 public ChannelLogger getChannelLogger() { 2085 return subchannelLogger; 2086 } 2087 2088 @Override 2089 public void updateAddresses(List<EquivalentAddressGroup> addrs) { 2090 syncContext.throwIfNotInThisSynchronizationContext(); 2091 addressGroups = addrs; 2092 if (authorityOverride != null) { 2093 addrs = stripOverrideAuthorityAttributes(addrs); 2094 } 2095 subchannel.updateAddresses(addrs); 2096 } 2097 2098 private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes( 2099 List<EquivalentAddressGroup> eags) { 2100 List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>(); 2101 for (EquivalentAddressGroup eag : eags) { 2102 EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup( 2103 eag.getAddresses(), 2104 eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build()); 2105 eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr); 2106 } 2107 return Collections.unmodifiableList(eagsWithoutOverrideAttr); 2108 } 2109 } 2110 2111 @Override 2112 public String toString() { 2113 return MoreObjects.toStringHelper(this) 2114 .add("logId", logId.getId()) 2115 .add("target", target) 2116 .toString(); 2117 } 2118 2119 /** 2120 * Called from syncContext. 2121 */ 2122 private final class DelayedTransportListener implements ManagedClientTransport.Listener { 2123 @Override 2124 public void transportShutdown(Status s) { 2125 checkState(shutdown.get(), "Channel must have been shut down"); 2126 } 2127 2128 @Override 2129 public void transportReady() { 2130 // Don't care 2131 } 2132 2133 @Override 2134 public void transportInUse(final boolean inUse) { 2135 inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); 2136 } 2137 2138 @Override 2139 public void transportTerminated() { 2140 checkState(shutdown.get(), "Channel must have been shut down"); 2141 terminating = true; 2142 shutdownNameResolverAndLoadBalancer(false); 2143 // No need to call channelStateManager since we are already in SHUTDOWN state. 2144 // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them 2145 // here. 2146 maybeShutdownNowSubchannels(); 2147 maybeTerminateChannel(); 2148 } 2149 } 2150 2151 /** 2152 * Must be accessed from syncContext. 2153 */ 2154 private final class IdleModeStateAggregator extends InUseStateAggregator<Object> { 2155 @Override 2156 protected void handleInUse() { 2157 exitIdleMode(); 2158 } 2159 2160 @Override 2161 protected void handleNotInUse() { 2162 if (shutdown.get()) { 2163 return; 2164 } 2165 rescheduleIdleTimer(); 2166 } 2167 } 2168 2169 /** 2170 * Lazily request for Executor from an executor pool. 2171 * Also act as an Executor directly to simply run a cmd 2172 */ 2173 @VisibleForTesting 2174 static final class ExecutorHolder implements Executor { 2175 private final ObjectPool<? extends Executor> pool; 2176 private Executor executor; 2177 2178 ExecutorHolder(ObjectPool<? extends Executor> executorPool) { 2179 this.pool = checkNotNull(executorPool, "executorPool"); 2180 } 2181 2182 synchronized Executor getExecutor() { 2183 if (executor == null) { 2184 executor = checkNotNull(pool.getObject(), "%s.getObject()", executor); 2185 } 2186 return executor; 2187 } 2188 2189 synchronized void release() { 2190 if (executor != null) { 2191 executor = pool.returnObject(executor); 2192 } 2193 } 2194 2195 @Override 2196 public void execute(Runnable command) { 2197 getExecutor().execute(command); 2198 } 2199 } 2200 2201 private static final class RestrictedScheduledExecutor implements ScheduledExecutorService { 2202 final ScheduledExecutorService delegate; 2203 2204 private RestrictedScheduledExecutor(ScheduledExecutorService delegate) { 2205 this.delegate = checkNotNull(delegate, "delegate"); 2206 } 2207 2208 @Override 2209 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { 2210 return delegate.schedule(callable, delay, unit); 2211 } 2212 2213 @Override 2214 public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) { 2215 return delegate.schedule(cmd, delay, unit); 2216 } 2217 2218 @Override 2219 public ScheduledFuture<?> scheduleAtFixedRate( 2220 Runnable command, long initialDelay, long period, TimeUnit unit) { 2221 return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); 2222 } 2223 2224 @Override 2225 public ScheduledFuture<?> scheduleWithFixedDelay( 2226 Runnable command, long initialDelay, long delay, TimeUnit unit) { 2227 return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); 2228 } 2229 2230 @Override 2231 public boolean awaitTermination(long timeout, TimeUnit unit) 2232 throws InterruptedException { 2233 return delegate.awaitTermination(timeout, unit); 2234 } 2235 2236 @Override 2237 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 2238 throws InterruptedException { 2239 return delegate.invokeAll(tasks); 2240 } 2241 2242 @Override 2243 public <T> List<Future<T>> invokeAll( 2244 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 2245 throws InterruptedException { 2246 return delegate.invokeAll(tasks, timeout, unit); 2247 } 2248 2249 @Override 2250 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 2251 throws InterruptedException, ExecutionException { 2252 return delegate.invokeAny(tasks); 2253 } 2254 2255 @Override 2256 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 2257 throws InterruptedException, ExecutionException, TimeoutException { 2258 return delegate.invokeAny(tasks, timeout, unit); 2259 } 2260 2261 @Override 2262 public boolean isShutdown() { 2263 return delegate.isShutdown(); 2264 } 2265 2266 @Override 2267 public boolean isTerminated() { 2268 return delegate.isTerminated(); 2269 } 2270 2271 @Override 2272 public void shutdown() { 2273 throw new UnsupportedOperationException("Restricted: shutdown() is not allowed"); 2274 } 2275 2276 @Override 2277 public List<Runnable> shutdownNow() { 2278 throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed"); 2279 } 2280 2281 @Override 2282 public <T> Future<T> submit(Callable<T> task) { 2283 return delegate.submit(task); 2284 } 2285 2286 @Override 2287 public Future<?> submit(Runnable task) { 2288 return delegate.submit(task); 2289 } 2290 2291 @Override 2292 public <T> Future<T> submit(Runnable task, T result) { 2293 return delegate.submit(task, result); 2294 } 2295 2296 @Override 2297 public void execute(Runnable command) { 2298 delegate.execute(command); 2299 } 2300 } 2301 2302 /** 2303 * A ResolutionState indicates the status of last name resolution. 2304 */ 2305 enum ResolutionState { 2306 NO_RESOLUTION, 2307 SUCCESS, 2308 ERROR 2309 } 2310 } 2311