• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static io.grpc.ConnectivityState.CONNECTING;
20 import static io.grpc.ConnectivityState.IDLE;
21 import static io.grpc.ConnectivityState.READY;
22 import static io.grpc.ConnectivityState.SHUTDOWN;
23 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.base.Supplier;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.SettableFuture;
32 import com.google.errorprone.annotations.ForOverride;
33 import io.grpc.Attributes;
34 import io.grpc.CallOptions;
35 import io.grpc.ConnectivityState;
36 import io.grpc.ConnectivityStateInfo;
37 import io.grpc.EquivalentAddressGroup;
38 import io.grpc.InternalChannelz;
39 import io.grpc.InternalChannelz.ChannelStats;
40 import io.grpc.InternalChannelz.ChannelTrace;
41 import io.grpc.InternalInstrumented;
42 import io.grpc.InternalLogId;
43 import io.grpc.InternalWithLogId;
44 import io.grpc.Metadata;
45 import io.grpc.MethodDescriptor;
46 import io.grpc.Status;
47 import java.net.SocketAddress;
48 import java.util.ArrayList;
49 import java.util.Collection;
50 import java.util.Collections;
51 import java.util.List;
52 import java.util.concurrent.ScheduledExecutorService;
53 import java.util.concurrent.ScheduledFuture;
54 import java.util.concurrent.TimeUnit;
55 import java.util.logging.Level;
56 import java.util.logging.Logger;
57 import javax.annotation.CheckForNull;
58 import javax.annotation.Nullable;
59 import javax.annotation.concurrent.GuardedBy;
60 import javax.annotation.concurrent.ThreadSafe;
61 
62 /**
63  * Transports for a single {@link SocketAddress}.
64  */
65 @ThreadSafe
66 final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
67   private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName());
68 
69   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
70   private final String authority;
71   private final String userAgent;
72   private final BackoffPolicy.Provider backoffPolicyProvider;
73   private final Callback callback;
74   private final ClientTransportFactory transportFactory;
75   private final ScheduledExecutorService scheduledExecutor;
76   private final InternalChannelz channelz;
77   private final CallTracer callsTracer;
78   @CheckForNull
79   private final ChannelTracer channelTracer;
80   private final TimeProvider timeProvider;
81 
82   // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock.
83   private final Object lock = new Object();
84 
85   // File-specific convention:
86   //
87   // 1. In a method without GuardedBy("lock"), executeLater() MUST be followed by a drain() later in
88   // the same method.
89   //
90   // 2. drain() MUST NOT be called under "lock".
91   //
92   // 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally".
93   private final ChannelExecutor channelExecutor;
94 
95   /**
96    * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
97    * both are null.
98    */
99   @GuardedBy("lock")
100   private Index addressIndex;
101 
102   /**
103    * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
104    * scheduled.
105    */
106   @GuardedBy("lock")
107   private BackoffPolicy reconnectPolicy;
108 
109   /**
110    * Timer monitoring duration since entering CONNECTING state.
111    */
112   @GuardedBy("lock")
113   private final Stopwatch connectingTimer;
114 
115   @GuardedBy("lock")
116   @Nullable
117   private ScheduledFuture<?> reconnectTask;
118 
119   @GuardedBy("lock")
120   private boolean reconnectCanceled;
121 
122   /**
123    * All transports that are not terminated. At the very least the value of {@link #activeTransport}
124    * will be present, but previously used transports that still have streams or are stopping may
125    * also be present.
126    */
127   @GuardedBy("lock")
128   private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
129 
130   // Must only be used from channelExecutor
131   private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
132       new InUseStateAggregator<ConnectionClientTransport>() {
133         @Override
134         void handleInUse() {
135           callback.onInUse(InternalSubchannel.this);
136         }
137 
138         @Override
139         void handleNotInUse() {
140           callback.onNotInUse(InternalSubchannel.this);
141         }
142       };
143 
144   /**
145    * The to-be active transport, which is not ready yet.
146    */
147   @GuardedBy("lock")
148   @Nullable
149   private ConnectionClientTransport pendingTransport;
150 
151   /**
152    * The transport for new outgoing requests. 'lock' must be held when assigning to it. Non-null
153    * only in READY state.
154    */
155   @Nullable
156   private volatile ManagedClientTransport activeTransport;
157 
158   @GuardedBy("lock")
159   private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
160 
161   @GuardedBy("lock")
162   private Status shutdownReason;
163 
InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, InternalChannelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, TimeProvider timeProvider)164   InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
165       BackoffPolicy.Provider backoffPolicyProvider,
166       ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
167       Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback,
168       InternalChannelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer,
169       TimeProvider timeProvider) {
170     Preconditions.checkNotNull(addressGroups, "addressGroups");
171     Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
172     checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
173     this.addressIndex = new Index(
174         Collections.unmodifiableList(new ArrayList<>(addressGroups)));
175     this.authority = authority;
176     this.userAgent = userAgent;
177     this.backoffPolicyProvider = backoffPolicyProvider;
178     this.transportFactory = transportFactory;
179     this.scheduledExecutor = scheduledExecutor;
180     this.connectingTimer = stopwatchSupplier.get();
181     this.channelExecutor = channelExecutor;
182     this.callback = callback;
183     this.channelz = channelz;
184     this.callsTracer = callsTracer;
185     this.channelTracer = channelTracer;
186     this.timeProvider = timeProvider;
187   }
188 
189   /**
190    * Returns a READY transport that will be used to create new streams.
191    *
192    * <p>Returns {@code null} if the state is not READY.  Will try to connect if state is IDLE.
193    */
194   @Nullable
obtainActiveTransport()195   ClientTransport obtainActiveTransport() {
196     ClientTransport savedTransport = activeTransport;
197     if (savedTransport != null) {
198       return savedTransport;
199     }
200     try {
201       synchronized (lock) {
202         savedTransport = activeTransport;
203         // Check again, since it could have changed before acquiring the lock
204         if (savedTransport != null) {
205           return savedTransport;
206         }
207         if (state.getState() == IDLE) {
208           gotoNonErrorState(CONNECTING);
209           startNewTransport();
210         }
211       }
212     } finally {
213       channelExecutor.drain();
214     }
215     return null;
216   }
217 
218   @GuardedBy("lock")
startNewTransport()219   private void startNewTransport() {
220     Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
221 
222     if (addressIndex.isAtBeginning()) {
223       connectingTimer.reset().start();
224     }
225     SocketAddress address = addressIndex.getCurrentAddress();
226 
227     ProxyParameters proxy = null;
228     if (address instanceof ProxySocketAddress) {
229       proxy = ((ProxySocketAddress) address).getProxyParameters();
230       address = ((ProxySocketAddress) address).getAddress();
231     }
232 
233     ClientTransportFactory.ClientTransportOptions options =
234         new ClientTransportFactory.ClientTransportOptions()
235           .setAuthority(authority)
236           .setEagAttributes(addressIndex.getCurrentEagAttributes())
237           .setUserAgent(userAgent)
238           .setProxyParameters(proxy);
239     ConnectionClientTransport transport =
240         new CallTracingTransport(
241             transportFactory.newClientTransport(address, options), callsTracer);
242     channelz.addClientSocket(transport);
243     if (log.isLoggable(Level.FINE)) {
244       log.log(Level.FINE, "[{0}] Created {1} for {2}",
245           new Object[] {logId, transport.getLogId(), address});
246     }
247     pendingTransport = transport;
248     transports.add(transport);
249     Runnable runnable = transport.start(new TransportListener(transport, address));
250     if (runnable != null) {
251       channelExecutor.executeLater(runnable);
252     }
253   }
254 
255   /**
256    * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
257    * @param status the causal status when the channel begins transition to
258    *     TRANSIENT_FAILURE.
259    */
260   @GuardedBy("lock")
scheduleBackoff(final Status status)261   private void scheduleBackoff(final Status status) {
262     class EndOfCurrentBackoff implements Runnable {
263       @Override
264       public void run() {
265         try {
266           synchronized (lock) {
267             reconnectTask = null;
268             if (reconnectCanceled) {
269               // Even though cancelReconnectTask() will cancel this task, the task may have already
270               // started when it's being canceled.
271               return;
272             }
273             gotoNonErrorState(CONNECTING);
274             startNewTransport();
275           }
276         } catch (Throwable t) {
277           log.log(Level.WARNING, "Exception handling end of backoff", t);
278         } finally {
279           channelExecutor.drain();
280         }
281       }
282     }
283 
284     gotoState(ConnectivityStateInfo.forTransientFailure(status));
285     if (reconnectPolicy == null) {
286       reconnectPolicy = backoffPolicyProvider.get();
287     }
288     long delayNanos =
289         reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
290     if (log.isLoggable(Level.FINE)) {
291       log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ns", new Object[]{logId, delayNanos});
292     }
293     Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
294     reconnectCanceled = false;
295     reconnectTask = scheduledExecutor.schedule(
296         new LogExceptionRunnable(new EndOfCurrentBackoff()),
297         delayNanos,
298         TimeUnit.NANOSECONDS);
299   }
300 
301   /**
302    * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this
303    * method has no effect.
304    */
resetConnectBackoff()305   void resetConnectBackoff() {
306     try {
307       synchronized (lock) {
308         if (state.getState() != TRANSIENT_FAILURE) {
309           return;
310         }
311         cancelReconnectTask();
312         gotoNonErrorState(CONNECTING);
313         startNewTransport();
314       }
315     } finally {
316       channelExecutor.drain();
317     }
318   }
319 
320   @GuardedBy("lock")
gotoNonErrorState(ConnectivityState newState)321   private void gotoNonErrorState(ConnectivityState newState) {
322     gotoState(ConnectivityStateInfo.forNonError(newState));
323   }
324 
325   @GuardedBy("lock")
gotoState(final ConnectivityStateInfo newState)326   private void gotoState(final ConnectivityStateInfo newState) {
327     if (state.getState() != newState.getState()) {
328       Preconditions.checkState(state.getState() != SHUTDOWN,
329           "Cannot transition out of SHUTDOWN to " + newState);
330       state = newState;
331       if (channelTracer != null) {
332         channelTracer.reportEvent(
333             new ChannelTrace.Event.Builder()
334                 .setDescription("Entering " + state + " state")
335                 .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
336                 .setTimestampNanos(timeProvider.currentTimeNanos())
337                 .build());
338       }
339       channelExecutor.executeLater(new Runnable() {
340           @Override
341           public void run() {
342             callback.onStateChange(InternalSubchannel.this, newState);
343           }
344         });
345     }
346   }
347 
348   /** Replaces the existing addresses, avoiding unnecessary reconnects. */
updateAddresses(List<EquivalentAddressGroup> newAddressGroups)349   public void updateAddresses(List<EquivalentAddressGroup> newAddressGroups) {
350     Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
351     checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
352     Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
353     newAddressGroups =
354         Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
355     ManagedClientTransport savedTransport = null;
356     try {
357       synchronized (lock) {
358         SocketAddress previousAddress = addressIndex.getCurrentAddress();
359         addressIndex.updateGroups(newAddressGroups);
360         if (state.getState() == READY || state.getState() == CONNECTING) {
361           if (!addressIndex.seekTo(previousAddress)) {
362             // Forced to drop the connection
363             if (state.getState() == READY) {
364               savedTransport = activeTransport;
365               activeTransport = null;
366               addressIndex.reset();
367               gotoNonErrorState(IDLE);
368             } else {
369               savedTransport = pendingTransport;
370               pendingTransport = null;
371               addressIndex.reset();
372               startNewTransport();
373             }
374           }
375         }
376       }
377     } finally {
378       channelExecutor.drain();
379     }
380     if (savedTransport != null) {
381       savedTransport.shutdown(
382           Status.UNAVAILABLE.withDescription(
383               "InternalSubchannel closed transport due to address change"));
384     }
385   }
386 
shutdown(Status reason)387   public void shutdown(Status reason) {
388     ManagedClientTransport savedActiveTransport;
389     ConnectionClientTransport savedPendingTransport;
390     try {
391       synchronized (lock) {
392         if (state.getState() == SHUTDOWN) {
393           return;
394         }
395         shutdownReason = reason;
396         gotoNonErrorState(SHUTDOWN);
397         savedActiveTransport = activeTransport;
398         savedPendingTransport = pendingTransport;
399         activeTransport = null;
400         pendingTransport = null;
401         addressIndex.reset();
402         if (transports.isEmpty()) {
403           handleTermination();
404           if (log.isLoggable(Level.FINE)) {
405             log.log(Level.FINE, "[{0}] Terminated in shutdown()", logId);
406           }
407         }  // else: the callback will be run once all transports have been terminated
408         cancelReconnectTask();
409       }
410     } finally {
411       channelExecutor.drain();
412     }
413     if (savedActiveTransport != null) {
414       savedActiveTransport.shutdown(reason);
415     }
416     if (savedPendingTransport != null) {
417       savedPendingTransport.shutdown(reason);
418     }
419   }
420 
421   @Override
toString()422   public String toString() {
423     // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
424     // since there may be many addresses.
425     Object addressGroupsCopy;
426     synchronized (lock) {
427       addressGroupsCopy = addressIndex.getGroups();
428     }
429     return MoreObjects.toStringHelper(this)
430           .add("logId", logId.getId())
431           .add("addressGroups", addressGroupsCopy)
432           .toString();
433   }
434 
435   @GuardedBy("lock")
handleTermination()436   private void handleTermination() {
437     channelExecutor.executeLater(new Runnable() {
438         @Override
439         public void run() {
440           callback.onTerminated(InternalSubchannel.this);
441         }
442       });
443   }
444 
handleTransportInUseState( final ConnectionClientTransport transport, final boolean inUse)445   private void handleTransportInUseState(
446       final ConnectionClientTransport transport, final boolean inUse) {
447     channelExecutor.executeLater(new Runnable() {
448         @Override
449         public void run() {
450           inUseStateAggregator.updateObjectInUse(transport, inUse);
451         }
452       }).drain();
453   }
454 
shutdownNow(Status reason)455   void shutdownNow(Status reason) {
456     shutdown(reason);
457     Collection<ManagedClientTransport> transportsCopy;
458     try {
459       synchronized (lock) {
460         transportsCopy = new ArrayList<ManagedClientTransport>(transports);
461       }
462     } finally {
463       channelExecutor.drain();
464     }
465     for (ManagedClientTransport transport : transportsCopy) {
466       transport.shutdownNow(reason);
467     }
468   }
469 
getAddressGroups()470   List<EquivalentAddressGroup> getAddressGroups() {
471     try {
472       synchronized (lock) {
473         return addressIndex.getGroups();
474       }
475     } finally {
476       channelExecutor.drain();
477     }
478   }
479 
480   @GuardedBy("lock")
cancelReconnectTask()481   private void cancelReconnectTask() {
482     if (reconnectTask != null) {
483       reconnectTask.cancel(false);
484       reconnectCanceled = true;
485       reconnectTask = null;
486       reconnectPolicy = null;
487     }
488   }
489 
490   @Override
getLogId()491   public InternalLogId getLogId() {
492     return logId;
493   }
494 
495 
496   @Override
getStats()497   public ListenableFuture<ChannelStats> getStats() {
498     SettableFuture<ChannelStats> ret = SettableFuture.create();
499     ChannelStats.Builder builder = new ChannelStats.Builder();
500 
501     List<EquivalentAddressGroup> addressGroupsSnapshot;
502     List<InternalWithLogId> transportsSnapshot;
503     synchronized (lock) {
504       addressGroupsSnapshot = addressIndex.getGroups();
505       transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
506     }
507 
508     builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
509     builder.setSockets(transportsSnapshot);
510     callsTracer.updateBuilder(builder);
511     if (channelTracer != null) {
512       channelTracer.updateBuilder(builder);
513     }
514     ret.set(builder.build());
515     return ret;
516   }
517 
518   @VisibleForTesting
getState()519   ConnectivityState getState() {
520     try {
521       synchronized (lock) {
522         return state.getState();
523       }
524     } finally {
525       channelExecutor.drain();
526     }
527   }
528 
checkListHasNoNulls(List<?> list, String msg)529   private static void checkListHasNoNulls(List<?> list, String msg) {
530     for (Object item : list) {
531       Preconditions.checkNotNull(item, msg);
532     }
533   }
534 
535   /** Listener for real transports. */
536   private class TransportListener implements ManagedClientTransport.Listener {
537     final ConnectionClientTransport transport;
538     final SocketAddress address;
539 
TransportListener(ConnectionClientTransport transport, SocketAddress address)540     TransportListener(ConnectionClientTransport transport, SocketAddress address) {
541       this.transport = transport;
542       this.address = address;
543     }
544 
545     @Override
transportReady()546     public void transportReady() {
547       if (log.isLoggable(Level.FINE)) {
548         log.log(Level.FINE, "[{0}] {1} for {2} is ready",
549             new Object[] {logId, transport.getLogId(), address});
550       }
551       Status savedShutdownReason;
552       try {
553         synchronized (lock) {
554           savedShutdownReason = shutdownReason;
555           reconnectPolicy = null;
556           if (savedShutdownReason != null) {
557             // activeTransport should have already been set to null by shutdown(). We keep it null.
558             Preconditions.checkState(activeTransport == null,
559                 "Unexpected non-null activeTransport");
560           } else if (pendingTransport == transport) {
561             gotoNonErrorState(READY);
562             activeTransport = transport;
563             pendingTransport = null;
564           }
565         }
566       } finally {
567         channelExecutor.drain();
568       }
569       if (savedShutdownReason != null) {
570         transport.shutdown(savedShutdownReason);
571       }
572     }
573 
574     @Override
transportInUse(boolean inUse)575     public void transportInUse(boolean inUse) {
576       handleTransportInUseState(transport, inUse);
577     }
578 
579     @Override
transportShutdown(Status s)580     public void transportShutdown(Status s) {
581       if (log.isLoggable(Level.FINE)) {
582         log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
583             new Object[] {logId, transport.getLogId(), address, s});
584       }
585       try {
586         synchronized (lock) {
587           if (state.getState() == SHUTDOWN) {
588             return;
589           }
590           if (activeTransport == transport) {
591             gotoNonErrorState(IDLE);
592             activeTransport = null;
593             addressIndex.reset();
594           } else if (pendingTransport == transport) {
595             Preconditions.checkState(state.getState() == CONNECTING,
596                 "Expected state is CONNECTING, actual state is %s", state.getState());
597             addressIndex.increment();
598             // Continue reconnect if there are still addresses to try.
599             if (!addressIndex.isValid()) {
600               pendingTransport = null;
601               addressIndex.reset();
602               // Initiate backoff
603               // Transition to TRANSIENT_FAILURE
604               scheduleBackoff(s);
605             } else {
606               startNewTransport();
607             }
608           }
609         }
610       } finally {
611         channelExecutor.drain();
612       }
613     }
614 
615     @Override
transportTerminated()616     public void transportTerminated() {
617       if (log.isLoggable(Level.FINE)) {
618         log.log(Level.FINE, "[{0}] {1} for {2} is terminated",
619             new Object[] {logId, transport.getLogId(), address});
620       }
621       channelz.removeClientSocket(transport);
622       handleTransportInUseState(transport, false);
623       try {
624         synchronized (lock) {
625           transports.remove(transport);
626           if (state.getState() == SHUTDOWN && transports.isEmpty()) {
627             if (log.isLoggable(Level.FINE)) {
628               log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", logId);
629             }
630             handleTermination();
631           }
632         }
633       } finally {
634         channelExecutor.drain();
635       }
636       Preconditions.checkState(activeTransport != transport,
637           "activeTransport still points to this transport. "
638           + "Seems transportShutdown() was not called.");
639     }
640   }
641 
642   // All methods are called in channelExecutor, which is a serializing executor.
643   abstract static class Callback {
644     /**
645      * Called when the subchannel is terminated, which means it's shut down and all transports
646      * have been terminated.
647      */
648     @ForOverride
onTerminated(InternalSubchannel is)649     void onTerminated(InternalSubchannel is) { }
650 
651     /**
652      * Called when the subchannel's connectivity state has changed.
653      */
654     @ForOverride
onStateChange(InternalSubchannel is, ConnectivityStateInfo newState)655     void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
656 
657     /**
658      * Called when the subchannel's in-use state has changed to true, which means at least one
659      * transport is in use.
660      */
661     @ForOverride
onInUse(InternalSubchannel is)662     void onInUse(InternalSubchannel is) { }
663 
664     /**
665      * Called when the subchannel's in-use state has changed to false, which means no transport is
666      * in use.
667      */
668     @ForOverride
onNotInUse(InternalSubchannel is)669     void onNotInUse(InternalSubchannel is) { }
670   }
671 
672   @VisibleForTesting
673   static final class CallTracingTransport extends ForwardingConnectionClientTransport {
674     private final ConnectionClientTransport delegate;
675     private final CallTracer callTracer;
676 
CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer)677     private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
678       this.delegate = delegate;
679       this.callTracer = callTracer;
680     }
681 
682     @Override
delegate()683     protected ConnectionClientTransport delegate() {
684       return delegate;
685     }
686 
687     @Override
newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions)688     public ClientStream newStream(
689         MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
690       final ClientStream streamDelegate = super.newStream(method, headers, callOptions);
691       return new ForwardingClientStream() {
692         @Override
693         protected ClientStream delegate() {
694           return streamDelegate;
695         }
696 
697         @Override
698         public void start(final ClientStreamListener listener) {
699           callTracer.reportCallStarted();
700           super.start(new ForwardingClientStreamListener() {
701             @Override
702             protected ClientStreamListener delegate() {
703               return listener;
704             }
705 
706             @Override
707             public void closed(Status status, Metadata trailers) {
708               callTracer.reportCallEnded(status.isOk());
709               super.closed(status, trailers);
710             }
711 
712             @Override
713             public void closed(
714                 Status status, RpcProgress rpcProgress, Metadata trailers) {
715               callTracer.reportCallEnded(status.isOk());
716               super.closed(status, rpcProgress, trailers);
717             }
718           });
719         }
720       };
721     }
722   }
723 
724   /** Index as in 'i', the pointer to an entry. Not a "search index." */
725   @VisibleForTesting
726   static final class Index {
727     private List<EquivalentAddressGroup> addressGroups;
728     private int groupIndex;
729     private int addressIndex;
730 
731     public Index(List<EquivalentAddressGroup> groups) {
732       this.addressGroups = groups;
733     }
734 
735     public boolean isValid() {
736       // addressIndex will never be invalid
737       return groupIndex < addressGroups.size();
738     }
739 
740     public boolean isAtBeginning() {
741       return groupIndex == 0 && addressIndex == 0;
742     }
743 
744     public void increment() {
745       EquivalentAddressGroup group = addressGroups.get(groupIndex);
746       addressIndex++;
747       if (addressIndex >= group.getAddresses().size()) {
748         groupIndex++;
749         addressIndex = 0;
750       }
751     }
752 
753     public void reset() {
754       groupIndex = 0;
755       addressIndex = 0;
756     }
757 
758     public SocketAddress getCurrentAddress() {
759       return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
760     }
761 
762     public Attributes getCurrentEagAttributes() {
763       return addressGroups.get(groupIndex).getAttributes();
764     }
765 
766     public List<EquivalentAddressGroup> getGroups() {
767       return addressGroups;
768     }
769 
770     /** Update to new groups, resetting the current index. */
771     public void updateGroups(List<EquivalentAddressGroup> newGroups) {
772       addressGroups = newGroups;
773       reset();
774     }
775 
776     /** Returns false if the needle was not found and the current index was left unchanged. */
777     public boolean seekTo(SocketAddress needle) {
778       for (int i = 0; i < addressGroups.size(); i++) {
779         EquivalentAddressGroup group = addressGroups.get(i);
780         int j = group.getAddresses().indexOf(needle);
781         if (j == -1) {
782           continue;
783         }
784         this.groupIndex = i;
785         this.addressIndex = j;
786         return true;
787       }
788       return false;
789     }
790   }
791 }
792