• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24 
25 #include <grpc/grpc.h>
26 
27 #include "src/core/ext/filters/client_channel/lb_policy.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 #include "src/core/lib/transport/error_utils.h"
38 
39 namespace grpc_core {
40 
41 TraceFlag grpc_lb_priority_trace(false, "priority_lb");
42 
43 namespace {
44 
// Policy name, returned by name() on both the policy and its config.
constexpr char kPriority[] = "priority_experimental";

// Retention period (milliseconds, i.e. 15 minutes) for a child that is no
// longer in use -- either because the config dropped it or because a
// higher-priority child was selected -- before it is deleted.
constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000;

// Default time (milliseconds) a newly created child gets to connect before
// the next priority is tried.  Can be overridden with the
// GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS channel arg.
constexpr int kDefaultChildFailoverTimeoutMs = 10 * 1000;
55 
56 // Config for priority LB policy.
57 class PriorityLbConfig : public LoadBalancingPolicy::Config {
58  public:
59   struct PriorityLbChild {
60     RefCountedPtr<LoadBalancingPolicy::Config> config;
61     bool ignore_reresolution_requests = false;
62   };
63 
PriorityLbConfig(std::map<std::string,PriorityLbChild> children,std::vector<std::string> priorities)64   PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
65                    std::vector<std::string> priorities)
66       : children_(std::move(children)), priorities_(std::move(priorities)) {}
67 
name() const68   const char* name() const override { return kPriority; }
69 
children() const70   const std::map<std::string, PriorityLbChild>& children() const {
71     return children_;
72   }
priorities() const73   const std::vector<std::string>& priorities() const { return priorities_; }
74 
75  private:
76   const std::map<std::string, PriorityLbChild> children_;
77   const std::vector<std::string> priorities_;
78 };
79 
80 // priority LB policy.
81 class PriorityLb : public LoadBalancingPolicy {
82  public:
83   explicit PriorityLb(Args args);
84 
name() const85   const char* name() const override { return kPriority; }
86 
87   void UpdateLocked(UpdateArgs args) override;
88   void ExitIdleLocked() override;
89   void ResetBackoffLocked() override;
90 
91  private:
92   // Each ChildPriority holds a ref to the PriorityLb.
93   class ChildPriority : public InternallyRefCounted<ChildPriority> {
94    public:
95     ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
96 
~ChildPriority()97     ~ChildPriority() override {
98       priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
99     }
100 
name() const101     const std::string& name() const { return name_; }
102 
103     void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
104                       bool ignore_reresolution_requests);
105     void ExitIdleLocked();
106     void ResetBackoffLocked();
107     void DeactivateLocked();
108     void MaybeReactivateLocked();
109     void MaybeCancelFailoverTimerLocked();
110 
111     void Orphan() override;
112 
GetPicker()113     std::unique_ptr<SubchannelPicker> GetPicker() {
114       return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
115     }
116 
connectivity_state() const117     grpc_connectivity_state connectivity_state() const {
118       return connectivity_state_;
119     }
120 
connectivity_status() const121     const absl::Status& connectivity_status() const {
122       return connectivity_status_;
123     }
124 
failover_timer_callback_pending() const125     bool failover_timer_callback_pending() const {
126       return failover_timer_callback_pending_;
127     }
128 
129    private:
130     // A simple wrapper for ref-counting a picker from the child policy.
131     class RefCountedPicker : public RefCounted<RefCountedPicker> {
132      public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)133       explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
134           : picker_(std::move(picker)) {}
Pick(PickArgs args)135       PickResult Pick(PickArgs args) { return picker_->Pick(args); }
136 
137      private:
138       std::unique_ptr<SubchannelPicker> picker_;
139     };
140 
141     // A non-ref-counted wrapper for RefCountedPicker.
142     class RefCountedPickerWrapper : public SubchannelPicker {
143      public:
RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)144       explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
145           : picker_(std::move(picker)) {}
Pick(PickArgs args)146       PickResult Pick(PickArgs args) override { return picker_->Pick(args); }
147 
148      private:
149       RefCountedPtr<RefCountedPicker> picker_;
150     };
151 
152     class Helper : public ChannelControlHelper {
153      public:
Helper(RefCountedPtr<ChildPriority> priority)154       explicit Helper(RefCountedPtr<ChildPriority> priority)
155           : priority_(std::move(priority)) {}
156 
~Helper()157       ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }
158 
159       RefCountedPtr<SubchannelInterface> CreateSubchannel(
160           ServerAddress address, const grpc_channel_args& args) override;
161       void UpdateState(grpc_connectivity_state state,
162                        const absl::Status& status,
163                        std::unique_ptr<SubchannelPicker> picker) override;
164       void RequestReresolution() override;
165       void AddTraceEvent(TraceSeverity severity,
166                          absl::string_view message) override;
167 
168      private:
169       RefCountedPtr<ChildPriority> priority_;
170     };
171 
172     // Methods for dealing with the child policy.
173     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
174         const grpc_channel_args* args);
175 
176     void OnConnectivityStateUpdateLocked(
177         grpc_connectivity_state state, const absl::Status& status,
178         std::unique_ptr<SubchannelPicker> picker);
179 
180     void StartFailoverTimerLocked();
181 
182     static void OnFailoverTimer(void* arg, grpc_error* error);
183     void OnFailoverTimerLocked(grpc_error* error);
184     static void OnDeactivationTimer(void* arg, grpc_error* error);
185     void OnDeactivationTimerLocked(grpc_error* error);
186 
187     RefCountedPtr<PriorityLb> priority_policy_;
188     const std::string name_;
189     bool ignore_reresolution_requests_ = false;
190 
191     OrphanablePtr<LoadBalancingPolicy> child_policy_;
192 
193     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
194     absl::Status connectivity_status_;
195     RefCountedPtr<RefCountedPicker> picker_wrapper_;
196 
197     // States for delayed removal.
198     grpc_timer deactivation_timer_;
199     grpc_closure on_deactivation_timer_;
200     bool deactivation_timer_callback_pending_ = false;
201 
202     // States of failover.
203     grpc_timer failover_timer_;
204     grpc_closure on_failover_timer_;
205     bool failover_timer_callback_pending_ = false;
206   };
207 
208   ~PriorityLb() override;
209 
210   void ShutdownLocked() override;
211 
212   // Returns UINT32_MAX if child is not in current priority list.
213   uint32_t GetChildPriorityLocked(const std::string& child_name) const;
214 
215   void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
216   void DeleteChild(ChildPriority* child);
217 
218   void TryNextPriorityLocked(bool report_connecting);
219   void SelectPriorityLocked(uint32_t priority);
220 
221   const int child_failover_timeout_ms_;
222 
223   // Current channel args and config from the resolver.
224   const grpc_channel_args* args_ = nullptr;
225   RefCountedPtr<PriorityLbConfig> config_;
226   HierarchicalAddressMap addresses_;
227 
228   // Internal state.
229   bool shutting_down_ = false;
230 
231   std::map<std::string, OrphanablePtr<ChildPriority>> children_;
232   // The priority that is being used.
233   uint32_t current_priority_ = UINT32_MAX;
234   // Points to the current child from before the most recent update.
235   // We will continue to use this child until we decide which of the new
236   // children to use.
237   ChildPriority* current_child_from_before_update_ = nullptr;
238 };
239 
240 //
241 // PriorityLb
242 //
243 
PriorityLb(Args args)244 PriorityLb::PriorityLb(Args args)
245     : LoadBalancingPolicy(std::move(args)),
246       child_failover_timeout_ms_(grpc_channel_args_find_integer(
247           args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
248           {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
249   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
250     gpr_log(GPR_INFO, "[priority_lb %p] created", this);
251   }
252 }
253 
~PriorityLb()254 PriorityLb::~PriorityLb() {
255   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
256     gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
257   }
258   grpc_channel_args_destroy(args_);
259 }
260 
ShutdownLocked()261 void PriorityLb::ShutdownLocked() {
262   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
263     gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
264   }
265   shutting_down_ = true;
266   children_.clear();
267 }
268 
ExitIdleLocked()269 void PriorityLb::ExitIdleLocked() {
270   if (current_priority_ != UINT32_MAX) {
271     const std::string& child_name = config_->priorities()[current_priority_];
272     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
273       gpr_log(GPR_INFO,
274               "[priority_lb %p] exiting IDLE for current priority %d child %s",
275               this, current_priority_, child_name.c_str());
276     }
277     children_[child_name]->ExitIdleLocked();
278   }
279 }
280 
ResetBackoffLocked()281 void PriorityLb::ResetBackoffLocked() {
282   for (const auto& p : children_) p.second->ResetBackoffLocked();
283 }
284 
UpdateLocked(UpdateArgs args)285 void PriorityLb::UpdateLocked(UpdateArgs args) {
286   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
287     gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
288   }
289   // Save current child.
290   if (current_priority_ != UINT32_MAX) {
291     const std::string& child_name = config_->priorities()[current_priority_];
292     current_child_from_before_update_ = children_[child_name].get();
293     // Unset current_priority_, since it was an index into the old
294     // config's priority list and may no longer be valid.  It will be
295     // reset later by TryNextPriorityLocked(), but we unset it here in
296     // case updating any of our children triggers a state update.
297     current_priority_ = UINT32_MAX;
298   }
299   // Update config.
300   config_ = std::move(args.config);
301   // Update args.
302   grpc_channel_args_destroy(args_);
303   args_ = args.args;
304   args.args = nullptr;
305   // Update addresses.
306   addresses_ = MakeHierarchicalAddressMap(args.addresses);
307   // Check all existing children against the new config.
308   for (const auto& p : children_) {
309     const std::string& child_name = p.first;
310     auto& child = p.second;
311     auto config_it = config_->children().find(child_name);
312     if (config_it == config_->children().end()) {
313       // Existing child not found in new config.  Deactivate it.
314       child->DeactivateLocked();
315     } else {
316       // Existing child found in new config.  Update it.
317       child->UpdateLocked(config_it->second.config,
318                           config_it->second.ignore_reresolution_requests);
319     }
320   }
321   // Try to get connected.
322   TryNextPriorityLocked(/*report_connecting=*/children_.empty());
323 }
324 
GetChildPriorityLocked(const std::string & child_name) const325 uint32_t PriorityLb::GetChildPriorityLocked(
326     const std::string& child_name) const {
327   for (uint32_t priority = 0; priority < config_->priorities().size();
328        ++priority) {
329     if (config_->priorities()[priority] == child_name) return priority;
330   }
331   return UINT32_MAX;
332 }
333 
HandleChildConnectivityStateChangeLocked(ChildPriority * child)334 void PriorityLb::HandleChildConnectivityStateChangeLocked(
335     ChildPriority* child) {
336   // Special case for the child that was the current child before the
337   // most recent update.
338   if (child == current_child_from_before_update_) {
339     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
340       gpr_log(GPR_INFO,
341               "[priority_lb %p] state update for current child from before "
342               "config update",
343               this);
344     }
345     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
346         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
347       // If it's still READY or IDLE, we stick with this child, so pass
348       // the new picker up to our parent.
349       channel_control_helper()->UpdateState(child->connectivity_state(),
350                                             child->connectivity_status(),
351                                             child->GetPicker());
352     } else {
353       // If it's no longer READY or IDLE, we should stop using it.
354       // We already started trying other priorities as a result of the
355       // update, but calling TryNextPriorityLocked() ensures that we will
356       // properly select between CONNECTING and TRANSIENT_FAILURE as the
357       // new state to report to our parent.
358       current_child_from_before_update_ = nullptr;
359       TryNextPriorityLocked(/*report_connecting=*/true);
360     }
361     return;
362   }
363   // Otherwise, find the child's priority.
364   uint32_t child_priority = GetChildPriorityLocked(child->name());
365   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
366     gpr_log(GPR_INFO,
367             "[priority_lb %p] state update for priority %u, child %s, current "
368             "priority %u",
369             this, child_priority, child->name().c_str(), current_priority_);
370   }
371   // Ignore priorities not in the current config.
372   if (child_priority == UINT32_MAX) return;
373   // Ignore lower-than-current priorities.
374   if (child_priority > current_priority_) return;
375   // If a child reports TRANSIENT_FAILURE, start trying the next priority.
376   // Note that even if this is for a higher-than-current priority, we
377   // may still need to create some children between this priority and
378   // the current one (e.g., if we got an update that inserted new
379   // priorities ahead of the current one).
380   if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
381     TryNextPriorityLocked(
382         /*report_connecting=*/child_priority == current_priority_);
383     return;
384   }
385   // The update is for a higher-than-current priority (or for any
386   // priority if we don't have any current priority).
387   if (child_priority < current_priority_) {
388     // If the child reports READY or IDLE, switch to that priority.
389     // Otherwise, ignore the update.
390     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
391         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
392       SelectPriorityLocked(child_priority);
393     }
394     return;
395   }
396   // The current priority has returned a new picker, so pass it up to
397   // our parent.
398   channel_control_helper()->UpdateState(child->connectivity_state(),
399                                         child->connectivity_status(),
400                                         child->GetPicker());
401 }
402 
DeleteChild(ChildPriority * child)403 void PriorityLb::DeleteChild(ChildPriority* child) {
404   // If this was the current child from before the most recent update,
405   // stop using it.  We already started trying other priorities as a
406   // result of the update, but calling TryNextPriorityLocked() ensures that
407   // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
408   // new state to report to our parent.
409   if (current_child_from_before_update_ == child) {
410     current_child_from_before_update_ = nullptr;
411     TryNextPriorityLocked(/*report_connecting=*/true);
412   }
413   children_.erase(child->name());
414 }
415 
TryNextPriorityLocked(bool report_connecting)416 void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
417   current_priority_ = UINT32_MAX;
418   for (uint32_t priority = 0; priority < config_->priorities().size();
419        ++priority) {
420     // If the child for the priority does not exist yet, create it.
421     const std::string& child_name = config_->priorities()[priority];
422     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
423       gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
424               priority, child_name.c_str());
425     }
426     auto& child = children_[child_name];
427     if (child == nullptr) {
428       if (report_connecting) {
429         channel_control_helper()->UpdateState(
430             GRPC_CHANNEL_CONNECTING, absl::Status(),
431             absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
432       }
433       child = MakeOrphanable<ChildPriority>(
434           Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
435       auto child_config = config_->children().find(child_name);
436       GPR_DEBUG_ASSERT(child_config != config_->children().end());
437       child->UpdateLocked(child_config->second.config,
438                           child_config->second.ignore_reresolution_requests);
439       return;
440     }
441     // The child already exists.
442     child->MaybeReactivateLocked();
443     // If the child is in state READY or IDLE, switch to it.
444     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
445         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
446       SelectPriorityLocked(priority);
447       return;
448     }
449     // Child is not READY or IDLE.
450     // If its failover timer is still pending, give it time to fire.
451     if (child->failover_timer_callback_pending()) {
452       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
453         gpr_log(GPR_INFO,
454                 "[priority_lb %p] priority %u, child %s: child still "
455                 "attempting to connect, will wait",
456                 this, priority, child_name.c_str());
457       }
458       if (report_connecting) {
459         channel_control_helper()->UpdateState(
460             GRPC_CHANNEL_CONNECTING, absl::Status(),
461             absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
462       }
463       return;
464     }
465     // Child has been failing for a while.  Move on to the next priority.
466   }
467   // If there are no more priorities to try, report TRANSIENT_FAILURE.
468   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
469     gpr_log(GPR_INFO,
470             "[priority_lb %p] no priority reachable, putting channel in "
471             "TRANSIENT_FAILURE",
472             this);
473   }
474   current_child_from_before_update_ = nullptr;
475   grpc_error* error = grpc_error_set_int(
476       GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"),
477       GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
478   channel_control_helper()->UpdateState(
479       GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
480       absl::make_unique<TransientFailurePicker>(error));
481 }
482 
SelectPriorityLocked(uint32_t priority)483 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
484   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
485     gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
486             priority, config_->priorities()[priority].c_str());
487   }
488   current_priority_ = priority;
489   current_child_from_before_update_ = nullptr;
490   // Deactivate lower priorities.
491   for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
492     const std::string& child_name = config_->priorities()[p];
493     auto it = children_.find(child_name);
494     if (it != children_.end()) it->second->DeactivateLocked();
495   }
496   // Update picker.
497   auto& child = children_[config_->priorities()[priority]];
498   channel_control_helper()->UpdateState(child->connectivity_state(),
499                                         child->connectivity_status(),
500                                         child->GetPicker());
501 }
502 
503 //
504 // PriorityLb::ChildPriority
505 //
506 
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)507 PriorityLb::ChildPriority::ChildPriority(
508     RefCountedPtr<PriorityLb> priority_policy, std::string name)
509     : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
510   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
511     gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
512             priority_policy_.get(), name_.c_str(), this);
513   }
514   GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
515                     grpc_schedule_on_exec_ctx);
516   GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
517                     grpc_schedule_on_exec_ctx);
518   // Start the failover timer.
519   StartFailoverTimerLocked();
520 }
521 
Orphan()522 void PriorityLb::ChildPriority::Orphan() {
523   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
524     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
525             priority_policy_.get(), name_.c_str(), this);
526   }
527   MaybeCancelFailoverTimerLocked();
528   if (deactivation_timer_callback_pending_) {
529     grpc_timer_cancel(&deactivation_timer_);
530   }
531   // Remove the child policy's interested_parties pollset_set from the
532   // xDS policy.
533   grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
534                                    priority_policy_->interested_parties());
535   child_policy_.reset();
536   // Drop our ref to the child's picker, in case it's holding a ref to
537   // the child.
538   picker_wrapper_.reset();
539   if (deactivation_timer_callback_pending_) {
540     grpc_timer_cancel(&deactivation_timer_);
541   }
542   Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
543 }
544 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)545 void PriorityLb::ChildPriority::UpdateLocked(
546     RefCountedPtr<LoadBalancingPolicy::Config> config,
547     bool ignore_reresolution_requests) {
548   if (priority_policy_->shutting_down_) return;
549   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
550     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
551             priority_policy_.get(), name_.c_str(), this);
552   }
553   ignore_reresolution_requests_ = ignore_reresolution_requests;
554   // Create policy if needed.
555   if (child_policy_ == nullptr) {
556     child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
557   }
558   // Construct update args.
559   UpdateArgs update_args;
560   update_args.config = std::move(config);
561   update_args.addresses = priority_policy_->addresses_[name_];
562   update_args.args = grpc_channel_args_copy(priority_policy_->args_);
563   // Update the policy.
564   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
565     gpr_log(GPR_INFO,
566             "[priority_lb %p] child %s (%p): updating child policy handler %p",
567             priority_policy_.get(), name_.c_str(), this, child_policy_.get());
568   }
569   child_policy_->UpdateLocked(std::move(update_args));
570 }
571 
572 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)573 PriorityLb::ChildPriority::CreateChildPolicyLocked(
574     const grpc_channel_args* args) {
575   LoadBalancingPolicy::Args lb_policy_args;
576   lb_policy_args.work_serializer = priority_policy_->work_serializer();
577   lb_policy_args.args = args;
578   lb_policy_args.channel_control_helper =
579       absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
580   OrphanablePtr<LoadBalancingPolicy> lb_policy =
581       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
582                                          &grpc_lb_priority_trace);
583   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
584     gpr_log(GPR_INFO,
585             "[priority_lb %p] child %s (%p): created new child policy "
586             "handler %p",
587             priority_policy_.get(), name_.c_str(), this, lb_policy.get());
588   }
589   // Add the parent's interested_parties pollset_set to that of the newly
590   // created child policy. This will make the child policy progress upon
591   // activity on the parent LB, which in turn is tied to the application's call.
592   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
593                                    priority_policy_->interested_parties());
594   return lb_policy;
595 }
596 
ExitIdleLocked()597 void PriorityLb::ChildPriority::ExitIdleLocked() {
598   if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
599       !failover_timer_callback_pending_) {
600     StartFailoverTimerLocked();
601   }
602   child_policy_->ExitIdleLocked();
603 }
604 
ResetBackoffLocked()605 void PriorityLb::ChildPriority::ResetBackoffLocked() {
606   child_policy_->ResetBackoffLocked();
607 }
608 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)609 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
610     grpc_connectivity_state state, const absl::Status& status,
611     std::unique_ptr<SubchannelPicker> picker) {
612   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
613     gpr_log(GPR_INFO,
614             "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
615             priority_policy_.get(), name_.c_str(), this,
616             ConnectivityStateName(state), status.ToString().c_str(),
617             picker.get());
618   }
619   // Store the state and picker.
620   connectivity_state_ = state;
621   connectivity_status_ = status;
622   picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
623   // If READY or TRANSIENT_FAILURE, cancel failover timer.
624   if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
625     MaybeCancelFailoverTimerLocked();
626   }
627   // Notify the parent policy.
628   priority_policy_->HandleChildConnectivityStateChangeLocked(this);
629 }
630 
StartFailoverTimerLocked()631 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
632   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
633     gpr_log(GPR_INFO,
634             "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
635             priority_policy_.get(), name_.c_str(), this,
636             priority_policy_->child_failover_timeout_ms_);
637   }
638   Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
639   grpc_timer_init(
640       &failover_timer_,
641       ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
642       &on_failover_timer_);
643   failover_timer_callback_pending_ = true;
644 }
645 
MaybeCancelFailoverTimerLocked()646 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
647   if (failover_timer_callback_pending_) {
648     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
649       gpr_log(GPR_INFO,
650               "[priority_lb %p] child %s (%p): cancelling failover timer",
651               priority_policy_.get(), name_.c_str(), this);
652     }
653     grpc_timer_cancel(&failover_timer_);
654     failover_timer_callback_pending_ = false;
655   }
656 }
657 
OnFailoverTimer(void * arg,grpc_error * error)658 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) {
659   ChildPriority* self = static_cast<ChildPriority*>(arg);
660   GRPC_ERROR_REF(error);  // ref owned by lambda
661   self->priority_policy_->work_serializer()->Run(
662       [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
663 }
664 
OnFailoverTimerLocked(grpc_error * error)665 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error* error) {
666   if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
667       !priority_policy_->shutting_down_) {
668     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
669       gpr_log(GPR_INFO,
670               "[priority_lb %p] child %s (%p): failover timer fired, "
671               "reporting TRANSIENT_FAILURE",
672               priority_policy_.get(), name_.c_str(), this);
673     }
674     failover_timer_callback_pending_ = false;
675     OnConnectivityStateUpdateLocked(
676         GRPC_CHANNEL_TRANSIENT_FAILURE,
677         absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
678         nullptr);
679   }
680   Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
681   GRPC_ERROR_UNREF(error);
682 }
683 
DeactivateLocked()684 void PriorityLb::ChildPriority::DeactivateLocked() {
685   // If already deactivated, don't do it again.
686   if (deactivation_timer_callback_pending_) return;
687   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
688     gpr_log(GPR_INFO,
689             "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
690             "ms.",
691             priority_policy_.get(), name_.c_str(), this,
692             kChildRetentionIntervalMs);
693   }
694   MaybeCancelFailoverTimerLocked();
695   // Start a timer to delete the child.
696   Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
697   grpc_timer_init(&deactivation_timer_,
698                   ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
699                   &on_deactivation_timer_);
700   deactivation_timer_callback_pending_ = true;
701 }
702 
MaybeReactivateLocked()703 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
704   if (deactivation_timer_callback_pending_) {
705     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
706       gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
707               priority_policy_.get(), name_.c_str(), this);
708     }
709     deactivation_timer_callback_pending_ = false;
710     grpc_timer_cancel(&deactivation_timer_);
711   }
712 }
713 
OnDeactivationTimer(void * arg,grpc_error * error)714 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
715                                                     grpc_error* error) {
716   ChildPriority* self = static_cast<ChildPriority*>(arg);
717   GRPC_ERROR_REF(error);  // ref owned by lambda
718   self->priority_policy_->work_serializer()->Run(
719       [self, error]() { self->OnDeactivationTimerLocked(error); },
720       DEBUG_LOCATION);
721 }
722 
OnDeactivationTimerLocked(grpc_error * error)723 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(grpc_error* error) {
724   if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
725       !priority_policy_->shutting_down_) {
726     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
727       gpr_log(GPR_INFO,
728               "[priority_lb %p] child %s (%p): deactivation timer fired, "
729               "deleting child",
730               priority_policy_.get(), name_.c_str(), this);
731     }
732     deactivation_timer_callback_pending_ = false;
733     priority_policy_->DeleteChild(this);
734   }
735   Unref(DEBUG_LOCATION, "ChildPriority+timer");
736   GRPC_ERROR_UNREF(error);
737 }
738 
739 //
740 // PriorityLb::ChildPriority::Helper
741 //
742 
RequestReresolution()743 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
744   if (priority_->priority_policy_->shutting_down_) return;
745   if (priority_->ignore_reresolution_requests_) {
746     return;
747   }
748   priority_->priority_policy_->channel_control_helper()->RequestReresolution();
749 }
750 
751 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)752 PriorityLb::ChildPriority::Helper::CreateSubchannel(
753     ServerAddress address, const grpc_channel_args& args) {
754   if (priority_->priority_policy_->shutting_down_) return nullptr;
755   return priority_->priority_policy_->channel_control_helper()
756       ->CreateSubchannel(std::move(address), args);
757 }
758 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)759 void PriorityLb::ChildPriority::Helper::UpdateState(
760     grpc_connectivity_state state, const absl::Status& status,
761     std::unique_ptr<SubchannelPicker> picker) {
762   if (priority_->priority_policy_->shutting_down_) return;
763   // Notify the priority.
764   priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
765 }
766 
AddTraceEvent(TraceSeverity severity,absl::string_view message)767 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
768     TraceSeverity severity, absl::string_view message) {
769   if (priority_->priority_policy_->shutting_down_) return;
770   priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
771                                                                        message);
772 }
773 
774 //
775 // factory
776 //
777 
778 class PriorityLbFactory : public LoadBalancingPolicyFactory {
779  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const780   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
781       LoadBalancingPolicy::Args args) const override {
782     return MakeOrphanable<PriorityLb>(std::move(args));
783   }
784 
name() const785   const char* name() const override { return kPriority; }
786 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const787   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
788       const Json& json, grpc_error** error) const override {
789     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
790     if (json.type() == Json::Type::JSON_NULL) {
791       // priority was mentioned as a policy in the deprecated
792       // loadBalancingPolicy field or in the client API.
793       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
794           "field:loadBalancingPolicy error:priority policy requires "
795           "configuration. Please use loadBalancingConfig field of service "
796           "config instead.");
797       return nullptr;
798     }
799     std::vector<grpc_error*> error_list;
800     // Children.
801     std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
802     auto it = json.object_value().find("children");
803     if (it == json.object_value().end()) {
804       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
805           "field:children error:required field missing"));
806     } else if (it->second.type() != Json::Type::OBJECT) {
807       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
808           "field:children error:type should be object"));
809     } else {
810       const Json::Object& object = it->second.object_value();
811       for (const auto& p : object) {
812         const std::string& child_name = p.first;
813         const Json& element = p.second;
814         if (element.type() != Json::Type::OBJECT) {
815           error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
816               absl::StrCat("field:children key:", child_name,
817                            " error:should be type object")
818                   .c_str()));
819         } else {
820           auto it2 = element.object_value().find("config");
821           if (it2 == element.object_value().end()) {
822             error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
823                 absl::StrCat("field:children key:", child_name,
824                              " error:missing 'config' field")
825                     .c_str()));
826           } else {
827             grpc_error* parse_error = GRPC_ERROR_NONE;
828             auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
829                 it2->second, &parse_error);
830             bool ignore_resolution_requests = false;
831             // If present, ignore_reresolution_requests must be of type
832             // boolean.
833             auto it3 =
834                 element.object_value().find("ignore_reresolution_requests");
835             if (it3 != element.object_value().end()) {
836               if (it3->second.type() == Json::Type::JSON_TRUE) {
837                 ignore_resolution_requests = true;
838               } else if (it3->second.type() != Json::Type::JSON_FALSE) {
839                 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
840                     absl::StrCat("field:children key:", child_name,
841                                  " field:ignore_reresolution_requests:should "
842                                  "be type boolean")
843                         .c_str()));
844               }
845             }
846             if (config == nullptr) {
847               GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
848               error_list.push_back(
849                   GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
850                       absl::StrCat("field:children key:", child_name).c_str(),
851                       &parse_error, 1));
852               GRPC_ERROR_UNREF(parse_error);
853             }
854             children[child_name].config = std::move(config);
855             children[child_name].ignore_reresolution_requests =
856                 ignore_resolution_requests;
857           }
858         }
859       }
860     }
861     // Priorities.
862     std::vector<std::string> priorities;
863     it = json.object_value().find("priorities");
864     if (it == json.object_value().end()) {
865       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
866           "field:priorities error:required field missing"));
867     } else if (it->second.type() != Json::Type::ARRAY) {
868       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
869           "field:priorities error:type should be array"));
870     } else {
871       const Json::Array& array = it->second.array_value();
872       for (size_t i = 0; i < array.size(); ++i) {
873         const Json& element = array[i];
874         if (element.type() != Json::Type::STRING) {
875           error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
876               absl::StrCat("field:priorities element:", i,
877                            " error:should be type string")
878                   .c_str()));
879         } else if (children.find(element.string_value()) == children.end()) {
880           error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
881               absl::StrCat("field:priorities element:", i,
882                            " error:unknown child '", element.string_value(),
883                            "'")
884                   .c_str()));
885         } else {
886           priorities.emplace_back(element.string_value());
887         }
888       }
889       if (priorities.size() != children.size()) {
890         error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
891             absl::StrCat("field:priorities error:priorities size (",
892                          priorities.size(), ") != children size (",
893                          children.size(), ")")
894                 .c_str()));
895       }
896     }
897     if (error_list.empty()) {
898       return MakeRefCounted<PriorityLbConfig>(std::move(children),
899                                               std::move(priorities));
900     } else {
901       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
902           "priority_experimental LB policy config", &error_list);
903       return nullptr;
904     }
905   }
906 };
907 
908 }  // namespace
909 
910 }  // namespace grpc_core
911 
912 //
913 // Plugin registration
914 //
915 
grpc_lb_policy_priority_init()916 void grpc_lb_policy_priority_init() {
917   grpc_core::LoadBalancingPolicyRegistry::Builder::
918       RegisterLoadBalancingPolicyFactory(
919           absl::make_unique<grpc_core::PriorityLbFactory>());
920 }
921 
// Plugin shutdown hook. It intentionally does nothing: grpc_lb_policy_priority_init()
// passes the factory to the registry by std::unique_ptr, leaving no
// plugin-owned state to release here.
void grpc_lb_policy_priority_shutdown() {
  // Intentionally empty.
}
923