1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "server_call_tracer.h"
16
17 #include <grpc/support/port_platform.h>
18 #include <stdint.h>
19 #include <string.h>
20
21 #include <algorithm>
22 #include <initializer_list>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/strings/escaping.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/time/clock.h"
32 #include "absl/time/time.h"
33 #include "constants.h"
34 #include "observability_util.h"
35 #include "python_observability_context.h"
36 #include "src/core/lib/channel/channel_stack.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/resource_quota/arena.h"
40 #include "src/core/lib/slice/slice.h"
41 #include "src/core/lib/slice/slice_buffer.h"
42 #include "src/core/lib/transport/metadata_batch.h"
43 #include "src/core/telemetry/call_tracer.h"
44
45 namespace grpc_observability {
46
47 namespace {
48
49 // server metadata elements
50 struct ServerO11yMetadata {
51 grpc_core::Slice path;
52 grpc_core::Slice tracing_slice;
53 grpc_core::Slice census_proto;
54 };
55
GetO11yMetadata(const grpc_metadata_batch * b,ServerO11yMetadata * som)56 void GetO11yMetadata(const grpc_metadata_batch* b, ServerO11yMetadata* som) {
57 const auto* path = b->get_pointer(grpc_core::HttpPathMetadata());
58 if (path != nullptr) {
59 som->path = path->Ref();
60 }
61 if (PythonCensusTracingEnabled()) {
62 const auto* grpc_trace_bin =
63 b->get_pointer(grpc_core::GrpcTraceBinMetadata());
64 if (grpc_trace_bin != nullptr) {
65 som->tracing_slice = grpc_trace_bin->Ref();
66 }
67 }
68 if (PythonCensusStatsEnabled()) {
69 const auto* grpc_tags_bin =
70 b->get_pointer(grpc_core::GrpcTagsBinMetadata());
71 if (grpc_tags_bin != nullptr) {
72 som->census_proto = grpc_tags_bin->Ref();
73 }
74 }
75 }
76
KeyInLabels(std::string key,const std::vector<Label> & labels)77 bool KeyInLabels(std::string key, const std::vector<Label>& labels) {
78 const auto it = std::find_if(labels.begin(), labels.end(),
79 [&key](const Label& l) { return l.key == key; });
80
81 if (it == labels.end()) {
82 return false;
83 }
84 return true;
85 }
86
87 } // namespace
88
89 //
90 // PythonOpenCensusServerCallTracer
91 //
92
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)93 void PythonOpenCensusServerCallTracer::RecordSendInitialMetadata(
94 grpc_metadata_batch* send_initial_metadata) {
95 // Only add labels if exchange is needed (Client send metadata with keys in
96 // MetadataExchangeKeyNames).
97 for (const auto& key : MetadataExchangeKeyNames) {
98 if (KeyInLabels(key, labels_from_peer_)) {
99 labels_injector_.AddExchangeLabelsToMetadata(send_initial_metadata);
100 }
101 }
102 }
103
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)104 void PythonOpenCensusServerCallTracer::RecordReceivedInitialMetadata(
105 grpc_metadata_batch* recv_initial_metadata) {
106 ServerO11yMetadata som;
107 GetO11yMetadata(recv_initial_metadata, &som);
108 path_ = std::move(som.path);
109 method_ = GetMethod(path_);
110 auto tracing_enabled = PythonCensusTracingEnabled();
111 GenerateServerContext(
112 tracing_enabled ? som.tracing_slice.as_string_view() : "",
113 absl::StrCat("Recv.", method_), &context_);
114 registered_method_ =
115 recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
116 .value_or(nullptr) != nullptr;
117 if (PythonCensusStatsEnabled()) {
118 context_.Labels().emplace_back(kServerMethod, std::string(method_));
119 RecordIntMetric(kRpcServerStartedRpcsMeasureName, 1, context_.Labels(),
120 identifier_, registered_method_,
121 /*include_exchange_labels=*/false);
122 }
123
124 labels_from_peer_ = labels_injector_.GetExchangeLabels(recv_initial_metadata);
125 }
126
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)127 void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata(
128 grpc_metadata_batch* send_trailing_metadata) {
129 // We need to record the time when the trailing metadata was sent to
130 // mark the completeness of the request.
131 elapsed_time_ = absl::Now() - start_time_;
132 if (PythonCensusStatsEnabled() && send_trailing_metadata != nullptr) {
133 size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
134 stats_buf_, kMaxServerStatsLen);
135 if (len > 0) {
136 send_trailing_metadata->Set(
137 grpc_core::GrpcServerStatsBinMetadata(),
138 grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
139 }
140 }
141 }
142
RecordSendMessage(const grpc_core::SliceBuffer & send_message)143 void PythonOpenCensusServerCallTracer::RecordSendMessage(
144 const grpc_core::SliceBuffer& send_message) {
145 RecordAnnotation(
146 absl::StrFormat("Send message: %ld bytes", send_message.Length()));
147 ++sent_message_count_;
148 }
149
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)150 void PythonOpenCensusServerCallTracer::RecordSendCompressedMessage(
151 const grpc_core::SliceBuffer& send_compressed_message) {
152 RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
153 send_compressed_message.Length()));
154 }
155
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)156 void PythonOpenCensusServerCallTracer::RecordReceivedMessage(
157 const grpc_core::SliceBuffer& recv_message) {
158 RecordAnnotation(
159 absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
160 ++recv_message_count_;
161 }
162
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)163 void PythonOpenCensusServerCallTracer::RecordReceivedDecompressedMessage(
164 const grpc_core::SliceBuffer& recv_decompressed_message) {
165 RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
166 recv_decompressed_message.Length()));
167 }
168
RecordCancel(grpc_error_handle)169 void PythonOpenCensusServerCallTracer::RecordCancel(
170 grpc_error_handle /*cancel_error*/) {
171 elapsed_time_ = absl::Now() - start_time_;
172 }
173
RecordEnd(const grpc_call_final_info * final_info)174 void PythonOpenCensusServerCallTracer::RecordEnd(
175 const grpc_call_final_info* final_info) {
176 if (PythonCensusStatsEnabled()) {
177 uint64_t outgoing_bytes;
178 uint64_t incoming_bytes;
179 if (grpc_core::IsCallTracerInTransportEnabled()) {
180 outgoing_bytes = outgoing_bytes_.load();
181 incoming_bytes = incoming_bytes_.load();
182 } else {
183 outgoing_bytes = GetOutgoingDataSize(final_info);
184 incoming_bytes = GetIncomingDataSize(final_info);
185 }
186 double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_);
187 context_.Labels().emplace_back(kServerMethod, std::string(method_));
188 context_.Labels().emplace_back(
189 kServerStatus,
190 std::string(StatusCodeToString(final_info->final_status)));
191 for (const auto& label : labels_from_peer_) {
192 context_.Labels().emplace_back(label);
193 }
194 RecordDoubleMetric(kRpcServerSentBytesPerRpcMeasureName,
195 static_cast<double>(outgoing_bytes), context_.Labels(),
196 identifier_, registered_method_,
197 /*include_exchange_labels=*/true);
198 RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
199 static_cast<double>(incoming_bytes), context_.Labels(),
200 identifier_, registered_method_,
201 /*include_exchange_labels=*/true);
202 RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s,
203 context_.Labels(), identifier_, registered_method_,
204 /*include_exchange_labels=*/true);
205 RecordIntMetric(kRpcServerCompletedRpcMeasureName, 1, context_.Labels(),
206 identifier_, registered_method_,
207 /*include_exchange_labels=*/true);
208 RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName,
209 sent_message_count_, context_.Labels(), identifier_,
210 registered_method_, /*include_exchange_labels=*/true);
211 RecordIntMetric(kRpcServerReceivedMessagesPerRpcMeasureName,
212 recv_message_count_, context_.Labels(), identifier_,
213 registered_method_, /*include_exchange_labels=*/true);
214 }
215 if (PythonCensusTracingEnabled()) {
216 context_.EndSpan();
217 if (IsSampled()) {
218 RecordSpan(context_.GetSpan().ToCensusData());
219 }
220 }
221
222 // After RecordEnd, Core will make no further usage of this ServerCallTracer,
223 // so we are free it here.
224 delete this;
225 }
226
RecordIncomingBytes(const TransportByteSize & transport_byte_size)227 void PythonOpenCensusServerCallTracer::RecordIncomingBytes(
228 const TransportByteSize& transport_byte_size) {
229 incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
230 }
231
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)232 void PythonOpenCensusServerCallTracer::RecordOutgoingBytes(
233 const TransportByteSize& transport_byte_size) {
234 outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
235 }
236
RecordAnnotation(absl::string_view annotation)237 void PythonOpenCensusServerCallTracer::RecordAnnotation(
238 absl::string_view annotation) {
239 if (!context_.GetSpanContext().IsSampled()) {
240 return;
241 }
242 context_.AddSpanAnnotation(annotation);
243 }
244
RecordAnnotation(const Annotation & annotation)245 void PythonOpenCensusServerCallTracer::RecordAnnotation(
246 const Annotation& annotation) {
247 if (!context_.GetSpanContext().IsSampled()) {
248 return;
249 }
250
251 switch (annotation.type()) {
252 // Annotations are expensive to create. We should only create it if the
253 // call is being sampled by default.
254 default:
255 if (IsSampled()) {
256 context_.AddSpanAnnotation(annotation.ToString());
257 }
258 break;
259 }
260 }
261
262 std::shared_ptr<grpc_core::TcpTracerInterface>
StartNewTcpTrace()263 PythonOpenCensusServerCallTracer::StartNewTcpTrace() {
264 return nullptr;
265 }
266
TraceId()267 std::string PythonOpenCensusServerCallTracer::TraceId() {
268 return absl::BytesToHexString(
269 absl::string_view(context_.GetSpanContext().TraceId()));
270 }
271
SpanId()272 std::string PythonOpenCensusServerCallTracer::SpanId() {
273 return absl::BytesToHexString(
274 absl::string_view(context_.GetSpanContext().SpanId()));
275 }
276
IsSampled()277 bool PythonOpenCensusServerCallTracer::IsSampled() {
278 return context_.GetSpanContext().IsSampled();
279 }
280
281 //
282 // PythonOpenCensusServerCallTracerFactory
283 //
284
285 grpc_core::ServerCallTracer*
CreateNewServerCallTracer(grpc_core::Arena * arena,const grpc_core::ChannelArgs & channel_args)286 PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
287 grpc_core::Arena* arena, const grpc_core::ChannelArgs& channel_args) {
288 // We don't use arena here to to ensure that memory is allocated and freed in
289 // the same DLL in Windows.
290 (void)arena;
291 (void)channel_args;
292 return new PythonOpenCensusServerCallTracer(exchange_labels_, identifier_);
293 }
294
IsServerTraced(const grpc_core::ChannelArgs & args)295 bool PythonOpenCensusServerCallTracerFactory::IsServerTraced(
296 const grpc_core::ChannelArgs& args) {
297 // Returns true if a server is to be traced, false otherwise.
298 return true;
299 }
300
301 PythonOpenCensusServerCallTracerFactory::
PythonOpenCensusServerCallTracerFactory(const std::vector<Label> & exchange_labels,const char * identifier)302 PythonOpenCensusServerCallTracerFactory(
303 const std::vector<Label>& exchange_labels, const char* identifier)
304 : exchange_labels_(exchange_labels), identifier_(identifier) {}
305
306 } // namespace grpc_observability
307