• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static io.grpc.ConnectivityState.CONNECTING;
20 import static io.grpc.ConnectivityState.IDLE;
21 import static io.grpc.ConnectivityState.READY;
22 import static io.grpc.ConnectivityState.SHUTDOWN;
23 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Stopwatch;
29 import com.google.common.base.Supplier;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.SettableFuture;
32 import com.google.errorprone.annotations.ForOverride;
33 import io.grpc.Attributes;
34 import io.grpc.CallOptions;
35 import io.grpc.ChannelLogger;
36 import io.grpc.ChannelLogger.ChannelLogLevel;
37 import io.grpc.ClientStreamTracer;
38 import io.grpc.ConnectivityState;
39 import io.grpc.ConnectivityStateInfo;
40 import io.grpc.EquivalentAddressGroup;
41 import io.grpc.HttpConnectProxiedSocketAddress;
42 import io.grpc.InternalChannelz;
43 import io.grpc.InternalChannelz.ChannelStats;
44 import io.grpc.InternalInstrumented;
45 import io.grpc.InternalLogId;
46 import io.grpc.InternalWithLogId;
47 import io.grpc.Metadata;
48 import io.grpc.MethodDescriptor;
49 import io.grpc.Status;
50 import io.grpc.SynchronizationContext;
51 import io.grpc.SynchronizationContext.ScheduledHandle;
52 import java.net.SocketAddress;
53 import java.util.ArrayList;
54 import java.util.Collection;
55 import java.util.Collections;
56 import java.util.List;
57 import java.util.concurrent.ScheduledExecutorService;
58 import java.util.concurrent.TimeUnit;
59 import javax.annotation.Nullable;
60 import javax.annotation.concurrent.ThreadSafe;
61 
62 /**
63  * Transports for a single {@link SocketAddress}.
64  */
65 @ThreadSafe
66 final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
67 
68   private final InternalLogId logId;
69   private final String authority;
70   private final String userAgent;
71   private final BackoffPolicy.Provider backoffPolicyProvider;
72   private final Callback callback;
73   private final ClientTransportFactory transportFactory;
74   private final ScheduledExecutorService scheduledExecutor;
75   private final InternalChannelz channelz;
76   private final CallTracer callsTracer;
77   private final ChannelTracer channelTracer;
78   private final ChannelLogger channelLogger;
79 
  /**
   * All fields must be mutated in the syncContext.
   */
83   private final SynchronizationContext syncContext;
84 
  /**
   * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
   * both are null.
   *
   * <p>Note: any {@link Index#updateGroups(List)} should also update {@link #addressGroups}.
   */
91   private final Index addressIndex;
92 
  /**
   * A volatile accessor to {@link Index#getGroups()}. A few methods ({@link #getAddressGroups()}
   * and {@link #toString()}) read this value without running in the {@link #syncContext}, which
   * reading the index itself would require. Ideally {@link Index#getGroups()} could be volatile,
   * so that this accessor would not need to be maintained. However, keeping a separate accessor
   * avoids unnecessary volatile reads and makes the reason for the volatile access clearer.
   */
