• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23 import static io.grpc.ConnectivityState.IDLE;
24 import static io.grpc.ConnectivityState.SHUTDOWN;
25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26 import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Stopwatch;
31 import com.google.common.base.Supplier;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.SettableFuture;
34 import io.grpc.Attributes;
35 import io.grpc.CallCredentials;
36 import io.grpc.CallOptions;
37 import io.grpc.Channel;
38 import io.grpc.ChannelCredentials;
39 import io.grpc.ChannelLogger;
40 import io.grpc.ChannelLogger.ChannelLogLevel;
41 import io.grpc.ClientCall;
42 import io.grpc.ClientInterceptor;
43 import io.grpc.ClientInterceptors;
44 import io.grpc.ClientStreamTracer;
45 import io.grpc.CompressorRegistry;
46 import io.grpc.ConnectivityState;
47 import io.grpc.ConnectivityStateInfo;
48 import io.grpc.Context;
49 import io.grpc.DecompressorRegistry;
50 import io.grpc.EquivalentAddressGroup;
51 import io.grpc.ForwardingChannelBuilder;
52 import io.grpc.ForwardingClientCall;
53 import io.grpc.Grpc;
54 import io.grpc.InternalChannelz;
55 import io.grpc.InternalChannelz.ChannelStats;
56 import io.grpc.InternalChannelz.ChannelTrace;
57 import io.grpc.InternalConfigSelector;
58 import io.grpc.InternalInstrumented;
59 import io.grpc.InternalLogId;
60 import io.grpc.InternalWithLogId;
61 import io.grpc.LoadBalancer;
62 import io.grpc.LoadBalancer.CreateSubchannelArgs;
63 import io.grpc.LoadBalancer.PickResult;
64 import io.grpc.LoadBalancer.PickSubchannelArgs;
65 import io.grpc.LoadBalancer.ResolvedAddresses;
66 import io.grpc.LoadBalancer.SubchannelPicker;
67 import io.grpc.LoadBalancer.SubchannelStateListener;
68 import io.grpc.ManagedChannel;
69 import io.grpc.ManagedChannelBuilder;
70 import io.grpc.Metadata;
71 import io.grpc.MethodDescriptor;
72 import io.grpc.NameResolver;
73 import io.grpc.NameResolver.ConfigOrError;
74 import io.grpc.NameResolver.ResolutionResult;
75 import io.grpc.NameResolverRegistry;
76 import io.grpc.ProxyDetector;
77 import io.grpc.Status;
78 import io.grpc.SynchronizationContext;
79 import io.grpc.SynchronizationContext.ScheduledHandle;
80 import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
81 import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
82 import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
83 import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
84 import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
85 import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
86 import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
87 import io.grpc.internal.RetriableStream.ChannelBufferMeter;
88 import io.grpc.internal.RetriableStream.Throttle;
89 import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
90 import java.net.URI;
91 import java.net.URISyntaxException;
92 import java.util.ArrayList;
93 import java.util.Collection;
94 import java.util.Collections;
95 import java.util.HashSet;
96 import java.util.LinkedHashSet;
97 import java.util.List;
98 import java.util.Map;
99 import java.util.Set;
100 import java.util.concurrent.Callable;
101 import java.util.concurrent.CountDownLatch;
102 import java.util.concurrent.ExecutionException;
103 import java.util.concurrent.Executor;
104 import java.util.concurrent.Future;
105 import java.util.concurrent.ScheduledExecutorService;
106 import java.util.concurrent.ScheduledFuture;
107 import java.util.concurrent.TimeUnit;
108 import java.util.concurrent.TimeoutException;
109 import java.util.concurrent.atomic.AtomicBoolean;
110 import java.util.concurrent.atomic.AtomicReference;
111 import java.util.logging.Level;
112 import java.util.logging.Logger;
113 import java.util.regex.Pattern;
114 import javax.annotation.Nullable;
115 import javax.annotation.concurrent.GuardedBy;
116 import javax.annotation.concurrent.ThreadSafe;
117 
118 /** A communication channel for making outgoing RPCs. */
119 @ThreadSafe
120 final class ManagedChannelImpl extends ManagedChannel implements
121     InternalInstrumented<ChannelStats> {
122   @VisibleForTesting
123   static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
124 
125   // Matching this pattern means the target string is a URI target or at least intended to be one.
126   // A URI target must be an absolute hierarchical URI.
127   // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
128   @VisibleForTesting
129   static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
130 
131   static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
132 
133   static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
134 
135   @VisibleForTesting
136   static final Status SHUTDOWN_NOW_STATUS =
137       Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
138 
139   @VisibleForTesting
140   static final Status SHUTDOWN_STATUS =
141       Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
142 
143   @VisibleForTesting
144   static final Status SUBCHANNEL_SHUTDOWN_STATUS =
145       Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
146 
147   private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
148       ManagedChannelServiceConfig.empty();
  // Selector whose selectConfig() always fails with IllegalStateException("Resolution is pending").
  // NOTE(review): presumably a placeholder used until name resolution supplies a real selector --
  // confirm at the usage site.
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
      new InternalConfigSelector() {
        @Override
        public Result selectConfig(PickSubchannelArgs args) {
          throw new IllegalStateException("Resolution is pending");
        }
      };
156 
157   private final InternalLogId logId;
158   private final String target;
159   @Nullable
160   private final String authorityOverride;
161   private final NameResolverRegistry nameResolverRegistry;
162   private final NameResolver.Factory nameResolverFactory;
163   private final NameResolver.Args nameResolverArgs;
164   private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
165   private final ClientTransportFactory originalTransportFactory;
166   @Nullable
167   private final ChannelCredentials originalChannelCreds;
168   private final ClientTransportFactory transportFactory;
169   private final ClientTransportFactory oobTransportFactory;
170   private final RestrictedScheduledExecutor scheduledExecutor;
171   private final Executor executor;
172   private final ObjectPool<? extends Executor> executorPool;
173   private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
174   private final ExecutorHolder balancerRpcExecutorHolder;
175   private final ExecutorHolder offloadExecutorHolder;
176   private final TimeProvider timeProvider;
177   private final int maxTraceEvents;
178 
  // Synchronization context for this channel. Its uncaught-exception handler logs the throwable
  // at SEVERE, tagged with this channel's log id, and then hands it to panic(e).
  @VisibleForTesting
  final SynchronizationContext syncContext = new SynchronizationContext(
      new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          logger.log(
              Level.SEVERE,
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
              e);
          panic(e);
        }
      });
191 
192   private boolean fullStreamDecompression;
193 
194   private final DecompressorRegistry decompressorRegistry;
195   private final CompressorRegistry compressorRegistry;
196 
197   private final Supplier<Stopwatch> stopwatchSupplier;
198   /** The timeout before entering idle mode. */
199   private final long idleTimeoutMillis;
200 
201   private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
202   private final BackoffPolicy.Provider backoffPolicyProvider;
203 
204   /**
205    * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
206    * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
207    * {@link RealChannel}.
208    */
209   private final Channel interceptorChannel;
210   @Nullable private final String userAgent;
211 
212   // Only null after channel is terminated. Must be assigned from the syncContext.
213   private NameResolver nameResolver;
214 
215   // Must be accessed from the syncContext.
216   private boolean nameResolverStarted;
217 
218   // null when channel is in idle mode.  Must be assigned from syncContext.
219   @Nullable
220   private LbHelperImpl lbHelper;
221 
222   // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
223   // null if channel is in idle mode.
224   @Nullable
225   private volatile SubchannelPicker subchannelPicker;
226 
227   // Must be accessed from the syncContext
228   private boolean panicMode;
229 
230   // Must be mutated from syncContext
231   // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
232   // switch to a ConcurrentHashMap.
233   private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
234 
235   // Must be accessed from syncContext
236   @Nullable
237   private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
238   private final Object pendingCallsInUseObject = new Object();
239 
240   // Must be mutated from syncContext
241   private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
242 
243   // reprocess() must be run from syncContext
244   private final DelayedClientTransport delayedTransport;
245   private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
246       = new UncommittedRetriableStreamsRegistry();
247 
248   // Shutdown states.
249   //
250   // Channel's shutdown process:
251   // 1. shutdown(): stop accepting new calls from applications
252   //   1a shutdown <- true
253   //   1b subchannelPicker <- null
254   //   1c delayedTransport.shutdown()
255   // 2. delayedTransport terminated: stop stream-creation functionality
256   //   2a terminating <- true
257   //   2b loadBalancer.shutdown()
258   //     * LoadBalancer will shutdown subchannels and OOB channels
259   //   2c loadBalancer <- null
260   //   2d nameResolver.shutdown()
261   //   2e nameResolver <- null
262   // 3. All subchannels and OOB channels terminated: Channel considered terminated
263 
264   private final AtomicBoolean shutdown = new AtomicBoolean(false);
265   // Must only be mutated and read from syncContext
266   private boolean shutdownNowed;
267   // Must only be mutated from syncContext
268   private boolean terminating;
269   // Must be mutated from syncContext
270   private volatile boolean terminated;
271   private final CountDownLatch terminatedLatch = new CountDownLatch(1);
272 
273   private final CallTracer.Factory callTracerFactory;
274   private final CallTracer channelCallTracer;
275   private final ChannelTracer channelTracer;
276   private final ChannelLogger channelLogger;
277   private final InternalChannelz channelz;
278   private final RealChannel realChannel;
279   // Must be mutated and read from syncContext
280   // a flag for doing channel tracing when flipped
281   private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
282   // Must be mutated and read from constructor or syncContext
283   // used for channel tracing when value changed
284   private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
285 
286   @Nullable
287   private final ManagedChannelServiceConfig defaultServiceConfig;
288   // Must be mutated and read from constructor or syncContext
289   private boolean serviceConfigUpdated = false;
290   private final boolean lookUpServiceConfig;
291 
292   // One instance per channel.
293   private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
294 
295   private final long perRpcBufferLimit;
296   private final long channelBufferLimit;
297 
298   // Temporary false flag that can skip the retry code path.
299   private final boolean retryEnabled;
300 
301   // Called from syncContext
302   private final ManagedClientTransport.Listener delayedTransportListener =
303       new DelayedTransportListener();
304 
305   // Must be called from syncContext
maybeShutdownNowSubchannels()306   private void maybeShutdownNowSubchannels() {
307     if (shutdownNowed) {
308       for (InternalSubchannel subchannel : subchannels) {
309         subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
310       }
311       for (OobChannel oobChannel : oobChannels) {
312         oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
313       }
314     }
315   }
316 
317   // Must be accessed from syncContext
318   @VisibleForTesting
319   final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
320 
321   @Override
getStats()322   public ListenableFuture<ChannelStats> getStats() {
323     final SettableFuture<ChannelStats> ret = SettableFuture.create();
324     final class StatsFetcher implements Runnable {
325       @Override
326       public void run() {
327         ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
328         channelCallTracer.updateBuilder(builder);
329         channelTracer.updateBuilder(builder);
330         builder.setTarget(target).setState(channelStateManager.getState());
331         List<InternalWithLogId> children = new ArrayList<>();
332         children.addAll(subchannels);
333         children.addAll(oobChannels);
334         builder.setSubchannels(children);
335         ret.set(builder.build());
336       }
337     }
338 
339     // subchannels and oobchannels can only be accessed from syncContext
340     syncContext.execute(new StatsFetcher());
341     return ret;
342   }
343 
  /** Returns the log id stored in {@code logId} for this channel. */
  @Override
  public InternalLogId getLogId() {
    return logId;
  }
