• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
/** Round Robin Policy.
 *
 * Before every pick, GetNextReadySubchannelIndexLocked() returns the index
 * (within the policy's current subchannel list) of the next READY subchannel,
 * respecting the relative order of the addresses provided upon creation or
 * updates. Note, however, that after an update, picking starts over from the
 * beginning of the updated list. */
26 
27 #include <grpc/support/port_platform.h>
28 
29 #include <string.h>
30 
31 #include <grpc/support/alloc.h>
32 
33 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
34 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
35 #include "src/core/ext/filters/client_channel/subchannel.h"
36 #include "src/core/ext/filters/client_channel/subchannel_index.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/mutex_lock.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/combiner.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44 #include "src/core/lib/transport/static_metadata.h"
45 
46 namespace grpc_core {
47 
48 TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
49 
50 namespace {
51 
52 //
53 // round_robin LB policy
54 //
55 
56 class RoundRobin : public LoadBalancingPolicy {
57  public:
58   explicit RoundRobin(const Args& args);
59 
60   void UpdateLocked(const grpc_channel_args& args) override;
61   bool PickLocked(PickState* pick, grpc_error** error) override;
62   void CancelPickLocked(PickState* pick, grpc_error* error) override;
63   void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
64                                  uint32_t initial_metadata_flags_eq,
65                                  grpc_error* error) override;
66   void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
67                                  grpc_closure* closure) override;
68   grpc_connectivity_state CheckConnectivityLocked(
69       grpc_error** connectivity_error) override;
70   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
71   void ExitIdleLocked() override;
72   void ResetBackoffLocked() override;
73   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
74                                 ChildRefsList* ignored) override;
75 
76  private:
77   ~RoundRobin();
78 
79   // Forward declaration.
80   class RoundRobinSubchannelList;
81 
82   // Data for a particular subchannel in a subchannel list.
83   // This subclass adds the following functionality:
84   // - Tracks user_data associated with each address, which will be
85   //   returned along with picks that select the subchannel.
86   // - Tracks the previous connectivity state of the subchannel, so that
87   //   we know how many subchannels are in each state.
88   class RoundRobinSubchannelData
89       : public SubchannelData<RoundRobinSubchannelList,
90                               RoundRobinSubchannelData> {
91    public:
RoundRobinSubchannelData(SubchannelList<RoundRobinSubchannelList,RoundRobinSubchannelData> * subchannel_list,const grpc_lb_user_data_vtable * user_data_vtable,const grpc_lb_address & address,grpc_subchannel * subchannel,grpc_combiner * combiner)92     RoundRobinSubchannelData(
93         SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
94             subchannel_list,
95         const grpc_lb_user_data_vtable* user_data_vtable,
96         const grpc_lb_address& address, grpc_subchannel* subchannel,
97         grpc_combiner* combiner)
98         : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
99                          combiner),
100           user_data_vtable_(user_data_vtable),
101           user_data_(user_data_vtable_ != nullptr
102                          ? user_data_vtable_->copy(address.user_data)
103                          : nullptr) {}
104 
UnrefSubchannelLocked(const char * reason)105     void UnrefSubchannelLocked(const char* reason) override {
106       SubchannelData::UnrefSubchannelLocked(reason);
107       if (user_data_ != nullptr) {
108         GPR_ASSERT(user_data_vtable_ != nullptr);
109         user_data_vtable_->destroy(user_data_);
110         user_data_ = nullptr;
111       }
112     }
113 
user_data() const114     void* user_data() const { return user_data_; }
115 
connectivity_state() const116     grpc_connectivity_state connectivity_state() const {
117       return last_connectivity_state_;
118     }
119 
120     void UpdateConnectivityStateLocked(
121         grpc_connectivity_state connectivity_state, grpc_error* error);
122 
123    private:
124     void ProcessConnectivityChangeLocked(
125         grpc_connectivity_state connectivity_state, grpc_error* error) override;
126 
127     const grpc_lb_user_data_vtable* user_data_vtable_;
128     void* user_data_ = nullptr;
129     grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
130   };
131 
132   // A list of subchannels.
133   class RoundRobinSubchannelList
134       : public SubchannelList<RoundRobinSubchannelList,
135                               RoundRobinSubchannelData> {
136    public:
RoundRobinSubchannelList(RoundRobin * policy,TraceFlag * tracer,const grpc_lb_addresses * addresses,grpc_combiner * combiner,grpc_client_channel_factory * client_channel_factory,const grpc_channel_args & args)137     RoundRobinSubchannelList(
138         RoundRobin* policy, TraceFlag* tracer,
139         const grpc_lb_addresses* addresses, grpc_combiner* combiner,
140         grpc_client_channel_factory* client_channel_factory,
141         const grpc_channel_args& args)
142         : SubchannelList(policy, tracer, addresses, combiner,
143                          client_channel_factory, args),
144           last_ready_index_(num_subchannels() - 1) {
145       // Need to maintain a ref to the LB policy as long as we maintain
146       // any references to subchannels, since the subchannels'
147       // pollset_sets will include the LB policy's pollset_set.
148       policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
149     }
150 
~RoundRobinSubchannelList()151     ~RoundRobinSubchannelList() {
152       GRPC_ERROR_UNREF(last_transient_failure_error_);
153       RoundRobin* p = static_cast<RoundRobin*>(policy());
154       p->Unref(DEBUG_LOCATION, "subchannel_list");
155     }
156 
157     // Starts watching the subchannels in this list.
158     void StartWatchingLocked();
159 
160     // Updates the counters of subchannels in each state when a
161     // subchannel transitions from old_state to new_state.
162     // transient_failure_error is the error that is reported when
163     // new_state is TRANSIENT_FAILURE.
164     void UpdateStateCountersLocked(grpc_connectivity_state old_state,
165                                    grpc_connectivity_state new_state,
166                                    grpc_error* transient_failure_error);
167 
168     // If this subchannel list is the RR policy's current subchannel
169     // list, updates the RR policy's connectivity state based on the
170     // subchannel list's state counters.
171     void MaybeUpdateRoundRobinConnectivityStateLocked();
172 
173     // Updates the RR policy's overall state based on the counters of
174     // subchannels in each state.
175     void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
176 
177     size_t GetNextReadySubchannelIndexLocked();
178     void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
179 
180    private:
181     size_t num_ready_ = 0;
182     size_t num_connecting_ = 0;
183     size_t num_transient_failure_ = 0;
184     grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
185     size_t last_ready_index_;  // Index into list of last pick.
186   };
187 
188   // Helper class to ensure that any function that modifies the child refs
189   // data structures will update the channelz snapshot data structures before
190   // returning.
191   class AutoChildRefsUpdater {
192    public:
AutoChildRefsUpdater(RoundRobin * rr)193     explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
~AutoChildRefsUpdater()194     ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
195 
196    private:
197     RoundRobin* rr_;
198   };
199 
200   void ShutdownLocked() override;
201 
202   void StartPickingLocked();
203   bool DoPickLocked(PickState* pick);
204   void DrainPendingPicksLocked();
205   void UpdateChildRefsLocked();
206 
207   /** list of subchannels */
208   OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
209   /** Latest version of the subchannel list.
210    * Subchannel connectivity callbacks will only promote updated subchannel
211    * lists if they equal \a latest_pending_subchannel_list. In other words,
212    * racing callbacks that reference outdated subchannel lists won't perform any
213    * update. */
214   OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
215   /** have we started picking? */
216   bool started_picking_ = false;
217   /** are we shutting down? */
218   bool shutdown_ = false;
219   /** List of picks that are waiting on connectivity */
220   PickState* pending_picks_ = nullptr;
221   /** our connectivity state tracker */
222   grpc_connectivity_state_tracker state_tracker_;
223   /// Lock and data used to capture snapshots of this channel's child
224   /// channels and subchannels. This data is consumed by channelz.
225   gpr_mu child_refs_mu_;
226   ChildRefsList child_subchannels_;
227   ChildRefsList child_channels_;
228 };
229 
RoundRobin(const Args & args)230 RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
231   GPR_ASSERT(args.client_channel_factory != nullptr);
232   gpr_mu_init(&child_refs_mu_);
233   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
234                                "round_robin");
235   UpdateLocked(*args.args);
236   if (grpc_lb_round_robin_trace.enabled()) {
237     gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this,
238             subchannel_list_->num_subchannels());
239   }
240   grpc_subchannel_index_ref();
241 }
242 
~RoundRobin()243 RoundRobin::~RoundRobin() {
244   if (grpc_lb_round_robin_trace.enabled()) {
245     gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
246   }
247   gpr_mu_destroy(&child_refs_mu_);
248   GPR_ASSERT(subchannel_list_ == nullptr);
249   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
250   GPR_ASSERT(pending_picks_ == nullptr);
251   grpc_connectivity_state_destroy(&state_tracker_);
252   grpc_subchannel_index_unref();
253 }
254 
HandOffPendingPicksLocked(LoadBalancingPolicy * new_policy)255 void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
256   PickState* pick;
257   while ((pick = pending_picks_) != nullptr) {
258     pending_picks_ = pick->next;
259     grpc_error* error = GRPC_ERROR_NONE;
260     if (new_policy->PickLocked(pick, &error)) {
261       // Synchronous return, schedule closure.
262       GRPC_CLOSURE_SCHED(pick->on_complete, error);
263     }
264   }
265 }
266 
ShutdownLocked()267 void RoundRobin::ShutdownLocked() {
268   AutoChildRefsUpdater guard(this);
269   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
270   if (grpc_lb_round_robin_trace.enabled()) {
271     gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
272   }
273   shutdown_ = true;
274   PickState* pick;
275   while ((pick = pending_picks_) != nullptr) {
276     pending_picks_ = pick->next;
277     pick->connected_subchannel.reset();
278     GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
279   }
280   grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
281                               GRPC_ERROR_REF(error), "rr_shutdown");
282   subchannel_list_.reset();
283   latest_pending_subchannel_list_.reset();
284   TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
285   GRPC_ERROR_UNREF(error);
286 }
287 
CancelPickLocked(PickState * pick,grpc_error * error)288 void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) {
289   PickState* pp = pending_picks_;
290   pending_picks_ = nullptr;
291   while (pp != nullptr) {
292     PickState* next = pp->next;
293     if (pp == pick) {
294       pick->connected_subchannel.reset();
295       GRPC_CLOSURE_SCHED(pick->on_complete,
296                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
297                              "Pick Cancelled", &error, 1));
298     } else {
299       pp->next = pending_picks_;
300       pending_picks_ = pp;
301     }
302     pp = next;
303   }
304   GRPC_ERROR_UNREF(error);
305 }
306 
CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,uint32_t initial_metadata_flags_eq,grpc_error * error)307 void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
308                                            uint32_t initial_metadata_flags_eq,
309                                            grpc_error* error) {
310   PickState* pick = pending_picks_;
311   pending_picks_ = nullptr;
312   while (pick != nullptr) {
313     PickState* next = pick->next;
314     if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
315         initial_metadata_flags_eq) {
316       pick->connected_subchannel.reset();
317       GRPC_CLOSURE_SCHED(pick->on_complete,
318                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
319                              "Pick Cancelled", &error, 1));
320     } else {
321       pick->next = pending_picks_;
322       pending_picks_ = pick;
323     }
324     pick = next;
325   }
326   GRPC_ERROR_UNREF(error);
327 }
328 
StartPickingLocked()329 void RoundRobin::StartPickingLocked() {
330   started_picking_ = true;
331   subchannel_list_->StartWatchingLocked();
332 }
333 
ExitIdleLocked()334 void RoundRobin::ExitIdleLocked() {
335   if (!started_picking_) {
336     StartPickingLocked();
337   }
338 }
339 
ResetBackoffLocked()340 void RoundRobin::ResetBackoffLocked() {
341   subchannel_list_->ResetBackoffLocked();
342   if (latest_pending_subchannel_list_ != nullptr) {
343     latest_pending_subchannel_list_->ResetBackoffLocked();
344   }
345 }
346 
DoPickLocked(PickState * pick)347 bool RoundRobin::DoPickLocked(PickState* pick) {
348   const size_t next_ready_index =
349       subchannel_list_->GetNextReadySubchannelIndexLocked();
350   if (next_ready_index < subchannel_list_->num_subchannels()) {
351     /* readily available, report right away */
352     RoundRobinSubchannelData* sd =
353         subchannel_list_->subchannel(next_ready_index);
354     GPR_ASSERT(sd->connected_subchannel() != nullptr);
355     pick->connected_subchannel = sd->connected_subchannel()->Ref();
356     if (pick->user_data != nullptr) {
357       *pick->user_data = sd->user_data();
358     }
359     if (grpc_lb_round_robin_trace.enabled()) {
360       gpr_log(GPR_INFO,
361               "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
362               "index %" PRIuPTR ")",
363               this, sd->subchannel(), pick->connected_subchannel.get(),
364               sd->subchannel_list(), next_ready_index);
365     }
366     /* only advance the last picked pointer if the selection was used */
367     subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index);
368     return true;
369   }
370   return false;
371 }
372 
DrainPendingPicksLocked()373 void RoundRobin::DrainPendingPicksLocked() {
374   PickState* pick;
375   while ((pick = pending_picks_)) {
376     pending_picks_ = pick->next;
377     GPR_ASSERT(DoPickLocked(pick));
378     GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
379   }
380 }
381 
PickLocked(PickState * pick,grpc_error ** error)382 bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
383   if (grpc_lb_round_robin_trace.enabled()) {
384     gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
385   }
386   GPR_ASSERT(!shutdown_);
387   if (subchannel_list_ != nullptr) {
388     if (DoPickLocked(pick)) return true;
389   }
390   if (pick->on_complete == nullptr) {
391     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
392         "No pick result available but synchronous result required.");
393     return true;
394   }
395   /* no pick currently available. Save for later in list of pending picks */
396   pick->next = pending_picks_;
397   pending_picks_ = pick;
398   if (!started_picking_) {
399     StartPickingLocked();
400   }
401   return false;
402 }
403 
FillChildRefsForChannelz(ChildRefsList * child_subchannels_to_fill,ChildRefsList * ignored)404 void RoundRobin::FillChildRefsForChannelz(
405     ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
406   MutexLock lock(&child_refs_mu_);
407   for (size_t i = 0; i < child_subchannels_.size(); ++i) {
408     // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
409     // have to implement lightweight set. For now, we don't care about
410     // performance when channelz requests are made.
411     bool found = false;
412     for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
413       if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
414         found = true;
415         break;
416       }
417     }
418     if (!found) {
419       child_subchannels_to_fill->push_back(child_subchannels_[i]);
420     }
421   }
422 }
423 
UpdateChildRefsLocked()424 void RoundRobin::UpdateChildRefsLocked() {
425   ChildRefsList cs;
426   if (subchannel_list_ != nullptr) {
427     subchannel_list_->PopulateChildRefsList(&cs);
428   }
429   if (latest_pending_subchannel_list_ != nullptr) {
430     latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
431   }
432   // atomically update the data that channelz will actually be looking at.
433   MutexLock lock(&child_refs_mu_);
434   child_subchannels_ = std::move(cs);
435 }
436 
StartWatchingLocked()437 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
438   if (num_subchannels() == 0) return;
439   // Check current state of each subchannel synchronously, since any
440   // subchannel already used by some other channel may have a non-IDLE
441   // state.
442   for (size_t i = 0; i < num_subchannels(); ++i) {
443     grpc_error* error = GRPC_ERROR_NONE;
444     grpc_connectivity_state state =
445         subchannel(i)->CheckConnectivityStateLocked(&error);
446     if (state != GRPC_CHANNEL_IDLE) {
447       subchannel(i)->UpdateConnectivityStateLocked(state, error);
448     }
449   }
450   // Now set the LB policy's state based on the subchannels' states.
451   UpdateRoundRobinStateFromSubchannelStateCountsLocked();
452   // Start connectivity watch for each subchannel.
453   for (size_t i = 0; i < num_subchannels(); i++) {
454     if (subchannel(i)->subchannel() != nullptr) {
455       subchannel(i)->StartConnectivityWatchLocked();
456     }
457   }
458 }
459 
UpdateStateCountersLocked(grpc_connectivity_state old_state,grpc_connectivity_state new_state,grpc_error * transient_failure_error)460 void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
461     grpc_connectivity_state old_state, grpc_connectivity_state new_state,
462     grpc_error* transient_failure_error) {
463   GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
464   GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
465   if (old_state == GRPC_CHANNEL_READY) {
466     GPR_ASSERT(num_ready_ > 0);
467     --num_ready_;
468   } else if (old_state == GRPC_CHANNEL_CONNECTING) {
469     GPR_ASSERT(num_connecting_ > 0);
470     --num_connecting_;
471   } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
472     GPR_ASSERT(num_transient_failure_ > 0);
473     --num_transient_failure_;
474   }
475   if (new_state == GRPC_CHANNEL_READY) {
476     ++num_ready_;
477   } else if (new_state == GRPC_CHANNEL_CONNECTING) {
478     ++num_connecting_;
479   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
480     ++num_transient_failure_;
481   }
482   GRPC_ERROR_UNREF(last_transient_failure_error_);
483   last_transient_failure_error_ = transient_failure_error;
484 }
485 
486 // Sets the RR policy's connectivity state based on the current
487 // subchannel list.
488 void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked()489     MaybeUpdateRoundRobinConnectivityStateLocked() {
490   RoundRobin* p = static_cast<RoundRobin*>(policy());
491   // Only set connectivity state if this is the current subchannel list.
492   if (p->subchannel_list_.get() != this) return;
493   /* In priority order. The first rule to match terminates the search (ie, if we
494    * are on rule n, all previous rules were unfulfilled).
495    *
496    * 1) RULE: ANY subchannel is READY => policy is READY.
497    *    CHECK: subchannel_list->num_ready > 0.
498    *
499    * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
500    *    CHECK: sd->curr_connectivity_state == CONNECTING.
501    *
502    * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
503    *                                                   TRANSIENT_FAILURE.
504    *    CHECK: subchannel_list->num_transient_failures ==
505    *           subchannel_list->num_subchannels.
506    */
507   if (num_ready_ > 0) {
508     /* 1) READY */
509     grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
510                                 GRPC_ERROR_NONE, "rr_ready");
511   } else if (num_connecting_ > 0) {
512     /* 2) CONNECTING */
513     grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
514                                 GRPC_ERROR_NONE, "rr_connecting");
515   } else if (num_transient_failure_ == num_subchannels()) {
516     /* 3) TRANSIENT_FAILURE */
517     grpc_connectivity_state_set(&p->state_tracker_,
518                                 GRPC_CHANNEL_TRANSIENT_FAILURE,
519                                 GRPC_ERROR_REF(last_transient_failure_error_),
520                                 "rr_exhausted_subchannels");
521   }
522 }
523 
524 void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked()525     UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
526   RoundRobin* p = static_cast<RoundRobin*>(policy());
527   AutoChildRefsUpdater guard(p);
528   if (num_ready_ > 0) {
529     if (p->subchannel_list_.get() != this) {
530       // Promote this list to p->subchannel_list_.
531       // This list must be p->latest_pending_subchannel_list_, because
532       // any previous update would have been shut down already and
533       // therefore we would not be receiving a notification for them.
534       GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
535       GPR_ASSERT(!shutting_down());
536       if (grpc_lb_round_robin_trace.enabled()) {
537         const size_t old_num_subchannels =
538             p->subchannel_list_ != nullptr
539                 ? p->subchannel_list_->num_subchannels()
540                 : 0;
541         gpr_log(GPR_INFO,
542                 "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
543                 ") in favor of %p (size %" PRIuPTR ")",
544                 p, p->subchannel_list_.get(), old_num_subchannels, this,
545                 num_subchannels());
546       }
547       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
548     }
549     // Drain pending picks.
550     p->DrainPendingPicksLocked();
551   }
552   // Update the RR policy's connectivity state if needed.
553   MaybeUpdateRoundRobinConnectivityStateLocked();
554 }
555 
UpdateConnectivityStateLocked(grpc_connectivity_state connectivity_state,grpc_error * error)556 void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
557     grpc_connectivity_state connectivity_state, grpc_error* error) {
558   RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
559   if (grpc_lb_round_robin_trace.enabled()) {
560     gpr_log(
561         GPR_INFO,
562         "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
563         "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
564         p, subchannel(), subchannel_list(), Index(),
565         subchannel_list()->num_subchannels(),
566         grpc_connectivity_state_name(last_connectivity_state_),
567         grpc_connectivity_state_name(connectivity_state));
568   }
569   subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
570                                                connectivity_state, error);
571   last_connectivity_state_ = connectivity_state;
572 }
573 
ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state,grpc_error * error)574 void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
575     grpc_connectivity_state connectivity_state, grpc_error* error) {
576   RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
577   GPR_ASSERT(subchannel() != nullptr);
578   // If the new state is TRANSIENT_FAILURE, re-resolve.
579   // Only do this if we've started watching, not at startup time.
580   // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
581   // when the subchannel list was created, we'd wind up in a constant
582   // loop of re-resolution.
583   if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
584     if (grpc_lb_round_robin_trace.enabled()) {
585       gpr_log(GPR_INFO,
586               "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
587               "Requesting re-resolution",
588               p, subchannel());
589     }
590     p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
591   }
592   // Update state counters.
593   UpdateConnectivityStateLocked(connectivity_state, error);
594   // Update overall state and renew notification.
595   subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
596   RenewConnectivityWatchLocked();
597 }
598 
599 /** Returns the index into p->subchannel_list->subchannels of the next
600  * subchannel in READY state, or p->subchannel_list->num_subchannels if no
601  * subchannel is READY.
602  *
603  * Note that this function does *not* update p->last_ready_subchannel_index.
604  * The caller must do that if it returns a pick. */
605 size_t
GetNextReadySubchannelIndexLocked()606 RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() {
607   if (grpc_lb_round_robin_trace.enabled()) {
608     gpr_log(GPR_INFO,
609             "[RR %p] getting next ready subchannel (out of %" PRIuPTR
610             "), last_ready_index=%" PRIuPTR,
611             policy(), num_subchannels(), last_ready_index_);
612   }
613   for (size_t i = 0; i < num_subchannels(); ++i) {
614     const size_t index = (i + last_ready_index_ + 1) % num_subchannels();
615     if (grpc_lb_round_robin_trace.enabled()) {
616       gpr_log(
617           GPR_INFO,
618           "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
619           ": state=%s",
620           policy(), subchannel(index)->subchannel(), this, index,
621           grpc_connectivity_state_name(
622               subchannel(index)->connectivity_state()));
623     }
624     if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) {
625       if (grpc_lb_round_robin_trace.enabled()) {
626         gpr_log(GPR_INFO,
627                 "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
628                 " of subchannel_list %p",
629                 policy(), subchannel(index)->subchannel(), index, this);
630       }
631       return index;
632     }
633   }
634   if (grpc_lb_round_robin_trace.enabled()) {
635     gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
636   }
637   return num_subchannels();
638 }
639 
640 // Sets last_ready_index_ to last_ready_index.
UpdateLastReadySubchannelIndexLocked(size_t last_ready_index)641 void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked(
642     size_t last_ready_index) {
643   GPR_ASSERT(last_ready_index < num_subchannels());
644   last_ready_index_ = last_ready_index;
645   if (grpc_lb_round_robin_trace.enabled()) {
646     gpr_log(GPR_INFO,
647             "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
648             " (SC %p, CSC %p)",
649             policy(), last_ready_index,
650             subchannel(last_ready_index)->subchannel(),
651             subchannel(last_ready_index)->connected_subchannel());
652   }
653 }
654 
CheckConnectivityLocked(grpc_error ** error)655 grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
656     grpc_error** error) {
657   return grpc_connectivity_state_get(&state_tracker_, error);
658 }
659 
NotifyOnStateChangeLocked(grpc_connectivity_state * current,grpc_closure * notify)660 void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
661                                            grpc_closure* notify) {
662   grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
663                                                  notify);
664 }
665 
UpdateLocked(const grpc_channel_args & args)666 void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
667   const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
668   AutoChildRefsUpdater guard(this);
669   if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
670     gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
671     // If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
672     // Otherwise, keep using the current subchannel list (ignore this update).
673     if (subchannel_list_ == nullptr) {
674       grpc_connectivity_state_set(
675           &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
676           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
677           "rr_update_missing");
678     }
679     return;
680   }
681   grpc_lb_addresses* addresses =
682       static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
683   if (grpc_lb_round_robin_trace.enabled()) {
684     gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
685             this, addresses->num_addresses);
686   }
687   // Replace latest_pending_subchannel_list_.
688   if (latest_pending_subchannel_list_ != nullptr) {
689     if (grpc_lb_round_robin_trace.enabled()) {
690       gpr_log(GPR_INFO,
691               "[RR %p] Shutting down previous pending subchannel list %p", this,
692               latest_pending_subchannel_list_.get());
693     }
694   }
695   latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
696       this, &grpc_lb_round_robin_trace, addresses, combiner(),
697       client_channel_factory(), args);
698   // If we haven't started picking yet or the new list is empty,
699   // immediately promote the new list to the current list.
700   if (!started_picking_ ||
701       latest_pending_subchannel_list_->num_subchannels() == 0) {
702     if (latest_pending_subchannel_list_->num_subchannels() == 0) {
703       grpc_connectivity_state_set(
704           &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
705           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
706           "rr_update_empty");
707     }
708     subchannel_list_ = std::move(latest_pending_subchannel_list_);
709   } else {
710     // If we've started picking, start watching the new list.
711     latest_pending_subchannel_list_->StartWatchingLocked();
712   }
713 }
714 
715 //
716 // factory
717 //
718 
719 class RoundRobinFactory : public LoadBalancingPolicyFactory {
720  public:
CreateLoadBalancingPolicy(const LoadBalancingPolicy::Args & args) const721   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
722       const LoadBalancingPolicy::Args& args) const override {
723     return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
724   }
725 
name() const726   const char* name() const override { return "round_robin"; }
727 };
728 
729 }  // namespace
730 
731 }  // namespace grpc_core
732 
grpc_lb_policy_round_robin_init()733 void grpc_lb_policy_round_robin_init() {
734   grpc_core::LoadBalancingPolicyRegistry::Builder::
735       RegisterLoadBalancingPolicyFactory(
736           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
737               grpc_core::New<grpc_core::RoundRobinFactory>()));
738 }
739 
// Counterpart of grpc_lb_policy_round_robin_init(); intentionally empty.
void grpc_lb_policy_round_robin_shutdown() {
}
741