1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
/** Round Robin Policy.
 *
 * Before every pick, RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked()
 * returns the index, within the policy's current subchannel list, of the next
 * READY subchannel, respecting the relative order of the addresses provided
 * upon creation or updates. Note, however, that after an update picking
 * starts again from the beginning of the updated list. */
26
27 #include <grpc/support/port_platform.h>
28
29 #include <string.h>
30
31 #include <grpc/support/alloc.h>
32
33 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
34 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
35 #include "src/core/ext/filters/client_channel/subchannel.h"
36 #include "src/core/ext/filters/client_channel/subchannel_index.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/mutex_lock.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/combiner.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44 #include "src/core/lib/transport/static_metadata.h"
45
46 namespace grpc_core {
47
// Trace flag registered under the name "round_robin". The GPR_INFO
// "[RR %p] ..." log statements in this file are emitted only when this flag
// reports enabled().
// NOTE(review): the `false` argument is presumably the flag's default
// (disabled) value -- confirm against the TraceFlag constructor.
TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
49
50 namespace {
51
52 //
53 // round_robin LB policy
54 //
55
56 class RoundRobin : public LoadBalancingPolicy {
57 public:
58 explicit RoundRobin(const Args& args);
59
60 void UpdateLocked(const grpc_channel_args& args) override;
61 bool PickLocked(PickState* pick, grpc_error** error) override;
62 void CancelPickLocked(PickState* pick, grpc_error* error) override;
63 void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
64 uint32_t initial_metadata_flags_eq,
65 grpc_error* error) override;
66 void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
67 grpc_closure* closure) override;
68 grpc_connectivity_state CheckConnectivityLocked(
69 grpc_error** connectivity_error) override;
70 void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
71 void ExitIdleLocked() override;
72 void ResetBackoffLocked() override;
73 void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
74 ChildRefsList* ignored) override;
75
76 private:
77 ~RoundRobin();
78
79 // Forward declaration.
80 class RoundRobinSubchannelList;
81
82 // Data for a particular subchannel in a subchannel list.
83 // This subclass adds the following functionality:
84 // - Tracks user_data associated with each address, which will be
85 // returned along with picks that select the subchannel.
86 // - Tracks the previous connectivity state of the subchannel, so that
87 // we know how many subchannels are in each state.
88 class RoundRobinSubchannelData
89 : public SubchannelData<RoundRobinSubchannelList,
90 RoundRobinSubchannelData> {
91 public:
RoundRobinSubchannelData(SubchannelList<RoundRobinSubchannelList,RoundRobinSubchannelData> * subchannel_list,const grpc_lb_user_data_vtable * user_data_vtable,const grpc_lb_address & address,grpc_subchannel * subchannel,grpc_combiner * combiner)92 RoundRobinSubchannelData(
93 SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
94 subchannel_list,
95 const grpc_lb_user_data_vtable* user_data_vtable,
96 const grpc_lb_address& address, grpc_subchannel* subchannel,
97 grpc_combiner* combiner)
98 : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
99 combiner),
100 user_data_vtable_(user_data_vtable),
101 user_data_(user_data_vtable_ != nullptr
102 ? user_data_vtable_->copy(address.user_data)
103 : nullptr) {}
104
UnrefSubchannelLocked(const char * reason)105 void UnrefSubchannelLocked(const char* reason) override {
106 SubchannelData::UnrefSubchannelLocked(reason);
107 if (user_data_ != nullptr) {
108 GPR_ASSERT(user_data_vtable_ != nullptr);
109 user_data_vtable_->destroy(user_data_);
110 user_data_ = nullptr;
111 }
112 }
113
user_data() const114 void* user_data() const { return user_data_; }
115
connectivity_state() const116 grpc_connectivity_state connectivity_state() const {
117 return last_connectivity_state_;
118 }
119
120 void UpdateConnectivityStateLocked(
121 grpc_connectivity_state connectivity_state, grpc_error* error);
122
123 private:
124 void ProcessConnectivityChangeLocked(
125 grpc_connectivity_state connectivity_state, grpc_error* error) override;
126
127 const grpc_lb_user_data_vtable* user_data_vtable_;
128 void* user_data_ = nullptr;
129 grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
130 };
131
132 // A list of subchannels.
133 class RoundRobinSubchannelList
134 : public SubchannelList<RoundRobinSubchannelList,
135 RoundRobinSubchannelData> {
136 public:
RoundRobinSubchannelList(RoundRobin * policy,TraceFlag * tracer,const grpc_lb_addresses * addresses,grpc_combiner * combiner,grpc_client_channel_factory * client_channel_factory,const grpc_channel_args & args)137 RoundRobinSubchannelList(
138 RoundRobin* policy, TraceFlag* tracer,
139 const grpc_lb_addresses* addresses, grpc_combiner* combiner,
140 grpc_client_channel_factory* client_channel_factory,
141 const grpc_channel_args& args)
142 : SubchannelList(policy, tracer, addresses, combiner,
143 client_channel_factory, args),
144 last_ready_index_(num_subchannels() - 1) {
145 // Need to maintain a ref to the LB policy as long as we maintain
146 // any references to subchannels, since the subchannels'
147 // pollset_sets will include the LB policy's pollset_set.
148 policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
149 }
150
~RoundRobinSubchannelList()151 ~RoundRobinSubchannelList() {
152 GRPC_ERROR_UNREF(last_transient_failure_error_);
153 RoundRobin* p = static_cast<RoundRobin*>(policy());
154 p->Unref(DEBUG_LOCATION, "subchannel_list");
155 }
156
157 // Starts watching the subchannels in this list.
158 void StartWatchingLocked();
159
160 // Updates the counters of subchannels in each state when a
161 // subchannel transitions from old_state to new_state.
162 // transient_failure_error is the error that is reported when
163 // new_state is TRANSIENT_FAILURE.
164 void UpdateStateCountersLocked(grpc_connectivity_state old_state,
165 grpc_connectivity_state new_state,
166 grpc_error* transient_failure_error);
167
168 // If this subchannel list is the RR policy's current subchannel
169 // list, updates the RR policy's connectivity state based on the
170 // subchannel list's state counters.
171 void MaybeUpdateRoundRobinConnectivityStateLocked();
172
173 // Updates the RR policy's overall state based on the counters of
174 // subchannels in each state.
175 void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
176
177 size_t GetNextReadySubchannelIndexLocked();
178 void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
179
180 private:
181 size_t num_ready_ = 0;
182 size_t num_connecting_ = 0;
183 size_t num_transient_failure_ = 0;
184 grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
185 size_t last_ready_index_; // Index into list of last pick.
186 };
187
188 // Helper class to ensure that any function that modifies the child refs
189 // data structures will update the channelz snapshot data structures before
190 // returning.
191 class AutoChildRefsUpdater {
192 public:
AutoChildRefsUpdater(RoundRobin * rr)193 explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
~AutoChildRefsUpdater()194 ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
195
196 private:
197 RoundRobin* rr_;
198 };
199
200 void ShutdownLocked() override;
201
202 void StartPickingLocked();
203 bool DoPickLocked(PickState* pick);
204 void DrainPendingPicksLocked();
205 void UpdateChildRefsLocked();
206
207 /** list of subchannels */
208 OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
209 /** Latest version of the subchannel list.
210 * Subchannel connectivity callbacks will only promote updated subchannel
211 * lists if they equal \a latest_pending_subchannel_list. In other words,
212 * racing callbacks that reference outdated subchannel lists won't perform any
213 * update. */
214 OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
215 /** have we started picking? */
216 bool started_picking_ = false;
217 /** are we shutting down? */
218 bool shutdown_ = false;
219 /** List of picks that are waiting on connectivity */
220 PickState* pending_picks_ = nullptr;
221 /** our connectivity state tracker */
222 grpc_connectivity_state_tracker state_tracker_;
223 /// Lock and data used to capture snapshots of this channel's child
224 /// channels and subchannels. This data is consumed by channelz.
225 gpr_mu child_refs_mu_;
226 ChildRefsList child_subchannels_;
227 ChildRefsList child_channels_;
228 };
229
RoundRobin(const Args & args)230 RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
231 GPR_ASSERT(args.client_channel_factory != nullptr);
232 gpr_mu_init(&child_refs_mu_);
233 grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
234 "round_robin");
235 UpdateLocked(*args.args);
236 if (grpc_lb_round_robin_trace.enabled()) {
237 gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this,
238 subchannel_list_->num_subchannels());
239 }
240 grpc_subchannel_index_ref();
241 }
242
~RoundRobin()243 RoundRobin::~RoundRobin() {
244 if (grpc_lb_round_robin_trace.enabled()) {
245 gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
246 }
247 gpr_mu_destroy(&child_refs_mu_);
248 GPR_ASSERT(subchannel_list_ == nullptr);
249 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
250 GPR_ASSERT(pending_picks_ == nullptr);
251 grpc_connectivity_state_destroy(&state_tracker_);
252 grpc_subchannel_index_unref();
253 }
254
HandOffPendingPicksLocked(LoadBalancingPolicy * new_policy)255 void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
256 PickState* pick;
257 while ((pick = pending_picks_) != nullptr) {
258 pending_picks_ = pick->next;
259 grpc_error* error = GRPC_ERROR_NONE;
260 if (new_policy->PickLocked(pick, &error)) {
261 // Synchronous return, schedule closure.
262 GRPC_CLOSURE_SCHED(pick->on_complete, error);
263 }
264 }
265 }
266
ShutdownLocked()267 void RoundRobin::ShutdownLocked() {
268 AutoChildRefsUpdater guard(this);
269 grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
270 if (grpc_lb_round_robin_trace.enabled()) {
271 gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
272 }
273 shutdown_ = true;
274 PickState* pick;
275 while ((pick = pending_picks_) != nullptr) {
276 pending_picks_ = pick->next;
277 pick->connected_subchannel.reset();
278 GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
279 }
280 grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
281 GRPC_ERROR_REF(error), "rr_shutdown");
282 subchannel_list_.reset();
283 latest_pending_subchannel_list_.reset();
284 TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
285 GRPC_ERROR_UNREF(error);
286 }
287
CancelPickLocked(PickState * pick,grpc_error * error)288 void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) {
289 PickState* pp = pending_picks_;
290 pending_picks_ = nullptr;
291 while (pp != nullptr) {
292 PickState* next = pp->next;
293 if (pp == pick) {
294 pick->connected_subchannel.reset();
295 GRPC_CLOSURE_SCHED(pick->on_complete,
296 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
297 "Pick Cancelled", &error, 1));
298 } else {
299 pp->next = pending_picks_;
300 pending_picks_ = pp;
301 }
302 pp = next;
303 }
304 GRPC_ERROR_UNREF(error);
305 }
306
CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,uint32_t initial_metadata_flags_eq,grpc_error * error)307 void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
308 uint32_t initial_metadata_flags_eq,
309 grpc_error* error) {
310 PickState* pick = pending_picks_;
311 pending_picks_ = nullptr;
312 while (pick != nullptr) {
313 PickState* next = pick->next;
314 if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
315 initial_metadata_flags_eq) {
316 pick->connected_subchannel.reset();
317 GRPC_CLOSURE_SCHED(pick->on_complete,
318 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
319 "Pick Cancelled", &error, 1));
320 } else {
321 pick->next = pending_picks_;
322 pending_picks_ = pick;
323 }
324 pick = next;
325 }
326 GRPC_ERROR_UNREF(error);
327 }
328
StartPickingLocked()329 void RoundRobin::StartPickingLocked() {
330 started_picking_ = true;
331 subchannel_list_->StartWatchingLocked();
332 }
333
ExitIdleLocked()334 void RoundRobin::ExitIdleLocked() {
335 if (!started_picking_) {
336 StartPickingLocked();
337 }
338 }
339
ResetBackoffLocked()340 void RoundRobin::ResetBackoffLocked() {
341 subchannel_list_->ResetBackoffLocked();
342 if (latest_pending_subchannel_list_ != nullptr) {
343 latest_pending_subchannel_list_->ResetBackoffLocked();
344 }
345 }
346
DoPickLocked(PickState * pick)347 bool RoundRobin::DoPickLocked(PickState* pick) {
348 const size_t next_ready_index =
349 subchannel_list_->GetNextReadySubchannelIndexLocked();
350 if (next_ready_index < subchannel_list_->num_subchannels()) {
351 /* readily available, report right away */
352 RoundRobinSubchannelData* sd =
353 subchannel_list_->subchannel(next_ready_index);
354 GPR_ASSERT(sd->connected_subchannel() != nullptr);
355 pick->connected_subchannel = sd->connected_subchannel()->Ref();
356 if (pick->user_data != nullptr) {
357 *pick->user_data = sd->user_data();
358 }
359 if (grpc_lb_round_robin_trace.enabled()) {
360 gpr_log(GPR_INFO,
361 "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
362 "index %" PRIuPTR ")",
363 this, sd->subchannel(), pick->connected_subchannel.get(),
364 sd->subchannel_list(), next_ready_index);
365 }
366 /* only advance the last picked pointer if the selection was used */
367 subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index);
368 return true;
369 }
370 return false;
371 }
372
DrainPendingPicksLocked()373 void RoundRobin::DrainPendingPicksLocked() {
374 PickState* pick;
375 while ((pick = pending_picks_)) {
376 pending_picks_ = pick->next;
377 GPR_ASSERT(DoPickLocked(pick));
378 GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
379 }
380 }
381
PickLocked(PickState * pick,grpc_error ** error)382 bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
383 if (grpc_lb_round_robin_trace.enabled()) {
384 gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
385 }
386 GPR_ASSERT(!shutdown_);
387 if (subchannel_list_ != nullptr) {
388 if (DoPickLocked(pick)) return true;
389 }
390 if (pick->on_complete == nullptr) {
391 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
392 "No pick result available but synchronous result required.");
393 return true;
394 }
395 /* no pick currently available. Save for later in list of pending picks */
396 pick->next = pending_picks_;
397 pending_picks_ = pick;
398 if (!started_picking_) {
399 StartPickingLocked();
400 }
401 return false;
402 }
403
FillChildRefsForChannelz(ChildRefsList * child_subchannels_to_fill,ChildRefsList * ignored)404 void RoundRobin::FillChildRefsForChannelz(
405 ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
406 MutexLock lock(&child_refs_mu_);
407 for (size_t i = 0; i < child_subchannels_.size(); ++i) {
408 // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
409 // have to implement lightweight set. For now, we don't care about
410 // performance when channelz requests are made.
411 bool found = false;
412 for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
413 if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
414 found = true;
415 break;
416 }
417 }
418 if (!found) {
419 child_subchannels_to_fill->push_back(child_subchannels_[i]);
420 }
421 }
422 }
423
UpdateChildRefsLocked()424 void RoundRobin::UpdateChildRefsLocked() {
425 ChildRefsList cs;
426 if (subchannel_list_ != nullptr) {
427 subchannel_list_->PopulateChildRefsList(&cs);
428 }
429 if (latest_pending_subchannel_list_ != nullptr) {
430 latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
431 }
432 // atomically update the data that channelz will actually be looking at.
433 MutexLock lock(&child_refs_mu_);
434 child_subchannels_ = std::move(cs);
435 }
436
StartWatchingLocked()437 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
438 if (num_subchannels() == 0) return;
439 // Check current state of each subchannel synchronously, since any
440 // subchannel already used by some other channel may have a non-IDLE
441 // state.
442 for (size_t i = 0; i < num_subchannels(); ++i) {
443 grpc_error* error = GRPC_ERROR_NONE;
444 grpc_connectivity_state state =
445 subchannel(i)->CheckConnectivityStateLocked(&error);
446 if (state != GRPC_CHANNEL_IDLE) {
447 subchannel(i)->UpdateConnectivityStateLocked(state, error);
448 }
449 }
450 // Now set the LB policy's state based on the subchannels' states.
451 UpdateRoundRobinStateFromSubchannelStateCountsLocked();
452 // Start connectivity watch for each subchannel.
453 for (size_t i = 0; i < num_subchannels(); i++) {
454 if (subchannel(i)->subchannel() != nullptr) {
455 subchannel(i)->StartConnectivityWatchLocked();
456 }
457 }
458 }
459
UpdateStateCountersLocked(grpc_connectivity_state old_state,grpc_connectivity_state new_state,grpc_error * transient_failure_error)460 void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
461 grpc_connectivity_state old_state, grpc_connectivity_state new_state,
462 grpc_error* transient_failure_error) {
463 GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
464 GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
465 if (old_state == GRPC_CHANNEL_READY) {
466 GPR_ASSERT(num_ready_ > 0);
467 --num_ready_;
468 } else if (old_state == GRPC_CHANNEL_CONNECTING) {
469 GPR_ASSERT(num_connecting_ > 0);
470 --num_connecting_;
471 } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
472 GPR_ASSERT(num_transient_failure_ > 0);
473 --num_transient_failure_;
474 }
475 if (new_state == GRPC_CHANNEL_READY) {
476 ++num_ready_;
477 } else if (new_state == GRPC_CHANNEL_CONNECTING) {
478 ++num_connecting_;
479 } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
480 ++num_transient_failure_;
481 }
482 GRPC_ERROR_UNREF(last_transient_failure_error_);
483 last_transient_failure_error_ = transient_failure_error;
484 }
485
486 // Sets the RR policy's connectivity state based on the current
487 // subchannel list.
488 void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked()489 MaybeUpdateRoundRobinConnectivityStateLocked() {
490 RoundRobin* p = static_cast<RoundRobin*>(policy());
491 // Only set connectivity state if this is the current subchannel list.
492 if (p->subchannel_list_.get() != this) return;
493 /* In priority order. The first rule to match terminates the search (ie, if we
494 * are on rule n, all previous rules were unfulfilled).
495 *
496 * 1) RULE: ANY subchannel is READY => policy is READY.
497 * CHECK: subchannel_list->num_ready > 0.
498 *
499 * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
500 * CHECK: sd->curr_connectivity_state == CONNECTING.
501 *
502 * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
503 * TRANSIENT_FAILURE.
504 * CHECK: subchannel_list->num_transient_failures ==
505 * subchannel_list->num_subchannels.
506 */
507 if (num_ready_ > 0) {
508 /* 1) READY */
509 grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
510 GRPC_ERROR_NONE, "rr_ready");
511 } else if (num_connecting_ > 0) {
512 /* 2) CONNECTING */
513 grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
514 GRPC_ERROR_NONE, "rr_connecting");
515 } else if (num_transient_failure_ == num_subchannels()) {
516 /* 3) TRANSIENT_FAILURE */
517 grpc_connectivity_state_set(&p->state_tracker_,
518 GRPC_CHANNEL_TRANSIENT_FAILURE,
519 GRPC_ERROR_REF(last_transient_failure_error_),
520 "rr_exhausted_subchannels");
521 }
522 }
523
524 void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked()525 UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
526 RoundRobin* p = static_cast<RoundRobin*>(policy());
527 AutoChildRefsUpdater guard(p);
528 if (num_ready_ > 0) {
529 if (p->subchannel_list_.get() != this) {
530 // Promote this list to p->subchannel_list_.
531 // This list must be p->latest_pending_subchannel_list_, because
532 // any previous update would have been shut down already and
533 // therefore we would not be receiving a notification for them.
534 GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
535 GPR_ASSERT(!shutting_down());
536 if (grpc_lb_round_robin_trace.enabled()) {
537 const size_t old_num_subchannels =
538 p->subchannel_list_ != nullptr
539 ? p->subchannel_list_->num_subchannels()
540 : 0;
541 gpr_log(GPR_INFO,
542 "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
543 ") in favor of %p (size %" PRIuPTR ")",
544 p, p->subchannel_list_.get(), old_num_subchannels, this,
545 num_subchannels());
546 }
547 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
548 }
549 // Drain pending picks.
550 p->DrainPendingPicksLocked();
551 }
552 // Update the RR policy's connectivity state if needed.
553 MaybeUpdateRoundRobinConnectivityStateLocked();
554 }
555
UpdateConnectivityStateLocked(grpc_connectivity_state connectivity_state,grpc_error * error)556 void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
557 grpc_connectivity_state connectivity_state, grpc_error* error) {
558 RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
559 if (grpc_lb_round_robin_trace.enabled()) {
560 gpr_log(
561 GPR_INFO,
562 "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
563 "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
564 p, subchannel(), subchannel_list(), Index(),
565 subchannel_list()->num_subchannels(),
566 grpc_connectivity_state_name(last_connectivity_state_),
567 grpc_connectivity_state_name(connectivity_state));
568 }
569 subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
570 connectivity_state, error);
571 last_connectivity_state_ = connectivity_state;
572 }
573
ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state,grpc_error * error)574 void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
575 grpc_connectivity_state connectivity_state, grpc_error* error) {
576 RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
577 GPR_ASSERT(subchannel() != nullptr);
578 // If the new state is TRANSIENT_FAILURE, re-resolve.
579 // Only do this if we've started watching, not at startup time.
580 // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
581 // when the subchannel list was created, we'd wind up in a constant
582 // loop of re-resolution.
583 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
584 if (grpc_lb_round_robin_trace.enabled()) {
585 gpr_log(GPR_INFO,
586 "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
587 "Requesting re-resolution",
588 p, subchannel());
589 }
590 p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
591 }
592 // Update state counters.
593 UpdateConnectivityStateLocked(connectivity_state, error);
594 // Update overall state and renew notification.
595 subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
596 RenewConnectivityWatchLocked();
597 }
598
599 /** Returns the index into p->subchannel_list->subchannels of the next
600 * subchannel in READY state, or p->subchannel_list->num_subchannels if no
601 * subchannel is READY.
602 *
603 * Note that this function does *not* update p->last_ready_subchannel_index.
604 * The caller must do that if it returns a pick. */
605 size_t
GetNextReadySubchannelIndexLocked()606 RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() {
607 if (grpc_lb_round_robin_trace.enabled()) {
608 gpr_log(GPR_INFO,
609 "[RR %p] getting next ready subchannel (out of %" PRIuPTR
610 "), last_ready_index=%" PRIuPTR,
611 policy(), num_subchannels(), last_ready_index_);
612 }
613 for (size_t i = 0; i < num_subchannels(); ++i) {
614 const size_t index = (i + last_ready_index_ + 1) % num_subchannels();
615 if (grpc_lb_round_robin_trace.enabled()) {
616 gpr_log(
617 GPR_INFO,
618 "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
619 ": state=%s",
620 policy(), subchannel(index)->subchannel(), this, index,
621 grpc_connectivity_state_name(
622 subchannel(index)->connectivity_state()));
623 }
624 if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) {
625 if (grpc_lb_round_robin_trace.enabled()) {
626 gpr_log(GPR_INFO,
627 "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
628 " of subchannel_list %p",
629 policy(), subchannel(index)->subchannel(), index, this);
630 }
631 return index;
632 }
633 }
634 if (grpc_lb_round_robin_trace.enabled()) {
635 gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
636 }
637 return num_subchannels();
638 }
639
640 // Sets last_ready_index_ to last_ready_index.
UpdateLastReadySubchannelIndexLocked(size_t last_ready_index)641 void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked(
642 size_t last_ready_index) {
643 GPR_ASSERT(last_ready_index < num_subchannels());
644 last_ready_index_ = last_ready_index;
645 if (grpc_lb_round_robin_trace.enabled()) {
646 gpr_log(GPR_INFO,
647 "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
648 " (SC %p, CSC %p)",
649 policy(), last_ready_index,
650 subchannel(last_ready_index)->subchannel(),
651 subchannel(last_ready_index)->connected_subchannel());
652 }
653 }
654
CheckConnectivityLocked(grpc_error ** error)655 grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
656 grpc_error** error) {
657 return grpc_connectivity_state_get(&state_tracker_, error);
658 }
659
NotifyOnStateChangeLocked(grpc_connectivity_state * current,grpc_closure * notify)660 void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
661 grpc_closure* notify) {
662 grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
663 notify);
664 }
665
UpdateLocked(const grpc_channel_args & args)666 void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
667 const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
668 AutoChildRefsUpdater guard(this);
669 if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
670 gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
671 // If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
672 // Otherwise, keep using the current subchannel list (ignore this update).
673 if (subchannel_list_ == nullptr) {
674 grpc_connectivity_state_set(
675 &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
676 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
677 "rr_update_missing");
678 }
679 return;
680 }
681 grpc_lb_addresses* addresses =
682 static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
683 if (grpc_lb_round_robin_trace.enabled()) {
684 gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
685 this, addresses->num_addresses);
686 }
687 // Replace latest_pending_subchannel_list_.
688 if (latest_pending_subchannel_list_ != nullptr) {
689 if (grpc_lb_round_robin_trace.enabled()) {
690 gpr_log(GPR_INFO,
691 "[RR %p] Shutting down previous pending subchannel list %p", this,
692 latest_pending_subchannel_list_.get());
693 }
694 }
695 latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
696 this, &grpc_lb_round_robin_trace, addresses, combiner(),
697 client_channel_factory(), args);
698 // If we haven't started picking yet or the new list is empty,
699 // immediately promote the new list to the current list.
700 if (!started_picking_ ||
701 latest_pending_subchannel_list_->num_subchannels() == 0) {
702 if (latest_pending_subchannel_list_->num_subchannels() == 0) {
703 grpc_connectivity_state_set(
704 &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
705 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
706 "rr_update_empty");
707 }
708 subchannel_list_ = std::move(latest_pending_subchannel_list_);
709 } else {
710 // If we've started picking, start watching the new list.
711 latest_pending_subchannel_list_->StartWatchingLocked();
712 }
713 }
714
715 //
716 // factory
717 //
718
719 class RoundRobinFactory : public LoadBalancingPolicyFactory {
720 public:
CreateLoadBalancingPolicy(const LoadBalancingPolicy::Args & args) const721 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
722 const LoadBalancingPolicy::Args& args) const override {
723 return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
724 }
725
name() const726 const char* name() const override { return "round_robin"; }
727 };
728
729 } // namespace
730
731 } // namespace grpc_core
732
grpc_lb_policy_round_robin_init()733 void grpc_lb_policy_round_robin_init() {
734 grpc_core::LoadBalancingPolicyRegistry::Builder::
735 RegisterLoadBalancingPolicyFactory(
736 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
737 grpc_core::New<grpc_core::RoundRobinFactory>()));
738 }
739
// Counterpart of grpc_lb_policy_round_robin_init(); intentionally empty.
void grpc_lb_policy_round_robin_shutdown() {
}
741