1 /* 2 * Copyright 2021 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ConnectivityState.CONNECTING; 23 import static io.grpc.ConnectivityState.IDLE; 24 import static io.grpc.ConnectivityState.READY; 25 import static io.grpc.ConnectivityState.SHUTDOWN; 26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 27 28 import com.google.common.base.MoreObjects; 29 import com.google.common.collect.HashMultiset; 30 import com.google.common.collect.Multiset; 31 import com.google.common.collect.Sets; 32 import com.google.common.primitives.UnsignedInteger; 33 import io.grpc.Attributes; 34 import io.grpc.ConnectivityState; 35 import io.grpc.ConnectivityStateInfo; 36 import io.grpc.EquivalentAddressGroup; 37 import io.grpc.InternalLogId; 38 import io.grpc.LoadBalancer; 39 import io.grpc.Status; 40 import io.grpc.SynchronizationContext; 41 import io.grpc.xds.XdsLogger.XdsLogLevel; 42 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; 43 import java.net.SocketAddress; 44 import java.util.ArrayList; 45 import java.util.Collections; 46 import java.util.HashMap; 47 import java.util.HashSet; 48 import java.util.Iterator; 49 import java.util.List; 50 import java.util.Map; 51 import java.util.Random; 52 import java.util.Set; 53 import java.util.stream.Collectors; 54 import javax.annotation.Nullable; 55 56 /** 57 * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts. 58 * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its 59 * addresses. Each request is routed to a host by hashing some property of the request and finding 60 * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some 61 * number of times proportional to its weight. With the ring partitioned appropriately, the 62 * addition or removal of one host from a set of N hosts will affect only 1/N requests. 63 */ 64 final class RingHashLoadBalancer extends LoadBalancer { 65 private static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO = 66 Attributes.Key.create("state-info"); 67 private static final Status RPC_HASH_NOT_FOUND = 68 Status.INTERNAL.withDescription("RPC hash not found. Probably a bug because xds resolver" 69 + " config selector always generates a hash."); 70 private static final XxHash64 hashFunc = XxHash64.INSTANCE; 71 72 private final XdsLogger logger; 73 private final SynchronizationContext syncContext; 74 private final Map<EquivalentAddressGroup, Subchannel> subchannels = new HashMap<>(); 75 private final Helper helper; 76 77 private List<RingEntry> ring; 78 private ConnectivityState currentState; 79 private Iterator<Subchannel> connectionAttemptIterator = subchannels.values().iterator(); 80 private final Random random = new Random(); 81 RingHashLoadBalancer(Helper helper)82 RingHashLoadBalancer(Helper helper) { 83 this.helper = checkNotNull(helper, "helper"); 84 syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); 85 logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority())); 86 logger.log(XdsLogLevel.INFO, "Created"); 87 } 88 89 @Override acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)90 public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { 91 logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); 92 List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses(); 93 if (!validateAddrList(addrList)) { 94 return false; 95 } 96 97 Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(addrList); 98 Set<EquivalentAddressGroup> removedAddrs = 99 Sets.newHashSet(Sets.difference(subchannels.keySet(), latestAddrs.keySet())); 100 101 RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); 102 Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>(); 103 long totalWeight = 0L; 104 for (EquivalentAddressGroup eag : addrList) { 105 Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT); 106 // Support two ways of server weighing: either multiple instances of the same address 107 // or each address contains a per-address weight attribute. If a weight is not provided, 108 // each occurrence of the address will be counted a weight value of one. 109 if (weight == null) { 110 weight = 1L; 111 } 112 totalWeight += weight; 113 EquivalentAddressGroup addrKey = stripAttrs(eag); 114 if (serverWeights.containsKey(addrKey)) { 115 serverWeights.put(addrKey, serverWeights.get(addrKey) + weight); 116 } else { 117 serverWeights.put(addrKey, weight); 118 } 119 120 Subchannel existingSubchannel = subchannels.get(addrKey); 121 if (existingSubchannel != null) { 122 existingSubchannel.updateAddresses(Collections.singletonList(eag)); 123 continue; 124 } 125 Attributes attr = Attributes.newBuilder().set( 126 STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE))).build(); 127 final Subchannel subchannel = helper.createSubchannel( 128 CreateSubchannelArgs.newBuilder().setAddresses(eag).setAttributes(attr).build()); 129 subchannel.start(new SubchannelStateListener() { 130 @Override 131 public void onSubchannelState(ConnectivityStateInfo newState) { 132 processSubchannelState(subchannel, newState); 133 } 134 }); 135 subchannels.put(addrKey, subchannel); 136 } 137 long minWeight = Collections.min(serverWeights.values()); 138 double normalizedMinWeight = (double) minWeight / totalWeight; 139 // Scale up the number of hashes per host such that the least-weighted host gets a whole 140 // number of hashes on the the ring. Other hosts might not end up with whole numbers, and 141 // that's fine (the ring-building algorithm can handle this). This preserves the original 142 // implementation's behavior: when weights aren't provided, all hosts should get an equal 143 // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled 144 // back down to fit. 145 double scale = Math.min( 146 Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight, 147 (double) config.maxRingSize); 148 ring = buildRing(serverWeights, totalWeight, scale); 149 150 // Shut down subchannels for delisted addresses. 151 List<Subchannel> removedSubchannels = new ArrayList<>(); 152 for (EquivalentAddressGroup addr : removedAddrs) { 153 removedSubchannels.add(subchannels.remove(addr)); 154 } 155 // If we need to proactively start connecting, iterate through all the subchannels, starting 156 // at a random position. 157 // Alternatively, we should better start at the same position. 158 connectionAttemptIterator = subchannels.values().iterator(); 159 int randomAdvance = random.nextInt(subchannels.size()); 160 while (randomAdvance-- > 0) { 161 connectionAttemptIterator.next(); 162 } 163 164 // Update the picker before shutting down the subchannels, to reduce the chance of race 165 // between picking a subchannel and shutting it down. 166 updateBalancingState(); 167 for (Subchannel subchann : removedSubchannels) { 168 shutdownSubchannel(subchann); 169 } 170 171 return true; 172 } 173 validateAddrList(List<EquivalentAddressGroup> addrList)174 private boolean validateAddrList(List<EquivalentAddressGroup> addrList) { 175 if (addrList.isEmpty()) { 176 handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS " 177 + "resolution was successful, but returned server addresses are empty.")); 178 return false; 179 } 180 181 String dupAddrString = validateNoDuplicateAddresses(addrList); 182 if (dupAddrString != null) { 183 handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS " 184 + "resolution was successful, but there were duplicate addresses: " + dupAddrString)); 185 return false; 186 } 187 188 long totalWeight = 0; 189 for (EquivalentAddressGroup eag : addrList) { 190 Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT); 191 192 if (weight == null) { 193 weight = 1L; 194 } 195 196 if (weight < 0) { 197 handleNameResolutionError(Status.UNAVAILABLE.withDescription( 198 String.format("Ring hash lb error: EDS resolution was successful, but returned a " 199 + "negative weight for %s.", stripAttrs(eag)))); 200 return false; 201 } 202 if (weight > UnsignedInteger.MAX_VALUE.longValue()) { 203 handleNameResolutionError(Status.UNAVAILABLE.withDescription( 204 String.format("Ring hash lb error: EDS resolution was successful, but returned a weight" 205 + " too large to fit in an unsigned int for %s.", stripAttrs(eag)))); 206 return false; 207 } 208 totalWeight += weight; 209 } 210 211 if (totalWeight > UnsignedInteger.MAX_VALUE.longValue()) { 212 handleNameResolutionError(Status.UNAVAILABLE.withDescription( 213 String.format( 214 "Ring hash lb error: EDS resolution was successful, but returned a sum of weights too" 215 + " large to fit in an unsigned int (%d).", totalWeight))); 216 return false; 217 } 218 219 return true; 220 } 221 222 @Nullable validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList)223 private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) { 224 Set<SocketAddress> addresses = new HashSet<>(); 225 Multiset<String> dups = HashMultiset.create(); 226 for (EquivalentAddressGroup eag : addrList) { 227 for (SocketAddress address : eag.getAddresses()) { 228 if (!addresses.add(address)) { 229 dups.add(address.toString()); 230 } 231 } 232 } 233 234 if (!dups.isEmpty()) { 235 return dups.entrySet().stream() 236 .map((dup) -> 237 String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1)) 238 .collect(Collectors.joining("; ")); 239 } 240 241 return null; 242 } 243 buildRing( Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale)244 private static List<RingEntry> buildRing( 245 Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) { 246 List<RingEntry> ring = new ArrayList<>(); 247 double currentHashes = 0.0; 248 double targetHashes = 0.0; 249 for (Map.Entry<EquivalentAddressGroup, Long> entry : serverWeights.entrySet()) { 250 EquivalentAddressGroup addrKey = entry.getKey(); 251 double normalizedWeight = (double) entry.getValue() / totalWeight; 252 // TODO(chengyuanzhang): is using the list of socket address correct? 253 StringBuilder sb = new StringBuilder(addrKey.getAddresses().toString()); 254 sb.append('_'); 255 int lengthWithoutCounter = sb.length(); 256 targetHashes += scale * normalizedWeight; 257 long i = 0L; 258 while (currentHashes < targetHashes) { 259 sb.append(i); 260 long hash = hashFunc.hashAsciiString(sb.toString()); 261 ring.add(new RingEntry(hash, addrKey)); 262 i++; 263 currentHashes++; 264 sb.setLength(lengthWithoutCounter); 265 } 266 } 267 Collections.sort(ring); 268 return Collections.unmodifiableList(ring); 269 } 270 271 @Override handleNameResolutionError(Status error)272 public void handleNameResolutionError(Status error) { 273 if (currentState != READY) { 274 helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); 275 } 276 } 277 278 @Override shutdown()279 public void shutdown() { 280 logger.log(XdsLogLevel.INFO, "Shutdown"); 281 for (Subchannel subchannel : subchannels.values()) { 282 shutdownSubchannel(subchannel); 283 } 284 subchannels.clear(); 285 } 286 287 /** 288 * Updates the overall balancing state by aggregating the connectivity states of all subchannels. 289 * 290 * <p>Aggregation rules (in order of dominance): 291 * <ol> 292 * <li>If there is at least one subchannel in READY state, overall state is READY</li> 293 * <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is 294 * TRANSIENT_FAILURE</li> 295 * <li>If there is at least one subchannel in CONNECTING state, overall state is 296 * CONNECTING</li> 297 * <li> If there is one subchannel in TRANSIENT_FAILURE state and there is 298 * more than one subchannel, report CONNECTING </li> 299 * <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li> 300 * <li>Otherwise, overall state is TRANSIENT_FAILURE</li> 301 * </ol> 302 */ updateBalancingState()303 private void updateBalancingState() { 304 checkState(!subchannels.isEmpty(), "no subchannel has been created"); 305 boolean startConnectionAttempt = false; 306 int numIdle = 0; 307 int numReady = 0; 308 int numConnecting = 0; 309 int numTransientFailure = 0; 310 for (Subchannel subchannel : subchannels.values()) { 311 ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState(); 312 if (state == READY) { 313 numReady++; 314 break; 315 } else if (state == TRANSIENT_FAILURE) { 316 numTransientFailure++; 317 } else if (state == CONNECTING ) { 318 numConnecting++; 319 } else if (state == IDLE) { 320 numIdle++; 321 } 322 } 323 ConnectivityState overallState; 324 if (numReady > 0) { 325 overallState = READY; 326 } else if (numTransientFailure >= 2) { 327 overallState = TRANSIENT_FAILURE; 328 startConnectionAttempt = (numConnecting == 0); 329 } else if (numConnecting > 0) { 330 overallState = CONNECTING; 331 } else if (numTransientFailure == 1 && subchannels.size() > 1) { 332 overallState = CONNECTING; 333 startConnectionAttempt = true; 334 } else if (numIdle > 0) { 335 overallState = IDLE; 336 } else { 337 overallState = TRANSIENT_FAILURE; 338 startConnectionAttempt = true; 339 } 340 RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels); 341 // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates 342 helper.updateBalancingState(overallState, picker); 343 currentState = overallState; 344 // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will 345 // not be getting any pick requests from the priority policy. 346 // However, because the ring_hash policy does not attempt to 347 // reconnect to subchannels unless it is getting pick requests, 348 // it will need special handling to ensure that it will eventually 349 // recover from TRANSIENT_FAILURE state once the problem is resolved. 350 // Specifically, it will make sure that it is attempting to connect to 351 // at least one subchannel at any given time. After a given subchannel 352 // fails a connection attempt, it will move on to the next subchannel 353 // in the ring. It will keep doing this until one of the subchannels 354 // successfully connects, at which point it will report READY and stop 355 // proactively trying to connect. The policy will remain in 356 // TRANSIENT_FAILURE until at least one subchannel becomes connected, 357 // even if subchannels are in state CONNECTING during that time. 358 // 359 // Note that we do the same thing when the policy is in state 360 // CONNECTING, just to ensure that we don't remain in CONNECTING state 361 // indefinitely if there are no new picks coming in. 362 if (startConnectionAttempt) { 363 if (!connectionAttemptIterator.hasNext()) { 364 connectionAttemptIterator = subchannels.values().iterator(); 365 } 366 connectionAttemptIterator.next().requestConnection(); 367 } 368 } 369 processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo)370 private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { 371 if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) { 372 return; 373 } 374 if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { 375 helper.refreshNameResolution(); 376 } 377 updateConnectivityState(subchannel, stateInfo); 378 updateBalancingState(); 379 } 380 updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo)381 private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { 382 Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel); 383 ConnectivityState previousConnectivityState = subchannelStateRef.value.getState(); 384 // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected. 385 // If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in 386 // TRANSIENT_FAILURE until it becomes READY. 387 if (previousConnectivityState == TRANSIENT_FAILURE) { 388 if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { 389 return; 390 } 391 } 392 subchannelStateRef.value = stateInfo; 393 } 394 shutdownSubchannel(Subchannel subchannel)395 private static void shutdownSubchannel(Subchannel subchannel) { 396 subchannel.shutdown(); 397 getSubchannelStateInfoRef(subchannel).value = ConnectivityStateInfo.forNonError(SHUTDOWN); 398 } 399 400 /** 401 * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and 402 * remove all attributes. The values are the original EAGs. 403 */ stripAttrs( List<EquivalentAddressGroup> groupList)404 private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs( 405 List<EquivalentAddressGroup> groupList) { 406 Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs = 407 new HashMap<>(groupList.size() * 2); 408 for (EquivalentAddressGroup group : groupList) { 409 addrs.put(stripAttrs(group), group); 410 } 411 return addrs; 412 } 413 stripAttrs(EquivalentAddressGroup eag)414 private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { 415 return new EquivalentAddressGroup(eag.getAddresses()); 416 } 417 getSubchannelStateInfoRef( Subchannel subchannel)418 private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef( 419 Subchannel subchannel) { 420 return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO"); 421 } 422 423 private static final class RingHashPicker extends SubchannelPicker { 424 private final SynchronizationContext syncContext; 425 private final List<RingEntry> ring; 426 // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, 427 // freeze picker's view of subchannel's connectivity state. 428 // TODO(chengyuanzhang): can be more performance-friendly with 429 // IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel. 430 private final Map<EquivalentAddressGroup, SubchannelView> pickableSubchannels; // read-only 431 RingHashPicker( SynchronizationContext syncContext, List<RingEntry> ring, Map<EquivalentAddressGroup, Subchannel> subchannels)432 private RingHashPicker( 433 SynchronizationContext syncContext, List<RingEntry> ring, 434 Map<EquivalentAddressGroup, Subchannel> subchannels) { 435 this.syncContext = syncContext; 436 this.ring = ring; 437 pickableSubchannels = new HashMap<>(subchannels.size()); 438 for (Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) { 439 Subchannel subchannel = entry.getValue(); 440 ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).value; 441 pickableSubchannels.put(entry.getKey(), new SubchannelView(subchannel, stateInfo)); 442 } 443 } 444 445 @Override pickSubchannel(PickSubchannelArgs args)446 public PickResult pickSubchannel(PickSubchannelArgs args) { 447 Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY); 448 if (requestHash == null) { 449 return PickResult.withError(RPC_HASH_NOT_FOUND); 450 } 451 452 // Find the ring entry with hash next to (clockwise) the RPC's hash. 453 int low = 0; 454 int high = ring.size(); 455 int mid; 456 while (true) { 457 mid = (low + high) / 2; 458 if (mid == ring.size()) { 459 mid = 0; 460 break; 461 } 462 long midVal = ring.get(mid).hash; 463 long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash; 464 if (requestHash <= midVal && requestHash > midValL) { 465 break; 466 } 467 if (midVal < requestHash) { 468 low = mid + 1; 469 } else { 470 high = mid - 1; 471 } 472 if (low > high) { 473 mid = 0; 474 break; 475 } 476 } 477 478 // Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash. 479 // If the one of the first two subchannels is not in TRANSIENT_FAILURE, return result 480 // based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found. 481 // Meanwhile, trigger connection for the channel and status: 482 // For the first subchannel that is in IDLE or TRANSIENT_FAILURE; 483 // And for the second subchannel that is in IDLE or TRANSIENT_FAILURE; 484 // And for each of the following subchannels that is in TRANSIENT_FAILURE or IDLE, 485 // stop until we find the first subchannel that is in CONNECTING or IDLE status. 486 boolean foundFirstNonFailed = false; // true if having subchannel(s) in CONNECTING or IDLE 487 Subchannel firstSubchannel = null; 488 Subchannel secondSubchannel = null; 489 for (int i = 0; i < ring.size(); i++) { 490 int index = (mid + i) % ring.size(); 491 EquivalentAddressGroup addrKey = ring.get(index).addrKey; 492 SubchannelView subchannel = pickableSubchannels.get(addrKey); 493 if (subchannel.stateInfo.getState() == READY) { 494 return PickResult.withSubchannel(subchannel.subchannel); 495 } 496 497 // RPCs can be buffered if any of the first two subchannels is pending. Otherwise, RPCs 498 // are failed unless there is a READY connection. 499 if (firstSubchannel == null) { 500 firstSubchannel = subchannel.subchannel; 501 PickResult maybeBuffer = pickSubchannelsNonReady(subchannel); 502 if (maybeBuffer != null) { 503 return maybeBuffer; 504 } 505 } else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) { 506 secondSubchannel = subchannel.subchannel; 507 PickResult maybeBuffer = pickSubchannelsNonReady(subchannel); 508 if (maybeBuffer != null) { 509 return maybeBuffer; 510 } 511 } else if (subchannel.subchannel != firstSubchannel 512 && subchannel.subchannel != secondSubchannel) { 513 if (!foundFirstNonFailed) { 514 pickSubchannelsNonReady(subchannel); 515 if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) { 516 foundFirstNonFailed = true; 517 } 518 } 519 } 520 } 521 // Fail the pick with error status of the original subchannel hit by hash. 522 SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey); 523 return PickResult.withError(originalSubchannel.stateInfo.getStatus()); 524 } 525 526 @Nullable pickSubchannelsNonReady(SubchannelView subchannel)527 private PickResult pickSubchannelsNonReady(SubchannelView subchannel) { 528 if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE 529 || subchannel.stateInfo.getState() == IDLE ) { 530 final Subchannel finalSubchannel = subchannel.subchannel; 531 syncContext.execute(new Runnable() { 532 @Override 533 public void run() { 534 finalSubchannel.requestConnection(); 535 } 536 }); 537 } 538 if (subchannel.stateInfo.getState() == CONNECTING 539 || subchannel.stateInfo.getState() == IDLE) { 540 return PickResult.withNoResult(); 541 } else { 542 return null; 543 } 544 } 545 } 546 547 /** 548 * An unmodifiable view of a subchannel with state not subject to its real connectivity 549 * state changes. 550 */ 551 private static final class SubchannelView { 552 private final Subchannel subchannel; 553 private final ConnectivityStateInfo stateInfo; 554 SubchannelView(Subchannel subchannel, ConnectivityStateInfo stateInfo)555 private SubchannelView(Subchannel subchannel, ConnectivityStateInfo stateInfo) { 556 this.subchannel = subchannel; 557 this.stateInfo = stateInfo; 558 } 559 } 560 561 private static final class RingEntry implements Comparable<RingEntry> { 562 private final long hash; 563 private final EquivalentAddressGroup addrKey; 564 RingEntry(long hash, EquivalentAddressGroup addrKey)565 private RingEntry(long hash, EquivalentAddressGroup addrKey) { 566 this.hash = hash; 567 this.addrKey = addrKey; 568 } 569 570 @Override compareTo(RingEntry entry)571 public int compareTo(RingEntry entry) { 572 return Long.compare(hash, entry.hash); 573 } 574 } 575 576 /** 577 * A lighter weight Reference than AtomicReference. 578 */ 579 private static final class Ref<T> { 580 T value; 581 Ref(T value)582 Ref(T value) { 583 this.value = value; 584 } 585 } 586 587 /** 588 * Configures the ring property. The larger the ring is (that is, the more hashes there are 589 * for each provided host) the better the request distribution will reflect the desired weights. 590 */ 591 static final class RingHashConfig { 592 final long minRingSize; 593 final long maxRingSize; 594 RingHashConfig(long minRingSize, long maxRingSize)595 RingHashConfig(long minRingSize, long maxRingSize) { 596 checkArgument(minRingSize > 0, "minRingSize <= 0"); 597 checkArgument(maxRingSize > 0, "maxRingSize <= 0"); 598 checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize"); 599 this.minRingSize = minRingSize; 600 this.maxRingSize = maxRingSize; 601 } 602 603 @Override toString()604 public String toString() { 605 return MoreObjects.toStringHelper(this) 606 .add("minRingSize", minRingSize) 607 .add("maxRingSize", maxRingSize) 608 .toString(); 609 } 610 } 611 } 612