1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/telemetry/call_tracer.h"
20
21 #include <grpc/support/port_platform.h>
22
23 #include <memory>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/log/check.h"
28 #include "src/core/lib/promise/context.h"
29 #include "src/core/telemetry/tcp_tracer.h"
30
31 namespace grpc_core {
32
33 CallTracerInterface::TransportByteSize&
operator +=(const CallTracerInterface::TransportByteSize & other)34 CallTracerInterface::TransportByteSize::operator+=(
35 const CallTracerInterface::TransportByteSize& other) {
36 framing_bytes += other.framing_bytes;
37 data_bytes += other.data_bytes;
38 header_bytes += other.header_bytes;
39 return *this;
40 }
41
42 //
43 // ServerCallTracerFactory
44 //
45
46 namespace {
47
48 ServerCallTracerFactory* g_server_call_tracer_factory_ = nullptr;
49
50 const char* kServerCallTracerFactoryChannelArgName =
51 "grpc.experimental.server_call_tracer_factory";
52
53 } // namespace
54
Get(const ChannelArgs & channel_args)55 ServerCallTracerFactory* ServerCallTracerFactory::Get(
56 const ChannelArgs& channel_args) {
57 ServerCallTracerFactory* factory =
58 channel_args.GetObject<ServerCallTracerFactory>();
59 if (factory == nullptr) {
60 factory = g_server_call_tracer_factory_;
61 }
62 if (factory && factory->IsServerTraced(channel_args)) {
63 return factory;
64 }
65 return nullptr;
66 }
67
RegisterGlobal(ServerCallTracerFactory * factory)68 void ServerCallTracerFactory::RegisterGlobal(ServerCallTracerFactory* factory) {
69 g_server_call_tracer_factory_ = factory;
70 }
71
TestOnlyReset()72 void ServerCallTracerFactory::TestOnlyReset() {
73 delete g_server_call_tracer_factory_;
74 g_server_call_tracer_factory_ = nullptr;
75 }
76
ChannelArgName()77 absl::string_view ServerCallTracerFactory::ChannelArgName() {
78 return kServerCallTracerFactoryChannelArgName;
79 }
80
81 class DelegatingClientCallTracer : public ClientCallTracer {
82 public:
83 class DelegatingClientCallAttemptTracer
84 : public ClientCallTracer::CallAttemptTracer {
85 public:
DelegatingClientCallAttemptTracer(std::vector<CallAttemptTracer * > tracers)86 explicit DelegatingClientCallAttemptTracer(
87 std::vector<CallAttemptTracer*> tracers)
88 : tracers_(std::move(tracers)) {
89 DCHECK(!tracers_.empty());
90 }
~DelegatingClientCallAttemptTracer()91 ~DelegatingClientCallAttemptTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)92 void RecordSendInitialMetadata(
93 grpc_metadata_batch* send_initial_metadata) override {
94 for (auto* tracer : tracers_) {
95 tracer->RecordSendInitialMetadata(send_initial_metadata);
96 }
97 }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)98 void RecordSendTrailingMetadata(
99 grpc_metadata_batch* send_trailing_metadata) override {
100 for (auto* tracer : tracers_) {
101 tracer->RecordSendTrailingMetadata(send_trailing_metadata);
102 }
103 }
RecordSendMessage(const SliceBuffer & send_message)104 void RecordSendMessage(const SliceBuffer& send_message) override {
105 for (auto* tracer : tracers_) {
106 tracer->RecordSendMessage(send_message);
107 }
108 }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)109 void RecordSendCompressedMessage(
110 const SliceBuffer& send_compressed_message) override {
111 for (auto* tracer : tracers_) {
112 tracer->RecordSendCompressedMessage(send_compressed_message);
113 }
114 }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)115 void RecordReceivedInitialMetadata(
116 grpc_metadata_batch* recv_initial_metadata) override {
117 for (auto* tracer : tracers_) {
118 tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
119 }
120 }
RecordReceivedMessage(const SliceBuffer & recv_message)121 void RecordReceivedMessage(const SliceBuffer& recv_message) override {
122 for (auto* tracer : tracers_) {
123 tracer->RecordReceivedMessage(recv_message);
124 }
125 }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)126 void RecordReceivedDecompressedMessage(
127 const SliceBuffer& recv_decompressed_message) override {
128 for (auto* tracer : tracers_) {
129 tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
130 }
131 }
RecordCancel(grpc_error_handle cancel_error)132 void RecordCancel(grpc_error_handle cancel_error) override {
133 for (auto* tracer : tracers_) {
134 tracer->RecordCancel(cancel_error);
135 }
136 }
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)137 void RecordReceivedTrailingMetadata(
138 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
139 const grpc_transport_stream_stats* transport_stream_stats) override {
140 for (auto* tracer : tracers_) {
141 tracer->RecordReceivedTrailingMetadata(status, recv_trailing_metadata,
142 transport_stream_stats);
143 }
144 }
RecordEnd(const gpr_timespec & latency)145 void RecordEnd(const gpr_timespec& latency) override {
146 for (auto* tracer : tracers_) {
147 tracer->RecordEnd(latency);
148 }
149 }
RecordIncomingBytes(const TransportByteSize & transport_byte_size)150 void RecordIncomingBytes(
151 const TransportByteSize& transport_byte_size) override {
152 for (auto* tracer : tracers_) {
153 tracer->RecordIncomingBytes(transport_byte_size);
154 }
155 }
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)156 void RecordOutgoingBytes(
157 const TransportByteSize& transport_byte_size) override {
158 for (auto* tracer : tracers_) {
159 tracer->RecordOutgoingBytes(transport_byte_size);
160 }
161 }
RecordAnnotation(absl::string_view annotation)162 void RecordAnnotation(absl::string_view annotation) override {
163 for (auto* tracer : tracers_) {
164 tracer->RecordAnnotation(annotation);
165 }
166 }
RecordAnnotation(const Annotation & annotation)167 void RecordAnnotation(const Annotation& annotation) override {
168 for (auto* tracer : tracers_) {
169 tracer->RecordAnnotation(annotation);
170 }
171 }
StartNewTcpTrace()172 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
173 return nullptr;
174 }
SetOptionalLabel(OptionalLabelKey key,RefCountedStringValue value)175 void SetOptionalLabel(OptionalLabelKey key,
176 RefCountedStringValue value) override {
177 for (auto* tracer : tracers_) {
178 tracer->SetOptionalLabel(key, value);
179 }
180 }
TraceId()181 std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()182 std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()183 bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()184 bool IsDelegatingTracer() override { return true; }
185
186 private:
187 // There is no additional synchronization needed since filters/interceptors
188 // will be adding call tracers to the context and these are already
189 // synchronized through promises/call combiners (single promise running per
190 // call at any moment).
191 std::vector<CallAttemptTracer*> tracers_;
192 };
DelegatingClientCallTracer(ClientCallTracer * tracer)193 explicit DelegatingClientCallTracer(ClientCallTracer* tracer)
194 : tracers_{tracer} {}
~DelegatingClientCallTracer()195 ~DelegatingClientCallTracer() override {}
StartNewAttempt(bool is_transparent_retry)196 CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) override {
197 std::vector<CallAttemptTracer*> attempt_tracers;
198 attempt_tracers.reserve(tracers_.size());
199 for (auto* tracer : tracers_) {
200 auto* attempt_tracer = tracer->StartNewAttempt(is_transparent_retry);
201 DCHECK_NE(attempt_tracer, nullptr);
202 attempt_tracers.push_back(attempt_tracer);
203 }
204 return GetContext<Arena>()->ManagedNew<DelegatingClientCallAttemptTracer>(
205 std::move(attempt_tracers));
206 }
207
RecordAnnotation(absl::string_view annotation)208 void RecordAnnotation(absl::string_view annotation) override {
209 for (auto* tracer : tracers_) {
210 tracer->RecordAnnotation(annotation);
211 }
212 }
RecordAnnotation(const Annotation & annotation)213 void RecordAnnotation(const Annotation& annotation) override {
214 for (auto* tracer : tracers_) {
215 tracer->RecordAnnotation(annotation);
216 }
217 }
TraceId()218 std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()219 std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()220 bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()221 bool IsDelegatingTracer() override { return true; }
222
223 // There is no additional synchronization needed since filters/interceptors
224 // will be adding call tracers to the context and these are already
225 // synchronized through promises/call combiners (single promise running per
226 // call at any moment).
AddTracer(ClientCallTracer * tracer)227 void AddTracer(ClientCallTracer* tracer) { tracers_.push_back(tracer); }
228
229 private:
230 std::vector<ClientCallTracer*> tracers_;
231 };
232
233 class DelegatingServerCallTracer : public ServerCallTracer {
234 public:
DelegatingServerCallTracer(ServerCallTracer * tracer)235 explicit DelegatingServerCallTracer(ServerCallTracer* tracer)
236 : tracers_{tracer} {}
~DelegatingServerCallTracer()237 ~DelegatingServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)238 void RecordSendInitialMetadata(
239 grpc_metadata_batch* send_initial_metadata) override {
240 for (auto* tracer : tracers_) {
241 tracer->RecordSendInitialMetadata(send_initial_metadata);
242 }
243 }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)244 void RecordSendTrailingMetadata(
245 grpc_metadata_batch* send_trailing_metadata) override {
246 for (auto* tracer : tracers_) {
247 tracer->RecordSendTrailingMetadata(send_trailing_metadata);
248 }
249 }
RecordSendMessage(const SliceBuffer & send_message)250 void RecordSendMessage(const SliceBuffer& send_message) override {
251 for (auto* tracer : tracers_) {
252 tracer->RecordSendMessage(send_message);
253 }
254 }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)255 void RecordSendCompressedMessage(
256 const SliceBuffer& send_compressed_message) override {
257 for (auto* tracer : tracers_) {
258 tracer->RecordSendCompressedMessage(send_compressed_message);
259 }
260 }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)261 void RecordReceivedInitialMetadata(
262 grpc_metadata_batch* recv_initial_metadata) override {
263 for (auto* tracer : tracers_) {
264 tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
265 }
266 }
RecordReceivedMessage(const SliceBuffer & recv_message)267 void RecordReceivedMessage(const SliceBuffer& recv_message) override {
268 for (auto* tracer : tracers_) {
269 tracer->RecordReceivedMessage(recv_message);
270 }
271 }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)272 void RecordReceivedDecompressedMessage(
273 const SliceBuffer& recv_decompressed_message) override {
274 for (auto* tracer : tracers_) {
275 tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
276 }
277 }
RecordCancel(grpc_error_handle cancel_error)278 void RecordCancel(grpc_error_handle cancel_error) override {
279 for (auto* tracer : tracers_) {
280 tracer->RecordCancel(cancel_error);
281 }
282 }
RecordReceivedTrailingMetadata(grpc_metadata_batch * recv_trailing_metadata)283 void RecordReceivedTrailingMetadata(
284 grpc_metadata_batch* recv_trailing_metadata) override {
285 for (auto* tracer : tracers_) {
286 tracer->RecordReceivedTrailingMetadata(recv_trailing_metadata);
287 }
288 }
RecordEnd(const grpc_call_final_info * final_info)289 void RecordEnd(const grpc_call_final_info* final_info) override {
290 for (auto* tracer : tracers_) {
291 tracer->RecordEnd(final_info);
292 }
293 }
RecordIncomingBytes(const TransportByteSize & transport_byte_size)294 void RecordIncomingBytes(
295 const TransportByteSize& transport_byte_size) override {
296 for (auto* tracer : tracers_) {
297 tracer->RecordIncomingBytes(transport_byte_size);
298 }
299 }
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)300 void RecordOutgoingBytes(
301 const TransportByteSize& transport_byte_size) override {
302 for (auto* tracer : tracers_) {
303 tracer->RecordOutgoingBytes(transport_byte_size);
304 }
305 }
RecordAnnotation(absl::string_view annotation)306 void RecordAnnotation(absl::string_view annotation) override {
307 for (auto* tracer : tracers_) {
308 tracer->RecordAnnotation(annotation);
309 }
310 }
RecordAnnotation(const Annotation & annotation)311 void RecordAnnotation(const Annotation& annotation) override {
312 for (auto* tracer : tracers_) {
313 tracer->RecordAnnotation(annotation);
314 }
315 }
StartNewTcpTrace()316 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
317 return nullptr;
318 }
TraceId()319 std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()320 std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()321 bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()322 bool IsDelegatingTracer() override { return true; }
323
AddTracer(ServerCallTracer * tracer)324 void AddTracer(ServerCallTracer* tracer) { tracers_.push_back(tracer); }
325
326 private:
327 // The ServerCallTracerFilter will be responsible for making sure that the
328 // tracers are added in a thread-safe manner. It is imagined that the filter
329 // will just invoke the factories in the server call tracer factory list
330 // sequentially, removing the need for any synchronization.
331 std::vector<ServerCallTracer*> tracers_;
332 };
333
AddClientCallTracerToContext(Arena * arena,ClientCallTracer * tracer)334 void AddClientCallTracerToContext(Arena* arena, ClientCallTracer* tracer) {
335 if (arena->GetContext<CallTracerAnnotationInterface>() == nullptr) {
336 // This is the first call tracer. Set it directly.
337 arena->SetContext<CallTracerAnnotationInterface>(tracer);
338 } else {
339 // There was already a call tracer present.
340 auto* orig_tracer = DownCast<ClientCallTracer*>(
341 arena->GetContext<CallTracerAnnotationInterface>());
342 if (orig_tracer->IsDelegatingTracer()) {
343 // We already created a delegating tracer. Just add the new tracer to the
344 // list.
345 DownCast<DelegatingClientCallTracer*>(orig_tracer)->AddTracer(tracer);
346 } else {
347 // Create a new delegating tracer and add the first tracer and the new
348 // tracer to the list.
349 auto* delegating_tracer =
350 GetContext<Arena>()->ManagedNew<DelegatingClientCallTracer>(
351 orig_tracer);
352 arena->SetContext<CallTracerAnnotationInterface>(delegating_tracer);
353 delegating_tracer->AddTracer(tracer);
354 }
355 }
356 }
357
AddServerCallTracerToContext(Arena * arena,ServerCallTracer * tracer)358 void AddServerCallTracerToContext(Arena* arena, ServerCallTracer* tracer) {
359 DCHECK_EQ(arena->GetContext<CallTracerInterface>(),
360 arena->GetContext<CallTracerAnnotationInterface>());
361 if (arena->GetContext<CallTracerAnnotationInterface>() == nullptr) {
362 // This is the first call tracer. Set it directly.
363 arena->SetContext<CallTracerAnnotationInterface>(tracer);
364 arena->SetContext<CallTracerInterface>(tracer);
365 } else {
366 // There was already a call tracer present.
367 auto* orig_tracer = DownCast<ServerCallTracer*>(
368 arena->GetContext<CallTracerAnnotationInterface>());
369 if (orig_tracer->IsDelegatingTracer()) {
370 // We already created a delegating tracer. Just add the new tracer to the
371 // list.
372 DownCast<DelegatingServerCallTracer*>(orig_tracer)->AddTracer(tracer);
373 } else {
374 // Create a new delegating tracer and add the first tracer and the new
375 // tracer to the list.
376 auto* delegating_tracer =
377 GetContext<Arena>()->ManagedNew<DelegatingServerCallTracer>(
378 orig_tracer);
379 arena->SetContext<CallTracerAnnotationInterface>(delegating_tracer);
380 arena->SetContext<CallTracerInterface>(delegating_tracer);
381 delegating_tracer->AddTracer(tracer);
382 }
383 }
384 }
385
386 } // namespace grpc_core
387