1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/pick_first/pick_first.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/impl/connectivity_state.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 #include <string.h>
25
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30 #include <utility>
31 #include <vector>
32
33 #include "absl/algorithm/container.h"
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "absl/random/random.h"
37 #include "absl/status/status.h"
38 #include "absl/status/statusor.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
42 #include "src/core/config/core_configuration.h"
43 #include "src/core/lib/address_utils/sockaddr_utils.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/experiments/experiments.h"
47 #include "src/core/lib/iomgr/exec_ctx.h"
48 #include "src/core/lib/iomgr/iomgr_fwd.h"
49 #include "src/core/lib/iomgr/resolved_address.h"
50 #include "src/core/lib/transport/connectivity_state.h"
51 #include "src/core/load_balancing/health_check_client.h"
52 #include "src/core/load_balancing/lb_policy.h"
53 #include "src/core/load_balancing/lb_policy_factory.h"
54 #include "src/core/load_balancing/subchannel_interface.h"
55 #include "src/core/resolver/endpoint_addresses.h"
56 #include "src/core/telemetry/metrics.h"
57 #include "src/core/util/crash.h"
58 #include "src/core/util/debug_location.h"
59 #include "src/core/util/json/json.h"
60 #include "src/core/util/json/json_args.h"
61 #include "src/core/util/json/json_object_loader.h"
62 #include "src/core/util/orphanable.h"
63 #include "src/core/util/ref_counted_ptr.h"
64 #include "src/core/util/time.h"
65 #include "src/core/util/useful.h"
66 #include "src/core/util/work_serializer.h"
67
68 namespace grpc_core {
69
70 namespace {
71
72 //
73 // pick_first LB policy
74 //
75
// Policy name; returned by name() of both PickFirstConfig and PickFirst.
constexpr absl::string_view kPickFirst = "pick_first";

// Metrics exported by pick_first.  Each is a uint64 counter labelled by
// the channel target (kMetricLabelTarget).  The trailing `false` argument
// to RegisterUInt64Counter is presumably the "enabled by default" flag --
// NOTE(review): confirm against GlobalInstrumentsRegistry.
// These are registered during static initialization in the order below;
// keep the order stable.

// Incremented each time the currently selected subchannel reports a
// connectivity state change, which this policy treats as a disconnection.
const auto kMetricDisconnections =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.lb.pick_first.disconnections",
        "EXPERIMENTAL. Number of times the selected subchannel becomes "
        "disconnected.",
        "{disconnection}", false)
        .Labels(kMetricLabelTarget)
        .Build();

// Incremented when a subchannel that was CONNECTING becomes READY and is
// selected.
const auto kMetricConnectionAttemptsSucceeded =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.lb.pick_first.connection_attempts_succeeded",
        "EXPERIMENTAL. Number of successful connection attempts.", "{attempt}",
        false)
        .Labels(kMetricLabelTarget)
        .Build();

// Counter for failed connection attempts.
const auto kMetricConnectionAttemptsFailed =
    GlobalInstrumentsRegistry::RegisterUInt64Counter(
        "grpc.lb.pick_first.connection_attempts_failed",
        "EXPERIMENTAL. Number of failed connection attempts.", "{attempt}",
        false)
        .Labels(kMetricLabelTarget)
        .Build();
102
103 class PickFirstConfig final : public LoadBalancingPolicy::Config {
104 public:
name() const105 absl::string_view name() const override { return kPickFirst; }
shuffle_addresses() const106 bool shuffle_addresses() const { return shuffle_addresses_; }
107
JsonLoader(const JsonArgs &)108 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
109 static const auto kJsonLoader =
110 JsonObjectLoader<PickFirstConfig>()
111 .OptionalField("shuffleAddressList",
112 &PickFirstConfig::shuffle_addresses_)
113 .Finish();
114 return kJsonLoader;
115 }
116
117 private:
118 bool shuffle_addresses_ = false;
119 };
120
// The pick_first LB policy.  Builds a SubchannelList from the latest
// resolver update and attempts connections (Happy Eyeballs style) until a
// subchannel reports READY.  That subchannel is then "selected" and every
// pick is sent to it.  Any later connectivity change of the selected
// subchannel makes the policy go IDLE.
class PickFirst final : public LoadBalancingPolicy {
 public:
  explicit PickFirst(Args args);

  absl::string_view name() const override { return kPickFirst; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  ~PickFirst() override;

  // A list of subchannels that we will attempt connections on.
  class SubchannelList final : public InternallyRefCounted<SubchannelList> {
   public:
    // Data about the subchannel that is needed only while attempting to
    // connect.
    class SubchannelData final {
     public:
      // Stores the subchannel and its watcher.  This is the state that
      // is retained once a subchannel is chosen.
      class SubchannelState final
          : public InternallyRefCounted<SubchannelState> {
       public:
        // Starts a connectivity state watch on `subchannel`.
        SubchannelState(SubchannelData* subchannel_data,
                        RefCountedPtr<SubchannelInterface> subchannel);

        // Cancels the watch and releases the subchannel and policy refs.
        void Orphan() override;

        SubchannelInterface* subchannel() const { return subchannel_.get(); }

        void RequestConnection() { subchannel_->RequestConnection(); }

        void ResetBackoffLocked() { subchannel_->ResetBackoff(); }

       private:
        // Watcher for subchannel connectivity state.  Holds a ref to the
        // SubchannelState and forwards notifications to it.
        class Watcher
            : public SubchannelInterface::ConnectivityStateWatcherInterface {
         public:
          explicit Watcher(RefCountedPtr<SubchannelState> subchannel_state)
              : subchannel_state_(std::move(subchannel_state)) {}

          ~Watcher() override {
            subchannel_state_.reset(DEBUG_LOCATION, "Watcher dtor");
          }

          void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                         absl::Status status) override {
            subchannel_state_->OnConnectivityStateChange(new_state,
                                                         std::move(status));
          }

          // Reads pick_first_, which Orphan() resets only after cancelling
          // the watch.
          grpc_pollset_set* interested_parties() override {
            return subchannel_state_->pick_first_->interested_parties();
          }

         private:
          RefCountedPtr<SubchannelState> subchannel_state_;
        };

        // Selects this subchannel.  Called when the subchannel reports READY.
        void Select();

        // This method will be invoked once soon after instantiation to report
        // the current connectivity state, and it will then be invoked again
        // whenever the connectivity state changes.
        void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                       absl::Status status);

        // If non-null, then we are still part of a subchannel list
        // trying to connect.
        SubchannelData* subchannel_data_;

        // TODO(roth): Once we remove pollset_set, we should no longer
        // need to hold a ref to PickFirst.  Instead, we can make this a
        // raw pointer and put it in an absl::variant with subchannel_data_.
        RefCountedPtr<PickFirst> pick_first_;

        RefCountedPtr<SubchannelInterface> subchannel_;
        // Non-owning pointer to the watcher handed to the subchannel; set to
        // null by Orphan(), after which notifications are ignored.
        SubchannelInterface::ConnectivityStateWatcherInterface* watcher_ =
            nullptr;
      };

      SubchannelData(SubchannelList* subchannel_list, size_t index,
                     RefCountedPtr<SubchannelInterface> subchannel);

      // Last state reported for this subchannel; empty until the first
      // notification arrives.
      absl::optional<grpc_connectivity_state> connectivity_state() const {
        return connectivity_state_;
      }
      const absl::Status& connectivity_status() const {
        return connectivity_status_;
      }

      void RequestConnection() { subchannel_state_->RequestConnection(); }

      // Resets the connection backoff.
      void ResetBackoffLocked() { subchannel_state_->ResetBackoffLocked(); }

      // Requests a connection attempt to start on this subchannel,
      // with appropriate Connection Attempt Delay.
      // Used only during the Happy Eyeballs pass.
      void RequestConnectionWithTimer();

      // True once this subchannel has reported TRANSIENT_FAILURE at least
      // once.
      bool seen_transient_failure() const { return seen_transient_failure_; }

     private:
      // This method will be invoked once soon after instantiation to report
      // the current connectivity state, and it will then be invoked again
      // whenever the connectivity state changes.
      void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                     absl::Status status);

