• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/connectivity_state.h>
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 #include <stddef.h>
24 
25 #include <algorithm>
26 #include <atomic>
27 #include <cmath>
28 #include <map>
29 #include <memory>
30 #include <set>
31 #include <string>
32 #include <type_traits>
33 #include <utility>
34 #include <vector>
35 
36 #include "absl/base/thread_annotations.h"
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "absl/meta/type_traits.h"
40 #include "absl/random/random.h"
41 #include "absl/status/status.h"
42 #include "absl/status/statusor.h"
43 #include "absl/strings/string_view.h"
44 #include "absl/types/variant.h"
45 #include "src/core/client_channel/subchannel_interface_internal.h"
46 #include "src/core/config/core_configuration.h"
47 #include "src/core/lib/address_utils/sockaddr_utils.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/experiments/experiments.h"
51 #include "src/core/lib/iomgr/exec_ctx.h"
52 #include "src/core/lib/iomgr/iomgr_fwd.h"
53 #include "src/core/lib/iomgr/pollset_set.h"
54 #include "src/core/lib/iomgr/resolved_address.h"
55 #include "src/core/lib/transport/connectivity_state.h"
56 #include "src/core/load_balancing/child_policy_handler.h"
57 #include "src/core/load_balancing/delegating_helper.h"
58 #include "src/core/load_balancing/health_check_client_internal.h"
59 #include "src/core/load_balancing/lb_policy.h"
60 #include "src/core/load_balancing/lb_policy_factory.h"
61 #include "src/core/load_balancing/lb_policy_registry.h"
62 #include "src/core/load_balancing/subchannel_interface.h"
63 #include "src/core/resolver/endpoint_addresses.h"
64 #include "src/core/util/debug_location.h"
65 #include "src/core/util/json/json.h"
66 #include "src/core/util/orphanable.h"
67 #include "src/core/util/ref_counted.h"
68 #include "src/core/util/ref_counted_ptr.h"
69 #include "src/core/util/sync.h"
70 #include "src/core/util/unique_type_name.h"
71 #include "src/core/util/validation_errors.h"
72 #include "src/core/util/work_serializer.h"
73 
74 namespace grpc_core {
75 
76 namespace {
77 
78 using ::grpc_event_engine::experimental::EventEngine;
79 
80 constexpr absl::string_view kOutlierDetection =
81     "outlier_detection_experimental";
82 
83 // Config for xDS Cluster Impl LB policy.
84 class OutlierDetectionLbConfig final : public LoadBalancingPolicy::Config {
85  public:
OutlierDetectionLbConfig(OutlierDetectionConfig outlier_detection_config,RefCountedPtr<LoadBalancingPolicy::Config> child_policy)86   OutlierDetectionLbConfig(
87       OutlierDetectionConfig outlier_detection_config,
88       RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
89       : outlier_detection_config_(outlier_detection_config),
90         child_policy_(std::move(child_policy)) {}
91 
name() const92   absl::string_view name() const override { return kOutlierDetection; }
93 
CountingEnabled() const94   bool CountingEnabled() const {
95     return outlier_detection_config_.success_rate_ejection.has_value() ||
96            outlier_detection_config_.failure_percentage_ejection.has_value();
97   }
98 
outlier_detection_config() const99   const OutlierDetectionConfig& outlier_detection_config() const {
100     return outlier_detection_config_;
101   }
102 
child_policy() const103   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
104     return child_policy_;
105   }
106 
107  private:
108   OutlierDetectionConfig outlier_detection_config_;
109   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
110 };
111 
112 // xDS Cluster Impl LB policy.
113 class OutlierDetectionLb final : public LoadBalancingPolicy {
114  public:
115   explicit OutlierDetectionLb(Args args);
116 
name() const117   absl::string_view name() const override { return kOutlierDetection; }
118 
119   absl::Status UpdateLocked(UpdateArgs args) override;
120   void ExitIdleLocked() override;
121   void ResetBackoffLocked() override;
122 
123  private:
124   class SubchannelState;
125   class EndpointState;
126 
127   class SubchannelWrapper final : public DelegatingSubchannel {
128    public:
SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,RefCountedPtr<SubchannelState> subchannel_state,RefCountedPtr<SubchannelInterface> subchannel)129     SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,
130                       RefCountedPtr<SubchannelState> subchannel_state,
131                       RefCountedPtr<SubchannelInterface> subchannel)
132         : DelegatingSubchannel(std::move(subchannel)),
133           work_serializer_(std::move(work_serializer)),
134           subchannel_state_(std::move(subchannel_state)) {
135       if (subchannel_state_ != nullptr) {
136         subchannel_state_->AddSubchannel(this);
137         if (subchannel_state_->endpoint_state()->ejection_time().has_value()) {
138           ejected_ = true;
139         }
140       }
141     }
142 
143     void Eject();
144 
145     void Uneject();
146 
147     void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override;
148 
149     void CancelDataWatcher(DataWatcherInterface* watcher) override;
150 
endpoint_state() const151     RefCountedPtr<EndpointState> endpoint_state() const {
152       if (subchannel_state_ == nullptr) return nullptr;
153       return subchannel_state_->endpoint_state();
154     }
155 
156    private:
157     class WatcherWrapper final
158         : public SubchannelInterface::ConnectivityStateWatcherInterface {
159      public:
WatcherWrapper(WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper,std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> health_watcher,bool ejected)160       WatcherWrapper(WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper,
161                      std::shared_ptr<
162                          SubchannelInterface::ConnectivityStateWatcherInterface>
163                          health_watcher,
164                      bool ejected)
165           : subchannel_wrapper_(std::move(subchannel_wrapper)),
166             watcher_(std::move(health_watcher)),
167             ejected_(ejected) {}
168 
Eject()169       void Eject() {
170         ejected_ = true;
171         if (last_seen_state_.has_value()) {
172           watcher_->OnConnectivityStateChange(
173               GRPC_CHANNEL_TRANSIENT_FAILURE,
174               absl::UnavailableError(
175                   absl::StrCat(subchannel_wrapper_->address(),
176                                ": subchannel ejected by outlier detection")));
177         }
178       }
179 
Uneject()180       void Uneject() {
181         ejected_ = false;
182         if (last_seen_state_.has_value()) {
183           watcher_->OnConnectivityStateChange(*last_seen_state_,
184                                               last_seen_status_);
185         }
186       }
187 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)188       void OnConnectivityStateChange(grpc_connectivity_state new_state,
189                                      absl::Status status) override {
190         const bool send_update = !last_seen_state_.has_value() || !ejected_;
191         last_seen_state_ = new_state;
192         last_seen_status_ = status;
193         if (send_update) {
194           if (ejected_) {
195             new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
196             status = absl::UnavailableError(
197                 absl::StrCat(subchannel_wrapper_->address(),
198                              ": subchannel ejected by outlier detection"));
199           }
200           watcher_->OnConnectivityStateChange(new_state, status);
201         }
202       }
203 
interested_parties()204       grpc_pollset_set* interested_parties() override {
205         return watcher_->interested_parties();
206       }
207 
208      private:
209       WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper_;
210       std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
211           watcher_;
212       absl::optional<grpc_connectivity_state> last_seen_state_;
213       absl::Status last_seen_status_;
214       bool ejected_;
215     };
216 
Orphaned()217     void Orphaned() override {
218       if (!IsWorkSerializerDispatchEnabled()) {
219         if (subchannel_state_ != nullptr) {
220           subchannel_state_->RemoveSubchannel(this);
221         }
222         return;
223       }
224       work_serializer_->Run(
225           [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
226             if (self->subchannel_state_ != nullptr) {
227               self->subchannel_state_->RemoveSubchannel(self.get());
228             }
229           },
230           DEBUG_LOCATION);
231     }
232 
233     std::shared_ptr<WorkSerializer> work_serializer_;
234     RefCountedPtr<SubchannelState> subchannel_state_;
235     bool ejected_ = false;
236     WatcherWrapper* watcher_wrapper_ = nullptr;
237   };
238 
239   class SubchannelState final : public RefCounted<SubchannelState> {
240    public:
AddSubchannel(SubchannelWrapper * wrapper)241     void AddSubchannel(SubchannelWrapper* wrapper) {
242       subchannels_.insert(wrapper);
243     }
244 
RemoveSubchannel(SubchannelWrapper * wrapper)245     void RemoveSubchannel(SubchannelWrapper* wrapper) {
246       subchannels_.erase(wrapper);
247     }
248 
endpoint_state()249     RefCountedPtr<EndpointState> endpoint_state() {
250       MutexLock lock(&mu_);
251       return endpoint_state_;
252     }
253 
set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state)254     void set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state) {
255       MutexLock lock(&mu_);
256       endpoint_state_ = std::move(endpoint_state);
257     }
258 
Eject()259     void Eject() {
260       // Ejecting the subchannel may cause the child policy to unref the
261       // subchannel, so we need to be prepared for the set to be modified
262       // while we are iterating.
263       for (auto it = subchannels_.begin(); it != subchannels_.end();) {
264         SubchannelWrapper* subchannel = *it;
265         ++it;
266         subchannel->Eject();
267       }
268     }
269 
Uneject()270     void Uneject() {
271       for (auto& subchannel : subchannels_) {
272         subchannel->Uneject();
273       }
274     }
275 
276    private:
277     std::set<SubchannelWrapper*> subchannels_;
278     Mutex mu_;
279     RefCountedPtr<EndpointState> endpoint_state_ ABSL_GUARDED_BY(mu_);
280   };
281 
282   class EndpointState final : public RefCounted<EndpointState> {
283    public:
EndpointState(std::set<SubchannelState * > subchannels)284     explicit EndpointState(std::set<SubchannelState*> subchannels)
285         : subchannels_(std::move(subchannels)) {
286       for (SubchannelState* subchannel : subchannels_) {
287         subchannel->set_endpoint_state(Ref());
288       }
289     }
290 
RotateBucket()291     void RotateBucket() {
292       backup_bucket_->successes = 0;
293       backup_bucket_->failures = 0;
294       current_bucket_.swap(backup_bucket_);
295       active_bucket_.store(current_bucket_.get());
296     }
297 
GetSuccessRateAndVolume()298     absl::optional<std::pair<double, uint64_t>> GetSuccessRateAndVolume() {
299       uint64_t total_request =
300           backup_bucket_->successes + backup_bucket_->failures;
301       if (total_request == 0) {
302         return absl::nullopt;
303       }
304       double success_rate =
305           backup_bucket_->successes * 100.0 /
306           (backup_bucket_->successes + backup_bucket_->failures);
307       return {
308           {success_rate, backup_bucket_->successes + backup_bucket_->failures}};
309     }
310 
AddSuccessCount()311     void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); }
312 
AddFailureCount()313     void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); }
314 
ejection_time() const315     absl::optional<Timestamp> ejection_time() const { return ejection_time_; }
316 
Eject(const Timestamp & time)317     void Eject(const Timestamp& time) {
318       ejection_time_ = time;
319       ++multiplier_;
320       for (SubchannelState* subchannel_state : subchannels_) {
321         subchannel_state->Eject();
322       }
323     }
324 
Uneject()325     void Uneject() {
326       ejection_time_.reset();
327       for (SubchannelState* subchannel_state : subchannels_) {
328         subchannel_state->Uneject();
329       }
330     }
331 
MaybeUneject(uint64_t base_ejection_time_in_millis,uint64_t max_ejection_time_in_millis)332     bool MaybeUneject(uint64_t base_ejection_time_in_millis,
333                       uint64_t max_ejection_time_in_millis) {
334       if (!ejection_time_.has_value()) {
335         if (multiplier_ > 0) {
336           --multiplier_;
337         }
338       } else {
339         CHECK(ejection_time_.has_value());
340         auto change_time = ejection_time_.value() +
341                            Duration::Milliseconds(std::min(
342                                base_ejection_time_in_millis * multiplier_,
343                                std::max(base_ejection_time_in_millis,
344                                         max_ejection_time_in_millis)));
345         if (change_time < Timestamp::Now()) {
346           Uneject();
347           return true;
348         }
349       }
350       return false;
351     }
352 
DisableEjection()353     void DisableEjection() {
354       if (ejection_time_.has_value()) Uneject();
355       multiplier_ = 0;
356     }
357 
358    private:
359     struct Bucket {
360       std::atomic<uint64_t> successes;
361       std::atomic<uint64_t> failures;
362     };
363 
364     const std::set<SubchannelState*> subchannels_;
365 
366     std::unique_ptr<Bucket> current_bucket_ = std::make_unique<Bucket>();
367     std::unique_ptr<Bucket> backup_bucket_ = std::make_unique<Bucket>();
368     // The bucket used to update call counts.
369     // Points to either current_bucket or active_bucket.
370     std::atomic<Bucket*> active_bucket_{current_bucket_.get()};
371     uint32_t multiplier_ = 0;
372     absl::optional<Timestamp> ejection_time_;
373   };
374 
375   // A picker that wraps the picker from the child to perform outlier detection.
376   class Picker final : public SubchannelPicker {
377    public:
378     Picker(OutlierDetectionLb* outlier_detection_lb,
379            RefCountedPtr<SubchannelPicker> picker, bool counting_enabled);
380 
381     PickResult Pick(PickArgs args) override;
382 
383    private:
384     class SubchannelCallTracker;
385     RefCountedPtr<SubchannelPicker> picker_;
386     bool counting_enabled_;
387   };
388 
389   class Helper final
390       : public ParentOwningDelegatingChannelControlHelper<OutlierDetectionLb> {
391    public:
Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)392     explicit Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)
393         : ParentOwningDelegatingChannelControlHelper(
394               std::move(outlier_detection_policy)) {}
395 
396     RefCountedPtr<SubchannelInterface> CreateSubchannel(
397         const grpc_resolved_address& address,
398         const ChannelArgs& per_address_args, const ChannelArgs& args) override;
399     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
400                      RefCountedPtr<SubchannelPicker> picker) override;
401   };
402 
403   class EjectionTimer final : public InternallyRefCounted<EjectionTimer> {
404    public:
405     EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,
406                   Timestamp start_time);
407 
408     void Orphan() override;
409 
StartTime() const410     Timestamp StartTime() const { return start_time_; }
411 
412    private:
413     void OnTimerLocked();
414 
415     RefCountedPtr<OutlierDetectionLb> parent_;
416     absl::optional<EventEngine::TaskHandle> timer_handle_;
417     Timestamp start_time_;
418     absl::BitGen bit_gen_;
419   };
420 
421   ~OutlierDetectionLb() override;
422 
423   void ShutdownLocked() override;
424 
425   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
426       const ChannelArgs& args);
427 
428   void MaybeUpdatePickerLocked();
429 
430   // Current config from the resolver.
431   RefCountedPtr<OutlierDetectionLbConfig> config_;
432 
433   // Internal state.
434   bool shutting_down_ = false;
435 
436   OrphanablePtr<LoadBalancingPolicy> child_policy_;
437 
438   // Latest state and picker reported by the child policy.
439   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
440   absl::Status status_;
441   RefCountedPtr<SubchannelPicker> picker_;
442   std::map<EndpointAddressSet, RefCountedPtr<EndpointState>>
443       endpoint_state_map_;
444   std::map<grpc_resolved_address, RefCountedPtr<SubchannelState>,
445            ResolvedAddressLessThan>
446       subchannel_state_map_;
447   OrphanablePtr<EjectionTimer> ejection_timer_;
448 };
449 
450 //
451 // OutlierDetectionLb::SubchannelWrapper
452 //
453 
Eject()454 void OutlierDetectionLb::SubchannelWrapper::Eject() {
455   ejected_ = true;
456   if (watcher_wrapper_ != nullptr) watcher_wrapper_->Eject();
457 }
458 
Uneject()459 void OutlierDetectionLb::SubchannelWrapper::Uneject() {
460   ejected_ = false;
461   if (watcher_wrapper_ != nullptr) watcher_wrapper_->Uneject();
462 }
463 
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)464 void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
465     std::unique_ptr<DataWatcherInterface> watcher) {
466   auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
467   if (w->type() == HealthProducer::Type()) {
468     auto* health_watcher = static_cast<HealthWatcher*>(watcher.get());
469     auto watcher_wrapper = std::make_shared<WatcherWrapper>(
470         WeakRefAsSubclass<SubchannelWrapper>(), health_watcher->TakeWatcher(),
471         ejected_);
472     watcher_wrapper_ = watcher_wrapper.get();
473     health_watcher->SetWatcher(std::move(watcher_wrapper));
474   }
475   DelegatingSubchannel::AddDataWatcher(std::move(watcher));
476 }
477 
CancelDataWatcher(DataWatcherInterface * watcher)478 void OutlierDetectionLb::SubchannelWrapper::CancelDataWatcher(
479     DataWatcherInterface* watcher) {
480   auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
481   if (w->type() == HealthProducer::Type()) watcher_wrapper_ = nullptr;
482   DelegatingSubchannel::CancelDataWatcher(watcher);
483 }
484 
485 //
486 // OutlierDetectionLb::Picker::SubchannelCallTracker
487 //
488 
489 class OutlierDetectionLb::Picker::SubchannelCallTracker final
490     : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
491  public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<EndpointState> endpoint_state)492   SubchannelCallTracker(
493       std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
494           original_subchannel_call_tracker,
495       RefCountedPtr<EndpointState> endpoint_state)
496       : original_subchannel_call_tracker_(
497             std::move(original_subchannel_call_tracker)),
498         endpoint_state_(std::move(endpoint_state)) {}
499 
~SubchannelCallTracker()500   ~SubchannelCallTracker() override {
501     endpoint_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
502   }
503 
Start()504   void Start() override {
505     // This tracker does not care about started calls only finished calls.
506     // Delegate if needed.
507     if (original_subchannel_call_tracker_ != nullptr) {
508       original_subchannel_call_tracker_->Start();
509     }
510   }
511 
Finish(FinishArgs args)512   void Finish(FinishArgs args) override {
513     // Delegate if needed.
514     if (original_subchannel_call_tracker_ != nullptr) {
515       original_subchannel_call_tracker_->Finish(args);
516     }
517     // Record call completion based on status for outlier detection
518     // calculations.
519     if (args.status.ok()) {
520       endpoint_state_->AddSuccessCount();
521     } else {
522       endpoint_state_->AddFailureCount();
523     }
524   }
525 
526  private:
527   std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
528       original_subchannel_call_tracker_;
529   RefCountedPtr<EndpointState> endpoint_state_;
530 };
531 
532 //
533 // OutlierDetectionLb::Picker
534 //
535 
Picker(OutlierDetectionLb * outlier_detection_lb,RefCountedPtr<SubchannelPicker> picker,bool counting_enabled)536 OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
537                                    RefCountedPtr<SubchannelPicker> picker,
538                                    bool counting_enabled)
539     : picker_(std::move(picker)), counting_enabled_(counting_enabled) {
540   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
541       << "[outlier_detection_lb " << outlier_detection_lb
542       << "] constructed new picker " << this << " and counting " << "is "
543       << (counting_enabled ? "enabled" : "disabled");
544 }
545 
Pick(LoadBalancingPolicy::PickArgs args)546 LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
547     LoadBalancingPolicy::PickArgs args) {
548   if (picker_ == nullptr) {  // Should never happen.
549     return PickResult::Fail(absl::InternalError(
550         "outlier_detection picker not given any child picker"));
551   }
552   // Delegate to child picker
553   PickResult result = picker_->Pick(args);
554   auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
555   if (complete_pick != nullptr) {
556     auto* subchannel_wrapper =
557         static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
558     // Inject subchannel call tracker to record call completion as long as
559     // either success_rate_ejection or failure_percentage_ejection is enabled.
560     if (counting_enabled_) {
561       auto endpoint_state = subchannel_wrapper->endpoint_state();
562       if (endpoint_state != nullptr) {
563         complete_pick->subchannel_call_tracker =
564             std::make_unique<SubchannelCallTracker>(
565                 std::move(complete_pick->subchannel_call_tracker),
566                 std::move(endpoint_state));
567       }
568     }
569     // Unwrap subchannel to pass back up the stack.
570     complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
571   }
572   return result;
573 }
574 
575 //
576 // OutlierDetectionLb
577 //
578 
OutlierDetectionLb(Args args)579 OutlierDetectionLb::OutlierDetectionLb(Args args)
580     : LoadBalancingPolicy(std::move(args)) {
581   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
582       << "[outlier_detection_lb " << this << "] created";
583 }
584 
~OutlierDetectionLb()585 OutlierDetectionLb::~OutlierDetectionLb() {
586   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
587       << "[outlier_detection_lb " << this
588       << "] destroying outlier_detection LB policy";
589 }
590 
ShutdownLocked()591 void OutlierDetectionLb::ShutdownLocked() {
592   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
593       << "[outlier_detection_lb " << this << "] shutting down";
594   ejection_timer_.reset();
595   shutting_down_ = true;
596   // Remove the child policy's interested_parties pollset_set from the
597   // xDS policy.
598   if (child_policy_ != nullptr) {
599     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
600                                      interested_parties());
601     child_policy_.reset();
602   }
603   // Drop our ref to the child's picker, in case it's holding a ref to
604   // the child.
605   picker_.reset();
606 }
607 
ExitIdleLocked()608 void OutlierDetectionLb::ExitIdleLocked() {
609   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
610 }
611 
ResetBackoffLocked()612 void OutlierDetectionLb::ResetBackoffLocked() {
613   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
614 }
615 
UpdateLocked(UpdateArgs args)616 absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
617   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
618       << "[outlier_detection_lb " << this << "] Received update";
619   auto old_config = std::move(config_);
620   // Update config.
621   config_ = args.config.TakeAsSubclass<OutlierDetectionLbConfig>();
622   // Update outlier detection timer.
623   if (!config_->CountingEnabled()) {
624     // No need for timer.  Cancel the current timer, if any.
625     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
626         << "[outlier_detection_lb " << this
627         << "] counting disabled, cancelling timer";
628     ejection_timer_.reset();
629   } else if (ejection_timer_ == nullptr) {
630     // No timer running.  Start it now.
631     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
632         << "[outlier_detection_lb " << this << "] starting timer";
633     ejection_timer_ = MakeOrphanable<EjectionTimer>(
634         RefAsSubclass<OutlierDetectionLb>(), Timestamp::Now());
635     for (const auto& p : endpoint_state_map_) {
636       p.second->RotateBucket();  // Reset call counters.
637     }
638   } else if (old_config->outlier_detection_config().interval !=
639              config_->outlier_detection_config().interval) {
640     // Timer interval changed.  Cancel the current timer and start a new one
641     // with the same start time.
642     // Note that if the new deadline is in the past, the timer will fire
643     // immediately.
644     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
645         << "[outlier_detection_lb " << this
646         << "] interval changed, replacing timer";
647     ejection_timer_ = MakeOrphanable<EjectionTimer>(
648         RefAsSubclass<OutlierDetectionLb>(), ejection_timer_->StartTime());
649   }
650   // Update subchannel and endpoint maps.
651   if (args.addresses.ok()) {
652     std::set<EndpointAddressSet> current_endpoints;
653     std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
654     (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
655       EndpointAddressSet key(endpoint.addresses());
656       current_endpoints.emplace(key);
657       for (const grpc_resolved_address& address : endpoint.addresses()) {
658         current_addresses.emplace(address);
659       }
660       // Find the entry in the endpoint map.
661       auto it = endpoint_state_map_.find(key);
662       if (it == endpoint_state_map_.end()) {
663         GRPC_TRACE_LOG(outlier_detection_lb, INFO)
664             << "[outlier_detection_lb " << this
665             << "] adding endpoint entry for " << key.ToString();
666         // The endpoint is not present in the map, so we'll need to add it.
667         // Start by getting a pointer to the entry for each address in the
668         // subchannel map, creating the entry if needed.
669         std::set<SubchannelState*> subchannels;
670         for (const grpc_resolved_address& address : endpoint.addresses()) {
671           auto it2 = subchannel_state_map_.find(address);
672           if (it2 == subchannel_state_map_.end()) {
673             if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
674               std::string address_str = grpc_sockaddr_to_string(&address, false)
675                                             .value_or("<unknown>");
676               LOG(INFO) << "[outlier_detection_lb " << this
677                         << "] adding address entry for " << address_str;
678             }
679             it2 = subchannel_state_map_
680                       .emplace(address, MakeRefCounted<SubchannelState>())
681                       .first;
682           }
683           subchannels.insert(it2->second.get());
684         }
685         // Now create the endpoint.
686         endpoint_state_map_.emplace(
687             key, MakeRefCounted<EndpointState>(std::move(subchannels)));
688       } else if (!config_->CountingEnabled()) {
689         // If counting is not enabled, reset state.
690         GRPC_TRACE_LOG(outlier_detection_lb, INFO)
691             << "[outlier_detection_lb " << this
692             << "] counting disabled; disabling ejection for " << key.ToString();
693         it->second->DisableEjection();
694       }
695     });
696     // Remove any entries we no longer need in the subchannel map.
697     for (auto it = subchannel_state_map_.begin();
698          it != subchannel_state_map_.end();) {
699       if (current_addresses.find(it->first) == current_addresses.end()) {
700         if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
701           std::string address_str =
702               grpc_sockaddr_to_string(&it->first, false).value_or("<unknown>");
703           LOG(INFO) << "[outlier_detection_lb " << this
704                     << "] removing subchannel map entry " << address_str;
705         }
706         // Don't hold a ref to the corresponding EndpointState object,
707         // because there could be subchannel wrappers keeping this alive
708         // for a while, and we don't need them to do any call tracking.
709         it->second->set_endpoint_state(nullptr);
710         it = subchannel_state_map_.erase(it);
711       } else {
712         ++it;
713       }
714     }
715     // Remove any entries we no longer need in the endpoint map.
716     for (auto it = endpoint_state_map_.begin();
717          it != endpoint_state_map_.end();) {
718       if (current_endpoints.find(it->first) == current_endpoints.end()) {
719         GRPC_TRACE_LOG(outlier_detection_lb, INFO)
720             << "[outlier_detection_lb " << this
721             << "] removing endpoint map entry " << it->first.ToString();
722         it = endpoint_state_map_.erase(it);
723       } else {
724         ++it;
725       }
726     }
727   }
728   // Create child policy if needed.
729   if (child_policy_ == nullptr) {
730     child_policy_ = CreateChildPolicyLocked(args.args);
731   }
732   // Update child policy.
733   UpdateArgs update_args;
734   update_args.addresses = std::move(args.addresses);
735   update_args.resolution_note = std::move(args.resolution_note);
736   update_args.config = config_->child_policy();
737   update_args.args = std::move(args.args);
738   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
739       << "[outlier_detection_lb " << this << "] Updating child policy handler "
740       << child_policy_.get();
741   return child_policy_->UpdateLocked(std::move(update_args));
742 }
743 
MaybeUpdatePickerLocked()744 void OutlierDetectionLb::MaybeUpdatePickerLocked() {
745   if (picker_ != nullptr) {
746     auto outlier_detection_picker =
747         MakeRefCounted<Picker>(this, picker_, config_->CountingEnabled());
748     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
749         << "[outlier_detection_lb " << this
750         << "] updating connectivity: state=" << ConnectivityStateName(state_)
751         << " status=(" << status_
752         << ") picker=" << outlier_detection_picker.get();
753     channel_control_helper()->UpdateState(state_, status_,
754                                           std::move(outlier_detection_picker));
755   }
756 }
757 
CreateChildPolicyLocked(const ChannelArgs & args)758 OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
759     const ChannelArgs& args) {
760   LoadBalancingPolicy::Args lb_policy_args;
761   lb_policy_args.work_serializer = work_serializer();
762   lb_policy_args.args = args;
763   lb_policy_args.channel_control_helper = std::make_unique<Helper>(
764       RefAsSubclass<OutlierDetectionLb>(DEBUG_LOCATION, "Helper"));
765   OrphanablePtr<LoadBalancingPolicy> lb_policy =
766       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
767                                          &outlier_detection_lb_trace);
768   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
769       << "[outlier_detection_lb " << this
770       << "] Created new child policy handler " << lb_policy.get();
771   // Add our interested_parties pollset_set to that of the newly created
772   // child policy. This will make the child policy progress upon activity on
773   // this policy, which in turn is tied to the application's call.
774   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
775                                    interested_parties());
776   return lb_policy;
777 }
778 
779 //
780 // OutlierDetectionLb::Helper
781 //
782 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)783 RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
784     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
785     const ChannelArgs& args) {
786   if (parent()->shutting_down_) return nullptr;
787   RefCountedPtr<SubchannelState> subchannel_state;
788   auto it = parent()->subchannel_state_map_.find(address);
789   if (it != parent()->subchannel_state_map_.end()) {
790     subchannel_state = it->second->Ref();
791   }
792   if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
793     std::string address_str =
794         grpc_sockaddr_to_string(&address, false).value_or("<unknown>");
795     LOG(INFO) << "[outlier_detection_lb " << parent()
796               << "] creating subchannel for " << address_str
797               << ", subchannel state " << subchannel_state.get();
798   }
799   auto subchannel = MakeRefCounted<SubchannelWrapper>(
800       parent()->work_serializer(), subchannel_state,
801       parent()->channel_control_helper()->CreateSubchannel(
802           address, per_address_args, args));
803   if (subchannel_state != nullptr) {
804     subchannel_state->AddSubchannel(subchannel.get());
805   }
806   return subchannel;
807 }
808 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)809 void OutlierDetectionLb::Helper::UpdateState(
810     grpc_connectivity_state state, const absl::Status& status,
811     RefCountedPtr<SubchannelPicker> picker) {
812   if (parent()->shutting_down_) return;
813   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
814       << "[outlier_detection_lb " << parent()
815       << "] child connectivity state update: state="
816       << ConnectivityStateName(state) << " (" << status
817       << ") picker=" << picker.get();
818   // Save the state and picker.
819   parent()->state_ = state;
820   parent()->status_ = status;
821   parent()->picker_ = std::move(picker);
822   // Wrap the picker and return it to the channel.
823   parent()->MaybeUpdatePickerLocked();
824 }
825 
826 //
827 // OutlierDetectionLb::EjectionTimer
828 //
829 
EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,Timestamp start_time)830 OutlierDetectionLb::EjectionTimer::EjectionTimer(
831     RefCountedPtr<OutlierDetectionLb> parent, Timestamp start_time)
832     : parent_(std::move(parent)), start_time_(start_time) {
833   auto interval = parent_->config_->outlier_detection_config().interval;
834   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
835       << "[outlier_detection_lb " << parent_.get()
836       << "] ejection timer will run in " << interval.ToString();
837   timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
838       interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
839         ApplicationCallbackExecCtx callback_exec_ctx;
840         ExecCtx exec_ctx;
841         auto self_ptr = self.get();
842         self_ptr->parent_->work_serializer()->Run(
843             [self = std::move(self)]() { self->OnTimerLocked(); },
844             DEBUG_LOCATION);
845       });
846 }
847 
Orphan()848 void OutlierDetectionLb::EjectionTimer::Orphan() {
849   if (timer_handle_.has_value()) {
850     parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
851     timer_handle_.reset();
852   }
853   Unref();
854 }
855 
OnTimerLocked()856 void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
857   if (!timer_handle_.has_value()) return;
858   timer_handle_.reset();
859   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
860       << "[outlier_detection_lb " << parent_.get()
861       << "] ejection timer running";
862   std::map<EndpointState*, double> success_rate_ejection_candidates;
863   std::map<EndpointState*, double> failure_percentage_ejection_candidates;
864   size_t ejected_host_count = 0;
865   double success_rate_sum = 0;
866   auto time_now = Timestamp::Now();
867   auto& config = parent_->config_->outlier_detection_config();
868   for (auto& state : parent_->endpoint_state_map_) {
869     auto* endpoint_state = state.second.get();
870     // For each address, swap the call counter's buckets in that address's
871     // map entry.
872     endpoint_state->RotateBucket();
873     // Gather data to run success rate algorithm or failure percentage
874     // algorithm.
875     if (endpoint_state->ejection_time().has_value()) {
876       ++ejected_host_count;
877     }
878     absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
879         endpoint_state->GetSuccessRateAndVolume();
880     if (!host_success_rate_and_volume.has_value()) {
881       continue;
882     }
883     double success_rate = host_success_rate_and_volume->first;
884     uint64_t request_volume = host_success_rate_and_volume->second;
885     if (config.success_rate_ejection.has_value()) {
886       if (request_volume >= config.success_rate_ejection->request_volume) {
887         success_rate_ejection_candidates[endpoint_state] = success_rate;
888         success_rate_sum += success_rate;
889       }
890     }
891     if (config.failure_percentage_ejection.has_value()) {
892       if (request_volume >=
893           config.failure_percentage_ejection->request_volume) {
894         failure_percentage_ejection_candidates[endpoint_state] = success_rate;
895       }
896     }
897   }
898   GRPC_TRACE_LOG(outlier_detection_lb, INFO)
899       << "[outlier_detection_lb " << parent_.get() << "] found "
900       << success_rate_ejection_candidates.size()
901       << " success rate candidates and "
902       << failure_percentage_ejection_candidates.size()
903       << " failure percentage candidates; ejected_host_count="
904       << ejected_host_count
905       << "; success_rate_sum=" << absl::StrFormat("%.3f", success_rate_sum);
906   // success rate algorithm
907   if (!success_rate_ejection_candidates.empty() &&
908       success_rate_ejection_candidates.size() >=
909           config.success_rate_ejection->minimum_hosts) {
910     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
911         << "[outlier_detection_lb " << parent_.get()
912         << "] running success rate algorithm: " << "stdev_factor="
913         << config.success_rate_ejection->stdev_factor
914         << ", enforcement_percentage="
915         << config.success_rate_ejection->enforcement_percentage;
916     // calculate ejection threshold: (mean - stdev *
917     // (success_rate_ejection.stdev_factor / 1000))
918     double mean = success_rate_sum / success_rate_ejection_candidates.size();
919     double variance = 0;
920     for (const auto& p : success_rate_ejection_candidates) {
921       variance += std::pow(p.second - mean, 2);
922     }
923     variance /= success_rate_ejection_candidates.size();
924     double stdev = std::sqrt(variance);
925     const double success_rate_stdev_factor =
926         static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
927     double ejection_threshold = mean - (stdev * success_rate_stdev_factor);
928     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
929         << "[outlier_detection_lb " << parent_.get() << "] stdev=" << stdev
930         << ", ejection_threshold=" << ejection_threshold;
931     for (auto& candidate : success_rate_ejection_candidates) {
932       GRPC_TRACE_LOG(outlier_detection_lb, INFO)
933           << "[outlier_detection_lb " << parent_.get()
934           << "] checking candidate " << candidate.first
935           << ": success_rate=" << candidate.second;
936       if (candidate.second < ejection_threshold) {
937         uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
938         double current_percent =
939             100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
940         GRPC_TRACE_LOG(outlier_detection_lb, INFO)
941             << "[outlier_detection_lb " << parent_.get()
942             << "] random_key=" << random_key
943             << " ejected_host_count=" << ejected_host_count
944             << " current_percent=" << absl::StrFormat("%.3f", current_percent);
945         if (random_key < config.success_rate_ejection->enforcement_percentage &&
946             (ejected_host_count == 0 ||
947              (current_percent < config.max_ejection_percent))) {
948           // Eject and record the timestamp for use when ejecting addresses in
949           // this iteration.
950           GRPC_TRACE_LOG(outlier_detection_lb, INFO)
951               << "[outlier_detection_lb " << parent_.get()
952               << "] ejecting candidate";
953           candidate.first->Eject(time_now);
954           ++ejected_host_count;
955         }
956       }
957     }
958   }
959   // failure percentage algorithm
960   if (!failure_percentage_ejection_candidates.empty() &&
961       failure_percentage_ejection_candidates.size() >=
962           config.failure_percentage_ejection->minimum_hosts) {
963     GRPC_TRACE_LOG(outlier_detection_lb, INFO)
964         << "[outlier_detection_lb " << parent_.get()
965         << "] running failure percentage algorithm: " << "threshold="
966         << config.failure_percentage_ejection->threshold
967         << ", enforcement_percentage="
968         << config.failure_percentage_ejection->enforcement_percentage;
969     for (auto& candidate : failure_percentage_ejection_candidates) {
970       GRPC_TRACE_LOG(outlier_detection_lb, INFO)
971           << "[outlier_detection_lb " << parent_.get()
972           << "] checking candidate " << candidate.first
973           << ": success_rate=" << candidate.second;
974       // Extra check to make sure success rate algorithm didn't already
975       // eject this backend.
976       if (candidate.first->ejection_time().has_value()) continue;
977       if ((100.0 - candidate.second) >
978           config.failure_percentage_ejection->threshold) {
979         uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
980         double current_percent =
981             100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
982         GRPC_TRACE_LOG(outlier_detection_lb, INFO)
983             << "[outlier_detection_lb " << parent_.get()
984             << "] random_key=" << random_key
985             << " ejected_host_count=" << ejected_host_count
986             << " current_percent=" << current_percent;
987         if (random_key <
988                 config.failure_percentage_ejection->enforcement_percentage &&
989             (ejected_host_count == 0 ||
990              (current_percent < config.max_ejection_percent))) {
991           // Eject and record the timestamp for use when ejecting addresses in
992           // this iteration.
993           GRPC_TRACE_LOG(outlier_detection_lb, INFO)
994               << "[outlier_detection_lb " << parent_.get()
995               << "] ejecting candidate";
996           candidate.first->Eject(time_now);
997           ++ejected_host_count;
998         }
999       }
1000     }
1001   }
1002   // For each address in the map:
1003   //   If the address is not ejected and the multiplier is greater than 0,
1004   //   decrease the multiplier by 1. If the address is ejected, and the
1005   //   current time is after ejection_timestamp + min(base_ejection_time *
1006   //   multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
1007   //   address.
1008   for (auto& state : parent_->endpoint_state_map_) {
1009     auto* endpoint_state = state.second.get();
1010     const bool unejected = endpoint_state->MaybeUneject(
1011         config.base_ejection_time.millis(), config.max_ejection_time.millis());
1012     if (unejected && GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
1013       LOG(INFO) << "[outlier_detection_lb " << parent_.get()
1014                 << "] unejected endpoint " << state.first.ToString() << " ("
1015                 << endpoint_state << ")";
1016     }
1017   }
1018   parent_->ejection_timer_ =
1019       MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
1020 }
1021 
1022 //
1023 // factory
1024 //
1025 
1026 class OutlierDetectionLbFactory final : public LoadBalancingPolicyFactory {
1027  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1028   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1029       LoadBalancingPolicy::Args args) const override {
1030     return MakeOrphanable<OutlierDetectionLb>(std::move(args));
1031   }
1032 
name() const1033   absl::string_view name() const override { return kOutlierDetection; }
1034 
1035   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1036   ParseLoadBalancingConfig(const Json& json) const override {
1037     ValidationErrors errors;
1038     OutlierDetectionConfig outlier_detection_config;
1039     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1040     {
1041       outlier_detection_config =
1042           LoadFromJson<OutlierDetectionConfig>(json, JsonArgs(), &errors);
1043       // Parse childPolicy manually.
1044       {
1045         ValidationErrors::ScopedField field(&errors, ".childPolicy");
1046         auto it = json.object().find("childPolicy");
1047         if (it == json.object().end()) {
1048           errors.AddError("field not present");
1049         } else {
1050           auto child_policy_config = CoreConfiguration::Get()
1051                                          .lb_policy_registry()
1052                                          .ParseLoadBalancingConfig(it->second);
1053           if (!child_policy_config.ok()) {
1054             errors.AddError(child_policy_config.status().message());
1055           } else {
1056             child_policy = std::move(*child_policy_config);
1057           }
1058         }
1059       }
1060     }
1061     if (!errors.ok()) {
1062       return errors.status(
1063           absl::StatusCode::kInvalidArgument,
1064           "errors validating outlier_detection LB policy config");
1065     }
1066     return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
1067                                                     std::move(child_policy));
1068   }
1069 };
1070 
1071 }  // namespace
1072 
1073 //
1074 // OutlierDetectionConfig
1075 //
1076 
1077 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1078 OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) {
1079   static const auto* loader =
1080       JsonObjectLoader<SuccessRateEjection>()
1081           .OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor)
1082           .OptionalField("enforcementPercentage",
1083                          &SuccessRateEjection::enforcement_percentage)
1084           .OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts)
1085           .OptionalField("requestVolume", &SuccessRateEjection::request_volume)
1086           .Finish();
1087   return loader;
1088 }
1089 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1090 void OutlierDetectionConfig::SuccessRateEjection::JsonPostLoad(
1091     const Json&, const JsonArgs&, ValidationErrors* errors) {
1092   if (enforcement_percentage > 100) {
1093     ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1094     errors->AddError("value must be <= 100");
1095   }
1096 }
1097 
1098 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1099 OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) {
1100   static const auto* loader =
1101       JsonObjectLoader<FailurePercentageEjection>()
1102           .OptionalField("threshold", &FailurePercentageEjection::threshold)
1103           .OptionalField("enforcementPercentage",
1104                          &FailurePercentageEjection::enforcement_percentage)
1105           .OptionalField("minimumHosts",
1106                          &FailurePercentageEjection::minimum_hosts)
1107           .OptionalField("requestVolume",
1108                          &FailurePercentageEjection::request_volume)
1109           .Finish();
1110   return loader;
1111 }
1112 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1113 void OutlierDetectionConfig::FailurePercentageEjection::JsonPostLoad(
1114     const Json&, const JsonArgs&, ValidationErrors* errors) {
1115   if (enforcement_percentage > 100) {
1116     ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1117     errors->AddError("value must be <= 100");
1118   }
1119   if (threshold > 100) {
1120     ValidationErrors::ScopedField field(errors, ".threshold");
1121     errors->AddError("value must be <= 100");
1122   }
1123 }
1124 
JsonLoader(const JsonArgs &)1125 const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) {
1126   static const auto* loader =
1127       JsonObjectLoader<OutlierDetectionConfig>()
1128           .OptionalField("interval", &OutlierDetectionConfig::interval)
1129           .OptionalField("baseEjectionTime",
1130                          &OutlierDetectionConfig::base_ejection_time)
1131           .OptionalField("maxEjectionTime",
1132                          &OutlierDetectionConfig::max_ejection_time)
1133           .OptionalField("maxEjectionPercent",
1134                          &OutlierDetectionConfig::max_ejection_percent)
1135           .OptionalField("successRateEjection",
1136                          &OutlierDetectionConfig::success_rate_ejection)
1137           .OptionalField("failurePercentageEjection",
1138                          &OutlierDetectionConfig::failure_percentage_ejection)
1139           .Finish();
1140   return loader;
1141 }
1142 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)1143 void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&,
1144                                           ValidationErrors* errors) {
1145   if (json.object().find("maxEjectionTime") == json.object().end()) {
1146     max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300));
1147   }
1148   if (max_ejection_percent > 100) {
1149     ValidationErrors::ScopedField field(errors, ".max_ejection_percent");
1150     errors->AddError("value must be <= 100");
1151   }
1152 }
1153 
1154 //
1155 // Plugin registration
1156 //
1157 
RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder * builder)1158 void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) {
1159   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1160       std::make_unique<OutlierDetectionLbFactory>());
1161 }
1162 
1163 }  // namespace grpc_core
1164