1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds.orca; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static com.google.common.truth.Truth.assertThat; 22 import static io.grpc.ConnectivityState.CONNECTING; 23 import static io.grpc.ConnectivityState.IDLE; 24 import static io.grpc.ConnectivityState.READY; 25 import static io.grpc.ConnectivityState.SHUTDOWN; 26 import static org.junit.Assert.fail; 27 import static org.mockito.AdditionalAnswers.delegatesTo; 28 import static org.mockito.ArgumentMatchers.argThat; 29 import static org.mockito.ArgumentMatchers.eq; 30 import static org.mockito.Mockito.atLeast; 31 import static org.mockito.Mockito.inOrder; 32 import static org.mockito.Mockito.mock; 33 import static org.mockito.Mockito.times; 34 import static org.mockito.Mockito.verify; 35 import static org.mockito.Mockito.verifyNoInteractions; 36 import static org.mockito.Mockito.verifyNoMoreInteractions; 37 import static org.mockito.Mockito.when; 38 39 import com.github.xds.data.orca.v3.OrcaLoadReport; 40 import com.github.xds.service.orca.v3.OpenRcaServiceGrpc; 41 import com.github.xds.service.orca.v3.OrcaLoadReportRequest; 42 import com.google.common.util.concurrent.MoreExecutors; 43 import com.google.protobuf.util.Durations; 44 import io.grpc.Attributes; 45 import io.grpc.Channel; 46 
import io.grpc.ChannelLogger; 47 import io.grpc.ConnectivityState; 48 import io.grpc.ConnectivityStateInfo; 49 import io.grpc.Context; 50 import io.grpc.Context.CancellationListener; 51 import io.grpc.EquivalentAddressGroup; 52 import io.grpc.LoadBalancer; 53 import io.grpc.LoadBalancer.CreateSubchannelArgs; 54 import io.grpc.LoadBalancer.Helper; 55 import io.grpc.LoadBalancer.Subchannel; 56 import io.grpc.LoadBalancer.SubchannelPicker; 57 import io.grpc.LoadBalancer.SubchannelStateListener; 58 import io.grpc.ManagedChannel; 59 import io.grpc.Status; 60 import io.grpc.SynchronizationContext; 61 import io.grpc.inprocess.InProcessChannelBuilder; 62 import io.grpc.inprocess.InProcessServerBuilder; 63 import io.grpc.internal.BackoffPolicy; 64 import io.grpc.internal.FakeClock; 65 import io.grpc.services.MetricReport; 66 import io.grpc.stub.StreamObserver; 67 import io.grpc.testing.GrpcCleanupRule; 68 import io.grpc.util.ForwardingLoadBalancerHelper; 69 import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener; 70 import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig; 71 import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl; 72 import java.net.SocketAddress; 73 import java.text.MessageFormat; 74 import java.util.ArrayDeque; 75 import java.util.ArrayList; 76 import java.util.Arrays; 77 import java.util.List; 78 import java.util.Queue; 79 import java.util.Random; 80 import java.util.concurrent.ScheduledExecutorService; 81 import java.util.concurrent.TimeUnit; 82 import java.util.concurrent.atomic.AtomicReference; 83 import org.junit.After; 84 import org.junit.Before; 85 import org.junit.Rule; 86 import org.junit.Test; 87 import org.junit.runner.RunWith; 88 import org.junit.runners.JUnit4; 89 import org.mockito.ArgumentCaptor; 90 import org.mockito.InOrder; 91 import org.mockito.Mock; 92 import org.mockito.junit.MockitoJUnit; 93 import org.mockito.junit.MockitoRule; 94 95 /** 96 * Unit tests for {@link OrcaOobUtil} class. 
97 */ 98 @RunWith(JUnit4.class) 99 public class OrcaOobUtilTest { 100 101 private static final int NUM_SUBCHANNELS = 2; 102 private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY = 103 Attributes.Key.create("subchannel-attr-for-test"); 104 private static final OrcaReportingConfig SHORT_INTERVAL_CONFIG = 105 OrcaReportingConfig.newBuilder().setReportInterval(5L, TimeUnit.NANOSECONDS).build(); 106 private static final OrcaReportingConfig MEDIUM_INTERVAL_CONFIG = 107 OrcaReportingConfig.newBuilder().setReportInterval(543L, TimeUnit.MICROSECONDS).build(); 108 private static final OrcaReportingConfig LONG_INTERVAL_CONFIG = 109 OrcaReportingConfig.newBuilder().setReportInterval(1232L, TimeUnit.MILLISECONDS).build(); 110 @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); 111 @Rule public final MockitoRule mocks = MockitoJUnit.rule(); 112 113 @SuppressWarnings({"rawtypes", "unchecked"}) 114 private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS]; 115 private final SubchannelStateListener[] mockStateListeners = 116 new SubchannelStateListener[NUM_SUBCHANNELS]; 117 private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS]; 118 private final OpenRcaServiceImp[] orcaServiceImps = new OpenRcaServiceImp[NUM_SUBCHANNELS]; 119 private final SynchronizationContext syncContext = new SynchronizationContext( 120 new Thread.UncaughtExceptionHandler() { 121 @Override 122 public void uncaughtException(Thread t, Throwable e) { 123 throw new AssertionError(e); 124 } 125 }); 126 127 private final FakeClock fakeClock = new FakeClock(); 128 private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper())); 129 @Mock 130 private OrcaOobReportListener mockOrcaListener0; 131 @Mock 132 private OrcaOobReportListener mockOrcaListener1; 133 @Mock 134 private OrcaOobReportListener mockOrcaListener2; 135 @Mock private BackoffPolicy.Provider backoffPolicyProvider; 136 @Mock private BackoffPolicy 
backoffPolicy1; 137 @Mock private BackoffPolicy backoffPolicy2; 138 private FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS]; 139 private LoadBalancer.Helper orcaHelper; 140 private LoadBalancer.Helper parentHelper; 141 private LoadBalancer.Helper childHelper; 142 private Subchannel savedParentSubchannel; 143 unwrap(Subchannel s)144 /** Returns the FakeSubchannel that the given SubchannelImpl delegates to. */ private static FakeSubchannel unwrap(Subchannel s) { 145 return (FakeSubchannel) ((SubchannelImpl) s).delegate(); 146 } 147 buildOrcaRequestFromConfig( OrcaReportingConfig config)148 /** Builds the OrcaLoadReportRequest whose report interval is the config's interval in nanoseconds. */ private static OrcaLoadReportRequest buildOrcaRequestFromConfig( 149 OrcaReportingConfig config) { 150 return OrcaLoadReportRequest.newBuilder() 151 .setReportInterval(Durations.fromNanos(config.getReportIntervalNanos())) 152 .build(); 153 } 154 assertLog(List<String> logs, String expectedLog)155 /** Asserts that logs holds exactly expectedLog, then clears logs so the next check starts empty. */ private static void assertLog(List<String> logs, String expectedLog) { 156 assertThat(logs).containsExactly(expectedLog); 157 logs.clear(); 158 } 159 160 @After tearDown()161 /** Shuts down every non-null entry of the subchannels array. */ public void tearDown() { 162 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 163 if (subchannels[i] != null) { 164 subchannels[i].shutdown(); 165 } 166 } 167 } 168 169 @Test orcaReportingConfig_construct()170 /** Checks the interval reported by a built config, the field label in toString(), and a toBuilder() round trip, using a random interval in [0, Integer.MAX_VALUE) microseconds. */ public void orcaReportingConfig_construct() { 171 int interval = new Random().nextInt(Integer.MAX_VALUE); 172 OrcaReportingConfig config = 173 OrcaReportingConfig.newBuilder() 174 .setReportInterval(interval, TimeUnit.MICROSECONDS) 175 .build(); 176 assertThat(config.getReportIntervalNanos()).isEqualTo(TimeUnit.MICROSECONDS.toNanos(interval)); 177 String str = config.toString(); 178 assertThat(str).contains("reportIntervalNanos="); 179 OrcaReportingConfig rebuildedConfig = config.toBuilder().build(); 180 assertThat(rebuildedConfig.getReportIntervalNanos()) 181 .isEqualTo(TimeUnit.MICROSECONDS.toNanos(interval)); 182 } 183 184 @Before setUp()185 /** Creates the per-subchannel in-process servers, channels, address groups and state listeners, stubs the backoff mocks, and builds the ORCA reporting helpers. */ public void setUp() throws Exception { 186 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 187 orcaServiceImps[i] = new 
OpenRcaServiceImp(); 188 cleanupRule.register( 189 InProcessServerBuilder.forName("orca-reporting-test-" + i) 190 .addService(orcaServiceImps[i]) 191 .directExecutor() 192 .build() 193 .start()); 194 ManagedChannel channel = 195 cleanupRule.register( 196 InProcessChannelBuilder.forName("orca-reporting-test-" + i).directExecutor().build()); 197 channels[i] = channel; 198 EquivalentAddressGroup eag = 199 new EquivalentAddressGroup(new FakeSocketAddress("address-" + i)); 200 List<EquivalentAddressGroup> eagList = Arrays.asList(eag); 201 eagLists[i] = eagList; 202 mockStateListeners[i] = mock(SubchannelStateListener.class); 203 } 204 205 when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); 206 when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L); 207 when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L); 208 209 orcaHelper = 210 OrcaOobUtil.newOrcaReportingHelper( 211 origHelper, 212 backoffPolicyProvider, 213 fakeClock.getStopwatchSupplier()); 214 parentHelper = 215 new ForwardingLoadBalancerHelper() { 216 @Override 217 protected Helper delegate() { 218 return orcaHelper; 219 } 220 221 @Override 222 public Subchannel createSubchannel(CreateSubchannelArgs args) { 223 Subchannel subchannel = super.createSubchannel(args); 224 savedParentSubchannel = subchannel; 225 return subchannel; 226 } 227 }; 228 childHelper = 229 OrcaOobUtil.newOrcaReportingHelper( 230 parentHelper, 231 backoffPolicyProvider, 232 fakeClock.getStopwatchSupplier()); 233 } 234 235 @Test singlePolicyTypicalWorkflow()236 public void singlePolicyTypicalWorkflow() { 237 verify(origHelper, atLeast(0)).getSynchronizationContext(); 238 verifyNoMoreInteractions(origHelper); 239 240 // Calling createSubchannel() on orcaHelper correctly passes augmented CreateSubchannelArgs 241 // to origHelper. 
242 ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor = 243 ArgumentCaptor.forClass(CreateSubchannelArgs.class); 244 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 245 String subchannelAttrValue = "eag attr " + i; 246 Attributes attrs = 247 Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); 248 Subchannel created = createSubchannel(orcaHelper, i, attrs); 249 assertThat(unwrap(created)).isSameInstanceAs(subchannels[i]); 250 setOrcaReportConfig(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 251 verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); 252 assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); 253 assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) 254 .isEqualTo(subchannelAttrValue); 255 } 256 257 // ORCA reporting does not start until underlying Subchannel is READY. 258 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 259 FakeSubchannel subchannel = subchannels[i]; 260 OpenRcaServiceImp orcaServiceImp = orcaServiceImps[i]; 261 SubchannelStateListener mockStateListener = mockStateListeners[i]; 262 InOrder inOrder = inOrder(mockStateListener); 263 deliverSubchannelState(i, ConnectivityStateInfo.forNonError(IDLE)); 264 deliverSubchannelState(i, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); 265 deliverSubchannelState(i, ConnectivityStateInfo.forNonError(CONNECTING)); 266 267 inOrder.verify(mockStateListener) 268 .onSubchannelState(eq(ConnectivityStateInfo.forNonError(IDLE))); 269 inOrder.verify(mockStateListener) 270 .onSubchannelState(eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE))); 271 inOrder.verify(mockStateListener) 272 .onSubchannelState(eq(ConnectivityStateInfo.forNonError(CONNECTING))); 273 verifyNoMoreInteractions(mockStateListener); 274 275 assertThat(subchannel.logs).isEmpty(); 276 assertThat(orcaServiceImp.calls).isEmpty(); 277 verifyNoMoreInteractions(mockOrcaListener0); 278 deliverSubchannelState(i, 
ConnectivityStateInfo.forNonError(READY)); 279 verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 280 assertThat(orcaServiceImp.calls).hasSize(1); 281 ServerSideCall serverCall = orcaServiceImp.calls.peek(); 282 assertThat(serverCall.request).isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 283 assertLog(subchannel.logs, 284 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 285 286 // Simulate an ORCA service response. Registered listener will receive an ORCA report for 287 // each backend. 288 OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); 289 serverCall.responseObserver.onNext(report); 290 assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); 291 verify(mockOrcaListener0, times(i + 1)).onLoadReport( 292 argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( 293 OrcaPerRequestUtil.fromOrcaLoadReport(report)))); 294 } 295 296 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 297 FakeSubchannel subchannel = subchannels[i]; 298 SubchannelStateListener mockStateListener = mockStateListeners[i]; 299 300 ServerSideCall serverCall = orcaServiceImps[i].calls.peek(); 301 assertThat(serverCall.cancelled).isFalse(); 302 verifyNoMoreInteractions(mockStateListener); 303 304 // Shutting down the subchannel will cancel the ORCA reporting RPC. 
305 subchannel.shutdown(); 306 verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(SHUTDOWN))); 307 assertThat(serverCall.cancelled).isTrue(); 308 assertThat(subchannel.logs).isEmpty(); 309 verifyNoMoreInteractions(mockOrcaListener0); 310 } 311 312 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 313 assertThat(orcaServiceImps[i].calls).hasSize(1); 314 } 315 316 verifyNoInteractions(backoffPolicyProvider); 317 } 318 319 @Test twoLevelPoliciesTypicalWorkflow()320 public void twoLevelPoliciesTypicalWorkflow() { 321 verify(origHelper, atLeast(0)).getSynchronizationContext(); 322 verifyNoMoreInteractions(origHelper); 323 324 // Calling createSubchannel() on child helper correctly passes augmented CreateSubchannelArgs 325 // to origHelper. 326 ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor = 327 ArgumentCaptor.forClass(CreateSubchannelArgs.class); 328 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 329 String subchannelAttrValue = "eag attr " + i; 330 Attributes attrs = 331 Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); 332 Subchannel created = createSubchannel(childHelper, i, attrs); 333 assertThat(unwrap(((SubchannelImpl) created).delegate())).isSameInstanceAs(subchannels[i]); 334 OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG); 335 verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); 336 assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); 337 assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) 338 .isEqualTo(subchannelAttrValue); 339 } 340 341 // ORCA reporting does not start until underlying Subchannel is READY. 
342 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 343 FakeSubchannel subchannel = subchannels[i]; 344 OpenRcaServiceImp orcaServiceImp = orcaServiceImps[i]; 345 SubchannelStateListener mockStateListener = mockStateListeners[i]; 346 InOrder inOrder = inOrder(mockStateListener); 347 deliverSubchannelState(i, ConnectivityStateInfo.forNonError(IDLE)); 348 deliverSubchannelState(i, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); 349 deliverSubchannelState(i, ConnectivityStateInfo.forNonError(CONNECTING)); 350 351 inOrder 352 .verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(IDLE))); 353 inOrder 354 .verify(mockStateListener) 355 .onSubchannelState(eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE))); 356 inOrder 357 .verify(mockStateListener) 358 .onSubchannelState(eq(ConnectivityStateInfo.forNonError(CONNECTING))); 359 verifyNoMoreInteractions(mockStateListener); 360 361 assertThat(subchannel.logs).isEmpty(); 362 assertThat(orcaServiceImp.calls).isEmpty(); 363 verifyNoMoreInteractions(mockOrcaListener1); 364 verifyNoMoreInteractions(mockOrcaListener2); 365 deliverSubchannelState(i, ConnectivityStateInfo.forNonError(READY)); 366 verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 367 assertThat(orcaServiceImp.calls).hasSize(1); 368 ServerSideCall serverCall = orcaServiceImp.calls.peek(); 369 assertThat(serverCall.request).isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 370 assertLog(subchannel.logs, 371 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 372 373 // Simulate an ORCA service response. Registered listener will receive an ORCA report for 374 // each backend. 
375 OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); 376 serverCall.responseObserver.onNext(report); 377 assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); 378 verify(mockOrcaListener1, times(i + 1)).onLoadReport( 379 argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( 380 OrcaPerRequestUtil.fromOrcaLoadReport(report)))); 381 } 382 383 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 384 FakeSubchannel subchannel = subchannels[i]; 385 SubchannelStateListener mockStateListener = mockStateListeners[i]; 386 387 ServerSideCall serverCall = orcaServiceImps[i].calls.peek(); 388 assertThat(serverCall.cancelled).isFalse(); 389 verifyNoMoreInteractions(mockStateListener); 390 391 // Shutting down the subchannel will cancel the ORCA reporting RPC. 392 subchannel.shutdown(); 393 verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(SHUTDOWN))); 394 assertThat(serverCall.cancelled).isTrue(); 395 assertThat(subchannel.logs).isEmpty(); 396 verifyNoMoreInteractions(mockOrcaListener1, mockOrcaListener2); 397 } 398 399 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 400 assertThat(orcaServiceImps[i].calls).hasSize(1); 401 } 402 403 verifyNoInteractions(backoffPolicyProvider); 404 } 405 406 @Test orcReportingDisabledWhenServiceNotImplemented()407 public void orcReportingDisabledWhenServiceNotImplemented() { 408 final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 409 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 410 FakeSubchannel subchannel = subchannels[0]; 411 OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0]; 412 SubchannelStateListener mockStateListener = mockStateListeners[0]; 413 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 414 verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 415 assertThat(orcaServiceImp.calls).hasSize(1); 416 417 ServerSideCall serverCall = orcaServiceImp.calls.poll(); 418 
assertThat(serverCall.request).isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 419 subchannel.logs.clear(); 420 serverCall.responseObserver.onError(Status.UNIMPLEMENTED.asException()); 421 assertLog(subchannel.logs, 422 "ERROR: OpenRcaService disabled: " + Status.UNIMPLEMENTED); 423 verifyNoMoreInteractions(mockOrcaListener0); 424 425 // Re-connecting on Subchannel will reset the "disabled" flag and restart ORCA reporting. 426 assertThat(orcaServiceImp.calls).hasSize(0); 427 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(IDLE)); 428 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 429 assertLog(subchannel.logs, 430 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 431 assertThat(orcaServiceImp.calls).hasSize(1); 432 serverCall = orcaServiceImp.calls.poll(); 433 OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); 434 serverCall.responseObserver.onNext(report); 435 assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); 436 verify(mockOrcaListener0).onLoadReport( 437 argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( 438 OrcaPerRequestUtil.fromOrcaLoadReport(report)))); 439 verifyNoInteractions(backoffPolicyProvider); 440 } 441 442 @Test orcaReportingStreamClosedAndRetried()443 public void orcaReportingStreamClosedAndRetried() { 444 final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 445 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 446 FakeSubchannel subchannel = subchannels[0]; 447 OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0]; 448 SubchannelStateListener mockStateListener = mockStateListeners[0]; 449 InOrder inOrder = inOrder(mockStateListener, mockOrcaListener0, backoffPolicyProvider, 450 backoffPolicy1, backoffPolicy2); 451 452 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 453 inOrder 454 .verify(mockStateListener).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 
455 assertLog(subchannel.logs, 456 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 457 458 // Server closes the ORCA reporting RPC without any response, will start backoff 459 // sequence 1 (11ns). 460 orcaServiceImp.calls.poll().responseObserver.onCompleted(); 461 assertLog(subchannel.logs, 462 "DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 11" + " ns"); 463 inOrder.verify(backoffPolicyProvider).get(); 464 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 465 verifyRetryAfterNanos(inOrder, orcaServiceImp, 11); 466 assertLog(subchannel.logs, 467 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 468 469 // Server closes the ORCA reporting RPC with an error, will continue backoff sequence 1 (21ns). 470 orcaServiceImp.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException()); 471 assertLog(subchannel.logs, 472 "DEBUG: ORCA reporting stream closed with " + Status.UNAVAILABLE + ", backoff in 21" 473 + " ns"); 474 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 475 verifyRetryAfterNanos(inOrder, orcaServiceImp, 21); 476 assertLog(subchannel.logs, 477 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 478 479 // Server responds normally. 480 OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); 481 orcaServiceImp.calls.peek().responseObserver.onNext(report); 482 assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); 483 inOrder.verify(mockOrcaListener0).onLoadReport( 484 argThat(new OrcaPerRequestUtilTest.MetricsReportMatcher( 485 OrcaPerRequestUtil.fromOrcaLoadReport(report)))); 486 // Server closes the ORCA reporting RPC after a response, will restart immediately. 
487 orcaServiceImp.calls.poll().responseObserver.onCompleted(); 488 assertThat(subchannel.logs).containsExactly( 489 "DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 0" + " ns", 490 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 491 subchannel.logs.clear(); 492 493 // Backoff policy is set to sequence 2 in previous retry. 494 // Server closes the ORCA reporting RPC with an error, will start backoff sequence 2 (12ns). 495 orcaServiceImp.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException()); 496 assertLog(subchannel.logs, 497 "DEBUG: ORCA reporting stream closed with " + Status.UNAVAILABLE + ", backoff in 12" 498 + " ns"); 499 inOrder.verify(backoffPolicyProvider).get(); 500 inOrder.verify(backoffPolicy2).nextBackoffNanos(); 501 verifyRetryAfterNanos(inOrder, orcaServiceImp, 12); 502 assertLog(subchannel.logs, 503 "DEBUG: Starting ORCA reporting for " + subchannel.getAllAddresses()); 504 505 verifyNoMoreInteractions(mockStateListener, mockOrcaListener0, backoffPolicyProvider, 506 backoffPolicy1, backoffPolicy2); 507 } 508 509 @Test reportingNotStartedUntilConfigured()510 public void reportingNotStartedUntilConfigured() { 511 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 512 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 513 verify(mockStateListeners[0]) 514 .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 515 516 assertThat(orcaServiceImps[0].calls).isEmpty(); 517 assertThat(subchannels[0].logs).isEmpty(); 518 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 519 assertThat(orcaServiceImps[0].calls).hasSize(1); 520 assertLog(subchannels[0].logs, 521 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 522 assertThat(orcaServiceImps[0].calls.peek().request) 523 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 524 } 525 526 @Test updateListenerThrows()527 public void 
updateListenerThrows() { 528 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 529 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 530 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 531 verify(mockStateListeners[0]) 532 .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 533 534 assertThat(orcaServiceImps[0].calls).hasSize(1); 535 assertLog(subchannels[0].logs, 536 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 537 assertThat(orcaServiceImps[0].calls.peek().request) 538 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 539 assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]); 540 try { 541 OrcaOobUtil.setListener(subchannels[0], mockOrcaListener1, MEDIUM_INTERVAL_CONFIG); 542 fail("Update orca listener on non-orca subchannel should fail"); 543 } catch (IllegalArgumentException ex) { 544 assertThat(ex.getMessage()).isEqualTo("Subchannel does not have orca Out-Of-Band " 545 + "stream enabled. 
Try to use a subchannel created by OrcaOobUtil.OrcaHelper."); 546 } 547 } 548 549 @Test removeListener()550 public void removeListener() { 551 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 552 OrcaOobUtil.setListener(created, null, SHORT_INTERVAL_CONFIG); 553 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 554 verify(mockStateListeners[0]) 555 .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 556 557 assertThat(orcaServiceImps[0].calls).isEmpty(); 558 assertThat(subchannels[0].logs).isEmpty(); 559 assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]); 560 561 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 562 assertThat(orcaServiceImps[0].calls).hasSize(1); 563 assertLog(subchannels[0].logs, 564 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 565 assertThat(orcaServiceImps[0].calls.peek().request) 566 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 567 568 OrcaOobUtil.setListener(created, null, null); 569 assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); 570 assertThat(orcaServiceImps[0].calls).isEmpty(); 571 assertThat(subchannels[0].logs).isEmpty(); 572 assertThat(fakeClock.getPendingTasks()).isEmpty(); 573 verifyNoMoreInteractions(mockOrcaListener0); 574 verifyNoInteractions(backoffPolicyProvider); 575 } 576 577 @Test updateReportingIntervalBeforeCreatingSubchannel()578 public void updateReportingIntervalBeforeCreatingSubchannel() { 579 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 580 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 581 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 582 verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 583 584 assertThat(orcaServiceImps[0].calls).hasSize(1); 585 assertLog(subchannels[0].logs, 586 "DEBUG: Starting ORCA reporting for " + 
subchannels[0].getAllAddresses()); 587 assertThat(orcaServiceImps[0].calls.poll().request) 588 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 589 } 590 591 @Test updateReportingIntervalBeforeSubchannelReady()592 /** Sets the listener with SHORT_INTERVAL_CONFIG before READY is delivered and expects a single ORCA call carrying that interval. NOTE(review): this body matches updateReportingIntervalBeforeCreatingSubchannel statement for statement - confirm the two tests are meant to differ. */ public void updateReportingIntervalBeforeSubchannelReady() { 593 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 594 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 595 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 596 verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 597 598 assertThat(orcaServiceImps[0].calls).hasSize(1); 599 assertLog(subchannels[0].logs, 600 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 601 assertThat(orcaServiceImps[0].calls.poll().request) 602 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 603 } 604 605 @Test updateReportingIntervalWhenRpcActive()606 /** Changes the report interval while an ORCA reporting RPC is already running. */ public void updateReportingIntervalWhenRpcActive() { 607 // Sets the report interval after creating the Subchannel but before it is READY; reporting starts right after the subchannel 608 // state becomes READY. 609 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 610 OrcaOobUtil.setListener(created, mockOrcaListener0, 611 MEDIUM_INTERVAL_CONFIG); 612 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 613 verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 614 615 assertThat(orcaServiceImps[0].calls).hasSize(1); 616 assertLog(subchannels[0].logs, 617 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 618 assertThat(orcaServiceImps[0].calls.peek().request) 619 .isEqualTo(buildOrcaRequestFromConfig(MEDIUM_INTERVAL_CONFIG)); 620 621 // Make reporting less frequent. 
622 OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); 623 assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); 624 assertThat(orcaServiceImps[0].calls).hasSize(1); 625 assertLog(subchannels[0].logs, 626 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 627 assertThat(orcaServiceImps[0].calls.peek().request) 628 .isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG)); 629 630 // Configuring with the same report interval again does not restart ORCA RPC. 631 OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); 632 assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse(); 633 assertThat(subchannels[0].logs).isEmpty(); 634 635 // Make reporting more frequent. 636 OrcaOobUtil.setListener(created, mockOrcaListener0, 637 SHORT_INTERVAL_CONFIG); 638 assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); 639 assertThat(orcaServiceImps[0].calls).hasSize(1); 640 assertLog(subchannels[0].logs, 641 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 642 assertThat(orcaServiceImps[0].calls.poll().request) 643 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 644 } 645 646 @Test updateReportingIntervalWhenRpcPendingRetry()647 public void updateReportingIntervalWhenRpcPendingRetry() { 648 Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); 649 OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); 650 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 651 verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 652 653 assertThat(orcaServiceImps[0].calls).hasSize(1); 654 assertLog(subchannels[0].logs, 655 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 656 assertThat(orcaServiceImps[0].calls.peek().request) 657 .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); 658 659 // Server closes the RPC without response, 
client will retry with backoff. 660 assertThat(fakeClock.getPendingTasks()).isEmpty(); 661 orcaServiceImps[0].calls.poll().responseObserver.onCompleted(); 662 assertLog(subchannels[0].logs, 663 "DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 11" 664 + " ns"); 665 assertThat(fakeClock.getPendingTasks()).hasSize(1); 666 assertThat(orcaServiceImps[0].calls).isEmpty(); 667 668 // Make reporting less frequent. 669 OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); 670 // Retry task will be canceled and restarts new RPC immediately. 671 assertThat(fakeClock.getPendingTasks()).isEmpty(); 672 assertThat(orcaServiceImps[0].calls).hasSize(1); 673 assertLog(subchannels[0].logs, 674 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 675 assertThat(orcaServiceImps[0].calls.peek().request) 676 .isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG)); 677 } 678 679 @Test policiesReceiveSameReportIndependently()680 public void policiesReceiveSameReportIndependently() { 681 Subchannel childSubchannel = createSubchannel(childHelper, 0, Attributes.EMPTY); 682 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 683 684 // No helper sets ORCA reporting interval, so load reporting is not started. 685 verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 686 assertThat(orcaServiceImps[0].calls).isEmpty(); 687 assertThat(subchannels[0].logs).isEmpty(); 688 689 // Parent helper requests ORCA reports with a certain interval, load reporting starts. 
690 OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener1, SHORT_INTERVAL_CONFIG); 691 assertThat(orcaServiceImps[0].calls).hasSize(1); 692 assertLog(subchannels[0].logs, 693 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 694 695 OrcaLoadReport report = OrcaLoadReport.getDefaultInstance(); 696 assertThat(orcaServiceImps[0].calls).hasSize(1); 697 orcaServiceImps[0].calls.peek().responseObserver.onNext(report); 698 assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report); 699 // Only parent helper's listener receives the report. 700 ArgumentCaptor<MetricReport> parentReportCaptor = ArgumentCaptor.forClass(MetricReport.class); 701 verify(mockOrcaListener1).onLoadReport(parentReportCaptor.capture()); 702 assertThat(OrcaPerRequestUtilTest.reportEqual(parentReportCaptor.getValue(), 703 OrcaPerRequestUtil.fromOrcaLoadReport(report))).isTrue(); 704 verifyNoMoreInteractions(mockOrcaListener2); 705 706 // Now child helper also wants to receive reports. 707 OrcaOobUtil.setListener(childSubchannel, mockOrcaListener2, SHORT_INTERVAL_CONFIG); 708 orcaServiceImps[0].calls.peek().responseObserver.onNext(report); 709 assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report); 710 // Both helper receives the same report instance. 
711 ArgumentCaptor<MetricReport> childReportCaptor = ArgumentCaptor.forClass(MetricReport.class); 712 verify(mockOrcaListener1, times(2)) 713 .onLoadReport(parentReportCaptor.capture()); 714 verify(mockOrcaListener2) 715 .onLoadReport(childReportCaptor.capture()); 716 assertThat(childReportCaptor.getValue()).isSameInstanceAs(parentReportCaptor.getValue()); 717 } 718 719 @Test reportWithMostFrequentIntervalRequested()720 public void reportWithMostFrequentIntervalRequested() { 721 Subchannel created = createSubchannel(childHelper, 0, Attributes.EMPTY); 722 OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); 723 OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG); 724 deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); 725 verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); 726 assertThat(orcaServiceImps[0].calls).hasSize(1); 727 assertLog(subchannels[0].logs, 728 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 729 730 // The real report interval to be requested is the minimum of intervals requested by helpers. 731 assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval())) 732 .isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos()); 733 734 // Parent helper wants reporting to be more frequent than its current setting while it is still 735 // less frequent than parent helper. Nothing should happen on existing RPC. 736 OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener0, MEDIUM_INTERVAL_CONFIG); 737 assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse(); 738 assertThat(subchannels[0].logs).isEmpty(); 739 740 // Parent helper wants reporting to be less frequent. 
741 OrcaOobUtil.setListener(created, mockOrcaListener1, MEDIUM_INTERVAL_CONFIG); 742 assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); 743 assertThat(orcaServiceImps[0].calls).hasSize(1); 744 assertLog(subchannels[0].logs, 745 "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); 746 // ORCA reporting RPC restarts and the the real report interval is adjusted. 747 assertThat(Durations.toNanos(orcaServiceImps[0].calls.poll().request.getReportInterval())) 748 .isEqualTo(MEDIUM_INTERVAL_CONFIG.getReportIntervalNanos()); 749 } 750 verifyRetryAfterNanos(InOrder inOrder, OpenRcaServiceImp orcaServiceImp, long nanos)751 private void verifyRetryAfterNanos(InOrder inOrder, OpenRcaServiceImp orcaServiceImp, 752 long nanos) { 753 assertThat(fakeClock.getPendingTasks()).hasSize(1); 754 assertThat(orcaServiceImp.calls).isEmpty(); 755 fakeClock.forwardNanos(nanos - 1); 756 assertThat(orcaServiceImp.calls).isEmpty(); 757 inOrder.verifyNoMoreInteractions(); 758 fakeClock.forwardNanos(1); 759 assertThat(orcaServiceImp.calls).hasSize(1); 760 assertThat(fakeClock.getPendingTasks()).isEmpty(); 761 } 762 deliverSubchannelState(final int index, final ConnectivityStateInfo newState)763 private void deliverSubchannelState(final int index, final ConnectivityStateInfo newState) { 764 syncContext.execute( 765 new Runnable() { 766 @Override 767 public void run() { 768 subchannels[index].stateListener.onSubchannelState(newState); 769 } 770 }); 771 } 772 createSubchannel(final Helper helper, final int index, final Attributes attrs)773 private Subchannel createSubchannel(final Helper helper, final int index, 774 final Attributes attrs) { 775 final AtomicReference<Subchannel> newSubchannel = new AtomicReference<>(); 776 syncContext.execute( 777 new Runnable() { 778 @Override 779 public void run() { 780 Subchannel s = 781 helper.createSubchannel( 782 CreateSubchannelArgs.newBuilder() 783 .setAddresses(eagLists[index]) 784 .setAttributes(attrs) 785 .build()); 786 
s.start(mockStateListeners[index]); 787 newSubchannel.set(s); 788 } 789 }); 790 return newSubchannel.get(); 791 } 792 setOrcaReportConfig( final Subchannel subchannel, final OrcaOobReportListener listener, final OrcaReportingConfig config)793 private void setOrcaReportConfig( 794 final Subchannel subchannel, 795 final OrcaOobReportListener listener, 796 final OrcaReportingConfig config) { 797 OrcaOobUtil.setListener(subchannel, listener, config); 798 } 799 800 private static final class OpenRcaServiceImp extends OpenRcaServiceGrpc.OpenRcaServiceImplBase { 801 final Queue<ServerSideCall> calls = new ArrayDeque<>(); 802 803 @Override streamCoreMetrics( OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver)804 public void streamCoreMetrics( 805 OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) { 806 final ServerSideCall call = new ServerSideCall(request, responseObserver); 807 Context.current() 808 .addListener( 809 new CancellationListener() { 810 @Override 811 public void cancelled(Context ctx) { 812 call.cancelled = true; 813 } 814 }, 815 MoreExecutors.directExecutor()); 816 calls.add(call); 817 } 818 } 819 820 private static final class ServerSideCall { 821 final OrcaLoadReportRequest request; 822 final StreamObserver<OrcaLoadReport> responseObserver; 823 boolean cancelled; 824 ServerSideCall(OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver)825 ServerSideCall(OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) { 826 this.request = request; 827 this.responseObserver = responseObserver; 828 } 829 } 830 831 private static final class FakeSocketAddress extends SocketAddress { 832 final String name; 833 FakeSocketAddress(String name)834 FakeSocketAddress(String name) { 835 this.name = name; 836 } 837 838 @Override toString()839 public String toString() { 840 return name; 841 } 842 } 843 844 private final class FakeSubchannel extends Subchannel { 845 final 
List<EquivalentAddressGroup> eagList; 846 final Attributes attrs; 847 final Channel channel; 848 final List<String> logs = new ArrayList<>(); 849 final int index; 850 SubchannelStateListener stateListener; 851 private final ChannelLogger logger = 852 new ChannelLogger() { 853 @Override 854 public void log(ChannelLogLevel level, String msg) { 855 logs.add(level + ": " + msg); 856 } 857 858 @Override 859 public void log(ChannelLogLevel level, String template, Object... args) { 860 log(level, MessageFormat.format(template, args)); 861 } 862 }; 863 FakeSubchannel(int index, CreateSubchannelArgs args, Channel channel)864 FakeSubchannel(int index, CreateSubchannelArgs args, Channel channel) { 865 this.index = index; 866 this.eagList = args.getAddresses(); 867 this.attrs = args.getAttributes(); 868 this.channel = checkNotNull(channel); 869 } 870 871 @Override start(SubchannelStateListener listener)872 public void start(SubchannelStateListener listener) { 873 checkState(this.stateListener == null); 874 this.stateListener = listener; 875 } 876 877 @Override shutdown()878 public void shutdown() { 879 deliverSubchannelState(index, ConnectivityStateInfo.forNonError(SHUTDOWN)); 880 } 881 882 @Override requestConnection()883 public void requestConnection() { 884 throw new AssertionError("Should not be called"); 885 } 886 887 @Override getAllAddresses()888 public List<EquivalentAddressGroup> getAllAddresses() { 889 return eagList; 890 } 891 892 @Override getAttributes()893 public Attributes getAttributes() { 894 return attrs; 895 } 896 897 @Override asChannel()898 public Channel asChannel() { 899 return channel; 900 } 901 902 @Override getChannelLogger()903 public ChannelLogger getChannelLogger() { 904 return logger; 905 } 906 } 907 908 private final class FakeHelper extends Helper { 909 @Override createSubchannel(CreateSubchannelArgs args)910 public Subchannel createSubchannel(CreateSubchannelArgs args) { 911 int index = -1; 912 for (int i = 0; i < NUM_SUBCHANNELS; i++) { 913 if 
(eagLists[i].equals(args.getAddresses())) { 914 index = i; 915 break; 916 } 917 } 918 checkState(index >= 0, "addrs " + args.getAddresses() + " not found"); 919 FakeSubchannel subchannel = new FakeSubchannel(index, args, channels[index]); 920 checkState(subchannels[index] == null, "subchannels[" + index + "] already created"); 921 subchannels[index] = subchannel; 922 return subchannel; 923 } 924 925 @Override updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker)926 public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { 927 throw new AssertionError("Should not be called"); 928 } 929 930 @Override getSynchronizationContext()931 public SynchronizationContext getSynchronizationContext() { 932 return syncContext; 933 } 934 935 @Override getScheduledExecutorService()936 public ScheduledExecutorService getScheduledExecutorService() { 937 return fakeClock.getScheduledExecutorService(); 938 } 939 940 @Override getAuthority()941 public String getAuthority() { 942 throw new AssertionError("Should not be called"); 943 } 944 945 @Override createOobChannel(EquivalentAddressGroup eag, String authority)946 public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { 947 throw new AssertionError("Should not be called"); 948 } 949 } 950 } 951