• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <string.h>
22 
23 #include <grpc/support/alloc.h>
24 
25 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
27 #include "src/core/ext/filters/client_channel/subchannel.h"
28 #include "src/core/ext/filters/client_channel/subchannel_index.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/mutex_lock.h"
31 #include "src/core/lib/iomgr/combiner.h"
32 #include "src/core/lib/iomgr/sockaddr_utils.h"
33 #include "src/core/lib/transport/connectivity_state.h"
34 
35 namespace grpc_core {
36 
// Trace flag consulted by the pick_first policy before emitting its
// gpr_log() diagnostics. Constructed with the value `false` and the name
// "pick_first".
37 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
38 
39 namespace {
40 
41 //
42 // pick_first LB policy
43 //
44 
// pick_first policy: a LoadBalancingPolicy subclass that keeps a current
// subchannel list, an optional pending replacement list, and at most one
// selected subchannel. Picks that cannot be served yet are queued on
// pending_picks_ until a subchannel is selected, the pick is cancelled, or
// the policy shuts down.
45 class PickFirst : public LoadBalancingPolicy {
46  public:
47   explicit PickFirst(const Args& args);
48 
49   void UpdateLocked(const grpc_channel_args& args) override;
50   bool PickLocked(PickState* pick, grpc_error** error) override;
51   void CancelPickLocked(PickState* pick, grpc_error* error) override;
52   void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
53                                  uint32_t initial_metadata_flags_eq,
54                                  grpc_error* error) override;
55   void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
56                                  grpc_closure* closure) override;
57   grpc_connectivity_state CheckConnectivityLocked(
58       grpc_error** connectivity_error) override;
59   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
60   void ExitIdleLocked() override;
61   void ResetBackoffLocked() override;
62   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
63                                 ChildRefsList* ignored) override;
64 
65  private:
  // Private destructor: the class cannot be destroyed directly by outside
  // code.
66   ~PickFirst();
67 
68   class PickFirstSubchannelList;
69 
  // Per-subchannel entry of a PickFirstSubchannelList. The constructor only
  // forwards its arguments to the SubchannelData base.
70   class PickFirstSubchannelData
71       : public SubchannelData<PickFirstSubchannelList,
72                               PickFirstSubchannelData> {
73    public:
PickFirstSubchannelData(SubchannelList<PickFirstSubchannelList,PickFirstSubchannelData> * subchannel_list,const grpc_lb_user_data_vtable * user_data_vtable,const grpc_lb_address & address,grpc_subchannel * subchannel,grpc_combiner * combiner)74     PickFirstSubchannelData(
75         SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
76             subchannel_list,
77         const grpc_lb_user_data_vtable* user_data_vtable,
78         const grpc_lb_address& address, grpc_subchannel* subchannel,
79         grpc_combiner* combiner)
80         : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
81                          combiner) {}
82 
    // Reacts to a connectivity-state notification for this subchannel.
83     void ProcessConnectivityChangeLocked(
84         grpc_connectivity_state connectivity_state, grpc_error* error) override;
85 
86     // Processes the connectivity change to READY for an unselected subchannel.
87     void ProcessUnselectedReadyLocked();
88 
    // Handles a subchannel that is already READY (when it is not the selected
    // one) and then starts the connectivity watch.
89     void CheckConnectivityStateAndStartWatchingLocked();
90   };
91 
  // List of PickFirstSubchannelData entries. Holds a reference on the owning
  // policy from construction until destruction.
