1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "server_call_tracer.h"
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include <algorithm>
23 #include <initializer_list>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/strings/escaping.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/time/clock.h"
33 #include "absl/time/time.h"
34 #include "constants.h"
35 #include "observability_util.h"
36 #include "python_census_context.h"
37
38 #include "src/core/lib/channel/call_tracer.h"
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/iomgr/error.h"
41 #include "src/core/lib/resource_quota/arena.h"
42 #include "src/core/lib/slice/slice.h"
43 #include "src/core/lib/slice/slice_buffer.h"
44 #include "src/core/lib/transport/metadata_batch.h"
45
46 namespace grpc_observability {
47
48 namespace {
49
50 // server metadata elements
51 struct ServerO11yMetadata {
52 grpc_core::Slice path;
53 grpc_core::Slice tracing_slice;
54 grpc_core::Slice census_proto;
55 };
56
GetO11yMetadata(const grpc_metadata_batch * b,ServerO11yMetadata * som)57 void GetO11yMetadata(const grpc_metadata_batch* b, ServerO11yMetadata* som) {
58 const auto* path = b->get_pointer(grpc_core::HttpPathMetadata());
59 if (path != nullptr) {
60 som->path = path->Ref();
61 }
62 if (PythonCensusTracingEnabled()) {
63 const auto* grpc_trace_bin =
64 b->get_pointer(grpc_core::GrpcTraceBinMetadata());
65 if (grpc_trace_bin != nullptr) {
66 som->tracing_slice = grpc_trace_bin->Ref();
67 }
68 }
69 if (PythonCensusStatsEnabled()) {
70 const auto* grpc_tags_bin =
71 b->get_pointer(grpc_core::GrpcTagsBinMetadata());
72 if (grpc_tags_bin != nullptr) {
73 som->census_proto = grpc_tags_bin->Ref();
74 }
75 }
76 }
77
78 } // namespace
79
80 //
81 // PythonOpenCensusServerCallTracer
82 //
83
84 class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
85 public:
86 // Maximum size of server stats that are sent on the wire.
87 static constexpr uint32_t kMaxServerStatsLen = 16;
88
PythonOpenCensusServerCallTracer()89 PythonOpenCensusServerCallTracer()
90 : start_time_(absl::Now()),
91 recv_message_count_(0),
92 sent_message_count_(0) {}
93
TraceId()94 std::string TraceId() override {
95 return absl::BytesToHexString(
96 absl::string_view(context_.GetSpanContext().TraceId()));
97 }
98
SpanId()99 std::string SpanId() override {
100 return absl::BytesToHexString(
101 absl::string_view(context_.GetSpanContext().SpanId()));
102 }
103
IsSampled()104 bool IsSampled() override { return context_.GetSpanContext().IsSampled(); }
105
106 // Please refer to `grpc_transport_stream_op_batch_payload` for details on
107 // arguments.
108 // It's not a requirement to have this metric thus left unimplemented.
RecordSendInitialMetadata(grpc_metadata_batch *)109 void RecordSendInitialMetadata(
110 grpc_metadata_batch* /*send_initial_metadata*/) override {}
111
112 void RecordSendTrailingMetadata(
113 grpc_metadata_batch* send_trailing_metadata) override;
114
RecordSendMessage(const grpc_core::SliceBuffer & send_message)115 void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
116 RecordAnnotation(
117 absl::StrFormat("Send message: %ld bytes", send_message.Length()));
118 ++sent_message_count_;
119 }
120
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)121 void RecordSendCompressedMessage(
122 const grpc_core::SliceBuffer& send_compressed_message) override {
123 RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
124 send_compressed_message.Length()));
125 }
126
127 void RecordReceivedInitialMetadata(
128 grpc_metadata_batch* recv_initial_metadata) override;
129
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)130 void RecordReceivedMessage(
131 const grpc_core::SliceBuffer& recv_message) override {
132 RecordAnnotation(
133 absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
134 ++recv_message_count_;
135 }
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)136 void RecordReceivedDecompressedMessage(
137 const grpc_core::SliceBuffer& recv_decompressed_message) override {
138 RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
139 recv_decompressed_message.Length()));
140 }
141
RecordReceivedTrailingMetadata(grpc_metadata_batch *)142 void RecordReceivedTrailingMetadata(
143 grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
144
RecordCancel(grpc_error_handle)145 void RecordCancel(grpc_error_handle /*cancel_error*/) override {
146 elapsed_time_ = absl::Now() - start_time_;
147 }
148
149 void RecordEnd(const grpc_call_final_info* final_info) override;
150
RecordAnnotation(absl::string_view annotation)151 void RecordAnnotation(absl::string_view annotation) override {
152 if (!context_.GetSpanContext().IsSampled()) {
153 return;
154 }
155 context_.AddSpanAnnotation(annotation);
156 }
157
RecordAnnotation(const Annotation & annotation)158 void RecordAnnotation(const Annotation& annotation) override {
159 if (!context_.GetSpanContext().IsSampled()) {
160 return;
161 }
162
163 switch (annotation.type()) {
164 // Annotations are expensive to create. We should only create it if the
165 // call is being sampled by default.
166 default:
167 if (IsSampled()) {
168 context_.AddSpanAnnotation(annotation.ToString());
169 }
170 break;
171 }
172 }
173
StartNewTcpTrace()174 std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override {
175 return nullptr;
176 }
177
178 private:
179 PythonCensusContext context_;
180 // server method
181 grpc_core::Slice path_;
182 absl::string_view method_;
183 absl::Time start_time_;
184 absl::Duration elapsed_time_;
185 uint64_t recv_message_count_;
186 uint64_t sent_message_count_;
187 // Buffer needed for grpc_slice to reference it when adding metadata to
188 // response.
189 char stats_buf_[kMaxServerStatsLen];
190 };
191
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)192 void PythonOpenCensusServerCallTracer::RecordReceivedInitialMetadata(
193 grpc_metadata_batch* recv_initial_metadata) {
194 ServerO11yMetadata som;
195 GetO11yMetadata(recv_initial_metadata, &som);
196 path_ = std::move(som.path);
197 method_ = GetMethod(path_);
198 auto tracing_enabled = PythonCensusTracingEnabled();
199 GenerateServerContext(
200 tracing_enabled ? som.tracing_slice.as_string_view() : "",
201 absl::StrCat("Recv.", method_), &context_);
202 if (PythonCensusStatsEnabled()) {
203 context_.Labels().emplace_back(kServerMethod, std::string(method_));
204 RecordIntMetric(kRpcServerStartedRpcsMeasureName, 1, context_.Labels());
205 }
206 }
207
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)208 void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata(
209 grpc_metadata_batch* send_trailing_metadata) {
210 // We need to record the time when the trailing metadata was sent to
211 // mark the completeness of the request.
212 elapsed_time_ = absl::Now() - start_time_;
213 if (PythonCensusStatsEnabled() && send_trailing_metadata != nullptr) {
214 size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
215 stats_buf_, kMaxServerStatsLen);
216 if (len > 0) {
217 send_trailing_metadata->Set(
218 grpc_core::GrpcServerStatsBinMetadata(),
219 grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
220 }
221 }
222 }
223
RecordEnd(const grpc_call_final_info * final_info)224 void PythonOpenCensusServerCallTracer::RecordEnd(
225 const grpc_call_final_info* final_info) {
226 if (PythonCensusStatsEnabled()) {
227 const uint64_t request_size = GetOutgoingDataSize(final_info);
228 const uint64_t response_size = GetIncomingDataSize(final_info);
229 double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_);
230 context_.Labels().emplace_back(kServerMethod, std::string(method_));
231 context_.Labels().emplace_back(
232 kServerStatus,
233 std::string(StatusCodeToString(final_info->final_status)));
234 RecordDoubleMetric(kRpcServerSentBytesPerRpcMeasureName,
235 static_cast<double>(response_size), context_.Labels());
236 RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
237 static_cast<double>(request_size), context_.Labels());
238 RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s,
239 context_.Labels());
240 RecordIntMetric(kRpcServerCompletedRpcMeasureName, 1, context_.Labels());
241 RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName,
242 sent_message_count_, context_.Labels());
243 RecordIntMetric(kRpcServerReceivedMessagesPerRpcMeasureName,
244 recv_message_count_, context_.Labels());
245 }
246 if (PythonCensusTracingEnabled()) {
247 context_.EndSpan();
248 if (IsSampled()) {
249 RecordSpan(context_.GetSpan().ToCensusData());
250 }
251 }
252
253 // After RecordEnd, Core will make no further usage of this ServerCallTracer,
254 // so we are free it here.
255 delete this;
256 }
257
258 //
259 // PythonOpenCensusServerCallTracerFactory
260 //
261
262 grpc_core::ServerCallTracer*
CreateNewServerCallTracer(grpc_core::Arena * arena,const grpc_core::ChannelArgs & channel_args)263 PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
264 grpc_core::Arena* arena, const grpc_core::ChannelArgs& channel_args) {
265 // We don't use arena here to to ensure that memory is allocated and freed in
266 // the same DLL in Windows.
267 (void)arena;
268 (void)channel_args;
269 return new PythonOpenCensusServerCallTracer();
270 }
271
272 } // namespace grpc_observability
273