• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "test/cpp/end2end/xds/xds_server.h"
18 
19 #include <deque>
20 #include <set>
21 #include <string>
22 #include <thread>
23 #include <vector>
24 
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/types/optional.h"
28 #include "src/core/lib/address_utils/parse_address.h"
29 #include "src/core/util/crash.h"
30 #include "src/core/util/sync.h"
31 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
32 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
33 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
34 
35 namespace grpc {
36 namespace testing {
37 
38 //
39 // AdsServiceImpl
40 //
41 
SetResource(google::protobuf::Any resource,const std::string & type_url,const std::string & name)42 void AdsServiceImpl::SetResource(google::protobuf::Any resource,
43                                  const std::string& type_url,
44                                  const std::string& name) {
45   grpc_core::MutexLock lock(&ads_mu_);
46   ResourceTypeState& resource_type_state = resource_map_[type_url];
47   ++resource_type_state.resource_type_version;
48   ResourceState& resource_state = resource_type_state.resource_name_map[name];
49   resource_state.resource_type_version =
50       resource_type_state.resource_type_version;
51   resource_state.resource = std::move(resource);
52   LOG(INFO) << "ADS[" << debug_label_ << "]: Updating " << type_url
53             << " resource " << name << "; resource_type_version now "
54             << resource_type_state.resource_type_version;
55   for (SubscriptionState* subscription : resource_state.subscriptions) {
56     subscription->update_queue->emplace_back(type_url, name);
57   }
58 }
59 
UnsetResource(const std::string & type_url,const std::string & name)60 void AdsServiceImpl::UnsetResource(const std::string& type_url,
61                                    const std::string& name) {
62   grpc_core::MutexLock lock(&ads_mu_);
63   ResourceTypeState& resource_type_state = resource_map_[type_url];
64   ++resource_type_state.resource_type_version;
65   ResourceState& resource_state = resource_type_state.resource_name_map[name];
66   resource_state.resource_type_version =
67       resource_type_state.resource_type_version;
68   resource_state.resource.reset();
69   LOG(INFO) << "ADS[" << debug_label_ << "]: Unsetting " << type_url
70             << " resource " << name << "; resource_type_version now "
71             << resource_type_state.resource_type_version;
72   for (SubscriptionState* subscription : resource_state.subscriptions) {
73     subscription->update_queue->emplace_back(type_url, name);
74   }
75 }
76 
77 // Checks whether the client needs to receive a newer version of
78 // the resource.
ClientNeedsResourceUpdate(const ResourceTypeState & resource_type_state,const ResourceState & resource_state,int client_resource_type_version)79 bool AdsServiceImpl::ClientNeedsResourceUpdate(
80     const ResourceTypeState& resource_type_state,
81     const ResourceState& resource_state, int client_resource_type_version) {
82   return client_resource_type_version <
83              resource_type_state.resource_type_version &&
84          resource_state.resource_type_version <=
85              resource_type_state.resource_type_version;
86 }
87 
88 // Subscribes to a resource if not already subscribed:
89 // 1. Sets the update_queue field in subscription_state.
90 // 2. Adds subscription_state to resource_state->subscriptions.
MaybeSubscribe(const std::string & resource_type,const std::string & resource_name,SubscriptionState * subscription_state,ResourceState * resource_state,UpdateQueue * update_queue)91 bool AdsServiceImpl::MaybeSubscribe(const std::string& resource_type,
92                                     const std::string& resource_name,
93                                     SubscriptionState* subscription_state,
94                                     ResourceState* resource_state,
95                                     UpdateQueue* update_queue) {
96   // The update_queue will be null if we were not previously subscribed.
97   if (subscription_state->update_queue != nullptr) return false;
98   subscription_state->update_queue = update_queue;
99   resource_state->subscriptions.emplace(subscription_state);
100   LOG(INFO) << "ADS[" << debug_label_ << "]: subscribe to resource type "
101             << resource_type << " name " << resource_name << " state "
102             << &subscription_state;
103   return true;
104 }
105 
106 // Removes subscriptions for resources no longer present in the
107 // current request.
ProcessUnsubscriptions(const std::string & resource_type,const std::set<std::string> & resources_in_current_request,SubscriptionNameMap * subscription_name_map,ResourceNameMap * resource_name_map)108 void AdsServiceImpl::ProcessUnsubscriptions(
109     const std::string& resource_type,
110     const std::set<std::string>& resources_in_current_request,
111     SubscriptionNameMap* subscription_name_map,
112     ResourceNameMap* resource_name_map) {
113   for (auto it = subscription_name_map->begin();
114        it != subscription_name_map->end();) {
115     const std::string& resource_name = it->first;
116     SubscriptionState& subscription_state = it->second;
117     if (resources_in_current_request.find(resource_name) !=
118         resources_in_current_request.end()) {
119       ++it;
120       continue;
121     }
122     LOG(INFO) << "ADS[" << debug_label_
123               << "]: Unsubscribe to type=" << resource_type
124               << " name=" << resource_name << " state=" << &subscription_state;
125     auto resource_it = resource_name_map->find(resource_name);
126     CHECK(resource_it != resource_name_map->end());
127     auto& resource_state = resource_it->second;
128     resource_state.subscriptions.erase(&subscription_state);
129     if (resource_state.subscriptions.empty() &&
130         !resource_state.resource.has_value()) {
131       resource_name_map->erase(resource_it);
132     }
133     it = subscription_name_map->erase(it);
134   }
135 }
136 
Start()137 void AdsServiceImpl::Start() {
138   grpc_core::MutexLock lock(&ads_mu_);
139   ads_done_ = false;
140 }
141 
Shutdown()142 void AdsServiceImpl::Shutdown() {
143   {
144     grpc_core::MutexLock lock(&ads_mu_);
145     if (!ads_done_) {
146       ads_done_ = true;
147       ads_cond_.SignalAll();
148     }
149     resource_type_response_state_.clear();
150   }
151   LOG(INFO) << "ADS[" << debug_label_ << "]: shut down";
152 }
153 
154 //
155 // LrsServiceImpl::ClientStats
156 //
157 
total_successful_requests() const158 uint64_t LrsServiceImpl::ClientStats::total_successful_requests() const {
159   uint64_t sum = 0;
160   for (auto& p : locality_stats_) {
161     sum += p.second.total_successful_requests;
162   }
163   return sum;
164 }
165 
total_requests_in_progress() const166 uint64_t LrsServiceImpl::ClientStats::total_requests_in_progress() const {
167   uint64_t sum = 0;
168   for (auto& p : locality_stats_) {
169     sum += p.second.total_requests_in_progress;
170   }
171   return sum;
172 }
173 
total_error_requests() const174 uint64_t LrsServiceImpl::ClientStats::total_error_requests() const {
175   uint64_t sum = 0;
176   for (auto& p : locality_stats_) {
177     sum += p.second.total_error_requests;
178   }
179   return sum;
180 }
181 
total_issued_requests() const182 uint64_t LrsServiceImpl::ClientStats::total_issued_requests() const {
183   uint64_t sum = 0;
184   for (auto& p : locality_stats_) {
185     sum += p.second.total_issued_requests;
186   }
187   return sum;
188 }
189 
dropped_requests(const std::string & category) const190 uint64_t LrsServiceImpl::ClientStats::dropped_requests(
191     const std::string& category) const {
192   auto iter = dropped_requests_.find(category);
193   CHECK(iter != dropped_requests_.end());
194   return iter->second;
195 }
196 
operator +=(const ClientStats & other)197 LrsServiceImpl::ClientStats& LrsServiceImpl::ClientStats::operator+=(
198     const ClientStats& other) {
199   for (const auto& p : other.locality_stats_) {
200     locality_stats_[p.first] += p.second;
201   }
202   total_dropped_requests_ += other.total_dropped_requests_;
203   for (const auto& p : other.dropped_requests_) {
204     dropped_requests_[p.first] += p.second;
205   }
206   return *this;
207 }
208 
209 //
210 // LrsServiceImpl
211 //
212 
Start()213 void LrsServiceImpl::Start() {
214   {
215     grpc_core::MutexLock lock(&lrs_mu_);
216     lrs_done_ = false;
217   }
218   {
219     grpc_core::MutexLock lock(&load_report_mu_);
220     result_queue_.clear();
221   }
222 }
223 
Shutdown()224 void LrsServiceImpl::Shutdown() {
225   {
226     grpc_core::MutexLock lock(&lrs_mu_);
227     if (!lrs_done_) {
228       lrs_done_ = true;
229       lrs_cv_.SignalAll();
230     }
231   }
232   LOG(INFO) << "LRS[" << debug_label_ << "]: shut down";
233 }
234 
WaitForLoadReport(absl::Duration timeout)235 std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport(
236     absl::Duration timeout) {
237   timeout *= grpc_test_slowdown_factor();
238   grpc_core::MutexLock lock(&load_report_mu_);
239   grpc_core::CondVar cv;
240   if (result_queue_.empty()) {
241     load_report_cond_ = &cv;
242     while (result_queue_.empty()) {
243       if (cv.WaitWithTimeout(&load_report_mu_, timeout)) {
244         LOG(ERROR) << "timed out waiting for load report";
245         return {};
246       }
247     }
248     load_report_cond_ = nullptr;
249   }
250   std::vector<ClientStats> result = std::move(result_queue_.front());
251   result_queue_.pop_front();
252   return result;
253 }
254 
255 }  // namespace testing
256 }  // namespace grpc
257