92   class PickFirstSubchannelList
93       : public SubchannelList<PickFirstSubchannelList,
94                               PickFirstSubchannelData> {
95    public:
PickFirstSubchannelList(PickFirst * policy,TraceFlag * tracer,const grpc_lb_addresses * addresses,grpc_combiner * combiner,grpc_client_channel_factory * client_channel_factory,const grpc_channel_args & args)96     PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
97                             const grpc_lb_addresses* addresses,
98                             grpc_combiner* combiner,
99                             grpc_client_channel_factory* client_channel_factory,
100                             const grpc_channel_args& args)
101         : SubchannelList(policy, tracer, addresses, combiner,
102                          client_channel_factory, args) {
103       // Need to maintain a ref to the LB policy as long as we maintain
104       // any references to subchannels, since the subchannels'
105       // pollset_sets will include the LB policy's pollset_set.
      // The returned smart pointer is release()d so the ref is not dropped
      // here; the matching Unref() is in the destructor below.
106       policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
107     }
108 
~PickFirstSubchannelList()109     ~PickFirstSubchannelList() {
110       PickFirst* p = static_cast<PickFirst*>(policy());
111       p->Unref(DEBUG_LOCATION, "subchannel_list");
112     }
113   };
114 
115   // Helper class to ensure that any function that modifies the child refs
116   // data structures will update the channelz snapshot data structures before
117   // returning.
  // The update happens in the destructor, i.e. when the guard goes out of
  // scope in the enclosing function.
118   class AutoChildRefsUpdater {
119    public:
AutoChildRefsUpdater(PickFirst * pf)120     explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
~AutoChildRefsUpdater()121     ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
122 
123    private:
124     PickFirst* pf_;
125   };
126 
127   void ShutdownLocked() override;
128 
  // Marks picking as started and begins connecting via the first subchannel
  // of the current list, if there is one.
129   void StartPickingLocked();
  // Rebuilds the channelz snapshot (child_subchannels_) from the current and
  // latest pending subchannel lists.
130   void UpdateChildRefsLocked();
131 
132   // All our subchannels.
133   OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
134   // Latest pending subchannel list.
135   OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
136   // Selected subchannel in \a subchannel_list_.
137   PickFirstSubchannelData* selected_ = nullptr;
138   // Have we started picking?
139   bool started_picking_ = false;
140   // Are we shut down?
141   bool shutdown_ = false;
142   // List of picks that are waiting on connectivity.
  // Singly linked through PickState::next; new picks are pushed at the head.
143   PickState* pending_picks_ = nullptr;
144   // Our connectivity state tracker.
145   grpc_connectivity_state_tracker state_tracker_;
146 
147   /// Lock and data used to capture snapshots of this channel's child
148   /// channels and subchannels. This data is consumed by channelz.
149   gpr_mu child_refs_mu_;
150   ChildRefsList child_subchannels_;
151   ChildRefsList child_channels_;
152 };
153 
// Constructs the policy from `args`. Asserts that a client channel factory
// was supplied, initializes the channelz mutex and the connectivity tracker
// (initial state GRPC_CHANNEL_IDLE), applies the initial channel args through
// UpdateLocked(), and then calls grpc_subchannel_index_ref(), which is paired
// with grpc_subchannel_index_unref() in the destructor.
PickFirst(const Args & args)154 PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
155   GPR_ASSERT(args.client_channel_factory != nullptr);
156   gpr_mu_init(&child_refs_mu_);
157   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
158                                "pick_first");
159   if (grpc_lb_pick_first_trace.enabled()) {
160     gpr_log(GPR_INFO, "Pick First %p created.", this);
161   }
  // UpdateLocked() uses child_refs_mu_ and state_tracker_, so both must be
  // initialized before this call.
