1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/ext/filters/census/server_filter.h"
22
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "absl/time/clock.h"
26 #include "absl/time/time.h"
27 #include "opencensus/stats/stats.h"
28 #include "src/core/lib/surface/call.h"
29 #include "src/cpp/ext/filters/census/grpc_plugin.h"
30 #include "src/cpp/ext/filters/census/measures.h"
31
32 namespace grpc {
33
34 constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
35
36 namespace {
37
38 // server metadata elements
39 struct ServerMetadataElements {
40 grpc_slice path;
41 grpc_slice tracing_slice;
42 grpc_slice census_proto;
43 };
44
FilterInitialMetadata(grpc_metadata_batch * b,ServerMetadataElements * sml)45 void FilterInitialMetadata(grpc_metadata_batch* b,
46 ServerMetadataElements* sml) {
47 if (b->idx.named.path != nullptr) {
48 sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
49 }
50 if (b->idx.named.grpc_trace_bin != nullptr) {
51 sml->tracing_slice =
52 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
53 grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
54 }
55 if (b->idx.named.grpc_tags_bin != nullptr) {
56 sml->census_proto =
57 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
58 grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
59 }
60 }
61
62 } // namespace
63
OnDoneRecvMessageCb(void * user_data,grpc_error * error)64 void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
65 grpc_error* error) {
66 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
67 CensusServerCallData* calld =
68 reinterpret_cast<CensusServerCallData*>(elem->call_data);
69 CensusChannelData* channeld =
70 reinterpret_cast<CensusChannelData*>(elem->channel_data);
71 GPR_ASSERT(calld != nullptr);
72 GPR_ASSERT(channeld != nullptr);
73 // Stream messages are no longer valid after receiving trailing metadata.
74 if ((*calld->recv_message_) != nullptr) {
75 ++calld->recv_message_count_;
76 }
77 GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
78 }
79
OnDoneRecvInitialMetadataCb(void * user_data,grpc_error * error)80 void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
81 grpc_error* error) {
82 grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
83 CensusServerCallData* calld =
84 reinterpret_cast<CensusServerCallData*>(elem->call_data);
85 GPR_ASSERT(calld != nullptr);
86 if (error == GRPC_ERROR_NONE) {
87 grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
88 GPR_ASSERT(initial_metadata != nullptr);
89 ServerMetadataElements sml;
90 sml.path = grpc_empty_slice();
91 sml.tracing_slice = grpc_empty_slice();
92 sml.census_proto = grpc_empty_slice();
93 FilterInitialMetadata(initial_metadata, &sml);
94 calld->path_ = grpc_slice_ref_internal(sml.path);
95 calld->method_ = GetMethod(&calld->path_);
96 calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
97 const char* tracing_str =
98 GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
99 ? ""
100 : reinterpret_cast<const char*>(
101 GRPC_SLICE_START_PTR(sml.tracing_slice));
102 size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
103 ? 0
104 : GRPC_SLICE_LENGTH(sml.tracing_slice);
105 const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
106 ? ""
107 : reinterpret_cast<const char*>(
108 GRPC_SLICE_START_PTR(sml.census_proto));
109 size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
110 ? 0
111 : GRPC_SLICE_LENGTH(sml.census_proto);
112
113 GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
114 absl::string_view(census_str, census_str_len),
115 /*primary_role*/ "", calld->qualified_method_,
116 &calld->context_);
117
118 grpc_slice_unref_internal(sml.tracing_slice);
119 grpc_slice_unref_internal(sml.census_proto);
120 grpc_slice_unref_internal(sml.path);
121 grpc_census_call_set_context(
122 calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
123 }
124 GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
125 GRPC_ERROR_REF(error));
126 }
127
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)128 void CensusServerCallData::StartTransportStreamOpBatch(
129 grpc_call_element* elem, TransportStreamOpBatch* op) {
130 if (op->recv_initial_metadata() != nullptr) {
131 // substitute our callback for the op callback
132 recv_initial_metadata_ = op->recv_initial_metadata()->batch();
133 initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
134 op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
135 }
136 if (op->send_message() != nullptr) {
137 ++sent_message_count_;
138 }
139 if (op->recv_message() != nullptr) {
140 recv_message_ = op->op()->payload->recv_message.recv_message;
141 initial_on_done_recv_message_ =
142 op->op()->payload->recv_message.recv_message_ready;
143 op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
144 }
145 // We need to record the time when the trailing metadata was sent to mark the
146 // completeness of the request.
147 if (op->send_trailing_metadata() != nullptr) {
148 elapsed_time_ = absl::Now() - start_time_;
149 size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
150 stats_buf_, kMaxServerStatsLen);
151 if (len > 0) {
152 GRPC_LOG_IF_ERROR(
153 "census grpc_filter",
154 grpc_metadata_batch_add_tail(
155 op->send_trailing_metadata()->batch(), &census_bin_,
156 grpc_mdelem_from_slices(
157 GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
158 grpc_slice_from_copied_buffer(stats_buf_, len))));
159 }
160 }
161 // Call next op.
162 grpc_call_next_op(elem, op->op());
163 }
164
Init(grpc_call_element * elem,const grpc_call_element_args * args)165 grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
166 const grpc_call_element_args* args) {
167 start_time_ = absl::Now();
168 gc_ =
169 grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
170 GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
171 OnDoneRecvInitialMetadataCb, elem,
172 grpc_schedule_on_exec_ctx);
173 GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
174 grpc_schedule_on_exec_ctx);
175 auth_context_ = grpc_call_auth_context(gc_);
176 return GRPC_ERROR_NONE;
177 }
178
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)179 void CensusServerCallData::Destroy(grpc_call_element* elem,
180 const grpc_call_final_info* final_info,
181 grpc_closure* then_call_closure) {
182 const uint64_t request_size = GetOutgoingDataSize(final_info);
183 const uint64_t response_size = GetIncomingDataSize(final_info);
184 double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
185 grpc_auth_context_release(auth_context_);
186 ::opencensus::stats::Record(
187 {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
188 {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
189 {RpcServerServerLatency(), elapsed_time_ms},
190 {RpcServerSentMessagesPerRpc(), sent_message_count_},
191 {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
192 {{ServerMethodTagKey(), method_},
193 {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
194 grpc_slice_unref_internal(path_);
195 context_.EndSpan();
196 }
197
198 } // namespace grpc
199