1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.ConnectivityState.CONNECTING; 21 import static io.grpc.ConnectivityState.IDLE; 22 import static io.grpc.ConnectivityState.READY; 23 import static io.grpc.ConnectivityState.SHUTDOWN; 24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; 25 import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY; 26 import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT; 27 import static org.junit.Assert.assertEquals; 28 import static org.junit.Assert.assertFalse; 29 import static org.junit.Assert.assertNull; 30 import static org.junit.Assert.assertSame; 31 import static org.junit.Assert.assertTrue; 32 import static org.mockito.AdditionalAnswers.delegatesTo; 33 import static org.mockito.Matchers.any; 34 import static org.mockito.Matchers.eq; 35 import static org.mockito.Matchers.same; 36 import static org.mockito.Mockito.atLeast; 37 import static org.mockito.Mockito.doAnswer; 38 import static org.mockito.Mockito.inOrder; 39 import static org.mockito.Mockito.mock; 40 import static org.mockito.Mockito.never; 41 import static org.mockito.Mockito.times; 42 import static org.mockito.Mockito.verify; 43 import static org.mockito.Mockito.verifyNoMoreInteractions; 44 import static org.mockito.Mockito.when; 45 46 import com.google.common.collect.Iterables; 47 
import com.google.common.util.concurrent.MoreExecutors; 48 import com.google.protobuf.ByteString; 49 import com.google.protobuf.util.Durations; 50 import com.google.protobuf.util.Timestamps; 51 import io.grpc.Attributes; 52 import io.grpc.CallOptions; 53 import io.grpc.ClientStreamTracer; 54 import io.grpc.ConnectivityState; 55 import io.grpc.ConnectivityStateInfo; 56 import io.grpc.EquivalentAddressGroup; 57 import io.grpc.LoadBalancer.Helper; 58 import io.grpc.LoadBalancer.PickResult; 59 import io.grpc.LoadBalancer.PickSubchannelArgs; 60 import io.grpc.LoadBalancer.Subchannel; 61 import io.grpc.LoadBalancer.SubchannelPicker; 62 import io.grpc.ManagedChannel; 63 import io.grpc.Metadata; 64 import io.grpc.Status; 65 import io.grpc.grpclb.GrpclbState.BackendEntry; 66 import io.grpc.grpclb.GrpclbState.DropEntry; 67 import io.grpc.grpclb.GrpclbState.ErrorEntry; 68 import io.grpc.grpclb.GrpclbState.RoundRobinPicker; 69 import io.grpc.inprocess.InProcessChannelBuilder; 70 import io.grpc.inprocess.InProcessServerBuilder; 71 import io.grpc.internal.BackoffPolicy; 72 import io.grpc.internal.FakeClock; 73 import io.grpc.internal.GrpcAttributes; 74 import io.grpc.internal.ObjectPool; 75 import io.grpc.internal.SerializingExecutor; 76 import io.grpc.internal.TimeProvider; 77 import io.grpc.lb.v1.ClientStats; 78 import io.grpc.lb.v1.ClientStatsPerToken; 79 import io.grpc.lb.v1.InitialLoadBalanceRequest; 80 import io.grpc.lb.v1.InitialLoadBalanceResponse; 81 import io.grpc.lb.v1.LoadBalanceRequest; 82 import io.grpc.lb.v1.LoadBalanceResponse; 83 import io.grpc.lb.v1.LoadBalancerGrpc; 84 import io.grpc.lb.v1.Server; 85 import io.grpc.lb.v1.ServerList; 86 import io.grpc.stub.StreamObserver; 87 import java.net.InetSocketAddress; 88 import java.net.SocketAddress; 89 import java.util.ArrayList; 90 import java.util.Arrays; 91 import java.util.Collections; 92 import java.util.LinkedList; 93 import java.util.List; 94 import java.util.concurrent.ScheduledExecutorService; 95 import 
java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/** Unit tests for {@link GrpclbLoadBalancer}. */
@RunWith(JUnit4.class)
public class GrpclbLoadBalancerTest {
  // Attribute key that tests set on the name-resolution attributes they deliver to the balancer.
  private static final Attributes.Key<String> RESOLUTION_ATTR =
      Attributes.Key.create("resolution-attr");
  // Authority returned by the mocked Helper; also the name expected in the initial LB request.
  private static final String SERVICE_AUTHORITY = "api.google.com";
  // Accepts only load-reporting tasks, so tests can count/inspect them on the fake clock.
  private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
      new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command instanceof GrpclbState.LoadReportingTask;
        }
      };
  // Accepts only fallback-mode timer tasks.
  private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER =
      new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command instanceof GrpclbState.FallbackModeTask;
        }
      };
  // Accepts only LB RPC retry tasks.
  private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
      new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command instanceof GrpclbState.LbRpcRetryTask;
        }
      };
  // Attributes the tests expect on address groups of backends that came from the balancer.
  private static final Attributes LB_BACKEND_ATTRS =
      Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build();

  @Mock
  private Helper helper;
  @Mock
  private SubchannelPool subchannelPool;
  // Most recent picker passed to helper.updateBalancingState(); recorded by the stub in setUp().
  private SubchannelPicker currentPicker;
  // Mock LB service registered on the in-process server; delegates to a fake implementation.
  private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
  @Captor
  private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
  private
