1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/xds/grpc/xds_client_grpc.h"
18
19 #include <grpc/grpc.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/slice.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/string_util.h>
25
26 #include <algorithm>
27 #include <cstddef>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 #include <vector>
32
33 #include "absl/base/thread_annotations.h"
34 #include "absl/log/log.h"
35 #include "absl/status/status.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39 #include "envoy/service/status/v3/csds.upb.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
43 #include "src/core/lib/event_engine/default_event_engine.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/slice/slice.h"
47 #include "src/core/lib/slice/slice_internal.h"
48 #include "src/core/lib/transport/error_utils.h"
49 #include "src/core/telemetry/metrics.h"
50 #include "src/core/util/debug_location.h"
51 #include "src/core/util/env.h"
52 #include "src/core/util/load_file.h"
53 #include "src/core/util/orphanable.h"
54 #include "src/core/util/ref_counted_ptr.h"
55 #include "src/core/util/sync.h"
56 #include "src/core/util/time.h"
57 #include "src/core/util/upb_utils.h"
58 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
59 #include "src/core/xds/grpc/xds_transport_grpc.h"
60 #include "src/core/xds/xds_client/xds_api.h"
61 #include "src/core/xds/xds_client/xds_bootstrap.h"
62 #include "src/core/xds/xds_client/xds_channel_args.h"
63 #include "src/core/xds/xds_client/xds_client.h"
64 #include "src/core/xds/xds_client/xds_transport.h"
65 #include "upb/base/string_view.h"
66
// If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string
// (preceded by a single space) will be appended to the user agent name
// reported to the xDS server. Note that UserAgentVersion() in this file also
// appends it to the reported user agent version.
#ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX
#define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING \
  " " GRPC_XDS_USER_AGENT_NAME_SUFFIX
#else
// No suffix configured: expands to an empty string literal.
#define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING ""
#endif

// If gRPC is built with -DGRPC_XDS_USER_AGENT_VERSION_SUFFIX="...", that string
// (preceded by a single space) will be appended to the user agent version
// reported to the xDS server.
#ifdef GRPC_XDS_USER_AGENT_VERSION_SUFFIX
#define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING \
  " " GRPC_XDS_USER_AGENT_VERSION_SUFFIX
#else
// No suffix configured: expands to an empty string literal.
#define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING ""
#endif
84
85 namespace grpc_core {
86
namespace {

// Metric label keys used by the xDS client instruments below. Every
// instrument is additionally labeled with kMetricLabelTarget, whose value is
// the owning GrpcXdsClient's key.
constexpr absl::string_view kMetricLabelXdsServer = "grpc.xds.server";
constexpr absl::string_view kMetricLabelXdsAuthority = "grpc.xds.authority";
constexpr absl::string_view kMetricLabelXdsResourceType =
    "grpc.xds.resource_type";
constexpr absl::string_view kMetricLabelXdsCacheState = "grpc.xds.cache_state";

// Counter incremented by MetricsReporter::ReportResourceUpdates() with the
// number of valid resources in an update.
const auto kMetricResourceUpdatesValid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_valid",
        "EXPERIMENTAL. A counter of resources received that were considered "
        "valid. The counter will be incremented even for resources that "
        "have not changed.",
        "{resource}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer,
                kMetricLabelXdsResourceType)
        .Build();

// Counter incremented by MetricsReporter::ReportResourceUpdates() with the
// number of invalid resources in an update.
const auto kMetricResourceUpdatesInvalid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_invalid",
        "EXPERIMENTAL. A counter of resources received that were considered "
        "invalid.",
        "{resource}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer,
                kMetricLabelXdsResourceType)
        .Build();

// Counter incremented by one in MetricsReporter::ReportServerFailure().
const auto kMetricServerFailure =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.server_failure",
        "EXPERIMENTAL. A counter of xDS servers going from healthy to "
        "unhealthy. A server goes unhealthy when we have a connectivity "
        "failure or when the ADS stream fails without seeing a response "
        "message, as per gRFC A57.",
        "{failure}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer)
        .Build();

// Callback gauge; values are produced by
// GrpcXdsClient::ReportCallbackMetrics() via ReportServerConnections().
const auto kMetricConnected =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.connected",
        "EXPERIMENTAL. Whether or not the xDS client currently has a "
        "working ADS stream to the xDS server. For a given server, this "
        "will be set to 0 when we have a connectivity failure or when the "
        "ADS stream fails without seeing a response message, as per gRFC "
        "A57. It will be set to 1 when we receive the first response on "
        "an ADS stream.",
        "{bool}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer)
        .Build();

