• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/telemetry/call_tracer.h"
20 
21 #include <grpc/support/port_platform.h>
22 
23 #include <memory>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/log/check.h"
28 #include "src/core/lib/promise/context.h"
29 #include "src/core/telemetry/tcp_tracer.h"
30 
31 namespace grpc_core {
32 
33 CallTracerInterface::TransportByteSize&
operator +=(const CallTracerInterface::TransportByteSize & other)34 CallTracerInterface::TransportByteSize::operator+=(
35     const CallTracerInterface::TransportByteSize& other) {
36   framing_bytes += other.framing_bytes;
37   data_bytes += other.data_bytes;
38   header_bytes += other.header_bytes;
39   return *this;
40 }
41 
42 //
43 // ServerCallTracerFactory
44 //
45 
46 namespace {
47 
48 ServerCallTracerFactory* g_server_call_tracer_factory_ = nullptr;
49 
50 const char* kServerCallTracerFactoryChannelArgName =
51     "grpc.experimental.server_call_tracer_factory";
52 
53 }  // namespace
54 
Get(const ChannelArgs & channel_args)55 ServerCallTracerFactory* ServerCallTracerFactory::Get(
56     const ChannelArgs& channel_args) {
57   ServerCallTracerFactory* factory =
58       channel_args.GetObject<ServerCallTracerFactory>();
59   if (factory == nullptr) {
60     factory = g_server_call_tracer_factory_;
61   }
62   if (factory && factory->IsServerTraced(channel_args)) {
63     return factory;
64   }
65   return nullptr;
66 }
67 
RegisterGlobal(ServerCallTracerFactory * factory)68 void ServerCallTracerFactory::RegisterGlobal(ServerCallTracerFactory* factory) {
69   g_server_call_tracer_factory_ = factory;
70 }
71 
TestOnlyReset()72 void ServerCallTracerFactory::TestOnlyReset() {
73   delete g_server_call_tracer_factory_;
74   g_server_call_tracer_factory_ = nullptr;
75 }
76 
ChannelArgName()77 absl::string_view ServerCallTracerFactory::ChannelArgName() {
78   return kServerCallTracerFactoryChannelArgName;
79 }
80 
81 class DelegatingClientCallTracer : public ClientCallTracer {
82  public:
83   class DelegatingClientCallAttemptTracer
84       : public ClientCallTracer::CallAttemptTracer {
85    public:
DelegatingClientCallAttemptTracer(std::vector<CallAttemptTracer * > tracers)86     explicit DelegatingClientCallAttemptTracer(
87         std::vector<CallAttemptTracer*> tracers)
88         : tracers_(std::move(tracers)) {
89       DCHECK(!tracers_.empty());
90     }
~DelegatingClientCallAttemptTracer()91     ~DelegatingClientCallAttemptTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)92     void RecordSendInitialMetadata(
93         grpc_metadata_batch* send_initial_metadata) override {
94       for (auto* tracer : tracers_) {
95         tracer->RecordSendInitialMetadata(send_initial_metadata);
96       }
97     }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)98     void RecordSendTrailingMetadata(
99         grpc_metadata_batch* send_trailing_metadata) override {
100       for (auto* tracer : tracers_) {
101         tracer->RecordSendTrailingMetadata(send_trailing_metadata);
102       }
103     }
RecordSendMessage(const SliceBuffer & send_message)104     void RecordSendMessage(const SliceBuffer& send_message) override {
105       for (auto* tracer : tracers_) {
106         tracer->RecordSendMessage(send_message);
107       }
108     }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)109     void RecordSendCompressedMessage(
110         const SliceBuffer& send_compressed_message) override {
111       for (auto* tracer : tracers_) {
112         tracer->RecordSendCompressedMessage(send_compressed_message);
113       }
114     }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)115     void RecordReceivedInitialMetadata(
116         grpc_metadata_batch* recv_initial_metadata) override {
117       for (auto* tracer : tracers_) {
118         tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
119       }
120     }
RecordReceivedMessage(const SliceBuffer & recv_message)121     void RecordReceivedMessage(const SliceBuffer& recv_message) override {
122       for (auto* tracer : tracers_) {
123         tracer->RecordReceivedMessage(recv_message);
124       }
125     }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)126     void RecordReceivedDecompressedMessage(
127         const SliceBuffer& recv_decompressed_message) override {
128       for (auto* tracer : tracers_) {
129         tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
130       }
131     }
RecordCancel(grpc_error_handle cancel_error)132     void RecordCancel(grpc_error_handle cancel_error) override {
133       for (auto* tracer : tracers_) {
134         tracer->RecordCancel(cancel_error);
135       }
136     }
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)137     void RecordReceivedTrailingMetadata(
138         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
139         const grpc_transport_stream_stats* transport_stream_stats) override {
140       for (auto* tracer : tracers_) {
141         tracer->RecordReceivedTrailingMetadata(status, recv_trailing_metadata,
142                                                transport_stream_stats);
143       }
144     }
RecordEnd(const gpr_timespec & latency)145     void RecordEnd(const gpr_timespec& latency) override {
146       for (auto* tracer : tracers_) {
147         tracer->RecordEnd(latency);
148       }
149     }
RecordIncomingBytes(const TransportByteSize & transport_byte_size)150     void RecordIncomingBytes(
151         const TransportByteSize& transport_byte_size) override {
152       for (auto* tracer : tracers_) {
153         tracer->RecordIncomingBytes(transport_byte_size);
154       }
155     }
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)156     void RecordOutgoingBytes(
157         const TransportByteSize& transport_byte_size) override {
158       for (auto* tracer : tracers_) {
159         tracer->RecordOutgoingBytes(transport_byte_size);
160       }
161     }
RecordAnnotation(absl::string_view annotation)162     void RecordAnnotation(absl::string_view annotation) override {
163       for (auto* tracer : tracers_) {
164         tracer->RecordAnnotation(annotation);
165       }
166     }
RecordAnnotation(const Annotation & annotation)167     void RecordAnnotation(const Annotation& annotation) override {
168       for (auto* tracer : tracers_) {
169         tracer->RecordAnnotation(annotation);
170       }
171     }
StartNewTcpTrace()172     std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
173       return nullptr;
174     }
SetOptionalLabel(OptionalLabelKey key,RefCountedStringValue value)175     void SetOptionalLabel(OptionalLabelKey key,
176                           RefCountedStringValue value) override {
177       for (auto* tracer : tracers_) {
178         tracer->SetOptionalLabel(key, value);
179       }
180     }
TraceId()181     std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()182     std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()183     bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()184     bool IsDelegatingTracer() override { return true; }
185 
186    private:
187     // There is no additional synchronization needed since filters/interceptors
188     // will be adding call tracers to the context and these are already
189     // synchronized through promises/call combiners (single promise running per
190     // call at any moment).
191     std::vector<CallAttemptTracer*> tracers_;
192   };
DelegatingClientCallTracer(ClientCallTracer * tracer)193   explicit DelegatingClientCallTracer(ClientCallTracer* tracer)
194       : tracers_{tracer} {}
~DelegatingClientCallTracer()195   ~DelegatingClientCallTracer() override {}
StartNewAttempt(bool is_transparent_retry)196   CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) override {
197     std::vector<CallAttemptTracer*> attempt_tracers;
198     attempt_tracers.reserve(tracers_.size());
199     for (auto* tracer : tracers_) {
200       auto* attempt_tracer = tracer->StartNewAttempt(is_transparent_retry);
201       DCHECK_NE(attempt_tracer, nullptr);
202       attempt_tracers.push_back(attempt_tracer);
203     }
204     return GetContext<Arena>()->ManagedNew<DelegatingClientCallAttemptTracer>(
205         std::move(attempt_tracers));
206   }
207 
RecordAnnotation(absl::string_view annotation)208   void RecordAnnotation(absl::string_view annotation) override {
209     for (auto* tracer : tracers_) {
210       tracer->RecordAnnotation(annotation);
211     }
212   }
RecordAnnotation(const Annotation & annotation)213   void RecordAnnotation(const Annotation& annotation) override {
214     for (auto* tracer : tracers_) {
215       tracer->RecordAnnotation(annotation);
216     }
217   }
TraceId()218   std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()219   std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()220   bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()221   bool IsDelegatingTracer() override { return true; }
222 
223   // There is no additional synchronization needed since filters/interceptors
224   // will be adding call tracers to the context and these are already
225   // synchronized through promises/call combiners (single promise running per
226   // call at any moment).
AddTracer(ClientCallTracer * tracer)227   void AddTracer(ClientCallTracer* tracer) { tracers_.push_back(tracer); }
228 
229  private:
230   std::vector<ClientCallTracer*> tracers_;
231 };
232 
233 class DelegatingServerCallTracer : public ServerCallTracer {
234  public:
DelegatingServerCallTracer(ServerCallTracer * tracer)235   explicit DelegatingServerCallTracer(ServerCallTracer* tracer)
236       : tracers_{tracer} {}
~DelegatingServerCallTracer()237   ~DelegatingServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)238   void RecordSendInitialMetadata(
239       grpc_metadata_batch* send_initial_metadata) override {
240     for (auto* tracer : tracers_) {
241       tracer->RecordSendInitialMetadata(send_initial_metadata);
242     }
243   }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)244   void RecordSendTrailingMetadata(
245       grpc_metadata_batch* send_trailing_metadata) override {
246     for (auto* tracer : tracers_) {
247       tracer->RecordSendTrailingMetadata(send_trailing_metadata);
248     }
249   }
RecordSendMessage(const SliceBuffer & send_message)250   void RecordSendMessage(const SliceBuffer& send_message) override {
251     for (auto* tracer : tracers_) {
252       tracer->RecordSendMessage(send_message);
253     }
254   }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)255   void RecordSendCompressedMessage(
256       const SliceBuffer& send_compressed_message) override {
257     for (auto* tracer : tracers_) {
258       tracer->RecordSendCompressedMessage(send_compressed_message);
259     }
260   }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)261   void RecordReceivedInitialMetadata(
262       grpc_metadata_batch* recv_initial_metadata) override {
263     for (auto* tracer : tracers_) {
264       tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
265     }
266   }
RecordReceivedMessage(const SliceBuffer & recv_message)267   void RecordReceivedMessage(const SliceBuffer& recv_message) override {
268     for (auto* tracer : tracers_) {
269       tracer->RecordReceivedMessage(recv_message);
270     }
271   }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)272   void RecordReceivedDecompressedMessage(
273       const SliceBuffer& recv_decompressed_message) override {
274     for (auto* tracer : tracers_) {
275       tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
276     }
277   }
RecordCancel(grpc_error_handle cancel_error)278   void RecordCancel(grpc_error_handle cancel_error) override {
279     for (auto* tracer : tracers_) {
280       tracer->RecordCancel(cancel_error);
281     }
282   }
RecordReceivedTrailingMetadata(grpc_metadata_batch * recv_trailing_metadata)283   void RecordReceivedTrailingMetadata(
284       grpc_metadata_batch* recv_trailing_metadata) override {
285     for (auto* tracer : tracers_) {
286       tracer->RecordReceivedTrailingMetadata(recv_trailing_metadata);
287     }
288   }
RecordEnd(const grpc_call_final_info * final_info)289   void RecordEnd(const grpc_call_final_info* final_info) override {
290     for (auto* tracer : tracers_) {
291       tracer->RecordEnd(final_info);
292     }
293   }
RecordIncomingBytes(const TransportByteSize & transport_byte_size)294   void RecordIncomingBytes(
295       const TransportByteSize& transport_byte_size) override {
296     for (auto* tracer : tracers_) {
297       tracer->RecordIncomingBytes(transport_byte_size);
298     }
299   }
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)300   void RecordOutgoingBytes(
301       const TransportByteSize& transport_byte_size) override {
302     for (auto* tracer : tracers_) {
303       tracer->RecordOutgoingBytes(transport_byte_size);
304     }
305   }
RecordAnnotation(absl::string_view annotation)306   void RecordAnnotation(absl::string_view annotation) override {
307     for (auto* tracer : tracers_) {
308       tracer->RecordAnnotation(annotation);
309     }
310   }
RecordAnnotation(const Annotation & annotation)311   void RecordAnnotation(const Annotation& annotation) override {
312     for (auto* tracer : tracers_) {
313       tracer->RecordAnnotation(annotation);
314     }
315   }
StartNewTcpTrace()316   std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
317     return nullptr;
318   }
TraceId()319   std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()320   std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()321   bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()322   bool IsDelegatingTracer() override { return true; }
323 
AddTracer(ServerCallTracer * tracer)324   void AddTracer(ServerCallTracer* tracer) { tracers_.push_back(tracer); }
325 
326  private:
327   // The ServerCallTracerFilter will be responsible for making sure that the
328   // tracers are added in a thread-safe manner. It is imagined that the filter
329   // will just invoke the factories in the server call tracer factory list
330   // sequentially, removing the need for any synchronization.
331   std::vector<ServerCallTracer*> tracers_;
332 };
333 
AddClientCallTracerToContext(Arena * arena,ClientCallTracer * tracer)334 void AddClientCallTracerToContext(Arena* arena, ClientCallTracer* tracer) {
335   if (arena->GetContext<CallTracerAnnotationInterface>() == nullptr) {
336     // This is the first call tracer. Set it directly.
337     arena->SetContext<CallTracerAnnotationInterface>(tracer);
338   } else {
339     // There was already a call tracer present.
340     auto* orig_tracer = DownCast<ClientCallTracer*>(
341         arena->GetContext<CallTracerAnnotationInterface>());
342     if (orig_tracer->IsDelegatingTracer()) {
343       // We already created a delegating tracer. Just add the new tracer to the
344       // list.
345       DownCast<DelegatingClientCallTracer*>(orig_tracer)->AddTracer(tracer);
346     } else {
347       // Create a new delegating tracer and add the first tracer and the new
348       // tracer to the list.
349       auto* delegating_tracer =
350           GetContext<Arena>()->ManagedNew<DelegatingClientCallTracer>(
351               orig_tracer);
352       arena->SetContext<CallTracerAnnotationInterface>(delegating_tracer);
353       delegating_tracer->AddTracer(tracer);
354     }
355   }
356 }
357 
AddServerCallTracerToContext(Arena * arena,ServerCallTracer * tracer)358 void AddServerCallTracerToContext(Arena* arena, ServerCallTracer* tracer) {
359   DCHECK_EQ(arena->GetContext<CallTracerInterface>(),
360             arena->GetContext<CallTracerAnnotationInterface>());
361   if (arena->GetContext<CallTracerAnnotationInterface>() == nullptr) {
362     // This is the first call tracer. Set it directly.
363     arena->SetContext<CallTracerAnnotationInterface>(tracer);
364     arena->SetContext<CallTracerInterface>(tracer);
365   } else {
366     // There was already a call tracer present.
367     auto* orig_tracer = DownCast<ServerCallTracer*>(
368         arena->GetContext<CallTracerAnnotationInterface>());
369     if (orig_tracer->IsDelegatingTracer()) {
370       // We already created a delegating tracer. Just add the new tracer to the
371       // list.
372       DownCast<DelegatingServerCallTracer*>(orig_tracer)->AddTracer(tracer);
373     } else {
374       // Create a new delegating tracer and add the first tracer and the new
375       // tracer to the list.
376       auto* delegating_tracer =
377           GetContext<Arena>()->ManagedNew<DelegatingServerCallTracer>(
378               orig_tracer);
379       arena->SetContext<CallTracerAnnotationInterface>(delegating_tracer);
380       arena->SetContext<CallTracerInterface>(delegating_tracer);
381       delegating_tracer->AddTracer(tracer);
382     }
383   }
384 }
385 
386 }  // namespace grpc_core
387