1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/grpc.h>
18 #include <grpc/support/json.h>
19 #include <stddef.h>
20 #include <stdint.h>
21
22 #include <algorithm>
23 #include <array>
24 #include <chrono>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29
30 #include "absl/log/log.h"
31 #include "absl/status/status.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 #include "absl/types/span.h"
35 #include "gtest/gtest.h"
36 #include "src/core/load_balancing/backend_metric_data.h"
37 #include "src/core/load_balancing/lb_policy.h"
38 #include "src/core/resolver/endpoint_addresses.h"
39 #include "src/core/util/json/json.h"
40 #include "src/core/util/orphanable.h"
41 #include "src/core/util/ref_counted_ptr.h"
42 #include "src/core/util/time.h"
43 #include "test/core/load_balancing/lb_policy_test_lib.h"
44 #include "test/core/test_util/test_config.h"
45
46 namespace grpc_core {
47 namespace testing {
48 namespace {
49
50 class OutlierDetectionTest : public LoadBalancingPolicyTest {
51 protected:
52 class ConfigBuilder {
53 public:
ConfigBuilder()54 ConfigBuilder() {
55 SetChildPolicy(Json::Object{{"round_robin", Json::FromObject({})}});
56 }
57
SetInterval(Duration duration)58 ConfigBuilder& SetInterval(Duration duration) {
59 json_["interval"] = Json::FromString(duration.ToJsonString());
60 return *this;
61 }
SetBaseEjectionTime(Duration duration)62 ConfigBuilder& SetBaseEjectionTime(Duration duration) {
63 json_["baseEjectionTime"] = Json::FromString(duration.ToJsonString());
64 return *this;
65 }
SetMaxEjectionTime(Duration duration)66 ConfigBuilder& SetMaxEjectionTime(Duration duration) {
67 json_["maxEjectionTime"] = Json::FromString(duration.ToJsonString());
68 return *this;
69 }
SetMaxEjectionPercent(uint32_t value)70 ConfigBuilder& SetMaxEjectionPercent(uint32_t value) {
71 json_["maxEjectionPercent"] = Json::FromNumber(value);
72 return *this;
73 }
SetChildPolicy(Json::Object child_policy)74 ConfigBuilder& SetChildPolicy(Json::Object child_policy) {
75 json_["childPolicy"] =
76 Json::FromArray({Json::FromObject(std::move(child_policy))});
77 return *this;
78 }
79
SetSuccessRateStdevFactor(uint32_t value)80 ConfigBuilder& SetSuccessRateStdevFactor(uint32_t value) {
81 GetSuccessRate()["stdevFactor"] = Json::FromNumber(value);
82 return *this;
83 }
SetSuccessRateEnforcementPercentage(uint32_t value)84 ConfigBuilder& SetSuccessRateEnforcementPercentage(uint32_t value) {
85 GetSuccessRate()["enforcementPercentage"] = Json::FromNumber(value);
86 return *this;
87 }
SetSuccessRateMinHosts(uint32_t value)88 ConfigBuilder& SetSuccessRateMinHosts(uint32_t value) {
89 GetSuccessRate()["minimumHosts"] = Json::FromNumber(value);
90 return *this;
91 }
SetSuccessRateRequestVolume(uint32_t value)92 ConfigBuilder& SetSuccessRateRequestVolume(uint32_t value) {
93 GetSuccessRate()["requestVolume"] = Json::FromNumber(value);
94 return *this;
95 }
96
SetFailurePercentageThreshold(uint32_t value)97 ConfigBuilder& SetFailurePercentageThreshold(uint32_t value) {
98 GetFailurePercentage()["threshold"] = Json::FromNumber(value);
99 return *this;
100 }
SetFailurePercentageEnforcementPercentage(uint32_t value)101 ConfigBuilder& SetFailurePercentageEnforcementPercentage(uint32_t value) {
102 GetFailurePercentage()["enforcementPercentage"] = Json::FromNumber(value);
103 return *this;
104 }
SetFailurePercentageMinimumHosts(uint32_t value)105 ConfigBuilder& SetFailurePercentageMinimumHosts(uint32_t value) {
106 GetFailurePercentage()["minimumHosts"] = Json::FromNumber(value);
107 return *this;
108 }
SetFailurePercentageRequestVolume(uint32_t value)109 ConfigBuilder& SetFailurePercentageRequestVolume(uint32_t value) {
110 GetFailurePercentage()["requestVolume"] = Json::FromNumber(value);
111 return *this;
112 }
113
Build()114 RefCountedPtr<LoadBalancingPolicy::Config> Build() {
115 Json::Object fields = json_;
116 if (success_rate_.has_value()) {
117 fields["successRateEjection"] = Json::FromObject(*success_rate_);
118 }
119 if (failure_percentage_.has_value()) {
120 fields["failurePercentageEjection"] =
121 Json::FromObject(*failure_percentage_);
122 }
123 Json config = Json::FromArray(
124 {Json::FromObject({{"outlier_detection_experimental",
125 Json::FromObject(std::move(fields))}})});
126 return MakeConfig(config);
127 }
128
129 private:
GetSuccessRate()130 Json::Object& GetSuccessRate() {
131 if (!success_rate_.has_value()) success_rate_.emplace();
132 return *success_rate_;
133 }
134
GetFailurePercentage()135 Json::Object& GetFailurePercentage() {
136 if (!failure_percentage_.has_value()) failure_percentage_.emplace();
137 return *failure_percentage_;
138 }
139
140 Json::Object json_;
141 absl::optional<Json::Object> success_rate_;
142 absl::optional<Json::Object> failure_percentage_;
143 };
144
OutlierDetectionTest()145 OutlierDetectionTest()
146 : LoadBalancingPolicyTest("outlier_detection_experimental") {}
147
SetUp()148 void SetUp() override {
149 LoadBalancingPolicyTest::SetUp();
150 SetExpectedTimerDuration(std::chrono::seconds(10));
151 }
152
DoPickWithFailedCall(LoadBalancingPolicy::SubchannelPicker * picker)153 absl::optional<std::string> DoPickWithFailedCall(
154 LoadBalancingPolicy::SubchannelPicker* picker) {
155 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
156 subchannel_call_tracker;
157 auto address = ExpectPickComplete(picker, {}, {}, &subchannel_call_tracker);
158 if (address.has_value()) {
159 subchannel_call_tracker->Start();
160 FakeMetadata metadata({});
161 FakeBackendMetricAccessor backend_metric_accessor({});
162 LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
163 *address, absl::UnavailableError("uh oh"), &metadata,
164 &backend_metric_accessor};
165 subchannel_call_tracker->Finish(args);
166 }
167 return address;
168 }
169 };
170
// Smoke test: with one address and the default round_robin child, the
// policy connects and then always picks that address.
TEST_F(OutlierDetectionTest, Basic) {
  constexpr absl::string_view kTarget = "ipv4:127.0.0.1:443";
  constexpr size_t kNumPicks = 3;
  // Push a resolver update that contains only kTarget.
  const absl::Status update_status =
      ApplyUpdate(BuildUpdate({kTarget}, ConfigBuilder().Build()), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel for kTarget must now exist, and its initial IDLE
  // notification should already have triggered a connection request.
  auto* sc = FindSubchannel(kTarget);
  ASSERT_NE(sc, nullptr);
  EXPECT_TRUE(sc->ConnectionRequested());
  // Move the subchannel to CONNECTING; the policy should report the same.
  sc->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  ExpectConnectingUpdate();
  // Move it to READY. Any number of CONNECTING reports may precede READY.
  sc->SetConnectivityState(GRPC_CHANNEL_READY);
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  // Every pick must land on the single address.
  for (size_t pick = 0; pick < kNumPicks; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kTarget);
  }
}
198
// Failure-percentage ejection with threshold, minimum hosts and request
// volume all set to 1. A single failed call ejects the failing address on
// the next timer pass, and the pass after that brings it back.
TEST_F(OutlierDetectionTest, FailurePercentage) {
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
  auto config = ConfigBuilder()
                    .SetFailurePercentageThreshold(1)
                    .SetFailurePercentageMinimumHosts(1)
                    .SetFailurePercentageRequestVolume(1)
                    .SetMaxEjectionTime(Duration::Seconds(1))
                    .SetBaseEjectionTime(Duration::Seconds(1))
                    .Build();
  // Initial resolver update.
  absl::Status status =
      ApplyUpdate(BuildUpdate(kAddresses, std::move(config)), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // Startup should look like plain round_robin.
  auto picker = ExpectRoundRobinStartup(kAddresses);
  ASSERT_NE(picker, nullptr);
  LOG(INFO) << "### RR startup complete";
  // One pick whose call fails.
  auto address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(address.has_value());
  LOG(INFO) << "### failed RPC on " << *address;
  // Fire the interval timer so the ejection pass runs.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### ejection complete";
  // The new picker should rotate over everything except the failed address.
  const absl::string_view failed_address = *address;
  std::vector<absl::string_view> remaining_addresses(kAddresses.begin(),
                                                     kAddresses.end());
  remaining_addresses.erase(
      std::remove(remaining_addresses.begin(), remaining_addresses.end(),
                  failed_address),
      remaining_addresses.end());
  WaitForRoundRobinListChange(kAddresses, remaining_addresses);
  // Fire the timer again; the ejection has expired, so the address returns.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### un-ejection complete";
  WaitForRoundRobinListChange(remaining_addresses, kAddresses);
}
236
// Outlier detection with endpoints that each have two addresses.
//
// One endpoint is ejected. While it is ejected, its connection moves from its
// first to its second address, and the new address must not be used. After
// un-ejection, the endpoint must be used through its second address.
// A second, non-ejected "sentinel" endpoint also moves to its second
// address. Observing that change proves the ejected endpoint's address
// change has been processed.
TEST_F(OutlierDetectionTest, MultipleAddressesPerEndpoint) {
  // Can't use timer duration expectation here, because the Happy
  // Eyeballs timer inside pick_first will use a different duration than
  // the timer in outlier_detection.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Send initial update.
  absl::Status status = ApplyUpdate(
      BuildUpdate(kEndpoints, ConfigBuilder()
                                  .SetFailurePercentageThreshold(1)
                                  .SetFailurePercentageMinimumHosts(1)
                                  .SetFailurePercentageRequestVolume(1)
                                  .SetMaxEjectionTime(Duration::Seconds(1))
                                  .SetBaseEjectionTime(Duration::Seconds(1))
                                  .Build()),
      lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // Expect normal startup.
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  LOG(INFO) << "### RR startup complete";
  // Do a pick and report a failed call.
  auto address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(address.has_value());
  LOG(INFO) << "### failed RPC on " << *address;
  // The endpoint that received the failed call will be ejected. Pick a
  // different endpoint as the sentinel, and note the one left unmodified.
  // final_addresses is the expected round-robin list once the ejected and
  // sentinel endpoints have both moved to their second address.
  absl::Span<const absl::string_view> ejected_endpoint_addresses;
  absl::Span<const absl::string_view> sentinel_endpoint_addresses;
  absl::string_view unmodified_endpoint_address;
  std::vector<absl::string_view> final_addresses;
  if (kEndpoint1Addresses[0] == *address) {
    ejected_endpoint_addresses = kEndpoint1Addresses;
    sentinel_endpoint_addresses = kEndpoint2Addresses;
    unmodified_endpoint_address = kEndpoint3Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
                       kEndpoint3Addresses[0]};
  } else if (kEndpoint2Addresses[0] == *address) {
    ejected_endpoint_addresses = kEndpoint2Addresses;
    sentinel_endpoint_addresses = kEndpoint1Addresses;
    unmodified_endpoint_address = kEndpoint3Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
                       kEndpoint3Addresses[0]};
  } else {
    ejected_endpoint_addresses = kEndpoint3Addresses;
    sentinel_endpoint_addresses = kEndpoint1Addresses;
    unmodified_endpoint_address = kEndpoint2Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[0],
                       kEndpoint3Addresses[1]};
  }
  // Advance time and run the timer callback to trigger ejection.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### ejection complete";
  // Expect a picker that removes the ejected address.
  WaitForRoundRobinListChange(
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
      {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
  LOG(INFO) << "### ejected endpoint removed";
  // Cause the connection to the ejected endpoint to fail, and then
  // have it reconnect to a different address. The endpoint is still
  // ejected, so the new address should not be used.
  ExpectEndpointAddressChange(ejected_endpoint_addresses, 0, 1, nullptr);
  // Drain the picker updates before calling ExpectEndpointAddressChange()
  // again, because that call expects a re-resolution request in the queue.
  DrainRoundRobinPickerUpdates(
      {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
  LOG(INFO) << "### done changing address of ejected endpoint";
  // Do the same for the sentinel endpoint. Once its change is visible, we
  // know the LB policy has also processed the ejected endpoint's change.
  ExpectEndpointAddressChange(sentinel_endpoint_addresses, 0, 1, [&]() {
    WaitForRoundRobinListChange(
        {sentinel_endpoint_addresses[0], unmodified_endpoint_address},
        {unmodified_endpoint_address});
  });
  WaitForRoundRobinListChange(
      {unmodified_endpoint_address},
      {sentinel_endpoint_addresses[1], unmodified_endpoint_address});
  // This step changes the sentinel endpoint, not the ejected one.
  LOG(INFO) << "### done changing address of sentinel endpoint";
  // Advance time and run the timer callback to trigger un-ejection.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### un-ejection complete";
  // The ejected endpoint should come back using the new address.
  WaitForRoundRobinListChange(
      {sentinel_endpoint_addresses[1], unmodified_endpoint_address},
      final_addresses);
}
334
// Verifies ejection-state reset: after an endpoint is ejected, an update that
// changes its address list puts its first address back into rotation.
TEST_F(OutlierDetectionTest, EjectionStateResetsWhenEndpointAddressesChange) {
  // Timer durations can't be pinned here. The Happy Eyeballs timer inside
  // pick_first uses a different duration than the outlier_detection timer.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Per-endpoint address lists, in the same order as kEndpoints.
  const std::array<absl::Span<const absl::string_view>, 3>
      endpoint_address_lists = {kEndpoint1Addresses, kEndpoint2Addresses,
                                kEndpoint3Addresses};
  auto config = ConfigBuilder()
                    .SetFailurePercentageThreshold(1)
                    .SetFailurePercentageMinimumHosts(1)
                    .SetFailurePercentageRequestVolume(1)
                    .SetMaxEjectionTime(Duration::Seconds(1))
                    .SetBaseEjectionTime(Duration::Seconds(1))
                    .Build();
  // Initial resolver update.
  absl::Status status =
      ApplyUpdate(BuildUpdate(kEndpoints, config), lb_policy_.get());
  EXPECT_TRUE(status.ok()) << status;
  // Startup should look like plain round_robin.
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  LOG(INFO) << "### RR startup complete";
  // One pick whose call fails; its endpoint is the one that gets ejected.
  auto ejected_address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(ejected_address.has_value());
  LOG(INFO) << "### failed RPC on " << *ejected_address;
  // Locate the ejected endpoint.
  size_t ejected_index;
  if (kEndpoint1Addresses[0] == *ejected_address) {
    ejected_index = 0;
  } else if (kEndpoint2Addresses[0] == *ejected_address) {
    ejected_index = 1;
  } else {
    ejected_index = 2;
  }
  // While ejected, round robin should cover the other endpoints' first
  // addresses. The follow-up update trims the ejected endpoint down to its
  // first address and leaves the others unchanged.
  std::vector<absl::string_view> expected_round_robin_while_ejected;
  std::vector<EndpointAddresses> new_endpoints;
  for (size_t idx = 0; idx < endpoint_address_lists.size(); ++idx) {
    const absl::Span<const absl::string_view> addrs =
        endpoint_address_lists[idx];
    if (idx == ejected_index) {
      new_endpoints.push_back(MakeEndpointAddresses({addrs[0]}));
    } else {
      expected_round_robin_while_ejected.push_back(addrs[0]);
      new_endpoints.push_back(MakeEndpointAddresses(addrs));
    }
  }
  // Fire the interval timer so the ejection pass runs.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### ejection complete";
  // The ejected endpoint should drop out of the rotation.
  WaitForRoundRobinListChange(
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
      expected_round_robin_while_ejected);
  LOG(INFO) << "### ejected endpoint removed";
  // Update the ejected endpoint so that only its first address remains.
  status = ApplyUpdate(BuildUpdate(new_endpoints, config), lb_policy_.get());
  EXPECT_TRUE(status.ok()) << status;
  // The address now belongs to a different endpoint, so it is used again.
  WaitForRoundRobinListChange(
      expected_round_robin_while_ejected,
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
}
409
// With pick_first as the child policy, a failed call must not lead to an
// ejection: the ejection pass produces no updates and no reconnection.
TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
  // Timer durations can't be pinned here. The Happy Eyeballs timer inside
  // pick_first uses a different duration than the outlier_detection timer.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
  auto pick_first_config =
      ConfigBuilder()
          .SetFailurePercentageThreshold(1)
          .SetFailurePercentageMinimumHosts(1)
          .SetFailurePercentageRequestVolume(1)
          .SetChildPolicy({{"pick_first", Json::FromObject({})}})
          .Build();
  // Initial resolver update.
  absl::Status status = ApplyUpdate(
      BuildUpdate(kAddresses, std::move(pick_first_config)), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // pick_first starts with the first address. Its subchannel's initial IDLE
  // notification should already have triggered a connection request.
  auto* first_sc = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first_sc, nullptr);
  EXPECT_TRUE(first_sc->ConnectionRequested());
  // Move the subchannel to CONNECTING; the policy should report the same.
  first_sc->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  ExpectConnectingUpdate();
  // Move it to READY. Any number of CONNECTING reports may precede READY.
  first_sc->SetConnectivityState(GRPC_CHANNEL_READY);
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  // Every pick must land on the first address.
  for (size_t pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddresses[0]);
  }
  LOG(INFO) << "### PF startup complete";
  // Fail one call on that subchannel.
  auto failed_address = DoPickWithFailedCall(ready_picker.get());
  ASSERT_TRUE(failed_address.has_value());
  LOG(INFO) << "### failed RPC on " << *failed_address;
  // Fire the interval timer so the ejection pass runs.
  IncrementTimeBy(Duration::Seconds(10));
  LOG(INFO) << "### ejection timer pass complete";
  // Nothing should have been ejected, so no updates are queued...
  ExpectQueueEmpty();
  // ...and the subchannel was not asked to reconnect.
  EXPECT_FALSE(first_sc->ConnectionRequested());
}
461
462 } // namespace
463 } // namespace testing
464 } // namespace grpc_core
465
main(int argc,char ** argv)466 int main(int argc, char** argv) {
467 ::testing::InitGoogleTest(&argc, argv);
468 grpc::testing::TestEnvironment env(&argc, argv);
469 return RUN_ALL_TESTS();
470 }
471