• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/credentials.h>
20 #include <grpc/grpc.h>
21 #include <grpc/grpc_security.h>
22 #include <grpc/slice.h>
23 #include <grpc/status.h>
24 #include <grpc/support/port_platform.h>
25 #include <string.h>
26 
27 #include <algorithm>
28 #include <map>
29 #include <memory>
30 #include <set>
31 #include <string>
32 #include <utility>
33 #include <vector>
34 
35 #include "absl/base/thread_annotations.h"
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/status/status.h"
39 #include "absl/status/statusor.h"
40 #include "absl/strings/match.h"
41 #include "absl/strings/numbers.h"
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_join.h"
44 #include "absl/strings/str_replace.h"
45 #include "absl/strings/string_view.h"
46 #include "absl/types/optional.h"
47 #include "absl/types/variant.h"
48 #include "src/core/config/core_configuration.h"
49 #include "src/core/lib/address_utils/parse_address.h"
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_args_preconditioning.h"
53 #include "src/core/lib/channel/channel_fwd.h"
54 #include "src/core/lib/debug/trace.h"
55 #include "src/core/lib/iomgr/endpoint.h"
56 #include "src/core/lib/iomgr/exec_ctx.h"
57 #include "src/core/lib/iomgr/iomgr_fwd.h"
58 #include "src/core/lib/iomgr/resolved_address.h"
59 #include "src/core/lib/iomgr/sockaddr.h"
60 #include "src/core/lib/iomgr/socket_utils.h"
61 #include "src/core/lib/security/credentials/credentials.h"
62 #include "src/core/lib/security/credentials/tls/grpc_tls_certificate_distributor.h"
63 #include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h"
64 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/server/server.h"
67 #include "src/core/server/server_config_selector.h"
68 #include "src/core/server/server_config_selector_filter.h"
69 #include "src/core/server/xds_channel_stack_modifier.h"
70 #include "src/core/service_config/service_config.h"
71 #include "src/core/service_config/service_config_impl.h"
72 #include "src/core/util/debug_location.h"
73 #include "src/core/util/host_port.h"
74 #include "src/core/util/match.h"
75 #include "src/core/util/ref_counted_ptr.h"
76 #include "src/core/util/sync.h"
77 #include "src/core/util/unique_type_name.h"
78 #include "src/core/util/uri.h"
79 #include "src/core/xds/grpc/certificate_provider_store.h"
80 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
81 #include "src/core/xds/grpc/xds_certificate_provider.h"
82 #include "src/core/xds/grpc/xds_client_grpc.h"
83 #include "src/core/xds/grpc/xds_common_types.h"
84 #include "src/core/xds/grpc/xds_http_filter.h"
85 #include "src/core/xds/grpc/xds_http_filter_registry.h"
86 #include "src/core/xds/grpc/xds_listener.h"
87 #include "src/core/xds/grpc/xds_listener_parser.h"
88 #include "src/core/xds/grpc/xds_route_config.h"
89 #include "src/core/xds/grpc/xds_route_config_parser.h"
90 #include "src/core/xds/grpc/xds_routing.h"
91 #include "src/core/xds/xds_client/xds_client.h"
92 
93 namespace grpc_core {
94 namespace {
95 
96 using ReadDelayHandle = XdsClient::ReadDelayHandle;
97 
98 // A server config fetcher that fetches the information for configuring server
99 // listeners from the xDS control plane.
100 class XdsServerConfigFetcher final : public ServerConfigFetcher {
101  public:
102   XdsServerConfigFetcher(RefCountedPtr<GrpcXdsClient> xds_client,
103                          grpc_server_xds_status_notifier notifier);
104 
~XdsServerConfigFetcher()105   ~XdsServerConfigFetcher() override {
106     xds_client_.reset(DEBUG_LOCATION, "XdsServerConfigFetcher");
107   }
108 
109   void StartWatch(
110       std::string listening_address,
111       std::unique_ptr<ServerConfigFetcher::WatcherInterface> watcher) override;
112 
113   void CancelWatch(ServerConfigFetcher::WatcherInterface* watcher) override;
114 
115   // Return the interested parties from the xds client so that it can be polled.
interested_parties()116   grpc_pollset_set* interested_parties() override {
117     return xds_client_->interested_parties();
118   }
119 
120  private:
121   class ListenerWatcher;
122 
123   RefCountedPtr<GrpcXdsClient> xds_client_;
124   const grpc_server_xds_status_notifier serving_status_notifier_;
125   Mutex mu_;
126   std::map<ServerConfigFetcher::WatcherInterface*, ListenerWatcher*>
127       listener_watchers_ ABSL_GUARDED_BY(mu_);
128 };
129 
130 // A watcher implementation for listening on LDS updates from the xDS control
131 // plane. When a good LDS update is received, it creates a
132 // FilterChainMatchManager object that would replace the existing (if any)
133 // FilterChainMatchManager object after all referenced RDS resources are
134 // fetched. Note that a good update also causes the server listener to start
135 // listening if it isn't already. If an error LDS update is received (NACKed
136 // resource, timeouts), the previous good FilterChainMatchManager, if any,
137 // continues to be used. If there isn't any previous good update or if the
138 // update received was a fatal error (resource does not exist), the server
139 // listener is made to stop listening.
140 class XdsServerConfigFetcher::ListenerWatcher final
141     : public XdsListenerResourceType::WatcherInterface {
142  public:
143   ListenerWatcher(RefCountedPtr<GrpcXdsClient> xds_client,
144                   std::unique_ptr<ServerConfigFetcher::WatcherInterface>
145                       server_config_watcher,
146                   grpc_server_xds_status_notifier serving_status_notifier,
147                   std::string listening_address);
148 
~ListenerWatcher()149   ~ListenerWatcher() override {
150     xds_client_.reset(DEBUG_LOCATION, "ListenerWatcher");
151   }
152 
153   void OnResourceChanged(
154       absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,
155       RefCountedPtr<ReadDelayHandle> read_delay_handle) override;
156 
157   void OnAmbientError(
158       absl::Status status,
159       RefCountedPtr<ReadDelayHandle> read_delay_handle) override;
160 
listening_address() const161   const std::string& listening_address() const { return listening_address_; }
162 
163  private:
164   class FilterChainMatchManager;
165 
166   void OnFatalError(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
167 
168   // Invoked by FilterChainMatchManager that is done fetching all referenced RDS
169   // resources. If the calling FilterChainMatchManager is the
170   // pending_filter_chain_match_manager_, it is promoted to be the
171   // filter_chain_match_manager_ in use.
PendingFilterChainMatchManagerReady(FilterChainMatchManager * filter_chain_match_manager)172   void PendingFilterChainMatchManagerReady(
173       FilterChainMatchManager* filter_chain_match_manager) {
174     MutexLock lock(&mu_);
175     PendingFilterChainMatchManagerReadyLocked(filter_chain_match_manager);
176   }
177   void PendingFilterChainMatchManagerReadyLocked(
178       FilterChainMatchManager* filter_chain_match_manager)
179       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
180 
181   RefCountedPtr<GrpcXdsClient> xds_client_;
182   const std::unique_ptr<ServerConfigFetcher::WatcherInterface>
183       server_config_watcher_;
184   const grpc_server_xds_status_notifier serving_status_notifier_;
185   const std::string listening_address_;
186   Mutex mu_;
187   RefCountedPtr<FilterChainMatchManager> filter_chain_match_manager_
188       ABSL_GUARDED_BY(mu_);
189   RefCountedPtr<FilterChainMatchManager> pending_filter_chain_match_manager_
190       ABSL_GUARDED_BY(mu_);
191 };
192 
193 // A connection manager used by the server listener code to inject channel args
194 // to be used for each incoming connection. This implementation chooses the
195 // appropriate filter chain from the xDS Listener resource and injects channel
196 // args that configure the right mTLS certs and cause the right set of HTTP
197 // filters to be injected.
198 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager final
199     : public ServerConfigFetcher::ConnectionManager {
200  public:
201   FilterChainMatchManager(RefCountedPtr<GrpcXdsClient> xds_client,
202                           XdsListenerResource::FilterChainMap filter_chain_map,
203                           absl::optional<XdsListenerResource::FilterChainData>
204                               default_filter_chain);
205 
~FilterChainMatchManager()206   ~FilterChainMatchManager() override {
207     xds_client_.reset(DEBUG_LOCATION, "FilterChainMatchManager");
208   }
209 
210   absl::StatusOr<ChannelArgs> UpdateChannelArgsForConnection(
211       const ChannelArgs& args, grpc_endpoint* tcp) override;
212 
213   // Invoked by ListenerWatcher to start fetching referenced RDS resources.
214   void StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher)
215       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ListenerWatcher::mu_);
216 
filter_chain_map() const217   const XdsListenerResource::FilterChainMap& filter_chain_map() const {
218     return filter_chain_map_;
219   }
220 
221   const absl::optional<XdsListenerResource::FilterChainData>&
default_filter_chain() const222   default_filter_chain() const {
223     return default_filter_chain_;
224   }
225 
226  private:
227   class RouteConfigWatcher;
228   struct RdsUpdateState {
229     RouteConfigWatcher* watcher;
230     absl::optional<
231         absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>>
232         rds_update;
233   };
234 
235   class XdsServerConfigSelector;
236   class StaticXdsServerConfigSelectorProvider;
237   class DynamicXdsServerConfigSelectorProvider;
238 
239   absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
240   CreateOrGetXdsCertificateProviderFromFilterChainData(
241       const XdsListenerResource::FilterChainData* filter_chain);
242   void Orphaned() override;
243 
244   // Helper functions invoked by RouteConfigWatcher when there are updates to
245   // RDS resources.
246   void OnRouteConfigChanged(
247       const std::string& resource_name,
248       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
249           route_config);
250   void OnAmbientError(const std::string& resource_name, absl::Status status);
251 
252   RefCountedPtr<GrpcXdsClient> xds_client_;
253   // This ref is only kept around till the FilterChainMatchManager becomes
254   // ready.
255   RefCountedPtr<ListenerWatcher> listener_watcher_;
256   // TODO(roth): Consider holding a ref to the LDS resource and storing
257   // a pointer to the filter chain data within that LDS resource, rather
258   // than copying the filter chain data here.
259   XdsListenerResource::FilterChainMap filter_chain_map_;
260   absl::optional<XdsListenerResource::FilterChainData> default_filter_chain_;
261   Mutex mu_;
262   size_t rds_resources_yet_to_fetch_ ABSL_GUARDED_BY(mu_) = 0;
263   std::map<std::string /* resource_name */, RdsUpdateState> rds_map_
264       ABSL_GUARDED_BY(mu_);
265   std::map<const XdsListenerResource::FilterChainData*,
266            RefCountedPtr<XdsCertificateProvider>>
267       certificate_providers_map_ ABSL_GUARDED_BY(mu_);
268 };
269 
270 // A watcher implementation for listening on RDS updates referenced to by a
271 // FilterChainMatchManager object. After all referenced RDS resources are
272 // fetched (errors are allowed), the FilterChainMatchManager tries to replace
273 // the current object. The watcher continues to update the referenced RDS
274 // resources so that new XdsServerConfigSelectorProvider objects are created
275 // with the latest updates and new connections do not need to wait for the RDS
276 // resources to be fetched.
277 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
278     RouteConfigWatcher final
279     : public XdsRouteConfigResourceType::WatcherInterface {
280  public:
RouteConfigWatcher(std::string resource_name,WeakRefCountedPtr<FilterChainMatchManager> filter_chain_match_manager)281   RouteConfigWatcher(
282       std::string resource_name,
283       WeakRefCountedPtr<FilterChainMatchManager> filter_chain_match_manager)
284       : resource_name_(std::move(resource_name)),
285         filter_chain_match_manager_(std::move(filter_chain_match_manager)) {}
286 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config,RefCountedPtr<ReadDelayHandle>)287   void OnResourceChanged(
288       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
289           route_config,
290       RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
291     filter_chain_match_manager_->OnRouteConfigChanged(resource_name_,
292                                                       std::move(route_config));
293   }
294 
OnAmbientError(absl::Status status,RefCountedPtr<ReadDelayHandle>)295   void OnAmbientError(
296       absl::Status status,
297       RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
298     filter_chain_match_manager_->OnAmbientError(resource_name_,
299                                                 std::move(status));
300   }
301 
302  private:
303   std::string resource_name_;
304   WeakRefCountedPtr<FilterChainMatchManager> filter_chain_match_manager_;
305 };
306 
307 // An implementation of ServerConfigSelector used by
308 // StaticXdsServerConfigSelectorProvider and
309 // DynamicXdsServerConfigSelectorProvider to parse the RDS update and get
310 // per-call configuration based on incoming metadata.
311 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
312     XdsServerConfigSelector final : public ServerConfigSelector {
313  public:
314   static absl::StatusOr<RefCountedPtr<XdsServerConfigSelector>> Create(
315       const XdsHttpFilterRegistry& http_filter_registry,
316       std::shared_ptr<const XdsRouteConfigResource> rds_update,
317       const std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>&
318           http_filters);
319   ~XdsServerConfigSelector() override = default;
320 
321   absl::StatusOr<CallConfig> GetCallConfig(
322       grpc_metadata_batch* metadata) override;
323 
324  private:
325   struct VirtualHost {
326     struct Route {
327       // true if an action other than kNonForwardingAction is configured.
328       bool unsupported_action;
329       // TODO(roth): Consider holding a ref to the RDS resource and storing
330       // a pointer to the matchers within that RDS resource, rather than
331       // copying the matchers here.
332       XdsRouteConfigResource::Route::Matchers matchers;
333       RefCountedPtr<ServiceConfig> method_config;
334     };
335 
336     class RouteListIterator final : public XdsRouting::RouteListIterator {
337      public:
RouteListIterator(const std::vector<Route> * routes)338       explicit RouteListIterator(const std::vector<Route>* routes)
339           : routes_(routes) {}
340 
Size() const341       size_t Size() const override { return routes_->size(); }
342 
GetMatchersForRoute(size_t index) const343       const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
344           size_t index) const override {
345         return (*routes_)[index].matchers;
346       }
347 
348      private:
349       const std::vector<Route>* routes_;
350     };
351 
352     std::vector<std::string> domains;
353     std::vector<Route> routes;
354   };
355 
356   class VirtualHostListIterator final
357       : public XdsRouting::VirtualHostListIterator {
358    public:
VirtualHostListIterator(const std::vector<VirtualHost> * virtual_hosts)359     explicit VirtualHostListIterator(
360         const std::vector<VirtualHost>* virtual_hosts)
361         : virtual_hosts_(virtual_hosts) {}
362 
Size() const363     size_t Size() const override { return virtual_hosts_->size(); }
364 
GetDomainsForVirtualHost(size_t index) const365     const std::vector<std::string>& GetDomainsForVirtualHost(
366         size_t index) const override {
367       return (*virtual_hosts_)[index].domains;
368     }
369 
370    private:
371     const std::vector<VirtualHost>* virtual_hosts_;
372   };
373 
374   std::vector<VirtualHost> virtual_hosts_;
375 };
376 
377 // An XdsServerConfigSelectorProvider implementation for when the
378 // RouteConfiguration is available inline.
379 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
380     StaticXdsServerConfigSelectorProvider final
381     : public ServerConfigSelectorProvider {
382  public:
StaticXdsServerConfigSelectorProvider(RefCountedPtr<GrpcXdsClient> xds_client,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> static_resource,std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> http_filters)383   StaticXdsServerConfigSelectorProvider(
384       RefCountedPtr<GrpcXdsClient> xds_client,
385       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
386           static_resource,
387       std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
388           http_filters)
389       : xds_client_(std::move(xds_client)),
390         static_resource_(std::move(static_resource)),
391         http_filters_(std::move(http_filters)) {}
392 
~StaticXdsServerConfigSelectorProvider()393   ~StaticXdsServerConfigSelectorProvider() override {
394     xds_client_.reset(DEBUG_LOCATION, "StaticXdsServerConfigSelectorProvider");
395   }
396 
Watch(std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher> watcher)397   absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch(
398       std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
399           watcher) override {
400     CHECK(watcher_ == nullptr);
401     watcher_ = std::move(watcher);
402     if (!static_resource_.ok()) {
403       return static_resource_.status();
404     }
405     return XdsServerConfigSelector::Create(
406         static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
407             .http_filter_registry(),
408         static_resource_.value(), http_filters_);
409   }
410 
CancelWatch()411   void CancelWatch() override { watcher_.reset(); }
412 
413  private:
Orphaned()414   void Orphaned() override {}
415 
416   RefCountedPtr<GrpcXdsClient> xds_client_;
417   absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
418       static_resource_;
419   // TODO(roth): Consider holding a ref to the LDS resource and storing
420   // a pointer to the HTTP filters within that LDS resource, rather than
421   // copying the HTTP filters here.
422   std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
423       http_filters_;
424   std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
425       watcher_;
426 };
427 
428 // An XdsServerConfigSelectorProvider implementation for when the
429 // RouteConfiguration is to be fetched separately via RDS.
430 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
431     DynamicXdsServerConfigSelectorProvider final
432     : public ServerConfigSelectorProvider {
433  public:
434   DynamicXdsServerConfigSelectorProvider(
435       RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
436       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
437           initial_resource,
438       std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
439           http_filters);
440 
~DynamicXdsServerConfigSelectorProvider()441   ~DynamicXdsServerConfigSelectorProvider() override {
442     xds_client_.reset(DEBUG_LOCATION, "DynamicXdsServerConfigSelectorProvider");
443   }
444 
445   absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch(
446       std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
447           watcher) override;
448   void CancelWatch() override;
449 
450  private:
451   class RouteConfigWatcher;
452 
453   void Orphaned() override;
454   void OnRouteConfigChanged(
455       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> rds_update);
456   void OnAmbientError(absl::Status status);
457 
458   RefCountedPtr<GrpcXdsClient> xds_client_;
459   std::string resource_name_;
460   // TODO(roth): Consider holding a ref to the LDS resource and storing
461   // a pointer to the HTTP filters within that LDS resource, rather than
462   // copying the HTTP filters here.
463   std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
464       http_filters_;
465   RouteConfigWatcher* route_config_watcher_ = nullptr;
466   Mutex mu_;
467   std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
468       watcher_ ABSL_GUARDED_BY(mu_);
469   absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> resource_
470       ABSL_GUARDED_BY(mu_);
471 };
472 
473 // A watcher implementation for updating the RDS resource used by
474 // DynamicXdsServerConfigSelectorProvider
475 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
476     DynamicXdsServerConfigSelectorProvider::RouteConfigWatcher final
477     : public XdsRouteConfigResourceType::WatcherInterface {
478  public:
RouteConfigWatcher(WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent)479   explicit RouteConfigWatcher(
480       WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent)
481       : parent_(std::move(parent)) {}
482 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config,RefCountedPtr<ReadDelayHandle>)483   void OnResourceChanged(
484       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
485           route_config,
486       RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
487     parent_->OnRouteConfigChanged(std::move(route_config));
488   }
489 
OnAmbientError(absl::Status status,RefCountedPtr<ReadDelayHandle>)490   void OnAmbientError(
491       absl::Status status,
492       RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
493     parent_->OnAmbientError(std::move(status));
494   }
495 
496  private:
497   WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent_;
498 };
499 
500 //
501 // XdsServerConfigFetcher
502 //
503 
XdsServerConfigFetcher(RefCountedPtr<GrpcXdsClient> xds_client,grpc_server_xds_status_notifier notifier)504 XdsServerConfigFetcher::XdsServerConfigFetcher(
505     RefCountedPtr<GrpcXdsClient> xds_client,
506     grpc_server_xds_status_notifier notifier)
507     : xds_client_(std::move(xds_client)), serving_status_notifier_(notifier) {
508   CHECK(xds_client_ != nullptr);
509 }
510 
ListenerResourceName(absl::string_view resource_name_template,absl::string_view listening_address)511 std::string ListenerResourceName(absl::string_view resource_name_template,
512                                  absl::string_view listening_address) {
513   std::string tmp;
514   if (absl::StartsWith(resource_name_template, "xdstp:")) {
515     tmp = URI::PercentEncodePath(listening_address);
516     listening_address = tmp;
517   }
518   return absl::StrReplaceAll(resource_name_template,
519                              {{"%s", listening_address}});
520 }
521 
StartWatch(std::string listening_address,std::unique_ptr<ServerConfigFetcher::WatcherInterface> watcher)522 void XdsServerConfigFetcher::StartWatch(
523     std::string listening_address,
524     std::unique_ptr<ServerConfigFetcher::WatcherInterface> watcher) {
525   ServerConfigFetcher::WatcherInterface* watcher_ptr = watcher.get();
526   auto listener_watcher = MakeRefCounted<ListenerWatcher>(
527       xds_client_.Ref(DEBUG_LOCATION, "ListenerWatcher"), std::move(watcher),
528       serving_status_notifier_, listening_address);
529   auto* listener_watcher_ptr = listener_watcher.get();
530   XdsListenerResourceType::StartWatch(
531       xds_client_.get(),
532       ListenerResourceName(
533           static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
534               .server_listener_resource_name_template(),
535           listening_address),
536       std::move(listener_watcher));
537   MutexLock lock(&mu_);
538   listener_watchers_.emplace(watcher_ptr, listener_watcher_ptr);
539 }
540 
CancelWatch(ServerConfigFetcher::WatcherInterface * watcher)541 void XdsServerConfigFetcher::CancelWatch(
542     ServerConfigFetcher::WatcherInterface* watcher) {
543   MutexLock lock(&mu_);
544   auto it = listener_watchers_.find(watcher);
545   if (it != listener_watchers_.end()) {
546     // Cancel the watch on the listener before erasing
547     XdsListenerResourceType::CancelWatch(
548         xds_client_.get(),
549         ListenerResourceName(
550             static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
551                 .server_listener_resource_name_template(),
552             it->second->listening_address()),
553         it->second, false /* delay_unsubscription */);
554     listener_watchers_.erase(it);
555   }
556 }
557 
558 //
559 // XdsServerConfigFetcher::ListenerWatcher
560 //
561 
ListenerWatcher(RefCountedPtr<GrpcXdsClient> xds_client,std::unique_ptr<ServerConfigFetcher::WatcherInterface> server_config_watcher,grpc_server_xds_status_notifier serving_status_notifier,std::string listening_address)562 XdsServerConfigFetcher::ListenerWatcher::ListenerWatcher(
563     RefCountedPtr<GrpcXdsClient> xds_client,
564     std::unique_ptr<ServerConfigFetcher::WatcherInterface>
565         server_config_watcher,
566     grpc_server_xds_status_notifier serving_status_notifier,
567     std::string listening_address)
568     : xds_client_(std::move(xds_client)),
569       server_config_watcher_(std::move(server_config_watcher)),
570       serving_status_notifier_(serving_status_notifier),
571       listening_address_(std::move(listening_address)) {}
572 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,RefCountedPtr<ReadDelayHandle>)573 void XdsServerConfigFetcher::ListenerWatcher::OnResourceChanged(
574     absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,
575     RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) {
576   if (!listener.ok()) {
577     MutexLock lock(&mu_);
578     OnFatalError(absl::Status(
579         listener.status().code(),
580         absl::StrCat("LDS resource: ", listener.status().message())));
581     return;
582   }
583   GRPC_TRACE_LOG(xds_server_config_fetcher, INFO)
584       << "[ListenerWatcher " << this << "] Received LDS update from xds client "
585       << xds_client_.get() << ": " << (*listener)->ToString();
586   auto* tcp_listener =
587       absl::get_if<XdsListenerResource::TcpListener>(&(*listener)->listener);
588   if (tcp_listener == nullptr) {
589     MutexLock lock(&mu_);
590     OnFatalError(
591         absl::FailedPreconditionError("LDS resource is not a TCP listener"));
592     return;
593   }
594   if (tcp_listener->address != listening_address_) {
595     MutexLock lock(&mu_);
596     OnFatalError(absl::FailedPreconditionError(
597         "Address in LDS update does not match listening address"));
598     return;
599   }
600   auto new_filter_chain_match_manager = MakeRefCounted<FilterChainMatchManager>(
601       xds_client_.Ref(DEBUG_LOCATION, "FilterChainMatchManager"),
602       tcp_listener->filter_chain_map, tcp_listener->default_filter_chain);
603   MutexLock lock(&mu_);
604   if (filter_chain_match_manager_ == nullptr ||
605       !(new_filter_chain_match_manager->filter_chain_map() ==
606             filter_chain_match_manager_->filter_chain_map() &&
607         new_filter_chain_match_manager->default_filter_chain() ==
608             filter_chain_match_manager_->default_filter_chain())) {
609     pending_filter_chain_match_manager_ =
610         std::move(new_filter_chain_match_manager);
611     pending_filter_chain_match_manager_->StartRdsWatch(
612         RefAsSubclass<ListenerWatcher>());
613   }
614 }
615 
OnAmbientError(absl::Status status,RefCountedPtr<ReadDelayHandle>)616 void XdsServerConfigFetcher::ListenerWatcher::OnAmbientError(
617     absl::Status status, RefCountedPtr<ReadDelayHandle> /*read_delay_handle*/) {
618   LOG(ERROR) << "ListenerWatcher:" << this
619              << " XdsClient reports ambient error: " << status << " for "
620              << listening_address_
621              << "; ignoring in favor of existing resource";
622 }
623 
OnFatalError(absl::Status status)624 void XdsServerConfigFetcher::ListenerWatcher::OnFatalError(
625     absl::Status status) {
626   pending_filter_chain_match_manager_.reset();
627   if (filter_chain_match_manager_ != nullptr) {
628     // The server has started listening already, so we need to gracefully
629     // stop serving.
630     server_config_watcher_->StopServing();
631     filter_chain_match_manager_.reset();
632   }
633   if (serving_status_notifier_.on_serving_status_update != nullptr) {
634     serving_status_notifier_.on_serving_status_update(
635         serving_status_notifier_.user_data, listening_address_.c_str(),
636         {static_cast<grpc_status_code>(status.raw_code()),
637          std::string(status.message()).c_str()});
638   } else {
639     LOG(ERROR) << "ListenerWatcher:" << this << " Encountered fatal error "
640                << status << "; not serving on " << listening_address_;
641   }
642 }
643 
644 void XdsServerConfigFetcher::ListenerWatcher::
PendingFilterChainMatchManagerReadyLocked(XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager * filter_chain_match_manager)645     PendingFilterChainMatchManagerReadyLocked(
646         XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager*
647             filter_chain_match_manager) {
648   if (pending_filter_chain_match_manager_ != filter_chain_match_manager) {
649     // This FilterChainMatchManager is no longer the current pending resource.
650     // It should get cleaned up eventually. Ignore this update.
651     return;
652   }
653   bool first_good_update = filter_chain_match_manager_ == nullptr;
654   // Promote the pending FilterChainMatchManager
655   filter_chain_match_manager_ = std::move(pending_filter_chain_match_manager_);
656   // TODO(yashykt): Right now, the server_config_watcher_ does not invoke
657   // XdsServerConfigFetcher while holding a lock, but that might change in the
658   // future in which case we would want to execute this update outside the
659   // critical region through a WorkSerializer similar to XdsClient.
660   server_config_watcher_->UpdateConnectionManager(filter_chain_match_manager_);
661   // Let the logger know about the update if there was no previous good update.
662   if (first_good_update) {
663     if (serving_status_notifier_.on_serving_status_update != nullptr) {
664       serving_status_notifier_.on_serving_status_update(
665           serving_status_notifier_.user_data, listening_address_.c_str(),
666           {GRPC_STATUS_OK, ""});
667     } else {
668       LOG(INFO) << "xDS Listener resource obtained; will start serving on "
669                 << listening_address_;
670     }
671   }
672 }
673 
674 //
675 // XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
676 //
677 
678 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
FilterChainMatchManager(RefCountedPtr<GrpcXdsClient> xds_client,XdsListenerResource::FilterChainMap filter_chain_map,absl::optional<XdsListenerResource::FilterChainData> default_filter_chain)679     FilterChainMatchManager(
680         RefCountedPtr<GrpcXdsClient> xds_client,
681         XdsListenerResource::FilterChainMap filter_chain_map,
682         absl::optional<XdsListenerResource::FilterChainData>
683             default_filter_chain)
684     : xds_client_(std::move(xds_client)),
685       filter_chain_map_(std::move(filter_chain_map)),
686       default_filter_chain_(std::move(default_filter_chain)) {}
687 
688 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher)689     StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher) {
690   // Get the set of RDS resources to watch on. Also get the set of
691   // FilterChainData so that we can reverse the list of HTTP filters since
692   // received data moves *up* the stack in Core.
693   std::set<std::string> resource_names;
694   std::set<XdsListenerResource::FilterChainData*> filter_chain_data_set;
695   for (const auto& destination_ip : filter_chain_map_.destination_ip_vector) {
696     for (const auto& source_type : destination_ip.source_types_array) {
697       for (const auto& source_ip : source_type) {
698         for (const auto& source_port_pair : source_ip.ports_map) {
699           auto* filter_chain_data = source_port_pair.second.data.get();
700           const auto* rds_name = absl::get_if<std::string>(
701               &filter_chain_data->http_connection_manager.route_config);
702           if (rds_name != nullptr) resource_names.insert(*rds_name);
703           filter_chain_data_set.insert(filter_chain_data);
704         }
705       }
706     }
707   }
708   if (default_filter_chain_.has_value()) {
709     auto& hcm = default_filter_chain_->http_connection_manager;
710     const auto* rds_name = absl::get_if<std::string>(&hcm.route_config);
711     if (rds_name != nullptr) resource_names.insert(*rds_name);
712     std::reverse(hcm.http_filters.begin(), hcm.http_filters.end());
713   }
714   // Reverse the lists of HTTP filters in all the filter chains
715   for (auto* filter_chain_data : filter_chain_data_set) {
716     auto& hcm = filter_chain_data->http_connection_manager;
717     std::reverse(hcm.http_filters.begin(), hcm.http_filters.end());
718   }
719   // Start watching on referenced RDS resources
720   struct WatcherToStart {
721     std::string resource_name;
722     RefCountedPtr<RouteConfigWatcher> watcher;
723   };
724   std::vector<WatcherToStart> watchers_to_start;
725   watchers_to_start.reserve(resource_names.size());
726   {
727     MutexLock lock(&mu_);
728     for (const auto& resource_name : resource_names) {
729       ++rds_resources_yet_to_fetch_;
730       auto route_config_watcher = MakeRefCounted<RouteConfigWatcher>(
731           resource_name, WeakRefAsSubclass<FilterChainMatchManager>());
732       rds_map_.emplace(resource_name, RdsUpdateState{route_config_watcher.get(),
733                                                      absl::nullopt});
734       watchers_to_start.push_back(
735           WatcherToStart{resource_name, std::move(route_config_watcher)});
736     }
737     if (rds_resources_yet_to_fetch_ != 0) {
738       listener_watcher_ = std::move(listener_watcher);
739       listener_watcher = nullptr;
740     }
741   }
742   for (auto& watcher_to_start : watchers_to_start) {
743     XdsRouteConfigResourceType::StartWatch(xds_client_.get(),
744                                            watcher_to_start.resource_name,
745                                            std::move(watcher_to_start.watcher));
746   }
747   // Promote this filter chain match manager if all referenced resources are
748   // fetched.
749   if (listener_watcher != nullptr) {
750     listener_watcher->PendingFilterChainMatchManagerReadyLocked(this);
751   }
752 }
753 
754 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Orphaned()755     Orphaned() {
756   MutexLock lock(&mu_);
757   // Cancel the RDS watches to clear up the weak refs
758   for (const auto& entry : rds_map_) {
759     XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), entry.first,
760                                             entry.second.watcher,
761                                             false /* delay_unsubscription */);
762   }
763   // Also give up the ref on ListenerWatcher since it won't be needed anymore
764   listener_watcher_.reset();
765 }
766 
767 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
768 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
CreateOrGetXdsCertificateProviderFromFilterChainData(const XdsListenerResource::FilterChainData * filter_chain)769     CreateOrGetXdsCertificateProviderFromFilterChainData(
770         const XdsListenerResource::FilterChainData* filter_chain) {
771   MutexLock lock(&mu_);
772   auto it = certificate_providers_map_.find(filter_chain);
773   if (it != certificate_providers_map_.end()) return it->second;
774   // Configure root cert.
775   auto* ca_cert_provider =
776       absl::get_if<CommonTlsContext::CertificateProviderPluginInstance>(
777           &filter_chain->downstream_tls_context.common_tls_context
778                .certificate_validation_context.ca_certs);
779   absl::string_view root_provider_cert_name;
780   RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
781   if (ca_cert_provider != nullptr) {
782     root_provider_cert_name = ca_cert_provider->certificate_name;
783     root_cert_provider =
784         xds_client_->certificate_provider_store()
785             .CreateOrGetCertificateProvider(ca_cert_provider->instance_name);
786     if (root_cert_provider == nullptr) {
787       return absl::NotFoundError(
788           absl::StrCat("Certificate provider instance name: \"",
789                        ca_cert_provider->instance_name, "\" not recognized."));
790     }
791   }
792   // Configure identity cert.
793   absl::string_view identity_provider_instance_name =
794       filter_chain->downstream_tls_context.common_tls_context
795           .tls_certificate_provider_instance.instance_name;
796   absl::string_view identity_provider_cert_name =
797       filter_chain->downstream_tls_context.common_tls_context
798           .tls_certificate_provider_instance.certificate_name;
799   RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
800   if (!identity_provider_instance_name.empty()) {
801     identity_cert_provider =
802         xds_client_->certificate_provider_store()
803             .CreateOrGetCertificateProvider(identity_provider_instance_name);
804     if (identity_cert_provider == nullptr) {
805       return absl::NotFoundError(
806           absl::StrCat("Certificate provider instance name: \"",
807                        identity_provider_instance_name, "\" not recognized."));
808     }
809   }
810   auto xds_cert_provider = MakeRefCounted<XdsCertificateProvider>(
811       std::move(root_cert_provider), root_provider_cert_name,
812       std::move(identity_cert_provider), identity_provider_cert_name,
813       filter_chain->downstream_tls_context.require_client_certificate);
814   certificate_providers_map_.emplace(filter_chain, xds_cert_provider);
815   return xds_cert_provider;
816 }
817 
818 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnRouteConfigChanged(const std::string & resource_name,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config)819     OnRouteConfigChanged(
820         const std::string& resource_name,
821         absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
822             route_config) {
823   RefCountedPtr<ListenerWatcher> listener_watcher;
824   {
825     MutexLock lock(&mu_);
826     auto& state = rds_map_[resource_name];
827     if (!state.rds_update.has_value()) {
828       if (--rds_resources_yet_to_fetch_ == 0) {
829         listener_watcher = std::move(listener_watcher_);
830       }
831     }
832     if (!route_config.ok()) {
833       route_config = absl::UnavailableError(
834           absl::StrCat("RDS resource ", resource_name, ": ",
835                        route_config.status().message()));
836     }
837     state.rds_update = std::move(route_config);
838   }
839   // Promote the filter chain match manager object if all the referenced
840   // resources are fetched.
841   if (listener_watcher != nullptr) {
842     listener_watcher->PendingFilterChainMatchManagerReady(this);
843   }
844 }
845 
846 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnAmbientError(const std::string & resource_name,absl::Status status)847     OnAmbientError(const std::string& resource_name, absl::Status status) {
848   LOG(ERROR) << "RouteConfigWatcher:" << this
849              << " XdsClient reports ambient error: " << status << " for "
850              << resource_name << "; ignoring in favor of existing resource";
851 }
852 
FindFilterChainDataForSourcePort(const XdsListenerResource::FilterChainMap::SourcePortsMap & source_ports_map,absl::string_view port_str)853 const XdsListenerResource::FilterChainData* FindFilterChainDataForSourcePort(
854     const XdsListenerResource::FilterChainMap::SourcePortsMap& source_ports_map,
855     absl::string_view port_str) {
856   int port = 0;
857   if (!absl::SimpleAtoi(port_str, &port)) return nullptr;
858   auto it = source_ports_map.find(port);
859   if (it != source_ports_map.end()) {
860     return it->second.data.get();
861   }
862   // Search for the catch-all port 0 since we didn't get a direct match
863   it = source_ports_map.find(0);
864   if (it != source_ports_map.end()) {
865     return it->second.data.get();
866   }
867   return nullptr;
868 }
869 
FindFilterChainDataForSourceIp(const XdsListenerResource::FilterChainMap::SourceIpVector & source_ip_vector,const grpc_resolved_address * source_ip,absl::string_view port)870 const XdsListenerResource::FilterChainData* FindFilterChainDataForSourceIp(
871     const XdsListenerResource::FilterChainMap::SourceIpVector& source_ip_vector,
872     const grpc_resolved_address* source_ip, absl::string_view port) {
873   const XdsListenerResource::FilterChainMap::SourceIp* best_match = nullptr;
874   for (const auto& entry : source_ip_vector) {
875     // Special case for catch-all
876     if (!entry.prefix_range.has_value()) {
877       if (best_match == nullptr) {
878         best_match = &entry;
879       }
880       continue;
881     }
882     if (best_match != nullptr && best_match->prefix_range.has_value() &&
883         best_match->prefix_range->prefix_len >=
884             entry.prefix_range->prefix_len) {
885       continue;
886     }
887     if (grpc_sockaddr_match_subnet(source_ip, &entry.prefix_range->address,
888                                    entry.prefix_range->prefix_len)) {
889       best_match = &entry;
890     }
891   }
892   if (best_match == nullptr) return nullptr;
893   return FindFilterChainDataForSourcePort(best_match->ports_map, port);
894 }
895 
IsLoopbackIp(const grpc_resolved_address * address)896 bool IsLoopbackIp(const grpc_resolved_address* address) {
897   const grpc_sockaddr* sock_addr =
898       reinterpret_cast<const grpc_sockaddr*>(&address->addr);
899   if (sock_addr->sa_family == GRPC_AF_INET) {
900     const grpc_sockaddr_in* addr4 =
901         reinterpret_cast<const grpc_sockaddr_in*>(sock_addr);
902     if (addr4->sin_addr.s_addr == grpc_htonl(INADDR_LOOPBACK)) {
903       return true;
904     }
905   } else if (sock_addr->sa_family == GRPC_AF_INET6) {
906     const grpc_sockaddr_in6* addr6 =
907         reinterpret_cast<const grpc_sockaddr_in6*>(sock_addr);
908     if (memcmp(&addr6->sin6_addr, &in6addr_loopback,
909                sizeof(in6addr_loopback)) == 0) {
910       return true;
911     }
912   }
913   return false;
914 }
915 
FindFilterChainDataForSourceType(const XdsListenerResource::FilterChainMap::ConnectionSourceTypesArray & source_types_array,grpc_endpoint * tcp,absl::string_view destination_ip)916 const XdsListenerResource::FilterChainData* FindFilterChainDataForSourceType(
917     const XdsListenerResource::FilterChainMap::ConnectionSourceTypesArray&
918         source_types_array,
919     grpc_endpoint* tcp, absl::string_view destination_ip) {
920   auto source_uri = URI::Parse(grpc_endpoint_get_peer(tcp));
921   if (!source_uri.ok() ||
922       (source_uri->scheme() != "ipv4" && source_uri->scheme() != "ipv6")) {
923     return nullptr;
924   }
925   std::string host;
926   std::string port;
927   if (!SplitHostPort(source_uri->path(), &host, &port)) {
928     return nullptr;
929   }
930   auto source_addr = StringToSockaddr(host, 0);  // Port doesn't matter here.
931   if (!source_addr.ok()) {
932     VLOG(2) << "Could not parse \"" << host
933             << "\" as socket address: " << source_addr.status();
934     return nullptr;
935   }
936   // Use kAny only if kSameIporLoopback and kExternal are empty
937   if (source_types_array[static_cast<int>(
938                              XdsListenerResource::FilterChainMap::
939                                  ConnectionSourceType::kSameIpOrLoopback)]
940           .empty() &&
941       source_types_array[static_cast<int>(XdsListenerResource::FilterChainMap::
942                                               ConnectionSourceType::kExternal)]
943           .empty()) {
944     return FindFilterChainDataForSourceIp(
945         source_types_array[static_cast<int>(
946             XdsListenerResource::FilterChainMap::ConnectionSourceType::kAny)],
947         &*source_addr, port);
948   }
949   if (IsLoopbackIp(&*source_addr) || host == destination_ip) {
950     return FindFilterChainDataForSourceIp(
951         source_types_array[static_cast<int>(
952             XdsListenerResource::FilterChainMap::ConnectionSourceType::
953                 kSameIpOrLoopback)],
954         &*source_addr, port);
955   } else {
956     return FindFilterChainDataForSourceIp(
957         source_types_array[static_cast<int>(
958             XdsListenerResource::FilterChainMap::ConnectionSourceType::
959                 kExternal)],
960         &*source_addr, port);
961   }
962 }
963 
FindFilterChainDataForDestinationIp(const XdsListenerResource::FilterChainMap::DestinationIpVector destination_ip_vector,grpc_endpoint * tcp)964 const XdsListenerResource::FilterChainData* FindFilterChainDataForDestinationIp(
965     const XdsListenerResource::FilterChainMap::DestinationIpVector
966         destination_ip_vector,
967     grpc_endpoint* tcp) {
968   auto destination_uri = URI::Parse(grpc_endpoint_get_local_address(tcp));
969   if (!destination_uri.ok() || (destination_uri->scheme() != "ipv4" &&
970                                 destination_uri->scheme() != "ipv6")) {
971     return nullptr;
972   }
973   std::string host;
974   std::string port;
975   if (!SplitHostPort(destination_uri->path(), &host, &port)) {
976     return nullptr;
977   }
978   auto destination_addr =
979       StringToSockaddr(host, 0);  // Port doesn't matter here.
980   if (!destination_addr.ok()) {
981     VLOG(2) << "Could not parse \"" << host
982             << "\" as socket address: " << destination_addr.status();
983     return nullptr;
984   }
985   const XdsListenerResource::FilterChainMap::DestinationIp* best_match =
986       nullptr;
987   for (const auto& entry : destination_ip_vector) {
988     // Special case for catch-all
989     if (!entry.prefix_range.has_value()) {
990       if (best_match == nullptr) {
991         best_match = &entry;
992       }
993       continue;
994     }
995     if (best_match != nullptr && best_match->prefix_range.has_value() &&
996         best_match->prefix_range->prefix_len >=
997             entry.prefix_range->prefix_len) {
998       continue;
999     }
1000     if (grpc_sockaddr_match_subnet(&*destination_addr,
1001                                    &entry.prefix_range->address,
1002                                    entry.prefix_range->prefix_len)) {
1003       best_match = &entry;
1004     }
1005   }
1006   if (best_match == nullptr) return nullptr;
1007   return FindFilterChainDataForSourceType(best_match->source_types_array, tcp,
1008                                           host);
1009 }
1010 
1011 absl::StatusOr<ChannelArgs> XdsServerConfigFetcher::ListenerWatcher::
UpdateChannelArgsForConnection(const ChannelArgs & input_args,grpc_endpoint * tcp)1012     FilterChainMatchManager::UpdateChannelArgsForConnection(
1013         const ChannelArgs& input_args, grpc_endpoint* tcp) {
1014   ChannelArgs args = input_args;
1015   const auto* filter_chain = FindFilterChainDataForDestinationIp(
1016       filter_chain_map_.destination_ip_vector, tcp);
1017   if (filter_chain == nullptr && default_filter_chain_.has_value()) {
1018     filter_chain = &default_filter_chain_.value();
1019   }
1020   if (filter_chain == nullptr) {
1021     return absl::UnavailableError("No matching filter chain found");
1022   }
1023   RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider;
1024   RefCountedPtr<XdsChannelStackModifier> channel_stack_modifier;
1025   RefCountedPtr<XdsCertificateProvider> xds_certificate_provider;
1026   // Iterate the list of HTTP filters in reverse since in Core, received data
1027   // flows *up* the stack.
1028   std::vector<const grpc_channel_filter*> filters;
1029   const auto& http_filter_registry =
1030       static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
1031           .http_filter_registry();
1032   for (const auto& http_filter :
1033        filter_chain->http_connection_manager.http_filters) {
1034     // Find filter.  This is guaranteed to succeed, because it's checked
1035     // at config validation time in the XdsApi code.
1036     const XdsHttpFilterImpl* filter_impl =
1037         http_filter_registry.GetFilterForType(
1038             http_filter.config.config_proto_type_name);
1039     CHECK_NE(filter_impl, nullptr);
1040     // Some filters like the router filter are no-op filters and do not have
1041     // an implementation.
1042     if (filter_impl->channel_filter() != nullptr) {
1043       filters.push_back(filter_impl->channel_filter());
1044     }
1045   }
1046   // Add config selector filter.
1047   filters.push_back(&kServerConfigSelectorFilter);
1048   channel_stack_modifier =
1049       MakeRefCounted<XdsChannelStackModifier>(std::move(filters));
1050   Match(
1051       filter_chain->http_connection_manager.route_config,
1052       // RDS resource name
1053       [&](const std::string& rds_name) {
1054         absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
1055             initial_resource;
1056         {
1057           MutexLock lock(&mu_);
1058           initial_resource = rds_map_[rds_name].rds_update.value();
1059         }
1060         server_config_selector_provider =
1061             MakeRefCounted<DynamicXdsServerConfigSelectorProvider>(
1062                 xds_client_.Ref(DEBUG_LOCATION,
1063                                 "DynamicXdsServerConfigSelectorProvider"),
1064                 rds_name, std::move(initial_resource),
1065                 filter_chain->http_connection_manager.http_filters);
1066       },
1067       // inline RouteConfig
1068       [&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
1069         server_config_selector_provider =
1070             MakeRefCounted<StaticXdsServerConfigSelectorProvider>(
1071                 xds_client_.Ref(DEBUG_LOCATION,
1072                                 "StaticXdsServerConfigSelectorProvider"),
1073                 route_config,
1074                 filter_chain->http_connection_manager.http_filters);
1075       });
1076   args = args.SetObject(server_config_selector_provider)
1077              .SetObject(channel_stack_modifier);
1078   // Add XdsCertificateProvider if credentials are xDS.
1079   auto* server_creds = args.GetObject<grpc_server_credentials>();
1080   if (server_creds != nullptr &&
1081       server_creds->type() == XdsServerCredentials::Type()) {
1082     absl::StatusOr<RefCountedPtr<XdsCertificateProvider>> result =
1083         CreateOrGetXdsCertificateProviderFromFilterChainData(filter_chain);
1084     if (!result.ok()) {
1085       return result.status();
1086     }
1087     xds_certificate_provider = std::move(*result);
1088     CHECK(xds_certificate_provider != nullptr);
1089     args = args.SetObject(xds_certificate_provider);
1090   }
1091   return args;
1092 }
1093 
1094 //
1095 // XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::XdsServerConfigSelector
1096 //
1097 
1098 absl::StatusOr<
1099     RefCountedPtr<XdsServerConfigFetcher::ListenerWatcher::
1100                       FilterChainMatchManager::XdsServerConfigSelector>>
1101 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Create(const XdsHttpFilterRegistry & http_filter_registry,std::shared_ptr<const XdsRouteConfigResource> rds_update,const std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> & http_filters)1102     XdsServerConfigSelector::Create(
1103         const XdsHttpFilterRegistry& http_filter_registry,
1104         std::shared_ptr<const XdsRouteConfigResource> rds_update,
1105         const std::vector<
1106             XdsListenerResource::HttpConnectionManager::HttpFilter>&
1107             http_filters) {
1108   auto config_selector = MakeRefCounted<XdsServerConfigSelector>();
1109   for (auto& vhost : rds_update->virtual_hosts) {
1110     config_selector->virtual_hosts_.emplace_back();
1111     auto& virtual_host = config_selector->virtual_hosts_.back();
1112     virtual_host.domains = vhost.domains;
1113     for (auto& route : vhost.routes) {
1114       virtual_host.routes.emplace_back();
1115       auto& config_selector_route = virtual_host.routes.back();
1116       config_selector_route.matchers = route.matchers;
1117       config_selector_route.unsupported_action =
1118           absl::get_if<XdsRouteConfigResource::Route::NonForwardingAction>(
1119               &route.action) == nullptr;
1120       auto result = XdsRouting::GeneratePerHTTPFilterConfigsForMethodConfig(
1121           http_filter_registry, http_filters, vhost, route, nullptr,
1122           ChannelArgs());
1123       if (!result.ok()) return result.status();
1124       std::vector<std::string> fields;
1125       fields.reserve(result->per_filter_configs.size());
1126       for (const auto& p : result->per_filter_configs) {
1127         fields.emplace_back(absl::StrCat("    \"", p.first, "\": [\n",
1128                                          absl::StrJoin(p.second, ",\n"),
1129                                          "\n    ]"));
1130       }
1131       if (!fields.empty()) {
1132         std::string json = absl::StrCat(
1133             "{\n"
1134             "  \"methodConfig\": [ {\n"
1135             "    \"name\": [\n"
1136             "      {}\n"
1137             "    ],\n"
1138             "    ",
1139             absl::StrJoin(fields, ",\n"),
1140             "\n  } ]\n"
1141             "}");
1142         config_selector_route.method_config =
1143             ServiceConfigImpl::Create(result->args, json.c_str()).value();
1144       }
1145     }
1146   }
1147   return config_selector;
1148 }
1149 
1150 absl::StatusOr<ServerConfigSelector::CallConfig>
1151 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
GetCallConfig(grpc_metadata_batch * metadata)1152     XdsServerConfigSelector::GetCallConfig(grpc_metadata_batch* metadata) {
1153   CallConfig call_config;
1154   if (metadata->get_pointer(HttpPathMetadata()) == nullptr) {
1155     return absl::InternalError("no path found");
1156   }
1157   absl::string_view path =
1158       metadata->get_pointer(HttpPathMetadata())->as_string_view();
1159   if (metadata->get_pointer(HttpAuthorityMetadata()) == nullptr) {
1160     return absl::InternalError("no authority found");
1161   }
1162   absl::string_view authority =
1163       metadata->get_pointer(HttpAuthorityMetadata())->as_string_view();
1164   auto vhost_index = XdsRouting::FindVirtualHostForDomain(
1165       VirtualHostListIterator(&virtual_hosts_), authority);
1166   if (!vhost_index.has_value()) {
1167     return absl::UnavailableError(
1168         absl::StrCat("could not find VirtualHost for ", authority,
1169                      " in RouteConfiguration"));
1170   }
1171   auto& virtual_host = virtual_hosts_[vhost_index.value()];
1172   auto route_index = XdsRouting::GetRouteForRequest(
1173       VirtualHost::RouteListIterator(&virtual_host.routes), path, metadata);
1174   if (route_index.has_value()) {
1175     auto& route = virtual_host.routes[route_index.value()];
1176     // Found the matching route
1177     if (route.unsupported_action) {
1178       return absl::UnavailableError("matching route has unsupported action");
1179     }
1180     if (route.method_config != nullptr) {
1181       call_config.method_configs =
1182           route.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
1183       call_config.service_config = route.method_config;
1184     }
1185     return call_config;
1186   }
1187   return absl::UnavailableError("no route matched");
1188 }
1189 
1190 //
1191 // XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::DynamicXdsServerConfigSelectorProvider
1192 //
1193 
1194 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
1195     DynamicXdsServerConfigSelectorProvider::
DynamicXdsServerConfigSelectorProvider(RefCountedPtr<GrpcXdsClient> xds_client,std::string resource_name,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> initial_resource,std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> http_filters)1196         DynamicXdsServerConfigSelectorProvider(
1197             RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
1198             absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
1199                 initial_resource,
1200             std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
1201                 http_filters)
1202     : xds_client_(std::move(xds_client)),
1203       resource_name_(std::move(resource_name)),
1204       http_filters_(std::move(http_filters)),
1205       resource_(std::move(initial_resource)) {
1206   CHECK(!resource_name_.empty());
1207   // RouteConfigWatcher is being created here instead of in Watch() to avoid
1208   // deadlocks from invoking XdsRouteConfigResourceType::StartWatch whilst in a
1209   // critical region.
1210   auto route_config_watcher = MakeRefCounted<RouteConfigWatcher>(
1211       WeakRefAsSubclass<DynamicXdsServerConfigSelectorProvider>());
1212   route_config_watcher_ = route_config_watcher.get();
1213   XdsRouteConfigResourceType::StartWatch(xds_client_.get(), resource_name_,
1214                                          std::move(route_config_watcher));
1215 }
1216 
1217 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Orphaned()1218     DynamicXdsServerConfigSelectorProvider::Orphaned() {
1219   XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), resource_name_,
1220                                           route_config_watcher_,
1221                                           false /* delay_unsubscription */);
1222 }
1223 
1224 absl::StatusOr<RefCountedPtr<ServerConfigSelector>>
1225 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Watch(std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher> watcher)1226     DynamicXdsServerConfigSelectorProvider::Watch(
1227         std::unique_ptr<
1228             ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
1229             watcher) {
1230   absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> resource;
1231   {
1232     MutexLock lock(&mu_);
1233     CHECK(watcher_ == nullptr);
1234     watcher_ = std::move(watcher);
1235     resource = resource_;
1236   }
1237   if (!resource.ok()) {
1238     return resource.status();
1239   }
1240   return XdsServerConfigSelector::Create(
1241       static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
1242           .http_filter_registry(),
1243       resource.value(), http_filters_);
1244 }
1245 
1246 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
CancelWatch()1247     DynamicXdsServerConfigSelectorProvider::CancelWatch() {
1248   MutexLock lock(&mu_);
1249   watcher_.reset();
1250 }
1251 
1252 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnRouteConfigChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> rds_update)1253     DynamicXdsServerConfigSelectorProvider::OnRouteConfigChanged(
1254         absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
1255             rds_update) {
1256   MutexLock lock(&mu_);
1257   if (!rds_update.ok()) {
1258     rds_update = absl::UnavailableError(absl::StrCat(
1259         "RDS resource ", resource_name_, ": ", rds_update.status().message()));
1260   }
1261   resource_ = std::move(rds_update);
1262   if (watcher_ == nullptr) {
1263     return;
1264   }
1265   // Currently server_config_selector_filter does not call into
1266   // DynamicXdsServerConfigSelectorProvider while holding a lock, but if that
1267   // ever changes, we would want to invoke the update outside the critical
1268   // region with the use of a WorkSerializer.
1269   if (!resource_.ok()) {
1270     watcher_->OnServerConfigSelectorUpdate(resource_.status());
1271   } else {
1272     watcher_->OnServerConfigSelectorUpdate(XdsServerConfigSelector::Create(
1273         static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
1274             .http_filter_registry(),
1275         *resource_, http_filters_));
1276   }
1277 }
1278 
1279 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnAmbientError(absl::Status status)1280     DynamicXdsServerConfigSelectorProvider::OnAmbientError(
1281         absl::Status status) {
1282   LOG(ERROR) << "RouteConfigWatcher:" << this
1283              << " XdsClient reports ambient error: " << status << " for "
1284              << resource_name_ << "; ignoring in favor of existing resource";
1285 }
1286 
1287 }  // namespace
1288 }  // namespace grpc_core
1289 
grpc_server_config_fetcher_xds_create(grpc_server_xds_status_notifier notifier,const grpc_channel_args * args)1290 grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
1291     grpc_server_xds_status_notifier notifier, const grpc_channel_args* args) {
1292   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1293   grpc_core::ExecCtx exec_ctx;
1294   grpc_core::ChannelArgs channel_args = grpc_core::CoreConfiguration::Get()
1295                                             .channel_args_preconditioning()
1296                                             .PreconditionChannelArgs(args);
1297   GRPC_TRACE_LOG(api, INFO)
1298       << "grpc_server_config_fetcher_xds_create(notifier={on_serving_status_"
1299          "update="
1300       << notifier.on_serving_status_update
1301       << ", user_data=" << notifier.user_data << "}, args=" << args << ")";
1302   auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(
1303       grpc_core::GrpcXdsClient::kServerKey, channel_args,
1304       "XdsServerConfigFetcher");
1305   if (!xds_client.ok()) {
1306     LOG(ERROR) << "Failed to create xds client: " << xds_client.status();
1307     return nullptr;
1308   }
1309   if (static_cast<const grpc_core::GrpcXdsBootstrap&>(
1310           (*xds_client)->bootstrap())
1311           .server_listener_resource_name_template()
1312           .empty()) {
1313     LOG(ERROR) << "server_listener_resource_name_template not provided in "
1314                   "bootstrap file.";
1315     return nullptr;
1316   }
1317   return (new grpc_core::XdsServerConfigFetcher(std::move(*xds_client),
1318                                                 notifier))
1319       ->c_ptr();
1320 }
1321