• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/xds/grpc/xds_client_grpc.h"
18 
19 #include <grpc/grpc.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/slice.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/string_util.h>
25 
26 #include <algorithm>
27 #include <cstddef>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 #include <vector>
32 
33 #include "absl/base/thread_annotations.h"
34 #include "absl/log/log.h"
35 #include "absl/status/status.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39 #include "envoy/service/status/v3/csds.upb.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
43 #include "src/core/lib/event_engine/default_event_engine.h"
44 #include "src/core/lib/iomgr/error.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/slice/slice.h"
47 #include "src/core/lib/slice/slice_internal.h"
48 #include "src/core/lib/transport/error_utils.h"
49 #include "src/core/telemetry/metrics.h"
50 #include "src/core/util/debug_location.h"
51 #include "src/core/util/env.h"
52 #include "src/core/util/load_file.h"
53 #include "src/core/util/orphanable.h"
54 #include "src/core/util/ref_counted_ptr.h"
55 #include "src/core/util/sync.h"
56 #include "src/core/util/time.h"
57 #include "src/core/util/upb_utils.h"
58 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
59 #include "src/core/xds/grpc/xds_transport_grpc.h"
60 #include "src/core/xds/xds_client/xds_api.h"
61 #include "src/core/xds/xds_client/xds_bootstrap.h"
62 #include "src/core/xds/xds_client/xds_channel_args.h"
63 #include "src/core/xds/xds_client/xds_client.h"
64 #include "src/core/xds/xds_client/xds_transport.h"
65 #include "upb/base/string_view.h"
66 
67 // If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string
68 // will be appended to the user agent name reported to the xDS server.
69 #ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX
70 #define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING \
71   " " GRPC_XDS_USER_AGENT_NAME_SUFFIX
72 #else
73 #define GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING ""
74 #endif
75 
76 // If gRPC is built with -DGRPC_XDS_USER_AGENT_VERSION_SUFFIX="...", that string
77 // will be appended to the user agent version reported to the xDS server.
78 #ifdef GRPC_XDS_USER_AGENT_VERSION_SUFFIX
79 #define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING \
80   " " GRPC_XDS_USER_AGENT_VERSION_SUFFIX
81 #else
82 #define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING ""
83 #endif
84 
85 namespace grpc_core {
86 
87 namespace {
88 
// Metric labels.
// Label keys attached to the xDS client instruments registered below.
constexpr absl::string_view kMetricLabelXdsServer = "grpc.xds.server";
constexpr absl::string_view kMetricLabelXdsAuthority = "grpc.xds.authority";
constexpr absl::string_view kMetricLabelXdsResourceType =
    "grpc.xds.resource_type";
constexpr absl::string_view kMetricLabelXdsCacheState = "grpc.xds.cache_state";

// Counter handle: resources received that were considered valid.  Labelled
// by target, xDS server and resource type.
const auto kMetricResourceUpdatesValid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_valid",
        "EXPERIMENTAL.  A counter of resources received that were considered "
        "valid.  The counter will be incremented even for resources that "
        "have not changed.",
        "{resource}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer,
                kMetricLabelXdsResourceType)
        .Build();

// Counter handle: resources received that were considered invalid.  Uses the
// same label set as the "valid" counter above.
const auto kMetricResourceUpdatesInvalid =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.resource_updates_invalid",
        "EXPERIMENTAL.  A counter of resources received that were considered "
        "invalid.",
        "{resource}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer,
                kMetricLabelXdsResourceType)
        .Build();

// Counter handle: healthy -> unhealthy transitions of an xDS server.
// Labelled by target and xDS server.
const auto kMetricServerFailure =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.xds_client.server_failure",
        "EXPERIMENTAL.  A counter of xDS servers going from healthy to "
        "unhealthy.  A server goes unhealthy when we have a connectivity "
        "failure or when the ADS stream fails without seeing a response "
        "message, as per gRFC A57.",
        "{failure}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer)
        .Build();