// Callback gauge; values are produced by
// GrpcXdsClient::ReportCallbackMetrics() via ReportResourceCounts().
const auto kMetricResources =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.resources", "EXPERIMENTAL. Number of xDS resources.",
        "{resource}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsAuthority,
                kMetricLabelXdsResourceType, kMetricLabelXdsCacheState)
        .Build();

}  // namespace
150
151 //
152 // GrpcXdsClient::MetricsReporter
153 //
154
155 class GrpcXdsClient::MetricsReporter final : public XdsMetricsReporter {
156 public:
MetricsReporter(GrpcXdsClient & xds_client)157 explicit MetricsReporter(GrpcXdsClient& xds_client)
158 : xds_client_(xds_client) {}
159
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_valid_resources,uint64_t num_invalid_resources)160 void ReportResourceUpdates(absl::string_view xds_server,
161 absl::string_view resource_type,
162 uint64_t num_valid_resources,
163 uint64_t num_invalid_resources) override {
164 xds_client_.stats_plugin_group_.AddCounter(
165 kMetricResourceUpdatesValid, num_valid_resources,
166 {xds_client_.key_, xds_server, resource_type}, {});
167 xds_client_.stats_plugin_group_.AddCounter(
168 kMetricResourceUpdatesInvalid, num_invalid_resources,
169 {xds_client_.key_, xds_server, resource_type}, {});
170 }
171
ReportServerFailure(absl::string_view xds_server)172 void ReportServerFailure(absl::string_view xds_server) override {
173 xds_client_.stats_plugin_group_.AddCounter(
174 kMetricServerFailure, 1, {xds_client_.key_, xds_server}, {});
175 }
176
177 private:
178 GrpcXdsClient& xds_client_;
179 };
180
181 //
182 // GrpcXdsClient
183 //
184
// Out-of-line definition of the static constexpr data member. Needed for
// ODR-use before C++17; from C++17 on, the member is implicitly inline and
// this definition is redundant but harmless.
constexpr absl::string_view GrpcXdsClient::kServerKey;
186
187 namespace {
188
189 Mutex* g_mu = new Mutex;
190 const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
191 // Key bytes live in clients so they outlive the entries in this map
192 NoDestruct<std::map<absl::string_view, GrpcXdsClient*>> g_xds_client_map
193 ABSL_GUARDED_BY(*g_mu);
194 char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
195
GetBootstrapContents(const char * fallback_config)196 absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
197 // First, try GRPC_XDS_BOOTSTRAP env var.
198 auto path = GetEnv("GRPC_XDS_BOOTSTRAP");
199 if (path.has_value()) {
200 GRPC_TRACE_LOG(xds_client, INFO)
201 << "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
202 "environment variable: "
203 << *path;
204 auto contents = LoadFile(*path, /*add_null_terminator=*/true);
205 if (!contents.ok()) return contents.status();
206 return std::string(contents->as_string_view());
207 }
208 // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
209 auto env_config = GetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
210 if (env_config.has_value()) {
211 GRPC_TRACE_LOG(xds_client, INFO)
212 << "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
213 << "environment variable";
214 return std::move(*env_config);
215 }
216 // Finally, try fallback config.
217 if (fallback_config != nullptr) {
218 GRPC_TRACE_LOG(xds_client, INFO)
219 << "Got bootstrap contents from fallback config";
220 return fallback_config;
221 }
222 // No bootstrap config found.
223 return absl::FailedPreconditionError(
224 "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
225 "not defined");
226 }
227
228 GlobalStatsPluginRegistry::StatsPluginGroup
GetStatsPluginGroupForKeyAndChannelArgs(absl::string_view key,const ChannelArgs & channel_args)229 GetStatsPluginGroupForKeyAndChannelArgs(absl::string_view key,
230 const ChannelArgs& channel_args) {
231 if (key == GrpcXdsClient::kServerKey) {
232 return GlobalStatsPluginRegistry::GetStatsPluginsForServer(channel_args);
233 }
234 grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config(
235 channel_args);
236 std::string authority =
237 channel_args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
238 .value_or(
239 CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
240 key));
241 experimental::StatsPluginChannelScope scope(key, authority, endpoint_config);
242 return GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
243 }
244
245 } // namespace
246
GetOrCreate(absl::string_view key,const ChannelArgs & args,const char * reason)247 absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
248 absl::string_view key, const ChannelArgs& args, const char* reason) {
249 // If getting bootstrap from channel args, create a local XdsClient
250 // instance for the channel or server instead of using the global instance.
251 absl::optional<absl::string_view> bootstrap_config = args.GetString(
252 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
253 if (bootstrap_config.has_value()) {
254 auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_config);
255 if (!bootstrap.ok()) return bootstrap.status();
256 grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>(
257 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
258 auto channel_args = ChannelArgs::FromC(xds_channel_args);
259 return MakeRefCounted<GrpcXdsClient>(
260 key, std::move(*bootstrap), channel_args,
261 MakeRefCounted<GrpcXdsTransportFactory>(channel_args),
262 GetStatsPluginGroupForKeyAndChannelArgs(key, args));
263 }
264 // Otherwise, use the global instance.
265 MutexLock lock(g_mu);
266 auto it = g_xds_client_map->find(key);
267 if (it != g_xds_client_map->end()) {
268 auto xds_client = it->second->RefIfNonZero(DEBUG_LOCATION, reason);
269 if (xds_client != nullptr) {
270 return xds_client.TakeAsSubclass<GrpcXdsClient>();
271 }
272 }
273 // Find bootstrap contents.
274 auto bootstrap_contents = GetBootstrapContents(g_fallback_bootstrap_config);
275 if (!bootstrap_contents.ok()) return bootstrap_contents.status();
276 GRPC_TRACE_LOG(xds_client, INFO)
277 << "xDS bootstrap contents: " << *bootstrap_contents;
278 // Parse bootstrap.
279 auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_contents);
280 if (!bootstrap.ok()) return bootstrap.status();
281 // Instantiate XdsClient.
282 auto channel_args = ChannelArgs::FromC(g_channel_args);
283 auto xds_client = MakeRefCounted<GrpcXdsClient>(
284 key, std::move(*bootstrap), channel_args,
285 MakeRefCounted<GrpcXdsTransportFactory>(channel_args),
286 GetStatsPluginGroupForKeyAndChannelArgs(key, args));
287 g_xds_client_map->emplace(xds_client->key(), xds_client.get());
288 GRPC_TRACE_LOG(xds_client, INFO) << "[xds_client " << xds_client.get()
289 << "] Created xDS client for key " << key;
290 return xds_client;
291 }
292
293 namespace {
294
UserAgentName()295 std::string UserAgentName() {
296 return absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
297 GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING);
298 }
299
UserAgentVersion()300 std::string UserAgentVersion() {
301 return absl::StrCat("C-core ", grpc_version_string(),
302 GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
303 GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING);
304 }
305
306 } // namespace
307
// Constructs an XdsClient identified by `key`.
//
// `bootstrap` and `transport_factory` are shared by the base XdsClient (which
// receives copies) and the LrsClient (which receives the moved originals).
// The resource-does-not-exist timeout comes from
// GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS in `args`, defaulting to 15
// seconds and clamped to be non-negative. Callback gauges (kMetricConnected,
// kMetricResources) are registered with `stats_plugin_group` using a
// 5-second interval argument.
GrpcXdsClient::GrpcXdsClient(
    absl::string_view key, std::shared_ptr<GrpcXdsBootstrap> bootstrap,
    const ChannelArgs& args,
    RefCountedPtr<XdsTransportFactory> transport_factory,
    GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group)
    // The MetricsReporter only stores a reference to *this, so handing it
    // out before our own members are initialized is fine as long as it is
    // not invoked during construction.
    : XdsClient(
          bootstrap, transport_factory,
          grpc_event_engine::experimental::GetDefaultEventEngine(),
          std::make_unique<MetricsReporter>(*this), UserAgentName(),
          UserAgentVersion(),
          std::max(Duration::Zero(),
                   args.GetDurationFromIntMillis(
                           GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
                       .value_or(Duration::Seconds(15)))),
      key_(key),
      // The base class is fully constructed here, so bootstrap() is valid.
      certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
          static_cast<const GrpcXdsBootstrap&>(this->bootstrap())
              .certificate_providers())),
      // NOTE: members are initialized in header declaration order (not
      // visible here); stats_plugin_group_ must be declared before
      // registered_metric_callback_, which uses it.
      stats_plugin_group_(std::move(stats_plugin_group)),
      registered_metric_callback_(stats_plugin_group_.RegisterCallback(
          [this](CallbackMetricReporter& reporter) {
            ReportCallbackMetrics(reporter);
          },
          Duration::Seconds(5), kMetricConnected, kMetricResources)),
      // `bootstrap` and `transport_factory` were copied into the base above,
      // so they can be moved from here.
      lrs_client_(MakeRefCounted<LrsClient>(
          std::move(bootstrap), UserAgentName(), UserAgentVersion(),
          std::move(transport_factory),
          grpc_event_engine::experimental::GetDefaultEventEngine())) {}
