• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21 import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import io.grpc.Attributes;
25 import io.grpc.EquivalentAddressGroup;
26 import io.grpc.InternalLogId;
27 import io.grpc.LoadBalancer;
28 import io.grpc.LoadBalancerProvider;
29 import io.grpc.LoadBalancerRegistry;
30 import io.grpc.NameResolver;
31 import io.grpc.NameResolver.ResolutionResult;
32 import io.grpc.Status;
33 import io.grpc.SynchronizationContext;
34 import io.grpc.SynchronizationContext.ScheduledHandle;
35 import io.grpc.internal.BackoffPolicy;
36 import io.grpc.internal.ExponentialBackoffPolicy;
37 import io.grpc.internal.ObjectPool;
38 import io.grpc.internal.ServiceConfigUtil.PolicySelection;
39 import io.grpc.util.ForwardingLoadBalancerHelper;
40 import io.grpc.util.GracefulSwitchLoadBalancer;
41 import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
42 import io.grpc.xds.Bootstrapper.ServerInfo;
43 import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
44 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
45 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
46 import io.grpc.xds.Endpoints.DropOverload;
47 import io.grpc.xds.Endpoints.LbEndpoint;
48 import io.grpc.xds.Endpoints.LocalityLbEndpoints;
49 import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
50 import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
51 import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
52 import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
53 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
54 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
55 import io.grpc.xds.XdsClient.ResourceWatcher;
56 import io.grpc.xds.XdsEndpointResource.EdsUpdate;
57 import io.grpc.xds.XdsLogger.XdsLogLevel;
58 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
59 import java.net.URI;
60 import java.net.URISyntaxException;
61 import java.util.ArrayList;
62 import java.util.Arrays;
63 import java.util.Collections;
64 import java.util.HashMap;
65 import java.util.HashSet;
66 import java.util.List;
67 import java.util.Locale;
68 import java.util.Map;
69 import java.util.Objects;
70 import java.util.Set;
71 import java.util.TreeMap;
72 import java.util.concurrent.ScheduledExecutorService;
73 import java.util.concurrent.TimeUnit;
74 import javax.annotation.Nullable;
75 
76 /**
77  * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy
78  * of the cds_experimental LB policy and the parent LB policy of the priority_experimental LB
79  * policy in the xDS load balancing hierarchy. This policy resolves endpoints of non-aggregate
80  * clusters (e.g., EDS or Logical DNS) and groups endpoints in priorities and localities to be
81  * used in the downstream LB policies for fine-grained load balancing purposes.
82  */
83 final class ClusterResolverLoadBalancer extends LoadBalancer {
84   // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
85   // to an empty locality.
86   private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
87   private final XdsLogger logger;
88   private final SynchronizationContext syncContext;
89   private final ScheduledExecutorService timeService;
90   private final LoadBalancerRegistry lbRegistry;
91   private final BackoffPolicy.Provider backoffPolicyProvider;
92   private final GracefulSwitchLoadBalancer delegate;
93   private ObjectPool<XdsClient> xdsClientPool;
94   private XdsClient xdsClient;
95   private ClusterResolverConfig config;
96 
/**
 * Creates the balancer using {@link LoadBalancerRegistry#getDefaultRegistry()} and a new
 * {@link ExponentialBackoffPolicy.Provider}.
 *
 * @param helper helper passed through to the three-argument constructor
 */
ClusterResolverLoadBalancer(Helper helper) {
  this(
      helper,
      LoadBalancerRegistry.getDefaultRegistry(),
      new ExponentialBackoffPolicy.Provider());
}
101 
102   @VisibleForTesting
ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, BackoffPolicy.Provider backoffPolicyProvider)103   ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry,
104       BackoffPolicy.Provider backoffPolicyProvider) {
105     this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
106     this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
107     this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
108     this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
109     delegate = new GracefulSwitchLoadBalancer(helper);
110     logger = XdsLogger.withLogId(
111         InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority()));
112     logger.log(XdsLogLevel.INFO, "Created");
113   }
114 
115   @Override
acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)116   public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
117     logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
118     if (xdsClientPool == null) {
119       xdsClientPool = resolvedAddresses.getAttributes().get(InternalXdsAttributes.XDS_CLIENT_POOL);
120       xdsClient = xdsClientPool.getObject();
121     }
122     ClusterResolverConfig config =
123         (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
124     if (!Objects.equals(this.config, config)) {
125       logger.log(XdsLogLevel.DEBUG, "Config: {0}", config);
126       delegate.switchTo(new ClusterResolverLbStateFactory());
127       this.config = config;
128       delegate.handleResolvedAddresses(resolvedAddresses);
129     }
130     return true;
131   }
132 
133   @Override
handleNameResolutionError(Status error)134   public void handleNameResolutionError(Status error) {
135     logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
136     delegate.handleNameResolutionError(error);
137   }
138 
139   @Override
shutdown()140   public void shutdown() {
141     logger.log(XdsLogLevel.INFO, "Shutdown");
142     delegate.shutdown();
143     if (xdsClientPool != null) {
144       xdsClientPool.returnObject(xdsClient);
145     }
146   }
147 
148   private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory {
149     @Override
newLoadBalancer(Helper helper)150     public LoadBalancer newLoadBalancer(Helper helper) {
151       return new ClusterResolverLbState(helper);
152     }
153   }
154 
155   /**
156    * The state of a cluster_resolver LB working session. A new instance is created whenever
157    * the cluster_resolver LB receives a new config. The old instance is replaced when the
158    * new one is ready to handle new RPCs.
159    */
160   private final class ClusterResolverLbState extends LoadBalancer {
161     private final Helper helper;
162     private final List<String> clusters = new ArrayList<>();
163     private final Map<String, ClusterState> clusterStates = new HashMap<>();
164     private PolicySelection endpointLbPolicy;
165     private ResolvedAddresses resolvedAddresses;
166     private LoadBalancer childLb;
167 
/**
 * Wraps the given helper in a {@link RefreshableHelper} and logs the creation.
 *
 * @param helper helper to wrap; must not be null
 */
ClusterResolverLbState(Helper helper) {
  Helper nonNullHelper = checkNotNull(helper, "helper");
  this.helper = new RefreshableHelper(nonNullHelper);
  logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState");
}
172 
173     @Override
acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)174     public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
175       this.resolvedAddresses = resolvedAddresses;
176       ClusterResolverConfig config =
177           (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
178       endpointLbPolicy = config.lbPolicy;
179       for (DiscoveryMechanism instance : config.discoveryMechanisms) {
180         clusters.add(instance.cluster);
181         ClusterState state;
182         if (instance.type == DiscoveryMechanism.Type.EDS) {
183           state = new EdsClusterState(instance.cluster, instance.edsServiceName,
184               instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
185               instance.outlierDetection);
186         } else {  // logical DNS
187           state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
188               instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext);
189         }
190         clusterStates.put(instance.cluster, state);
191         state.start();
192       }
193       return true;
194     }
195 
196     @Override
handleNameResolutionError(Status error)197     public void handleNameResolutionError(Status error) {
198       if (childLb != null) {
199         childLb.handleNameResolutionError(error);
200       } else {
201         helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
202       }
203     }
204 
205     @Override
shutdown()206     public void shutdown() {
207       for (ClusterState state : clusterStates.values()) {
208         state.shutdown();
209       }
210       if (childLb != null) {
211         childLb.shutdown();
212       }
213     }
214 
handleEndpointResourceUpdate()215     private void handleEndpointResourceUpdate() {
216       List<EquivalentAddressGroup> addresses = new ArrayList<>();
217       Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
218       List<String> priorities = new ArrayList<>();  // totally ordered priority list
219 
220       Status endpointNotFound = Status.OK;
221       for (String cluster : clusters) {
222         ClusterState state = clusterStates.get(cluster);
223         // Propagate endpoints to the child LB policy only after all clusters have been resolved.
224         if (!state.resolved && state.status.isOk()) {
225           return;
226         }
227         if (state.result != null) {
228           addresses.addAll(state.result.addresses);
229           priorityChildConfigs.putAll(state.result.priorityChildConfigs);
230           priorities.addAll(state.result.priorities);
231         } else {
232           endpointNotFound = state.status;
233         }
234       }
235       if (addresses.isEmpty()) {
236         if (endpointNotFound.isOk()) {
237           endpointNotFound = Status.UNAVAILABLE.withDescription(
238               "No usable endpoint from cluster(s): " + clusters);
239         } else {
240           endpointNotFound =
241               Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
242                   .withDescription(endpointNotFound.getDescription());
243         }
244         helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(endpointNotFound));
245         if (childLb != null) {
246           childLb.shutdown();
247           childLb = null;
248         }
249         return;
250       }
251       PriorityLbConfig childConfig =
252           new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs),
253               Collections.unmodifiableList(priorities));
254       if (childLb == null) {
255         childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper);
256       }
257       childLb.handleResolvedAddresses(
258           resolvedAddresses.toBuilder()
259               .setLoadBalancingPolicyConfig(childConfig)
260               .setAddresses(Collections.unmodifiableList(addresses))
261               .build());
262     }
263 
handleEndpointResolutionError()264     private void handleEndpointResolutionError() {
265       boolean allInError = true;
266       Status error = null;
267       for (String cluster : clusters) {
268         ClusterState state = clusterStates.get(cluster);
269         if (state.status.isOk()) {
270           allInError = false;
271         } else {
272           error = state.status;
273         }
274       }
275       if (allInError) {
276         if (childLb != null) {
277           childLb.handleNameResolutionError(error);
278         } else {
279           helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
280         }
281       }
282     }
283 
284     /**
285      * Wires re-resolution requests from downstream LB policies with DNS resolver.
286      */
287     private final class RefreshableHelper extends ForwardingLoadBalancerHelper {
288       private final Helper delegate;
289 
RefreshableHelper(Helper delegate)290       private RefreshableHelper(Helper delegate) {
291         this.delegate = checkNotNull(delegate, "delegate");
292       }
293 
294       @Override
refreshNameResolution()295       public void refreshNameResolution() {
296         for (ClusterState state : clusterStates.values()) {
297           if (state instanceof LogicalDnsClusterState) {
298             ((LogicalDnsClusterState) state).refresh();
299           }
300         }
301       }
302 
303       @Override
delegate()304       protected Helper delegate() {
305         return delegate;
306       }
307     }
308 
309     /**
310      * Resolution state of an underlying cluster.
311      */
312     private abstract class ClusterState {
313       // Name of the cluster to be resolved.
314       protected final String name;
315       @Nullable
316       protected final ServerInfo lrsServerInfo;
317       @Nullable
318       protected final Long maxConcurrentRequests;
319       @Nullable
320       protected final UpstreamTlsContext tlsContext;
321       @Nullable
322       protected final OutlierDetection outlierDetection;
323       // Resolution status, may contain most recent error encountered.
324       protected Status status = Status.OK;
325       // True if has received resolution result.
326       protected boolean resolved;
327       // Most recently resolved addresses and config, or null if resource not exists.
328       @Nullable
329       protected ClusterResolutionResult result;
330 
331       protected boolean shutdown;
332 
ClusterState(String name, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection)333       private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
334           @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
335           @Nullable OutlierDetection outlierDetection) {
336         this.name = name;
337         this.lrsServerInfo = lrsServerInfo;
338         this.maxConcurrentRequests = maxConcurrentRequests;
339         this.tlsContext = tlsContext;
340         this.outlierDetection = outlierDetection;
341       }
342 
start()343       abstract void start();
344 
shutdown()345       void shutdown() {
346         shutdown = true;
347       }
348     }
349 
350     private final class EdsClusterState extends ClusterState implements ResourceWatcher<EdsUpdate> {
351       @Nullable
352       private final String edsServiceName;
353       private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
354       int priorityNameGenId = 1;
355 
/**
 * Creates the state for an EDS cluster.
 *
 * @param name cluster name
 * @param edsServiceName EDS resource name to watch, or null to watch {@code name} instead
 */
private EdsClusterState(
    String name,
    @Nullable String edsServiceName,
    @Nullable ServerInfo lrsServerInfo,
    @Nullable Long maxConcurrentRequests,
    @Nullable UpstreamTlsContext tlsContext,
    @Nullable OutlierDetection outlierDetection) {
  super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, outlierDetection);
  this.edsServiceName = edsServiceName;
}
362 
363       @Override
start()364       void start() {
365         String resourceName = edsServiceName != null ? edsServiceName : name;
366         logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
367         xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
368       }
369 
370       @Override
shutdown()371       protected void shutdown() {
372         super.shutdown();
373         String resourceName = edsServiceName != null ? edsServiceName : name;
374         logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName);
375         xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this);
376       }
377 
378       @Override
onChanged(final EdsUpdate update)379       public void onChanged(final EdsUpdate update) {
380         class EndpointsUpdated implements Runnable {
381           @Override
382           public void run() {
383             if (shutdown) {
384               return;
385             }
386             logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update);
387             if (logger.isLoggable(XdsLogLevel.INFO)) {
388               logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories",
389                   update.clusterName, update.localityLbEndpointsMap.size(),
390                   update.dropPolicies.size());
391             }
392             Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
393                 update.localityLbEndpointsMap;
394             List<DropOverload> dropOverloads = update.dropPolicies;
395             List<EquivalentAddressGroup> addresses = new ArrayList<>();
396             Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
397             List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints);
398             for (Locality locality : localityLbEndpoints.keySet()) {
399               LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
400               String priorityName = localityPriorityNames.get(locality);
401               boolean discard = true;
402               for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
403                 if (endpoint.isHealthy()) {
404                   discard = false;
405                   long weight = localityLbInfo.localityWeight();
406                   if (endpoint.loadBalancingWeight() != 0) {
407                     weight *= endpoint.loadBalancingWeight();
408                   }
409                   Attributes attr =
410                       endpoint.eag().getAttributes().toBuilder()
411                           .set(InternalXdsAttributes.ATTR_LOCALITY, locality)
412                           .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
413                               localityLbInfo.localityWeight())
414                           .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
415                           .build();
416                   EquivalentAddressGroup eag = new EquivalentAddressGroup(
417                       endpoint.eag().getAddresses(), attr);
418                   eag = AddressFilter.setPathFilter(
419                       eag, Arrays.asList(priorityName, localityName(locality)));
420                   addresses.add(eag);
421                 }
422               }
423               if (discard) {
424                 logger.log(XdsLogLevel.INFO,
425                     "Discard locality {0} with 0 healthy endpoints", locality);
426                 continue;
427               }
428               if (!prioritizedLocalityWeights.containsKey(priorityName)) {
429                 prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
430               }
431               prioritizedLocalityWeights.get(priorityName).put(
432                   locality, localityLbInfo.localityWeight());
433             }
434             if (prioritizedLocalityWeights.isEmpty()) {
435               // Will still update the result, as if the cluster resource is revoked.
436               logger.log(XdsLogLevel.INFO,
437                   "Cluster {0} has no usable priority/locality/endpoint", update.clusterName);
438             }
439             sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
440             Map<String, PriorityChildConfig> priorityChildConfigs =
441                 generateEdsBasedPriorityChildConfigs(
442                     name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
443                     outlierDetection, endpointLbPolicy, lbRegistry, prioritizedLocalityWeights,
444                     dropOverloads);
445             status = Status.OK;
446             resolved = true;
447             result = new ClusterResolutionResult(addresses, priorityChildConfigs,
448                 sortedPriorityNames);
449             handleEndpointResourceUpdate();
450           }
451         }
452 
453         syncContext.execute(new EndpointsUpdated());
454       }
455 
generatePriorityNames(String name, Map<Locality, LocalityLbEndpoints> localityLbEndpoints)456       private List<String> generatePriorityNames(String name,
457           Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
458         TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
459         for (Locality locality : localityLbEndpoints.keySet()) {
460           int priority = localityLbEndpoints.get(locality).priority();
461           if (!todo.containsKey(priority)) {
462             todo.put(priority, new ArrayList<>());
463           }
464           todo.get(priority).add(locality);
465         }
466         Map<Locality, String> newNames = new HashMap<>();
467         Set<String> usedNames = new HashSet<>();
468         List<String> ret = new ArrayList<>();
469         for (Integer priority: todo.keySet()) {
470           String foundName = "";
471           for (Locality locality : todo.get(priority)) {
472             if (localityPriorityNames.containsKey(locality)
473                 && usedNames.add(localityPriorityNames.get(locality))) {
474               foundName = localityPriorityNames.get(locality);
475               break;
476             }
477           }
478           if ("".equals(foundName)) {
479             foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++);
480           }
481           for (Locality locality : todo.get(priority)) {
482             newNames.put(locality, foundName);
483           }
484           ret.add(foundName);
485         }
486         localityPriorityNames = newNames;
487         return ret;
488       }
489 
490       @Override
onResourceDoesNotExist(final String resourceName)491       public void onResourceDoesNotExist(final String resourceName) {
492         syncContext.execute(new Runnable() {
493           @Override
494           public void run() {
495             if (shutdown) {
496               return;
497             }
498             logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
499             status = Status.OK;
500             resolved = true;
501             result = null;  // resource revoked
502             handleEndpointResourceUpdate();
503           }
504         });
505       }
506 
507       @Override
onError(final Status error)508       public void onError(final Status error) {
509         syncContext.execute(new Runnable() {
510           @Override
511           public void run() {
512             if (shutdown) {
513               return;
514             }
515             String resourceName = edsServiceName != null ? edsServiceName : name;
516             status = Status.UNAVAILABLE
517                 .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
518                       resourceName, error.getCode(), error.getDescription()))
519                 .withCause(error.getCause());
520             logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
521             handleEndpointResolutionError();
522           }
523         });
524       }
525     }
526 
527     private final class LogicalDnsClusterState extends ClusterState {
528       private final String dnsHostName;
529       private final NameResolver.Factory nameResolverFactory;
530       private final NameResolver.Args nameResolverArgs;
531       private NameResolver resolver;
532       @Nullable
533       private BackoffPolicy backoffPolicy;
534       @Nullable
535       private ScheduledHandle scheduledRefresh;
536 
LogicalDnsClusterState(String name, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext)537       private LogicalDnsClusterState(String name, String dnsHostName,
538           @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
539           @Nullable UpstreamTlsContext tlsContext) {
540         super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, null);
541         this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName");
542         nameResolverFactory =
543             checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory");
544         nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs");
545       }
546 
547       @Override
start()548       void start() {
549         URI uri;
550         try {
551           uri = new URI("dns", "", "/" + dnsHostName, null);
552         } catch (URISyntaxException e) {
553           status = Status.INTERNAL.withDescription(
554               "Bug, invalid URI creation: " + dnsHostName).withCause(e);
555           handleEndpointResolutionError();
556           return;
557         }
558         resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs);
559         if (resolver == null) {
560           status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS "
561               + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri);
562           handleEndpointResolutionError();
563           return;
564         }
565         resolver.start(new NameResolverListener());
566       }
567 
refresh()568       void refresh() {
569         if (resolver == null) {
570           return;
571         }
572         cancelBackoff();
573         resolver.refresh();
574       }
575 
576       @Override
shutdown()577       void shutdown() {
578         super.shutdown();
579         if (resolver != null) {
580           resolver.shutdown();
581         }
582         cancelBackoff();
583       }
584 
cancelBackoff()585       private void cancelBackoff() {
586         if (scheduledRefresh != null) {
587           scheduledRefresh.cancel();
588           scheduledRefresh = null;
589           backoffPolicy = null;
590         }
591       }
592 
593       private class DelayedNameResolverRefresh implements Runnable {
594         @Override
run()595         public void run() {
596           scheduledRefresh = null;
597           if (!shutdown) {
598             resolver.refresh();
599           }
600         }
601       }
602 
603       private class NameResolverListener extends NameResolver.Listener2 {
604         @Override
onResult(final ResolutionResult resolutionResult)605         public void onResult(final ResolutionResult resolutionResult) {
606           class NameResolved implements Runnable {
607             @Override
608             public void run() {
609               if (shutdown) {
610                 return;
611               }
612               backoffPolicy = null;  // reset backoff sequence if succeeded
613               // Arbitrary priority notation for all DNS-resolved endpoints.
614               String priorityName = priorityName(name, 0);  // value doesn't matter
615               List<EquivalentAddressGroup> addresses = new ArrayList<>();
616               for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
617                 // No weight attribute is attached, all endpoint-level LB policy should be able
618                 // to handle such it.
619                 Attributes attr = eag.getAttributes().toBuilder().set(
620                     InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build();
621                 eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
622                 eag = AddressFilter.setPathFilter(
623                     eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString()));
624                 addresses.add(eag);
625               }
626               PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
627                   name, lrsServerInfo, maxConcurrentRequests, tlsContext, lbRegistry,
628                   Collections.<DropOverload>emptyList());
629               status = Status.OK;
630               resolved = true;
631               result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
632               handleEndpointResourceUpdate();
633             }
634           }
635 
636           syncContext.execute(new NameResolved());
637         }
638 
639         @Override
onError(final Status error)640         public void onError(final Status error) {
641           syncContext.execute(new Runnable() {
642             @Override
643             public void run() {
644               if (shutdown) {
645                 return;
646               }
647               status = error;
648               // NameResolver.Listener API cannot distinguish between address-not-found and
649               // transient errors. If the error occurs in the first resolution, treat it as
650               // address not found. Otherwise, either there is previously resolved addresses
651               // previously encountered error, propagate the error to downstream/upstream and
652               // let downstream/upstream handle it.
653               if (!resolved) {
654                 resolved = true;
655                 handleEndpointResourceUpdate();
656               } else {
657                 handleEndpointResolutionError();
658               }
659               if (scheduledRefresh != null && scheduledRefresh.isPending()) {
660                 return;
661               }
662               if (backoffPolicy == null) {
663                 backoffPolicy = backoffPolicyProvider.get();
664               }
665               long delayNanos = backoffPolicy.nextBackoffNanos();
666               logger.log(XdsLogLevel.DEBUG,
667                   "Logical DNS resolver for cluster {0} encountered name resolution "
668                       + "error: {1}, scheduling DNS resolution backoff for {2} ns",
669                   name, error, delayNanos);
670               scheduledRefresh =
671                   syncContext.schedule(
672                       new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
673                       timeService);
674             }
675           });
676         }
677       }
678     }
679   }
680 
681   private static class ClusterResolutionResult {
682     // Endpoint addresses.
683     private final List<EquivalentAddressGroup> addresses;
684     // Config (include load balancing policy/config) for each priority in the cluster.
685     private final Map<String, PriorityChildConfig> priorityChildConfigs;
686     // List of priority names ordered in descending priorities.
687     private final List<String> priorities;
688 
ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority, PriorityChildConfig config)689     ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
690         PriorityChildConfig config) {
691       this(addresses, Collections.singletonMap(priority, config),
692           Collections.singletonList(priority));
693     }
694 
ClusterResolutionResult(List<EquivalentAddressGroup> addresses, Map<String, PriorityChildConfig> configs, List<String> priorities)695     ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
696         Map<String, PriorityChildConfig> configs, List<String> priorities) {
697       this.addresses = addresses;
698       this.priorityChildConfigs = configs;
699       this.priorities = priorities;
700     }
701   }
702 
703   /**
704    * Generates the config to be used in the priority LB policy for the single priority of
705    * logical DNS cluster.
706    *
707    * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
708    */
generateDnsBasedPriorityChildConfig( String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads)709   private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
710       String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
711       @Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry,
712       List<DropOverload> dropOverloads) {
713     // Override endpoint-level LB policy with pick_first for logical DNS cluster.
714     PolicySelection endpointLbPolicy =
715         new PolicySelection(lbRegistry.getProvider("pick_first"), null);
716     ClusterImplConfig clusterImplConfig =
717         new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests,
718             dropOverloads, endpointLbPolicy, tlsContext);
719     LoadBalancerProvider clusterImplLbProvider =
720         lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
721     PolicySelection clusterImplPolicy =
722         new PolicySelection(clusterImplLbProvider, clusterImplConfig);
723     return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
724   }
725 
726   /**
727    * Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
728    *
729    * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
730    * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental
731    */
generateEdsBasedPriorityChildConfigs( String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry, Map<String, Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads)732   private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
733       String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo,
734       @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
735       @Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy,
736       LoadBalancerRegistry lbRegistry, Map<String,
737       Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) {
738     Map<String, PriorityChildConfig> configs = new HashMap<>();
739     for (String priority : prioritizedLocalityWeights.keySet()) {
740       ClusterImplConfig clusterImplConfig =
741           new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
742               dropOverloads, endpointLbPolicy, tlsContext);
743       LoadBalancerProvider clusterImplLbProvider =
744           lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
745       PolicySelection priorityChildPolicy =
746           new PolicySelection(clusterImplLbProvider, clusterImplConfig);
747 
748       // If outlier detection has been configured we wrap the child policy in the outlier detection
749       // load balancer.
750       if (outlierDetection != null) {
751         LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider(
752             "outlier_detection_experimental");
753         priorityChildPolicy = new PolicySelection(outlierDetectionProvider,
754             buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy));
755       }
756 
757       PriorityChildConfig priorityChildConfig =
758           new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */);
759       configs.put(priority, priorityChildConfig);
760     }
761     return configs;
762   }
763 
764   /**
765    * Converts {@link OutlierDetection} that represents the xDS configuration to {@link
766    * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer}
767    * understands.
768    */
buildOutlierDetectionLbConfig( OutlierDetection outlierDetection, PolicySelection childPolicy)769   private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig(
770       OutlierDetection outlierDetection, PolicySelection childPolicy) {
771     OutlierDetectionLoadBalancerConfig.Builder configBuilder
772         = new OutlierDetectionLoadBalancerConfig.Builder();
773 
774     configBuilder.setChildPolicy(childPolicy);
775 
776     if (outlierDetection.intervalNanos() != null) {
777       configBuilder.setIntervalNanos(outlierDetection.intervalNanos());
778     }
779     if (outlierDetection.baseEjectionTimeNanos() != null) {
780       configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos());
781     }
782     if (outlierDetection.maxEjectionTimeNanos() != null) {
783       configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos());
784     }
785     if (outlierDetection.maxEjectionPercent() != null) {
786       configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent());
787     }
788 
789     SuccessRateEjection successRate = outlierDetection.successRateEjection();
790     if (successRate != null) {
791       OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder
792           successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig
793           .SuccessRateEjection.Builder();
794 
795       if (successRate.stdevFactor() != null) {
796         successRateConfigBuilder.setStdevFactor(successRate.stdevFactor());
797       }
798       if (successRate.enforcementPercentage() != null) {
799         successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage());
800       }
801       if (successRate.minimumHosts() != null) {
802         successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts());
803       }
804       if (successRate.requestVolume() != null) {
805         successRateConfigBuilder.setRequestVolume(successRate.requestVolume());
806       }
807 
808       configBuilder.setSuccessRateEjection(successRateConfigBuilder.build());
809     }
810 
811     FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection();
812     if (failurePercentage != null) {
813       OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder
814           failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig
815           .FailurePercentageEjection.Builder();
816 
817       if (failurePercentage.threshold() != null) {
818         failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold());
819       }
820       if (failurePercentage.enforcementPercentage() != null) {
821         failurePercentageConfigBuilder.setEnforcementPercentage(
822             failurePercentage.enforcementPercentage());
823       }
824       if (failurePercentage.minimumHosts() != null) {
825         failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts());
826       }
827       if (failurePercentage.requestVolume() != null) {
828         failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume());
829       }
830 
831       configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build());
832     }
833 
834     return configBuilder.build();
835   }
836 
837   /**
838    * Generates a string that represents the priority in the LB policy config. The string is unique
839    * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2.
840    * The ordering is undefined for priorities in different clusters.
841    */
priorityName(String cluster, int priority)842   private static String priorityName(String cluster, int priority) {
843     return cluster + "[child" + priority + "]";
844   }
845 
846   /**
847    * Generates a string that represents the locality in the LB policy config. The string is unique
848    * across all localities in all clusters.
849    */
localityName(Locality locality)850   private static String localityName(Locality locality) {
851     return locality.toString();
852   }
853 }
854