// Callback gauge handle: 0/1 indicator of a working ADS stream, reported per
// (target, xDS server) from GrpcXdsClient::ReportCallbackMetrics().
const auto kMetricConnected =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.connected",
        "EXPERIMENTAL.  Whether or not the xDS client currently has a "
        "working ADS stream to the xDS server.  For a given server, this "
        "will be set to 0 when we have a connectivity failure or when the "
        "ADS stream fails without seeing a response message, as per gRFC "
        "A57.  It will be set to 1 when we receive the first response on "
        "an ADS stream.",
        "{bool}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsServer)
        .Build();

// Callback gauge handle: number of xDS resources, reported per
// (target, authority, resource type, cache state) from
// GrpcXdsClient::ReportCallbackMetrics().
const auto kMetricResources =
    GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
        "grpc.xds_client.resources", "EXPERIMENTAL.  Number of xDS resources.",
        "{resource}", false)
        .Labels(kMetricLabelTarget, kMetricLabelXdsAuthority,
                kMetricLabelXdsResourceType, kMetricLabelXdsCacheState)
        .Build();
148 
149 }  // namespace
150 
151 //
152 // GrpcXdsClient::MetricsReporter
153 //
154 
155 class GrpcXdsClient::MetricsReporter final : public XdsMetricsReporter {
156  public:
MetricsReporter(GrpcXdsClient & xds_client)157   explicit MetricsReporter(GrpcXdsClient& xds_client)
158       : xds_client_(xds_client) {}
159 
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_valid_resources,uint64_t num_invalid_resources)160   void ReportResourceUpdates(absl::string_view xds_server,
161                              absl::string_view resource_type,
162                              uint64_t num_valid_resources,
163                              uint64_t num_invalid_resources) override {
164     xds_client_.stats_plugin_group_.AddCounter(
165         kMetricResourceUpdatesValid, num_valid_resources,
166         {xds_client_.key_, xds_server, resource_type}, {});
167     xds_client_.stats_plugin_group_.AddCounter(
168         kMetricResourceUpdatesInvalid, num_invalid_resources,
169         {xds_client_.key_, xds_server, resource_type}, {});
170   }
171 
ReportServerFailure(absl::string_view xds_server)172   void ReportServerFailure(absl::string_view xds_server) override {
173     xds_client_.stats_plugin_group_.AddCounter(
174         kMetricServerFailure, 1, {xds_client_.key_, xds_server}, {});
175   }
176 
177  private:
178   GrpcXdsClient& xds_client_;
179 };
180 
181 //
182 // GrpcXdsClient
183 //
184 
// Out-of-class definition of the static constexpr member.
// NOTE(review): presumably needed for pre-C++17 ODR-use; confirm against the
// minimum language version before removing.
constexpr absl::string_view GrpcXdsClient::kServerKey;
186 
187 namespace {
188 
// Guards all of the globals below.  Heap-allocated and never deleted in this
// file.
Mutex* g_mu = new Mutex;
// Channel args used when creating global XdsClient instances; set via
// internal::SetXdsChannelArgsForTest().
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
// Global (non-owning) map from key to XdsClient instance.
// Key bytes live in clients so they outlive the entries in this map
NoDestruct<std::map<absl::string_view, GrpcXdsClient*>> g_xds_client_map
    ABSL_GUARDED_BY(*g_mu);
// Bootstrap config used when neither bootstrap env var is set; owned copy
// made by internal::SetXdsFallbackBootstrapConfig().
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
195 
GetBootstrapContents(const char * fallback_config)196 absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
197   // First, try GRPC_XDS_BOOTSTRAP env var.
198   auto path = GetEnv("GRPC_XDS_BOOTSTRAP");
199   if (path.has_value()) {
200     GRPC_TRACE_LOG(xds_client, INFO)
201         << "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
202            "environment variable: "
203         << *path;
204     auto contents = LoadFile(*path, /*add_null_terminator=*/true);
205     if (!contents.ok()) return contents.status();
206     return std::string(contents->as_string_view());
207   }
208   // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
209   auto env_config = GetEnv("GRPC_XDS_BOOTSTRAP_CONFIG");
210   if (env_config.has_value()) {
211     GRPC_TRACE_LOG(xds_client, INFO)
212         << "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
213         << "environment variable";
214     return std::move(*env_config);
215   }
216   // Finally, try fallback config.
217   if (fallback_config != nullptr) {
218     GRPC_TRACE_LOG(xds_client, INFO)
219         << "Got bootstrap contents from fallback config";
220     return fallback_config;
221   }
222   // No bootstrap config found.
223   return absl::FailedPreconditionError(
224       "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
225       "not defined");
226 }
227 
228 GlobalStatsPluginRegistry::StatsPluginGroup
GetStatsPluginGroupForKeyAndChannelArgs(absl::string_view key,const ChannelArgs & channel_args)229 GetStatsPluginGroupForKeyAndChannelArgs(absl::string_view key,
230                                         const ChannelArgs& channel_args) {
231   if (key == GrpcXdsClient::kServerKey) {
232     return GlobalStatsPluginRegistry::GetStatsPluginsForServer(channel_args);
233   }
234   grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config(
235       channel_args);
236   std::string authority =
237       channel_args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
238           .value_or(
239               CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
240                   key));
241   experimental::StatsPluginChannelScope scope(key, authority, endpoint_config);
242   return GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
243 }
244 
245 }  // namespace
246 
GetOrCreate(absl::string_view key,const ChannelArgs & args,const char * reason)247 absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
248     absl::string_view key, const ChannelArgs& args, const char* reason) {
249   // If getting bootstrap from channel args, create a local XdsClient
250   // instance for the channel or server instead of using the global instance.
251   absl::optional<absl::string_view> bootstrap_config = args.GetString(
252       GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
253   if (bootstrap_config.has_value()) {
254     auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_config);
255     if (!bootstrap.ok()) return bootstrap.status();
256     grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>(
257         GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
258     auto channel_args = ChannelArgs::FromC(xds_channel_args);
259     return MakeRefCounted<GrpcXdsClient>(
260         key, std::move(*bootstrap), channel_args,
261         MakeRefCounted<GrpcXdsTransportFactory>(channel_args),
262         GetStatsPluginGroupForKeyAndChannelArgs(key, args));
263   }
264   // Otherwise, use the global instance.
265   MutexLock lock(g_mu);
266   auto it = g_xds_client_map->find(key);
267   if (it != g_xds_client_map->end()) {
268     auto xds_client = it->second->RefIfNonZero(DEBUG_LOCATION, reason);
269     if (xds_client != nullptr) {
270       return xds_client.TakeAsSubclass<GrpcXdsClient>();
271     }
272   }
273   // Find bootstrap contents.
274   auto bootstrap_contents = GetBootstrapContents(g_fallback_bootstrap_config);
275   if (!bootstrap_contents.ok()) return bootstrap_contents.status();
276   GRPC_TRACE_LOG(xds_client, INFO)
277       << "xDS bootstrap contents: " << *bootstrap_contents;
278   // Parse bootstrap.
279   auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_contents);
280   if (!bootstrap.ok()) return bootstrap.status();
281   // Instantiate XdsClient.
282   auto channel_args = ChannelArgs::FromC(g_channel_args);
283   auto xds_client = MakeRefCounted<GrpcXdsClient>(
284       key, std::move(*bootstrap), channel_args,
285       MakeRefCounted<GrpcXdsTransportFactory>(channel_args),
286       GetStatsPluginGroupForKeyAndChannelArgs(key, args));
287   g_xds_client_map->emplace(xds_client->key(), xds_client.get());
288   GRPC_TRACE_LOG(xds_client, INFO) << "[xds_client " << xds_client.get()
289                                    << "] Created xDS client for key " << key;
290   return xds_client;
291 }
292 
293 namespace {
294 
// Builds the user agent name: "gRPC C-core " followed by the platform string
// and the optional build-time GRPC_XDS_USER_AGENT_NAME_SUFFIX.
std::string UserAgentName() {
  return absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
                      GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING);
}
299 
// Builds the user agent version: "C-core " followed by the gRPC version
// string and the optional build-time suffixes.
// NOTE(review): the NAME suffix is appended here in addition to the VERSION
// suffix.  This may be intentional (existing consumers may depend on the
// exact string) or a copy-paste leftover -- confirm before changing.
std::string UserAgentVersion() {
  return absl::StrCat("C-core ", grpc_version_string(),
                      GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
                      GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING);
}
305 
306 }  // namespace
307 
// Constructs a GrpcXdsClient for `key`.
//
// `bootstrap` and `transport_factory` are shared between the XdsClient base
// and the LrsClient: they are copied into the base-class constructor and
// moved into the LrsClient at the end of the initializer list.
// `stats_plugin_group` is moved into the client and used both by the
// MetricsReporter and for the callback-metric registration below.
GrpcXdsClient::GrpcXdsClient(
    absl::string_view key, std::shared_ptr<GrpcXdsBootstrap> bootstrap,
    const ChannelArgs& args,
    RefCountedPtr<XdsTransportFactory> transport_factory,
    GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group)
    : XdsClient(
          bootstrap, transport_factory,
          grpc_event_engine::experimental::GetDefaultEventEngine(),
          // The reporter only stores the reference to *this at this point.
          std::make_unique<MetricsReporter>(*this), UserAgentName(),
          UserAgentVersion(),
          // Resource-does-not-exist timeout: taken from the channel arg,
          // defaulting to 15 seconds, and clamped so it is never negative.
          std::max(Duration::Zero(),
                   args.GetDurationFromIntMillis(
                           GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
                       .value_or(Duration::Seconds(15)))),
      key_(key),
      certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
          static_cast<const GrpcXdsBootstrap&>(this->bootstrap())
              .certificate_providers())),
      stats_plugin_group_(std::move(stats_plugin_group)),
      // Registers the callback that feeds the kMetricConnected and
      // kMetricResources gauges.
      // NOTE(review): the 5-second Duration is presumably the minimum
      // reporting interval -- confirm against RegisterCallback().
      registered_metric_callback_(stats_plugin_group_.RegisterCallback(
          [this](CallbackMetricReporter& reporter) {
            ReportCallbackMetrics(reporter);
          },
          Duration::Seconds(5), kMetricConnected, kMetricResources)),
      lrs_client_(MakeRefCounted<LrsClient>(
          std::move(bootstrap), UserAgentName(), UserAgentVersion(),
          std::move(transport_factory),
          grpc_event_engine::experimental::GetDefaultEventEngine())) {}
