• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
16 #define GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <atomic>
21 
22 #include "absl/strings/string_view.h"
23 #include "absl/strings/strip.h"
24 #include "constants.h"
25 #include "metadata_exchange.h"
26 #include "python_observability_context.h"
27 #include "src/core/lib/resource_quota/arena.h"
28 #include "src/core/lib/slice/slice.h"
29 #include "src/core/telemetry/call_tracer.h"
30 
31 namespace grpc_observability {
32 
33 class PythonOpenCensusServerCallTracerFactory
34     : public grpc_core::ServerCallTracerFactory {
35  public:
36   grpc_core::ServerCallTracer* CreateNewServerCallTracer(
37       grpc_core::Arena* arena,
38       const grpc_core::ChannelArgs& channel_args) override;
39   explicit PythonOpenCensusServerCallTracerFactory(
40       const std::vector<Label>& exchange_labels, const char* identifier);
41 
42   bool IsServerTraced(const grpc_core::ChannelArgs& args) override;
43 
44  private:
45   const std::vector<Label> exchange_labels_;
46   std::string identifier_;
47 };
48 
GetMethod(const grpc_core::Slice & path)49 inline absl::string_view GetMethod(const grpc_core::Slice& path) {
50   if (path.empty()) {
51     return "";
52   }
53   // Check for leading '/' and trim it if present.
54   return absl::StripPrefix(path.as_string_view(), "/");
55 }
56 
57 class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
58  public:
59   // Maximum size of server stats that are sent on the wire.
60   static constexpr uint32_t kMaxServerStatsLen = 16;
61 
PythonOpenCensusServerCallTracer(const std::vector<Label> & exchange_labels,std::string identifier)62   PythonOpenCensusServerCallTracer(const std::vector<Label>& exchange_labels,
63                                    std::string identifier)
64       : start_time_(absl::Now()),
65         recv_message_count_(0),
66         sent_message_count_(0),
67         labels_injector_(exchange_labels),
68         identifier_(identifier) {}
69 
70   std::string TraceId() override;
71 
72   std::string SpanId() override;
73 
74   bool IsSampled() override;
75 
76   // Please refer to `grpc_transport_stream_op_batch_payload` for details on
77   // arguments.
78   // It's not a requirement to have this metric thus left unimplemented.
79   void RecordSendInitialMetadata(
80       grpc_metadata_batch* send_initial_metadata) override;
81 
82   void RecordSendTrailingMetadata(
83       grpc_metadata_batch* send_trailing_metadata) override;
84 
85   void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
86 
87   void RecordSendCompressedMessage(
88       const grpc_core::SliceBuffer& send_compressed_message) override;
89 
90   void RecordReceivedInitialMetadata(
91       grpc_metadata_batch* recv_initial_metadata) override;
92 
93   void RecordReceivedMessage(
94       const grpc_core::SliceBuffer& recv_message) override;
95 
96   void RecordReceivedDecompressedMessage(
97       const grpc_core::SliceBuffer& recv_decompressed_message) override;
98 
RecordReceivedTrailingMetadata(grpc_metadata_batch *)99   void RecordReceivedTrailingMetadata(
100       grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
101 
102   void RecordCancel(grpc_error_handle /*cancel_error*/) override;
103 
104   void RecordEnd(const grpc_call_final_info* final_info) override;
105 
106   void RecordIncomingBytes(
107       const TransportByteSize& transport_byte_size) override;
108 
109   void RecordOutgoingBytes(
110       const TransportByteSize& transport_byte_size) override;
111 
112   void RecordAnnotation(absl::string_view annotation) override;
113 
114   void RecordAnnotation(const Annotation& annotation) override;
115 
116   std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
117 
118  private:
119   PythonCensusContext context_;
120   // server method
121   grpc_core::Slice path_;
122   absl::string_view method_;
123   absl::Time start_time_;
124   absl::Duration elapsed_time_;
125   uint64_t recv_message_count_;
126   uint64_t sent_message_count_;
127   // Buffer needed for grpc_slice to reference it when adding metadata to
128   // response.
129   char stats_buf_[kMaxServerStatsLen];
130   PythonLabelsInjector labels_injector_;
131   std::vector<Label> labels_from_peer_;
132   std::string identifier_;
133   bool registered_method_ = false;
134   // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
135   // to promises, after which we can ensure that the transport invokes
136   // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
137   // the call's party.
138   std::atomic<uint64_t> incoming_bytes_{0};
139   std::atomic<uint64_t> outgoing_bytes_{0};
140 };
141 
142 }  // namespace grpc_observability
143 
144 #endif  // GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
145