      // Backpointer to owning subchannel list.  Not owned.
      SubchannelList* subchannel_list_;
      // Our index within subchannel_list_.
      const size_t index_;
      // Subchannel state.  Moved into PickFirst::selected_ by Select().
      OrphanablePtr<SubchannelState> subchannel_state_;
      // Data updated by the watcher.
      absl::optional<grpc_connectivity_state> connectivity_state_;
      absl::Status connectivity_status_;
      bool seen_transient_failure_ = false;
    };

    SubchannelList(RefCountedPtr<PickFirst> policy,
                   EndpointAddressesIterator* addresses,
                   const ChannelArgs& args, absl::string_view resolution_note);

    ~SubchannelList() override;

    void Orphan() override;

    // The number of subchannels in the list.
    size_t size() const { return subchannels_.size(); }

    // Resets connection backoff of all subchannels.
    void ResetBackoffLocked();

    // True once every subchannel has been attempted and has reported
    // TRANSIENT_FAILURE at least once.
    bool IsHappyEyeballsPassComplete() const {
      // Checking attempting_index_ here is just an optimization -- if
      // we haven't actually tried all subchannels yet, then we don't
      // need to iterate.
      if (attempting_index_ < size()) return false;
      for (const auto& sd : subchannels_) {
        if (!sd->seen_transient_failure()) return false;
      }
      return true;
    }

    void ReportTransientFailure(absl::Status status);

   private:
    // Returns true if all subchannels have seen their initial
    // connectivity state notifications.
    bool AllSubchannelsSeenInitialState() const {
      return num_subchannels_seen_initial_notification_ == size();
    }

    // Looks through subchannels_ starting from attempting_index_ to
    // find the first one not currently in TRANSIENT_FAILURE, then
    // triggers a connection attempt for that subchannel.  If there are
    // no more subchannels not in TRANSIENT_FAILURE, calls
    // MaybeFinishHappyEyeballsPass().
    void StartConnectingNextSubchannel();

    // Checks to see if the initial Happy Eyeballs pass is complete --
    // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
    // If so, transitions to a mode where we try to connect to all subchannels
    // in parallel.
    void MaybeFinishHappyEyeballsPass();

    // Backpointer to owning policy.
    RefCountedPtr<PickFirst> policy_;

    // Channel args and resolution note from the update that created this
    // list (used, e.g., for health checking and status messages).
    ChannelArgs args_;
    std::string resolution_note_;

    // The list of subchannels.
    std::vector<std::unique_ptr<SubchannelData>> subchannels_;

    // Is this list shutting down?  This may be true due to the shutdown of the
    // policy itself or because a newer update has arrived while this one hadn't
    // finished processing.
    bool shutting_down_ = false;

    size_t num_subchannels_seen_initial_notification_ = 0;

    // The index into subchannels_ to which we are currently attempting
    // to connect during the initial Happy Eyeballs pass.  Once the
    // initial pass is over, this will be equal to size().
    size_t attempting_index_ = 0;
    // Happy Eyeballs timer handle.
    absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
        timer_handle_;

    // After the initial Happy Eyeballs pass, the number of failures
    // we've seen.  Every size() failures, we trigger re-resolution.
    size_t num_failures_ = 0;

    // The status from the last subchannel that reported TRANSIENT_FAILURE.
    absl::Status last_failure_;
  };

  // Watches the health of the selected subchannel and reports the
  // resulting state/picker to the channel.  Only the watcher currently
  // referenced by health_watcher_ is acted upon.
  class HealthWatcher final
      : public SubchannelInterface::ConnectivityStateWatcherInterface {
   public:
    HealthWatcher(RefCountedPtr<PickFirst> policy,
                  absl::string_view resolution_note)
        : policy_(std::move(policy)), resolution_note_(resolution_note) {}

    ~HealthWatcher() override {
      policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
    }

    void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                   absl::Status status) override;

    grpc_pollset_set* interested_parties() override {
      return policy_->interested_parties();
    }

   private:
    RefCountedPtr<PickFirst> policy_;
    std::string resolution_note_;
  };

  // Picker that sends every pick to a single subchannel.
  class Picker final : public SubchannelPicker {
   public:
    explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
        : subchannel_(std::move(subchannel)) {}

    PickResult Pick(PickArgs /*args*/) override {
      return PickResult::Complete(subchannel_);
    }

   private:
    RefCountedPtr<SubchannelInterface> subchannel_;
  };

  void ShutdownLocked() override;

  // Records `state` in state_ and forwards it to the channel control helper.
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<SubchannelPicker> picker);

  // Replaces subchannel_list_ with a new list built from latest_update_args_.
  void AttemptToConnectUsingLatestUpdateArgsLocked();

  // Drops selected_ and cancels its health watch, if any.
  void UnsetSelectedSubchannel();

  // Drops all subchannels, requests re-resolution and reports IDLE.
  void GoIdle();

  // When ExitIdleLocked() is called, we create a subchannel_list_ and start
  // trying to connect, but we don't actually change state_ until the first
  // subchannel reports CONNECTING.  So in order to know if we're really
  // idle, we need to check both state_ and subchannel_list_.
  bool IsIdle() const {
    return state_ == GRPC_CHANNEL_IDLE && subchannel_list_ == nullptr;
  }

  // Whether we should enable health watching.
  const bool enable_health_watch_;
  // Whether we should omit our status message prefix.
  const bool omit_status_message_prefix_;
  // Connection Attempt Delay for Happy Eyeballs.
  const Duration connection_attempt_delay_;

  // Latest update args.
  UpdateArgs latest_update_args_;
  // The list of subchannels that we're currently trying to connect to.
  // Will generally be null when selected_ is set, except when we get a
  // resolver update and need to check initial connectivity states for
  // the new list to decide whether we keep using the existing
  // connection or go IDLE.
  OrphanablePtr<SubchannelList> subchannel_list_;
  // Selected subchannel.  Will generally be null when subchannel_list_
  // is non-null, with the exception mentioned above.
  OrphanablePtr<SubchannelList::SubchannelData::SubchannelState> selected_;
  // Health watcher for the selected subchannel.  Non-owning: the watcher is
  // owned by the data watcher handed to the subchannel.
  SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
      nullptr;
  SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
  // Current connectivity state.
  grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
  // Are we shut down?
  bool shutdown_ = false;
  // Random bit generator used for shuffling addresses if configured
  absl::BitGen bit_gen_;
};
410
PickFirst(Args args)411 PickFirst::PickFirst(Args args)
412 : LoadBalancingPolicy(std::move(args)),
413 enable_health_watch_(
414 channel_args()
415 .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
416 .value_or(false)),
417 omit_status_message_prefix_(
418 channel_args()
419 .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
420 .value_or(false)),
421 connection_attempt_delay_(Duration::Milliseconds(
422 Clamp(channel_args()
423 .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
424 .value_or(250),
425 100, 2000))) {
426 GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " created.";
427 }
428
~PickFirst()429 PickFirst::~PickFirst() {
430 GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
431 CHECK_EQ(subchannel_list_.get(), nullptr);
432 }
433
ShutdownLocked()434 void PickFirst::ShutdownLocked() {
435 GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " Shutting down";
436 shutdown_ = true;
437 UnsetSelectedSubchannel();
438 subchannel_list_.reset();
439 }
440
ExitIdleLocked()441 void PickFirst::ExitIdleLocked() {
442 if (shutdown_) return;
443 if (IsIdle()) {
444 GRPC_TRACE_LOG(pick_first, INFO)
445 << "Pick First " << this << " exiting idle";
446 AttemptToConnectUsingLatestUpdateArgsLocked();
447 }
448 }
449
ResetBackoffLocked()450 void PickFirst::ResetBackoffLocked() {
451 if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
452 }
453
AttemptToConnectUsingLatestUpdateArgsLocked()454 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
455 // Create a subchannel list from latest_update_args_.
456 EndpointAddressesIterator* addresses = nullptr;
457 if (latest_update_args_.addresses.ok()) {
458 addresses = latest_update_args_.addresses->get();
459 }
460 // Replace subchannel_list_.
461 if (GRPC_TRACE_FLAG_ENABLED(pick_first) && subchannel_list_ != nullptr) {
462 LOG(INFO) << "[PF " << this << "] Shutting down previous subchannel list "
463 << subchannel_list_.get();
464 }
465 subchannel_list_ = MakeOrphanable<SubchannelList>(
466 RefAsSubclass<PickFirst>(DEBUG_LOCATION, "SubchannelList"), addresses,
467 latest_update_args_.args, latest_update_args_.resolution_note);
468 // Empty update or no valid subchannels. Put the channel in
469 // TRANSIENT_FAILURE and request re-resolution. Also unset the
470 // current selected subchannel.
471 if (subchannel_list_->size() == 0) {
472 channel_control_helper()->RequestReresolution();
473 absl::Status status = latest_update_args_.addresses.ok()
474 ? absl::UnavailableError("empty address list")
475 : latest_update_args_.addresses.status();
476 subchannel_list_->ReportTransientFailure(std::move(status));
477 UnsetSelectedSubchannel();
478 }
479 }
480
GetAddressFamily(const grpc_resolved_address & address)481 absl::string_view GetAddressFamily(const grpc_resolved_address& address) {
482 const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
483 return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
484 };
485
486 // An endpoint list iterator that returns only entries for a specific
487 // address family, as indicated by the URI scheme.
488 class AddressFamilyIterator final {
489 public:
AddressFamilyIterator(absl::string_view scheme,size_t index)490 AddressFamilyIterator(absl::string_view scheme, size_t index)
491 : scheme_(scheme), index_(index) {}
492
Next(EndpointAddressesList & endpoints,std::vector<bool> * endpoints_moved)493 EndpointAddresses* Next(EndpointAddressesList& endpoints,
494 std::vector<bool>* endpoints_moved) {
495 for (; index_ < endpoints.size(); ++index_) {
496 if (!(*endpoints_moved)[index_] &&
497 GetAddressFamily(endpoints[index_].address()) == scheme_) {
498 (*endpoints_moved)[index_] = true;
499 return &endpoints[index_++];
500 }
501 }
502 return nullptr;
503 }
504
505 private:
506 absl::string_view scheme_;
507 size_t index_;
508 };
509
UpdateLocked(UpdateArgs args)510 absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
511 if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
512 if (args.addresses.ok()) {
513 LOG(INFO) << "Pick First " << this << " received update";
514 } else {
515 LOG(INFO) << "Pick First " << this
516 << " received update with address error: "
517 << args.addresses.status();
518 }
519 }
520 // Set return status based on the address list.
521 absl::Status status;
522 if (!args.addresses.ok()) {
523 status = args.addresses.status();
524 } else {
525 EndpointAddressesList endpoints;
526 (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
527 endpoints.push_back(endpoint);
528 });
529 if (endpoints.empty()) {
530 status = absl::UnavailableError("address list must not be empty");
531 } else {
532 // Shuffle the list if needed.
533 auto config = static_cast<PickFirstConfig*>(args.config.get());
534 if (config->shuffle_addresses()) {
535 absl::c_shuffle(endpoints, bit_gen_);
536 }
537 // Flatten the list so that we have one address per endpoint.
538 // While we're iterating, also determine the desired address family
539 // order and the index of the first element of each family, for use in
540 // the interleaving below.
541 std::set<absl::string_view> address_families;
542 std::vector<AddressFamilyIterator> address_family_order;
543 EndpointAddressesList flattened_endpoints;
544 for (const auto& endpoint : endpoints) {
545 for (const auto& address : endpoint.addresses()) {
546 flattened_endpoints.emplace_back(address, endpoint.args());
547 absl::string_view scheme = GetAddressFamily(address);
548 bool inserted = address_families.insert(scheme).second;
549 if (inserted) {
550 address_family_order.emplace_back(scheme,
551 flattened_endpoints.size() - 1);
552 }
553 }
554 }
555 endpoints = std::move(flattened_endpoints);
556 // Interleave addresses as per RFC-8305 section 4.
557 EndpointAddressesList interleaved_endpoints;
558 interleaved_endpoints.reserve(endpoints.size());
559 std::vector<bool> endpoints_moved(endpoints.size());
560 size_t scheme_index = 0;
561 for (size_t i = 0; i < endpoints.size(); ++i) {
562 EndpointAddresses* endpoint;
563 do {
564 auto& iterator = address_family_order[scheme_index++ %
565 address_family_order.size()];
566 endpoint = iterator.Next(endpoints, &endpoints_moved);
567 } while (endpoint == nullptr);
568 interleaved_endpoints.emplace_back(std::move(*endpoint));
569 }
570 endpoints = std::move(interleaved_endpoints);
571 args.addresses =
572 std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
573 }
574 }
575 // If the update contains a resolver error and we have a previous update
576 // that was not a resolver error, keep using the previous addresses.
577 if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
578 args.addresses = std::move(latest_update_args_.addresses);
579 }
580 // Update latest_update_args_.
581 latest_update_args_ = std::move(args);
582 // If we are not in idle, start connection attempt immediately.
583 // Otherwise, we defer the attempt into ExitIdleLocked().
584 if (!IsIdle()) {
585 AttemptToConnectUsingLatestUpdateArgsLocked();
586 }
587 return status;
588 }
589
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)590 void PickFirst::UpdateState(grpc_connectivity_state state,
591 const absl::Status& status,
592 RefCountedPtr<SubchannelPicker> picker) {
593 state_ = state;
594 channel_control_helper()->UpdateState(state, status, std::move(picker));
595 }
596
UnsetSelectedSubchannel()597 void PickFirst::UnsetSelectedSubchannel() {
598 if (selected_ != nullptr && health_data_watcher_ != nullptr) {
599 selected_->subchannel()->CancelDataWatcher(health_data_watcher_);
600 }
601 selected_.reset();
602 health_watcher_ = nullptr;
603 health_data_watcher_ = nullptr;
604 }
605
GoIdle()606 void PickFirst::GoIdle() {
607 // Unset the selected subchannel.
608 UnsetSelectedSubchannel();
609 // Drop the current subchannel list, if any.
610 subchannel_list_.reset();
611 // Request a re-resolution.
612 // TODO(qianchengz): We may want to request re-resolution in
613 // ExitIdleLocked() instead.
614 channel_control_helper()->RequestReresolution();
615 // Enter idle.
616 UpdateState(GRPC_CHANNEL_IDLE, absl::OkStatus(),
617 MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
618 }
619
620 //
621 // PickFirst::HealthWatcher
622 //
623
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)624 void PickFirst::HealthWatcher::OnConnectivityStateChange(
625 grpc_connectivity_state new_state, absl::Status status) {
626 if (policy_->health_watcher_ != this) return;
627 GRPC_TRACE_LOG(pick_first, INFO)
628 << "[PF " << policy_.get()
629 << "] health watch state update: " << ConnectivityStateName(new_state)
630 << " (" << status << ")";
631 switch (new_state) {
632 case GRPC_CHANNEL_READY:
633 policy_->channel_control_helper()->UpdateState(
634 GRPC_CHANNEL_READY, absl::OkStatus(),
635 MakeRefCounted<Picker>(policy_->selected_->subchannel()->Ref()));
636 break;
637 case GRPC_CHANNEL_IDLE:
638 // If the subchannel becomes disconnected, the health watcher
639 // might happen to see the change before the raw connectivity
640 // state watcher does. In this case, ignore it, since the raw
641 // connectivity state watcher will handle it shortly.
642 break;
643 case GRPC_CHANNEL_CONNECTING:
644 policy_->channel_control_helper()->UpdateState(
645 new_state, absl::OkStatus(),
646 MakeRefCounted<QueuePicker>(policy_->Ref()));
647 break;
648 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
649 std::string message = absl::StrCat("health watch: ", status.message());
650 if (!resolution_note_.empty()) {
651 absl::StrAppend(&message, " (", resolution_note_, ")");
652 }
653 policy_->channel_control_helper()->UpdateState(
654 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
655 MakeRefCounted<TransientFailurePicker>(
656 absl::UnavailableError(message)));
657 break;
658 }
659 case GRPC_CHANNEL_SHUTDOWN:
660 Crash("health watcher reported state SHUTDOWN");
661 }
662 }
663
664 //
665 // PickFirst::SubchannelList::SubchannelData::SubchannelState
666 //
667
SubchannelState(SubchannelData * subchannel_data,RefCountedPtr<SubchannelInterface> subchannel)668 PickFirst::SubchannelList::SubchannelData::SubchannelState::SubchannelState(
669 SubchannelData* subchannel_data,
670 RefCountedPtr<SubchannelInterface> subchannel)
671 : subchannel_data_(subchannel_data),
672 pick_first_(subchannel_data_->subchannel_list_->policy_),
673 subchannel_(std::move(subchannel)) {
674 GRPC_TRACE_LOG(pick_first, INFO)
675 << "[PF " << pick_first_.get() << "] subchannel state " << this
676 << " (subchannel " << subchannel_.get() << "): starting watch";
677 auto watcher = std::make_unique<Watcher>(Ref(DEBUG_LOCATION, "Watcher"));
678 watcher_ = watcher.get();
679 subchannel_->WatchConnectivityState(std::move(watcher));
680 }
681
Orphan()682 void PickFirst::SubchannelList::SubchannelData::SubchannelState::Orphan() {
683 GRPC_TRACE_LOG(pick_first, INFO)
684 << "[PF " << pick_first_.get() << "] subchannel state " << this
685 << " (subchannel " << subchannel_.get()
686 << "): cancelling watch and unreffing subchannel";
687 subchannel_data_ = nullptr;
688 subchannel_->CancelConnectivityStateWatch(watcher_);
689 watcher_ = nullptr;
690 subchannel_.reset();
691 pick_first_.reset();
692 Unref();
693 }
694
Select()695 void PickFirst::SubchannelList::SubchannelData::SubchannelState::Select() {
696 GRPC_TRACE_LOG(pick_first, INFO)
697 << "Pick First " << pick_first_.get() << " selected subchannel "
698 << subchannel_.get();
699 CHECK_NE(subchannel_data_, nullptr);
700 pick_first_->UnsetSelectedSubchannel(); // Cancel health watch, if any.
701 pick_first_->selected_ = std::move(subchannel_data_->subchannel_state_);
702 // If health checking is enabled, start the health watch, but don't
703 // report a new picker -- we want to stay in CONNECTING while we wait
704 // for the health status notification.
705 // If health checking is NOT enabled, report READY.
706 if (pick_first_->enable_health_watch_) {
707 GRPC_TRACE_LOG(pick_first, INFO)
708 << "[PF " << pick_first_.get() << "] starting health watch";
709 auto watcher = std::make_unique<HealthWatcher>(
710 pick_first_.Ref(DEBUG_LOCATION, "HealthWatcher"),
711 subchannel_data_->subchannel_list_->resolution_note_);
712 pick_first_->health_watcher_ = watcher.get();
713 auto health_data_watcher = MakeHealthCheckWatcher(
714 pick_first_->work_serializer(),
715 subchannel_data_->subchannel_list_->args_, std::move(watcher));
716 pick_first_->health_data_watcher_ = health_data_watcher.get();
717 subchannel_->AddDataWatcher(std::move(health_data_watcher));
718 } else {
719 pick_first_->UpdateState(GRPC_CHANNEL_READY, absl::OkStatus(),
720 MakeRefCounted<Picker>(subchannel_));
721 }
722 // Report successful connection.
723 // We consider it a successful connection attempt only if the
724 // previous state was CONNECTING. In particular, we don't want to
725 // increment this counter if we got a new address list and found the
726 // existing connection already in state READY.
727 if (subchannel_data_->connectivity_state_ == GRPC_CHANNEL_CONNECTING) {
728 auto& stats_plugins =
729 pick_first_->channel_control_helper()->GetStatsPluginGroup();
730 stats_plugins.AddCounter(
731 kMetricConnectionAttemptsSucceeded, 1,
732 {pick_first_->channel_control_helper()->GetTarget()}, {});
733 }
734 // Drop our pointer to subchannel_data_, so that we know not to
735 // interact with it on subsequent connectivity state updates.
736 subchannel_data_ = nullptr;
737 // Clean up subchannel list.
738 pick_first_->subchannel_list_.reset();
739 }
740
741 void PickFirst::SubchannelList::SubchannelData::SubchannelState::
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)742 OnConnectivityStateChange(grpc_connectivity_state new_state,
743 absl::Status status) {
744 if (watcher_ == nullptr) return;
745 GRPC_TRACE_LOG(pick_first, INFO)
746 << "[PF " << pick_first_.get() << "] subchannel state " << this
747 << " (subchannel " << subchannel_.get()
748 << "): connectivity changed: new_state="
749 << ConnectivityStateName(new_state) << ", status=" << status
750 << ", watcher=" << watcher_ << ", subchannel_data_=" << subchannel_data_
751 << ", pick_first_->selected_=" << pick_first_->selected_.get();
752 // If we're still part of a subchannel list trying to connect, check
753 // if we're connected.
754 if (subchannel_data_ != nullptr) {
755 CHECK_EQ(pick_first_->subchannel_list_.get(),
756 subchannel_data_->subchannel_list_);
757 // If the subchannel is READY, use it.
758 // Otherwise, tell the subchannel list to keep trying.
759 if (new_state == GRPC_CHANNEL_READY) {
760 Select();
761 } else {
762 subchannel_data_->OnConnectivityStateChange(new_state, std::move(status));
763 }
764 return;
765 }
766 // We aren't trying to connect, so we must be the selected subchannel.
767 CHECK_EQ(pick_first_->selected_.get(), this);
768 GRPC_TRACE_LOG(pick_first, INFO)
769 << "Pick First " << pick_first_.get()
770 << " selected subchannel connectivity changed to "
771 << ConnectivityStateName(new_state);
772 // Any state change is considered to be a failure of the existing
773 // connection. Report the failure.
774 auto& stats_plugins =
775 pick_first_->channel_control_helper()->GetStatsPluginGroup();
776 stats_plugins.AddCounter(kMetricDisconnections, 1,
777 {pick_first_->channel_control_helper()->GetTarget()},
778 {});
779 // Report IDLE.
780 pick_first_->GoIdle();
781 }
782
783 //
784 // PickFirst::SubchannelList::SubchannelData
785 //
786
SubchannelData(SubchannelList * subchannel_list,size_t index,RefCountedPtr<SubchannelInterface> subchannel)787 PickFirst::SubchannelList::SubchannelData::SubchannelData(
788 SubchannelList* subchannel_list, size_t index,
789 RefCountedPtr<SubchannelInterface> subchannel)
790 : subchannel_list_(subchannel_list), index_(index) {
791 GRPC_TRACE_LOG(pick_first, INFO)
792 << "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
793 << subchannel_list_ << " index " << index_
794 << ": creating subchannel data";
795 subchannel_state_ =
796 MakeOrphanable<SubchannelState>(this, std::move(subchannel));
797 }
798
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)799 void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
800 grpc_connectivity_state new_state, absl::Status status) {
801 PickFirst* p = subchannel_list_->policy_.get();
802 GRPC_TRACE_LOG(pick_first, INFO)
803 << "[PF " << p << "] subchannel list " << subchannel_list_ << " index "
804 << index_ << " of " << subchannel_list_->size() << " (subchannel_state "
805 << subchannel_state_.get() << "): connectivity changed: old_state="
806 << (connectivity_state_.has_value()
807 ? ConnectivityStateName(*connectivity_state_)
808 : "N/A")
809 << ", new_state=" << ConnectivityStateName(new_state)
810 << ", status=" << status
811 << ", seen_transient_failure=" << seen_transient_failure_
812 << ", p->selected_=" << p->selected_.get()
813 << ", p->subchannel_list_=" << p->subchannel_list_.get()
814 << ", p->subchannel_list_->shutting_down_="
815 << p->subchannel_list_->shutting_down_;
816 if (subchannel_list_->shutting_down_) return;
817 // The notification must be for a subchannel in the current list.
818 CHECK_EQ(subchannel_list_, p->subchannel_list_.get());
819 // SHUTDOWN should never happen.
820 CHECK_NE(new_state, GRPC_CHANNEL_SHUTDOWN);
821 // READY should be caught by SubchannelState, in which case it will
822 // not call us in the first place.
823 CHECK_NE(new_state, GRPC_CHANNEL_READY);
824 // Update state.
825 absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
826 connectivity_state_ = new_state;
827 connectivity_status_ = std::move(status);
828 // Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
829 bool prev_seen_transient_failure = seen_transient_failure_;
830 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
831 seen_transient_failure_ = true;
832 subchannel_list_->last_failure_ = connectivity_status_;
833 }
834 // If this is the initial connectivity state update for this subchannel,
835 // increment the counter in the subchannel list.
836 if (!old_state.has_value()) {
837 ++subchannel_list_->num_subchannels_seen_initial_notification_;
838 }
839 // If we haven't yet seen the initial connectivity state notification
840 // for all subchannels, do nothing.
841 if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
842 // If we're still here and this is the initial connectivity state
843 // notification for this subchannel, that means it was the last one to
844 // see its initial notification. So we now have enough state to
845 // figure out how to proceed.
846 if (!old_state.has_value()) {
847 // If we already have a selected subchannel and we got here, that
848 // means that none of the subchannels on the new list are in READY
849 // state, which means that the address we're currently connected to
850 // is not in the new list. In that case, we drop the current
851 // connection and report IDLE.
852 if (p->selected_ != nullptr) {
853 GRPC_TRACE_LOG(pick_first, INFO)
854 << "[PF " << p << "] subchannel list " << subchannel_list_
855 << ": new update has no subchannels in state READY; dropping "
856 "existing connection and going IDLE";
857 p->GoIdle();
858 } else {
859 // Start trying to connect, starting with the first subchannel.
860 subchannel_list_->StartConnectingNextSubchannel();
861 }
862 return;
863 }
864 // We've already started trying to connect. Any subchannel that
865 // reports TF is a connection attempt failure.
866 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
867 auto& stats_plugins = subchannel_list_->policy_->channel_control_helper()
868 ->GetStatsPluginGroup();
869 stats_plugins.AddCounter(
870 kMetricConnectionAttemptsFailed, 1,
871 {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
872 }
873 // Otherwise, process connectivity state change.
874 switch (*connectivity_state_) {
875 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
876 // If this is the first failure we've seen on this subchannel,
877 // then we're still in the Happy Eyeballs pass.
878 if (!prev_seen_transient_failure && seen_transient_failure_) {
879 // If a connection attempt fails before the timer fires, then
880 // cancel the timer and start connecting on the next subchannel.
881 if (index_ == subchannel_list_->attempting_index_) {
882 if (subchannel_list_->timer_handle_.has_value()) {
883 p->channel_control_helper()->GetEventEngine()->Cancel(
884 *subchannel_list_->timer_handle_);
885 }
886 ++subchannel_list_->attempting_index_;
887 subchannel_list_->StartConnectingNextSubchannel();
888 } else {
889 // If this was the last subchannel to fail, check if the Happy
890 // Eyeballs pass is complete.
891 subchannel_list_->MaybeFinishHappyEyeballsPass();
892 }
893 } else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
894 // We're done with the initial Happy Eyeballs pass and in a mode
895 // where we're attempting to connect to every subchannel in
896 // parallel. We count the number of failed connection attempts,
897 // and when that is equal to the number of subchannels, request
898 // re-resolution and report TRANSIENT_FAILURE again, so that the
899 // caller has the most recent status message. Note that this
900 // isn't necessarily the same as saying that we've seen one
901 // failure for each subchannel in the list, because the backoff
902 // state may be different in each subchannel, so we may have seen
903 // one subchannel fail more than once and another subchannel not
904 // fail at all. But it's a good enough heuristic.
905 ++subchannel_list_->num_failures_;
906 if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
907 p->channel_control_helper()->RequestReresolution();
908 status = absl::UnavailableError(absl::StrCat(
909 (p->omit_status_message_prefix_
910 ? ""
911 : "failed to connect to all addresses; last error: "),
912 connectivity_status_.ToString()));
913 subchannel_list_->ReportTransientFailure(std::move(status));
914 }
915 }
916 break;
917 }
918 case GRPC_CHANNEL_IDLE:
919 // If we've finished the first Happy Eyeballs pass, then we go
920 // into a mode where we immediately try to connect to every
921 // subchannel in parallel.
922 if (subchannel_list_->IsHappyEyeballsPassComplete()) {
923 subchannel_state_->RequestConnection();
924 }
925 break;
926 case GRPC_CHANNEL_CONNECTING:
927 // Only update connectivity state only if we're not already in
928 // TRANSIENT_FAILURE.
929 // TODO(roth): Squelch duplicate CONNECTING updates.
930 if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
931 p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
932 MakeRefCounted<QueuePicker>(nullptr));
933 }
934 break;
935 default:
936 // We handled READY above, and we should never see SHUTDOWN.
937 GPR_UNREACHABLE_CODE(break);
938 }
939 }
940
RequestConnectionWithTimer()941 void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
942 CHECK(connectivity_state_.has_value());
943 if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
944 subchannel_state_->RequestConnection();
945 } else {
946 CHECK_EQ(connectivity_state_.value(), GRPC_CHANNEL_CONNECTING);
947 }
948 // If this is not the last subchannel in the list, start the timer.
949 if (index_ != subchannel_list_->size() - 1) {
950 PickFirst* p = subchannel_list_->policy_.get();
951 GRPC_TRACE_LOG(pick_first, INFO)
952 << "Pick First " << p << " subchannel list " << subchannel_list_
953 << ": starting Connection Attempt Delay timer for "
954 << p->connection_attempt_delay_.millis() << "ms for index " << index_;
955 subchannel_list_->timer_handle_ =
956 p->channel_control_helper()->GetEventEngine()->RunAfter(
957 p->connection_attempt_delay_,
958 [subchannel_list =
959 subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
960 ApplicationCallbackExecCtx application_exec_ctx;
961 ExecCtx exec_ctx;
962 auto* sl = subchannel_list.get();
963 sl->policy_->work_serializer()->Run(
964 [subchannel_list = std::move(subchannel_list)]() {
965 GRPC_TRACE_LOG(pick_first, INFO)
966 << "Pick First " << subchannel_list->policy_.get()
967 << " subchannel list " << subchannel_list.get()
968 << ": Connection Attempt Delay timer fired "
969 "(shutting_down="
970 << subchannel_list->shutting_down_ << ", selected="
971 << subchannel_list->policy_->selected_.get() << ")";
972 if (subchannel_list->shutting_down_) return;
973 if (subchannel_list->policy_->selected_ != nullptr) return;
974 ++subchannel_list->attempting_index_;
975 subchannel_list->StartConnectingNextSubchannel();
976 },
977 DEBUG_LOCATION);
978 });
979 }
980 }
981
982 //
983 // PickFirst::SubchannelList
984 //
985
SubchannelList(RefCountedPtr<PickFirst> policy,EndpointAddressesIterator * addresses,const ChannelArgs & args,absl::string_view resolution_note)986 PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
987 EndpointAddressesIterator* addresses,
988 const ChannelArgs& args,
989 absl::string_view resolution_note)
990 : InternallyRefCounted<SubchannelList>(
991 GRPC_TRACE_FLAG_ENABLED(pick_first) ? "SubchannelList" : nullptr),
992 policy_(std::move(policy)),
993 args_(
994 args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
995 .Remove(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)),
996 resolution_note_(resolution_note) {
997 GRPC_TRACE_LOG(pick_first, INFO)
998 << "[PF " << policy_.get() << "] Creating subchannel list " << this
999 << " - channel args: " << args_.ToString();
1000 if (addresses == nullptr) return;
1001 // Create a subchannel for each address.
1002 addresses->ForEach([&](const EndpointAddresses& address) {
1003 CHECK_EQ(address.addresses().size(), 1u);
1004 RefCountedPtr<SubchannelInterface> subchannel =
1005 policy_->channel_control_helper()->CreateSubchannel(
1006 address.address(), address.args(), args_);
1007 if (subchannel == nullptr) {
1008 // Subchannel could not be created.
1009 GRPC_TRACE_LOG(pick_first, INFO)
1010 << "[PF " << policy_.get()
1011 << "] could not create subchannel for address " << address.ToString()
1012 << ", ignoring";
1013 return;
1014 }
1015 GRPC_TRACE_LOG(pick_first, INFO)
1016 << "[PF " << policy_.get() << "] subchannel list " << this << " index "
1017 << subchannels_.size() << ": Created subchannel " << subchannel.get()
1018 << " for address " << address.ToString();
1019 subchannels_.emplace_back(std::make_unique<SubchannelData>(
1020 this, subchannels_.size(), std::move(subchannel)));
1021 });
1022 }
1023
~SubchannelList()1024 PickFirst::SubchannelList::~SubchannelList() {
1025 GRPC_TRACE_LOG(pick_first, INFO)
1026 << "[PF " << policy_.get() << "] Destroying subchannel_list " << this;
1027 }
1028
Orphan()1029 void PickFirst::SubchannelList::Orphan() {
1030 GRPC_TRACE_LOG(pick_first, INFO)
1031 << "[PF " << policy_.get() << "] Shutting down subchannel_list " << this;
1032 CHECK(!shutting_down_);
1033 shutting_down_ = true;
1034 // Shut down subchannels.
1035 subchannels_.clear();
1036 // Cancel Happy Eyeballs timer, if any.
1037 if (timer_handle_.has_value()) {
1038 policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
1039 }
1040 Unref();
1041 }
1042
ResetBackoffLocked()1043 void PickFirst::SubchannelList::ResetBackoffLocked() {
1044 for (auto& sd : subchannels_) {
1045 sd->ResetBackoffLocked();
1046 }
1047 }
1048
ReportTransientFailure(absl::Status status)1049 void PickFirst::SubchannelList::ReportTransientFailure(absl::Status status) {
1050 if (!resolution_note_.empty()) {
1051 status = absl::Status(status.code(), absl::StrCat(status.message(), " (",
1052 resolution_note_, ")"));
1053 }
1054 policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1055 MakeRefCounted<TransientFailurePicker>(status));
1056 }
1057
StartConnectingNextSubchannel()1058 void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
1059 // Find the next subchannel not in state TRANSIENT_FAILURE.
1060 // We skip subchannels in state TRANSIENT_FAILURE to avoid a
1061 // large recursion that could overflow the stack.
1062 for (; attempting_index_ < size(); ++attempting_index_) {
1063 SubchannelData* sc = subchannels_[attempting_index_].get();
1064 CHECK(sc->connectivity_state().has_value());
1065 if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1066 // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
1067 // connection attempt.
1068 sc->RequestConnectionWithTimer();
1069 return;
1070 }
1071 }
1072 // If we didn't find a subchannel to request a connection on, check to
1073 // see if the Happy Eyeballs pass is complete.
1074 MaybeFinishHappyEyeballsPass();
1075 }
1076
MaybeFinishHappyEyeballsPass()1077 void PickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
1078 // Make sure all subchannels have finished a connection attempt before
1079 // we consider the Happy Eyeballs pass complete.
1080 if (!IsHappyEyeballsPassComplete()) return;
1081 // We didn't find another subchannel not in state TRANSIENT_FAILURE,
1082 // so report TRANSIENT_FAILURE and switch to a mode in which we try to
1083 // connect to all addresses in parallel.
1084 GRPC_TRACE_LOG(pick_first, INFO)
1085 << "Pick First " << policy_.get() << " subchannel list " << this
1086 << " failed to connect to all subchannels";
1087 // Re-resolve and report TRANSIENT_FAILURE.
1088 policy_->channel_control_helper()->RequestReresolution();
1089 absl::Status status = absl::UnavailableError(
1090 absl::StrCat((policy_->omit_status_message_prefix_
1091 ? ""
1092 : "failed to connect to all addresses; last error: "),
1093 last_failure_.ToString()));
1094 ReportTransientFailure(std::move(status));
1095 // Drop the existing (working) connection, if any. This may be
1096 // sub-optimal, but we can't ignore what the control plane told us.
1097 policy_->UnsetSelectedSubchannel();
1098 // We now transition into a mode where we try to connect to all
1099 // subchannels in parallel. For any subchannel currently in IDLE,
1100 // trigger a connection attempt. For any subchannel not currently in
1101 // IDLE, we will trigger a connection attempt when it does report IDLE.
1102 for (auto& sd : subchannels_) {
1103 if (sd->connectivity_state() == GRPC_CHANNEL_IDLE) {
1104 sd->RequestConnection();
1105 }
1106 }
1107 }
1108
// TODO(roth): Remove this when the pick_first_new experiment is removed.
//
// Legacy pick_first LB policy. It keeps a current subchannel_list_ plus a
// latest_pending_subchannel_list_ for updates that arrive while a
// subchannel is selected. It performs a Happy Eyeballs pass over the
// addresses (one timed attempt at a time) and can optionally watch the
// selected subchannel's health.
class OldPickFirst final : public LoadBalancingPolicy {
 public:
  explicit OldPickFirst(Args args);

