• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.ConnectivityState.CONNECTING;
21 import static io.grpc.ConnectivityState.IDLE;
22 import static io.grpc.ConnectivityState.READY;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.eq;
33 import static org.mockito.ArgumentMatchers.isA;
34 import static org.mockito.ArgumentMatchers.same;
35 import static org.mockito.Mockito.mock;
36 import static org.mockito.Mockito.never;
37 import static org.mockito.Mockito.times;
38 import static org.mockito.Mockito.verify;
39 import static org.mockito.Mockito.verifyNoMoreInteractions;
40 import static org.mockito.Mockito.when;
41 
42 import com.google.common.collect.Iterables;
43 import io.grpc.Attributes;
44 import io.grpc.ConnectivityStateInfo;
45 import io.grpc.EquivalentAddressGroup;
46 import io.grpc.InternalChannelz;
47 import io.grpc.InternalLogId;
48 import io.grpc.InternalWithLogId;
49 import io.grpc.Status;
50 import io.grpc.SynchronizationContext;
51 import io.grpc.internal.InternalSubchannel.CallTracingTransport;
52 import io.grpc.internal.InternalSubchannel.Index;
53 import io.grpc.internal.InternalSubchannel.TransportLogger;
54 import io.grpc.internal.TestUtils.MockClientTransportInfo;
55 import java.net.SocketAddress;
56 import java.util.Arrays;
57 import java.util.LinkedList;
58 import java.util.List;
59 import java.util.concurrent.BlockingQueue;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.atomic.AtomicInteger;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Rule;
65 import org.junit.Test;
66 import org.junit.rules.ExpectedException;
67 import org.junit.runner.RunWith;
68 import org.junit.runners.JUnit4;
69 import org.mockito.Mock;
70 import org.mockito.junit.MockitoJUnit;
71 import org.mockito.junit.MockitoRule;
72 
73 /**
74  * Unit tests for {@link InternalSubchannel}.
75  */
76 @RunWith(JUnit4.class)
77 public class InternalSubchannelTest {
78   @Rule
79   public final MockitoRule mocks = MockitoJUnit.rule();
80   @SuppressWarnings("deprecation") // https://github.com/grpc/grpc-java/issues/7467
81   @Rule
82   public final ExpectedException thrown = ExpectedException.none();
83 
84   private static final String AUTHORITY = "fakeauthority";
85   private static final String USER_AGENT = "mosaic";
86   private static final ConnectivityStateInfo UNAVAILABLE_STATE =
87       ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE);
88   private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE =
89       ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED);
90   private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test");
91 
92   // For scheduled executor
93   private final FakeClock fakeClock = new FakeClock();
94   // For syncContext
95   private final FakeClock fakeExecutor = new FakeClock();
96   private final SynchronizationContext syncContext = new SynchronizationContext(
97       new Thread.UncaughtExceptionHandler() {
98         @Override
99         public void uncaughtException(Thread t, Throwable e) {
100           throw new AssertionError(e);
101         }
102       });
103 
104   private final InternalChannelz channelz = new InternalChannelz();
105 
106   @Mock private BackoffPolicy mockBackoffPolicy1;
107   @Mock private BackoffPolicy mockBackoffPolicy2;
108   @Mock private BackoffPolicy mockBackoffPolicy3;
109   @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
110   @Mock private ClientTransportFactory mockTransportFactory;
111 
112   private final LinkedList<String> callbackInvokes = new LinkedList<>();
113   private final InternalSubchannel.Callback mockInternalSubchannelCallback =
114       new InternalSubchannel.Callback() {
115         @Override
116         protected void onTerminated(InternalSubchannel is) {
117           assertSame(internalSubchannel, is);
118           callbackInvokes.add("onTerminated");
119         }
120 
121         @Override
122         protected void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
123           assertSame(internalSubchannel, is);
124           callbackInvokes.add("onStateChange:" + newState);
125         }
126 
127         @Override
128         protected void onInUse(InternalSubchannel is) {
129           assertSame(internalSubchannel, is);
130           callbackInvokes.add("onInUse");
131         }
132 
133         @Override
134         protected void onNotInUse(InternalSubchannel is) {
135           assertSame(internalSubchannel, is);
136           callbackInvokes.add("onNotInUse");
137         }
138       };
139 
140   private InternalSubchannel internalSubchannel;
141   private BlockingQueue<MockClientTransportInfo> transports;
142 
setUp()143   @Before public void setUp() {
144     when(mockBackoffPolicyProvider.get())
145         .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3);
146     when(mockBackoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
147     when(mockBackoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
148     when(mockBackoffPolicy3.nextBackoffNanos()).thenReturn(10L, 100L);
149     transports = TestUtils.captureTransports(mockTransportFactory);
150   }
151 
noMorePendingTasks()152   @After public void noMorePendingTasks() {
153     assertEquals(0, fakeClock.numPendingTasks());
154     assertEquals(0, fakeExecutor.numPendingTasks());
155   }
156 
157   @Test(expected = IllegalArgumentException.class)
constructor_emptyEagList_throws()158   public void constructor_emptyEagList_throws() {
159     createInternalSubchannel(new EquivalentAddressGroup[0]);
160   }
161 
162   @Test(expected = NullPointerException.class)
constructor_eagListWithNull_throws()163   public void constructor_eagListWithNull_throws() {
164     createInternalSubchannel(new EquivalentAddressGroup[] {null});
165   }
166 
eagAttribute_propagatesToTransport()167   @Test public void eagAttribute_propagatesToTransport() {
168     SocketAddress addr = new SocketAddress() {};
169     Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build();
170     createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr));
171 
172     // First attempt
173     assertNull(internalSubchannel.obtainActiveTransport());
174     assertEquals(CONNECTING, internalSubchannel.getState());
175     verify(mockTransportFactory).newClientTransport(
176         eq(addr),
177         eq(createClientTransportOptions().setEagAttributes(attr)),
178         isA(TransportLogger.class));
179   }
180 
eagAuthorityOverride_propagatesToTransport()181   @Test public void eagAuthorityOverride_propagatesToTransport() {
182     SocketAddress addr = new SocketAddress() {};
183     String overriddenAuthority = "authority-override";
184     Attributes attr = Attributes.newBuilder()
185             .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, overriddenAuthority).build();
186     createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr));
187 
188     // First attempt
189     assertNull(internalSubchannel.obtainActiveTransport());
190     assertEquals(CONNECTING, internalSubchannel.getState());
191     verify(mockTransportFactory).newClientTransport(
192         eq(addr),
193         eq(createClientTransportOptions().setAuthority(overriddenAuthority).setEagAttributes(attr)),
194         isA(TransportLogger.class));
195   }
196 
singleAddressReconnect()197   @Test public void singleAddressReconnect() {
198     SocketAddress addr = mock(SocketAddress.class);
199     createInternalSubchannel(addr);
200     assertEquals(IDLE, internalSubchannel.getState());
201 
202     // Invocation counters
203     int transportsCreated = 0;
204     int backoff1Consulted = 0;
205     int backoff2Consulted = 0;
206     int backoffReset = 0;
207 
208     // First attempt
209     assertEquals(IDLE, internalSubchannel.getState());
210     assertNoCallbackInvoke();
211     assertNull(internalSubchannel.obtainActiveTransport());
212     assertExactCallbackInvokes("onStateChange:CONNECTING");
213     assertEquals(CONNECTING, internalSubchannel.getState());
214     verify(mockTransportFactory, times(++transportsCreated))
215         .newClientTransport(
216             eq(addr),
217             eq(createClientTransportOptions()),
218             isA(TransportLogger.class));
219 
220     // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE.
221     assertNoCallbackInvoke();
222     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
223     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
224     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
225     // Backoff reset and using first back-off value interval
226     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
227     verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
228 
229     // Second attempt
230     // Transport creation doesn't happen until time is due
231     fakeClock.forwardNanos(9);
232     assertNull(internalSubchannel.obtainActiveTransport());
233     verify(mockTransportFactory, times(transportsCreated))
234         .newClientTransport(
235             eq(addr),
236             eq(createClientTransportOptions()),
237             isA(TransportLogger.class));
238     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
239 
240     assertNoCallbackInvoke();
241     fakeClock.forwardNanos(1);
242     assertExactCallbackInvokes("onStateChange:CONNECTING");
243     assertEquals(CONNECTING, internalSubchannel.getState());
244     verify(mockTransportFactory, times(++transportsCreated))
245         .newClientTransport(
246             eq(addr),
247             eq(createClientTransportOptions()),
248             isA(TransportLogger.class));
249     // Fail this one too
250     assertNoCallbackInvoke();
251     // Here we use a different status from the first failure, and verify that it's passed to
252     // the callback.
253     transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
254     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
255     assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE);
256     // Second back-off interval
257     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
258     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
259 
260     // Third attempt
261     // Transport creation doesn't happen until time is due
262     fakeClock.forwardNanos(99);
263     assertNull(internalSubchannel.obtainActiveTransport());
264     verify(mockTransportFactory, times(transportsCreated))
265         .newClientTransport(
266             eq(addr),
267             eq(createClientTransportOptions()),
268             isA(TransportLogger.class));
269     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
270     assertNoCallbackInvoke();
271     fakeClock.forwardNanos(1);
272     assertEquals(CONNECTING, internalSubchannel.getState());
273     assertExactCallbackInvokes("onStateChange:CONNECTING");
274     assertNull(internalSubchannel.obtainActiveTransport());
275     verify(mockTransportFactory, times(++transportsCreated))
276         .newClientTransport(
277             eq(addr),
278             eq(createClientTransportOptions()),
279             isA(TransportLogger.class));
280     // Let this one succeed, will enter READY state.
281     assertNoCallbackInvoke();
282     transports.peek().listener.transportReady();
283     assertExactCallbackInvokes("onStateChange:READY");
284     assertEquals(READY, internalSubchannel.getState());
285     assertSame(
286         transports.peek().transport,
287         ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate());
288 
289     // Close the READY transport, will enter IDLE state.
290     assertNoCallbackInvoke();
291     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
292     assertEquals(IDLE, internalSubchannel.getState());
293     assertExactCallbackInvokes("onStateChange:IDLE");
294 
295     // Back-off is reset, and the next attempt will happen immediately
296     assertNull(internalSubchannel.obtainActiveTransport());
297     assertEquals(CONNECTING, internalSubchannel.getState());
298     assertExactCallbackInvokes("onStateChange:CONNECTING");
299     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
300     verify(mockTransportFactory, times(++transportsCreated))
301         .newClientTransport(
302             eq(addr),
303             eq(createClientTransportOptions()),
304             isA(TransportLogger.class));
305 
306     // Final checks for consultations on back-off policies
307     verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos();
308     verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
309   }
310 
twoAddressesReconnect()311   @Test public void twoAddressesReconnect() {
312     SocketAddress addr1 = mock(SocketAddress.class);
313     SocketAddress addr2 = mock(SocketAddress.class);
314     createInternalSubchannel(addr1, addr2);
315     assertEquals(IDLE, internalSubchannel.getState());
316     // Invocation counters
317     int transportsAddr1 = 0;
318     int transportsAddr2 = 0;
319     int backoff1Consulted = 0;
320     int backoff2Consulted = 0;
321     int backoff3Consulted = 0;
322     int backoffReset = 0;
323 
324     // First attempt
325     assertNoCallbackInvoke();
326     assertNull(internalSubchannel.obtainActiveTransport());
327     assertExactCallbackInvokes("onStateChange:CONNECTING");
328     assertEquals(CONNECTING, internalSubchannel.getState());
329     verify(mockTransportFactory, times(++transportsAddr1))
330         .newClientTransport(
331             eq(addr1),
332             eq(createClientTransportOptions()),
333             isA(TransportLogger.class));
334 
335     // Let this one fail without success
336     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
337     // Still in CONNECTING
338     assertNull(internalSubchannel.obtainActiveTransport());
339     assertNoCallbackInvoke();
340     assertEquals(CONNECTING, internalSubchannel.getState());
341 
342     // Second attempt will start immediately. Still no back-off policy.
343     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
344     verify(mockTransportFactory, times(++transportsAddr2))
345         .newClientTransport(
346             eq(addr2),
347             eq(createClientTransportOptions()),
348             isA(TransportLogger.class));
349     assertNull(internalSubchannel.obtainActiveTransport());
350     // Fail this one too
351     assertNoCallbackInvoke();
352     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
353     // All addresses have failed. Delayed transport will be in back-off interval.
354     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
355     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
356     // Backoff reset and first back-off interval begins
357     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
358     verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
359 
360     // No reconnect during TRANSIENT_FAILURE even when requested.
361     assertNull(internalSubchannel.obtainActiveTransport());
362     assertNoCallbackInvoke();
363     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
364 
365     // Third attempt is the first address, thus controlled by the first back-off interval.
366     fakeClock.forwardNanos(9);
367     verify(mockTransportFactory, times(transportsAddr1))
368         .newClientTransport(
369             eq(addr1),
370             eq(createClientTransportOptions()),
371             isA(TransportLogger.class));
372     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
373     assertNoCallbackInvoke();
374     fakeClock.forwardNanos(1);
375     assertExactCallbackInvokes("onStateChange:CONNECTING");
376     assertEquals(CONNECTING, internalSubchannel.getState());
377     verify(mockTransportFactory, times(++transportsAddr1))
378         .newClientTransport(
379             eq(addr1),
380             eq(createClientTransportOptions()),
381             isA(TransportLogger.class));
382     // Fail this one too
383     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
384     assertEquals(CONNECTING, internalSubchannel.getState());
385 
386     // Forth attempt will start immediately. Keep back-off policy.
387     assertNull(internalSubchannel.obtainActiveTransport());
388     assertEquals(CONNECTING, internalSubchannel.getState());
389     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
390     verify(mockTransportFactory, times(++transportsAddr2))
391         .newClientTransport(
392             eq(addr2),
393             eq(createClientTransportOptions()),
394             isA(TransportLogger.class));
395     // Fail this one too
396     assertNoCallbackInvoke();
397     transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
398     // All addresses have failed again. Delayed transport will be in back-off interval.
399     assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE);
400     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
401     // Second back-off interval begins
402     verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos();
403     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
404 
405     // Fifth attempt for the first address, thus controlled by the second back-off interval.
406     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
407     fakeClock.forwardNanos(99);
408     verify(mockTransportFactory, times(transportsAddr1))
409         .newClientTransport(
410             eq(addr1),
411             eq(createClientTransportOptions()),
412             isA(TransportLogger.class));
413     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
414     assertNoCallbackInvoke();
415     fakeClock.forwardNanos(1);
416     assertExactCallbackInvokes("onStateChange:CONNECTING");
417     assertEquals(CONNECTING, internalSubchannel.getState());
418     verify(mockTransportFactory, times(++transportsAddr1))
419         .newClientTransport(
420             eq(addr1),
421             eq(createClientTransportOptions()),
422             isA(TransportLogger.class));
423     // Let it through
424     assertNoCallbackInvoke();
425     transports.peek().listener.transportReady();
426     assertExactCallbackInvokes("onStateChange:READY");
427     assertEquals(READY, internalSubchannel.getState());
428 
429     assertSame(
430         transports.peek().transport,
431         ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate());
432     // Then close it.
433     assertNoCallbackInvoke();
434     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
435     assertExactCallbackInvokes("onStateChange:IDLE");
436     assertEquals(IDLE, internalSubchannel.getState());
437 
438     // First attempt after a successful connection. Old back-off policy should be ignored, but there
439     // is not yet a need for a new one. Start from the first address.
440     assertNull(internalSubchannel.obtainActiveTransport());
441     assertEquals(CONNECTING, internalSubchannel.getState());
442     assertExactCallbackInvokes("onStateChange:CONNECTING");
443     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
444     verify(mockTransportFactory, times(++transportsAddr1))
445         .newClientTransport(
446             eq(addr1),
447             eq(createClientTransportOptions()),
448             isA(TransportLogger.class));
449     // Fail the transport
450     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
451     assertEquals(CONNECTING, internalSubchannel.getState());
452 
453     // Second attempt will start immediately. Still no new back-off policy.
454     verify(mockBackoffPolicyProvider, times(backoffReset)).get();
455     verify(mockTransportFactory, times(++transportsAddr2))
456         .newClientTransport(
457             eq(addr2),
458             eq(createClientTransportOptions()),
459             isA(TransportLogger.class));
460     // Fail this one too
461     assertEquals(CONNECTING, internalSubchannel.getState());
462     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
463     // All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect.
464     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
465     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
466     // Back-off reset and first back-off interval begins
467     verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffNanos();
468     verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
469 
470     // Third attempt is the first address, thus controlled by the first back-off interval.
471     fakeClock.forwardNanos(9);
472     verify(mockTransportFactory, times(transportsAddr1))
473         .newClientTransport(
474             eq(addr1),
475             eq(createClientTransportOptions()),
476             isA(TransportLogger.class));
477     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
478     assertNoCallbackInvoke();
479     fakeClock.forwardNanos(1);
480     assertExactCallbackInvokes("onStateChange:CONNECTING");
481     assertEquals(CONNECTING, internalSubchannel.getState());
482     verify(mockTransportFactory, times(++transportsAddr1))
483         .newClientTransport(
484             eq(addr1),
485             eq(createClientTransportOptions()),
486             isA(TransportLogger.class));
487 
488     // Final checks on invocations on back-off policies
489     verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos();
490     verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
491     verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos();
492   }
493 
494   @Test
updateAddresses_emptyEagList_throws()495   public void updateAddresses_emptyEagList_throws() {
496     SocketAddress addr = new FakeSocketAddress();
497     createInternalSubchannel(addr);
498     thrown.expect(IllegalArgumentException.class);
499     internalSubchannel.updateAddresses(Arrays.<EquivalentAddressGroup>asList());
500   }
501 
502   @Test
updateAddresses_eagListWithNull_throws()503   public void updateAddresses_eagListWithNull_throws() {
504     SocketAddress addr = new FakeSocketAddress();
505     createInternalSubchannel(addr);
506     List<EquivalentAddressGroup> eags = Arrays.asList((EquivalentAddressGroup) null);
507     thrown.expect(NullPointerException.class);
508     internalSubchannel.updateAddresses(eags);
509   }
510 
updateAddresses_intersecting_ready()511   @Test public void updateAddresses_intersecting_ready() {
512     SocketAddress addr1 = mock(SocketAddress.class);
513     SocketAddress addr2 = mock(SocketAddress.class);
514     SocketAddress addr3 = mock(SocketAddress.class);
515     createInternalSubchannel(addr1, addr2);
516     assertEquals(IDLE, internalSubchannel.getState());
517 
518     // First address fails
519     assertNull(internalSubchannel.obtainActiveTransport());
520     assertExactCallbackInvokes("onStateChange:CONNECTING");
521     verify(mockTransportFactory)
522         .newClientTransport(
523             eq(addr1),
524             eq(createClientTransportOptions()),
525             isA(TransportLogger.class));
526     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
527     assertEquals(CONNECTING, internalSubchannel.getState());
528 
529     // Second address connects
530     verify(mockTransportFactory)
531         .newClientTransport(
532             eq(addr2),
533             eq(createClientTransportOptions()),
534             isA(TransportLogger.class));
535     transports.peek().listener.transportReady();
536     assertExactCallbackInvokes("onStateChange:READY");
537     assertEquals(READY, internalSubchannel.getState());
538 
539     // Update addresses
540     internalSubchannel.updateAddresses(
541         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
542     assertNoCallbackInvoke();
543     assertEquals(READY, internalSubchannel.getState());
544     verify(transports.peek().transport, never()).shutdown(any(Status.class));
545     verify(transports.peek().transport, never()).shutdownNow(any(Status.class));
546 
547     // And new addresses chosen when re-connecting
548     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
549     assertExactCallbackInvokes("onStateChange:IDLE");
550 
551     assertNull(internalSubchannel.obtainActiveTransport());
552     assertEquals(0, fakeClock.numPendingTasks());
553     verify(mockTransportFactory, times(2))
554         .newClientTransport(
555             eq(addr2),
556             eq(createClientTransportOptions()),
557             isA(TransportLogger.class));
558     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
559     verify(mockTransportFactory)
560         .newClientTransport(
561             eq(addr3),
562             eq(createClientTransportOptions()),
563             isA(TransportLogger.class));
564     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
565     verifyNoMoreInteractions(mockTransportFactory);
566 
567     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
568   }
569 
updateAddresses_intersecting_connecting()570   @Test public void updateAddresses_intersecting_connecting() {
571     SocketAddress addr1 = mock(SocketAddress.class);
572     SocketAddress addr2 = mock(SocketAddress.class);
573     SocketAddress addr3 = mock(SocketAddress.class);
574     createInternalSubchannel(addr1, addr2);
575     assertEquals(IDLE, internalSubchannel.getState());
576 
577     // First address fails
578     assertNull(internalSubchannel.obtainActiveTransport());
579     assertExactCallbackInvokes("onStateChange:CONNECTING");
580     verify(mockTransportFactory)
581         .newClientTransport(
582             eq(addr1),
583             eq(createClientTransportOptions()),
584             isA(TransportLogger.class));
585     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
586     assertEquals(CONNECTING, internalSubchannel.getState());
587 
588     // Second address connecting
589     verify(mockTransportFactory)
590         .newClientTransport(
591             eq(addr2),
592             eq(createClientTransportOptions()),
593             isA(TransportLogger.class));
594     assertNoCallbackInvoke();
595     assertEquals(CONNECTING, internalSubchannel.getState());
596 
597     // Update addresses
598     internalSubchannel.updateAddresses(
599         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
600     assertNoCallbackInvoke();
601     assertEquals(CONNECTING, internalSubchannel.getState());
602     verify(transports.peek().transport, never()).shutdown(any(Status.class));
603     verify(transports.peek().transport, never()).shutdownNow(any(Status.class));
604 
605     // And new addresses chosen when re-connecting
606     transports.peek().listener.transportReady();
607     assertExactCallbackInvokes("onStateChange:READY");
608     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
609     assertExactCallbackInvokes("onStateChange:IDLE");
610 
611     assertNull(internalSubchannel.obtainActiveTransport());
612     assertEquals(0, fakeClock.numPendingTasks());
613     verify(mockTransportFactory, times(2))
614         .newClientTransport(
615             eq(addr2),
616             eq(createClientTransportOptions()),
617             isA(TransportLogger.class));
618     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
619     verify(mockTransportFactory)
620         .newClientTransport(
621             eq(addr3),
622             eq(createClientTransportOptions()),
623             isA(TransportLogger.class));
624     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
625     verifyNoMoreInteractions(mockTransportFactory);
626 
627     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
628   }
629 
updateAddresses_disjoint_idle()630   @Test public void updateAddresses_disjoint_idle() {
631     SocketAddress addr1 = mock(SocketAddress.class);
632     SocketAddress addr2 = mock(SocketAddress.class);
633 
634     createInternalSubchannel(addr1);
635     internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2)));
636 
637     // Nothing happened on address update
638     verify(mockTransportFactory, never())
639         .newClientTransport(
640             eq(addr1),
641             eq(createClientTransportOptions()),
642             isA(TransportLogger.class));
643     verify(mockTransportFactory, never())
644         .newClientTransport(
645             eq(addr2),
646             eq(createClientTransportOptions()),
647             isA(TransportLogger.class));
648     verifyNoMoreInteractions(mockTransportFactory);
649     assertNoCallbackInvoke();
650     assertEquals(IDLE, internalSubchannel.getState());
651 
652     // But new address chosen when connecting
653     assertNull(internalSubchannel.obtainActiveTransport());
654     assertExactCallbackInvokes("onStateChange:CONNECTING");
655     verify(mockTransportFactory)
656         .newClientTransport(
657             eq(addr2),
658             eq(createClientTransportOptions()),
659             isA(TransportLogger.class));
660 
661     // And no other addresses attempted
662     assertEquals(0, fakeClock.numPendingTasks());
663     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
664     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
665     assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
666     verifyNoMoreInteractions(mockTransportFactory);
667 
668     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
669   }
670 
updateAddresses_disjoint_ready()671   @Test public void updateAddresses_disjoint_ready() {
672     SocketAddress addr1 = mock(SocketAddress.class);
673     SocketAddress addr2 = mock(SocketAddress.class);
674     SocketAddress addr3 = mock(SocketAddress.class);
675     SocketAddress addr4 = mock(SocketAddress.class);
676     createInternalSubchannel(addr1, addr2);
677     assertEquals(IDLE, internalSubchannel.getState());
678 
679     // First address fails
680     assertNull(internalSubchannel.obtainActiveTransport());
681     assertExactCallbackInvokes("onStateChange:CONNECTING");
682     verify(mockTransportFactory)
683         .newClientTransport(
684             eq(addr1),
685             eq(createClientTransportOptions()),
686             isA(TransportLogger.class));
687     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
688     assertEquals(CONNECTING, internalSubchannel.getState());
689 
690     // Second address connects
691     verify(mockTransportFactory)
692         .newClientTransport(
693             eq(addr2),
694             eq(createClientTransportOptions()),
695             isA(TransportLogger.class));
696     transports.peek().listener.transportReady();
697     assertExactCallbackInvokes("onStateChange:READY");
698     assertEquals(READY, internalSubchannel.getState());
699 
700     // Update addresses
701     internalSubchannel.updateAddresses(
702         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))));
703     assertExactCallbackInvokes("onStateChange:IDLE");
704     assertEquals(IDLE, internalSubchannel.getState());
705     verify(transports.peek().transport, never()).shutdown(any(Status.class));
706     fakeClock.forwardNanos(
707         TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
708     verify(transports.peek().transport).shutdown(any(Status.class));
709 
710     // And new addresses chosen when re-connecting
711     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
712     assertNoCallbackInvoke();
713     assertEquals(IDLE, internalSubchannel.getState());
714 
715     assertNull(internalSubchannel.obtainActiveTransport());
716     assertEquals(0, fakeClock.numPendingTasks());
717     verify(mockTransportFactory)
718         .newClientTransport(
719             eq(addr3),
720             eq(createClientTransportOptions()),
721             isA(TransportLogger.class));
722     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
723     verify(mockTransportFactory)
724         .newClientTransport(
725             eq(addr4),
726             eq(createClientTransportOptions()),
727             isA(TransportLogger.class));
728     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
729     verifyNoMoreInteractions(mockTransportFactory);
730 
731     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
732   }
733 
updateAddresses_disjoint_connecting()734   @Test public void updateAddresses_disjoint_connecting() {
735     SocketAddress addr1 = mock(SocketAddress.class);
736     SocketAddress addr2 = mock(SocketAddress.class);
737     SocketAddress addr3 = mock(SocketAddress.class);
738     SocketAddress addr4 = mock(SocketAddress.class);
739     createInternalSubchannel(addr1, addr2);
740     assertEquals(IDLE, internalSubchannel.getState());
741 
742     // First address fails
743     assertNull(internalSubchannel.obtainActiveTransport());
744     assertExactCallbackInvokes("onStateChange:CONNECTING");
745     verify(mockTransportFactory)
746         .newClientTransport(
747             eq(addr1),
748             eq(createClientTransportOptions()),
749             isA(TransportLogger.class));
750     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
751     assertEquals(CONNECTING, internalSubchannel.getState());
752 
753     // Second address connecting
754     verify(mockTransportFactory)
755         .newClientTransport(
756             eq(addr2),
757             eq(createClientTransportOptions()),
758             isA(TransportLogger.class));
759     assertNoCallbackInvoke();
760     assertEquals(CONNECTING, internalSubchannel.getState());
761 
762     // Update addresses
763     internalSubchannel.updateAddresses(
764         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))));
765     assertNoCallbackInvoke();
766     assertEquals(CONNECTING, internalSubchannel.getState());
767 
768     // And new addresses chosen immediately
769     verify(transports.poll().transport).shutdown(any(Status.class));
770     assertNoCallbackInvoke();
771     assertEquals(CONNECTING, internalSubchannel.getState());
772 
773     assertNull(internalSubchannel.obtainActiveTransport());
774     assertEquals(0, fakeClock.numPendingTasks());
775     verify(mockTransportFactory)
776         .newClientTransport(
777             eq(addr3),
778             eq(createClientTransportOptions()),
779             isA(TransportLogger.class));
780     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
781     verify(mockTransportFactory)
782         .newClientTransport(
783             eq(addr4),
784             eq(createClientTransportOptions()),
785             isA(TransportLogger.class));
786     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
787     verifyNoMoreInteractions(mockTransportFactory);
788 
789     fakeClock.forwardNanos(10); // Drain retry, but don't care about result
790   }
791 
updateAddresses_disjoint_readyTwice()792   @Test public void updateAddresses_disjoint_readyTwice() {
793     SocketAddress addr1 = mock(SocketAddress.class);
794     createInternalSubchannel(addr1);
795     assertEquals(IDLE, internalSubchannel.getState());
796 
797     // Address connects
798     assertNull(internalSubchannel.obtainActiveTransport());
799     assertExactCallbackInvokes("onStateChange:CONNECTING");
800     verify(mockTransportFactory)
801         .newClientTransport(
802             eq(addr1),
803             eq(createClientTransportOptions()),
804             isA(TransportLogger.class));
805     transports.peek().listener.transportReady();
806     assertExactCallbackInvokes("onStateChange:READY");
807     assertEquals(READY, internalSubchannel.getState());
808 
809     // Update addresses
810     SocketAddress addr2 = mock(SocketAddress.class);
811     internalSubchannel.updateAddresses(
812         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2))));
813     assertExactCallbackInvokes("onStateChange:IDLE");
814     assertEquals(IDLE, internalSubchannel.getState());
815     ConnectionClientTransport firstTransport = transports.poll().transport;
816     verify(firstTransport, never()).shutdown(any(Status.class));
817 
818     // Address connects
819     assertNull(internalSubchannel.obtainActiveTransport());
820     assertExactCallbackInvokes("onStateChange:CONNECTING");
821     verify(mockTransportFactory)
822         .newClientTransport(
823             eq(addr2),
824             eq(createClientTransportOptions()),
825             isA(TransportLogger.class));
826     transports.peek().listener.transportReady();
827     assertExactCallbackInvokes("onStateChange:READY");
828     assertEquals(READY, internalSubchannel.getState());
829 
830     // Update addresses
831     SocketAddress addr3 = mock(SocketAddress.class);
832     internalSubchannel.updateAddresses(
833         Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3))));
834     assertExactCallbackInvokes("onStateChange:IDLE");
835     assertEquals(IDLE, internalSubchannel.getState());
836     // Earlier transport is shutdown eagerly
837     verify(firstTransport).shutdown(any(Status.class));
838     ConnectionClientTransport secondTransport = transports.peek().transport;
839     verify(secondTransport, never()).shutdown(any(Status.class));
840 
841     internalSubchannel.shutdown(SHUTDOWN_REASON);
842     verify(secondTransport).shutdown(any(Status.class));
843   }
844 
845   @Test
connectIsLazy()846   public void connectIsLazy() {
847     SocketAddress addr = mock(SocketAddress.class);
848     createInternalSubchannel(addr);
849 
850     // Invocation counters
851     int transportsCreated = 0;
852 
853     // Won't connect until requested
854     verify(mockTransportFactory, times(transportsCreated))
855         .newClientTransport(
856             eq(addr),
857             eq(createClientTransportOptions()),
858             isA(TransportLogger.class));
859 
860     // First attempt
861     internalSubchannel.obtainActiveTransport();
862     assertExactCallbackInvokes("onStateChange:CONNECTING");
863     verify(mockTransportFactory, times(++transportsCreated))
864         .newClientTransport(
865             eq(addr),
866             eq(createClientTransportOptions()),
867             isA(TransportLogger.class));
868 
869     // Fail this one
870     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
871     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
872 
873     // Will always reconnect after back-off
874     fakeClock.forwardNanos(10);
875     assertExactCallbackInvokes("onStateChange:CONNECTING");
876     verify(mockTransportFactory, times(++transportsCreated))
877         .newClientTransport(
878             eq(addr),
879             eq(createClientTransportOptions()),
880             isA(TransportLogger.class));
881 
882     // Make this one proceed
883     transports.peek().listener.transportReady();
884     assertExactCallbackInvokes("onStateChange:READY");
885     // Then go-away
886     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
887     assertExactCallbackInvokes("onStateChange:IDLE");
888 
889     // No scheduled tasks that would ever try to reconnect ...
890     assertEquals(0, fakeClock.numPendingTasks());
891     assertEquals(0, fakeExecutor.numPendingTasks());
892 
893     // ... until it's requested.
894     internalSubchannel.obtainActiveTransport();
895     assertExactCallbackInvokes("onStateChange:CONNECTING");
896     verify(mockTransportFactory, times(++transportsCreated))
897         .newClientTransport(
898             eq(addr),
899             eq(createClientTransportOptions()),
900             isA(TransportLogger.class));
901   }
902 
903   @Test
shutdownWhenReady()904   public void shutdownWhenReady() throws Exception {
905     SocketAddress addr = mock(SocketAddress.class);
906     createInternalSubchannel(addr);
907 
908     internalSubchannel.obtainActiveTransport();
909     MockClientTransportInfo transportInfo = transports.poll();
910     transportInfo.listener.transportReady();
911     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
912 
913     internalSubchannel.shutdown(SHUTDOWN_REASON);
914     verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
915     assertExactCallbackInvokes("onStateChange:SHUTDOWN");
916     transportInfo.listener.transportShutdown(SHUTDOWN_REASON);
917 
918     transportInfo.listener.transportTerminated();
919     assertExactCallbackInvokes("onTerminated");
920     verify(transportInfo.transport, never()).shutdownNow(any(Status.class));
921   }
922 
923   @Test
shutdownBeforeTransportCreated()924   public void shutdownBeforeTransportCreated() throws Exception {
925     SocketAddress addr = mock(SocketAddress.class);
926     createInternalSubchannel(addr);
927 
928     // First transport is created immediately
929     internalSubchannel.obtainActiveTransport();
930     assertExactCallbackInvokes("onStateChange:CONNECTING");
931     verify(mockTransportFactory)
932         .newClientTransport(
933             eq(addr),
934             eq(createClientTransportOptions()),
935             isA(TransportLogger.class));
936 
937     // Fail this one
938     MockClientTransportInfo transportInfo = transports.poll();
939     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
940     transportInfo.listener.transportTerminated();
941 
942     // Entering TRANSIENT_FAILURE, waiting for back-off
943     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
944 
945     // Save the reconnectTask before shutting down
946     FakeClock.ScheduledTask reconnectTask = null;
947     for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) {
948       if (task.command.toString().contains("EndOfCurrentBackoff")) {
949         assertNull("There shouldn't be more than one reconnectTask", reconnectTask);
950         assertFalse(task.isDone());
951         reconnectTask = task;
952       }
953     }
954     assertNotNull("There should be at least one reconnectTask", reconnectTask);
955 
956     // Shut down InternalSubchannel before the transport is created.
957     internalSubchannel.shutdown(SHUTDOWN_REASON);
958     assertTrue(reconnectTask.isCancelled());
959     // InternalSubchannel terminated promptly.
960     assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
961 
962     // Simulate a race between reconnectTask cancellation and execution -- the task runs anyway.
963     // This should not lead to the creation of a new transport.
964     reconnectTask.command.run();
965 
966     // Futher call to obtainActiveTransport() is no-op.
967     assertNull(internalSubchannel.obtainActiveTransport());
968     assertEquals(SHUTDOWN, internalSubchannel.getState());
969     assertNoCallbackInvoke();
970 
971     // No more transports will be created.
972     fakeClock.forwardNanos(10000);
973     assertEquals(SHUTDOWN, internalSubchannel.getState());
974     verifyNoMoreInteractions(mockTransportFactory);
975     assertEquals(0, transports.size());
976     assertNoCallbackInvoke();
977   }
978 
979   @Test
shutdownBeforeTransportReady()980   public void shutdownBeforeTransportReady() throws Exception {
981     SocketAddress addr = mock(SocketAddress.class);
982     createInternalSubchannel(addr);
983 
984     internalSubchannel.obtainActiveTransport();
985     assertExactCallbackInvokes("onStateChange:CONNECTING");
986     MockClientTransportInfo transportInfo = transports.poll();
987 
988     // Shutdown the InternalSubchannel before the pending transport is ready
989     assertNull(internalSubchannel.obtainActiveTransport());
990     internalSubchannel.shutdown(SHUTDOWN_REASON);
991     assertExactCallbackInvokes("onStateChange:SHUTDOWN");
992 
993     // The transport should've been shut down even though it's not the active transport yet.
994     verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON));
995     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
996     assertNoCallbackInvoke();
997     transportInfo.listener.transportTerminated();
998     assertExactCallbackInvokes("onTerminated");
999     assertEquals(SHUTDOWN, internalSubchannel.getState());
1000   }
1001 
1002   @Test
shutdownNow()1003   public void shutdownNow() throws Exception {
1004     SocketAddress addr = mock(SocketAddress.class);
1005     createInternalSubchannel(addr);
1006 
1007     internalSubchannel.obtainActiveTransport();
1008     MockClientTransportInfo t1 = transports.poll();
1009     t1.listener.transportReady();
1010     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
1011     t1.listener.transportShutdown(Status.UNAVAILABLE);
1012     assertExactCallbackInvokes("onStateChange:IDLE");
1013 
1014     internalSubchannel.obtainActiveTransport();
1015     assertExactCallbackInvokes("onStateChange:CONNECTING");
1016     MockClientTransportInfo t2 = transports.poll();
1017 
1018     Status status = Status.UNAVAILABLE.withDescription("Requested");
1019     internalSubchannel.shutdownNow(status);
1020 
1021     verify(t1.transport).shutdownNow(same(status));
1022     verify(t2.transport).shutdownNow(same(status));
1023     assertExactCallbackInvokes("onStateChange:SHUTDOWN");
1024   }
1025 
1026   @Test
obtainTransportAfterShutdown()1027   public void obtainTransportAfterShutdown() throws Exception {
1028     SocketAddress addr = mock(SocketAddress.class);
1029     createInternalSubchannel(addr);
1030 
1031     internalSubchannel.shutdown(SHUTDOWN_REASON);
1032     assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated");
1033     assertEquals(SHUTDOWN, internalSubchannel.getState());
1034     assertNull(internalSubchannel.obtainActiveTransport());
1035     verify(mockTransportFactory, times(0))
1036         .newClientTransport(
1037             addr,
1038             createClientTransportOptions(),
1039             internalSubchannel.getChannelLogger());
1040     assertNoCallbackInvoke();
1041     assertEquals(SHUTDOWN, internalSubchannel.getState());
1042   }
1043 
1044   @Test
logId()1045   public void logId() {
1046     createInternalSubchannel(mock(SocketAddress.class));
1047 
1048     assertNotNull(internalSubchannel.getLogId());
1049   }
1050 
1051   @Test
inUseState()1052   public void inUseState() {
1053     SocketAddress addr = mock(SocketAddress.class);
1054     createInternalSubchannel(addr);
1055 
1056     internalSubchannel.obtainActiveTransport();
1057     MockClientTransportInfo t0 = transports.poll();
1058     t0.listener.transportReady();
1059     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
1060     t0.listener.transportInUse(true);
1061     assertExactCallbackInvokes("onInUse");
1062 
1063     t0.listener.transportInUse(false);
1064     assertExactCallbackInvokes("onNotInUse");
1065 
1066     t0.listener.transportInUse(true);
1067     assertExactCallbackInvokes("onInUse");
1068     t0.listener.transportShutdown(Status.UNAVAILABLE);
1069     assertExactCallbackInvokes("onStateChange:IDLE");
1070 
1071     assertNull(internalSubchannel.obtainActiveTransport());
1072     MockClientTransportInfo t1 = transports.poll();
1073     t1.listener.transportReady();
1074     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
1075     t1.listener.transportInUse(true);
1076     // InternalSubchannel is already in-use, thus doesn't call the callback
1077     assertNoCallbackInvoke();
1078 
1079     t1.listener.transportInUse(false);
1080     // t0 is still in-use
1081     assertNoCallbackInvoke();
1082 
1083     t0.listener.transportInUse(false);
1084     assertExactCallbackInvokes("onNotInUse");
1085   }
1086 
1087   @Test
transportTerminateWithoutExitingInUse()1088   public void transportTerminateWithoutExitingInUse() {
1089     // An imperfect transport that terminates without going out of in-use. InternalSubchannel will
1090     // clear the in-use bit for it.
1091     SocketAddress addr = mock(SocketAddress.class);
1092     createInternalSubchannel(addr);
1093 
1094     internalSubchannel.obtainActiveTransport();
1095     MockClientTransportInfo t0 = transports.poll();
1096     t0.listener.transportReady();
1097     assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY");
1098     t0.listener.transportInUse(true);
1099     assertExactCallbackInvokes("onInUse");
1100 
1101     t0.listener.transportShutdown(Status.UNAVAILABLE);
1102     assertExactCallbackInvokes("onStateChange:IDLE");
1103     t0.listener.transportTerminated();
1104     assertExactCallbackInvokes("onNotInUse");
1105   }
1106 
1107   @Test
transportStartReturnsRunnable()1108   public void transportStartReturnsRunnable() {
1109     SocketAddress addr1 = mock(SocketAddress.class);
1110     SocketAddress addr2 = mock(SocketAddress.class);
1111     createInternalSubchannel(addr1, addr2);
1112     final AtomicInteger runnableInvokes = new AtomicInteger(0);
1113     Runnable startRunnable = new Runnable() {
1114         @Override
1115         public void run() {
1116           runnableInvokes.incrementAndGet();
1117         }
1118       };
1119     transports = TestUtils.captureTransports(mockTransportFactory, startRunnable);
1120 
1121     assertEquals(0, runnableInvokes.get());
1122     internalSubchannel.obtainActiveTransport();
1123     assertEquals(1, runnableInvokes.get());
1124     internalSubchannel.obtainActiveTransport();
1125     assertEquals(1, runnableInvokes.get());
1126 
1127     MockClientTransportInfo t0 = transports.poll();
1128     t0.listener.transportShutdown(Status.UNAVAILABLE);
1129     assertEquals(2, runnableInvokes.get());
1130 
1131     // 2nd address: reconnect immediatly
1132     MockClientTransportInfo t1 = transports.poll();
1133     t1.listener.transportShutdown(Status.UNAVAILABLE);
1134 
1135     // Addresses exhausted, waiting for back-off.
1136     assertEquals(2, runnableInvokes.get());
1137     // Run out the back-off period
1138     fakeClock.forwardNanos(10);
1139     assertEquals(3, runnableInvokes.get());
1140 
1141     // This test doesn't care about scheduled InternalSubchannel callbacks.  Clear it up so that
1142     // noMorePendingTasks() won't fail.
1143     fakeExecutor.runDueTasks();
1144     assertEquals(3, runnableInvokes.get());
1145   }
1146 
1147   @Test
resetConnectBackoff()1148   public void resetConnectBackoff() throws Exception {
1149     SocketAddress addr = mock(SocketAddress.class);
1150     createInternalSubchannel(addr);
1151 
1152     // Move into TRANSIENT_FAILURE to schedule reconnect
1153     internalSubchannel.obtainActiveTransport();
1154     assertExactCallbackInvokes("onStateChange:CONNECTING");
1155     verify(mockTransportFactory)
1156         .newClientTransport(
1157             eq(addr),
1158             eq(createClientTransportOptions()),
1159             isA(TransportLogger.class));
1160     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
1161     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
1162 
1163     // Save the reconnectTask
1164     FakeClock.ScheduledTask reconnectTask = null;
1165     for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) {
1166       if (task.command.toString().contains("EndOfCurrentBackoff")) {
1167         assertNull("There shouldn't be more than one reconnectTask", reconnectTask);
1168         assertFalse(task.isDone());
1169         reconnectTask = task;
1170       }
1171     }
1172     assertNotNull("There should be at least one reconnectTask", reconnectTask);
1173 
1174     internalSubchannel.resetConnectBackoff();
1175 
1176     verify(mockTransportFactory, times(2))
1177         .newClientTransport(
1178             eq(addr),
1179             eq(createClientTransportOptions()),
1180             isA(TransportLogger.class));
1181     assertExactCallbackInvokes("onStateChange:CONNECTING");
1182     assertTrue(reconnectTask.isCancelled());
1183 
1184     // Simulate a race between cancel and the task scheduler. Should be a no-op.
1185     reconnectTask.command.run();
1186     assertNoCallbackInvoke();
1187     verify(mockTransportFactory, times(2))
1188         .newClientTransport(
1189             eq(addr),
1190             eq(createClientTransportOptions()),
1191             isA(TransportLogger.class));
1192     verify(mockBackoffPolicyProvider, times(1)).get();
1193 
1194     // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after
1195     // invoking resetConnectBackoff()
1196     transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
1197     assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
1198     verify(mockBackoffPolicyProvider, times(2)).get();
1199     fakeClock.forwardNanos(10);
1200     assertExactCallbackInvokes("onStateChange:CONNECTING");
1201     assertEquals(CONNECTING, internalSubchannel.getState());
1202   }
1203 
1204   @Test
resetConnectBackoff_noopOnIdleTransport()1205   public void resetConnectBackoff_noopOnIdleTransport() throws Exception {
1206     SocketAddress addr = mock(SocketAddress.class);
1207     createInternalSubchannel(addr);
1208     assertEquals(IDLE, internalSubchannel.getState());
1209 
1210     internalSubchannel.resetConnectBackoff();
1211 
1212     assertNoCallbackInvoke();
1213   }
1214 
1215   @Test
channelzMembership()1216   public void channelzMembership() throws Exception {
1217     SocketAddress addr1 = mock(SocketAddress.class);
1218     createInternalSubchannel(addr1);
1219     internalSubchannel.obtainActiveTransport();
1220 
1221     MockClientTransportInfo t0 = transports.poll();
1222     t0.listener.transportReady();
1223     assertTrue(channelz.containsClientSocket(t0.transport.getLogId()));
1224     t0.listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
1225     t0.listener.transportTerminated();
1226     assertFalse(channelz.containsClientSocket(t0.transport.getLogId()));
1227   }
1228 
1229   @Test
channelzStatContainsTransport()1230   public void channelzStatContainsTransport() throws Exception {
1231     SocketAddress addr = new SocketAddress() {};
1232     assertThat(transports).isEmpty();
1233     createInternalSubchannel(addr);
1234     internalSubchannel.obtainActiveTransport();
1235 
1236     InternalWithLogId registeredTransport
1237         = Iterables.getOnlyElement(internalSubchannel.getStats().get().sockets);
1238     MockClientTransportInfo actualTransport = Iterables.getOnlyElement(transports);
1239     assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId());
1240   }
1241 
index_looping()1242   @Test public void index_looping() {
1243     Attributes.Key<String> key = Attributes.Key.create("some-key");
1244     Attributes attr1 = Attributes.newBuilder().set(key, "1").build();
1245     Attributes attr2 = Attributes.newBuilder().set(key, "2").build();
1246     Attributes attr3 = Attributes.newBuilder().set(key, "3").build();
1247     SocketAddress addr1 = new FakeSocketAddress();
1248     SocketAddress addr2 = new FakeSocketAddress();
1249     SocketAddress addr3 = new FakeSocketAddress();
1250     SocketAddress addr4 = new FakeSocketAddress();
1251     SocketAddress addr5 = new FakeSocketAddress();
1252     Index index = new Index(Arrays.asList(
1253         new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1),
1254         new EquivalentAddressGroup(Arrays.asList(addr3), attr2),
1255         new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)));
1256     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
1257     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
1258     assertThat(index.isAtBeginning()).isTrue();
1259     assertThat(index.isValid()).isTrue();
1260 
1261     index.increment();
1262     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
1263     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
1264     assertThat(index.isAtBeginning()).isFalse();
1265     assertThat(index.isValid()).isTrue();
1266 
1267     index.increment();
1268     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3);
1269     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr2);
1270     assertThat(index.isAtBeginning()).isFalse();
1271     assertThat(index.isValid()).isTrue();
1272 
1273     index.increment();
1274     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4);
1275     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3);
1276     assertThat(index.isAtBeginning()).isFalse();
1277     assertThat(index.isValid()).isTrue();
1278 
1279     index.increment();
1280     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr5);
1281     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3);
1282     assertThat(index.isAtBeginning()).isFalse();
1283     assertThat(index.isValid()).isTrue();
1284 
1285     index.increment();
1286     assertThat(index.isAtBeginning()).isFalse();
1287     assertThat(index.isValid()).isFalse();
1288 
1289     index.reset();
1290     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
1291     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
1292     assertThat(index.isAtBeginning()).isTrue();
1293     assertThat(index.isValid()).isTrue();
1294 
1295     // We want to make sure both groupIndex and addressIndex are reset
1296     index.increment();
1297     index.increment();
1298     index.increment();
1299     index.increment();
1300     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr5);
1301     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3);
1302     index.reset();
1303     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
1304     assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
1305   }
1306 
index_updateGroups_resets()1307   @Test public void index_updateGroups_resets() {
1308     SocketAddress addr1 = new FakeSocketAddress();
1309     SocketAddress addr2 = new FakeSocketAddress();
1310     SocketAddress addr3 = new FakeSocketAddress();
1311     Index index = new Index(Arrays.asList(
1312         new EquivalentAddressGroup(Arrays.asList(addr1)),
1313         new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
1314     index.increment();
1315     index.increment();
1316     // We want to make sure both groupIndex and addressIndex are reset
1317     index.updateGroups(Arrays.asList(
1318         new EquivalentAddressGroup(Arrays.asList(addr1)),
1319         new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
1320     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
1321   }
1322 
index_seekTo()1323   @Test public void index_seekTo() {
1324     SocketAddress addr1 = new FakeSocketAddress();
1325     SocketAddress addr2 = new FakeSocketAddress();
1326     SocketAddress addr3 = new FakeSocketAddress();
1327     Index index = new Index(Arrays.asList(
1328         new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
1329         new EquivalentAddressGroup(Arrays.asList(addr3))));
1330     assertThat(index.seekTo(addr3)).isTrue();
1331     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3);
1332     assertThat(index.seekTo(addr1)).isTrue();
1333     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
1334     assertThat(index.seekTo(addr2)).isTrue();
1335     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
1336     index.seekTo(new FakeSocketAddress());
1337     // Failed seekTo doesn't change the index
1338     assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
1339   }
1340 
1341   /** Create ClientTransportOptions. Should not be reused if it may be mutated. */
createClientTransportOptions()1342   private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() {
1343     return new ClientTransportFactory.ClientTransportOptions()
1344         .setAuthority(AUTHORITY)
1345         .setUserAgent(USER_AGENT);
1346   }
1347 
createInternalSubchannel(SocketAddress .... addrs)1348   private void createInternalSubchannel(SocketAddress ... addrs) {
1349     createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs)));
1350   }
1351 
createInternalSubchannel(EquivalentAddressGroup .... addrs)1352   private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
1353     List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs);
1354     InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
1355     ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
1356         fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
1357     internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT,
1358         mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
1359         fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback,
1360         channelz, CallTracer.getDefaultFactory().create(),
1361         subchannelTracer,
1362         logId,
1363         new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()));
1364   }
1365 
assertNoCallbackInvoke()1366   private void assertNoCallbackInvoke() {
1367     while (fakeExecutor.runDueTasks() > 0) {}
1368     assertEquals(0, callbackInvokes.size());
1369   }
1370 
assertExactCallbackInvokes(String .... expectedInvokes)1371   private void assertExactCallbackInvokes(String ... expectedInvokes) {
1372     assertEquals(Arrays.asList(expectedInvokes), callbackInvokes);
1373     callbackInvokes.clear();
1374   }
1375 
1376   private static class FakeSocketAddress extends SocketAddress {}
1377 }
1378