• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/ext/otel/otel_plugin.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <grpcpp/ext/otel_plugin.h>
23 #include <grpcpp/version_info.h>
24 
25 #include <memory>
26 #include <type_traits>
27 #include <utility>
28 
29 #include "absl/log/check.h"
30 #include "opentelemetry/metrics/meter.h"
31 #include "opentelemetry/metrics/meter_provider.h"
32 #include "opentelemetry/metrics/sync_instruments.h"
33 #include "opentelemetry/nostd/shared_ptr.h"
34 #include "opentelemetry/nostd/unique_ptr.h"
35 #include "opentelemetry/nostd/variant.h"
36 #include "src/core/client_channel/client_channel_filter.h"
37 #include "src/core/config/core_configuration.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/surface/channel_stack_type.h"
40 #include "src/core/telemetry/call_tracer.h"
41 #include "src/core/util/match.h"
42 #include "src/cpp/ext/otel/key_value_iterable.h"
43 #include "src/cpp/ext/otel/otel_client_call_tracer.h"
44 #include "src/cpp/ext/otel/otel_server_call_tracer.h"
45 
46 namespace grpc {
47 namespace internal {
48 
OpenTelemetryMethodKey()49 absl::string_view OpenTelemetryMethodKey() { return "grpc.method"; }
50 
OpenTelemetryStatusKey()51 absl::string_view OpenTelemetryStatusKey() { return "grpc.status"; }
52 
OpenTelemetryTargetKey()53 absl::string_view OpenTelemetryTargetKey() { return "grpc.target"; }
54 
55 namespace {
BaseMetrics()56 absl::flat_hash_set<std::string> BaseMetrics() {
57   absl::flat_hash_set<std::string> base_metrics{
58       std::string(grpc::OpenTelemetryPluginBuilder::
59                       kClientAttemptStartedInstrumentName),
60       std::string(grpc::OpenTelemetryPluginBuilder::
61                       kClientAttemptDurationInstrumentName),
62       std::string(
63           grpc::OpenTelemetryPluginBuilder::
64               kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
65       std::string(
66           grpc::OpenTelemetryPluginBuilder::
67               kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
68       std::string(
69           grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName),
70       std::string(
71           grpc::OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName),
72       std::string(grpc::OpenTelemetryPluginBuilder::
73                       kServerCallSentTotalCompressedMessageSizeInstrumentName),
74       std::string(grpc::OpenTelemetryPluginBuilder::
75                       kServerCallRcvdTotalCompressedMessageSizeInstrumentName)};
76   grpc_core::GlobalInstrumentsRegistry::ForEach(
77       [&](const grpc_core::GlobalInstrumentsRegistry::
78               GlobalInstrumentDescriptor& descriptor) {
79         if (descriptor.enable_by_default) {
80           base_metrics.emplace(descriptor.name);
81         }
82       });
83   return base_metrics;
84 }
85 }  // namespace
86 
87 class OpenTelemetryPluginImpl::NPCMetricsKeyValueIterable
88     : public opentelemetry::common::KeyValueIterable {
89  public:
NPCMetricsKeyValueIterable(absl::Span<const absl::string_view> label_keys,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_label_keys,absl::Span<const absl::string_view> optional_label_values,const OptionalLabelsBitSet & optional_labels_bits)90   NPCMetricsKeyValueIterable(
91       absl::Span<const absl::string_view> label_keys,
92       absl::Span<const absl::string_view> label_values,
93       absl::Span<const absl::string_view> optional_label_keys,
94       absl::Span<const absl::string_view> optional_label_values,
95       const OptionalLabelsBitSet& optional_labels_bits)
96       : label_keys_(label_keys),
97         label_values_(label_values),
98         optional_label_keys_(optional_label_keys),
99         optional_label_values_(optional_label_values),
100         optional_labels_bits_(optional_labels_bits) {}
101 
ForEachKeyValue(opentelemetry::nostd::function_ref<bool (opentelemetry::nostd::string_view,opentelemetry::common::AttributeValue)> callback) const102   bool ForEachKeyValue(opentelemetry::nostd::function_ref<
103                        bool(opentelemetry::nostd::string_view,
104                             opentelemetry::common::AttributeValue)>
105                            callback) const noexcept override {
106     for (size_t i = 0; i < label_keys_.size(); ++i) {
107       if (!callback(AbslStrViewToOpenTelemetryStrView(label_keys_[i]),
108                     AbslStrViewToOpenTelemetryStrView(label_values_[i]))) {
109         return false;
110       }
111     }
112     // Since we are saving the optional label values as std::string for callback
113     // gauges, we want to minimize memory usage by filtering out the disabled
114     // optional label values.
115     bool filtered = optional_label_values_.size() < optional_label_keys_.size();
116     for (size_t i = 0, j = 0; i < optional_label_keys_.size(); ++i) {
117       if (!optional_labels_bits_.test(i)) {
118         if (!filtered) ++j;
119         continue;
120       }
121       if (!callback(
122               AbslStrViewToOpenTelemetryStrView(optional_label_keys_[i]),
123               AbslStrViewToOpenTelemetryStrView(optional_label_values_[j++]))) {
124         return false;
125       }
126     }
127     return true;
128   }
129 
size() const130   size_t size() const noexcept override {
131     return label_keys_.size() + optional_labels_bits_.count();
132   }
133 
134  private:
135   absl::Span<const absl::string_view> label_keys_;
136   absl::Span<const absl::string_view> label_values_;
137   absl::Span<const absl::string_view> optional_label_keys_;
138   absl::Span<const absl::string_view> optional_label_values_;
139   const OptionalLabelsBitSet& optional_labels_bits_;
140 };
141 
142 //
143 // OpenTelemetryPluginBuilderImpl
144 //
145 
OpenTelemetryPluginBuilderImpl()146 OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl()
147     : metrics_(BaseMetrics()) {}
148 
149 OpenTelemetryPluginBuilderImpl::~OpenTelemetryPluginBuilderImpl() = default;
150 
151 OpenTelemetryPluginBuilderImpl&
SetMeterProvider(std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider)152 OpenTelemetryPluginBuilderImpl::SetMeterProvider(
153     std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
154   meter_provider_ = std::move(meter_provider);
155   return *this;
156 }
157 
EnableMetrics(absl::Span<const absl::string_view> metric_names)158 OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::EnableMetrics(
159     absl::Span<const absl::string_view> metric_names) {
160   for (const auto& metric_name : metric_names) {
161     metrics_.emplace(metric_name);
162   }
163   return *this;
164 }
165 
DisableMetrics(absl::Span<const absl::string_view> metric_names)166 OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::DisableMetrics(
167     absl::Span<const absl::string_view> metric_names) {
168   for (const auto& metric_name : metric_names) {
169     metrics_.erase(metric_name);
170   }
171   return *this;
172 }
173 
174 OpenTelemetryPluginBuilderImpl&
DisableAllMetrics()175 OpenTelemetryPluginBuilderImpl::DisableAllMetrics() {
176   metrics_.clear();
177   return *this;
178 }
179 
180 OpenTelemetryPluginBuilderImpl&
SetTargetAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> target_attribute_filter)181 OpenTelemetryPluginBuilderImpl::SetTargetAttributeFilter(
182     absl::AnyInvocable<bool(absl::string_view /*target*/) const>
183         target_attribute_filter) {
184   target_attribute_filter_ = std::move(target_attribute_filter);
185   return *this;
186 }
187 
188 OpenTelemetryPluginBuilderImpl&
SetGenericMethodAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> generic_method_attribute_filter)189 OpenTelemetryPluginBuilderImpl::SetGenericMethodAttributeFilter(
190     absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
191         generic_method_attribute_filter) {
192   generic_method_attribute_filter_ = std::move(generic_method_attribute_filter);
193   return *this;
194 }
195 
196 OpenTelemetryPluginBuilderImpl&
SetServerSelector(absl::AnyInvocable<bool (const grpc_core::ChannelArgs &)const> server_selector)197 OpenTelemetryPluginBuilderImpl::SetServerSelector(
198     absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
199         server_selector) {
200   server_selector_ = std::move(server_selector);
201   return *this;
202 }
203 
AddPluginOption(std::unique_ptr<InternalOpenTelemetryPluginOption> option)204 OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption(
205     std::unique_ptr<InternalOpenTelemetryPluginOption> option) {
206   // We allow a limit of 64 plugin options to be registered at this time.
207   CHECK_LT(plugin_options_.size(), 64u);
208   plugin_options_.push_back(std::move(option));
209   return *this;
210 }
211 
212 OpenTelemetryPluginBuilderImpl&
AddOptionalLabel(absl::string_view optional_label_key)213 OpenTelemetryPluginBuilderImpl::AddOptionalLabel(
214     absl::string_view optional_label_key) {
215   optional_label_keys_.emplace(optional_label_key);
216   return *this;
217 }
218 
219 OpenTelemetryPluginBuilderImpl&
SetChannelScopeFilter(absl::AnyInvocable<bool (const OpenTelemetryPluginBuilder::ChannelScope &)const> channel_scope_filter)220 OpenTelemetryPluginBuilderImpl::SetChannelScopeFilter(
221     absl::AnyInvocable<
222         bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
223         channel_scope_filter) {
224   channel_scope_filter_ = std::move(channel_scope_filter);
225   return *this;
226 }
227 
BuildAndRegisterGlobal()228 absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
229   if (meter_provider_ == nullptr) {
230     return absl::InvalidArgumentError(
231         "Need to configure a valid meter provider.");
232   }
233   grpc_core::GlobalStatsPluginRegistry::RegisterStatsPlugin(
234       std::make_shared<OpenTelemetryPluginImpl>(
235           metrics_, meter_provider_, std::move(target_attribute_filter_),
236           std::move(generic_method_attribute_filter_),
237           std::move(server_selector_), std::move(plugin_options_),
238           std::move(optional_label_keys_), std::move(channel_scope_filter_)));
239   return absl::OkStatus();
240 }
241 
242 absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
Build()243 OpenTelemetryPluginBuilderImpl::Build() {
244   if (meter_provider_ == nullptr) {
245     return absl::InvalidArgumentError(
246         "Need to configure a valid meter provider.");
247   }
248   return std::make_shared<OpenTelemetryPluginImpl>(
249       metrics_, meter_provider_, std::move(target_attribute_filter_),
250       std::move(generic_method_attribute_filter_), std::move(server_selector_),
251       std::move(plugin_options_), std::move(optional_label_keys_),
252       std::move(channel_scope_filter_));
253 }
254 
CallbackMetricReporter(OpenTelemetryPluginImpl * ot_plugin,grpc_core::RegisteredMetricCallback * key)255 OpenTelemetryPluginImpl::CallbackMetricReporter::CallbackMetricReporter(
256     OpenTelemetryPluginImpl* ot_plugin,
257     grpc_core::RegisteredMetricCallback* key)
258     : ot_plugin_(ot_plugin), key_(key) {
259   // Since we are updating the timestamp and updating the cache for all
260   // registered instruments in a RegisteredMetricCallback, we will need to
261   // clear all the cache cells for this RegisteredMetricCallback first, so
262   // that if a particular combination of labels was previously present but
263   // is no longer present, we won't continue to report it.
264   for (const auto& handle : key->metrics()) {
265     const auto& descriptor =
266         grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
267     CHECK(descriptor.instrument_type ==
268           grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
269     switch (descriptor.value_type) {
270       case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
271         auto& callback_gauge_state =
272             absl::get<std::unique_ptr<CallbackGaugeState<int64_t>>>(
273                 ot_plugin_->instruments_data_.at(handle.index).instrument);
274         callback_gauge_state->caches[key].clear();
275         break;
276       }
277       case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
278         auto& callback_gauge_state =
279             absl::get<std::unique_ptr<CallbackGaugeState<double>>>(
280                 ot_plugin_->instruments_data_.at(handle.index).instrument);
281         callback_gauge_state->caches[key].clear();
282         break;
283       }
284       default:
285         grpc_core::Crash(absl::StrFormat(
286             "Unknown or unsupported value type: %d", descriptor.value_type));
287     }
288   }
289 }
290 
ReportInt64(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,int64_t value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)291 void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportInt64(
292     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
293     int64_t value, absl::Span<const absl::string_view> label_values,
294     absl::Span<const absl::string_view> optional_values) {
295   const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
296   auto* callback_gauge_state =
297       absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
298           &instrument_data.instrument);
299   CHECK_NE(callback_gauge_state, nullptr);
300   const auto& descriptor =
301       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
302   CHECK(descriptor.label_keys.size() == label_values.size());
303   CHECK(descriptor.optional_label_keys.size() == optional_values.size());
304   if ((*callback_gauge_state)->caches.find(key_) ==
305       (*callback_gauge_state)->caches.end()) {
306     LOG(ERROR) << "This may occur when the gauge used in AddCallback is "
307                   "different from the gauge used in Report. This indicates a "
308                   "misuse of the API. The value "
309                << value << " will not be recorded for instrument "
310                << handle.index;
311     return;
312   }
313   auto& cell = (*callback_gauge_state)->caches.at(key_);
314   std::vector<std::string> key;
315   key.reserve(label_values.size() +
316               instrument_data.optional_labels_bits.count());
317   for (const absl::string_view value : label_values) {
318     key.emplace_back(value);
319   }
320   for (size_t i = 0; i < optional_values.size(); ++i) {
321     if (instrument_data.optional_labels_bits.test(i)) {
322       key.emplace_back(optional_values[i]);
323     }
324   }
325   cell.insert_or_assign(std::move(key), value);
326 }
327 
ReportDouble(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,double value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)328 void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportDouble(
329     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
330     double value, absl::Span<const absl::string_view> label_values,
331     absl::Span<const absl::string_view> optional_values) {
332   const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
333   auto* callback_gauge_state =
334       absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
335           &instrument_data.instrument);
336   CHECK_NE(callback_gauge_state, nullptr);
337   const auto& descriptor =
338       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
339   CHECK(descriptor.label_keys.size() == label_values.size());
340   CHECK(descriptor.optional_label_keys.size() == optional_values.size());
341   if ((*callback_gauge_state)->caches.find(key_) ==
342       (*callback_gauge_state)->caches.end()) {
343     LOG(ERROR) << "This may occur when the gauge used in AddCallback is "
344                   "different from the gauge used in Report. This indicates a "
345                   "misuse of the API. The value "
346                << value << " will not be recorded for instrument "
347                << handle.index;
348     return;
349   }
350   auto& cell = (*callback_gauge_state)->caches.at(key_);
351   std::vector<std::string> key;
352   key.reserve(label_values.size() +
353               instrument_data.optional_labels_bits.count());
354   for (const absl::string_view value : label_values) {
355     key.emplace_back(value);
356   }
357   for (size_t i = 0; i < optional_values.size(); ++i) {
358     if (instrument_data.optional_labels_bits.test(i)) {
359       key.emplace_back(optional_values[i]);
360     }
361   }
362   cell.insert_or_assign(std::move(key), value);
363 }
364 
UpdateArguments(grpc::ChannelArguments * args)365 void OpenTelemetryPluginImpl::ServerBuilderOption::UpdateArguments(
366     grpc::ChannelArguments* args) {
367   plugin_->AddToChannelArguments(args);
368 }
369 
OpenTelemetryPluginImpl(const absl::flat_hash_set<std::string> & metrics,opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider,absl::AnyInvocable<bool (absl::string_view)const> target_attribute_filter,absl::AnyInvocable<bool (absl::string_view)const> generic_method_attribute_filter,absl::AnyInvocable<bool (const grpc_core::ChannelArgs &)const> server_selector,std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>> plugin_options,const std::set<absl::string_view> & optional_label_keys,absl::AnyInvocable<bool (const OpenTelemetryPluginBuilder::ChannelScope &)const> channel_scope_filter)370 OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
371     const absl::flat_hash_set<std::string>& metrics,
372     opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
373         meter_provider,
374     absl::AnyInvocable<bool(absl::string_view /*target*/) const>
375         target_attribute_filter,
376     absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
377         generic_method_attribute_filter,
378     absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
379         server_selector,
380     std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
381         plugin_options,
382     const std::set<absl::string_view>& optional_label_keys,
383     absl::AnyInvocable<
384         bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
385         channel_scope_filter)
386     : meter_provider_(std::move(meter_provider)),
387       server_selector_(std::move(server_selector)),
388       target_attribute_filter_(std::move(target_attribute_filter)),
389       generic_method_attribute_filter_(
390           std::move(generic_method_attribute_filter)),
391       plugin_options_(std::move(plugin_options)),
392       channel_scope_filter_(std::move(channel_scope_filter)) {
393   auto meter = meter_provider_->GetMeter("grpc-c++", GRPC_CPP_VERSION_STRING);
394   // Per-call metrics.
395   if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
396                            kClientAttemptStartedInstrumentName)) {
397     client_.attempt.started = meter->CreateUInt64Counter(
398         std::string(grpc::OpenTelemetryPluginBuilder::
399                         kClientAttemptStartedInstrumentName),
400         "Number of client call attempts started", "{attempt}");
401   }
402   if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
403                            kClientAttemptDurationInstrumentName)) {
404     client_.attempt.duration = meter->CreateDoubleHistogram(
405         std::string(grpc::OpenTelemetryPluginBuilder::
406                         kClientAttemptDurationInstrumentName),
407         "End-to-end time taken to complete a client call attempt", "s");
408   }
409   if (metrics.contains(
410           grpc::OpenTelemetryPluginBuilder::
411               kClientAttemptSentTotalCompressedMessageSizeInstrumentName)) {
412     client_.attempt.sent_total_compressed_message_size =
413         meter->CreateUInt64Histogram(
414             std::string(
415                 grpc::OpenTelemetryPluginBuilder::
416                     kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
417             "Compressed message bytes sent per client call attempt", "By");
418   }
419   if (metrics.contains(
420           grpc::OpenTelemetryPluginBuilder::
421               kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName)) {
422     client_.attempt.rcvd_total_compressed_message_size =
423         meter->CreateUInt64Histogram(
424             std::string(
425                 grpc::OpenTelemetryPluginBuilder::
426                     kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
427             "Compressed message bytes received per call attempt", "By");
428   }
429   if (metrics.contains(
430           grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName)) {
431     server_.call.started = meter->CreateUInt64Counter(
432         std::string(
433             grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName),
434         "Number of server calls started", "{call}");
435   }
436   if (metrics.contains(grpc::OpenTelemetryPluginBuilder::
437                            kServerCallDurationInstrumentName)) {
438     server_.call.duration = meter->CreateDoubleHistogram(
439         std::string(grpc::OpenTelemetryPluginBuilder::
440                         kServerCallDurationInstrumentName),
441         "End-to-end time taken to complete a call from server transport's "
442         "perspective",
443         "s");
444   }
445   if (metrics.contains(
446           grpc::OpenTelemetryPluginBuilder::
447               kServerCallSentTotalCompressedMessageSizeInstrumentName)) {
448     server_.call.sent_total_compressed_message_size =
449         meter->CreateUInt64Histogram(
450             std::string(
451                 grpc::OpenTelemetryPluginBuilder::
452                     kServerCallSentTotalCompressedMessageSizeInstrumentName),
453             "Compressed message bytes sent per server call", "By");
454   }
455   if (metrics.contains(
456           grpc::OpenTelemetryPluginBuilder::
457               kServerCallRcvdTotalCompressedMessageSizeInstrumentName)) {
458     server_.call.rcvd_total_compressed_message_size =
459         meter->CreateUInt64Histogram(
460             std::string(
461                 grpc::OpenTelemetryPluginBuilder::
462                     kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
463             "Compressed message bytes received per server call", "By");
464   }
465   // Store optional label keys for per call metrics
466   CHECK(static_cast<size_t>(grpc_core::ClientCallTracer::CallAttemptTracer::
467                                 OptionalLabelKey::kSize) <=
468         kOptionalLabelsSizeLimit);
469   for (const auto& key : optional_label_keys) {
470     auto optional_key = OptionalLabelStringToKey(key);
471     if (optional_key.has_value()) {
472       per_call_optional_label_bits_.set(
473           static_cast<size_t>(optional_key.value()));
474     }
475   }
476   // Non-per-call metrics.
477   grpc_core::GlobalInstrumentsRegistry::ForEach(
478       [&, this](const grpc_core::GlobalInstrumentsRegistry::
479                     GlobalInstrumentDescriptor& descriptor) {
480         CHECK(descriptor.optional_label_keys.size() <=
481               kOptionalLabelsSizeLimit);
482         if (instruments_data_.size() < descriptor.index + 1) {
483           instruments_data_.resize(descriptor.index + 1);
484         }
485         if (!metrics.contains(descriptor.name)) {
486           return;
487         }
488         switch (descriptor.instrument_type) {
489           case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCounter:
490             switch (descriptor.value_type) {
491               case grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64:
492                 instruments_data_[descriptor.index].instrument =
493                     meter->CreateUInt64Counter(
494                         std::string(descriptor.name),
495                         std::string(descriptor.description),
496                         std::string(descriptor.unit));
497                 break;
498               case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble:
499                 instruments_data_[descriptor.index].instrument =
500                     meter->CreateDoubleCounter(
501                         std::string(descriptor.name),
502                         std::string(descriptor.description),
503                         std::string(descriptor.unit));
504                 break;
505               default:
506                 grpc_core::Crash(
507                     absl::StrFormat("Unknown or unsupported value type: %d",
508                                     descriptor.value_type));
509             }
510             break;
511           case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kHistogram:
512             switch (descriptor.value_type) {
513               case grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64:
514                 instruments_data_[descriptor.index].instrument =
515                     meter->CreateUInt64Histogram(
516                         std::string(descriptor.name),
517                         std::string(descriptor.description),
518                         std::string(descriptor.unit));
519                 break;
520               case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble:
521                 instruments_data_[descriptor.index].instrument =
522                     meter->CreateDoubleHistogram(
523                         std::string(descriptor.name),
524                         std::string(descriptor.description),
525                         std::string(descriptor.unit));
526                 break;
527               default:
528                 grpc_core::Crash(
529                     absl::StrFormat("Unknown or unsupported value type: %d",
530                                     descriptor.value_type));
531             }
532             break;
533           case grpc_core::GlobalInstrumentsRegistry::InstrumentType::
534               kCallbackGauge:
535             switch (descriptor.value_type) {
536               case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
537                 auto observable_state =
538                     std::make_unique<CallbackGaugeState<int64_t>>();
539                 observable_state->id = descriptor.index;
540                 observable_state->ot_plugin = this;
541                 observable_state->instrument =
542                     meter->CreateInt64ObservableGauge(
543                         std::string(descriptor.name),
544                         std::string(descriptor.description),
545                         std::string(descriptor.unit));
546                 instruments_data_[descriptor.index].instrument =
547                     std::move(observable_state);
548                 break;
549               }
550               case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
551                 auto observable_state =
552                     std::make_unique<CallbackGaugeState<double>>();
553                 observable_state->id = descriptor.index;
554                 observable_state->ot_plugin = this;
555                 observable_state->instrument =
556                     meter->CreateDoubleObservableGauge(
557                         std::string(descriptor.name),
558                         std::string(descriptor.description),
559                         std::string(descriptor.unit));
560                 instruments_data_[descriptor.index].instrument =
561                     std::move(observable_state);
562                 break;
563               }
564               default:
565                 grpc_core::Crash(
566                     absl::StrFormat("Unknown or unsupported value type: %d",
567                                     descriptor.value_type));
568             }
569             break;
570           default:
571             grpc_core::Crash(absl::StrFormat("Unknown instrument_type: %d",
572                                              descriptor.instrument_type));
573         }
574         for (size_t i = 0; i < descriptor.optional_label_keys.size(); ++i) {
575           if (optional_label_keys.find(descriptor.optional_label_keys[i]) !=
576               optional_label_keys.end()) {
577             instruments_data_[descriptor.index].optional_labels_bits.set(i);
578           }
579         }
580       });
581 }
582 
~OpenTelemetryPluginImpl()583 OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() {
584   for (const auto& instrument_data : instruments_data_) {
585     grpc_core::Match(
586         instrument_data.instrument, [](const Disabled&) {},
587         [](const std::unique_ptr<opentelemetry::metrics::Counter<double>>&) {},
588         [](const std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>&) {
589         },
590         [](const std::unique_ptr<
591             opentelemetry::metrics::Histogram<uint64_t>>&) {},
592         [](const std::unique_ptr<opentelemetry::metrics::Histogram<double>>&) {
593         },
594         [](const std::unique_ptr<CallbackGaugeState<int64_t>>& state) {
595           CHECK(state->caches.empty());
596           if (state->ot_callback_registered) {
597             state->instrument->RemoveCallback(
598                 &CallbackGaugeState<int64_t>::CallbackGaugeCallback,
599                 state.get());
600             state->ot_callback_registered = false;
601           }
602         },
603         [](const std::unique_ptr<CallbackGaugeState<double>>& state) {
604           CHECK(state->caches.empty());
605           if (state->ot_callback_registered) {
606             state->instrument->RemoveCallback(
607                 &CallbackGaugeState<double>::CallbackGaugeCallback,
608                 state.get());
609             state->ot_callback_registered = false;
610           }
611         });
612   }
613 }
614 
615 namespace {
616 constexpr absl::string_view kLocality = "grpc.lb.locality";
617 }
618 
OptionalLabelKeyToString(grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key)619 absl::string_view OpenTelemetryPluginImpl::OptionalLabelKeyToString(
620     grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key) {
621   switch (key) {
622     case grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
623         kLocality:
624       return kLocality;
625     default:
626       grpc_core::Crash("Illegal OptionalLabelKey index");
627   }
628 }
629 
630 absl::optional<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
OptionalLabelStringToKey(absl::string_view key)631 OpenTelemetryPluginImpl::OptionalLabelStringToKey(absl::string_view key) {
632   if (key == kLocality) {
633     return grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
634         kLocality;
635   }
636   return absl::nullopt;
637 }
638 
639 std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForChannel(const OpenTelemetryPluginBuilder::ChannelScope & scope) const640 OpenTelemetryPluginImpl::IsEnabledForChannel(
641     const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
642   if (channel_scope_filter_ == nullptr || channel_scope_filter_(scope)) {
643     return {true, std::make_shared<ClientScopeConfig>(this, scope)};
644   }
645   return {false, nullptr};
646 }
647 
648 std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForServer(const grpc_core::ChannelArgs & args) const649 OpenTelemetryPluginImpl::IsEnabledForServer(
650     const grpc_core::ChannelArgs& args) const {
651   // Return true only if there is no server selector registered or if the
652   // server selector returns true.
653   if (server_selector_ == nullptr || server_selector_(args)) {
654     return {true, std::make_shared<ServerScopeConfig>(this, args)};
655   }
656   return {false, nullptr};
657 }
658 
659 std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
GetChannelScopeConfig(const OpenTelemetryPluginBuilder::ChannelScope & scope) const660 OpenTelemetryPluginImpl::GetChannelScopeConfig(
661     const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
662   CHECK(channel_scope_filter_ == nullptr || channel_scope_filter_(scope));
663   return std::make_shared<ClientScopeConfig>(this, scope);
664 }
665 
666 std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
GetServerScopeConfig(const grpc_core::ChannelArgs & args) const667 OpenTelemetryPluginImpl::GetServerScopeConfig(
668     const grpc_core::ChannelArgs& args) const {
669   CHECK(server_selector_ == nullptr || server_selector_(args));
670   return std::make_shared<ServerScopeConfig>(this, args);
671 }
672 
AddCounter(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,uint64_t value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)673 void OpenTelemetryPluginImpl::AddCounter(
674     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
675     uint64_t value, absl::Span<const absl::string_view> label_values,
676     absl::Span<const absl::string_view> optional_values) {
677   const auto& instrument_data = instruments_data_.at(handle.index);
678   if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
679     // This instrument is disabled.
680     return;
681   }
682   CHECK(absl::holds_alternative<
683         std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
684       instrument_data.instrument));
685   const auto& descriptor =
686       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
687   CHECK(descriptor.label_keys.size() == label_values.size());
688   CHECK(descriptor.optional_label_keys.size() == optional_values.size());
689   if (label_values.empty() && optional_values.empty()) {
690     absl::get<std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
691         instrument_data.instrument)
692         ->Add(value);
693   } else {
694     absl::get<std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
695         instrument_data.instrument)
696         ->Add(value, NPCMetricsKeyValueIterable(
697                          descriptor.label_keys, label_values,
698                          descriptor.optional_label_keys, optional_values,
699                          instrument_data.optional_labels_bits));
700   }
701 }
702 
AddCounter(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,double value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)703 void OpenTelemetryPluginImpl::AddCounter(
704     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
705     double value, absl::Span<const absl::string_view> label_values,
706     absl::Span<const absl::string_view> optional_values) {
707   const auto& instrument_data = instruments_data_.at(handle.index);
708   if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
709     // This instrument is disabled.
710     return;
711   }
712   CHECK(absl::holds_alternative<
713         std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
714       instrument_data.instrument));
715   const auto& descriptor =
716       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
717   CHECK(descriptor.label_keys.size() == label_values.size());
718   CHECK(descriptor.optional_label_keys.size() == optional_values.size());
719   if (label_values.empty() && optional_values.empty()) {
720     absl::get<std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
721         instrument_data.instrument)
722         ->Add(value);
723   } else {
724     absl::get<std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
725         instrument_data.instrument)
726         ->Add(value, NPCMetricsKeyValueIterable(
727                          descriptor.label_keys, label_values,
728                          descriptor.optional_label_keys, optional_values,
729                          instrument_data.optional_labels_bits));
730   }
731 }
732 
RecordHistogram(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,uint64_t value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)733 void OpenTelemetryPluginImpl::RecordHistogram(
734     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
735     uint64_t value, absl::Span<const absl::string_view> label_values,
736     absl::Span<const absl::string_view> optional_values) {
737   const auto& instrument_data = instruments_data_.at(handle.index);
738   if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
739     // This instrument is disabled.
740     return;
741   }
742   CHECK(absl::holds_alternative<
743         std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
744       instrument_data.instrument));
745   const auto& descriptor =
746       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
747   CHECK(descriptor.label_keys.size() == label_values.size());
748   CHECK(descriptor.optional_label_keys.size() == optional_values.size());
749   if (label_values.empty() && optional_values.empty()) {
750     absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
751         instrument_data.instrument)
752         ->Record(value, opentelemetry::context::Context{});
753   } else {
754     absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
755         instrument_data.instrument)
756         ->Record(value,
757                  NPCMetricsKeyValueIterable(
758                      descriptor.label_keys, label_values,
759                      descriptor.optional_label_keys, optional_values,
760                      instrument_data.optional_labels_bits),
761                  opentelemetry::context::Context{});
762   }
763 }
764 
RecordHistogram(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,double value,absl::Span<const absl::string_view> label_values,absl::Span<const absl::string_view> optional_values)765 void OpenTelemetryPluginImpl::RecordHistogram(
766     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
767     double value, absl::Span<const absl::string_view> label_values,
768     absl::Span<const absl::string_view> optional_values) {
769   const auto& instrument_data = instruments_data_.at(handle.index);
770   if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
771     // This instrument is disabled.
772     return;
773   }
774   CHECK(absl::holds_alternative<
775         std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
776       instrument_data.instrument));
777   const auto& descriptor =
778       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
779   CHECK(descriptor.label_keys.size() == label_values.size());
780   CHECK(descriptor.optional_label_keys.size() == optional_values.size());
781   if (label_values.empty() && optional_values.empty()) {
782     absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
783         instrument_data.instrument)
784         ->Record(value, opentelemetry::context::Context{});
785   } else {
786     absl::get<std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
787         instrument_data.instrument)
788         ->Record(value,
789                  NPCMetricsKeyValueIterable(
790                      descriptor.label_keys, label_values,
791                      descriptor.optional_label_keys, optional_values,
792                      instrument_data.optional_labels_bits),
793                  opentelemetry::context::Context{});
794   }
795 }
796 
AddCallback(grpc_core::RegisteredMetricCallback * callback)797 void OpenTelemetryPluginImpl::AddCallback(
798     grpc_core::RegisteredMetricCallback* callback) {
799   std::vector<
800       absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
801       gauges_that_need_to_add_callback;
802   {
803     grpc_core::MutexLock lock(&mu_);
804     callback_timestamps_.emplace(callback, grpc_core::Timestamp::InfPast());
805     for (const auto& handle : callback->metrics()) {
806       const auto& descriptor =
807           grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
808       CHECK(
809           descriptor.instrument_type ==
810           grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
811       switch (descriptor.value_type) {
812         case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
813           const auto& instrument_data = instruments_data_.at(handle.index);
814           if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
815             // This instrument is disabled.
816             continue;
817           }
818           auto* callback_gauge_state =
819               absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
820                   &instrument_data.instrument);
821           CHECK_NE(callback_gauge_state, nullptr);
822           (*callback_gauge_state)
823               ->caches.emplace(callback, CallbackGaugeState<int64_t>::Cache{});
824           if (!std::exchange((*callback_gauge_state)->ot_callback_registered,
825                              true)) {
826             gauges_that_need_to_add_callback.push_back(
827                 callback_gauge_state->get());
828           }
829           break;
830         }
831         case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
832           const auto& instrument_data = instruments_data_.at(handle.index);
833           if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
834             // This instrument is disabled.
835             continue;
836           }
837           auto* callback_gauge_state =
838               absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
839                   &instrument_data.instrument);
840           CHECK_NE(callback_gauge_state, nullptr);
841           (*callback_gauge_state)
842               ->caches.emplace(callback, CallbackGaugeState<double>::Cache{});
843           if (!std::exchange((*callback_gauge_state)->ot_callback_registered,
844                              true)) {
845             gauges_that_need_to_add_callback.push_back(
846                 callback_gauge_state->get());
847           }
848           break;
849         }
850         default:
851           grpc_core::Crash(absl::StrFormat(
852               "Unknown or unsupported value type: %d", descriptor.value_type));
853       }
854     }
855   }
856   // AddCallback internally grabs OpenTelemetry's observable_registry's
857   // lock. So we need to call it without our plugin lock otherwise we may
858   // deadlock.
859   for (const auto& gauge : gauges_that_need_to_add_callback) {
860     grpc_core::Match(
861         gauge,
862         [](CallbackGaugeState<int64_t>* gauge) {
863           gauge->instrument->AddCallback(
864               &CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge);
865         },
866         [](CallbackGaugeState<double>* gauge) {
867           gauge->instrument->AddCallback(
868               &CallbackGaugeState<double>::CallbackGaugeCallback, gauge);
869         });
870   }
871 }
872 
RemoveCallback(grpc_core::RegisteredMetricCallback * callback)873 void OpenTelemetryPluginImpl::RemoveCallback(
874     grpc_core::RegisteredMetricCallback* callback) {
875   {
876     grpc_core::MutexLock lock(&mu_);
877     callback_timestamps_.erase(callback);
878     for (const auto& handle : callback->metrics()) {
879       const auto& descriptor =
880           grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
881       CHECK(
882           descriptor.instrument_type ==
883           grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
884       switch (descriptor.value_type) {
885         case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
886           const auto& instrument_data = instruments_data_.at(handle.index);
887           if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
888             // This instrument is disabled.
889             continue;
890           }
891           auto* callback_gauge_state =
892               absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
893                   &instrument_data.instrument);
894           CHECK_NE(callback_gauge_state, nullptr);
895           CHECK((*callback_gauge_state)->ot_callback_registered);
896           CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
897           break;
898         }
899         case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
900           const auto& instrument_data = instruments_data_.at(handle.index);
901           if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
902             // This instrument is disabled.
903             continue;
904           }
905           auto* callback_gauge_state =
906               absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
907                   &instrument_data.instrument);
908           CHECK_NE(callback_gauge_state, nullptr);
909           CHECK((*callback_gauge_state)->ot_callback_registered);
910           CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
911           break;
912         }
913         default:
914           grpc_core::Crash(absl::StrFormat(
915               "Unknown or unsupported value type: %d", descriptor.value_type));
916       }
917     }
918   }
919   // Note that we are not removing the callback from OpenTelemetry immediately,
920   // and instead remove it when the plugin is destroyed. We just have a single
921   // callback per OpenTelemetry instrument which is a small number. If we decide
922   // to remove the callback immediately at this point, we need to make sure that
923   // 1) the callback is removed without holding mu_ and 2) we make sure that
924   // this does not race against a possible `AddCallback` operation. A potential
925   // way to do this is to use WorkSerializer.
926 }
927 
928 template <typename ValueType>
Observe(opentelemetry::metrics::ObserverResult & result,const Cache & cache)929 void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::Observe(
930     opentelemetry::metrics::ObserverResult& result, const Cache& cache) {
931   const auto& descriptor =
932       grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id});
933   for (const auto& pair : cache) {
934     CHECK(pair.first.size() <= (descriptor.label_keys.size() +
935                                 descriptor.optional_label_keys.size()));
936     if (descriptor.label_keys.empty() &&
937         descriptor.optional_label_keys.empty()) {
938       opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
939           opentelemetry::metrics::ObserverResultT<ValueType>>>(result)
940           ->Observe(pair.second);
941     } else {
942       auto& instrument_data = ot_plugin->instruments_data_.at(id);
943       opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
944           opentelemetry::metrics::ObserverResultT<ValueType>>>(result)
945           ->Observe(pair.second,
946                     NPCMetricsKeyValueIterable(
947                         descriptor.label_keys,
948                         absl::FixedArray<absl::string_view>(
949                             pair.first.begin(),
950                             pair.first.begin() + descriptor.label_keys.size()),
951                         descriptor.optional_label_keys,
952                         absl::FixedArray<absl::string_view>(
953                             pair.first.begin() + descriptor.label_keys.size(),
954                             pair.first.end()),
955                         instrument_data.optional_labels_bits));
956     }
957   }
958 }
959 
960 // OpenTelemetry calls our callback with its observable_registry's lock
961 // held.
962 template <typename ValueType>
963 void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::
CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result,void * arg)964     CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result,
965                           void* arg) {
966   auto* callback_gauge_state = static_cast<CallbackGaugeState<ValueType>*>(arg);
967   auto now = grpc_core::Timestamp::Now();
968   grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_);
969   for (auto& elem : callback_gauge_state->caches) {
970     auto* registered_metric_callback = elem.first;
971     auto iter = callback_gauge_state->ot_plugin->callback_timestamps_.find(
972         registered_metric_callback);
973     CHECK(iter != callback_gauge_state->ot_plugin->callback_timestamps_.end());
974     if (now - iter->second < registered_metric_callback->min_interval()) {
975       // Use cached value.
976       callback_gauge_state->Observe(result, elem.second);
977       continue;
978     }
979     // Otherwise update and use the cache.
980     iter->second = now;
981     CallbackMetricReporter reporter(callback_gauge_state->ot_plugin,
982                                     registered_metric_callback);
983     registered_metric_callback->Run(reporter);
984     callback_gauge_state->Observe(result, elem.second);
985   }
986 }
987 
GetClientCallTracer(const grpc_core::Slice & path,bool registered_method,std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)988 grpc_core::ClientCallTracer* OpenTelemetryPluginImpl::GetClientCallTracer(
989     const grpc_core::Slice& path, bool registered_method,
990     std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
991   return grpc_core::GetContext<grpc_core::Arena>()
992       ->ManagedNew<ClientCallTracer>(
993           path, grpc_core::GetContext<grpc_core::Arena>(), registered_method,
994           this,
995           std::static_pointer_cast<OpenTelemetryPluginImpl::ClientScopeConfig>(
996               scope_config));
997 }
998 
GetServerCallTracer(std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)999 grpc_core::ServerCallTracer* OpenTelemetryPluginImpl::GetServerCallTracer(
1000     std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
1001   return grpc_core::GetContext<grpc_core::Arena>()
1002       ->ManagedNew<ServerCallTracer>(
1003           this,
1004           std::static_pointer_cast<OpenTelemetryPluginImpl::ServerScopeConfig>(
1005               scope_config));
1006 }
1007 
IsInstrumentEnabled(grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const1008 bool OpenTelemetryPluginImpl::IsInstrumentEnabled(
1009     grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const {
1010   return !absl::holds_alternative<Disabled>(
1011       instruments_data_.at(handle.index).instrument);
1012 }
1013 
AddToChannelArguments(grpc::ChannelArguments * args)1014 void OpenTelemetryPluginImpl::AddToChannelArguments(
1015     grpc::ChannelArguments* args) {
1016   const grpc_channel_args c_args = args->c_channel_args();
1017   auto* stats_plugin_list = grpc_channel_args_find_pointer<
1018       std::shared_ptr<std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>>(
1019       &c_args, GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
1020   if (stats_plugin_list != nullptr) {
1021     (*stats_plugin_list)->emplace_back(shared_from_this());
1022   } else {
1023     auto stats_plugin_list = std::make_shared<
1024         std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>();
1025     args->SetPointerWithVtable(
1026         GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS, &stats_plugin_list,
1027         grpc_core::ChannelArgTypeTraits<decltype(stats_plugin_list)>::VTable());
1028     stats_plugin_list->emplace_back(shared_from_this());
1029   }
1030 }
1031 
AddToServerBuilder(grpc::ServerBuilder * builder)1032 void OpenTelemetryPluginImpl::AddToServerBuilder(grpc::ServerBuilder* builder) {
1033   builder->SetOption(std::make_unique<ServerBuilderOption>(shared_from_this()));
1034 }
1035 
1036 }  // namespace internal
1037 
1038 constexpr absl::string_view
1039     OpenTelemetryPluginBuilder::kClientAttemptStartedInstrumentName;
1040 constexpr absl::string_view
1041     OpenTelemetryPluginBuilder::kClientAttemptDurationInstrumentName;
1042 constexpr absl::string_view OpenTelemetryPluginBuilder::
1043     kClientAttemptSentTotalCompressedMessageSizeInstrumentName;
1044 constexpr absl::string_view OpenTelemetryPluginBuilder::
1045     kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName;
1046 constexpr absl::string_view
1047     OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName;
1048 constexpr absl::string_view
1049     OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName;
1050 constexpr absl::string_view OpenTelemetryPluginBuilder::
1051     kServerCallSentTotalCompressedMessageSizeInstrumentName;
1052 constexpr absl::string_view OpenTelemetryPluginBuilder::
1053     kServerCallRcvdTotalCompressedMessageSizeInstrumentName;
1054 
1055 //
1056 // OpenTelemetryPluginBuilder
1057 //
1058 
OpenTelemetryPluginBuilder()1059 OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
1060     : impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {}
1061 
1062 OpenTelemetryPluginBuilder::~OpenTelemetryPluginBuilder() = default;
1063 
SetMeterProvider(std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider)1064 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
1065     std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
1066   impl_->SetMeterProvider(std::move(meter_provider));
1067   return *this;
1068 }
1069 
1070 OpenTelemetryPluginBuilder&
SetTargetAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> target_attribute_filter)1071 OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
1072     absl::AnyInvocable<bool(absl::string_view /*target*/) const>
1073         target_attribute_filter) {
1074   impl_->SetTargetAttributeFilter(std::move(target_attribute_filter));
1075   return *this;
1076 }
1077 
1078 OpenTelemetryPluginBuilder&
SetGenericMethodAttributeFilter(absl::AnyInvocable<bool (absl::string_view)const> generic_method_attribute_filter)1079 OpenTelemetryPluginBuilder::SetGenericMethodAttributeFilter(
1080     absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
1081         generic_method_attribute_filter) {
1082   impl_->SetGenericMethodAttributeFilter(
1083       std::move(generic_method_attribute_filter));
1084   return *this;
1085 }
1086 
EnableMetrics(absl::Span<const absl::string_view> metric_names)1087 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::EnableMetrics(
1088     absl::Span<const absl::string_view> metric_names) {
1089   impl_->EnableMetrics(metric_names);
1090   return *this;
1091 }
1092 
DisableMetrics(absl::Span<const absl::string_view> metric_names)1093 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableMetrics(
1094     absl::Span<const absl::string_view> metric_names) {
1095   impl_->DisableMetrics(metric_names);
1096   return *this;
1097 }
1098 
DisableAllMetrics()1099 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableAllMetrics() {
1100   impl_->DisableAllMetrics();
1101   return *this;
1102 }
1103 
AddPluginOption(std::unique_ptr<OpenTelemetryPluginOption> option)1104 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddPluginOption(
1105     std::unique_ptr<OpenTelemetryPluginOption> option) {
1106   impl_->AddPluginOption(
1107       std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>(
1108           static_cast<grpc::internal::InternalOpenTelemetryPluginOption*>(
1109               option.release())));
1110   return *this;
1111 }
1112 
AddOptionalLabel(absl::string_view optional_label_key)1113 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddOptionalLabel(
1114     absl::string_view optional_label_key) {
1115   impl_->AddOptionalLabel(optional_label_key);
1116   return *this;
1117 }
1118 
SetChannelScopeFilter(absl::AnyInvocable<bool (const ChannelScope &)const> channel_scope_filter)1119 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetChannelScopeFilter(
1120     absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const>
1121         channel_scope_filter) {
1122   impl_->SetChannelScopeFilter(std::move(channel_scope_filter));
1123   return *this;
1124 }
1125 
BuildAndRegisterGlobal()1126 absl::Status OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
1127   return impl_->BuildAndRegisterGlobal();
1128 }
1129 
1130 absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
Build()1131 OpenTelemetryPluginBuilder::Build() {
1132   return impl_->Build();
1133 }
1134 
1135 }  // namespace grpc
1136