1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/ext/otel/otel_plugin.h"
20
21 #include <grpc/support/port_platform.h>
22 #include <grpcpp/ext/otel_plugin.h>
23 #include <grpcpp/version_info.h>
24
25 #include <memory>
26 #include <type_traits>
27 #include <utility>
28
29 #include "absl/log/check.h"
30 #include "opentelemetry/metrics/meter.h"
31 #include "opentelemetry/metrics/meter_provider.h"
32 #include "opentelemetry/metrics/sync_instruments.h"
33 #include "opentelemetry/nostd/shared_ptr.h"
34 #include "opentelemetry/nostd/unique_ptr.h"
35 #include "opentelemetry/nostd/variant.h"
36 #include "src/core/client_channel/client_channel_filter.h"
37 #include "src/core/config/core_configuration.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/surface/channel_stack_type.h"
40 #include "src/core/telemetry/call_tracer.h"
41 #include "src/core/util/match.h"
42 #include "src/cpp/ext/otel/key_value_iterable.h"
43 #include "src/cpp/ext/otel/otel_client_call_tracer.h"
44 #include "src/cpp/ext/otel/otel_server_call_tracer.h"
45
46 namespace grpc {
47 namespace internal {
48
OpenTelemetryMethodKey()49 absl::string_view OpenTelemetryMethodKey() { return "grpc.method"; }
50
OpenTelemetryStatusKey()51 absl::string_view OpenTelemetryStatusKey() { return "grpc.status"; }
52
OpenTelemetryTargetKey()53 absl::string_view OpenTelemetryTargetKey() { return "grpc.target"; }
54
55 namespace {
BaseMetrics()56 absl::flat_hash_set<std::string> BaseMetrics() {
57 absl::flat_hash_set<std::string> base_metrics{
58 std::string(grpc::OpenTelemetryPluginBuilder::
59 kClientAttemptStartedInstrumentName),
60 std::string(grpc::OpenTelemetryPluginBuilder::
61 kClientAttemptDurationInstrumentName),
62 std::string(
63 grpc::OpenTelemetryPluginBuilder::
64 kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
65 std::string(
66 grpc::OpenTelemetryPluginBuilder::
67 kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
68 std::string(
69 grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName),
70 std::string(
71 grpc::OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName),
72 std::string(grpc::OpenTelemetryPluginBuilder::
73 kServerCallSentTotalCompressedMessageSizeInstrumentName),
74 std::string(grpc::OpenTelemetryPluginBuilder::
75 kServerCallRcvdTotalCompressedMessageSizeInstrumentName)};
76 grpc_core::GlobalInstrumentsRegistry::ForEach(
77 [&](const grpc_core::GlobalInstrumentsRegistry::
78 GlobalInstrumentDescriptor& descriptor) {
79 if (descriptor.enable_by_default) {
80 base_metrics.emplace(descriptor.name);
81 }
82 });
83 return base_metrics;
84 }
85 } // namespace
86
87 class OpenTelemetryPluginImpl::NPCMetricsKeyValueIterable
88 : public opentelemetry::common::KeyValueIterable {
89 public:
NPCMetricsKeyValueIterable(absl::Span<const absl::string_view> label_keys,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_label_keys,absl::Span<const absl::string_view> optional_label_values,const OptionalLabelsBitSet & optional_labels_bits)90 NPCMetricsKeyValueIterable(
91 absl::Span<const absl::string_view> label_keys,
92 absl::Span<const absl::string_view> label_values,
93 absl::Span<const absl::string_view> optional_label_keys,
94 absl::Span<const absl::string_view> optional_label_values,
95 const OptionalLabelsBitSet& optional_labels_bits)
96 : label_keys_(label_keys),
97 label_values_(label_values),
98 optional_label_keys_(optional_label_keys),
99 optional_label_values_(optional_label_values),
100 optional_labels_bits_(optional_labels_bits) {}
101
ForEachKeyValue(opentelemetry::nostd::function_ref<bool (opentelemetry::nostd::string_view,opentelemetry::common::AttributeValue)> callback) const102 bool ForEachKeyValue(opentelemetry::nostd::function_ref<
103 bool(opentelemetry::nostd::string_view,
104 opentelemetry::common::AttributeValue)>
105 callback) const noexcept override {
106 for (size_t i = 0; i < label_keys_.size(); ++i) {
107 if (!callback(AbslStrViewToOpenTelemetryStrView(label_keys_[i]),
108 AbslStrViewToOpenTelemetryStrView(label_values_[i]))) {
109 return false;
110 }
111 }
112 // Since we are saving the optional label values as std::string for callback
113 // gauges, we want to minimize memory usage by filtering out the disabled
114 // optional label values.
115 bool filtered = optional_label_values_.size() < optional_label_keys_.size();
116 for (size_t i = 0, j = 0; i < optional_label_keys_.size(); ++i) {
117 if (!optional_labels_bits_.test(i)) {
118 if (!filtered) ++j;
119 continue;
120 }
121 if (!callback(
122 AbslStrViewToOpenTelemetryStrView(optional_label_keys_[i]),
123 AbslStrViewToOpenTelemetryStrView(optional_label_values_[j++]))) {
124 return false;
125 }
126 }
127 return true;
128 }
129
size() const130 size_t size() const noexcept override {
131 return label_keys_.size() + optional_labels_bits_.count();
132 }
133
134 private:
135 absl::Span<const absl::string_view> label_keys_;
136 absl::Span<const absl::string_view> label_values_;
137 absl::Span<const absl::string_view> optional_label_keys_;
138 absl::Span<const absl::string_view> optional_label_values_;
139 const OptionalLabelsBitSet& optional_labels_bits_;
140 };
141
142 //
143 // OpenTelemetryPluginBuilderImpl
144 //
145
OpenTelemetryPluginBuilderImpl()146 OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl()
147 : metrics_(BaseMetrics()) {}
148
149 OpenTelemetryPluginBuilderImpl::~OpenTelemetryPluginBuilderImpl() = default;
150
151 OpenTelemetryPluginBuilderImpl&
SetMeterProvider(std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider)152 OpenTelemetryPluginBuilderImpl::SetMeterProvider(
153 std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
154 meter_provider_ = std::move(meter_provider);
155 return *this;
156 }
157
EnableMetrics(absl::Span<const absl::string_view> metric_names)158 OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::EnableMetrics(
159 absl::Span<const absl::string_view> metric_names) {
160 for (const auto& metric_name : metric_names) {
161 metrics_.emplace(metric_name);
162 }
163 return *this;
164 }
165
DisableMetrics(absl::Span<const absl::string_view> metric_names)166 OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::DisableMetrics(
167 absl::Span<const absl::string_view> metric_names) {
168 for (const auto& metric_name : metric_names) {
169 metrics_.erase(metric_name);
170 }
171 return *this;
172 }
173
174 OpenTelemetryPluginBuilderImpl&
DisableAllMetrics()175 OpenTelemetryPluginBuilderImpl::DisableAllMetrics() {
176 metrics_.clear();
177 return *this;
178 }
179
180 OpenTelemetryPluginBuilderImpl&
SetTargetAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> target_attribute_filter)181 OpenTelemetryPluginBuilderImpl::SetTargetAttributeFilter(
182 absl::AnyInvocable<bool(absl::string_view /*target*/) const>
183 target_attribute_filter) {
184 target_attribute_filter_ = std::move(target_attribute_filter);
185 return *this;
186 }
187
188 OpenTelemetryPluginBuilderImpl&
SetGenericMethodAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> generic_method_attribute_filter)189 OpenTelemetryPluginBuilderImpl::SetGenericMethodAttributeFilter(
190 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
191 generic_method_attribute_filter) {
192 generic_method_attribute_filter_ = std::move(generic_method_attribute_filter);
193 return *this;
194 }
195
196 OpenTelemetryPluginBuilderImpl&
SetServerSelector(absl::AnyInvocable<bool (const grpc_core::ChannelArgs &)const> server_selector)197 OpenTelemetryPluginBuilderImpl::SetServerSelector(
198 absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
199 server_selector) {
200 server_selector_ = std::move(server_selector);
201 return *this;
202 }
203
AddPluginOption(std::unique_ptr<InternalOpenTelemetryPluginOption> option)204 OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption(
205 std::unique_ptr<InternalOpenTelemetryPluginOption> option) {
206 // We allow a limit of 64 plugin options to be registered at this time.
207 CHECK_LT(plugin_options_.size(), 64u);
208 plugin_options_.push_back(std::move(option));
209 return *this;
210 }
211
212 OpenTelemetryPluginBuilderImpl&
AddOptionalLabel(absl::string_view optional_label_key)213 OpenTelemetryPluginBuilderImpl::AddOptionalLabel(
214 absl::string_view optional_label_key) {
215 optional_label_keys_.emplace(optional_label_key);
216 return *this;
217 }
218
219 OpenTelemetryPluginBuilderImpl&
SetChannelScopeFilter(absl::AnyInvocable<bool (const OpenTelemetryPluginBuilder::ChannelScope &)const> channel_scope_filter)220 OpenTelemetryPluginBuilderImpl::SetChannelScopeFilter(
221 absl::AnyInvocable<
222 bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
223 channel_scope_filter) {
224 channel_scope_filter_ = std::move(channel_scope_filter);
225 return *this;
226 }
227
BuildAndRegisterGlobal()228 absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
229 if (meter_provider_ == nullptr) {
230 return absl::InvalidArgumentError(
231 "Need to configure a valid meter provider.");
232 }
233 grpc_core::GlobalStatsPluginRegistry::RegisterStatsPlugin(
234 std::make_shared<OpenTelemetryPluginImpl>(
235 metrics_, meter_provider_, std::move(target_attribute_filter_),
236 std::move(generic_method_attribute_filter_),
237 std::move(server_selector_), std::move(plugin_options_),
238 std::move(optional_label_keys_), std::move(channel_scope_filter_)));
239 return absl::OkStatus();
240 }
241
242 absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
Build()243 OpenTelemetryPluginBuilderImpl::Build() {
244 if (meter_provider_ == nullptr) {
245 return absl::InvalidArgumentError(
246 "Need to configure a valid meter provider.");
247 }
248 return std::make_shared<OpenTelemetryPluginImpl>(
249 metrics_, meter_provider_, std::move(target_attribute_filter_),
250 std::move(generic_method_attribute_filter_), std::move(server_selector_),
251 std::move(plugin_options_), std::move(optional_label_keys_),
252 std::move(channel_scope_filter_));
253 }
254
CallbackMetricReporter(OpenTelemetryPluginImpl * ot_plugin,grpc_core::RegisteredMetricCallback * key)255 OpenTelemetryPluginImpl::CallbackMetricReporter::CallbackMetricReporter(
256 OpenTelemetryPluginImpl* ot_plugin,
257 grpc_core::RegisteredMetricCallback* key)
258 : ot_plugin_(ot_plugin), key_(key) {
259 // Since we are updating the timestamp and updating the cache for all
260 // registered instruments in a RegisteredMetricCallback, we will need to
261 // clear all the cache cells for this RegisteredMetricCallback first, so
262 // that if a particular combination of labels was previously present but
263 // is no longer present, we won't continue to report it.
264 for (const auto& handle : key->metrics()) {
265 const auto& descriptor =
266 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
267 CHECK(descriptor.instrument_type ==
268 grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
269 switch (descriptor.value_type) {
270 case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
271 auto& callback_gauge_state =
272 absl::get<std::unique_ptr<CallbackGaugeState<int64_t>>>(
273 ot_plugin_->instruments_data_.at(handle.index).instrument);
274 callback_gauge_state->caches[key].clear();
275 break;
276 }
277 case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
278 auto& callback_gauge_state =
279 absl::get<std::unique_ptr<CallbackGaugeState<double>>>(
280 ot_plugin_->instruments_data_.at(handle.index).instrument);
281 callback_gauge_state->caches[key].clear();
282 break;
283 }
284 default:
285 grpc_core::Crash(absl::StrFormat(
286 "Unknown or unsupported value type: %d", descriptor.value_type));
287 }
288 }
289 }
290
ReportInt64(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,int64_t value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)291 void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportInt64(
292 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
293 int64_t value, absl::Span<const absl::string_view> label_values,
294 absl::Span<const absl::string_view> optional_values) {
295 const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
296 auto* callback_gauge_state =
297 absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
298 &instrument_data.instrument);
299 CHECK_NE(callback_gauge_state, nullptr);
300 const auto& descriptor =
301 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
302 CHECK(descriptor.label_keys.size() == label_values.size());
303 CHECK(descriptor.optional_label_keys.size() == optional_values.size());
304 if ((*callback_gauge_state)->caches.find(key_) ==
305 (*callback_gauge_state)->caches.end()) {
306 LOG(ERROR) << "This may occur when the gauge used in AddCallback is "
307 "different from the gauge used in Report. This indicates a "
308 "misuse of the API. The value "
309 << value << " will not be recorded for instrument "
310 << handle.index;
311 return;
312 }
313 auto& cell = (*callback_gauge_state)->caches.at(key_);
314 std::vector<std::string> key;
315 key.reserve(label_values.size() +
316 instrument_data.optional_labels_bits.count());
317 for (const absl::string_view value : label_values) {
318 key.emplace_back(value);
319 }
320 for (size_t i = 0; i < optional_values.size(); ++i) {
321 if (instrument_data.optional_labels_bits.test(i)) {
322 key.emplace_back(optional_values[i]);
323 }
324 }
325 cell.insert_or_assign(std::move(key), value);
326 }
327
ReportDouble(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,double value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)328 void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportDouble(
329 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
330 double value, absl::Span<const absl::string_view> label_values,
331 absl::Span<const absl::string_view> optional_values) {
332 const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
333 auto* callback_gauge_state =
334 absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
335 &instrument_data.instrument);
336 CHECK_NE(callback_gauge_state, nullptr);
337 const auto& descriptor =
338 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
339 CHECK(descriptor.label_keys.size() == label_values.size());
340 CHECK(descriptor.optional_label_keys.size() == optional_values.size());
341 if ((*callback_gauge_state)->caches.find(key_) ==
342 (*callback_gauge_state)->caches.end()) {
343 LOG(ERROR) << "This may occur when the gauge used in AddCallback is "
344 "different from the gauge used in Report. This indicates a "
345 "misuse of the API. The value "
346 << value << " will not be recorded for instrument "
347 << handle.index;
348 return;
349 }
350 auto& cell = (*callback_gauge_state)->caches.at(key_);
351 std::vector<std::string> key;
352 key.reserve(label_values.size() +
353 instrument_data.optional_labels_bits.count());
354 for (const absl::string_view value : label_values) {
355 key.emplace_back(value);
356 }
357 for (size_t i = 0; i < optional_values.size(); ++i) {
358 if (instrument_data.optional_labels_bits.test(i)) {
359 key.emplace_back(optional_values[i]);
360 }
361 }
362 cell.insert_or_assign(std::move(key), value);
363 }
364
UpdateArguments(grpc::ChannelArguments * args)365 void OpenTelemetryPluginImpl::ServerBuilderOption::UpdateArguments(
366 grpc::ChannelArguments* args) {
367 plugin_->AddToChannelArguments(args);
368 }
369
OpenTelemetryPluginImpl(const absl::flat_hash_set<std::string> & metrics,opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider,absl::AnyInvocable<bool (absl::string_view)const> target_attribute_filter,absl::AnyInvocable<bool (absl::string_view)const> generic_method_attribute_filter,absl::AnyInvocable<bool (const grpc_core::ChannelArgs &)const> server_selector,std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>> plugin_options,const std::set<absl::string_view> & optional_label_keys,absl::AnyInvocable<bool (const OpenTelemetryPluginBuilder::ChannelScope &)const> channel_scope_filter)370 OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
371 const absl::flat_hash_set<std::string>& metrics,
372 opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
373 meter_provider,
374 absl::AnyInvocable<bool(absl::string_view /*target*/) const>
375 target_attribute_filter,
376 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
377 generic_method_attribute_filter,
378 absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
379 server_selector,
380 std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
381 plugin_options,
382 const std::set<absl::string_view>& optional_label_keys,
383 absl::AnyInvocable<
384 bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
385 channel_scope_filter)
386 : meter_provider_(std::move(meter_provider)),
387 server_selector_(std::move(server_selector)),
388 target_attribute_filter_(std::move(target_attribute_filter)),
389 generic_method_attribute_filter_(
390 std::move(generic_method_attribute_filter)),
391 plugin_options_(std::move(plugin_options)),
392 channel_scope_filter_(std::move(channel_scope_filter)) {
393 auto meter = meter_provider_->GetMeter("grpc-c++", GRPC_CPP_VERSION_STRING);
394 // Per-call metrics.
395 if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
396 kClientAttemptStartedInstrumentName)) {
397 client_.attempt.started = meter->CreateUInt64Counter(
398 std::string(grpc::OpenTelemetryPluginBuilder::
399 kClientAttemptStartedInstrumentName),
400 "Number of client call attempts started", "{attempt}");
401 }
402 if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
403 kClientAttemptDurationInstrumentName)) {
404 client_.attempt.duration = meter->CreateDoubleHistogram(
405 std::string(grpc::OpenTelemetryPluginBuilder::
406 kClientAttemptDurationInstrumentName),
407 "End-to-end time taken to complete a client call attempt", "s");
408 }
409 if (metrics.contains(
410 grpc::OpenTelemetryPluginBuilder::
411 kClientAttemptSentTotalCompressedMessageSizeInstrumentName)) {
412 client_.attempt.sent_total_compressed_message_size =
413 meter->CreateUInt64Histogram(
414 std::string(
415 grpc::OpenTelemetryPluginBuilder::
416 kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
417 "Compressed message bytes sent per client call attempt", "By");
418 }
419 if (metrics.contains(
420 grpc::OpenTelemetryPluginBuilder::
421 kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName)) {
422 client_.attempt.rcvd_total_compressed_message_size =
423 meter->CreateUInt64Histogram(
424 std::string(
425 grpc::OpenTelemetryPluginBuilder::
426 kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
427 "Compressed message bytes received per call attempt", "By");
428 }
429 if (metrics.contains(
430 grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName)) {
431 server_.call.started = meter->CreateUInt64Counter(
432 std::string(
433 grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName),
434 "Number of server calls started", "{call}");
435 }
436 if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
437 kServerCallDurationInstrumentName)) {
438 server_.call.duration = meter->CreateDoubleHistogram(
439 std::string(grpc::OpenTelemetryPluginBuilder::
440 kServerCallDurationInstrumentName),
441 "End-to-end time taken to complete a call from server transport's "
442 "perspective",
443 "s");
444 }
445 if (metrics.contains(
446 grpc::OpenTelemetryPluginBuilder::
447 kServerCallSentTotalCompressedMessageSizeInstrumentName)) {
448 server_.call.sent_total_compressed_message_size =
449 meter->CreateUInt64Histogram(
450 std::string(
451 grpc::OpenTelemetryPluginBuilder::
452 kServerCallSentTotalCompressedMessageSizeInstrumentName),
453 "Compressed message bytes sent per server call", "By");
454 }
455 if (metrics.contains(
456 grpc::OpenTelemetryPluginBuilder::
457 kServerCallRcvdTotalCompressedMessageSizeInstrumentName)) {
458 server_.call.rcvd_total_compressed_message_size =
459 meter->CreateUInt64Histogram(
460 std::string(
461 grpc::OpenTelemetryPluginBuilder::
462 kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
463 "Compressed message bytes received per server call", "By");
464 }
465 // Store optional label keys for per call metrics
466 CHECK(static_cast<size_t>(grpc_core::ClientCallTracer::CallAttemptTracer::
467 OptionalLabelKey::kSize) <=
468 kOptionalLabelsSizeLimit);
469 for (const auto& key : optional_label_keys) {
470 auto optional_key = OptionalLabelStringToKey(key);
471 if (optional_key.has_value()) {
472 per_call_optional_label_bits_.set(
473 static_cast<size_t>(optional_key.value()));
474 }
475 }
476 // Non-per-call metrics.
477 grpc_core::GlobalInstrumentsRegistry::ForEach(
478 [&, this](const grpc_core::GlobalInstrumentsRegistry::
479 GlobalInstrumentDescriptor& descriptor) {
480 CHECK(descriptor.optional_label_keys.size() <=
481 kOptionalLabelsSizeLimit);
482 if (instruments_data_.size() < descriptor.index + 1) {
483 instruments_data_.resize(descriptor.index + 1);
484 }
485 if (!metrics.contains(descriptor.name)) {
486 return;
487 }
488 switch (descriptor.instrument_type) {
489 case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCounter:
490 switch (descriptor.value_type) {
491 case grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64:
492 instruments_data_[descriptor.index].instrument =
493 meter->CreateUInt64Counter(
494 std::string(descriptor.name),
495 std::string(descriptor.description),
496 std::string(descriptor.unit));
497 break;
498 case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble:
499 instruments_data_[descriptor.index].instrument =
500 meter->CreateDoubleCounter(
501 std::string(descriptor.name),
502 std::string(descriptor.description),
503 std::string(descriptor.unit));
504 break;
505 default:
506 grpc_core::Crash(
507 absl::StrFormat("Unknown or unsupported value type: %d",
508 descriptor.value_type));
509 }
510 break;
511 case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kHistogram:
512 switch (descriptor.value_type) {
513 case grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64:
514 instruments_data_[descriptor.index].instrument =
515 meter->CreateUInt64Histogram(
516 std::string(descriptor.name),
517 std::string(descriptor.description),
518 std::string(descriptor.unit));
519 break;
520 case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble:
521 instruments_data_[descriptor.index].instrument =
522 meter->CreateDoubleHistogram(
523 std::string(descriptor.name),
524 std::string(descriptor.description),
525 std::string(descriptor.unit));
526 break;
527 default:
528 grpc_core::Crash(
529 absl::StrFormat("Unknown or unsupported value type: %d",
530 descriptor.value_type));
531 }
532 break;
533 case grpc_core::GlobalInstrumentsRegistry::InstrumentType::
534 kCallbackGauge:
535 switch (descriptor.value_type) {
536 case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
537 auto observable_state =
538 std::make_unique<CallbackGaugeState<int64_t>>();
539 observable_state->id = descriptor.index;
540 observable_state->ot_plugin = this;
541 observable_state->instrument =
542 meter->CreateInt64ObservableGauge(
543 std::string(descriptor.name),
544 std::string(descriptor.description),
545 std::string(descriptor.unit));
546 instruments_data_[descriptor.index].instrument =
547 std::move(observable_state);
548 break;
549 }
550 case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
551 auto observable_state =
552 std::make_unique<CallbackGaugeState<double>>();
553 observable_state->id = descriptor.index;
554 observable_state->ot_plugin = this;
555 observable_state->instrument =
556 meter->CreateDoubleObservableGauge(
557 std::string(descriptor.name),
558 std::string(descriptor.description),
559 std::string(descriptor.unit));
560 instruments_data_[descriptor.index].instrument =
561 std::move(observable_state);
562 break;
563 }
564 default:
565 grpc_core::Crash(
566 absl::StrFormat("Unknown or unsupported value type: %d",
567 descriptor.value_type));
568 }
569 break;
570 default:
571 grpc_core::Crash(absl::StrFormat("Unknown instrument_type: %d",
572 descriptor.instrument_type));
573 }
574 for (size_t i = 0; i < descriptor.optional_label_keys.size(); ++i) {
575 if (optional_label_keys.find(descriptor.optional_label_keys[i]) !=
576 optional_label_keys.end()) {
577 instruments_data_[descriptor.index].optional_labels_bits.set(i);
578 }
579 }
580 });
581 }
582
~OpenTelemetryPluginImpl()583 OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() {
584 for (const auto& instrument_data : instruments_data_) {
585 grpc_core::Match(
586 instrument_data.instrument, [](const Disabled&) {},
587 [](const std::unique_ptr<opentelemetry::metrics::Counter<double>>&) {},
588 [](const std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>&) {
589 },
590 [](const std::unique_ptr<
591 opentelemetry::metrics::Histogram<uint64_t>>&) {},
592 [](const std::unique_ptr<opentelemetry::metrics::Histogram<double>>&) {
593 },
594 [](const std::unique_ptr<CallbackGaugeState<int64_t>>& state) {
595 CHECK(state->caches.empty());
596 if (state->ot_callback_registered) {
597 state->instrument->RemoveCallback(
598 &CallbackGaugeState<int64_t>::CallbackGaugeCallback,
599 state.get());
600 state->ot_callback_registered = false;
601 }
602 },
603 [](const std::unique_ptr<CallbackGaugeState<double>>& state) {
604 CHECK(state->caches.empty());
605 if (state->ot_callback_registered) {
606 state->instrument->RemoveCallback(
607 &CallbackGaugeState<double>::CallbackGaugeCallback,
608 state.get());
609 state->ot_callback_registered = false;
610 }
611 });
612 }
613 }
614
615 namespace {
616 constexpr absl::string_view kLocality = "grpc.lb.locality";
617 }
618
OptionalLabelKeyToString(grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key)619 absl::string_view OpenTelemetryPluginImpl::OptionalLabelKeyToString(
620 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key) {
621 switch (key) {
622 case grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
623 kLocality:
624 return kLocality;
625 default:
626 grpc_core::Crash("Illegal OptionalLabelKey index");
627 }
628 }
629
630 absl::optional<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
OptionalLabelStringToKey(absl::string_view key)631 OpenTelemetryPluginImpl::OptionalLabelStringToKey(absl::string_view key) {
632 if (key == kLocality) {
633 return grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
634 kLocality;
635 }
636 return absl::nullopt;
637 }
638
639 std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForChannel(const OpenTelemetryPluginBuilder::ChannelScope & scope) const640 OpenTelemetryPluginImpl::IsEnabledForChannel(
641 const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
642 if (channel_scope_filter_ == nullptr || channel_scope_filter_(scope)) {
643 return {true, std::make_shared<ClientScopeConfig>(this, scope)};
644 }
645 return {false, nullptr};
646 }
647
648 std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForServer(const grpc_core::ChannelArgs & args) const649 OpenTelemetryPluginImpl::IsEnabledForServer(
650 const grpc_core::ChannelArgs& args) const {
651 // Return true only if there is no server selector registered or if the
652 // server selector returns true.
653 if (server_selector_ == nullptr || server_selector_(args)) {
654 return {true, std::make_shared<ServerScopeConfig>(this, args)};
655 }
656 return {false, nullptr};
657 }
658
659 std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
GetChannelScopeConfig(const OpenTelemetryPluginBuilder::ChannelScope & scope) const660 OpenTelemetryPluginImpl::GetChannelScopeConfig(
661 const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
662 CHECK(channel_scope_filter_ == nullptr || channel_scope_filter_(scope));
663 return std::make_shared<ClientScopeConfig>(this, scope);
664 }
665
666 std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
GetServerScopeConfig(const grpc_core::ChannelArgs & args) const667 OpenTelemetryPluginImpl::GetServerScopeConfig(
668 const grpc_core::ChannelArgs& args) const {
669 CHECK(server_selector_ == nullptr || server_selector_(args));
670 return std::make_shared<ServerScopeConfig>(this, args);
671 }
672
AddCounter(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,uint64_t value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)673 void OpenTelemetryPluginImpl::AddCounter(
674 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
675 uint64_t value, absl::Span<const absl::string_view> label_values,
676 absl::Span<const absl::string_view> optional_values) {
677 const auto& instrument_data = instruments_data_.at(handle.index);
678 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
679 // This instrument is disabled.
680 return;
681 }
682 CHECK(absl::holds_alternative<
683 std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
684 instrument_data.instrument));
685 const auto& descriptor =
686 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
687 CHECK(descriptor.label_keys.size() == label_values.size());
688 CHECK(descriptor.optional_label_keys.size() == optional_values.size());
689 if (label_values.empty() && optional_values.empty()) {
690 absl::get<std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
691 instrument_data.instrument)
692 ->Add(value);
693 } else {
694 absl::get<std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
695 instrument_data.instrument)
696 ->Add(value, NPCMetricsKeyValueIterable(
697 descriptor.label_keys, label_values,
698 descriptor.optional_label_keys, optional_values,
699 instrument_data.optional_labels_bits));
700 }
701 }
702
AddCounter(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,double value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)703 void OpenTelemetryPluginImpl::AddCounter(
704 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
705 double value, absl::Span<const absl::string_view> label_values,
706 absl::Span<const absl::string_view> optional_values) {
707 const auto& instrument_data = instruments_data_.at(handle.index);
708 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
709 // This instrument is disabled.
710 return;
711 }
712 CHECK(absl::holds_alternative<
713 std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
714 instrument_data.instrument));
715 const auto& descriptor =
716 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
717 CHECK(descriptor.label_keys.size() == label_values.size());
718 CHECK(descriptor.optional_label_keys.size() == optional_values.size());
719 if (label_values.empty() && optional_values.empty()) {
720 absl::get<std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
721 instrument_data.instrument)
722 ->Add(value);
723 } else {
724 absl::get<std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
725 instrument_data.instrument)
726 ->Add(value, NPCMetricsKeyValueIterable(
727 descriptor.label_keys, label_values,
728 descriptor.optional_label_keys, optional_values,
729 instrument_data.optional_labels_bits));
730 }
731 }
732
RecordHistogram(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,uint64_t value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)733 void OpenTelemetryPluginImpl::RecordHistogram(
734 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
735 uint64_t value, absl::Span<const absl::string_view> label_values,
736 absl::Span<const absl::string_view> optional_values) {
737 const auto& instrument_data = instruments_data_.at(handle.index);
738 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
739 // This instrument is disabled.
740 return;
741 }
742 CHECK(absl::holds_alternative<
743 std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
744 instrument_data.instrument));
745 const auto& descriptor =
746 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
747 CHECK(descriptor.label_keys.size() == label_values.size());
748 CHECK(descriptor.optional_label_keys.size() == optional_values.size());
749 if (label_values.empty() && optional_values.empty()) {
750 absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
751 instrument_data.instrument)
752 ->Record(value, opentelemetry::context::Context{});
753 } else {
754 absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
755 instrument_data.instrument)
756 ->Record(value,
757 NPCMetricsKeyValueIterable(
758 descriptor.label_keys, label_values,
759 descriptor.optional_label_keys, optional_values,
760 instrument_data.optional_labels_bits),
761 opentelemetry::context::Context{});
762 }
763 }
764
RecordHistogram(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,double value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)765 void OpenTelemetryPluginImpl::RecordHistogram(
766 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
767 double value, absl::Span<const absl::string_view> label_values,
768 absl::Span<const absl::string_view> optional_values) {
769 const auto& instrument_data = instruments_data_.at(handle.index);
770 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
771 // This instrument is disabled.
772 return;
773 }
774 CHECK(absl::holds_alternative<
775 std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
776 instrument_data.instrument));
777 const auto& descriptor =
778 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
779 CHECK(descriptor.label_keys.size() == label_values.size());
780 CHECK(descriptor.optional_label_keys.size() == optional_values.size());
781 if (label_values.empty() && optional_values.empty()) {
782 absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
783 instrument_data.instrument)
784 ->Record(value, opentelemetry::context::Context{});
785 } else {
786 absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
787 instrument_data.instrument)
788 ->Record(value,
789 NPCMetricsKeyValueIterable(
790 descriptor.label_keys, label_values,
791 descriptor.optional_label_keys, optional_values,
792 instrument_data.optional_labels_bits),
793 opentelemetry::context::Context{});
794 }
795 }
796
AddCallback(grpc_core::RegisteredMetricCallback * callback)797 void OpenTelemetryPluginImpl::AddCallback(
798 grpc_core::RegisteredMetricCallback* callback) {
799 std::vector<
800 absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
801 gauges_that_need_to_add_callback;
802 {
803 grpc_core::MutexLock lock(&mu_);
804 callback_timestamps_.emplace(callback, grpc_core::Timestamp::InfPast());
805 for (const auto& handle : callback->metrics()) {
806 const auto& descriptor =
807 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
808 CHECK(
809 descriptor.instrument_type ==
810 grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
811 switch (descriptor.value_type) {
812 case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
813 const auto& instrument_data = instruments_data_.at(handle.index);
814 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
815 // This instrument is disabled.
816 continue;
817 }
818 auto* callback_gauge_state =
819 absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
820 &instrument_data.instrument);
821 CHECK_NE(callback_gauge_state, nullptr);
822 (*callback_gauge_state)
823 ->caches.emplace(callback, CallbackGaugeState<int64_t>::Cache{});
824 if (!std::exchange((*callback_gauge_state)->ot_callback_registered,
825 true)) {
826 gauges_that_need_to_add_callback.push_back(
827 callback_gauge_state->get());
828 }
829 break;
830 }
831 case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
832 const auto& instrument_data = instruments_data_.at(handle.index);
833 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
834 // This instrument is disabled.
835 continue;
836 }
837 auto* callback_gauge_state =
838 absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
839 &instrument_data.instrument);
840 CHECK_NE(callback_gauge_state, nullptr);
841 (*callback_gauge_state)
842 ->caches.emplace(callback, CallbackGaugeState<double>::Cache{});
843 if (!std::exchange((*callback_gauge_state)->ot_callback_registered,
844 true)) {
845 gauges_that_need_to_add_callback.push_back(
846 callback_gauge_state->get());
847 }
848 break;
849 }
850 default:
851 grpc_core::Crash(absl::StrFormat(
852 "Unknown or unsupported value type: %d", descriptor.value_type));
853 }
854 }
855 }
856 // AddCallback internally grabs OpenTelemetry's observable_registry's
857 // lock. So we need to call it without our plugin lock otherwise we may
858 // deadlock.
859 for (const auto& gauge : gauges_that_need_to_add_callback) {
860 grpc_core::Match(
861 gauge,
862 [](CallbackGaugeState<int64_t>* gauge) {
863 gauge->instrument->AddCallback(
864 &CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge);
865 },
866 [](CallbackGaugeState<double>* gauge) {
867 gauge->instrument->AddCallback(
868 &CallbackGaugeState<double>::CallbackGaugeCallback, gauge);
869 });
870 }
871 }
872
RemoveCallback(grpc_core::RegisteredMetricCallback * callback)873 void OpenTelemetryPluginImpl::RemoveCallback(
874 grpc_core::RegisteredMetricCallback* callback) {
875 {
876 grpc_core::MutexLock lock(&mu_);
877 callback_timestamps_.erase(callback);
878 for (const auto& handle : callback->metrics()) {
879 const auto& descriptor =
880 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
881 CHECK(
882 descriptor.instrument_type ==
883 grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
884 switch (descriptor.value_type) {
885 case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
886 const auto& instrument_data = instruments_data_.at(handle.index);
887 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
888 // This instrument is disabled.
889 continue;
890 }
891 auto* callback_gauge_state =
892 absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
893 &instrument_data.instrument);
894 CHECK_NE(callback_gauge_state, nullptr);
895 CHECK((*callback_gauge_state)->ot_callback_registered);
896 CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
897 break;
898 }
899 case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
900 const auto& instrument_data = instruments_data_.at(handle.index);
901 if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
902 // This instrument is disabled.
903 continue;
904 }
905 auto* callback_gauge_state =
906 absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
907 &instrument_data.instrument);
908 CHECK_NE(callback_gauge_state, nullptr);
909 CHECK((*callback_gauge_state)->ot_callback_registered);
910 CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
911 break;
912 }
913 default:
914 grpc_core::Crash(absl::StrFormat(
915 "Unknown or unsupported value type: %d", descriptor.value_type));
916 }
917 }
918 }
919 // Note that we are not removing the callback from OpenTelemetry immediately,
920 // and instead remove it when the plugin is destroyed. We just have a single
921 // callback per OpenTelemetry instrument which is a small number. If we decide
922 // to remove the callback immediately at this point, we need to make sure that
923 // 1) the callback is removed without holding mu_ and 2) we make sure that
924 // this does not race against a possible `AddCallback` operation. A potential
925 // way to do this is to use WorkSerializer.
926 }
927
928 template <typename ValueType>
Observe(opentelemetry::metrics::ObserverResult & result,const Cache & cache)929 void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::Observe(
930 opentelemetry::metrics::ObserverResult& result, const Cache& cache) {
931 const auto& descriptor =
932 grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id});
933 for (const auto& pair : cache) {
934 CHECK(pair.first.size() <= (descriptor.label_keys.size() +
935 descriptor.optional_label_keys.size()));
936 if (descriptor.label_keys.empty() &&
937 descriptor.optional_label_keys.empty()) {
938 opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
939 opentelemetry::metrics::ObserverResultT<ValueType>>>(result)
940 ->Observe(pair.second);
941 } else {
942 auto& instrument_data = ot_plugin->instruments_data_.at(id);
943 opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
944 opentelemetry::metrics::ObserverResultT<ValueType>>>(result)
945 ->Observe(pair.second,
946 NPCMetricsKeyValueIterable(
947 descriptor.label_keys,
948 absl::FixedArray<absl::string_view>(
949 pair.first.begin(),
950 pair.first.begin() + descriptor.label_keys.size()),
951 descriptor.optional_label_keys,
952 absl::FixedArray<absl::string_view>(
953 pair.first.begin() + descriptor.label_keys.size(),
954 pair.first.end()),
955 instrument_data.optional_labels_bits));
956 }
957 }
958 }
959
960 // OpenTelemetry calls our callback with its observable_registry's lock
961 // held.
962 template <typename ValueType>
963 void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::
CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result,void * arg)964 CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result,
965 void* arg) {
966 auto* callback_gauge_state = static_cast<CallbackGaugeState<ValueType>*>(arg);
967 auto now = grpc_core::Timestamp::Now();
968 grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_);
969 for (auto& elem : callback_gauge_state->caches) {
970 auto* registered_metric_callback = elem.first;
971 auto iter = callback_gauge_state->ot_plugin->callback_timestamps_.find(
972 registered_metric_callback);
973 CHECK(iter != callback_gauge_state->ot_plugin->callback_timestamps_.end());
974 if (now - iter->second < registered_metric_callback->min_interval()) {
975 // Use cached value.
976 callback_gauge_state->Observe(result, elem.second);
977 continue;
978 }
979 // Otherwise update and use the cache.
980 iter->second = now;
981 CallbackMetricReporter reporter(callback_gauge_state->ot_plugin,
982 registered_metric_callback);
983 registered_metric_callback->Run(reporter);
984 callback_gauge_state->Observe(result, elem.second);
985 }
986 }
987
GetClientCallTracer(const grpc_core::Slice & path,bool registered_method,std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)988 grpc_core::ClientCallTracer* OpenTelemetryPluginImpl::GetClientCallTracer(
989 const grpc_core::Slice& path, bool registered_method,
990 std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
991 return grpc_core::GetContext<grpc_core::Arena>()
992 ->ManagedNew<ClientCallTracer>(
993 path, grpc_core::GetContext<grpc_core::Arena>(), registered_method,
994 this,
995 std::static_pointer_cast<OpenTelemetryPluginImpl::ClientScopeConfig>(
996 scope_config));
997 }
998
GetServerCallTracer(std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)999 grpc_core::ServerCallTracer* OpenTelemetryPluginImpl::GetServerCallTracer(
1000 std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
1001 return grpc_core::GetContext<grpc_core::Arena>()
1002 ->ManagedNew<ServerCallTracer>(
1003 this,
1004 std::static_pointer_cast<OpenTelemetryPluginImpl::ServerScopeConfig>(
1005 scope_config));
1006 }
1007
IsInstrumentEnabled(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const1008 bool OpenTelemetryPluginImpl::IsInstrumentEnabled(
1009 grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const {
1010 return !absl::holds_alternative<Disabled>(
1011 instruments_data_.at(handle.index).instrument);
1012 }
1013
AddToChannelArguments(grpc::ChannelArguments * args)1014 void OpenTelemetryPluginImpl::AddToChannelArguments(
1015 grpc::ChannelArguments* args) {
1016 const grpc_channel_args c_args = args->c_channel_args();
1017 auto* stats_plugin_list = grpc_channel_args_find_pointer<
1018 std::shared_ptr<std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>>(
1019 &c_args, GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
1020 if (stats_plugin_list != nullptr) {
1021 (*stats_plugin_list)->emplace_back(shared_from_this());
1022 } else {
1023 auto stats_plugin_list = std::make_shared<
1024 std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>();
1025 args->SetPointerWithVtable(
1026 GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS, &stats_plugin_list,
1027 grpc_core::ChannelArgTypeTraits<decltype(stats_plugin_list)>::VTable());
1028 stats_plugin_list->emplace_back(shared_from_this());
1029 }
1030 }
1031
AddToServerBuilder(grpc::ServerBuilder * builder)1032 void OpenTelemetryPluginImpl::AddToServerBuilder(grpc::ServerBuilder* builder) {
1033 builder->SetOption(std::make_unique<ServerBuilderOption>(shared_from_this()));
1034 }
1035
1036 } // namespace internal
1037
1038 constexpr absl::string_view
1039 OpenTelemetryPluginBuilder::kClientAttemptStartedInstrumentName;
1040 constexpr absl::string_view
1041 OpenTelemetryPluginBuilder::kClientAttemptDurationInstrumentName;
1042 constexpr absl::string_view OpenTelemetryPluginBuilder::
1043 kClientAttemptSentTotalCompressedMessageSizeInstrumentName;
1044 constexpr absl::string_view OpenTelemetryPluginBuilder::
1045 kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName;
1046 constexpr absl::string_view
1047 OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName;
1048 constexpr absl::string_view
1049 OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName;
1050 constexpr absl::string_view OpenTelemetryPluginBuilder::
1051 kServerCallSentTotalCompressedMessageSizeInstrumentName;
1052 constexpr absl::string_view OpenTelemetryPluginBuilder::
1053 kServerCallRcvdTotalCompressedMessageSizeInstrumentName;
1054
1055 //
1056 // OpenTelemetryPluginBuilder
1057 //
1058
OpenTelemetryPluginBuilder()1059 OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
1060 : impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {}
1061
1062 OpenTelemetryPluginBuilder::~OpenTelemetryPluginBuilder() = default;
1063
SetMeterProvider(std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider)1064 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
1065 std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
1066 impl_->SetMeterProvider(std::move(meter_provider));
1067 return *this;
1068 }
1069
1070 OpenTelemetryPluginBuilder&
SetTargetAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> target_attribute_filter)1071 OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
1072 absl::AnyInvocable<bool(absl::string_view /*target*/) const>
1073 target_attribute_filter) {
1074 impl_->SetTargetAttributeFilter(std::move(target_attribute_filter));
1075 return *this;
1076 }
1077
1078 OpenTelemetryPluginBuilder&
SetGenericMethodAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> generic_method_attribute_filter)1079 OpenTelemetryPluginBuilder::SetGenericMethodAttributeFilter(
1080 absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
1081 generic_method_attribute_filter) {
1082 impl_->SetGenericMethodAttributeFilter(
1083 std::move(generic_method_attribute_filter));
1084 return *this;
1085 }
1086
EnableMetrics(absl::Span<const absl::string_view> metric_names)1087 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::EnableMetrics(
1088 absl::Span<const absl::string_view> metric_names) {
1089 impl_->EnableMetrics(metric_names);
1090 return *this;
1091 }
1092
DisableMetrics(absl::Span<const absl::string_view> metric_names)1093 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableMetrics(
1094 absl::Span<const absl::string_view> metric_names) {
1095 impl_->DisableMetrics(metric_names);
1096 return *this;
1097 }
1098
DisableAllMetrics()1099 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableAllMetrics() {
1100 impl_->DisableAllMetrics();
1101 return *this;
1102 }
1103
AddPluginOption(std::unique_ptr<OpenTelemetryPluginOption> option)1104 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddPluginOption(
1105 std::unique_ptr<OpenTelemetryPluginOption> option) {
1106 impl_->AddPluginOption(
1107 std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>(
1108 static_cast<grpc::internal::InternalOpenTelemetryPluginOption*>(
1109 option.release())));
1110 return *this;
1111 }
1112
AddOptionalLabel(absl::string_view optional_label_key)1113 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddOptionalLabel(
1114 absl::string_view optional_label_key) {
1115 impl_->AddOptionalLabel(optional_label_key);
1116 return *this;
1117 }
1118
SetChannelScopeFilter(absl::AnyInvocable<bool (const ChannelScope &)const> channel_scope_filter)1119 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetChannelScopeFilter(
1120 absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const>
1121 channel_scope_filter) {
1122 impl_->SetChannelScopeFilter(std::move(channel_scope_filter));
1123 return *this;
1124 }
1125
BuildAndRegisterGlobal()1126 absl::Status OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
1127 return impl_->BuildAndRegisterGlobal();
1128 }
1129
1130 absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
Build()1131 OpenTelemetryPluginBuilder::Build() {
1132 return impl_->Build();
1133 }
1134
1135 } // namespace grpc
1136