1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/xds/xds_certificate_provider.h"
22 #include "src/core/ext/xds/xds_client.h"
23 #include "src/core/lib/channel/channel_args.h"
24 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
25 #include "src/core/lib/surface/api_trace.h"
26 #include "src/core/lib/surface/server.h"
27
28 namespace grpc_core {
29
30 TraceFlag grpc_xds_server_config_fetcher_trace(false,
31 "xds_server_config_fetcher");
32
33 namespace {
34
35 class XdsServerConfigFetcher : public grpc_server_config_fetcher {
36 public:
XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)37 explicit XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client)
38 : xds_client_(std::move(xds_client)) {
39 GPR_ASSERT(xds_client_ != nullptr);
40 }
41
StartWatch(std::string listening_address,grpc_channel_args * args,std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher)42 void StartWatch(std::string listening_address, grpc_channel_args* args,
43 std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
44 watcher) override {
45 grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
46 auto listener_watcher = absl::make_unique<ListenerWatcher>(
47 std::move(watcher), args, xds_client_);
48 auto* listener_watcher_ptr = listener_watcher.get();
49 // TODO(yashykt): Get the resource name id from bootstrap
50 listening_address = absl::StrCat(
51 "grpc/server?xds.resource.listening_address=", listening_address);
52 xds_client_->WatchListenerData(listening_address,
53 std::move(listener_watcher));
54 MutexLock lock(&mu_);
55 auto& watcher_state = watchers_[watcher_ptr];
56 watcher_state.listening_address = listening_address;
57 watcher_state.listener_watcher = listener_watcher_ptr;
58 }
59
CancelWatch(grpc_server_config_fetcher::WatcherInterface * watcher)60 void CancelWatch(
61 grpc_server_config_fetcher::WatcherInterface* watcher) override {
62 MutexLock lock(&mu_);
63 auto it = watchers_.find(watcher);
64 if (it != watchers_.end()) {
65 // Cancel the watch on the listener before erasing
66 xds_client_->CancelListenerDataWatch(it->second.listening_address,
67 it->second.listener_watcher,
68 false /* delay_unsubscription */);
69 watchers_.erase(it);
70 }
71 }
72
73 // Return the interested parties from the xds client so that it can be polled.
interested_parties()74 grpc_pollset_set* interested_parties() override {
75 return xds_client_->interested_parties();
76 }
77
78 private:
79 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
80 public:
ListenerWatcher(std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> server_config_watcher,grpc_channel_args * args,RefCountedPtr<XdsClient> xds_client)81 explicit ListenerWatcher(
82 std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
83 server_config_watcher,
84 grpc_channel_args* args, RefCountedPtr<XdsClient> xds_client)
85 : server_config_watcher_(std::move(server_config_watcher)),
86 args_(args),
87 xds_client_(std::move(xds_client)) {}
88
~ListenerWatcher()89 ~ListenerWatcher() override { grpc_channel_args_destroy(args_); }
90
91 // Deleted due to special handling required for args_. Copy the channel args
92 // if we ever need these.
93 ListenerWatcher(const ListenerWatcher&) = delete;
94 ListenerWatcher& operator=(const ListenerWatcher&) = delete;
95
OnListenerChanged(XdsApi::LdsUpdate listener)96 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
97 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_server_config_fetcher_trace)) {
98 gpr_log(
99 GPR_INFO,
100 "[ListenerWatcher %p] Received LDS update from xds client %p: %s",
101 this, xds_client_.get(), listener.ToString().c_str());
102 }
103 grpc_error* error = GRPC_ERROR_NONE;
104 bool update_needed = UpdateXdsCertificateProvider(listener, &error);
105 if (error != GRPC_ERROR_NONE) {
106 OnError(error);
107 return;
108 }
109 // Only send an update, if something changed.
110 if (updated_once_ && !update_needed) {
111 return;
112 }
113 updated_once_ = true;
114 grpc_channel_args* updated_args = nullptr;
115 if (xds_certificate_provider_ != nullptr) {
116 grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
117 updated_args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
118 } else {
119 updated_args = grpc_channel_args_copy(args_);
120 }
121 server_config_watcher_->UpdateConfig(updated_args);
122 }
123
OnError(grpc_error * error)124 void OnError(grpc_error* error) override {
125 gpr_log(GPR_ERROR, "ListenerWatcher:%p XdsClient reports error: %s", this,
126 grpc_error_string(error));
127 GRPC_ERROR_UNREF(error);
128 // TODO(yashykt): We might want to bubble this error to the application.
129 }
130
OnResourceDoesNotExist()131 void OnResourceDoesNotExist() override {
132 gpr_log(GPR_ERROR,
133 "ListenerWatcher:%p XdsClient reports requested listener does "
134 "not exist",
135 this);
136 // TODO(yashykt): We might want to bubble this error to the application.
137 }
138
139 private:
140 // Returns true if the xds certificate provider changed in a way that
141 // required a new security connector to be created, false otherwise.
UpdateXdsCertificateProvider(const XdsApi::LdsUpdate & listener,grpc_error ** error)142 bool UpdateXdsCertificateProvider(const XdsApi::LdsUpdate& listener,
143 grpc_error** error) {
144 // Early out if channel is not configured to use xDS security.
145 grpc_server_credentials* server_creds =
146 grpc_find_server_credentials_in_args(args_);
147 if (server_creds == nullptr ||
148 server_creds->type() != kCredentialsTypeXds) {
149 xds_certificate_provider_ = nullptr;
150 return false;
151 }
152 if (xds_certificate_provider_ == nullptr) {
153 xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
154 }
155 // Configure root cert.
156 absl::string_view root_provider_instance_name =
157 listener.downstream_tls_context.common_tls_context
158 .combined_validation_context
159 .validation_context_certificate_provider_instance.instance_name;
160 absl::string_view root_provider_cert_name =
161 listener.downstream_tls_context.common_tls_context
162 .combined_validation_context
163 .validation_context_certificate_provider_instance
164 .certificate_name;
165 RefCountedPtr<grpc_tls_certificate_provider> new_root_provider;
166 if (!root_provider_instance_name.empty()) {
167 new_root_provider =
168 xds_client_->certificate_provider_store()
169 .CreateOrGetCertificateProvider(root_provider_instance_name);
170 if (new_root_provider == nullptr) {
171 *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
172 absl::StrCat("Certificate provider instance name: \"",
173 root_provider_instance_name, "\" not recognized.")
174 .c_str());
175 return false;
176 }
177 }
178 // Configure identity cert.
179 absl::string_view identity_provider_instance_name =
180 listener.downstream_tls_context.common_tls_context
181 .tls_certificate_certificate_provider_instance.instance_name;
182 absl::string_view identity_provider_cert_name =
183 listener.downstream_tls_context.common_tls_context
184 .tls_certificate_certificate_provider_instance.certificate_name;
185 RefCountedPtr<grpc_tls_certificate_provider> new_identity_provider;
186 if (!identity_provider_instance_name.empty()) {
187 new_identity_provider = xds_client_->certificate_provider_store()
188 .CreateOrGetCertificateProvider(
189 identity_provider_instance_name);
190 if (new_identity_provider == nullptr) {
191 *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
192 absl::StrCat("Certificate provider instance name: \"",
193 identity_provider_instance_name,
194 "\" not recognized.")
195 .c_str());
196 return false;
197 }
198 }
199 bool security_connector_update_required = false;
200 if (((new_root_provider == nullptr) !=
201 (root_certificate_provider_ == nullptr)) ||
202 ((new_identity_provider == nullptr) !=
203 (identity_certificate_provider_ == nullptr)) ||
204 (listener.downstream_tls_context.require_client_certificate !=
205 xds_certificate_provider_->GetRequireClientCertificate(""))) {
206 security_connector_update_required = true;
207 }
208 if (root_certificate_provider_ != new_root_provider) {
209 root_certificate_provider_ = std::move(new_root_provider);
210 }
211 if (identity_certificate_provider_ != new_identity_provider) {
212 identity_certificate_provider_ = std::move(new_identity_provider);
213 }
214 xds_certificate_provider_->UpdateRootCertNameAndDistributor(
215 "", root_provider_cert_name,
216 root_certificate_provider_ == nullptr
217 ? nullptr
218 : root_certificate_provider_->distributor());
219 xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
220 "", identity_provider_cert_name,
221 identity_certificate_provider_ == nullptr
222 ? nullptr
223 : identity_certificate_provider_->distributor());
224 xds_certificate_provider_->UpdateRequireClientCertificate(
225 "", listener.downstream_tls_context.require_client_certificate);
226 return security_connector_update_required;
227 }
228
229 std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
230 server_config_watcher_;
231 grpc_channel_args* args_;
232 RefCountedPtr<XdsClient> xds_client_;
233 RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
234 RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
235 RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
236 bool updated_once_ = false;
237 };
238
239 struct WatcherState {
240 std::string listening_address;
241 ListenerWatcher* listener_watcher = nullptr;
242 };
243
244 RefCountedPtr<XdsClient> xds_client_;
245 Mutex mu_;
246 std::map<grpc_server_config_fetcher::WatcherInterface*, WatcherState>
247 watchers_;
248 };
249
250 } // namespace
251 } // namespace grpc_core
252
grpc_server_config_fetcher_xds_create()253 grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() {
254 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
255 grpc_core::ExecCtx exec_ctx;
256 GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
257 grpc_error* error = GRPC_ERROR_NONE;
258 grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
259 grpc_core::XdsClient::GetOrCreate(&error);
260 if (error != GRPC_ERROR_NONE) {
261 gpr_log(GPR_ERROR, "Failed to create xds client: %s",
262 grpc_error_string(error));
263 GRPC_ERROR_UNREF(error);
264 return nullptr;
265 }
266 return new grpc_core::XdsServerConfigFetcher(std::move(xds_client));
267 }
268