1 /* 2 * Copyright 2022 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.util; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static java.util.concurrent.TimeUnit.NANOSECONDS; 23 24 import com.google.common.annotations.VisibleForTesting; 25 import com.google.common.collect.ForwardingMap; 26 import com.google.common.collect.ImmutableList; 27 import com.google.common.collect.ImmutableSet; 28 import io.grpc.Attributes; 29 import io.grpc.ChannelLogger; 30 import io.grpc.ChannelLogger.ChannelLogLevel; 31 import io.grpc.ClientStreamTracer; 32 import io.grpc.ClientStreamTracer.StreamInfo; 33 import io.grpc.ConnectivityState; 34 import io.grpc.ConnectivityStateInfo; 35 import io.grpc.EquivalentAddressGroup; 36 import io.grpc.Internal; 37 import io.grpc.LoadBalancer; 38 import io.grpc.Metadata; 39 import io.grpc.Status; 40 import io.grpc.SynchronizationContext; 41 import io.grpc.SynchronizationContext.ScheduledHandle; 42 import io.grpc.internal.ServiceConfigUtil.PolicySelection; 43 import io.grpc.internal.TimeProvider; 44 import java.net.SocketAddress; 45 import java.util.ArrayList; 46 import java.util.Collection; 47 import java.util.HashMap; 48 import java.util.HashSet; 49 import java.util.List; 50 import java.util.Map; 51 import java.util.Random; 52 
import java.util.Set; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.atomic.AtomicLong; 55 import javax.annotation.Nullable; 56 57 /** 58 * Wraps a child {@code LoadBalancer} while monitoring for outlier backends and removing them from 59 * the use of the child LB. 60 * 61 * <p>This implements the outlier detection gRFC: 62 * https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md 63 */ 64 @Internal 65 public final class OutlierDetectionLoadBalancer extends LoadBalancer { 66 67 @VisibleForTesting 68 final AddressTrackerMap trackerMap; 69 70 private final SynchronizationContext syncContext; 71 private final Helper childHelper; 72 private final GracefulSwitchLoadBalancer switchLb; 73 private TimeProvider timeProvider; 74 private final ScheduledExecutorService timeService; 75 private ScheduledHandle detectionTimerHandle; 76 private Long detectionTimerStartNanos; 77 78 private final ChannelLogger logger; 79 80 private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY 81 = Attributes.Key.create("addressTrackerKey"); 82 83 /** 84 * Creates a new instance of {@link OutlierDetectionLoadBalancer}. 
85 */ OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider)86 public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) { 87 logger = helper.getChannelLogger(); 88 childHelper = new ChildHelper(checkNotNull(helper, "helper")); 89 switchLb = new GracefulSwitchLoadBalancer(childHelper); 90 trackerMap = new AddressTrackerMap(); 91 this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); 92 this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); 93 this.timeProvider = timeProvider; 94 logger.log(ChannelLogLevel.DEBUG, "OutlierDetection lb created."); 95 } 96 97 @Override acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)98 public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { 99 logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); 100 OutlierDetectionLoadBalancerConfig config 101 = (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); 102 103 // The map should only retain entries for addresses in this latest update. 104 ArrayList<SocketAddress> addresses = new ArrayList<>(); 105 for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) { 106 addresses.addAll(addressGroup.getAddresses()); 107 } 108 trackerMap.keySet().retainAll(addresses); 109 110 trackerMap.updateTrackerConfigs(config); 111 112 // Add any new ones. 113 trackerMap.putNewTrackers(config, addresses); 114 115 switchLb.switchTo(config.childPolicy.getProvider()); 116 117 // If outlier detection is actually configured, start a timer that will periodically try to 118 // detect outliers. 119 if (config.outlierDetectionEnabled()) { 120 Long initialDelayNanos; 121 122 if (detectionTimerStartNanos == null) { 123 // On the first go we use the configured interval. 
124 initialDelayNanos = config.intervalNanos; 125 } else { 126 // If a timer has started earlier we cancel it and use the difference between the start 127 // time and now as the interval. 128 initialDelayNanos = Math.max(0L, 129 config.intervalNanos - (timeProvider.currentTimeNanos() - detectionTimerStartNanos)); 130 } 131 132 // If a timer has been previously created we need to cancel it and reset all the call counters 133 // for a fresh start. 134 if (detectionTimerHandle != null) { 135 detectionTimerHandle.cancel(); 136 trackerMap.resetCallCounters(); 137 } 138 139 detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger), 140 initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService); 141 } else if (detectionTimerHandle != null) { 142 // Outlier detection is not configured, but we have a lingering timer. Let's cancel it and 143 // uneject any addresses we may have ejected. 144 detectionTimerHandle.cancel(); 145 detectionTimerStartNanos = null; 146 trackerMap.cancelTracking(); 147 } 148 149 switchLb.handleResolvedAddresses( 150 resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config.childPolicy.getConfig()) 151 .build()); 152 return true; 153 } 154 155 @Override handleNameResolutionError(Status error)156 public void handleNameResolutionError(Status error) { 157 switchLb.handleNameResolutionError(error); 158 } 159 160 @Override shutdown()161 public void shutdown() { 162 switchLb.shutdown(); 163 } 164 165 /** 166 * This timer will be invoked periodically, according to configuration, and it will look for any 167 * outlier subchannels. 
168 */ 169 class DetectionTimer implements Runnable { 170 171 OutlierDetectionLoadBalancerConfig config; 172 ChannelLogger logger; 173 DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)174 DetectionTimer(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger) { 175 this.config = config; 176 this.logger = logger; 177 } 178 179 @Override run()180 public void run() { 181 detectionTimerStartNanos = timeProvider.currentTimeNanos(); 182 183 trackerMap.swapCounters(); 184 185 for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) { 186 algo.ejectOutliers(trackerMap, detectionTimerStartNanos); 187 } 188 189 trackerMap.maybeUnejectOutliers(detectionTimerStartNanos); 190 } 191 } 192 193 /** 194 * This child helper wraps the provided helper so that it can hand out wrapped {@link 195 * OutlierDetectionSubchannel}s and manage the address info map. 196 */ 197 class ChildHelper extends ForwardingLoadBalancerHelper { 198 199 private Helper delegate; 200 ChildHelper(Helper delegate)201 ChildHelper(Helper delegate) { 202 this.delegate = delegate; 203 } 204 205 @Override delegate()206 protected Helper delegate() { 207 return delegate; 208 } 209 210 @Override createSubchannel(CreateSubchannelArgs args)211 public Subchannel createSubchannel(CreateSubchannelArgs args) { 212 // Subchannels are wrapped so that we can monitor call results and to trigger failures when 213 // we decide to eject the subchannel. 214 OutlierDetectionSubchannel subchannel = new OutlierDetectionSubchannel( 215 delegate.createSubchannel(args)); 216 217 // If the subchannel is associated with a single address that is also already in the map 218 // the subchannel will be added to the map and be included in outlier detection. 
219 List<EquivalentAddressGroup> addressGroups = args.getAddresses(); 220 if (hasSingleAddress(addressGroups) 221 && trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) { 222 AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0)); 223 tracker.addSubchannel(subchannel); 224 225 // If this address has already been ejected, we need to immediately eject this Subchannel. 226 if (tracker.ejectionTimeNanos != null) { 227 subchannel.eject(); 228 } 229 } 230 231 return subchannel; 232 } 233 234 @Override updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker)235 public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { 236 delegate.updateBalancingState(newState, new OutlierDetectionPicker(newPicker)); 237 } 238 } 239 240 class OutlierDetectionSubchannel extends ForwardingSubchannel { 241 242 private final Subchannel delegate; 243 private AddressTracker addressTracker; 244 private boolean ejected; 245 private ConnectivityStateInfo lastSubchannelState; 246 private SubchannelStateListener subchannelStateListener; 247 private final ChannelLogger logger; 248 OutlierDetectionSubchannel(Subchannel delegate)249 OutlierDetectionSubchannel(Subchannel delegate) { 250 this.delegate = delegate; 251 this.logger = delegate.getChannelLogger(); 252 } 253 254 @Override start(SubchannelStateListener listener)255 public void start(SubchannelStateListener listener) { 256 subchannelStateListener = listener; 257 super.start(new OutlierDetectionSubchannelStateListener(listener)); 258 } 259 260 @Override getAttributes()261 public Attributes getAttributes() { 262 if (addressTracker != null) { 263 return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker) 264 .build(); 265 } else { 266 return delegate.getAttributes(); 267 } 268 } 269 270 @Override updateAddresses(List<EquivalentAddressGroup> addressGroups)271 public void updateAddresses(List<EquivalentAddressGroup> 
addressGroups) { 272 // Outlier detection only supports subchannels with a single address, but the list of 273 // addressGroups associated with a subchannel can change at any time, so we need to react to 274 // changes in the address list plurality. 275 276 // No change in address plurality, we replace the single one with a new one. 277 if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) { 278 // Remove the current subchannel from the old address it is associated with in the map. 279 if (trackerMap.containsValue(addressTracker)) { 280 addressTracker.removeSubchannel(this); 281 } 282 283 // If the map has an entry for the new address, we associate this subchannel with it. 284 SocketAddress address = addressGroups.get(0).getAddresses().get(0); 285 if (trackerMap.containsKey(address)) { 286 trackerMap.get(address).addSubchannel(this); 287 } 288 } else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) { 289 // We go from a single address to having multiple, making this subchannel uneligible for 290 // outlier detection. Remove it from all trackers and reset the call counters of all the 291 // associated trackers. 292 // Remove the current subchannel from the old address it is associated with in the map. 293 if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) { 294 AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0)); 295 tracker.removeSubchannel(this); 296 tracker.resetCallCounters(); 297 } 298 } else if (!hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) { 299 // We go from, previously uneligble, multiple address mode to a single address. If the map 300 // has an entry for the new address, we associate this subchannel with it. 
301 SocketAddress address = addressGroups.get(0).getAddresses().get(0); 302 if (trackerMap.containsKey(address)) { 303 AddressTracker tracker = trackerMap.get(address); 304 tracker.addSubchannel(this); 305 } 306 } 307 308 // We could also have multiple addressGroups and get an update for multiple new ones. This is 309 // a no-op as we will just continue to ignore multiple address subchannels. 310 311 delegate.updateAddresses(addressGroups); 312 } 313 314 /** 315 * If the {@link Subchannel} is considered for outlier detection the associated {@link 316 * AddressTracker} should be set. 317 */ setAddressTracker(AddressTracker addressTracker)318 void setAddressTracker(AddressTracker addressTracker) { 319 this.addressTracker = addressTracker; 320 } 321 clearAddressTracker()322 void clearAddressTracker() { 323 this.addressTracker = null; 324 } 325 eject()326 void eject() { 327 ejected = true; 328 subchannelStateListener.onSubchannelState( 329 ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); 330 logger.log(ChannelLogLevel.INFO, "Subchannel ejected: {0}", this); 331 } 332 uneject()333 void uneject() { 334 ejected = false; 335 if (lastSubchannelState != null) { 336 subchannelStateListener.onSubchannelState(lastSubchannelState); 337 logger.log(ChannelLogLevel.INFO, "Subchannel unejected: {0}", this); 338 } 339 } 340 isEjected()341 boolean isEjected() { 342 return ejected; 343 } 344 345 @Override delegate()346 protected Subchannel delegate() { 347 return delegate; 348 } 349 350 /** 351 * Wraps the actual listener so that state changes from the actual one can be intercepted. 
352 */ 353 class OutlierDetectionSubchannelStateListener implements SubchannelStateListener { 354 355 private final SubchannelStateListener delegate; 356 OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate)357 OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate) { 358 this.delegate = delegate; 359 } 360 361 @Override onSubchannelState(ConnectivityStateInfo newState)362 public void onSubchannelState(ConnectivityStateInfo newState) { 363 lastSubchannelState = newState; 364 if (!ejected) { 365 delegate.onSubchannelState(newState); 366 } 367 } 368 } 369 370 @Override toString()371 public String toString() { 372 return "OutlierDetectionSubchannel{" 373 + "addresses=" + delegate.getAllAddresses() 374 + '}'; 375 } 376 } 377 378 379 /** 380 * This picker delegates the actual picking logic to a wrapped delegate, but associates a {@link 381 * ClientStreamTracer} with each pick to track the results of each subchannel stream. 382 */ 383 class OutlierDetectionPicker extends SubchannelPicker { 384 385 private final SubchannelPicker delegate; 386 OutlierDetectionPicker(SubchannelPicker delegate)387 OutlierDetectionPicker(SubchannelPicker delegate) { 388 this.delegate = delegate; 389 } 390 391 @Override pickSubchannel(PickSubchannelArgs args)392 public PickResult pickSubchannel(PickSubchannelArgs args) { 393 PickResult pickResult = delegate.pickSubchannel(args); 394 395 Subchannel subchannel = pickResult.getSubchannel(); 396 if (subchannel != null) { 397 return PickResult.withSubchannel(subchannel, 398 new ResultCountingClientStreamTracerFactory( 399 subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY))); 400 } 401 402 return pickResult; 403 } 404 405 /** 406 * Builds instances of {@link ResultCountingClientStreamTracer}. 
407 */ 408 class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory { 409 410 private final AddressTracker tracker; 411 ResultCountingClientStreamTracerFactory(AddressTracker tracker)412 ResultCountingClientStreamTracerFactory(AddressTracker tracker) { 413 this.tracker = tracker; 414 } 415 416 @Override newClientStreamTracer(StreamInfo info, Metadata headers)417 public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { 418 return new ResultCountingClientStreamTracer(tracker); 419 } 420 } 421 422 /** 423 * Counts the results (successful/unsuccessful) of a particular {@link 424 * OutlierDetectionSubchannel}s streams and increments the counter in the associated {@link 425 * AddressTracker}. 426 */ 427 class ResultCountingClientStreamTracer extends ClientStreamTracer { 428 429 AddressTracker tracker; 430 ResultCountingClientStreamTracer(AddressTracker tracker)431 public ResultCountingClientStreamTracer(AddressTracker tracker) { 432 this.tracker = tracker; 433 } 434 435 @Override streamClosed(Status status)436 public void streamClosed(Status status) { 437 tracker.incrementCallCount(status.isOk()); 438 } 439 } 440 } 441 442 /** 443 * Tracks additional information about a set of equivalent addresses needed for outlier 444 * detection. 445 */ 446 static class AddressTracker { 447 448 private OutlierDetectionLoadBalancerConfig config; 449 // Marked as volatile to assure that when the inactive counter is swapped in as the new active 450 // one, all threads see the change and don't hold on to a reference to the now inactive counter. 
451 private volatile CallCounter activeCallCounter = new CallCounter(); 452 private CallCounter inactiveCallCounter = new CallCounter(); 453 private Long ejectionTimeNanos; 454 private int ejectionTimeMultiplier; 455 private final Set<OutlierDetectionSubchannel> subchannels = new HashSet<>(); 456 AddressTracker(OutlierDetectionLoadBalancerConfig config)457 AddressTracker(OutlierDetectionLoadBalancerConfig config) { 458 this.config = config; 459 } 460 setConfig(OutlierDetectionLoadBalancerConfig config)461 void setConfig(OutlierDetectionLoadBalancerConfig config) { 462 this.config = config; 463 } 464 465 /** 466 * Adds a subchannel to the tracker, while assuring that the subchannel ejection status is 467 * updated to match the tracker's if needed. 468 */ addSubchannel(OutlierDetectionSubchannel subchannel)469 boolean addSubchannel(OutlierDetectionSubchannel subchannel) { 470 // Make sure that the subchannel is in the same ejection state as the new tracker it is 471 // associated with. 472 if (subchannelsEjected() && !subchannel.isEjected()) { 473 subchannel.eject(); 474 } else if (!subchannelsEjected() && subchannel.isEjected()) { 475 subchannel.uneject(); 476 } 477 subchannel.setAddressTracker(this); 478 return subchannels.add(subchannel); 479 } 480 removeSubchannel(OutlierDetectionSubchannel subchannel)481 boolean removeSubchannel(OutlierDetectionSubchannel subchannel) { 482 subchannel.clearAddressTracker(); 483 return subchannels.remove(subchannel); 484 } 485 containsSubchannel(OutlierDetectionSubchannel subchannel)486 boolean containsSubchannel(OutlierDetectionSubchannel subchannel) { 487 return subchannels.contains(subchannel); 488 } 489 490 @VisibleForTesting getSubchannels()491 Set<OutlierDetectionSubchannel> getSubchannels() { 492 return ImmutableSet.copyOf(subchannels); 493 } 494 incrementCallCount(boolean success)495 void incrementCallCount(boolean success) { 496 // If neither algorithm is configured, no point in incrementing counters. 
497 if (config.successRateEjection == null && config.failurePercentageEjection == null) { 498 return; 499 } 500 501 if (success) { 502 activeCallCounter.successCount.getAndIncrement(); 503 } else { 504 activeCallCounter.failureCount.getAndIncrement(); 505 } 506 } 507 508 @VisibleForTesting activeVolume()509 long activeVolume() { 510 return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get(); 511 } 512 inactiveVolume()513 long inactiveVolume() { 514 return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get(); 515 } 516 successRate()517 double successRate() { 518 return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume(); 519 } 520 failureRate()521 double failureRate() { 522 return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume(); 523 } 524 resetCallCounters()525 void resetCallCounters() { 526 activeCallCounter.reset(); 527 inactiveCallCounter.reset(); 528 } 529 decrementEjectionTimeMultiplier()530 void decrementEjectionTimeMultiplier() { 531 // The multiplier should not go negative. 532 ejectionTimeMultiplier = ejectionTimeMultiplier == 0 ? 0 : ejectionTimeMultiplier - 1; 533 } 534 resetEjectionTimeMultiplier()535 void resetEjectionTimeMultiplier() { 536 ejectionTimeMultiplier = 0; 537 } 538 539 /** 540 * Swaps the active and inactive counters. 541 * 542 * <p>Note that this method is not thread safe as the swap is not done atomically. This is 543 * expected to only be called from the timer that is scheduled at a fixed delay, assuring that 544 * only one timer is active at a time. 
545 */ swapCounters()546 void swapCounters() { 547 inactiveCallCounter.reset(); 548 CallCounter tempCounter = activeCallCounter; 549 activeCallCounter = inactiveCallCounter; 550 inactiveCallCounter = tempCounter; 551 } 552 ejectSubchannels(long ejectionTimeNanos)553 void ejectSubchannels(long ejectionTimeNanos) { 554 this.ejectionTimeNanos = ejectionTimeNanos; 555 ejectionTimeMultiplier++; 556 for (OutlierDetectionSubchannel subchannel : subchannels) { 557 subchannel.eject(); 558 } 559 } 560 561 /** 562 * Uneject a currently ejected address. 563 */ unejectSubchannels()564 void unejectSubchannels() { 565 checkState(ejectionTimeNanos != null, "not currently ejected"); 566 ejectionTimeNanos = null; 567 for (OutlierDetectionSubchannel subchannel : subchannels) { 568 subchannel.uneject(); 569 } 570 } 571 subchannelsEjected()572 boolean subchannelsEjected() { 573 return ejectionTimeNanos != null; 574 } 575 maxEjectionTimeElapsed(long currentTimeNanos)576 public boolean maxEjectionTimeElapsed(long currentTimeNanos) { 577 // The instant in time beyond which the address should no longer be ejected. Also making sure 578 // we honor any maximum ejection time setting. 579 long maxEjectionDurationSecs 580 = Math.max(config.baseEjectionTimeNanos, config.maxEjectionTimeNanos); 581 long maxEjectionTimeNanos = 582 ejectionTimeNanos + Math.min( 583 config.baseEjectionTimeNanos * ejectionTimeMultiplier, 584 maxEjectionDurationSecs); 585 586 return currentTimeNanos > maxEjectionTimeNanos; 587 } 588 589 /** Tracks both successful and failed call counts. 
*/ 590 private static class CallCounter { 591 AtomicLong successCount = new AtomicLong(); 592 AtomicLong failureCount = new AtomicLong(); 593 reset()594 void reset() { 595 successCount.set(0); 596 failureCount.set(0); 597 } 598 } 599 600 @Override toString()601 public String toString() { 602 return "AddressTracker{" 603 + "subchannels=" + subchannels 604 + '}'; 605 } 606 } 607 608 /** 609 * Maintains a mapping from addresses to their trackers. 610 */ 611 static class AddressTrackerMap extends ForwardingMap<SocketAddress, AddressTracker> { 612 private final Map<SocketAddress, AddressTracker> trackerMap; 613 AddressTrackerMap()614 AddressTrackerMap() { 615 trackerMap = new HashMap<>(); 616 } 617 618 @Override delegate()619 protected Map<SocketAddress, AddressTracker> delegate() { 620 return trackerMap; 621 } 622 updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config)623 void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) { 624 for (AddressTracker tracker: trackerMap.values()) { 625 tracker.setConfig(config); 626 } 627 } 628 629 /** Adds a new tracker for every given address. */ putNewTrackers(OutlierDetectionLoadBalancerConfig config, Collection<SocketAddress> addresses)630 void putNewTrackers(OutlierDetectionLoadBalancerConfig config, 631 Collection<SocketAddress> addresses) { 632 for (SocketAddress address : addresses) { 633 if (!trackerMap.containsKey(address)) { 634 trackerMap.put(address, new AddressTracker(config)); 635 } 636 } 637 } 638 639 /** Resets the call counters for all the trackers in the map. */ resetCallCounters()640 void resetCallCounters() { 641 for (AddressTracker tracker : trackerMap.values()) { 642 tracker.resetCallCounters(); 643 } 644 } 645 646 /** 647 * When OD gets disabled we need to uneject any subchannels that may have been ejected and 648 * to reset the ejection time multiplier. 
649 */ cancelTracking()650 void cancelTracking() { 651 for (AddressTracker tracker : trackerMap.values()) { 652 if (tracker.subchannelsEjected()) { 653 tracker.unejectSubchannels(); 654 } 655 tracker.resetEjectionTimeMultiplier(); 656 } 657 } 658 659 /** Swaps the active and inactive counters for each tracker. */ swapCounters()660 void swapCounters() { 661 for (AddressTracker tracker : trackerMap.values()) { 662 tracker.swapCounters(); 663 } 664 } 665 666 /** 667 * At the end of a timer run we need to decrement the ejection time multiplier for trackers 668 * that don't have ejected subchannels and uneject ones that have spent the maximum ejection 669 * time allowed. 670 */ maybeUnejectOutliers(Long detectionTimerStartNanos)671 void maybeUnejectOutliers(Long detectionTimerStartNanos) { 672 for (AddressTracker tracker : trackerMap.values()) { 673 if (!tracker.subchannelsEjected()) { 674 tracker.decrementEjectionTimeMultiplier(); 675 } 676 677 if (tracker.subchannelsEjected() && tracker.maxEjectionTimeElapsed( 678 detectionTimerStartNanos)) { 679 tracker.unejectSubchannels(); 680 } 681 } 682 } 683 684 /** 685 * How many percent of the addresses have been ejected. 686 */ ejectionPercentage()687 double ejectionPercentage() { 688 if (trackerMap.isEmpty()) { 689 return 0; 690 } 691 int totalAddresses = 0; 692 int ejectedAddresses = 0; 693 for (AddressTracker tracker : trackerMap.values()) { 694 totalAddresses++; 695 if (tracker.subchannelsEjected()) { 696 ejectedAddresses++; 697 } 698 } 699 return ((double)ejectedAddresses / totalAddresses) * 100; 700 } 701 } 702 703 704 /** 705 * Implementations provide different ways of ejecting outlier addresses.. 706 */ 707 interface OutlierEjectionAlgorithm { 708 709 /** Eject any outlier addresses. */ ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)710 void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos); 711 712 /** Builds a list of algorithms that are enabled in the given config. 
*/ 713 @Nullable forConfig(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)714 static List<OutlierEjectionAlgorithm> forConfig(OutlierDetectionLoadBalancerConfig config, 715 ChannelLogger logger) { 716 ImmutableList.Builder<OutlierEjectionAlgorithm> algoListBuilder = ImmutableList.builder(); 717 if (config.successRateEjection != null) { 718 algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config, logger)); 719 } 720 if (config.failurePercentageEjection != null) { 721 algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config, logger)); 722 } 723 return algoListBuilder.build(); 724 } 725 } 726 727 /** 728 * This algorithm ejects addresses that don't maintain a required rate of successful calls. The 729 * required rate is not fixed, but is based on the mean and standard deviation of the success 730 * rates of all of the addresses. 731 */ 732 static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm { 733 734 private final OutlierDetectionLoadBalancerConfig config; 735 736 private final ChannelLogger logger; 737 SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)738 SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config, 739 ChannelLogger logger) { 740 checkArgument(config.successRateEjection != null, "success rate ejection config is null"); 741 this.config = config; 742 this.logger = logger; 743 } 744 745 @Override ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)746 public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) { 747 748 // Only consider addresses that have the minimum request volume specified in the config. 749 List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap, 750 config.successRateEjection.requestVolume); 751 // If we don't have enough addresses with significant volume then there's nothing to do. 
752 if (trackersWithVolume.size() < config.successRateEjection.minimumHosts 753 || trackersWithVolume.size() == 0) { 754 return; 755 } 756 757 // Calculate mean and standard deviation of the fractions of successful calls. 758 List<Double> successRates = new ArrayList<>(); 759 for (AddressTracker tracker : trackersWithVolume) { 760 successRates.add(tracker.successRate()); 761 } 762 double mean = mean(successRates); 763 double stdev = standardDeviation(successRates, mean); 764 765 double requiredSuccessRate = 766 mean - stdev * (config.successRateEjection.stdevFactor / 1000f); 767 768 for (AddressTracker tracker : trackersWithVolume) { 769 // If we are above or equal to the max ejection percentage, don't eject any more. This will 770 // allow the total ejections to go one above the max, but at the same time it assures at 771 // least one ejection, which the spec calls for. This behavior matches what Envoy proxy 772 // does. 773 if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) { 774 return; 775 } 776 777 // If success rate is below the threshold, eject the address. 778 if (tracker.successRate() < requiredSuccessRate) { 779 logger.log(ChannelLogLevel.DEBUG, 780 "SuccessRate algorithm detected outlier: {0}. " 781 + "Parameters: successRate={1}, mean={2}, stdev={3}, " 782 + "requiredSuccessRate={4}", 783 tracker, tracker.successRate(), mean, stdev, requiredSuccessRate); 784 // Only eject some addresses based on the enforcement percentage. 785 if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) { 786 tracker.ejectSubchannels(ejectionTimeNanos); 787 } 788 } 789 } 790 } 791 792 /** Calculates the mean of the given values. 
*/ 793 @VisibleForTesting mean(Collection<Double> values)794 static double mean(Collection<Double> values) { 795 double totalValue = 0; 796 for (double value : values) { 797 totalValue += value; 798 } 799 800 return totalValue / values.size(); 801 } 802 803 /** Calculates the standard deviation for the given values and their mean. */ 804 @VisibleForTesting standardDeviation(Collection<Double> values, double mean)805 static double standardDeviation(Collection<Double> values, double mean) { 806 double squaredDifferenceSum = 0; 807 for (double value : values) { 808 double difference = value - mean; 809 squaredDifferenceSum += difference * difference; 810 } 811 double variance = squaredDifferenceSum / values.size(); 812 813 return Math.sqrt(variance); 814 } 815 } 816 817 static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm { 818 819 private final OutlierDetectionLoadBalancerConfig config; 820 821 private final ChannelLogger logger; 822 FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config, ChannelLogger logger)823 FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config, 824 ChannelLogger logger) { 825 this.config = config; 826 this.logger = logger; 827 } 828 829 @Override ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos)830 public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) { 831 832 // Only consider addresses that have the minimum request volume specified in the config. 833 List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap, 834 config.failurePercentageEjection.requestVolume); 835 // If we don't have enough addresses with significant volume then there's nothing to do. 836 if (trackersWithVolume.size() < config.failurePercentageEjection.minimumHosts 837 || trackersWithVolume.size() == 0) { 838 return; 839 } 840 841 // If this address does not have enough volume to be considered, skip to the next one. 
842 for (AddressTracker tracker : trackersWithVolume) { 843 // If we are above or equal to the max ejection percentage, don't eject any more. This will 844 // allow the total ejections to go one above the max, but at the same time it assures at 845 // least one ejection, which the spec calls for. This behavior matches what Envoy proxy 846 // does. 847 if (trackerMap.ejectionPercentage() >= config.maxEjectionPercent) { 848 return; 849 } 850 851 if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) { 852 continue; 853 } 854 855 // If the failure rate is above the threshold, we should eject... 856 double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100; 857 if (tracker.failureRate() > maxFailureRate) { 858 logger.log(ChannelLogLevel.DEBUG, 859 "FailurePercentage algorithm detected outlier: {0}, failureRate={1}", 860 tracker, tracker.failureRate()); 861 // ...but only enforce this based on the enforcement percentage. 862 if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) { 863 tracker.ejectSubchannels(ejectionTimeNanos); 864 } 865 } 866 } 867 } 868 } 869 870 /** Returns only the trackers that have the minimum configured volume to be considered. */ trackersWithVolume(AddressTrackerMap trackerMap, int volume)871 private static List<AddressTracker> trackersWithVolume(AddressTrackerMap trackerMap, 872 int volume) { 873 List<AddressTracker> trackersWithVolume = new ArrayList<>(); 874 for (AddressTracker tracker : trackerMap.values()) { 875 if (tracker.inactiveVolume() >= volume) { 876 trackersWithVolume.add(tracker); 877 } 878 } 879 return trackersWithVolume; 880 } 881 882 /** Counts how many addresses are in a given address group. 
*/
  private static boolean hasSingleAddress(List<EquivalentAddressGroup> addressGroups) {
    int addressCount = 0;
    for (EquivalentAddressGroup addressGroup : addressGroups) {
      addressCount += addressGroup.getAddresses().size();
      // A second address settles the answer; the remaining groups need not be inspected.
      if (addressCount > 1) {
        return false;
      }
    }
    // Zero or one address in total.
    return true;
  }

  /**
   * The configuration for {@link OutlierDetectionLoadBalancer}.
   *
   * <p>Each ejection algorithm is enabled by supplying its configuration; see
   * {@link #outlierDetectionEnabled()}.
   */
  public static final class OutlierDetectionLoadBalancerConfig {

    /** Interval between outlier detection sweeps, in nanoseconds. */
    public final Long intervalNanos;
    /** Base time an address is ejected for, in nanoseconds. */
    public final Long baseEjectionTimeNanos;
    /** Longest time an address can be ejected for, in nanoseconds. */
    public final Long maxEjectionTimeNanos;
    /** Algorithm agnostic maximum percentage of addresses that can be ejected. */
    public final Integer maxEjectionPercent;
    /** Success rate ejection settings, or {@code null} when that algorithm is disabled. */
    public final SuccessRateEjection successRateEjection;
    /** Failure percentage ejection settings, or {@code null} when that algorithm is disabled. */
    public final FailurePercentageEjection failurePercentageEjection;
    /** The child policy the load balancer delegates to. */
    public final PolicySelection childPolicy;

    private OutlierDetectionLoadBalancerConfig(Long intervalNanos,
        Long baseEjectionTimeNanos,
        Long maxEjectionTimeNanos,
        Integer maxEjectionPercent,
        SuccessRateEjection successRateEjection,
        FailurePercentageEjection failurePercentageEjection,
        PolicySelection childPolicy) {
      this.intervalNanos = intervalNanos;
      this.baseEjectionTimeNanos = baseEjectionTimeNanos;
      this.maxEjectionTimeNanos = maxEjectionTimeNanos;
      this.maxEjectionPercent = maxEjectionPercent;
      this.successRateEjection = successRateEjection;
      this.failurePercentageEjection = failurePercentageEjection;
      this.childPolicy = childPolicy;
    }

    /** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */
    public static class Builder {
      Long intervalNanos = 10_000_000_000L; // 10s
      Long baseEjectionTimeNanos = 30_000_000_000L; // 30s
      Long maxEjectionTimeNanos = 300_000_000_000L; // 300s
      Integer maxEjectionPercent = 10;
      SuccessRateEjection successRateEjection;
      FailurePercentageEjection failurePercentageEjection;
      PolicySelection childPolicy;

      /**
       * The interval between outlier detection sweeps, in nanoseconds.
       *
       * @throws IllegalArgumentException if {@code intervalNanos} is null
       */
      public Builder setIntervalNanos(Long intervalNanos) {
        checkArgument(intervalNanos != null, "intervalNanos must not be null");
        this.intervalNanos = intervalNanos;
        return this;
      }

      /**
       * The base time an address is ejected for, in nanoseconds.
       *
       * @throws IllegalArgumentException if {@code baseEjectionTimeNanos} is null
       */
      public Builder setBaseEjectionTimeNanos(Long baseEjectionTimeNanos) {
        checkArgument(baseEjectionTimeNanos != null, "baseEjectionTimeNanos must not be null");
        this.baseEjectionTimeNanos = baseEjectionTimeNanos;
        return this;
      }

      /**
       * The longest time an address can be ejected, in nanoseconds.
       *
       * @throws IllegalArgumentException if {@code maxEjectionTimeNanos} is null
       */
      public Builder setMaxEjectionTimeNanos(Long maxEjectionTimeNanos) {
        checkArgument(maxEjectionTimeNanos != null, "maxEjectionTimeNanos must not be null");
        this.maxEjectionTimeNanos = maxEjectionTimeNanos;
        return this;
      }

      /**
       * The algorithm agnostic maximum percentage of addresses that can be ejected. Only
       * nullness is validated here, not the range.
       *
       * @throws IllegalArgumentException if {@code maxEjectionPercent} is null
       */
      public Builder setMaxEjectionPercent(Integer maxEjectionPercent) {
        checkArgument(maxEjectionPercent != null, "maxEjectionPercent must not be null");
        this.maxEjectionPercent = maxEjectionPercent;
        return this;
      }

      /** Set to enable success rate ejection; {@code null} leaves it disabled. */
      public Builder setSuccessRateEjection(
          SuccessRateEjection successRateEjection) {
        this.successRateEjection = successRateEjection;
        return this;
      }

      /** Set to enable failure percentage ejection; {@code null} leaves it disabled. */
      public Builder setFailurePercentageEjection(
          FailurePercentageEjection failurePercentageEjection) {
        this.failurePercentageEjection = failurePercentageEjection;
        return this;
      }

      /**
       * Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to.
       *
       * @throws IllegalStateException if {@code childPolicy} is null
       */
      public Builder setChildPolicy(PolicySelection childPolicy) {
        // NOTE(review): this validates an argument with checkState (IllegalStateException) while
        // the other setters use checkArgument; kept so callers keep seeing the same exception.
        checkState(childPolicy != null, "childPolicy must not be null");
        this.childPolicy = childPolicy;
        return this;
      }

      /**
       * Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}.
       *
       * @throws IllegalStateException if no child policy has been set
       */
      public OutlierDetectionLoadBalancerConfig build() {
        checkState(childPolicy != null, "childPolicy must be set before build()");
        return new OutlierDetectionLoadBalancerConfig(intervalNanos, baseEjectionTimeNanos,
            maxEjectionTimeNanos, maxEjectionPercent, successRateEjection,
            failurePercentageEjection, childPolicy);
      }
    }

    /** The configuration for success rate ejection. */
    public static class SuccessRateEjection {

      public final Integer stdevFactor;
      public final Integer enforcementPercentage;
      public final Integer minimumHosts;
      public final Integer requestVolume;

      SuccessRateEjection(Integer stdevFactor, Integer enforcementPercentage, Integer minimumHosts,
          Integer requestVolume) {
        this.stdevFactor = stdevFactor;
        this.enforcementPercentage = enforcementPercentage;
        this.minimumHosts = minimumHosts;
        this.requestVolume = requestVolume;
      }

      /** Builds new instances of {@link SuccessRateEjection}. */
      public static final class Builder {

        Integer stdevFactor = 1900;
        Integer enforcementPercentage = 100;
        Integer minimumHosts = 5;
        Integer requestVolume = 100;

        /**
         * The product of this and the standard deviation of success rates determines the ejection
         * threshold.
         *
         * @throws IllegalArgumentException if {@code stdevFactor} is null
         */
        public Builder setStdevFactor(Integer stdevFactor) {
          checkArgument(stdevFactor != null, "stdevFactor must not be null");
          this.stdevFactor = stdevFactor;
          return this;
        }

        /**
         * Only eject this percentage of outliers.
         *
         * @throws IllegalArgumentException if the value is null or outside [0, 100]
         */
        public Builder setEnforcementPercentage(Integer enforcementPercentage) {
          checkArgument(enforcementPercentage != null, "enforcementPercentage must not be null");
          checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100,
              "enforcementPercentage must be in [0, 100], got %s", enforcementPercentage);
          this.enforcementPercentage = enforcementPercentage;
          return this;
        }

        /**
         * The minimum number of hosts needed for success rate ejection.
         *
         * @throws IllegalArgumentException if the value is null or negative
         */
        public Builder setMinimumHosts(Integer minimumHosts) {
          checkArgument(minimumHosts != null, "minimumHosts must not be null");
          checkArgument(minimumHosts >= 0, "minimumHosts must be non-negative, got %s",
              minimumHosts);
          this.minimumHosts = minimumHosts;
          return this;
        }

        /**
         * The minimum address request volume to be considered for success rate ejection.
         *
         * @throws IllegalArgumentException if the value is null or negative
         */
        public Builder setRequestVolume(Integer requestVolume) {
          checkArgument(requestVolume != null, "requestVolume must not be null");
          checkArgument(requestVolume >= 0, "requestVolume must be non-negative, got %s",
              requestVolume);
          this.requestVolume = requestVolume;
          return this;
        }

        /** Builds a new instance of {@link SuccessRateEjection}. */
        public SuccessRateEjection build() {
          return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts,
              requestVolume);
        }
      }
    }

    /** The configuration for failure percentage ejection. */
    public static class FailurePercentageEjection {
      public final Integer threshold;
      public final Integer enforcementPercentage;
      public final Integer minimumHosts;
      public final Integer requestVolume;

      FailurePercentageEjection(Integer threshold, Integer enforcementPercentage,
          Integer minimumHosts, Integer requestVolume) {
        this.threshold = threshold;
        this.enforcementPercentage = enforcementPercentage;
        this.minimumHosts = minimumHosts;
        this.requestVolume = requestVolume;
      }

      /** For building new {@link FailurePercentageEjection} instances. */
      public static class Builder {
        Integer threshold = 85;
        Integer enforcementPercentage = 100;
        Integer minimumHosts = 5;
        Integer requestVolume = 50;

        /**
         * The failure percentage that will result in an address being considered an outlier.
         *
         * @throws IllegalArgumentException if the value is null or outside [0, 100]
         */
        public Builder setThreshold(Integer threshold) {
          checkArgument(threshold != null, "threshold must not be null");
          checkArgument(threshold >= 0 && threshold <= 100,
              "threshold must be in [0, 100], got %s", threshold);
          this.threshold = threshold;
          return this;
        }

        /**
         * Only eject this percentage of outliers.
         *
         * @throws IllegalArgumentException if the value is null or outside [0, 100]
         */
        public Builder setEnforcementPercentage(Integer enforcementPercentage) {
          checkArgument(enforcementPercentage != null, "enforcementPercentage must not be null");
          checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100,
              "enforcementPercentage must be in [0, 100], got %s", enforcementPercentage);
          this.enforcementPercentage = enforcementPercentage;
          return this;
        }

        /**
         * The minimum number of hosts required for failure percentage ejection to be enabled.
         *
         * @throws IllegalArgumentException if the value is null or negative
         */
        public Builder setMinimumHosts(Integer minimumHosts) {
          checkArgument(minimumHosts != null, "minimumHosts must not be null");
          checkArgument(minimumHosts >= 0, "minimumHosts must be non-negative, got %s",
              minimumHosts);
          this.minimumHosts = minimumHosts;
          return this;
        }

        /**
         * The request volume required for an address to be considered for failure percentage
         * ejection.
         *
         * @throws IllegalArgumentException if the value is null or negative
         */
        public Builder setRequestVolume(Integer requestVolume) {
          checkArgument(requestVolume != null, "requestVolume must not be null");
          checkArgument(requestVolume >= 0, "requestVolume must be non-negative, got %s",
              requestVolume);
          this.requestVolume = requestVolume;
          return this;
        }

        /** Builds a new instance of {@link FailurePercentageEjection}. */
        public FailurePercentageEjection build() {
          return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts,
              requestVolume);
        }
      }
    }

    /**
     * Returns whether at least one outlier detection algorithm (success rate or failure
     * percentage) is configured.
     */
    boolean outlierDetectionEnabled() {
      return successRateEjection != null || failurePercentageEjection != null;
    }
  }
}