1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.ConnectivityState.CONNECTING; 21 import static io.grpc.ConnectivityState.IDLE; 22 import static io.grpc.ConnectivityState.READY; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 import static org.junit.Assert.assertEquals; 26 import static org.junit.Assert.assertFalse; 27 import static org.junit.Assert.assertNotNull; 28 import static org.junit.Assert.assertNull; 29 import static org.junit.Assert.assertSame; 30 import static org.junit.Assert.assertTrue; 31 import static org.mockito.ArgumentMatchers.any; 32 import static org.mockito.ArgumentMatchers.eq; 33 import static org.mockito.ArgumentMatchers.isA; 34 import static org.mockito.ArgumentMatchers.same; 35 import static org.mockito.Mockito.mock; 36 import static org.mockito.Mockito.never; 37 import static org.mockito.Mockito.times; 38 import static org.mockito.Mockito.verify; 39 import static org.mockito.Mockito.verifyNoMoreInteractions; 40 import static org.mockito.Mockito.when; 41 42 import com.google.common.collect.Iterables; 43 import io.grpc.Attributes; 44 import io.grpc.ConnectivityStateInfo; 45 import io.grpc.EquivalentAddressGroup; 46 import io.grpc.InternalChannelz; 47 import io.grpc.InternalLogId; 48 import io.grpc.InternalWithLogId; 49 import io.grpc.Status; 50 import io.grpc.SynchronizationContext; 51 import io.grpc.internal.InternalSubchannel.CallTracingTransport; 52 import io.grpc.internal.InternalSubchannel.Index; 53 import io.grpc.internal.InternalSubchannel.TransportLogger; 54 import io.grpc.internal.TestUtils.MockClientTransportInfo; 55 import java.net.SocketAddress; 56 import java.util.Arrays; 57 import java.util.LinkedList; 58 import java.util.List; 59 import java.util.concurrent.BlockingQueue; 60 import java.util.concurrent.TimeUnit; 61 import java.util.concurrent.atomic.AtomicInteger; 62 import org.junit.After; 63 import org.junit.Before; 64 import org.junit.Rule; 65 import org.junit.Test; 66 import org.junit.rules.ExpectedException; 67 import org.junit.runner.RunWith; 68 import org.junit.runners.JUnit4; 69 import org.mockito.Mock; 70 import org.mockito.junit.MockitoJUnit; 71 import org.mockito.junit.MockitoRule; 72 73 /** 74 * Unit tests for {@link InternalSubchannel}. 75 */ 76 @RunWith(JUnit4.class) 77 public class InternalSubchannelTest { 78 @Rule 79 public final MockitoRule mocks = MockitoJUnit.rule(); 80 @SuppressWarnings("deprecation") // https://github.com/grpc/grpc-java/issues/7467 81 @Rule 82 public final ExpectedException thrown = ExpectedException.none(); 83 84 private static final String AUTHORITY = "fakeauthority"; 85 private static final String USER_AGENT = "mosaic"; 86 private static final ConnectivityStateInfo UNAVAILABLE_STATE = 87 ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); 88 private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE = 89 ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED); 90 private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test"); 91 92 // For scheduled executor 93 private final FakeClock fakeClock = new FakeClock(); 94 // For syncContext 95 private final FakeClock fakeExecutor = new FakeClock(); 96 private final SynchronizationContext syncContext = new SynchronizationContext( 97 new Thread.UncaughtExceptionHandler() { 98 @Override 99 public void uncaughtException(Thread t, Throwable e) { 100 throw new AssertionError(e); 101 } 102 }); 103 104 private final InternalChannelz channelz = new InternalChannelz(); 105 106 @Mock private BackoffPolicy mockBackoffPolicy1; 107 @Mock private BackoffPolicy mockBackoffPolicy2; 108 @Mock private BackoffPolicy mockBackoffPolicy3; 109 @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; 110 @Mock private ClientTransportFactory mockTransportFactory; 111 112 private final LinkedList<String> callbackInvokes = new LinkedList<>(); 113 private final InternalSubchannel.Callback mockInternalSubchannelCallback = 114 new InternalSubchannel.Callback() { 115 @Override 116 protected void onTerminated(InternalSubchannel is) { 117 assertSame(internalSubchannel, is); 118 callbackInvokes.add("onTerminated"); 119 } 120 121 @Override 122 protected void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 123 assertSame(internalSubchannel, is); 124 callbackInvokes.add("onStateChange:" + newState); 125 } 126 127 @Override 128 protected void onInUse(InternalSubchannel is) { 129 assertSame(internalSubchannel, is); 130 callbackInvokes.add("onInUse"); 131 } 132 133 @Override 134 protected void onNotInUse(InternalSubchannel is) { 135 assertSame(internalSubchannel, is); 136 callbackInvokes.add("onNotInUse"); 137 } 138 }; 139 140 private InternalSubchannel internalSubchannel; 141 private BlockingQueue<MockClientTransportInfo> transports; 142 setUp()143 @Before public void setUp() { 144 when(mockBackoffPolicyProvider.get()) 145 .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3); 146 when(mockBackoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); 147 when(mockBackoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); 148 when(mockBackoffPolicy3.nextBackoffNanos()).thenReturn(10L, 100L); 149 transports = TestUtils.captureTransports(mockTransportFactory); 150 } 151 noMorePendingTasks()152 @After public void noMorePendingTasks() { 153 assertEquals(0, fakeClock.numPendingTasks()); 154 assertEquals(0, fakeExecutor.numPendingTasks()); 155 } 156 157 @Test(expected = IllegalArgumentException.class) constructor_emptyEagList_throws()158 public void constructor_emptyEagList_throws() { 159 createInternalSubchannel(new EquivalentAddressGroup[0]); 160 } 161 162 @Test(expected = NullPointerException.class) constructor_eagListWithNull_throws()163 public void constructor_eagListWithNull_throws() { 164 createInternalSubchannel(new EquivalentAddressGroup[] {null}); 165 } 166 eagAttribute_propagatesToTransport()167 @Test public void eagAttribute_propagatesToTransport() { 168 SocketAddress addr = new SocketAddress() {}; 169 Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build(); 170 createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr)); 171 172 // First attempt 173 assertNull(internalSubchannel.obtainActiveTransport()); 174 assertEquals(CONNECTING, internalSubchannel.getState()); 175 verify(mockTransportFactory).newClientTransport( 176 eq(addr), 177 eq(createClientTransportOptions().setEagAttributes(attr)), 178 isA(TransportLogger.class)); 179 } 180 eagAuthorityOverride_propagatesToTransport()181 @Test public void eagAuthorityOverride_propagatesToTransport() { 182 SocketAddress addr = new SocketAddress() {}; 183 String overriddenAuthority = "authority-override"; 184 Attributes attr = Attributes.newBuilder() 185 .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, overriddenAuthority).build(); 186 createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr)); 187 188 // First attempt 189 assertNull(internalSubchannel.obtainActiveTransport()); 190 assertEquals(CONNECTING, internalSubchannel.getState()); 191 verify(mockTransportFactory).newClientTransport( 192 eq(addr), 193 eq(createClientTransportOptions().setAuthority(overriddenAuthority).setEagAttributes(attr)), 194 isA(TransportLogger.class)); 195 } 196 singleAddressReconnect()197 @Test public void singleAddressReconnect() { 198 SocketAddress addr = mock(SocketAddress.class); 199 createInternalSubchannel(addr); 200 assertEquals(IDLE, internalSubchannel.getState()); 201 202 // Invocation counters 203 int transportsCreated = 0; 204 int backoff1Consulted = 0; 205 int backoff2Consulted = 0; 206 int backoffReset = 0; 207 208 // First attempt 209 assertEquals(IDLE, internalSubchannel.getState()); 210 assertNoCallbackInvoke(); 211 assertNull(internalSubchannel.obtainActiveTransport()); 212 assertExactCallbackInvokes("onStateChange:CONNECTING"); 213 assertEquals(CONNECTING, internalSubchannel.getState()); 214 verify(mockTransportFactory, times(++transportsCreated)) 215 .newClientTransport( 216 eq(addr), 217 eq(createClientTransportOptions()), 218 isA(TransportLogger.class)); 219 220 // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE. 221 assertNoCallbackInvoke(); 222 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 223 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 224 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 225 // Backoff reset and using first back-off value interval 226 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 227 verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); 228 229 // Second attempt 230 // Transport creation doesn't happen until time is due 231 fakeClock.forwardNanos(9); 232 assertNull(internalSubchannel.obtainActiveTransport()); 233 verify(mockTransportFactory, times(transportsCreated)) 234 .newClientTransport( 235 eq(addr), 236 eq(createClientTransportOptions()), 237 isA(TransportLogger.class)); 238 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 239 240 assertNoCallbackInvoke(); 241 fakeClock.forwardNanos(1); 242 assertExactCallbackInvokes("onStateChange:CONNECTING"); 243 assertEquals(CONNECTING, internalSubchannel.getState()); 244 verify(mockTransportFactory, times(++transportsCreated)) 245 .newClientTransport( 246 eq(addr), 247 eq(createClientTransportOptions()), 248 isA(TransportLogger.class)); 249 // Fail this one too 250 assertNoCallbackInvoke(); 251 // Here we use a different status from the first failure, and verify that it's passed to 252 // the callback. 253 transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); 254 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 255 assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); 256 // Second back-off interval 257 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 258 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 259 260 // Third attempt 261 // Transport creation doesn't happen until time is due 262 fakeClock.forwardNanos(99); 263 assertNull(internalSubchannel.obtainActiveTransport()); 264 verify(mockTransportFactory, times(transportsCreated)) 265 .newClientTransport( 266 eq(addr), 267 eq(createClientTransportOptions()), 268 isA(TransportLogger.class)); 269 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 270 assertNoCallbackInvoke(); 271 fakeClock.forwardNanos(1); 272 assertEquals(CONNECTING, internalSubchannel.getState()); 273 assertExactCallbackInvokes("onStateChange:CONNECTING"); 274 assertNull(internalSubchannel.obtainActiveTransport()); 275 verify(mockTransportFactory, times(++transportsCreated)) 276 .newClientTransport( 277 eq(addr), 278 eq(createClientTransportOptions()), 279 isA(TransportLogger.class)); 280 // Let this one succeed, will enter READY state. 281 assertNoCallbackInvoke(); 282 transports.peek().listener.transportReady(); 283 assertExactCallbackInvokes("onStateChange:READY"); 284 assertEquals(READY, internalSubchannel.getState()); 285 assertSame( 286 transports.peek().transport, 287 ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); 288 289 // Close the READY transport, will enter IDLE state. 290 assertNoCallbackInvoke(); 291 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 292 assertEquals(IDLE, internalSubchannel.getState()); 293 assertExactCallbackInvokes("onStateChange:IDLE"); 294 295 // Back-off is reset, and the next attempt will happen immediately 296 assertNull(internalSubchannel.obtainActiveTransport()); 297 assertEquals(CONNECTING, internalSubchannel.getState()); 298 assertExactCallbackInvokes("onStateChange:CONNECTING"); 299 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 300 verify(mockTransportFactory, times(++transportsCreated)) 301 .newClientTransport( 302 eq(addr), 303 eq(createClientTransportOptions()), 304 isA(TransportLogger.class)); 305 306 // Final checks for consultations on back-off policies 307 verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); 308 verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos(); 309 } 310 twoAddressesReconnect()311 @Test public void twoAddressesReconnect() { 312 SocketAddress addr1 = mock(SocketAddress.class); 313 SocketAddress addr2 = mock(SocketAddress.class); 314 createInternalSubchannel(addr1, addr2); 315 assertEquals(IDLE, internalSubchannel.getState()); 316 // Invocation counters 317 int transportsAddr1 = 0; 318 int transportsAddr2 = 0; 319 int backoff1Consulted = 0; 320 int backoff2Consulted = 0; 321 int backoff3Consulted = 0; 322 int backoffReset = 0; 323 324 // First attempt 325 assertNoCallbackInvoke(); 326 assertNull(internalSubchannel.obtainActiveTransport()); 327 assertExactCallbackInvokes("onStateChange:CONNECTING"); 328 assertEquals(CONNECTING, internalSubchannel.getState()); 329 verify(mockTransportFactory, times(++transportsAddr1)) 330 .newClientTransport( 331 eq(addr1), 332 eq(createClientTransportOptions()), 333 isA(TransportLogger.class)); 334 335 // Let this one fail without success 336 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 337 // Still in CONNECTING 338 assertNull(internalSubchannel.obtainActiveTransport()); 339 assertNoCallbackInvoke(); 340 assertEquals(CONNECTING, internalSubchannel.getState()); 341 342 // Second attempt will start immediately. Still no back-off policy. 343 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 344 verify(mockTransportFactory, times(++transportsAddr2)) 345 .newClientTransport( 346 eq(addr2), 347 eq(createClientTransportOptions()), 348 isA(TransportLogger.class)); 349 assertNull(internalSubchannel.obtainActiveTransport()); 350 // Fail this one too 351 assertNoCallbackInvoke(); 352 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 353 // All addresses have failed. Delayed transport will be in back-off interval. 354 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 355 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 356 // Backoff reset and first back-off interval begins 357 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 358 verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); 359 360 // No reconnect during TRANSIENT_FAILURE even when requested. 361 assertNull(internalSubchannel.obtainActiveTransport()); 362 assertNoCallbackInvoke(); 363 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 364 365 // Third attempt is the first address, thus controlled by the first back-off interval. 366 fakeClock.forwardNanos(9); 367 verify(mockTransportFactory, times(transportsAddr1)) 368 .newClientTransport( 369 eq(addr1), 370 eq(createClientTransportOptions()), 371 isA(TransportLogger.class)); 372 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 373 assertNoCallbackInvoke(); 374 fakeClock.forwardNanos(1); 375 assertExactCallbackInvokes("onStateChange:CONNECTING"); 376 assertEquals(CONNECTING, internalSubchannel.getState()); 377 verify(mockTransportFactory, times(++transportsAddr1)) 378 .newClientTransport( 379 eq(addr1), 380 eq(createClientTransportOptions()), 381 isA(TransportLogger.class)); 382 // Fail this one too 383 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 384 assertEquals(CONNECTING, internalSubchannel.getState()); 385 386 // Forth attempt will start immediately. Keep back-off policy. 387 assertNull(internalSubchannel.obtainActiveTransport()); 388 assertEquals(CONNECTING, internalSubchannel.getState()); 389 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 390 verify(mockTransportFactory, times(++transportsAddr2)) 391 .newClientTransport( 392 eq(addr2), 393 eq(createClientTransportOptions()), 394 isA(TransportLogger.class)); 395 // Fail this one too 396 assertNoCallbackInvoke(); 397 transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); 398 // All addresses have failed again. Delayed transport will be in back-off interval. 399 assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); 400 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 401 // Second back-off interval begins 402 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 403 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 404 405 // Fifth attempt for the first address, thus controlled by the second back-off interval. 406 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 407 fakeClock.forwardNanos(99); 408 verify(mockTransportFactory, times(transportsAddr1)) 409 .newClientTransport( 410 eq(addr1), 411 eq(createClientTransportOptions()), 412 isA(TransportLogger.class)); 413 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 414 assertNoCallbackInvoke(); 415 fakeClock.forwardNanos(1); 416 assertExactCallbackInvokes("onStateChange:CONNECTING"); 417 assertEquals(CONNECTING, internalSubchannel.getState()); 418 verify(mockTransportFactory, times(++transportsAddr1)) 419 .newClientTransport( 420 eq(addr1), 421 eq(createClientTransportOptions()), 422 isA(TransportLogger.class)); 423 // Let it through 424 assertNoCallbackInvoke(); 425 transports.peek().listener.transportReady(); 426 assertExactCallbackInvokes("onStateChange:READY"); 427 assertEquals(READY, internalSubchannel.getState()); 428 429 assertSame( 430 transports.peek().transport, 431 ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); 432 // Then close it. 433 assertNoCallbackInvoke(); 434 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 435 assertExactCallbackInvokes("onStateChange:IDLE"); 436 assertEquals(IDLE, internalSubchannel.getState()); 437 438 // First attempt after a successful connection. Old back-off policy should be ignored, but there 439 // is not yet a need for a new one. Start from the first address. 440 assertNull(internalSubchannel.obtainActiveTransport()); 441 assertEquals(CONNECTING, internalSubchannel.getState()); 442 assertExactCallbackInvokes("onStateChange:CONNECTING"); 443 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 444 verify(mockTransportFactory, times(++transportsAddr1)) 445 .newClientTransport( 446 eq(addr1), 447 eq(createClientTransportOptions()), 448 isA(TransportLogger.class)); 449 // Fail the transport 450 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 451 assertEquals(CONNECTING, internalSubchannel.getState()); 452 453 // Second attempt will start immediately. Still no new back-off policy. 454 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 455 verify(mockTransportFactory, times(++transportsAddr2)) 456 .newClientTransport( 457 eq(addr2), 458 eq(createClientTransportOptions()), 459 isA(TransportLogger.class)); 460 // Fail this one too 461 assertEquals(CONNECTING, internalSubchannel.getState()); 462 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 463 // All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect. 464 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 465 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 466 // Back-off reset and first back-off interval begins 467 verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffNanos(); 468 verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); 469 470 // Third attempt is the first address, thus controlled by the first back-off interval. 471 fakeClock.forwardNanos(9); 472 verify(mockTransportFactory, times(transportsAddr1)) 473 .newClientTransport( 474 eq(addr1), 475 eq(createClientTransportOptions()), 476 isA(TransportLogger.class)); 477 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 478 assertNoCallbackInvoke(); 479 fakeClock.forwardNanos(1); 480 assertExactCallbackInvokes("onStateChange:CONNECTING"); 481 assertEquals(CONNECTING, internalSubchannel.getState()); 482 verify(mockTransportFactory, times(++transportsAddr1)) 483 .newClientTransport( 484 eq(addr1), 485 eq(createClientTransportOptions()), 486 isA(TransportLogger.class)); 487 488 // Final checks on invocations on back-off policies 489 verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); 490 verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos(); 491 verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos(); 492 } 493 494 @Test updateAddresses_emptyEagList_throws()495 public void updateAddresses_emptyEagList_throws() { 496 SocketAddress addr = new FakeSocketAddress(); 497 createInternalSubchannel(addr); 498 thrown.expect(IllegalArgumentException.class); 499 internalSubchannel.updateAddresses(Arrays.<EquivalentAddressGroup>asList()); 500 } 501 502 @Test updateAddresses_eagListWithNull_throws()503 public void updateAddresses_eagListWithNull_throws() { 504 SocketAddress addr = new FakeSocketAddress(); 505 createInternalSubchannel(addr); 506 List<EquivalentAddressGroup> eags = Arrays.asList((EquivalentAddressGroup) null); 507 thrown.expect(NullPointerException.class); 508 internalSubchannel.updateAddresses(eags); 509 } 510 updateAddresses_intersecting_ready()511 @Test public void updateAddresses_intersecting_ready() { 512 SocketAddress addr1 = mock(SocketAddress.class); 513 SocketAddress addr2 = mock(SocketAddress.class); 514 SocketAddress addr3 = mock(SocketAddress.class); 515 createInternalSubchannel(addr1, addr2); 516 assertEquals(IDLE, internalSubchannel.getState()); 517 518 // First address fails 519 assertNull(internalSubchannel.obtainActiveTransport()); 520 assertExactCallbackInvokes("onStateChange:CONNECTING"); 521 verify(mockTransportFactory) 522 .newClientTransport( 523 eq(addr1), 524 eq(createClientTransportOptions()), 525 isA(TransportLogger.class)); 526 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 527 assertEquals(CONNECTING, internalSubchannel.getState()); 528 529 // Second address connects 530 verify(mockTransportFactory) 531 .newClientTransport( 532 eq(addr2), 533 eq(createClientTransportOptions()), 534 isA(TransportLogger.class)); 535 transports.peek().listener.transportReady(); 536 assertExactCallbackInvokes("onStateChange:READY"); 537 assertEquals(READY, internalSubchannel.getState()); 538 539 // Update addresses 540 internalSubchannel.updateAddresses( 541 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 542 assertNoCallbackInvoke(); 543 assertEquals(READY, internalSubchannel.getState()); 544 verify(transports.peek().transport, never()).shutdown(any(Status.class)); 545 verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); 546 547 // And new addresses chosen when re-connecting 548 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 549 assertExactCallbackInvokes("onStateChange:IDLE"); 550 551 assertNull(internalSubchannel.obtainActiveTransport()); 552 assertEquals(0, fakeClock.numPendingTasks()); 553 verify(mockTransportFactory, times(2)) 554 .newClientTransport( 555 eq(addr2), 556 eq(createClientTransportOptions()), 557 isA(TransportLogger.class)); 558 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 559 verify(mockTransportFactory) 560 .newClientTransport( 561 eq(addr3), 562 eq(createClientTransportOptions()), 563 isA(TransportLogger.class)); 564 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 565 verifyNoMoreInteractions(mockTransportFactory); 566 567 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 568 } 569 updateAddresses_intersecting_connecting()570 @Test public void updateAddresses_intersecting_connecting() { 571 SocketAddress addr1 = mock(SocketAddress.class); 572 SocketAddress addr2 = mock(SocketAddress.class); 573 SocketAddress addr3 = mock(SocketAddress.class); 574 createInternalSubchannel(addr1, addr2); 575 assertEquals(IDLE, internalSubchannel.getState()); 576 577 // First address fails 578 assertNull(internalSubchannel.obtainActiveTransport()); 579 assertExactCallbackInvokes("onStateChange:CONNECTING"); 580 verify(mockTransportFactory) 581 .newClientTransport( 582 eq(addr1), 583 eq(createClientTransportOptions()), 584 isA(TransportLogger.class)); 585 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 586 assertEquals(CONNECTING, internalSubchannel.getState()); 587 588 // Second address connecting 589 verify(mockTransportFactory) 590 .newClientTransport( 591 eq(addr2), 592 eq(createClientTransportOptions()), 593 isA(TransportLogger.class)); 594 assertNoCallbackInvoke(); 595 assertEquals(CONNECTING, internalSubchannel.getState()); 596 597 // Update addresses 598 internalSubchannel.updateAddresses( 599 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 600 assertNoCallbackInvoke(); 601 assertEquals(CONNECTING, internalSubchannel.getState()); 602 verify(transports.peek().transport, never()).shutdown(any(Status.class)); 603 verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); 604 605 // And new addresses chosen when re-connecting 606 transports.peek().listener.transportReady(); 607 assertExactCallbackInvokes("onStateChange:READY"); 608 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 609 assertExactCallbackInvokes("onStateChange:IDLE"); 610 611 assertNull(internalSubchannel.obtainActiveTransport()); 612 assertEquals(0, fakeClock.numPendingTasks()); 613 verify(mockTransportFactory, times(2)) 614 .newClientTransport( 615 eq(addr2), 616 eq(createClientTransportOptions()), 617 isA(TransportLogger.class)); 618 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 619 verify(mockTransportFactory) 620 .newClientTransport( 621 eq(addr3), 622 eq(createClientTransportOptions()), 623 isA(TransportLogger.class)); 624 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 625 verifyNoMoreInteractions(mockTransportFactory); 626 627 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 628 } 629 updateAddresses_disjoint_idle()630 @Test public void updateAddresses_disjoint_idle() { 631 SocketAddress addr1 = mock(SocketAddress.class); 632 SocketAddress addr2 = mock(SocketAddress.class); 633 634 createInternalSubchannel(addr1); 635 internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2))); 636 637 // Nothing happened on address update 638 verify(mockTransportFactory, never()) 639 .newClientTransport( 640 eq(addr1), 641 eq(createClientTransportOptions()), 642 isA(TransportLogger.class)); 643 verify(mockTransportFactory, never()) 644 .newClientTransport( 645 eq(addr2), 646 eq(createClientTransportOptions()), 647 isA(TransportLogger.class)); 648 verifyNoMoreInteractions(mockTransportFactory); 649 assertNoCallbackInvoke(); 650 assertEquals(IDLE, internalSubchannel.getState()); 651 652 // But new address chosen when connecting 653 assertNull(internalSubchannel.obtainActiveTransport()); 654 assertExactCallbackInvokes("onStateChange:CONNECTING"); 655 verify(mockTransportFactory) 656 .newClientTransport( 657 eq(addr2), 658 eq(createClientTransportOptions()), 659 isA(TransportLogger.class)); 660 661 // And no other addresses attempted 662 assertEquals(0, fakeClock.numPendingTasks()); 663 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 664 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 665 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 666 verifyNoMoreInteractions(mockTransportFactory); 667 668 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 669 } 670 updateAddresses_disjoint_ready()671 @Test public void updateAddresses_disjoint_ready() { 672 SocketAddress addr1 = mock(SocketAddress.class); 673 SocketAddress addr2 = mock(SocketAddress.class); 674 SocketAddress addr3 = mock(SocketAddress.class); 675 SocketAddress addr4 = mock(SocketAddress.class); 676 createInternalSubchannel(addr1, addr2); 677 assertEquals(IDLE, internalSubchannel.getState()); 678 679 // First address fails 680 assertNull(internalSubchannel.obtainActiveTransport()); 681 assertExactCallbackInvokes("onStateChange:CONNECTING"); 682 verify(mockTransportFactory) 683 .newClientTransport( 684 eq(addr1), 685 eq(createClientTransportOptions()), 686 isA(TransportLogger.class)); 687 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 688 assertEquals(CONNECTING, internalSubchannel.getState()); 689 690 // Second address connects 691 verify(mockTransportFactory) 692 .newClientTransport( 693 eq(addr2), 694 eq(createClientTransportOptions()), 695 isA(TransportLogger.class)); 696 transports.peek().listener.transportReady(); 697 assertExactCallbackInvokes("onStateChange:READY"); 698 assertEquals(READY, internalSubchannel.getState()); 699 700 // Update addresses 701 internalSubchannel.updateAddresses( 702 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); 703 assertExactCallbackInvokes("onStateChange:IDLE"); 704 assertEquals(IDLE, internalSubchannel.getState()); 705 verify(transports.peek().transport, never()).shutdown(any(Status.class)); 706 fakeClock.forwardNanos( 707 TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); 708 verify(transports.peek().transport).shutdown(any(Status.class)); 709 710 // And new addresses chosen when re-connecting 711 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 712 assertNoCallbackInvoke(); 713 assertEquals(IDLE, internalSubchannel.getState()); 714 715 assertNull(internalSubchannel.obtainActiveTransport()); 716 assertEquals(0, fakeClock.numPendingTasks()); 717 verify(mockTransportFactory) 718 .newClientTransport( 719 eq(addr3), 720 eq(createClientTransportOptions()), 721 isA(TransportLogger.class)); 722 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 723 verify(mockTransportFactory) 724 .newClientTransport( 725 eq(addr4), 726 eq(createClientTransportOptions()), 727 isA(TransportLogger.class)); 728 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 729 verifyNoMoreInteractions(mockTransportFactory); 730 731 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 732 } 733 updateAddresses_disjoint_connecting()734 @Test public void updateAddresses_disjoint_connecting() { 735 SocketAddress addr1 = mock(SocketAddress.class); 736 SocketAddress addr2 = mock(SocketAddress.class); 737 SocketAddress addr3 = mock(SocketAddress.class); 738 SocketAddress addr4 = mock(SocketAddress.class); 739 createInternalSubchannel(addr1, addr2); 740 assertEquals(IDLE, internalSubchannel.getState()); 741 742 // First address fails 743 assertNull(internalSubchannel.obtainActiveTransport()); 744 assertExactCallbackInvokes("onStateChange:CONNECTING"); 745 verify(mockTransportFactory) 746 .newClientTransport( 747 eq(addr1), 748 eq(createClientTransportOptions()), 749 isA(TransportLogger.class)); 750 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 751 assertEquals(CONNECTING, internalSubchannel.getState()); 752 753 // Second address connecting 754 verify(mockTransportFactory) 755 .newClientTransport( 756 eq(addr2), 757 eq(createClientTransportOptions()), 758 isA(TransportLogger.class)); 759 assertNoCallbackInvoke(); 760 assertEquals(CONNECTING, internalSubchannel.getState()); 761 762 // Update addresses 763 internalSubchannel.updateAddresses( 764 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); 765 assertNoCallbackInvoke(); 766 assertEquals(CONNECTING, internalSubchannel.getState()); 767 768 // And new addresses chosen immediately 769 verify(transports.poll().transport).shutdown(any(Status.class)); 770 assertNoCallbackInvoke(); 771 assertEquals(CONNECTING, internalSubchannel.getState()); 772 773 assertNull(internalSubchannel.obtainActiveTransport()); 774 assertEquals(0, fakeClock.numPendingTasks()); 775 verify(mockTransportFactory) 776 .newClientTransport( 777 eq(addr3), 778 eq(createClientTransportOptions()), 779 isA(TransportLogger.class)); 780 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 781 verify(mockTransportFactory) 782 .newClientTransport( 783 eq(addr4), 784 eq(createClientTransportOptions()), 785 isA(TransportLogger.class)); 786 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 787 verifyNoMoreInteractions(mockTransportFactory); 788 789 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 790 } 791 updateAddresses_disjoint_readyTwice()792 @Test public void updateAddresses_disjoint_readyTwice() { 793 SocketAddress addr1 = mock(SocketAddress.class); 794 createInternalSubchannel(addr1); 795 assertEquals(IDLE, internalSubchannel.getState()); 796 797 // Address connects 798 assertNull(internalSubchannel.obtainActiveTransport()); 799 assertExactCallbackInvokes("onStateChange:CONNECTING"); 800 verify(mockTransportFactory) 801 .newClientTransport( 802 eq(addr1), 803 eq(createClientTransportOptions()), 804 isA(TransportLogger.class)); 805 transports.peek().listener.transportReady(); 806 assertExactCallbackInvokes("onStateChange:READY"); 807 assertEquals(READY, internalSubchannel.getState()); 808 809 // Update addresses 810 SocketAddress addr2 = mock(SocketAddress.class); 811 internalSubchannel.updateAddresses( 812 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2)))); 813 assertExactCallbackInvokes("onStateChange:IDLE"); 814 assertEquals(IDLE, internalSubchannel.getState()); 815 ConnectionClientTransport firstTransport = transports.poll().transport; 816 verify(firstTransport, never()).shutdown(any(Status.class)); 817 818 // Address connects 819 assertNull(internalSubchannel.obtainActiveTransport()); 820 assertExactCallbackInvokes("onStateChange:CONNECTING"); 821 verify(mockTransportFactory) 822 .newClientTransport( 823 eq(addr2), 824 eq(createClientTransportOptions()), 825 isA(TransportLogger.class)); 826 transports.peek().listener.transportReady(); 827 assertExactCallbackInvokes("onStateChange:READY"); 828 assertEquals(READY, internalSubchannel.getState()); 829 830 // Update addresses 831 SocketAddress addr3 = mock(SocketAddress.class); 832 internalSubchannel.updateAddresses( 833 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3)))); 834 assertExactCallbackInvokes("onStateChange:IDLE"); 835 assertEquals(IDLE, internalSubchannel.getState()); 836 // Earlier transport is shutdown eagerly 837 verify(firstTransport).shutdown(any(Status.class)); 838 ConnectionClientTransport secondTransport = transports.peek().transport; 839 verify(secondTransport, never()).shutdown(any(Status.class)); 840 841 internalSubchannel.shutdown(SHUTDOWN_REASON); 842 verify(secondTransport).shutdown(any(Status.class)); 843 } 844 845 @Test connectIsLazy()846 public void connectIsLazy() { 847 SocketAddress addr = mock(SocketAddress.class); 848 createInternalSubchannel(addr); 849 850 // Invocation counters 851 int transportsCreated = 0; 852 853 // Won't connect until requested 854 verify(mockTransportFactory, times(transportsCreated)) 855 .newClientTransport( 856 eq(addr), 857 eq(createClientTransportOptions()), 858 isA(TransportLogger.class)); 859 860 // First attempt 861 internalSubchannel.obtainActiveTransport(); 862 assertExactCallbackInvokes("onStateChange:CONNECTING"); 863 verify(mockTransportFactory, times(++transportsCreated)) 864 .newClientTransport( 865 eq(addr), 866 eq(createClientTransportOptions()), 867 isA(TransportLogger.class)); 868 869 // Fail this one 870 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 871 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 872 873 // Will always reconnect after back-off 874 fakeClock.forwardNanos(10); 875 assertExactCallbackInvokes("onStateChange:CONNECTING"); 876 verify(mockTransportFactory, times(++transportsCreated)) 877 .newClientTransport( 878 eq(addr), 879 eq(createClientTransportOptions()), 880 isA(TransportLogger.class)); 881 882 // Make this one proceed 883 transports.peek().listener.transportReady(); 884 assertExactCallbackInvokes("onStateChange:READY"); 885 // Then go-away 886 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 887 assertExactCallbackInvokes("onStateChange:IDLE"); 888 889 // No scheduled tasks that would ever try to reconnect ... 890 assertEquals(0, fakeClock.numPendingTasks()); 891 assertEquals(0, fakeExecutor.numPendingTasks()); 892 893 // ... until it's requested. 894 internalSubchannel.obtainActiveTransport(); 895 assertExactCallbackInvokes("onStateChange:CONNECTING"); 896 verify(mockTransportFactory, times(++transportsCreated)) 897 .newClientTransport( 898 eq(addr), 899 eq(createClientTransportOptions()), 900 isA(TransportLogger.class)); 901 } 902 903 @Test shutdownWhenReady()904 public void shutdownWhenReady() throws Exception { 905 SocketAddress addr = mock(SocketAddress.class); 906 createInternalSubchannel(addr); 907 908 internalSubchannel.obtainActiveTransport(); 909 MockClientTransportInfo transportInfo = transports.poll(); 910 transportInfo.listener.transportReady(); 911 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 912 913 internalSubchannel.shutdown(SHUTDOWN_REASON); 914 verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); 915 assertExactCallbackInvokes("onStateChange:SHUTDOWN"); 916 transportInfo.listener.transportShutdown(SHUTDOWN_REASON); 917 918 transportInfo.listener.transportTerminated(); 919 assertExactCallbackInvokes("onTerminated"); 920 verify(transportInfo.transport, never()).shutdownNow(any(Status.class)); 921 } 922 923 @Test shutdownBeforeTransportCreated()924 public void shutdownBeforeTransportCreated() throws Exception { 925 SocketAddress addr = mock(SocketAddress.class); 926 createInternalSubchannel(addr); 927 928 // First transport is created immediately 929 internalSubchannel.obtainActiveTransport(); 930 assertExactCallbackInvokes("onStateChange:CONNECTING"); 931 verify(mockTransportFactory) 932 .newClientTransport( 933 eq(addr), 934 eq(createClientTransportOptions()), 935 isA(TransportLogger.class)); 936 937 // Fail this one 938 MockClientTransportInfo transportInfo = transports.poll(); 939 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 940 transportInfo.listener.transportTerminated(); 941 942 // Entering TRANSIENT_FAILURE, waiting for back-off 943 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 944 945 // Save the reconnectTask before shutting down 946 FakeClock.ScheduledTask reconnectTask = null; 947 for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) { 948 if (task.command.toString().contains("EndOfCurrentBackoff")) { 949 assertNull("There shouldn't be more than one reconnectTask", reconnectTask); 950 assertFalse(task.isDone()); 951 reconnectTask = task; 952 } 953 } 954 assertNotNull("There should be at least one reconnectTask", reconnectTask); 955 956 // Shut down InternalSubchannel before the transport is created. 957 internalSubchannel.shutdown(SHUTDOWN_REASON); 958 assertTrue(reconnectTask.isCancelled()); 959 // InternalSubchannel terminated promptly. 960 assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); 961 962 // Simulate a race between reconnectTask cancellation and execution -- the task runs anyway. 963 // This should not lead to the creation of a new transport. 964 reconnectTask.command.run(); 965 966 // Futher call to obtainActiveTransport() is no-op. 967 assertNull(internalSubchannel.obtainActiveTransport()); 968 assertEquals(SHUTDOWN, internalSubchannel.getState()); 969 assertNoCallbackInvoke(); 970 971 // No more transports will be created. 972 fakeClock.forwardNanos(10000); 973 assertEquals(SHUTDOWN, internalSubchannel.getState()); 974 verifyNoMoreInteractions(mockTransportFactory); 975 assertEquals(0, transports.size()); 976 assertNoCallbackInvoke(); 977 } 978 979 @Test shutdownBeforeTransportReady()980 public void shutdownBeforeTransportReady() throws Exception { 981 SocketAddress addr = mock(SocketAddress.class); 982 createInternalSubchannel(addr); 983 984 internalSubchannel.obtainActiveTransport(); 985 assertExactCallbackInvokes("onStateChange:CONNECTING"); 986 MockClientTransportInfo transportInfo = transports.poll(); 987 988 // Shutdown the InternalSubchannel before the pending transport is ready 989 assertNull(internalSubchannel.obtainActiveTransport()); 990 internalSubchannel.shutdown(SHUTDOWN_REASON); 991 assertExactCallbackInvokes("onStateChange:SHUTDOWN"); 992 993 // The transport should've been shut down even though it's not the active transport yet. 994 verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); 995 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 996 assertNoCallbackInvoke(); 997 transportInfo.listener.transportTerminated(); 998 assertExactCallbackInvokes("onTerminated"); 999 assertEquals(SHUTDOWN, internalSubchannel.getState()); 1000 } 1001 1002 @Test shutdownNow()1003 public void shutdownNow() throws Exception { 1004 SocketAddress addr = mock(SocketAddress.class); 1005 createInternalSubchannel(addr); 1006 1007 internalSubchannel.obtainActiveTransport(); 1008 MockClientTransportInfo t1 = transports.poll(); 1009 t1.listener.transportReady(); 1010 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 1011 t1.listener.transportShutdown(Status.UNAVAILABLE); 1012 assertExactCallbackInvokes("onStateChange:IDLE"); 1013 1014 internalSubchannel.obtainActiveTransport(); 1015 assertExactCallbackInvokes("onStateChange:CONNECTING"); 1016 MockClientTransportInfo t2 = transports.poll(); 1017 1018 Status status = Status.UNAVAILABLE.withDescription("Requested"); 1019 internalSubchannel.shutdownNow(status); 1020 1021 verify(t1.transport).shutdownNow(same(status)); 1022 verify(t2.transport).shutdownNow(same(status)); 1023 assertExactCallbackInvokes("onStateChange:SHUTDOWN"); 1024 } 1025 1026 @Test obtainTransportAfterShutdown()1027 public void obtainTransportAfterShutdown() throws Exception { 1028 SocketAddress addr = mock(SocketAddress.class); 1029 createInternalSubchannel(addr); 1030 1031 internalSubchannel.shutdown(SHUTDOWN_REASON); 1032 assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); 1033 assertEquals(SHUTDOWN, internalSubchannel.getState()); 1034 assertNull(internalSubchannel.obtainActiveTransport()); 1035 verify(mockTransportFactory, times(0)) 1036 .newClientTransport( 1037 addr, 1038 createClientTransportOptions(), 1039 internalSubchannel.getChannelLogger()); 1040 assertNoCallbackInvoke(); 1041 assertEquals(SHUTDOWN, internalSubchannel.getState()); 1042 } 1043 1044 @Test logId()1045 public void logId() { 1046 createInternalSubchannel(mock(SocketAddress.class)); 1047 1048 assertNotNull(internalSubchannel.getLogId()); 1049 } 1050 1051 @Test inUseState()1052 public void inUseState() { 1053 SocketAddress addr = mock(SocketAddress.class); 1054 createInternalSubchannel(addr); 1055 1056 internalSubchannel.obtainActiveTransport(); 1057 MockClientTransportInfo t0 = transports.poll(); 1058 t0.listener.transportReady(); 1059 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 1060 t0.listener.transportInUse(true); 1061 assertExactCallbackInvokes("onInUse"); 1062 1063 t0.listener.transportInUse(false); 1064 assertExactCallbackInvokes("onNotInUse"); 1065 1066 t0.listener.transportInUse(true); 1067 assertExactCallbackInvokes("onInUse"); 1068 t0.listener.transportShutdown(Status.UNAVAILABLE); 1069 assertExactCallbackInvokes("onStateChange:IDLE"); 1070 1071 assertNull(internalSubchannel.obtainActiveTransport()); 1072 MockClientTransportInfo t1 = transports.poll(); 1073 t1.listener.transportReady(); 1074 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 1075 t1.listener.transportInUse(true); 1076 // InternalSubchannel is already in-use, thus doesn't call the callback 1077 assertNoCallbackInvoke(); 1078 1079 t1.listener.transportInUse(false); 1080 // t0 is still in-use 1081 assertNoCallbackInvoke(); 1082 1083 t0.listener.transportInUse(false); 1084 assertExactCallbackInvokes("onNotInUse"); 1085 } 1086 1087 @Test transportTerminateWithoutExitingInUse()1088 public void transportTerminateWithoutExitingInUse() { 1089 // An imperfect transport that terminates without going out of in-use. InternalSubchannel will 1090 // clear the in-use bit for it. 1091 SocketAddress addr = mock(SocketAddress.class); 1092 createInternalSubchannel(addr); 1093 1094 internalSubchannel.obtainActiveTransport(); 1095 MockClientTransportInfo t0 = transports.poll(); 1096 t0.listener.transportReady(); 1097 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 1098 t0.listener.transportInUse(true); 1099 assertExactCallbackInvokes("onInUse"); 1100 1101 t0.listener.transportShutdown(Status.UNAVAILABLE); 1102 assertExactCallbackInvokes("onStateChange:IDLE"); 1103 t0.listener.transportTerminated(); 1104 assertExactCallbackInvokes("onNotInUse"); 1105 } 1106 1107 @Test transportStartReturnsRunnable()1108 public void transportStartReturnsRunnable() { 1109 SocketAddress addr1 = mock(SocketAddress.class); 1110 SocketAddress addr2 = mock(SocketAddress.class); 1111 createInternalSubchannel(addr1, addr2); 1112 final AtomicInteger runnableInvokes = new AtomicInteger(0); 1113 Runnable startRunnable = new Runnable() { 1114 @Override 1115 public void run() { 1116 runnableInvokes.incrementAndGet(); 1117 } 1118 }; 1119 transports = TestUtils.captureTransports(mockTransportFactory, startRunnable); 1120 1121 assertEquals(0, runnableInvokes.get()); 1122 internalSubchannel.obtainActiveTransport(); 1123 assertEquals(1, runnableInvokes.get()); 1124 internalSubchannel.obtainActiveTransport(); 1125 assertEquals(1, runnableInvokes.get()); 1126 1127 MockClientTransportInfo t0 = transports.poll(); 1128 t0.listener.transportShutdown(Status.UNAVAILABLE); 1129 assertEquals(2, runnableInvokes.get()); 1130 1131 // 2nd address: reconnect immediatly 1132 MockClientTransportInfo t1 = transports.poll(); 1133 t1.listener.transportShutdown(Status.UNAVAILABLE); 1134 1135 // Addresses exhausted, waiting for back-off. 1136 assertEquals(2, runnableInvokes.get()); 1137 // Run out the back-off period 1138 fakeClock.forwardNanos(10); 1139 assertEquals(3, runnableInvokes.get()); 1140 1141 // This test doesn't care about scheduled InternalSubchannel callbacks. Clear it up so that 1142 // noMorePendingTasks() won't fail. 1143 fakeExecutor.runDueTasks(); 1144 assertEquals(3, runnableInvokes.get()); 1145 } 1146 1147 @Test resetConnectBackoff()1148 public void resetConnectBackoff() throws Exception { 1149 SocketAddress addr = mock(SocketAddress.class); 1150 createInternalSubchannel(addr); 1151 1152 // Move into TRANSIENT_FAILURE to schedule reconnect 1153 internalSubchannel.obtainActiveTransport(); 1154 assertExactCallbackInvokes("onStateChange:CONNECTING"); 1155 verify(mockTransportFactory) 1156 .newClientTransport( 1157 eq(addr), 1158 eq(createClientTransportOptions()), 1159 isA(TransportLogger.class)); 1160 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 1161 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 1162 1163 // Save the reconnectTask 1164 FakeClock.ScheduledTask reconnectTask = null; 1165 for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) { 1166 if (task.command.toString().contains("EndOfCurrentBackoff")) { 1167 assertNull("There shouldn't be more than one reconnectTask", reconnectTask); 1168 assertFalse(task.isDone()); 1169 reconnectTask = task; 1170 } 1171 } 1172 assertNotNull("There should be at least one reconnectTask", reconnectTask); 1173 1174 internalSubchannel.resetConnectBackoff(); 1175 1176 verify(mockTransportFactory, times(2)) 1177 .newClientTransport( 1178 eq(addr), 1179 eq(createClientTransportOptions()), 1180 isA(TransportLogger.class)); 1181 assertExactCallbackInvokes("onStateChange:CONNECTING"); 1182 assertTrue(reconnectTask.isCancelled()); 1183 1184 // Simulate a race between cancel and the task scheduler. Should be a no-op. 1185 reconnectTask.command.run(); 1186 assertNoCallbackInvoke(); 1187 verify(mockTransportFactory, times(2)) 1188 .newClientTransport( 1189 eq(addr), 1190 eq(createClientTransportOptions()), 1191 isA(TransportLogger.class)); 1192 verify(mockBackoffPolicyProvider, times(1)).get(); 1193 1194 // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after 1195 // invoking resetConnectBackoff() 1196 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 1197 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 1198 verify(mockBackoffPolicyProvider, times(2)).get(); 1199 fakeClock.forwardNanos(10); 1200 assertExactCallbackInvokes("onStateChange:CONNECTING"); 1201 assertEquals(CONNECTING, internalSubchannel.getState()); 1202 } 1203 1204 @Test resetConnectBackoff_noopOnIdleTransport()1205 public void resetConnectBackoff_noopOnIdleTransport() throws Exception { 1206 SocketAddress addr = mock(SocketAddress.class); 1207 createInternalSubchannel(addr); 1208 assertEquals(IDLE, internalSubchannel.getState()); 1209 1210 internalSubchannel.resetConnectBackoff(); 1211 1212 assertNoCallbackInvoke(); 1213 } 1214 1215 @Test channelzMembership()1216 public void channelzMembership() throws Exception { 1217 SocketAddress addr1 = mock(SocketAddress.class); 1218 createInternalSubchannel(addr1); 1219 internalSubchannel.obtainActiveTransport(); 1220 1221 MockClientTransportInfo t0 = transports.poll(); 1222 t0.listener.transportReady(); 1223 assertTrue(channelz.containsClientSocket(t0.transport.getLogId())); 1224 t0.listener.transportShutdown(Status.RESOURCE_EXHAUSTED); 1225 t0.listener.transportTerminated(); 1226 assertFalse(channelz.containsClientSocket(t0.transport.getLogId())); 1227 } 1228 1229 @Test channelzStatContainsTransport()1230 public void channelzStatContainsTransport() throws Exception { 1231 SocketAddress addr = new SocketAddress() {}; 1232 assertThat(transports).isEmpty(); 1233 createInternalSubchannel(addr); 1234 internalSubchannel.obtainActiveTransport(); 1235 1236 InternalWithLogId registeredTransport 1237 = Iterables.getOnlyElement(internalSubchannel.getStats().get().sockets); 1238 MockClientTransportInfo actualTransport = Iterables.getOnlyElement(transports); 1239 assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId()); 1240 } 1241 index_looping()1242 @Test public void index_looping() { 1243 Attributes.Key<String> key = Attributes.Key.create("some-key"); 1244 Attributes attr1 = Attributes.newBuilder().set(key, "1").build(); 1245 Attributes attr2 = Attributes.newBuilder().set(key, "2").build(); 1246 Attributes attr3 = Attributes.newBuilder().set(key, "3").build(); 1247 SocketAddress addr1 = new FakeSocketAddress(); 1248 SocketAddress addr2 = new FakeSocketAddress(); 1249 SocketAddress addr3 = new FakeSocketAddress(); 1250 SocketAddress addr4 = new FakeSocketAddress(); 1251 SocketAddress addr5 = new FakeSocketAddress(); 1252 Index index = new Index(Arrays.asList( 1253 new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1), 1254 new EquivalentAddressGroup(Arrays.asList(addr3), attr2), 1255 new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3))); 1256 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); 1257 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); 1258 assertThat(index.isAtBeginning()).isTrue(); 1259 assertThat(index.isValid()).isTrue(); 1260 1261 index.increment(); 1262 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); 1263 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); 1264 assertThat(index.isAtBeginning()).isFalse(); 1265 assertThat(index.isValid()).isTrue(); 1266 1267 index.increment(); 1268 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3); 1269 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr2); 1270 assertThat(index.isAtBeginning()).isFalse(); 1271 assertThat(index.isValid()).isTrue(); 1272 1273 index.increment(); 1274 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4); 1275 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3); 1276 assertThat(index.isAtBeginning()).isFalse(); 1277 assertThat(index.isValid()).isTrue(); 1278 1279 index.increment(); 1280 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr5); 1281 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3); 1282 assertThat(index.isAtBeginning()).isFalse(); 1283 assertThat(index.isValid()).isTrue(); 1284 1285 index.increment(); 1286 assertThat(index.isAtBeginning()).isFalse(); 1287 assertThat(index.isValid()).isFalse(); 1288 1289 index.reset(); 1290 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); 1291 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); 1292 assertThat(index.isAtBeginning()).isTrue(); 1293 assertThat(index.isValid()).isTrue(); 1294 1295 // We want to make sure both groupIndex and addressIndex are reset 1296 index.increment(); 1297 index.increment(); 1298 index.increment(); 1299 index.increment(); 1300 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr5); 1301 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3); 1302 index.reset(); 1303 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); 1304 assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); 1305 } 1306 index_updateGroups_resets()1307 @Test public void index_updateGroups_resets() { 1308 SocketAddress addr1 = new FakeSocketAddress(); 1309 SocketAddress addr2 = new FakeSocketAddress(); 1310 SocketAddress addr3 = new FakeSocketAddress(); 1311 Index index = new Index(Arrays.asList( 1312 new EquivalentAddressGroup(Arrays.asList(addr1)), 1313 new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 1314 index.increment(); 1315 index.increment(); 1316 // We want to make sure both groupIndex and addressIndex are reset 1317 index.updateGroups(Arrays.asList( 1318 new EquivalentAddressGroup(Arrays.asList(addr1)), 1319 new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 1320 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); 1321 } 1322 index_seekTo()1323 @Test public void index_seekTo() { 1324 SocketAddress addr1 = new FakeSocketAddress(); 1325 SocketAddress addr2 = new FakeSocketAddress(); 1326 SocketAddress addr3 = new FakeSocketAddress(); 1327 Index index = new Index(Arrays.asList( 1328 new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), 1329 new EquivalentAddressGroup(Arrays.asList(addr3)))); 1330 assertThat(index.seekTo(addr3)).isTrue(); 1331 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3); 1332 assertThat(index.seekTo(addr1)).isTrue(); 1333 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); 1334 assertThat(index.seekTo(addr2)).isTrue(); 1335 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); 1336 index.seekTo(new FakeSocketAddress()); 1337 // Failed seekTo doesn't change the index 1338 assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); 1339 } 1340 1341 /** Create ClientTransportOptions. Should not be reused if it may be mutated. */ createClientTransportOptions()1342 private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() { 1343 return new ClientTransportFactory.ClientTransportOptions() 1344 .setAuthority(AUTHORITY) 1345 .setUserAgent(USER_AGENT); 1346 } 1347 createInternalSubchannel(SocketAddress .... addrs)1348 private void createInternalSubchannel(SocketAddress ... addrs) { 1349 createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs))); 1350 } 1351 createInternalSubchannel(EquivalentAddressGroup .... addrs)1352 private void createInternalSubchannel(EquivalentAddressGroup ... addrs) { 1353 List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs); 1354 InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); 1355 ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, 1356 fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); 1357 internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT, 1358 mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), 1359 fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, 1360 channelz, CallTracer.getDefaultFactory().create(), 1361 subchannelTracer, 1362 logId, 1363 new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider())); 1364 } 1365 assertNoCallbackInvoke()1366 private void assertNoCallbackInvoke() { 1367 while (fakeExecutor.runDueTasks() > 0) {} 1368 assertEquals(0, callbackInvokes.size()); 1369 } 1370 assertExactCallbackInvokes(String .... expectedInvokes)1371 private void assertExactCallbackInvokes(String ... expectedInvokes) { 1372 assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); 1373 callbackInvokes.clear(); 1374 } 1375 1376 private static class FakeSocketAddress extends SocketAddress {} 1377 } 1378