348 
349   // Run from syncContext
350   private class IdleModeTimer implements Runnable {
351 
352     @Override
run()353     public void run() {
354       // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
355       // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
356       // subtle to change rapidly to resolve the channel panic. See #8714
357       if (lbHelper == null) {
358         return;
359       }
360       enterIdleMode();
361     }
362   }
363 
364   // Must be called from syncContext
shutdownNameResolverAndLoadBalancer(boolean channelIsActive)365   private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
366     syncContext.throwIfNotInThisSynchronizationContext();
367     if (channelIsActive) {
368       checkState(nameResolverStarted, "nameResolver is not started");
369       checkState(lbHelper != null, "lbHelper is null");
370     }
371     if (nameResolver != null) {
372       nameResolver.shutdown();
373       nameResolverStarted = false;
374       if (channelIsActive) {
375         nameResolver = getNameResolver(
376             target, authorityOverride, nameResolverFactory, nameResolverArgs);
377       } else {
378         nameResolver = null;
379       }
380     }
381     if (lbHelper != null) {
382       lbHelper.lb.shutdown();
383       lbHelper = null;
384     }
385     subchannelPicker = null;
386   }
387 
388   /**
389    * Make the channel exit idle mode, if it's in it.
390    *
391    * <p>Must be called from syncContext
392    */
393   @VisibleForTesting
exitIdleMode()394   void exitIdleMode() {
395     syncContext.throwIfNotInThisSynchronizationContext();
396     if (shutdown.get() || panicMode) {
397       return;
398     }
399     if (inUseStateAggregator.isInUse()) {
400       // Cancel the timer now, so that a racing due timer will not put Channel on idleness
401       // when the caller of exitIdleMode() is about to use the returned loadBalancer.
402       cancelIdleTimer(false);
403     } else {
404       // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
405       // isInUse() == false, in which case we still need to schedule the timer.
406       rescheduleIdleTimer();
407     }
408     if (lbHelper != null) {
409       return;
410     }
411     channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
412     LbHelperImpl lbHelper = new LbHelperImpl();
413     lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
414     // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
415     // may throw. We don't want to confuse our state, even if we will enter panic mode.
416     this.lbHelper = lbHelper;
417 
418     NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
419     nameResolver.start(listener);
420     nameResolverStarted = true;
421   }
422 
423   // Must be run from syncContext
enterIdleMode()424   private void enterIdleMode() {
425     // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
426     // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
427     // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
428     // which are bugs.
429     shutdownNameResolverAndLoadBalancer(true);
430     delayedTransport.reprocess(null);
431     channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
432     channelStateManager.gotoState(IDLE);
433     // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
434     // transport to be holding some we need to exit idle mode to give these calls a chance to
435     // be processed.
436     if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
437       exitIdleMode();
438     }
439   }
440 
  /**
   * Cancels the idle timer by delegating to {@code idleTimer.cancel(permanent)}.
   *
   * <p>Must be run from syncContext.
   *
   * @param permanent forwarded unchanged to the timer's {@code cancel} call
   */
  private void cancelIdleTimer(boolean permanent) {
    idleTimer.cancel(permanent);
  }
