1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.ConnectivityState.CONNECTING; 23 import static io.grpc.ConnectivityState.IDLE; 24 import static io.grpc.ConnectivityState.READY; 25 import static io.grpc.ConnectivityState.SHUTDOWN; 26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Objects; 31 import com.google.protobuf.util.Durations; 32 import io.grpc.Attributes; 33 import io.grpc.ConnectivityState; 34 import io.grpc.ConnectivityStateInfo; 35 import io.grpc.EquivalentAddressGroup; 36 import io.grpc.InternalLogId; 37 import io.grpc.LoadBalancer.Helper; 38 import io.grpc.LoadBalancer.PickResult; 39 import io.grpc.LoadBalancer.PickSubchannelArgs; 40 import io.grpc.LoadBalancer.Subchannel; 41 import io.grpc.LoadBalancer.SubchannelPicker; 42 import io.grpc.ManagedChannel; 43 import io.grpc.Metadata; 44 import io.grpc.Status; 45 import io.grpc.internal.BackoffPolicy; 46 import io.grpc.internal.GrpcAttributes; 47 import io.grpc.internal.TimeProvider; 48 import io.grpc.lb.v1.ClientStats; 49 import io.grpc.lb.v1.InitialLoadBalanceRequest; 50 import io.grpc.lb.v1.InitialLoadBalanceResponse; 51 import io.grpc.lb.v1.LoadBalanceRequest; 52 import io.grpc.lb.v1.LoadBalanceResponse; 53 import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase; 54 import io.grpc.lb.v1.LoadBalancerGrpc; 55 import io.grpc.lb.v1.Server; 56 import io.grpc.lb.v1.ServerList; 57 import io.grpc.stub.StreamObserver; 58 import java.net.InetAddress; 59 import java.net.InetSocketAddress; 60 import java.net.SocketAddress; 61 import java.net.UnknownHostException; 62 import java.util.ArrayList; 63 import java.util.Arrays; 64 import java.util.Collections; 65 import java.util.HashMap; 66 import java.util.List; 67 import java.util.Map; 68 import java.util.Map.Entry; 69 import java.util.concurrent.ScheduledExecutorService; 70 import java.util.concurrent.ScheduledFuture; 71 import java.util.concurrent.TimeUnit; 72 import java.util.concurrent.atomic.AtomicReference; 73 import java.util.logging.Level; 74 import java.util.logging.Logger; 75 import javax.annotation.Nullable; 76 import javax.annotation.concurrent.NotThreadSafe; 77 78 /** 79 * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}. Created when 80 * GrpclbLoadBalancer switches to GRPCLB mode. Closed and discarded when GrpclbLoadBalancer 81 * switches away from GRPCLB mode. 82 */ 83 @NotThreadSafe 84 final class GrpclbState { 85 private static final Logger logger = Logger.getLogger(GrpclbState.class.getName()); 86 87 static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); 88 private static final Attributes LB_PROVIDED_BACKEND_ATTRS = 89 Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build(); 90 91 @VisibleForTesting 92 static final PickResult DROP_PICK_RESULT = 93 PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); 94 95 @VisibleForTesting 96 static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { 97 @Override 98 public PickResult picked(Metadata headers) { 99 return PickResult.withNoResult(); 100 } 101 102 @Override 103 public String toString() { 104 return "BUFFER_ENTRY"; 105 } 106 }; 107 108 private final InternalLogId logId; 109 private final String serviceName; 110 private final Helper helper; 111 private final SubchannelPool subchannelPool; 112 private final TimeProvider time; 113 private final ScheduledExecutorService timerService; 114 115 private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO = 116 Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); 117 private final BackoffPolicy.Provider backoffPolicyProvider; 118 119 // Scheduled only once. Never reset. 120 @Nullable 121 private FallbackModeTask fallbackTimer; 122 private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList(); 123 private boolean usingFallbackBackends; 124 // True if the current balancer has returned a serverlist. Will be reset to false when lost 125 // connection to a balancer. 126 private boolean balancerWorking; 127 @Nullable 128 private BackoffPolicy lbRpcRetryPolicy; 129 @Nullable 130 private LbRpcRetryTask lbRpcRetryTimer; 131 private long prevLbRpcStartNanos; 132 133 @Nullable 134 private ManagedChannel lbCommChannel; 135 136 @Nullable 137 private LbStream lbStream; 138 private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap(); 139 140 // Has the same size as the round-robin list from the balancer. 141 // A drop entry from the round-robin list becomes a DropEntry here. 142 // A backend entry from the robin-robin list becomes a null here. 143 private List<DropEntry> dropList = Collections.emptyList(); 144 // Contains only non-drop, i.e., backends from the round-robin list from the balancer. 145 private List<BackendEntry> backendList = Collections.emptyList(); 146 private RoundRobinPicker currentPicker = 147 new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY)); 148 GrpclbState( Helper helper, SubchannelPool subchannelPool, TimeProvider time, ScheduledExecutorService timerService, BackoffPolicy.Provider backoffPolicyProvider, InternalLogId logId)149 GrpclbState( 150 Helper helper, 151 SubchannelPool subchannelPool, 152 TimeProvider time, 153 ScheduledExecutorService timerService, 154 BackoffPolicy.Provider backoffPolicyProvider, 155 InternalLogId logId) { 156 this.helper = checkNotNull(helper, "helper"); 157 this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); 158 this.time = checkNotNull(time, "time provider"); 159 this.timerService = checkNotNull(timerService, "timerService"); 160 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 161 this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); 162 this.logId = checkNotNull(logId, "logId"); 163 } 164 handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState)165 void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { 166 if (newState.getState() == SHUTDOWN || !(subchannels.values().contains(subchannel))) { 167 return; 168 } 169 if (newState.getState() == IDLE) { 170 subchannel.requestConnection(); 171 } 172 subchannel.getAttributes().get(STATE_INFO).set(newState); 173 maybeUseFallbackBackends(); 174 maybeUpdatePicker(); 175 } 176 177 /** 178 * Handle new addresses of the balancer and backends from the resolver, and create connection if 179 * not yet connected. 180 */ handleAddresses( List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers)181 void handleAddresses( 182 List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) { 183 if (newLbAddressGroups.isEmpty()) { 184 propagateError(Status.UNAVAILABLE.withDescription( 185 "NameResolver returned no LB address while asking for GRPCLB")); 186 return; 187 } 188 LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups); 189 startLbComm(newLbAddressGroup); 190 // Avoid creating a new RPC just because the addresses were updated, as it can cause a 191 // stampeding herd. The current RPC may be on a connection to an address not present in 192 // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an 193 // outdated backend, we could choose to re-create the RPC. 194 if (lbStream == null) { 195 startLbRpc(); 196 } 197 fallbackBackendList = newBackendServers; 198 // Start the fallback timer if it's never started 199 if (fallbackTimer == null) { 200 logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId}); 201 fallbackTimer = new FallbackModeTask(); 202 fallbackTimer.schedule(); 203 } 204 if (usingFallbackBackends) { 205 // Populate the new fallback backends to round-robin list. 206 useFallbackBackends(); 207 } 208 maybeUpdatePicker(); 209 } 210 maybeUseFallbackBackends()211 private void maybeUseFallbackBackends() { 212 if (balancerWorking) { 213 return; 214 } 215 if (usingFallbackBackends) { 216 return; 217 } 218 if (fallbackTimer != null && !fallbackTimer.discarded) { 219 return; 220 } 221 int numReadySubchannels = 0; 222 for (Subchannel subchannel : subchannels.values()) { 223 if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { 224 numReadySubchannels++; 225 } 226 } 227 if (numReadySubchannels > 0) { 228 return; 229 } 230 // Fallback contiditions met 231 useFallbackBackends(); 232 } 233 234 /** 235 * Populate the round-robin lists with the fallback backends. 236 */ useFallbackBackends()237 private void useFallbackBackends() { 238 usingFallbackBackends = true; 239 logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList}); 240 241 List<DropEntry> newDropList = new ArrayList<>(); 242 List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); 243 for (EquivalentAddressGroup eag : fallbackBackendList) { 244 newDropList.add(null); 245 newBackendAddrList.add(new BackendAddressGroup(eag, null)); 246 } 247 useRoundRobinLists(newDropList, newBackendAddrList, null); 248 } 249 shutdownLbComm()250 private void shutdownLbComm() { 251 if (lbCommChannel != null) { 252 lbCommChannel.shutdown(); 253 lbCommChannel = null; 254 } 255 shutdownLbRpc(); 256 } 257 shutdownLbRpc()258 private void shutdownLbRpc() { 259 if (lbStream != null) { 260 lbStream.close(null); 261 // lbStream will be set to null in LbStream.cleanup() 262 } 263 } 264 startLbComm(LbAddressGroup lbAddressGroup)265 private void startLbComm(LbAddressGroup lbAddressGroup) { 266 checkNotNull(lbAddressGroup, "lbAddressGroup"); 267 if (lbCommChannel == null) { 268 lbCommChannel = helper.createOobChannel( 269 lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority()); 270 } else if (lbAddressGroup.getAuthority().equals(lbCommChannel.authority())) { 271 helper.updateOobChannelAddresses(lbCommChannel, lbAddressGroup.getAddresses()); 272 } else { 273 // Full restart of channel 274 shutdownLbComm(); 275 lbCommChannel = helper.createOobChannel( 276 lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority()); 277 } 278 } 279 startLbRpc()280 private void startLbRpc() { 281 checkState(lbStream == null, "previous lbStream has not been cleared yet"); 282 LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); 283 lbStream = new LbStream(stub); 284 lbStream.start(); 285 prevLbRpcStartNanos = time.currentTimeNanos(); 286 287 LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() 288 .setInitialRequest(InitialLoadBalanceRequest.newBuilder() 289 .setName(serviceName).build()) 290 .build(); 291 try { 292 lbStream.lbRequestWriter.onNext(initRequest); 293 } catch (Exception e) { 294 lbStream.close(e); 295 } 296 } 297 cancelFallbackTimer()298 private void cancelFallbackTimer() { 299 if (fallbackTimer != null) { 300 fallbackTimer.cancel(); 301 } 302 } 303 cancelLbRpcRetryTimer()304 private void cancelLbRpcRetryTimer() { 305 if (lbRpcRetryTimer != null) { 306 lbRpcRetryTimer.cancel(); 307 } 308 } 309 shutdown()310 void shutdown() { 311 shutdownLbComm(); 312 // We close the subchannels through subchannelPool instead of helper just for convenience of 313 // testing. 314 for (Subchannel subchannel : subchannels.values()) { 315 subchannelPool.returnSubchannel(subchannel); 316 } 317 subchannels = Collections.emptyMap(); 318 subchannelPool.clear(); 319 cancelFallbackTimer(); 320 cancelLbRpcRetryTimer(); 321 } 322 propagateError(Status status)323 void propagateError(Status status) { 324 logger.log(Level.FINE, "[{0}] Had an error: {1}; dropList={2}; backendList={3}", 325 new Object[] {logId, status, dropList, backendList}); 326 if (backendList.isEmpty()) { 327 maybeUpdatePicker( 328 TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status)))); 329 } 330 } 331 332 @VisibleForTesting 333 @Nullable getLoadRecorder()334 GrpclbClientLoadRecorder getLoadRecorder() { 335 if (lbStream == null) { 336 return null; 337 } 338 return lbStream.loadRecorder; 339 } 340 341 /** 342 * Populate the round-robin lists with the given values. 343 */ useRoundRobinLists( List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder)344 private void useRoundRobinLists( 345 List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, 346 @Nullable GrpclbClientLoadRecorder loadRecorder) { 347 logger.log(Level.FINE, "[{0}] Using round-robin list: {1}, droplist={2}", 348 new Object[] {logId, newBackendAddrList, newDropList}); 349 HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap = 350 new HashMap<EquivalentAddressGroup, Subchannel>(); 351 List<BackendEntry> newBackendList = new ArrayList<>(); 352 353 for (BackendAddressGroup backendAddr : newBackendAddrList) { 354 EquivalentAddressGroup eag = backendAddr.getAddresses(); 355 Subchannel subchannel = newSubchannelMap.get(eag); 356 if (subchannel == null) { 357 subchannel = subchannels.get(eag); 358 if (subchannel == null) { 359 Attributes subchannelAttrs = Attributes.newBuilder() 360 .set(STATE_INFO, 361 new AtomicReference<ConnectivityStateInfo>( 362 ConnectivityStateInfo.forNonError(IDLE))) 363 .build(); 364 subchannel = subchannelPool.takeOrCreateSubchannel(eag, subchannelAttrs); 365 subchannel.requestConnection(); 366 } 367 newSubchannelMap.put(eag, subchannel); 368 } 369 BackendEntry entry; 370 // Only picks with tokens are reported to LoadRecorder 371 if (backendAddr.getToken() == null) { 372 entry = new BackendEntry(subchannel); 373 } else { 374 entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken()); 375 } 376 newBackendList.add(entry); 377 } 378 379 // Close Subchannels whose addresses have been delisted 380 for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) { 381 EquivalentAddressGroup eag = entry.getKey(); 382 if (!newSubchannelMap.containsKey(eag)) { 383 subchannelPool.returnSubchannel(entry.getValue()); 384 } 385 } 386 387 subchannels = Collections.unmodifiableMap(newSubchannelMap); 388 dropList = Collections.unmodifiableList(newDropList); 389 backendList = Collections.unmodifiableList(newBackendList); 390 } 391 392 @VisibleForTesting 393 class FallbackModeTask implements Runnable { 394 private ScheduledFuture<?> scheduledFuture; 395 private boolean discarded; 396 397 @Override run()398 public void run() { 399 helper.runSerialized(new Runnable() { 400 @Override 401 public void run() { 402 checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); 403 discarded = true; 404 maybeUseFallbackBackends(); 405 maybeUpdatePicker(); 406 } 407 }); 408 } 409 cancel()410 void cancel() { 411 discarded = true; 412 scheduledFuture.cancel(false); 413 } 414 schedule()415 void schedule() { 416 checkState(scheduledFuture == null, "FallbackModeTask already scheduled"); 417 scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); 418 } 419 } 420 421 @VisibleForTesting 422 class LbRpcRetryTask implements Runnable { 423 private ScheduledFuture<?> scheduledFuture; 424 425 @Override run()426 public void run() { 427 helper.runSerialized(new Runnable() { 428 @Override 429 public void run() { 430 checkState( 431 lbRpcRetryTimer == LbRpcRetryTask.this, "LbRpc retry timer mismatch"); 432 startLbRpc(); 433 } 434 }); 435 } 436 cancel()437 void cancel() { 438 scheduledFuture.cancel(false); 439 } 440 schedule(long delayNanos)441 void schedule(long delayNanos) { 442 checkState(scheduledFuture == null, "LbRpcRetryTask already scheduled"); 443 scheduledFuture = timerService.schedule(this, delayNanos, TimeUnit.NANOSECONDS); 444 } 445 } 446 447 @VisibleForTesting 448 class LoadReportingTask implements Runnable { 449 private final LbStream stream; 450 LoadReportingTask(LbStream stream)451 LoadReportingTask(LbStream stream) { 452 this.stream = stream; 453 } 454 455 @Override run()456 public void run() { 457 helper.runSerialized(new Runnable() { 458 @Override 459 public void run() { 460 stream.loadReportFuture = null; 461 stream.sendLoadReport(); 462 } 463 }); 464 } 465 } 466 467 private class LbStream implements StreamObserver<LoadBalanceResponse> { 468 final GrpclbClientLoadRecorder loadRecorder; 469 final LoadBalancerGrpc.LoadBalancerStub stub; 470 StreamObserver<LoadBalanceRequest> lbRequestWriter; 471 472 // These fields are only accessed from helper.runSerialized() 473 boolean initialResponseReceived; 474 boolean closed; 475 long loadReportIntervalMillis = -1; 476 ScheduledFuture<?> loadReportFuture; 477 LbStream(LoadBalancerGrpc.LoadBalancerStub stub)478 LbStream(LoadBalancerGrpc.LoadBalancerStub stub) { 479 this.stub = checkNotNull(stub, "stub"); 480 // Stats data only valid for current LbStream. We do not carry over data from previous 481 // stream. 482 loadRecorder = new GrpclbClientLoadRecorder(time); 483 } 484 start()485 void start() { 486 lbRequestWriter = stub.withWaitForReady().balanceLoad(this); 487 } 488 onNext(final LoadBalanceResponse response)489 @Override public void onNext(final LoadBalanceResponse response) { 490 helper.runSerialized(new Runnable() { 491 @Override 492 public void run() { 493 handleResponse(response); 494 } 495 }); 496 } 497 onError(final Throwable error)498 @Override public void onError(final Throwable error) { 499 helper.runSerialized(new Runnable() { 500 @Override 501 public void run() { 502 handleStreamClosed(Status.fromThrowable(error) 503 .augmentDescription("Stream to GRPCLB LoadBalancer had an error")); 504 } 505 }); 506 } 507 onCompleted()508 @Override public void onCompleted() { 509 helper.runSerialized(new Runnable() { 510 @Override 511 public void run() { 512 handleStreamClosed( 513 Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed")); 514 } 515 }); 516 } 517 518 // Following methods must be run in helper.runSerialized() 519 sendLoadReport()520 private void sendLoadReport() { 521 if (closed) { 522 return; 523 } 524 ClientStats stats = loadRecorder.generateLoadReport(); 525 // TODO(zhangkun83): flow control? 526 try { 527 lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build()); 528 scheduleNextLoadReport(); 529 } catch (Exception e) { 530 close(e); 531 } 532 } 533 scheduleNextLoadReport()534 private void scheduleNextLoadReport() { 535 if (loadReportIntervalMillis > 0) { 536 loadReportFuture = timerService.schedule( 537 new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS); 538 } 539 } 540 handleResponse(LoadBalanceResponse response)541 private void handleResponse(LoadBalanceResponse response) { 542 if (closed) { 543 return; 544 } 545 logger.log(Level.FINER, "[{0}] Got an LB response: {1}", new Object[] {logId, response}); 546 547 LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase(); 548 if (!initialResponseReceived) { 549 if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) { 550 logger.log( 551 Level.WARNING, 552 "[{0}] : Did not receive response with type initial response: {1}", 553 new Object[] {logId, response}); 554 return; 555 } 556 initialResponseReceived = true; 557 InitialLoadBalanceResponse initialResponse = response.getInitialResponse(); 558 loadReportIntervalMillis = 559 Durations.toMillis(initialResponse.getClientStatsReportInterval()); 560 scheduleNextLoadReport(); 561 return; 562 } 563 564 if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) { 565 logger.log( 566 Level.WARNING, 567 "[{0}] : Ignoring unexpected response type: {1}", 568 new Object[] {logId, response}); 569 return; 570 } 571 572 balancerWorking = true; 573 // TODO(zhangkun83): handle delegate from initialResponse 574 ServerList serverList = response.getServerList(); 575 List<DropEntry> newDropList = new ArrayList<>(); 576 List<BackendAddressGroup> newBackendAddrList = new ArrayList<>(); 577 // Construct the new collections. Create new Subchannels when necessary. 578 for (Server server : serverList.getServersList()) { 579 String token = server.getLoadBalanceToken(); 580 if (server.getDrop()) { 581 newDropList.add(new DropEntry(loadRecorder, token)); 582 } else { 583 newDropList.add(null); 584 InetSocketAddress address; 585 try { 586 address = new InetSocketAddress( 587 InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort()); 588 } catch (UnknownHostException e) { 589 propagateError( 590 Status.UNAVAILABLE 591 .withDescription("Host for server not found: " + server) 592 .withCause(e)); 593 continue; 594 } 595 // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of 596 // TLS, with Netty. 597 EquivalentAddressGroup eag = 598 new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS); 599 newBackendAddrList.add(new BackendAddressGroup(eag, token)); 600 } 601 } 602 // Stop using fallback backends as soon as a new server list is received from the balancer. 603 usingFallbackBackends = false; 604 cancelFallbackTimer(); 605 useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); 606 maybeUpdatePicker(); 607 } 608 handleStreamClosed(Status error)609 private void handleStreamClosed(Status error) { 610 checkArgument(!error.isOk(), "unexpected OK status"); 611 if (closed) { 612 return; 613 } 614 closed = true; 615 cleanUp(); 616 propagateError(error); 617 balancerWorking = false; 618 maybeUseFallbackBackends(); 619 maybeUpdatePicker(); 620 621 long delayNanos = 0; 622 if (initialResponseReceived || lbRpcRetryPolicy == null) { 623 // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence 624 // has never been initialized. 625 lbRpcRetryPolicy = backoffPolicyProvider.get(); 626 } 627 // Backoff only when balancer wasn't working previously. 628 if (!initialResponseReceived) { 629 // The back-off policy determines the interval between consecutive RPC upstarts, thus the 630 // actual delay may be smaller than the value from the back-off policy, or even negative, 631 // depending how much time was spent in the previous RPC. 632 delayNanos = 633 prevLbRpcStartNanos + lbRpcRetryPolicy.nextBackoffNanos() - time.currentTimeNanos(); 634 } 635 if (delayNanos <= 0) { 636 startLbRpc(); 637 } else { 638 lbRpcRetryTimer = new LbRpcRetryTask(); 639 lbRpcRetryTimer.schedule(delayNanos); 640 } 641 } 642 close(@ullable Exception error)643 void close(@Nullable Exception error) { 644 if (closed) { 645 return; 646 } 647 closed = true; 648 cleanUp(); 649 try { 650 if (error == null) { 651 lbRequestWriter.onCompleted(); 652 } else { 653 lbRequestWriter.onError(error); 654 } 655 } catch (Exception e) { 656 // Don't care 657 } 658 } 659 cleanUp()660 private void cleanUp() { 661 if (loadReportFuture != null) { 662 loadReportFuture.cancel(false); 663 loadReportFuture = null; 664 } 665 if (lbStream == this) { 666 lbStream = null; 667 } 668 } 669 } 670 671 /** 672 * Make and use a picker out of the current lists and the states of subchannels if they have 673 * changed since the last picker created. 674 */ maybeUpdatePicker()675 private void maybeUpdatePicker() { 676 List<RoundRobinEntry> pickList = new ArrayList<>(backendList.size()); 677 Status error = null; 678 boolean hasIdle = false; 679 for (BackendEntry entry : backendList) { 680 Subchannel subchannel = entry.result.getSubchannel(); 681 Attributes attrs = subchannel.getAttributes(); 682 ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); 683 if (stateInfo.getState() == READY) { 684 pickList.add(entry); 685 } else if (stateInfo.getState() == TRANSIENT_FAILURE) { 686 error = stateInfo.getStatus(); 687 } else if (stateInfo.getState() == IDLE) { 688 hasIdle = true; 689 } 690 } 691 ConnectivityState state; 692 if (pickList.isEmpty()) { 693 if (error != null && !hasIdle) { 694 logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}", 695 new Object[] {logId, error}); 696 pickList.add(new ErrorEntry(error)); 697 state = TRANSIENT_FAILURE; 698 } else { 699 logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId); 700 pickList.add(BUFFER_ENTRY); 701 state = CONNECTING; 702 } 703 } else { 704 logger.log( 705 Level.FINE, "[{0}] Using drop list {1} and pick list {2}", 706 new Object[] {logId, dropList, pickList}); 707 state = READY; 708 } 709 maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); 710 } 711 712 /** 713 * Update the given picker to the helper if it's different from the current one. 714 */ maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)715 private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) { 716 // Discard the new picker if we are sure it won't make any difference, in order to save 717 // re-processing pending streams, and avoid unnecessary resetting of the pointer in 718 // RoundRobinPicker. 719 if (picker.dropList.equals(currentPicker.dropList) 720 && picker.pickList.equals(currentPicker.pickList)) { 721 return; 722 } 723 // No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending 724 // stream thus no time is wasted in re-process. 725 currentPicker = picker; 726 helper.updateBalancingState(state, picker); 727 } 728 flattenLbAddressGroups(List<LbAddressGroup> groupList)729 private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) { 730 assert !groupList.isEmpty(); 731 List<EquivalentAddressGroup> eags = new ArrayList<>(groupList.size()); 732 String authority = groupList.get(0).getAuthority(); 733 for (LbAddressGroup group : groupList) { 734 if (!authority.equals(group.getAuthority())) { 735 // TODO(ejona): Allow different authorities for different addresses. Requires support from 736 // Helper. 737 logger.log(Level.WARNING, 738 "[{0}] Multiple authorities found for LB. " 739 + "Skipping addresses for {0} in preference to {1}", 740 new Object[] {logId, group.getAuthority(), authority}); 741 } else { 742 eags.add(group.getAddresses()); 743 } 744 } 745 // ALTS code can use the presence of ATTR_LB_ADDR_AUTHORITY to select ALTS instead of TLS, with 746 // Netty. 747 // TODO(ejona): The process here is a bit of a hack because ATTR_LB_ADDR_AUTHORITY isn't 748 // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow 749 // this to be more obvious. 750 Attributes attrs = Attributes.newBuilder() 751 .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) 752 .build(); 753 return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority); 754 } 755 756 /** 757 * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. 758 */ flattenEquivalentAddressGroup( List<EquivalentAddressGroup> groupList, Attributes attrs)759 private static EquivalentAddressGroup flattenEquivalentAddressGroup( 760 List<EquivalentAddressGroup> groupList, Attributes attrs) { 761 List<SocketAddress> addrs = new ArrayList<>(); 762 for (EquivalentAddressGroup group : groupList) { 763 addrs.addAll(group.getAddresses()); 764 } 765 return new EquivalentAddressGroup(addrs, attrs); 766 } 767 768 @VisibleForTesting 769 static final class DropEntry { 770 private final GrpclbClientLoadRecorder loadRecorder; 771 private final String token; 772 DropEntry(GrpclbClientLoadRecorder loadRecorder, String token)773 DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) { 774 this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); 775 this.token = checkNotNull(token, "token"); 776 } 777 picked()778 PickResult picked() { 779 loadRecorder.recordDroppedRequest(token); 780 return DROP_PICK_RESULT; 781 } 782 783 @Override toString()784 public String toString() { 785 return MoreObjects.toStringHelper(this) 786 .add("loadRecorder", loadRecorder) 787 .add("token", token) 788 .toString(); 789 } 790 791 @Override hashCode()792 public int hashCode() { 793 return Objects.hashCode(loadRecorder, token); 794 } 795 796 @Override equals(Object other)797 public boolean equals(Object other) { 798 if (!(other instanceof DropEntry)) { 799 return false; 800 } 801 DropEntry that = (DropEntry) other; 802 return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token); 803 } 804 } 805 806 private interface RoundRobinEntry { picked(Metadata headers)807 PickResult picked(Metadata headers); 808 } 809 810 @VisibleForTesting 811 static final class BackendEntry implements RoundRobinEntry { 812 @VisibleForTesting 813 final PickResult result; 814 @Nullable 815 private final GrpclbClientLoadRecorder loadRecorder; 816 @Nullable 817 private final String token; 818 819 /** 820 * Creates a BackendEntry whose usage will be reported to load recorder. 821 */ BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token)822 BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { 823 this.result = PickResult.withSubchannel(subchannel, loadRecorder); 824 this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); 825 this.token = checkNotNull(token, "token"); 826 } 827 828 /** 829 * Creates a BackendEntry whose usage will not be reported. 830 */ BackendEntry(Subchannel subchannel)831 BackendEntry(Subchannel subchannel) { 832 this.result = PickResult.withSubchannel(subchannel); 833 this.loadRecorder = null; 834 this.token = null; 835 } 836 837 @Override picked(Metadata headers)838 public PickResult picked(Metadata headers) { 839 headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); 840 if (token != null) { 841 headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); 842 } 843 return result; 844 } 845 846 @Override toString()847 public String toString() { 848 return MoreObjects.toStringHelper(this) 849 .add("result", result) 850 .add("loadRecorder", loadRecorder) 851 .add("token", token) 852 .toString(); 853 } 854 855 @Override hashCode()856 public int hashCode() { 857 return Objects.hashCode(loadRecorder, result, token); 858 } 859 860 @Override equals(Object other)861 public boolean equals(Object other) { 862 if (!(other instanceof BackendEntry)) { 863 return false; 864 } 865 BackendEntry that = (BackendEntry) other; 866 return Objects.equal(result, that.result) && Objects.equal(token, that.token) 867 && Objects.equal(loadRecorder, that.loadRecorder); 868 } 869 } 870 871 @VisibleForTesting 872 static final class ErrorEntry implements RoundRobinEntry { 873 final PickResult result; 874 ErrorEntry(Status status)875 ErrorEntry(Status status) { 876 result = PickResult.withError(status); 877 } 878 879 @Override picked(Metadata headers)880 public PickResult picked(Metadata headers) { 881 return result; 882 } 883 884 @Override hashCode()885 public int hashCode() { 886 return Objects.hashCode(result); 887 } 888 889 @Override equals(Object other)890 public boolean equals(Object other) { 891 if (!(other instanceof ErrorEntry)) { 892 return false; 893 } 894 return Objects.equal(result, ((ErrorEntry) other).result); 895 } 896 897 @Override toString()898 public String toString() { 899 return MoreObjects.toStringHelper(this) 900 .add("result", result) 901 .toString(); 902 } 903 } 904 905 @VisibleForTesting 906 static final class RoundRobinPicker extends SubchannelPicker { 907 @VisibleForTesting 908 final List<DropEntry> dropList; 909 private int dropIndex; 910 911 @VisibleForTesting 912 final List<? extends RoundRobinEntry> pickList; 913 private int pickIndex; 914 915 // dropList can be empty, which means no drop. 916 // pickList must not be empty. RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList)917 RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) { 918 this.dropList = checkNotNull(dropList, "dropList"); 919 this.pickList = checkNotNull(pickList, "pickList"); 920 checkArgument(!pickList.isEmpty(), "pickList is empty"); 921 } 922 923 @Override pickSubchannel(PickSubchannelArgs args)924 public PickResult pickSubchannel(PickSubchannelArgs args) { 925 synchronized (pickList) { 926 // Two-level round-robin. 927 // First round-robin on dropList. If a drop entry is selected, request will be dropped. If 928 // a non-drop entry is selected, then round-robin on pickList. This makes sure requests are 929 // dropped at the same proportion as the drop entries appear on the round-robin list from 930 // the balancer, while only READY backends (that make up pickList) are selected for the 931 // non-drop cases. 932 if (!dropList.isEmpty()) { 933 DropEntry drop = dropList.get(dropIndex); 934 dropIndex++; 935 if (dropIndex == dropList.size()) { 936 dropIndex = 0; 937 } 938 if (drop != null) { 939 return drop.picked(); 940 } 941 } 942 943 RoundRobinEntry pick = pickList.get(pickIndex); 944 pickIndex++; 945 if (pickIndex == pickList.size()) { 946 pickIndex = 0; 947 } 948 return pick.picked(args.getHeaders()); 949 } 950 } 951 } 952 } 953