1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "client_call_tracer.h"
16
17 #include <grpc/slice.h>
18 #include <stddef.h>
19
20 #include <algorithm>
21 #include <vector>
22
23 #include "absl/strings/str_cat.h"
24 #include "absl/time/clock.h"
25 #include "constants.h"
26 #include "metadata_exchange.h"
27 #include "observability_util.h"
28 #include "python_observability_context.h"
29 #include "src/core/lib/experiments/experiments.h"
30 #include "src/core/lib/slice/slice.h"
31
32 namespace grpc_observability {
33
// Out-of-line definitions of the attempt tracer's static constexpr buffer
// sizes (used for the trace-context buffer and the serialized tags limit).
// Before C++17 such definitions are required whenever the constants are
// ODR-used; since C++17 static constexpr data members are implicitly inline,
// so these lines are redundant but harmless.
constexpr uint32_t PythonOpenCensusCallTracer::
    PythonOpenCensusCallAttemptTracer::kMaxTraceContextLen;
constexpr uint32_t
    PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::kMaxTagsLen;
38
//
// PythonOpenCensusCallTracer
//
42
PythonOpenCensusCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id,const char * identifier,const std::vector<Label> & exchange_labels,bool tracing_enabled,bool add_csm_optional_labels,bool registered_method)43 PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
44 const char* method, const char* target, const char* trace_id,
45 const char* parent_span_id, const char* identifier,
46 const std::vector<Label>& exchange_labels, bool tracing_enabled,
47 bool add_csm_optional_labels, bool registered_method)
48 : method_(GetMethod(method)),
49 target_(GetTarget(target)),
50 tracing_enabled_(tracing_enabled),
51 add_csm_optional_labels_(add_csm_optional_labels),
52 labels_injector_(exchange_labels),
53 identifier_(identifier),
54 registered_method_(registered_method) {
55 GenerateClientContext(absl::StrCat("Sent.", method_),
56 absl::string_view(trace_id),
57 absl::string_view(parent_span_id), &context_);
58 }
59
GenerateContext()60 void PythonOpenCensusCallTracer::GenerateContext() {}
61
RecordAnnotation(absl::string_view annotation)62 void PythonOpenCensusCallTracer::RecordAnnotation(
63 absl::string_view annotation) {
64 if (!context_.GetSpanContext().IsSampled()) {
65 return;
66 }
67 context_.AddSpanAnnotation(annotation);
68 }
69
RecordAnnotation(const Annotation & annotation)70 void PythonOpenCensusCallTracer::RecordAnnotation(
71 const Annotation& annotation) {
72 if (!context_.GetSpanContext().IsSampled()) {
73 return;
74 }
75
76 switch (annotation.type()) {
77 // Annotations are expensive to create. We should only create it if the call
78 // is being sampled by default.
79 default:
80 if (IsSampled()) {
81 context_.AddSpanAnnotation(annotation.ToString());
82 }
83 break;
84 }
85 }
86
~PythonOpenCensusCallTracer()87 PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
88 if (PythonCensusStatsEnabled()) {
89 context_.Labels().emplace_back(kClientMethod, method_);
90 RecordIntMetric(kRpcClientRetriesPerCallMeasureName, retries_ - 1,
91 context_.Labels(), identifier_, registered_method_,
92 /*include_exchange_labels=*/true); // exclude first attempt
93 RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
94 transparent_retries_, context_.Labels(), identifier_,
95 registered_method_, /*include_exchange_labels=*/true);
96 RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
97 ToDoubleSeconds(retry_delay_), context_.Labels(),
98 identifier_, registered_method_,
99 /*include_exchange_labels=*/true);
100 }
101
102 if (tracing_enabled_) {
103 context_.EndSpan();
104 if (IsSampled()) {
105 RecordSpan(context_.GetSpan().ToCensusData());
106 }
107 }
108 }
109
110 PythonCensusContext
CreateCensusContextForCallAttempt()111 PythonOpenCensusCallTracer::CreateCensusContextForCallAttempt() {
112 auto context = PythonCensusContext(absl::StrCat("Attempt.", method_),
113 &(context_.GetSpan()), context_.Labels());
114 return context;
115 }
116
117 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)118 PythonOpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
119 uint64_t attempt_num;
120 {
121 grpc_core::MutexLock lock(&mu_);
122 if (transparent_retries_ != 0 || retries_ != 0) {
123 if (PythonCensusStatsEnabled() && num_active_rpcs_ == 0) {
124 retry_delay_ += absl::Now() - time_at_last_attempt_end_;
125 }
126 }
127 attempt_num = retries_;
128 if (is_transparent_retry) {
129 ++transparent_retries_;
130 } else {
131 ++retries_;
132 }
133 ++num_active_rpcs_;
134 }
135 context_.IncreaseChildSpanCount();
136 return new PythonOpenCensusCallAttemptTracer(this, attempt_num,
137 is_transparent_retry);
138 }
139
140 //
141 // PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer
142 //
143
144 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry)145 PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
146 uint64_t attempt_num,
147 bool is_transparent_retry)
148 : parent_(parent),
149 context_(parent_->CreateCensusContextForCallAttempt()),
150 start_time_(absl::Now()) {
151 if (parent_->tracing_enabled_) {
152 context_.AddSpanAttribute("previous-rpc-attempts",
153 absl::StrCat(attempt_num));
154 context_.AddSpanAttribute("transparent-retry",
155 absl::StrCat(is_transparent_retry));
156 }
157 if (!PythonCensusStatsEnabled()) {
158 return;
159 }
160 context_.Labels().emplace_back(kClientMethod, parent_->method_);
161 context_.Labels().emplace_back(kClientTarget, parent_->target_);
162 RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels(),
163 parent_->identifier_, parent_->registered_method_,
164 /*include_exchange_labels=*/false);
165 }
166
167 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)168 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
169 if (parent_->tracing_enabled_) {
170 char tracing_buf[kMaxTraceContextLen];
171 size_t tracing_len =
172 TraceContextSerialize(context_, tracing_buf, kMaxTraceContextLen);
173 if (tracing_len > 0) {
174 send_initial_metadata->Set(
175 grpc_core::GrpcTraceBinMetadata(),
176 grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
177 }
178 }
179 if (!PythonCensusStatsEnabled()) {
180 return;
181 }
182 grpc_slice tags = grpc_empty_slice();
183 size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
184 if (encoded_tags_len > 0) {
185 send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
186 grpc_core::Slice(tags));
187 }
188 parent_->labels_injector_.AddExchangeLabelsToMetadata(send_initial_metadata);
189 }
190
191 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)192 RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
193 if (recv_initial_metadata != nullptr &&
194 recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
195 .value_or(false)) {
196 is_trailers_only_ = true;
197 return;
198 }
199 labels_from_peer_ =
200 parent_->labels_injector_.GetExchangeLabels(recv_initial_metadata);
201 }
202
203 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer &)204 RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) {
205 ++sent_message_count_;
206 }
207
208 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer &)209 RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) {
210 ++recv_message_count_;
211 }
212
213 std::shared_ptr<grpc_core::TcpTracerInterface> PythonOpenCensusCallTracer::
StartNewTcpTrace()214 PythonOpenCensusCallAttemptTracer::StartNewTcpTrace() {
215 return nullptr;
216 }
217
218 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
SetOptionalLabel(OptionalLabelKey key,grpc_core::RefCountedStringValue value)219 SetOptionalLabel(OptionalLabelKey key,
220 grpc_core::RefCountedStringValue value) {
221 optional_labels_array_[static_cast<size_t>(key)] = std::move(value);
222 }
223
224 namespace {
225
226 // Returns 0 if no server stats are present in the metadata.
GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch * b)227 uint64_t GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch* b) {
228 if (!PythonCensusStatsEnabled()) {
229 return 0;
230 }
231
232 const grpc_core::Slice* grpc_server_stats_bin_ptr =
233 b->get_pointer(grpc_core::GrpcServerStatsBinMetadata());
234 if (grpc_server_stats_bin_ptr == nullptr) {
235 return 0;
236 }
237
238 uint64_t elapsed_time = 0;
239 ServerStatsDeserialize(
240 reinterpret_cast<const char*>(grpc_server_stats_bin_ptr->data()),
241 grpc_server_stats_bin_ptr->size(), &elapsed_time);
242 return elapsed_time;
243 }
244
245 } // namespace
246
247 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)248 RecordReceivedTrailingMetadata(
249 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
250 const grpc_transport_stream_stats* transport_stream_stats) {
251 if (!PythonCensusStatsEnabled()) {
252 return;
253 }
254 if (is_trailers_only_) {
255 labels_from_peer_ =
256 parent_->labels_injector_.GetExchangeLabels(recv_trailing_metadata);
257 }
258 auto status_code_ = status.code();
259 uint64_t elapsed_time = 0;
260 if (recv_trailing_metadata != nullptr) {
261 elapsed_time = GetElapsedTimeFromTrailingMetadata(recv_trailing_metadata);
262 }
263
264 std::string final_status = absl::StatusCodeToString(status_code_);
265 context_.Labels().emplace_back(kClientMethod, parent_->method_);
266 context_.Labels().emplace_back(kClientTarget, parent_->target_);
267 context_.Labels().emplace_back(kClientStatus, final_status);
268 if (parent_->add_csm_optional_labels_) {
269 parent_->labels_injector_.AddXdsOptionalLabels(
270 /*is_client=*/true, optional_labels_array_, context_.Labels());
271 }
272 for (const auto& label : labels_from_peer_) {
273 context_.Labels().emplace_back(label);
274 }
275 uint64_t incoming_bytes = 0;
276 uint64_t outgoing_bytes = 0;
277 if (grpc_core::IsCallTracerInTransportEnabled()) {
278 incoming_bytes = incoming_bytes_.load();
279 outgoing_bytes = outgoing_bytes_.load();
280 } else if (transport_stream_stats != nullptr) {
281 incoming_bytes = transport_stream_stats->incoming.data_bytes;
282 outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
283 }
284 RecordDoubleMetric(kRpcClientSentBytesPerRpcMeasureName,
285 static_cast<double>(outgoing_bytes), context_.Labels(),
286 parent_->identifier_, parent_->registered_method_,
287 /*include_exchange_labels=*/true);
288 RecordDoubleMetric(kRpcClientReceivedBytesPerRpcMeasureName,
289 static_cast<double>(incoming_bytes), context_.Labels(),
290 parent_->identifier_, parent_->registered_method_,
291 /*include_exchange_labels=*/true);
292 RecordDoubleMetric(kRpcClientServerLatencyMeasureName,
293 absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)),
294 context_.Labels(), parent_->identifier_,
295 parent_->registered_method_,
296 /*include_exchange_labels=*/true);
297 RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
298 absl::ToDoubleSeconds(absl::Now() - start_time_),
299 context_.Labels(), parent_->identifier_,
300 parent_->registered_method_,
301 /*include_exchange_labels=*/true);
302 RecordIntMetric(kRpcClientCompletedRpcMeasureName, 1, context_.Labels(),
303 parent_->identifier_, parent_->registered_method_,
304 /*include_exchange_labels=*/true);
305 }
306
307 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordIncomingBytes(const TransportByteSize & transport_byte_size)308 RecordIncomingBytes(const TransportByteSize& transport_byte_size) {
309 incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
310 }
311
312 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)313 RecordOutgoingBytes(const TransportByteSize& transport_byte_size) {
314 outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
315 }
316
317 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordCancel(absl::Status)318 RecordCancel(absl::Status /*cancel_error*/) {}
319
// Finalizes this attempt and then destroys the tracer (`delete this`).
//
// When stats are enabled:
//  - records the sent and received message counts for the attempt;
//  - while holding the parent's `mu_`, decrements the parent's active-attempt
//    count;
//  - when that count reaches zero, stamps `time_at_last_attempt_end_`, which
//    StartNewAttempt() uses to accumulate retry delay.
//
// When tracing is enabled:
//  - sets the span status if `status_code_` is not OK;
//  - ends the attempt span and exports it if sampled.
//
// `status_code_` supplies both the kClientStatus label and the span status,
// so it must already hold the attempt's final status when this runs.
// NOTE(review): `num_active_rpcs_` is incremented unconditionally in
// StartNewAttempt() but is only decremented here when stats are enabled.
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::RecordEnd(
    const gpr_timespec& /*latency*/) {
  if (PythonCensusStatsEnabled()) {
    context_.Labels().emplace_back(kClientMethod, parent_->method_);
    context_.Labels().emplace_back(kClientStatus,
                                   StatusCodeToString(status_code_));
    RecordIntMetric(kRpcClientSentMessagesPerRpcMeasureName,
                    sent_message_count_, context_.Labels(),
                    parent_->identifier_, parent_->registered_method_,
                    /*include_exchange_labels=*/true);
    RecordIntMetric(kRpcClientReceivedMessagesPerRpcMeasureName,
                    recv_message_count_, context_.Labels(),
                    parent_->identifier_, parent_->registered_method_,
                    /*include_exchange_labels=*/true);

    // Scoped lock on the parent; it is released at the end of this if-block,
    // before the tracing work below.
    grpc_core::MutexLock lock(&parent_->mu_);
    if (--parent_->num_active_rpcs_ == 0) {
      parent_->time_at_last_attempt_end_ = absl::Now();
    }
  }

  if (parent_->tracing_enabled_) {
    if (status_code_ != absl::StatusCode::kOk) {
      context_.GetSpan().SetStatus(StatusCodeToString(status_code_));
    }
    context_.EndSpan();
    if (IsSampled()) {
      RecordSpan(context_.GetSpan().ToCensusData());
    }
  }

  // After RecordEnd, Core will make no further use of this CallAttemptTracer,
  // so we are free to delete it here. No member may be touched after this.
  delete this;
}
355
356 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(absl::string_view annotation)357 RecordAnnotation(absl::string_view annotation) {
358 if (!context_.GetSpanContext().IsSampled()) {
359 return;
360 }
361 context_.AddSpanAnnotation(annotation);
362 }
363
364 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(const Annotation & annotation)365 RecordAnnotation(const Annotation& annotation) {
366 if (!context_.GetSpanContext().IsSampled()) {
367 return;
368 }
369
370 switch (annotation.type()) {
371 // Annotations are expensive to create. We should only create it if the call
372 // is being sampled by default.
373 default:
374 if (IsSampled()) {
375 context_.AddSpanAnnotation(annotation.ToString());
376 }
377 break;
378 }
379 }
380
381 } // namespace grpc_observability
382