1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/cpp/interop/xds_stats_watcher.h"
16
17 #include <map>
18
19 #include "absl/algorithm/container.h"
20 #include "absl/log/check.h"
21 #include "absl/strings/ascii.h"
22
23 namespace grpc {
24 namespace testing {
25
26 namespace {
27
AddRpcMetadata(LoadBalancerStatsResponse::RpcMetadata * rpc_metadata,const std::unordered_set<std::string> & included_keys,bool include_all_keys,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,LoadBalancerStatsResponse::MetadataType type)28 void AddRpcMetadata(
29 LoadBalancerStatsResponse::RpcMetadata* rpc_metadata,
30 const std::unordered_set<std::string>& included_keys, bool include_all_keys,
31 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
32 LoadBalancerStatsResponse::MetadataType type) {
33 for (const auto& key_value : metadata) {
34 absl::string_view key(key_value.first.data(), key_value.first.length());
35 if (include_all_keys ||
36 included_keys.find(absl::AsciiStrToLower(key)) != included_keys.end()) {
37 auto entry = rpc_metadata->add_metadata();
38 entry->set_key(key);
39 entry->set_value(absl::string_view(key_value.second.data(),
40 key_value.second.length()));
41 entry->set_type(type);
42 }
43 }
44 }
45
ToLowerCase(absl::Span<const std::string> strings)46 std::unordered_set<std::string> ToLowerCase(
47 absl::Span<const std::string> strings) {
48 std::unordered_set<std::string> result;
49 for (const auto& str : strings) {
50 result.emplace(absl::AsciiStrToLower(str));
51 }
52 return result;
53 }
54
HasNonEmptyMetadata(const std::map<std::string,LoadBalancerStatsResponse::MetadataByPeer> & metadata_by_peer)55 bool HasNonEmptyMetadata(
56 const std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>&
57 metadata_by_peer) {
58 for (const auto& entry : metadata_by_peer) {
59 for (const auto& rpc_metadata : entry.second.rpc_metadata()) {
60 if (rpc_metadata.metadata_size() > 0) {
61 return true;
62 }
63 }
64 }
65 return false;
66 }
67
68 } // namespace
69
XdsStatsWatcher(int start_id,int end_id,absl::Span<const std::string> metadata_keys)70 XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
71 absl::Span<const std::string> metadata_keys)
72 : start_id_(start_id),
73 end_id_(end_id),
74 rpcs_needed_(end_id - start_id),
75 metadata_keys_(ToLowerCase(metadata_keys)),
76 include_all_metadata_(
77 absl::c_any_of(metadata_keys, [](absl::string_view key) {
78 return absl::StripAsciiWhitespace(key) == "*";
79 })) {}
80
RpcCompleted(const AsyncClientCallResult & call,const std::string & peer,const std::multimap<grpc::string_ref,grpc::string_ref> & initial_metadata,const std::multimap<grpc::string_ref,grpc::string_ref> & trailing_metadata)81 void XdsStatsWatcher::RpcCompleted(
82 const AsyncClientCallResult& call, const std::string& peer,
83 const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
84 const std::multimap<grpc::string_ref, grpc::string_ref>&
85 trailing_metadata) {
86 // We count RPCs for global watcher or if the request_id falls into the
87 // watcher's interested range of request ids.
88 if ((start_id_ == 0 && end_id_ == 0) ||
89 (start_id_ <= call.saved_request_id && call.saved_request_id < end_id_)) {
90 {
91 std::lock_guard<std::mutex> lock(m_);
92 if (peer.empty()) {
93 no_remote_peer_++;
94 ++no_remote_peer_by_type_[call.rpc_type];
95 } else {
96 // RPC is counted into both per-peer bin and per-method-per-peer bin.
97 rpcs_by_peer_[peer]++;
98 rpcs_by_type_[call.rpc_type][peer]++;
99 auto* rpc_metadata = metadata_by_peer_[peer].add_rpc_metadata();
100 AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
101 initial_metadata, LoadBalancerStatsResponse::INITIAL);
102 AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
103 trailing_metadata, LoadBalancerStatsResponse::TRAILING);
104 }
105 rpcs_needed_--;
106 // Report accumulated stats.
107 auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
108 auto& method_stat =
109 stats_per_method[ClientConfigureRequest_RpcType_Name(call.rpc_type)];
110 auto& result = *method_stat.mutable_result();
111 grpc_status_code code =
112 static_cast<grpc_status_code>(call.status.error_code());
113 auto& num_rpcs = result[code];
114 ++num_rpcs;
115 auto rpcs_started = method_stat.rpcs_started();
116 method_stat.set_rpcs_started(++rpcs_started);
117 }
118 cv_.notify_one();
119 }
120 }
121
WaitForRpcStatsResponse(int timeout_sec)122 LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse(
123 int timeout_sec) {
124 LoadBalancerStatsResponse response;
125 std::unique_lock<std::mutex> lock(m_);
126 cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
127 [this] { return rpcs_needed_ == 0; });
128 response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
129 rpcs_by_peer_.end());
130 // Return metadata if at least one RPC had relevant metadata. Note that empty
131 // entries would be returned for RCPs with no relevant metadata in this case.
132 if (HasNonEmptyMetadata(metadata_by_peer_)) {
133 response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
134 metadata_by_peer_.end());
135 }
136 auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
137 for (const auto& rpc_by_type : rpcs_by_type_) {
138 std::string method_name;
139 if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
140 method_name = "EmptyCall";
141 } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
142 method_name = "UnaryCall";
143 } else {
144 CHECK(0);
145 }
146 // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
147 // and UNARY_CALL we will just use the name of the enum instead of the
148 // method_name variable.
149 auto& response_rpc_by_method = response_rpcs_by_method[method_name];
150 auto& response_rpcs_by_peer =
151 *response_rpc_by_method.mutable_rpcs_by_peer();
152 for (const auto& rpc_by_peer : rpc_by_type.second) {
153 auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
154 response_rpc_by_peer = rpc_by_peer.second;
155 }
156 }
157 response.set_num_failures(no_remote_peer_ + rpcs_needed_);
158 return response;
159 }
160
GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse * response,StatsWatchers * stats_watchers)161 void XdsStatsWatcher::GetCurrentRpcStats(
162 LoadBalancerAccumulatedStatsResponse* response,
163 StatsWatchers* stats_watchers) {
164 std::unique_lock<std::mutex> lock(m_);
165 response->CopyFrom(accumulated_stats_);
166 // TODO(someone): delete deprecated stats below when the test is no
167 // longer using them.
168 // NOLINTBEGIN(clang-diagnostic-deprecated-declarations)
169 auto& response_rpcs_started_by_method =
170 *response->mutable_num_rpcs_started_by_method();
171 auto& response_rpcs_succeeded_by_method =
172 *response->mutable_num_rpcs_succeeded_by_method();
173 auto& response_rpcs_failed_by_method =
174 *response->mutable_num_rpcs_failed_by_method();
175 // NOLINTEND(clang-diagnostic-deprecated-declarations)
176 for (const auto& rpc_by_type : rpcs_by_type_) {
177 auto total_succeeded = 0;
178 for (const auto& rpc_by_peer : rpc_by_type.second) {
179 total_succeeded += rpc_by_peer.second;
180 }
181 response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
182 rpc_by_type.first)] = total_succeeded;
183 response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
184 rpc_by_type.first)] =
185 stats_watchers->global_request_id_by_type[rpc_by_type.first];
186 response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
187 rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
188 }
189 }
190
191 } // namespace testing
192 } // namespace grpc
193