100   private volatile List<EquivalentAddressGroup> addressGroups;
101 
102   /**
103    * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
104    * scheduled.
105    */
106   private BackoffPolicy reconnectPolicy;
107 
108   /**
109    * Timer monitoring duration since entering CONNECTING state.
110    */
111   private final Stopwatch connectingTimer;
112 
113   @Nullable
114   private ScheduledHandle reconnectTask;
115   @Nullable
116   private ScheduledHandle shutdownDueToUpdateTask;
117   @Nullable
118   private ManagedClientTransport shutdownDueToUpdateTransport;
119 
120   /**
121    * All transports that are not terminated. At the very least the value of {@link #activeTransport}
122    * will be present, but previously used transports that still have streams or are stopping may
123    * also be present.
124    */
125   private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
126 
127   // Must only be used from syncContext
128   private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
129       new InUseStateAggregator<ConnectionClientTransport>() {
130         @Override
131         protected void handleInUse() {
132           callback.onInUse(InternalSubchannel.this);
133         }
134 
135         @Override
136         protected void handleNotInUse() {
137           callback.onNotInUse(InternalSubchannel.this);
138         }
139       };
140 
141   /**
142    * The to-be active transport, which is not ready yet.
143    */
144   @Nullable
145   private ConnectionClientTransport pendingTransport;
146 
147   /**
148    * The transport for new outgoing requests. Non-null only in READY state.
149    */
150   @Nullable
151   private volatile ManagedClientTransport activeTransport;
152 
153   private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
154 
155   private Status shutdownReason;
156 
InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback, InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, InternalLogId logId, ChannelLogger channelLogger)157   InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
158       BackoffPolicy.Provider backoffPolicyProvider,
159       ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
160       Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
161       InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
162       InternalLogId logId, ChannelLogger channelLogger) {
163     Preconditions.checkNotNull(addressGroups, "addressGroups");
164     Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
165     checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
166     List<EquivalentAddressGroup> unmodifiableAddressGroups =
167         Collections.unmodifiableList(new ArrayList<>(addressGroups));
168     this.addressGroups = unmodifiableAddressGroups;
169     this.addressIndex = new Index(unmodifiableAddressGroups);
170     this.authority = authority;
171     this.userAgent = userAgent;
172     this.backoffPolicyProvider = backoffPolicyProvider;
173     this.transportFactory = transportFactory;
174     this.scheduledExecutor = scheduledExecutor;
175     this.connectingTimer = stopwatchSupplier.get();
176     this.syncContext = syncContext;
177     this.callback = callback;
178     this.channelz = channelz;
179     this.callsTracer = callsTracer;
180     this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
181     this.logId = Preconditions.checkNotNull(logId, "logId");
182     this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
183   }
184 
getChannelLogger()185   ChannelLogger getChannelLogger() {
186     return channelLogger;
187   }
188 
189   @Override
obtainActiveTransport()190   public ClientTransport obtainActiveTransport() {
191     ClientTransport savedTransport = activeTransport;
192     if (savedTransport != null) {
193       return savedTransport;
194     }
195     syncContext.execute(new Runnable() {
196       @Override
197       public void run() {
198         if (state.getState() == IDLE) {
199           channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested");
200           gotoNonErrorState(CONNECTING);
201           startNewTransport();
202         }
203       }
204     });
205     return null;
206   }
207 
208   /**
209    * Returns a READY transport if there is any, without trying to connect.
210    */
211   @Nullable
getTransport()212   ClientTransport getTransport() {
213     return activeTransport;
214   }
215 
216   /**
217    * Returns the authority string associated with this Subchannel.
218    */
getAuthority()219   String getAuthority() {
220     return authority;
221   }
222 
startNewTransport()223   private void startNewTransport() {
224     syncContext.throwIfNotInThisSynchronizationContext();
225 
226     Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
227 
228     if (addressIndex.isAtBeginning()) {
229       connectingTimer.reset().start();
230     }
231     SocketAddress address = addressIndex.getCurrentAddress();
232 
233     HttpConnectProxiedSocketAddress proxiedAddr = null;
234     if (address instanceof HttpConnectProxiedSocketAddress) {
235       proxiedAddr = (HttpConnectProxiedSocketAddress) address;
236       address = proxiedAddr.getTargetAddress();
237     }
238 
239     Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
240     String eagChannelAuthority = currentEagAttributes
241             .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
242     ClientTransportFactory.ClientTransportOptions options =
243         new ClientTransportFactory.ClientTransportOptions()
244           .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
245           .setEagAttributes(currentEagAttributes)
246           .setUserAgent(userAgent)
247           .setHttpConnectProxiedSocketAddress(proxiedAddr);
248     TransportLogger transportLogger = new TransportLogger();
249     // In case the transport logs in the constructor, use the subchannel logId
250     transportLogger.logId = getLogId();
251     ConnectionClientTransport transport =
252         new CallTracingTransport(
253             transportFactory
254                 .newClientTransport(address, options, transportLogger), callsTracer);
255     transportLogger.logId = transport.getLogId();
256     channelz.addClientSocket(transport);
257     pendingTransport = transport;
258     transports.add(transport);
259     Runnable runnable = transport.start(new TransportListener(transport));
260     if (runnable != null) {
261       syncContext.executeLater(runnable);
262     }
263     channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
264   }
265 
266   /**
267    * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
268    * @param status the causal status when the channel begins transition to
269    *     TRANSIENT_FAILURE.
270    */
scheduleBackoff(final Status status)271   private void scheduleBackoff(final Status status) {
272     syncContext.throwIfNotInThisSynchronizationContext();
273 
274     class EndOfCurrentBackoff implements Runnable {
275       @Override
276       public void run() {
277         reconnectTask = null;
278         channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff");
279         gotoNonErrorState(CONNECTING);
280         startNewTransport();
281       }
282     }
283 
284     gotoState(ConnectivityStateInfo.forTransientFailure(status));
285     if (reconnectPolicy == null) {
286       reconnectPolicy = backoffPolicyProvider.get();
287     }
288     long delayNanos =
289         reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
290     channelLogger.log(
291         ChannelLogLevel.INFO,
292         "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns",
293         printShortStatus(status), delayNanos);
294     Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
295     reconnectTask = syncContext.schedule(
296         new EndOfCurrentBackoff(),
297         delayNanos,
298         TimeUnit.NANOSECONDS,
299         scheduledExecutor);
300   }
301 
302   /**
303    * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this
304    * method has no effect.
305    */
resetConnectBackoff()306   void resetConnectBackoff() {
307     syncContext.execute(new Runnable() {
308       @Override
309       public void run() {
310         if (state.getState() != TRANSIENT_FAILURE) {
311           return;
312         }
313         cancelReconnectTask();
314         channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted");
315         gotoNonErrorState(CONNECTING);
316         startNewTransport();
317       }
318     });
319   }
320 
gotoNonErrorState(final ConnectivityState newState)321   private void gotoNonErrorState(final ConnectivityState newState) {
322     syncContext.throwIfNotInThisSynchronizationContext();
323 
324     gotoState(ConnectivityStateInfo.forNonError(newState));
325   }
326 
gotoState(final ConnectivityStateInfo newState)327   private void gotoState(final ConnectivityStateInfo newState) {
328     syncContext.throwIfNotInThisSynchronizationContext();
329 
330     if (state.getState() != newState.getState()) {
331       Preconditions.checkState(state.getState() != SHUTDOWN,
332           "Cannot transition out of SHUTDOWN to " + newState);
333       state = newState;
334       callback.onStateChange(InternalSubchannel.this, newState);
335     }
336   }
337 
338   /** Replaces the existing addresses, avoiding unnecessary reconnects. */
updateAddresses(final List<EquivalentAddressGroup> newAddressGroups)339   public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) {
340     Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
341     checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
342     Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
343     final List<EquivalentAddressGroup> newImmutableAddressGroups =
344         Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
345 
346     syncContext.execute(new Runnable() {
347       @Override
348       public void run() {
349         ManagedClientTransport savedTransport = null;
350         SocketAddress previousAddress = addressIndex.getCurrentAddress();
351         addressIndex.updateGroups(newImmutableAddressGroups);
352         addressGroups = newImmutableAddressGroups;
353         if (state.getState() == READY || state.getState() == CONNECTING) {
354           if (!addressIndex.seekTo(previousAddress)) {
355             // Forced to drop the connection
356             if (state.getState() == READY) {
357               savedTransport = activeTransport;
358               activeTransport = null;
359               addressIndex.reset();
360               gotoNonErrorState(IDLE);
361             } else {
362               pendingTransport.shutdown(
363                   Status.UNAVAILABLE.withDescription(
364                     "InternalSubchannel closed pending transport due to address change"));
365               pendingTransport = null;
366               addressIndex.reset();
367               startNewTransport();
368             }
369           }
370         }
371         if (savedTransport != null) {
372           if (shutdownDueToUpdateTask != null) {
373             // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be
374             // necessary. This transport has probably already had plenty of time.
375             shutdownDueToUpdateTransport.shutdown(
376                 Status.UNAVAILABLE.withDescription(
377                     "InternalSubchannel closed transport early due to address change"));
378             shutdownDueToUpdateTask.cancel();
379             shutdownDueToUpdateTask = null;
380             shutdownDueToUpdateTransport = null;
381           }
382           // Avoid needless RPC failures by delaying the shutdown. See
383           // https://github.com/grpc/grpc-java/issues/2562
384           shutdownDueToUpdateTransport = savedTransport;
385           shutdownDueToUpdateTask = syncContext.schedule(
386               new Runnable() {
387                 @Override public void run() {
388                   ManagedClientTransport transport = shutdownDueToUpdateTransport;
389                   shutdownDueToUpdateTask = null;
390                   shutdownDueToUpdateTransport = null;
391                   transport.shutdown(
392                       Status.UNAVAILABLE.withDescription(
393                           "InternalSubchannel closed transport due to address change"));
394                 }
395               },
396               ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS,
397               TimeUnit.SECONDS,
398               scheduledExecutor);
399         }
400       }
401     });
402   }
403 
shutdown(final Status reason)404   public void shutdown(final Status reason) {
405     syncContext.execute(new Runnable() {
406       @Override
407       public void run() {
408         ManagedClientTransport savedActiveTransport;
409         ConnectionClientTransport savedPendingTransport;
410         if (state.getState() == SHUTDOWN) {
411           return;
412         }
413         shutdownReason = reason;
414         savedActiveTransport = activeTransport;
415         savedPendingTransport = pendingTransport;
416         activeTransport = null;
417         pendingTransport = null;
418         gotoNonErrorState(SHUTDOWN);
419         addressIndex.reset();
420         if (transports.isEmpty()) {
421           handleTermination();
422         }  // else: the callback will be run once all transports have been terminated
423         cancelReconnectTask();
424         if (shutdownDueToUpdateTask != null) {
425           shutdownDueToUpdateTask.cancel();
426           shutdownDueToUpdateTransport.shutdown(reason);
427           shutdownDueToUpdateTask = null;
428           shutdownDueToUpdateTransport = null;
429         }
430         if (savedActiveTransport != null) {
431           savedActiveTransport.shutdown(reason);
432         }
433         if (savedPendingTransport != null) {
434           savedPendingTransport.shutdown(reason);
435         }
436       }
437     });
438   }
439 
440   @Override
toString()441   public String toString() {
442     // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
443     // since there may be many addresses.
444     return MoreObjects.toStringHelper(this)
445         .add("logId", logId.getId())
446         .add("addressGroups", addressGroups)
447         .toString();
448   }
449 
handleTermination()450   private void handleTermination() {
451     syncContext.execute(new Runnable() {
452       @Override
453       public void run() {
454         channelLogger.log(ChannelLogLevel.INFO, "Terminated");
455         callback.onTerminated(InternalSubchannel.this);
456       }
457     });
458   }
459 
handleTransportInUseState( final ConnectionClientTransport transport, final boolean inUse)460   private void handleTransportInUseState(
461       final ConnectionClientTransport transport, final boolean inUse) {
462     syncContext.execute(new Runnable() {
463       @Override
464       public void run() {
465         inUseStateAggregator.updateObjectInUse(transport, inUse);
466       }
467     });
468   }
469 
shutdownNow(final Status reason)470   void shutdownNow(final Status reason) {
471     shutdown(reason);
472     syncContext.execute(new Runnable() {
473       @Override
474       public void run() {
475         Collection<ManagedClientTransport> transportsCopy =
476             new ArrayList<ManagedClientTransport>(transports);
477 
478         for (ManagedClientTransport transport : transportsCopy) {
479           transport.shutdownNow(reason);
480         }
481       }
482     });
483   }
484 
getAddressGroups()485   List<EquivalentAddressGroup> getAddressGroups() {
486     return addressGroups;
487   }
488 
cancelReconnectTask()489   private void cancelReconnectTask() {
490     syncContext.throwIfNotInThisSynchronizationContext();
491 
492     if (reconnectTask != null) {
493       reconnectTask.cancel();
494       reconnectTask = null;
495       reconnectPolicy = null;
496     }
497   }
498 
499   @Override
getLogId()500   public InternalLogId getLogId() {
501     return logId;
502   }
503 
504   @Override
getStats()505   public ListenableFuture<ChannelStats> getStats() {
506     final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create();
507     syncContext.execute(new Runnable() {
508       @Override
509       public void run() {
510         ChannelStats.Builder builder = new ChannelStats.Builder();
511         List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups();
512         List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
513         builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
514         builder.setSockets(transportsSnapshot);
515         callsTracer.updateBuilder(builder);
516         channelTracer.updateBuilder(builder);
517         channelStatsFuture.set(builder.build());
518       }
519     });
520     return channelStatsFuture;
521   }
522 
getState()523   ConnectivityState getState() {
524     return state.getState();
525   }
526 
checkListHasNoNulls(List<?> list, String msg)527   private static void checkListHasNoNulls(List<?> list, String msg) {
528     for (Object item : list) {
529       Preconditions.checkNotNull(item, msg);
530     }
531   }
532 
533   /** Listener for real transports. */
534   private class TransportListener implements ManagedClientTransport.Listener {
535     final ConnectionClientTransport transport;
536     boolean shutdownInitiated = false;
537 
TransportListener(ConnectionClientTransport transport)538     TransportListener(ConnectionClientTransport transport) {
539       this.transport = transport;
540     }
541 
542     @Override
transportReady()543     public void transportReady() {
544       channelLogger.log(ChannelLogLevel.INFO, "READY");
545       syncContext.execute(new Runnable() {
546         @Override
547         public void run() {
548           reconnectPolicy = null;
549           if (shutdownReason != null) {
550             // activeTransport should have already been set to null by shutdown(). We keep it null.
551             Preconditions.checkState(activeTransport == null,
552                 "Unexpected non-null activeTransport");
553             transport.shutdown(shutdownReason);
554           } else if (pendingTransport == transport) {
555             activeTransport = transport;
556             pendingTransport = null;
557             gotoNonErrorState(READY);
558           }
559         }
560       });
561     }
562 
563     @Override
transportInUse(boolean inUse)564     public void transportInUse(boolean inUse) {
565       handleTransportInUseState(transport, inUse);
566     }
567 
568     @Override
transportShutdown(final Status s)569     public void transportShutdown(final Status s) {
570       channelLogger.log(
571           ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
572       shutdownInitiated = true;
573       syncContext.execute(new Runnable() {
574         @Override
575         public void run() {
576           if (state.getState() == SHUTDOWN) {
577             return;
578           }
579           if (activeTransport == transport) {
580             activeTransport = null;
581             addressIndex.reset();
582             gotoNonErrorState(IDLE);
583           } else if (pendingTransport == transport) {
584             Preconditions.checkState(state.getState() == CONNECTING,
585                 "Expected state is CONNECTING, actual state is %s", state.getState());
586             addressIndex.increment();
587             // Continue reconnect if there are still addresses to try.
588             if (!addressIndex.isValid()) {
589               pendingTransport = null;
590               addressIndex.reset();
591               // Initiate backoff
592               // Transition to TRANSIENT_FAILURE
593               scheduleBackoff(s);
594             } else {
595               startNewTransport();
596             }
597           }
598         }
599       });
600     }
601 
602     @Override
transportTerminated()603     public void transportTerminated() {
604       Preconditions.checkState(
605           shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
606 
607       channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
608       channelz.removeClientSocket(transport);
609       handleTransportInUseState(transport, false);
610       syncContext.execute(new Runnable() {
611         @Override
612         public void run() {
613           transports.remove(transport);
614           if (state.getState() == SHUTDOWN && transports.isEmpty()) {
615             handleTermination();
616           }
617         }
618       });
619     }
620   }
621 
622   // All methods are called in syncContext
623   abstract static class Callback {
624     /**
625      * Called when the subchannel is terminated, which means it's shut down and all transports
626      * have been terminated.
627      */
628     @ForOverride
onTerminated(InternalSubchannel is)629     void onTerminated(InternalSubchannel is) { }
630 
631     /**
632      * Called when the subchannel's connectivity state has changed.
633      */
634     @ForOverride
onStateChange(InternalSubchannel is, ConnectivityStateInfo newState)635     void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
636 
637     /**
638      * Called when the subchannel's in-use state has changed to true, which means at least one
639      * transport is in use.
640      */
641     @ForOverride
onInUse(InternalSubchannel is)642     void onInUse(InternalSubchannel is) { }
643 
644     /**
645      * Called when the subchannel's in-use state has changed to false, which means no transport is
646      * in use.
647      */
648     @ForOverride
onNotInUse(InternalSubchannel is)649     void onNotInUse(InternalSubchannel is) { }
650   }
651 
652   @VisibleForTesting
653   static final class CallTracingTransport extends ForwardingConnectionClientTransport {
654     private final ConnectionClientTransport delegate;
655     private final CallTracer callTracer;
656 
CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer)657     private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
658       this.delegate = delegate;
659       this.callTracer = callTracer;
660     }
661 
662     @Override
delegate()663     protected ConnectionClientTransport delegate() {
664       return delegate;
665     }
666 
667     @Override
newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, ClientStreamTracer[] tracers)668     public ClientStream newStream(
669         MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
670         ClientStreamTracer[] tracers) {
671       final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers);
672       return new ForwardingClientStream() {
673         @Override
674         protected ClientStream delegate() {
675           return streamDelegate;
676         }
677 
678         @Override
679         public void start(final ClientStreamListener listener) {
680           callTracer.reportCallStarted();
681           super.start(new ForwardingClientStreamListener() {
682             @Override
683             protected ClientStreamListener delegate() {
684               return listener;
685             }
686 
687             @Override
688             public void closed(
689                 Status status, RpcProgress rpcProgress, Metadata trailers) {
690               callTracer.reportCallEnded(status.isOk());
691               super.closed(status, rpcProgress, trailers);
692             }
693           });
694         }
695       };
696     }
697   }
698 
699   /** Index as in 'i', the pointer to an entry. Not a "search index." */
700   @VisibleForTesting
701   static final class Index {
702     private List<EquivalentAddressGroup> addressGroups;
703     private int groupIndex;
704     private int addressIndex;
705 
706     public Index(List<EquivalentAddressGroup> groups) {
707       this.addressGroups = groups;
708     }
709 
710     public boolean isValid() {
711       // addressIndex will never be invalid
712       return groupIndex < addressGroups.size();
713     }
714 
715     public boolean isAtBeginning() {
716       return groupIndex == 0 && addressIndex == 0;
717     }
718 
719     public void increment() {
720       EquivalentAddressGroup group = addressGroups.get(groupIndex);
721       addressIndex++;
722       if (addressIndex >= group.getAddresses().size()) {
723         groupIndex++;
724         addressIndex = 0;
725       }
726     }
727 
728     public void reset() {
729       groupIndex = 0;
730       addressIndex = 0;
731     }
732 
733     public SocketAddress getCurrentAddress() {
734       return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
735     }
736 
737     public Attributes getCurrentEagAttributes() {
738       return addressGroups.get(groupIndex).getAttributes();
739     }
740 
741     public List<EquivalentAddressGroup> getGroups() {
742       return addressGroups;
743     }
744 
745     /** Update to new groups, resetting the current index. */
746     public void updateGroups(List<EquivalentAddressGroup> newGroups) {
747       addressGroups = newGroups;
748       reset();
749     }
750 
751     /** Returns false if the needle was not found and the current index was left unchanged. */
752     public boolean seekTo(SocketAddress needle) {
753       for (int i = 0; i < addressGroups.size(); i++) {
754         EquivalentAddressGroup group = addressGroups.get(i);
755         int j = group.getAddresses().indexOf(needle);
756         if (j == -1) {
757           continue;
758         }
759         this.groupIndex = i;
760         this.addressIndex = j;
761         return true;
762       }
763       return false;
764     }
765   }
766 
767   private String printShortStatus(Status status) {
768     StringBuilder buffer = new StringBuilder();
769     buffer.append(status.getCode());
770     if (status.getDescription() != null) {
771       buffer.append("(").append(status.getDescription()).append(")");
772     }
773     if (status.getCause() != null) {
774       buffer.append("[").append(status.getCause()).append("]");
775     }
776     return buffer.toString();
777   }
778 
779   @VisibleForTesting
780   static final class TransportLogger extends ChannelLogger {
781     // Changed just after construction to break a cyclic dependency.
782     InternalLogId logId;
783 
784     @Override
785     public void log(ChannelLogLevel level, String message) {
786       ChannelLoggerImpl.logOnly(logId, level, message);
787     }
788 
789     @Override
790     public void log(ChannelLogLevel level, String messageFormat, Object... args) {
791       ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
792     }
793   }
794 }
795