• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "client_call_tracer.h"
16 
17 #include <grpc/slice.h>
18 #include <stddef.h>
19 
20 #include <algorithm>
21 #include <vector>
22 
23 #include "absl/strings/str_cat.h"
24 #include "absl/time/clock.h"
25 #include "constants.h"
26 #include "metadata_exchange.h"
27 #include "observability_util.h"
28 #include "python_observability_context.h"
29 #include "src/core/lib/experiments/experiments.h"
30 #include "src/core/lib/slice/slice.h"
31 
32 namespace grpc_observability {
33 
34 constexpr uint32_t PythonOpenCensusCallTracer::
35     PythonOpenCensusCallAttemptTracer::kMaxTraceContextLen;
36 constexpr uint32_t
37     PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::kMaxTagsLen;
38 
39 //
40 // OpenCensusCallTracer
41 //
42 
PythonOpenCensusCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id,const char * identifier,const std::vector<Label> & exchange_labels,bool tracing_enabled,bool add_csm_optional_labels,bool registered_method)43 PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
44     const char* method, const char* target, const char* trace_id,
45     const char* parent_span_id, const char* identifier,
46     const std::vector<Label>& exchange_labels, bool tracing_enabled,
47     bool add_csm_optional_labels, bool registered_method)
48     : method_(GetMethod(method)),
49       target_(GetTarget(target)),
50       tracing_enabled_(tracing_enabled),
51       add_csm_optional_labels_(add_csm_optional_labels),
52       labels_injector_(exchange_labels),
53       identifier_(identifier),
54       registered_method_(registered_method) {
55   GenerateClientContext(absl::StrCat("Sent.", method_),
56                         absl::string_view(trace_id),
57                         absl::string_view(parent_span_id), &context_);
58 }
59 
GenerateContext()60 void PythonOpenCensusCallTracer::GenerateContext() {}
61 
RecordAnnotation(absl::string_view annotation)62 void PythonOpenCensusCallTracer::RecordAnnotation(
63     absl::string_view annotation) {
64   if (!context_.GetSpanContext().IsSampled()) {
65     return;
66   }
67   context_.AddSpanAnnotation(annotation);
68 }
69 
RecordAnnotation(const Annotation & annotation)70 void PythonOpenCensusCallTracer::RecordAnnotation(
71     const Annotation& annotation) {
72   if (!context_.GetSpanContext().IsSampled()) {
73     return;
74   }
75 
76   switch (annotation.type()) {
77     // Annotations are expensive to create. We should only create it if the call
78     // is being sampled by default.
79     default:
80       if (IsSampled()) {
81         context_.AddSpanAnnotation(annotation.ToString());
82       }
83       break;
84   }
85 }
86 
~PythonOpenCensusCallTracer()87 PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
88   if (PythonCensusStatsEnabled()) {
89     context_.Labels().emplace_back(kClientMethod, method_);
90     RecordIntMetric(kRpcClientRetriesPerCallMeasureName, retries_ - 1,
91                     context_.Labels(), identifier_, registered_method_,
92                     /*include_exchange_labels=*/true);  // exclude first attempt
93     RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
94                     transparent_retries_, context_.Labels(), identifier_,
95                     registered_method_, /*include_exchange_labels=*/true);
96     RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
97                        ToDoubleSeconds(retry_delay_), context_.Labels(),
98                        identifier_, registered_method_,
99                        /*include_exchange_labels=*/true);
100   }
101 
102   if (tracing_enabled_) {
103     context_.EndSpan();
104     if (IsSampled()) {
105       RecordSpan(context_.GetSpan().ToCensusData());
106     }
107   }
108 }
109 
110 PythonCensusContext
CreateCensusContextForCallAttempt()111 PythonOpenCensusCallTracer::CreateCensusContextForCallAttempt() {
112   auto context = PythonCensusContext(absl::StrCat("Attempt.", method_),
113                                      &(context_.GetSpan()), context_.Labels());
114   return context;
115 }
116 
117 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)118 PythonOpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
119   uint64_t attempt_num;
120   {
121     grpc_core::MutexLock lock(&mu_);
122     if (transparent_retries_ != 0 || retries_ != 0) {
123       if (PythonCensusStatsEnabled() && num_active_rpcs_ == 0) {
124         retry_delay_ += absl::Now() - time_at_last_attempt_end_;
125       }
126     }
127     attempt_num = retries_;
128     if (is_transparent_retry) {
129       ++transparent_retries_;
130     } else {
131       ++retries_;
132     }
133     ++num_active_rpcs_;
134   }
135   context_.IncreaseChildSpanCount();
136   return new PythonOpenCensusCallAttemptTracer(this, attempt_num,
137                                                is_transparent_retry);
138 }
139 
140 //
141 // PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer
142 //
143 
144 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry)145     PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
146                                       uint64_t attempt_num,
147                                       bool is_transparent_retry)
148     : parent_(parent),
149       context_(parent_->CreateCensusContextForCallAttempt()),
150       start_time_(absl::Now()) {
151   if (parent_->tracing_enabled_) {
152     context_.AddSpanAttribute("previous-rpc-attempts",
153                               absl::StrCat(attempt_num));
154     context_.AddSpanAttribute("transparent-retry",
155                               absl::StrCat(is_transparent_retry));
156   }
157   if (!PythonCensusStatsEnabled()) {
158     return;
159   }
160   context_.Labels().emplace_back(kClientMethod, parent_->method_);
161   context_.Labels().emplace_back(kClientTarget, parent_->target_);
162   RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels(),
163                   parent_->identifier_, parent_->registered_method_,
164                   /*include_exchange_labels=*/false);
165 }
166 
167 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)168     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
169   if (parent_->tracing_enabled_) {
170     char tracing_buf[kMaxTraceContextLen];
171     size_t tracing_len =
172         TraceContextSerialize(context_, tracing_buf, kMaxTraceContextLen);
173     if (tracing_len > 0) {
174       send_initial_metadata->Set(
175           grpc_core::GrpcTraceBinMetadata(),
176           grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
177     }
178   }
179   if (!PythonCensusStatsEnabled()) {
180     return;
181   }
182   grpc_slice tags = grpc_empty_slice();
183   size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
184   if (encoded_tags_len > 0) {
185     send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
186                                grpc_core::Slice(tags));
187   }
188   parent_->labels_injector_.AddExchangeLabelsToMetadata(send_initial_metadata);
189 }
190 
191 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)192     RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
193   if (recv_initial_metadata != nullptr &&
194       recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
195           .value_or(false)) {
196     is_trailers_only_ = true;
197     return;
198   }
199   labels_from_peer_ =
200       parent_->labels_injector_.GetExchangeLabels(recv_initial_metadata);
201 }
202 
203 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer &)204     RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) {
205   ++sent_message_count_;
206 }
207 
208 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer &)209     RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) {
210   ++recv_message_count_;
211 }
212 
213 std::shared_ptr<grpc_core::TcpTracerInterface> PythonOpenCensusCallTracer::
StartNewTcpTrace()214     PythonOpenCensusCallAttemptTracer::StartNewTcpTrace() {
215   return nullptr;
216 }
217 
218 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
SetOptionalLabel(OptionalLabelKey key,grpc_core::RefCountedStringValue value)219     SetOptionalLabel(OptionalLabelKey key,
220                      grpc_core::RefCountedStringValue value) {
221   optional_labels_array_[static_cast<size_t>(key)] = std::move(value);
222 }
223 
224 namespace {
225 
226 // Returns 0 if no server stats are present in the metadata.
GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch * b)227 uint64_t GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch* b) {
228   if (!PythonCensusStatsEnabled()) {
229     return 0;
230   }
231 
232   const grpc_core::Slice* grpc_server_stats_bin_ptr =
233       b->get_pointer(grpc_core::GrpcServerStatsBinMetadata());
234   if (grpc_server_stats_bin_ptr == nullptr) {
235     return 0;
236   }
237 
238   uint64_t elapsed_time = 0;
239   ServerStatsDeserialize(
240       reinterpret_cast<const char*>(grpc_server_stats_bin_ptr->data()),
241       grpc_server_stats_bin_ptr->size(), &elapsed_time);
242   return elapsed_time;
243 }
244 
245 }  // namespace
246 
247 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)248     RecordReceivedTrailingMetadata(
249         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
250         const grpc_transport_stream_stats* transport_stream_stats) {
251   if (!PythonCensusStatsEnabled()) {
252     return;
253   }
254   if (is_trailers_only_) {
255     labels_from_peer_ =
256         parent_->labels_injector_.GetExchangeLabels(recv_trailing_metadata);
257   }
258   auto status_code_ = status.code();
259   uint64_t elapsed_time = 0;
260   if (recv_trailing_metadata != nullptr) {
261     elapsed_time = GetElapsedTimeFromTrailingMetadata(recv_trailing_metadata);
262   }
263 
264   std::string final_status = absl::StatusCodeToString(status_code_);
265   context_.Labels().emplace_back(kClientMethod, parent_->method_);
266   context_.Labels().emplace_back(kClientTarget, parent_->target_);
267   context_.Labels().emplace_back(kClientStatus, final_status);
268   if (parent_->add_csm_optional_labels_) {
269     parent_->labels_injector_.AddXdsOptionalLabels(
270         /*is_client=*/true, optional_labels_array_, context_.Labels());
271   }
272   for (const auto& label : labels_from_peer_) {
273     context_.Labels().emplace_back(label);
274   }
275   uint64_t incoming_bytes = 0;
276   uint64_t outgoing_bytes = 0;
277   if (grpc_core::IsCallTracerInTransportEnabled()) {
278     incoming_bytes = incoming_bytes_.load();
279     outgoing_bytes = outgoing_bytes_.load();
280   } else if (transport_stream_stats != nullptr) {
281     incoming_bytes = transport_stream_stats->incoming.data_bytes;
282     outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
283   }
284   RecordDoubleMetric(kRpcClientSentBytesPerRpcMeasureName,
285                      static_cast<double>(outgoing_bytes), context_.Labels(),
286                      parent_->identifier_, parent_->registered_method_,
287                      /*include_exchange_labels=*/true);
288   RecordDoubleMetric(kRpcClientReceivedBytesPerRpcMeasureName,
289                      static_cast<double>(incoming_bytes), context_.Labels(),
290                      parent_->identifier_, parent_->registered_method_,
291                      /*include_exchange_labels=*/true);
292   RecordDoubleMetric(kRpcClientServerLatencyMeasureName,
293                      absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)),
294                      context_.Labels(), parent_->identifier_,
295                      parent_->registered_method_,
296                      /*include_exchange_labels=*/true);
297   RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
298                      absl::ToDoubleSeconds(absl::Now() - start_time_),
299                      context_.Labels(), parent_->identifier_,
300                      parent_->registered_method_,
301                      /*include_exchange_labels=*/true);
302   RecordIntMetric(kRpcClientCompletedRpcMeasureName, 1, context_.Labels(),
303                   parent_->identifier_, parent_->registered_method_,
304                   /*include_exchange_labels=*/true);
305 }
306 
307 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordIncomingBytes(const TransportByteSize & transport_byte_size)308     RecordIncomingBytes(const TransportByteSize& transport_byte_size) {
309   incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
310 }
311 
312 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)313     RecordOutgoingBytes(const TransportByteSize& transport_byte_size) {
314   outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
315 }
316 
317 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordCancel(absl::Status)318     RecordCancel(absl::Status /*cancel_error*/) {}
319 
RecordEnd(const gpr_timespec &)320 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::RecordEnd(
321     const gpr_timespec& /*latency*/) {
322   if (PythonCensusStatsEnabled()) {
323     context_.Labels().emplace_back(kClientMethod, parent_->method_);
324     context_.Labels().emplace_back(kClientStatus,
325                                    StatusCodeToString(status_code_));
326     RecordIntMetric(kRpcClientSentMessagesPerRpcMeasureName,
327                     sent_message_count_, context_.Labels(),
328                     parent_->identifier_, parent_->registered_method_,
329                     /*include_exchange_labels=*/true);
330     RecordIntMetric(kRpcClientReceivedMessagesPerRpcMeasureName,
331                     recv_message_count_, context_.Labels(),
332                     parent_->identifier_, parent_->registered_method_,
333                     /*include_exchange_labels=*/true);
334 
335     grpc_core::MutexLock lock(&parent_->mu_);
336     if (--parent_->num_active_rpcs_ == 0) {
337       parent_->time_at_last_attempt_end_ = absl::Now();
338     }
339   }
340 
341   if (parent_->tracing_enabled_) {
342     if (status_code_ != absl::StatusCode::kOk) {
343       context_.GetSpan().SetStatus(StatusCodeToString(status_code_));
344     }
345     context_.EndSpan();
346     if (IsSampled()) {
347       RecordSpan(context_.GetSpan().ToCensusData());
348     }
349   }
350 
351   // After RecordEnd, Core will make no further usage of this CallAttemptTracer,
352   // so we are free it here.
353   delete this;
354 }
355 
356 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(absl::string_view annotation)357     RecordAnnotation(absl::string_view annotation) {
358   if (!context_.GetSpanContext().IsSampled()) {
359     return;
360   }
361   context_.AddSpanAnnotation(annotation);
362 }
363 
364 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(const Annotation & annotation)365     RecordAnnotation(const Annotation& annotation) {
366   if (!context_.GetSpanContext().IsSampled()) {
367     return;
368   }
369 
370   switch (annotation.type()) {
371     // Annotations are expensive to create. We should only create it if the call
372     // is being sampled by default.
373     default:
374       if (IsSampled()) {
375         context_.AddSpanAnnotation(annotation.ToString());
376       }
377       break;
378   }
379 }
380 
381 }  // namespace grpc_observability
382