• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/ext/filters/census/client_filter.h"
22 
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "opencensus/stats/stats.h"
26 #include "src/core/lib/surface/call.h"
27 #include "src/cpp/ext/filters/census/grpc_plugin.h"
28 #include "src/cpp/ext/filters/census/measures.h"
29 
30 namespace grpc {
31 
32 constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
33 constexpr uint32_t CensusClientCallData::kMaxTagsLen;
34 
35 namespace {
36 
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)37 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
38   if (b->idx.named.grpc_server_stats_bin != nullptr) {
39     ServerStatsDeserialize(
40         reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
41             GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
42         GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
43         elapsed_time);
44     grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
45   }
46 }
47 
48 }  // namespace
49 
OnDoneRecvTrailingMetadataCb(void * user_data,grpc_error * error)50 void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
51                                                         grpc_error* error) {
52   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
53   CensusClientCallData* calld =
54       reinterpret_cast<CensusClientCallData*>(elem->call_data);
55   GPR_ASSERT(calld != nullptr);
56   if (error == GRPC_ERROR_NONE) {
57     GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
58     FilterTrailingMetadata(calld->recv_trailing_metadata_,
59                            &calld->elapsed_time_);
60   }
61   GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_,
62                    GRPC_ERROR_REF(error));
63 }
64 
OnDoneRecvMessageCb(void * user_data,grpc_error * error)65 void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
66                                                grpc_error* error) {
67   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
68   CensusClientCallData* calld =
69       reinterpret_cast<CensusClientCallData*>(elem->call_data);
70   CensusChannelData* channeld =
71       reinterpret_cast<CensusChannelData*>(elem->channel_data);
72   GPR_ASSERT(calld != nullptr);
73   GPR_ASSERT(channeld != nullptr);
74   // Stream messages are no longer valid after receiving trailing metadata.
75   if ((*calld->recv_message_) != nullptr) {
76     calld->recv_message_count_++;
77   }
78   GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
79 }
80 
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)81 void CensusClientCallData::StartTransportStreamOpBatch(
82     grpc_call_element* elem, TransportStreamOpBatch* op) {
83   if (op->send_initial_metadata() != nullptr) {
84     census_context* ctxt = op->get_census_context();
85     GenerateClientContext(
86         qualified_method_, &context_,
87         (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
88     size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
89                                                kMaxTraceContextLen);
90     if (tracing_len > 0) {
91       GRPC_LOG_IF_ERROR(
92           "census grpc_filter",
93           grpc_metadata_batch_add_tail(
94               op->send_initial_metadata()->batch(), &tracing_bin_,
95               grpc_mdelem_from_slices(
96                   GRPC_MDSTR_GRPC_TRACE_BIN,
97                   grpc_slice_from_copied_buffer(tracing_buf_, tracing_len))));
98     }
99     grpc_slice tags = grpc_empty_slice();
100     // TODO: Add in tagging serialization.
101     size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
102     if (encoded_tags_len > 0) {
103       GRPC_LOG_IF_ERROR(
104           "census grpc_filter",
105           grpc_metadata_batch_add_tail(
106               op->send_initial_metadata()->batch(), &stats_bin_,
107               grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags)));
108     }
109   }
110 
111   if (op->send_message() != nullptr) {
112     ++sent_message_count_;
113   }
114   if (op->recv_message() != nullptr) {
115     recv_message_ = op->op()->payload->recv_message.recv_message;
116     initial_on_done_recv_message_ =
117         op->op()->payload->recv_message.recv_message_ready;
118     op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
119   }
120   if (op->recv_trailing_metadata() != nullptr) {
121     recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
122     initial_on_done_recv_trailing_metadata_ =
123         op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
124     op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
125         &on_done_recv_trailing_metadata_;
126   }
127   // Call next op.
128   grpc_call_next_op(elem, op->op());
129 }
130 
Init(grpc_call_element * elem,const grpc_call_element_args * args)131 grpc_error* CensusClientCallData::Init(grpc_call_element* elem,
132                                        const grpc_call_element_args* args) {
133   path_ = grpc_slice_ref_internal(args->path);
134   start_time_ = absl::Now();
135   method_ = GetMethod(&path_);
136   qualified_method_ = absl::StrCat("Sent.", method_);
137   GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
138                     grpc_schedule_on_exec_ctx);
139   GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
140                     OnDoneRecvTrailingMetadataCb, elem,
141                     grpc_schedule_on_exec_ctx);
142   return GRPC_ERROR_NONE;
143 }
144 
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)145 void CensusClientCallData::Destroy(grpc_call_element* elem,
146                                    const grpc_call_final_info* final_info,
147                                    grpc_closure* then_call_closure) {
148   const uint64_t request_size = GetOutgoingDataSize(final_info);
149   const uint64_t response_size = GetIncomingDataSize(final_info);
150   double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
151   ::opencensus::stats::Record(
152       {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
153        {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
154        {RpcClientRoundtripLatency(), latency_ms},
155        {RpcClientServerLatency(),
156         ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
157        {RpcClientSentMessagesPerRpc(), sent_message_count_},
158        {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
159       {{ClientMethodTagKey(), method_},
160        {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}});
161   grpc_slice_unref_internal(path_);
162   context_.EndSpan();
163 }
164 
165 }  // namespace grpc
166