1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.ConnectivityState.CONNECTING; 21 import static io.grpc.ConnectivityState.IDLE; 22 import static io.grpc.ConnectivityState.READY; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY; 26 import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT; 27 import static org.junit.Assert.assertEquals; 28 import static org.junit.Assert.assertFalse; 29 import static org.junit.Assert.assertNull; 30 import static org.junit.Assert.assertSame; 31 import static org.junit.Assert.assertTrue; 32 import static org.mockito.AdditionalAnswers.delegatesTo; 33 import static org.mockito.Matchers.any; 34 import static org.mockito.Matchers.eq; 35 import static org.mockito.Matchers.same; 36 import static org.mockito.Mockito.atLeast; 37 import static org.mockito.Mockito.doAnswer; 38 import static org.mockito.Mockito.inOrder; 39 import static org.mockito.Mockito.mock; 40 import static org.mockito.Mockito.never; 41 import static org.mockito.Mockito.times; 42 import static org.mockito.Mockito.verify; 43 import static org.mockito.Mockito.verifyNoMoreInteractions; 44 import static org.mockito.Mockito.when; 45 46 import com.google.common.util.concurrent.MoreExecutors; 47 import com.google.protobuf.ByteString; 48 import com.google.protobuf.util.Durations; 49 import com.google.protobuf.util.Timestamps; 50 import io.grpc.Attributes; 51 import io.grpc.CallOptions; 52 import io.grpc.ClientStreamTracer; 53 import io.grpc.ConnectivityState; 54 import io.grpc.ConnectivityStateInfo; 55 import io.grpc.EquivalentAddressGroup; 56 import io.grpc.LoadBalancer.Helper; 57 import io.grpc.LoadBalancer.PickResult; 58 import io.grpc.LoadBalancer.PickSubchannelArgs; 59 import io.grpc.LoadBalancer.Subchannel; 60 import io.grpc.LoadBalancer.SubchannelPicker; 61 import io.grpc.ManagedChannel; 62 import io.grpc.Metadata; 63 import io.grpc.Status; 64 import io.grpc.grpclb.GrpclbState.BackendEntry; 65 import io.grpc.grpclb.GrpclbState.DropEntry; 66 import io.grpc.grpclb.GrpclbState.ErrorEntry; 67 import io.grpc.grpclb.GrpclbState.RoundRobinPicker; 68 import io.grpc.inprocess.InProcessChannelBuilder; 69 import io.grpc.inprocess.InProcessServerBuilder; 70 import io.grpc.internal.BackoffPolicy; 71 import io.grpc.internal.FakeClock; 72 import io.grpc.internal.GrpcAttributes; 73 import io.grpc.internal.ObjectPool; 74 import io.grpc.internal.SerializingExecutor; 75 import io.grpc.internal.TimeProvider; 76 import io.grpc.lb.v1.ClientStats; 77 import io.grpc.lb.v1.ClientStatsPerToken; 78 import io.grpc.lb.v1.InitialLoadBalanceRequest; 79 import io.grpc.lb.v1.InitialLoadBalanceResponse; 80 import io.grpc.lb.v1.LoadBalanceRequest; 81 import io.grpc.lb.v1.LoadBalanceResponse; 82 import io.grpc.lb.v1.LoadBalancerGrpc; 83 import io.grpc.lb.v1.Server; 84 import io.grpc.lb.v1.ServerList; 85 import io.grpc.stub.StreamObserver; 86 import java.net.InetSocketAddress; 87 import java.net.SocketAddress; 88 import java.util.ArrayList; 89 import java.util.Arrays; 90 import java.util.Collections; 91 import java.util.LinkedList; 92 import java.util.List; 93 import java.util.concurrent.ScheduledExecutorService; 94 import java.util.concurrent.TimeUnit; 95 import javax.annotation.Nullable; 96 import org.junit.After; 97 import org.junit.Before; 98 import org.junit.Test; 99 import org.junit.runner.RunWith; 100 import org.junit.runners.JUnit4; 101 import org.mockito.ArgumentCaptor; 102 import org.mockito.Captor; 103 import org.mockito.InOrder; 104 import org.mockito.Mock; 105 import org.mockito.MockitoAnnotations; 106 import org.mockito.invocation.InvocationOnMock; 107 import org.mockito.stubbing.Answer; 108 109 /** Unit tests for {@link GrpclbLoadBalancer}. */ 110 @RunWith(JUnit4.class) 111 public class GrpclbLoadBalancerTest { 112 private static final Attributes.Key<String> RESOLUTION_ATTR = 113 Attributes.Key.create("resolution-attr"); 114 private static final String SERVICE_AUTHORITY = "api.google.com"; 115 private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER = 116 new FakeClock.TaskFilter() { 117 @Override 118 public boolean shouldAccept(Runnable command) { 119 return command instanceof GrpclbState.LoadReportingTask; 120 } 121 }; 122 private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER = 123 new FakeClock.TaskFilter() { 124 @Override 125 public boolean shouldAccept(Runnable command) { 126 return command instanceof GrpclbState.FallbackModeTask; 127 } 128 }; 129 private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER = 130 new FakeClock.TaskFilter() { 131 @Override 132 public boolean shouldAccept(Runnable command) { 133 return command instanceof GrpclbState.LbRpcRetryTask; 134 } 135 }; 136 private static final Attributes LB_BACKEND_ATTRS = 137 Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build(); 138 139 @Mock 140 private Helper helper; 141 @Mock 142 private SubchannelPool subchannelPool; 143 private SubchannelPicker currentPicker; 144 private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; 145 @Captor 146 private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor; 147 private final FakeClock fakeClock = new FakeClock(); 148 private final LinkedList<StreamObserver<LoadBalanceRequest>> lbRequestObservers = 149 new LinkedList<StreamObserver<LoadBalanceRequest>>(); 150 private final LinkedList<Subchannel> mockSubchannels = new LinkedList<Subchannel>(); 151 private final LinkedList<ManagedChannel> fakeOobChannels = new LinkedList<ManagedChannel>(); 152 private final ArrayList<Subchannel> subchannelTracker = new ArrayList<>(); 153 private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>(); 154 private final ArrayList<String> failingLbAuthorities = new ArrayList<>(); 155 private final TimeProvider timeProvider = new TimeProvider() { 156 @Override 157 public long currentTimeNanos() { 158 return fakeClock.getTicker().read(); 159 } 160 }; 161 private io.grpc.Server fakeLbServer; 162 @Captor 163 private ArgumentCaptor<SubchannelPicker> pickerCaptor; 164 private final SerializingExecutor channelExecutor = 165 new SerializingExecutor(MoreExecutors.directExecutor()); 166 @Mock 167 private ObjectPool<ScheduledExecutorService> timerServicePool; 168 @Mock 169 private BackoffPolicy.Provider backoffPolicyProvider; 170 @Mock 171 private BackoffPolicy backoffPolicy1; 172 @Mock 173 private BackoffPolicy backoffPolicy2; 174 private GrpclbLoadBalancer balancer; 175 176 @SuppressWarnings("unchecked") 177 @Before setUp()178 public void setUp() throws Exception { 179 MockitoAnnotations.initMocks(this); 180 mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo( 181 new LoadBalancerGrpc.LoadBalancerImplBase() { 182 @Override 183 public StreamObserver<LoadBalanceRequest> balanceLoad( 184 final StreamObserver<LoadBalanceResponse> responseObserver) { 185 StreamObserver<LoadBalanceRequest> requestObserver = 186 mock(StreamObserver.class); 187 Answer<Void> closeRpc = new Answer<Void>() { 188 @Override 189 public Void answer(InvocationOnMock invocation) { 190 responseObserver.onCompleted(); 191 return null; 192 } 193 }; 194 doAnswer(closeRpc).when(requestObserver).onCompleted(); 195 lbRequestObservers.add(requestObserver); 196 return requestObserver; 197 } 198 })); 199 fakeLbServer = InProcessServerBuilder.forName("fakeLb") 200 .directExecutor().addService(mockLbService).build().start(); 201 doAnswer(new Answer<ManagedChannel>() { 202 @Override 203 public ManagedChannel answer(InvocationOnMock invocation) throws Throwable { 204 String authority = (String) invocation.getArguments()[1]; 205 ManagedChannel channel; 206 if (failingLbAuthorities.contains(authority)) { 207 channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor() 208 .overrideAuthority(authority).build(); 209 } else { 210 channel = InProcessChannelBuilder.forName("fakeLb").directExecutor() 211 .overrideAuthority(authority).build(); 212 } 213 fakeOobChannels.add(channel); 214 oobChannelTracker.add(channel); 215 return channel; 216 } 217 }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class)); 218 doAnswer(new Answer<Subchannel>() { 219 @Override 220 public Subchannel answer(InvocationOnMock invocation) throws Throwable { 221 Subchannel subchannel = mock(Subchannel.class); 222 EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; 223 Attributes attrs = (Attributes) invocation.getArguments()[1]; 224 when(subchannel.getAddresses()).thenReturn(eag); 225 when(subchannel.getAttributes()).thenReturn(attrs); 226 mockSubchannels.add(subchannel); 227 subchannelTracker.add(subchannel); 228 return subchannel; 229 } 230 }).when(subchannelPool).takeOrCreateSubchannel( 231 any(EquivalentAddressGroup.class), any(Attributes.class)); 232 doAnswer(new Answer<Void>() { 233 @Override 234 public Void answer(InvocationOnMock invocation) throws Throwable { 235 Runnable task = (Runnable) invocation.getArguments()[0]; 236 channelExecutor.execute(task); 237 return null; 238 } 239 }).when(helper).runSerialized(any(Runnable.class)); 240 doAnswer(new Answer<Void>() { 241 @Override 242 public Void answer(InvocationOnMock invocation) throws Throwable { 243 currentPicker = (SubchannelPicker) invocation.getArguments()[1]; 244 return null; 245 } 246 }).when(helper).updateBalancingState( 247 any(ConnectivityState.class), any(SubchannelPicker.class)); 248 when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); 249 ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService(); 250 when(timerServicePool.getObject()).thenReturn(timerService); 251 when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); 252 when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); 253 when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); 254 balancer = new GrpclbLoadBalancer( 255 helper, 256 subchannelPool, 257 timerServicePool, 258 timeProvider, 259 backoffPolicyProvider); 260 verify(subchannelPool).init(same(helper), same(timerService)); 261 } 262 263 @After tearDown()264 public void tearDown() { 265 try { 266 if (balancer != null) { 267 channelExecutor.execute(new Runnable() { 268 @Override 269 public void run() { 270 balancer.shutdown(); 271 } 272 }); 273 } 274 for (ManagedChannel channel : oobChannelTracker) { 275 assertTrue(channel + " is shutdown", channel.isShutdown()); 276 // balancer should have closed the LB stream, terminating the OOB channel. 277 assertTrue(channel + " is terminated", channel.isTerminated()); 278 } 279 // GRPCLB manages subchannels only through subchannelPool 280 for (Subchannel subchannel: subchannelTracker) { 281 verify(subchannelPool).returnSubchannel(same(subchannel)); 282 // Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if 283 // LoadBalancer has called it expectedly. 284 verify(subchannel, never()).shutdown(); 285 } 286 verify(helper, never()) 287 .createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); 288 // No timer should linger after shutdown 289 assertThat(fakeClock.getPendingTasks()).isEmpty(); 290 } finally { 291 if (fakeLbServer != null) { 292 fakeLbServer.shutdownNow(); 293 } 294 } 295 } 296 297 @Test roundRobinPickerNoDrop()298 public void roundRobinPickerNoDrop() { 299 GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); 300 Subchannel subchannel = mock(Subchannel.class); 301 BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); 302 BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); 303 304 List<BackendEntry> pickList = Arrays.asList(b1, b2); 305 RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList); 306 307 PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); 308 Metadata headers1 = new Metadata(); 309 // The existing token on the headers will be replaced 310 headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); 311 when(args1.getHeaders()).thenReturn(headers1); 312 assertSame(b1.result, picker.pickSubchannel(args1)); 313 verify(args1).getHeaders(); 314 assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 315 316 PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); 317 Metadata headers2 = new Metadata(); 318 when(args2.getHeaders()).thenReturn(headers2); 319 assertSame(b2.result, picker.pickSubchannel(args2)); 320 verify(args2).getHeaders(); 321 assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); 322 323 PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); 324 Metadata headers3 = new Metadata(); 325 when(args3.getHeaders()).thenReturn(headers3); 326 assertSame(b1.result, picker.pickSubchannel(args3)); 327 verify(args3).getHeaders(); 328 assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 329 330 verify(subchannel, never()).getAttributes(); 331 } 332 333 334 @Test roundRobinPickerWithDrop()335 public void roundRobinPickerWithDrop() { 336 assertTrue(DROP_PICK_RESULT.isDrop()); 337 GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); 338 Subchannel subchannel = mock(Subchannel.class); 339 // 1 out of 2 requests are to be dropped 340 DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003"); 341 List<DropEntry> dropList = Arrays.asList(null, d); 342 343 BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001"); 344 BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002"); 345 List<BackendEntry> pickList = Arrays.asList(b1, b2); 346 RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList); 347 348 // dropList[0], pickList[0] 349 PickSubchannelArgs args1 = mock(PickSubchannelArgs.class); 350 Metadata headers1 = new Metadata(); 351 headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD"); 352 when(args1.getHeaders()).thenReturn(headers1); 353 assertSame(b1.result, picker.pickSubchannel(args1)); 354 verify(args1).getHeaders(); 355 assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 356 357 // dropList[1]: drop 358 PickSubchannelArgs args2 = mock(PickSubchannelArgs.class); 359 Metadata headers2 = new Metadata(); 360 when(args2.getHeaders()).thenReturn(headers2); 361 assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2)); 362 verify(args2, never()).getHeaders(); 363 364 // dropList[0], pickList[1] 365 PickSubchannelArgs args3 = mock(PickSubchannelArgs.class); 366 Metadata headers3 = new Metadata(); 367 when(args3.getHeaders()).thenReturn(headers3); 368 assertSame(b2.result, picker.pickSubchannel(args3)); 369 verify(args3).getHeaders(); 370 assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002"); 371 372 // dropList[1]: drop 373 PickSubchannelArgs args4 = mock(PickSubchannelArgs.class); 374 Metadata headers4 = new Metadata(); 375 when(args4.getHeaders()).thenReturn(headers4); 376 assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4)); 377 verify(args4, never()).getHeaders(); 378 379 // dropList[0], pickList[0] 380 PickSubchannelArgs args5 = mock(PickSubchannelArgs.class); 381 Metadata headers5 = new Metadata(); 382 when(args5.getHeaders()).thenReturn(headers5); 383 assertSame(b1.result, picker.pickSubchannel(args5)); 384 verify(args5).getHeaders(); 385 assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 386 387 verify(subchannel, never()).getAttributes(); 388 } 389 390 @Test loadReporting()391 public void loadReporting() { 392 Metadata headers = new Metadata(); 393 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 394 when(args.getHeaders()).thenReturn(headers); 395 396 long loadReportIntervalMillis = 1983; 397 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 398 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 399 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 400 401 // Fallback timer is started as soon as address is resolved. 402 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 403 404 assertEquals(1, fakeOobChannels.size()); 405 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 406 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 407 assertEquals(1, lbRequestObservers.size()); 408 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 409 InOrder inOrder = inOrder(lbRequestObserver); 410 InOrder helperInOrder = inOrder(helper, subchannelPool); 411 412 inOrder.verify(lbRequestObserver).onNext( 413 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 414 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 415 .build())); 416 417 // Simulate receiving LB response 418 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 419 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 420 421 // Load reporting task is scheduled 422 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 423 assertEquals(0, fakeClock.runDueTasks()); 424 425 List<ServerEntry> backends = Arrays.asList( 426 new ServerEntry("127.0.0.1", 2000, "token0001"), 427 new ServerEntry("token0001"), // drop 428 new ServerEntry("127.0.0.1", 2010, "token0002"), 429 new ServerEntry("token0003")); // drop 430 431 lbResponseObserver.onNext(buildLbResponse(backends)); 432 433 assertEquals(2, mockSubchannels.size()); 434 Subchannel subchannel1 = mockSubchannels.poll(); 435 Subchannel subchannel2 = mockSubchannels.poll(); 436 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 437 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 438 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 439 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 440 441 helperInOrder.verify(helper, atLeast(1)) 442 .updateBalancingState(eq(READY), pickerCaptor.capture()); 443 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 444 assertThat(picker.dropList).containsExactly( 445 null, 446 new DropEntry(getLoadRecorder(), "token0001"), 447 null, 448 new DropEntry(getLoadRecorder(), "token0003")).inOrder(); 449 assertThat(picker.pickList).containsExactly( 450 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 451 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); 452 453 // Report, no data 454 assertNextReport( 455 inOrder, lbRequestObserver, loadReportIntervalMillis, 456 ClientStats.newBuilder().build()); 457 458 PickResult pick1 = picker.pickSubchannel(args); 459 assertSame(subchannel1, pick1.getSubchannel()); 460 assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); 461 462 // Merely the pick will not be recorded as upstart. 463 assertNextReport( 464 inOrder, lbRequestObserver, loadReportIntervalMillis, 465 ClientStats.newBuilder().build()); 466 467 ClientStreamTracer tracer1 = 468 pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 469 470 PickResult pick2 = picker.pickSubchannel(args); 471 assertNull(pick2.getSubchannel()); 472 assertSame(DROP_PICK_RESULT, pick2); 473 474 // Report includes upstart of pick1 and the drop of pick2 475 assertNextReport( 476 inOrder, lbRequestObserver, loadReportIntervalMillis, 477 ClientStats.newBuilder() 478 .setNumCallsStarted(2) 479 .setNumCallsFinished(1) // pick2 480 .addCallsFinishedWithDrop( 481 ClientStatsPerToken.newBuilder() 482 .setLoadBalanceToken("token0001") 483 .setNumCalls(1) // pick2 484 .build()) 485 .build()); 486 487 PickResult pick3 = picker.pickSubchannel(args); 488 assertSame(subchannel2, pick3.getSubchannel()); 489 assertSame(getLoadRecorder(), pick3.getStreamTracerFactory()); 490 ClientStreamTracer tracer3 = 491 pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 492 493 // pick3 has sent out headers 494 tracer3.outboundHeaders(); 495 496 // 3rd report includes pick3's upstart 497 assertNextReport( 498 inOrder, lbRequestObserver, loadReportIntervalMillis, 499 ClientStats.newBuilder() 500 .setNumCallsStarted(1) 501 .build()); 502 503 PickResult pick4 = picker.pickSubchannel(args); 504 assertNull(pick4.getSubchannel()); 505 assertSame(DROP_PICK_RESULT, pick4); 506 507 // pick1 ended without sending anything 508 tracer1.streamClosed(Status.CANCELLED); 509 510 // 4th report includes end of pick1 and drop of pick4 511 assertNextReport( 512 inOrder, lbRequestObserver, loadReportIntervalMillis, 513 ClientStats.newBuilder() 514 .setNumCallsStarted(1) // pick4 515 .setNumCallsFinished(2) 516 .setNumCallsFinishedWithClientFailedToSend(1) // pick1 517 .addCallsFinishedWithDrop( 518 ClientStatsPerToken.newBuilder() 519 .setLoadBalanceToken("token0003") 520 .setNumCalls(1) // pick4 521 .build()) 522 .build()); 523 524 PickResult pick5 = picker.pickSubchannel(args); 525 assertSame(subchannel1, pick1.getSubchannel()); 526 assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); 527 ClientStreamTracer tracer5 = 528 pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 529 530 // pick3 ended without receiving response headers 531 tracer3.streamClosed(Status.DEADLINE_EXCEEDED); 532 533 // pick5 sent and received headers 534 tracer5.outboundHeaders(); 535 tracer5.inboundHeaders(); 536 537 // 5th report includes pick3's end and pick5's upstart 538 assertNextReport( 539 inOrder, lbRequestObserver, loadReportIntervalMillis, 540 ClientStats.newBuilder() 541 .setNumCallsStarted(1) // pick5 542 .setNumCallsFinished(1) // pick3 543 .build()); 544 545 // pick5 ends 546 tracer5.streamClosed(Status.OK); 547 548 // 6th report includes pick5's end 549 assertNextReport( 550 inOrder, lbRequestObserver, loadReportIntervalMillis, 551 ClientStats.newBuilder() 552 .setNumCallsFinished(1) 553 .setNumCallsFinishedKnownReceived(1) 554 .build()); 555 556 assertEquals(1, fakeClock.numPendingTasks()); 557 // Balancer closes the stream, scheduled reporting task cancelled 558 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 559 assertEquals(0, fakeClock.numPendingTasks()); 560 561 // New stream created 562 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 563 lbResponseObserver = lbResponseObserverCaptor.getValue(); 564 assertEquals(1, lbRequestObservers.size()); 565 lbRequestObserver = lbRequestObservers.poll(); 566 inOrder = inOrder(lbRequestObserver); 567 568 inOrder.verify(lbRequestObserver).onNext( 569 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 570 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 571 .build())); 572 573 // Load reporting is also requested 574 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 575 576 // No picker created because balancer is still using the results from the last stream 577 helperInOrder.verify(helper, never()) 578 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 579 580 // Make a new pick on that picker. It will not show up on the report of the new stream, because 581 // that picker is associated with the previous stream. 582 PickResult pick6 = picker.pickSubchannel(args); 583 assertNull(pick6.getSubchannel()); 584 assertSame(DROP_PICK_RESULT, pick6); 585 assertNextReport( 586 inOrder, lbRequestObserver, loadReportIntervalMillis, 587 ClientStats.newBuilder().build()); 588 589 // New stream got the list update 590 lbResponseObserver.onNext(buildLbResponse(backends)); 591 592 // Same backends, thus no new subchannels 593 helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( 594 any(EquivalentAddressGroup.class), any(Attributes.class)); 595 // But the new RoundRobinEntries have a new loadRecorder, thus considered different from 596 // the previous list, thus a new picker is created 597 helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 598 picker = (RoundRobinPicker) pickerCaptor.getValue(); 599 600 PickResult pick1p = picker.pickSubchannel(args); 601 assertSame(subchannel1, pick1p.getSubchannel()); 602 assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory()); 603 pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 604 605 // The pick from the new stream will be included in the report 606 assertNextReport( 607 inOrder, lbRequestObserver, loadReportIntervalMillis, 608 ClientStats.newBuilder() 609 .setNumCallsStarted(1) 610 .build()); 611 612 verify(args, atLeast(0)).getHeaders(); 613 verifyNoMoreInteractions(args); 614 } 615 616 @Test abundantInitialResponse()617 public void abundantInitialResponse() { 618 Metadata headers = new Metadata(); 619 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 620 when(args.getHeaders()).thenReturn(headers); 621 622 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 623 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 624 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 625 assertEquals(1, fakeOobChannels.size()); 626 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 627 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 628 629 // Simulate LB initial response 630 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 631 lbResponseObserver.onNext(buildInitialResponse(1983)); 632 633 // Load reporting task is scheduled 634 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 635 FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); 636 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 637 638 // Simulate an abundant LB initial response, with a different report interval 639 lbResponseObserver.onNext(buildInitialResponse(9097)); 640 // It doesn't affect load-reporting at all 641 assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) 642 .containsExactly(scheduledTask); 643 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 644 } 645 646 @Test raceBetweenLoadReportingAndLbStreamClosure()647 public void raceBetweenLoadReportingAndLbStreamClosure() { 648 Metadata headers = new Metadata(); 649 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 650 when(args.getHeaders()).thenReturn(headers); 651 652 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 653 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 654 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 655 assertEquals(1, fakeOobChannels.size()); 656 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 657 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 658 assertEquals(1, lbRequestObservers.size()); 659 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 660 InOrder inOrder = inOrder(lbRequestObserver); 661 662 inOrder.verify(lbRequestObserver).onNext( 663 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 664 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 665 .build())); 666 667 // Simulate receiving LB response 668 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 669 lbResponseObserver.onNext(buildInitialResponse(1983)); 670 671 // Load reporting task is scheduled 672 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 673 FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); 674 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 675 676 // Close lbStream 677 lbResponseObserver.onCompleted(); 678 679 // Reporting task cancelled 680 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 681 682 // Simulate a race condition where the task has just started when its cancelled 683 scheduledTask.command.run(); 684 685 // No report sent. No new task scheduled 686 inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class)); 687 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 688 } 689 assertNextReport( InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, long loadReportIntervalMillis, ClientStats expectedReport)690 private void assertNextReport( 691 InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, 692 long loadReportIntervalMillis, ClientStats expectedReport) { 693 assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS)); 694 inOrder.verifyNoMoreInteractions(); 695 assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS)); 696 assertEquals(1, fakeClock.numPendingTasks()); 697 inOrder.verify(lbRequestObserver).onNext( 698 eq(LoadBalanceRequest.newBuilder() 699 .setClientStats( 700 ClientStats.newBuilder(expectedReport) 701 .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) 702 .build()) 703 .build())); 704 } 705 706 @Test acquireAndReleaseScheduledExecutor()707 public void acquireAndReleaseScheduledExecutor() { 708 verify(timerServicePool).getObject(); 709 verifyNoMoreInteractions(timerServicePool); 710 711 balancer.shutdown(); 712 verify(timerServicePool).returnObject(same(fakeClock.getScheduledExecutorService())); 713 verifyNoMoreInteractions(timerServicePool); 714 } 715 716 @Test nameResolutionFailsThenRecoverToDelegate()717 public void nameResolutionFailsThenRecoverToDelegate() { 718 Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); 719 deliverNameResolutionError(error); 720 verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 721 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 722 assertThat(picker.dropList).isEmpty(); 723 assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); 724 725 // Recover with a subsequent success 726 List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false); 727 728 Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); 729 deliverResolvedAddresses(resolvedServers, resolutionAttrs); 730 } 731 732 @Test nameResolutionFailsThenRecoverToGrpclb()733 public void nameResolutionFailsThenRecoverToGrpclb() { 734 Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); 735 deliverNameResolutionError(error); 736 verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 737 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 738 assertThat(picker.dropList).isEmpty(); 739 assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); 740 741 // Recover with a subsequent success 742 List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true); 743 EquivalentAddressGroup eag = resolvedServers.get(0); 744 745 Attributes resolutionAttrs = Attributes.EMPTY; 746 deliverResolvedAddresses(resolvedServers, resolutionAttrs); 747 748 verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); 749 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 750 } 751 752 @Test grpclbThenNameResolutionFails()753 public void grpclbThenNameResolutionFails() { 754 InOrder inOrder = inOrder(helper, subchannelPool); 755 // Go to GRPCLB first 756 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 757 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 758 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 759 760 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 761 assertEquals(1, fakeOobChannels.size()); 762 ManagedChannel oobChannel = fakeOobChannels.poll(); 763 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 764 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 765 766 // Let name resolution fail before round-robin list is ready 767 Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); 768 deliverNameResolutionError(error); 769 770 inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); 771 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 772 assertThat(picker.dropList).isEmpty(); 773 assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); 774 assertFalse(oobChannel.isShutdown()); 775 776 // Simulate receiving LB response 777 List<ServerEntry> backends = Arrays.asList( 778 new ServerEntry("127.0.0.1", 2000, "TOKEN1"), 779 new ServerEntry("127.0.0.1", 2010, "TOKEN2")); 780 verify(helper, never()).runSerialized(any(Runnable.class)); 781 lbResponseObserver.onNext(buildInitialResponse()); 782 lbResponseObserver.onNext(buildLbResponse(backends)); 783 784 verify(helper, times(2)).runSerialized(any(Runnable.class)); 785 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 786 eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)), 787 any(Attributes.class)); 788 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 789 eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)), 790 any(Attributes.class)); 791 } 792 793 @Test grpclbUpdatedAddresses_avoidsReconnect()794 public void grpclbUpdatedAddresses_avoidsReconnect() { 795 List<EquivalentAddressGroup> grpclbResolutionList = 796 createResolvedServerAddresses(true, false); 797 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 798 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 799 800 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 801 ManagedChannel oobChannel = fakeOobChannels.poll(); 802 assertEquals(1, lbRequestObservers.size()); 803 804 List<EquivalentAddressGroup> grpclbResolutionList2 = 805 createResolvedServerAddresses(true, false, true); 806 EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList( 807 grpclbResolutionList2.get(0).getAddresses().get(0), 808 grpclbResolutionList2.get(2).getAddresses().get(0)), 809 lbAttributes(lbAuthority(0))); 810 deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); 811 verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag)); 812 assertEquals(1, lbRequestObservers.size()); // No additional RPC 813 } 814 815 @Test grpclbUpdatedAddresses_reconnectOnAuthorityChange()816 public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { 817 List<EquivalentAddressGroup> grpclbResolutionList = 818 createResolvedServerAddresses(true, false); 819 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 820 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 821 822 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 823 ManagedChannel oobChannel = fakeOobChannels.poll(); 824 assertEquals(1, lbRequestObservers.size()); 825 826 final String newAuthority = "some-new-authority"; 827 List<EquivalentAddressGroup> grpclbResolutionList2 = 828 createResolvedServerAddresses(false); 829 grpclbResolutionList2.add(new EquivalentAddressGroup( 830 new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); 831 deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); 832 assertTrue(oobChannel.isTerminated()); 833 verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority)); 834 assertEquals(2, lbRequestObservers.size()); // An additional RPC 835 } 836 837 @Test grpclbWorking()838 public void grpclbWorking() { 839 InOrder inOrder = inOrder(helper, subchannelPool); 840 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 841 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 842 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 843 844 // Fallback timer is started as soon as the addresses are resolved. 845 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 846 847 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 848 assertEquals(1, fakeOobChannels.size()); 849 ManagedChannel oobChannel = fakeOobChannels.poll(); 850 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 851 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 852 assertEquals(1, lbRequestObservers.size()); 853 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 854 verify(lbRequestObserver).onNext( 855 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 856 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 857 .build())); 858 859 // Simulate receiving LB response 860 List<ServerEntry> backends1 = Arrays.asList( 861 new ServerEntry("127.0.0.1", 2000, "token0001"), 862 new ServerEntry("127.0.0.1", 2010, "token0002")); 863 inOrder.verify(helper, never()) 864 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 865 lbResponseObserver.onNext(buildInitialResponse()); 866 lbResponseObserver.onNext(buildLbResponse(backends1)); 867 868 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 869 eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), 870 any(Attributes.class)); 871 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 872 eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), 873 any(Attributes.class)); 874 assertEquals(2, mockSubchannels.size()); 875 Subchannel subchannel1 = mockSubchannels.poll(); 876 Subchannel subchannel2 = mockSubchannels.poll(); 877 verify(subchannel1).requestConnection(); 878 verify(subchannel2).requestConnection(); 879 assertEquals( 880 new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS), 881 subchannel1.getAddresses()); 882 assertEquals( 883 new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS), 884 subchannel2.getAddresses()); 885 886 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 887 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 888 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 889 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 890 assertThat(picker0.dropList).containsExactly(null, null); 891 assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); 892 inOrder.verifyNoMoreInteractions(); 893 894 // Let subchannels be connected 895 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 896 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 897 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 898 899 assertThat(picker1.dropList).containsExactly(null, null); 900 assertThat(picker1.pickList).containsExactly( 901 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 902 903 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 904 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 905 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 906 assertThat(picker2.dropList).containsExactly(null, null); 907 assertThat(picker2.pickList).containsExactly( 908 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 909 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) 910 .inOrder(); 911 912 // Disconnected subchannels 913 verify(subchannel1).requestConnection(); 914 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); 915 verify(subchannel1, times(2)).requestConnection(); 916 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 917 RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); 918 assertThat(picker3.dropList).containsExactly(null, null); 919 assertThat(picker3.pickList).containsExactly( 920 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 921 922 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 923 inOrder.verifyNoMoreInteractions(); 924 925 // As long as there is at least one READY subchannel, round robin will work. 926 Status error1 = Status.UNAVAILABLE.withDescription("error1"); 927 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); 928 inOrder.verifyNoMoreInteractions(); 929 930 // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING 931 verify(subchannel2).requestConnection(); 932 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE)); 933 verify(subchannel2, times(2)).requestConnection(); 934 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 935 RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); 936 assertThat(picker4.dropList).containsExactly(null, null); 937 assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); 938 939 // Update backends, with a drop entry 940 List<ServerEntry> backends2 = 941 Arrays.asList( 942 new ServerEntry("127.0.0.1", 2030, "token0003"), // New address 943 new ServerEntry("token0003"), // drop 944 new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed 945 new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time 946 new ServerEntry("token0006")); // drop 947 verify(subchannelPool, never()).returnSubchannel(same(subchannel1)); 948 949 lbResponseObserver.onNext(buildLbResponse(backends2)); 950 // not in backends2, closed 951 verify(subchannelPool).returnSubchannel(same(subchannel1)); 952 // backends2[2], will be kept 953 verify(subchannelPool, never()).returnSubchannel(same(subchannel2)); 954 955 inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( 956 eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)), 957 any(Attributes.class)); 958 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 959 eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)), 960 any(Attributes.class)); 961 assertEquals(1, mockSubchannels.size()); 962 Subchannel subchannel3 = mockSubchannels.poll(); 963 verify(subchannel3).requestConnection(); 964 assertEquals( 965 new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS), 966 subchannel3.getAddresses()); 967 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 968 RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); 969 assertThat(picker7.dropList).containsExactly( 970 null, 971 new DropEntry(getLoadRecorder(), "token0003"), 972 null, 973 null, 974 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 975 assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); 976 977 // State updates on obsolete subchannel1 will have no effect 978 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 979 deliverSubchannelState( 980 subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); 981 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); 982 inOrder.verifyNoMoreInteractions(); 983 984 deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); 985 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 986 RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); 987 assertThat(picker8.dropList).containsExactly( 988 null, 989 new DropEntry(getLoadRecorder(), "token0003"), 990 null, 991 null, 992 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 993 // subchannel2 is still IDLE, thus not in the active list 994 assertThat(picker8.pickList).containsExactly( 995 new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), 996 new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); 997 // subchannel2 becomes READY and makes it into the list 998 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 999 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1000 RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue(); 1001 assertThat(picker9.dropList).containsExactly( 1002 null, 1003 new DropEntry(getLoadRecorder(), "token0003"), 1004 null, 1005 null, 1006 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 1007 assertThat(picker9.pickList).containsExactly( 1008 new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), 1009 new BackendEntry(subchannel2, getLoadRecorder(), "token0004"), 1010 new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); 1011 verify(subchannelPool, never()).returnSubchannel(same(subchannel3)); 1012 1013 // Update backends, with no entry 1014 lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); 1015 verify(subchannelPool).returnSubchannel(same(subchannel2)); 1016 verify(subchannelPool).returnSubchannel(same(subchannel3)); 1017 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 1018 RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); 1019 assertThat(picker10.dropList).isEmpty(); 1020 assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY); 1021 1022 assertFalse(oobChannel.isShutdown()); 1023 assertEquals(0, lbRequestObservers.size()); 1024 verify(lbRequestObserver, never()).onCompleted(); 1025 verify(lbRequestObserver, never()).onError(any(Throwable.class)); 1026 1027 // Load reporting was not requested, thus never scheduled 1028 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 1029 1030 verify(subchannelPool, never()).clear(); 1031 balancer.shutdown(); 1032 verify(subchannelPool).clear(); 1033 } 1034 1035 @Test grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires()1036 public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() { 1037 subtestGrpclbFallbackInitialTimeout(false); 1038 } 1039 1040 @Test grpclbFallback_initialTimeout_timerExpires()1041 public void grpclbFallback_initialTimeout_timerExpires() { 1042 subtestGrpclbFallbackInitialTimeout(true); 1043 } 1044 1045 // Fallback or not within the period of the initial timeout. subtestGrpclbFallbackInitialTimeout(boolean timerExpires)1046 private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { 1047 long loadReportIntervalMillis = 1983; 1048 InOrder inOrder = inOrder(helper, subchannelPool); 1049 1050 // Create a resolution list with a mixture of balancer and backend addresses 1051 List<EquivalentAddressGroup> resolutionList = 1052 createResolvedServerAddresses(false, true, false); 1053 Attributes resolutionAttrs = Attributes.EMPTY; 1054 deliverResolvedAddresses(resolutionList, resolutionAttrs); 1055 1056 inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); 1057 1058 // Attempted to connect to balancer 1059 assertEquals(1, fakeOobChannels.size()); 1060 ManagedChannel oobChannel = fakeOobChannels.poll(); 1061 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1062 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1063 assertEquals(1, lbRequestObservers.size()); 1064 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1065 1066 verify(lbRequestObserver).onNext( 1067 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1068 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1069 .build())); 1070 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1071 // We don't care if runSerialized() has been run. 1072 inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); 1073 inOrder.verifyNoMoreInteractions(); 1074 1075 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1076 fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); 1077 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1078 1079 ////////////////////////////////// 1080 // Fallback timer expires (or not) 1081 ////////////////////////////////// 1082 if (timerExpires) { 1083 fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); 1084 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1085 // Fall back to the backends from resolver 1086 fallbackTestVerifyUseOfFallbackBackendLists( 1087 inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); 1088 1089 assertFalse(oobChannel.isShutdown()); 1090 verify(lbRequestObserver, never()).onCompleted(); 1091 } 1092 1093 //////////////////////////////////////////////////////// 1094 // Name resolver sends new list without any backend addr 1095 //////////////////////////////////////////////////////// 1096 resolutionList = createResolvedServerAddresses(true, true); 1097 deliverResolvedAddresses(resolutionList, resolutionAttrs); 1098 1099 // New addresses are updated to the OobChannel 1100 inOrder.verify(helper).updateOobChannelAddresses( 1101 same(oobChannel), 1102 eq(new EquivalentAddressGroup( 1103 Arrays.asList( 1104 resolutionList.get(0).getAddresses().get(0), 1105 resolutionList.get(1).getAddresses().get(0)), 1106 lbAttributes(lbAuthority(0))))); 1107 1108 if (timerExpires) { 1109 // Still in fallback logic, except that the backend list is empty 1110 fallbackTestVerifyUseOfFallbackBackendLists( 1111 inOrder, Collections.<EquivalentAddressGroup>emptyList()); 1112 } 1113 1114 ////////////////////////////////////////////////// 1115 // Name resolver sends new list with backend addrs 1116 ////////////////////////////////////////////////// 1117 resolutionList = createResolvedServerAddresses(true, false, false); 1118 deliverResolvedAddresses(resolutionList, resolutionAttrs); 1119 1120 // New LB address is updated to the OobChannel 1121 inOrder.verify(helper).updateOobChannelAddresses( 1122 same(oobChannel), 1123 eq(resolutionList.get(0))); 1124 1125 if (timerExpires) { 1126 // New backend addresses are used for fallback 1127 fallbackTestVerifyUseOfFallbackBackendLists( 1128 inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); 1129 } 1130 1131 //////////////////////////////////////////////// 1132 // Break the LB stream after the timer expires 1133 //////////////////////////////////////////////// 1134 if (timerExpires) { 1135 Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); 1136 lbResponseObserver.onError(streamError.asException()); 1137 1138 // The error will NOT propagate to picker because fallback list is in use. 1139 inOrder.verify(helper, never()) 1140 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1141 // A new stream is created 1142 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 1143 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1144 assertEquals(1, lbRequestObservers.size()); 1145 lbRequestObserver = lbRequestObservers.poll(); 1146 verify(lbRequestObserver).onNext( 1147 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1148 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1149 .build())); 1150 } 1151 1152 ///////////////////////////////// 1153 // Balancer returns a server list 1154 ///////////////////////////////// 1155 List<ServerEntry> serverList = Arrays.asList( 1156 new ServerEntry("127.0.0.1", 2000, "token0001"), 1157 new ServerEntry("127.0.0.1", 2010, "token0002")); 1158 lbResponseObserver.onNext(buildInitialResponse()); 1159 lbResponseObserver.onNext(buildLbResponse(serverList)); 1160 1161 // Balancer-provided server list now in effect 1162 fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); 1163 1164 /////////////////////////////////////////////////////////////// 1165 // New backend addresses from resolver outside of fallback mode 1166 /////////////////////////////////////////////////////////////// 1167 resolutionList = createResolvedServerAddresses(true, false); 1168 deliverResolvedAddresses(resolutionList, resolutionAttrs); 1169 // Will not affect the round robin list at all 1170 inOrder.verify(helper, never()) 1171 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1172 1173 // No fallback timeout timer scheduled. 1174 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1175 } 1176 1177 @Test grpclbFallback_breakLbStreamBeforeFallbackTimerExpires()1178 public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { 1179 long loadReportIntervalMillis = 1983; 1180 InOrder inOrder = inOrder(helper, subchannelPool); 1181 1182 // Create a resolution list with a mixture of balancer and backend addresses 1183 List<EquivalentAddressGroup> resolutionList = 1184 createResolvedServerAddresses(false, true, false); 1185 Attributes resolutionAttrs = Attributes.EMPTY; 1186 deliverResolvedAddresses(resolutionList, resolutionAttrs); 1187 1188 inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); 1189 1190 // Attempted to connect to balancer 1191 assertEquals(1, fakeOobChannels.size()); 1192 ManagedChannel oobChannel = fakeOobChannels.poll(); 1193 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1194 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1195 assertEquals(1, lbRequestObservers.size()); 1196 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1197 1198 verify(lbRequestObserver).onNext( 1199 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1200 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1201 .build())); 1202 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1203 // We don't care if runSerialized() has been run. 1204 inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); 1205 1206 inOrder.verifyNoMoreInteractions(); 1207 1208 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1209 1210 ///////////////////////////////////////////// 1211 // Break the LB stream before timer expires 1212 ///////////////////////////////////////////// 1213 Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); 1214 lbResponseObserver.onError(streamError.asException()); 1215 1216 // Fall back to the backends from resolver 1217 fallbackTestVerifyUseOfFallbackBackendLists( 1218 inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); 1219 1220 // A new stream is created 1221 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 1222 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1223 assertEquals(1, lbRequestObservers.size()); 1224 lbRequestObserver = lbRequestObservers.poll(); 1225 verify(lbRequestObserver).onNext( 1226 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1227 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1228 .build())); 1229 } 1230 1231 @Test grpclbFallback_balancerLost()1232 public void grpclbFallback_balancerLost() { 1233 subtestGrpclbFallbackConnectionLost(true, false); 1234 } 1235 1236 @Test grpclbFallback_subchannelsLost()1237 public void grpclbFallback_subchannelsLost() { 1238 subtestGrpclbFallbackConnectionLost(false, true); 1239 } 1240 1241 @Test grpclbFallback_allLost()1242 public void grpclbFallback_allLost() { 1243 subtestGrpclbFallbackConnectionLost(true, true); 1244 } 1245 1246 // Fallback outside of the initial timeout, where all connections are lost. subtestGrpclbFallbackConnectionLost( boolean balancerBroken, boolean allSubchannelsBroken)1247 private void subtestGrpclbFallbackConnectionLost( 1248 boolean balancerBroken, boolean allSubchannelsBroken) { 1249 long loadReportIntervalMillis = 1983; 1250 InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); 1251 1252 // Create a resolution list with a mixture of balancer and backend addresses 1253 List<EquivalentAddressGroup> resolutionList = 1254 createResolvedServerAddresses(false, true, false); 1255 Attributes resolutionAttrs = Attributes.EMPTY; 1256 deliverResolvedAddresses(resolutionList, resolutionAttrs); 1257 1258 inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); 1259 1260 // Attempted to connect to balancer 1261 assertEquals(1, fakeOobChannels.size()); 1262 fakeOobChannels.poll(); 1263 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1264 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1265 assertEquals(1, lbRequestObservers.size()); 1266 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1267 1268 verify(lbRequestObserver).onNext( 1269 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 1270 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1271 .build())); 1272 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 1273 // We don't care if runSerialized() has been run. 1274 inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); 1275 inOrder.verifyNoMoreInteractions(); 1276 1277 // Balancer returns a server list 1278 List<ServerEntry> serverList = Arrays.asList( 1279 new ServerEntry("127.0.0.1", 2000, "token0001"), 1280 new ServerEntry("127.0.0.1", 2010, "token0002")); 1281 lbResponseObserver.onNext(buildInitialResponse()); 1282 lbResponseObserver.onNext(buildLbResponse(serverList)); 1283 1284 List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); 1285 1286 // Break connections 1287 if (balancerBroken) { 1288 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1289 // A new stream to LB is created 1290 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1291 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1292 assertEquals(1, lbRequestObservers.size()); 1293 lbRequestObserver = lbRequestObservers.poll(); 1294 } 1295 if (allSubchannelsBroken) { 1296 for (Subchannel subchannel : subchannels) { 1297 // A READY subchannel transits to IDLE when receiving a go-away 1298 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); 1299 } 1300 } 1301 1302 if (balancerBroken && allSubchannelsBroken) { 1303 // Going into fallback 1304 subchannels = fallbackTestVerifyUseOfFallbackBackendLists( 1305 inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); 1306 1307 // When in fallback mode, fallback timer should not be scheduled when all backend 1308 // connections are lost 1309 for (Subchannel subchannel : subchannels) { 1310 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); 1311 } 1312 1313 // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer 1314 List<ServerEntry> serverList2 = Arrays.asList( 1315 new ServerEntry("127.0.0.1", 2001, "token0003"), 1316 new ServerEntry("127.0.0.1", 2011, "token0004")); 1317 lbResponseObserver.onNext(buildInitialResponse()); 1318 lbResponseObserver.onNext(buildLbResponse(serverList2)); 1319 1320 fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2); 1321 } 1322 assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 1323 1324 if (!(balancerBroken && allSubchannelsBroken)) { 1325 verify(subchannelPool, never()).takeOrCreateSubchannel( 1326 eq(resolutionList.get(0)), any(Attributes.class)); 1327 verify(subchannelPool, never()).takeOrCreateSubchannel( 1328 eq(resolutionList.get(2)), any(Attributes.class)); 1329 } 1330 } 1331 fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs)1332 private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists( 1333 InOrder inOrder, List<EquivalentAddressGroup> addrs) { 1334 return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null); 1335 } 1336 fallbackTestVerifyUseOfBalancerBackendLists( InOrder inOrder, List<ServerEntry> servers)1337 private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists( 1338 InOrder inOrder, List<ServerEntry> servers) { 1339 ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>(); 1340 ArrayList<String> tokens = new ArrayList<>(); 1341 for (ServerEntry server : servers) { 1342 addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS)); 1343 tokens.add(server.token); 1344 } 1345 return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens); 1346 } 1347 fallbackTestVerifyUseOfBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs, @Nullable List<String> tokens)1348 private List<Subchannel> fallbackTestVerifyUseOfBackendLists( 1349 InOrder inOrder, List<EquivalentAddressGroup> addrs, 1350 @Nullable List<String> tokens) { 1351 if (tokens != null) { 1352 assertEquals(addrs.size(), tokens.size()); 1353 } 1354 for (EquivalentAddressGroup addr : addrs) { 1355 inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class)); 1356 } 1357 RoundRobinPicker picker = (RoundRobinPicker) currentPicker; 1358 assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); 1359 assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); 1360 assertEquals(addrs.size(), mockSubchannels.size()); 1361 ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels); 1362 mockSubchannels.clear(); 1363 for (Subchannel subchannel : subchannels) { 1364 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); 1365 } 1366 inOrder.verify(helper, atLeast(0)) 1367 .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); 1368 inOrder.verify(helper, never()) 1369 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1370 1371 ArrayList<BackendEntry> pickList = new ArrayList<>(); 1372 for (int i = 0; i < addrs.size(); i++) { 1373 Subchannel subchannel = subchannels.get(i); 1374 BackendEntry backend; 1375 if (tokens == null) { 1376 backend = new BackendEntry(subchannel); 1377 } else { 1378 backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i)); 1379 } 1380 pickList.add(backend); 1381 deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); 1382 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1383 picker = (RoundRobinPicker) pickerCaptor.getValue(); 1384 assertThat(picker.dropList) 1385 .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); 1386 assertThat(picker.pickList).containsExactlyElementsIn(pickList); 1387 inOrder.verify(helper, never()) 1388 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 1389 } 1390 return subchannels; 1391 } 1392 1393 @Test grpclbMultipleAuthorities()1394 public void grpclbMultipleAuthorities() throws Exception { 1395 List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList( 1396 new EquivalentAddressGroup( 1397 new FakeSocketAddress("fake-address-1"), 1398 lbAttributes("fake-authority-1")), 1399 new EquivalentAddressGroup( 1400 new FakeSocketAddress("fake-address-2"), 1401 lbAttributes("fake-authority-2")), 1402 new EquivalentAddressGroup( 1403 new FakeSocketAddress("not-a-lb-address")), 1404 new EquivalentAddressGroup( 1405 new FakeSocketAddress("fake-address-3"), 1406 lbAttributes("fake-authority-1"))); 1407 final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup( 1408 Arrays.<SocketAddress>asList( 1409 new FakeSocketAddress("fake-address-1"), 1410 new FakeSocketAddress("fake-address-3")), 1411 lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day 1412 1413 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 1414 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 1415 1416 verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); 1417 } 1418 1419 @Test grpclbBalancerStreamRetry()1420 public void grpclbBalancerStreamRetry() throws Exception { 1421 LoadBalanceRequest expectedInitialRequest = 1422 LoadBalanceRequest.newBuilder() 1423 .setInitialRequest( 1424 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 1425 .build(); 1426 InOrder inOrder = 1427 inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); 1428 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 1429 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 1430 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 1431 1432 assertEquals(1, fakeOobChannels.size()); 1433 @SuppressWarnings("unused") 1434 ManagedChannel oobChannel = fakeOobChannels.poll(); 1435 1436 // First balancer RPC 1437 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1438 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 1439 assertEquals(1, lbRequestObservers.size()); 1440 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 1441 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1442 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1443 1444 // Balancer closes it immediately (erroneously) 1445 lbResponseObserver.onCompleted(); 1446 // Will start backoff sequence 1 (10ns) 1447 inOrder.verify(backoffPolicyProvider).get(); 1448 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 1449 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1450 1451 // Fast-forward to a moment before the retry 1452 fakeClock.forwardNanos(9); 1453 verifyNoMoreInteractions(mockLbService); 1454 // Then time for retry 1455 fakeClock.forwardNanos(1); 1456 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1457 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1458 assertEquals(1, lbRequestObservers.size()); 1459 lbRequestObserver = lbRequestObservers.poll(); 1460 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1461 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1462 1463 // Balancer closes it with an error. 1464 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1465 // Will continue the backoff sequence 1 (100ns) 1466 verifyNoMoreInteractions(backoffPolicyProvider); 1467 inOrder.verify(backoffPolicy1).nextBackoffNanos(); 1468 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1469 1470 // Fast-forward to a moment before the retry 1471 fakeClock.forwardNanos(100 - 1); 1472 verifyNoMoreInteractions(mockLbService); 1473 // Then time for retry 1474 fakeClock.forwardNanos(1); 1475 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1476 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1477 assertEquals(1, lbRequestObservers.size()); 1478 lbRequestObserver = lbRequestObservers.poll(); 1479 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1480 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1481 1482 // Balancer sends initial response. 1483 lbResponseObserver.onNext(buildInitialResponse()); 1484 1485 // Then breaks the RPC 1486 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1487 1488 // Will reset the retry sequence and retry immediately, because balancer has responded. 1489 inOrder.verify(backoffPolicyProvider).get(); 1490 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1491 lbResponseObserver = lbResponseObserverCaptor.getValue(); 1492 assertEquals(1, lbRequestObservers.size()); 1493 lbRequestObserver = lbRequestObservers.poll(); 1494 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1495 1496 // Fail the retry after spending 4ns 1497 fakeClock.forwardNanos(4); 1498 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 1499 1500 // Will be on the first retry (10ns) of backoff sequence 2. 1501 inOrder.verify(backoffPolicy2).nextBackoffNanos(); 1502 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1503 1504 // Fast-forward to a moment before the retry, the time spent in the last try is deducted. 1505 fakeClock.forwardNanos(10 - 4 - 1); 1506 verifyNoMoreInteractions(mockLbService); 1507 // Then time for retry 1508 fakeClock.forwardNanos(1); 1509 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1510 assertEquals(1, lbRequestObservers.size()); 1511 lbRequestObserver = lbRequestObservers.poll(); 1512 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1513 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1514 1515 // Wrapping up 1516 verify(backoffPolicyProvider, times(2)).get(); 1517 verify(backoffPolicy1, times(2)).nextBackoffNanos(); 1518 verify(backoffPolicy2, times(1)).nextBackoffNanos(); 1519 } 1520 deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState)1521 private void deliverSubchannelState( 1522 final Subchannel subchannel, final ConnectivityStateInfo newState) { 1523 channelExecutor.execute(new Runnable() { 1524 @Override 1525 public void run() { 1526 balancer.handleSubchannelState(subchannel, newState); 1527 } 1528 }); 1529 } 1530 deliverNameResolutionError(final Status error)1531 private void deliverNameResolutionError(final Status error) { 1532 channelExecutor.execute(new Runnable() { 1533 @Override 1534 public void run() { 1535 balancer.handleNameResolutionError(error); 1536 } 1537 }); 1538 } 1539 deliverResolvedAddresses( final List<EquivalentAddressGroup> addrs, final Attributes attrs)1540 private void deliverResolvedAddresses( 1541 final List<EquivalentAddressGroup> addrs, final Attributes attrs) { 1542 channelExecutor.execute(new Runnable() { 1543 @Override 1544 public void run() { 1545 balancer.handleResolvedAddressGroups(addrs, attrs); 1546 } 1547 }); 1548 } 1549 getLoadRecorder()1550 private GrpclbClientLoadRecorder getLoadRecorder() { 1551 return balancer.getGrpclbState().getLoadRecorder(); 1552 } 1553 createResolvedServerAddresses(boolean ... isLb)1554 private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) { 1555 ArrayList<EquivalentAddressGroup> list = new ArrayList<>(); 1556 for (int i = 0; i < isLb.length; i++) { 1557 SocketAddress addr = new FakeSocketAddress("fake-address-" + i); 1558 EquivalentAddressGroup eag = 1559 new EquivalentAddressGroup( 1560 addr, 1561 isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY); 1562 list.add(eag); 1563 } 1564 return list; 1565 } 1566 lbAuthority(int unused)1567 private static String lbAuthority(int unused) { 1568 // TODO(ejona): Support varying authorities 1569 return "lb.google.com"; 1570 } 1571 lbAttributes(String authority)1572 private static Attributes lbAttributes(String authority) { 1573 return Attributes.newBuilder() 1574 .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) 1575 .build(); 1576 } 1577 buildInitialResponse()1578 private static LoadBalanceResponse buildInitialResponse() { 1579 return buildInitialResponse(0); 1580 } 1581 buildInitialResponse(long loadReportIntervalMillis)1582 private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) { 1583 return LoadBalanceResponse.newBuilder() 1584 .setInitialResponse( 1585 InitialLoadBalanceResponse.newBuilder() 1586 .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) 1587 .build(); 1588 } 1589 buildLbResponse(List<ServerEntry> servers)1590 private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) { 1591 ServerList.Builder serverListBuilder = ServerList.newBuilder(); 1592 for (ServerEntry server : servers) { 1593 if (server.addr != null) { 1594 serverListBuilder.addServers(Server.newBuilder() 1595 .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress())) 1596 .setPort(server.addr.getPort()) 1597 .setLoadBalanceToken(server.token) 1598 .build()); 1599 } else { 1600 serverListBuilder.addServers(Server.newBuilder() 1601 .setDrop(true) 1602 .setLoadBalanceToken(server.token) 1603 .build()); 1604 } 1605 } 1606 return LoadBalanceResponse.newBuilder() 1607 .setServerList(serverListBuilder.build()) 1608 .build(); 1609 } 1610 1611 private static class ServerEntry { 1612 final InetSocketAddress addr; 1613 final String token; 1614 ServerEntry(String host, int port, String token)1615 ServerEntry(String host, int port, String token) { 1616 this.addr = new InetSocketAddress(host, port); 1617 this.token = token; 1618 } 1619 1620 // Drop entry ServerEntry(String token)1621 ServerEntry(String token) { 1622 this.addr = null; 1623 this.token = token; 1624 } 1625 } 1626 } 1627