  absl::string_view name() const override { return kPickFirst; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  ~OldPickFirst() override;

  // One list of subchannels built from a single resolver update.
  class SubchannelList final : public InternallyRefCounted<SubchannelList> {
   public:
    // Per-subchannel state tracked by a SubchannelList.
    class SubchannelData final {
     public:
      SubchannelData(SubchannelList* subchannel_list, size_t index,
                     RefCountedPtr<SubchannelInterface> subchannel);

      SubchannelInterface* subchannel() const { return subchannel_.get(); }
      // Empty until the first connectivity notification has been received.
      absl::optional<grpc_connectivity_state> connectivity_state() const {
        return connectivity_state_;
      }
      const absl::Status& connectivity_status() const {
        return connectivity_status_;
      }

      // Resets the connection backoff.
      void ResetBackoffLocked() {
        if (subchannel_ != nullptr) subchannel_->ResetBackoff();
      }

      void RequestConnection() { subchannel_->RequestConnection(); }

      // Requests a connection attempt to start on this subchannel,
      // with appropriate Connection Attempt Delay.
      // Used only during the Happy Eyeballs pass.
      void RequestConnectionWithTimer();

      // Cancels any pending connectivity watch and unrefs the subchannel.
      void ShutdownLocked();

      bool seen_transient_failure() const { return seen_transient_failure_; }

     private:
      // Watcher for subchannel connectivity state. It holds a ref to the
      // owning list and forwards notifications to the SubchannelData at
      // index_ in that list.
      class Watcher final
          : public SubchannelInterface::ConnectivityStateWatcherInterface {
       public:
        Watcher(RefCountedPtr<SubchannelList> subchannel_list, size_t index)
            : subchannel_list_(std::move(subchannel_list)), index_(index) {}

        ~Watcher() override {
          subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
        }

        void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                       absl::Status status) override {
          subchannel_list_->subchannels_[index_].OnConnectivityStateChange(
              new_state, std::move(status));
        }

        grpc_pollset_set* interested_parties() override {
          return subchannel_list_->policy_->interested_parties();
        }

       private:
        RefCountedPtr<SubchannelList> subchannel_list_;
        const size_t index_;
      };

      // This method will be invoked once soon after instantiation to report
      // the current connectivity state, and it will then be invoked again
      // whenever the connectivity state changes.
      void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                     absl::Status status);

      // Processes the connectivity change to READY for an unselected
      // subchannel.
      void ProcessUnselectedReadyLocked();

      // Backpointer to owning subchannel list.  Not owned.
      SubchannelList* subchannel_list_;
      // Position of this entry within subchannel_list_->subchannels_.
      const size_t index_;
      // The subchannel.
      RefCountedPtr<SubchannelInterface> subchannel_;
      // Will be non-null when the subchannel's state is being watched.
      SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
          nullptr;
      // Data updated by the watcher.
      absl::optional<grpc_connectivity_state> connectivity_state_;
      absl::Status connectivity_status_;
      bool seen_transient_failure_ = false;
    };

