• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
18 #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
19 
#include <grpcpp/support/status.h>

#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/types/optional.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/config/endpoint/v3/endpoint.pb.h"
#include "envoy/config/listener/v3/listener.pb.h"
#include "envoy/config/route/v3/route.pb.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/util/crash.h"
#include "src/core/util/sync.h"
#include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
#include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/end2end/counted_service.h"
43 
44 namespace grpc {
45 namespace testing {
46 
// Type URLs of the four xDS resource types handled by AdsServiceImpl.
// They are used as the type_url of packed resources and responses, and as
// the keys identifying each resource type in the server's state.
constexpr char kLdsTypeUrl[] =
    "type.googleapis.com/" "envoy.config.listener.v3.Listener";
constexpr char kRdsTypeUrl[] =
    "type.googleapis.com/" "envoy.config.route.v3.RouteConfiguration";
constexpr char kCdsTypeUrl[] =
    "type.googleapis.com/" "envoy.config.cluster.v3.Cluster";
constexpr char kEdsTypeUrl[] =
    "type.googleapis.com/" "envoy.config.endpoint.v3.ClusterLoadAssignment";
55 
56 // An ADS service implementation.
57 class AdsServiceImpl
58     : public CountedService<
59           ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service>,
60       public std::enable_shared_from_this<AdsServiceImpl> {
61  public:
62   using DiscoveryRequest = ::envoy::service::discovery::v3::DiscoveryRequest;
63   using DiscoveryResponse = ::envoy::service::discovery::v3::DiscoveryResponse;
64 
65   // State for a given xDS resource type.
66   struct ResponseState {
67     enum State {
68       ACKED,   // ACK received.
69       NACKED,  // NACK received; error_message will contain the error.
70     };
71     State state = ACKED;
72     std::string error_message;
73   };
74 
75   explicit AdsServiceImpl(
76       std::function<void(const DiscoveryRequest& request)> check_first_request =
77           nullptr,
78       std::function<void(absl::StatusCode)> check_nack_status_code = nullptr,
79       absl::string_view debug_label = "")
check_first_request_(std::move (check_first_request))80       : check_first_request_(std::move(check_first_request)),
81         check_nack_status_code_(std::move(check_nack_status_code)),
82         debug_label_(absl::StrFormat(
83             "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {}
84 
set_wrap_resources(bool wrap_resources)85   void set_wrap_resources(bool wrap_resources) {
86     grpc_core::MutexLock lock(&ads_mu_);
87     wrap_resources_ = wrap_resources;
88   }
89 
90   // Sets a resource to a particular value, overwriting any previous value.
91   void SetResource(google::protobuf::Any resource, const std::string& type_url,
92                    const std::string& name);
93 
94   // Removes a resource from the server's state.
95   void UnsetResource(const std::string& type_url, const std::string& name);
96 
SetLdsResource(const::envoy::config::listener::v3::Listener & listener)97   void SetLdsResource(const ::envoy::config::listener::v3::Listener& listener) {
98     google::protobuf::Any resource;
99     resource.PackFrom(listener);
100     SetResource(std::move(resource), kLdsTypeUrl, listener.name());
101   }
102 
SetRdsResource(const::envoy::config::route::v3::RouteConfiguration & route)103   void SetRdsResource(
104       const ::envoy::config::route::v3::RouteConfiguration& route) {
105     google::protobuf::Any resource;
106     resource.PackFrom(route);
107     SetResource(std::move(resource), kRdsTypeUrl, route.name());
108   }
109 
SetCdsResource(const::envoy::config::cluster::v3::Cluster & cluster)110   void SetCdsResource(const ::envoy::config::cluster::v3::Cluster& cluster) {
111     google::protobuf::Any resource;
112     resource.PackFrom(cluster);
113     SetResource(std::move(resource), kCdsTypeUrl, cluster.name());
114   }
115 
SetEdsResource(const::envoy::config::endpoint::v3::ClusterLoadAssignment & assignment)116   void SetEdsResource(
117       const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) {
118     google::protobuf::Any resource;
119     resource.PackFrom(assignment);
120     SetResource(std::move(resource), kEdsTypeUrl, assignment.cluster_name());
121   }
122 
123   // Tells the server to ignore requests from the client for a given
124   // resource type.
IgnoreResourceType(const std::string & type_url)125   void IgnoreResourceType(const std::string& type_url) {
126     grpc_core::MutexLock lock(&ads_mu_);
127     resource_types_to_ignore_.emplace(type_url);
128   }
129 
130   // Sets a callback to be invoked on request messages with respoonse_nonce
131   // set.  The callback is passed the resource type and version.
SetCheckVersionCallback(std::function<void (absl::string_view,int)> check_version_callback)132   void SetCheckVersionCallback(
133       std::function<void(absl::string_view, int)> check_version_callback) {
134     grpc_core::MutexLock lock(&ads_mu_);
135     check_version_callback_ = std::move(check_version_callback);
136   }
137 
138   // Get the list of response state for each resource type.
139   // TODO(roth): Consider adding an absl::Notification-based mechanism
140   // here to avoid the need for tests to poll the response state.
GetResponseState(const std::string & type_url)141   absl::optional<ResponseState> GetResponseState(const std::string& type_url) {
142     grpc_core::MutexLock lock(&ads_mu_);
143     if (resource_type_response_state_[type_url].empty()) {
144       return absl::nullopt;
145     }
146     auto response = resource_type_response_state_[type_url].front();
147     resource_type_response_state_[type_url].pop_front();
148     return response;
149   }
lds_response_state()150   absl::optional<ResponseState> lds_response_state() {
151     return GetResponseState(kLdsTypeUrl);
152   }
rds_response_state()153   absl::optional<ResponseState> rds_response_state() {
154     return GetResponseState(kRdsTypeUrl);
155   }
cds_response_state()156   absl::optional<ResponseState> cds_response_state() {
157     return GetResponseState(kCdsTypeUrl);
158   }
eds_response_state()159   absl::optional<ResponseState> eds_response_state() {
160     return GetResponseState(kEdsTypeUrl);
161   }
162 
163   // Starts the service.
164   void Start();
165 
166   // Shuts down the service.
167   void Shutdown();
168 
169   // Returns the peer names of clients currently connected to the service.
clients()170   std::set<std::string> clients() {
171     grpc_core::MutexLock lock(&clients_mu_);
172     return clients_;
173   }
174 
ForceADSFailure(Status status)175   void ForceADSFailure(Status status) {
176     grpc_core::MutexLock lock(&ads_mu_);
177     forced_ads_failure_ = std::move(status);
178   }
179 
ClearADSFailure()180   void ClearADSFailure() {
181     grpc_core::MutexLock lock(&ads_mu_);
182     forced_ads_failure_ = absl::nullopt;
183   }
184 
185  private:
186   // A queue of resource type/name pairs that have changed since the client
187   // subscribed to them.
188   using UpdateQueue = std::deque<
189       std::pair<std::string /* type url */, std::string /* resource name */>>;
190 
191   // A struct representing a client's subscription to a particular resource.
192   struct SubscriptionState {
193     // The queue upon which to place updates when the resource is updated.
194     UpdateQueue* update_queue;
195   };
196 
197   // A struct representing the a client's subscription to all the resources.
198   using SubscriptionNameMap =
199       std::map<std::string /* resource_name */, SubscriptionState>;
200   using SubscriptionMap =
201       std::map<std::string /* type_url */, SubscriptionNameMap>;
202 
203   // Sent state for a given resource type.
204   struct SentState {
205     int nonce = 0;
206     int resource_type_version = 0;
207   };
208 
209   // A struct representing the current state for an individual resource.
210   struct ResourceState {
211     // The resource itself, if present.
212     absl::optional<google::protobuf::Any> resource;
213     // The resource type version that this resource was last updated in.
214     int resource_type_version = 0;
215     // A list of subscriptions to this resource.
216     std::set<SubscriptionState*> subscriptions;
217   };
218 
219   // The current state for all individual resources of a given type.
220   using ResourceNameMap =
221       std::map<std::string /* resource_name */, ResourceState>;
222 
223   struct ResourceTypeState {
224     int resource_type_version = 0;
225     ResourceNameMap resource_name_map;
226   };
227 
228   using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>;
229 
230   using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
231 
StreamAggregatedResources(ServerContext * context,Stream * stream)232   Status StreamAggregatedResources(ServerContext* context,
233                                    Stream* stream) override {
234     LOG(INFO) << "ADS[" << debug_label_
235               << "]: StreamAggregatedResources starts";
236     {
237       grpc_core::MutexLock lock(&ads_mu_);
238       if (forced_ads_failure_.has_value()) {
239         LOG(INFO) << "ADS[" << debug_label_
240                   << "]: StreamAggregatedResources forcing early failure "
241                      "with status code: "
242                   << forced_ads_failure_.value().error_code() << ", message: "
243                   << forced_ads_failure_.value().error_message();
244         return forced_ads_failure_.value();
245       }
246     }
247     AddClient(context->peer());
248     // Take a reference of the AdsServiceImpl object, which will go
249     // out of scope when this request handler returns.  This ensures
250     // that the parent won't be destroyed until this stream is complete.
251     std::shared_ptr<AdsServiceImpl> ads_service_impl = shared_from_this();
252     // Resources (type/name pairs) that have changed since the client
253     // subscribed to them.
254     UpdateQueue update_queue;
255     // Resources that the client will be subscribed to keyed by resource type
256     // url.
257     SubscriptionMap subscription_map;
258     // Sent state for each resource type.
259     std::map<std::string /*type_url*/, SentState> sent_state_map;
260     // Spawn a thread to read requests from the stream.
261     // Requests will be delivered to this thread in a queue.
262     std::deque<DiscoveryRequest> requests;
263     bool stream_closed = false;
264     std::thread reader(std::bind(&AdsServiceImpl::BlockingRead, this, stream,
265                                  &requests, &stream_closed));
266     // Main loop to process requests and updates.
267     while (true) {
268       // Boolean to keep track if the loop received any work to do: a
269       // request or an update; regardless whether a response was actually
270       // sent out.
271       bool did_work = false;
272       // Look for new requests and decide what to handle.
273       absl::optional<DiscoveryResponse> response;
274       {
275         grpc_core::MutexLock lock(&ads_mu_);
276         // If the stream has been closed or our parent is being shut
277         // down, stop immediately.
278         if (stream_closed || ads_done_) break;
279         // Otherwise, see if there's a request to read from the queue.
280         if (!requests.empty()) {
281           DiscoveryRequest request = std::move(requests.front());
282           requests.pop_front();
283           did_work = true;
284           LOG(INFO) << "ADS[" << debug_label_ << "]: Received request for type "
285                     << request.type_url() << " with content "
286                     << request.DebugString();
287           SentState& sent_state = sent_state_map[request.type_url()];
288           // Process request.
289           ProcessRequest(request, &update_queue, &subscription_map, &sent_state,
290                          &response);
291         }
292       }
293       if (response.has_value()) {
294         LOG(INFO) << "ADS[" << debug_label_
295                   << "]: Sending response: " << response->DebugString();
296         stream->Write(response.value());
297       }
298       response.reset();
299       // Look for updates and decide what to handle.
300       {
301         grpc_core::MutexLock lock(&ads_mu_);
302         if (!update_queue.empty()) {
303           const std::string resource_type =
304               std::move(update_queue.front().first);
305           const std::string resource_name =
306               std::move(update_queue.front().second);
307           update_queue.pop_front();
308           did_work = true;
309           SentState& sent_state = sent_state_map[resource_type];
310           ProcessUpdate(resource_type, resource_name, &subscription_map,
311                         &sent_state, &response);
312         }
313       }
314       if (response.has_value()) {
315         LOG(INFO) << "ADS[" << debug_label_
316                   << "]: Sending update response: " << response->DebugString();
317         stream->Write(response.value());
318       }
319       {
320         grpc_core::MutexLock lock(&ads_mu_);
321         if (ads_done_) {
322           break;
323         }
324       }
325       // If we didn't find anything to do, delay before the next loop
326       // iteration; otherwise, check whether we should exit and then
327       // immediately continue.
328       gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10));
329     }
330     // Done with main loop.  Clean up before returning.
331     // Join reader thread.
332     reader.join();
333     // Clean up any subscriptions that were still active when the call
334     // finished.
335     {
336       grpc_core::MutexLock lock(&ads_mu_);
337       for (auto& p : subscription_map) {
338         const std::string& type_url = p.first;
339         SubscriptionNameMap& subscription_name_map = p.second;
340         for (auto& q : subscription_name_map) {
341           const std::string& resource_name = q.first;
342           SubscriptionState& subscription_state = q.second;
343           ResourceNameMap& resource_name_map =
344               resource_map_[type_url].resource_name_map;
345           ResourceState& resource_state = resource_name_map[resource_name];
346           resource_state.subscriptions.erase(&subscription_state);
347         }
348       }
349     }
350     LOG(INFO) << "ADS[" << debug_label_ << "]: StreamAggregatedResources done";
351     RemoveClient(context->peer());
352     return Status::OK;
353   }
354 
355   // Processes a response read from the client.
356   // Populates response if needed.
ProcessRequest(const DiscoveryRequest & request,UpdateQueue * update_queue,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)357   void ProcessRequest(const DiscoveryRequest& request,
358                       UpdateQueue* update_queue,
359                       SubscriptionMap* subscription_map, SentState* sent_state,
360                       absl::optional<DiscoveryResponse>* response)
361       ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) {
362     // Check the nonce sent by the client, if any.
363     // (This will be absent on the first request on a stream.)
364     if (request.response_nonce().empty()) {
365       int client_resource_type_version = 0;
366       if (!request.version_info().empty()) {
367         CHECK(absl::SimpleAtoi(request.version_info(),
368                                &client_resource_type_version));
369       }
370       if (check_version_callback_ != nullptr) {
371         check_version_callback_(request.type_url(),
372                                 client_resource_type_version);
373       }
374     } else {
375       int client_nonce;
376       CHECK(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
377       // Check for ACK or NACK.
378       ResponseState response_state;
379       if (!request.has_error_detail()) {
380         response_state.state = ResponseState::ACKED;
381         LOG(INFO) << "ADS[" << debug_label_
382                   << "]: client ACKed resource_type=" << request.type_url()
383                   << " version=" << request.version_info();
384       } else {
385         response_state.state = ResponseState::NACKED;
386         if (check_nack_status_code_ != nullptr) {
387           check_nack_status_code_(
388               static_cast<absl::StatusCode>(request.error_detail().code()));
389         }
390         response_state.error_message = request.error_detail().message();
391         LOG(INFO) << "ADS[" << debug_label_
392                   << "]: client NACKed resource_type=" << request.type_url()
393                   << " version=" << request.version_info() << ": "
394                   << response_state.error_message;
395       }
396       resource_type_response_state_[request.type_url()].emplace_back(
397           std::move(response_state));
398       // Ignore requests with stale nonces.
399       if (client_nonce < sent_state->nonce) return;
400     }
401     // Ignore resource types as requested by tests.
402     if (resource_types_to_ignore_.find(request.type_url()) !=
403         resource_types_to_ignore_.end()) {
404       return;
405     }
406     // Look at all the resource names in the request.
407     auto& subscription_name_map = (*subscription_map)[request.type_url()];
408     auto& resource_type_state = resource_map_[request.type_url()];
409     auto& resource_name_map = resource_type_state.resource_name_map;
410     std::set<std::string> resources_in_current_request;
411     std::set<std::string> resources_added_to_response;
412     for (const std::string& resource_name : request.resource_names()) {
413       resources_in_current_request.emplace(resource_name);
414       auto& subscription_state = subscription_name_map[resource_name];
415       auto& resource_state = resource_name_map[resource_name];
416       // Subscribe if needed.
417       // Send the resource in the response if either (a) this is
418       // a new subscription or (b) there is an updated version of
419       // this resource to send.
420       if (MaybeSubscribe(request.type_url(), resource_name, &subscription_state,
421                          &resource_state, update_queue) ||
422           ClientNeedsResourceUpdate(resource_type_state, resource_state,
423                                     sent_state->resource_type_version)) {
424         LOG(INFO) << "ADS[" << debug_label_
425                   << "]: Sending update for type=" << request.type_url()
426                   << " name=" << resource_name;
427         resources_added_to_response.emplace(resource_name);
428         if (!response->has_value()) response->emplace();
429         if (resource_state.resource.has_value()) {
430           auto* resource = (*response)->add_resources();
431           resource->CopyFrom(resource_state.resource.value());
432           if (wrap_resources_) {
433             envoy::service::discovery::v3::Resource resource_wrapper;
434             *resource_wrapper.mutable_resource() = std::move(*resource);
435             resource->PackFrom(resource_wrapper);
436           }
437         }
438       } else {
439         LOG(INFO) << "ADS[" << debug_label_
440                   << "]: client does not need update for type="
441                   << request.type_url() << " name=" << resource_name;
442       }
443     }
444     // Process unsubscriptions for any resource no longer
445     // present in the request's resource list.
446     ProcessUnsubscriptions(request.type_url(), resources_in_current_request,
447                            &subscription_name_map, &resource_name_map);
448     // Construct response if needed.
449     if (!resources_added_to_response.empty()) {
450       CompleteBuildingDiscoveryResponse(
451           request.type_url(), resource_type_state.resource_type_version,
452           subscription_name_map, resources_added_to_response, sent_state,
453           &response->value());
454     }
455   }
456 
457   // Processes a resource update from the test.
458   // Populates response if needed.
ProcessUpdate(const std::string & resource_type,const std::string & resource_name,SubscriptionMap * subscription_map,SentState * sent_state,absl::optional<DiscoveryResponse> * response)459   void ProcessUpdate(const std::string& resource_type,
460                      const std::string& resource_name,
461                      SubscriptionMap* subscription_map, SentState* sent_state,
462                      absl::optional<DiscoveryResponse>* response)
463       ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) {
464     LOG(INFO) << "ADS[" << debug_label_
465               << "]: Received update for type=" << resource_type
466               << " name=" << resource_name;
467     auto& subscription_name_map = (*subscription_map)[resource_type];
468     auto& resource_type_state = resource_map_[resource_type];
469     auto& resource_name_map = resource_type_state.resource_name_map;
470     auto it = subscription_name_map.find(resource_name);
471     if (it != subscription_name_map.end()) {
472       ResourceState& resource_state = resource_name_map[resource_name];
473       if (ClientNeedsResourceUpdate(resource_type_state, resource_state,
474                                     sent_state->resource_type_version)) {
475         LOG(INFO) << "ADS[" << debug_label_
476                   << "]: Sending update for type=" << resource_type
477                   << " name=" << resource_name;
478         response->emplace();
479         if (resource_state.resource.has_value()) {
480           auto* resource = (*response)->add_resources();
481           resource->CopyFrom(resource_state.resource.value());
482         }
483         CompleteBuildingDiscoveryResponse(
484             resource_type, resource_type_state.resource_type_version,
485             subscription_name_map, {resource_name}, sent_state,
486             &response->value());
487       }
488     }
489   }
490 
491   // Starting a thread to do blocking read on the stream until cancel.
BlockingRead(Stream * stream,std::deque<DiscoveryRequest> * requests,bool * stream_closed)492   void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests,
493                     bool* stream_closed) {
494     DiscoveryRequest request;
495     bool seen_first_request = false;
496     while (stream->Read(&request)) {
497       if (!seen_first_request) {
498         if (check_first_request_ != nullptr) {
499           check_first_request_(request);
500         }
501         seen_first_request = true;
502       }
503       {
504         grpc_core::MutexLock lock(&ads_mu_);
505         requests->emplace_back(std::move(request));
506       }
507     }
508     LOG(INFO) << "ADS[" << debug_label_ << "]: Null read, stream closed";
509     grpc_core::MutexLock lock(&ads_mu_);
510     *stream_closed = true;
511   }
512 
513   // Completing the building a DiscoveryResponse by adding common information
514   // for all resources and by adding all subscribed resources for LDS and CDS.
CompleteBuildingDiscoveryResponse(const std::string & resource_type,const int version,const SubscriptionNameMap & subscription_name_map,const std::set<std::string> & resources_added_to_response,SentState * sent_state,DiscoveryResponse * response)515   void CompleteBuildingDiscoveryResponse(
516       const std::string& resource_type, const int version,
517       const SubscriptionNameMap& subscription_name_map,
518       const std::set<std::string>& resources_added_to_response,
519       SentState* sent_state, DiscoveryResponse* response)
520       ABSL_EXCLUSIVE_LOCKS_REQUIRED(ads_mu_) {
521     response->set_type_url(resource_type);
522     response->set_version_info(std::to_string(version));
523     response->set_nonce(std::to_string(++sent_state->nonce));
524     if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
525       // For LDS and CDS we must send back all subscribed resources
526       // (even the unchanged ones)
527       for (const auto& p : subscription_name_map) {
528         const std::string& resource_name = p.first;
529         if (resources_added_to_response.find(resource_name) ==
530             resources_added_to_response.end()) {
531           ResourceNameMap& resource_name_map =
532               resource_map_[resource_type].resource_name_map;
533           const ResourceState& resource_state =
534               resource_name_map[resource_name];
535           if (resource_state.resource.has_value()) {
536             auto* resource = response->add_resources();
537             resource->CopyFrom(resource_state.resource.value());
538           }
539         }
540       }
541     }
542     sent_state->resource_type_version = version;
543   }
544 
545   // Checks whether the client needs to receive a newer version of
546   // the resource.
547   static bool ClientNeedsResourceUpdate(
548       const ResourceTypeState& resource_type_state,
549       const ResourceState& resource_state, int client_resource_type_version);
550 
551   // Subscribes to a resource if not already subscribed:
552   // 1. Sets the update_queue field in subscription_state.
553   // 2. Adds subscription_state to resource_state->subscriptions.
554   bool MaybeSubscribe(const std::string& resource_type,
555                       const std::string& resource_name,
556                       SubscriptionState* subscription_state,
557                       ResourceState* resource_state, UpdateQueue* update_queue);
558 
559   // Removes subscriptions for resources no longer present in the
560   // current request.
561   void ProcessUnsubscriptions(
562       const std::string& resource_type,
563       const std::set<std::string>& resources_in_current_request,
564       SubscriptionNameMap* subscription_name_map,
565       ResourceNameMap* resource_name_map);
566 
AddClient(const std::string & client)567   void AddClient(const std::string& client) {
568     grpc_core::MutexLock lock(&clients_mu_);
569     clients_.insert(client);
570   }
571 
RemoveClient(const std::string & client)572   void RemoveClient(const std::string& client) {
573     grpc_core::MutexLock lock(&clients_mu_);
574     clients_.erase(client);
575   }
576 
577   std::function<void(const DiscoveryRequest& request)> check_first_request_;
578   std::function<void(absl::StatusCode)> check_nack_status_code_;
579   std::string debug_label_;
580 
581   grpc_core::CondVar ads_cond_;
582   grpc_core::Mutex ads_mu_;
583   bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
584   std::map<std::string /* type_url */, std::deque<ResponseState>>
585       resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
586   std::set<std::string /*resource_type*/> resource_types_to_ignore_
587       ABSL_GUARDED_BY(ads_mu_);
588   std::function<void(absl::string_view, int)> check_version_callback_
589       ABSL_GUARDED_BY(ads_mu_);
590   // An instance data member containing the current state of all resources.
591   // Note that an entry will exist whenever either of the following is true:
592   // - The resource exists (i.e., has been created by SetResource() and has not
593   //   yet been destroyed by UnsetResource()).
594   // - There is at least one subscription for the resource.
595   ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_);
596   absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_);
597   bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false;
598 
599   grpc_core::Mutex clients_mu_;
600   std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_);
601 };
602 
603 // An LRS service implementation.
604 class LrsServiceImpl
605     : public CountedService<
606           ::envoy::service::load_stats::v3::LoadReportingService::Service>,
607       public std::enable_shared_from_this<LrsServiceImpl> {
608  public:
609   using LoadStatsRequest = ::envoy::service::load_stats::v3::LoadStatsRequest;
610   using LoadStatsResponse = ::envoy::service::load_stats::v3::LoadStatsResponse;
611 
612   // Stats reported by client.
613   class ClientStats {
614    public:
615     // Stats for a given locality.
616     struct LocalityStats {
617       struct LoadMetric {
618         uint64_t num_requests_finished_with_metric = 0;
619         double total_metric_value = 0;
620 
621         LoadMetric() = default;
622 
623         // Works for both EndpointLoadMetricStats and
624         // UnnamedEndpointLoadMetricStats.
625         template <typename T>
LoadMetricLocalityStats::LoadMetric626         explicit LoadMetric(const T& stats)
627             : num_requests_finished_with_metric(
628                   stats.num_requests_finished_with_metric()),
629               total_metric_value(stats.total_metric_value()) {}
630 
631         LoadMetric& operator+=(const LoadMetric& other) {
632           num_requests_finished_with_metric +=
633               other.num_requests_finished_with_metric;
634           total_metric_value += other.total_metric_value;
635           return *this;
636         }
637       };
638 
LocalityStatsLocalityStats639       LocalityStats() {}
640 
641       // Converts from proto message class.
LocalityStatsLocalityStats642       explicit LocalityStats(
643           const ::envoy::config::endpoint::v3::UpstreamLocalityStats&
644               upstream_locality_stats)
645           : total_successful_requests(
646                 upstream_locality_stats.total_successful_requests()),
647             total_requests_in_progress(
648                 upstream_locality_stats.total_requests_in_progress()),
649             total_error_requests(
650                 upstream_locality_stats.total_error_requests()),
651             total_issued_requests(
652                 upstream_locality_stats.total_issued_requests()),
653             cpu_utilization(upstream_locality_stats.cpu_utilization()),
654             mem_utilization(upstream_locality_stats.mem_utilization()),
655             application_utilization(
656                 upstream_locality_stats.application_utilization()) {
657         for (const auto& s : upstream_locality_stats.load_metric_stats()) {
658           load_metrics[s.metric_name()] += LoadMetric(s);
659         }
660       }
661 
662       LocalityStats& operator+=(const LocalityStats& other) {
663         total_successful_requests += other.total_successful_requests;
664         total_requests_in_progress += other.total_requests_in_progress;
665         total_error_requests += other.total_error_requests;
666         total_issued_requests += other.total_issued_requests;
667         cpu_utilization += other.cpu_utilization;
668         mem_utilization += other.mem_utilization;
669         application_utilization += other.application_utilization;
670         for (const auto& p : other.load_metrics) {
671           load_metrics[p.first] += p.second;
672         }
673         return *this;
674       }
675 
676       uint64_t total_successful_requests = 0;
677       uint64_t total_requests_in_progress = 0;
678       uint64_t total_error_requests = 0;
679       uint64_t total_issued_requests = 0;
680       LoadMetric cpu_utilization;
681       LoadMetric mem_utilization;
682       LoadMetric application_utilization;
683       std::map<std::string, LoadMetric> load_metrics;
684     };
685 
ClientStats()686     ClientStats() {}
687 
688     // Converts from proto message class.
ClientStats(const::envoy::config::endpoint::v3::ClusterStats & cluster_stats)689     explicit ClientStats(
690         const ::envoy::config::endpoint::v3::ClusterStats& cluster_stats)
691         : cluster_name_(cluster_stats.cluster_name()),
692           eds_service_name_(cluster_stats.cluster_service_name()),
693           total_dropped_requests_(cluster_stats.total_dropped_requests()) {
694       for (const auto& input_locality_stats :
695            cluster_stats.upstream_locality_stats()) {
696         locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
697                                 LocalityStats(input_locality_stats));
698       }
699       for (const auto& input_dropped_requests :
700            cluster_stats.dropped_requests()) {
701         dropped_requests_.emplace(input_dropped_requests.category(),
702                                   input_dropped_requests.dropped_count());
703       }
704     }
705 
cluster_name()706     const std::string& cluster_name() const { return cluster_name_; }
eds_service_name()707     const std::string& eds_service_name() const { return eds_service_name_; }
708 
locality_stats()709     const std::map<std::string, LocalityStats>& locality_stats() const {
710       return locality_stats_;
711     }
712 
713     uint64_t total_successful_requests() const;
714     uint64_t total_requests_in_progress() const;
715     uint64_t total_error_requests() const;
716     uint64_t total_issued_requests() const;
717 
total_dropped_requests()718     uint64_t total_dropped_requests() const { return total_dropped_requests_; }
719 
720     uint64_t dropped_requests(const std::string& category) const;
721 
722     ClientStats& operator+=(const ClientStats& other);
723 
724    private:
725     std::string cluster_name_;
726     std::string eds_service_name_;
727     std::map<std::string, LocalityStats> locality_stats_;
728     uint64_t total_dropped_requests_ = 0;
729     std::map<std::string, uint64_t> dropped_requests_;
730   };
731 
732   LrsServiceImpl(int client_load_reporting_interval_seconds,
733                  std::set<std::string> cluster_names,
734                  std::function<void()> stream_started_callback = nullptr,
735                  std::function<void(const LoadStatsRequest& request)>
736                      check_first_request = nullptr,
737                  absl::string_view debug_label = "")
client_load_reporting_interval_seconds_(client_load_reporting_interval_seconds)738       : client_load_reporting_interval_seconds_(
739             client_load_reporting_interval_seconds),
740         cluster_names_(std::move(cluster_names)),
741         stream_started_callback_(std::move(stream_started_callback)),
742         check_first_request_(std::move(check_first_request)),
743         debug_label_(absl::StrFormat(
744             "%p%s%s", this, debug_label.empty() ? "" : ":", debug_label)) {}
745 
746   // Must be called before the LRS call is started.
set_send_all_clusters(bool send_all_clusters)747   void set_send_all_clusters(bool send_all_clusters) {
748     send_all_clusters_ = send_all_clusters;
749   }
set_cluster_names(const std::set<std::string> & cluster_names)750   void set_cluster_names(const std::set<std::string>& cluster_names) {
751     cluster_names_ = cluster_names;
752   }
753 
754   void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_, load_report_mu_);
755 
756   void Shutdown();
757 
758   // Returns an empty vector if the timeout elapses with no load report.
759   // TODO(roth): Change the default here to a finite duration and verify
760   // that it doesn't cause failures in any existing tests.
761   std::vector<ClientStats> WaitForLoadReport(
762       absl::Duration timeout = absl::InfiniteDuration());
763 
764  private:
765   using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>;
766 
StreamLoadStats(ServerContext *,Stream * stream)767   Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override {
768     LOG(INFO) << "LRS[" << debug_label_ << "]: StreamLoadStats starts";
769     if (stream_started_callback_ != nullptr) stream_started_callback_();
770     // Take a reference of the LrsServiceImpl object, reference will go
771     // out of scope after this method exits.
772     std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this();
773     // Read initial request.
774     LoadStatsRequest request;
775     if (stream->Read(&request)) {
776       IncreaseRequestCount();
777       if (check_first_request_ != nullptr) check_first_request_(request);
778       // Send initial response.
779       LoadStatsResponse response;
780       if (send_all_clusters_) {
781         response.set_send_all_clusters(true);
782       } else {
783         for (const std::string& cluster_name : cluster_names_) {
784           response.add_clusters(cluster_name);
785         }
786       }
787       response.mutable_load_reporting_interval()->set_seconds(
788           client_load_reporting_interval_seconds_ *
789           grpc_test_slowdown_factor());
790       stream->Write(response);
791       IncreaseResponseCount();
792       // Wait for report.
793       request.Clear();
794       while (stream->Read(&request)) {
795         LOG(INFO) << "LRS[" << debug_label_
796                   << "]: received client load report message: "
797                   << request.DebugString();
798         std::vector<ClientStats> stats;
799         for (const auto& cluster_stats : request.cluster_stats()) {
800           stats.emplace_back(cluster_stats);
801         }
802         grpc_core::MutexLock lock(&load_report_mu_);
803         result_queue_.emplace_back(std::move(stats));
804         if (load_report_cond_ != nullptr) {
805           load_report_cond_->Signal();
806         }
807       }
808       // Wait until notified done.
809       grpc_core::MutexLock lock(&lrs_mu_);
810       while (!lrs_done_) {
811         lrs_cv_.Wait(&lrs_mu_);
812       }
813     }
814     LOG(INFO) << "LRS[" << debug_label_ << "]: StreamLoadStats done";
815     return Status::OK;
816   }
817 
818   const int client_load_reporting_interval_seconds_;
819   bool send_all_clusters_ = false;
820   std::set<std::string> cluster_names_;
821   std::function<void()> stream_started_callback_;
822   std::function<void(const LoadStatsRequest& request)> check_first_request_;
823   std::string debug_label_;
824 
825   grpc_core::CondVar lrs_cv_;
826   grpc_core::Mutex lrs_mu_;
827   bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_) = false;
828 
829   grpc_core::Mutex load_report_mu_;
830   grpc_core::CondVar* load_report_cond_ ABSL_GUARDED_BY(load_report_mu_) =
831       nullptr;
832   std::deque<std::vector<ClientStats>> result_queue_
833       ABSL_GUARDED_BY(load_report_mu_);
834 };
835 
836 }  // namespace testing
837 }  // namespace grpc
838 
839 #endif  // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
840