1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ConnectivityState.IDLE; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY; 26 import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Stopwatch; 31 import com.google.common.base.Supplier; 32 import com.google.common.util.concurrent.ListenableFuture; 33 import com.google.common.util.concurrent.SettableFuture; 34 import io.grpc.Attributes; 35 import io.grpc.CallOptions; 36 import io.grpc.Channel; 37 import io.grpc.ClientCall; 38 import io.grpc.ClientInterceptor; 39 import io.grpc.ClientInterceptors; 40 import io.grpc.ClientStreamTracer; 41 import io.grpc.CompressorRegistry; 42 import io.grpc.ConnectivityState; 43 import io.grpc.ConnectivityStateInfo; 44 import io.grpc.Context; 45 import io.grpc.DecompressorRegistry; 46 import io.grpc.EquivalentAddressGroup; 47 import io.grpc.InternalChannelz; 48 import 
io.grpc.InternalChannelz.ChannelStats;
import io.grpc.InternalChannelz.ChannelTrace;
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
import io.grpc.internal.RetriableStream.Throttle;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

/** A communication channel for making outgoing RPCs. */
@ThreadSafe
final class ManagedChannelImpl extends ManagedChannel implements
    InternalInstrumented<ChannelStats> {
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());

  // Matching this pattern means the target string is a URI target or at least intended to be one.
  // A URI target must be an absolute hierarchical URI.
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
  @VisibleForTesting
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");

  // Sentinel value for idleTimeoutMillis meaning "idle mode disabled"; rescheduleIdleTimer()
  // never arms the idle timer when this value is configured.
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;

  // Delay between the LoadBalancer shutting down a subchannel and the InternalSubchannel actually
  // being shut down; this grace period lets a caller that already picked it still start an RPC.
  @VisibleForTesting
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;

  // Status used to fail calls/streams when shutdownNow() is invoked.
  @VisibleForTesting
  static final Status SHUTDOWN_NOW_STATUS =
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");

  // Status used to fail new calls once shutdown() is invoked.
  @VisibleForTesting
  static final Status SHUTDOWN_STATUS =
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");

  @VisibleForTesting
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");

  private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
  private final String target;
  private final NameResolver.Factory nameResolverFactory;
  private final Attributes nameResolverParams;
  private final LoadBalancer.Factory loadBalancerFactory;
  private final ClientTransportFactory transportFactory;
  private final Executor executor;
  private final ObjectPool<? extends Executor> executorPool;
  private final ObjectPool<? extends Executor> oobExecutorPool;
  private final TimeProvider timeProvider;
  private final int maxTraceEvents;

  // Executor for the channel's internal state; many fields below may only be touched from it.
  // Any throwable that escapes a task it runs is reported and then puts the channel into panic
  // mode via panic(t).
  private final ChannelExecutor channelExecutor = new ChannelExecutor() {
      @Override
      void handleUncaughtThrowable(Throwable t) {
        super.handleUncaughtThrowable(t);
        panic(t);
      }
    };

  private boolean fullStreamDecompression;

  private final DecompressorRegistry decompressorRegistry;
  private final CompressorRegistry compressorRegistry;

  private final Supplier<Stopwatch> stopwatchSupplier;
  /**
   * The timeout before entering idle mode, or {@link #IDLE_TIMEOUT_MILLIS_DISABLE} if idle mode
   * is disabled.
   */
  private final long idleTimeoutMillis;

  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();

  private final ServiceConfigInterceptor serviceConfigInterceptor;

  private final BackoffPolicy.Provider backoffPolicyProvider;

  /**
   * We delegate to this channel, so that we can have interceptors as necessary. From the inside
   * out it is a {@link RealChannel} wrapped by the {@link ServiceConfigInterceptor}, then by the
   * {@link io.grpc.BinaryLog} (when one is configured), then by the user-supplied interceptors.
   * Because the service-config interceptor is always installed, this is never a bare
   * {@link RealChannel}.
   */
  private final Channel interceptorChannel;
  @Nullable private final String userAgent;

  // Null after termination or panic, and transiently null while enterIdleMode() swaps in a fresh
  // resolver. Must be assigned from the channelExecutor.
  private NameResolver nameResolver;

  // Must be accessed from the channelExecutor.
  private boolean nameResolverStarted;

  // null when channel is in idle mode.  Must be assigned from channelExecutor.
  @Nullable
  private LbHelperImpl lbHelper;

  // A new picker must ONLY be installed via updateSubchannelPicker(), which is called from
  // channelExecutor; shutdownNameResolverAndLoadBalancer() (also on channelExecutor) resets it to
  // null. null if channel is in idle mode. Volatile because transportProvider reads it from
  // arbitrary caller threads.
  @Nullable
  private volatile SubchannelPicker subchannelPicker;

  // Must be accessed from the channelExecutor
  private boolean panicMode;

  // Must be mutated from channelExecutor
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
  // switch to a ConcurrentHashMap.
  private final Set<InternalSubchannel> subchannels = new HashSet<InternalSubchannel>(16, .75f);

  // Must be mutated from channelExecutor
  private final Set<OobChannel> oobChannels = new HashSet<OobChannel>(1, .75f);

  // reprocess() must be run from channelExecutor
  private final DelayedClientTransport delayedTransport;
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
      = new UncommittedRetriableStreamsRegistry();

  // Shutdown states.
  //
  // Channel's shutdown process:
  // 1. shutdown(): stop accepting new calls from applications
  //   1a shutdown <- true
  //   1b subchannelPicker <- null
  //   1c delayedTransport.shutdown()
  // 2. delayedTransport terminated: stop stream-creation functionality
  //   2a terminating <- true
  //   2b loadBalancer.shutdown()
  //     * LoadBalancer will shutdown subchannels and OOB channels
  //   2c loadBalancer <- null
  //   2d nameResolver.shutdown()
  //   2e nameResolver <- null
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
  //
  // NOTE(review): in the code shown, subchannelPicker is cleared by
  // shutdownNameResolverAndLoadBalancer() during step 2 rather than by shutdown() itself;
  // transportProvider instead checks the shutdown flag first. Confirm step 1b is still accurate.

  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  // Must only be mutated and read from channelExecutor
  private boolean shutdownNowed;
  // Must be mutated from channelExecutor
  private volatile boolean terminating;
  // Must be mutated from channelExecutor
  private volatile boolean terminated;
  // Counted down once when the channel terminates; backs awaitTermination().
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);

  private final CallTracer.Factory callTracerFactory;
  private final CallTracer channelCallTracer;
  // null when channel tracing is disabled (maxTraceEvents <= 0).
  @CheckForNull
  private final ChannelTracer channelTracer;
  private final InternalChannelz channelz;
  @CheckForNull
  private Boolean haveBackends; // a flag for doing channel tracing when flipped
  @Nullable
  private Map<String, Object> lastServiceConfig; // used for channel tracing when value changed

  // One instance per channel.
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();

  @Nullable
  private Throttle throttle;

  private final long perRpcBufferLimit;
  private final long channelBufferLimit;

  // Temporary false flag that can skip the retry code path.
