• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.ConnectivityState.CONNECTING;
22 import static io.grpc.ConnectivityState.IDLE;
23 import static io.grpc.ConnectivityState.READY;
24 import static io.grpc.ConnectivityState.SHUTDOWN;
25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26 import static junit.framework.TestCase.assertNotSame;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotEquals;
30 import static org.junit.Assert.assertNotNull;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.Matchers.any;
35 import static org.mockito.Matchers.anyObject;
36 import static org.mockito.Matchers.eq;
37 import static org.mockito.Matchers.same;
38 import static org.mockito.Mockito.atLeast;
39 import static org.mockito.Mockito.doAnswer;
40 import static org.mockito.Mockito.doThrow;
41 import static org.mockito.Mockito.inOrder;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.never;
44 import static org.mockito.Mockito.times;
45 import static org.mockito.Mockito.verify;
46 import static org.mockito.Mockito.verifyNoMoreInteractions;
47 import static org.mockito.Mockito.verifyZeroInteractions;
48 import static org.mockito.Mockito.when;
49 
50 import com.google.common.base.Throwables;
51 import com.google.common.collect.ImmutableList;
52 import com.google.common.collect.ImmutableMap;
53 import com.google.common.collect.Iterables;
54 import com.google.common.util.concurrent.ListenableFuture;
55 import com.google.common.util.concurrent.MoreExecutors;
56 import com.google.common.util.concurrent.SettableFuture;
57 import io.grpc.Attributes;
58 import io.grpc.BinaryLog;
59 import io.grpc.CallCredentials;
60 import io.grpc.CallCredentials.MetadataApplier;
61 import io.grpc.CallOptions;
62 import io.grpc.Channel;
63 import io.grpc.ClientCall;
64 import io.grpc.ClientInterceptor;
65 import io.grpc.ClientInterceptors;
66 import io.grpc.ClientStreamTracer;
67 import io.grpc.ConnectivityState;
68 import io.grpc.ConnectivityStateInfo;
69 import io.grpc.Context;
70 import io.grpc.EquivalentAddressGroup;
71 import io.grpc.IntegerMarshaller;
72 import io.grpc.InternalChannelz;
73 import io.grpc.InternalChannelz.ChannelStats;
74 import io.grpc.InternalChannelz.ChannelTrace;
75 import io.grpc.InternalInstrumented;
76 import io.grpc.LoadBalancer;
77 import io.grpc.LoadBalancer.Helper;
78 import io.grpc.LoadBalancer.PickResult;
79 import io.grpc.LoadBalancer.PickSubchannelArgs;
80 import io.grpc.LoadBalancer.Subchannel;
81 import io.grpc.LoadBalancer.SubchannelPicker;
82 import io.grpc.ManagedChannel;
83 import io.grpc.Metadata;
84 import io.grpc.MethodDescriptor;
85 import io.grpc.MethodDescriptor.MethodType;
86 import io.grpc.NameResolver;
87 import io.grpc.SecurityLevel;
88 import io.grpc.ServerMethodDefinition;
89 import io.grpc.Status;
90 import io.grpc.StringMarshaller;
91 import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
92 import io.grpc.internal.TestUtils.MockClientTransportInfo;
93 import io.grpc.stub.ClientCalls;
94 import io.grpc.testing.TestMethodDescriptors;
95 import java.io.IOException;
96 import java.net.SocketAddress;
97 import java.net.URI;
98 import java.util.ArrayList;
99 import java.util.Arrays;
100 import java.util.Collections;
101 import java.util.HashMap;
102 import java.util.LinkedList;
103 import java.util.List;
104 import java.util.Map;
105 import java.util.Random;
106 import java.util.concurrent.BlockingQueue;
107 import java.util.concurrent.ExecutionException;
108 import java.util.concurrent.Executor;
109 import java.util.concurrent.TimeUnit;
110 import java.util.concurrent.atomic.AtomicBoolean;
111 import java.util.concurrent.atomic.AtomicLong;
112 import java.util.concurrent.atomic.AtomicReference;
113 import javax.annotation.Nullable;
114 import org.junit.After;
115 import org.junit.Assert;
116 import org.junit.Assume;
117 import org.junit.Before;
118 import org.junit.Rule;
119 import org.junit.Test;
120 import org.junit.rules.ExpectedException;
121 import org.junit.runner.RunWith;
122 import org.junit.runners.JUnit4;
123 import org.mockito.ArgumentCaptor;
124 import org.mockito.Captor;
125 import org.mockito.InOrder;
126 import org.mockito.Matchers;
127 import org.mockito.Mock;
128 import org.mockito.MockitoAnnotations;
129 import org.mockito.invocation.InvocationOnMock;
130 import org.mockito.stubbing.Answer;
131 
132 /** Unit tests for {@link ManagedChannelImpl}. */
133 @RunWith(JUnit4.class)
134 public class ManagedChannelImplTest {
135   private static final Attributes NAME_RESOLVER_PARAMS =
136       Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build();
137 
138   private static final MethodDescriptor<String, Integer> method =
139       MethodDescriptor.<String, Integer>newBuilder()
140           .setType(MethodType.UNKNOWN)
141           .setFullMethodName("service/method")
142           .setRequestMarshaller(new StringMarshaller())
143           .setResponseMarshaller(new IntegerMarshaller())
144           .build();
145   private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
146       Attributes.Key.create("subchannel-attr-key");
147   private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10;
148   private static final String SERVICE_NAME = "fake.example.com";
149   private static final String AUTHORITY = SERVICE_NAME;
150   private static final String USER_AGENT = "userAgent";
151   private static final ClientTransportOptions clientTransportOptions =
152       new ClientTransportOptions()
153           .setAuthority(AUTHORITY)
154           .setUserAgent(USER_AGENT);
155   private static final String TARGET = "fake://" + SERVICE_NAME;
156   private URI expectedUri;
157   private final SocketAddress socketAddress = new SocketAddress() {};
158   private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
159   private final FakeClock timer = new FakeClock();
160   private final FakeClock executor = new FakeClock();
161   private final FakeClock oobExecutor = new FakeClock();
162   private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
163       new FakeClock.TaskFilter() {
164         @Override
165         public boolean shouldAccept(Runnable command) {
166           return command instanceof ManagedChannelImpl.NameResolverRefresh;
167         }
168       };
169 
170   private final InternalChannelz channelz = new InternalChannelz();
171 
172   @Rule public final ExpectedException thrown = ExpectedException.none();
173 
174   private ManagedChannelImpl channel;
175   private Helper helper;
176   @Captor
177   private ArgumentCaptor<Status> statusCaptor;
178   @Captor
179   private ArgumentCaptor<CallOptions> callOptionsCaptor;
180   @Mock
181   private LoadBalancer.Factory mockLoadBalancerFactory;
182   @Mock
183   private LoadBalancer mockLoadBalancer;
184 
185   @Captor
186   private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor;
187   @Mock
188   private SubchannelPicker mockPicker;
189   @Mock
190   private ClientTransportFactory mockTransportFactory;
191   @Mock
192   private ClientCall.Listener<Integer> mockCallListener;
193   @Mock
194   private ClientCall.Listener<Integer> mockCallListener2;
195   @Mock
196   private ClientCall.Listener<Integer> mockCallListener3;
197   @Mock
198   private ClientCall.Listener<Integer> mockCallListener4;
199   @Mock
200   private ClientCall.Listener<Integer> mockCallListener5;
201   @Mock
202   private ObjectPool<Executor> executorPool;
203   @Mock
204   private ObjectPool<Executor> oobExecutorPool;
205   @Mock
206   private CallCredentials creds;
207   private ChannelBuilder channelBuilder;
208   private boolean requestConnection = true;
209   private BlockingQueue<MockClientTransportInfo> transports;
210 
211   private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
212       ArgumentCaptor.forClass(ClientStreamListener.class);
213 
createChannel(ClientInterceptor... interceptors)214   private void createChannel(ClientInterceptor... interceptors) {
215     checkState(channel == null);
216     TimeProvider fakeClockTimeProvider = new TimeProvider() {
217       @Override
218       public long currentTimeNanos() {
219         return timer.getTicker().read();
220       }
221     };
222 
223     channel = new ManagedChannelImpl(
224         channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
225         oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors),
226         fakeClockTimeProvider);
227 
228     if (requestConnection) {
229       int numExpectedTasks = 0;
230 
231       // Force-exit the initial idle-mode
232       channel.exitIdleMode();
233       if (channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) {
234         numExpectedTasks += 1;
235       }
236 
237       if (getNameResolverRefresh() != null) {
238         numExpectedTasks += 1;
239       }
240 
241       assertEquals(numExpectedTasks, timer.numPendingTasks());
242 
243       ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
244       verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
245       helper = helperCaptor.getValue();
246     }
247   }
248 
249   @Before
setUp()250   public void setUp() throws Exception {
251     MockitoAnnotations.initMocks(this);
252     expectedUri = new URI(TARGET);
253     when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
254     transports = TestUtils.captureTransports(mockTransportFactory);
255     when(mockTransportFactory.getScheduledExecutorService())
256         .thenReturn(timer.getScheduledExecutorService());
257     when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
258     when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
259 
260     channelBuilder = new ChannelBuilder()
261         .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build())
262         .loadBalancerFactory(mockLoadBalancerFactory)
263         .userAgent(USER_AGENT)
264         .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS);
265     channelBuilder.executorPool = executorPool;
266     channelBuilder.binlog = null;
267     channelBuilder.channelz = channelz;
268   }
269 
270   @After
allPendingTasksAreRun()271   public void allPendingTasksAreRun() throws Exception {
272     // The "never" verifications in the tests only hold up if all due tasks are done.
273     // As for timer, although there may be scheduled tasks in a future time, since we don't test
274     // any time-related behavior in this test suite, we only care the tasks that are due. This
275     // would ignore any time-sensitive tasks, e.g., back-off and the idle timer.
276     assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
277     assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
278     if (channel != null) {
279       channel.shutdownNow();
280       channel = null;
281     }
282   }
283 
284   @Test
285   @SuppressWarnings("unchecked")
idleModeDisabled()286   public void idleModeDisabled() {
287     channelBuilder.nameResolverFactory(
288         new FakeNameResolverFactory.Builder(expectedUri)
289             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
290             .build());
291     createChannel();
292 
293     // In this test suite, the channel is always created with idle mode disabled.
294     // No task is scheduled to enter idle mode
295     assertEquals(0, timer.numPendingTasks());
296     assertEquals(0, executor.numPendingTasks());
297   }
298 
299   @Test
immediateDeadlineExceeded()300   public void immediateDeadlineExceeded() {
301     createChannel();
302     ClientCall<String, Integer> call =
303         channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
304     call.start(mockCallListener, new Metadata());
305     assertEquals(1, executor.runDueTasks());
306 
307     verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
308     Status status = statusCaptor.getValue();
309     assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
310   }
311 
312   @Test
shutdownWithNoTransportsEverCreated()313   public void shutdownWithNoTransportsEverCreated() {
314     channelBuilder.nameResolverFactory(
315         new FakeNameResolverFactory.Builder(expectedUri)
316             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
317             .build());
318     createChannel();
319     verify(executorPool).getObject();
320     verify(executorPool, never()).returnObject(anyObject());
321     verify(mockTransportFactory).getScheduledExecutorService();
322     verifyNoMoreInteractions(mockTransportFactory);
323     channel.shutdown();
324     assertTrue(channel.isShutdown());
325     assertTrue(channel.isTerminated());
326     verify(executorPool).returnObject(executor.getScheduledExecutorService());
327   }
328 
329   @Test
channelzMembership()330   public void channelzMembership() throws Exception {
331     createChannel();
332     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
333     assertFalse(channelz.containsSubchannel(channel.getLogId()));
334     channel.shutdownNow();
335     channel.awaitTermination(5, TimeUnit.SECONDS);
336     assertNull(channelz.getRootChannel(channel.getLogId().getId()));
337     assertFalse(channelz.containsSubchannel(channel.getLogId()));
338   }
339 
340   @Test
channelzMembership_subchannel()341   public void channelzMembership_subchannel() throws Exception {
342     createChannel();
343     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
344 
345     AbstractSubchannel subchannel =
346         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
347     // subchannels are not root channels
348     assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId()));
349     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
350     assertThat(getStats(channel).subchannels)
351         .containsExactly(subchannel.getInternalSubchannel());
352 
353     subchannel.requestConnection();
354     MockClientTransportInfo transportInfo = transports.poll();
355     assertNotNull(transportInfo);
356     assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
357 
358     // terminate transport
359     transportInfo.listener.transportTerminated();
360     assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
361 
362     // terminate subchannel
363     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
364     subchannel.shutdown();
365     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
366     timer.runDueTasks();
367     assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
368     assertThat(getStats(channel).subchannels).isEmpty();
369 
370     // channel still appears
371     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
372   }
373 
374   @Test
channelzMembership_oob()375   public void channelzMembership_oob() throws Exception {
376     createChannel();
377     OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY);
378     // oob channels are not root channels
379     assertNull(channelz.getRootChannel(oob.getLogId().getId()));
380     assertTrue(channelz.containsSubchannel(oob.getLogId()));
381     assertThat(getStats(channel).subchannels).containsExactly(oob);
382     assertTrue(channelz.containsSubchannel(oob.getLogId()));
383 
384     AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel();
385     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
386     assertThat(getStats(oob).subchannels)
387         .containsExactly(subchannel.getInternalSubchannel());
388     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
389 
390     oob.getSubchannel().requestConnection();
391     MockClientTransportInfo transportInfo = transports.poll();
392     assertNotNull(transportInfo);
393     assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
394 
395     // terminate transport
396     transportInfo.listener.transportTerminated();
397     assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
398 
399     // terminate oobchannel
400     oob.shutdown();
401     assertFalse(channelz.containsSubchannel(oob.getLogId()));
402     assertThat(getStats(channel).subchannels).isEmpty();
403     assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
404 
405     // channel still appears
406     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
407   }
408 
409   @Test
callsAndShutdown()410   public void callsAndShutdown() {
411     subtestCallsAndShutdown(false, false);
412   }
413 
414   @Test
callsAndShutdownNow()415   public void callsAndShutdownNow() {
416     subtestCallsAndShutdown(true, false);
417   }
418 
419   /** Make sure shutdownNow() after shutdown() has an effect. */
420   @Test
callsAndShutdownAndShutdownNow()421   public void callsAndShutdownAndShutdownNow() {
422     subtestCallsAndShutdown(false, true);
423   }
424 
subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown)425   private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
426     FakeNameResolverFactory nameResolverFactory =
427         new FakeNameResolverFactory.Builder(expectedUri).build();
428     channelBuilder.nameResolverFactory(nameResolverFactory);
429     createChannel();
430     verify(executorPool).getObject();
431     ClientStream mockStream = mock(ClientStream.class);
432     ClientStream mockStream2 = mock(ClientStream.class);
433     Metadata headers = new Metadata();
434     Metadata headers2 = new Metadata();
435 
436     // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
437     // real transport.
438     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
439     subchannel.requestConnection();
440     verify(mockTransportFactory)
441         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
442     MockClientTransportInfo transportInfo = transports.poll();
443     ConnectionClientTransport mockTransport = transportInfo.transport;
444     verify(mockTransport).start(any(ManagedClientTransport.Listener.class));
445     ManagedClientTransport.Listener transportListener = transportInfo.listener;
446     when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT)))
447         .thenReturn(mockStream);
448     when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT)))
449         .thenReturn(mockStream2);
450     transportListener.transportReady();
451     when(mockPicker.pickSubchannel(
452         new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn(
453         PickResult.withNoResult());
454     when(mockPicker.pickSubchannel(
455         new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn(
456         PickResult.withSubchannel(subchannel));
457     helper.updateBalancingState(READY, mockPicker);
458 
459     // First RPC, will be pending
460     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
461     verify(mockTransportFactory)
462         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
463     call.start(mockCallListener, headers);
464 
465     verify(mockTransport, never())
466         .newStream(same(method), same(headers), same(CallOptions.DEFAULT));
467 
468     // Second RPC, will be assigned to the real transport
469     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
470     call2.start(mockCallListener2, headers2);
471     verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
472     verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
473     verify(mockStream2).start(any(ClientStreamListener.class));
474 
475     // Shutdown
476     if (shutdownNow) {
477       channel.shutdownNow();
478     } else {
479       channel.shutdown();
480       if (shutdownNowAfterShutdown) {
481         channel.shutdownNow();
482         shutdownNow = true;
483       }
484     }
485     assertTrue(channel.isShutdown());
486     assertFalse(channel.isTerminated());
487     assertEquals(1, nameResolverFactory.resolvers.size());
488     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
489 
490     // Further calls should fail without going to the transport
491     ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
492     call3.start(mockCallListener3, headers2);
493     timer.runDueTasks();
494     executor.runDueTasks();
495 
496     verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
497     assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
498 
499     if (shutdownNow) {
500       // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated.
501       verify(mockLoadBalancer).shutdown();
502       assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
503       // call should have been aborted by delayed transport
504       executor.runDueTasks();
505       verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS),
506           any(Metadata.class));
507     } else {
508       // LoadBalancer and NameResolver are still running.
509       verify(mockLoadBalancer, never()).shutdown();
510       assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
511       // call and call2 are still alive, and can still be assigned to a real transport
512       SubchannelPicker picker2 = mock(SubchannelPicker.class);
513       when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)))
514           .thenReturn(PickResult.withSubchannel(subchannel));
515       helper.updateBalancingState(READY, picker2);
516       executor.runDueTasks();
517       verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT));
518       verify(mockStream).start(any(ClientStreamListener.class));
519     }
520 
521     // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports
522     // will be shutdown.
523     verify(mockLoadBalancer).shutdown();
524     assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
525 
526     if (shutdownNow) {
527       // Channel shutdownNow() all subchannels after shutting down LoadBalancer
528       verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS);
529     } else {
530       verify(mockTransport, never()).shutdownNow(any(Status.class));
531     }
532     // LoadBalancer should shutdown the subchannel
533     subchannel.shutdown();
534     if (shutdownNow) {
535       verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS));
536     } else {
537       verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
538     }
539 
540     // Killing the remaining real transport will terminate the channel
541     transportListener.transportShutdown(Status.UNAVAILABLE);
542     assertFalse(channel.isTerminated());
543     verify(executorPool, never()).returnObject(anyObject());
544     transportListener.transportTerminated();
545     assertTrue(channel.isTerminated());
546     verify(executorPool).returnObject(executor.getScheduledExecutorService());
547     verifyNoMoreInteractions(oobExecutorPool);
548 
549     verify(mockTransportFactory)
550         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
551     verify(mockTransportFactory).close();
552     verify(mockTransport, atLeast(0)).getLogId();
553     verifyNoMoreInteractions(mockTransport);
554   }
555 
556   @Test
noMoreCallbackAfterLoadBalancerShutdown()557   public void noMoreCallbackAfterLoadBalancerShutdown() {
558     FakeNameResolverFactory nameResolverFactory =
559         new FakeNameResolverFactory.Builder(expectedUri)
560             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
561             .build();
562     channelBuilder.nameResolverFactory(nameResolverFactory);
563     Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
564     createChannel();
565 
566     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
567     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
568     verify(mockLoadBalancer).handleResolvedAddressGroups(
569         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
570 
571     Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
572     Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
573     subchannel1.requestConnection();
574     subchannel2.requestConnection();
575     verify(mockTransportFactory, times(2))
576         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
577     MockClientTransportInfo transportInfo1 = transports.poll();
578     MockClientTransportInfo transportInfo2 = transports.poll();
579 
580     // LoadBalancer receives all sorts of callbacks
581     transportInfo1.listener.transportReady();
582     verify(mockLoadBalancer, times(2))
583         .handleSubchannelState(same(subchannel1), stateInfoCaptor.capture());
584     assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState());
585     assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState());
586 
587     verify(mockLoadBalancer)
588         .handleSubchannelState(same(subchannel2), stateInfoCaptor.capture());
589     assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
590 
591     resolver.listener.onError(resolutionError);
592     verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
593 
594     verifyNoMoreInteractions(mockLoadBalancer);
595 
596     channel.shutdown();
597     verify(mockLoadBalancer).shutdown();
598 
599     // No more callback should be delivered to LoadBalancer after it's shut down
600     transportInfo2.listener.transportReady();
601     resolver.listener.onError(resolutionError);
602     resolver.resolved();
603     verifyNoMoreInteractions(mockLoadBalancer);
604   }
605 
606   @Test
interceptor()607   public void interceptor() throws Exception {
608     final AtomicLong atomic = new AtomicLong();
609     ClientInterceptor interceptor = new ClientInterceptor() {
610       @Override
611       public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
612           MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions,
613           Channel next) {
614         atomic.set(1);
615         return next.newCall(method, callOptions);
616       }
617     };
618     createChannel(interceptor);
619     assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
620     assertEquals(1, atomic.get());
621   }
622 
623   @Test
callOptionsExecutor()624   public void callOptionsExecutor() {
625     Metadata headers = new Metadata();
626     ClientStream mockStream = mock(ClientStream.class);
627     FakeClock callExecutor = new FakeClock();
628     createChannel();
629 
630     // Start a call with a call executor
631     CallOptions options =
632         CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
633     ClientCall<String, Integer> call = channel.newCall(method, options);
634     call.start(mockCallListener, headers);
635 
636     // Make the transport available
637     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
638     verify(mockTransportFactory, never())
639         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
640     subchannel.requestConnection();
641     verify(mockTransportFactory)
642         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
643     MockClientTransportInfo transportInfo = transports.poll();
644     ConnectionClientTransport mockTransport = transportInfo.transport;
645     ManagedClientTransport.Listener transportListener = transportInfo.listener;
646     when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
647         .thenReturn(mockStream);
648     transportListener.transportReady();
649     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
650         .thenReturn(PickResult.withSubchannel(subchannel));
651     assertEquals(0, callExecutor.numPendingTasks());
652     helper.updateBalancingState(READY, mockPicker);
653 
654     // Real streams are started in the call executor if they were previously buffered.
655     assertEquals(1, callExecutor.runDueTasks());
656     verify(mockTransport).newStream(same(method), same(headers), same(options));
657     verify(mockStream).start(streamListenerCaptor.capture());
658 
659     // Call listener callbacks are also run in the call executor
660     ClientStreamListener streamListener = streamListenerCaptor.getValue();
661     Metadata trailers = new Metadata();
662     assertEquals(0, callExecutor.numPendingTasks());
663     streamListener.closed(Status.CANCELLED, trailers);
664     verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
665     assertEquals(1, callExecutor.runDueTasks());
666     verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
667 
668 
669     transportListener.transportShutdown(Status.UNAVAILABLE);
670     transportListener.transportTerminated();
671 
672     // Clean up as much as possible to allow the channel to terminate.
673     subchannel.shutdown();
674     timer.forwardNanos(
675         TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
676   }
677 
678   @Test
nameResolutionFailed()679   public void nameResolutionFailed() {
680     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
681     FakeNameResolverFactory nameResolverFactory =
682         new FakeNameResolverFactory.Builder(expectedUri)
683             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
684             .setError(error)
685             .build();
686     channelBuilder.nameResolverFactory(nameResolverFactory);
687     // Name resolution is started as soon as channel is created.
688     createChannel();
689     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
690     verify(mockLoadBalancer).handleNameResolutionError(same(error));
691     assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
692 
693     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
694     assertEquals(0, resolver.refreshCalled);
695 
696     timer.forwardNanos(1);
697     assertEquals(1, resolver.refreshCalled);
698     verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error));
699 
700     // Verify an additional name resolution failure does not schedule another timer
701     resolver.refresh();
702     verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error));
703     assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
704 
705     // Allow the next refresh attempt to succeed
706     resolver.error = null;
707 
708     // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2
709     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1);
710     assertEquals(2, resolver.refreshCalled);
711     timer.forwardNanos(1);
712     assertEquals(3, resolver.refreshCalled);
713     assertEquals(0, timer.numPendingTasks());
714 
715     // Verify that the successful resolution reset the backoff policy
716     resolver.listener.onError(error);
717     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
718     assertEquals(3, resolver.refreshCalled);
719     timer.forwardNanos(1);
720     assertEquals(4, resolver.refreshCalled);
721     assertEquals(0, timer.numPendingTasks());
722   }
723 
724   @Test
nameResolutionFailed_delayedTransportShutdownCancelsBackoff()725   public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
726     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
727 
728     FakeNameResolverFactory nameResolverFactory =
729         new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
730     channelBuilder.nameResolverFactory(nameResolverFactory);
731     // Name resolution is started as soon as channel is created.
732     createChannel();
733     verify(mockLoadBalancer).handleNameResolutionError(same(error));
734 
735     FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
736     assertNotNull(nameResolverBackoff);
737     assertFalse(nameResolverBackoff.isCancelled());
738 
739     // Add a pending call to the delayed transport
740     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
741     Metadata headers = new Metadata();
742     call.start(mockCallListener, headers);
743 
744     // The pending call on the delayed transport stops the name resolver backoff from cancelling
745     channel.shutdown();
746     assertFalse(nameResolverBackoff.isCancelled());
747 
748     // Notify that a subchannel is ready, which drains the delayed transport
749     SubchannelPicker picker = mock(SubchannelPicker.class);
750     Status status = Status.UNAVAILABLE.withDescription("for test");
751     when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
752         .thenReturn(PickResult.withDrop(status));
753     helper.updateBalancingState(READY, picker);
754     executor.runDueTasks();
755     verify(mockCallListener).onClose(same(status), any(Metadata.class));
756 
757     assertTrue(nameResolverBackoff.isCancelled());
758   }
759 
760   @Test
nameResolverReturnsEmptySubLists()761   public void nameResolverReturnsEmptySubLists() {
762     String errorDescription = "NameResolver returned an empty list";
763 
764     // Pass a FakeNameResolverFactory with an empty list
765     createChannel();
766 
767     // LoadBalancer received the error
768     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
769     verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
770     Status status = statusCaptor.getValue();
771     assertSame(Status.Code.UNAVAILABLE, status.getCode());
772     assertEquals(errorDescription, status.getDescription());
773   }
774 
775   @Test
loadBalancerThrowsInHandleResolvedAddresses()776   public void loadBalancerThrowsInHandleResolvedAddresses() {
777     RuntimeException ex = new RuntimeException("simulated");
778     // Delay the success of name resolution until allResolved() is called
779     FakeNameResolverFactory nameResolverFactory =
780         new FakeNameResolverFactory.Builder(expectedUri)
781             .setResolvedAtStart(false)
782             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
783             .build();
784     channelBuilder.nameResolverFactory(nameResolverFactory);
785     createChannel();
786 
787     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
788     doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups(
789         Matchers.<List<EquivalentAddressGroup>>anyObject(), any(Attributes.class));
790 
791     // NameResolver returns addresses.
792     nameResolverFactory.allResolved();
793 
794     // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode.
795     verifyPanicMode(ex);
796   }
797 
798   @Test
nameResolvedAfterChannelShutdown()799   public void nameResolvedAfterChannelShutdown() {
800     // Delay the success of name resolution until allResolved() is called.
801     FakeNameResolverFactory nameResolverFactory =
802         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
803     channelBuilder.nameResolverFactory(nameResolverFactory);
804     createChannel();
805 
806     channel.shutdown();
807 
808     assertTrue(channel.isShutdown());
809     assertTrue(channel.isTerminated());
810     verify(mockLoadBalancer).shutdown();
811     // Name resolved after the channel is shut down, which is possible if the name resolution takes
812     // time and is not cancellable. The resolved address will be dropped.
813     nameResolverFactory.allResolved();
814     verifyNoMoreInteractions(mockLoadBalancer);
815   }
816 
817   /**
818    * Verify that if the first resolved address points to a server that cannot be connected, the call
819    * will end up with the second address which works.
820    */
821   @Test
firstResolvedServerFailedToConnect()822   public void firstResolvedServerFailedToConnect() throws Exception {
823     final SocketAddress goodAddress = new SocketAddress() {
824         @Override public String toString() {
825           return "goodAddress";
826         }
827       };
828     final SocketAddress badAddress = new SocketAddress() {
829         @Override public String toString() {
830           return "badAddress";
831         }
832       };
833     InOrder inOrder = inOrder(mockLoadBalancer);
834 
835     List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress);
836     FakeNameResolverFactory nameResolverFactory =
837         new FakeNameResolverFactory.Builder(expectedUri)
838             .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
839             .build();
840     channelBuilder.nameResolverFactory(nameResolverFactory);
841     createChannel();
842 
843     // Start the call
844     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
845     Metadata headers = new Metadata();
846     call.start(mockCallListener, headers);
847     executor.runDueTasks();
848 
849     // Simulate name resolution results
850     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
851     inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
852         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
853     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
854     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
855         .thenReturn(PickResult.withSubchannel(subchannel));
856     subchannel.requestConnection();
857     inOrder.verify(mockLoadBalancer).handleSubchannelState(
858         same(subchannel), stateInfoCaptor.capture());
859     assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
860 
861     // The channel will starts with the first address (badAddress)
862     verify(mockTransportFactory)
863         .newClientTransport(same(badAddress), any(ClientTransportOptions.class));
864     verify(mockTransportFactory, times(0))
865         .newClientTransport(same(goodAddress), any(ClientTransportOptions.class));
866 
867     MockClientTransportInfo badTransportInfo = transports.poll();
868     // Which failed to connect
869     badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE);
870     inOrder.verifyNoMoreInteractions();
871 
872     // The channel then try the second address (goodAddress)
873     verify(mockTransportFactory)
874         .newClientTransport(same(goodAddress), any(ClientTransportOptions.class));
875     MockClientTransportInfo goodTransportInfo = transports.poll();
876     when(goodTransportInfo.transport.newStream(
877             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
878         .thenReturn(mock(ClientStream.class));
879 
880     goodTransportInfo.listener.transportReady();
881     inOrder.verify(mockLoadBalancer).handleSubchannelState(
882         same(subchannel), stateInfoCaptor.capture());
883     assertEquals(READY, stateInfoCaptor.getValue().getState());
884 
885     // A typical LoadBalancer will call this once the subchannel becomes READY
886     helper.updateBalancingState(READY, mockPicker);
887     // Delayed transport uses the app executor to create real streams.
888     executor.runDueTasks();
889 
890     verify(goodTransportInfo.transport).newStream(same(method), same(headers),
891         same(CallOptions.DEFAULT));
892     // The bad transport was never used.
893     verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class),
894         any(Metadata.class), any(CallOptions.class));
895   }
896 
897   @Test
failFastRpcFailFromErrorFromBalancer()898   public void failFastRpcFailFromErrorFromBalancer() {
899     subtestFailRpcFromBalancer(false, false, true);
900   }
901 
902   @Test
failFastRpcFailFromDropFromBalancer()903   public void failFastRpcFailFromDropFromBalancer() {
904     subtestFailRpcFromBalancer(false, true, true);
905   }
906 
907   @Test
waitForReadyRpcImmuneFromErrorFromBalancer()908   public void waitForReadyRpcImmuneFromErrorFromBalancer() {
909     subtestFailRpcFromBalancer(true, false, false);
910   }
911 
912   @Test
waitForReadyRpcFailFromDropFromBalancer()913   public void waitForReadyRpcFailFromDropFromBalancer() {
914     subtestFailRpcFromBalancer(true, true, true);
915   }
916 
subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail)917   private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
918     createChannel();
919 
920     // This call will be buffered by the channel, thus involve delayed transport
921     CallOptions callOptions = CallOptions.DEFAULT;
922     if (waitForReady) {
923       callOptions = callOptions.withWaitForReady();
924     } else {
925       callOptions = callOptions.withoutWaitForReady();
926     }
927     ClientCall<String, Integer> call1 = channel.newCall(method, callOptions);
928     call1.start(mockCallListener, new Metadata());
929 
930     SubchannelPicker picker = mock(SubchannelPicker.class);
931     Status status = Status.UNAVAILABLE.withDescription("for test");
932 
933     when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
934         .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status));
935     helper.updateBalancingState(READY, picker);
936 
937     executor.runDueTasks();
938     if (shouldFail) {
939       verify(mockCallListener).onClose(same(status), any(Metadata.class));
940     } else {
941       verifyZeroInteractions(mockCallListener);
942     }
943 
944     // This call doesn't involve delayed transport
945     ClientCall<String, Integer> call2 = channel.newCall(method, callOptions);
946     call2.start(mockCallListener2, new Metadata());
947 
948     executor.runDueTasks();
949     if (shouldFail) {
950       verify(mockCallListener2).onClose(same(status), any(Metadata.class));
951     } else {
952       verifyZeroInteractions(mockCallListener2);
953     }
954   }
955 
956   /**
957    * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a
958    * wait-for-ready call will still be buffered.
959    */
960   @Test
allServersFailedToConnect()961   public void allServersFailedToConnect() throws Exception {
962     final SocketAddress addr1 = new SocketAddress() {
963         @Override public String toString() {
964           return "addr1";
965         }
966       };
967     final SocketAddress addr2 = new SocketAddress() {
968         @Override public String toString() {
969           return "addr2";
970         }
971       };
972     InOrder inOrder = inOrder(mockLoadBalancer);
973 
974     List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2);
975 
976     FakeNameResolverFactory nameResolverFactory =
977         new FakeNameResolverFactory.Builder(expectedUri)
978             .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
979             .build();
980     channelBuilder.nameResolverFactory(nameResolverFactory);
981     createChannel();
982 
983     // Start a wait-for-ready call
984     ClientCall<String, Integer> call =
985         channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
986     Metadata headers = new Metadata();
987     call.start(mockCallListener, headers);
988     // ... and a fail-fast call
989     ClientCall<String, Integer> call2 =
990         channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
991     call2.start(mockCallListener2, headers);
992     executor.runDueTasks();
993 
994     // Simulate name resolution results
995     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
996     inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
997         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
998     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
999     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1000         .thenReturn(PickResult.withSubchannel(subchannel));
1001     subchannel.requestConnection();
1002 
1003     inOrder.verify(mockLoadBalancer).handleSubchannelState(
1004         same(subchannel), stateInfoCaptor.capture());
1005     assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
1006 
1007     // Connecting to server1, which will fail
1008     verify(mockTransportFactory)
1009         .newClientTransport(same(addr1), any(ClientTransportOptions.class));
1010     verify(mockTransportFactory, times(0))
1011         .newClientTransport(same(addr2), any(ClientTransportOptions.class));
1012     MockClientTransportInfo transportInfo1 = transports.poll();
1013     transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
1014 
1015     // Connecting to server2, which will fail too
1016     verify(mockTransportFactory)
1017         .newClientTransport(same(addr2), any(ClientTransportOptions.class));
1018     MockClientTransportInfo transportInfo2 = transports.poll();
1019     Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect");
1020     transportInfo2.listener.transportShutdown(server2Error);
1021 
1022     // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated
1023     // to LoadBalancer.
1024     inOrder.verify(mockLoadBalancer).handleSubchannelState(
1025         same(subchannel), stateInfoCaptor.capture());
1026     assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState());
1027     assertSame(server2Error, stateInfoCaptor.getValue().getStatus());
1028 
1029     // A typical LoadBalancer would create a picker with error
1030     SubchannelPicker picker2 = mock(SubchannelPicker.class);
1031     when(picker2.pickSubchannel(any(PickSubchannelArgs.class)))
1032         .thenReturn(PickResult.withError(server2Error));
1033     helper.updateBalancingState(TRANSIENT_FAILURE, picker2);
1034     executor.runDueTasks();
1035 
1036     // ... which fails the fail-fast call
1037     verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class));
1038     // ... while the wait-for-ready call stays
1039     verifyNoMoreInteractions(mockCallListener);
1040     // No real stream was ever created
1041     verify(transportInfo1.transport, times(0))
1042         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1043     verify(transportInfo2.transport, times(0))
1044         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1045   }
1046 
1047   @Test
subchannels()1048   public void subchannels() {
1049     createChannel();
1050 
1051     // createSubchannel() always return a new Subchannel
1052     Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
1053     Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build();
1054     Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1);
1055     Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2);
1056     assertNotSame(sub1, sub2);
1057     assertNotSame(attrs1, attrs2);
1058     assertSame(attrs1, sub1.getAttributes());
1059     assertSame(attrs2, sub2.getAttributes());
1060     assertSame(addressGroup, sub1.getAddresses());
1061     assertSame(addressGroup, sub2.getAddresses());
1062 
1063     // requestConnection()
1064     verify(mockTransportFactory, never())
1065         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1066     sub1.requestConnection();
1067     verify(mockTransportFactory).newClientTransport(socketAddress, clientTransportOptions);
1068     MockClientTransportInfo transportInfo1 = transports.poll();
1069     assertNotNull(transportInfo1);
1070 
1071     sub2.requestConnection();
1072     verify(mockTransportFactory, times(2))
1073         .newClientTransport(socketAddress, clientTransportOptions);
1074     MockClientTransportInfo transportInfo2 = transports.poll();
1075     assertNotNull(transportInfo2);
1076 
1077     sub1.requestConnection();
1078     sub2.requestConnection();
1079     verify(mockTransportFactory, times(2))
1080         .newClientTransport(socketAddress, clientTransportOptions);
1081 
1082     // shutdown() has a delay
1083     sub1.shutdown();
1084     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
1085     sub1.shutdown();
1086     verify(transportInfo1.transport, never()).shutdown(any(Status.class));
1087     timer.forwardTime(1, TimeUnit.SECONDS);
1088     verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS));
1089 
1090     // ... but not after Channel is terminating
1091     verify(mockLoadBalancer, never()).shutdown();
1092     channel.shutdown();
1093     verify(mockLoadBalancer).shutdown();
1094     verify(transportInfo2.transport, never()).shutdown(any(Status.class));
1095 
1096     sub2.shutdown();
1097     verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
1098 
1099     // Cleanup
1100     transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
1101     transportInfo1.listener.transportTerminated();
1102     transportInfo2.listener.transportShutdown(Status.UNAVAILABLE);
1103     transportInfo2.listener.transportTerminated();
1104     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
1105   }
1106 
1107   @Test
subchannelsWhenChannelShutdownNow()1108   public void subchannelsWhenChannelShutdownNow() {
1109     createChannel();
1110     Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1111     Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1112     sub1.requestConnection();
1113     sub2.requestConnection();
1114 
1115     assertEquals(2, transports.size());
1116     MockClientTransportInfo ti1 = transports.poll();
1117     MockClientTransportInfo ti2 = transports.poll();
1118 
1119     ti1.listener.transportReady();
1120     ti2.listener.transportReady();
1121 
1122     channel.shutdownNow();
1123     verify(ti1.transport).shutdownNow(any(Status.class));
1124     verify(ti2.transport).shutdownNow(any(Status.class));
1125 
1126     ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1127     ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1128     ti1.listener.transportTerminated();
1129 
1130     assertFalse(channel.isTerminated());
1131     ti2.listener.transportTerminated();
1132     assertTrue(channel.isTerminated());
1133   }
1134 
1135   @Test
subchannelsNoConnectionShutdown()1136   public void subchannelsNoConnectionShutdown() {
1137     createChannel();
1138     Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1139     Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1140 
1141     channel.shutdown();
1142     verify(mockLoadBalancer).shutdown();
1143     sub1.shutdown();
1144     assertFalse(channel.isTerminated());
1145     sub2.shutdown();
1146     assertTrue(channel.isTerminated());
1147     verify(mockTransportFactory, never())
1148         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1149   }
1150 
1151   @Test
subchannelsNoConnectionShutdownNow()1152   public void subchannelsNoConnectionShutdownNow() {
1153     createChannel();
1154     helper.createSubchannel(addressGroup, Attributes.EMPTY);
1155     helper.createSubchannel(addressGroup, Attributes.EMPTY);
1156     channel.shutdownNow();
1157 
1158     verify(mockLoadBalancer).shutdown();
1159     // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
1160     // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels.
1161     assertTrue(channel.isTerminated());
1162     verify(mockTransportFactory, never())
1163         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1164   }
1165 
1166   @Test
oobchannels()1167   public void oobchannels() {
1168     createChannel();
1169 
1170     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
1171     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
1172     verify(oobExecutorPool, times(2)).getObject();
1173 
1174     assertEquals("oob1authority", oob1.authority());
1175     assertEquals("oob2authority", oob2.authority());
1176 
1177     // OOB channels create connections lazily.  A new call will initiate the connection.
1178     Metadata headers = new Metadata();
1179     ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
1180     call.start(mockCallListener, headers);
1181     verify(mockTransportFactory)
1182         .newClientTransport(
1183             socketAddress,
1184             new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT));
1185     MockClientTransportInfo transportInfo = transports.poll();
1186     assertNotNull(transportInfo);
1187 
1188     assertEquals(0, oobExecutor.numPendingTasks());
1189     transportInfo.listener.transportReady();
1190     assertEquals(1, oobExecutor.runDueTasks());
1191     verify(transportInfo.transport).newStream(same(method), same(headers),
1192         same(CallOptions.DEFAULT));
1193 
1194     // The transport goes away
1195     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1196     transportInfo.listener.transportTerminated();
1197 
1198     // A new call will trigger a new transport
1199     ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT);
1200     call2.start(mockCallListener2, headers);
1201     ClientCall<String, Integer> call3 =
1202         oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady());
1203     call3.start(mockCallListener3, headers);
1204     verify(mockTransportFactory, times(2)).newClientTransport(
1205         socketAddress,
1206         new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT));
1207     transportInfo = transports.poll();
1208     assertNotNull(transportInfo);
1209 
1210     // This transport fails
1211     Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
1212     assertEquals(0, oobExecutor.numPendingTasks());
1213     transportInfo.listener.transportShutdown(transportError);
1214     assertTrue(oobExecutor.runDueTasks() > 0);
1215 
1216     // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
1217     verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
1218     verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
1219 
1220     // Shutdown
1221     assertFalse(oob1.isShutdown());
1222     assertFalse(oob2.isShutdown());
1223     oob1.shutdown();
1224     verify(oobExecutorPool, never()).returnObject(anyObject());
1225     oob2.shutdownNow();
1226     assertTrue(oob1.isShutdown());
1227     assertTrue(oob2.isShutdown());
1228     assertTrue(oob2.isTerminated());
1229     verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
1230 
1231     // New RPCs will be rejected.
1232     assertEquals(0, oobExecutor.numPendingTasks());
1233     ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
1234     ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
1235     call4.start(mockCallListener4, headers);
1236     call5.start(mockCallListener5, headers);
1237     assertTrue(oobExecutor.runDueTasks() > 0);
1238     verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
1239     Status status4 = statusCaptor.getValue();
1240     assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
1241     verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class));
1242     Status status5 = statusCaptor.getValue();
1243     assertEquals(Status.Code.UNAVAILABLE, status5.getCode());
1244 
1245     // The pending RPC will still be pending
1246     verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
1247 
1248     // This will shutdownNow() the delayed transport, terminating the pending RPC
1249     assertEquals(0, oobExecutor.numPendingTasks());
1250     oob1.shutdownNow();
1251     assertTrue(oobExecutor.runDueTasks() > 0);
1252     verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
1253 
1254     // Shut down the channel, and it will not terminated because OOB channel has not.
1255     channel.shutdown();
1256     assertFalse(channel.isTerminated());
1257     // Delayed transport has already terminated.  Terminating the transport terminates the
1258     // subchannel, which in turn terimates the OOB channel, which terminates the channel.
1259     assertFalse(oob1.isTerminated());
1260     verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
1261     transportInfo.listener.transportTerminated();
1262     assertTrue(oob1.isTerminated());
1263     assertTrue(channel.isTerminated());
1264     verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService());
1265   }
1266 
1267   @Test
oobChannelsWhenChannelShutdownNow()1268   public void oobChannelsWhenChannelShutdownNow() {
1269     createChannel();
1270     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
1271     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
1272 
1273     oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
1274     oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
1275 
1276     assertEquals(2, transports.size());
1277     MockClientTransportInfo ti1 = transports.poll();
1278     MockClientTransportInfo ti2 = transports.poll();
1279 
1280     ti1.listener.transportReady();
1281     ti2.listener.transportReady();
1282 
1283     channel.shutdownNow();
1284     verify(ti1.transport).shutdownNow(any(Status.class));
1285     verify(ti2.transport).shutdownNow(any(Status.class));
1286 
1287     ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1288     ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1289     ti1.listener.transportTerminated();
1290 
1291     assertFalse(channel.isTerminated());
1292     ti2.listener.transportTerminated();
1293     assertTrue(channel.isTerminated());
1294   }
1295 
1296   @Test
oobChannelsNoConnectionShutdown()1297   public void oobChannelsNoConnectionShutdown() {
1298     createChannel();
1299     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
1300     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
1301     channel.shutdown();
1302 
1303     verify(mockLoadBalancer).shutdown();
1304     oob1.shutdown();
1305     assertTrue(oob1.isTerminated());
1306     assertFalse(channel.isTerminated());
1307     oob2.shutdown();
1308     assertTrue(oob2.isTerminated());
1309     assertTrue(channel.isTerminated());
1310     verify(mockTransportFactory, never())
1311         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1312   }
1313 
1314   @Test
oobChannelsNoConnectionShutdownNow()1315   public void oobChannelsNoConnectionShutdownNow() {
1316     createChannel();
1317     helper.createOobChannel(addressGroup, "oob1Authority");
1318     helper.createOobChannel(addressGroup, "oob2Authority");
1319     channel.shutdownNow();
1320 
1321     verify(mockLoadBalancer).shutdown();
1322     assertTrue(channel.isTerminated());
1323     // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
1324     // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels.
1325     verify(mockTransportFactory, never())
1326         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1327   }
1328 
1329   @Test
refreshNameResolutionWhenSubchannelConnectionFailed()1330   public void refreshNameResolutionWhenSubchannelConnectionFailed() {
1331     subtestRefreshNameResolutionWhenConnectionFailed(false);
1332   }
1333 
1334   @Test
refreshNameResolutionWhenOobChannelConnectionFailed()1335   public void refreshNameResolutionWhenOobChannelConnectionFailed() {
1336     subtestRefreshNameResolutionWhenConnectionFailed(true);
1337   }
1338 
subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel)1339   private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) {
1340     FakeNameResolverFactory nameResolverFactory =
1341         new FakeNameResolverFactory.Builder(expectedUri)
1342             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
1343             .build();
1344     channelBuilder.nameResolverFactory(nameResolverFactory);
1345     createChannel();
1346     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
1347 
1348     if (isOobChannel) {
1349       OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority");
1350       oobChannel.getSubchannel().requestConnection();
1351     } else {
1352       Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1353       subchannel.requestConnection();
1354     }
1355 
1356     MockClientTransportInfo transportInfo = transports.poll();
1357     assertNotNull(transportInfo);
1358 
1359     // Transport closed when connecting
1360     assertEquals(0, resolver.refreshCalled);
1361     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1362     assertEquals(1, resolver.refreshCalled);
1363 
1364     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
1365     transportInfo = transports.poll();
1366     assertNotNull(transportInfo);
1367 
1368     transportInfo.listener.transportReady();
1369 
1370     // Transport closed when ready
1371     assertEquals(1, resolver.refreshCalled);
1372     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1373     assertEquals(2, resolver.refreshCalled);
1374   }
1375 
1376   @Test
uriPattern()1377   public void uriPattern() {
1378     assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches());
1379     assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches());
1380     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched
1381     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched
1382     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched
1383     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched
1384   }
1385 
1386   /**
1387    * Test that information such as the Call's context, MethodDescriptor, authority, executor are
1388    * propagated to newStream() and applyRequestMetadata().
1389    */
1390   @Test
informationPropagatedToNewStreamAndCallCredentials()1391   public void informationPropagatedToNewStreamAndCallCredentials() {
1392     createChannel();
1393     CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
1394     final Context.Key<String> testKey = Context.key("testing");
1395     Context ctx = Context.current().withValue(testKey, "testValue");
1396     final LinkedList<Context> credsApplyContexts = new LinkedList<Context>();
1397     final LinkedList<Context> newStreamContexts = new LinkedList<Context>();
1398     doAnswer(new Answer<Void>() {
1399         @Override
1400         public Void answer(InvocationOnMock in) throws Throwable {
1401           credsApplyContexts.add(Context.current());
1402           return null;
1403         }
1404       }).when(creds).applyRequestMetadata(
1405           any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1406           any(MetadataApplier.class));
1407 
1408     // First call will be on delayed transport.  Only newCall() is run within the expected context,
1409     // so that we can verify that the context is explicitly attached before calling newStream() and
1410     // applyRequestMetadata(), which happens after we detach the context from the thread.
1411     Context origCtx = ctx.attach();
1412     assertEquals("testValue", testKey.get());
1413     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1414     ctx.detach(origCtx);
1415     assertNull(testKey.get());
1416     call.start(mockCallListener, new Metadata());
1417 
1418     // Simulate name resolution results
1419     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
1420     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1421     subchannel.requestConnection();
1422     verify(mockTransportFactory)
1423         .newClientTransport(same(socketAddress), eq(clientTransportOptions));
1424     MockClientTransportInfo transportInfo = transports.poll();
1425     final ConnectionClientTransport transport = transportInfo.transport;
1426     when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
1427     doAnswer(new Answer<ClientStream>() {
1428         @Override
1429         public ClientStream answer(InvocationOnMock in) throws Throwable {
1430           newStreamContexts.add(Context.current());
1431           return mock(ClientStream.class);
1432         }
1433       }).when(transport).newStream(
1434           any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1435 
1436     verify(creds, never()).applyRequestMetadata(
1437         any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1438         any(MetadataApplier.class));
1439 
1440     // applyRequestMetadata() is called after the transport becomes ready.
1441     transportInfo.listener.transportReady();
1442     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1443         .thenReturn(PickResult.withSubchannel(subchannel));
1444     helper.updateBalancingState(READY, mockPicker);
1445     executor.runDueTasks();
1446     ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
1447     ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class);
1448     verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
1449         same(executor.getScheduledExecutorService()), applierCaptor.capture());
1450     assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1451     assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1452     assertEquals(SecurityLevel.NONE,
1453         attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1454     verify(transport, never()).newStream(
1455         any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1456 
1457     // newStream() is called after apply() is called
1458     applierCaptor.getValue().apply(new Metadata());
1459     verify(transport).newStream(same(method), any(Metadata.class), same(callOptions));
1460     assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1461     // The context should not live beyond the scope of newStream() and applyRequestMetadata()
1462     assertNull(testKey.get());
1463 
1464 
1465     // Second call will not be on delayed transport
1466     origCtx = ctx.attach();
1467     call = channel.newCall(method, callOptions);
1468     ctx.detach(origCtx);
1469     call.start(mockCallListener, new Metadata());
1470 
1471     verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(),
1472         same(executor.getScheduledExecutorService()), applierCaptor.capture());
1473     assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1474     assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1475     assertEquals(SecurityLevel.NONE,
1476         attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1477     // This is from the first call
1478     verify(transport).newStream(
1479         any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1480 
1481     // Still, newStream() is called after apply() is called
1482     applierCaptor.getValue().apply(new Metadata());
1483     verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions));
1484     assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1485 
1486     assertNull(testKey.get());
1487   }
1488 
1489   @Test
pickerReturnsStreamTracer_noDelay()1490   public void pickerReturnsStreamTracer_noDelay() {
1491     ClientStream mockStream = mock(ClientStream.class);
1492     ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
1493     ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
1494     createChannel();
1495     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1496     subchannel.requestConnection();
1497     MockClientTransportInfo transportInfo = transports.poll();
1498     transportInfo.listener.transportReady();
1499     ClientTransport mockTransport = transportInfo.transport;
1500     when(mockTransport.newStream(
1501             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
1502         .thenReturn(mockStream);
1503 
1504     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
1505         PickResult.withSubchannel(subchannel, factory2));
1506     helper.updateBalancingState(READY, mockPicker);
1507 
1508     CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
1509     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1510     call.start(mockCallListener, new Metadata());
1511 
1512     verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
1513     verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
1514     assertEquals(
1515         Arrays.asList(factory1, factory2),
1516         callOptionsCaptor.getValue().getStreamTracerFactories());
1517     // The factories are safely not stubbed because we do not expect any usage of them.
1518     verifyZeroInteractions(factory1);
1519     verifyZeroInteractions(factory2);
1520   }
1521 
1522   @Test
pickerReturnsStreamTracer_delayed()1523   public void pickerReturnsStreamTracer_delayed() {
1524     ClientStream mockStream = mock(ClientStream.class);
1525     ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
1526     ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
1527     createChannel();
1528 
1529     CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
1530     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1531     call.start(mockCallListener, new Metadata());
1532 
1533     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1534     subchannel.requestConnection();
1535     MockClientTransportInfo transportInfo = transports.poll();
1536     transportInfo.listener.transportReady();
1537     ClientTransport mockTransport = transportInfo.transport;
1538     when(mockTransport.newStream(
1539             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
1540         .thenReturn(mockStream);
1541     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
1542         PickResult.withSubchannel(subchannel, factory2));
1543 
1544     helper.updateBalancingState(READY, mockPicker);
1545     assertEquals(1, executor.runDueTasks());
1546 
1547     verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
1548     verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
1549     assertEquals(
1550         Arrays.asList(factory1, factory2),
1551         callOptionsCaptor.getValue().getStreamTracerFactories());
1552     // The factories are safely not stubbed because we do not expect any usage of them.
1553     verifyZeroInteractions(factory1);
1554     verifyZeroInteractions(factory2);
1555   }
1556 
1557   @Test
getState_loadBalancerSupportsChannelState()1558   public void getState_loadBalancerSupportsChannelState() {
1559     channelBuilder.nameResolverFactory(
1560         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1561     createChannel();
1562     assertEquals(IDLE, channel.getState(false));
1563 
1564     helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
1565     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
1566   }
1567 
1568   @Test
getState_withRequestConnect()1569   public void getState_withRequestConnect() {
1570     channelBuilder.nameResolverFactory(
1571         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1572     requestConnection = false;
1573     createChannel();
1574 
1575     assertEquals(IDLE, channel.getState(false));
1576     verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class));
1577 
1578     // call getState() with requestConnection = true
1579     assertEquals(IDLE, channel.getState(true));
1580     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
1581     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
1582     helper = helperCaptor.getValue();
1583 
1584     helper.updateBalancingState(CONNECTING, mockPicker);
1585     assertEquals(CONNECTING, channel.getState(false));
1586     assertEquals(CONNECTING, channel.getState(true));
1587     verifyNoMoreInteractions(mockLoadBalancerFactory);
1588   }
1589 
1590   @Test
getState_withRequestConnect_IdleWithLbRunning()1591   public void getState_withRequestConnect_IdleWithLbRunning() {
1592     channelBuilder.nameResolverFactory(
1593         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1594     createChannel();
1595     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
1596 
1597     helper.updateBalancingState(IDLE, mockPicker);
1598 
1599     assertEquals(IDLE, channel.getState(true));
1600     verifyNoMoreInteractions(mockLoadBalancerFactory);
1601     verify(mockPicker).requestConnection();
1602   }
1603 
1604   @Test
notifyWhenStateChanged()1605   public void notifyWhenStateChanged() {
1606     final AtomicBoolean stateChanged = new AtomicBoolean();
1607     Runnable onStateChanged = new Runnable() {
1608       @Override
1609       public void run() {
1610         stateChanged.set(true);
1611       }
1612     };
1613 
1614     channelBuilder.nameResolverFactory(
1615         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1616     createChannel();
1617     assertEquals(IDLE, channel.getState(false));
1618 
1619     channel.notifyWhenStateChanged(IDLE, onStateChanged);
1620     executor.runDueTasks();
1621     assertFalse(stateChanged.get());
1622 
1623     // state change from IDLE to CONNECTING
1624     helper.updateBalancingState(CONNECTING, mockPicker);
1625     // onStateChanged callback should run
1626     executor.runDueTasks();
1627     assertTrue(stateChanged.get());
1628 
1629     // clear and test form CONNECTING
1630     stateChanged.set(false);
1631     channel.notifyWhenStateChanged(IDLE, onStateChanged);
1632     // onStateChanged callback should run immediately
1633     executor.runDueTasks();
1634     assertTrue(stateChanged.get());
1635   }
1636 
1637   @Test
channelStateWhenChannelShutdown()1638   public void channelStateWhenChannelShutdown() {
1639     final AtomicBoolean stateChanged = new AtomicBoolean();
1640     Runnable onStateChanged = new Runnable() {
1641       @Override
1642       public void run() {
1643         stateChanged.set(true);
1644       }
1645     };
1646 
1647     channelBuilder.nameResolverFactory(
1648         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1649     createChannel();
1650     assertEquals(IDLE, channel.getState(false));
1651     channel.notifyWhenStateChanged(IDLE, onStateChanged);
1652     executor.runDueTasks();
1653     assertFalse(stateChanged.get());
1654 
1655     channel.shutdown();
1656     assertEquals(SHUTDOWN, channel.getState(false));
1657     executor.runDueTasks();
1658     assertTrue(stateChanged.get());
1659 
1660     stateChanged.set(false);
1661     channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged);
1662     helper.updateBalancingState(CONNECTING, mockPicker);
1663 
1664     assertEquals(SHUTDOWN, channel.getState(false));
1665     executor.runDueTasks();
1666     assertFalse(stateChanged.get());
1667   }
1668 
1669   @Test
stateIsIdleOnIdleTimeout()1670   public void stateIsIdleOnIdleTimeout() {
1671     long idleTimeoutMillis = 2000L;
1672     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1673     createChannel();
1674     assertEquals(IDLE, channel.getState(false));
1675 
1676     helper.updateBalancingState(CONNECTING, mockPicker);
1677     assertEquals(CONNECTING, channel.getState(false));
1678 
1679     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1680     assertEquals(IDLE, channel.getState(false));
1681   }
1682 
1683   @Test
panic_whenIdle()1684   public void panic_whenIdle() {
1685     subtestPanic(IDLE);
1686   }
1687 
1688   @Test
panic_whenConnecting()1689   public void panic_whenConnecting() {
1690     subtestPanic(CONNECTING);
1691   }
1692 
1693   @Test
panic_whenTransientFailure()1694   public void panic_whenTransientFailure() {
1695     subtestPanic(TRANSIENT_FAILURE);
1696   }
1697 
1698   @Test
panic_whenReady()1699   public void panic_whenReady() {
1700     subtestPanic(READY);
1701   }
1702 
subtestPanic(ConnectivityState initialState)1703   private void subtestPanic(ConnectivityState initialState) {
1704     assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
1705     long idleTimeoutMillis = 2000L;
1706     FakeNameResolverFactory nameResolverFactory =
1707         new FakeNameResolverFactory.Builder(expectedUri).build();
1708     channelBuilder.nameResolverFactory(nameResolverFactory);
1709     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1710     createChannel();
1711 
1712     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
1713     assertEquals(1, nameResolverFactory.resolvers.size());
1714     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);
1715 
1716     Throwable panicReason = new Exception("Simulated uncaught exception");
1717     if (initialState == IDLE) {
1718       timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1719     } else {
1720       helper.updateBalancingState(initialState, mockPicker);
1721     }
1722     assertEquals(initialState, channel.getState(false));
1723 
1724     if (initialState == IDLE) {
1725       // IDLE mode will shutdown resolver and balancer
1726       verify(mockLoadBalancer).shutdown();
1727       assertTrue(resolver.shutdown);
1728       // A new resolver is created
1729       assertEquals(1, nameResolverFactory.resolvers.size());
1730       resolver = nameResolverFactory.resolvers.remove(0);
1731       assertFalse(resolver.shutdown);
1732     } else {
1733       verify(mockLoadBalancer, never()).shutdown();
1734       assertFalse(resolver.shutdown);
1735     }
1736 
1737     // Make channel panic!
1738     channel.panic(panicReason);
1739 
1740     // Calls buffered in delayedTransport will fail
1741 
1742     // Resolver and balancer are shutdown
1743     verify(mockLoadBalancer).shutdown();
1744     assertTrue(resolver.shutdown);
1745 
1746     // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it.
1747     assertEquals(TRANSIENT_FAILURE, channel.getState(true));
1748     assertEquals(TRANSIENT_FAILURE, channel.getState(true));
1749     verifyPanicMode(panicReason);
1750 
1751     // No new resolver or balancer are created
1752     verifyNoMoreInteractions(mockLoadBalancerFactory);
1753     assertEquals(0, nameResolverFactory.resolvers.size());
1754 
1755     // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be
1756     // able to revive it.
1757     helper.updateBalancingState(READY, mockPicker);
1758     verifyPanicMode(panicReason);
1759 
1760     // Cannot be revived by exitIdleMode()
1761     channel.exitIdleMode();
1762     verifyPanicMode(panicReason);
1763 
1764     // Can still shutdown normally
1765     channel.shutdown();
1766     assertTrue(channel.isShutdown());
1767     assertTrue(channel.isTerminated());
1768     assertEquals(SHUTDOWN, channel.getState(false));
1769 
1770     // We didn't stub mockPicker, because it should have never been called in this test.
1771     verifyZeroInteractions(mockPicker);
1772   }
1773 
1774   @Test
panic_bufferedCallsWillFail()1775   public void panic_bufferedCallsWillFail() {
1776     createChannel();
1777 
1778     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1779         .thenReturn(PickResult.withNoResult());
1780     helper.updateBalancingState(CONNECTING, mockPicker);
1781 
1782     // Start RPCs that will be buffered in delayedTransport
1783     ClientCall<String, Integer> call =
1784         channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
1785     call.start(mockCallListener, new Metadata());
1786 
1787     ClientCall<String, Integer> call2 =
1788         channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
1789     call2.start(mockCallListener2, new Metadata());
1790 
1791     executor.runDueTasks();
1792     verifyZeroInteractions(mockCallListener, mockCallListener2);
1793 
1794     // Enter panic
1795     Throwable panicReason = new Exception("Simulated uncaught exception");
1796     channel.panic(panicReason);
1797 
1798     // Buffered RPCs fail immediately
1799     executor.runDueTasks();
1800     verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
1801     verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
1802   }
1803 
verifyPanicMode(Throwable cause)1804   private void verifyPanicMode(Throwable cause) {
1805     Assume.assumeTrue("Panic mode disabled to resolve issues with some tests. See #3293", false);
1806 
1807     @SuppressWarnings("unchecked")
1808     ClientCall.Listener<Integer> mockListener =
1809         (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class);
1810     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
1811     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1812     call.start(mockListener, new Metadata());
1813     executor.runDueTasks();
1814     verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause);
1815 
1816     // Channel is dead.  No more pending task to possibly revive it.
1817     assertEquals(0, timer.numPendingTasks());
1818     assertEquals(0, executor.numPendingTasks());
1819     assertEquals(0, oobExecutor.numPendingTasks());
1820   }
1821 
verifyCallListenerClosed( ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause)1822   private void verifyCallListenerClosed(
1823       ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) {
1824     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null);
1825     verify(listener).onClose(captor.capture(), any(Metadata.class));
1826     Status rpcStatus = captor.getValue();
1827     assertEquals(code, rpcStatus.getCode());
1828     assertSame(cause, rpcStatus.getCause());
1829     verifyNoMoreInteractions(listener);
1830   }
1831 
1832   @Test
idleTimeoutAndReconnect()1833   public void idleTimeoutAndReconnect() {
1834     long idleTimeoutMillis = 2000L;
1835     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1836     createChannel();
1837 
1838     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1839     assertEquals(IDLE, channel.getState(true /* request connection */));
1840 
1841     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
1842     // Two times of requesting connection will create loadBalancer twice.
1843     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
1844     Helper helper2 = helperCaptor.getValue();
1845 
1846     // Updating on the old helper (whose balancer has been shutdown) does not change the channel
1847     // state.
1848     helper.updateBalancingState(CONNECTING, mockPicker);
1849     assertEquals(IDLE, channel.getState(false));
1850 
1851     helper2.updateBalancingState(CONNECTING, mockPicker);
1852     assertEquals(CONNECTING, channel.getState(false));
1853   }
1854 
1855   @Test
idleMode_resetsDelayedTransportPicker()1856   public void idleMode_resetsDelayedTransportPicker() {
1857     ClientStream mockStream = mock(ClientStream.class);
1858     Status pickError = Status.UNAVAILABLE.withDescription("pick result error");
1859     long idleTimeoutMillis = 1000L;
1860     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1861     channelBuilder.nameResolverFactory(
1862         new FakeNameResolverFactory.Builder(expectedUri)
1863             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
1864             .build());
1865     createChannel();
1866     assertEquals(IDLE, channel.getState(false));
1867 
1868     // This call will be buffered in delayedTransport
1869     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1870     call.start(mockCallListener, new Metadata());
1871 
1872     // Move channel into TRANSIENT_FAILURE, which will fail the pending call
1873     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1874         .thenReturn(PickResult.withError(pickError));
1875     helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
1876     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
1877     executor.runDueTasks();
1878     verify(mockCallListener).onClose(same(pickError), any(Metadata.class));
1879 
1880     // Move channel to idle
1881     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1882     assertEquals(IDLE, channel.getState(false));
1883 
1884     // This call should be buffered, but will move the channel out of idle
1885     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
1886     call2.start(mockCallListener2, new Metadata());
1887     executor.runDueTasks();
1888     verifyNoMoreInteractions(mockCallListener2);
1889 
1890     // Get the helper created on exiting idle
1891     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
1892     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
1893     Helper helper2 = helperCaptor.getValue();
1894 
1895     // Establish a connection
1896     Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
1897     subchannel.requestConnection();
1898     MockClientTransportInfo transportInfo = transports.poll();
1899     ConnectionClientTransport mockTransport = transportInfo.transport;
1900     ManagedClientTransport.Listener transportListener = transportInfo.listener;
1901     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
1902         .thenReturn(mockStream);
1903     transportListener.transportReady();
1904 
1905     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1906         .thenReturn(PickResult.withSubchannel(subchannel));
1907     helper2.updateBalancingState(READY, mockPicker);
1908     assertEquals(READY, channel.getState(false));
1909     executor.runDueTasks();
1910 
1911     // Verify the buffered call was drained
1912     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
1913     verify(mockStream).start(any(ClientStreamListener.class));
1914   }
1915 
1916   @Test
enterIdleEntersIdle()1917   public void enterIdleEntersIdle() {
1918     createChannel();
1919     helper.updateBalancingState(READY, mockPicker);
1920     assertEquals(READY, channel.getState(false));
1921 
1922     channel.enterIdle();
1923 
1924     assertEquals(IDLE, channel.getState(false));
1925   }
1926 
1927   @Test
enterIdleAfterIdleTimerIsNoOp()1928   public void enterIdleAfterIdleTimerIsNoOp() {
1929     long idleTimeoutMillis = 2000L;
1930     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1931     createChannel();
1932     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1933     assertEquals(IDLE, channel.getState(false));
1934 
1935     channel.enterIdle();
1936 
1937     assertEquals(IDLE, channel.getState(false));
1938   }
1939 
1940   @Test
enterIdle_exitsIdleIfDelayedStreamPending()1941   public void enterIdle_exitsIdleIfDelayedStreamPending() {
1942     FakeNameResolverFactory nameResolverFactory =
1943         new FakeNameResolverFactory.Builder(expectedUri)
1944             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
1945             .build();
1946     channelBuilder.nameResolverFactory(nameResolverFactory);
1947     createChannel();
1948 
1949     // Start a call that will be buffered in delayedTransport
1950     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1951     call.start(mockCallListener, new Metadata());
1952 
1953     // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed
1954     // call
1955     channel.enterIdle();
1956     assertEquals(IDLE, channel.getState(false));
1957 
1958     // enterIdle() will restart the delayed call by exiting idle. This creates a new helper.
1959     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
1960     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
1961     Helper helper2 = helperCaptor.getValue();
1962 
1963     // Establish a connection
1964     Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
1965     subchannel.requestConnection();
1966     ClientStream mockStream = mock(ClientStream.class);
1967     MockClientTransportInfo transportInfo = transports.poll();
1968     ConnectionClientTransport mockTransport = transportInfo.transport;
1969     ManagedClientTransport.Listener transportListener = transportInfo.listener;
1970     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
1971         .thenReturn(mockStream);
1972     transportListener.transportReady();
1973     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1974         .thenReturn(PickResult.withSubchannel(subchannel));
1975     helper2.updateBalancingState(READY, mockPicker);
1976     assertEquals(READY, channel.getState(false));
1977 
1978     // Verify the original call was drained
1979     executor.runDueTasks();
1980     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
1981     verify(mockStream).start(any(ClientStreamListener.class));
1982   }
1983 
1984   @Test
updateBalancingStateDoesUpdatePicker()1985   public void updateBalancingStateDoesUpdatePicker() {
1986     ClientStream mockStream = mock(ClientStream.class);
1987     createChannel();
1988 
1989     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1990     call.start(mockCallListener, new Metadata());
1991 
1992     // Make the transport available with subchannel2
1993     Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1994     Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1995     subchannel2.requestConnection();
1996 
1997     MockClientTransportInfo transportInfo = transports.poll();
1998     ConnectionClientTransport mockTransport = transportInfo.transport;
1999     ManagedClientTransport.Listener transportListener = transportInfo.listener;
2000     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
2001         .thenReturn(mockStream);
2002     transportListener.transportReady();
2003 
2004     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
2005         .thenReturn(PickResult.withSubchannel(subchannel1));
2006     helper.updateBalancingState(READY, mockPicker);
2007 
2008     executor.runDueTasks();
2009     verify(mockTransport, never())
2010         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
2011     verify(mockStream, never()).start(any(ClientStreamListener.class));
2012 
2013 
2014     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
2015         .thenReturn(PickResult.withSubchannel(subchannel2));
2016     helper.updateBalancingState(READY, mockPicker);
2017 
2018     executor.runDueTasks();
2019     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
2020     verify(mockStream).start(any(ClientStreamListener.class));
2021   }
2022 
2023   @Test
updateBalancingStateWithShutdownShouldBeIgnored()2024   public void updateBalancingStateWithShutdownShouldBeIgnored() {
2025     channelBuilder.nameResolverFactory(
2026         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
2027     createChannel();
2028     assertEquals(IDLE, channel.getState(false));
2029 
2030     Runnable onStateChanged = mock(Runnable.class);
2031     channel.notifyWhenStateChanged(IDLE, onStateChanged);
2032 
2033     helper.updateBalancingState(SHUTDOWN, mockPicker);
2034 
2035     assertEquals(IDLE, channel.getState(false));
2036     executor.runDueTasks();
2037     verify(onStateChanged, never()).run();
2038   }
2039 
2040   @Test
resetConnectBackoff()2041   public void resetConnectBackoff() {
2042     // Start with a name resolution failure to trigger backoff attempts
2043     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
2044     FakeNameResolverFactory nameResolverFactory =
2045         new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
2046     channelBuilder.nameResolverFactory(nameResolverFactory);
2047     // Name resolution is started as soon as channel is created.
2048     createChannel();
2049     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
2050     verify(mockLoadBalancer).handleNameResolutionError(same(error));
2051 
2052     FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
2053     assertNotNull("There should be a name resolver backoff task", nameResolverBackoff);
2054     assertEquals(0, resolver.refreshCalled);
2055 
2056     // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff
2057     channel.resetConnectBackoff();
2058     assertEquals(1, resolver.refreshCalled);
2059     assertTrue(nameResolverBackoff.isCancelled());
2060 
2061     // Simulate a race between cancel and the task scheduler. Should be a no-op.
2062     nameResolverBackoff.command.run();
2063     assertEquals(1, resolver.refreshCalled);
2064 
2065     // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
2066     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
2067     assertEquals(2, resolver.refreshCalled);
2068   }
2069 
2070   @Test
resetConnectBackoff_noOpWithoutPendingResolverBackoff()2071   public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
2072     FakeNameResolverFactory nameResolverFactory =
2073         new FakeNameResolverFactory.Builder(expectedUri)
2074             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2075             .build();
2076     channelBuilder.nameResolverFactory(nameResolverFactory);
2077     createChannel();
2078     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
2079     assertEquals(0, nameResolver.refreshCalled);
2080 
2081     channel.resetConnectBackoff();
2082 
2083     assertEquals(0, nameResolver.refreshCalled);
2084   }
2085 
2086   @Test
resetConnectBackoff_noOpWhenChannelShutdown()2087   public void resetConnectBackoff_noOpWhenChannelShutdown() {
2088     FakeNameResolverFactory nameResolverFactory =
2089         new FakeNameResolverFactory.Builder(expectedUri).build();
2090     channelBuilder.nameResolverFactory(nameResolverFactory);
2091     createChannel();
2092 
2093     channel.shutdown();
2094     assertTrue(channel.isShutdown());
2095     channel.resetConnectBackoff();
2096 
2097     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
2098     assertEquals(0, nameResolver.refreshCalled);
2099   }
2100 
2101   @Test
resetConnectBackoff_noOpWhenNameResolverNotStarted()2102   public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
2103     FakeNameResolverFactory nameResolverFactory =
2104         new FakeNameResolverFactory.Builder(expectedUri).build();
2105     channelBuilder.nameResolverFactory(nameResolverFactory);
2106     requestConnection = false;
2107     createChannel();
2108 
2109     channel.resetConnectBackoff();
2110 
2111     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
2112     assertEquals(0, nameResolver.refreshCalled);
2113   }
2114 
2115   @Test
channelsAndSubchannels_instrumented_name()2116   public void channelsAndSubchannels_instrumented_name() throws Exception {
2117     createChannel();
2118     assertEquals(TARGET, getStats(channel).target);
2119 
2120     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
2121     assertEquals(Collections.singletonList(addressGroup).toString(),
2122         getStats((AbstractSubchannel) subchannel).target);
2123   }
2124 
2125   @Test
channelTracing_channelCreationEvent()2126   public void channelTracing_channelCreationEvent() throws Exception {
2127     timer.forwardNanos(1234);
2128     channelBuilder.maxTraceEvents(10);
2129     createChannel();
2130     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2131         .setDescription("Channel created")
2132         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2133         .setTimestampNanos(timer.getTicker().read())
2134         .build());
2135   }
2136 
2137   @Test
channelTracing_subchannelCreationEvents()2138   public void channelTracing_subchannelCreationEvents() throws Exception {
2139     channelBuilder.maxTraceEvents(10);
2140     createChannel();
2141     timer.forwardNanos(1234);
2142     AbstractSubchannel subchannel =
2143         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2144     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2145         .setDescription("Child channel created")
2146         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2147         .setTimestampNanos(timer.getTicker().read())
2148         .setSubchannelRef(subchannel.getInternalSubchannel())
2149         .build());
2150     assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2151         .setDescription("Subchannel created")
2152         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2153         .setTimestampNanos(timer.getTicker().read())
2154         .build());
2155   }
2156 
2157   @Test
channelTracing_nameResolvingErrorEvent()2158   public void channelTracing_nameResolvingErrorEvent() throws Exception {
2159     timer.forwardNanos(1234);
2160     channelBuilder.maxTraceEvents(10);
2161     createChannel();
2162     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2163         .setDescription("Failed to resolve name")
2164         .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
2165         .setTimestampNanos(timer.getTicker().read())
2166         .build());
2167   }
2168 
2169   @Test
channelTracing_nameResolvedEvent()2170   public void channelTracing_nameResolvedEvent() throws Exception {
2171     timer.forwardNanos(1234);
2172     channelBuilder.maxTraceEvents(10);
2173     FakeNameResolverFactory nameResolverFactory =
2174         new FakeNameResolverFactory.Builder(expectedUri)
2175             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2176             .build();
2177     channelBuilder.nameResolverFactory(nameResolverFactory);
2178     createChannel();
2179     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2180         .setDescription("Address resolved: "
2181             + Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2182         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2183         .setTimestampNanos(timer.getTicker().read())
2184         .build());
2185   }
2186 
2187   @Test
channelTracing_nameResolvedEvent_zeorAndNonzeroBackends()2188   public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception {
2189     timer.forwardNanos(1234);
2190     channelBuilder.maxTraceEvents(10);
2191     List<EquivalentAddressGroup> servers = new ArrayList<>();
2192     servers.add(new EquivalentAddressGroup(socketAddress));
2193     FakeNameResolverFactory nameResolverFactory =
2194         new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
2195     channelBuilder.nameResolverFactory(nameResolverFactory);
2196     createChannel();
2197 
2198     int prevSize = getStats(channel).channelTrace.events.size();
2199     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2200         Collections.singletonList(new EquivalentAddressGroup(
2201             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2202         Attributes.EMPTY);
2203     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
2204 
2205     prevSize = getStats(channel).channelTrace.events.size();
2206     nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
2207     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2208 
2209     prevSize = getStats(channel).channelTrace.events.size();
2210     nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
2211     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
2212 
2213     prevSize = getStats(channel).channelTrace.events.size();
2214     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2215         Collections.singletonList(new EquivalentAddressGroup(
2216             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2217         Attributes.EMPTY);
2218     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2219   }
2220 
2221   @Test
channelTracing_serviceConfigChange()2222   public void channelTracing_serviceConfigChange() throws Exception {
2223     timer.forwardNanos(1234);
2224     channelBuilder.maxTraceEvents(10);
2225     List<EquivalentAddressGroup> servers = new ArrayList<>();
2226     servers.add(new EquivalentAddressGroup(socketAddress));
2227     FakeNameResolverFactory nameResolverFactory =
2228         new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
2229     channelBuilder.nameResolverFactory(nameResolverFactory);
2230     createChannel();
2231 
2232     int prevSize = getStats(channel).channelTrace.events.size();
2233     Attributes attributes =
2234         Attributes.newBuilder()
2235             .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
2236             .build();
2237     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2238         Collections.singletonList(new EquivalentAddressGroup(
2239             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2240         attributes);
2241     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2242     assertThat(getStats(channel).channelTrace.events.get(prevSize))
2243         .isEqualTo(new ChannelTrace.Event.Builder()
2244             .setDescription("Service config changed")
2245             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2246             .setTimestampNanos(timer.getTicker().read())
2247             .build());
2248 
2249     prevSize = getStats(channel).channelTrace.events.size();
2250     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2251         Collections.singletonList(new EquivalentAddressGroup(
2252             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2253         attributes);
2254     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
2255 
2256     prevSize = getStats(channel).channelTrace.events.size();
2257     Map<String, Object> serviceConfig = new HashMap<String, Object>();
2258     serviceConfig.put("methodConfig", new HashMap<String, Object>());
2259     attributes =
2260         Attributes.newBuilder()
2261             .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
2262             .build();
2263     timer.forwardNanos(1234);
2264     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2265         Collections.singletonList(new EquivalentAddressGroup(
2266             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2267         attributes);
2268     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2269     assertThat(getStats(channel).channelTrace.events.get(prevSize))
2270         .isEqualTo(new ChannelTrace.Event.Builder()
2271             .setDescription("Service config changed")
2272             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2273             .setTimestampNanos(timer.getTicker().read())
2274             .build());
2275   }
2276 
2277   @Test
channelTracing_stateChangeEvent()2278   public void channelTracing_stateChangeEvent() throws Exception {
2279     channelBuilder.maxTraceEvents(10);
2280     createChannel();
2281     timer.forwardNanos(1234);
2282     helper.updateBalancingState(CONNECTING, mockPicker);
2283     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2284         .setDescription("Entering CONNECTING state")
2285         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2286         .setTimestampNanos(timer.getTicker().read())
2287         .build());
2288   }
2289 
2290   @Test
channelTracing_subchannelStateChangeEvent()2291   public void channelTracing_subchannelStateChangeEvent() throws Exception {
2292     channelBuilder.maxTraceEvents(10);
2293     createChannel();
2294     AbstractSubchannel subchannel =
2295         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2296     timer.forwardNanos(1234);
2297     subchannel.obtainActiveTransport();
2298     assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2299         .setDescription("Entering CONNECTING state")
2300         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2301         .setTimestampNanos(timer.getTicker().read())
2302         .build());
2303   }
2304 
2305   @Test
channelTracing_oobChannelStateChangeEvent()2306   public void channelTracing_oobChannelStateChangeEvent() throws Exception {
2307     channelBuilder.maxTraceEvents(10);
2308     createChannel();
2309     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
2310     timer.forwardNanos(1234);
2311     oobChannel.handleSubchannelStateChange(
2312         ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
2313     assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2314         .setDescription("Entering CONNECTING state")
2315         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2316         .setTimestampNanos(timer.getTicker().read())
2317         .build());
2318   }
2319 
2320   @Test
channelTracing_oobChannelCreationEvents()2321   public void channelTracing_oobChannelCreationEvents() throws Exception {
2322     channelBuilder.maxTraceEvents(10);
2323     createChannel();
2324     timer.forwardNanos(1234);
2325     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
2326     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2327         .setDescription("Child channel created")
2328         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2329         .setTimestampNanos(timer.getTicker().read())
2330         .setChannelRef(oobChannel)
2331         .build());
2332     assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2333         .setDescription("OobChannel created")
2334         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2335         .setTimestampNanos(timer.getTicker().read())
2336         .build());
2337     assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains(
2338         new ChannelTrace.Event.Builder()
2339             .setDescription("Subchannel created")
2340             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2341             .setTimestampNanos(timer.getTicker().read())
2342             .build());
2343   }
2344 
2345   @Test
channelsAndSubchannels_instrumented_state()2346   public void channelsAndSubchannels_instrumented_state() throws Exception {
2347     createChannel();
2348 
2349     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
2350     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
2351     helper = helperCaptor.getValue();
2352 
2353     assertEquals(IDLE, getStats(channel).state);
2354     helper.updateBalancingState(CONNECTING, mockPicker);
2355     assertEquals(CONNECTING, getStats(channel).state);
2356 
2357     AbstractSubchannel subchannel =
2358         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2359 
2360     assertEquals(IDLE, getStats(subchannel).state);
2361     subchannel.requestConnection();
2362     assertEquals(CONNECTING, getStats(subchannel).state);
2363 
2364     MockClientTransportInfo transportInfo = transports.poll();
2365 
2366     assertEquals(CONNECTING, getStats(subchannel).state);
2367     transportInfo.listener.transportReady();
2368     assertEquals(READY, getStats(subchannel).state);
2369 
2370     assertEquals(CONNECTING, getStats(channel).state);
2371     helper.updateBalancingState(READY, mockPicker);
2372     assertEquals(READY, getStats(channel).state);
2373 
2374     channel.shutdownNow();
2375     assertEquals(SHUTDOWN, getStats(channel).state);
2376     assertEquals(SHUTDOWN, getStats(subchannel).state);
2377   }
2378 
2379   @Test
channelStat_callStarted()2380   public void channelStat_callStarted() throws Exception {
2381     createChannel();
2382     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2383     assertEquals(0, getStats(channel).callsStarted);
2384     call.start(mockCallListener, new Metadata());
2385     assertEquals(1, getStats(channel).callsStarted);
2386     assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos);
2387   }
2388 
2389   @Test
channelsAndSubChannels_instrumented_success()2390   public void channelsAndSubChannels_instrumented_success() throws Exception {
2391     channelsAndSubchannels_instrumented0(true);
2392   }
2393 
2394   @Test
channelsAndSubChannels_instrumented_fail()2395   public void channelsAndSubChannels_instrumented_fail() throws Exception {
2396     channelsAndSubchannels_instrumented0(false);
2397   }
2398 
channelsAndSubchannels_instrumented0(boolean success)2399   private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
2400     createChannel();
2401 
2402     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2403 
2404     // Channel stat bumped when ClientCall.start() called
2405     assertEquals(0, getStats(channel).callsStarted);
2406     call.start(mockCallListener, new Metadata());
2407     assertEquals(1, getStats(channel).callsStarted);
2408 
2409     ClientStream mockStream = mock(ClientStream.class);
2410     ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
2411     AbstractSubchannel subchannel =
2412         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2413     subchannel.requestConnection();
2414     MockClientTransportInfo transportInfo = transports.poll();
2415     transportInfo.listener.transportReady();
2416     ClientTransport mockTransport = transportInfo.transport;
2417     when(mockTransport.newStream(
2418             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
2419         .thenReturn(mockStream);
2420     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
2421         PickResult.withSubchannel(subchannel, factory));
2422 
2423     // subchannel stat bumped when call gets assigned to it
2424     assertEquals(0, getStats(subchannel).callsStarted);
2425     helper.updateBalancingState(READY, mockPicker);
2426     assertEquals(1, executor.runDueTasks());
2427     verify(mockStream).start(streamListenerCaptor.capture());
2428     assertEquals(1, getStats(subchannel).callsStarted);
2429 
2430     ClientStreamListener streamListener = streamListenerCaptor.getValue();
2431     call.halfClose();
2432 
2433     // closing stream listener affects subchannel stats immediately
2434     assertEquals(0, getStats(subchannel).callsSucceeded);
2435     assertEquals(0, getStats(subchannel).callsFailed);
2436     streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
2437     if (success) {
2438       assertEquals(1, getStats(subchannel).callsSucceeded);
2439       assertEquals(0, getStats(subchannel).callsFailed);
2440     } else {
2441       assertEquals(0, getStats(subchannel).callsSucceeded);
2442       assertEquals(1, getStats(subchannel).callsFailed);
2443     }
2444 
2445     // channel stats bumped when the ClientCall.Listener is notified
2446     assertEquals(0, getStats(channel).callsSucceeded);
2447     assertEquals(0, getStats(channel).callsFailed);
2448     executor.runDueTasks();
2449     if (success) {
2450       assertEquals(1, getStats(channel).callsSucceeded);
2451       assertEquals(0, getStats(channel).callsFailed);
2452     } else {
2453       assertEquals(0, getStats(channel).callsSucceeded);
2454       assertEquals(1, getStats(channel).callsFailed);
2455     }
2456   }
2457 
2458   @Test
channelsAndSubchannels_oob_instrumented_success()2459   public void channelsAndSubchannels_oob_instrumented_success() throws Exception {
2460     channelsAndSubchannels_oob_instrumented0(true);
2461   }
2462 
2463   @Test
channelsAndSubchannels_oob_instrumented_fail()2464   public void channelsAndSubchannels_oob_instrumented_fail() throws Exception {
2465     channelsAndSubchannels_oob_instrumented0(false);
2466   }
2467 
channelsAndSubchannels_oob_instrumented0(boolean success)2468   private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
2469     // set up
2470     ClientStream mockStream = mock(ClientStream.class);
2471     createChannel();
2472 
2473     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
2474     AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
2475     FakeClock callExecutor = new FakeClock();
2476     CallOptions options =
2477         CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
2478     ClientCall<String, Integer> call = oobChannel.newCall(method, options);
2479     Metadata headers = new Metadata();
2480 
2481     // Channel stat bumped when ClientCall.start() called
2482     assertEquals(0, getStats(oobChannel).callsStarted);
2483     call.start(mockCallListener, headers);
2484     assertEquals(1, getStats(oobChannel).callsStarted);
2485 
2486     MockClientTransportInfo transportInfo = transports.poll();
2487     ConnectionClientTransport mockTransport = transportInfo.transport;
2488     ManagedClientTransport.Listener transportListener = transportInfo.listener;
2489     when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
2490         .thenReturn(mockStream);
2491 
2492     // subchannel stat bumped when call gets assigned to it
2493     assertEquals(0, getStats(oobSubchannel).callsStarted);
2494     transportListener.transportReady();
2495     callExecutor.runDueTasks();
2496     verify(mockStream).start(streamListenerCaptor.capture());
2497     assertEquals(1, getStats(oobSubchannel).callsStarted);
2498 
2499     ClientStreamListener streamListener = streamListenerCaptor.getValue();
2500     call.halfClose();
2501 
2502     // closing stream listener affects subchannel stats immediately
2503     assertEquals(0, getStats(oobSubchannel).callsSucceeded);
2504     assertEquals(0, getStats(oobSubchannel).callsFailed);
2505     streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
2506     if (success) {
2507       assertEquals(1, getStats(oobSubchannel).callsSucceeded);
2508       assertEquals(0, getStats(oobSubchannel).callsFailed);
2509     } else {
2510       assertEquals(0, getStats(oobSubchannel).callsSucceeded);
2511       assertEquals(1, getStats(oobSubchannel).callsFailed);
2512     }
2513 
2514     // channel stats bumped when the ClientCall.Listener is notified
2515     assertEquals(0, getStats(oobChannel).callsSucceeded);
2516     assertEquals(0, getStats(oobChannel).callsFailed);
2517     callExecutor.runDueTasks();
2518     if (success) {
2519       assertEquals(1, getStats(oobChannel).callsSucceeded);
2520       assertEquals(0, getStats(oobChannel).callsFailed);
2521     } else {
2522       assertEquals(0, getStats(oobChannel).callsSucceeded);
2523       assertEquals(1, getStats(oobChannel).callsFailed);
2524     }
2525     // oob channel is separate from the original channel
2526     assertEquals(0, getStats(channel).callsSucceeded);
2527     assertEquals(0, getStats(channel).callsFailed);
2528   }
2529 
2530   @Test
channelsAndSubchannels_oob_instrumented_name()2531   public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
2532     createChannel();
2533 
2534     String authority = "oobauthority";
2535     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
2536     assertEquals(authority, getStats(oobChannel).target);
2537   }
2538 
2539   @Test
channelsAndSubchannels_oob_instrumented_state()2540   public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
2541     createChannel();
2542 
2543     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
2544     assertEquals(IDLE, getStats(oobChannel).state);
2545 
2546     oobChannel.getSubchannel().requestConnection();
2547     assertEquals(CONNECTING, getStats(oobChannel).state);
2548 
2549     MockClientTransportInfo transportInfo = transports.poll();
2550     ManagedClientTransport.Listener transportListener = transportInfo.listener;
2551 
2552     transportListener.transportReady();
2553     assertEquals(READY, getStats(oobChannel).state);
2554 
2555     // oobchannel state is separate from the ManagedChannel
2556     assertEquals(IDLE, getStats(channel).state);
2557     channel.shutdownNow();
2558     assertEquals(SHUTDOWN, getStats(channel).state);
2559     assertEquals(SHUTDOWN, getStats(oobChannel).state);
2560   }
2561 
2562   @Test
binaryLogInstalled()2563   public void binaryLogInstalled() throws Exception {
2564     final SettableFuture<Boolean> intercepted = SettableFuture.create();
2565     channelBuilder.binlog = new BinaryLog() {
2566       @Override
2567       public void close() throws IOException {
2568         // noop
2569       }
2570 
2571       @Override
2572       public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
2573           ServerMethodDefinition<ReqT, RespT> oMethodDef) {
2574         return oMethodDef;
2575       }
2576 
2577       @Override
2578       public Channel wrapChannel(Channel channel) {
2579         return ClientInterceptors.intercept(channel,
2580             new ClientInterceptor() {
2581               @Override
2582               public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
2583                   MethodDescriptor<ReqT, RespT> method,
2584                   CallOptions callOptions,
2585                   Channel next) {
2586                 intercepted.set(true);
2587                 return next.newCall(method, callOptions);
2588               }
2589             });
2590       }
2591     };
2592 
2593     createChannel();
2594     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2595     call.start(mockCallListener, new Metadata());
2596     assertTrue(intercepted.get());
2597   }
2598 
2599   @Test
retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail()2600   public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() {
2601     Map<String, Object> retryPolicy = new HashMap<String, Object>();
2602     retryPolicy.put("maxAttempts", 3D);
2603     retryPolicy.put("initialBackoff", "10s");
2604     retryPolicy.put("maxBackoff", "30s");
2605     retryPolicy.put("backoffMultiplier", 2D);
2606     retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
2607     Map<String, Object> methodConfig = new HashMap<String, Object>();
2608     Map<String, Object> name = new HashMap<String, Object>();
2609     name.put("service", "service");
2610     methodConfig.put("name", Arrays.<Object>asList(name));
2611     methodConfig.put("retryPolicy", retryPolicy);
2612     Map<String, Object> serviceConfig = new HashMap<String, Object>();
2613     serviceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
2614     Attributes attributesWithRetryPolicy = Attributes
2615         .newBuilder().set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
2616 
2617     FakeNameResolverFactory nameResolverFactory =
2618         new FakeNameResolverFactory.Builder(expectedUri)
2619             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2620             .build();
2621     nameResolverFactory.nextResolvedAttributes.set(attributesWithRetryPolicy);
2622     channelBuilder.nameResolverFactory(nameResolverFactory);
2623     channelBuilder.executor(MoreExecutors.directExecutor());
2624     channelBuilder.enableRetry();
2625     RetriableStream.setRandom(
2626         // not random
2627         new Random() {
2628           @Override
2629           public double nextDouble() {
2630             return 1D; // fake random
2631           }
2632         });
2633 
2634     requestConnection = false;
2635     createChannel();
2636 
2637     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2638     call.start(mockCallListener, new Metadata());
2639     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
2640     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
2641     helper = helperCaptor.getValue();
2642     verify(mockLoadBalancer)
2643         .handleResolvedAddressGroups(nameResolverFactory.servers, attributesWithRetryPolicy);
2644 
2645     // simulating request connection and then transport ready after resolved address
2646     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
2647     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
2648         .thenReturn(PickResult.withSubchannel(subchannel));
2649     subchannel.requestConnection();
2650     MockClientTransportInfo transportInfo = transports.poll();
2651     ConnectionClientTransport mockTransport = transportInfo.transport;
2652     ClientStream mockStream = mock(ClientStream.class);
2653     ClientStream mockStream2 = mock(ClientStream.class);
2654     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
2655         .thenReturn(mockStream).thenReturn(mockStream2);
2656     transportInfo.listener.transportReady();
2657     helper.updateBalancingState(READY, mockPicker);
2658 
2659     ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
2660         ArgumentCaptor.forClass(ClientStreamListener.class);
2661     verify(mockStream).start(streamListenerCaptor.capture());
2662     assertThat(timer.getPendingTasks()).isEmpty();
2663 
2664     // trigger retry
2665     streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata());
2666 
2667     // in backoff
2668     timer.forwardTime(5, TimeUnit.SECONDS);
2669     assertThat(timer.getPendingTasks()).hasSize(1);
2670     verify(mockStream2, never()).start(any(ClientStreamListener.class));
2671 
2672     // shutdown during backoff period
2673     channel.shutdown();
2674 
2675     assertThat(timer.getPendingTasks()).hasSize(1);
2676     verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
2677 
2678     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
2679     call2.start(mockCallListener2, new Metadata());
2680 
2681     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
2682     verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class));
2683     assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
2684     assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());
2685 
2686     // backoff ends
2687     timer.forwardTime(5, TimeUnit.SECONDS);
2688     assertThat(timer.getPendingTasks()).isEmpty();
2689     verify(mockStream2).start(streamListenerCaptor.capture());
2690     verify(mockLoadBalancer, never()).shutdown();
2691     assertFalse(
2692         "channel.isTerminated() is expected to be false but was true",
2693         channel.isTerminated());
2694 
2695     streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata());
2696     verify(mockLoadBalancer).shutdown();
2697     // simulating the shutdown of load balancer triggers the shutdown of subchannel
2698     subchannel.shutdown();
2699     transportInfo.listener.transportTerminated(); // simulating transport terminated
2700     assertTrue(
2701         "channel.isTerminated() is expected to be true but was false",
2702         channel.isTerminated());
2703   }
2704 
2705   @Test
badServiceConfigIsRecoverable()2706   public void badServiceConfigIsRecoverable() throws Exception {
2707     final List<EquivalentAddressGroup> addresses =
2708         ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
2709     final class FakeNameResolver extends NameResolver {
2710       Listener listener;
2711 
2712       @Override
2713       public String getServiceAuthority() {
2714         return "also fake";
2715       }
2716 
2717       @Override
2718       public void start(Listener listener) {
2719         this.listener = listener;
2720         listener.onAddresses(addresses,
2721             Attributes.newBuilder()
2722                 .set(
2723                     GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
2724                     ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
2725                 .build());
2726       }
2727 
2728       @Override
2729       public void shutdown() {}
2730     }
2731 
2732     final class FakeNameResolverFactory extends NameResolver.Factory {
2733       FakeNameResolver resolver;
2734 
2735       @Nullable
2736       @Override
2737       public NameResolver newNameResolver(URI targetUri, Attributes params) {
2738         return (resolver = new FakeNameResolver());
2739       }
2740 
2741       @Override
2742       public String getDefaultScheme() {
2743         return "fake";
2744       }
2745     }
2746 
2747     FakeNameResolverFactory factory = new FakeNameResolverFactory();
2748     final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> {
2749 
2750       CustomBuilder() {
2751         super(TARGET);
2752         this.executorPool = ManagedChannelImplTest.this.executorPool;
2753         this.channelz = ManagedChannelImplTest.this.channelz;
2754       }
2755 
2756       @Override
2757       protected ClientTransportFactory buildTransportFactory() {
2758         return mockTransportFactory;
2759       }
2760     }
2761 
2762     ManagedChannel mychannel = new CustomBuilder()
2763         .nameResolverFactory(factory)
2764         .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build();
2765 
2766     ClientCall<Void, Void> call1 =
2767         mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
2768     ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
2769     executor.runDueTasks();
2770     try {
2771       future1.get();
2772       Assert.fail();
2773     } catch (ExecutionException e) {
2774       assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
2775     }
2776 
2777     // ok the service config is bad, let's fix it.
2778 
2779     factory.resolver.listener.onAddresses(addresses,
2780         Attributes.newBuilder()
2781         .set(
2782             GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
2783             ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
2784         .build());
2785 
2786     ClientCall<Void, Void> call2 = mychannel.newCall(
2787         TestMethodDescriptors.voidMethod(),
2788         CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
2789     ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
2790 
2791     timer.forwardTime(1234, TimeUnit.SECONDS);
2792 
2793     executor.runDueTasks();
2794     try {
2795       future2.get();
2796       Assert.fail();
2797     } catch (ExecutionException e) {
2798       assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
2799     }
2800 
2801     mychannel.shutdownNow();
2802   }
2803 
2804   private static final class ChannelBuilder
2805       extends AbstractManagedChannelImplBuilder<ChannelBuilder> {
2806 
ChannelBuilder()2807     ChannelBuilder() {
2808       super(TARGET);
2809     }
2810 
buildTransportFactory()2811     @Override protected ClientTransportFactory buildTransportFactory() {
2812       throw new UnsupportedOperationException();
2813     }
2814 
getNameResolverParams()2815     @Override protected Attributes getNameResolverParams() {
2816       return NAME_RESOLVER_PARAMS;
2817     }
2818   }
2819 
2820   private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
2821     @Override
get()2822     public BackoffPolicy get() {
2823       return new BackoffPolicy() {
2824         int multiplier = 1;
2825 
2826         @Override
2827         public long nextBackoffNanos() {
2828           return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++;
2829         }
2830       };
2831     }
2832   }
2833 
2834   private static final class FakeNameResolverFactory extends NameResolver.Factory {
2835     final URI expectedUri;
2836     final List<EquivalentAddressGroup> servers;
2837     final boolean resolvedAtStart;
2838     final Status error;
2839     final ArrayList<FakeNameResolver> resolvers = new ArrayList<>();
2840     // The Attributes argument of the next invocation of listener.onAddresses(servers, attrs)
2841     final AtomicReference<Attributes> nextResolvedAttributes =
2842         new AtomicReference<Attributes>(Attributes.EMPTY);
2843 
FakeNameResolverFactory( URI expectedUri, List<EquivalentAddressGroup> servers, boolean resolvedAtStart, Status error)2844     FakeNameResolverFactory(
2845         URI expectedUri,
2846         List<EquivalentAddressGroup> servers,
2847         boolean resolvedAtStart,
2848         Status error) {
2849       this.expectedUri = expectedUri;
2850       this.servers = servers;
2851       this.resolvedAtStart = resolvedAtStart;
2852       this.error = error;
2853     }
2854 
2855     @Override
newNameResolver(final URI targetUri, Attributes params)2856     public NameResolver newNameResolver(final URI targetUri, Attributes params) {
2857       if (!expectedUri.equals(targetUri)) {
2858         return null;
2859       }
2860       assertSame(NAME_RESOLVER_PARAMS, params);
2861       FakeNameResolver resolver = new FakeNameResolver(error);
2862       resolvers.add(resolver);
2863       return resolver;
2864     }
2865 
2866     @Override
getDefaultScheme()2867     public String getDefaultScheme() {
2868       return "fake";
2869     }
2870 
allResolved()2871     void allResolved() {
2872       for (FakeNameResolver resolver : resolvers) {
2873         resolver.resolved();
2874       }
2875     }
2876 
2877     final class FakeNameResolver extends NameResolver {
2878       Listener listener;
2879       boolean shutdown;
2880       int refreshCalled;
2881       Status error;
2882 
FakeNameResolver(Status error)2883       FakeNameResolver(Status error) {
2884         this.error = error;
2885       }
2886 
getServiceAuthority()2887       @Override public String getServiceAuthority() {
2888         return expectedUri.getAuthority();
2889       }
2890 
start(final Listener listener)2891       @Override public void start(final Listener listener) {
2892         this.listener = listener;
2893         if (resolvedAtStart) {
2894           resolved();
2895         }
2896       }
2897 
refresh()2898       @Override public void refresh() {
2899         assertNotNull(listener);
2900         refreshCalled++;
2901         resolved();
2902       }
2903 
resolved()2904       void resolved() {
2905         if (error != null) {
2906           listener.onError(error);
2907           return;
2908         }
2909         listener.onAddresses(servers, nextResolvedAttributes.get());
2910       }
2911 
shutdown()2912       @Override public void shutdown() {
2913         shutdown = true;
2914       }
2915     }
2916 
2917     static final class Builder {
2918       final URI expectedUri;
2919       List<EquivalentAddressGroup> servers = ImmutableList.<EquivalentAddressGroup>of();
2920       boolean resolvedAtStart = true;
2921       Status error = null;
2922 
Builder(URI expectedUri)2923       Builder(URI expectedUri) {
2924         this.expectedUri = expectedUri;
2925       }
2926 
setServers(List<EquivalentAddressGroup> servers)2927       Builder setServers(List<EquivalentAddressGroup> servers) {
2928         this.servers = servers;
2929         return this;
2930       }
2931 
setResolvedAtStart(boolean resolvedAtStart)2932       Builder setResolvedAtStart(boolean resolvedAtStart) {
2933         this.resolvedAtStart = resolvedAtStart;
2934         return this;
2935       }
2936 
setError(Status error)2937       Builder setError(Status error) {
2938         this.error = error;
2939         return this;
2940       }
2941 
build()2942       FakeNameResolverFactory build() {
2943         return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
2944       }
2945     }
2946   }
2947 
getStats(AbstractSubchannel subchannel)2948   private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
2949     return subchannel.getInternalSubchannel().getStats().get();
2950   }
2951 
getStats( InternalInstrumented<ChannelStats> instrumented)2952   private static ChannelStats getStats(
2953       InternalInstrumented<ChannelStats> instrumented) throws Exception {
2954     return instrumented.getStats().get();
2955   }
2956 
getNameResolverRefresh()2957   private FakeClock.ScheduledTask getNameResolverRefresh() {
2958     return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
2959   }
2960 }
2961