• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2021 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ConnectivityState.CONNECTING;
23 import static io.grpc.ConnectivityState.IDLE;
24 import static io.grpc.ConnectivityState.READY;
25 import static io.grpc.ConnectivityState.SHUTDOWN;
26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27 
28 import com.google.common.base.MoreObjects;
29 import com.google.common.collect.HashMultiset;
30 import com.google.common.collect.Multiset;
31 import com.google.common.collect.Sets;
32 import com.google.common.primitives.UnsignedInteger;
33 import io.grpc.Attributes;
34 import io.grpc.ConnectivityState;
35 import io.grpc.ConnectivityStateInfo;
36 import io.grpc.EquivalentAddressGroup;
37 import io.grpc.InternalLogId;
38 import io.grpc.LoadBalancer;
39 import io.grpc.Status;
40 import io.grpc.SynchronizationContext;
41 import io.grpc.xds.XdsLogger.XdsLogLevel;
42 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
43 import java.net.SocketAddress;
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.HashSet;
48 import java.util.Iterator;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Random;
52 import java.util.Set;
53 import java.util.stream.Collectors;
54 import javax.annotation.Nullable;
55 
56 /**
57  * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
58  * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its
59  * addresses. Each request is routed to a host by hashing some property of the request and finding
60  * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some
61  * number of times proportional to its weight. With the ring partitioned appropriately, the
62  * addition or removal of one host from a set of N hosts will affect only 1/N requests.
63  */
64 final class RingHashLoadBalancer extends LoadBalancer {
65   private static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
66       Attributes.Key.create("state-info");
67   private static final Status RPC_HASH_NOT_FOUND =
68       Status.INTERNAL.withDescription("RPC hash not found. Probably a bug because xds resolver"
69           + " config selector always generates a hash.");
70   private static final XxHash64 hashFunc = XxHash64.INSTANCE;
71 
72   private final XdsLogger logger;
73   private final SynchronizationContext syncContext;
74   private final Map<EquivalentAddressGroup, Subchannel> subchannels = new HashMap<>();
75   private final Helper helper;
76 
77   private List<RingEntry> ring;
78   private ConnectivityState currentState;
79   private Iterator<Subchannel> connectionAttemptIterator = subchannels.values().iterator();
80   private final Random random = new Random();
81 
RingHashLoadBalancer(Helper helper)82   RingHashLoadBalancer(Helper helper) {
83     this.helper = checkNotNull(helper, "helper");
84     syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
85     logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority()));
86     logger.log(XdsLogLevel.INFO, "Created");
87   }
88 
89   @Override
acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)90   public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
91     logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
92     List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses();
93     if (!validateAddrList(addrList)) {
94       return false;
95     }
96 
97     Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(addrList);
98     Set<EquivalentAddressGroup> removedAddrs =
99         Sets.newHashSet(Sets.difference(subchannels.keySet(), latestAddrs.keySet()));
100 
101     RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
102     Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
103     long totalWeight = 0L;
104     for (EquivalentAddressGroup eag : addrList) {
105       Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
106       // Support two ways of server weighing: either multiple instances of the same address
107       // or each address contains a per-address weight attribute. If a weight is not provided,
108       // each occurrence of the address will be counted a weight value of one.
109       if (weight == null) {
110         weight = 1L;
111       }
112       totalWeight += weight;
113       EquivalentAddressGroup addrKey = stripAttrs(eag);
114       if (serverWeights.containsKey(addrKey)) {
115         serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
116       } else {
117         serverWeights.put(addrKey, weight);
118       }
119 
120       Subchannel existingSubchannel = subchannels.get(addrKey);
121       if (existingSubchannel != null) {
122         existingSubchannel.updateAddresses(Collections.singletonList(eag));
123         continue;
124       }
125       Attributes attr = Attributes.newBuilder().set(
126           STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE))).build();
127       final Subchannel subchannel = helper.createSubchannel(
128           CreateSubchannelArgs.newBuilder().setAddresses(eag).setAttributes(attr).build());
129       subchannel.start(new SubchannelStateListener() {
130         @Override
131         public void onSubchannelState(ConnectivityStateInfo newState) {
132           processSubchannelState(subchannel, newState);
133         }
134       });
135       subchannels.put(addrKey, subchannel);
136     }
137     long minWeight = Collections.min(serverWeights.values());
138     double normalizedMinWeight = (double) minWeight / totalWeight;
139     // Scale up the number of hashes per host such that the least-weighted host gets a whole
140     // number of hashes on the the ring. Other hosts might not end up with whole numbers, and
141     // that's fine (the ring-building algorithm can handle this). This preserves the original
142     // implementation's behavior: when weights aren't provided, all hosts should get an equal
143     // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
144     // back down to fit.
145     double scale = Math.min(
146         Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
147         (double) config.maxRingSize);
148     ring = buildRing(serverWeights, totalWeight, scale);
149 
150     // Shut down subchannels for delisted addresses.
151     List<Subchannel> removedSubchannels = new ArrayList<>();
152     for (EquivalentAddressGroup addr : removedAddrs) {
153       removedSubchannels.add(subchannels.remove(addr));
154     }
155     // If we need to proactively start connecting, iterate through all the subchannels, starting
156     // at a random position.
157     // Alternatively, we should better start at the same position.
158     connectionAttemptIterator = subchannels.values().iterator();
159     int randomAdvance = random.nextInt(subchannels.size());
160     while (randomAdvance-- > 0) {
161       connectionAttemptIterator.next();
162     }
163 
164     // Update the picker before shutting down the subchannels, to reduce the chance of race
165     // between picking a subchannel and shutting it down.
166     updateBalancingState();
167     for (Subchannel subchann : removedSubchannels) {
168       shutdownSubchannel(subchann);
169     }
170 
171     return true;
172   }
173 
validateAddrList(List<EquivalentAddressGroup> addrList)174   private boolean validateAddrList(List<EquivalentAddressGroup> addrList) {
175     if (addrList.isEmpty()) {
176       handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
177           + "resolution was successful, but returned server addresses are empty."));
178       return false;
179     }
180 
181     String dupAddrString = validateNoDuplicateAddresses(addrList);
182     if (dupAddrString != null) {
183       handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
184           + "resolution was successful, but there were duplicate addresses: " + dupAddrString));
185       return false;
186     }
187 
188     long totalWeight = 0;
189     for (EquivalentAddressGroup eag : addrList) {
190       Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
191 
192       if (weight == null) {
193         weight = 1L;
194       }
195 
196       if (weight < 0) {
197         handleNameResolutionError(Status.UNAVAILABLE.withDescription(
198             String.format("Ring hash lb error: EDS resolution was successful, but returned a "
199                 + "negative weight for %s.", stripAttrs(eag))));
200         return false;
201       }
202       if (weight > UnsignedInteger.MAX_VALUE.longValue()) {
203         handleNameResolutionError(Status.UNAVAILABLE.withDescription(
204             String.format("Ring hash lb error: EDS resolution was successful, but returned a weight"
205                 + " too large to fit in an unsigned int for %s.", stripAttrs(eag))));
206         return false;
207       }
208       totalWeight += weight;
209     }
210 
211     if (totalWeight > UnsignedInteger.MAX_VALUE.longValue()) {
212       handleNameResolutionError(Status.UNAVAILABLE.withDescription(
213           String.format(
214               "Ring hash lb error: EDS resolution was successful, but returned a sum of weights too"
215               + " large to fit in an unsigned int (%d).", totalWeight)));
216       return false;
217     }
218 
219     return true;
220   }
221 
222   @Nullable
validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList)223   private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
224     Set<SocketAddress> addresses = new HashSet<>();
225     Multiset<String> dups = HashMultiset.create();
226     for (EquivalentAddressGroup eag : addrList) {
227       for (SocketAddress address : eag.getAddresses()) {
228         if (!addresses.add(address)) {
229           dups.add(address.toString());
230         }
231       }
232     }
233 
234     if (!dups.isEmpty()) {
235       return dups.entrySet().stream()
236           .map((dup) ->
237               String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
238           .collect(Collectors.joining("; "));
239     }
240 
241     return null;
242   }
243 
buildRing( Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale)244   private static List<RingEntry> buildRing(
245       Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
246     List<RingEntry> ring = new ArrayList<>();
247     double currentHashes = 0.0;
248     double targetHashes = 0.0;
249     for (Map.Entry<EquivalentAddressGroup, Long> entry : serverWeights.entrySet()) {
250       EquivalentAddressGroup addrKey = entry.getKey();
251       double normalizedWeight = (double) entry.getValue() / totalWeight;
252       // TODO(chengyuanzhang): is using the list of socket address correct?
253       StringBuilder sb = new StringBuilder(addrKey.getAddresses().toString());
254       sb.append('_');
255       int lengthWithoutCounter = sb.length();
256       targetHashes += scale * normalizedWeight;
257       long i = 0L;
258       while (currentHashes < targetHashes) {
259         sb.append(i);
260         long hash = hashFunc.hashAsciiString(sb.toString());
261         ring.add(new RingEntry(hash, addrKey));
262         i++;
263         currentHashes++;
264         sb.setLength(lengthWithoutCounter);
265       }
266     }
267     Collections.sort(ring);
268     return Collections.unmodifiableList(ring);
269   }
270 
271   @Override
handleNameResolutionError(Status error)272   public void handleNameResolutionError(Status error) {
273     if (currentState != READY) {
274       helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
275     }
276   }
277 
278   @Override
shutdown()279   public void shutdown() {
280     logger.log(XdsLogLevel.INFO, "Shutdown");
281     for (Subchannel subchannel : subchannels.values()) {
282       shutdownSubchannel(subchannel);
283     }
284     subchannels.clear();
285   }
286 
287   /**
288    * Updates the overall balancing state by aggregating the connectivity states of all subchannels.
289    *
290    * <p>Aggregation rules (in order of dominance):
291    * <ol>
292    *   <li>If there is at least one subchannel in READY state, overall state is READY</li>
293    *   <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
294    *   TRANSIENT_FAILURE</li>
295    *   <li>If there is at least one subchannel in CONNECTING state, overall state is
296    *   CONNECTING</li>
297    *   <li> If there is one subchannel in TRANSIENT_FAILURE state and there is
298    *    more than one subchannel, report CONNECTING </li>
299    *   <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
300    *   <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
301    * </ol>
302    */
updateBalancingState()303   private void updateBalancingState() {
304     checkState(!subchannels.isEmpty(), "no subchannel has been created");
305     boolean startConnectionAttempt = false;
306     int numIdle = 0;
307     int numReady = 0;
308     int numConnecting = 0;
309     int numTransientFailure = 0;
310     for (Subchannel subchannel : subchannels.values()) {
311       ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
312       if (state == READY) {
313         numReady++;
314         break;
315       } else if (state == TRANSIENT_FAILURE) {
316         numTransientFailure++;
317       } else if (state == CONNECTING ) {
318         numConnecting++;
319       } else if (state == IDLE) {
320         numIdle++;
321       }
322     }
323     ConnectivityState overallState;
324     if (numReady > 0) {
325       overallState = READY;
326     } else if (numTransientFailure >= 2) {
327       overallState = TRANSIENT_FAILURE;
328       startConnectionAttempt = (numConnecting == 0);
329     } else if (numConnecting > 0) {
330       overallState = CONNECTING;
331     } else if (numTransientFailure == 1 && subchannels.size() > 1) {
332       overallState = CONNECTING;
333       startConnectionAttempt = true;
334     } else if (numIdle > 0) {
335       overallState = IDLE;
336     } else {
337       overallState = TRANSIENT_FAILURE;
338       startConnectionAttempt = true;
339     }
340     RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels);
341     // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates
342     helper.updateBalancingState(overallState, picker);
343     currentState = overallState;
344     // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
345     // not be getting any pick requests from the priority policy.
346     // However, because the ring_hash policy does not attempt to
347     // reconnect to subchannels unless it is getting pick requests,
348     // it will need special handling to ensure that it will eventually
349     // recover from TRANSIENT_FAILURE state once the problem is resolved.
350     // Specifically, it will make sure that it is attempting to connect to
351     // at least one subchannel at any given time.  After a given subchannel
352     // fails a connection attempt, it will move on to the next subchannel
353     // in the ring.  It will keep doing this until one of the subchannels
354     // successfully connects, at which point it will report READY and stop
355     // proactively trying to connect.  The policy will remain in
356     // TRANSIENT_FAILURE until at least one subchannel becomes connected,
357     // even if subchannels are in state CONNECTING during that time.
358     //
359     // Note that we do the same thing when the policy is in state
360     // CONNECTING, just to ensure that we don't remain in CONNECTING state
361     // indefinitely if there are no new picks coming in.
362     if (startConnectionAttempt) {
363       if (!connectionAttemptIterator.hasNext()) {
364         connectionAttemptIterator = subchannels.values().iterator();
365       }
366       connectionAttemptIterator.next().requestConnection();
367     }
368   }
369 
processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo)370   private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
371     if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
372       return;
373     }
374     if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
375       helper.refreshNameResolution();
376     }
377     updateConnectivityState(subchannel, stateInfo);
378     updateBalancingState();
379   }
380 
updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo)381   private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
382     Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
383     ConnectivityState previousConnectivityState = subchannelStateRef.value.getState();
384     // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected.
385     // If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in
386     // TRANSIENT_FAILURE until it becomes READY.
387     if (previousConnectivityState == TRANSIENT_FAILURE) {
388       if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
389         return;
390       }
391     }
392     subchannelStateRef.value = stateInfo;
393   }
394 
shutdownSubchannel(Subchannel subchannel)395   private static void shutdownSubchannel(Subchannel subchannel) {
396     subchannel.shutdown();
397     getSubchannelStateInfoRef(subchannel).value = ConnectivityStateInfo.forNonError(SHUTDOWN);
398   }
399 
400   /**
401    * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and
402    * remove all attributes. The values are the original EAGs.
403    */
stripAttrs( List<EquivalentAddressGroup> groupList)404   private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(
405       List<EquivalentAddressGroup> groupList) {
406     Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs =
407         new HashMap<>(groupList.size() * 2);
408     for (EquivalentAddressGroup group : groupList) {
409       addrs.put(stripAttrs(group), group);
410     }
411     return addrs;
412   }
413 
stripAttrs(EquivalentAddressGroup eag)414   private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
415     return new EquivalentAddressGroup(eag.getAddresses());
416   }
417 
getSubchannelStateInfoRef( Subchannel subchannel)418   private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef(
419       Subchannel subchannel) {
420     return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
421   }
422 
423   private static final class RingHashPicker extends SubchannelPicker {
424     private final SynchronizationContext syncContext;
425     private final List<RingEntry> ring;
426     // Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
427     // freeze picker's view of subchannel's connectivity state.
428     // TODO(chengyuanzhang): can be more performance-friendly with
429     //  IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
430     private final Map<EquivalentAddressGroup, SubchannelView> pickableSubchannels;  // read-only
431 
RingHashPicker( SynchronizationContext syncContext, List<RingEntry> ring, Map<EquivalentAddressGroup, Subchannel> subchannels)432     private RingHashPicker(
433         SynchronizationContext syncContext, List<RingEntry> ring,
434         Map<EquivalentAddressGroup, Subchannel> subchannels) {
435       this.syncContext = syncContext;
436       this.ring = ring;
437       pickableSubchannels = new HashMap<>(subchannels.size());
438       for (Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
439         Subchannel subchannel = entry.getValue();
440         ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).value;
441         pickableSubchannels.put(entry.getKey(), new SubchannelView(subchannel, stateInfo));
442       }
443     }
444 
    /**
     * Picks a subchannel for an RPC based on the request hash stored in its call options.
     *
     * <p>The ring is searched for the entry the hash maps to, then walked from that entry. The
     * first READY subchannel encountered is returned. If one of the first two distinct
     * subchannels is IDLE or CONNECTING, the pick yields no result. If no READY subchannel is
     * found, the pick fails with the status of the subchannel the hash mapped to.
     *
     * @return the pick result, or an error result with {@code RPC_HASH_NOT_FOUND} if the call
     *     options carry no request hash
     */
    @Override
    public PickResult pickSubchannel(PickSubchannelArgs args) {
      Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
      if (requestHash == null) {
        return PickResult.withError(RPC_HASH_NOT_FOUND);
      }

      // Find the ring entry with hash next to (clockwise) the RPC's hash: binary search for the
      // entry whose hash is >= the request hash while its predecessor's hash is smaller.
      // Falls back to index 0 when the search runs off either end of the ring.
      int low = 0;
      int high = ring.size();
      int mid;
      while (true) {
        mid = (low + high) / 2;
        if (mid == ring.size()) {
          // The request hash is larger than every entry: wrap around to the first entry.
          mid = 0;
          break;
        }
        long midVal = ring.get(mid).hash;
        // Hash of the preceding entry; 0 is used when there is no preceding entry.
        long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash;
        if (requestHash <= midVal && requestHash > midValL) {
          break;
        }
        if (midVal < requestHash) {
          low = mid + 1;
        } else {
          high =  mid - 1;
        }
        if (low > high) {
          mid = 0;
          break;
        }
      }

      // Try finding a READY subchannel, starting from the ring entry next to the RPC's hash.
      // If one of the first two subchannels is not in TRANSIENT_FAILURE, return a result
      // based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found.
      // Meanwhile, trigger connection attempts:
      // for the first subchannel, if it is in IDLE or TRANSIENT_FAILURE;
      // for the second subchannel, if it is in IDLE or TRANSIENT_FAILURE;
      // and for each following subchannel that is in TRANSIENT_FAILURE or IDLE, up to and
      // including the first one that is not in TRANSIENT_FAILURE.
      boolean foundFirstNonFailed = false;  // true if having subchannel(s) in CONNECTING or IDLE
      Subchannel firstSubchannel = null;
      Subchannel secondSubchannel = null;
      for (int i = 0; i < ring.size(); i++) {
        int index = (mid + i) % ring.size();
        EquivalentAddressGroup addrKey = ring.get(index).addrKey;
        SubchannelView subchannel = pickableSubchannels.get(addrKey);
        if (subchannel.stateInfo.getState() == READY) {
          return PickResult.withSubchannel(subchannel.subchannel);
        }

        // RPCs can be buffered if any of the first two subchannels is pending. Otherwise, RPCs
        // are failed unless there is a READY connection.
        if (firstSubchannel == null) {
          firstSubchannel = subchannel.subchannel;
          PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
          if (maybeBuffer != null) {
            return maybeBuffer;
          }
        } else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) {
          secondSubchannel = subchannel.subchannel;
          PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
          if (maybeBuffer != null) {
            return maybeBuffer;
          }
        } else if (subchannel.subchannel != firstSubchannel
            && subchannel.subchannel != secondSubchannel) {
          // A third or later distinct subchannel: only used to trigger connection attempts;
          // the returned result is deliberately ignored.
          if (!foundFirstNonFailed) {
            pickSubchannelsNonReady(subchannel);
            if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) {
              foundFirstNonFailed = true;
            }
          }
        }
      }
      // Fail the pick with error status of the original subchannel hit by hash.
      SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey);
      return PickResult.withError(originalSubchannel.stateInfo.getStatus());
    }
