1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.util; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static io.grpc.ConnectivityState.CONNECTING; 21 import static io.grpc.ConnectivityState.IDLE; 22 import static io.grpc.ConnectivityState.READY; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 26 import com.google.common.annotations.VisibleForTesting; 27 import com.google.common.base.Objects; 28 import com.google.common.base.Preconditions; 29 30 import io.grpc.Attributes; 31 import io.grpc.ConnectivityState; 32 import io.grpc.ConnectivityStateInfo; 33 import io.grpc.EquivalentAddressGroup; 34 import io.grpc.ExperimentalApi; 35 import io.grpc.LoadBalancer; 36 import io.grpc.LoadBalancer.PickResult; 37 import io.grpc.LoadBalancer.PickSubchannelArgs; 38 import io.grpc.LoadBalancer.Subchannel; 39 import io.grpc.LoadBalancer.SubchannelPicker; 40 import io.grpc.Metadata; 41 import io.grpc.Metadata.Key; 42 import io.grpc.NameResolver; 43 import io.grpc.Status; 44 import io.grpc.internal.GrpcAttributes; 45 import io.grpc.internal.ServiceConfigUtil; 46 import java.util.ArrayList; 47 import java.util.Collection; 48 import java.util.HashMap; 49 import java.util.HashSet; 50 import java.util.List; 51 import java.util.Map; 52 import java.util.Queue; 53 import java.util.Random; 54 import java.util.Set; 55 import java.util.concurrent.ConcurrentHashMap; 56 import java.util.concurrent.ConcurrentLinkedQueue; 57 import java.util.concurrent.ConcurrentMap; 58 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 59 import java.util.logging.Level; 60 import java.util.logging.Logger; 61 import javax.annotation.Nonnull; 62 import javax.annotation.Nullable; 63 64 /** 65 * A {@link LoadBalancer} that provides round-robin load balancing mechanism over the 66 * addresses from the {@link NameResolver}. The sub-lists received from the name resolver 67 * are considered to be an {@link EquivalentAddressGroup} and each of these sub-lists is 68 * what is then balanced across. 69 */ 70 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") 71 public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { 72 73 private static final RoundRobinLoadBalancerFactory INSTANCE = 74 new RoundRobinLoadBalancerFactory(); 75 RoundRobinLoadBalancerFactory()76 private RoundRobinLoadBalancerFactory() {} 77 78 /** 79 * A lighter weight Reference than AtomicReference. 80 */ 81 @VisibleForTesting 82 static final class Ref<T> { 83 T value; 84 Ref(T value)85 Ref(T value) { 86 this.value = value; 87 } 88 } 89 90 /** 91 * Gets the singleton instance of this factory. 92 */ getInstance()93 public static RoundRobinLoadBalancerFactory getInstance() { 94 return INSTANCE; 95 } 96 97 @Override newLoadBalancer(LoadBalancer.Helper helper)98 public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { 99 return new RoundRobinLoadBalancer(helper); 100 } 101 102 @VisibleForTesting 103 static final class RoundRobinLoadBalancer extends LoadBalancer { 104 @VisibleForTesting 105 static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO = 106 Attributes.Key.create("state-info"); 107 // package-private to avoid synthetic access 108 static final Attributes.Key<Ref<Subchannel>> STICKY_REF = Attributes.Key.create("sticky-ref"); 109 110 private static final Logger logger = Logger.getLogger(RoundRobinLoadBalancer.class.getName()); 111 112 private final Helper helper; 113 private final Map<EquivalentAddressGroup, Subchannel> subchannels = 114 new HashMap<EquivalentAddressGroup, Subchannel>(); 115 private final Random random; 116 117 private ConnectivityState currentState; 118 private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK); 119 120 @Nullable 121 private StickinessState stickinessState; 122 RoundRobinLoadBalancer(Helper helper)123 RoundRobinLoadBalancer(Helper helper) { 124 this.helper = checkNotNull(helper, "helper"); 125 this.random = new Random(); 126 } 127 128 @Override handleResolvedAddressGroups( List<EquivalentAddressGroup> servers, Attributes attributes)129 public void handleResolvedAddressGroups( 130 List<EquivalentAddressGroup> servers, Attributes attributes) { 131 Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet(); 132 Set<EquivalentAddressGroup> latestAddrs = stripAttrs(servers); 133 Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs); 134 Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs); 135 136 Map<String, Object> serviceConfig = 137 attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); 138 if (serviceConfig != null) { 139 String stickinessMetadataKey = 140 ServiceConfigUtil.getStickinessMetadataKeyFromServiceConfig(serviceConfig); 141 if (stickinessMetadataKey != null) { 142 if (stickinessMetadataKey.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { 143 logger.log( 144 Level.FINE, 145 "Binary stickiness header is not supported. The header '{0}' will be ignored", 146 stickinessMetadataKey); 147 } else if (stickinessState == null 148 || !stickinessState.key.name().equals(stickinessMetadataKey)) { 149 stickinessState = new StickinessState(stickinessMetadataKey); 150 } 151 } 152 } 153 154 // Create new subchannels for new addresses. 155 for (EquivalentAddressGroup addressGroup : addedAddrs) { 156 // NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel 157 // doesn't need them. They're describing the resolved server list but we're not taking 158 // any action based on this information. 159 Attributes.Builder subchannelAttrs = Attributes.newBuilder() 160 // NB(lukaszx0): because attributes are immutable we can't set new value for the key 161 // after creation but since we can mutate the values we leverage that and set 162 // AtomicReference which will allow mutating state info for given channel. 163 .set(STATE_INFO, 164 new Ref<ConnectivityStateInfo>(ConnectivityStateInfo.forNonError(IDLE))); 165 166 Ref<Subchannel> stickyRef = null; 167 if (stickinessState != null) { 168 subchannelAttrs.set(STICKY_REF, stickyRef = new Ref<Subchannel>(null)); 169 } 170 171 Subchannel subchannel = checkNotNull( 172 helper.createSubchannel(addressGroup, subchannelAttrs.build()), "subchannel"); 173 if (stickyRef != null) { 174 stickyRef.value = subchannel; 175 } 176 subchannels.put(addressGroup, subchannel); 177 subchannel.requestConnection(); 178 } 179 180 // Shutdown subchannels for removed addresses. 181 for (EquivalentAddressGroup addressGroup : removedAddrs) { 182 Subchannel subchannel = subchannels.remove(addressGroup); 183 shutdownSubchannel(subchannel); 184 } 185 186 updateBalancingState(); 187 } 188 189 @Override handleNameResolutionError(Status error)190 public void handleNameResolutionError(Status error) { 191 // ready pickers aren't affected by status changes 192 updateBalancingState(TRANSIENT_FAILURE, 193 currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error)); 194 } 195 196 @Override handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo)197 public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { 198 if (subchannels.get(subchannel.getAddresses()) != subchannel) { 199 return; 200 } 201 if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { 202 stickinessState.remove(subchannel); 203 } 204 if (stateInfo.getState() == IDLE) { 205 subchannel.requestConnection(); 206 } 207 getSubchannelStateInfoRef(subchannel).value = stateInfo; 208 updateBalancingState(); 209 } 210 shutdownSubchannel(Subchannel subchannel)211 private void shutdownSubchannel(Subchannel subchannel) { 212 subchannel.shutdown(); 213 getSubchannelStateInfoRef(subchannel).value = 214 ConnectivityStateInfo.forNonError(SHUTDOWN); 215 if (stickinessState != null) { 216 stickinessState.remove(subchannel); 217 } 218 } 219 220 @Override shutdown()221 public void shutdown() { 222 for (Subchannel subchannel : getSubchannels()) { 223 shutdownSubchannel(subchannel); 224 } 225 } 226 227 private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready"); 228 229 /** 230 * Updates picker with the list of active subchannels (state == READY). 231 */ 232 @SuppressWarnings("ReferenceEquality") updateBalancingState()233 private void updateBalancingState() { 234 List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels()); 235 if (activeList.isEmpty()) { 236 // No READY subchannels, determine aggregate state and error status 237 boolean isConnecting = false; 238 Status aggStatus = EMPTY_OK; 239 for (Subchannel subchannel : getSubchannels()) { 240 ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value; 241 // This subchannel IDLE is not because of channel IDLE_TIMEOUT, 242 // in which case LB is already shutdown. 243 // RRLB will request connection immediately on subchannel IDLE. 244 if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { 245 isConnecting = true; 246 } 247 if (aggStatus == EMPTY_OK || !aggStatus.isOk()) { 248 aggStatus = stateInfo.getStatus(); 249 } 250 } 251 updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE, 252 // If all subchannels are TRANSIENT_FAILURE, return the Status associated with 253 // an arbitrary subchannel, otherwise return OK. 254 new EmptyPicker(aggStatus)); 255 } else { 256 // initialize the Picker to a random start index to ensure that a high frequency of Picker 257 // churn does not skew subchannel selection. 258 int startIndex = random.nextInt(activeList.size()); 259 updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState)); 260 } 261 } 262 updateBalancingState(ConnectivityState state, RoundRobinPicker picker)263 private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) { 264 if (state != currentState || !picker.isEquivalentTo(currentPicker)) { 265 helper.updateBalancingState(state, picker); 266 currentState = state; 267 currentPicker = picker; 268 } 269 } 270 271 /** 272 * Filters out non-ready subchannels. 273 */ filterNonFailingSubchannels( Collection<Subchannel> subchannels)274 private static List<Subchannel> filterNonFailingSubchannels( 275 Collection<Subchannel> subchannels) { 276 List<Subchannel> readySubchannels = new ArrayList<>(subchannels.size()); 277 for (Subchannel subchannel : subchannels) { 278 if (isReady(subchannel)) { 279 readySubchannels.add(subchannel); 280 } 281 } 282 return readySubchannels; 283 } 284 285 /** 286 * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and 287 * remove all attributes. 288 */ stripAttrs(List<EquivalentAddressGroup> groupList)289 private static Set<EquivalentAddressGroup> stripAttrs(List<EquivalentAddressGroup> groupList) { 290 Set<EquivalentAddressGroup> addrs = new HashSet<EquivalentAddressGroup>(groupList.size()); 291 for (EquivalentAddressGroup group : groupList) { 292 addrs.add(new EquivalentAddressGroup(group.getAddresses())); 293 } 294 return addrs; 295 } 296 297 @VisibleForTesting getSubchannels()298 Collection<Subchannel> getSubchannels() { 299 return subchannels.values(); 300 } 301 getSubchannelStateInfoRef( Subchannel subchannel)302 private static Ref<ConnectivityStateInfo> getSubchannelStateInfoRef( 303 Subchannel subchannel) { 304 return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO"); 305 } 306 307 // package-private to avoid synthetic access isReady(Subchannel subchannel)308 static boolean isReady(Subchannel subchannel) { 309 return getSubchannelStateInfoRef(subchannel).value.getState() == READY; 310 } 311 setsDifference(Set<T> a, Set<T> b)312 private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) { 313 Set<T> aCopy = new HashSet<T>(a); 314 aCopy.removeAll(b); 315 return aCopy; 316 } 317 getStickinessMapForTest()318 Map<String, Ref<Subchannel>> getStickinessMapForTest() { 319 if (stickinessState == null) { 320 return null; 321 } 322 return stickinessState.stickinessMap; 323 } 324 325 /** 326 * Holds stickiness related states: The stickiness key, a registry mapping stickiness values to 327 * the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref. 328 */ 329 @VisibleForTesting 330 static final class StickinessState { 331 static final int MAX_ENTRIES = 1000; 332 333 final Key<String> key; 334 final ConcurrentMap<String, Ref<Subchannel>> stickinessMap = 335 new ConcurrentHashMap<String, Ref<Subchannel>>(); 336 337 final Queue<String> evictionQueue = new ConcurrentLinkedQueue<String>(); 338 StickinessState(@onnull String stickinessKey)339 StickinessState(@Nonnull String stickinessKey) { 340 this.key = Key.of(stickinessKey, Metadata.ASCII_STRING_MARSHALLER); 341 } 342 343 /** 344 * Returns the subchannel associated to the stickiness value if available in both the 345 * registry and the round robin list, otherwise associates the given subchannel with the 346 * stickiness key in the registry and returns the given subchannel. 347 */ 348 @Nonnull maybeRegister( String stickinessValue, @Nonnull Subchannel subchannel)349 Subchannel maybeRegister( 350 String stickinessValue, @Nonnull Subchannel subchannel) { 351 final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF); 352 while (true) { 353 Ref<Subchannel> existingSubchannelRef = 354 stickinessMap.putIfAbsent(stickinessValue, newSubchannelRef); 355 if (existingSubchannelRef == null) { 356 // new entry 357 addToEvictionQueue(stickinessValue); 358 return subchannel; 359 } else { 360 // existing entry 361 Subchannel existingSubchannel = existingSubchannelRef.value; 362 if (existingSubchannel != null && isReady(existingSubchannel)) { 363 return existingSubchannel; 364 } 365 } 366 // existingSubchannelRef is not null but no longer valid, replace it 367 if (stickinessMap.replace(stickinessValue, existingSubchannelRef, newSubchannelRef)) { 368 return subchannel; 369 } 370 // another thread concurrently removed or updated the entry, try again 371 } 372 } 373 addToEvictionQueue(String value)374 private void addToEvictionQueue(String value) { 375 String oldValue; 376 while (stickinessMap.size() >= MAX_ENTRIES && (oldValue = evictionQueue.poll()) != null) { 377 stickinessMap.remove(oldValue); 378 } 379 evictionQueue.add(value); 380 } 381 382 /** 383 * Unregister the subchannel from StickinessState. 384 */ remove(Subchannel subchannel)385 void remove(Subchannel subchannel) { 386 subchannel.getAttributes().get(STICKY_REF).value = null; 387 } 388 389 /** 390 * Gets the subchannel associated with the stickiness value if there is. 391 */ 392 @Nullable getSubchannel(String stickinessValue)393 Subchannel getSubchannel(String stickinessValue) { 394 Ref<Subchannel> subchannelRef = stickinessMap.get(stickinessValue); 395 if (subchannelRef != null) { 396 return subchannelRef.value; 397 } 398 return null; 399 } 400 } 401 } 402 403 // Only subclasses are ReadyPicker or EmptyPicker 404 private abstract static class RoundRobinPicker extends SubchannelPicker { isEquivalentTo(RoundRobinPicker picker)405 abstract boolean isEquivalentTo(RoundRobinPicker picker); 406 } 407 408 @VisibleForTesting 409 static final class ReadyPicker extends RoundRobinPicker { 410 private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater = 411 AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index"); 412 413 private final List<Subchannel> list; // non-empty 414 @Nullable 415 private final RoundRobinLoadBalancer.StickinessState stickinessState; 416 @SuppressWarnings("unused") 417 private volatile int index; 418 ReadyPicker(List<Subchannel> list, int startIndex, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState)419 ReadyPicker(List<Subchannel> list, int startIndex, 420 @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) { 421 Preconditions.checkArgument(!list.isEmpty(), "empty list"); 422 this.list = list; 423 this.stickinessState = stickinessState; 424 this.index = startIndex - 1; 425 } 426 427 @Override pickSubchannel(PickSubchannelArgs args)428 public PickResult pickSubchannel(PickSubchannelArgs args) { 429 Subchannel subchannel = null; 430 if (stickinessState != null) { 431 String stickinessValue = args.getHeaders().get(stickinessState.key); 432 if (stickinessValue != null) { 433 subchannel = stickinessState.getSubchannel(stickinessValue); 434 if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) { 435 subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel()); 436 } 437 } 438 } 439 440 return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel()); 441 } 442 nextSubchannel()443 private Subchannel nextSubchannel() { 444 int size = list.size(); 445 int i = indexUpdater.incrementAndGet(this); 446 if (i >= size) { 447 int oldi = i; 448 i %= size; 449 indexUpdater.compareAndSet(this, oldi, i); 450 } 451 return list.get(i); 452 } 453 454 @VisibleForTesting getList()455 List<Subchannel> getList() { 456 return list; 457 } 458 459 @Override isEquivalentTo(RoundRobinPicker picker)460 boolean isEquivalentTo(RoundRobinPicker picker) { 461 if (!(picker instanceof ReadyPicker)) { 462 return false; 463 } 464 ReadyPicker other = (ReadyPicker) picker; 465 // the lists cannot contain duplicate subchannels 466 return other == this || (stickinessState == other.stickinessState 467 && list.size() == other.list.size() 468 && new HashSet<Subchannel>(list).containsAll(other.list)); 469 } 470 } 471 472 @VisibleForTesting 473 static final class EmptyPicker extends RoundRobinPicker { 474 475 private final Status status; 476 EmptyPicker(@onnull Status status)477 EmptyPicker(@Nonnull Status status) { 478 this.status = Preconditions.checkNotNull(status, "status"); 479 } 480 481 @Override pickSubchannel(PickSubchannelArgs args)482 public PickResult pickSubchannel(PickSubchannelArgs args) { 483 return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status); 484 } 485 486 @Override isEquivalentTo(RoundRobinPicker picker)487 boolean isEquivalentTo(RoundRobinPicker picker) { 488 return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status) 489 || (status.isOk() && ((EmptyPicker) picker).status.isOk())); 490 } 491 } 492 } 493