162   UpdateLocked(*args.args);
163   grpc_subchannel_index_ref();
164 }
165 
// Destroys the policy. Asserts that both subchannel lists have already been
// released and that no picks are still queued (ShutdownLocked() clears all
// three), then tears down the mutex and connectivity tracker and drops the
// subchannel-index reference taken in the constructor.
~PickFirst()166 PickFirst::~PickFirst() {
167   if (grpc_lb_pick_first_trace.enabled()) {
168     gpr_log(GPR_INFO, "Destroying Pick First %p", this);
169   }
170   gpr_mu_destroy(&child_refs_mu_);
171   GPR_ASSERT(subchannel_list_ == nullptr);
172   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
173   GPR_ASSERT(pending_picks_ == nullptr);
174   grpc_connectivity_state_destroy(&state_tracker_);
175   grpc_subchannel_index_unref();
176 }
177 
HandOffPendingPicksLocked(LoadBalancingPolicy * new_policy)178 void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
179   PickState* pick;
180   while ((pick = pending_picks_) != nullptr) {
181     pending_picks_ = pick->next;
182     grpc_error* error = GRPC_ERROR_NONE;
183     if (new_policy->PickLocked(pick, &error)) {
184       // Synchronous return, schedule closure.
185       GRPC_CLOSURE_SCHED(pick->on_complete, error);
186     }
187   }
188 }
189 
ShutdownLocked()190 void PickFirst::ShutdownLocked() {
191   AutoChildRefsUpdater guard(this);
192   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
193   if (grpc_lb_pick_first_trace.enabled()) {
194     gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
195   }
196   shutdown_ = true;
197   PickState* pick;
198   while ((pick = pending_picks_) != nullptr) {
199     pending_picks_ = pick->next;
200     pick->connected_subchannel.reset();
201     GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
202   }
203   grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
204                               GRPC_ERROR_REF(error), "shutdown");
205   subchannel_list_.reset();
206   latest_pending_subchannel_list_.reset();
207   TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
208   GRPC_ERROR_UNREF(error);
209 }
210 
CancelPickLocked(PickState * pick,grpc_error * error)211 void PickFirst::CancelPickLocked(PickState* pick, grpc_error* error) {
212   PickState* pp = pending_picks_;
213   pending_picks_ = nullptr;
214   while (pp != nullptr) {
215     PickState* next = pp->next;
216     if (pp == pick) {
217       pick->connected_subchannel.reset();
218       GRPC_CLOSURE_SCHED(pick->on_complete,
219                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
220                              "Pick Cancelled", &error, 1));
221     } else {
222       pp->next = pending_picks_;
223       pending_picks_ = pp;
224     }
225     pp = next;
226   }
227   GRPC_ERROR_UNREF(error);
228 }
229 
CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,uint32_t initial_metadata_flags_eq,grpc_error * error)230 void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
231                                           uint32_t initial_metadata_flags_eq,
232                                           grpc_error* error) {
233   PickState* pick = pending_picks_;
234   pending_picks_ = nullptr;
235   while (pick != nullptr) {
236     PickState* next = pick->next;
237     if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
238         initial_metadata_flags_eq) {
239       GRPC_CLOSURE_SCHED(pick->on_complete,
240                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
241                              "Pick Cancelled", &error, 1));
242     } else {
243       pick->next = pending_picks_;
244       pending_picks_ = pick;
245     }
246     pick = next;
247   }
248   GRPC_ERROR_UNREF(error);
249 }
250 
StartPickingLocked()251 void PickFirst::StartPickingLocked() {
252   started_picking_ = true;
253   if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
254     subchannel_list_->subchannel(0)
255         ->CheckConnectivityStateAndStartWatchingLocked();
256   }
257 }
258 
ExitIdleLocked()259 void PickFirst::ExitIdleLocked() {
260   if (!started_picking_) {
261     StartPickingLocked();
262   }
263 }
264 
ResetBackoffLocked()265 void PickFirst::ResetBackoffLocked() {
266   subchannel_list_->ResetBackoffLocked();
267   if (latest_pending_subchannel_list_ != nullptr) {
268     latest_pending_subchannel_list_->ResetBackoffLocked();
269   }
270 }
271 
PickLocked(PickState * pick,grpc_error ** error)272 bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
273   // If we have a selected subchannel already, return synchronously.
274   if (selected_ != nullptr) {
275     pick->connected_subchannel = selected_->connected_subchannel()->Ref();
276     return true;
277   }
278   // No subchannel selected yet, so handle asynchronously.
279   if (pick->on_complete == nullptr) {
280     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
281         "No pick result available but synchronous result required.");
282     return true;
283   }
284   pick->next = pending_picks_;
285   pending_picks_ = pick;
286   if (!started_picking_) {
287     StartPickingLocked();
288   }
289   return false;
290 }
291 
CheckConnectivityLocked(grpc_error ** error)292 grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
293   return grpc_connectivity_state_get(&state_tracker_, error);
294 }
295 
// Registers `notify` with the policy's connectivity tracker. Both arguments
// are forwarded unchanged to grpc_connectivity_state_notify_on_state_change().
NotifyOnStateChangeLocked(grpc_connectivity_state * current,grpc_closure * notify)296 void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
297                                           grpc_closure* notify) {
298   grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
299                                                  notify);
300 }
301 
FillChildRefsForChannelz(ChildRefsList * child_subchannels_to_fill,ChildRefsList * ignored)302 void PickFirst::FillChildRefsForChannelz(
303     ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
304   MutexLock lock(&child_refs_mu_);
305   for (size_t i = 0; i < child_subchannels_.size(); ++i) {
306     // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
307     // have to implement lightweight set. For now, we don't care about
308     // performance when channelz requests are made.
309     bool found = false;
310     for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
311       if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
312         found = true;
313         break;
314       }
315     }
316     if (!found) {
317       child_subchannels_to_fill->push_back(child_subchannels_[i]);
318     }
319   }
320 }
321 
UpdateChildRefsLocked()322 void PickFirst::UpdateChildRefsLocked() {
323   ChildRefsList cs;
324   if (subchannel_list_ != nullptr) {
325     subchannel_list_->PopulateChildRefsList(&cs);
326   }
327   if (latest_pending_subchannel_list_ != nullptr) {
328     latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
329   }
330   // atomically update the data that channelz will actually be looking at.
331   MutexLock lock(&child_refs_mu_);
332   child_subchannels_ = std::move(cs);
333 }
334 
// Applies a new set of channel args (normally a resolver update).
//
// Outcomes, in the order they are checked:
//  1. `args` has no pointer-typed GRPC_ARG_LB_ADDRESSES entry: report
//     TRANSIENT_FAILURE if there is no current subchannel list, otherwise log
//     an error and ignore the update.
//  2. The subchannel list built from the addresses is empty: report
//     TRANSIENT_FAILURE, install the empty list as current, clear the
//     selection.
//  3. No subchannel is selected: the new list replaces the current one at
//     once; if picking has started, its first subchannel is checked/watched.
//  4. A subchannel is selected: if the new list contains the same subchannel
//     and it reports READY, the new list becomes current and any pending list
//     is dropped. Otherwise the new list becomes the latest pending list.
//
// The AutoChildRefsUpdater guard refreshes the channelz snapshot on every
// return path.
UpdateLocked(const grpc_channel_args & args)335 void PickFirst::UpdateLocked(const grpc_channel_args& args) {
336   AutoChildRefsUpdater guard(this);
337   const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
338   if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
339     if (subchannel_list_ == nullptr) {
340       // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
341       grpc_connectivity_state_set(
342           &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
343           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
344           "pf_update_missing");
345     } else {
346       // otherwise, keep using the current subchannel list (ignore this update).
347       gpr_log(GPR_ERROR,
348               "No valid LB addresses channel arg for Pick First %p update, "
349               "ignoring.",
350               this);
351     }
352     return;
353   }
354   const grpc_lb_addresses* addresses =
355       static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
356   if (grpc_lb_pick_first_trace.enabled()) {
357     gpr_log(GPR_INFO,
358             "Pick First %p received update with %" PRIuPTR " addresses", this,
359             addresses->num_addresses);
360   }
  // Build the candidate list up front; which member it ends up in (current
  // or latest pending) is decided below.
