1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ConnectivityState.CONNECTING; 23 import static io.grpc.ConnectivityState.IDLE; 24 import static io.grpc.ConnectivityState.READY; 25 import static io.grpc.ConnectivityState.SHUTDOWN; 26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Objects; 31 import com.google.protobuf.util.Durations; 32 import io.grpc.Attributes; 33 import io.grpc.ConnectivityState; 34 import io.grpc.ConnectivityStateInfo; 35 import io.grpc.EquivalentAddressGroup; 36 import io.grpc.InternalLogId; 37 import io.grpc.LoadBalancer.Helper; 38 import io.grpc.LoadBalancer.PickResult; 39 import io.grpc.LoadBalancer.PickSubchannelArgs; 40 import io.grpc.LoadBalancer.Subchannel; 41 import io.grpc.LoadBalancer.SubchannelPicker; 42 import io.grpc.ManagedChannel; 43 import io.grpc.Metadata; 44 import io.grpc.Status; 45 import io.grpc.internal.BackoffPolicy; 46 import io.grpc.internal.GrpcAttributes; 47 import io.grpc.internal.TimeProvider; 48 import io.grpc.lb.v1.ClientStats; 49 import io.grpc.lb.v1.InitialLoadBalanceRequest; 50 import io.grpc.lb.v1.InitialLoadBalanceResponse; 51 import io.grpc.lb.v1.LoadBalanceRequest; 52 import io.grpc.lb.v1.LoadBalanceResponse; 53 import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase; 54 import io.grpc.lb.v1.LoadBalancerGrpc; 55 import io.grpc.lb.v1.Server; 56 import io.grpc.lb.v1.ServerList; 57 import io.grpc.stub.StreamObserver; 58 import java.net.InetAddress; 59 import java.net.InetSocketAddress; 60 import java.net.SocketAddress; 61 import java.net.UnknownHostException; 62 import java.util.ArrayList; 63 import java.util.Arrays; 64 import java.util.Collections; 65 import java.util.HashMap; 66 import java.util.List; 67 import java.util.Map; 68 import java.util.Map.Entry; 69 import java.util.concurrent.ScheduledExecutorService; 70 import java.util.concurrent.ScheduledFuture; 71 import java.util.concurrent.TimeUnit; 72 import java.util.concurrent.atomic.AtomicReference; 73 import java.util.logging.Level; 74 import java.util.logging.Logger; 75 import javax.annotation.Nullable; 76 import javax.annotation.concurrent.NotThreadSafe; 77 78 /** 79 * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}. Created when 80 * GrpclbLoadBalancer switches to GRPCLB mode. Closed and discarded when GrpclbLoadBalancer 81 * switches away from GRPCLB mode. 82 */ 83 @NotThreadSafe 84 final class GrpclbState { 85 private static final Logger logger = Logger.getLogger(GrpclbState.class.getName()); 86 87 static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); 88 private static final Attributes LB_PROVIDED_BACKEND_ATTRS = 89 Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build(); 90 91 @VisibleForTesting 92 static final PickResult DROP_PICK_RESULT = 93 PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); 94 95 @VisibleForTesting 96 static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { 97 @Override 98 public PickResult picked(Metadata headers) { 99 return PickResult.withNoResult(); 100 } 101 102 @Override 103 public String toString() { 104 return "BUFFER_ENTRY"; 105 } 106 }; 107 108 private final InternalLogId logId; 109 private final String serviceName; 110 private final Helper helper; 111 private final SubchannelPool subchannelPool; 112 private final TimeProvider time; 113 private final ScheduledExecutorService timerService; 114 115 private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO = 116 Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); 117 private final BackoffPolicy.Provider backoffPolicyProvider; 118 119 // Scheduled only once. Never reset. 120 @Nullable 121 private FallbackModeTask fallbackTimer; 122 private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList(); 123 private boolean usingFallbackBackends; 124 // True if the current balancer has returned a serverlist. Will be reset to false when lost 125 // connection to a balancer. 126 private boolean balancerWorking; 127 @Nullable 128 private BackoffPolicy lbRpcRetryPolicy; 129 @Nullable 130 private LbRpcRetryTask lbRpcRetryTimer; 131 private long prevLbRpcStartNanos; 132 133 @Nullable 134 private ManagedChannel lbCommChannel; 135 136 @Nullable 137 private LbStream lbStream; 138 private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap(); 139 140 // Has the same size as the round-robin list from the balancer. 141 // A drop entry from the round-robin list becomes a DropEntry here. 142 // A backend entry from the robin-robin list becomes a null here. 143 private List<DropEntry> dropList = Collections.emptyList(); 144 // Contains only non-drop, i.e., backends from the round-robin list from the balancer. 145 private List<BackendEntry> backendList = Collections.emptyList(); 146 private RoundRobinPicker currentPicker = 147 new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY)); 148 GrpclbState( Helper helper, SubchannelPool subchannelPool, TimeProvider time, ScheduledExecutorService timerService, BackoffPolicy.Provider backoffPolicyProvider, InternalLogId logId)149 GrpclbState( 150 Helper helper, 151 SubchannelPool subchannelPool, 152 TimeProvider time, 153 ScheduledExecutorService timerService, 154 BackoffPolicy.Provider backoffPolicyProvider, 155 InternalLogId logId) { 156 this.helper = checkNotNull(helper, "helper"); 157 this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); 158 this.time = checkNotNull(time, "time provider"); 159 this.timerService = checkNotNull(timerService, "timerService"); 160 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 161 this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); 162 this.logId = checkNotNull(logId, "logId"); 163 } 164 handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState)165 void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { 166 if (newState.getState() == SHUTDOWN || !(subchannels.values().contains(subchannel))) { 167 return; 168 } 169 if (newState.getState() == IDLE) { 170 subchannel.requestConnection(); 171 } 172 subchannel.getAttributes().get(STATE_INFO).set(newState); 173 maybeUseFallbackBackends(); 174 maybeUpdatePicker(); 175 } 176 177 /** 178 * Handle new addresses of the balancer and backends from the resolver, and create connection if 179 * not yet connected. 180 */ handleAddresses( List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers)181 void handleAddresses( 182 List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) { 183 if (newLbAddressGroups.isEmpty()) { 184 propagateError(Status.UNAVAILABLE.withDescription( 185 "NameResolver returned no LB address while asking for GRPCLB")); 186 return; 187 } 188 LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups); 189 startLbComm(newLbAddressGroup); 190 // Avoid creating a new RPC just because the addresses were updated, as it can cause a 191 // stampeding herd. The current RPC may be on a connection to an address not present in 192 // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an 193 // outdated backend, we could choose to re-create the RPC. 194 if (lbStream == null) { 195 startLbRpc(); 196 } 197 fallbackBackendList = newBackendServers; 198 // Start the fallback timer if it's never started 199 if (fallbackTimer == null) { 200 logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId}); 201 fallbackTimer = new FallbackModeTask(); 202 fallbackTimer.schedule(); 203 } 204 if (usingFallbackBackends) { 205 // Populate the new fallback backends to round-robin list. 206 useFallbackBackends(); 207 } 208 maybeUpdatePicker(); 209 } 210 maybeUseFallbackBackends()211 private void maybeUseFallbackBackends() { 212 if (balancerWorking) { 213 return; 214 } 215 if (usingFallbackBackends) { 216 return; 217 } 218 int numReadySubchannels = 0; 219 for (Subchannel subchannel : subchannels.values()) { 220 if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { 221 numReadySubchannels++; 222 } 223 } 224 if (numReadySubchannels > 0) { 225 return; 226 } 227 // Fallback contiditions met 228 useFallbackBackends(); 229 } 230 231 /** 232 * Populate the round-robin lists with the fallback backends. 233 */ useFallbackBackends()234 private void useFallbackBackends() { 235 usingFallbackBackends = true; 236 logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList}); 237 238 List<DropEntry> newDropList = new ArrayList<>(); 239 List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); 240 for (EquivalentAddressGroup eag : fallbackBackendList) { 241 newDropList.add(null); 242 newBackendAddrList.add(new BackendAddressGroup(eag, null)); 243 } 244 useRoundRobinLists(newDropList, newBackendAddrList, null); 245 } 246 shutdownLbComm()247 private void shutdownLbComm() { 248 if (lbCommChannel != null) { 249 lbCommChannel.shutdown(); 250 lbCommChannel = null; 251 } 252 shutdownLbRpc(); 253 } 254 shutdownLbRpc()255 private void shutdownLbRpc() { 256 if (lbStream != null) { 257 lbStream.close(null); 258 // lbStream will be set to null in LbStream.cleanup() 259 } 260 } 261 startLbComm(LbAddressGroup lbAddressGroup)262 private void startLbComm(LbAddressGroup lbAddressGroup) { 263 checkNotNull(lbAddressGroup, "lbAddressGroup"); 264 if (lbCommChannel == null) { 265 lbCommChannel = helper.createOobChannel( 266 lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority()); 267 } else if (lbAddressGroup.getAuthority().equals(lbCommChannel.authority())) { 268 helper.updateOobChannelAddresses(lbCommChannel, lbAddressGroup.getAddresses()); 269 } else { 270 // Full restart of channel 271 shutdownLbComm(); 272 lbCommChannel = helper.createOobChannel( 273 lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority()); 274 } 275 } 276 startLbRpc()277 private void startLbRpc() { 278 checkState(lbStream == null, "previous lbStream has not been cleared yet"); 279 LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); 280 lbStream = new LbStream(stub); 281 lbStream.start(); 282 prevLbRpcStartNanos = time.currentTimeNanos(); 283 284 LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() 285 .setInitialRequest(InitialLoadBalanceRequest.newBuilder() 286 .setName(serviceName).build()) 287 .build(); 288 try { 289 lbStream.lbRequestWriter.onNext(initRequest); 290 } catch (Exception e) { 291 lbStream.close(e); 292 } 293 } 294 cancelFallbackTimer()295 private void cancelFallbackTimer() { 296 if (fallbackTimer != null) { 297 fallbackTimer.cancel(); 298 } 299 } 300 cancelLbRpcRetryTimer()301 private void cancelLbRpcRetryTimer() { 302 if (lbRpcRetryTimer != null) { 303 lbRpcRetryTimer.cancel(); 304 } 305 } 306 shutdown()307 void shutdown() { 308 shutdownLbComm(); 309 // We close the subchannels through subchannelPool instead of helper just for convenience of 310 // testing. 311 for (Subchannel subchannel : subchannels.values()) { 312 subchannelPool.returnSubchannel(subchannel); 313 } 314 subchannels = Collections.emptyMap(); 315 subchannelPool.clear(); 316 cancelFallbackTimer(); 317 cancelLbRpcRetryTimer(); 318 } 319 propagateError(Status status)320 void propagateError(Status status) { 321 logger.log(Level.FINE, "[{0}] Had an error: {1}; dropList={2}; backendList={3}", 322 new Object[] {logId, status, dropList, backendList}); 323 if (backendList.isEmpty()) { 324 maybeUpdatePicker( 325 TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status)))); 326 } 327 } 328 329 @VisibleForTesting 330 @Nullable getLoadRecorder()331 GrpclbClientLoadRecorder getLoadRecorder() { 332 if (lbStream == null) { 333 return null; 334 } 335 return lbStream.loadRecorder; 336 } 337 338 /** 339 * Populate the round-robin lists with the given values. 340 */ useRoundRobinLists( List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder)341 private void useRoundRobinLists( 342 List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, 343 @Nullable GrpclbClientLoadRecorder loadRecorder) { 344 logger.log(Level.FINE, "[{0}] Using round-robin list: {1}, droplist={2}", 345 new Object[] {logId, newBackendAddrList, newDropList}); 346 HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap = 347 new HashMap<EquivalentAddressGroup, Subchannel>(); 348 List<BackendEntry> newBackendList = new ArrayList<>(); 349 350 for (BackendAddressGroup backendAddr : newBackendAddrList) { 351 EquivalentAddressGroup eag = backendAddr.getAddresses(); 352 Subchannel subchannel = newSubchannelMap.get(eag); 353 if (subchannel == null) { 354 subchannel = subchannels.get(eag); 355 if (subchannel == null) { 356 Attributes subchannelAttrs = Attributes.newBuilder() 357 .set(STATE_INFO, 358 new AtomicReference<ConnectivityStateInfo>( 359 ConnectivityStateInfo.forNonError(IDLE))) 360 .build(); 361 subchannel = subchannelPool.takeOrCreateSubchannel(eag, subchannelAttrs); 362 subchannel.requestConnection(); 363 } 364 newSubchannelMap.put(eag, subchannel); 365 } 366 BackendEntry entry; 367 // Only picks with tokens are reported to LoadRecorder 368 if (backendAddr.getToken() == null) { 369 entry = new BackendEntry(subchannel); 370 } else { 371 entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken()); 372 } 373 newBackendList.add(entry); 374 } 375 376 // Close Subchannels whose addresses have been delisted 377 for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) { 378 EquivalentAddressGroup eag = entry.getKey(); 379 if (!newSubchannelMap.containsKey(eag)) { 380 subchannelPool.returnSubchannel(entry.getValue()); 381 } 382 } 383 384 subchannels = Collections.unmodifiableMap(newSubchannelMap); 385 dropList = Collections.unmodifiableList(newDropList); 386 backendList = Collections.unmodifiableList(newBackendList); 387 } 388 389 @VisibleForTesting 390 class FallbackModeTask implements Runnable { 391 private ScheduledFuture<?> scheduledFuture; 392 393 @Override run()394 public void run() { 395 helper.runSerialized(new Runnable() { 396 @Override 397 public void run() { 398 checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); 399 maybeUseFallbackBackends(); 400 maybeUpdatePicker(); 401 } 402 }); 403 } 404 cancel()405 void cancel() { 406 scheduledFuture.cancel(false); 407 } 408 schedule()409 void schedule() { 410 checkState(scheduledFuture == null, "FallbackModeTask already scheduled"); 411 scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); 412 } 413 } 414 415 @VisibleForTesting 416 class LbRpcRetryTask implements Runnable { 417 private ScheduledFuture<?> scheduledFuture; 418 419 @Override run()420 public void run() { 421 helper.runSerialized(new Runnable() { 422 @Override 423 public void run() { 424 checkState( 425 lbRpcRetryTimer == LbRpcRetryTask.this, "LbRpc retry timer mismatch"); 426 startLbRpc(); 427 } 428 }); 429 } 430 cancel()431 void cancel() { 432 scheduledFuture.cancel(false); 433 } 434 schedule(long delayNanos)435 void schedule(long delayNanos) { 436 checkState(scheduledFuture == null, "LbRpcRetryTask already scheduled"); 437 scheduledFuture = timerService.schedule(this, delayNanos, TimeUnit.NANOSECONDS); 438 } 439 } 440 441 @VisibleForTesting 442 class LoadReportingTask implements Runnable { 443 private final LbStream stream; 444 LoadReportingTask(LbStream stream)445 LoadReportingTask(LbStream stream) { 446 this.stream = stream; 447 } 448 449 @Override run()450 public void run() { 451 helper.runSerialized(new Runnable() { 452 @Override 453 public void run() { 454 stream.loadReportFuture = null; 455 stream.sendLoadReport(); 456 } 457 }); 458 } 459 } 460 461 private class LbStream implements StreamObserver<LoadBalanceResponse> { 462 final GrpclbClientLoadRecorder loadRecorder; 463 final LoadBalancerGrpc.LoadBalancerStub stub; 464 StreamObserver<LoadBalanceRequest> lbRequestWriter; 465 466 // These fields are only accessed from helper.runSerialized() 467 boolean initialResponseReceived; 468 boolean closed; 469 long loadReportIntervalMillis = -1; 470 ScheduledFuture<?> loadReportFuture; 471 LbStream(LoadBalancerGrpc.LoadBalancerStub stub)472 LbStream(LoadBalancerGrpc.LoadBalancerStub stub) { 473 this.stub = checkNotNull(stub, "stub"); 474 // Stats data only valid for current LbStream. We do not carry over data from previous 475 // stream. 476 loadRecorder = new GrpclbClientLoadRecorder(time); 477 } 478 start()479 void start() { 480 lbRequestWriter = stub.withWaitForReady().balanceLoad(this); 481 } 482 onNext(final LoadBalanceResponse response)483 @Override public void onNext(final LoadBalanceResponse response) { 484 helper.runSerialized(new Runnable() { 485 @Override 486 public void run() { 487 handleResponse(response); 488 } 489 }); 490 } 491 onError(final Throwable error)492 @Override public void onError(final Throwable error) { 493 helper.runSerialized(new Runnable() { 494 @Override 495 public void run() { 496 handleStreamClosed(Status.fromThrowable(error) 497 .augmentDescription("Stream to GRPCLB LoadBalancer had an error")); 498 } 499 }); 500 } 501 onCompleted()502 @Override public void onCompleted() { 503 helper.runSerialized(new Runnable() { 504 @Override 505 public void run() { 506 handleStreamClosed( 507 Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed")); 508 } 509 }); 510 } 511 512 // Following methods must be run in helper.runSerialized() 513 sendLoadReport()514 private void sendLoadReport() { 515 if (closed) { 516 return; 517 } 518 ClientStats stats = loadRecorder.generateLoadReport(); 519 // TODO(zhangkun83): flow control? 520 try { 521 lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build()); 522 scheduleNextLoadReport(); 523 } catch (Exception e) { 524 close(e); 525 } 526 } 527 scheduleNextLoadReport()528 private void scheduleNextLoadReport() { 529 if (loadReportIntervalMillis > 0) { 530 loadReportFuture = timerService.schedule( 531 new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS); 532 } 533 } 534 handleResponse(LoadBalanceResponse response)535 private void handleResponse(LoadBalanceResponse response) { 536 if (closed) { 537 return; 538 } 539 logger.log(Level.FINER, "[{0}] Got an LB response: {1}", new Object[] {logId, response}); 540 541 LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase(); 542 if (!initialResponseReceived) { 543 if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) { 544 logger.log( 545 Level.WARNING, 546 "[{0}] : Did not receive response with type initial response: {1}", 547 new Object[] {logId, response}); 548 return; 549 } 550 initialResponseReceived = true; 551 InitialLoadBalanceResponse initialResponse = response.getInitialResponse(); 552 loadReportIntervalMillis = 553 Durations.toMillis(initialResponse.getClientStatsReportInterval()); 554 scheduleNextLoadReport(); 555 return; 556 } 557 558 if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) { 559 logger.log( 560 Level.WARNING, 561 "[{0}] : Ignoring unexpected response type: {1}", 562 new Object[] {logId, response}); 563 return; 564 } 565 566 balancerWorking = true; 567 // TODO(zhangkun83): handle delegate from initialResponse 568 ServerList serverList = response.getServerList(); 569 List<DropEntry> newDropList = new ArrayList<>(); 570 List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); 571 // Construct the new collections. Create new Subchannels when necessary. 572 for (Server server : serverList.getServersList()) { 573 String token = server.getLoadBalanceToken(); 574 if (server.getDrop()) { 575 newDropList.add(new DropEntry(loadRecorder, token)); 576 } else { 577 newDropList.add(null); 578 InetSocketAddress address; 579 try { 580 address = new InetSocketAddress( 581 InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort()); 582 } catch (UnknownHostException e) { 583 propagateError( 584 Status.UNAVAILABLE 585 .withDescription("Host for server not found: " + server) 586 .withCause(e)); 587 continue; 588 } 589 // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of 590 // TLS, with Netty. 591 EquivalentAddressGroup eag = 592 new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS); 593 newBackendAddrList.add(new BackendAddressGroup(eag, token)); 594 } 595 } 596 // Stop using fallback backends as soon as a new server list is received from the balancer. 597 usingFallbackBackends = false; 598 cancelFallbackTimer(); 599 useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); 600 maybeUpdatePicker(); 601 } 602 handleStreamClosed(Status error)603 private void handleStreamClosed(Status error) { 604 checkArgument(!error.isOk(), "unexpected OK status"); 605 if (closed) { 606 return; 607 } 608 closed = true; 609 cleanUp(); 610 propagateError(error); 611 balancerWorking = false; 612 maybeUseFallbackBackends(); 613 maybeUpdatePicker(); 614 615 long delayNanos = 0; 616 if (initialResponseReceived || lbRpcRetryPolicy == null) { 617 // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence 618 // has never been initialized. 619 lbRpcRetryPolicy = backoffPolicyProvider.get(); 620 } 621 // Backoff only when balancer wasn't working previously. 622 if (!initialResponseReceived) { 623 // The back-off policy determines the interval between consecutive RPC upstarts, thus the 624 // actual delay may be smaller than the value from the back-off policy, or even negative, 625 // depending how much time was spent in the previous RPC. 626 delayNanos = 627 prevLbRpcStartNanos + lbRpcRetryPolicy.nextBackoffNanos() - time.currentTimeNanos(); 628 } 629 if (delayNanos <= 0) { 630 startLbRpc(); 631 } else { 632 lbRpcRetryTimer = new LbRpcRetryTask(); 633 lbRpcRetryTimer.schedule(delayNanos); 634 } 635 } 636 close(@ullable Exception error)637 void close(@Nullable Exception error) { 638 if (closed) { 639 return; 640 } 641 closed = true; 642 cleanUp(); 643 try { 644 if (error == null) { 645 lbRequestWriter.onCompleted(); 646 } else { 647 lbRequestWriter.onError(error); 648 } 649 } catch (Exception e) { 650 // Don't care 651 } 652 } 653 cleanUp()654 private void cleanUp() { 655 if (loadReportFuture != null) { 656 loadReportFuture.cancel(false); 657 loadReportFuture = null; 658 } 659 if (lbStream == this) { 660 lbStream = null; 661 } 662 } 663 } 664 665 /** 666 * Make and use a picker out of the current lists and the states of subchannels if they have 667 * changed since the last picker created. 668 */ maybeUpdatePicker()669 private void maybeUpdatePicker() { 670 List<RoundRobinEntry> pickList = new ArrayList<>(backendList.size()); 671 Status error = null; 672 boolean hasIdle = false; 673 for (BackendEntry entry : backendList) { 674 Subchannel subchannel = entry.result.getSubchannel(); 675 Attributes attrs = subchannel.getAttributes(); 676 ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); 677 if (stateInfo.getState() == READY) { 678 pickList.add(entry); 679 } else if (stateInfo.getState() == TRANSIENT_FAILURE) { 680 error = stateInfo.getStatus(); 681 } else if (stateInfo.getState() == IDLE) { 682 hasIdle = true; 683 } 684 } 685 ConnectivityState state; 686 if (pickList.isEmpty()) { 687 if (error != null && !hasIdle) { 688 logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}", 689 new Object[] {logId, error}); 690 pickList.add(new ErrorEntry(error)); 691 state = TRANSIENT_FAILURE; 692 } else { 693 logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId); 694 pickList.add(BUFFER_ENTRY); 695 state = CONNECTING; 696 } 697 } else { 698 logger.log( 699 Level.FINE, "[{0}] Using drop list {1} and pick list {2}", 700 new Object[] {logId, dropList, pickList}); 701 state = READY; 702 } 703 maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); 704 } 705 706 /** 707 * Update the given picker to the helper if it's different from the current one. 708 */ maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)709 private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) { 710 // Discard the new picker if we are sure it won't make any difference, in order to save 711 // re-processing pending streams, and avoid unnecessary resetting of the pointer in 712 // RoundRobinPicker. 713 if (picker.dropList.equals(currentPicker.dropList) 714 && picker.pickList.equals(currentPicker.pickList)) { 715 return; 716 } 717 // No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending 718 // stream thus no time is wasted in re-process. 719 currentPicker = picker; 720 helper.updateBalancingState(state, picker); 721 } 722 flattenLbAddressGroups(List<LbAddressGroup> groupList)723 private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) { 724 assert !groupList.isEmpty(); 725 List<EquivalentAddressGroup> eags = new ArrayList<>(groupList.size()); 726 String authority = groupList.get(0).getAuthority(); 727 for (LbAddressGroup group : groupList) { 728 if (!authority.equals(group.getAuthority())) { 729 // TODO(ejona): Allow different authorities for different addresses. Requires support from 730 // Helper. 731 logger.log(Level.WARNING, 732 "[{0}] Multiple authorities found for LB. " 733 + "Skipping addresses for {0} in preference to {1}", 734 new Object[] {logId, group.getAuthority(), authority}); 735 } else { 736 eags.add(group.getAddresses()); 737 } 738 } 739 // ALTS code can use the presence of ATTR_LB_ADDR_AUTHORITY to select ALTS instead of TLS, with 740 // Netty. 741 // TODO(ejona): The process here is a bit of a hack because ATTR_LB_ADDR_AUTHORITY isn't 742 // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow 743 // this to be more obvious. 744 Attributes attrs = Attributes.newBuilder() 745 .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) 746 .build(); 747 return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority); 748 } 749 750 /** 751 * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. 752 */ flattenEquivalentAddressGroup( List<EquivalentAddressGroup> groupList, Attributes attrs)753 private static EquivalentAddressGroup flattenEquivalentAddressGroup( 754 List<EquivalentAddressGroup> groupList, Attributes attrs) { 755 List<SocketAddress> addrs = new ArrayList<>(); 756 for (EquivalentAddressGroup group : groupList) { 757 addrs.addAll(group.getAddresses()); 758 } 759 return new EquivalentAddressGroup(addrs, attrs); 760 } 761 762 @VisibleForTesting 763 static final class DropEntry { 764 private final GrpclbClientLoadRecorder loadRecorder; 765 private final String token; 766 DropEntry(GrpclbClientLoadRecorder loadRecorder, String token)767 DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) { 768 this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); 769 this.token = checkNotNull(token, "token"); 770 } 771 picked()772 PickResult picked() { 773 loadRecorder.recordDroppedRequest(token); 774 return DROP_PICK_RESULT; 775 } 776 777 @Override toString()778 public String toString() { 779 return MoreObjects.toStringHelper(this) 780 .add("loadRecorder", loadRecorder) 781 .add("token", token) 782 .toString(); 783 } 784 785 @Override hashCode()786 public int hashCode() { 787 return Objects.hashCode(loadRecorder, token); 788 } 789 790 @Override equals(Object other)791 public boolean equals(Object other) { 792 if (!(other instanceof DropEntry)) { 793 return false; 794 } 795 DropEntry that = (DropEntry) other; 796 return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token); 797 } 798 } 799 800 private interface RoundRobinEntry { picked(Metadata headers)801 PickResult picked(Metadata headers); 802 } 803 804 @VisibleForTesting 805 static final class BackendEntry implements RoundRobinEntry { 806 @VisibleForTesting 807 final PickResult result; 808 @Nullable 809 private final GrpclbClientLoadRecorder loadRecorder; 810 @Nullable 811 private final String token; 812 813 /** 814 * Creates a BackendEntry whose usage will be reported to load recorder. 815 */ BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token)816 BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { 817 this.result = PickResult.withSubchannel(subchannel, loadRecorder); 818 this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); 819 this.token = checkNotNull(token, "token"); 820 } 821 822 /** 823 * Creates a BackendEntry whose usage will not be reported. 824 */ BackendEntry(Subchannel subchannel)825 BackendEntry(Subchannel subchannel) { 826 this.result = PickResult.withSubchannel(subchannel); 827 this.loadRecorder = null; 828 this.token = null; 829 } 830 831 @Override picked(Metadata headers)832 public PickResult picked(Metadata headers) { 833 headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); 834 if (token != null) { 835 headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); 836 } 837 return result; 838 } 839 840 @Override toString()841 public String toString() { 842 return MoreObjects.toStringHelper(this) 843 .add("result", result) 844 .add("loadRecorder", loadRecorder) 845 .add("token", token) 846 .toString(); 847 } 848 849 @Override hashCode()850 public int hashCode() { 851 return Objects.hashCode(loadRecorder, result, token); 852 } 853 854 @Override equals(Object other)855 public boolean equals(Object other) { 856 if (!(other instanceof BackendEntry)) { 857 return false; 858 } 859 BackendEntry that = (BackendEntry) other; 860 return Objects.equal(result, that.result) && Objects.equal(token, that.token) 861 && Objects.equal(loadRecorder, that.loadRecorder); 862 } 863 } 864 865 @VisibleForTesting 866 static final class ErrorEntry implements RoundRobinEntry { 867 final PickResult result; 868 ErrorEntry(Status status)869 ErrorEntry(Status status) { 870 result = PickResult.withError(status); 871 } 872 873 @Override picked(Metadata headers)874 public PickResult picked(Metadata headers) { 875 return result; 876 } 877 878 @Override hashCode()879 public int hashCode() { 880 return Objects.hashCode(result); 881 } 882 883 @Override equals(Object other)884 public boolean equals(Object other) { 885 if (!(other instanceof ErrorEntry)) { 886 return false; 887 } 888 return Objects.equal(result, ((ErrorEntry) other).result); 889 } 890 891 @Override toString()892 public String toString() { 893 return MoreObjects.toStringHelper(this) 894 .add("result", result) 895 .toString(); 896 } 897 } 898 899 @VisibleForTesting 900 static final class RoundRobinPicker extends SubchannelPicker { 901 @VisibleForTesting 902 final List<DropEntry> dropList; 903 private int dropIndex; 904 905 @VisibleForTesting 906 final List<? extends RoundRobinEntry> pickList; 907 private int pickIndex; 908 909 // dropList can be empty, which means no drop. 910 // pickList must not be empty. RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList)911 RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) { 912 this.dropList = checkNotNull(dropList, "dropList"); 913 this.pickList = checkNotNull(pickList, "pickList"); 914 checkArgument(!pickList.isEmpty(), "pickList is empty"); 915 } 916 917 @Override pickSubchannel(PickSubchannelArgs args)918 public PickResult pickSubchannel(PickSubchannelArgs args) { 919 synchronized (pickList) { 920 // Two-level round-robin. 921 // First round-robin on dropList. If a drop entry is selected, request will be dropped. If 922 // a non-drop entry is selected, then round-robin on pickList. This makes sure requests are 923 // dropped at the same proportion as the drop entries appear on the round-robin list from 924 // the balancer, while only READY backends (that make up pickList) are selected for the 925 // non-drop cases. 926 if (!dropList.isEmpty()) { 927 DropEntry drop = dropList.get(dropIndex); 928 dropIndex++; 929 if (dropIndex == dropList.size()) { 930 dropIndex = 0; 931 } 932 if (drop != null) { 933 return drop.picked(); 934 } 935 } 936 937 RoundRobinEntry pick = pickList.get(pickIndex); 938 pickIndex++; 939 if (pickIndex == pickList.size()) { 940 pickIndex = 0; 941 } 942 return pick.picked(args.getHeaders()); 943 } 944 } 945 } 946 } 947