• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/ext/otel/otel_client_call_tracer.h"
20 
21 #include <grpc/status.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <stdint.h>
25 
26 #include <array>
27 #include <functional>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 
32 #include "absl/functional/any_invocable.h"
33 #include "absl/log/check.h"
34 #include "absl/status/status.h"
35 #include "absl/strings/str_format.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/strings/strip.h"
38 #include "absl/time/clock.h"
39 #include "absl/time/time.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/span.h"
42 #include "opentelemetry/context/context.h"
43 #include "opentelemetry/metrics/sync_instruments.h"
44 #include "src/core/client_channel/client_channel_filter.h"
45 #include "src/core/lib/channel/channel_stack.h"
46 #include "src/core/lib/channel/status_util.h"
47 #include "src/core/lib/experiments/experiments.h"
48 #include "src/core/lib/promise/context.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/slice/slice.h"
51 #include "src/core/lib/slice/slice_buffer.h"
52 #include "src/core/lib/transport/metadata_batch.h"
53 #include "src/core/telemetry/tcp_tracer.h"
54 #include "src/core/util/sync.h"
55 #include "src/cpp/ext/otel/key_value_iterable.h"
56 #include "src/cpp/ext/otel/otel_plugin.h"
57 
58 namespace grpc {
59 namespace internal {
60 
61 //
62 // OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer
63 //
64 
CallAttemptTracer(const OpenTelemetryPluginImpl::ClientCallTracer * parent,bool arena_allocated)65 OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
66     const OpenTelemetryPluginImpl::ClientCallTracer* parent,
67     bool arena_allocated)
68     : parent_(parent),
69       arena_allocated_(arena_allocated),
70       start_time_(absl::Now()) {
71   if (parent_->otel_plugin_->client_.attempt.started != nullptr) {
72     std::array<std::pair<absl::string_view, absl::string_view>, 2>
73         additional_labels = {
74             {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
75              {OpenTelemetryTargetKey(),
76               parent_->scope_config_->filtered_target()}}};
77     // We might not have all the injected labels that we want at this point, so
78     // avoid recording a subset of injected labels here.
79     parent_->otel_plugin_->client_.attempt.started->Add(
80         1, KeyValueIterable(
81                /*injected_labels_from_plugin_options=*/{}, additional_labels,
82                /*active_plugin_options_view=*/nullptr,
83                /*optional_labels=*/{},
84                /*is_client=*/true, parent_->otel_plugin_));
85   }
86 }
87 
88 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)89     RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
90   if (recv_initial_metadata != nullptr &&
91       recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
92           .value_or(false)) {
93     is_trailers_only_ = true;
94     return;
95   }
96   PopulateLabelInjectors(recv_initial_metadata);
97 }
98 
99 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)100     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
101   parent_->scope_config_->active_plugin_options_view().ForEach(
102       [&](const InternalOpenTelemetryPluginOption& plugin_option,
103           size_t /*index*/) {
104         auto* labels_injector = plugin_option.labels_injector();
105         if (labels_injector != nullptr) {
106           labels_injector->AddLabels(send_initial_metadata, nullptr);
107         }
108         return true;
109       },
110       parent_->otel_plugin_);
111 }
112 
113 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer & send_message)114     RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
115   RecordAnnotation(
116       absl::StrFormat("Send message: %ld bytes", send_message.Length()));
117 }
118 
119 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)120     RecordSendCompressedMessage(
121         const grpc_core::SliceBuffer& send_compressed_message) {
122   RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
123                                    send_compressed_message.Length()));
124 }
125 
126 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)127     RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
128   RecordAnnotation(
129       absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
130 }
131 
132 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)133     RecordReceivedDecompressedMessage(
134         const grpc_core::SliceBuffer& recv_decompressed_message) {
135   RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
136                                    recv_decompressed_message.Length()));
137 }
138 
139 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)140     RecordReceivedTrailingMetadata(
141         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
142         const grpc_transport_stream_stats* transport_stream_stats) {
143   if (is_trailers_only_) {
144     PopulateLabelInjectors(recv_trailing_metadata);
145   }
146   std::array<std::pair<absl::string_view, absl::string_view>, 3>
147       additional_labels = {
148           {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
149            {OpenTelemetryTargetKey(),
150             parent_->scope_config_->filtered_target()},
151            {OpenTelemetryStatusKey(),
152             grpc_status_code_to_string(
153                 static_cast<grpc_status_code>(status.code()))}}};
154   KeyValueIterable labels(
155       injected_labels_from_plugin_options_, additional_labels,
156       &parent_->scope_config_->active_plugin_options_view(), optional_labels_,
157       /*is_client=*/true, parent_->otel_plugin_);
158   if (parent_->otel_plugin_->client_.attempt.duration != nullptr) {
159     parent_->otel_plugin_->client_.attempt.duration->Record(
160         absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
161         opentelemetry::context::Context{});
162   }
163   uint64_t outgoing_bytes = 0;
164   uint64_t incoming_bytes = 0;
165   if (grpc_core::IsCallTracerInTransportEnabled()) {
166     outgoing_bytes = outgoing_bytes_.load();
167     incoming_bytes = incoming_bytes_.load();
168   } else if (transport_stream_stats != nullptr) {
169     outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
170     incoming_bytes = transport_stream_stats->incoming.data_bytes;
171   }
172   if (parent_->otel_plugin_->client_.attempt
173           .sent_total_compressed_message_size != nullptr) {
174     parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size
175         ->Record(outgoing_bytes, labels, opentelemetry::context::Context{});
176   }
177   if (parent_->otel_plugin_->client_.attempt
178           .rcvd_total_compressed_message_size != nullptr) {
179     parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
180         ->Record(incoming_bytes, labels, opentelemetry::context::Context{});
181   }
182 }
183 
184 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordIncomingBytes(const TransportByteSize & transport_byte_size)185     RecordIncomingBytes(const TransportByteSize& transport_byte_size) {
186   incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
187 }
188 
189 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)190     RecordOutgoingBytes(const TransportByteSize& transport_byte_size) {
191   outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
192 }
193 
RecordCancel(absl::Status)194 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordCancel(
195     absl::Status /*cancel_error*/) {}
196 
RecordEnd(const gpr_timespec &)197 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordEnd(
198     const gpr_timespec& /*latency*/) {
199   if (arena_allocated_) {
200     this->~CallAttemptTracer();
201   } else {
202     delete this;
203   }
204 }
205 
206 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordAnnotation(absl::string_view)207     RecordAnnotation(absl::string_view /*annotation*/) {
208   // Not implemented
209 }
210 
211 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordAnnotation(const Annotation &)212     RecordAnnotation(const Annotation& /*annotation*/) {
213   // Not implemented
214 }
215 
216 std::shared_ptr<grpc_core::TcpTracerInterface> OpenTelemetryPluginImpl::
StartNewTcpTrace()217     ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
218   // No TCP trace.
219   return nullptr;
220 }
221 
222 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
SetOptionalLabel(OptionalLabelKey key,grpc_core::RefCountedStringValue value)223     SetOptionalLabel(OptionalLabelKey key,
224                      grpc_core::RefCountedStringValue value) {
225   CHECK(key < OptionalLabelKey::kSize);
226   optional_labels_[static_cast<size_t>(key)] = std::move(value);
227 }
228 
229 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
PopulateLabelInjectors(grpc_metadata_batch * metadata)230     PopulateLabelInjectors(grpc_metadata_batch* metadata) {
231   parent_->scope_config_->active_plugin_options_view().ForEach(
232       [&](const InternalOpenTelemetryPluginOption& plugin_option,
233           size_t /*index*/) {
234         auto* labels_injector = plugin_option.labels_injector();
235         if (labels_injector != nullptr) {
236           injected_labels_from_plugin_options_.push_back(
237               labels_injector->GetLabels(metadata));
238         }
239         return true;
240       },
241       parent_->otel_plugin_);
242 }
243 
244 //
245 // OpenTelemetryPluginImpl::ClientCallTracer
246 //
247 
ClientCallTracer(const grpc_core::Slice & path,grpc_core::Arena * arena,bool registered_method,OpenTelemetryPluginImpl * otel_plugin,std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config)248 OpenTelemetryPluginImpl::ClientCallTracer::ClientCallTracer(
249     const grpc_core::Slice& path, grpc_core::Arena* arena,
250     bool registered_method, OpenTelemetryPluginImpl* otel_plugin,
251     std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config)
252     : path_(path.Ref()),
253       arena_(arena),
254       registered_method_(registered_method),
255       otel_plugin_(otel_plugin),
256       scope_config_(std::move(scope_config)) {}
257 
~ClientCallTracer()258 OpenTelemetryPluginImpl::ClientCallTracer::~ClientCallTracer() {}
259 
260 OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)261 OpenTelemetryPluginImpl::ClientCallTracer::StartNewAttempt(
262     bool is_transparent_retry) {
263   // We allocate the first attempt on the arena and all subsequent attempts
264   // on the heap, so that in the common case we don't require a heap
265   // allocation, nor do we unnecessarily grow the arena.
266   bool is_first_attempt = true;
267   {
268     grpc_core::MutexLock lock(&mu_);
269     if (transparent_retries_ != 0 || retries_ != 0) {
270       is_first_attempt = false;
271     }
272     if (is_transparent_retry) {
273       ++transparent_retries_;
274     } else {
275       ++retries_;
276     }
277   }
278   if (is_first_attempt) {
279     return arena_->New<CallAttemptTracer>(this, /*arena_allocated=*/true);
280   }
281   return new CallAttemptTracer(this, /*arena_allocated=*/false);
282 }
283 
MethodForStats() const284 absl::string_view OpenTelemetryPluginImpl::ClientCallTracer::MethodForStats()
285     const {
286   absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
287   if (registered_method_ ||
288       (otel_plugin_->generic_method_attribute_filter() != nullptr &&
289        otel_plugin_->generic_method_attribute_filter()(method))) {
290     return method;
291   }
292   return "other";
293 }
294 
RecordAnnotation(absl::string_view)295 void OpenTelemetryPluginImpl::ClientCallTracer::RecordAnnotation(
296     absl::string_view /*annotation*/) {
297   // Not implemented
298 }
299 
RecordAnnotation(const Annotation &)300 void OpenTelemetryPluginImpl::ClientCallTracer::RecordAnnotation(
301     const Annotation& /*annotation*/) {
302   // Not implemented
303 }
304 
305 }  // namespace internal
306 }  // namespace grpc
307