final FakeClock fakeClock = new FakeClock();
  // Request observers handed out by the fake LB service, one per balanceLoad() call.
  private final LinkedList<StreamObserver<LoadBalanceRequest>> lbRequestObservers =
      new LinkedList<StreamObserver<LoadBalanceRequest>>();
  // Queues that tests poll from: mock subchannels / OOB channels created since the last poll.
  private final LinkedList<Subchannel> mockSubchannels = new LinkedList<Subchannel>();
  private final LinkedList<ManagedChannel> fakeOobChannels = new LinkedList<ManagedChannel>();
  // Everything ever created during the test; never polled, checked for cleanup in tearDown().
  private final ArrayList<Subchannel> subchannelTracker = new ArrayList<>();
  private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>();
  // Authorities whose OOB channels are pointed at a non-existent in-process server name.
  private final ArrayList<String> failingLbAuthorities = new ArrayList<>();
  // Time source backed by the fake clock's ticker.
  private final TimeProvider timeProvider = new TimeProvider() {
      @Override
      public long currentTimeNanos() {
        return fakeClock.getTicker().read();
      }
    };
  private io.grpc.Server fakeLbServer;
  @Captor
  private ArgumentCaptor<SubchannelPicker> pickerCaptor;
  // Runs tasks submitted through helper.runSerialized() inline, on the calling thread.
  private final SerializingExecutor channelExecutor =
      new SerializingExecutor(MoreExecutors.directExecutor());
  @Mock
  private ObjectPool<ScheduledExecutorService> timerServicePool;
  @Mock
  private BackoffPolicy.Provider backoffPolicyProvider;
  @Mock
  private BackoffPolicy backoffPolicy1;
  @Mock
  private BackoffPolicy backoffPolicy2;
  private GrpclbLoadBalancer balancer;

  /**
   * Builds the test fixture: a fake in-process LB server, stubbed {@link Helper} and
   * {@link SubchannelPool} mocks that record what they create, fake-clock based timers and
   * back-off policies, and finally the {@link GrpclbLoadBalancer} under test.
   */
  @SuppressWarnings("unchecked")
  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    // The mock records balanceLoad() calls for verification while the delegate supplies behavior.
    mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo(
        new LoadBalancerGrpc.LoadBalancerImplBase() {
          @Override
          public StreamObserver<LoadBalanceRequest> balanceLoad(
              final StreamObserver<LoadBalanceResponse> responseObserver) {
            StreamObserver<LoadBalanceRequest> requestObserver =
                mock(StreamObserver.class);
            // When the client half-closes, complete the response side so the RPC ends.
            Answer<Void> closeRpc = new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) {
                  responseObserver.onCompleted();
                  return null;
                }
              };
            doAnswer(closeRpc).when(requestObserver).onCompleted();
            lbRequestObservers.add(requestObserver);
            return requestObserver;
          }
        }));
    fakeLbServer = InProcessServerBuilder.forName("fakeLb")
        .directExecutor().addService(mockLbService).build().start();
    // OOB channels connect to the fake LB server, unless the authority is marked as failing.
    doAnswer(new Answer<ManagedChannel>() {
        @Override
        public ManagedChannel answer(InvocationOnMock invocation) throws Throwable {
          String authority = (String) invocation.getArguments()[1];
          ManagedChannel channel;
          if (failingLbAuthorities.contains(authority)) {
            channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor()
                .overrideAuthority(authority).build();
          } else {
            channel = InProcessChannelBuilder.forName("fakeLb").directExecutor()
                .overrideAuthority(authority).build();
          }
          fakeOobChannels.add(channel);
          oobChannelTracker.add(channel);
          return channel;
        }
      }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
    // The pool hands out mock subchannels that echo back the requested addresses/attributes.
    doAnswer(new Answer<Subchannel>() {
        @Override
        public Subchannel answer(InvocationOnMock invocation) throws Throwable {
          Subchannel subchannel = mock(Subchannel.class);
          EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
          Attributes attrs = (Attributes) invocation.getArguments()[1];
          when(subchannel.getAddresses()).thenReturn(eag);
          when(subchannel.getAttributes()).thenReturn(attrs);
          mockSubchannels.add(subchannel);
          subchannelTracker.add(subchannel);
          return subchannel;
        }
      }).when(subchannelPool).takeOrCreateSubchannel(
          any(EquivalentAddressGroup.class), any(Attributes.class));
    // helper.runSerialized() forwards to the direct-executor-backed serializing executor.
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          Runnable task = (Runnable) invocation.getArguments()[0];
          channelExecutor.execute(task);
          return null;
        }
      }).when(helper).runSerialized(any(Runnable.class));
    // Remember the latest picker the balancer publishes.
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          currentPicker = (SubchannelPicker) invocation.getArguments()[1];
          return null;
        }
      }).when(helper).updateBalancingState(
          any(ConnectivityState.class), any(SubchannelPicker.class));
    when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
    ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService();
    when(timerServicePool.getObject()).thenReturn(timerService);
    when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
    when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
    when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
    balancer = new GrpclbLoadBalancer(
        helper,
        subchannelPool,
        timerServicePool,
        timeProvider,
        backoffPolicyProvider);
    verify(subchannelPool).init(same(helper), same(timerService));
  }

  /**
   * Shuts the balancer down and checks that it cleaned up after itself: OOB channels are
   * terminated, every subchannel went back to the pool, and no timer is left pending.
   */
  @After
  public void tearDown() {
    try {
      if (balancer != null) {
        channelExecutor.execute(new Runnable() {
            @Override
            public void run() {
              balancer.shutdown();
            }
          });
      }
      for (ManagedChannel channel : oobChannelTracker) {
        assertTrue(channel + " is shutdown", channel.isShutdown());
        // balancer should have closed the LB stream, terminating the OOB channel.
        assertTrue(channel + " is terminated", channel.isTerminated());
      }
      // GRPCLB manages subchannels only through subchannelPool
      for (Subchannel subchannel: subchannelTracker) {
        verify(subchannelPool).returnSubchannel(same(subchannel));
        // Our mock subchannelPool never calls Subchannel.shutdown(), so any shutdown() seen on
        // the subchannel must have come from the LoadBalancer itself.
verify(subchannel, never()).shutdown();
      }
      verify(helper, never())
          .createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
      // No timer should linger after shutdown
      assertThat(fakeClock.getPendingTasks()).isEmpty();
    } finally {
      // Always stop the fake LB server, even when one of the checks above fails.
      if (fakeLbServer != null) {
        fakeLbServer.shutdownNow();
      }
    }
  }

  /**
   * With an empty drop list the picker cycles through the backend entries in order and puts the
   * picked entry's token on the request headers.
   */
  @Test
  public void roundRobinPickerNoDrop() {
    GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
    Subchannel subchannel = mock(Subchannel.class);
    BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
    BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");

    List<BackendEntry> pickList = Arrays.asList(b1, b2);
    RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);

    PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
    Metadata headers1 = new Metadata();
    // The existing token on the headers will be replaced
    headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
    when(args1.getHeaders()).thenReturn(headers1);
    assertSame(b1.result, picker.pickSubchannel(args1));
    verify(args1).getHeaders();
    assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");

    PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
    Metadata headers2 = new Metadata();
    when(args2.getHeaders()).thenReturn(headers2);
    assertSame(b2.result, picker.pickSubchannel(args2));
    verify(args2).getHeaders();
    assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");

    // Third pick wraps around to the first backend entry.
    PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
    Metadata headers3 = new Metadata();
    when(args3.getHeaders()).thenReturn(headers3);
    assertSame(b1.result, picker.pickSubchannel(args3));
    verify(args3).getHeaders();
    assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");

    verify(subchannel, never()).getAttributes();
  }


  /**
   * With a drop list of {@code [null, drop]} every second pick is dropped, while the remaining
   * picks still round-robin over the backend entries.
   */
  @Test
  public void roundRobinPickerWithDrop() {
    assertTrue(DROP_PICK_RESULT.isDrop());
    GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
    Subchannel subchannel = mock(Subchannel.class);
    // 1 out of 2 requests are to be dropped
    DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003");
    List<DropEntry> dropList = Arrays.asList(null, d);

    BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
    BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
    List<BackendEntry> pickList = Arrays.asList(b1, b2);
    RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList);

    // dropList[0], pickList[0]
    PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
    Metadata headers1 = new Metadata();
    headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
    when(args1.getHeaders()).thenReturn(headers1);
    assertSame(b1.result, picker.pickSubchannel(args1));
    verify(args1).getHeaders();
    assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");

    // dropList[1]: drop
    PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
    Metadata headers2 = new Metadata();
    when(args2.getHeaders()).thenReturn(headers2);
    assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2));
    verify(args2, never()).getHeaders();

    // dropList[0], pickList[1]
    PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
    Metadata headers3 = new Metadata();
    when(args3.getHeaders()).thenReturn(headers3);
    assertSame(b2.result, picker.pickSubchannel(args3));
    verify(args3).getHeaders();
    assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
