1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/connectivity_state.h>
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 #include <stddef.h>
24
25 #include <algorithm>
26 #include <atomic>
27 #include <cmath>
28 #include <map>
29 #include <memory>
30 #include <set>
31 #include <string>
32 #include <type_traits>
33 #include <utility>
34 #include <vector>
35
36 #include "absl/base/thread_annotations.h"
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "absl/meta/type_traits.h"
40 #include "absl/random/random.h"
41 #include "absl/status/status.h"
42 #include "absl/status/statusor.h"
43 #include "absl/strings/string_view.h"
44 #include "absl/types/variant.h"
45 #include "src/core/client_channel/subchannel_interface_internal.h"
46 #include "src/core/config/core_configuration.h"
47 #include "src/core/lib/address_utils/sockaddr_utils.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/experiments/experiments.h"
51 #include "src/core/lib/iomgr/exec_ctx.h"
52 #include "src/core/lib/iomgr/iomgr_fwd.h"
53 #include "src/core/lib/iomgr/pollset_set.h"
54 #include "src/core/lib/iomgr/resolved_address.h"
55 #include "src/core/lib/transport/connectivity_state.h"
56 #include "src/core/load_balancing/child_policy_handler.h"
57 #include "src/core/load_balancing/delegating_helper.h"
58 #include "src/core/load_balancing/health_check_client_internal.h"
59 #include "src/core/load_balancing/lb_policy.h"
60 #include "src/core/load_balancing/lb_policy_factory.h"
61 #include "src/core/load_balancing/lb_policy_registry.h"
62 #include "src/core/load_balancing/subchannel_interface.h"
63 #include "src/core/resolver/endpoint_addresses.h"
64 #include "src/core/util/debug_location.h"
65 #include "src/core/util/json/json.h"
66 #include "src/core/util/orphanable.h"
67 #include "src/core/util/ref_counted.h"
68 #include "src/core/util/ref_counted_ptr.h"
69 #include "src/core/util/sync.h"
70 #include "src/core/util/unique_type_name.h"
71 #include "src/core/util/validation_errors.h"
72 #include "src/core/util/work_serializer.h"
73
74 namespace grpc_core {
75
76 namespace {
77
78 using ::grpc_event_engine::experimental::EventEngine;
79
// Policy name returned by name() on both the LB policy and its config.
constexpr absl::string_view kOutlierDetection =
    "outlier_detection_experimental";
82
// Config for the outlier detection LB policy.
84 class OutlierDetectionLbConfig final : public LoadBalancingPolicy::Config {
85 public:
OutlierDetectionLbConfig(OutlierDetectionConfig outlier_detection_config,RefCountedPtr<LoadBalancingPolicy::Config> child_policy)86 OutlierDetectionLbConfig(
87 OutlierDetectionConfig outlier_detection_config,
88 RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
89 : outlier_detection_config_(outlier_detection_config),
90 child_policy_(std::move(child_policy)) {}
91
name() const92 absl::string_view name() const override { return kOutlierDetection; }
93
CountingEnabled() const94 bool CountingEnabled() const {
95 return outlier_detection_config_.success_rate_ejection.has_value() ||
96 outlier_detection_config_.failure_percentage_ejection.has_value();
97 }
98
outlier_detection_config() const99 const OutlierDetectionConfig& outlier_detection_config() const {
100 return outlier_detection_config_;
101 }
102
child_policy() const103 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
104 return child_policy_;
105 }
106
107 private:
108 OutlierDetectionConfig outlier_detection_config_;
109 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
110 };
111
// Outlier detection LB policy.
113 class OutlierDetectionLb final : public LoadBalancingPolicy {
114 public:
115 explicit OutlierDetectionLb(Args args);
116
name() const117 absl::string_view name() const override { return kOutlierDetection; }
118
119 absl::Status UpdateLocked(UpdateArgs args) override;
120 void ExitIdleLocked() override;
121 void ResetBackoffLocked() override;
122
123 private:
124 class SubchannelState;
125 class EndpointState;
126
127 class SubchannelWrapper final : public DelegatingSubchannel {
128 public:
SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,RefCountedPtr<SubchannelState> subchannel_state,RefCountedPtr<SubchannelInterface> subchannel)129 SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,
130 RefCountedPtr<SubchannelState> subchannel_state,
131 RefCountedPtr<SubchannelInterface> subchannel)
132 : DelegatingSubchannel(std::move(subchannel)),
133 work_serializer_(std::move(work_serializer)),
134 subchannel_state_(std::move(subchannel_state)) {
135 if (subchannel_state_ != nullptr) {
136 subchannel_state_->AddSubchannel(this);
137 if (subchannel_state_->endpoint_state()->ejection_time().has_value()) {
138 ejected_ = true;
139 }
140 }
141 }
142
143 void Eject();
144
145 void Uneject();
146
147 void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override;
148
149 void CancelDataWatcher(DataWatcherInterface* watcher) override;
150
endpoint_state() const151 RefCountedPtr<EndpointState> endpoint_state() const {
152 if (subchannel_state_ == nullptr) return nullptr;
153 return subchannel_state_->endpoint_state();
154 }
155
156 private:
157 class WatcherWrapper final
158 : public SubchannelInterface::ConnectivityStateWatcherInterface {
159 public:
WatcherWrapper(WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper,std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> health_watcher,bool ejected)160 WatcherWrapper(WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper,
161 std::shared_ptr<
162 SubchannelInterface::ConnectivityStateWatcherInterface>
163 health_watcher,
164 bool ejected)
165 : subchannel_wrapper_(std::move(subchannel_wrapper)),
166 watcher_(std::move(health_watcher)),
167 ejected_(ejected) {}
168
Eject()169 void Eject() {
170 ejected_ = true;
171 if (last_seen_state_.has_value()) {
172 watcher_->OnConnectivityStateChange(
173 GRPC_CHANNEL_TRANSIENT_FAILURE,
174 absl::UnavailableError(
175 absl::StrCat(subchannel_wrapper_->address(),
176 ": subchannel ejected by outlier detection")));
177 }
178 }
179
Uneject()180 void Uneject() {
181 ejected_ = false;
182 if (last_seen_state_.has_value()) {
183 watcher_->OnConnectivityStateChange(*last_seen_state_,
184 last_seen_status_);
185 }
186 }
187
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)188 void OnConnectivityStateChange(grpc_connectivity_state new_state,
189 absl::Status status) override {
190 const bool send_update = !last_seen_state_.has_value() || !ejected_;
191 last_seen_state_ = new_state;
192 last_seen_status_ = status;
193 if (send_update) {
194 if (ejected_) {
195 new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
196 status = absl::UnavailableError(
197 absl::StrCat(subchannel_wrapper_->address(),
198 ": subchannel ejected by outlier detection"));
199 }
200 watcher_->OnConnectivityStateChange(new_state, status);
201 }
202 }
203
interested_parties()204 grpc_pollset_set* interested_parties() override {
205 return watcher_->interested_parties();
206 }
207
208 private:
209 WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper_;
210 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
211 watcher_;
212 absl::optional<grpc_connectivity_state> last_seen_state_;
213 absl::Status last_seen_status_;
214 bool ejected_;
215 };
216
Orphaned()217 void Orphaned() override {
218 if (!IsWorkSerializerDispatchEnabled()) {
219 if (subchannel_state_ != nullptr) {
220 subchannel_state_->RemoveSubchannel(this);
221 }
222 return;
223 }
224 work_serializer_->Run(
225 [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
226 if (self->subchannel_state_ != nullptr) {
227 self->subchannel_state_->RemoveSubchannel(self.get());
228 }
229 },
230 DEBUG_LOCATION);
231 }
232
233 std::shared_ptr<WorkSerializer> work_serializer_;
234 RefCountedPtr<SubchannelState> subchannel_state_;
235 bool ejected_ = false;
236 WatcherWrapper* watcher_wrapper_ = nullptr;
237 };
238
239 class SubchannelState final : public RefCounted<SubchannelState> {
240 public:
AddSubchannel(SubchannelWrapper * wrapper)241 void AddSubchannel(SubchannelWrapper* wrapper) {
242 subchannels_.insert(wrapper);
243 }
244
RemoveSubchannel(SubchannelWrapper * wrapper)245 void RemoveSubchannel(SubchannelWrapper* wrapper) {
246 subchannels_.erase(wrapper);
247 }
248
endpoint_state()249 RefCountedPtr<EndpointState> endpoint_state() {
250 MutexLock lock(&mu_);
251 return endpoint_state_;
252 }
253
set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state)254 void set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state) {
255 MutexLock lock(&mu_);
256 endpoint_state_ = std::move(endpoint_state);
257 }
258
Eject()259 void Eject() {
260 // Ejecting the subchannel may cause the child policy to unref the
261 // subchannel, so we need to be prepared for the set to be modified
262 // while we are iterating.
263 for (auto it = subchannels_.begin(); it != subchannels_.end();) {
264 SubchannelWrapper* subchannel = *it;
265 ++it;
266 subchannel->Eject();
267 }
268 }
269
Uneject()270 void Uneject() {
271 for (auto& subchannel : subchannels_) {
272 subchannel->Uneject();
273 }
274 }
275
276 private:
277 std::set<SubchannelWrapper*> subchannels_;
278 Mutex mu_;
279 RefCountedPtr<EndpointState> endpoint_state_ ABSL_GUARDED_BY(mu_);
280 };
281
282 class EndpointState final : public RefCounted<EndpointState> {
283 public:
EndpointState(std::set<SubchannelState * > subchannels)284 explicit EndpointState(std::set<SubchannelState*> subchannels)
285 : subchannels_(std::move(subchannels)) {
286 for (SubchannelState* subchannel : subchannels_) {
287 subchannel->set_endpoint_state(Ref());
288 }
289 }
290
RotateBucket()291 void RotateBucket() {
292 backup_bucket_->successes = 0;
293 backup_bucket_->failures = 0;
294 current_bucket_.swap(backup_bucket_);
295 active_bucket_.store(current_bucket_.get());
296 }
297
GetSuccessRateAndVolume()298 absl::optional<std::pair<double, uint64_t>> GetSuccessRateAndVolume() {
299 uint64_t total_request =
300 backup_bucket_->successes + backup_bucket_->failures;
301 if (total_request == 0) {
302 return absl::nullopt;
303 }
304 double success_rate =
305 backup_bucket_->successes * 100.0 /
306 (backup_bucket_->successes + backup_bucket_->failures);
307 return {
308 {success_rate, backup_bucket_->successes + backup_bucket_->failures}};
309 }
310
AddSuccessCount()311 void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); }
312
AddFailureCount()313 void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); }
314
ejection_time() const315 absl::optional<Timestamp> ejection_time() const { return ejection_time_; }
316
Eject(const Timestamp & time)317 void Eject(const Timestamp& time) {
318 ejection_time_ = time;
319 ++multiplier_;
320 for (SubchannelState* subchannel_state : subchannels_) {
321 subchannel_state->Eject();
322 }
323 }
324
Uneject()325 void Uneject() {
326 ejection_time_.reset();
327 for (SubchannelState* subchannel_state : subchannels_) {
328 subchannel_state->Uneject();
329 }
330 }
331
MaybeUneject(uint64_t base_ejection_time_in_millis,uint64_t max_ejection_time_in_millis)332 bool MaybeUneject(uint64_t base_ejection_time_in_millis,
333 uint64_t max_ejection_time_in_millis) {
334 if (!ejection_time_.has_value()) {
335 if (multiplier_ > 0) {
336 --multiplier_;
337 }
338 } else {
339 CHECK(ejection_time_.has_value());
340 auto change_time = ejection_time_.value() +
341 Duration::Milliseconds(std::min(
342 base_ejection_time_in_millis * multiplier_,
343 std::max(base_ejection_time_in_millis,
344 max_ejection_time_in_millis)));
345 if (change_time < Timestamp::Now()) {
346 Uneject();
347 return true;
348 }
349 }
350 return false;
351 }
352
DisableEjection()353 void DisableEjection() {
354 if (ejection_time_.has_value()) Uneject();
355 multiplier_ = 0;
356 }
357
358 private:
359 struct Bucket {
360 std::atomic<uint64_t> successes;
361 std::atomic<uint64_t> failures;
362 };
363
364 const std::set<SubchannelState*> subchannels_;
365
366 std::unique_ptr<Bucket> current_bucket_ = std::make_unique<Bucket>();
367 std::unique_ptr<Bucket> backup_bucket_ = std::make_unique<Bucket>();
368 // The bucket used to update call counts.
369 // Points to either current_bucket or active_bucket.
370 std::atomic<Bucket*> active_bucket_{current_bucket_.get()};
371 uint32_t multiplier_ = 0;
372 absl::optional<Timestamp> ejection_time_;
373 };
374
375 // A picker that wraps the picker from the child to perform outlier detection.
class Picker final : public SubchannelPicker {
 public:
  // outlier_detection_lb is only used for trace logging; picker is the
  // child policy's picker that Pick() delegates to; counting_enabled
  // controls whether picks get a call tracker attached.
  Picker(OutlierDetectionLb* outlier_detection_lb,
         RefCountedPtr<SubchannelPicker> picker, bool counting_enabled);