525 
526     @Nullable
pickSubchannelsNonReady(SubchannelView subchannel)527     private PickResult pickSubchannelsNonReady(SubchannelView subchannel) {
528       if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE
529           || subchannel.stateInfo.getState() == IDLE ) {
530         final Subchannel finalSubchannel = subchannel.subchannel;
531         syncContext.execute(new Runnable() {
532           @Override
533           public void run() {
534             finalSubchannel.requestConnection();
535           }
536         });
537       }
538       if (subchannel.stateInfo.getState() == CONNECTING
539           || subchannel.stateInfo.getState() == IDLE) {
540         return PickResult.withNoResult();
541       } else {
542         return null;
543       }
544     }
545   }
546 
547   /**
548    * An unmodifiable view of a subchannel with state not subject to its real connectivity
549    * state changes.
550    */
551   private static final class SubchannelView {
552     private final Subchannel subchannel;
553     private final ConnectivityStateInfo stateInfo;
554 
SubchannelView(Subchannel subchannel, ConnectivityStateInfo stateInfo)555     private SubchannelView(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
556       this.subchannel = subchannel;
557       this.stateInfo = stateInfo;
558     }
559   }
560 
561   private static final class RingEntry implements Comparable<RingEntry> {
562     private final long hash;
563     private final EquivalentAddressGroup addrKey;
564 
RingEntry(long hash, EquivalentAddressGroup addrKey)565     private RingEntry(long hash, EquivalentAddressGroup addrKey) {
566       this.hash = hash;
567       this.addrKey = addrKey;
568     }
569 
570     @Override
compareTo(RingEntry entry)571     public int compareTo(RingEntry entry) {
572       return Long.compare(hash, entry.hash);
573     }
574   }
575 
576   /**
577    * A lighter weight Reference than AtomicReference.
578    */
579   private static final class Ref<T> {
580     T value;
581 
Ref(T value)582     Ref(T value) {
583       this.value = value;
584     }
585   }
586 
587   /**
588    * Configures the ring property. The larger the ring is (that is, the more hashes there are
589    * for each provided host) the better the request distribution will reflect the desired weights.
590    */
591   static final class RingHashConfig {
592     final long minRingSize;
593     final long maxRingSize;
594 
RingHashConfig(long minRingSize, long maxRingSize)595     RingHashConfig(long minRingSize, long maxRingSize) {
596       checkArgument(minRingSize > 0, "minRingSize <= 0");
597       checkArgument(maxRingSize > 0, "maxRingSize <= 0");
598       checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
599       this.minRingSize = minRingSize;
600       this.maxRingSize = maxRingSize;
601     }
602 
603     @Override
toString()604     public String toString() {
605       return MoreObjects.toStringHelper(this)
606           .add("minRingSize", minRingSize)
607           .add("maxRingSize", maxRingSize)
608           .toString();
609     }
610   }
611 }
612