372 373 // dropList[1]: drop 374 PickSubchannelArgs args4 = mock(PickSubchannelArgs.class); 375 Metadata headers4 = new Metadata(); 376 when(args4.getHeaders()).thenReturn(headers4); 377 assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4)); 378 verify(args4, never()).getHeaders(); 379 380 // dropList[0], pickList[0] 381 PickSubchannelArgs args5 = mock(PickSubchannelArgs.class); 382 Metadata headers5 = new Metadata(); 383 when(args5.getHeaders()).thenReturn(headers5); 384 assertSame(b1.result, picker.pickSubchannel(args5)); 385 verify(args5).getHeaders(); 386 assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001"); 387 388 verify(subchannel, never()).getAttributes(); 389 } 390 391 @Test loadReporting()392 public void loadReporting() { 393 Metadata headers = new Metadata(); 394 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 395 when(args.getHeaders()).thenReturn(headers); 396 397 long loadReportIntervalMillis = 1983; 398 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 399 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 400 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 401 402 // Fallback timer is started as soon as address is resolved. 
403 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 404 405 assertEquals(1, fakeOobChannels.size()); 406 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 407 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 408 assertEquals(1, lbRequestObservers.size()); 409 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 410 InOrder inOrder = inOrder(lbRequestObserver); 411 InOrder helperInOrder = inOrder(helper, subchannelPool); 412 413 inOrder.verify(lbRequestObserver).onNext( 414 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 415 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 416 .build())); 417 418 // Simulate receiving LB response 419 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 420 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 421 422 // Load reporting task is scheduled 423 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 424 assertEquals(0, fakeClock.runDueTasks()); 425 426 List<ServerEntry> backends = Arrays.asList( 427 new ServerEntry("127.0.0.1", 2000, "token0001"), 428 new ServerEntry("token0001"), // drop 429 new ServerEntry("127.0.0.1", 2010, "token0002"), 430 new ServerEntry("token0003")); // drop 431 432 lbResponseObserver.onNext(buildLbResponse(backends)); 433 434 assertEquals(2, mockSubchannels.size()); 435 Subchannel subchannel1 = mockSubchannels.poll(); 436 Subchannel subchannel2 = mockSubchannels.poll(); 437 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 438 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 439 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 440 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 441 442 helperInOrder.verify(helper, atLeast(1)) 443 .updateBalancingState(eq(READY), 
pickerCaptor.capture()); 444 RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); 445 assertThat(picker.dropList).containsExactly( 446 null, 447 new DropEntry(getLoadRecorder(), "token0001"), 448 null, 449 new DropEntry(getLoadRecorder(), "token0003")).inOrder(); 450 assertThat(picker.pickList).containsExactly( 451 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 452 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder(); 453 454 // Report, no data 455 assertNextReport( 456 inOrder, lbRequestObserver, loadReportIntervalMillis, 457 ClientStats.newBuilder().build()); 458 459 PickResult pick1 = picker.pickSubchannel(args); 460 assertSame(subchannel1, pick1.getSubchannel()); 461 assertSame(getLoadRecorder(), pick1.getStreamTracerFactory()); 462 463 // Merely the pick will not be recorded as upstart. 464 assertNextReport( 465 inOrder, lbRequestObserver, loadReportIntervalMillis, 466 ClientStats.newBuilder().build()); 467 468 ClientStreamTracer tracer1 = 469 pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 470 471 PickResult pick2 = picker.pickSubchannel(args); 472 assertNull(pick2.getSubchannel()); 473 assertSame(DROP_PICK_RESULT, pick2); 474 475 // Report includes upstart of pick1 and the drop of pick2 476 assertNextReport( 477 inOrder, lbRequestObserver, loadReportIntervalMillis, 478 ClientStats.newBuilder() 479 .setNumCallsStarted(2) 480 .setNumCallsFinished(1) // pick2 481 .addCallsFinishedWithDrop( 482 ClientStatsPerToken.newBuilder() 483 .setLoadBalanceToken("token0001") 484 .setNumCalls(1) // pick2 485 .build()) 486 .build()); 487 488 PickResult pick3 = picker.pickSubchannel(args); 489 assertSame(subchannel2, pick3.getSubchannel()); 490 assertSame(getLoadRecorder(), pick3.getStreamTracerFactory()); 491 ClientStreamTracer tracer3 = 492 pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 493 494 // pick3 has sent out headers 495 
tracer3.outboundHeaders(); 496 497 // 3rd report includes pick3's upstart 498 assertNextReport( 499 inOrder, lbRequestObserver, loadReportIntervalMillis, 500 ClientStats.newBuilder() 501 .setNumCallsStarted(1) 502 .build()); 503 504 PickResult pick4 = picker.pickSubchannel(args); 505 assertNull(pick4.getSubchannel()); 506 assertSame(DROP_PICK_RESULT, pick4); 507 508 // pick1 ended without sending anything 509 tracer1.streamClosed(Status.CANCELLED); 510 511 // 4th report includes end of pick1 and drop of pick4 512 assertNextReport( 513 inOrder, lbRequestObserver, loadReportIntervalMillis, 514 ClientStats.newBuilder() 515 .setNumCallsStarted(1) // pick4 516 .setNumCallsFinished(2) 517 .setNumCallsFinishedWithClientFailedToSend(1) // pick1 518 .addCallsFinishedWithDrop( 519 ClientStatsPerToken.newBuilder() 520 .setLoadBalanceToken("token0003") 521 .setNumCalls(1) // pick4 522 .build()) 523 .build()); 524 525 PickResult pick5 = picker.pickSubchannel(args); 526 assertSame(subchannel1, pick1.getSubchannel()); 527 assertSame(getLoadRecorder(), pick5.getStreamTracerFactory()); 528 ClientStreamTracer tracer5 = 529 pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 530 531 // pick3 ended without receiving response headers 532 tracer3.streamClosed(Status.DEADLINE_EXCEEDED); 533 534 // pick5 sent and received headers 535 tracer5.outboundHeaders(); 536 tracer5.inboundHeaders(); 537 538 // 5th report includes pick3's end and pick5's upstart 539 assertNextReport( 540 inOrder, lbRequestObserver, loadReportIntervalMillis, 541 ClientStats.newBuilder() 542 .setNumCallsStarted(1) // pick5 543 .setNumCallsFinished(1) // pick3 544 .build()); 545 546 // pick5 ends 547 tracer5.streamClosed(Status.OK); 548 549 // 6th report includes pick5's end 550 assertNextReport( 551 inOrder, lbRequestObserver, loadReportIntervalMillis, 552 ClientStats.newBuilder() 553 .setNumCallsFinished(1) 554 .setNumCallsFinishedKnownReceived(1) 555 .build()); 556 557 
assertEquals(1, fakeClock.numPendingTasks()); 558 // Balancer closes the stream, scheduled reporting task cancelled 559 lbResponseObserver.onError(Status.UNAVAILABLE.asException()); 560 assertEquals(0, fakeClock.numPendingTasks()); 561 562 // New stream created 563 verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); 564 lbResponseObserver = lbResponseObserverCaptor.getValue(); 565 assertEquals(1, lbRequestObservers.size()); 566 lbRequestObserver = lbRequestObservers.poll(); 567 inOrder = inOrder(lbRequestObserver); 568 569 inOrder.verify(lbRequestObserver).onNext( 570 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 571 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 572 .build())); 573 574 // Load reporting is also requested 575 lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); 576 577 // No picker created because balancer is still using the results from the last stream 578 helperInOrder.verify(helper, never()) 579 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 580 581 // Make a new pick on that picker. It will not show up on the report of the new stream, because 582 // that picker is associated with the previous stream. 
583 PickResult pick6 = picker.pickSubchannel(args); 584 assertNull(pick6.getSubchannel()); 585 assertSame(DROP_PICK_RESULT, pick6); 586 assertNextReport( 587 inOrder, lbRequestObserver, loadReportIntervalMillis, 588 ClientStats.newBuilder().build()); 589 590 // New stream got the list update 591 lbResponseObserver.onNext(buildLbResponse(backends)); 592 593 // Same backends, thus no new subchannels 594 helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( 595 any(EquivalentAddressGroup.class), any(Attributes.class)); 596 // But the new RoundRobinEntries have a new loadRecorder, thus considered different from 597 // the previous list, thus a new picker is created 598 helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 599 picker = (RoundRobinPicker) pickerCaptor.getValue(); 600 601 PickResult pick1p = picker.pickSubchannel(args); 602 assertSame(subchannel1, pick1p.getSubchannel()); 603 assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory()); 604 pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 605 606 // The pick from the new stream will be included in the report 607 assertNextReport( 608 inOrder, lbRequestObserver, loadReportIntervalMillis, 609 ClientStats.newBuilder() 610 .setNumCallsStarted(1) 611 .build()); 612 613 verify(args, atLeast(0)).getHeaders(); 614 verifyNoMoreInteractions(args); 615 } 616 617 @Test abundantInitialResponse()618 public void abundantInitialResponse() { 619 Metadata headers = new Metadata(); 620 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 621 when(args.getHeaders()).thenReturn(headers); 622 623 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 624 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 625 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 626 assertEquals(1, fakeOobChannels.size()); 627 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 
628 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 629 630 // Simulate LB initial response 631 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 632 lbResponseObserver.onNext(buildInitialResponse(1983)); 633 634 // Load reporting task is scheduled 635 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 636 FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); 637 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 638 639 // Simulate an abundant LB initial response, with a different report interval 640 lbResponseObserver.onNext(buildInitialResponse(9097)); 641 // It doesn't affect load-reporting at all 642 assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) 643 .containsExactly(scheduledTask); 644 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 645 } 646 647 @Test raceBetweenLoadReportingAndLbStreamClosure()648 public void raceBetweenLoadReportingAndLbStreamClosure() { 649 Metadata headers = new Metadata(); 650 PickSubchannelArgs args = mock(PickSubchannelArgs.class); 651 when(args.getHeaders()).thenReturn(headers); 652 653 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 654 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 655 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 656 assertEquals(1, fakeOobChannels.size()); 657 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 658 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 659 assertEquals(1, lbRequestObservers.size()); 660 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 661 InOrder inOrder = inOrder(lbRequestObserver); 662 663 inOrder.verify(lbRequestObserver).onNext( 664 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 665 
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 666 .build())); 667 668 // Simulate receiving LB response 669 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 670 lbResponseObserver.onNext(buildInitialResponse(1983)); 671 672 // Load reporting task is scheduled 673 assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 674 FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); 675 assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); 676 677 // Close lbStream 678 lbResponseObserver.onCompleted(); 679 680 // Reporting task cancelled 681 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 682 683 // Simulate a race condition where the task has just started when its cancelled 684 scheduledTask.command.run(); 685 686 // No report sent. No new task scheduled 687 inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class)); 688 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 689 } 690 assertNextReport( InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, long loadReportIntervalMillis, ClientStats expectedReport)691 private void assertNextReport( 692 InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, 693 long loadReportIntervalMillis, ClientStats expectedReport) { 694 assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS)); 695 inOrder.verifyNoMoreInteractions(); 696 assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS)); 697 assertEquals(1, fakeClock.numPendingTasks()); 698 inOrder.verify(lbRequestObserver).onNext( 699 eq(LoadBalanceRequest.newBuilder() 700 .setClientStats( 701 ClientStats.newBuilder(expectedReport) 702 .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read())) 703 .build()) 704 .build())); 705 } 706 707 @Test acquireAndReleaseScheduledExecutor()708 public void 
acquireAndReleaseScheduledExecutor() {
    // The timer service is taken from the pool once (the balancer is constructed in setUp()).
    verify(timerServicePool).getObject();
    verifyNoMoreInteractions(timerServicePool);

    // Shutting down returns that same timer service to the pool.
    balancer.shutdown();
    verify(timerServicePool).returnObject(same(fakeClock.getScheduledExecutorService()));
    verifyNoMoreInteractions(timerServicePool);
  }

  /**
   * A name-resolution error puts the balancer in TRANSIENT_FAILURE with an error-only picker;
   * a later resolution result without balancer addresses is then delivered.
   */
  @Test
  public void nameResolutionFailsThenRecoverToDelegate() {
    Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
    deliverNameResolutionError(error);
    verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
    RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
    assertThat(picker.dropList).isEmpty();
    assertThat(picker.pickList).containsExactly(new ErrorEntry(error));

    // Recover with a subsequent success
    List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);

    Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
    deliverResolvedAddresses(resolvedServers, resolutionAttrs);
  }

  /**
   * A name-resolution error puts the balancer in TRANSIENT_FAILURE with an error-only picker;
   * a later resolution result with a balancer address makes it open an OOB channel and start
   * the LB RPC.
   */
  @Test
  public void nameResolutionFailsThenRecoverToGrpclb() {
    Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
    deliverNameResolutionError(error);
    verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
    RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
    assertThat(picker.dropList).isEmpty();
    assertThat(picker.pickList).containsExactly(new ErrorEntry(error));

    // Recover with a subsequent success
    List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
    EquivalentAddressGroup eag = resolvedServers.get(0);

    Attributes resolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(resolvedServers, resolutionAttrs);

    verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
    verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  }

  /**
   * A name-resolution error that arrives while the LB stream is already open surfaces as
   * TRANSIENT_FAILURE but keeps the OOB channel alive, so a server list received afterwards
   * still results in subchannels being requested from the pool.
   */
  @Test
  public void grpclbThenNameResolutionFails() {
    InOrder inOrder = inOrder(helper, subchannelPool);
    // Go to GRPCLB first
    List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
    Attributes grpclbResolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);

    verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
    assertEquals(1, fakeOobChannels.size());
    ManagedChannel oobChannel = fakeOobChannels.poll();
    verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();

    // Let name resolution fail before round-robin list is ready
    Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
    deliverNameResolutionError(error);

    inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
    RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
    assertThat(picker.dropList).isEmpty();
    assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
    assertFalse(oobChannel.isShutdown());

    // Simulate receiving LB response
    List<ServerEntry> backends = Arrays.asList(
        new ServerEntry("127.0.0.1", 2000, "TOKEN1"),
        new ServerEntry("127.0.0.1", 2010, "TOKEN2"));
    verify(helper, never()).runSerialized(any(Runnable.class));
    lbResponseObserver.onNext(buildInitialResponse());
    lbResponseObserver.onNext(buildLbResponse(backends));

    // One runSerialized() call per LB response delivered above.
    verify(helper, times(2)).runSerialized(any(Runnable.class));
    inOrder.verify(subchannelPool).takeOrCreateSubchannel(
        eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
        any(Attributes.class));