445 
446   // Always run from syncContext
rescheduleIdleTimer()447   private void rescheduleIdleTimer() {
448     if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
449       return;
450     }
451     idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
452   }
453 
454   /**
455    * Force name resolution refresh to happen immediately. Must be run
456    * from syncContext.
457    */
refreshNameResolution()458   private void refreshNameResolution() {
459     syncContext.throwIfNotInThisSynchronizationContext();
460     if (nameResolverStarted) {
461       nameResolver.refresh();
462     }
463   }
464 
465   private final class ChannelStreamProvider implements ClientStreamProvider {
466     volatile Throttle throttle;
467 
getTransport(PickSubchannelArgs args)468     private ClientTransport getTransport(PickSubchannelArgs args) {
469       SubchannelPicker pickerCopy = subchannelPicker;
470       if (shutdown.get()) {
471         // If channel is shut down, delayedTransport is also shut down which will fail the stream
472         // properly.
473         return delayedTransport;
474       }
475       if (pickerCopy == null) {
476         final class ExitIdleModeForTransport implements Runnable {
477           @Override
478           public void run() {
479             exitIdleMode();
480           }
481         }
482 
483         syncContext.execute(new ExitIdleModeForTransport());
484         return delayedTransport;
485       }
486       // There is no need to reschedule the idle timer here.
487       //
488       // pickerCopy != null, which means idle timer has not expired when this method starts.
489       // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
490       // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
491       // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
492       //
493       // In most cases the idle timer is scheduled to fire after the transport has created the
494       // stream, which would have reported in-use state to the channel that would have cancelled
495       // the idle timer.
496       PickResult pickResult = pickerCopy.pickSubchannel(args);
497       ClientTransport transport = GrpcUtil.getTransportFromPickResult(
498           pickResult, args.getCallOptions().isWaitForReady());
499       if (transport != null) {
500         return transport;
501       }
502       return delayedTransport;
503     }
504 
505     @Override
newStream( final MethodDescriptor<?, ?> method, final CallOptions callOptions, final Metadata headers, final Context context)506     public ClientStream newStream(
507         final MethodDescriptor<?, ?> method,
508         final CallOptions callOptions,
509         final Metadata headers,
510         final Context context) {
511       if (!retryEnabled) {
512         ClientTransport transport =
513             getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
514         Context origContext = context.attach();
515         ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
516             callOptions, headers, 0, /* isTransparentRetry= */ false);
517         try {
518           return transport.newStream(method, headers, callOptions, tracers);
519         } finally {
520           context.detach(origContext);
521         }
522       } else {
523         MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
524         final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
525         final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
526         final class RetryStream<ReqT> extends RetriableStream<ReqT> {
527           @SuppressWarnings("unchecked")
528           RetryStream() {
529             super(
530                 (MethodDescriptor<ReqT, ?>) method,
531                 headers,
532                 channelBufferUsed,
533                 perRpcBufferLimit,
534                 channelBufferLimit,
535                 getCallExecutor(callOptions),
536                 transportFactory.getScheduledExecutorService(),
537                 retryPolicy,
538                 hedgingPolicy,
539                 throttle);
540           }
541 
542           @Override
543           Status prestart() {
544             return uncommittedRetriableStreamsRegistry.add(this);
545           }
546 
547           @Override
548           void postCommit() {
549             uncommittedRetriableStreamsRegistry.remove(this);
550           }
551 
552           @Override
553           ClientStream newSubstream(
554               Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
555               boolean isTransparentRetry) {
556             CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
557             ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
558                 newOptions, newHeaders, previousAttempts, isTransparentRetry);
559             ClientTransport transport =
560                 getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
561             Context origContext = context.attach();
562             try {
563               return transport.newStream(method, newHeaders, newOptions, tracers);
564             } finally {
565               context.detach(origContext);
566             }
567           }
568         }
569 
570         return new RetryStream<>();
571       }
572     }
573   }
574 
575   private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
576 
577   private final Rescheduler idleTimer;
578 
  /**
   * Builds a channel from the builder's settings and the supplied collaborators.
   *
   * <p>Statement order matters: later fields are derived from earlier ones (for example the name
   * resolver needs {@code nameResolverArgs}, and {@code realChannel} needs the resolver's service
   * authority).
   *
   * @param builder source of the target, executor pools, credentials, retry and idle settings,
   *     registries, default service config and channelz instance
   * @param clientTransportFactory transport factory; kept as-is and also wrapped to apply call
   *     credentials
   * @param backoffPolicyProvider stored in {@code backoffPolicyProvider}
   * @param balancerRpcExecutorPool executor pool wrapped by {@code balancerRpcExecutorHolder}
   * @param stopwatchSupplier supplies the stopwatch handed to the idle timer
   * @param interceptors interceptors applied on top of the real (optionally binlog-wrapped)
   *     channel
   * @param timeProvider time source passed to the channel tracer, channel logger and call
   *     tracers
   * @throws NullPointerException if a value passed through {@code checkNotNull} is null
   * @throws IllegalStateException if the builder's default service config fails to parse
   * @throws IllegalArgumentException if the idle timeout is enabled but below
   *     {@code ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS}
   */
  ManagedChannelImpl(
      ManagedChannelImplBuilder builder,
      ClientTransportFactory clientTransportFactory,
      BackoffPolicy.Provider backoffPolicyProvider,
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
      Supplier<Stopwatch> stopwatchSupplier,
      List<ClientInterceptor> interceptors,
      final TimeProvider timeProvider) {
    this.target = checkNotNull(builder.target, "target");
    this.logId = InternalLogId.allocate("Channel", target);
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
    this.executor = checkNotNull(executorPool.getObject(), "executor");
    this.originalChannelCreds = builder.channelCredentials;
    this.originalTransportFactory = clientTransportFactory;
    this.offloadExecutorHolder =
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
    // Regular transports apply the builder's call credentials; OOB transports apply none.
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
        clientTransportFactory, null, this.offloadExecutorHolder);
    this.scheduledExecutor =
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
    maxTraceEvents = builder.maxTraceEvents;
    channelTracer = new ChannelTracer(
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
        "Channel for '" + target + "'");
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
    // Fall back to the default proxy detector when the builder does not supply one.
    ProxyDetector proxyDetector =
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
    this.retryEnabled = builder.retryEnabled;
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
    this.nameResolverRegistry = builder.nameResolverRegistry;
    // The same parser is given to the name resolver and used below for the default service config.
    ScParser serviceConfigParser =
        new ScParser(
            retryEnabled,
            builder.maxRetryAttempts,
            builder.maxHedgedAttempts,
            loadBalancerFactory);
    this.authorityOverride = builder.authorityOverride;
    this.nameResolverArgs =
        NameResolver.Args.newBuilder()
            .setDefaultPort(builder.getDefaultPort())
            .setProxyDetector(proxyDetector)
            .setSynchronizationContext(syncContext)
            .setScheduledExecutorService(scheduledExecutor)
            .setServiceConfigParser(serviceConfigParser)
            .setChannelLogger(channelLogger)
            .setOffloadExecutor(this.offloadExecutorHolder)
            .setOverrideAuthority(this.authorityOverride)
            .build();
    this.nameResolverFactory = builder.nameResolverFactory;
    // The resolver is created here but not started; exitIdleMode() starts it.
    this.nameResolver = getNameResolver(
        target, authorityOverride, nameResolverFactory, nameResolverArgs);
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
    this.delayedTransport.start(delayedTransportListener);
    this.backoffPolicyProvider = backoffPolicyProvider;

    // Parse the optional default service config up front; an invalid one fails construction.
    if (builder.defaultServiceConfig != null) {
      ConfigOrError parsedDefaultServiceConfig =
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
      checkState(
          parsedDefaultServiceConfig.getError() == null,
          "Default config is invalid: %s",
          parsedDefaultServiceConfig.getError());
      this.defaultServiceConfig =
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
      this.lastServiceConfig = this.defaultServiceConfig;
    } else {
      this.defaultServiceConfig = null;
    }
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
    // Layering, innermost first: realChannel, optional binlog wrapper, then the interceptors.
    Channel channel = realChannel;
    if (builder.binlog != null) {
      channel = builder.binlog.wrapChannel(channel);
    }
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
    // Both branches store the builder's value; only an enabled timeout is range-checked.
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
    } else {
      checkArgument(
          builder.idleTimeoutMillis
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
    }

    // The idle timer runs IdleModeTimer in syncContext, scheduled on the transport factory's
    // scheduled executor.
    idleTimer = new Rescheduler(
        new IdleModeTimer(),
        syncContext,
        transportFactory.getScheduledExecutorService(),
        stopwatchSupplier.get());
    this.fullStreamDecompression = builder.fullStreamDecompression;
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
    this.userAgent = builder.userAgent;

    this.channelBufferLimit = builder.retryBufferSize;
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
    // Factory producing call tracers that all share this channel's time provider.
    final class ChannelCallTracerFactory implements CallTracer.Factory {
      @Override
      public CallTracer create() {
        return new CallTracer(timeProvider);
      }
    }

    this.callTracerFactory = new ChannelCallTracerFactory();
    channelCallTracer = callTracerFactory.create();
    this.channelz = checkNotNull(builder.channelz);
    channelz.addRootChannel(this);

    // With service config look-up disabled, the service config is marked as already updated;
    // the log line is emitted only when a default config exists.
    if (!lookUpServiceConfig) {
      if (defaultServiceConfig != null) {
        channelLogger.log(
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
      }
      serviceConfigUpdated = true;
    }
  }
702 
getNameResolver( String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs)703   private static NameResolver getNameResolver(
704       String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
705     // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
706     // "dns:///".
707     URI targetUri = null;
708     StringBuilder uriSyntaxErrors = new StringBuilder();
709     try {
710       targetUri = new URI(target);
711       // For "localhost:8080" this would likely cause newNameResolver to return null, because
712       // "localhost" is parsed as the scheme. Will fall into the next branch and try
713       // "dns:///localhost:8080".
714     } catch (URISyntaxException e) {
715       // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
716       uriSyntaxErrors.append(e.getMessage());
717     }
718     if (targetUri != null) {
719       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
720       if (resolver != null) {
721         return resolver;
722       }
723       // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
724       // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
725     }
726 
727     // If we reached here, the targetUri couldn't be used.
728     if (!URI_PATTERN.matcher(target).matches()) {
729       // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
730       // scheme from the factory.
731       try {
732         targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
733       } catch (URISyntaxException e) {
734         // Should not be possible.
735         throw new IllegalArgumentException(e);
736       }
737       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
738       if (resolver != null) {
739         return resolver;
740       }
741     }
742     throw new IllegalArgumentException(String.format(
743         "cannot find a NameResolver for %s%s",
744         target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
745   }
746 
747   @VisibleForTesting
getNameResolver( String target, @Nullable final String overrideAuthority, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs)748   static NameResolver getNameResolver(
749       String target, @Nullable final String overrideAuthority,
750       NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
751     NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
752     if (overrideAuthority == null) {
753       return resolver;
754     }
755 
756     // If the nameResolver is not already a RetryingNameResolver, then wrap it with it.
757     // This helps guarantee that name resolution retry remains supported even as it has been
758     // removed from ManagedChannelImpl.
759     // TODO: After a transition period, all NameResolver implementations that need retry should use
760     //       RetryingNameResolver directly and this step can be removed.
761     NameResolver usedNameResolver;
762     if (resolver instanceof RetryingNameResolver) {
763       usedNameResolver = resolver;
764     } else {
765       usedNameResolver = new RetryingNameResolver(resolver,
766           new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
767               nameResolverArgs.getScheduledExecutorService(),
768               nameResolverArgs.getSynchronizationContext()),
769           nameResolverArgs.getSynchronizationContext());
770     }
771 
772     return new ForwardingNameResolver(usedNameResolver) {
773       @Override
774       public String getServiceAuthority() {
775         return overrideAuthority;
776       }
777     };
778   }
779 
780   @VisibleForTesting
781   InternalConfigSelector getConfigSelector() {
782     return realChannel.configSelector.get();
783   }
784 
785   /**
786    * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
787    * cancelled.
788    */
789   @Override
790   public ManagedChannelImpl shutdown() {
791     channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
792     if (!shutdown.compareAndSet(false, true)) {
793       return this;
794     }
795     final class Shutdown implements Runnable {
796       @Override
797       public void run() {
798         channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
799         channelStateManager.gotoState(SHUTDOWN);
800       }
801     }
802 
803     syncContext.execute(new Shutdown());
804     realChannel.shutdown();
805     final class CancelIdleTimer implements Runnable {
806       @Override
807       public void run() {
808         cancelIdleTimer(/* permanent= */ true);
809       }
810     }
811 
812     syncContext.execute(new CancelIdleTimer());
813     return this;
814   }
815 
816   /**
817    * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
818    * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
819    * return {@code false} immediately after this method returns.
820    */
821   @Override
822   public ManagedChannelImpl shutdownNow() {
823     channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
824     shutdown();
825     realChannel.shutdownNow();
826     final class ShutdownNow implements Runnable {
827       @Override
828       public void run() {
829         if (shutdownNowed) {
830           return;
831         }
832         shutdownNowed = true;
833         maybeShutdownNowSubchannels();
834       }
835     }
836 
837     syncContext.execute(new ShutdownNow());
838     return this;
839   }
840 
841   // Called from syncContext
842   @VisibleForTesting
843   void panic(final Throwable t) {
844     if (panicMode) {
845       // Preserve the first panic information
846       return;
847     }
848     panicMode = true;
849     cancelIdleTimer(/* permanent= */ true);
850     shutdownNameResolverAndLoadBalancer(false);
851     final class PanicSubchannelPicker extends SubchannelPicker {
852       private final PickResult panicPickResult =
853           PickResult.withDrop(
854               Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
855 
856       @Override
857       public PickResult pickSubchannel(PickSubchannelArgs args) {
858         return panicPickResult;
859       }
860 
861       @Override
862       public String toString() {
863         return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
864             .add("panicPickResult", panicPickResult)
865             .toString();
866       }
867     }
868 
869     updateSubchannelPicker(new PanicSubchannelPicker());
870     realChannel.updateConfigSelector(null);
871     channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
872     channelStateManager.gotoState(TRANSIENT_FAILURE);
873   }
874 
875   @VisibleForTesting
876   boolean isInPanicMode() {
877     return panicMode;
878   }
879 
880   // Called from syncContext
881   private void updateSubchannelPicker(SubchannelPicker newPicker) {
882     subchannelPicker = newPicker;
883     delayedTransport.reprocess(newPicker);
884   }
885 
886   @Override
887   public boolean isShutdown() {
888     return shutdown.get();
889   }
890 
891   @Override
892   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
893     return terminatedLatch.await(timeout, unit);
894   }
895 
896   @Override
897   public boolean isTerminated() {
898     return terminated;
899   }
900 
901   /*
902    * Creates a new outgoing call on the channel.
903    */
904   @Override
905   public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
906       CallOptions callOptions) {
907     return interceptorChannel.newCall(method, callOptions);
908   }
909 
910   @Override
911   public String authority() {
912     return interceptorChannel.authority();
913   }
914 
915   private Executor getCallExecutor(CallOptions callOptions) {
916     Executor executor = callOptions.getExecutor();
917     if (executor == null) {
918       executor = this.executor;
919     }
920     return executor;
921   }
922 
923   private class RealChannel extends Channel {
924     // Reference to null if no config selector is available from resolution result
925     // Reference must be set() from syncContext
926     private final AtomicReference<InternalConfigSelector> configSelector =
927         new AtomicReference<>(INITIAL_PENDING_SELECTOR);
928     // Set when the NameResolver is initially created. When we create a new NameResolver for the
929     // same target, the new instance must have the same value.
930     private final String authority;
931 
932     private final Channel clientCallImplChannel = new Channel() {
933       @Override
934       public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
935           MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
936         return new ClientCallImpl<>(
937             method,
938             getCallExecutor(callOptions),
939             callOptions,
940             transportProvider,
941             terminated ? null : transportFactory.getScheduledExecutorService(),
942             channelCallTracer,
943             null)
944             .setFullStreamDecompression(fullStreamDecompression)
945             .setDecompressorRegistry(decompressorRegistry)
946             .setCompressorRegistry(compressorRegistry);
947       }
948 
949       @Override
950       public String authority() {
951         return authority;
952       }
953     };
954 
955     private RealChannel(String authority) {
956       this.authority =  checkNotNull(authority, "authority");
957     }
958 
959     @Override
960     public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
961         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
962       if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
963         return newClientCall(method, callOptions);
964       }
965       syncContext.execute(new Runnable() {
966         @Override
967         public void run() {
968           exitIdleMode();
969         }
970       });
971       if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
972         // This is an optimization for the case (typically with InProcessTransport) when name
973         // resolution result is immediately available at this point. Otherwise, some users'
974         // tests might observe slight behavior difference from earlier grpc versions.
975         return newClientCall(method, callOptions);
976       }
977       if (shutdown.get()) {
978         // Return a failing ClientCall.
979         return new ClientCall<ReqT, RespT>() {
980           @Override
981           public void start(Listener<RespT> responseListener, Metadata headers) {
982             responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
983           }
984 
985           @Override public void request(int numMessages) {}
986 
987           @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
988 
989           @Override public void halfClose() {}
990 
991           @Override public void sendMessage(ReqT message) {}
992         };
993       }
994       Context context = Context.current();
995       final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
996       syncContext.execute(new Runnable() {
997         @Override
998         public void run() {
999           if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1000             if (pendingCalls == null) {
1001               pendingCalls = new LinkedHashSet<>();
1002               inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1003             }
1004             pendingCalls.add(pendingCall);
1005           } else {
1006             pendingCall.reprocess();
1007           }
1008         }
1009       });
1010       return pendingCall;
1011     }
1012 
1013     // Must run in SynchronizationContext.
1014     void updateConfigSelector(@Nullable InternalConfigSelector config) {
1015       InternalConfigSelector prevConfig = configSelector.get();
1016       configSelector.set(config);
1017       if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1018         for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1019           pendingCall.reprocess();
1020         }
1021       }
1022     }
1023 
1024     // Must run in SynchronizationContext.
1025     void onConfigError() {
1026       if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1027         updateConfigSelector(null);
1028       }
1029     }
1030 
1031     void shutdown() {
1032       final class RealChannelShutdown implements Runnable {
1033         @Override
1034         public void run() {
1035           if (pendingCalls == null) {
1036             if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1037               configSelector.set(null);
1038             }
1039             uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1040           }
1041         }
1042       }
1043 
1044       syncContext.execute(new RealChannelShutdown());
1045     }
1046 
1047     void shutdownNow() {
1048       final class RealChannelShutdownNow implements Runnable {
1049         @Override
1050         public void run() {
1051           if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1052             configSelector.set(null);
1053           }
1054           if (pendingCalls != null) {
1055             for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1056               pendingCall.cancel("Channel is forcefully shutdown", null);
1057             }
1058           }
1059           uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1060         }
1061       }
1062 
1063       syncContext.execute(new RealChannelShutdownNow());
1064     }
1065 
1066     @Override
1067     public String authority() {
1068       return authority;
1069     }
1070 
1071     private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1072       final Context context;
1073       final MethodDescriptor<ReqT, RespT> method;
1074       final CallOptions callOptions;
1075 
1076       PendingCall(
1077           Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1078         super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1079         this.context = context;
1080         this.method = method;
1081         this.callOptions = callOptions;
1082       }
1083 
1084       /** Called when it's ready to create a real call and reprocess the pending call. */
1085       void reprocess() {
1086         ClientCall<ReqT, RespT> realCall;
1087         Context previous = context.attach();
1088         try {
1089           CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
1090           realCall = newClientCall(method, delayResolutionOption);
1091         } finally {
1092           context.detach(previous);
1093         }
1094         Runnable toRun = setCall(realCall);
1095         if (toRun == null) {
1096           syncContext.execute(new PendingCallRemoval());
1097         } else {
1098           getCallExecutor(callOptions).execute(new Runnable() {
1099             @Override
1100             public void run() {
1101               toRun.run();
1102               syncContext.execute(new PendingCallRemoval());
1103             }
1104           });
1105         }
1106       }
1107 
1108       @Override
1109       protected void callCancelled() {
1110         super.callCancelled();
1111         syncContext.execute(new PendingCallRemoval());
1112       }
1113 
1114       final class PendingCallRemoval implements Runnable {
1115         @Override
1116         public void run() {
1117           if (pendingCalls != null) {
1118             pendingCalls.remove(PendingCall.this);
1119             if (pendingCalls.isEmpty()) {
1120               inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1121               pendingCalls = null;
1122               if (shutdown.get()) {
1123                 uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1124               }
1125             }
1126           }
1127         }
1128       }
1129     }
1130 
1131     private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1132         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1133       InternalConfigSelector selector = configSelector.get();
1134       if (selector == null) {
1135         return clientCallImplChannel.newCall(method, callOptions);
1136       }
1137       if (selector instanceof ServiceConfigConvertedSelector) {
1138         MethodInfo methodInfo =
1139             ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1140         if (methodInfo != null) {
1141           callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1142         }
1143         return clientCallImplChannel.newCall(method, callOptions);
1144       }
1145       return new ConfigSelectingClientCall<>(
1146           selector, clientCallImplChannel, executor, method, callOptions);
1147     }
1148   }
1149 
1150   /**
1151    * A client call for a given channel that applies a given config selector when it starts.
1152    */
1153   static final class ConfigSelectingClientCall<ReqT, RespT>
1154       extends ForwardingClientCall<ReqT, RespT> {
1155 
1156     private final InternalConfigSelector configSelector;
1157     private final Channel channel;
1158     private final Executor callExecutor;
1159     private final MethodDescriptor<ReqT, RespT> method;
1160     private final Context context;
1161     private CallOptions callOptions;
1162 
1163     private ClientCall<ReqT, RespT> delegate;
1164 
1165     ConfigSelectingClientCall(
1166         InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1167         MethodDescriptor<ReqT, RespT> method,
1168         CallOptions callOptions) {
1169       this.configSelector = configSelector;
1170       this.channel = channel;
1171       this.method = method;
1172       this.callExecutor =
1173           callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1174       this.callOptions = callOptions.withExecutor(callExecutor);
1175       this.context = Context.current();
1176     }
1177 
1178     @Override
1179     protected ClientCall<ReqT, RespT> delegate() {
1180       return delegate;
1181     }
1182 
1183     @SuppressWarnings("unchecked")
1184     @Override
1185     public void start(Listener<RespT> observer, Metadata headers) {
1186       PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1187       InternalConfigSelector.Result result = configSelector.selectConfig(args);
1188       Status status = result.getStatus();
1189       if (!status.isOk()) {
1190         executeCloseObserverInContext(observer,
1191             GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1192         delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1193         return;
1194       }
1195       ClientInterceptor interceptor = result.getInterceptor();
1196       ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1197       MethodInfo methodInfo = config.getMethodConfig(method);
1198       if (methodInfo != null) {
1199         callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1200       }
1201       if (interceptor != null) {
1202         delegate = interceptor.interceptCall(method, callOptions, channel);
1203       } else {
1204         delegate = channel.newCall(method, callOptions);
1205       }
1206       delegate.start(observer, headers);
1207     }
1208 
1209     private void executeCloseObserverInContext(
1210         final Listener<RespT> observer, final Status status) {
1211       class CloseInContext extends ContextRunnable {
1212         CloseInContext() {
1213           super(context);
1214         }
1215 
1216         @Override
1217         public void runInContext() {
1218           observer.onClose(status, new Metadata());
1219         }
1220       }
1221 
1222       callExecutor.execute(new CloseInContext());
1223     }
1224 
1225     @Override
1226     public void cancel(@Nullable String message, @Nullable Throwable cause) {
1227       if (delegate != null) {
1228         delegate.cancel(message, cause);
1229       }
1230     }
1231   }
1232 
1233   private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1234     @Override
1235     public void start(Listener<Object> responseListener, Metadata headers) {}
1236 
1237     @Override
1238     public void request(int numMessages) {}
1239 
1240     @Override
1241     public void cancel(String message, Throwable cause) {}
1242 
1243     @Override
1244     public void halfClose() {}
1245 
1246     @Override
1247     public void sendMessage(Object message) {}
1248 
1249     // Always returns {@code false}, since this is only used when the startup of the call fails.
1250     @Override
1251     public boolean isReady() {
1252       return false;
1253     }
1254   };
1255 
1256   /**
1257    * Terminate the channel if termination conditions are met.
1258    */
1259   // Must be run from syncContext
1260   private void maybeTerminateChannel() {
1261     if (terminated) {
1262       return;
1263     }
1264     if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1265       channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1266       channelz.removeRootChannel(this);
1267       executorPool.returnObject(executor);
1268       balancerRpcExecutorHolder.release();
1269       offloadExecutorHolder.release();
1270       // Release the transport factory so that it can deallocate any resources.
1271       transportFactory.close();
1272 
1273       terminated = true;
1274       terminatedLatch.countDown();
1275     }
1276   }
1277 
1278   // Must be called from syncContext
1279   private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1280     if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1281       refreshNameResolution();
1282     }
1283   }
1284 
1285   @Override
1286   @SuppressWarnings("deprecation")
1287   public ConnectivityState getState(boolean requestConnection) {
1288     ConnectivityState savedChannelState = channelStateManager.getState();
1289     if (requestConnection && savedChannelState == IDLE) {
1290       final class RequestConnection implements Runnable {
1291         @Override
1292         public void run() {
1293           exitIdleMode();
1294           if (subchannelPicker != null) {
1295             subchannelPicker.requestConnection();
1296           }
1297           if (lbHelper != null) {
1298             lbHelper.lb.requestConnection();
1299           }
1300         }
1301       }
1302 
1303       syncContext.execute(new RequestConnection());
1304     }
1305     return savedChannelState;
1306   }
1307 
1308   @Override
1309   public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1310     final class NotifyStateChanged implements Runnable {
1311       @Override
1312       public void run() {
1313         channelStateManager.notifyWhenStateChanged(callback, executor, source);
1314       }
1315     }
1316 
1317     syncContext.execute(new NotifyStateChanged());
1318   }
1319 
1320   @Override
1321   public void resetConnectBackoff() {
1322     final class ResetConnectBackoff implements Runnable {
1323       @Override
1324       public void run() {
1325         if (shutdown.get()) {
1326           return;
1327         }
1328         if (nameResolverStarted) {
1329           refreshNameResolution();
1330         }
1331         for (InternalSubchannel subchannel : subchannels) {
1332           subchannel.resetConnectBackoff();
1333         }
1334         for (OobChannel oobChannel : oobChannels) {
1335           oobChannel.resetConnectBackoff();
1336         }
1337       }
1338     }
1339 
1340     syncContext.execute(new ResetConnectBackoff());
1341   }
1342 
1343   @Override
1344   public void enterIdle() {
1345     final class PrepareToLoseNetworkRunnable implements Runnable {
1346       @Override
1347       public void run() {
1348         if (shutdown.get() || lbHelper == null) {
1349           return;
1350         }
1351         cancelIdleTimer(/* permanent= */ false);
1352         enterIdleMode();
1353       }
1354     }
1355 
1356     syncContext.execute(new PrepareToLoseNetworkRunnable());
1357   }
1358 
1359   /**
1360    * A registry that prevents channel shutdown from killing existing retry attempts that are in
1361    * backoff.
1362    */
1363   private final class UncommittedRetriableStreamsRegistry {
1364     // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1365     // it's worthwhile to look for a lock-free approach.
1366     final Object lock = new Object();
1367 
1368     @GuardedBy("lock")
1369     Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1370 
1371     @GuardedBy("lock")
1372     Status shutdownStatus;
1373 
1374     void onShutdown(Status reason) {
1375       boolean shouldShutdownDelayedTransport = false;
1376       synchronized (lock) {
1377         if (shutdownStatus != null) {
1378           return;
1379         }
1380         shutdownStatus = reason;
1381         // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1382         // retriable streams, which may be in backoff and not using any transport, are already
1383         // started RPCs.
1384         if (uncommittedRetriableStreams.isEmpty()) {
1385           shouldShutdownDelayedTransport = true;
1386         }
1387       }
1388 
1389       if (shouldShutdownDelayedTransport) {
1390         delayedTransport.shutdown(reason);
1391       }
1392     }
1393 
1394     void onShutdownNow(Status reason) {
1395       onShutdown(reason);
1396       Collection<ClientStream> streams;
1397 
1398       synchronized (lock) {
1399         streams = new ArrayList<>(uncommittedRetriableStreams);
1400       }
1401 
1402       for (ClientStream stream : streams) {
1403         stream.cancel(reason);
1404       }
1405       delayedTransport.shutdownNow(reason);
1406     }
1407 
1408     /**
1409      * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1410      * shutdown Status.
1411      */
1412     @Nullable
1413     Status add(RetriableStream<?> retriableStream) {
1414       synchronized (lock) {
1415         if (shutdownStatus != null) {
1416           return shutdownStatus;
1417         }
1418         uncommittedRetriableStreams.add(retriableStream);
1419         return null;
1420       }
1421     }
1422 
1423     void remove(RetriableStream<?> retriableStream) {
1424       Status shutdownStatusCopy = null;
1425 
1426       synchronized (lock) {
1427         uncommittedRetriableStreams.remove(retriableStream);
1428         if (uncommittedRetriableStreams.isEmpty()) {
1429           shutdownStatusCopy = shutdownStatus;
1430           // Because retriable transport is long-lived, we take this opportunity to down-size the
1431           // hashmap.
1432           uncommittedRetriableStreams = new HashSet<>();
1433         }
1434       }
1435 
1436       if (shutdownStatusCopy != null) {
1437         delayedTransport.shutdown(shutdownStatusCopy);
1438       }
1439     }
1440   }
1441 
1442   private final class LbHelperImpl extends LoadBalancer.Helper {
1443     AutoConfiguredLoadBalancer lb;
1444 
1445     @Override
1446     public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1447       syncContext.throwIfNotInThisSynchronizationContext();
1448       // No new subchannel should be created after load balancer has been shutdown.
1449       checkState(!terminating, "Channel is being terminated");
1450       return new SubchannelImpl(args);
1451     }
1452 
1453     @Override
1454     public void updateBalancingState(
1455         final ConnectivityState newState, final SubchannelPicker newPicker) {
1456       syncContext.throwIfNotInThisSynchronizationContext();
1457       checkNotNull(newState, "newState");
1458       checkNotNull(newPicker, "newPicker");
1459       final class UpdateBalancingState implements Runnable {
1460         @Override
1461         public void run() {
1462           if (LbHelperImpl.this != lbHelper) {
1463             return;
1464           }
1465           updateSubchannelPicker(newPicker);
1466           // It's not appropriate to report SHUTDOWN state from lb.
1467           // Ignore the case of newState == SHUTDOWN for now.
1468           if (newState != SHUTDOWN) {
1469             channelLogger.log(
1470                 ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1471             channelStateManager.gotoState(newState);
1472           }
1473         }
1474       }
1475 
1476       syncContext.execute(new UpdateBalancingState());
1477     }
1478 
1479     @Override
1480     public void refreshNameResolution() {
1481       syncContext.throwIfNotInThisSynchronizationContext();
1482       final class LoadBalancerRefreshNameResolution implements Runnable {
1483         @Override
1484         public void run() {
1485           ManagedChannelImpl.this.refreshNameResolution();
1486         }
1487       }
1488 
1489       syncContext.execute(new LoadBalancerRefreshNameResolution());
1490     }
1491 
1492     @Override
1493     public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1494       return createOobChannel(Collections.singletonList(addressGroup), authority);
1495     }
1496 
1497     @Override
1498     public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1499         String authority) {
1500       // TODO(ejona): can we be even stricter? Like terminating?
1501       checkState(!terminated, "Channel is terminated");
1502       long oobChannelCreationTime = timeProvider.currentTimeNanos();
1503       InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1504       InternalLogId subchannelLogId =
1505           InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1506       ChannelTracer oobChannelTracer =
1507           new ChannelTracer(
1508               oobLogId, maxTraceEvents, oobChannelCreationTime,
1509               "OobChannel for " + addressGroup);
1510       final OobChannel oobChannel = new OobChannel(
1511           authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1512           syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1513       channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1514           .setDescription("Child OobChannel created")
1515           .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1516           .setTimestampNanos(oobChannelCreationTime)
1517           .setChannelRef(oobChannel)
1518           .build());
1519       ChannelTracer subchannelTracer =
1520           new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1521               "Subchannel for " + addressGroup);
1522       ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1523       final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1524         @Override
1525         void onTerminated(InternalSubchannel is) {
1526           oobChannels.remove(oobChannel);
1527           channelz.removeSubchannel(is);
1528           oobChannel.handleSubchannelTerminated();
1529           maybeTerminateChannel();
1530         }
1531 
1532         @Override
1533         void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1534           // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1535           //  state and refresh name resolution if necessary.
1536           handleInternalSubchannelState(newState);
1537           oobChannel.handleSubchannelStateChange(newState);
1538         }
1539       }
1540 
1541       final InternalSubchannel internalSubchannel = new InternalSubchannel(
1542           addressGroup,
1543           authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1544           oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1545           // All callback methods are run from syncContext
1546           new ManagedOobChannelCallback(),
1547           channelz,
1548           callTracerFactory.create(),
1549           subchannelTracer,
1550           subchannelLogId,
1551           subchannelLogger);
1552       oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1553           .setDescription("Child Subchannel created")
1554           .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1555           .setTimestampNanos(oobChannelCreationTime)
1556           .setSubchannelRef(internalSubchannel)
1557           .build());
1558       channelz.addSubchannel(oobChannel);
1559       channelz.addSubchannel(internalSubchannel);
1560       oobChannel.setSubchannel(internalSubchannel);
1561       final class AddOobChannel implements Runnable {
1562         @Override
1563         public void run() {
1564           if (terminating) {
1565             oobChannel.shutdown();
1566           }
1567           if (!terminated) {
1568             // If channel has not terminated, it will track the subchannel and block termination
1569             // for it.
1570             oobChannels.add(oobChannel);
1571           }
1572         }
1573       }
1574 
1575       syncContext.execute(new AddOobChannel());
1576       return oobChannel;
1577     }
1578 
1579     @Deprecated
1580     @Override
1581     public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1582       return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1583           // Override authority to keep the old behavior.
1584           // createResolvingOobChannelBuilder(String target) will be deleted soon.
1585           .overrideAuthority(getAuthority());
1586     }
1587 
1588     // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1589     // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1590     @Override
1591     public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1592         final String target, final ChannelCredentials channelCreds) {
1593       checkNotNull(channelCreds, "channelCreds");
1594 
1595       final class ResolvingOobChannelBuilder
1596           extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> {
1597         final ManagedChannelBuilder<?> delegate;
1598 
1599         ResolvingOobChannelBuilder() {
1600           final ClientTransportFactory transportFactory;
1601           CallCredentials callCredentials;
1602           if (channelCreds instanceof DefaultChannelCreds) {
1603             transportFactory = originalTransportFactory;
1604             callCredentials = null;
1605           } else {
1606             SwapChannelCredentialsResult swapResult =
1607                 originalTransportFactory.swapChannelCredentials(channelCreds);
1608             if (swapResult == null) {
1609               delegate = Grpc.newChannelBuilder(target, channelCreds);
1610               return;
1611             } else {
1612               transportFactory = swapResult.transportFactory;
1613               callCredentials = swapResult.callCredentials;
1614             }
1615           }
1616           ClientTransportFactoryBuilder transportFactoryBuilder =
1617               new ClientTransportFactoryBuilder() {
1618                 @Override
1619                 public ClientTransportFactory buildClientTransportFactory() {
1620                   return transportFactory;
1621                 }
1622               };
1623           delegate = new ManagedChannelImplBuilder(
1624               target,
1625               channelCreds,
1626               callCredentials,
1627               transportFactoryBuilder,
1628               new FixedPortProvider(nameResolverArgs.getDefaultPort()));
1629         }
1630 
1631         @Override
1632         protected ManagedChannelBuilder<?> delegate() {
1633           return delegate;
1634         }
1635       }
1636 
1637       checkState(!terminated, "Channel is terminated");
1638 
1639       @SuppressWarnings("deprecation")
1640       ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder()
1641           .nameResolverFactory(nameResolverFactory);
1642 
1643       return builder
1644           // TODO(zdapeng): executors should not outlive the parent channel.
1645           .executor(executor)
1646           .offloadExecutor(offloadExecutorHolder.getExecutor())
1647           .maxTraceEvents(maxTraceEvents)
1648           .proxyDetector(nameResolverArgs.getProxyDetector())
1649           .userAgent(userAgent);
1650     }
1651 
1652     @Override
1653     public ChannelCredentials getUnsafeChannelCredentials() {
1654       if (originalChannelCreds == null) {
1655         return new DefaultChannelCreds();
1656       }
1657       return originalChannelCreds;
1658     }
1659 
1660     @Override
1661     public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1662       updateOobChannelAddresses(channel, Collections.singletonList(eag));
1663     }
1664 
1665     @Override
1666     public void updateOobChannelAddresses(ManagedChannel channel,
1667         List<EquivalentAddressGroup> eag) {
1668       checkArgument(channel instanceof OobChannel,
1669           "channel must have been returned from createOobChannel");
1670       ((OobChannel) channel).updateAddresses(eag);
1671     }
1672 
    /** Returns the authority of the enclosing {@code ManagedChannelImpl}. */
    @Override
    public String getAuthority() {
      return ManagedChannelImpl.this.authority();
    }
