• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Joiner;
25 import com.google.common.base.Strings;
26 import com.google.common.collect.ImmutableList;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.collect.Sets;
29 import com.google.gson.Gson;
30 import com.google.protobuf.util.Durations;
31 import io.grpc.Attributes;
32 import io.grpc.CallOptions;
33 import io.grpc.Channel;
34 import io.grpc.ClientCall;
35 import io.grpc.ClientInterceptor;
36 import io.grpc.ClientInterceptors;
37 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38 import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39 import io.grpc.InternalConfigSelector;
40 import io.grpc.InternalLogId;
41 import io.grpc.LoadBalancer.PickSubchannelArgs;
42 import io.grpc.Metadata;
43 import io.grpc.MethodDescriptor;
44 import io.grpc.NameResolver;
45 import io.grpc.Status;
46 import io.grpc.Status.Code;
47 import io.grpc.SynchronizationContext;
48 import io.grpc.internal.GrpcUtil;
49 import io.grpc.internal.ObjectPool;
50 import io.grpc.xds.Bootstrapper.AuthorityInfo;
51 import io.grpc.xds.Bootstrapper.BootstrapInfo;
52 import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
53 import io.grpc.xds.Filter.ClientInterceptorBuilder;
54 import io.grpc.xds.Filter.FilterConfig;
55 import io.grpc.xds.Filter.NamedFilterConfig;
56 import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
57 import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
58 import io.grpc.xds.VirtualHost.Route;
59 import io.grpc.xds.VirtualHost.Route.RouteAction;
60 import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
61 import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
62 import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
63 import io.grpc.xds.XdsClient.ResourceWatcher;
64 import io.grpc.xds.XdsListenerResource.LdsUpdate;
65 import io.grpc.xds.XdsLogger.XdsLogLevel;
66 import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
67 import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
68 import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
69 import java.util.ArrayList;
70 import java.util.Collections;
71 import java.util.HashMap;
72 import java.util.HashSet;
73 import java.util.List;
74 import java.util.Locale;
75 import java.util.Map;
76 import java.util.Objects;
77 import java.util.Set;
78 import java.util.concurrent.ConcurrentHashMap;
79 import java.util.concurrent.ConcurrentMap;
80 import java.util.concurrent.ScheduledExecutorService;
81 import java.util.concurrent.atomic.AtomicInteger;
82 import javax.annotation.Nullable;
83 
84 /**
85  * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
86  *
87  * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
88  * protocol to retrieve service information and produce a service config to the caller.
89  *
90  * @see XdsNameResolverProvider
91  */
92 final class XdsNameResolver extends NameResolver {
93 
94   static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
95       CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
96   static final CallOptions.Key<Long> RPC_HASH_KEY =
97       CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
98   @VisibleForTesting
99   static boolean enableTimeout =
100       Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
101           || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
102 
103   private final InternalLogId logId;
104   private final XdsLogger logger;
105   @Nullable
106   private final String targetAuthority;
107   private final String serviceAuthority;
108   private final String overrideAuthority;
109   private final ServiceConfigParser serviceConfigParser;
110   private final SynchronizationContext syncContext;
111   private final ScheduledExecutorService scheduler;
112   private final XdsClientPoolFactory xdsClientPoolFactory;
113   private final ThreadSafeRandom random;
114   private final FilterRegistry filterRegistry;
115   private final XxHash64 hashFunc = XxHash64.INSTANCE;
116   // Clusters (with reference counts) to which new/existing requests can be/are routed.
117   // put()/remove() must be called in SyncContext, and get() can be called in any thread.
118   private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
119   private final ConfigSelector configSelector = new ConfigSelector();
120   private final long randomChannelId;
121 
122   private volatile RoutingConfig routingConfig = RoutingConfig.empty;
123   private Listener2 listener;
124   private ObjectPool<XdsClient> xdsClientPool;
125   private XdsClient xdsClient;
126   private CallCounterProvider callCounterProvider;
127   private ResolveState resolveState;
128   // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
129   // XdsClient instead of here.
130   private boolean receivedConfig;
131 
XdsNameResolver( @ullable String targetAuthority, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, @Nullable Map<String, ?> bootstrapOverride)132   XdsNameResolver(
133       @Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
134       ServiceConfigParser serviceConfigParser,
135       SynchronizationContext syncContext, ScheduledExecutorService scheduler,
136       @Nullable Map<String, ?> bootstrapOverride) {
137     this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
138         SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
139         FilterRegistry.getDefaultRegistry(), bootstrapOverride);
140   }
141 
142   @VisibleForTesting
XdsNameResolver( @ullable String targetAuthority, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride)143   XdsNameResolver(
144       @Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
145       ServiceConfigParser serviceConfigParser,
146       SynchronizationContext syncContext, ScheduledExecutorService scheduler,
147       XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
148       FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
149     this.targetAuthority = targetAuthority;
150     serviceAuthority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
151     this.overrideAuthority = overrideAuthority;
152     this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
153     this.syncContext = checkNotNull(syncContext, "syncContext");
154     this.scheduler = checkNotNull(scheduler, "scheduler");
155     this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
156             "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
157     this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
158     this.random = checkNotNull(random, "random");
159     this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
160     randomChannelId = random.nextLong();
161     logId = InternalLogId.allocate("xds-resolver", name);
162     logger = XdsLogger.withLogId(logId);
163     logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
164   }
165 
166   @Override
getServiceAuthority()167   public String getServiceAuthority() {
168     return serviceAuthority;
169   }
170 
171   @Override
start(Listener2 listener)172   public void start(Listener2 listener) {
173     this.listener = checkNotNull(listener, "listener");
174     try {
175       xdsClientPool = xdsClientPoolFactory.getOrCreate();
176     } catch (Exception e) {
177       listener.onError(
178           Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
179       return;
180     }
181     xdsClient = xdsClientPool.getObject();
182     BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
183     String listenerNameTemplate;
184     if (targetAuthority == null) {
185       listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
186     } else {
187       AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
188       if (authorityInfo == null) {
189         listener.onError(Status.INVALID_ARGUMENT.withDescription(
190             "invalid target URI: target authority not found in the bootstrap"));
191         return;
192       }
193       listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
194     }
195     String replacement = serviceAuthority;
196     if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
197       replacement = XdsClient.percentEncodePath(replacement);
198     }
199     String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
200     if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
201         ) {
202       listener.onError(Status.INVALID_ARGUMENT.withDescription(
203           "invalid listener resource URI for service authority: " + serviceAuthority));
204       return;
205     }
206     ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
207     callCounterProvider = SharedCallCounterMap.getInstance();
208     resolveState = new ResolveState(ldsResourceName);
209     resolveState.start();
210   }
211 
expandPercentS(String template, String replacement)212   private static String expandPercentS(String template, String replacement) {
213     return template.replace("%s", replacement);
214   }
215 
216   @Override
shutdown()217   public void shutdown() {
218     logger.log(XdsLogLevel.INFO, "Shutdown");
219     if (resolveState != null) {
220       resolveState.stop();
221     }
222     if (xdsClient != null) {
223       xdsClient = xdsClientPool.returnObject(xdsClient);
224     }
225   }
226 
227   @VisibleForTesting
generateServiceConfigWithMethodConfig( @ullable Long timeoutNano, @Nullable RetryPolicy retryPolicy)228   static Map<String, ?> generateServiceConfigWithMethodConfig(
229       @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
230     if (timeoutNano == null
231         && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
232       return Collections.emptyMap();
233     }
234     ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
235     methodConfig.put(
236         "name", Collections.singletonList(Collections.emptyMap()));
237     if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
238       ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
239       rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
240       rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
241       rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
242       rawRetryPolicy.put("backoffMultiplier", 2D);
243       List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
244       for (Code code : retryPolicy.retryableStatusCodes()) {
245         codes.add(code.name());
246       }
247       rawRetryPolicy.put(
248           "retryableStatusCodes", Collections.unmodifiableList(codes));
249       if (retryPolicy.perAttemptRecvTimeout() != null) {
250         rawRetryPolicy.put(
251             "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
252       }
253       methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
254     }
255     if (timeoutNano != null) {
256       String timeout = timeoutNano / 1_000_000_000.0 + "s";
257       methodConfig.put("timeout", timeout);
258     }
259     return Collections.singletonMap(
260         "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
261   }
262 
263   @VisibleForTesting
getXdsClient()264   XdsClient getXdsClient() {
265     return xdsClient;
266   }
267 
268   // called in syncContext
updateResolutionResult()269   private void updateResolutionResult() {
270     syncContext.throwIfNotInThisSynchronizationContext();
271 
272     ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
273     for (String name : clusterRefs.keySet()) {
274       Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
275       childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
276     }
277     Map<String, ?> rawServiceConfig = ImmutableMap.of(
278         "loadBalancingConfig",
279         ImmutableList.of(ImmutableMap.of(
280             XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
281             ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
282 
283     if (logger.isLoggable(XdsLogLevel.INFO)) {
284       logger.log(
285           XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
286     }
287     ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
288     Attributes attrs =
289         Attributes.newBuilder()
290             .set(InternalXdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
291             .set(InternalXdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
292             .set(InternalConfigSelector.KEY, configSelector)
293             .build();
294     ResolutionResult result =
295         ResolutionResult.newBuilder()
296             .setAttributes(attrs)
297             .setServiceConfig(parsedServiceConfig)
298             .build();
299     listener.onResult(result);
300     receivedConfig = true;
301   }
302 
303   /**
304    * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
305    * case-insensitive.
306    *
307    * <p>Wildcard pattern rules:
308    * <ol>
309    * <li>A single asterisk (*) matches any domain.</li>
310    * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
311    *     but not both.</li>
312    * </ol>
313    */
314   @VisibleForTesting
matchHostName(String hostName, String pattern)315   static boolean matchHostName(String hostName, String pattern) {
316     checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
317         "Invalid host name");
318     checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
319         "Invalid pattern/domain name");
320 
321     hostName = hostName.toLowerCase(Locale.US);
322     pattern = pattern.toLowerCase(Locale.US);
323     // hostName and pattern are now in lower case -- domain names are case-insensitive.
324 
325     if (!pattern.contains("*")) {
326       // Not a wildcard pattern -- hostName and pattern must match exactly.
327       return hostName.equals(pattern);
328     }
329     // Wildcard pattern
330 
331     if (pattern.length() == 1) {
332       return true;
333     }
334 
335     int index = pattern.indexOf('*');
336 
337     // At most one asterisk (*) is allowed.
338     if (pattern.indexOf('*', index + 1) != -1) {
339       return false;
340     }
341 
342     // Asterisk can only match prefix or suffix.
343     if (index != 0 && index != pattern.length() - 1) {
344       return false;
345     }
346 
347     // HostName must be at least as long as the pattern because asterisk has to
348     // match one or more characters.
349     if (hostName.length() < pattern.length()) {
350       return false;
351     }
352 
353     if (index == 0 && hostName.endsWith(pattern.substring(1))) {
354       // Prefix matching fails.
355       return true;
356     }
357 
358     // Pattern matches hostname if suffix matching succeeds.
359     return index == pattern.length() - 1
360         && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
361   }
362 
363   private final class ConfigSelector extends InternalConfigSelector {
364     @Override
selectConfig(PickSubchannelArgs args)365     public Result selectConfig(PickSubchannelArgs args) {
366       String cluster = null;
367       Route selectedRoute = null;
368       RoutingConfig routingCfg;
369       Map<String, FilterConfig> selectedOverrideConfigs;
370       List<ClientInterceptor> filterInterceptors = new ArrayList<>();
371       Metadata headers = args.getHeaders();
372       do {
373         routingCfg = routingConfig;
374         selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig);
375         for (Route route : routingCfg.routes) {
376           if (RoutingUtils.matchRoute(
377                   route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(),
378               headers, random)) {
379             selectedRoute = route;
380             selectedOverrideConfigs.putAll(route.filterConfigOverrides());
381             break;
382           }
383         }
384         if (selectedRoute == null) {
385           return Result.forError(
386               Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
387         }
388         if (selectedRoute.routeAction() == null) {
389           return Result.forError(Status.UNAVAILABLE.withDescription(
390               "Could not route RPC to Route with non-forwarding action"));
391         }
392         RouteAction action = selectedRoute.routeAction();
393         if (action.cluster() != null) {
394           cluster = prefixedClusterName(action.cluster());
395         } else if (action.weightedClusters() != null) {
396           long totalWeight = 0;
397           for (ClusterWeight weightedCluster : action.weightedClusters()) {
398             totalWeight += weightedCluster.weight();
399           }
400           long select = random.nextLong(totalWeight);
401           long accumulator = 0;
402           for (ClusterWeight weightedCluster : action.weightedClusters()) {
403             accumulator += weightedCluster.weight();
404             if (select < accumulator) {
405               cluster = prefixedClusterName(weightedCluster.name());
406               selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
407               break;
408             }
409           }
410         } else if (action.namedClusterSpecifierPluginConfig() != null) {
411           cluster =
412               prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
413         }
414       } while (!retainCluster(cluster));
415       Long timeoutNanos = null;
416       if (enableTimeout) {
417         if (selectedRoute != null) {
418           timeoutNanos = selectedRoute.routeAction().timeoutNano();
419         }
420         if (timeoutNanos == null) {
421           timeoutNanos = routingCfg.fallbackTimeoutNano;
422         }
423         if (timeoutNanos <= 0) {
424           timeoutNanos = null;
425         }
426       }
427       RetryPolicy retryPolicy =
428           selectedRoute == null ? null : selectedRoute.routeAction().retryPolicy();
429       // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
430       Map<String, ?> rawServiceConfig =
431           generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
432       ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
433       Object config = parsedServiceConfig.getConfig();
434       if (config == null) {
435         releaseCluster(cluster);
436         return Result.forError(
437             parsedServiceConfig.getError().augmentDescription(
438                 "Failed to parse service config (method config)"));
439       }
440       if (routingCfg.filterChain != null) {
441         for (NamedFilterConfig namedFilter : routingCfg.filterChain) {
442           FilterConfig filterConfig = namedFilter.filterConfig;
443           Filter filter = filterRegistry.get(filterConfig.typeUrl());
444           if (filter instanceof ClientInterceptorBuilder) {
445             ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter)
446                 .buildClientInterceptor(
447                     filterConfig, selectedOverrideConfigs.get(namedFilter.name),
448                     args, scheduler);
449             if (interceptor != null) {
450               filterInterceptors.add(interceptor);
451             }
452           }
453         }
454       }
455       final String finalCluster = cluster;
456       final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), headers);
457       class ClusterSelectionInterceptor implements ClientInterceptor {
458         @Override
459         public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
460             final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
461             final Channel next) {
462           final CallOptions callOptionsForCluster =
463               callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
464                   .withOption(RPC_HASH_KEY, hash);
465           return new SimpleForwardingClientCall<ReqT, RespT>(
466               next.newCall(method, callOptionsForCluster)) {
467             @Override
468             public void start(Listener<RespT> listener, Metadata headers) {
469               listener = new SimpleForwardingClientCallListener<RespT>(listener) {
470                 boolean committed;
471 
472                 @Override
473                 public void onHeaders(Metadata headers) {
474                   committed = true;
475                   releaseCluster(finalCluster);
476                   delegate().onHeaders(headers);
477                 }
478 
479                 @Override
480                 public void onClose(Status status, Metadata trailers) {
481                   if (!committed) {
482                     releaseCluster(finalCluster);
483                   }
484                   delegate().onClose(status, trailers);
485                 }
486               };
487               delegate().start(listener, headers);
488             }
489           };
490         }
491       }
492 
493       filterInterceptors.add(new ClusterSelectionInterceptor());
494       return
495           Result.newBuilder()
496               .setConfig(config)
497               .setInterceptor(combineInterceptors(filterInterceptors))
498               .build();
499     }
500 
501     private boolean retainCluster(String cluster) {
502       ClusterRefState clusterRefState = clusterRefs.get(cluster);
503       if (clusterRefState == null) {
504         return false;
505       }
506       AtomicInteger refCount = clusterRefState.refCount;
507       int count;
508       do {
509         count = refCount.get();
510         if (count == 0) {
511           return false;
512         }
513       } while (!refCount.compareAndSet(count, count + 1));
514       return true;
515     }
516 
517     private void releaseCluster(final String cluster) {
518       int count = clusterRefs.get(cluster).refCount.decrementAndGet();
519       if (count == 0) {
520         syncContext.execute(new Runnable() {
521           @Override
522           public void run() {
523             if (clusterRefs.get(cluster).refCount.get() == 0) {
524               clusterRefs.remove(cluster);
525               updateResolutionResult();
526             }
527           }
528         });
529       }
530     }
531 
532     private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
533       Long hash = null;
534       for (HashPolicy policy : hashPolicies) {
535         Long newHash = null;
536         if (policy.type() == HashPolicy.Type.HEADER) {
537           String value = getHeaderValue(headers, policy.headerName());
538           if (value != null) {
539             if (policy.regEx() != null && policy.regExSubstitution() != null) {
540               value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
541             }
542             newHash = hashFunc.hashAsciiString(value);
543           }
544         } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
545           newHash = hashFunc.hashLong(randomChannelId);
546         }
547         if (newHash != null ) {
548           // Rotating the old value prevents duplicate hash rules from cancelling each other out
549           // and preserves all of the entropy.
550           long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
551           hash = oldHash ^ newHash;
552         }
553         // If the policy is a terminal policy and a hash has been generated, ignore
554         // the rest of the hash policies.
555         if (policy.isTerminal() && hash != null) {
556           break;
557         }
558       }
559       return hash == null ? random.nextLong() : hash;
560     }
561   }
562 
563   private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
564     checkArgument(!interceptors.isEmpty(), "empty interceptors");
565     if (interceptors.size() == 1) {
566       return interceptors.get(0);
567     }
568     return new ClientInterceptor() {
569       @Override
570       public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
571           MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
572         next = ClientInterceptors.interceptForward(next, interceptors);
573         return next.newCall(method, callOptions);
574       }
575     };
576   }
577 
578   @Nullable
579   private static String getHeaderValue(Metadata headers, String headerName) {
580     if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
581       return null;
582     }
583     if (headerName.equals("content-type")) {
584       return "application/grpc";
585     }
586     Metadata.Key<String> key;
587     try {
588       key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
589     } catch (IllegalArgumentException e) {
590       return null;
591     }
592     Iterable<String> values = headers.getAll(key);
593     return values == null ? null : Joiner.on(",").join(values);
594   }
595 
596   private static String prefixedClusterName(String name) {
597     return "cluster:" + name;
598   }
599 
600   private static String prefixedClusterSpecifierPluginName(String pluginName) {
601     return "cluster_specifier_plugin:" + pluginName;
602   }
603 
604   private static final class FailingConfigSelector extends InternalConfigSelector {
605     private final Result result;
606 
607     public FailingConfigSelector(Status error) {
608       this.result = Result.forError(error);
609     }
610 
611     @Override
612     public Result selectConfig(PickSubchannelArgs args) {
613       return result;
614     }
615   }
616 
617   private class ResolveState implements ResourceWatcher<LdsUpdate> {
618     private final ConfigOrError emptyServiceConfig =
619         serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
620     private final String ldsResourceName;
621     private boolean stopped;
622     @Nullable
623     private Set<String> existingClusters;  // clusters to which new requests can be routed
624     @Nullable
625     private RouteDiscoveryState routeDiscoveryState;
626 
627     ResolveState(String ldsResourceName) {
628       this.ldsResourceName = ldsResourceName;
629     }
630 
631     @Override
632     public void onChanged(final LdsUpdate update) {
633       syncContext.execute(new Runnable() {
634         @Override
635         public void run() {
636           if (stopped) {
637             return;
638           }
639           logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
640           HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
641           List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
642           String rdsName = httpConnectionManager.rdsName();
643           cleanUpRouteDiscoveryState();
644           if (virtualHosts != null) {
645             updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
646                 httpConnectionManager.httpFilterConfigs());
647           } else {
648             routeDiscoveryState = new RouteDiscoveryState(
649                 rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
650                 httpConnectionManager.httpFilterConfigs());
651             logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
652             xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
653                 rdsName, routeDiscoveryState);
654           }
655         }
656       });
657     }
658 
659     @Override
660     public void onError(final Status error) {
661       syncContext.execute(new Runnable() {
662         @Override
663         public void run() {
664           if (stopped || receivedConfig) {
665             return;
666           }
667           listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
668               String.format("Unable to load LDS %s. xDS server returned: %s: %s",
669               ldsResourceName, error.getCode(), error.getDescription())));
670         }
671       });
672     }
673 
674     @Override
675     public void onResourceDoesNotExist(final String resourceName) {
676       syncContext.execute(new Runnable() {
677         @Override
678         public void run() {
679           if (stopped) {
680             return;
681           }
682           String error = "LDS resource does not exist: " + resourceName;
683           logger.log(XdsLogLevel.INFO, error);
684           cleanUpRouteDiscoveryState();
685           cleanUpRoutes(error);
686         }
687       });
688     }
689 
690     private void start() {
691       logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
692       xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, this);
693     }
694 
695     private void stop() {
696       logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
697       stopped = true;
698       cleanUpRouteDiscoveryState();
699       xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
700     }
701 
702     // called in syncContext
703     private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
704         @Nullable List<NamedFilterConfig> filterConfigs) {
705       String authority = overrideAuthority != null ? overrideAuthority : ldsResourceName;
706       VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
707       if (virtualHost == null) {
708         String error = "Failed to find virtual host matching hostname: " + authority;
709         logger.log(XdsLogLevel.WARNING, error);
710         cleanUpRoutes(error);
711         return;
712       }
713 
714       List<Route> routes = virtualHost.routes();
715 
716       // Populate all clusters to which requests can be routed to through the virtual host.
717       Set<String> clusters = new HashSet<>();
718       // uniqueName -> clusterName
719       Map<String, String> clusterNameMap = new HashMap<>();
720       // uniqueName -> pluginConfig
721       Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
722       for (Route route : routes) {
723         RouteAction action = route.routeAction();
724         String prefixedName;
725         if (action != null) {
726           if (action.cluster() != null) {
727             prefixedName = prefixedClusterName(action.cluster());
728             clusters.add(prefixedName);
729             clusterNameMap.put(prefixedName, action.cluster());
730           } else if (action.weightedClusters() != null) {
731             for (ClusterWeight weighedCluster : action.weightedClusters()) {
732               prefixedName = prefixedClusterName(weighedCluster.name());
733               clusters.add(prefixedName);
734               clusterNameMap.put(prefixedName, weighedCluster.name());
735             }
736           } else if (action.namedClusterSpecifierPluginConfig() != null) {
737             PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
738             if (pluginConfig instanceof RlsPluginConfig) {
739               prefixedName = prefixedClusterSpecifierPluginName(
740                   action.namedClusterSpecifierPluginConfig().name());
741               clusters.add(prefixedName);
742               rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
743             }
744           }
745         }
746       }
747 
748       // Updates channel's load balancing config whenever the set of selectable clusters changes.
749       boolean shouldUpdateResult = existingClusters == null;
750       Set<String> addedClusters =
751           existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
752       Set<String> deletedClusters =
753           existingClusters == null
754               ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
755       existingClusters = clusters;
756       for (String cluster : addedClusters) {
757         if (clusterRefs.containsKey(cluster)) {
758           clusterRefs.get(cluster).refCount.incrementAndGet();
759         } else {
760           if (clusterNameMap.containsKey(cluster)) {
761             clusterRefs.put(
762                 cluster,
763                 ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
764           }
765           if (rlsPluginConfigMap.containsKey(cluster)) {
766             clusterRefs.put(
767                 cluster,
768                 ClusterRefState.forRlsPlugin(
769                     new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
770           }
771           shouldUpdateResult = true;
772         }
773       }
774       for (String cluster : clusters) {
775         RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
776         if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
777           ClusterRefState newClusterRefState =
778               ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
779           clusterRefs.put(cluster, newClusterRefState);
780           shouldUpdateResult = true;
781         }
782       }
783       // Update service config to include newly added clusters.
784       if (shouldUpdateResult) {
785         updateResolutionResult();
786       }
787       // Make newly added clusters selectable by config selector and deleted clusters no longer
788       // selectable.
789       routingConfig =
790           new RoutingConfig(
791               httpMaxStreamDurationNano, routes, filterConfigs,
792               virtualHost.filterConfigOverrides());
793       shouldUpdateResult = false;
794       for (String cluster : deletedClusters) {
795         int count = clusterRefs.get(cluster).refCount.decrementAndGet();
796         if (count == 0) {
797           clusterRefs.remove(cluster);
798           shouldUpdateResult = true;
799         }
800       }
801       if (shouldUpdateResult) {
802         updateResolutionResult();
803       }
804     }
805 
806     private void cleanUpRoutes(String error) {
807       if (existingClusters != null) {
808         for (String cluster : existingClusters) {
809           int count = clusterRefs.get(cluster).refCount.decrementAndGet();
810           if (count == 0) {
811             clusterRefs.remove(cluster);
812           }
813         }
814         existingClusters = null;
815       }
816       routingConfig = RoutingConfig.empty;
817       // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
818       // the config selector handles the error message itself. Once the LB API allows providing
819       // failure information for addresses yet still providing a service config, the config seector
820       // could be avoided.
821       listener.onResult(ResolutionResult.newBuilder()
822           .setAttributes(Attributes.newBuilder()
823             .set(InternalConfigSelector.KEY,
824               new FailingConfigSelector(Status.UNAVAILABLE.withDescription(error)))
825             .build())
826           .setServiceConfig(emptyServiceConfig)
827           .build());
828       receivedConfig = true;
829     }
830 
831     private void cleanUpRouteDiscoveryState() {
832       if (routeDiscoveryState != null) {
833         String rdsName = routeDiscoveryState.resourceName;
834         logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
835         xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
836             routeDiscoveryState);
837         routeDiscoveryState = null;
838       }
839     }
840 
841     /**
842      * Discovery state for RouteConfiguration resource. One instance for each Listener resource
843      * update.
844      */
845     private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
846       private final String resourceName;
847       private final long httpMaxStreamDurationNano;
848       @Nullable
849       private final List<NamedFilterConfig> filterConfigs;
850 
851       private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
852           @Nullable List<NamedFilterConfig> filterConfigs) {
853         this.resourceName = resourceName;
854         this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
855         this.filterConfigs = filterConfigs;
856       }
857 
858       @Override
859       public void onChanged(final RdsUpdate update) {
860         syncContext.execute(new Runnable() {
861           @Override
862           public void run() {
863             if (RouteDiscoveryState.this != routeDiscoveryState) {
864               return;
865             }
866             logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
867             updateRoutes(update.virtualHosts, httpMaxStreamDurationNano,
868                 filterConfigs);
869           }
870         });
871       }
872 
873       @Override
874       public void onError(final Status error) {
875         syncContext.execute(new Runnable() {
876           @Override
877           public void run() {
878             if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
879               return;
880             }
881             listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
882                 String.format("Unable to load RDS %s. xDS server returned: %s: %s",
883                 resourceName, error.getCode(), error.getDescription())));
884           }
885         });
886       }
887 
888       @Override
889       public void onResourceDoesNotExist(final String resourceName) {
890         syncContext.execute(new Runnable() {
891           @Override
892           public void run() {
893             if (RouteDiscoveryState.this != routeDiscoveryState) {
894               return;
895             }
896             String error = "RDS resource does not exist: " + resourceName;
897             logger.log(XdsLogLevel.INFO, error);
898             cleanUpRoutes(error);
899           }
900         });
901       }
902     }
903   }
904 
905   /**
906    * VirtualHost-level configuration for request routing.
907    */
908   private static class RoutingConfig {
909     private final long fallbackTimeoutNano;
910     final List<Route> routes;
911     // Null if HttpFilter is not supported.
912     @Nullable final List<NamedFilterConfig> filterChain;
913     final Map<String, FilterConfig> virtualHostOverrideConfig;
914 
915     private static RoutingConfig empty = new RoutingConfig(
916         0, Collections.emptyList(), null, Collections.emptyMap());
917 
918     private RoutingConfig(
919         long fallbackTimeoutNano, List<Route> routes, @Nullable List<NamedFilterConfig> filterChain,
920         Map<String, FilterConfig> virtualHostOverrideConfig) {
921       this.fallbackTimeoutNano = fallbackTimeoutNano;
922       this.routes = routes;
923       checkArgument(filterChain == null || !filterChain.isEmpty(), "filterChain is empty");
924       this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain);
925       this.virtualHostOverrideConfig = Collections.unmodifiableMap(virtualHostOverrideConfig);
926     }
927   }
928 
929   private static class ClusterRefState {
930     final AtomicInteger refCount;
931     @Nullable
932     final String traditionalCluster;
933     @Nullable
934     final RlsPluginConfig rlsPluginConfig;
935 
936     private ClusterRefState(
937         AtomicInteger refCount, @Nullable String traditionalCluster,
938         @Nullable RlsPluginConfig rlsPluginConfig) {
939       this.refCount = refCount;
940       checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
941           "There must be exactly one non-null value in traditionalCluster and pluginConfig");
942       this.traditionalCluster = traditionalCluster;
943       this.rlsPluginConfig = rlsPluginConfig;
944     }
945 
946     private Map<String, ?> toLbPolicy() {
947       if (traditionalCluster != null) {
948         return ImmutableMap.of(
949             XdsLbPolicies.CDS_POLICY_NAME,
950             ImmutableMap.of("cluster", traditionalCluster));
951       } else {
952         ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
953             .put("routeLookupConfig", rlsPluginConfig.config())
954             .put(
955                 "childPolicy",
956                 ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
957             .put("childPolicyConfigTargetFieldName", "cluster")
958             .buildOrThrow();
959         return ImmutableMap.of("rls_experimental", rlsConfig);
960       }
961     }
962 
963     static ClusterRefState forCluster(AtomicInteger refCount, String name) {
964       return new ClusterRefState(refCount, name, null);
965     }
966 
967     static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
968       return new ClusterRefState(refCount, null, rlsPluginConfig);
969     }
970   }
971 }
972