• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_SERVER_CALL_TRACER_H
20 #define GRPC_SRC_CPP_EXT_OTEL_OTEL_SERVER_CALL_TRACER_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include "absl/strings/strip.h"
25 #include "src/core/lib/channel/channel_args.h"
26 #include "src/core/telemetry/call_tracer.h"
27 #include "src/cpp/ext/otel/otel_plugin.h"
28 
29 namespace grpc {
30 namespace internal {
31 
32 // OpenTelemetryPluginImpl::ServerCallTracer implementation
33 
34 class OpenTelemetryPluginImpl::ServerCallTracer
35     : public grpc_core::ServerCallTracer {
36  public:
ServerCallTracer(OpenTelemetryPluginImpl * otel_plugin,std::shared_ptr<OpenTelemetryPluginImpl::ServerScopeConfig> scope_config)37   ServerCallTracer(
38       OpenTelemetryPluginImpl* otel_plugin,
39       std::shared_ptr<OpenTelemetryPluginImpl::ServerScopeConfig> scope_config)
40       : start_time_(absl::Now()),
41         injected_labels_from_plugin_options_(
42             otel_plugin->plugin_options().size()),
43         otel_plugin_(otel_plugin),
44         scope_config_(std::move(scope_config)) {}
45 
TraceId()46   std::string TraceId() override {
47     // Not implemented
48     return "";
49   }
50 
SpanId()51   std::string SpanId() override {
52     // Not implemented
53     return "";
54   }
55 
IsSampled()56   bool IsSampled() override {
57     // Not implemented
58     return false;
59   }
60 
61   // Please refer to `grpc_transport_stream_op_batch_payload` for details on
62   // arguments.
63   void RecordSendInitialMetadata(
64       grpc_metadata_batch* send_initial_metadata) override;
65 
66   void RecordSendTrailingMetadata(
67       grpc_metadata_batch* /*send_trailing_metadata*/) override;
68 
RecordSendMessage(const grpc_core::SliceBuffer & send_message)69   void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
70     RecordAnnotation(
71         absl::StrFormat("Send message: %ld bytes", send_message.Length()));
72   }
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)73   void RecordSendCompressedMessage(
74       const grpc_core::SliceBuffer& send_compressed_message) override {
75     RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
76                                      send_compressed_message.Length()));
77   }
78 
79   void RecordReceivedInitialMetadata(
80       grpc_metadata_batch* recv_initial_metadata) override;
81 
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)82   void RecordReceivedMessage(
83       const grpc_core::SliceBuffer& recv_message) override {
84     RecordAnnotation(
85         absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
86   }
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)87   void RecordReceivedDecompressedMessage(
88       const grpc_core::SliceBuffer& recv_decompressed_message) override {
89     RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
90                                      recv_decompressed_message.Length()));
91   }
92 
RecordReceivedTrailingMetadata(grpc_metadata_batch *)93   void RecordReceivedTrailingMetadata(
94       grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
95 
RecordCancel(grpc_error_handle)96   void RecordCancel(grpc_error_handle /*cancel_error*/) override {
97     elapsed_time_ = absl::Now() - start_time_;
98   }
99 
100   void RecordEnd(const grpc_call_final_info* final_info) override;
101 
102   void RecordIncomingBytes(
103       const TransportByteSize& transport_byte_size) override;
104   void RecordOutgoingBytes(
105       const TransportByteSize& transport_byte_size) override;
106 
RecordAnnotation(absl::string_view)107   void RecordAnnotation(absl::string_view /*annotation*/) override {
108     // Not implemented
109   }
110 
RecordAnnotation(const Annotation &)111   void RecordAnnotation(const Annotation& /*annotation*/) override {
112     // Not implemented
113   }
StartNewTcpTrace()114   std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override {
115     // No TCP trace.
116     return nullptr;
117   }
118 
119  private:
MethodForStats()120   absl::string_view MethodForStats() const {
121     absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
122     if (registered_method_ ||
123         (otel_plugin_->generic_method_attribute_filter() != nullptr &&
124          otel_plugin_->generic_method_attribute_filter()(method))) {
125       return method;
126     }
127     return "other";
128   }
129 
130   absl::Time start_time_;
131   absl::Duration elapsed_time_;
132   grpc_core::Slice path_;
133   bool registered_method_;
134   std::vector<std::unique_ptr<LabelsIterable>>
135       injected_labels_from_plugin_options_;
136   OpenTelemetryPluginImpl* otel_plugin_;
137   std::shared_ptr<OpenTelemetryPluginImpl::ServerScopeConfig> scope_config_;
138   // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
139   // to promises, after which we can ensure that the transport invokes
140   // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
141   // the call's party.
142   std::atomic<uint64_t> incoming_bytes_{0};
143   std::atomic<uint64_t> outgoing_bytes_{0};
144 };
145 
146 }  // namespace internal
147 }  // namespace grpc
148 
149 #endif  // GRPC_SRC_CPP_EXT_OTEL_OTEL_SERVER_CALL_TRACER_H
150