• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/grpc.h>
18 #include <grpc/support/json.h>
19 #include <stddef.h>
20 #include <stdint.h>
21 
22 #include <algorithm>
23 #include <array>
24 #include <chrono>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/log/log.h"
31 #include "absl/status/status.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 #include "absl/types/span.h"
35 #include "gtest/gtest.h"
36 #include "src/core/load_balancing/backend_metric_data.h"
37 #include "src/core/load_balancing/lb_policy.h"
38 #include "src/core/resolver/endpoint_addresses.h"
39 #include "src/core/util/json/json.h"
40 #include "src/core/util/orphanable.h"
41 #include "src/core/util/ref_counted_ptr.h"
42 #include "src/core/util/time.h"
43 #include "test/core/load_balancing/lb_policy_test_lib.h"
44 #include "test/core/test_util/test_config.h"
45 
46 namespace grpc_core {
47 namespace testing {
48 namespace {
49 
50 class OutlierDetectionTest : public LoadBalancingPolicyTest {
51  protected:
52   class ConfigBuilder {
53    public:
ConfigBuilder()54     ConfigBuilder() {
55       SetChildPolicy(Json::Object{{"round_robin", Json::FromObject({})}});
56     }
57 
SetInterval(Duration duration)58     ConfigBuilder& SetInterval(Duration duration) {
59       json_["interval"] = Json::FromString(duration.ToJsonString());
60       return *this;
61     }
SetBaseEjectionTime(Duration duration)62     ConfigBuilder& SetBaseEjectionTime(Duration duration) {
63       json_["baseEjectionTime"] = Json::FromString(duration.ToJsonString());
64       return *this;
65     }
SetMaxEjectionTime(Duration duration)66     ConfigBuilder& SetMaxEjectionTime(Duration duration) {
67       json_["maxEjectionTime"] = Json::FromString(duration.ToJsonString());
68       return *this;
69     }
SetMaxEjectionPercent(uint32_t value)70     ConfigBuilder& SetMaxEjectionPercent(uint32_t value) {
71       json_["maxEjectionPercent"] = Json::FromNumber(value);
72       return *this;
73     }
SetChildPolicy(Json::Object child_policy)74     ConfigBuilder& SetChildPolicy(Json::Object child_policy) {
75       json_["childPolicy"] =
76           Json::FromArray({Json::FromObject(std::move(child_policy))});
77       return *this;
78     }
79 
SetSuccessRateStdevFactor(uint32_t value)80     ConfigBuilder& SetSuccessRateStdevFactor(uint32_t value) {
81       GetSuccessRate()["stdevFactor"] = Json::FromNumber(value);
82       return *this;
83     }
SetSuccessRateEnforcementPercentage(uint32_t value)84     ConfigBuilder& SetSuccessRateEnforcementPercentage(uint32_t value) {
85       GetSuccessRate()["enforcementPercentage"] = Json::FromNumber(value);
86       return *this;
87     }
SetSuccessRateMinHosts(uint32_t value)88     ConfigBuilder& SetSuccessRateMinHosts(uint32_t value) {
89       GetSuccessRate()["minimumHosts"] = Json::FromNumber(value);
90       return *this;
91     }
SetSuccessRateRequestVolume(uint32_t value)92     ConfigBuilder& SetSuccessRateRequestVolume(uint32_t value) {
93       GetSuccessRate()["requestVolume"] = Json::FromNumber(value);
94       return *this;
95     }
96 
SetFailurePercentageThreshold(uint32_t value)97     ConfigBuilder& SetFailurePercentageThreshold(uint32_t value) {
98       GetFailurePercentage()["threshold"] = Json::FromNumber(value);
99       return *this;
100     }
SetFailurePercentageEnforcementPercentage(uint32_t value)101     ConfigBuilder& SetFailurePercentageEnforcementPercentage(uint32_t value) {
102       GetFailurePercentage()["enforcementPercentage"] = Json::FromNumber(value);
103       return *this;
104     }
SetFailurePercentageMinimumHosts(uint32_t value)105     ConfigBuilder& SetFailurePercentageMinimumHosts(uint32_t value) {
106       GetFailurePercentage()["minimumHosts"] = Json::FromNumber(value);
107       return *this;
108     }
SetFailurePercentageRequestVolume(uint32_t value)109     ConfigBuilder& SetFailurePercentageRequestVolume(uint32_t value) {
110       GetFailurePercentage()["requestVolume"] = Json::FromNumber(value);
111       return *this;
112     }
113 
Build()114     RefCountedPtr<LoadBalancingPolicy::Config> Build() {
115       Json::Object fields = json_;
116       if (success_rate_.has_value()) {
117         fields["successRateEjection"] = Json::FromObject(*success_rate_);
118       }
119       if (failure_percentage_.has_value()) {
120         fields["failurePercentageEjection"] =
121             Json::FromObject(*failure_percentage_);
122       }
123       Json config = Json::FromArray(
124           {Json::FromObject({{"outlier_detection_experimental",
125                               Json::FromObject(std::move(fields))}})});
126       return MakeConfig(config);
127     }
128 
129    private:
GetSuccessRate()130     Json::Object& GetSuccessRate() {
131       if (!success_rate_.has_value()) success_rate_.emplace();
132       return *success_rate_;
133     }
134 
GetFailurePercentage()135     Json::Object& GetFailurePercentage() {
136       if (!failure_percentage_.has_value()) failure_percentage_.emplace();
137       return *failure_percentage_;
138     }
139 
140     Json::Object json_;
141     absl::optional<Json::Object> success_rate_;
142     absl::optional<Json::Object> failure_percentage_;
143   };
144 
OutlierDetectionTest()145   OutlierDetectionTest()
146       : LoadBalancingPolicyTest("outlier_detection_experimental") {}
147 
SetUp()148   void SetUp() override {
149     LoadBalancingPolicyTest::SetUp();
150     SetExpectedTimerDuration(std::chrono::seconds(10));
151   }
152 
DoPickWithFailedCall(LoadBalancingPolicy::SubchannelPicker * picker)153   absl::optional<std::string> DoPickWithFailedCall(
154       LoadBalancingPolicy::SubchannelPicker* picker) {
155     std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
156         subchannel_call_tracker;
157     auto address = ExpectPickComplete(picker, {}, {}, &subchannel_call_tracker);
158     if (address.has_value()) {
159       subchannel_call_tracker->Start();
160       FakeMetadata metadata({});
161       FakeBackendMetricAccessor backend_metric_accessor({});
162       LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
163           *address, absl::UnavailableError("uh oh"), &metadata,
164           &backend_metric_accessor};
165       subchannel_call_tracker->Finish(args);
166     }
167     return address;
168   }
169 };
170 
// A single address under the default config (round_robin child, no ejection
// algorithms) should connect and then be picked every time.
TEST_F(OutlierDetectionTest, Basic) {
  constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443";
  auto config = ConfigBuilder().Build();
  const absl::Status update_status =
      ApplyUpdate(BuildUpdate({kAddressUri}, config), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // The policy must have created a subchannel for our one address, and
  // requested a connection once it saw the initial IDLE notification.
  auto* const sc = FindSubchannel(kAddressUri);
  ASSERT_NE(sc, nullptr);
  EXPECT_TRUE(sc->ConnectionRequested());
  // Walk the subchannel through CONNECTING (which the policy mirrors) ...
  sc->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  ExpectConnectingUpdate();
  // ... and then READY.  Any number of intermediate CONNECTING reports is
  // tolerated before the READY picker shows up.
  sc->SetConnectivityState(GRPC_CHANNEL_READY);
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  // With only one address, every pick lands on it.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddressUri);
  }
}
198 
// With failure-percentage ejection thresholds of 1, one failed call ejects
// that call's address on the next timer pass.  The address is restored on
// the pass after that.
TEST_F(OutlierDetectionTest, FailurePercentage) {
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
  auto config = ConfigBuilder()
                    .SetFailurePercentageThreshold(1)
                    .SetFailurePercentageMinimumHosts(1)
                    .SetFailurePercentageRequestVolume(1)
                    .SetMaxEjectionTime(Duration::Seconds(1))
                    .SetBaseEjectionTime(Duration::Seconds(1))
                    .Build();
  const absl::Status update_status =
      ApplyUpdate(BuildUpdate(kAddresses, config), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // All three addresses should come up under round_robin.
  auto picker = ExpectRoundRobinStartup(kAddresses);
  ASSERT_NE(picker, nullptr);
  LOG(INFO) << "### RR startup complete";
  // Fail one call; whichever address it landed on becomes the outlier.
  const auto failed_address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(failed_address.has_value());
  LOG(INFO) << "### failed RPC on " << *failed_address;
  // Addresses expected to remain in rotation while the outlier is ejected.
  std::vector<absl::string_view> remaining_addresses;
  for (size_t i = 0; i < kAddresses.size(); ++i) {
    if (kAddresses[i] == *failed_address) continue;
    remaining_addresses.push_back(kAddresses[i]);
  }
  // First timer pass: the outlier drops out of the round_robin list.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### ejection complete";
  WaitForRoundRobinListChange(kAddresses, remaining_addresses);
  // Second timer pass: the ejection period has elapsed, so it comes back.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### un-ejection complete";
  WaitForRoundRobinListChange(remaining_addresses, kAddresses);
}
236 
TEST_F(OutlierDetectionTest,MultipleAddressesPerEndpoint)237 TEST_F(OutlierDetectionTest, MultipleAddressesPerEndpoint) {
238   // Can't use timer duration expectation here, because the Happy
239   // Eyeballs timer inside pick_first will use a different duration than
240   // the timer in outlier_detection.
241   SetExpectedTimerDuration(absl::nullopt);
242   constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
243       "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
244   constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
245       "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
246   constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
247       "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
248   const std::array<EndpointAddresses, 3> kEndpoints = {
249       MakeEndpointAddresses(kEndpoint1Addresses),
250       MakeEndpointAddresses(kEndpoint2Addresses),
251       MakeEndpointAddresses(kEndpoint3Addresses)};
252   // Send initial update.
253   absl::Status status = ApplyUpdate(
254       BuildUpdate(kEndpoints, ConfigBuilder()
255                                   .SetFailurePercentageThreshold(1)
256                                   .SetFailurePercentageMinimumHosts(1)
257                                   .SetFailurePercentageRequestVolume(1)
258                                   .SetMaxEjectionTime(Duration::Seconds(1))
259                                   .SetBaseEjectionTime(Duration::Seconds(1))
260                                   .Build()),
261       lb_policy_.get());
262   EXPECT_TRUE(status.ok()) << status;
263   // Expect normal startup.
264   auto picker = ExpectRoundRobinStartup(kEndpoints);
265   ASSERT_NE(picker, nullptr);
266   LOG(INFO) << "### RR startup complete";
267   // Do a pick and report a failed call.
268   auto address = DoPickWithFailedCall(picker.get());
269   ASSERT_TRUE(address.has_value());
270   LOG(INFO) << "### failed RPC on " << *address;
271   // Based on the address that the failed call went to, we determine
272   // which addresses to use in the subsequent steps.
273   absl::Span<const absl::string_view> ejected_endpoint_addresses;
274   absl::Span<const absl::string_view> sentinel_endpoint_addresses;
275   absl::string_view unmodified_endpoint_address;
276   std::vector<absl::string_view> final_addresses;
277   if (kEndpoint1Addresses[0] == *address) {
278     ejected_endpoint_addresses = kEndpoint1Addresses;
279     sentinel_endpoint_addresses = kEndpoint2Addresses;
280     unmodified_endpoint_address = kEndpoint3Addresses[0];
281     final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
282                        kEndpoint3Addresses[0]};
283   } else if (kEndpoint2Addresses[0] == *address) {
284     ejected_endpoint_addresses = kEndpoint2Addresses;
285     sentinel_endpoint_addresses = kEndpoint1Addresses;
286     unmodified_endpoint_address = kEndpoint3Addresses[0];
287     final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
288                        kEndpoint3Addresses[0]};
289   } else {
290     ejected_endpoint_addresses = kEndpoint3Addresses;
291     sentinel_endpoint_addresses = kEndpoint1Addresses;
292     unmodified_endpoint_address = kEndpoint2Addresses[0];
293     final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[0],
294                        kEndpoint3Addresses[1]};
295   }
296   // Advance time and run the timer callback to trigger ejection.
297   IncrementTimeBy(Duration::Seconds(10));
298   LOG(INFO) << "### ejection complete";
299   // Expect a picker that removes the ejected address.
300   WaitForRoundRobinListChange(
301       {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
302       {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
303   LOG(INFO) << "### ejected endpoint removed";
304   // Cause the connection to the ejected endpoint to fail, and then
305   // have it reconnect to a different address.  The endpoint is still
306   // ejected, so the new address should not be used.
307   ExpectEndpointAddressChange(ejected_endpoint_addresses, 0, 1, nullptr);
308   // Need to drain the picker updates before calling
309   // ExpectEndpointAddressChange() again, since that will expect a
310   // re-resolution request in the queue.
311   DrainRoundRobinPickerUpdates(
312       {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
313   LOG(INFO) << "### done changing address of ejected endpoint";
314   // Do the same thing for the sentinel endpoint, so that we
315   // know that the LB policy has seen the address change for the ejected
316   // endpoint.
317   ExpectEndpointAddressChange(sentinel_endpoint_addresses, 0, 1, [&]() {
318     WaitForRoundRobinListChange(
319         {sentinel_endpoint_addresses[0], unmodified_endpoint_address},
320         {unmodified_endpoint_address});
321   });
322   WaitForRoundRobinListChange(
323       {unmodified_endpoint_address},
324       {sentinel_endpoint_addresses[1], unmodified_endpoint_address});
325   LOG(INFO) << "### done changing address of ejected endpoint";
326   // Advance time and run the timer callback to trigger un-ejection.
327   IncrementTimeBy(Duration::Seconds(10));
328   LOG(INFO) << "### un-ejection complete";
329   // The ejected endpoint should come back using the new address.
330   WaitForRoundRobinListChange(
331       {sentinel_endpoint_addresses[1], unmodified_endpoint_address},
332       final_addresses);
333 }
334 
TEST_F(OutlierDetectionTest,EjectionStateResetsWhenEndpointAddressesChange)335 TEST_F(OutlierDetectionTest, EjectionStateResetsWhenEndpointAddressesChange) {
336   // Can't use timer duration expectation here, because the Happy
337   // Eyeballs timer inside pick_first will use a different duration than
338   // the timer in outlier_detection.
339   SetExpectedTimerDuration(absl::nullopt);
340   constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
341       "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
342   constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
343       "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
344   constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
345       "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
346   const std::array<EndpointAddresses, 3> kEndpoints = {
347       MakeEndpointAddresses(kEndpoint1Addresses),
348       MakeEndpointAddresses(kEndpoint2Addresses),
349       MakeEndpointAddresses(kEndpoint3Addresses)};
350   auto kConfig = ConfigBuilder()
351                      .SetFailurePercentageThreshold(1)
352                      .SetFailurePercentageMinimumHosts(1)
353                      .SetFailurePercentageRequestVolume(1)
354                      .SetMaxEjectionTime(Duration::Seconds(1))
355                      .SetBaseEjectionTime(Duration::Seconds(1))
356                      .Build();
357   // Send initial update.
358   absl::Status status =
359       ApplyUpdate(BuildUpdate(kEndpoints, kConfig), lb_policy_.get());
360   EXPECT_TRUE(status.ok()) << status;
361   // Expect normal startup.
362   auto picker = ExpectRoundRobinStartup(kEndpoints);
363   ASSERT_NE(picker, nullptr);
364   LOG(INFO) << "### RR startup complete";
365   // Do a pick and report a failed call.
366   auto ejected_address = DoPickWithFailedCall(picker.get());
367   ASSERT_TRUE(ejected_address.has_value());
368   LOG(INFO) << "### failed RPC on " << *ejected_address;
369   // Based on the address that the failed call went to, we determine
370   // which addresses to use in the subsequent steps.
371   std::vector<absl::string_view> expected_round_robin_while_ejected;
372   std::vector<EndpointAddresses> new_endpoints;
373   if (kEndpoint1Addresses[0] == *ejected_address) {
374     expected_round_robin_while_ejected = {kEndpoint2Addresses[0],
375                                           kEndpoint3Addresses[0]};
376     new_endpoints = {MakeEndpointAddresses({kEndpoint1Addresses[0]}),
377                      MakeEndpointAddresses(kEndpoint2Addresses),
378                      MakeEndpointAddresses(kEndpoint3Addresses)};
379   } else if (kEndpoint2Addresses[0] == *ejected_address) {
380     expected_round_robin_while_ejected = {kEndpoint1Addresses[0],
381                                           kEndpoint3Addresses[0]};
382     new_endpoints = {MakeEndpointAddresses(kEndpoint1Addresses),
383                      MakeEndpointAddresses({kEndpoint2Addresses[0]}),
384                      MakeEndpointAddresses(kEndpoint3Addresses)};
385   } else {
386     expected_round_robin_while_ejected = {kEndpoint1Addresses[0],
387                                           kEndpoint2Addresses[0]};
388     new_endpoints = {MakeEndpointAddresses(kEndpoint1Addresses),
389                      MakeEndpointAddresses(kEndpoint2Addresses),
390                      MakeEndpointAddresses({kEndpoint3Addresses[0]})};
391   }
392   // Advance time and run the timer callback to trigger ejection.
393   IncrementTimeBy(Duration::Seconds(10));
394   LOG(INFO) << "### ejection complete";
395   // Expect a picker that removes the ejected address.
396   WaitForRoundRobinListChange(
397       {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
398       expected_round_robin_while_ejected);
399   LOG(INFO) << "### ejected endpoint removed";
400   // Send an update that removes the other address from the ejected endpoint.
401   status = ApplyUpdate(BuildUpdate(new_endpoints, kConfig), lb_policy_.get());
402   EXPECT_TRUE(status.ok()) << status;
403   // This should cause the address to start getting used again, since
404   // it's now associated with a different endpoint.
405   WaitForRoundRobinListChange(
406       expected_round_robin_while_ejected,
407       {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
408 }
409 
// With pick_first as the child policy, a failed call must not lead to the
// connected subchannel being ejected or asked to reconnect.
TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
  // Can't use timer duration expectation here, because the Happy
  // Eyeballs timer inside pick_first will use a different duration than
  // the timer in outlier_detection.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
  auto config = ConfigBuilder()
                    .SetFailurePercentageThreshold(1)
                    .SetFailurePercentageMinimumHosts(1)
                    .SetFailurePercentageRequestVolume(1)
                    .SetChildPolicy({{"pick_first", Json::FromObject({})}})
                    .Build();
  const absl::Status update_status =
      ApplyUpdate(BuildUpdate(kAddresses, config), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // pick_first starts with the first address: a subchannel must exist for
  // it and have been asked to connect after its initial IDLE report.
  auto* const first_sc = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first_sc, nullptr);
  EXPECT_TRUE(first_sc->ConnectionRequested());
  // CONNECTING is reflected by the policy ...
  first_sc->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  ExpectConnectingUpdate();
  // ... then READY yields a connected picker (after any number of further
  // CONNECTING reports).
  first_sc->SetConnectivityState(GRPC_CHANNEL_READY);
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  // Every pick goes to the one connected address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddresses[0]);
  }
  LOG(INFO) << "### PF startup complete";
  // Fail one call on that subchannel.
  const auto failed_address = DoPickWithFailedCall(ready_picker.get());
  ASSERT_TRUE(failed_address.has_value());
  LOG(INFO) << "### failed RPC on " << *failed_address;
  // Run a full ejection timer pass.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### ejection timer pass complete";
  // Nothing should have happened: no new picker and no reconnect request.
  ExpectQueueEmpty();
  EXPECT_FALSE(first_sc->ConnectionRequested());
}
461 
462 }  // namespace
463 }  // namespace testing
464 }  // namespace grpc_core
465 
main(int argc,char ** argv)466 int main(int argc, char** argv) {
467   ::testing::InitGoogleTest(&argc, argv);
468   grpc::testing::TestEnvironment env(&argc, argv);
469   return RUN_ALL_TESTS();
470 }
471