inOrder.verify(subchannelPool).takeOrCreateSubchannel( 790 eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)), 791 any(Attributes.class)); 792 } 793 794 @Test grpclbUpdatedAddresses_avoidsReconnect()795 public void grpclbUpdatedAddresses_avoidsReconnect() { 796 List<EquivalentAddressGroup> grpclbResolutionList = 797 createResolvedServerAddresses(true, false); 798 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 799 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 800 801 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 802 ManagedChannel oobChannel = fakeOobChannels.poll(); 803 assertEquals(1, lbRequestObservers.size()); 804 805 List<EquivalentAddressGroup> grpclbResolutionList2 = 806 createResolvedServerAddresses(true, false, true); 807 EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList( 808 grpclbResolutionList2.get(0).getAddresses().get(0), 809 grpclbResolutionList2.get(2).getAddresses().get(0)), 810 lbAttributes(lbAuthority(0))); 811 deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); 812 verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag)); 813 assertEquals(1, lbRequestObservers.size()); // No additional RPC 814 } 815 816 @Test grpclbUpdatedAddresses_reconnectOnAuthorityChange()817 public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { 818 List<EquivalentAddressGroup> grpclbResolutionList = 819 createResolvedServerAddresses(true, false); 820 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 821 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 822 823 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 824 ManagedChannel oobChannel = fakeOobChannels.poll(); 825 assertEquals(1, lbRequestObservers.size()); 826 827 final String newAuthority = "some-new-authority"; 828 List<EquivalentAddressGroup> grpclbResolutionList2 = 829 
createResolvedServerAddresses(false); 830 grpclbResolutionList2.add(new EquivalentAddressGroup( 831 new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority))); 832 deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs); 833 assertTrue(oobChannel.isTerminated()); 834 verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority)); 835 assertEquals(2, lbRequestObservers.size()); // An additional RPC 836 } 837 838 @Test grpclbWorking()839 public void grpclbWorking() { 840 InOrder inOrder = inOrder(helper, subchannelPool); 841 List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); 842 Attributes grpclbResolutionAttrs = Attributes.EMPTY; 843 deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); 844 845 // Fallback timer is started as soon as the addresses are resolved. 846 assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); 847 848 verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); 849 assertEquals(1, fakeOobChannels.size()); 850 ManagedChannel oobChannel = fakeOobChannels.poll(); 851 verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 852 StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); 853 assertEquals(1, lbRequestObservers.size()); 854 StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); 855 verify(lbRequestObserver).onNext( 856 eq(LoadBalanceRequest.newBuilder().setInitialRequest( 857 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) 858 .build())); 859 860 // Simulate receiving LB response 861 List<ServerEntry> backends1 = Arrays.asList( 862 new ServerEntry("127.0.0.1", 2000, "token0001"), 863 new ServerEntry("127.0.0.1", 2010, "token0002")); 864 inOrder.verify(helper, never()) 865 .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); 866 
lbResponseObserver.onNext(buildInitialResponse()); 867 lbResponseObserver.onNext(buildLbResponse(backends1)); 868 869 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 870 eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)), 871 any(Attributes.class)); 872 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 873 eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)), 874 any(Attributes.class)); 875 assertEquals(2, mockSubchannels.size()); 876 Subchannel subchannel1 = mockSubchannels.poll(); 877 Subchannel subchannel2 = mockSubchannels.poll(); 878 verify(subchannel1).requestConnection(); 879 verify(subchannel2).requestConnection(); 880 assertEquals( 881 new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS), 882 subchannel1.getAddresses()); 883 assertEquals( 884 new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS), 885 subchannel2.getAddresses()); 886 887 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 888 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); 889 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 890 RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); 891 assertThat(picker0.dropList).containsExactly(null, null); 892 assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); 893 inOrder.verifyNoMoreInteractions(); 894 895 // Let subchannels be connected 896 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 897 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 898 RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); 899 900 assertThat(picker1.dropList).containsExactly(null, null); 901 assertThat(picker1.pickList).containsExactly( 902 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 903 904 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 905 
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 906 RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); 907 assertThat(picker2.dropList).containsExactly(null, null); 908 assertThat(picker2.pickList).containsExactly( 909 new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), 910 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) 911 .inOrder(); 912 913 // Disconnected subchannels 914 verify(subchannel1).requestConnection(); 915 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE)); 916 verify(subchannel1, times(2)).requestConnection(); 917 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 918 RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); 919 assertThat(picker3.dropList).containsExactly(null, null); 920 assertThat(picker3.pickList).containsExactly( 921 new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); 922 923 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); 924 inOrder.verifyNoMoreInteractions(); 925 926 // As long as there is at least one READY subchannel, round robin will work. 
927 Status error1 = Status.UNAVAILABLE.withDescription("error1"); 928 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); 929 inOrder.verifyNoMoreInteractions(); 930 931 // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING 932 verify(subchannel2).requestConnection(); 933 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE)); 934 verify(subchannel2, times(2)).requestConnection(); 935 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 936 RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); 937 assertThat(picker4.dropList).containsExactly(null, null); 938 assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY); 939 940 // Update backends, with a drop entry 941 List<ServerEntry> backends2 = 942 Arrays.asList( 943 new ServerEntry("127.0.0.1", 2030, "token0003"), // New address 944 new ServerEntry("token0003"), // drop 945 new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed 946 new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time 947 new ServerEntry("token0006")); // drop 948 verify(subchannelPool, never()).returnSubchannel(same(subchannel1)); 949 950 lbResponseObserver.onNext(buildLbResponse(backends2)); 951 // not in backends2, closed 952 verify(subchannelPool).returnSubchannel(same(subchannel1)); 953 // backends2[2], will be kept 954 verify(subchannelPool, never()).returnSubchannel(same(subchannel2)); 955 956 inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( 957 eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)), 958 any(Attributes.class)); 959 inOrder.verify(subchannelPool).takeOrCreateSubchannel( 960 eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)), 961 any(Attributes.class)); 962 assertEquals(1, mockSubchannels.size()); 963 Subchannel subchannel3 = mockSubchannels.poll(); 964 
verify(subchannel3).requestConnection(); 965 assertEquals( 966 new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS), 967 subchannel3.getAddresses()); 968 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 969 RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue(); 970 assertThat(picker7.dropList).containsExactly( 971 null, 972 new DropEntry(getLoadRecorder(), "token0003"), 973 null, 974 null, 975 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 976 assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); 977 978 // State updates on obsolete subchannel1 will have no effect 979 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); 980 deliverSubchannelState( 981 subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); 982 deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); 983 inOrder.verifyNoMoreInteractions(); 984 985 deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); 986 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 987 RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue(); 988 assertThat(picker8.dropList).containsExactly( 989 null, 990 new DropEntry(getLoadRecorder(), "token0003"), 991 null, 992 null, 993 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 994 // subchannel2 is still IDLE, thus not in the active list 995 assertThat(picker8.pickList).containsExactly( 996 new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), 997 new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); 998 // subchannel2 becomes READY and makes it into the list 999 deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); 1000 inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); 1001 RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue(); 1002 
assertThat(picker9.dropList).containsExactly( 1003 null, 1004 new DropEntry(getLoadRecorder(), "token0003"), 1005 null, 1006 null, 1007 new DropEntry(getLoadRecorder(), "token0006")).inOrder(); 1008 assertThat(picker9.pickList).containsExactly( 1009 new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), 1010 new BackendEntry(subchannel2, getLoadRecorder(), "token0004"), 1011 new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); 1012 verify(subchannelPool, never()).returnSubchannel(same(subchannel3)); 1013 1014 // Update backends, with no entry 1015 lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); 1016 verify(subchannelPool).returnSubchannel(same(subchannel2)); 1017 verify(subchannelPool).returnSubchannel(same(subchannel3)); 1018 inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); 1019 RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); 1020 assertThat(picker10.dropList).isEmpty(); 1021 assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY); 1022 1023 assertFalse(oobChannel.isShutdown()); 1024 assertEquals(0, lbRequestObservers.size()); 1025 verify(lbRequestObserver, never()).onCompleted(); 1026 verify(lbRequestObserver, never()).onError(any(Throwable.class)); 1027 1028 // Load reporting was not requested, thus never scheduled 1029 assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); 1030 1031 verify(subchannelPool, never()).clear(); 1032 balancer.shutdown(); 1033 verify(subchannelPool).clear(); 1034 } 1035 1036 @Test grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires()1037 public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() { 1038 subtestGrpclbFallbackInitialTimeout(false); 1039 } 1040 1041 @Test grpclbFallback_initialTimeout_timerExpires()1042 public void grpclbFallback_initialTimeout_timerExpires() { 1043 subtestGrpclbFallbackInitialTimeout(true); 1044 } 1045 1046 // Fallback or not 
// within the period of the initial timeout.
  /**
   * Shared body for the initial-timeout fallback tests.
   *
   * <p>Delivers a mixed balancer/backend resolution, breaks the balancer stream just before the
   * fallback timeout, and then optionally lets the timer expire. Afterwards it feeds further
   * resolver updates and finally a balancer server list, checking at each step whether the
   * resolver-provided or the balancer-provided backends are in use.
   *
   * @param timerExpires whether the clock is advanced past {@code GrpclbState.FALLBACK_TIMEOUT_MS}
   *     so that fallback mode is entered before the balancer returns a server list
   */
  private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
    long loadReportIntervalMillis = 1983;
    InOrder inOrder = inOrder(helper, subchannelPool);

    // Create a resolution list with a mixture of balancer and backend addresses
    List<EquivalentAddressGroup> resolutionList =
        createResolvedServerAddresses(false, true, false);
    Attributes resolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(resolutionList, resolutionAttrs);

    inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));

    // Attempted to connect to balancer
    assertEquals(1, fakeOobChannels.size());
    ManagedChannel oobChannel = fakeOobChannels.poll();
    verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();

    verify(lbRequestObserver).onNext(
        eq(LoadBalanceRequest.newBuilder().setInitialRequest(
                InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
            .build()));
    lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
    // We don't care if runSerialized() has been run.
    inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
    inOrder.verifyNoMoreInteractions();

    // Stop one millisecond short of the fallback timeout; the timer must still be pending.
    assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
    fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
    assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

    /////////////////////////////////////////////
    // Break the LB stream before timer expires
    /////////////////////////////////////////////
    Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
    lbResponseObserver.onError(streamError.asException());
    // Not in fallback mode. The error will be propagated.
    verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
    RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
    assertThat(picker.dropList).isEmpty();
    ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
    Status status = errorEntry.result.getStatus();
    assertThat(status.getCode()).isEqualTo(streamError.getCode());
    assertThat(status.getDescription()).contains(streamError.getDescription());
    // A new stream is created
    verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(
        eq(LoadBalanceRequest.newBuilder().setInitialRequest(
                InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
            .build()));

    //////////////////////////////////
    // Fallback timer expires (or not)
    //////////////////////////////////
    if (timerExpires) {
      fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
      assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
      // Fall back to the backends from resolver
      fallbackTestVerifyUseOfFallbackBackendLists(
          inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));

      assertFalse(oobChannel.isShutdown());
      verify(lbRequestObserver, never()).onCompleted();
    }

    ////////////////////////////////////////////////////////
    // Name resolver sends new list without any backend addr
    ////////////////////////////////////////////////////////
    resolutionList = createResolvedServerAddresses(true, true);
    deliverResolvedAddresses(resolutionList, resolutionAttrs);

    // New addresses are updated to the OobChannel
    inOrder.verify(helper).updateOobChannelAddresses(
        same(oobChannel),
        eq(new EquivalentAddressGroup(
            Arrays.asList(
                resolutionList.get(0).getAddresses().get(0),
                resolutionList.get(1).getAddresses().get(0)),
            lbAttributes(lbAuthority(0)))));

    if (timerExpires) {
      // Still in fallback logic, except that the backend list is empty
      fallbackTestVerifyUseOfFallbackBackendLists(
          inOrder, Collections.<EquivalentAddressGroup>emptyList());
    }

    //////////////////////////////////////////////////
    // Name resolver sends new list with backend addrs
    //////////////////////////////////////////////////
    resolutionList = createResolvedServerAddresses(true, false, false);
    deliverResolvedAddresses(resolutionList, resolutionAttrs);

    // New LB address is updated to the OobChannel
    inOrder.verify(helper).updateOobChannelAddresses(
        same(oobChannel),
        eq(resolutionList.get(0)));

    if (timerExpires) {
      // New backend addresses are used for fallback
      fallbackTestVerifyUseOfFallbackBackendLists(
          inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
    }

    ////////////////////////////////////////////////
    // Break the LB stream after the timer expires
    ////////////////////////////////////////////////
    if (timerExpires) {
      lbResponseObserver.onError(streamError.asException());

      // The error will NOT propagate to picker because fallback list is in use.
      inOrder.verify(helper, never())
          .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
      // A new stream is created
      verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture());
      lbResponseObserver = lbResponseObserverCaptor.getValue();
      assertEquals(1, lbRequestObservers.size());
      lbRequestObserver = lbRequestObservers.poll();
      verify(lbRequestObserver).onNext(
          eq(LoadBalanceRequest.newBuilder().setInitialRequest(
                  InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
              .build()));
    }

    /////////////////////////////////
    // Balancer returns a server list
    /////////////////////////////////
    List<ServerEntry> serverList = Arrays.asList(
        new ServerEntry("127.0.0.1", 2000, "token0001"),
        new ServerEntry("127.0.0.1", 2010, "token0002"));
    lbResponseObserver.onNext(buildInitialResponse());
    lbResponseObserver.onNext(buildLbResponse(serverList));

    // Balancer-provided server list now in effect
    fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);

    ///////////////////////////////////////////////////////////////
    // New backend addresses from resolver outside of fallback mode
    ///////////////////////////////////////////////////////////////
    resolutionList = createResolvedServerAddresses(true, false);
    deliverResolvedAddresses(resolutionList, resolutionAttrs);
    // Will not affect the round robin list at all
    inOrder.verify(helper, never())
        .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

    // No fallback timeout timer scheduled.
    assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
  }

  /** Runs the connection-lost subtest with only the balancer stream broken. */
  @Test
  public void grpclbFallback_balancerLost() {
    subtestGrpclbFallbackConnectionLost(true, false);
  }

  /** Runs the connection-lost subtest with only the backend subchannels broken. */
  @Test
  public void grpclbFallback_subchannelsLost() {
    subtestGrpclbFallbackConnectionLost(false, true);
  }

  /** Runs the connection-lost subtest with both the balancer stream and all subchannels broken. */
  @Test
  public void grpclbFallback_allLost() {
    subtestGrpclbFallbackConnectionLost(true, true);
  }

  // Fallback outside of the initial timeout, where all connections are lost.
  /**
   * Shared body for the connection-lost fallback tests.
   *
   * <p>After the balancer has delivered a server list, the balancer stream and/or all backend
   * subchannels are broken. Only when both are broken does the test expect the resolver-provided
   * backends (indices 0 and 2 of the resolution list) to be used, followed by a return to a new
   * balancer-provided list. In every case no fallback timer may be pending at the end.
   *
   * @param balancerBroken whether the balancer stream is failed with {@code UNAVAILABLE}
   * @param allSubchannelsBroken whether every balancer-provided subchannel is moved to IDLE
   */
  private void subtestGrpclbFallbackConnectionLost(
      boolean balancerBroken, boolean allSubchannelsBroken) {
    long loadReportIntervalMillis = 1983;
    InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);

    // Create a resolution list with a mixture of balancer and backend addresses
    List<EquivalentAddressGroup> resolutionList =
        createResolvedServerAddresses(false, true, false);
    Attributes resolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(resolutionList, resolutionAttrs);

    inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));

    // Attempted to connect to balancer
    assertEquals(1, fakeOobChannels.size());
    fakeOobChannels.poll();
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();

    verify(lbRequestObserver).onNext(
        eq(LoadBalanceRequest.newBuilder().setInitialRequest(
                InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
            .build()));
    lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
    // We don't care if runSerialized() has been run.
    inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
    inOrder.verifyNoMoreInteractions();

    // Balancer returns a server list
    List<ServerEntry> serverList = Arrays.asList(
        new ServerEntry("127.0.0.1", 2000, "token0001"),
        new ServerEntry("127.0.0.1", 2010, "token0002"));
    lbResponseObserver.onNext(buildInitialResponse());
    lbResponseObserver.onNext(buildLbResponse(serverList));

    List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);

    // Break connections
    if (balancerBroken) {
      lbResponseObserver.onError(Status.UNAVAILABLE.asException());
      // A new stream to LB is created
      inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
      lbResponseObserver = lbResponseObserverCaptor.getValue();
      assertEquals(1, lbRequestObservers.size());
      lbRequestObserver = lbRequestObservers.poll();
    }
    if (allSubchannelsBroken) {
      for (Subchannel subchannel : subchannels) {
        // A READY subchannel transits to IDLE when receiving a go-away
        deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
      }
    }

    if (balancerBroken && allSubchannelsBroken) {
      // Going into fallback
      subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
          inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));

      // When in fallback mode, fallback timer should not be scheduled when all backend
      // connections are lost
      for (Subchannel subchannel : subchannels) {
        deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
      }

      // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
      List<ServerEntry> serverList2 = Arrays.asList(
          new ServerEntry("127.0.0.1", 2001, "token0003"),
          new ServerEntry("127.0.0.1", 2011, "token0004"));
      lbResponseObserver.onNext(buildInitialResponse());
      lbResponseObserver.onNext(buildLbResponse(serverList2));

      fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2);
    }
    assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));

    if (!(balancerBroken && allSubchannelsBroken)) {
      // Fallback was never entered, so no subchannel was requested for the resolver's backends.
      verify(subchannelPool, never()).takeOrCreateSubchannel(
          eq(resolutionList.get(0)), any(Attributes.class));
      verify(subchannelPool, never()).takeOrCreateSubchannel(
          eq(resolutionList.get(2)), any(Attributes.class));
    }
  }

  /**
   * Verifies that the given resolver-provided addresses are now in use as backends.
   *
   * <p>Delegates to {@code fallbackTestVerifyUseOfBackendLists} with no tokens.
   *
   * @param inOrder the in-order verifier covering {@code helper} and {@code subchannelPool}
   * @param addrs the expected backend address groups, in order
   * @return the subchannels taken for {@code addrs}
   */
  private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
      InOrder inOrder, List<EquivalentAddressGroup> addrs) {
    return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null);
  }

  /**
   * Verifies that the given balancer-provided servers are now in use as backends.
   *
   * <p>Each server is converted to an {@link EquivalentAddressGroup} carrying
   * {@code LB_BACKEND_ATTRS}, and its token is passed along for the expected pick-list entries.
   *
   * @param inOrder the in-order verifier covering {@code helper} and {@code subchannelPool}
   * @param servers the expected balancer-provided servers, in order
   * @return the subchannels taken for {@code servers}
   */
  private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
      InOrder inOrder, List<ServerEntry> servers) {
    ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>();
    ArrayList<String> tokens = new ArrayList<>();
    for (ServerEntry server : servers) {
      addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS));
      tokens.add(server.token);
    }
    return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens);
  }

  /**
   * Verifies that subchannels are taken for {@code addrs} in order, then drives them to READY one
   * by one and checks the picker published after each transition.
   *
   * <p>This method consumes and clears {@code mockSubchannels} and reads {@code currentPicker}.
   *
   * @param inOrder the in-order verifier covering {@code helper} and {@code subchannelPool}
   * @param addrs the expected backend address groups, in order
   * @param tokens the tokens matching {@code addrs} index by index, or {@code null} to expect
   *     {@link BackendEntry} instances built from the subchannel alone
   * @return the subchannels that were in {@code mockSubchannels}, in order
   */
  private List<Subchannel> fallbackTestVerifyUseOfBackendLists(
      InOrder inOrder, List<EquivalentAddressGroup> addrs,
      @Nullable List<String> tokens) {
    if (tokens != null) {
      assertEquals(addrs.size(), tokens.size());
    }
    for (EquivalentAddressGroup addr : addrs) {
      inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class));
    }
    RoundRobinPicker picker = (RoundRobinPicker) currentPicker;
    assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
    assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY);
    assertEquals(addrs.size(), mockSubchannels.size());
    ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels);
    mockSubchannels.clear();
    for (Subchannel subchannel : subchannels) {
      deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
    }
    // Any number of CONNECTING updates is tolerated here, but no other state update is allowed.
    inOrder.verify(helper, atLeast(0))
        .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
    inOrder.verify(helper, never())
        .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

    // The expected pick list grows by one entry each time a subchannel becomes READY.
    ArrayList<BackendEntry> pickList = new ArrayList<>();
    for (int i = 0; i < addrs.size(); i++) {
      Subchannel subchannel = subchannels.get(i);
      BackendEntry backend;
      if (tokens == null) {
        backend = new BackendEntry(subchannel);
      } else {
        backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i));
      }
      pickList.add(backend);
      deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
      inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
      picker = (RoundRobinPicker) pickerCaptor.getValue();
      assertThat(picker.dropList)
          .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
      assertThat(picker.pickList).containsExactlyElementsIn(pickList);
      inOrder.verify(helper, never())
          .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
    }
    return subchannels;
  }

  /**
   * Verifies that when balancer addresses with different authorities are resolved, a single OOB
   * channel is created for "fake-authority-1" containing only the addresses that carry that
   * authority ("fake-address-1" and "fake-address-3").
   */
  @Test
  public void grpclbMultipleAuthorities() throws Exception {
    List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList(
        new EquivalentAddressGroup(
            new FakeSocketAddress("fake-address-1"),
            lbAttributes("fake-authority-1")),
        new EquivalentAddressGroup(
            new FakeSocketAddress("fake-address-2"),
            lbAttributes("fake-authority-2")),
        new EquivalentAddressGroup(
            new FakeSocketAddress("not-a-lb-address")),
        new EquivalentAddressGroup(
            new FakeSocketAddress("fake-address-3"),
            lbAttributes("fake-authority-1")));
    final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup(
        Arrays.<SocketAddress>asList(
            new FakeSocketAddress("fake-address-1"),
            new FakeSocketAddress("fake-address-3")),
        lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day

    Attributes grpclbResolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);

    verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
  }

  @Test
  public void grpclbBalancerStreamRetry() throws Exception {
    LoadBalanceRequest expectedInitialRequest =
        LoadBalanceRequest.newBuilder()
            .setInitialRequest(
                InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
            .build();
    InOrder inOrder =
        inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
    List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
    Attributes grpclbResolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);

    assertEquals(1, fakeOobChannels.size());
    @SuppressWarnings("unused")
    ManagedChannel oobChannel = fakeOobChannels.poll();

    // First balancer RPC
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Balancer closes it immediately (erroneously)
    lbResponseObserver.onCompleted();
    // Will start backoff sequence 1 (10ns)
    inOrder.verify(backoffPolicyProvider).get();
    inOrder.verify(backoffPolicy1).nextBackoffNanos();
    assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Fast-forward to a moment before the retry
    fakeClock.forwardNanos(9);
    verifyNoMoreInteractions(mockLbService);
    // Then time for retry
    fakeClock.forwardNanos(1);
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Balancer closes it with an error.
    lbResponseObserver.onError(Status.UNAVAILABLE.asException());
    // Will continue the backoff sequence 1 (100ns)
    verifyNoMoreInteractions(backoffPolicyProvider);
    inOrder.verify(backoffPolicy1).nextBackoffNanos();
    assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Fast-forward to a moment before the retry
    fakeClock.forwardNanos(100 - 1);
    verifyNoMoreInteractions(mockLbService);
    // Then time for retry
    fakeClock.forwardNanos(1);
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Balancer sends initial response.
    lbResponseObserver.onNext(buildInitialResponse());

    // Then breaks the RPC
    lbResponseObserver.onError(Status.UNAVAILABLE.asException());

    // Will reset the retry sequence and retry immediately, because balancer has responded.
    inOrder.verify(backoffPolicyProvider).get();
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));

    // Fail the retry after spending 4ns
    fakeClock.forwardNanos(4);
    lbResponseObserver.onError(Status.UNAVAILABLE.asException());

    // Will be on the first retry (10ns) of backoff sequence 2.
