1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkState; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.ConnectivityState.CONNECTING; 22 import static io.grpc.ConnectivityState.IDLE; 23 import static io.grpc.ConnectivityState.READY; 24 import static io.grpc.ConnectivityState.SHUTDOWN; 25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 26 import static junit.framework.TestCase.assertNotSame; 27 import static org.junit.Assert.assertEquals; 28 import static org.junit.Assert.assertFalse; 29 import static org.junit.Assert.assertNotEquals; 30 import static org.junit.Assert.assertNotNull; 31 import static org.junit.Assert.assertNull; 32 import static org.junit.Assert.assertSame; 33 import static org.junit.Assert.assertTrue; 34 import static org.mockito.Matchers.any; 35 import static org.mockito.Matchers.anyObject; 36 import static org.mockito.Matchers.eq; 37 import static org.mockito.Matchers.same; 38 import static org.mockito.Mockito.atLeast; 39 import static org.mockito.Mockito.doAnswer; 40 import static org.mockito.Mockito.doThrow; 41 import static org.mockito.Mockito.inOrder; 42 import static org.mockito.Mockito.mock; 43 import static org.mockito.Mockito.never; 44 import static org.mockito.Mockito.times; 45 import static org.mockito.Mockito.verify; 46 import static org.mockito.Mockito.verifyNoMoreInteractions; 47 import static org.mockito.Mockito.verifyZeroInteractions; 48 import static org.mockito.Mockito.when; 49 50 import com.google.common.base.Throwables; 51 import com.google.common.collect.ImmutableList; 52 import com.google.common.collect.ImmutableMap; 53 import com.google.common.collect.Iterables; 54 import com.google.common.util.concurrent.ListenableFuture; 55 import com.google.common.util.concurrent.MoreExecutors; 56 import com.google.common.util.concurrent.SettableFuture; 57 import io.grpc.Attributes; 58 import io.grpc.BinaryLog; 59 import io.grpc.CallCredentials; 60 import io.grpc.CallCredentials.MetadataApplier; 61 import io.grpc.CallOptions; 62 import io.grpc.Channel; 63 import io.grpc.ClientCall; 64 import io.grpc.ClientInterceptor; 65 import io.grpc.ClientInterceptors; 66 import io.grpc.ClientStreamTracer; 67 import io.grpc.ConnectivityState; 68 import io.grpc.ConnectivityStateInfo; 69 import io.grpc.Context; 70 import io.grpc.EquivalentAddressGroup; 71 import io.grpc.IntegerMarshaller; 72 import io.grpc.InternalChannelz; 73 import io.grpc.InternalChannelz.ChannelStats; 74 import io.grpc.InternalChannelz.ChannelTrace; 75 import io.grpc.InternalInstrumented; 76 import io.grpc.LoadBalancer; 77 import io.grpc.LoadBalancer.Helper; 78 import io.grpc.LoadBalancer.PickResult; 79 import io.grpc.LoadBalancer.PickSubchannelArgs; 80 import io.grpc.LoadBalancer.Subchannel; 81 import io.grpc.LoadBalancer.SubchannelPicker; 82 import io.grpc.ManagedChannel; 83 import io.grpc.Metadata; 84 import io.grpc.MethodDescriptor; 85 import io.grpc.MethodDescriptor.MethodType; 86 import io.grpc.NameResolver; 87 import io.grpc.SecurityLevel; 88 import io.grpc.ServerMethodDefinition; 89 import io.grpc.Status; 90 import io.grpc.StringMarshaller; 91 import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; 92 import io.grpc.internal.TestUtils.MockClientTransportInfo; 93 import io.grpc.stub.ClientCalls; 94 import io.grpc.testing.TestMethodDescriptors; 95 import java.io.IOException; 96 import java.net.SocketAddress; 97 import java.net.URI; 98 import java.util.ArrayList; 99 import java.util.Arrays; 100 import java.util.Collections; 101 import java.util.HashMap; 102 import java.util.LinkedList; 103 import java.util.List; 104 import java.util.Map; 105 import java.util.Random; 106 import java.util.concurrent.BlockingQueue; 107 import java.util.concurrent.ExecutionException; 108 import java.util.concurrent.Executor; 109 import java.util.concurrent.TimeUnit; 110 import java.util.concurrent.atomic.AtomicBoolean; 111 import java.util.concurrent.atomic.AtomicLong; 112 import java.util.concurrent.atomic.AtomicReference; 113 import javax.annotation.Nullable; 114 import org.junit.After; 115 import org.junit.Assert; 116 import org.junit.Assume; 117 import org.junit.Before; 118 import org.junit.Rule; 119 import org.junit.Test; 120 import org.junit.rules.ExpectedException; 121 import org.junit.runner.RunWith; 122 import org.junit.runners.JUnit4; 123 import org.mockito.ArgumentCaptor; 124 import org.mockito.Captor; 125 import org.mockito.InOrder; 126 import org.mockito.Matchers; 127 import org.mockito.Mock; 128 import org.mockito.MockitoAnnotations; 129 import org.mockito.invocation.InvocationOnMock; 130 import org.mockito.stubbing.Answer; 131 132 /** Unit tests for {@link ManagedChannelImpl}. */ 133 @RunWith(JUnit4.class) 134 public class ManagedChannelImplTest { 135 private static final Attributes NAME_RESOLVER_PARAMS = 136 Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build(); 137 138 private static final MethodDescriptor<String, Integer> method = 139 MethodDescriptor.<String, Integer>newBuilder() 140 .setType(MethodType.UNKNOWN) 141 .setFullMethodName("service/method") 142 .setRequestMarshaller(new StringMarshaller()) 143 .setResponseMarshaller(new IntegerMarshaller()) 144 .build(); 145 private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY = 146 Attributes.Key.create("subchannel-attr-key"); 147 private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10; 148 private static final String SERVICE_NAME = "fake.example.com"; 149 private static final String AUTHORITY = SERVICE_NAME; 150 private static final String USER_AGENT = "userAgent"; 151 private static final ClientTransportOptions clientTransportOptions = 152 new ClientTransportOptions() 153 .setAuthority(AUTHORITY) 154 .setUserAgent(USER_AGENT); 155 private static final String TARGET = "fake://" + SERVICE_NAME; 156 private URI expectedUri; 157 private final SocketAddress socketAddress = new SocketAddress() {}; 158 private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); 159 private final FakeClock timer = new FakeClock(); 160 private final FakeClock executor = new FakeClock(); 161 private final FakeClock oobExecutor = new FakeClock(); 162 private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER = 163 new FakeClock.TaskFilter() { 164 @Override 165 public boolean shouldAccept(Runnable command) { 166 return command instanceof ManagedChannelImpl.NameResolverRefresh; 167 } 168 }; 169 170 private final InternalChannelz channelz = new InternalChannelz(); 171 172 @Rule public final ExpectedException thrown = ExpectedException.none(); 173 174 private ManagedChannelImpl channel; 175 private Helper helper; 176 @Captor 177 private ArgumentCaptor<Status> statusCaptor; 178 @Captor 179 private ArgumentCaptor<CallOptions> callOptionsCaptor; 180 @Mock 181 private LoadBalancer.Factory mockLoadBalancerFactory; 182 @Mock 183 private LoadBalancer mockLoadBalancer; 184 185 @Captor 186 private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor; 187 @Mock 188 private SubchannelPicker mockPicker; 189 @Mock 190 private ClientTransportFactory mockTransportFactory; 191 @Mock 192 private ClientCall.Listener<Integer> mockCallListener; 193 @Mock 194 private ClientCall.Listener<Integer> mockCallListener2; 195 @Mock 196 private ClientCall.Listener<Integer> mockCallListener3; 197 @Mock 198 private ClientCall.Listener<Integer> mockCallListener4; 199 @Mock 200 private ClientCall.Listener<Integer> mockCallListener5; 201 @Mock 202 private ObjectPool<Executor> executorPool; 203 @Mock 204 private ObjectPool<Executor> oobExecutorPool; 205 @Mock 206 private CallCredentials creds; 207 private ChannelBuilder channelBuilder; 208 private boolean requestConnection = true; 209 private BlockingQueue<MockClientTransportInfo> transports; 210 211 private ArgumentCaptor<ClientStreamListener> streamListenerCaptor = 212 ArgumentCaptor.forClass(ClientStreamListener.class); 213 createChannel(ClientInterceptor... interceptors)214 private void createChannel(ClientInterceptor... interceptors) { 215 checkState(channel == null); 216 TimeProvider fakeClockTimeProvider = new TimeProvider() { 217 @Override 218 public long currentTimeNanos() { 219 return timer.getTicker().read(); 220 } 221 }; 222 223 channel = new ManagedChannelImpl( 224 channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(), 225 oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors), 226 fakeClockTimeProvider); 227 228 if (requestConnection) { 229 int numExpectedTasks = 0; 230 231 // Force-exit the initial idle-mode 232 channel.exitIdleMode(); 233 if (channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) { 234 numExpectedTasks += 1; 235 } 236 237 if (getNameResolverRefresh() != null) { 238 numExpectedTasks += 1; 239 } 240 241 assertEquals(numExpectedTasks, timer.numPendingTasks()); 242 243 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); 244 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); 245 helper = helperCaptor.getValue(); 246 } 247 } 248 249 @Before setUp()250 public void setUp() throws Exception { 251 MockitoAnnotations.initMocks(this); 252 expectedUri = new URI(TARGET); 253 when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer); 254 transports = TestUtils.captureTransports(mockTransportFactory); 255 when(mockTransportFactory.getScheduledExecutorService()) 256 .thenReturn(timer.getScheduledExecutorService()); 257 when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); 258 when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService()); 259 260 channelBuilder = new ChannelBuilder() 261 .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) 262 .loadBalancerFactory(mockLoadBalancerFactory) 263 .userAgent(USER_AGENT) 264 .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS); 265 channelBuilder.executorPool = executorPool; 266 channelBuilder.binlog = null; 267 channelBuilder.channelz = channelz; 268 } 269 270 @After allPendingTasksAreRun()271 public void allPendingTasksAreRun() throws Exception { 272 // The "never" verifications in the tests only hold up if all due tasks are done. 273 // As for timer, although there may be scheduled tasks in a future time, since we don't test 274 // any time-related behavior in this test suite, we only care the tasks that are due. This 275 // would ignore any time-sensitive tasks, e.g., back-off and the idle timer. 276 assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty()); 277 assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); 278 if (channel != null) { 279 channel.shutdownNow(); 280 channel = null; 281 } 282 } 283 284 @Test 285 @SuppressWarnings("unchecked") idleModeDisabled()286 public void idleModeDisabled() { 287 channelBuilder.nameResolverFactory( 288 new FakeNameResolverFactory.Builder(expectedUri) 289 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 290 .build()); 291 createChannel(); 292 293 // In this test suite, the channel is always created with idle mode disabled. 294 // No task is scheduled to enter idle mode 295 assertEquals(0, timer.numPendingTasks()); 296 assertEquals(0, executor.numPendingTasks()); 297 } 298 299 @Test immediateDeadlineExceeded()300 public void immediateDeadlineExceeded() { 301 createChannel(); 302 ClientCall<String, Integer> call = 303 channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); 304 call.start(mockCallListener, new Metadata()); 305 assertEquals(1, executor.runDueTasks()); 306 307 verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); 308 Status status = statusCaptor.getValue(); 309 assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); 310 } 311 312 @Test shutdownWithNoTransportsEverCreated()313 public void shutdownWithNoTransportsEverCreated() { 314 channelBuilder.nameResolverFactory( 315 new FakeNameResolverFactory.Builder(expectedUri) 316 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 317 .build()); 318 createChannel(); 319 verify(executorPool).getObject(); 320 verify(executorPool, never()).returnObject(anyObject()); 321 verify(mockTransportFactory).getScheduledExecutorService(); 322 verifyNoMoreInteractions(mockTransportFactory); 323 channel.shutdown(); 324 assertTrue(channel.isShutdown()); 325 assertTrue(channel.isTerminated()); 326 verify(executorPool).returnObject(executor.getScheduledExecutorService()); 327 } 328 329 @Test channelzMembership()330 public void channelzMembership() throws Exception { 331 createChannel(); 332 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 333 assertFalse(channelz.containsSubchannel(channel.getLogId())); 334 channel.shutdownNow(); 335 channel.awaitTermination(5, TimeUnit.SECONDS); 336 assertNull(channelz.getRootChannel(channel.getLogId().getId())); 337 assertFalse(channelz.containsSubchannel(channel.getLogId())); 338 } 339 340 @Test channelzMembership_subchannel()341 public void channelzMembership_subchannel() throws Exception { 342 createChannel(); 343 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 344 345 AbstractSubchannel subchannel = 346 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 347 // subchannels are not root channels 348 assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId())); 349 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 350 assertThat(getStats(channel).subchannels) 351 .containsExactly(subchannel.getInternalSubchannel()); 352 353 subchannel.requestConnection(); 354 MockClientTransportInfo transportInfo = transports.poll(); 355 assertNotNull(transportInfo); 356 assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); 357 358 // terminate transport 359 transportInfo.listener.transportTerminated(); 360 assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); 361 362 // terminate subchannel 363 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 364 subchannel.shutdown(); 365 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); 366 timer.runDueTasks(); 367 assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 368 assertThat(getStats(channel).subchannels).isEmpty(); 369 370 // channel still appears 371 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 372 } 373 374 @Test channelzMembership_oob()375 public void channelzMembership_oob() throws Exception { 376 createChannel(); 377 OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY); 378 // oob channels are not root channels 379 assertNull(channelz.getRootChannel(oob.getLogId().getId())); 380 assertTrue(channelz.containsSubchannel(oob.getLogId())); 381 assertThat(getStats(channel).subchannels).containsExactly(oob); 382 assertTrue(channelz.containsSubchannel(oob.getLogId())); 383 384 AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel(); 385 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 386 assertThat(getStats(oob).subchannels) 387 .containsExactly(subchannel.getInternalSubchannel()); 388 assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 389 390 oob.getSubchannel().requestConnection(); 391 MockClientTransportInfo transportInfo = transports.poll(); 392 assertNotNull(transportInfo); 393 assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); 394 395 // terminate transport 396 transportInfo.listener.transportTerminated(); 397 assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); 398 399 // terminate oobchannel 400 oob.shutdown(); 401 assertFalse(channelz.containsSubchannel(oob.getLogId())); 402 assertThat(getStats(channel).subchannels).isEmpty(); 403 assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId())); 404 405 // channel still appears 406 assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); 407 } 408 409 @Test callsAndShutdown()410 public void callsAndShutdown() { 411 subtestCallsAndShutdown(false, false); 412 } 413 414 @Test callsAndShutdownNow()415 public void callsAndShutdownNow() { 416 subtestCallsAndShutdown(true, false); 417 } 418 419 /** Make sure shutdownNow() after shutdown() has an effect. */ 420 @Test callsAndShutdownAndShutdownNow()421 public void callsAndShutdownAndShutdownNow() { 422 subtestCallsAndShutdown(false, true); 423 } 424 subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown)425 private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) { 426 FakeNameResolverFactory nameResolverFactory = 427 new FakeNameResolverFactory.Builder(expectedUri).build(); 428 channelBuilder.nameResolverFactory(nameResolverFactory); 429 createChannel(); 430 verify(executorPool).getObject(); 431 ClientStream mockStream = mock(ClientStream.class); 432 ClientStream mockStream2 = mock(ClientStream.class); 433 Metadata headers = new Metadata(); 434 Metadata headers2 = new Metadata(); 435 436 // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to 437 // real transport. 438 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 439 subchannel.requestConnection(); 440 verify(mockTransportFactory) 441 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 442 MockClientTransportInfo transportInfo = transports.poll(); 443 ConnectionClientTransport mockTransport = transportInfo.transport; 444 verify(mockTransport).start(any(ManagedClientTransport.Listener.class)); 445 ManagedClientTransport.Listener transportListener = transportInfo.listener; 446 when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT))) 447 .thenReturn(mockStream); 448 when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT))) 449 .thenReturn(mockStream2); 450 transportListener.transportReady(); 451 when(mockPicker.pickSubchannel( 452 new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn( 453 PickResult.withNoResult()); 454 when(mockPicker.pickSubchannel( 455 new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn( 456 PickResult.withSubchannel(subchannel)); 457 helper.updateBalancingState(READY, mockPicker); 458 459 // First RPC, will be pending 460 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 461 verify(mockTransportFactory) 462 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 463 call.start(mockCallListener, headers); 464 465 verify(mockTransport, never()) 466 .newStream(same(method), same(headers), same(CallOptions.DEFAULT)); 467 468 // Second RPC, will be assigned to the real transport 469 ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); 470 call2.start(mockCallListener2, headers2); 471 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT)); 472 verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT)); 473 verify(mockStream2).start(any(ClientStreamListener.class)); 474 475 // Shutdown 476 if (shutdownNow) { 477 channel.shutdownNow(); 478 } else { 479 channel.shutdown(); 480 if (shutdownNowAfterShutdown) { 481 channel.shutdownNow(); 482 shutdownNow = true; 483 } 484 } 485 assertTrue(channel.isShutdown()); 486 assertFalse(channel.isTerminated()); 487 assertEquals(1, nameResolverFactory.resolvers.size()); 488 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 489 490 // Further calls should fail without going to the transport 491 ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT); 492 call3.start(mockCallListener3, headers2); 493 timer.runDueTasks(); 494 executor.runDueTasks(); 495 496 verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class)); 497 assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); 498 499 if (shutdownNow) { 500 // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated. 501 verify(mockLoadBalancer).shutdown(); 502 assertTrue(nameResolverFactory.resolvers.get(0).shutdown); 503 // call should have been aborted by delayed transport 504 executor.runDueTasks(); 505 verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS), 506 any(Metadata.class)); 507 } else { 508 // LoadBalancer and NameResolver are still running. 509 verify(mockLoadBalancer, never()).shutdown(); 510 assertFalse(nameResolverFactory.resolvers.get(0).shutdown); 511 // call and call2 are still alive, and can still be assigned to a real transport 512 SubchannelPicker picker2 = mock(SubchannelPicker.class); 513 when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) 514 .thenReturn(PickResult.withSubchannel(subchannel)); 515 helper.updateBalancingState(READY, picker2); 516 executor.runDueTasks(); 517 verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT)); 518 verify(mockStream).start(any(ClientStreamListener.class)); 519 } 520 521 // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports 522 // will be shutdown. 523 verify(mockLoadBalancer).shutdown(); 524 assertTrue(nameResolverFactory.resolvers.get(0).shutdown); 525 526 if (shutdownNow) { 527 // Channel shutdownNow() all subchannels after shutting down LoadBalancer 528 verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS); 529 } else { 530 verify(mockTransport, never()).shutdownNow(any(Status.class)); 531 } 532 // LoadBalancer should shutdown the subchannel 533 subchannel.shutdown(); 534 if (shutdownNow) { 535 verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS)); 536 } else { 537 verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); 538 } 539 540 // Killing the remaining real transport will terminate the channel 541 transportListener.transportShutdown(Status.UNAVAILABLE); 542 assertFalse(channel.isTerminated()); 543 verify(executorPool, never()).returnObject(anyObject()); 544 transportListener.transportTerminated(); 545 assertTrue(channel.isTerminated()); 546 verify(executorPool).returnObject(executor.getScheduledExecutorService()); 547 verifyNoMoreInteractions(oobExecutorPool); 548 549 verify(mockTransportFactory) 550 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 551 verify(mockTransportFactory).close(); 552 verify(mockTransport, atLeast(0)).getLogId(); 553 verifyNoMoreInteractions(mockTransport); 554 } 555 556 @Test noMoreCallbackAfterLoadBalancerShutdown()557 public void noMoreCallbackAfterLoadBalancerShutdown() { 558 FakeNameResolverFactory nameResolverFactory = 559 new FakeNameResolverFactory.Builder(expectedUri) 560 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 561 .build(); 562 channelBuilder.nameResolverFactory(nameResolverFactory); 563 Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed"); 564 createChannel(); 565 566 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 567 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 568 verify(mockLoadBalancer).handleResolvedAddressGroups( 569 eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); 570 571 Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 572 Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 573 subchannel1.requestConnection(); 574 subchannel2.requestConnection(); 575 verify(mockTransportFactory, times(2)) 576 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 577 MockClientTransportInfo transportInfo1 = transports.poll(); 578 MockClientTransportInfo transportInfo2 = transports.poll(); 579 580 // LoadBalancer receives all sorts of callbacks 581 transportInfo1.listener.transportReady(); 582 verify(mockLoadBalancer, times(2)) 583 .handleSubchannelState(same(subchannel1), stateInfoCaptor.capture()); 584 assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState()); 585 assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState()); 586 587 verify(mockLoadBalancer) 588 .handleSubchannelState(same(subchannel2), stateInfoCaptor.capture()); 589 assertSame(CONNECTING, stateInfoCaptor.getValue().getState()); 590 591 resolver.listener.onError(resolutionError); 592 verify(mockLoadBalancer).handleNameResolutionError(resolutionError); 593 594 verifyNoMoreInteractions(mockLoadBalancer); 595 596 channel.shutdown(); 597 verify(mockLoadBalancer).shutdown(); 598 599 // No more callback should be delivered to LoadBalancer after it's shut down 600 transportInfo2.listener.transportReady(); 601 resolver.listener.onError(resolutionError); 602 resolver.resolved(); 603 verifyNoMoreInteractions(mockLoadBalancer); 604 } 605 606 @Test interceptor()607 public void interceptor() throws Exception { 608 final AtomicLong atomic = new AtomicLong(); 609 ClientInterceptor interceptor = new ClientInterceptor() { 610 @Override 611 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall( 612 MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions, 613 Channel next) { 614 atomic.set(1); 615 return next.newCall(method, callOptions); 616 } 617 }; 618 createChannel(interceptor); 619 assertNotNull(channel.newCall(method, CallOptions.DEFAULT)); 620 assertEquals(1, atomic.get()); 621 } 622 623 @Test callOptionsExecutor()624 public void callOptionsExecutor() { 625 Metadata headers = new Metadata(); 626 ClientStream mockStream = mock(ClientStream.class); 627 FakeClock callExecutor = new FakeClock(); 628 createChannel(); 629 630 // Start a call with a call executor 631 CallOptions options = 632 CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); 633 ClientCall<String, Integer> call = channel.newCall(method, options); 634 call.start(mockCallListener, headers); 635 636 // Make the transport available 637 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 638 verify(mockTransportFactory, never()) 639 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 640 subchannel.requestConnection(); 641 verify(mockTransportFactory) 642 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 643 MockClientTransportInfo transportInfo = transports.poll(); 644 ConnectionClientTransport mockTransport = transportInfo.transport; 645 ManagedClientTransport.Listener transportListener = transportInfo.listener; 646 when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) 647 .thenReturn(mockStream); 648 transportListener.transportReady(); 649 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 650 .thenReturn(PickResult.withSubchannel(subchannel)); 651 assertEquals(0, callExecutor.numPendingTasks()); 652 helper.updateBalancingState(READY, mockPicker); 653 654 // Real streams are started in the call executor if they were previously buffered. 655 assertEquals(1, callExecutor.runDueTasks()); 656 verify(mockTransport).newStream(same(method), same(headers), same(options)); 657 verify(mockStream).start(streamListenerCaptor.capture()); 658 659 // Call listener callbacks are also run in the call executor 660 ClientStreamListener streamListener = streamListenerCaptor.getValue(); 661 Metadata trailers = new Metadata(); 662 assertEquals(0, callExecutor.numPendingTasks()); 663 streamListener.closed(Status.CANCELLED, trailers); 664 verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers)); 665 assertEquals(1, callExecutor.runDueTasks()); 666 verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers)); 667 668 669 transportListener.transportShutdown(Status.UNAVAILABLE); 670 transportListener.transportTerminated(); 671 672 // Clean up as much as possible to allow the channel to terminate. 673 subchannel.shutdown(); 674 timer.forwardNanos( 675 TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); 676 } 677 678 @Test nameResolutionFailed()679 public void nameResolutionFailed() { 680 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); 681 FakeNameResolverFactory nameResolverFactory = 682 new FakeNameResolverFactory.Builder(expectedUri) 683 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 684 .setError(error) 685 .build(); 686 channelBuilder.nameResolverFactory(nameResolverFactory); 687 // Name resolution is started as soon as channel is created. 688 createChannel(); 689 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 690 verify(mockLoadBalancer).handleNameResolutionError(same(error)); 691 assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); 692 693 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1); 694 assertEquals(0, resolver.refreshCalled); 695 696 timer.forwardNanos(1); 697 assertEquals(1, resolver.refreshCalled); 698 verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error)); 699 700 // Verify an additional name resolution failure does not schedule another timer 701 resolver.refresh(); 702 verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error)); 703 assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)); 704 705 // Allow the next refresh attempt to succeed 706 resolver.error = null; 707 708 // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2 709 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1); 710 assertEquals(2, resolver.refreshCalled); 711 timer.forwardNanos(1); 712 assertEquals(3, resolver.refreshCalled); 713 assertEquals(0, timer.numPendingTasks()); 714 715 // Verify that the successful resolution reset the backoff policy 716 resolver.listener.onError(error); 717 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1); 718 assertEquals(3, resolver.refreshCalled); 719 timer.forwardNanos(1); 720 assertEquals(4, resolver.refreshCalled); 721 assertEquals(0, timer.numPendingTasks()); 722 } 723 724 @Test nameResolutionFailed_delayedTransportShutdownCancelsBackoff()725 public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() { 726 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); 727 728 FakeNameResolverFactory nameResolverFactory = 729 new FakeNameResolverFactory.Builder(expectedUri).setError(error).build(); 730 channelBuilder.nameResolverFactory(nameResolverFactory); 731 // Name resolution is started as soon as channel is created. 732 createChannel(); 733 verify(mockLoadBalancer).handleNameResolutionError(same(error)); 734 735 FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh(); 736 assertNotNull(nameResolverBackoff); 737 assertFalse(nameResolverBackoff.isCancelled()); 738 739 // Add a pending call to the delayed transport 740 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 741 Metadata headers = new Metadata(); 742 call.start(mockCallListener, headers); 743 744 // The pending call on the delayed transport stops the name resolver backoff from cancelling 745 channel.shutdown(); 746 assertFalse(nameResolverBackoff.isCancelled()); 747 748 // Notify that a subchannel is ready, which drains the delayed transport 749 SubchannelPicker picker = mock(SubchannelPicker.class); 750 Status status = Status.UNAVAILABLE.withDescription("for test"); 751 when(picker.pickSubchannel(any(PickSubchannelArgs.class))) 752 .thenReturn(PickResult.withDrop(status)); 753 helper.updateBalancingState(READY, picker); 754 executor.runDueTasks(); 755 verify(mockCallListener).onClose(same(status), any(Metadata.class)); 756 757 assertTrue(nameResolverBackoff.isCancelled()); 758 } 759 760 @Test nameResolverReturnsEmptySubLists()761 public void nameResolverReturnsEmptySubLists() { 762 String errorDescription = "NameResolver returned an empty list"; 763 764 // Pass a FakeNameResolverFactory with an empty list 765 createChannel(); 766 767 // LoadBalancer received the error 768 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 769 verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture()); 770 Status status = statusCaptor.getValue(); 771 assertSame(Status.Code.UNAVAILABLE, status.getCode()); 772 assertEquals(errorDescription, status.getDescription()); 773 } 774 775 @Test loadBalancerThrowsInHandleResolvedAddresses()776 public void loadBalancerThrowsInHandleResolvedAddresses() { 777 RuntimeException ex = new RuntimeException("simulated"); 778 // Delay the success of name resolution until allResolved() is called 779 FakeNameResolverFactory nameResolverFactory = 780 new FakeNameResolverFactory.Builder(expectedUri) 781 .setResolvedAtStart(false) 782 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 783 .build(); 784 channelBuilder.nameResolverFactory(nameResolverFactory); 785 createChannel(); 786 787 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 788 doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups( 789 Matchers.<List<EquivalentAddressGroup>>anyObject(), any(Attributes.class)); 790 791 // NameResolver returns addresses. 792 nameResolverFactory.allResolved(); 793 794 // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode. 795 verifyPanicMode(ex); 796 } 797 798 @Test nameResolvedAfterChannelShutdown()799 public void nameResolvedAfterChannelShutdown() { 800 // Delay the success of name resolution until allResolved() is called. 801 FakeNameResolverFactory nameResolverFactory = 802 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build(); 803 channelBuilder.nameResolverFactory(nameResolverFactory); 804 createChannel(); 805 806 channel.shutdown(); 807 808 assertTrue(channel.isShutdown()); 809 assertTrue(channel.isTerminated()); 810 verify(mockLoadBalancer).shutdown(); 811 // Name resolved after the channel is shut down, which is possible if the name resolution takes 812 // time and is not cancellable. The resolved address will be dropped. 813 nameResolverFactory.allResolved(); 814 verifyNoMoreInteractions(mockLoadBalancer); 815 } 816 817 /** 818 * Verify that if the first resolved address points to a server that cannot be connected, the call 819 * will end up with the second address which works. 820 */ 821 @Test firstResolvedServerFailedToConnect()822 public void firstResolvedServerFailedToConnect() throws Exception { 823 final SocketAddress goodAddress = new SocketAddress() { 824 @Override public String toString() { 825 return "goodAddress"; 826 } 827 }; 828 final SocketAddress badAddress = new SocketAddress() { 829 @Override public String toString() { 830 return "badAddress"; 831 } 832 }; 833 InOrder inOrder = inOrder(mockLoadBalancer); 834 835 List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress); 836 FakeNameResolverFactory nameResolverFactory = 837 new FakeNameResolverFactory.Builder(expectedUri) 838 .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs))) 839 .build(); 840 channelBuilder.nameResolverFactory(nameResolverFactory); 841 createChannel(); 842 843 // Start the call 844 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 845 Metadata headers = new Metadata(); 846 call.start(mockCallListener, headers); 847 executor.runDueTasks(); 848 849 // Simulate name resolution results 850 EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); 851 inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups( 852 eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); 853 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 854 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 855 .thenReturn(PickResult.withSubchannel(subchannel)); 856 subchannel.requestConnection(); 857 inOrder.verify(mockLoadBalancer).handleSubchannelState( 858 same(subchannel), stateInfoCaptor.capture()); 859 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); 860 861 // The channel will starts with the first address (badAddress) 862 verify(mockTransportFactory) 863 .newClientTransport(same(badAddress), any(ClientTransportOptions.class)); 864 verify(mockTransportFactory, times(0)) 865 .newClientTransport(same(goodAddress), any(ClientTransportOptions.class)); 866 867 MockClientTransportInfo badTransportInfo = transports.poll(); 868 // Which failed to connect 869 badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE); 870 inOrder.verifyNoMoreInteractions(); 871 872 // The channel then try the second address (goodAddress) 873 verify(mockTransportFactory) 874 .newClientTransport(same(goodAddress), any(ClientTransportOptions.class)); 875 MockClientTransportInfo goodTransportInfo = transports.poll(); 876 when(goodTransportInfo.transport.newStream( 877 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) 878 .thenReturn(mock(ClientStream.class)); 879 880 goodTransportInfo.listener.transportReady(); 881 inOrder.verify(mockLoadBalancer).handleSubchannelState( 882 same(subchannel), stateInfoCaptor.capture()); 883 assertEquals(READY, stateInfoCaptor.getValue().getState()); 884 885 // A typical LoadBalancer will call this once the subchannel becomes READY 886 helper.updateBalancingState(READY, mockPicker); 887 // Delayed transport uses the app executor to create real streams. 888 executor.runDueTasks(); 889 890 verify(goodTransportInfo.transport).newStream(same(method), same(headers), 891 same(CallOptions.DEFAULT)); 892 // The bad transport was never used. 893 verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class), 894 any(Metadata.class), any(CallOptions.class)); 895 } 896 897 @Test failFastRpcFailFromErrorFromBalancer()898 public void failFastRpcFailFromErrorFromBalancer() { 899 subtestFailRpcFromBalancer(false, false, true); 900 } 901 902 @Test failFastRpcFailFromDropFromBalancer()903 public void failFastRpcFailFromDropFromBalancer() { 904 subtestFailRpcFromBalancer(false, true, true); 905 } 906 907 @Test waitForReadyRpcImmuneFromErrorFromBalancer()908 public void waitForReadyRpcImmuneFromErrorFromBalancer() { 909 subtestFailRpcFromBalancer(true, false, false); 910 } 911 912 @Test waitForReadyRpcFailFromDropFromBalancer()913 public void waitForReadyRpcFailFromDropFromBalancer() { 914 subtestFailRpcFromBalancer(true, true, true); 915 } 916 subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail)917 private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) { 918 createChannel(); 919 920 // This call will be buffered by the channel, thus involve delayed transport 921 CallOptions callOptions = CallOptions.DEFAULT; 922 if (waitForReady) { 923 callOptions = callOptions.withWaitForReady(); 924 } else { 925 callOptions = callOptions.withoutWaitForReady(); 926 } 927 ClientCall<String, Integer> call1 = channel.newCall(method, callOptions); 928 call1.start(mockCallListener, new Metadata()); 929 930 SubchannelPicker picker = mock(SubchannelPicker.class); 931 Status status = Status.UNAVAILABLE.withDescription("for test"); 932 933 when(picker.pickSubchannel(any(PickSubchannelArgs.class))) 934 .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status)); 935 helper.updateBalancingState(READY, picker); 936 937 executor.runDueTasks(); 938 if (shouldFail) { 939 verify(mockCallListener).onClose(same(status), any(Metadata.class)); 940 } else { 941 verifyZeroInteractions(mockCallListener); 942 } 943 944 // This call doesn't involve delayed transport 945 ClientCall<String, Integer> call2 = channel.newCall(method, callOptions); 946 call2.start(mockCallListener2, new Metadata()); 947 948 executor.runDueTasks(); 949 if (shouldFail) { 950 verify(mockCallListener2).onClose(same(status), any(Metadata.class)); 951 } else { 952 verifyZeroInteractions(mockCallListener2); 953 } 954 } 955 956 /** 957 * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a 958 * wait-for-ready call will still be buffered. 959 */ 960 @Test allServersFailedToConnect()961 public void allServersFailedToConnect() throws Exception { 962 final SocketAddress addr1 = new SocketAddress() { 963 @Override public String toString() { 964 return "addr1"; 965 } 966 }; 967 final SocketAddress addr2 = new SocketAddress() { 968 @Override public String toString() { 969 return "addr2"; 970 } 971 }; 972 InOrder inOrder = inOrder(mockLoadBalancer); 973 974 List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2); 975 976 FakeNameResolverFactory nameResolverFactory = 977 new FakeNameResolverFactory.Builder(expectedUri) 978 .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs))) 979 .build(); 980 channelBuilder.nameResolverFactory(nameResolverFactory); 981 createChannel(); 982 983 // Start a wait-for-ready call 984 ClientCall<String, Integer> call = 985 channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); 986 Metadata headers = new Metadata(); 987 call.start(mockCallListener, headers); 988 // ... and a fail-fast call 989 ClientCall<String, Integer> call2 = 990 channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); 991 call2.start(mockCallListener2, headers); 992 executor.runDueTasks(); 993 994 // Simulate name resolution results 995 EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); 996 inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups( 997 eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); 998 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 999 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1000 .thenReturn(PickResult.withSubchannel(subchannel)); 1001 subchannel.requestConnection(); 1002 1003 inOrder.verify(mockLoadBalancer).handleSubchannelState( 1004 same(subchannel), stateInfoCaptor.capture()); 1005 assertEquals(CONNECTING, stateInfoCaptor.getValue().getState()); 1006 1007 // Connecting to server1, which will fail 1008 verify(mockTransportFactory) 1009 .newClientTransport(same(addr1), any(ClientTransportOptions.class)); 1010 verify(mockTransportFactory, times(0)) 1011 .newClientTransport(same(addr2), any(ClientTransportOptions.class)); 1012 MockClientTransportInfo transportInfo1 = transports.poll(); 1013 transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); 1014 1015 // Connecting to server2, which will fail too 1016 verify(mockTransportFactory) 1017 .newClientTransport(same(addr2), any(ClientTransportOptions.class)); 1018 MockClientTransportInfo transportInfo2 = transports.poll(); 1019 Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect"); 1020 transportInfo2.listener.transportShutdown(server2Error); 1021 1022 // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated 1023 // to LoadBalancer. 1024 inOrder.verify(mockLoadBalancer).handleSubchannelState( 1025 same(subchannel), stateInfoCaptor.capture()); 1026 assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState()); 1027 assertSame(server2Error, stateInfoCaptor.getValue().getStatus()); 1028 1029 // A typical LoadBalancer would create a picker with error 1030 SubchannelPicker picker2 = mock(SubchannelPicker.class); 1031 when(picker2.pickSubchannel(any(PickSubchannelArgs.class))) 1032 .thenReturn(PickResult.withError(server2Error)); 1033 helper.updateBalancingState(TRANSIENT_FAILURE, picker2); 1034 executor.runDueTasks(); 1035 1036 // ... which fails the fail-fast call 1037 verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class)); 1038 // ... while the wait-for-ready call stays 1039 verifyNoMoreInteractions(mockCallListener); 1040 // No real stream was ever created 1041 verify(transportInfo1.transport, times(0)) 1042 .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1043 verify(transportInfo2.transport, times(0)) 1044 .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1045 } 1046 1047 @Test subchannels()1048 public void subchannels() { 1049 createChannel(); 1050 1051 // createSubchannel() always return a new Subchannel 1052 Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build(); 1053 Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build(); 1054 Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1); 1055 Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2); 1056 assertNotSame(sub1, sub2); 1057 assertNotSame(attrs1, attrs2); 1058 assertSame(attrs1, sub1.getAttributes()); 1059 assertSame(attrs2, sub2.getAttributes()); 1060 assertSame(addressGroup, sub1.getAddresses()); 1061 assertSame(addressGroup, sub2.getAddresses()); 1062 1063 // requestConnection() 1064 verify(mockTransportFactory, never()) 1065 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1066 sub1.requestConnection(); 1067 verify(mockTransportFactory).newClientTransport(socketAddress, clientTransportOptions); 1068 MockClientTransportInfo transportInfo1 = transports.poll(); 1069 assertNotNull(transportInfo1); 1070 1071 sub2.requestConnection(); 1072 verify(mockTransportFactory, times(2)) 1073 .newClientTransport(socketAddress, clientTransportOptions); 1074 MockClientTransportInfo transportInfo2 = transports.poll(); 1075 assertNotNull(transportInfo2); 1076 1077 sub1.requestConnection(); 1078 sub2.requestConnection(); 1079 verify(mockTransportFactory, times(2)) 1080 .newClientTransport(socketAddress, clientTransportOptions); 1081 1082 // shutdown() has a delay 1083 sub1.shutdown(); 1084 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS); 1085 sub1.shutdown(); 1086 verify(transportInfo1.transport, never()).shutdown(any(Status.class)); 1087 timer.forwardTime(1, TimeUnit.SECONDS); 1088 verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS)); 1089 1090 // ... but not after Channel is terminating 1091 verify(mockLoadBalancer, never()).shutdown(); 1092 channel.shutdown(); 1093 verify(mockLoadBalancer).shutdown(); 1094 verify(transportInfo2.transport, never()).shutdown(any(Status.class)); 1095 1096 sub2.shutdown(); 1097 verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); 1098 1099 // Cleanup 1100 transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); 1101 transportInfo1.listener.transportTerminated(); 1102 transportInfo2.listener.transportShutdown(Status.UNAVAILABLE); 1103 transportInfo2.listener.transportTerminated(); 1104 timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); 1105 } 1106 1107 @Test subchannelsWhenChannelShutdownNow()1108 public void subchannelsWhenChannelShutdownNow() { 1109 createChannel(); 1110 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1111 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1112 sub1.requestConnection(); 1113 sub2.requestConnection(); 1114 1115 assertEquals(2, transports.size()); 1116 MockClientTransportInfo ti1 = transports.poll(); 1117 MockClientTransportInfo ti2 = transports.poll(); 1118 1119 ti1.listener.transportReady(); 1120 ti2.listener.transportReady(); 1121 1122 channel.shutdownNow(); 1123 verify(ti1.transport).shutdownNow(any(Status.class)); 1124 verify(ti2.transport).shutdownNow(any(Status.class)); 1125 1126 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1127 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1128 ti1.listener.transportTerminated(); 1129 1130 assertFalse(channel.isTerminated()); 1131 ti2.listener.transportTerminated(); 1132 assertTrue(channel.isTerminated()); 1133 } 1134 1135 @Test subchannelsNoConnectionShutdown()1136 public void subchannelsNoConnectionShutdown() { 1137 createChannel(); 1138 Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1139 Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1140 1141 channel.shutdown(); 1142 verify(mockLoadBalancer).shutdown(); 1143 sub1.shutdown(); 1144 assertFalse(channel.isTerminated()); 1145 sub2.shutdown(); 1146 assertTrue(channel.isTerminated()); 1147 verify(mockTransportFactory, never()) 1148 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1149 } 1150 1151 @Test subchannelsNoConnectionShutdownNow()1152 public void subchannelsNoConnectionShutdownNow() { 1153 createChannel(); 1154 helper.createSubchannel(addressGroup, Attributes.EMPTY); 1155 helper.createSubchannel(addressGroup, Attributes.EMPTY); 1156 channel.shutdownNow(); 1157 1158 verify(mockLoadBalancer).shutdown(); 1159 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. 1160 // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels. 1161 assertTrue(channel.isTerminated()); 1162 verify(mockTransportFactory, never()) 1163 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1164 } 1165 1166 @Test oobchannels()1167 public void oobchannels() { 1168 createChannel(); 1169 1170 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority"); 1171 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority"); 1172 verify(oobExecutorPool, times(2)).getObject(); 1173 1174 assertEquals("oob1authority", oob1.authority()); 1175 assertEquals("oob2authority", oob2.authority()); 1176 1177 // OOB channels create connections lazily. A new call will initiate the connection. 1178 Metadata headers = new Metadata(); 1179 ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT); 1180 call.start(mockCallListener, headers); 1181 verify(mockTransportFactory) 1182 .newClientTransport( 1183 socketAddress, 1184 new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)); 1185 MockClientTransportInfo transportInfo = transports.poll(); 1186 assertNotNull(transportInfo); 1187 1188 assertEquals(0, oobExecutor.numPendingTasks()); 1189 transportInfo.listener.transportReady(); 1190 assertEquals(1, oobExecutor.runDueTasks()); 1191 verify(transportInfo.transport).newStream(same(method), same(headers), 1192 same(CallOptions.DEFAULT)); 1193 1194 // The transport goes away 1195 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 1196 transportInfo.listener.transportTerminated(); 1197 1198 // A new call will trigger a new transport 1199 ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT); 1200 call2.start(mockCallListener2, headers); 1201 ClientCall<String, Integer> call3 = 1202 oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady()); 1203 call3.start(mockCallListener3, headers); 1204 verify(mockTransportFactory, times(2)).newClientTransport( 1205 socketAddress, 1206 new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)); 1207 transportInfo = transports.poll(); 1208 assertNotNull(transportInfo); 1209 1210 // This transport fails 1211 Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); 1212 assertEquals(0, oobExecutor.numPendingTasks()); 1213 transportInfo.listener.transportShutdown(transportError); 1214 assertTrue(oobExecutor.runDueTasks() > 0); 1215 1216 // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending 1217 verify(mockCallListener2).onClose(same(transportError), any(Metadata.class)); 1218 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); 1219 1220 // Shutdown 1221 assertFalse(oob1.isShutdown()); 1222 assertFalse(oob2.isShutdown()); 1223 oob1.shutdown(); 1224 verify(oobExecutorPool, never()).returnObject(anyObject()); 1225 oob2.shutdownNow(); 1226 assertTrue(oob1.isShutdown()); 1227 assertTrue(oob2.isShutdown()); 1228 assertTrue(oob2.isTerminated()); 1229 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); 1230 1231 // New RPCs will be rejected. 1232 assertEquals(0, oobExecutor.numPendingTasks()); 1233 ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT); 1234 ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT); 1235 call4.start(mockCallListener4, headers); 1236 call5.start(mockCallListener5, headers); 1237 assertTrue(oobExecutor.runDueTasks() > 0); 1238 verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class)); 1239 Status status4 = statusCaptor.getValue(); 1240 assertEquals(Status.Code.UNAVAILABLE, status4.getCode()); 1241 verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class)); 1242 Status status5 = statusCaptor.getValue(); 1243 assertEquals(Status.Code.UNAVAILABLE, status5.getCode()); 1244 1245 // The pending RPC will still be pending 1246 verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); 1247 1248 // This will shutdownNow() the delayed transport, terminating the pending RPC 1249 assertEquals(0, oobExecutor.numPendingTasks()); 1250 oob1.shutdownNow(); 1251 assertTrue(oobExecutor.runDueTasks() > 0); 1252 verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class)); 1253 1254 // Shut down the channel, and it will not terminated because OOB channel has not. 1255 channel.shutdown(); 1256 assertFalse(channel.isTerminated()); 1257 // Delayed transport has already terminated. Terminating the transport terminates the 1258 // subchannel, which in turn terimates the OOB channel, which terminates the channel. 1259 assertFalse(oob1.isTerminated()); 1260 verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService()); 1261 transportInfo.listener.transportTerminated(); 1262 assertTrue(oob1.isTerminated()); 1263 assertTrue(channel.isTerminated()); 1264 verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService()); 1265 } 1266 1267 @Test oobChannelsWhenChannelShutdownNow()1268 public void oobChannelsWhenChannelShutdownNow() { 1269 createChannel(); 1270 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); 1271 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); 1272 1273 oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); 1274 oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata()); 1275 1276 assertEquals(2, transports.size()); 1277 MockClientTransportInfo ti1 = transports.poll(); 1278 MockClientTransportInfo ti2 = transports.poll(); 1279 1280 ti1.listener.transportReady(); 1281 ti2.listener.transportReady(); 1282 1283 channel.shutdownNow(); 1284 verify(ti1.transport).shutdownNow(any(Status.class)); 1285 verify(ti2.transport).shutdownNow(any(Status.class)); 1286 1287 ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1288 ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); 1289 ti1.listener.transportTerminated(); 1290 1291 assertFalse(channel.isTerminated()); 1292 ti2.listener.transportTerminated(); 1293 assertTrue(channel.isTerminated()); 1294 } 1295 1296 @Test oobChannelsNoConnectionShutdown()1297 public void oobChannelsNoConnectionShutdown() { 1298 createChannel(); 1299 ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority"); 1300 ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority"); 1301 channel.shutdown(); 1302 1303 verify(mockLoadBalancer).shutdown(); 1304 oob1.shutdown(); 1305 assertTrue(oob1.isTerminated()); 1306 assertFalse(channel.isTerminated()); 1307 oob2.shutdown(); 1308 assertTrue(oob2.isTerminated()); 1309 assertTrue(channel.isTerminated()); 1310 verify(mockTransportFactory, never()) 1311 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1312 } 1313 1314 @Test oobChannelsNoConnectionShutdownNow()1315 public void oobChannelsNoConnectionShutdownNow() { 1316 createChannel(); 1317 helper.createOobChannel(addressGroup, "oob1Authority"); 1318 helper.createOobChannel(addressGroup, "oob2Authority"); 1319 channel.shutdownNow(); 1320 1321 verify(mockLoadBalancer).shutdown(); 1322 assertTrue(channel.isTerminated()); 1323 // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. 1324 // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels. 1325 verify(mockTransportFactory, never()) 1326 .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class)); 1327 } 1328 1329 @Test refreshNameResolutionWhenSubchannelConnectionFailed()1330 public void refreshNameResolutionWhenSubchannelConnectionFailed() { 1331 subtestRefreshNameResolutionWhenConnectionFailed(false); 1332 } 1333 1334 @Test refreshNameResolutionWhenOobChannelConnectionFailed()1335 public void refreshNameResolutionWhenOobChannelConnectionFailed() { 1336 subtestRefreshNameResolutionWhenConnectionFailed(true); 1337 } 1338 subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel)1339 private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) { 1340 FakeNameResolverFactory nameResolverFactory = 1341 new FakeNameResolverFactory.Builder(expectedUri) 1342 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 1343 .build(); 1344 channelBuilder.nameResolverFactory(nameResolverFactory); 1345 createChannel(); 1346 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 1347 1348 if (isOobChannel) { 1349 OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority"); 1350 oobChannel.getSubchannel().requestConnection(); 1351 } else { 1352 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1353 subchannel.requestConnection(); 1354 } 1355 1356 MockClientTransportInfo transportInfo = transports.poll(); 1357 assertNotNull(transportInfo); 1358 1359 // Transport closed when connecting 1360 assertEquals(0, resolver.refreshCalled); 1361 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 1362 assertEquals(1, resolver.refreshCalled); 1363 1364 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); 1365 transportInfo = transports.poll(); 1366 assertNotNull(transportInfo); 1367 1368 transportInfo.listener.transportReady(); 1369 1370 // Transport closed when ready 1371 assertEquals(1, resolver.refreshCalled); 1372 transportInfo.listener.transportShutdown(Status.UNAVAILABLE); 1373 assertEquals(2, resolver.refreshCalled); 1374 } 1375 1376 @Test uriPattern()1377 public void uriPattern() { 1378 assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches()); 1379 assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches()); 1380 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched 1381 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched 1382 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched 1383 assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched 1384 } 1385 1386 /** 1387 * Test that information such as the Call's context, MethodDescriptor, authority, executor are 1388 * propagated to newStream() and applyRequestMetadata(). 1389 */ 1390 @Test informationPropagatedToNewStreamAndCallCredentials()1391 public void informationPropagatedToNewStreamAndCallCredentials() { 1392 createChannel(); 1393 CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds); 1394 final Context.Key<String> testKey = Context.key("testing"); 1395 Context ctx = Context.current().withValue(testKey, "testValue"); 1396 final LinkedList<Context> credsApplyContexts = new LinkedList<Context>(); 1397 final LinkedList<Context> newStreamContexts = new LinkedList<Context>(); 1398 doAnswer(new Answer<Void>() { 1399 @Override 1400 public Void answer(InvocationOnMock in) throws Throwable { 1401 credsApplyContexts.add(Context.current()); 1402 return null; 1403 } 1404 }).when(creds).applyRequestMetadata( 1405 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), 1406 any(MetadataApplier.class)); 1407 1408 // First call will be on delayed transport. Only newCall() is run within the expected context, 1409 // so that we can verify that the context is explicitly attached before calling newStream() and 1410 // applyRequestMetadata(), which happens after we detach the context from the thread. 1411 Context origCtx = ctx.attach(); 1412 assertEquals("testValue", testKey.get()); 1413 ClientCall<String, Integer> call = channel.newCall(method, callOptions); 1414 ctx.detach(origCtx); 1415 assertNull(testKey.get()); 1416 call.start(mockCallListener, new Metadata()); 1417 1418 // Simulate name resolution results 1419 EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress); 1420 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1421 subchannel.requestConnection(); 1422 verify(mockTransportFactory) 1423 .newClientTransport(same(socketAddress), eq(clientTransportOptions)); 1424 MockClientTransportInfo transportInfo = transports.poll(); 1425 final ConnectionClientTransport transport = transportInfo.transport; 1426 when(transport.getAttributes()).thenReturn(Attributes.EMPTY); 1427 doAnswer(new Answer<ClientStream>() { 1428 @Override 1429 public ClientStream answer(InvocationOnMock in) throws Throwable { 1430 newStreamContexts.add(Context.current()); 1431 return mock(ClientStream.class); 1432 } 1433 }).when(transport).newStream( 1434 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1435 1436 verify(creds, never()).applyRequestMetadata( 1437 any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), 1438 any(MetadataApplier.class)); 1439 1440 // applyRequestMetadata() is called after the transport becomes ready. 1441 transportInfo.listener.transportReady(); 1442 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1443 .thenReturn(PickResult.withSubchannel(subchannel)); 1444 helper.updateBalancingState(READY, mockPicker); 1445 executor.runDueTasks(); 1446 ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class); 1447 ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class); 1448 verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(), 1449 same(executor.getScheduledExecutorService()), applierCaptor.capture()); 1450 assertEquals("testValue", testKey.get(credsApplyContexts.poll())); 1451 assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY)); 1452 assertEquals(SecurityLevel.NONE, 1453 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL)); 1454 verify(transport, never()).newStream( 1455 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1456 1457 // newStream() is called after apply() is called 1458 applierCaptor.getValue().apply(new Metadata()); 1459 verify(transport).newStream(same(method), any(Metadata.class), same(callOptions)); 1460 assertEquals("testValue", testKey.get(newStreamContexts.poll())); 1461 // The context should not live beyond the scope of newStream() and applyRequestMetadata() 1462 assertNull(testKey.get()); 1463 1464 1465 // Second call will not be on delayed transport 1466 origCtx = ctx.attach(); 1467 call = channel.newCall(method, callOptions); 1468 ctx.detach(origCtx); 1469 call.start(mockCallListener, new Metadata()); 1470 1471 verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(), 1472 same(executor.getScheduledExecutorService()), applierCaptor.capture()); 1473 assertEquals("testValue", testKey.get(credsApplyContexts.poll())); 1474 assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY)); 1475 assertEquals(SecurityLevel.NONE, 1476 attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL)); 1477 // This is from the first call 1478 verify(transport).newStream( 1479 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 1480 1481 // Still, newStream() is called after apply() is called 1482 applierCaptor.getValue().apply(new Metadata()); 1483 verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions)); 1484 assertEquals("testValue", testKey.get(newStreamContexts.poll())); 1485 1486 assertNull(testKey.get()); 1487 } 1488 1489 @Test pickerReturnsStreamTracer_noDelay()1490 public void pickerReturnsStreamTracer_noDelay() { 1491 ClientStream mockStream = mock(ClientStream.class); 1492 ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class); 1493 ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); 1494 createChannel(); 1495 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1496 subchannel.requestConnection(); 1497 MockClientTransportInfo transportInfo = transports.poll(); 1498 transportInfo.listener.transportReady(); 1499 ClientTransport mockTransport = transportInfo.transport; 1500 when(mockTransport.newStream( 1501 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) 1502 .thenReturn(mockStream); 1503 1504 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( 1505 PickResult.withSubchannel(subchannel, factory2)); 1506 helper.updateBalancingState(READY, mockPicker); 1507 1508 CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); 1509 ClientCall<String, Integer> call = channel.newCall(method, callOptions); 1510 call.start(mockCallListener, new Metadata()); 1511 1512 verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); 1513 verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture()); 1514 assertEquals( 1515 Arrays.asList(factory1, factory2), 1516 callOptionsCaptor.getValue().getStreamTracerFactories()); 1517 // The factories are safely not stubbed because we do not expect any usage of them. 1518 verifyZeroInteractions(factory1); 1519 verifyZeroInteractions(factory2); 1520 } 1521 1522 @Test pickerReturnsStreamTracer_delayed()1523 public void pickerReturnsStreamTracer_delayed() { 1524 ClientStream mockStream = mock(ClientStream.class); 1525 ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class); 1526 ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); 1527 createChannel(); 1528 1529 CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); 1530 ClientCall<String, Integer> call = channel.newCall(method, callOptions); 1531 call.start(mockCallListener, new Metadata()); 1532 1533 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1534 subchannel.requestConnection(); 1535 MockClientTransportInfo transportInfo = transports.poll(); 1536 transportInfo.listener.transportReady(); 1537 ClientTransport mockTransport = transportInfo.transport; 1538 when(mockTransport.newStream( 1539 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) 1540 .thenReturn(mockStream); 1541 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( 1542 PickResult.withSubchannel(subchannel, factory2)); 1543 1544 helper.updateBalancingState(READY, mockPicker); 1545 assertEquals(1, executor.runDueTasks()); 1546 1547 verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); 1548 verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture()); 1549 assertEquals( 1550 Arrays.asList(factory1, factory2), 1551 callOptionsCaptor.getValue().getStreamTracerFactories()); 1552 // The factories are safely not stubbed because we do not expect any usage of them. 1553 verifyZeroInteractions(factory1); 1554 verifyZeroInteractions(factory2); 1555 } 1556 1557 @Test getState_loadBalancerSupportsChannelState()1558 public void getState_loadBalancerSupportsChannelState() { 1559 channelBuilder.nameResolverFactory( 1560 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); 1561 createChannel(); 1562 assertEquals(IDLE, channel.getState(false)); 1563 1564 helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker); 1565 assertEquals(TRANSIENT_FAILURE, channel.getState(false)); 1566 } 1567 1568 @Test getState_withRequestConnect()1569 public void getState_withRequestConnect() { 1570 channelBuilder.nameResolverFactory( 1571 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); 1572 requestConnection = false; 1573 createChannel(); 1574 1575 assertEquals(IDLE, channel.getState(false)); 1576 verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class)); 1577 1578 // call getState() with requestConnection = true 1579 assertEquals(IDLE, channel.getState(true)); 1580 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); 1581 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); 1582 helper = helperCaptor.getValue(); 1583 1584 helper.updateBalancingState(CONNECTING, mockPicker); 1585 assertEquals(CONNECTING, channel.getState(false)); 1586 assertEquals(CONNECTING, channel.getState(true)); 1587 verifyNoMoreInteractions(mockLoadBalancerFactory); 1588 } 1589 1590 @Test getState_withRequestConnect_IdleWithLbRunning()1591 public void getState_withRequestConnect_IdleWithLbRunning() { 1592 channelBuilder.nameResolverFactory( 1593 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); 1594 createChannel(); 1595 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 1596 1597 helper.updateBalancingState(IDLE, mockPicker); 1598 1599 assertEquals(IDLE, channel.getState(true)); 1600 verifyNoMoreInteractions(mockLoadBalancerFactory); 1601 verify(mockPicker).requestConnection(); 1602 } 1603 1604 @Test notifyWhenStateChanged()1605 public void notifyWhenStateChanged() { 1606 final AtomicBoolean stateChanged = new AtomicBoolean(); 1607 Runnable onStateChanged = new Runnable() { 1608 @Override 1609 public void run() { 1610 stateChanged.set(true); 1611 } 1612 }; 1613 1614 channelBuilder.nameResolverFactory( 1615 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); 1616 createChannel(); 1617 assertEquals(IDLE, channel.getState(false)); 1618 1619 channel.notifyWhenStateChanged(IDLE, onStateChanged); 1620 executor.runDueTasks(); 1621 assertFalse(stateChanged.get()); 1622 1623 // state change from IDLE to CONNECTING 1624 helper.updateBalancingState(CONNECTING, mockPicker); 1625 // onStateChanged callback should run 1626 executor.runDueTasks(); 1627 assertTrue(stateChanged.get()); 1628 1629 // clear and test form CONNECTING 1630 stateChanged.set(false); 1631 channel.notifyWhenStateChanged(IDLE, onStateChanged); 1632 // onStateChanged callback should run immediately 1633 executor.runDueTasks(); 1634 assertTrue(stateChanged.get()); 1635 } 1636 1637 @Test channelStateWhenChannelShutdown()1638 public void channelStateWhenChannelShutdown() { 1639 final AtomicBoolean stateChanged = new AtomicBoolean(); 1640 Runnable onStateChanged = new Runnable() { 1641 @Override 1642 public void run() { 1643 stateChanged.set(true); 1644 } 1645 }; 1646 1647 channelBuilder.nameResolverFactory( 1648 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); 1649 createChannel(); 1650 assertEquals(IDLE, channel.getState(false)); 1651 channel.notifyWhenStateChanged(IDLE, onStateChanged); 1652 executor.runDueTasks(); 1653 assertFalse(stateChanged.get()); 1654 1655 channel.shutdown(); 1656 assertEquals(SHUTDOWN, channel.getState(false)); 1657 executor.runDueTasks(); 1658 assertTrue(stateChanged.get()); 1659 1660 stateChanged.set(false); 1661 channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged); 1662 helper.updateBalancingState(CONNECTING, mockPicker); 1663 1664 assertEquals(SHUTDOWN, channel.getState(false)); 1665 executor.runDueTasks(); 1666 assertFalse(stateChanged.get()); 1667 } 1668 1669 @Test stateIsIdleOnIdleTimeout()1670 public void stateIsIdleOnIdleTimeout() { 1671 long idleTimeoutMillis = 2000L; 1672 channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); 1673 createChannel(); 1674 assertEquals(IDLE, channel.getState(false)); 1675 1676 helper.updateBalancingState(CONNECTING, mockPicker); 1677 assertEquals(CONNECTING, channel.getState(false)); 1678 1679 timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); 1680 assertEquals(IDLE, channel.getState(false)); 1681 } 1682 1683 @Test panic_whenIdle()1684 public void panic_whenIdle() { 1685 subtestPanic(IDLE); 1686 } 1687 1688 @Test panic_whenConnecting()1689 public void panic_whenConnecting() { 1690 subtestPanic(CONNECTING); 1691 } 1692 1693 @Test panic_whenTransientFailure()1694 public void panic_whenTransientFailure() { 1695 subtestPanic(TRANSIENT_FAILURE); 1696 } 1697 1698 @Test panic_whenReady()1699 public void panic_whenReady() { 1700 subtestPanic(READY); 1701 } 1702 subtestPanic(ConnectivityState initialState)1703 private void subtestPanic(ConnectivityState initialState) { 1704 assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState); 1705 long idleTimeoutMillis = 2000L; 1706 FakeNameResolverFactory nameResolverFactory = 1707 new FakeNameResolverFactory.Builder(expectedUri).build(); 1708 channelBuilder.nameResolverFactory(nameResolverFactory); 1709 channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); 1710 createChannel(); 1711 1712 verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); 1713 assertEquals(1, nameResolverFactory.resolvers.size()); 1714 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0); 1715 1716 Throwable panicReason = new Exception("Simulated uncaught exception"); 1717 if (initialState == IDLE) { 1718 timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); 1719 } else { 1720 helper.updateBalancingState(initialState, mockPicker); 1721 } 1722 assertEquals(initialState, channel.getState(false)); 1723 1724 if (initialState == IDLE) { 1725 // IDLE mode will shutdown resolver and balancer 1726 verify(mockLoadBalancer).shutdown(); 1727 assertTrue(resolver.shutdown); 1728 // A new resolver is created 1729 assertEquals(1, nameResolverFactory.resolvers.size()); 1730 resolver = nameResolverFactory.resolvers.remove(0); 1731 assertFalse(resolver.shutdown); 1732 } else { 1733 verify(mockLoadBalancer, never()).shutdown(); 1734 assertFalse(resolver.shutdown); 1735 } 1736 1737 // Make channel panic! 1738 channel.panic(panicReason); 1739 1740 // Calls buffered in delayedTransport will fail 1741 1742 // Resolver and balancer are shutdown 1743 verify(mockLoadBalancer).shutdown(); 1744 assertTrue(resolver.shutdown); 1745 1746 // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it. 1747 assertEquals(TRANSIENT_FAILURE, channel.getState(true)); 1748 assertEquals(TRANSIENT_FAILURE, channel.getState(true)); 1749 verifyPanicMode(panicReason); 1750 1751 // No new resolver or balancer are created 1752 verifyNoMoreInteractions(mockLoadBalancerFactory); 1753 assertEquals(0, nameResolverFactory.resolvers.size()); 1754 1755 // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be 1756 // able to revive it. 1757 helper.updateBalancingState(READY, mockPicker); 1758 verifyPanicMode(panicReason); 1759 1760 // Cannot be revived by exitIdleMode() 1761 channel.exitIdleMode(); 1762 verifyPanicMode(panicReason); 1763 1764 // Can still shutdown normally 1765 channel.shutdown(); 1766 assertTrue(channel.isShutdown()); 1767 assertTrue(channel.isTerminated()); 1768 assertEquals(SHUTDOWN, channel.getState(false)); 1769 1770 // We didn't stub mockPicker, because it should have never been called in this test. 1771 verifyZeroInteractions(mockPicker); 1772 } 1773 1774 @Test panic_bufferedCallsWillFail()1775 public void panic_bufferedCallsWillFail() { 1776 createChannel(); 1777 1778 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1779 .thenReturn(PickResult.withNoResult()); 1780 helper.updateBalancingState(CONNECTING, mockPicker); 1781 1782 // Start RPCs that will be buffered in delayedTransport 1783 ClientCall<String, Integer> call = 1784 channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); 1785 call.start(mockCallListener, new Metadata()); 1786 1787 ClientCall<String, Integer> call2 = 1788 channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); 1789 call2.start(mockCallListener2, new Metadata()); 1790 1791 executor.runDueTasks(); 1792 verifyZeroInteractions(mockCallListener, mockCallListener2); 1793 1794 // Enter panic 1795 Throwable panicReason = new Exception("Simulated uncaught exception"); 1796 channel.panic(panicReason); 1797 1798 // Buffered RPCs fail immediately 1799 executor.runDueTasks(); 1800 verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason); 1801 verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason); 1802 } 1803 verifyPanicMode(Throwable cause)1804 private void verifyPanicMode(Throwable cause) { 1805 Assume.assumeTrue("Panic mode disabled to resolve issues with some tests. See #3293", false); 1806 1807 @SuppressWarnings("unchecked") 1808 ClientCall.Listener<Integer> mockListener = 1809 (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class); 1810 assertEquals(TRANSIENT_FAILURE, channel.getState(false)); 1811 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 1812 call.start(mockListener, new Metadata()); 1813 executor.runDueTasks(); 1814 verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause); 1815 1816 // Channel is dead. No more pending task to possibly revive it. 1817 assertEquals(0, timer.numPendingTasks()); 1818 assertEquals(0, executor.numPendingTasks()); 1819 assertEquals(0, oobExecutor.numPendingTasks()); 1820 } 1821 verifyCallListenerClosed( ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause)1822 private void verifyCallListenerClosed( 1823 ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) { 1824 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null); 1825 verify(listener).onClose(captor.capture(), any(Metadata.class)); 1826 Status rpcStatus = captor.getValue(); 1827 assertEquals(code, rpcStatus.getCode()); 1828 assertSame(cause, rpcStatus.getCause()); 1829 verifyNoMoreInteractions(listener); 1830 } 1831 1832 @Test idleTimeoutAndReconnect()1833 public void idleTimeoutAndReconnect() { 1834 long idleTimeoutMillis = 2000L; 1835 channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); 1836 createChannel(); 1837 1838 timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); 1839 assertEquals(IDLE, channel.getState(true /* request connection */)); 1840 1841 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); 1842 // Two times of requesting connection will create loadBalancer twice. 1843 verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture()); 1844 Helper helper2 = helperCaptor.getValue(); 1845 1846 // Updating on the old helper (whose balancer has been shutdown) does not change the channel 1847 // state. 1848 helper.updateBalancingState(CONNECTING, mockPicker); 1849 assertEquals(IDLE, channel.getState(false)); 1850 1851 helper2.updateBalancingState(CONNECTING, mockPicker); 1852 assertEquals(CONNECTING, channel.getState(false)); 1853 } 1854 1855 @Test idleMode_resetsDelayedTransportPicker()1856 public void idleMode_resetsDelayedTransportPicker() { 1857 ClientStream mockStream = mock(ClientStream.class); 1858 Status pickError = Status.UNAVAILABLE.withDescription("pick result error"); 1859 long idleTimeoutMillis = 1000L; 1860 channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); 1861 channelBuilder.nameResolverFactory( 1862 new FakeNameResolverFactory.Builder(expectedUri) 1863 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 1864 .build()); 1865 createChannel(); 1866 assertEquals(IDLE, channel.getState(false)); 1867 1868 // This call will be buffered in delayedTransport 1869 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 1870 call.start(mockCallListener, new Metadata()); 1871 1872 // Move channel into TRANSIENT_FAILURE, which will fail the pending call 1873 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1874 .thenReturn(PickResult.withError(pickError)); 1875 helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker); 1876 assertEquals(TRANSIENT_FAILURE, channel.getState(false)); 1877 executor.runDueTasks(); 1878 verify(mockCallListener).onClose(same(pickError), any(Metadata.class)); 1879 1880 // Move channel to idle 1881 timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); 1882 assertEquals(IDLE, channel.getState(false)); 1883 1884 // This call should be buffered, but will move the channel out of idle 1885 ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); 1886 call2.start(mockCallListener2, new Metadata()); 1887 executor.runDueTasks(); 1888 verifyNoMoreInteractions(mockCallListener2); 1889 1890 // Get the helper created on exiting idle 1891 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); 1892 verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture()); 1893 Helper helper2 = helperCaptor.getValue(); 1894 1895 // Establish a connection 1896 Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY); 1897 subchannel.requestConnection(); 1898 MockClientTransportInfo transportInfo = transports.poll(); 1899 ConnectionClientTransport mockTransport = transportInfo.transport; 1900 ManagedClientTransport.Listener transportListener = transportInfo.listener; 1901 when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) 1902 .thenReturn(mockStream); 1903 transportListener.transportReady(); 1904 1905 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1906 .thenReturn(PickResult.withSubchannel(subchannel)); 1907 helper2.updateBalancingState(READY, mockPicker); 1908 assertEquals(READY, channel.getState(false)); 1909 executor.runDueTasks(); 1910 1911 // Verify the buffered call was drained 1912 verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); 1913 verify(mockStream).start(any(ClientStreamListener.class)); 1914 } 1915 1916 @Test enterIdleEntersIdle()1917 public void enterIdleEntersIdle() { 1918 createChannel(); 1919 helper.updateBalancingState(READY, mockPicker); 1920 assertEquals(READY, channel.getState(false)); 1921 1922 channel.enterIdle(); 1923 1924 assertEquals(IDLE, channel.getState(false)); 1925 } 1926 1927 @Test enterIdleAfterIdleTimerIsNoOp()1928 public void enterIdleAfterIdleTimerIsNoOp() { 1929 long idleTimeoutMillis = 2000L; 1930 channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS); 1931 createChannel(); 1932 timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); 1933 assertEquals(IDLE, channel.getState(false)); 1934 1935 channel.enterIdle(); 1936 1937 assertEquals(IDLE, channel.getState(false)); 1938 } 1939 1940 @Test enterIdle_exitsIdleIfDelayedStreamPending()1941 public void enterIdle_exitsIdleIfDelayedStreamPending() { 1942 FakeNameResolverFactory nameResolverFactory = 1943 new FakeNameResolverFactory.Builder(expectedUri) 1944 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 1945 .build(); 1946 channelBuilder.nameResolverFactory(nameResolverFactory); 1947 createChannel(); 1948 1949 // Start a call that will be buffered in delayedTransport 1950 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 1951 call.start(mockCallListener, new Metadata()); 1952 1953 // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed 1954 // call 1955 channel.enterIdle(); 1956 assertEquals(IDLE, channel.getState(false)); 1957 1958 // enterIdle() will restart the delayed call by exiting idle. This creates a new helper. 1959 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); 1960 verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture()); 1961 Helper helper2 = helperCaptor.getValue(); 1962 1963 // Establish a connection 1964 Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY); 1965 subchannel.requestConnection(); 1966 ClientStream mockStream = mock(ClientStream.class); 1967 MockClientTransportInfo transportInfo = transports.poll(); 1968 ConnectionClientTransport mockTransport = transportInfo.transport; 1969 ManagedClientTransport.Listener transportListener = transportInfo.listener; 1970 when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) 1971 .thenReturn(mockStream); 1972 transportListener.transportReady(); 1973 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 1974 .thenReturn(PickResult.withSubchannel(subchannel)); 1975 helper2.updateBalancingState(READY, mockPicker); 1976 assertEquals(READY, channel.getState(false)); 1977 1978 // Verify the original call was drained 1979 executor.runDueTasks(); 1980 verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); 1981 verify(mockStream).start(any(ClientStreamListener.class)); 1982 } 1983 1984 @Test updateBalancingStateDoesUpdatePicker()1985 public void updateBalancingStateDoesUpdatePicker() { 1986 ClientStream mockStream = mock(ClientStream.class); 1987 createChannel(); 1988 1989 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 1990 call.start(mockCallListener, new Metadata()); 1991 1992 // Make the transport available with subchannel2 1993 Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1994 Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); 1995 subchannel2.requestConnection(); 1996 1997 MockClientTransportInfo transportInfo = transports.poll(); 1998 ConnectionClientTransport mockTransport = transportInfo.transport; 1999 ManagedClientTransport.Listener transportListener = transportInfo.listener; 2000 when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) 2001 .thenReturn(mockStream); 2002 transportListener.transportReady(); 2003 2004 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 2005 .thenReturn(PickResult.withSubchannel(subchannel1)); 2006 helper.updateBalancingState(READY, mockPicker); 2007 2008 executor.runDueTasks(); 2009 verify(mockTransport, never()) 2010 .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); 2011 verify(mockStream, never()).start(any(ClientStreamListener.class)); 2012 2013 2014 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 2015 .thenReturn(PickResult.withSubchannel(subchannel2)); 2016 helper.updateBalancingState(READY, mockPicker); 2017 2018 executor.runDueTasks(); 2019 verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class)); 2020 verify(mockStream).start(any(ClientStreamListener.class)); 2021 } 2022 2023 @Test updateBalancingStateWithShutdownShouldBeIgnored()2024 public void updateBalancingStateWithShutdownShouldBeIgnored() { 2025 channelBuilder.nameResolverFactory( 2026 new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build()); 2027 createChannel(); 2028 assertEquals(IDLE, channel.getState(false)); 2029 2030 Runnable onStateChanged = mock(Runnable.class); 2031 channel.notifyWhenStateChanged(IDLE, onStateChanged); 2032 2033 helper.updateBalancingState(SHUTDOWN, mockPicker); 2034 2035 assertEquals(IDLE, channel.getState(false)); 2036 executor.runDueTasks(); 2037 verify(onStateChanged, never()).run(); 2038 } 2039 2040 @Test resetConnectBackoff()2041 public void resetConnectBackoff() { 2042 // Start with a name resolution failure to trigger backoff attempts 2043 Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); 2044 FakeNameResolverFactory nameResolverFactory = 2045 new FakeNameResolverFactory.Builder(expectedUri).setError(error).build(); 2046 channelBuilder.nameResolverFactory(nameResolverFactory); 2047 // Name resolution is started as soon as channel is created. 2048 createChannel(); 2049 FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); 2050 verify(mockLoadBalancer).handleNameResolutionError(same(error)); 2051 2052 FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh(); 2053 assertNotNull("There should be a name resolver backoff task", nameResolverBackoff); 2054 assertEquals(0, resolver.refreshCalled); 2055 2056 // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff 2057 channel.resetConnectBackoff(); 2058 assertEquals(1, resolver.refreshCalled); 2059 assertTrue(nameResolverBackoff.isCancelled()); 2060 2061 // Simulate a race between cancel and the task scheduler. Should be a no-op. 2062 nameResolverBackoff.command.run(); 2063 assertEquals(1, resolver.refreshCalled); 2064 2065 // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1 2066 timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); 2067 assertEquals(2, resolver.refreshCalled); 2068 } 2069 2070 @Test resetConnectBackoff_noOpWithoutPendingResolverBackoff()2071 public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() { 2072 FakeNameResolverFactory nameResolverFactory = 2073 new FakeNameResolverFactory.Builder(expectedUri) 2074 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 2075 .build(); 2076 channelBuilder.nameResolverFactory(nameResolverFactory); 2077 createChannel(); 2078 FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); 2079 assertEquals(0, nameResolver.refreshCalled); 2080 2081 channel.resetConnectBackoff(); 2082 2083 assertEquals(0, nameResolver.refreshCalled); 2084 } 2085 2086 @Test resetConnectBackoff_noOpWhenChannelShutdown()2087 public void resetConnectBackoff_noOpWhenChannelShutdown() { 2088 FakeNameResolverFactory nameResolverFactory = 2089 new FakeNameResolverFactory.Builder(expectedUri).build(); 2090 channelBuilder.nameResolverFactory(nameResolverFactory); 2091 createChannel(); 2092 2093 channel.shutdown(); 2094 assertTrue(channel.isShutdown()); 2095 channel.resetConnectBackoff(); 2096 2097 FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); 2098 assertEquals(0, nameResolver.refreshCalled); 2099 } 2100 2101 @Test resetConnectBackoff_noOpWhenNameResolverNotStarted()2102 public void resetConnectBackoff_noOpWhenNameResolverNotStarted() { 2103 FakeNameResolverFactory nameResolverFactory = 2104 new FakeNameResolverFactory.Builder(expectedUri).build(); 2105 channelBuilder.nameResolverFactory(nameResolverFactory); 2106 requestConnection = false; 2107 createChannel(); 2108 2109 channel.resetConnectBackoff(); 2110 2111 FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); 2112 assertEquals(0, nameResolver.refreshCalled); 2113 } 2114 2115 @Test channelsAndSubchannels_instrumented_name()2116 public void channelsAndSubchannels_instrumented_name() throws Exception { 2117 createChannel(); 2118 assertEquals(TARGET, getStats(channel).target); 2119 2120 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 2121 assertEquals(Collections.singletonList(addressGroup).toString(), 2122 getStats((AbstractSubchannel) subchannel).target); 2123 } 2124 2125 @Test channelTracing_channelCreationEvent()2126 public void channelTracing_channelCreationEvent() throws Exception { 2127 timer.forwardNanos(1234); 2128 channelBuilder.maxTraceEvents(10); 2129 createChannel(); 2130 assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2131 .setDescription("Channel created") 2132 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2133 .setTimestampNanos(timer.getTicker().read()) 2134 .build()); 2135 } 2136 2137 @Test channelTracing_subchannelCreationEvents()2138 public void channelTracing_subchannelCreationEvents() throws Exception { 2139 channelBuilder.maxTraceEvents(10); 2140 createChannel(); 2141 timer.forwardNanos(1234); 2142 AbstractSubchannel subchannel = 2143 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 2144 assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2145 .setDescription("Child channel created") 2146 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2147 .setTimestampNanos(timer.getTicker().read()) 2148 .setSubchannelRef(subchannel.getInternalSubchannel()) 2149 .build()); 2150 assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2151 .setDescription("Subchannel created") 2152 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2153 .setTimestampNanos(timer.getTicker().read()) 2154 .build()); 2155 } 2156 2157 @Test channelTracing_nameResolvingErrorEvent()2158 public void channelTracing_nameResolvingErrorEvent() throws Exception { 2159 timer.forwardNanos(1234); 2160 channelBuilder.maxTraceEvents(10); 2161 createChannel(); 2162 assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2163 .setDescription("Failed to resolve name") 2164 .setSeverity(ChannelTrace.Event.Severity.CT_WARNING) 2165 .setTimestampNanos(timer.getTicker().read()) 2166 .build()); 2167 } 2168 2169 @Test channelTracing_nameResolvedEvent()2170 public void channelTracing_nameResolvedEvent() throws Exception { 2171 timer.forwardNanos(1234); 2172 channelBuilder.maxTraceEvents(10); 2173 FakeNameResolverFactory nameResolverFactory = 2174 new FakeNameResolverFactory.Builder(expectedUri) 2175 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 2176 .build(); 2177 channelBuilder.nameResolverFactory(nameResolverFactory); 2178 createChannel(); 2179 assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2180 .setDescription("Address resolved: " 2181 + Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 2182 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2183 .setTimestampNanos(timer.getTicker().read()) 2184 .build()); 2185 } 2186 2187 @Test channelTracing_nameResolvedEvent_zeorAndNonzeroBackends()2188 public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception { 2189 timer.forwardNanos(1234); 2190 channelBuilder.maxTraceEvents(10); 2191 List<EquivalentAddressGroup> servers = new ArrayList<>(); 2192 servers.add(new EquivalentAddressGroup(socketAddress)); 2193 FakeNameResolverFactory nameResolverFactory = 2194 new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build(); 2195 channelBuilder.nameResolverFactory(nameResolverFactory); 2196 createChannel(); 2197 2198 int prevSize = getStats(channel).channelTrace.events.size(); 2199 nameResolverFactory.resolvers.get(0).listener.onAddresses( 2200 Collections.singletonList(new EquivalentAddressGroup( 2201 Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))), 2202 Attributes.EMPTY); 2203 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize); 2204 2205 prevSize = getStats(channel).channelTrace.events.size(); 2206 nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL); 2207 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); 2208 2209 prevSize = getStats(channel).channelTrace.events.size(); 2210 nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL); 2211 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize); 2212 2213 prevSize = getStats(channel).channelTrace.events.size(); 2214 nameResolverFactory.resolvers.get(0).listener.onAddresses( 2215 Collections.singletonList(new EquivalentAddressGroup( 2216 Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))), 2217 Attributes.EMPTY); 2218 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); 2219 } 2220 2221 @Test channelTracing_serviceConfigChange()2222 public void channelTracing_serviceConfigChange() throws Exception { 2223 timer.forwardNanos(1234); 2224 channelBuilder.maxTraceEvents(10); 2225 List<EquivalentAddressGroup> servers = new ArrayList<>(); 2226 servers.add(new EquivalentAddressGroup(socketAddress)); 2227 FakeNameResolverFactory nameResolverFactory = 2228 new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build(); 2229 channelBuilder.nameResolverFactory(nameResolverFactory); 2230 createChannel(); 2231 2232 int prevSize = getStats(channel).channelTrace.events.size(); 2233 Attributes attributes = 2234 Attributes.newBuilder() 2235 .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>()) 2236 .build(); 2237 nameResolverFactory.resolvers.get(0).listener.onAddresses( 2238 Collections.singletonList(new EquivalentAddressGroup( 2239 Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))), 2240 attributes); 2241 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); 2242 assertThat(getStats(channel).channelTrace.events.get(prevSize)) 2243 .isEqualTo(new ChannelTrace.Event.Builder() 2244 .setDescription("Service config changed") 2245 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2246 .setTimestampNanos(timer.getTicker().read()) 2247 .build()); 2248 2249 prevSize = getStats(channel).channelTrace.events.size(); 2250 nameResolverFactory.resolvers.get(0).listener.onAddresses( 2251 Collections.singletonList(new EquivalentAddressGroup( 2252 Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))), 2253 attributes); 2254 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize); 2255 2256 prevSize = getStats(channel).channelTrace.events.size(); 2257 Map<String, Object> serviceConfig = new HashMap<String, Object>(); 2258 serviceConfig.put("methodConfig", new HashMap<String, Object>()); 2259 attributes = 2260 Attributes.newBuilder() 2261 .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig) 2262 .build(); 2263 timer.forwardNanos(1234); 2264 nameResolverFactory.resolvers.get(0).listener.onAddresses( 2265 Collections.singletonList(new EquivalentAddressGroup( 2266 Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))), 2267 attributes); 2268 assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1); 2269 assertThat(getStats(channel).channelTrace.events.get(prevSize)) 2270 .isEqualTo(new ChannelTrace.Event.Builder() 2271 .setDescription("Service config changed") 2272 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2273 .setTimestampNanos(timer.getTicker().read()) 2274 .build()); 2275 } 2276 2277 @Test channelTracing_stateChangeEvent()2278 public void channelTracing_stateChangeEvent() throws Exception { 2279 channelBuilder.maxTraceEvents(10); 2280 createChannel(); 2281 timer.forwardNanos(1234); 2282 helper.updateBalancingState(CONNECTING, mockPicker); 2283 assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2284 .setDescription("Entering CONNECTING state") 2285 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2286 .setTimestampNanos(timer.getTicker().read()) 2287 .build()); 2288 } 2289 2290 @Test channelTracing_subchannelStateChangeEvent()2291 public void channelTracing_subchannelStateChangeEvent() throws Exception { 2292 channelBuilder.maxTraceEvents(10); 2293 createChannel(); 2294 AbstractSubchannel subchannel = 2295 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 2296 timer.forwardNanos(1234); 2297 subchannel.obtainActiveTransport(); 2298 assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2299 .setDescription("Entering CONNECTING state") 2300 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2301 .setTimestampNanos(timer.getTicker().read()) 2302 .build()); 2303 } 2304 2305 @Test channelTracing_oobChannelStateChangeEvent()2306 public void channelTracing_oobChannelStateChangeEvent() throws Exception { 2307 channelBuilder.maxTraceEvents(10); 2308 createChannel(); 2309 OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority"); 2310 timer.forwardNanos(1234); 2311 oobChannel.handleSubchannelStateChange( 2312 ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING)); 2313 assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2314 .setDescription("Entering CONNECTING state") 2315 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2316 .setTimestampNanos(timer.getTicker().read()) 2317 .build()); 2318 } 2319 2320 @Test channelTracing_oobChannelCreationEvents()2321 public void channelTracing_oobChannelCreationEvents() throws Exception { 2322 channelBuilder.maxTraceEvents(10); 2323 createChannel(); 2324 timer.forwardNanos(1234); 2325 OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority"); 2326 assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2327 .setDescription("Child channel created") 2328 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2329 .setTimestampNanos(timer.getTicker().read()) 2330 .setChannelRef(oobChannel) 2331 .build()); 2332 assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() 2333 .setDescription("OobChannel created") 2334 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2335 .setTimestampNanos(timer.getTicker().read()) 2336 .build()); 2337 assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains( 2338 new ChannelTrace.Event.Builder() 2339 .setDescription("Subchannel created") 2340 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 2341 .setTimestampNanos(timer.getTicker().read()) 2342 .build()); 2343 } 2344 2345 @Test channelsAndSubchannels_instrumented_state()2346 public void channelsAndSubchannels_instrumented_state() throws Exception { 2347 createChannel(); 2348 2349 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); 2350 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); 2351 helper = helperCaptor.getValue(); 2352 2353 assertEquals(IDLE, getStats(channel).state); 2354 helper.updateBalancingState(CONNECTING, mockPicker); 2355 assertEquals(CONNECTING, getStats(channel).state); 2356 2357 AbstractSubchannel subchannel = 2358 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 2359 2360 assertEquals(IDLE, getStats(subchannel).state); 2361 subchannel.requestConnection(); 2362 assertEquals(CONNECTING, getStats(subchannel).state); 2363 2364 MockClientTransportInfo transportInfo = transports.poll(); 2365 2366 assertEquals(CONNECTING, getStats(subchannel).state); 2367 transportInfo.listener.transportReady(); 2368 assertEquals(READY, getStats(subchannel).state); 2369 2370 assertEquals(CONNECTING, getStats(channel).state); 2371 helper.updateBalancingState(READY, mockPicker); 2372 assertEquals(READY, getStats(channel).state); 2373 2374 channel.shutdownNow(); 2375 assertEquals(SHUTDOWN, getStats(channel).state); 2376 assertEquals(SHUTDOWN, getStats(subchannel).state); 2377 } 2378 2379 @Test channelStat_callStarted()2380 public void channelStat_callStarted() throws Exception { 2381 createChannel(); 2382 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 2383 assertEquals(0, getStats(channel).callsStarted); 2384 call.start(mockCallListener, new Metadata()); 2385 assertEquals(1, getStats(channel).callsStarted); 2386 assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos); 2387 } 2388 2389 @Test channelsAndSubChannels_instrumented_success()2390 public void channelsAndSubChannels_instrumented_success() throws Exception { 2391 channelsAndSubchannels_instrumented0(true); 2392 } 2393 2394 @Test channelsAndSubChannels_instrumented_fail()2395 public void channelsAndSubChannels_instrumented_fail() throws Exception { 2396 channelsAndSubchannels_instrumented0(false); 2397 } 2398 channelsAndSubchannels_instrumented0(boolean success)2399 private void channelsAndSubchannels_instrumented0(boolean success) throws Exception { 2400 createChannel(); 2401 2402 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 2403 2404 // Channel stat bumped when ClientCall.start() called 2405 assertEquals(0, getStats(channel).callsStarted); 2406 call.start(mockCallListener, new Metadata()); 2407 assertEquals(1, getStats(channel).callsStarted); 2408 2409 ClientStream mockStream = mock(ClientStream.class); 2410 ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class); 2411 AbstractSubchannel subchannel = 2412 (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); 2413 subchannel.requestConnection(); 2414 MockClientTransportInfo transportInfo = transports.poll(); 2415 transportInfo.listener.transportReady(); 2416 ClientTransport mockTransport = transportInfo.transport; 2417 when(mockTransport.newStream( 2418 any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) 2419 .thenReturn(mockStream); 2420 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( 2421 PickResult.withSubchannel(subchannel, factory)); 2422 2423 // subchannel stat bumped when call gets assigned to it 2424 assertEquals(0, getStats(subchannel).callsStarted); 2425 helper.updateBalancingState(READY, mockPicker); 2426 assertEquals(1, executor.runDueTasks()); 2427 verify(mockStream).start(streamListenerCaptor.capture()); 2428 assertEquals(1, getStats(subchannel).callsStarted); 2429 2430 ClientStreamListener streamListener = streamListenerCaptor.getValue(); 2431 call.halfClose(); 2432 2433 // closing stream listener affects subchannel stats immediately 2434 assertEquals(0, getStats(subchannel).callsSucceeded); 2435 assertEquals(0, getStats(subchannel).callsFailed); 2436 streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata()); 2437 if (success) { 2438 assertEquals(1, getStats(subchannel).callsSucceeded); 2439 assertEquals(0, getStats(subchannel).callsFailed); 2440 } else { 2441 assertEquals(0, getStats(subchannel).callsSucceeded); 2442 assertEquals(1, getStats(subchannel).callsFailed); 2443 } 2444 2445 // channel stats bumped when the ClientCall.Listener is notified 2446 assertEquals(0, getStats(channel).callsSucceeded); 2447 assertEquals(0, getStats(channel).callsFailed); 2448 executor.runDueTasks(); 2449 if (success) { 2450 assertEquals(1, getStats(channel).callsSucceeded); 2451 assertEquals(0, getStats(channel).callsFailed); 2452 } else { 2453 assertEquals(0, getStats(channel).callsSucceeded); 2454 assertEquals(1, getStats(channel).callsFailed); 2455 } 2456 } 2457 2458 @Test channelsAndSubchannels_oob_instrumented_success()2459 public void channelsAndSubchannels_oob_instrumented_success() throws Exception { 2460 channelsAndSubchannels_oob_instrumented0(true); 2461 } 2462 2463 @Test channelsAndSubchannels_oob_instrumented_fail()2464 public void channelsAndSubchannels_oob_instrumented_fail() throws Exception { 2465 channelsAndSubchannels_oob_instrumented0(false); 2466 } 2467 channelsAndSubchannels_oob_instrumented0(boolean success)2468 private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception { 2469 // set up 2470 ClientStream mockStream = mock(ClientStream.class); 2471 createChannel(); 2472 2473 OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority"); 2474 AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel(); 2475 FakeClock callExecutor = new FakeClock(); 2476 CallOptions options = 2477 CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); 2478 ClientCall<String, Integer> call = oobChannel.newCall(method, options); 2479 Metadata headers = new Metadata(); 2480 2481 // Channel stat bumped when ClientCall.start() called 2482 assertEquals(0, getStats(oobChannel).callsStarted); 2483 call.start(mockCallListener, headers); 2484 assertEquals(1, getStats(oobChannel).callsStarted); 2485 2486 MockClientTransportInfo transportInfo = transports.poll(); 2487 ConnectionClientTransport mockTransport = transportInfo.transport; 2488 ManagedClientTransport.Listener transportListener = transportInfo.listener; 2489 when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) 2490 .thenReturn(mockStream); 2491 2492 // subchannel stat bumped when call gets assigned to it 2493 assertEquals(0, getStats(oobSubchannel).callsStarted); 2494 transportListener.transportReady(); 2495 callExecutor.runDueTasks(); 2496 verify(mockStream).start(streamListenerCaptor.capture()); 2497 assertEquals(1, getStats(oobSubchannel).callsStarted); 2498 2499 ClientStreamListener streamListener = streamListenerCaptor.getValue(); 2500 call.halfClose(); 2501 2502 // closing stream listener affects subchannel stats immediately 2503 assertEquals(0, getStats(oobSubchannel).callsSucceeded); 2504 assertEquals(0, getStats(oobSubchannel).callsFailed); 2505 streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata()); 2506 if (success) { 2507 assertEquals(1, getStats(oobSubchannel).callsSucceeded); 2508 assertEquals(0, getStats(oobSubchannel).callsFailed); 2509 } else { 2510 assertEquals(0, getStats(oobSubchannel).callsSucceeded); 2511 assertEquals(1, getStats(oobSubchannel).callsFailed); 2512 } 2513 2514 // channel stats bumped when the ClientCall.Listener is notified 2515 assertEquals(0, getStats(oobChannel).callsSucceeded); 2516 assertEquals(0, getStats(oobChannel).callsFailed); 2517 callExecutor.runDueTasks(); 2518 if (success) { 2519 assertEquals(1, getStats(oobChannel).callsSucceeded); 2520 assertEquals(0, getStats(oobChannel).callsFailed); 2521 } else { 2522 assertEquals(0, getStats(oobChannel).callsSucceeded); 2523 assertEquals(1, getStats(oobChannel).callsFailed); 2524 } 2525 // oob channel is separate from the original channel 2526 assertEquals(0, getStats(channel).callsSucceeded); 2527 assertEquals(0, getStats(channel).callsFailed); 2528 } 2529 2530 @Test channelsAndSubchannels_oob_instrumented_name()2531 public void channelsAndSubchannels_oob_instrumented_name() throws Exception { 2532 createChannel(); 2533 2534 String authority = "oobauthority"; 2535 OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority); 2536 assertEquals(authority, getStats(oobChannel).target); 2537 } 2538 2539 @Test channelsAndSubchannels_oob_instrumented_state()2540 public void channelsAndSubchannels_oob_instrumented_state() throws Exception { 2541 createChannel(); 2542 2543 OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority"); 2544 assertEquals(IDLE, getStats(oobChannel).state); 2545 2546 oobChannel.getSubchannel().requestConnection(); 2547 assertEquals(CONNECTING, getStats(oobChannel).state); 2548 2549 MockClientTransportInfo transportInfo = transports.poll(); 2550 ManagedClientTransport.Listener transportListener = transportInfo.listener; 2551 2552 transportListener.transportReady(); 2553 assertEquals(READY, getStats(oobChannel).state); 2554 2555 // oobchannel state is separate from the ManagedChannel 2556 assertEquals(IDLE, getStats(channel).state); 2557 channel.shutdownNow(); 2558 assertEquals(SHUTDOWN, getStats(channel).state); 2559 assertEquals(SHUTDOWN, getStats(oobChannel).state); 2560 } 2561 2562 @Test binaryLogInstalled()2563 public void binaryLogInstalled() throws Exception { 2564 final SettableFuture<Boolean> intercepted = SettableFuture.create(); 2565 channelBuilder.binlog = new BinaryLog() { 2566 @Override 2567 public void close() throws IOException { 2568 // noop 2569 } 2570 2571 @Override 2572 public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition( 2573 ServerMethodDefinition<ReqT, RespT> oMethodDef) { 2574 return oMethodDef; 2575 } 2576 2577 @Override 2578 public Channel wrapChannel(Channel channel) { 2579 return ClientInterceptors.intercept(channel, 2580 new ClientInterceptor() { 2581 @Override 2582 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 2583 MethodDescriptor<ReqT, RespT> method, 2584 CallOptions callOptions, 2585 Channel next) { 2586 intercepted.set(true); 2587 return next.newCall(method, callOptions); 2588 } 2589 }); 2590 } 2591 }; 2592 2593 createChannel(); 2594 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 2595 call.start(mockCallListener, new Metadata()); 2596 assertTrue(intercepted.get()); 2597 } 2598 2599 @Test retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail()2600 public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() { 2601 Map<String, Object> retryPolicy = new HashMap<String, Object>(); 2602 retryPolicy.put("maxAttempts", 3D); 2603 retryPolicy.put("initialBackoff", "10s"); 2604 retryPolicy.put("maxBackoff", "30s"); 2605 retryPolicy.put("backoffMultiplier", 2D); 2606 retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE")); 2607 Map<String, Object> methodConfig = new HashMap<String, Object>(); 2608 Map<String, Object> name = new HashMap<String, Object>(); 2609 name.put("service", "service"); 2610 methodConfig.put("name", Arrays.<Object>asList(name)); 2611 methodConfig.put("retryPolicy", retryPolicy); 2612 Map<String, Object> serviceConfig = new HashMap<String, Object>(); 2613 serviceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig)); 2614 Attributes attributesWithRetryPolicy = Attributes 2615 .newBuilder().set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); 2616 2617 FakeNameResolverFactory nameResolverFactory = 2618 new FakeNameResolverFactory.Builder(expectedUri) 2619 .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) 2620 .build(); 2621 nameResolverFactory.nextResolvedAttributes.set(attributesWithRetryPolicy); 2622 channelBuilder.nameResolverFactory(nameResolverFactory); 2623 channelBuilder.executor(MoreExecutors.directExecutor()); 2624 channelBuilder.enableRetry(); 2625 RetriableStream.setRandom( 2626 // not random 2627 new Random() { 2628 @Override 2629 public double nextDouble() { 2630 return 1D; // fake random 2631 } 2632 }); 2633 2634 requestConnection = false; 2635 createChannel(); 2636 2637 ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); 2638 call.start(mockCallListener, new Metadata()); 2639 ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); 2640 verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); 2641 helper = helperCaptor.getValue(); 2642 verify(mockLoadBalancer) 2643 .handleResolvedAddressGroups(nameResolverFactory.servers, attributesWithRetryPolicy); 2644 2645 // simulating request connection and then transport ready after resolved address 2646 Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); 2647 when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) 2648 .thenReturn(PickResult.withSubchannel(subchannel)); 2649 subchannel.requestConnection(); 2650 MockClientTransportInfo transportInfo = transports.poll(); 2651 ConnectionClientTransport mockTransport = transportInfo.transport; 2652 ClientStream mockStream = mock(ClientStream.class); 2653 ClientStream mockStream2 = mock(ClientStream.class); 2654 when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) 2655 .thenReturn(mockStream).thenReturn(mockStream2); 2656 transportInfo.listener.transportReady(); 2657 helper.updateBalancingState(READY, mockPicker); 2658 2659 ArgumentCaptor<ClientStreamListener> streamListenerCaptor = 2660 ArgumentCaptor.forClass(ClientStreamListener.class); 2661 verify(mockStream).start(streamListenerCaptor.capture()); 2662 assertThat(timer.getPendingTasks()).isEmpty(); 2663 2664 // trigger retry 2665 streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata()); 2666 2667 // in backoff 2668 timer.forwardTime(5, TimeUnit.SECONDS); 2669 assertThat(timer.getPendingTasks()).hasSize(1); 2670 verify(mockStream2, never()).start(any(ClientStreamListener.class)); 2671 2672 // shutdown during backoff period 2673 channel.shutdown(); 2674 2675 assertThat(timer.getPendingTasks()).hasSize(1); 2676 verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); 2677 2678 ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT); 2679 call2.start(mockCallListener2, new Metadata()); 2680 2681 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 2682 verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class)); 2683 assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); 2684 assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription()); 2685 2686 // backoff ends 2687 timer.forwardTime(5, TimeUnit.SECONDS); 2688 assertThat(timer.getPendingTasks()).isEmpty(); 2689 verify(mockStream2).start(streamListenerCaptor.capture()); 2690 verify(mockLoadBalancer, never()).shutdown(); 2691 assertFalse( 2692 "channel.isTerminated() is expected to be false but was true", 2693 channel.isTerminated()); 2694 2695 streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata()); 2696 verify(mockLoadBalancer).shutdown(); 2697 // simulating the shutdown of load balancer triggers the shutdown of subchannel 2698 subchannel.shutdown(); 2699 transportInfo.listener.transportTerminated(); // simulating transport terminated 2700 assertTrue( 2701 "channel.isTerminated() is expected to be true but was false", 2702 channel.isTerminated()); 2703 } 2704 2705 @Test badServiceConfigIsRecoverable()2706 public void badServiceConfigIsRecoverable() throws Exception { 2707 final List<EquivalentAddressGroup> addresses = 2708 ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {})); 2709 final class FakeNameResolver extends NameResolver { 2710 Listener listener; 2711 2712 @Override 2713 public String getServiceAuthority() { 2714 return "also fake"; 2715 } 2716 2717 @Override 2718 public void start(Listener listener) { 2719 this.listener = listener; 2720 listener.onAddresses(addresses, 2721 Attributes.newBuilder() 2722 .set( 2723 GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, 2724 ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom")) 2725 .build()); 2726 } 2727 2728 @Override 2729 public void shutdown() {} 2730 } 2731 2732 final class FakeNameResolverFactory extends NameResolver.Factory { 2733 FakeNameResolver resolver; 2734 2735 @Nullable 2736 @Override 2737 public NameResolver newNameResolver(URI targetUri, Attributes params) { 2738 return (resolver = new FakeNameResolver()); 2739 } 2740 2741 @Override 2742 public String getDefaultScheme() { 2743 return "fake"; 2744 } 2745 } 2746 2747 FakeNameResolverFactory factory = new FakeNameResolverFactory(); 2748 final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> { 2749 2750 CustomBuilder() { 2751 super(TARGET); 2752 this.executorPool = ManagedChannelImplTest.this.executorPool; 2753 this.channelz = ManagedChannelImplTest.this.channelz; 2754 } 2755 2756 @Override 2757 protected ClientTransportFactory buildTransportFactory() { 2758 return mockTransportFactory; 2759 } 2760 } 2761 2762 ManagedChannel mychannel = new CustomBuilder() 2763 .nameResolverFactory(factory) 2764 .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build(); 2765 2766 ClientCall<Void, Void> call1 = 2767 mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT); 2768 ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null); 2769 executor.runDueTasks(); 2770 try { 2771 future1.get(); 2772 Assert.fail(); 2773 } catch (ExecutionException e) { 2774 assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom"); 2775 } 2776 2777 // ok the service config is bad, let's fix it. 2778 2779 factory.resolver.listener.onAddresses(addresses, 2780 Attributes.newBuilder() 2781 .set( 2782 GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, 2783 ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin")) 2784 .build()); 2785 2786 ClientCall<Void, Void> call2 = mychannel.newCall( 2787 TestMethodDescriptors.voidMethod(), 2788 CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS)); 2789 ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null); 2790 2791 timer.forwardTime(1234, TimeUnit.SECONDS); 2792 2793 executor.runDueTasks(); 2794 try { 2795 future2.get(); 2796 Assert.fail(); 2797 } catch (ExecutionException e) { 2798 assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline"); 2799 } 2800 2801 mychannel.shutdownNow(); 2802 } 2803 2804 private static final class ChannelBuilder 2805 extends AbstractManagedChannelImplBuilder<ChannelBuilder> { 2806 ChannelBuilder()2807 ChannelBuilder() { 2808 super(TARGET); 2809 } 2810 buildTransportFactory()2811 @Override protected ClientTransportFactory buildTransportFactory() { 2812 throw new UnsupportedOperationException(); 2813 } 2814 getNameResolverParams()2815 @Override protected Attributes getNameResolverParams() { 2816 return NAME_RESOLVER_PARAMS; 2817 } 2818 } 2819 2820 private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { 2821 @Override get()2822 public BackoffPolicy get() { 2823 return new BackoffPolicy() { 2824 int multiplier = 1; 2825 2826 @Override 2827 public long nextBackoffNanos() { 2828 return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++; 2829 } 2830 }; 2831 } 2832 } 2833 2834 private static final class FakeNameResolverFactory extends NameResolver.Factory { 2835 final URI expectedUri; 2836 final List<EquivalentAddressGroup> servers; 2837 final boolean resolvedAtStart; 2838 final Status error; 2839 final ArrayList<FakeNameResolver> resolvers = new ArrayList<>(); 2840 // The Attributes argument of the next invocation of listener.onAddresses(servers, attrs) 2841 final AtomicReference<Attributes> nextResolvedAttributes = 2842 new AtomicReference<Attributes>(Attributes.EMPTY); 2843 FakeNameResolverFactory( URI expectedUri, List<EquivalentAddressGroup> servers, boolean resolvedAtStart, Status error)2844 FakeNameResolverFactory( 2845 URI expectedUri, 2846 List<EquivalentAddressGroup> servers, 2847 boolean resolvedAtStart, 2848 Status error) { 2849 this.expectedUri = expectedUri; 2850 this.servers = servers; 2851 this.resolvedAtStart = resolvedAtStart; 2852 this.error = error; 2853 } 2854 2855 @Override newNameResolver(final URI targetUri, Attributes params)2856 public NameResolver newNameResolver(final URI targetUri, Attributes params) { 2857 if (!expectedUri.equals(targetUri)) { 2858 return null; 2859 } 2860 assertSame(NAME_RESOLVER_PARAMS, params); 2861 FakeNameResolver resolver = new FakeNameResolver(error); 2862 resolvers.add(resolver); 2863 return resolver; 2864 } 2865 2866 @Override getDefaultScheme()2867 public String getDefaultScheme() { 2868 return "fake"; 2869 } 2870 allResolved()2871 void allResolved() { 2872 for (FakeNameResolver resolver : resolvers) { 2873 resolver.resolved(); 2874 } 2875 } 2876 2877 final class FakeNameResolver extends NameResolver { 2878 Listener listener; 2879 boolean shutdown; 2880 int refreshCalled; 2881 Status error; 2882 FakeNameResolver(Status error)2883 FakeNameResolver(Status error) { 2884 this.error = error; 2885 } 2886 getServiceAuthority()2887 @Override public String getServiceAuthority() { 2888 return expectedUri.getAuthority(); 2889 } 2890 start(final Listener listener)2891 @Override public void start(final Listener listener) { 2892 this.listener = listener; 2893 if (resolvedAtStart) { 2894 resolved(); 2895 } 2896 } 2897 refresh()2898 @Override public void refresh() { 2899 assertNotNull(listener); 2900 refreshCalled++; 2901 resolved(); 2902 } 2903 resolved()2904 void resolved() { 2905 if (error != null) { 2906 listener.onError(error); 2907 return; 2908 } 2909 listener.onAddresses(servers, nextResolvedAttributes.get()); 2910 } 2911 shutdown()2912 @Override public void shutdown() { 2913 shutdown = true; 2914 } 2915 } 2916 2917 static final class Builder { 2918 final URI expectedUri; 2919 List<EquivalentAddressGroup> servers = ImmutableList.<EquivalentAddressGroup>of(); 2920 boolean resolvedAtStart = true; 2921 Status error = null; 2922 Builder(URI expectedUri)2923 Builder(URI expectedUri) { 2924 this.expectedUri = expectedUri; 2925 } 2926 setServers(List<EquivalentAddressGroup> servers)2927 Builder setServers(List<EquivalentAddressGroup> servers) { 2928 this.servers = servers; 2929 return this; 2930 } 2931 setResolvedAtStart(boolean resolvedAtStart)2932 Builder setResolvedAtStart(boolean resolvedAtStart) { 2933 this.resolvedAtStart = resolvedAtStart; 2934 return this; 2935 } 2936 setError(Status error)2937 Builder setError(Status error) { 2938 this.error = error; 2939 return this; 2940 } 2941 build()2942 FakeNameResolverFactory build() { 2943 return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error); 2944 } 2945 } 2946 } 2947 getStats(AbstractSubchannel subchannel)2948 private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception { 2949 return subchannel.getInternalSubchannel().getStats().get(); 2950 } 2951 getStats( InternalInstrumented<ChannelStats> instrumented)2952 private static ChannelStats getStats( 2953 InternalInstrumented<ChannelStats> instrumented) throws Exception { 2954 return instrumented.getStats().get(); 2955 } 2956 getNameResolverRefresh()2957 private FakeClock.ScheduledTask getNameResolverRefresh() { 2958 return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null); 2959 } 2960 } 2961