• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <string>
22 #include <utility>
23 #include <vector>
24 
25 #include "src/cpp/ext/filters/census/client_filter.h"
26 
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/string_view.h"
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/tags/tag_key.h"
31 #include "opencensus/tags/tag_map.h"
32 #include "src/core/lib/surface/call.h"
33 #include "src/cpp/ext/filters/census/grpc_plugin.h"
34 #include "src/cpp/ext/filters/census/measures.h"
35 
36 namespace grpc {
37 
// Namespace-scope definitions for the static constexpr members declared in
// CensusClientCallData. They carry no initializer here; prior to C++17 such a
// definition is what gives an odr-used static constexpr member storage.
38 constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
39 constexpr uint32_t CensusClientCallData::kMaxTagsLen;
40 
41 namespace {
42 
FilterTrailingMetadata(grpc_metadata_batch * b,uint64_t * elapsed_time)43 void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
44   if (b->idx.named.grpc_server_stats_bin != nullptr) {
45     ServerStatsDeserialize(
46         reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
47             GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
48         GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
49         elapsed_time);
50     grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
51   }
52 }
53 
54 }  // namespace
55 
OnDoneRecvTrailingMetadataCb(void * user_data,grpc_error * error)56 void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
57                                                         grpc_error* error) {
58   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
59   CensusClientCallData* calld =
60       reinterpret_cast<CensusClientCallData*>(elem->call_data);
61   GPR_ASSERT(calld != nullptr);
62   if (error == GRPC_ERROR_NONE) {
63     GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
64     FilterTrailingMetadata(calld->recv_trailing_metadata_,
65                            &calld->elapsed_time_);
66   }
67   grpc_core::Closure::Run(DEBUG_LOCATION,
68                           calld->initial_on_done_recv_trailing_metadata_,
69                           GRPC_ERROR_REF(error));
70 }
71 
OnDoneRecvMessageCb(void * user_data,grpc_error * error)72 void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
73                                                grpc_error* error) {
74   grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
75   CensusClientCallData* calld =
76       reinterpret_cast<CensusClientCallData*>(elem->call_data);
77   CensusChannelData* channeld =
78       reinterpret_cast<CensusChannelData*>(elem->channel_data);
79   GPR_ASSERT(calld != nullptr);
80   GPR_ASSERT(channeld != nullptr);
81   // Stream messages are no longer valid after receiving trailing metadata.
82   if ((*calld->recv_message_) != nullptr) {
83     calld->recv_message_count_++;
84   }
85   grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
86                           GRPC_ERROR_REF(error));
87 }
88 
StartTransportStreamOpBatch(grpc_call_element * elem,TransportStreamOpBatch * op)89 void CensusClientCallData::StartTransportStreamOpBatch(
90     grpc_call_element* elem, TransportStreamOpBatch* op) {
91   if (op->send_initial_metadata() != nullptr) {
92     census_context* ctxt = op->get_census_context();
93     GenerateClientContext(
94         qualified_method_, &context_,
95         (ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
96     size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
97                                                kMaxTraceContextLen);
98     if (tracing_len > 0) {
99       GRPC_LOG_IF_ERROR(
100           "census grpc_filter",
101           grpc_metadata_batch_add_tail(
102               op->send_initial_metadata()->batch(), &tracing_bin_,
103               grpc_mdelem_from_slices(
104                   GRPC_MDSTR_GRPC_TRACE_BIN,
105                   grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
106               GRPC_BATCH_GRPC_TRACE_BIN));
107     }
108     grpc_slice tags = grpc_empty_slice();
109     // TODO(unknown): Add in tagging serialization.
110     size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
111     if (encoded_tags_len > 0) {
112       GRPC_LOG_IF_ERROR(
113           "census grpc_filter",
114           grpc_metadata_batch_add_tail(
115               op->send_initial_metadata()->batch(), &stats_bin_,
116               grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
117               GRPC_BATCH_GRPC_TAGS_BIN));
118     }
119   }
120 
121   if (op->send_message() != nullptr) {
122     ++sent_message_count_;
123   }
124   if (op->recv_message() != nullptr) {
125     recv_message_ = op->op()->payload->recv_message.recv_message;
126     initial_on_done_recv_message_ =
127         op->op()->payload->recv_message.recv_message_ready;
128     op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
129   }
130   if (op->recv_trailing_metadata() != nullptr) {
131     recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
132     initial_on_done_recv_trailing_metadata_ =
133         op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
134     op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
135         &on_done_recv_trailing_metadata_;
136   }
137   // Call next op.
138   grpc_call_next_op(elem, op->op());
139 }
140 
Init(grpc_call_element * elem,const grpc_call_element_args * args)141 grpc_error* CensusClientCallData::Init(grpc_call_element* elem,
142                                        const grpc_call_element_args* args) {
143   path_ = grpc_slice_ref_internal(args->path);
144   start_time_ = absl::Now();
145   method_ = GetMethod(&path_);
146   qualified_method_ = absl::StrCat("Sent.", method_);
147   GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
148                     grpc_schedule_on_exec_ctx);
149   GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
150                     OnDoneRecvTrailingMetadataCb, elem,
151                     grpc_schedule_on_exec_ctx);
152   return GRPC_ERROR_NONE;
153 }
154 
Destroy(grpc_call_element *,const grpc_call_final_info * final_info,grpc_closure *)155 void CensusClientCallData::Destroy(grpc_call_element* /*elem*/,
156                                    const grpc_call_final_info* final_info,
157                                    grpc_closure* /*then_call_closure*/) {
158   const uint64_t request_size = GetOutgoingDataSize(final_info);
159   const uint64_t response_size = GetIncomingDataSize(final_info);
160   double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
161   std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
162       context_.tags().tags();
163   std::string method = absl::StrCat(method_);
164   tags.emplace_back(ClientMethodTagKey(), method);
165   std::string final_status =
166       absl::StrCat(StatusCodeToString(final_info->final_status));
167   tags.emplace_back(ClientStatusTagKey(), final_status);
168   ::opencensus::stats::Record(
169       {{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
170        {RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
171        {RpcClientRoundtripLatency(), latency_ms},
172        {RpcClientServerLatency(),
173         ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
174        {RpcClientSentMessagesPerRpc(), sent_message_count_},
175        {RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
176       tags);
177   grpc_slice_unref_internal(path_);
178   if (final_info->final_status != GRPC_STATUS_OK) {
179     // TODO(unknown): Map grpc_status_code to trace::StatusCode.
180     context_.Span().SetStatus(opencensus::trace::StatusCode::UNKNOWN,
181                               StatusCodeToString(final_info->final_status));
182   }
183   context_.EndSpan();
184 }
185 
186 }  // namespace grpc
187