1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/pick_first/pick_first.h"
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/json.h>
21 #include <stddef.h>
22
23 #include <algorithm>
24 #include <array>
25 #include <chrono>
26 #include <map>
27 #include <memory>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/status/status.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/synchronization/notification.h"
34 #include "absl/types/optional.h"
35 #include "absl/types/span.h"
36 #include "gmock/gmock.h"
37 #include "gtest/gtest.h"
38 #include "src/core/lib/experiments/experiments.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/load_balancing/lb_policy.h"
41 #include "src/core/resolver/endpoint_addresses.h"
42 #include "src/core/telemetry/metrics.h"
43 #include "src/core/util/debug_location.h"
44 #include "src/core/util/json/json.h"
45 #include "src/core/util/orphanable.h"
46 #include "src/core/util/ref_counted_ptr.h"
47 #include "src/core/util/time.h"
48 #include "src/core/util/work_serializer.h"
49 #include "test/core/load_balancing/lb_policy_test_lib.h"
50 #include "test/core/test_util/fake_stats_plugin.h"
51 #include "test/core/test_util/test_config.h"
52
53 namespace grpc_core {
54 namespace testing {
55 namespace {
56
57 class PickFirstTest : public LoadBalancingPolicyTest {
58 protected:
PickFirstTest(ChannelArgs channel_args=ChannelArgs ())59 explicit PickFirstTest(ChannelArgs channel_args = ChannelArgs())
60 : LoadBalancingPolicyTest("pick_first", channel_args) {}
61
SetUp()62 void SetUp() override {
63 LoadBalancingPolicyTest::SetUp();
64 SetExpectedTimerDuration(std::chrono::milliseconds(250));
65 }
66
MakePickFirstConfig(absl::optional<bool> shuffle_address_list=absl::nullopt)67 static RefCountedPtr<LoadBalancingPolicy::Config> MakePickFirstConfig(
68 absl::optional<bool> shuffle_address_list = absl::nullopt) {
69 return MakeConfig(Json::FromArray({Json::FromObject(
70 {{"pick_first",
71 shuffle_address_list.has_value()
72 ? Json::FromObject({{"shuffleAddressList",
73 Json::FromBool(*shuffle_address_list)}})
74 : Json::FromObject({})}})}));
75 }
76
77 // Gets order the addresses are being picked. Return type is void so
78 // assertions can be used.
GetOrderAddressesArePicked(absl::Span<const absl::string_view> addresses,std::vector<absl::string_view> * out_address_order)79 void GetOrderAddressesArePicked(
80 absl::Span<const absl::string_view> addresses,
81 std::vector<absl::string_view>* out_address_order) {
82 out_address_order->clear();
83 ExitIdle();
84 // Construct a map of subchannel to address.
85 // We will remove entries as each subchannel starts to connect.
86 std::map<SubchannelState*, absl::string_view> subchannels;
87 for (auto address : addresses) {
88 auto* subchannel = FindSubchannel(address);
89 ASSERT_NE(subchannel, nullptr);
90 subchannels.emplace(subchannel, address);
91 }
92 // Now process each subchannel in the order in which pick_first tries it.
93 while (!subchannels.empty()) {
94 // Find the subchannel that is being attempted.
95 SubchannelState* subchannel = nullptr;
96 for (const auto& p : subchannels) {
97 if (p.first->ConnectionRequested()) {
98 out_address_order->push_back(p.second);
99 subchannel = p.first;
100 break;
101 }
102 }
103 ASSERT_NE(subchannel, nullptr);
104 // The subchannel reports CONNECTING.
105 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
106 // If this is the first subchannel being attempted, expect a CONNECTING
107 // update.
108 if (subchannels.size() == addresses.size()) {
109 ExpectConnectingUpdate();
110 }
111 if (subchannels.size() > 1) {
112 // Not the last subchannel in the list. Connection attempt should fail.
113 subchannel->SetConnectivityState(
114 GRPC_CHANNEL_TRANSIENT_FAILURE,
115 absl::UnavailableError("failed to connect"));
116 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
117 } else {
118 // Last subchannel in the list. Connection attempt should succeed.
119 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
120 auto picker = WaitForConnected();
121 ASSERT_NE(picker, nullptr);
122 EXPECT_EQ(ExpectPickComplete(picker.get()), out_address_order->back());
123 // Then it should become disconnected.
124 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
125 ExpectReresolutionRequest();
126 // We would normally call ExpectStateAndQueueingPicker() here instead of
127 // just ExpectState(). However, calling the picker would also trigger
128 // exiting IDLE, which we don't want here, because if the test is going
129 // to send an address list update and call GetOrderAddressesArePicked()
130 // again, we don't want to trigger a connection attempt on any
131 // subchannel until after that next address list update is processed.
132 ExpectState(GRPC_CHANNEL_IDLE);
133 }
134 // Remove the subchannel from the map.
135 subchannels.erase(subchannel);
136 }
137 }
138 };
139
// The first of two addresses connects successfully: the policy must become
// READY and send every pick to that address, without having requested a
// connection on the second address while the first was still connecting.
TEST_F(PickFirstTest, FirstAddressWorks) {
  // Deliver a two-address update with shuffling disabled.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // On seeing the first subchannel's initial state (IDLE), the policy asks
  // it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel starts connecting and says so.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // That is surfaced by the policy as a CONNECTING update.
  ExpectConnectingUpdate();
  // No connection request has gone to the second subchannel.
  EXPECT_FALSE(second->ConnectionRequested());
  // The first subchannel's connection is established.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // After an arbitrary number of CONNECTING updates the policy reports
  // READY.
  auto picker = WaitForConnected();
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
  }
}
172
// The first of two addresses fails to connect: the policy must fall back to
// the second address and, once that one is READY, send every pick to it.
TEST_F(PickFirstTest, FirstAddressFails) {
  // Deliver a two-address update with shuffling disabled.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // On seeing the first subchannel's initial state (IDLE), the policy asks
  // it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel starts connecting and says so.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // That is surfaced by the policy as a CONNECTING update.
  ExpectConnectingUpdate();
  // No connection request has gone to the second subchannel yet.
  EXPECT_FALSE(second->ConnectionRequested());
  // The attempt on the first subchannel is unsuccessful.
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"));
  // The policy moves on and asks the second subchannel to connect.
  EXPECT_TRUE(second->ConnectionRequested());
  // The second subchannel starts connecting...
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // ...and its attempt is successful.
  second->SetConnectivityState(GRPC_CHANNEL_READY);
  // After an arbitrary number of CONNECTING updates the policy reports
  // READY.
  auto picker = WaitForConnected();
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the second address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
  }
}
214
// Two endpoints are supplied, the first carrying two addresses and the
// second carrying one.  The policy is expected to treat them as a single
// list of three addresses and try them in order.
TEST_F(PickFirstTest, FlattensEndpointAddressesList) {
  // Build the update: endpoint 1 has two addresses, endpoint 2 has one.
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 1> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445"};
  const std::array<EndpointAddresses, 2> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses)};
  // Use the lb_policy() accessor, as every other test in this file does,
  // rather than reaching into the fixture's lb_policy_ member directly.
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kEndpoints, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // Each of the three addresses must have its own subchannel.
  auto* first = FindSubchannel(kEndpoint1Addresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kEndpoint1Addresses[1]);
  ASSERT_NE(second, nullptr);
  auto* third = FindSubchannel(kEndpoint2Addresses[0]);
  ASSERT_NE(third, nullptr);
  // On seeing the first subchannel's initial state (IDLE), the policy asks
  // it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel starts connecting and says so.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // That is surfaced by the policy as a CONNECTING update.
  ExpectConnectingUpdate();
  // Neither of the remaining subchannels has been asked to connect.
  EXPECT_FALSE(second->ConnectionRequested());
  EXPECT_FALSE(third->ConnectionRequested());
  // The attempt on the first subchannel is unsuccessful.
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"));
  // The policy advances to the second subchannel only.
  EXPECT_TRUE(second->ConnectionRequested());
  EXPECT_FALSE(third->ConnectionRequested());
  // The second subchannel starts connecting...
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // ...and its attempt is unsuccessful too.
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"));
  // The policy advances to the third subchannel.
  EXPECT_TRUE(third->ConnectionRequested());
  // The third subchannel starts connecting...
  third->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // ...and this attempt is successful.
  third->SetConnectivityState(GRPC_CHANNEL_READY);
  // After an arbitrary number of CONNECTING updates the policy reports
  // READY.
  auto picker = WaitForConnected();
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the second endpoint's address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kEndpoint2Addresses[0]);
  }
}
274
// Of three addresses, the first two already have subchannels in
// TRANSIENT_FAILURE when the update arrives.  The policy is expected to skip
// both and connect to the third.
TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) {
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
  // Pre-create the first two subchannels and force them into
  // TRANSIENT_FAILURE (state-transition validation is turned off for this)
  // before the policy sees the address list.
  auto* first = CreateSubchannel(kAddresses[0]);
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"),
                              /*validate_state_transition=*/false);
  auto* second = CreateSubchannel(kAddresses[1]);
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"),
                               /*validate_state_transition=*/false);
  // Now deliver the three-address update.
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // The third address must have received a subchannel from the policy.
  auto* third = FindSubchannel(kAddresses[2]);
  ASSERT_NE(third, nullptr);
  // The initial state notifications for subchannels one and two are both
  // TRANSIENT_FAILURE, so the policy passes over them.  Subchannel three is
  // IDLE, which makes it the one that gets the connection request.
  EXPECT_TRUE(third->ConnectionRequested());
  // The third subchannel starts connecting and says so.
  third->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // That is surfaced by the policy as a CONNECTING update.
  ExpectConnectingUpdate();
  // The attempt is successful.
  third->SetConnectivityState(GRPC_CHANNEL_READY);
  // After an arbitrary number of CONNECTING updates the policy reports
  // READY.
  auto picker = WaitForConnected();
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the third address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[2]);
  }
}
318
// Both addresses already have subchannels in TRANSIENT_FAILURE when the
// update arrives.  The policy is expected to report TRANSIENT_FAILURE without
// requesting any connection, and to recover once a subchannel goes IDLE and
// then connects.
TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  // Pre-create both subchannels and force them into TRANSIENT_FAILURE
  // (state-transition validation is turned off for this) before the policy
  // sees the address list.
  auto* first = CreateSubchannel(kAddresses[0]);
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"),
                              /*validate_state_transition=*/false);
  auto* second = CreateSubchannel(kAddresses[1]);
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"),
                               /*validate_state_transition=*/false);
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A re-resolution request is expected from the policy.
  ExpectReresolutionRequest();
  // The policy is expected to go to TRANSIENT_FAILURE with this status.
  WaitForConnectionFailed([](const absl::Status& failure) {
    EXPECT_EQ(failure, absl::UnavailableError(
                           "failed to connect to all addresses; "
                           "last error: UNAVAILABLE: failed to connect"));
  });
  // Neither subchannel has been asked to connect.
  EXPECT_FALSE(first->ConnectionRequested());
  EXPECT_FALSE(second->ConnectionRequested());
  // The first subchannel returns to IDLE.
  first->SetConnectivityState(GRPC_CHANNEL_IDLE);
  // That prompts the policy to request a connection on it.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel starts connecting...
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // ...and the attempt is successful.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // The next state reported by the policy is READY.
  auto picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
  }
}
363
// Starts with both addresses in TRANSIENT_FAILURE, then sends a second
// address list that keeps the first (failed) address and adds a new one.
// The next state the test expects from the policy after the second update is
// READY, reached by connecting to the new address.
TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  // Pre-create both subchannels and force them into TRANSIENT_FAILURE
  // (state-transition validation is turned off for this) before the policy
  // sees the address list.
  auto* first = CreateSubchannel(kAddresses[0]);
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"),
                              /*validate_state_transition=*/false);
  auto* second = CreateSubchannel(kAddresses[1]);
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"),
                               /*validate_state_transition=*/false);
  const absl::Status initial_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(initial_status.ok()) << initial_status;
  // A re-resolution request is expected from the policy.
  ExpectReresolutionRequest();
  // The policy is expected to go to TRANSIENT_FAILURE with this status.
  WaitForConnectionFailed([](const absl::Status& failure) {
    EXPECT_EQ(failure, absl::UnavailableError(
                           "failed to connect to all addresses; "
                           "last error: UNAVAILABLE: failed to connect"));
  });
  // Neither subchannel has been asked to connect.
  EXPECT_FALSE(first->ConnectionRequested());
  EXPECT_FALSE(second->ConnectionRequested());
  // Second address list: the first address from before, followed by a
  // brand-new address whose subchannel will start out IDLE.
  constexpr std::array<absl::string_view, 2> kAddresses2 = {
      kAddresses[0], "ipv4:127.0.0.1:445"};
  const absl::Status second_status = ApplyUpdate(
      BuildUpdate(kAddresses2, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(second_status.ok()) << second_status;
  // The new address must have received a subchannel from the policy.
  auto* added = FindSubchannel(kAddresses2[1]);
  ASSERT_NE(added, nullptr);
  // The new subchannel is the one that gets the connection request.
  EXPECT_TRUE(added->ConnectionRequested());
  // It starts connecting...
  added->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // ...and the attempt is successful.
  added->SetConnectivityState(GRPC_CHANNEL_READY);
  // The next state reported by the policy is READY.
  auto picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the newly added address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses2[1]);
  }
}
416
417 // This tests a real-world bug in which PF ignored a resolver update if
418 // it had just created the subchannels but had not yet seen their
419 // initial connectivity state notification.
TEST_F(PickFirstTest,ResolverUpdateBeforeLeavingIdle)420 TEST_F(PickFirstTest, ResolverUpdateBeforeLeavingIdle) {
421 constexpr std::array<absl::string_view, 2> kAddresses = {
422 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
423 constexpr std::array<absl::string_view, 2> kNewAddresses = {
424 "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
425 // Send initial update containing two addresses.
426 absl::Status status = ApplyUpdate(
427 BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
428 EXPECT_TRUE(status.ok()) << status;
429 // LB policy should have created a subchannel for both addresses.
430 auto* subchannel = FindSubchannel(kAddresses[0]);
431 ASSERT_NE(subchannel, nullptr);
432 auto* subchannel2 = FindSubchannel(kAddresses[1]);
433 ASSERT_NE(subchannel2, nullptr);
434 // When the LB policy receives the first subchannel's initial connectivity
435 // state notification (IDLE), it will request a connection.
436 EXPECT_TRUE(subchannel->ConnectionRequested());
437 // This causes the subchannel to start to connect, so it reports CONNECTING.
438 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
439 // LB policy should have reported CONNECTING state.
440 ExpectConnectingUpdate();
441 // The second subchannel should not be connecting.
442 EXPECT_FALSE(subchannel2->ConnectionRequested());
443 // When the first subchannel becomes connected, it reports READY.
444 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
445 // The LB policy will report CONNECTING some number of times (doesn't
446 // matter how many) and then report READY.
447 auto picker = WaitForConnected();
448 ASSERT_NE(picker, nullptr);
449 // Picker should return the same subchannel repeatedly.
450 for (size_t i = 0; i < 3; ++i) {
451 EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
452 }
453 // Now the connection is closed, so we go IDLE.
454 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
455 ExpectReresolutionRequest();
456 ExpectState(GRPC_CHANNEL_IDLE);
457 // Now we tell the LB policy to exit idle. This causes it to create a
458 // new subchannel list from the original update. However, before it
459 // can get the initial connectivity state notifications for those
460 // subchannels (i.e., before it can transition from IDLE to CONNECTING),
461 // we send a new update.
462 absl::Notification notification;
463 work_serializer_->Run(
464 [&]() {
465 // Inject second update into WorkSerializer queue before we
466 // exit idle, so that the second update gets run before the initial
467 // subchannel connectivity state notifications from the first update
468 // are delivered.
469 work_serializer_->Run(
470 [&]() {
471 // Second update.
472 absl::Status status = lb_policy()->UpdateLocked(
473 BuildUpdate(kNewAddresses, MakePickFirstConfig(false)));
474 EXPECT_TRUE(status.ok()) << status;
475 // Trigger notification once all connectivity state
476 // notifications have been delivered.
477 work_serializer_->Run([&]() { notification.Notify(); },
478 DEBUG_LOCATION);
479 },
480 DEBUG_LOCATION);
481 // Exit idle.
482 lb_policy()->ExitIdleLocked();
483 },
484 DEBUG_LOCATION);
485 while (!notification.HasBeenNotified()) {
486 fuzzing_ee_->Tick();
487 }
488 // The LB policy should have created subchannels for the new addresses.
489 auto* subchannel3 = FindSubchannel(kNewAddresses[0]);
490 ASSERT_NE(subchannel3, nullptr);
491 auto* subchannel4 = FindSubchannel(kNewAddresses[1]);
492 ASSERT_NE(subchannel4, nullptr);
493 // The LB policy will request a connection on the first new subchannel,
494 // none of the others.
495 EXPECT_TRUE(subchannel3->ConnectionRequested());
496 EXPECT_FALSE(subchannel->ConnectionRequested());
497 EXPECT_FALSE(subchannel2->ConnectionRequested());
498 EXPECT_FALSE(subchannel4->ConnectionRequested());
499 // The subchannel starts a connection attempt.
500 subchannel3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
501 // The LB policy should now report CONNECTING.
502 ExpectConnectingUpdate();
503 // The connection attempt succeeds.
504 subchannel3->SetConnectivityState(GRPC_CHANNEL_READY);
505 // The LB policy will report CONNECTING some number of times (doesn't
506 // matter how many) and then report READY.
507 picker = WaitForConnected();
508 ASSERT_NE(picker, nullptr);
509 // Picker should return the same subchannel repeatedly.
510 for (size_t i = 0; i < 3; ++i) {
511 EXPECT_EQ(ExpectPickComplete(picker.get()), kNewAddresses[0]);
512 }
513 }
514
// Exercises the timer-driven fallback: while the first attempt is still
// pending the timer fires and the second subchannel is tried; that one fails
// and the third is tried; finally the first attempt succeeds and wins.
TEST_F(PickFirstTest, HappyEyeballs) {
  // Deliver a three-address update with shuffling disabled.
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must exist for each of the three addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  auto* third = FindSubchannel(kAddresses[2]);
  ASSERT_NE(third, nullptr);
  // On seeing the first subchannel's initial state (IDLE), the policy asks
  // it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel starts connecting and says so.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // That is surfaced by the policy as a CONNECTING update.
  ExpectConnectingUpdate();
  // No connection request has gone to the second subchannel yet.
  EXPECT_FALSE(second->ConnectionRequested());
  // Let the timer expire while the first attempt is still in flight.
  IncrementTimeBy(Duration::Milliseconds(250));
  // The policy reacts by starting an attempt on the second subchannel.
  EXPECT_TRUE(second->ConnectionRequested());
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // The second attempt fails without waiting for the timer.
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"));
  // The policy reacts by starting an attempt on the third subchannel.
  EXPECT_TRUE(third->ConnectionRequested());
  third->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Advancing the clock now changes nothing, since no timer is used for the
  // last subchannel in the list.  Any updates queued at this point are
  // therefore CONNECTING updates.
  IncrementTimeBy(Duration::Milliseconds(250));
  DrainConnectingUpdates();
  // The first subchannel's connection is established.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // After an arbitrary number of CONNECTING updates the policy reports
  // READY.
  auto picker = WaitForConnected();
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
  }
}
568
// Runs a full Happy Eyeballs pass over three addresses in which every
// attempt fails, then checks the follow-up behaviour: the policy reports
// TRANSIENT_FAILURE, reconnects to subchannels as they go IDLE, reports
// TRANSIENT_FAILURE again after three more failures, and finally becomes
// READY when one attempt succeeds.
TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
  // Status used for every failed connection attempt in this test.
  const absl::Status connect_error =
      absl::UnavailableError("failed to connect");
  // Deliver a three-address update with shuffling disabled.
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must exist for each of the three addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  auto* third = FindSubchannel(kAddresses[2]);
  ASSERT_NE(third, nullptr);
  // On seeing the first subchannel's initial state (IDLE), the policy asks
  // it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel starts connecting and says so.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // That is surfaced by the policy as a CONNECTING update.
  ExpectConnectingUpdate();
  // No connection request has gone to the second subchannel yet.
  EXPECT_FALSE(second->ConnectionRequested());
  // Let the timer expire while the first attempt is still in flight.
  IncrementTimeBy(Duration::Milliseconds(250));
  // The policy reacts by starting an attempt on the second subchannel.
  EXPECT_TRUE(second->ConnectionRequested());
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // The second attempt fails without waiting for the timer.
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, connect_error);
  // The policy reacts by starting an attempt on the third subchannel.
  EXPECT_TRUE(third->ConnectionRequested());
  third->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Advancing the clock now changes nothing, since no timer is used for the
  // last subchannel in the list.  Any updates queued at this point are
  // therefore CONNECTING updates.
  IncrementTimeBy(Duration::Milliseconds(250));
  DrainConnectingUpdates();
  // Put subchannel 2 back into IDLE now, so that it is already there when
  // the Happy Eyeballs pass ends in failure.
  second->SetConnectivityState(GRPC_CHANNEL_IDLE);
  // The attempt on the third subchannel fails.
  third->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, connect_error);
  ExpectQueueEmpty();
  // At last the first subchannel's attempt fails as well.
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, connect_error);
  // A re-resolution request is expected from the policy.
  ExpectReresolutionRequest();
  // The policy is expected to go to TRANSIENT_FAILURE with this status.
  WaitForConnectionFailed([](const absl::Status& failure) {
    EXPECT_EQ(failure, absl::UnavailableError(
                           "failed to connect to all addresses; "
                           "last error: UNAVAILABLE: failed to connect"));
  });
  // The Happy Eyeballs pass is over; from here on the policy attempts all
  // subchannels in parallel.  Subchannel 2 is already IDLE and therefore
  // gets a connection request right away.  Subchannels 1 and 3 are in
  // TRANSIENT_FAILURE and do not.
  EXPECT_FALSE(first->ConnectionRequested());
  EXPECT_TRUE(second->ConnectionRequested());
  EXPECT_FALSE(third->ConnectionRequested());
  // Subchannel 2 starts connecting.
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Subchannel 1 goes IDLE, which prompts a new connection attempt on it.
  first->SetConnectivityState(GRPC_CHANNEL_IDLE);
  EXPECT_TRUE(first->ConnectionRequested());
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Subchannel 1 fails: failure number one since the end of the Happy
  // Eyeballs pass.
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, connect_error);
  EXPECT_FALSE(first->ConnectionRequested());
  // Subchannel 3 goes IDLE, which prompts a new connection attempt on it.
  third->SetConnectivityState(GRPC_CHANNEL_IDLE);
  EXPECT_TRUE(third->ConnectionRequested());
  third->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Subchannel 2 fails: failure number two since the end of the Happy
  // Eyeballs pass.
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, connect_error);
  EXPECT_FALSE(second->ConnectionRequested());
  // Subchannel 3 fails: failure number three since the end of the Happy
  // Eyeballs pass.  This makes the policy request re-resolution and report
  // TRANSIENT_FAILURE once more.
  third->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, connect_error);
  EXPECT_FALSE(third->ConnectionRequested());
  ExpectReresolutionRequest();
  ExpectTransientFailureUpdate(
      absl::UnavailableError("failed to connect to all addresses; "
                             "last error: UNAVAILABLE: failed to connect"));
  // Subchannel 2 goes IDLE...
  second->SetConnectivityState(GRPC_CHANNEL_IDLE);
  // ...and is asked to connect.
  EXPECT_TRUE(second->ConnectionRequested());
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // This attempt is successful.
  second->SetConnectivityState(GRPC_CHANNEL_READY);
  // The next state reported by the policy is READY.
  auto picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Repeated picks all resolve to the second address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]);
  }
}
687
// Happy Eyeballs scenario with two addresses: the second (last) subchannel
// fails and drops back to IDLE while the first one is still CONNECTING, and
// only afterwards does the first one fail.  The test then checks that the
// already-IDLE second subchannel is retried and that READY is reported once
// it connects.
TEST_F(PickFirstTest,
       HappyEyeballsLastSubchannelFailsWhileAnotherIsStillPending) {
  // Deliver an update that carries two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must now exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // The initial IDLE notification of the first subchannel makes the LB
  // policy ask it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins its attempt and therefore reports CONNECTING.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // The LB policy is expected to surface CONNECTING.
  ExpectConnectingUpdate();
  // Nothing has been requested from the second subchannel yet.
  EXPECT_FALSE(second->ConnectionRequested());
  // The timer expires while the first attempt is still in flight...
  IncrementTimeBy(Duration::Milliseconds(250));
  // ...which makes the LB policy kick off the second subchannel.
  EXPECT_TRUE(second->ConnectionRequested());
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // The attempt on the second subchannel fails.
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"));
  // TRANSIENT_FAILURE must not be reported yet, since the first subchannel
  // is still CONNECTING.
  DrainConnectingUpdates();
  // Move the second subchannel back to IDLE so that it is already in that
  // state at the moment Happy Eyeballs gives up.
  second->SetConnectivityState(GRPC_CHANNEL_IDLE);
  // Now the first subchannel fails as well.
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"));
  // A re-resolution request is expected from the LB policy.
  ExpectReresolutionRequest();
  // TRANSIENT_FAILURE is expected from the LB policy.
  WaitForConnectionFailed([&](const absl::Status& failure_status) {
    EXPECT_EQ(failure_status,
              absl::UnavailableError(
                  "failed to connect to all addresses; "
                  "last error: UNAVAILABLE: failed to connect"));
  });
  // The Happy Eyeballs pass is over; from here on all subchannels are tried
  // in parallel.  The second subchannel was already IDLE, so a connection is
  // requested on it right away.  The first one is still in
  // TRANSIENT_FAILURE, so nothing is requested from it.
  EXPECT_FALSE(first->ConnectionRequested());
  EXPECT_TRUE(second->ConnectionRequested());
  // The second subchannel goes CONNECTING and then READY.
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  second->SetConnectivityState(GRPC_CHANNEL_READY);
  // READY is expected from the LB policy.
  auto ready_picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(ready_picker, nullptr);
  // Every pick must land on the second address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddresses[1]);
  }
}
756
// Four IPv4 addresses followed by two IPv6 addresses: checks that connection
// attempts are started in the order v4, v6, v4, v6, v4, v4, one per timer
// expiry.
TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
  // The update lists four IPv4 addresses first and two IPv6 addresses last.
  constexpr std::array<absl::string_view, 6> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
      "ipv4:127.0.0.1:446", "ipv6:[::1]:444",     "ipv6:[::1]:445"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // Advances the clock by 250ms; each call below stands for the timer firing
  // before the attempt in progress has completed.
  const auto fire_timer = [this] {
    IncrementTimeBy(Duration::Milliseconds(250));
  };
  // Every address must have received its own subchannel.
  auto* v4_a = FindSubchannel(kAddresses[0]);
  ASSERT_NE(v4_a, nullptr);
  auto* v4_b = FindSubchannel(kAddresses[1]);
  ASSERT_NE(v4_b, nullptr);
  auto* v4_c = FindSubchannel(kAddresses[2]);
  ASSERT_NE(v4_c, nullptr);
  auto* v4_d = FindSubchannel(kAddresses[3]);
  ASSERT_NE(v4_d, nullptr);
  auto* v6_a = FindSubchannel(kAddresses[4]);
  ASSERT_NE(v6_a, nullptr);
  auto* v6_b = FindSubchannel(kAddresses[5]);
  ASSERT_NE(v6_b, nullptr);
  // Attempt 1: after the initial (all IDLE) notifications, the first IPv4
  // subchannel is asked to connect.
  EXPECT_TRUE(v4_a->ConnectionRequested());
  v4_a->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_b->ConnectionRequested());
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  EXPECT_FALSE(v6_a->ConnectionRequested());
  EXPECT_FALSE(v6_b->ConnectionRequested());
  // Attempt 2: the timer fires and the first IPv6 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v6_a->ConnectionRequested());
  v6_a->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_b->ConnectionRequested());
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  EXPECT_FALSE(v6_b->ConnectionRequested());
  // Attempt 3: the timer fires and the second IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_b->ConnectionRequested());
  v4_b->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  EXPECT_FALSE(v6_b->ConnectionRequested());
  // Attempt 4: the timer fires and the second IPv6 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v6_b->ConnectionRequested());
  v6_b->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 5: the timer fires and the third IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_c->ConnectionRequested());
  v4_c->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 6: the timer fires and the fourth IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_d->ConnectionRequested());
  v4_d->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
}
847
// Two IPv6 addresses followed by four IPv4 addresses: checks that connection
// attempts are started in the order v6, v4, v6, v4, v4, v4, one per timer
// expiry.
TEST_F(PickFirstTest,
       HappyEyeballsAddressInterleavingSecondFamilyHasMoreAddresses) {
  // The update lists two IPv6 addresses first and four IPv4 addresses last.
  constexpr std::array<absl::string_view, 6> kAddresses = {
      "ipv6:[::1]:444",     "ipv6:[::1]:445",     "ipv4:127.0.0.1:443",
      "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // Advances the clock by 250ms; each call below stands for the timer firing
  // before the attempt in progress has completed.
  const auto fire_timer = [this] {
    IncrementTimeBy(Duration::Milliseconds(250));
  };
  // Every address must have received its own subchannel.
  auto* v6_a = FindSubchannel(kAddresses[0]);
  ASSERT_NE(v6_a, nullptr);
  auto* v6_b = FindSubchannel(kAddresses[1]);
  ASSERT_NE(v6_b, nullptr);
  auto* v4_a = FindSubchannel(kAddresses[2]);
  ASSERT_NE(v4_a, nullptr);
  auto* v4_b = FindSubchannel(kAddresses[3]);
  ASSERT_NE(v4_b, nullptr);
  auto* v4_c = FindSubchannel(kAddresses[4]);
  ASSERT_NE(v4_c, nullptr);
  auto* v4_d = FindSubchannel(kAddresses[5]);
  ASSERT_NE(v4_d, nullptr);
  // Attempt 1: after the initial (all IDLE) notifications, the first IPv6
  // subchannel is asked to connect.
  EXPECT_TRUE(v6_a->ConnectionRequested());
  v6_a->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v6_b->ConnectionRequested());
  EXPECT_FALSE(v4_a->ConnectionRequested());
  EXPECT_FALSE(v4_b->ConnectionRequested());
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 2: the timer fires and the first IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_a->ConnectionRequested());
  v4_a->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v6_b->ConnectionRequested());
  EXPECT_FALSE(v4_b->ConnectionRequested());
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 3: the timer fires and the second IPv6 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v6_b->ConnectionRequested());
  v6_b->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_b->ConnectionRequested());
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 4: the timer fires and the second IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_b->ConnectionRequested());
  v4_b->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_c->ConnectionRequested());
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 5: the timer fires and the third IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_c->ConnectionRequested());
  v4_c->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nobody else may have been asked to connect.
  EXPECT_FALSE(v4_d->ConnectionRequested());
  // Attempt 6: the timer fires and the fourth IPv4 subchannel is started.
  fire_timer();
  EXPECT_TRUE(v4_d->ConnectionRequested());
  v4_d->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
}
939
// Two addresses: the first subchannel fails and then returns to IDLE while
// the second one is still CONNECTING.  Once the second one fails too, the
// test checks that the first subchannel is retried and that READY is
// reported when that retry succeeds.
TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
  // Deliver an update that carries two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must now exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // The initial IDLE notification of the first subchannel makes the LB
  // policy ask it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins its attempt and therefore reports CONNECTING.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nothing has been requested from the second subchannel yet.
  EXPECT_FALSE(second->ConnectionRequested());
  // The attempt on the first subchannel fails.
  first->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                              absl::UnavailableError("failed to connect"));
  // The LB policy moves on and asks the second subchannel to connect.
  EXPECT_TRUE(second->ConnectionRequested());
  // The second subchannel begins its attempt and reports CONNECTING.
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // While the second attempt is still in flight, the first subchannel
  // returns to IDLE.
  first->SetConnectivityState(GRPC_CHANNEL_IDLE);
  // Then the attempt on the second subchannel fails.
  second->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                               absl::UnavailableError("failed to connect"));
  // A re-resolution request is expected from the LB policy.
  ExpectReresolutionRequest();
  // TRANSIENT_FAILURE is expected from the LB policy.
  WaitForConnectionFailed([&](const absl::Status& failure_status) {
    EXPECT_EQ(failure_status,
              absl::UnavailableError(
                  "failed to connect to all addresses; "
                  "last error: UNAVAILABLE: failed to connect"));
  });
  // Afterwards the first address is tried again.
  EXPECT_TRUE(first->ConnectionRequested());
  // On this attempt the connection is established.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // READY is expected from the LB policy.
  auto ready_picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(ready_picker, nullptr);
  // Every pick must land on the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddresses[0]);
  }
}
1000
// Connects to the first of two addresses, then drops that connection (the
// subchannel reports IDLE) and checks that the LB policy goes IDLE, requests
// re-resolution, and can reconnect to the same address afterwards.
TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
  // Deliver an update that carries two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must now exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // The initial IDLE notification of the first subchannel makes the LB
  // policy ask it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins its attempt and therefore reports CONNECTING.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nothing has been requested from the second subchannel.
  EXPECT_FALSE(second->ConnectionRequested());
  // The first subchannel finishes connecting and reports READY.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // Any number of CONNECTING reports may precede the READY report; the
  // count is irrelevant here.
  auto current_picker = WaitForConnected();
  ASSERT_NE(current_picker, nullptr);
  // Every pick must land on the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddresses[0]);
  }
  // The connection goes away.
  first->SetConnectivityState(GRPC_CHANNEL_IDLE);
  // A re-resolution request is expected.
  ExpectReresolutionRequest();
  // The LB policy is expected to report IDLE together with a queueing
  // picker.
  ExpectStateAndQueuingPicker(GRPC_CHANNEL_IDLE);
  // Checking the picker told the LB policy to begin a new connection
  // attempt, so it should start again from the first subchannel.
  // The picker enqueued the ExitIdle() call in the WorkSerializer, so the
  // first flush runs that call.  Running it enqueues subchannel connectivity
  // state notifications in turn, hence the second flush: all of that work
  // has to be finished before the test goes on.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins connecting.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // The attempt succeeds.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // READY is expected from the LB policy.
  current_picker = WaitForConnected();
  ASSERT_NE(current_picker, nullptr);
  // Every pick must land on the first address again.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddresses[0]);
  }
}
1063
// Connects to the first of two addresses, then applies an update that no
// longer contains that address and checks that the LB policy goes IDLE,
// requests re-resolution, and connects to the remaining address.  The test
// body is skipped unless IsPickFirstNewEnabled() returns true.
TEST_F(PickFirstTest, AddressUpdateRemovedSelectedAddress) {
  if (!IsPickFirstNewEnabled()) return;
  // Deliver an update that carries two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must now exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // The initial IDLE notification of the first subchannel makes the LB
  // policy ask it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins its attempt and therefore reports CONNECTING.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nothing has been requested from the second subchannel.
  EXPECT_FALSE(second->ConnectionRequested());
  // The first subchannel finishes connecting and reports READY.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // Any number of CONNECTING reports may precede the READY report; the
  // count is irrelevant here.
  auto current_picker = WaitForConnected();
  ASSERT_NE(current_picker, nullptr);
  // Every pick must land on the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddresses[0]);
  }
  // Apply a new update from which the selected address has been dropped.
  update_status = ApplyUpdate(
      BuildUpdate({kAddresses[1]}, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A re-resolution request is expected.
  ExpectReresolutionRequest();
  // The LB policy is expected to report IDLE together with a queueing
  // picker.
  ExpectStateAndQueuingPicker(GRPC_CHANNEL_IDLE);
  // Checking the picker told the LB policy to begin a new connection
  // attempt, so it should start one on the subchannel of the address that
  // is left.
  // The picker enqueued the ExitIdle() call in the WorkSerializer, so the
  // first flush runs that call.  Running it enqueues subchannel connectivity
  // state notifications in turn, hence the second flush: all of that work
  // has to be finished before the test goes on.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(second->ConnectionRequested());
  // The subchannel begins connecting.
  second->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // The attempt succeeds.
  second->SetConnectivityState(GRPC_CHANNEL_READY);
  // READY is expected from the LB policy.
  current_picker = WaitForConnected();
  ASSERT_NE(current_picker, nullptr);
  // Every pick must now land on the second address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddresses[1]);
  }
}
1129
// Connects to the first of two addresses, then applies an update with the
// same addresses in reverse order and checks that the already-connected
// address stays selected.
TEST_F(PickFirstTest, AddressUpdateRetainsSelectedAddress) {
  // Deliver an update that carries two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must now exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // The initial IDLE notification of the first subchannel makes the LB
  // policy ask it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins its attempt and therefore reports CONNECTING.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Nothing has been requested from the second subchannel.
  EXPECT_FALSE(second->ConnectionRequested());
  // The first subchannel finishes connecting and reports READY.
  first->SetConnectivityState(GRPC_CHANNEL_READY);
  // Any number of CONNECTING reports may precede the READY report; the
  // count is irrelevant here.
  auto current_picker = WaitForConnected();
  ASSERT_NE(current_picker, nullptr);
  // Every pick must land on the first address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddresses[0]);
  }
  // Apply a new update in which the two addresses swap places.
  update_status = ApplyUpdate(
      BuildUpdate({kAddresses[1], kAddresses[0]}, MakePickFirstConfig(false)),
      lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // The connected address is now listed second, yet because it is already
  // READY the LB policy is expected to keep using it.
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(current_picker, nullptr);
  // Every pick must still land on the originally selected address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddresses[0]);
  }
  EXPECT_FALSE(second->ConnectionRequested());
}
1176
// This exercises a bug seen in the wild that caused a crash.  For
// details, see https://github.com/grpc/grpc/pull/38144.
TEST_F(PickFirstTest, SubchannelNotificationAfterShutdown) {
  // Deliver an update that carries two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must now exist for each of the two addresses.
  auto* first = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first, nullptr);
  auto* second = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second, nullptr);
  // The initial IDLE notification of the first subchannel makes the LB
  // policy ask it to connect.
  EXPECT_TRUE(first->ConnectionRequested());
  // The subchannel begins its attempt and therefore reports CONNECTING.
  first->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // CONNECTING is expected from the LB policy.
  ExpectConnectingUpdate();
  // Sequence of events this test reproduces:
  // 1. A READY notification for the subchannel is enqueued in the
  //    WorkSerializer but not executed yet.
  // 2. The Happy Eyeballs timer callback is enqueued in the WorkSerializer
  //    but not executed yet.
  // 3. The LB policy is shut down.  It tries to cancel the Happy Eyeballs
  //    timer, which fails because the timer has already fired.
  // 4. The WorkSerializer queue is drained.  The LB policy sees the READY
  //    notification.  Before the bug fix, this made it select the
  //    subchannel instead of ignoring the notification.  With the bug fix,
  //    the update should never reach the LB policy at all, since the policy
  //    will already have shut down the subchannel.
  // 5. The LB policy sees the Happy Eyeballs timer callback.  It is a no-op
  //    because the policy is already shut down, but it releases the last
  //    ref to the subchannel list.
  //
  // For this ordering to hold, steps 2 and 3 have to run inside the
  // WorkSerializer: after the READY notification has been enqueued, but
  // before the WorkSerializer queue is drained.
  const auto fire_timer_then_shut_down = [&]() {
    // Step 2: fire the timer.  Its callback lands in the WorkSerializer,
    // which is not drained yet.
    IncrementTimeBy(Duration::Milliseconds(250),
                    /*flush_work_serializer=*/false);
    // Step 3: shut the LB policy down.
    lb_policy_.reset();
  };
  first->SetConnectivityState(GRPC_CHANNEL_READY, /*status=*/absl::OkStatus(),
                              /*validate_state_transition=*/true,
                              /*run_before_flush=*/fire_timer_then_shut_down);
  // Finally the subchannel reports IDLE.  Before the bug fix, this caused
  // a crash.
  first->SetConnectivityState(GRPC_CHANNEL_IDLE);
}
1233
// Applies the same six-address update with MakePickFirstConfig(true) until
// the observed pick order differs from the input order, then checks that a
// subsequent observation yields that same order again.
TEST_F(PickFirstTest, WithShuffle) {
  constexpr std::array<absl::string_view, 6> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
      "ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  // Six addresses have 6! = 720 permutations, so there is roughly a 0.14%
  // chance that a shuffle reproduces the input order.  Several tries are
  // allowed so that the test does not flake.
  constexpr size_t kMaxTries = 10;
  std::vector<absl::string_view> shuffled_order;
  bool saw_different_order = false;
  for (size_t attempt = 0; attempt < kMaxTries && !saw_different_order;
       ++attempt) {
    const absl::Status update_status = ApplyUpdate(
        BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy());
    EXPECT_TRUE(update_status.ok()) << update_status;
    GetOrderAddressesArePicked(kAddresses, &shuffled_order);
    // The loop condition stops the retries as soon as the order differs.
    saw_different_order = absl::MakeConstSpan(shuffled_order) !=
                          absl::MakeConstSpan(kAddresses);
  }
  ASSERT_TRUE(saw_different_order);
  // The order must not change when it is observed again without a new
  // shuffle.
  std::vector<absl::string_view> order_on_recheck;
  GetOrderAddressesArePicked(kAddresses, &order_on_recheck);
  EXPECT_EQ(order_on_recheck, shuffled_order);
}
1261
// Applies the same six-address update several times with
// MakePickFirstConfig(false) and checks each time that the addresses are
// picked in exactly the order given.
TEST_F(PickFirstTest, ShufflingDisabled) {
  constexpr std::array<absl::string_view, 6> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
      "ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  static constexpr size_t kRounds = 5;
  for (size_t round = 0; round < kRounds; ++round) {
    const absl::Status update_status = ApplyUpdate(
        BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
    EXPECT_TRUE(update_status.ok()) << update_status;
    // The observed order has to match the input order element by element.
    std::vector<absl::string_view> observed_order;
    GetOrderAddressesArePicked(kAddresses, &observed_order);
    EXPECT_THAT(observed_order, ::testing::ElementsAreArray(kAddresses));
  }
}
1276
// Checks the registered definition of the
// "grpc.lb.pick_first.disconnections" metric: value type, instrument type,
// default enablement, name, unit, and label keys.
TEST_F(PickFirstTest, MetricDefinitionDisconnections) {
  constexpr char kMetricName[] = "grpc.lb.pick_first.disconnections";
  const auto* metric =
      GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // The remaining checks dereference the descriptor, so it has to exist.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCounter);
  EXPECT_EQ(metric->enable_by_default, false);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{disconnection}");
  EXPECT_THAT(metric->label_keys, ::testing::ElementsAre("grpc.target"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1292
// Checks, field by field, the descriptor registered for the
// "grpc.lb.pick_first.connection_attempts_succeeded" metric.
TEST_F(PickFirstTest, MetricDefinitionConnectionAttemptsSucceeded) {
  constexpr absl::string_view kMetricName =
      "grpc.lb.pick_first.connection_attempts_succeeded";
  const auto* metric =
      GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Every check below dereferences the descriptor, so stop if it is missing.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{attempt}");
  EXPECT_THAT(metric->label_keys, ::testing::ElementsAre("grpc.target"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1309
// Checks, field by field, the descriptor registered for the
// "grpc.lb.pick_first.connection_attempts_failed" metric.
TEST_F(PickFirstTest, MetricDefinitionConnectionAttemptsFailed) {
  constexpr absl::string_view kMetricName =
      "grpc.lb.pick_first.connection_attempts_failed";
  const auto* metric =
      GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          kMetricName);
  // Every check below dereferences the descriptor, so stop if it is missing.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, GlobalInstrumentsRegistry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type,
            GlobalInstrumentsRegistry::InstrumentType::kCounter);
  EXPECT_FALSE(metric->enable_by_default);
  EXPECT_EQ(metric->name, kMetricName);
  EXPECT_EQ(metric->unit, "{attempt}");
  EXPECT_THAT(metric->label_keys, ::testing::ElementsAre("grpc.target"));
  EXPECT_THAT(metric->optional_label_keys, ::testing::ElementsAre());
}
1325
// Drives pick_first through a failed connection attempt, a successful one,
// and a disconnection, checking after each step that the matching counter
// recorded by the fake stats plugin reads 1.
TEST_F(PickFirstTest, MetricValues) {
  // Look up the handles for the three pick_first counters.
  const auto disconnections_handle =
      GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
          "grpc.lb.pick_first.disconnections")
          .value();
  const auto attempts_succeeded_handle =
      GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
          "grpc.lb.pick_first.connection_attempts_succeeded")
          .value();
  const auto attempts_failed_handle =
      GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
          "grpc.lb.pick_first.connection_attempts_failed")
          .value();
  const absl::string_view kLabelValues[] = {target_};
  // Register a fake plugin that also collects disabled-by-default metrics.
  auto fake_plugin = std::make_shared<FakeStatsPlugin>(
      nullptr, /*use_disabled_by_default_metrics=*/true);
  stats_plugin_group_.AddStatsPlugin(fake_plugin, nullptr);
  // Hand the policy an update with two addresses.
  constexpr std::array<absl::string_view, 2> kAddresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must exist for each of the two addresses.
  auto* first_subchannel = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first_subchannel, nullptr);
  auto* second_subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(second_subchannel, nullptr);
  // Once the policy sees the first subchannel's initial IDLE notification,
  // it asks that subchannel to connect.
  EXPECT_TRUE(first_subchannel->ConnectionRequested());
  // The subchannel starts connecting and therefore reports CONNECTING.
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // The policy is expected to surface CONNECTING as well.
  ExpectConnectingUpdate();
  // No connection should have been requested on the second subchannel yet.
  EXPECT_FALSE(second_subchannel->ConnectionRequested());
  // Fail the first subchannel's attempt; the failure counter should read 1.
  first_subchannel->SetConnectivityState(
      GRPC_CHANNEL_TRANSIENT_FAILURE,
      absl::UnavailableError("failed to connect"));
  EXPECT_THAT(fake_plugin->GetUInt64CounterValue(attempts_failed_handle,
                                                 kLabelValues, {}),
              ::testing::Optional(1));
  // The policy now moves on and requests a connection on the second
  // subchannel.
  EXPECT_TRUE(second_subchannel->ConnectionRequested());
  // The second subchannel starts connecting and reports CONNECTING...
  second_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // ...and then its attempt succeeds; the success counter should read 1.
  second_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  EXPECT_THAT(fake_plugin->GetUInt64CounterValue(attempts_succeeded_handle,
                                                 kLabelValues, {}),
              ::testing::Optional(1));
  // Any number of CONNECTING reports may precede the READY report; the
  // count is irrelevant here.
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  // Repeated picks must all land on the second address.
  for (int pick = 0; pick < 3; ++pick) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddresses[1]);
  }
  // Drop the connection: the selected subchannel goes back to IDLE.
  second_subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
  ExpectReresolutionRequest();
  ExpectState(GRPC_CHANNEL_IDLE);
  // The disconnection counter should now read 1.
  EXPECT_THAT(fake_plugin->GetUInt64CounterValue(disconnections_handle,
                                                 kLabelValues, {}),
              ::testing::Optional(1));
}
1396
1397 class PickFirstHealthCheckingEnabledTest : public PickFirstTest {
1398 protected:
PickFirstHealthCheckingEnabledTest()1399 PickFirstHealthCheckingEnabledTest()
1400 : PickFirstTest(ChannelArgs().Set(
1401 GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)) {}
1402 };
1403
// Connects a single address, then re-applies the same update while the
// subchannel is already READY, and checks that the policy reports READY
// again and that the subchannel ends up with exactly two watchers.
TEST_F(PickFirstHealthCheckingEnabledTest, UpdateWithReadyChannel) {
  constexpr absl::string_view kAddress = "ipv4:127.0.0.1:443";
  LoadBalancingPolicy::UpdateArgs initial_update =
      BuildUpdate({kAddress}, MakePickFirstConfig());
  absl::Status update_status = ApplyUpdate(initial_update, lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  // A subchannel must exist for the address.
  auto* sc = FindSubchannel(kAddress);
  ASSERT_NE(sc, nullptr);
  // Once the policy sees the subchannel's initial IDLE notification, it
  // asks the subchannel to connect.
  EXPECT_TRUE(sc->ConnectionRequested());
  // The subchannel starts connecting and therefore reports CONNECTING.
  sc->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // The policy is expected to surface CONNECTING as well.
  ExpectConnectingUpdate();
  // The connection is established, so the subchannel reports READY.
  sc->SetConnectivityState(GRPC_CHANNEL_READY);
  // Any number of CONNECTING reports may precede the READY report; the
  // count is irrelevant here.
  auto current_picker = WaitForConnected();
  ASSERT_NE(current_picker, nullptr);
  EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddress);
  // Apply the same update a second time. The underlying subchannel will
  // immediately become ready.
  update_status =
      ApplyUpdate(BuildUpdate({kAddress}, MakePickFirstConfig()), lb_policy());
  EXPECT_TRUE(update_status.ok()) << update_status;
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  EXPECT_EQ(ExpectPickComplete(current_picker.get()), kAddress);
  // NumWatchers() should now count two watchers: our subchannel
  // connectivity watcher and our health watcher.
  EXPECT_EQ(sc->NumWatchers(), 2);
}
1438
1439 } // namespace
1440 } // namespace testing
1441 } // namespace grpc_core
1442
main(int argc,char ** argv)1443 int main(int argc, char** argv) {
1444 ::testing::InitGoogleTest(&argc, argv);
1445 grpc::testing::TestEnvironment env(&argc, argv);
1446 return RUN_ALL_TESTS();
1447 }
1448