• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds.orca;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.truth.Truth.assertThat;
22 import static io.grpc.ConnectivityState.CONNECTING;
23 import static io.grpc.ConnectivityState.IDLE;
24 import static io.grpc.ConnectivityState.READY;
25 import static io.grpc.ConnectivityState.SHUTDOWN;
26 import static org.junit.Assert.fail;
27 import static org.mockito.AdditionalAnswers.delegatesTo;
28 import static org.mockito.ArgumentMatchers.argThat;
29 import static org.mockito.ArgumentMatchers.eq;
30 import static org.mockito.Mockito.atLeast;
31 import static org.mockito.Mockito.inOrder;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.verifyNoInteractions;
36 import static org.mockito.Mockito.verifyNoMoreInteractions;
37 import static org.mockito.Mockito.when;
38 
39 import com.github.xds.data.orca.v3.OrcaLoadReport;
40 import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
41 import com.github.xds.service.orca.v3.OrcaLoadReportRequest;
42 import com.google.common.util.concurrent.MoreExecutors;
43 import com.google.protobuf.util.Durations;
44 import io.grpc.Attributes;
45 import io.grpc.Channel;
46 import io.grpc.ChannelLogger;
47 import io.grpc.ConnectivityState;
48 import io.grpc.ConnectivityStateInfo;
49 import io.grpc.Context;
50 import io.grpc.Context.CancellationListener;
51 import io.grpc.EquivalentAddressGroup;
52 import io.grpc.LoadBalancer;
53 import io.grpc.LoadBalancer.CreateSubchannelArgs;
54 import io.grpc.LoadBalancer.Helper;
55 import io.grpc.LoadBalancer.Subchannel;
56 import io.grpc.LoadBalancer.SubchannelPicker;
57 import io.grpc.LoadBalancer.SubchannelStateListener;
58 import io.grpc.ManagedChannel;
59 import io.grpc.Status;
60 import io.grpc.SynchronizationContext;
61 import io.grpc.inprocess.InProcessChannelBuilder;
62 import io.grpc.inprocess.InProcessServerBuilder;
63 import io.grpc.internal.BackoffPolicy;
64 import io.grpc.internal.FakeClock;
65 import io.grpc.services.MetricReport;
66 import io.grpc.stub.StreamObserver;
67 import io.grpc.testing.GrpcCleanupRule;
68 import io.grpc.util.ForwardingLoadBalancerHelper;
69 import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
70 import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig;
71 import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl;
72 import java.net.SocketAddress;
73 import java.text.MessageFormat;
74 import java.util.ArrayDeque;
75 import java.util.ArrayList;
76 import java.util.Arrays;
77 import java.util.List;
78 import java.util.Queue;
79 import java.util.Random;
80 import java.util.concurrent.ScheduledExecutorService;
81 import java.util.concurrent.TimeUnit;
82 import java.util.concurrent.atomic.AtomicReference;
83 import org.junit.After;
84 import org.junit.Before;
85 import org.junit.Rule;
86 import org.junit.Test;
87 import org.junit.runner.RunWith;
88 import org.junit.runners.JUnit4;
89 import org.mockito.ArgumentCaptor;
90 import org.mockito.InOrder;
91 import org.mockito.Mock;
92 import org.mockito.junit.MockitoJUnit;
93 import org.mockito.junit.MockitoRule;
94 
95 /**
96  * Unit tests for {@link OrcaOobUtil} class.
97  */
98 @RunWith(JUnit4.class)
99 public class OrcaOobUtilTest {
100 
101   private static final int NUM_SUBCHANNELS = 2;
102   private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
103       Attributes.Key.create("subchannel-attr-for-test");
104   private static final OrcaReportingConfig SHORT_INTERVAL_CONFIG =
105       OrcaReportingConfig.newBuilder().setReportInterval(5L, TimeUnit.NANOSECONDS).build();
106   private static final OrcaReportingConfig MEDIUM_INTERVAL_CONFIG =
107       OrcaReportingConfig.newBuilder().setReportInterval(543L, TimeUnit.MICROSECONDS).build();
108   private static final OrcaReportingConfig LONG_INTERVAL_CONFIG =
109       OrcaReportingConfig.newBuilder().setReportInterval(1232L, TimeUnit.MILLISECONDS).build();
110   @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
111   @Rule public final MockitoRule mocks = MockitoJUnit.rule();
112 
113   @SuppressWarnings({"rawtypes", "unchecked"})
114   private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS];
115   private final SubchannelStateListener[] mockStateListeners =
116       new SubchannelStateListener[NUM_SUBCHANNELS];
117   private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS];
118   private final OpenRcaServiceImp[] orcaServiceImps = new OpenRcaServiceImp[NUM_SUBCHANNELS];
119   private final SynchronizationContext syncContext = new SynchronizationContext(
120       new Thread.UncaughtExceptionHandler() {
121         @Override
122         public void uncaughtException(Thread t, Throwable e) {
123           throw new AssertionError(e);
124         }
125       });
126 
127   private final FakeClock fakeClock = new FakeClock();
128   private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper()));
129   @Mock
130   private OrcaOobReportListener mockOrcaListener0;
131   @Mock
132   private OrcaOobReportListener mockOrcaListener1;
133   @Mock
134   private OrcaOobReportListener mockOrcaListener2;
135   @Mock private BackoffPolicy.Provider backoffPolicyProvider;
136   @Mock private BackoffPolicy backoffPolicy1;
137   @Mock private BackoffPolicy backoffPolicy2;
138   private FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS];
139   private LoadBalancer.Helper orcaHelper;
140   private LoadBalancer.Helper parentHelper;
141   private LoadBalancer.Helper childHelper;
142   private Subchannel savedParentSubchannel;
143 
unwrap(Subchannel s)144   private static FakeSubchannel unwrap(Subchannel s) {
145     return (FakeSubchannel) ((SubchannelImpl) s).delegate();
146   }
147 
buildOrcaRequestFromConfig( OrcaReportingConfig config)148   private static OrcaLoadReportRequest buildOrcaRequestFromConfig(
149       OrcaReportingConfig config) {
150     return OrcaLoadReportRequest.newBuilder()
151         .setReportInterval(Durations.fromNanos(config.getReportIntervalNanos()))
152         .build();
153   }
154 
assertLog(List<String> logs, String expectedLog)155   private static void assertLog(List<String> logs, String expectedLog) {
156     assertThat(logs).containsExactly(expectedLog);
157     logs.clear();
158   }
159 
160   @After
tearDown()161   public void tearDown() {
162     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
163       if (subchannels[i] != null) {
164         subchannels[i].shutdown();
165       }
166     }
167   }
168 
169   @Test
orcaReportingConfig_construct()170   public void orcaReportingConfig_construct() {
171     int interval = new Random().nextInt(Integer.MAX_VALUE);
172     OrcaReportingConfig config =
173         OrcaReportingConfig.newBuilder()
174             .setReportInterval(interval, TimeUnit.MICROSECONDS)
175             .build();
176     assertThat(config.getReportIntervalNanos()).isEqualTo(TimeUnit.MICROSECONDS.toNanos(interval));
177     String str = config.toString();
178     assertThat(str).contains("reportIntervalNanos=");
179     OrcaReportingConfig rebuildedConfig = config.toBuilder().build();
180     assertThat(rebuildedConfig.getReportIntervalNanos())
181         .isEqualTo(TimeUnit.MICROSECONDS.toNanos(interval));
182   }
183 
184   @Before
setUp()185   public void setUp() throws Exception {
186     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
187       orcaServiceImps[i] = new OpenRcaServiceImp();
188       cleanupRule.register(
189           InProcessServerBuilder.forName("orca-reporting-test-" + i)
190               .addService(orcaServiceImps[i])
191               .directExecutor()
192               .build()
193               .start());
194       ManagedChannel channel =
195           cleanupRule.register(
196               InProcessChannelBuilder.forName("orca-reporting-test-" + i).directExecutor().build());
197       channels[i] = channel;
198       EquivalentAddressGroup eag =
199           new EquivalentAddressGroup(new FakeSocketAddress("address-" + i));
200       List<EquivalentAddressGroup> eagList = Arrays.asList(eag);
201       eagLists[i] = eagList;
202       mockStateListeners[i] = mock(SubchannelStateListener.class);
203     }
204 
205     when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
206     when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L);
207     when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L);
208 
209     orcaHelper =
210         OrcaOobUtil.newOrcaReportingHelper(
211             origHelper,
212             backoffPolicyProvider,
213             fakeClock.getStopwatchSupplier());
214     parentHelper =
215         new ForwardingLoadBalancerHelper() {
216           @Override
217           protected Helper delegate() {
218             return orcaHelper;
219           }
220 
221           @Override
222           public Subchannel createSubchannel(CreateSubchannelArgs args) {
223             Subchannel subchannel = super.createSubchannel(args);
224             savedParentSubchannel = subchannel;
225             return subchannel;
226           }
227         };
228     childHelper =
229         OrcaOobUtil.newOrcaReportingHelper(
230             parentHelper,
231             backoffPolicyProvider,
232             fakeClock.getStopwatchSupplier());
233   }
234 
235   @Test
singlePolicyTypicalWorkflow()236   public void singlePolicyTypicalWorkflow() {
237     verify(origHelper, atLeast(0)).getSynchronizationContext();
238     verifyNoMoreInteractions(origHelper);
239 
240     // Calling createSubchannel() on orcaHelper correctly passes augmented CreateSubchannelArgs
241     // to origHelper.
242     ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor =
243         ArgumentCaptor.forClass(CreateSubchannelArgs.class);
244     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
245       String subchannelAttrValue = "eag attr " + i;
246       Attributes attrs =
247           Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
248       Subchannel created = createSubchannel(orcaHelper, i, attrs);
249       assertThat(unwrap(created)).isSameInstanceAs(subchannels[i]);
250       setOrcaReportConfig(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
251       verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
252       assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
253       assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
254           .isEqualTo(subchannelAttrValue);
255     }
256 
257     // ORCA reporting does not start until underlying Subchannel is READY.
258     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
259       FakeSubchannel subchannel = subchannels[i];
260       OpenRcaServiceImp orcaServiceImp = orcaServiceImps[i];
261       SubchannelStateListener mockStateListener = mockStateListeners[i];
262       InOrder inOrder = inOrder(mockStateListener);
263       deliverSubchannelState(i, ConnectivityStateInfo.forNonError(IDLE));
264       deliverSubchannelState(i, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
265       deliverSubchannelState(i, ConnectivityStateInfo.forNonError(CONNECTING));
266 
267       inOrder.verify(mockStateListener)
268           .onSubchannelState(eq(ConnectivityStateInfo.forNonError(IDLE)));
269       inOrder.verify(mockStateListener)
270           .onSubchannelState(eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)));
271       inOrder.verify(mockStateListener)
272           .onSubchannelState(eq(ConnectivityStateInfo.forNonError(CONNECTING)));
273       verifyNoMoreInteractions(mockStateListener);
274 
275       assertThat(subchannel.logs).isEmpty();
276       assertThat(orcaServiceImp.calls).isEmpty();
277       verifyNoMoreInteractions(mockOrcaListener0);
278       deliverSubchannelState(i, ConnectivityStateInfo.forNonError(READY));
279       verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
280       assertThat(orcaServiceImp.calls).hasSize(1);
281       ServerSideCall serverCall = orcaServiceImp.calls.peek();
282       assertThat(serverCall.request).isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
283       assertLog(subchannel.logs,
284           "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
285 
286       // Simulate an ORCA service response. Registered listener will receive an ORCA report for
287       // each backend.
288       OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
289       serverCall.responseObserver.onNext(report);
290       assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
291       verify(mockOrcaListener0, times(i + 1)).onLoadReport(
292           argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher(
293               OrcaPerRequestUtil.fromOrcaLoadReport(report))));
294     }
295 
296     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
297       FakeSubchannel subchannel = subchannels[i];
298       SubchannelStateListener mockStateListener = mockStateListeners[i];
299 
300       ServerSideCall serverCall = orcaServiceImps[i].calls.peek();
301       assertThat(serverCall.cancelled).isFalse();
302       verifyNoMoreInteractions(mockStateListener);
303 
304       // Shutting down the subchannel will cancel the ORCA reporting RPC.
305       subchannel.shutdown();
306       verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
307       assertThat(serverCall.cancelled).isTrue();
308       assertThat(subchannel.logs).isEmpty();
309       verifyNoMoreInteractions(mockOrcaListener0);
310     }
311 
312     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
313       assertThat(orcaServiceImps[i].calls).hasSize(1);
314     }
315 
316     verifyNoInteractions(backoffPolicyProvider);
317   }
318 
319   @Test
twoLevelPoliciesTypicalWorkflow()320   public void twoLevelPoliciesTypicalWorkflow() {
321     verify(origHelper, atLeast(0)).getSynchronizationContext();
322     verifyNoMoreInteractions(origHelper);
323 
324     // Calling createSubchannel() on child helper correctly passes augmented CreateSubchannelArgs
325     // to origHelper.
326     ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor =
327         ArgumentCaptor.forClass(CreateSubchannelArgs.class);
328     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
329       String subchannelAttrValue = "eag attr " + i;
330       Attributes attrs =
331           Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
332       Subchannel created = createSubchannel(childHelper, i, attrs);
333       assertThat(unwrap(((SubchannelImpl) created).delegate())).isSameInstanceAs(subchannels[i]);
334       OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
335       verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
336       assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
337       assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
338           .isEqualTo(subchannelAttrValue);
339     }
340 
341     // ORCA reporting does not start until underlying Subchannel is READY.
342     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
343       FakeSubchannel subchannel = subchannels[i];
344       OpenRcaServiceImp orcaServiceImp = orcaServiceImps[i];
345       SubchannelStateListener mockStateListener = mockStateListeners[i];
346       InOrder inOrder = inOrder(mockStateListener);
347       deliverSubchannelState(i, ConnectivityStateInfo.forNonError(IDLE));
348       deliverSubchannelState(i, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
349       deliverSubchannelState(i, ConnectivityStateInfo.forNonError(CONNECTING));
350 
351       inOrder
352           .verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(IDLE)));
353       inOrder
354           .verify(mockStateListener)
355           .onSubchannelState(eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)));
356       inOrder
357           .verify(mockStateListener)
358           .onSubchannelState(eq(ConnectivityStateInfo.forNonError(CONNECTING)));
359       verifyNoMoreInteractions(mockStateListener);
360 
361       assertThat(subchannel.logs).isEmpty();
362       assertThat(orcaServiceImp.calls).isEmpty();
363       verifyNoMoreInteractions(mockOrcaListener1);
364       verifyNoMoreInteractions(mockOrcaListener2);
365       deliverSubchannelState(i, ConnectivityStateInfo.forNonError(READY));
366       verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
367       assertThat(orcaServiceImp.calls).hasSize(1);
368       ServerSideCall serverCall = orcaServiceImp.calls.peek();
369       assertThat(serverCall.request).isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
370       assertLog(subchannel.logs,
371           "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
372 
373       // Simulate an ORCA service response. Registered listener will receive an ORCA report for
374       // each backend.
375       OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
376       serverCall.responseObserver.onNext(report);
377       assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
378       verify(mockOrcaListener1, times(i + 1)).onLoadReport(
379           argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher(
380               OrcaPerRequestUtil.fromOrcaLoadReport(report))));
381     }
382 
383     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
384       FakeSubchannel subchannel = subchannels[i];
385       SubchannelStateListener mockStateListener = mockStateListeners[i];
386 
387       ServerSideCall serverCall = orcaServiceImps[i].calls.peek();
388       assertThat(serverCall.cancelled).isFalse();
389       verifyNoMoreInteractions(mockStateListener);
390 
391       // Shutting down the subchannel will cancel the ORCA reporting RPC.
392       subchannel.shutdown();
393       verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
394       assertThat(serverCall.cancelled).isTrue();
395       assertThat(subchannel.logs).isEmpty();
396       verifyNoMoreInteractions(mockOrcaListener1, mockOrcaListener2);
397     }
398 
399     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
400       assertThat(orcaServiceImps[i].calls).hasSize(1);
401     }
402 
403     verifyNoInteractions(backoffPolicyProvider);
404   }
405 
406   @Test
orcReportingDisabledWhenServiceNotImplemented()407   public void orcReportingDisabledWhenServiceNotImplemented() {
408     final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
409     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
410     FakeSubchannel subchannel = subchannels[0];
411     OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0];
412     SubchannelStateListener mockStateListener = mockStateListeners[0];
413     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
414     verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
415     assertThat(orcaServiceImp.calls).hasSize(1);
416 
417     ServerSideCall serverCall = orcaServiceImp.calls.poll();
418     assertThat(serverCall.request).isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
419     subchannel.logs.clear();
420     serverCall.responseObserver.onError(Status.UNIMPLEMENTED.asException());
421     assertLog(subchannel.logs,
422         "ERROR: OpenRcaService disabled: " + Status.UNIMPLEMENTED);
423     verifyNoMoreInteractions(mockOrcaListener0);
424 
425     // Re-connecting on Subchannel will reset the "disabled" flag and restart ORCA reporting.
426     assertThat(orcaServiceImp.calls).hasSize(0);
427     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(IDLE));
428     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
429     assertLog(subchannel.logs,
430         "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
431     assertThat(orcaServiceImp.calls).hasSize(1);
432     serverCall = orcaServiceImp.calls.poll();
433     OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
434     serverCall.responseObserver.onNext(report);
435     assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
436     verify(mockOrcaListener0).onLoadReport(
437         argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher(
438             OrcaPerRequestUtil.fromOrcaLoadReport(report))));
439     verifyNoInteractions(backoffPolicyProvider);
440   }
441 
442   @Test
orcaReportingStreamClosedAndRetried()443   public void orcaReportingStreamClosedAndRetried() {
444     final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
445     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
446     FakeSubchannel subchannel = subchannels[0];
447     OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0];
448     SubchannelStateListener mockStateListener = mockStateListeners[0];
449     InOrder inOrder = inOrder(mockStateListener, mockOrcaListener0, backoffPolicyProvider,
450         backoffPolicy1, backoffPolicy2);
451 
452     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
453     inOrder
454         .verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
455     assertLog(subchannel.logs,
456         "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
457 
458     // Server closes the ORCA reporting RPC without any response, will start backoff
459     // sequence 1 (11ns).
460     orcaServiceImp.calls.poll().responseObserver.onCompleted();
461     assertLog(subchannel.logs,
462         "DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 11" + " ns");
463     inOrder.verify(backoffPolicyProvider).get();
464     inOrder.verify(backoffPolicy1).nextBackoffNanos();
465     verifyRetryAfterNanos(inOrder, orcaServiceImp, 11);
466     assertLog(subchannel.logs,
467         "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
468 
469     // Server closes the ORCA reporting RPC with an error, will continue backoff sequence 1 (21ns).
470     orcaServiceImp.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException());
471     assertLog(subchannel.logs,
472         "DEBUG: ORCA reporting stream closed with " + Status.UNAVAILABLE + ", backoff in 21"
473             + " ns");
474     inOrder.verify(backoffPolicy1).nextBackoffNanos();
475     verifyRetryAfterNanos(inOrder, orcaServiceImp, 21);
476     assertLog(subchannel.logs,
477         "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
478 
479     // Server responds normally.
480     OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
481     orcaServiceImp.calls.peek().responseObserver.onNext(report);
482     assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
483     inOrder.verify(mockOrcaListener0).onLoadReport(
484         argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher(
485             OrcaPerRequestUtil.fromOrcaLoadReport(report))));
486     // Server closes the ORCA reporting RPC after a response, will restart immediately.
487     orcaServiceImp.calls.poll().responseObserver.onCompleted();
488     assertThat(subchannel.logs).containsExactly(
489         "DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 0" + " ns",
490         "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
491     subchannel.logs.clear();
492 
493     // Backoff policy is set to sequence 2 in previous retry.
494     // Server closes the ORCA reporting RPC with an error, will start backoff sequence 2 (12ns).
495     orcaServiceImp.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException());
496     assertLog(subchannel.logs,
497         "DEBUG: ORCA reporting stream closed with " + Status.UNAVAILABLE + ", backoff in 12"
498             + " ns");
499     inOrder.verify(backoffPolicyProvider).get();
500     inOrder.verify(backoffPolicy2).nextBackoffNanos();
501     verifyRetryAfterNanos(inOrder, orcaServiceImp, 12);
502     assertLog(subchannel.logs,
503         "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses());
504 
505     verifyNoMoreInteractions(mockStateListener, mockOrcaListener0, backoffPolicyProvider,
506         backoffPolicy1, backoffPolicy2);
507   }
508 
509   @Test
reportingNotStartedUntilConfigured()510   public void reportingNotStartedUntilConfigured() {
511     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
512     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
513     verify(mockStateListeners[0])
514         .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
515 
516     assertThat(orcaServiceImps[0].calls).isEmpty();
517     assertThat(subchannels[0].logs).isEmpty();
518     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
519     assertThat(orcaServiceImps[0].calls).hasSize(1);
520     assertLog(subchannels[0].logs,
521         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
522     assertThat(orcaServiceImps[0].calls.peek().request)
523         .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
524   }
525 
526   @Test
updateListenerThrows()527   public void updateListenerThrows() {
528     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
529     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
530     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
531     verify(mockStateListeners[0])
532         .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
533 
534     assertThat(orcaServiceImps[0].calls).hasSize(1);
535     assertLog(subchannels[0].logs,
536         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
537     assertThat(orcaServiceImps[0].calls.peek().request)
538         .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
539     assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]);
540     try {
541       OrcaOobUtil.setListener(subchannels[0], mockOrcaListener1, MEDIUM_INTERVAL_CONFIG);
542       fail("Update orca listener on non-orca subchannel should fail");
543     } catch (IllegalArgumentException ex) {
544       assertThat(ex.getMessage()).isEqualTo("Subchannel does not have orca Out-Of-Band "
545           + "stream enabled. Try to use a subchannel created by OrcaOobUtil.OrcaHelper.");
546     }
547   }
548 
549   @Test
removeListener()550   public void removeListener() {
551     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
552     OrcaOobUtil.setListener(created, null, SHORT_INTERVAL_CONFIG);
553     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
554     verify(mockStateListeners[0])
555             .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
556 
557     assertThat(orcaServiceImps[0].calls).isEmpty();
558     assertThat(subchannels[0].logs).isEmpty();
559     assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]);
560 
561     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
562     assertThat(orcaServiceImps[0].calls).hasSize(1);
563     assertLog(subchannels[0].logs,
564             "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
565     assertThat(orcaServiceImps[0].calls.peek().request)
566             .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
567 
568     OrcaOobUtil.setListener(created, null, null);
569     assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
570     assertThat(orcaServiceImps[0].calls).isEmpty();
571     assertThat(subchannels[0].logs).isEmpty();
572     assertThat(fakeClock.getPendingTasks()).isEmpty();
573     verifyNoMoreInteractions(mockOrcaListener0);
574     verifyNoInteractions(backoffPolicyProvider);
575   }
576 
577   @Test
updateReportingIntervalBeforeCreatingSubchannel()578   public void updateReportingIntervalBeforeCreatingSubchannel() {
579     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
580     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
581     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
582     verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
583 
584     assertThat(orcaServiceImps[0].calls).hasSize(1);
585     assertLog(subchannels[0].logs,
586         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
587     assertThat(orcaServiceImps[0].calls.poll().request)
588         .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
589   }
590 
591   @Test
updateReportingIntervalBeforeSubchannelReady()592   public void updateReportingIntervalBeforeSubchannelReady() {
593     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
594     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
595     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
596     verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
597 
598     assertThat(orcaServiceImps[0].calls).hasSize(1);
599     assertLog(subchannels[0].logs,
600         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
601     assertThat(orcaServiceImps[0].calls.poll().request)
602         .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
603   }
604 
605   @Test
updateReportingIntervalWhenRpcActive()606   public void updateReportingIntervalWhenRpcActive() {
607     // Sets report interval before creating a Subchannel, reporting starts right after suchannel
608     // state becomes READY.
609     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
610     OrcaOobUtil.setListener(created, mockOrcaListener0,
611         MEDIUM_INTERVAL_CONFIG);
612     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
613     verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
614 
615     assertThat(orcaServiceImps[0].calls).hasSize(1);
616     assertLog(subchannels[0].logs,
617         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
618     assertThat(orcaServiceImps[0].calls.peek().request)
619         .isEqualTo(buildOrcaRequestFromConfig(MEDIUM_INTERVAL_CONFIG));
620 
621     // Make reporting less frequent.
622     OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
623     assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
624     assertThat(orcaServiceImps[0].calls).hasSize(1);
625     assertLog(subchannels[0].logs,
626         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
627     assertThat(orcaServiceImps[0].calls.peek().request)
628         .isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG));
629 
630     // Configuring with the same report interval again does not restart ORCA RPC.
631     OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
632     assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
633     assertThat(subchannels[0].logs).isEmpty();
634 
635     // Make reporting more frequent.
636     OrcaOobUtil.setListener(created, mockOrcaListener0,
637         SHORT_INTERVAL_CONFIG);
638     assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
639     assertThat(orcaServiceImps[0].calls).hasSize(1);
640     assertLog(subchannels[0].logs,
641         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
642     assertThat(orcaServiceImps[0].calls.poll().request)
643         .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
644   }
645 
646   @Test
updateReportingIntervalWhenRpcPendingRetry()647   public void updateReportingIntervalWhenRpcPendingRetry() {
648     Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
649     OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
650     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
651     verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
652 
653     assertThat(orcaServiceImps[0].calls).hasSize(1);
654     assertLog(subchannels[0].logs,
655         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
656     assertThat(orcaServiceImps[0].calls.peek().request)
657         .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
658 
659     // Server closes the RPC without response, client will retry with backoff.
660     assertThat(fakeClock.getPendingTasks()).isEmpty();
661     orcaServiceImps[0].calls.poll().responseObserver.onCompleted();
662     assertLog(subchannels[0].logs,
663         "DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 11"
664             + " ns");
665     assertThat(fakeClock.getPendingTasks()).hasSize(1);
666     assertThat(orcaServiceImps[0].calls).isEmpty();
667 
668     // Make reporting less frequent.
669     OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
670     // Retry task will be canceled and restarts new RPC immediately.
671     assertThat(fakeClock.getPendingTasks()).isEmpty();
672     assertThat(orcaServiceImps[0].calls).hasSize(1);
673     assertLog(subchannels[0].logs,
674         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
675     assertThat(orcaServiceImps[0].calls.peek().request)
676         .isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG));
677   }
678 
679   @Test
policiesReceiveSameReportIndependently()680   public void policiesReceiveSameReportIndependently() {
681     Subchannel childSubchannel = createSubchannel(childHelper, 0, Attributes.EMPTY);
682     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
683 
684     // No helper sets ORCA reporting interval, so load reporting is not started.
685     verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
686     assertThat(orcaServiceImps[0].calls).isEmpty();
687     assertThat(subchannels[0].logs).isEmpty();
688 
689     // Parent helper requests ORCA reports with a certain interval, load reporting starts.
690     OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
691     assertThat(orcaServiceImps[0].calls).hasSize(1);
692     assertLog(subchannels[0].logs,
693         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
694 
695     OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
696     assertThat(orcaServiceImps[0].calls).hasSize(1);
697     orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
698     assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
699     // Only parent helper's listener receives the report.
700     ArgumentCaptor<MetricReport> parentReportCaptor = ArgumentCaptor.forClass(MetricReport.class);
701     verify(mockOrcaListener1).onLoadReport(parentReportCaptor.capture());
702     assertThat(OrcaPerRequestUtilTest.reportEqual(parentReportCaptor.getValue(),
703         OrcaPerRequestUtil.fromOrcaLoadReport(report))).isTrue();
704     verifyNoMoreInteractions(mockOrcaListener2);
705 
706     // Now child helper also wants to receive reports.
707     OrcaOobUtil.setListener(childSubchannel, mockOrcaListener2, SHORT_INTERVAL_CONFIG);
708     orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
709     assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
710     // Both helper receives the same report instance.
711     ArgumentCaptor<MetricReport> childReportCaptor = ArgumentCaptor.forClass(MetricReport.class);
712     verify(mockOrcaListener1, times(2))
713         .onLoadReport(parentReportCaptor.capture());
714     verify(mockOrcaListener2)
715         .onLoadReport(childReportCaptor.capture());
716     assertThat(childReportCaptor.getValue()).isSameInstanceAs(parentReportCaptor.getValue());
717   }
718 
719   @Test
reportWithMostFrequentIntervalRequested()720   public void reportWithMostFrequentIntervalRequested() {
721     Subchannel created = createSubchannel(childHelper, 0, Attributes.EMPTY);
722     OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
723     OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
724     deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
725     verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
726     assertThat(orcaServiceImps[0].calls).hasSize(1);
727     assertLog(subchannels[0].logs,
728         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
729 
730     // The real report interval to be requested is the minimum of intervals requested by helpers.
731     assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval()))
732         .isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos());
733 
734     // Parent helper wants reporting to be more frequent than its current setting while it is still
735     // less frequent than parent helper. Nothing should happen on existing RPC.
736     OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener0, MEDIUM_INTERVAL_CONFIG);
737     assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
738     assertThat(subchannels[0].logs).isEmpty();
739 
740     // Parent helper wants reporting to be less frequent.
741     OrcaOobUtil.setListener(created, mockOrcaListener1, MEDIUM_INTERVAL_CONFIG);
742     assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
743     assertThat(orcaServiceImps[0].calls).hasSize(1);
744     assertLog(subchannels[0].logs,
745         "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
746     // ORCA reporting RPC restarts and the the real report interval is adjusted.
747     assertThat(Durations.toNanos(orcaServiceImps[0].calls.poll().request.getReportInterval()))
748         .isEqualTo(MEDIUM_INTERVAL_CONFIG.getReportIntervalNanos());
749   }
750 
verifyRetryAfterNanos(InOrder inOrder, OpenRcaServiceImp orcaServiceImp, long nanos)751   private void verifyRetryAfterNanos(InOrder inOrder, OpenRcaServiceImp orcaServiceImp,
752       long nanos) {
753     assertThat(fakeClock.getPendingTasks()).hasSize(1);
754     assertThat(orcaServiceImp.calls).isEmpty();
755     fakeClock.forwardNanos(nanos - 1);
756     assertThat(orcaServiceImp.calls).isEmpty();
757     inOrder.verifyNoMoreInteractions();
758     fakeClock.forwardNanos(1);
759     assertThat(orcaServiceImp.calls).hasSize(1);
760     assertThat(fakeClock.getPendingTasks()).isEmpty();
761   }
762 
deliverSubchannelState(final int index, final ConnectivityStateInfo newState)763   private void deliverSubchannelState(final int index, final ConnectivityStateInfo newState) {
764     syncContext.execute(
765         new Runnable() {
766           @Override
767           public void run() {
768             subchannels[index].stateListener.onSubchannelState(newState);
769           }
770         });
771   }
772 
createSubchannel(final Helper helper, final int index, final Attributes attrs)773   private Subchannel createSubchannel(final Helper helper, final int index,
774       final Attributes attrs) {
775     final AtomicReference<Subchannel> newSubchannel = new AtomicReference<>();
776     syncContext.execute(
777         new Runnable() {
778           @Override
779           public void run() {
780             Subchannel s =
781                 helper.createSubchannel(
782                     CreateSubchannelArgs.newBuilder()
783                         .setAddresses(eagLists[index])
784                         .setAttributes(attrs)
785                         .build());
786             s.start(mockStateListeners[index]);
787             newSubchannel.set(s);
788           }
789         });
790     return newSubchannel.get();
791   }
792 
setOrcaReportConfig( final Subchannel subchannel, final OrcaOobReportListener listener, final OrcaReportingConfig config)793   private void setOrcaReportConfig(
794       final Subchannel subchannel,
795       final OrcaOobReportListener listener,
796       final OrcaReportingConfig config) {
797     OrcaOobUtil.setListener(subchannel, listener, config);
798   }
799 
800   private static final class OpenRcaServiceImp extends OpenRcaServiceGrpc.OpenRcaServiceImplBase {
801     final Queue<ServerSideCall> calls = new ArrayDeque<>();
802 
803     @Override
streamCoreMetrics( OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver)804     public void streamCoreMetrics(
805         OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) {
806       final ServerSideCall call = new ServerSideCall(request, responseObserver);
807       Context.current()
808           .addListener(
809               new CancellationListener() {
810                 @Override
811                 public void cancelled(Context ctx) {
812                   call.cancelled = true;
813                 }
814               },
815               MoreExecutors.directExecutor());
816       calls.add(call);
817     }
818   }
819 
820   private static final class ServerSideCall {
821     final OrcaLoadReportRequest request;
822     final StreamObserver<OrcaLoadReport> responseObserver;
823     boolean cancelled;
824 
ServerSideCall(OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver)825     ServerSideCall(OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) {
826       this.request = request;
827       this.responseObserver = responseObserver;
828     }
829   }
830 
831   private static final class FakeSocketAddress extends SocketAddress {
832     final String name;
833 
FakeSocketAddress(String name)834     FakeSocketAddress(String name) {
835       this.name = name;
836     }
837 
838     @Override
toString()839     public String toString() {
840       return name;
841     }
842   }
843 
844   private final class FakeSubchannel extends Subchannel {
845     final List<EquivalentAddressGroup> eagList;
846     final Attributes attrs;
847     final Channel channel;
848     final List<String> logs = new ArrayList<>();
849     final int index;
850     SubchannelStateListener stateListener;
851     private final ChannelLogger logger =
852         new ChannelLogger() {
853           @Override
854           public void log(ChannelLogLevel level, String msg) {
855             logs.add(level + ": " + msg);
856           }
857 
858           @Override
859           public void log(ChannelLogLevel level, String template, Object... args) {
860             log(level, MessageFormat.format(template, args));
861           }
862         };
863 
FakeSubchannel(int index, CreateSubchannelArgs args, Channel channel)864     FakeSubchannel(int index, CreateSubchannelArgs args, Channel channel) {
865       this.index = index;
866       this.eagList = args.getAddresses();
867       this.attrs = args.getAttributes();
868       this.channel = checkNotNull(channel);
869     }
870 
871     @Override
start(SubchannelStateListener listener)872     public void start(SubchannelStateListener listener) {
873       checkState(this.stateListener == null);
874       this.stateListener = listener;
875     }
876 
877     @Override
shutdown()878     public void shutdown() {
879       deliverSubchannelState(index, ConnectivityStateInfo.forNonError(SHUTDOWN));
880     }
881 
882     @Override
requestConnection()883     public void requestConnection() {
884       throw new AssertionError("Should not be called");
885     }
886 
887     @Override
getAllAddresses()888     public List<EquivalentAddressGroup> getAllAddresses() {
889       return eagList;
890     }
891 
892     @Override
getAttributes()893     public Attributes getAttributes() {
894       return attrs;
895     }
896 
897     @Override
asChannel()898     public Channel asChannel() {
899       return channel;
900     }
901 
902     @Override
getChannelLogger()903     public ChannelLogger getChannelLogger() {
904       return logger;
905     }
906   }
907 
908   private final class FakeHelper extends Helper {
909     @Override
createSubchannel(CreateSubchannelArgs args)910     public Subchannel createSubchannel(CreateSubchannelArgs args) {
911       int index = -1;
912       for (int i = 0; i < NUM_SUBCHANNELS; i++) {
913         if (eagLists[i].equals(args.getAddresses())) {
914           index = i;
915           break;
916         }
917       }
918       checkState(index >= 0, "addrs " + args.getAddresses() + " not found");
919       FakeSubchannel subchannel = new FakeSubchannel(index, args, channels[index]);
920       checkState(subchannels[index] == null, "subchannels[" + index + "] already created");
921       subchannels[index] = subchannel;
922       return subchannel;
923     }
924 
925     @Override
updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker)926     public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
927       throw new AssertionError("Should not be called");
928     }
929 
930     @Override
getSynchronizationContext()931     public SynchronizationContext getSynchronizationContext() {
932       return syncContext;
933     }
934 
935     @Override
getScheduledExecutorService()936     public ScheduledExecutorService getScheduledExecutorService() {
937       return fakeClock.getScheduledExecutorService();
938     }
939 
940     @Override
getAuthority()941     public String getAuthority() {
942       throw new AssertionError("Should not be called");
943     }
944 
945     @Override
createOobChannel(EquivalentAddressGroup eag, String authority)946     public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
947       throw new AssertionError("Should not be called");
948     }
949   }
950 }
951