1 // 2 // 3 // Copyright 2023 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H 20 #define GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H 21 22 #include <grpcpp/grpcpp.h> 23 24 #include <atomic> 25 #include <chrono> 26 #include <condition_variable> 27 #include <deque> 28 #include <map> 29 #include <mutex> 30 #include <set> 31 #include <sstream> 32 #include <string> 33 #include <thread> 34 #include <unordered_set> 35 #include <vector> 36 37 #include "absl/status/status.h" 38 #include "absl/types/span.h" 39 #include "src/proto/grpc/testing/empty.pb.h" 40 #include "src/proto/grpc/testing/messages.pb.h" 41 42 namespace grpc { 43 namespace testing { 44 45 class XdsStatsWatcher; 46 47 struct AsyncClientCallResult { 48 Empty empty_response; 49 SimpleResponse simple_response; 50 Status status; 51 int saved_request_id; 52 ClientConfigureRequest::RpcType rpc_type; 53 }; 54 55 struct StatsWatchers { 56 // Unique ID for each outgoing RPC 57 int global_request_id = 0; 58 // Unique ID for each outgoing RPC by RPC method type 59 std::map<int, int> global_request_id_by_type; 60 // Stores a set of watchers that should be notified upon outgoing RPC 61 // completion 62 std::set<XdsStatsWatcher*> watchers; 63 // Global watcher for accumululated stats. 64 XdsStatsWatcher* global_watcher; 65 // Mutex for global_request_id and watchers 66 std::mutex mu; 67 }; 68 69 /// Records the remote peer distribution for a given range of RPCs. 70 class XdsStatsWatcher { 71 public: 72 XdsStatsWatcher(int start_id, int end_id, 73 absl::Span<const std::string> metadata_keys); 74 75 // Upon the completion of an RPC, we will look at the request_id, the 76 // rpc_type, and the peer the RPC was sent to in order to count 77 // this RPC into the right stats bin. 78 void RpcCompleted( 79 const AsyncClientCallResult& call, const std::string& peer, 80 const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata, 81 const std::multimap<grpc::string_ref, grpc::string_ref>& 82 trailing_metadata); 83 84 LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec); 85 86 void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response, 87 StatsWatchers* stats_watchers); 88 89 private: 90 int start_id_; 91 int end_id_; 92 int rpcs_needed_; 93 int no_remote_peer_ = 0; 94 std::map<int, int> no_remote_peer_by_type_; 95 // A map of stats keyed by peer name. 96 std::map<std::string, int> rpcs_by_peer_; 97 // A two-level map of stats keyed at top level by RPC method and second level 98 // by peer name. 99 std::map<int, std::map<std::string, int>> rpcs_by_type_; 100 // Storing accumulated stats in the response proto format. 101 LoadBalancerAccumulatedStatsResponse accumulated_stats_; 102 std::mutex m_; 103 std::condition_variable cv_; 104 std::unordered_set<std::string> metadata_keys_; 105 bool include_all_metadata_ = false; 106 std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer> 107 metadata_by_peer_; 108 }; 109 110 } // namespace testing 111 } // namespace grpc 112 113 #endif // GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H 114