• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/grpc.h>
18 #include <grpc/support/json.h>
19 #include <inttypes.h>
20 #include <stddef.h>
21 
22 #include <algorithm>
23 #include <array>
24 #include <chrono>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/log/log.h"
33 #include "absl/status/status.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "absl/types/span.h"
40 #include "gtest/gtest.h"
41 #include "src/core/load_balancing/backend_metric_data.h"
42 #include "src/core/load_balancing/lb_policy.h"
43 #include "src/core/load_balancing/weighted_target/weighted_target.h"
44 #include "src/core/resolver/endpoint_addresses.h"
45 #include "src/core/util/debug_location.h"
46 #include "src/core/util/json/json.h"
47 #include "src/core/util/json/json_writer.h"
48 #include "src/core/util/orphanable.h"
49 #include "src/core/util/ref_counted_ptr.h"
50 #include "src/core/util/time.h"
51 #include "test/core/load_balancing/lb_policy_test_lib.h"
52 #include "test/core/test_util/fake_stats_plugin.h"
53 #include "test/core/test_util/test_config.h"
54 
55 namespace grpc_core {
56 namespace testing {
57 namespace {
58 
59 constexpr absl::string_view kLocalityName = "locality0";
60 
61 class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
62  protected:
63   class ConfigBuilder {
64    public:
ConfigBuilder()65     ConfigBuilder() {
66       // Set blackout period to 1s to make tests fast and deterministic.
67       SetBlackoutPeriod(Duration::Seconds(1));
68     }
69 
SetEnableOobLoadReport(bool value)70     ConfigBuilder& SetEnableOobLoadReport(bool value) {
71       json_["enableOobLoadReport"] = Json::FromBool(value);
72       return *this;
73     }
SetOobReportingPeriod(Duration duration)74     ConfigBuilder& SetOobReportingPeriod(Duration duration) {
75       json_["oobReportingPeriod"] = Json::FromString(duration.ToJsonString());
76       return *this;
77     }
SetBlackoutPeriod(Duration duration)78     ConfigBuilder& SetBlackoutPeriod(Duration duration) {
79       json_["blackoutPeriod"] = Json::FromString(duration.ToJsonString());
80       return *this;
81     }
SetWeightUpdatePeriod(Duration duration)82     ConfigBuilder& SetWeightUpdatePeriod(Duration duration) {
83       json_["weightUpdatePeriod"] = Json::FromString(duration.ToJsonString());
84       return *this;
85     }
SetWeightExpirationPeriod(Duration duration)86     ConfigBuilder& SetWeightExpirationPeriod(Duration duration) {
87       json_["weightExpirationPeriod"] =
88           Json::FromString(duration.ToJsonString());
89       return *this;
90     }
SetErrorUtilizationPenalty(float value)91     ConfigBuilder& SetErrorUtilizationPenalty(float value) {
92       json_["errorUtilizationPenalty"] = Json::FromNumber(value);
93       return *this;
94     }
95 
Build()96     RefCountedPtr<LoadBalancingPolicy::Config> Build() {
97       Json config = Json::FromArray({Json::FromObject(
98           {{"weighted_round_robin", Json::FromObject(json_)}})});
99       LOG(INFO) << "CONFIG: " << JsonDump(config);
100       return MakeConfig(config);
101     }
102 
103    private:
104     Json::Object json_;
105   };
106 
WeightedRoundRobinTest()107   WeightedRoundRobinTest()
108       : LoadBalancingPolicyTest(
109             "weighted_round_robin",
110             ChannelArgs().Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD,
111                               kLocalityName)) {}
112 
SetUp()113   void SetUp() override {
114     LoadBalancingPolicyTest::SetUp();
115     SetExpectedTimerDuration(std::chrono::seconds(1));
116   }
117 
118   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
SendInitialUpdateAndWaitForConnected(absl::Span<const absl::string_view> addresses,ConfigBuilder config_builder=ConfigBuilder (),absl::Span<const absl::string_view> update_addresses={},SourceLocation location=SourceLocation ())119   SendInitialUpdateAndWaitForConnected(
120       absl::Span<const absl::string_view> addresses,
121       ConfigBuilder config_builder = ConfigBuilder(),
122       absl::Span<const absl::string_view> update_addresses = {},
123       SourceLocation location = SourceLocation()) {
124     if (update_addresses.empty()) update_addresses = addresses;
125     EXPECT_EQ(ApplyUpdate(BuildUpdate(update_addresses, config_builder.Build()),
126                           lb_policy()),
127               absl::OkStatus());
128     // RR should have created a subchannel for each address.
129     for (size_t i = 0; i < addresses.size(); ++i) {
130       auto* subchannel = FindSubchannel(addresses[i]);
131       EXPECT_NE(subchannel, nullptr)
132           << addresses[i] << " at " << location.file() << ":"
133           << location.line();
134       if (subchannel == nullptr) return nullptr;
135       // RR should ask each subchannel to connect.
136       EXPECT_TRUE(subchannel->ConnectionRequested())
137           << addresses[i] << " at " << location.file() << ":"
138           << location.line();
139       // The subchannel will connect successfully.
140       subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
141       // Expect the initial CONNECTNG update with a picker that queues.
142       if (i == 0) ExpectConnectingUpdate(location);
143       subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
144     }
145     return WaitForConnected(location);
146   }
147 
148   // Returns a map indicating the number of picks for each address.
MakePickMap(absl::Span<const std::string> picks)149   static std::map<absl::string_view, size_t> MakePickMap(
150       absl::Span<const std::string> picks) {
151     std::map<absl::string_view, size_t> actual;
152     for (const auto& address : picks) {
153       ++actual.emplace(address, 0).first->second;
154     }
155     return actual;
156   }
157 
158   // Returns a human-readable string representing the number of picks
159   // for each address.
PickMapString(const std::map<absl::string_view,size_t> & pick_map)160   static std::string PickMapString(
161       const std::map<absl::string_view, size_t>& pick_map) {
162     return absl::StrJoin(pick_map, ",", absl::PairFormatter("="));
163   }
164 
MakeBackendMetricData(double app_utilization,double qps,double eps,double cpu_utilization=0)165   static BackendMetricData MakeBackendMetricData(double app_utilization,
166                                                  double qps, double eps,
167                                                  double cpu_utilization = 0) {
168     BackendMetricData b;
169     b.cpu_utilization = cpu_utilization;
170     b.application_utilization = app_utilization;
171     b.qps = qps;
172     b.eps = eps;
173     return b;
174   }
175 
176   // Returns the number of picks we need to do to check the specified
177   // expectations.
NumPicksNeeded(const std::map<absl::string_view,size_t> & expected)178   static size_t NumPicksNeeded(const std::map<absl::string_view /*address*/,
179                                               size_t /*num_picks*/>& expected) {
180     size_t num_picks = 0;
181     for (const auto& p : expected) {
182       num_picks += p.second;
183     }
184     return num_picks;
185   }
186 
187   // For each pick in picks, reports the backend metrics to the LB policy.
ReportBackendMetrics(absl::Span<const std::string> picks,const std::vector<std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>> & subchannel_call_trackers,const std::map<absl::string_view,BackendMetricData> & backend_metrics)188   static void ReportBackendMetrics(
189       absl::Span<const std::string> picks,
190       const std::vector<
191           std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>>&
192           subchannel_call_trackers,
193       const std::map<absl::string_view /*address*/, BackendMetricData>&
194           backend_metrics) {
195     for (size_t i = 0; i < picks.size(); ++i) {
196       const auto& address = picks[i];
197       auto& subchannel_call_tracker = subchannel_call_trackers[i];
198       if (subchannel_call_tracker != nullptr) {
199         subchannel_call_tracker->Start();
200         absl::optional<BackendMetricData> backend_metric_data;
201         auto it = backend_metrics.find(address);
202         if (it != backend_metrics.end()) {
203           backend_metric_data.emplace();
204           backend_metric_data->qps = it->second.qps;
205           backend_metric_data->eps = it->second.eps;
206           backend_metric_data->cpu_utilization = it->second.cpu_utilization;
207           backend_metric_data->application_utilization =
208               it->second.application_utilization;
209         }
210         FakeMetadata metadata({});
211         FakeBackendMetricAccessor backend_metric_accessor(
212             std::move(backend_metric_data));
213         LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
214             address, absl::OkStatus(), &metadata, &backend_metric_accessor};
215         subchannel_call_tracker->Finish(args);
216       }
217     }
218   }
219 
ReportOobBackendMetrics(const std::map<absl::string_view,BackendMetricData> & backend_metrics)220   void ReportOobBackendMetrics(
221       const std::map<absl::string_view /*address*/, BackendMetricData>&
222           backend_metrics) {
223     for (const auto& p : backend_metrics) {
224       auto* subchannel = FindSubchannel(p.first);
225       BackendMetricData backend_metric_data;
226       backend_metric_data.qps = p.second.qps;
227       backend_metric_data.eps = p.second.eps;
228       backend_metric_data.cpu_utilization = p.second.cpu_utilization;
229       backend_metric_data.application_utilization =
230           p.second.application_utilization;
231       subchannel->SendOobBackendMetricReport(backend_metric_data);
232     }
233   }
234 
ExpectWeightedRoundRobinPicks(LoadBalancingPolicy::SubchannelPicker * picker,const std::map<absl::string_view,BackendMetricData> & backend_metrics,const std::map<absl::string_view,size_t> & expected,SourceLocation location=SourceLocation ())235   void ExpectWeightedRoundRobinPicks(
236       LoadBalancingPolicy::SubchannelPicker* picker,
237       const std::map<absl::string_view /*address*/, BackendMetricData>&
238           backend_metrics,
239       const std::map<absl::string_view /*address*/, size_t /*num_picks*/>&
240           expected,
241       SourceLocation location = SourceLocation()) {
242     std::vector<
243         std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>>
244         subchannel_call_trackers;
245     auto picks = GetCompletePicks(picker, NumPicksNeeded(expected), {},
246                                   &subchannel_call_trackers, location);
247     ASSERT_TRUE(picks.has_value()) << location.file() << ":" << location.line();
248     LOG(INFO) << "PICKS: " << absl::StrJoin(*picks, " ");
249     ReportBackendMetrics(*picks, subchannel_call_trackers, backend_metrics);
250     auto actual = MakePickMap(*picks);
251     LOG(INFO) << "Pick map: " << PickMapString(actual);
252     EXPECT_EQ(expected, actual)
253         << "Expected: " << PickMapString(expected)
254         << "\nActual: " << PickMapString(actual) << "\nat " << location.file()
255         << ":" << location.line();
256   }
257 
WaitForWeightedRoundRobinPicks(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> * picker,const std::map<absl::string_view,BackendMetricData> & backend_metrics,std::map<absl::string_view,size_t> expected,absl::Duration timeout=absl::Seconds (5),bool run_timer_callbacks=true,SourceLocation location=SourceLocation ())258   bool WaitForWeightedRoundRobinPicks(
259       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>* picker,
260       const std::map<absl::string_view /*address*/, BackendMetricData>&
261           backend_metrics,
262       std::map<absl::string_view /*address*/, size_t /*num_picks*/> expected,
263       absl::Duration timeout = absl::Seconds(5),
264       bool run_timer_callbacks = true,
265       SourceLocation location = SourceLocation()) {
266     LOG(INFO) << "==> WaitForWeightedRoundRobinPicks(): Expecting "
267               << PickMapString(expected);
268     size_t num_picks = NumPicksNeeded(expected);
269     absl::Time deadline = absl::Now() + timeout;
270     while (true) {
271       LOG(INFO) << "TOP OF LOOP";
272       // We need to see the expected weights for 3 consecutive passes, just
273       // to make sure we're consistently returning the right weights.
274       size_t num_passes = 0;
275       for (; num_passes < 3; ++num_passes) {
276         LOG(INFO) << "PASS " << num_passes << ": DOING PICKS";
277         std::vector<std::unique_ptr<
278             LoadBalancingPolicy::SubchannelCallTrackerInterface>>
279             subchannel_call_trackers;
280         auto picks = GetCompletePicks(picker->get(), num_picks, {},
281                                       &subchannel_call_trackers, location);
282         EXPECT_TRUE(picks.has_value())
283             << location.file() << ":" << location.line();
284         if (!picks.has_value()) return false;
285         LOG(INFO) << "PICKS: " << absl::StrJoin(*picks, " ");
286         // Report backend metrics to the LB policy.
287         ReportBackendMetrics(*picks, subchannel_call_trackers, backend_metrics);
288         // Check the observed weights.
289         auto actual = MakePickMap(*picks);
290         LOG(INFO) << "Pick map:\nExpected: " << PickMapString(expected)
291                   << "\n  Actual: " << PickMapString(actual);
292         if (expected != actual) {
293           // Make sure each address is one of the expected addresses,
294           // even if the weights aren't as expected.
295           for (const auto& address : *picks) {
296             bool found = expected.find(address) != expected.end();
297             EXPECT_TRUE(found)
298                 << "unexpected pick address " << address << " at "
299                 << location.file() << ":" << location.line();
300             if (!found) return false;
301           }
302           break;
303         }
304         // If there's another picker update in the queue, don't bother
305         // doing another pass, since we want to make sure we're using
306         // the latest picker.
307         if (!helper_->QueueEmpty()) break;
308       }
309       if (num_passes == 3) return true;
310       // If we're out of time, give up.
311       absl::Time now = absl::Now();
312       EXPECT_LT(now, deadline) << location.file() << ":" << location.line();
313       if (now >= deadline) return false;
314       // Get a new picker if there is an update; otherwise, wait for the
315       // weights to be recalculated.
316       if (!helper_->QueueEmpty()) {
317         *picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus(), location);
318         EXPECT_NE(*picker, nullptr)
319             << location.file() << ":" << location.line();
320         if (*picker == nullptr) return false;
321       } else if (run_timer_callbacks) {
322         LOG(INFO) << "running timer callback...";
323         // Increment time and run any timer callbacks.
324         IncrementTimeBy(Duration::Seconds(1));
325       }
326     }
327   }
328 };
329 
TEST_F(WeightedRoundRobinTest,Basic)330 TEST_F(WeightedRoundRobinTest, Basic) {
331   // Send address list to LB policy.
332   const std::array<absl::string_view, 3> kAddresses = {
333       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
334   auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
335   ASSERT_NE(picker, nullptr);
336   // Address 0 gets weight 1, address 1 gets weight 3.
337   // No utilization report from backend 2, so it gets the average weight 2.
338   WaitForWeightedRoundRobinPicks(
339       &picker,
340       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
341                                              /*qps=*/100.0, /*eps=*/0.0)},
342        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
343                                              /*qps=*/100.0, /*eps=*/0.0)}},
344       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
345   // Now have backend 2 report utilization the same as backend 1, so its
346   // weight will be the same.
347   WaitForWeightedRoundRobinPicks(
348       &picker,
349       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
350                                              /*qps=*/100.0, /*eps=*/0.0)},
351        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
352                                              /*qps=*/100.0, /*eps=*/0.0)},
353        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
354                                              /*qps=*/100.0, /*eps=*/0.0)}},
355       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
356 }
357 
TEST_F(WeightedRoundRobinTest,CpuUtilWithNoAppUtil)358 TEST_F(WeightedRoundRobinTest, CpuUtilWithNoAppUtil) {
359   // Send address list to LB policy.
360   const std::array<absl::string_view, 3> kAddresses = {
361       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
362   auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
363   ASSERT_NE(picker, nullptr);
364   // Address 0 gets weight 1, address 1 gets weight 3.
365   // No utilization report from backend 2, so it gets the average weight 2.
366   WaitForWeightedRoundRobinPicks(
367       &picker,
368       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
369                                              /*qps=*/100.0, /*eps=*/0.0,
370                                              /*cpu_utilization=*/0.9)},
371        {kAddresses[1],
372         MakeBackendMetricData(/*app_utilization=*/0,
373                               /*qps=*/100.0,
374                               /*eps=*/0.0, /*cpu_utilization=*/0.3)}},
375       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
376   // Now have backend 2 report utilization the same as backend 1, so its
377   // weight will be the same.
378   WaitForWeightedRoundRobinPicks(
379       &picker,
380       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
381                                              /*qps=*/100.0, /*eps=*/0.0,
382                                              /*cpu_utilization=*/0.9)},
383        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0,
384                                              /*qps=*/100.0, /*eps=*/0.0,
385                                              /*cpu_utilization=*/0.3)},
386        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0,
387                                              /*qps=*/100.0, /*eps=*/0.0,
388                                              /*cpu_utilization=*/0.3)}},
389       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
390 }
391 
TEST_F(WeightedRoundRobinTest,AppUtilOverCpuUtil)392 TEST_F(WeightedRoundRobinTest, AppUtilOverCpuUtil) {
393   // Send address list to LB policy.
394   const std::array<absl::string_view, 3> kAddresses = {
395       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
396   auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
397   ASSERT_NE(picker, nullptr);
398   // Address 0 gets weight 1, address 1 gets weight 3.
399   // No utilization report from backend 2, so it gets the average weight 2.
400   WaitForWeightedRoundRobinPicks(
401       &picker,
402       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
403                                              /*qps=*/100.0, /*eps=*/0.0,
404                                              /*cpu_utilization=*/0.3)},
405        {kAddresses[1],
406         MakeBackendMetricData(/*app_utilization=*/0.3,
407                               /*qps=*/100.0,
408                               /*eps=*/0.0, /*cpu_utilization=*/0.4)}},
409       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
410   // Now have backend 2 report utilization the same as backend 1, so its
411   // weight will be the same.
412   WaitForWeightedRoundRobinPicks(
413       &picker,
414       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
415                                              /*qps=*/100.0, /*eps=*/0.0,
416                                              /*cpu_utilization=*/0.2)},
417        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
418                                              /*qps=*/100.0, /*eps=*/0.0,
419                                              /*cpu_utilization=*/0.6)},
420        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
421                                              /*qps=*/100.0, /*eps=*/0.0,
422                                              /*cpu_utilization=*/0.5)}},
423       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
424 }
425 
TEST_F(WeightedRoundRobinTest,Eps)426 TEST_F(WeightedRoundRobinTest, Eps) {
427   // Send address list to LB policy.
428   const std::array<absl::string_view, 3> kAddresses = {
429       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
430   auto picker = SendInitialUpdateAndWaitForConnected(
431       kAddresses, ConfigBuilder().SetErrorUtilizationPenalty(1.0));
432   ASSERT_NE(picker, nullptr);
433   // Expected weights: 1/(0.1+0.5) : 1/(0.1+0.2) : 1/(0.1+0.1) = 1:2:3
434   WaitForWeightedRoundRobinPicks(
435       &picker,
436       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.1,
437                                              /*qps=*/100.0, /*eps=*/50.0)},
438        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.1,
439                                              /*qps=*/100.0, /*eps=*/20.0)},
440        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.1,
441                                              /*qps=*/100.0, /*eps=*/10.0)}},
442       {{kAddresses[0], 1}, {kAddresses[1], 2}, {kAddresses[2], 3}});
443 }
444 
TEST_F(WeightedRoundRobinTest,IgnoresDuplicateAddresses)445 TEST_F(WeightedRoundRobinTest, IgnoresDuplicateAddresses) {
446   // Send address list to LB policy.
447   const std::array<absl::string_view, 3> kAddresses = {
448       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
449   const std::array<absl::string_view, 4> kUpdateAddresses = {
450       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443",
451       "ipv4:127.0.0.1:441"};
452   auto picker = SendInitialUpdateAndWaitForConnected(
453       kAddresses, ConfigBuilder(), kUpdateAddresses);
454   ASSERT_NE(picker, nullptr);
455   // Address 0 gets weight 1, address 1 gets weight 3.
456   // No utilization report from backend 2, so it gets the average weight 2.
457   WaitForWeightedRoundRobinPicks(
458       &picker,
459       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
460                                              /*qps=*/100.0, /*eps=*/0.0)},
461        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
462                                              /*qps=*/100.0, /*eps=*/0.0)}},
463       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
464   // Now have backend 2 report utilization the same as backend 1, so its
465   // weight will be the same.
466   WaitForWeightedRoundRobinPicks(
467       &picker,
468       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
469                                              /*qps=*/100.0, /*eps=*/0.0)},
470        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
471                                              /*qps=*/100.0, /*eps=*/0.0)},
472        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
473                                              /*qps=*/100.0, /*eps=*/0.0)}},
474       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
475 }
476 
TEST_F(WeightedRoundRobinTest,FallsBackToRoundRobinWithoutWeights)477 TEST_F(WeightedRoundRobinTest, FallsBackToRoundRobinWithoutWeights) {
478   // Send address list to LB policy.
479   const std::array<absl::string_view, 3> kAddresses = {
480       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
481   auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
482   ASSERT_NE(picker, nullptr);
483   // Backends do not report utilization, so all are weighted the same.
484   WaitForWeightedRoundRobinPicks(
485       &picker, {},
486       {{kAddresses[0], 1}, {kAddresses[1], 1}, {kAddresses[2], 1}});
487 }
488 
TEST_F(WeightedRoundRobinTest,OobReporting)489 TEST_F(WeightedRoundRobinTest, OobReporting) {
490   // Send address list to LB policy.
491   const std::array<absl::string_view, 3> kAddresses = {
492       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
493   auto picker = SendInitialUpdateAndWaitForConnected(
494       kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
495   ASSERT_NE(picker, nullptr);
496   // Address 0 gets weight 1, address 1 gets weight 3.
497   // No utilization report from backend 2, so it gets the average weight 2.
498   ReportOobBackendMetrics(
499       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
500                                              /*qps=*/100.0, /*eps=*/0.0)},
501        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
502                                              /*qps=*/100.0, /*eps=*/0.0)}});
503   WaitForWeightedRoundRobinPicks(
504       &picker, {},
505       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
506   // Now have backend 2 report utilization the same as backend 1, so its
507   // weight will be the same.
508   ReportOobBackendMetrics(
509       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
510                                              /*qps=*/100.0, /*eps=*/0.0)},
511        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
512                                              /*qps=*/100.0, /*eps=*/0.0)},
513        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
514                                              /*qps=*/100.0, /*eps=*/0.0)}});
515   WaitForWeightedRoundRobinPicks(
516       &picker, {},
517       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
518   // Verify that OOB reporting interval is the default.
519   for (const auto& address : kAddresses) {
520     auto* subchannel = FindSubchannel(address);
521     ASSERT_NE(subchannel, nullptr);
522     subchannel->CheckOobReportingPeriod(Duration::Seconds(10));
523   }
524 }
525 
TEST_F(WeightedRoundRobinTest,OobReportingCpuUtilWithNoAppUtil)526 TEST_F(WeightedRoundRobinTest, OobReportingCpuUtilWithNoAppUtil) {
527   // Send address list to LB policy.
528   const std::array<absl::string_view, 3> kAddresses = {
529       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
530   auto picker = SendInitialUpdateAndWaitForConnected(
531       kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
532   ASSERT_NE(picker, nullptr);
533   // Address 0 gets weight 1, address 1 gets weight 3.
534   // No utilization report from backend 2, so it gets the average weight 2.
535   ReportOobBackendMetrics(
536       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
537                                              /*qps=*/100.0, /*eps=*/0.0,
538                                              /*cpu_utilization=*/0.9)},
539        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0,
540                                              /*qps=*/100.0, /*eps=*/0.0,
541                                              /*cpu_utilization=*/0.3)}});
542   WaitForWeightedRoundRobinPicks(
543       &picker, {},
544       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
545   // Now have backend 2 report utilization the same as backend 1, so its
546   // weight will be the same.
547   ReportOobBackendMetrics(
548       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0,
549                                              /*qps=*/100.0, /*eps=*/0.0,
550                                              /*cpu_utilization=*/0.9)},
551        {kAddresses[1],
552         MakeBackendMetricData(/*app_utilization=*/0,
553                               /*qps=*/100.0,
554                               /*eps=*/0.0, /*cpu_utilization=*/0.3)},
555        {kAddresses[2],
556         MakeBackendMetricData(/*app_utilization=*/0,
557                               /*qps=*/100.0,
558                               /*eps=*/0.0, /*cpu_utilization=*/0.3)}});
559   WaitForWeightedRoundRobinPicks(
560       &picker, {},
561       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
562   // Verify that OOB reporting interval is the default.
563   for (const auto& address : kAddresses) {
564     auto* subchannel = FindSubchannel(address);
565     ASSERT_NE(subchannel, nullptr);
566     subchannel->CheckOobReportingPeriod(Duration::Seconds(10));
567   }
568 }
569 
TEST_F(WeightedRoundRobinTest,OobReportingAppUtilOverCpuUtil)570 TEST_F(WeightedRoundRobinTest, OobReportingAppUtilOverCpuUtil) {
571   // Send address list to LB policy.
572   const std::array<absl::string_view, 3> kAddresses = {
573       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
574   auto picker = SendInitialUpdateAndWaitForConnected(
575       kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
576   ASSERT_NE(picker, nullptr);
577   // Address 0 gets weight 1, address 1 gets weight 3.
578   // No utilization report from backend 2, so it gets the average weight 2.
579   ReportOobBackendMetrics(
580       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
581                                              /*qps=*/100.0, /*eps=*/0.0,
582                                              /*cpu_utilization=*/0.3)},
583        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
584                                              /*qps=*/100.0, /*eps=*/0.0,
585                                              /*cpu_utilization=*/0.4)}});
586   WaitForWeightedRoundRobinPicks(
587       &picker, {},
588       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
589   // Now have backend 2 report utilization the same as backend 1, so its
590   // weight will be the same.
591   ReportOobBackendMetrics(
592       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
593                                              /*qps=*/100.0, /*eps=*/0.0,
594                                              /*cpu_utilization=*/0.2)},
595        {kAddresses[1],
596         MakeBackendMetricData(/*app_utilization=*/0.3,
597                               /*qps=*/100.0,
598                               /*eps=*/0.0, /*cpu_utilization=*/0.6)},
599        {kAddresses[2],
600         MakeBackendMetricData(/*app_utilization=*/0.3,
601                               /*qps=*/100.0,
602                               /*eps=*/0.0, /*cpu_utilization=*/0.5)}});
603   WaitForWeightedRoundRobinPicks(
604       &picker, {},
605       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
606   // Verify that OOB reporting interval is the default.
607   for (const auto& address : kAddresses) {
608     auto* subchannel = FindSubchannel(address);
609     ASSERT_NE(subchannel, nullptr);
610     subchannel->CheckOobReportingPeriod(Duration::Seconds(10));
611   }
612 }
613 
TEST_F(WeightedRoundRobinTest,HonorsOobReportingPeriod)614 TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
615   const std::array<absl::string_view, 3> kAddresses = {
616       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
617   auto picker = SendInitialUpdateAndWaitForConnected(
618       kAddresses,
619       ConfigBuilder().SetEnableOobLoadReport(true).SetOobReportingPeriod(
620           Duration::Seconds(5)));
621   ASSERT_NE(picker, nullptr);
622   ReportOobBackendMetrics(
623       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
624                                              /*qps=*/100.0, /*eps=*/0.0)},
625        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
626                                              /*qps=*/100.0, /*eps=*/0.0)},
627        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
628                                              /*qps=*/100.0, /*eps=*/0.0)}});
629   WaitForWeightedRoundRobinPicks(
630       &picker, {},
631       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
632   for (const auto& address : kAddresses) {
633     auto* subchannel = FindSubchannel(address);
634     ASSERT_NE(subchannel, nullptr);
635     subchannel->CheckOobReportingPeriod(Duration::Seconds(5));
636   }
637 }
638 
TEST_F(WeightedRoundRobinTest,HonorsWeightUpdatePeriod)639 TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
640   const std::array<absl::string_view, 3> kAddresses = {
641       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
642   SetExpectedTimerDuration(std::chrono::seconds(2));
643   auto picker = SendInitialUpdateAndWaitForConnected(
644       kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Seconds(2)));
645   ASSERT_NE(picker, nullptr);
646   WaitForWeightedRoundRobinPicks(
647       &picker,
648       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
649                                              /*qps=*/100.0, /*eps=*/0.0)},
650        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
651                                              /*qps=*/100.0, /*eps=*/0.0)},
652        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
653                                              /*qps=*/100.0, /*eps=*/0.0)}},
654       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
655 }
656 
TEST_F(WeightedRoundRobinTest,WeightUpdatePeriodLowerBound)657 TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) {
658   const std::array<absl::string_view, 3> kAddresses = {
659       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
660   SetExpectedTimerDuration(std::chrono::milliseconds(100));
661   auto picker = SendInitialUpdateAndWaitForConnected(
662       kAddresses,
663       ConfigBuilder().SetWeightUpdatePeriod(Duration::Milliseconds(10)));
664   ASSERT_NE(picker, nullptr);
665   WaitForWeightedRoundRobinPicks(
666       &picker,
667       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
668                                              /*qps=*/100.0, /*eps=*/0.0)},
669        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
670                                              /*qps=*/100.0, /*eps=*/0.0)},
671        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
672                                              /*qps=*/100.0, /*eps=*/0.0)}},
673       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
674 }
675 
TEST_F(WeightedRoundRobinTest,WeightExpirationPeriod)676 TEST_F(WeightedRoundRobinTest, WeightExpirationPeriod) {
677   // Send address list to LB policy.
678   const std::array<absl::string_view, 3> kAddresses = {
679       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
680   auto picker = SendInitialUpdateAndWaitForConnected(
681       kAddresses,
682       ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
683   ASSERT_NE(picker, nullptr);
684   // All backends report weights.
685   WaitForWeightedRoundRobinPicks(
686       &picker,
687       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
688                                              /*qps=*/100.0, /*eps=*/0.0)},
689        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
690                                              /*qps=*/100.0, /*eps=*/0.0)},
691        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
692                                              /*qps=*/100.0, /*eps=*/0.0)}},
693       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
694   // Advance time to make weights stale and trigger the timer callback
695   // to recompute weights.
696   IncrementTimeBy(Duration::Seconds(2));
697   // Picker should now be falling back to round-robin.
698   ExpectWeightedRoundRobinPicks(
699       picker.get(), {},
700       {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
701 }
702 
TEST_F(WeightedRoundRobinTest,BlackoutPeriodAfterWeightExpiration)703 TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
704   // Send address list to LB policy.
705   const std::array<absl::string_view, 3> kAddresses = {
706       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
707   auto picker = SendInitialUpdateAndWaitForConnected(
708       kAddresses,
709       ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
710   ASSERT_NE(picker, nullptr);
711   // All backends report weights.
712   WaitForWeightedRoundRobinPicks(
713       &picker,
714       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
715                                              /*qps=*/100.0, /*eps=*/0.0)},
716        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
717                                              /*qps=*/100.0, /*eps=*/0.0)},
718        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
719                                              /*qps=*/100.0, /*eps=*/0.0)}},
720       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
721   // Advance time to make weights stale and trigger the timer callback
722   // to recompute weights.
723   IncrementTimeBy(Duration::Seconds(2));
724   // Picker should now be falling back to round-robin.
725   ExpectWeightedRoundRobinPicks(
726       picker.get(), {},
727       {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
728   // Now start sending weights again.  They should not be used yet,
729   // because we're still in the blackout period.
730   ExpectWeightedRoundRobinPicks(
731       picker.get(),
732       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
733                                              /*qps=*/100.0, /*eps=*/0.0)},
734        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
735                                              /*qps=*/100.0, /*eps=*/0.0)},
736        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.9,
737                                              /*qps=*/100.0, /*eps=*/0.0)}},
738       {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
739   // Advance time past the blackout period.  This should cause the
740   // weights to be used.
741   IncrementTimeBy(Duration::Seconds(1));
742   ExpectWeightedRoundRobinPicks(
743       picker.get(), {},
744       {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 1}});
745 }
746 
TEST_F(WeightedRoundRobinTest,BlackoutPeriodAfterDisconnect)747 TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
748   // Send address list to LB policy.
749   const std::array<absl::string_view, 3> kAddresses = {
750       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
751   auto picker = SendInitialUpdateAndWaitForConnected(
752       kAddresses,
753       ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
754   ASSERT_NE(picker, nullptr);
755   // All backends report weights.
756   WaitForWeightedRoundRobinPicks(
757       &picker,
758       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
759                                              /*qps=*/100.0, /*eps=*/0.0)},
760        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
761                                              /*qps=*/100.0, /*eps=*/0.0)},
762        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
763                                              /*qps=*/100.0, /*eps=*/0.0)}},
764       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
765   // Trigger disconnection and reconnection on address 2.
766   auto* subchannel = FindSubchannel(kAddresses[2]);
767   subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
768   ExpectReresolutionRequest();
769   EXPECT_TRUE(subchannel->ConnectionRequested());
770   subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
771   subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
772   // Wait for the address to come back.  Note that we have not advanced
773   // time, so the address will still be in the blackout period,
774   // resulting in it being assigned the average weight.
775   picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus());
776   WaitForWeightedRoundRobinPicks(
777       &picker,
778       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
779                                              /*qps=*/100.0, /*eps=*/0.0)},
780        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
781                                              /*qps=*/100.0, /*eps=*/0.0)},
782        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
783                                              /*qps=*/100.0, /*eps=*/0.0)}},
784       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
785   // Advance time to exceed the blackout period and trigger the timer
786   // callback to recompute weights.
787   IncrementTimeBy(Duration::Seconds(1));
788   ExpectWeightedRoundRobinPicks(
789       picker.get(),
790       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
791                                              /*qps=*/100.0, /*eps=*/0.0)},
792        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
793                                              /*qps=*/100.0, /*eps=*/0.0)},
794        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.9,
795                                              /*qps=*/100.0, /*eps=*/0.0)}},
796       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
797 }
798 
TEST_F(WeightedRoundRobinTest,BlackoutPeriodDoesNotGetResetAfterUpdate)799 TEST_F(WeightedRoundRobinTest, BlackoutPeriodDoesNotGetResetAfterUpdate) {
800   // Send address list to LB policy.
801   const std::array<absl::string_view, 3> kAddresses = {
802       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
803   auto config_builder =
804       ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2));
805   auto picker =
806       SendInitialUpdateAndWaitForConnected(kAddresses, config_builder);
807   ASSERT_NE(picker, nullptr);
808   // All backends report weights.
809   WaitForWeightedRoundRobinPicks(
810       &picker,
811       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
812                                              /*qps=*/100.0, /*eps=*/0.0)},
813        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
814                                              /*qps=*/100.0, /*eps=*/0.0)},
815        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
816                                              /*qps=*/100.0, /*eps=*/0.0)}},
817       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
818   // Send a duplicate update with the same addresses and config.
819   EXPECT_EQ(
820       ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()), lb_policy()),
821       absl::OkStatus());
822   // Note that we have not advanced time, so if the update incorrectly
823   // triggers resetting the blackout period, none of the weights will
824   // actually be used.
825   picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus());
826   WaitForWeightedRoundRobinPicks(
827       &picker,
828       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
829                                              /*qps=*/100.0, /*eps=*/0.0)},
830        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
831                                              /*qps=*/100.0, /*eps=*/0.0)},
832        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
833                                              /*qps=*/100.0, /*eps=*/0.0)}},
834       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}},
835       /*timeout=*/absl::Seconds(5), /*run_timer_callbacks=*/false);
836 }
837 
TEST_F(WeightedRoundRobinTest,ZeroErrorUtilPenalty)838 TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) {
839   // Send address list to LB policy.
840   const std::array<absl::string_view, 3> kAddresses = {
841       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
842   auto picker = SendInitialUpdateAndWaitForConnected(
843       kAddresses, ConfigBuilder().SetErrorUtilizationPenalty(0.0));
844   ASSERT_NE(picker, nullptr);
845   // Expected weights: 1:1:1
846   WaitForWeightedRoundRobinPicks(
847       &picker,
848       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.1,
849                                              /*qps=*/100.0, /*eps=*/50.0)},
850        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.1,
851                                              /*qps=*/100.0, /*eps=*/20.0)},
852        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.1,
853                                              /*qps=*/100.0, /*eps=*/10.0)}},
854       {{kAddresses[0], 1}, {kAddresses[1], 1}, {kAddresses[2], 1}});
855 }
856 
TEST_F(WeightedRoundRobinTest,MultipleAddressesPerEndpoint)857 TEST_F(WeightedRoundRobinTest, MultipleAddressesPerEndpoint) {
858   // Can't use timer duration expectation here, because the Happy
859   // Eyeballs timer inside pick_first will use a different duration than
860   // the timer in WRR.
861   SetExpectedTimerDuration(absl::nullopt);
862   constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
863       "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
864   constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
865       "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
866   constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
867       "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
868   const std::array<EndpointAddresses, 3> kEndpoints = {
869       MakeEndpointAddresses(kEndpoint1Addresses),
870       MakeEndpointAddresses(kEndpoint2Addresses),
871       MakeEndpointAddresses(kEndpoint3Addresses)};
872   EXPECT_EQ(ApplyUpdate(BuildUpdate(kEndpoints, ConfigBuilder().Build()),
873                         lb_policy_.get()),
874             absl::OkStatus());
875   // WRR should have created a subchannel for each address.
876   auto* subchannel1_0 = FindSubchannel(kEndpoint1Addresses[0]);
877   ASSERT_NE(subchannel1_0, nullptr) << "Address: " << kEndpoint1Addresses[0];
878   auto* subchannel1_1 = FindSubchannel(kEndpoint1Addresses[1]);
879   ASSERT_NE(subchannel1_1, nullptr) << "Address: " << kEndpoint1Addresses[1];
880   auto* subchannel2_0 = FindSubchannel(kEndpoint2Addresses[0]);
881   ASSERT_NE(subchannel2_0, nullptr) << "Address: " << kEndpoint2Addresses[0];
882   auto* subchannel2_1 = FindSubchannel(kEndpoint2Addresses[1]);
883   ASSERT_NE(subchannel2_1, nullptr) << "Address: " << kEndpoint2Addresses[1];
884   auto* subchannel3_0 = FindSubchannel(kEndpoint3Addresses[0]);
885   ASSERT_NE(subchannel3_0, nullptr) << "Address: " << kEndpoint3Addresses[0];
886   auto* subchannel3_1 = FindSubchannel(kEndpoint3Addresses[1]);
887   ASSERT_NE(subchannel3_1, nullptr) << "Address: " << kEndpoint3Addresses[1];
888   // PF for each endpoint should try to connect to the first subchannel.
889   EXPECT_TRUE(subchannel1_0->ConnectionRequested());
890   EXPECT_FALSE(subchannel1_1->ConnectionRequested());
891   EXPECT_TRUE(subchannel2_0->ConnectionRequested());
892   EXPECT_FALSE(subchannel2_1->ConnectionRequested());
893   EXPECT_TRUE(subchannel3_0->ConnectionRequested());
894   EXPECT_FALSE(subchannel3_1->ConnectionRequested());
895   // In the first endpoint, the first subchannel reports CONNECTING.
896   // This causes WRR to report CONNECTING.
897   subchannel1_0->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
898   ExpectConnectingUpdate();
899   // In the second endpoint, the first subchannel reports CONNECTING.
900   subchannel2_0->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
901   // In the third endpoint, the first subchannel reports CONNECTING.
902   subchannel3_0->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
903   // In the first endpoint, the first subchannel fails to connect.
904   // This causes PF to start a connection attempt on the second subchannel.
905   subchannel1_0->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
906                                       absl::UnavailableError("ugh"));
907   EXPECT_TRUE(subchannel1_1->ConnectionRequested());
908   subchannel1_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
909   // In the second endpoint, the first subchannel becomes connected.
910   // This causes WRR to report READY with all RPCs going to a single address.
911   subchannel2_0->SetConnectivityState(GRPC_CHANNEL_READY);
912   auto picker = WaitForConnected();
913   ExpectRoundRobinPicks(picker.get(), {kEndpoint2Addresses[0]});
914   // In the third endpoint, the first subchannel becomes connected.
915   // This causes WRR to add it to the rotation.
916   subchannel3_0->SetConnectivityState(GRPC_CHANNEL_READY);
917   picker = WaitForRoundRobinListChange(
918       {kEndpoint2Addresses[0]},
919       {kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
920   // In the first endpoint, the second subchannel becomes connected.
921   // This causes WRR to add it to the rotation.
922   subchannel1_1->SetConnectivityState(GRPC_CHANNEL_READY);
923   picker = WaitForRoundRobinListChange(
924       {kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
925       {kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
926   // No more connection attempts triggered.
927   EXPECT_FALSE(subchannel1_0->ConnectionRequested());
928   EXPECT_FALSE(subchannel1_1->ConnectionRequested());
929   EXPECT_FALSE(subchannel2_0->ConnectionRequested());
930   EXPECT_FALSE(subchannel2_1->ConnectionRequested());
931   EXPECT_FALSE(subchannel3_0->ConnectionRequested());
932   EXPECT_FALSE(subchannel3_1->ConnectionRequested());
933   // Expected weights: 3:1:3
934   WaitForWeightedRoundRobinPicks(
935       &picker,
936       {{kEndpoint1Addresses[1],
937         MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
938                               /*eps=*/0.0)},
939        {kEndpoint2Addresses[0],
940         MakeBackendMetricData(/*app_utilization=*/0.9, /*qps=*/100.0,
941                               /*eps=*/0.0)},
942        {kEndpoint3Addresses[0],
943         MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
944                               /*eps=*/0.0)}},
945       {{kEndpoint1Addresses[1], 3},
946        {kEndpoint2Addresses[0], 1},
947        {kEndpoint3Addresses[0], 3}});
948   // First endpoint first subchannel finishes backoff, but this doesn't
949   // affect anything -- in fact, PF isn't even watching this subchannel
950   // anymore, since it's connected to the other one.  However, this
951   // ensures that the subchannel is in the right state when we try to
952   // reconnect below.
953   subchannel1_0->SetConnectivityState(GRPC_CHANNEL_IDLE);
954   EXPECT_FALSE(subchannel1_0->ConnectionRequested());
955   // Endpoint 1 switches to a different address.
956   ExpectEndpointAddressChange(
957       kEndpoint1Addresses, 1, 0,
958       // When the subchannel disconnects, WRR will remove the endpoint from
959       // the rotation.
960       [&]() {
961         picker = ExpectState(GRPC_CHANNEL_READY);
962         WaitForWeightedRoundRobinPicks(
963             &picker,
964             {{kEndpoint2Addresses[0],
965               MakeBackendMetricData(/*app_utilization=*/0.9, /*qps=*/100.0,
966                                     /*eps=*/0.0)},
967              {kEndpoint3Addresses[0],
968               MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
969                                     /*eps=*/0.0)}},
970             {{kEndpoint2Addresses[0], 1}, {kEndpoint3Addresses[0], 3}});
971       });
972   // When it connects to the new address, WRR adds it to the rotation.
973   WaitForWeightedRoundRobinPicks(
974       &picker,
975       {{kEndpoint1Addresses[0],
976         MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
977                               /*eps=*/0.0)},
978        {kEndpoint2Addresses[0],
979         MakeBackendMetricData(/*app_utilization=*/0.9, /*qps=*/100.0,
980                               /*eps=*/0.0)},
981        {kEndpoint3Addresses[0],
982         MakeBackendMetricData(/*app_utilization=*/0.3, /*qps=*/100.0,
983                               /*eps=*/0.0)}},
984       {{kEndpoint1Addresses[0], 3},
985        {kEndpoint2Addresses[0], 1},
986        {kEndpoint3Addresses[0], 3}});
987   // No more connection attempts triggered.
988   EXPECT_FALSE(subchannel1_0->ConnectionRequested());
989   EXPECT_FALSE(subchannel1_1->ConnectionRequested());
990   EXPECT_FALSE(subchannel2_0->ConnectionRequested());
991   EXPECT_FALSE(subchannel2_1->ConnectionRequested());
992   EXPECT_FALSE(subchannel3_0->ConnectionRequested());
993   EXPECT_FALSE(subchannel3_1->ConnectionRequested());
994 }
995 
TEST_F(WeightedRoundRobinTest,MetricDefinitionRrFallback)996 TEST_F(WeightedRoundRobinTest, MetricDefinitionRrFallback) {
997   const auto* descriptor =
998       GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
999           "grpc.lb.wrr.rr_fallback");
1000   ASSERT_NE(descriptor, nullptr);
1001   EXPECT_EQ(descriptor->value_type,
1002             GlobalInstrumentsRegistry::ValueType::kUInt64);
1003   EXPECT_EQ(descriptor->instrument_type,
1004             GlobalInstrumentsRegistry::InstrumentType::kCounter);
1005   EXPECT_EQ(descriptor->enable_by_default, false);
1006   EXPECT_EQ(descriptor->name, "grpc.lb.wrr.rr_fallback");
1007   EXPECT_EQ(descriptor->unit, "{update}");
1008   EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1009   EXPECT_THAT(descriptor->optional_label_keys,
1010               ::testing::ElementsAre("grpc.lb.locality"));
1011 }
1012 
TEST_F(WeightedRoundRobinTest,MetricDefinitionEndpointWeightNotYetUsable)1013 TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightNotYetUsable) {
1014   const auto* descriptor =
1015       GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1016           "grpc.lb.wrr.endpoint_weight_not_yet_usable");
1017   ASSERT_NE(descriptor, nullptr);
1018   EXPECT_EQ(descriptor->value_type,
1019             GlobalInstrumentsRegistry::ValueType::kUInt64);
1020   EXPECT_EQ(descriptor->instrument_type,
1021             GlobalInstrumentsRegistry::InstrumentType::kCounter);
1022   EXPECT_EQ(descriptor->enable_by_default, false);
1023   EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_not_yet_usable");
1024   EXPECT_EQ(descriptor->unit, "{endpoint}");
1025   EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1026   EXPECT_THAT(descriptor->optional_label_keys,
1027               ::testing::ElementsAre("grpc.lb.locality"));
1028 }
1029 
TEST_F(WeightedRoundRobinTest,MetricDefinitionEndpointWeightStale)1030 TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightStale) {
1031   const auto* descriptor =
1032       GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1033           "grpc.lb.wrr.endpoint_weight_stale");
1034   ASSERT_NE(descriptor, nullptr);
1035   EXPECT_EQ(descriptor->value_type,
1036             GlobalInstrumentsRegistry::ValueType::kUInt64);
1037   EXPECT_EQ(descriptor->instrument_type,
1038             GlobalInstrumentsRegistry::InstrumentType::kCounter);
1039   EXPECT_EQ(descriptor->enable_by_default, false);
1040   EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_stale");
1041   EXPECT_EQ(descriptor->unit, "{endpoint}");
1042   EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1043   EXPECT_THAT(descriptor->optional_label_keys,
1044               ::testing::ElementsAre("grpc.lb.locality"));
1045 }
1046 
TEST_F(WeightedRoundRobinTest,MetricDefinitionEndpointWeights)1047 TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeights) {
1048   const auto* descriptor =
1049       GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
1050           "grpc.lb.wrr.endpoint_weights");
1051   ASSERT_NE(descriptor, nullptr);
1052   EXPECT_EQ(descriptor->value_type,
1053             GlobalInstrumentsRegistry::ValueType::kDouble);
1054   EXPECT_EQ(descriptor->instrument_type,
1055             GlobalInstrumentsRegistry::InstrumentType::kHistogram);
1056   EXPECT_EQ(descriptor->enable_by_default, false);
1057   EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weights");
1058   EXPECT_EQ(descriptor->unit, "{weight}");
1059   EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
1060   EXPECT_THAT(descriptor->optional_label_keys,
1061               ::testing::ElementsAre("grpc.lb.locality"));
1062 }
1063 
TEST_F(WeightedRoundRobinTest,MetricValues)1064 TEST_F(WeightedRoundRobinTest, MetricValues) {
1065   const auto kRrFallback =
1066       GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1067           "grpc.lb.wrr.rr_fallback")
1068           .value();
1069   const auto kEndpointWeightNotYetUsable =
1070       GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1071           "grpc.lb.wrr.endpoint_weight_not_yet_usable")
1072           .value();
1073   const auto kEndpointWeightStale =
1074       GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
1075           "grpc.lb.wrr.endpoint_weight_stale")
1076           .value();
1077   const auto kEndpointWeights =
1078       GlobalInstrumentsRegistryTestPeer::FindDoubleHistogramHandleByName(
1079           "grpc.lb.wrr.endpoint_weights")
1080           .value();
1081   const absl::string_view kLabelValues[] = {target_};
1082   const absl::string_view kOptionalLabelValues[] = {kLocalityName};
1083   auto stats_plugin = std::make_shared<FakeStatsPlugin>(
1084       nullptr, /*use_disabled_by_default_metrics=*/true);
1085   stats_plugin_group_.AddStatsPlugin(stats_plugin, nullptr);
1086   // Send address list to LB policy.
1087   const std::array<absl::string_view, 3> kAddresses = {
1088       "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
1089   auto picker = SendInitialUpdateAndWaitForConnected(
1090       kAddresses,
1091       ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
1092   ASSERT_NE(picker, nullptr);
1093   // Address 0 gets weight 1, address 1 gets weight 3.
1094   // No utilization report from backend 2, so it gets the average weight 2.
1095   WaitForWeightedRoundRobinPicks(
1096       &picker,
1097       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
1098                                              /*qps=*/100.0, /*eps=*/0.0)},
1099        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
1100                                              /*qps=*/100.0, /*eps=*/0.0)}},
1101       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
1102   // Now have backend 2 report utilization the same as backend 1, so its
1103   // weight will be the same.
1104   WaitForWeightedRoundRobinPicks(
1105       &picker,
1106       {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
1107                                              /*qps=*/100.0, /*eps=*/0.0)},
1108        {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
1109                                              /*qps=*/100.0, /*eps=*/0.0)},
1110        {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
1111                                              /*qps=*/100.0, /*eps=*/0.0)}},
1112       {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
1113   // Check endpoint weights.
1114   EXPECT_THAT(stats_plugin->GetDoubleHistogramValue(
1115                   kEndpointWeights, kLabelValues, kOptionalLabelValues),
1116               ::testing::Optional(::testing::ElementsAre(
1117                   // Picker created for first endpoint becoming READY.
1118                   0,
1119                   // Picker update for second endpoint CONNECTING.
1120                   0,
1121                   // Picker update for second endpoint READY.
1122                   0, 0,
1123                   // Picker update for third endpoint CONNECTING.
1124                   0, 0,
1125                   // Picker update for third endpoint READY.
1126                   0, 0, 0,
1127                   // Weights for first two endpoints now start getting used.
1128                   ::testing::DoubleNear(111.111115, 0.000001),
1129                   ::testing::DoubleNear(333.333344, 0.000001), 0,
1130                   // Weights for all endpoints are now used.
1131                   ::testing::DoubleNear(111.111115, 0.000001),
1132                   ::testing::DoubleNear(333.333344, 0.000001),
1133                   ::testing::DoubleNear(333.333344, 0.000001))));
1134   // RR fallback should trigger for the first 5 updates above, because
1135   // there are less than two endpoints with valid weights.
1136   EXPECT_THAT(stats_plugin->GetUInt64CounterValue(kRrFallback, kLabelValues,
1137                                                   kOptionalLabelValues),
1138               ::testing::Optional(5));
1139   // Endpoint-not-yet-usable will be incremented once for every endpoint
1140   // with weight 0 above.
1141   EXPECT_THAT(
1142       stats_plugin->GetUInt64CounterValue(kEndpointWeightNotYetUsable,
1143                                           kLabelValues, kOptionalLabelValues),
1144       ::testing::Optional(10));
1145   // There are no stale endpoint weights so far.
1146   EXPECT_THAT(stats_plugin->GetUInt64CounterValue(
1147                   kEndpointWeightStale, kLabelValues, kOptionalLabelValues),
1148               ::testing::Optional(0));
1149   // Advance time to make weights stale and trigger the timer callback
1150   // to recompute weights.
1151   LOG(INFO) << "advancing time to trigger staleness...";
1152   IncrementTimeBy(Duration::Seconds(2));
1153   // Picker should now be falling back to round-robin.
1154   ExpectWeightedRoundRobinPicks(
1155       picker.get(), {},
1156       {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
1157   // All three endpoints should now have stale weights.
1158   EXPECT_THAT(stats_plugin->GetUInt64CounterValue(
1159                   kEndpointWeightStale, kLabelValues, kOptionalLabelValues),
1160               ::testing::Optional(3));
1161 }
1162 
1163 }  // namespace
1164 }  // namespace testing
1165 }  // namespace grpc_core
1166 
main(int argc,char ** argv)1167 int main(int argc, char** argv) {
1168   ::testing::InitGoogleTest(&argc, argv);
1169   grpc::testing::TestEnvironment env(&argc, argv);
1170   return RUN_ALL_TESTS();
1171 }
1172