• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_PYTHON_OPENCENSUS_CLIENT_CALL_TRACER_H
16 #define GRPC_PYTHON_OPENCENSUS_CLIENT_CALL_TRACER_H
17 
#include <grpc/support/time.h>
#include <stdint.h>

#include <array>
#include <atomic>
#include <memory>
#include <string>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/escaping.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "metadata_exchange.h"
#include "python_observability_context.h"
#include "src/core/telemetry/call_tracer.h"
32 
33 namespace grpc_observability {
34 
35 class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
36  public:
37   class PythonOpenCensusCallAttemptTracer : public CallAttemptTracer {
38    public:
39     PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
40                                       uint64_t attempt_num,
41                                       bool is_transparent_retry);
TraceId()42     std::string TraceId() override {
43       return absl::BytesToHexString(
44           absl::string_view(context_.GetSpanContext().TraceId()));
45     }
46 
SpanId()47     std::string SpanId() override {
48       return absl::BytesToHexString(
49           absl::string_view(context_.GetSpanContext().SpanId()));
50     }
51 
IsSampled()52     bool IsSampled() override { return context_.GetSpanContext().IsSampled(); }
53 
54     void RecordSendInitialMetadata(
55         grpc_metadata_batch* send_initial_metadata) override;
RecordSendTrailingMetadata(grpc_metadata_batch *)56     void RecordSendTrailingMetadata(
57         grpc_metadata_batch* /*send_trailing_metadata*/) override {}
58     void RecordSendMessage(
59         const grpc_core::SliceBuffer& /*send_message*/) override;
RecordSendCompressedMessage(const grpc_core::SliceBuffer &)60     void RecordSendCompressedMessage(
61         const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
62     void RecordReceivedInitialMetadata(
63         grpc_metadata_batch* /*recv_initial_metadata*/) override;
64     void RecordReceivedMessage(
65         const grpc_core::SliceBuffer& /*recv_message*/) override;
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer &)66     void RecordReceivedDecompressedMessage(
67         const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
68     void RecordReceivedTrailingMetadata(
69         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
70         const grpc_transport_stream_stats* transport_stream_stats) override;
71     void RecordIncomingBytes(
72         const TransportByteSize& transport_byte_size) override;
73     void RecordOutgoingBytes(
74         const TransportByteSize& transport_byte_size) override;
75     void RecordCancel(grpc_error_handle cancel_error) override;
76     void RecordEnd(const gpr_timespec& /*latency*/) override;
77     void RecordAnnotation(absl::string_view annotation) override;
78     void RecordAnnotation(const Annotation& annotation) override;
79     std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
80     void SetOptionalLabel(OptionalLabelKey key,
81                           grpc_core::RefCountedStringValue value) override;
82 
83    private:
84     // Maximum size of trace context is sent on the wire.
85     static constexpr uint32_t kMaxTraceContextLen = 64;
86     // Maximum size of tags that are sent on the wire.
87     static constexpr uint32_t kMaxTagsLen = 2048;
88     PythonOpenCensusCallTracer* parent_;
89     PythonCensusContext context_;
90     // Start time (for measuring latency).
91     absl::Time start_time_;
92     // Number of messages in this RPC.
93     uint64_t recv_message_count_ = 0;
94     uint64_t sent_message_count_ = 0;
95     // End status code
96     absl::StatusCode status_code_;
97     // Avoid std::map to avoid per-call allocations.
98     std::array<grpc_core::RefCountedStringValue,
99                static_cast<size_t>(OptionalLabelKey::kSize)>
100         optional_labels_array_;
101     std::vector<Label> labels_from_peer_;
102     bool is_trailers_only_ = false;
103     // TODO(roth, ctiller): Won't need atomic here once chttp2 is migrated
104     // to promises, after which we can ensure that the transport invokes
105     // the RecordIncomingBytes() and RecordOutgoingBytes() methods inside
106     // the call's party.
107     std::atomic<uint64_t> incoming_bytes_{0};
108     std::atomic<uint64_t> outgoing_bytes_{0};
109   };
110 
111   explicit PythonOpenCensusCallTracer(
112       const char* method, const char* target, const char* trace_id,
113       const char* parent_span_id, const char* identifier,
114       const std::vector<Label>& exchange_labels, bool tracing_enabled,
115       bool add_csm_optional_labels, bool registered_method);
116   ~PythonOpenCensusCallTracer() override;
117 
TraceId()118   std::string TraceId() override {
119     return absl::BytesToHexString(
120         absl::string_view(context_.GetSpanContext().TraceId()));
121   }
122 
SpanId()123   std::string SpanId() override {
124     return absl::BytesToHexString(
125         absl::string_view(context_.GetSpanContext().SpanId()));
126   }
127 
IsSampled()128   bool IsSampled() override { return context_.GetSpanContext().IsSampled(); }
129 
130   void GenerateContext();
131   PythonOpenCensusCallAttemptTracer* StartNewAttempt(
132       bool is_transparent_retry) override;
133 
134   void RecordAnnotation(absl::string_view annotation) override;
135   void RecordAnnotation(const Annotation& annotation) override;
136 
137  private:
138   PythonCensusContext CreateCensusContextForCallAttempt();
139 
140   // Client method.
141   std::string method_;
142   // Client target.
143   std::string target_;
144   PythonCensusContext context_;
145   bool tracing_enabled_;
146   bool add_csm_optional_labels_;
147   mutable grpc_core::Mutex mu_;
148   PythonLabelsInjector labels_injector_;
149   std::string identifier_;
150   const bool registered_method_;
151   // Non-transparent attempts per call
152   uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;
153   // Transparent retries per call
154   uint64_t transparent_retries_ ABSL_GUARDED_BY(&mu_) = 0;
155   // Retry delay
156   absl::Duration retry_delay_ ABSL_GUARDED_BY(&mu_);
157   absl::Time time_at_last_attempt_end_ ABSL_GUARDED_BY(&mu_);
158   uint64_t num_active_rpcs_ ABSL_GUARDED_BY(&mu_) = 0;
159 };
160 
161 }  // namespace grpc_observability
162 
163 #endif  // GRPC_PYTHON_OPENCENSUS_CLIENT_CALL_TRACER_H
164