  PickResult Pick(PickArgs args) override;

 private:
  // Wraps the pick's original call tracker and records call outcomes in
  // the endpoint's counters.
  class SubchannelCallTracker;
  RefCountedPtr<SubchannelPicker> picker_;
  bool counting_enabled_;
};
388
// Channel control helper given to the child policy.  It overrides
// subchannel creation (to hand out SubchannelWrappers) and state updates
// (to wrap the child's picker); the base class holds the ref to the parent
// policy passed in here.
class Helper final
    : public ParentOwningDelegatingChannelControlHelper<OutlierDetectionLb> {
 public:
  explicit Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)
      : ParentOwningDelegatingChannelControlHelper(
            std::move(outlier_detection_policy)) {}

  // Returns a SubchannelWrapper around the subchannel created by the
  // parent's helper, or null if the policy is shutting down.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      const grpc_resolved_address& address,
      const ChannelArgs& per_address_args, const ChannelArgs& args) override;
  // Stores the child's state and picker on the parent policy and pushes a
  // wrapped picker up to the channel.
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<SubchannelPicker> picker) override;
};
402
// One-shot timer that runs the ejection pass.  The constructor schedules
// the timer on the event engine; Orphan() cancels it.
class EjectionTimer final : public InternallyRefCounted<EjectionTimer> {
 public:
  EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,
                Timestamp start_time);

  // Cancels the pending timer, if any, and drops a ref.
  void Orphan() override;

  // The start time this timer was constructed with.
  Timestamp StartTime() const { return start_time_; }

 private:
  // Timer callback; scheduled on the parent's work serializer.
  void OnTimerLocked();

  RefCountedPtr<OutlierDetectionLb> parent_;
  // Set while the timer is pending; reset when it fires or is cancelled.
  absl::optional<EventEngine::TaskHandle> timer_handle_;
  Timestamp start_time_;
  absl::BitGen bit_gen_;
};
420
421 ~OutlierDetectionLb() override;
422
423 void ShutdownLocked() override;
424
425 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
426 const ChannelArgs& args);
427
428 void MaybeUpdatePickerLocked();
429
430 // Current config from the resolver.
431 RefCountedPtr<OutlierDetectionLbConfig> config_;
432
433 // Internal state.
434 bool shutting_down_ = false;
435
436 OrphanablePtr<LoadBalancingPolicy> child_policy_;
437
438 // Latest state and picker reported by the child policy.
439 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
440 absl::Status status_;
441 RefCountedPtr<SubchannelPicker> picker_;
442 std::map<EndpointAddressSet, RefCountedPtr<EndpointState>>
443 endpoint_state_map_;
444 std::map<grpc_resolved_address, RefCountedPtr<SubchannelState>,
445 ResolvedAddressLessThan>
446 subchannel_state_map_;
447 OrphanablePtr<EjectionTimer> ejection_timer_;
448 };
449
450 //
451 // OutlierDetectionLb::SubchannelWrapper
452 //
453
Eject()454 void OutlierDetectionLb::SubchannelWrapper::Eject() {
455 ejected_ = true;
456 if (watcher_wrapper_ != nullptr) watcher_wrapper_->Eject();
457 }
458
Uneject()459 void OutlierDetectionLb::SubchannelWrapper::Uneject() {
460 ejected_ = false;
461 if (watcher_wrapper_ != nullptr) watcher_wrapper_->Uneject();
462 }
463
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)464 void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
465 std::unique_ptr<DataWatcherInterface> watcher) {
466 auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
467 if (w->type() == HealthProducer::Type()) {
468 auto* health_watcher = static_cast<HealthWatcher*>(watcher.get());
469 auto watcher_wrapper = std::make_shared<WatcherWrapper>(
470 WeakRefAsSubclass<SubchannelWrapper>(), health_watcher->TakeWatcher(),
471 ejected_);
472 watcher_wrapper_ = watcher_wrapper.get();
473 health_watcher->SetWatcher(std::move(watcher_wrapper));
474 }
475 DelegatingSubchannel::AddDataWatcher(std::move(watcher));
476 }
477
CancelDataWatcher(DataWatcherInterface * watcher)478 void OutlierDetectionLb::SubchannelWrapper::CancelDataWatcher(
479 DataWatcherInterface* watcher) {
480 auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
481 if (w->type() == HealthProducer::Type()) watcher_wrapper_ = nullptr;
482 DelegatingSubchannel::CancelDataWatcher(watcher);
483 }
484
485 //
486 // OutlierDetectionLb::Picker::SubchannelCallTracker
487 //
488
489 class OutlierDetectionLb::Picker::SubchannelCallTracker final
490 : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
491 public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<EndpointState> endpoint_state)492 SubchannelCallTracker(
493 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
494 original_subchannel_call_tracker,
495 RefCountedPtr<EndpointState> endpoint_state)
496 : original_subchannel_call_tracker_(
497 std::move(original_subchannel_call_tracker)),
498 endpoint_state_(std::move(endpoint_state)) {}
499
~SubchannelCallTracker()500 ~SubchannelCallTracker() override {
501 endpoint_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
502 }
503
Start()504 void Start() override {
505 // This tracker does not care about started calls only finished calls.
506 // Delegate if needed.
507 if (original_subchannel_call_tracker_ != nullptr) {
508 original_subchannel_call_tracker_->Start();
509 }
510 }
511
Finish(FinishArgs args)512 void Finish(FinishArgs args) override {
513 // Delegate if needed.
514 if (original_subchannel_call_tracker_ != nullptr) {
515 original_subchannel_call_tracker_->Finish(args);
516 }
517 // Record call completion based on status for outlier detection
518 // calculations.
519 if (args.status.ok()) {
520 endpoint_state_->AddSuccessCount();
521 } else {
522 endpoint_state_->AddFailureCount();
523 }
524 }
525
526 private:
527 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
528 original_subchannel_call_tracker_;
529 RefCountedPtr<EndpointState> endpoint_state_;
530 };
531
532 //
533 // OutlierDetectionLb::Picker
534 //
535
Picker(OutlierDetectionLb * outlier_detection_lb,RefCountedPtr<SubchannelPicker> picker,bool counting_enabled)536 OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
537 RefCountedPtr<SubchannelPicker> picker,
538 bool counting_enabled)
539 : picker_(std::move(picker)), counting_enabled_(counting_enabled) {
540 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
541 << "[outlier_detection_lb " << outlier_detection_lb
542 << "] constructed new picker " << this << " and counting " << "is "
543 << (counting_enabled ? "enabled" : "disabled");
544 }
545
Pick(LoadBalancingPolicy::PickArgs args)546 LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
547 LoadBalancingPolicy::PickArgs args) {
548 if (picker_ == nullptr) { // Should never happen.
549 return PickResult::Fail(absl::InternalError(
550 "outlier_detection picker not given any child picker"));
551 }
552 // Delegate to child picker
553 PickResult result = picker_->Pick(args);
554 auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
555 if (complete_pick != nullptr) {
556 auto* subchannel_wrapper =
557 static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
558 // Inject subchannel call tracker to record call completion as long as
559 // either success_rate_ejection or failure_percentage_ejection is enabled.
560 if (counting_enabled_) {
561 auto endpoint_state = subchannel_wrapper->endpoint_state();
562 if (endpoint_state != nullptr) {
563 complete_pick->subchannel_call_tracker =
564 std::make_unique<SubchannelCallTracker>(
565 std::move(complete_pick->subchannel_call_tracker),
566 std::move(endpoint_state));
567 }
568 }
569 // Unwrap subchannel to pass back up the stack.
570 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
571 }
572 return result;
573 }
574
575 //
576 // OutlierDetectionLb
577 //
578
// Forwards the args to the LoadBalancingPolicy base class and emits a trace
// message; all other members keep their in-class defaults.
OutlierDetectionLb::OutlierDetectionLb(Args args)
    : LoadBalancingPolicy(std::move(args)) {
  GRPC_TRACE_LOG(outlier_detection_lb, INFO)
      << "[outlier_detection_lb " << this << "] created";
}
584
// Only emits a trace message; the members release their resources in their
// own destructors.
OutlierDetectionLb::~OutlierDetectionLb() {
  GRPC_TRACE_LOG(outlier_detection_lb, INFO)
      << "[outlier_detection_lb " << this
      << "] destroying outlier_detection LB policy";
}
590
// Shuts the policy down: cancels the ejection timer, marks the policy as
// shutting down (which makes the Helper ignore further child callbacks),
// and releases the child policy and its picker.
void OutlierDetectionLb::ShutdownLocked() {
  GRPC_TRACE_LOG(outlier_detection_lb, INFO)
      << "[outlier_detection_lb " << this << "] shutting down";
  ejection_timer_.reset();
  shutting_down_ = true;
  // Remove the child policy's interested_parties pollset_set from this
  // policy's interested_parties.
  if (child_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     interested_parties());
    child_policy_.reset();
  }
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_.reset();
}
607
ExitIdleLocked()608 void OutlierDetectionLb::ExitIdleLocked() {
609 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
610 }
611
ResetBackoffLocked()612 void OutlierDetectionLb::ResetBackoffLocked() {
613 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
614 }
615
// Applies a new config and address list.  In order:
//   1. stores the new config and starts, replaces or cancels the ejection
//      timer accordingly;
//   2. if the address list is usable, reconciles endpoint_state_map_ and
//      subchannel_state_map_ with it (adding new entries, dropping stale
//      ones, and disabling ejection for existing endpoints when counting is
//      off);
//   3. creates the child policy if it does not exist yet and forwards the
//      update to it.
// Returns the status of the child policy's UpdateLocked().
absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
  GRPC_TRACE_LOG(outlier_detection_lb, INFO)
      << "[outlier_detection_lb " << this << "] Received update";
  // Keep the previous config around so the interval can be compared below.
  auto old_config = std::move(config_);
  // Update config.
  config_ = args.config.TakeAsSubclass<OutlierDetectionLbConfig>();
  // Update outlier detection timer.
  if (!config_->CountingEnabled()) {
    // No need for timer.  Cancel the current timer, if any.
    GRPC_TRACE_LOG(outlier_detection_lb, INFO)
        << "[outlier_detection_lb " << this
        << "] counting disabled, cancelling timer";
    ejection_timer_.reset();
  } else if (ejection_timer_ == nullptr) {
    // No timer running.  Start it now.
    GRPC_TRACE_LOG(outlier_detection_lb, INFO)
        << "[outlier_detection_lb " << this << "] starting timer";
    ejection_timer_ = MakeOrphanable<EjectionTimer>(
        RefAsSubclass<OutlierDetectionLb>(), Timestamp::Now());
    for (const auto& p : endpoint_state_map_) {
      p.second->RotateBucket();  // Reset call counters.
    }
  } else if (old_config->outlier_detection_config().interval !=
             config_->outlier_detection_config().interval) {
    // old_config is dereferenced without a null check here.
    // NOTE(review): presumably a timer can only exist once a config has been
    // stored, which would make this safe -- confirm.
    //
    // Timer interval changed.  Cancel the current timer and start a new one
    // with the same start time.
    // Note that if the new deadline is in the past, the timer will fire
    // immediately.
    // NOTE(review): the EjectionTimer constructor in this file always
    // schedules RunAfter(interval) and does not use start_time to compute
    // the delay, so the "fires immediately" remark above looks stale --
    // confirm.
    GRPC_TRACE_LOG(outlier_detection_lb, INFO)
        << "[outlier_detection_lb " << this
        << "] interval changed, replacing timer";
    ejection_timer_ = MakeOrphanable<EjectionTimer>(
        RefAsSubclass<OutlierDetectionLb>(), ejection_timer_->StartTime());
  }
  // Update subchannel and endpoint maps.
  if (args.addresses.ok()) {
    // Keys seen in this update; anything in the maps that is not in these
    // sets afterwards is stale.
    std::set<EndpointAddressSet> current_endpoints;
    std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
    (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
      EndpointAddressSet key(endpoint.addresses());
      current_endpoints.emplace(key);
      for (const grpc_resolved_address& address : endpoint.addresses()) {
        current_addresses.emplace(address);
      }
      // Find the entry in the endpoint map.
      auto it = endpoint_state_map_.find(key);
      if (it == endpoint_state_map_.end()) {
        GRPC_TRACE_LOG(outlier_detection_lb, INFO)
            << "[outlier_detection_lb " << this
            << "] adding endpoint entry for " << key.ToString();
        // The endpoint is not present in the map, so we'll need to add it.
        // Start by getting a pointer to the entry for each address in the
        // subchannel map, creating the entry if needed.
        std::set<SubchannelState*> subchannels;
        for (const grpc_resolved_address& address : endpoint.addresses()) {
          auto it2 = subchannel_state_map_.find(address);
          if (it2 == subchannel_state_map_.end()) {
            if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
              std::string address_str = grpc_sockaddr_to_string(&address, false)
                                            .value_or("<unknown>");
              LOG(INFO) << "[outlier_detection_lb " << this
                        << "] adding address entry for " << address_str;
            }
            it2 = subchannel_state_map_
                      .emplace(address, MakeRefCounted<SubchannelState>())
                      .first;
          }
          subchannels.insert(it2->second.get());
        }
        // Now create the endpoint.  Its constructor points each of the
        // collected SubchannelState objects at the new EndpointState.
        endpoint_state_map_.emplace(
            key, MakeRefCounted<EndpointState>(std::move(subchannels)));
      } else if (!config_->CountingEnabled()) {
        // If counting is not enabled, reset state.
        GRPC_TRACE_LOG(outlier_detection_lb, INFO)
            << "[outlier_detection_lb " << this
            << "] counting disabled; disabling ejection for " << key.ToString();
        it->second->DisableEjection();
      }
    });
    // Remove any entries we no longer need in the subchannel map.
    for (auto it = subchannel_state_map_.begin();
         it != subchannel_state_map_.end();) {
      if (current_addresses.find(it->first) == current_addresses.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
          std::string address_str =
              grpc_sockaddr_to_string(&it->first, false).value_or("<unknown>");
          LOG(INFO) << "[outlier_detection_lb " << this
                    << "] removing subchannel map entry " << address_str;
        }
        // Don't hold a ref to the corresponding EndpointState object,
        // because there could be subchannel wrappers keeping this alive
        // for a while, and we don't need them to do any call tracking.
        it->second->set_endpoint_state(nullptr);
        it = subchannel_state_map_.erase(it);
      } else {
        ++it;
      }
    }
    // Remove any entries we no longer need in the endpoint map.
    for (auto it = endpoint_state_map_.begin();
         it != endpoint_state_map_.end();) {
      if (current_endpoints.find(it->first) == current_endpoints.end()) {
        GRPC_TRACE_LOG(outlier_detection_lb, INFO)
            << "[outlier_detection_lb " << this
            << "] removing endpoint map entry " << it->first.ToString();
        it = endpoint_state_map_.erase(it);
      } else {
        ++it;
      }
    }
  }
  // Create child policy if needed.
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(args.args);
  }
  // Update child policy.  Everything is passed through unchanged except the
  // config, which is replaced by the child's own config.
  UpdateArgs update_args;
  update_args.addresses = std::move(args.addresses);
  update_args.resolution_note = std::move(args.resolution_note);
  update_args.config = config_->child_policy();
  update_args.args = std::move(args.args);
  GRPC_TRACE_LOG(outlier_detection_lb, INFO)
      << "[outlier_detection_lb " << this << "] Updating child policy handler "
      << child_policy_.get();
  return child_policy_->UpdateLocked(std::move(update_args));
}
743
MaybeUpdatePickerLocked()744 void OutlierDetectionLb::MaybeUpdatePickerLocked() {
745 if (picker_ != nullptr) {
746 auto outlier_detection_picker =
747 MakeRefCounted<Picker>(this, picker_, config_->CountingEnabled());
748 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
749 << "[outlier_detection_lb " << this
750 << "] updating connectivity: state=" << ConnectivityStateName(state_)
751 << " status=(" << status_
752 << ") picker=" << outlier_detection_picker.get();
753 channel_control_helper()->UpdateState(state_, status_,
754 std::move(outlier_detection_picker));
755 }
756 }
757
CreateChildPolicyLocked(const ChannelArgs & args)758 OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
759 const ChannelArgs& args) {
760 LoadBalancingPolicy::Args lb_policy_args;
761 lb_policy_args.work_serializer = work_serializer();
762 lb_policy_args.args = args;
763 lb_policy_args.channel_control_helper = std::make_unique<Helper>(
764 RefAsSubclass<OutlierDetectionLb>(DEBUG_LOCATION, "Helper"));
765 OrphanablePtr<LoadBalancingPolicy> lb_policy =
766 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
767 &outlier_detection_lb_trace);
768 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
769 << "[outlier_detection_lb " << this
770 << "] Created new child policy handler " << lb_policy.get();
771 // Add our interested_parties pollset_set to that of the newly created
772 // child policy. This will make the child policy progress upon activity on
773 // this policy, which in turn is tied to the application's call.
774 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
775 interested_parties());
776 return lb_policy;
777 }
778
779 //
780 // OutlierDetectionLb::Helper
781 //
782
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)783 RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
784 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
785 const ChannelArgs& args) {
786 if (parent()->shutting_down_) return nullptr;
787 RefCountedPtr<SubchannelState> subchannel_state;
788 auto it = parent()->subchannel_state_map_.find(address);
789 if (it != parent()->subchannel_state_map_.end()) {
790 subchannel_state = it->second->Ref();
791 }
792 if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
793 std::string address_str =
794 grpc_sockaddr_to_string(&address, false).value_or("<unknown>");
795 LOG(INFO) << "[outlier_detection_lb " << parent()
796 << "] creating subchannel for " << address_str
797 << ", subchannel state " << subchannel_state.get();
798 }
799 auto subchannel = MakeRefCounted<SubchannelWrapper>(
800 parent()->work_serializer(), subchannel_state,
801 parent()->channel_control_helper()->CreateSubchannel(
802 address, per_address_args, args));
803 if (subchannel_state != nullptr) {
804 subchannel_state->AddSubchannel(subchannel.get());
805 }
806 return subchannel;
807 }
808
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)809 void OutlierDetectionLb::Helper::UpdateState(
810 grpc_connectivity_state state, const absl::Status& status,
811 RefCountedPtr<SubchannelPicker> picker) {
812 if (parent()->shutting_down_) return;
813 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
814 << "[outlier_detection_lb " << parent()
815 << "] child connectivity state update: state="
816 << ConnectivityStateName(state) << " (" << status
817 << ") picker=" << picker.get();
818 // Save the state and picker.
819 parent()->state_ = state;
820 parent()->status_ = status;
821 parent()->picker_ = std::move(picker);
822 // Wrap the picker and return it to the channel.
823 parent()->MaybeUpdatePickerLocked();
824 }
825
826 //
827 // OutlierDetectionLb::EjectionTimer
828 //
829
// Schedules a one-shot event-engine timer that fires after the configured
// interval and then runs OnTimerLocked() on the parent's work serializer.
// start_time is only stored (see StartTime()); the delay passed to
// RunAfter() is always the full interval.
OutlierDetectionLb::EjectionTimer::EjectionTimer(
    RefCountedPtr<OutlierDetectionLb> parent, Timestamp start_time)
    : parent_(std::move(parent)), start_time_(start_time) {
  auto interval = parent_->config_->outlier_detection_config().interval;
  GRPC_TRACE_LOG(outlier_detection_lb, INFO)
      << "[outlier_detection_lb " << parent_.get()
      << "] ejection timer will run in " << interval.ToString();
  // The callback holds its own ref to this timer.
  timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
      interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        // Grab the raw pointer first: `self` is moved into the inner closure
        // in the same statement that uses it to reach the work serializer.
        auto self_ptr = self.get();
        self_ptr->parent_->work_serializer()->Run(
            [self = std::move(self)]() { self->OnTimerLocked(); },
            DEBUG_LOCATION);
      });
}
847
Orphan()848 void OutlierDetectionLb::EjectionTimer::Orphan() {
849 if (timer_handle_.has_value()) {
850 parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
851 timer_handle_.reset();
852 }
853 Unref();
854 }
855
OnTimerLocked()856 void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
857 if (!timer_handle_.has_value()) return;
858 timer_handle_.reset();
859 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
860 << "[outlier_detection_lb " << parent_.get()
861 << "] ejection timer running";
862 std::map<EndpointState*, double> success_rate_ejection_candidates;
863 std::map<EndpointState*, double> failure_percentage_ejection_candidates;
864 size_t ejected_host_count = 0;
865 double success_rate_sum = 0;
866 auto time_now = Timestamp::Now();
867 auto& config = parent_->config_->outlier_detection_config();
868 for (auto& state : parent_->endpoint_state_map_) {
869 auto* endpoint_state = state.second.get();
870 // For each address, swap the call counter's buckets in that address's
871 // map entry.
872 endpoint_state->RotateBucket();
873 // Gather data to run success rate algorithm or failure percentage
874 // algorithm.
875 if (endpoint_state->ejection_time().has_value()) {
876 ++ejected_host_count;
877 }
878 absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
879 endpoint_state->GetSuccessRateAndVolume();
880 if (!host_success_rate_and_volume.has_value()) {
881 continue;
882 }
883 double success_rate = host_success_rate_and_volume->first;
884 uint64_t request_volume = host_success_rate_and_volume->second;
885 if (config.success_rate_ejection.has_value()) {
886 if (request_volume >= config.success_rate_ejection->request_volume) {
887 success_rate_ejection_candidates[endpoint_state] = success_rate;
888 success_rate_sum += success_rate;
889 }
890 }
891 if (config.failure_percentage_ejection.has_value()) {
892 if (request_volume >=
893 config.failure_percentage_ejection->request_volume) {
894 failure_percentage_ejection_candidates[endpoint_state] = success_rate;
895 }
896 }
897 }
898 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
899 << "[outlier_detection_lb " << parent_.get() << "] found "
900 << success_rate_ejection_candidates.size()
901 << " success rate candidates and "
902 << failure_percentage_ejection_candidates.size()
903 << " failure percentage candidates; ejected_host_count="
904 << ejected_host_count
905 << "; success_rate_sum=" << absl::StrFormat("%.3f", success_rate_sum);
906 // success rate algorithm
907 if (!success_rate_ejection_candidates.empty() &&
908 success_rate_ejection_candidates.size() >=
909 config.success_rate_ejection->minimum_hosts) {
910 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
911 << "[outlier_detection_lb " << parent_.get()
912 << "] running success rate algorithm: " << "stdev_factor="
913 << config.success_rate_ejection->stdev_factor
914 << ", enforcement_percentage="
915 << config.success_rate_ejection->enforcement_percentage;
916 // calculate ejection threshold: (mean - stdev *
917 // (success_rate_ejection.stdev_factor / 1000))
918 double mean = success_rate_sum / success_rate_ejection_candidates.size();
919 double variance = 0;
920 for (const auto& p : success_rate_ejection_candidates) {
921 variance += std::pow(p.second - mean, 2);
922 }
923 variance /= success_rate_ejection_candidates.size();
924 double stdev = std::sqrt(variance);
925 const double success_rate_stdev_factor =
926 static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
927 double ejection_threshold = mean - (stdev * success_rate_stdev_factor);
928 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
929 << "[outlier_detection_lb " << parent_.get() << "] stdev=" << stdev
930 << ", ejection_threshold=" << ejection_threshold;
931 for (auto& candidate : success_rate_ejection_candidates) {
932 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
933 << "[outlier_detection_lb " << parent_.get()
934 << "] checking candidate " << candidate.first
935 << ": success_rate=" << candidate.second;
936 if (candidate.second < ejection_threshold) {
937 uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
938 double current_percent =
939 100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
940 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
941 << "[outlier_detection_lb " << parent_.get()
942 << "] random_key=" << random_key
943 << " ejected_host_count=" << ejected_host_count
944 << " current_percent=" << absl::StrFormat("%.3f", current_percent);
945 if (random_key < config.success_rate_ejection->enforcement_percentage &&
946 (ejected_host_count == 0 ||
947 (current_percent < config.max_ejection_percent))) {
948 // Eject and record the timestamp for use when ejecting addresses in
949 // this iteration.
950 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
951 << "[outlier_detection_lb " << parent_.get()
952 << "] ejecting candidate";
953 candidate.first->Eject(time_now);
954 ++ejected_host_count;
955 }
956 }
957 }
958 }
959 // failure percentage algorithm
960 if (!failure_percentage_ejection_candidates.empty() &&
961 failure_percentage_ejection_candidates.size() >=
962 config.failure_percentage_ejection->minimum_hosts) {
963 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
964 << "[outlier_detection_lb " << parent_.get()
965 << "] running failure percentage algorithm: " << "threshold="
966 << config.failure_percentage_ejection->threshold
967 << ", enforcement_percentage="
968 << config.failure_percentage_ejection->enforcement_percentage;
969 for (auto& candidate : failure_percentage_ejection_candidates) {
970 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
971 << "[outlier_detection_lb " << parent_.get()
972 << "] checking candidate " << candidate.first
973 << ": success_rate=" << candidate.second;
974 // Extra check to make sure success rate algorithm didn't already
975 // eject this backend.
976 if (candidate.first->ejection_time().has_value()) continue;
977 if ((100.0 - candidate.second) >
978 config.failure_percentage_ejection->threshold) {
979 uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
980 double current_percent =
981 100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
982 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
983 << "[outlier_detection_lb " << parent_.get()
984 << "] random_key=" << random_key
985 << " ejected_host_count=" << ejected_host_count
986 << " current_percent=" << current_percent;
987 if (random_key <
988 config.failure_percentage_ejection->enforcement_percentage &&
989 (ejected_host_count == 0 ||
990 (current_percent < config.max_ejection_percent))) {
991 // Eject and record the timestamp for use when ejecting addresses in
992 // this iteration.
993 GRPC_TRACE_LOG(outlier_detection_lb, INFO)
994 << "[outlier_detection_lb " << parent_.get()
995 << "] ejecting candidate";
996 candidate.first->Eject(time_now);
997 ++ejected_host_count;
998 }
999 }
1000 }
1001 }
1002 // For each address in the map:
1003 // If the address is not ejected and the multiplier is greater than 0,
1004 // decrease the multiplier by 1. If the address is ejected, and the
1005 // current time is after ejection_timestamp + min(base_ejection_time *
1006 // multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
1007 // address.
1008 for (auto& state : parent_->endpoint_state_map_) {
1009 auto* endpoint_state = state.second.get();
1010 const bool unejected = endpoint_state->MaybeUneject(
1011 config.base_ejection_time.millis(), config.max_ejection_time.millis());
1012 if (unejected && GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
1013 LOG(INFO) << "[outlier_detection_lb " << parent_.get()
1014 << "] unejected endpoint " << state.first.ToString() << " ("
1015 << endpoint_state << ")";
1016 }
1017 }
1018 parent_->ejection_timer_ =
1019 MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
1020 }
1021
1022 //
1023 // factory
1024 //
1025
1026 class OutlierDetectionLbFactory final : public LoadBalancingPolicyFactory {
1027 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1028 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1029 LoadBalancingPolicy::Args args) const override {
1030 return MakeOrphanable<OutlierDetectionLb>(std::move(args));
1031 }
1032
name() const1033 absl::string_view name() const override { return kOutlierDetection; }
1034
1035 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1036 ParseLoadBalancingConfig(const Json& json) const override {
1037 ValidationErrors errors;
1038 OutlierDetectionConfig outlier_detection_config;
1039 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1040 {
1041 outlier_detection_config =
1042 LoadFromJson<OutlierDetectionConfig>(json, JsonArgs(), &errors);
1043 // Parse childPolicy manually.
1044 {
1045 ValidationErrors::ScopedField field(&errors, ".childPolicy");
1046 auto it = json.object().find("childPolicy");
1047 if (it == json.object().end()) {
1048 errors.AddError("field not present");
1049 } else {
1050 auto child_policy_config = CoreConfiguration::Get()
1051 .lb_policy_registry()
1052 .ParseLoadBalancingConfig(it->second);
1053 if (!child_policy_config.ok()) {
1054 errors.AddError(child_policy_config.status().message());
1055 } else {
1056 child_policy = std::move(*child_policy_config);
1057 }
1058 }
1059 }
1060 }
1061 if (!errors.ok()) {
1062 return errors.status(
1063 absl::StatusCode::kInvalidArgument,
1064 "errors validating outlier_detection LB policy config");
1065 }
1066 return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
1067 std::move(child_policy));
1068 }
1069 };
1070
1071 } // namespace
1072
1073 //
1074 // OutlierDetectionConfig
1075 //
1076
1077 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1078 OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) {
1079 static const auto* loader =
1080 JsonObjectLoader<SuccessRateEjection>()
1081 .OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor)
1082 .OptionalField("enforcementPercentage",
1083 &SuccessRateEjection::enforcement_percentage)
1084 .OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts)
1085 .OptionalField("requestVolume", &SuccessRateEjection::request_volume)
1086 .Finish();
1087 return loader;
1088 }
1089
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1090 void OutlierDetectionConfig::SuccessRateEjection::JsonPostLoad(
1091 const Json&, const JsonArgs&, ValidationErrors* errors) {
1092 if (enforcement_percentage > 100) {
1093 ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1094 errors->AddError("value must be <= 100");
1095 }
1096 }
1097
1098 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1099 OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) {
1100 static const auto* loader =
1101 JsonObjectLoader<FailurePercentageEjection>()
1102 .OptionalField("threshold", &FailurePercentageEjection::threshold)
1103 .OptionalField("enforcementPercentage",
1104 &FailurePercentageEjection::enforcement_percentage)
1105 .OptionalField("minimumHosts",
1106 &FailurePercentageEjection::minimum_hosts)
1107 .OptionalField("requestVolume",
1108 &FailurePercentageEjection::request_volume)
1109 .Finish();
1110 return loader;
1111 }
1112
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1113 void OutlierDetectionConfig::FailurePercentageEjection::JsonPostLoad(
1114 const Json&, const JsonArgs&, ValidationErrors* errors) {
1115 if (enforcement_percentage > 100) {
1116 ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1117 errors->AddError("value must be <= 100");
1118 }
1119 if (threshold > 100) {
1120 ValidationErrors::ScopedField field(errors, ".threshold");
1121 errors->AddError("value must be <= 100");
1122 }
1123 }
1124
JsonLoader(const JsonArgs &)1125 const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) {
1126 static const auto* loader =
1127 JsonObjectLoader<OutlierDetectionConfig>()
1128 .OptionalField("interval", &OutlierDetectionConfig::interval)
1129 .OptionalField("baseEjectionTime",
1130 &OutlierDetectionConfig::base_ejection_time)
1131 .OptionalField("maxEjectionTime",
1132 &OutlierDetectionConfig::max_ejection_time)
1133 .OptionalField("maxEjectionPercent",
1134 &OutlierDetectionConfig::max_ejection_percent)
1135 .OptionalField("successRateEjection",
1136 &OutlierDetectionConfig::success_rate_ejection)
1137 .OptionalField("failurePercentageEjection",
1138 &OutlierDetectionConfig::failure_percentage_ejection)
1139 .Finish();
1140 return loader;
1141 }
1142
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)1143 void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&,
1144 ValidationErrors* errors) {
1145 if (json.object().find("maxEjectionTime") == json.object().end()) {
1146 max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300));
1147 }
1148 if (max_ejection_percent > 100) {
1149 ValidationErrors::ScopedField field(errors, ".max_ejection_percent");
1150 errors->AddError("value must be <= 100");
1151 }
1152 }
1153
1154 //
1155 // Plugin registration
1156 //
1157
RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder * builder)1158 void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) {
1159 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1160 std::make_unique<OutlierDetectionLbFactory>());
1161 }
1162
1163 } // namespace grpc_core
1164