1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24
25 #include <grpc/grpc.h>
26
27 #include "src/core/ext/filters/client_channel/lb_policy.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 #include "src/core/lib/transport/error_utils.h"
38
39 namespace grpc_core {
40
// Trace flag for this policy, registered under the name "priority_lb" and
// checked with GRPC_TRACE_FLAG_ENABLED() throughout this file. The first
// constructor argument is false, presumably meaning "off by default".
TraceFlag grpc_lb_priority_trace(false, "priority_lb");
42
43 namespace {
44
// Name of this LB policy; returned by name() on the config, the policy and
// the factory.
constexpr char kPriority[] = "priority_experimental";

// Retention period for a child that is no longer in use, either because it
// was removed from the config or because a higher-priority child was
// selected. After this interval the child is deleted.
constexpr int kChildRetentionIntervalMs = 1000 * 60 * 15;  // 15 minutes

// Default time a newly created child gets to connect before the next
// priority is also attempted. Overridable via the
// GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS channel arg.
constexpr int kDefaultChildFailoverTimeoutMs = 10 * 1000;  // 10 seconds
55
56 // Config for priority LB policy.
57 class PriorityLbConfig : public LoadBalancingPolicy::Config {
58 public:
59 struct PriorityLbChild {
60 RefCountedPtr<LoadBalancingPolicy::Config> config;
61 bool ignore_reresolution_requests = false;
62 };
63
PriorityLbConfig(std::map<std::string,PriorityLbChild> children,std::vector<std::string> priorities)64 PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
65 std::vector<std::string> priorities)
66 : children_(std::move(children)), priorities_(std::move(priorities)) {}
67
name() const68 const char* name() const override { return kPriority; }
69
children() const70 const std::map<std::string, PriorityLbChild>& children() const {
71 return children_;
72 }
priorities() const73 const std::vector<std::string>& priorities() const { return priorities_; }
74
75 private:
76 const std::map<std::string, PriorityLbChild> children_;
77 const std::vector<std::string> priorities_;
78 };
79
80 // priority LB policy.
// priority LB policy.
//
// Keeps one child policy per entry in the config's priority list (index 0 is
// the highest priority). It reports the state/picker of the highest-priority
// child that is READY or IDLE.
//
// A lower-priority child is only created once every higher-priority child
// exists, is neither READY nor IDLE, and no longer has a pending failover
// timer.
class PriorityLb : public LoadBalancingPolicy {
 public:
  explicit PriorityLb(Args args);

  const char* name() const override { return kPriority; }

  void UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Wraps one child policy (created as a ChildPolicyHandler) together with
  // the last state/status/picker it reported and its failover and
  // deactivation timers.
  // Each ChildPriority holds a ref to the PriorityLb.
  class ChildPriority : public InternallyRefCounted<ChildPriority> {
   public:
    ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);

    ~ChildPriority() override {
      priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
    }

    const std::string& name() const { return name_; }

    // Pushes a new config to the child policy, creating it on first use.
    void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                      bool ignore_reresolution_requests);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the delayed-removal timer (no-op if one is already pending).
    void DeactivateLocked();
    // Cancels a pending delayed removal, if any.
    void MaybeReactivateLocked();
    void MaybeCancelFailoverTimerLocked();

    void Orphan() override;

    // Returns a picker that shares ownership of the child's latest picker.
    std::unique_ptr<SubchannelPicker> GetPicker() {
      return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
    }

    // Last state reported for this child (CONNECTING before the first
    // report).
    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }

    const absl::Status& connectivity_status() const {
      return connectivity_status_;
    }

    // True while the failover timer is armed and not yet cancelled/handled.
    bool failover_timer_callback_pending() const {
      return failover_timer_callback_pending_;
    }

   private:
    // A simple wrapper for ref-counting a picker from the child policy.
    // It lets every picker handed to the parent share one child picker.
    class RefCountedPicker : public RefCounted<RefCountedPicker> {
     public:
      explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
          : picker_(std::move(picker)) {}
      PickResult Pick(PickArgs args) { return picker_->Pick(args); }

     private:
      std::unique_ptr<SubchannelPicker> picker_;
    };

    // A non-ref-counted wrapper for RefCountedPicker.
    class RefCountedPickerWrapper : public SubchannelPicker {
     public:
      explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
          : picker_(std::move(picker)) {}
      PickResult Pick(PickArgs args) override { return picker_->Pick(args); }

     private:
      RefCountedPtr<RefCountedPicker> picker_;
    };

    // Helper handed to the child policy. It forwards to the parent's helper
    // and routes state updates back to this ChildPriority.
    class Helper : public ChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ChildPriority> priority)
          : priority_(std::move(priority)) {}

      ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }

      RefCountedPtr<SubchannelInterface> CreateSubchannel(
          ServerAddress address, const grpc_channel_args& args) override;
      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       std::unique_ptr<SubchannelPicker> picker) override;
      void RequestReresolution() override;
      void AddTraceEvent(TraceSeverity severity,
                         absl::string_view message) override;

     private:
      RefCountedPtr<ChildPriority> priority_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const grpc_channel_args* args);

    void OnConnectivityStateUpdateLocked(
        grpc_connectivity_state state, const absl::Status& status,
        std::unique_ptr<SubchannelPicker> picker);

    void StartFailoverTimerLocked();

    // Timer callbacks. The static versions are scheduled on the ExecCtx and
    // hop into the WorkSerializer to run the *Locked versions.
    static void OnFailoverTimer(void* arg, grpc_error* error);
    void OnFailoverTimerLocked(grpc_error* error);
    static void OnDeactivationTimer(void* arg, grpc_error* error);
    void OnDeactivationTimerLocked(grpc_error* error);

    RefCountedPtr<PriorityLb> priority_policy_;
    const std::string name_;
    // When true, re-resolution requests from the child policy are dropped.
    bool ignore_reresolution_requests_ = false;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
    absl::Status connectivity_status_;
    RefCountedPtr<RefCountedPicker> picker_wrapper_;

    // States for delayed removal.
    // NOTE(review): the *_callback_pending_ flags below are the only guard
    // against stale timer callbacks. Suppose a timer has already fired (its
    // callback is queued), and is then cancelled and re-armed before that
    // callback runs. The old callback may then see the new pending flag and
    // act early — confirm whether this sequence can occur.
    grpc_timer deactivation_timer_;
    grpc_closure on_deactivation_timer_;
    bool deactivation_timer_callback_pending_ = false;

    // States of failover.
    grpc_timer failover_timer_;
    grpc_closure on_failover_timer_;
    bool failover_timer_callback_pending_ = false;
  };

  ~PriorityLb() override;

  void ShutdownLocked() override;

  // Returns UINT32_MAX if child is not in current priority list.
  uint32_t GetChildPriorityLocked(const std::string& child_name) const;

  void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
  void DeleteChild(ChildPriority* child);

  void TryNextPriorityLocked(bool report_connecting);
  void SelectPriorityLocked(uint32_t priority);

  // Per-child failover timeout, read from the channel args in the ctor.
  const int child_failover_timeout_ms_;

  // Current channel args and config from the resolver.
  const grpc_channel_args* args_ = nullptr;
  RefCountedPtr<PriorityLbConfig> config_;
  HierarchicalAddressMap addresses_;

  // Internal state.
  bool shutting_down_ = false;

  std::map<std::string, OrphanablePtr<ChildPriority>> children_;
  // The priority that is being used: an index into config_->priorities(),
  // or UINT32_MAX when no priority is currently selected.
  uint32_t current_priority_ = UINT32_MAX;
  // Points to the current child from before the most recent update.
  // We will continue to use this child until we decide which of the new
  // children to use.
  ChildPriority* current_child_from_before_update_ = nullptr;
};
239
240 //
241 // PriorityLb
242 //
243
// Creates the policy. The failover timeout comes from the
// GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS channel arg, with default
// kDefaultChildFailoverTimeoutMs and allowed range [0, INT_MAX].
//
// NOTE(review): args.args is read after `args` has been moved into the base
// class. This relies on Args' move leaving its raw `args` pointer untouched,
// which holds for an implicit member-wise move of a raw pointer. Confirm
// this if Args ever gains a user-defined move constructor.
PriorityLb::PriorityLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      child_failover_timeout_ms_(grpc_channel_args_find_integer(
          args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
          {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] created", this);
  }
}
253
~PriorityLb()254 PriorityLb::~PriorityLb() {
255 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
256 gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
257 }
258 grpc_channel_args_destroy(args_);
259 }
260
ShutdownLocked()261 void PriorityLb::ShutdownLocked() {
262 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
263 gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
264 }
265 shutting_down_ = true;
266 children_.clear();
267 }
268
ExitIdleLocked()269 void PriorityLb::ExitIdleLocked() {
270 if (current_priority_ != UINT32_MAX) {
271 const std::string& child_name = config_->priorities()[current_priority_];
272 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
273 gpr_log(GPR_INFO,
274 "[priority_lb %p] exiting IDLE for current priority %d child %s",
275 this, current_priority_, child_name.c_str());
276 }
277 children_[child_name]->ExitIdleLocked();
278 }
279 }
280
ResetBackoffLocked()281 void PriorityLb::ResetBackoffLocked() {
282 for (const auto& p : children_) p.second->ResetBackoffLocked();
283 }
284
UpdateLocked(UpdateArgs args)285 void PriorityLb::UpdateLocked(UpdateArgs args) {
286 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
287 gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
288 }
289 // Save current child.
290 if (current_priority_ != UINT32_MAX) {
291 const std::string& child_name = config_->priorities()[current_priority_];
292 current_child_from_before_update_ = children_[child_name].get();
293 // Unset current_priority_, since it was an index into the old
294 // config's priority list and may no longer be valid. It will be
295 // reset later by TryNextPriorityLocked(), but we unset it here in
296 // case updating any of our children triggers a state update.
297 current_priority_ = UINT32_MAX;
298 }
299 // Update config.
300 config_ = std::move(args.config);
301 // Update args.
302 grpc_channel_args_destroy(args_);
303 args_ = args.args;
304 args.args = nullptr;
305 // Update addresses.
306 addresses_ = MakeHierarchicalAddressMap(args.addresses);
307 // Check all existing children against the new config.
308 for (const auto& p : children_) {
309 const std::string& child_name = p.first;
310 auto& child = p.second;
311 auto config_it = config_->children().find(child_name);
312 if (config_it == config_->children().end()) {
313 // Existing child not found in new config. Deactivate it.
314 child->DeactivateLocked();
315 } else {
316 // Existing child found in new config. Update it.
317 child->UpdateLocked(config_it->second.config,
318 config_it->second.ignore_reresolution_requests);
319 }
320 }
321 // Try to get connected.
322 TryNextPriorityLocked(/*report_connecting=*/children_.empty());
323 }
324
GetChildPriorityLocked(const std::string & child_name) const325 uint32_t PriorityLb::GetChildPriorityLocked(
326 const std::string& child_name) const {
327 for (uint32_t priority = 0; priority < config_->priorities().size();
328 ++priority) {
329 if (config_->priorities()[priority] == child_name) return priority;
330 }
331 return UINT32_MAX;
332 }
333
// Reacts to a state change of `child`; by now the child has already stored
// its new state and picker. Depending on which child reported and its new
// state, this does one of:
//  - keep using (or stop using) the child that was current before the last
//    update;
//  - switch to a higher priority;
//  - fail over to the next priority;
//  - pass the current priority's new picker up to the parent.
void PriorityLb::HandleChildConnectivityStateChangeLocked(
    ChildPriority* child) {
  // Special case for the child that was the current child before the
  // most recent update.
  if (child == current_child_from_before_update_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] state update for current child from before "
              "config update",
              this);
    }
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      // If it's still READY or IDLE, we stick with this child, so pass
      // the new picker up to our parent.
      channel_control_helper()->UpdateState(child->connectivity_state(),
                                            child->connectivity_status(),
                                            child->GetPicker());
    } else {
      // If it's no longer READY or IDLE, we should stop using it.
      // We already started trying other priorities as a result of the
      // update, but calling TryNextPriorityLocked() ensures that we will
      // properly select between CONNECTING and TRANSIENT_FAILURE as the
      // new state to report to our parent.
      current_child_from_before_update_ = nullptr;
      TryNextPriorityLocked(/*report_connecting=*/true);
    }
    return;
  }
  // Otherwise, find the child's priority.
  uint32_t child_priority = GetChildPriorityLocked(child->name());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] state update for priority %u, child %s, current "
            "priority %u",
            this, child_priority, child->name().c_str(), current_priority_);
  }
  // Ignore priorities not in the current config.
  if (child_priority == UINT32_MAX) return;
  // Ignore lower-than-current priorities. (When no priority is selected,
  // current_priority_ is UINT32_MAX, so no listed child is ignored here.)
  if (child_priority > current_priority_) return;
  // If a child reports TRANSIENT_FAILURE, start trying the next priority.
  // Note that even if this is for a higher-than-current priority, we
  // may still need to create some children between this priority and
  // the current one (e.g., if we got an update that inserted new
  // priorities ahead of the current one).
  if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    TryNextPriorityLocked(
        /*report_connecting=*/child_priority == current_priority_);
    return;
  }
  // The update is for a higher-than-current priority (or for any
  // priority if we don't have any current priority).
  if (child_priority < current_priority_) {
    // If the child reports READY or IDLE, switch to that priority.
    // Otherwise, ignore the update.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SelectPriorityLocked(child_priority);
    }
    return;
  }
  // The current priority has returned a new picker, so pass it up to
  // our parent.
  channel_control_helper()->UpdateState(child->connectivity_state(),
                                        child->connectivity_status(),
                                        child->GetPicker());
}
402
DeleteChild(ChildPriority * child)403 void PriorityLb::DeleteChild(ChildPriority* child) {
404 // If this was the current child from before the most recent update,
405 // stop using it. We already started trying other priorities as a
406 // result of the update, but calling TryNextPriorityLocked() ensures that
407 // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
408 // new state to report to our parent.
409 if (current_child_from_before_update_ == child) {
410 current_child_from_before_update_ = nullptr;
411 TryNextPriorityLocked(/*report_connecting=*/true);
412 }
413 children_.erase(child->name());
414 }
415
TryNextPriorityLocked(bool report_connecting)416 void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
417 current_priority_ = UINT32_MAX;
418 for (uint32_t priority = 0; priority < config_->priorities().size();
419 ++priority) {
420 // If the child for the priority does not exist yet, create it.
421 const std::string& child_name = config_->priorities()[priority];
422 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
423 gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
424 priority, child_name.c_str());
425 }
426 auto& child = children_[child_name];
427 if (child == nullptr) {
428 if (report_connecting) {
429 channel_control_helper()->UpdateState(
430 GRPC_CHANNEL_CONNECTING, absl::Status(),
431 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
432 }
433 child = MakeOrphanable<ChildPriority>(
434 Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
435 auto child_config = config_->children().find(child_name);
436 GPR_DEBUG_ASSERT(child_config != config_->children().end());
437 child->UpdateLocked(child_config->second.config,
438 child_config->second.ignore_reresolution_requests);
439 return;
440 }
441 // The child already exists.
442 child->MaybeReactivateLocked();
443 // If the child is in state READY or IDLE, switch to it.
444 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
445 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
446 SelectPriorityLocked(priority);
447 return;
448 }
449 // Child is not READY or IDLE.
450 // If its failover timer is still pending, give it time to fire.
451 if (child->failover_timer_callback_pending()) {
452 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
453 gpr_log(GPR_INFO,
454 "[priority_lb %p] priority %u, child %s: child still "
455 "attempting to connect, will wait",
456 this, priority, child_name.c_str());
457 }
458 if (report_connecting) {
459 channel_control_helper()->UpdateState(
460 GRPC_CHANNEL_CONNECTING, absl::Status(),
461 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
462 }
463 return;
464 }
465 // Child has been failing for a while. Move on to the next priority.
466 }
467 // If there are no more priorities to try, report TRANSIENT_FAILURE.
468 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
469 gpr_log(GPR_INFO,
470 "[priority_lb %p] no priority reachable, putting channel in "
471 "TRANSIENT_FAILURE",
472 this);
473 }
474 current_child_from_before_update_ = nullptr;
475 grpc_error* error = grpc_error_set_int(
476 GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"),
477 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
478 channel_control_helper()->UpdateState(
479 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
480 absl::make_unique<TransientFailurePicker>(error));
481 }
482
SelectPriorityLocked(uint32_t priority)483 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
484 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
485 gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
486 priority, config_->priorities()[priority].c_str());
487 }
488 current_priority_ = priority;
489 current_child_from_before_update_ = nullptr;
490 // Deactivate lower priorities.
491 for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
492 const std::string& child_name = config_->priorities()[p];
493 auto it = children_.find(child_name);
494 if (it != children_.end()) it->second->DeactivateLocked();
495 }
496 // Update picker.
497 auto& child = children_[config_->priorities()[priority]];
498 channel_control_helper()->UpdateState(child->connectivity_state(),
499 child->connectivity_status(),
500 child->GetPicker());
501 }
502
503 //
504 // PriorityLb::ChildPriority
505 //
506
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)507 PriorityLb::ChildPriority::ChildPriority(
508 RefCountedPtr<PriorityLb> priority_policy, std::string name)
509 : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
510 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
511 gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
512 priority_policy_.get(), name_.c_str(), this);
513 }
514 GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
515 grpc_schedule_on_exec_ctx);
516 GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
517 grpc_schedule_on_exec_ctx);
518 // Start the failover timer.
519 StartFailoverTimerLocked();
520 }
521
Orphan()522 void PriorityLb::ChildPriority::Orphan() {
523 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
524 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
525 priority_policy_.get(), name_.c_str(), this);
526 }
527 MaybeCancelFailoverTimerLocked();
528 if (deactivation_timer_callback_pending_) {
529 grpc_timer_cancel(&deactivation_timer_);
530 }
531 // Remove the child policy's interested_parties pollset_set from the
532 // xDS policy.
533 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
534 priority_policy_->interested_parties());
535 child_policy_.reset();
536 // Drop our ref to the child's picker, in case it's holding a ref to
537 // the child.
538 picker_wrapper_.reset();
539 if (deactivation_timer_callback_pending_) {
540 grpc_timer_cancel(&deactivation_timer_);
541 }
542 Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
543 }
544
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)545 void PriorityLb::ChildPriority::UpdateLocked(
546 RefCountedPtr<LoadBalancingPolicy::Config> config,
547 bool ignore_reresolution_requests) {
548 if (priority_policy_->shutting_down_) return;
549 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
550 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
551 priority_policy_.get(), name_.c_str(), this);
552 }
553 ignore_reresolution_requests_ = ignore_reresolution_requests;
554 // Create policy if needed.
555 if (child_policy_ == nullptr) {
556 child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
557 }
558 // Construct update args.
559 UpdateArgs update_args;
560 update_args.config = std::move(config);
561 update_args.addresses = priority_policy_->addresses_[name_];
562 update_args.args = grpc_channel_args_copy(priority_policy_->args_);
563 // Update the policy.
564 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
565 gpr_log(GPR_INFO,
566 "[priority_lb %p] child %s (%p): updating child policy handler %p",
567 priority_policy_.get(), name_.c_str(), this, child_policy_.get());
568 }
569 child_policy_->UpdateLocked(std::move(update_args));
570 }
571
572 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)573 PriorityLb::ChildPriority::CreateChildPolicyLocked(
574 const grpc_channel_args* args) {
575 LoadBalancingPolicy::Args lb_policy_args;
576 lb_policy_args.work_serializer = priority_policy_->work_serializer();
577 lb_policy_args.args = args;
578 lb_policy_args.channel_control_helper =
579 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
580 OrphanablePtr<LoadBalancingPolicy> lb_policy =
581 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
582 &grpc_lb_priority_trace);
583 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
584 gpr_log(GPR_INFO,
585 "[priority_lb %p] child %s (%p): created new child policy "
586 "handler %p",
587 priority_policy_.get(), name_.c_str(), this, lb_policy.get());
588 }
589 // Add the parent's interested_parties pollset_set to that of the newly
590 // created child policy. This will make the child policy progress upon
591 // activity on the parent LB, which in turn is tied to the application's call.
592 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
593 priority_policy_->interested_parties());
594 return lb_policy;
595 }
596
ExitIdleLocked()597 void PriorityLb::ChildPriority::ExitIdleLocked() {
598 if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
599 !failover_timer_callback_pending_) {
600 StartFailoverTimerLocked();
601 }
602 child_policy_->ExitIdleLocked();
603 }
604
ResetBackoffLocked()605 void PriorityLb::ChildPriority::ResetBackoffLocked() {
606 child_policy_->ResetBackoffLocked();
607 }
608
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)609 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
610 grpc_connectivity_state state, const absl::Status& status,
611 std::unique_ptr<SubchannelPicker> picker) {
612 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
613 gpr_log(GPR_INFO,
614 "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
615 priority_policy_.get(), name_.c_str(), this,
616 ConnectivityStateName(state), status.ToString().c_str(),
617 picker.get());
618 }
619 // Store the state and picker.
620 connectivity_state_ = state;
621 connectivity_status_ = status;
622 picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
623 // If READY or TRANSIENT_FAILURE, cancel failover timer.
624 if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
625 MaybeCancelFailoverTimerLocked();
626 }
627 // Notify the parent policy.
628 priority_policy_->HandleChildConnectivityStateChangeLocked(this);
629 }
630
StartFailoverTimerLocked()631 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
632 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
633 gpr_log(GPR_INFO,
634 "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
635 priority_policy_.get(), name_.c_str(), this,
636 priority_policy_->child_failover_timeout_ms_);
637 }
638 Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
639 grpc_timer_init(
640 &failover_timer_,
641 ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
642 &on_failover_timer_);
643 failover_timer_callback_pending_ = true;
644 }
645
MaybeCancelFailoverTimerLocked()646 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
647 if (failover_timer_callback_pending_) {
648 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
649 gpr_log(GPR_INFO,
650 "[priority_lb %p] child %s (%p): cancelling failover timer",
651 priority_policy_.get(), name_.c_str(), this);
652 }
653 grpc_timer_cancel(&failover_timer_);
654 failover_timer_callback_pending_ = false;
655 }
656 }
657
OnFailoverTimer(void * arg,grpc_error * error)658 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) {
659 ChildPriority* self = static_cast<ChildPriority*>(arg);
660 GRPC_ERROR_REF(error); // ref owned by lambda
661 self->priority_policy_->work_serializer()->Run(
662 [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
663 }
664
OnFailoverTimerLocked(grpc_error * error)665 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error* error) {
666 if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
667 !priority_policy_->shutting_down_) {
668 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
669 gpr_log(GPR_INFO,
670 "[priority_lb %p] child %s (%p): failover timer fired, "
671 "reporting TRANSIENT_FAILURE",
672 priority_policy_.get(), name_.c_str(), this);
673 }
674 failover_timer_callback_pending_ = false;
675 OnConnectivityStateUpdateLocked(
676 GRPC_CHANNEL_TRANSIENT_FAILURE,
677 absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
678 nullptr);
679 }
680 Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
681 GRPC_ERROR_UNREF(error);
682 }
683
DeactivateLocked()684 void PriorityLb::ChildPriority::DeactivateLocked() {
685 // If already deactivated, don't do it again.
686 if (deactivation_timer_callback_pending_) return;
687 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
688 gpr_log(GPR_INFO,
689 "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
690 "ms.",
691 priority_policy_.get(), name_.c_str(), this,
692 kChildRetentionIntervalMs);
693 }
694 MaybeCancelFailoverTimerLocked();
695 // Start a timer to delete the child.
696 Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
697 grpc_timer_init(&deactivation_timer_,
698 ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
699 &on_deactivation_timer_);
700 deactivation_timer_callback_pending_ = true;
701 }
702
MaybeReactivateLocked()703 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
704 if (deactivation_timer_callback_pending_) {
705 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
706 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
707 priority_policy_.get(), name_.c_str(), this);
708 }
709 deactivation_timer_callback_pending_ = false;
710 grpc_timer_cancel(&deactivation_timer_);
711 }
712 }
713
OnDeactivationTimer(void * arg,grpc_error * error)714 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
715 grpc_error* error) {
716 ChildPriority* self = static_cast<ChildPriority*>(arg);
717 GRPC_ERROR_REF(error); // ref owned by lambda
718 self->priority_policy_->work_serializer()->Run(
719 [self, error]() { self->OnDeactivationTimerLocked(error); },
720 DEBUG_LOCATION);
721 }
722
OnDeactivationTimerLocked(grpc_error * error)723 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(grpc_error* error) {
724 if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
725 !priority_policy_->shutting_down_) {
726 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
727 gpr_log(GPR_INFO,
728 "[priority_lb %p] child %s (%p): deactivation timer fired, "
729 "deleting child",
730 priority_policy_.get(), name_.c_str(), this);
731 }
732 deactivation_timer_callback_pending_ = false;
733 priority_policy_->DeleteChild(this);
734 }
735 Unref(DEBUG_LOCATION, "ChildPriority+timer");
736 GRPC_ERROR_UNREF(error);
737 }
738
739 //
740 // PriorityLb::ChildPriority::Helper
741 //
742
RequestReresolution()743 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
744 if (priority_->priority_policy_->shutting_down_) return;
745 if (priority_->ignore_reresolution_requests_) {
746 return;
747 }
748 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
749 }
750
751 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)752 PriorityLb::ChildPriority::Helper::CreateSubchannel(
753 ServerAddress address, const grpc_channel_args& args) {
754 if (priority_->priority_policy_->shutting_down_) return nullptr;
755 return priority_->priority_policy_->channel_control_helper()
756 ->CreateSubchannel(std::move(address), args);
757 }
758
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)759 void PriorityLb::ChildPriority::Helper::UpdateState(
760 grpc_connectivity_state state, const absl::Status& status,
761 std::unique_ptr<SubchannelPicker> picker) {
762 if (priority_->priority_policy_->shutting_down_) return;
763 // Notify the priority.
764 priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
765 }
766
AddTraceEvent(TraceSeverity severity,absl::string_view message)767 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
768 TraceSeverity severity, absl::string_view message) {
769 if (priority_->priority_policy_->shutting_down_) return;
770 priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
771 message);
772 }
773
774 //
775 // factory
776 //
777
778 class PriorityLbFactory : public LoadBalancingPolicyFactory {
779 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const780 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
781 LoadBalancingPolicy::Args args) const override {
782 return MakeOrphanable<PriorityLb>(std::move(args));
783 }
784
name() const785 const char* name() const override { return kPriority; }
786
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const787 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
788 const Json& json, grpc_error** error) const override {
789 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
790 if (json.type() == Json::Type::JSON_NULL) {
791 // priority was mentioned as a policy in the deprecated
792 // loadBalancingPolicy field or in the client API.
793 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
794 "field:loadBalancingPolicy error:priority policy requires "
795 "configuration. Please use loadBalancingConfig field of service "
796 "config instead.");
797 return nullptr;
798 }
799 std::vector<grpc_error*> error_list;
800 // Children.
801 std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
802 auto it = json.object_value().find("children");
803 if (it == json.object_value().end()) {
804 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
805 "field:children error:required field missing"));
806 } else if (it->second.type() != Json::Type::OBJECT) {
807 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
808 "field:children error:type should be object"));
809 } else {
810 const Json::Object& object = it->second.object_value();
811 for (const auto& p : object) {
812 const std::string& child_name = p.first;
813 const Json& element = p.second;
814 if (element.type() != Json::Type::OBJECT) {
815 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
816 absl::StrCat("field:children key:", child_name,
817 " error:should be type object")
818 .c_str()));
819 } else {
820 auto it2 = element.object_value().find("config");
821 if (it2 == element.object_value().end()) {
822 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
823 absl::StrCat("field:children key:", child_name,
824 " error:missing 'config' field")
825 .c_str()));
826 } else {
827 grpc_error* parse_error = GRPC_ERROR_NONE;
828 auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
829 it2->second, &parse_error);
830 bool ignore_resolution_requests = false;
831 // If present, ignore_reresolution_requests must be of type
832 // boolean.
833 auto it3 =
834 element.object_value().find("ignore_reresolution_requests");
835 if (it3 != element.object_value().end()) {
836 if (it3->second.type() == Json::Type::JSON_TRUE) {
837 ignore_resolution_requests = true;
838 } else if (it3->second.type() != Json::Type::JSON_FALSE) {
839 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
840 absl::StrCat("field:children key:", child_name,
841 " field:ignore_reresolution_requests:should "
842 "be type boolean")
843 .c_str()));
844 }
845 }
846 if (config == nullptr) {
847 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
848 error_list.push_back(
849 GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
850 absl::StrCat("field:children key:", child_name).c_str(),
851 &parse_error, 1));
852 GRPC_ERROR_UNREF(parse_error);
853 }
854 children[child_name].config = std::move(config);
855 children[child_name].ignore_reresolution_requests =
856 ignore_resolution_requests;
857 }
858 }
859 }
860 }
861 // Priorities.
862 std::vector<std::string> priorities;
863 it = json.object_value().find("priorities");
864 if (it == json.object_value().end()) {
865 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
866 "field:priorities error:required field missing"));
867 } else if (it->second.type() != Json::Type::ARRAY) {
868 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
869 "field:priorities error:type should be array"));
870 } else {
871 const Json::Array& array = it->second.array_value();
872 for (size_t i = 0; i < array.size(); ++i) {
873 const Json& element = array[i];
874 if (element.type() != Json::Type::STRING) {
875 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
876 absl::StrCat("field:priorities element:", i,
877 " error:should be type string")
878 .c_str()));
879 } else if (children.find(element.string_value()) == children.end()) {
880 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
881 absl::StrCat("field:priorities element:", i,
882 " error:unknown child '", element.string_value(),
883 "'")
884 .c_str()));
885 } else {
886 priorities.emplace_back(element.string_value());
887 }
888 }
889 if (priorities.size() != children.size()) {
890 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
891 absl::StrCat("field:priorities error:priorities size (",
892 priorities.size(), ") != children size (",
893 children.size(), ")")
894 .c_str()));
895 }
896 }
897 if (error_list.empty()) {
898 return MakeRefCounted<PriorityLbConfig>(std::move(children),
899 std::move(priorities));
900 } else {
901 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
902 "priority_experimental LB policy config", &error_list);
903 return nullptr;
904 }
905 }
906 };
907
908 } // namespace
909
910 } // namespace grpc_core
911
912 //
913 // Plugin registration
914 //
915
grpc_lb_policy_priority_init()916 void grpc_lb_policy_priority_init() {
917 grpc_core::LoadBalancingPolicyRegistry::Builder::
918 RegisterLoadBalancingPolicyFactory(
919 absl::make_unique<grpc_core::PriorityLbFactory>());
920 }
921
// Plugin shutdown hook. Intentionally a no-op: init performs only the
// factory registration above and allocates nothing that needs releasing here.
void grpc_lb_policy_priority_shutdown() {
  // Nothing to tear down.
}
923