• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.util;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.grpc.ConnectivityState.CONNECTING;
21 import static io.grpc.ConnectivityState.IDLE;
22 import static io.grpc.ConnectivityState.READY;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Objects;
28 import com.google.common.base.Preconditions;
29 
30 import io.grpc.Attributes;
31 import io.grpc.ConnectivityState;
32 import io.grpc.ConnectivityStateInfo;
33 import io.grpc.EquivalentAddressGroup;
34 import io.grpc.ExperimentalApi;
35 import io.grpc.LoadBalancer;
36 import io.grpc.LoadBalancer.PickResult;
37 import io.grpc.LoadBalancer.PickSubchannelArgs;
38 import io.grpc.LoadBalancer.Subchannel;
39 import io.grpc.LoadBalancer.SubchannelPicker;
40 import io.grpc.Metadata;
41 import io.grpc.Metadata.Key;
42 import io.grpc.NameResolver;
43 import io.grpc.Status;
44 import io.grpc.internal.GrpcAttributes;
45 import io.grpc.internal.ServiceConfigUtil;
46 import java.util.ArrayList;
47 import java.util.Collection;
48 import java.util.HashMap;
49 import java.util.HashSet;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.Queue;
53 import java.util.Random;
54 import java.util.Set;
55 import java.util.concurrent.ConcurrentHashMap;
56 import java.util.concurrent.ConcurrentLinkedQueue;
57 import java.util.concurrent.ConcurrentMap;
58 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
59 import java.util.logging.Level;
60 import java.util.logging.Logger;
61 import javax.annotation.Nonnull;
62 import javax.annotation.Nullable;
63 
64 /**
65  * A {@link LoadBalancer} that provides round-robin load balancing mechanism over the
66  * addresses from the {@link NameResolver}.  The sub-lists received from the name resolver
67  * are considered to be an {@link EquivalentAddressGroup} and each of these sub-lists is
68  * what is then balanced across.
69  */
70 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
71 public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
72 
73   private static final RoundRobinLoadBalancerFactory INSTANCE =
74       new RoundRobinLoadBalancerFactory();
75 
RoundRobinLoadBalancerFactory()76   private RoundRobinLoadBalancerFactory() {}
77 
78   /**
79    * A lighter weight Reference than AtomicReference.
80    */
81   @VisibleForTesting
82   static final class Ref<T> {
83     T value;
84 
Ref(T value)85     Ref(T value) {
86       this.value = value;
87     }
88   }
89 
90   /**
91    * Gets the singleton instance of this factory.
92    */
getInstance()93   public static RoundRobinLoadBalancerFactory getInstance() {
94     return INSTANCE;
95   }
96 
97   @Override
newLoadBalancer(LoadBalancer.Helper helper)98   public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
99     return new RoundRobinLoadBalancer(helper);
100   }
101 
102   @VisibleForTesting
103   static final class RoundRobinLoadBalancer extends LoadBalancer {
104     @VisibleForTesting
105     static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
106         Attributes.Key.create("state-info");
107     // package-private to avoid synthetic access
108     static final Attributes.Key<Ref<Subchannel>> STICKY_REF = Attributes.Key.create("sticky-ref");
109 
110     private static final Logger logger = Logger.getLogger(RoundRobinLoadBalancer.class.getName());
111 
112     private final Helper helper;
113     private final Map<EquivalentAddressGroup, Subchannel> subchannels =
114         new HashMap<EquivalentAddressGroup, Subchannel>();
115     private final Random random;
116 
117     private ConnectivityState currentState;
118     private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);
119 
120     @Nullable
121     private StickinessState stickinessState;
122 
RoundRobinLoadBalancer(Helper helper)123     RoundRobinLoadBalancer(Helper helper) {
124       this.helper = checkNotNull(helper, "helper");
125       this.random = new Random();
126     }
127 
128     @Override
handleResolvedAddressGroups( List<EquivalentAddressGroup> servers, Attributes attributes)129     public void handleResolvedAddressGroups(
130         List<EquivalentAddressGroup> servers, Attributes attributes) {
131       Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
132       Set<EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
133       Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs);
134       Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs);
135 
136       Map<String, Object> serviceConfig =
137           attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
138       if (serviceConfig != null) {
139         String stickinessMetadataKey =
140             ServiceConfigUtil.getStickinessMetadataKeyFromServiceConfig(serviceConfig);
141         if (stickinessMetadataKey != null) {
142           if (stickinessMetadataKey.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
143             logger.log(
144                 Level.FINE,
145                 "Binary stickiness header is not supported. The header '{0}' will be ignored",
146                 stickinessMetadataKey);
147           } else if (stickinessState == null
148               || !stickinessState.key.name().equals(stickinessMetadataKey)) {
149             stickinessState = new StickinessState(stickinessMetadataKey);
150           }
151         }
152       }
153 
154       // Create new subchannels for new addresses.
155       for (EquivalentAddressGroup addressGroup : addedAddrs) {
156         // NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel
157         // doesn't need them. They're describing the resolved server list but we're not taking
158         // any action based on this information.
159         Attributes.Builder subchannelAttrs = Attributes.newBuilder()
160             // NB(lukaszx0): because attributes are immutable we can't set new value for the key
161             // after creation but since we can mutate the values we leverage that and set
162             // AtomicReference which will allow mutating state info for given channel.
163             .set(STATE_INFO,
164                 new Ref<ConnectivityStateInfo>(ConnectivityStateInfo.forNonError(IDLE)));
165 
166         Ref<Subchannel> stickyRef = null;
167         if (stickinessState != null) {
168           subchannelAttrs.set(STICKY_REF, stickyRef = new Ref<Subchannel>(null));
169         }
170 
171         Subchannel subchannel = checkNotNull(
172             helper.createSubchannel(addressGroup, subchannelAttrs.build()), "subchannel");
173         if (stickyRef != null) {
174           stickyRef.value = subchannel;
175         }
176         subchannels.put(addressGroup, subchannel);
177         subchannel.requestConnection();
178       }
179 
180       // Shutdown subchannels for removed addresses.
181       for (EquivalentAddressGroup addressGroup : removedAddrs) {
182         Subchannel subchannel = subchannels.remove(addressGroup);
183         shutdownSubchannel(subchannel);
184       }
185 
186       updateBalancingState();
187     }
188 
189     @Override
handleNameResolutionError(Status error)190     public void handleNameResolutionError(Status error) {
191       // ready pickers aren't affected by status changes
192       updateBalancingState(TRANSIENT_FAILURE,
193           currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error));
194     }
195 
196     @Override
handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo)197     public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
198       if (subchannels.get(subchannel.getAddresses()) != subchannel) {
199         return;
200       }
201       if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
202         stickinessState.remove(subchannel);
203       }
204       if (stateInfo.getState() == IDLE) {
205         subchannel.requestConnection();
206       }
207       getSubchannelStateInfoRef(subchannel).value = stateInfo;
208       updateBalancingState();
209     }
210 
shutdownSubchannel(Subchannel subchannel)211     private void shutdownSubchannel(Subchannel subchannel) {
212       subchannel.shutdown();
213       getSubchannelStateInfoRef(subchannel).value =
214           ConnectivityStateInfo.forNonError(SHUTDOWN);
215       if (stickinessState != null) {
216         stickinessState.remove(subchannel);
217       }
218     }
219 
220     @Override
shutdown()221     public void shutdown() {
222       for (Subchannel subchannel : getSubchannels()) {
223         shutdownSubchannel(subchannel);
224       }
225     }
226 
227     private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");
228 
229     /**
230      * Updates picker with the list of active subchannels (state == READY).
231      */
232     @SuppressWarnings("ReferenceEquality")
updateBalancingState()233     private void updateBalancingState() {
234       List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
235       if (activeList.isEmpty()) {
236         // No READY subchannels, determine aggregate state and error status
237         boolean isConnecting = false;
238         Status aggStatus = EMPTY_OK;
239         for (Subchannel subchannel : getSubchannels()) {
240           ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
241           // This subchannel IDLE is not because of channel IDLE_TIMEOUT,
242           // in which case LB is already shutdown.
243           // RRLB will request connection immediately on subchannel IDLE.
244           if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
245             isConnecting = true;
246           }
247           if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
248             aggStatus = stateInfo.getStatus();
249           }
250         }
251         updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE,
252             // If all subchannels are TRANSIENT_FAILURE, return the Status associated with
253             // an arbitrary subchannel, otherwise return OK.
254             new EmptyPicker(aggStatus));
255       } else {
256         // initialize the Picker to a random start index to ensure that a high frequency of Picker
257         // churn does not skew subchannel selection.
258         int startIndex = random.nextInt(activeList.size());
259         updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState));
260       }
261     }
262 
updateBalancingState(ConnectivityState state, RoundRobinPicker picker)263     private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
264       if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
265         helper.updateBalancingState(state, picker);
266         currentState = state;
267         currentPicker = picker;
268       }
269     }
270 
271     /**
272      * Filters out non-ready subchannels.
273      */
filterNonFailingSubchannels( Collection<Subchannel> subchannels)274     private static List<Subchannel> filterNonFailingSubchannels(
275         Collection<Subchannel> subchannels) {
276       List<Subchannel> readySubchannels = new ArrayList<>(subchannels.size());
277       for (Subchannel subchannel : subchannels) {
278         if (isReady(subchannel)) {
279           readySubchannels.add(subchannel);
280         }
281       }
282       return readySubchannels;
283     }
284 
285     /**
286      * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and
287      * remove all attributes.
288      */
stripAttrs(List<EquivalentAddressGroup> groupList)289     private static Set<EquivalentAddressGroup> stripAttrs(List<EquivalentAddressGroup> groupList) {
290       Set<EquivalentAddressGroup> addrs = new HashSet<EquivalentAddressGroup>(groupList.size());
291       for (EquivalentAddressGroup group : groupList) {
292         addrs.add(new EquivalentAddressGroup(group.getAddresses()));
293       }
294       return addrs;
295     }
296 
297     @VisibleForTesting
getSubchannels()298     Collection<Subchannel> getSubchannels() {
299       return subchannels.values();
300     }
301 
getSubchannelStateInfoRef( Subchannel subchannel)302     private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef(
303         Subchannel subchannel) {
304       return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
305     }
306 
307     // package-private to avoid synthetic access
isReady(Subchannel subchannel)308     static boolean isReady(Subchannel subchannel) {
309       return getSubchannelStateInfoRef(subchannel).value.getState() == READY;
310     }
311 
setsDifference(Set<T> a, Set<T> b)312     private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
313       Set<T> aCopy = new HashSet<T>(a);
314       aCopy.removeAll(b);
315       return aCopy;
316     }
317 
getStickinessMapForTest()318     Map<String, Ref<Subchannel>> getStickinessMapForTest() {
319       if (stickinessState == null) {
320         return null;
321       }
322       return stickinessState.stickinessMap;
323     }
324 
325     /**
326      * Holds stickiness related states: The stickiness key, a registry mapping stickiness values to
327      * the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref.
328      */
329     @VisibleForTesting
330     static final class StickinessState {
331       static final int MAX_ENTRIES = 1000;
332 
333       final Key<String> key;
334       final ConcurrentMap<String, Ref<Subchannel>> stickinessMap =
335           new ConcurrentHashMap<String, Ref<Subchannel>>();
336 
337       final Queue<String> evictionQueue = new ConcurrentLinkedQueue<String>();
338 
StickinessState(@onnull String stickinessKey)339       StickinessState(@Nonnull String stickinessKey) {
340         this.key = Key.of(stickinessKey, Metadata.ASCII_STRING_MARSHALLER);
341       }
342 
343       /**
344        * Returns the subchannel associated to the stickiness value if available in both the
345        * registry and the round robin list, otherwise associates the given subchannel with the
346        * stickiness key in the registry and returns the given subchannel.
347        */
348       @Nonnull
maybeRegister( String stickinessValue, @Nonnull Subchannel subchannel)349       Subchannel maybeRegister(
350           String stickinessValue, @Nonnull Subchannel subchannel) {
351         final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
352         while (true) {
353           Ref<Subchannel> existingSubchannelRef =
354               stickinessMap.putIfAbsent(stickinessValue, newSubchannelRef);
355           if (existingSubchannelRef == null) {
356             // new entry
357             addToEvictionQueue(stickinessValue);
358             return subchannel;
359           } else {
360             // existing entry
361             Subchannel existingSubchannel = existingSubchannelRef.value;
362             if (existingSubchannel != null && isReady(existingSubchannel)) {
363               return existingSubchannel;
364             }
365           }
366           // existingSubchannelRef is not null but no longer valid, replace it
367           if (stickinessMap.replace(stickinessValue, existingSubchannelRef, newSubchannelRef)) {
368             return subchannel;
369           }
370           // another thread concurrently removed or updated the entry, try again
371         }
372       }
373 
addToEvictionQueue(String value)374       private void addToEvictionQueue(String value) {
375         String oldValue;
376         while (stickinessMap.size() >= MAX_ENTRIES && (oldValue = evictionQueue.poll()) != null) {
377           stickinessMap.remove(oldValue);
378         }
379         evictionQueue.add(value);
380       }
381 
382       /**
383        * Unregister the subchannel from StickinessState.
384        */
remove(Subchannel subchannel)385       void remove(Subchannel subchannel) {
386         subchannel.getAttributes().get(STICKY_REF).value = null;
387       }
388 
389       /**
390        * Gets the subchannel associated with the stickiness value if there is.
391        */
392       @Nullable
getSubchannel(String stickinessValue)393       Subchannel getSubchannel(String stickinessValue) {
394         Ref<Subchannel> subchannelRef = stickinessMap.get(stickinessValue);
395         if (subchannelRef != null) {
396           return subchannelRef.value;
397         }
398         return null;
399       }
400     }
401   }
402 
403   // Only subclasses are ReadyPicker or EmptyPicker
404   private abstract static class RoundRobinPicker extends SubchannelPicker {
isEquivalentTo(RoundRobinPicker picker)405     abstract boolean isEquivalentTo(RoundRobinPicker picker);
406   }
407 
408   @VisibleForTesting
409   static final class ReadyPicker extends RoundRobinPicker {
410     private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
411         AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");
412 
413     private final List<Subchannel> list; // non-empty
414     @Nullable
415     private final RoundRobinLoadBalancer.StickinessState stickinessState;
416     @SuppressWarnings("unused")
417     private volatile int index;
418 
ReadyPicker(List<Subchannel> list, int startIndex, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState)419     ReadyPicker(List<Subchannel> list, int startIndex,
420         @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) {
421       Preconditions.checkArgument(!list.isEmpty(), "empty list");
422       this.list = list;
423       this.stickinessState = stickinessState;
424       this.index = startIndex - 1;
425     }
426 
427     @Override
pickSubchannel(PickSubchannelArgs args)428     public PickResult pickSubchannel(PickSubchannelArgs args) {
429       Subchannel subchannel = null;
430       if (stickinessState != null) {
431         String stickinessValue = args.getHeaders().get(stickinessState.key);
432         if (stickinessValue != null) {
433           subchannel = stickinessState.getSubchannel(stickinessValue);
434           if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
435             subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
436           }
437         }
438       }
439 
440       return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
441     }
442 
nextSubchannel()443     private Subchannel nextSubchannel() {
444       int size = list.size();
445       int i = indexUpdater.incrementAndGet(this);
446       if (i >= size) {
447         int oldi = i;
448         i %= size;
449         indexUpdater.compareAndSet(this, oldi, i);
450       }
451       return list.get(i);
452     }
453 
454     @VisibleForTesting
getList()455     List<Subchannel> getList() {
456       return list;
457     }
458 
459     @Override
isEquivalentTo(RoundRobinPicker picker)460     boolean isEquivalentTo(RoundRobinPicker picker) {
461       if (!(picker instanceof ReadyPicker)) {
462         return false;
463       }
464       ReadyPicker other = (ReadyPicker) picker;
465       // the lists cannot contain duplicate subchannels
466       return other == this || (stickinessState == other.stickinessState
467           && list.size() == other.list.size()
468           && new HashSet<Subchannel>(list).containsAll(other.list));
469     }
470   }
471 
472   @VisibleForTesting
473   static final class EmptyPicker extends RoundRobinPicker {
474 
475     private final Status status;
476 
EmptyPicker(@onnull Status status)477     EmptyPicker(@Nonnull Status status) {
478       this.status = Preconditions.checkNotNull(status, "status");
479     }
480 
481     @Override
pickSubchannel(PickSubchannelArgs args)482     public PickResult pickSubchannel(PickSubchannelArgs args) {
483       return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status);
484     }
485 
486     @Override
isEquivalentTo(RoundRobinPicker picker)487     boolean isEquivalentTo(RoundRobinPicker picker) {
488       return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
489           || (status.isOk() && ((EmptyPicker) picker).status.isOk()));
490     }
491   }
492 }
493