private final boolean retryEnabled;

  // Listener for delayedTransport's lifecycle. Called from channelExecutor.
  private final ManagedClientTransport.Listener delayedTransportListener =
      new ManagedClientTransport.Listener() {
        @Override
        public void transportShutdown(Status s) {
          // delayedTransport is only ever shut down as part of channel shutdown.
          checkState(shutdown.get(), "Channel must have been shut down");
        }

        @Override
        public void transportReady() {
          // Don't care
        }

        @Override
        public void transportInUse(final boolean inUse) {
          // Buffered (pending) streams count as "in use" and keep the channel out of idle mode.
          inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
        }

        @Override
        public void transportTerminated() {
          // Step 2 of the shutdown process: no more buffered streams, so tear down the
          // name resolver and load balancer.
          checkState(shutdown.get(), "Channel must have been shut down");
          terminating = true;
          shutdownNameResolverAndLoadBalancer(false);
          // No need to call channelStateManager since we are already in SHUTDOWN state.
          // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
          // here.
          maybeShutdownNowSubchannels();
          maybeTerminateChannel();
        }
      };

  // If shutdownNow() has been invoked, forcefully shuts down every tracked subchannel and the
  // internal subchannel of every OOB channel with SHUTDOWN_NOW_STATUS; otherwise does nothing.
  // Must be called from channelExecutor
  private void maybeShutdownNowSubchannels() {
    if (shutdownNowed) {
      for (InternalSubchannel subchannel : subchannels) {
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
      }
      for (OobChannel oobChannel : oobChannels) {
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
      }
    }
  }

  // Tracks whether anything (transports, buffered streams) is in use: becoming in use exits idle
  // mode; becoming unused (while not shut down) re-arms the idle timer.
  // Must be accessed from channelExecutor
  @VisibleForTesting
  final InUseStateAggregator<Object> inUseStateAggregator =
      new InUseStateAggregator<Object>() {
        @Override
        void handleInUse() {
          exitIdleMode();
        }

        @Override
        void handleNotInUse() {
          if (shutdown.get()) {
            return;
          }
          rescheduleIdleTimer();
        }
      };

  /**
   * Returns a future of this channel's channelz statistics: call counts, trace events (if tracing
   * is enabled), target, connectivity state, and the subchannels and OOB channels as children.
   */
  @Override
  public ListenableFuture<ChannelStats> getStats() {
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
    // subchannels and oobchannels can only be accessed from channelExecutor
    channelExecutor.executeLater(new Runnable() {
        @Override
        public void run() {
          ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
          channelCallTracer.updateBuilder(builder);
          if (channelTracer != null) {
            channelTracer.updateBuilder(builder);
          }
          builder.setTarget(target).setState(channelStateManager.getState());
          List<InternalWithLogId> children = new ArrayList<>();
          children.addAll(subchannels);
          children.addAll(oobChannels);
          builder.setSubchannels(children);
          ret.set(builder.build());
        }
      }).drain();
    return ret;
  }

  @Override
  public InternalLogId getLogId() {
    return logId;
  }

  // Fired by idleTimer when the channel has been unused for idleTimeoutMillis.
  // Run from channelExecutor
  private class IdleModeTimer implements Runnable {

    @Override
    public void run() {
      enterIdleMode();
    }
  }

  // Shuts down and clears the name resolver (cancelling any pending re-resolution backoff) and
  // the load balancer, then clears the picker. With verifyActive=true, both must currently exist.
  // Must be called from channelExecutor
  private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
    if (verifyActive) {
      checkState(nameResolver != null, "nameResolver is null");
      checkState(lbHelper != null, "lbHelper is null");
    }
    if (nameResolver != null) {
      cancelNameResolverBackoff();
      nameResolver.shutdown();
      nameResolver = null;
      nameResolverStarted = false;
    }
    if (lbHelper != null) {
      lbHelper.lb.shutdown();
      lbHelper = null;
    }
    subchannelPicker = null;
  }

  /**
   * Make the channel exit idle mode, if it's in it. No-op after shutdown or while in panic mode.
   * Creates the load balancer and starts the name resolver; a throwable from
   * {@code nameResolver.start()} is reported to the resolver listener as an error.
   *
   * <p>Must be called from channelExecutor
   */
  @VisibleForTesting
  void exitIdleMode() {
    if (shutdown.get() || panicMode) {
      return;
    }
    if (inUseStateAggregator.isInUse()) {
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
      // when the caller of exitIdleMode() is about to use the load balancer.
      cancelIdleTimer(false);
    } else {
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
      // isInUse() == false, in which case we still need to schedule the timer.
      rescheduleIdleTimer();
    }
    if (lbHelper != null) {
      // Already out of idle mode.
      return;
    }
    logger.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
    lbHelper = new LbHelperImpl(nameResolver);
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);

    NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
    try {
      nameResolver.start(listener);
      nameResolverStarted = true;
    } catch (Throwable t) {
      listener.onError(Status.fromThrowable(t));
    }
  }

  // Tears down the resolver and load balancer, installs a fresh (unstarted) resolver and moves
  // the channel to IDLE; if something is still in use, immediately exits idle mode again.
  // Must be run from channelExecutor
  private void enterIdleMode() {
    logger.log(Level.FINE, "[{0}] Entering idle mode", getLogId());
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
    // which are bugs.
shutdownNameResolverAndLoadBalancer(true);
    delayedTransport.reprocess(null);
    // Install a fresh resolver now; exitIdleMode() starts it when the channel is used again.
    nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
    if (channelTracer != null) {
      channelTracer.reportEvent(
          new ChannelTrace.Event.Builder()
              .setDescription("Entering IDLE state")
              .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
              .setTimestampNanos(timeProvider.currentTimeNanos())
              .build());
    }
    channelStateManager.gotoState(IDLE);
    if (inUseStateAggregator.isInUse()) {
      exitIdleMode();
    }
  }

  // Cancels the idle timer; `permanent` is forwarded to Rescheduler.cancel() (shutdown() and
  // panic() pass true, enterIdle() and exitIdleMode() pass false).
  // Must be run from channelExecutor
  private void cancelIdleTimer(boolean permanent) {
    idleTimer.cancel(permanent);
  }

  // (Re)arms the idle timer for idleTimeoutMillis; no-op when idle mode is disabled.
  // Always run from channelExecutor
  private void rescheduleIdleTimer() {
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
      return;
    }
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
  }

  // Task that asks the current name resolver to refresh, unless it was cancelled by
  // cancelNameResolverBackoff() before it got to run.
  // Run from channelExecutor
  @VisibleForTesting
  class NameResolverRefresh implements Runnable {
    // Only mutated from channelExecutor
    boolean cancelled;

    @Override
    public void run() {
      if (cancelled) {
        // Race detected: this task was scheduled on channelExecutor before
        // cancelNameResolverBackoff() could cancel the timer.
        return;
      }
      nameResolverRefreshFuture = null;
      nameResolverRefresh = null;
      if (nameResolver != null) {
        nameResolver.refresh();
      }
    }
  }

  // Must be used from channelExecutor
  @Nullable private ScheduledFuture<?> nameResolverRefreshFuture;
  // Must be used from channelExecutor
  @Nullable private NameResolverRefresh nameResolverRefresh;
  // The policy to control backoff between name resolution attempts. Non-null when an attempt is
  // scheduled. Must be used from channelExecutor
  @Nullable private BackoffPolicy nameResolverBackoffPolicy;

  // Cancels a pending scheduled name-resolution refresh, if any, and resets the backoff state.
  // Must be run from channelExecutor
  private void cancelNameResolverBackoff() {
    if (nameResolverRefreshFuture != null) {
      nameResolverRefreshFuture.cancel(false);
      // The task may already be queued on channelExecutor; the flag makes it a no-op.
      nameResolverRefresh.cancelled = true;
      nameResolverRefreshFuture = null;
      nameResolverRefresh = null;
      nameResolverBackoffPolicy = null;
    }
  }

  // Supplies transports to calls: the picked subchannel's transport when available, otherwise
  // delayedTransport, which buffers (or, after shutdown, fails) the stream.
  private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
    @Override
    public ClientTransport get(PickSubchannelArgs args) {
      SubchannelPicker pickerCopy = subchannelPicker;
      if (shutdown.get()) {
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
        // properly.
        return delayedTransport;
      }
      if (pickerCopy == null) {
        // No picker means the channel is idle: kick it out of idle mode and buffer meanwhile.
        channelExecutor.executeLater(new Runnable() {
            @Override
            public void run() {
              exitIdleMode();
            }
          }).drain();
        return delayedTransport;
      }
      // There is no need to reschedule the idle timer here.
      //
      // pickerCopy != null, which means idle timer has not expired when this method starts.
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
      //
      // In most cases the idle timer is scheduled to fire after the transport has created the
      // stream, which would have reported in-use state to the channel that would have cancelled
      // the idle timer.
494 PickResult pickResult = pickerCopy.pickSubchannel(args); 495 ClientTransport transport = GrpcUtil.getTransportFromPickResult( 496 pickResult, args.getCallOptions().isWaitForReady()); 497 if (transport != null) { 498 return transport; 499 } 500 return delayedTransport; 501 } 502 503 @Override 504 public <ReqT> RetriableStream<ReqT> newRetriableStream( 505 final MethodDescriptor<ReqT, ?> method, 506 final CallOptions callOptions, 507 final Metadata headers, 508 final Context context) { 509 checkState(retryEnabled, "retry should be enabled"); 510 return new RetriableStream<ReqT>( 511 method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, 512 getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(), 513 callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY), 514 throttle) { 515 @Override 516 Status prestart() { 517 return uncommittedRetriableStreamsRegistry.add(this); 518 } 519 520 @Override 521 void postCommit() { 522 uncommittedRetriableStreamsRegistry.remove(this); 523 } 524 525 @Override 526 ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) { 527 CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory); 528 ClientTransport transport = 529 get(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); 530 Context origContext = context.attach(); 531 try { 532 return transport.newStream(method, newHeaders, newOptions); 533 } finally { 534 context.detach(origContext); 535 } 536 } 537 }; 538 } 539 }; 540 541 private final Rescheduler idleTimer; 542 ManagedChannelImpl( AbstractManagedChannelImplBuilder<?> builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, ObjectPool<? 
extends Executor> oobExecutorPool, Supplier<Stopwatch> stopwatchSupplier, List<ClientInterceptor> interceptors, final TimeProvider timeProvider)543 ManagedChannelImpl( 544 AbstractManagedChannelImplBuilder<?> builder, 545 ClientTransportFactory clientTransportFactory, 546 BackoffPolicy.Provider backoffPolicyProvider, 547 ObjectPool<? extends Executor> oobExecutorPool, 548 Supplier<Stopwatch> stopwatchSupplier, 549 List<ClientInterceptor> interceptors, 550 final TimeProvider timeProvider) { 551 this.target = checkNotNull(builder.target, "target"); 552 this.nameResolverFactory = builder.getNameResolverFactory(); 553 this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams"); 554 this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); 555 if (builder.loadBalancerFactory == null) { 556 this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(); 557 } else { 558 this.loadBalancerFactory = builder.loadBalancerFactory; 559 } 560 this.executorPool = checkNotNull(builder.executorPool, "executorPool"); 561 this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool"); 562 this.executor = checkNotNull(executorPool.getObject(), "executor"); 563 this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor); 564 this.delayedTransport.start(delayedTransportListener); 565 this.backoffPolicyProvider = backoffPolicyProvider; 566 this.transportFactory = 567 new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); 568 this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; 569 serviceConfigInterceptor = new ServiceConfigInterceptor( 570 retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); 571 Channel channel = new RealChannel(); 572 channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor); 573 if (builder.binlog != null) { 574 channel = builder.binlog.wrapChannel(channel); 575 } 576 
this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors); 577 this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); 578 if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { 579 this.idleTimeoutMillis = builder.idleTimeoutMillis; 580 } else { 581 checkArgument( 582 builder.idleTimeoutMillis 583 >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS, 584 "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis); 585 this.idleTimeoutMillis = builder.idleTimeoutMillis; 586 } 587 588 final class AutoDrainChannelExecutor implements Executor { 589 590 @Override 591 public void execute(Runnable command) { 592 channelExecutor.executeLater(command); 593 channelExecutor.drain(); 594 } 595 } 596 597 idleTimer = new Rescheduler( 598 new IdleModeTimer(), 599 new AutoDrainChannelExecutor(), 600 transportFactory.getScheduledExecutorService(), 601 stopwatchSupplier.get()); 602 this.fullStreamDecompression = builder.fullStreamDecompression; 603 this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry"); 604 this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry"); 605 this.userAgent = builder.userAgent; 606 607 this.channelBufferLimit = builder.retryBufferSize; 608 this.perRpcBufferLimit = builder.perRpcBufferLimit; 609 this.timeProvider = checkNotNull(timeProvider, "timeProvider"); 610 this.callTracerFactory = new CallTracer.Factory() { 611 @Override 612 public CallTracer create() { 613 return new CallTracer(timeProvider); 614 } 615 }; 616 channelCallTracer = callTracerFactory.create(); 617 this.channelz = checkNotNull(builder.channelz); 618 channelz.addRootChannel(this); 619 620 maxTraceEvents = builder.maxTraceEvents; 621 if (maxTraceEvents > 0) { 622 long currentTimeNanos = timeProvider.currentTimeNanos(); 623 channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel"); 624 } else { 625 channelTracer = null; 626 } 627 
logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target}); 628 } 629 630 @VisibleForTesting getNameResolver(String target, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams)631 static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, 632 Attributes nameResolverParams) { 633 // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending 634 // "dns:///". 635 URI targetUri = null; 636 StringBuilder uriSyntaxErrors = new StringBuilder(); 637 try { 638 targetUri = new URI(target); 639 // For "localhost:8080" this would likely cause newNameResolver to return null, because 640 // "localhost" is parsed as the scheme. Will fall into the next branch and try 641 // "dns:///localhost:8080". 642 } catch (URISyntaxException e) { 643 // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234. 644 uriSyntaxErrors.append(e.getMessage()); 645 } 646 if (targetUri != null) { 647 NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams); 648 if (resolver != null) { 649 return resolver; 650 } 651 // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an 652 // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080" 653 } 654 655 // If we reached here, the targetUri couldn't be used. 656 if (!URI_PATTERN.matcher(target).matches()) { 657 // It doesn't look like a URI target. Maybe it's an authority string. Try with the default 658 // scheme from the factory. 659 try { 660 targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null); 661 } catch (URISyntaxException e) { 662 // Should not be possible. 
throw new IllegalArgumentException(e);
      }
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
      if (resolver != null) {
        return resolver;
      }
    }
    throw new IllegalArgumentException(String.format(
        "cannot find a NameResolver for %s%s",
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
  }

  /**
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
   * cancelled. Only the first invocation has any effect; later calls just return {@code this}.
   */
  @Override
  public ManagedChannelImpl shutdown() {
    logger.log(Level.FINE, "[{0}] shutdown() called", getLogId());
    if (!shutdown.compareAndSet(false, true)) {
      return this;
    }

    // Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible.
    // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
    // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
    // channelExecutor's queue and should not be blocked, so we do not drain() immediately here.
    channelExecutor.executeLater(new Runnable() {
        @Override
        public void run() {
          if (channelTracer != null) {
            channelTracer.reportEvent(new ChannelTrace.Event.Builder()
                .setDescription("Entering SHUTDOWN state")
                .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
                .setTimestampNanos(timeProvider.currentTimeNanos())
                .build());
          }
          channelStateManager.gotoState(SHUTDOWN);
        }
      });

    // Shuts down delayedTransport unless uncommitted retriable streams still need it.
    uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
    channelExecutor.executeLater(new Runnable() {
        @Override
        public void run() {
          cancelIdleTimer(/* permanent= */ true);
        }
      }).drain();
    logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
    return this;
  }

  /**
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
   * return {@code false} immediately after this method returns.
   */
  @Override
  public ManagedChannelImpl shutdownNow() {
    logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
    shutdown();
    uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
    channelExecutor.executeLater(new Runnable() {
        @Override
        public void run() {
          if (shutdownNowed) {
            return;
          }
          shutdownNowed = true;
          maybeShutdownNowSubchannels();
        }
      }).drain();
    return this;
  }

  /**
   * Puts the channel into panic mode after an unexpected throwable: cancels the idle timer
   * permanently, shuts down the name resolver and load balancer, installs a picker that drops every
   * pick with an INTERNAL status caused by {@code t}, and moves the channel to TRANSIENT_FAILURE.
   * Only the first panic takes effect.
   */
  // Called from channelExecutor
  @VisibleForTesting
  void panic(final Throwable t) {
    if (panicMode) {
      // Preserve the first panic information
      return;
    }
    panicMode = true;
    cancelIdleTimer(/* permanent= */ true);
    shutdownNameResolverAndLoadBalancer(false);
    SubchannelPicker newPicker = new SubchannelPicker() {
      final PickResult panicPickResult =
          PickResult.withDrop(
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
      @Override
      public PickResult pickSubchannel(PickSubchannelArgs args) {
        return panicPickResult;
      }
    };
    updateSubchannelPicker(newPicker);
    if (channelTracer != null) {
      channelTracer.reportEvent(
          new ChannelTrace.Event.Builder()
              .setDescription("Entering TRANSIENT_FAILURE state")
              .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
              .setTimestampNanos(timeProvider.currentTimeNanos())
              .build());
    }
    channelStateManager.gotoState(TRANSIENT_FAILURE);
  }

  // Publishes a new picker and lets delayedTransport re-run its buffered picks against it.
  // Called from channelExecutor
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
    subchannelPicker = newPicker;
    delayedTransport.reprocess(newPicker);
  }

  @Override
  public boolean isShutdown() {
    return shutdown.get();
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return terminatedLatch.await(timeout, unit);
  }

  @Override
  public boolean isTerminated() {
    return terminated;
  }

  /*
   * Creates a new outgoing call on the channel.
792 */ 793 @Override newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)794 public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, 795 CallOptions callOptions) { 796 return interceptorChannel.newCall(method, callOptions); 797 } 798 799 @Override authority()800 public String authority() { 801 return interceptorChannel.authority(); 802 } 803 getCallExecutor(CallOptions callOptions)804 private Executor getCallExecutor(CallOptions callOptions) { 805 Executor executor = callOptions.getExecutor(); 806 if (executor == null) { 807 executor = this.executor; 808 } 809 return executor; 810 } 811 812 private class RealChannel extends Channel { 813 @Override newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)814 public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, 815 CallOptions callOptions) { 816 return new ClientCallImpl<ReqT, RespT>( 817 method, 818 getCallExecutor(callOptions), 819 callOptions, 820 transportProvider, 821 terminated ? null : transportFactory.getScheduledExecutorService(), 822 channelCallTracer, 823 retryEnabled) 824 .setFullStreamDecompression(fullStreamDecompression) 825 .setDecompressorRegistry(decompressorRegistry) 826 .setCompressorRegistry(compressorRegistry); 827 } 828 829 @Override authority()830 public String authority() { 831 String authority = nameResolver.getServiceAuthority(); 832 return checkNotNull(authority, "authority"); 833 } 834 } 835 836 /** 837 * Terminate the channel if termination conditions are met. 
*/
  // Conditions: shutdown() has been called and no subchannels or OOB channels remain. On
  // termination, unregisters from channelz, releases awaitTermination() waiters, returns the
  // executor to its pool and closes the transport factory. Idempotent once terminated.
  // Must be run from channelExecutor
  private void maybeTerminateChannel() {
    if (terminated) {
      return;
    }
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
      logger.log(Level.FINE, "[{0}] Terminated", getLogId());
      channelz.removeRootChannel(this);
      terminated = true;
      terminatedLatch.countDown();
      executorPool.returnObject(executor);
      // Release the transport factory so that it can deallocate any resources.
      transportFactory.close();
    }
  }

  /**
   * Returns the current connectivity state. When {@code requestConnection} is true and the channel
   * is IDLE, also schedules exiting idle mode and asks the new picker (if any) to connect; the
   * returned value is the state observed before that request.
   */
  @Override
  public ConnectivityState getState(boolean requestConnection) {
    ConnectivityState savedChannelState = channelStateManager.getState();
    if (requestConnection && savedChannelState == IDLE) {
      channelExecutor.executeLater(
          new Runnable() {
            @Override
            public void run() {
              exitIdleMode();
              if (subchannelPicker != null) {
                subchannelPicker.requestConnection();
              }
            }
          }).drain();
    }
    return savedChannelState;
  }

  /** Registers {@code callback} to run on the channel executor when the state leaves {@code source}. */
  @Override
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
    channelExecutor.executeLater(
        new Runnable() {
          @Override
          public void run() {
            channelStateManager.notifyWhenStateChanged(callback, executor, source);
          }
        }).drain();
  }

  /**
   * Short-circuits pending backoff: if a name-resolution retry is scheduled it is cancelled and the
   * resolver is refreshed immediately, and every subchannel and OOB channel resets its connect
   * backoff. No-op after shutdown.
   */
  @Override
  public void resetConnectBackoff() {
    channelExecutor
        .executeLater(
            new Runnable() {
              @Override
              public void run() {
                if (shutdown.get()) {
                  return;
                }
                if (nameResolverRefreshFuture != null) {
                  checkState(nameResolverStarted, "name resolver must be started");
                  cancelNameResolverBackoff();
                  nameResolver.refresh();
                }
                for (InternalSubchannel subchannel : subchannels) {
                  subchannel.resetConnectBackoff();
                }
                for (OobChannel oobChannel : oobChannels) {
903 oobChannel.resetConnectBackoff(); 904 } 905 } 906 }) 907 .drain(); 908 } 909 910 @Override enterIdle()911 public void enterIdle() { 912 class PrepareToLoseNetworkRunnable implements Runnable { 913 @Override 914 public void run() { 915 if (shutdown.get() || lbHelper == null) { 916 return; 917 } 918 cancelIdleTimer(/* permanent= */ false); 919 enterIdleMode(); 920 } 921 } 922 923 channelExecutor.executeLater(new PrepareToLoseNetworkRunnable()).drain(); 924 } 925 926 /** 927 * A registry that prevents channel shutdown from killing existing retry attempts that are in 928 * backoff. 929 */ 930 private final class UncommittedRetriableStreamsRegistry { 931 // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream, 932 // it's worthwhile to look for a lock-free approach. 933 final Object lock = new Object(); 934 935 @GuardedBy("lock") 936 Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>(); 937 938 @GuardedBy("lock") 939 Status shutdownStatus; 940 onShutdown(Status reason)941 void onShutdown(Status reason) { 942 boolean shouldShutdownDelayedTransport = false; 943 synchronized (lock) { 944 if (shutdownStatus != null) { 945 return; 946 } 947 shutdownStatus = reason; 948 // Keep the delayedTransport open until there is no more uncommitted streams, b/c those 949 // retriable streams, which may be in backoff and not using any transport, are already 950 // started RPCs. 
951 if (uncommittedRetriableStreams.isEmpty()) { 952 shouldShutdownDelayedTransport = true; 953 } 954 } 955 956 if (shouldShutdownDelayedTransport) { 957 delayedTransport.shutdown(reason); 958 } 959 } 960 onShutdownNow(Status reason)961 void onShutdownNow(Status reason) { 962 onShutdown(reason); 963 Collection<ClientStream> streams; 964 965 synchronized (lock) { 966 streams = new ArrayList<>(uncommittedRetriableStreams); 967 } 968 969 for (ClientStream stream : streams) { 970 stream.cancel(reason); 971 } 972 delayedTransport.shutdownNow(reason); 973 } 974 975 /** 976 * Registers a RetriableStream and return null if not shutdown, otherwise just returns the 977 * shutdown Status. 978 */ 979 @Nullable add(RetriableStream<?> retriableStream)980 Status add(RetriableStream<?> retriableStream) { 981 synchronized (lock) { 982 if (shutdownStatus != null) { 983 return shutdownStatus; 984 } 985 uncommittedRetriableStreams.add(retriableStream); 986 return null; 987 } 988 } 989 remove(RetriableStream<?> retriableStream)990 void remove(RetriableStream<?> retriableStream) { 991 Status shutdownStatusCopy = null; 992 993 synchronized (lock) { 994 uncommittedRetriableStreams.remove(retriableStream); 995 if (uncommittedRetriableStreams.isEmpty()) { 996 shutdownStatusCopy = shutdownStatus; 997 // Because retriable transport is long-lived, we take this opportunity to down-size the 998 // hashmap. 
999 uncommittedRetriableStreams = new HashSet<ClientStream>(); 1000 } 1001 } 1002 1003 if (shutdownStatusCopy != null) { 1004 delayedTransport.shutdown(shutdownStatusCopy); 1005 } 1006 } 1007 } 1008 1009 private class LbHelperImpl extends LoadBalancer.Helper { 1010 LoadBalancer lb; 1011 final NameResolver nr; 1012 LbHelperImpl(NameResolver nr)1013 LbHelperImpl(NameResolver nr) { 1014 this.nr = checkNotNull(nr, "NameResolver"); 1015 } 1016 1017 // Must be called from channelExecutor handleInternalSubchannelState(ConnectivityStateInfo newState)1018 private void handleInternalSubchannelState(ConnectivityStateInfo newState) { 1019 if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { 1020 nr.refresh(); 1021 } 1022 } 1023 1024 @Override createSubchannel( List<EquivalentAddressGroup> addressGroups, Attributes attrs)1025 public AbstractSubchannel createSubchannel( 1026 List<EquivalentAddressGroup> addressGroups, Attributes attrs) { 1027 checkNotNull(addressGroups, "addressGroups"); 1028 checkNotNull(attrs, "attrs"); 1029 // TODO(ejona): can we be even stricter? Like loadBalancer == null? 
1030 checkState(!terminated, "Channel is terminated"); 1031 final SubchannelImpl subchannel = new SubchannelImpl(attrs); 1032 ChannelTracer subchannelTracer = null; 1033 long subchannelCreationTime = timeProvider.currentTimeNanos(); 1034 if (maxTraceEvents > 0) { 1035 subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel"); 1036 } 1037 final InternalSubchannel internalSubchannel = new InternalSubchannel( 1038 addressGroups, 1039 authority(), 1040 userAgent, 1041 backoffPolicyProvider, 1042 transportFactory, 1043 transportFactory.getScheduledExecutorService(), 1044 stopwatchSupplier, 1045 channelExecutor, 1046 new InternalSubchannel.Callback() { 1047 // All callbacks are run in channelExecutor 1048 @Override 1049 void onTerminated(InternalSubchannel is) { 1050 subchannels.remove(is); 1051 channelz.removeSubchannel(is); 1052 maybeTerminateChannel(); 1053 } 1054 1055 @Override 1056 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 1057 handleInternalSubchannelState(newState); 1058 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 
1059 if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { 1060 lb.handleSubchannelState(subchannel, newState); 1061 } 1062 } 1063 1064 @Override 1065 void onInUse(InternalSubchannel is) { 1066 inUseStateAggregator.updateObjectInUse(is, true); 1067 } 1068 1069 @Override 1070 void onNotInUse(InternalSubchannel is) { 1071 inUseStateAggregator.updateObjectInUse(is, false); 1072 } 1073 }, 1074 channelz, 1075 callTracerFactory.create(), 1076 subchannelTracer, 1077 timeProvider); 1078 if (channelTracer != null) { 1079 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1080 .setDescription("Child channel created") 1081 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1082 .setTimestampNanos(subchannelCreationTime) 1083 .setSubchannelRef(internalSubchannel) 1084 .build()); 1085 } 1086 channelz.addSubchannel(internalSubchannel); 1087 subchannel.subchannel = internalSubchannel; 1088 logger.log(Level.FINE, "[{0}] {1} created for {2}", 1089 new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups}); 1090 runSerialized(new Runnable() { 1091 @Override 1092 public void run() { 1093 if (terminating) { 1094 // Because runSerialized() doesn't guarantee the runnable has been executed upon when 1095 // returning, the subchannel may still be returned to the balancer without being 1096 // shutdown even if "terminating" is already true. The subchannel will not be used in 1097 // this case, because delayed transport has terminated when "terminating" becomes 1098 // true, and no more requests will be sent to balancer beyond this point. 1099 internalSubchannel.shutdown(SHUTDOWN_STATUS); 1100 } 1101 if (!terminated) { 1102 // If channel has not terminated, it will track the subchannel and block termination 1103 // for it. 
1104 subchannels.add(internalSubchannel); 1105 } 1106 } 1107 }); 1108 return subchannel; 1109 } 1110 1111 @Override updateBalancingState( final ConnectivityState newState, final SubchannelPicker newPicker)1112 public void updateBalancingState( 1113 final ConnectivityState newState, final SubchannelPicker newPicker) { 1114 checkNotNull(newState, "newState"); 1115 checkNotNull(newPicker, "newPicker"); 1116 1117 runSerialized( 1118 new Runnable() { 1119 @Override 1120 public void run() { 1121 if (LbHelperImpl.this != lbHelper) { 1122 return; 1123 } 1124 updateSubchannelPicker(newPicker); 1125 // It's not appropriate to report SHUTDOWN state from lb. 1126 // Ignore the case of newState == SHUTDOWN for now. 1127 if (newState != SHUTDOWN) { 1128 if (channelTracer != null) { 1129 channelTracer.reportEvent( 1130 new ChannelTrace.Event.Builder() 1131 .setDescription("Entering " + newState + " state") 1132 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1133 .setTimestampNanos(timeProvider.currentTimeNanos()) 1134 .build()); 1135 } 1136 channelStateManager.gotoState(newState); 1137 } 1138 } 1139 }); 1140 } 1141 1142 @Override updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs)1143 public void updateSubchannelAddresses( 1144 LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) { 1145 checkArgument(subchannel instanceof SubchannelImpl, 1146 "subchannel must have been returned from createSubchannel"); 1147 ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); 1148 } 1149 1150 @Override createOobChannel(EquivalentAddressGroup addressGroup, String authority)1151 public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) { 1152 // TODO(ejona): can we be even stricter? Like terminating? 
1153 checkState(!terminated, "Channel is terminated"); 1154 long oobChannelCreationTime = timeProvider.currentTimeNanos(); 1155 ChannelTracer oobChannelTracer = null; 1156 ChannelTracer subchannelTracer = null; 1157 if (channelTracer != null) { 1158 oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel"); 1159 } 1160 final OobChannel oobChannel = new OobChannel( 1161 authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), 1162 channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider); 1163 if (channelTracer != null) { 1164 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1165 .setDescription("Child channel created") 1166 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1167 .setTimestampNanos(oobChannelCreationTime) 1168 .setChannelRef(oobChannel) 1169 .build()); 1170 subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel"); 1171 } 1172 final InternalSubchannel internalSubchannel = new InternalSubchannel( 1173 Collections.singletonList(addressGroup), 1174 authority, userAgent, backoffPolicyProvider, transportFactory, 1175 transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, 1176 // All callback methods are run from channelExecutor 1177 new InternalSubchannel.Callback() { 1178 @Override 1179 void onTerminated(InternalSubchannel is) { 1180 oobChannels.remove(oobChannel); 1181 channelz.removeSubchannel(is); 1182 oobChannel.handleSubchannelTerminated(); 1183 maybeTerminateChannel(); 1184 } 1185 1186 @Override 1187 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 1188 handleInternalSubchannelState(newState); 1189 oobChannel.handleSubchannelStateChange(newState); 1190 } 1191 }, 1192 channelz, 1193 callTracerFactory.create(), 1194 subchannelTracer, 1195 timeProvider); 1196 if (oobChannelTracer != null) { 1197 oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() 1198 
.setDescription("Child channel created") 1199 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1200 .setTimestampNanos(oobChannelCreationTime) 1201 .setSubchannelRef(internalSubchannel) 1202 .build()); 1203 } 1204 channelz.addSubchannel(oobChannel); 1205 channelz.addSubchannel(internalSubchannel); 1206 oobChannel.setSubchannel(internalSubchannel); 1207 runSerialized(new Runnable() { 1208 @Override 1209 public void run() { 1210 if (terminating) { 1211 oobChannel.shutdown(); 1212 } 1213 if (!terminated) { 1214 // If channel has not terminated, it will track the subchannel and block termination 1215 // for it. 1216 oobChannels.add(oobChannel); 1217 } 1218 } 1219 }); 1220 return oobChannel; 1221 } 1222 1223 @Override updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag)1224 public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { 1225 checkArgument(channel instanceof OobChannel, 1226 "channel must have been returned from createOobChannel"); 1227 ((OobChannel) channel).updateAddresses(eag); 1228 } 1229 1230 @Override getAuthority()1231 public String getAuthority() { 1232 return ManagedChannelImpl.this.authority(); 1233 } 1234 1235 @Override getNameResolverFactory()1236 public NameResolver.Factory getNameResolverFactory() { 1237 return nameResolverFactory; 1238 } 1239 1240 @Override runSerialized(Runnable task)1241 public void runSerialized(Runnable task) { 1242 channelExecutor.executeLater(task).drain(); 1243 } 1244 } 1245 1246 private class NameResolverListenerImpl implements NameResolver.Listener { 1247 final LbHelperImpl helper; 1248 NameResolverListenerImpl(LbHelperImpl helperImpl)1249 NameResolverListenerImpl(LbHelperImpl helperImpl) { 1250 this.helper = helperImpl; 1251 } 1252 1253 @Override onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config)1254 public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) { 1255 if (servers.isEmpty()) { 1256 
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list")); 1257 return; 1258 } 1259 if (logger.isLoggable(Level.FINE)) { 1260 logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}", 1261 new Object[]{getLogId(), servers, config}); 1262 } 1263 1264 if (channelTracer != null && (haveBackends == null || !haveBackends)) { 1265 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1266 .setDescription("Address resolved: " + servers) 1267 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1268 .setTimestampNanos(timeProvider.currentTimeNanos()) 1269 .build()); 1270 haveBackends = true; 1271 } 1272 final Map<String, Object> serviceConfig = 1273 config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); 1274 if (channelTracer != null && serviceConfig != null 1275 && !serviceConfig.equals(lastServiceConfig)) { 1276 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1277 .setDescription("Service config changed") 1278 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 1279 .setTimestampNanos(timeProvider.currentTimeNanos()) 1280 .build()); 1281 lastServiceConfig = serviceConfig; 1282 } 1283 1284 final class NamesResolved implements Runnable { 1285 @Override 1286 public void run() { 1287 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 
1288 if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { 1289 return; 1290 } 1291 1292 nameResolverBackoffPolicy = null; 1293 1294 if (serviceConfig != null) { 1295 try { 1296 serviceConfigInterceptor.handleUpdate(serviceConfig); 1297 if (retryEnabled) { 1298 throttle = getThrottle(config); 1299 } 1300 } catch (RuntimeException re) { 1301 logger.log( 1302 Level.WARNING, 1303 "[" + getLogId() + "] Unexpected exception from parsing service config", 1304 re); 1305 } 1306 } 1307 1308 helper.lb.handleResolvedAddressGroups(servers, config); 1309 } 1310 } 1311 1312 helper.runSerialized(new NamesResolved()); 1313 } 1314 1315 @Override onError(final Status error)1316 public void onError(final Status error) { 1317 checkArgument(!error.isOk(), "the error status must not be OK"); 1318 logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}", 1319 new Object[] {getLogId(), error}); 1320 if (channelTracer != null && (haveBackends == null || haveBackends)) { 1321 channelTracer.reportEvent(new ChannelTrace.Event.Builder() 1322 .setDescription("Failed to resolve name") 1323 .setSeverity(ChannelTrace.Event.Severity.CT_WARNING) 1324 .setTimestampNanos(timeProvider.currentTimeNanos()) 1325 .build()); 1326 haveBackends = false; 1327 } 1328 channelExecutor 1329 .executeLater( 1330 new Runnable() { 1331 @Override 1332 public void run() { 1333 // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. 
1334 if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { 1335 return; 1336 } 1337 helper.lb.handleNameResolutionError(error); 1338 if (nameResolverRefreshFuture != null) { 1339 // The name resolver may invoke onError multiple times, but we only want to 1340 // schedule one backoff attempt 1341 // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we 1342 // want to reset the backoff interval upon repeated onError() calls 1343 return; 1344 } 1345 if (nameResolverBackoffPolicy == null) { 1346 nameResolverBackoffPolicy = backoffPolicyProvider.get(); 1347 } 1348 long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos(); 1349 if (logger.isLoggable(Level.FINE)) { 1350 logger.log( 1351 Level.FINE, 1352 "[{0}] Scheduling DNS resolution backoff for {1} ns", 1353 new Object[] {logId, delayNanos}); 1354 } 1355 nameResolverRefresh = new NameResolverRefresh(); 1356 nameResolverRefreshFuture = 1357 transportFactory 1358 .getScheduledExecutorService() 1359 .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS); 1360 } 1361 }) 1362 .drain(); 1363 } 1364 } 1365 1366 @Nullable getThrottle(Attributes config)1367 private static Throttle getThrottle(Attributes config) { 1368 return ServiceConfigUtil.getThrottlePolicy( 1369 config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)); 1370 } 1371 1372 private final class SubchannelImpl extends AbstractSubchannel { 1373 // Set right after SubchannelImpl is created. 
1374 InternalSubchannel subchannel; 1375 final Object shutdownLock = new Object(); 1376 final Attributes attrs; 1377 1378 @GuardedBy("shutdownLock") 1379 boolean shutdownRequested; 1380 @GuardedBy("shutdownLock") 1381 ScheduledFuture<?> delayedShutdownTask; 1382 SubchannelImpl(Attributes attrs)1383 SubchannelImpl(Attributes attrs) { 1384 this.attrs = checkNotNull(attrs, "attrs"); 1385 } 1386 1387 @Override obtainActiveTransport()1388 ClientTransport obtainActiveTransport() { 1389 return subchannel.obtainActiveTransport(); 1390 } 1391 1392 @Override getInternalSubchannel()1393 InternalInstrumented<ChannelStats> getInternalSubchannel() { 1394 return subchannel; 1395 } 1396 1397 @Override shutdown()1398 public void shutdown() { 1399 synchronized (shutdownLock) { 1400 if (shutdownRequested) { 1401 if (terminating && delayedShutdownTask != null) { 1402 // shutdown() was previously called when terminating == false, thus a delayed shutdown() 1403 // was scheduled. Now since terminating == true, We should expedite the shutdown. 1404 delayedShutdownTask.cancel(false); 1405 delayedShutdownTask = null; 1406 // Will fall through to the subchannel.shutdown() at the end. 1407 } else { 1408 return; 1409 } 1410 } else { 1411 shutdownRequested = true; 1412 } 1413 // Add a delay to shutdown to deal with the race between 1) a transport being picked and 1414 // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g., 1415 // because of address change, or because LoadBalancer is shutdown by Channel entering idle 1416 // mode). If (2) wins, the app will see a spurious error. We work around this by delaying 1417 // shutdown of Subchannel for a few seconds here. 1418 // 1419 // TODO(zhangkun83): consider a better approach 1420 // (https://github.com/grpc/grpc-java/issues/2562). 
1421 if (!terminating) { 1422 delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule( 1423 new LogExceptionRunnable( 1424 new Runnable() { 1425 @Override 1426 public void run() { 1427 subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); 1428 } 1429 }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); 1430 return; 1431 } 1432 } 1433 // When terminating == true, no more real streams will be created. It's safe and also 1434 // desirable to shutdown timely. 1435 subchannel.shutdown(SHUTDOWN_STATUS); 1436 } 1437 1438 @Override requestConnection()1439 public void requestConnection() { 1440 subchannel.obtainActiveTransport(); 1441 } 1442 1443 @Override getAllAddresses()1444 public List<EquivalentAddressGroup> getAllAddresses() { 1445 return subchannel.getAddressGroups(); 1446 } 1447 1448 @Override getAttributes()1449 public Attributes getAttributes() { 1450 return attrs; 1451 } 1452 1453 @Override toString()1454 public String toString() { 1455 return subchannel.getLogId().toString(); 1456 } 1457 } 1458 1459 @Override toString()1460 public String toString() { 1461 return MoreObjects.toStringHelper(this) 1462 .add("logId", logId.getId()) 1463 .add("target", target) 1464 .toString(); 1465 } 1466 } 1467