1470 inOrder.verify(backoffPolicy2).nextBackoffNanos(); 1471 assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1472 1473 // Fast-forward to a moment before the retry, the time spent in the last try is deducted. 1474 fakeClock.forwardNanos(10 - 4 - 1); 1475 verifyNoMoreInteractions(mockLbService); 1476 // Then time for retry 1477 fakeClock.forwardNanos(1); 1478 inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); 1479 assertEquals(1, lbRequestObservers.size()); 1480 lbRequestObserver = lbRequestObservers.poll(); 1481 verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); 1482 assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); 1483 1484 // Wrapping up 1485 verify(backoffPolicyProvider, times(2)).get(); 1486 verify(backoffPolicy1, times(2)).nextBackoffNanos(); 1487 verify(backoffPolicy2, times(1)).nextBackoffNanos(); 1488 } 1489 deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState)1490 private void deliverSubchannelState( 1491 final Subchannel subchannel, final ConnectivityStateInfo newState) { 1492 channelExecutor.execute(new Runnable() { 1493 @Override 1494 public void run() { 1495 balancer.handleSubchannelState(subchannel, newState); 1496 } 1497 }); 1498 } 1499 deliverNameResolutionError(final Status error)1500 private void deliverNameResolutionError(final Status error) { 1501 channelExecutor.execute(new Runnable() { 1502 @Override 1503 public void run() { 1504 balancer.handleNameResolutionError(error); 1505 } 1506 }); 1507 } 1508 deliverResolvedAddresses( final List<EquivalentAddressGroup> addrs, final Attributes attrs)1509 private void deliverResolvedAddresses( 1510 final List<EquivalentAddressGroup> addrs, final Attributes attrs) { 1511 channelExecutor.execute(new Runnable() { 1512 @Override 1513 public void run() { 1514 balancer.handleResolvedAddressGroups(addrs, attrs); 1515 } 1516 }); 1517 } 1518 getLoadRecorder()1519 private GrpclbClientLoadRecorder 
getLoadRecorder() { 1520 return balancer.getGrpclbState().getLoadRecorder(); 1521 } 1522 createResolvedServerAddresses(boolean ... isLb)1523 private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) { 1524 ArrayList<EquivalentAddressGroup> list = new ArrayList<>(); 1525 for (int i = 0; i < isLb.length; i++) { 1526 SocketAddress addr = new FakeSocketAddress("fake-address-" + i); 1527 EquivalentAddressGroup eag = 1528 new EquivalentAddressGroup( 1529 addr, 1530 isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY); 1531 list.add(eag); 1532 } 1533 return list; 1534 } 1535 lbAuthority(int unused)1536 private static String lbAuthority(int unused) { 1537 // TODO(ejona): Support varying authorities 1538 return "lb.google.com"; 1539 } 1540 lbAttributes(String authority)1541 private static Attributes lbAttributes(String authority) { 1542 return Attributes.newBuilder() 1543 .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority) 1544 .build(); 1545 } 1546 buildInitialResponse()1547 private static LoadBalanceResponse buildInitialResponse() { 1548 return buildInitialResponse(0); 1549 } 1550 buildInitialResponse(long loadReportIntervalMillis)1551 private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) { 1552 return LoadBalanceResponse.newBuilder() 1553 .setInitialResponse( 1554 InitialLoadBalanceResponse.newBuilder() 1555 .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) 1556 .build(); 1557 } 1558 buildLbResponse(List<ServerEntry> servers)1559 private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) { 1560 ServerList.Builder serverListBuilder = ServerList.newBuilder(); 1561 for (ServerEntry server : servers) { 1562 if (server.addr != null) { 1563 serverListBuilder.addServers(Server.newBuilder() 1564 .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress())) 1565 .setPort(server.addr.getPort()) 1566 .setLoadBalanceToken(server.token) 1567 
.build()); 1568 } else { 1569 serverListBuilder.addServers(Server.newBuilder() 1570 .setDrop(true) 1571 .setLoadBalanceToken(server.token) 1572 .build()); 1573 } 1574 } 1575 return LoadBalanceResponse.newBuilder() 1576 .setServerList(serverListBuilder.build()) 1577 .build(); 1578 } 1579 1580 private static class ServerEntry { 1581 final InetSocketAddress addr; 1582 final String token; 1583 ServerEntry(String host, int port, String token)1584 ServerEntry(String host, int port, String token) { 1585 this.addr = new InetSocketAddress(host, port); 1586 this.token = token; 1587 } 1588 1589 // Drop entry ServerEntry(String token)1590 ServerEntry(String token) { 1591 this.addr = null; 1592 this.token = token; 1593 } 1594 } 1595 } 1596