1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 21 import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import io.grpc.Attributes; 25 import io.grpc.EquivalentAddressGroup; 26 import io.grpc.InternalLogId; 27 import io.grpc.LoadBalancer; 28 import io.grpc.LoadBalancerProvider; 29 import io.grpc.LoadBalancerRegistry; 30 import io.grpc.NameResolver; 31 import io.grpc.NameResolver.ResolutionResult; 32 import io.grpc.Status; 33 import io.grpc.SynchronizationContext; 34 import io.grpc.SynchronizationContext.ScheduledHandle; 35 import io.grpc.internal.BackoffPolicy; 36 import io.grpc.internal.ExponentialBackoffPolicy; 37 import io.grpc.internal.ObjectPool; 38 import io.grpc.internal.ServiceConfigUtil.PolicySelection; 39 import io.grpc.util.ForwardingLoadBalancerHelper; 40 import io.grpc.util.GracefulSwitchLoadBalancer; 41 import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; 42 import io.grpc.xds.Bootstrapper.ServerInfo; 43 import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; 44 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; 45 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; 46 import io.grpc.xds.Endpoints.DropOverload; 47 import io.grpc.xds.Endpoints.LbEndpoint; 48 import io.grpc.xds.Endpoints.LocalityLbEndpoints; 49 import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection; 50 import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; 51 import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; 52 import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; 53 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; 54 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; 55 import io.grpc.xds.XdsClient.ResourceWatcher; 56 import io.grpc.xds.XdsEndpointResource.EdsUpdate; 57 import io.grpc.xds.XdsLogger.XdsLogLevel; 58 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; 59 import java.net.URI; 60 import java.net.URISyntaxException; 61 import java.util.ArrayList; 62 import java.util.Arrays; 63 import java.util.Collections; 64 import java.util.HashMap; 65 import java.util.HashSet; 66 import java.util.List; 67 import java.util.Locale; 68 import java.util.Map; 69 import java.util.Objects; 70 import java.util.Set; 71 import java.util.TreeMap; 72 import java.util.concurrent.ScheduledExecutorService; 73 import java.util.concurrent.TimeUnit; 74 import javax.annotation.Nullable; 75 76 /** 77 * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy 78 * of the cds_experimental LB policy and the parent LB policy of the priority_experimental LB 79 * policy in the xDS load balancing hierarchy. This policy resolves endpoints of non-aggregate 80 * clusters (e.g., EDS or Logical DNS) and groups endpoints in priorities and localities to be 81 * used in the downstream LB policies for fine-grained load balancing purposes. 82 */ 83 final class ClusterResolverLoadBalancer extends LoadBalancer { 84 // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode 85 // to an empty locality. 86 private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", ""); 87 private final XdsLogger logger; 88 private final SynchronizationContext syncContext; 89 private final ScheduledExecutorService timeService; 90 private final LoadBalancerRegistry lbRegistry; 91 private final BackoffPolicy.Provider backoffPolicyProvider; 92 private final GracefulSwitchLoadBalancer delegate; 93 private ObjectPool<XdsClient> xdsClientPool; 94 private XdsClient xdsClient; 95 private ClusterResolverConfig config; 96 ClusterResolverLoadBalancer(Helper helper)97 ClusterResolverLoadBalancer(Helper helper) { 98 this(helper, LoadBalancerRegistry.getDefaultRegistry(), 99 new ExponentialBackoffPolicy.Provider()); 100 } 101 102 @VisibleForTesting ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, BackoffPolicy.Provider backoffPolicyProvider)103 ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, 104 BackoffPolicy.Provider backoffPolicyProvider) { 105 this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); 106 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 107 this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); 108 this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); 109 delegate = new GracefulSwitchLoadBalancer(helper); 110 logger = XdsLogger.withLogId( 111 InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority())); 112 logger.log(XdsLogLevel.INFO, "Created"); 113 } 114 115 @Override acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)116 public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { 117 logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); 118 if (xdsClientPool == null) { 119 xdsClientPool = resolvedAddresses.getAttributes().get(InternalXdsAttributes.XDS_CLIENT_POOL); 120 xdsClient = xdsClientPool.getObject(); 121 } 122 ClusterResolverConfig config = 123 (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); 124 if (!Objects.equals(this.config, config)) { 125 logger.log(XdsLogLevel.DEBUG, "Config: {0}", config); 126 delegate.switchTo(new ClusterResolverLbStateFactory()); 127 this.config = config; 128 delegate.handleResolvedAddresses(resolvedAddresses); 129 } 130 return true; 131 } 132 133 @Override handleNameResolutionError(Status error)134 public void handleNameResolutionError(Status error) { 135 logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); 136 delegate.handleNameResolutionError(error); 137 } 138 139 @Override shutdown()140 public void shutdown() { 141 logger.log(XdsLogLevel.INFO, "Shutdown"); 142 delegate.shutdown(); 143 if (xdsClientPool != null) { 144 xdsClientPool.returnObject(xdsClient); 145 } 146 } 147 148 private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory { 149 @Override newLoadBalancer(Helper helper)150 public LoadBalancer newLoadBalancer(Helper helper) { 151 return new ClusterResolverLbState(helper); 152 } 153 } 154 155 /** 156 * The state of a cluster_resolver LB working session. A new instance is created whenever 157 * the cluster_resolver LB receives a new config. The old instance is replaced when the 158 * new one is ready to handle new RPCs. 159 */ 160 private final class ClusterResolverLbState extends LoadBalancer { 161 private final Helper helper; 162 private final List<String> clusters = new ArrayList<>(); 163 private final Map<String, ClusterState> clusterStates = new HashMap<>(); 164 private PolicySelection endpointLbPolicy; 165 private ResolvedAddresses resolvedAddresses; 166 private LoadBalancer childLb; 167 ClusterResolverLbState(Helper helper)168 ClusterResolverLbState(Helper helper) { 169 this.helper = new RefreshableHelper(checkNotNull(helper, "helper")); 170 logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState"); 171 } 172 173 @Override acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)174 public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { 175 this.resolvedAddresses = resolvedAddresses; 176 ClusterResolverConfig config = 177 (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); 178 endpointLbPolicy = config.lbPolicy; 179 for (DiscoveryMechanism instance : config.discoveryMechanisms) { 180 clusters.add(instance.cluster); 181 ClusterState state; 182 if (instance.type == DiscoveryMechanism.Type.EDS) { 183 state = new EdsClusterState(instance.cluster, instance.edsServiceName, 184 instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, 185 instance.outlierDetection); 186 } else { // logical DNS 187 state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, 188 instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext); 189 } 190 clusterStates.put(instance.cluster, state); 191 state.start(); 192 } 193 return true; 194 } 195 196 @Override handleNameResolutionError(Status error)197 public void handleNameResolutionError(Status error) { 198 if (childLb != null) { 199 childLb.handleNameResolutionError(error); 200 } else { 201 helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); 202 } 203 } 204 205 @Override shutdown()206 public void shutdown() { 207 for (ClusterState state : clusterStates.values()) { 208 state.shutdown(); 209 } 210 if (childLb != null) { 211 childLb.shutdown(); 212 } 213 } 214 handleEndpointResourceUpdate()215 private void handleEndpointResourceUpdate() { 216 List<EquivalentAddressGroup> addresses = new ArrayList<>(); 217 Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>(); 218 List<String> priorities = new ArrayList<>(); // totally ordered priority list 219 220 Status endpointNotFound = Status.OK; 221 for (String cluster : clusters) { 222 ClusterState state = clusterStates.get(cluster); 223 // Propagate endpoints to the child LB policy only after all clusters have been resolved. 224 if (!state.resolved && state.status.isOk()) { 225 return; 226 } 227 if (state.result != null) { 228 addresses.addAll(state.result.addresses); 229 priorityChildConfigs.putAll(state.result.priorityChildConfigs); 230 priorities.addAll(state.result.priorities); 231 } else { 232 endpointNotFound = state.status; 233 } 234 } 235 if (addresses.isEmpty()) { 236 if (endpointNotFound.isOk()) { 237 endpointNotFound = Status.UNAVAILABLE.withDescription( 238 "No usable endpoint from cluster(s): " + clusters); 239 } else { 240 endpointNotFound = 241 Status.UNAVAILABLE.withCause(endpointNotFound.getCause()) 242 .withDescription(endpointNotFound.getDescription()); 243 } 244 helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(endpointNotFound)); 245 if (childLb != null) { 246 childLb.shutdown(); 247 childLb = null; 248 } 249 return; 250 } 251 PriorityLbConfig childConfig = 252 new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs), 253 Collections.unmodifiableList(priorities)); 254 if (childLb == null) { 255 childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper); 256 } 257 childLb.handleResolvedAddresses( 258 resolvedAddresses.toBuilder() 259 .setLoadBalancingPolicyConfig(childConfig) 260 .setAddresses(Collections.unmodifiableList(addresses)) 261 .build()); 262 } 263 handleEndpointResolutionError()264 private void handleEndpointResolutionError() { 265 boolean allInError = true; 266 Status error = null; 267 for (String cluster : clusters) { 268 ClusterState state = clusterStates.get(cluster); 269 if (state.status.isOk()) { 270 allInError = false; 271 } else { 272 error = state.status; 273 } 274 } 275 if (allInError) { 276 if (childLb != null) { 277 childLb.handleNameResolutionError(error); 278 } else { 279 helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); 280 } 281 } 282 } 283 284 /** 285 * Wires re-resolution requests from downstream LB policies with DNS resolver. 286 */ 287 private final class RefreshableHelper extends ForwardingLoadBalancerHelper { 288 private final Helper delegate; 289 RefreshableHelper(Helper delegate)290 private RefreshableHelper(Helper delegate) { 291 this.delegate = checkNotNull(delegate, "delegate"); 292 } 293 294 @Override refreshNameResolution()295 public void refreshNameResolution() { 296 for (ClusterState state : clusterStates.values()) { 297 if (state instanceof LogicalDnsClusterState) { 298 ((LogicalDnsClusterState) state).refresh(); 299 } 300 } 301 } 302 303 @Override delegate()304 protected Helper delegate() { 305 return delegate; 306 } 307 } 308 309 /** 310 * Resolution state of an underlying cluster. 311 */ 312 private abstract class ClusterState { 313 // Name of the cluster to be resolved. 314 protected final String name; 315 @Nullable 316 protected final ServerInfo lrsServerInfo; 317 @Nullable 318 protected final Long maxConcurrentRequests; 319 @Nullable 320 protected final UpstreamTlsContext tlsContext; 321 @Nullable 322 protected final OutlierDetection outlierDetection; 323 // Resolution status, may contain most recent error encountered. 324 protected Status status = Status.OK; 325 // True if has received resolution result. 326 protected boolean resolved; 327 // Most recently resolved addresses and config, or null if resource not exists. 328 @Nullable 329 protected ClusterResolutionResult result; 330 331 protected boolean shutdown; 332 ClusterState(String name, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection)333 private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, 334 @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, 335 @Nullable OutlierDetection outlierDetection) { 336 this.name = name; 337 this.lrsServerInfo = lrsServerInfo; 338 this.maxConcurrentRequests = maxConcurrentRequests; 339 this.tlsContext = tlsContext; 340 this.outlierDetection = outlierDetection; 341 } 342 start()343 abstract void start(); 344 shutdown()345 void shutdown() { 346 shutdown = true; 347 } 348 } 349 350 private final class EdsClusterState extends ClusterState implements ResourceWatcher<EdsUpdate> { 351 @Nullable 352 private final String edsServiceName; 353 private Map<Locality, String> localityPriorityNames = Collections.emptyMap(); 354 int priorityNameGenId = 1; 355 EdsClusterState(String name, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection)356 private EdsClusterState(String name, @Nullable String edsServiceName, 357 @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, 358 @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection) { 359 super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, outlierDetection); 360 this.edsServiceName = edsServiceName; 361 } 362 363 @Override start()364 void start() { 365 String resourceName = edsServiceName != null ? edsServiceName : name; 366 logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName); 367 xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this); 368 } 369 370 @Override shutdown()371 protected void shutdown() { 372 super.shutdown(); 373 String resourceName = edsServiceName != null ? edsServiceName : name; 374 logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName); 375 xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this); 376 } 377 378 @Override onChanged(final EdsUpdate update)379 public void onChanged(final EdsUpdate update) { 380 class EndpointsUpdated implements Runnable { 381 @Override 382 public void run() { 383 if (shutdown) { 384 return; 385 } 386 logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update); 387 if (logger.isLoggable(XdsLogLevel.INFO)) { 388 logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories", 389 update.clusterName, update.localityLbEndpointsMap.size(), 390 update.dropPolicies.size()); 391 } 392 Map<Locality, LocalityLbEndpoints> localityLbEndpoints = 393 update.localityLbEndpointsMap; 394 List<DropOverload> dropOverloads = update.dropPolicies; 395 List<EquivalentAddressGroup> addresses = new ArrayList<>(); 396 Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>(); 397 List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints); 398 for (Locality locality : localityLbEndpoints.keySet()) { 399 LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); 400 String priorityName = localityPriorityNames.get(locality); 401 boolean discard = true; 402 for (LbEndpoint endpoint : localityLbInfo.endpoints()) { 403 if (endpoint.isHealthy()) { 404 discard = false; 405 long weight = localityLbInfo.localityWeight(); 406 if (endpoint.loadBalancingWeight() != 0) { 407 weight *= endpoint.loadBalancingWeight(); 408 } 409 Attributes attr = 410 endpoint.eag().getAttributes().toBuilder() 411 .set(InternalXdsAttributes.ATTR_LOCALITY, locality) 412 .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, 413 localityLbInfo.localityWeight()) 414 .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight) 415 .build(); 416 EquivalentAddressGroup eag = new EquivalentAddressGroup( 417 endpoint.eag().getAddresses(), attr); 418 eag = AddressFilter.setPathFilter( 419 eag, Arrays.asList(priorityName, localityName(locality))); 420 addresses.add(eag); 421 } 422 } 423 if (discard) { 424 logger.log(XdsLogLevel.INFO, 425 "Discard locality {0} with 0 healthy endpoints", locality); 426 continue; 427 } 428 if (!prioritizedLocalityWeights.containsKey(priorityName)) { 429 prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>()); 430 } 431 prioritizedLocalityWeights.get(priorityName).put( 432 locality, localityLbInfo.localityWeight()); 433 } 434 if (prioritizedLocalityWeights.isEmpty()) { 435 // Will still update the result, as if the cluster resource is revoked. 436 logger.log(XdsLogLevel.INFO, 437 "Cluster {0} has no usable priority/locality/endpoint", update.clusterName); 438 } 439 sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet()); 440 Map<String, PriorityChildConfig> priorityChildConfigs = 441 generateEdsBasedPriorityChildConfigs( 442 name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext, 443 outlierDetection, endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, 444 dropOverloads); 445 status = Status.OK; 446 resolved = true; 447 result = new ClusterResolutionResult(addresses, priorityChildConfigs, 448 sortedPriorityNames); 449 handleEndpointResourceUpdate(); 450 } 451 } 452 453 syncContext.execute(new EndpointsUpdated()); 454 } 455 generatePriorityNames(String name, Map<Locality, LocalityLbEndpoints> localityLbEndpoints)456 private List<String> generatePriorityNames(String name, 457 Map<Locality, LocalityLbEndpoints> localityLbEndpoints) { 458 TreeMap<Integer, List<Locality>> todo = new TreeMap<>(); 459 for (Locality locality : localityLbEndpoints.keySet()) { 460 int priority = localityLbEndpoints.get(locality).priority(); 461 if (!todo.containsKey(priority)) { 462 todo.put(priority, new ArrayList<>()); 463 } 464 todo.get(priority).add(locality); 465 } 466 Map<Locality, String> newNames = new HashMap<>(); 467 Set<String> usedNames = new HashSet<>(); 468 List<String> ret = new ArrayList<>(); 469 for (Integer priority: todo.keySet()) { 470 String foundName = ""; 471 for (Locality locality : todo.get(priority)) { 472 if (localityPriorityNames.containsKey(locality) 473 && usedNames.add(localityPriorityNames.get(locality))) { 474 foundName = localityPriorityNames.get(locality); 475 break; 476 } 477 } 478 if ("".equals(foundName)) { 479 foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++); 480 } 481 for (Locality locality : todo.get(priority)) { 482 newNames.put(locality, foundName); 483 } 484 ret.add(foundName); 485 } 486 localityPriorityNames = newNames; 487 return ret; 488 } 489 490 @Override onResourceDoesNotExist(final String resourceName)491 public void onResourceDoesNotExist(final String resourceName) { 492 syncContext.execute(new Runnable() { 493 @Override 494 public void run() { 495 if (shutdown) { 496 return; 497 } 498 logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); 499 status = Status.OK; 500 resolved = true; 501 result = null; // resource revoked 502 handleEndpointResourceUpdate(); 503 } 504 }); 505 } 506 507 @Override onError(final Status error)508 public void onError(final Status error) { 509 syncContext.execute(new Runnable() { 510 @Override 511 public void run() { 512 if (shutdown) { 513 return; 514 } 515 String resourceName = edsServiceName != null ? edsServiceName : name; 516 status = Status.UNAVAILABLE 517 .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", 518 resourceName, error.getCode(), error.getDescription())) 519 .withCause(error.getCause()); 520 logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); 521 handleEndpointResolutionError(); 522 } 523 }); 524 } 525 } 526 527 private final class LogicalDnsClusterState extends ClusterState { 528 private final String dnsHostName; 529 private final NameResolver.Factory nameResolverFactory; 530 private final NameResolver.Args nameResolverArgs; 531 private NameResolver resolver; 532 @Nullable 533 private BackoffPolicy backoffPolicy; 534 @Nullable 535 private ScheduledHandle scheduledRefresh; 536 LogicalDnsClusterState(String name, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext)537 private LogicalDnsClusterState(String name, String dnsHostName, 538 @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, 539 @Nullable UpstreamTlsContext tlsContext) { 540 super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, null); 541 this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); 542 nameResolverFactory = 543 checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); 544 nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs"); 545 } 546 547 @Override start()548 void start() { 549 URI uri; 550 try { 551 uri = new URI("dns", "", "/" + dnsHostName, null); 552 } catch (URISyntaxException e) { 553 status = Status.INTERNAL.withDescription( 554 "Bug, invalid URI creation: " + dnsHostName).withCause(e); 555 handleEndpointResolutionError(); 556 return; 557 } 558 resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs); 559 if (resolver == null) { 560 status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS " 561 + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri); 562 handleEndpointResolutionError(); 563 return; 564 } 565 resolver.start(new NameResolverListener()); 566 } 567 refresh()568 void refresh() { 569 if (resolver == null) { 570 return; 571 } 572 cancelBackoff(); 573 resolver.refresh(); 574 } 575 576 @Override shutdown()577 void shutdown() { 578 super.shutdown(); 579 if (resolver != null) { 580 resolver.shutdown(); 581 } 582 cancelBackoff(); 583 } 584 cancelBackoff()585 private void cancelBackoff() { 586 if (scheduledRefresh != null) { 587 scheduledRefresh.cancel(); 588 scheduledRefresh = null; 589 backoffPolicy = null; 590 } 591 } 592 593 private class DelayedNameResolverRefresh implements Runnable { 594 @Override run()595 public void run() { 596 scheduledRefresh = null; 597 if (!shutdown) { 598 resolver.refresh(); 599 } 600 } 601 } 602 603 private class NameResolverListener extends NameResolver.Listener2 { 604 @Override onResult(final ResolutionResult resolutionResult)605 public void onResult(final ResolutionResult resolutionResult) { 606 class NameResolved implements Runnable { 607 @Override 608 public void run() { 609 if (shutdown) { 610 return; 611 } 612 backoffPolicy = null; // reset backoff sequence if succeeded 613 // Arbitrary priority notation for all DNS-resolved endpoints. 614 String priorityName = priorityName(name, 0); // value doesn't matter 615 List<EquivalentAddressGroup> addresses = new ArrayList<>(); 616 for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { 617 // No weight attribute is attached, all endpoint-level LB policy should be able 618 // to handle such it. 619 Attributes attr = eag.getAttributes().toBuilder().set( 620 InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build(); 621 eag = new EquivalentAddressGroup(eag.getAddresses(), attr); 622 eag = AddressFilter.setPathFilter( 623 eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString())); 624 addresses.add(eag); 625 } 626 PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( 627 name, lrsServerInfo, maxConcurrentRequests, tlsContext, lbRegistry, 628 Collections.<DropOverload>emptyList()); 629 status = Status.OK; 630 resolved = true; 631 result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); 632 handleEndpointResourceUpdate(); 633 } 634 } 635 636 syncContext.execute(new NameResolved()); 637 } 638 639 @Override onError(final Status error)640 public void onError(final Status error) { 641 syncContext.execute(new Runnable() { 642 @Override 643 public void run() { 644 if (shutdown) { 645 return; 646 } 647 status = error; 648 // NameResolver.Listener API cannot distinguish between address-not-found and 649 // transient errors. If the error occurs in the first resolution, treat it as 650 // address not found. Otherwise, either there is previously resolved addresses 651 // previously encountered error, propagate the error to downstream/upstream and 652 // let downstream/upstream handle it. 653 if (!resolved) { 654 resolved = true; 655 handleEndpointResourceUpdate(); 656 } else { 657 handleEndpointResolutionError(); 658 } 659 if (scheduledRefresh != null && scheduledRefresh.isPending()) { 660 return; 661 } 662 if (backoffPolicy == null) { 663 backoffPolicy = backoffPolicyProvider.get(); 664 } 665 long delayNanos = backoffPolicy.nextBackoffNanos(); 666 logger.log(XdsLogLevel.DEBUG, 667 "Logical DNS resolver for cluster {0} encountered name resolution " 668 + "error: {1}, scheduling DNS resolution backoff for {2} ns", 669 name, error, delayNanos); 670 scheduledRefresh = 671 syncContext.schedule( 672 new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, 673 timeService); 674 } 675 }); 676 } 677 } 678 } 679 } 680 681 private static class ClusterResolutionResult { 682 // Endpoint addresses. 683 private final List<EquivalentAddressGroup> addresses; 684 // Config (include load balancing policy/config) for each priority in the cluster. 685 private final Map<String, PriorityChildConfig> priorityChildConfigs; 686 // List of priority names ordered in descending priorities. 687 private final List<String> priorities; 688 ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority, PriorityChildConfig config)689 ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority, 690 PriorityChildConfig config) { 691 this(addresses, Collections.singletonMap(priority, config), 692 Collections.singletonList(priority)); 693 } 694 ClusterResolutionResult(List<EquivalentAddressGroup> addresses, Map<String, PriorityChildConfig> configs, List<String> priorities)695 ClusterResolutionResult(List<EquivalentAddressGroup> addresses, 696 Map<String, PriorityChildConfig> configs, List<String> priorities) { 697 this.addresses = addresses; 698 this.priorityChildConfigs = configs; 699 this.priorities = priorities; 700 } 701 } 702 703 /** 704 * Generates the config to be used in the priority LB policy for the single priority of 705 * logical DNS cluster. 706 * 707 * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first 708 */ generateDnsBasedPriorityChildConfig( String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads)709 private static PriorityChildConfig generateDnsBasedPriorityChildConfig( 710 String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, 711 @Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry, 712 List<DropOverload> dropOverloads) { 713 // Override endpoint-level LB policy with pick_first for logical DNS cluster. 714 PolicySelection endpointLbPolicy = 715 new PolicySelection(lbRegistry.getProvider("pick_first"), null); 716 ClusterImplConfig clusterImplConfig = 717 new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests, 718 dropOverloads, endpointLbPolicy, tlsContext); 719 LoadBalancerProvider clusterImplLbProvider = 720 lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); 721 PolicySelection clusterImplPolicy = 722 new PolicySelection(clusterImplLbProvider, clusterImplConfig); 723 return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/); 724 } 725 726 /** 727 * Generates configs to be used in the priority LB policy for priorities in an EDS cluster. 728 * 729 * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB 730 * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental 731 */ generateEdsBasedPriorityChildConfigs( String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry, Map<String, Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads)732 private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs( 733 String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, 734 @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, 735 @Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy, 736 LoadBalancerRegistry lbRegistry, Map<String, 737 Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) { 738 Map<String, PriorityChildConfig> configs = new HashMap<>(); 739 for (String priority : prioritizedLocalityWeights.keySet()) { 740 ClusterImplConfig clusterImplConfig = 741 new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, 742 dropOverloads, endpointLbPolicy, tlsContext); 743 LoadBalancerProvider clusterImplLbProvider = 744 lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); 745 PolicySelection priorityChildPolicy = 746 new PolicySelection(clusterImplLbProvider, clusterImplConfig); 747 748 // If outlier detection has been configured we wrap the child policy in the outlier detection 749 // load balancer. 750 if (outlierDetection != null) { 751 LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider( 752 "outlier_detection_experimental"); 753 priorityChildPolicy = new PolicySelection(outlierDetectionProvider, 754 buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy)); 755 } 756 757 PriorityChildConfig priorityChildConfig = 758 new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */); 759 configs.put(priority, priorityChildConfig); 760 } 761 return configs; 762 } 763 764 /** 765 * Converts {@link OutlierDetection} that represents the xDS configuration to {@link 766 * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer} 767 * understands. 768 */ buildOutlierDetectionLbConfig( OutlierDetection outlierDetection, PolicySelection childPolicy)769 private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig( 770 OutlierDetection outlierDetection, PolicySelection childPolicy) { 771 OutlierDetectionLoadBalancerConfig.Builder configBuilder 772 = new OutlierDetectionLoadBalancerConfig.Builder(); 773 774 configBuilder.setChildPolicy(childPolicy); 775 776 if (outlierDetection.intervalNanos() != null) { 777 configBuilder.setIntervalNanos(outlierDetection.intervalNanos()); 778 } 779 if (outlierDetection.baseEjectionTimeNanos() != null) { 780 configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos()); 781 } 782 if (outlierDetection.maxEjectionTimeNanos() != null) { 783 configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos()); 784 } 785 if (outlierDetection.maxEjectionPercent() != null) { 786 configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent()); 787 } 788 789 SuccessRateEjection successRate = outlierDetection.successRateEjection(); 790 if (successRate != null) { 791 OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder 792 successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig 793 .SuccessRateEjection.Builder(); 794 795 if (successRate.stdevFactor() != null) { 796 successRateConfigBuilder.setStdevFactor(successRate.stdevFactor()); 797 } 798 if (successRate.enforcementPercentage() != null) { 799 successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage()); 800 } 801 if (successRate.minimumHosts() != null) { 802 successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts()); 803 } 804 if (successRate.requestVolume() != null) { 805 successRateConfigBuilder.setRequestVolume(successRate.requestVolume()); 806 } 807 808 configBuilder.setSuccessRateEjection(successRateConfigBuilder.build()); 809 } 810 811 FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection(); 812 if (failurePercentage != null) { 813 OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder 814 failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig 815 .FailurePercentageEjection.Builder(); 816 817 if (failurePercentage.threshold() != null) { 818 failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold()); 819 } 820 if (failurePercentage.enforcementPercentage() != null) { 821 failurePercentageConfigBuilder.setEnforcementPercentage( 822 failurePercentage.enforcementPercentage()); 823 } 824 if (failurePercentage.minimumHosts() != null) { 825 failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts()); 826 } 827 if (failurePercentage.requestVolume() != null) { 828 failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume()); 829 } 830 831 configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build()); 832 } 833 834 return configBuilder.build(); 835 } 836 837 /** 838 * Generates a string that represents the priority in the LB policy config. The string is unique 839 * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2. 840 * The ordering is undefined for priorities in different clusters. 841 */ priorityName(String cluster, int priority)842 private static String priorityName(String cluster, int priority) { 843 return cluster + "[child" + priority + "]"; 844 } 845 846 /** 847 * Generates a string that represents the locality in the LB policy config. The string is unique 848 * across all localities in all clusters. 849 */ localityName(Locality locality)850 private static String localityName(Locality locality) { 851 return locality.toString(); 852 } 853 } 854