1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/support/alloc.h>
24
25 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
27 #include "src/core/ext/filters/client_channel/subchannel.h"
28 #include "src/core/ext/filters/client_channel/subchannel_index.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/mutex_lock.h"
31 #include "src/core/lib/iomgr/combiner.h"
32 #include "src/core/lib/iomgr/sockaddr_utils.h"
33 #include "src/core/lib/transport/connectivity_state.h"
34
35 namespace grpc_core {
36
37 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
38
39 namespace {
40
41 //
42 // pick_first LB policy
43 //
44
45 class PickFirst : public LoadBalancingPolicy {
46 public:
47 explicit PickFirst(const Args& args);
48
49 void UpdateLocked(const grpc_channel_args& args) override;
50 bool PickLocked(PickState* pick, grpc_error** error) override;
51 void CancelPickLocked(PickState* pick, grpc_error* error) override;
52 void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
53 uint32_t initial_metadata_flags_eq,
54 grpc_error* error) override;
55 void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
56 grpc_closure* closure) override;
57 grpc_connectivity_state CheckConnectivityLocked(
58 grpc_error** connectivity_error) override;
59 void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
60 void ExitIdleLocked() override;
61 void ResetBackoffLocked() override;
62 void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
63 ChildRefsList* ignored) override;
64
65 private:
66 ~PickFirst();
67
68 class PickFirstSubchannelList;
69
70 class PickFirstSubchannelData
71 : public SubchannelData<PickFirstSubchannelList,
72 PickFirstSubchannelData> {
73 public:
PickFirstSubchannelData(SubchannelList<PickFirstSubchannelList,PickFirstSubchannelData> * subchannel_list,const grpc_lb_user_data_vtable * user_data_vtable,const grpc_lb_address & address,grpc_subchannel * subchannel,grpc_combiner * combiner)74 PickFirstSubchannelData(
75 SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
76 subchannel_list,
77 const grpc_lb_user_data_vtable* user_data_vtable,
78 const grpc_lb_address& address, grpc_subchannel* subchannel,
79 grpc_combiner* combiner)
80 : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
81 combiner) {}
82
83 void ProcessConnectivityChangeLocked(
84 grpc_connectivity_state connectivity_state, grpc_error* error) override;
85
86 // Processes the connectivity change to READY for an unselected subchannel.
87 void ProcessUnselectedReadyLocked();
88
89 void CheckConnectivityStateAndStartWatchingLocked();
90 };
91
92 class PickFirstSubchannelList
93 : public SubchannelList<PickFirstSubchannelList,
94 PickFirstSubchannelData> {
95 public:
PickFirstSubchannelList(PickFirst * policy,TraceFlag * tracer,const grpc_lb_addresses * addresses,grpc_combiner * combiner,grpc_client_channel_factory * client_channel_factory,const grpc_channel_args & args)96 PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
97 const grpc_lb_addresses* addresses,
98 grpc_combiner* combiner,
99 grpc_client_channel_factory* client_channel_factory,
100 const grpc_channel_args& args)
101 : SubchannelList(policy, tracer, addresses, combiner,
102 client_channel_factory, args) {
103 // Need to maintain a ref to the LB policy as long as we maintain
104 // any references to subchannels, since the subchannels'
105 // pollset_sets will include the LB policy's pollset_set.
106 policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
107 }
108
~PickFirstSubchannelList()109 ~PickFirstSubchannelList() {
110 PickFirst* p = static_cast<PickFirst*>(policy());
111 p->Unref(DEBUG_LOCATION, "subchannel_list");
112 }
113 };
114
115 // Helper class to ensure that any function that modifies the child refs
116 // data structures will update the channelz snapshot data structures before
117 // returning.
118 class AutoChildRefsUpdater {
119 public:
AutoChildRefsUpdater(PickFirst * pf)120 explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
~AutoChildRefsUpdater()121 ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
122
123 private:
124 PickFirst* pf_;
125 };
126
127 void ShutdownLocked() override;
128
129 void StartPickingLocked();
130 void UpdateChildRefsLocked();
131
132 // All our subchannels.
133 OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
134 // Latest pending subchannel list.
135 OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
136 // Selected subchannel in \a subchannel_list_.
137 PickFirstSubchannelData* selected_ = nullptr;
138 // Have we started picking?
139 bool started_picking_ = false;
140 // Are we shut down?
141 bool shutdown_ = false;
142 // List of picks that are waiting on connectivity.
143 PickState* pending_picks_ = nullptr;
144 // Our connectivity state tracker.
145 grpc_connectivity_state_tracker state_tracker_;
146
147 /// Lock and data used to capture snapshots of this channels child
148 /// channels and subchannels. This data is consumed by channelz.
149 gpr_mu child_refs_mu_;
150 ChildRefsList child_subchannels_;
151 ChildRefsList child_channels_;
152 };
153
PickFirst(const Args & args)154 PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
155 GPR_ASSERT(args.client_channel_factory != nullptr);
156 gpr_mu_init(&child_refs_mu_);
157 grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
158 "pick_first");
159 if (grpc_lb_pick_first_trace.enabled()) {
160 gpr_log(GPR_INFO, "Pick First %p created.", this);
161 }
162 UpdateLocked(*args.args);
163 grpc_subchannel_index_ref();
164 }
165
~PickFirst()166 PickFirst::~PickFirst() {
167 if (grpc_lb_pick_first_trace.enabled()) {
168 gpr_log(GPR_INFO, "Destroying Pick First %p", this);
169 }
170 gpr_mu_destroy(&child_refs_mu_);
171 GPR_ASSERT(subchannel_list_ == nullptr);
172 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
173 GPR_ASSERT(pending_picks_ == nullptr);
174 grpc_connectivity_state_destroy(&state_tracker_);
175 grpc_subchannel_index_unref();
176 }
177
HandOffPendingPicksLocked(LoadBalancingPolicy * new_policy)178 void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
179 PickState* pick;
180 while ((pick = pending_picks_) != nullptr) {
181 pending_picks_ = pick->next;
182 grpc_error* error = GRPC_ERROR_NONE;
183 if (new_policy->PickLocked(pick, &error)) {
184 // Synchronous return, schedule closure.
185 GRPC_CLOSURE_SCHED(pick->on_complete, error);
186 }
187 }
188 }
189
ShutdownLocked()190 void PickFirst::ShutdownLocked() {
191 AutoChildRefsUpdater guard(this);
192 grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
193 if (grpc_lb_pick_first_trace.enabled()) {
194 gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
195 }
196 shutdown_ = true;
197 PickState* pick;
198 while ((pick = pending_picks_) != nullptr) {
199 pending_picks_ = pick->next;
200 pick->connected_subchannel.reset();
201 GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
202 }
203 grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
204 GRPC_ERROR_REF(error), "shutdown");
205 subchannel_list_.reset();
206 latest_pending_subchannel_list_.reset();
207 TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
208 GRPC_ERROR_UNREF(error);
209 }
210
CancelPickLocked(PickState * pick,grpc_error * error)211 void PickFirst::CancelPickLocked(PickState* pick, grpc_error* error) {
212 PickState* pp = pending_picks_;
213 pending_picks_ = nullptr;
214 while (pp != nullptr) {
215 PickState* next = pp->next;
216 if (pp == pick) {
217 pick->connected_subchannel.reset();
218 GRPC_CLOSURE_SCHED(pick->on_complete,
219 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
220 "Pick Cancelled", &error, 1));
221 } else {
222 pp->next = pending_picks_;
223 pending_picks_ = pp;
224 }
225 pp = next;
226 }
227 GRPC_ERROR_UNREF(error);
228 }
229
CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,uint32_t initial_metadata_flags_eq,grpc_error * error)230 void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
231 uint32_t initial_metadata_flags_eq,
232 grpc_error* error) {
233 PickState* pick = pending_picks_;
234 pending_picks_ = nullptr;
235 while (pick != nullptr) {
236 PickState* next = pick->next;
237 if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
238 initial_metadata_flags_eq) {
239 GRPC_CLOSURE_SCHED(pick->on_complete,
240 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
241 "Pick Cancelled", &error, 1));
242 } else {
243 pick->next = pending_picks_;
244 pending_picks_ = pick;
245 }
246 pick = next;
247 }
248 GRPC_ERROR_UNREF(error);
249 }
250
StartPickingLocked()251 void PickFirst::StartPickingLocked() {
252 started_picking_ = true;
253 if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
254 subchannel_list_->subchannel(0)
255 ->CheckConnectivityStateAndStartWatchingLocked();
256 }
257 }
258
ExitIdleLocked()259 void PickFirst::ExitIdleLocked() {
260 if (!started_picking_) {
261 StartPickingLocked();
262 }
263 }
264
ResetBackoffLocked()265 void PickFirst::ResetBackoffLocked() {
266 subchannel_list_->ResetBackoffLocked();
267 if (latest_pending_subchannel_list_ != nullptr) {
268 latest_pending_subchannel_list_->ResetBackoffLocked();
269 }
270 }
271
PickLocked(PickState * pick,grpc_error ** error)272 bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
273 // If we have a selected subchannel already, return synchronously.
274 if (selected_ != nullptr) {
275 pick->connected_subchannel = selected_->connected_subchannel()->Ref();
276 return true;
277 }
278 // No subchannel selected yet, so handle asynchronously.
279 if (pick->on_complete == nullptr) {
280 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
281 "No pick result available but synchronous result required.");
282 return true;
283 }
284 pick->next = pending_picks_;
285 pending_picks_ = pick;
286 if (!started_picking_) {
287 StartPickingLocked();
288 }
289 return false;
290 }
291
CheckConnectivityLocked(grpc_error ** error)292 grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
293 return grpc_connectivity_state_get(&state_tracker_, error);
294 }
295
NotifyOnStateChangeLocked(grpc_connectivity_state * current,grpc_closure * notify)296 void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
297 grpc_closure* notify) {
298 grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
299 notify);
300 }
301
FillChildRefsForChannelz(ChildRefsList * child_subchannels_to_fill,ChildRefsList * ignored)302 void PickFirst::FillChildRefsForChannelz(
303 ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
304 MutexLock lock(&child_refs_mu_);
305 for (size_t i = 0; i < child_subchannels_.size(); ++i) {
306 // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
307 // have to implement lightweight set. For now, we don't care about
308 // performance when channelz requests are made.
309 bool found = false;
310 for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
311 if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
312 found = true;
313 break;
314 }
315 }
316 if (!found) {
317 child_subchannels_to_fill->push_back(child_subchannels_[i]);
318 }
319 }
320 }
321
UpdateChildRefsLocked()322 void PickFirst::UpdateChildRefsLocked() {
323 ChildRefsList cs;
324 if (subchannel_list_ != nullptr) {
325 subchannel_list_->PopulateChildRefsList(&cs);
326 }
327 if (latest_pending_subchannel_list_ != nullptr) {
328 latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
329 }
330 // atomically update the data that channelz will actually be looking at.
331 MutexLock lock(&child_refs_mu_);
332 child_subchannels_ = std::move(cs);
333 }
334
UpdateLocked(const grpc_channel_args & args)335 void PickFirst::UpdateLocked(const grpc_channel_args& args) {
336 AutoChildRefsUpdater guard(this);
337 const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
338 if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
339 if (subchannel_list_ == nullptr) {
340 // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
341 grpc_connectivity_state_set(
342 &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
343 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
344 "pf_update_missing");
345 } else {
346 // otherwise, keep using the current subchannel list (ignore this update).
347 gpr_log(GPR_ERROR,
348 "No valid LB addresses channel arg for Pick First %p update, "
349 "ignoring.",
350 this);
351 }
352 return;
353 }
354 const grpc_lb_addresses* addresses =
355 static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
356 if (grpc_lb_pick_first_trace.enabled()) {
357 gpr_log(GPR_INFO,
358 "Pick First %p received update with %" PRIuPTR " addresses", this,
359 addresses->num_addresses);
360 }
361 auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
362 this, &grpc_lb_pick_first_trace, addresses, combiner(),
363 client_channel_factory(), args);
364 if (subchannel_list->num_subchannels() == 0) {
365 // Empty update or no valid subchannels. Unsubscribe from all current
366 // subchannels and put the channel in TRANSIENT_FAILURE.
367 grpc_connectivity_state_set(
368 &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
369 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
370 "pf_update_empty");
371 subchannel_list_ = std::move(subchannel_list); // Empty list.
372 selected_ = nullptr;
373 return;
374 }
375 if (selected_ == nullptr) {
376 // We don't yet have a selected subchannel, so replace the current
377 // subchannel list immediately.
378 subchannel_list_ = std::move(subchannel_list);
379 // If we've started picking, start trying to connect to the first
380 // subchannel in the new list.
381 if (started_picking_) {
382 subchannel_list_->subchannel(0)
383 ->CheckConnectivityStateAndStartWatchingLocked();
384 }
385 } else {
386 // We do have a selected subchannel.
387 // Check if it's present in the new list. If so, we're done.
388 for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
389 PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
390 if (sd->subchannel() == selected_->subchannel()) {
391 // The currently selected subchannel is in the update: we are done.
392 if (grpc_lb_pick_first_trace.enabled()) {
393 gpr_log(GPR_INFO,
394 "Pick First %p found already selected subchannel %p "
395 "at update index %" PRIuPTR " of %" PRIuPTR "; update done",
396 this, selected_->subchannel(), i,
397 subchannel_list->num_subchannels());
398 }
399 // Make sure it's in state READY. It might not be if we grabbed
400 // the combiner while a connectivity state notification
401 // informing us otherwise is pending.
402 // Note that CheckConnectivityStateLocked() also takes a ref to
403 // the connected subchannel.
404 grpc_error* error = GRPC_ERROR_NONE;
405 if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
406 selected_ = sd;
407 subchannel_list_ = std::move(subchannel_list);
408 sd->StartConnectivityWatchLocked();
409 // If there was a previously pending update (which may or may
410 // not have contained the currently selected subchannel), drop
411 // it, so that it doesn't override what we've done here.
412 latest_pending_subchannel_list_.reset();
413 return;
414 }
415 GRPC_ERROR_UNREF(error);
416 }
417 }
418 // Not keeping the previous selected subchannel, so set the latest
419 // pending subchannel list to the new subchannel list. We will wait
420 // for it to report READY before swapping it into the current
421 // subchannel list.
422 if (latest_pending_subchannel_list_ != nullptr) {
423 if (grpc_lb_pick_first_trace.enabled()) {
424 gpr_log(GPR_INFO,
425 "Pick First %p Shutting down latest pending subchannel list "
426 "%p, about to be replaced by newer latest %p",
427 this, latest_pending_subchannel_list_.get(),
428 subchannel_list.get());
429 }
430 }
431 latest_pending_subchannel_list_ = std::move(subchannel_list);
432 // If we've started picking, start trying to connect to the first
433 // subchannel in the new list.
434 if (started_picking_) {
435 latest_pending_subchannel_list_->subchannel(0)
436 ->CheckConnectivityStateAndStartWatchingLocked();
437 }
438 }
439 }
440
ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state,grpc_error * error)441 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
442 grpc_connectivity_state connectivity_state, grpc_error* error) {
443 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
444 AutoChildRefsUpdater guard(p);
445 // The notification must be for a subchannel in either the current or
446 // latest pending subchannel lists.
447 GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
448 subchannel_list() == p->latest_pending_subchannel_list_.get());
449 GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
450 // Handle updates for the currently selected subchannel.
451 if (p->selected_ == this) {
452 if (grpc_lb_pick_first_trace.enabled()) {
453 gpr_log(GPR_INFO,
454 "Pick First %p connectivity changed for selected subchannel", p);
455 }
456 // If the new state is anything other than READY and there is a
457 // pending update, switch to the pending update.
458 if (connectivity_state != GRPC_CHANNEL_READY &&
459 p->latest_pending_subchannel_list_ != nullptr) {
460 if (grpc_lb_pick_first_trace.enabled()) {
461 gpr_log(GPR_INFO,
462 "Pick First %p promoting pending subchannel list %p to "
463 "replace %p",
464 p, p->latest_pending_subchannel_list_.get(),
465 p->subchannel_list_.get());
466 }
467 p->selected_ = nullptr;
468 StopConnectivityWatchLocked();
469 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
470 grpc_connectivity_state_set(
471 &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
472 error != GRPC_ERROR_NONE
473 ? GRPC_ERROR_REF(error)
474 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
475 "selected subchannel not ready; switching to pending "
476 "update"),
477 "selected_not_ready+switch_to_update");
478 } else {
479 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
480 // If the selected subchannel goes bad, request a re-resolution. We also
481 // set the channel state to IDLE and reset started_picking_. The reason
482 // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
483 // reception we don't want to connect to the re-resolved backends until
484 // we leave the IDLE state.
485 grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
486 GRPC_ERROR_NONE,
487 "selected_changed+reresolve");
488 p->started_picking_ = false;
489 p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
490 // In transient failure. Rely on re-resolution to recover.
491 p->selected_ = nullptr;
492 StopConnectivityWatchLocked();
493 } else {
494 grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
495 GRPC_ERROR_REF(error), "selected_changed");
496 // Renew notification.
497 RenewConnectivityWatchLocked();
498 }
499 }
500 GRPC_ERROR_UNREF(error);
501 return;
502 }
503 // If we get here, there are two possible cases:
504 // 1. We do not currently have a selected subchannel, and the update is
505 // for a subchannel in p->subchannel_list_ that we're trying to
506 // connect to. The goal here is to find a subchannel that we can
507 // select.
508 // 2. We do currently have a selected subchannel, and the update is
509 // for a subchannel in p->latest_pending_subchannel_list_. The
510 // goal here is to find a subchannel from the update that we can
511 // select in place of the current one.
512 switch (connectivity_state) {
513 case GRPC_CHANNEL_READY: {
514 ProcessUnselectedReadyLocked();
515 // Renew notification.
516 RenewConnectivityWatchLocked();
517 break;
518 }
519 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
520 StopConnectivityWatchLocked();
521 PickFirstSubchannelData* sd = this;
522 size_t next_index =
523 (sd->Index() + 1) % subchannel_list()->num_subchannels();
524 sd = subchannel_list()->subchannel(next_index);
525 // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
526 // all subchannels.
527 if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
528 p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
529 grpc_connectivity_state_set(
530 &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
531 GRPC_ERROR_REF(error), "exhausted_subchannels");
532 }
533 sd->CheckConnectivityStateAndStartWatchingLocked();
534 break;
535 }
536 case GRPC_CHANNEL_CONNECTING:
537 case GRPC_CHANNEL_IDLE: {
538 // Only update connectivity state in case 1.
539 if (subchannel_list() == p->subchannel_list_.get()) {
540 grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
541 GRPC_ERROR_REF(error),
542 "connecting_changed");
543 }
544 // Renew notification.
545 RenewConnectivityWatchLocked();
546 break;
547 }
548 case GRPC_CHANNEL_SHUTDOWN:
549 GPR_UNREACHABLE_CODE(break);
550 }
551 GRPC_ERROR_UNREF(error);
552 }
553
ProcessUnselectedReadyLocked()554 void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
555 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
556 // If we get here, there are two possible cases:
557 // 1. We do not currently have a selected subchannel, and the update is
558 // for a subchannel in p->subchannel_list_ that we're trying to
559 // connect to. The goal here is to find a subchannel that we can
560 // select.
561 // 2. We do currently have a selected subchannel, and the update is
562 // for a subchannel in p->latest_pending_subchannel_list_. The
563 // goal here is to find a subchannel from the update that we can
564 // select in place of the current one.
565 GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
566 subchannel_list() == p->latest_pending_subchannel_list_.get());
567 // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
568 if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
569 if (grpc_lb_pick_first_trace.enabled()) {
570 gpr_log(GPR_INFO,
571 "Pick First %p promoting pending subchannel list %p to "
572 "replace %p",
573 p, p->latest_pending_subchannel_list_.get(),
574 p->subchannel_list_.get());
575 }
576 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
577 }
578 // Cases 1 and 2.
579 grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
580 GRPC_ERROR_NONE, "subchannel_ready");
581 p->selected_ = this;
582 if (grpc_lb_pick_first_trace.enabled()) {
583 gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
584 }
585 // Update any calls that were waiting for a pick.
586 PickState* pick;
587 while ((pick = p->pending_picks_)) {
588 p->pending_picks_ = pick->next;
589 pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
590 if (grpc_lb_pick_first_trace.enabled()) {
591 gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
592 p->selected_->subchannel());
593 }
594 GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
595 }
596 }
597
598 void PickFirst::PickFirstSubchannelData::
CheckConnectivityStateAndStartWatchingLocked()599 CheckConnectivityStateAndStartWatchingLocked() {
600 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
601 grpc_error* error = GRPC_ERROR_NONE;
602 if (p->selected_ != this &&
603 CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
604 // We must process the READY subchannel before we start watching it.
605 // Otherwise, we won't know it's READY because we will be waiting for its
606 // connectivity state to change from READY.
607 ProcessUnselectedReadyLocked();
608 }
609 GRPC_ERROR_UNREF(error);
610 StartConnectivityWatchLocked();
611 }
612
613 //
614 // factory
615 //
616
617 class PickFirstFactory : public LoadBalancingPolicyFactory {
618 public:
CreateLoadBalancingPolicy(const LoadBalancingPolicy::Args & args) const619 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
620 const LoadBalancingPolicy::Args& args) const override {
621 return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
622 }
623
name() const624 const char* name() const override { return "pick_first"; }
625 };
626
627 } // namespace
628
629 } // namespace grpc_core
630
grpc_lb_policy_pick_first_init()631 void grpc_lb_policy_pick_first_init() {
632 grpc_core::LoadBalancingPolicyRegistry::Builder::
633 RegisterLoadBalancingPolicyFactory(
634 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
635 grpc_core::New<grpc_core::PickFirstFactory>()));
636 }
637
// Shutdown counterpart of grpc_lb_policy_pick_first_init(); does nothing.
void grpc_lb_policy_pick_first_shutdown() {
  // Intentionally empty.
}
639