• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.ConnectivityState.CONNECTING;
21 import static io.grpc.ConnectivityState.IDLE;
22 import static io.grpc.ConnectivityState.READY;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.Matchers.any;
32 import static org.mockito.Matchers.same;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.verifyNoMoreInteractions;
38 import static org.mockito.Mockito.when;
39 
40 import com.google.common.collect.Iterables;
41 import io.grpc.Attributes;
42 import io.grpc.ConnectivityStateInfo;
43 import io.grpc.EquivalentAddressGroup;
44 import io.grpc.InternalChannelz;
45 import io.grpc.InternalWithLogId;
46 import io.grpc.Status;
47 import io.grpc.internal.InternalSubchannel.CallTracingTransport;
48 import io.grpc.internal.InternalSubchannel.Index;
49 import io.grpc.internal.TestUtils.MockClientTransportInfo;
50 import java.net.SocketAddress;
51 import java.util.Arrays;
52 import java.util.LinkedList;
53 import java.util.List;
54 import java.util.concurrent.BlockingQueue;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import org.junit.After;
57 import org.junit.Before;
58 import org.junit.Rule;
59 import org.junit.Test;
60 import org.junit.rules.ExpectedException;
61 import org.junit.runner.RunWith;
62 import org.junit.runners.JUnit4;
63 import org.mockito.Mock;
64 import org.mockito.MockitoAnnotations;
65 
66 /**
67  * Unit tests for {@link InternalSubchannel}.
68  */
69 @RunWith(JUnit4.class)
70 public class InternalSubchannelTest {
71 
72   @Rule
73   public final ExpectedException thrown = ExpectedException.none();
74 
75   private static final String AUTHORITY = "fakeauthority";
76   private static final String USER_AGENT = "mosaic";
77   private static final ConnectivityStateInfo UNAVAILABLE_STATE =
78       ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
79   private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE =
80       ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED);
81   private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test");
82 
83   // For scheduled executor
84   private final FakeClock fakeClock = new FakeClock();
85   // For channelExecutor
86   private final FakeClock fakeExecutor = new FakeClock();
87   private final ChannelExecutor channelExecutor = new ChannelExecutor();
88 
89   private final InternalChannelz channelz = new InternalChannelz();
90 
91   @Mock private BackoffPolicy mockBackoffPolicy1;
92   @Mock private BackoffPolicy mockBackoffPolicy2;
93   @Mock private BackoffPolicy mockBackoffPolicy3;
94   @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
95   @Mock private ClientTransportFactory mockTransportFactory;
96 
97   private final LinkedList<String> callbackInvokes = new LinkedList<String>();
98   private final InternalSubchannel.Callback mockInternalSubchannelCallback =
99       new InternalSubchannel.Callback() {
100         @Override
101         protected void onTerminated(InternalSubchannel is) {
102           assertSame(internalSubchannel, is);
103           callbackInvokes.add("onTerminated");
104         }
105 
106         @Override
107         protected void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
108           assertSame(internalSubchannel, is);
109           callbackInvokes.add("onStateChange:" + newState);
110         }
111 
112         @Override
113         protected void onInUse(InternalSubchannel is) {
114           assertSame(internalSubchannel, is);
115           callbackInvokes.add("onInUse");
116         }
117 
118         @Override
119         protected void onNotInUse(InternalSubchannel is) {
120           assertSame(internalSubchannel, is);
121           callbackInvokes.add("onNotInUse");
122         }
123       };
124 
125   private InternalSubchannel internalSubchannel;
126   private BlockingQueue<MockClientTransportInfo> transports;
127 
setUp()128   @Before public void setUp() {
129     MockitoAnnotations.initMocks(this);
130 
131     when(mockBackoffPolicyProvider.get())
132         .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3);
133     when(mockBackoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
134     when(mockBackoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
135     when(mockBackoffPolicy3.nextBackoffNanos()).thenReturn(10L, 100L);
136     transports = TestUtils.captureTransports(mockTransportFactory);
137   }
138 
noMorePendingTasks()139   @After public void noMorePendingTasks() {
140     assertEquals(0, fakeClock.numPendingTasks());
141     assertEquals(0, fakeExecutor.numPendingTasks());
142   }
143 
144   @Test(expected = IllegalArgumentException.class)
constructor_emptyEagList_throws()145   public void constructor_emptyEagList_throws() {
146     createInternalSubchannel(new EquivalentAddressGroup[0]);
147   }
148 
149   @Test(expected = NullPointerException.class)
constructor_eagListWithNull_throws()150   public void constructor_eagListWithNull_throws() {
151     createInternalSubchannel(new EquivalentAddressGroup[] {null});
152   }
153 
eagAttribute_propagatesToTransport()154   @Test public void eagAttribute_propagatesToTransport() {
155     SocketAddress addr = new SocketAddress() {};
156     Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build();
157     createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr));
158 
159     // First attempt
160     assertNull(internalSubchannel.obtainActiveTransport());
161     assertEquals(CONNECTING, internalSubchannel.getState());
162     verify(mockTransportFactory)
163         .newClientTransport(addr, createClientTransportOptions().setEagAttributes(attr));
164   }
165 
singleAddressReconnect()166   @Test public void singleAddressReconnect() {
167     SocketAddress addr = mock(SocketAddress.class);
168     createInternalSubchannel(addr);
169     assertEquals(IDLE, internalSubchannel.getState());
170 
171     // Invocation counters
172     int transportsCreated = 0;
173     int backoff1Consulted = 0;
174     int backoff2Consulted = 0;
175     int backoffReset = 0;
176 
177     // First attempt
178     assertEquals(IDLE, internalSubchannel.getState());
179     assertNoCallbackInvoke();
180     assertNull(internalSubchannel.obtainActiveTransport());
181     assertExactCallbackInvokes("onStateChange:CONNECTING");
182     assertEquals(CONNECTING, internalSubchannel.getState());
183     verify(mockTransportFactory, times(++transportsCreated))
184         .newClientTransport(addr, createClientTransportOptions());
185 
186     // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE.
187     assertNoCallbackInvoke();
188     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
189     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
190     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
191     // Backoff reset and using first back-off value interval
192     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
193     verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
194 
195     // Second attempt
196     // Transport creation doesn't happen until time is due
197     fakeClock.forwardNanos(9);
198     assertNull(internalSubchannel.obtainActiveTransport());
199     verify(mockTransportFactory, times(transportsCreated))
200         .newClientTransport(addr, createClientTransportOptions());
201     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
202 
203     assertNoCallbackInvoke();
204     fakeClock.forwardNanos(1);
205     assertExactCallbackInvokes("onStateChange:CONNECTING");
206     assertEquals(CONNECTING, internalSubchannel.getState());
207     verify(mockTransportFactory, times(++transportsCreated))
208         .newClientTransport(addr, createClientTransportOptions());
209     // Fail this one too
210     assertNoCallbackInvoke();
211     // Here we use a different status from the first failure, and verify that it's passed to
212     // the callback.
213     transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
214     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
215     assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE);
216     // Second back-off interval
217     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
218     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
219 
220     // Third attempt
221     // Transport creation doesn't happen until time is due
222     fakeClock.forwardNanos(99);
223     assertNull(internalSubchannel.obtainActiveTransport());
224     verify(mockTransportFactory, times(transportsCreated))
225         .newClientTransport(addr, createClientTransportOptions());
226     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
227     assertNoCallbackInvoke();
228     fakeClock.forwardNanos(1);
229     assertEquals(CONNECTING, internalSubchannel.getState());
230     assertExactCallbackInvokes("onStateChange:CONNECTING");
231     assertNull(internalSubchannel.obtainActiveTransport());
232     verify(mockTransportFactory, times(++transportsCreated))
233         .newClientTransport(addr, createClientTransportOptions());
234     // Let this one succeed, will enter READY state.
235     assertNoCallbackInvoke();
236     transports.peek().listener.transportReady();
237     assertExactCallbackInvokes("onStateChange:READY");
238     assertEquals(READY, internalSubchannel.getState());
239     assertSame(
240         transports.peek().transport,
241         ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate());
242 
243     // Close the READY transport, will enter IDLE state.
244     assertNoCallbackInvoke();
245     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
246     assertEquals(IDLE, internalSubchannel.getState());
247     assertExactCallbackInvokes("onStateChange:IDLE");
248 
249     // Back-off is reset, and the next attempt will happen immediately
250     assertNull(internalSubchannel.obtainActiveTransport());
251     assertEquals(CONNECTING, internalSubchannel.getState());
252     assertExactCallbackInvokes("onStateChange:CONNECTING");
253     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
254     verify(mockTransportFactory, times(++transportsCreated))
255         .newClientTransport(addr, createClientTransportOptions());
256 
257     // Final checks for consultations on back-off policies
258     verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos();
259     verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
260   }
261 
twoAddressesReconnect()262   @Test public void twoAddressesReconnect() {
263     SocketAddress addr1 = mock(SocketAddress.class);
264     SocketAddress addr2 = mock(SocketAddress.class);
265     createInternalSubchannel(addr1, addr2);
266     assertEquals(IDLE, internalSubchannel.getState());
267     // Invocation counters
268     int transportsAddr1 = 0;
269     int transportsAddr2 = 0;
270     int backoff1Consulted = 0;
271     int backoff2Consulted = 0;
272     int backoff3Consulted = 0;
273     int backoffReset = 0;
274 
275     // First attempt
276     assertNoCallbackInvoke();
277     assertNull(internalSubchannel.obtainActiveTransport());
278     assertExactCallbackInvokes("onStateChange:CONNECTING");
279     assertEquals(CONNECTING, internalSubchannel.getState());
280     verify(mockTransportFactory, times(++transportsAddr1))
281         .newClientTransport(addr1, createClientTransportOptions());
282 
283     // Let this one fail without success
284     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
285     // Still in CONNECTING
286     assertNull(internalSubchannel.obtainActiveTransport());
287     assertNoCallbackInvoke();
288     assertEquals(CONNECTING, internalSubchannel.getState());
289 
290     // Second attempt will start immediately. Still no back-off policy.
291     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
292     verify(mockTransportFactory, times(++transportsAddr2))
293         .newClientTransport(addr2, createClientTransportOptions());
294     assertNull(internalSubchannel.obtainActiveTransport());
295     // Fail this one too
296     assertNoCallbackInvoke();
297     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
298     // All addresses have failed. Delayed transport will be in back-off interval.
299     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
300     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
301     // Backoff reset and first back-off interval begins
302     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
303     verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
304 
305     // No reconnect during TRANSIENT_FAILURE even when requested.
306     assertNull(internalSubchannel.obtainActiveTransport());
307     assertNoCallbackInvoke();
308     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
309 
310     // Third attempt is the first address, thus controlled by the first back-off interval.
311     fakeClock.forwardNanos(9);
312     verify(mockTransportFactory, times(transportsAddr1))
313         .newClientTransport(addr1, createClientTransportOptions());
314     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
315     assertNoCallbackInvoke();
316     fakeClock.forwardNanos(1);
317     assertExactCallbackInvokes("onStateChange:CONNECTING");
318     assertEquals(CONNECTING, internalSubchannel.getState());
319     verify(mockTransportFactory, times(++transportsAddr1))
320         .newClientTransport(addr1, createClientTransportOptions());
321     // Fail this one too
322     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
323     assertEquals(CONNECTING, internalSubchannel.getState());
324 
325     // Forth attempt will start immediately. Keep back-off policy.
326     assertNull(internalSubchannel.obtainActiveTransport());
327     assertEquals(CONNECTING, internalSubchannel.getState());
328     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
329     verify(mockTransportFactory, times(++transportsAddr2))
330         .newClientTransport(addr2, createClientTransportOptions());
331     // Fail this one too
332     assertNoCallbackInvoke();
333     transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
334     // All addresses have failed again. Delayed transport will be in back-off interval.
335     assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE);
336     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
337     // Second back-off interval begins
338     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
339     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
340 
341     // Fifth attempt for the first address, thus controlled by the second back-off interval.
342     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
343     fakeClock.forwardNanos(99);
344     verify(mockTransportFactory, times(transportsAddr1))
345         .newClientTransport(addr1, createClientTransportOptions());
346     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
347     assertNoCallbackInvoke();
348     fakeClock.forwardNanos(1);
349     assertExactCallbackInvokes("onStateChange:CONNECTING");
350     assertEquals(CONNECTING, internalSubchannel.getState());
351     verify(mockTransportFactory, times(++transportsAddr1))
352         .newClientTransport(addr1, createClientTransportOptions());
353     // Let it through
354     assertNoCallbackInvoke();
355     transports.peek().listener.transportReady();
356     assertExactCallbackInvokes("onStateChange:READY");
357     assertEquals(READY, internalSubchannel.getState());
358 
359     assertSame(
360         transports.peek().transport,
361         ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate());
362     // Then close it.
363     assertNoCallbackInvoke();
364     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
365     assertExactCallbackInvokes("onStateChange:IDLE");
366     assertEquals(IDLE, internalSubchannel.getState());
367 
368     // First attempt after a successful connection. Old back-off policy should be ignored, but there
369     // is not yet a need for a new one. Start from the first address.
370     assertNull(internalSubchannel.obtainActiveTransport());
371     assertEquals(CONNECTING, internalSubchannel.getState());
372     assertExactCallbackInvokes("onStateChange:CONNECTING");
373     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
374     verify(mockTransportFactory, times(++transportsAddr1))
375         .newClientTransport(addr1, createClientTransportOptions());
376     // Fail the transport
377     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
378     assertEquals(CONNECTING, internalSubchannel.getState());
379 
380     // Second attempt will start immediately. Still no new back-off policy.
381     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
382     verify(mockTransportFactory, times(++transportsAddr2))
383         .newClientTransport(addr2, createClientTransportOptions());
384     // Fail this one too
385     assertEquals(CONNECTING, internalSubchannel.getState());
386     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
387     // All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect.
388     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
389     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
390     // Back-off reset and first back-off interval begins
391     verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffNanos();
392     verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
393 
394     // Third attempt is the first address, thus controlled by the first back-off interval.
395     fakeClock.forwardNanos(9);
396     verify(mockTransportFactory, times(transportsAddr1))
397         .newClientTransport(addr1, createClientTransportOptions());
398     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
399     assertNoCallbackInvoke();
400     fakeClock.forwardNanos(1);
401     assertExactCallbackInvokes("onStateChange:CONNECTING");
402     assertEquals(CONNECTING, internalSubchannel.getState());
403     verify(mockTransportFactory, times(++transportsAddr1))
404         .newClientTransport(addr1, createClientTransportOptions());
405 
406     // Final checks on invocations on back-off policies
407     verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos();
408     verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
409     verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos();
410   }
411 
412   @Test
updateAddresses_emptyEagList_throws()413   public void updateAddresses_emptyEagList_throws() {
414     SocketAddress addr = new FakeSocketAddress();
415     createInternalSubchannel(addr);
416     thrown.expect(IllegalArgumentException.class);
417     internalSubchannel.updateAddresses(Arrays.<EquivalentAddressGroup>asList());
418   }
419 
420   @Test
updateAddresses_eagListWithNull_throws()421   public void updateAddresses_eagListWithNull_throws() {
422     SocketAddress addr = new FakeSocketAddress();
423     createInternalSubchannel(addr);
424     List<EquivalentAddressGroup> eags = Arrays.asList((EquivalentAddressGroup) null);
425     thrown.expect(NullPointerException.class);
426     internalSubchannel.updateAddresses(eags);
427   }
428 
updateAddresses_intersecting_ready()429   @Test public void updateAddresses_intersecting_ready() {
430     SocketAddress addr1 = mock(SocketAddress.class);
431     SocketAddress addr2 = mock(SocketAddress.class);
432     SocketAddress addr3 = mock(SocketAddress.class);
433     createInternalSubchannel(addr1, addr2);
434     assertEquals(IDLE, internalSubchannel.getState());
435 
436     // First address fails
437     assertNull(internalSubchannel.obtainActiveTransport());
438     assertExactCallbackInvokes("onStateChange:CONNECTING");
439     verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions());
440     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
441     assertEquals(CONNECTING, internalSubchannel.getState());
442 
443     // Second address connects
444     verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions());
445     transports.peek().listener.transportReady();
446     assertExactCallbackInvokes("onStateChange:READY");
447     assertEquals(READY, internalSubchannel.getState());
448 
449     // Update addresses
450     internalSubchannel.updateAddresses(
451         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
452     assertNoCallbackInvoke();
453     assertEquals(READY, internalSubchannel.getState());
454     verify(transports.peek().transport, never()).shutdown(any(Status.class));
455     verify(transports.peek().transport, never()).shutdownNow(any(Status.class));
456 
457     // And new addresses chosen when re-connecting
458     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
459     assertExactCallbackInvokes("onStateChange:IDLE");
460 
461     assertNull(internalSubchannel.obtainActiveTransport());
462     assertEquals(0, fakeClock.numPendingTasks());
463     verify(mockTransportFactory, times(2))
464         .newClientTransport(addr2, createClientTransportOptions());
465     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
466     verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions());
467     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
468     verifyNoMoreInteractions(mockTransportFactory);
469 
470     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
471   }
472 
updateAddresses_intersecting_connecting()473   @Test public void updateAddresses_intersecting_connecting() {
474     SocketAddress addr1 = mock(SocketAddress.class);
475     SocketAddress addr2 = mock(SocketAddress.class);
476     SocketAddress addr3 = mock(SocketAddress.class);
477     createInternalSubchannel(addr1, addr2);
478     assertEquals(IDLE, internalSubchannel.getState());
479 
480     // First address fails
481     assertNull(internalSubchannel.obtainActiveTransport());
482     assertExactCallbackInvokes("onStateChange:CONNECTING");
483     verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions());
484     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
485     assertEquals(CONNECTING, internalSubchannel.getState());
486 
487     // Second address connecting
488     verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions());
489     assertNoCallbackInvoke();
490     assertEquals(CONNECTING, internalSubchannel.getState());
491 
492     // Update addresses
493     internalSubchannel.updateAddresses(
494         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
495     assertNoCallbackInvoke();
496     assertEquals(CONNECTING, internalSubchannel.getState());
497     verify(transports.peek().transport, never()).shutdown(any(Status.class));
498     verify(transports.peek().transport, never()).shutdownNow(any(Status.class));
499 
500     // And new addresses chosen when re-connecting
501     transports.peek().listener.transportReady();
502     assertExactCallbackInvokes("onStateChange:READY");
503     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
504     assertExactCallbackInvokes("onStateChange:IDLE");
505 
506     assertNull(internalSubchannel.obtainActiveTransport());
507     assertEquals(0, fakeClock.numPendingTasks());
508     verify(mockTransportFactory, times(2))
509         .newClientTransport(addr2, createClientTransportOptions());
510     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
511     verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions());
512     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
513     verifyNoMoreInteractions(mockTransportFactory);
514 
515     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
516   }
517 
updateAddresses_disjoint_idle()518   @Test public void updateAddresses_disjoint_idle() {
519     SocketAddress addr1 = mock(SocketAddress.class);
520     SocketAddress addr2 = mock(SocketAddress.class);
521 
522     createInternalSubchannel(addr1);
523     internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2)));
524 
525     // Nothing happened on address update
526     verify(mockTransportFactory, never())
527         .newClientTransport(addr1, createClientTransportOptions());
528     verify(mockTransportFactory, never())
529         .newClientTransport(addr2, createClientTransportOptions());
530     verifyNoMoreInteractions(mockTransportFactory);
531     assertNoCallbackInvoke();
532     assertEquals(IDLE, internalSubchannel.getState());
533 
534     // But new address chosen when connecting
535     assertNull(internalSubchannel.obtainActiveTransport());
536     assertExactCallbackInvokes("onStateChange:CONNECTING");
537     verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions());
538 
539     // And no other addresses attempted
540     assertEquals(0, fakeClock.numPendingTasks());
541     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
542     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
543     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
544     verifyNoMoreInteractions(mockTransportFactory);
545 
546     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
547   }
548 
updateAddresses_disjoint_ready()549   @Test public void updateAddresses_disjoint_ready() {
550     SocketAddress addr1 = mock(SocketAddress.class);
551     SocketAddress addr2 = mock(SocketAddress.class);
552     SocketAddress addr3 = mock(SocketAddress.class);
553     SocketAddress addr4 = mock(SocketAddress.class);
554     createInternalSubchannel(addr1, addr2);
555     assertEquals(IDLE, internalSubchannel.getState());
556 
557     // First address fails
558     assertNull(internalSubchannel.obtainActiveTransport());
559     assertExactCallbackInvokes("onStateChange:CONNECTING");
560     verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions());
561     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
562     assertEquals(CONNECTING, internalSubchannel.getState());
563 
564     // Second address connects
565     verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions());
566     transports.peek().listener.transportReady();
567     assertExactCallbackInvokes("onStateChange:READY");
568     assertEquals(READY, internalSubchannel.getState());
569 
570     // Update addresses
571     internalSubchannel.updateAddresses(
572         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))));
573     assertExactCallbackInvokes("onStateChange:IDLE");
574     assertEquals(IDLE, internalSubchannel.getState());
575     verify(transports.peek().transport).shutdown(any(Status.class));
576 
577     // And new addresses chosen when re-connecting
578     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
579     assertNoCallbackInvoke();
580     assertEquals(IDLE, internalSubchannel.getState());
581 
582     assertNull(internalSubchannel.obtainActiveTransport());
583     assertEquals(0, fakeClock.numPendingTasks());
584     verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions());
585     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
586     verify(mockTransportFactory).newClientTransport(addr4, createClientTransportOptions());
587     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
588     verifyNoMoreInteractions(mockTransportFactory);
589 
590     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
591   }
592 
updateAddresses_disjoint_connecting()593   @Test public void updateAddresses_disjoint_connecting() {
594     SocketAddress addr1 = mock(SocketAddress.class);
595     SocketAddress addr2 = mock(SocketAddress.class);
596     SocketAddress addr3 = mock(SocketAddress.class);
597     SocketAddress addr4 = mock(SocketAddress.class);
598     createInternalSubchannel(addr1, addr2);
599     assertEquals(IDLE, internalSubchannel.getState());
600 
601     // First address fails
602     assertNull(internalSubchannel.obtainActiveTransport());
603     assertExactCallbackInvokes("onStateChange:CONNECTING");
604     verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions());
605     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
606     assertEquals(CONNECTING, internalSubchannel.getState());
607 
608     // Second address connecting
609     verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions());
610     assertNoCallbackInvoke();
611     assertEquals(CONNECTING, internalSubchannel.getState());
612 
613     // Update addresses
614     internalSubchannel.updateAddresses(
615         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))));
616     assertNoCallbackInvoke();
617     assertEquals(CONNECTING, internalSubchannel.getState());
618 
619     // And new addresses chosen immediately
620     verify(transports.poll().transport).shutdown(any(Status.class));
621     assertNoCallbackInvoke();
622     assertEquals(CONNECTING, internalSubchannel.getState());
623 
624     assertNull(internalSubchannel.obtainActiveTransport());
625     assertEquals(0, fakeClock.numPendingTasks());
626     verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions());
627     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
628     verify(mockTransportFactory).newClientTransport(addr4, createClientTransportOptions());
629     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
630     verifyNoMoreInteractions(mockTransportFactory);
631 
632     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
633   }
634 
635   @Test
connectIsLazy()636   public void connectIsLazy() {
637     SocketAddress addr = mock(SocketAddress.class);
638     createInternalSubchannel(addr);
639 
640     // Invocation counters
641     int transportsCreated = 0;
642 
643     // Won't connect until requested
644     verify(mockTransportFactory, times(transportsCreated))
645         .newClientTransport(addr, createClientTransportOptions());
646 
647     // First attempt
648     internalSubchannel.obtainActiveTransport();
649     assertExactCallbackInvokes("onStateChange:CONNECTING");
650     verify(mockTransportFactory, times(++transportsCreated))
651         .newClientTransport(addr, createClientTransportOptions());
652 
653     // Fail this one
654     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
655     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
656 
657     // Will always reconnect after back-off
658     fakeClock.forwardNanos(10);
659     assertExactCallbackInvokes("onStateChange:CONNECTING");
660     verify(mockTransportFactory, times(++transportsCreated))
661         .newClientTransport(addr, createClientTransportOptions());
662 
663     // Make this one proceed
664     transports.peek().listener.transportReady();
665     assertExactCallbackInvokes("onStateChange:READY");
666     // Then go-away
667     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
668     assertExactCallbackInvokes("onStateChange:IDLE");
669 
670     // No scheduled tasks that would ever try to reconnect ...
671     assertEquals(0, fakeClock.numPendingTasks());
672     assertEquals(0, fakeExecutor.numPendingTasks());
673 
674     // ... until it's requested.
675     internalSubchannel.obtainActiveTransport();
676     assertExactCallbackInvokes("onStateChange:CONNECTING");
677     verify(mockTransportFactory, times(++transportsCreated))
678         .newClientTransport(addr, createClientTransportOptions());
679   }
680 
681   @Test
shutdownWhenReady()682   public void shutdownWhenReady() throws Exception {
683     SocketAddress addr = mock(SocketAddress.class);
684     createInternalSubchannel(addr);
685 
686     internalSubchannel.obtainActiveTransport();
687     MockClientTransportInfo transportInfo = transports.poll();
688     transportInfo.listener.transportReady();
689     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
690 
691     internalSubchannel.shutdown(SHUTDOWN_REASON);
692     verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
693     assertExactCallbackInvokes("onStateChange:SHUTDOWN");
694 
695     transportInfo.listener.transportTerminated();
696     assertExactCallbackInvokes("onTerminated");
697     verify(transportInfo.transport, never()).shutdownNow(any(Status.class));
698   }
699 
700   @Test
shutdownBeforeTransportCreated()701   public void shutdownBeforeTransportCreated() throws Exception {
702     SocketAddress addr = mock(SocketAddress.class);
703     createInternalSubchannel(addr);
704 
705     // First transport is created immediately
706     internalSubchannel.obtainActiveTransport();
707     assertExactCallbackInvokes("onStateChange:CONNECTING");
708     verify(mockTransportFactory).newClientTransport(addr, createClientTransportOptions());
709 
710     // Fail this one
711     MockClientTransportInfo transportInfo = transports.poll();
712     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
713     transportInfo.listener.transportTerminated();
714 
715     // Entering TRANSIENT_FAILURE, waiting for back-off
716     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
717 
718     // Save the reconnectTask before shutting down
719     FakeClock.ScheduledTask reconnectTask = null;
720     for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) {
721       if (task.command.toString().contains("EndOfCurrentBackoff")) {
722         assertNull("There shouldn't be more than one reconnectTask", reconnectTask);
723         assertFalse(task.isDone());
724         reconnectTask = task;
725       }
726     }
727     assertNotNull("There should be at least one reconnectTask", reconnectTask);
728 
729     // Shut down InternalSubchannel before the transport is created.
730     internalSubchannel.shutdown(SHUTDOWN_REASON);
731     assertTrue(reconnectTask.isCancelled());
732     // InternalSubchannel terminated promptly.
733     assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
734 
735     // Simulate a race between reconnectTask cancellation and execution -- the task runs anyway.
736     // This should not lead to the creation of a new transport.
737     reconnectTask.command.run();
738 
739     // Futher call to obtainActiveTransport() is no-op.
740     assertNull(internalSubchannel.obtainActiveTransport());
741     assertEquals(SHUTDOWN, internalSubchannel.getState());
742     assertNoCallbackInvoke();
743 
744     // No more transports will be created.
745     fakeClock.forwardNanos(10000);
746     assertEquals(SHUTDOWN, internalSubchannel.getState());
747     verifyNoMoreInteractions(mockTransportFactory);
748     assertEquals(0, transports.size());
749     assertNoCallbackInvoke();
750   }
751 
752   @Test
shutdownBeforeTransportReady()753   public void shutdownBeforeTransportReady() throws Exception {
754     SocketAddress addr = mock(SocketAddress.class);
755     createInternalSubchannel(addr);
756 
757     internalSubchannel.obtainActiveTransport();
758     assertExactCallbackInvokes("onStateChange:CONNECTING");
759     MockClientTransportInfo transportInfo = transports.poll();
760 
761     // Shutdown the InternalSubchannel before the pending transport is ready
762     assertNull(internalSubchannel.obtainActiveTransport());
763     internalSubchannel.shutdown(SHUTDOWN_REASON);
764     assertExactCallbackInvokes("onStateChange:SHUTDOWN");
765 
766     // The transport should've been shut down even though it's not the active transport yet.
767     verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
768     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
769     assertNoCallbackInvoke();
770     transportInfo.listener.transportTerminated();
771     assertExactCallbackInvokes("onTerminated");
772     assertEquals(SHUTDOWN, internalSubchannel.getState());
773   }
774 
775   @Test
shutdownNow()776   public void shutdownNow() throws Exception {
777     SocketAddress addr = mock(SocketAddress.class);
778     createInternalSubchannel(addr);
779 
780     internalSubchannel.obtainActiveTransport();
781     MockClientTransportInfo t1 = transports.poll();
782     t1.listener.transportReady();
783     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
784     t1.listener.transportShutdown(Status.UNAVAILABLE);
785     assertExactCallbackInvokes("onStateChange:IDLE");
786 
787     internalSubchannel.obtainActiveTransport();
788     assertExactCallbackInvokes("onStateChange:CONNECTING");
789     MockClientTransportInfo t2 = transports.poll();
790 
791     Status status = Status.UNAVAILABLE.withDescription("Requested");
792     internalSubchannel.shutdownNow(status);
793 
794     verify(t1.transport).shutdownNow(same(status));
795     verify(t2.transport).shutdownNow(same(status));
796     assertExactCallbackInvokes("onStateChange:SHUTDOWN");
797   }
798 
799   @Test
obtainTransportAfterShutdown()800   public void obtainTransportAfterShutdown() throws Exception {
801     SocketAddress addr = mock(SocketAddress.class);
802     createInternalSubchannel(addr);
803 
804     internalSubchannel.shutdown(SHUTDOWN_REASON);
805     assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
806     assertEquals(SHUTDOWN, internalSubchannel.getState());
807     assertNull(internalSubchannel.obtainActiveTransport());
808     verify(mockTransportFactory, times(0))
809         .newClientTransport(addr, createClientTransportOptions());
810     assertNoCallbackInvoke();
811     assertEquals(SHUTDOWN, internalSubchannel.getState());
812   }
813 
814   @Test
logId()815   public void logId() {
816     createInternalSubchannel(mock(SocketAddress.class));
817 
818     assertNotNull(internalSubchannel.getLogId());
819   }
820 
821   @Test
inUseState()822   public void inUseState() {
823     SocketAddress addr = mock(SocketAddress.class);
824     createInternalSubchannel(addr);
825 
826     internalSubchannel.obtainActiveTransport();
827     MockClientTransportInfo t0 = transports.poll();
828     t0.listener.transportReady();
829     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
830     t0.listener.transportInUse(true);
831     assertExactCallbackInvokes("onInUse");
832 
833     t0.listener.transportInUse(false);
834     assertExactCallbackInvokes("onNotInUse");
835 
836     t0.listener.transportInUse(true);
837     assertExactCallbackInvokes("onInUse");
838     t0.listener.transportShutdown(Status.UNAVAILABLE);
839     assertExactCallbackInvokes("onStateChange:IDLE");
840 
841     assertNull(internalSubchannel.obtainActiveTransport());
842     MockClientTransportInfo t1 = transports.poll();
843     t1.listener.transportReady();
844     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
845     t1.listener.transportInUse(true);
846     // InternalSubchannel is already in-use, thus doesn't call the callback
847     assertNoCallbackInvoke();
848 
849     t1.listener.transportInUse(false);
850     // t0 is still in-use
851     assertNoCallbackInvoke();
852 
853     t0.listener.transportInUse(false);
854     assertExactCallbackInvokes("onNotInUse");
855   }
856 
857   @Test
transportTerminateWithoutExitingInUse()858   public void transportTerminateWithoutExitingInUse() {
859     // An imperfect transport that terminates without going out of in-use. InternalSubchannel will
860     // clear the in-use bit for it.
861     SocketAddress addr = mock(SocketAddress.class);
862     createInternalSubchannel(addr);
863 
864     internalSubchannel.obtainActiveTransport();
865     MockClientTransportInfo t0 = transports.poll();
866     t0.listener.transportReady();
867     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
868     t0.listener.transportInUse(true);
869     assertExactCallbackInvokes("onInUse");
870 
871     t0.listener.transportShutdown(Status.UNAVAILABLE);
872     assertExactCallbackInvokes("onStateChange:IDLE");
873     t0.listener.transportTerminated();
874     assertExactCallbackInvokes("onNotInUse");
875   }
876 
877   @Test
transportStartReturnsRunnable()878   public void transportStartReturnsRunnable() {
879     SocketAddress addr1 = mock(SocketAddress.class);
880     SocketAddress addr2 = mock(SocketAddress.class);
881     createInternalSubchannel(addr1, addr2);
882     final AtomicInteger runnableInvokes = new AtomicInteger(0);
883     Runnable startRunnable = new Runnable() {
884         @Override
885         public void run() {
886           runnableInvokes.incrementAndGet();
887         }
888       };
889     transports = TestUtils.captureTransports(mockTransportFactory, startRunnable);
890 
891     assertEquals(0, runnableInvokes.get());
892     internalSubchannel.obtainActiveTransport();
893     assertEquals(1, runnableInvokes.get());
894     internalSubchannel.obtainActiveTransport();
895     assertEquals(1, runnableInvokes.get());
896 
897     MockClientTransportInfo t0 = transports.poll();
898     t0.listener.transportShutdown(Status.UNAVAILABLE);
899     assertEquals(2, runnableInvokes.get());
900 
901     // 2nd address: reconnect immediatly
902     MockClientTransportInfo t1 = transports.poll();
903     t1.listener.transportShutdown(Status.UNAVAILABLE);
904 
905     // Addresses exhausted, waiting for back-off.
906     assertEquals(2, runnableInvokes.get());
907     // Run out the back-off period
908     fakeClock.forwardNanos(10);
909     assertEquals(3, runnableInvokes.get());
910 
911     // This test doesn't care about scheduled InternalSubchannel callbacks.  Clear it up so that
912     // noMorePendingTasks() won't fail.
913     fakeExecutor.runDueTasks();
914     assertEquals(3, runnableInvokes.get());
915   }
916 
917   @Test
resetConnectBackoff()918   public void resetConnectBackoff() throws Exception {
919     SocketAddress addr = mock(SocketAddress.class);
920     createInternalSubchannel(addr);
921 
922     // Move into TRANSIENT_FAILURE to schedule reconnect
923     internalSubchannel.obtainActiveTransport();
924     assertExactCallbackInvokes("onStateChange:CONNECTING");
925     verify(mockTransportFactory).newClientTransport(addr, createClientTransportOptions());
926     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
927     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
928 
929     // Save the reconnectTask
930     FakeClock.ScheduledTask reconnectTask = null;
931     for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) {
932       if (task.command.toString().contains("EndOfCurrentBackoff")) {
933         assertNull("There shouldn't be more than one reconnectTask", reconnectTask);
934         assertFalse(task.isDone());
935         reconnectTask = task;
936       }
937     }
938     assertNotNull("There should be at least one reconnectTask", reconnectTask);
939 
940     internalSubchannel.resetConnectBackoff();
941 
942     verify(mockTransportFactory, times(2))
943         .newClientTransport(addr, createClientTransportOptions());
944     assertExactCallbackInvokes("onStateChange:CONNECTING");
945     assertTrue(reconnectTask.isCancelled());
946 
947     // Simulate a race between cancel and the task scheduler. Should be a no-op.
948     reconnectTask.command.run();
949     assertNoCallbackInvoke();
950     verify(mockTransportFactory, times(2))
951         .newClientTransport(addr, createClientTransportOptions());
952     verify(mockBackoffPolicyProvider, times(1)).get();
953 
954     // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after
955     // invoking resetConnectBackoff()
956     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
957     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
958     verify(mockBackoffPolicyProvider, times(2)).get();
959     fakeClock.forwardNanos(10);
960     assertExactCallbackInvokes("onStateChange:CONNECTING");
961     assertEquals(CONNECTING, internalSubchannel.getState());
962   }
963 
964   @Test
resetConnectBackoff_noopOnIdleTransport()965   public void resetConnectBackoff_noopOnIdleTransport() throws Exception {
966     SocketAddress addr = mock(SocketAddress.class);
967     createInternalSubchannel(addr);
968     assertEquals(IDLE, internalSubchannel.getState());
969 
970     internalSubchannel.resetConnectBackoff();
971 
972     assertNoCallbackInvoke();
973   }
974 
975   @Test
channelzMembership()976   public void channelzMembership() throws Exception {
977     SocketAddress addr1 = mock(SocketAddress.class);
978     createInternalSubchannel(addr1);
979     internalSubchannel.obtainActiveTransport();
980 
981     MockClientTransportInfo t0 = transports.poll();
982     assertTrue(channelz.containsClientSocket(t0.transport.getLogId()));
983     t0.listener.transportTerminated();
984     assertFalse(channelz.containsClientSocket(t0.transport.getLogId()));
985   }
986 
987   @Test
channelzStatContainsTransport()988   public void channelzStatContainsTransport() throws Exception {
989     SocketAddress addr = new SocketAddress() {};
990     assertThat(transports).isEmpty();
991     createInternalSubchannel(addr);
992     internalSubchannel.obtainActiveTransport();
993 
994     InternalWithLogId registeredTransport
995         = Iterables.getOnlyElement(internalSubchannel.getStats().get().sockets);
996     MockClientTransportInfo actualTransport = Iterables.getOnlyElement(transports);
997     assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId());
998   }
999 
index_looping()1000   @Test public void index_looping() {
1001     Attributes.Key<String> key = Attributes.Key.create("some-key");
1002     Attributes attr1 = Attributes.newBuilder().set(key, "1").build();
1003     Attributes attr2 = Attributes.newBuilder().set(key, "2").build();
1004     Attributes attr3 = Attributes.newBuilder().set(key, "3").build();
1005     SocketAddress addr1 = new FakeSocketAddress();
1006     SocketAddress addr2 = new FakeSocketAddress();
1007     SocketAddress addr3 = new FakeSocketAddress();
1008     SocketAddress addr4 = new FakeSocketAddress();
1009     SocketAddress addr5 = new FakeSocketAddress();
1010     Index index = new Index(Arrays.asList(
1011         new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1),
1012         new EquivalentAddressGroup(Arrays.asList(addr3), attr2),
1013         new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)));
1014     assertThat(index.getCurrentAddress()).isSameAs(addr1);
1015     assertThat(index.getCurrentEagAttributes()).isSameAs(attr1);
1016     assertThat(index.isAtBeginning()).isTrue();
1017     assertThat(index.isValid()).isTrue();
1018 
1019     index.increment();
1020     assertThat(index.getCurrentAddress()).isSameAs(addr2);
1021     assertThat(index.getCurrentEagAttributes()).isSameAs(attr1);
1022     assertThat(index.isAtBeginning()).isFalse();
1023     assertThat(index.isValid()).isTrue();
1024 
1025     index.increment();
1026     assertThat(index.getCurrentAddress()).isSameAs(addr3);
1027     assertThat(index.getCurrentEagAttributes()).isSameAs(attr2);
1028     assertThat(index.isAtBeginning()).isFalse();
1029     assertThat(index.isValid()).isTrue();
1030 
1031     index.increment();
1032     assertThat(index.getCurrentAddress()).isSameAs(addr4);
1033     assertThat(index.getCurrentEagAttributes()).isSameAs(attr3);
1034     assertThat(index.isAtBeginning()).isFalse();
1035     assertThat(index.isValid()).isTrue();
1036 
1037     index.increment();
1038     assertThat(index.getCurrentAddress()).isSameAs(addr5);
1039     assertThat(index.getCurrentEagAttributes()).isSameAs(attr3);
1040     assertThat(index.isAtBeginning()).isFalse();
1041     assertThat(index.isValid()).isTrue();
1042 
1043     index.increment();
1044     assertThat(index.isAtBeginning()).isFalse();
1045     assertThat(index.isValid()).isFalse();
1046 
1047     index.reset();
1048     assertThat(index.getCurrentAddress()).isSameAs(addr1);
1049     assertThat(index.getCurrentEagAttributes()).isSameAs(attr1);
1050     assertThat(index.isAtBeginning()).isTrue();
1051     assertThat(index.isValid()).isTrue();
1052 
1053     // We want to make sure both groupIndex and addressIndex are reset
1054     index.increment();
1055     index.increment();
1056     index.increment();
1057     index.increment();
1058     assertThat(index.getCurrentAddress()).isSameAs(addr5);
1059     assertThat(index.getCurrentEagAttributes()).isSameAs(attr3);
1060     index.reset();
1061     assertThat(index.getCurrentAddress()).isSameAs(addr1);
1062     assertThat(index.getCurrentEagAttributes()).isSameAs(attr1);
1063   }
1064 
index_updateGroups_resets()1065   @Test public void index_updateGroups_resets() {
1066     SocketAddress addr1 = new FakeSocketAddress();
1067     SocketAddress addr2 = new FakeSocketAddress();
1068     SocketAddress addr3 = new FakeSocketAddress();
1069     Index index = new Index(Arrays.asList(
1070         new EquivalentAddressGroup(Arrays.asList(addr1)),
1071         new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
1072     index.increment();
1073     index.increment();
1074     // We want to make sure both groupIndex and addressIndex are reset
1075     index.updateGroups(Arrays.asList(
1076         new EquivalentAddressGroup(Arrays.asList(addr1)),
1077         new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
1078     assertThat(index.getCurrentAddress()).isSameAs(addr1);
1079   }
1080 
index_seekTo()1081   @Test public void index_seekTo() {
1082     SocketAddress addr1 = new FakeSocketAddress();
1083     SocketAddress addr2 = new FakeSocketAddress();
1084     SocketAddress addr3 = new FakeSocketAddress();
1085     Index index = new Index(Arrays.asList(
1086         new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
1087         new EquivalentAddressGroup(Arrays.asList(addr3))));
1088     assertThat(index.seekTo(addr3)).isTrue();
1089     assertThat(index.getCurrentAddress()).isSameAs(addr3);
1090     assertThat(index.seekTo(addr1)).isTrue();
1091     assertThat(index.getCurrentAddress()).isSameAs(addr1);
1092     assertThat(index.seekTo(addr2)).isTrue();
1093     assertThat(index.getCurrentAddress()).isSameAs(addr2);
1094     index.seekTo(new FakeSocketAddress());
1095     // Failed seekTo doesn't change the index
1096     assertThat(index.getCurrentAddress()).isSameAs(addr2);
1097   }
1098 
1099   /** Create ClientTransportOptions. Should not be reused if it may be mutated. */
createClientTransportOptions()1100   private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() {
1101     return new ClientTransportFactory.ClientTransportOptions()
1102         .setAuthority(AUTHORITY)
1103         .setUserAgent(USER_AGENT);
1104   }
1105 
createInternalSubchannel(SocketAddress .... addrs)1106   private void createInternalSubchannel(SocketAddress ... addrs) {
1107     createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs)));
1108   }
1109 
createInternalSubchannel(EquivalentAddressGroup .... addrs)1110   private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
1111     List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs);
1112     internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT,
1113         mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
1114         fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback,
1115         channelz, CallTracer.getDefaultFactory().create(), null,
1116         new TimeProvider() {
1117           @Override
1118           public long currentTimeNanos() {
1119             return fakeClock.getTicker().read();
1120           }
1121         });
1122   }
1123 
assertNoCallbackInvoke()1124   private void assertNoCallbackInvoke() {
1125     while (fakeExecutor.runDueTasks() > 0) {}
1126     assertEquals(0, callbackInvokes.size());
1127   }
1128 
assertExactCallbackInvokes(String .... expectedInvokes)1129   private void assertExactCallbackInvokes(String ... expectedInvokes) {
1130     assertEquals(0, channelExecutor.numPendingTasks());
1131     assertEquals(Arrays.asList(expectedInvokes), callbackInvokes);
1132     callbackInvokes.clear();
1133   }
1134 
1135   private static class FakeSocketAddress extends SocketAddress {}
1136 }
1137