361   auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
362       this, &grpc_lb_pick_first_trace, addresses, combiner(),
363       client_channel_factory(), args);
364   if (subchannel_list->num_subchannels() == 0) {
365     // Empty update or no valid subchannels. Unsubscribe from all current
366     // subchannels and put the channel in TRANSIENT_FAILURE.
367     grpc_connectivity_state_set(
368         &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
369         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
370         "pf_update_empty");
371     subchannel_list_ = std::move(subchannel_list);  // Empty list.
372     selected_ = nullptr;
373     return;
374   }
375   if (selected_ == nullptr) {
376     // We don't yet have a selected subchannel, so replace the current
377     // subchannel list immediately.
378     subchannel_list_ = std::move(subchannel_list);
379     // If we've started picking, start trying to connect to the first
380     // subchannel in the new list.
381     if (started_picking_) {
382       subchannel_list_->subchannel(0)
383           ->CheckConnectivityStateAndStartWatchingLocked();
384     }
385   } else {
386     // We do have a selected subchannel.
387     // Check if it's present in the new list.  If so, we're done.
388     for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
389       PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
390       if (sd->subchannel() == selected_->subchannel()) {
391         // The currently selected subchannel is in the update: we are done.
392         if (grpc_lb_pick_first_trace.enabled()) {
393           gpr_log(GPR_INFO,
394                   "Pick First %p found already selected subchannel %p "
395                   "at update index %" PRIuPTR " of %" PRIuPTR "; update done",
396                   this, selected_->subchannel(), i,
397                   subchannel_list->num_subchannels());
398         }
399         // Make sure it's in state READY.  It might not be if we grabbed
400         // the combiner while a connectivity state notification
401         // informing us otherwise is pending.
402         // Note that CheckConnectivityStateLocked() also takes a ref to
403         // the connected subchannel.
404         grpc_error* error = GRPC_ERROR_NONE;
405         if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
          // Repoint selected_ at the entry in the new list before the old
          // list (which holds the previous selected_ entry) is replaced.
406           selected_ = sd;
407           subchannel_list_ = std::move(subchannel_list);
408           sd->StartConnectivityWatchLocked();
409           // If there was a previously pending update (which may or may
410           // not have contained the currently selected subchannel), drop
411           // it, so that it doesn't override what we've done here.
412           latest_pending_subchannel_list_.reset();
413           return;
414         }
415         GRPC_ERROR_UNREF(error);
416       }
417     }
418     // Not keeping the previous selected subchannel, so set the latest
419     // pending subchannel list to the new subchannel list.  We will wait
420     // for it to report READY before swapping it into the current
421     // subchannel list.
422     if (latest_pending_subchannel_list_ != nullptr) {
423       if (grpc_lb_pick_first_trace.enabled()) {
424         gpr_log(GPR_INFO,
425                 "Pick First %p Shutting down latest pending subchannel list "
426                 "%p, about to be replaced by newer latest %p",
427                 this, latest_pending_subchannel_list_.get(),
428                 subchannel_list.get());
429       }
430     }
431     latest_pending_subchannel_list_ = std::move(subchannel_list);
432     // If we've started picking, start trying to connect to the first
433     // subchannel in the new list.
434     if (started_picking_) {
435       latest_pending_subchannel_list_->subchannel(0)
436           ->CheckConnectivityStateAndStartWatchingLocked();
437     }
438   }
439 }
440 
// Handles a connectivity-state notification for this subchannel.
//
// Two main paths:
//  * This subchannel is the selected one. A non-READY state with a pending
//    list available promotes the pending list and reports TRANSIENT_FAILURE.
//    TRANSIENT_FAILURE without a pending list drops the selection, moves the
//    policy to IDLE and requests re-resolution. Any other state is forwarded
//    to the policy's tracker and the watch is renewed.
//  * This subchannel is not selected. READY selects it. TRANSIENT_FAILURE
//    moves on to the next subchannel in the list, wrapping around. CONNECTING
//    and IDLE keep watching.
//
// `error` is unreffed before returning on both paths. The
// AutoChildRefsUpdater guard refreshes the policy's channelz snapshot on
// exit.
ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state,grpc_error * error)441 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
442     grpc_connectivity_state connectivity_state, grpc_error* error) {
443   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
444   AutoChildRefsUpdater guard(p);
445   // The notification must be for a subchannel in either the current or
446   // latest pending subchannel lists.
447   GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
448              subchannel_list() == p->latest_pending_subchannel_list_.get());
449   GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
450   // Handle updates for the currently selected subchannel.
451   if (p->selected_ == this) {
452     if (grpc_lb_pick_first_trace.enabled()) {
453       gpr_log(GPR_INFO,
454               "Pick First %p connectivity changed for selected subchannel", p);
455     }
456     // If the new state is anything other than READY and there is a
457     // pending update, switch to the pending update.
458     if (connectivity_state != GRPC_CHANNEL_READY &&
459         p->latest_pending_subchannel_list_ != nullptr) {
460       if (grpc_lb_pick_first_trace.enabled()) {
461         gpr_log(GPR_INFO,
462                 "Pick First %p promoting pending subchannel list %p to "
463                 "replace %p",
464                 p, p->latest_pending_subchannel_list_.get(),
465                 p->subchannel_list_.get());
466       }
      // Clear the selection and stop this watch before the list that owns
      // this entry is replaced by the pending list.
467       p->selected_ = nullptr;
468       StopConnectivityWatchLocked();
469       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
470       grpc_connectivity_state_set(
471           &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
472           error != GRPC_ERROR_NONE
473               ? GRPC_ERROR_REF(error)
474               : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
475                     "selected subchannel not ready; switching to pending "
476                     "update"),
477           "selected_not_ready+switch_to_update");
478     } else {
479       if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
480         // If the selected subchannel goes bad, request a re-resolution. We also
481         // set the channel state to IDLE and reset started_picking_. The reason
482         // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
483         // reception we don't want to connect to the re-resolved backends until
484         // we leave the IDLE state.
485         grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
486                                     GRPC_ERROR_NONE,
487                                     "selected_changed+reresolve");
488         p->started_picking_ = false;
489         p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
490         // In transient failure. Rely on re-resolution to recover.
491         p->selected_ = nullptr;
492         StopConnectivityWatchLocked();
493       } else {
494         grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
495                                     GRPC_ERROR_REF(error), "selected_changed");
496         // Renew notification.
497         RenewConnectivityWatchLocked();
498       }
499     }
500     GRPC_ERROR_UNREF(error);
501     return;
502   }
503   // If we get here, there are two possible cases:
504   // 1. We do not currently have a selected subchannel, and the update is
505   //    for a subchannel in p->subchannel_list_ that we're trying to
506   //    connect to.  The goal here is to find a subchannel that we can
507   //    select.
508   // 2. We do currently have a selected subchannel, and the update is
509   //    for a subchannel in p->latest_pending_subchannel_list_.  The
510   //    goal here is to find a subchannel from the update that we can
511   //    select in place of the current one.
512   switch (connectivity_state) {
513     case GRPC_CHANNEL_READY: {
514       ProcessUnselectedReadyLocked();
515       // Renew notification.
516       RenewConnectivityWatchLocked();
517       break;
518     }
519     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
520       StopConnectivityWatchLocked();
521       PickFirstSubchannelData* sd = this;
      // Advance to the next subchannel, wrapping back to index 0 after the
      // last one.
522       size_t next_index =
523           (sd->Index() + 1) % subchannel_list()->num_subchannels();
524       sd = subchannel_list()->subchannel(next_index);
525       // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
526       // all subchannels.
      // Index 0 here means the walk wrapped around the whole list.
527       if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
528         p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
529         grpc_connectivity_state_set(
530             &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
531             GRPC_ERROR_REF(error), "exhausted_subchannels");
532       }
533       sd->CheckConnectivityStateAndStartWatchingLocked();
534       break;
535     }
536     case GRPC_CHANNEL_CONNECTING:
537     case GRPC_CHANNEL_IDLE: {
538       // Only update connectivity state in case 1.
539       if (subchannel_list() == p->subchannel_list_.get()) {
540         grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
541                                     GRPC_ERROR_REF(error),
542                                     "connecting_changed");
543       }
544       // Renew notification.
545       RenewConnectivityWatchLocked();
546       break;
547     }
    // Excluded by the assertion at the top of the function.
