• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/ext/filters/census/client_filter.h"
20 
21 #include <grpc/slice.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/opencensus.h>
26 #include <grpcpp/support/status.h>
27 #include <stddef.h>
28 #include <stdint.h>
29 
30 #include <algorithm>
31 #include <functional>
32 #include <memory>
33 #include <string>
34 #include <utility>
35 #include <vector>
36 
37 #include "absl/log/check.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/time/clock.h"
43 #include "absl/time/time.h"
44 #include "absl/types/optional.h"
45 #include "opencensus/stats/stats.h"
46 #include "opencensus/tags/tag_key.h"
47 #include "opencensus/tags/tag_map.h"
48 #include "opencensus/trace/span.h"
49 #include "opencensus/trace/span_context.h"
50 #include "opencensus/trace/status_code.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_stack.h"
53 #include "src/core/lib/experiments/experiments.h"
54 #include "src/core/lib/promise/context.h"
55 #include "src/core/lib/resource_quota/arena.h"
56 #include "src/core/lib/slice/slice.h"
57 #include "src/core/lib/slice/slice_buffer.h"
58 #include "src/core/lib/surface/call.h"
59 #include "src/core/lib/transport/metadata_batch.h"
60 #include "src/core/lib/transport/transport.h"
61 #include "src/core/telemetry/tcp_tracer.h"
62 #include "src/core/util/sync.h"
63 #include "src/cpp/ext/filters/census/context.h"
64 #include "src/cpp/ext/filters/census/grpc_plugin.h"
65 #include "src/cpp/ext/filters/census/measures.h"
66 #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
67 
68 namespace grpc {
69 namespace internal {
70 
71 constexpr uint32_t
72     OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
73 constexpr uint32_t
74     OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
75 
76 //
77 // OpenCensusClientFilter
78 //
79 
80 const grpc_channel_filter OpenCensusClientFilter::kFilter =
81     grpc_core::MakePromiseBasedFilter<OpenCensusClientFilter,
82                                       grpc_core::FilterEndpoint::kClient, 0>();
83 
84 absl::StatusOr<std::unique_ptr<OpenCensusClientFilter>>
Create(const grpc_core::ChannelArgs & args,ChannelFilter::Args)85 OpenCensusClientFilter::Create(const grpc_core::ChannelArgs& args,
86                                ChannelFilter::Args /*filter_args*/) {
87   bool observability_enabled =
88       args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true);
89   return std::make_unique<OpenCensusClientFilter>(
90       /*tracing_enabled=*/observability_enabled);
91 }
92 
93 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
MakeCallPromise(grpc_core::CallArgs call_args,grpc_core::NextPromiseFactory next_promise_factory)94 OpenCensusClientFilter::MakeCallPromise(
95     grpc_core::CallArgs call_args,
96     grpc_core::NextPromiseFactory next_promise_factory) {
97   auto* path = call_args.client_initial_metadata->get_pointer(
98       grpc_core::HttpPathMetadata());
99   auto* arena = grpc_core::GetContext<grpc_core::Arena>();
100   auto* tracer = arena->ManagedNew<OpenCensusCallTracer>(
101       path != nullptr ? path->Ref() : grpc_core::Slice(),
102       grpc_core::GetContext<grpc_core::Arena>(),
103       OpenCensusTracingEnabled() && tracing_enabled_);
104   DCHECK_EQ(arena->GetContext<grpc_core::CallTracerAnnotationInterface>(),
105             nullptr);
106   grpc_core::SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
107   return next_promise_factory(std::move(call_args));
108 }
109 
110 //
111 // OpenCensusCallTracer::OpenCensusCallAttemptTracer
112 //
113 
OpenCensusCallAttemptTracer(OpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry,bool arena_allocated)114 OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
115     OpenCensusCallTracer* parent, uint64_t attempt_num,
116     bool is_transparent_retry, bool arena_allocated)
117     : parent_(parent),
118       arena_allocated_(arena_allocated),
119       context_(parent_->CreateCensusContextForCallAttempt()),
120       start_time_(absl::Now()) {
121   if (parent_->tracing_enabled_) {
122     context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
123     context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
124   }
125   if (OpenCensusStatsEnabled()) {
126     std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
127         context_.tags().tags();
128     tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
129     ::opencensus::stats::Record({{RpcClientStartedRpcs(), 1}}, tags);
130   }
131 }
132 
133 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)134     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
135   if (parent_->tracing_enabled_) {
136     char tracing_buf[kMaxTraceContextLen];
137     size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
138                                                kMaxTraceContextLen);
139     if (tracing_len > 0) {
140       send_initial_metadata->Set(
141           grpc_core::GrpcTraceBinMetadata(),
142           grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
143     }
144   }
145   if (OpenCensusStatsEnabled()) {
146     grpc_slice tags = grpc_empty_slice();
147     // TODO(unknown): Add in tagging serialization.
148     size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
149     if (encoded_tags_len > 0) {
150       send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
151                                  grpc_core::Slice(tags));
152     }
153   }
154 }
155 
RecordSendMessage(const grpc_core::SliceBuffer & send_message)156 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
157     const grpc_core::SliceBuffer& send_message) {
158   RecordAnnotation(
159       absl::StrFormat("Send message: %ld bytes", send_message.Length()));
160   ++sent_message_count_;
161 }
162 
163 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)164     RecordSendCompressedMessage(
165         const grpc_core::SliceBuffer& send_compressed_message) {
166   RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
167                                    send_compressed_message.Length()));
168 }
169 
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)170 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
171     const grpc_core::SliceBuffer& recv_message) {
172   RecordAnnotation(
173       absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
174   ++recv_message_count_;
175 }
176 
177 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)178     RecordReceivedDecompressedMessage(
179         const grpc_core::SliceBuffer& recv_decompressed_message) {
180   RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
181                                    recv_decompressed_message.Length()));
182 }
183 
184 namespace {
185 
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)186 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
187   if (OpenCensusStatsEnabled()) {
188     absl::optional<grpc_core::Slice> grpc_server_stats_bin =
189         b->Take(grpc_core::GrpcServerStatsBinMetadata());
190     if (grpc_server_stats_bin.has_value()) {
191       ServerStatsDeserialize(
192           reinterpret_cast<const char*>(grpc_server_stats_bin->data()),
193           grpc_server_stats_bin->size(), elapsed_time);
194     }
195   }
196 }
197 
198 }  // namespace
199 
200 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)201     RecordReceivedTrailingMetadata(
202         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
203         const grpc_transport_stream_stats* transport_stream_stats) {
204   status_code_ = status.code();
205   if (OpenCensusStatsEnabled()) {
206     uint64_t elapsed_time = 0;
207     if (recv_trailing_metadata != nullptr) {
208       FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time);
209     }
210     std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
211         context_.tags().tags();
212     tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
213     tags.emplace_back(ClientStatusTagKey(),
214                       absl::StatusCodeToString(status_code_));
215     uint64_t outgoing_bytes = 0;
216     uint64_t incoming_bytes = 0;
217     if (grpc_core::IsCallTracerInTransportEnabled()) {
218       outgoing_bytes = outgoing_bytes_.load();
219       incoming_bytes = incoming_bytes_.load();
220     } else if (transport_stream_stats != nullptr) {
221       outgoing_bytes = transport_stream_stats->outgoing.data_bytes;
222       incoming_bytes = transport_stream_stats->incoming.data_bytes;
223     }
224     ::opencensus::stats::Record(
225         {{RpcClientSentBytesPerRpc(), static_cast<double>(outgoing_bytes)},
226          {RpcClientReceivedBytesPerRpc(), static_cast<double>(incoming_bytes)},
227          {RpcClientServerLatency(),
228           ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))},
229          {RpcClientRoundtripLatency(),
230           absl::ToDoubleMilliseconds(absl::Now() - start_time_)}},
231         tags);
232   }
233 }
234 
RecordIncomingBytes(const TransportByteSize & transport_byte_size)235 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordIncomingBytes(
236     const TransportByteSize& transport_byte_size) {
237   incoming_bytes_.fetch_add(transport_byte_size.data_bytes);
238 }
239 
RecordOutgoingBytes(const TransportByteSize & transport_byte_size)240 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordOutgoingBytes(
241     const TransportByteSize& transport_byte_size) {
242   outgoing_bytes_.fetch_add(transport_byte_size.data_bytes);
243 }
244 
RecordCancel(absl::Status)245 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
246     absl::Status /*cancel_error*/) {}
247 
RecordEnd(const gpr_timespec &)248 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
249     const gpr_timespec& /*latency*/) {
250   if (OpenCensusStatsEnabled()) {
251     std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
252         context_.tags().tags();
253     tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
254     tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
255     ::opencensus::stats::Record(
256         {{RpcClientSentMessagesPerRpc(), sent_message_count_},
257          {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
258         tags);
259     grpc_core::MutexLock lock(&parent_->mu_);
260     if (--parent_->num_active_rpcs_ == 0) {
261       parent_->time_at_last_attempt_end_ = absl::Now();
262     }
263   }
264   if (parent_->tracing_enabled_) {
265     if (status_code_ != absl::StatusCode::kOk) {
266       context_.Span().SetStatus(
267           static_cast<opencensus::trace::StatusCode>(status_code_),
268           StatusCodeToString(status_code_));
269     }
270     context_.EndSpan();
271   }
272   if (arena_allocated_) {
273     this->~OpenCensusCallAttemptTracer();
274   } else {
275     delete this;
276   }
277 }
278 
RecordAnnotation(absl::string_view annotation)279 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation(
280     absl::string_view annotation) {
281   if (!context_.Span().IsRecording()) {
282     return;
283   }
284   context_.AddSpanAnnotation(annotation, {});
285 }
286 
RecordAnnotation(const Annotation & annotation)287 void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordAnnotation(
288     const Annotation& annotation) {
289   if (!context_.Span().IsRecording()) {
290     return;
291   }
292 
293   switch (annotation.type()) {
294     // Annotations are expensive to create. We should only create it if the
295     // call is being sampled by default.
296     default:
297       if (IsSampled()) {
298         context_.AddSpanAnnotation(annotation.ToString(), {});
299       }
300       break;
301   }
302 }
303 
304 std::shared_ptr<grpc_core::TcpTracerInterface>
StartNewTcpTrace()305 OpenCensusCallTracer::OpenCensusCallAttemptTracer::StartNewTcpTrace() {
306   return nullptr;
307 }
308 
309 //
310 // OpenCensusCallTracer
311 //
312 
OpenCensusCallTracer(grpc_core::Slice path,grpc_core::Arena * arena,bool tracing_enabled)313 OpenCensusCallTracer::OpenCensusCallTracer(grpc_core::Slice path,
314                                            grpc_core::Arena* arena,
315                                            bool tracing_enabled)
316     : path_(std::move(path)),
317       method_(GetMethod(path_)),
318       arena_(arena),
319       tracing_enabled_(tracing_enabled) {
320   auto* parent_context =
321       reinterpret_cast<CensusContext*>(arena->GetContext<census_context>());
322   GenerateClientContext(tracing_enabled_ ? absl::StrCat("Sent.", method_) : "",
323                         &context_,
324                         (parent_context == nullptr) ? nullptr : parent_context);
325 }
326 
~OpenCensusCallTracer()327 OpenCensusCallTracer::~OpenCensusCallTracer() {
328   if (OpenCensusStatsEnabled()) {
329     std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
330         context_.tags().tags();
331     tags.emplace_back(ClientMethodTagKey(), std::string(method_));
332     ::opencensus::stats::Record(
333         {{RpcClientRetriesPerCall(), retries_ - 1},  // exclude first attempt
334          {RpcClientTransparentRetriesPerCall(), transparent_retries_},
335          {RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
336         tags);
337   }
338   if (tracing_enabled_) {
339     context_.EndSpan();
340   }
341 }
342 
343 OpenCensusCallTracer::OpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)344 OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
345   // We allocate the first attempt on the arena and all subsequent attempts
346   // on the heap, so that in the common case we don't require a heap
347   // allocation, nor do we unnecessarily grow the arena.
348   bool is_first_attempt = true;
349   uint64_t attempt_num;
350   {
351     grpc_core::MutexLock lock(&mu_);
352     if (transparent_retries_ != 0 || retries_ != 0) {
353       is_first_attempt = false;
354       if (OpenCensusStatsEnabled() && num_active_rpcs_ == 0) {
355         retry_delay_ += absl::Now() - time_at_last_attempt_end_;
356       }
357     }
358     attempt_num = retries_;
359     if (is_transparent_retry) {
360       ++transparent_retries_;
361     } else {
362       ++retries_;
363     }
364     ++num_active_rpcs_;
365   }
366   if (is_first_attempt) {
367     return arena_->New<OpenCensusCallAttemptTracer>(
368         this, attempt_num, is_transparent_retry, true /* arena_allocated */);
369   }
370   return new OpenCensusCallAttemptTracer(
371       this, attempt_num, is_transparent_retry, false /* arena_allocated */);
372 }
373 
RecordAnnotation(absl::string_view annotation)374 void OpenCensusCallTracer::RecordAnnotation(absl::string_view annotation) {
375   if (!context_.Span().IsRecording()) {
376     return;
377   }
378   context_.AddSpanAnnotation(annotation, {});
379 }
380 
RecordAnnotation(const Annotation & annotation)381 void OpenCensusCallTracer::RecordAnnotation(const Annotation& annotation) {
382   if (!context_.Span().IsRecording()) {
383     return;
384   }
385 
386   switch (annotation.type()) {
387     // Annotations are expensive to create. We should only create it if the
388     // call is being sampled by default.
389     default:
390       if (IsSampled()) {
391         context_.AddSpanAnnotation(annotation.ToString(), {});
392       }
393       break;
394   }
395 }
396 
RecordApiLatency(absl::Duration api_latency,absl::StatusCode status_code)397 void OpenCensusCallTracer::RecordApiLatency(absl::Duration api_latency,
398                                             absl::StatusCode status_code) {
399   if (OpenCensusStatsEnabled()) {
400     std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
401         context_.tags().tags();
402     tags.emplace_back(ClientMethodTagKey(), std::string(method_));
403     tags.emplace_back(ClientStatusTagKey(),
404                       absl::StatusCodeToString(status_code));
405     ::opencensus::stats::Record(
406         {{RpcClientApiLatency(), absl::ToDoubleMilliseconds(api_latency)}},
407         tags);
408   }
409 }
410 
CreateCensusContextForCallAttempt()411 CensusContext OpenCensusCallTracer::CreateCensusContextForCallAttempt() {
412   if (!tracing_enabled_) return CensusContext(context_.tags());
413   DCHECK(context_.Context().IsValid());
414   auto context = CensusContext(absl::StrCat("Attempt.", method_),
415                                &(context_.Span()), context_.tags());
416   grpc::internal::OpenCensusRegistry::Get()
417       .PopulateCensusContextWithConstantAttributes(&context);
418   return context;
419 }
420 
421 class OpenCensusClientInterceptor : public grpc::experimental::Interceptor {
422  public:
OpenCensusClientInterceptor(grpc::experimental::ClientRpcInfo * info)423   explicit OpenCensusClientInterceptor(grpc::experimental::ClientRpcInfo* info)
424       : info_(info), start_time_(absl::Now()) {}
425 
Intercept(grpc::experimental::InterceptorBatchMethods * methods)426   void Intercept(
427       grpc::experimental::InterceptorBatchMethods* methods) override {
428     if (methods->QueryInterceptionHookPoint(
429             grpc::experimental::InterceptionHookPoints::POST_RECV_STATUS)) {
430       auto* tracer = grpc_core::DownCast<OpenCensusCallTracer*>(
431           grpc_call_get_arena(info_->client_context()->c_call())
432               ->GetContext<grpc_core::CallTracerAnnotationInterface>());
433       if (tracer != nullptr) {
434         tracer->RecordApiLatency(absl::Now() - start_time_,
435                                  static_cast<absl::StatusCode>(
436                                      methods->GetRecvStatus()->error_code()));
437       }
438     }
439     methods->Proceed();
440   }
441 
442  private:
443   grpc::experimental::ClientRpcInfo* info_;
444   // Start time for measuring end-to-end API latency
445   absl::Time start_time_;
446 };
447 
448 //
449 // OpenCensusClientInterceptorFactory
450 //
451 
452 grpc::experimental::Interceptor*
CreateClientInterceptor(grpc::experimental::ClientRpcInfo * info)453 OpenCensusClientInterceptorFactory::CreateClientInterceptor(
454     grpc::experimental::ClientRpcInfo* info) {
455   return new OpenCensusClientInterceptor(info);
456 }
457 
458 }  // namespace internal
459 }  // namespace grpc
460