336 
Orphaned()337 void GrpcXdsClient::Orphaned() {
338   registered_metric_callback_.reset();
339   XdsClient::Orphaned();
340   lrs_client_.reset();
341   MutexLock lock(g_mu);
342   auto it = g_xds_client_map->find(key_);
343   if (it != g_xds_client_map->end() && it->second == this) {
344     g_xds_client_map->erase(it);
345   }
346 }
347 
// Resets backoff in the XdsClient base and then in the associated LrsClient.
void GrpcXdsClient::ResetBackoff() {
  XdsClient::ResetBackoff();
  lrs_client_->ResetBackoff();
}
352 
interested_parties() const353 grpc_pollset_set* GrpcXdsClient::interested_parties() const {
354   return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory())
355       ->interested_parties();
356 }
357 
358 namespace {
359 
GetAllXdsClients()360 std::vector<RefCountedPtr<GrpcXdsClient>> GetAllXdsClients() {
361   MutexLock lock(g_mu);
362   std::vector<RefCountedPtr<GrpcXdsClient>> xds_clients;
363   for (const auto& key_client : *g_xds_client_map) {
364     auto xds_client =
365         key_client.second->RefIfNonZero(DEBUG_LOCATION, "DumpAllClientConfigs");
366     if (xds_client != nullptr) {
367       xds_clients.emplace_back(xds_client.TakeAsSubclass<GrpcXdsClient>());
368     }
369   }
370   return xds_clients;
371 }
372 
373 }  // namespace
374 
375 // ABSL_NO_THREAD_SAFETY_ANALYSIS because we have to manually manage locks for
376 // individual XdsClients and compiler struggles with checking the validity
DumpAllClientConfigs()377 grpc_slice GrpcXdsClient::DumpAllClientConfigs()
378     ABSL_NO_THREAD_SAFETY_ANALYSIS {
379   auto xds_clients = GetAllXdsClients();
380   upb::Arena arena;
381   // Contains strings that should survive till serialization
382   std::set<std::string> string_pool;
383   auto response = envoy_service_status_v3_ClientStatusResponse_new(arena.ptr());
384   // We lock each XdsClient mutex till we are done with the serialization to
385   // ensure that all data referenced from the UPB proto message stays alive.
386   for (const auto& xds_client : xds_clients) {
387     auto client_config =
388         envoy_service_status_v3_ClientStatusResponse_add_config(response,
389                                                                 arena.ptr());
390     xds_client->mu()->Lock();
391     xds_client->DumpClientConfig(&string_pool, arena.ptr(), client_config);
392     envoy_service_status_v3_ClientConfig_set_client_scope(
393         client_config, StdStringToUpbString(xds_client->key()));
394   }
395   // Serialize the upb message to bytes
396   size_t output_length;
397   char* output = envoy_service_status_v3_ClientStatusResponse_serialize(
398       response, arena.ptr(), &output_length);
399   for (const auto& xds_client : xds_clients) {
400     xds_client->mu()->Unlock();
401   }
402   return grpc_slice_from_cpp_string(std::string(output, output_length));
403 }
404 
ReportCallbackMetrics(CallbackMetricReporter & reporter)405 void GrpcXdsClient::ReportCallbackMetrics(CallbackMetricReporter& reporter) {
406   MutexLock lock(mu());
407   ReportResourceCounts([&](const ResourceCountLabels& labels, uint64_t count) {
408     reporter.Report(
409         kMetricResources, count,
410         {key_, labels.xds_authority, labels.resource_type, labels.cache_state},
411         {});
412   });
413   ReportServerConnections([&](absl::string_view xds_server, bool connected) {
414     reporter.Report(kMetricConnected, connected, {key_, xds_server}, {});
415   });
416 }
417 
418 namespace internal {
419 
// Test hook: sets the channel args used when creating global XdsClient
// instances.  Only the pointer is stored; no copy is made.
void SetXdsChannelArgsForTest(grpc_channel_args* args) {
  MutexLock lock(g_mu);
  g_channel_args = args;
}
424 
// Test hook: empties the global key -> XdsClient map.  The map holds raw
// pointers, so no client refs are released here.
void UnsetGlobalXdsClientsForTest() {
  MutexLock lock(g_mu);
  g_xds_client_map->clear();
}
429 
SetXdsFallbackBootstrapConfig(const char * config)430 void SetXdsFallbackBootstrapConfig(const char* config) {
431   MutexLock lock(g_mu);
432   gpr_free(g_fallback_bootstrap_config);
433   g_fallback_bootstrap_config = gpr_strdup(config);
434 }
435 
436 }  // namespace internal
437 
438 }  // namespace grpc_core
439 
// C API entry point: returns the serialized config dump of all global
// XdsClient instances (see GrpcXdsClient::DumpAllClientConfigs()).
// The returned bytes may contain NULL(0), so we can't use c-string.
grpc_slice grpc_dump_xds_configs(void) {
  // Both contexts are created before calling into core and are destroyed
  // when this function returns.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  return grpc_core::GrpcXdsClient::DumpAllClientConfigs();
}
446