1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
16 #define GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
17
#include <grpc/support/port_platform.h>

#include <stdint.h>

#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "constants.h"
#include "metadata_exchange.h"
#include "python_observability_context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/telemetry/call_tracer.h"
30
31 namespace grpc_observability {
32
33 class PythonOpenCensusServerCallTracerFactory
34 : public grpc_core::ServerCallTracerFactory {
35 public:
36 grpc_core::ServerCallTracer* CreateNewServerCallTracer(
37 grpc_core::Arena* arena,
38 const grpc_core::ChannelArgs& channel_args) override;
39 explicit PythonOpenCensusServerCallTracerFactory(
40 const std::vector<Label>& exchange_labels, const char* identifier);
41
42 bool IsServerTraced(const grpc_core::ChannelArgs& args) override;
43
44 private:
45 const std::vector<Label> exchange_labels_;
46 std::string identifier_;
47 };
48
GetMethod(const grpc_core::Slice & path)49 inline absl::string_view GetMethod(const grpc_core::Slice& path) {
50 if (path.empty()) {
51 return "";
52 }
53 // Check for leading '/' and trim it if present.
54 return absl::StripPrefix(path.as_string_view(), "/");
55 }
56
57 class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
58 public:
59 // Maximum size of server stats that are sent on the wire.
60 static constexpr uint32_t kMaxServerStatsLen = 16;
61
PythonOpenCensusServerCallTracer(const std::vector<Label> & exchange_labels,std::string identifier)62 PythonOpenCensusServerCallTracer(const std::vector<Label>& exchange_labels,
63 std::string identifier)
64 : start_time_(absl::Now()),
65 recv_message_count_(0),
66 sent_message_count_(0),
67 labels_injector_(exchange_labels),
68 identifier_(identifier) {}
69
70 std::string TraceId() override;
71
72 std::string SpanId() override;
73
74 bool IsSampled() override;
75
76 // Please refer to `grpc_transport_stream_op_batch_payload` for details on
77 // arguments.
78 // It's not a requirement to have this metric thus left unimplemented.
79 void RecordSendInitialMetadata(
80 grpc_metadata_batch* send_initial_metadata) override;
81
82 void RecordSendTrailingMetadata(
83 grpc_metadata_batch* send_trailing_metadata) override;
84
85 void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
86
87 void RecordSendCompressedMessage(
88 const grpc_core::SliceBuffer& send_compressed_message) override;
89
90 void RecordReceivedInitialMetadata(
91 grpc_metadata_batch* recv_initial_metadata) override;
92
93 void RecordReceivedMessage(
94 const grpc_core::SliceBuffer& recv_message) override;
95
96 void RecordReceivedDecompressedMessage(
97 const grpc_core::SliceBuffer& recv_decompressed_message) override;
98
RecordReceivedTrailingMetadata(grpc_metadata_batch *)99 void RecordReceivedTrailingMetadata(
100 grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
101
102 void RecordCancel(grpc_error_handle /*cancel_error*/) override;
103
104 void RecordEnd(const grpc_call_final_info* final_info) override;
105
106 void RecordIncomingBytes(
107 const TransportByteSize& transport_byte_size) override;
108
109 void RecordOutgoingBytes(
110 const TransportByteSize& transport_byte_size) override;
111
112 void RecordAnnotation(absl::string_view annotation) override;
113
114 void RecordAnnotation(const Annotation& annotation) override;
115
116 std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
117
118 private:
119 PythonCensusContext context_;
120 // server method
121 grpc_core::Slice path_;
122 absl::string_view method_;
123 absl::Time start_time_;
124 absl::Duration elapsed_time_;
125 uint64_t recv_message_count_;
126 uint64_t sent_message_count_;
127 // Buffer needed for grpc_slice to reference it when adding metadata to
128 // response.
129 char stats_buf_[kMaxServerStatsLen];
130 PythonLabelsInjector labels_injector_;
131 std::vector<Label> labels_from_peer_;
132 std::string identifier_;
133 bool registered_method_ = false;
134 // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
135 // to promises, after which we can ensure that the transport invokes
136 // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
137 // the call's party.
138 std::atomic<uint64_t> incoming_bytes_{0};
139 std::atomic<uint64_t> outgoing_bytes_{0};
140 };
141
142 } // namespace grpc_observability
143
144 #endif // GRPC_PYTHON_OPENCENSUS_SERVER_CALL_TRACER_H
145