• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "server_call_tracer.h"
18 
19 #include <stdint.h>
20 #include <string.h>
21 
22 #include <algorithm>
23 #include <initializer_list>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/strings/escaping.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/time/clock.h"
33 #include "absl/time/time.h"
34 #include "constants.h"
35 #include "observability_util.h"
36 #include "python_census_context.h"
37 
38 #include "src/core/lib/channel/call_tracer.h"
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/iomgr/error.h"
41 #include "src/core/lib/resource_quota/arena.h"
42 #include "src/core/lib/slice/slice.h"
43 #include "src/core/lib/slice/slice_buffer.h"
44 #include "src/core/lib/transport/metadata_batch.h"
45 
46 namespace grpc_observability {
47 
48 namespace {
49 
50 // server metadata elements
51 struct ServerO11yMetadata {
52   grpc_core::Slice path;
53   grpc_core::Slice tracing_slice;
54   grpc_core::Slice census_proto;
55 };
56 
GetO11yMetadata(const grpc_metadata_batch * b,ServerO11yMetadata * som)57 void GetO11yMetadata(const grpc_metadata_batch* b, ServerO11yMetadata* som) {
58   const auto* path = b->get_pointer(grpc_core::HttpPathMetadata());
59   if (path != nullptr) {
60     som->path = path->Ref();
61   }
62   if (PythonCensusTracingEnabled()) {
63     const auto* grpc_trace_bin =
64         b->get_pointer(grpc_core::GrpcTraceBinMetadata());
65     if (grpc_trace_bin != nullptr) {
66       som->tracing_slice = grpc_trace_bin->Ref();
67     }
68   }
69   if (PythonCensusStatsEnabled()) {
70     const auto* grpc_tags_bin =
71         b->get_pointer(grpc_core::GrpcTagsBinMetadata());
72     if (grpc_tags_bin != nullptr) {
73       som->census_proto = grpc_tags_bin->Ref();
74     }
75   }
76 }
77 
78 }  // namespace
79 
80 //
81 // PythonOpenCensusServerCallTracer
82 //
83 
84 class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
85  public:
86   // Maximum size of server stats that are sent on the wire.
87   static constexpr uint32_t kMaxServerStatsLen = 16;
88 
PythonOpenCensusServerCallTracer()89   PythonOpenCensusServerCallTracer()
90       : start_time_(absl::Now()),
91         recv_message_count_(0),
92         sent_message_count_(0) {}
93 
TraceId()94   std::string TraceId() override {
95     return absl::BytesToHexString(
96         absl::string_view(context_.GetSpanContext().TraceId()));
97   }
98 
SpanId()99   std::string SpanId() override {
100     return absl::BytesToHexString(
101         absl::string_view(context_.GetSpanContext().SpanId()));
102   }
103 
IsSampled()104   bool IsSampled() override { return context_.GetSpanContext().IsSampled(); }
105 
106   // Please refer to `grpc_transport_stream_op_batch_payload` for details on
107   // arguments.
108   // It's not a requirement to have this metric thus left unimplemented.
RecordSendInitialMetadata(grpc_metadata_batch *)109   void RecordSendInitialMetadata(
110       grpc_metadata_batch* /*send_initial_metadata*/) override {}
111 
112   void RecordSendTrailingMetadata(
113       grpc_metadata_batch* send_trailing_metadata) override;
114 
RecordSendMessage(const grpc_core::SliceBuffer & send_message)115   void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
116     RecordAnnotation(
117         absl::StrFormat("Send message: %ld bytes", send_message.Length()));
118     ++sent_message_count_;
119   }
120 
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)121   void RecordSendCompressedMessage(
122       const grpc_core::SliceBuffer& send_compressed_message) override {
123     RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
124                                      send_compressed_message.Length()));
125   }
126 
127   void RecordReceivedInitialMetadata(
128       grpc_metadata_batch* recv_initial_metadata) override;
129 
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)130   void RecordReceivedMessage(
131       const grpc_core::SliceBuffer& recv_message) override {
132     RecordAnnotation(
133         absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
134     ++recv_message_count_;
135   }
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)136   void RecordReceivedDecompressedMessage(
137       const grpc_core::SliceBuffer& recv_decompressed_message) override {
138     RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
139                                      recv_decompressed_message.Length()));
140   }
141 
RecordReceivedTrailingMetadata(grpc_metadata_batch *)142   void RecordReceivedTrailingMetadata(
143       grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
144 
RecordCancel(grpc_error_handle)145   void RecordCancel(grpc_error_handle /*cancel_error*/) override {
146     elapsed_time_ = absl::Now() - start_time_;
147   }
148 
149   void RecordEnd(const grpc_call_final_info* final_info) override;
150 
RecordAnnotation(absl::string_view annotation)151   void RecordAnnotation(absl::string_view annotation) override {
152     if (!context_.GetSpanContext().IsSampled()) {
153       return;
154     }
155     context_.AddSpanAnnotation(annotation);
156   }
157 
RecordAnnotation(const Annotation & annotation)158   void RecordAnnotation(const Annotation& annotation) override {
159     if (!context_.GetSpanContext().IsSampled()) {
160       return;
161     }
162 
163     switch (annotation.type()) {
164       // Annotations are expensive to create. We should only create it if the
165       // call is being sampled by default.
166       default:
167         if (IsSampled()) {
168           context_.AddSpanAnnotation(annotation.ToString());
169         }
170         break;
171     }
172   }
173 
StartNewTcpTrace()174   std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override {
175     return nullptr;
176   }
177 
178  private:
179   PythonCensusContext context_;
180   // server method
181   grpc_core::Slice path_;
182   absl::string_view method_;
183   absl::Time start_time_;
184   absl::Duration elapsed_time_;
185   uint64_t recv_message_count_;
186   uint64_t sent_message_count_;
187   // Buffer needed for grpc_slice to reference it when adding metadata to
188   // response.
189   char stats_buf_[kMaxServerStatsLen];
190 };
191 
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)192 void PythonOpenCensusServerCallTracer::RecordReceivedInitialMetadata(
193     grpc_metadata_batch* recv_initial_metadata) {
194   ServerO11yMetadata som;
195   GetO11yMetadata(recv_initial_metadata, &som);
196   path_ = std::move(som.path);
197   method_ = GetMethod(path_);
198   auto tracing_enabled = PythonCensusTracingEnabled();
199   GenerateServerContext(
200       tracing_enabled ? som.tracing_slice.as_string_view() : "",
201       absl::StrCat("Recv.", method_), &context_);
202   if (PythonCensusStatsEnabled()) {
203     context_.Labels().emplace_back(kServerMethod, std::string(method_));
204     RecordIntMetric(kRpcServerStartedRpcsMeasureName, 1, context_.Labels());
205   }
206 }
207 
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)208 void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata(
209     grpc_metadata_batch* send_trailing_metadata) {
210   // We need to record the time when the trailing metadata was sent to
211   // mark the completeness of the request.
212   elapsed_time_ = absl::Now() - start_time_;
213   if (PythonCensusStatsEnabled() && send_trailing_metadata != nullptr) {
214     size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
215                                       stats_buf_, kMaxServerStatsLen);
216     if (len > 0) {
217       send_trailing_metadata->Set(
218           grpc_core::GrpcServerStatsBinMetadata(),
219           grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
220     }
221   }
222 }
223 
RecordEnd(const grpc_call_final_info * final_info)224 void PythonOpenCensusServerCallTracer::RecordEnd(
225     const grpc_call_final_info* final_info) {
226   if (PythonCensusStatsEnabled()) {
227     const uint64_t request_size = GetOutgoingDataSize(final_info);
228     const uint64_t response_size = GetIncomingDataSize(final_info);
229     double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_);
230     context_.Labels().emplace_back(kServerMethod, std::string(method_));
231     context_.Labels().emplace_back(
232         kServerStatus,
233         std::string(StatusCodeToString(final_info->final_status)));
234     RecordDoubleMetric(kRpcServerSentBytesPerRpcMeasureName,
235                        static_cast<double>(response_size), context_.Labels());
236     RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
237                        static_cast<double>(request_size), context_.Labels());
238     RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s,
239                        context_.Labels());
240     RecordIntMetric(kRpcServerCompletedRpcMeasureName, 1, context_.Labels());
241     RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName,
242                     sent_message_count_, context_.Labels());
243     RecordIntMetric(kRpcServerReceivedMessagesPerRpcMeasureName,
244                     recv_message_count_, context_.Labels());
245   }
246   if (PythonCensusTracingEnabled()) {
247     context_.EndSpan();
248     if (IsSampled()) {
249       RecordSpan(context_.GetSpan().ToCensusData());
250     }
251   }
252 
253   // After RecordEnd, Core will make no further usage of this ServerCallTracer,
254   // so we are free it here.
255   delete this;
256 }
257 
258 //
259 // PythonOpenCensusServerCallTracerFactory
260 //
261 
262 grpc_core::ServerCallTracer*
CreateNewServerCallTracer(grpc_core::Arena * arena,const grpc_core::ChannelArgs & channel_args)263 PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
264     grpc_core::Arena* arena, const grpc_core::ChannelArgs& channel_args) {
265   // We don't use arena here to to ensure that memory is allocated and freed in
266   // the same DLL in Windows.
267   (void)arena;
268   (void)channel_args;
269   return new PythonOpenCensusServerCallTracer();
270 }
271 
272 }  // namespace grpc_observability
273