1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/ext/otel/otel_client_call_tracer.h"
20
21 #include <grpc/status.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <stdint.h>
25
26 #include <array>
27 #include <functional>
28 #include <memory>
29 #include <string>
30 #include <utility>
31
32 #include "absl/functional/any_invocable.h"
33 #include "absl/log/check.h"
34 #include "absl/status/status.h"
35 #include "absl/strings/str_format.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/strings/strip.h"
38 #include "absl/time/clock.h"
39 #include "absl/time/time.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/span.h"
42 #include "opentelemetry/context/context.h"
43 #include "opentelemetry/metrics/sync_instruments.h"
44 #include "src/core/client_channel/client_channel_filter.h"
45 #include "src/core/lib/channel/channel_stack.h"
46 #include "src/core/lib/channel/status_util.h"
47 #include "src/core/lib/experiments/experiments.h"
48 #include "src/core/lib/promise/context.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/slice/slice.h"
51 #include "src/core/lib/slice/slice_buffer.h"
52 #include "src/core/lib/transport/metadata_batch.h"
53 #include "src/core/telemetry/tcp_tracer.h"
54 #include "src/core/util/sync.h"
55 #include "src/cpp/ext/otel/key_value_iterable.h"
56 #include "src/cpp/ext/otel/otel_plugin.h"
57
58 namespace grpc {
59 namespace internal {
60
61 //
62 // OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer
63 //
64
CallAttemptTracer(const OpenTelemetryPluginImpl::ClientCallTracer * parent,bool arena_allocated)65 OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
66 const OpenTelemetryPluginImpl::ClientCallTracer* parent,
67 bool arena_allocated)
68 : parent_(parent),
69 arena_allocated_(arena_allocated),
70 start_time_(absl::Now()) {
71 if (parent_->otel_plugin_->client_.attempt.started != nullptr) {
72 std::array<std::pair<absl::string_view, absl::string_view>, 2>
73 additional_labels = {
74 {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
75 {OpenTelemetryTargetKey(),
76 parent_->scope_config_->filtered_target()}}};
77 // We might not have all the injected labels that we want at this point, so
78 // avoid recording a subset of injected labels here.
79 parent_->otel_plugin_->client_.attempt.started->Add(
80 1, KeyValueIterable(
81 /*injected_labels_from_plugin_options=*/{}, additional_labels,
82 /*active_plugin_options_view=*/nullptr,
83 /*optional_labels=*/{},
84 /*is_client=*/true, parent_->otel_plugin_));
85 }
86 }
87
88 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)89 RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
90 if (recv_initial_metadata != nullptr &&
91 recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
92 .value_or(false)) {
93 is_trailers_only_ = true;
94 return;
95 }
96 PopulateLabelInjectors(recv_initial_metadata);
97 }
98
99 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)100 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
101 parent_->scope_config_->active_plugin_options_view().ForEach(
102 [&](const InternalOpenTelemetryPluginOption& plugin_option,
103 size_t /*index*/) {
104 auto* labels_injector = plugin_option.labels_injector();
105 if (labels_injector != nullptr) {
106 labels_injector->AddLabels(send_initial_metadata, nullptr);
107 }
108 return true;
109 },
110 parent_->otel_plugin_);
111 }
112
113 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer & send_message)114 RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
115 RecordAnnotation(
116 absl::StrFormat("Send message: %ld bytes", send_message.Length()));
117 }
118
119 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)120 RecordSendCompressedMessage(
121 const grpc_core::SliceBuffer& send_compressed_message) {
122 RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
123 send_compressed_message.Length()));
124 }
125
126 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)127 RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
128 RecordAnnotation(
129 absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
130 }
131
132 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)133 RecordReceivedDecompressedMessage(
134 const grpc_core::SliceBuffer& recv_decompressed_message) {
135 RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
136 recv_decompressed_message.Length()));
137 }
138
139 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)140 RecordReceivedTrailingMetadata(
141 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
142 const grpc_transport_stream_stats* transport_stream_stats) {
143 if (is_trailers_only_) {
144 PopulateLabelInjectors(recv_trailing_metadata);
145 }
146 std::array<std::pair<absl::string_view, absl::string_view>, 3>
147 additional_labels = {
148 {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
149 {OpenTelemetryTargetKey(),
150 parent_->scope_config_->filtered_target()},
151 {OpenTelemetryStatusKey(),
152 grpc_status_code_to_string(
153 static_cast<grpc_status_code>(status.code()))}}};
154 KeyValueIterable labels(
155 injected_labels_from_plugin_options_, additional_labels,
156 &parent_->scope_config_->active_plugin_options_view(), optional_labels_,
157 /*is_client=*/true, parent_->otel_plugin_);
158 if (parent_->otel_plugin_->client_.attempt.duration != nullptr) {
159 parent_->otel_plugin_->client_.attempt.duration->Record(
160 absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
161 opentelemetry::context::Context{});
162 }
163 uint64_t outgoing_bytes = 0;
164 uint64_t incoming_bytes = 0;
165 if (grpc_core::IsCallTracerInTransportEnabled()) {
166 outgoing_bytes = outgoing_bytes_.load();
167 incoming_bytes = incoming_bytes_.load();
168 } else if (transport_stream_stats != nullptr) {
169 outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
170 incoming_bytes = transport_stream_stats->incoming.data_bytes;
171 }
172 if (parent_->otel_plugin_->client_.attempt
173 .sent_total_compressed_message_size != nullptr) {
174 parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size
175 ->Record(outgoing_bytes, labels, opentelemetry::context::Context{});
176 }
177 if (parent_->otel_plugin_->client_.attempt
178 .rcvd_total_compressed_message_size != nullptr) {
179 parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
180 ->Record(incoming_bytes, labels, opentelemetry::context::Context{});
181 }
182 }
183
184 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordIncomingBytes(const TransportByteSize & transport_byte_size)185 RecordIncomingBytes(const TransportByteSize& transport_byte_size) {
186 incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
187 }
188
189 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)190 RecordOutgoingBytes(const TransportByteSize& transport_byte_size) {
191 outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
192 }
193
RecordCancel(absl::Status)194 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordCancel(
195 absl::Status /*cancel_error*/) {}
196
RecordEnd(const gpr_timespec &)197 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordEnd(
198 const gpr_timespec& /*latency*/) {
199 if (arena_allocated_) {
200 this->~CallAttemptTracer();
201 } else {
202 delete this;
203 }
204 }
205
206 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordAnnotation(absl::string_view)207 RecordAnnotation(absl::string_view /*annotation*/) {
208 // Not implemented
209 }
210
211 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordAnnotation(const Annotation &)212 RecordAnnotation(const Annotation& /*annotation*/) {
213 // Not implemented
214 }
215
216 std::shared_ptr<grpc_core::TcpTracerInterface> OpenTelemetryPluginImpl::
StartNewTcpTrace()217 ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
218 // No TCP trace.
219 return nullptr;
220 }
221
222 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
SetOptionalLabel(OptionalLabelKey key,grpc_core::RefCountedStringValue value)223 SetOptionalLabel(OptionalLabelKey key,
224 grpc_core::RefCountedStringValue value) {
225 CHECK(key < OptionalLabelKey::kSize);
226 optional_labels_[static_cast<size_t>(key)] = std::move(value);
227 }
228
229 void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
PopulateLabelInjectors(grpc_metadata_batch * metadata)230 PopulateLabelInjectors(grpc_metadata_batch* metadata) {
231 parent_->scope_config_->active_plugin_options_view().ForEach(
232 [&](const InternalOpenTelemetryPluginOption& plugin_option,
233 size_t /*index*/) {
234 auto* labels_injector = plugin_option.labels_injector();
235 if (labels_injector != nullptr) {
236 injected_labels_from_plugin_options_.push_back(
237 labels_injector->GetLabels(metadata));
238 }
239 return true;
240 },
241 parent_->otel_plugin_);
242 }
243
244 //
245 // OpenTelemetryPluginImpl::ClientCallTracer
246 //
247
ClientCallTracer(const grpc_core::Slice & path,grpc_core::Arena * arena,bool registered_method,OpenTelemetryPluginImpl * otel_plugin,std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config)248 OpenTelemetryPluginImpl::ClientCallTracer::ClientCallTracer(
249 const grpc_core::Slice& path, grpc_core::Arena* arena,
250 bool registered_method, OpenTelemetryPluginImpl* otel_plugin,
251 std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config)
252 : path_(path.Ref()),
253 arena_(arena),
254 registered_method_(registered_method),
255 otel_plugin_(otel_plugin),
256 scope_config_(std::move(scope_config)) {}
257
~ClientCallTracer()258 OpenTelemetryPluginImpl::ClientCallTracer::~ClientCallTracer() {}
259
260 OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)261 OpenTelemetryPluginImpl::ClientCallTracer::StartNewAttempt(
262 bool is_transparent_retry) {
263 // We allocate the first attempt on the arena and all subsequent attempts
264 // on the heap, so that in the common case we don't require a heap
265 // allocation, nor do we unnecessarily grow the arena.
266 bool is_first_attempt = true;
267 {
268 grpc_core::MutexLock lock(&mu_);
269 if (transparent_retries_ != 0 || retries_ != 0) {
270 is_first_attempt = false;
271 }
272 if (is_transparent_retry) {
273 ++transparent_retries_;
274 } else {
275 ++retries_;
276 }
277 }
278 if (is_first_attempt) {
279 return arena_->New<CallAttemptTracer>(this, /*arena_allocated=*/true);
280 }
281 return new CallAttemptTracer(this, /*arena_allocated=*/false);
282 }
283
MethodForStats() const284 absl::string_view OpenTelemetryPluginImpl::ClientCallTracer::MethodForStats()
285 const {
286 absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
287 if (registered_method_ ||
288 (otel_plugin_->generic_method_attribute_filter() != nullptr &&
289 otel_plugin_->generic_method_attribute_filter()(method))) {
290 return method;
291 }
292 return "other";
293 }
294
RecordAnnotation(absl::string_view)295 void OpenTelemetryPluginImpl::ClientCallTracer::RecordAnnotation(
296 absl::string_view /*annotation*/) {
297 // Not implemented
298 }
299
RecordAnnotation(const Annotation &)300 void OpenTelemetryPluginImpl::ClientCallTracer::RecordAnnotation(
301 const Annotation& /*annotation*/) {
302 // Not implemented
303 }
304
305 } // namespace internal
306 } // namespace grpc
307