1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/credentials.h>
20 #include <grpc/grpc.h>
21 #include <grpc/grpc_security.h>
22 #include <grpc/slice.h>
23 #include <grpc/status.h>
24 #include <grpc/support/port_platform.h>
25 #include <string.h>
26
27 #include <algorithm>
28 #include <map>
29 #include <memory>
30 #include <set>
31 #include <string>
32 #include <utility>
33 #include <vector>
34
35 #include "absl/base/thread_annotations.h"
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/status/status.h"
39 #include "absl/status/statusor.h"
40 #include "absl/strings/match.h"
41 #include "absl/strings/numbers.h"
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_join.h"
44 #include "absl/strings/str_replace.h"
45 #include "absl/strings/string_view.h"
46 #include "absl/types/optional.h"
47 #include "absl/types/variant.h"
48 #include "src/core/config/core_configuration.h"
49 #include "src/core/lib/address_utils/parse_address.h"
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_args_preconditioning.h"
53 #include "src/core/lib/channel/channel_fwd.h"
54 #include "src/core/lib/debug/trace.h"
55 #include "src/core/lib/iomgr/endpoint.h"
56 #include "src/core/lib/iomgr/exec_ctx.h"
57 #include "src/core/lib/iomgr/iomgr_fwd.h"
58 #include "src/core/lib/iomgr/resolved_address.h"
59 #include "src/core/lib/iomgr/sockaddr.h"
60 #include "src/core/lib/iomgr/socket_utils.h"
61 #include "src/core/lib/security/credentials/credentials.h"
62 #include "src/core/lib/security/credentials/tls/grpc_tls_certificate_distributor.h"
63 #include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h"
64 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/server/server.h"
67 #include "src/core/server/server_config_selector.h"
68 #include "src/core/server/server_config_selector_filter.h"
69 #include "src/core/server/xds_channel_stack_modifier.h"
70 #include "src/core/service_config/service_config.h"
71 #include "src/core/service_config/service_config_impl.h"
72 #include "src/core/util/debug_location.h"
73 #include "src/core/util/host_port.h"
74 #include "src/core/util/match.h"
75 #include "src/core/util/ref_counted_ptr.h"
76 #include "src/core/util/sync.h"
77 #include "src/core/util/unique_type_name.h"
78 #include "src/core/util/uri.h"
79 #include "src/core/xds/grpc/certificate_provider_store.h"
80 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
81 #include "src/core/xds/grpc/xds_certificate_provider.h"
82 #include "src/core/xds/grpc/xds_client_grpc.h"
83 #include "src/core/xds/grpc/xds_common_types.h"
84 #include "src/core/xds/grpc/xds_http_filter.h"
85 #include "src/core/xds/grpc/xds_http_filter_registry.h"
86 #include "src/core/xds/grpc/xds_listener.h"
87 #include "src/core/xds/grpc/xds_listener_parser.h"
88 #include "src/core/xds/grpc/xds_route_config.h"
89 #include "src/core/xds/grpc/xds_route_config_parser.h"
90 #include "src/core/xds/grpc/xds_routing.h"
91 #include "src/core/xds/xds_client/xds_client.h"
92
93 namespace grpc_core {
94 namespace {
95
96 using ReadDelayHandle = XdsClient::ReadDelayHandle;
97
// A server config fetcher that fetches the information for configuring server
// listeners from the xDS control plane.
//
// Each StartWatch() call creates one ListenerWatcher and registers it with the
// XdsClient; listener_watchers_ remembers which ListenerWatcher belongs to
// which server-side watcher so that CancelWatch() can cancel the xDS watch.
class XdsServerConfigFetcher final : public ServerConfigFetcher {
 public:
  // Takes over the given XdsClient ref (the constructor CHECKs that it is
  // non-null) and stores `notifier`, which is handed to every ListenerWatcher
  // created by StartWatch().
  XdsServerConfigFetcher(RefCountedPtr<GrpcXdsClient> xds_client,
                         grpc_server_xds_status_notifier notifier);

  ~XdsServerConfigFetcher() override {
    // Drop the XdsClient ref explicitly so the release is tagged with a debug
    // location and reason string.
    xds_client_.reset(DEBUG_LOCATION, "XdsServerConfigFetcher");
  }

  // Starts an LDS watch for `listening_address`. `watcher` is moved into the
  // ListenerWatcher that is created for this address.
  void StartWatch(
      std::string listening_address,
      std::unique_ptr<ServerConfigFetcher::WatcherInterface> watcher) override;

  // Cancels the LDS watch previously started for `watcher`; does nothing if
  // `watcher` is not registered.
  void CancelWatch(ServerConfigFetcher::WatcherInterface* watcher) override;

  // Return the interested parties from the xds client so that it can be polled.
  grpc_pollset_set* interested_parties() override {
    return xds_client_->interested_parties();
  }

 private:
  class ListenerWatcher;

  RefCountedPtr<GrpcXdsClient> xds_client_;
  const grpc_server_xds_status_notifier serving_status_notifier_;
  Mutex mu_;
  // Maps the server-side watcher to the ListenerWatcher wrapping it. Both
  // sides are raw pointers: the ListenerWatcher ref is handed to
  // XdsListenerResourceType::StartWatch() in StartWatch().
  std::map<ServerConfigFetcher::WatcherInterface*, ListenerWatcher*>
      listener_watchers_ ABSL_GUARDED_BY(mu_);
};
129
// A watcher implementation for listening on LDS updates from the xDS control
// plane. When a good LDS update is received, it creates a
// FilterChainMatchManager object that would replace the existing (if any)
// FilterChainMatchManager object after all referenced RDS resources are
// fetched. Note that a good update also causes the server listener to start
// listening if it isn't already. If an error LDS update is received (NACKed
// resource, timeouts), the previous good FilterChainMatchManager, if any,
// continues to be used. If there isn't any previous good update or if the
// update received was a fatal error (resource does not exist), the server
// listener is made to stop listening.
//
// In terms of the methods below: OnAmbientError() only logs, while a non-OK
// status delivered through OnResourceChanged() is routed to OnFatalError().
class XdsServerConfigFetcher::ListenerWatcher final
    : public XdsListenerResourceType::WatcherInterface {
 public:
  // All arguments are moved/copied into the corresponding members; the
  // constructor does no other work.
  ListenerWatcher(RefCountedPtr<GrpcXdsClient> xds_client,
                  std::unique_ptr<ServerConfigFetcher::WatcherInterface>
                      server_config_watcher,
                  grpc_server_xds_status_notifier serving_status_notifier,
                  std::string listening_address);

  ~ListenerWatcher() override {
    // Release the XdsClient ref with a debug location and reason tag.
    xds_client_.reset(DEBUG_LOCATION, "ListenerWatcher");
  }

  // Handles a new LDS resource, or a non-OK status in its place.
  void OnResourceChanged(
      absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,
      RefCountedPtr<ReadDelayHandle> read_delay_handle) override;

  // Handles an error that does not invalidate the resource currently in use.
  void OnAmbientError(
      absl::Status status,
      RefCountedPtr<ReadDelayHandle> read_delay_handle) override;

  // The address this watcher was created for; immutable after construction.
  const std::string& listening_address() const { return listening_address_; }

 private:
  class FilterChainMatchManager;

  // Drops the pending and current FilterChainMatchManager, stops serving if a
  // manager was in use, and reports `status` through the notifier (or the
  // log when no notifier callback is set).
  void OnFatalError(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Invoked by FilterChainMatchManager that is done fetching all referenced RDS
  // resources. If the calling FilterChainMatchManager is the
  // pending_filter_chain_match_manager_, it is promoted to be the
  // filter_chain_match_manager_ in use.
  void PendingFilterChainMatchManagerReady(
      FilterChainMatchManager* filter_chain_match_manager) {
    // Acquires mu_ and delegates to the ...Locked() variant.
    MutexLock lock(&mu_);
    PendingFilterChainMatchManagerReadyLocked(filter_chain_match_manager);
  }
  // Same as above, for callers that already hold mu_.
  void PendingFilterChainMatchManagerReadyLocked(
      FilterChainMatchManager* filter_chain_match_manager)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);

  RefCountedPtr<GrpcXdsClient> xds_client_;
  // The server-side watcher that is told to update its connection manager or
  // to stop serving.
  const std::unique_ptr<ServerConfigFetcher::WatcherInterface>
      server_config_watcher_;
  const grpc_server_xds_status_notifier serving_status_notifier_;
  const std::string listening_address_;
  Mutex mu_;
  // The manager currently handed to server_config_watcher_, if any.
  RefCountedPtr<FilterChainMatchManager> filter_chain_match_manager_
      ABSL_GUARDED_BY(mu_);
  // The manager built from the latest LDS update that has not been promoted
  // yet.
  RefCountedPtr<FilterChainMatchManager> pending_filter_chain_match_manager_
      ABSL_GUARDED_BY(mu_);
};
192
// A connection manager used by the server listener code to inject channel args
// to be used for each incoming connection. This implementation chooses the
// appropriate filter chain from the xDS Listener resource and injects channel
// args that configure the right mTLS certs and cause the right set of HTTP
// filters to be injected.
class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager final
    : public ServerConfigFetcher::ConnectionManager {
 public:
  // Stores its own copies of the filter chain map and the optional default
  // filter chain (both are taken by value and moved into members).
  FilterChainMatchManager(RefCountedPtr<GrpcXdsClient> xds_client,
                          XdsListenerResource::FilterChainMap filter_chain_map,
                          absl::optional<XdsListenerResource::FilterChainData>
                              default_filter_chain);

  ~FilterChainMatchManager() override {
    // Release the XdsClient ref with a debug location and reason tag.
    xds_client_.reset(DEBUG_LOCATION, "FilterChainMatchManager");
  }

  // ConnectionManager hook, defined out of line; per the class comment it
  // produces the channel args used for an incoming connection on `tcp`.
  absl::StatusOr<ChannelArgs> UpdateChannelArgsForConnection(
      const ChannelArgs& args, grpc_endpoint* tcp) override;

  // Invoked by ListenerWatcher to start fetching referenced RDS resources.
  // Must be called with the ListenerWatcher's mutex held. Note that it also
  // reverses the HTTP filter lists stored in this object in place.
  void StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ListenerWatcher::mu_);

  // Read-only access to the stored filter chain map.
  const XdsListenerResource::FilterChainMap& filter_chain_map() const {
    return filter_chain_map_;
  }

  // Read-only access to the stored default filter chain, if any.
  const absl::optional<XdsListenerResource::FilterChainData>&
  default_filter_chain() const {
    return default_filter_chain_;
  }

 private:
  class RouteConfigWatcher;
  // Per-RDS-resource bookkeeping kept in rds_map_.
  struct RdsUpdateState {
    // Raw pointer to the watcher registered with the XdsClient; used to
    // cancel the watch in Orphaned().
    RouteConfigWatcher* watcher;
    // Initialized to nullopt when the watch is created in StartRdsWatch().
    absl::optional<
        absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>>
        rds_update;
  };

  class XdsServerConfigSelector;
  class StaticXdsServerConfigSelectorProvider;
  class DynamicXdsServerConfigSelectorProvider;

  // Defined out of line. NOTE(review): presumably backed by
  // certificate_providers_map_ as a per-filter-chain cache -- confirm against
  // the definition.
  absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
  CreateOrGetXdsCertificateProviderFromFilterChainData(
      const XdsListenerResource::FilterChainData* filter_chain);
  // Cancels all RDS watches and drops the ListenerWatcher ref.
  void Orphaned() override;

  // Helper functions invoked by RouteConfigWatcher when there are updates to
  // RDS resources.
  void OnRouteConfigChanged(
      const std::string& resource_name,
      absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
          route_config);
  void OnAmbientError(const std::string& resource_name, absl::Status status);

  RefCountedPtr<GrpcXdsClient> xds_client_;
  // This ref is only kept around till the FilterChainMatchManager becomes
  // ready.
  RefCountedPtr<ListenerWatcher> listener_watcher_;
  // TODO(roth): Consider holding a ref to the LDS resource and storing
  // a pointer to the filter chain data within that LDS resource, rather
  // than copying the filter chain data here.
  XdsListenerResource::FilterChainMap filter_chain_map_;
  absl::optional<XdsListenerResource::FilterChainData> default_filter_chain_;
  Mutex mu_;
  // Incremented once per referenced RDS resource in StartRdsWatch().
  size_t rds_resources_yet_to_fetch_ ABSL_GUARDED_BY(mu_) = 0;
  std::map<std::string /* resource_name */, RdsUpdateState> rds_map_
      ABSL_GUARDED_BY(mu_);
  // Certificate providers keyed by the address of the filter chain data they
  // belong to.
  std::map<const XdsListenerResource::FilterChainData*,
           RefCountedPtr<XdsCertificateProvider>>
      certificate_providers_map_ ABSL_GUARDED_BY(mu_);
};
269
270 // A watcher implementation for listening on RDS updates referenced to by a
271 // FilterChainMatchManager object. After all referenced RDS resources are
272 // fetched (errors are allowed), the FilterChainMatchManager tries to replace
273 // the current object. The watcher continues to update the referenced RDS
274 // resources so that new XdsServerConfigSelectorProvider objects are created
275 // with the latest updates and new connections do not need to wait for the RDS
276 // resources to be fetched.
277 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
278 RouteConfigWatcher final
279 : public XdsRouteConfigResourceType::WatcherInterface {
280 public:
RouteConfigWatcher(std::string resource_name,WeakRefCountedPtr<FilterChainMatchManager> filter_chain_match_manager)281 RouteConfigWatcher(
282 std::string resource_name,
283 WeakRefCountedPtr<FilterChainMatchManager> filter_chain_match_manager)
284 : resource_name_(std::move(resource_name)),
285 filter_chain_match_manager_(std::move(filter_chain_match_manager)) {}
286
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config,RefCountedPtr<ReadDelayHandle>)287 void OnResourceChanged(
288 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
289 route_config,
290 RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
291 filter_chain_match_manager_->OnRouteConfigChanged(resource_name_,
292 std::move(route_config));
293 }
294
OnAmbientError(absl::Status status,RefCountedPtr<ReadDelayHandle>)295 void OnAmbientError(
296 absl::Status status,
297 RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
298 filter_chain_match_manager_->OnAmbientError(resource_name_,
299 std::move(status));
300 }
301
302 private:
303 std::string resource_name_;
304 WeakRefCountedPtr<FilterChainMatchManager> filter_chain_match_manager_;
305 };
306
307 // An implementation of ServerConfigSelector used by
308 // StaticXdsServerConfigSelectorProvider and
309 // DynamicXdsServerConfigSelectorProvider to parse the RDS update and get
310 // per-call configuration based on incoming metadata.
311 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
312 XdsServerConfigSelector final : public ServerConfigSelector {
313 public:
314 static absl::StatusOr<RefCountedPtr<XdsServerConfigSelector>> Create(
315 const XdsHttpFilterRegistry& http_filter_registry,
316 std::shared_ptr<const XdsRouteConfigResource> rds_update,
317 const std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>&
318 http_filters);
319 ~XdsServerConfigSelector() override = default;
320
321 absl::StatusOr<CallConfig> GetCallConfig(
322 grpc_metadata_batch* metadata) override;
323
324 private:
325 struct VirtualHost {
326 struct Route {
327 // true if an action other than kNonForwardingAction is configured.
328 bool unsupported_action;
329 // TODO(roth): Consider holding a ref to the RDS resource and storing
330 // a pointer to the matchers within that RDS resource, rather than
331 // copying the matchers here.
332 XdsRouteConfigResource::Route::Matchers matchers;
333 RefCountedPtr<ServiceConfig> method_config;
334 };
335
336 class RouteListIterator final : public XdsRouting::RouteListIterator {
337 public:
RouteListIterator(const std::vector<Route> * routes)338 explicit RouteListIterator(const std::vector<Route>* routes)
339 : routes_(routes) {}
340
Size() const341 size_t Size() const override { return routes_->size(); }
342
GetMatchersForRoute(size_t index) const343 const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
344 size_t index) const override {
345 return (*routes_)[index].matchers;
346 }
347
348 private:
349 const std::vector<Route>* routes_;
350 };
351
352 std::vector<std::string> domains;
353 std::vector<Route> routes;
354 };
355
356 class VirtualHostListIterator final
357 : public XdsRouting::VirtualHostListIterator {
358 public:
VirtualHostListIterator(const std::vector<VirtualHost> * virtual_hosts)359 explicit VirtualHostListIterator(
360 const std::vector<VirtualHost>* virtual_hosts)
361 : virtual_hosts_(virtual_hosts) {}
362
Size() const363 size_t Size() const override { return virtual_hosts_->size(); }
364
GetDomainsForVirtualHost(size_t index) const365 const std::vector<std::string>& GetDomainsForVirtualHost(
366 size_t index) const override {
367 return (*virtual_hosts_)[index].domains;
368 }
369
370 private:
371 const std::vector<VirtualHost>* virtual_hosts_;
372 };
373
374 std::vector<VirtualHost> virtual_hosts_;
375 };
376
377 // An XdsServerConfigSelectorProvider implementation for when the
378 // RouteConfiguration is available inline.
379 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
380 StaticXdsServerConfigSelectorProvider final
381 : public ServerConfigSelectorProvider {
382 public:
StaticXdsServerConfigSelectorProvider(RefCountedPtr<GrpcXdsClient> xds_client,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> static_resource,std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> http_filters)383 StaticXdsServerConfigSelectorProvider(
384 RefCountedPtr<GrpcXdsClient> xds_client,
385 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
386 static_resource,
387 std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
388 http_filters)
389 : xds_client_(std::move(xds_client)),
390 static_resource_(std::move(static_resource)),
391 http_filters_(std::move(http_filters)) {}
392
~StaticXdsServerConfigSelectorProvider()393 ~StaticXdsServerConfigSelectorProvider() override {
394 xds_client_.reset(DEBUG_LOCATION, "StaticXdsServerConfigSelectorProvider");
395 }
396
Watch(std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher> watcher)397 absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch(
398 std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
399 watcher) override {
400 CHECK(watcher_ == nullptr);
401 watcher_ = std::move(watcher);
402 if (!static_resource_.ok()) {
403 return static_resource_.status();
404 }
405 return XdsServerConfigSelector::Create(
406 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
407 .http_filter_registry(),
408 static_resource_.value(), http_filters_);
409 }
410
CancelWatch()411 void CancelWatch() override { watcher_.reset(); }
412
413 private:
Orphaned()414 void Orphaned() override {}
415
416 RefCountedPtr<GrpcXdsClient> xds_client_;
417 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
418 static_resource_;
419 // TODO(roth): Consider holding a ref to the LDS resource and storing
420 // a pointer to the HTTP filters within that LDS resource, rather than
421 // copying the HTTP filters here.
422 std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
423 http_filters_;
424 std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
425 watcher_;
426 };
427
// An XdsServerConfigSelectorProvider implementation for when the
// RouteConfiguration is to be fetched separately via RDS.
//
// All non-trivial members are defined out of line.
class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
    DynamicXdsServerConfigSelectorProvider final
    : public ServerConfigSelectorProvider {
 public:
  // `resource_name` names the RDS resource; `initial_resource` is the resource
  // (or error) known at construction time.
  DynamicXdsServerConfigSelectorProvider(
      RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
      absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
          initial_resource,
      std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
          http_filters);

  ~DynamicXdsServerConfigSelectorProvider() override {
    // Release the XdsClient ref with a debug location and reason tag.
    xds_client_.reset(DEBUG_LOCATION, "DynamicXdsServerConfigSelectorProvider");
  }

  // ServerConfigSelectorProvider interface.
  absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch(
      std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
          watcher) override;
  void CancelWatch() override;

 private:
  class RouteConfigWatcher;

  void Orphaned() override;
  // Callbacks invoked by the nested RouteConfigWatcher.
  void OnRouteConfigChanged(
      absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> rds_update);
  void OnAmbientError(absl::Status status);

  RefCountedPtr<GrpcXdsClient> xds_client_;
  std::string resource_name_;
  // TODO(roth): Consider holding a ref to the LDS resource and storing
  // a pointer to the HTTP filters within that LDS resource, rather than
  // copying the HTTP filters here.
  std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
      http_filters_;
  // Raw pointer, not guarded by mu_. NOTE(review): presumably the watcher
  // registered with the XdsClient for resource_name_ -- confirm against the
  // out-of-line definitions.
  RouteConfigWatcher* route_config_watcher_ = nullptr;
  Mutex mu_;
  std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
      watcher_ ABSL_GUARDED_BY(mu_);
  absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> resource_
      ABSL_GUARDED_BY(mu_);
};
472
473 // A watcher implementation for updating the RDS resource used by
474 // DynamicXdsServerConfigSelectorProvider
475 class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
476 DynamicXdsServerConfigSelectorProvider::RouteConfigWatcher final
477 : public XdsRouteConfigResourceType::WatcherInterface {
478 public:
RouteConfigWatcher(WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent)479 explicit RouteConfigWatcher(
480 WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent)
481 : parent_(std::move(parent)) {}
482
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config,RefCountedPtr<ReadDelayHandle>)483 void OnResourceChanged(
484 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
485 route_config,
486 RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
487 parent_->OnRouteConfigChanged(std::move(route_config));
488 }
489
OnAmbientError(absl::Status status,RefCountedPtr<ReadDelayHandle>)490 void OnAmbientError(
491 absl::Status status,
492 RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) override {
493 parent_->OnAmbientError(std::move(status));
494 }
495
496 private:
497 WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent_;
498 };
499
500 //
501 // XdsServerConfigFetcher
502 //
503
XdsServerConfigFetcher(RefCountedPtr<GrpcXdsClient> xds_client,grpc_server_xds_status_notifier notifier)504 XdsServerConfigFetcher::XdsServerConfigFetcher(
505 RefCountedPtr<GrpcXdsClient> xds_client,
506 grpc_server_xds_status_notifier notifier)
507 : xds_client_(std::move(xds_client)), serving_status_notifier_(notifier) {
508 CHECK(xds_client_ != nullptr);
509 }
510
ListenerResourceName(absl::string_view resource_name_template,absl::string_view listening_address)511 std::string ListenerResourceName(absl::string_view resource_name_template,
512 absl::string_view listening_address) {
513 std::string tmp;
514 if (absl::StartsWith(resource_name_template, "xdstp:")) {
515 tmp = URI::PercentEncodePath(listening_address);
516 listening_address = tmp;
517 }
518 return absl::StrReplaceAll(resource_name_template,
519 {{"%s", listening_address}});
520 }
521
StartWatch(std::string listening_address,std::unique_ptr<ServerConfigFetcher::WatcherInterface> watcher)522 void XdsServerConfigFetcher::StartWatch(
523 std::string listening_address,
524 std::unique_ptr<ServerConfigFetcher::WatcherInterface> watcher) {
525 ServerConfigFetcher::WatcherInterface* watcher_ptr = watcher.get();
526 auto listener_watcher = MakeRefCounted<ListenerWatcher>(
527 xds_client_.Ref(DEBUG_LOCATION, "ListenerWatcher"), std::move(watcher),
528 serving_status_notifier_, listening_address);
529 auto* listener_watcher_ptr = listener_watcher.get();
530 XdsListenerResourceType::StartWatch(
531 xds_client_.get(),
532 ListenerResourceName(
533 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
534 .server_listener_resource_name_template(),
535 listening_address),
536 std::move(listener_watcher));
537 MutexLock lock(&mu_);
538 listener_watchers_.emplace(watcher_ptr, listener_watcher_ptr);
539 }
540
CancelWatch(ServerConfigFetcher::WatcherInterface * watcher)541 void XdsServerConfigFetcher::CancelWatch(
542 ServerConfigFetcher::WatcherInterface* watcher) {
543 MutexLock lock(&mu_);
544 auto it = listener_watchers_.find(watcher);
545 if (it != listener_watchers_.end()) {
546 // Cancel the watch on the listener before erasing
547 XdsListenerResourceType::CancelWatch(
548 xds_client_.get(),
549 ListenerResourceName(
550 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
551 .server_listener_resource_name_template(),
552 it->second->listening_address()),
553 it->second, false /* delay_unsubscription */);
554 listener_watchers_.erase(it);
555 }
556 }
557
558 //
559 // XdsServerConfigFetcher::ListenerWatcher
560 //
561
ListenerWatcher(RefCountedPtr<GrpcXdsClient> xds_client,std::unique_ptr<ServerConfigFetcher::WatcherInterface> server_config_watcher,grpc_server_xds_status_notifier serving_status_notifier,std::string listening_address)562 XdsServerConfigFetcher::ListenerWatcher::ListenerWatcher(
563 RefCountedPtr<GrpcXdsClient> xds_client,
564 std::unique_ptr<ServerConfigFetcher::WatcherInterface>
565 server_config_watcher,
566 grpc_server_xds_status_notifier serving_status_notifier,
567 std::string listening_address)
568 : xds_client_(std::move(xds_client)),
569 server_config_watcher_(std::move(server_config_watcher)),
570 serving_status_notifier_(serving_status_notifier),
571 listening_address_(std::move(listening_address)) {}
572
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,RefCountedPtr<ReadDelayHandle>)573 void XdsServerConfigFetcher::ListenerWatcher::OnResourceChanged(
574 absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,
575 RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) {
576 if (!listener.ok()) {
577 MutexLock lock(&mu_);
578 OnFatalError(absl::Status(
579 listener.status().code(),
580 absl::StrCat("LDS resource: ", listener.status().message())));
581 return;
582 }
583 GRPC_TRACE_LOG(xds_server_config_fetcher, INFO)
584 << "[ListenerWatcher " << this << "] Received LDS update from xds client "
585 << xds_client_.get() << ": " << (*listener)->ToString();
586 auto* tcp_listener =
587 absl::get_if<XdsListenerResource::TcpListener>(&(*listener)->listener);
588 if (tcp_listener == nullptr) {
589 MutexLock lock(&mu_);
590 OnFatalError(
591 absl::FailedPreconditionError("LDS resource is not a TCP listener"));
592 return;
593 }
594 if (tcp_listener->address != listening_address_) {
595 MutexLock lock(&mu_);
596 OnFatalError(absl::FailedPreconditionError(
597 "Address in LDS update does not match listening address"));
598 return;
599 }
600 auto new_filter_chain_match_manager = MakeRefCounted<FilterChainMatchManager>(
601 xds_client_.Ref(DEBUG_LOCATION, "FilterChainMatchManager"),
602 tcp_listener->filter_chain_map, tcp_listener->default_filter_chain);
603 MutexLock lock(&mu_);
604 if (filter_chain_match_manager_ == nullptr ||
605 !(new_filter_chain_match_manager->filter_chain_map() ==
606 filter_chain_match_manager_->filter_chain_map() &&
607 new_filter_chain_match_manager->default_filter_chain() ==
608 filter_chain_match_manager_->default_filter_chain())) {
609 pending_filter_chain_match_manager_ =
610 std::move(new_filter_chain_match_manager);
611 pending_filter_chain_match_manager_->StartRdsWatch(
612 RefAsSubclass<ListenerWatcher>());
613 }
614 }
615
OnAmbientError(absl::Status status,RefCountedPtr<ReadDelayHandle>)616 void XdsServerConfigFetcher::ListenerWatcher::OnAmbientError(
617 absl::Status status, RefCountedPtr<ReadDelayHandle> /*read_delay_handle*/) {
618 LOG(ERROR) << "ListenerWatcher:" << this
619 << " XdsClient reports ambient error: " << status << " for "
620 << listening_address_
621 << "; ignoring in favor of existing resource";
622 }
623
OnFatalError(absl::Status status)624 void XdsServerConfigFetcher::ListenerWatcher::OnFatalError(
625 absl::Status status) {
626 pending_filter_chain_match_manager_.reset();
627 if (filter_chain_match_manager_ != nullptr) {
628 // The server has started listening already, so we need to gracefully
629 // stop serving.
630 server_config_watcher_->StopServing();
631 filter_chain_match_manager_.reset();
632 }
633 if (serving_status_notifier_.on_serving_status_update != nullptr) {
634 serving_status_notifier_.on_serving_status_update(
635 serving_status_notifier_.user_data, listening_address_.c_str(),
636 {static_cast<grpc_status_code>(status.raw_code()),
637 std::string(status.message()).c_str()});
638 } else {
639 LOG(ERROR) << "ListenerWatcher:" << this << " Encountered fatal error "
640 << status << "; not serving on " << listening_address_;
641 }
642 }
643
644 void XdsServerConfigFetcher::ListenerWatcher::
PendingFilterChainMatchManagerReadyLocked(XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager * filter_chain_match_manager)645 PendingFilterChainMatchManagerReadyLocked(
646 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager*
647 filter_chain_match_manager) {
648 if (pending_filter_chain_match_manager_ != filter_chain_match_manager) {
649 // This FilterChainMatchManager is no longer the current pending resource.
650 // It should get cleaned up eventually. Ignore this update.
651 return;
652 }
653 bool first_good_update = filter_chain_match_manager_ == nullptr;
654 // Promote the pending FilterChainMatchManager
655 filter_chain_match_manager_ = std::move(pending_filter_chain_match_manager_);
656 // TODO(yashykt): Right now, the server_config_watcher_ does not invoke
657 // XdsServerConfigFetcher while holding a lock, but that might change in the
658 // future in which case we would want to execute this update outside the
659 // critical region through a WorkSerializer similar to XdsClient.
660 server_config_watcher_->UpdateConnectionManager(filter_chain_match_manager_);
661 // Let the logger know about the update if there was no previous good update.
662 if (first_good_update) {
663 if (serving_status_notifier_.on_serving_status_update != nullptr) {
664 serving_status_notifier_.on_serving_status_update(
665 serving_status_notifier_.user_data, listening_address_.c_str(),
666 {GRPC_STATUS_OK, ""});
667 } else {
668 LOG(INFO) << "xDS Listener resource obtained; will start serving on "
669 << listening_address_;
670 }
671 }
672 }
673
674 //
675 // XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
676 //
677
678 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
FilterChainMatchManager(RefCountedPtr<GrpcXdsClient> xds_client,XdsListenerResource::FilterChainMap filter_chain_map,absl::optional<XdsListenerResource::FilterChainData> default_filter_chain)679 FilterChainMatchManager(
680 RefCountedPtr<GrpcXdsClient> xds_client,
681 XdsListenerResource::FilterChainMap filter_chain_map,
682 absl::optional<XdsListenerResource::FilterChainData>
683 default_filter_chain)
684 : xds_client_(std::move(xds_client)),
685 filter_chain_map_(std::move(filter_chain_map)),
686 default_filter_chain_(std::move(default_filter_chain)) {}
687
688 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher)689 StartRdsWatch(RefCountedPtr<ListenerWatcher> listener_watcher) {
690 // Get the set of RDS resources to watch on. Also get the set of
691 // FilterChainData so that we can reverse the list of HTTP filters since
692 // received data moves *up* the stack in Core.
693 std::set<std::string> resource_names;
694 std::set<XdsListenerResource::FilterChainData*> filter_chain_data_set;
695 for (const auto& destination_ip : filter_chain_map_.destination_ip_vector) {
696 for (const auto& source_type : destination_ip.source_types_array) {
697 for (const auto& source_ip : source_type) {
698 for (const auto& source_port_pair : source_ip.ports_map) {
699 auto* filter_chain_data = source_port_pair.second.data.get();
700 const auto* rds_name = absl::get_if<std::string>(
701 &filter_chain_data->http_connection_manager.route_config);
702 if (rds_name != nullptr) resource_names.insert(*rds_name);
703 filter_chain_data_set.insert(filter_chain_data);
704 }
705 }
706 }
707 }
708 if (default_filter_chain_.has_value()) {
709 auto& hcm = default_filter_chain_->http_connection_manager;
710 const auto* rds_name = absl::get_if<std::string>(&hcm.route_config);
711 if (rds_name != nullptr) resource_names.insert(*rds_name);
712 std::reverse(hcm.http_filters.begin(), hcm.http_filters.end());
713 }
714 // Reverse the lists of HTTP filters in all the filter chains
715 for (auto* filter_chain_data : filter_chain_data_set) {
716 auto& hcm = filter_chain_data->http_connection_manager;
717 std::reverse(hcm.http_filters.begin(), hcm.http_filters.end());
718 }
719 // Start watching on referenced RDS resources
720 struct WatcherToStart {
721 std::string resource_name;
722 RefCountedPtr<RouteConfigWatcher> watcher;
723 };
724 std::vector<WatcherToStart> watchers_to_start;
725 watchers_to_start.reserve(resource_names.size());
726 {
727 MutexLock lock(&mu_);
728 for (const auto& resource_name : resource_names) {
729 ++rds_resources_yet_to_fetch_;
730 auto route_config_watcher = MakeRefCounted<RouteConfigWatcher>(
731 resource_name, WeakRefAsSubclass<FilterChainMatchManager>());
732 rds_map_.emplace(resource_name, RdsUpdateState{route_config_watcher.get(),
733 absl::nullopt});
734 watchers_to_start.push_back(
735 WatcherToStart{resource_name, std::move(route_config_watcher)});
736 }
737 if (rds_resources_yet_to_fetch_ != 0) {
738 listener_watcher_ = std::move(listener_watcher);
739 listener_watcher = nullptr;
740 }
741 }
742 for (auto& watcher_to_start : watchers_to_start) {
743 XdsRouteConfigResourceType::StartWatch(xds_client_.get(),
744 watcher_to_start.resource_name,
745 std::move(watcher_to_start.watcher));
746 }
747 // Promote this filter chain match manager if all referenced resources are
748 // fetched.
749 if (listener_watcher != nullptr) {
750 listener_watcher->PendingFilterChainMatchManagerReadyLocked(this);
751 }
752 }
753
754 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Orphaned()755 Orphaned() {
756 MutexLock lock(&mu_);
757 // Cancel the RDS watches to clear up the weak refs
758 for (const auto& entry : rds_map_) {
759 XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), entry.first,
760 entry.second.watcher,
761 false /* delay_unsubscription */);
762 }
763 // Also give up the ref on ListenerWatcher since it won't be needed anymore
764 listener_watcher_.reset();
765 }
766
767 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
768 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
CreateOrGetXdsCertificateProviderFromFilterChainData(const XdsListenerResource::FilterChainData * filter_chain)769 CreateOrGetXdsCertificateProviderFromFilterChainData(
770 const XdsListenerResource::FilterChainData* filter_chain) {
771 MutexLock lock(&mu_);
772 auto it = certificate_providers_map_.find(filter_chain);
773 if (it != certificate_providers_map_.end()) return it->second;
774 // Configure root cert.
775 auto* ca_cert_provider =
776 absl::get_if<CommonTlsContext::CertificateProviderPluginInstance>(
777 &filter_chain->downstream_tls_context.common_tls_context
778 .certificate_validation_context.ca_certs);
779 absl::string_view root_provider_cert_name;
780 RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
781 if (ca_cert_provider != nullptr) {
782 root_provider_cert_name = ca_cert_provider->certificate_name;
783 root_cert_provider =
784 xds_client_->certificate_provider_store()
785 .CreateOrGetCertificateProvider(ca_cert_provider->instance_name);
786 if (root_cert_provider == nullptr) {
787 return absl::NotFoundError(
788 absl::StrCat("Certificate provider instance name: \"",
789 ca_cert_provider->instance_name, "\" not recognized."));
790 }
791 }
792 // Configure identity cert.
793 absl::string_view identity_provider_instance_name =
794 filter_chain->downstream_tls_context.common_tls_context
795 .tls_certificate_provider_instance.instance_name;
796 absl::string_view identity_provider_cert_name =
797 filter_chain->downstream_tls_context.common_tls_context
798 .tls_certificate_provider_instance.certificate_name;
799 RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
800 if (!identity_provider_instance_name.empty()) {
801 identity_cert_provider =
802 xds_client_->certificate_provider_store()
803 .CreateOrGetCertificateProvider(identity_provider_instance_name);
804 if (identity_cert_provider == nullptr) {
805 return absl::NotFoundError(
806 absl::StrCat("Certificate provider instance name: \"",
807 identity_provider_instance_name, "\" not recognized."));
808 }
809 }
810 auto xds_cert_provider = MakeRefCounted<XdsCertificateProvider>(
811 std::move(root_cert_provider), root_provider_cert_name,
812 std::move(identity_cert_provider), identity_provider_cert_name,
813 filter_chain->downstream_tls_context.require_client_certificate);
814 certificate_providers_map_.emplace(filter_chain, xds_cert_provider);
815 return xds_cert_provider;
816 }
817
818 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnRouteConfigChanged(const std::string & resource_name,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config)819 OnRouteConfigChanged(
820 const std::string& resource_name,
821 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
822 route_config) {
823 RefCountedPtr<ListenerWatcher> listener_watcher;
824 {
825 MutexLock lock(&mu_);
826 auto& state = rds_map_[resource_name];
827 if (!state.rds_update.has_value()) {
828 if (--rds_resources_yet_to_fetch_ == 0) {
829 listener_watcher = std::move(listener_watcher_);
830 }
831 }
832 if (!route_config.ok()) {
833 route_config = absl::UnavailableError(
834 absl::StrCat("RDS resource ", resource_name, ": ",
835 route_config.status().message()));
836 }
837 state.rds_update = std::move(route_config);
838 }
839 // Promote the filter chain match manager object if all the referenced
840 // resources are fetched.
841 if (listener_watcher != nullptr) {
842 listener_watcher->PendingFilterChainMatchManagerReady(this);
843 }
844 }
845
846 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnAmbientError(const std::string & resource_name,absl::Status status)847 OnAmbientError(const std::string& resource_name, absl::Status status) {
848 LOG(ERROR) << "RouteConfigWatcher:" << this
849 << " XdsClient reports ambient error: " << status << " for "
850 << resource_name << "; ignoring in favor of existing resource";
851 }
852
// Looks up the filter chain registered for source port `port_str`.
//
// Returns nullptr if `port_str` is not an integer, or if the map has neither
// an entry for that port nor an entry for port 0 (the catch-all).
const XdsListenerResource::FilterChainData* FindFilterChainDataForSourcePort(
    const XdsListenerResource::FilterChainMap::SourcePortsMap& source_ports_map,
    absl::string_view port_str) {
  int port = 0;
  if (!absl::SimpleAtoi(port_str, &port)) return nullptr;
  auto match = source_ports_map.find(port);
  // No direct match: fall back to the catch-all port 0.
  if (match == source_ports_map.end()) match = source_ports_map.find(0);
  return match != source_ports_map.end() ? match->second.data.get() : nullptr;
}
869
// Picks the entry of `source_ip_vector` that best matches `source_ip` and
// continues the lookup with that entry's source-port map.
//
// Among entries whose prefix range contains `source_ip`, the one with the
// longest prefix wins (the first one seen on ties). An entry without a prefix
// range is a catch-all and is only selected while nothing else has been
// selected. Returns nullptr if no entry qualifies.
const XdsListenerResource::FilterChainData* FindFilterChainDataForSourceIp(
    const XdsListenerResource::FilterChainMap::SourceIpVector& source_ip_vector,
    const grpc_resolved_address* source_ip, absl::string_view port) {
  const XdsListenerResource::FilterChainMap::SourceIp* best = nullptr;
  for (const auto& candidate : source_ip_vector) {
    if (!candidate.prefix_range.has_value()) {
      // Catch-all entry: never displaces an earlier selection.
      if (best == nullptr) best = &candidate;
      continue;
    }
    const auto& range = *candidate.prefix_range;
    // Skip the subnet test when the current selection is already at least as
    // specific as this candidate.
    const bool current_is_as_specific =
        best != nullptr && best->prefix_range.has_value() &&
        best->prefix_range->prefix_len >= range.prefix_len;
    if (!current_is_as_specific &&
        grpc_sockaddr_match_subnet(source_ip, &range.address,
                                   range.prefix_len)) {
      best = &candidate;
    }
  }
  return best == nullptr
             ? nullptr
             : FindFilterChainDataForSourcePort(best->ports_map, port);
}
895
IsLoopbackIp(const grpc_resolved_address * address)896 bool IsLoopbackIp(const grpc_resolved_address* address) {
897 const grpc_sockaddr* sock_addr =
898 reinterpret_cast<const grpc_sockaddr*>(&address->addr);
899 if (sock_addr->sa_family == GRPC_AF_INET) {
900 const grpc_sockaddr_in* addr4 =
901 reinterpret_cast<const grpc_sockaddr_in*>(sock_addr);
902 if (addr4->sin_addr.s_addr == grpc_htonl(INADDR_LOOPBACK)) {
903 return true;
904 }
905 } else if (sock_addr->sa_family == GRPC_AF_INET6) {
906 const grpc_sockaddr_in6* addr6 =
907 reinterpret_cast<const grpc_sockaddr_in6*>(sock_addr);
908 if (memcmp(&addr6->sin6_addr, &in6addr_loopback,
909 sizeof(in6addr_loopback)) == 0) {
910 return true;
911 }
912 }
913 return false;
914 }
915
// Chooses the source-type bucket of `source_types_array` that applies to the
// peer of `tcp` and continues the lookup by source IP within that bucket.
//
// The kAny bucket is used only when both the kSameIpOrLoopback and kExternal
// buckets are empty. Otherwise kSameIpOrLoopback is used when the peer is a
// loopback address or its host equals `destination_ip`, and kExternal is used
// for every other peer.
//
// Returns nullptr if the peer address is not a parsable ipv4/ipv6 URI or if
// no filter chain matches.
const XdsListenerResource::FilterChainData* FindFilterChainDataForSourceType(
    const XdsListenerResource::FilterChainMap::ConnectionSourceTypesArray&
        source_types_array,
    grpc_endpoint* tcp, absl::string_view destination_ip) {
  using SourceType = XdsListenerResource::FilterChainMap::ConnectionSourceType;
  const auto peer_uri = URI::Parse(grpc_endpoint_get_peer(tcp));
  if (!peer_uri.ok()) return nullptr;
  if (peer_uri->scheme() != "ipv4" && peer_uri->scheme() != "ipv6") {
    return nullptr;
  }
  std::string host;
  std::string port;
  if (!SplitHostPort(peer_uri->path(), &host, &port)) return nullptr;
  // Only the IP is needed for matching, so the port is passed as 0.
  const auto source_addr = StringToSockaddr(host, 0);
  if (!source_addr.ok()) {
    VLOG(2) << "Could not parse \"" << host
            << "\" as socket address: " << source_addr.status();
    return nullptr;
  }
  const auto& any_sources =
      source_types_array[static_cast<int>(SourceType::kAny)];
  const auto& same_ip_or_loopback_sources =
      source_types_array[static_cast<int>(SourceType::kSameIpOrLoopback)];
  const auto& external_sources =
      source_types_array[static_cast<int>(SourceType::kExternal)];
  const auto* bucket = &external_sources;
  if (same_ip_or_loopback_sources.empty() && external_sources.empty()) {
    bucket = &any_sources;
  } else if (IsLoopbackIp(&*source_addr) || host == destination_ip) {
    bucket = &same_ip_or_loopback_sources;
  }
  return FindFilterChainDataForSourceIp(*bucket, &*source_addr, port);
}
963
// Finds the filter chain for the connection on `tcp`, starting from the
// connection's local (destination) address.
//
// Among entries of `destination_ip_vector` whose prefix range contains the
// destination address, the one with the longest prefix wins (the first one
// seen on ties). An entry without a prefix range is a catch-all and is only
// selected while nothing else has been selected. The lookup then continues
// by source type within the chosen entry.
//
// `destination_ip_vector` is taken by const reference: this runs for every
// incoming connection, and taking the vector by value would copy the whole
// nested table each time.
//
// Returns nullptr if the local address is not a parsable ipv4/ipv6 URI or if
// no filter chain matches.
const XdsListenerResource::FilterChainData* FindFilterChainDataForDestinationIp(
    const XdsListenerResource::FilterChainMap::DestinationIpVector&
        destination_ip_vector,
    grpc_endpoint* tcp) {
  auto destination_uri = URI::Parse(grpc_endpoint_get_local_address(tcp));
  if (!destination_uri.ok() || (destination_uri->scheme() != "ipv4" &&
                                destination_uri->scheme() != "ipv6")) {
    return nullptr;
  }
  std::string host;
  std::string port;
  if (!SplitHostPort(destination_uri->path(), &host, &port)) {
    return nullptr;
  }
  auto destination_addr =
      StringToSockaddr(host, 0);  // Port doesn't matter here.
  if (!destination_addr.ok()) {
    VLOG(2) << "Could not parse \"" << host
            << "\" as socket address: " << destination_addr.status();
    return nullptr;
  }
  const XdsListenerResource::FilterChainMap::DestinationIp* best_match =
      nullptr;
  for (const auto& entry : destination_ip_vector) {
    // Special case for catch-all
    if (!entry.prefix_range.has_value()) {
      if (best_match == nullptr) {
        best_match = &entry;
      }
      continue;
    }
    // The current selection is already at least as specific as this entry.
    if (best_match != nullptr && best_match->prefix_range.has_value() &&
        best_match->prefix_range->prefix_len >=
            entry.prefix_range->prefix_len) {
      continue;
    }
    if (grpc_sockaddr_match_subnet(&*destination_addr,
                                   &entry.prefix_range->address,
                                   entry.prefix_range->prefix_len)) {
      best_match = &entry;
    }
  }
  if (best_match == nullptr) return nullptr;
  return FindFilterChainDataForSourceType(best_match->source_types_array, tcp,
                                          host);
}
1010
1011 absl::StatusOr<ChannelArgs> XdsServerConfigFetcher::ListenerWatcher::
UpdateChannelArgsForConnection(const ChannelArgs & input_args,grpc_endpoint * tcp)1012 FilterChainMatchManager::UpdateChannelArgsForConnection(
1013 const ChannelArgs& input_args, grpc_endpoint* tcp) {
1014 ChannelArgs args = input_args;
1015 const auto* filter_chain = FindFilterChainDataForDestinationIp(
1016 filter_chain_map_.destination_ip_vector, tcp);
1017 if (filter_chain == nullptr && default_filter_chain_.has_value()) {
1018 filter_chain = &default_filter_chain_.value();
1019 }
1020 if (filter_chain == nullptr) {
1021 return absl::UnavailableError("No matching filter chain found");
1022 }
1023 RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider;
1024 RefCountedPtr<XdsChannelStackModifier> channel_stack_modifier;
1025 RefCountedPtr<XdsCertificateProvider> xds_certificate_provider;
1026 // Iterate the list of HTTP filters in reverse since in Core, received data
1027 // flows *up* the stack.
1028 std::vector<const grpc_channel_filter*> filters;
1029 const auto& http_filter_registry =
1030 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
1031 .http_filter_registry();
1032 for (const auto& http_filter :
1033 filter_chain->http_connection_manager.http_filters) {
1034 // Find filter. This is guaranteed to succeed, because it's checked
1035 // at config validation time in the XdsApi code.
1036 const XdsHttpFilterImpl* filter_impl =
1037 http_filter_registry.GetFilterForType(
1038 http_filter.config.config_proto_type_name);
1039 CHECK_NE(filter_impl, nullptr);
1040 // Some filters like the router filter are no-op filters and do not have
1041 // an implementation.
1042 if (filter_impl->channel_filter() != nullptr) {
1043 filters.push_back(filter_impl->channel_filter());
1044 }
1045 }
1046 // Add config selector filter.
1047 filters.push_back(&kServerConfigSelectorFilter);
1048 channel_stack_modifier =
1049 MakeRefCounted<XdsChannelStackModifier>(std::move(filters));
1050 Match(
1051 filter_chain->http_connection_manager.route_config,
1052 // RDS resource name
1053 [&](const std::string& rds_name) {
1054 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
1055 initial_resource;
1056 {
1057 MutexLock lock(&mu_);
1058 initial_resource = rds_map_[rds_name].rds_update.value();
1059 }
1060 server_config_selector_provider =
1061 MakeRefCounted<DynamicXdsServerConfigSelectorProvider>(
1062 xds_client_.Ref(DEBUG_LOCATION,
1063 "DynamicXdsServerConfigSelectorProvider"),
1064 rds_name, std::move(initial_resource),
1065 filter_chain->http_connection_manager.http_filters);
1066 },
1067 // inline RouteConfig
1068 [&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
1069 server_config_selector_provider =
1070 MakeRefCounted<StaticXdsServerConfigSelectorProvider>(
1071 xds_client_.Ref(DEBUG_LOCATION,
1072 "StaticXdsServerConfigSelectorProvider"),
1073 route_config,
1074 filter_chain->http_connection_manager.http_filters);
1075 });
1076 args = args.SetObject(server_config_selector_provider)
1077 .SetObject(channel_stack_modifier);
1078 // Add XdsCertificateProvider if credentials are xDS.
1079 auto* server_creds = args.GetObject<grpc_server_credentials>();
1080 if (server_creds != nullptr &&
1081 server_creds->type() == XdsServerCredentials::Type()) {
1082 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>> result =
1083 CreateOrGetXdsCertificateProviderFromFilterChainData(filter_chain);
1084 if (!result.ok()) {
1085 return result.status();
1086 }
1087 xds_certificate_provider = std::move(*result);
1088 CHECK(xds_certificate_provider != nullptr);
1089 args = args.SetObject(xds_certificate_provider);
1090 }
1091 return args;
1092 }
1093
1094 //
1095 // XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::XdsServerConfigSelector
1096 //
1097
1098 absl::StatusOr<
1099 RefCountedPtr<XdsServerConfigFetcher::ListenerWatcher::
1100 FilterChainMatchManager::XdsServerConfigSelector>>
1101 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Create(const XdsHttpFilterRegistry & http_filter_registry,std::shared_ptr<const XdsRouteConfigResource> rds_update,const std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> & http_filters)1102 XdsServerConfigSelector::Create(
1103 const XdsHttpFilterRegistry& http_filter_registry,
1104 std::shared_ptr<const XdsRouteConfigResource> rds_update,
1105 const std::vector<
1106 XdsListenerResource::HttpConnectionManager::HttpFilter>&
1107 http_filters) {
1108 auto config_selector = MakeRefCounted<XdsServerConfigSelector>();
1109 for (auto& vhost : rds_update->virtual_hosts) {
1110 config_selector->virtual_hosts_.emplace_back();
1111 auto& virtual_host = config_selector->virtual_hosts_.back();
1112 virtual_host.domains = vhost.domains;
1113 for (auto& route : vhost.routes) {
1114 virtual_host.routes.emplace_back();
1115 auto& config_selector_route = virtual_host.routes.back();
1116 config_selector_route.matchers = route.matchers;
1117 config_selector_route.unsupported_action =
1118 absl::get_if<XdsRouteConfigResource::Route::NonForwardingAction>(
1119 &route.action) == nullptr;
1120 auto result = XdsRouting::GeneratePerHTTPFilterConfigsForMethodConfig(
1121 http_filter_registry, http_filters, vhost, route, nullptr,
1122 ChannelArgs());
1123 if (!result.ok()) return result.status();
1124 std::vector<std::string> fields;
1125 fields.reserve(result->per_filter_configs.size());
1126 for (const auto& p : result->per_filter_configs) {
1127 fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
1128 absl::StrJoin(p.second, ",\n"),
1129 "\n ]"));
1130 }
1131 if (!fields.empty()) {
1132 std::string json = absl::StrCat(
1133 "{\n"
1134 " \"methodConfig\": [ {\n"
1135 " \"name\": [\n"
1136 " {}\n"
1137 " ],\n"
1138 " ",
1139 absl::StrJoin(fields, ",\n"),
1140 "\n } ]\n"
1141 "}");
1142 config_selector_route.method_config =
1143 ServiceConfigImpl::Create(result->args, json.c_str()).value();
1144 }
1145 }
1146 }
1147 return config_selector;
1148 }
1149
1150 absl::StatusOr<ServerConfigSelector::CallConfig>
1151 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
GetCallConfig(grpc_metadata_batch * metadata)1152 XdsServerConfigSelector::GetCallConfig(grpc_metadata_batch* metadata) {
1153 CallConfig call_config;
1154 if (metadata->get_pointer(HttpPathMetadata()) == nullptr) {
1155 return absl::InternalError("no path found");
1156 }
1157 absl::string_view path =
1158 metadata->get_pointer(HttpPathMetadata())->as_string_view();
1159 if (metadata->get_pointer(HttpAuthorityMetadata()) == nullptr) {
1160 return absl::InternalError("no authority found");
1161 }
1162 absl::string_view authority =
1163 metadata->get_pointer(HttpAuthorityMetadata())->as_string_view();
1164 auto vhost_index = XdsRouting::FindVirtualHostForDomain(
1165 VirtualHostListIterator(&virtual_hosts_), authority);
1166 if (!vhost_index.has_value()) {
1167 return absl::UnavailableError(
1168 absl::StrCat("could not find VirtualHost for ", authority,
1169 " in RouteConfiguration"));
1170 }
1171 auto& virtual_host = virtual_hosts_[vhost_index.value()];
1172 auto route_index = XdsRouting::GetRouteForRequest(
1173 VirtualHost::RouteListIterator(&virtual_host.routes), path, metadata);
1174 if (route_index.has_value()) {
1175 auto& route = virtual_host.routes[route_index.value()];
1176 // Found the matching route
1177 if (route.unsupported_action) {
1178 return absl::UnavailableError("matching route has unsupported action");
1179 }
1180 if (route.method_config != nullptr) {
1181 call_config.method_configs =
1182 route.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
1183 call_config.service_config = route.method_config;
1184 }
1185 return call_config;
1186 }
1187 return absl::UnavailableError("no route matched");
1188 }
1189
1190 //
1191 // XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::DynamicXdsServerConfigSelectorProvider
1192 //
1193
1194 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
1195 DynamicXdsServerConfigSelectorProvider::
DynamicXdsServerConfigSelectorProvider(RefCountedPtr<GrpcXdsClient> xds_client,std::string resource_name,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> initial_resource,std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> http_filters)1196 DynamicXdsServerConfigSelectorProvider(
1197 RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
1198 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
1199 initial_resource,
1200 std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
1201 http_filters)
1202 : xds_client_(std::move(xds_client)),
1203 resource_name_(std::move(resource_name)),
1204 http_filters_(std::move(http_filters)),
1205 resource_(std::move(initial_resource)) {
1206 CHECK(!resource_name_.empty());
1207 // RouteConfigWatcher is being created here instead of in Watch() to avoid
1208 // deadlocks from invoking XdsRouteConfigResourceType::StartWatch whilst in a
1209 // critical region.
1210 auto route_config_watcher = MakeRefCounted<RouteConfigWatcher>(
1211 WeakRefAsSubclass<DynamicXdsServerConfigSelectorProvider>());
1212 route_config_watcher_ = route_config_watcher.get();
1213 XdsRouteConfigResourceType::StartWatch(xds_client_.get(), resource_name_,
1214 std::move(route_config_watcher));
1215 }
1216
1217 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Orphaned()1218 DynamicXdsServerConfigSelectorProvider::Orphaned() {
1219 XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), resource_name_,
1220 route_config_watcher_,
1221 false /* delay_unsubscription */);
1222 }
1223
1224 absl::StatusOr<RefCountedPtr<ServerConfigSelector>>
1225 XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
Watch(std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher> watcher)1226 DynamicXdsServerConfigSelectorProvider::Watch(
1227 std::unique_ptr<
1228 ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
1229 watcher) {
1230 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> resource;
1231 {
1232 MutexLock lock(&mu_);
1233 CHECK(watcher_ == nullptr);
1234 watcher_ = std::move(watcher);
1235 resource = resource_;
1236 }
1237 if (!resource.ok()) {
1238 return resource.status();
1239 }
1240 return XdsServerConfigSelector::Create(
1241 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
1242 .http_filter_registry(),
1243 resource.value(), http_filters_);
1244 }
1245
1246 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
CancelWatch()1247 DynamicXdsServerConfigSelectorProvider::CancelWatch() {
1248 MutexLock lock(&mu_);
1249 watcher_.reset();
1250 }
1251
1252 void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnRouteConfigChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> rds_update)1253 DynamicXdsServerConfigSelectorProvider::OnRouteConfigChanged(
1254 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
1255 rds_update) {
1256 MutexLock lock(&mu_);
1257 if (!rds_update.ok()) {
1258 rds_update = absl::UnavailableError(absl::StrCat(
1259 "RDS resource ", resource_name_, ": ", rds_update.status().message()));
1260 }
1261 resource_ = std::move(rds_update);
1262 if (watcher_ == nullptr) {
1263 return;
1264 }
1265 // Currently server_config_selector_filter does not call into
1266 // DynamicXdsServerConfigSelectorProvider while holding a lock, but if that
1267 // ever changes, we would want to invoke the update outside the critical
1268 // region with the use of a WorkSerializer.
1269 if (!resource_.ok()) {
1270 watcher_->OnServerConfigSelectorUpdate(resource_.status());
1271 } else {
1272 watcher_->OnServerConfigSelectorUpdate(XdsServerConfigSelector::Create(
1273 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
1274 .http_filter_registry(),
1275 *resource_, http_filters_));
1276 }
1277 }
1278
// Logs an ambient error reported by the XdsClient for resource_name_.
// Nothing besides logging happens here: resource_ is left untouched and the
// watcher is not notified.
//
// NOTE(review): the log prefix reads "RouteConfigWatcher:", but `this` in
// this method is the DynamicXdsServerConfigSelectorProvider.
void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
    DynamicXdsServerConfigSelectorProvider::OnAmbientError(
        absl::Status status) {
  LOG(ERROR) << "RouteConfigWatcher:" << this
             << " XdsClient reports ambient error: " << status << " for "
             << resource_name_ << "; ignoring in favor of existing resource";
}
1286
1287 } // namespace
1288 } // namespace grpc_core
1289
grpc_server_config_fetcher_xds_create(grpc_server_xds_status_notifier notifier,const grpc_channel_args * args)1290 grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
1291 grpc_server_xds_status_notifier notifier, const grpc_channel_args* args) {
1292 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1293 grpc_core::ExecCtx exec_ctx;
1294 grpc_core::ChannelArgs channel_args = grpc_core::CoreConfiguration::Get()
1295 .channel_args_preconditioning()
1296 .PreconditionChannelArgs(args);
1297 GRPC_TRACE_LOG(api, INFO)
1298 << "grpc_server_config_fetcher_xds_create(notifier={on_serving_status_"
1299 "update="
1300 << notifier.on_serving_status_update
1301 << ", user_data=" << notifier.user_data << "}, args=" << args << ")";
1302 auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(
1303 grpc_core::GrpcXdsClient::kServerKey, channel_args,
1304 "XdsServerConfigFetcher");
1305 if (!xds_client.ok()) {
1306 LOG(ERROR) << "Failed to create xds client: " << xds_client.status();
1307 return nullptr;
1308 }
1309 if (static_cast<const grpc_core::GrpcXdsBootstrap&>(
1310 (*xds_client)->bootstrap())
1311 .server_listener_resource_name_template()
1312 .empty()) {
1313 LOG(ERROR) << "server_listener_resource_name_template not provided in "
1314 "bootstrap file.";
1315 return nullptr;
1316 }
1317 return (new grpc_core::XdsServerConfigFetcher(std::move(*xds_client),
1318 notifier))
1319 ->c_ptr();
1320 }
1321