548     case GRPC_CHANNEL_SHUTDOWN:
549       GPR_UNREACHABLE_CODE(break);
550   }
551   GRPC_ERROR_UNREF(error);
552 }
553 
ProcessUnselectedReadyLocked()554 void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
555   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
556   // If we get here, there are two possible cases:
557   // 1. We do not currently have a selected subchannel, and the update is
558   //    for a subchannel in p->subchannel_list_ that we're trying to
559   //    connect to.  The goal here is to find a subchannel that we can
560   //    select.
561   // 2. We do currently have a selected subchannel, and the update is
562   //    for a subchannel in p->latest_pending_subchannel_list_.  The
563   //    goal here is to find a subchannel from the update that we can
564   //    select in place of the current one.
565   GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
566              subchannel_list() == p->latest_pending_subchannel_list_.get());
567   // Case 2.  Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
568   if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
569     if (grpc_lb_pick_first_trace.enabled()) {
570       gpr_log(GPR_INFO,
571               "Pick First %p promoting pending subchannel list %p to "
572               "replace %p",
573               p, p->latest_pending_subchannel_list_.get(),
574               p->subchannel_list_.get());
575     }
576     p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
577   }
578   // Cases 1 and 2.
579   grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
580                               GRPC_ERROR_NONE, "subchannel_ready");
581   p->selected_ = this;
582   if (grpc_lb_pick_first_trace.enabled()) {
583     gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
584   }
585   // Update any calls that were waiting for a pick.
586   PickState* pick;
587   while ((pick = p->pending_picks_)) {
588     p->pending_picks_ = pick->next;
589     pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
590     if (grpc_lb_pick_first_trace.enabled()) {
591       gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
592               p->selected_->subchannel());
593     }
594     GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
595   }
596 }
597 
598 void PickFirst::PickFirstSubchannelData::
CheckConnectivityStateAndStartWatchingLocked()599     CheckConnectivityStateAndStartWatchingLocked() {
600   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
601   grpc_error* error = GRPC_ERROR_NONE;
602   if (p->selected_ != this &&
603       CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
604     // We must process the READY subchannel before we start watching it.
605     // Otherwise, we won't know it's READY because we will be waiting for its
606     // connectivity state to change from READY.
607     ProcessUnselectedReadyLocked();
608   }
609   GRPC_ERROR_UNREF(error);
610   StartConnectivityWatchLocked();
611 }
612 
613 //
614 // factory
615 //
616 
617 class PickFirstFactory : public LoadBalancingPolicyFactory {
618  public:
CreateLoadBalancingPolicy(const LoadBalancingPolicy::Args & args) const619   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
620       const LoadBalancingPolicy::Args& args) const override {
621     return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
622   }
623 
name() const624   const char* name() const override { return "pick_first"; }
625 };
626 
627 }  // namespace
628 
629 }  // namespace grpc_core
630 
grpc_lb_policy_pick_first_init()631 void grpc_lb_policy_pick_first_init() {
632   grpc_core::LoadBalancingPolicyRegistry::Builder::
633       RegisterLoadBalancingPolicyFactory(
634           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
635               grpc_core::New<grpc_core::PickFirstFactory>()));
636 }
637 
// Shutdown counterpart of grpc_lb_policy_pick_first_init(). Intentionally
// empty: this function performs no work.
grpc_lb_policy_pick_first_shutdown()638 void grpc_lb_policy_pick_first_shutdown() {}
639