336
Orphaned()337 void GrpcXdsClient::Orphaned() {
338 registered_metric_callback_.reset();
339 XdsClient::Orphaned();
340 lrs_client_.reset();
341 MutexLock lock(g_mu);
342 auto it = g_xds_client_map->find(key_);
343 if (it != g_xds_client_map->end() && it->second == this) {
344 g_xds_client_map->erase(it);
345 }
346 }
347
// Resets backoff in the base XdsClient and then in the LRS client.
// NOTE(review): lrs_client_ is reset in Orphaned(); this assumes
// ResetBackoff() is never called on an orphaned client.
void GrpcXdsClient::ResetBackoff() {
  XdsClient::ResetBackoff();
  lrs_client_->ResetBackoff();
}
352
interested_parties() const353 grpc_pollset_set* GrpcXdsClient::interested_parties() const {
354 return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory())
355 ->interested_parties();
356 }
357
358 namespace {
359
GetAllXdsClients()360 std::vector<RefCountedPtr<GrpcXdsClient>> GetAllXdsClients() {
361 MutexLock lock(g_mu);
362 std::vector<RefCountedPtr<GrpcXdsClient>> xds_clients;
363 for (const auto& key_client : *g_xds_client_map) {
364 auto xds_client =
365 key_client.second->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs");
366 if (xds_client != nullptr) {
367 xds_clients.emplace_back(xds_client.TakeAsSubclass<GrpcXdsClient>());
368 }
369 }
370 return xds_clients;
371 }
372
373 } // namespace
374
375 // ABSL_NO_THREAD_SAFETY_ANALYSIS because we have to manually manage locks for
376 // individual XdsClients and compiler struggles with checking the validity
DumpAllClientConfigs()377 grpc_slice GrpcXdsClient::DumpAllClientConfigs()
378 ABSL_NO_THREAD_SAFETY_ANALYSIS {
379 auto xds_clients = GetAllXdsClients();
380 upb::Arena arena;
381 // Contains strings that should survive till serialization
382 std::set<std::string> string_pool;
383 auto response = envoy_service_status_v3_ClientStatusResponse_new(arena.ptr());
384 // We lock each XdsClient mutex till we are done with the serialization to
385 // ensure that all data referenced from the UPB proto message stays alive.
386 for (const auto& xds_client : xds_clients) {
387 auto client_config =
388 envoy_service_status_v3_ClientStatusResponse_add_config(response,
389 arena.ptr());
390 xds_client->mu()->Lock();
391 xds_client->DumpClientConfig(&string_pool, arena.ptr(), client_config);
392 envoy_service_status_v3_ClientConfig_set_client_scope(
393 client_config, StdStringToUpbString(xds_client->key()));
394 }
395 // Serialize the upb message to bytes
396 size_t output_length;
397 char* output = envoy_service_status_v3_ClientStatusResponse_serialize(
398 response, arena.ptr(), &output_length);
399 for (const auto& xds_client : xds_clients) {
400 xds_client->mu()->Unlock();
401 }
402 return grpc_slice_from_cpp_string(std::string(output, output_length));
403 }
404
ReportCallbackMetrics(CallbackMetricReporter & reporter)405 void GrpcXdsClient::ReportCallbackMetrics(CallbackMetricReporter& reporter) {
406 MutexLock lock(mu());
407 ReportResourceCounts([&](const ResourceCountLabels& labels, uint64_t count) {
408 reporter.Report(
409 kMetricResources, count,
410 {key_, labels.xds_authority, labels.resource_type, labels.cache_state},
411 {});
412 });
413 ReportServerConnections([&](absl::string_view xds_server, bool connected) {
414 reporter.Report(kMetricConnected, connected, {key_, xds_server}, {});
415 });
416 }
417
418 namespace internal {
419
SetXdsChannelArgsForTest(grpc_channel_args * args)420 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
421 MutexLock lock(g_mu);
422 g_channel_args = args;
423 }
424
UnsetGlobalXdsClientsForTest()425 void UnsetGlobalXdsClientsForTest() {
426 MutexLock lock(g_mu);
427 g_xds_client_map->clear();
428 }
429
SetXdsFallbackBootstrapConfig(const char * config)430 void SetXdsFallbackBootstrapConfig(const char* config) {
431 MutexLock lock(g_mu);
432 gpr_free(g_fallback_bootstrap_config);
433 g_fallback_bootstrap_config = gpr_strdup(config);
434 }
435
436 } // namespace internal
437
438 } // namespace grpc_core
439
// Public C API: returns a serialized envoy.service.status.v3.ClientStatusResponse
// describing the config of every global xDS client.
// The returned bytes may contain NULL(0), so we can't use c-string.
grpc_slice grpc_dump_xds_configs(void) {
  // Keep this declaration order: locals are destroyed in reverse order, so
  // the ExecCtx is torn down before the ApplicationCallbackExecCtx.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  return grpc_core::GrpcXdsClient::DumpAllClientConfigs();
}
446