• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.util;
18 
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.internal.TimeProvider;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
56 
57 /**
58  * Wraps a child {@code LoadBalancer} while monitoring for outlier backends and removing them from
59  * the use of the child LB.
60  *
61  * <p>This implements the outlier detection gRFC:
62  * https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
63  */
64 @Internal
65 public final class OutlierDetectionLoadBalancer extends LoadBalancer {
66 
67   @VisibleForTesting
68   final AddressTrackerMap trackerMap;
69 
70   private final SynchronizationContext syncContext;
71   private final Helper childHelper;
72   private final GracefulSwitchLoadBalancer switchLb;
73   private TimeProvider timeProvider;
74   private final ScheduledExecutorService timeService;
75   private ScheduledHandle detectionTimerHandle;
76   private Long detectionTimerStartNanos;
77 
78   private final ChannelLogger logger;
79 
80   private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
81       = Attributes.Key.create("addressTrackerKey");
82 
/**
 * Creates a new instance of {@link OutlierDetectionLoadBalancer}.
 *
 * @param helper the parent's helper; it supplies the channel logger, synchronization context
 *     and scheduled executor, and is wrapped in a {@link ChildHelper} that is handed to the
 *     child policy
 * @param timeProvider clock used to timestamp detection runs
 * @throws NullPointerException if {@code helper} or {@code timeProvider} is null, or if the
 *     helper returns a null synchronization context or scheduled executor
 */
public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) {
  // Validate the helper before calling into it so a null helper fails with a clear message
  // instead of an anonymous NullPointerException from getChannelLogger().
  checkNotNull(helper, "helper");
  logger = helper.getChannelLogger();
  childHelper = new ChildHelper(helper);
  switchLb = new GracefulSwitchLoadBalancer(childHelper);
  trackerMap = new AddressTrackerMap();
  this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
  this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
  this.timeProvider = checkNotNull(timeProvider, "timeProvider");
  logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created.");
}
96 
97   @Override
acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)98   public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
99     logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
100     OutlierDetectionLoadBalancerConfig config
101         = (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
102 
103     // The map should only retain entries for addresses in this latest update.
104     ArrayList<SocketAddress> addresses = new ArrayList<>();
105     for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) {
106       addresses.addAll(addressGroup.getAddresses());
107     }
108     trackerMap.keySet().retainAll(addresses);
109 
110     trackerMap.updateTrackerConfigs(config);
111 
112     // Add any new ones.
113     trackerMap.putNewTrackers(config, addresses);
114 
115     switchLb.switchTo(config.childPolicy.getProvider());
116 
117     // If outlier detection is actually configured, start a timer that will periodically try to
118     // detect outliers.
119     if (config.outlierDetectionEnabled()) {
120       Long initialDelayNanos;
121 
122       if (detectionTimerStartNanos == null) {
123         // On the first go we use the configured interval.
124         initialDelayNanos = config.intervalNanos;
125       } else {
126         // If a timer has started earlier we cancel it and use the difference between the start
127         // time and now as the interval.
128         initialDelayNanos = Math.max(0L,
129             config.intervalNanos - (timeProvider.currentTimeNanos() - detectionTimerStartNanos));
130       }
131 
132       // If a timer has been previously created we need to cancel it and reset all the call counters
133       // for a fresh start.
134       if (detectionTimerHandle != null) {
135         detectionTimerHandle.cancel();
136         trackerMap.resetCallCounters();
137       }
138 
139       detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
140           initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService);
141     } else if (detectionTimerHandle != null) {
142       // Outlier detection is not configured, but we have a lingering timer. Let's cancel it and
143       // uneject any addresses we may have ejected.
144       detectionTimerHandle.cancel();
145       detectionTimerStartNanos = null;
146       trackerMap.cancelTracking();
147     }
148 
149     switchLb.handleResolvedAddresses(
150         resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
151             .build());
152     return true;
153   }
154 
155   @Override
handleNameResolutionError(Status error)156   public void handleNameResolutionError(Status error) {
157     switchLb.handleNameResolutionError(error);
158   }
159 
160   @Override
shutdown()161   public void shutdown() {
162     switchLb.shutdown();
163   }
164 
165   /**
166    * This timer will be invoked periodically, according to configuration, and it will look for any
167    * outlier subchannels.
168    */
169   class DetectionTimer implements Runnable {
170 
171     OutlierDetectionLoadBalancerConfig config;
172     ChannelLogger logger;
173 
DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)174     DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) {
175       this.config = config;
176       this.logger = logger;
177     }
178 
179     @Override
run()180     public void run() {
181       detectionTimerStartNanos = timeProvider.currentTimeNanos();
182 
183       trackerMap.swapCounters();
184 
185       for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
186         algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
187       }
188 
189       trackerMap.maybeUnejectOutliers(detectionTimerStartNanos);
190     }
191   }
192 
193   /**
194    * This child helper wraps the provided helper so that it can hand out wrapped {@link
195    * OutlierDetectionSubchannel}s and manage the address info map.
196    */
197   class ChildHelper extends ForwardingLoadBalancerHelper {
198 
199     private Helper delegate;
200 
ChildHelper(Helper delegate)201     ChildHelper(Helper delegate) {
202       this.delegate = delegate;
203     }
204 
205     @Override
delegate()206     protected Helper delegate() {
207       return delegate;
208     }
209 
210     @Override
createSubchannel(CreateSubchannelArgs args)211     public Subchannel createSubchannel(CreateSubchannelArgs args) {
212       // Subchannels are wrapped so that we can monitor call results and to trigger failures when
213       // we decide to eject the subchannel.
214       OutlierDetectionSubchannel subchannel = new OutlierDetectionSubchannel(
215           delegate.createSubchannel(args));
216 
217       // If the subchannel is associated with a single address that is also already in the map
218       // the subchannel will be added to the map and be included in outlier detection.
219       List<EquivalentAddressGroup> addressGroups = args.getAddresses();
220       if (hasSingleAddress(addressGroups)
221           && trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) {
222         AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0));
223         tracker.addSubchannel(subchannel);
224 
225         // If this address has already been ejected, we need to immediately eject this Subchannel.
226         if (tracker.ejectionTimeNanos != null) {
227           subchannel.eject();
228         }
229       }
230 
231       return subchannel;
232     }
233 
234     @Override
updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker)235     public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
236       delegate.updateBalancingState(newState, new OutlierDetectionPicker(newPicker));
237     }
238   }
239 
240   class OutlierDetectionSubchannel extends ForwardingSubchannel {
241 
242     private final Subchannel delegate;
243     private AddressTracker addressTracker;
244     private boolean ejected;
245     private ConnectivityStateInfo lastSubchannelState;
246     private SubchannelStateListener subchannelStateListener;
247     private final ChannelLogger logger;
248 
OutlierDetectionSubchannel(Subchannel delegate)249     OutlierDetectionSubchannel(Subchannel delegate) {
250       this.delegate = delegate;
251       this.logger = delegate.getChannelLogger();
252     }
253 
254     @Override
start(SubchannelStateListener listener)255     public void start(SubchannelStateListener listener) {
256       subchannelStateListener = listener;
257       super.start(new OutlierDetectionSubchannelStateListener(listener));
258     }
259 
260     @Override
getAttributes()261     public Attributes getAttributes() {
262       if (addressTracker != null) {
263         return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker)
264             .build();
265       } else {
266         return delegate.getAttributes();
267       }
268     }
269 
270     @Override
updateAddresses(List<EquivalentAddressGroup> addressGroups)271     public void updateAddresses(List<EquivalentAddressGroup> addressGroups) {
272       // Outlier detection only supports subchannels with a single address, but the list of
273       // addressGroups associated with a subchannel can change at any time, so we need to react to
274       // changes in the address list plurality.
275 
276       // No change in address plurality, we replace the single one with a new one.
277       if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
278         // Remove the current subchannel from the old address it is associated with in the map.
279         if (trackerMap.containsValue(addressTracker)) {
280           addressTracker.removeSubchannel(this);
281         }
282 
283         // If the map has an entry for the new address, we associate this subchannel with it.
284         SocketAddress address = addressGroups.get(0).getAddresses().get(0);
285         if (trackerMap.containsKey(address)) {
286           trackerMap.get(address).addSubchannel(this);
287         }
288       } else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) {
289         // We go from a single address to having multiple, making this subchannel uneligible for
290         // outlier detection. Remove it from all trackers and reset the call counters of all the
291         // associated trackers.
292         // Remove the current subchannel from the old address it is associated with in the map.
293         if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) {
294           AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0));
295           tracker.removeSubchannel(this);
296           tracker.resetCallCounters();
297         }
298       } else if (!hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
299         // We go from, previously uneligble, multiple address mode to a single address. If the map
300         // has an entry for the new address, we associate this subchannel with it.
301         SocketAddress address = addressGroups.get(0).getAddresses().get(0);
302         if (trackerMap.containsKey(address)) {
303           AddressTracker tracker = trackerMap.get(address);
304           tracker.addSubchannel(this);
305         }
306       }
307 
308       // We could also have multiple addressGroups and get an update for multiple new ones. This is
309       // a no-op as we will just continue to ignore multiple address subchannels.
310 
311       delegate.updateAddresses(addressGroups);
312     }
313 
314     /**
315      * If the {@link Subchannel} is considered for outlier detection the associated {@link
316      * AddressTracker} should be set.
317      */
setAddressTracker(AddressTracker addressTracker)318     void setAddressTracker(AddressTracker addressTracker) {
319       this.addressTracker = addressTracker;
320     }
321 
clearAddressTracker()322     void clearAddressTracker() {
323       this.addressTracker = null;
324     }
325 
eject()326     void eject() {
327       ejected = true;
328       subchannelStateListener.onSubchannelState(
329           ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
330       logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this);
331     }
332 
uneject()333     void uneject() {
334       ejected = false;
335       if (lastSubchannelState != null) {
336         subchannelStateListener.onSubchannelState(lastSubchannelState);
337         logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this);
338       }
339     }
340 
isEjected()341     boolean isEjected() {
342       return ejected;
343     }
344 
345     @Override
delegate()346     protected Subchannel delegate() {
347       return delegate;
348     }
349 
350     /**
351      * Wraps the actual listener so that state changes from the actual one can be intercepted.
352      */
353     class OutlierDetectionSubchannelStateListener implements SubchannelStateListener {
354 
355       private final SubchannelStateListener delegate;
356 
OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate)357       OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate) {
358         this.delegate = delegate;
359       }
360 
361       @Override
onSubchannelState(ConnectivityStateInfo newState)362       public void onSubchannelState(ConnectivityStateInfo newState) {
363         lastSubchannelState = newState;
364         if (!ejected) {
365           delegate.onSubchannelState(newState);
366         }
367       }
368     }
369 
370     @Override
toString()371     public String toString() {
372       return "OutlierDetectionSubchannel{"
373               + "addresses=" + delegate.getAllAddresses()
374               + '}';
375     }
376   }
377 
378 
379   /**
380    * This picker delegates the actual picking logic to a wrapped delegate, but associates a {@link
381    * ClientStreamTracer} with each pick to track the results of each subchannel stream.
382    */
383   class OutlierDetectionPicker extends SubchannelPicker {
384 
385     private final SubchannelPicker delegate;
386 
OutlierDetectionPicker(SubchannelPicker delegate)387     OutlierDetectionPicker(SubchannelPicker delegate) {
388       this.delegate = delegate;
389     }
390 
391     @Override
pickSubchannel(PickSubchannelArgs args)392     public PickResult pickSubchannel(PickSubchannelArgs args) {
393       PickResult pickResult = delegate.pickSubchannel(args);
394 
395       Subchannel subchannel = pickResult.getSubchannel();
396       if (subchannel != null) {
397         return PickResult.withSubchannel(subchannel,
398             new ResultCountingClientStreamTracerFactory(
399                 subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY)));
400       }
401 
402       return pickResult;
403     }
404 
405     /**
406      * Builds instances of {@link ResultCountingClientStreamTracer}.
407      */
408     class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory {
409 
410       private final AddressTracker tracker;
411 
ResultCountingClientStreamTracerFactory(AddressTracker tracker)412       ResultCountingClientStreamTracerFactory(AddressTracker tracker) {
413         this.tracker = tracker;
414       }
415 
416       @Override
newClientStreamTracer(StreamInfo info, Metadata headers)417       public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
418         return new ResultCountingClientStreamTracer(tracker);
419       }
420     }
421 
422     /**
423      * Counts the results (successful/unsuccessful) of a particular {@link
424      * OutlierDetectionSubchannel}s streams and increments the counter in the associated {@link
425      * AddressTracker}.
426      */
427     class ResultCountingClientStreamTracer extends ClientStreamTracer {
428 
429       AddressTracker tracker;
430 
ResultCountingClientStreamTracer(AddressTracker tracker)431       public ResultCountingClientStreamTracer(AddressTracker tracker) {
432         this.tracker = tracker;
433       }
434 
435       @Override
streamClosed(Status status)436       public void streamClosed(Status status) {
437         tracker.incrementCallCount(status.isOk());
438       }
439     }
440   }
441 
442   /**
443    * Tracks additional information about a set of equivalent addresses needed for outlier
444    * detection.
445    */
446   static class AddressTracker {
447 
448     private OutlierDetectionLoadBalancerConfig config;
449     // Marked as volatile to assure that when the inactive counter is swapped in as the new active
450     // one, all threads see the change and don't hold on to a reference to the now inactive counter.
451     private volatile CallCounter activeCallCounter = new CallCounter();
452     private CallCounter inactiveCallCounter = new CallCounter();
453     private Long ejectionTimeNanos;
454     private int ejectionTimeMultiplier;
455     private final Set<OutlierDetectionSubchannel> subchannels = new HashSet<>();
456 
AddressTracker(OutlierDetectionLoadBalancerConfig config)457     AddressTracker(OutlierDetectionLoadBalancerConfig config) {
458       this.config = config;
459     }
460 
setConfig(OutlierDetectionLoadBalancerConfig config)461     void setConfig(OutlierDetectionLoadBalancerConfig config) {
462       this.config = config;
463     }
464 
465     /**
466      * Adds a subchannel to the tracker, while assuring that the subchannel ejection status is
467      * updated to match the tracker's if needed.
468      */
addSubchannel(OutlierDetectionSubchannel subchannel)469     boolean addSubchannel(OutlierDetectionSubchannel subchannel) {
470       // Make sure that the subchannel is in the same ejection state as the new tracker it is
471       // associated with.
472       if (subchannelsEjected() && !subchannel.isEjected()) {
473         subchannel.eject();
474       } else if (!subchannelsEjected() && subchannel.isEjected()) {
475         subchannel.uneject();
476       }
477       subchannel.setAddressTracker(this);
478       return subchannels.add(subchannel);
479     }
480 
removeSubchannel(OutlierDetectionSubchannel subchannel)481     boolean removeSubchannel(OutlierDetectionSubchannel subchannel) {
482       subchannel.clearAddressTracker();
483       return subchannels.remove(subchannel);
484     }
485 
containsSubchannel(OutlierDetectionSubchannel subchannel)486     boolean containsSubchannel(OutlierDetectionSubchannel subchannel) {
487       return subchannels.contains(subchannel);
488     }
489 
490     @VisibleForTesting
getSubchannels()491     Set<OutlierDetectionSubchannel> getSubchannels() {
492       return ImmutableSet.copyOf(subchannels);
493     }
494 
incrementCallCount(boolean success)495     void incrementCallCount(boolean success) {
496       // If neither algorithm is configured, no point in incrementing counters.
497       if (config.successRateEjection == null && config.failurePercentageEjection == null) {
498         return;
499       }
500 
501       if (success) {
502         activeCallCounter.successCount.getAndIncrement();
503       } else {
504         activeCallCounter.failureCount.getAndIncrement();
505       }
506     }
507 
508     @VisibleForTesting
activeVolume()509     long activeVolume() {
510       return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get();
511     }
512 
inactiveVolume()513     long inactiveVolume() {
514       return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get();
515     }
516 
successRate()517     double successRate() {
518       return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume();
519     }
520 
failureRate()521     double failureRate() {
522       return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume();
523     }
524 
resetCallCounters()525     void resetCallCounters() {
526       activeCallCounter.reset();
527       inactiveCallCounter.reset();
528     }
529 
decrementEjectionTimeMultiplier()530     void decrementEjectionTimeMultiplier() {
531       // The multiplier should not go negative.
532       ejectionTimeMultiplier = ejectionTimeMultiplier == 0 ? 0 : ejectionTimeMultiplier - 1;
533     }
534 
resetEjectionTimeMultiplier()535     void resetEjectionTimeMultiplier() {
536       ejectionTimeMultiplier = 0;
537     }
538 
539     /**
540      * Swaps the active and inactive counters.
541      *
542      * <p>Note that this method is not thread safe as the swap is not done atomically. This is
543      * expected to only be called from the timer that is scheduled at a fixed delay, assuring that
544      * only one timer is active at a time.
545      */
swapCounters()546     void swapCounters() {
547       inactiveCallCounter.reset();
548       CallCounter tempCounter = activeCallCounter;
549       activeCallCounter = inactiveCallCounter;
550       inactiveCallCounter = tempCounter;
551     }
552 
ejectSubchannels(long ejectionTimeNanos)553     void ejectSubchannels(long ejectionTimeNanos) {
554       this.ejectionTimeNanos = ejectionTimeNanos;
555       ejectionTimeMultiplier++;
556       for (OutlierDetectionSubchannel subchannel : subchannels) {
557         subchannel.eject();
558       }
559     }
560 
561     /**
562      * Uneject a currently ejected address.
563      */
unejectSubchannels()564     void unejectSubchannels() {
565       checkState(ejectionTimeNanos != null, "not currently ejected");
566       ejectionTimeNanos = null;
567       for (OutlierDetectionSubchannel subchannel : subchannels) {
568         subchannel.uneject();
569       }
570     }
571 
subchannelsEjected()572     boolean subchannelsEjected() {
573       return ejectionTimeNanos != null;
574     }
575 
maxEjectionTimeElapsed(long currentTimeNanos)576     public boolean maxEjectionTimeElapsed(long currentTimeNanos) {
577       // The instant in time beyond which the address should no longer be ejected. Also making sure
578       // we honor any maximum ejection time setting.
579       long maxEjectionDurationSecs
580           = Math.max(config.baseEjectionTimeNanos, config.maxEjectionTimeNanos);
581       long maxEjectionTimeNanos =
582           ejectionTimeNanos + Math.min(
583               config.baseEjectionTimeNanos * ejectionTimeMultiplier,
584               maxEjectionDurationSecs);
585 
586       return currentTimeNanos > maxEjectionTimeNanos;
587     }
588 
589     /** Tracks both successful and failed call counts. */
590     private static class CallCounter {
591       AtomicLong successCount = new AtomicLong();
592       AtomicLong failureCount = new AtomicLong();
593 
reset()594       void reset() {
595         successCount.set(0);
596         failureCount.set(0);
597       }
598     }
599 
600     @Override
toString()601     public String toString() {
602       return "AddressTracker{"
603               + "subchannels=" + subchannels
604               + '}';
605     }
606   }
607 
608   /**
609    * Maintains a mapping from addresses to their trackers.
610    */
611   static class AddressTrackerMap extends ForwardingMap<SocketAddress, AddressTracker> {
612     private final Map<SocketAddress, AddressTracker> trackerMap;
613 
AddressTrackerMap()614     AddressTrackerMap() {
615       trackerMap = new HashMap<>();
616     }
617 
618     @Override
delegate()619     protected Map<SocketAddress, AddressTracker> delegate() {
620       return trackerMap;
621     }
622 
updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config)623     void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) {
624       for (AddressTracker tracker: trackerMap.values()) {
625         tracker.setConfig(config);
626       }
627     }
628 
629     /** Adds a new tracker for every given address. */
putNewTrackers(OutlierDetectionLoadBalancerConfig config, Collection<SocketAddress> addresses)630     void putNewTrackers(OutlierDetectionLoadBalancerConfig config,
631         Collection<SocketAddress> addresses) {
632       for (SocketAddress address : addresses) {
633         if (!trackerMap.containsKey(address)) {
634           trackerMap.put(address, new AddressTracker(config));
635         }
636       }
637     }
638 
639     /** Resets the call counters for all the trackers in the map. */
resetCallCounters()640     void resetCallCounters() {
641       for (AddressTracker tracker : trackerMap.values()) {
642         tracker.resetCallCounters();
643       }
644     }
645 
646     /**
647      * When OD gets disabled we need to uneject any subchannels that may have been ejected and
648      * to reset the ejection time multiplier.
649      */
cancelTracking()650     void cancelTracking() {
651       for (AddressTracker tracker : trackerMap.values()) {
652         if (tracker.subchannelsEjected()) {
653           tracker.unejectSubchannels();
654         }
655         tracker.resetEjectionTimeMultiplier();
656       }
657     }
658 
659     /** Swaps the active and inactive counters for each tracker. */
swapCounters()660     void swapCounters() {
661       for (AddressTracker tracker : trackerMap.values()) {
662         tracker.swapCounters();
663       }
664     }
665 
666     /**
667      * At the end of a timer run we need to decrement the ejection time multiplier for trackers
668      * that don't have ejected subchannels and uneject ones that have spent the maximum ejection
669      * time allowed.
670      */
maybeUnejectOutliers(Long detectionTimerStartNanos)671     void maybeUnejectOutliers(Long detectionTimerStartNanos) {
672       for (AddressTracker tracker : trackerMap.values()) {
673         if (!tracker.subchannelsEjected()) {
674           tracker.decrementEjectionTimeMultiplier();
675         }
676 
677         if (tracker.subchannelsEjected() && tracker.maxEjectionTimeElapsed(
678             detectionTimerStartNanos)) {
679           tracker.unejectSubchannels();
680         }
681       }
682     }
683 
684     /**
685      * How many percent of the addresses have been ejected.
686      */
ejectionPercentage()687     double ejectionPercentage() {
688       if (trackerMap.isEmpty()) {
689         return 0;
690       }
691       int totalAddresses = 0;
692       int ejectedAddresses = 0;
693       for (AddressTracker tracker : trackerMap.values()) {
694         totalAddresses++;
695         if (tracker.subchannelsEjected()) {
696           ejectedAddresses++;
697         }
698       }
699       return ((double)ejectedAddresses / totalAddresses) * 100;
700     }
701   }
702 
703 
704   /**
705    * Implementations provide different ways of ejecting outlier addresses..
706    */
707   interface OutlierEjectionAlgorithm {
708 
709     /** Eject any outlier addresses. */
ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)710     void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos);
711 
712     /** Builds a list of algorithms that are enabled in the given config. */
713     @Nullable
forConfig(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)714     static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config,
715                                                     ChannelLogger logger) {
716       ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder();
717       if (config.successRateEjection != null) {
718         algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger));
719       }
720       if (config.failurePercentageEjection != null) {
721         algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger));
722       }
723       return algoListBuilder.build();
724     }
725   }
726 
727   /**
728    * This algorithm ejects addresses that don't maintain a required rate of successful calls. The
729    * required rate is not fixed, but is based on the mean and standard deviation of the success
730    * rates of all of the addresses.
731    */
732   static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm {
733 
private final ChannelLogger logger;

private final OutlierDetectionLoadBalancerConfig config;

/**
 * Creates the success rate algorithm for {@code config}.
 *
 * @throws IllegalArgumentException if {@code config} has no success rate ejection settings
 */
SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
    ChannelLogger logger) {
  boolean successRateConfigured = config.successRateEjection != null;
  checkArgument(successRateConfigured, "success rate ejection config is null");
  this.logger = logger;
  this.config = config;
}
744 
745     @Override
ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)746     public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
747 
748       // Only consider addresses that have the minimum request volume specified in the config.
749       List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
750           config.successRateEjection.requestVolume);
751       // If we don't have enough addresses with significant volume then there's nothing to do.
752       if (trackersWithVolume.size() < config.successRateEjection.minimumHosts
753           || trackersWithVolume.size() == 0) {
754         return;
755       }
756 
757       // Calculate mean and standard deviation of the fractions of successful calls.
758       List<Double> successRates = new ArrayList<>();
759       for (AddressTracker tracker : trackersWithVolume) {
760         successRates.add(tracker.successRate());
761       }
762       double mean = mean(successRates);
763       double stdev = standardDeviation(successRates, mean);
764 
765       double requiredSuccessRate =
766           mean - stdev * (config.successRateEjection.stdevFactor / 1000f);
767 
768       for (AddressTracker tracker : trackersWithVolume) {
769         // If we are above or equal to the max ejection percentage, don't eject any more. This will
770         // allow the total ejections to go one above the max, but at the same time it assures at
771         // least one ejection, which the spec calls for. This behavior matches what Envoy proxy
772         // does.
773         if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) {
774           return;
775         }
776 
777         // If success rate is below the threshold, eject the address.
778         if (tracker.successRate() < requiredSuccessRate) {
779           logger.log(ChannelLogLevel.DEBUG,
780                   "SuccessRate algorithm detected outlier: {0}. "
781                           + "Parameters: successRate={1}, mean={2}, stdev={3}, "
782                           + "requiredSuccessRate={4}",
783                   tracker, tracker.successRate(),  mean, stdev, requiredSuccessRate);
784           // Only eject some addresses based on the enforcement percentage.
785           if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
786             tracker.ejectSubchannels(ejectionTimeNanos);
787           }
788         }
789       }
790     }
791 
792     /** Calculates the mean of the given values. */
793     @VisibleForTesting
mean(Collection<Double> values)794     static double mean(Collection<Double> values) {
795       double totalValue = 0;
796       for (double value : values) {
797         totalValue += value;
798       }
799 
800       return totalValue / values.size();
801     }
802 
803     /** Calculates the standard deviation for the given values and their mean. */
804     @VisibleForTesting
standardDeviation(Collection<Double> values, double mean)805     static double standardDeviation(Collection<Double> values, double mean) {
806       double squaredDifferenceSum = 0;
807       for (double value : values) {
808         double difference = value - mean;
809         squaredDifferenceSum += difference * difference;
810       }
811       double variance = squaredDifferenceSum / values.size();
812 
813       return Math.sqrt(variance);
814     }
815   }
816 
817   static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm {
818 
819     private final OutlierDetectionLoadBalancerConfig config;
820 
821     private final ChannelLogger logger;
822 
FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)823     FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config,
824                                               ChannelLogger logger) {
825       this.config = config;
826       this.logger = logger;
827     }
828 
829     @Override
ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)830     public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
831 
832       // Only consider addresses that have the minimum request volume specified in the config.
833       List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
834           config.failurePercentageEjection.requestVolume);
835       // If we don't have enough addresses with significant volume then there's nothing to do.
836       if (trackersWithVolume.size() < config.failurePercentageEjection.minimumHosts
837           || trackersWithVolume.size() == 0) {
838         return;
839       }
840 
841       // If this address does not have enough volume to be considered, skip to the next one.
842       for (AddressTracker tracker : trackersWithVolume) {
843         // If we are above or equal to the max ejection percentage, don't eject any more. This will
844         // allow the total ejections to go one above the max, but at the same time it assures at
845         // least one ejection, which the spec calls for. This behavior matches what Envoy proxy
846         // does.
847         if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) {
848           return;
849         }
850 
851         if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) {
852           continue;
853         }
854 
855         // If the failure rate is above the threshold, we should eject...
856         double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100;
857         if (tracker.failureRate() > maxFailureRate) {
858           logger.log(ChannelLogLevel.DEBUG,
859                   "FailurePercentage algorithm detected outlier: {0}, failureRate={1}",
860                   tracker, tracker.failureRate());
861           // ...but only enforce this based on the enforcement percentage.
862           if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) {
863             tracker.ejectSubchannels(ejectionTimeNanos);
864           }
865         }
866       }
867     }
868   }
869 
870   /** Returns only the trackers that have the minimum configured volume to be considered. */
trackersWithVolume(AddressTrackerMap trackerMap, int volume)871   private static List<AddressTracker> trackersWithVolume(AddressTrackerMap trackerMap,
872       int volume) {
873     List<AddressTracker> trackersWithVolume = new ArrayList<>();
874     for (AddressTracker tracker : trackerMap.values()) {
875       if (tracker.inactiveVolume() >= volume) {
876         trackersWithVolume.add(tracker);
877       }
878     }
879     return trackersWithVolume;
880   }
881 
882   /** Counts how many addresses are in a given address group. */
hasSingleAddress(List<EquivalentAddressGroup> addressGroups)883   private static boolean hasSingleAddress(List<EquivalentAddressGroup> addressGroups) {
884     int addressCount = 0;
885     for (EquivalentAddressGroup addressGroup : addressGroups) {
886       addressCount += addressGroup.getAddresses().size();
887       if (addressCount > 1) {
888         return false;
889       }
890     }
891     return true;
892   }
893 
894   /**
895    * The configuration for {@link OutlierDetectionLoadBalancer}.
896    */
897   public static final class OutlierDetectionLoadBalancerConfig {
898 
899     public final Long intervalNanos;
900     public final Long baseEjectionTimeNanos;
901     public final Long maxEjectionTimeNanos;
902     public final Integer maxEjectionPercent;
903     public final SuccessRateEjection successRateEjection;
904     public final FailurePercentageEjection failurePercentageEjection;
905     public final PolicySelection childPolicy;
906 
OutlierDetectionLoadBalancerConfig(Long intervalNanos, Long baseEjectionTimeNanos, Long maxEjectionTimeNanos, Integer maxEjectionPercent, SuccessRateEjection successRateEjection, FailurePercentageEjection failurePercentageEjection, PolicySelection childPolicy)907     private OutlierDetectionLoadBalancerConfig(Long intervalNanos,
908         Long baseEjectionTimeNanos,
909         Long maxEjectionTimeNanos,
910         Integer maxEjectionPercent,
911         SuccessRateEjection successRateEjection,
912         FailurePercentageEjection failurePercentageEjection,
913         PolicySelection childPolicy) {
914       this.intervalNanos = intervalNanos;
915       this.baseEjectionTimeNanos = baseEjectionTimeNanos;
916       this.maxEjectionTimeNanos = maxEjectionTimeNanos;
917       this.maxEjectionPercent = maxEjectionPercent;
918       this.successRateEjection = successRateEjection;
919       this.failurePercentageEjection = failurePercentageEjection;
920       this.childPolicy = childPolicy;
921     }
922 
923     /** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */
924     public static class Builder {
925       Long intervalNanos = 10_000_000_000L; // 10s
926       Long baseEjectionTimeNanos = 30_000_000_000L; // 30s
927       Long maxEjectionTimeNanos = 300_000_000_000L; // 300s
928       Integer maxEjectionPercent = 10;
929       SuccessRateEjection successRateEjection;
930       FailurePercentageEjection failurePercentageEjection;
931       PolicySelection childPolicy;
932 
933       /** The interval between outlier detection sweeps. */
setIntervalNanos(Long intervalNanos)934       public Builder setIntervalNanos(Long intervalNanos) {
935         checkArgument(intervalNanos != null);
936         this.intervalNanos = intervalNanos;
937         return this;
938       }
939 
940       /** The base time an address is ejected for. */
setBaseEjectionTimeNanos(Long baseEjectionTimeNanos)941       public Builder setBaseEjectionTimeNanos(Long baseEjectionTimeNanos) {
942         checkArgument(baseEjectionTimeNanos != null);
943         this.baseEjectionTimeNanos = baseEjectionTimeNanos;
944         return this;
945       }
946 
947       /** The longest time an address can be ejected. */
setMaxEjectionTimeNanos(Long maxEjectionTimeNanos)948       public Builder setMaxEjectionTimeNanos(Long maxEjectionTimeNanos) {
949         checkArgument(maxEjectionTimeNanos != null);
950         this.maxEjectionTimeNanos = maxEjectionTimeNanos;
951         return this;
952       }
953 
954       /** The algorithm agnostic maximum percentage of addresses that can be ejected. */
setMaxEjectionPercent(Integer maxEjectionPercent)955       public Builder setMaxEjectionPercent(Integer maxEjectionPercent) {
956         checkArgument(maxEjectionPercent != null);
957         this.maxEjectionPercent = maxEjectionPercent;
958         return this;
959       }
960 
961       /** Set to enable success rate ejection. */
setSuccessRateEjection( SuccessRateEjection successRateEjection)962       public Builder setSuccessRateEjection(
963           SuccessRateEjection successRateEjection) {
964         this.successRateEjection = successRateEjection;
965         return this;
966       }
967 
968       /** Set to enable failure percentage ejection. */
setFailurePercentageEjection( FailurePercentageEjection failurePercentageEjection)969       public Builder setFailurePercentageEjection(
970           FailurePercentageEjection failurePercentageEjection) {
971         this.failurePercentageEjection = failurePercentageEjection;
972         return this;
973       }
974 
975       /** Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to. */
setChildPolicy(PolicySelection childPolicy)976       public Builder setChildPolicy(PolicySelection childPolicy) {
977         checkState(childPolicy != null);
978         this.childPolicy = childPolicy;
979         return this;
980       }
981 
982       /** Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}. */
build()983       public OutlierDetectionLoadBalancerConfig build() {
984         checkState(childPolicy != null);
985         return new OutlierDetectionLoadBalancerConfig(intervalNanos, baseEjectionTimeNanos,
986             maxEjectionTimeNanos, maxEjectionPercent, successRateEjection,
987             failurePercentageEjection, childPolicy);
988       }
989     }
990 
991     /** The configuration for success rate ejection. */
992     public static class SuccessRateEjection {
993 
994       public final Integer stdevFactor;
995       public final Integer enforcementPercentage;
996       public final Integer minimumHosts;
997       public final Integer requestVolume;
998 
SuccessRateEjection(Integer stdevFactor, Integer enforcementPercentage, Integer minimumHosts, Integer requestVolume)999       SuccessRateEjection(Integer stdevFactor, Integer enforcementPercentage, Integer minimumHosts,
1000           Integer requestVolume) {
1001         this.stdevFactor = stdevFactor;
1002         this.enforcementPercentage = enforcementPercentage;
1003         this.minimumHosts = minimumHosts;
1004         this.requestVolume = requestVolume;
1005       }
1006 
1007       /** Builds new instances of {@link SuccessRateEjection}. */
1008       public static final class Builder {
1009 
1010         Integer stdevFactor = 1900;
1011         Integer enforcementPercentage = 100;
1012         Integer minimumHosts = 5;
1013         Integer requestVolume = 100;
1014 
1015         /** The product of this and the standard deviation of success rates determine the ejection
1016          * threshold.
1017          */
setStdevFactor(Integer stdevFactor)1018         public Builder setStdevFactor(Integer stdevFactor) {
1019           checkArgument(stdevFactor != null);
1020           this.stdevFactor = stdevFactor;
1021           return this;
1022         }
1023 
1024         /** Only eject this percentage of outliers. */
setEnforcementPercentage(Integer enforcementPercentage)1025         public Builder setEnforcementPercentage(Integer enforcementPercentage) {
1026           checkArgument(enforcementPercentage != null);
1027           checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100);
1028           this.enforcementPercentage = enforcementPercentage;
1029           return this;
1030         }
1031 
1032         /** The minimum amount of hosts needed for success rate ejection. */
setMinimumHosts(Integer minimumHosts)1033         public Builder setMinimumHosts(Integer minimumHosts) {
1034           checkArgument(minimumHosts != null);
1035           checkArgument(minimumHosts >= 0);
1036           this.minimumHosts = minimumHosts;
1037           return this;
1038         }
1039 
1040         /** The minimum address request volume to be considered for success rate ejection. */
setRequestVolume(Integer requestVolume)1041         public Builder setRequestVolume(Integer requestVolume) {
1042           checkArgument(requestVolume != null);
1043           checkArgument(requestVolume >= 0);
1044           this.requestVolume = requestVolume;
1045           return this;
1046         }
1047 
1048         /** Builds a new instance of {@link SuccessRateEjection}. */
build()1049         public SuccessRateEjection build() {
1050           return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts,
1051               requestVolume);
1052         }
1053       }
1054     }
1055 
1056     /** The configuration for failure percentage ejection. */
1057     public static class FailurePercentageEjection {
1058       public final Integer threshold;
1059       public final Integer enforcementPercentage;
1060       public final Integer minimumHosts;
1061       public final Integer requestVolume;
1062 
FailurePercentageEjection(Integer threshold, Integer enforcementPercentage, Integer minimumHosts, Integer requestVolume)1063       FailurePercentageEjection(Integer threshold, Integer enforcementPercentage,
1064           Integer minimumHosts, Integer requestVolume) {
1065         this.threshold = threshold;
1066         this.enforcementPercentage = enforcementPercentage;
1067         this.minimumHosts = minimumHosts;
1068         this.requestVolume = requestVolume;
1069       }
1070 
1071       /** For building new {@link FailurePercentageEjection} instances. */
1072       public static class Builder {
1073         Integer threshold = 85;
1074         Integer enforcementPercentage = 100;
1075         Integer minimumHosts = 5;
1076         Integer requestVolume = 50;
1077 
1078         /** The failure percentage that will result in an address being considered an outlier. */
setThreshold(Integer threshold)1079         public Builder setThreshold(Integer threshold) {
1080           checkArgument(threshold != null);
1081           checkArgument(threshold >= 0 && threshold <= 100);
1082           this.threshold = threshold;
1083           return this;
1084         }
1085 
1086         /** Only eject this percentage of outliers. */
setEnforcementPercentage(Integer enforcementPercentage)1087         public Builder setEnforcementPercentage(Integer enforcementPercentage) {
1088           checkArgument(enforcementPercentage != null);
1089           checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100);
1090           this.enforcementPercentage = enforcementPercentage;
1091           return this;
1092         }
1093 
1094         /** The minimum amount of host for failure percentage ejection to be enabled. */
setMinimumHosts(Integer minimumHosts)1095         public Builder setMinimumHosts(Integer minimumHosts) {
1096           checkArgument(minimumHosts != null);
1097           checkArgument(minimumHosts >= 0);
1098           this.minimumHosts = minimumHosts;
1099           return this;
1100         }
1101 
1102         /**
1103          * The request volume required for an address to be considered for failure percentage
1104          * ejection.
1105          */
setRequestVolume(Integer requestVolume)1106         public Builder setRequestVolume(Integer requestVolume) {
1107           checkArgument(requestVolume != null);
1108           checkArgument(requestVolume >= 0);
1109           this.requestVolume = requestVolume;
1110           return this;
1111         }
1112 
1113         /** Builds a new instance of {@link FailurePercentageEjection}. */
build()1114         public FailurePercentageEjection build() {
1115           return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts,
1116               requestVolume);
1117         }
1118       }
1119     }
1120 
1121     /** Determine if any outlier detection algorithms are enabled in the config. */
outlierDetectionEnabled()1122     boolean outlierDetectionEnabled() {
1123       return successRateEjection != null || failurePercentageEjection != null;
1124     }
1125   }
1126 }
1127