• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
22 import static io.grpc.xds.XdsResourceType.ParsedResource;
23 import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate;
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Joiner;
27 import com.google.common.base.Stopwatch;
28 import com.google.common.base.Supplier;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.collect.ImmutableSet;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.SettableFuture;
33 import com.google.protobuf.Any;
34 import io.grpc.ChannelCredentials;
35 import io.grpc.Context;
36 import io.grpc.Grpc;
37 import io.grpc.InternalLogId;
38 import io.grpc.LoadBalancerRegistry;
39 import io.grpc.ManagedChannel;
40 import io.grpc.Status;
41 import io.grpc.SynchronizationContext;
42 import io.grpc.SynchronizationContext.ScheduledHandle;
43 import io.grpc.internal.BackoffPolicy;
44 import io.grpc.internal.TimeProvider;
45 import io.grpc.xds.Bootstrapper.AuthorityInfo;
46 import io.grpc.xds.Bootstrapper.ServerInfo;
47 import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
48 import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
49 import io.grpc.xds.XdsClient.ResourceStore;
50 import io.grpc.xds.XdsClient.TimerLaunch;
51 import io.grpc.xds.XdsClient.XdsResponseHandler;
52 import io.grpc.xds.XdsLogger.XdsLogLevel;
53 import java.net.URI;
54 import java.util.Collection;
55 import java.util.Collections;
56 import java.util.HashMap;
57 import java.util.HashSet;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Objects;
61 import java.util.Set;
62 import java.util.concurrent.ScheduledExecutorService;
63 import java.util.concurrent.TimeUnit;
64 import java.util.logging.Level;
65 import java.util.logging.Logger;
66 import javax.annotation.Nullable;
67 
68 /**
69  * XdsClient implementation.
70  */
71 final class XdsClientImpl extends XdsClient
72     implements XdsResponseHandler, ResourceStore, TimerLaunch {
73 
74   private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
75       System.getenv("GRPC_LOG_XDS_NODE_ID"));
76   private static final Logger classLogger = Logger.getLogger(XdsClientImpl.class.getName());
77 
78   // Longest time to wait, since the subscription to some resource, for concluding its absence.
79   @VisibleForTesting
80   static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
81   private final SynchronizationContext syncContext = new SynchronizationContext(
82       new Thread.UncaughtExceptionHandler() {
83         @Override
84         public void uncaughtException(Thread t, Throwable e) {
85           logger.log(
86               XdsLogLevel.ERROR,
87               "Uncaught exception in XdsClient SynchronizationContext. Panic!",
88               e);
89           // TODO(chengyuanzhang): better error handling.
90           throw new AssertionError(e);
91         }
92       });
93   private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
94   private final LoadBalancerRegistry loadBalancerRegistry
95       = LoadBalancerRegistry.getDefaultRegistry();
96   private final Map<ServerInfo, ControlPlaneClient> serverChannelMap = new HashMap<>();
97   private final Map<XdsResourceType<? extends ResourceUpdate>,
98       Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
99       resourceSubscribers = new HashMap<>();
100   private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
101   private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
102   private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
103   private final XdsChannelFactory xdsChannelFactory;
104   private final Bootstrapper.BootstrapInfo bootstrapInfo;
105   private final Context context;
106   private final ScheduledExecutorService timeService;
107   private final BackoffPolicy.Provider backoffPolicyProvider;
108   private final Supplier<Stopwatch> stopwatchSupplier;
109   private final TimeProvider timeProvider;
110   private final TlsContextManager tlsContextManager;
111   private final InternalLogId logId;
112   private final XdsLogger logger;
113   private volatile boolean isShutdown;
114 
XdsClientImpl( XdsChannelFactory xdsChannelFactory, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier, TimeProvider timeProvider, TlsContextManager tlsContextManager)115   XdsClientImpl(
116       XdsChannelFactory xdsChannelFactory,
117       Bootstrapper.BootstrapInfo bootstrapInfo,
118       Context context,
119       ScheduledExecutorService timeService,
120       BackoffPolicy.Provider backoffPolicyProvider,
121       Supplier<Stopwatch> stopwatchSupplier,
122       TimeProvider timeProvider,
123       TlsContextManager tlsContextManager) {
124     this.xdsChannelFactory = xdsChannelFactory;
125     this.bootstrapInfo = bootstrapInfo;
126     this.context = context;
127     this.timeService = timeService;
128     this.backoffPolicyProvider = backoffPolicyProvider;
129     this.stopwatchSupplier = stopwatchSupplier;
130     this.timeProvider = timeProvider;
131     this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager");
132     logId = InternalLogId.allocate("xds-client", null);
133     logger = XdsLogger.withLogId(logId);
134     logger.log(XdsLogLevel.INFO, "Created");
135     if (LOG_XDS_NODE_ID) {
136       classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
137     }
138   }
139 
maybeCreateXdsChannelWithLrs(ServerInfo serverInfo)140   private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
141     syncContext.throwIfNotInThisSynchronizationContext();
142     if (serverChannelMap.containsKey(serverInfo)) {
143       return;
144     }
145     ControlPlaneClient xdsChannel = new ControlPlaneClient(
146         xdsChannelFactory,
147         serverInfo,
148         bootstrapInfo.node(),
149         this,
150         this,
151         context,
152         timeService,
153         syncContext,
154         backoffPolicyProvider,
155         stopwatchSupplier,
156         this);
157     LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
158     loadStatsManagerMap.put(serverInfo, loadStatsManager);
159     LoadReportClient lrsClient = new LoadReportClient(
160         loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
161         timeService, backoffPolicyProvider, stopwatchSupplier);
162     serverChannelMap.put(serverInfo, xdsChannel);
163     serverLrsClientMap.put(serverInfo, lrsClient);
164   }
165 
166   @Override
handleResourceResponse( XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce)167   public void handleResourceResponse(
168       XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
169       List<Any> resources, String nonce) {
170     checkNotNull(xdsResourceType, "xdsResourceType");
171     syncContext.throwIfNotInThisSynchronizationContext();
172     Set<String> toParseResourceNames = null;
173     if (!(xdsResourceType == XdsListenerResource.getInstance()
174         || xdsResourceType == XdsRouteConfigureResource.getInstance())
175         && resourceSubscribers.containsKey(xdsResourceType)) {
176       toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet();
177     }
178     XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
179         bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager,
180         toParseResourceNames);
181     handleResourceUpdate(args, resources, xdsResourceType);
182   }
183 
184   @Override
handleStreamClosed(Status error)185   public void handleStreamClosed(Status error) {
186     syncContext.throwIfNotInThisSynchronizationContext();
187     cleanUpResourceTimers();
188     for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
189         resourceSubscribers.values()) {
190       for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
191         if (!subscriber.hasResult()) {
192           subscriber.onError(error);
193         }
194       }
195     }
196   }
197 
198   @Override
handleStreamRestarted(ServerInfo serverInfo)199   public void handleStreamRestarted(ServerInfo serverInfo) {
200     syncContext.throwIfNotInThisSynchronizationContext();
201     for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
202         resourceSubscribers.values()) {
203       for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
204         if (subscriber.serverInfo.equals(serverInfo)) {
205           subscriber.restartTimer();
206         }
207       }
208     }
209   }
210 
211   @Override
shutdown()212   void shutdown() {
213     syncContext.execute(
214         new Runnable() {
215           @Override
216           public void run() {
217             if (isShutdown) {
218               return;
219             }
220             isShutdown = true;
221             for (ControlPlaneClient xdsChannel : serverChannelMap.values()) {
222               xdsChannel.shutdown();
223             }
224             for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
225               lrsClient.stopLoadReporting();
226             }
227             cleanUpResourceTimers();
228           }
229         });
230   }
231 
232   @Override
isShutDown()233   boolean isShutDown() {
234     return isShutdown;
235   }
236 
237   @Override
getSubscribedResourceTypesWithTypeUrl()238   public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
239     return Collections.unmodifiableMap(subscribedResourceTypeUrls);
240   }
241 
242   @Nullable
243   @Override
getSubscribedResources(ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type)244   public Collection<String> getSubscribedResources(ServerInfo serverInfo,
245                                                    XdsResourceType<? extends ResourceUpdate> type) {
246     Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
247         resourceSubscribers.getOrDefault(type, Collections.emptyMap());
248     ImmutableSet.Builder<String> builder = ImmutableSet.builder();
249     for (String key : resources.keySet()) {
250       if (resources.get(key).serverInfo.equals(serverInfo)) {
251         builder.add(key);
252       }
253     }
254     Collection<String> retVal = builder.build();
255     return retVal.isEmpty() ? null : retVal;
256   }
257 
258   // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
259   // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
260   @Override
261   ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getSubscribedResourcesMetadataSnapshot()262       getSubscribedResourcesMetadataSnapshot() {
263     final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
264         SettableFuture.create();
265     syncContext.execute(new Runnable() {
266       @Override
267       public void run() {
268         // A map from a "resource type" to a map ("resource name": "resource metadata")
269         ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
270             ImmutableMap.builder();
271         for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
272           ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
273           for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
274               : resourceSubscribers.get(resourceType).entrySet()) {
275             metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
276           }
277           metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
278         }
279         future.set(metadataSnapshot.buildOrThrow());
280       }
281     });
282     return future;
283   }
284 
285   @Override
getTlsContextManager()286   TlsContextManager getTlsContextManager() {
287     return tlsContextManager;
288   }
289 
290   @Override
watchXdsResource(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)291   <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
292                                                    ResourceWatcher<T> watcher) {
293     syncContext.execute(new Runnable() {
294       @Override
295       @SuppressWarnings("unchecked")
296       public void run() {
297         if (!resourceSubscribers.containsKey(type)) {
298           resourceSubscribers.put(type, new HashMap<>());
299           subscribedResourceTypeUrls.put(type.typeUrl(), type);
300         }
301         ResourceSubscriber<T> subscriber =
302             (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
303         if (subscriber == null) {
304           logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
305           subscriber = new ResourceSubscriber<>(type, resourceName);
306           resourceSubscribers.get(type).put(resourceName, subscriber);
307           if (subscriber.xdsChannel != null) {
308             subscriber.xdsChannel.adjustResourceSubscription(type);
309           }
310         }
311         subscriber.addWatcher(watcher);
312       }
313     });
314   }
315 
316   @Override
cancelXdsResourceWatch(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)317   <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
318                                                          String resourceName,
319                                                          ResourceWatcher<T> watcher) {
320     syncContext.execute(new Runnable() {
321       @Override
322       @SuppressWarnings("unchecked")
323       public void run() {
324         ResourceSubscriber<T> subscriber =
325             (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
326         subscriber.removeWatcher(watcher);
327         if (!subscriber.isWatched()) {
328           subscriber.cancelResourceWatch();
329           resourceSubscribers.get(type).remove(resourceName);
330           if (subscriber.xdsChannel != null) {
331             subscriber.xdsChannel.adjustResourceSubscription(type);
332           }
333           if (resourceSubscribers.get(type).isEmpty()) {
334             resourceSubscribers.remove(type);
335             subscribedResourceTypeUrls.remove(type.typeUrl());
336 
337           }
338         }
339       }
340     });
341   }
342 
343   @Override
addClusterDropStats( final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName)344   ClusterDropStats addClusterDropStats(
345       final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
346     LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
347     ClusterDropStats dropCounter =
348         loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
349     syncContext.execute(new Runnable() {
350       @Override
351       public void run() {
352         serverLrsClientMap.get(serverInfo).startLoadReporting();
353       }
354     });
355     return dropCounter;
356   }
357 
358   @Override
addClusterLocalityStats( final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, Locality locality)359   ClusterLocalityStats addClusterLocalityStats(
360       final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
361       Locality locality) {
362     LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
363     ClusterLocalityStats loadCounter =
364         loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
365     syncContext.execute(new Runnable() {
366       @Override
367       public void run() {
368         serverLrsClientMap.get(serverInfo).startLoadReporting();
369       }
370     });
371     return loadCounter;
372   }
373 
374   @Override
getBootstrapInfo()375   Bootstrapper.BootstrapInfo getBootstrapInfo() {
376     return bootstrapInfo;
377   }
378 
379   @VisibleForTesting
380   @Override
getServerLrsClientMap()381   Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
382     return ImmutableMap.copyOf(serverLrsClientMap);
383   }
384 
385   @Override
toString()386   public String toString() {
387     return logId.toString();
388   }
389 
390   @Override
startSubscriberTimersIfNeeded(ServerInfo serverInfo)391   public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
392     if (isShutDown()) {
393       return;
394     }
395 
396     syncContext.execute(new Runnable() {
397       @Override
398       public void run() {
399         if (isShutDown()) {
400           return;
401         }
402 
403         for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
404           for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
405             if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
406               subscriber.restartTimer();
407             }
408           }
409         }
410       }
411     });
412   }
413 
cleanUpResourceTimers()414   private void cleanUpResourceTimers() {
415     for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
416       for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
417         subscriber.stopTimer();
418       }
419     }
420   }
421 
422   @SuppressWarnings("unchecked")
handleResourceUpdate(XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType)423   private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Args args,
424                                                                List<Any> resources,
425                                                                XdsResourceType<T> xdsResourceType) {
426     ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
427     logger.log(XdsLogger.XdsLogLevel.INFO,
428         "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
429          xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
430     Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
431     Set<String> invalidResources = result.invalidResources;
432     List<String> errors = result.errors;
433     String errorDetail = null;
434     if (errors.isEmpty()) {
435       checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
436       serverChannelMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
437           args.nonce);
438     } else {
439       errorDetail = Joiner.on('\n').join(errors);
440       logger.log(XdsLogLevel.WARNING,
441           "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
442           xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
443       serverChannelMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
444     }
445 
446     long updateTime = timeProvider.currentTimeNanos();
447     Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
448         resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
449     for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
450       String resourceName = entry.getKey();
451       ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
452 
453       if (parsedResources.containsKey(resourceName)) {
454         // Happy path: the resource updated successfully. Notify the watchers of the update.
455         subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
456         continue;
457       }
458 
459       if (invalidResources.contains(resourceName)) {
460         // The resource update is invalid. Capture the error without notifying the watchers.
461         subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
462       }
463 
464       // Nothing else to do for incremental ADS resources.
465       if (!xdsResourceType.isFullStateOfTheWorld()) {
466         continue;
467       }
468 
469       // Handle State of the World ADS: invalid resources.
470       if (invalidResources.contains(resourceName)) {
471         // The resource is missing. Reuse the cached resource if possible.
472         if (subscriber.data == null) {
473           // No cached data. Notify the watchers of an invalid update.
474           subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
475         }
476         continue;
477       }
478 
479       // For State of the World services, notify watchers when their watched resource is missing
480       // from the ADS update. Note that we can only do this if the resource update is coming from
481       // the same xDS server that the ResourceSubscriber is subscribed to.
482       if (subscriber.serverInfo.equals(args.serverInfo)) {
483         subscriber.onAbsent();
484       }
485     }
486   }
487 
488   /**
489    * Tracks a single subscribed resource.
490    */
491   private final class ResourceSubscriber<T extends ResourceUpdate> {
492     @Nullable private final ServerInfo serverInfo;
493     @Nullable private final ControlPlaneClient xdsChannel;
494     private final XdsResourceType<T> type;
495     private final String resource;
496     private final Set<ResourceWatcher<T>> watchers = new HashSet<>();
497     @Nullable private T data;
498     private boolean absent;
499     // Tracks whether the deletion has been ignored per bootstrap server feature.
500     // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
501     private boolean resourceDeletionIgnored;
502     @Nullable private ScheduledHandle respTimer;
503     @Nullable private ResourceMetadata metadata;
504     @Nullable private String errorDescription;
505 
ResourceSubscriber(XdsResourceType<T> type, String resource)506     ResourceSubscriber(XdsResourceType<T> type, String resource) {
507       syncContext.throwIfNotInThisSynchronizationContext();
508       this.type = type;
509       this.resource = resource;
510       this.serverInfo = getServerInfo(resource);
511       if (serverInfo == null) {
512         this.errorDescription = "Wrong configuration: xds server does not exist for resource "
513             + resource;
514         this.xdsChannel = null;
515         return;
516       }
517       // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
518       // is created but not yet requested because the client is in backoff.
519       this.metadata = ResourceMetadata.newResourceMetadataUnknown();
520 
521       ControlPlaneClient xdsChannelTemp = null;
522       try {
523         maybeCreateXdsChannelWithLrs(serverInfo);
524         xdsChannelTemp = serverChannelMap.get(serverInfo);
525         if (xdsChannelTemp.isInBackoff()) {
526           return;
527         }
528       } catch (IllegalArgumentException e) {
529         xdsChannelTemp = null;
530         this.errorDescription = "Bad configuration:  " + e.getMessage();
531         return;
532       } finally {
533         this.xdsChannel = xdsChannelTemp;
534       }
535 
536       restartTimer();
537     }
538 
539     @Nullable
getServerInfo(String resource)540     private ServerInfo getServerInfo(String resource) {
541       if (BootstrapperImpl.enableFederation && resource.startsWith(XDSTP_SCHEME)) {
542         URI uri = URI.create(resource);
543         String authority = uri.getAuthority();
544         if (authority == null) {
545           authority = "";
546         }
547         AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
548         if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
549           return null;
550         }
551         return authorityInfo.xdsServers().get(0);
552       }
553       return bootstrapInfo.servers().get(0); // use first server
554     }
555 
addWatcher(ResourceWatcher<T> watcher)556     void addWatcher(ResourceWatcher<T> watcher) {
557       checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
558       watchers.add(watcher);
559       if (errorDescription != null) {
560         watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
561         return;
562       }
563       if (data != null) {
564         notifyWatcher(watcher, data);
565       } else if (absent) {
566         watcher.onResourceDoesNotExist(resource);
567       }
568     }
569 
removeWatcher(ResourceWatcher<T> watcher)570     void removeWatcher(ResourceWatcher<T>  watcher) {
571       checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
572       watchers.remove(watcher);
573     }
574 
restartTimer()575     void restartTimer() {
576       if (data != null || absent) {  // resource already resolved
577         return;
578       }
579       if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer
580         return;
581       }
582 
583       class ResourceNotFound implements Runnable {
584         @Override
585         public void run() {
586           logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
587               type, resource);
588           respTimer = null;
589           onAbsent();
590         }
591 
592         @Override
593         public String toString() {
594           return type + this.getClass().getSimpleName();
595         }
596       }
597 
598       // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
599       metadata = ResourceMetadata.newResourceMetadataRequested();
600 
601       respTimer = syncContext.schedule(
602           new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
603           timeService);
604     }
605 
stopTimer()606     void stopTimer() {
607       if (respTimer != null && respTimer.isPending()) {
608         respTimer.cancel();
609         respTimer = null;
610       }
611     }
612 
cancelResourceWatch()613     void cancelResourceWatch() {
614       if (isWatched()) {
615         throw new IllegalStateException("Can't cancel resource watch with active watchers present");
616       }
617       stopTimer();
618       String message = "Unsubscribing {0} resource {1} from server {2}";
619       XdsLogLevel logLevel = XdsLogLevel.INFO;
620       if (resourceDeletionIgnored) {
621         message += " for which we previously ignored a deletion";
622         logLevel = XdsLogLevel.FORCE_INFO;
623       }
624       logger.log(logLevel, message, type, resource,
625           serverInfo != null ? serverInfo.target() : "unknown");
626     }
627 
isWatched()628     boolean isWatched() {
629       return !watchers.isEmpty();
630     }
631 
hasResult()632     boolean hasResult() {
633       return data != null || absent;
634     }
635 
onData(ParsedResource<T> parsedResource, String version, long updateTime)636     void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
637       if (respTimer != null && respTimer.isPending()) {
638         respTimer.cancel();
639         respTimer = null;
640       }
641       this.metadata = ResourceMetadata
642           .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
643       ResourceUpdate oldData = this.data;
644       this.data = parsedResource.getResourceUpdate();
645       absent = false;
646       if (resourceDeletionIgnored) {
647         logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
648                 + "of resource for which we previously ignored a deletion: type {1} name {2}",
649             serverInfo != null ? serverInfo.target() : "unknown", type, resource);
650         resourceDeletionIgnored = false;
651       }
652       if (!Objects.equals(oldData, data)) {
653         for (ResourceWatcher<T> watcher : watchers) {
654           notifyWatcher(watcher, data);
655         }
656       }
657     }
658 
onAbsent()659     void onAbsent() {
660       if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
661         return;
662       }
663 
664       // Ignore deletion of State of the World resources when this feature is on,
665       // and the resource is reusable.
666       boolean ignoreResourceDeletionEnabled =
667           serverInfo != null && serverInfo.ignoreResourceDeletion();
668       if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
669         if (!resourceDeletionIgnored) {
670           logger.log(XdsLogLevel.FORCE_WARNING,
671               "xds server {0}: ignoring deletion for resource type {1} name {2}}",
672               serverInfo.target(), type, resource);
673           resourceDeletionIgnored = true;
674         }
675         return;
676       }
677 
678       logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
679       if (!absent) {
680         data = null;
681         absent = true;
682         metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
683         for (ResourceWatcher<T> watcher : watchers) {
684           watcher.onResourceDoesNotExist(resource);
685         }
686       }
687     }
688 
onError(Status error)689     void onError(Status error) {
690       if (respTimer != null && respTimer.isPending()) {
691         respTimer.cancel();
692         respTimer = null;
693       }
694 
695       // Include node ID in xds failures to allow cross-referencing with control plane logs
696       // when debugging.
697       String description = error.getDescription() == null ? "" : error.getDescription() + " ";
698       Status errorAugmented = Status.fromCode(error.getCode())
699           .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
700           .withCause(error.getCause());
701 
702       for (ResourceWatcher<T> watcher : watchers) {
703         watcher.onError(errorAugmented);
704       }
705     }
706 
onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails)707     void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
708       metadata = ResourceMetadata
709           .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
710     }
711 
notifyWatcher(ResourceWatcher<T> watcher, T update)712     private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
713       watcher.onChanged(update);
714     }
715   }
716 
717   static final class ResourceInvalidException extends Exception {
718     private static final long serialVersionUID = 0L;
719 
ResourceInvalidException(String message)720     ResourceInvalidException(String message) {
721       super(message, null, false, false);
722     }
723 
ResourceInvalidException(String message, Throwable cause)724     ResourceInvalidException(String message, Throwable cause) {
725       super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
726     }
727   }
728 
729   abstract static class XdsChannelFactory {
730     static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
731       @Override
732       ManagedChannel create(ServerInfo serverInfo) {
733         String target = serverInfo.target();
734         ChannelCredentials channelCredentials = serverInfo.channelCredentials();
735         return Grpc.newChannelBuilder(target, channelCredentials)
736             .keepAliveTime(5, TimeUnit.MINUTES)
737             .build();
738       }
739     };
740 
create(ServerInfo serverInfo)741     abstract ManagedChannel create(ServerInfo serverInfo);
742   }
743 }
744