1677 
    /** Returns the channel's {@code syncContext}. */
    @Override
    public SynchronizationContext getSynchronizationContext() {
      return syncContext;
    }
1682 
    /** Returns the channel's {@code scheduledExecutor}. */
    @Override
    public ScheduledExecutorService getScheduledExecutorService() {
      return scheduledExecutor;
    }
1687 
    /** Returns the channel-level {@code channelLogger}. */
    @Override
    public ChannelLogger getChannelLogger() {
      return channelLogger;
    }
1692 
    /** Returns the {@code nameResolverArgs} held by this channel. */
    @Override
    public NameResolver.Args getNameResolverArgs() {
      return nameResolverArgs;
    }
1697 
    /** Returns the {@code nameResolverRegistry} held by this channel. */
    @Override
    public NameResolverRegistry getNameResolverRegistry() {
      return nameResolverRegistry;
    }
1702 
    /**
     * A placeholder for channel creds if user did not specify channel creds for the channel.
     *
     * <p>Within this helper it is what {@code getUnsafeChannelCredentials()} returns when no
     * original credentials are recorded, and {@code createResolvingOobChannelBuilder} checks for
     * it with {@code instanceof} to decide whether the original transport factory can be reused
     * unchanged.
     */
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
    //     channel creds.
    final class DefaultChannelCreds extends ChannelCredentials {
      /** Returns this same instance. */
      @Override
      public ChannelCredentials withoutBearerTokens() {
        return this;
      }
    }
