• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/ext/filters/census/server_filter.h"
22 
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/string_view.h"
25 #include "absl/time/clock.h"
26 #include "absl/time/time.h"
27 #include "opencensus/stats/stats.h"
28 #include "src/core/lib/surface/call.h"
29 #include "src/cpp/ext/filters/census/grpc_plugin.h"
30 #include "src/cpp/ext/filters/census/measures.h"
31 
32 namespace grpc {
33 
34 constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
35 
36 namespace {
37 
38 // server metadata elements
39 struct ServerMetadataElements {
40   grpc_slice path;
41   grpc_slice tracing_slice;
42   grpc_slice census_proto;
43 };
44 
FilterInitialMetadata(grpc_metadata_batch * b,ServerMetadataElements * sml)45 void FilterInitialMetadata(grpc_metadata_batch* b,
46                            ServerMetadataElements* sml) {
47   if (b->idx.named.path != nullptr) {
48     sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
49   }
50   if (b->idx.named.grpc_trace_bin != nullptr) {
51     sml->tracing_slice =
52         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
53     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_TRACE_BIN);
54   }
55   if (b->idx.named.grpc_tags_bin != nullptr) {
56     sml->census_proto =
57         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
58     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_TAGS_BIN);
59   }
60 }
61 
62 }  // namespace
63 
OnDoneRecvMessageCb(void * user_data,grpc_error * error)64 void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
65                                                grpc_error* error) {
66   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
67   CensusServerCallData* calld =
68       reinterpret_cast<CensusServerCallData*>(elem->call_data);
69   CensusChannelData* channeld =
70       reinterpret_cast<CensusChannelData*>(elem->channel_data);
71   GPR_ASSERT(calld != nullptr);
72   GPR_ASSERT(channeld != nullptr);
73   // Stream messages are no longer valid after receiving trailing metadata.
74   if ((*calld->recv_message_) != nullptr) {
75     ++calld->recv_message_count_;
76   }
77   grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
78                           GRPC_ERROR_REF(error));
79 }
80 
OnDoneRecvInitialMetadataCb(void * user_data,grpc_error * error)81 void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
82                                                        grpc_error* error) {
83   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
84   CensusServerCallData* calld =
85       reinterpret_cast<CensusServerCallData*>(elem->call_data);
86   GPR_ASSERT(calld != nullptr);
87   if (error == GRPC_ERROR_NONE) {
88     grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
89     GPR_ASSERT(initial_metadata != nullptr);
90     ServerMetadataElements sml;
91     sml.path = grpc_empty_slice();
92     sml.tracing_slice = grpc_empty_slice();
93     sml.census_proto = grpc_empty_slice();
94     FilterInitialMetadata(initial_metadata, &sml);
95     calld->path_ = grpc_slice_ref_internal(sml.path);
96     calld->method_ = GetMethod(&calld->path_);
97     calld->qualified_method_ = absl::StrCat("Recv.", calld->method_);
98     const char* tracing_str =
99         GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
100             ? ""
101             : reinterpret_cast<const char*>(
102                   GRPC_SLICE_START_PTR(sml.tracing_slice));
103     size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
104                                  ? 0
105                                  : GRPC_SLICE_LENGTH(sml.tracing_slice);
106     const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
107                                  ? ""
108                                  : reinterpret_cast<const char*>(
109                                        GRPC_SLICE_START_PTR(sml.census_proto));
110     size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
111                                 ? 0
112                                 : GRPC_SLICE_LENGTH(sml.census_proto);
113 
114     GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
115                           absl::string_view(census_str, census_str_len),
116                           /*primary_role*/ "", calld->qualified_method_,
117                           &calld->context_);
118 
119     grpc_slice_unref_internal(sml.tracing_slice);
120     grpc_slice_unref_internal(sml.census_proto);
121     grpc_slice_unref_internal(sml.path);
122     grpc_census_call_set_context(
123         calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
124   }
125   grpc_core::Closure::Run(DEBUG_LOCATION,
126                           calld->initial_on_done_recv_initial_metadata_,
127                           GRPC_ERROR_REF(error));
128 }
129 
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)130 void CensusServerCallData::StartTransportStreamOpBatch(
131     grpc_call_element* elem, TransportStreamOpBatch* op) {
132   if (op->recv_initial_metadata() != nullptr) {
133     // substitute our callback for the op callback
134     recv_initial_metadata_ = op->recv_initial_metadata()->batch();
135     initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
136     op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
137   }
138   if (op->send_message() != nullptr) {
139     ++sent_message_count_;
140   }
141   if (op->recv_message() != nullptr) {
142     recv_message_ = op->op()->payload->recv_message.recv_message;
143     initial_on_done_recv_message_ =
144         op->op()->payload->recv_message.recv_message_ready;
145     op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
146   }
147   // We need to record the time when the trailing metadata was sent to mark the
148   // completeness of the request.
149   if (op->send_trailing_metadata() != nullptr) {
150     elapsed_time_ = absl::Now() - start_time_;
151     size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
152                                       stats_buf_, kMaxServerStatsLen);
153     if (len > 0) {
154       GRPC_LOG_IF_ERROR(
155           "census grpc_filter",
156           grpc_metadata_batch_add_tail(
157               op->send_trailing_metadata()->batch(), &census_bin_,
158               grpc_mdelem_from_slices(
159                   GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
160                   grpc_core::UnmanagedMemorySlice(stats_buf_, len)),
161               GRPC_BATCH_GRPC_SERVER_STATS_BIN));
162     }
163   }
164   // Call next op.
165   grpc_call_next_op(elem, op->op());
166 }
167 
Init(grpc_call_element * elem,const grpc_call_element_args * args)168 grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
169                                        const grpc_call_element_args* args) {
170   start_time_ = absl::Now();
171   gc_ =
172       grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
173   GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
174                     OnDoneRecvInitialMetadataCb, elem,
175                     grpc_schedule_on_exec_ctx);
176   GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
177                     grpc_schedule_on_exec_ctx);
178   auth_context_ = grpc_call_auth_context(gc_);
179   return GRPC_ERROR_NONE;
180 }
181 
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)182 void CensusServerCallData::Destroy(grpc_call_element* elem,
183                                    const grpc_call_final_info* final_info,
184                                    grpc_closure* then_call_closure) {
185   const uint64_t request_size = GetOutgoingDataSize(final_info);
186   const uint64_t response_size = GetIncomingDataSize(final_info);
187   double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
188   grpc_auth_context_release(auth_context_);
189   ::opencensus::stats::Record(
190       {{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
191        {RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
192        {RpcServerServerLatency(), elapsed_time_ms},
193        {RpcServerSentMessagesPerRpc(), sent_message_count_},
194        {RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
195       {{ServerMethodTagKey(), method_},
196        {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
197   grpc_slice_unref_internal(path_);
198   context_.EndSpan();
199 }
200 
201 }  // namespace grpc
202