1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkState; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.ConnectivityState.CONNECTING; 22 import static io.grpc.ConnectivityState.IDLE; 23 import static io.grpc.ConnectivityState.READY; 24 import static io.grpc.ConnectivityState.SHUTDOWN; 25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 26 import static junit.framework.TestCase.assertNotSame; 27 import static org.junit.Assert.assertEquals; 28 import static org.junit.Assert.assertFalse; 29 import static org.junit.Assert.assertNotEquals; 30 import static org.junit.Assert.assertNotNull; 31 import static org.junit.Assert.assertNull; 32 import static org.junit.Assert.assertSame; 33 import static org.junit.Assert.assertTrue; 34 import static org.mockito.Matchers.any; 35 import static org.mockito.Matchers.anyObject; 36 import static org.mockito.Matchers.eq; 37 import static org.mockito.Matchers.same; 38 import static org.mockito.Mockito.atLeast; 39 import static org.mockito.Mockito.doAnswer; 40 import static org.mockito.Mockito.doThrow; 41 import static org.mockito.Mockito.inOrder; 42 import static org.mockito.Mockito.mock; 43 import static org.mockito.Mockito.never; 44 import static org.mockito.Mockito.times; 45 import static org.mockito.Mockito.verify; 46 import static 
org.mockito.Mockito.verifyNoMoreInteractions; 47 import static org.mockito.Mockito.verifyZeroInteractions; 48 import static org.mockito.Mockito.when; 49 50 import com.google.common.base.Throwables; 51 import com.google.common.collect.ImmutableList; 52 import com.google.common.collect.ImmutableMap; 53 import com.google.common.collect.Iterables; 54 import com.google.common.util.concurrent.ListenableFuture; 55 import com.google.common.util.concurrent.MoreExecutors; 56 import com.google.common.util.concurrent.SettableFuture; 57 import io.grpc.Attributes; 58 import io.grpc.BinaryLog; 59 import io.grpc.CallCredentials; 60 import io.grpc.CallOptions; 61 import io.grpc.Channel; 62 import io.grpc.ClientCall; 63 import io.grpc.ClientInterceptor; 64 import io.grpc.ClientInterceptors; 65 import io.grpc.ClientStreamTracer; 66 import io.grpc.ConnectivityState; 67 import io.grpc.ConnectivityStateInfo; 68 import io.grpc.Context; 69 import io.grpc.EquivalentAddressGroup; 70 import io.grpc.IntegerMarshaller; 71 import io.grpc.InternalChannelz; 72 import io.grpc.InternalChannelz.ChannelStats; 73 import io.grpc.InternalChannelz.ChannelTrace; 74 import io.grpc.InternalInstrumented; 75 import io.grpc.LoadBalancer; 76 import io.grpc.LoadBalancer.Helper; 77 import io.grpc.LoadBalancer.PickResult; 78 import io.grpc.LoadBalancer.PickSubchannelArgs; 79 import io.grpc.LoadBalancer.Subchannel; 80 import io.grpc.LoadBalancer.SubchannelPicker; 81 import io.grpc.ManagedChannel; 82 import io.grpc.Metadata; 83 import io.grpc.MethodDescriptor; 84 import io.grpc.MethodDescriptor.MethodType; 85 import io.grpc.NameResolver; 86 import io.grpc.SecurityLevel; 87 import io.grpc.ServerMethodDefinition; 88 import io.grpc.Status; 89 import io.grpc.StringMarshaller; 90 import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; 91 import io.grpc.internal.TestUtils.MockClientTransportInfo; 92 import io.grpc.stub.ClientCalls; 93 import io.grpc.testing.TestMethodDescriptors; 94 import 
java.io.IOException; 95 import java.net.SocketAddress; 96 import java.net.URI; 97 import java.util.ArrayList; 98 import java.util.Arrays; 99 import java.util.Collections; 100 import java.util.HashMap; 101 import java.util.LinkedList; 102 import java.util.List; 103 import java.util.Map; 104 import java.util.Random; 105 import java.util.concurrent.BlockingQueue; 106 import java.util.concurrent.ExecutionException; 107 import java.util.concurrent.Executor; 108 import java.util.concurrent.TimeUnit; 109 import java.util.concurrent.atomic.AtomicBoolean; 110 import java.util.concurrent.atomic.AtomicLong; 111 import java.util.concurrent.atomic.AtomicReference; 112 import javax.annotation.Nullable; 113 import org.junit.After; 114 import org.junit.Assert; 115 import org.junit.Assume; 116 import org.junit.Before; 117 import org.junit.Rule; 118 import org.junit.Test; 119 import org.junit.rules.ExpectedException; 120 import org.junit.runner.RunWith; 121 import org.junit.runners.JUnit4; 122 import org.mockito.ArgumentCaptor; 123 import org.mockito.Captor; 124 import org.mockito.InOrder; 125 import org.mockito.Matchers; 126 import org.mockito.Mock; 127 import org.mockito.MockitoAnnotations; 128 import org.mockito.invocation.InvocationOnMock; 129 import org.mockito.stubbing.Answer; 130 131 /** Unit tests for {@link ManagedChannelImpl}. 
*/ 132 @RunWith(JUnit4.class) 133 public class ManagedChannelImplTest { 134 private static final Attributes NAME_RESOLVER_PARAMS = 135 Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build(); 136 137 private static final MethodDescriptor<String, Integer> method = 138 MethodDescriptor.<String, Integer>newBuilder() 139 .setType(MethodType.UNKNOWN) 140 .setFullMethodName("service/method") 141 .setRequestMarshaller(new StringMarshaller()) 142 .setResponseMarshaller(new IntegerMarshaller()) 143 .build(); 144 private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY = 145 Attributes.Key.create("subchannel-attr-key"); 146 private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10; 147 private static final String SERVICE_NAME = "fake.example.com"; 148 private static final String AUTHORITY = SERVICE_NAME; 149 private static final String USER_AGENT = "userAgent"; 150 private static final ClientTransportOptions clientTransportOptions = 151 new ClientTransportOptions() 152 .setAuthority(AUTHORITY) 153 .setUserAgent(USER_AGENT); 154 private static final String TARGET = "fake://" + SERVICE_NAME; 155 private URI expectedUri; 156 private final SocketAddress socketAddress = new SocketAddress() {}; 157 private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); 158 private final FakeClock timer = new FakeClock(); 159 private final FakeClock executor = new FakeClock(); 160 private final FakeClock oobExecutor = new FakeClock(); 161 private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER = 162 new FakeClock.TaskFilter() { 163 @Override 164 public boolean shouldAccept(Runnable command) { 165 return command instanceof ManagedChannelImpl.NameResolverRefresh; 166 } 167 }; 168 169 private final InternalChannelz channelz = new InternalChannelz(); 170 171 @Rule public final ExpectedException thrown = ExpectedException.none(); 172 173 private ManagedChannelImpl channel; 174 private Helper helper; 175 
@Captor 176 private ArgumentCaptor<Status> statusCaptor; 177 @Captor 178 private ArgumentCaptor<CallOptions> callOptionsCaptor; 179 @Mock 180 private LoadBalancer.Factory mockLoadBalancerFactory; 181 @Mock 182 private LoadBalancer mockLoadBalancer; 183 184 @Captor 185 private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor; 186 @Mock 187 private SubchannelPicker mockPicker; 188 @Mock 189 private ClientTransportFactory mockTransportFactory; 190 @Mock 191 private ClientCall.Listener<Integer> mockCallListener; 192 @Mock 193 private ClientCall.Listener<Integer> mockCallListener2; 194 @Mock 195 private ClientCall.Listener<Integer> mockCallListener3; 196 @Mock 197 private ClientCall.Listener<Integer> mockCallListener4; 198 @Mock 199 private ClientCall.Listener<Integer> mockCallListener5; 200 @Mock 201 private ObjectPool<Executor> executorPool; 202 @Mock 203 private ObjectPool<Executor> oobExecutorPool; 204 @Mock 205 private CallCredentials creds; 206 private ChannelBuilder channelBuilder; 207 private boolean requestConnection = true; 208 private BlockingQueue<MockClientTransportInfo> transports; 209 210 private ArgumentCaptor<ClientStreamListener> streamListenerCaptor = 211 ArgumentCaptor.forClass(ClientStreamListener.class); 212 createChannel(ClientInterceptor... interceptors)213 private void createChannel(ClientInterceptor... 
interceptors) { 214 checkState(channel == null); 215 TimeProvider fakeClockTimeProvider = new TimeProvider() { 216 @Override 217 public long currentTimeNanos() { 218 return timer.getTicker().read(); 219 } 220 }; 221 222 channel = new ManagedChannelImpl( 223 channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(), 224 oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors), 225 fakeClockTimeProvider); 226 227 if (requestConnection) { 228 int numExpectedTasks = 0; 229 230 // Force-exit the initial idle-mode 231 channel.exitIdleMode(); 232 if (channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) { 233 numExpectedTasks += 1; 234 } 235 236 if (getNameResolverRefresh() != null) { 237 numExpectedTasks += 1; 238 } 239 240 assertEquals(numExpectedTasks, timer.numPendingTasks()); 241 242 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); 243 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); 244 helper = helperCaptor.getValue(); 245 } 246 } 247 248 @Before setUp()249 public void setUp() throws Exception { 250 MockitoAnnotations.initMocks(this); 251 expectedUri = new URI(TARGET); 252 when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); 253 transports = TestUtils.captureTransports(mockTransportFactory); 254 when(mockTransportFactory.getScheduledExecutorService()) 255 .thenReturn(timer.getScheduledExecutorService()); 256 when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); 257 when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService()); 258 259 channelBuilder = new ChannelBuilder() 260 .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) 261 .loadBalancerFactory(mockLoadBalancerFactory) 262 .userAgent(USER_AGENT) 263 .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS); 264 channelBuilder.executorPool = 
executorPool; 265 channelBuilder.binlog = null; 266 channelBuilder.channelz = channelz; 267 } 268 269 @After allPendingTasksAreRun()270 public void allPendingTasksAreRun() throws Exception { 271 // The "never" verifications in the tests only hold up if all due tasks are done. 272 // As for timer, although there may be scheduled tasks in a future time, since we don't test 273 // any time-related behavior in this test suite, we only care the tasks that are due. This 274 // would ignore any time-sensitive tasks, e.g., back-off and the idle timer. 275 assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty()); 276 assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); 277 if (channel != null) { 278 channel.shutdownNow(); 279 channel = null; 280 } 281 } 282 283 @Test 284 @SuppressWarnings("unchecked") idleModeDisabled()285 public void idleModeDisabled() { 286 channelBuilder.nameResolverFactory( 287 new FakeNameResolverFactory.Builder(expectedUri) 288 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 289 .build()); 290 createChannel(); 291 292 // In this test suite, the channel is always created with idle mode disabled. 
293 // No task is scheduled to enter idle mode 294 assertEquals(0, timer.numPendingTasks()); 295 assertEquals(0, executor.numPendingTasks()); 296 } 297 298 @Test immediateDeadlineExceeded()299 public void immediateDeadlineExceeded() { 300 createChannel(); 301 ClientCall<String, Integer> call = 302 channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); 303 call.start(mockCallListener, new Metadata()); 304 assertEquals(1, executor.runDueTasks()); 305 306 verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); 307 Status status = statusCaptor.getValue(); 308 assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); 309 } 310 311 @Test shutdownWithNoTransportsEverCreated()312 public void shutdownWithNoTransportsEverCreated() { 313 channelBuilder.nameResolverFactory( 314 new FakeNameResolverFactory.Builder(expectedUri) 315 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 316 .build()); 317 createChannel(); 318 verify(executorPool).getObject(); 319 verify(executorPool, never()).returnObject(anyObject()); 320 verify(mockTransportFactory).getScheduledExecutorService(); 321 verifyNoMoreInteractions(mockTransportFactory); 322 channel.shutdown(); 323 assertTrue(channel.isShutdown()); 324 assertTrue(channel.isTerminated()); 325 verify(executorPool).returnObject(executor.getScheduledExecutorService()); 326 } 327 328 @Test channelzMembership()329 public void channelzMembership() throws Exception { 330 createChannel(); 331 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 332 assertFalse(channelz.containsSubchannel(channel.getLogId())); 333 channel.shutdownNow(); 334 channel.awaitTermination(5, TimeUnit.SECONDS); 335 assertNull(channelz.getRootChannel(channel.getLogId().getId())); 336 assertFalse(channelz.containsSubchannel(channel.getLogId())); 337 } 338 339 @Test channelzMembership_subchannel()340 public void channelzMembership_subchannel() throws Exception { 341 
createChannel(); 342 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 343 344 AbstractSubchannel subchannel = 345 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 346 // subchannels are not root channels 347 assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId())); 348 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 349 assertThat(getStats(channel).subchannels) 350 .containsExactly(subchannel.getInternalSubchannel()); 351 352 subchannel.requestConnection(); 353 MockClientTransportInfo transportInfo = transports.poll(); 354 assertNotNull(transportInfo); 355 assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); 356 357 // terminate transport 358 transportInfo.listener.transportTerminated(); 359 assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); 360 361 // terminate subchannel 362 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 363 subchannel.shutdown(); 364 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); 365 timer.runDueTasks(); 366 assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 367 assertThat(getStats(channel).subchannels).isEmpty(); 368 369 // channel still appears 370 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 371 } 372 373 @Test channelzMembership_oob()374 public void channelzMembership_oob() throws Exception { 375 createChannel(); 376 OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY); 377 // oob channels are not root channels 378 assertNull(channelz.getRootChannel(oob.getLogId().getId())); 379 assertTrue(channelz.containsSubchannel(oob.getLogId())); 380 assertThat(getStats(channel).subchannels).containsExactly(oob); 381 assertTrue(channelz.containsSubchannel(oob.getLogId())); 382 383 AbstractSubchannel subchannel 
= (AbstractSubchannel) oob.getSubchannel(); 384 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 385 assertThat(getStats(oob).subchannels) 386 .containsExactly(subchannel.getInternalSubchannel()); 387 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 388 389 oob.getSubchannel().requestConnection(); 390 MockClientTransportInfo transportInfo = transports.poll(); 391 assertNotNull(transportInfo); 392 assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); 393 394 // terminate transport 395 transportInfo.listener.transportTerminated(); 396 assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); 397 398 // terminate oobchannel 399 oob.shutdown(); 400 assertFalse(channelz.containsSubchannel(oob.getLogId())); 401 assertThat(getStats(channel).subchannels).isEmpty(); 402 assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 403 404 // channel still appears 405 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 406 } 407 408 @Test callsAndShutdown()409 public void callsAndShutdown() { 410 subtestCallsAndShutdown(false, false); 411 } 412 413 @Test callsAndShutdownNow()414 public void callsAndShutdownNow() { 415 subtestCallsAndShutdown(true, false); 416 } 417 418 /** Make sure shutdownNow() after shutdown() has an effect. 
*/ 419 @Test callsAndShutdownAndShutdownNow()420 public void callsAndShutdownAndShutdownNow() { 421 subtestCallsAndShutdown(false, true); 422 } 423 subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown)424 private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) { 425 FakeNameResolverFactory nameResolverFactory = 426 new FakeNameResolverFactory.Builder(expectedUri).build(); 427 channelBuilder.nameResolverFactory(nameResolverFactory); 428 createChannel(); 429 verify(executorPool).getObject(); 430 ClientStream mockStream = mock(ClientStream.class); 431 ClientStream mockStream2 = mock(ClientStream.class); 432 Metadata headers = new Metadata(); 433 Metadata headers2 = new Metadata(); 434 435 // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to 436 // real transport. 437 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 438 subchannel.requestConnection(); 439 verify(mockTransportFactory) 440 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 441 MockClientTransportInfo transportInfo = transports.poll(); 442 ConnectionClientTransport mockTransport = transportInfo.transport; 443 verify(mockTransport).start(any(ManagedClientTransport.Listener.class)); 444 ManagedClientTransport.Listener transportListener = transportInfo.listener; 445 when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT))) 446 .thenReturn(mockStream); 447 when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT))) 448 .thenReturn(mockStream2); 449 transportListener.transportReady(); 450 when(mockPicker.pickSubchannel( 451 new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn( 452 PickResult.withNoResult()); 453 when(mockPicker.pickSubchannel( 454 new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn( 455 PickResult.withSubchannel(subchannel)); 
456 helper.updateBalancingState(READY, mockPicker); 457 458 // First RPC, will be pending 459 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 460 verify(mockTransportFactory) 461 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 462 call.start(mockCallListener, headers); 463 464 verify(mockTransport, never()) 465 .newStream(same(method), same(headers), same(CallOptions.DEFAULT)); 466 467 // Second RPC, will be assigned to the real transport 468 ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); 469 call2.start(mockCallListener2, headers2); 470 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT)); 471 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT)); 472 verify(mockStream2).start(any(ClientStreamListener.class)); 473 474 // Shutdown 475 if (shutdownNow) { 476 channel.shutdownNow(); 477 } else { 478 channel.shutdown(); 479 if (shutdownNowAfterShutdown) { 480 channel.shutdownNow(); 481 shutdownNow = true; 482 } 483 } 484 assertTrue(channel.isShutdown()); 485 assertFalse(channel.isTerminated()); 486 assertEquals(1, nameResolverFactory.resolvers.size()); 487 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 488 489 // Further calls should fail without going to the transport 490 ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT); 491 call3.start(mockCallListener3, headers2); 492 timer.runDueTasks(); 493 executor.runDueTasks(); 494 495 verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class)); 496 assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); 497 498 if (shutdownNow) { 499 // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated. 
500 verify(mockLoadBalancer).shutdown(); 501 assertTrue(nameResolverFactory.resolvers.get(0).shutdown); 502 // call should have been aborted by delayed transport 503 executor.runDueTasks(); 504 verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS), 505 any(Metadata.class)); 506 } else { 507 // LoadBalancer and NameResolver are still running. 508 verify(mockLoadBalancer, never()).shutdown(); 509 assertFalse(nameResolverFactory.resolvers.get(0).shutdown); 510 // call and call2 are still alive, and can still be assigned to a real transport 511 SubchannelPicker picker2 = mock(SubchannelPicker.class); 512 when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) 513 .thenReturn(PickResult.withSubchannel(subchannel)); 514 helper.updateBalancingState(READY, picker2); 515 executor.runDueTasks(); 516 verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); 517 verify(mockStream).start(any(ClientStreamListener.class)); 518 } 519 520 // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports 521 // will be shutdown. 
522 verify(mockLoadBalancer).shutdown(); 523 assertTrue(nameResolverFactory.resolvers.get(0).shutdown); 524 525 if (shutdownNow) { 526 // Channel shutdownNow() all subchannels after shutting down LoadBalancer 527 verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS); 528 } else { 529 verify(mockTransport, never()).shutdownNow(any(Status.class)); 530 } 531 // LoadBalancer should shutdown the subchannel 532 subchannel.shutdown(); 533 if (shutdownNow) { 534 verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS)); 535 } else { 536 verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); 537 } 538 539 // Killing the remaining real transport will terminate the channel 540 transportListener.transportShutdown(Status.UNAVAILABLE); 541 assertFalse(channel.isTerminated()); 542 verify(executorPool, never()).returnObject(anyObject()); 543 transportListener.transportTerminated(); 544 assertTrue(channel.isTerminated()); 545 verify(executorPool).returnObject(executor.getScheduledExecutorService()); 546 verifyNoMoreInteractions(oobExecutorPool); 547 548 verify(mockTransportFactory) 549 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 550 verify(mockTransportFactory).close(); 551 verify(mockTransport, atLeast(0)).getLogId(); 552 verifyNoMoreInteractions(mockTransport); 553 } 554 555 @Test noMoreCallbackAfterLoadBalancerShutdown()556 public void noMoreCallbackAfterLoadBalancerShutdown() { 557 FakeNameResolverFactory nameResolverFactory = 558 new FakeNameResolverFactory.Builder(expectedUri) 559 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 560 .build(); 561 channelBuilder.nameResolverFactory(nameResolverFactory); 562 Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed"); 563 createChannel(); 564 565 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 566 
verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 567 verify(mockLoadBalancer).handleResolvedAddressGroups( 568 eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); 569 570 Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 571 Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 572 subchannel1.requestConnection(); 573 subchannel2.requestConnection(); 574 verify(mockTransportFactory, times(2)) 575 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 576 MockClientTransportInfo transportInfo1 = transports.poll(); 577 MockClientTransportInfo transportInfo2 = transports.poll(); 578 579 // LoadBalancer receives all sorts of callbacks 580 transportInfo1.listener.transportReady(); 581 verify(mockLoadBalancer, times(2)) 582 .handleSubchannelState(same(subchannel1), stateInfoCaptor.capture()); 583 assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState()); 584 assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState()); 585 586 verify(mockLoadBalancer) 587 .handleSubchannelState(same(subchannel2), stateInfoCaptor.capture()); 588 assertSame(CONNECTING, stateInfoCaptor.getValue().getState()); 589 590 resolver.listener.onError(resolutionError); 591 verify(mockLoadBalancer).handleNameResolutionError(resolutionError); 592 593 verifyNoMoreInteractions(mockLoadBalancer); 594 595 channel.shutdown(); 596 verify(mockLoadBalancer).shutdown(); 597 598 // No more callback should be delivered to LoadBalancer after it's shut down 599 transportInfo2.listener.transportReady(); 600 resolver.listener.onError(resolutionError); 601 resolver.resolved(); 602 verifyNoMoreInteractions(mockLoadBalancer); 603 } 604 605 @Test interceptor()606 public void interceptor() throws Exception { 607 final AtomicLong atomic = new AtomicLong(); 608 ClientInterceptor interceptor = new ClientInterceptor() { 609 @Override 610 public <RequestT, ResponseT> ClientCall<RequestT, 
ResponseT> interceptCall( 611 MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions, 612 Channel next) { 613 atomic.set(1); 614 return next.newCall(method, callOptions); 615 } 616 }; 617 createChannel(interceptor); 618 assertNotNull(channel.newCall(method, CallOptions.DEFAULT)); 619 assertEquals(1, atomic.get()); 620 } 621 622 @Test callOptionsExecutor()623 public void callOptionsExecutor() { 624 Metadata headers = new Metadata(); 625 ClientStream mockStream = mock(ClientStream.class); 626 FakeClock callExecutor = new FakeClock(); 627 createChannel(); 628 629 // Start a call with a call executor 630 CallOptions options = 631 CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); 632 ClientCall<String, Integer> call = channel.newCall(method, options); 633 call.start(mockCallListener, headers); 634 635 // Make the transport available 636 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 637 verify(mockTransportFactory, never()) 638 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 639 subchannel.requestConnection(); 640 verify(mockTransportFactory) 641 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 642 MockClientTransportInfo transportInfo = transports.poll(); 643 ConnectionClientTransport mockTransport = transportInfo.transport; 644 ManagedClientTransport.Listener transportListener = transportInfo.listener; 645 when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) 646 .thenReturn(mockStream); 647 transportListener.transportReady(); 648 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 649 .thenReturn(PickResult.withSubchannel(subchannel)); 650 assertEquals(0, callExecutor.numPendingTasks()); 651 helper.updateBalancingState(READY, mockPicker); 652 653 // Real streams are started in the call executor if they were previously buffered. 
654 assertEquals(1, callExecutor.runDueTasks()); 655 verify(mockTransport).newStream(same(method), same(headers), same(options)); 656 verify(mockStream).start(streamListenerCaptor.capture()); 657 658 // Call listener callbacks are also run in the call executor 659 ClientStreamListener streamListener = streamListenerCaptor.getValue(); 660 Metadata trailers = new Metadata(); 661 assertEquals(0, callExecutor.numPendingTasks()); 662 streamListener.closed(Status.CANCELLED, trailers); 663 verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers)); 664 assertEquals(1, callExecutor.runDueTasks()); 665 verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers)); 666 667 668 transportListener.transportShutdown(Status.UNAVAILABLE); 669 transportListener.transportTerminated(); 670 671 // Clean up as much as possible to allow the channel to terminate. 672 subchannel.shutdown(); 673 timer.forwardNanos( 674 TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); 675 } 676 677 @Test nameResolutionFailed()678 public void nameResolutionFailed() { 679 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); 680 FakeNameResolverFactory nameResolverFactory = 681 new FakeNameResolverFactory.Builder(expectedUri) 682 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 683 .setError(error) 684 .build(); 685 channelBuilder.nameResolverFactory(nameResolverFactory); 686 // Name resolution is started as soon as channel is created. 
687 createChannel(); 688 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 689 verify(mockLoadBalancer).handleNameResolutionError(same(error)); 690 assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); 691 692 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1); 693 assertEquals(0, resolver.refreshCalled); 694 695 timer.forwardNanos(1); 696 assertEquals(1, resolver.refreshCalled); 697 verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error)); 698 699 // Verify an additional name resolution failure does not schedule another timer 700 resolver.refresh(); 701 verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error)); 702 assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); 703 704 // Allow the next refresh attempt to succeed 705 resolver.error = null; 706 707 // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2 708 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1); 709 assertEquals(2, resolver.refreshCalled); 710 timer.forwardNanos(1); 711 assertEquals(3, resolver.refreshCalled); 712 assertEquals(0, timer.numPendingTasks()); 713 714 // Verify that the successful resolution reset the backoff policy 715 resolver.listener.onError(error); 716 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1); 717 assertEquals(3, resolver.refreshCalled); 718 timer.forwardNanos(1); 719 assertEquals(4, resolver.refreshCalled); 720 assertEquals(0, timer.numPendingTasks()); 721 } 722 723 @Test nameResolutionFailed_delayedTransportShutdownCancelsBackoff()724 public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() { 725 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); 726 727 FakeNameResolverFactory nameResolverFactory = 728 new FakeNameResolverFactory.Builder(expectedUri).setError(error).build(); 729 channelBuilder.nameResolverFactory(nameResolverFactory); 730 // 
Name resolution is started as soon as channel is created. 731 createChannel(); 732 verify(mockLoadBalancer).handleNameResolutionError(same(error)); 733 734 FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh(); 735 assertNotNull(nameResolverBackoff); 736 assertFalse(nameResolverBackoff.isCancelled()); 737 738 // Add a pending call to the delayed transport 739 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 740 Metadata headers = new Metadata(); 741 call.start(mockCallListener, headers); 742 743 // The pending call on the delayed transport stops the name resolver backoff from cancelling 744 channel.shutdown(); 745 assertFalse(nameResolverBackoff.isCancelled()); 746 747 // Notify that a subchannel is ready, which drains the delayed transport 748 SubchannelPicker picker = mock(SubchannelPicker.class); 749 Status status = Status.UNAVAILABLE.withDescription("for test"); 750 when(picker.pickSubchannel(any(PickSubchannelArgs.class))) 751 .thenReturn(PickResult.withDrop(status)); 752 helper.updateBalancingState(READY, picker); 753 executor.runDueTasks(); 754 verify(mockCallListener).onClose(same(status), any(Metadata.class)); 755 756 assertTrue(nameResolverBackoff.isCancelled()); 757 } 758 759 @Test nameResolverReturnsEmptySubLists()760 public void nameResolverReturnsEmptySubLists() { 761 String errorDescription = "NameResolver returned an empty list"; 762 763 // Pass a FakeNameResolverFactory with an empty list 764 createChannel(); 765 766 // LoadBalancer received the error 767 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 768 verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture()); 769 Status status = statusCaptor.getValue(); 770 assertSame(Status.Code.UNAVAILABLE, status.getCode()); 771 assertEquals(errorDescription, status.getDescription()); 772 } 773 774 @Test loadBalancerThrowsInHandleResolvedAddresses()775 public void loadBalancerThrowsInHandleResolvedAddresses() { 776 
RuntimeException ex = new RuntimeException("simulated"); 777 // Delay the success of name resolution until allResolved() is called 778 FakeNameResolverFactory nameResolverFactory = 779 new FakeNameResolverFactory.Builder(expectedUri) 780 .setResolvedAtStart(false) 781 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 782 .build(); 783 channelBuilder.nameResolverFactory(nameResolverFactory); 784 createChannel(); 785 786 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 787 doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups( 788 Matchers.<List<EquivalentAddressGroup>>anyObject(), any(Attributes.class)); 789 790 // NameResolver returns addresses. 791 nameResolverFactory.allResolved(); 792 793 // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode. 794 verifyPanicMode(ex); 795 } 796 797 @Test nameResolvedAfterChannelShutdown()798 public void nameResolvedAfterChannelShutdown() { 799 // Delay the success of name resolution until allResolved() is called. 800 FakeNameResolverFactory nameResolverFactory = 801 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(); 802 channelBuilder.nameResolverFactory(nameResolverFactory); 803 createChannel(); 804 805 channel.shutdown(); 806 807 assertTrue(channel.isShutdown()); 808 assertTrue(channel.isTerminated()); 809 verify(mockLoadBalancer).shutdown(); 810 // Name resolved after the channel is shut down, which is possible if the name resolution takes 811 // time and is not cancellable. The resolved address will be dropped. 812 nameResolverFactory.allResolved(); 813 verifyNoMoreInteractions(mockLoadBalancer); 814 } 815 816 /** 817 * Verify that if the first resolved address points to a server that cannot be connected, the call 818 * will end up with the second address which works. 
819 */ 820 @Test firstResolvedServerFailedToConnect()821 public void firstResolvedServerFailedToConnect() throws Exception { 822 final SocketAddress goodAddress = new SocketAddress() { 823 @Override public String toString() { 824 return "goodAddress"; 825 } 826 }; 827 final SocketAddress badAddress = new SocketAddress() { 828 @Override public String toString() { 829 return "badAddress"; 830 } 831 }; 832 InOrder inOrder = inOrder(mockLoadBalancer); 833 834 List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress); 835 FakeNameResolverFactory nameResolverFactory = 836 new FakeNameResolverFactory.Builder(expectedUri) 837 .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs))) 838 .build(); 839 channelBuilder.nameResolverFactory(nameResolverFactory); 840 createChannel(); 841 842 // Start the call 843 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 844 Metadata headers = new Metadata(); 845 call.start(mockCallListener, headers); 846 executor.runDueTasks(); 847 848 // Simulate name resolution results 849 EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); 850 inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups( 851 eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); 852 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 853 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 854 .thenReturn(PickResult.withSubchannel(subchannel)); 855 subchannel.requestConnection(); 856 inOrder.verify(mockLoadBalancer).handleSubchannelState( 857 same(subchannel), stateInfoCaptor.capture()); 858 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); 859 860 // The channel will starts with the first address (badAddress) 861 verify(mockTransportFactory) 862 .newClientTransport(same(badAddress), any(ClientTransportOptions.class)); 863 verify(mockTransportFactory, times(0)) 864 .newClientTransport(same(goodAddress), 
any(ClientTransportOptions.class)); 865 866 MockClientTransportInfo badTransportInfo = transports.poll(); 867 // Which failed to connect 868 badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE); 869 inOrder.verifyNoMoreInteractions(); 870 871 // The channel then try the second address (goodAddress) 872 verify(mockTransportFactory) 873 .newClientTransport(same(goodAddress), any(ClientTransportOptions.class)); 874 MockClientTransportInfo goodTransportInfo = transports.poll(); 875 when(goodTransportInfo.transport.newStream( 876 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) 877 .thenReturn(mock(ClientStream.class)); 878 879 goodTransportInfo.listener.transportReady(); 880 inOrder.verify(mockLoadBalancer).handleSubchannelState( 881 same(subchannel), stateInfoCaptor.capture()); 882 assertEquals(READY, stateInfoCaptor.getValue().getState()); 883 884 // A typical LoadBalancer will call this once the subchannel becomes READY 885 helper.updateBalancingState(READY, mockPicker); 886 // Delayed transport uses the app executor to create real streams. 887 executor.runDueTasks(); 888 889 verify(goodTransportInfo.transport).newStream(same(method), same(headers), 890 same(CallOptions.DEFAULT)); 891 // The bad transport was never used. 
892 verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class), 893 any(Metadata.class), any(CallOptions.class)); 894 } 895 896 @Test failFastRpcFailFromErrorFromBalancer()897 public void failFastRpcFailFromErrorFromBalancer() { 898 subtestFailRpcFromBalancer(false, false, true); 899 } 900 901 @Test failFastRpcFailFromDropFromBalancer()902 public void failFastRpcFailFromDropFromBalancer() { 903 subtestFailRpcFromBalancer(false, true, true); 904 } 905 906 @Test waitForReadyRpcImmuneFromErrorFromBalancer()907 public void waitForReadyRpcImmuneFromErrorFromBalancer() { 908 subtestFailRpcFromBalancer(true, false, false); 909 } 910 911 @Test waitForReadyRpcFailFromDropFromBalancer()912 public void waitForReadyRpcFailFromDropFromBalancer() { 913 subtestFailRpcFromBalancer(true, true, true); 914 } 915 subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail)916 private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) { 917 createChannel(); 918 919 // This call will be buffered by the channel, thus involve delayed transport 920 CallOptions callOptions = CallOptions.DEFAULT; 921 if (waitForReady) { 922 callOptions = callOptions.withWaitForReady(); 923 } else { 924 callOptions = callOptions.withoutWaitForReady(); 925 } 926 ClientCall<String, Integer> call1 = channel.newCall(method, callOptions); 927 call1.start(mockCallListener, new Metadata()); 928 929 SubchannelPicker picker = mock(SubchannelPicker.class); 930 Status status = Status.UNAVAILABLE.withDescription("for test"); 931 932 when(picker.pickSubchannel(any(PickSubchannelArgs.class))) 933 .thenReturn(drop ? 
PickResult.withDrop(status) : PickResult.withError(status)); 934 helper.updateBalancingState(READY, picker); 935 936 executor.runDueTasks(); 937 if (shouldFail) { 938 verify(mockCallListener).onClose(same(status), any(Metadata.class)); 939 } else { 940 verifyZeroInteractions(mockCallListener); 941 } 942 943 // This call doesn't involve delayed transport 944 ClientCall<String, Integer> call2 = channel.newCall(method, callOptions); 945 call2.start(mockCallListener2, new Metadata()); 946 947 executor.runDueTasks(); 948 if (shouldFail) { 949 verify(mockCallListener2).onClose(same(status), any(Metadata.class)); 950 } else { 951 verifyZeroInteractions(mockCallListener2); 952 } 953 } 954 955 /** 956 * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a 957 * wait-for-ready call will still be buffered. 958 */ 959 @Test allServersFailedToConnect()960 public void allServersFailedToConnect() throws Exception { 961 final SocketAddress addr1 = new SocketAddress() { 962 @Override public String toString() { 963 return "addr1"; 964 } 965 }; 966 final SocketAddress addr2 = new SocketAddress() { 967 @Override public String toString() { 968 return "addr2"; 969 } 970 }; 971 InOrder inOrder = inOrder(mockLoadBalancer); 972 973 List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2); 974 975 FakeNameResolverFactory nameResolverFactory = 976 new FakeNameResolverFactory.Builder(expectedUri) 977 .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs))) 978 .build(); 979 channelBuilder.nameResolverFactory(nameResolverFactory); 980 createChannel(); 981 982 // Start a wait-for-ready call 983 ClientCall<String, Integer> call = 984 channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); 985 Metadata headers = new Metadata(); 986 call.start(mockCallListener, headers); 987 // ... 
and a fail-fast call 988 ClientCall<String, Integer> call2 = 989 channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); 990 call2.start(mockCallListener2, headers); 991 executor.runDueTasks(); 992 993 // Simulate name resolution results 994 EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); 995 inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups( 996 eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); 997 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 998 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 999 .thenReturn(PickResult.withSubchannel(subchannel)); 1000 subchannel.requestConnection(); 1001 1002 inOrder.verify(mockLoadBalancer).handleSubchannelState( 1003 same(subchannel), stateInfoCaptor.capture()); 1004 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); 1005 1006 // Connecting to server1, which will fail 1007 verify(mockTransportFactory) 1008 .newClientTransport(same(addr1), any(ClientTransportOptions.class)); 1009 verify(mockTransportFactory, times(0)) 1010 .newClientTransport(same(addr2), any(ClientTransportOptions.class)); 1011 MockClientTransportInfo transportInfo1 = transports.poll(); 1012 transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); 1013 1014 // Connecting to server2, which will fail too 1015 verify(mockTransportFactory) 1016 .newClientTransport(same(addr2), any(ClientTransportOptions.class)); 1017 MockClientTransportInfo transportInfo2 = transports.poll(); 1018 Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect"); 1019 transportInfo2.listener.transportShutdown(server2Error); 1020 1021 // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated 1022 // to LoadBalancer. 
1023 inOrder.verify(mockLoadBalancer).handleSubchannelState( 1024 same(subchannel), stateInfoCaptor.capture()); 1025 assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState()); 1026 assertSame(server2Error, stateInfoCaptor.getValue().getStatus()); 1027 1028 // A typical LoadBalancer would create a picker with error 1029 SubchannelPicker picker2 = mock(SubchannelPicker.class); 1030 when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) 1031 .thenReturn(PickResult.withError(server2Error)); 1032 helper.updateBalancingState(TRANSIENT_FAILURE, picker2); 1033 executor.runDueTasks(); 1034 1035 // ... which fails the fail-fast call 1036 verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class)); 1037 // ... while the wait-for-ready call stays 1038 verifyNoMoreInteractions(mockCallListener); 1039 // No real stream was ever created 1040 verify(transportInfo1.transport, times(0)) 1041 .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1042 verify(transportInfo2.transport, times(0)) 1043 .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1044 } 1045 1046 @Test subchannels()1047 public void subchannels() { 1048 createChannel(); 1049 1050 // createSubchannel() always return a new Subchannel 1051 Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build(); 1052 Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build(); 1053 Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1); 1054 Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2); 1055 assertNotSame(sub1, sub2); 1056 assertNotSame(attrs1, attrs2); 1057 assertSame(attrs1, sub1.getAttributes()); 1058 assertSame(attrs2, sub2.getAttributes()); 1059 assertSame(addressGroup, sub1.getAddresses()); 1060 assertSame(addressGroup, sub2.getAddresses()); 1061 1062 // requestConnection() 1063 verify(mockTransportFactory, never()) 1064 
.newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1065 sub1.requestConnection(); 1066 verify(mockTransportFactory).newClientTransport(socketAddress, clientTransportOptions); 1067 MockClientTransportInfo transportInfo1 = transports.poll(); 1068 assertNotNull(transportInfo1); 1069 1070 sub2.requestConnection(); 1071 verify(mockTransportFactory, times(2)) 1072 .newClientTransport(socketAddress, clientTransportOptions); 1073 MockClientTransportInfo transportInfo2 = transports.poll(); 1074 assertNotNull(transportInfo2); 1075 1076 sub1.requestConnection(); 1077 sub2.requestConnection(); 1078 verify(mockTransportFactory, times(2)) 1079 .newClientTransport(socketAddress, clientTransportOptions); 1080 1081 // shutdown() has a delay 1082 sub1.shutdown(); 1083 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); 1084 sub1.shutdown(); 1085 verify(transportInfo1.transport, never()).shutdown(any(Status.class)); 1086 timer.forwardTime(1, TimeUnit.SECONDS); 1087 verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS)); 1088 1089 // ... 
but not after Channel is terminating 1090 verify(mockLoadBalancer, never()).shutdown(); 1091 channel.shutdown(); 1092 verify(mockLoadBalancer).shutdown(); 1093 verify(transportInfo2.transport, never()).shutdown(any(Status.class)); 1094 1095 sub2.shutdown(); 1096 verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); 1097 1098 // Cleanup 1099 transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); 1100 transportInfo1.listener.transportTerminated(); 1101 transportInfo2.listener.transportShutdown(Status.UNAVAILABLE); 1102 transportInfo2.listener.transportTerminated(); 1103 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); 1104 } 1105 1106 @Test subchannelsWhenChannelShutdownNow()1107 public void subchannelsWhenChannelShutdownNow() { 1108 createChannel(); 1109 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1110 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1111 sub1.requestConnection(); 1112 sub2.requestConnection(); 1113 1114 assertEquals(2, transports.size()); 1115 MockClientTransportInfo ti1 = transports.poll(); 1116 MockClientTransportInfo ti2 = transports.poll(); 1117 1118 ti1.listener.transportReady(); 1119 ti2.listener.transportReady(); 1120 1121 channel.shutdownNow(); 1122 verify(ti1.transport).shutdownNow(any(Status.class)); 1123 verify(ti2.transport).shutdownNow(any(Status.class)); 1124 1125 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1126 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1127 ti1.listener.transportTerminated(); 1128 1129 assertFalse(channel.isTerminated()); 1130 ti2.listener.transportTerminated(); 1131 assertTrue(channel.isTerminated()); 1132 } 1133 1134 @Test subchannelsNoConnectionShutdown()1135 public void subchannelsNoConnectionShutdown() { 1136 createChannel(); 1137 Subchannel sub1 = helper.createSubchannel(addressGroup, 
Attributes.EMPTY); 1138 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1139 1140 channel.shutdown(); 1141 verify(mockLoadBalancer).shutdown(); 1142 sub1.shutdown(); 1143 assertFalse(channel.isTerminated()); 1144 sub2.shutdown(); 1145 assertTrue(channel.isTerminated()); 1146 verify(mockTransportFactory, never()) 1147 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1148 } 1149 1150 @Test subchannelsNoConnectionShutdownNow()1151 public void subchannelsNoConnectionShutdownNow() { 1152 createChannel(); 1153 helper.createSubchannel(addressGroup, Attributes.EMPTY); 1154 helper.createSubchannel(addressGroup, Attributes.EMPTY); 1155 channel.shutdownNow(); 1156 1157 verify(mockLoadBalancer).shutdown(); 1158 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. 1159 // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels. 1160 assertTrue(channel.isTerminated()); 1161 verify(mockTransportFactory, never()) 1162 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1163 } 1164 1165 @Test oobchannels()1166 public void oobchannels() { 1167 createChannel(); 1168 1169 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority"); 1170 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority"); 1171 verify(oobExecutorPool, times(2)).getObject(); 1172 1173 assertEquals("oob1authority", oob1.authority()); 1174 assertEquals("oob2authority", oob2.authority()); 1175 1176 // OOB channels create connections lazily. A new call will initiate the connection. 
1177 Metadata headers = new Metadata(); 1178 ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT); 1179 call.start(mockCallListener, headers); 1180 verify(mockTransportFactory) 1181 .newClientTransport( 1182 socketAddress, 1183 new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)); 1184 MockClientTransportInfo transportInfo = transports.poll(); 1185 assertNotNull(transportInfo); 1186 1187 assertEquals(0, oobExecutor.numPendingTasks()); 1188 transportInfo.listener.transportReady(); 1189 assertEquals(1, oobExecutor.runDueTasks()); 1190 verify(transportInfo.transport).newStream(same(method), same(headers), 1191 same(CallOptions.DEFAULT)); 1192 1193 // The transport goes away 1194 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 1195 transportInfo.listener.transportTerminated(); 1196 1197 // A new call will trigger a new transport 1198 ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT); 1199 call2.start(mockCallListener2, headers); 1200 ClientCall<String, Integer> call3 = 1201 oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady()); 1202 call3.start(mockCallListener3, headers); 1203 verify(mockTransportFactory, times(2)).newClientTransport( 1204 socketAddress, 1205 new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)); 1206 transportInfo = transports.poll(); 1207 assertNotNull(transportInfo); 1208 1209 // This transport fails 1210 Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); 1211 assertEquals(0, oobExecutor.numPendingTasks()); 1212 transportInfo.listener.transportShutdown(transportError); 1213 assertTrue(oobExecutor.runDueTasks() > 0); 1214 1215 // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending 1216 verify(mockCallListener2).onClose(same(transportError), any(Metadata.class)); 1217 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); 1218 1219 // 
Shutdown 1220 assertFalse(oob1.isShutdown()); 1221 assertFalse(oob2.isShutdown()); 1222 oob1.shutdown(); 1223 verify(oobExecutorPool, never()).returnObject(anyObject()); 1224 oob2.shutdownNow(); 1225 assertTrue(oob1.isShutdown()); 1226 assertTrue(oob2.isShutdown()); 1227 assertTrue(oob2.isTerminated()); 1228 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); 1229 1230 // New RPCs will be rejected. 1231 assertEquals(0, oobExecutor.numPendingTasks()); 1232 ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT); 1233 ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT); 1234 call4.start(mockCallListener4, headers); 1235 call5.start(mockCallListener5, headers); 1236 assertTrue(oobExecutor.runDueTasks() > 0); 1237 verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class)); 1238 Status status4 = statusCaptor.getValue(); 1239 assertEquals(Status.Code.UNAVAILABLE, status4.getCode()); 1240 verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class)); 1241 Status status5 = statusCaptor.getValue(); 1242 assertEquals(Status.Code.UNAVAILABLE, status5.getCode()); 1243 1244 // The pending RPC will still be pending 1245 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); 1246 1247 // This will shutdownNow() the delayed transport, terminating the pending RPC 1248 assertEquals(0, oobExecutor.numPendingTasks()); 1249 oob1.shutdownNow(); 1250 assertTrue(oobExecutor.runDueTasks() > 0); 1251 verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class)); 1252 1253 // Shut down the channel, and it will not terminated because OOB channel has not. 1254 channel.shutdown(); 1255 assertFalse(channel.isTerminated()); 1256 // Delayed transport has already terminated. Terminating the transport terminates the 1257 // subchannel, which in turn terimates the OOB channel, which terminates the channel. 
1258 assertFalse(oob1.isTerminated()); 1259 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); 1260 transportInfo.listener.transportTerminated(); 1261 assertTrue(oob1.isTerminated()); 1262 assertTrue(channel.isTerminated()); 1263 verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService()); 1264 } 1265 1266 @Test oobChannelsWhenChannelShutdownNow()1267 public void oobChannelsWhenChannelShutdownNow() { 1268 createChannel(); 1269 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); 1270 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); 1271 1272 oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); 1273 oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata()); 1274 1275 assertEquals(2, transports.size()); 1276 MockClientTransportInfo ti1 = transports.poll(); 1277 MockClientTransportInfo ti2 = transports.poll(); 1278 1279 ti1.listener.transportReady(); 1280 ti2.listener.transportReady(); 1281 1282 channel.shutdownNow(); 1283 verify(ti1.transport).shutdownNow(any(Status.class)); 1284 verify(ti2.transport).shutdownNow(any(Status.class)); 1285 1286 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1287 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1288 ti1.listener.transportTerminated(); 1289 1290 assertFalse(channel.isTerminated()); 1291 ti2.listener.transportTerminated(); 1292 assertTrue(channel.isTerminated()); 1293 } 1294 1295 @Test oobChannelsNoConnectionShutdown()1296 public void oobChannelsNoConnectionShutdown() { 1297 createChannel(); 1298 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); 1299 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); 1300 channel.shutdown(); 1301 1302 verify(mockLoadBalancer).shutdown(); 1303 oob1.shutdown(); 1304 assertTrue(oob1.isTerminated()); 
1305 assertFalse(channel.isTerminated()); 1306 oob2.shutdown(); 1307 assertTrue(oob2.isTerminated()); 1308 assertTrue(channel.isTerminated()); 1309 verify(mockTransportFactory, never()) 1310 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1311 } 1312 1313 @Test oobChannelsNoConnectionShutdownNow()1314 public void oobChannelsNoConnectionShutdownNow() { 1315 createChannel(); 1316 helper.createOobChannel(addressGroup, "oob1Authority"); 1317 helper.createOobChannel(addressGroup, "oob2Authority"); 1318 channel.shutdownNow(); 1319 1320 verify(mockLoadBalancer).shutdown(); 1321 assertTrue(channel.isTerminated()); 1322 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. 1323 // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels. 1324 verify(mockTransportFactory, never()) 1325 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1326 } 1327 1328 @Test refreshNameResolutionWhenSubchannelConnectionFailed()1329 public void refreshNameResolutionWhenSubchannelConnectionFailed() { 1330 subtestRefreshNameResolutionWhenConnectionFailed(false); 1331 } 1332 1333 @Test refreshNameResolutionWhenOobChannelConnectionFailed()1334 public void refreshNameResolutionWhenOobChannelConnectionFailed() { 1335 subtestRefreshNameResolutionWhenConnectionFailed(true); 1336 } 1337 subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel)1338 private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) { 1339 FakeNameResolverFactory nameResolverFactory = 1340 new FakeNameResolverFactory.Builder(expectedUri) 1341 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 1342 .build(); 1343 channelBuilder.nameResolverFactory(nameResolverFactory); 1344 createChannel(); 1345 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 1346 1347 if (isOobChannel) { 1348 OobChannel 
oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority"); 1349 oobChannel.getSubchannel().requestConnection(); 1350 } else { 1351 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1352 subchannel.requestConnection(); 1353 } 1354 1355 MockClientTransportInfo transportInfo = transports.poll(); 1356 assertNotNull(transportInfo); 1357 1358 // Transport closed when connecting 1359 assertEquals(0, resolver.refreshCalled); 1360 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 1361 assertEquals(1, resolver.refreshCalled); 1362 1363 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); 1364 transportInfo = transports.poll(); 1365 assertNotNull(transportInfo); 1366 1367 transportInfo.listener.transportReady(); 1368 1369 // Transport closed when ready 1370 assertEquals(1, resolver.refreshCalled); 1371 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 1372 assertEquals(2, resolver.refreshCalled); 1373 } 1374 1375 @Test uriPattern()1376 public void uriPattern() { 1377 assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches()); 1378 assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches()); 1379 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched 1380 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched 1381 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched 1382 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched 1383 } 1384 1385 /** 1386 * Test that information such as the Call's context, MethodDescriptor, authority, executor are 1387 * propagated to newStream() and applyRequestMetadata(). 
1388 */ 1389 @Test 1390 @SuppressWarnings("deprecation") informationPropagatedToNewStreamAndCallCredentials()1391 public void informationPropagatedToNewStreamAndCallCredentials() { 1392 createChannel(); 1393 CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds); 1394 final Context.Key<String> testKey = Context.key("testing"); 1395 Context ctx = Context.current().withValue(testKey, "testValue"); 1396 final LinkedList<Context> credsApplyContexts = new LinkedList<Context>(); 1397 final LinkedList<Context> newStreamContexts = new LinkedList<Context>(); 1398 doAnswer(new Answer<Void>() { 1399 @Override 1400 public Void answer(InvocationOnMock in) throws Throwable { 1401 credsApplyContexts.add(Context.current()); 1402 return null; 1403 } 1404 }).when(creds).applyRequestMetadata( // TODO(zhangkun83): remove suppression of deprecations 1405 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), 1406 any(CallCredentials.MetadataApplier.class)); 1407 1408 // First call will be on delayed transport. Only newCall() is run within the expected context, 1409 // so that we can verify that the context is explicitly attached before calling newStream() and 1410 // applyRequestMetadata(), which happens after we detach the context from the thread. 
1411 Context origCtx = ctx.attach(); 1412 assertEquals("testValue", testKey.get()); 1413 ClientCall<String, Integer> call = channel.newCall(method, callOptions); 1414 ctx.detach(origCtx); 1415 assertNull(testKey.get()); 1416 call.start(mockCallListener, new Metadata()); 1417 1418 // Simulate name resolution results 1419 EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); 1420 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1421 subchannel.requestConnection(); 1422 verify(mockTransportFactory) 1423 .newClientTransport(same(socketAddress), eq(clientTransportOptions)); 1424 MockClientTransportInfo transportInfo = transports.poll(); 1425 final ConnectionClientTransport transport = transportInfo.transport; 1426 when(transport.getAttributes()).thenReturn(Attributes.EMPTY); 1427 doAnswer(new Answer<ClientStream>() { 1428 @Override 1429 public ClientStream answer(InvocationOnMock in) throws Throwable { 1430 newStreamContexts.add(Context.current()); 1431 return mock(ClientStream.class); 1432 } 1433 }).when(transport).newStream( 1434 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1435 1436 verify(creds, never()).applyRequestMetadata( 1437 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), 1438 any(CallCredentials.MetadataApplier.class)); 1439 1440 // applyRequestMetadata() is called after the transport becomes ready. 
1441 transportInfo.listener.transportReady(); 1442 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1443 .thenReturn(PickResult.withSubchannel(subchannel)); 1444 helper.updateBalancingState(READY, mockPicker); 1445 executor.runDueTasks(); 1446 ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class); 1447 ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor 1448 = ArgumentCaptor.forClass(CallCredentials.MetadataApplier.class); 1449 verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(), 1450 same(executor.getScheduledExecutorService()), applierCaptor.capture()); 1451 assertEquals("testValue", testKey.get(credsApplyContexts.poll())); 1452 assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY)); 1453 assertEquals(SecurityLevel.NONE, 1454 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL)); 1455 verify(transport, never()).newStream( 1456 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1457 1458 // newStream() is called after apply() is called 1459 applierCaptor.getValue().apply(new Metadata()); 1460 verify(transport).newStream(same(method), any(Metadata.class), same(callOptions)); 1461 assertEquals("testValue", testKey.get(newStreamContexts.poll())); 1462 // The context should not live beyond the scope of newStream() and applyRequestMetadata() 1463 assertNull(testKey.get()); 1464 1465 1466 // Second call will not be on delayed transport 1467 origCtx = ctx.attach(); 1468 call = channel.newCall(method, callOptions); 1469 ctx.detach(origCtx); 1470 call.start(mockCallListener, new Metadata()); 1471 1472 verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(), 1473 same(executor.getScheduledExecutorService()), applierCaptor.capture()); 1474 assertEquals("testValue", testKey.get(credsApplyContexts.poll())); 1475 assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY)); 1476 
assertEquals(SecurityLevel.NONE,
        attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
    // This is from the first call
    verify(transport).newStream(
        any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));

    // Still, newStream() is called after apply() is called
    applierCaptor.getValue().apply(new Metadata());
    verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions));
    assertEquals("testValue", testKey.get(newStreamContexts.poll()));

    assertNull(testKey.get());
  }

  /**
   * Starts a call after the picker already returns a subchannel whose transport is ready, and
   * checks that the CallOptions handed to {@code newStream()} list the call's own tracer factory
   * followed by the factory attached to the PickResult. Neither factory may be invoked.
   */
  @Test
  public void pickerReturnsStreamTracer_noDelay() {
    ClientStream mockStream = mock(ClientStream.class);
    ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
    ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
    createChannel();
    Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    subchannel.requestConnection();
    MockClientTransportInfo transportInfo = transports.poll();
    transportInfo.listener.transportReady();
    ClientTransport mockTransport = transportInfo.transport;
    when(mockTransport.newStream(
            any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream);

    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
        PickResult.withSubchannel(subchannel, factory2));
    helper.updateBalancingState(READY, mockPicker);

    CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
    ClientCall<String, Integer> call = channel.newCall(method, callOptions);
    call.start(mockCallListener, new Metadata());

    verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
    verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
    assertEquals(
        Arrays.asList(factory1, factory2),
        callOptionsCaptor.getValue().getStreamTracerFactories());
    // The factories are safely not stubbed because we do not expect any usage of them.
    verifyZeroInteractions(factory1);
    verifyZeroInteractions(factory2);
  }

  /**
   * Same expectation as {@link #pickerReturnsStreamTracer_noDelay()}, but the call is started
   * before any picker is provided. The stream is only created once the picker is installed and
   * the single due executor task has run.
   */
  @Test
  public void pickerReturnsStreamTracer_delayed() {
    ClientStream mockStream = mock(ClientStream.class);
    ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
    ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
    createChannel();

    CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
    ClientCall<String, Integer> call = channel.newCall(method, callOptions);
    call.start(mockCallListener, new Metadata());

    Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    subchannel.requestConnection();
    MockClientTransportInfo transportInfo = transports.poll();
    transportInfo.listener.transportReady();
    ClientTransport mockTransport = transportInfo.transport;
    when(mockTransport.newStream(
            any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream);
    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
        PickResult.withSubchannel(subchannel, factory2));

    helper.updateBalancingState(READY, mockPicker);
    // Exactly one task is expected to be due after the picker is installed.
    assertEquals(1, executor.runDueTasks());

    verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
    verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
    assertEquals(
        Arrays.asList(factory1, factory2),
        callOptionsCaptor.getValue().getStreamTracerFactories());
    // The factories are safely not stubbed because we do not expect any usage of them.
    verifyZeroInteractions(factory1);
    verifyZeroInteractions(factory2);
  }

  /**
   * Checks that {@code getState(false)} is IDLE on a channel whose resolver does not resolve at
   * start, and that it reports the state the balancer pushes via updateBalancingState().
   */
  @Test
  public void getState_loadBalancerSupportsChannelState() {
    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
    createChannel();
    assertEquals(IDLE, channel.getState(false));

    helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
    assertEquals(TRANSIENT_FAILURE, channel.getState(false));
  }

  /**
   * With the {@code requestConnection} fixture flag cleared, checks that {@code getState(false)}
   * does not create a load balancer while {@code getState(true)} does, and that later getState()
   * calls report the balancer-provided state without touching the factory again.
   */
  @Test
  public void getState_withRequestConnect() {
    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
    requestConnection = false;
    createChannel();

    assertEquals(IDLE, channel.getState(false));
    verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class));

    // call getState() with requestConnection = true
    assertEquals(IDLE, channel.getState(true));
    ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
    verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
    helper = helperCaptor.getValue();

    helper.updateBalancingState(CONNECTING, mockPicker);
    assertEquals(CONNECTING, channel.getState(false));
    assertEquals(CONNECTING, channel.getState(true));
    verifyNoMoreInteractions(mockLoadBalancerFactory);
  }

  /**
   * When the balancer already exists and has reported IDLE, checks that {@code getState(true)}
   * returns IDLE, creates no further balancer, and calls {@code requestConnection()} on the
   * current picker.
   */
  @Test
  public void getState_withRequestConnect_IdleWithLbRunning() {
    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
    createChannel();
    verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));

    helper.updateBalancingState(IDLE, mockPicker);

    assertEquals(IDLE, channel.getState(true));
    verifyNoMoreInteractions(mockLoadBalancerFactory);
    verify(mockPicker).requestConnection();
  }

  /**
   * Checks notifyWhenStateChanged(): the callback does not run while the channel stays in the
   * given source state, runs after the state changes, and also runs when it is registered with a
   * source state that differs from the channel's current state.
   */
  @Test
  public void notifyWhenStateChanged() {
    final AtomicBoolean stateChanged = new AtomicBoolean();
    Runnable onStateChanged = new Runnable() {
      @Override
      public void run() {
        stateChanged.set(true);
      }
    };

    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
    createChannel();
    assertEquals(IDLE, channel.getState(false));

    channel.notifyWhenStateChanged(IDLE, onStateChanged);
    executor.runDueTasks();
    assertFalse(stateChanged.get());

    // state change from IDLE to CONNECTING
    helper.updateBalancingState(CONNECTING, mockPicker);
    // onStateChanged callback should run
    executor.runDueTasks();
    assertTrue(stateChanged.get());

    // clear and test from CONNECTING
    stateChanged.set(false);
    channel.notifyWhenStateChanged(IDLE, onStateChanged);
    // onStateChanged callback should run immediately
    executor.runDueTasks();
    assertTrue(stateChanged.get());
  }

  /**
   * Checks that shutdown() moves the channel to SHUTDOWN and fires a callback registered for
   * IDLE, and that afterwards a balancer update neither changes the reported state nor fires a
   * callback registered with SHUTDOWN as the source state.
   */
  @Test
  public void channelStateWhenChannelShutdown() {
    final AtomicBoolean stateChanged = new AtomicBoolean();
    Runnable onStateChanged = new Runnable() {
      @Override
      public void run() {
        stateChanged.set(true);
      }
    };

    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
    createChannel();
    assertEquals(IDLE, channel.getState(false));
    channel.notifyWhenStateChanged(IDLE, onStateChanged);
    executor.runDueTasks();
    assertFalse(stateChanged.get());

    channel.shutdown();
    assertEquals(SHUTDOWN, channel.getState(false));
    executor.runDueTasks();
    assertTrue(stateChanged.get());

    stateChanged.set(false);
    channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged);
    helper.updateBalancingState(CONNECTING, mockPicker);

    assertEquals(SHUTDOWN, channel.getState(false));
    executor.runDueTasks();
    assertFalse(stateChanged.get());
  }

  /**
   * Checks that a channel reported as CONNECTING goes back to IDLE once the fake timer has been
   * advanced by the configured idle timeout.
   */
  @Test
  public void stateIsIdleOnIdleTimeout() {
    long idleTimeoutMillis = 2000L;
    channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
    createChannel();
    assertEquals(IDLE, channel.getState(false));

    helper.updateBalancingState(CONNECTING, mockPicker);
    assertEquals(CONNECTING, channel.getState(false));

    timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
    assertEquals(IDLE, channel.getState(false));
  }

  /** Runs the shared panic scenario starting from the IDLE state. */
  @Test
  public void panic_whenIdle() {
    subtestPanic(IDLE);
  }

  /** Runs the shared panic scenario starting from the CONNECTING state. */
  @Test
  public void panic_whenConnecting() {
    subtestPanic(CONNECTING);
  }

  /** Runs the shared panic scenario starting from the TRANSIENT_FAILURE state. */
  @Test
  public void panic_whenTransientFailure() {
    subtestPanic(TRANSIENT_FAILURE);
  }

  /** Runs the shared panic scenario starting from the READY state. */
  @Test
  public void panic_whenReady() {
    subtestPanic(READY);
  }

  /**
   * Shared body of the {@code panic_when*} tests. Brings the channel into {@code initialState}
   * (via the idle timer for IDLE, via the balancer helper otherwise), calls
   * {@code channel.panic()}, and then checks that the resolver and balancer are shut down, that
   * {@code getState(true)} reports TRANSIENT_FAILURE, and that no new resolver or balancer is
   * created.
   *
   * <p>NOTE(review): as written, the first {@link #verifyPanicMode(Throwable)} call fails a JUnit
   * assumption, so the statements after it are not reached.
   *
   * @param initialState state to put the channel in before panicking; must not be SHUTDOWN
   */
  private void subtestPanic(ConnectivityState initialState) {
    assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
    long idleTimeoutMillis = 2000L;
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri).build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
    createChannel();

    verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
    assertEquals(1, nameResolverFactory.resolvers.size());
    FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);

    Throwable panicReason = new Exception("Simulated uncaught exception");
    if (initialState == IDLE) {
      timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
    } else {
      helper.updateBalancingState(initialState, mockPicker);
    }
    assertEquals(initialState, channel.getState(false));

    if (initialState == IDLE) {
      // IDLE mode will shutdown resolver and balancer
      verify(mockLoadBalancer).shutdown();
      assertTrue(resolver.shutdown);
      // A new resolver is created
      assertEquals(1, nameResolverFactory.resolvers.size());
      resolver = nameResolverFactory.resolvers.remove(0);
      assertFalse(resolver.shutdown);
    } else {
      verify(mockLoadBalancer, never()).shutdown();
      assertFalse(resolver.shutdown);
    }

    // Make channel panic!
    channel.panic(panicReason);

    // Calls buffered in delayedTransport will fail

    // Resolver and balancer are shutdown
    verify(mockLoadBalancer).shutdown();
    assertTrue(resolver.shutdown);

    // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it.
    assertEquals(TRANSIENT_FAILURE, channel.getState(true));
    assertEquals(TRANSIENT_FAILURE, channel.getState(true));
    verifyPanicMode(panicReason);

    // No new resolver or balancer are created
    verifyNoMoreInteractions(mockLoadBalancerFactory);
    assertEquals(0, nameResolverFactory.resolvers.size());

    // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be
    // able to revive it.
    helper.updateBalancingState(READY, mockPicker);
    verifyPanicMode(panicReason);

    // Cannot be revived by exitIdleMode()
    channel.exitIdleMode();
    verifyPanicMode(panicReason);

    // Can still shutdown normally
    channel.shutdown();
    assertTrue(channel.isShutdown());
    assertTrue(channel.isTerminated());
    assertEquals(SHUTDOWN, channel.getState(false));

    // We didn't stub mockPicker, because it should have never been called in this test.
    verifyZeroInteractions(mockPicker);
  }

  /**
   * Starts two calls (one without and one with wait-for-ready) while the picker returns no
   * result, so their listeners see nothing, then panics the channel and checks that both
   * listeners are closed with INTERNAL status carrying the panic cause.
   */
  @Test
  public void panic_bufferedCallsWillFail() {
    createChannel();

    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withNoResult());
    helper.updateBalancingState(CONNECTING, mockPicker);

    // Start RPCs that will be buffered in delayedTransport
    ClientCall<String, Integer> call =
        channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
    call.start(mockCallListener, new Metadata());

    ClientCall<String, Integer> call2 =
        channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
    call2.start(mockCallListener2, new Metadata());

    executor.runDueTasks();
    verifyZeroInteractions(mockCallListener, mockCallListener2);

    // Enter panic
    Throwable panicReason = new Exception("Simulated uncaught exception");
    channel.panic(panicReason);

    // Buffered RPCs fail immediately
    executor.runDueTasks();
    verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
    verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
  }

  /**
   * Asserts that the channel is in panic mode: state is TRANSIENT_FAILURE, a new call is closed
   * with INTERNAL status carrying {@code cause}, and the timer, executor and OOB executor have no
   * pending tasks.
   *
   * <p>The leading assumption is hard-coded to {@code false}, so the checks below it are
   * currently not executed (see the issue number in the assumption message).
   *
   * @param cause the throwable expected as the cause of the failed call's status
   */
  private void verifyPanicMode(Throwable cause) {
    Assume.assumeTrue("Panic mode disabled to resolve issues with some tests. See #3293", false);

    @SuppressWarnings("unchecked")
    ClientCall.Listener<Integer> mockListener =
        (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class);
    assertEquals(TRANSIENT_FAILURE, channel.getState(false));
    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    call.start(mockListener, new Metadata());
    executor.runDueTasks();
    verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause);

    // Channel is dead. No more pending task to possibly revive it.
    assertEquals(0, timer.numPendingTasks());
    assertEquals(0, executor.numPendingTasks());
    assertEquals(0, oobExecutor.numPendingTasks());
  }

  /**
   * Verifies that {@code listener} received exactly one interaction: an {@code onClose()} whose
   * status has the given code and the very same cause instance.
   *
   * @param listener the mock call listener to inspect
   * @param code the expected status code
   * @param cause the expected status cause, compared by identity
   */
  private void verifyCallListenerClosed(
      ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) {
    ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null);
    verify(listener).onClose(captor.capture(), any(Metadata.class));
    Status rpcStatus = captor.getValue();
    assertEquals(code, rpcStatus.getCode());
    assertSame(cause, rpcStatus.getCause());
    verifyNoMoreInteractions(listener);
  }

  /**
   * After the idle timeout, requests a connection through {@code getState(true)} and checks that
   * a second load balancer is created, that updates from the first helper no longer change the
   * channel state, and that updates from the second helper do.
   */
  @Test
  public void idleTimeoutAndReconnect() {
    long idleTimeoutMillis = 2000L;
    channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
    createChannel();

    timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
    assertEquals(IDLE, channel.getState(true /* request connection */));

    ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
    // Two times of requesting connection will create loadBalancer twice.
    verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
    Helper helper2 = helperCaptor.getValue();

    // Updating on the old helper (whose balancer has been shutdown) does not change the channel
    // state.
    helper.updateBalancingState(CONNECTING, mockPicker);
    assertEquals(IDLE, channel.getState(false));

    helper2.updateBalancingState(CONNECTING, mockPicker);
    assertEquals(CONNECTING, channel.getState(false));
  }

  /**
   * Fails a first call through a picker that returns an error, lets the channel go idle, and then
   * checks that a second call is not failed by that stale picker: its listener sees nothing until
   * a new helper provides a ready subchannel, at which point the stream is created and started.
   */
  @Test
  public void idleMode_resetsDelayedTransportPicker() {
    ClientStream mockStream = mock(ClientStream.class);
    Status pickError = Status.UNAVAILABLE.withDescription("pick result error");
    long idleTimeoutMillis = 1000L;
    channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
            .build());
    createChannel();
    assertEquals(IDLE, channel.getState(false));

    // This call will be buffered in delayedTransport
    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    call.start(mockCallListener, new Metadata());

    // Move channel into TRANSIENT_FAILURE, which will fail the pending call
    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withError(pickError));
    helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
    assertEquals(TRANSIENT_FAILURE, channel.getState(false));
    executor.runDueTasks();
    verify(mockCallListener).onClose(same(pickError), any(Metadata.class));

    // Move channel to idle
    timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
    assertEquals(IDLE, channel.getState(false));

    // This call should be buffered, but will move the channel out of idle
    ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
    call2.start(mockCallListener2, new Metadata());
    executor.runDueTasks();
    verifyNoMoreInteractions(mockCallListener2);

    // Get the helper created on exiting idle
    ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
    verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
    Helper helper2 = helperCaptor.getValue();

    // Establish a connection
    Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
    subchannel.requestConnection();
    MockClientTransportInfo transportInfo = transports.poll();
    ConnectionClientTransport mockTransport = transportInfo.transport;
    ManagedClientTransport.Listener transportListener = transportInfo.listener;
    when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream);
    transportListener.transportReady();

    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withSubchannel(subchannel));
    helper2.updateBalancingState(READY, mockPicker);
    assertEquals(READY, channel.getState(false));
    executor.runDueTasks();

    // Verify the buffered call was drained
    verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
    verify(mockStream).start(any(ClientStreamListener.class));
  }

  /** Checks that enterIdle() moves a channel reported as READY back to IDLE. */
  @Test
  public void enterIdleEntersIdle() {
    createChannel();
    helper.updateBalancingState(READY, mockPicker);
    assertEquals(READY, channel.getState(false));

    channel.enterIdle();

    assertEquals(IDLE, channel.getState(false));
  }

  /**
   * Checks that calling enterIdle() on a channel that has already gone idle through the idle
   * timer leaves the reported state at IDLE.
   */
  @Test
  public void enterIdleAfterIdleTimerIsNoOp() {
    long idleTimeoutMillis = 2000L;
    channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
    createChannel();
    timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
    assertEquals(IDLE, channel.getState(false));

    channel.enterIdle();

    assertEquals(IDLE, channel.getState(false));
  }

  /**
   * Calls enterIdle() while a call is still pending and checks that a second load balancer is
   * created, and that the pending call's stream is created and started once the new helper
   * provides a ready subchannel.
   */
  @Test
  public void enterIdle_exitsIdleIfDelayedStreamPending() {
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
            .build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    createChannel();

    // Start a call that will be buffered in delayedTransport
    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    call.start(mockCallListener, new Metadata());

    // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed
    // call
    channel.enterIdle();
    assertEquals(IDLE, channel.getState(false));

    // enterIdle() will restart the delayed call by exiting idle. This creates a new helper.
    ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
    verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
    Helper helper2 = helperCaptor.getValue();

    // Establish a connection
    Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
    subchannel.requestConnection();
    ClientStream mockStream = mock(ClientStream.class);
    MockClientTransportInfo transportInfo = transports.poll();
    ConnectionClientTransport mockTransport = transportInfo.transport;
    ManagedClientTransport.Listener transportListener = transportInfo.listener;
    when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream);
    transportListener.transportReady();
    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withSubchannel(subchannel));
    helper2.updateBalancingState(READY, mockPicker);
    assertEquals(READY, channel.getState(false));

    // Verify the original call was drained
    executor.runDueTasks();
    verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
    verify(mockStream).start(any(ClientStreamListener.class));
  }

  /**
   * Checks that each updateBalancingState() call re-evaluates the pending call against the picker:
   * no stream is created while the picker returns a subchannel that never requested a connection,
   * and the stream is created after the picker is re-stubbed to return the connected subchannel.
   */
  @Test
  public void updateBalancingStateDoesUpdatePicker() {
    ClientStream mockStream = mock(ClientStream.class);
    createChannel();

    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    call.start(mockCallListener, new Metadata());

    // Make the transport available with subchannel2
    Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    subchannel2.requestConnection();

    MockClientTransportInfo transportInfo = transports.poll();
    ConnectionClientTransport mockTransport = transportInfo.transport;
    ManagedClientTransport.Listener transportListener = transportInfo.listener;
    when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream);
    transportListener.transportReady();

    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withSubchannel(subchannel1));
    helper.updateBalancingState(READY, mockPicker);

    executor.runDueTasks();
    verify(mockTransport, never())
        .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
    verify(mockStream, never()).start(any(ClientStreamListener.class));


    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withSubchannel(subchannel2));
    helper.updateBalancingState(READY, mockPicker);

    executor.runDueTasks();
    verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
    verify(mockStream).start(any(ClientStreamListener.class));
  }

  /**
   * Checks that a balancer update carrying the SHUTDOWN state is ignored: the channel stays IDLE
   * and a state-change callback registered for IDLE is never run.
   */
  @Test
  public void updateBalancingStateWithShutdownShouldBeIgnored() {
    channelBuilder.nameResolverFactory(
        new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
    createChannel();
    assertEquals(IDLE, channel.getState(false));

    Runnable onStateChanged = mock(Runnable.class);
    channel.notifyWhenStateChanged(IDLE, onStateChanged);

    helper.updateBalancingState(SHUTDOWN, mockPicker);

    assertEquals(IDLE, channel.getState(false));
    executor.runDueTasks();
    verify(onStateChanged, never()).run();
  }

  /**
   * With a resolver that reports an error, checks that resetConnectBackoff() refreshes the
   * resolver and cancels the scheduled backoff task, that running the cancelled task's command
   * causes no further refresh, and that another refresh happens after the fake timer advances by
   * {@code RECONNECT_BACKOFF_INTERVAL_NANOS}.
   */
  @Test
  public void resetConnectBackoff() {
    // Start with a name resolution failure to trigger backoff attempts
    Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    // Name resolution is started as soon as channel is created.
    createChannel();
    FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
    verify(mockLoadBalancer).handleNameResolutionError(same(error));

    FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
    assertNotNull("There should be a name resolver backoff task", nameResolverBackoff);
    assertEquals(0, resolver.refreshCalled);

    // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff
    channel.resetConnectBackoff();
    assertEquals(1, resolver.refreshCalled);
    assertTrue(nameResolverBackoff.isCancelled());

    // Simulate a race between cancel and the task scheduler. Should be a no-op.
    nameResolverBackoff.command.run();
    assertEquals(1, resolver.refreshCalled);

    // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
    timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
    assertEquals(2, resolver.refreshCalled);
  }

  /**
   * Checks that resetConnectBackoff() does not refresh the resolver when the resolver was
   * configured with a server address rather than an error.
   */
  @Test
  public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
            .build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    createChannel();
    FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
    assertEquals(0, nameResolver.refreshCalled);

    channel.resetConnectBackoff();

    assertEquals(0, nameResolver.refreshCalled);
  }

  /** Checks that resetConnectBackoff() does not refresh the resolver after shutdown(). */
  @Test
  public void resetConnectBackoff_noOpWhenChannelShutdown() {
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri).build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    createChannel();

    channel.shutdown();
    assertTrue(channel.isShutdown());
    channel.resetConnectBackoff();

    FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
    assertEquals(0, nameResolver.refreshCalled);
  }

  /**
   * Checks that resetConnectBackoff() does not refresh the resolver when the channel was created
   * with the {@code requestConnection} fixture flag cleared.
   */
  @Test
  public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri).build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    requestConnection = false;
    createChannel();

    channel.resetConnectBackoff();

    FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
    assertEquals(0, nameResolver.refreshCalled);
  }

  /**
   * Checks the {@code target} reported in stats: {@code TARGET} for the channel, and the string
   * form of a singleton list holding the address group for a subchannel.
   */
  @Test
  public void channelsAndSubchannels_instrumented_name() throws Exception {
    createChannel();
    assertEquals(TARGET, getStats(channel).target);

    Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    assertEquals(Collections.singletonList(addressGroup).toString(),
        getStats((AbstractSubchannel) subchannel).target);
  }

  /**
   * Checks that the channel trace contains a "Channel created" info event stamped with the fake
   * timer's current reading.
   */
  @Test
  public void channelTracing_channelCreationEvent() throws Exception {
    timer.forwardNanos(1234);
    channelBuilder.maxTraceEvents(10);
    createChannel();
    assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Channel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks that creating a subchannel records a "Child channel created" event (referencing the
   * subchannel) on the channel trace and a "Subchannel created" event on the subchannel trace.
   */
  @Test
  public void channelTracing_subchannelCreationEvents() throws Exception {
    channelBuilder.maxTraceEvents(10);
    createChannel();
    timer.forwardNanos(1234);
    AbstractSubchannel subchannel =
        (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
    assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Child channel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .setSubchannelRef(subchannel.getInternalSubchannel())
        .build());
    assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Subchannel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks that the channel trace contains a "Failed to resolve name" warning event. No resolver
   * factory is configured in this method, so it relies on the fixture's default resolver setup
   * (not visible here) producing a resolution failure.
   */
  @Test
  public void channelTracing_nameResolvingErrorEvent() throws Exception {
    timer.forwardNanos(1234);
    channelBuilder.maxTraceEvents(10);
    createChannel();
    assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Failed to resolve name")
        .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks that, with a resolver configured with one server, the channel trace contains an
   * "Address resolved: ..." info event listing that server.
   */
  @Test
  public void channelTracing_nameResolvedEvent() throws Exception {
    timer.forwardNanos(1234);
    channelBuilder.maxTraceEvents(10);
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
            .build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    createChannel();
    assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Address resolved: "
            + Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks how the trace grows as resolver results alternate: a further onAddresses() after the
   * initial resolution adds no event, the first onError() adds one, a repeated onError() adds
   * none, and the next onAddresses() adds one again.
   */
  @Test
  public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception {
    timer.forwardNanos(1234);
    channelBuilder.maxTraceEvents(10);
    List<EquivalentAddressGroup> servers = new ArrayList<>();
    servers.add(new EquivalentAddressGroup(socketAddress));
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    createChannel();

    int prevSize = getStats(channel).channelTrace.events.size();
    nameResolverFactory.resolvers.get(0).listener.onAddresses(
        Collections.singletonList(new EquivalentAddressGroup(
            Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
        Attributes.EMPTY);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);

    prevSize = getStats(channel).channelTrace.events.size();
    nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);

    prevSize = getStats(channel).channelTrace.events.size();
    nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);

    prevSize = getStats(channel).channelTrace.events.size();
    nameResolverFactory.resolvers.get(0).listener.onAddresses(
        Collections.singletonList(new EquivalentAddressGroup(
            Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
        Attributes.EMPTY);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
  }

  /**
   * Checks "Service config changed" tracing: one event is appended when a service config first
   * arrives in the resolver attributes, none when the same attributes are delivered again, and
   * one more when a different service config map is delivered.
   */
  @Test
  public void channelTracing_serviceConfigChange() throws Exception {
    timer.forwardNanos(1234);
    channelBuilder.maxTraceEvents(10);
    List<EquivalentAddressGroup> servers = new ArrayList<>();
    servers.add(new EquivalentAddressGroup(socketAddress));
    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
    channelBuilder.nameResolverFactory(nameResolverFactory);
    createChannel();

    int prevSize = getStats(channel).channelTrace.events.size();
    Attributes attributes =
        Attributes.newBuilder()
            .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
            .build();
    nameResolverFactory.resolvers.get(0).listener.onAddresses(
        Collections.singletonList(new EquivalentAddressGroup(
            Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
        attributes);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
    assertThat(getStats(channel).channelTrace.events.get(prevSize))
        .isEqualTo(new ChannelTrace.Event.Builder()
            .setDescription("Service config changed")
            .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
            .setTimestampNanos(timer.getTicker().read())
            .build());

    prevSize = getStats(channel).channelTrace.events.size();
    nameResolverFactory.resolvers.get(0).listener.onAddresses(
        Collections.singletonList(new EquivalentAddressGroup(
            Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
        attributes);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);

    prevSize = getStats(channel).channelTrace.events.size();
    Map<String, Object> serviceConfig = new HashMap<String, Object>();
    serviceConfig.put("methodConfig", new HashMap<String, Object>());
    attributes =
        Attributes.newBuilder()
            .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
            .build();
    // Advance the clock so the second event carries a different timestamp than the first.
    timer.forwardNanos(1234);
    nameResolverFactory.resolvers.get(0).listener.onAddresses(
        Collections.singletonList(new EquivalentAddressGroup(
            Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
        attributes);
    assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
    assertThat(getStats(channel).channelTrace.events.get(prevSize))
        .isEqualTo(new ChannelTrace.Event.Builder()
            .setDescription("Service config changed")
            .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
            .setTimestampNanos(timer.getTicker().read())
            .build());
  }

  /**
   * Checks that a balancer update to CONNECTING records an "Entering CONNECTING state" info
   * event on the channel trace.
   */
  @Test
  public void channelTracing_stateChangeEvent() throws Exception {
    channelBuilder.maxTraceEvents(10);
    createChannel();
    timer.forwardNanos(1234);
    helper.updateBalancingState(CONNECTING, mockPicker);
    assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Entering CONNECTING state")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks that calling obtainActiveTransport() on a new subchannel records an
   * "Entering CONNECTING state" info event on the subchannel trace.
   */
  @Test
  public void channelTracing_subchannelStateChangeEvent() throws Exception {
    channelBuilder.maxTraceEvents(10);
    createChannel();
    AbstractSubchannel subchannel =
        (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
    timer.forwardNanos(1234);
    subchannel.obtainActiveTransport();
    assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Entering CONNECTING state")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks that delivering a CONNECTING subchannel state to an OOB channel records an
   * "Entering CONNECTING state" info event on the OOB channel trace.
   */
  @Test
  public void channelTracing_oobChannelStateChangeEvent() throws Exception {
    channelBuilder.maxTraceEvents(10);
    createChannel();
    OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
    timer.forwardNanos(1234);
    oobChannel.handleSubchannelStateChange(
        ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
    assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Entering CONNECTING state")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
  }

  /**
   * Checks that creating an OOB channel records "Child channel created" (referencing the OOB
   * channel) on the parent trace, "OobChannel created" on the OOB channel trace, and
   * "Subchannel created" on the trace of the OOB channel's internal subchannel.
   */
  @Test
  public void channelTracing_oobChannelCreationEvents() throws Exception {
    channelBuilder.maxTraceEvents(10);
    createChannel();
    timer.forwardNanos(1234);
    OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
    assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("Child channel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .setChannelRef(oobChannel)
        .build());
    assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
        .setDescription("OobChannel created")
        .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
        .setTimestampNanos(timer.getTicker().read())
        .build());
    assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains(
        new ChannelTrace.Event.Builder()
            .setDescription("Subchannel created")
            .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
            .setTimestampNanos(timer.getTicker().read())
            .build());
  }

  @Test
  public void
channelsAndSubchannels_instrumented_state() throws Exception { 2348 createChannel(); 2349 2350 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); 2351 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); 2352 helper = helperCaptor.getValue(); 2353 2354 assertEquals(IDLE, getStats(channel).state); 2355 helper.updateBalancingState(CONNECTING, mockPicker); 2356 assertEquals(CONNECTING, getStats(channel).state); 2357 2358 AbstractSubchannel subchannel = 2359 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 2360 2361 assertEquals(IDLE, getStats(subchannel).state); 2362 subchannel.requestConnection(); 2363 assertEquals(CONNECTING, getStats(subchannel).state); 2364 2365 MockClientTransportInfo transportInfo = transports.poll(); 2366 2367 assertEquals(CONNECTING, getStats(subchannel).state); 2368 transportInfo.listener.transportReady(); 2369 assertEquals(READY, getStats(subchannel).state); 2370 2371 assertEquals(CONNECTING, getStats(channel).state); 2372 helper.updateBalancingState(READY, mockPicker); 2373 assertEquals(READY, getStats(channel).state); 2374 2375 channel.shutdownNow(); 2376 assertEquals(SHUTDOWN, getStats(channel).state); 2377 assertEquals(SHUTDOWN, getStats(subchannel).state); 2378 } 2379 2380 @Test channelStat_callStarted()2381 public void channelStat_callStarted() throws Exception { 2382 createChannel(); 2383 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 2384 assertEquals(0, getStats(channel).callsStarted); 2385 call.start(mockCallListener, new Metadata()); 2386 assertEquals(1, getStats(channel).callsStarted); 2387 assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos); 2388 } 2389 2390 @Test channelsAndSubChannels_instrumented_success()2391 public void channelsAndSubChannels_instrumented_success() throws Exception { 2392 channelsAndSubchannels_instrumented0(true); 2393 } 2394 2395 @Test 
public void channelsAndSubChannels_instrumented_fail() throws Exception {
    // Runs the shared channel/subchannel stats scenario with a stream closed with UNKNOWN.
    channelsAndSubchannels_instrumented0(false);
  }

  /**
   * Shared scenario for the channel/subchannel call-stat tests: starts one call, routes it to a
   * ready subchannel, closes the stream and checks the started/succeeded/failed counters on both
   * the subchannel and the channel.
   *
   * @param success when true the stream is closed with {@code Status.OK} and the succeeded
   *     counters are expected to be bumped; otherwise it is closed with {@code Status.UNKNOWN}
   *     and the failed counters are expected to be bumped
   */
  private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
    createChannel();

    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);

    // Channel stat bumped when ClientCall.start() called
    assertEquals(0, getStats(channel).callsStarted);
    call.start(mockCallListener, new Metadata());
    assertEquals(1, getStats(channel).callsStarted);

    ClientStream mockStream = mock(ClientStream.class);
    ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
    AbstractSubchannel subchannel =
        (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
    subchannel.requestConnection();
    MockClientTransportInfo transportInfo = transports.poll();
    transportInfo.listener.transportReady();
    ClientTransport mockTransport = transportInfo.transport;
    when(mockTransport.newStream(
            any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream);
    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
        PickResult.withSubchannel(subchannel, factory));

    // subchannel stat bumped when call gets assigned to it
    assertEquals(0, getStats(subchannel).callsStarted);
    helper.updateBalancingState(READY, mockPicker);
    assertEquals(1, executor.runDueTasks());
    verify(mockStream).start(streamListenerCaptor.capture());
    assertEquals(1, getStats(subchannel).callsStarted);

    ClientStreamListener streamListener = streamListenerCaptor.getValue();
    call.halfClose();

    // closing stream listener affects subchannel stats immediately
    assertEquals(0, getStats(subchannel).callsSucceeded);
    assertEquals(0, getStats(subchannel).callsFailed);
    streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
    if (success) {
      assertEquals(1, getStats(subchannel).callsSucceeded);
      assertEquals(0, getStats(subchannel).callsFailed);
    } else {
      assertEquals(0, getStats(subchannel).callsSucceeded);
      assertEquals(1, getStats(subchannel).callsFailed);
    }

    // channel stats bumped when the ClientCall.Listener is notified
    assertEquals(0, getStats(channel).callsSucceeded);
    assertEquals(0, getStats(channel).callsFailed);
    executor.runDueTasks();
    if (success) {
      assertEquals(1, getStats(channel).callsSucceeded);
      assertEquals(0, getStats(channel).callsFailed);
    } else {
      assertEquals(0, getStats(channel).callsSucceeded);
      assertEquals(1, getStats(channel).callsFailed);
    }
  }

  /** Runs the shared OOB-channel stats scenario with a successfully closed stream. */
  @Test
  public void channelsAndSubchannels_oob_instrumented_success() throws Exception {
    channelsAndSubchannels_oob_instrumented0(true);
  }

  /** Runs the shared OOB-channel stats scenario with a stream closed with UNKNOWN. */
  @Test
  public void channelsAndSubchannels_oob_instrumented_fail() throws Exception {
    channelsAndSubchannels_oob_instrumented0(false);
  }

  /**
   * Shared scenario for the OOB-channel call-stat tests: starts one call on an OOB channel using
   * a dedicated fake call executor, closes the stream and checks the counters of the OOB channel
   * and its subchannel, then checks that the parent channel's counters stayed at zero.
   *
   * @param success when true the stream is closed with {@code Status.OK}; otherwise with
   *     {@code Status.UNKNOWN}
   */
  private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
    // set up
    ClientStream mockStream = mock(ClientStream.class);
    createChannel();

    OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
    AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
    FakeClock callExecutor = new FakeClock();
    CallOptions options =
        CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
    ClientCall<String, Integer> call = oobChannel.newCall(method, options);
    Metadata headers = new Metadata();

    // Channel stat bumped when ClientCall.start() called
    assertEquals(0, getStats(oobChannel).callsStarted);
    call.start(mockCallListener, headers);
    assertEquals(1, getStats(oobChannel).callsStarted);

    MockClientTransportInfo transportInfo = transports.poll();
    ConnectionClientTransport mockTransport = transportInfo.transport;
    ManagedClientTransport.Listener transportListener = transportInfo.listener;
    when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
        .thenReturn(mockStream);

    // subchannel stat bumped when call gets assigned to it
    assertEquals(0, getStats(oobSubchannel).callsStarted);
    transportListener.transportReady();
    callExecutor.runDueTasks();
    verify(mockStream).start(streamListenerCaptor.capture());
    assertEquals(1, getStats(oobSubchannel).callsStarted);

    ClientStreamListener streamListener = streamListenerCaptor.getValue();
    call.halfClose();

    // closing stream listener affects subchannel stats immediately
    assertEquals(0, getStats(oobSubchannel).callsSucceeded);
    assertEquals(0, getStats(oobSubchannel).callsFailed);
    streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
    if (success) {
      assertEquals(1, getStats(oobSubchannel).callsSucceeded);
      assertEquals(0, getStats(oobSubchannel).callsFailed);
    } else {
      assertEquals(0, getStats(oobSubchannel).callsSucceeded);
      assertEquals(1, getStats(oobSubchannel).callsFailed);
    }

    // channel stats bumped when the ClientCall.Listener is notified
    assertEquals(0, getStats(oobChannel).callsSucceeded);
    assertEquals(0, getStats(oobChannel).callsFailed);
    callExecutor.runDueTasks();
    if (success) {
      assertEquals(1, getStats(oobChannel).callsSucceeded);
      assertEquals(0, getStats(oobChannel).callsFailed);
    } else {
      assertEquals(0, getStats(oobChannel).callsSucceeded);
      assertEquals(1, getStats(oobChannel).callsFailed);
    }
    // oob channel is separate from the original channel
    assertEquals(0, getStats(channel).callsSucceeded);
    assertEquals(0, getStats(channel).callsFailed);
  }

  /** Checks that an OOB channel's stats report the authority it was created with as target. */
  @Test
  public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
    createChannel();

    String authority = "oobauthority";
    OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
    assertEquals(authority, getStats(oobChannel).target);
  }

  /**
   * Walks an OOB channel through IDLE, CONNECTING and READY, checks that the parent channel
   * still reports IDLE, and that shutting the parent down moves both to SHUTDOWN.
   */
  @Test
  public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
    createChannel();

    OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
    assertEquals(IDLE, getStats(oobChannel).state);

    oobChannel.getSubchannel().requestConnection();
    assertEquals(CONNECTING, getStats(oobChannel).state);

    MockClientTransportInfo transportInfo = transports.poll();
    ManagedClientTransport.Listener transportListener = transportInfo.listener;

    transportListener.transportReady();
    assertEquals(READY, getStats(oobChannel).state);

    // oobchannel state is separate from the ManagedChannel
    assertEquals(IDLE, getStats(channel).state);
    channel.shutdownNow();
    assertEquals(SHUTDOWN, getStats(channel).state);
    assertEquals(SHUTDOWN, getStats(oobChannel).state);
  }

  /**
   * Installs a {@link BinaryLog} whose {@code wrapChannel} adds an interceptor that completes a
   * future, then checks that starting a call on the channel completes that future with true.
   */
  @Test
  public void binaryLogInstalled() throws Exception {
    final SettableFuture<Boolean> intercepted = SettableFuture.create();
    channelBuilder.binlog = new BinaryLog() {
      @Override
      public void close() throws IOException {
        // noop
      }

      @Override
      public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
          ServerMethodDefinition<ReqT, RespT> oMethodDef) {
        return oMethodDef;
      }

      @Override
      public Channel wrapChannel(Channel channel) {
        return ClientInterceptors.intercept(channel,
            new ClientInterceptor() {
              @Override
              public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                  MethodDescriptor<ReqT, RespT> method,
                  CallOptions callOptions,
                  Channel next) {
                intercepted.set(true);
                return next.newCall(method, callOptions);
              }
            });
      }
    };

    createChannel();
    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    call.start(mockCallListener, new Metadata());
    // Bounded wait: if the interceptor was never invoked the test fails with a
    // TimeoutException instead of blocking the whole suite forever.
    assertTrue(intercepted.get(1, TimeUnit.SECONDS));
  }

  /**
   * Configures a retry policy through the resolved service config, fails the first stream with
   * UNAVAILABLE to start a backoff, shuts the channel down during the backoff and then checks:
   * the pending retry task survives the shutdown, a new call is closed with UNAVAILABLE and
   * "Channel shutdown invoked", the retry stream is still started once the backoff elapses, and
   * the channel only reports terminated after the retried stream closes and the transport
   * terminates.
   */
  @Test
  public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() {
    Map<String, Object> retryPolicy = new HashMap<String, Object>();
    retryPolicy.put("maxAttempts", 3D);
    retryPolicy.put("initialBackoff", "10s");
    retryPolicy.put("maxBackoff", "30s");
    retryPolicy.put("backoffMultiplier", 2D);
    retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
    Map<String, Object> methodConfig = new HashMap<String, Object>();
    Map<String, Object> name = new HashMap<String, Object>();
    name.put("service", "service");
    methodConfig.put("name", Arrays.<Object>asList(name));
    methodConfig.put("retryPolicy", retryPolicy);
    Map<String, Object> serviceConfig = new HashMap<String, Object>();
    serviceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
    Attributes attributesWithRetryPolicy = Attributes
        .newBuilder().set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();

    FakeNameResolverFactory nameResolverFactory =
        new FakeNameResolverFactory.Builder(expectedUri)
            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
            .build();
    nameResolverFactory.nextResolvedAttributes.set(attributesWithRetryPolicy);
    channelBuilder.nameResolverFactory(nameResolverFactory);
    channelBuilder.executor(MoreExecutors.directExecutor());
    channelBuilder.enableRetry();
    RetriableStream.setRandom(
        // not random
        new Random() {
          @Override
          public double nextDouble() {
            return 1D; // fake random
          }
        });

    requestConnection = false;
    createChannel();

    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
    call.start(mockCallListener, new Metadata());
    ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
    verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
    helper = helperCaptor.getValue();
    verify(mockLoadBalancer)
        .handleResolvedAddressGroups(nameResolverFactory.servers, attributesWithRetryPolicy);

    // simulating request connection and then transport ready after resolved address
    Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
    when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
        .thenReturn(PickResult.withSubchannel(subchannel));
    subchannel.requestConnection();
    MockClientTransportInfo transportInfo = transports.poll();
    ConnectionClientTransport mockTransport = transportInfo.transport;
    ClientStream mockStream = mock(ClientStream.class);
    ClientStream mockStream2 = mock(ClientStream.class);
    when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
        .thenReturn(mockStream).thenReturn(mockStream2);
    transportInfo.listener.transportReady();
    helper.updateBalancingState(READY, mockPicker);

    ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
        ArgumentCaptor.forClass(ClientStreamListener.class);
    verify(mockStream).start(streamListenerCaptor.capture());
    assertThat(timer.getPendingTasks()).isEmpty();

    // trigger retry
    streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata());

    // in backoff
    timer.forwardTime(5, TimeUnit.SECONDS);
    assertThat(timer.getPendingTasks()).hasSize(1);
    verify(mockStream2, never()).start(any(ClientStreamListener.class));

    // shutdown during backoff period
    channel.shutdown();

    assertThat(timer.getPendingTasks()).hasSize(1);
    verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));

    ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
    call2.start(mockCallListener2, new Metadata());

    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class));
    assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
    assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());

    // backoff ends
    timer.forwardTime(5, TimeUnit.SECONDS);
    assertThat(timer.getPendingTasks()).isEmpty();
    verify(mockStream2).start(streamListenerCaptor.capture());
    verify(mockLoadBalancer, never()).shutdown();
    assertFalse(
        "channel.isTerminated() is expected to be false but was true",
        channel.isTerminated());

    streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata());
    verify(mockLoadBalancer).shutdown();
    // simulating the shutdown of load balancer triggers the shutdown of subchannel
    subchannel.shutdown();
    transportInfo.listener.transportTerminated(); // simulating transport terminated
    assertTrue(
        "channel.isTerminated() is expected to be true but was false",
        channel.isTerminated());
  }

  /**
   * Builds a private channel whose resolver first reports a service config naming the load
   * balancing policy "kaboom" and checks that a call fails with a cause mentioning "kaboom".
   * The resolver then reports "round_robin" and a second call, issued with a 5 second deadline,
   * is expected to fail with a cause mentioning "deadline" after the fake timer is advanced.
   */
  @Test
  public void badServiceConfigIsRecoverable() throws Exception {
    final List<EquivalentAddressGroup> addresses =
        ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
    final class FakeNameResolver extends NameResolver {
      Listener listener;

      @Override
      public String getServiceAuthority() {
        return "also fake";
      }

      @Override
      public void start(Listener listener) {
        this.listener = listener;
        listener.onAddresses(addresses,
            Attributes.newBuilder()
                .set(
                    GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
                    ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
                .build());
      }

      @Override
      public void shutdown() {}
    }

    final class FakeNameResolverFactory extends NameResolver.Factory {
      FakeNameResolver resolver;

      @Nullable
      @Override
      public NameResolver newNameResolver(URI targetUri, Attributes params) {
        return (resolver = new FakeNameResolver());
      }

      @Override
      public String getDefaultScheme() {
        return "fake";
      }
    }

    FakeNameResolverFactory factory = new FakeNameResolverFactory();
    final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> {

      CustomBuilder() {
        super(TARGET);
        this.executorPool = ManagedChannelImplTest.this.executorPool;
        this.channelz = ManagedChannelImplTest.this.channelz;
      }

      @Override
      protected ClientTransportFactory buildTransportFactory() {
        return mockTransportFactory;
      }
    }

    ManagedChannel mychannel = new CustomBuilder()
        .nameResolverFactory(factory)
        .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory(null, null)).build();

    // This channel is built outside the shared fixture, so shut it down in a finally block:
    // a failing assertion must not leave it open.
    try {
      ClientCall<Void, Void> call1 =
          mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
      ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
      executor.runDueTasks();
      try {
        future1.get();
        Assert.fail("Expected an ExecutionException whose cause mentions \"kaboom\"");
      } catch (ExecutionException e) {
        assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
      }

      // ok the service config is bad, let's fix it.

      factory.resolver.listener.onAddresses(addresses,
          Attributes.newBuilder()
              .set(
                  GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
                  ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
              .build());

      ClientCall<Void, Void> call2 = mychannel.newCall(
          TestMethodDescriptors.voidMethod(),
          CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
      ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);

      timer.forwardTime(1234, TimeUnit.SECONDS);

      executor.runDueTasks();
      try {
        future2.get();
        Assert.fail("Expected an ExecutionException whose cause mentions \"deadline\"");
      } catch (ExecutionException e) {
        assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
      }
    } finally {
      mychannel.shutdownNow();
    }
  }

  /** Checks that {@code authority()} returns SERVICE_NAME both before and after shutdown. */
  @Test
  public void getAuthorityAfterShutdown() throws Exception {
    createChannel();
    assertEquals(SERVICE_NAME, channel.authority());
    channel.shutdownNow().awaitTermination(1, TimeUnit.SECONDS);
    assertEquals(SERVICE_NAME, channel.authority());
  }

  /**
   * Channel builder for TARGET that supplies NAME_RESOLVER_PARAMS as resolver params and refuses
   * to build a transport factory.
   */
  private static final class ChannelBuilder
      extends AbstractManagedChannelImplBuilder<ChannelBuilder> {

    ChannelBuilder() {
      super(TARGET);
    }

    @Override protected ClientTransportFactory buildTransportFactory() {
      throw new UnsupportedOperationException();
    }

    @Override protected Attributes getNameResolverParams() {
      return NAME_RESOLVER_PARAMS;
    }
  }

  /**
   * Provider of deterministic backoff policies: each policy returns
   * RECONNECT_BACKOFF_INTERVAL_NANOS multiplied by 1, 2, 3, ... on successive calls.
   */
  private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
    @Override
    public BackoffPolicy get() {
      return new BackoffPolicy() {
        int multiplier = 1;

        @Override
        public long nextBackoffNanos() {
          return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++;
        }
      };
    }
  }

  /**
   * Name resolver factory for tests. It only creates resolvers for {@code expectedUri}, records
   * every resolver it creates in {@code resolvers}, and makes them report either the configured
   * {@code servers} or the configured {@code error}.
   */
  private static final class FakeNameResolverFactory extends NameResolver.Factory {
    final URI expectedUri;
    final List<EquivalentAddressGroup> servers;
    final boolean resolvedAtStart;
    final Status error;
    final ArrayList<FakeNameResolver> resolvers = new ArrayList<>();
    // The Attributes argument of the next invocation of listener.onAddresses(servers, attrs)
    final AtomicReference<Attributes> nextResolvedAttributes =
        new AtomicReference<Attributes>(Attributes.EMPTY);

    FakeNameResolverFactory(
        URI expectedUri,
        List<EquivalentAddressGroup> servers,
        boolean resolvedAtStart,
        Status error) {
      this.expectedUri = expectedUri;
      this.servers = servers;
      this.resolvedAtStart = resolvedAtStart;
      this.error = error;
    }

    /**
     * Returns a new recorded {@link FakeNameResolver} when {@code targetUri} equals the expected
     * URI, or null otherwise; asserts that {@code params} is NAME_RESOLVER_PARAMS.
     */
    @Override
    public NameResolver newNameResolver(final URI targetUri, Attributes params) {
      if (!expectedUri.equals(targetUri)) {
        return null;
      }
      assertSame(NAME_RESOLVER_PARAMS, params);
      FakeNameResolver resolver = new FakeNameResolver(error);
      resolvers.add(resolver);
      return resolver;
    }

    @Override
    public String getDefaultScheme() {
      return "fake";
    }

    /** Makes every resolver created so far report its result to its listener. */
    void allResolved() {
      for (FakeNameResolver resolver : resolvers) {
        resolver.resolved();
      }
    }

    /** Resolver that reports the factory's servers, or its own error, on demand. */
    final class FakeNameResolver extends NameResolver {
      Listener listener;
      boolean shutdown;
      int refreshCalled;
      Status error;

      FakeNameResolver(Status error) {
        this.error = error;
      }

      @Override public String getServiceAuthority() {
        return expectedUri.getAuthority();
      }

      @Override public void start(final Listener listener) {
        this.listener = listener;
        if (resolvedAtStart) {
          resolved();
        }
      }

      @Override public void refresh() {
        assertNotNull(listener);
        refreshCalled++;
        resolved();
      }

      /**
       * Reports {@code error} to the listener when it is non-null; otherwise reports the
       * factory's servers together with the current {@code nextResolvedAttributes} value.
       */
      void resolved() {
        if (error != null) {
          listener.onError(error);
          return;
        }
        listener.onAddresses(servers, nextResolvedAttributes.get());
      }

      @Override public void shutdown() {
        shutdown = true;
      }
    }

    /**
     * Builder for {@link FakeNameResolverFactory}; defaults to no servers, resolving at start
     * and no error.
     */
    static final class Builder {
      final URI expectedUri;
      List<EquivalentAddressGroup> servers = ImmutableList.<EquivalentAddressGroup>of();
      boolean resolvedAtStart = true;
      Status error = null;

      Builder(URI expectedUri) {
        this.expectedUri = expectedUri;
      }

      Builder setServers(List<EquivalentAddressGroup> servers) {
        this.servers = servers;
        return this;
      }

      Builder setResolvedAtStart(boolean resolvedAtStart) {
        this.resolvedAtStart = resolvedAtStart;
        return this;
      }

      Builder setError(Status error) {
        this.error = error;
        return this;
      }

      FakeNameResolverFactory build() {
        return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
      }
    }
  }

  /** Returns the stats of the internal subchannel behind {@code subchannel}. */
  private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
    return subchannel.getInternalSubchannel().getStats().get();
  }

  /** Returns the stats reported by {@code instrumented}, waiting on its stats future. */
  private static ChannelStats getStats(
      InternalInstrumented<ChannelStats> instrumented) throws Exception {
    return instrumented.getStats().get();
  }

  /**
   * Returns the only pending timer task matching NAME_RESOLVER_REFRESH_TASK_FILTER, or null when
   * no such task is pending.
   */
  private FakeClock.ScheduledTask getNameResolverRefresh() {
    return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
  }
}