• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "server_call_tracer.h"
16 
17 #include <grpc/support/port_platform.h>
18 #include <stdint.h>
19 #include <string.h>
20 
21 #include <algorithm>
22 #include <initializer_list>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/strings/escaping.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/time/clock.h"
32 #include "absl/time/time.h"
33 #include "constants.h"
34 #include "observability_util.h"
35 #include "python_observability_context.h"
36 #include "src/core/lib/channel/channel_stack.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/resource_quota/arena.h"
40 #include "src/core/lib/slice/slice.h"
41 #include "src/core/lib/slice/slice_buffer.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/telemetry/call_tracer.h"
44 
45 namespace grpc_observability {
46 
47 namespace {
48 
49 // server metadata elements
50 struct ServerO11yMetadata {
51   grpc_core::Slice path;
52   grpc_core::Slice tracing_slice;
53   grpc_core::Slice census_proto;
54 };
55 
GetO11yMetadata(const grpc_metadata_batch * b,ServerO11yMetadata * som)56 void GetO11yMetadata(const grpc_metadata_batch* b, ServerO11yMetadata* som) {
57   const auto* path = b->get_pointer(grpc_core::HttpPathMetadata());
58   if (path != nullptr) {
59     som->path = path->Ref();
60   }
61   if (PythonCensusTracingEnabled()) {
62     const auto* grpc_trace_bin =
63         b->get_pointer(grpc_core::GrpcTraceBinMetadata());
64     if (grpc_trace_bin != nullptr) {
65       som->tracing_slice = grpc_trace_bin->Ref();
66     }
67   }
68   if (PythonCensusStatsEnabled()) {
69     const auto* grpc_tags_bin =
70         b->get_pointer(grpc_core::GrpcTagsBinMetadata());
71     if (grpc_tags_bin != nullptr) {
72       som->census_proto = grpc_tags_bin->Ref();
73     }
74   }
75 }
76 
KeyInLabels(std::string key,const std::vector<Label> & labels)77 bool KeyInLabels(std::string key, const std::vector<Label>& labels) {
78   const auto it = std::find_if(labels.begin(), labels.end(),
79                                [&key](const Label& l) { return l.key == key; });
80 
81   if (it == labels.end()) {
82     return false;
83   }
84   return true;
85 }
86 
87 }  // namespace
88 
89 //
90 // PythonOpenCensusServerCallTracer
91 //
92 
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)93 void PythonOpenCensusServerCallTracer::RecordSendInitialMetadata(
94     grpc_metadata_batch* send_initial_metadata) {
95   // Only add labels if exchange is needed (Client send metadata with keys in
96   // MetadataExchangeKeyNames).
97   for (const auto& key : MetadataExchangeKeyNames) {
98     if (KeyInLabels(key, labels_from_peer_)) {
99       labels_injector_.AddExchangeLabelsToMetadata(send_initial_metadata);
100     }
101   }
102 }
103 
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)104 void PythonOpenCensusServerCallTracer::RecordReceivedInitialMetadata(
105     grpc_metadata_batch* recv_initial_metadata) {
106   ServerO11yMetadata som;
107   GetO11yMetadata(recv_initial_metadata, &som);
108   path_ = std::move(som.path);
109   method_ = GetMethod(path_);
110   auto tracing_enabled = PythonCensusTracingEnabled();
111   GenerateServerContext(
112       tracing_enabled ? som.tracing_slice.as_string_view() : "",
113       absl::StrCat("Recv.", method_), &context_);
114   registered_method_ =
115       recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
116           .value_or(nullptr) != nullptr;
117   if (PythonCensusStatsEnabled()) {
118     context_.Labels().emplace_back(kServerMethod, std::string(method_));
119     RecordIntMetric(kRpcServerStartedRpcsMeasureName, 1, context_.Labels(),
120                     identifier_, registered_method_,
121                     /*include_exchange_labels=*/false);
122   }
123 
124   labels_from_peer_ = labels_injector_.GetExchangeLabels(recv_initial_metadata);
125 }
126 
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)127 void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata(
128     grpc_metadata_batch* send_trailing_metadata) {
129   // We need to record the time when the trailing metadata was sent to
130   // mark the completeness of the request.
131   elapsed_time_ = absl::Now() - start_time_;
132   if (PythonCensusStatsEnabled() && send_trailing_metadata != nullptr) {
133     size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
134                                       stats_buf_, kMaxServerStatsLen);
135     if (len > 0) {
136       send_trailing_metadata->Set(
137           grpc_core::GrpcServerStatsBinMetadata(),
138           grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
139     }
140   }
141 }
142 
RecordSendMessage(const grpc_core::SliceBuffer & send_message)143 void PythonOpenCensusServerCallTracer::RecordSendMessage(
144     const grpc_core::SliceBuffer& send_message) {
145   RecordAnnotation(
146       absl::StrFormat("Send message: %ld bytes", send_message.Length()));
147   ++sent_message_count_;
148 }
149 
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)150 void PythonOpenCensusServerCallTracer::RecordSendCompressedMessage(
151     const grpc_core::SliceBuffer& send_compressed_message) {
152   RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
153                                    send_compressed_message.Length()));
154 }
155 
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)156 void PythonOpenCensusServerCallTracer::RecordReceivedMessage(
157     const grpc_core::SliceBuffer& recv_message) {
158   RecordAnnotation(
159       absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
160   ++recv_message_count_;
161 }
162 
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)163 void PythonOpenCensusServerCallTracer::RecordReceivedDecompressedMessage(
164     const grpc_core::SliceBuffer& recv_decompressed_message) {
165   RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
166                                    recv_decompressed_message.Length()));
167 }
168 
RecordCancel(grpc_error_handle)169 void PythonOpenCensusServerCallTracer::RecordCancel(
170     grpc_error_handle /*cancel_error*/) {
171   elapsed_time_ = absl::Now() - start_time_;
172 }
173 
RecordEnd(const grpc_call_final_info * final_info)174 void PythonOpenCensusServerCallTracer::RecordEnd(
175     const grpc_call_final_info* final_info) {
176   if (PythonCensusStatsEnabled()) {
177     uint64_t outgoing_bytes;
178     uint64_t incoming_bytes;
179     if (grpc_core::IsCallTracerInTransportEnabled()) {
180       outgoing_bytes = outgoing_bytes_.load();
181       incoming_bytes = incoming_bytes_.load();
182     } else {
183       outgoing_bytes = GetOutgoingDataSize(final_info);
184       incoming_bytes = GetIncomingDataSize(final_info);
185     }
186     double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_);
187     context_.Labels().emplace_back(kServerMethod, std::string(method_));
188     context_.Labels().emplace_back(
189         kServerStatus,
190         std::string(StatusCodeToString(final_info->final_status)));
191     for (const auto& label : labels_from_peer_) {
192       context_.Labels().emplace_back(label);
193     }
194     RecordDoubleMetric(kRpcServerSentBytesPerRpcMeasureName,
195                        static_cast<double>(outgoing_bytes), context_.Labels(),
196                        identifier_, registered_method_,
197                        /*include_exchange_labels=*/true);
198     RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
199                        static_cast<double>(incoming_bytes), context_.Labels(),
200                        identifier_, registered_method_,
201                        /*include_exchange_labels=*/true);
202     RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s,
203                        context_.Labels(), identifier_, registered_method_,
204                        /*include_exchange_labels=*/true);
205     RecordIntMetric(kRpcServerCompletedRpcMeasureName, 1, context_.Labels(),
206                     identifier_, registered_method_,
207                     /*include_exchange_labels=*/true);
208     RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName,
209                     sent_message_count_, context_.Labels(), identifier_,
210                     registered_method_, /*include_exchange_labels=*/true);
211     RecordIntMetric(kRpcServerReceivedMessagesPerRpcMeasureName,
212                     recv_message_count_, context_.Labels(), identifier_,
213                     registered_method_, /*include_exchange_labels=*/true);
214   }
215   if (PythonCensusTracingEnabled()) {
216     context_.EndSpan();
217     if (IsSampled()) {
218       RecordSpan(context_.GetSpan().ToCensusData());
219     }
220   }
221 
222   // After RecordEnd, Core will make no further usage of this ServerCallTracer,
223   // so we are free it here.
224   delete this;
225 }
226 
RecordIncomingBytes(const TransportByteSize & transport_byte_size)227 void PythonOpenCensusServerCallTracer::RecordIncomingBytes(
228     const TransportByteSize& transport_byte_size) {
229   incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
230 }
231 
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)232 void PythonOpenCensusServerCallTracer::RecordOutgoingBytes(
233     const TransportByteSize& transport_byte_size) {
234   outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
235 }
236 
RecordAnnotation(absl::string_view annotation)237 void PythonOpenCensusServerCallTracer::RecordAnnotation(
238     absl::string_view annotation) {
239   if (!context_.GetSpanContext().IsSampled()) {
240     return;
241   }
242   context_.AddSpanAnnotation(annotation);
243 }
244 
RecordAnnotation(const Annotation & annotation)245 void PythonOpenCensusServerCallTracer::RecordAnnotation(
246     const Annotation& annotation) {
247   if (!context_.GetSpanContext().IsSampled()) {
248     return;
249   }
250 
251   switch (annotation.type()) {
252     // Annotations are expensive to create. We should only create it if the
253     // call is being sampled by default.
254     default:
255       if (IsSampled()) {
256         context_.AddSpanAnnotation(annotation.ToString());
257       }
258       break;
259   }
260 }
261 
262 std::shared_ptr<grpc_core::TcpTracerInterface>
StartNewTcpTrace()263 PythonOpenCensusServerCallTracer::StartNewTcpTrace() {
264   return nullptr;
265 }
266 
TraceId()267 std::string PythonOpenCensusServerCallTracer::TraceId() {
268   return absl::BytesToHexString(
269       absl::string_view(context_.GetSpanContext().TraceId()));
270 }
271 
SpanId()272 std::string PythonOpenCensusServerCallTracer::SpanId() {
273   return absl::BytesToHexString(
274       absl::string_view(context_.GetSpanContext().SpanId()));
275 }
276 
IsSampled()277 bool PythonOpenCensusServerCallTracer::IsSampled() {
278   return context_.GetSpanContext().IsSampled();
279 }
280 
281 //
282 // PythonOpenCensusServerCallTracerFactory
283 //
284 
285 grpc_core::ServerCallTracer*
CreateNewServerCallTracer(grpc_core::Arena * arena,const grpc_core::ChannelArgs & channel_args)286 PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
287     grpc_core::Arena* arena, const grpc_core::ChannelArgs& channel_args) {
288   // We don't use arena here to to ensure that memory is allocated and freed in
289   // the same DLL in Windows.
290   (void)arena;
291   (void)channel_args;
292   return new PythonOpenCensusServerCallTracer(exchange_labels_, identifier_);
293 }
294 
IsServerTraced(const grpc_core::ChannelArgs & args)295 bool PythonOpenCensusServerCallTracerFactory::IsServerTraced(
296     const grpc_core::ChannelArgs& args) {
297   // Returns true if a server is to be traced, false otherwise.
298   return true;
299 }
300 
301 PythonOpenCensusServerCallTracerFactory::
PythonOpenCensusServerCallTracerFactory(const std::vector<Label> & exchange_labels,const char * identifier)302     PythonOpenCensusServerCallTracerFactory(
303         const std::vector<Label>& exchange_labels, const char* identifier)
304     : exchange_labels_(exchange_labels), identifier_(identifier) {}
305 
306 }  // namespace grpc_observability
307