1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/grpc.h>
18 #include <grpc/support/port_platform.h>
19 #include <grpcpp/channel.h>
20 #include <grpcpp/ext/gcp_observability.h>
21 #include <grpcpp/opencensus.h>
22 #include <grpcpp/security/credentials.h>
23 #include <grpcpp/support/channel_arguments.h>
24 #include <stdint.h>
25
26 #include <algorithm>
27 #include <map>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 #include <vector>
32
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "google/api/monitored_resource.pb.h"
40 #include "google/devtools/cloudtrace/v2/tracing.grpc.pb.h"
41 #include "google/monitoring/v3/metric_service.grpc.pb.h"
42 #include "opencensus/exporters/stats/stackdriver/stackdriver_exporter.h"
43 #include "opencensus/exporters/trace/stackdriver/stackdriver_exporter.h"
44 #include "opencensus/stats/stats.h"
45 #include "opencensus/trace/sampler.h"
46 #include "opencensus/trace/trace_config.h"
47 #include "src/core/ext/filters/logging/logging_filter.h"
48 #include "src/core/util/crash.h"
49 #include "src/core/util/notification.h"
50 #include "src/cpp/client/client_stats_interceptor.h"
51 #include "src/cpp/ext/filters/census/client_filter.h"
52 #include "src/cpp/ext/filters/census/grpc_plugin.h"
53 #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
54 #include "src/cpp/ext/gcp/environment_autodetect.h"
55 #include "src/cpp/ext/gcp/observability_config.h"
56 #include "src/cpp/ext/gcp/observability_logging_sink.h"
57
58 namespace grpc {
59
60 namespace internal {
61 namespace {
62
63 grpc::internal::ObservabilityLoggingSink* g_logging_sink = nullptr;
64
65 bool g_gcp_observability_initialized = false;
66
// Endpoints of the Stackdriver services that the exporters talk to.
constexpr char kGoogleStackdriverTraceAddress[] = "cloudtrace.googleapis.com";
constexpr char kGoogleStackdriverStatsAddress[] = "monitoring.googleapis.com";

// Per-span limits passed to the OpenCensus trace configuration.
// TODO(yashykt): These values are taken from the example at
// https://cloud.google.com/traffic-director/docs/observability-proxyless#c++.
// Consider making them configurable.
constexpr uint32_t kMaxAttributes = 128;
constexpr uint32_t kMaxAnnotations = 128;
constexpr uint32_t kMaxMessageEvents = 128;
constexpr uint32_t kMaxLinks = 128;
77
RegisterOpenCensusViewsForGcpObservability()78 void RegisterOpenCensusViewsForGcpObservability() {
79 // Register client default views for GCP observability
80 experimental::ClientStartedRpcs().RegisterForExport();
81 experimental::ClientCompletedRpcs().RegisterForExport();
82 experimental::ClientRoundtripLatency().RegisterForExport();
83 internal::ClientApiLatency().RegisterForExport();
84 experimental::ClientSentCompressedMessageBytesPerRpc().RegisterForExport();
85 experimental::ClientReceivedCompressedMessageBytesPerRpc()
86 .RegisterForExport();
87 // Register server default views for GCP observability
88 experimental::ServerStartedRpcs().RegisterForExport();
89 experimental::ServerCompletedRpcs().RegisterForExport();
90 experimental::ServerSentCompressedMessageBytesPerRpc().RegisterForExport();
91 experimental::ServerReceivedCompressedMessageBytesPerRpc()
92 .RegisterForExport();
93 experimental::ServerServerLatency().RegisterForExport();
94 }
95
96 } // namespace
97
GcpObservabilityInit()98 absl::Status GcpObservabilityInit() {
99 auto config = grpc::internal::GcpObservabilityConfig::ReadFromEnv();
100 if (!config.ok()) {
101 return config.status();
102 }
103 if (!config->cloud_trace.has_value() &&
104 !config->cloud_monitoring.has_value() &&
105 !config->cloud_logging.has_value()) {
106 return absl::OkStatus();
107 }
108 if (g_gcp_observability_initialized) {
109 grpc_core::Crash("GCP Observability for gRPC was already initialized.");
110 }
111 g_gcp_observability_initialized = true;
112 grpc::internal::EnvironmentAutoDetect::Create(config->project_id);
113 if (!config->cloud_trace.has_value()) {
114 // Disable OpenCensus tracing
115 grpc::internal::EnableOpenCensusTracing(false);
116 }
117 if (!config->cloud_monitoring.has_value()) {
118 // Disable OpenCensus stats
119 grpc::internal::EnableOpenCensusStats(false);
120 } else {
121 // Register the OpenCensus client stats interceptor factory if stats are
122 // enabled. Note that this is currently separate from the OpenCensus Plugin
123 // to avoid changing the behavior of the currently available OpenCensus
124 // plugin.
125 grpc::internal::RegisterGlobalClientStatsInterceptorFactory(
126 new grpc::internal::OpenCensusClientInterceptorFactory);
127 }
128 if (config->cloud_logging.has_value()) {
129 g_logging_sink = new grpc::internal::ObservabilityLoggingSink(
130 config->cloud_logging.value(), config->project_id, config->labels);
131 grpc_core::RegisterLoggingFilter(g_logging_sink);
132 }
133 // If tracing or monitoring is enabled, we need to register the OpenCensus
134 // plugin as well.
135 if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
136 grpc::RegisterOpenCensusPlugin();
137 }
138 // If tracing or monitoring is enabled, we need to detect the environment for
139 // OpenCensus, set the labels and attributes and prepare the StackDriver
140 // exporter.
141 // Note that this should be the last step of GcpObservabilityInit() since we
142 // can't register any more filters after grpc_init.
143 if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
144 grpc_init();
145 grpc_core::Notification notification;
146 grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone(
147 [&]() { notification.Notify(); });
148 notification.WaitForNotification();
149 auto* resource = grpc::internal::EnvironmentAutoDetect::Get().resource();
150 if (config->cloud_trace.has_value()) {
151 // Set up attributes for constant tracing
152 std::vector<internal::OpenCensusRegistry::Attribute> attributes;
153 attributes.reserve(resource->labels.size() + config->labels.size());
154 // First insert in environment labels
155 for (const auto& resource_label : resource->labels) {
156 attributes.push_back(internal::OpenCensusRegistry::Attribute{
157 absl::StrCat(resource->resource_type, ".", resource_label.first),
158 resource_label.second});
159 }
160 // Then insert in labels from the GCP Observability config.
161 for (const auto& constant_label : config->labels) {
162 attributes.push_back(internal::OpenCensusRegistry::Attribute{
163 constant_label.first, constant_label.second});
164 }
165 grpc::internal::OpenCensusRegistry::Get().RegisterConstantAttributes(
166 std::move(attributes));
167 }
168 if (config->cloud_monitoring.has_value()) {
169 grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
170 config->labels);
171 RegisterOpenCensusViewsForGcpObservability();
172 }
173 // Note that we are setting up the exporters after registering the
174 // attributes and labels to avoid a case where the exporters start an RPC
175 // before we are ready.
176 if (config->cloud_trace.has_value()) {
177 // Set up the StackDriver Exporter for tracing.
178 opencensus::trace::TraceConfig::SetCurrentTraceParams(
179 {kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks,
180 opencensus::trace::ProbabilitySampler(
181 config->cloud_trace->sampling_rate)});
182 opencensus::exporters::trace::StackdriverOptions trace_opts;
183 trace_opts.project_id = config->project_id;
184 ChannelArguments args;
185 args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
186 trace_opts.trace_service_stub =
187 ::google::devtools::cloudtrace::v2::TraceService::NewStub(
188 CreateCustomChannel(kGoogleStackdriverTraceAddress,
189 GoogleDefaultCredentials(), args));
190 opencensus::exporters::trace::StackdriverExporter::Register(
191 std::move(trace_opts));
192 }
193 if (config->cloud_monitoring.has_value()) {
194 // Set up the StackDriver Exporter for monitoring.
195 opencensus::exporters::stats::StackdriverOptions stats_opts;
196 stats_opts.project_id = config->project_id;
197 stats_opts.monitored_resource.set_type(resource->resource_type);
198 stats_opts.monitored_resource.mutable_labels()->insert(
199 resource->labels.begin(), resource->labels.end());
200 ChannelArguments args;
201 args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
202 stats_opts.metric_service_stub =
203 google::monitoring::v3::MetricService::NewStub(
204 CreateCustomChannel(kGoogleStackdriverStatsAddress,
205 GoogleDefaultCredentials(), args));
206 opencensus::exporters::stats::StackdriverExporter::Register(
207 std::move(stats_opts));
208 }
209 grpc_shutdown();
210 }
211 return absl::OkStatus();
212 }
213
GcpObservabilityClose()214 void GcpObservabilityClose() {
215 if (g_logging_sink != nullptr) {
216 g_logging_sink->FlushAndClose();
217 }
218 // Currently, GcpObservabilityClose() only supports flushing logs. Stats and
219 // tracing get automatically flushed at a regular interval, so sleep for an
220 // interval to make sure that those are flushed too.
221 absl::SleepFor(absl::Seconds(25));
222 }
223
224 } // namespace internal
225
226 namespace experimental {
227
GcpObservabilityInit()228 absl::Status GcpObservabilityInit() {
229 return grpc::internal::GcpObservabilityInit();
230 }
231
GcpObservabilityClose()232 void GcpObservabilityClose() { return grpc::internal::GcpObservabilityClose(); }
233
234 } // namespace experimental
235
236 //
237 // GcpObservability
238 //
239
Init()240 absl::StatusOr<GcpObservability> GcpObservability::Init() {
241 absl::Status status = grpc::internal::GcpObservabilityInit();
242 if (!status.ok()) {
243 return status;
244 }
245 GcpObservability obj;
246 obj.impl_ = std::make_unique<GcpObservabilityImpl>();
247 return obj;
248 }
249
GcpObservability(GcpObservability && other)250 GcpObservability::GcpObservability(GcpObservability&& other) noexcept
251 : impl_(std::move(other.impl_)) {}
252
operator =(GcpObservability && other)253 GcpObservability& GcpObservability::operator=(
254 GcpObservability&& other) noexcept {
255 if (this != &other) {
256 impl_ = std::move(other.impl_);
257 }
258 return *this;
259 }
260
261 //
262 // GcpObservability::GcpObservabilityImpl
263 //
264
~GcpObservabilityImpl()265 GcpObservability::GcpObservabilityImpl::~GcpObservabilityImpl() {
266 grpc::internal::GcpObservabilityClose();
267 }
268
269 } // namespace grpc
270