1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/ext/filters/census/client_filter.h"
22
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "opencensus/stats/stats.h"
26 #include "src/core/lib/surface/call.h"
27 #include "src/cpp/ext/filters/census/grpc_plugin.h"
28 #include "src/cpp/ext/filters/census/measures.h"
29
30 namespace grpc {
31
32 constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
33 constexpr uint32_t CensusClientCallData::kMaxTagsLen;
34
35 namespace {
36
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)37 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
38 if (b->idx.named.grpc_server_stats_bin != nullptr) {
39 ServerStatsDeserialize(
40 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
41 GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
42 GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
43 elapsed_time);
44 grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
45 }
46 }
47
48 } // namespace
49
OnDoneRecvTrailingMetadataCb(void * user_data,grpc_error * error)50 void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
51 grpc_error* error) {
52 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
53 CensusClientCallData* calld =
54 reinterpret_cast<CensusClientCallData*>(elem->call_data);
55 GPR_ASSERT(calld != nullptr);
56 if (error == GRPC_ERROR_NONE) {
57 GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
58 FilterTrailingMetadata(calld->recv_trailing_metadata_,
59 &calld->elapsed_time_);
60 }
61 GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_,
62 GRPC_ERROR_REF(error));
63 }
64
OnDoneRecvMessageCb(void * user_data,grpc_error * error)65 void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
66 grpc_error* error) {
67 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
68 CensusClientCallData* calld =
69 reinterpret_cast<CensusClientCallData*>(elem->call_data);
70 CensusChannelData* channeld =
71 reinterpret_cast<CensusChannelData*>(elem->channel_data);
72 GPR_ASSERT(calld != nullptr);
73 GPR_ASSERT(channeld != nullptr);
74 // Stream messages are no longer valid after receiving trailing metadata.
75 if ((*calld->recv_message_) != nullptr) {
76 calld->recv_message_count_++;
77 }
78 GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
79 }
80
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)81 void CensusClientCallData::StartTransportStreamOpBatch(
82 grpc_call_element* elem, TransportStreamOpBatch* op) {
83 if (op->send_initial_metadata() != nullptr) {
84 census_context* ctxt = op->get_census_context();
85 GenerateClientContext(
86 qualified_method_, &context_,
87 (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
88 size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
89 kMaxTraceContextLen);
90 if (tracing_len > 0) {
91 GRPC_LOG_IF_ERROR(
92 "census grpc_filter",
93 grpc_metadata_batch_add_tail(
94 op->send_initial_metadata()->batch(), &tracing_bin_,
95 grpc_mdelem_from_slices(
96 GRPC_MDSTR_GRPC_TRACE_BIN,
97 grpc_slice_from_copied_buffer(tracing_buf_, tracing_len))));
98 }
99 grpc_slice tags = grpc_empty_slice();
100 // TODO: Add in tagging serialization.
101 size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
102 if (encoded_tags_len > 0) {
103 GRPC_LOG_IF_ERROR(
104 "census grpc_filter",
105 grpc_metadata_batch_add_tail(
106 op->send_initial_metadata()->batch(), &stats_bin_,
107 grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags)));
108 }
109 }
110
111 if (op->send_message() != nullptr) {
112 ++sent_message_count_;
113 }
114 if (op->recv_message() != nullptr) {
115 recv_message_ = op->op()->payload->recv_message.recv_message;
116 initial_on_done_recv_message_ =
117 op->op()->payload->recv_message.recv_message_ready;
118 op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
119 }
120 if (op->recv_trailing_metadata() != nullptr) {
121 recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
122 initial_on_done_recv_trailing_metadata_ =
123 op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
124 op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
125 &on_done_recv_trailing_metadata_;
126 }
127 // Call next op.
128 grpc_call_next_op(elem, op->op());
129 }
130
Init(grpc_call_element * elem,const grpc_call_element_args * args)131 grpc_error* CensusClientCallData::Init(grpc_call_element* elem,
132 const grpc_call_element_args* args) {
133 path_ = grpc_slice_ref_internal(args->path);
134 start_time_ = absl::Now();
135 method_ = GetMethod(&path_);
136 qualified_method_ = absl::StrCat("Sent.", method_);
137 GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
138 grpc_schedule_on_exec_ctx);
139 GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
140 OnDoneRecvTrailingMetadataCb, elem,
141 grpc_schedule_on_exec_ctx);
142 return GRPC_ERROR_NONE;
143 }
144
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)145 void CensusClientCallData::Destroy(grpc_call_element* elem,
146 const grpc_call_final_info* final_info,
147 grpc_closure* then_call_closure) {
148 const uint64_t request_size = GetOutgoingDataSize(final_info);
149 const uint64_t response_size = GetIncomingDataSize(final_info);
150 double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
151 ::opencensus::stats::Record(
152 {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
153 {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
154 {RpcClientRoundtripLatency(), latency_ms},
155 {RpcClientServerLatency(),
156 ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
157 {RpcClientSentMessagesPerRpc(), sent_message_count_},
158 {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
159 {{ClientMethodTagKey(), method_},
160 {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}});
161 grpc_slice_unref_internal(path_);
162 context_.EndSpan();
163 }
164
165 } // namespace grpc
166