1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/grpc.h>
18 #include <grpc/support/json.h>
19 #include <inttypes.h>
20 #include <stddef.h>
21
22 #include <algorithm>
23 #include <array>
24 #include <chrono>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/log/log.h"
33 #include "absl/status/status.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "absl/types/span.h"
40 #include "gtest/gtest.h"
41 #include "src/core/load_balancing/backend_metric_data.h"
42 #include "src/core/load_balancing/lb_policy.h"
43 #include "src/core/load_balancing/weighted_target/weighted_target.h"
44 #include "src/core/resolver/endpoint_addresses.h"
45 #include "src/core/util/debug_location.h"
46 #include "src/core/util/json/json.h"
47 #include "src/core/util/json/json_writer.h"
48 #include "src/core/util/orphanable.h"
49 #include "src/core/util/ref_counted_ptr.h"
50 #include "src/core/util/time.h"
51 #include "test/core/load_balancing/lb_policy_test_lib.h"
52 #include "test/core/test_util/fake_stats_plugin.h"
53 #include "test/core/test_util/test_config.h"
54
55 namespace grpc_core {
56 namespace testing {
57 namespace {
58
59 constexpr absl::string_view kLocalityName = "locality0";
60
61 class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
62 protected:
63 class ConfigBuilder {
64 public:
ConfigBuilder()65 ConfigBuilder() {
66 // Set blackout period to 1s to make tests fast and deterministic.
67 SetBlackoutPeriod(Duration::Seconds(1));
68 }
69
SetEnableOobLoadReport(bool value)70 ConfigBuilder& SetEnableOobLoadReport(bool value) {
71 json_["enableOobLoadReport"] = Json::FromBool(value);
72 return *this;
73 }
SetOobReportingPeriod(Duration duration)74 ConfigBuilder& SetOobReportingPeriod(Duration duration) {
75 json_["oobReportingPeriod"] = Json::FromString(duration.ToJsonString());
76 return *this;
77 }
SetBlackoutPeriod(Duration duration)78 ConfigBuilder& SetBlackoutPeriod(Duration duration) {
79 json_["blackoutPeriod"] = Json::FromString(duration.ToJsonString());
80 return *this;
81 }
SetWeightUpdatePeriod(Duration duration)82 ConfigBuilder& SetWeightUpdatePeriod(Duration duration) {
83 json_["weightUpdatePeriod"] = Json::FromString(duration.ToJsonString());
84 return *this;
85 }
SetWeightExpirationPeriod(Duration duration)86 ConfigBuilder& SetWeightExpirationPeriod(Duration duration) {
87 json_["weightExpirationPeriod"] =
88 Json::FromString(duration.ToJsonString());
89 return *this;
90 }
SetErrorUtilizationPenalty(float value)91 ConfigBuilder& SetErrorUtilizationPenalty(float value) {
92 json_["errorUtilizationPenalty"] = Json::FromNumber(value);
93 return *this;
94 }
95
Build()96 RefCountedPtr<LoadBalancingPolicy::Config> Build() {
97 Json config = Json::FromArray({Json::FromObject(
98 {{"weighted_round_robin", Json::FromObject(json_)}})});
99 LOG(INFO) << "CONFIG: " << JsonDump(config);
100 return MakeConfig(config);
101 }
102
103 private:
104 Json::Object json_;
105 };
106
WeightedRoundRobinTest()107 WeightedRoundRobinTest()
108 : LoadBalancingPolicyTest(
109 "weighted_round_robin",
110 ChannelArgs().Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD,
111 kLocalityName)) {}
112
SetUp()113 void SetUp() override {
114 LoadBalancingPolicyTest::SetUp();
115 SetExpectedTimerDuration(std::chrono::seconds(1));
116 }
117
118 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
SendInitialUpdateAndWaitForConnected(absl::Span<const absl::string_view> addresses,ConfigBuilder config_builder=ConfigBuilder (),absl::Span<const absl::string_view> update_addresses={},SourceLocation location=SourceLocation ())119 SendInitialUpdateAndWaitForConnected(
120 absl::Span<const absl::string_view> addresses,
121 ConfigBuilder config_builder = ConfigBuilder(),
122 absl::Span<const absl::string_view> update_addresses = {},
123 SourceLocation location = SourceLocation()) {
124 if (update_addresses.empty()) update_addresses = addresses;
125 EXPECT_EQ(ApplyUpdate(BuildUpdate(update_addresses, config_builder.Build()),
126 lb_policy()),
127 absl::OkStatus());
128 // RR should have created a subchannel for each address.
129 for (size_t i = 0; i < addresses.size(); ++i) {
130 auto* subchannel = FindSubchannel(addresses[i]);
131 EXPECT_NE(subchannel, nullptr)
132 << addresses[i] << " at " << location.file() << ":"
133 << location.line();
134 if (subchannel == nullptr) return nullptr;
135 // RR should ask each subchannel to connect.
136 EXPECT_TRUE(subchannel->ConnectionRequested())
137 << addresses[i] << " at " << location.file() << ":"
138 << location.line();
139 // The subchannel will connect successfully.
140 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
141 // Expect the initial CONNECTNG update with a picker that queues.
142 if (i == 0) ExpectConnectingUpdate(location);
143 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
144 }
145 return WaitForConnected(location);
146 }
147
148 // Returns a map indicating the number of picks for each address.
MakePickMap(absl::Span<const std::string> picks)149 static std::map<absl::string_view, size_t> MakePickMap(
150 absl::Span<const std::string> picks) {
151 std::map<absl::string_view, size_t> actual;
152 for (const auto& address : picks) {
153 ++actual.emplace(address, 0).first->second;
154 }
155 return actual;
156 }
157
158 // Returns a human-readable string representing the number of picks
159 // for each address.
PickMapString(const std::map<absl::string_view,size_t> & pick_map)160 static std::string PickMapString(
161 const std::map<absl::string_view, size_t>& pick_map) {
162 return absl::StrJoin(pick_map, ",", absl::PairFormatter("="));
163 }
164
MakeBackendMetricData(double app_utilization,double qps,double eps,double cpu_utilization=0)165 static BackendMetricData MakeBackendMetricData(double app_utilization,
166 double qps, double eps,
167 double cpu_utilization = 0) {
168 BackendMetricData b;
169 b.cpu_utilization = cpu_utilization;
170 b.application_utilization = app_utilization;
171 b.qps = qps;
172 b.eps = eps;
173 return b;
174 }
175
176 // Returns the number of picks we need to do to check the specified
177 // expectations.
NumPicksNeeded(const std::map<absl::string_view,size_t> & expected)178 static size_t NumPicksNeeded(const std::map<absl::string_view /*address*/,
179 size_t /*num_picks*/>& expected) {
180 size_t num_picks = 0;
181 for (const auto& p : expected) {
182 num_picks += p.second;
183 }
184 return num_picks;
185 }
186
187 // For each pick in picks, reports the backend metrics to the LB policy.
ReportBackendMetrics(absl::Span<const std::string> picks,const std::vector<std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>> & subchannel_call_trackers,const std::map<absl::string_view,BackendMetricData> & backend_metrics)188 static void ReportBackendMetrics(
189 absl::Span<const std::string> picks,
190 const std::vector<
191 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>>&
192 subchannel_call_trackers,
193 const std::map<absl::string_view /*address*/, BackendMetricData>&
194 backend_metrics) {
195 for (size_t i = 0; i < picks.size(); ++i) {
196 const auto& address = picks[i];
197 auto& subchannel_call_tracker = subchannel_call_trackers[i];
198 if (subchannel_call_tracker != nullptr) {
199 subchannel_call_tracker->Start();
200 absl::optional<BackendMetricData> backend_metric_data;
201 auto it = backend_metrics.find(address);
202 if (it != backend_metrics.end()) {
203 backend_metric_data.emplace();
204 backend_metric_data->qps = it->second.qps;
205 backend_metric_data->eps = it->second.eps;
206 backend_metric_data->cpu_utilization = it->second.cpu_utilization;
207 backend_metric_data->application_utilization =
208 it->second.application_utilization;
209 }
210 FakeMetadata metadata({});
211 FakeBackendMetricAccessor backend_metric_accessor(
212 std::move(backend_metric_data));
213 LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
214 address, absl::OkStatus(), &metadata, &backend_metric_accessor};
215 subchannel_call_tracker->Finish(args);
216 }
217 }
218 }
219
ReportOobBackendMetrics(const std::map<absl::string_view,BackendMetricData> & backend_metrics)220 void ReportOobBackendMetrics(
221 const std::map<absl::string_view /*address*/, BackendMetricData>&
222 backend_metrics) {
223 for (const auto& p : backend_metrics) {
224 auto* subchannel = FindSubchannel(p.first);
225 BackendMetricData backend_metric_data;
226 backend_metric_data.qps = p.second.qps;
227 backend_metric_data.eps = p.second.eps;
228 backend_metric_data.cpu_utilization = p.second.cpu_utilization;
229 backend_metric_data.application_utilization =
230 p.second.application_utilization;
231 subchannel->SendOobBackendMetricReport(backend_metric_data);
232 }
233 }
234
ExpectWeightedRoundRobinPicks(LoadBalancingPolicy::SubchannelPicker * picker,const std::map<absl::string_view,BackendMetricData> & backend_metrics,const std::map<absl::string_view,size_t> & expected,SourceLocation location=SourceLocation ())235 void ExpectWeightedRoundRobinPicks(
236 LoadBalancingPolicy::SubchannelPicker* picker,
237 const std::map<absl::string_view /*address*/, BackendMetricData>&
238 backend_metrics,
239 const std::map<absl::string_view /*address*/, size_t /*num_picks*/>&
240 expected,
241 SourceLocation location = SourceLocation()) {
242 std::vector<
243 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>>
244 subchannel_call_trackers;
245 auto picks = GetCompletePicks(picker, NumPicksNeeded(expected), {},
246 &subchannel_call_trackers, location);
247 ASSERT_TRUE(picks.has_value()) << location.file() << ":" << location.line();
248 LOG(INFO) << "PICKS: " << absl::StrJoin(*picks, " ");
249 ReportBackendMetrics(*picks, subchannel_call_trackers, backend_metrics);
250 auto actual = MakePickMap(*picks);
251 LOG(INFO) << "Pick map: " << PickMapString(actual);
252 EXPECT_EQ(expected, actual)
253 << "Expected: " << PickMapString(expected)
254 << "\nActual: " << PickMapString(actual) << "\nat " << location.file()
255 << ":" << location.line();
256 }
257
WaitForWeightedRoundRobinPicks(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> * picker,const std::map<absl::string_view,BackendMetricData> & backend_metrics,std::map<absl::string_view,size_t> expected,absl::Duration timeout=absl::Seconds (5),bool run_timer_callbacks=true,SourceLocation location=SourceLocation ())258 bool WaitForWeightedRoundRobinPicks(
259 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>* picker,
260 const std::map<absl::string_view /*address*/, BackendMetricData>&
261 backend_metrics,
262 std::map<absl::string_view /*address*/, size_t /*num_picks*/> expected,
263 absl::Duration timeout = absl::Seconds(5),
264 bool run_timer_callbacks = true,
265 SourceLocation location = SourceLocation()) {
266 LOG(INFO) << "==> WaitForWeightedRoundRobinPicks(): Expecting "
267 << PickMapString(expected);
268 size_t num_picks = NumPicksNeeded(expected);
269 absl::Time deadline = absl::Now() + timeout;
270 while (true) {
271 LOG(INFO) << "TOP OF LOOP";
272 // We need to see the expected weights for 3 consecutive passes, just
273 // to make sure we're consistently returning the right weights.
274 size_t num_passes = 0;
275 for (; num_passes < 3; ++num_passes) {
276 LOG(INFO) << "PASS " << num_passes << ": DOING PICKS";
277 std::vector<std::unique_ptr<
278 LoadBalancingPolicy::SubchannelCallTrackerInterface>>
279 subchannel_call_trackers;
280 auto picks = GetCompletePicks(picker->get(), num_picks, {},
281 &subchannel_call_trackers, location);
282 EXPECT_TRUE(picks.has_value())
283 << location.file() << ":" << location.line();
284 if (!picks.has_value()) return false;
285 LOG(INFO) << "PICKS: " << absl::StrJoin(*picks, " ");
286 // Report backend metrics to the LB policy.
287 ReportBackendMetrics(*picks, subchannel_call_trackers, backend_metrics);
288 // Check the observed weights.
289 auto actual = MakePickMap(*picks);
290 LOG(INFO) << "Pick map:\nExpected: " << PickMapString(expected)
291 << "\n Actual: " << PickMapString(actual);
292 if (expected != actual) {
293 // Make sure each address is one of the expected addresses,
294 // even if the weights aren't as expected.
295 for (const auto& address : *picks) {
296 bool found = expected.find(address) != expected.end();
297 EXPECT_TRUE(found)
298 << "unexpected pick address " << address << " at "
299 << location.file() << ":" << location.line();
300 if (!found) return false;
301 }
302 break;
303 }
304 // If there's another picker update in the queue, don't bother
305 // doing another pass, since we want to make sure we're using
306 // the latest picker.
307 if (!helper_->QueueEmpty()) break;
308 }
309 if (num_passes == 3) return true;
310 // If we're out of time, give up.
311 absl::Time now = absl::Now();
312 EXPECT_LT(now, deadline) << location.file() << ":" << location.line();
313 if (now >= deadline) return false;
314 // Get a new picker if there is an update; otherwise, wait for the
315 // weights to be recalculated.
316 if (!helper_->QueueEmpty()) {
317 *picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus(), location);
318 EXPECT_NE(*picker, nullptr)
319 << location.file() << ":" << location.line();
320 if (*picker == nullptr) return false;
321 } else if (run_timer_callbacks) {
322 LOG(INFO) << "running timer callback...";
323 // Increment time and run any timer callbacks.
324 IncrementTimeBy(Duration::Seconds(1));
325 }
326 }
327 }
328 };
329
TEST_F(WeightedRoundRobinTest,Basic)330 TEST_F(WeightedRoundRobinTest, Basic) {
331 // Send address list to LB policy.
332 const std::array<absl::string_view, 3> kAddresses = {
333 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
334 auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
335 ASSERT_NE(picker, nullptr);
336 // Address 0 gets weight 1, address 1 gets weight 3.
337 // No utilization report from backend 2, so it gets the average weight 2.
338 WaitForWeightedRoundRobinPicks(
339 &picker,
340 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
341 /*qps=*/100.0, /*eps=*/0.0)},
342 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
343 /*qps=*/100.0, /*eps=*/0.0)}},
344 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
345 // Now have backend 2 report utilization the same as backend 1, so its
346 // weight will be the same.
347 WaitForWeightedRoundRobinPicks(
348 &picker,
349 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
350 /*qps=*/100.0, /*eps=*/0.0)},
351 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
352 /*qps=*/100.0, /*eps=*/0.0)},
353 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
354 /*qps=*/100.0, /*eps=*/0.0)}},
355 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
356 }
357
TEST_F(WeightedRoundRobinTest,CpuUtilWithNoAppUtil)358 TEST_F(WeightedRoundRobinTest, CpuUtilWithNoAppUtil) {
359 // Send address list to LB policy.
360 const std::array<absl::string_view, 3> kAddresses = {
361 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
362 auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
363 ASSERT_NE(picker, nullptr);
364 // Address 0 gets weight 1, address 1 gets weight 3.
365 // No utilization report from backend 2, so it gets the average weight 2.
366 WaitForWeightedRoundRobinPicks(
367 &picker,
368 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
369 /*qps=*/100.0, /*eps=*/0.0,
370 /*cpu_utilization=*/0.9)},
371 {kAddresses[1],
372 MakeBackendMetricData(/*app_utilization=*/0,
373 /*qps=*/100.0,
374 /*eps=*/0.0, /*cpu_utilization=*/0.3)}},
375 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
376 // Now have backend 2 report utilization the same as backend 1, so its
377 // weight will be the same.
378 WaitForWeightedRoundRobinPicks(
379 &picker,
380 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
381 /*qps=*/100.0, /*eps=*/0.0,
382 /*cpu_utilization=*/0.9)},
383 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0,
384 /*qps=*/100.0, /*eps=*/0.0,
385 /*cpu_utilization=*/0.3)},
386 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0,
387 /*qps=*/100.0, /*eps=*/0.0,
388 /*cpu_utilization=*/0.3)}},
389 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
390 }
391
TEST_F(WeightedRoundRobinTest,AppUtilOverCpuUtil)392 TEST_F(WeightedRoundRobinTest, AppUtilOverCpuUtil) {
393 // Send address list to LB policy.
394 const std::array<absl::string_view, 3> kAddresses = {
395 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
396 auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
397 ASSERT_NE(picker, nullptr);
398 // Address 0 gets weight 1, address 1 gets weight 3.
399 // No utilization report from backend 2, so it gets the average weight 2.
400 WaitForWeightedRoundRobinPicks(
401 &picker,
402 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
403 /*qps=*/100.0, /*eps=*/0.0,
404 /*cpu_utilization=*/0.3)},
405 {kAddresses[1],
406 MakeBackendMetricData(/*app_utilization=*/0.3,
407 /*qps=*/100.0,
408 /*eps=*/0.0, /*cpu_utilization=*/0.4)}},
409 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
410 // Now have backend 2 report utilization the same as backend 1, so its
411 // weight will be the same.
412 WaitForWeightedRoundRobinPicks(
413 &picker,
414 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
415 /*qps=*/100.0, /*eps=*/0.0,
416 /*cpu_utilization=*/0.2)},
417 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
418 /*qps=*/100.0, /*eps=*/0.0,
419 /*cpu_utilization=*/0.6)},
420 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
421 /*qps=*/100.0, /*eps=*/0.0,
422 /*cpu_utilization=*/0.5)}},
423 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
424 }
425
TEST_F(WeightedRoundRobinTest,Eps)426 TEST_F(WeightedRoundRobinTest, Eps) {
427 // Send address list to LB policy.
428 const std::array<absl::string_view, 3> kAddresses = {
429 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
430 auto picker = SendInitialUpdateAndWaitForConnected(
431 kAddresses, ConfigBuilder().SetErrorUtilizationPenalty(1.0));
432 ASSERT_NE(picker, nullptr);
433 // Expected weights: 1/(0.1+0.5) : 1/(0.1+0.2) : 1/(0.1+0.1) = 1:2:3
434 WaitForWeightedRoundRobinPicks(
435 &picker,
436 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.1,
437 /*qps=*/100.0, /*eps=*/50.0)},
438 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.1,
439 /*qps=*/100.0, /*eps=*/20.0)},
440 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.1,
441 /*qps=*/100.0, /*eps=*/10.0)}},
442 {{kAddresses[0], 1}, {kAddresses[1], 2}, {kAddresses[2], 3}});
443 }
444
TEST_F(WeightedRoundRobinTest,IgnoresDuplicateAddresses)445 TEST_F(WeightedRoundRobinTest, IgnoresDuplicateAddresses) {
446 // Send address list to LB policy.
447 const std::array<absl::string_view, 3> kAddresses = {
448 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
449 const std::array<absl::string_view, 4> kUpdateAddresses = {
450 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443",
451 "ipv4:127.0.0.1:441"};
452 auto picker = SendInitialUpdateAndWaitForConnected(
453 kAddresses, ConfigBuilder(), kUpdateAddresses);
454 ASSERT_NE(picker, nullptr);
455 // Address 0 gets weight 1, address 1 gets weight 3.
456 // No utilization report from backend 2, so it gets the average weight 2.
457 WaitForWeightedRoundRobinPicks(
458 &picker,
459 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
460 /*qps=*/100.0, /*eps=*/0.0)},
461 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
462 /*qps=*/100.0, /*eps=*/0.0)}},
463 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
464 // Now have backend 2 report utilization the same as backend 1, so its
465 // weight will be the same.
466 WaitForWeightedRoundRobinPicks(
467 &picker,
468 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
469 /*qps=*/100.0, /*eps=*/0.0)},
470 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
471 /*qps=*/100.0, /*eps=*/0.0)},
472 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
473 /*qps=*/100.0, /*eps=*/0.0)}},
474 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
475 }
476
TEST_F(WeightedRoundRobinTest,FallsBackToRoundRobinWithoutWeights)477 TEST_F(WeightedRoundRobinTest, FallsBackToRoundRobinWithoutWeights) {
478 // Send address list to LB policy.
479 const std::array<absl::string_view, 3> kAddresses = {
480 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
481 auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
482 ASSERT_NE(picker, nullptr);
483 // Backends do not report utilization, so all are weighted the same.
484 WaitForWeightedRoundRobinPicks(
485 &picker, {},
486 {{kAddresses[0], 1}, {kAddresses[1], 1}, {kAddresses[2], 1}});
487 }
488
TEST_F(WeightedRoundRobinTest,OobReporting)489 TEST_F(WeightedRoundRobinTest, OobReporting) {
490 // Send address list to LB policy.
491 const std::array<absl::string_view, 3> kAddresses = {
492 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
493 auto picker = SendInitialUpdateAndWaitForConnected(
494 kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
495 ASSERT_NE(picker, nullptr);
496 // Address 0 gets weight 1, address 1 gets weight 3.
497 // No utilization report from backend 2, so it gets the average weight 2.
498 ReportOobBackendMetrics(
499 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
500 /*qps=*/100.0, /*eps=*/0.0)},
501 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
502 /*qps=*/100.0, /*eps=*/0.0)}});
503 WaitForWeightedRoundRobinPicks(
504 &picker, {},
505 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
506 // Now have backend 2 report utilization the same as backend 1, so its
507 // weight will be the same.
508 ReportOobBackendMetrics(
509 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
510 /*qps=*/100.0, /*eps=*/0.0)},
511 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
512 /*qps=*/100.0, /*eps=*/0.0)},
513 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
514 /*qps=*/100.0, /*eps=*/0.0)}});
515 WaitForWeightedRoundRobinPicks(
516 &picker, {},
517 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
518 // Verify that OOB reporting interval is the default.
519 for (const auto& address : kAddresses) {
520 auto* subchannel = FindSubchannel(address);
521 ASSERT_NE(subchannel, nullptr);
522 subchannel->CheckOobReportingPeriod(Duration::Seconds(10));
523 }
524 }
525
TEST_F(WeightedRoundRobinTest,OobReportingCpuUtilWithNoAppUtil)526 TEST_F(WeightedRoundRobinTest, OobReportingCpuUtilWithNoAppUtil) {
527 // Send address list to LB policy.
528 const std::array<absl::string_view, 3> kAddresses = {
529 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
530 auto picker = SendInitialUpdateAndWaitForConnected(
531 kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
532 ASSERT_NE(picker, nullptr);
533 // Address 0 gets weight 1, address 1 gets weight 3.
534 // No utilization report from backend 2, so it gets the average weight 2.
535 ReportOobBackendMetrics(
536 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
537 /*qps=*/100.0, /*eps=*/0.0,
538 /*cpu_utilization=*/0.9)},
539 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0,
540 /*qps=*/100.0, /*eps=*/0.0,
541 /*cpu_utilization=*/0.3)}});
542 WaitForWeightedRoundRobinPicks(
543 &picker, {},
544 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
545 // Now have backend 2 report utilization the same as backend 1, so its
546 // weight will be the same.
547 ReportOobBackendMetrics(
548 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
549 /*qps=*/100.0, /*eps=*/0.0,
550 /*cpu_utilization=*/0.9)},
551 {kAddresses[1],
552 MakeBackendMetricData(/*app_utilization=*/0,
553 /*qps=*/100.0,
554 /*eps=*/0.0, /*cpu_utilization=*/0.3)},
555 {kAddresses[2],
556 MakeBackendMetricData(/*app_utilization=*/0,
557 /*qps=*/100.0,
558 /*eps=*/0.0, /*cpu_utilization=*/0.3)}});
559 WaitForWeightedRoundRobinPicks(
560 &picker, {},
561 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
562 // Verify that OOB reporting interval is the default.
563 for (const auto& address : kAddresses) {
564 auto* subchannel = FindSubchannel(address);
565 ASSERT_NE(subchannel, nullptr);
566 subchannel->CheckOobReportingPeriod(Duration::Seconds(10));
567 }
568 }
569
TEST_F(WeightedRoundRobinTest,OobReportingAppUtilOverCpuUtil)570 TEST_F(WeightedRoundRobinTest, OobReportingAppUtilOverCpuUtil) {
571 // Send address list to LB policy.
572 const std::array<absl::string_view, 3> kAddresses = {
573 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
574 auto picker = SendInitialUpdateAndWaitForConnected(
575 kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
576 ASSERT_NE(picker, nullptr);
577 // Address 0 gets weight 1, address 1 gets weight 3.
578 // No utilization report from backend 2, so it gets the average weight 2.
579 ReportOobBackendMetrics(
580 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
581 /*qps=*/100.0, /*eps=*/0.0,
582 /*cpu_utilization=*/0.3)},
583 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
584 /*qps=*/100.0, /*eps=*/0.0,
585 /*cpu_utilization=*/0.4)}});
586 WaitForWeightedRoundRobinPicks(
587 &picker, {},
588 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
589 // Now have backend 2 report utilization the same as backend 1, so its
590 // weight will be the same.
591 ReportOobBackendMetrics(
592 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
593 /*qps=*/100.0, /*eps=*/0.0,
594 /*cpu_utilization=*/0.2)},
595 {kAddresses[1],
596 MakeBackendMetricData(/*app_utilization=*/0.3,
597 /*qps=*/100.0,
598 /*eps=*/0.0, /*cpu_utilization=*/0.6)},
599 {kAddresses[2],
600 MakeBackendMetricData(/*app_utilization=*/0.3,
601 /*qps=*/100.0,
602 /*eps=*/0.0, /*cpu_utilization=*/0.5)}});
603 WaitForWeightedRoundRobinPicks(
604 &picker, {},
605 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
606 // Verify that OOB reporting interval is the default.
607 for (const auto& address : kAddresses) {
608 auto* subchannel = FindSubchannel(address);
609 ASSERT_NE(subchannel, nullptr);
610 subchannel->CheckOobReportingPeriod(Duration::Seconds(10));
611 }
612 }
613
TEST_F(WeightedRoundRobinTest,HonorsOobReportingPeriod)614 TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
615 const std::array<absl::string_view, 3> kAddresses = {
616 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
617 auto picker = SendInitialUpdateAndWaitForConnected(
618 kAddresses,
619 ConfigBuilder().SetEnableOobLoadReport(true).SetOobReportingPeriod(
620 Duration::Seconds(5)));
621 ASSERT_NE(picker, nullptr);
622 ReportOobBackendMetrics(
623 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
624 /*qps=*/100.0, /*eps=*/0.0)},
625 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
626 /*qps=*/100.0, /*eps=*/0.0)},
627 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
628 /*qps=*/100.0, /*eps=*/0.0)}});
629 WaitForWeightedRoundRobinPicks(
630 &picker, {},
631 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
632 for (const auto& address : kAddresses) {
633 auto* subchannel = FindSubchannel(address);
634 ASSERT_NE(subchannel, nullptr);
635 subchannel->CheckOobReportingPeriod(Duration::Seconds(5));
636 }
637 }
638
TEST_F(WeightedRoundRobinTest,HonorsWeightUpdatePeriod)639 TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
640 const std::array<absl::string_view, 3> kAddresses = {
641 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
642 SetExpectedTimerDuration(std::chrono::seconds(2));
643 auto picker = SendInitialUpdateAndWaitForConnected(
644 kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Seconds(2)));
645 ASSERT_NE(picker, nullptr);
646 WaitForWeightedRoundRobinPicks(
647 &picker,
648 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
649 /*qps=*/100.0, /*eps=*/0.0)},
650 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
651 /*qps=*/100.0, /*eps=*/0.0)},
652 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
653 /*qps=*/100.0, /*eps=*/0.0)}},
654 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
655 }
656
TEST_F(WeightedRoundRobinTest,WeightUpdatePeriodLowerBound)657 TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) {
658 const std::array<absl::string_view, 3> kAddresses = {
659 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
660 SetExpectedTimerDuration(std::chrono::milliseconds(100));
661 auto picker = SendInitialUpdateAndWaitForConnected(
662 kAddresses,
663 ConfigBuilder().SetWeightUpdatePeriod(Duration::Milliseconds(10)));
664 ASSERT_NE(picker, nullptr);
665 WaitForWeightedRoundRobinPicks(
666 &picker,
667 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
668 /*qps=*/100.0, /*eps=*/0.0)},
669 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
670 /*qps=*/100.0, /*eps=*/0.0)},
671 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
672 /*qps=*/100.0, /*eps=*/0.0)}},
673 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
674 }
675
TEST_F(WeightedRoundRobinTest,WeightExpirationPeriod)676 TEST_F(WeightedRoundRobinTest, WeightExpirationPeriod) {
677 // Send address list to LB policy.
678 const std::array<absl::string_view, 3> kAddresses = {
679 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
680 auto picker = SendInitialUpdateAndWaitForConnected(
681 kAddresses,
682 ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
683 ASSERT_NE(picker, nullptr);
684 // All backends report weights.
685 WaitForWeightedRoundRobinPicks(
686 &picker,
687 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
688 /*qps=*/100.0, /*eps=*/0.0)},
689 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
690 /*qps=*/100.0, /*eps=*/0.0)},
691 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
692 /*qps=*/100.0, /*eps=*/0.0)}},
693 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
694 // Advance time to make weights stale and trigger the timer callback
695 // to recompute weights.
696 IncrementTimeBy(Duration::Seconds(2));
697 // Picker should now be falling back to round-robin.
698 ExpectWeightedRoundRobinPicks(
699 picker.get(), {},
700 {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
701 }
702
TEST_F(WeightedRoundRobinTest,BlackoutPeriodAfterWeightExpiration)703 TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
704 // Send address list to LB policy.
705 const std::array<absl::string_view, 3> kAddresses = {
706 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
707 auto picker = SendInitialUpdateAndWaitForConnected(
708 kAddresses,
709 ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
710 ASSERT_NE(picker, nullptr);
711 // All backends report weights.
712 WaitForWeightedRoundRobinPicks(
713 &picker,
714 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
715 /*qps=*/100.0, /*eps=*/0.0)},
716 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
717 /*qps=*/100.0, /*eps=*/0.0)},
718 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
719 /*qps=*/100.0, /*eps=*/0.0)}},
720 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
721 // Advance time to make weights stale and trigger the timer callback
722 // to recompute weights.
723 IncrementTimeBy(Duration::Seconds(2));
724 // Picker should now be falling back to round-robin.
725 ExpectWeightedRoundRobinPicks(
726 picker.get(), {},
727 {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
728 // Now start sending weights again. They should not be used yet,
729 // because we're still in the blackout period.
730 ExpectWeightedRoundRobinPicks(
731 picker.get(),
732 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
733 /*qps=*/100.0, /*eps=*/0.0)},
734 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
735 /*qps=*/100.0, /*eps=*/0.0)},
736 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.9,
737 /*qps=*/100.0, /*eps=*/0.0)}},
738 {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
739 // Advance time past the blackout period. This should cause the
740 // weights to be used.
741 IncrementTimeBy(Duration::Seconds(1));
742 ExpectWeightedRoundRobinPicks(
743 picker.get(), {},
744 {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 1}});
745 }
746
TEST_F(WeightedRoundRobinTest,BlackoutPeriodAfterDisconnect)747 TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
748 // Send address list to LB policy.
749 const std::array<absl::string_view, 3> kAddresses = {
750 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
751 auto picker = SendInitialUpdateAndWaitForConnected(
752 kAddresses,
753 ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
754 ASSERT_NE(picker, nullptr);
755 // All backends report weights.
756 WaitForWeightedRoundRobinPicks(
757 &picker,
758 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
759 /*qps=*/100.0, /*eps=*/0.0)},
760 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
761 /*qps=*/100.0, /*eps=*/0.0)},
762 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
763 /*qps=*/100.0, /*eps=*/0.0)}},
764 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
765 // Trigger disconnection and reconnection on address 2.
766 auto* subchannel = FindSubchannel(kAddresses[2]);
767 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
768 ExpectReresolutionRequest();
769 EXPECT_TRUE(subchannel->ConnectionRequested());
770 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
771 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
772 // Wait for the address to come back. Note that we have not advanced
773 // time, so the address will still be in the blackout period,
774 // resulting in it being assigned the average weight.
775 picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus());
776 WaitForWeightedRoundRobinPicks(
777 &picker,
778 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
779 /*qps=*/100.0, /*eps=*/0.0)},
780 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
781 /*qps=*/100.0, /*eps=*/0.0)},
782 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
783 /*qps=*/100.0, /*eps=*/0.0)}},
784 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
785 // Advance time to exceed the blackout period and trigger the timer
786 // callback to recompute weights.
787 IncrementTimeBy(Duration::Seconds(1));
788 ExpectWeightedRoundRobinPicks(
789 picker.get(),
790 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
791 /*qps=*/100.0, /*eps=*/0.0)},
792 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
793 /*qps=*/100.0, /*eps=*/0.0)},
794 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.9,
795 /*qps=*/100.0, /*eps=*/0.0)}},
796 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
797 }
798
TEST_F(WeightedRoundRobinTest,BlackoutPeriodDoesNotGetResetAfterUpdate)799 TEST_F(WeightedRoundRobinTest, BlackoutPeriodDoesNotGetResetAfterUpdate) {
800 // Send address list to LB policy.
801 const std::array<absl::string_view, 3> kAddresses = {
802 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
803 auto config_builder =
804 ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2));
805 auto picker =
806 SendInitialUpdateAndWaitForConnected(kAddresses, config_builder);
807 ASSERT_NE(picker, nullptr);
808 // All backends report weights.
809 WaitForWeightedRoundRobinPicks(
810 &picker,
811 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
812 /*qps=*/100.0, /*eps=*/0.0)},
813 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
814 /*qps=*/100.0, /*eps=*/0.0)},
815 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
816 /*qps=*/100.0, /*eps=*/0.0)}},
817 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
818 // Send a duplicate update with the same addresses and config.
819 EXPECT_EQ(
820 ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()), lb_policy()),
821 absl::OkStatus());
822 // Note that we have not advanced time, so if the update incorrectly
823 // triggers resetting the blackout period, none of the weights will
824 // actually be used.
825 picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus());
826 WaitForWeightedRoundRobinPicks(
827 &picker,
828 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
829 /*qps=*/100.0, /*eps=*/0.0)},
830 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
831 /*qps=*/100.0, /*eps=*/0.0)},
832 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
833 /*qps=*/100.0, /*eps=*/0.0)}},
834 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}},
835 /*timeout=*/absl::Seconds(5), /*run_timer_callbacks=*/false);
836 }
837
TEST_F(WeightedRoundRobinTest,ZeroErrorUtilPenalty)838 TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) {
839 // Send address list to LB policy.
840 const std::array<absl::string_view, 3> kAddresses = {
841 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
842 auto picker = SendInitialUpdateAndWaitForConnected(
843 kAddresses, ConfigBuilder().SetErrorUtilizationPenalty(0.0));
844 ASSERT_NE(picker, nullptr);
845 // Expected weights: 1:1:1
846 WaitForWeightedRoundRobinPicks(
847 &picker,
848 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.1,
849 /*qps=*/100.0, /*eps=*/50.0)},
850 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.1,
851 /*qps=*/100.0, /*eps=*/20.0)},
852 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.1,
853 /*qps=*/100.0, /*eps=*/10.0)}},
854 {{kAddresses[0], 1}, {kAddresses[1], 1}, {kAddresses[2], 1}});
855 }
856
TEST_F(WeightedRoundRobinTest,MultipleAddressesPerEndpoint)857 TEST_F(WeightedRoundRobinTest, MultipleAddressesPerEndpoint) {
858 // Can't use timer duration expectation here, because the Happy
859 // Eyeballs timer inside pick_first will use a different duration than
860 // the timer in WRR.
861 SetExpectedTimerDuration(absl::nullopt);
862 constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
863 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
864 constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
865 "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
866 constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
867 "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
868 const std::array<EndpointAddresses, 3> kEndpoints = {
869 MakeEndpointAddresses(kEndpoint1Addresses),
870 MakeEndpointAddresses(kEndpoint2Addresses),
871 MakeEndpointAddresses(kEndpoint3Addresses)};
872 EXPECT_EQ(ApplyUpdate(BuildUpdate(kEndpoints, ConfigBuilder().Build()),
873 lb_policy_.get()),
874 absl::OkStatus());
875 // WRR should have created a subchannel for each address.
876 auto* subchannel1_0 = FindSubchannel(kEndpoint1Addresses[0]);
877 ASSERT_NE(subchannel1_0, nullptr) << "Address: " << kEndpoint1Addresses[0];
878 auto* subchannel1_1 = FindSubchannel(kEndpoint1Addresses[1]);
879 ASSERT_NE(subchannel1_1, nullptr) << "Address: " << kEndpoint1Addresses[1];
880 auto* subchannel2_0 = FindSubchannel(kEndpoint2Addresses[0]);
881 ASSERT_NE(subchannel2_0, nullptr) << "Address: " << kEndpoint2Addresses[0];
882 auto* subchannel2_1 = FindSubchannel(kEndpoint2Addresses[1]);
883 ASSERT_NE(subchannel2_1, nullptr) << "Address: " << kEndpoint2Addresses[1];
884 auto* subchannel3_0 = FindSubchannel(kEndpoint3Addresses[0]);
885 ASSERT_NE(subchannel3_0, nullptr) << "Address: " << kEndpoint3Addresses[0];
886 auto* subchannel3_1 = FindSubchannel(kEndpoint3Addresses[1]);
887 ASSERT_NE(subchannel3_1, nullptr) << "Address: " << kEndpoint3Addresses[1];
888 // PF for each endpoint should try to connect to the first subchannel.
889 EXPECT_TRUE(subchannel1_0->ConnectionRequested());
890 EXPECT_FALSE(subchannel1_1->ConnectionRequested());
891 EXPECT_TRUE(subchannel2_0->ConnectionRequested());
892 EXPECT_FALSE(subchannel2_1->ConnectionRequested());
893 EXPECT_TRUE(subchannel3_0->ConnectionRequested());
894 EXPECT_FALSE(subchannel3_1->ConnectionRequested());
895 // In the first endpoint, the first subchannel reports CONNECTING.
896 // This causes WRR to report CONNECTING.
897 subchannel1_0->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
898 ExpectConnectingUpdate();
899 // In the second endpoint, the first subchannel reports CONNECTING.
900 subchannel2_0->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
901 // In the third endpoint, the first subchannel reports CONNECTING.
902 subchannel3_0->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
903 // In the first endpoint, the first subchannel fails to connect.
904 // This causes PF to start a connection attempt on the second subchannel.
905 subchannel1_0->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
906 absl::UnavailableError("ugh"));
907 EXPECT_TRUE(subchannel1_1->ConnectionRequested());
908 subchannel1_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
909 // In the second endpoint, the first subchannel becomes connected.
910 // This causes WRR to report READY with all RPCs going to a single address.
911 subchannel2_0->SetConnectivityState(GRPC_CHANNEL_READY);
912 auto picker = WaitForConnected();
913 ExpectRoundRobinPicks(picker.get(), {kEndpoint2Addresses[0]});
914 // In the third endpoint, the first subchannel becomes connected.
915 // This causes WRR to add it to the rotation.
916 subchannel3_0->SetConnectivityState(GRPC_CHANNEL_READY);
917 picker = WaitForRoundRobinListChange(
918 {kEndpoint2Addresses[0]},
919 {kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
920 // In the first endpoint, the second subchannel becomes connected.
921 // This causes WRR to add it to the rotation.
922 subchannel1_1->SetConnectivityState(GRPC_CHANNEL_READY);
923 picker = WaitForRoundRobinListChange(
924 {kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
925 {kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
926 // No more connection attempts triggered.
927 EXPECT_FALSE(subchannel1_0->ConnectionRequested());
928 EXPECT_FALSE(subchannel1_1->ConnectionRequested());
929 EXPECT_FALSE(subchannel2_0->ConnectionRequested());
930 EXPECT_FALSE(subchannel2_1->ConnectionRequested());
931 EXPECT_FALSE(subchannel3_0->ConnectionRequested());
932 EXPECT_FALSE(subchannel3_1->ConnectionRequested());
933 // Expected weights: 3:1:3
934 WaitForWeightedRoundRobinPicks(
935 &picker,
936 {{kEndpoint1Addresses[1],
937 MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
938 /*eps=*/0.0)},
939 {kEndpoint2Addresses[0],
940 MakeBackendMetricData(/*app_utilization=*/0.9, /*qps=*/100.0,
941 /*eps=*/0.0)},
942 {kEndpoint3Addresses[0],
943 MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
944 /*eps=*/0.0)}},
945 {{kEndpoint1Addresses[1], 3},
946 {kEndpoint2Addresses[0], 1},
947 {kEndpoint3Addresses[0], 3}});
948 // First endpoint first subchannel finishes backoff, but this doesn't
949 // affect anything -- in fact, PF isn't even watching this subchannel
950 // anymore, since it's connected to the other one. However, this
951 // ensures that the subchannel is in the right state when we try to
952 // reconnect below.
953 subchannel1_0->SetConnectivityState(GRPC_CHANNEL_IDLE);
954 EXPECT_FALSE(subchannel1_0->ConnectionRequested());
955 // Endpoint 1 switches to a different address.
956 ExpectEndpointAddressChange(
957 kEndpoint1Addresses, 1, 0,
958 // When the subchannel disconnects, WRR will remove the endpoint from
959 // the rotation.
960 [&]() {
961 picker = ExpectState(GRPC_CHANNEL_READY);
962 WaitForWeightedRoundRobinPicks(
963 &picker,
964 {{kEndpoint2Addresses[0],
965 MakeBackendMetricData(/*app_utilization=*/0.9, /*qps=*/100.0,
966 /*eps=*/0.0)},
967 {kEndpoint3Addresses[0],
968 MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
969 /*eps=*/0.0)}},
970 {{kEndpoint2Addresses[0], 1}, {kEndpoint3Addresses[0], 3}});
971 });
972 // When it connects to the new address, WRR adds it to the rotation.
973 WaitForWeightedRoundRobinPicks(
974 &picker,
975 {{kEndpoint1Addresses[0],
976 MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
977 /*eps=*/0.0)},
978 {kEndpoint2Addresses[0],
979 MakeBackendMetricData(/*app_utilization=*/0.9, /*qps=*/100.0,
980 /*eps=*/0.0)},
981 {kEndpoint3Addresses[0],
982 MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
983 /*eps=*/0.0)}},
984 {{kEndpoint1Addresses[0], 3},
985 {kEndpoint2Addresses[0], 1},
986 {kEndpoint3Addresses[0], 3}});
987 // No more connection attempts triggered.
988 EXPECT_FALSE(subchannel1_0->ConnectionRequested());
989 EXPECT_FALSE(subchannel1_1->ConnectionRequested());
990 EXPECT_FALSE(subchannel2_0->ConnectionRequested());
991 EXPECT_FALSE(subchannel2_1->ConnectionRequested());
992 EXPECT_FALSE(subchannel3_0->ConnectionRequested());
993 EXPECT_FALSE(subchannel3_1->ConnectionRequested());
994 }
995
TEST_F(WeightedRoundRobinTest,MetricDefinitionRrFallback)996 TEST_F(WeightedRoundRobinTest, MetricDefinitionRrFallback) {
997 const auto* descriptor =
998 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
999 "grpc.lb.wrr.rr_fallback");
1000 ASSERT_NE(descriptor, nullptr);
1001 EXPECT_EQ(descriptor->value_type,
1002 GlobalInstrumentsRegistry::ValueType::kUInt64);
1003 EXPECT_EQ(descriptor->instrument_type,
1004 GlobalInstrumentsRegistry::InstrumentType::kCounter);
1005 EXPECT_EQ(descriptor->enable_by_default, false);
1006 EXPECT_EQ(descriptor->name, "grpc.lb.wrr.rr_fallback");
1007 EXPECT_EQ(descriptor->unit, "{update}");
1008 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1009 EXPECT_THAT(descriptor->optional_label_keys,
1010 ::testing::ElementsAre("grpc.lb.locality"));
1011 }
1012
TEST_F(WeightedRoundRobinTest,MetricDefinitionEndpointWeightNotYetUsable)1013 TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightNotYetUsable) {
1014 const auto* descriptor =
1015 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1016 "grpc.lb.wrr.endpoint_weight_not_yet_usable");
1017 ASSERT_NE(descriptor, nullptr);
1018 EXPECT_EQ(descriptor->value_type,
1019 GlobalInstrumentsRegistry::ValueType::kUInt64);
1020 EXPECT_EQ(descriptor->instrument_type,
1021 GlobalInstrumentsRegistry::InstrumentType::kCounter);
1022 EXPECT_EQ(descriptor->enable_by_default, false);
1023 EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_not_yet_usable");
1024 EXPECT_EQ(descriptor->unit, "{endpoint}");
1025 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1026 EXPECT_THAT(descriptor->optional_label_keys,
1027 ::testing::ElementsAre("grpc.lb.locality"));
1028 }
1029
TEST_F(WeightedRoundRobinTest,MetricDefinitionEndpointWeightStale)1030 TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightStale) {
1031 const auto* descriptor =
1032 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1033 "grpc.lb.wrr.endpoint_weight_stale");
1034 ASSERT_NE(descriptor, nullptr);
1035 EXPECT_EQ(descriptor->value_type,
1036 GlobalInstrumentsRegistry::ValueType::kUInt64);
1037 EXPECT_EQ(descriptor->instrument_type,
1038 GlobalInstrumentsRegistry::InstrumentType::kCounter);
1039 EXPECT_EQ(descriptor->enable_by_default, false);
1040 EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_stale");
1041 EXPECT_EQ(descriptor->unit, "{endpoint}");
1042 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1043 EXPECT_THAT(descriptor->optional_label_keys,
1044 ::testing::ElementsAre("grpc.lb.locality"));
1045 }
1046
TEST_F(WeightedRoundRobinTest,MetricDefinitionEndpointWeights)1047 TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeights) {
1048 const auto* descriptor =
1049 GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1050 "grpc.lb.wrr.endpoint_weights");
1051 ASSERT_NE(descriptor, nullptr);
1052 EXPECT_EQ(descriptor->value_type,
1053 GlobalInstrumentsRegistry::ValueType::kDouble);
1054 EXPECT_EQ(descriptor->instrument_type,
1055 GlobalInstrumentsRegistry::InstrumentType::kHistogram);
1056 EXPECT_EQ(descriptor->enable_by_default, false);
1057 EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weights");
1058 EXPECT_EQ(descriptor->unit, "{weight}");
1059 EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1060 EXPECT_THAT(descriptor->optional_label_keys,
1061 ::testing::ElementsAre("grpc.lb.locality"));
1062 }
1063
TEST_F(WeightedRoundRobinTest,MetricValues)1064 TEST_F(WeightedRoundRobinTest, MetricValues) {
1065 const auto kRrFallback =
1066 GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1067 "grpc.lb.wrr.rr_fallback")
1068 .value();
1069 const auto kEndpointWeightNotYetUsable =
1070 GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1071 "grpc.lb.wrr.endpoint_weight_not_yet_usable")
1072 .value();
1073 const auto kEndpointWeightStale =
1074 GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1075 "grpc.lb.wrr.endpoint_weight_stale")
1076 .value();
1077 const auto kEndpointWeights =
1078 GlobalInstrumentsRegistryTestPeer::FindDoubleHistogramHandleByName(
1079 "grpc.lb.wrr.endpoint_weights")
1080 .value();
1081 const absl::string_view kLabelValues[] = {target_};
1082 const absl::string_view kOptionalLabelValues[] = {kLocalityName};
1083 auto stats_plugin = std::make_shared<FakeStatsPlugin>(
1084 nullptr, /*use_disabled_by_default_metrics=*/true);
1085 stats_plugin_group_.AddStatsPlugin(stats_plugin, nullptr);
1086 // Send address list to LB policy.
1087 const std::array<absl::string_view, 3> kAddresses = {
1088 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
1089 auto picker = SendInitialUpdateAndWaitForConnected(
1090 kAddresses,
1091 ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
1092 ASSERT_NE(picker, nullptr);
1093 // Address 0 gets weight 1, address 1 gets weight 3.
1094 // No utilization report from backend 2, so it gets the average weight 2.
1095 WaitForWeightedRoundRobinPicks(
1096 &picker,
1097 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
1098 /*qps=*/100.0, /*eps=*/0.0)},
1099 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
1100 /*qps=*/100.0, /*eps=*/0.0)}},
1101 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
1102 // Now have backend 2 report utilization the same as backend 1, so its
1103 // weight will be the same.
1104 WaitForWeightedRoundRobinPicks(
1105 &picker,
1106 {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
1107 /*qps=*/100.0, /*eps=*/0.0)},
1108 {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
1109 /*qps=*/100.0, /*eps=*/0.0)},
1110 {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
1111 /*qps=*/100.0, /*eps=*/0.0)}},
1112 {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
1113 // Check endpoint weights.
1114 EXPECT_THAT(stats_plugin->GetDoubleHistogramValue(
1115 kEndpointWeights, kLabelValues, kOptionalLabelValues),
1116 ::testing::Optional(::testing::ElementsAre(
1117 // Picker created for first endpoint becoming READY.
1118 0,
1119 // Picker update for second endpoint CONNECTING.
1120 0,
1121 // Picker update for second endpoint READY.
1122 0, 0,
1123 // Picker update for third endpoint CONNECTING.
1124 0, 0,
1125 // Picker update for third endpoint READY.
1126 0, 0, 0,
1127 // Weights for first two endpoints now start getting used.
1128 ::testing::DoubleNear(111.111115, 0.000001),
1129 ::testing::DoubleNear(333.333344, 0.000001), 0,
1130 // Weights for all endpoints are now used.
1131 ::testing::DoubleNear(111.111115, 0.000001),
1132 ::testing::DoubleNear(333.333344, 0.000001),
1133 ::testing::DoubleNear(333.333344, 0.000001))));
1134 // RR fallback should trigger for the first 5 updates above, because
1135 // there are less than two endpoints with valid weights.
1136 EXPECT_THAT(stats_plugin->GetUInt64CounterValue(kRrFallback, kLabelValues,
1137 kOptionalLabelValues),
1138 ::testing::Optional(5));
1139 // Endpoint-not-yet-usable will be incremented once for every endpoint
1140 // with weight 0 above.
1141 EXPECT_THAT(
1142 stats_plugin->GetUInt64CounterValue(kEndpointWeightNotYetUsable,
1143 kLabelValues, kOptionalLabelValues),
1144 ::testing::Optional(10));
1145 // There are no stale endpoint weights so far.
1146 EXPECT_THAT(stats_plugin->GetUInt64CounterValue(
1147 kEndpointWeightStale, kLabelValues, kOptionalLabelValues),
1148 ::testing::Optional(0));
1149 // Advance time to make weights stale and trigger the timer callback
1150 // to recompute weights.
1151 LOG(INFO) << "advancing time to trigger staleness...";
1152 IncrementTimeBy(Duration::Seconds(2));
1153 // Picker should now be falling back to round-robin.
1154 ExpectWeightedRoundRobinPicks(
1155 picker.get(), {},
1156 {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
1157 // All three endpoints should now have stale weights.
1158 EXPECT_THAT(stats_plugin->GetUInt64CounterValue(
1159 kEndpointWeightStale, kLabelValues, kOptionalLabelValues),
1160 ::testing::Optional(3));
1161 }
1162
1163 } // namespace
1164 } // namespace testing
1165 } // namespace grpc_core
1166
main(int argc,char ** argv)1167 int main(int argc, char** argv) {
1168 ::testing::InitGoogleTest(&argc, argv);
1169 grpc::testing::TestEnvironment env(&argc, argv);
1170 return RUN_ALL_TESTS();
1171 }
1172