• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.ConnectivityState.CONNECTING;
21 import static io.grpc.ConnectivityState.IDLE;
22 import static io.grpc.ConnectivityState.READY;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY;
26 import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.AdditionalAnswers.delegatesTo;
33 import static org.mockito.Matchers.any;
34 import static org.mockito.Matchers.eq;
35 import static org.mockito.Matchers.same;
36 import static org.mockito.Mockito.atLeast;
37 import static org.mockito.Mockito.doAnswer;
38 import static org.mockito.Mockito.inOrder;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.verifyNoMoreInteractions;
44 import static org.mockito.Mockito.when;
45 
46 import com.google.common.util.concurrent.MoreExecutors;
47 import com.google.protobuf.ByteString;
48 import com.google.protobuf.util.Durations;
49 import com.google.protobuf.util.Timestamps;
50 import io.grpc.Attributes;
51 import io.grpc.CallOptions;
52 import io.grpc.ClientStreamTracer;
53 import io.grpc.ConnectivityState;
54 import io.grpc.ConnectivityStateInfo;
55 import io.grpc.EquivalentAddressGroup;
56 import io.grpc.LoadBalancer.Helper;
57 import io.grpc.LoadBalancer.PickResult;
58 import io.grpc.LoadBalancer.PickSubchannelArgs;
59 import io.grpc.LoadBalancer.Subchannel;
60 import io.grpc.LoadBalancer.SubchannelPicker;
61 import io.grpc.ManagedChannel;
62 import io.grpc.Metadata;
63 import io.grpc.Status;
64 import io.grpc.grpclb.GrpclbState.BackendEntry;
65 import io.grpc.grpclb.GrpclbState.DropEntry;
66 import io.grpc.grpclb.GrpclbState.ErrorEntry;
67 import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
68 import io.grpc.inprocess.InProcessChannelBuilder;
69 import io.grpc.inprocess.InProcessServerBuilder;
70 import io.grpc.internal.BackoffPolicy;
71 import io.grpc.internal.FakeClock;
72 import io.grpc.internal.GrpcAttributes;
73 import io.grpc.internal.ObjectPool;
74 import io.grpc.internal.SerializingExecutor;
75 import io.grpc.internal.TimeProvider;
76 import io.grpc.lb.v1.ClientStats;
77 import io.grpc.lb.v1.ClientStatsPerToken;
78 import io.grpc.lb.v1.InitialLoadBalanceRequest;
79 import io.grpc.lb.v1.InitialLoadBalanceResponse;
80 import io.grpc.lb.v1.LoadBalanceRequest;
81 import io.grpc.lb.v1.LoadBalanceResponse;
82 import io.grpc.lb.v1.LoadBalancerGrpc;
83 import io.grpc.lb.v1.Server;
84 import io.grpc.lb.v1.ServerList;
85 import io.grpc.stub.StreamObserver;
86 import java.net.InetSocketAddress;
87 import java.net.SocketAddress;
88 import java.util.ArrayList;
89 import java.util.Arrays;
90 import java.util.Collections;
91 import java.util.LinkedList;
92 import java.util.List;
93 import java.util.concurrent.ScheduledExecutorService;
94 import java.util.concurrent.TimeUnit;
95 import javax.annotation.Nullable;
96 import org.junit.After;
97 import org.junit.Before;
98 import org.junit.Test;
99 import org.junit.runner.RunWith;
100 import org.junit.runners.JUnit4;
101 import org.mockito.ArgumentCaptor;
102 import org.mockito.Captor;
103 import org.mockito.InOrder;
104 import org.mockito.Mock;
105 import org.mockito.MockitoAnnotations;
106 import org.mockito.invocation.InvocationOnMock;
107 import org.mockito.stubbing.Answer;
108 
109 /** Unit tests for {@link GrpclbLoadBalancer}. */
110 @RunWith(JUnit4.class)
111 public class GrpclbLoadBalancerTest {
112   private static final Attributes.Key<String> RESOLUTION_ATTR =
113       Attributes.Key.create("resolution-attr");
114   private static final String SERVICE_AUTHORITY = "api.google.com";
115   private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
116       new FakeClock.TaskFilter() {
117         @Override
118         public boolean shouldAccept(Runnable command) {
119           return command instanceof GrpclbState.LoadReportingTask;
120         }
121       };
122   private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER =
123       new FakeClock.TaskFilter() {
124         @Override
125         public boolean shouldAccept(Runnable command) {
126           return command instanceof GrpclbState.FallbackModeTask;
127         }
128       };
129   private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
130       new FakeClock.TaskFilter() {
131         @Override
132         public boolean shouldAccept(Runnable command) {
133           return command instanceof GrpclbState.LbRpcRetryTask;
134         }
135       };
136   private static final Attributes LB_BACKEND_ATTRS =
137       Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build();
138 
139   @Mock
140   private Helper helper;
141   @Mock
142   private SubchannelPool subchannelPool;
143   private SubchannelPicker currentPicker;
144   private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
145   @Captor
146   private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
147   private final FakeClock fakeClock = new FakeClock();
148   private final LinkedList<StreamObserver<LoadBalanceRequest>> lbRequestObservers =
149       new LinkedList<StreamObserver<LoadBalanceRequest>>();
150   private final LinkedList<Subchannel> mockSubchannels = new LinkedList<Subchannel>();
151   private final LinkedList<ManagedChannel> fakeOobChannels = new LinkedList<ManagedChannel>();
152   private final ArrayList<Subchannel> subchannelTracker = new ArrayList<>();
153   private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>();
154   private final ArrayList<String> failingLbAuthorities = new ArrayList<>();
155   private final TimeProvider timeProvider = new TimeProvider() {
156       @Override
157       public long currentTimeNanos() {
158         return fakeClock.getTicker().read();
159       }
160     };
161   private io.grpc.Server fakeLbServer;
162   @Captor
163   private ArgumentCaptor<SubchannelPicker> pickerCaptor;
164   private final SerializingExecutor channelExecutor =
165       new SerializingExecutor(MoreExecutors.directExecutor());
166   @Mock
167   private ObjectPool<ScheduledExecutorService> timerServicePool;
168   @Mock
169   private BackoffPolicy.Provider backoffPolicyProvider;
170   @Mock
171   private BackoffPolicy backoffPolicy1;
172   @Mock
173   private BackoffPolicy backoffPolicy2;
174   private GrpclbLoadBalancer balancer;
175 
176   @SuppressWarnings("unchecked")
177   @Before
setUp()178   public void setUp() throws Exception {
179     MockitoAnnotations.initMocks(this);
180     mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo(
181         new LoadBalancerGrpc.LoadBalancerImplBase() {
182           @Override
183           public StreamObserver<LoadBalanceRequest> balanceLoad(
184               final StreamObserver<LoadBalanceResponse> responseObserver) {
185             StreamObserver<LoadBalanceRequest> requestObserver =
186                 mock(StreamObserver.class);
187             Answer<Void> closeRpc = new Answer<Void>() {
188                 @Override
189                 public Void answer(InvocationOnMock invocation) {
190                   responseObserver.onCompleted();
191                   return null;
192                 }
193               };
194             doAnswer(closeRpc).when(requestObserver).onCompleted();
195             lbRequestObservers.add(requestObserver);
196             return requestObserver;
197           }
198         }));
199     fakeLbServer = InProcessServerBuilder.forName("fakeLb")
200         .directExecutor().addService(mockLbService).build().start();
201     doAnswer(new Answer<ManagedChannel>() {
202         @Override
203         public ManagedChannel answer(InvocationOnMock invocation) throws Throwable {
204           String authority = (String) invocation.getArguments()[1];
205           ManagedChannel channel;
206           if (failingLbAuthorities.contains(authority)) {
207             channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor()
208                 .overrideAuthority(authority).build();
209           } else {
210             channel = InProcessChannelBuilder.forName("fakeLb").directExecutor()
211                 .overrideAuthority(authority).build();
212           }
213           fakeOobChannels.add(channel);
214           oobChannelTracker.add(channel);
215           return channel;
216         }
217       }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
218     doAnswer(new Answer<Subchannel>() {
219         @Override
220         public Subchannel answer(InvocationOnMock invocation) throws Throwable {
221           Subchannel subchannel = mock(Subchannel.class);
222           EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
223           Attributes attrs = (Attributes) invocation.getArguments()[1];
224           when(subchannel.getAddresses()).thenReturn(eag);
225           when(subchannel.getAttributes()).thenReturn(attrs);
226           mockSubchannels.add(subchannel);
227           subchannelTracker.add(subchannel);
228           return subchannel;
229         }
230       }).when(subchannelPool).takeOrCreateSubchannel(
231           any(EquivalentAddressGroup.class), any(Attributes.class));
232     doAnswer(new Answer<Void>() {
233         @Override
234         public Void answer(InvocationOnMock invocation) throws Throwable {
235           Runnable task = (Runnable) invocation.getArguments()[0];
236           channelExecutor.execute(task);
237           return null;
238         }
239       }).when(helper).runSerialized(any(Runnable.class));
240     doAnswer(new Answer<Void>() {
241         @Override
242         public Void answer(InvocationOnMock invocation) throws Throwable {
243           currentPicker = (SubchannelPicker) invocation.getArguments()[1];
244           return null;
245         }
246       }).when(helper).updateBalancingState(
247           any(ConnectivityState.class), any(SubchannelPicker.class));
248     when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
249     ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService();
250     when(timerServicePool.getObject()).thenReturn(timerService);
251     when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
252     when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
253     when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
254     balancer = new GrpclbLoadBalancer(
255         helper,
256         subchannelPool,
257         timerServicePool,
258         timeProvider,
259         backoffPolicyProvider);
260     verify(subchannelPool).init(same(helper), same(timerService));
261   }
262 
263   @After
tearDown()264   public void tearDown() {
265     try {
266       if (balancer != null) {
267         channelExecutor.execute(new Runnable() {
268             @Override
269             public void run() {
270               balancer.shutdown();
271             }
272           });
273       }
274       for (ManagedChannel channel : oobChannelTracker) {
275         assertTrue(channel + " is shutdown", channel.isShutdown());
276         // balancer should have closed the LB stream, terminating the OOB channel.
277         assertTrue(channel + " is terminated", channel.isTerminated());
278       }
279       // GRPCLB manages subchannels only through subchannelPool
280       for (Subchannel subchannel: subchannelTracker) {
281         verify(subchannelPool).returnSubchannel(same(subchannel));
282         // Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if
283         // LoadBalancer has called it expectedly.
284         verify(subchannel, never()).shutdown();
285       }
286       verify(helper, never())
287           .createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
288       // No timer should linger after shutdown
289       assertThat(fakeClock.getPendingTasks()).isEmpty();
290     } finally {
291       if (fakeLbServer != null) {
292         fakeLbServer.shutdownNow();
293       }
294     }
295   }
296 
297   @Test
roundRobinPickerNoDrop()298   public void roundRobinPickerNoDrop() {
299     GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
300     Subchannel subchannel = mock(Subchannel.class);
301     BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
302     BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
303 
304     List<BackendEntry> pickList = Arrays.asList(b1, b2);
305     RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);
306 
307     PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
308     Metadata headers1 = new Metadata();
309     // The existing token on the headers will be replaced
310     headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
311     when(args1.getHeaders()).thenReturn(headers1);
312     assertSame(b1.result, picker.pickSubchannel(args1));
313     verify(args1).getHeaders();
314     assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
315 
316     PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
317     Metadata headers2 = new Metadata();
318     when(args2.getHeaders()).thenReturn(headers2);
319     assertSame(b2.result, picker.pickSubchannel(args2));
320     verify(args2).getHeaders();
321     assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
322 
323     PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
324     Metadata headers3 = new Metadata();
325     when(args3.getHeaders()).thenReturn(headers3);
326     assertSame(b1.result, picker.pickSubchannel(args3));
327     verify(args3).getHeaders();
328     assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
329 
330     verify(subchannel, never()).getAttributes();
331   }
332 
333 
334   @Test
roundRobinPickerWithDrop()335   public void roundRobinPickerWithDrop() {
336     assertTrue(DROP_PICK_RESULT.isDrop());
337     GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
338     Subchannel subchannel = mock(Subchannel.class);
339     // 1 out of 2 requests are to be dropped
340     DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003");
341     List<DropEntry> dropList = Arrays.asList(null, d);
342 
343     BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
344     BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
345     List<BackendEntry> pickList = Arrays.asList(b1, b2);
346     RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList);
347 
348     // dropList[0], pickList[0]
349     PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
350     Metadata headers1 = new Metadata();
351     headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
352     when(args1.getHeaders()).thenReturn(headers1);
353     assertSame(b1.result, picker.pickSubchannel(args1));
354     verify(args1).getHeaders();
355     assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
356 
357     // dropList[1]: drop
358     PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
359     Metadata headers2 = new Metadata();
360     when(args2.getHeaders()).thenReturn(headers2);
361     assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2));
362     verify(args2, never()).getHeaders();
363 
364     // dropList[0], pickList[1]
365     PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
366     Metadata headers3 = new Metadata();
367     when(args3.getHeaders()).thenReturn(headers3);
368     assertSame(b2.result, picker.pickSubchannel(args3));
369     verify(args3).getHeaders();
370     assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
371 
372     // dropList[1]: drop
373     PickSubchannelArgs args4 = mock(PickSubchannelArgs.class);
374     Metadata headers4 = new Metadata();
375     when(args4.getHeaders()).thenReturn(headers4);
376     assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4));
377     verify(args4, never()).getHeaders();
378 
379     // dropList[0], pickList[0]
380     PickSubchannelArgs args5 = mock(PickSubchannelArgs.class);
381     Metadata headers5 = new Metadata();
382     when(args5.getHeaders()).thenReturn(headers5);
383     assertSame(b1.result, picker.pickSubchannel(args5));
384     verify(args5).getHeaders();
385     assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
386 
387     verify(subchannel, never()).getAttributes();
388   }
389 
390   @Test
loadReporting()391   public void loadReporting() {
392     Metadata headers = new Metadata();
393     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
394     when(args.getHeaders()).thenReturn(headers);
395 
396     long loadReportIntervalMillis = 1983;
397     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
398     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
399     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
400 
401     // Fallback timer is started as soon as address is resolved.
402     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
403 
404     assertEquals(1, fakeOobChannels.size());
405     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
406     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
407     assertEquals(1, lbRequestObservers.size());
408     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
409     InOrder inOrder = inOrder(lbRequestObserver);
410     InOrder helperInOrder = inOrder(helper, subchannelPool);
411 
412     inOrder.verify(lbRequestObserver).onNext(
413         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
414                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
415             .build()));
416 
417     // Simulate receiving LB response
418     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
419     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
420 
421     // Load reporting task is scheduled
422     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
423     assertEquals(0, fakeClock.runDueTasks());
424 
425     List<ServerEntry> backends = Arrays.asList(
426         new ServerEntry("127.0.0.1", 2000, "token0001"),
427         new ServerEntry("token0001"),  // drop
428         new ServerEntry("127.0.0.1", 2010, "token0002"),
429         new ServerEntry("token0003"));  // drop
430 
431     lbResponseObserver.onNext(buildLbResponse(backends));
432 
433     assertEquals(2, mockSubchannels.size());
434     Subchannel subchannel1 = mockSubchannels.poll();
435     Subchannel subchannel2 = mockSubchannels.poll();
436     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
437     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
438     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
439     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
440 
441     helperInOrder.verify(helper, atLeast(1))
442         .updateBalancingState(eq(READY), pickerCaptor.capture());
443     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
444     assertThat(picker.dropList).containsExactly(
445         null,
446         new DropEntry(getLoadRecorder(), "token0001"),
447         null,
448         new DropEntry(getLoadRecorder(), "token0003")).inOrder();
449     assertThat(picker.pickList).containsExactly(
450         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
451         new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder();
452 
453     // Report, no data
454     assertNextReport(
455         inOrder, lbRequestObserver, loadReportIntervalMillis,
456         ClientStats.newBuilder().build());
457 
458     PickResult pick1 = picker.pickSubchannel(args);
459     assertSame(subchannel1, pick1.getSubchannel());
460     assertSame(getLoadRecorder(), pick1.getStreamTracerFactory());
461 
462     // Merely the pick will not be recorded as upstart.
463     assertNextReport(
464         inOrder, lbRequestObserver, loadReportIntervalMillis,
465         ClientStats.newBuilder().build());
466 
467     ClientStreamTracer tracer1 =
468         pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
469 
470     PickResult pick2 = picker.pickSubchannel(args);
471     assertNull(pick2.getSubchannel());
472     assertSame(DROP_PICK_RESULT, pick2);
473 
474     // Report includes upstart of pick1 and the drop of pick2
475     assertNextReport(
476         inOrder, lbRequestObserver, loadReportIntervalMillis,
477         ClientStats.newBuilder()
478             .setNumCallsStarted(2)
479             .setNumCallsFinished(1)  // pick2
480             .addCallsFinishedWithDrop(
481                 ClientStatsPerToken.newBuilder()
482                     .setLoadBalanceToken("token0001")
483                     .setNumCalls(1)          // pick2
484                     .build())
485             .build());
486 
487     PickResult pick3 = picker.pickSubchannel(args);
488     assertSame(subchannel2, pick3.getSubchannel());
489     assertSame(getLoadRecorder(), pick3.getStreamTracerFactory());
490     ClientStreamTracer tracer3 =
491         pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
492 
493     // pick3 has sent out headers
494     tracer3.outboundHeaders();
495 
496     // 3rd report includes pick3's upstart
497     assertNextReport(
498         inOrder, lbRequestObserver, loadReportIntervalMillis,
499         ClientStats.newBuilder()
500             .setNumCallsStarted(1)
501             .build());
502 
503     PickResult pick4 = picker.pickSubchannel(args);
504     assertNull(pick4.getSubchannel());
505     assertSame(DROP_PICK_RESULT, pick4);
506 
507     // pick1 ended without sending anything
508     tracer1.streamClosed(Status.CANCELLED);
509 
510     // 4th report includes end of pick1 and drop of pick4
511     assertNextReport(
512         inOrder, lbRequestObserver, loadReportIntervalMillis,
513         ClientStats.newBuilder()
514             .setNumCallsStarted(1)  // pick4
515             .setNumCallsFinished(2)
516             .setNumCallsFinishedWithClientFailedToSend(1)   // pick1
517             .addCallsFinishedWithDrop(
518                 ClientStatsPerToken.newBuilder()
519                     .setLoadBalanceToken("token0003")
520                     .setNumCalls(1)   // pick4
521                     .build())
522         .build());
523 
524     PickResult pick5 = picker.pickSubchannel(args);
525     assertSame(subchannel1, pick1.getSubchannel());
526     assertSame(getLoadRecorder(), pick5.getStreamTracerFactory());
527     ClientStreamTracer tracer5 =
528         pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
529 
530     // pick3 ended without receiving response headers
531     tracer3.streamClosed(Status.DEADLINE_EXCEEDED);
532 
533     // pick5 sent and received headers
534     tracer5.outboundHeaders();
535     tracer5.inboundHeaders();
536 
537     // 5th report includes pick3's end and pick5's upstart
538     assertNextReport(
539         inOrder, lbRequestObserver, loadReportIntervalMillis,
540         ClientStats.newBuilder()
541             .setNumCallsStarted(1)  // pick5
542             .setNumCallsFinished(1)  // pick3
543             .build());
544 
545     // pick5 ends
546     tracer5.streamClosed(Status.OK);
547 
548     // 6th report includes pick5's end
549     assertNextReport(
550         inOrder, lbRequestObserver, loadReportIntervalMillis,
551         ClientStats.newBuilder()
552             .setNumCallsFinished(1)
553             .setNumCallsFinishedKnownReceived(1)
554             .build());
555 
556     assertEquals(1, fakeClock.numPendingTasks());
557     // Balancer closes the stream, scheduled reporting task cancelled
558     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
559     assertEquals(0, fakeClock.numPendingTasks());
560 
561     // New stream created
562     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
563     lbResponseObserver = lbResponseObserverCaptor.getValue();
564     assertEquals(1, lbRequestObservers.size());
565     lbRequestObserver = lbRequestObservers.poll();
566     inOrder = inOrder(lbRequestObserver);
567 
568     inOrder.verify(lbRequestObserver).onNext(
569         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
570                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
571             .build()));
572 
573     // Load reporting is also requested
574     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
575 
576     // No picker created because balancer is still using the results from the last stream
577     helperInOrder.verify(helper, never())
578         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
579 
580     // Make a new pick on that picker.  It will not show up on the report of the new stream, because
581     // that picker is associated with the previous stream.
582     PickResult pick6 = picker.pickSubchannel(args);
583     assertNull(pick6.getSubchannel());
584     assertSame(DROP_PICK_RESULT, pick6);
585     assertNextReport(
586         inOrder, lbRequestObserver, loadReportIntervalMillis,
587         ClientStats.newBuilder().build());
588 
589     // New stream got the list update
590     lbResponseObserver.onNext(buildLbResponse(backends));
591 
592     // Same backends, thus no new subchannels
593     helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
594         any(EquivalentAddressGroup.class), any(Attributes.class));
595     // But the new RoundRobinEntries have a new loadRecorder, thus considered different from
596     // the previous list, thus a new picker is created
597     helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
598     picker = (RoundRobinPicker) pickerCaptor.getValue();
599 
600     PickResult pick1p = picker.pickSubchannel(args);
601     assertSame(subchannel1, pick1p.getSubchannel());
602     assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory());
603     pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
604 
605     // The pick from the new stream will be included in the report
606     assertNextReport(
607         inOrder, lbRequestObserver, loadReportIntervalMillis,
608         ClientStats.newBuilder()
609             .setNumCallsStarted(1)
610             .build());
611 
612     verify(args, atLeast(0)).getHeaders();
613     verifyNoMoreInteractions(args);
614   }
615 
616   @Test
abundantInitialResponse()617   public void abundantInitialResponse() {
618     Metadata headers = new Metadata();
619     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
620     when(args.getHeaders()).thenReturn(headers);
621 
622     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
623     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
624     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
625     assertEquals(1, fakeOobChannels.size());
626     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
627     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
628 
629     // Simulate LB initial response
630     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
631     lbResponseObserver.onNext(buildInitialResponse(1983));
632 
633     // Load reporting task is scheduled
634     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
635     FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next();
636     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
637 
638     // Simulate an abundant LB initial response, with a different report interval
639     lbResponseObserver.onNext(buildInitialResponse(9097));
640     // It doesn't affect load-reporting at all
641     assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
642         .containsExactly(scheduledTask);
643     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
644   }
645 
646   @Test
raceBetweenLoadReportingAndLbStreamClosure()647   public void raceBetweenLoadReportingAndLbStreamClosure() {
648     Metadata headers = new Metadata();
649     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
650     when(args.getHeaders()).thenReturn(headers);
651 
652     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
653     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
654     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
655     assertEquals(1, fakeOobChannels.size());
656     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
657     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
658     assertEquals(1, lbRequestObservers.size());
659     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
660     InOrder inOrder = inOrder(lbRequestObserver);
661 
662     inOrder.verify(lbRequestObserver).onNext(
663         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
664                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
665             .build()));
666 
667     // Simulate receiving LB response
668     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
669     lbResponseObserver.onNext(buildInitialResponse(1983));
670 
671     // Load reporting task is scheduled
672     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
673     FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next();
674     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
675 
676     // Close lbStream
677     lbResponseObserver.onCompleted();
678 
679     // Reporting task cancelled
680     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
681 
682     // Simulate a race condition where the task has just started when its cancelled
683     scheduledTask.command.run();
684 
685     // No report sent. No new task scheduled
686     inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class));
687     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
688   }
689 
assertNextReport( InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, long loadReportIntervalMillis, ClientStats expectedReport)690   private void assertNextReport(
691       InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver,
692       long loadReportIntervalMillis, ClientStats expectedReport) {
693     assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS));
694     inOrder.verifyNoMoreInteractions();
695     assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS));
696     assertEquals(1, fakeClock.numPendingTasks());
697     inOrder.verify(lbRequestObserver).onNext(
698         eq(LoadBalanceRequest.newBuilder()
699             .setClientStats(
700                 ClientStats.newBuilder(expectedReport)
701                 .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read()))
702                 .build())
703             .build()));
704   }
705 
706   @Test
acquireAndReleaseScheduledExecutor()707   public void acquireAndReleaseScheduledExecutor() {
708     verify(timerServicePool).getObject();
709     verifyNoMoreInteractions(timerServicePool);
710 
711     balancer.shutdown();
712     verify(timerServicePool).returnObject(same(fakeClock.getScheduledExecutorService()));
713     verifyNoMoreInteractions(timerServicePool);
714   }
715 
716   @Test
nameResolutionFailsThenRecoverToDelegate()717   public void nameResolutionFailsThenRecoverToDelegate() {
718     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
719     deliverNameResolutionError(error);
720     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
721     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
722     assertThat(picker.dropList).isEmpty();
723     assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
724 
725     // Recover with a subsequent success
726     List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);
727 
728     Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
729     deliverResolvedAddresses(resolvedServers, resolutionAttrs);
730   }
731 
732   @Test
nameResolutionFailsThenRecoverToGrpclb()733   public void nameResolutionFailsThenRecoverToGrpclb() {
734     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
735     deliverNameResolutionError(error);
736     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
737     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
738     assertThat(picker.dropList).isEmpty();
739     assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
740 
741     // Recover with a subsequent success
742     List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
743     EquivalentAddressGroup eag = resolvedServers.get(0);
744 
745     Attributes resolutionAttrs = Attributes.EMPTY;
746     deliverResolvedAddresses(resolvedServers, resolutionAttrs);
747 
748     verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
749     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
750   }
751 
752   @Test
grpclbThenNameResolutionFails()753   public void grpclbThenNameResolutionFails() {
754     InOrder inOrder = inOrder(helper, subchannelPool);
755     // Go to GRPCLB first
756     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
757     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
758     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
759 
760     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
761     assertEquals(1, fakeOobChannels.size());
762     ManagedChannel oobChannel = fakeOobChannels.poll();
763     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
764     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
765 
766     // Let name resolution fail before round-robin list is ready
767     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
768     deliverNameResolutionError(error);
769 
770     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
771     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
772     assertThat(picker.dropList).isEmpty();
773     assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
774     assertFalse(oobChannel.isShutdown());
775 
776     // Simulate receiving LB response
777     List<ServerEntry> backends = Arrays.asList(
778         new ServerEntry("127.0.0.1", 2000, "TOKEN1"),
779         new ServerEntry("127.0.0.1", 2010, "TOKEN2"));
780     verify(helper, never()).runSerialized(any(Runnable.class));
781     lbResponseObserver.onNext(buildInitialResponse());
782     lbResponseObserver.onNext(buildLbResponse(backends));
783 
784     verify(helper, times(2)).runSerialized(any(Runnable.class));
785     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
786         eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
787         any(Attributes.class));
788     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
789         eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)),
790         any(Attributes.class));
791   }
792 
793   @Test
grpclbUpdatedAddresses_avoidsReconnect()794   public void grpclbUpdatedAddresses_avoidsReconnect() {
795     List<EquivalentAddressGroup> grpclbResolutionList =
796         createResolvedServerAddresses(true, false);
797     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
798     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
799 
800     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
801     ManagedChannel oobChannel = fakeOobChannels.poll();
802     assertEquals(1, lbRequestObservers.size());
803 
804     List<EquivalentAddressGroup> grpclbResolutionList2 =
805         createResolvedServerAddresses(true, false, true);
806     EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList(
807         grpclbResolutionList2.get(0).getAddresses().get(0),
808         grpclbResolutionList2.get(2).getAddresses().get(0)),
809         lbAttributes(lbAuthority(0)));
810     deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs);
811     verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag));
812     assertEquals(1, lbRequestObservers.size()); // No additional RPC
813   }
814 
815   @Test
grpclbUpdatedAddresses_reconnectOnAuthorityChange()816   public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
817     List<EquivalentAddressGroup> grpclbResolutionList =
818         createResolvedServerAddresses(true, false);
819     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
820     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
821 
822     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
823     ManagedChannel oobChannel = fakeOobChannels.poll();
824     assertEquals(1, lbRequestObservers.size());
825 
826     final String newAuthority = "some-new-authority";
827     List<EquivalentAddressGroup> grpclbResolutionList2 =
828         createResolvedServerAddresses(false);
829     grpclbResolutionList2.add(new EquivalentAddressGroup(
830         new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority)));
831     deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs);
832     assertTrue(oobChannel.isTerminated());
833     verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority));
834     assertEquals(2, lbRequestObservers.size()); // An additional RPC
835   }
836 
837   @Test
grpclbWorking()838   public void grpclbWorking() {
839     InOrder inOrder = inOrder(helper, subchannelPool);
840     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
841     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
842     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
843 
844     // Fallback timer is started as soon as the addresses are resolved.
845     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
846 
847     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
848     assertEquals(1, fakeOobChannels.size());
849     ManagedChannel oobChannel = fakeOobChannels.poll();
850     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
851     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
852     assertEquals(1, lbRequestObservers.size());
853     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
854     verify(lbRequestObserver).onNext(
855         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
856                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
857             .build()));
858 
859     // Simulate receiving LB response
860     List<ServerEntry> backends1 = Arrays.asList(
861         new ServerEntry("127.0.0.1", 2000, "token0001"),
862         new ServerEntry("127.0.0.1", 2010, "token0002"));
863     inOrder.verify(helper, never())
864         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
865     lbResponseObserver.onNext(buildInitialResponse());
866     lbResponseObserver.onNext(buildLbResponse(backends1));
867 
868     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
869         eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
870         any(Attributes.class));
871     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
872         eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
873         any(Attributes.class));
874     assertEquals(2, mockSubchannels.size());
875     Subchannel subchannel1 = mockSubchannels.poll();
876     Subchannel subchannel2 = mockSubchannels.poll();
877     verify(subchannel1).requestConnection();
878     verify(subchannel2).requestConnection();
879     assertEquals(
880         new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS),
881         subchannel1.getAddresses());
882     assertEquals(
883         new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS),
884         subchannel2.getAddresses());
885 
886     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
887     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
888     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
889     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
890     assertThat(picker0.dropList).containsExactly(null, null);
891     assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
892     inOrder.verifyNoMoreInteractions();
893 
894     // Let subchannels be connected
895     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
896     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
897     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
898 
899     assertThat(picker1.dropList).containsExactly(null, null);
900     assertThat(picker1.pickList).containsExactly(
901         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
902 
903     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
904     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
905     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
906     assertThat(picker2.dropList).containsExactly(null, null);
907     assertThat(picker2.pickList).containsExactly(
908         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
909         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
910         .inOrder();
911 
912     // Disconnected subchannels
913     verify(subchannel1).requestConnection();
914     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
915     verify(subchannel1, times(2)).requestConnection();
916     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
917     RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
918     assertThat(picker3.dropList).containsExactly(null, null);
919     assertThat(picker3.pickList).containsExactly(
920         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
921 
922     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
923     inOrder.verifyNoMoreInteractions();
924 
925     // As long as there is at least one READY subchannel, round robin will work.
926     Status error1 = Status.UNAVAILABLE.withDescription("error1");
927     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1));
928     inOrder.verifyNoMoreInteractions();
929 
930     // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
931     verify(subchannel2).requestConnection();
932     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
933     verify(subchannel2, times(2)).requestConnection();
934     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
935     RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
936     assertThat(picker4.dropList).containsExactly(null, null);
937     assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY);
938 
939     // Update backends, with a drop entry
940     List<ServerEntry> backends2 =
941         Arrays.asList(
942             new ServerEntry("127.0.0.1", 2030, "token0003"),  // New address
943             new ServerEntry("token0003"),  // drop
944             new ServerEntry("127.0.0.1", 2010, "token0004"),  // Existing address with token changed
945             new ServerEntry("127.0.0.1", 2030, "token0005"),  // New address appearing second time
946             new ServerEntry("token0006"));  // drop
947     verify(subchannelPool, never()).returnSubchannel(same(subchannel1));
948 
949     lbResponseObserver.onNext(buildLbResponse(backends2));
950     // not in backends2, closed
951     verify(subchannelPool).returnSubchannel(same(subchannel1));
952     // backends2[2], will be kept
953     verify(subchannelPool, never()).returnSubchannel(same(subchannel2));
954 
955     inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
956         eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)),
957         any(Attributes.class));
958     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
959         eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)),
960         any(Attributes.class));
961     assertEquals(1, mockSubchannels.size());
962     Subchannel subchannel3 = mockSubchannels.poll();
963     verify(subchannel3).requestConnection();
964     assertEquals(
965         new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS),
966         subchannel3.getAddresses());
967     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
968     RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
969     assertThat(picker7.dropList).containsExactly(
970         null,
971         new DropEntry(getLoadRecorder(), "token0003"),
972         null,
973         null,
974         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
975     assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY);
976 
977     // State updates on obsolete subchannel1 will have no effect
978     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
979     deliverSubchannelState(
980         subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
981     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
982     inOrder.verifyNoMoreInteractions();
983 
984     deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
985     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
986     RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
987     assertThat(picker8.dropList).containsExactly(
988         null,
989         new DropEntry(getLoadRecorder(), "token0003"),
990         null,
991         null,
992         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
993     // subchannel2 is still IDLE, thus not in the active list
994     assertThat(picker8.pickList).containsExactly(
995         new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
996         new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
997     // subchannel2 becomes READY and makes it into the list
998     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
999     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1000     RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
1001     assertThat(picker9.dropList).containsExactly(
1002         null,
1003         new DropEntry(getLoadRecorder(), "token0003"),
1004         null,
1005         null,
1006         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
1007     assertThat(picker9.pickList).containsExactly(
1008         new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
1009         new BackendEntry(subchannel2, getLoadRecorder(), "token0004"),
1010         new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
1011     verify(subchannelPool, never()).returnSubchannel(same(subchannel3));
1012 
1013     // Update backends, with no entry
1014     lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
1015     verify(subchannelPool).returnSubchannel(same(subchannel2));
1016     verify(subchannelPool).returnSubchannel(same(subchannel3));
1017     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
1018     RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue();
1019     assertThat(picker10.dropList).isEmpty();
1020     assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY);
1021 
1022     assertFalse(oobChannel.isShutdown());
1023     assertEquals(0, lbRequestObservers.size());
1024     verify(lbRequestObserver, never()).onCompleted();
1025     verify(lbRequestObserver, never()).onError(any(Throwable.class));
1026 
1027     // Load reporting was not requested, thus never scheduled
1028     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
1029 
1030     verify(subchannelPool, never()).clear();
1031     balancer.shutdown();
1032     verify(subchannelPool).clear();
1033   }
1034 
1035   @Test
grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires()1036   public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() {
1037     subtestGrpclbFallbackInitialTimeout(false);
1038   }
1039 
1040   @Test
grpclbFallback_initialTimeout_timerExpires()1041   public void grpclbFallback_initialTimeout_timerExpires() {
1042     subtestGrpclbFallbackInitialTimeout(true);
1043   }
1044 
1045   // Fallback or not within the period of the initial timeout.
subtestGrpclbFallbackInitialTimeout(boolean timerExpires)1046   private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
1047     long loadReportIntervalMillis = 1983;
1048     InOrder inOrder = inOrder(helper, subchannelPool);
1049 
1050     // Create a resolution list with a mixture of balancer and backend addresses
1051     List<EquivalentAddressGroup> resolutionList =
1052         createResolvedServerAddresses(false, true, false);
1053     Attributes resolutionAttrs = Attributes.EMPTY;
1054     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1055 
1056     inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
1057 
1058     // Attempted to connect to balancer
1059     assertEquals(1, fakeOobChannels.size());
1060     ManagedChannel oobChannel = fakeOobChannels.poll();
1061     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1062     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1063     assertEquals(1, lbRequestObservers.size());
1064     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1065 
1066     verify(lbRequestObserver).onNext(
1067         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1068                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1069             .build()));
1070     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1071     // We don't care if runSerialized() has been run.
1072     inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
1073     inOrder.verifyNoMoreInteractions();
1074 
1075     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1076     fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
1077     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1078 
1079     //////////////////////////////////
1080     // Fallback timer expires (or not)
1081     //////////////////////////////////
1082     if (timerExpires) {
1083       fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
1084       assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1085       // Fall back to the backends from resolver
1086       fallbackTestVerifyUseOfFallbackBackendLists(
1087           inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
1088 
1089       assertFalse(oobChannel.isShutdown());
1090       verify(lbRequestObserver, never()).onCompleted();
1091     }
1092 
1093     ////////////////////////////////////////////////////////
1094     // Name resolver sends new list without any backend addr
1095     ////////////////////////////////////////////////////////
1096     resolutionList = createResolvedServerAddresses(true, true);
1097     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1098 
1099     // New addresses are updated to the OobChannel
1100     inOrder.verify(helper).updateOobChannelAddresses(
1101         same(oobChannel),
1102         eq(new EquivalentAddressGroup(
1103                 Arrays.asList(
1104                     resolutionList.get(0).getAddresses().get(0),
1105                     resolutionList.get(1).getAddresses().get(0)),
1106                 lbAttributes(lbAuthority(0)))));
1107 
1108     if (timerExpires) {
1109       // Still in fallback logic, except that the backend list is empty
1110       fallbackTestVerifyUseOfFallbackBackendLists(
1111           inOrder, Collections.<EquivalentAddressGroup>emptyList());
1112     }
1113 
1114     //////////////////////////////////////////////////
1115     // Name resolver sends new list with backend addrs
1116     //////////////////////////////////////////////////
1117     resolutionList = createResolvedServerAddresses(true, false, false);
1118     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1119 
1120     // New LB address is updated to the OobChannel
1121     inOrder.verify(helper).updateOobChannelAddresses(
1122         same(oobChannel),
1123         eq(resolutionList.get(0)));
1124 
1125     if (timerExpires) {
1126       // New backend addresses are used for fallback
1127       fallbackTestVerifyUseOfFallbackBackendLists(
1128           inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
1129     }
1130 
1131     ////////////////////////////////////////////////
1132     // Break the LB stream after the timer expires
1133     ////////////////////////////////////////////////
1134     if (timerExpires) {
1135       Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
1136       lbResponseObserver.onError(streamError.asException());
1137 
1138       // The error will NOT propagate to picker because fallback list is in use.
1139       inOrder.verify(helper, never())
1140           .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1141       // A new stream is created
1142       verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
1143       lbResponseObserver = lbResponseObserverCaptor.getValue();
1144       assertEquals(1, lbRequestObservers.size());
1145       lbRequestObserver = lbRequestObservers.poll();
1146       verify(lbRequestObserver).onNext(
1147           eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1148                   InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1149               .build()));
1150     }
1151 
1152     /////////////////////////////////
1153     // Balancer returns a server list
1154     /////////////////////////////////
1155     List<ServerEntry> serverList = Arrays.asList(
1156         new ServerEntry("127.0.0.1", 2000, "token0001"),
1157         new ServerEntry("127.0.0.1", 2010, "token0002"));
1158     lbResponseObserver.onNext(buildInitialResponse());
1159     lbResponseObserver.onNext(buildLbResponse(serverList));
1160 
1161     // Balancer-provided server list now in effect
1162     fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1163 
1164     ///////////////////////////////////////////////////////////////
1165     // New backend addresses from resolver outside of fallback mode
1166     ///////////////////////////////////////////////////////////////
1167     resolutionList = createResolvedServerAddresses(true, false);
1168     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1169     // Will not affect the round robin list at all
1170     inOrder.verify(helper, never())
1171         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1172 
1173     // No fallback timeout timer scheduled.
1174     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1175   }
1176 
1177   @Test
grpclbFallback_breakLbStreamBeforeFallbackTimerExpires()1178   public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
1179     long loadReportIntervalMillis = 1983;
1180     InOrder inOrder = inOrder(helper, subchannelPool);
1181 
1182     // Create a resolution list with a mixture of balancer and backend addresses
1183     List<EquivalentAddressGroup> resolutionList =
1184         createResolvedServerAddresses(false, true, false);
1185     Attributes resolutionAttrs = Attributes.EMPTY;
1186     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1187 
1188     inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
1189 
1190     // Attempted to connect to balancer
1191     assertEquals(1, fakeOobChannels.size());
1192     ManagedChannel oobChannel = fakeOobChannels.poll();
1193     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1194     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1195     assertEquals(1, lbRequestObservers.size());
1196     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1197 
1198     verify(lbRequestObserver).onNext(
1199         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1200                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1201             .build()));
1202     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1203     // We don't care if runSerialized() has been run.
1204     inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
1205 
1206     inOrder.verifyNoMoreInteractions();
1207 
1208     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1209 
1210     /////////////////////////////////////////////
1211     // Break the LB stream before timer expires
1212     /////////////////////////////////////////////
1213     Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
1214     lbResponseObserver.onError(streamError.asException());
1215 
1216     // Fall back to the backends from resolver
1217     fallbackTestVerifyUseOfFallbackBackendLists(
1218         inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
1219 
1220     // A new stream is created
1221     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
1222     lbResponseObserver = lbResponseObserverCaptor.getValue();
1223     assertEquals(1, lbRequestObservers.size());
1224     lbRequestObserver = lbRequestObservers.poll();
1225     verify(lbRequestObserver).onNext(
1226         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1227                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1228             .build()));
1229   }
1230 
1231   @Test
grpclbFallback_balancerLost()1232   public void grpclbFallback_balancerLost() {
1233     subtestGrpclbFallbackConnectionLost(true, false);
1234   }
1235 
1236   @Test
grpclbFallback_subchannelsLost()1237   public void grpclbFallback_subchannelsLost() {
1238     subtestGrpclbFallbackConnectionLost(false, true);
1239   }
1240 
1241   @Test
grpclbFallback_allLost()1242   public void grpclbFallback_allLost() {
1243     subtestGrpclbFallbackConnectionLost(true, true);
1244   }
1245 
1246   // Fallback outside of the initial timeout, where all connections are lost.
subtestGrpclbFallbackConnectionLost( boolean balancerBroken, boolean allSubchannelsBroken)1247   private void subtestGrpclbFallbackConnectionLost(
1248       boolean balancerBroken, boolean allSubchannelsBroken) {
1249     long loadReportIntervalMillis = 1983;
1250     InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);
1251 
1252     // Create a resolution list with a mixture of balancer and backend addresses
1253     List<EquivalentAddressGroup> resolutionList =
1254         createResolvedServerAddresses(false, true, false);
1255     Attributes resolutionAttrs = Attributes.EMPTY;
1256     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1257 
1258     inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
1259 
1260     // Attempted to connect to balancer
1261     assertEquals(1, fakeOobChannels.size());
1262     fakeOobChannels.poll();
1263     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1264     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1265     assertEquals(1, lbRequestObservers.size());
1266     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1267 
1268     verify(lbRequestObserver).onNext(
1269         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1270                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1271             .build()));
1272     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1273     // We don't care if runSerialized() has been run.
1274     inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
1275     inOrder.verifyNoMoreInteractions();
1276 
1277     // Balancer returns a server list
1278     List<ServerEntry> serverList = Arrays.asList(
1279         new ServerEntry("127.0.0.1", 2000, "token0001"),
1280         new ServerEntry("127.0.0.1", 2010, "token0002"));
1281     lbResponseObserver.onNext(buildInitialResponse());
1282     lbResponseObserver.onNext(buildLbResponse(serverList));
1283 
1284     List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1285 
1286     // Break connections
1287     if (balancerBroken) {
1288       lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1289       // A new stream to LB is created
1290       inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1291       lbResponseObserver = lbResponseObserverCaptor.getValue();
1292       assertEquals(1, lbRequestObservers.size());
1293       lbRequestObserver = lbRequestObservers.poll();
1294     }
1295     if (allSubchannelsBroken) {
1296       for (Subchannel subchannel : subchannels) {
1297         // A READY subchannel transits to IDLE when receiving a go-away
1298         deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1299       }
1300     }
1301 
1302     if (balancerBroken && allSubchannelsBroken) {
1303       // Going into fallback
1304       subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
1305           inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
1306 
1307       // When in fallback mode, fallback timer should not be scheduled when all backend
1308       // connections are lost
1309       for (Subchannel subchannel : subchannels) {
1310         deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1311       }
1312 
1313       // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
1314       List<ServerEntry> serverList2 = Arrays.asList(
1315           new ServerEntry("127.0.0.1", 2001, "token0003"),
1316           new ServerEntry("127.0.0.1", 2011, "token0004"));
1317       lbResponseObserver.onNext(buildInitialResponse());
1318       lbResponseObserver.onNext(buildLbResponse(serverList2));
1319 
1320       fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2);
1321     }
1322     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1323 
1324     if (!(balancerBroken && allSubchannelsBroken)) {
1325       verify(subchannelPool, never()).takeOrCreateSubchannel(
1326           eq(resolutionList.get(0)), any(Attributes.class));
1327       verify(subchannelPool, never()).takeOrCreateSubchannel(
1328           eq(resolutionList.get(2)), any(Attributes.class));
1329     }
1330   }
1331 
fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs)1332   private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
1333       InOrder inOrder, List<EquivalentAddressGroup> addrs) {
1334     return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null);
1335   }
1336 
fallbackTestVerifyUseOfBalancerBackendLists( InOrder inOrder, List<ServerEntry> servers)1337   private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
1338       InOrder inOrder, List<ServerEntry> servers) {
1339     ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>();
1340     ArrayList<String> tokens = new ArrayList<>();
1341     for (ServerEntry server : servers) {
1342       addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS));
1343       tokens.add(server.token);
1344     }
1345     return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens);
1346   }
1347 
fallbackTestVerifyUseOfBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs, @Nullable List<String> tokens)1348   private List<Subchannel> fallbackTestVerifyUseOfBackendLists(
1349       InOrder inOrder, List<EquivalentAddressGroup> addrs,
1350       @Nullable List<String> tokens) {
1351     if (tokens != null) {
1352       assertEquals(addrs.size(), tokens.size());
1353     }
1354     for (EquivalentAddressGroup addr : addrs) {
1355       inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class));
1356     }
1357     RoundRobinPicker picker = (RoundRobinPicker) currentPicker;
1358     assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
1359     assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY);
1360     assertEquals(addrs.size(), mockSubchannels.size());
1361     ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels);
1362     mockSubchannels.clear();
1363     for (Subchannel subchannel : subchannels) {
1364       deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
1365     }
1366     inOrder.verify(helper, atLeast(0))
1367         .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
1368     inOrder.verify(helper, never())
1369         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1370 
1371     ArrayList<BackendEntry> pickList = new ArrayList<>();
1372     for (int i = 0; i < addrs.size(); i++) {
1373       Subchannel subchannel = subchannels.get(i);
1374       BackendEntry backend;
1375       if (tokens == null) {
1376         backend = new BackendEntry(subchannel);
1377       } else {
1378         backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i));
1379       }
1380       pickList.add(backend);
1381       deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
1382       inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1383       picker = (RoundRobinPicker) pickerCaptor.getValue();
1384       assertThat(picker.dropList)
1385           .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
1386       assertThat(picker.pickList).containsExactlyElementsIn(pickList);
1387       inOrder.verify(helper, never())
1388           .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1389     }
1390     return subchannels;
1391   }
1392 
1393   @Test
grpclbMultipleAuthorities()1394   public void grpclbMultipleAuthorities() throws Exception {
1395     List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList(
1396         new EquivalentAddressGroup(
1397             new FakeSocketAddress("fake-address-1"),
1398             lbAttributes("fake-authority-1")),
1399         new EquivalentAddressGroup(
1400             new FakeSocketAddress("fake-address-2"),
1401             lbAttributes("fake-authority-2")),
1402         new EquivalentAddressGroup(
1403             new FakeSocketAddress("not-a-lb-address")),
1404         new EquivalentAddressGroup(
1405             new FakeSocketAddress("fake-address-3"),
1406             lbAttributes("fake-authority-1")));
1407     final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup(
1408         Arrays.<SocketAddress>asList(
1409             new FakeSocketAddress("fake-address-1"),
1410             new FakeSocketAddress("fake-address-3")),
1411         lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day
1412 
1413     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
1414     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
1415 
1416     verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
1417   }
1418 
1419   @Test
grpclbBalancerStreamRetry()1420   public void grpclbBalancerStreamRetry() throws Exception {
1421     LoadBalanceRequest expectedInitialRequest =
1422         LoadBalanceRequest.newBuilder()
1423             .setInitialRequest(
1424                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1425             .build();
1426     InOrder inOrder =
1427         inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
1428     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
1429     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
1430     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
1431 
1432     assertEquals(1, fakeOobChannels.size());
1433     @SuppressWarnings("unused")
1434     ManagedChannel oobChannel = fakeOobChannels.poll();
1435 
1436     // First balancer RPC
1437     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1438     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1439     assertEquals(1, lbRequestObservers.size());
1440     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1441     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1442     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1443 
1444     // Balancer closes it immediately (erroneously)
1445     lbResponseObserver.onCompleted();
1446     // Will start backoff sequence 1 (10ns)
1447     inOrder.verify(backoffPolicyProvider).get();
1448     inOrder.verify(backoffPolicy1).nextBackoffNanos();
1449     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1450 
1451     // Fast-forward to a moment before the retry
1452     fakeClock.forwardNanos(9);
1453     verifyNoMoreInteractions(mockLbService);
1454     // Then time for retry
1455     fakeClock.forwardNanos(1);
1456     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1457     lbResponseObserver = lbResponseObserverCaptor.getValue();
1458     assertEquals(1, lbRequestObservers.size());
1459     lbRequestObserver = lbRequestObservers.poll();
1460     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1461     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1462 
1463     // Balancer closes it with an error.
1464     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1465     // Will continue the backoff sequence 1 (100ns)
1466     verifyNoMoreInteractions(backoffPolicyProvider);
1467     inOrder.verify(backoffPolicy1).nextBackoffNanos();
1468     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1469 
1470     // Fast-forward to a moment before the retry
1471     fakeClock.forwardNanos(100 - 1);
1472     verifyNoMoreInteractions(mockLbService);
1473     // Then time for retry
1474     fakeClock.forwardNanos(1);
1475     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1476     lbResponseObserver = lbResponseObserverCaptor.getValue();
1477     assertEquals(1, lbRequestObservers.size());
1478     lbRequestObserver = lbRequestObservers.poll();
1479     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1480     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1481 
1482     // Balancer sends initial response.
1483     lbResponseObserver.onNext(buildInitialResponse());
1484 
1485     // Then breaks the RPC
1486     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1487 
1488     // Will reset the retry sequence and retry immediately, because balancer has responded.
1489     inOrder.verify(backoffPolicyProvider).get();
1490     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1491     lbResponseObserver = lbResponseObserverCaptor.getValue();
1492     assertEquals(1, lbRequestObservers.size());
1493     lbRequestObserver = lbRequestObservers.poll();
1494     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1495 
1496     // Fail the retry after spending 4ns
1497     fakeClock.forwardNanos(4);
1498     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1499 
1500     // Will be on the first retry (10ns) of backoff sequence 2.
1501     inOrder.verify(backoffPolicy2).nextBackoffNanos();
1502     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1503 
1504     // Fast-forward to a moment before the retry, the time spent in the last try is deducted.
1505     fakeClock.forwardNanos(10 - 4 - 1);
1506     verifyNoMoreInteractions(mockLbService);
1507     // Then time for retry
1508     fakeClock.forwardNanos(1);
1509     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1510     assertEquals(1, lbRequestObservers.size());
1511     lbRequestObserver = lbRequestObservers.poll();
1512     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1513     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1514 
1515     // Wrapping up
1516     verify(backoffPolicyProvider, times(2)).get();
1517     verify(backoffPolicy1, times(2)).nextBackoffNanos();
1518     verify(backoffPolicy2, times(1)).nextBackoffNanos();
1519   }
1520 
deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState)1521   private void deliverSubchannelState(
1522       final Subchannel subchannel, final ConnectivityStateInfo newState) {
1523     channelExecutor.execute(new Runnable() {
1524         @Override
1525         public void run() {
1526           balancer.handleSubchannelState(subchannel, newState);
1527         }
1528       });
1529   }
1530 
deliverNameResolutionError(final Status error)1531   private void deliverNameResolutionError(final Status error) {
1532     channelExecutor.execute(new Runnable() {
1533         @Override
1534         public void run() {
1535           balancer.handleNameResolutionError(error);
1536         }
1537       });
1538   }
1539 
deliverResolvedAddresses( final List<EquivalentAddressGroup> addrs, final Attributes attrs)1540   private void deliverResolvedAddresses(
1541       final List<EquivalentAddressGroup> addrs, final Attributes attrs) {
1542     channelExecutor.execute(new Runnable() {
1543         @Override
1544         public void run() {
1545           balancer.handleResolvedAddressGroups(addrs, attrs);
1546         }
1547       });
1548   }
1549 
getLoadRecorder()1550   private GrpclbClientLoadRecorder getLoadRecorder() {
1551     return balancer.getGrpclbState().getLoadRecorder();
1552   }
1553 
createResolvedServerAddresses(boolean ... isLb)1554   private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) {
1555     ArrayList<EquivalentAddressGroup> list = new ArrayList<>();
1556     for (int i = 0; i < isLb.length; i++) {
1557       SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
1558       EquivalentAddressGroup eag =
1559           new EquivalentAddressGroup(
1560               addr,
1561               isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY);
1562       list.add(eag);
1563     }
1564     return list;
1565   }
1566 
lbAuthority(int unused)1567   private static String lbAuthority(int unused) {
1568     // TODO(ejona): Support varying authorities
1569     return "lb.google.com";
1570   }
1571 
lbAttributes(String authority)1572   private static Attributes lbAttributes(String authority) {
1573     return Attributes.newBuilder()
1574         .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
1575         .build();
1576   }
1577 
buildInitialResponse()1578   private static LoadBalanceResponse buildInitialResponse() {
1579     return buildInitialResponse(0);
1580   }
1581 
buildInitialResponse(long loadReportIntervalMillis)1582   private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) {
1583     return LoadBalanceResponse.newBuilder()
1584         .setInitialResponse(
1585             InitialLoadBalanceResponse.newBuilder()
1586             .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis)))
1587         .build();
1588   }
1589 
buildLbResponse(List<ServerEntry> servers)1590   private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) {
1591     ServerList.Builder serverListBuilder = ServerList.newBuilder();
1592     for (ServerEntry server : servers) {
1593       if (server.addr != null) {
1594         serverListBuilder.addServers(Server.newBuilder()
1595             .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress()))
1596             .setPort(server.addr.getPort())
1597             .setLoadBalanceToken(server.token)
1598             .build());
1599       } else {
1600         serverListBuilder.addServers(Server.newBuilder()
1601             .setDrop(true)
1602             .setLoadBalanceToken(server.token)
1603             .build());
1604       }
1605     }
1606     return LoadBalanceResponse.newBuilder()
1607         .setServerList(serverListBuilder.build())
1608         .build();
1609   }
1610 
1611   private static class ServerEntry {
1612     final InetSocketAddress addr;
1613     final String token;
1614 
ServerEntry(String host, int port, String token)1615     ServerEntry(String host, int port, String token) {
1616       this.addr = new InetSocketAddress(host, port);
1617       this.token = token;
1618     }
1619 
1620     // Drop entry
ServerEntry(String token)1621     ServerEntry(String token) {
1622       this.addr = null;
1623       this.token = token;
1624     }
1625   }
1626 }
1627