1714   }
1715 
1716   final class NameResolverListener extends NameResolver.Listener2 {
1717     final LbHelperImpl helper;
1718     final NameResolver resolver;
1719 
1720     NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1721       this.helper = checkNotNull(helperImpl, "helperImpl");
1722       this.resolver = checkNotNull(resolver, "resolver");
1723     }
1724 
1725     @Override
1726     public void onResult(final ResolutionResult resolutionResult) {
1727       final class NamesResolved implements Runnable {
1728 
1729         @SuppressWarnings("ReferenceEquality")
1730         @Override
1731         public void run() {
1732           if (ManagedChannelImpl.this.nameResolver != resolver) {
1733             return;
1734           }
1735 
1736           List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1737           channelLogger.log(
1738               ChannelLogLevel.DEBUG,
1739               "Resolved address: {0}, config={1}",
1740               servers,
1741               resolutionResult.getAttributes());
1742 
1743           if (lastResolutionState != ResolutionState.SUCCESS) {
1744             channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1745             lastResolutionState = ResolutionState.SUCCESS;
1746           }
1747 
1748           ConfigOrError configOrError = resolutionResult.getServiceConfig();
1749           ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1750               .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1751           InternalConfigSelector resolvedConfigSelector =
1752               resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1753           ManagedChannelServiceConfig validServiceConfig =
1754               configOrError != null && configOrError.getConfig() != null
1755                   ? (ManagedChannelServiceConfig) configOrError.getConfig()
1756                   : null;
1757           Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1758 
1759           ManagedChannelServiceConfig effectiveServiceConfig;
1760           if (!lookUpServiceConfig) {
1761             if (validServiceConfig != null) {
1762               channelLogger.log(
1763                   ChannelLogLevel.INFO,
1764                   "Service config from name resolver discarded by channel settings");
1765             }
1766             effectiveServiceConfig =
1767                 defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1768             if (resolvedConfigSelector != null) {
1769               channelLogger.log(
1770                   ChannelLogLevel.INFO,
1771                   "Config selector from name resolver discarded by channel settings");
1772             }
1773             realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1774           } else {
1775             // Try to use config if returned from name resolver
1776             // Otherwise, try to use the default config if available
1777             if (validServiceConfig != null) {
1778               effectiveServiceConfig = validServiceConfig;
1779               if (resolvedConfigSelector != null) {
1780                 realChannel.updateConfigSelector(resolvedConfigSelector);
1781                 if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1782                   channelLogger.log(
1783                       ChannelLogLevel.DEBUG,
1784                       "Method configs in service config will be discarded due to presence of"
1785                           + "config-selector");
1786                 }
1787               } else {
1788                 realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1789               }
1790             } else if (defaultServiceConfig != null) {
1791               effectiveServiceConfig = defaultServiceConfig;
1792               realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1793               channelLogger.log(
1794                   ChannelLogLevel.INFO,
1795                   "Received no service config, using default service config");
1796             } else if (serviceConfigError != null) {
1797               if (!serviceConfigUpdated) {
1798                 // First DNS lookup has invalid service config, and cannot fall back to default
1799                 channelLogger.log(
1800                     ChannelLogLevel.INFO,
1801                     "Fallback to error due to invalid first service config without default config");
1802                 // This error could be an "inappropriate" control plane error that should not bleed
1803                 // through to client code using gRPC. We let them flow through here to the LB as
1804                 // we later check for these error codes when investigating pick results in
1805                 // GrpcUtil.getTransportFromPickResult().
1806                 onError(configOrError.getError());
1807                 if (resolutionResultListener != null) {
1808                   resolutionResultListener.resolutionAttempted(false);
1809                 }
1810                 return;
1811               } else {
1812                 effectiveServiceConfig = lastServiceConfig;
1813               }
1814             } else {
1815               effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1816               realChannel.updateConfigSelector(null);
1817             }
1818             if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1819               channelLogger.log(
1820                   ChannelLogLevel.INFO,
1821                   "Service config changed{0}",
1822                   effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1823               lastServiceConfig = effectiveServiceConfig;
1824               transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1825             }
1826 
1827             try {
1828               // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1829               //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1830               //  lbNeedAddress is not deterministic
1831               serviceConfigUpdated = true;
1832             } catch (RuntimeException re) {
1833               logger.log(
1834                   Level.WARNING,
1835                   "[" + getLogId() + "] Unexpected exception from parsing service config",
1836                   re);
1837             }
1838           }
1839 
1840           Attributes effectiveAttrs = resolutionResult.getAttributes();
1841           // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1842           if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1843             Attributes.Builder attrBuilder =
1844                 effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1845             Map<String, ?> healthCheckingConfig =
1846                 effectiveServiceConfig.getHealthCheckingConfig();
1847             if (healthCheckingConfig != null) {
1848               attrBuilder
1849                   .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1850                   .build();
1851             }
1852             Attributes attributes = attrBuilder.build();
1853 
1854             boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
1855                 ResolvedAddresses.newBuilder()
1856                     .setAddresses(servers)
1857                     .setAttributes(attributes)
1858                     .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1859                     .build());
1860             // If a listener is provided, let it know if the addresses were accepted.
1861             if (resolutionResultListener != null) {
1862               resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
1863             }
1864           }
1865         }
1866       }
1867 
1868       syncContext.execute(new NamesResolved());
1869     }
1870 
1871     @Override
1872     public void onError(final Status error) {
1873       checkArgument(!error.isOk(), "the error status must not be OK");
1874       final class NameResolverErrorHandler implements Runnable {
1875         @Override
1876         public void run() {
1877           handleErrorInSyncContext(error);
1878         }
1879       }
1880 
1881       syncContext.execute(new NameResolverErrorHandler());
1882     }
1883 
1884     private void handleErrorInSyncContext(Status error) {
1885       logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1886           new Object[] {getLogId(), error});
1887       realChannel.onConfigError();
1888       if (lastResolutionState != ResolutionState.ERROR) {
1889         channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1890         lastResolutionState = ResolutionState.ERROR;
1891       }
1892       // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1893       if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1894         return;
1895       }
1896 
1897       helper.lb.handleNameResolutionError(error);
1898     }
1899   }
1900 
1901   private final class SubchannelImpl extends AbstractSubchannel {
1902     final CreateSubchannelArgs args;
1903     final InternalLogId subchannelLogId;
1904     final ChannelLoggerImpl subchannelLogger;
1905     final ChannelTracer subchannelTracer;
1906     List<EquivalentAddressGroup> addressGroups;
1907     InternalSubchannel subchannel;
1908     boolean started;
1909     boolean shutdown;
1910     ScheduledHandle delayedShutdownTask;
1911 
1912     SubchannelImpl(CreateSubchannelArgs args) {
1913       checkNotNull(args, "args");
1914       addressGroups = args.getAddresses();
1915       if (authorityOverride != null) {
1916         List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1917             stripOverrideAuthorityAttributes(args.getAddresses());
1918         args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1919       }
1920       this.args = args;
1921       subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1922       subchannelTracer = new ChannelTracer(
1923           subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1924           "Subchannel for " + args.getAddresses());
1925       subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1926     }
1927 
1928     @Override
1929     public void start(final SubchannelStateListener listener) {
1930       syncContext.throwIfNotInThisSynchronizationContext();
1931       checkState(!started, "already started");
1932       checkState(!shutdown, "already shutdown");
1933       checkState(!terminating, "Channel is being terminated");
1934       started = true;
1935       final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1936         // All callbacks are run in syncContext
1937         @Override
1938         void onTerminated(InternalSubchannel is) {
1939           subchannels.remove(is);
1940           channelz.removeSubchannel(is);
1941           maybeTerminateChannel();
1942         }
1943 
1944         @Override
1945         void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1946           checkState(listener != null, "listener is null");
1947           listener.onSubchannelState(newState);
1948         }
1949 
1950         @Override
1951         void onInUse(InternalSubchannel is) {
1952           inUseStateAggregator.updateObjectInUse(is, true);
1953         }
1954 
1955         @Override
1956         void onNotInUse(InternalSubchannel is) {
1957           inUseStateAggregator.updateObjectInUse(is, false);
1958         }
1959       }
1960 
1961       final InternalSubchannel internalSubchannel = new InternalSubchannel(
1962           args.getAddresses(),
1963           authority(),
1964           userAgent,
1965           backoffPolicyProvider,
1966           transportFactory,
1967           transportFactory.getScheduledExecutorService(),
1968           stopwatchSupplier,
1969           syncContext,
1970           new ManagedInternalSubchannelCallback(),
1971           channelz,
1972           callTracerFactory.create(),
1973           subchannelTracer,
1974           subchannelLogId,
1975           subchannelLogger);
1976 
1977       channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1978           .setDescription("Child Subchannel started")
1979           .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1980           .setTimestampNanos(timeProvider.currentTimeNanos())
1981           .setSubchannelRef(internalSubchannel)
1982           .build());
1983 
1984       this.subchannel = internalSubchannel;
1985       channelz.addSubchannel(internalSubchannel);
1986       subchannels.add(internalSubchannel);
1987     }
1988 
1989     @Override
1990     InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1991       checkState(started, "not started");
1992       return subchannel;
1993     }
1994 
1995     @Override
1996     public void shutdown() {
1997       syncContext.throwIfNotInThisSynchronizationContext();
1998       if (subchannel == null) {
1999         // start() was not successful
2000         shutdown = true;
2001         return;
2002       }
2003       if (shutdown) {
2004         if (terminating && delayedShutdownTask != null) {
2005           // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2006           // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2007           delayedShutdownTask.cancel();
2008           delayedShutdownTask = null;
2009           // Will fall through to the subchannel.shutdown() at the end.
2010         } else {
2011           return;
2012         }
2013       } else {
2014         shutdown = true;
2015       }
2016       // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2017       // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2018       // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2019       // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2020       // shutdown of Subchannel for a few seconds here.
2021       //
2022       // TODO(zhangkun83): consider a better approach
2023       // (https://github.com/grpc/grpc-java/issues/2562).
2024       if (!terminating) {
2025         final class ShutdownSubchannel implements Runnable {
2026           @Override
2027           public void run() {
2028             subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
2029           }
2030         }
2031 
2032         delayedShutdownTask = syncContext.schedule(
2033             new LogExceptionRunnable(new ShutdownSubchannel()),
2034             SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2035             transportFactory.getScheduledExecutorService());
2036         return;
2037       }
2038       // When terminating == true, no more real streams will be created. It's safe and also
2039       // desirable to shutdown timely.
2040       subchannel.shutdown(SHUTDOWN_STATUS);
2041     }
2042 
2043     @Override
2044     public void requestConnection() {
2045       syncContext.throwIfNotInThisSynchronizationContext();
2046       checkState(started, "not started");
2047       subchannel.obtainActiveTransport();
2048     }
2049 
2050     @Override
2051     public List<EquivalentAddressGroup> getAllAddresses() {
2052       syncContext.throwIfNotInThisSynchronizationContext();
2053       checkState(started, "not started");
2054       return addressGroups;
2055     }
2056 
2057     @Override
2058     public Attributes getAttributes() {
2059       return args.getAttributes();
2060     }
2061 
2062     @Override
2063     public String toString() {
2064       return subchannelLogId.toString();
2065     }
2066 
2067     @Override
2068     public Channel asChannel() {
2069       checkState(started, "not started");
2070       return new SubchannelChannel(
2071           subchannel, balancerRpcExecutorHolder.getExecutor(),
2072           transportFactory.getScheduledExecutorService(),
2073           callTracerFactory.create(),
2074           new AtomicReference<InternalConfigSelector>(null));
2075     }
2076 
2077     @Override
2078     public Object getInternalSubchannel() {
2079       checkState(started, "Subchannel is not started");
2080       return subchannel;
2081     }
2082 
2083     @Override
2084     public ChannelLogger getChannelLogger() {
2085       return subchannelLogger;
2086     }
2087 
2088     @Override
2089     public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2090       syncContext.throwIfNotInThisSynchronizationContext();
2091       addressGroups = addrs;
2092       if (authorityOverride != null) {
2093         addrs = stripOverrideAuthorityAttributes(addrs);
2094       }
2095       subchannel.updateAddresses(addrs);
2096     }
2097 
2098     private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2099         List<EquivalentAddressGroup> eags) {
2100       List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
2101       for (EquivalentAddressGroup eag : eags) {
2102         EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
2103             eag.getAddresses(),
2104             eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
2105         eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
2106       }
2107       return Collections.unmodifiableList(eagsWithoutOverrideAttr);
2108     }
2109   }
2110 
2111   @Override
2112   public String toString() {
2113     return MoreObjects.toStringHelper(this)
2114         .add("logId", logId.getId())
2115         .add("target", target)
2116         .toString();
2117   }
2118 
2119   /**
2120    * Called from syncContext.
2121    */
2122   private final class DelayedTransportListener implements ManagedClientTransport.Listener {
2123     @Override
2124     public void transportShutdown(Status s) {
2125       checkState(shutdown.get(), "Channel must have been shut down");
2126     }
2127 
2128     @Override
2129     public void transportReady() {
2130       // Don't care
2131     }
2132 
2133     @Override
2134     public void transportInUse(final boolean inUse) {
2135       inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
2136     }
2137 
2138     @Override
2139     public void transportTerminated() {
2140       checkState(shutdown.get(), "Channel must have been shut down");
2141       terminating = true;
2142       shutdownNameResolverAndLoadBalancer(false);
2143       // No need to call channelStateManager since we are already in SHUTDOWN state.
2144       // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2145       // here.
2146       maybeShutdownNowSubchannels();
2147       maybeTerminateChannel();
2148     }
2149   }
2150 
2151   /**
2152    * Must be accessed from syncContext.
2153    */
2154   private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
2155     @Override
2156     protected void handleInUse() {
2157       exitIdleMode();
2158     }
2159 
2160     @Override
2161     protected void handleNotInUse() {
2162       if (shutdown.get()) {
2163         return;
2164       }
2165       rescheduleIdleTimer();
2166     }
2167   }
2168 
2169   /**
2170    * Lazily request for Executor from an executor pool.
2171    * Also act as an Executor directly to simply run a cmd
2172    */
2173   @VisibleForTesting
2174   static final class ExecutorHolder implements Executor {
2175     private final ObjectPool<? extends Executor> pool;
2176     private Executor executor;
2177 
2178     ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
2179       this.pool = checkNotNull(executorPool, "executorPool");
2180     }
2181 
2182     synchronized Executor getExecutor() {
2183       if (executor == null) {
2184         executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
2185       }
2186       return executor;
2187     }
2188 
2189     synchronized void release() {
2190       if (executor != null) {
2191         executor = pool.returnObject(executor);
2192       }
2193     }
2194 
2195     @Override
2196     public void execute(Runnable command) {
2197       getExecutor().execute(command);
2198     }
2199   }
2200 
2201   private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2202     final ScheduledExecutorService delegate;
2203 
2204     private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
2205       this.delegate = checkNotNull(delegate, "delegate");
2206     }
2207 
2208     @Override
2209     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2210       return delegate.schedule(callable, delay, unit);
2211     }
2212 
2213     @Override
2214     public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2215       return delegate.schedule(cmd, delay, unit);
2216     }
2217 
2218     @Override
2219     public ScheduledFuture<?> scheduleAtFixedRate(
2220         Runnable command, long initialDelay, long period, TimeUnit unit) {
2221       return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
2222     }
2223 
2224     @Override
2225     public ScheduledFuture<?> scheduleWithFixedDelay(
2226         Runnable command, long initialDelay, long delay, TimeUnit unit) {
2227       return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
2228     }
2229 
2230     @Override
2231     public boolean awaitTermination(long timeout, TimeUnit unit)
2232         throws InterruptedException {
2233       return delegate.awaitTermination(timeout, unit);
2234     }
2235 
2236     @Override
2237     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2238         throws InterruptedException {
2239       return delegate.invokeAll(tasks);
2240     }
2241 
2242     @Override
2243     public <T> List<Future<T>> invokeAll(
2244         Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2245         throws InterruptedException {
2246       return delegate.invokeAll(tasks, timeout, unit);
2247     }
2248 
2249     @Override
2250     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2251         throws InterruptedException, ExecutionException {
2252       return delegate.invokeAny(tasks);
2253     }
2254 
2255     @Override
2256     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2257         throws InterruptedException, ExecutionException, TimeoutException {
2258       return delegate.invokeAny(tasks, timeout, unit);
2259     }
2260 
2261     @Override
2262     public boolean isShutdown() {
2263       return delegate.isShutdown();
2264     }
2265 
2266     @Override
2267     public boolean isTerminated() {
2268       return delegate.isTerminated();
2269     }
2270 
2271     @Override
2272     public void shutdown() {
2273       throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
2274     }
2275 
2276     @Override
2277     public List<Runnable> shutdownNow() {
2278       throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
2279     }
2280 
2281     @Override
2282     public <T> Future<T> submit(Callable<T> task) {
2283       return delegate.submit(task);
2284     }
2285 
2286     @Override
2287     public Future<?> submit(Runnable task) {
2288       return delegate.submit(task);
2289     }
2290 
2291     @Override
2292     public <T> Future<T> submit(Runnable task, T result) {
2293       return delegate.submit(task, result);
2294     }
2295 
2296     @Override
2297     public void execute(Runnable command) {
2298       delegate.execute(command);
2299     }
2300   }
2301 
  /**
   * A ResolutionState indicates the status of last name resolution.
   */
  enum ResolutionState {
    // NOTE(review): presumably the value before any resolution outcome has been recorded;
    // confirm at the declaration of lastResolutionState.
    NO_RESOLUTION,
    // Recorded by NameResolverListener when a resolution result is processed.
    SUCCESS,
    // Recorded by NameResolverListener when a resolution error is handled.
    ERROR
  }
2310 }
2311