1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/ext/filters/census/client_filter.h"
20
21 #include <grpc/slice.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/opencensus.h>
26 #include <grpcpp/support/status.h>
27 #include <stddef.h>
28 #include <stdint.h>
29
30 #include <algorithm>
31 #include <functional>
32 #include <memory>
33 #include <string>
34 #include <utility>
35 #include <vector>
36
37 #include "absl/log/check.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/time/clock.h"
43 #include "absl/time/time.h"
44 #include "absl/types/optional.h"
45 #include "opencensus/stats/stats.h"
46 #include "opencensus/tags/tag_key.h"
47 #include "opencensus/tags/tag_map.h"
48 #include "opencensus/trace/span.h"
49 #include "opencensus/trace/span_context.h"
50 #include "opencensus/trace/status_code.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_stack.h"
53 #include "src/core/lib/experiments/experiments.h"
54 #include "src/core/lib/promise/context.h"
55 #include "src/core/lib/resource_quota/arena.h"
56 #include "src/core/lib/slice/slice.h"
57 #include "src/core/lib/slice/slice_buffer.h"
58 #include "src/core/lib/surface/call.h"
59 #include "src/core/lib/transport/metadata_batch.h"
60 #include "src/core/lib/transport/transport.h"
61 #include "src/core/telemetry/tcp_tracer.h"
62 #include "src/core/util/sync.h"
63 #include "src/cpp/ext/filters/census/context.h"
64 #include "src/cpp/ext/filters/census/grpc_plugin.h"
65 #include "src/cpp/ext/filters/census/measures.h"
66 #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
67
68 namespace grpc {
69 namespace internal {
70
// Namespace-scope definitions of the static constexpr size limits declared on
// OpenCensusCallAttemptTracer (their values live in the class declaration,
// not shown here). Before C++17 such members need an out-of-line definition if
// they are ODR-used; from C++17 on they are implicitly inline and these
// definitions are redundant but harmless.
constexpr uint32_t
    OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
constexpr uint32_t
    OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
75
76 //
77 // OpenCensusClientFilter
78 //
79
// Channel filter vtable for the OpenCensus client filter, generated from the
// promise-based OpenCensusClientFilter class for the client endpoint.
// NOTE(review): the trailing template argument 0 is forwarded to
// MakePromiseBasedFilter (presumably filter flags) — confirm in
// promise_based_filter.h.
const grpc_channel_filter OpenCensusClientFilter::kFilter =
    grpc_core::MakePromiseBasedFilter<OpenCensusClientFilter,
                                      grpc_core::FilterEndpoint::kClient, 0>();
83
84 absl::StatusOr<std::unique_ptr<OpenCensusClientFilter>>
Create(const grpc_core::ChannelArgs & args,ChannelFilter::Args)85 OpenCensusClientFilter::Create(const grpc_core::ChannelArgs& args,
86 ChannelFilter::Args /*filter_args*/) {
87 bool observability_enabled =
88 args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true);
89 return std::make_unique<OpenCensusClientFilter>(
90 /*tracing_enabled=*/observability_enabled);
91 }
92
93 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
MakeCallPromise(grpc_core::CallArgs call_args,grpc_core::NextPromiseFactory next_promise_factory)94 OpenCensusClientFilter::MakeCallPromise(
95 grpc_core::CallArgs call_args,
96 grpc_core::NextPromiseFactory next_promise_factory) {
97 auto* path = call_args.client_initial_metadata->get_pointer(
98 grpc_core::HttpPathMetadata());
99 auto* arena = grpc_core::GetContext<grpc_core::Arena>();
100 auto* tracer = arena->ManagedNew<OpenCensusCallTracer>(
101 path != nullptr ? path->Ref() : grpc_core::Slice(),
102 grpc_core::GetContext<grpc_core::Arena>(),
103 OpenCensusTracingEnabled() && tracing_enabled_);
104 DCHECK_EQ(arena->GetContext<grpc_core::CallTracerAnnotationInterface>(),
105 nullptr);
106 grpc_core::SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
107 return next_promise_factory(std::move(call_args));
108 }
109
110 //
111 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
112 //
113
OpenCensusCallAttemptTracer(OpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry,bool arena_allocated)114 OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
115 OpenCensusCallTracer* parent, uint64_t attempt_num,
116 bool is_transparent_retry, bool arena_allocated)
117 : parent_(parent),
118 arena_allocated_(arena_allocated),
119 context_(parent_->CreateCensusContextForCallAttempt()),
120 start_time_(absl::Now()) {
121 if (parent_->tracing_enabled_) {
122 context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
123 context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
124 }
125 if (OpenCensusStatsEnabled()) {
126 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
127 context_.tags().tags();
128 tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
129 ::opencensus::stats::Record({{RpcClientStartedRpcs(), 1}}, tags);
130 }
131 }
132
133 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)134 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
135 if (parent_->tracing_enabled_) {
136 char tracing_buf[kMaxTraceContextLen];
137 size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
138 kMaxTraceContextLen);
139 if (tracing_len > 0) {
140 send_initial_metadata->Set(
141 grpc_core::GrpcTraceBinMetadata(),
142 grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
143 }
144 }
145 if (OpenCensusStatsEnabled()) {
146 grpc_slice tags = grpc_empty_slice();
147 // TODO(unknown): Add in tagging serialization.
148 size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
149 if (encoded_tags_len > 0) {
150 send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
151 grpc_core::Slice(tags));
152 }
153 }
154 }
155
RecordSendMessage(const grpc_core::SliceBuffer & send_message)156 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
157 const grpc_core::SliceBuffer& send_message) {
158 RecordAnnotation(
159 absl::StrFormat("Send message: %ld bytes", send_message.Length()));
160 ++sent_message_count_;
161 }
162
163 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)164 RecordSendCompressedMessage(
165 const grpc_core::SliceBuffer& send_compressed_message) {
166 RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
167 send_compressed_message.Length()));
168 }
169
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)170 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
171 const grpc_core::SliceBuffer& recv_message) {
172 RecordAnnotation(
173 absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
174 ++recv_message_count_;
175 }
176
177 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)178 RecordReceivedDecompressedMessage(
179 const grpc_core::SliceBuffer& recv_decompressed_message) {
180 RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
181 recv_decompressed_message.Length()));
182 }
183
184 namespace {
185
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)186 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
187 if (OpenCensusStatsEnabled()) {
188 absl::optional<grpc_core::Slice> grpc_server_stats_bin =
189 b->Take(grpc_core::GrpcServerStatsBinMetadata());
190 if (grpc_server_stats_bin.has_value()) {
191 ServerStatsDeserialize(
192 reinterpret_cast<const char*>(grpc_server_stats_bin->data()),
193 grpc_server_stats_bin->size(), elapsed_time);
194 }
195 }
196 }
197
198 } // namespace
199
200 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)201 RecordReceivedTrailingMetadata(
202 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
203 const grpc_transport_stream_stats* transport_stream_stats) {
204 status_code_ = status.code();
205 if (OpenCensusStatsEnabled()) {
206 uint64_t elapsed_time = 0;
207 if (recv_trailing_metadata != nullptr) {
208 FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time);
209 }
210 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
211 context_.tags().tags();
212 tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
213 tags.emplace_back(ClientStatusTagKey(),
214 absl::StatusCodeToString(status_code_));
215 uint64_t outgoing_bytes = 0;
216 uint64_t incoming_bytes = 0;
217 if (grpc_core::IsCallTracerInTransportEnabled()) {
218 outgoing_bytes = outgoing_bytes_.load();
219 incoming_bytes = incoming_bytes_.load();
220 } else if (transport_stream_stats != nullptr) {
221 outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
222 incoming_bytes = transport_stream_stats->incoming.data_bytes;
223 }
224 ::opencensus::stats::Record(
225 {{RpcClientSentBytesPerRpc(), static_cast<double>(outgoing_bytes)},
226 {RpcClientReceivedBytesPerRpc(), static_cast<double>(incoming_bytes)},
227 {RpcClientServerLatency(),
228 ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))},
229 {RpcClientRoundtripLatency(),
230 absl::ToDoubleMilliseconds(absl::Now() - start_time_)}},
231 tags);
232 }
233 }
234
RecordIncomingBytes(const TransportByteSize & transport_byte_size)235 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordIncomingBytes(
236 const TransportByteSize& transport_byte_size) {
237 incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
238 }
239
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)240 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordOutgoingBytes(
241 const TransportByteSize& transport_byte_size) {
242 outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
243 }
244
RecordCancel(absl::Status)245 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
246 absl::Status /*cancel_error*/) {}
247
// Finishes this attempt and destroys the tracer; `this` must not be used after
// the call returns.
//  - Stats enabled: records sent/received message counts tagged with method
//    and final status, then, under the parent's mu_, decrements the parent's
//    active-attempt count and stamps time_at_last_attempt_end_ when it hits 0.
//  - Tracing enabled: sets the span status for non-OK results and ends the
//    attempt span.
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
    const gpr_timespec& /*latency*/) {
  if (OpenCensusStatsEnabled()) {
    std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
        context_.tags().tags();
    tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
    tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
    ::opencensus::stats::Record(
        {{RpcClientSentMessagesPerRpc(), sent_message_count_},
         {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
        tags);
    // NOTE(review): StartNewAttempt increments num_active_rpcs_
    // unconditionally, but it is only decremented here when stats are
    // enabled; this is consistent only if stats enablement is fixed for the
    // lifetime of the call.
    grpc_core::MutexLock lock(&parent_->mu_);
    if (--parent_->num_active_rpcs_ == 0) {
      // No attempt is in flight any more; StartNewAttempt measures the retry
      // delay from this timestamp.
      parent_->time_at_last_attempt_end_ = absl::Now();
    }
  }
  if (parent_->tracing_enabled_) {
    if (status_code_ != absl::StatusCode::kOk) {
      // NOTE(review): the cast assumes absl::StatusCode and
      // opencensus::trace::StatusCode share numeric values — confirm.
      context_.Span().SetStatus(
          static_cast<opencensus::trace::StatusCode>(status_code_),
          StatusCodeToString(status_code_));
    }
    context_.EndSpan();
  }
  // Self-destruct. The first attempt is placement-allocated in the call arena
  // by StartNewAttempt, so only its destructor runs; later attempts are heap
  // allocated and must be deleted.
  if (arena_allocated_) {
    this->~OpenCensusCallAttemptTracer();
  } else {
    delete this;
  }
}
278
RecordAnnotation(absl::string_view annotation)279 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation(
280 absl::string_view annotation) {
281 if (!context_.Span().IsRecording()) {
282 return;
283 }
284 context_.AddSpanAnnotation(annotation, {});
285 }
286
RecordAnnotation(const Annotation & annotation)287 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation(
288 const Annotation& annotation) {
289 if (!context_.Span().IsRecording()) {
290 return;
291 }
292
293 switch (annotation.type()) {
294 // Annotations are expensive to create. We should only create it if the
295 // call is being sampled by default.
296 default:
297 if (IsSampled()) {
298 context_.AddSpanAnnotation(annotation.ToString(), {});
299 }
300 break;
301 }
302 }
303
304 std::shared_ptr<grpc_core::TcpTracerInterface>
StartNewTcpTrace()305 OpenCensusCallTracer::OpenCensusCallAttemptTracer::StartNewTcpTrace() {
306 return nullptr;
307 }
308
309 //
310 // OpenCensusCallTracer
311 //
312
OpenCensusCallTracer(grpc_core::Slice path,grpc_core::Arena * arena,bool tracing_enabled)313 OpenCensusCallTracer::OpenCensusCallTracer(grpc_core::Slice path,
314 grpc_core::Arena* arena,
315 bool tracing_enabled)
316 : path_(std::move(path)),
317 method_(GetMethod(path_)),
318 arena_(arena),
319 tracing_enabled_(tracing_enabled) {
320 auto* parent_context =
321 reinterpret_cast<CensusContext*>(arena->GetContext<census_context>());
322 GenerateClientContext(tracing_enabled_ ? absl::StrCat("Sent.", method_) : "",
323 &context_,
324 (parent_context == nullptr) ? nullptr : parent_context);
325 }
326
~OpenCensusCallTracer()327 OpenCensusCallTracer::~OpenCensusCallTracer() {
328 if (OpenCensusStatsEnabled()) {
329 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
330 context_.tags().tags();
331 tags.emplace_back(ClientMethodTagKey(), std::string(method_));
332 ::opencensus::stats::Record(
333 {{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
334 {RpcClientTransparentRetriesPerCall(), transparent_retries_},
335 {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
336 tags);
337 }
338 if (tracing_enabled_) {
339 context_.EndSpan();
340 }
341 }
342
// Starts a new attempt of this call and returns its tracer. The tracer
// destroys itself in RecordEnd. Under mu_ this:
//  - for any attempt after the first, adds to retry_delay_ the time since the
//    last attempt ended, if stats are enabled and no attempt is in flight;
//  - increments transparent_retries_ or retries_ depending on
//    is_transparent_retry;
//  - increments the active-attempt count.
OpenCensusCallTracer::OpenCensusCallAttemptTracer*
OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
  // We allocate the first attempt on the arena and all subsequent attempts
  // on the heap, so that in the common case we don't require a heap
  // allocation, nor do we unnecessarily grow the arena.
  bool is_first_attempt = true;
  uint64_t attempt_num;
  {
    grpc_core::MutexLock lock(&mu_);
    if (transparent_retries_ != 0 || retries_ != 0) {
      is_first_attempt = false;
      if (OpenCensusStatsEnabled() && num_active_rpcs_ == 0) {
        retry_delay_ += absl::Now() - time_at_last_attempt_end_;
      }
    }
    // Number of non-transparent attempts started before this one; the attempt
    // tracer reports it as the "previous-rpc-attempts" span attribute.
    attempt_num = retries_;
    if (is_transparent_retry) {
      ++transparent_retries_;
    } else {
      ++retries_;
    }
    ++num_active_rpcs_;
  }
  if (is_first_attempt) {
    return arena_->New<OpenCensusCallAttemptTracer>(
        this, attempt_num, is_transparent_retry, true /* arena_allocated */);
  }
  return new OpenCensusCallAttemptTracer(
      this, attempt_num, is_transparent_retry, false /* arena_allocated */);
}
373
RecordAnnotation(absl::string_view annotation)374 void OpenCensusCallTracer::RecordAnnotation(absl::string_view annotation) {
375 if (!context_.Span().IsRecording()) {
376 return;
377 }
378 context_.AddSpanAnnotation(annotation, {});
379 }
380
RecordAnnotation(const Annotation & annotation)381 void OpenCensusCallTracer::RecordAnnotation(const Annotation& annotation) {
382 if (!context_.Span().IsRecording()) {
383 return;
384 }
385
386 switch (annotation.type()) {
387 // Annotations are expensive to create. We should only create it if the
388 // call is being sampled by default.
389 default:
390 if (IsSampled()) {
391 context_.AddSpanAnnotation(annotation.ToString(), {});
392 }
393 break;
394 }
395 }
396
RecordApiLatency(absl::Duration api_latency,absl::StatusCode status_code)397 void OpenCensusCallTracer::RecordApiLatency(absl::Duration api_latency,
398 absl::StatusCode status_code) {
399 if (OpenCensusStatsEnabled()) {
400 std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
401 context_.tags().tags();
402 tags.emplace_back(ClientMethodTagKey(), std::string(method_));
403 tags.emplace_back(ClientStatusTagKey(),
404 absl::StatusCodeToString(status_code));
405 ::opencensus::stats::Record(
406 {{RpcClientApiLatency(), absl::ToDoubleMilliseconds(api_latency)}},
407 tags);
408 }
409 }
410
CreateCensusContextForCallAttempt()411 CensusContext OpenCensusCallTracer::CreateCensusContextForCallAttempt() {
412 if (!tracing_enabled_) return CensusContext(context_.tags());
413 DCHECK(context_.Context().IsValid());
414 auto context = CensusContext(absl::StrCat("Attempt.", method_),
415 &(context_.Span()), context_.tags());
416 grpc::internal::OpenCensusRegistry::Get()
417 .PopulateCensusContextWithConstantAttributes(&context);
418 return context;
419 }
420
421 class OpenCensusClientInterceptor : public grpc::experimental::Interceptor {
422 public:
OpenCensusClientInterceptor(grpc::experimental::ClientRpcInfo * info)423 explicit OpenCensusClientInterceptor(grpc::experimental::ClientRpcInfo* info)
424 : info_(info), start_time_(absl::Now()) {}
425
Intercept(grpc::experimental::InterceptorBatchMethods * methods)426 void Intercept(
427 grpc::experimental::InterceptorBatchMethods* methods) override {
428 if (methods->QueryInterceptionHookPoint(
429 grpc::experimental::InterceptionHookPoints::POST_RECV_STATUS)) {
430 auto* tracer = grpc_core::DownCast<OpenCensusCallTracer*>(
431 grpc_call_get_arena(info_->client_context()->c_call())
432 ->GetContext<grpc_core::CallTracerAnnotationInterface>());
433 if (tracer != nullptr) {
434 tracer->RecordApiLatency(absl::Now() - start_time_,
435 static_cast<absl::StatusCode>(
436 methods->GetRecvStatus()->error_code()));
437 }
438 }
439 methods->Proceed();
440 }
441
442 private:
443 grpc::experimental::ClientRpcInfo* info_;
444 // Start time for measuring end-to-end API latency
445 absl::Time start_time_;
446 };
447
448 //
449 // OpenCensusClientInterceptorFactory
450 //
451
452 grpc::experimental::Interceptor*
CreateClientInterceptor(grpc::experimental::ClientRpcInfo * info)453 OpenCensusClientInterceptorFactory::CreateClientInterceptor(
454 grpc::experimental::ClientRpcInfo* info) {
455 return new OpenCensusClientInterceptor(info);
456 }
457
458 } // namespace internal
459 } // namespace grpc
460