1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static io.grpc.ConnectivityState.CONNECTING; 20 import static io.grpc.ConnectivityState.IDLE; 21 import static io.grpc.ConnectivityState.READY; 22 import static io.grpc.ConnectivityState.SHUTDOWN; 23 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.base.MoreObjects; 27 import com.google.common.base.Preconditions; 28 import com.google.common.base.Stopwatch; 29 import com.google.common.base.Supplier; 30 import com.google.common.util.concurrent.ListenableFuture; 31 import com.google.common.util.concurrent.SettableFuture; 32 import com.google.errorprone.annotations.ForOverride; 33 import io.grpc.Attributes; 34 import io.grpc.CallOptions; 35 import io.grpc.ConnectivityState; 36 import io.grpc.ConnectivityStateInfo; 37 import io.grpc.EquivalentAddressGroup; 38 import io.grpc.InternalChannelz; 39 import io.grpc.InternalChannelz.ChannelStats; 40 import io.grpc.InternalChannelz.ChannelTrace; 41 import io.grpc.InternalInstrumented; 42 import io.grpc.InternalLogId; 43 import io.grpc.InternalWithLogId; 44 import io.grpc.Metadata; 45 import io.grpc.MethodDescriptor; 46 import io.grpc.Status; 47 import java.net.SocketAddress; 48 import java.util.ArrayList; 49 import java.util.Collection; 50 import java.util.Collections; 51 import java.util.List; 52 import java.util.concurrent.ScheduledExecutorService; 53 import java.util.concurrent.ScheduledFuture; 54 import java.util.concurrent.TimeUnit; 55 import java.util.logging.Level; 56 import java.util.logging.Logger; 57 import javax.annotation.CheckForNull; 58 import javax.annotation.Nullable; 59 import javax.annotation.concurrent.GuardedBy; 60 import javax.annotation.concurrent.ThreadSafe; 61 62 /** 63 * Transports for a single {@link SocketAddress}. 64 */ 65 @ThreadSafe 66 final class InternalSubchannel implements InternalInstrumented<ChannelStats> { 67 private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); 68 69 private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); 70 private final String authority; 71 private final String userAgent; 72 private final BackoffPolicy.Provider backoffPolicyProvider; 73 private final Callback callback; 74 private final ClientTransportFactory transportFactory; 75 private final ScheduledExecutorService scheduledExecutor; 76 private final InternalChannelz channelz; 77 private final CallTracer callsTracer; 78 @CheckForNull 79 private final ChannelTracer channelTracer; 80 private final TimeProvider timeProvider; 81 82 // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock. 83 private final Object lock = new Object(); 84 85 // File-specific convention: 86 // 87 // 1. In a method without GuardedBy("lock"), executeLater() MUST be followed by a drain() later in 88 // the same method. 89 // 90 // 2. drain() MUST NOT be called under "lock". 91 // 92 // 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally". 93 private final ChannelExecutor channelExecutor; 94 95 /** 96 * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if 97 * both are null. 98 */ 99 @GuardedBy("lock") 100 private Index addressIndex; 101 102 /** 103 * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is 104 * scheduled. 105 */ 106 @GuardedBy("lock") 107 private BackoffPolicy reconnectPolicy; 108 109 /** 110 * Timer monitoring duration since entering CONNECTING state. 111 */ 112 @GuardedBy("lock") 113 private final Stopwatch connectingTimer; 114 115 @GuardedBy("lock") 116 @Nullable 117 private ScheduledFuture<?> reconnectTask; 118 119 @GuardedBy("lock") 120 private boolean reconnectCanceled; 121 122 /** 123 * All transports that are not terminated. At the very least the value of {@link #activeTransport} 124 * will be present, but previously used transports that still have streams or are stopping may 125 * also be present. 126 */ 127 @GuardedBy("lock") 128 private final Collection<ConnectionClientTransport> transports = new ArrayList<>(); 129 130 // Must only be used from channelExecutor 131 private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator = 132 new InUseStateAggregator<ConnectionClientTransport>() { 133 @Override 134 void handleInUse() { 135 callback.onInUse(InternalSubchannel.this); 136 } 137 138 @Override 139 void handleNotInUse() { 140 callback.onNotInUse(InternalSubchannel.this); 141 } 142 }; 143 144 /** 145 * The to-be active transport, which is not ready yet. 146 */ 147 @GuardedBy("lock") 148 @Nullable 149 private ConnectionClientTransport pendingTransport; 150 151 /** 152 * The transport for new outgoing requests. 'lock' must be held when assigning to it. Non-null 153 * only in READY state. 154 */ 155 @Nullable 156 private volatile ManagedClientTransport activeTransport; 157 158 @GuardedBy("lock") 159 private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); 160 161 @GuardedBy("lock") 162 private Status shutdownReason; 163 InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, InternalChannelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, TimeProvider timeProvider)164 InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent, 165 BackoffPolicy.Provider backoffPolicyProvider, 166 ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, 167 Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, 168 InternalChannelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, 169 TimeProvider timeProvider) { 170 Preconditions.checkNotNull(addressGroups, "addressGroups"); 171 Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); 172 checkListHasNoNulls(addressGroups, "addressGroups contains null entry"); 173 this.addressIndex = new Index( 174 Collections.unmodifiableList(new ArrayList<>(addressGroups))); 175 this.authority = authority; 176 this.userAgent = userAgent; 177 this.backoffPolicyProvider = backoffPolicyProvider; 178 this.transportFactory = transportFactory; 179 this.scheduledExecutor = scheduledExecutor; 180 this.connectingTimer = stopwatchSupplier.get(); 181 this.channelExecutor = channelExecutor; 182 this.callback = callback; 183 this.channelz = channelz; 184 this.callsTracer = callsTracer; 185 this.channelTracer = channelTracer; 186 this.timeProvider = timeProvider; 187 } 188 189 /** 190 * Returns a READY transport that will be used to create new streams. 191 * 192 * <p>Returns {@code null} if the state is not READY. Will try to connect if state is IDLE. 193 */ 194 @Nullable obtainActiveTransport()195 ClientTransport obtainActiveTransport() { 196 ClientTransport savedTransport = activeTransport; 197 if (savedTransport != null) { 198 return savedTransport; 199 } 200 try { 201 synchronized (lock) { 202 savedTransport = activeTransport; 203 // Check again, since it could have changed before acquiring the lock 204 if (savedTransport != null) { 205 return savedTransport; 206 } 207 if (state.getState() == IDLE) { 208 gotoNonErrorState(CONNECTING); 209 startNewTransport(); 210 } 211 } 212 } finally { 213 channelExecutor.drain(); 214 } 215 return null; 216 } 217 218 @GuardedBy("lock") startNewTransport()219 private void startNewTransport() { 220 Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); 221 222 if (addressIndex.isAtBeginning()) { 223 connectingTimer.reset().start(); 224 } 225 SocketAddress address = addressIndex.getCurrentAddress(); 226 227 ProxyParameters proxy = null; 228 if (address instanceof ProxySocketAddress) { 229 proxy = ((ProxySocketAddress) address).getProxyParameters(); 230 address = ((ProxySocketAddress) address).getAddress(); 231 } 232 233 ClientTransportFactory.ClientTransportOptions options = 234 new ClientTransportFactory.ClientTransportOptions() 235 .setAuthority(authority) 236 .setEagAttributes(addressIndex.getCurrentEagAttributes()) 237 .setUserAgent(userAgent) 238 .setProxyParameters(proxy); 239 ConnectionClientTransport transport = 240 new CallTracingTransport( 241 transportFactory.newClientTransport(address, options), callsTracer); 242 channelz.addClientSocket(transport); 243 if (log.isLoggable(Level.FINE)) { 244 log.log(Level.FINE, "[{0}] Created {1} for {2}", 245 new Object[] {logId, transport.getLogId(), address}); 246 } 247 pendingTransport = transport; 248 transports.add(transport); 249 Runnable runnable = transport.start(new TransportListener(transport, address)); 250 if (runnable != null) { 251 channelExecutor.executeLater(runnable); 252 } 253 } 254 255 /** 256 * Only called after all addresses attempted and failed (TRANSIENT_FAILURE). 257 * @param status the causal status when the channel begins transition to 258 * TRANSIENT_FAILURE. 259 */ 260 @GuardedBy("lock") scheduleBackoff(final Status status)261 private void scheduleBackoff(final Status status) { 262 class EndOfCurrentBackoff implements Runnable { 263 @Override 264 public void run() { 265 try { 266 synchronized (lock) { 267 reconnectTask = null; 268 if (reconnectCanceled) { 269 // Even though cancelReconnectTask() will cancel this task, the task may have already 270 // started when it's being canceled. 271 return; 272 } 273 gotoNonErrorState(CONNECTING); 274 startNewTransport(); 275 } 276 } catch (Throwable t) { 277 log.log(Level.WARNING, "Exception handling end of backoff", t); 278 } finally { 279 channelExecutor.drain(); 280 } 281 } 282 } 283 284 gotoState(ConnectivityStateInfo.forTransientFailure(status)); 285 if (reconnectPolicy == null) { 286 reconnectPolicy = backoffPolicyProvider.get(); 287 } 288 long delayNanos = 289 reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS); 290 if (log.isLoggable(Level.FINE)) { 291 log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ns", new Object[]{logId, delayNanos}); 292 } 293 Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); 294 reconnectCanceled = false; 295 reconnectTask = scheduledExecutor.schedule( 296 new LogExceptionRunnable(new EndOfCurrentBackoff()), 297 delayNanos, 298 TimeUnit.NANOSECONDS); 299 } 300 301 /** 302 * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this 303 * method has no effect. 304 */ resetConnectBackoff()305 void resetConnectBackoff() { 306 try { 307 synchronized (lock) { 308 if (state.getState() != TRANSIENT_FAILURE) { 309 return; 310 } 311 cancelReconnectTask(); 312 gotoNonErrorState(CONNECTING); 313 startNewTransport(); 314 } 315 } finally { 316 channelExecutor.drain(); 317 } 318 } 319 320 @GuardedBy("lock") gotoNonErrorState(ConnectivityState newState)321 private void gotoNonErrorState(ConnectivityState newState) { 322 gotoState(ConnectivityStateInfo.forNonError(newState)); 323 } 324 325 @GuardedBy("lock") gotoState(final ConnectivityStateInfo newState)326 private void gotoState(final ConnectivityStateInfo newState) { 327 if (state.getState() != newState.getState()) { 328 Preconditions.checkState(state.getState() != SHUTDOWN, 329 "Cannot transition out of SHUTDOWN to " + newState); 330 state = newState; 331 if (channelTracer != null) { 332 channelTracer.reportEvent( 333 new ChannelTrace.Event.Builder() 334 .setDescription("Entering " + state + " state") 335 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 336 .setTimestampNanos(timeProvider.currentTimeNanos()) 337 .build()); 338 } 339 channelExecutor.executeLater(new Runnable() { 340 @Override 341 public void run() { 342 callback.onStateChange(InternalSubchannel.this, newState); 343 } 344 }); 345 } 346 } 347 348 /** Replaces the existing addresses, avoiding unnecessary reconnects. */ updateAddresses(List<EquivalentAddressGroup> newAddressGroups)349 public void updateAddresses(List<EquivalentAddressGroup> newAddressGroups) { 350 Preconditions.checkNotNull(newAddressGroups, "newAddressGroups"); 351 checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry"); 352 Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty"); 353 newAddressGroups = 354 Collections.unmodifiableList(new ArrayList<>(newAddressGroups)); 355 ManagedClientTransport savedTransport = null; 356 try { 357 synchronized (lock) { 358 SocketAddress previousAddress = addressIndex.getCurrentAddress(); 359 addressIndex.updateGroups(newAddressGroups); 360 if (state.getState() == READY || state.getState() == CONNECTING) { 361 if (!addressIndex.seekTo(previousAddress)) { 362 // Forced to drop the connection 363 if (state.getState() == READY) { 364 savedTransport = activeTransport; 365 activeTransport = null; 366 addressIndex.reset(); 367 gotoNonErrorState(IDLE); 368 } else { 369 savedTransport = pendingTransport; 370 pendingTransport = null; 371 addressIndex.reset(); 372 startNewTransport(); 373 } 374 } 375 } 376 } 377 } finally { 378 channelExecutor.drain(); 379 } 380 if (savedTransport != null) { 381 savedTransport.shutdown( 382 Status.UNAVAILABLE.withDescription( 383 "InternalSubchannel closed transport due to address change")); 384 } 385 } 386 shutdown(Status reason)387 public void shutdown(Status reason) { 388 ManagedClientTransport savedActiveTransport; 389 ConnectionClientTransport savedPendingTransport; 390 try { 391 synchronized (lock) { 392 if (state.getState() == SHUTDOWN) { 393 return; 394 } 395 shutdownReason = reason; 396 gotoNonErrorState(SHUTDOWN); 397 savedActiveTransport = activeTransport; 398 savedPendingTransport = pendingTransport; 399 activeTransport = null; 400 pendingTransport = null; 401 addressIndex.reset(); 402 if (transports.isEmpty()) { 403 handleTermination(); 404 if (log.isLoggable(Level.FINE)) { 405 log.log(Level.FINE, "[{0}] Terminated in shutdown()", logId); 406 } 407 } // else: the callback will be run once all transports have been terminated 408 cancelReconnectTask(); 409 } 410 } finally { 411 channelExecutor.drain(); 412 } 413 if (savedActiveTransport != null) { 414 savedActiveTransport.shutdown(reason); 415 } 416 if (savedPendingTransport != null) { 417 savedPendingTransport.shutdown(reason); 418 } 419 } 420 421 @Override toString()422 public String toString() { 423 // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock 424 // since there may be many addresses. 425 Object addressGroupsCopy; 426 synchronized (lock) { 427 addressGroupsCopy = addressIndex.getGroups(); 428 } 429 return MoreObjects.toStringHelper(this) 430 .add("logId", logId.getId()) 431 .add("addressGroups", addressGroupsCopy) 432 .toString(); 433 } 434 435 @GuardedBy("lock") handleTermination()436 private void handleTermination() { 437 channelExecutor.executeLater(new Runnable() { 438 @Override 439 public void run() { 440 callback.onTerminated(InternalSubchannel.this); 441 } 442 }); 443 } 444 handleTransportInUseState( final ConnectionClientTransport transport, final boolean inUse)445 private void handleTransportInUseState( 446 final ConnectionClientTransport transport, final boolean inUse) { 447 channelExecutor.executeLater(new Runnable() { 448 @Override 449 public void run() { 450 inUseStateAggregator.updateObjectInUse(transport, inUse); 451 } 452 }).drain(); 453 } 454 shutdownNow(Status reason)455 void shutdownNow(Status reason) { 456 shutdown(reason); 457 Collection<ManagedClientTransport> transportsCopy; 458 try { 459 synchronized (lock) { 460 transportsCopy = new ArrayList<ManagedClientTransport>(transports); 461 } 462 } finally { 463 channelExecutor.drain(); 464 } 465 for (ManagedClientTransport transport : transportsCopy) { 466 transport.shutdownNow(reason); 467 } 468 } 469 getAddressGroups()470 List<EquivalentAddressGroup> getAddressGroups() { 471 try { 472 synchronized (lock) { 473 return addressIndex.getGroups(); 474 } 475 } finally { 476 channelExecutor.drain(); 477 } 478 } 479 480 @GuardedBy("lock") cancelReconnectTask()481 private void cancelReconnectTask() { 482 if (reconnectTask != null) { 483 reconnectTask.cancel(false); 484 reconnectCanceled = true; 485 reconnectTask = null; 486 reconnectPolicy = null; 487 } 488 } 489 490 @Override getLogId()491 public InternalLogId getLogId() { 492 return logId; 493 } 494 495 496 @Override getStats()497 public ListenableFuture<ChannelStats> getStats() { 498 SettableFuture<ChannelStats> ret = SettableFuture.create(); 499 ChannelStats.Builder builder = new ChannelStats.Builder(); 500 501 List<EquivalentAddressGroup> addressGroupsSnapshot; 502 List<InternalWithLogId> transportsSnapshot; 503 synchronized (lock) { 504 addressGroupsSnapshot = addressIndex.getGroups(); 505 transportsSnapshot = new ArrayList<InternalWithLogId>(transports); 506 } 507 508 builder.setTarget(addressGroupsSnapshot.toString()).setState(getState()); 509 builder.setSockets(transportsSnapshot); 510 callsTracer.updateBuilder(builder); 511 if (channelTracer != null) { 512 channelTracer.updateBuilder(builder); 513 } 514 ret.set(builder.build()); 515 return ret; 516 } 517 518 @VisibleForTesting getState()519 ConnectivityState getState() { 520 try { 521 synchronized (lock) { 522 return state.getState(); 523 } 524 } finally { 525 channelExecutor.drain(); 526 } 527 } 528 checkListHasNoNulls(List<?> list, String msg)529 private static void checkListHasNoNulls(List<?> list, String msg) { 530 for (Object item : list) { 531 Preconditions.checkNotNull(item, msg); 532 } 533 } 534 535 /** Listener for real transports. */ 536 private class TransportListener implements ManagedClientTransport.Listener { 537 final ConnectionClientTransport transport; 538 final SocketAddress address; 539 TransportListener(ConnectionClientTransport transport, SocketAddress address)540 TransportListener(ConnectionClientTransport transport, SocketAddress address) { 541 this.transport = transport; 542 this.address = address; 543 } 544 545 @Override transportReady()546 public void transportReady() { 547 if (log.isLoggable(Level.FINE)) { 548 log.log(Level.FINE, "[{0}] {1} for {2} is ready", 549 new Object[] {logId, transport.getLogId(), address}); 550 } 551 Status savedShutdownReason; 552 try { 553 synchronized (lock) { 554 savedShutdownReason = shutdownReason; 555 reconnectPolicy = null; 556 if (savedShutdownReason != null) { 557 // activeTransport should have already been set to null by shutdown(). We keep it null. 558 Preconditions.checkState(activeTransport == null, 559 "Unexpected non-null activeTransport"); 560 } else if (pendingTransport == transport) { 561 gotoNonErrorState(READY); 562 activeTransport = transport; 563 pendingTransport = null; 564 } 565 } 566 } finally { 567 channelExecutor.drain(); 568 } 569 if (savedShutdownReason != null) { 570 transport.shutdown(savedShutdownReason); 571 } 572 } 573 574 @Override transportInUse(boolean inUse)575 public void transportInUse(boolean inUse) { 576 handleTransportInUseState(transport, inUse); 577 } 578 579 @Override transportShutdown(Status s)580 public void transportShutdown(Status s) { 581 if (log.isLoggable(Level.FINE)) { 582 log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}", 583 new Object[] {logId, transport.getLogId(), address, s}); 584 } 585 try { 586 synchronized (lock) { 587 if (state.getState() == SHUTDOWN) { 588 return; 589 } 590 if (activeTransport == transport) { 591 gotoNonErrorState(IDLE); 592 activeTransport = null; 593 addressIndex.reset(); 594 } else if (pendingTransport == transport) { 595 Preconditions.checkState(state.getState() == CONNECTING, 596 "Expected state is CONNECTING, actual state is %s", state.getState()); 597 addressIndex.increment(); 598 // Continue reconnect if there are still addresses to try. 599 if (!addressIndex.isValid()) { 600 pendingTransport = null; 601 addressIndex.reset(); 602 // Initiate backoff 603 // Transition to TRANSIENT_FAILURE 604 scheduleBackoff(s); 605 } else { 606 startNewTransport(); 607 } 608 } 609 } 610 } finally { 611 channelExecutor.drain(); 612 } 613 } 614 615 @Override transportTerminated()616 public void transportTerminated() { 617 if (log.isLoggable(Level.FINE)) { 618 log.log(Level.FINE, "[{0}] {1} for {2} is terminated", 619 new Object[] {logId, transport.getLogId(), address}); 620 } 621 channelz.removeClientSocket(transport); 622 handleTransportInUseState(transport, false); 623 try { 624 synchronized (lock) { 625 transports.remove(transport); 626 if (state.getState() == SHUTDOWN && transports.isEmpty()) { 627 if (log.isLoggable(Level.FINE)) { 628 log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", logId); 629 } 630 handleTermination(); 631 } 632 } 633 } finally { 634 channelExecutor.drain(); 635 } 636 Preconditions.checkState(activeTransport != transport, 637 "activeTransport still points to this transport. " 638 + "Seems transportShutdown() was not called."); 639 } 640 } 641 642 // All methods are called in channelExecutor, which is a serializing executor. 643 abstract static class Callback { 644 /** 645 * Called when the subchannel is terminated, which means it's shut down and all transports 646 * have been terminated. 647 */ 648 @ForOverride onTerminated(InternalSubchannel is)649 void onTerminated(InternalSubchannel is) { } 650 651 /** 652 * Called when the subchannel's connectivity state has changed. 653 */ 654 @ForOverride onStateChange(InternalSubchannel is, ConnectivityStateInfo newState)655 void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { } 656 657 /** 658 * Called when the subchannel's in-use state has changed to true, which means at least one 659 * transport is in use. 660 */ 661 @ForOverride onInUse(InternalSubchannel is)662 void onInUse(InternalSubchannel is) { } 663 664 /** 665 * Called when the subchannel's in-use state has changed to false, which means no transport is 666 * in use. 667 */ 668 @ForOverride onNotInUse(InternalSubchannel is)669 void onNotInUse(InternalSubchannel is) { } 670 } 671 672 @VisibleForTesting 673 static final class CallTracingTransport extends ForwardingConnectionClientTransport { 674 private final ConnectionClientTransport delegate; 675 private final CallTracer callTracer; 676 CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer)677 private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) { 678 this.delegate = delegate; 679 this.callTracer = callTracer; 680 } 681 682 @Override delegate()683 protected ConnectionClientTransport delegate() { 684 return delegate; 685 } 686 687 @Override newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions)688 public ClientStream newStream( 689 MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) { 690 final ClientStream streamDelegate = super.newStream(method, headers, callOptions); 691 return new ForwardingClientStream() { 692 @Override 693 protected ClientStream delegate() { 694 return streamDelegate; 695 } 696 697 @Override 698 public void start(final ClientStreamListener listener) { 699 callTracer.reportCallStarted(); 700 super.start(new ForwardingClientStreamListener() { 701 @Override 702 protected ClientStreamListener delegate() { 703 return listener; 704 } 705 706 @Override 707 public void closed(Status status, Metadata trailers) { 708 callTracer.reportCallEnded(status.isOk()); 709 super.closed(status, trailers); 710 } 711 712 @Override 713 public void closed( 714 Status status, RpcProgress rpcProgress, Metadata trailers) { 715 callTracer.reportCallEnded(status.isOk()); 716 super.closed(status, rpcProgress, trailers); 717 } 718 }); 719 } 720 }; 721 } 722 } 723 724 /** Index as in 'i', the pointer to an entry. Not a "search index." */ 725 @VisibleForTesting 726 static final class Index { 727 private List<EquivalentAddressGroup> addressGroups; 728 private int groupIndex; 729 private int addressIndex; 730 731 public Index(List<EquivalentAddressGroup> groups) { 732 this.addressGroups = groups; 733 } 734 735 public boolean isValid() { 736 // addressIndex will never be invalid 737 return groupIndex < addressGroups.size(); 738 } 739 740 public boolean isAtBeginning() { 741 return groupIndex == 0 && addressIndex == 0; 742 } 743 744 public void increment() { 745 EquivalentAddressGroup group = addressGroups.get(groupIndex); 746 addressIndex++; 747 if (addressIndex >= group.getAddresses().size()) { 748 groupIndex++; 749 addressIndex = 0; 750 } 751 } 752 753 public void reset() { 754 groupIndex = 0; 755 addressIndex = 0; 756 } 757 758 public SocketAddress getCurrentAddress() { 759 return addressGroups.get(groupIndex).getAddresses().get(addressIndex); 760 } 761 762 public Attributes getCurrentEagAttributes() { 763 return addressGroups.get(groupIndex).getAttributes(); 764 } 765 766 public List<EquivalentAddressGroup> getGroups() { 767 return addressGroups; 768 } 769 770 /** Update to new groups, resetting the current index. */ 771 public void updateGroups(List<EquivalentAddressGroup> newGroups) { 772 addressGroups = newGroups; 773 reset(); 774 } 775 776 /** Returns false if the needle was not found and the current index was left unchanged. */ 777 public boolean seekTo(SocketAddress needle) { 778 for (int i = 0; i < addressGroups.size(); i++) { 779 EquivalentAddressGroup group = addressGroups.get(i); 780 int j = group.getAddresses().indexOf(needle); 781 if (j == -1) { 782 continue; 783 } 784 this.groupIndex = i; 785 this.addressIndex = j; 786 return true; 787 } 788 return false; 789 } 790 } 791 } 792