1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ConnectivityState.CONNECTING; 23 import static io.grpc.ConnectivityState.IDLE; 24 import static io.grpc.ConnectivityState.READY; 25 import static io.grpc.ConnectivityState.SHUTDOWN; 26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Objects; 31 import com.google.common.base.Stopwatch; 32 import com.google.protobuf.util.Durations; 33 import io.grpc.Attributes; 34 import io.grpc.ChannelLogger; 35 import io.grpc.ChannelLogger.ChannelLogLevel; 36 import io.grpc.ConnectivityState; 37 import io.grpc.ConnectivityStateInfo; 38 import io.grpc.Context; 39 import io.grpc.EquivalentAddressGroup; 40 import io.grpc.LoadBalancer.CreateSubchannelArgs; 41 import io.grpc.LoadBalancer.Helper; 42 import io.grpc.LoadBalancer.PickResult; 43 import io.grpc.LoadBalancer.PickSubchannelArgs; 44 import io.grpc.LoadBalancer.Subchannel; 45 import io.grpc.LoadBalancer.SubchannelPicker; 46 import io.grpc.LoadBalancer.SubchannelStateListener; 47 import io.grpc.ManagedChannel; 48 import io.grpc.Metadata; 49 import io.grpc.Status; 50 import io.grpc.SynchronizationContext; 51 import io.grpc.SynchronizationContext.ScheduledHandle; 52 import io.grpc.grpclb.SubchannelPool.PooledSubchannelStateListener; 53 import io.grpc.internal.BackoffPolicy; 54 import io.grpc.internal.TimeProvider; 55 import io.grpc.lb.v1.ClientStats; 56 import io.grpc.lb.v1.InitialLoadBalanceRequest; 57 import io.grpc.lb.v1.InitialLoadBalanceResponse; 58 import io.grpc.lb.v1.LoadBalanceRequest; 59 import io.grpc.lb.v1.LoadBalanceResponse; 60 import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase; 61 import io.grpc.lb.v1.LoadBalancerGrpc; 62 import io.grpc.lb.v1.Server; 63 import io.grpc.lb.v1.ServerList; 64 import io.grpc.stub.StreamObserver; 65 import java.net.InetAddress; 66 import java.net.InetSocketAddress; 67 import java.net.UnknownHostException; 68 import java.util.ArrayList; 69 import java.util.Arrays; 70 import java.util.Collections; 71 import java.util.HashMap; 72 import java.util.List; 73 import java.util.Map; 74 import java.util.concurrent.ScheduledExecutorService; 75 import java.util.concurrent.TimeUnit; 76 import java.util.concurrent.atomic.AtomicBoolean; 77 import java.util.concurrent.atomic.AtomicReference; 78 import javax.annotation.Nullable; 79 import javax.annotation.concurrent.NotThreadSafe; 80 81 /** 82 * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}. Created when 83 * GrpclbLoadBalancer switches to GRPCLB mode. Closed and discarded when GrpclbLoadBalancer 84 * switches away from GRPCLB mode. 85 */ 86 @NotThreadSafe 87 final class GrpclbState { 88 static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); 89 private static final Attributes LB_PROVIDED_BACKEND_ATTRS = 90 Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build(); 91 92 // Temporary workaround to reduce log spam for a grpclb server that incessantly sends updates 93 // Tracked by b/198440401 94 static final boolean SHOULD_LOG_SERVER_LISTS = 95 Boolean.parseBoolean(System.getProperty("io.grpc.grpclb.LogServerLists", "true")); 96 97 @VisibleForTesting 98 static final PickResult DROP_PICK_RESULT = 99 PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); 100 @VisibleForTesting 101 static final Status NO_AVAILABLE_BACKENDS_STATUS = 102 Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends"); 103 @VisibleForTesting 104 static final Status BALANCER_TIMEOUT_STATUS = 105 Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer"); 106 @VisibleForTesting 107 static final Status BALANCER_REQUESTED_FALLBACK_STATUS = 108 Status.UNAVAILABLE.withDescription("Fallback requested by balancer"); 109 @VisibleForTesting 110 static final Status NO_FALLBACK_BACKENDS_STATUS = 111 Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found"); 112 // This error status should never be propagated to RPC failures, as "no backend or balancer 113 // addresses found" should be directly handled as a name resolution error. So in cases of no 114 // balancer address, fallback should never fail. 115 private static final Status NO_LB_ADDRESS_PROVIDED_STATUS = 116 Status.UNAVAILABLE.withDescription("No balancer address found"); 117 118 119 @VisibleForTesting 120 static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { 121 @Override 122 public PickResult picked(Metadata headers) { 123 return PickResult.withNoResult(); 124 } 125 126 @Override 127 public String toString() { 128 return "BUFFER_ENTRY"; 129 } 130 }; 131 @VisibleForTesting 132 static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed"; 133 134 enum Mode { 135 ROUND_ROBIN, 136 PICK_FIRST, 137 } 138 139 private final String serviceName; 140 private final long fallbackTimeoutMs; 141 private final Helper helper; 142 private final Context context; 143 private final SynchronizationContext syncContext; 144 @Nullable 145 private final SubchannelPool subchannelPool; 146 private final TimeProvider time; 147 private final Stopwatch stopwatch; 148 private final ScheduledExecutorService timerService; 149 150 private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO = 151 Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); 152 private final BackoffPolicy.Provider backoffPolicyProvider; 153 private final ChannelLogger logger; 154 155 // Scheduled only once. Never reset. 156 @Nullable 157 private ScheduledHandle fallbackTimer; 158 private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList(); 159 private boolean usingFallbackBackends; 160 // Reason to fallback, will be used as RPC's error message if fail to fallback (e.g., no 161 // fallback addresses found). 162 @Nullable 163 private Status fallbackReason; 164 // True if the current balancer has returned a serverlist. Will be reset to false when lost 165 // connection to a balancer. 166 private boolean balancerWorking; 167 @Nullable 168 private BackoffPolicy lbRpcRetryPolicy; 169 @Nullable 170 private ScheduledHandle lbRpcRetryTimer; 171 172 @Nullable 173 private ManagedChannel lbCommChannel; 174 175 @Nullable 176 private LbStream lbStream; 177 private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Collections.emptyMap(); 178 private final GrpclbConfig config; 179 180 // Has the same size as the round-robin list from the balancer. 181 // A drop entry from the round-robin list becomes a DropEntry here. 182 // A backend entry from the robin-robin list becomes a null here. 183 private List<DropEntry> dropList = Collections.emptyList(); 184 // Contains only non-drop, i.e., backends from the round-robin list from the balancer. 185 private List<BackendEntry> backendList = Collections.emptyList(); 186 private RoundRobinPicker currentPicker = 187 new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY)); 188 private boolean requestConnectionPending; 189 GrpclbState( GrpclbConfig config, Helper helper, Context context, SubchannelPool subchannelPool, TimeProvider time, Stopwatch stopwatch, BackoffPolicy.Provider backoffPolicyProvider)190 GrpclbState( 191 GrpclbConfig config, 192 Helper helper, 193 Context context, 194 SubchannelPool subchannelPool, 195 TimeProvider time, 196 Stopwatch stopwatch, 197 BackoffPolicy.Provider backoffPolicyProvider) { 198 this.config = checkNotNull(config, "config"); 199 this.helper = checkNotNull(helper, "helper"); 200 this.context = checkNotNull(context, "context"); 201 this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); 202 if (config.getMode() == Mode.ROUND_ROBIN) { 203 this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); 204 subchannelPool.registerListener( 205 new PooledSubchannelStateListener() { 206 @Override 207 public void onSubchannelState( 208 Subchannel subchannel, ConnectivityStateInfo newState) { 209 handleSubchannelState(subchannel, newState); 210 } 211 }); 212 } else { 213 this.subchannelPool = null; 214 } 215 this.time = checkNotNull(time, "time provider"); 216 this.stopwatch = checkNotNull(stopwatch, "stopwatch"); 217 this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); 218 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 219 if (config.getServiceName() != null) { 220 this.serviceName = config.getServiceName(); 221 } else { 222 this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); 223 } 224 this.fallbackTimeoutMs = config.getFallbackTimeoutMs(); 225 this.logger = checkNotNull(helper.getChannelLogger(), "logger"); 226 logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Created", serviceName); 227 } 228 handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState)229 void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { 230 if (newState.getState() == SHUTDOWN || !subchannels.containsValue(subchannel)) { 231 return; 232 } 233 if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) { 234 subchannel.requestConnection(); 235 } 236 if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { 237 helper.refreshNameResolution(); 238 } 239 240 AtomicReference<ConnectivityStateInfo> stateInfoRef = 241 subchannel.getAttributes().get(STATE_INFO); 242 // If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at 243 // every moment which causes RR to stay in CONNECTING. It's better to keep the TRANSIENT_FAILURE 244 // state in that case so that fail-fast RPCs can fail fast. 245 boolean keepState = 246 config.getMode() == Mode.ROUND_ROBIN 247 && stateInfoRef.get().getState() == TRANSIENT_FAILURE 248 && (newState.getState() == CONNECTING || newState.getState() == IDLE); 249 if (!keepState) { 250 stateInfoRef.set(newState); 251 maybeUseFallbackBackends(); 252 maybeUpdatePicker(); 253 } 254 } 255 256 /** 257 * Handle new addresses of the balancer and backends from the resolver, and create connection if 258 * not yet connected. 259 */ handleAddresses( List<EquivalentAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers)260 void handleAddresses( 261 List<EquivalentAddressGroup> newLbAddressGroups, 262 List<EquivalentAddressGroup> newBackendServers) { 263 logger.log( 264 ChannelLogLevel.DEBUG, 265 "[grpclb-<{0}>] Resolved addresses: lb addresses {1}, backends: {2}", 266 serviceName, 267 newLbAddressGroups, 268 newBackendServers); 269 fallbackBackendList = newBackendServers; 270 if (newLbAddressGroups.isEmpty()) { 271 // No balancer address: close existing balancer connection and prepare to enter fallback 272 // mode. If there is no successful backend connection, it enters fallback mode immediately. 273 // Otherwise, fallback does not happen until backend connections are lost. This behavior 274 // might be different from other languages (e.g., existing balancer connection is not 275 // closed in C-core), but we aren't changing it at this time. 276 shutdownLbComm(); 277 if (!usingFallbackBackends) { 278 fallbackReason = NO_LB_ADDRESS_PROVIDED_STATUS; 279 cancelFallbackTimer(); 280 maybeUseFallbackBackends(); 281 } 282 } else { 283 startLbComm(newLbAddressGroups); 284 // Avoid creating a new RPC just because the addresses were updated, as it can cause a 285 // stampeding herd. The current RPC may be on a connection to an address not present in 286 // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an 287 // outdated backend, we could choose to re-create the RPC. 288 if (lbStream == null) { 289 cancelLbRpcRetryTimer(); 290 startLbRpc(); 291 } 292 // Start the fallback timer if it's never started and we are not already using fallback 293 // backends. 294 if (fallbackTimer == null && !usingFallbackBackends) { 295 fallbackTimer = 296 syncContext.schedule( 297 new FallbackModeTask(BALANCER_TIMEOUT_STATUS), 298 fallbackTimeoutMs, 299 TimeUnit.MILLISECONDS, 300 timerService); 301 } 302 } 303 if (usingFallbackBackends) { 304 // Populate the new fallback backends to round-robin list. 305 useFallbackBackends(); 306 } 307 maybeUpdatePicker(); 308 } 309 requestConnection()310 void requestConnection() { 311 requestConnectionPending = true; 312 for (RoundRobinEntry entry : currentPicker.pickList) { 313 if (entry instanceof IdleSubchannelEntry) { 314 ((IdleSubchannelEntry) entry).subchannel.requestConnection(); 315 requestConnectionPending = false; 316 } 317 } 318 } 319 maybeUseFallbackBackends()320 private void maybeUseFallbackBackends() { 321 if (balancerWorking || usingFallbackBackends) { 322 return; 323 } 324 // Balancer RPC should have either been broken or timed out. 325 checkState(fallbackReason != null, "no reason to fallback"); 326 for (Subchannel subchannel : subchannels.values()) { 327 ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); 328 if (stateInfo.getState() == READY) { 329 return; 330 } 331 // If we do have balancer-provided backends, use one of its error in the error message if 332 // fail to fallback. 333 if (stateInfo.getState() == TRANSIENT_FAILURE) { 334 fallbackReason = stateInfo.getStatus(); 335 } 336 } 337 // Fallback conditions met 338 useFallbackBackends(); 339 } 340 341 /** 342 * Populate backend servers to be used from the fallback backends. 343 */ useFallbackBackends()344 private void useFallbackBackends() { 345 usingFallbackBackends = true; 346 logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Using fallback backends", serviceName); 347 348 List<DropEntry> newDropList = new ArrayList<>(); 349 List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); 350 for (EquivalentAddressGroup eag : fallbackBackendList) { 351 newDropList.add(null); 352 newBackendAddrList.add(new BackendAddressGroup(eag, null)); 353 } 354 updateServerList(newDropList, newBackendAddrList, null); 355 } 356 shutdownLbComm()357 private void shutdownLbComm() { 358 if (lbCommChannel != null) { 359 lbCommChannel.shutdown(); 360 lbCommChannel = null; 361 } 362 shutdownLbRpc(); 363 } 364 shutdownLbRpc()365 private void shutdownLbRpc() { 366 if (lbStream != null) { 367 lbStream.close(Status.CANCELLED.withDescription("balancer shutdown").asException()); 368 // lbStream will be set to null in LbStream.cleanup() 369 } 370 } 371 startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags)372 private void startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags) { 373 checkNotNull(overrideAuthorityEags, "overrideAuthorityEags"); 374 assert !overrideAuthorityEags.isEmpty(); 375 String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes() 376 .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX; 377 if (lbCommChannel == null) { 378 lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority); 379 logger.log( 380 ChannelLogLevel.DEBUG, 381 "[grpclb-<{0}>] Created grpclb channel: EAG={1}", 382 serviceName, 383 overrideAuthorityEags); 384 } else { 385 helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags); 386 } 387 } 388 startLbRpc()389 private void startLbRpc() { 390 checkState(lbStream == null, "previous lbStream has not been cleared yet"); 391 LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); 392 lbStream = new LbStream(stub); 393 Context prevContext = context.attach(); 394 try { 395 lbStream.start(); 396 } finally { 397 context.detach(prevContext); 398 } 399 stopwatch.reset().start(); 400 401 LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() 402 .setInitialRequest(InitialLoadBalanceRequest.newBuilder() 403 .setName(serviceName).build()) 404 .build(); 405 logger.log( 406 ChannelLogLevel.DEBUG, 407 "[grpclb-<{0}>] Sent initial grpclb request {1}", serviceName, initRequest); 408 try { 409 lbStream.lbRequestWriter.onNext(initRequest); 410 } catch (Exception e) { 411 lbStream.close(e); 412 } 413 } 414 cancelFallbackTimer()415 private void cancelFallbackTimer() { 416 if (fallbackTimer != null) { 417 fallbackTimer.cancel(); 418 } 419 } 420 cancelLbRpcRetryTimer()421 private void cancelLbRpcRetryTimer() { 422 if (lbRpcRetryTimer != null) { 423 lbRpcRetryTimer.cancel(); 424 lbRpcRetryTimer = null; 425 } 426 } 427 shutdown()428 void shutdown() { 429 logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Shutdown", serviceName); 430 shutdownLbComm(); 431 switch (config.getMode()) { 432 case ROUND_ROBIN: 433 // We close the subchannels through subchannelPool instead of helper just for convenience of 434 // testing. 435 for (Subchannel subchannel : subchannels.values()) { 436 returnSubchannelToPool(subchannel); 437 } 438 subchannelPool.clear(); 439 break; 440 case PICK_FIRST: 441 if (!subchannels.isEmpty()) { 442 checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels); 443 subchannels.values().iterator().next().shutdown(); 444 } 445 break; 446 default: 447 throw new AssertionError("Missing case for " + config.getMode()); 448 } 449 subchannels = Collections.emptyMap(); 450 cancelFallbackTimer(); 451 cancelLbRpcRetryTimer(); 452 } 453 propagateError(Status status)454 void propagateError(Status status) { 455 logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status); 456 if (backendList.isEmpty()) { 457 Status error = 458 Status.UNAVAILABLE.withCause(status.getCause()).withDescription(status.getDescription()); 459 maybeUpdatePicker( 460 TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(error)))); 461 } 462 } 463 returnSubchannelToPool(Subchannel subchannel)464 private void returnSubchannelToPool(Subchannel subchannel) { 465 subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get()); 466 } 467 468 @VisibleForTesting 469 @Nullable getLoadRecorder()470 GrpclbClientLoadRecorder getLoadRecorder() { 471 if (lbStream == null) { 472 return null; 473 } 474 return lbStream.loadRecorder; 475 } 476 477 /** 478 * Populate backend servers to be used based on the given list of addresses. 479 */ updateServerList( List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder)480 private void updateServerList( 481 List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, 482 @Nullable GrpclbClientLoadRecorder loadRecorder) { 483 HashMap<List<EquivalentAddressGroup>, Subchannel> newSubchannelMap = 484 new HashMap<>(); 485 List<BackendEntry> newBackendList = new ArrayList<>(); 486 487 switch (config.getMode()) { 488 case ROUND_ROBIN: 489 for (BackendAddressGroup backendAddr : newBackendAddrList) { 490 EquivalentAddressGroup eag = backendAddr.getAddresses(); 491 List<EquivalentAddressGroup> eagAsList = Collections.singletonList(eag); 492 Subchannel subchannel = newSubchannelMap.get(eagAsList); 493 if (subchannel == null) { 494 subchannel = subchannels.get(eagAsList); 495 if (subchannel == null) { 496 subchannel = subchannelPool.takeOrCreateSubchannel(eag, createSubchannelAttrs()); 497 subchannel.requestConnection(); 498 } 499 newSubchannelMap.put(eagAsList, subchannel); 500 } 501 BackendEntry entry; 502 // Only picks with tokens are reported to LoadRecorder 503 if (backendAddr.getToken() == null) { 504 entry = new BackendEntry(subchannel); 505 } else { 506 entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken()); 507 } 508 newBackendList.add(entry); 509 } 510 // Close Subchannels whose addresses have been delisted 511 for (Map.Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) { 512 List<EquivalentAddressGroup> eagList = entry.getKey(); 513 if (!newSubchannelMap.containsKey(eagList)) { 514 returnSubchannelToPool(entry.getValue()); 515 } 516 } 517 subchannels = Collections.unmodifiableMap(newSubchannelMap); 518 break; 519 case PICK_FIRST: 520 checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels); 521 final Subchannel subchannel; 522 if (newBackendAddrList.isEmpty()) { 523 if (subchannels.size() == 1) { 524 subchannel = subchannels.values().iterator().next(); 525 subchannel.shutdown(); 526 subchannels = Collections.emptyMap(); 527 } 528 break; 529 } 530 List<EquivalentAddressGroup> eagList = new ArrayList<>(); 531 // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to 532 // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on 533 // headers. 534 // 535 // The PICK_FIRST code path doesn't cache Subchannels. 536 for (BackendAddressGroup bag : newBackendAddrList) { 537 EquivalentAddressGroup origEag = bag.getAddresses(); 538 Attributes eagAttrs = origEag.getAttributes(); 539 if (bag.getToken() != null) { 540 eagAttrs = eagAttrs.toBuilder() 541 .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build(); 542 } 543 eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); 544 } 545 if (subchannels.isEmpty()) { 546 subchannel = 547 helper.createSubchannel( 548 CreateSubchannelArgs.newBuilder() 549 .setAddresses(eagList) 550 .setAttributes(createSubchannelAttrs()) 551 .build()); 552 subchannel.start(new SubchannelStateListener() { 553 @Override 554 public void onSubchannelState(ConnectivityStateInfo newState) { 555 handleSubchannelState(subchannel, newState); 556 } 557 }); 558 if (requestConnectionPending) { 559 subchannel.requestConnection(); 560 requestConnectionPending = false; 561 } 562 } else { 563 subchannel = subchannels.values().iterator().next(); 564 subchannel.updateAddresses(eagList); 565 } 566 subchannels = Collections.singletonMap(eagList, subchannel); 567 newBackendList.add( 568 new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder))); 569 break; 570 default: 571 throw new AssertionError("Missing case for " + config.getMode()); 572 } 573 574 dropList = Collections.unmodifiableList(newDropList); 575 backendList = Collections.unmodifiableList(newBackendList); 576 } 577 578 @VisibleForTesting 579 class FallbackModeTask implements Runnable { 580 private final Status reason; 581 FallbackModeTask(Status reason)582 private FallbackModeTask(Status reason) { 583 this.reason = reason; 584 } 585 586 @Override run()587 public void run() { 588 // Timer should have been cancelled if entered fallback early. 589 checkState(!usingFallbackBackends, "already in fallback"); 590 fallbackReason = reason; 591 maybeUseFallbackBackends(); 592 maybeUpdatePicker(); 593 } 594 } 595 596 @VisibleForTesting 597 class LbRpcRetryTask implements Runnable { 598 @Override run()599 public void run() { 600 startLbRpc(); 601 } 602 } 603 604 @VisibleForTesting 605 static class LoadReportingTask implements Runnable { 606 private final LbStream stream; 607 LoadReportingTask(LbStream stream)608 LoadReportingTask(LbStream stream) { 609 this.stream = stream; 610 } 611 612 @Override run()613 public void run() { 614 stream.loadReportTimer = null; 615 stream.sendLoadReport(); 616 } 617 } 618 619 private class LbStream implements StreamObserver<LoadBalanceResponse> { 620 final GrpclbClientLoadRecorder loadRecorder; 621 final LoadBalancerGrpc.LoadBalancerStub stub; 622 StreamObserver<LoadBalanceRequest> lbRequestWriter; 623 624 // These fields are only accessed from helper.runSerialized() 625 boolean initialResponseReceived; 626 boolean closed; 627 long loadReportIntervalMillis = -1; 628 ScheduledHandle loadReportTimer; 629 LbStream(LoadBalancerGrpc.LoadBalancerStub stub)630 LbStream(LoadBalancerGrpc.LoadBalancerStub stub) { 631 this.stub = checkNotNull(stub, "stub"); 632 // Stats data only valid for current LbStream. We do not carry over data from previous 633 // stream. 634 loadRecorder = new GrpclbClientLoadRecorder(time); 635 } 636 start()637 void start() { 638 lbRequestWriter = stub.withWaitForReady().balanceLoad(this); 639 } 640 onNext(final LoadBalanceResponse response)641 @Override public void onNext(final LoadBalanceResponse response) { 642 syncContext.execute(new Runnable() { 643 @Override 644 public void run() { 645 handleResponse(response); 646 } 647 }); 648 } 649 onError(final Throwable error)650 @Override public void onError(final Throwable error) { 651 syncContext.execute(new Runnable() { 652 @Override 653 public void run() { 654 handleStreamClosed(Status.fromThrowable(error) 655 .augmentDescription("Stream to GRPCLB LoadBalancer had an error")); 656 } 657 }); 658 } 659 onCompleted()660 @Override public void onCompleted() { 661 syncContext.execute(new Runnable() { 662 @Override 663 public void run() { 664 handleStreamClosed( 665 Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed")); 666 } 667 }); 668 } 669 670 // Following methods must be run in helper.runSerialized() 671 sendLoadReport()672 private void sendLoadReport() { 673 if (closed) { 674 return; 675 } 676 ClientStats stats = loadRecorder.generateLoadReport(); 677 // TODO(zhangkun83): flow control? 678 try { 679 lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build()); 680 scheduleNextLoadReport(); 681 } catch (Exception e) { 682 close(e); 683 } 684 } 685 scheduleNextLoadReport()686 private void scheduleNextLoadReport() { 687 if (loadReportIntervalMillis > 0) { 688 loadReportTimer = syncContext.schedule( 689 new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS, 690 timerService); 691 } 692 } 693 handleResponse(LoadBalanceResponse response)694 private void handleResponse(LoadBalanceResponse response) { 695 if (closed) { 696 return; 697 } 698 699 LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase(); 700 if (!initialResponseReceived) { 701 logger.log( 702 ChannelLogLevel.INFO, 703 "[grpclb-<{0}>] Got an LB initial response: {1}", serviceName, response); 704 if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) { 705 logger.log( 706 ChannelLogLevel.WARNING, 707 "[grpclb-<{0}>] Received a response without initial response", 708 serviceName); 709 return; 710 } 711 initialResponseReceived = true; 712 InitialLoadBalanceResponse initialResponse = response.getInitialResponse(); 713 loadReportIntervalMillis = 714 Durations.toMillis(initialResponse.getClientStatsReportInterval()); 715 scheduleNextLoadReport(); 716 return; 717 } 718 if (SHOULD_LOG_SERVER_LISTS) { 719 logger.log( 720 ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response: {1}", serviceName, response); 721 } else { 722 logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response", serviceName); 723 } 724 725 if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) { 726 // Force entering fallback requested by balancer. 727 cancelFallbackTimer(); 728 fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS; 729 useFallbackBackends(); 730 maybeUpdatePicker(); 731 return; 732 } else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) { 733 logger.log( 734 ChannelLogLevel.WARNING, 735 "[grpclb-<{0}>] Ignoring unexpected response type: {1}", 736 serviceName, 737 typeCase); 738 return; 739 } 740 741 balancerWorking = true; 742 // TODO(zhangkun83): handle delegate from initialResponse 743 ServerList serverList = response.getServerList(); 744 List<DropEntry> newDropList = new ArrayList<>(); 745 List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); 746 // Construct the new collections. Create new Subchannels when necessary. 747 for (Server server : serverList.getServersList()) { 748 String token = server.getLoadBalanceToken(); 749 if (server.getDrop()) { 750 newDropList.add(new DropEntry(loadRecorder, token)); 751 } else { 752 newDropList.add(null); 753 InetSocketAddress address; 754 try { 755 address = new InetSocketAddress( 756 InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort()); 757 } catch (UnknownHostException e) { 758 propagateError( 759 Status.UNAVAILABLE 760 .withDescription("Invalid backend address: " + server) 761 .withCause(e)); 762 continue; 763 } 764 // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of 765 // TLS, with Netty. 766 EquivalentAddressGroup eag = 767 new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS); 768 newBackendAddrList.add(new BackendAddressGroup(eag, token)); 769 } 770 } 771 // Exit fallback as soon as a new server list is received from the balancer. 772 usingFallbackBackends = false; 773 fallbackReason = null; 774 cancelFallbackTimer(); 775 updateServerList(newDropList, newBackendAddrList, loadRecorder); 776 maybeUpdatePicker(); 777 } 778 handleStreamClosed(Status error)779 private void handleStreamClosed(Status error) { 780 checkArgument(!error.isOk(), "unexpected OK status"); 781 if (closed) { 782 return; 783 } 784 closed = true; 785 cleanUp(); 786 propagateError(error); 787 balancerWorking = false; 788 fallbackReason = error; 789 cancelFallbackTimer(); 790 maybeUseFallbackBackends(); 791 maybeUpdatePicker(); 792 793 long delayNanos = 0; 794 if (initialResponseReceived || lbRpcRetryPolicy == null) { 795 // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence 796 // has never been initialized. 797 lbRpcRetryPolicy = backoffPolicyProvider.get(); 798 } 799 // Backoff only when balancer wasn't working previously. 800 if (!initialResponseReceived) { 801 // The back-off policy determines the interval between consecutive RPC upstarts, thus the 802 // actual delay may be smaller than the value from the back-off policy, or even negative, 803 // depending how much time was spent in the previous RPC. 804 delayNanos = 805 lbRpcRetryPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS); 806 } 807 if (delayNanos <= 0) { 808 startLbRpc(); 809 } else { 810 lbRpcRetryTimer = 811 syncContext.schedule(new LbRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, 812 timerService); 813 } 814 815 helper.refreshNameResolution(); 816 } 817 close(Exception error)818 void close(Exception error) { 819 if (closed) { 820 return; 821 } 822 closed = true; 823 cleanUp(); 824 lbRequestWriter.onError(error); 825 } 826 cleanUp()827 private void cleanUp() { 828 if (loadReportTimer != null) { 829 loadReportTimer.cancel(); 830 loadReportTimer = null; 831 } 832 if (lbStream == this) { 833 lbStream = null; 834 } 835 } 836 } 837 838 /** 839 * Make and use a picker out of the current lists and the states of subchannels if they have 840 * changed since the last picker created. 841 */ maybeUpdatePicker()842 private void maybeUpdatePicker() { 843 List<RoundRobinEntry> pickList; 844 ConnectivityState state; 845 if (backendList.isEmpty()) { 846 // Note balancer (is working) may enforce using fallback backends, and that fallback may 847 // fail. So we should check if currently in fallback first. 848 if (usingFallbackBackends) { 849 Status error = 850 NO_FALLBACK_BACKENDS_STATUS 851 .withCause(fallbackReason.getCause()) 852 .augmentDescription(fallbackReason.getDescription()); 853 pickList = Collections.<RoundRobinEntry>singletonList(new ErrorEntry(error)); 854 state = TRANSIENT_FAILURE; 855 } else if (balancerWorking) { 856 pickList = 857 Collections.<RoundRobinEntry>singletonList( 858 new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); 859 state = TRANSIENT_FAILURE; 860 } else { // still waiting for balancer 861 pickList = Collections.singletonList(BUFFER_ENTRY); 862 state = CONNECTING; 863 } 864 maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); 865 return; 866 } 867 switch (config.getMode()) { 868 case ROUND_ROBIN: 869 pickList = new ArrayList<>(backendList.size()); 870 Status error = null; 871 boolean hasPending = false; 872 for (BackendEntry entry : backendList) { 873 Subchannel subchannel = entry.subchannel; 874 Attributes attrs = subchannel.getAttributes(); 875 ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); 876 if (stateInfo.getState() == READY) { 877 pickList.add(entry); 878 } else if (stateInfo.getState() == TRANSIENT_FAILURE) { 879 error = stateInfo.getStatus(); 880 } else { 881 hasPending = true; 882 } 883 } 884 if (pickList.isEmpty()) { 885 if (hasPending) { 886 pickList.add(BUFFER_ENTRY); 887 state = CONNECTING; 888 } else { 889 pickList.add(new ErrorEntry(error)); 890 state = TRANSIENT_FAILURE; 891 } 892 } else { 893 state = READY; 894 } 895 break; 896 case PICK_FIRST: { 897 checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); 898 BackendEntry onlyEntry = backendList.get(0); 899 ConnectivityStateInfo stateInfo = 900 onlyEntry.subchannel.getAttributes().get(STATE_INFO).get(); 901 state = stateInfo.getState(); 902 switch (state) { 903 case READY: 904 pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry); 905 break; 906 case TRANSIENT_FAILURE: 907 pickList = 908 Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus())); 909 break; 910 case CONNECTING: 911 pickList = Collections.singletonList(BUFFER_ENTRY); 912 break; 913 default: 914 pickList = Collections.<RoundRobinEntry>singletonList( 915 new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); 916 } 917 break; 918 } 919 default: 920 throw new AssertionError("Missing case for " + config.getMode()); 921 } 922 maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); 923 } 924 925 /** 926 * Update the given picker to the helper if it's different from the current one. 927 */ maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)928 private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) { 929 // Discard the new picker if we are sure it won't make any difference, in order to save 930 // re-processing pending streams, and avoid unnecessary resetting of the pointer in 931 // RoundRobinPicker. 932 if (picker.dropList.equals(currentPicker.dropList) 933 && picker.pickList.equals(currentPicker.pickList)) { 934 return; 935 } 936 currentPicker = picker; 937 helper.updateBalancingState(state, picker); 938 } 939 createSubchannelAttrs()940 private static Attributes createSubchannelAttrs() { 941 return Attributes.newBuilder() 942 .set(STATE_INFO, 943 new AtomicReference<>( 944 ConnectivityStateInfo.forNonError(IDLE))) 945 .build(); 946 } 947 948 @VisibleForTesting 949 static final class DropEntry { 950 private final GrpclbClientLoadRecorder loadRecorder; 951 private final String token; 952 DropEntry(GrpclbClientLoadRecorder loadRecorder, String token)953 DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) { 954 this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); 955 this.token = checkNotNull(token, "token"); 956 } 957 picked()958 PickResult picked() { 959 loadRecorder.recordDroppedRequest(token); 960 return DROP_PICK_RESULT; 961 } 962 963 @Override toString()964 public String toString() { 965 // This is printed in logs. Only include useful information. 966 return "drop(" + token + ")"; 967 } 968 969 @Override hashCode()970 public int hashCode() { 971 return Objects.hashCode(loadRecorder, token); 972 } 973 974 @Override equals(Object other)975 public boolean equals(Object other) { 976 if (!(other instanceof DropEntry)) { 977 return false; 978 } 979 DropEntry that = (DropEntry) other; 980 return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token); 981 } 982 } 983 984 @VisibleForTesting 985 interface RoundRobinEntry { picked(Metadata headers)986 PickResult picked(Metadata headers); 987 } 988 989 @VisibleForTesting 990 static final class BackendEntry implements RoundRobinEntry { 991 final Subchannel subchannel; 992 @VisibleForTesting 993 final PickResult result; 994 @Nullable 995 private final String token; 996 997 /** 998 * For ROUND_ROBIN: creates a BackendEntry whose usage will be reported to load recorder. 999 */ BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token)1000 BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { 1001 this.subchannel = checkNotNull(subchannel, "subchannel"); 1002 this.result = 1003 PickResult.withSubchannel(subchannel, checkNotNull(loadRecorder, "loadRecorder")); 1004 this.token = checkNotNull(token, "token"); 1005 } 1006 1007 /** 1008 * For ROUND_ROBIN/PICK_FIRST: creates a BackendEntry whose usage will not be reported. 1009 */ BackendEntry(Subchannel subchannel)1010 BackendEntry(Subchannel subchannel) { 1011 this.subchannel = checkNotNull(subchannel, "subchannel"); 1012 this.result = PickResult.withSubchannel(subchannel); 1013 this.token = null; 1014 } 1015 1016 /** 1017 * For PICK_FIRST: creates a BackendEntry that includes all addresses. 1018 */ BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory)1019 BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory) { 1020 this.subchannel = checkNotNull(subchannel, "subchannel"); 1021 this.result = 1022 PickResult.withSubchannel(subchannel, checkNotNull(tracerFactory, "tracerFactory")); 1023 this.token = null; 1024 } 1025 1026 @Override picked(Metadata headers)1027 public PickResult picked(Metadata headers) { 1028 headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); 1029 if (token != null) { 1030 headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); 1031 } 1032 return result; 1033 } 1034 1035 @Override toString()1036 public String toString() { 1037 // This is printed in logs. Only give out useful information. 1038 return "[" + subchannel.getAllAddresses().toString() + "(" + token + ")]"; 1039 } 1040 1041 @Override hashCode()1042 public int hashCode() { 1043 return Objects.hashCode(result, token); 1044 } 1045 1046 @Override equals(Object other)1047 public boolean equals(Object other) { 1048 if (!(other instanceof BackendEntry)) { 1049 return false; 1050 } 1051 BackendEntry that = (BackendEntry) other; 1052 return Objects.equal(result, that.result) && Objects.equal(token, that.token); 1053 } 1054 } 1055 1056 @VisibleForTesting 1057 static final class IdleSubchannelEntry implements RoundRobinEntry { 1058 private final SynchronizationContext syncContext; 1059 private final Subchannel subchannel; 1060 private final AtomicBoolean connectionRequested = new AtomicBoolean(false); 1061 IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext)1062 IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) { 1063 this.subchannel = checkNotNull(subchannel, "subchannel"); 1064 this.syncContext = checkNotNull(syncContext, "syncContext"); 1065 } 1066 1067 @Override picked(Metadata headers)1068 public PickResult picked(Metadata headers) { 1069 if (connectionRequested.compareAndSet(false, true)) { 1070 syncContext.execute(new Runnable() { 1071 @Override 1072 public void run() { 1073 subchannel.requestConnection(); 1074 } 1075 }); 1076 } 1077 return PickResult.withNoResult(); 1078 } 1079 1080 @Override toString()1081 public String toString() { 1082 // This is printed in logs. Only give out useful information. 1083 return "(idle)[" + subchannel.getAllAddresses().toString() + "]"; 1084 } 1085 1086 @Override hashCode()1087 public int hashCode() { 1088 return Objects.hashCode(subchannel, syncContext); 1089 } 1090 1091 @Override equals(Object other)1092 public boolean equals(Object other) { 1093 if (!(other instanceof IdleSubchannelEntry)) { 1094 return false; 1095 } 1096 IdleSubchannelEntry that = (IdleSubchannelEntry) other; 1097 return Objects.equal(subchannel, that.subchannel) 1098 && Objects.equal(syncContext, that.syncContext); 1099 } 1100 } 1101 1102 @VisibleForTesting 1103 static final class ErrorEntry implements RoundRobinEntry { 1104 final PickResult result; 1105 ErrorEntry(Status status)1106 ErrorEntry(Status status) { 1107 result = PickResult.withError(status); 1108 } 1109 1110 @Override picked(Metadata headers)1111 public PickResult picked(Metadata headers) { 1112 return result; 1113 } 1114 1115 @Override hashCode()1116 public int hashCode() { 1117 return Objects.hashCode(result); 1118 } 1119 1120 @Override equals(Object other)1121 public boolean equals(Object other) { 1122 if (!(other instanceof ErrorEntry)) { 1123 return false; 1124 } 1125 return Objects.equal(result, ((ErrorEntry) other).result); 1126 } 1127 1128 @Override toString()1129 public String toString() { 1130 // This is printed in logs. Only include useful information. 1131 return result.getStatus().toString(); 1132 } 1133 } 1134 1135 @VisibleForTesting 1136 static final class RoundRobinPicker extends SubchannelPicker { 1137 @VisibleForTesting 1138 final List<DropEntry> dropList; 1139 private int dropIndex; 1140 1141 @VisibleForTesting 1142 final List<? extends RoundRobinEntry> pickList; 1143 private int pickIndex; 1144 1145 // dropList can be empty, which means no drop. 1146 // pickList must not be empty. RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList)1147 RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) { 1148 this.dropList = checkNotNull(dropList, "dropList"); 1149 this.pickList = checkNotNull(pickList, "pickList"); 1150 checkArgument(!pickList.isEmpty(), "pickList is empty"); 1151 } 1152 1153 @Override pickSubchannel(PickSubchannelArgs args)1154 public PickResult pickSubchannel(PickSubchannelArgs args) { 1155 synchronized (pickList) { 1156 // Two-level round-robin. 1157 // First round-robin on dropList. If a drop entry is selected, request will be dropped. If 1158 // a non-drop entry is selected, then round-robin on pickList. This makes sure requests are 1159 // dropped at the same proportion as the drop entries appear on the round-robin list from 1160 // the balancer, while only backends from pickList are selected for the non-drop cases. 1161 if (!dropList.isEmpty()) { 1162 DropEntry drop = dropList.get(dropIndex); 1163 dropIndex++; 1164 if (dropIndex == dropList.size()) { 1165 dropIndex = 0; 1166 } 1167 if (drop != null) { 1168 return drop.picked(); 1169 } 1170 } 1171 1172 RoundRobinEntry pick = pickList.get(pickIndex); 1173 pickIndex++; 1174 if (pickIndex == pickList.size()) { 1175 pickIndex = 0; 1176 } 1177 return pick.picked(args.getHeaders()); 1178 } 1179 } 1180 1181 @Override toString()1182 public String toString() { 1183 if (SHOULD_LOG_SERVER_LISTS) { 1184 return MoreObjects.toStringHelper(RoundRobinPicker.class) 1185 .add("dropList", dropList) 1186 .add("pickList", pickList) 1187 .toString(); 1188 } 1189 return MoreObjects.toStringHelper(RoundRobinPicker.class).toString(); 1190 } 1191 } 1192 } 1193