1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24
25 #include <grpc/grpc.h>
26
27 #include "src/core/ext/filters/client_channel/lb_policy.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 #include "src/core/lib/transport/error_utils.h"
38
39 namespace grpc_core {
40
41 TraceFlag grpc_lb_priority_trace(false, "priority_lb");
42
43 namespace {
44
// Policy name returned by name() on the config, the policy and the factory.
constexpr char kPriority[] = "priority_experimental";

// Retention period, in milliseconds, for a child that is no longer in use
// (it was dropped from the config, or a higher-priority child took over).
// When the period elapses the child is deleted.
constexpr int kChildRetentionIntervalMs = 15 /* minutes */ * 60 * 1000;

// Default time, in milliseconds, that a newly created child is given to
// connect before the next priority is attempted.  Can be overridden through
// a channel arg.
constexpr int kDefaultChildFailoverTimeoutMs = 10 * 1000;
55
56 // Config for priority LB policy.
57 class PriorityLbConfig : public LoadBalancingPolicy::Config {
58 public:
59 struct PriorityLbChild {
60 RefCountedPtr<LoadBalancingPolicy::Config> config;
61 bool ignore_reresolution_requests = false;
62 };
63
PriorityLbConfig(std::map<std::string,PriorityLbChild> children,std::vector<std::string> priorities)64 PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
65 std::vector<std::string> priorities)
66 : children_(std::move(children)), priorities_(std::move(priorities)) {}
67
name() const68 const char* name() const override { return kPriority; }
69
children() const70 const std::map<std::string, PriorityLbChild>& children() const {
71 return children_;
72 }
priorities() const73 const std::vector<std::string>& priorities() const { return priorities_; }
74
75 private:
76 const std::map<std::string, PriorityLbChild> children_;
77 const std::vector<std::string> priorities_;
78 };
79
80 // priority LB policy.
81 class PriorityLb : public LoadBalancingPolicy {
82 public:
83 explicit PriorityLb(Args args);
84
name() const85 const char* name() const override { return kPriority; }
86
87 void UpdateLocked(UpdateArgs args) override;
88 void ExitIdleLocked() override;
89 void ResetBackoffLocked() override;
90
91 private:
92 // Each ChildPriority holds a ref to the PriorityLb.
93 class ChildPriority : public InternallyRefCounted<ChildPriority> {
94 public:
95 ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
96
~ChildPriority()97 ~ChildPriority() override {
98 priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
99 }
100
name() const101 const std::string& name() const { return name_; }
102
103 void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
104 bool ignore_reresolution_requests);
105 void ExitIdleLocked();
106 void ResetBackoffLocked();
107 void DeactivateLocked();
108 void MaybeReactivateLocked();
109 void MaybeCancelFailoverTimerLocked();
110
111 void Orphan() override;
112
GetPicker()113 std::unique_ptr<SubchannelPicker> GetPicker() {
114 return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
115 }
116
connectivity_state() const117 grpc_connectivity_state connectivity_state() const {
118 return connectivity_state_;
119 }
120
connectivity_status() const121 const absl::Status& connectivity_status() const {
122 return connectivity_status_;
123 }
124
failover_timer_callback_pending() const125 bool failover_timer_callback_pending() const {
126 return failover_timer_callback_pending_;
127 }
128
129 private:
130 // A simple wrapper for ref-counting a picker from the child policy.
131 class RefCountedPicker : public RefCounted<RefCountedPicker> {
132 public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)133 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
134 : picker_(std::move(picker)) {}
Pick(PickArgs args)135 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
136
137 private:
138 std::unique_ptr<SubchannelPicker> picker_;
139 };
140
141 // A non-ref-counted wrapper for RefCountedPicker.
142 class RefCountedPickerWrapper : public SubchannelPicker {
143 public:
RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)144 explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
145 : picker_(std::move(picker)) {}
Pick(PickArgs args)146 PickResult Pick(PickArgs args) override { return picker_->Pick(args); }
147
148 private:
149 RefCountedPtr<RefCountedPicker> picker_;
150 };
151
152 class Helper : public ChannelControlHelper {
153 public:
Helper(RefCountedPtr<ChildPriority> priority)154 explicit Helper(RefCountedPtr<ChildPriority> priority)
155 : priority_(std::move(priority)) {}
156
~Helper()157 ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }
158
159 RefCountedPtr<SubchannelInterface> CreateSubchannel(
160 ServerAddress address, const grpc_channel_args& args) override;
161 void UpdateState(grpc_connectivity_state state,
162 const absl::Status& status,
163 std::unique_ptr<SubchannelPicker> picker) override;
164 void RequestReresolution() override;
165 void AddTraceEvent(TraceSeverity severity,
166 absl::string_view message) override;
167
168 private:
169 RefCountedPtr<ChildPriority> priority_;
170 };
171
172 // Methods for dealing with the child policy.
173 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
174 const grpc_channel_args* args);
175
176 void OnConnectivityStateUpdateLocked(
177 grpc_connectivity_state state, const absl::Status& status,
178 std::unique_ptr<SubchannelPicker> picker);
179
180 void StartFailoverTimerLocked();
181
182 static void OnFailoverTimer(void* arg, grpc_error_handle error);
183 void OnFailoverTimerLocked(grpc_error_handle error);
184 static void OnDeactivationTimer(void* arg, grpc_error_handle error);
185 void OnDeactivationTimerLocked(grpc_error_handle error);
186
187 RefCountedPtr<PriorityLb> priority_policy_;
188 const std::string name_;
189 bool ignore_reresolution_requests_ = false;
190
191 OrphanablePtr<LoadBalancingPolicy> child_policy_;
192
193 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
194 absl::Status connectivity_status_;
195 RefCountedPtr<RefCountedPicker> picker_wrapper_;
196
197 // States for delayed removal.
198 grpc_timer deactivation_timer_;
199 grpc_closure on_deactivation_timer_;
200 bool deactivation_timer_callback_pending_ = false;
201
202 // States of failover.
203 grpc_timer failover_timer_;
204 grpc_closure on_failover_timer_;
205 bool failover_timer_callback_pending_ = false;
206 };
207
208 ~PriorityLb() override;
209
210 void ShutdownLocked() override;
211
212 // Returns UINT32_MAX if child is not in current priority list.
213 uint32_t GetChildPriorityLocked(const std::string& child_name) const;
214
215 void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
216 void DeleteChild(ChildPriority* child);
217
218 void TryNextPriorityLocked(bool report_connecting);
219 void SelectPriorityLocked(uint32_t priority);
220
221 const int child_failover_timeout_ms_;
222
223 // Current channel args and config from the resolver.
224 const grpc_channel_args* args_ = nullptr;
225 RefCountedPtr<PriorityLbConfig> config_;
226 HierarchicalAddressMap addresses_;
227
228 // Internal state.
229 bool shutting_down_ = false;
230
231 std::map<std::string, OrphanablePtr<ChildPriority>> children_;
232 // The priority that is being used.
233 uint32_t current_priority_ = UINT32_MAX;
234 // Points to the current child from before the most recent update.
235 // We will continue to use this child until we decide which of the new
236 // children to use.
237 ChildPriority* current_child_from_before_update_ = nullptr;
238 };
239
240 //
241 // PriorityLb
242 //
243
PriorityLb(Args args)244 PriorityLb::PriorityLb(Args args)
245 : LoadBalancingPolicy(std::move(args)),
246 child_failover_timeout_ms_(grpc_channel_args_find_integer(
247 args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
248 {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
249 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
250 gpr_log(GPR_INFO, "[priority_lb %p] created", this);
251 }
252 }
253
~PriorityLb()254 PriorityLb::~PriorityLb() {
255 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
256 gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
257 }
258 grpc_channel_args_destroy(args_);
259 }
260
ShutdownLocked()261 void PriorityLb::ShutdownLocked() {
262 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
263 gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
264 }
265 shutting_down_ = true;
266 children_.clear();
267 }
268
ExitIdleLocked()269 void PriorityLb::ExitIdleLocked() {
270 if (current_priority_ != UINT32_MAX) {
271 const std::string& child_name = config_->priorities()[current_priority_];
272 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
273 gpr_log(GPR_INFO,
274 "[priority_lb %p] exiting IDLE for current priority %d child %s",
275 this, current_priority_, child_name.c_str());
276 }
277 children_[child_name]->ExitIdleLocked();
278 }
279 }
280
ResetBackoffLocked()281 void PriorityLb::ResetBackoffLocked() {
282 for (const auto& p : children_) p.second->ResetBackoffLocked();
283 }
284
UpdateLocked(UpdateArgs args)285 void PriorityLb::UpdateLocked(UpdateArgs args) {
286 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
287 gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
288 }
289 // Save current child.
290 if (current_priority_ != UINT32_MAX) {
291 const std::string& child_name = config_->priorities()[current_priority_];
292 current_child_from_before_update_ = children_[child_name].get();
293 // Unset current_priority_, since it was an index into the old
294 // config's priority list and may no longer be valid. It will be
295 // reset later by TryNextPriorityLocked(), but we unset it here in
296 // case updating any of our children triggers a state update.
297 current_priority_ = UINT32_MAX;
298 }
299 // Update config.
300 config_ = std::move(args.config);
301 // Update args.
302 grpc_channel_args_destroy(args_);
303 args_ = args.args;
304 args.args = nullptr;
305 // Update addresses.
306 addresses_ = MakeHierarchicalAddressMap(args.addresses);
307 // Check all existing children against the new config.
308 for (const auto& p : children_) {
309 const std::string& child_name = p.first;
310 auto& child = p.second;
311 auto config_it = config_->children().find(child_name);
312 if (config_it == config_->children().end()) {
313 // Existing child not found in new config. Deactivate it.
314 child->DeactivateLocked();
315 } else {
316 // Existing child found in new config. Update it.
317 child->UpdateLocked(config_it->second.config,
318 config_it->second.ignore_reresolution_requests);
319 }
320 }
321 // Try to get connected.
322 TryNextPriorityLocked(/*report_connecting=*/children_.empty());
323 }
324
GetChildPriorityLocked(const std::string & child_name) const325 uint32_t PriorityLb::GetChildPriorityLocked(
326 const std::string& child_name) const {
327 for (uint32_t priority = 0; priority < config_->priorities().size();
328 ++priority) {
329 if (config_->priorities()[priority] == child_name) return priority;
330 }
331 return UINT32_MAX;
332 }
333
HandleChildConnectivityStateChangeLocked(ChildPriority * child)334 void PriorityLb::HandleChildConnectivityStateChangeLocked(
335 ChildPriority* child) {
336 // Special case for the child that was the current child before the
337 // most recent update.
338 if (child == current_child_from_before_update_) {
339 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
340 gpr_log(GPR_INFO,
341 "[priority_lb %p] state update for current child from before "
342 "config update",
343 this);
344 }
345 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
346 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
347 // If it's still READY or IDLE, we stick with this child, so pass
348 // the new picker up to our parent.
349 channel_control_helper()->UpdateState(child->connectivity_state(),
350 child->connectivity_status(),
351 child->GetPicker());
352 } else {
353 // If it's no longer READY or IDLE, we should stop using it.
354 // We already started trying other priorities as a result of the
355 // update, but calling TryNextPriorityLocked() ensures that we will
356 // properly select between CONNECTING and TRANSIENT_FAILURE as the
357 // new state to report to our parent.
358 current_child_from_before_update_ = nullptr;
359 TryNextPriorityLocked(/*report_connecting=*/true);
360 }
361 return;
362 }
363 // Otherwise, find the child's priority.
364 uint32_t child_priority = GetChildPriorityLocked(child->name());
365 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
366 gpr_log(GPR_INFO,
367 "[priority_lb %p] state update for priority %u, child %s, current "
368 "priority %u",
369 this, child_priority, child->name().c_str(), current_priority_);
370 }
371 // Ignore priorities not in the current config.
372 if (child_priority == UINT32_MAX) return;
373 // Ignore lower-than-current priorities.
374 if (child_priority > current_priority_) return;
375 // If a child reports TRANSIENT_FAILURE, start trying the next priority.
376 // Note that even if this is for a higher-than-current priority, we
377 // may still need to create some children between this priority and
378 // the current one (e.g., if we got an update that inserted new
379 // priorities ahead of the current one).
380 if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
381 TryNextPriorityLocked(
382 /*report_connecting=*/child_priority == current_priority_);
383 return;
384 }
385 // The update is for a higher-than-current priority (or for any
386 // priority if we don't have any current priority).
387 if (child_priority < current_priority_) {
388 // If the child reports READY or IDLE, switch to that priority.
389 // Otherwise, ignore the update.
390 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
391 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
392 SelectPriorityLocked(child_priority);
393 }
394 return;
395 }
396 // The current priority has returned a new picker, so pass it up to
397 // our parent.
398 channel_control_helper()->UpdateState(child->connectivity_state(),
399 child->connectivity_status(),
400 child->GetPicker());
401 }
402
DeleteChild(ChildPriority * child)403 void PriorityLb::DeleteChild(ChildPriority* child) {
404 // If this was the current child from before the most recent update,
405 // stop using it. We already started trying other priorities as a
406 // result of the update, but calling TryNextPriorityLocked() ensures that
407 // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
408 // new state to report to our parent.
409 if (current_child_from_before_update_ == child) {
410 current_child_from_before_update_ = nullptr;
411 TryNextPriorityLocked(/*report_connecting=*/true);
412 }
413 children_.erase(child->name());
414 }
415
TryNextPriorityLocked(bool report_connecting)416 void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
417 current_priority_ = UINT32_MAX;
418 for (uint32_t priority = 0; priority < config_->priorities().size();
419 ++priority) {
420 // If the child for the priority does not exist yet, create it.
421 const std::string& child_name = config_->priorities()[priority];
422 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
423 gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
424 priority, child_name.c_str());
425 }
426 auto& child = children_[child_name];
427 if (child == nullptr) {
428 if (report_connecting) {
429 channel_control_helper()->UpdateState(
430 GRPC_CHANNEL_CONNECTING, absl::Status(),
431 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
432 }
433 child = MakeOrphanable<ChildPriority>(
434 Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
435 auto child_config = config_->children().find(child_name);
436 GPR_DEBUG_ASSERT(child_config != config_->children().end());
437 child->UpdateLocked(child_config->second.config,
438 child_config->second.ignore_reresolution_requests);
439 return;
440 }
441 // The child already exists.
442 child->MaybeReactivateLocked();
443 // If the child is in state READY or IDLE, switch to it.
444 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
445 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
446 SelectPriorityLocked(priority);
447 return;
448 }
449 // Child is not READY or IDLE.
450 // If its failover timer is still pending, give it time to fire.
451 if (child->failover_timer_callback_pending()) {
452 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
453 gpr_log(GPR_INFO,
454 "[priority_lb %p] priority %u, child %s: child still "
455 "attempting to connect, will wait",
456 this, priority, child_name.c_str());
457 }
458 if (report_connecting) {
459 channel_control_helper()->UpdateState(
460 GRPC_CHANNEL_CONNECTING, absl::Status(),
461 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
462 }
463 return;
464 }
465 // Child has been failing for a while. Move on to the next priority.
466 }
467 // If there are no more priorities to try, report TRANSIENT_FAILURE.
468 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
469 gpr_log(GPR_INFO,
470 "[priority_lb %p] no priority reachable, putting channel in "
471 "TRANSIENT_FAILURE",
472 this);
473 }
474 current_child_from_before_update_ = nullptr;
475 grpc_error_handle error = grpc_error_set_int(
476 GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"),
477 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
478 channel_control_helper()->UpdateState(
479 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
480 absl::make_unique<TransientFailurePicker>(error));
481 }
482
SelectPriorityLocked(uint32_t priority)483 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
484 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
485 gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
486 priority, config_->priorities()[priority].c_str());
487 }
488 current_priority_ = priority;
489 current_child_from_before_update_ = nullptr;
490 // Deactivate lower priorities.
491 for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
492 const std::string& child_name = config_->priorities()[p];
493 auto it = children_.find(child_name);
494 if (it != children_.end()) it->second->DeactivateLocked();
495 }
496 // Update picker.
497 auto& child = children_[config_->priorities()[priority]];
498 channel_control_helper()->UpdateState(child->connectivity_state(),
499 child->connectivity_status(),
500 child->GetPicker());
501 }
502
503 //
504 // PriorityLb::ChildPriority
505 //
506
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)507 PriorityLb::ChildPriority::ChildPriority(
508 RefCountedPtr<PriorityLb> priority_policy, std::string name)
509 : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
510 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
511 gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
512 priority_policy_.get(), name_.c_str(), this);
513 }
514 GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
515 grpc_schedule_on_exec_ctx);
516 GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
517 grpc_schedule_on_exec_ctx);
518 // Start the failover timer.
519 StartFailoverTimerLocked();
520 }
521
Orphan()522 void PriorityLb::ChildPriority::Orphan() {
523 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
524 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
525 priority_policy_.get(), name_.c_str(), this);
526 }
527 MaybeCancelFailoverTimerLocked();
528 if (deactivation_timer_callback_pending_) {
529 grpc_timer_cancel(&deactivation_timer_);
530 }
531 // Remove the child policy's interested_parties pollset_set from the
532 // xDS policy.
533 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
534 priority_policy_->interested_parties());
535 child_policy_.reset();
536 // Drop our ref to the child's picker, in case it's holding a ref to
537 // the child.
538 picker_wrapper_.reset();
539 if (deactivation_timer_callback_pending_) {
540 grpc_timer_cancel(&deactivation_timer_);
541 }
542 Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
543 }
544
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)545 void PriorityLb::ChildPriority::UpdateLocked(
546 RefCountedPtr<LoadBalancingPolicy::Config> config,
547 bool ignore_reresolution_requests) {
548 if (priority_policy_->shutting_down_) return;
549 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
550 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
551 priority_policy_.get(), name_.c_str(), this);
552 }
553 ignore_reresolution_requests_ = ignore_reresolution_requests;
554 // Create policy if needed.
555 if (child_policy_ == nullptr) {
556 child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
557 }
558 // Construct update args.
559 UpdateArgs update_args;
560 update_args.config = std::move(config);
561 update_args.addresses = priority_policy_->addresses_[name_];
562 update_args.args = grpc_channel_args_copy(priority_policy_->args_);
563 // Update the policy.
564 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
565 gpr_log(GPR_INFO,
566 "[priority_lb %p] child %s (%p): updating child policy handler %p",
567 priority_policy_.get(), name_.c_str(), this, child_policy_.get());
568 }
569 child_policy_->UpdateLocked(std::move(update_args));
570 }
571
572 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)573 PriorityLb::ChildPriority::CreateChildPolicyLocked(
574 const grpc_channel_args* args) {
575 LoadBalancingPolicy::Args lb_policy_args;
576 lb_policy_args.work_serializer = priority_policy_->work_serializer();
577 lb_policy_args.args = args;
578 lb_policy_args.channel_control_helper =
579 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
580 OrphanablePtr<LoadBalancingPolicy> lb_policy =
581 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
582 &grpc_lb_priority_trace);
583 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
584 gpr_log(GPR_INFO,
585 "[priority_lb %p] child %s (%p): created new child policy "
586 "handler %p",
587 priority_policy_.get(), name_.c_str(), this, lb_policy.get());
588 }
589 // Add the parent's interested_parties pollset_set to that of the newly
590 // created child policy. This will make the child policy progress upon
591 // activity on the parent LB, which in turn is tied to the application's call.
592 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
593 priority_policy_->interested_parties());
594 return lb_policy;
595 }
596
ExitIdleLocked()597 void PriorityLb::ChildPriority::ExitIdleLocked() {
598 if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
599 !failover_timer_callback_pending_) {
600 StartFailoverTimerLocked();
601 }
602 child_policy_->ExitIdleLocked();
603 }
604
ResetBackoffLocked()605 void PriorityLb::ChildPriority::ResetBackoffLocked() {
606 child_policy_->ResetBackoffLocked();
607 }
608
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)609 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
610 grpc_connectivity_state state, const absl::Status& status,
611 std::unique_ptr<SubchannelPicker> picker) {
612 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
613 gpr_log(GPR_INFO,
614 "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
615 priority_policy_.get(), name_.c_str(), this,
616 ConnectivityStateName(state), status.ToString().c_str(),
617 picker.get());
618 }
619 // Store the state and picker.
620 connectivity_state_ = state;
621 connectivity_status_ = status;
622 picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
623 // If READY or TRANSIENT_FAILURE, cancel failover timer.
624 if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
625 MaybeCancelFailoverTimerLocked();
626 }
627 // Notify the parent policy.
628 priority_policy_->HandleChildConnectivityStateChangeLocked(this);
629 }
630
StartFailoverTimerLocked()631 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
632 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
633 gpr_log(GPR_INFO,
634 "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
635 priority_policy_.get(), name_.c_str(), this,
636 priority_policy_->child_failover_timeout_ms_);
637 }
638 Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
639 grpc_timer_init(
640 &failover_timer_,
641 ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
642 &on_failover_timer_);
643 failover_timer_callback_pending_ = true;
644 }
645
MaybeCancelFailoverTimerLocked()646 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
647 if (failover_timer_callback_pending_) {
648 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
649 gpr_log(GPR_INFO,
650 "[priority_lb %p] child %s (%p): cancelling failover timer",
651 priority_policy_.get(), name_.c_str(), this);
652 }
653 grpc_timer_cancel(&failover_timer_);
654 failover_timer_callback_pending_ = false;
655 }
656 }
657
OnFailoverTimer(void * arg,grpc_error_handle error)658 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg,
659 grpc_error_handle error) {
660 ChildPriority* self = static_cast<ChildPriority*>(arg);
661 GRPC_ERROR_REF(error); // ref owned by lambda
662 self->priority_policy_->work_serializer()->Run(
663 [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
664 }
665
OnFailoverTimerLocked(grpc_error_handle error)666 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error_handle error) {
667 if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
668 !priority_policy_->shutting_down_) {
669 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
670 gpr_log(GPR_INFO,
671 "[priority_lb %p] child %s (%p): failover timer fired, "
672 "reporting TRANSIENT_FAILURE",
673 priority_policy_.get(), name_.c_str(), this);
674 }
675 failover_timer_callback_pending_ = false;
676 OnConnectivityStateUpdateLocked(
677 GRPC_CHANNEL_TRANSIENT_FAILURE,
678 absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
679 nullptr);
680 }
681 Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
682 GRPC_ERROR_UNREF(error);
683 }
684
DeactivateLocked()685 void PriorityLb::ChildPriority::DeactivateLocked() {
686 // If already deactivated, don't do it again.
687 if (deactivation_timer_callback_pending_) return;
688 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
689 gpr_log(GPR_INFO,
690 "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
691 "ms.",
692 priority_policy_.get(), name_.c_str(), this,
693 kChildRetentionIntervalMs);
694 }
695 MaybeCancelFailoverTimerLocked();
696 // Start a timer to delete the child.
697 Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
698 grpc_timer_init(&deactivation_timer_,
699 ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
700 &on_deactivation_timer_);
701 deactivation_timer_callback_pending_ = true;
702 }
703
MaybeReactivateLocked()704 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
705 if (deactivation_timer_callback_pending_) {
706 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
707 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
708 priority_policy_.get(), name_.c_str(), this);
709 }
710 deactivation_timer_callback_pending_ = false;
711 grpc_timer_cancel(&deactivation_timer_);
712 }
713 }
714
OnDeactivationTimer(void * arg,grpc_error_handle error)715 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
716 grpc_error_handle error) {
717 ChildPriority* self = static_cast<ChildPriority*>(arg);
718 GRPC_ERROR_REF(error); // ref owned by lambda
719 self->priority_policy_->work_serializer()->Run(
720 [self, error]() { self->OnDeactivationTimerLocked(error); },
721 DEBUG_LOCATION);
722 }
723
OnDeactivationTimerLocked(grpc_error_handle error)724 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(
725 grpc_error_handle error) {
726 if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
727 !priority_policy_->shutting_down_) {
728 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
729 gpr_log(GPR_INFO,
730 "[priority_lb %p] child %s (%p): deactivation timer fired, "
731 "deleting child",
732 priority_policy_.get(), name_.c_str(), this);
733 }
734 deactivation_timer_callback_pending_ = false;
735 priority_policy_->DeleteChild(this);
736 }
737 Unref(DEBUG_LOCATION, "ChildPriority+timer");
738 GRPC_ERROR_UNREF(error);
739 }
740
741 //
742 // PriorityLb::ChildPriority::Helper
743 //
744
RequestReresolution()745 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
746 if (priority_->priority_policy_->shutting_down_) return;
747 if (priority_->ignore_reresolution_requests_) {
748 return;
749 }
750 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
751 }
752
753 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)754 PriorityLb::ChildPriority::Helper::CreateSubchannel(
755 ServerAddress address, const grpc_channel_args& args) {
756 if (priority_->priority_policy_->shutting_down_) return nullptr;
757 return priority_->priority_policy_->channel_control_helper()
758 ->CreateSubchannel(std::move(address), args);
759 }
760
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)761 void PriorityLb::ChildPriority::Helper::UpdateState(
762 grpc_connectivity_state state, const absl::Status& status,
763 std::unique_ptr<SubchannelPicker> picker) {
764 if (priority_->priority_policy_->shutting_down_) return;
765 // Notify the priority.
766 priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
767 }
768
AddTraceEvent(TraceSeverity severity,absl::string_view message)769 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
770 TraceSeverity severity, absl::string_view message) {
771 if (priority_->priority_policy_->shutting_down_) return;
772 priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
773 message);
774 }
775
776 //
777 // factory
778 //
779
780 class PriorityLbFactory : public LoadBalancingPolicyFactory {
781 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const782 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
783 LoadBalancingPolicy::Args args) const override {
784 return MakeOrphanable<PriorityLb>(std::move(args));
785 }
786
name() const787 const char* name() const override { return kPriority; }
788
ParseLoadBalancingConfig(const Json & json,grpc_error_handle * error) const789 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
790 const Json& json, grpc_error_handle* error) const override {
791 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
792 if (json.type() == Json::Type::JSON_NULL) {
793 // priority was mentioned as a policy in the deprecated
794 // loadBalancingPolicy field or in the client API.
795 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
796 "field:loadBalancingPolicy error:priority policy requires "
797 "configuration. Please use loadBalancingConfig field of service "
798 "config instead.");
799 return nullptr;
800 }
801 std::vector<grpc_error_handle> error_list;
802 // Children.
803 std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
804 auto it = json.object_value().find("children");
805 if (it == json.object_value().end()) {
806 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
807 "field:children error:required field missing"));
808 } else if (it->second.type() != Json::Type::OBJECT) {
809 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
810 "field:children error:type should be object"));
811 } else {
812 const Json::Object& object = it->second.object_value();
813 for (const auto& p : object) {
814 const std::string& child_name = p.first;
815 const Json& element = p.second;
816 if (element.type() != Json::Type::OBJECT) {
817 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
818 absl::StrCat("field:children key:", child_name,
819 " error:should be type object")
820 .c_str()));
821 } else {
822 auto it2 = element.object_value().find("config");
823 if (it2 == element.object_value().end()) {
824 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
825 absl::StrCat("field:children key:", child_name,
826 " error:missing 'config' field")
827 .c_str()));
828 } else {
829 grpc_error_handle parse_error = GRPC_ERROR_NONE;
830 auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
831 it2->second, &parse_error);
832 bool ignore_resolution_requests = false;
833 // If present, ignore_reresolution_requests must be of type
834 // boolean.
835 auto it3 =
836 element.object_value().find("ignore_reresolution_requests");
837 if (it3 != element.object_value().end()) {
838 if (it3->second.type() == Json::Type::JSON_TRUE) {
839 ignore_resolution_requests = true;
840 } else if (it3->second.type() != Json::Type::JSON_FALSE) {
841 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
842 absl::StrCat("field:children key:", child_name,
843 " field:ignore_reresolution_requests:should "
844 "be type boolean")
845 .c_str()));
846 }
847 }
848 if (config == nullptr) {
849 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
850 error_list.push_back(
851 GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
852 absl::StrCat("field:children key:", child_name).c_str(),
853 &parse_error, 1));
854 GRPC_ERROR_UNREF(parse_error);
855 }
856 children[child_name].config = std::move(config);
857 children[child_name].ignore_reresolution_requests =
858 ignore_resolution_requests;
859 }
860 }
861 }
862 }
863 // Priorities.
864 std::vector<std::string> priorities;
865 it = json.object_value().find("priorities");
866 if (it == json.object_value().end()) {
867 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
868 "field:priorities error:required field missing"));
869 } else if (it->second.type() != Json::Type::ARRAY) {
870 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
871 "field:priorities error:type should be array"));
872 } else {
873 const Json::Array& array = it->second.array_value();
874 for (size_t i = 0; i < array.size(); ++i) {
875 const Json& element = array[i];
876 if (element.type() != Json::Type::STRING) {
877 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
878 absl::StrCat("field:priorities element:", i,
879 " error:should be type string")
880 .c_str()));
881 } else if (children.find(element.string_value()) == children.end()) {
882 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
883 absl::StrCat("field:priorities element:", i,
884 " error:unknown child '", element.string_value(),
885 "'")
886 .c_str()));
887 } else {
888 priorities.emplace_back(element.string_value());
889 }
890 }
891 if (priorities.size() != children.size()) {
892 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
893 absl::StrCat("field:priorities error:priorities size (",
894 priorities.size(), ") != children size (",
895 children.size(), ")")
896 .c_str()));
897 }
898 }
899 if (error_list.empty()) {
900 return MakeRefCounted<PriorityLbConfig>(std::move(children),
901 std::move(priorities));
902 } else {
903 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
904 "priority_experimental LB policy config", &error_list);
905 return nullptr;
906 }
907 }
908 };
909
910 } // namespace
911
912 } // namespace grpc_core
913
914 //
915 // Plugin registration
916 //
917
grpc_lb_policy_priority_init()918 void grpc_lb_policy_priority_init() {
919 grpc_core::LoadBalancingPolicyRegistry::Builder::
920 RegisterLoadBalancingPolicyFactory(
921 absl::make_unique<grpc_core::PriorityLbFactory>());
922 }
923
// Plugin shutdown hook for the priority LB policy.  Intentionally empty.
void grpc_lb_policy_priority_shutdown() {
  // Nothing is done here.
}
925