• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/ext/filters/census/server_filter.h"
22 
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "absl/time/clock.h"
26 #include "absl/time/time.h"
27 #include "opencensus/stats/stats.h"
28 #include "src/core/lib/surface/call.h"
29 #include "src/cpp/ext/filters/census/grpc_plugin.h"
30 #include "src/cpp/ext/filters/census/measures.h"
31 
32 namespace grpc {
33 
34 constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
35 
36 namespace {
37 
38 // server metadata elements
39 struct ServerMetadataElements {
40   grpc_slice path;
41   grpc_slice tracing_slice;
42   grpc_slice census_proto;
43 };
44 
FilterInitialMetadata(grpc_metadata_batch * b,ServerMetadataElements * sml)45 void FilterInitialMetadata(grpc_metadata_batch* b,
46                            ServerMetadataElements* sml) {
47   if (b->idx.named.path != nullptr) {
48     sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
49   }
50   if (b->idx.named.grpc_trace_bin != nullptr) {
51     sml->tracing_slice =
52         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
53     grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
54   }
55   if (b->idx.named.grpc_tags_bin != nullptr) {
56     sml->census_proto =
57         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
58     grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
59   }
60 }
61 
62 }  // namespace
63 
OnDoneRecvMessageCb(void * user_data,grpc_error * error)64 void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
65                                                grpc_error* error) {
66   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
67   CensusServerCallData* calld =
68       reinterpret_cast<CensusServerCallData*>(elem->call_data);
69   CensusChannelData* channeld =
70       reinterpret_cast<CensusChannelData*>(elem->channel_data);
71   GPR_ASSERT(calld != nullptr);
72   GPR_ASSERT(channeld != nullptr);
73   // Stream messages are no longer valid after receiving trailing metadata.
74   if ((*calld->recv_message_) != nullptr) {
75     ++calld->recv_message_count_;
76   }
77   GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
78 }
79 
OnDoneRecvInitialMetadataCb(void * user_data,grpc_error * error)80 void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
81                                                        grpc_error* error) {
82   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
83   CensusServerCallData* calld =
84       reinterpret_cast<CensusServerCallData*>(elem->call_data);
85   GPR_ASSERT(calld != nullptr);
86   if (error == GRPC_ERROR_NONE) {
87     grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
88     GPR_ASSERT(initial_metadata != nullptr);
89     ServerMetadataElements sml;
90     sml.path = grpc_empty_slice();
91     sml.tracing_slice = grpc_empty_slice();
92     sml.census_proto = grpc_empty_slice();
93     FilterInitialMetadata(initial_metadata, &sml);
94     calld->path_ = grpc_slice_ref_internal(sml.path);
95     calld->method_ = GetMethod(&calld->path_);
96     calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
97     const char* tracing_str =
98         GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
99             ? ""
100             : reinterpret_cast<const char*>(
101                   GRPC_SLICE_START_PTR(sml.tracing_slice));
102     size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
103                                  ? 0
104                                  : GRPC_SLICE_LENGTH(sml.tracing_slice);
105     const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
106                                  ? ""
107                                  : reinterpret_cast<const char*>(
108                                        GRPC_SLICE_START_PTR(sml.census_proto));
109     size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
110                                 ? 0
111                                 : GRPC_SLICE_LENGTH(sml.census_proto);
112 
113     GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
114                           absl::string_view(census_str, census_str_len),
115                           /*primary_role*/ "", calld->qualified_method_,
116                           &calld->context_);
117 
118     grpc_slice_unref_internal(sml.tracing_slice);
119     grpc_slice_unref_internal(sml.census_proto);
120     grpc_slice_unref_internal(sml.path);
121     grpc_census_call_set_context(
122         calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
123   }
124   GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
125                    GRPC_ERROR_REF(error));
126 }
127 
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)128 void CensusServerCallData::StartTransportStreamOpBatch(
129     grpc_call_element* elem, TransportStreamOpBatch* op) {
130   if (op->recv_initial_metadata() != nullptr) {
131     // substitute our callback for the op callback
132     recv_initial_metadata_ = op->recv_initial_metadata()->batch();
133     initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
134     op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
135   }
136   if (op->send_message() != nullptr) {
137     ++sent_message_count_;
138   }
139   if (op->recv_message() != nullptr) {
140     recv_message_ = op->op()->payload->recv_message.recv_message;
141     initial_on_done_recv_message_ =
142         op->op()->payload->recv_message.recv_message_ready;
143     op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
144   }
145   // We need to record the time when the trailing metadata was sent to mark the
146   // completeness of the request.
147   if (op->send_trailing_metadata() != nullptr) {
148     elapsed_time_ = absl::Now() - start_time_;
149     size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
150                                       stats_buf_, kMaxServerStatsLen);
151     if (len > 0) {
152       GRPC_LOG_IF_ERROR(
153           "census grpc_filter",
154           grpc_metadata_batch_add_tail(
155               op->send_trailing_metadata()->batch(), &census_bin_,
156               grpc_mdelem_from_slices(
157                   GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
158                   grpc_slice_from_copied_buffer(stats_buf_, len))));
159     }
160   }
161   // Call next op.
162   grpc_call_next_op(elem, op->op());
163 }
164 
Init(grpc_call_element * elem,const grpc_call_element_args * args)165 grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
166                                        const grpc_call_element_args* args) {
167   start_time_ = absl::Now();
168   gc_ =
169       grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
170   GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
171                     OnDoneRecvInitialMetadataCb, elem,
172                     grpc_schedule_on_exec_ctx);
173   GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
174                     grpc_schedule_on_exec_ctx);
175   auth_context_ = grpc_call_auth_context(gc_);
176   return GRPC_ERROR_NONE;
177 }
178 
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)179 void CensusServerCallData::Destroy(grpc_call_element* elem,
180                                    const grpc_call_final_info* final_info,
181                                    grpc_closure* then_call_closure) {
182   const uint64_t request_size = GetOutgoingDataSize(final_info);
183   const uint64_t response_size = GetIncomingDataSize(final_info);
184   double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
185   grpc_auth_context_release(auth_context_);
186   ::opencensus::stats::Record(
187       {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
188        {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
189        {RpcServerServerLatency(), elapsed_time_ms},
190        {RpcServerSentMessagesPerRpc(), sent_message_count_},
191        {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
192       {{ServerMethodTagKey(), method_},
193        {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
194   grpc_slice_unref_internal(path_);
195   context_.EndSpan();
196 }
197 
198 }  // namespace grpc
199