1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.ConnectivityState.CONNECTING; 21 import static io.grpc.ConnectivityState.IDLE; 22 import static io.grpc.ConnectivityState.READY; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 import static org.junit.Assert.assertEquals; 26 import static org.junit.Assert.assertFalse; 27 import static org.junit.Assert.assertNotNull; 28 import static org.junit.Assert.assertNull; 29 import static org.junit.Assert.assertSame; 30 import static org.junit.Assert.assertTrue; 31 import static org.mockito.Matchers.any; 32 import static org.mockito.Matchers.same; 33 import static org.mockito.Mockito.mock; 34 import static org.mockito.Mockito.never; 35 import static org.mockito.Mockito.times; 36 import static org.mockito.Mockito.verify; 37 import static org.mockito.Mockito.verifyNoMoreInteractions; 38 import static org.mockito.Mockito.when; 39 40 import com.google.common.collect.Iterables; 41 import io.grpc.Attributes; 42 import io.grpc.ConnectivityStateInfo; 43 import io.grpc.EquivalentAddressGroup; 44 import io.grpc.InternalChannelz; 45 import io.grpc.InternalWithLogId; 46 import io.grpc.Status; 47 import io.grpc.internal.InternalSubchannel.CallTracingTransport; 48 import io.grpc.internal.InternalSubchannel.Index; 49 import io.grpc.internal.TestUtils.MockClientTransportInfo; 50 import java.net.SocketAddress; 51 import java.util.Arrays; 52 import java.util.LinkedList; 53 import java.util.List; 54 import java.util.concurrent.BlockingQueue; 55 import java.util.concurrent.atomic.AtomicInteger; 56 import org.junit.After; 57 import org.junit.Before; 58 import org.junit.Rule; 59 import org.junit.Test; 60 import org.junit.rules.ExpectedException; 61 import org.junit.runner.RunWith; 62 import org.junit.runners.JUnit4; 63 import org.mockito.Mock; 64 import org.mockito.MockitoAnnotations; 65 66 /** 67 * Unit tests for {@link InternalSubchannel}. 68 */ 69 @RunWith(JUnit4.class) 70 public class InternalSubchannelTest { 71 72 @Rule 73 public final ExpectedException thrown = ExpectedException.none(); 74 75 private static final String AUTHORITY = "fakeauthority"; 76 private static final String USER_AGENT = "mosaic"; 77 private static final ConnectivityStateInfo UNAVAILABLE_STATE = 78 ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); 79 private static final ConnectivityStateInfo RESOURCE_EXHAUSTED_STATE = 80 ConnectivityStateInfo.forTransientFailure(Status.RESOURCE_EXHAUSTED); 81 private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test"); 82 83 // For scheduled executor 84 private final FakeClock fakeClock = new FakeClock(); 85 // For channelExecutor 86 private final FakeClock fakeExecutor = new FakeClock(); 87 private final ChannelExecutor channelExecutor = new ChannelExecutor(); 88 89 private final InternalChannelz channelz = new InternalChannelz(); 90 91 @Mock private BackoffPolicy mockBackoffPolicy1; 92 @Mock private BackoffPolicy mockBackoffPolicy2; 93 @Mock private BackoffPolicy mockBackoffPolicy3; 94 @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; 95 @Mock private ClientTransportFactory mockTransportFactory; 96 97 private final LinkedList<String> callbackInvokes = new LinkedList<String>(); 98 private final InternalSubchannel.Callback mockInternalSubchannelCallback = 99 new InternalSubchannel.Callback() { 100 @Override 101 protected void onTerminated(InternalSubchannel is) { 102 assertSame(internalSubchannel, is); 103 callbackInvokes.add("onTerminated"); 104 } 105 106 @Override 107 protected void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { 108 assertSame(internalSubchannel, is); 109 callbackInvokes.add("onStateChange:" + newState); 110 } 111 112 @Override 113 protected void onInUse(InternalSubchannel is) { 114 assertSame(internalSubchannel, is); 115 callbackInvokes.add("onInUse"); 116 } 117 118 @Override 119 protected void onNotInUse(InternalSubchannel is) { 120 assertSame(internalSubchannel, is); 121 callbackInvokes.add("onNotInUse"); 122 } 123 }; 124 125 private InternalSubchannel internalSubchannel; 126 private BlockingQueue<MockClientTransportInfo> transports; 127 setUp()128 @Before public void setUp() { 129 MockitoAnnotations.initMocks(this); 130 131 when(mockBackoffPolicyProvider.get()) 132 .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3); 133 when(mockBackoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); 134 when(mockBackoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); 135 when(mockBackoffPolicy3.nextBackoffNanos()).thenReturn(10L, 100L); 136 transports = TestUtils.captureTransports(mockTransportFactory); 137 } 138 noMorePendingTasks()139 @After public void noMorePendingTasks() { 140 assertEquals(0, fakeClock.numPendingTasks()); 141 assertEquals(0, fakeExecutor.numPendingTasks()); 142 } 143 144 @Test(expected = IllegalArgumentException.class) constructor_emptyEagList_throws()145 public void constructor_emptyEagList_throws() { 146 createInternalSubchannel(new EquivalentAddressGroup[0]); 147 } 148 149 @Test(expected = NullPointerException.class) constructor_eagListWithNull_throws()150 public void constructor_eagListWithNull_throws() { 151 createInternalSubchannel(new EquivalentAddressGroup[] {null}); 152 } 153 eagAttribute_propagatesToTransport()154 @Test public void eagAttribute_propagatesToTransport() { 155 SocketAddress addr = new SocketAddress() {}; 156 Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build(); 157 createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr)); 158 159 // First attempt 160 assertNull(internalSubchannel.obtainActiveTransport()); 161 assertEquals(CONNECTING, internalSubchannel.getState()); 162 verify(mockTransportFactory) 163 .newClientTransport(addr, createClientTransportOptions().setEagAttributes(attr)); 164 } 165 singleAddressReconnect()166 @Test public void singleAddressReconnect() { 167 SocketAddress addr = mock(SocketAddress.class); 168 createInternalSubchannel(addr); 169 assertEquals(IDLE, internalSubchannel.getState()); 170 171 // Invocation counters 172 int transportsCreated = 0; 173 int backoff1Consulted = 0; 174 int backoff2Consulted = 0; 175 int backoffReset = 0; 176 177 // First attempt 178 assertEquals(IDLE, internalSubchannel.getState()); 179 assertNoCallbackInvoke(); 180 assertNull(internalSubchannel.obtainActiveTransport()); 181 assertExactCallbackInvokes("onStateChange:CONNECTING"); 182 assertEquals(CONNECTING, internalSubchannel.getState()); 183 verify(mockTransportFactory, times(++transportsCreated)) 184 .newClientTransport(addr, createClientTransportOptions()); 185 186 // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE. 187 assertNoCallbackInvoke(); 188 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 189 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 190 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 191 // Backoff reset and using first back-off value interval 192 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 193 verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); 194 195 // Second attempt 196 // Transport creation doesn't happen until time is due 197 fakeClock.forwardNanos(9); 198 assertNull(internalSubchannel.obtainActiveTransport()); 199 verify(mockTransportFactory, times(transportsCreated)) 200 .newClientTransport(addr, createClientTransportOptions()); 201 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 202 203 assertNoCallbackInvoke(); 204 fakeClock.forwardNanos(1); 205 assertExactCallbackInvokes("onStateChange:CONNECTING"); 206 assertEquals(CONNECTING, internalSubchannel.getState()); 207 verify(mockTransportFactory, times(++transportsCreated)) 208 .newClientTransport(addr, createClientTransportOptions()); 209 // Fail this one too 210 assertNoCallbackInvoke(); 211 // Here we use a different status from the first failure, and verify that it's passed to 212 // the callback. 213 transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); 214 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 215 assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); 216 // Second back-off interval 217 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 218 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 219 220 // Third attempt 221 // Transport creation doesn't happen until time is due 222 fakeClock.forwardNanos(99); 223 assertNull(internalSubchannel.obtainActiveTransport()); 224 verify(mockTransportFactory, times(transportsCreated)) 225 .newClientTransport(addr, createClientTransportOptions()); 226 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 227 assertNoCallbackInvoke(); 228 fakeClock.forwardNanos(1); 229 assertEquals(CONNECTING, internalSubchannel.getState()); 230 assertExactCallbackInvokes("onStateChange:CONNECTING"); 231 assertNull(internalSubchannel.obtainActiveTransport()); 232 verify(mockTransportFactory, times(++transportsCreated)) 233 .newClientTransport(addr, createClientTransportOptions()); 234 // Let this one succeed, will enter READY state. 235 assertNoCallbackInvoke(); 236 transports.peek().listener.transportReady(); 237 assertExactCallbackInvokes("onStateChange:READY"); 238 assertEquals(READY, internalSubchannel.getState()); 239 assertSame( 240 transports.peek().transport, 241 ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); 242 243 // Close the READY transport, will enter IDLE state. 244 assertNoCallbackInvoke(); 245 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 246 assertEquals(IDLE, internalSubchannel.getState()); 247 assertExactCallbackInvokes("onStateChange:IDLE"); 248 249 // Back-off is reset, and the next attempt will happen immediately 250 assertNull(internalSubchannel.obtainActiveTransport()); 251 assertEquals(CONNECTING, internalSubchannel.getState()); 252 assertExactCallbackInvokes("onStateChange:CONNECTING"); 253 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 254 verify(mockTransportFactory, times(++transportsCreated)) 255 .newClientTransport(addr, createClientTransportOptions()); 256 257 // Final checks for consultations on back-off policies 258 verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); 259 verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos(); 260 } 261 twoAddressesReconnect()262 @Test public void twoAddressesReconnect() { 263 SocketAddress addr1 = mock(SocketAddress.class); 264 SocketAddress addr2 = mock(SocketAddress.class); 265 createInternalSubchannel(addr1, addr2); 266 assertEquals(IDLE, internalSubchannel.getState()); 267 // Invocation counters 268 int transportsAddr1 = 0; 269 int transportsAddr2 = 0; 270 int backoff1Consulted = 0; 271 int backoff2Consulted = 0; 272 int backoff3Consulted = 0; 273 int backoffReset = 0; 274 275 // First attempt 276 assertNoCallbackInvoke(); 277 assertNull(internalSubchannel.obtainActiveTransport()); 278 assertExactCallbackInvokes("onStateChange:CONNECTING"); 279 assertEquals(CONNECTING, internalSubchannel.getState()); 280 verify(mockTransportFactory, times(++transportsAddr1)) 281 .newClientTransport(addr1, createClientTransportOptions()); 282 283 // Let this one fail without success 284 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 285 // Still in CONNECTING 286 assertNull(internalSubchannel.obtainActiveTransport()); 287 assertNoCallbackInvoke(); 288 assertEquals(CONNECTING, internalSubchannel.getState()); 289 290 // Second attempt will start immediately. Still no back-off policy. 291 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 292 verify(mockTransportFactory, times(++transportsAddr2)) 293 .newClientTransport(addr2, createClientTransportOptions()); 294 assertNull(internalSubchannel.obtainActiveTransport()); 295 // Fail this one too 296 assertNoCallbackInvoke(); 297 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 298 // All addresses have failed. Delayed transport will be in back-off interval. 299 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 300 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 301 // Backoff reset and first back-off interval begins 302 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 303 verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); 304 305 // No reconnect during TRANSIENT_FAILURE even when requested. 306 assertNull(internalSubchannel.obtainActiveTransport()); 307 assertNoCallbackInvoke(); 308 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 309 310 // Third attempt is the first address, thus controlled by the first back-off interval. 311 fakeClock.forwardNanos(9); 312 verify(mockTransportFactory, times(transportsAddr1)) 313 .newClientTransport(addr1, createClientTransportOptions()); 314 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 315 assertNoCallbackInvoke(); 316 fakeClock.forwardNanos(1); 317 assertExactCallbackInvokes("onStateChange:CONNECTING"); 318 assertEquals(CONNECTING, internalSubchannel.getState()); 319 verify(mockTransportFactory, times(++transportsAddr1)) 320 .newClientTransport(addr1, createClientTransportOptions()); 321 // Fail this one too 322 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 323 assertEquals(CONNECTING, internalSubchannel.getState()); 324 325 // Forth attempt will start immediately. Keep back-off policy. 326 assertNull(internalSubchannel.obtainActiveTransport()); 327 assertEquals(CONNECTING, internalSubchannel.getState()); 328 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 329 verify(mockTransportFactory, times(++transportsAddr2)) 330 .newClientTransport(addr2, createClientTransportOptions()); 331 // Fail this one too 332 assertNoCallbackInvoke(); 333 transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); 334 // All addresses have failed again. Delayed transport will be in back-off interval. 335 assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); 336 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 337 // Second back-off interval begins 338 verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffNanos(); 339 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 340 341 // Fifth attempt for the first address, thus controlled by the second back-off interval. 342 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 343 fakeClock.forwardNanos(99); 344 verify(mockTransportFactory, times(transportsAddr1)) 345 .newClientTransport(addr1, createClientTransportOptions()); 346 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 347 assertNoCallbackInvoke(); 348 fakeClock.forwardNanos(1); 349 assertExactCallbackInvokes("onStateChange:CONNECTING"); 350 assertEquals(CONNECTING, internalSubchannel.getState()); 351 verify(mockTransportFactory, times(++transportsAddr1)) 352 .newClientTransport(addr1, createClientTransportOptions()); 353 // Let it through 354 assertNoCallbackInvoke(); 355 transports.peek().listener.transportReady(); 356 assertExactCallbackInvokes("onStateChange:READY"); 357 assertEquals(READY, internalSubchannel.getState()); 358 359 assertSame( 360 transports.peek().transport, 361 ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); 362 // Then close it. 363 assertNoCallbackInvoke(); 364 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 365 assertExactCallbackInvokes("onStateChange:IDLE"); 366 assertEquals(IDLE, internalSubchannel.getState()); 367 368 // First attempt after a successful connection. Old back-off policy should be ignored, but there 369 // is not yet a need for a new one. Start from the first address. 370 assertNull(internalSubchannel.obtainActiveTransport()); 371 assertEquals(CONNECTING, internalSubchannel.getState()); 372 assertExactCallbackInvokes("onStateChange:CONNECTING"); 373 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 374 verify(mockTransportFactory, times(++transportsAddr1)) 375 .newClientTransport(addr1, createClientTransportOptions()); 376 // Fail the transport 377 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 378 assertEquals(CONNECTING, internalSubchannel.getState()); 379 380 // Second attempt will start immediately. Still no new back-off policy. 381 verify(mockBackoffPolicyProvider, times(backoffReset)).get(); 382 verify(mockTransportFactory, times(++transportsAddr2)) 383 .newClientTransport(addr2, createClientTransportOptions()); 384 // Fail this one too 385 assertEquals(CONNECTING, internalSubchannel.getState()); 386 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 387 // All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect. 388 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 389 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 390 // Back-off reset and first back-off interval begins 391 verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffNanos(); 392 verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); 393 394 // Third attempt is the first address, thus controlled by the first back-off interval. 395 fakeClock.forwardNanos(9); 396 verify(mockTransportFactory, times(transportsAddr1)) 397 .newClientTransport(addr1, createClientTransportOptions()); 398 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 399 assertNoCallbackInvoke(); 400 fakeClock.forwardNanos(1); 401 assertExactCallbackInvokes("onStateChange:CONNECTING"); 402 assertEquals(CONNECTING, internalSubchannel.getState()); 403 verify(mockTransportFactory, times(++transportsAddr1)) 404 .newClientTransport(addr1, createClientTransportOptions()); 405 406 // Final checks on invocations on back-off policies 407 verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); 408 verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos(); 409 verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos(); 410 } 411 412 @Test updateAddresses_emptyEagList_throws()413 public void updateAddresses_emptyEagList_throws() { 414 SocketAddress addr = new FakeSocketAddress(); 415 createInternalSubchannel(addr); 416 thrown.expect(IllegalArgumentException.class); 417 internalSubchannel.updateAddresses(Arrays.<EquivalentAddressGroup>asList()); 418 } 419 420 @Test updateAddresses_eagListWithNull_throws()421 public void updateAddresses_eagListWithNull_throws() { 422 SocketAddress addr = new FakeSocketAddress(); 423 createInternalSubchannel(addr); 424 List<EquivalentAddressGroup> eags = Arrays.asList((EquivalentAddressGroup) null); 425 thrown.expect(NullPointerException.class); 426 internalSubchannel.updateAddresses(eags); 427 } 428 updateAddresses_intersecting_ready()429 @Test public void updateAddresses_intersecting_ready() { 430 SocketAddress addr1 = mock(SocketAddress.class); 431 SocketAddress addr2 = mock(SocketAddress.class); 432 SocketAddress addr3 = mock(SocketAddress.class); 433 createInternalSubchannel(addr1, addr2); 434 assertEquals(IDLE, internalSubchannel.getState()); 435 436 // First address fails 437 assertNull(internalSubchannel.obtainActiveTransport()); 438 assertExactCallbackInvokes("onStateChange:CONNECTING"); 439 verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions()); 440 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 441 assertEquals(CONNECTING, internalSubchannel.getState()); 442 443 // Second address connects 444 verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions()); 445 transports.peek().listener.transportReady(); 446 assertExactCallbackInvokes("onStateChange:READY"); 447 assertEquals(READY, internalSubchannel.getState()); 448 449 // Update addresses 450 internalSubchannel.updateAddresses( 451 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 452 assertNoCallbackInvoke(); 453 assertEquals(READY, internalSubchannel.getState()); 454 verify(transports.peek().transport, never()).shutdown(any(Status.class)); 455 verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); 456 457 // And new addresses chosen when re-connecting 458 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 459 assertExactCallbackInvokes("onStateChange:IDLE"); 460 461 assertNull(internalSubchannel.obtainActiveTransport()); 462 assertEquals(0, fakeClock.numPendingTasks()); 463 verify(mockTransportFactory, times(2)) 464 .newClientTransport(addr2, createClientTransportOptions()); 465 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 466 verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions()); 467 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 468 verifyNoMoreInteractions(mockTransportFactory); 469 470 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 471 } 472 updateAddresses_intersecting_connecting()473 @Test public void updateAddresses_intersecting_connecting() { 474 SocketAddress addr1 = mock(SocketAddress.class); 475 SocketAddress addr2 = mock(SocketAddress.class); 476 SocketAddress addr3 = mock(SocketAddress.class); 477 createInternalSubchannel(addr1, addr2); 478 assertEquals(IDLE, internalSubchannel.getState()); 479 480 // First address fails 481 assertNull(internalSubchannel.obtainActiveTransport()); 482 assertExactCallbackInvokes("onStateChange:CONNECTING"); 483 verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions()); 484 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 485 assertEquals(CONNECTING, internalSubchannel.getState()); 486 487 // Second address connecting 488 verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions()); 489 assertNoCallbackInvoke(); 490 assertEquals(CONNECTING, internalSubchannel.getState()); 491 492 // Update addresses 493 internalSubchannel.updateAddresses( 494 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 495 assertNoCallbackInvoke(); 496 assertEquals(CONNECTING, internalSubchannel.getState()); 497 verify(transports.peek().transport, never()).shutdown(any(Status.class)); 498 verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); 499 500 // And new addresses chosen when re-connecting 501 transports.peek().listener.transportReady(); 502 assertExactCallbackInvokes("onStateChange:READY"); 503 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 504 assertExactCallbackInvokes("onStateChange:IDLE"); 505 506 assertNull(internalSubchannel.obtainActiveTransport()); 507 assertEquals(0, fakeClock.numPendingTasks()); 508 verify(mockTransportFactory, times(2)) 509 .newClientTransport(addr2, createClientTransportOptions()); 510 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 511 verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions()); 512 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 513 verifyNoMoreInteractions(mockTransportFactory); 514 515 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 516 } 517 updateAddresses_disjoint_idle()518 @Test public void updateAddresses_disjoint_idle() { 519 SocketAddress addr1 = mock(SocketAddress.class); 520 SocketAddress addr2 = mock(SocketAddress.class); 521 522 createInternalSubchannel(addr1); 523 internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2))); 524 525 // Nothing happened on address update 526 verify(mockTransportFactory, never()) 527 .newClientTransport(addr1, createClientTransportOptions()); 528 verify(mockTransportFactory, never()) 529 .newClientTransport(addr2, createClientTransportOptions()); 530 verifyNoMoreInteractions(mockTransportFactory); 531 assertNoCallbackInvoke(); 532 assertEquals(IDLE, internalSubchannel.getState()); 533 534 // But new address chosen when connecting 535 assertNull(internalSubchannel.obtainActiveTransport()); 536 assertExactCallbackInvokes("onStateChange:CONNECTING"); 537 verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions()); 538 539 // And no other addresses attempted 540 assertEquals(0, fakeClock.numPendingTasks()); 541 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 542 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 543 assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); 544 verifyNoMoreInteractions(mockTransportFactory); 545 546 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 547 } 548 updateAddresses_disjoint_ready()549 @Test public void updateAddresses_disjoint_ready() { 550 SocketAddress addr1 = mock(SocketAddress.class); 551 SocketAddress addr2 = mock(SocketAddress.class); 552 SocketAddress addr3 = mock(SocketAddress.class); 553 SocketAddress addr4 = mock(SocketAddress.class); 554 createInternalSubchannel(addr1, addr2); 555 assertEquals(IDLE, internalSubchannel.getState()); 556 557 // First address fails 558 assertNull(internalSubchannel.obtainActiveTransport()); 559 assertExactCallbackInvokes("onStateChange:CONNECTING"); 560 verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions()); 561 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 562 assertEquals(CONNECTING, internalSubchannel.getState()); 563 564 // Second address connects 565 verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions()); 566 transports.peek().listener.transportReady(); 567 assertExactCallbackInvokes("onStateChange:READY"); 568 assertEquals(READY, internalSubchannel.getState()); 569 570 // Update addresses 571 internalSubchannel.updateAddresses( 572 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); 573 assertExactCallbackInvokes("onStateChange:IDLE"); 574 assertEquals(IDLE, internalSubchannel.getState()); 575 verify(transports.peek().transport).shutdown(any(Status.class)); 576 577 // And new addresses chosen when re-connecting 578 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 579 assertNoCallbackInvoke(); 580 assertEquals(IDLE, internalSubchannel.getState()); 581 582 assertNull(internalSubchannel.obtainActiveTransport()); 583 assertEquals(0, fakeClock.numPendingTasks()); 584 verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions()); 585 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 586 verify(mockTransportFactory).newClientTransport(addr4, createClientTransportOptions()); 587 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 588 verifyNoMoreInteractions(mockTransportFactory); 589 590 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 591 } 592 updateAddresses_disjoint_connecting()593 @Test public void updateAddresses_disjoint_connecting() { 594 SocketAddress addr1 = mock(SocketAddress.class); 595 SocketAddress addr2 = mock(SocketAddress.class); 596 SocketAddress addr3 = mock(SocketAddress.class); 597 SocketAddress addr4 = mock(SocketAddress.class); 598 createInternalSubchannel(addr1, addr2); 599 assertEquals(IDLE, internalSubchannel.getState()); 600 601 // First address fails 602 assertNull(internalSubchannel.obtainActiveTransport()); 603 assertExactCallbackInvokes("onStateChange:CONNECTING"); 604 verify(mockTransportFactory).newClientTransport(addr1, createClientTransportOptions()); 605 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 606 assertEquals(CONNECTING, internalSubchannel.getState()); 607 608 // Second address connecting 609 verify(mockTransportFactory).newClientTransport(addr2, createClientTransportOptions()); 610 assertNoCallbackInvoke(); 611 assertEquals(CONNECTING, internalSubchannel.getState()); 612 613 // Update addresses 614 internalSubchannel.updateAddresses( 615 Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); 616 assertNoCallbackInvoke(); 617 assertEquals(CONNECTING, internalSubchannel.getState()); 618 619 // And new addresses chosen immediately 620 verify(transports.poll().transport).shutdown(any(Status.class)); 621 assertNoCallbackInvoke(); 622 assertEquals(CONNECTING, internalSubchannel.getState()); 623 624 assertNull(internalSubchannel.obtainActiveTransport()); 625 assertEquals(0, fakeClock.numPendingTasks()); 626 verify(mockTransportFactory).newClientTransport(addr3, createClientTransportOptions()); 627 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 628 verify(mockTransportFactory).newClientTransport(addr4, createClientTransportOptions()); 629 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 630 verifyNoMoreInteractions(mockTransportFactory); 631 632 fakeClock.forwardNanos(10); // Drain retry, but don't care about result 633 } 634 635 @Test connectIsLazy()636 public void connectIsLazy() { 637 SocketAddress addr = mock(SocketAddress.class); 638 createInternalSubchannel(addr); 639 640 // Invocation counters 641 int transportsCreated = 0; 642 643 // Won't connect until requested 644 verify(mockTransportFactory, times(transportsCreated)) 645 .newClientTransport(addr, createClientTransportOptions()); 646 647 // First attempt 648 internalSubchannel.obtainActiveTransport(); 649 assertExactCallbackInvokes("onStateChange:CONNECTING"); 650 verify(mockTransportFactory, times(++transportsCreated)) 651 .newClientTransport(addr, createClientTransportOptions()); 652 653 // Fail this one 654 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 655 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 656 657 // Will always reconnect after back-off 658 fakeClock.forwardNanos(10); 659 assertExactCallbackInvokes("onStateChange:CONNECTING"); 660 verify(mockTransportFactory, times(++transportsCreated)) 661 .newClientTransport(addr, createClientTransportOptions()); 662 663 // Make this one proceed 664 transports.peek().listener.transportReady(); 665 assertExactCallbackInvokes("onStateChange:READY"); 666 // Then go-away 667 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 668 assertExactCallbackInvokes("onStateChange:IDLE"); 669 670 // No scheduled tasks that would ever try to reconnect ... 671 assertEquals(0, fakeClock.numPendingTasks()); 672 assertEquals(0, fakeExecutor.numPendingTasks()); 673 674 // ... until it's requested. 675 internalSubchannel.obtainActiveTransport(); 676 assertExactCallbackInvokes("onStateChange:CONNECTING"); 677 verify(mockTransportFactory, times(++transportsCreated)) 678 .newClientTransport(addr, createClientTransportOptions()); 679 } 680 681 @Test shutdownWhenReady()682 public void shutdownWhenReady() throws Exception { 683 SocketAddress addr = mock(SocketAddress.class); 684 createInternalSubchannel(addr); 685 686 internalSubchannel.obtainActiveTransport(); 687 MockClientTransportInfo transportInfo = transports.poll(); 688 transportInfo.listener.transportReady(); 689 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 690 691 internalSubchannel.shutdown(SHUTDOWN_REASON); 692 verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); 693 assertExactCallbackInvokes("onStateChange:SHUTDOWN"); 694 695 transportInfo.listener.transportTerminated(); 696 assertExactCallbackInvokes("onTerminated"); 697 verify(transportInfo.transport, never()).shutdownNow(any(Status.class)); 698 } 699 700 @Test shutdownBeforeTransportCreated()701 public void shutdownBeforeTransportCreated() throws Exception { 702 SocketAddress addr = mock(SocketAddress.class); 703 createInternalSubchannel(addr); 704 705 // First transport is created immediately 706 internalSubchannel.obtainActiveTransport(); 707 assertExactCallbackInvokes("onStateChange:CONNECTING"); 708 verify(mockTransportFactory).newClientTransport(addr, createClientTransportOptions()); 709 710 // Fail this one 711 MockClientTransportInfo transportInfo = transports.poll(); 712 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 713 transportInfo.listener.transportTerminated(); 714 715 // Entering TRANSIENT_FAILURE, waiting for back-off 716 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 717 718 // Save the reconnectTask before shutting down 719 FakeClock.ScheduledTask reconnectTask = null; 720 for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) { 721 if (task.command.toString().contains("EndOfCurrentBackoff")) { 722 assertNull("There shouldn't be more than one reconnectTask", reconnectTask); 723 assertFalse(task.isDone()); 724 reconnectTask = task; 725 } 726 } 727 assertNotNull("There should be at least one reconnectTask", reconnectTask); 728 729 // Shut down InternalSubchannel before the transport is created. 730 internalSubchannel.shutdown(SHUTDOWN_REASON); 731 assertTrue(reconnectTask.isCancelled()); 732 // InternalSubchannel terminated promptly. 733 assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); 734 735 // Simulate a race between reconnectTask cancellation and execution -- the task runs anyway. 736 // This should not lead to the creation of a new transport. 737 reconnectTask.command.run(); 738 739 // Futher call to obtainActiveTransport() is no-op. 740 assertNull(internalSubchannel.obtainActiveTransport()); 741 assertEquals(SHUTDOWN, internalSubchannel.getState()); 742 assertNoCallbackInvoke(); 743 744 // No more transports will be created. 745 fakeClock.forwardNanos(10000); 746 assertEquals(SHUTDOWN, internalSubchannel.getState()); 747 verifyNoMoreInteractions(mockTransportFactory); 748 assertEquals(0, transports.size()); 749 assertNoCallbackInvoke(); 750 } 751 752 @Test shutdownBeforeTransportReady()753 public void shutdownBeforeTransportReady() throws Exception { 754 SocketAddress addr = mock(SocketAddress.class); 755 createInternalSubchannel(addr); 756 757 internalSubchannel.obtainActiveTransport(); 758 assertExactCallbackInvokes("onStateChange:CONNECTING"); 759 MockClientTransportInfo transportInfo = transports.poll(); 760 761 // Shutdown the InternalSubchannel before the pending transport is ready 762 assertNull(internalSubchannel.obtainActiveTransport()); 763 internalSubchannel.shutdown(SHUTDOWN_REASON); 764 assertExactCallbackInvokes("onStateChange:SHUTDOWN"); 765 766 // The transport should've been shut down even though it's not the active transport yet. 767 verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); 768 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 769 assertNoCallbackInvoke(); 770 transportInfo.listener.transportTerminated(); 771 assertExactCallbackInvokes("onTerminated"); 772 assertEquals(SHUTDOWN, internalSubchannel.getState()); 773 } 774 775 @Test shutdownNow()776 public void shutdownNow() throws Exception { 777 SocketAddress addr = mock(SocketAddress.class); 778 createInternalSubchannel(addr); 779 780 internalSubchannel.obtainActiveTransport(); 781 MockClientTransportInfo t1 = transports.poll(); 782 t1.listener.transportReady(); 783 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 784 t1.listener.transportShutdown(Status.UNAVAILABLE); 785 assertExactCallbackInvokes("onStateChange:IDLE"); 786 787 internalSubchannel.obtainActiveTransport(); 788 assertExactCallbackInvokes("onStateChange:CONNECTING"); 789 MockClientTransportInfo t2 = transports.poll(); 790 791 Status status = Status.UNAVAILABLE.withDescription("Requested"); 792 internalSubchannel.shutdownNow(status); 793 794 verify(t1.transport).shutdownNow(same(status)); 795 verify(t2.transport).shutdownNow(same(status)); 796 assertExactCallbackInvokes("onStateChange:SHUTDOWN"); 797 } 798 799 @Test obtainTransportAfterShutdown()800 public void obtainTransportAfterShutdown() throws Exception { 801 SocketAddress addr = mock(SocketAddress.class); 802 createInternalSubchannel(addr); 803 804 internalSubchannel.shutdown(SHUTDOWN_REASON); 805 assertExactCallbackInvokes("onStateChange:SHUTDOWN", "onTerminated"); 806 assertEquals(SHUTDOWN, internalSubchannel.getState()); 807 assertNull(internalSubchannel.obtainActiveTransport()); 808 verify(mockTransportFactory, times(0)) 809 .newClientTransport(addr, createClientTransportOptions()); 810 assertNoCallbackInvoke(); 811 assertEquals(SHUTDOWN, internalSubchannel.getState()); 812 } 813 814 @Test logId()815 public void logId() { 816 createInternalSubchannel(mock(SocketAddress.class)); 817 818 assertNotNull(internalSubchannel.getLogId()); 819 } 820 821 @Test inUseState()822 public void inUseState() { 823 SocketAddress addr = mock(SocketAddress.class); 824 createInternalSubchannel(addr); 825 826 internalSubchannel.obtainActiveTransport(); 827 MockClientTransportInfo t0 = transports.poll(); 828 t0.listener.transportReady(); 829 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 830 t0.listener.transportInUse(true); 831 assertExactCallbackInvokes("onInUse"); 832 833 t0.listener.transportInUse(false); 834 assertExactCallbackInvokes("onNotInUse"); 835 836 t0.listener.transportInUse(true); 837 assertExactCallbackInvokes("onInUse"); 838 t0.listener.transportShutdown(Status.UNAVAILABLE); 839 assertExactCallbackInvokes("onStateChange:IDLE"); 840 841 assertNull(internalSubchannel.obtainActiveTransport()); 842 MockClientTransportInfo t1 = transports.poll(); 843 t1.listener.transportReady(); 844 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 845 t1.listener.transportInUse(true); 846 // InternalSubchannel is already in-use, thus doesn't call the callback 847 assertNoCallbackInvoke(); 848 849 t1.listener.transportInUse(false); 850 // t0 is still in-use 851 assertNoCallbackInvoke(); 852 853 t0.listener.transportInUse(false); 854 assertExactCallbackInvokes("onNotInUse"); 855 } 856 857 @Test transportTerminateWithoutExitingInUse()858 public void transportTerminateWithoutExitingInUse() { 859 // An imperfect transport that terminates without going out of in-use. InternalSubchannel will 860 // clear the in-use bit for it. 861 SocketAddress addr = mock(SocketAddress.class); 862 createInternalSubchannel(addr); 863 864 internalSubchannel.obtainActiveTransport(); 865 MockClientTransportInfo t0 = transports.poll(); 866 t0.listener.transportReady(); 867 assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); 868 t0.listener.transportInUse(true); 869 assertExactCallbackInvokes("onInUse"); 870 871 t0.listener.transportShutdown(Status.UNAVAILABLE); 872 assertExactCallbackInvokes("onStateChange:IDLE"); 873 t0.listener.transportTerminated(); 874 assertExactCallbackInvokes("onNotInUse"); 875 } 876 877 @Test transportStartReturnsRunnable()878 public void transportStartReturnsRunnable() { 879 SocketAddress addr1 = mock(SocketAddress.class); 880 SocketAddress addr2 = mock(SocketAddress.class); 881 createInternalSubchannel(addr1, addr2); 882 final AtomicInteger runnableInvokes = new AtomicInteger(0); 883 Runnable startRunnable = new Runnable() { 884 @Override 885 public void run() { 886 runnableInvokes.incrementAndGet(); 887 } 888 }; 889 transports = TestUtils.captureTransports(mockTransportFactory, startRunnable); 890 891 assertEquals(0, runnableInvokes.get()); 892 internalSubchannel.obtainActiveTransport(); 893 assertEquals(1, runnableInvokes.get()); 894 internalSubchannel.obtainActiveTransport(); 895 assertEquals(1, runnableInvokes.get()); 896 897 MockClientTransportInfo t0 = transports.poll(); 898 t0.listener.transportShutdown(Status.UNAVAILABLE); 899 assertEquals(2, runnableInvokes.get()); 900 901 // 2nd address: reconnect immediatly 902 MockClientTransportInfo t1 = transports.poll(); 903 t1.listener.transportShutdown(Status.UNAVAILABLE); 904 905 // Addresses exhausted, waiting for back-off. 906 assertEquals(2, runnableInvokes.get()); 907 // Run out the back-off period 908 fakeClock.forwardNanos(10); 909 assertEquals(3, runnableInvokes.get()); 910 911 // This test doesn't care about scheduled InternalSubchannel callbacks. Clear it up so that 912 // noMorePendingTasks() won't fail. 913 fakeExecutor.runDueTasks(); 914 assertEquals(3, runnableInvokes.get()); 915 } 916 917 @Test resetConnectBackoff()918 public void resetConnectBackoff() throws Exception { 919 SocketAddress addr = mock(SocketAddress.class); 920 createInternalSubchannel(addr); 921 922 // Move into TRANSIENT_FAILURE to schedule reconnect 923 internalSubchannel.obtainActiveTransport(); 924 assertExactCallbackInvokes("onStateChange:CONNECTING"); 925 verify(mockTransportFactory).newClientTransport(addr, createClientTransportOptions()); 926 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 927 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 928 929 // Save the reconnectTask 930 FakeClock.ScheduledTask reconnectTask = null; 931 for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) { 932 if (task.command.toString().contains("EndOfCurrentBackoff")) { 933 assertNull("There shouldn't be more than one reconnectTask", reconnectTask); 934 assertFalse(task.isDone()); 935 reconnectTask = task; 936 } 937 } 938 assertNotNull("There should be at least one reconnectTask", reconnectTask); 939 940 internalSubchannel.resetConnectBackoff(); 941 942 verify(mockTransportFactory, times(2)) 943 .newClientTransport(addr, createClientTransportOptions()); 944 assertExactCallbackInvokes("onStateChange:CONNECTING"); 945 assertTrue(reconnectTask.isCancelled()); 946 947 // Simulate a race between cancel and the task scheduler. Should be a no-op. 948 reconnectTask.command.run(); 949 assertNoCallbackInvoke(); 950 verify(mockTransportFactory, times(2)) 951 .newClientTransport(addr, createClientTransportOptions()); 952 verify(mockBackoffPolicyProvider, times(1)).get(); 953 954 // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after 955 // invoking resetConnectBackoff() 956 transports.poll().listener.transportShutdown(Status.UNAVAILABLE); 957 assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); 958 verify(mockBackoffPolicyProvider, times(2)).get(); 959 fakeClock.forwardNanos(10); 960 assertExactCallbackInvokes("onStateChange:CONNECTING"); 961 assertEquals(CONNECTING, internalSubchannel.getState()); 962 } 963 964 @Test resetConnectBackoff_noopOnIdleTransport()965 public void resetConnectBackoff_noopOnIdleTransport() throws Exception { 966 SocketAddress addr = mock(SocketAddress.class); 967 createInternalSubchannel(addr); 968 assertEquals(IDLE, internalSubchannel.getState()); 969 970 internalSubchannel.resetConnectBackoff(); 971 972 assertNoCallbackInvoke(); 973 } 974 975 @Test channelzMembership()976 public void channelzMembership() throws Exception { 977 SocketAddress addr1 = mock(SocketAddress.class); 978 createInternalSubchannel(addr1); 979 internalSubchannel.obtainActiveTransport(); 980 981 MockClientTransportInfo t0 = transports.poll(); 982 assertTrue(channelz.containsClientSocket(t0.transport.getLogId())); 983 t0.listener.transportTerminated(); 984 assertFalse(channelz.containsClientSocket(t0.transport.getLogId())); 985 } 986 987 @Test channelzStatContainsTransport()988 public void channelzStatContainsTransport() throws Exception { 989 SocketAddress addr = new SocketAddress() {}; 990 assertThat(transports).isEmpty(); 991 createInternalSubchannel(addr); 992 internalSubchannel.obtainActiveTransport(); 993 994 InternalWithLogId registeredTransport 995 = Iterables.getOnlyElement(internalSubchannel.getStats().get().sockets); 996 MockClientTransportInfo actualTransport = Iterables.getOnlyElement(transports); 997 assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId()); 998 } 999 index_looping()1000 @Test public void index_looping() { 1001 Attributes.Key<String> key = Attributes.Key.create("some-key"); 1002 Attributes attr1 = Attributes.newBuilder().set(key, "1").build(); 1003 Attributes attr2 = Attributes.newBuilder().set(key, "2").build(); 1004 Attributes attr3 = Attributes.newBuilder().set(key, "3").build(); 1005 SocketAddress addr1 = new FakeSocketAddress(); 1006 SocketAddress addr2 = new FakeSocketAddress(); 1007 SocketAddress addr3 = new FakeSocketAddress(); 1008 SocketAddress addr4 = new FakeSocketAddress(); 1009 SocketAddress addr5 = new FakeSocketAddress(); 1010 Index index = new Index(Arrays.asList( 1011 new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1), 1012 new EquivalentAddressGroup(Arrays.asList(addr3), attr2), 1013 new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3))); 1014 assertThat(index.getCurrentAddress()).isSameAs(addr1); 1015 assertThat(index.getCurrentEagAttributes()).isSameAs(attr1); 1016 assertThat(index.isAtBeginning()).isTrue(); 1017 assertThat(index.isValid()).isTrue(); 1018 1019 index.increment(); 1020 assertThat(index.getCurrentAddress()).isSameAs(addr2); 1021 assertThat(index.getCurrentEagAttributes()).isSameAs(attr1); 1022 assertThat(index.isAtBeginning()).isFalse(); 1023 assertThat(index.isValid()).isTrue(); 1024 1025 index.increment(); 1026 assertThat(index.getCurrentAddress()).isSameAs(addr3); 1027 assertThat(index.getCurrentEagAttributes()).isSameAs(attr2); 1028 assertThat(index.isAtBeginning()).isFalse(); 1029 assertThat(index.isValid()).isTrue(); 1030 1031 index.increment(); 1032 assertThat(index.getCurrentAddress()).isSameAs(addr4); 1033 assertThat(index.getCurrentEagAttributes()).isSameAs(attr3); 1034 assertThat(index.isAtBeginning()).isFalse(); 1035 assertThat(index.isValid()).isTrue(); 1036 1037 index.increment(); 1038 assertThat(index.getCurrentAddress()).isSameAs(addr5); 1039 assertThat(index.getCurrentEagAttributes()).isSameAs(attr3); 1040 assertThat(index.isAtBeginning()).isFalse(); 1041 assertThat(index.isValid()).isTrue(); 1042 1043 index.increment(); 1044 assertThat(index.isAtBeginning()).isFalse(); 1045 assertThat(index.isValid()).isFalse(); 1046 1047 index.reset(); 1048 assertThat(index.getCurrentAddress()).isSameAs(addr1); 1049 assertThat(index.getCurrentEagAttributes()).isSameAs(attr1); 1050 assertThat(index.isAtBeginning()).isTrue(); 1051 assertThat(index.isValid()).isTrue(); 1052 1053 // We want to make sure both groupIndex and addressIndex are reset 1054 index.increment(); 1055 index.increment(); 1056 index.increment(); 1057 index.increment(); 1058 assertThat(index.getCurrentAddress()).isSameAs(addr5); 1059 assertThat(index.getCurrentEagAttributes()).isSameAs(attr3); 1060 index.reset(); 1061 assertThat(index.getCurrentAddress()).isSameAs(addr1); 1062 assertThat(index.getCurrentEagAttributes()).isSameAs(attr1); 1063 } 1064 index_updateGroups_resets()1065 @Test public void index_updateGroups_resets() { 1066 SocketAddress addr1 = new FakeSocketAddress(); 1067 SocketAddress addr2 = new FakeSocketAddress(); 1068 SocketAddress addr3 = new FakeSocketAddress(); 1069 Index index = new Index(Arrays.asList( 1070 new EquivalentAddressGroup(Arrays.asList(addr1)), 1071 new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 1072 index.increment(); 1073 index.increment(); 1074 // We want to make sure both groupIndex and addressIndex are reset 1075 index.updateGroups(Arrays.asList( 1076 new EquivalentAddressGroup(Arrays.asList(addr1)), 1077 new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); 1078 assertThat(index.getCurrentAddress()).isSameAs(addr1); 1079 } 1080 index_seekTo()1081 @Test public void index_seekTo() { 1082 SocketAddress addr1 = new FakeSocketAddress(); 1083 SocketAddress addr2 = new FakeSocketAddress(); 1084 SocketAddress addr3 = new FakeSocketAddress(); 1085 Index index = new Index(Arrays.asList( 1086 new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), 1087 new EquivalentAddressGroup(Arrays.asList(addr3)))); 1088 assertThat(index.seekTo(addr3)).isTrue(); 1089 assertThat(index.getCurrentAddress()).isSameAs(addr3); 1090 assertThat(index.seekTo(addr1)).isTrue(); 1091 assertThat(index.getCurrentAddress()).isSameAs(addr1); 1092 assertThat(index.seekTo(addr2)).isTrue(); 1093 assertThat(index.getCurrentAddress()).isSameAs(addr2); 1094 index.seekTo(new FakeSocketAddress()); 1095 // Failed seekTo doesn't change the index 1096 assertThat(index.getCurrentAddress()).isSameAs(addr2); 1097 } 1098 1099 /** Create ClientTransportOptions. Should not be reused if it may be mutated. */ createClientTransportOptions()1100 private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() { 1101 return new ClientTransportFactory.ClientTransportOptions() 1102 .setAuthority(AUTHORITY) 1103 .setUserAgent(USER_AGENT); 1104 } 1105 createInternalSubchannel(SocketAddress .... addrs)1106 private void createInternalSubchannel(SocketAddress ... addrs) { 1107 createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs))); 1108 } 1109 createInternalSubchannel(EquivalentAddressGroup .... addrs)1110 private void createInternalSubchannel(EquivalentAddressGroup ... addrs) { 1111 List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs); 1112 internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT, 1113 mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), 1114 fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback, 1115 channelz, CallTracer.getDefaultFactory().create(), null, 1116 new TimeProvider() { 1117 @Override 1118 public long currentTimeNanos() { 1119 return fakeClock.getTicker().read(); 1120 } 1121 }); 1122 } 1123 assertNoCallbackInvoke()1124 private void assertNoCallbackInvoke() { 1125 while (fakeExecutor.runDueTasks() > 0) {} 1126 assertEquals(0, callbackInvokes.size()); 1127 } 1128 assertExactCallbackInvokes(String .... expectedInvokes)1129 private void assertExactCallbackInvokes(String ... expectedInvokes) { 1130 assertEquals(0, channelExecutor.numPendingTasks()); 1131 assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); 1132 callbackInvokes.clear(); 1133 } 1134 1135 private static class FakeSocketAddress extends SocketAddress {} 1136 } 1137