• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ConnectivityState.IDLE;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY;
26 import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Stopwatch;
31 import com.google.common.base.Supplier;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.SettableFuture;
34 import io.grpc.Attributes;
35 import io.grpc.CallOptions;
36 import io.grpc.Channel;
37 import io.grpc.ClientCall;
38 import io.grpc.ClientInterceptor;
39 import io.grpc.ClientInterceptors;
40 import io.grpc.ClientStreamTracer;
41 import io.grpc.CompressorRegistry;
42 import io.grpc.ConnectivityState;
43 import io.grpc.ConnectivityStateInfo;
44 import io.grpc.Context;
45 import io.grpc.DecompressorRegistry;
46 import io.grpc.EquivalentAddressGroup;
47 import io.grpc.InternalChannelz;
48 import io.grpc.InternalChannelz.ChannelStats;
49 import io.grpc.InternalChannelz.ChannelTrace;
50 import io.grpc.InternalInstrumented;
51 import io.grpc.InternalLogId;
52 import io.grpc.InternalWithLogId;
53 import io.grpc.LoadBalancer;
54 import io.grpc.LoadBalancer.PickResult;
55 import io.grpc.LoadBalancer.PickSubchannelArgs;
56 import io.grpc.LoadBalancer.SubchannelPicker;
57 import io.grpc.ManagedChannel;
58 import io.grpc.Metadata;
59 import io.grpc.MethodDescriptor;
60 import io.grpc.NameResolver;
61 import io.grpc.Status;
62 import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
63 import io.grpc.internal.RetriableStream.ChannelBufferMeter;
64 import io.grpc.internal.RetriableStream.Throttle;
65 import java.net.URI;
66 import java.net.URISyntaxException;
67 import java.util.ArrayList;
68 import java.util.Collection;
69 import java.util.Collections;
70 import java.util.HashSet;
71 import java.util.List;
72 import java.util.Map;
73 import java.util.Set;
74 import java.util.concurrent.CountDownLatch;
75 import java.util.concurrent.Executor;
76 import java.util.concurrent.ScheduledFuture;
77 import java.util.concurrent.TimeUnit;
78 import java.util.concurrent.atomic.AtomicBoolean;
79 import java.util.logging.Level;
80 import java.util.logging.Logger;
81 import java.util.regex.Pattern;
82 import javax.annotation.CheckForNull;
83 import javax.annotation.Nullable;
84 import javax.annotation.concurrent.GuardedBy;
85 import javax.annotation.concurrent.ThreadSafe;
86 
87 /** A communication channel for making outgoing RPCs. */
88 @ThreadSafe
89 final class ManagedChannelImpl extends ManagedChannel implements
90     InternalInstrumented<ChannelStats> {
91   static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
92 
93   // Matching this pattern means the target string is a URI target or at least intended to be one.
94   // A URI target must be an absolute hierarchical URI.
95   // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
96   @VisibleForTesting
97   static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
98 
99   static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
100 
101   @VisibleForTesting
102   static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
103 
104   @VisibleForTesting
105   static final Status SHUTDOWN_NOW_STATUS =
106       Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
107 
108   @VisibleForTesting
109   static final Status SHUTDOWN_STATUS =
110       Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
111 
112   @VisibleForTesting
113   static final Status SUBCHANNEL_SHUTDOWN_STATUS =
114       Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
115 
116   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
117   private final String target;
118   private final NameResolver.Factory nameResolverFactory;
119   private final Attributes nameResolverParams;
120   private final LoadBalancer.Factory loadBalancerFactory;
121   private final ClientTransportFactory transportFactory;
122   private final Executor executor;
123   private final ObjectPool<? extends Executor> executorPool;
124   private final ObjectPool<? extends Executor> oobExecutorPool;
125   private final TimeProvider timeProvider;
126   private final int maxTraceEvents;
127 
128   private final ChannelExecutor channelExecutor = new ChannelExecutor() {
129       @Override
130       void handleUncaughtThrowable(Throwable t) {
131         super.handleUncaughtThrowable(t);
132         panic(t);
133       }
134     };
135 
136   private boolean fullStreamDecompression;
137 
138   private final DecompressorRegistry decompressorRegistry;
139   private final CompressorRegistry compressorRegistry;
140 
141   private final Supplier<Stopwatch> stopwatchSupplier;
  /** The timeout before entering idle mode. */
143   private final long idleTimeoutMillis;
144 
145   private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
146 
147   private final ServiceConfigInterceptor serviceConfigInterceptor;
148 
149   private final BackoffPolicy.Provider backoffPolicyProvider;
150 
151   /**
152    * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
153    * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
154    * {@link RealChannel}.
155    */
156   private final Channel interceptorChannel;
157   @Nullable private final String userAgent;
158 
159   // Only null after channel is terminated. Must be assigned from the channelExecutor.
160   private NameResolver nameResolver;
161 
162   // Must be accessed from the channelExecutor.
163   private boolean nameResolverStarted;
164 
165   // null when channel is in idle mode.  Must be assigned from channelExecutor.
166   @Nullable
167   private LbHelperImpl lbHelper;
168 
169   // Must ONLY be assigned from updateSubchannelPicker(), which is called from channelExecutor.
170   // null if channel is in idle mode.
171   @Nullable
172   private volatile SubchannelPicker subchannelPicker;
173 
174   // Must be accessed from the channelExecutor
175   private boolean panicMode;
176 
177   // Must be mutated from channelExecutor
178   // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
179   // switch to a ConcurrentHashMap.
180   private final Set<InternalSubchannel> subchannels = new HashSet<InternalSubchannel>(16, .75f);
181 
182   // Must be mutated from channelExecutor
183   private final Set<OobChannel> oobChannels = new HashSet<OobChannel>(1, .75f);
184 
185   // reprocess() must be run from channelExecutor
186   private final DelayedClientTransport delayedTransport;
187   private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
188       = new UncommittedRetriableStreamsRegistry();
189 
190   // Shutdown states.
191   //
192   // Channel's shutdown process:
193   // 1. shutdown(): stop accepting new calls from applications
194   //   1a shutdown <- true
195   //   1b subchannelPicker <- null
196   //   1c delayedTransport.shutdown()
197   // 2. delayedTransport terminated: stop stream-creation functionality
198   //   2a terminating <- true
199   //   2b loadBalancer.shutdown()
200   //     * LoadBalancer will shutdown subchannels and OOB channels
201   //   2c loadBalancer <- null
202   //   2d nameResolver.shutdown()
203   //   2e nameResolver <- null
204   // 3. All subchannels and OOB channels terminated: Channel considered terminated
205 
206   private final AtomicBoolean shutdown = new AtomicBoolean(false);
207   // Must only be mutated and read from channelExecutor
208   private boolean shutdownNowed;
209   // Must be mutated from channelExecutor
210   private volatile boolean terminating;
211   // Must be mutated from channelExecutor
212   private volatile boolean terminated;
213   private final CountDownLatch terminatedLatch = new CountDownLatch(1);
214 
215   private final CallTracer.Factory callTracerFactory;
216   private final CallTracer channelCallTracer;
217   @CheckForNull
218   private final ChannelTracer channelTracer;
219   private final InternalChannelz channelz;
220   @CheckForNull
221   private Boolean haveBackends; // a flag for doing channel tracing when flipped
222   @Nullable
223   private Map<String, Object> lastServiceConfig; // used for channel tracing when value changed
224 
225   // One instance per channel.
226   private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
227 
228   @Nullable
229   private Throttle throttle;
230 
231   private final long perRpcBufferLimit;
232   private final long channelBufferLimit;
233 
234   // Temporary false flag that can skip the retry code path.
235   private final boolean retryEnabled;
236 
237   // Called from channelExecutor
238   private final ManagedClientTransport.Listener delayedTransportListener =
239       new ManagedClientTransport.Listener() {
240         @Override
241         public void transportShutdown(Status s) {
242           checkState(shutdown.get(), "Channel must have been shut down");
243         }
244 
245         @Override
246         public void transportReady() {
247           // Don't care
248         }
249 
250         @Override
251         public void transportInUse(final boolean inUse) {
252           inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
253         }
254 
255         @Override
256         public void transportTerminated() {
257           checkState(shutdown.get(), "Channel must have been shut down");
258           terminating = true;
259           shutdownNameResolverAndLoadBalancer(false);
260           // No need to call channelStateManager since we are already in SHUTDOWN state.
261           // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
262           // here.
263           maybeShutdownNowSubchannels();
264           maybeTerminateChannel();
265         }
266       };
267 
268   // Must be called from channelExecutor
maybeShutdownNowSubchannels()269   private void maybeShutdownNowSubchannels() {
270     if (shutdownNowed) {
271       for (InternalSubchannel subchannel : subchannels) {
272         subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
273       }
274       for (OobChannel oobChannel : oobChannels) {
275         oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
276       }
277     }
278   }
279 
280   // Must be accessed from channelExecutor
281   @VisibleForTesting
282   final InUseStateAggregator<Object> inUseStateAggregator =
283       new InUseStateAggregator<Object>() {
284         @Override
285         void handleInUse() {
286           exitIdleMode();
287         }
288 
289         @Override
290         void handleNotInUse() {
291           if (shutdown.get()) {
292             return;
293           }
294           rescheduleIdleTimer();
295         }
296       };
297 
298   @Override
getStats()299   public ListenableFuture<ChannelStats> getStats() {
300     final SettableFuture<ChannelStats> ret = SettableFuture.create();
301     // subchannels and oobchannels can only be accessed from channelExecutor
302     channelExecutor.executeLater(new Runnable() {
303       @Override
304       public void run() {
305         ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
306         channelCallTracer.updateBuilder(builder);
307         if (channelTracer != null) {
308           channelTracer.updateBuilder(builder);
309         }
310         builder.setTarget(target).setState(channelStateManager.getState());
311         List<InternalWithLogId> children = new ArrayList<>();
312         children.addAll(subchannels);
313         children.addAll(oobChannels);
314         builder.setSubchannels(children);
315         ret.set(builder.build());
316       }
317     }).drain();
318     return ret;
319   }
320 
321   @Override
getLogId()322   public InternalLogId getLogId() {
323     return logId;
324   }
325 
326   // Run from channelExecutor
327   private class IdleModeTimer implements Runnable {
328 
329     @Override
run()330     public void run() {
331       enterIdleMode();
332     }
333   }
334 
335   // Must be called from channelExecutor
shutdownNameResolverAndLoadBalancer(boolean verifyActive)336   private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
337     if (verifyActive) {
338       checkState(nameResolver != null, "nameResolver is null");
339       checkState(lbHelper != null, "lbHelper is null");
340     }
341     if (nameResolver != null) {
342       cancelNameResolverBackoff();
343       nameResolver.shutdown();
344       nameResolver = null;
345       nameResolverStarted = false;
346     }
347     if (lbHelper != null) {
348       lbHelper.lb.shutdown();
349       lbHelper = null;
350     }
351     subchannelPicker = null;
352   }
353 
354   /**
355    * Make the channel exit idle mode, if it's in it.
356    *
357    * <p>Must be called from channelExecutor
358    */
359   @VisibleForTesting
exitIdleMode()360   void exitIdleMode() {
361     if (shutdown.get() || panicMode) {
362       return;
363     }
364     if (inUseStateAggregator.isInUse()) {
365       // Cancel the timer now, so that a racing due timer will not put Channel on idleness
366       // when the caller of exitIdleMode() is about to use the returned loadBalancer.
367       cancelIdleTimer(false);
368     } else {
369       // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
370       // isInUse() == false, in which case we still need to schedule the timer.
371       rescheduleIdleTimer();
372     }
373     if (lbHelper != null) {
374       return;
375     }
376     logger.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
377     lbHelper = new LbHelperImpl(nameResolver);
378     lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
379 
380     NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
381     try {
382       nameResolver.start(listener);
383       nameResolverStarted = true;
384     } catch (Throwable t) {
385       listener.onError(Status.fromThrowable(t));
386     }
387   }
388 
389   // Must be run from channelExecutor
enterIdleMode()390   private void enterIdleMode() {
391     logger.log(Level.FINE, "[{0}] Entering idle mode", getLogId());
392     // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
393     // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
394     // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
395     // which are bugs.
396     shutdownNameResolverAndLoadBalancer(true);
397     delayedTransport.reprocess(null);
398     nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
399     if (channelTracer != null) {
400       channelTracer.reportEvent(
401           new ChannelTrace.Event.Builder()
402               .setDescription("Entering IDLE state")
403               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
404               .setTimestampNanos(timeProvider.currentTimeNanos())
405               .build());
406     }
407     channelStateManager.gotoState(IDLE);
408     if (inUseStateAggregator.isInUse()) {
409       exitIdleMode();
410     }
411   }
412 
413   // Must be run from channelExecutor
cancelIdleTimer(boolean permanent)414   private void cancelIdleTimer(boolean permanent) {
415     idleTimer.cancel(permanent);
416   }
417 
418   // Always run from channelExecutor
rescheduleIdleTimer()419   private void rescheduleIdleTimer() {
420     if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
421       return;
422     }
423     idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
424   }
425 
426   // Run from channelExecutor
427   @VisibleForTesting
428   class NameResolverRefresh implements Runnable {
429     // Only mutated from channelExecutor
430     boolean cancelled;
431 
432     @Override
run()433     public void run() {
434       if (cancelled) {
435         // Race detected: this task was scheduled on channelExecutor before
436         // cancelNameResolverBackoff() could cancel the timer.
437         return;
438       }
439       nameResolverRefreshFuture = null;
440       nameResolverRefresh = null;
441       if (nameResolver != null) {
442         nameResolver.refresh();
443       }
444     }
445   }
446 
447   // Must be used from channelExecutor
448   @Nullable private ScheduledFuture<?> nameResolverRefreshFuture;
449   // Must be used from channelExecutor
450   @Nullable private NameResolverRefresh nameResolverRefresh;
451   // The policy to control backoff between name resolution attempts. Non-null when an attempt is
452   // scheduled. Must be used from channelExecutor
453   @Nullable private BackoffPolicy nameResolverBackoffPolicy;
454 
455   // Must be run from channelExecutor
cancelNameResolverBackoff()456   private void cancelNameResolverBackoff() {
457     if (nameResolverRefreshFuture != null) {
458       nameResolverRefreshFuture.cancel(false);
459       nameResolverRefresh.cancelled = true;
460       nameResolverRefreshFuture = null;
461       nameResolverRefresh = null;
462       nameResolverBackoffPolicy = null;
463     }
464   }
465 
466   private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
467     @Override
468     public ClientTransport get(PickSubchannelArgs args) {
469       SubchannelPicker pickerCopy = subchannelPicker;
470       if (shutdown.get()) {
471         // If channel is shut down, delayedTransport is also shut down which will fail the stream
472         // properly.
473         return delayedTransport;
474       }
475       if (pickerCopy == null) {
476         channelExecutor.executeLater(new Runnable() {
477             @Override
478             public void run() {
479               exitIdleMode();
480             }
481           }).drain();
482         return delayedTransport;
483       }
484       // There is no need to reschedule the idle timer here.
485       //
486       // pickerCopy != null, which means idle timer has not expired when this method starts.
487       // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
488       // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
489       // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
490       //
491       // In most cases the idle timer is scheduled to fire after the transport has created the
492       // stream, which would have reported in-use state to the channel that would have cancelled
493       // the idle timer.
494       PickResult pickResult = pickerCopy.pickSubchannel(args);
495       ClientTransport transport = GrpcUtil.getTransportFromPickResult(
496           pickResult, args.getCallOptions().isWaitForReady());
497       if (transport != null) {
498         return transport;
499       }
500       return delayedTransport;
501     }
502 
503     @Override
504     public <ReqT> RetriableStream<ReqT> newRetriableStream(
505         final MethodDescriptor<ReqT, ?> method,
506         final CallOptions callOptions,
507         final Metadata headers,
508         final Context context) {
509       checkState(retryEnabled, "retry should be enabled");
510       return new RetriableStream<ReqT>(
511           method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
512           getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
513           callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY),
514           throttle) {
515         @Override
516         Status prestart() {
517           return uncommittedRetriableStreamsRegistry.add(this);
518         }
519 
520         @Override
521         void postCommit() {
522           uncommittedRetriableStreamsRegistry.remove(this);
523         }
524 
525         @Override
526         ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) {
527           CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory);
528           ClientTransport transport =
529               get(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
530           Context origContext = context.attach();
531           try {
532             return transport.newStream(method, newHeaders, newOptions);
533           } finally {
534             context.detach(origContext);
535           }
536         }
537       };
538     }
539   };
540 
541   private final Rescheduler idleTimer;
542 
ManagedChannelImpl( AbstractManagedChannelImplBuilder<?> builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, ObjectPool<? extends Executor> oobExecutorPool, Supplier<Stopwatch> stopwatchSupplier, List<ClientInterceptor> interceptors, final TimeProvider timeProvider)543   ManagedChannelImpl(
544       AbstractManagedChannelImplBuilder<?> builder,
545       ClientTransportFactory clientTransportFactory,
546       BackoffPolicy.Provider backoffPolicyProvider,
547       ObjectPool<? extends Executor> oobExecutorPool,
548       Supplier<Stopwatch> stopwatchSupplier,
549       List<ClientInterceptor> interceptors,
550       final TimeProvider timeProvider) {
551     this.target = checkNotNull(builder.target, "target");
552     this.nameResolverFactory = builder.getNameResolverFactory();
553     this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
554     this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
555     if (builder.loadBalancerFactory == null) {
556       this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory();
557     } else {
558       this.loadBalancerFactory = builder.loadBalancerFactory;
559     }
560     this.executorPool = checkNotNull(builder.executorPool, "executorPool");
561     this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
562     this.executor = checkNotNull(executorPool.getObject(), "executor");
563     this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
564     this.delayedTransport.start(delayedTransportListener);
565     this.backoffPolicyProvider = backoffPolicyProvider;
566     this.transportFactory =
567         new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
568     this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
569     serviceConfigInterceptor = new ServiceConfigInterceptor(
570         retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
571     Channel channel = new RealChannel();
572     channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
573     if (builder.binlog != null) {
574       channel = builder.binlog.wrapChannel(channel);
575     }
576     this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
577     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
578     if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
579       this.idleTimeoutMillis = builder.idleTimeoutMillis;
580     } else {
581       checkArgument(
582           builder.idleTimeoutMillis
583               >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
584           "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
585       this.idleTimeoutMillis = builder.idleTimeoutMillis;
586     }
587 
588     final class AutoDrainChannelExecutor implements Executor {
589 
590       @Override
591       public void execute(Runnable command) {
592         channelExecutor.executeLater(command);
593         channelExecutor.drain();
594       }
595     }
596 
597     idleTimer = new Rescheduler(
598         new IdleModeTimer(),
599         new AutoDrainChannelExecutor(),
600         transportFactory.getScheduledExecutorService(),
601         stopwatchSupplier.get());
602     this.fullStreamDecompression = builder.fullStreamDecompression;
603     this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
604     this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
605     this.userAgent = builder.userAgent;
606 
607     this.channelBufferLimit = builder.retryBufferSize;
608     this.perRpcBufferLimit = builder.perRpcBufferLimit;
609     this.timeProvider = checkNotNull(timeProvider, "timeProvider");
610     this.callTracerFactory = new CallTracer.Factory() {
611       @Override
612       public CallTracer create() {
613         return new CallTracer(timeProvider);
614       }
615     };
616     channelCallTracer = callTracerFactory.create();
617     this.channelz = checkNotNull(builder.channelz);
618     channelz.addRootChannel(this);
619 
620     maxTraceEvents = builder.maxTraceEvents;
621     if (maxTraceEvents > 0) {
622       long currentTimeNanos = timeProvider.currentTimeNanos();
623       channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel");
624     } else {
625       channelTracer = null;
626     }
627     logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
628   }
629 
630   @VisibleForTesting
getNameResolver(String target, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams)631   static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
632       Attributes nameResolverParams) {
633     // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
634     // "dns:///".
635     URI targetUri = null;
636     StringBuilder uriSyntaxErrors = new StringBuilder();
637     try {
638       targetUri = new URI(target);
639       // For "localhost:8080" this would likely cause newNameResolver to return null, because
640       // "localhost" is parsed as the scheme. Will fall into the next branch and try
641       // "dns:///localhost:8080".
642     } catch (URISyntaxException e) {
643       // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
644       uriSyntaxErrors.append(e.getMessage());
645     }
646     if (targetUri != null) {
647       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
648       if (resolver != null) {
649         return resolver;
650       }
651       // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
652       // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
653     }
654 
655     // If we reached here, the targetUri couldn't be used.
656     if (!URI_PATTERN.matcher(target).matches()) {
657       // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
658       // scheme from the factory.
659       try {
660         targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
661       } catch (URISyntaxException e) {
662         // Should not be possible.
663         throw new IllegalArgumentException(e);
664       }
665       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
666       if (resolver != null) {
667         return resolver;
668       }
669     }
670     throw new IllegalArgumentException(String.format(
671         "cannot find a NameResolver for %s%s",
672         target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
673   }
674 
675   /**
676    * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
677    * cancelled.
678    */
679   @Override
shutdown()680   public ManagedChannelImpl shutdown() {
681     logger.log(Level.FINE, "[{0}] shutdown() called", getLogId());
682     if (!shutdown.compareAndSet(false, true)) {
683       return this;
684     }
685 
686     // Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible.
687     // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
688     // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
689     // channelExecutor's queue and should not be blocked, so we do not drain() immediately here.
690     channelExecutor.executeLater(new Runnable() {
691       @Override
692       public void run() {
693         if (channelTracer != null) {
694           channelTracer.reportEvent(new ChannelTrace.Event.Builder()
695               .setDescription("Entering SHUTDOWN state")
696               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
697               .setTimestampNanos(timeProvider.currentTimeNanos())
698               .build());
699         }
700         channelStateManager.gotoState(SHUTDOWN);
701       }
702     });
703 
704     uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
705     channelExecutor.executeLater(new Runnable() {
706         @Override
707         public void run() {
708           cancelIdleTimer(/* permanent= */ true);
709         }
710       }).drain();
711     logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
712     return this;
713   }
714 
715   /**
716    * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
717    * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
718    * return {@code false} immediately after this method returns.
719    */
720   @Override
shutdownNow()721   public ManagedChannelImpl shutdownNow() {
722     logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
723     shutdown();
724     uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
725     channelExecutor.executeLater(new Runnable() {
726         @Override
727         public void run() {
728           if (shutdownNowed) {
729             return;
730           }
731           shutdownNowed = true;
732           maybeShutdownNowSubchannels();
733         }
734       }).drain();
735     return this;
736   }
737 
738   // Called from channelExecutor
739   @VisibleForTesting
panic(final Throwable t)740   void panic(final Throwable t) {
741     if (panicMode) {
742       // Preserve the first panic information
743       return;
744     }
745     panicMode = true;
746     cancelIdleTimer(/* permanent= */ true);
747     shutdownNameResolverAndLoadBalancer(false);
748     SubchannelPicker newPicker = new SubchannelPicker() {
749       final PickResult panicPickResult =
750           PickResult.withDrop(
751               Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
752       @Override
753       public PickResult pickSubchannel(PickSubchannelArgs args) {
754         return panicPickResult;
755       }
756     };
757     updateSubchannelPicker(newPicker);
758     if (channelTracer != null) {
759       channelTracer.reportEvent(
760           new ChannelTrace.Event.Builder()
761               .setDescription("Entering TRANSIENT_FAILURE state")
762               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
763               .setTimestampNanos(timeProvider.currentTimeNanos())
764               .build());
765     }
766     channelStateManager.gotoState(TRANSIENT_FAILURE);
767   }
768 
769   // Called from channelExecutor
updateSubchannelPicker(SubchannelPicker newPicker)770   private void updateSubchannelPicker(SubchannelPicker newPicker) {
771     subchannelPicker = newPicker;
772     delayedTransport.reprocess(newPicker);
773   }
774 
775   @Override
isShutdown()776   public boolean isShutdown() {
777     return shutdown.get();
778   }
779 
780   @Override
awaitTermination(long timeout, TimeUnit unit)781   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
782     return terminatedLatch.await(timeout, unit);
783   }
784 
785   @Override
isTerminated()786   public boolean isTerminated() {
787     return terminated;
788   }
789 
790   /*
791    * Creates a new outgoing call on the channel.
792    */
793   @Override
newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)794   public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
795       CallOptions callOptions) {
796     return interceptorChannel.newCall(method, callOptions);
797   }
798 
799   @Override
authority()800   public String authority() {
801     return interceptorChannel.authority();
802   }
803 
getCallExecutor(CallOptions callOptions)804   private Executor getCallExecutor(CallOptions callOptions) {
805     Executor executor = callOptions.getExecutor();
806     if (executor == null) {
807       executor = this.executor;
808     }
809     return executor;
810   }
811 
812   private class RealChannel extends Channel {
813     @Override
newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)814     public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
815         CallOptions callOptions) {
816       return new ClientCallImpl<ReqT, RespT>(
817               method,
818               getCallExecutor(callOptions),
819               callOptions,
820               transportProvider,
821               terminated ? null : transportFactory.getScheduledExecutorService(),
822               channelCallTracer,
823               retryEnabled)
824           .setFullStreamDecompression(fullStreamDecompression)
825           .setDecompressorRegistry(decompressorRegistry)
826           .setCompressorRegistry(compressorRegistry);
827     }
828 
829     @Override
authority()830     public String authority() {
831       String authority = nameResolver.getServiceAuthority();
832       return checkNotNull(authority, "authority");
833     }
834   }
835 
836   /**
837    * Terminate the channel if termination conditions are met.
838    */
839   // Must be run from channelExecutor
maybeTerminateChannel()840   private void maybeTerminateChannel() {
841     if (terminated) {
842       return;
843     }
844     if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
845       logger.log(Level.FINE, "[{0}] Terminated", getLogId());
846       channelz.removeRootChannel(this);
847       terminated = true;
848       terminatedLatch.countDown();
849       executorPool.returnObject(executor);
850       // Release the transport factory so that it can deallocate any resources.
851       transportFactory.close();
852     }
853   }
854 
855   @Override
getState(boolean requestConnection)856   public ConnectivityState getState(boolean requestConnection) {
857     ConnectivityState savedChannelState = channelStateManager.getState();
858     if (requestConnection && savedChannelState == IDLE) {
859       channelExecutor.executeLater(
860           new Runnable() {
861             @Override
862             public void run() {
863               exitIdleMode();
864               if (subchannelPicker != null) {
865                 subchannelPicker.requestConnection();
866               }
867             }
868           }).drain();
869     }
870     return savedChannelState;
871   }
872 
873   @Override
notifyWhenStateChanged(final ConnectivityState source, final Runnable callback)874   public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
875     channelExecutor.executeLater(
876         new Runnable() {
877           @Override
878           public void run() {
879             channelStateManager.notifyWhenStateChanged(callback, executor, source);
880           }
881         }).drain();
882   }
883 
884   @Override
resetConnectBackoff()885   public void resetConnectBackoff() {
886     channelExecutor
887         .executeLater(
888             new Runnable() {
889               @Override
890               public void run() {
891                 if (shutdown.get()) {
892                   return;
893                 }
894                 if (nameResolverRefreshFuture != null) {
895                   checkState(nameResolverStarted, "name resolver must be started");
896                   cancelNameResolverBackoff();
897                   nameResolver.refresh();
898                 }
899                 for (InternalSubchannel subchannel : subchannels) {
900                   subchannel.resetConnectBackoff();
901                 }
902                 for (OobChannel oobChannel : oobChannels) {
903                   oobChannel.resetConnectBackoff();
904                 }
905               }
906             })
907         .drain();
908   }
909 
910   @Override
enterIdle()911   public void enterIdle() {
912     class PrepareToLoseNetworkRunnable implements Runnable {
913       @Override
914       public void run() {
915         if (shutdown.get() || lbHelper == null) {
916           return;
917         }
918         cancelIdleTimer(/* permanent= */ false);
919         enterIdleMode();
920       }
921     }
922 
923     channelExecutor.executeLater(new PrepareToLoseNetworkRunnable()).drain();
924   }
925 
926   /**
927    * A registry that prevents channel shutdown from killing existing retry attempts that are in
928    * backoff.
929    */
930   private final class UncommittedRetriableStreamsRegistry {
931     // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
932     // it's worthwhile to look for a lock-free approach.
933     final Object lock = new Object();
934 
935     @GuardedBy("lock")
936     Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>();
937 
938     @GuardedBy("lock")
939     Status shutdownStatus;
940 
onShutdown(Status reason)941     void onShutdown(Status reason) {
942       boolean shouldShutdownDelayedTransport = false;
943       synchronized (lock) {
944         if (shutdownStatus != null) {
945           return;
946         }
947         shutdownStatus = reason;
948         // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
949         // retriable streams, which may be in backoff and not using any transport, are already
950         // started RPCs.
951         if (uncommittedRetriableStreams.isEmpty()) {
952           shouldShutdownDelayedTransport = true;
953         }
954       }
955 
956       if (shouldShutdownDelayedTransport) {
957         delayedTransport.shutdown(reason);
958       }
959     }
960 
onShutdownNow(Status reason)961     void onShutdownNow(Status reason) {
962       onShutdown(reason);
963       Collection<ClientStream> streams;
964 
965       synchronized (lock) {
966         streams = new ArrayList<>(uncommittedRetriableStreams);
967       }
968 
969       for (ClientStream stream : streams) {
970         stream.cancel(reason);
971       }
972       delayedTransport.shutdownNow(reason);
973     }
974 
975     /**
976      * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
977      * shutdown Status.
978      */
979     @Nullable
add(RetriableStream<?> retriableStream)980     Status add(RetriableStream<?> retriableStream) {
981       synchronized (lock) {
982         if (shutdownStatus != null) {
983           return shutdownStatus;
984         }
985         uncommittedRetriableStreams.add(retriableStream);
986         return null;
987       }
988     }
989 
remove(RetriableStream<?> retriableStream)990     void remove(RetriableStream<?> retriableStream) {
991       Status shutdownStatusCopy = null;
992 
993       synchronized (lock) {
994         uncommittedRetriableStreams.remove(retriableStream);
995         if (uncommittedRetriableStreams.isEmpty()) {
996           shutdownStatusCopy = shutdownStatus;
997           // Because retriable transport is long-lived, we take this opportunity to down-size the
998           // hashmap.
999           uncommittedRetriableStreams = new HashSet<ClientStream>();
1000         }
1001       }
1002 
1003       if (shutdownStatusCopy != null) {
1004         delayedTransport.shutdown(shutdownStatusCopy);
1005       }
1006     }
1007   }
1008 
1009   private class LbHelperImpl extends LoadBalancer.Helper {
1010     LoadBalancer lb;
1011     final NameResolver nr;
1012 
LbHelperImpl(NameResolver nr)1013     LbHelperImpl(NameResolver nr) {
1014       this.nr = checkNotNull(nr, "NameResolver");
1015     }
1016 
1017     // Must be called from channelExecutor
handleInternalSubchannelState(ConnectivityStateInfo newState)1018     private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1019       if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1020         nr.refresh();
1021       }
1022     }
1023 
1024     @Override
createSubchannel( List<EquivalentAddressGroup> addressGroups, Attributes attrs)1025     public AbstractSubchannel createSubchannel(
1026         List<EquivalentAddressGroup> addressGroups, Attributes attrs) {
1027       checkNotNull(addressGroups, "addressGroups");
1028       checkNotNull(attrs, "attrs");
1029       // TODO(ejona): can we be even stricter? Like loadBalancer == null?
1030       checkState(!terminated, "Channel is terminated");
1031       final SubchannelImpl subchannel = new SubchannelImpl(attrs);
1032       ChannelTracer subchannelTracer = null;
1033       long subchannelCreationTime = timeProvider.currentTimeNanos();
1034       if (maxTraceEvents > 0) {
1035         subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel");
1036       }
1037       final InternalSubchannel internalSubchannel = new InternalSubchannel(
1038           addressGroups,
1039           authority(),
1040           userAgent,
1041           backoffPolicyProvider,
1042           transportFactory,
1043           transportFactory.getScheduledExecutorService(),
1044           stopwatchSupplier,
1045           channelExecutor,
1046           new InternalSubchannel.Callback() {
1047             // All callbacks are run in channelExecutor
1048             @Override
1049             void onTerminated(InternalSubchannel is) {
1050               subchannels.remove(is);
1051               channelz.removeSubchannel(is);
1052               maybeTerminateChannel();
1053             }
1054 
1055             @Override
1056             void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1057               handleInternalSubchannelState(newState);
1058               // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1059               if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
1060                 lb.handleSubchannelState(subchannel, newState);
1061               }
1062             }
1063 
1064             @Override
1065             void onInUse(InternalSubchannel is) {
1066               inUseStateAggregator.updateObjectInUse(is, true);
1067             }
1068 
1069             @Override
1070             void onNotInUse(InternalSubchannel is) {
1071               inUseStateAggregator.updateObjectInUse(is, false);
1072             }
1073           },
1074           channelz,
1075           callTracerFactory.create(),
1076           subchannelTracer,
1077           timeProvider);
1078       if (channelTracer != null) {
1079         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1080             .setDescription("Child channel created")
1081             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1082             .setTimestampNanos(subchannelCreationTime)
1083             .setSubchannelRef(internalSubchannel)
1084             .build());
1085       }
1086       channelz.addSubchannel(internalSubchannel);
1087       subchannel.subchannel = internalSubchannel;
1088       logger.log(Level.FINE, "[{0}] {1} created for {2}",
1089           new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups});
1090       runSerialized(new Runnable() {
1091           @Override
1092           public void run() {
1093             if (terminating) {
1094               // Because runSerialized() doesn't guarantee the runnable has been executed upon when
1095               // returning, the subchannel may still be returned to the balancer without being
1096               // shutdown even if "terminating" is already true.  The subchannel will not be used in
1097               // this case, because delayed transport has terminated when "terminating" becomes
1098               // true, and no more requests will be sent to balancer beyond this point.
1099               internalSubchannel.shutdown(SHUTDOWN_STATUS);
1100             }
1101             if (!terminated) {
1102               // If channel has not terminated, it will track the subchannel and block termination
1103               // for it.
1104               subchannels.add(internalSubchannel);
1105             }
1106           }
1107         });
1108       return subchannel;
1109     }
1110 
1111     @Override
updateBalancingState( final ConnectivityState newState, final SubchannelPicker newPicker)1112     public void updateBalancingState(
1113         final ConnectivityState newState, final SubchannelPicker newPicker) {
1114       checkNotNull(newState, "newState");
1115       checkNotNull(newPicker, "newPicker");
1116 
1117       runSerialized(
1118           new Runnable() {
1119             @Override
1120             public void run() {
1121               if (LbHelperImpl.this != lbHelper) {
1122                 return;
1123               }
1124               updateSubchannelPicker(newPicker);
1125               // It's not appropriate to report SHUTDOWN state from lb.
1126               // Ignore the case of newState == SHUTDOWN for now.
1127               if (newState != SHUTDOWN) {
1128                 if (channelTracer != null) {
1129                   channelTracer.reportEvent(
1130                       new ChannelTrace.Event.Builder()
1131                           .setDescription("Entering " + newState + " state")
1132                           .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1133                           .setTimestampNanos(timeProvider.currentTimeNanos())
1134                           .build());
1135                 }
1136                 channelStateManager.gotoState(newState);
1137               }
1138             }
1139           });
1140     }
1141 
1142     @Override
updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs)1143     public void updateSubchannelAddresses(
1144         LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
1145       checkArgument(subchannel instanceof SubchannelImpl,
1146           "subchannel must have been returned from createSubchannel");
1147       ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs);
1148     }
1149 
1150     @Override
createOobChannel(EquivalentAddressGroup addressGroup, String authority)1151     public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1152       // TODO(ejona): can we be even stricter? Like terminating?
1153       checkState(!terminated, "Channel is terminated");
1154       long oobChannelCreationTime = timeProvider.currentTimeNanos();
1155       ChannelTracer oobChannelTracer = null;
1156       ChannelTracer subchannelTracer = null;
1157       if (channelTracer != null) {
1158         oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel");
1159       }
1160       final OobChannel oobChannel = new OobChannel(
1161           authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
1162           channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1163       if (channelTracer != null) {
1164         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1165             .setDescription("Child channel created")
1166             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1167             .setTimestampNanos(oobChannelCreationTime)
1168             .setChannelRef(oobChannel)
1169             .build());
1170         subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel");
1171       }
1172       final InternalSubchannel internalSubchannel = new InternalSubchannel(
1173           Collections.singletonList(addressGroup),
1174           authority, userAgent, backoffPolicyProvider, transportFactory,
1175           transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
1176           // All callback methods are run from channelExecutor
1177           new InternalSubchannel.Callback() {
1178             @Override
1179             void onTerminated(InternalSubchannel is) {
1180               oobChannels.remove(oobChannel);
1181               channelz.removeSubchannel(is);
1182               oobChannel.handleSubchannelTerminated();
1183               maybeTerminateChannel();
1184             }
1185 
1186             @Override
1187             void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1188               handleInternalSubchannelState(newState);
1189               oobChannel.handleSubchannelStateChange(newState);
1190             }
1191           },
1192           channelz,
1193           callTracerFactory.create(),
1194           subchannelTracer,
1195           timeProvider);
1196       if (oobChannelTracer != null) {
1197         oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1198             .setDescription("Child channel created")
1199             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1200             .setTimestampNanos(oobChannelCreationTime)
1201             .setSubchannelRef(internalSubchannel)
1202             .build());
1203       }
1204       channelz.addSubchannel(oobChannel);
1205       channelz.addSubchannel(internalSubchannel);
1206       oobChannel.setSubchannel(internalSubchannel);
1207       runSerialized(new Runnable() {
1208           @Override
1209           public void run() {
1210             if (terminating) {
1211               oobChannel.shutdown();
1212             }
1213             if (!terminated) {
1214               // If channel has not terminated, it will track the subchannel and block termination
1215               // for it.
1216               oobChannels.add(oobChannel);
1217             }
1218           }
1219         });
1220       return oobChannel;
1221     }
1222 
1223     @Override
updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag)1224     public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1225       checkArgument(channel instanceof OobChannel,
1226           "channel must have been returned from createOobChannel");
1227       ((OobChannel) channel).updateAddresses(eag);
1228     }
1229 
1230     @Override
getAuthority()1231     public String getAuthority() {
1232       return ManagedChannelImpl.this.authority();
1233     }
1234 
1235     @Override
getNameResolverFactory()1236     public NameResolver.Factory getNameResolverFactory() {
1237       return nameResolverFactory;
1238     }
1239 
1240     @Override
runSerialized(Runnable task)1241     public void runSerialized(Runnable task) {
1242       channelExecutor.executeLater(task).drain();
1243     }
1244   }
1245 
1246   private class NameResolverListenerImpl implements NameResolver.Listener {
1247     final LbHelperImpl helper;
1248 
NameResolverListenerImpl(LbHelperImpl helperImpl)1249     NameResolverListenerImpl(LbHelperImpl helperImpl) {
1250       this.helper = helperImpl;
1251     }
1252 
1253     @Override
onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config)1254     public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
1255       if (servers.isEmpty()) {
1256         onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
1257         return;
1258       }
1259       if (logger.isLoggable(Level.FINE)) {
1260         logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
1261             new Object[]{getLogId(), servers, config});
1262       }
1263 
1264       if (channelTracer != null && (haveBackends == null || !haveBackends)) {
1265         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1266             .setDescription("Address resolved: " + servers)
1267             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1268             .setTimestampNanos(timeProvider.currentTimeNanos())
1269             .build());
1270         haveBackends = true;
1271       }
1272       final Map<String, Object> serviceConfig =
1273           config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
1274       if (channelTracer != null && serviceConfig != null
1275           && !serviceConfig.equals(lastServiceConfig)) {
1276         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1277             .setDescription("Service config changed")
1278             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1279             .setTimestampNanos(timeProvider.currentTimeNanos())
1280             .build());
1281         lastServiceConfig = serviceConfig;
1282       }
1283 
1284       final class NamesResolved implements Runnable {
1285         @Override
1286         public void run() {
1287           // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1288           if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
1289             return;
1290           }
1291 
1292           nameResolverBackoffPolicy = null;
1293 
1294           if (serviceConfig != null) {
1295             try {
1296               serviceConfigInterceptor.handleUpdate(serviceConfig);
1297               if (retryEnabled) {
1298                 throttle = getThrottle(config);
1299               }
1300             } catch (RuntimeException re) {
1301               logger.log(
1302                   Level.WARNING,
1303                   "[" + getLogId() + "] Unexpected exception from parsing service config",
1304                   re);
1305             }
1306           }
1307 
1308           helper.lb.handleResolvedAddressGroups(servers, config);
1309         }
1310       }
1311 
1312       helper.runSerialized(new NamesResolved());
1313     }
1314 
1315     @Override
onError(final Status error)1316     public void onError(final Status error) {
1317       checkArgument(!error.isOk(), "the error status must not be OK");
1318       logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1319           new Object[] {getLogId(), error});
1320       if (channelTracer != null && (haveBackends == null || haveBackends)) {
1321         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1322             .setDescription("Failed to resolve name")
1323             .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
1324             .setTimestampNanos(timeProvider.currentTimeNanos())
1325             .build());
1326         haveBackends = false;
1327       }
1328       channelExecutor
1329           .executeLater(
1330               new Runnable() {
1331                 @Override
1332                 public void run() {
1333                   // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1334                   if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
1335                     return;
1336                   }
1337                   helper.lb.handleNameResolutionError(error);
1338                   if (nameResolverRefreshFuture != null) {
1339                     // The name resolver may invoke onError multiple times, but we only want to
1340                     // schedule one backoff attempt
1341                     // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
1342                     // want to reset the backoff interval upon repeated onError() calls
1343                     return;
1344                   }
1345                   if (nameResolverBackoffPolicy == null) {
1346                     nameResolverBackoffPolicy = backoffPolicyProvider.get();
1347                   }
1348                   long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
1349                   if (logger.isLoggable(Level.FINE)) {
1350                     logger.log(
1351                         Level.FINE,
1352                         "[{0}] Scheduling DNS resolution backoff for {1} ns",
1353                         new Object[] {logId, delayNanos});
1354                   }
1355                   nameResolverRefresh = new NameResolverRefresh();
1356                   nameResolverRefreshFuture =
1357                       transportFactory
1358                           .getScheduledExecutorService()
1359                           .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
1360                 }
1361               })
1362           .drain();
1363     }
1364   }
1365 
1366   @Nullable
getThrottle(Attributes config)1367   private static Throttle getThrottle(Attributes config) {
1368     return ServiceConfigUtil.getThrottlePolicy(
1369         config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG));
1370   }
1371 
1372   private final class SubchannelImpl extends AbstractSubchannel {
1373     // Set right after SubchannelImpl is created.
1374     InternalSubchannel subchannel;
1375     final Object shutdownLock = new Object();
1376     final Attributes attrs;
1377 
1378     @GuardedBy("shutdownLock")
1379     boolean shutdownRequested;
1380     @GuardedBy("shutdownLock")
1381     ScheduledFuture<?> delayedShutdownTask;
1382 
SubchannelImpl(Attributes attrs)1383     SubchannelImpl(Attributes attrs) {
1384       this.attrs = checkNotNull(attrs, "attrs");
1385     }
1386 
1387     @Override
obtainActiveTransport()1388     ClientTransport obtainActiveTransport() {
1389       return subchannel.obtainActiveTransport();
1390     }
1391 
1392     @Override
getInternalSubchannel()1393     InternalInstrumented<ChannelStats> getInternalSubchannel() {
1394       return subchannel;
1395     }
1396 
1397     @Override
shutdown()1398     public void shutdown() {
1399       synchronized (shutdownLock) {
1400         if (shutdownRequested) {
1401           if (terminating && delayedShutdownTask != null) {
1402             // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1403             // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1404             delayedShutdownTask.cancel(false);
1405             delayedShutdownTask = null;
1406             // Will fall through to the subchannel.shutdown() at the end.
1407           } else {
1408             return;
1409           }
1410         } else {
1411           shutdownRequested = true;
1412         }
1413         // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1414         // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1415         // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1416         // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1417         // shutdown of Subchannel for a few seconds here.
1418         //
1419         // TODO(zhangkun83): consider a better approach
1420         // (https://github.com/grpc/grpc-java/issues/2562).
1421         if (!terminating) {
1422           delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
1423               new LogExceptionRunnable(
1424                   new Runnable() {
1425                     @Override
1426                     public void run() {
1427                       subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1428                     }
1429                   }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
1430           return;
1431         }
1432       }
1433       // When terminating == true, no more real streams will be created. It's safe and also
1434       // desirable to shutdown timely.
1435       subchannel.shutdown(SHUTDOWN_STATUS);
1436     }
1437 
1438     @Override
requestConnection()1439     public void requestConnection() {
1440       subchannel.obtainActiveTransport();
1441     }
1442 
1443     @Override
getAllAddresses()1444     public List<EquivalentAddressGroup> getAllAddresses() {
1445       return subchannel.getAddressGroups();
1446     }
1447 
1448     @Override
getAttributes()1449     public Attributes getAttributes() {
1450       return attrs;
1451     }
1452 
1453     @Override
toString()1454     public String toString() {
1455       return subchannel.getLogId().toString();
1456     }
1457   }
1458 
1459   @Override
toString()1460   public String toString() {
1461     return MoreObjects.toStringHelper(this)
1462         .add("logId", logId.getId())
1463         .add("target", target)
1464         .toString();
1465   }
1466 }
1467