• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/pick_first/pick_first.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/impl/connectivity_state.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 #include <string.h>
25 
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30 #include <utility>
31 #include <vector>
32 
33 #include "absl/algorithm/container.h"
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "absl/random/random.h"
37 #include "absl/status/status.h"
38 #include "absl/status/statusor.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
42 #include "src/core/config/core_configuration.h"
43 #include "src/core/lib/address_utils/sockaddr_utils.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/experiments/experiments.h"
47 #include "src/core/lib/iomgr/exec_ctx.h"
48 #include "src/core/lib/iomgr/iomgr_fwd.h"
49 #include "src/core/lib/iomgr/resolved_address.h"
50 #include "src/core/lib/transport/connectivity_state.h"
51 #include "src/core/load_balancing/health_check_client.h"
52 #include "src/core/load_balancing/lb_policy.h"
53 #include "src/core/load_balancing/lb_policy_factory.h"
54 #include "src/core/load_balancing/subchannel_interface.h"
55 #include "src/core/resolver/endpoint_addresses.h"
56 #include "src/core/telemetry/metrics.h"
57 #include "src/core/util/crash.h"
58 #include "src/core/util/debug_location.h"
59 #include "src/core/util/json/json.h"
60 #include "src/core/util/json/json_args.h"
61 #include "src/core/util/json/json_object_loader.h"
62 #include "src/core/util/orphanable.h"
63 #include "src/core/util/ref_counted_ptr.h"
64 #include "src/core/util/time.h"
65 #include "src/core/util/useful.h"
66 #include "src/core/util/work_serializer.h"
67 
68 namespace grpc_core {
69 
70 namespace {
71 
72 //
73 // pick_first LB policy
74 //
75 
76 constexpr absl::string_view kPickFirst = "pick_first";
77 
78 const auto kMetricDisconnections =
79     GlobalInstrumentsRegistry::RegisterUInt64Counter(
80         "grpc.lb.pick_first.disconnections",
81         "EXPERIMENTAL.  Number of times the selected subchannel becomes "
82         "disconnected.",
83         "{disconnection}", false)
84         .Labels(kMetricLabelTarget)
85         .Build();
86 
87 const auto kMetricConnectionAttemptsSucceeded =
88     GlobalInstrumentsRegistry::RegisterUInt64Counter(
89         "grpc.lb.pick_first.connection_attempts_succeeded",
90         "EXPERIMENTAL.  Number of successful connection attempts.", "{attempt}",
91         false)
92         .Labels(kMetricLabelTarget)
93         .Build();
94 
95 const auto kMetricConnectionAttemptsFailed =
96     GlobalInstrumentsRegistry::RegisterUInt64Counter(
97         "grpc.lb.pick_first.connection_attempts_failed",
98         "EXPERIMENTAL.  Number of failed connection attempts.", "{attempt}",
99         false)
100         .Labels(kMetricLabelTarget)
101         .Build();
102 
103 class PickFirstConfig final : public LoadBalancingPolicy::Config {
104  public:
name() const105   absl::string_view name() const override { return kPickFirst; }
shuffle_addresses() const106   bool shuffle_addresses() const { return shuffle_addresses_; }
107 
JsonLoader(const JsonArgs &)108   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
109     static const auto kJsonLoader =
110         JsonObjectLoader<PickFirstConfig>()
111             .OptionalField("shuffleAddressList",
112                            &PickFirstConfig::shuffle_addresses_)
113             .Finish();
114     return kJsonLoader;
115   }
116 
117  private:
118   bool shuffle_addresses_ = false;
119 };
120 
121 class PickFirst final : public LoadBalancingPolicy {
122  public:
123   explicit PickFirst(Args args);
124 
name() const125   absl::string_view name() const override { return kPickFirst; }
126 
127   absl::Status UpdateLocked(UpdateArgs args) override;
128   void ExitIdleLocked() override;
129   void ResetBackoffLocked() override;
130 
131  private:
132   ~PickFirst() override;
133 
134   // A list of subchannels that we will attempt connections on.
135   class SubchannelList final : public InternallyRefCounted<SubchannelList> {
136    public:
137     // Data about the subchannel that is needed only while attempting to
138     // connect.
139     class SubchannelData final {
140      public:
141       // Stores the subchannel and its watcher.  This is the state that
142       // is retained once a subchannel is chosen.
143       class SubchannelState final
144           : public InternallyRefCounted<SubchannelState> {
145        public:
146         SubchannelState(SubchannelData* subchannel_data,
147                         RefCountedPtr<SubchannelInterface> subchannel);
148 
149         void Orphan() override;
150 
subchannel() const151         SubchannelInterface* subchannel() const { return subchannel_.get(); }
152 
RequestConnection()153         void RequestConnection() { subchannel_->RequestConnection(); }
154 
ResetBackoffLocked()155         void ResetBackoffLocked() { subchannel_->ResetBackoff(); }
156 
157        private:
158         // Watcher for subchannel connectivity state.
159         class Watcher
160             : public SubchannelInterface::ConnectivityStateWatcherInterface {
161          public:
Watcher(RefCountedPtr<SubchannelState> subchannel_state)162           explicit Watcher(RefCountedPtr<SubchannelState> subchannel_state)
163               : subchannel_state_(std::move(subchannel_state)) {}
164 
~Watcher()165           ~Watcher() override {
166             subchannel_state_.reset(DEBUG_LOCATION, "Watcher dtor");
167           }
168 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)169           void OnConnectivityStateChange(grpc_connectivity_state new_state,
170                                          absl::Status status) override {
171             subchannel_state_->OnConnectivityStateChange(new_state,
172                                                          std::move(status));
173           }
174 
interested_parties()175           grpc_pollset_set* interested_parties() override {
176             return subchannel_state_->pick_first_->interested_parties();
177           }
178 
179          private:
180           RefCountedPtr<SubchannelState> subchannel_state_;
181         };
182 
183         // Selects this subchannel.  Called when the subchannel reports READY.
184         void Select();
185 
186         // This method will be invoked once soon after instantiation to report
187         // the current connectivity state, and it will then be invoked again
188         // whenever the connectivity state changes.
189         void OnConnectivityStateChange(grpc_connectivity_state new_state,
190                                        absl::Status status);
191 
192         // If non-null, then we are still part of a subchannel list
193         // trying to connect.
194         SubchannelData* subchannel_data_;
195 
196         // TODO(roth): Once we remove pollset_set, we should no longer
197         // need to hold a ref to PickFirst.  Instead, we can make this a
198         // raw pointer and put it in an absl::variant with subchannel_data_.
199         RefCountedPtr<PickFirst> pick_first_;
200 
201         RefCountedPtr<SubchannelInterface> subchannel_;
202         SubchannelInterface::ConnectivityStateWatcherInterface* watcher_ =
203             nullptr;
204       };
205 
206       SubchannelData(SubchannelList* subchannel_list, size_t index,
207                      RefCountedPtr<SubchannelInterface> subchannel);
208 
connectivity_state() const209       absl::optional<grpc_connectivity_state> connectivity_state() const {
210         return connectivity_state_;
211       }
connectivity_status() const212       const absl::Status& connectivity_status() const {
213         return connectivity_status_;
214       }
215 
RequestConnection()216       void RequestConnection() { subchannel_state_->RequestConnection(); }
217 
218       // Resets the connection backoff.
ResetBackoffLocked()219       void ResetBackoffLocked() { subchannel_state_->ResetBackoffLocked(); }
220 
221       // Requests a connection attempt to start on this subchannel,
222       // with appropriate Connection Attempt Delay.
223       // Used only during the Happy Eyeballs pass.
224       void RequestConnectionWithTimer();
225 
seen_transient_failure() const226       bool seen_transient_failure() const { return seen_transient_failure_; }
227 
228      private:
229       // This method will be invoked once soon after instantiation to report
230       // the current connectivity state, and it will then be invoked again
231       // whenever the connectivity state changes.
232       void OnConnectivityStateChange(grpc_connectivity_state new_state,
233                                      absl::Status status);
234 
235       // Backpointer to owning subchannel list.  Not owned.
236       SubchannelList* subchannel_list_;
237       // Our index within subchannel_list_.
238       const size_t index_;
239       // Subchannel state.
240       OrphanablePtr<SubchannelState> subchannel_state_;
241       // Data updated by the watcher.
242       absl::optional<grpc_connectivity_state> connectivity_state_;
243       absl::Status connectivity_status_;
244       bool seen_transient_failure_ = false;
245     };
246 
247     SubchannelList(RefCountedPtr<PickFirst> policy,
248                    EndpointAddressesIterator* addresses,
249                    const ChannelArgs& args, absl::string_view resolution_note);
250 
251     ~SubchannelList() override;
252 
253     void Orphan() override;
254 
255     // The number of subchannels in the list.
size() const256     size_t size() const { return subchannels_.size(); }
257 
258     // Resets connection backoff of all subchannels.
259     void ResetBackoffLocked();
260 
IsHappyEyeballsPassComplete() const261     bool IsHappyEyeballsPassComplete() const {
262       // Checking attempting_index_ here is just an optimization -- if
263       // we haven't actually tried all subchannels yet, then we don't
264       // need to iterate.
265       if (attempting_index_ < size()) return false;
266       for (const auto& sd : subchannels_) {
267         if (!sd->seen_transient_failure()) return false;
268       }
269       return true;
270     }
271 
272     void ReportTransientFailure(absl::Status status);
273 
274    private:
275     // Returns true if all subchannels have seen their initial
276     // connectivity state notifications.
AllSubchannelsSeenInitialState() const277     bool AllSubchannelsSeenInitialState() const {
278       return num_subchannels_seen_initial_notification_ == size();
279     }
280 
281     // Looks through subchannels_ starting from attempting_index_ to
282     // find the first one not currently in TRANSIENT_FAILURE, then
283     // triggers a connection attempt for that subchannel.  If there are
284     // no more subchannels not in TRANSIENT_FAILURE, calls
285     // MaybeFinishHappyEyeballsPass().
286     void StartConnectingNextSubchannel();
287 
288     // Checks to see if the initial Happy Eyeballs pass is complete --
289     // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
290     // If so, transitions to a mode where we try to connect to all subchannels
291     // in parallel and returns true.
292     void MaybeFinishHappyEyeballsPass();
293 
294     // Backpointer to owning policy.
295     RefCountedPtr<PickFirst> policy_;
296 
297     ChannelArgs args_;
298     std::string resolution_note_;
299 
300     // The list of subchannels.
301     std::vector<std::unique_ptr<SubchannelData>> subchannels_;
302 
303     // Is this list shutting down? This may be true due to the shutdown of the
304     // policy itself or because a newer update has arrived while this one hadn't
305     // finished processing.
306     bool shutting_down_ = false;
307 
308     size_t num_subchannels_seen_initial_notification_ = 0;
309 
310     // The index into subchannels_ to which we are currently attempting
311     // to connect during the initial Happy Eyeballs pass.  Once the
312     // initial pass is over, this will be equal to size().
313     size_t attempting_index_ = 0;
314     // Happy Eyeballs timer handle.
315     absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
316         timer_handle_;
317 
318     // After the initial Happy Eyeballs pass, the number of failures
319     // we've seen.  Every size() failures, we trigger re-resolution.
320     size_t num_failures_ = 0;
321 
322     // The status from the last subchannel that reported TRANSIENT_FAILURE.
323     absl::Status last_failure_;
324   };
325 
326   class HealthWatcher final
327       : public SubchannelInterface::ConnectivityStateWatcherInterface {
328    public:
HealthWatcher(RefCountedPtr<PickFirst> policy,absl::string_view resolution_note)329     HealthWatcher(RefCountedPtr<PickFirst> policy,
330                   absl::string_view resolution_note)
331         : policy_(std::move(policy)), resolution_note_(resolution_note) {}
332 
~HealthWatcher()333     ~HealthWatcher() override {
334       policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
335     }
336 
337     void OnConnectivityStateChange(grpc_connectivity_state new_state,
338                                    absl::Status status) override;
339 
interested_parties()340     grpc_pollset_set* interested_parties() override {
341       return policy_->interested_parties();
342     }
343 
344    private:
345     RefCountedPtr<PickFirst> policy_;
346     std::string resolution_note_;
347   };
348 
349   class Picker final : public SubchannelPicker {
350    public:
Picker(RefCountedPtr<SubchannelInterface> subchannel)351     explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
352         : subchannel_(std::move(subchannel)) {}
353 
Pick(PickArgs)354     PickResult Pick(PickArgs /*args*/) override {
355       return PickResult::Complete(subchannel_);
356     }
357 
358    private:
359     RefCountedPtr<SubchannelInterface> subchannel_;
360   };
361 
362   void ShutdownLocked() override;
363 
364   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
365                    RefCountedPtr<SubchannelPicker> picker);
366 
367   void AttemptToConnectUsingLatestUpdateArgsLocked();
368 
369   void UnsetSelectedSubchannel();
370 
371   void GoIdle();
372 
373   // When ExitIdleLocked() is called, we create a subchannel_list_ and start
374   // trying to connect, but we don't actually change state_ until the first
375   // subchannel reports CONNECTING.  So in order to know if we're really
376   // idle, we need to check both state_ and subchannel_list_.
IsIdle() const377   bool IsIdle() const {
378     return state_ == GRPC_CHANNEL_IDLE && subchannel_list_ == nullptr;
379   }
380 
381   // Whether we should enable health watching.
382   const bool enable_health_watch_;
383   // Whether we should omit our status message prefix.
384   const bool omit_status_message_prefix_;
385   // Connection Attempt Delay for Happy Eyeballs.
386   const Duration connection_attempt_delay_;
387 
388   // Lateset update args.
389   UpdateArgs latest_update_args_;
390   // The list of subchannels that we're currently trying to connect to.
391   // Will generally be null when selected_ is set, except when we get a
392   // resolver update and need to check initial connectivity states for
393   // the new list to decide whether we keep using the existing
394   // connection or go IDLE.
395   OrphanablePtr<SubchannelList> subchannel_list_;
396   // Selected subchannel.  Will generally be null when subchannel_list_
397   // is non-null, with the exception mentioned above.
398   OrphanablePtr<SubchannelList::SubchannelData::SubchannelState> selected_;
399   // Health watcher for the selected subchannel.
400   SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
401       nullptr;
402   SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
403   // Current connectivity state.
404   grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
405   // Are we shut down?
406   bool shutdown_ = false;
407   // Random bit generator used for shuffling addresses if configured
408   absl::BitGen bit_gen_;
409 };
410 
PickFirst(Args args)411 PickFirst::PickFirst(Args args)
412     : LoadBalancingPolicy(std::move(args)),
413       enable_health_watch_(
414           channel_args()
415               .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
416               .value_or(false)),
417       omit_status_message_prefix_(
418           channel_args()
419               .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
420               .value_or(false)),
421       connection_attempt_delay_(Duration::Milliseconds(
422           Clamp(channel_args()
423                     .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
424                     .value_or(250),
425                 100, 2000))) {
426   GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " created.";
427 }
428 
~PickFirst()429 PickFirst::~PickFirst() {
430   GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
431   CHECK_EQ(subchannel_list_.get(), nullptr);
432 }
433 
ShutdownLocked()434 void PickFirst::ShutdownLocked() {
435   GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " Shutting down";
436   shutdown_ = true;
437   UnsetSelectedSubchannel();
438   subchannel_list_.reset();
439 }
440 
ExitIdleLocked()441 void PickFirst::ExitIdleLocked() {
442   if (shutdown_) return;
443   if (IsIdle()) {
444     GRPC_TRACE_LOG(pick_first, INFO)
445         << "Pick First " << this << " exiting idle";
446     AttemptToConnectUsingLatestUpdateArgsLocked();
447   }
448 }
449 
ResetBackoffLocked()450 void PickFirst::ResetBackoffLocked() {
451   if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
452 }
453 
AttemptToConnectUsingLatestUpdateArgsLocked()454 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
455   // Create a subchannel list from latest_update_args_.
456   EndpointAddressesIterator* addresses = nullptr;
457   if (latest_update_args_.addresses.ok()) {
458     addresses = latest_update_args_.addresses->get();
459   }
460   // Replace subchannel_list_.
461   if (GRPC_TRACE_FLAG_ENABLED(pick_first) && subchannel_list_ != nullptr) {
462     LOG(INFO) << "[PF " << this << "] Shutting down previous subchannel list "
463               << subchannel_list_.get();
464   }
465   subchannel_list_ = MakeOrphanable<SubchannelList>(
466       RefAsSubclass<PickFirst>(DEBUG_LOCATION, "SubchannelList"), addresses,
467       latest_update_args_.args, latest_update_args_.resolution_note);
468   // Empty update or no valid subchannels.  Put the channel in
469   // TRANSIENT_FAILURE and request re-resolution.  Also unset the
470   // current selected subchannel.
471   if (subchannel_list_->size() == 0) {
472     channel_control_helper()->RequestReresolution();
473     absl::Status status = latest_update_args_.addresses.ok()
474                               ? absl::UnavailableError("empty address list")
475                               : latest_update_args_.addresses.status();
476     subchannel_list_->ReportTransientFailure(std::move(status));
477     UnsetSelectedSubchannel();
478   }
479 }
480 
GetAddressFamily(const grpc_resolved_address & address)481 absl::string_view GetAddressFamily(const grpc_resolved_address& address) {
482   const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
483   return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
484 };
485 
486 // An endpoint list iterator that returns only entries for a specific
487 // address family, as indicated by the URI scheme.
488 class AddressFamilyIterator final {
489  public:
AddressFamilyIterator(absl::string_view scheme,size_t index)490   AddressFamilyIterator(absl::string_view scheme, size_t index)
491       : scheme_(scheme), index_(index) {}
492 
Next(EndpointAddressesList & endpoints,std::vector<bool> * endpoints_moved)493   EndpointAddresses* Next(EndpointAddressesList& endpoints,
494                           std::vector<bool>* endpoints_moved) {
495     for (; index_ < endpoints.size(); ++index_) {
496       if (!(*endpoints_moved)[index_] &&
497           GetAddressFamily(endpoints[index_].address()) == scheme_) {
498         (*endpoints_moved)[index_] = true;
499         return &endpoints[index_++];
500       }
501     }
502     return nullptr;
503   }
504 
505  private:
506   absl::string_view scheme_;
507   size_t index_;
508 };
509 
UpdateLocked(UpdateArgs args)510 absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
511   if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
512     if (args.addresses.ok()) {
513       LOG(INFO) << "Pick First " << this << " received update";
514     } else {
515       LOG(INFO) << "Pick First " << this
516                 << " received update with address error: "
517                 << args.addresses.status();
518     }
519   }
520   // Set return status based on the address list.
521   absl::Status status;
522   if (!args.addresses.ok()) {
523     status = args.addresses.status();
524   } else {
525     EndpointAddressesList endpoints;
526     (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
527       endpoints.push_back(endpoint);
528     });
529     if (endpoints.empty()) {
530       status = absl::UnavailableError("address list must not be empty");
531     } else {
532       // Shuffle the list if needed.
533       auto config = static_cast<PickFirstConfig*>(args.config.get());
534       if (config->shuffle_addresses()) {
535         absl::c_shuffle(endpoints, bit_gen_);
536       }
537       // Flatten the list so that we have one address per endpoint.
538       // While we're iterating, also determine the desired address family
539       // order and the index of the first element of each family, for use in
540       // the interleaving below.
541       std::set<absl::string_view> address_families;
542       std::vector<AddressFamilyIterator> address_family_order;
543       EndpointAddressesList flattened_endpoints;
544       for (const auto& endpoint : endpoints) {
545         for (const auto& address : endpoint.addresses()) {
546           flattened_endpoints.emplace_back(address, endpoint.args());
547           absl::string_view scheme = GetAddressFamily(address);
548           bool inserted = address_families.insert(scheme).second;
549           if (inserted) {
550             address_family_order.emplace_back(scheme,
551                                               flattened_endpoints.size() - 1);
552           }
553         }
554       }
555       endpoints = std::move(flattened_endpoints);
556       // Interleave addresses as per RFC-8305 section 4.
557       EndpointAddressesList interleaved_endpoints;
558       interleaved_endpoints.reserve(endpoints.size());
559       std::vector<bool> endpoints_moved(endpoints.size());
560       size_t scheme_index = 0;
561       for (size_t i = 0; i < endpoints.size(); ++i) {
562         EndpointAddresses* endpoint;
563         do {
564           auto& iterator = address_family_order[scheme_index++ %
565                                                 address_family_order.size()];
566           endpoint = iterator.Next(endpoints, &endpoints_moved);
567         } while (endpoint == nullptr);
568         interleaved_endpoints.emplace_back(std::move(*endpoint));
569       }
570       endpoints = std::move(interleaved_endpoints);
571       args.addresses =
572           std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
573     }
574   }
575   // If the update contains a resolver error and we have a previous update
576   // that was not a resolver error, keep using the previous addresses.
577   if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
578     args.addresses = std::move(latest_update_args_.addresses);
579   }
580   // Update latest_update_args_.
581   latest_update_args_ = std::move(args);
582   // If we are not in idle, start connection attempt immediately.
583   // Otherwise, we defer the attempt into ExitIdleLocked().
584   if (!IsIdle()) {
585     AttemptToConnectUsingLatestUpdateArgsLocked();
586   }
587   return status;
588 }
589 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)590 void PickFirst::UpdateState(grpc_connectivity_state state,
591                             const absl::Status& status,
592                             RefCountedPtr<SubchannelPicker> picker) {
593   state_ = state;
594   channel_control_helper()->UpdateState(state, status, std::move(picker));
595 }
596 
UnsetSelectedSubchannel()597 void PickFirst::UnsetSelectedSubchannel() {
598   if (selected_ != nullptr && health_data_watcher_ != nullptr) {
599     selected_->subchannel()->CancelDataWatcher(health_data_watcher_);
600   }
601   selected_.reset();
602   health_watcher_ = nullptr;
603   health_data_watcher_ = nullptr;
604 }
605 
GoIdle()606 void PickFirst::GoIdle() {
607   // Unset the selected subchannel.
608   UnsetSelectedSubchannel();
609   // Drop the current subchannel list, if any.
610   subchannel_list_.reset();
611   // Request a re-resolution.
612   // TODO(qianchengz): We may want to request re-resolution in
613   // ExitIdleLocked() instead.
614   channel_control_helper()->RequestReresolution();
615   // Enter idle.
616   UpdateState(GRPC_CHANNEL_IDLE, absl::OkStatus(),
617               MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
618 }
619 
620 //
621 // PickFirst::HealthWatcher
622 //
623 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)624 void PickFirst::HealthWatcher::OnConnectivityStateChange(
625     grpc_connectivity_state new_state, absl::Status status) {
626   if (policy_->health_watcher_ != this) return;
627   GRPC_TRACE_LOG(pick_first, INFO)
628       << "[PF " << policy_.get()
629       << "] health watch state update: " << ConnectivityStateName(new_state)
630       << " (" << status << ")";
631   switch (new_state) {
632     case GRPC_CHANNEL_READY:
633       policy_->channel_control_helper()->UpdateState(
634           GRPC_CHANNEL_READY, absl::OkStatus(),
635           MakeRefCounted<Picker>(policy_->selected_->subchannel()->Ref()));
636       break;
637     case GRPC_CHANNEL_IDLE:
638       // If the subchannel becomes disconnected, the health watcher
639       // might happen to see the change before the raw connectivity
640       // state watcher does.  In this case, ignore it, since the raw
641       // connectivity state watcher will handle it shortly.
642       break;
643     case GRPC_CHANNEL_CONNECTING:
644       policy_->channel_control_helper()->UpdateState(
645           new_state, absl::OkStatus(),
646           MakeRefCounted<QueuePicker>(policy_->Ref()));
647       break;
648     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
649       std::string message = absl::StrCat("health watch: ", status.message());
650       if (!resolution_note_.empty()) {
651         absl::StrAppend(&message, " (", resolution_note_, ")");
652       }
653       policy_->channel_control_helper()->UpdateState(
654           GRPC_CHANNEL_TRANSIENT_FAILURE, status,
655           MakeRefCounted<TransientFailurePicker>(
656               absl::UnavailableError(message)));
657       break;
658     }
659     case GRPC_CHANNEL_SHUTDOWN:
660       Crash("health watcher reported state SHUTDOWN");
661   }
662 }
663 
664 //
665 // PickFirst::SubchannelList::SubchannelData::SubchannelState
666 //
667 
SubchannelState(SubchannelData * subchannel_data,RefCountedPtr<SubchannelInterface> subchannel)668 PickFirst::SubchannelList::SubchannelData::SubchannelState::SubchannelState(
669     SubchannelData* subchannel_data,
670     RefCountedPtr<SubchannelInterface> subchannel)
671     : subchannel_data_(subchannel_data),
672       pick_first_(subchannel_data_->subchannel_list_->policy_),
673       subchannel_(std::move(subchannel)) {
674   GRPC_TRACE_LOG(pick_first, INFO)
675       << "[PF " << pick_first_.get() << "] subchannel state " << this
676       << " (subchannel " << subchannel_.get() << "): starting watch";
677   auto watcher = std::make_unique<Watcher>(Ref(DEBUG_LOCATION, "Watcher"));
678   watcher_ = watcher.get();
679   subchannel_->WatchConnectivityState(std::move(watcher));
680 }
681 
Orphan()682 void PickFirst::SubchannelList::SubchannelData::SubchannelState::Orphan() {
683   GRPC_TRACE_LOG(pick_first, INFO)
684       << "[PF " << pick_first_.get() << "] subchannel state " << this
685       << " (subchannel " << subchannel_.get()
686       << "): cancelling watch and unreffing subchannel";
687   subchannel_data_ = nullptr;
688   subchannel_->CancelConnectivityStateWatch(watcher_);
689   watcher_ = nullptr;
690   subchannel_.reset();
691   pick_first_.reset();
692   Unref();
693 }
694 
Select()695 void PickFirst::SubchannelList::SubchannelData::SubchannelState::Select() {
696   GRPC_TRACE_LOG(pick_first, INFO)
697       << "Pick First " << pick_first_.get() << " selected subchannel "
698       << subchannel_.get();
699   CHECK_NE(subchannel_data_, nullptr);
700   pick_first_->UnsetSelectedSubchannel();  // Cancel health watch, if any.
701   pick_first_->selected_ = std::move(subchannel_data_->subchannel_state_);
702   // If health checking is enabled, start the health watch, but don't
703   // report a new picker -- we want to stay in CONNECTING while we wait
704   // for the health status notification.
705   // If health checking is NOT enabled, report READY.
706   if (pick_first_->enable_health_watch_) {
707     GRPC_TRACE_LOG(pick_first, INFO)
708         << "[PF " << pick_first_.get() << "] starting health watch";
709     auto watcher = std::make_unique<HealthWatcher>(
710         pick_first_.Ref(DEBUG_LOCATION, "HealthWatcher"),
711         subchannel_data_->subchannel_list_->resolution_note_);
712     pick_first_->health_watcher_ = watcher.get();
713     auto health_data_watcher = MakeHealthCheckWatcher(
714         pick_first_->work_serializer(),
715         subchannel_data_->subchannel_list_->args_, std::move(watcher));
716     pick_first_->health_data_watcher_ = health_data_watcher.get();
717     subchannel_->AddDataWatcher(std::move(health_data_watcher));
718   } else {
719     pick_first_->UpdateState(GRPC_CHANNEL_READY, absl::OkStatus(),
720                              MakeRefCounted<Picker>(subchannel_));
721   }
722   // Report successful connection.
723   // We consider it a successful connection attempt only if the
724   // previous state was CONNECTING.  In particular, we don't want to
725   // increment this counter if we got a new address list and found the
726   // existing connection already in state READY.
727   if (subchannel_data_->connectivity_state_ == GRPC_CHANNEL_CONNECTING) {
728     auto& stats_plugins =
729         pick_first_->channel_control_helper()->GetStatsPluginGroup();
730     stats_plugins.AddCounter(
731         kMetricConnectionAttemptsSucceeded, 1,
732         {pick_first_->channel_control_helper()->GetTarget()}, {});
733   }
734   // Drop our pointer to subchannel_data_, so that we know not to
735   // interact with it on subsequent connectivity state updates.
736   subchannel_data_ = nullptr;
737   // Clean up subchannel list.
738   pick_first_->subchannel_list_.reset();
739 }
740 
741 void PickFirst::SubchannelList::SubchannelData::SubchannelState::
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)742     OnConnectivityStateChange(grpc_connectivity_state new_state,
743                               absl::Status status) {
744   if (watcher_ == nullptr) return;
745   GRPC_TRACE_LOG(pick_first, INFO)
746       << "[PF " << pick_first_.get() << "] subchannel state " << this
747       << " (subchannel " << subchannel_.get()
748       << "): connectivity changed: new_state="
749       << ConnectivityStateName(new_state) << ", status=" << status
750       << ", watcher=" << watcher_ << ", subchannel_data_=" << subchannel_data_
751       << ", pick_first_->selected_=" << pick_first_->selected_.get();
752   // If we're still part of a subchannel list trying to connect, check
753   // if we're connected.
754   if (subchannel_data_ != nullptr) {
755     CHECK_EQ(pick_first_->subchannel_list_.get(),
756              subchannel_data_->subchannel_list_);
757     // If the subchannel is READY, use it.
758     // Otherwise, tell the subchannel list to keep trying.
759     if (new_state == GRPC_CHANNEL_READY) {
760       Select();
761     } else {
762       subchannel_data_->OnConnectivityStateChange(new_state, std::move(status));
763     }
764     return;
765   }
766   // We aren't trying to connect, so we must be the selected subchannel.
767   CHECK_EQ(pick_first_->selected_.get(), this);
768   GRPC_TRACE_LOG(pick_first, INFO)
769       << "Pick First " << pick_first_.get()
770       << " selected subchannel connectivity changed to "
771       << ConnectivityStateName(new_state);
772   // Any state change is considered to be a failure of the existing
773   // connection.  Report the failure.
774   auto& stats_plugins =
775       pick_first_->channel_control_helper()->GetStatsPluginGroup();
776   stats_plugins.AddCounter(kMetricDisconnections, 1,
777                            {pick_first_->channel_control_helper()->GetTarget()},
778                            {});
779   // Report IDLE.
780   pick_first_->GoIdle();
781 }
782 
//
// PickFirst::SubchannelList::SubchannelData
//
786 
SubchannelData(SubchannelList * subchannel_list,size_t index,RefCountedPtr<SubchannelInterface> subchannel)787 PickFirst::SubchannelList::SubchannelData::SubchannelData(
788     SubchannelList* subchannel_list, size_t index,
789     RefCountedPtr<SubchannelInterface> subchannel)
790     : subchannel_list_(subchannel_list), index_(index) {
791   GRPC_TRACE_LOG(pick_first, INFO)
792       << "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
793       << subchannel_list_ << " index " << index_
794       << ": creating subchannel data";
795   subchannel_state_ =
796       MakeOrphanable<SubchannelState>(this, std::move(subchannel));
797 }
798 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)799 void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
800     grpc_connectivity_state new_state, absl::Status status) {
801   PickFirst* p = subchannel_list_->policy_.get();
802   GRPC_TRACE_LOG(pick_first, INFO)
803       << "[PF " << p << "] subchannel list " << subchannel_list_ << " index "
804       << index_ << " of " << subchannel_list_->size() << " (subchannel_state "
805       << subchannel_state_.get() << "): connectivity changed: old_state="
806       << (connectivity_state_.has_value()
807               ? ConnectivityStateName(*connectivity_state_)
808               : "N/A")
809       << ", new_state=" << ConnectivityStateName(new_state)
810       << ", status=" << status
811       << ", seen_transient_failure=" << seen_transient_failure_
812       << ", p->selected_=" << p->selected_.get()
813       << ", p->subchannel_list_=" << p->subchannel_list_.get()
814       << ", p->subchannel_list_->shutting_down_="
815       << p->subchannel_list_->shutting_down_;
816   if (subchannel_list_->shutting_down_) return;
817   // The notification must be for a subchannel in the current list.
818   CHECK_EQ(subchannel_list_, p->subchannel_list_.get());
819   // SHUTDOWN should never happen.
820   CHECK_NE(new_state, GRPC_CHANNEL_SHUTDOWN);
821   // READY should be caught by SubchannelState, in which case it will
822   // not call us in the first place.
823   CHECK_NE(new_state, GRPC_CHANNEL_READY);
824   // Update state.
825   absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
826   connectivity_state_ = new_state;
827   connectivity_status_ = std::move(status);
828   // Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
829   bool prev_seen_transient_failure = seen_transient_failure_;
830   if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
831     seen_transient_failure_ = true;
832     subchannel_list_->last_failure_ = connectivity_status_;
833   }
834   // If this is the initial connectivity state update for this subchannel,
835   // increment the counter in the subchannel list.
836   if (!old_state.has_value()) {
837     ++subchannel_list_->num_subchannels_seen_initial_notification_;
838   }
839   // If we haven't yet seen the initial connectivity state notification
840   // for all subchannels, do nothing.
841   if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
842   // If we're still here and this is the initial connectivity state
843   // notification for this subchannel, that means it was the last one to
844   // see its initial notification.  So we now have enough state to
845   // figure out how to proceed.
846   if (!old_state.has_value()) {
847     // If we already have a selected subchannel and we got here, that
848     // means that none of the subchannels on the new list are in READY
849     // state, which means that the address we're currently connected to
850     // is not in the new list.  In that case, we drop the current
851     // connection and report IDLE.
852     if (p->selected_ != nullptr) {
853       GRPC_TRACE_LOG(pick_first, INFO)
854           << "[PF " << p << "] subchannel list " << subchannel_list_
855           << ": new update has no subchannels in state READY; dropping "
856              "existing connection and going IDLE";
857       p->GoIdle();
858     } else {
859       // Start trying to connect, starting with the first subchannel.
860       subchannel_list_->StartConnectingNextSubchannel();
861     }
862     return;
863   }
864   // We've already started trying to connect.  Any subchannel that
865   // reports TF is a connection attempt failure.
866   if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
867     auto& stats_plugins = subchannel_list_->policy_->channel_control_helper()
868                               ->GetStatsPluginGroup();
869     stats_plugins.AddCounter(
870         kMetricConnectionAttemptsFailed, 1,
871         {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
872   }
873   // Otherwise, process connectivity state change.
874   switch (*connectivity_state_) {
875     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
876       // If this is the first failure we've seen on this subchannel,
877       // then we're still in the Happy Eyeballs pass.
878       if (!prev_seen_transient_failure && seen_transient_failure_) {
879         // If a connection attempt fails before the timer fires, then
880         // cancel the timer and start connecting on the next subchannel.
881         if (index_ == subchannel_list_->attempting_index_) {
882           if (subchannel_list_->timer_handle_.has_value()) {
883             p->channel_control_helper()->GetEventEngine()->Cancel(
884                 *subchannel_list_->timer_handle_);
885           }
886           ++subchannel_list_->attempting_index_;
887           subchannel_list_->StartConnectingNextSubchannel();
888         } else {
889           // If this was the last subchannel to fail, check if the Happy
890           // Eyeballs pass is complete.
891           subchannel_list_->MaybeFinishHappyEyeballsPass();
892         }
893       } else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
894         // We're done with the initial Happy Eyeballs pass and in a mode
895         // where we're attempting to connect to every subchannel in
896         // parallel.  We count the number of failed connection attempts,
897         // and when that is equal to the number of subchannels, request
898         // re-resolution and report TRANSIENT_FAILURE again, so that the
899         // caller has the most recent status message.  Note that this
900         // isn't necessarily the same as saying that we've seen one
901         // failure for each subchannel in the list, because the backoff
902         // state may be different in each subchannel, so we may have seen
903         // one subchannel fail more than once and another subchannel not
904         // fail at all.  But it's a good enough heuristic.
905         ++subchannel_list_->num_failures_;
906         if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
907           p->channel_control_helper()->RequestReresolution();
908           status = absl::UnavailableError(absl::StrCat(
909               (p->omit_status_message_prefix_
910                    ? ""
911                    : "failed to connect to all addresses; last error: "),
912               connectivity_status_.ToString()));
913           subchannel_list_->ReportTransientFailure(std::move(status));
914         }
915       }
916       break;
917     }
918     case GRPC_CHANNEL_IDLE:
919       // If we've finished the first Happy Eyeballs pass, then we go
920       // into a mode where we immediately try to connect to every
921       // subchannel in parallel.
922       if (subchannel_list_->IsHappyEyeballsPassComplete()) {
923         subchannel_state_->RequestConnection();
924       }
925       break;
926     case GRPC_CHANNEL_CONNECTING:
927       // Only update connectivity state only if we're not already in
928       // TRANSIENT_FAILURE.
929       // TODO(roth): Squelch duplicate CONNECTING updates.
930       if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
931         p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
932                        MakeRefCounted<QueuePicker>(nullptr));
933       }
934       break;
935     default:
936       // We handled READY above, and we should never see SHUTDOWN.
937       GPR_UNREACHABLE_CODE(break);
938   }
939 }
940 
RequestConnectionWithTimer()941 void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
942   CHECK(connectivity_state_.has_value());
943   if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
944     subchannel_state_->RequestConnection();
945   } else {
946     CHECK_EQ(connectivity_state_.value(), GRPC_CHANNEL_CONNECTING);
947   }
948   // If this is not the last subchannel in the list, start the timer.
949   if (index_ != subchannel_list_->size() - 1) {
950     PickFirst* p = subchannel_list_->policy_.get();
951     GRPC_TRACE_LOG(pick_first, INFO)
952         << "Pick First " << p << " subchannel list " << subchannel_list_
953         << ": starting Connection Attempt Delay timer for "
954         << p->connection_attempt_delay_.millis() << "ms for index " << index_;
955     subchannel_list_->timer_handle_ =
956         p->channel_control_helper()->GetEventEngine()->RunAfter(
957             p->connection_attempt_delay_,
958             [subchannel_list =
959                  subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
960               ApplicationCallbackExecCtx application_exec_ctx;
961               ExecCtx exec_ctx;
962               auto* sl = subchannel_list.get();
963               sl->policy_->work_serializer()->Run(
964                   [subchannel_list = std::move(subchannel_list)]() {
965                     GRPC_TRACE_LOG(pick_first, INFO)
966                         << "Pick First " << subchannel_list->policy_.get()
967                         << " subchannel list " << subchannel_list.get()
968                         << ": Connection Attempt Delay timer fired "
969                            "(shutting_down="
970                         << subchannel_list->shutting_down_ << ", selected="
971                         << subchannel_list->policy_->selected_.get() << ")";
972                     if (subchannel_list->shutting_down_) return;
973                     if (subchannel_list->policy_->selected_ != nullptr) return;
974                     ++subchannel_list->attempting_index_;
975                     subchannel_list->StartConnectingNextSubchannel();
976                   },
977                   DEBUG_LOCATION);
978             });
979   }
980 }
981 
//
// PickFirst::SubchannelList
//
985 
SubchannelList(RefCountedPtr<PickFirst> policy,EndpointAddressesIterator * addresses,const ChannelArgs & args,absl::string_view resolution_note)986 PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
987                                           EndpointAddressesIterator* addresses,
988                                           const ChannelArgs& args,
989                                           absl::string_view resolution_note)
990     : InternallyRefCounted<SubchannelList>(
991           GRPC_TRACE_FLAG_ENABLED(pick_first) ? "SubchannelList" : nullptr),
992       policy_(std::move(policy)),
993       args_(
994           args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
995               .Remove(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)),
996       resolution_note_(resolution_note) {
997   GRPC_TRACE_LOG(pick_first, INFO)
998       << "[PF " << policy_.get() << "] Creating subchannel list " << this
999       << " - channel args: " << args_.ToString();
1000   if (addresses == nullptr) return;
1001   // Create a subchannel for each address.
1002   addresses->ForEach([&](const EndpointAddresses& address) {
1003     CHECK_EQ(address.addresses().size(), 1u);
1004     RefCountedPtr<SubchannelInterface> subchannel =
1005         policy_->channel_control_helper()->CreateSubchannel(
1006             address.address(), address.args(), args_);
1007     if (subchannel == nullptr) {
1008       // Subchannel could not be created.
1009       GRPC_TRACE_LOG(pick_first, INFO)
1010           << "[PF " << policy_.get()
1011           << "] could not create subchannel for address " << address.ToString()
1012           << ", ignoring";
1013       return;
1014     }
1015     GRPC_TRACE_LOG(pick_first, INFO)
1016         << "[PF " << policy_.get() << "] subchannel list " << this << " index "
1017         << subchannels_.size() << ": Created subchannel " << subchannel.get()
1018         << " for address " << address.ToString();
1019     subchannels_.emplace_back(std::make_unique<SubchannelData>(
1020         this, subchannels_.size(), std::move(subchannel)));
1021   });
1022 }
1023 
~SubchannelList()1024 PickFirst::SubchannelList::~SubchannelList() {
1025   GRPC_TRACE_LOG(pick_first, INFO)
1026       << "[PF " << policy_.get() << "] Destroying subchannel_list " << this;
1027 }
1028 
Orphan()1029 void PickFirst::SubchannelList::Orphan() {
1030   GRPC_TRACE_LOG(pick_first, INFO)
1031       << "[PF " << policy_.get() << "] Shutting down subchannel_list " << this;
1032   CHECK(!shutting_down_);
1033   shutting_down_ = true;
1034   // Shut down subchannels.
1035   subchannels_.clear();
1036   // Cancel Happy Eyeballs timer, if any.
1037   if (timer_handle_.has_value()) {
1038     policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
1039   }
1040   Unref();
1041 }
1042 
ResetBackoffLocked()1043 void PickFirst::SubchannelList::ResetBackoffLocked() {
1044   for (auto& sd : subchannels_) {
1045     sd->ResetBackoffLocked();
1046   }
1047 }
1048 
ReportTransientFailure(absl::Status status)1049 void PickFirst::SubchannelList::ReportTransientFailure(absl::Status status) {
1050   if (!resolution_note_.empty()) {
1051     status = absl::Status(status.code(), absl::StrCat(status.message(), " (",
1052                                                       resolution_note_, ")"));
1053   }
1054   policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1055                        MakeRefCounted<TransientFailurePicker>(status));
1056 }
1057 
StartConnectingNextSubchannel()1058 void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
1059   // Find the next subchannel not in state TRANSIENT_FAILURE.
1060   // We skip subchannels in state TRANSIENT_FAILURE to avoid a
1061   // large recursion that could overflow the stack.
1062   for (; attempting_index_ < size(); ++attempting_index_) {
1063     SubchannelData* sc = subchannels_[attempting_index_].get();
1064     CHECK(sc->connectivity_state().has_value());
1065     if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1066       // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
1067       // connection attempt.
1068       sc->RequestConnectionWithTimer();
1069       return;
1070     }
1071   }
1072   // If we didn't find a subchannel to request a connection on, check to
1073   // see if the Happy Eyeballs pass is complete.
1074   MaybeFinishHappyEyeballsPass();
1075 }
1076 
MaybeFinishHappyEyeballsPass()1077 void PickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
1078   // Make sure all subchannels have finished a connection attempt before
1079   // we consider the Happy Eyeballs pass complete.
1080   if (!IsHappyEyeballsPassComplete()) return;
1081   // We didn't find another subchannel not in state TRANSIENT_FAILURE,
1082   // so report TRANSIENT_FAILURE and switch to a mode in which we try to
1083   // connect to all addresses in parallel.
1084   GRPC_TRACE_LOG(pick_first, INFO)
1085       << "Pick First " << policy_.get() << " subchannel list " << this
1086       << " failed to connect to all subchannels";
1087   // Re-resolve and report TRANSIENT_FAILURE.
1088   policy_->channel_control_helper()->RequestReresolution();
1089   absl::Status status = absl::UnavailableError(
1090       absl::StrCat((policy_->omit_status_message_prefix_
1091                         ? ""
1092                         : "failed to connect to all addresses; last error: "),
1093                    last_failure_.ToString()));
1094   ReportTransientFailure(std::move(status));
1095   // Drop the existing (working) connection, if any.  This may be
1096   // sub-optimal, but we can't ignore what the control plane told us.
1097   policy_->UnsetSelectedSubchannel();
1098   // We now transition into a mode where we try to connect to all
1099   // subchannels in parallel.  For any subchannel currently in IDLE,
1100   // trigger a connection attempt.  For any subchannel not currently in
1101   // IDLE, we will trigger a connection attempt when it does report IDLE.
1102   for (auto& sd : subchannels_) {
1103     if (sd->connectivity_state() == GRPC_CHANNEL_IDLE) {
1104       sd->RequestConnection();
1105     }
1106   }
1107 }
1108 
1109 // TODO(roth): Remove this when the pick_first_new experiment is removed.
1110 class OldPickFirst final : public LoadBalancingPolicy {
1111  public:
1112   explicit OldPickFirst(Args args);
1113 
name() const1114   absl::string_view name() const override { return kPickFirst; }
1115 
1116   absl::Status UpdateLocked(UpdateArgs args) override;
1117   void ExitIdleLocked() override;
1118   void ResetBackoffLocked() override;
1119 
1120  private:
1121   ~OldPickFirst() override;
1122 
1123   class SubchannelList final : public InternallyRefCounted<SubchannelList> {
1124    public:
1125     class SubchannelData final {
1126      public:
1127       SubchannelData(SubchannelList* subchannel_list, size_t index,
1128                      RefCountedPtr<SubchannelInterface> subchannel);
1129 
subchannel() const1130       SubchannelInterface* subchannel() const { return subchannel_.get(); }
connectivity_state() const1131       absl::optional<grpc_connectivity_state> connectivity_state() const {
1132         return connectivity_state_;
1133       }
connectivity_status() const1134       const absl::Status& connectivity_status() const {
1135         return connectivity_status_;
1136       }
1137 
1138       // Resets the connection backoff.
ResetBackoffLocked()1139       void ResetBackoffLocked() {
1140         if (subchannel_ != nullptr) subchannel_->ResetBackoff();
1141       }
1142 
RequestConnection()1143       void RequestConnection() { subchannel_->RequestConnection(); }
1144 
1145       // Requests a connection attempt to start on this subchannel,
1146       // with appropriate Connection Attempt Delay.
1147       // Used only during the Happy Eyeballs pass.
1148       void RequestConnectionWithTimer();
1149 
1150       // Cancels any pending connectivity watch and unrefs the subchannel.
1151       void ShutdownLocked();
1152 
seen_transient_failure() const1153       bool seen_transient_failure() const { return seen_transient_failure_; }
1154 
1155      private:
1156       // Watcher for subchannel connectivity state.
1157       class Watcher final
1158           : public SubchannelInterface::ConnectivityStateWatcherInterface {
1159        public:
Watcher(RefCountedPtr<SubchannelList> subchannel_list,size_t index)1160         Watcher(RefCountedPtr<SubchannelList> subchannel_list, size_t index)
1161             : subchannel_list_(std::move(subchannel_list)), index_(index) {}
1162 
~Watcher()1163         ~Watcher() override {
1164           subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
1165         }
1166 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)1167         void OnConnectivityStateChange(grpc_connectivity_state new_state,
1168                                        absl::Status status) override {
1169           subchannel_list_->subchannels_[index_].OnConnectivityStateChange(
1170               new_state, std::move(status));
1171         }
1172 
interested_parties()1173         grpc_pollset_set* interested_parties() override {
1174           return subchannel_list_->policy_->interested_parties();
1175         }
1176 
1177        private:
1178         RefCountedPtr<SubchannelList> subchannel_list_;
1179         const size_t index_;
1180       };
1181 
1182       // This method will be invoked once soon after instantiation to report
1183       // the current connectivity state, and it will then be invoked again
1184       // whenever the connectivity state changes.
1185       void OnConnectivityStateChange(grpc_connectivity_state new_state,
1186                                      absl::Status status);
1187 
1188       // Processes the connectivity change to READY for an unselected
1189       // subchannel.
1190       void ProcessUnselectedReadyLocked();
1191 
1192       // Backpointer to owning subchannel list.  Not owned.
1193       SubchannelList* subchannel_list_;
1194       const size_t index_;
1195       // The subchannel.
1196       RefCountedPtr<SubchannelInterface> subchannel_;
1197       // Will be non-null when the subchannel's state is being watched.
1198       SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
1199           nullptr;
1200       // Data updated by the watcher.
1201       absl::optional<grpc_connectivity_state> connectivity_state_;
1202       absl::Status connectivity_status_;
1203       bool seen_transient_failure_ = false;
1204     };
1205 
1206     SubchannelList(RefCountedPtr<OldPickFirst> policy,
1207                    EndpointAddressesIterator* addresses,
1208                    const ChannelArgs& args, absl::string_view resolution_note);
1209 
1210     ~SubchannelList() override;
1211 
1212     // The number of subchannels in the list.
size() const1213     size_t size() const { return subchannels_.size(); }
1214 
1215     // Resets connection backoff of all subchannels.
1216     void ResetBackoffLocked();
1217 
1218     void Orphan() override;
1219 
IsHappyEyeballsPassComplete() const1220     bool IsHappyEyeballsPassComplete() const {
1221       // Checking attempting_index_ here is just an optimization -- if
1222       // we haven't actually tried all subchannels yet, then we don't
1223       // need to iterate.
1224       if (attempting_index_ < size()) return false;
1225       for (const SubchannelData& sd : subchannels_) {
1226         if (!sd.seen_transient_failure()) return false;
1227       }
1228       return true;
1229     }
1230 
1231     void ReportTransientFailure(absl::Status status);
1232 
1233    private:
1234     // Returns true if all subchannels have seen their initial
1235     // connectivity state notifications.
AllSubchannelsSeenInitialState() const1236     bool AllSubchannelsSeenInitialState() const {
1237       return num_subchannels_seen_initial_notification_ == size();
1238     }
1239 
1240     // Looks through subchannels_ starting from attempting_index_ to
1241     // find the first one not currently in TRANSIENT_FAILURE, then
1242     // triggers a connection attempt for that subchannel.  If there are
1243     // no more subchannels not in TRANSIENT_FAILURE, calls
1244     // MaybeFinishHappyEyeballsPass().
1245     void StartConnectingNextSubchannel();
1246 
1247     // Checks to see if the initial Happy Eyeballs pass is complete --
1248     // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once.
1249     // If so, transitions to a mode where we try to connect to all subchannels
1250     // in parallel and returns true.
1251     void MaybeFinishHappyEyeballsPass();
1252 
1253     // Backpointer to owning policy.
1254     RefCountedPtr<OldPickFirst> policy_;
1255 
1256     ChannelArgs args_;
1257     std::string resolution_note_;
1258 
1259     // The list of subchannels.
1260     std::vector<SubchannelData> subchannels_;
1261 
1262     // Is this list shutting down? This may be true due to the shutdown of the
1263     // policy itself or because a newer update has arrived while this one hadn't
1264     // finished processing.
1265     bool shutting_down_ = false;
1266 
1267     size_t num_subchannels_seen_initial_notification_ = 0;
1268 
1269     // The index into subchannels_ to which we are currently attempting
1270     // to connect during the initial Happy Eyeballs pass.  Once the
1271     // initial pass is over, this will be equal to size().
1272     size_t attempting_index_ = 0;
1273     // Happy Eyeballs timer handle.
1274     absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
1275         timer_handle_;
1276 
1277     // After the initial Happy Eyeballs pass, the number of failures
1278     // we've seen.  Every size() failures, we trigger re-resolution.
1279     size_t num_failures_ = 0;
1280 
1281     // The status from the last subchannel that reported TRANSIENT_FAILURE.
1282     absl::Status last_failure_;
1283   };
1284 
1285   class HealthWatcher final
1286       : public SubchannelInterface::ConnectivityStateWatcherInterface {
1287    public:
HealthWatcher(RefCountedPtr<OldPickFirst> policy,absl::string_view resolution_note)1288     HealthWatcher(RefCountedPtr<OldPickFirst> policy,
1289                   absl::string_view resolution_note)
1290         : policy_(std::move(policy)), resolution_note_(resolution_note) {}
1291 
~HealthWatcher()1292     ~HealthWatcher() override {
1293       policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor");
1294     }
1295 
1296     void OnConnectivityStateChange(grpc_connectivity_state new_state,
1297                                    absl::Status status) override;
1298 
interested_parties()1299     grpc_pollset_set* interested_parties() override {
1300       return policy_->interested_parties();
1301     }
1302 
1303    private:
1304     RefCountedPtr<OldPickFirst> policy_;
1305     std::string resolution_note_;
1306   };
1307 
1308   class Picker final : public SubchannelPicker {
1309    public:
Picker(RefCountedPtr<SubchannelInterface> subchannel)1310     explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
1311         : subchannel_(std::move(subchannel)) {}
1312 
Pick(PickArgs)1313     PickResult Pick(PickArgs /*args*/) override {
1314       return PickResult::Complete(subchannel_);
1315     }
1316 
1317    private:
1318     RefCountedPtr<SubchannelInterface> subchannel_;
1319   };
1320 
1321   void ShutdownLocked() override;
1322 
1323   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
1324                    RefCountedPtr<SubchannelPicker> picker);
1325 
1326   void AttemptToConnectUsingLatestUpdateArgsLocked();
1327 
1328   void UnsetSelectedSubchannel();
1329 
1330   // When ExitIdleLocked() is called, we create a subchannel_list_ and start
1331   // trying to connect, but we don't actually change state_ until the first
1332   // subchannel reports CONNECTING.  So in order to know if we're really
1333   // idle, we need to check both state_ and subchannel_list_.
IsIdle() const1334   bool IsIdle() const {
1335     return state_ == GRPC_CHANNEL_IDLE && subchannel_list_ == nullptr;
1336   }
1337 
1338   // Whether we should enable health watching.
1339   const bool enable_health_watch_;
1340   // Whether we should omit our status message prefix.
1341   const bool omit_status_message_prefix_;
1342   // Connection Attempt Delay for Happy Eyeballs.
1343   const Duration connection_attempt_delay_;
1344 
1345   // Lateset update args.
1346   UpdateArgs latest_update_args_;
1347   // All our subchannels.
1348   OrphanablePtr<SubchannelList> subchannel_list_;
1349   // Latest pending subchannel list.
1350   OrphanablePtr<SubchannelList> latest_pending_subchannel_list_;
1351   // Selected subchannel in subchannel_list_.
1352   SubchannelList::SubchannelData* selected_ = nullptr;
1353   // Health watcher for the selected subchannel.
1354   SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ =
1355       nullptr;
1356   SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr;
1357   // Current connectivity state.
1358   grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
1359   // Are we shut down?
1360   bool shutdown_ = false;
1361   // Random bit generator used for shuffling addresses if configured
1362   absl::BitGen bit_gen_;
1363 };
1364 
OldPickFirst(Args args)1365 OldPickFirst::OldPickFirst(Args args)
1366     : LoadBalancingPolicy(std::move(args)),
1367       enable_health_watch_(
1368           channel_args()
1369               .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
1370               .value_or(false)),
1371       omit_status_message_prefix_(
1372           channel_args()
1373               .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
1374               .value_or(false)),
1375       connection_attempt_delay_(Duration::Milliseconds(
1376           Clamp(channel_args()
1377                     .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
1378                     .value_or(250),
1379                 100, 2000))) {
1380   GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " created.";
1381 }
1382 
~OldPickFirst()1383 OldPickFirst::~OldPickFirst() {
1384   GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
1385   CHECK(subchannel_list_ == nullptr);
1386   CHECK(latest_pending_subchannel_list_ == nullptr);
1387 }
1388 
ShutdownLocked()1389 void OldPickFirst::ShutdownLocked() {
1390   GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " Shutting down";
1391   shutdown_ = true;
1392   UnsetSelectedSubchannel();
1393   subchannel_list_.reset();
1394   latest_pending_subchannel_list_.reset();
1395 }
1396 
ExitIdleLocked()1397 void OldPickFirst::ExitIdleLocked() {
1398   if (shutdown_) return;
1399   if (IsIdle()) {
1400     GRPC_TRACE_LOG(pick_first, INFO)
1401         << "Pick First " << this << " exiting idle";
1402     AttemptToConnectUsingLatestUpdateArgsLocked();
1403   }
1404 }
1405 
ResetBackoffLocked()1406 void OldPickFirst::ResetBackoffLocked() {
1407   if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
1408   if (latest_pending_subchannel_list_ != nullptr) {
1409     latest_pending_subchannel_list_->ResetBackoffLocked();
1410   }
1411 }
1412 
AttemptToConnectUsingLatestUpdateArgsLocked()1413 void OldPickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
1414   // Create a subchannel list from latest_update_args_.
1415   EndpointAddressesIterator* addresses = nullptr;
1416   if (latest_update_args_.addresses.ok()) {
1417     addresses = latest_update_args_.addresses->get();
1418   }
1419   // Replace latest_pending_subchannel_list_.
1420   if (GRPC_TRACE_FLAG_ENABLED(pick_first) &&
1421       latest_pending_subchannel_list_ != nullptr) {
1422     LOG(INFO) << "[PF " << this
1423               << "] Shutting down previous pending subchannel list "
1424               << latest_pending_subchannel_list_.get();
1425   }
1426   latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
1427       RefAsSubclass<OldPickFirst>(), addresses, latest_update_args_.args,
1428       latest_update_args_.resolution_note);
1429   // Empty update or no valid subchannels.  Put the channel in
1430   // TRANSIENT_FAILURE and request re-resolution.
1431   if (latest_pending_subchannel_list_->size() == 0) {
1432     channel_control_helper()->RequestReresolution();
1433     absl::Status status = latest_update_args_.addresses.ok()
1434                               ? absl::UnavailableError("empty address list")
1435                               : latest_update_args_.addresses.status();
1436     latest_pending_subchannel_list_->ReportTransientFailure(std::move(status));
1437   }
1438   // If the new update is empty or we don't yet have a selected subchannel in
1439   // the current list, replace the current subchannel list immediately.
1440   if (latest_pending_subchannel_list_->size() == 0 || selected_ == nullptr) {
1441     UnsetSelectedSubchannel();
1442     if (GRPC_TRACE_FLAG_ENABLED(pick_first) && subchannel_list_ != nullptr) {
1443       LOG(INFO) << "[PF " << this << "] Shutting down previous subchannel list "
1444                 << subchannel_list_.get();
1445     }
1446     subchannel_list_ = std::move(latest_pending_subchannel_list_);
1447   }
1448 }
1449 
UpdateLocked(UpdateArgs args)1450 absl::Status OldPickFirst::UpdateLocked(UpdateArgs args) {
1451   if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
1452     if (args.addresses.ok()) {
1453       LOG(INFO) << "Pick First " << this << " received update";
1454     } else {
1455       LOG(INFO) << "Pick First " << this
1456                 << " received update with address error: "
1457                 << args.addresses.status();
1458     }
1459   }
1460   // Set return status based on the address list.
1461   absl::Status status;
1462   if (!args.addresses.ok()) {
1463     status = args.addresses.status();
1464   } else {
1465     EndpointAddressesList endpoints;
1466     (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
1467       endpoints.push_back(endpoint);
1468     });
1469     if (endpoints.empty()) {
1470       status = absl::UnavailableError("address list must not be empty");
1471     } else {
1472       // Shuffle the list if needed.
1473       auto config = static_cast<PickFirstConfig*>(args.config.get());
1474       if (config->shuffle_addresses()) {
1475         absl::c_shuffle(endpoints, bit_gen_);
1476       }
1477       // Flatten the list so that we have one address per endpoint.
1478       // While we're iterating, also determine the desired address family
1479       // order and the index of the first element of each family, for use in
1480       // the interleaving below.
1481       std::set<absl::string_view> address_families;
1482       std::vector<AddressFamilyIterator> address_family_order;
1483       EndpointAddressesList flattened_endpoints;
1484       for (const auto& endpoint : endpoints) {
1485         for (const auto& address : endpoint.addresses()) {
1486           flattened_endpoints.emplace_back(address, endpoint.args());
1487           absl::string_view scheme = GetAddressFamily(address);
1488           bool inserted = address_families.insert(scheme).second;
1489           if (inserted) {
1490             address_family_order.emplace_back(scheme,
1491                                               flattened_endpoints.size() - 1);
1492           }
1493         }
1494       }
1495       endpoints = std::move(flattened_endpoints);
1496       // Interleave addresses as per RFC-8305 section 4.
1497       EndpointAddressesList interleaved_endpoints;
1498       interleaved_endpoints.reserve(endpoints.size());
1499       std::vector<bool> endpoints_moved(endpoints.size());
1500       size_t scheme_index = 0;
1501       for (size_t i = 0; i < endpoints.size(); ++i) {
1502         EndpointAddresses* endpoint;
1503         do {
1504           auto& iterator = address_family_order[scheme_index++ %
1505                                                 address_family_order.size()];
1506           endpoint = iterator.Next(endpoints, &endpoints_moved);
1507         } while (endpoint == nullptr);
1508         interleaved_endpoints.emplace_back(std::move(*endpoint));
1509       }
1510       endpoints = std::move(interleaved_endpoints);
1511       args.addresses =
1512           std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
1513     }
1514   }
1515   // If the update contains a resolver error and we have a previous update
1516   // that was not a resolver error, keep using the previous addresses.
1517   if (!args.addresses.ok() && latest_update_args_.config != nullptr) {
1518     args.addresses = std::move(latest_update_args_.addresses);
1519   }
1520   // Update latest_update_args_.
1521   latest_update_args_ = std::move(args);
1522   // If we are not in idle, start connection attempt immediately.
1523   // Otherwise, we defer the attempt into ExitIdleLocked().
1524   if (!IsIdle()) {
1525     AttemptToConnectUsingLatestUpdateArgsLocked();
1526   }
1527   return status;
1528 }
1529 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)1530 void OldPickFirst::UpdateState(grpc_connectivity_state state,
1531                                const absl::Status& status,
1532                                RefCountedPtr<SubchannelPicker> picker) {
1533   state_ = state;
1534   channel_control_helper()->UpdateState(state, status, std::move(picker));
1535 }
1536 
UnsetSelectedSubchannel()1537 void OldPickFirst::UnsetSelectedSubchannel() {
1538   if (selected_ != nullptr && health_data_watcher_ != nullptr) {
1539     selected_->subchannel()->CancelDataWatcher(health_data_watcher_);
1540   }
1541   selected_ = nullptr;
1542   health_watcher_ = nullptr;
1543   health_data_watcher_ = nullptr;
1544 }
1545 
1546 //
1547 // OldPickFirst::HealthWatcher
1548 //
1549 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)1550 void OldPickFirst::HealthWatcher::OnConnectivityStateChange(
1551     grpc_connectivity_state new_state, absl::Status status) {
1552   if (policy_->health_watcher_ != this) return;
1553   GRPC_TRACE_LOG(pick_first, INFO)
1554       << "[PF " << policy_.get()
1555       << "] health watch state update: " << ConnectivityStateName(new_state)
1556       << " (" << status << ")";
1557   switch (new_state) {
1558     case GRPC_CHANNEL_READY:
1559       policy_->channel_control_helper()->UpdateState(
1560           GRPC_CHANNEL_READY, absl::OkStatus(),
1561           MakeRefCounted<Picker>(policy_->selected_->subchannel()->Ref()));
1562       break;
1563     case GRPC_CHANNEL_IDLE:
1564       // If the subchannel becomes disconnected, the health watcher
1565       // might happen to see the change before the raw connectivity
1566       // state watcher does.  In this case, ignore it, since the raw
1567       // connectivity state watcher will handle it shortly.
1568       break;
1569     case GRPC_CHANNEL_CONNECTING:
1570       policy_->channel_control_helper()->UpdateState(
1571           new_state, absl::OkStatus(),
1572           MakeRefCounted<QueuePicker>(policy_->Ref()));
1573       break;
1574     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
1575       std::string message = absl::StrCat("health watch: ", status.message());
1576       if (!resolution_note_.empty()) {
1577         absl::StrAppend(&message, " (", resolution_note_, ")");
1578       }
1579       policy_->channel_control_helper()->UpdateState(
1580           GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1581           MakeRefCounted<TransientFailurePicker>(
1582               absl::UnavailableError(message)));
1583       break;
1584     }
1585     case GRPC_CHANNEL_SHUTDOWN:
1586       Crash("health watcher reported state SHUTDOWN");
1587   }
1588 }
1589 
1590 //
1591 // OldPickFirst::SubchannelList::SubchannelData
1592 //
1593 
SubchannelData(SubchannelList * subchannel_list,size_t index,RefCountedPtr<SubchannelInterface> subchannel)1594 OldPickFirst::SubchannelList::SubchannelData::SubchannelData(
1595     SubchannelList* subchannel_list, size_t index,
1596     RefCountedPtr<SubchannelInterface> subchannel)
1597     : subchannel_list_(subchannel_list),
1598       index_(index),
1599       subchannel_(std::move(subchannel)) {
1600   GRPC_TRACE_LOG(pick_first, INFO)
1601       << "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
1602       << subchannel_list_ << " index " << index_ << " (subchannel "
1603       << subchannel_.get() << "): starting watch";
1604   auto watcher = std::make_unique<Watcher>(
1605       subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
1606   pending_watcher_ = watcher.get();
1607   subchannel_->WatchConnectivityState(std::move(watcher));
1608 }
1609 
ShutdownLocked()1610 void OldPickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
1611   if (subchannel_ != nullptr) {
1612     GRPC_TRACE_LOG(pick_first, INFO)
1613         << "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
1614         << subchannel_list_ << " index " << index_ << " of "
1615         << subchannel_list_->size() << " (subchannel " << subchannel_.get()
1616         << "): cancelling watch and unreffing subchannel";
1617     subchannel_->CancelConnectivityStateWatch(pending_watcher_);
1618     pending_watcher_ = nullptr;
1619     subchannel_.reset();
1620   }
1621 }
1622 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)1623 void OldPickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
1624     grpc_connectivity_state new_state, absl::Status status) {
1625   OldPickFirst* p = subchannel_list_->policy_.get();
1626   GRPC_TRACE_LOG(pick_first, INFO)
1627       << "[PF " << p << "] subchannel list " << subchannel_list_ << " index "
1628       << index_ << " of " << subchannel_list_->size() << " (subchannel "
1629       << subchannel_.get() << "): connectivity changed: old_state="
1630       << (connectivity_state_.has_value()
1631               ? ConnectivityStateName(*connectivity_state_)
1632               : "N/A")
1633       << ", new_state=" << ConnectivityStateName(new_state)
1634       << ", status=" << status
1635       << ", shutting_down=" << subchannel_list_->shutting_down_
1636       << ", pending_watcher=" << pending_watcher_
1637       << ", seen_transient_failure=" << seen_transient_failure_
1638       << ", p->selected_=" << p->selected_
1639       << ", p->subchannel_list_=" << p->subchannel_list_.get()
1640       << ", p->latest_pending_subchannel_list_="
1641       << p->latest_pending_subchannel_list_.get();
1642   if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
1643   auto& stats_plugins = subchannel_list_->policy_->channel_control_helper()
1644                             ->GetStatsPluginGroup();
1645   // The notification must be for a subchannel in either the current or
1646   // latest pending subchannel lists.
1647   CHECK(subchannel_list_ == p->subchannel_list_.get() ||
1648         subchannel_list_ == p->latest_pending_subchannel_list_.get());
1649   CHECK(new_state != GRPC_CHANNEL_SHUTDOWN);
1650   absl::optional<grpc_connectivity_state> old_state = connectivity_state_;
1651   connectivity_state_ = new_state;
1652   connectivity_status_ = std::move(status);
1653   // Handle updates for the currently selected subchannel.
1654   if (p->selected_ == this) {
1655     CHECK(subchannel_list_ == p->subchannel_list_.get());
1656     GRPC_TRACE_LOG(pick_first, INFO)
1657         << "Pick First " << p << " selected subchannel connectivity changed to "
1658         << ConnectivityStateName(new_state);
1659     // Any state change is considered to be a failure of the existing
1660     // connection.
1661     stats_plugins.AddCounter(
1662         kMetricDisconnections, 1,
1663         {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
1664     // TODO(roth): We could check the connectivity states of all the
1665     // subchannels here, just in case one of them happens to be READY,
1666     // and we could switch to that rather than going IDLE.
1667     // Request a re-resolution.
1668     // TODO(qianchengz): We may want to request re-resolution in
1669     // ExitIdleLocked().
1670     p->channel_control_helper()->RequestReresolution();
1671     // If there is a pending update, switch to the pending update.
1672     if (p->latest_pending_subchannel_list_ != nullptr) {
1673       GRPC_TRACE_LOG(pick_first, INFO)
1674           << "Pick First " << p << " promoting pending subchannel list "
1675           << p->latest_pending_subchannel_list_.get() << " to replace "
1676           << p->subchannel_list_.get();
1677       p->UnsetSelectedSubchannel();
1678       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
1679       // Set our state to that of the pending subchannel list.
1680       if (p->subchannel_list_->IsHappyEyeballsPassComplete()) {
1681         status = absl::UnavailableError(absl::StrCat(
1682             "selected subchannel failed; switching to pending update; "
1683             "last failure: ",
1684             p->subchannel_list_->last_failure_.ToString()));
1685         subchannel_list_->ReportTransientFailure(std::move(status));
1686       } else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1687         p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
1688                        MakeRefCounted<QueuePicker>(nullptr));
1689       }
1690       return;
1691     }
1692     // Enter idle.
1693     p->UnsetSelectedSubchannel();
1694     p->subchannel_list_.reset();
1695     p->UpdateState(
1696         GRPC_CHANNEL_IDLE, absl::OkStatus(),
1697         MakeRefCounted<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
1698     return;
1699   }
1700   // If we get here, there are two possible cases:
1701   // 1. We do not currently have a selected subchannel, and the update is
1702   //    for a subchannel in p->subchannel_list_ that we're trying to
1703   //    connect to.  The goal here is to find a subchannel that we can
1704   //    select.
1705   // 2. We do currently have a selected subchannel, and the update is
1706   //    for a subchannel in p->latest_pending_subchannel_list_.  The
1707   //    goal here is to find a subchannel from the update that we can
1708   //    select in place of the current one.
1709   // If the subchannel is READY, use it.
1710   if (new_state == GRPC_CHANNEL_READY) {
1711     // We consider it a successful connection attempt only if the
1712     // previous state was CONNECTING.  In particular, we don't want to
1713     // increment this counter if we got a new address list and found the
1714     // existing connection already in state READY.
1715     if (old_state == GRPC_CHANNEL_CONNECTING) {
1716       stats_plugins.AddCounter(
1717           kMetricConnectionAttemptsSucceeded, 1,
1718           {subchannel_list_->policy_->channel_control_helper()->GetTarget()},
1719           {});
1720     }
1721     ProcessUnselectedReadyLocked();
1722     return;
1723   }
1724   // Make sure we note when a subchannel has seen TRANSIENT_FAILURE.
1725   bool prev_seen_transient_failure = seen_transient_failure_;
1726   if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1727     seen_transient_failure_ = true;
1728     subchannel_list_->last_failure_ = connectivity_status_;
1729   }
1730   // If this is the initial connectivity state update for this subchannel,
1731   // increment the counter in the subchannel list.
1732   if (!old_state.has_value()) {
1733     ++subchannel_list_->num_subchannels_seen_initial_notification_;
1734   }
1735   // If we haven't yet seen the initial connectivity state notification
1736   // for all subchannels, do nothing.
1737   if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
1738   // If we're still here and this is the initial connectivity state
1739   // notification for this subchannel, that means it was the last one to
1740   // see its initial notification.  Start trying to connect, starting
1741   // with the first subchannel.
1742   if (!old_state.has_value()) {
1743     subchannel_list_->StartConnectingNextSubchannel();
1744     return;
1745   }
1746   // We've already started trying to connect.  Any subchannel that
1747   // reports TF is a connection attempt failure.
1748   if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
1749     stats_plugins.AddCounter(
1750         kMetricConnectionAttemptsFailed, 1,
1751         {subchannel_list_->policy_->channel_control_helper()->GetTarget()}, {});
1752   }
1753   // Otherwise, process connectivity state change.
1754   switch (*connectivity_state_) {
1755     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
1756       // If this is the first failure we've seen on this subchannel,
1757       // then we're still in the Happy Eyeballs pass.
1758       if (!prev_seen_transient_failure && seen_transient_failure_) {
1759         // If a connection attempt fails before the timer fires, then
1760         // cancel the timer and start connecting on the next subchannel.
1761         if (index_ == subchannel_list_->attempting_index_) {
1762           if (subchannel_list_->timer_handle_.has_value()) {
1763             p->channel_control_helper()->GetEventEngine()->Cancel(
1764                 *subchannel_list_->timer_handle_);
1765           }
1766           ++subchannel_list_->attempting_index_;
1767           subchannel_list_->StartConnectingNextSubchannel();
1768         } else {
1769           // If this was the last subchannel to fail, check if the Happy
1770           // Eyeballs pass is complete.
1771           subchannel_list_->MaybeFinishHappyEyeballsPass();
1772         }
1773       } else if (subchannel_list_->IsHappyEyeballsPassComplete()) {
1774         // We're done with the initial Happy Eyeballs pass and in a mode
1775         // where we're attempting to connect to every subchannel in
1776         // parallel.  We count the number of failed connection attempts,
1777         // and when that is equal to the number of subchannels, request
1778         // re-resolution and report TRANSIENT_FAILURE again, so that the
1779         // caller has the most recent status message.  Note that this
1780         // isn't necessarily the same as saying that we've seen one
1781         // failure for each subchannel in the list, because the backoff
1782         // state may be different in each subchannel, so we may have seen
1783         // one subchannel fail more than once and another subchannel not
1784         // fail at all.  But it's a good enough heuristic.
1785         ++subchannel_list_->num_failures_;
1786         if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
1787           p->channel_control_helper()->RequestReresolution();
1788           status = absl::UnavailableError(absl::StrCat(
1789               (p->omit_status_message_prefix_
1790                    ? ""
1791                    : "failed to connect to all addresses; last error: "),
1792               connectivity_status_.ToString()));
1793           subchannel_list_->ReportTransientFailure(std::move(status));
1794         }
1795       }
1796       break;
1797     }
1798     case GRPC_CHANNEL_IDLE:
1799       // If we've finished the first Happy Eyeballs pass, then we go
1800       // into a mode where we immediately try to connect to every
1801       // subchannel in parallel.
1802       if (subchannel_list_->IsHappyEyeballsPassComplete()) {
1803         subchannel_->RequestConnection();
1804       }
1805       break;
1806     case GRPC_CHANNEL_CONNECTING:
1807       // Only update connectivity state in case 1, and only if we're not
1808       // already in TRANSIENT_FAILURE.
1809       if (subchannel_list_ == p->subchannel_list_.get() &&
1810           p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1811         p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::OkStatus(),
1812                        MakeRefCounted<QueuePicker>(nullptr));
1813       }
1814       break;
1815     default:
1816       // We handled READY above, and we should never see SHUTDOWN.
1817       GPR_UNREACHABLE_CODE(break);
1818   }
1819 }
1820 
1821 void OldPickFirst::SubchannelList::SubchannelData::
RequestConnectionWithTimer()1822     RequestConnectionWithTimer() {
1823   CHECK(connectivity_state_.has_value());
1824   if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
1825     subchannel_->RequestConnection();
1826   } else {
1827     CHECK(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
1828   }
1829   // If this is not the last subchannel in the list, start the timer.
1830   if (index_ != subchannel_list_->size() - 1) {
1831     OldPickFirst* p = subchannel_list_->policy_.get();
1832     GRPC_TRACE_LOG(pick_first, INFO)
1833         << "Pick First " << p << " subchannel list " << subchannel_list_
1834         << ": starting Connection Attempt Delay timer for "
1835         << p->connection_attempt_delay_.millis() << "ms for index " << index_;
1836     subchannel_list_->timer_handle_ =
1837         p->channel_control_helper()->GetEventEngine()->RunAfter(
1838             p->connection_attempt_delay_,
1839             [subchannel_list =
1840                  subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
1841               ApplicationCallbackExecCtx application_exec_ctx;
1842               ExecCtx exec_ctx;
1843               auto* sl = subchannel_list.get();
1844               sl->policy_->work_serializer()->Run(
1845                   [subchannel_list = std::move(subchannel_list)]() {
1846                     GRPC_TRACE_LOG(pick_first, INFO)
1847                         << "Pick First " << subchannel_list->policy_.get()
1848                         << " subchannel list " << subchannel_list.get()
1849                         << ": Connection Attempt Delay timer fired "
1850                         << "(shutting_down=" << subchannel_list->shutting_down_
1851                         << ", selected=" << subchannel_list->policy_->selected_
1852                         << ")";
1853                     if (subchannel_list->shutting_down_) return;
1854                     if (subchannel_list->policy_->selected_ != nullptr) return;
1855                     ++subchannel_list->attempting_index_;
1856                     subchannel_list->StartConnectingNextSubchannel();
1857                   },
1858                   DEBUG_LOCATION);
1859             });
1860   }
1861 }
1862 
1863 void OldPickFirst::SubchannelList::SubchannelData::
ProcessUnselectedReadyLocked()1864     ProcessUnselectedReadyLocked() {
1865   OldPickFirst* p = subchannel_list_->policy_.get();
1866   // Cancel Happy Eyeballs timer, if any.
1867   if (subchannel_list_->timer_handle_.has_value()) {
1868     p->channel_control_helper()->GetEventEngine()->Cancel(
1869         *subchannel_list_->timer_handle_);
1870   }
1871   // If we get here, there are two possible cases:
1872   // 1. We do not currently have a selected subchannel, and the update is
1873   //    for a subchannel in p->subchannel_list_ that we're trying to
1874   //    connect to.  The goal here is to find a subchannel that we can
1875   //    select.
1876   // 2. We do currently have a selected subchannel, and the update is
1877   //    for a subchannel in p->latest_pending_subchannel_list_.  The
1878   //    goal here is to find a subchannel from the update that we can
1879   //    select in place of the current one.
1880   CHECK(subchannel_list_ == p->subchannel_list_.get() ||
1881         subchannel_list_ == p->latest_pending_subchannel_list_.get());
1882   // Case 2.  Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
1883   if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
1884     GRPC_TRACE_LOG(pick_first, INFO)
1885         << "Pick First " << p << " promoting pending subchannel list "
1886         << p->latest_pending_subchannel_list_.get() << " to replace "
1887         << p->subchannel_list_.get();
1888     p->UnsetSelectedSubchannel();
1889     p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
1890   }
1891   // Cases 1 and 2.
1892   GRPC_TRACE_LOG(pick_first, INFO)
1893       << "Pick First " << p << " selected subchannel " << subchannel_.get();
1894   p->selected_ = this;
1895   // If health checking is enabled, start the health watch, but don't
1896   // report a new picker -- we want to stay in CONNECTING while we wait
1897   // for the health status notification.
1898   // If health checking is NOT enabled, report READY.
1899   if (p->enable_health_watch_) {
1900     GRPC_TRACE_LOG(pick_first, INFO)
1901         << "[PF " << p << "] starting health watch";
1902     auto watcher = std::make_unique<HealthWatcher>(
1903         p->RefAsSubclass<OldPickFirst>(DEBUG_LOCATION, "HealthWatcher"),
1904         subchannel_list_->resolution_note_);
1905     p->health_watcher_ = watcher.get();
1906     auto health_data_watcher = MakeHealthCheckWatcher(
1907         p->work_serializer(), subchannel_list_->args_, std::move(watcher));
1908     p->health_data_watcher_ = health_data_watcher.get();
1909     subchannel_->AddDataWatcher(std::move(health_data_watcher));
1910   } else {
1911     p->UpdateState(GRPC_CHANNEL_READY, absl::OkStatus(),
1912                    MakeRefCounted<Picker>(subchannel()->Ref()));
1913   }
1914   // Unref all other subchannels in the list.
1915   for (size_t i = 0; i < subchannel_list_->size(); ++i) {
1916     if (i != index_) {
1917       subchannel_list_->subchannels_[i].ShutdownLocked();
1918     }
1919   }
1920 }
1921 
1922 //
1923 // OldPickFirst::SubchannelList
1924 //
1925 
SubchannelList(RefCountedPtr<OldPickFirst> policy,EndpointAddressesIterator * addresses,const ChannelArgs & args,absl::string_view resolution_note)1926 OldPickFirst::SubchannelList::SubchannelList(
1927     RefCountedPtr<OldPickFirst> policy, EndpointAddressesIterator* addresses,
1928     const ChannelArgs& args, absl::string_view resolution_note)
1929     : InternallyRefCounted<SubchannelList>(
1930           GRPC_TRACE_FLAG_ENABLED(pick_first) ? "SubchannelList" : nullptr),
1931       policy_(std::move(policy)),
1932       args_(
1933           args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
1934               .Remove(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)),
1935       resolution_note_(resolution_note) {
1936   GRPC_TRACE_LOG(pick_first, INFO)
1937       << "[PF " << policy_.get() << "] Creating subchannel list " << this
1938       << " - channel args: " << args_.ToString();
1939   if (addresses == nullptr) return;
1940   // Create a subchannel for each address.
1941   addresses->ForEach([&](const EndpointAddresses& address) {
1942     CHECK_EQ(address.addresses().size(), 1u);
1943     RefCountedPtr<SubchannelInterface> subchannel =
1944         policy_->channel_control_helper()->CreateSubchannel(
1945             address.address(), address.args(), args_);
1946     if (subchannel == nullptr) {
1947       // Subchannel could not be created.
1948       GRPC_TRACE_LOG(pick_first, INFO)
1949           << "[PF " << policy_.get()
1950           << "] could not create subchannel for address " << address.ToString()
1951           << ", ignoring";
1952       return;
1953     }
1954     GRPC_TRACE_LOG(pick_first, INFO)
1955         << "[PF " << policy_.get() << "] subchannel list " << this << " index "
1956         << subchannels_.size() << ": Created subchannel " << subchannel.get()
1957         << " for address " << address.ToString();
1958     subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
1959   });
1960 }
1961 
~SubchannelList()1962 OldPickFirst::SubchannelList::~SubchannelList() {
1963   GRPC_TRACE_LOG(pick_first, INFO)
1964       << "[PF " << policy_.get() << "] Destroying subchannel_list " << this;
1965 }
1966 
Orphan()1967 void OldPickFirst::SubchannelList::Orphan() {
1968   GRPC_TRACE_LOG(pick_first, INFO)
1969       << "[PF " << policy_.get() << "] Shutting down subchannel_list " << this;
1970   CHECK(!shutting_down_);
1971   shutting_down_ = true;
1972   for (auto& sd : subchannels_) {
1973     sd.ShutdownLocked();
1974   }
1975   if (timer_handle_.has_value()) {
1976     policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
1977   }
1978   Unref();
1979 }
1980 
ResetBackoffLocked()1981 void OldPickFirst::SubchannelList::ResetBackoffLocked() {
1982   for (auto& sd : subchannels_) {
1983     sd.ResetBackoffLocked();
1984   }
1985 }
1986 
ReportTransientFailure(absl::Status status)1987 void OldPickFirst::SubchannelList::ReportTransientFailure(absl::Status status) {
1988   if (!resolution_note_.empty()) {
1989     status = absl::Status(status.code(), absl::StrCat(status.message(), " (",
1990                                                       resolution_note_, ")"));
1991   }
1992   policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1993                        MakeRefCounted<TransientFailurePicker>(status));
1994 }
1995 
StartConnectingNextSubchannel()1996 void OldPickFirst::SubchannelList::StartConnectingNextSubchannel() {
1997   // Find the next subchannel not in state TRANSIENT_FAILURE.
1998   // We skip subchannels in state TRANSIENT_FAILURE to avoid a
1999   // large recursion that could overflow the stack.
2000   for (; attempting_index_ < size(); ++attempting_index_) {
2001     SubchannelData* sc = &subchannels_[attempting_index_];
2002     CHECK(sc->connectivity_state().has_value());
2003     if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
2004       // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
2005       // connection attempt.
2006       sc->RequestConnectionWithTimer();
2007       return;
2008     }
2009   }
2010   // If we didn't find a subchannel to request a connection on, check to
2011   // see if the Happy Eyeballs pass is complete.
2012   MaybeFinishHappyEyeballsPass();
2013 }
2014 
MaybeFinishHappyEyeballsPass()2015 void OldPickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() {
2016   // Make sure all subchannels have finished a connection attempt before
2017   // we consider the Happy Eyeballs pass complete.
2018   if (!IsHappyEyeballsPassComplete()) return;
2019   // We didn't find another subchannel not in state TRANSIENT_FAILURE,
2020   // so report TRANSIENT_FAILURE and switch to a mode in which we try to
2021   // connect to all addresses in parallel.
2022   GRPC_TRACE_LOG(pick_first, INFO)
2023       << "Pick First " << policy_.get() << " subchannel list " << this
2024       << " failed to connect to all subchannels";
2025   // In case 2, swap to the new subchannel list.  This means reporting
2026   // TRANSIENT_FAILURE and dropping the existing (working) connection,
2027   // but we can't ignore what the control plane has told us.
2028   if (policy_->latest_pending_subchannel_list_.get() == this) {
2029     GRPC_TRACE_LOG(pick_first, INFO)
2030         << "Pick First " << policy_.get()
2031         << " promoting pending subchannel list "
2032         << policy_->latest_pending_subchannel_list_.get() << " to replace "
2033         << this;
2034     policy_->UnsetSelectedSubchannel();
2035     policy_->subchannel_list_ =
2036         std::move(policy_->latest_pending_subchannel_list_);
2037   }
2038   // If this is the current subchannel list (either because we were
2039   // in case 1 or because we were in case 2 and just promoted it to
2040   // be the current list), re-resolve and report new state.
2041   if (policy_->subchannel_list_.get() == this) {
2042     policy_->channel_control_helper()->RequestReresolution();
2043     absl::Status status = absl::UnavailableError(
2044         absl::StrCat((policy_->omit_status_message_prefix_
2045                           ? ""
2046                           : "failed to connect to all addresses; last error: "),
2047                      last_failure_.ToString()));
2048     ReportTransientFailure(std::move(status));
2049   }
2050   // We now transition into a mode where we try to connect to all
2051   // subchannels in parallel.  For any subchannel currently in IDLE,
2052   // trigger a connection attempt.  For any subchannel not currently in
2053   // IDLE, we will trigger a connection attempt when it does report IDLE.
2054   for (SubchannelData& sd : subchannels_) {
2055     if (sd.connectivity_state() == GRPC_CHANNEL_IDLE) {
2056       sd.RequestConnection();
2057     }
2058   }
2059 }
2060 
//
// factory
//
2064 
2065 class PickFirstFactory final : public LoadBalancingPolicyFactory {
2066  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const2067   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2068       LoadBalancingPolicy::Args args) const override {
2069     if (!IsPickFirstNewEnabled()) {
2070       return MakeOrphanable<OldPickFirst>(std::move(args));
2071     }
2072     return MakeOrphanable<PickFirst>(std::move(args));
2073   }
2074 
name() const2075   absl::string_view name() const override { return kPickFirst; }
2076 
2077   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const2078   ParseLoadBalancingConfig(const Json& json) const override {
2079     return LoadFromJson<RefCountedPtr<PickFirstConfig>>(
2080         json, JsonArgs(), "errors validating pick_first LB policy config");
2081   }
2082 };
2083 
2084 }  // namespace
2085 
RegisterPickFirstLbPolicy(CoreConfiguration::Builder * builder)2086 void RegisterPickFirstLbPolicy(CoreConfiguration::Builder* builder) {
2087   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
2088       std::make_unique<PickFirstFactory>());
2089 }
2090 
2091 }  // namespace grpc_core
2092