• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/cpp/interop/xds_stats_watcher.h"
16 
17 #include <map>
18 
19 #include "absl/algorithm/container.h"
20 #include "absl/log/check.h"
21 #include "absl/strings/ascii.h"
22 
23 namespace grpc {
24 namespace testing {
25 
26 namespace {
27 
AddRpcMetadata(LoadBalancerStatsResponse::RpcMetadata * rpc_metadata,const std::unordered_set<std::string> & included_keys,bool include_all_keys,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,LoadBalancerStatsResponse::MetadataType type)28 void AddRpcMetadata(
29     LoadBalancerStatsResponse::RpcMetadata* rpc_metadata,
30     const std::unordered_set<std::string>& included_keys, bool include_all_keys,
31     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
32     LoadBalancerStatsResponse::MetadataType type) {
33   for (const auto& key_value : metadata) {
34     absl::string_view key(key_value.first.data(), key_value.first.length());
35     if (include_all_keys ||
36         included_keys.find(absl::AsciiStrToLower(key)) != included_keys.end()) {
37       auto entry = rpc_metadata->add_metadata();
38       entry->set_key(key);
39       entry->set_value(absl::string_view(key_value.second.data(),
40                                          key_value.second.length()));
41       entry->set_type(type);
42     }
43   }
44 }
45 
ToLowerCase(absl::Span<const std::string> strings)46 std::unordered_set<std::string> ToLowerCase(
47     absl::Span<const std::string> strings) {
48   std::unordered_set<std::string> result;
49   for (const auto& str : strings) {
50     result.emplace(absl::AsciiStrToLower(str));
51   }
52   return result;
53 }
54 
HasNonEmptyMetadata(const std::map<std::string,LoadBalancerStatsResponse::MetadataByPeer> & metadata_by_peer)55 bool HasNonEmptyMetadata(
56     const std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>&
57         metadata_by_peer) {
58   for (const auto& entry : metadata_by_peer) {
59     for (const auto& rpc_metadata : entry.second.rpc_metadata()) {
60       if (rpc_metadata.metadata_size() > 0) {
61         return true;
62       }
63     }
64   }
65   return false;
66 }
67 
68 }  // namespace
69 
XdsStatsWatcher(int start_id,int end_id,absl::Span<const std::string> metadata_keys)70 XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
71                                  absl::Span<const std::string> metadata_keys)
72     : start_id_(start_id),
73       end_id_(end_id),
74       rpcs_needed_(end_id - start_id),
75       metadata_keys_(ToLowerCase(metadata_keys)),
76       include_all_metadata_(
77           absl::c_any_of(metadata_keys, [](absl::string_view key) {
78             return absl::StripAsciiWhitespace(key) == "*";
79           })) {}
80 
RpcCompleted(const AsyncClientCallResult & call,const std::string & peer,const std::multimap<grpc::string_ref,grpc::string_ref> & initial_metadata,const std::multimap<grpc::string_ref,grpc::string_ref> & trailing_metadata)81 void XdsStatsWatcher::RpcCompleted(
82     const AsyncClientCallResult& call, const std::string& peer,
83     const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
84     const std::multimap<grpc::string_ref, grpc::string_ref>&
85         trailing_metadata) {
86   // We count RPCs for global watcher or if the request_id falls into the
87   // watcher's interested range of request ids.
88   if ((start_id_ == 0 && end_id_ == 0) ||
89       (start_id_ <= call.saved_request_id && call.saved_request_id < end_id_)) {
90     {
91       std::lock_guard<std::mutex> lock(m_);
92       if (peer.empty()) {
93         no_remote_peer_++;
94         ++no_remote_peer_by_type_[call.rpc_type];
95       } else {
96         // RPC is counted into both per-peer bin and per-method-per-peer bin.
97         rpcs_by_peer_[peer]++;
98         rpcs_by_type_[call.rpc_type][peer]++;
99         auto* rpc_metadata = metadata_by_peer_[peer].add_rpc_metadata();
100         AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
101                        initial_metadata, LoadBalancerStatsResponse::INITIAL);
102         AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
103                        trailing_metadata, LoadBalancerStatsResponse::TRAILING);
104       }
105       rpcs_needed_--;
106       // Report accumulated stats.
107       auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
108       auto& method_stat =
109           stats_per_method[ClientConfigureRequest_RpcType_Name(call.rpc_type)];
110       auto& result = *method_stat.mutable_result();
111       grpc_status_code code =
112           static_cast<grpc_status_code>(call.status.error_code());
113       auto& num_rpcs = result[code];
114       ++num_rpcs;
115       auto rpcs_started = method_stat.rpcs_started();
116       method_stat.set_rpcs_started(++rpcs_started);
117     }
118     cv_.notify_one();
119   }
120 }
121 
WaitForRpcStatsResponse(int timeout_sec)122 LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse(
123     int timeout_sec) {
124   LoadBalancerStatsResponse response;
125   std::unique_lock<std::mutex> lock(m_);
126   cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
127                [this] { return rpcs_needed_ == 0; });
128   response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
129                                           rpcs_by_peer_.end());
130   // Return metadata if at least one RPC had relevant metadata. Note that empty
131   // entries would be returned for RCPs with no relevant metadata in this case.
132   if (HasNonEmptyMetadata(metadata_by_peer_)) {
133     response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
134                                                  metadata_by_peer_.end());
135   }
136   auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
137   for (const auto& rpc_by_type : rpcs_by_type_) {
138     std::string method_name;
139     if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
140       method_name = "EmptyCall";
141     } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
142       method_name = "UnaryCall";
143     } else {
144       CHECK(0);
145     }
146     // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
147     // and UNARY_CALL we will just use the name of the enum instead of the
148     // method_name variable.
149     auto& response_rpc_by_method = response_rpcs_by_method[method_name];
150     auto& response_rpcs_by_peer =
151         *response_rpc_by_method.mutable_rpcs_by_peer();
152     for (const auto& rpc_by_peer : rpc_by_type.second) {
153       auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
154       response_rpc_by_peer = rpc_by_peer.second;
155     }
156   }
157   response.set_num_failures(no_remote_peer_ + rpcs_needed_);
158   return response;
159 }
160 
GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse * response,StatsWatchers * stats_watchers)161 void XdsStatsWatcher::GetCurrentRpcStats(
162     LoadBalancerAccumulatedStatsResponse* response,
163     StatsWatchers* stats_watchers) {
164   std::unique_lock<std::mutex> lock(m_);
165   response->CopyFrom(accumulated_stats_);
166   // TODO(someone): delete deprecated stats below when the test is no
167   // longer using them.
168   // NOLINTBEGIN(clang-diagnostic-deprecated-declarations)
169   auto& response_rpcs_started_by_method =
170       *response->mutable_num_rpcs_started_by_method();
171   auto& response_rpcs_succeeded_by_method =
172       *response->mutable_num_rpcs_succeeded_by_method();
173   auto& response_rpcs_failed_by_method =
174       *response->mutable_num_rpcs_failed_by_method();
175   // NOLINTEND(clang-diagnostic-deprecated-declarations)
176   for (const auto& rpc_by_type : rpcs_by_type_) {
177     auto total_succeeded = 0;
178     for (const auto& rpc_by_peer : rpc_by_type.second) {
179       total_succeeded += rpc_by_peer.second;
180     }
181     response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
182         rpc_by_type.first)] = total_succeeded;
183     response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
184         rpc_by_type.first)] =
185         stats_watchers->global_request_id_by_type[rpc_by_type.first];
186     response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
187         rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
188   }
189 }
190 
191 }  // namespace testing
192 }  // namespace grpc
193