• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/impl/channel_arg_names.h>
19 #include <grpc/impl/connectivity_state.h>
20 #include <grpc/support/port_platform.h>
21 #include <inttypes.h>
22 
23 #include <algorithm>
24 #include <map>
25 #include <memory>
26 #include <set>
27 #include <string>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_join.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 #include "src/core/config/core_configuration.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/load_balancing/address_filtering.h"
47 #include "src/core/load_balancing/child_policy_handler.h"
48 #include "src/core/load_balancing/delegating_helper.h"
49 #include "src/core/load_balancing/lb_policy.h"
50 #include "src/core/load_balancing/lb_policy_factory.h"
51 #include "src/core/load_balancing/lb_policy_registry.h"
52 #include "src/core/resolver/endpoint_addresses.h"
53 #include "src/core/util/debug_location.h"
54 #include "src/core/util/json/json.h"
55 #include "src/core/util/json/json_args.h"
56 #include "src/core/util/json/json_object_loader.h"
57 #include "src/core/util/orphanable.h"
58 #include "src/core/util/ref_counted_ptr.h"
59 #include "src/core/util/ref_counted_string.h"
60 #include "src/core/util/time.h"
61 #include "src/core/util/validation_errors.h"
62 #include "src/core/util/work_serializer.h"
63 
64 namespace grpc_core {
65 
66 namespace {
67 
68 using ::grpc_event_engine::experimental::EventEngine;
69 
70 constexpr absl::string_view kPriority = "priority_experimental";
71 
72 // How long we keep a child around for after it is no longer being used
73 // (either because it has been removed from the config or because we
74 // have switched to a higher-priority child).
75 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
76 
77 // Default for how long we wait for a newly created child to get connected
78 // before starting to attempt the next priority.  Overridable via channel arg.
79 constexpr Duration kDefaultChildFailoverTimeout = Duration::Seconds(10);
80 
81 // Config for priority LB policy.
82 class PriorityLbConfig final : public LoadBalancingPolicy::Config {
83  public:
84   struct PriorityLbChild {
85     RefCountedPtr<LoadBalancingPolicy::Config> config;
86     bool ignore_reresolution_requests = false;
87 
88     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
89     void JsonPostLoad(const Json& json, const JsonArgs&,
90                       ValidationErrors* errors);
91   };
92 
93   PriorityLbConfig() = default;
94 
95   PriorityLbConfig(const PriorityLbConfig&) = delete;
96   PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;
97 
98   PriorityLbConfig(PriorityLbConfig&& other) = delete;
99   PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;
100 
name() const101   absl::string_view name() const override { return kPriority; }
102 
children() const103   const std::map<std::string, PriorityLbChild>& children() const {
104     return children_;
105   }
priorities() const106   const std::vector<std::string>& priorities() const { return priorities_; }
107 
108   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
109   void JsonPostLoad(const Json& json, const JsonArgs&,
110                     ValidationErrors* errors);
111 
112  private:
113   std::map<std::string, PriorityLbChild> children_;
114   std::vector<std::string> priorities_;
115 };
116 
117 // priority LB policy.
118 class PriorityLb final : public LoadBalancingPolicy {
119  public:
120   explicit PriorityLb(Args args);
121 
name() const122   absl::string_view name() const override { return kPriority; }
123 
124   absl::Status UpdateLocked(UpdateArgs args) override;
125   void ExitIdleLocked() override;
126   void ResetBackoffLocked() override;
127 
128  private:
129   // Each ChildPriority holds a ref to the PriorityLb.
130   class ChildPriority final : public InternallyRefCounted<ChildPriority> {
131    public:
132     ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
133 
~ChildPriority()134     ~ChildPriority() override {
135       priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
136     }
137 
name() const138     const std::string& name() const { return name_; }
139 
140     absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
141                               bool ignore_reresolution_requests);
142     void ExitIdleLocked();
143     void ResetBackoffLocked();
144     void MaybeDeactivateLocked();
145     void MaybeReactivateLocked();
146 
147     void Orphan() override;
148 
149     RefCountedPtr<SubchannelPicker> GetPicker();
150 
connectivity_state() const151     grpc_connectivity_state connectivity_state() const {
152       return connectivity_state_;
153     }
154 
connectivity_status() const155     const absl::Status& connectivity_status() const {
156       return connectivity_status_;
157     }
158 
FailoverTimerPending() const159     bool FailoverTimerPending() const { return failover_timer_ != nullptr; }
160 
161    private:
162     class Helper final : public DelegatingChannelControlHelper {
163      public:
Helper(RefCountedPtr<ChildPriority> priority)164       explicit Helper(RefCountedPtr<ChildPriority> priority)
165           : priority_(std::move(priority)) {}
166 
~Helper()167       ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }
168 
169       void UpdateState(grpc_connectivity_state state,
170                        const absl::Status& status,
171                        RefCountedPtr<SubchannelPicker> picker) override;
172       void RequestReresolution() override;
173 
174      private:
parent_helper() const175       ChannelControlHelper* parent_helper() const override {
176         return priority_->priority_policy_->channel_control_helper();
177       }
178 
179       RefCountedPtr<ChildPriority> priority_;
180     };
181 
182     class DeactivationTimer final
183         : public InternallyRefCounted<DeactivationTimer> {
184      public:
185       explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);
186 
187       void Orphan() override;
188 
189      private:
190       void OnTimerLocked();
191 
192       RefCountedPtr<ChildPriority> child_priority_;
193       absl::optional<EventEngine::TaskHandle> timer_handle_;
194     };
195 
196     class FailoverTimer final : public InternallyRefCounted<FailoverTimer> {
197      public:
198       explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);
199 
200       void Orphan() override;
201 
202      private:
203       void OnTimerLocked();
204 
205       RefCountedPtr<ChildPriority> child_priority_;
206       absl::optional<EventEngine::TaskHandle> timer_handle_;
207     };
208 
209     // Methods for dealing with the child policy.
210     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
211         const ChannelArgs& args);
212 
213     void OnConnectivityStateUpdateLocked(
214         grpc_connectivity_state state, const absl::Status& status,
215         RefCountedPtr<SubchannelPicker> picker);
216 
217     RefCountedPtr<PriorityLb> priority_policy_;
218     const std::string name_;
219     bool ignore_reresolution_requests_ = false;
220 
221     OrphanablePtr<LoadBalancingPolicy> child_policy_;
222 
223     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
224     absl::Status connectivity_status_;
225     RefCountedPtr<SubchannelPicker> picker_;
226 
227     bool seen_ready_or_idle_since_transient_failure_ = true;
228 
229     OrphanablePtr<DeactivationTimer> deactivation_timer_;
230     OrphanablePtr<FailoverTimer> failover_timer_;
231   };
232 
233   ~PriorityLb() override;
234 
235   void ShutdownLocked() override;
236 
237   // Returns the priority of the specified child name, or UINT32_MAX if
238   // the child is not in the current priority list.
239   uint32_t GetChildPriorityLocked(const std::string& child_name) const;
240 
241   // Deletes a child.  Called when the child's deactivation timer fires.
242   void DeleteChild(ChildPriority* child);
243 
244   // Iterates through the list of priorities to choose one:
245   // - If the child for a priority doesn't exist, creates it.
246   // - If a child's failover timer is pending, selects that priority
247   //   while we wait for the child to attempt to connect.
248   // - If the child is connected, selects that priority.
249   // - Otherwise, continues on to the next child.
250   // Delegates to the last child if none of the children are connecting.
251   // Reports TRANSIENT_FAILURE if the priority list is empty.
252   //
253   // This method is idempotent; it should yield the same result every
254   // time as a function of the state of the children.
255   void ChoosePriorityLocked();
256 
257   // Sets the specified priority as the current priority.
258   // Optionally deactivates any children at lower priorities.
259   // Returns the child's picker to the channel.
260   void SetCurrentPriorityLocked(int32_t priority,
261                                 bool deactivate_lower_priorities,
262                                 const char* reason);
263 
264   const Duration child_failover_timeout_;
265 
266   // Current channel args and config from the resolver.
267   ChannelArgs args_;
268   RefCountedPtr<PriorityLbConfig> config_;
269   absl::StatusOr<HierarchicalAddressMap> addresses_;
270   std::string resolution_note_;
271 
272   // Internal state.
273   bool shutting_down_ = false;
274 
275   bool update_in_progress_ = false;
276 
277   // All children that currently exist.
278   // Some of these children may be in deactivated state.
279   std::map<std::string, OrphanablePtr<ChildPriority>> children_;
280   // The priority that is being used.
281   uint32_t current_priority_ = UINT32_MAX;
282 };
283 
284 //
285 // PriorityLb
286 //
287 
PriorityLb(Args args)288 PriorityLb::PriorityLb(Args args)
289     : LoadBalancingPolicy(std::move(args)),
290       child_failover_timeout_(std::max(
291           Duration::Zero(),
292           channel_args()
293               .GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
294               .value_or(kDefaultChildFailoverTimeout))) {
295   GRPC_TRACE_LOG(priority_lb, INFO) << "[priority_lb " << this << "] created";
296 }
297 
~PriorityLb()298 PriorityLb::~PriorityLb() {
299   GRPC_TRACE_LOG(priority_lb, INFO)
300       << "[priority_lb " << this << "] destroying priority LB policy";
301 }
302 
ShutdownLocked()303 void PriorityLb::ShutdownLocked() {
304   GRPC_TRACE_LOG(priority_lb, INFO)
305       << "[priority_lb " << this << "] shutting down";
306   shutting_down_ = true;
307   children_.clear();
308 }
309 
ExitIdleLocked()310 void PriorityLb::ExitIdleLocked() {
311   if (current_priority_ != UINT32_MAX) {
312     const std::string& child_name = config_->priorities()[current_priority_];
313     GRPC_TRACE_LOG(priority_lb, INFO)
314         << "[priority_lb " << this << "] exiting IDLE for current priority "
315         << current_priority_ << " child " << child_name;
316     children_[child_name]->ExitIdleLocked();
317   }
318 }
319 
ResetBackoffLocked()320 void PriorityLb::ResetBackoffLocked() {
321   for (const auto& p : children_) p.second->ResetBackoffLocked();
322 }
323 
UpdateLocked(UpdateArgs args)324 absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
325   GRPC_TRACE_LOG(priority_lb, INFO)
326       << "[priority_lb " << this << "] received update";
327   // Update config.
328   config_ = args.config.TakeAsSubclass<PriorityLbConfig>();
329   // Update args.
330   args_ = std::move(args.args);
331   // Update addresses.
332   addresses_ = MakeHierarchicalAddressMap(args.addresses);
333   resolution_note_ = std::move(args.resolution_note);
334   // Check all existing children against the new config.
335   update_in_progress_ = true;
336   std::vector<std::string> errors;
337   for (const auto& p : children_) {
338     const std::string& child_name = p.first;
339     auto& child = p.second;
340     auto config_it = config_->children().find(child_name);
341     if (config_it == config_->children().end()) {
342       // Existing child not found in new config.  Deactivate it.
343       child->MaybeDeactivateLocked();
344     } else {
345       // Existing child found in new config.  Update it.
346       absl::Status status =
347           child->UpdateLocked(config_it->second.config,
348                               config_it->second.ignore_reresolution_requests);
349       if (!status.ok()) {
350         errors.emplace_back(
351             absl::StrCat("child ", child_name, ": ", status.ToString()));
352       }
353     }
354   }
355   update_in_progress_ = false;
356   // Try to get connected.
357   ChoosePriorityLocked();
358   // Return status.
359   if (!errors.empty()) {
360     return absl::UnavailableError(absl::StrCat(
361         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
362   }
363   return absl::OkStatus();
364 }
365 
GetChildPriorityLocked(const std::string & child_name) const366 uint32_t PriorityLb::GetChildPriorityLocked(
367     const std::string& child_name) const {
368   for (uint32_t priority = 0; priority < config_->priorities().size();
369        ++priority) {
370     if (config_->priorities()[priority] == child_name) return priority;
371   }
372   return UINT32_MAX;
373 }
374 
DeleteChild(ChildPriority * child)375 void PriorityLb::DeleteChild(ChildPriority* child) {
376   children_.erase(child->name());
377 }
378 
ChoosePriorityLocked()379 void PriorityLb::ChoosePriorityLocked() {
380   // If priority list is empty, report TF.
381   if (config_->priorities().empty()) {
382     absl::Status status =
383         absl::UnavailableError("priority policy has empty priority list");
384     channel_control_helper()->UpdateState(
385         GRPC_CHANNEL_TRANSIENT_FAILURE, status,
386         MakeRefCounted<TransientFailurePicker>(status));
387     return;
388   }
389   // Iterate through priorities, searching for one in READY or IDLE,
390   // creating new children as needed.
391   current_priority_ = UINT32_MAX;
392   for (uint32_t priority = 0; priority < config_->priorities().size();
393        ++priority) {
394     // If the child for the priority does not exist yet, create it.
395     const std::string& child_name = config_->priorities()[priority];
396     GRPC_TRACE_LOG(priority_lb, INFO)
397         << "[priority_lb " << this << "] trying priority " << priority
398         << ", child " << child_name;
399     auto& child = children_[child_name];
400     // Create child if needed.
401     if (child == nullptr) {
402       child = MakeOrphanable<ChildPriority>(
403           RefAsSubclass<PriorityLb>(DEBUG_LOCATION, "ChildPriority"),
404           child_name);
405       auto child_config = config_->children().find(child_name);
406       DCHECK(child_config != config_->children().end());
407       // If the child policy returns a non-OK status, request re-resolution.
408       // Note that this will initially cause fixed backoff delay in the
409       // resolver instead of exponential delay.  However, once the
410       // resolver returns the initial re-resolution, we will be able to
411       // return non-OK from UpdateLocked(), which will trigger
412       // exponential backoff instead.
413       absl::Status status = child->UpdateLocked(
414           child_config->second.config,
415           child_config->second.ignore_reresolution_requests);
416       if (!status.ok()) channel_control_helper()->RequestReresolution();
417     } else {
418       // The child already exists.  Reactivate if needed.
419       child->MaybeReactivateLocked();
420     }
421     // Select this child if it is in states READY or IDLE.
422     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
423         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
424       SetCurrentPriorityLocked(
425           priority, /*deactivate_lower_priorities=*/true,
426           ConnectivityStateName(child->connectivity_state()));
427       return;
428     }
429     // Select this child if its failover timer is pending.
430     if (child->FailoverTimerPending()) {
431       SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
432                                "failover timer pending");
433       return;
434     }
435     // Child has been failing for a while.  Move on to the next priority.
436     GRPC_TRACE_LOG(priority_lb, INFO)
437         << "[priority_lb " << this << "] skipping priority " << priority
438         << ", child " << child_name
439         << ": state=" << ConnectivityStateName(child->connectivity_state())
440         << ", failover timer not pending";
441   }
442   // If we didn't find any priority to try, pick the first one in state
443   // CONNECTING.
444   GRPC_TRACE_LOG(priority_lb, INFO)
445       << "[priority_lb " << this
446       << "] no priority reachable, checking for CONNECTING priority to "
447          "delegate to";
448   for (uint32_t priority = 0; priority < config_->priorities().size();
449        ++priority) {
450     // If the child for the priority does not exist yet, create it.
451     const std::string& child_name = config_->priorities()[priority];
452     GRPC_TRACE_LOG(priority_lb, INFO)
453         << "[priority_lb " << this << "] trying priority " << priority
454         << ", child " << child_name;
455     auto& child = children_[child_name];
456     CHECK(child != nullptr);
457     if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
458       SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
459                                "CONNECTING (pass 2)");
460       return;
461     }
462   }
463   // Did not find any child in CONNECTING, delegate to last child.
464   SetCurrentPriorityLocked(config_->priorities().size() - 1,
465                            /*deactivate_lower_priorities=*/false,
466                            "no usable children");
467 }
468 
SetCurrentPriorityLocked(int32_t priority,bool deactivate_lower_priorities,const char * reason)469 void PriorityLb::SetCurrentPriorityLocked(int32_t priority,
470                                           bool deactivate_lower_priorities,
471                                           const char* reason) {
472   GRPC_TRACE_LOG(priority_lb, INFO)
473       << "[priority_lb " << this << "] selecting priority " << priority
474       << ", child " << config_->priorities()[priority] << " (" << reason
475       << ", deactivate_lower_priorities=" << deactivate_lower_priorities << ")";
476   current_priority_ = priority;
477   if (deactivate_lower_priorities) {
478     for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
479       const std::string& child_name = config_->priorities()[p];
480       auto it = children_.find(child_name);
481       if (it != children_.end()) it->second->MaybeDeactivateLocked();
482     }
483   }
484   auto& child = children_[config_->priorities()[priority]];
485   CHECK(child != nullptr);
486   channel_control_helper()->UpdateState(child->connectivity_state(),
487                                         child->connectivity_status(),
488                                         child->GetPicker());
489 }
490 
491 //
492 // PriorityLb::ChildPriority::DeactivationTimer
493 //
494 
DeactivationTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)495 PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
496     RefCountedPtr<PriorityLb::ChildPriority> child_priority)
497     : child_priority_(std::move(child_priority)) {
498   GRPC_TRACE_LOG(priority_lb, INFO)
499       << "[priority_lb " << child_priority_->priority_policy_.get()
500       << "] child " << child_priority_->name_ << " (" << child_priority_.get()
501       << "): deactivating -- will remove in "
502       << kChildRetentionInterval.millis() << "ms";
503   timer_handle_ =
504       child_priority_->priority_policy_->channel_control_helper()
505           ->GetEventEngine()
506           ->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION,
507                                                           "Timer")]() mutable {
508             ApplicationCallbackExecCtx callback_exec_ctx;
509             ExecCtx exec_ctx;
510             auto self_ptr = self.get();
511             self_ptr->child_priority_->priority_policy_->work_serializer()->Run(
512                 [self = std::move(self)]() { self->OnTimerLocked(); },
513                 DEBUG_LOCATION);
514           });
515 }
516 
Orphan()517 void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
518   if (timer_handle_.has_value()) {
519     GRPC_TRACE_LOG(priority_lb, INFO)
520         << "[priority_lb " << child_priority_->priority_policy_.get()
521         << "] child " << child_priority_->name_ << " (" << child_priority_.get()
522         << "): reactivating";
523     child_priority_->priority_policy_->channel_control_helper()
524         ->GetEventEngine()
525         ->Cancel(*timer_handle_);
526     timer_handle_.reset();
527   }
528   Unref();
529 }
530 
OnTimerLocked()531 void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() {
532   if (timer_handle_.has_value()) {
533     timer_handle_.reset();
534     GRPC_TRACE_LOG(priority_lb, INFO)
535         << "[priority_lb " << child_priority_->priority_policy_.get()
536         << "] child " << child_priority_->name_ << " (" << child_priority_.get()
537         << "): deactivation timer fired, deleting child";
538     child_priority_->priority_policy_->DeleteChild(child_priority_.get());
539   }
540 }
541 
542 //
543 // PriorityLb::ChildPriority::FailoverTimer
544 //
545 
FailoverTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)546 PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
547     RefCountedPtr<PriorityLb::ChildPriority> child_priority)
548     : child_priority_(std::move(child_priority)) {
549   GRPC_TRACE_LOG(priority_lb, INFO)
550       << "[priority_lb " << child_priority_->priority_policy_.get()
551       << "] child " << child_priority_->name_ << " (" << child_priority_.get()
552       << "): starting failover timer for "
553       << child_priority_->priority_policy_->child_failover_timeout_.millis()
554       << "ms";
555   timer_handle_ =
556       child_priority_->priority_policy_->channel_control_helper()
557           ->GetEventEngine()
558           ->RunAfter(
559               child_priority_->priority_policy_->child_failover_timeout_,
560               [self = Ref(DEBUG_LOCATION, "Timer")]() mutable {
561                 ApplicationCallbackExecCtx callback_exec_ctx;
562                 ExecCtx exec_ctx;
563                 auto self_ptr = self.get();
564                 self_ptr->child_priority_->priority_policy_->work_serializer()
565                     ->Run([self = std::move(self)]() { self->OnTimerLocked(); },
566                           DEBUG_LOCATION);
567               });
568 }
569 
Orphan()570 void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
571   if (timer_handle_.has_value()) {
572     GRPC_TRACE_LOG(priority_lb, INFO)
573         << "[priority_lb " << child_priority_->priority_policy_.get()
574         << "] child " << child_priority_->name_ << " (" << child_priority_.get()
575         << "): cancelling failover timer";
576     child_priority_->priority_policy_->channel_control_helper()
577         ->GetEventEngine()
578         ->Cancel(*timer_handle_);
579     timer_handle_.reset();
580   }
581   Unref();
582 }
583 
OnTimerLocked()584 void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() {
585   if (timer_handle_.has_value()) {
586     timer_handle_.reset();
587     GRPC_TRACE_LOG(priority_lb, INFO)
588         << "[priority_lb " << child_priority_->priority_policy_.get()
589         << "] child " << child_priority_->name_ << " (" << child_priority_.get()
590         << "): failover timer fired, reporting TRANSIENT_FAILURE";
591     child_priority_->OnConnectivityStateUpdateLocked(
592         GRPC_CHANNEL_TRANSIENT_FAILURE,
593         absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
594         nullptr);
595   }
596 }
597 
598 //
599 // PriorityLb::ChildPriority
600 //
601 
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)602 PriorityLb::ChildPriority::ChildPriority(
603     RefCountedPtr<PriorityLb> priority_policy, std::string name)
604     : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
605   GRPC_TRACE_LOG(priority_lb, INFO)
606       << "[priority_lb " << priority_policy_.get() << "] creating child "
607       << name_ << " (" << this << ")";
608   // Start the failover timer.
609   failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
610 }
611 
Orphan()612 void PriorityLb::ChildPriority::Orphan() {
613   GRPC_TRACE_LOG(priority_lb, INFO)
614       << "[priority_lb " << priority_policy_.get() << "] child " << name_
615       << " (" << this << "): orphaned";
616   failover_timer_.reset();
617   deactivation_timer_.reset();
618   // Remove the child policy's interested_parties pollset_set from the
619   // xDS policy.
620   grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
621                                    priority_policy_->interested_parties());
622   child_policy_.reset();
623   // Drop our ref to the child's picker, in case it's holding a ref to
624   // the child.
625   picker_.reset();
626   Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
627 }
628 
629 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
GetPicker()630 PriorityLb::ChildPriority::GetPicker() {
631   if (picker_ == nullptr) {
632     return MakeRefCounted<QueuePicker>(
633         priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
634   }
635   return picker_;
636 }
637 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)638 absl::Status PriorityLb::ChildPriority::UpdateLocked(
639     RefCountedPtr<LoadBalancingPolicy::Config> config,
640     bool ignore_reresolution_requests) {
641   if (priority_policy_->shutting_down_) return absl::OkStatus();
642   GRPC_TRACE_LOG(priority_lb, INFO)
643       << "[priority_lb " << priority_policy_.get() << "] child " << name_
644       << " (" << this << "): start update";
645   ignore_reresolution_requests_ = ignore_reresolution_requests;
646   // Create policy if needed.
647   if (child_policy_ == nullptr) {
648     child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
649   }
650   // Construct update args.
651   UpdateArgs update_args;
652   update_args.config = std::move(config);
653   if (priority_policy_->addresses_.ok()) {
654     auto it = priority_policy_->addresses_->find(name_);
655     if (it == priority_policy_->addresses_->end()) {
656       update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
657           EndpointAddressesList());
658     } else {
659       update_args.addresses = it->second;
660     }
661   } else {
662     update_args.addresses = priority_policy_->addresses_.status();
663   }
664   update_args.resolution_note = priority_policy_->resolution_note_;
665   update_args.args = priority_policy_->args_;
666   // Update the policy.
667   GRPC_TRACE_LOG(priority_lb, INFO)
668       << "[priority_lb " << priority_policy_.get() << "] child " << name_
669       << " (" << this << "): updating child policy handler "
670       << child_policy_.get();
671   return child_policy_->UpdateLocked(std::move(update_args));
672 }
673 
674 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)675 PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
676   LoadBalancingPolicy::Args lb_policy_args;
677   lb_policy_args.work_serializer = priority_policy_->work_serializer();
678   lb_policy_args.args = args;
679   lb_policy_args.channel_control_helper =
680       std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
681   OrphanablePtr<LoadBalancingPolicy> lb_policy =
682       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
683                                          &priority_lb_trace);
684   GRPC_TRACE_LOG(priority_lb, INFO)
685       << "[priority_lb " << priority_policy_.get() << "] child " << name_
686       << " (" << this << "): created new child policy handler "
687       << lb_policy.get();
688   // Add the parent's interested_parties pollset_set to that of the newly
689   // created child policy. This will make the child policy progress upon
690   // activity on the parent LB, which in turn is tied to the application's call.
691   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
692                                    priority_policy_->interested_parties());
693   return lb_policy;
694 }
695 
ExitIdleLocked()696 void PriorityLb::ChildPriority::ExitIdleLocked() {
697   child_policy_->ExitIdleLocked();
698 }
699 
ResetBackoffLocked()700 void PriorityLb::ChildPriority::ResetBackoffLocked() {
701   child_policy_->ResetBackoffLocked();
702 }
703 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)704 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
705     grpc_connectivity_state state, const absl::Status& status,
706     RefCountedPtr<SubchannelPicker> picker) {
707   GRPC_TRACE_LOG(priority_lb, INFO)
708       << "[priority_lb " << priority_policy_.get() << "] child " << name_
709       << " (" << this << "): state update: " << ConnectivityStateName(state)
710       << " (" << status << ") picker " << picker.get();
711   // Store the state and picker.
712   connectivity_state_ = state;
713   connectivity_status_ = status;
714   // When the failover timer fires, this method will be called with picker
715   // set to null, because we want to consider the child to be in
716   // TRANSIENT_FAILURE, but we have no new picker to report.  In that case,
717   // just keep using the old picker, in case we wind up delegating to this
718   // child when all priorities are failing.
719   if (picker != nullptr) picker_ = std::move(picker);
720   // If we transition to state CONNECTING and we've not seen
721   // TRANSIENT_FAILURE more recently than READY or IDLE, start failover
722   // timer if not already pending.
723   // In any other state, update seen_ready_or_idle_since_transient_failure_
724   // and cancel failover timer.
725   if (state == GRPC_CHANNEL_CONNECTING) {
726     if (seen_ready_or_idle_since_transient_failure_ &&
727         failover_timer_ == nullptr) {
728       failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
729     }
730   } else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
731     seen_ready_or_idle_since_transient_failure_ = true;
732     failover_timer_.reset();
733   } else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
734     seen_ready_or_idle_since_transient_failure_ = false;
735     failover_timer_.reset();
736   }
737   // Call the LB policy's ChoosePriorityLocked() to choose a priority to
738   // use based on the updated state of this child.
739   //
740   // Note that if we're in the process of propagating an update from our
741   // parent to our children, we skip this, because we don't want to
742   // choose a new priority based on inconsistent state.  Instead, the
743   // policy will choose a new priority once the update has been seen by
744   // all children.
745   if (!priority_policy_->update_in_progress_) {
746     priority_policy_->ChoosePriorityLocked();
747   }
748 }
749 
MaybeDeactivateLocked()750 void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
751   if (deactivation_timer_ == nullptr) {
752     deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
753   }
754 }
755 
MaybeReactivateLocked()756 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
757   deactivation_timer_.reset();
758 }
759 
760 //
761 // PriorityLb::ChildPriority::Helper
762 //
763 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)764 void PriorityLb::ChildPriority::Helper::UpdateState(
765     grpc_connectivity_state state, const absl::Status& status,
766     RefCountedPtr<SubchannelPicker> picker) {
767   if (priority_->priority_policy_->shutting_down_) return;
768   // Notify the priority.
769   priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
770 }
771 
RequestReresolution()772 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
773   if (priority_->priority_policy_->shutting_down_) return;
774   if (priority_->ignore_reresolution_requests_) {
775     return;
776   }
777   priority_->priority_policy_->channel_control_helper()->RequestReresolution();
778 }
779 
780 //
781 // factory
782 //
783 
JsonLoader(const JsonArgs &)784 const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader(
785     const JsonArgs&) {
786   static const auto* loader =
787       JsonObjectLoader<PriorityLbChild>()
788           // Note: The "config" field requires custom parsing, so it's
789           // handled in JsonPostLoad() instead of here.
790           .OptionalField("ignore_reresolution_requests",
791                          &PriorityLbChild::ignore_reresolution_requests)
792           .Finish();
793   return loader;
794 }
795 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)796 void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json,
797                                                      const JsonArgs&,
798                                                      ValidationErrors* errors) {
799   ValidationErrors::ScopedField field(errors, ".config");
800   auto it = json.object().find("config");
801   if (it == json.object().end()) {
802     errors->AddError("field not present");
803     return;
804   }
805   auto lb_config =
806       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
807           it->second);
808   if (!lb_config.ok()) {
809     errors->AddError(lb_config.status().message());
810     return;
811   }
812   config = std::move(*lb_config);
813 }
814 
JsonLoader(const JsonArgs &)815 const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) {
816   static const auto* loader =
817       JsonObjectLoader<PriorityLbConfig>()
818           .Field("children", &PriorityLbConfig::children_)
819           .Field("priorities", &PriorityLbConfig::priorities_)
820           .Finish();
821   return loader;
822 }
823 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)824 void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&,
825                                     ValidationErrors* errors) {
826   std::set<std::string> unknown_priorities;
827   for (const std::string& priority : priorities_) {
828     if (children_.find(priority) == children_.end()) {
829       unknown_priorities.insert(priority);
830     }
831   }
832   if (!unknown_priorities.empty()) {
833     errors->AddError(absl::StrCat("unknown priorit(ies): [",
834                                   absl::StrJoin(unknown_priorities, ", "),
835                                   "]"));
836   }
837 }
838 
839 class PriorityLbFactory final : public LoadBalancingPolicyFactory {
840  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const841   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
842       LoadBalancingPolicy::Args args) const override {
843     return MakeOrphanable<PriorityLb>(std::move(args));
844   }
845 
name() const846   absl::string_view name() const override { return kPriority; }
847 
848   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const849   ParseLoadBalancingConfig(const Json& json) const override {
850     return LoadFromJson<RefCountedPtr<PriorityLbConfig>>(
851         json, JsonArgs(), "errors validating priority LB policy config");
852   }
853 };
854 
855 }  // namespace
856 
RegisterPriorityLbPolicy(CoreConfiguration::Builder * builder)857 void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) {
858   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
859       std::make_unique<PriorityLbFactory>());
860 }
861 
862 }  // namespace grpc_core
863