1 // 2 // Copyright 2017 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H 18 #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H 19 20 #include <grpcpp/support/status.h> 21 22 #include <deque> 23 #include <set> 24 #include <string> 25 #include <thread> 26 #include <vector> 27 28 #include "absl/log/check.h" 29 #include "absl/log/log.h" 30 #include "absl/types/optional.h" 31 #include "envoy/config/cluster/v3/cluster.pb.h" 32 #include "envoy/config/endpoint/v3/endpoint.pb.h" 33 #include "envoy/config/listener/v3/listener.pb.h" 34 #include "envoy/config/route/v3/route.pb.h" 35 #include "src/core/lib/address_utils/parse_address.h" 36 #include "src/core/util/crash.h" 37 #include "src/core/util/sync.h" 38 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h" 39 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h" 40 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h" 41 #include "test/core/test_util/test_config.h" 42 #include "test/cpp/end2end/counted_service.h" 43 44 namespace grpc { 45 namespace testing { 46 47 constexpr char kLdsTypeUrl[] = 48 "type.googleapis.com/envoy.config.listener.v3.Listener"; 49 constexpr char kRdsTypeUrl[] = 50 "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; 51 constexpr char kCdsTypeUrl[] = 52 "type.googleapis.com/envoy.config.cluster.v3.Cluster"; 53 constexpr char kEdsTypeUrl[] = 54 "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; 55 56 // An ADS service implementation. 57 class AdsServiceImpl 58 : public CountedService< 59 ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service>, 60 public std::enable_shared_from_this<AdsServiceImpl> { 61 public: 62 using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest; 63 using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse; 64 65 // State for a given xDS resource type. 66 struct ResponseState { 67 enum State { 68 ACKED, // ACK received. 69 NACKED, // NACK received; error_message will contain the error. 70 }; 71 State state = ACKED; 72 std::string error_message; 73 }; 74 75 explicit AdsServiceImpl( 76 std::function<void(const DiscoveryRequest& request)> check_first_request = 77 nullptr, 78 std::function<void(absl::StatusCode)> check_nack_status_code = nullptr, 79 absl::string_view debug_label = "") check_first_request_(std::move (check_first_request))80 : check_first_request_(std::move(check_first_request)), 81 check_nack_status_code_(std::move(check_nack_status_code)), 82 debug_label_(absl::StrFormat( 83 "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {} 84 set_wrap_resources(bool wrap_resources)85 void set_wrap_resources(bool wrap_resources) { 86 grpc_core::MutexLock lock(&ads_mu_); 87 wrap_resources_ = wrap_resources; 88 } 89 90 // Sets a resource to a particular value, overwriting any previous value. 91 void SetResource(google::protobuf::Any resource, const std::string& type_url, 92 const std::string& name); 93 94 // Removes a resource from the server's state. 95 void UnsetResource(const std::string& type_url, const std::string& name); 96 SetLdsResource(const::envoy::config::listener::v3::Listener & listener)97 void SetLdsResource(const ::envoy::config::listener::v3::Listener& listener) { 98 google::protobuf::Any resource; 99 resource.PackFrom(listener); 100 SetResource(std::move(resource), kLdsTypeUrl, listener.name()); 101 } 102 SetRdsResource(const::envoy::config::route::v3::RouteConfiguration & route)103 void SetRdsResource( 104 const ::envoy::config::route::v3::RouteConfiguration& route) { 105 google::protobuf::Any resource; 106 resource.PackFrom(route); 107 SetResource(std::move(resource), kRdsTypeUrl, route.name()); 108 } 109 SetCdsResource(const::envoy::config::cluster::v3::Cluster & cluster)110 void SetCdsResource(const ::envoy::config::cluster::v3::Cluster& cluster) { 111 google::protobuf::Any resource; 112 resource.PackFrom(cluster); 113 SetResource(std::move(resource), kCdsTypeUrl, cluster.name()); 114 } 115 SetEdsResource(const::envoy::config::endpoint::v3::ClusterLoadAssignment & assignment)116 void SetEdsResource( 117 const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) { 118 google::protobuf::Any resource; 119 resource.PackFrom(assignment); 120 SetResource(std::move(resource), kEdsTypeUrl, assignment.cluster_name()); 121 } 122 123 // Tells the server to ignore requests from the client for a given 124 // resource type. IgnoreResourceType(const std::string & type_url)125 void IgnoreResourceType(const std::string& type_url) { 126 grpc_core::MutexLock lock(&ads_mu_); 127 resource_types_to_ignore_.emplace(type_url); 128 } 129 130 // Sets a callback to be invoked on request messages with respoonse_nonce 131 // set. The callback is passed the resource type and version. SetCheckVersionCallback(std::function<void (absl::string_view,int)> check_version_callback)132 void SetCheckVersionCallback( 133 std::function<void(absl::string_view, int)> check_version_callback) { 134 grpc_core::MutexLock lock(&ads_mu_); 135 check_version_callback_ = std::move(check_version_callback); 136 } 137 138 // Get the list of response state for each resource type. 139 // TODO(roth): Consider adding an absl::Notification-based mechanism 140 // here to avoid the need for tests to poll the response state. GetResponseState(const std::string & type_url)141 absl::optional<ResponseState> GetResponseState(const std::string& type_url) { 142 grpc_core::MutexLock lock(&ads_mu_); 143 if (resource_type_response_state_[type_url].empty()) { 144 return absl::nullopt; 145 } 146 auto response = resource_type_response_state_[type_url].front(); 147 resource_type_response_state_[type_url].pop_front(); 148 return response; 149 } lds_response_state()150 absl::optional<ResponseState> lds_response_state() { 151 return GetResponseState(kLdsTypeUrl); 152 } rds_response_state()153 absl::optional<ResponseState> rds_response_state() { 154 return GetResponseState(kRdsTypeUrl); 155 } cds_response_state()156 absl::optional<ResponseState> cds_response_state() { 157 return GetResponseState(kCdsTypeUrl); 158 } eds_response_state()159 absl::optional<ResponseState> eds_response_state() { 160 return GetResponseState(kEdsTypeUrl); 161 } 162 163 // Starts the service. 164 void Start(); 165 166 // Shuts down the service. 167 void Shutdown(); 168 169 // Returns the peer names of clients currently connected to the service. clients()170 std::set<std::string> clients() { 171 grpc_core::MutexLock lock(&clients_mu_); 172 return clients_; 173 } 174 ForceADSFailure(Status status)175 void ForceADSFailure(Status status) { 176 grpc_core::MutexLock lock(&ads_mu_); 177 forced_ads_failure_ = std::move(status); 178 } 179 ClearADSFailure()180 void ClearADSFailure() { 181 grpc_core::MutexLock lock(&ads_mu_); 182 forced_ads_failure_ = absl::nullopt; 183 } 184 185 private: 186 // A queue of resource type/name pairs that have changed since the client 187 // subscribed to them. 188 using UpdateQueue = std::deque< 189 std::pair<std::string /* type url */, std::string /* resource name */>>; 190 191 // A struct representing a client's subscription to a particular resource. 192 struct SubscriptionState { 193 // The queue upon which to place updates when the resource is updated. 194 UpdateQueue* update_queue; 195 }; 196 197 // A struct representing the a client's subscription to all the resources. 198 using SubscriptionNameMap = 199 std::map<std::string /* resource_name */, SubscriptionState>; 200 using SubscriptionMap = 201 std::map<std::string /* type_url */, SubscriptionNameMap>; 202 203 // Sent state for a given resource type. 204 struct SentState { 205 int nonce = 0; 206 int resource_type_version = 0; 207 }; 208 209 // A struct representing the current state for an individual resource. 210 struct ResourceState { 211 // The resource itself, if present. 212 absl::optional<google::protobuf::Any> resource; 213 // The resource type version that this resource was last updated in. 214 int resource_type_version = 0; 215 // A list of subscriptions to this resource. 216 std::set<SubscriptionState*> subscriptions; 217 }; 218 219 // The current state for all individual resources of a given type. 220 using ResourceNameMap = 221 std::map<std::string /* resource_name */, ResourceState>; 222 223 struct ResourceTypeState { 224 int resource_type_version = 0; 225 ResourceNameMap resource_name_map; 226 }; 227 228 using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>; 229 230 using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>; 231 StreamAggregatedResources(ServerContext * context,Stream * stream)232 Status StreamAggregatedResources(ServerContext* context, 233 Stream* stream) override { 234 LOG(INFO) << "ADS[" << debug_label_ 235 << "]: StreamAggregatedResources starts"; 236 { 237 grpc_core::MutexLock lock(&ads_mu_); 238 if (forced_ads_failure_.has_value()) { 239 LOG(INFO) << "ADS[" << debug_label_ 240 << "]: StreamAggregatedResources forcing early failure " 241 "with status code: " 242 << forced_ads_failure_.value().error_code() << ", message: " 243 << forced_ads_failure_.value().error_message(); 244 return forced_ads_failure_.value(); 245 } 246 } 247 AddClient(context->peer()); 248 // Take a reference of the AdsServiceImpl object, which will go 249 // out of scope when this request handler returns. This ensures 250 // that the parent won't be destroyed until this stream is complete. 251 std::shared_ptr<AdsServiceImpl> ads_service_impl = shared_from_this(); 252 // Resources (type/name pairs) that have changed since the client 253 // subscribed to them. 254 UpdateQueue update_queue; 255 // Resources that the client will be subscribed to keyed by resource type 256 // url. 257 SubscriptionMap subscription_map; 258 // Sent state for each resource type. 259 std::map<std::string /*type_url*/, SentState> sent_state_map; 260 // Spawn a thread to read requests from the stream. 261 // Requests will be delivered to this thread in a queue. 262 std::deque<DiscoveryRequest> requests; 263 bool stream_closed = false; 264 std::thread reader(std::bind(&AdsServiceImpl::BlockingRead, this, stream, 265 &requests, &stream_closed)); 266 // Main loop to process requests and updates. 267 while (true) { 268 // Boolean to keep track if the loop received any work to do: a 269 // request or an update; regardless whether a response was actually 270 // sent out. 271 bool did_work = false; 272 // Look for new requests and decide what to handle. 273 absl::optional<DiscoveryResponse> response; 274 { 275 grpc_core::MutexLock lock(&ads_mu_); 276 // If the stream has been closed or our parent is being shut 277 // down, stop immediately. 278 if (stream_closed || ads_done_) break; 279 // Otherwise, see if there's a request to read from the queue. 280 if (!requests.empty()) { 281 DiscoveryRequest request = std::move(requests.front()); 282 requests.pop_front(); 283 did_work = true; 284 LOG(INFO) << "ADS[" << debug_label_ << "]: Received request for type " 285 << request.type_url() << " with content " 286 << request.DebugString(); 287 SentState& sent_state = sent_state_map[request.type_url()]; 288 // Process request. 289 ProcessRequest(request, &update_queue, &subscription_map, &sent_state, 290 &response); 291 } 292 } 293 if (response.has_value()) { 294 LOG(INFO) << "ADS[" << debug_label_ 295 << "]: Sending response: " << response->DebugString(); 296 stream->Write(response.value()); 297 } 298 response.reset(); 299 // Look for updates and decide what to handle. 300 { 301 grpc_core::MutexLock lock(&ads_mu_); 302 if (!update_queue.empty()) { 303 const std::string resource_type = 304 std::move(update_queue.front().first); 305 const std::string resource_name = 306 std::move(update_queue.front().second); 307 update_queue.pop_front(); 308 did_work = true; 309 SentState& sent_state = sent_state_map[resource_type]; 310 ProcessUpdate(resource_type, resource_name, &subscription_map, 311 &sent_state, &response); 312 } 313 } 314 if (response.has_value()) { 315 LOG(INFO) << "ADS[" << debug_label_ 316 << "]: Sending update response: " << response->DebugString(); 317 stream->Write(response.value()); 318 } 319 { 320 grpc_core::MutexLock lock(&ads_mu_); 321 if (ads_done_) { 322 break; 323 } 324 } 325 // If we didn't find anything to do, delay before the next loop 326 // iteration; otherwise, check whether we should exit and then 327 // immediately continue. 328 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10)); 329 } 330 // Done with main loop. Clean up before returning. 331 // Join reader thread. 332 reader.join(); 333 // Clean up any subscriptions that were still active when the call 334 // finished. 335 { 336 grpc_core::MutexLock lock(&ads_mu_); 337 for (auto& p : subscription_map) { 338 const std::string& type_url = p.first; 339 SubscriptionNameMap& subscription_name_map = p.second; 340 for (auto& q : subscription_name_map) { 341 const std::string& resource_name = q.first; 342 SubscriptionState& subscription_state = q.second; 343 ResourceNameMap& resource_name_map = 344 resource_map_[type_url].resource_name_map; 345 ResourceState& resource_state = resource_name_map[resource_name]; 346 resource_state.subscriptions.erase(&subscription_state); 347 } 348 } 349 } 350 LOG(INFO) << "ADS[" << debug_label_ << "]: StreamAggregatedResources done"; 351 RemoveClient(context->peer()); 352 return Status::OK; 353 } 354 355 // Processes a response read from the client. 356 // Populates response if needed. ProcessRequest(const DiscoveryRequest & request,UpdateQueue * update_queue,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)357 void ProcessRequest(const DiscoveryRequest& request, 358 UpdateQueue* update_queue, 359 SubscriptionMap* subscription_map, SentState* sent_state, 360 absl::optional<DiscoveryResponse>* response) 361 ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) { 362 // Check the nonce sent by the client, if any. 363 // (This will be absent on the first request on a stream.) 364 if (request.response_nonce().empty()) { 365 int client_resource_type_version = 0; 366 if (!request.version_info().empty()) { 367 CHECK(absl::SimpleAtoi(request.version_info(), 368 &client_resource_type_version)); 369 } 370 if (check_version_callback_ != nullptr) { 371 check_version_callback_(request.type_url(), 372 client_resource_type_version); 373 } 374 } else { 375 int client_nonce; 376 CHECK(absl::SimpleAtoi(request.response_nonce(), &client_nonce)); 377 // Check for ACK or NACK. 378 ResponseState response_state; 379 if (!request.has_error_detail()) { 380 response_state.state = ResponseState::ACKED; 381 LOG(INFO) << "ADS[" << debug_label_ 382 << "]: client ACKed resource_type=" << request.type_url() 383 << " version=" << request.version_info(); 384 } else { 385 response_state.state = ResponseState::NACKED; 386 if (check_nack_status_code_ != nullptr) { 387 check_nack_status_code_( 388 static_cast<absl::StatusCode>(request.error_detail().code())); 389 } 390 response_state.error_message = request.error_detail().message(); 391 LOG(INFO) << "ADS[" << debug_label_ 392 << "]: client NACKed resource_type=" << request.type_url() 393 << " version=" << request.version_info() << ": " 394 << response_state.error_message; 395 } 396 resource_type_response_state_[request.type_url()].emplace_back( 397 std::move(response_state)); 398 // Ignore requests with stale nonces. 399 if (client_nonce < sent_state->nonce) return; 400 } 401 // Ignore resource types as requested by tests. 402 if (resource_types_to_ignore_.find(request.type_url()) != 403 resource_types_to_ignore_.end()) { 404 return; 405 } 406 // Look at all the resource names in the request. 407 auto& subscription_name_map = (*subscription_map)[request.type_url()]; 408 auto& resource_type_state = resource_map_[request.type_url()]; 409 auto& resource_name_map = resource_type_state.resource_name_map; 410 std::set<std::string> resources_in_current_request; 411 std::set<std::string> resources_added_to_response; 412 for (const std::string& resource_name : request.resource_names()) { 413 resources_in_current_request.emplace(resource_name); 414 auto& subscription_state = subscription_name_map[resource_name]; 415 auto& resource_state = resource_name_map[resource_name]; 416 // Subscribe if needed. 417 // Send the resource in the response if either (a) this is 418 // a new subscription or (b) there is an updated version of 419 // this resource to send. 420 if (MaybeSubscribe(request.type_url(), resource_name, &subscription_state, 421 &resource_state, update_queue) || 422 ClientNeedsResourceUpdate(resource_type_state, resource_state, 423 sent_state->resource_type_version)) { 424 LOG(INFO) << "ADS[" << debug_label_ 425 << "]: Sending update for type=" << request.type_url() 426 << " name=" << resource_name; 427 resources_added_to_response.emplace(resource_name); 428 if (!response->has_value()) response->emplace(); 429 if (resource_state.resource.has_value()) { 430 auto* resource = (*response)->add_resources(); 431 resource->CopyFrom(resource_state.resource.value()); 432 if (wrap_resources_) { 433 envoy::service::discovery::v3::Resource resource_wrapper; 434 *resource_wrapper.mutable_resource() = std::move(*resource); 435 resource->PackFrom(resource_wrapper); 436 } 437 } 438 } else { 439 LOG(INFO) << "ADS[" << debug_label_ 440 << "]: client does not need update for type=" 441 << request.type_url() << " name=" << resource_name; 442 } 443 } 444 // Process unsubscriptions for any resource no longer 445 // present in the request's resource list. 446 ProcessUnsubscriptions(request.type_url(), resources_in_current_request, 447 &subscription_name_map, &resource_name_map); 448 // Construct response if needed. 449 if (!resources_added_to_response.empty()) { 450 CompleteBuildingDiscoveryResponse( 451 request.type_url(), resource_type_state.resource_type_version, 452 subscription_name_map, resources_added_to_response, sent_state, 453 &response->value()); 454 } 455 } 456 457 // Processes a resource update from the test. 458 // Populates response if needed. ProcessUpdate(const std::string & resource_type,const std::string & resource_name,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)459 void ProcessUpdate(const std::string& resource_type, 460 const std::string& resource_name, 461 SubscriptionMap* subscription_map, SentState* sent_state, 462 absl::optional<DiscoveryResponse>* response) 463 ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) { 464 LOG(INFO) << "ADS[" << debug_label_ 465 << "]: Received update for type=" << resource_type 466 << " name=" << resource_name; 467 auto& subscription_name_map = (*subscription_map)[resource_type]; 468 auto& resource_type_state = resource_map_[resource_type]; 469 auto& resource_name_map = resource_type_state.resource_name_map; 470 auto it = subscription_name_map.find(resource_name); 471 if (it != subscription_name_map.end()) { 472 ResourceState& resource_state = resource_name_map[resource_name]; 473 if (ClientNeedsResourceUpdate(resource_type_state, resource_state, 474 sent_state->resource_type_version)) { 475 LOG(INFO) << "ADS[" << debug_label_ 476 << "]: Sending update for type=" << resource_type 477 << " name=" << resource_name; 478 response->emplace(); 479 if (resource_state.resource.has_value()) { 480 auto* resource = (*response)->add_resources(); 481 resource->CopyFrom(resource_state.resource.value()); 482 } 483 CompleteBuildingDiscoveryResponse( 484 resource_type, resource_type_state.resource_type_version, 485 subscription_name_map, {resource_name}, sent_state, 486 &response->value()); 487 } 488 } 489 } 490 491 // Starting a thread to do blocking read on the stream until cancel. BlockingRead(Stream * stream,std::deque<DiscoveryRequest> * requests,bool * stream_closed)492 void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests, 493 bool* stream_closed) { 494 DiscoveryRequest request; 495 bool seen_first_request = false; 496 while (stream->Read(&request)) { 497 if (!seen_first_request) { 498 if (check_first_request_ != nullptr) { 499 check_first_request_(request); 500 } 501 seen_first_request = true; 502 } 503 { 504 grpc_core::MutexLock lock(&ads_mu_); 505 requests->emplace_back(std::move(request)); 506 } 507 } 508 LOG(INFO) << "ADS[" << debug_label_ << "]: Null read, stream closed"; 509 grpc_core::MutexLock lock(&ads_mu_); 510 *stream_closed = true; 511 } 512 513 // Completing the building a DiscoveryResponse by adding common information 514 // for all resources and by adding all subscribed resources for LDS and CDS. CompleteBuildingDiscoveryResponse(const std::string & resource_type,const int version,const SubscriptionNameMap & subscription_name_map,const std::set<std::string> & resources_added_to_response,SentState * sent_state,DiscoveryResponse * response)515 void CompleteBuildingDiscoveryResponse( 516 const std::string& resource_type, const int version, 517 const SubscriptionNameMap& subscription_name_map, 518 const std::set<std::string>& resources_added_to_response, 519 SentState* sent_state, DiscoveryResponse* response) 520 ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) { 521 response->set_type_url(resource_type); 522 response->set_version_info(std::to_string(version)); 523 response->set_nonce(std::to_string(++sent_state->nonce)); 524 if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { 525 // For LDS and CDS we must send back all subscribed resources 526 // (even the unchanged ones) 527 for (const auto& p : subscription_name_map) { 528 const std::string& resource_name = p.first; 529 if (resources_added_to_response.find(resource_name) == 530 resources_added_to_response.end()) { 531 ResourceNameMap& resource_name_map = 532 resource_map_[resource_type].resource_name_map; 533 const ResourceState& resource_state = 534 resource_name_map[resource_name]; 535 if (resource_state.resource.has_value()) { 536 auto* resource = response->add_resources(); 537 resource->CopyFrom(resource_state.resource.value()); 538 } 539 } 540 } 541 } 542 sent_state->resource_type_version = version; 543 } 544 545 // Checks whether the client needs to receive a newer version of 546 // the resource. 547 static bool ClientNeedsResourceUpdate( 548 const ResourceTypeState& resource_type_state, 549 const ResourceState& resource_state, int client_resource_type_version); 550 551 // Subscribes to a resource if not already subscribed: 552 // 1. Sets the update_queue field in subscription_state. 553 // 2. Adds subscription_state to resource_state->subscriptions. 554 bool MaybeSubscribe(const std::string& resource_type, 555 const std::string& resource_name, 556 SubscriptionState* subscription_state, 557 ResourceState* resource_state, UpdateQueue* update_queue); 558 559 // Removes subscriptions for resources no longer present in the 560 // current request. 561 void ProcessUnsubscriptions( 562 const std::string& resource_type, 563 const std::set<std::string>& resources_in_current_request, 564 SubscriptionNameMap* subscription_name_map, 565 ResourceNameMap* resource_name_map); 566 AddClient(const std::string & client)567 void AddClient(const std::string& client) { 568 grpc_core::MutexLock lock(&clients_mu_); 569 clients_.insert(client); 570 } 571 RemoveClient(const std::string & client)572 void RemoveClient(const std::string& client) { 573 grpc_core::MutexLock lock(&clients_mu_); 574 clients_.erase(client); 575 } 576 577 std::function<void(const DiscoveryRequest& request)> check_first_request_; 578 std::function<void(absl::StatusCode)> check_nack_status_code_; 579 std::string debug_label_; 580 581 grpc_core::CondVar ads_cond_; 582 grpc_core::Mutex ads_mu_; 583 bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false; 584 std::map<std::string /* type_url */, std::deque<ResponseState>> 585 resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_); 586 std::set<std::string /*resource_type*/> resource_types_to_ignore_ 587 ABSL_GUARDED_BY(ads_mu_); 588 std::function<void(absl::string_view, int)> check_version_callback_ 589 ABSL_GUARDED_BY(ads_mu_); 590 // An instance data member containing the current state of all resources. 591 // Note that an entry will exist whenever either of the following is true: 592 // - The resource exists (i.e., has been created by SetResource() and has not 593 // yet been destroyed by UnsetResource()). 594 // - There is at least one subscription for the resource. 595 ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_); 596 absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_); 597 bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false; 598 599 grpc_core::Mutex clients_mu_; 600 std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_); 601 }; 602 603 // An LRS service implementation. 604 class LrsServiceImpl 605 : public CountedService< 606 ::envoy::service::load_stats::v3::LoadReportingService::Service>, 607 public std::enable_shared_from_this<LrsServiceImpl> { 608 public: 609 using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest; 610 using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse; 611 612 // Stats reported by client. 613 class ClientStats { 614 public: 615 // Stats for a given locality. 616 struct LocalityStats { 617 struct LoadMetric { 618 uint64_t num_requests_finished_with_metric = 0; 619 double total_metric_value = 0; 620 621 LoadMetric() = default; 622 623 // Works for both EndpointLoadMetricStats and 624 // UnnamedEndpointLoadMetricStats. 625 template <typename T> LoadMetricLocalityStats::LoadMetric626 explicit LoadMetric(const T& stats) 627 : num_requests_finished_with_metric( 628 stats.num_requests_finished_with_metric()), 629 total_metric_value(stats.total_metric_value()) {} 630 631 LoadMetric& operator+=(const LoadMetric& other) { 632 num_requests_finished_with_metric += 633 other.num_requests_finished_with_metric; 634 total_metric_value += other.total_metric_value; 635 return *this; 636 } 637 }; 638 LocalityStatsLocalityStats639 LocalityStats() {} 640 641 // Converts from proto message class. LocalityStatsLocalityStats642 explicit LocalityStats( 643 const ::envoy::config::endpoint::v3::UpstreamLocalityStats& 644 upstream_locality_stats) 645 : total_successful_requests( 646 upstream_locality_stats.total_successful_requests()), 647 total_requests_in_progress( 648 upstream_locality_stats.total_requests_in_progress()), 649 total_error_requests( 650 upstream_locality_stats.total_error_requests()), 651 total_issued_requests( 652 upstream_locality_stats.total_issued_requests()), 653 cpu_utilization(upstream_locality_stats.cpu_utilization()), 654 mem_utilization(upstream_locality_stats.mem_utilization()), 655 application_utilization( 656 upstream_locality_stats.application_utilization()) { 657 for (const auto& s : upstream_locality_stats.load_metric_stats()) { 658 load_metrics[s.metric_name()] += LoadMetric(s); 659 } 660 } 661 662 LocalityStats& operator+=(const LocalityStats& other) { 663 total_successful_requests += other.total_successful_requests; 664 total_requests_in_progress += other.total_requests_in_progress; 665 total_error_requests += other.total_error_requests; 666 total_issued_requests += other.total_issued_requests; 667 cpu_utilization += other.cpu_utilization; 668 mem_utilization += other.mem_utilization; 669 application_utilization += other.application_utilization; 670 for (const auto& p : other.load_metrics) { 671 load_metrics[p.first] += p.second; 672 } 673 return *this; 674 } 675 676 uint64_t total_successful_requests = 0; 677 uint64_t total_requests_in_progress = 0; 678 uint64_t total_error_requests = 0; 679 uint64_t total_issued_requests = 0; 680 LoadMetric cpu_utilization; 681 LoadMetric mem_utilization; 682 LoadMetric application_utilization; 683 std::map<std::string, LoadMetric> load_metrics; 684 }; 685 ClientStats()686 ClientStats() {} 687 688 // Converts from proto message class. ClientStats(const::envoy::config::endpoint::v3::ClusterStats & cluster_stats)689 explicit ClientStats( 690 const ::envoy::config::endpoint::v3::ClusterStats& cluster_stats) 691 : cluster_name_(cluster_stats.cluster_name()), 692 eds_service_name_(cluster_stats.cluster_service_name()), 693 total_dropped_requests_(cluster_stats.total_dropped_requests()) { 694 for (const auto& input_locality_stats : 695 cluster_stats.upstream_locality_stats()) { 696 locality_stats_.emplace(input_locality_stats.locality().sub_zone(), 697 LocalityStats(input_locality_stats)); 698 } 699 for (const auto& input_dropped_requests : 700 cluster_stats.dropped_requests()) { 701 dropped_requests_.emplace(input_dropped_requests.category(), 702 input_dropped_requests.dropped_count()); 703 } 704 } 705 cluster_name()706 const std::string& cluster_name() const { return cluster_name_; } eds_service_name()707 const std::string& eds_service_name() const { return eds_service_name_; } 708 locality_stats()709 const std::map<std::string, LocalityStats>& locality_stats() const { 710 return locality_stats_; 711 } 712 713 uint64_t total_successful_requests() const; 714 uint64_t total_requests_in_progress() const; 715 uint64_t total_error_requests() const; 716 uint64_t total_issued_requests() const; 717 total_dropped_requests()718 uint64_t total_dropped_requests() const { return total_dropped_requests_; } 719 720 uint64_t dropped_requests(const std::string& category) const; 721 722 ClientStats& operator+=(const ClientStats& other); 723 724 private: 725 std::string cluster_name_; 726 std::string eds_service_name_; 727 std::map<std::string, LocalityStats> locality_stats_; 728 uint64_t total_dropped_requests_ = 0; 729 std::map<std::string, uint64_t> dropped_requests_; 730 }; 731 732 LrsServiceImpl(int client_load_reporting_interval_seconds, 733 std::set<std::string> cluster_names, 734 std::function<void()> stream_started_callback = nullptr, 735 std::function<void(const LoadStatsRequest& request)> 736 check_first_request = nullptr, 737 absl::string_view debug_label = "") client_load_reporting_interval_seconds_(client_load_reporting_interval_seconds)738 : client_load_reporting_interval_seconds_( 739 client_load_reporting_interval_seconds), 740 cluster_names_(std::move(cluster_names)), 741 stream_started_callback_(std::move(stream_started_callback)), 742 check_first_request_(std::move(check_first_request)), 743 debug_label_(absl::StrFormat( 744 "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {} 745 746 // Must be called before the LRS call is started. set_send_all_clusters(bool send_all_clusters)747 void set_send_all_clusters(bool send_all_clusters) { 748 send_all_clusters_ = send_all_clusters; 749 } set_cluster_names(const std::set<std::string> & cluster_names)750 void set_cluster_names(const std::set<std::string>& cluster_names) { 751 cluster_names_ = cluster_names; 752 } 753 754 void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_, load_report_mu_); 755 756 void Shutdown(); 757 758 // Returns an empty vector if the timeout elapses with no load report. 759 // TODO(roth): Change the default here to a finite duration and verify 760 // that it doesn't cause failures in any existing tests. 761 std::vector<ClientStats> WaitForLoadReport( 762 absl::Duration timeout = absl::InfiniteDuration()); 763 764 private: 765 using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>; 766 StreamLoadStats(ServerContext *,Stream * stream)767 Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override { 768 LOG(INFO) << "LRS[" << debug_label_ << "]: StreamLoadStats starts"; 769 if (stream_started_callback_ != nullptr) stream_started_callback_(); 770 // Take a reference of the LrsServiceImpl object, reference will go 771 // out of scope after this method exits. 772 std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this(); 773 // Read initial request. 774 LoadStatsRequest request; 775 if (stream->Read(&request)) { 776 IncreaseRequestCount(); 777 if (check_first_request_ != nullptr) check_first_request_(request); 778 // Send initial response. 779 LoadStatsResponse response; 780 if (send_all_clusters_) { 781 response.set_send_all_clusters(true); 782 } else { 783 for (const std::string& cluster_name : cluster_names_) { 784 response.add_clusters(cluster_name); 785 } 786 } 787 response.mutable_load_reporting_interval()->set_seconds( 788 client_load_reporting_interval_seconds_ * 789 grpc_test_slowdown_factor()); 790 stream->Write(response); 791 IncreaseResponseCount(); 792 // Wait for report. 793 request.Clear(); 794 while (stream->Read(&request)) { 795 LOG(INFO) << "LRS[" << debug_label_ 796 << "]: received client load report message: " 797 << request.DebugString(); 798 std::vector<ClientStats> stats; 799 for (const auto& cluster_stats : request.cluster_stats()) { 800 stats.emplace_back(cluster_stats); 801 } 802 grpc_core::MutexLock lock(&load_report_mu_); 803 result_queue_.emplace_back(std::move(stats)); 804 if (load_report_cond_ != nullptr) { 805 load_report_cond_->Signal(); 806 } 807 } 808 // Wait until notified done. 809 grpc_core::MutexLock lock(&lrs_mu_); 810 while (!lrs_done_) { 811 lrs_cv_.Wait(&lrs_mu_); 812 } 813 } 814 LOG(INFO) << "LRS[" << debug_label_ << "]: StreamLoadStats done"; 815 return Status::OK; 816 } 817 818 const int client_load_reporting_interval_seconds_; 819 bool send_all_clusters_ = false; 820 std::set<std::string> cluster_names_; 821 std::function<void()> stream_started_callback_; 822 std::function<void(const LoadStatsRequest& request)> check_first_request_; 823 std::string debug_label_; 824 825 grpc_core::CondVar lrs_cv_; 826 grpc_core::Mutex lrs_mu_; 827 bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_) = false; 828 829 grpc_core::Mutex load_report_mu_; 830 grpc_core::CondVar* load_report_cond_ ABSL_GUARDED_BY(load_report_mu_) = 831 nullptr; 832 std::deque<std::vector<ClientStats>> result_queue_ 833 ABSL_GUARDED_BY(load_report_mu_); 834 }; 835 836 } // namespace testing 837 } // namespace grpc 838 839 #endif // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H 840