    SubchannelList(RefCountedPtr<OldPickFirst> policy,
                   EndpointAddressesIterator* addresses,
                   const ChannelArgs& args, absl::string_view resolution_note);

    ~SubchannelList() override;

    // The number of subchannels in the list.
    size_t size() const { return subchannels_.size(); }

    // Resets connection backoff of all subchannels.
    void ResetBackoffLocked();

    void Orphan() override;

    // Returns true once every subchannel has been attempted in the Happy
    // Eyeballs pass and has seen TRANSIENT_FAILURE at least once.
    bool IsHappyEyeballsPassComplete() const {
      // Checking attempting_index_ here is just an optimization -- if
      // we haven't actually tried all subchannels yet, then we don't
      // need to iterate.
      if (attempting_index_ < size()) return false;
      for (const SubchannelData& sd : subchannels_) {
        if (!sd.seen_transient_failure()) return false;
      }
      return true;
    }

    void ReportTransientFailure(absl::Status status);

   private:
    // Returns true if all subchannels have seen their initial
    // connectivity state notifications.
    bool AllSubchannelsSeenInitialState() const {
      return num_subchannels_seen_initial_notification_ == size();
    }

    // Looks through subchannels_ starting from attempting_index_ to
    // find the first one not currently in TRANSIENT_FAILURE, then
    // triggers a connection attempt for that subchannel.  If there are
    // no more subchannels not in TRANSIENT_FAILURE, calls
    // MaybeFinishHappyEyeballsPass().
    void StartConnectingNextSubchannel();

