1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.Joiner; 25 import com.google.common.base.Strings; 26 import com.google.common.collect.ImmutableList; 27 import com.google.common.collect.ImmutableMap; 28 import com.google.common.collect.Sets; 29 import com.google.gson.Gson; 30 import com.google.protobuf.util.Durations; 31 import io.grpc.Attributes; 32 import io.grpc.CallOptions; 33 import io.grpc.Channel; 34 import io.grpc.ClientCall; 35 import io.grpc.ClientInterceptor; 36 import io.grpc.ClientInterceptors; 37 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; 38 import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; 39 import io.grpc.InternalConfigSelector; 40 import io.grpc.InternalLogId; 41 import io.grpc.LoadBalancer.PickSubchannelArgs; 42 import io.grpc.Metadata; 43 import io.grpc.MethodDescriptor; 44 import io.grpc.NameResolver; 45 import io.grpc.Status; 46 import io.grpc.Status.Code; 47 import io.grpc.SynchronizationContext; 48 import io.grpc.internal.GrpcUtil; 49 import io.grpc.internal.ObjectPool; 50 import io.grpc.xds.Bootstrapper.AuthorityInfo; 51 import io.grpc.xds.Bootstrapper.BootstrapInfo; 52 import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig; 53 import io.grpc.xds.Filter.ClientInterceptorBuilder; 54 import io.grpc.xds.Filter.FilterConfig; 55 import io.grpc.xds.Filter.NamedFilterConfig; 56 import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig; 57 import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; 58 import io.grpc.xds.VirtualHost.Route; 59 import io.grpc.xds.VirtualHost.Route.RouteAction; 60 import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; 61 import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; 62 import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy; 63 import io.grpc.xds.XdsClient.ResourceWatcher; 64 import io.grpc.xds.XdsListenerResource.LdsUpdate; 65 import io.grpc.xds.XdsLogger.XdsLogLevel; 66 import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; 67 import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; 68 import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; 69 import java.util.ArrayList; 70 import java.util.Collections; 71 import java.util.HashMap; 72 import java.util.HashSet; 73 import java.util.List; 74 import java.util.Locale; 75 import java.util.Map; 76 import java.util.Objects; 77 import java.util.Set; 78 import java.util.concurrent.ConcurrentHashMap; 79 import java.util.concurrent.ConcurrentMap; 80 import java.util.concurrent.ScheduledExecutorService; 81 import java.util.concurrent.atomic.AtomicInteger; 82 import javax.annotation.Nullable; 83 84 /** 85 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme. 86 * 87 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS 88 * protocol to retrieve service information and produce a service config to the caller. 89 * 90 * @see XdsNameResolverProvider 91 */ 92 final class XdsNameResolver extends NameResolver { 93 94 static final CallOptions.Key<String> CLUSTER_SELECTION_KEY = 95 CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY"); 96 static final CallOptions.Key<Long> RPC_HASH_KEY = 97 CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY"); 98 @VisibleForTesting 99 static boolean enableTimeout = 100 Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT")) 101 || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT")); 102 103 private final InternalLogId logId; 104 private final XdsLogger logger; 105 @Nullable 106 private final String targetAuthority; 107 private final String serviceAuthority; 108 private final String overrideAuthority; 109 private final ServiceConfigParser serviceConfigParser; 110 private final SynchronizationContext syncContext; 111 private final ScheduledExecutorService scheduler; 112 private final XdsClientPoolFactory xdsClientPoolFactory; 113 private final ThreadSafeRandom random; 114 private final FilterRegistry filterRegistry; 115 private final XxHash64 hashFunc = XxHash64.INSTANCE; 116 // Clusters (with reference counts) to which new/existing requests can be/are routed. 117 // put()/remove() must be called in SyncContext, and get() can be called in any thread. 118 private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>(); 119 private final ConfigSelector configSelector = new ConfigSelector(); 120 private final long randomChannelId; 121 122 private volatile RoutingConfig routingConfig = RoutingConfig.empty; 123 private Listener2 listener; 124 private ObjectPool<XdsClient> xdsClientPool; 125 private XdsClient xdsClient; 126 private CallCounterProvider callCounterProvider; 127 private ResolveState resolveState; 128 // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in 129 // XdsClient instead of here. 130 private boolean receivedConfig; 131 XdsNameResolver( @ullable String targetAuthority, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, @Nullable Map<String, ?> bootstrapOverride)132 XdsNameResolver( 133 @Nullable String targetAuthority, String name, @Nullable String overrideAuthority, 134 ServiceConfigParser serviceConfigParser, 135 SynchronizationContext syncContext, ScheduledExecutorService scheduler, 136 @Nullable Map<String, ?> bootstrapOverride) { 137 this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler, 138 SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance, 139 FilterRegistry.getDefaultRegistry(), bootstrapOverride); 140 } 141 142 @VisibleForTesting XdsNameResolver( @ullable String targetAuthority, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride)143 XdsNameResolver( 144 @Nullable String targetAuthority, String name, @Nullable String overrideAuthority, 145 ServiceConfigParser serviceConfigParser, 146 SynchronizationContext syncContext, ScheduledExecutorService scheduler, 147 XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, 148 FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) { 149 this.targetAuthority = targetAuthority; 150 serviceAuthority = GrpcUtil.checkAuthority(checkNotNull(name, "name")); 151 this.overrideAuthority = overrideAuthority; 152 this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser"); 153 this.syncContext = checkNotNull(syncContext, "syncContext"); 154 this.scheduler = checkNotNull(scheduler, "scheduler"); 155 this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory, 156 "xdsClientPoolFactory") : new SharedXdsClientPoolProvider(); 157 this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride); 158 this.random = checkNotNull(random, "random"); 159 this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); 160 randomChannelId = random.nextLong(); 161 logId = InternalLogId.allocate("xds-resolver", name); 162 logger = XdsLogger.withLogId(logId); 163 logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name); 164 } 165 166 @Override getServiceAuthority()167 public String getServiceAuthority() { 168 return serviceAuthority; 169 } 170 171 @Override start(Listener2 listener)172 public void start(Listener2 listener) { 173 this.listener = checkNotNull(listener, "listener"); 174 try { 175 xdsClientPool = xdsClientPoolFactory.getOrCreate(); 176 } catch (Exception e) { 177 listener.onError( 178 Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e)); 179 return; 180 } 181 xdsClient = xdsClientPool.getObject(); 182 BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo(); 183 String listenerNameTemplate; 184 if (targetAuthority == null) { 185 listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate(); 186 } else { 187 AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority); 188 if (authorityInfo == null) { 189 listener.onError(Status.INVALID_ARGUMENT.withDescription( 190 "invalid target URI: target authority not found in the bootstrap")); 191 return; 192 } 193 listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate(); 194 } 195 String replacement = serviceAuthority; 196 if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) { 197 replacement = XdsClient.percentEncodePath(replacement); 198 } 199 String ldsResourceName = expandPercentS(listenerNameTemplate, replacement); 200 if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl()) 201 ) { 202 listener.onError(Status.INVALID_ARGUMENT.withDescription( 203 "invalid listener resource URI for service authority: " + serviceAuthority)); 204 return; 205 } 206 ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName); 207 callCounterProvider = SharedCallCounterMap.getInstance(); 208 resolveState = new ResolveState(ldsResourceName); 209 resolveState.start(); 210 } 211 expandPercentS(String template, String replacement)212 private static String expandPercentS(String template, String replacement) { 213 return template.replace("%s", replacement); 214 } 215 216 @Override shutdown()217 public void shutdown() { 218 logger.log(XdsLogLevel.INFO, "Shutdown"); 219 if (resolveState != null) { 220 resolveState.stop(); 221 } 222 if (xdsClient != null) { 223 xdsClient = xdsClientPool.returnObject(xdsClient); 224 } 225 } 226 227 @VisibleForTesting generateServiceConfigWithMethodConfig( @ullable Long timeoutNano, @Nullable RetryPolicy retryPolicy)228 static Map<String, ?> generateServiceConfigWithMethodConfig( 229 @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) { 230 if (timeoutNano == null 231 && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) { 232 return Collections.emptyMap(); 233 } 234 ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder(); 235 methodConfig.put( 236 "name", Collections.singletonList(Collections.emptyMap())); 237 if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) { 238 ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder(); 239 rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts()); 240 rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff())); 241 rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff())); 242 rawRetryPolicy.put("backoffMultiplier", 2D); 243 List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size()); 244 for (Code code : retryPolicy.retryableStatusCodes()) { 245 codes.add(code.name()); 246 } 247 rawRetryPolicy.put( 248 "retryableStatusCodes", Collections.unmodifiableList(codes)); 249 if (retryPolicy.perAttemptRecvTimeout() != null) { 250 rawRetryPolicy.put( 251 "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout())); 252 } 253 methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow()); 254 } 255 if (timeoutNano != null) { 256 String timeout = timeoutNano / 1_000_000_000.0 + "s"; 257 methodConfig.put("timeout", timeout); 258 } 259 return Collections.singletonMap( 260 "methodConfig", Collections.singletonList(methodConfig.buildOrThrow())); 261 } 262 263 @VisibleForTesting getXdsClient()264 XdsClient getXdsClient() { 265 return xdsClient; 266 } 267 268 // called in syncContext updateResolutionResult()269 private void updateResolutionResult() { 270 syncContext.throwIfNotInThisSynchronizationContext(); 271 272 ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>(); 273 for (String name : clusterRefs.keySet()) { 274 Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy(); 275 childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy))); 276 } 277 Map<String, ?> rawServiceConfig = ImmutableMap.of( 278 "loadBalancingConfig", 279 ImmutableList.of(ImmutableMap.of( 280 XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME, 281 ImmutableMap.of("childPolicy", childPolicy.buildOrThrow())))); 282 283 if (logger.isLoggable(XdsLogLevel.INFO)) { 284 logger.log( 285 XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig)); 286 } 287 ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); 288 Attributes attrs = 289 Attributes.newBuilder() 290 .set(InternalXdsAttributes.XDS_CLIENT_POOL, xdsClientPool) 291 .set(InternalXdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider) 292 .set(InternalConfigSelector.KEY, configSelector) 293 .build(); 294 ResolutionResult result = 295 ResolutionResult.newBuilder() 296 .setAttributes(attrs) 297 .setServiceConfig(parsedServiceConfig) 298 .build(); 299 listener.onResult(result); 300 receivedConfig = true; 301 } 302 303 /** 304 * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with 305 * case-insensitive. 306 * 307 * <p>Wildcard pattern rules: 308 * <ol> 309 * <li>A single asterisk (*) matches any domain.</li> 310 * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern, 311 * but not both.</li> 312 * </ol> 313 */ 314 @VisibleForTesting matchHostName(String hostName, String pattern)315 static boolean matchHostName(String hostName, String pattern) { 316 checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."), 317 "Invalid host name"); 318 checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."), 319 "Invalid pattern/domain name"); 320 321 hostName = hostName.toLowerCase(Locale.US); 322 pattern = pattern.toLowerCase(Locale.US); 323 // hostName and pattern are now in lower case -- domain names are case-insensitive. 324 325 if (!pattern.contains("*")) { 326 // Not a wildcard pattern -- hostName and pattern must match exactly. 327 return hostName.equals(pattern); 328 } 329 // Wildcard pattern 330 331 if (pattern.length() == 1) { 332 return true; 333 } 334 335 int index = pattern.indexOf('*'); 336 337 // At most one asterisk (*) is allowed. 338 if (pattern.indexOf('*', index + 1) != -1) { 339 return false; 340 } 341 342 // Asterisk can only match prefix or suffix. 343 if (index != 0 && index != pattern.length() - 1) { 344 return false; 345 } 346 347 // HostName must be at least as long as the pattern because asterisk has to 348 // match one or more characters. 349 if (hostName.length() < pattern.length()) { 350 return false; 351 } 352 353 if (index == 0 && hostName.endsWith(pattern.substring(1))) { 354 // Prefix matching fails. 355 return true; 356 } 357 358 // Pattern matches hostname if suffix matching succeeds. 359 return index == pattern.length() - 1 360 && hostName.startsWith(pattern.substring(0, pattern.length() - 1)); 361 } 362 363 private final class ConfigSelector extends InternalConfigSelector { 364 @Override selectConfig(PickSubchannelArgs args)365 public Result selectConfig(PickSubchannelArgs args) { 366 String cluster = null; 367 Route selectedRoute = null; 368 RoutingConfig routingCfg; 369 Map<String, FilterConfig> selectedOverrideConfigs; 370 List<ClientInterceptor> filterInterceptors = new ArrayList<>(); 371 Metadata headers = args.getHeaders(); 372 do { 373 routingCfg = routingConfig; 374 selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig); 375 for (Route route : routingCfg.routes) { 376 if (RoutingUtils.matchRoute( 377 route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(), 378 headers, random)) { 379 selectedRoute = route; 380 selectedOverrideConfigs.putAll(route.filterConfigOverrides()); 381 break; 382 } 383 } 384 if (selectedRoute == null) { 385 return Result.forError( 386 Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC")); 387 } 388 if (selectedRoute.routeAction() == null) { 389 return Result.forError(Status.UNAVAILABLE.withDescription( 390 "Could not route RPC to Route with non-forwarding action")); 391 } 392 RouteAction action = selectedRoute.routeAction(); 393 if (action.cluster() != null) { 394 cluster = prefixedClusterName(action.cluster()); 395 } else if (action.weightedClusters() != null) { 396 long totalWeight = 0; 397 for (ClusterWeight weightedCluster : action.weightedClusters()) { 398 totalWeight += weightedCluster.weight(); 399 } 400 long select = random.nextLong(totalWeight); 401 long accumulator = 0; 402 for (ClusterWeight weightedCluster : action.weightedClusters()) { 403 accumulator += weightedCluster.weight(); 404 if (select < accumulator) { 405 cluster = prefixedClusterName(weightedCluster.name()); 406 selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides()); 407 break; 408 } 409 } 410 } else if (action.namedClusterSpecifierPluginConfig() != null) { 411 cluster = 412 prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name()); 413 } 414 } while (!retainCluster(cluster)); 415 Long timeoutNanos = null; 416 if (enableTimeout) { 417 if (selectedRoute != null) { 418 timeoutNanos = selectedRoute.routeAction().timeoutNano(); 419 } 420 if (timeoutNanos == null) { 421 timeoutNanos = routingCfg.fallbackTimeoutNano; 422 } 423 if (timeoutNanos <= 0) { 424 timeoutNanos = null; 425 } 426 } 427 RetryPolicy retryPolicy = 428 selectedRoute == null ? null : selectedRoute.routeAction().retryPolicy(); 429 // TODO(chengyuanzhang): avoid service config generation and parsing for each call. 430 Map<String, ?> rawServiceConfig = 431 generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy); 432 ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig); 433 Object config = parsedServiceConfig.getConfig(); 434 if (config == null) { 435 releaseCluster(cluster); 436 return Result.forError( 437 parsedServiceConfig.getError().augmentDescription( 438 "Failed to parse service config (method config)")); 439 } 440 if (routingCfg.filterChain != null) { 441 for (NamedFilterConfig namedFilter : routingCfg.filterChain) { 442 FilterConfig filterConfig = namedFilter.filterConfig; 443 Filter filter = filterRegistry.get(filterConfig.typeUrl()); 444 if (filter instanceof ClientInterceptorBuilder) { 445 ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter) 446 .buildClientInterceptor( 447 filterConfig, selectedOverrideConfigs.get(namedFilter.name), 448 args, scheduler); 449 if (interceptor != null) { 450 filterInterceptors.add(interceptor); 451 } 452 } 453 } 454 } 455 final String finalCluster = cluster; 456 final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), headers); 457 class ClusterSelectionInterceptor implements ClientInterceptor { 458 @Override 459 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 460 final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, 461 final Channel next) { 462 final CallOptions callOptionsForCluster = 463 callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster) 464 .withOption(RPC_HASH_KEY, hash); 465 return new SimpleForwardingClientCall<ReqT, RespT>( 466 next.newCall(method, callOptionsForCluster)) { 467 @Override 468 public void start(Listener<RespT> listener, Metadata headers) { 469 listener = new SimpleForwardingClientCallListener<RespT>(listener) { 470 boolean committed; 471 472 @Override 473 public void onHeaders(Metadata headers) { 474 committed = true; 475 releaseCluster(finalCluster); 476 delegate().onHeaders(headers); 477 } 478 479 @Override 480 public void onClose(Status status, Metadata trailers) { 481 if (!committed) { 482 releaseCluster(finalCluster); 483 } 484 delegate().onClose(status, trailers); 485 } 486 }; 487 delegate().start(listener, headers); 488 } 489 }; 490 } 491 } 492 493 filterInterceptors.add(new ClusterSelectionInterceptor()); 494 return 495 Result.newBuilder() 496 .setConfig(config) 497 .setInterceptor(combineInterceptors(filterInterceptors)) 498 .build(); 499 } 500 501 private boolean retainCluster(String cluster) { 502 ClusterRefState clusterRefState = clusterRefs.get(cluster); 503 if (clusterRefState == null) { 504 return false; 505 } 506 AtomicInteger refCount = clusterRefState.refCount; 507 int count; 508 do { 509 count = refCount.get(); 510 if (count == 0) { 511 return false; 512 } 513 } while (!refCount.compareAndSet(count, count + 1)); 514 return true; 515 } 516 517 private void releaseCluster(final String cluster) { 518 int count = clusterRefs.get(cluster).refCount.decrementAndGet(); 519 if (count == 0) { 520 syncContext.execute(new Runnable() { 521 @Override 522 public void run() { 523 if (clusterRefs.get(cluster).refCount.get() == 0) { 524 clusterRefs.remove(cluster); 525 updateResolutionResult(); 526 } 527 } 528 }); 529 } 530 } 531 532 private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) { 533 Long hash = null; 534 for (HashPolicy policy : hashPolicies) { 535 Long newHash = null; 536 if (policy.type() == HashPolicy.Type.HEADER) { 537 String value = getHeaderValue(headers, policy.headerName()); 538 if (value != null) { 539 if (policy.regEx() != null && policy.regExSubstitution() != null) { 540 value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution()); 541 } 542 newHash = hashFunc.hashAsciiString(value); 543 } 544 } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) { 545 newHash = hashFunc.hashLong(randomChannelId); 546 } 547 if (newHash != null ) { 548 // Rotating the old value prevents duplicate hash rules from cancelling each other out 549 // and preserves all of the entropy. 550 long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0; 551 hash = oldHash ^ newHash; 552 } 553 // If the policy is a terminal policy and a hash has been generated, ignore 554 // the rest of the hash policies. 555 if (policy.isTerminal() && hash != null) { 556 break; 557 } 558 } 559 return hash == null ? random.nextLong() : hash; 560 } 561 } 562 563 private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) { 564 checkArgument(!interceptors.isEmpty(), "empty interceptors"); 565 if (interceptors.size() == 1) { 566 return interceptors.get(0); 567 } 568 return new ClientInterceptor() { 569 @Override 570 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 571 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 572 next = ClientInterceptors.interceptForward(next, interceptors); 573 return next.newCall(method, callOptions); 574 } 575 }; 576 } 577 578 @Nullable 579 private static String getHeaderValue(Metadata headers, String headerName) { 580 if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { 581 return null; 582 } 583 if (headerName.equals("content-type")) { 584 return "application/grpc"; 585 } 586 Metadata.Key<String> key; 587 try { 588 key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER); 589 } catch (IllegalArgumentException e) { 590 return null; 591 } 592 Iterable<String> values = headers.getAll(key); 593 return values == null ? null : Joiner.on(",").join(values); 594 } 595 596 private static String prefixedClusterName(String name) { 597 return "cluster:" + name; 598 } 599 600 private static String prefixedClusterSpecifierPluginName(String pluginName) { 601 return "cluster_specifier_plugin:" + pluginName; 602 } 603 604 private static final class FailingConfigSelector extends InternalConfigSelector { 605 private final Result result; 606 607 public FailingConfigSelector(Status error) { 608 this.result = Result.forError(error); 609 } 610 611 @Override 612 public Result selectConfig(PickSubchannelArgs args) { 613 return result; 614 } 615 } 616 617 private class ResolveState implements ResourceWatcher<LdsUpdate> { 618 private final ConfigOrError emptyServiceConfig = 619 serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap()); 620 private final String ldsResourceName; 621 private boolean stopped; 622 @Nullable 623 private Set<String> existingClusters; // clusters to which new requests can be routed 624 @Nullable 625 private RouteDiscoveryState routeDiscoveryState; 626 627 ResolveState(String ldsResourceName) { 628 this.ldsResourceName = ldsResourceName; 629 } 630 631 @Override 632 public void onChanged(final LdsUpdate update) { 633 syncContext.execute(new Runnable() { 634 @Override 635 public void run() { 636 if (stopped) { 637 return; 638 } 639 logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); 640 HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); 641 List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts(); 642 String rdsName = httpConnectionManager.rdsName(); 643 cleanUpRouteDiscoveryState(); 644 if (virtualHosts != null) { 645 updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(), 646 httpConnectionManager.httpFilterConfigs()); 647 } else { 648 routeDiscoveryState = new RouteDiscoveryState( 649 rdsName, httpConnectionManager.httpMaxStreamDurationNano(), 650 httpConnectionManager.httpFilterConfigs()); 651 logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); 652 xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), 653 rdsName, routeDiscoveryState); 654 } 655 } 656 }); 657 } 658 659 @Override 660 public void onError(final Status error) { 661 syncContext.execute(new Runnable() { 662 @Override 663 public void run() { 664 if (stopped || receivedConfig) { 665 return; 666 } 667 listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( 668 String.format("Unable to load LDS %s. xDS server returned: %s: %s", 669 ldsResourceName, error.getCode(), error.getDescription()))); 670 } 671 }); 672 } 673 674 @Override 675 public void onResourceDoesNotExist(final String resourceName) { 676 syncContext.execute(new Runnable() { 677 @Override 678 public void run() { 679 if (stopped) { 680 return; 681 } 682 String error = "LDS resource does not exist: " + resourceName; 683 logger.log(XdsLogLevel.INFO, error); 684 cleanUpRouteDiscoveryState(); 685 cleanUpRoutes(error); 686 } 687 }); 688 } 689 690 private void start() { 691 logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName); 692 xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, this); 693 } 694 695 private void stop() { 696 logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName); 697 stopped = true; 698 cleanUpRouteDiscoveryState(); 699 xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this); 700 } 701 702 // called in syncContext 703 private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano, 704 @Nullable List<NamedFilterConfig> filterConfigs) { 705 String authority = overrideAuthority != null ? overrideAuthority : ldsResourceName; 706 VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority); 707 if (virtualHost == null) { 708 String error = "Failed to find virtual host matching hostname: " + authority; 709 logger.log(XdsLogLevel.WARNING, error); 710 cleanUpRoutes(error); 711 return; 712 } 713 714 List<Route> routes = virtualHost.routes(); 715 716 // Populate all clusters to which requests can be routed to through the virtual host. 717 Set<String> clusters = new HashSet<>(); 718 // uniqueName -> clusterName 719 Map<String, String> clusterNameMap = new HashMap<>(); 720 // uniqueName -> pluginConfig 721 Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>(); 722 for (Route route : routes) { 723 RouteAction action = route.routeAction(); 724 String prefixedName; 725 if (action != null) { 726 if (action.cluster() != null) { 727 prefixedName = prefixedClusterName(action.cluster()); 728 clusters.add(prefixedName); 729 clusterNameMap.put(prefixedName, action.cluster()); 730 } else if (action.weightedClusters() != null) { 731 for (ClusterWeight weighedCluster : action.weightedClusters()) { 732 prefixedName = prefixedClusterName(weighedCluster.name()); 733 clusters.add(prefixedName); 734 clusterNameMap.put(prefixedName, weighedCluster.name()); 735 } 736 } else if (action.namedClusterSpecifierPluginConfig() != null) { 737 PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config(); 738 if (pluginConfig instanceof RlsPluginConfig) { 739 prefixedName = prefixedClusterSpecifierPluginName( 740 action.namedClusterSpecifierPluginConfig().name()); 741 clusters.add(prefixedName); 742 rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig); 743 } 744 } 745 } 746 } 747 748 // Updates channel's load balancing config whenever the set of selectable clusters changes. 749 boolean shouldUpdateResult = existingClusters == null; 750 Set<String> addedClusters = 751 existingClusters == null ? clusters : Sets.difference(clusters, existingClusters); 752 Set<String> deletedClusters = 753 existingClusters == null 754 ? Collections.emptySet() : Sets.difference(existingClusters, clusters); 755 existingClusters = clusters; 756 for (String cluster : addedClusters) { 757 if (clusterRefs.containsKey(cluster)) { 758 clusterRefs.get(cluster).refCount.incrementAndGet(); 759 } else { 760 if (clusterNameMap.containsKey(cluster)) { 761 clusterRefs.put( 762 cluster, 763 ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster))); 764 } 765 if (rlsPluginConfigMap.containsKey(cluster)) { 766 clusterRefs.put( 767 cluster, 768 ClusterRefState.forRlsPlugin( 769 new AtomicInteger(1), rlsPluginConfigMap.get(cluster))); 770 } 771 shouldUpdateResult = true; 772 } 773 } 774 for (String cluster : clusters) { 775 RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster); 776 if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) { 777 ClusterRefState newClusterRefState = 778 ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig); 779 clusterRefs.put(cluster, newClusterRefState); 780 shouldUpdateResult = true; 781 } 782 } 783 // Update service config to include newly added clusters. 784 if (shouldUpdateResult) { 785 updateResolutionResult(); 786 } 787 // Make newly added clusters selectable by config selector and deleted clusters no longer 788 // selectable. 789 routingConfig = 790 new RoutingConfig( 791 httpMaxStreamDurationNano, routes, filterConfigs, 792 virtualHost.filterConfigOverrides()); 793 shouldUpdateResult = false; 794 for (String cluster : deletedClusters) { 795 int count = clusterRefs.get(cluster).refCount.decrementAndGet(); 796 if (count == 0) { 797 clusterRefs.remove(cluster); 798 shouldUpdateResult = true; 799 } 800 } 801 if (shouldUpdateResult) { 802 updateResolutionResult(); 803 } 804 } 805 806 private void cleanUpRoutes(String error) { 807 if (existingClusters != null) { 808 for (String cluster : existingClusters) { 809 int count = clusterRefs.get(cluster).refCount.decrementAndGet(); 810 if (count == 0) { 811 clusterRefs.remove(cluster); 812 } 813 } 814 existingClusters = null; 815 } 816 routingConfig = RoutingConfig.empty; 817 // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and 818 // the config selector handles the error message itself. Once the LB API allows providing 819 // failure information for addresses yet still providing a service config, the config seector 820 // could be avoided. 821 listener.onResult(ResolutionResult.newBuilder() 822 .setAttributes(Attributes.newBuilder() 823 .set(InternalConfigSelector.KEY, 824 new FailingConfigSelector(Status.UNAVAILABLE.withDescription(error))) 825 .build()) 826 .setServiceConfig(emptyServiceConfig) 827 .build()); 828 receivedConfig = true; 829 } 830 831 private void cleanUpRouteDiscoveryState() { 832 if (routeDiscoveryState != null) { 833 String rdsName = routeDiscoveryState.resourceName; 834 logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName); 835 xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName, 836 routeDiscoveryState); 837 routeDiscoveryState = null; 838 } 839 } 840 841 /** 842 * Discovery state for RouteConfiguration resource. One instance for each Listener resource 843 * update. 844 */ 845 private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> { 846 private final String resourceName; 847 private final long httpMaxStreamDurationNano; 848 @Nullable 849 private final List<NamedFilterConfig> filterConfigs; 850 851 private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano, 852 @Nullable List<NamedFilterConfig> filterConfigs) { 853 this.resourceName = resourceName; 854 this.httpMaxStreamDurationNano = httpMaxStreamDurationNano; 855 this.filterConfigs = filterConfigs; 856 } 857 858 @Override 859 public void onChanged(final RdsUpdate update) { 860 syncContext.execute(new Runnable() { 861 @Override 862 public void run() { 863 if (RouteDiscoveryState.this != routeDiscoveryState) { 864 return; 865 } 866 logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); 867 updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, 868 filterConfigs); 869 } 870 }); 871 } 872 873 @Override 874 public void onError(final Status error) { 875 syncContext.execute(new Runnable() { 876 @Override 877 public void run() { 878 if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { 879 return; 880 } 881 listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( 882 String.format("Unable to load RDS %s. xDS server returned: %s: %s", 883 resourceName, error.getCode(), error.getDescription()))); 884 } 885 }); 886 } 887 888 @Override 889 public void onResourceDoesNotExist(final String resourceName) { 890 syncContext.execute(new Runnable() { 891 @Override 892 public void run() { 893 if (RouteDiscoveryState.this != routeDiscoveryState) { 894 return; 895 } 896 String error = "RDS resource does not exist: " + resourceName; 897 logger.log(XdsLogLevel.INFO, error); 898 cleanUpRoutes(error); 899 } 900 }); 901 } 902 } 903 } 904 905 /** 906 * VirtualHost-level configuration for request routing. 907 */ 908 private static class RoutingConfig { 909 private final long fallbackTimeoutNano; 910 final List<Route> routes; 911 // Null if HttpFilter is not supported. 912 @Nullable final List<NamedFilterConfig> filterChain; 913 final Map<String, FilterConfig> virtualHostOverrideConfig; 914 915 private static RoutingConfig empty = new RoutingConfig( 916 0, Collections.emptyList(), null, Collections.emptyMap()); 917 918 private RoutingConfig( 919 long fallbackTimeoutNano, List<Route> routes, @Nullable List<NamedFilterConfig> filterChain, 920 Map<String, FilterConfig> virtualHostOverrideConfig) { 921 this.fallbackTimeoutNano = fallbackTimeoutNano; 922 this.routes = routes; 923 checkArgument(filterChain == null || !filterChain.isEmpty(), "filterChain is empty"); 924 this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain); 925 this.virtualHostOverrideConfig = Collections.unmodifiableMap(virtualHostOverrideConfig); 926 } 927 } 928 929 private static class ClusterRefState { 930 final AtomicInteger refCount; 931 @Nullable 932 final String traditionalCluster; 933 @Nullable 934 final RlsPluginConfig rlsPluginConfig; 935 936 private ClusterRefState( 937 AtomicInteger refCount, @Nullable String traditionalCluster, 938 @Nullable RlsPluginConfig rlsPluginConfig) { 939 this.refCount = refCount; 940 checkArgument(traditionalCluster == null ^ rlsPluginConfig == null, 941 "There must be exactly one non-null value in traditionalCluster and pluginConfig"); 942 this.traditionalCluster = traditionalCluster; 943 this.rlsPluginConfig = rlsPluginConfig; 944 } 945 946 private Map<String, ?> toLbPolicy() { 947 if (traditionalCluster != null) { 948 return ImmutableMap.of( 949 XdsLbPolicies.CDS_POLICY_NAME, 950 ImmutableMap.of("cluster", traditionalCluster)); 951 } else { 952 ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>() 953 .put("routeLookupConfig", rlsPluginConfig.config()) 954 .put( 955 "childPolicy", 956 ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of()))) 957 .put("childPolicyConfigTargetFieldName", "cluster") 958 .buildOrThrow(); 959 return ImmutableMap.of("rls_experimental", rlsConfig); 960 } 961 } 962 963 static ClusterRefState forCluster(AtomicInteger refCount, String name) { 964 return new ClusterRefState(refCount, name, null); 965 } 966 967 static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) { 968 return new ClusterRefState(refCount, null, rlsPluginConfig); 969 } 970 } 971 } 972