1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/cpp/end2end/xds/xds_server.h"
18
19 #include <deque>
20 #include <set>
21 #include <string>
22 #include <thread>
23 #include <vector>
24
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/types/optional.h"
28 #include "src/core/lib/address_utils/parse_address.h"
29 #include "src/core/util/crash.h"
30 #include "src/core/util/sync.h"
31 #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
32 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
33 #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
34
35 namespace grpc {
36 namespace testing {
37
38 //
39 // AdsServiceImpl
40 //
41
SetResource(google::protobuf::Any resource,const std::string & type_url,const std::string & name)42 void AdsServiceImpl::SetResource(google::protobuf::Any resource,
43 const std::string& type_url,
44 const std::string& name) {
45 grpc_core::MutexLock lock(&ads_mu_);
46 ResourceTypeState& resource_type_state = resource_map_[type_url];
47 ++resource_type_state.resource_type_version;
48 ResourceState& resource_state = resource_type_state.resource_name_map[name];
49 resource_state.resource_type_version =
50 resource_type_state.resource_type_version;
51 resource_state.resource = std::move(resource);
52 LOG(INFO) << "ADS[" << debug_label_ << "]: Updating " << type_url
53 << " resource " << name << "; resource_type_version now "
54 << resource_type_state.resource_type_version;
55 for (SubscriptionState* subscription : resource_state.subscriptions) {
56 subscription->update_queue->emplace_back(type_url, name);
57 }
58 }
59
UnsetResource(const std::string & type_url,const std::string & name)60 void AdsServiceImpl::UnsetResource(const std::string& type_url,
61 const std::string& name) {
62 grpc_core::MutexLock lock(&ads_mu_);
63 ResourceTypeState& resource_type_state = resource_map_[type_url];
64 ++resource_type_state.resource_type_version;
65 ResourceState& resource_state = resource_type_state.resource_name_map[name];
66 resource_state.resource_type_version =
67 resource_type_state.resource_type_version;
68 resource_state.resource.reset();
69 LOG(INFO) << "ADS[" << debug_label_ << "]: Unsetting " << type_url
70 << " resource " << name << "; resource_type_version now "
71 << resource_type_state.resource_type_version;
72 for (SubscriptionState* subscription : resource_state.subscriptions) {
73 subscription->update_queue->emplace_back(type_url, name);
74 }
75 }
76
77 // Checks whether the client needs to receive a newer version of
78 // the resource.
ClientNeedsResourceUpdate(const ResourceTypeState & resource_type_state,const ResourceState & resource_state,int client_resource_type_version)79 bool AdsServiceImpl::ClientNeedsResourceUpdate(
80 const ResourceTypeState& resource_type_state,
81 const ResourceState& resource_state, int client_resource_type_version) {
82 return client_resource_type_version <
83 resource_type_state.resource_type_version &&
84 resource_state.resource_type_version <=
85 resource_type_state.resource_type_version;
86 }
87
88 // Subscribes to a resource if not already subscribed:
89 // 1. Sets the update_queue field in subscription_state.
90 // 2. Adds subscription_state to resource_state->subscriptions.
MaybeSubscribe(const std::string & resource_type,const std::string & resource_name,SubscriptionState * subscription_state,ResourceState * resource_state,UpdateQueue * update_queue)91 bool AdsServiceImpl::MaybeSubscribe(const std::string& resource_type,
92 const std::string& resource_name,
93 SubscriptionState* subscription_state,
94 ResourceState* resource_state,
95 UpdateQueue* update_queue) {
96 // The update_queue will be null if we were not previously subscribed.
97 if (subscription_state->update_queue != nullptr) return false;
98 subscription_state->update_queue = update_queue;
99 resource_state->subscriptions.emplace(subscription_state);
100 LOG(INFO) << "ADS[" << debug_label_ << "]: subscribe to resource type "
101 << resource_type << " name " << resource_name << " state "
102 << &subscription_state;
103 return true;
104 }
105
106 // Removes subscriptions for resources no longer present in the
107 // current request.
ProcessUnsubscriptions(const std::string & resource_type,const std::set<std::string> & resources_in_current_request,SubscriptionNameMap * subscription_name_map,ResourceNameMap * resource_name_map)108 void AdsServiceImpl::ProcessUnsubscriptions(
109 const std::string& resource_type,
110 const std::set<std::string>& resources_in_current_request,
111 SubscriptionNameMap* subscription_name_map,
112 ResourceNameMap* resource_name_map) {
113 for (auto it = subscription_name_map->begin();
114 it != subscription_name_map->end();) {
115 const std::string& resource_name = it->first;
116 SubscriptionState& subscription_state = it->second;
117 if (resources_in_current_request.find(resource_name) !=
118 resources_in_current_request.end()) {
119 ++it;
120 continue;
121 }
122 LOG(INFO) << "ADS[" << debug_label_
123 << "]: Unsubscribe to type=" << resource_type
124 << " name=" << resource_name << " state=" << &subscription_state;
125 auto resource_it = resource_name_map->find(resource_name);
126 CHECK(resource_it != resource_name_map->end());
127 auto& resource_state = resource_it->second;
128 resource_state.subscriptions.erase(&subscription_state);
129 if (resource_state.subscriptions.empty() &&
130 !resource_state.resource.has_value()) {
131 resource_name_map->erase(resource_it);
132 }
133 it = subscription_name_map->erase(it);
134 }
135 }
136
Start()137 void AdsServiceImpl::Start() {
138 grpc_core::MutexLock lock(&ads_mu_);
139 ads_done_ = false;
140 }
141
Shutdown()142 void AdsServiceImpl::Shutdown() {
143 {
144 grpc_core::MutexLock lock(&ads_mu_);
145 if (!ads_done_) {
146 ads_done_ = true;
147 ads_cond_.SignalAll();
148 }
149 resource_type_response_state_.clear();
150 }
151 LOG(INFO) << "ADS[" << debug_label_ << "]: shut down";
152 }
153
154 //
155 // LrsServiceImpl::ClientStats
156 //
157
total_successful_requests() const158 uint64_t LrsServiceImpl::ClientStats::total_successful_requests() const {
159 uint64_t sum = 0;
160 for (auto& p : locality_stats_) {
161 sum += p.second.total_successful_requests;
162 }
163 return sum;
164 }
165
total_requests_in_progress() const166 uint64_t LrsServiceImpl::ClientStats::total_requests_in_progress() const {
167 uint64_t sum = 0;
168 for (auto& p : locality_stats_) {
169 sum += p.second.total_requests_in_progress;
170 }
171 return sum;
172 }
173
total_error_requests() const174 uint64_t LrsServiceImpl::ClientStats::total_error_requests() const {
175 uint64_t sum = 0;
176 for (auto& p : locality_stats_) {
177 sum += p.second.total_error_requests;
178 }
179 return sum;
180 }
181
total_issued_requests() const182 uint64_t LrsServiceImpl::ClientStats::total_issued_requests() const {
183 uint64_t sum = 0;
184 for (auto& p : locality_stats_) {
185 sum += p.second.total_issued_requests;
186 }
187 return sum;
188 }
189
dropped_requests(const std::string & category) const190 uint64_t LrsServiceImpl::ClientStats::dropped_requests(
191 const std::string& category) const {
192 auto iter = dropped_requests_.find(category);
193 CHECK(iter != dropped_requests_.end());
194 return iter->second;
195 }
196
operator +=(const ClientStats & other)197 LrsServiceImpl::ClientStats& LrsServiceImpl::ClientStats::operator+=(
198 const ClientStats& other) {
199 for (const auto& p : other.locality_stats_) {
200 locality_stats_[p.first] += p.second;
201 }
202 total_dropped_requests_ += other.total_dropped_requests_;
203 for (const auto& p : other.dropped_requests_) {
204 dropped_requests_[p.first] += p.second;
205 }
206 return *this;
207 }
208
209 //
210 // LrsServiceImpl
211 //
212
Start()213 void LrsServiceImpl::Start() {
214 {
215 grpc_core::MutexLock lock(&lrs_mu_);
216 lrs_done_ = false;
217 }
218 {
219 grpc_core::MutexLock lock(&load_report_mu_);
220 result_queue_.clear();
221 }
222 }
223
Shutdown()224 void LrsServiceImpl::Shutdown() {
225 {
226 grpc_core::MutexLock lock(&lrs_mu_);
227 if (!lrs_done_) {
228 lrs_done_ = true;
229 lrs_cv_.SignalAll();
230 }
231 }
232 LOG(INFO) << "LRS[" << debug_label_ << "]: shut down";
233 }
234
WaitForLoadReport(absl::Duration timeout)235 std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport(
236 absl::Duration timeout) {
237 timeout *= grpc_test_slowdown_factor();
238 grpc_core::MutexLock lock(&load_report_mu_);
239 grpc_core::CondVar cv;
240 if (result_queue_.empty()) {
241 load_report_cond_ = &cv;
242 while (result_queue_.empty()) {
243 if (cv.WaitWithTimeout(&load_report_mu_, timeout)) {
244 LOG(ERROR) << "timed out waiting for load report";
245 return {};
246 }
247 }
248 load_report_cond_ = nullptr;
249 }
250 std::vector<ClientStats> result = std::move(result_queue_.front());
251 result_queue_.pop_front();
252 return result;
253 }
254
255 } // namespace testing
256 } // namespace grpc
257