1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ConnectivityState.IDLE; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY; 26 import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Stopwatch; 31 import com.google.common.base.Supplier; 32 import com.google.common.util.concurrent.ListenableFuture; 33 import com.google.common.util.concurrent.SettableFuture; 34 import io.grpc.Attributes; 35 import io.grpc.CallOptions; 36 import io.grpc.Channel; 37 import io.grpc.ClientCall; 38 import io.grpc.ClientInterceptor; 39 import io.grpc.ClientInterceptors; 40 import io.grpc.ClientStreamTracer; 41 import io.grpc.CompressorRegistry; 42 import io.grpc.ConnectivityState; 43 import io.grpc.ConnectivityStateInfo; 44 import io.grpc.Context; 45 import io.grpc.DecompressorRegistry; 46 import io.grpc.EquivalentAddressGroup; 47 import io.grpc.InternalChannelz; 48 import io.grpc.InternalChannelz.ChannelStats; 49 import io.grpc.InternalChannelz.ChannelTrace; 50 import io.grpc.InternalInstrumented; 51 import io.grpc.InternalLogId; 52 import io.grpc.InternalWithLogId; 53 import io.grpc.LoadBalancer; 54 import io.grpc.LoadBalancer.PickResult; 55 import io.grpc.LoadBalancer.PickSubchannelArgs; 56 import io.grpc.LoadBalancer.SubchannelPicker; 57 import io.grpc.ManagedChannel; 58 import io.grpc.Metadata; 59 import io.grpc.MethodDescriptor; 60 import io.grpc.NameResolver; 61 import io.grpc.Status; 62 import io.grpc.internal.ClientCallImpl.ClientTransportProvider; 63 import io.grpc.internal.RetriableStream.ChannelBufferMeter; 64 import io.grpc.internal.RetriableStream.Throttle; 65 import java.net.URI; 66 import java.net.URISyntaxException; 67 import java.util.ArrayList; 68 import java.util.Collection; 69 import java.util.Collections; 70 import java.util.HashSet; 71 import java.util.List; 72 import java.util.Map; 73 import java.util.Set; 74 import java.util.concurrent.CountDownLatch; 75 import java.util.concurrent.Executor; 76 import java.util.concurrent.ScheduledFuture; 77 import java.util.concurrent.TimeUnit; 78 import java.util.concurrent.atomic.AtomicBoolean; 79 import java.util.logging.Level; 80 import java.util.logging.Logger; 81 import java.util.regex.Pattern; 82 import javax.annotation.CheckForNull; 83 import javax.annotation.Nullable; 84 import javax.annotation.concurrent.GuardedBy; 85 import javax.annotation.concurrent.ThreadSafe; 86 87 /** A communication channel for making outgoing RPCs. */ 88 @ThreadSafe 89 final class ManagedChannelImpl extends ManagedChannel implements 90 InternalInstrumented<ChannelStats> { 91 static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName()); 92 93 // Matching this pattern means the target string is a URI target or at least intended to be one. 94 // A URI target must be an absolute hierarchical URI. 95 // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." ) 96 @VisibleForTesting 97 static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*"); 98 99 static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1; 100 101 @VisibleForTesting 102 static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5; 103 104 @VisibleForTesting 105 static final Status SHUTDOWN_NOW_STATUS = 106 Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked"); 107 108 @VisibleForTesting 109 static final Status SHUTDOWN_STATUS = 110 Status.UNAVAILABLE.withDescription("Channel shutdown invoked"); 111 112 @VisibleForTesting 113 static final Status SUBCHANNEL_SHUTDOWN_STATUS = 114 Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked"); 115 116 private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); 117 private final String target; 118 private final NameResolver.Factory nameResolverFactory; 119 private final Attributes nameResolverParams; 120 private final LoadBalancer.Factory loadBalancerFactory; 121 private final ClientTransportFactory transportFactory; 122 private final Executor executor; 123 private final ObjectPool<? extends Executor> executorPool; 124 private final ObjectPool<? extends Executor> oobExecutorPool; 125 private final TimeProvider timeProvider; 126 private final int maxTraceEvents; 127 128 private final ChannelExecutor channelExecutor = new PanicChannelExecutor(); 129 130 private boolean fullStreamDecompression; 131 132 private final DecompressorRegistry decompressorRegistry; 133 private final CompressorRegistry compressorRegistry; 134 135 private final Supplier<Stopwatch> stopwatchSupplier; 136 /** The timout before entering idle mode. */ 137 private final long idleTimeoutMillis; 138 139 private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager(); 140 141 private final ServiceConfigInterceptor serviceConfigInterceptor; 142 143 private final BackoffPolicy.Provider backoffPolicyProvider; 144 145 /** 146 * We delegate to this channel, so that we can have interceptors as necessary. If there aren't 147 * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a 148 * {@link RealChannel}. 149 */ 150 private final Channel interceptorChannel; 151 @Nullable private final String userAgent; 152 153 // Only null after channel is terminated. Must be assigned from the channelExecutor. 154 private NameResolver nameResolver; 155 156 // Must be accessed from the channelExecutor. 157 private boolean nameResolverStarted; 158 159 // null when channel is in idle mode. Must be assigned from channelExecutor. 160 @Nullable 161 private LbHelperImpl lbHelper; 162 163 // Must ONLY be assigned from updateSubchannelPicker(), which is called from channelExecutor. 164 // null if channel is in idle mode. 165 @Nullable 166 private volatile SubchannelPicker subchannelPicker; 167 168 // Must be accessed from the channelExecutor 169 private boolean panicMode; 170 171 // Must be mutated from channelExecutor 172 // If any monitoring hook to be added later needs to get a snapshot of this Set, we could 173 // switch to a ConcurrentHashMap. 174 private final Set<InternalSubchannel> subchannels = new HashSet<InternalSubchannel>(16, .75f); 175 176 // Must be mutated from channelExecutor 177 private final Set<OobChannel> oobChannels = new HashSet<OobChannel>(1, .75f); 178 179 // reprocess() must be run from channelExecutor 180 private final DelayedClientTransport delayedTransport; 181 private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry 182 = new UncommittedRetriableStreamsRegistry(); 183 184 // Shutdown states. 185 // 186 // Channel's shutdown process: 187 // 1. shutdown(): stop accepting new calls from applications 188 // 1a shutdown <- true 189 // 1b subchannelPicker <- null 190 // 1c delayedTransport.shutdown() 191 // 2. delayedTransport terminated: stop stream-creation functionality 192 // 2a terminating <- true 193 // 2b loadBalancer.shutdown() 194 // * LoadBalancer will shutdown subchannels and OOB channels 195 // 2c loadBalancer <- null 196 // 2d nameResolver.shutdown() 197 // 2e nameResolver <- null 198 // 3. All subchannels and OOB channels terminated: Channel considered terminated 199 200 private final AtomicBoolean shutdown = new AtomicBoolean(false); 201 // Must only be mutated and read from channelExecutor 202 private boolean shutdownNowed; 203 // Must be mutated from channelExecutor 204 private volatile boolean terminating; 205 // Must be mutated from channelExecutor 206 private volatile boolean terminated; 207 private final CountDownLatch terminatedLatch = new CountDownLatch(1); 208 209 private final CallTracer.Factory callTracerFactory; 210 private final CallTracer channelCallTracer; 211 @CheckForNull 212 private final ChannelTracer channelTracer; 213 private final InternalChannelz channelz; 214 @CheckForNull 215 private Boolean haveBackends; // a flag for doing channel tracing when flipped 216 @Nullable 217 private Map<String, Object> lastServiceConfig; // used for channel tracing when value changed 218 219 // One instance per channel. 220 private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); 221 222 @Nullable 223 private Throttle throttle; 224 225 private final long perRpcBufferLimit; 226 private final long channelBufferLimit; 227 228 // Temporary false flag that can skip the retry code path. 229 private final boolean retryEnabled; 230 231 // Called from channelExecutor 232 private final ManagedClientTransport.Listener delayedTransportListener = 233 new DelayedTransportListener(); 234 235 // Must be called from channelExecutor maybeShutdownNowSubchannels()236 private void maybeShutdownNowSubchannels() { 237 if (shutdownNowed) { 238 for (InternalSubchannel subchannel : subchannels) { 239 subchannel.shutdownNow(SHUTDOWN_NOW_STATUS); 240 } 241 for (OobChannel oobChannel : oobChannels) { 242 oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS); 243 } 244 } 245 } 246 247 // Must be accessed from channelExecutor 248 @VisibleForTesting 249 final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator(); 250 251 @Override getStats()252 public ListenableFuture<ChannelStats> getStats() { 253 final SettableFuture<ChannelStats> ret = SettableFuture.create(); 254 final class StatsFetcher implements Runnable { 255 @Override 256 public void run() { 257 ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder(); 258 channelCallTracer.updateBuilder(builder); 259 if (channelTracer != null) { 260 channelTracer.updateBuilder(builder); 261 } 262 builder.setTarget(target).setState(channelStateManager.getState()); 263 List<InternalWithLogId> children = new ArrayList<>(); 264 children.addAll(subchannels); 265 children.addAll(oobChannels); 266 builder.setSubchannels(children); 267 ret.set(builder.build()); 268 } 269 } 270 271 // subchannels and oobchannels can only be accessed from channelExecutor 272 channelExecutor.executeLater(new StatsFetcher()).drain(); 273 return ret; 274 } 275 276 @Override getLogId()277 public InternalLogId getLogId() { 278 return logId; 279 } 280 281 // Run from channelExecutor 282 private class IdleModeTimer implements Runnable { 283 284 @Override run()285 public void run() { 286 enterIdleMode(); 287 } 288 } 289 290 // Must be called from channelExecutor shutdownNameResolverAndLoadBalancer(boolean verifyActive)291 private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) { 292 if (verifyActive) { 293 checkState(nameResolver != null, "nameResolver is null"); 294 checkState(lbHelper != null, "lbHelper is null"); 295 } 296 if (nameResolver != null) { 297 cancelNameResolverBackoff(); 298 nameResolver.shutdown(); 299 nameResolver = null; 300 nameResolverStarted = false; 301 } 302 if (lbHelper != null) { 303 lbHelper.lb.shutdown(); 304 lbHelper = null; 305 } 306 subchannelPicker = null; 307 } 308 309 /** 310 * Make the channel exit idle mode, if it's in it. 311 * 312 * <p>Must be called from channelExecutor 313 */ 314 @VisibleForTesting exitIdleMode()315 void exitIdleMode() { 316 if (shutdown.get() || panicMode) { 317 return; 318 } 319 if (inUseStateAggregator.isInUse()) { 320 // Cancel the timer now, so that a racing due timer will not put Channel on idleness 321 // when the caller of exitIdleMode() is about to use the returned loadBalancer. 322 cancelIdleTimer(false); 323 } else { 324 // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while 325 // isInUse() == false, in which case we still need to schedule the timer. 326 rescheduleIdleTimer(); 327 } 328 if (lbHelper != null) { 329 return; 330 } 331 logger.log(Level.FINE, "[{0}] Exiting idle mode", getLogId()); 332 lbHelper = new LbHelperImpl(nameResolver); 333 lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper); 334 335 NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper); 336 try { 337 nameResolver.start(listener); 338 nameResolverStarted = true; 339 } catch (Throwable t) { 340 listener.onError(Status.fromThrowable(t)); 341 } 342 } 343 344 // Must be run from channelExecutor enterIdleMode()345 private void enterIdleMode() { 346 logger.log(Level.FINE, "[{0}] Entering idle mode", getLogId()); 347 // nameResolver and loadBalancer are guaranteed to be non-null. If any of them were null, 348 // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown() 349 // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of 350 // which are bugs. 351 shutdownNameResolverAndLoadBalancer(true); 352 delayedTransport.reprocess(null); 353 nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); 354 if (channelTracer != null) { 355 channelTracer.reportEvent( 356 new ChannelTrace.Event.Builder() 357 .setDescription("Entering IDLE state") 358 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 359 .setTimestampNanos(timeProvider.currentTimeNanos()) 360 .build()); 361 } 362 channelStateManager.gotoState(IDLE); 363 if (inUseStateAggregator.isInUse()) { 364 exitIdleMode(); 365 } 366 } 367 368 // Must be run from channelExecutor cancelIdleTimer(boolean permanent)369 private void cancelIdleTimer(boolean permanent) { 370 idleTimer.cancel(permanent); 371 } 372 373 // Always run from channelExecutor rescheduleIdleTimer()374 private void rescheduleIdleTimer() { 375 if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { 376 return; 377 } 378 idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS); 379 } 380 381 // Run from channelExecutor 382 @VisibleForTesting 383 class NameResolverRefresh implements Runnable { 384 // Only mutated from channelExecutor 385 boolean cancelled; 386 387 @Override run()388 public void run() { 389 if (cancelled) { 390 // Race detected: this task was scheduled on channelExecutor before 391 // cancelNameResolverBackoff() could cancel the timer. 392 return; 393 } 394 nameResolverRefreshFuture = null; 395 nameResolverRefresh = null; 396 if (nameResolver != null) { 397 nameResolver.refresh(); 398 } 399 } 400 } 401 402 // Must be used from channelExecutor 403 @Nullable private ScheduledFuture<?> nameResolverRefreshFuture; 404 // Must be used from channelExecutor 405 @Nullable private NameResolverRefresh nameResolverRefresh; 406 // The policy to control backoff between name resolution attempts. Non-null when an attempt is 407 // scheduled. Must be used from channelExecutor 408 @Nullable private BackoffPolicy nameResolverBackoffPolicy; 409 410 // Must be run from channelExecutor cancelNameResolverBackoff()411 private void cancelNameResolverBackoff() { 412 if (nameResolverRefreshFuture != null) { 413 nameResolverRefreshFuture.cancel(false); 414 nameResolverRefresh.cancelled = true; 415 nameResolverRefreshFuture = null; 416 nameResolverRefresh = null; 417 nameResolverBackoffPolicy = null; 418 } 419 } 420 421 private final class ChannelTransportProvider implements ClientTransportProvider { 422 @Override get(PickSubchannelArgs args)423 public ClientTransport get(PickSubchannelArgs args) { 424 SubchannelPicker pickerCopy = subchannelPicker; 425 if (shutdown.get()) { 426 // If channel is shut down, delayedTransport is also shut down which will fail the stream 427 // properly. 428 return delayedTransport; 429 } 430 if (pickerCopy == null) { 431 final class ExitIdleModeForTransport implements Runnable { 432 @Override 433 public void run() { 434 exitIdleMode(); 435 } 436 } 437 438 channelExecutor.executeLater(new ExitIdleModeForTransport()).drain(); 439 return delayedTransport; 440 } 441 // There is no need to reschedule the idle timer here. 442 // 443 // pickerCopy != null, which means idle timer has not expired when this method starts. 444 // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer 445 // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after 446 // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it. 447 // 448 // In most cases the idle timer is scheduled to fire after the transport has created the 449 // stream, which would have reported in-use state to the channel that would have cancelled 450 // the idle timer. 451 PickResult pickResult = pickerCopy.pickSubchannel(args); 452 ClientTransport transport = GrpcUtil.getTransportFromPickResult( 453 pickResult, args.getCallOptions().isWaitForReady()); 454 if (transport != null) { 455 return transport; 456 } 457 return delayedTransport; 458 } 459 460 @Override newRetriableStream( final MethodDescriptor<ReqT, ?> method, final CallOptions callOptions, final Metadata headers, final Context context)461 public <ReqT> RetriableStream<ReqT> newRetriableStream( 462 final MethodDescriptor<ReqT, ?> method, 463 final CallOptions callOptions, 464 final Metadata headers, 465 final Context context) { 466 checkState(retryEnabled, "retry should be enabled"); 467 final class RetryStream extends RetriableStream<ReqT> { 468 RetryStream() { 469 super( 470 method, 471 headers, 472 channelBufferUsed, 473 perRpcBufferLimit, 474 channelBufferLimit, 475 getCallExecutor(callOptions), 476 transportFactory.getScheduledExecutorService(), 477 callOptions.getOption(RETRY_POLICY_KEY), 478 callOptions.getOption(HEDGING_POLICY_KEY), 479 throttle); 480 } 481 482 @Override 483 Status prestart() { 484 return uncommittedRetriableStreamsRegistry.add(this); 485 } 486 487 @Override 488 void postCommit() { 489 uncommittedRetriableStreamsRegistry.remove(this); 490 } 491 492 @Override 493 ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) { 494 CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory); 495 ClientTransport transport = 496 get(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); 497 Context origContext = context.attach(); 498 try { 499 return transport.newStream(method, newHeaders, newOptions); 500 } finally { 501 context.detach(origContext); 502 } 503 } 504 } 505 506 return new RetryStream(); 507 } 508 } 509 510 private final ClientTransportProvider transportProvider = new ChannelTransportProvider(); 511 512 private final Rescheduler idleTimer; 513 ManagedChannelImpl( AbstractManagedChannelImplBuilder<?> builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, ObjectPool<? extends Executor> oobExecutorPool, Supplier<Stopwatch> stopwatchSupplier, List<ClientInterceptor> interceptors, final TimeProvider timeProvider)514 ManagedChannelImpl( 515 AbstractManagedChannelImplBuilder<?> builder, 516 ClientTransportFactory clientTransportFactory, 517 BackoffPolicy.Provider backoffPolicyProvider, 518 ObjectPool<? extends Executor> oobExecutorPool, 519 Supplier<Stopwatch> stopwatchSupplier, 520 List<ClientInterceptor> interceptors, 521 final TimeProvider timeProvider) { 522 this.target = checkNotNull(builder.target, "target"); 523 this.nameResolverFactory = builder.getNameResolverFactory(); 524 this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams"); 525 this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); 526 this.timeProvider = checkNotNull(timeProvider, "timeProvider"); 527 maxTraceEvents = builder.maxTraceEvents; 528 if (maxTraceEvents > 0) { 529 long currentTimeNanos = timeProvider.currentTimeNanos(); 530 channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel"); 531 } else { 532 channelTracer = null; 533 } 534 if (builder.loadBalancerFactory == null) { 535 this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(channelTracer, timeProvider); 536 } else { 537 this.loadBalancerFactory = builder.loadBalancerFactory; 538 } 539 this.executorPool = checkNotNull(builder.executorPool, "executorPool"); 540 this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool"); 541 this.executor = checkNotNull(executorPool.getObject(), "executor"); 542 this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor); 543 this.delayedTransport.start(delayedTransportListener); 544 this.backoffPolicyProvider = backoffPolicyProvider; 545 this.transportFactory = 546 new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); 547 this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; 548 serviceConfigInterceptor = new ServiceConfigInterceptor( 549 retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); 550 Channel channel = new RealChannel(nameResolver.getServiceAuthority()); 551 channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor); 552 if (builder.binlog != null) { 553 channel = builder.binlog.wrapChannel(channel); 554 } 555 this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors); 556 this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); 557 if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { 558 this.idleTimeoutMillis = builder.idleTimeoutMillis; 559 } else { 560 checkArgument( 561 builder.idleTimeoutMillis 562 >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS, 563 "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis); 564 this.idleTimeoutMillis = builder.idleTimeoutMillis; 565 } 566 567 final class AutoDrainChannelExecutor implements Executor { 568 569 @Override 570 public void execute(Runnable command) { 571 channelExecutor.executeLater(command); 572 channelExecutor.drain(); 573 } 574 } 575 576 idleTimer = new Rescheduler( 577 new IdleModeTimer(), 578 new AutoDrainChannelExecutor(), 579 transportFactory.getScheduledExecutorService(), 580 stopwatchSupplier.get()); 581 this.fullStreamDecompression = builder.fullStreamDecompression; 582 this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry"); 583 this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry"); 584 this.userAgent = builder.userAgent; 585 586 this.channelBufferLimit = builder.retryBufferSize; 587 this.perRpcBufferLimit = builder.perRpcBufferLimit; 588 final class ChannelCallTracerFactory implements CallTracer.Factory { 589 @Override 590 public CallTracer create() { 591 return new CallTracer(timeProvider); 592 } 593 } 594 595 this.callTracerFactory = new ChannelCallTracerFactory(); 596 channelCallTracer = callTracerFactory.create(); 597 this.channelz = checkNotNull(builder.channelz); 598 channelz.addRootChannel(this); 599 600 logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target}); 601 } 602 603 @VisibleForTesting getNameResolver(String target, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams)604 static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, 605 Attributes nameResolverParams) { 606 // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending 607 // "dns:///". 608 URI targetUri = null; 609 StringBuilder uriSyntaxErrors = new StringBuilder(); 610 try { 611 targetUri = new URI(target); 612 // For "localhost:8080" this would likely cause newNameResolver to return null, because 613 // "localhost" is parsed as the scheme. Will fall into the next branch and try 614 // "dns:///localhost:8080". 615 } catch (URISyntaxException e) { 616 // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234. 617 uriSyntaxErrors.append(e.getMessage()); 618 } 619 if (targetUri != null) { 620 NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams); 621 if (resolver != null) { 622 return resolver; 623 } 624 // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an 625 // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080" 626 } 627 628 // If we reached here, the targetUri couldn't be used. 629 if (!URI_PATTERN.matcher(target).matches()) { 630 // It doesn't look like a URI target. Maybe it's an authority string. Try with the default 631 // scheme from the factory. 632 try { 633 targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null); 634 } catch (URISyntaxException e) { 635 // Should not be possible. 636 throw new IllegalArgumentException(e); 637 } 638 NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams); 639 if (resolver != null) { 640 return resolver; 641 } 642 } 643 throw new IllegalArgumentException(String.format( 644 "cannot find a NameResolver for %s%s", 645 target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : "")); 646 } 647 648 /** 649 * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately 650 * cancelled. 651 */ 652 @Override shutdown()653 public ManagedChannelImpl shutdown() { 654 logger.log(Level.FINE, "[{0}] shutdown() called", getLogId()); 655 if (!shutdown.compareAndSet(false, true)) { 656 return this; 657 } 658 659 // Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible. 660 // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside 661 // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the 662 // channelExecutor's queue and should not be blocked, so we do not drain() immediately here. 663 final class Shutdown implements Runnable { 664 @Override 665 public void run() { 666 if (channelTracer != null) { 667 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 668 .setDescription("Entering SHUTDOWN state") 669 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 670 .setTimestampNanos(timeProvider.currentTimeNanos()) 671 .build()); 672 } 673 channelStateManager.gotoState(SHUTDOWN); 674 } 675 } 676 677 channelExecutor.executeLater(new Shutdown()); 678 679 uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); 680 final class CancelIdleTimer implements Runnable { 681 @Override 682 public void run() { 683 cancelIdleTimer(/* permanent= */ true); 684 } 685 } 686 687 channelExecutor.executeLater(new CancelIdleTimer()).drain(); 688 logger.log(Level.FINE, "[{0}] Shutting down", getLogId()); 689 return this; 690 } 691 692 /** 693 * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although 694 * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely 695 * return {@code false} immediately after this method returns. 696 */ 697 @Override shutdownNow()698 public ManagedChannelImpl shutdownNow() { 699 logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId()); 700 shutdown(); 701 uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); 702 final class ShutdownNow implements Runnable { 703 @Override 704 public void run() { 705 if (shutdownNowed) { 706 return; 707 } 708 shutdownNowed = true; 709 maybeShutdownNowSubchannels(); 710 } 711 } 712 713 channelExecutor.executeLater(new ShutdownNow()).drain(); 714 return this; 715 } 716 717 // Called from channelExecutor 718 @VisibleForTesting panic(final Throwable t)719 void panic(final Throwable t) { 720 if (panicMode) { 721 // Preserve the first panic information 722 return; 723 } 724 panicMode = true; 725 cancelIdleTimer(/* permanent= */ true); 726 shutdownNameResolverAndLoadBalancer(false); 727 final class PanicSubchannelPicker extends SubchannelPicker { 728 private final PickResult panicPickResult = 729 PickResult.withDrop( 730 Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t)); 731 732 @Override 733 public PickResult pickSubchannel(PickSubchannelArgs args) { 734 return panicPickResult; 735 } 736 } 737 738 updateSubchannelPicker(new PanicSubchannelPicker()); 739 if (channelTracer != null) { 740 channelTracer.reportEvent( 741 new ChannelTrace.Event.Builder() 742 .setDescription("Entering TRANSIENT_FAILURE state") 743 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 744 .setTimestampNanos(timeProvider.currentTimeNanos()) 745 .build()); 746 } 747 channelStateManager.gotoState(TRANSIENT_FAILURE); 748 } 749 750 // Called from channelExecutor updateSubchannelPicker(SubchannelPicker newPicker)751 private void updateSubchannelPicker(SubchannelPicker newPicker) { 752 subchannelPicker = newPicker; 753 delayedTransport.reprocess(newPicker); 754 } 755 756 @Override isShutdown()757 public boolean isShutdown() { 758 return shutdown.get(); 759 } 760 761 @Override awaitTermination(long timeout, TimeUnit unit)762 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 763 return terminatedLatch.await(timeout, unit); 764 } 765 766 @Override isTerminated()767 public boolean isTerminated() { 768 return terminated; 769 } 770 771 /* 772 * Creates a new outgoing call on the channel. 773 */ 774 @Override newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)775 public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, 776 CallOptions callOptions) { 777 return interceptorChannel.newCall(method, callOptions); 778 } 779 780 @Override authority()781 public String authority() { 782 return interceptorChannel.authority(); 783 } 784 getCallExecutor(CallOptions callOptions)785 private Executor getCallExecutor(CallOptions callOptions) { 786 Executor executor = callOptions.getExecutor(); 787 if (executor == null) { 788 executor = this.executor; 789 } 790 return executor; 791 } 792 793 private class RealChannel extends Channel { 794 // Set when the NameResolver is initially created. When we create a new NameResolver for the 795 // same target, the new instance must have the same value. 796 private final String authority; 797 RealChannel(String authority)798 private RealChannel(String authority) { 799 this.authority = checkNotNull(authority, "authority"); 800 } 801 802 @Override newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)803 public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, 804 CallOptions callOptions) { 805 return new ClientCallImpl<ReqT, RespT>( 806 method, 807 getCallExecutor(callOptions), 808 callOptions, 809 transportProvider, 810 terminated ? null : transportFactory.getScheduledExecutorService(), 811 channelCallTracer, 812 retryEnabled) 813 .setFullStreamDecompression(fullStreamDecompression) 814 .setDecompressorRegistry(decompressorRegistry) 815 .setCompressorRegistry(compressorRegistry); 816 } 817 818 @Override authority()819 public String authority() { 820 return authority; 821 } 822 } 823 824 /** 825 * Terminate the channel if termination conditions are met. 826 */ 827 // Must be run from channelExecutor maybeTerminateChannel()828 private void maybeTerminateChannel() { 829 if (terminated) { 830 return; 831 } 832 if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) { 833 logger.log(Level.FINE, "[{0}] Terminated", getLogId()); 834 channelz.removeRootChannel(this); 835 terminated = true; 836 terminatedLatch.countDown(); 837 executorPool.returnObject(executor); 838 // Release the transport factory so that it can deallocate any resources. 839 transportFactory.close(); 840 } 841 } 842 843 @Override getState(boolean requestConnection)844 public ConnectivityState getState(boolean requestConnection) { 845 ConnectivityState savedChannelState = channelStateManager.getState(); 846 if (requestConnection && savedChannelState == IDLE) { 847 final class RequestConnection implements Runnable { 848 @Override 849 public void run() { 850 exitIdleMode(); 851 if (subchannelPicker != null) { 852 subchannelPicker.requestConnection(); 853 } 854 } 855 } 856 857 channelExecutor.executeLater(new RequestConnection()).drain(); 858 } 859 return savedChannelState; 860 } 861 862 @Override notifyWhenStateChanged(final ConnectivityState source, final Runnable callback)863 public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) { 864 final class NotifyStateChanged implements Runnable { 865 @Override 866 public void run() { 867 channelStateManager.notifyWhenStateChanged(callback, executor, source); 868 } 869 } 870 871 channelExecutor.executeLater(new NotifyStateChanged()).drain(); 872 } 873 874 @Override resetConnectBackoff()875 public void resetConnectBackoff() { 876 final class ResetConnectBackoff implements Runnable { 877 @Override 878 public void run() { 879 if (shutdown.get()) { 880 return; 881 } 882 if (nameResolverRefreshFuture != null) { 883 checkState(nameResolverStarted, "name resolver must be started"); 884 cancelNameResolverBackoff(); 885 nameResolver.refresh(); 886 } 887 for (InternalSubchannel subchannel : subchannels) { 888 subchannel.resetConnectBackoff(); 889 } 890 for (OobChannel oobChannel : oobChannels) { 891 oobChannel.resetConnectBackoff(); 892 } 893 } 894 } 895 896 channelExecutor.executeLater(new ResetConnectBackoff()).drain(); 897 } 898 899 @Override enterIdle()900 public void enterIdle() { 901 final class PrepareToLoseNetworkRunnable implements Runnable { 902 @Override 903 public void run() { 904 if (shutdown.get() || lbHelper == null) { 905 return; 906 } 907 cancelIdleTimer(/* permanent= */ false); 908 enterIdleMode(); 909 } 910 } 911 912 channelExecutor.executeLater(new PrepareToLoseNetworkRunnable()).drain(); 913 } 914 915 /** 916 * A registry that prevents channel shutdown from killing existing retry attempts that are in 917 * backoff. 918 */ 919 private final class UncommittedRetriableStreamsRegistry { 920 // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream, 921 // it's worthwhile to look for a lock-free approach. 922 final Object lock = new Object(); 923 924 @GuardedBy("lock") 925 Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>(); 926 927 @GuardedBy("lock") 928 Status shutdownStatus; 929 onShutdown(Status reason)930 void onShutdown(Status reason) { 931 boolean shouldShutdownDelayedTransport = false; 932 synchronized (lock) { 933 if (shutdownStatus != null) { 934 return; 935 } 936 shutdownStatus = reason; 937 // Keep the delayedTransport open until there is no more uncommitted streams, b/c those 938 // retriable streams, which may be in backoff and not using any transport, are already 939 // started RPCs. 940 if (uncommittedRetriableStreams.isEmpty()) { 941 shouldShutdownDelayedTransport = true; 942 } 943 } 944 945 if (shouldShutdownDelayedTransport) { 946 delayedTransport.shutdown(reason); 947 } 948 } 949 onShutdownNow(Status reason)950 void onShutdownNow(Status reason) { 951 onShutdown(reason); 952 Collection<ClientStream> streams; 953 954 synchronized (lock) { 955 streams = new ArrayList<>(uncommittedRetriableStreams); 956 } 957 958 for (ClientStream stream : streams) { 959 stream.cancel(reason); 960 } 961 delayedTransport.shutdownNow(reason); 962 } 963 964 /** 965 * Registers a RetriableStream and return null if not shutdown, otherwise just returns the 966 * shutdown Status. 967 */ 968 @Nullable add(RetriableStream<?> retriableStream)969 Status add(RetriableStream<?> retriableStream) { 970 synchronized (lock) { 971 if (shutdownStatus != null) { 972 return shutdownStatus; 973 } 974 uncommittedRetriableStreams.add(retriableStream); 975 return null; 976 } 977 } 978 remove(RetriableStream<?> retriableStream)979 void remove(RetriableStream<?> retriableStream) { 980 Status shutdownStatusCopy = null; 981 982 synchronized (lock) { 983 uncommittedRetriableStreams.remove(retriableStream); 984 if (uncommittedRetriableStreams.isEmpty()) { 985 shutdownStatusCopy = shutdownStatus; 986 // Because retriable transport is long-lived, we take this opportunity to down-size the 987 // hashmap. 988 uncommittedRetriableStreams = new HashSet<>(); 989 } 990 } 991 992 if (shutdownStatusCopy != null) { 993 delayedTransport.shutdown(shutdownStatusCopy); 994 } 995 } 996 } 997 998 private class LbHelperImpl extends LoadBalancer.Helper { 999 LoadBalancer lb; 1000 final NameResolver nr; 1001 LbHelperImpl(NameResolver nr)1002 LbHelperImpl(NameResolver nr) { 1003 this.nr = checkNotNull(nr, "NameResolver"); 1004 } 1005 1006 // Must be called from channelExecutor handleInternalSubchannelState(ConnectivityStateInfo newState)1007 private void handleInternalSubchannelState(ConnectivityStateInfo newState) { 1008 if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { 1009 nr.refresh(); 1010 } 1011 } 1012 1013 @Override createSubchannel( List<EquivalentAddressGroup> addressGroups, Attributes attrs)1014 public AbstractSubchannel createSubchannel( 1015 List<EquivalentAddressGroup> addressGroups, Attributes attrs) { 1016 checkNotNull(addressGroups, "addressGroups"); 1017 checkNotNull(attrs, "attrs"); 1018 // TODO(ejona): can we be even stricter? Like loadBalancer == null? 1019 checkState(!terminated, "Channel is terminated"); 1020 final SubchannelImpl subchannel = new SubchannelImpl(attrs); 1021 ChannelTracer subchannelTracer = null; 1022 long subchannelCreationTime = timeProvider.currentTimeNanos(); 1023 if (maxTraceEvents > 0) { 1024 subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel"); 1025 } 1026 1027 final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback { 1028 // All callbacks are run in channelExecutor 1029 @Override 1030 void onTerminated(InternalSubchannel is) { 1031 subchannels.remove(is); 1032 channelz.removeSubchannel(is); 1033 maybeTerminateChannel(); 1034 } 1035 1036 @Override 1037 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 1038 handleInternalSubchannelState(newState); 1039 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 1040 if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { 1041 lb.handleSubchannelState(subchannel, newState); 1042 } 1043 } 1044 1045 @Override 1046 void onInUse(InternalSubchannel is) { 1047 inUseStateAggregator.updateObjectInUse(is, true); 1048 } 1049 1050 @Override 1051 void onNotInUse(InternalSubchannel is) { 1052 inUseStateAggregator.updateObjectInUse(is, false); 1053 } 1054 } 1055 1056 final InternalSubchannel internalSubchannel = new InternalSubchannel( 1057 addressGroups, 1058 authority(), 1059 userAgent, 1060 backoffPolicyProvider, 1061 transportFactory, 1062 transportFactory.getScheduledExecutorService(), 1063 stopwatchSupplier, 1064 channelExecutor, 1065 new ManagedInternalSubchannelCallback(), 1066 channelz, 1067 callTracerFactory.create(), 1068 subchannelTracer, 1069 timeProvider); 1070 if (channelTracer != null) { 1071 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1072 .setDescription("Child channel created") 1073 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1074 .setTimestampNanos(subchannelCreationTime) 1075 .setSubchannelRef(internalSubchannel) 1076 .build()); 1077 } 1078 channelz.addSubchannel(internalSubchannel); 1079 subchannel.subchannel = internalSubchannel; 1080 logger.log(Level.FINE, "[{0}] {1} created for {2}", 1081 new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups}); 1082 1083 final class AddSubchannel implements Runnable { 1084 @Override 1085 public void run() { 1086 if (terminating) { 1087 // Because runSerialized() doesn't guarantee the runnable has been executed upon when 1088 // returning, the subchannel may still be returned to the balancer without being 1089 // shutdown even if "terminating" is already true. The subchannel will not be used in 1090 // this case, because delayed transport has terminated when "terminating" becomes 1091 // true, and no more requests will be sent to balancer beyond this point. 1092 internalSubchannel.shutdown(SHUTDOWN_STATUS); 1093 } 1094 if (!terminated) { 1095 // If channel has not terminated, it will track the subchannel and block termination 1096 // for it. 1097 subchannels.add(internalSubchannel); 1098 } 1099 } 1100 } 1101 1102 runSerialized(new AddSubchannel()); 1103 return subchannel; 1104 } 1105 1106 @Override updateBalancingState( final ConnectivityState newState, final SubchannelPicker newPicker)1107 public void updateBalancingState( 1108 final ConnectivityState newState, final SubchannelPicker newPicker) { 1109 checkNotNull(newState, "newState"); 1110 checkNotNull(newPicker, "newPicker"); 1111 final class UpdateBalancingState implements Runnable { 1112 @Override 1113 public void run() { 1114 if (LbHelperImpl.this != lbHelper) { 1115 return; 1116 } 1117 updateSubchannelPicker(newPicker); 1118 // It's not appropriate to report SHUTDOWN state from lb. 1119 // Ignore the case of newState == SHUTDOWN for now. 1120 if (newState != SHUTDOWN) { 1121 if (channelTracer != null) { 1122 channelTracer.reportEvent( 1123 new ChannelTrace.Event.Builder() 1124 .setDescription("Entering " + newState + " state") 1125 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1126 .setTimestampNanos(timeProvider.currentTimeNanos()) 1127 .build()); 1128 } 1129 channelStateManager.gotoState(newState); 1130 } 1131 } 1132 } 1133 1134 runSerialized(new UpdateBalancingState()); 1135 } 1136 1137 @Override updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs)1138 public void updateSubchannelAddresses( 1139 LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) { 1140 checkArgument(subchannel instanceof SubchannelImpl, 1141 "subchannel must have been returned from createSubchannel"); 1142 ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); 1143 } 1144 1145 @Override createOobChannel(EquivalentAddressGroup addressGroup, String authority)1146 public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) { 1147 // TODO(ejona): can we be even stricter? Like terminating? 1148 checkState(!terminated, "Channel is terminated"); 1149 long oobChannelCreationTime = timeProvider.currentTimeNanos(); 1150 ChannelTracer oobChannelTracer = null; 1151 ChannelTracer subchannelTracer = null; 1152 if (channelTracer != null) { 1153 oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel"); 1154 } 1155 final OobChannel oobChannel = new OobChannel( 1156 authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), 1157 channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider); 1158 if (channelTracer != null) { 1159 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1160 .setDescription("Child channel created") 1161 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1162 .setTimestampNanos(oobChannelCreationTime) 1163 .setChannelRef(oobChannel) 1164 .build()); 1165 subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel"); 1166 } 1167 final class ManagedOobChannelCallback extends InternalSubchannel.Callback { 1168 @Override 1169 void onTerminated(InternalSubchannel is) { 1170 oobChannels.remove(oobChannel); 1171 channelz.removeSubchannel(is); 1172 oobChannel.handleSubchannelTerminated(); 1173 maybeTerminateChannel(); 1174 } 1175 1176 @Override 1177 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 1178 handleInternalSubchannelState(newState); 1179 oobChannel.handleSubchannelStateChange(newState); 1180 } 1181 } 1182 1183 final InternalSubchannel internalSubchannel = new InternalSubchannel( 1184 Collections.singletonList(addressGroup), 1185 authority, userAgent, backoffPolicyProvider, transportFactory, 1186 transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, 1187 // All callback methods are run from channelExecutor 1188 new ManagedOobChannelCallback(), 1189 channelz, 1190 callTracerFactory.create(), 1191 subchannelTracer, 1192 timeProvider); 1193 if (oobChannelTracer != null) { 1194 oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() 1195 .setDescription("Child channel created") 1196 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1197 .setTimestampNanos(oobChannelCreationTime) 1198 .setSubchannelRef(internalSubchannel) 1199 .build()); 1200 } 1201 channelz.addSubchannel(oobChannel); 1202 channelz.addSubchannel(internalSubchannel); 1203 oobChannel.setSubchannel(internalSubchannel); 1204 final class AddOobChannel implements Runnable { 1205 @Override 1206 public void run() { 1207 if (terminating) { 1208 oobChannel.shutdown(); 1209 } 1210 if (!terminated) { 1211 // If channel has not terminated, it will track the subchannel and block termination 1212 // for it. 1213 oobChannels.add(oobChannel); 1214 } 1215 } 1216 } 1217 1218 runSerialized(new AddOobChannel()); 1219 return oobChannel; 1220 } 1221 1222 @Override updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag)1223 public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { 1224 checkArgument(channel instanceof OobChannel, 1225 "channel must have been returned from createOobChannel"); 1226 ((OobChannel) channel).updateAddresses(eag); 1227 } 1228 1229 @Override getAuthority()1230 public String getAuthority() { 1231 return ManagedChannelImpl.this.authority(); 1232 } 1233 1234 @Override getNameResolverFactory()1235 public NameResolver.Factory getNameResolverFactory() { 1236 return nameResolverFactory; 1237 } 1238 1239 @Override runSerialized(Runnable task)1240 public void runSerialized(Runnable task) { 1241 channelExecutor.executeLater(task).drain(); 1242 } 1243 } 1244 1245 private class NameResolverListenerImpl implements NameResolver.Listener { 1246 final LbHelperImpl helper; 1247 NameResolverListenerImpl(LbHelperImpl helperImpl)1248 NameResolverListenerImpl(LbHelperImpl helperImpl) { 1249 this.helper = helperImpl; 1250 } 1251 1252 @Override onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config)1253 public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) { 1254 if (servers.isEmpty()) { 1255 onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list")); 1256 return; 1257 } 1258 if (logger.isLoggable(Level.FINE)) { 1259 logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}", 1260 new Object[]{getLogId(), servers, config}); 1261 } 1262 1263 if (channelTracer != null && (haveBackends == null || !haveBackends)) { 1264 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1265 .setDescription("Address resolved: " + servers) 1266 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1267 .setTimestampNanos(timeProvider.currentTimeNanos()) 1268 .build()); 1269 haveBackends = true; 1270 } 1271 final Map<String, Object> serviceConfig = 1272 config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); 1273 if (channelTracer != null && serviceConfig != null 1274 && !serviceConfig.equals(lastServiceConfig)) { 1275 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1276 .setDescription("Service config changed") 1277 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1278 .setTimestampNanos(timeProvider.currentTimeNanos()) 1279 .build()); 1280 lastServiceConfig = serviceConfig; 1281 } 1282 1283 final class NamesResolved implements Runnable { 1284 @Override 1285 public void run() { 1286 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 1287 if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { 1288 return; 1289 } 1290 1291 nameResolverBackoffPolicy = null; 1292 1293 if (serviceConfig != null) { 1294 try { 1295 serviceConfigInterceptor.handleUpdate(serviceConfig); 1296 if (retryEnabled) { 1297 throttle = getThrottle(config); 1298 } 1299 } catch (RuntimeException re) { 1300 logger.log( 1301 Level.WARNING, 1302 "[" + getLogId() + "] Unexpected exception from parsing service config", 1303 re); 1304 } 1305 } 1306 1307 helper.lb.handleResolvedAddressGroups(servers, config); 1308 } 1309 } 1310 1311 helper.runSerialized(new NamesResolved()); 1312 } 1313 1314 @Override onError(final Status error)1315 public void onError(final Status error) { 1316 checkArgument(!error.isOk(), "the error status must not be OK"); 1317 logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}", 1318 new Object[] {getLogId(), error}); 1319 if (channelTracer != null && (haveBackends == null || haveBackends)) { 1320 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1321 .setDescription("Failed to resolve name") 1322 .setSeverity(ChannelTrace.Event.Severity.CT_WARNING) 1323 .setTimestampNanos(timeProvider.currentTimeNanos()) 1324 .build()); 1325 haveBackends = false; 1326 } 1327 final class NameResolverErrorHandler implements Runnable { 1328 @Override 1329 public void run() { 1330 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 1331 if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { 1332 return; 1333 } 1334 helper.lb.handleNameResolutionError(error); 1335 if (nameResolverRefreshFuture != null) { 1336 // The name resolver may invoke onError multiple times, but we only want to 1337 // schedule one backoff attempt 1338 // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we 1339 // want to reset the backoff interval upon repeated onError() calls 1340 return; 1341 } 1342 if (nameResolverBackoffPolicy == null) { 1343 nameResolverBackoffPolicy = backoffPolicyProvider.get(); 1344 } 1345 long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos(); 1346 if (logger.isLoggable(Level.FINE)) { 1347 logger.log( 1348 Level.FINE, 1349 "[{0}] Scheduling DNS resolution backoff for {1} ns", 1350 new Object[] {logId, delayNanos}); 1351 } 1352 nameResolverRefresh = new NameResolverRefresh(); 1353 nameResolverRefreshFuture = 1354 transportFactory 1355 .getScheduledExecutorService() 1356 .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS); 1357 } 1358 } 1359 1360 channelExecutor.executeLater(new NameResolverErrorHandler()).drain(); 1361 } 1362 } 1363 1364 @Nullable getThrottle(Attributes config)1365 private static Throttle getThrottle(Attributes config) { 1366 return ServiceConfigUtil.getThrottlePolicy( 1367 config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)); 1368 } 1369 1370 private final class SubchannelImpl extends AbstractSubchannel { 1371 // Set right after SubchannelImpl is created. 1372 InternalSubchannel subchannel; 1373 final Object shutdownLock = new Object(); 1374 final Attributes attrs; 1375 1376 @GuardedBy("shutdownLock") 1377 boolean shutdownRequested; 1378 @GuardedBy("shutdownLock") 1379 ScheduledFuture<?> delayedShutdownTask; 1380 SubchannelImpl(Attributes attrs)1381 SubchannelImpl(Attributes attrs) { 1382 this.attrs = checkNotNull(attrs, "attrs"); 1383 } 1384 1385 @Override obtainActiveTransport()1386 ClientTransport obtainActiveTransport() { 1387 return subchannel.obtainActiveTransport(); 1388 } 1389 1390 @Override getInternalSubchannel()1391 InternalInstrumented<ChannelStats> getInternalSubchannel() { 1392 return subchannel; 1393 } 1394 1395 @Override shutdown()1396 public void shutdown() { 1397 synchronized (shutdownLock) { 1398 if (shutdownRequested) { 1399 if (terminating && delayedShutdownTask != null) { 1400 // shutdown() was previously called when terminating == false, thus a delayed shutdown() 1401 // was scheduled. Now since terminating == true, We should expedite the shutdown. 1402 delayedShutdownTask.cancel(false); 1403 delayedShutdownTask = null; 1404 // Will fall through to the subchannel.shutdown() at the end. 1405 } else { 1406 return; 1407 } 1408 } else { 1409 shutdownRequested = true; 1410 } 1411 // Add a delay to shutdown to deal with the race between 1) a transport being picked and 1412 // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g., 1413 // because of address change, or because LoadBalancer is shutdown by Channel entering idle 1414 // mode). If (2) wins, the app will see a spurious error. We work around this by delaying 1415 // shutdown of Subchannel for a few seconds here. 1416 // 1417 // TODO(zhangkun83): consider a better approach 1418 // (https://github.com/grpc/grpc-java/issues/2562). 1419 if (!terminating) { 1420 final class ShutdownSubchannel implements Runnable { 1421 @Override 1422 public void run() { 1423 subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); 1424 } 1425 } 1426 1427 delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule( 1428 new LogExceptionRunnable( 1429 new ShutdownSubchannel()), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); 1430 return; 1431 } 1432 } 1433 // When terminating == true, no more real streams will be created. It's safe and also 1434 // desirable to shutdown timely. 1435 subchannel.shutdown(SHUTDOWN_STATUS); 1436 } 1437 1438 @Override requestConnection()1439 public void requestConnection() { 1440 subchannel.obtainActiveTransport(); 1441 } 1442 1443 @Override getAllAddresses()1444 public List<EquivalentAddressGroup> getAllAddresses() { 1445 return subchannel.getAddressGroups(); 1446 } 1447 1448 @Override getAttributes()1449 public Attributes getAttributes() { 1450 return attrs; 1451 } 1452 1453 @Override toString()1454 public String toString() { 1455 return subchannel.getLogId().toString(); 1456 } 1457 } 1458 1459 @Override toString()1460 public String toString() { 1461 return MoreObjects.toStringHelper(this) 1462 .add("logId", logId.getId()) 1463 .add("target", target) 1464 .toString(); 1465 } 1466 1467 private final class PanicChannelExecutor extends ChannelExecutor { 1468 @Override handleUncaughtThrowable(Throwable t)1469 void handleUncaughtThrowable(Throwable t) { 1470 super.handleUncaughtThrowable(t); 1471 panic(t); 1472 } 1473 } 1474 1475 /** 1476 * Called from channelExecutor. 1477 */ 1478 private final class DelayedTransportListener implements ManagedClientTransport.Listener { 1479 @Override transportShutdown(Status s)1480 public void transportShutdown(Status s) { 1481 checkState(shutdown.get(), "Channel must have been shut down"); 1482 } 1483 1484 @Override transportReady()1485 public void transportReady() { 1486 // Don't care 1487 } 1488 1489 @Override transportInUse(final boolean inUse)1490 public void transportInUse(final boolean inUse) { 1491 inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); 1492 } 1493 1494 @Override transportTerminated()1495 public void transportTerminated() { 1496 checkState(shutdown.get(), "Channel must have been shut down"); 1497 terminating = true; 1498 shutdownNameResolverAndLoadBalancer(false); 1499 // No need to call channelStateManager since we are already in SHUTDOWN state. 1500 // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them 1501 // here. 1502 maybeShutdownNowSubchannels(); 1503 maybeTerminateChannel(); 1504 } 1505 } 1506 1507 /** 1508 * Must be accessed from channelExecutor. 1509 */ 1510 private final class IdleModeStateAggregator extends InUseStateAggregator<Object> { 1511 @Override handleInUse()1512 void handleInUse() { 1513 exitIdleMode(); 1514 } 1515 1516 @Override handleNotInUse()1517 void handleNotInUse() { 1518 if (shutdown.get()) { 1519 return; 1520 } 1521 rescheduleIdleTimer(); 1522 } 1523 } 1524 } 1525