1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/impl/channel_arg_names.h>
19 #include <grpc/impl/connectivity_state.h>
20 #include <grpc/support/port_platform.h>
21 #include <inttypes.h>
22
23 #include <algorithm>
24 #include <map>
25 #include <memory>
26 #include <set>
27 #include <string>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_join.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 #include "src/core/config/core_configuration.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/load_balancing/address_filtering.h"
47 #include "src/core/load_balancing/child_policy_handler.h"
48 #include "src/core/load_balancing/delegating_helper.h"
49 #include "src/core/load_balancing/lb_policy.h"
50 #include "src/core/load_balancing/lb_policy_factory.h"
51 #include "src/core/load_balancing/lb_policy_registry.h"
52 #include "src/core/resolver/endpoint_addresses.h"
53 #include "src/core/util/debug_location.h"
54 #include "src/core/util/json/json.h"
55 #include "src/core/util/json/json_args.h"
56 #include "src/core/util/json/json_object_loader.h"
57 #include "src/core/util/orphanable.h"
58 #include "src/core/util/ref_counted_ptr.h"
59 #include "src/core/util/ref_counted_string.h"
60 #include "src/core/util/time.h"
61 #include "src/core/util/validation_errors.h"
62 #include "src/core/util/work_serializer.h"
63
64 namespace grpc_core {
65
66 namespace {
67
68 using ::grpc_event_engine::experimental::EventEngine;
69
70 constexpr absl::string_view kPriority = "priority_experimental";
71
72 // How long we keep a child around for after it is no longer being used
73 // (either because it has been removed from the config or because we
74 // have switched to a higher-priority child).
75 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
76
77 // Default for how long we wait for a newly created child to get connected
78 // before starting to attempt the next priority. Overridable via channel arg.
79 constexpr Duration kDefaultChildFailoverTimeout = Duration::Seconds(10);
80
81 // Config for priority LB policy.
82 class PriorityLbConfig final : public LoadBalancingPolicy::Config {
83 public:
84 struct PriorityLbChild {
85 RefCountedPtr<LoadBalancingPolicy::Config> config;
86 bool ignore_reresolution_requests = false;
87
88 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
89 void JsonPostLoad(const Json& json, const JsonArgs&,
90 ValidationErrors* errors);
91 };
92
93 PriorityLbConfig() = default;
94
95 PriorityLbConfig(const PriorityLbConfig&) = delete;
96 PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;
97
98 PriorityLbConfig(PriorityLbConfig&& other) = delete;
99 PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;
100
name() const101 absl::string_view name() const override { return kPriority; }
102
children() const103 const std::map<std::string, PriorityLbChild>& children() const {
104 return children_;
105 }
priorities() const106 const std::vector<std::string>& priorities() const { return priorities_; }
107
108 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
109 void JsonPostLoad(const Json& json, const JsonArgs&,
110 ValidationErrors* errors);
111
112 private:
113 std::map<std::string, PriorityLbChild> children_;
114 std::vector<std::string> priorities_;
115 };
116
117 // priority LB policy.
118 class PriorityLb final : public LoadBalancingPolicy {
119 public:
120 explicit PriorityLb(Args args);
121
name() const122 absl::string_view name() const override { return kPriority; }
123
124 absl::Status UpdateLocked(UpdateArgs args) override;
125 void ExitIdleLocked() override;
126 void ResetBackoffLocked() override;
127
128 private:
129 // Each ChildPriority holds a ref to the PriorityLb.
130 class ChildPriority final : public InternallyRefCounted<ChildPriority> {
131 public:
132 ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
133
~ChildPriority()134 ~ChildPriority() override {
135 priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
136 }
137
name() const138 const std::string& name() const { return name_; }
139
140 absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
141 bool ignore_reresolution_requests);
142 void ExitIdleLocked();
143 void ResetBackoffLocked();
144 void MaybeDeactivateLocked();
145 void MaybeReactivateLocked();
146
147 void Orphan() override;
148
149 RefCountedPtr<SubchannelPicker> GetPicker();
150
connectivity_state() const151 grpc_connectivity_state connectivity_state() const {
152 return connectivity_state_;
153 }
154
connectivity_status() const155 const absl::Status& connectivity_status() const {
156 return connectivity_status_;
157 }
158
FailoverTimerPending() const159 bool FailoverTimerPending() const { return failover_timer_ != nullptr; }
160
161 private:
162 class Helper final : public DelegatingChannelControlHelper {
163 public:
Helper(RefCountedPtr<ChildPriority> priority)164 explicit Helper(RefCountedPtr<ChildPriority> priority)
165 : priority_(std::move(priority)) {}
166
~Helper()167 ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }
168
169 void UpdateState(grpc_connectivity_state state,
170 const absl::Status& status,
171 RefCountedPtr<SubchannelPicker> picker) override;
172 void RequestReresolution() override;
173
174 private:
parent_helper() const175 ChannelControlHelper* parent_helper() const override {
176 return priority_->priority_policy_->channel_control_helper();
177 }
178
179 RefCountedPtr<ChildPriority> priority_;
180 };
181
182 class DeactivationTimer final
183 : public InternallyRefCounted<DeactivationTimer> {
184 public:
185 explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);
186
187 void Orphan() override;
188
189 private:
190 void OnTimerLocked();
191
192 RefCountedPtr<ChildPriority> child_priority_;
193 absl::optional<EventEngine::TaskHandle> timer_handle_;
194 };
195
196 class FailoverTimer final : public InternallyRefCounted<FailoverTimer> {
197 public:
198 explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);
199
200 void Orphan() override;
201
202 private:
203 void OnTimerLocked();
204
205 RefCountedPtr<ChildPriority> child_priority_;
206 absl::optional<EventEngine::TaskHandle> timer_handle_;
207 };
208
209 // Methods for dealing with the child policy.
210 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
211 const ChannelArgs& args);
212
213 void OnConnectivityStateUpdateLocked(
214 grpc_connectivity_state state, const absl::Status& status,
215 RefCountedPtr<SubchannelPicker> picker);
216
217 RefCountedPtr<PriorityLb> priority_policy_;
218 const std::string name_;
219 bool ignore_reresolution_requests_ = false;
220
221 OrphanablePtr<LoadBalancingPolicy> child_policy_;
222
223 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
224 absl::Status connectivity_status_;
225 RefCountedPtr<SubchannelPicker> picker_;
226
227 bool seen_ready_or_idle_since_transient_failure_ = true;
228
229 OrphanablePtr<DeactivationTimer> deactivation_timer_;
230 OrphanablePtr<FailoverTimer> failover_timer_;
231 };
232
233 ~PriorityLb() override;
234
235 void ShutdownLocked() override;
236
237 // Returns the priority of the specified child name, or UINT32_MAX if
238 // the child is not in the current priority list.
239 uint32_t GetChildPriorityLocked(const std::string& child_name) const;
240
241 // Deletes a child. Called when the child's deactivation timer fires.
242 void DeleteChild(ChildPriority* child);
243
244 // Iterates through the list of priorities to choose one:
245 // - If the child for a priority doesn't exist, creates it.
246 // - If a child's failover timer is pending, selects that priority
247 // while we wait for the child to attempt to connect.
248 // - If the child is connected, selects that priority.
249 // - Otherwise, continues on to the next child.
250 // Delegates to the last child if none of the children are connecting.
251 // Reports TRANSIENT_FAILURE if the priority list is empty.
252 //
253 // This method is idempotent; it should yield the same result every
254 // time as a function of the state of the children.
255 void ChoosePriorityLocked();
256
257 // Sets the specified priority as the current priority.
258 // Optionally deactivates any children at lower priorities.
259 // Returns the child's picker to the channel.
260 void SetCurrentPriorityLocked(int32_t priority,
261 bool deactivate_lower_priorities,
262 const char* reason);
263
264 const Duration child_failover_timeout_;
265
266 // Current channel args and config from the resolver.
267 ChannelArgs args_;
268 RefCountedPtr<PriorityLbConfig> config_;
269 absl::StatusOr<HierarchicalAddressMap> addresses_;
270 std::string resolution_note_;
271
272 // Internal state.
273 bool shutting_down_ = false;
274
275 bool update_in_progress_ = false;
276
277 // All children that currently exist.
278 // Some of these children may be in deactivated state.
279 std::map<std::string, OrphanablePtr<ChildPriority>> children_;
280 // The priority that is being used.
281 uint32_t current_priority_ = UINT32_MAX;
282 };
283
284 //
285 // PriorityLb
286 //
287
PriorityLb(Args args)288 PriorityLb::PriorityLb(Args args)
289 : LoadBalancingPolicy(std::move(args)),
290 child_failover_timeout_(std::max(
291 Duration::Zero(),
292 channel_args()
293 .GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
294 .value_or(kDefaultChildFailoverTimeout))) {
295 GRPC_TRACE_LOG(priority_lb, INFO) << "[priority_lb " << this << "] created";
296 }
297
~PriorityLb()298 PriorityLb::~PriorityLb() {
299 GRPC_TRACE_LOG(priority_lb, INFO)
300 << "[priority_lb " << this << "] destroying priority LB policy";
301 }
302
ShutdownLocked()303 void PriorityLb::ShutdownLocked() {
304 GRPC_TRACE_LOG(priority_lb, INFO)
305 << "[priority_lb " << this << "] shutting down";
306 shutting_down_ = true;
307 children_.clear();
308 }
309
ExitIdleLocked()310 void PriorityLb::ExitIdleLocked() {
311 if (current_priority_ != UINT32_MAX) {
312 const std::string& child_name = config_->priorities()[current_priority_];
313 GRPC_TRACE_LOG(priority_lb, INFO)
314 << "[priority_lb " << this << "] exiting IDLE for current priority "
315 << current_priority_ << " child " << child_name;
316 children_[child_name]->ExitIdleLocked();
317 }
318 }
319
ResetBackoffLocked()320 void PriorityLb::ResetBackoffLocked() {
321 for (const auto& p : children_) p.second->ResetBackoffLocked();
322 }
323
UpdateLocked(UpdateArgs args)324 absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
325 GRPC_TRACE_LOG(priority_lb, INFO)
326 << "[priority_lb " << this << "] received update";
327 // Update config.
328 config_ = args.config.TakeAsSubclass<PriorityLbConfig>();
329 // Update args.
330 args_ = std::move(args.args);
331 // Update addresses.
332 addresses_ = MakeHierarchicalAddressMap(args.addresses);
333 resolution_note_ = std::move(args.resolution_note);
334 // Check all existing children against the new config.
335 update_in_progress_ = true;
336 std::vector<std::string> errors;
337 for (const auto& p : children_) {
338 const std::string& child_name = p.first;
339 auto& child = p.second;
340 auto config_it = config_->children().find(child_name);
341 if (config_it == config_->children().end()) {
342 // Existing child not found in new config. Deactivate it.
343 child->MaybeDeactivateLocked();
344 } else {
345 // Existing child found in new config. Update it.
346 absl::Status status =
347 child->UpdateLocked(config_it->second.config,
348 config_it->second.ignore_reresolution_requests);
349 if (!status.ok()) {
350 errors.emplace_back(
351 absl::StrCat("child ", child_name, ": ", status.ToString()));
352 }
353 }
354 }
355 update_in_progress_ = false;
356 // Try to get connected.
357 ChoosePriorityLocked();
358 // Return status.
359 if (!errors.empty()) {
360 return absl::UnavailableError(absl::StrCat(
361 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
362 }
363 return absl::OkStatus();
364 }
365
GetChildPriorityLocked(const std::string & child_name) const366 uint32_t PriorityLb::GetChildPriorityLocked(
367 const std::string& child_name) const {
368 for (uint32_t priority = 0; priority < config_->priorities().size();
369 ++priority) {
370 if (config_->priorities()[priority] == child_name) return priority;
371 }
372 return UINT32_MAX;
373 }
374
DeleteChild(ChildPriority * child)375 void PriorityLb::DeleteChild(ChildPriority* child) {
376 children_.erase(child->name());
377 }
378
ChoosePriorityLocked()379 void PriorityLb::ChoosePriorityLocked() {
380 // If priority list is empty, report TF.
381 if (config_->priorities().empty()) {
382 absl::Status status =
383 absl::UnavailableError("priority policy has empty priority list");
384 channel_control_helper()->UpdateState(
385 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
386 MakeRefCounted<TransientFailurePicker>(status));
387 return;
388 }
389 // Iterate through priorities, searching for one in READY or IDLE,
390 // creating new children as needed.
391 current_priority_ = UINT32_MAX;
392 for (uint32_t priority = 0; priority < config_->priorities().size();
393 ++priority) {
394 // If the child for the priority does not exist yet, create it.
395 const std::string& child_name = config_->priorities()[priority];
396 GRPC_TRACE_LOG(priority_lb, INFO)
397 << "[priority_lb " << this << "] trying priority " << priority
398 << ", child " << child_name;
399 auto& child = children_[child_name];
400 // Create child if needed.
401 if (child == nullptr) {
402 child = MakeOrphanable<ChildPriority>(
403 RefAsSubclass<PriorityLb>(DEBUG_LOCATION, "ChildPriority"),
404 child_name);
405 auto child_config = config_->children().find(child_name);
406 DCHECK(child_config != config_->children().end());
407 // If the child policy returns a non-OK status, request re-resolution.
408 // Note that this will initially cause fixed backoff delay in the
409 // resolver instead of exponential delay. However, once the
410 // resolver returns the initial re-resolution, we will be able to
411 // return non-OK from UpdateLocked(), which will trigger
412 // exponential backoff instead.
413 absl::Status status = child->UpdateLocked(
414 child_config->second.config,
415 child_config->second.ignore_reresolution_requests);
416 if (!status.ok()) channel_control_helper()->RequestReresolution();
417 } else {
418 // The child already exists. Reactivate if needed.
419 child->MaybeReactivateLocked();
420 }
421 // Select this child if it is in states READY or IDLE.
422 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
423 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
424 SetCurrentPriorityLocked(
425 priority, /*deactivate_lower_priorities=*/true,
426 ConnectivityStateName(child->connectivity_state()));
427 return;
428 }
429 // Select this child if its failover timer is pending.
430 if (child->FailoverTimerPending()) {
431 SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
432 "failover timer pending");
433 return;
434 }
435 // Child has been failing for a while. Move on to the next priority.
436 GRPC_TRACE_LOG(priority_lb, INFO)
437 << "[priority_lb " << this << "] skipping priority " << priority
438 << ", child " << child_name
439 << ": state=" << ConnectivityStateName(child->connectivity_state())
440 << ", failover timer not pending";
441 }
442 // If we didn't find any priority to try, pick the first one in state
443 // CONNECTING.
444 GRPC_TRACE_LOG(priority_lb, INFO)
445 << "[priority_lb " << this
446 << "] no priority reachable, checking for CONNECTING priority to "
447 "delegate to";
448 for (uint32_t priority = 0; priority < config_->priorities().size();
449 ++priority) {
450 // If the child for the priority does not exist yet, create it.
451 const std::string& child_name = config_->priorities()[priority];
452 GRPC_TRACE_LOG(priority_lb, INFO)
453 << "[priority_lb " << this << "] trying priority " << priority
454 << ", child " << child_name;
455 auto& child = children_[child_name];
456 CHECK(child != nullptr);
457 if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
458 SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
459 "CONNECTING (pass 2)");
460 return;
461 }
462 }
463 // Did not find any child in CONNECTING, delegate to last child.
464 SetCurrentPriorityLocked(config_->priorities().size() - 1,
465 /*deactivate_lower_priorities=*/false,
466 "no usable children");
467 }
468
SetCurrentPriorityLocked(int32_t priority,bool deactivate_lower_priorities,const char * reason)469 void PriorityLb::SetCurrentPriorityLocked(int32_t priority,
470 bool deactivate_lower_priorities,
471 const char* reason) {
472 GRPC_TRACE_LOG(priority_lb, INFO)
473 << "[priority_lb " << this << "] selecting priority " << priority
474 << ", child " << config_->priorities()[priority] << " (" << reason
475 << ", deactivate_lower_priorities=" << deactivate_lower_priorities << ")";
476 current_priority_ = priority;
477 if (deactivate_lower_priorities) {
478 for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
479 const std::string& child_name = config_->priorities()[p];
480 auto it = children_.find(child_name);
481 if (it != children_.end()) it->second->MaybeDeactivateLocked();
482 }
483 }
484 auto& child = children_[config_->priorities()[priority]];
485 CHECK(child != nullptr);
486 channel_control_helper()->UpdateState(child->connectivity_state(),
487 child->connectivity_status(),
488 child->GetPicker());
489 }
490
491 //
492 // PriorityLb::ChildPriority::DeactivationTimer
493 //
494
DeactivationTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)495 PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
496 RefCountedPtr<PriorityLb::ChildPriority> child_priority)
497 : child_priority_(std::move(child_priority)) {
498 GRPC_TRACE_LOG(priority_lb, INFO)
499 << "[priority_lb " << child_priority_->priority_policy_.get()
500 << "] child " << child_priority_->name_ << " (" << child_priority_.get()
501 << "): deactivating -- will remove in "
502 << kChildRetentionInterval.millis() << "ms";
503 timer_handle_ =
504 child_priority_->priority_policy_->channel_control_helper()
505 ->GetEventEngine()
506 ->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION,
507 "Timer")]() mutable {
508 ApplicationCallbackExecCtx callback_exec_ctx;
509 ExecCtx exec_ctx;
510 auto self_ptr = self.get();
511 self_ptr->child_priority_->priority_policy_->work_serializer()->Run(
512 [self = std::move(self)]() { self->OnTimerLocked(); },
513 DEBUG_LOCATION);
514 });
515 }
516
Orphan()517 void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
518 if (timer_handle_.has_value()) {
519 GRPC_TRACE_LOG(priority_lb, INFO)
520 << "[priority_lb " << child_priority_->priority_policy_.get()
521 << "] child " << child_priority_->name_ << " (" << child_priority_.get()
522 << "): reactivating";
523 child_priority_->priority_policy_->channel_control_helper()
524 ->GetEventEngine()
525 ->Cancel(*timer_handle_);
526 timer_handle_.reset();
527 }
528 Unref();
529 }
530
OnTimerLocked()531 void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() {
532 if (timer_handle_.has_value()) {
533 timer_handle_.reset();
534 GRPC_TRACE_LOG(priority_lb, INFO)
535 << "[priority_lb " << child_priority_->priority_policy_.get()
536 << "] child " << child_priority_->name_ << " (" << child_priority_.get()
537 << "): deactivation timer fired, deleting child";
538 child_priority_->priority_policy_->DeleteChild(child_priority_.get());
539 }
540 }
541
542 //
543 // PriorityLb::ChildPriority::FailoverTimer
544 //
545
FailoverTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)546 PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
547 RefCountedPtr<PriorityLb::ChildPriority> child_priority)
548 : child_priority_(std::move(child_priority)) {
549 GRPC_TRACE_LOG(priority_lb, INFO)
550 << "[priority_lb " << child_priority_->priority_policy_.get()
551 << "] child " << child_priority_->name_ << " (" << child_priority_.get()
552 << "): starting failover timer for "
553 << child_priority_->priority_policy_->child_failover_timeout_.millis()
554 << "ms";
555 timer_handle_ =
556 child_priority_->priority_policy_->channel_control_helper()
557 ->GetEventEngine()
558 ->RunAfter(
559 child_priority_->priority_policy_->child_failover_timeout_,
560 [self = Ref(DEBUG_LOCATION, "Timer")]() mutable {
561 ApplicationCallbackExecCtx callback_exec_ctx;
562 ExecCtx exec_ctx;
563 auto self_ptr = self.get();
564 self_ptr->child_priority_->priority_policy_->work_serializer()
565 ->Run([self = std::move(self)]() { self->OnTimerLocked(); },
566 DEBUG_LOCATION);
567 });
568 }
569
Orphan()570 void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
571 if (timer_handle_.has_value()) {
572 GRPC_TRACE_LOG(priority_lb, INFO)
573 << "[priority_lb " << child_priority_->priority_policy_.get()
574 << "] child " << child_priority_->name_ << " (" << child_priority_.get()
575 << "): cancelling failover timer";
576 child_priority_->priority_policy_->channel_control_helper()
577 ->GetEventEngine()
578 ->Cancel(*timer_handle_);
579 timer_handle_.reset();
580 }
581 Unref();
582 }
583
OnTimerLocked()584 void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() {
585 if (timer_handle_.has_value()) {
586 timer_handle_.reset();
587 GRPC_TRACE_LOG(priority_lb, INFO)
588 << "[priority_lb " << child_priority_->priority_policy_.get()
589 << "] child " << child_priority_->name_ << " (" << child_priority_.get()
590 << "): failover timer fired, reporting TRANSIENT_FAILURE";
591 child_priority_->OnConnectivityStateUpdateLocked(
592 GRPC_CHANNEL_TRANSIENT_FAILURE,
593 absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
594 nullptr);
595 }
596 }
597
598 //
599 // PriorityLb::ChildPriority
600 //
601
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)602 PriorityLb::ChildPriority::ChildPriority(
603 RefCountedPtr<PriorityLb> priority_policy, std::string name)
604 : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
605 GRPC_TRACE_LOG(priority_lb, INFO)
606 << "[priority_lb " << priority_policy_.get() << "] creating child "
607 << name_ << " (" << this << ")";
608 // Start the failover timer.
609 failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
610 }
611
Orphan()612 void PriorityLb::ChildPriority::Orphan() {
613 GRPC_TRACE_LOG(priority_lb, INFO)
614 << "[priority_lb " << priority_policy_.get() << "] child " << name_
615 << " (" << this << "): orphaned";
616 failover_timer_.reset();
617 deactivation_timer_.reset();
618 // Remove the child policy's interested_parties pollset_set from the
619 // xDS policy.
620 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
621 priority_policy_->interested_parties());
622 child_policy_.reset();
623 // Drop our ref to the child's picker, in case it's holding a ref to
624 // the child.
625 picker_.reset();
626 Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
627 }
628
629 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
GetPicker()630 PriorityLb::ChildPriority::GetPicker() {
631 if (picker_ == nullptr) {
632 return MakeRefCounted<QueuePicker>(
633 priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
634 }
635 return picker_;
636 }
637
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)638 absl::Status PriorityLb::ChildPriority::UpdateLocked(
639 RefCountedPtr<LoadBalancingPolicy::Config> config,
640 bool ignore_reresolution_requests) {
641 if (priority_policy_->shutting_down_) return absl::OkStatus();
642 GRPC_TRACE_LOG(priority_lb, INFO)
643 << "[priority_lb " << priority_policy_.get() << "] child " << name_
644 << " (" << this << "): start update";
645 ignore_reresolution_requests_ = ignore_reresolution_requests;
646 // Create policy if needed.
647 if (child_policy_ == nullptr) {
648 child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
649 }
650 // Construct update args.
651 UpdateArgs update_args;
652 update_args.config = std::move(config);
653 if (priority_policy_->addresses_.ok()) {
654 auto it = priority_policy_->addresses_->find(name_);
655 if (it == priority_policy_->addresses_->end()) {
656 update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
657 EndpointAddressesList());
658 } else {
659 update_args.addresses = it->second;
660 }
661 } else {
662 update_args.addresses = priority_policy_->addresses_.status();
663 }
664 update_args.resolution_note = priority_policy_->resolution_note_;
665 update_args.args = priority_policy_->args_;
666 // Update the policy.
667 GRPC_TRACE_LOG(priority_lb, INFO)
668 << "[priority_lb " << priority_policy_.get() << "] child " << name_
669 << " (" << this << "): updating child policy handler "
670 << child_policy_.get();
671 return child_policy_->UpdateLocked(std::move(update_args));
672 }
673
674 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)675 PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
676 LoadBalancingPolicy::Args lb_policy_args;
677 lb_policy_args.work_serializer = priority_policy_->work_serializer();
678 lb_policy_args.args = args;
679 lb_policy_args.channel_control_helper =
680 std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
681 OrphanablePtr<LoadBalancingPolicy> lb_policy =
682 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
683 &priority_lb_trace);
684 GRPC_TRACE_LOG(priority_lb, INFO)
685 << "[priority_lb " << priority_policy_.get() << "] child " << name_
686 << " (" << this << "): created new child policy handler "
687 << lb_policy.get();
688 // Add the parent's interested_parties pollset_set to that of the newly
689 // created child policy. This will make the child policy progress upon
690 // activity on the parent LB, which in turn is tied to the application's call.
691 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
692 priority_policy_->interested_parties());
693 return lb_policy;
694 }
695
ExitIdleLocked()696 void PriorityLb::ChildPriority::ExitIdleLocked() {
697 child_policy_->ExitIdleLocked();
698 }
699
ResetBackoffLocked()700 void PriorityLb::ChildPriority::ResetBackoffLocked() {
701 child_policy_->ResetBackoffLocked();
702 }
703
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)704 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
705 grpc_connectivity_state state, const absl::Status& status,
706 RefCountedPtr<SubchannelPicker> picker) {
707 GRPC_TRACE_LOG(priority_lb, INFO)
708 << "[priority_lb " << priority_policy_.get() << "] child " << name_
709 << " (" << this << "): state update: " << ConnectivityStateName(state)
710 << " (" << status << ") picker " << picker.get();
711 // Store the state and picker.
712 connectivity_state_ = state;
713 connectivity_status_ = status;
714 // When the failover timer fires, this method will be called with picker
715 // set to null, because we want to consider the child to be in
716 // TRANSIENT_FAILURE, but we have no new picker to report. In that case,
717 // just keep using the old picker, in case we wind up delegating to this
718 // child when all priorities are failing.
719 if (picker != nullptr) picker_ = std::move(picker);
720 // If we transition to state CONNECTING and we've not seen
721 // TRANSIENT_FAILURE more recently than READY or IDLE, start failover
722 // timer if not already pending.
723 // In any other state, update seen_ready_or_idle_since_transient_failure_
724 // and cancel failover timer.
725 if (state == GRPC_CHANNEL_CONNECTING) {
726 if (seen_ready_or_idle_since_transient_failure_ &&
727 failover_timer_ == nullptr) {
728 failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
729 }
730 } else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
731 seen_ready_or_idle_since_transient_failure_ = true;
732 failover_timer_.reset();
733 } else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
734 seen_ready_or_idle_since_transient_failure_ = false;
735 failover_timer_.reset();
736 }
737 // Call the LB policy's ChoosePriorityLocked() to choose a priority to
738 // use based on the updated state of this child.
739 //
740 // Note that if we're in the process of propagating an update from our
741 // parent to our children, we skip this, because we don't want to
742 // choose a new priority based on inconsistent state. Instead, the
743 // policy will choose a new priority once the update has been seen by
744 // all children.
745 if (!priority_policy_->update_in_progress_) {
746 priority_policy_->ChoosePriorityLocked();
747 }
748 }
749
MaybeDeactivateLocked()750 void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
751 if (deactivation_timer_ == nullptr) {
752 deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
753 }
754 }
755
MaybeReactivateLocked()756 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
757 deactivation_timer_.reset();
758 }
759
760 //
761 // PriorityLb::ChildPriority::Helper
762 //
763
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)764 void PriorityLb::ChildPriority::Helper::UpdateState(
765 grpc_connectivity_state state, const absl::Status& status,
766 RefCountedPtr<SubchannelPicker> picker) {
767 if (priority_->priority_policy_->shutting_down_) return;
768 // Notify the priority.
769 priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
770 }
771
RequestReresolution()772 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
773 if (priority_->priority_policy_->shutting_down_) return;
774 if (priority_->ignore_reresolution_requests_) {
775 return;
776 }
777 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
778 }
779
780 //
781 // factory
782 //
783
JsonLoader(const JsonArgs &)784 const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader(
785 const JsonArgs&) {
786 static const auto* loader =
787 JsonObjectLoader<PriorityLbChild>()
788 // Note: The "config" field requires custom parsing, so it's
789 // handled in JsonPostLoad() instead of here.
790 .OptionalField("ignore_reresolution_requests",
791 &PriorityLbChild::ignore_reresolution_requests)
792 .Finish();
793 return loader;
794 }
795
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)796 void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json,
797 const JsonArgs&,
798 ValidationErrors* errors) {
799 ValidationErrors::ScopedField field(errors, ".config");
800 auto it = json.object().find("config");
801 if (it == json.object().end()) {
802 errors->AddError("field not present");
803 return;
804 }
805 auto lb_config =
806 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
807 it->second);
808 if (!lb_config.ok()) {
809 errors->AddError(lb_config.status().message());
810 return;
811 }
812 config = std::move(*lb_config);
813 }
814
JsonLoader(const JsonArgs &)815 const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) {
816 static const auto* loader =
817 JsonObjectLoader<PriorityLbConfig>()
818 .Field("children", &PriorityLbConfig::children_)
819 .Field("priorities", &PriorityLbConfig::priorities_)
820 .Finish();
821 return loader;
822 }
823
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)824 void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&,
825 ValidationErrors* errors) {
826 std::set<std::string> unknown_priorities;
827 for (const std::string& priority : priorities_) {
828 if (children_.find(priority) == children_.end()) {
829 unknown_priorities.insert(priority);
830 }
831 }
832 if (!unknown_priorities.empty()) {
833 errors->AddError(absl::StrCat("unknown priorit(ies): [",
834 absl::StrJoin(unknown_priorities, ", "),
835 "]"));
836 }
837 }
838
839 class PriorityLbFactory final : public LoadBalancingPolicyFactory {
840 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const841 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
842 LoadBalancingPolicy::Args args) const override {
843 return MakeOrphanable<PriorityLb>(std::move(args));
844 }
845
name() const846 absl::string_view name() const override { return kPriority; }
847
848 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const849 ParseLoadBalancingConfig(const Json& json) const override {
850 return LoadFromJson<RefCountedPtr<PriorityLbConfig>>(
851 json, JsonArgs(), "errors validating priority LB policy config");
852 }
853 };
854
855 } // namespace
856
RegisterPriorityLbPolicy(CoreConfiguration::Builder * builder)857 void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) {
858 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
859 std::make_unique<PriorityLbFactory>());
860 }
861
862 } // namespace grpc_core
863