    // Checks to see if the initial Happy Eyeballs pass is complete --
    // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
    // If so, transitions to a mode where we try to connect to all subchannels
    // in parallel.
    void MaybeFinishHappyEyeballsPass();

    // Backpointer to owning policy (holds a ref).
    RefCountedPtr<OldPickFirst> policy_;

    ChannelArgs args_;
    std::string resolution_note_;

    // The list of subchannels.
    std::vector<SubchannelData> subchannels_;

    // Is this list shutting down? This may be true due to the shutdown of the
    // policy itself or because a newer update has arrived while this one hadn't
    // finished processing.
    bool shutting_down_ = false;

    // Number of subchannels that have delivered their initial connectivity
    // state notification.
    size_t num_subchannels_seen_initial_notification_ = 0;

    // The index into subchannels_ to which we are currently attempting
    // to connect during the initial Happy Eyeballs pass.  Once the
    // initial pass is over, this will be equal to size().
    size_t attempting_index_ = 0;
    // Happy Eyeballs timer handle.
    absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
        timer_handle_;

    // After the initial Happy Eyeballs pass, the number of failures
    // we've seen.  Every size() failures, we trigger re-resolution.
    size_t num_failures_ = 0;

    // The status from the last subchannel that reported TRANSIENT_FAILURE.
    absl::Status last_failure_;
  };

  // Watches the health of the selected subchannel. Notifications are ignored
  // unless this watcher is still the policy's current health_watcher_.
  class HealthWatcher final
      : public SubchannelInterface::ConnectivityStateWatcherInterface {
   public:
    HealthWatcher(RefCountedPtr<OldPickFirst> policy,
                  absl::string_view resolution_note)
        : policy_(std::move(policy)), resolution_note_(resolution_note) {}

    ~HealthWatcher() override {
      policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
    }

    void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                   absl::Status status) override;

    grpc_pollset_set* interested_parties() override {
      return policy_->interested_parties();
    }

   private:
    RefCountedPtr<OldPickFirst> policy_;
    std::string resolution_note_;
  };

  // Picker that always returns the single selected subchannel.
  class Picker final : public SubchannelPicker {
   public:
    explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
        : subchannel_(std::move(subchannel)) {}

    PickResult Pick(PickArgs /*args*/) override {
      return PickResult::Complete(subchannel_);
    }

   private:
    RefCountedPtr<SubchannelInterface> subchannel_;
  };

  void ShutdownLocked() override;

  // Records `state` in state_ and forwards the update to the channel.
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<SubchannelPicker> picker);

  void AttemptToConnectUsingLatestUpdateArgsLocked();

  void UnsetSelectedSubchannel();

  // When ExitIdleLocked() is called, we create a subchannel_list_ and start
  // trying to connect, but we don't actually change state_ until the first
  // subchannel reports CONNECTING.  So in order to know if we're really
  // idle, we need to check both state_ and subchannel_list_.
  bool IsIdle() const {
    return state_ == GRPC_CHANNEL_IDLE && subchannel_list_ == nullptr;
  }

  // Whether we should enable health watching.
  const bool enable_health_watch_;
  // Whether we should omit our status message prefix.
  const bool omit_status_message_prefix_;
  // Connection Attempt Delay for Happy Eyeballs.
  const Duration connection_attempt_delay_;

  // Latest update args.
  UpdateArgs latest_update_args_;
  // All our subchannels.
  OrphanablePtr<SubchannelList> subchannel_list_;
  // Latest pending subchannel list.
  OrphanablePtr<SubchannelList> latest_pending_subchannel_list_;
  // Selected subchannel in subchannel_list_.
  SubchannelList::SubchannelData* selected_ = nullptr;
  // Health watcher for the selected subchannel.
  SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
      nullptr;
  SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
  // Current connectivity state.
  grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
  // Are we shut down?
  bool shutdown_ = false;
  // Random bit generator used for shuffling addresses if configured
  absl::BitGen bit_gen_;
};
1364
OldPickFirst(Args args)1365 OldPickFirst::OldPickFirst(Args args)
1366 : LoadBalancingPolicy(std::move(args)),
1367 enable_health_watch_(
1368 channel_args()
1369 .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
1370 .value_or(false)),
1371 omit_status_message_prefix_(
1372 channel_args()
1373 .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
1374 .value_or(false)),
1375 connection_attempt_delay_(Duration::Milliseconds(
1376 Clamp(channel_args()
1377 .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
1378 .value_or(250),
1379 100, 2000))) {
1380 GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " created.";
1381 }
1382
~OldPickFirst()1383 OldPickFirst::~OldPickFirst() {
1384 GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
1385 CHECK(subchannel_list_ == nullptr);
1386 CHECK(latest_pending_subchannel_list_ == nullptr);
1387 }
1388
ShutdownLocked()1389 void OldPickFirst::ShutdownLocked() {
1390 GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " Shutting down";
1391 shutdown_ = true;
1392 UnsetSelectedSubchannel();
1393 subchannel_list_.reset();
1394 latest_pending_subchannel_list_.reset();
1395 }
1396
ExitIdleLocked()1397 void OldPickFirst::ExitIdleLocked() {
1398 if (shutdown_) return;
1399 if (IsIdle()) {
1400 GRPC_TRACE_LOG(pick_first, INFO)
1401 << "Pick First " << this << " exiting idle";
1402 AttemptToConnectUsingLatestUpdateArgsLocked();
1403 }
1404 }
1405
ResetBackoffLocked()1406 void OldPickFirst::ResetBackoffLocked() {
1407 if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
1408 if (latest_pending_subchannel_list_ != nullptr) {
1409 latest_pending_subchannel_list_->ResetBackoffLocked();
1410 }
1411 }
1412
AttemptToConnectUsingLatestUpdateArgsLocked()1413 void OldPickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
1414 // Create a subchannel list from latest_update_args_.
1415 EndpointAddressesIterator* addresses = nullptr;
1416 if (latest_update_args_.addresses.ok()) {
1417 addresses = latest_update_args_.addresses->get();
1418 }
1419 // Replace latest_pending_subchannel_list_.
1420 if (GRPC_TRACE_FLAG_ENABLED(pick_first) &&
1421 latest_pending_subchannel_list_ != nullptr) {
1422 LOG(INFO) << "[PF " << this
1423 << "] Shutting down previous pending subchannel list "
1424 << latest_pending_subchannel_list_.get();
1425 }
1426 latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
1427 RefAsSubclass<OldPickFirst>(), addresses, latest_update_args_.args,
1428 latest_update_args_.resolution_note);
1429 // Empty update or no valid subchannels. Put the channel in
1430 // TRANSIENT_FAILURE and request re-resolution.
1431 if (latest_pending_subchannel_list_->size() == 0) {
1432 channel_control_helper()->RequestReresolution();
1433 absl::Status status = latest_update_args_.addresses.ok()
1434 ? absl::UnavailableError("empty address list")
1435 : latest_update_args_.addresses.status();
1436 latest_pending_subchannel_list_->ReportTransientFailure(std::move(status));
1437 }
1438 // If the new update is empty or we don't yet have a selected subchannel in
1439 // the current list, replace the current subchannel list immediately.
1440 if (latest_pending_subchannel_list_->size() == 0 || selected_ == nullptr) {
1441 UnsetSelectedSubchannel();
1442 if (GRPC_TRACE_FLAG_ENABLED(pick_first) && subchannel_list_ != nullptr) {
1443 LOG(INFO) << "[PF " << this << "] Shutting down previous subchannel list "
1444 << subchannel_list_.get();
1445 }
1446 subchannel_list_ = std::move(latest_pending_subchannel_list_);
1447 }
1448 }
1449
UpdateLocked(UpdateArgs args)1450 absl::Status OldPickFirst::UpdateLocked(UpdateArgs args) {
1451 if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
1452 if (args.addresses.ok()) {
1453 LOG(INFO) << "Pick First " << this << " received update";
1454 } else {
1455 LOG(INFO) << "Pick First " << this
1456 << " received update with address error: "
1457 << args.addresses.status();
1458 }
1459 }
1460 // Set return status based on the address list.
1461 absl::Status status;
1462 if (!args.addresses.ok()) {
1463 status = args.addresses.status();
1464 } else {
1465 EndpointAddressesList endpoints;
1466 (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
1467 endpoints.push_back(endpoint);
1468 });
1469 if (endpoints.empty()) {
1470 status = absl::UnavailableError("address list must not be empty");
1471 } else {
1472 // Shuffle the list if needed.
1473 auto config = static_cast<PickFirstConfig*>(args.config.get());
1474 if (config->shuffle_addresses()) {
1475 absl::c_shuffle(endpoints, bit_gen_);
1476 }
1477 // Flatten the list so that we have one address per endpoint.
1478 // While we're iterating, also determine the desired address family
1479 // order and the index of the first element of each family, for use in
1480 // the interleaving below.
1481 std::set<absl::string_view> address_families;
1482 std::vector<AddressFamilyIterator> address_family_order;
1483 EndpointAddressesList flattened_endpoints;
1484 for (const auto& endpoint : endpoints) {
1485 for (const auto& address : endpoint.addresses()) {
1486 flattened_endpoints.emplace_back(address, endpoint.args());
1487 absl::string_view scheme = GetAddressFamily(address);
1488 bool inserted = address_families.insert(scheme).second;
1489 if (inserted) {
1490 address_family_order.emplace_back(scheme,
1491 flattened_endpoints.size() - 1);
1492 }
1493 }
1494 }
1495 endpoints = std::move(flattened_endpoints);
1496 // Interleave addresses as per RFC-8305 section 4.
1497 EndpointAddressesList interleaved_endpoints;
1498 interleaved_endpoints.reserve(endpoints.size());
1499 std::vector<bool> endpoints_moved(endpoints.size());
1500 size_t scheme_index = 0;
1501 for (size_t i = 0; i < endpoints.size(); ++i) {
1502 EndpointAddresses* endpoint;
1503 do {
1504 auto& iterator = address_family_order[scheme_index++ %
1505 address_family_order.size()];
1506 endpoint = iterator.Next(endpoints, &endpoints_moved);
1507 } while (endpoint == nullptr);
1508 interleaved_endpoints.emplace_back(std::move(*endpoint));
1509 }
1510 endpoints = std::move(interleaved_endpoints);
1511 args.addresses =
1512 std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
1513 }
1514 }
1515 // If the update contains a resolver error and we have a previous update
1516 // that was not a resolver error, keep using the previous addresses.
1517 if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
1518 args.addresses = std::move(latest_update_args_.addresses);
1519 }
1520 // Update latest_update_args_.
1521 latest_update_args_ = std::move(args);
1522 // If we are not in idle, start connection attempt immediately.
1523 // Otherwise, we defer the attempt into ExitIdleLocked().
1524 if (!IsIdle()) {
1525 AttemptToConnectUsingLatestUpdateArgsLocked();
1526 }
1527 return status;
1528 }
1529
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)1530 void OldPickFirst::UpdateState(grpc_connectivity_state state,
1531 const absl::Status& status,
1532 RefCountedPtr<SubchannelPicker> picker) {
1533 state_ = state;
1534 channel_control_helper()->UpdateState(state, status, std::move(picker));
1535 }
1536
UnsetSelectedSubchannel()1537 void OldPickFirst::UnsetSelectedSubchannel() {
1538 if (selected_ != nullptr && health_data_watcher_ != nullptr) {
1539 selected_->subchannel()->CancelDataWatcher(health_data_watcher_);
1540 }
1541 selected_ = nullptr;
1542 health_watcher_ = nullptr;
1543 health_data_watcher_ = nullptr;
1544 }
1545
1546 //
1547 // OldPickFirst::HealthWatcher
1548 //
1549
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)1550 void OldPickFirst::HealthWatcher::OnConnectivityStateChange(
1551 grpc_connectivity_state new_state, absl::Status status) {
1552 if (policy_->health_watcher_ != this) return;
1553 GRPC_TRACE_LOG(pick_first, INFO)
1554 << "[PF " << policy_.get()
1555 << "] health watch state update: " << ConnectivityStateName(new_state)
1556 << " (" << status << ")";
1557 switch (new_state) {
1558 case GRPC_CHANNEL_READY:
1559 policy_->channel_control_helper()->UpdateState(
1560 GRPC_CHANNEL_READY, absl::OkStatus(),
1561 MakeRefCounted<Picker>(policy_->selected_->subchannel()->Ref()));
1562 break;
1563 case GRPC_CHANNEL_IDLE:
1564 // If the subchannel becomes disconnected, the health watcher
1565 // might happen to see the change before the raw connectivity
1566 // state watcher does. In this case, ignore it, since the raw
1567 // connectivity state watcher will handle it shortly.
1568 break;
1569 case GRPC_CHANNEL_CONNECTING:
1570 policy_->channel_control_helper()->UpdateState(
1571 new_state, absl::OkStatus(),
1572 MakeRefCounted<QueuePicker>(policy_->Ref()));
1573 break;
1574 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
1575 std::string message = absl::StrCat("health watch: ", status.message());
1576 if (!resolution_note_.empty()) {
1577 absl::StrAppend(&message, " (", resolution_note_, ")");
1578 }
1579 policy_->channel_control_helper()->UpdateState(
1580 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1581 MakeRefCounted<TransientFailurePicker>(
1582 absl::UnavailableError(message)));
1583 break;
1584 }
1585 case GRPC_CHANNEL_SHUTDOWN:
1586 Crash("health watcher reported state SHUTDOWN");
1587 }
1588 }
1589
1590 //
1591 // OldPickFirst::SubchannelList::SubchannelData
1592 //
1593
SubchannelData(SubchannelList * subchannel_list,size_t index,RefCountedPtr<SubchannelInterface> subchannel)1594 OldPickFirst::SubchannelList::SubchannelData::SubchannelData(
1595 SubchannelList* subchannel_list, size_t index,
1596 RefCountedPtr<SubchannelInterface> subchannel)
1597 : subchannel_list_(subchannel_list),
1598 index_(index),
1599 subchannel_(std::move(subchannel)) {
1600 GRPC_TRACE_LOG(pick_first, INFO)
1601 << "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
1602 << subchannel_list_ << " index " << index_ << " (subchannel "
1603 << subchannel_.get() << "): starting watch";
1604 auto watcher = std::make_unique<Watcher>(
1605 subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
1606 pending_watcher_ = watcher.get();
1607 subchannel_->WatchConnectivityState(std::move(watcher));
1608 }
1609
ShutdownLocked()1610 void OldPickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
1611 if (subchannel_ != nullptr) {
1612 GRPC_TRACE_LOG(pick_first, INFO)
1613 << "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
1614 << subchannel_list_ << " index " << index_ << " of "
1615 << subchannel_list_->size() << " (subchannel " << subchannel_.get()
1616 << "): cancelling watch and unreffing subchannel";
1617 subchannel_->CancelConnectivityStateWatch(pending_watcher_);
1618 pending_watcher_ = nullptr;
1619 subchannel_.reset();
1620 }
1621 }
1622
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)1623 void OldPickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
1624 grpc_connectivity_state new_state, absl::Status status) {
1625 OldPickFirst* p = subchannel_list_->policy_.get();
1626 GRPC_TRACE_LOG(pick_first, INFO)
1627 << "[PF " << p << "] subchannel list " << subchannel_list_ << " index "
1628 << index_ << " of " << subchannel_list_->size() << " (subchannel "
1629 << subchannel_.get() << "): connectivity changed: old_state="
1630 << (connectivity_state_.has_value()
1631 ? ConnectivityStateName(*connectivity_state_)
1632 : "N/A")
1633 << ", new_state=" << ConnectivityStateName(new_state)
1634 << ", status=" << status
1635 << ", shutting_down=" << subchannel_list_->shutting_down_
1636 << ", pending_watcher=" << pending_watcher_
1637 << ", seen_transient_failure=" << seen_transient_failure_
1638 << ", p->selected_=" << p->selected_
1639 << ", p->subchannel_list_=" << p->subchannel_list_.get()
1640 << ", p->latest_pending_subchannel_list_="
1641 << p->latest_pending_subchannel_list_.get();
1642 if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
1643 auto& stats_plugins = subchannel_list_->policy_->channel_control_helper()
1644 ->GetStatsPluginGroup();
1645 // The notification must be for a subchannel in either the current or
1646 // latest pending subchannel lists.
1647 CHECK(subchannel_list_ == p->subchannel_list_.get() ||
1648 subchannel_list_ == p->latest_pending_subchannel_list_.get());
1649 CHECK(new_state != GRPC_CHANNEL_SHUTDOWN);
1650 absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
1651 connectivity_state_ = new_state;
1652 connectivity_status_ = std::move(status);
1653 // Handle updates for the currently selected subchannel.
1654 if (p->selected_ == this) {
1655 CHECK(subchannel_list_ == p->subchannel_list_.get());
1656 GRPC_TRACE_LOG(pick_first, INFO)
1657 << "Pick First " << p << " selected subchannel connectivity changed to "
1658 << ConnectivityStateName(new_state);
1659 // Any state change is considered to be a failure of the existing
1660 // connection.
1661 stats_plugins.AddCounter(
1662 kMetricDisconnections, 1,
1663 {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
1664 // TODO(roth): We could check the connectivity states of all the
1665 // subchannels here, just in case one of them happens to be READY,
1666 // and we could switch to that rather than going IDLE.
1667 // Request a re-resolution.
1668 // TODO(qianchengz): We may want to request re-resolution in
1669 // ExitIdleLocked().
1670 p->channel_control_helper()->RequestReresolution();
1671 // If there is a pending update, switch to the pending update.
1672 if (p->latest_pending_subchannel_list_ != nullptr) {
1673 GRPC_TRACE_LOG(pick_first, INFO)
1674 << "Pick First " << p << " promoting pending subchannel list "
1675 << p->latest_pending_subchannel_list_.get() << " to replace "
1676 << p->subchannel_list_.get();
1677 p->UnsetSelectedSubchannel();
1678 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
1679 // Set our state to that of the pending subchannel list.
1680 if (p->subchannel_list_->IsHappyEyeballsPassComplete()) {
1681 status = absl::UnavailableError(absl::StrCat(
1682 "selected subchannel failed; switching to pending update; "
1683 "last failure: ",
1684 p->subchannel_list_->last_failure_.ToString()));
1685 subchannel_list_->ReportTransientFailure(std::move(status));
1686 } else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1687 p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
1688 MakeRefCounted<QueuePicker>(nullptr));
1689 }
1690 return;
1691 }
1692 // Enter idle.
1693 p->UnsetSelectedSubchannel();
1694 p->subchannel_list_.reset();
1695 p->UpdateState(
1696 GRPC_CHANNEL_IDLE, absl::OkStatus(),
1697 MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
1698 return;
1699 }
1700 // If we get here, there are two possible cases:
1701 // 1. We do not currently have a selected subchannel, and the update is
1702 // for a subchannel in p->subchannel_list_ that we're trying to
1703 // connect to. The goal here is to find a subchannel that we can
1704 // select.
1705 // 2. We do currently have a selected subchannel, and the update is
1706 // for a subchannel in p->latest_pending_subchannel_list_. The
1707 // goal here is to find a subchannel from the update that we can
1708 // select in place of the current one.
1709 // If the subchannel is READY, use it.
1710 if (new_state == GRPC_CHANNEL_READY) {
1711 // We consider it a successful connection attempt only if the
1712 // previous state was CONNECTING. In particular, we don't want to
1713 // increment this counter if we got a new address list and found the
1714 // existing connection already in state READY.
1715 if (old_state == GRPC_CHANNEL_CONNECTING) {
1716 stats_plugins.AddCounter(
1717 kMetricConnectionAttemptsSucceeded, 1,
1718 {subchannel_list_->policy_->channel_control_helper()->GetTarget()},
1719 {});
1720 }
1721 ProcessUnselectedReadyLocked();
1722 return;
1723 }
1724 // Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
1725 bool prev_seen_transient_failure = seen_transient_failure_;
1726 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1727 seen_transient_failure_ = true;
1728 subchannel_list_->last_failure_ = connectivity_status_;
1729 }
1730 // If this is the initial connectivity state update for this subchannel,
1731 // increment the counter in the subchannel list.
1732 if (!old_state.has_value()) {
1733 ++subchannel_list_->num_subchannels_seen_initial_notification_;
1734 }
1735 // If we haven't yet seen the initial connectivity state notification
1736 // for all subchannels, do nothing.
1737 if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
1738 // If we're still here and this is the initial connectivity state
1739 // notification for this subchannel, that means it was the last one to
1740 // see its initial notification. Start trying to connect, starting
1741 // with the first subchannel.
1742 if (!old_state.has_value()) {
1743 subchannel_list_->StartConnectingNextSubchannel();
1744 return;
1745 }
1746 // We've already started trying to connect. Any subchannel that
1747 // reports TF is a connection attempt failure.
1748 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1749 stats_plugins.AddCounter(
1750 kMetricConnectionAttemptsFailed, 1,
1751 {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
1752 }
1753 // Otherwise, process connectivity state change.
1754 switch (*connectivity_state_) {
1755 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
1756 // If this is the first failure we've seen on this subchannel,
1757 // then we're still in the Happy Eyeballs pass.
1758 if (!prev_seen_transient_failure && seen_transient_failure_) {
1759 // If a connection attempt fails before the timer fires, then
1760 // cancel the timer and start connecting on the next subchannel.
1761 if (index_ == subchannel_list_->attempting_index_) {
1762 if (subchannel_list_->timer_handle_.has_value()) {
1763 p->channel_control_helper()->GetEventEngine()->Cancel(
1764 *subchannel_list_->timer_handle_);
1765 }
1766 ++subchannel_list_->attempting_index_;
1767 subchannel_list_->StartConnectingNextSubchannel();
1768 } else {
1769 // If this was the last subchannel to fail, check if the Happy
1770 // Eyeballs pass is complete.
1771 subchannel_list_->MaybeFinishHappyEyeballsPass();
1772 }
1773 } else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
1774 // We're done with the initial Happy Eyeballs pass and in a mode
1775 // where we're attempting to connect to every subchannel in
1776 // parallel. We count the number of failed connection attempts,
1777 // and when that is equal to the number of subchannels, request
1778 // re-resolution and report TRANSIENT_FAILURE again, so that the
1779 // caller has the most recent status message. Note that this
1780 // isn't necessarily the same as saying that we've seen one
1781 // failure for each subchannel in the list, because the backoff
1782 // state may be different in each subchannel, so we may have seen
1783 // one subchannel fail more than once and another subchannel not
1784 // fail at all. But it's a good enough heuristic.
1785 ++subchannel_list_->num_failures_;
1786 if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
1787 p->channel_control_helper()->RequestReresolution();
1788 status = absl::UnavailableError(absl::StrCat(
1789 (p->omit_status_message_prefix_
1790 ? ""
1791 : "failed to connect to all addresses; last error: "),
1792 connectivity_status_.ToString()));
1793 subchannel_list_->ReportTransientFailure(std::move(status));
1794 }
1795 }
1796 break;
1797 }
1798 case GRPC_CHANNEL_IDLE:
1799 // If we've finished the first Happy Eyeballs pass, then we go
1800 // into a mode where we immediately try to connect to every
1801 // subchannel in parallel.
1802 if (subchannel_list_->IsHappyEyeballsPassComplete()) {
1803 subchannel_->RequestConnection();
1804 }
1805 break;
1806 case GRPC_CHANNEL_CONNECTING:
1807 // Only update connectivity state in case 1, and only if we're not
1808 // already in TRANSIENT_FAILURE.
1809 if (subchannel_list_ == p->subchannel_list_.get() &&
1810 p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1811 p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
1812 MakeRefCounted<QueuePicker>(nullptr));
1813 }
1814 break;
1815 default:
1816 // We handled READY above, and we should never see SHUTDOWN.
1817 GPR_UNREACHABLE_CODE(break);
1818 }
1819 }
1820
1821 void OldPickFirst::SubchannelList::SubchannelData::
RequestConnectionWithTimer()1822 RequestConnectionWithTimer() {
1823 CHECK(connectivity_state_.has_value());
1824 if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
1825 subchannel_->RequestConnection();
1826 } else {
1827 CHECK(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
1828 }
1829 // If this is not the last subchannel in the list, start the timer.
1830 if (index_ != subchannel_list_->size() - 1) {
1831 OldPickFirst* p = subchannel_list_->policy_.get();
1832 GRPC_TRACE_LOG(pick_first, INFO)
1833 << "Pick First " << p << " subchannel list " << subchannel_list_
1834 << ": starting Connection Attempt Delay timer for "
1835 << p->connection_attempt_delay_.millis() << "ms for index " << index_;
1836 subchannel_list_->timer_handle_ =
1837 p->channel_control_helper()->GetEventEngine()->RunAfter(
1838 p->connection_attempt_delay_,
1839 [subchannel_list =
1840 subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
1841 ApplicationCallbackExecCtx application_exec_ctx;
1842 ExecCtx exec_ctx;
1843 auto* sl = subchannel_list.get();
1844 sl->policy_->work_serializer()->Run(
1845 [subchannel_list = std::move(subchannel_list)]() {
1846 GRPC_TRACE_LOG(pick_first, INFO)
1847 << "Pick First " << subchannel_list->policy_.get()
1848 << " subchannel list " << subchannel_list.get()
1849 << ": Connection Attempt Delay timer fired "
1850 << "(shutting_down=" << subchannel_list->shutting_down_
1851 << ", selected=" << subchannel_list->policy_->selected_
1852 << ")";
1853 if (subchannel_list->shutting_down_) return;
1854 if (subchannel_list->policy_->selected_ != nullptr) return;
1855 ++subchannel_list->attempting_index_;
1856 subchannel_list->StartConnectingNextSubchannel();
1857 },
1858 DEBUG_LOCATION);
1859 });
1860 }
1861 }
1862
1863 void OldPickFirst::SubchannelList::SubchannelData::
ProcessUnselectedReadyLocked()1864 ProcessUnselectedReadyLocked() {
1865 OldPickFirst* p = subchannel_list_->policy_.get();
1866 // Cancel Happy Eyeballs timer, if any.
1867 if (subchannel_list_->timer_handle_.has_value()) {
1868 p->channel_control_helper()->GetEventEngine()->Cancel(
1869 *subchannel_list_->timer_handle_);
1870 }
1871 // If we get here, there are two possible cases:
1872 // 1. We do not currently have a selected subchannel, and the update is
1873 // for a subchannel in p->subchannel_list_ that we're trying to
1874 // connect to. The goal here is to find a subchannel that we can
1875 // select.
1876 // 2. We do currently have a selected subchannel, and the update is
1877 // for a subchannel in p->latest_pending_subchannel_list_. The
1878 // goal here is to find a subchannel from the update that we can
1879 // select in place of the current one.
1880 CHECK(subchannel_list_ == p->subchannel_list_.get() ||
1881 subchannel_list_ == p->latest_pending_subchannel_list_.get());
1882 // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
1883 if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
1884 GRPC_TRACE_LOG(pick_first, INFO)
1885 << "Pick First " << p << " promoting pending subchannel list "
1886 << p->latest_pending_subchannel_list_.get() << " to replace "
1887 << p->subchannel_list_.get();
1888 p->UnsetSelectedSubchannel();
1889 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
1890 }
1891 // Cases 1 and 2.
1892 GRPC_TRACE_LOG(pick_first, INFO)
1893 << "Pick First " << p << " selected subchannel " << subchannel_.get();
1894 p->selected_ = this;
1895 // If health checking is enabled, start the health watch, but don't
1896 // report a new picker -- we want to stay in CONNECTING while we wait
1897 // for the health status notification.
1898 // If health checking is NOT enabled, report READY.
1899 if (p->enable_health_watch_) {
1900 GRPC_TRACE_LOG(pick_first, INFO)
1901 << "[PF " << p << "] starting health watch";
1902 auto watcher = std::make_unique<HealthWatcher>(
1903 p->RefAsSubclass<OldPickFirst>(DEBUG_LOCATION, "HealthWatcher"),
1904 subchannel_list_->resolution_note_);
1905 p->health_watcher_ = watcher.get();
1906 auto health_data_watcher = MakeHealthCheckWatcher(
1907 p->work_serializer(), subchannel_list_->args_, std::move(watcher));
1908 p->health_data_watcher_ = health_data_watcher.get();
1909 subchannel_->AddDataWatcher(std::move(health_data_watcher));
1910 } else {
1911 p->UpdateState(GRPC_CHANNEL_READY, absl::OkStatus(),
1912 MakeRefCounted<Picker>(subchannel()->Ref()));
1913 }
1914 // Unref all other subchannels in the list.
1915 for (size_t i = 0; i < subchannel_list_->size(); ++i) {
1916 if (i != index_) {
1917 subchannel_list_->subchannels_[i].ShutdownLocked();
1918 }
1919 }
1920 }
1921
1922 //
1923 // OldPickFirst::SubchannelList
1924 //
1925
SubchannelList(RefCountedPtr<OldPickFirst> policy,EndpointAddressesIterator * addresses,const ChannelArgs & args,absl::string_view resolution_note)1926 OldPickFirst::SubchannelList::SubchannelList(
1927 RefCountedPtr<OldPickFirst> policy, EndpointAddressesIterator* addresses,
1928 const ChannelArgs& args, absl::string_view resolution_note)
1929 : InternallyRefCounted<SubchannelList>(
1930 GRPC_TRACE_FLAG_ENABLED(pick_first) ? "SubchannelList" : nullptr),
1931 policy_(std::move(policy)),
1932 args_(
1933 args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
1934 .Remove(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)),
1935 resolution_note_(resolution_note) {
1936 GRPC_TRACE_LOG(pick_first, INFO)
1937 << "[PF " << policy_.get() << "] Creating subchannel list " << this
1938 << " - channel args: " << args_.ToString();
1939 if (addresses == nullptr) return;
1940 // Create a subchannel for each address.
1941 addresses->ForEach([&](const EndpointAddresses& address) {
1942 CHECK_EQ(address.addresses().size(), 1u);
1943 RefCountedPtr<SubchannelInterface> subchannel =
1944 policy_->channel_control_helper()->CreateSubchannel(
1945 address.address(), address.args(), args_);
1946 if (subchannel == nullptr) {
1947 // Subchannel could not be created.
1948 GRPC_TRACE_LOG(pick_first, INFO)
1949 << "[PF " << policy_.get()
1950 << "] could not create subchannel for address " << address.ToString()
1951 << ", ignoring";
1952 return;
1953 }
1954 GRPC_TRACE_LOG(pick_first, INFO)
1955 << "[PF " << policy_.get() << "] subchannel list " << this << " index "
1956 << subchannels_.size() << ": Created subchannel " << subchannel.get()
1957 << " for address " << address.ToString();
1958 subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
1959 });
1960 }
1961
~SubchannelList()1962 OldPickFirst::SubchannelList::~SubchannelList() {
1963 GRPC_TRACE_LOG(pick_first, INFO)
1964 << "[PF " << policy_.get() << "] Destroying subchannel_list " << this;
1965 }
1966
Orphan()1967 void OldPickFirst::SubchannelList::Orphan() {
1968 GRPC_TRACE_LOG(pick_first, INFO)
1969 << "[PF " << policy_.get() << "] Shutting down subchannel_list " << this;
1970 CHECK(!shutting_down_);
1971 shutting_down_ = true;
1972 for (auto& sd : subchannels_) {
1973 sd.ShutdownLocked();
1974 }
1975 if (timer_handle_.has_value()) {
1976 policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
1977 }
1978 Unref();
1979 }
1980
ResetBackoffLocked()1981 void OldPickFirst::SubchannelList::ResetBackoffLocked() {
1982 for (auto& sd : subchannels_) {
1983 sd.ResetBackoffLocked();
1984 }
1985 }
1986
ReportTransientFailure(absl::Status status)1987 void OldPickFirst::SubchannelList::ReportTransientFailure(absl::Status status) {
1988 if (!resolution_note_.empty()) {
1989 status = absl::Status(status.code(), absl::StrCat(status.message(), " (",
1990 resolution_note_, ")"));
1991 }
1992 policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1993 MakeRefCounted<TransientFailurePicker>(status));
1994 }
1995
StartConnectingNextSubchannel()1996 void OldPickFirst::SubchannelList::StartConnectingNextSubchannel() {
1997 // Find the next subchannel not in state TRANSIENT_FAILURE.
1998 // We skip subchannels in state TRANSIENT_FAILURE to avoid a
1999 // large recursion that could overflow the stack.
2000 for (; attempting_index_ < size(); ++attempting_index_) {
2001 SubchannelData* sc = &subchannels_[attempting_index_];
2002 CHECK(sc->connectivity_state().has_value());
2003 if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
2004 // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
2005 // connection attempt.
2006 sc->RequestConnectionWithTimer();
2007 return;
2008 }
2009 }
2010 // If we didn't find a subchannel to request a connection on, check to
2011 // see if the Happy Eyeballs pass is complete.
2012 MaybeFinishHappyEyeballsPass();
2013 }
2014
MaybeFinishHappyEyeballsPass()2015 void OldPickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
2016 // Make sure all subchannels have finished a connection attempt before
2017 // we consider the Happy Eyeballs pass complete.
2018 if (!IsHappyEyeballsPassComplete()) return;
2019 // We didn't find another subchannel not in state TRANSIENT_FAILURE,
2020 // so report TRANSIENT_FAILURE and switch to a mode in which we try to
2021 // connect to all addresses in parallel.
2022 GRPC_TRACE_LOG(pick_first, INFO)
2023 << "Pick First " << policy_.get() << " subchannel list " << this
2024 << " failed to connect to all subchannels";
2025 // In case 2, swap to the new subchannel list. This means reporting
2026 // TRANSIENT_FAILURE and dropping the existing (working) connection,
2027 // but we can't ignore what the control plane has told us.
2028 if (policy_->latest_pending_subchannel_list_.get() == this) {
2029 GRPC_TRACE_LOG(pick_first, INFO)
2030 << "Pick First " << policy_.get()
2031 << " promoting pending subchannel list "
2032 << policy_->latest_pending_subchannel_list_.get() << " to replace "
2033 << this;
2034 policy_->UnsetSelectedSubchannel();
2035 policy_->subchannel_list_ =
2036 std::move(policy_->latest_pending_subchannel_list_);
2037 }
2038 // If this is the current subchannel list (either because we were
2039 // in case 1 or because we were in case 2 and just promoted it to
2040 // be the current list), re-resolve and report new state.
2041 if (policy_->subchannel_list_.get() == this) {
2042 policy_->channel_control_helper()->RequestReresolution();
2043 absl::Status status = absl::UnavailableError(
2044 absl::StrCat((policy_->omit_status_message_prefix_
2045 ? ""
2046 : "failed to connect to all addresses; last error: "),
2047 last_failure_.ToString()));
2048 ReportTransientFailure(std::move(status));
2049 }
2050 // We now transition into a mode where we try to connect to all
2051 // subchannels in parallel. For any subchannel currently in IDLE,
2052 // trigger a connection attempt. For any subchannel not currently in
2053 // IDLE, we will trigger a connection attempt when it does report IDLE.
2054 for (SubchannelData& sd : subchannels_) {
2055 if (sd.connectivity_state() == GRPC_CHANNEL_IDLE) {
2056 sd.RequestConnection();
2057 }
2058 }
2059 }
2060
2061 //
2062 // factory
2063 //
2064
2065 class PickFirstFactory final : public LoadBalancingPolicyFactory {
2066 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const2067 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2068 LoadBalancingPolicy::Args args) const override {
2069 if (!IsPickFirstNewEnabled()) {
2070 return MakeOrphanable<OldPickFirst>(std::move(args));
2071 }
2072 return MakeOrphanable<PickFirst>(std::move(args));
2073 }
2074
name() const2075 absl::string_view name() const override { return kPickFirst; }
2076
2077 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const2078 ParseLoadBalancingConfig(const Json& json) const override {
2079 return LoadFromJson<RefCountedPtr<PickFirstConfig>>(
2080 json, JsonArgs(), "errors validating pick_first LB policy config");
2081 }
2082 };
2083
2084 } // namespace
2085
RegisterPickFirstLbPolicy(CoreConfiguration::Builder * builder)2086 void RegisterPickFirstLbPolicy(CoreConfiguration::Builder* builder) {
2087 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
2088 std::make_unique<PickFirstFactory>());
2089 }
2090
2091 } // namespace grpc_core
2092