1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.InternalChannelz.id; 21 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 22 import static org.junit.Assert.assertEquals; 23 import static org.junit.Assert.assertFalse; 24 import static org.junit.Assert.assertNotNull; 25 import static org.junit.Assert.assertNotSame; 26 import static org.junit.Assert.assertNull; 27 import static org.junit.Assert.assertSame; 28 import static org.junit.Assert.assertTrue; 29 import static org.junit.Assert.fail; 30 import static org.mockito.AdditionalAnswers.delegatesTo; 31 import static org.mockito.Matchers.any; 32 import static org.mockito.Matchers.anyString; 33 import static org.mockito.Matchers.eq; 34 import static org.mockito.Matchers.isA; 35 import static org.mockito.Matchers.isNotNull; 36 import static org.mockito.Matchers.notNull; 37 import static org.mockito.Matchers.same; 38 import static org.mockito.Mockito.atLeast; 39 import static org.mockito.Mockito.doThrow; 40 import static org.mockito.Mockito.mock; 41 import static org.mockito.Mockito.never; 42 import static org.mockito.Mockito.times; 43 import static org.mockito.Mockito.verify; 44 import static org.mockito.Mockito.verifyNoMoreInteractions; 45 import static org.mockito.Mockito.when; 46 47 import com.google.common.util.concurrent.ListenableFuture; 48 import com.google.common.util.concurrent.MoreExecutors; 49 import com.google.common.util.concurrent.SettableFuture; 50 import io.grpc.Attributes; 51 import io.grpc.BinaryLog; 52 import io.grpc.Channel; 53 import io.grpc.Compressor; 54 import io.grpc.Context; 55 import io.grpc.Grpc; 56 import io.grpc.HandlerRegistry; 57 import io.grpc.IntegerMarshaller; 58 import io.grpc.InternalChannelz; 59 import io.grpc.InternalChannelz.ServerSocketsList; 60 import io.grpc.InternalChannelz.SocketStats; 61 import io.grpc.InternalInstrumented; 62 import io.grpc.InternalLogId; 63 import io.grpc.InternalServerInterceptors; 64 import io.grpc.Metadata; 65 import io.grpc.MethodDescriptor; 66 import io.grpc.ServerCall; 67 import io.grpc.ServerCall.Listener; 68 import io.grpc.ServerCallHandler; 69 import io.grpc.ServerInterceptor; 70 import io.grpc.ServerMethodDefinition; 71 import io.grpc.ServerServiceDefinition; 72 import io.grpc.ServerStreamTracer; 73 import io.grpc.ServerTransportFilter; 74 import io.grpc.ServiceDescriptor; 75 import io.grpc.Status; 76 import io.grpc.StringMarshaller; 77 import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; 78 import io.grpc.internal.testing.SingleMessageProducer; 79 import io.grpc.internal.testing.TestServerStreamTracer; 80 import io.grpc.util.MutableHandlerRegistry; 81 import java.io.ByteArrayInputStream; 82 import java.io.File; 83 import java.io.IOException; 84 import java.io.InputStream; 85 import java.net.SocketAddress; 86 import java.util.Arrays; 87 import java.util.Collections; 88 import java.util.LinkedList; 89 import java.util.List; 90 import java.util.concurrent.CyclicBarrier; 91 import java.util.concurrent.Executor; 92 import java.util.concurrent.ScheduledExecutorService; 93 import java.util.concurrent.TimeUnit; 94 import java.util.concurrent.atomic.AtomicBoolean; 95 import java.util.concurrent.atomic.AtomicInteger; 96 import java.util.concurrent.atomic.AtomicReference; 97 import javax.annotation.Nullable; 98 import org.junit.After; 99 import org.junit.Before; 100 import org.junit.BeforeClass; 101 import org.junit.Rule; 102 import org.junit.Test; 103 import org.junit.rules.ExpectedException; 104 import org.junit.runner.RunWith; 105 import org.junit.runners.JUnit4; 106 import org.mockito.ArgumentCaptor; 107 import org.mockito.Captor; 108 import org.mockito.Matchers; 109 import org.mockito.Mock; 110 import org.mockito.MockitoAnnotations; 111 112 /** Unit tests for {@link ServerImpl}. */ 113 @RunWith(JUnit4.class) 114 public class ServerImplTest { 115 private static final IntegerMarshaller INTEGER_MARSHALLER = IntegerMarshaller.INSTANCE; 116 private static final StringMarshaller STRING_MARSHALLER = StringMarshaller.INSTANCE; 117 private static final MethodDescriptor<String, Integer> METHOD = 118 MethodDescriptor.<String, Integer>newBuilder() 119 .setType(MethodDescriptor.MethodType.UNKNOWN) 120 .setFullMethodName("Waiter/serve") 121 .setRequestMarshaller(STRING_MARSHALLER) 122 .setResponseMarshaller(INTEGER_MARSHALLER) 123 .build(); 124 private static final Context.Key<String> SERVER_ONLY = Context.key("serverOnly"); 125 private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added"); 126 private static final Context.CancellableContext SERVER_CONTEXT = 127 Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation(); 128 private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FITLER = 129 new FakeClock.TaskFilter() { 130 @Override 131 public boolean shouldAccept(Runnable runnable) { 132 return runnable instanceof ServerImpl.ContextCloser; 133 } 134 }; 135 private static final String AUTHORITY = "some_authority"; 136 137 @Rule public final ExpectedException thrown = ExpectedException.none(); 138 139 @BeforeClass beforeStartUp()140 public static void beforeStartUp() { 141 // Cancel the root context. Server will fork it so the per-call context should not 142 // be cancelled. 143 SERVER_CONTEXT.cancel(null); 144 } 145 146 private final FakeClock executor = new FakeClock(); 147 private final FakeClock timer = new FakeClock(); 148 private final InternalChannelz channelz = new InternalChannelz(); 149 150 @Mock 151 private ServerStreamTracer.Factory streamTracerFactory; 152 private List<ServerStreamTracer.Factory> streamTracerFactories; 153 private final TestServerStreamTracer streamTracer = new TestServerStreamTracer() { 154 @Override 155 public Context filterContext(Context context) { 156 Context newCtx = super.filterContext(context); 157 return newCtx.withValue(SERVER_TRACER_ADDED_KEY, "context added by tracer"); 158 } 159 }; 160 @Mock 161 private ObjectPool<Executor> executorPool; 162 private Builder builder = new Builder(); 163 private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry(); 164 private HandlerRegistry fallbackRegistry = mock( 165 HandlerRegistry.class, 166 delegatesTo(new HandlerRegistry() { 167 @Override 168 public ServerMethodDefinition<?, ?> lookupMethod( 169 String methodName, @Nullable String authority) { 170 return mutableFallbackRegistry.lookupMethod(methodName, authority); 171 } 172 173 @Override 174 public List<ServerServiceDefinition> getServices() { 175 return mutableFallbackRegistry.getServices(); 176 } 177 })); 178 private SimpleServer transportServer = new SimpleServer(); 179 private ServerImpl server; 180 181 @Captor 182 private ArgumentCaptor<Status> statusCaptor; 183 @Captor 184 private ArgumentCaptor<Metadata> metadataCaptor; 185 @Captor 186 private ArgumentCaptor<ServerStreamListener> streamListenerCaptor; 187 188 @Mock 189 private ServerStream stream; 190 @Mock 191 private ServerCall.Listener<String> callListener; 192 @Mock 193 private ServerCallHandler<String, Integer> callHandler; 194 195 /** Set up for test. */ 196 @Before startUp()197 public void startUp() throws IOException { 198 MockitoAnnotations.initMocks(this); 199 builder.channelz = channelz; 200 streamTracerFactories = Arrays.asList(streamTracerFactory); 201 when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); 202 when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) 203 .thenReturn(streamTracer); 204 when(stream.getAuthority()).thenReturn(AUTHORITY); 205 } 206 207 @After noPendingTasks()208 public void noPendingTasks() { 209 assertEquals(0, executor.numPendingTasks()); 210 assertEquals(0, timer.numPendingTasks()); 211 } 212 213 @Test startStopImmediate()214 public void startStopImmediate() throws IOException { 215 transportServer = new SimpleServer() { 216 @Override 217 public void shutdown() {} 218 }; 219 createAndStartServer(); 220 server.shutdown(); 221 assertTrue(server.isShutdown()); 222 assertFalse(server.isTerminated()); 223 transportServer.listener.serverShutdown(); 224 assertTrue(server.isTerminated()); 225 } 226 227 @Test stopImmediate()228 public void stopImmediate() throws IOException { 229 transportServer = new SimpleServer() { 230 @Override 231 public void shutdown() { 232 throw new AssertionError("Should not be called, because wasn't started"); 233 } 234 }; 235 createServer(); 236 server.shutdown(); 237 assertTrue(server.isShutdown()); 238 assertTrue(server.isTerminated()); 239 verifyNoMoreInteractions(executorPool); 240 } 241 242 @Test startStopImmediateWithChildTransport()243 public void startStopImmediateWithChildTransport() throws IOException { 244 createAndStartServer(); 245 verifyExecutorsAcquired(); 246 class DelayedShutdownServerTransport extends SimpleServerTransport { 247 boolean shutdown; 248 249 @Override 250 public void shutdown() { 251 shutdown = true; 252 } 253 } 254 255 DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport(); 256 transportServer.registerNewServerTransport(serverTransport); 257 server.shutdown(); 258 assertTrue(server.isShutdown()); 259 assertFalse(server.isTerminated()); 260 assertTrue(serverTransport.shutdown); 261 verifyExecutorsNotReturned(); 262 263 serverTransport.listener.transportTerminated(); 264 assertTrue(server.isTerminated()); 265 verifyExecutorsReturned(); 266 } 267 268 @Test startShutdownNowImmediateWithChildTransport()269 public void startShutdownNowImmediateWithChildTransport() throws IOException { 270 createAndStartServer(); 271 verifyExecutorsAcquired(); 272 class DelayedShutdownServerTransport extends SimpleServerTransport { 273 boolean shutdown; 274 275 @Override 276 public void shutdown() {} 277 278 @Override 279 public void shutdownNow(Status reason) { 280 shutdown = true; 281 } 282 } 283 284 DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport(); 285 transportServer.registerNewServerTransport(serverTransport); 286 server.shutdownNow(); 287 assertTrue(server.isShutdown()); 288 assertFalse(server.isTerminated()); 289 assertTrue(serverTransport.shutdown); 290 verifyExecutorsNotReturned(); 291 292 serverTransport.listener.transportTerminated(); 293 assertTrue(server.isTerminated()); 294 verifyExecutorsReturned(); 295 } 296 297 @Test shutdownNowAfterShutdown()298 public void shutdownNowAfterShutdown() throws IOException { 299 createAndStartServer(); 300 verifyExecutorsAcquired(); 301 class DelayedShutdownServerTransport extends SimpleServerTransport { 302 boolean shutdown; 303 304 @Override 305 public void shutdown() {} 306 307 @Override 308 public void shutdownNow(Status reason) { 309 shutdown = true; 310 } 311 } 312 313 DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport(); 314 transportServer.registerNewServerTransport(serverTransport); 315 server.shutdown(); 316 assertTrue(server.isShutdown()); 317 server.shutdownNow(); 318 assertFalse(server.isTerminated()); 319 assertTrue(serverTransport.shutdown); 320 verifyExecutorsNotReturned(); 321 322 serverTransport.listener.transportTerminated(); 323 assertTrue(server.isTerminated()); 324 verifyExecutorsReturned(); 325 } 326 327 @Test shutdownNowAfterSlowShutdown()328 public void shutdownNowAfterSlowShutdown() throws IOException { 329 transportServer = new SimpleServer() { 330 @Override 331 public void shutdown() { 332 // Don't call super which calls listener.serverShutdown(). We'll call it manually. 333 } 334 }; 335 createAndStartServer(); 336 verifyExecutorsAcquired(); 337 class DelayedShutdownServerTransport extends SimpleServerTransport { 338 boolean shutdown; 339 340 @Override 341 public void shutdown() {} 342 343 @Override 344 public void shutdownNow(Status reason) { 345 shutdown = true; 346 } 347 } 348 349 DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport(); 350 transportServer.registerNewServerTransport(serverTransport); 351 server.shutdown(); 352 server.shutdownNow(); 353 transportServer.listener.serverShutdown(); 354 assertTrue(server.isShutdown()); 355 assertFalse(server.isTerminated()); 356 357 verifyExecutorsNotReturned(); 358 serverTransport.listener.transportTerminated(); 359 verifyExecutorsReturned(); 360 assertTrue(server.isTerminated()); 361 } 362 363 @Test transportServerFailsStartup()364 public void transportServerFailsStartup() { 365 final IOException ex = new IOException(); 366 class FailingStartupServer extends SimpleServer { 367 @Override 368 public void start(ServerListener listener) throws IOException { 369 throw ex; 370 } 371 } 372 373 transportServer = new FailingStartupServer(); 374 createServer(); 375 try { 376 server.start(); 377 fail("expected exception"); 378 } catch (IOException e) { 379 assertSame(ex, e); 380 } 381 verifyNoMoreInteractions(executorPool); 382 } 383 384 @Test transportHandshakeTimeout_expired()385 public void transportHandshakeTimeout_expired() throws Exception { 386 class ShutdownRecordingTransport extends SimpleServerTransport { 387 Status shutdownNowStatus; 388 389 @Override public void shutdownNow(Status status) { 390 shutdownNowStatus = status; 391 super.shutdownNow(status); 392 } 393 } 394 395 builder.handshakeTimeout(60, TimeUnit.SECONDS); 396 createAndStartServer(); 397 ShutdownRecordingTransport serverTransport = new ShutdownRecordingTransport(); 398 transportServer.registerNewServerTransport(serverTransport); 399 timer.forwardTime(59, TimeUnit.SECONDS); 400 assertNull("shutdownNow status", serverTransport.shutdownNowStatus); 401 // Don't call transportReady() in time 402 timer.forwardTime(2, TimeUnit.SECONDS); 403 assertNotNull("shutdownNow status", serverTransport.shutdownNowStatus); 404 } 405 406 @Test methodNotFound()407 public void methodNotFound() throws Exception { 408 createAndStartServer(); 409 ServerTransportListener transportListener 410 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 411 transportListener.transportReady(Attributes.EMPTY); 412 Metadata requestHeaders = new Metadata(); 413 StatsTraceContext statsTraceCtx = 414 StatsTraceContext.newServerContext( 415 streamTracerFactories, "Waiter/nonexist", requestHeaders); 416 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 417 transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders); 418 verify(stream).setListener(isA(ServerStreamListener.class)); 419 verify(stream, atLeast(1)).statsTraceContext(); 420 421 assertEquals(1, executor.runDueTasks()); 422 verify(stream).close(statusCaptor.capture(), any(Metadata.class)); 423 Status status = statusCaptor.getValue(); 424 assertEquals(Status.Code.UNIMPLEMENTED, status.getCode()); 425 assertEquals("Method not found: Waiter/nonexist", status.getDescription()); 426 427 verify(streamTracerFactory).newServerStreamTracer(eq("Waiter/nonexist"), same(requestHeaders)); 428 assertNull(streamTracer.getServerCallInfo()); 429 assertEquals(Status.Code.UNIMPLEMENTED, statusCaptor.getValue().getCode()); 430 } 431 432 @Test decompressorNotFound()433 public void decompressorNotFound() throws Exception { 434 String decompressorName = "NON_EXISTENT_DECOMPRESSOR"; 435 createAndStartServer(); 436 ServerTransportListener transportListener 437 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 438 transportListener.transportReady(Attributes.EMPTY); 439 Metadata requestHeaders = new Metadata(); 440 requestHeaders.put(MESSAGE_ENCODING_KEY, decompressorName); 441 StatsTraceContext statsTraceCtx = 442 StatsTraceContext.newServerContext( 443 streamTracerFactories, "Waiter/nonexist", requestHeaders); 444 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 445 446 transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders); 447 448 verify(stream).close(statusCaptor.capture(), any(Metadata.class)); 449 Status status = statusCaptor.getValue(); 450 assertEquals(Status.Code.UNIMPLEMENTED, status.getCode()); 451 assertEquals("Can't find decompressor for " + decompressorName, status.getDescription()); 452 verifyNoMoreInteractions(stream); 453 } 454 455 @Test basicExchangeSuccessful()456 public void basicExchangeSuccessful() throws Exception { 457 createAndStartServer(); 458 basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50); 459 } 460 basicExchangeHelper( MethodDescriptor<String, Integer> method, String request, int firstResponse, Integer extraResponse)461 private void basicExchangeHelper( 462 MethodDescriptor<String, Integer> method, 463 String request, 464 int firstResponse, 465 Integer extraResponse) throws Exception { 466 final Metadata.Key<String> metadataKey 467 = Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER); 468 final AtomicReference<ServerCall<String, Integer>> callReference 469 = new AtomicReference<ServerCall<String, Integer>>(); 470 final AtomicReference<Context> callContextReference = new AtomicReference<Context>(); 471 mutableFallbackRegistry.addService(ServerServiceDefinition.builder( 472 new ServiceDescriptor("Waiter", method)) 473 .addMethod( 474 method, 475 new ServerCallHandler<String, Integer>() { 476 @Override 477 public ServerCall.Listener<String> startCall( 478 ServerCall<String, Integer> call, 479 Metadata headers) { 480 assertEquals("Waiter/serve", call.getMethodDescriptor().getFullMethodName()); 481 assertNotNull(call); 482 assertNotNull(headers); 483 assertEquals("value", headers.get(metadataKey)); 484 callReference.set(call); 485 callContextReference.set(Context.current()); 486 return callListener; 487 } 488 }).build()); 489 ServerTransportListener transportListener 490 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 491 transportListener.transportReady(Attributes.EMPTY); 492 493 Metadata requestHeaders = new Metadata(); 494 requestHeaders.put(metadataKey, "value"); 495 StatsTraceContext statsTraceCtx = 496 StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders); 497 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 498 499 transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); 500 verify(stream).setListener(streamListenerCaptor.capture()); 501 ServerStreamListener streamListener = streamListenerCaptor.getValue(); 502 assertNotNull(streamListener); 503 verify(stream, atLeast(1)).statsTraceContext(); 504 verify(fallbackRegistry, never()).lookupMethod(any(String.class), any(String.class)); 505 506 assertEquals(1, executor.runDueTasks()); 507 ServerCall<String, Integer> call = callReference.get(); 508 assertNotNull(call); 509 assertEquals( 510 new ServerCallInfoImpl<String, Integer>( 511 call.getMethodDescriptor(), 512 call.getAttributes(), 513 call.getAuthority()), 514 streamTracer.getServerCallInfo()); 515 verify(fallbackRegistry).lookupMethod("Waiter/serve", AUTHORITY); 516 Context callContext = callContextReference.get(); 517 assertNotNull(callContext); 518 assertEquals("context added by tracer", SERVER_TRACER_ADDED_KEY.get(callContext)); 519 520 streamListener.messagesAvailable(new SingleMessageProducer(STRING_MARSHALLER.stream(request))); 521 assertEquals(1, executor.runDueTasks()); 522 verify(callListener).onMessage(request); 523 524 Metadata responseHeaders = new Metadata(); 525 responseHeaders.put(metadataKey, "response value"); 526 call.sendHeaders(responseHeaders); 527 verify(stream).writeHeaders(responseHeaders); 528 verify(stream).setCompressor(isA(Compressor.class)); 529 530 call.sendMessage(firstResponse); 531 ArgumentCaptor<InputStream> inputCaptor = ArgumentCaptor.forClass(InputStream.class); 532 verify(stream).writeMessage(inputCaptor.capture()); 533 verify(stream).flush(); 534 assertEquals(firstResponse, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue()); 535 536 streamListener.halfClosed(); // All full; no dessert. 537 assertEquals(1, executor.runDueTasks()); 538 verify(callListener).onHalfClose(); 539 540 if (extraResponse != null) { 541 call.sendMessage(extraResponse); 542 verify(stream, times(2)).writeMessage(inputCaptor.capture()); 543 verify(stream, times(2)).flush(); 544 assertEquals( 545 (int) extraResponse, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue()); 546 } 547 548 Metadata trailers = new Metadata(); 549 trailers.put(metadataKey, "another value"); 550 Status status = Status.OK.withDescription("A okay"); 551 call.close(status, trailers); 552 verify(stream).close(status, trailers); 553 554 streamListener.closed(Status.OK); 555 assertEquals(1, executor.runDueTasks()); 556 verify(callListener).onComplete(); 557 558 verify(stream, atLeast(1)).statsTraceContext(); 559 verifyNoMoreInteractions(callListener); 560 561 verify(streamTracerFactory).newServerStreamTracer(eq("Waiter/serve"), same(requestHeaders)); 562 } 563 564 @Test transportFilters()565 public void transportFilters() throws Exception { 566 final SocketAddress remoteAddr = mock(SocketAddress.class); 567 final Attributes.Key<String> key1 = Attributes.Key.create("test-key1"); 568 final Attributes.Key<String> key2 = Attributes.Key.create("test-key2"); 569 final Attributes.Key<String> key3 = Attributes.Key.create("test-key3"); 570 final AtomicReference<Attributes> filter1TerminationCallbackArgument = 571 new AtomicReference<Attributes>(); 572 final AtomicReference<Attributes> filter2TerminationCallbackArgument = 573 new AtomicReference<Attributes>(); 574 final AtomicInteger readyCallbackCalled = new AtomicInteger(0); 575 final AtomicInteger terminationCallbackCalled = new AtomicInteger(0); 576 builder.addTransportFilter(new ServerTransportFilter() { 577 @Override 578 public Attributes transportReady(Attributes attrs) { 579 assertEquals(Attributes.newBuilder() 580 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr) 581 .build(), attrs); 582 readyCallbackCalled.incrementAndGet(); 583 return attrs.toBuilder() 584 .set(key1, "yalayala") 585 .set(key2, "blabla") 586 .build(); 587 } 588 589 @Override 590 public void transportTerminated(Attributes attrs) { 591 terminationCallbackCalled.incrementAndGet(); 592 filter1TerminationCallbackArgument.set(attrs); 593 } 594 }); 595 builder.addTransportFilter(new ServerTransportFilter() { 596 @Override 597 public Attributes transportReady(Attributes attrs) { 598 assertEquals(Attributes.newBuilder() 599 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr) 600 .set(key1, "yalayala") 601 .set(key2, "blabla") 602 .build(), attrs); 603 readyCallbackCalled.incrementAndGet(); 604 return attrs.toBuilder() 605 .set(key1, "ouch") 606 .set(key3, "puff") 607 .build(); 608 } 609 610 @Override 611 public void transportTerminated(Attributes attrs) { 612 terminationCallbackCalled.incrementAndGet(); 613 filter2TerminationCallbackArgument.set(attrs); 614 } 615 }); 616 Attributes expectedTransportAttrs = Attributes.newBuilder() 617 .set(key1, "ouch") 618 .set(key2, "blabla") 619 .set(key3, "puff") 620 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr) 621 .build(); 622 623 createAndStartServer(); 624 ServerTransportListener transportListener 625 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 626 Attributes transportAttrs = transportListener.transportReady(Attributes.newBuilder() 627 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr).build()); 628 629 assertEquals(expectedTransportAttrs, transportAttrs); 630 631 server.shutdown(); 632 server.awaitTermination(); 633 634 assertEquals(expectedTransportAttrs, filter1TerminationCallbackArgument.get()); 635 assertEquals(expectedTransportAttrs, filter2TerminationCallbackArgument.get()); 636 assertEquals(2, readyCallbackCalled.get()); 637 assertEquals(2, terminationCallbackCalled.get()); 638 } 639 640 @Test interceptors()641 public void interceptors() throws Exception { 642 final LinkedList<Context> capturedContexts = new LinkedList<Context>(); 643 final Context.Key<String> key1 = Context.key("key1"); 644 final Context.Key<String> key2 = Context.key("key2"); 645 final Context.Key<String> key3 = Context.key("key3"); 646 ServerInterceptor intercepter1 = new ServerInterceptor() { 647 @Override 648 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 649 ServerCall<ReqT, RespT> call, 650 Metadata headers, 651 ServerCallHandler<ReqT, RespT> next) { 652 Context ctx = Context.current().withValue(key1, "value1"); 653 Context origCtx = ctx.attach(); 654 try { 655 capturedContexts.add(ctx); 656 return next.startCall(call, headers); 657 } finally { 658 ctx.detach(origCtx); 659 } 660 } 661 }; 662 ServerInterceptor intercepter2 = new ServerInterceptor() { 663 @Override 664 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 665 ServerCall<ReqT, RespT> call, 666 Metadata headers, 667 ServerCallHandler<ReqT, RespT> next) { 668 Context ctx = Context.current().withValue(key2, "value2"); 669 Context origCtx = ctx.attach(); 670 try { 671 capturedContexts.add(ctx); 672 return next.startCall(call, headers); 673 } finally { 674 ctx.detach(origCtx); 675 } 676 } 677 }; 678 ServerCallHandler<String, Integer> callHandler = new ServerCallHandler<String, Integer>() { 679 @Override 680 public ServerCall.Listener<String> startCall( 681 ServerCall<String, Integer> call, 682 Metadata headers) { 683 capturedContexts.add(Context.current().withValue(key3, "value3")); 684 return callListener; 685 } 686 }; 687 688 mutableFallbackRegistry.addService( 689 ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD)) 690 .addMethod(METHOD, callHandler).build()); 691 builder.intercept(intercepter2); 692 builder.intercept(intercepter1); 693 createServer(); 694 server.start(); 695 696 ServerTransportListener transportListener 697 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 698 transportListener.transportReady(Attributes.EMPTY); 699 700 Metadata requestHeaders = new Metadata(); 701 StatsTraceContext statsTraceCtx = 702 StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders); 703 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 704 705 transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); 706 assertEquals(1, executor.runDueTasks()); 707 708 Context ctx1 = capturedContexts.poll(); 709 assertEquals("value1", key1.get(ctx1)); 710 assertNull(key2.get(ctx1)); 711 assertNull(key3.get(ctx1)); 712 713 Context ctx2 = capturedContexts.poll(); 714 assertEquals("value1", key1.get(ctx2)); 715 assertEquals("value2", key2.get(ctx2)); 716 assertNull(key3.get(ctx2)); 717 718 Context ctx3 = capturedContexts.poll(); 719 assertEquals("value1", key1.get(ctx3)); 720 assertEquals("value2", key2.get(ctx3)); 721 assertEquals("value3", key3.get(ctx3)); 722 723 assertTrue(capturedContexts.isEmpty()); 724 } 725 726 @Test exceptionInStartCallPropagatesToStream()727 public void exceptionInStartCallPropagatesToStream() throws Exception { 728 createAndStartServer(); 729 final Status status = Status.ABORTED.withDescription("Oh, no!"); 730 mutableFallbackRegistry.addService(ServerServiceDefinition.builder( 731 new ServiceDescriptor("Waiter", METHOD)) 732 .addMethod(METHOD, 733 new ServerCallHandler<String, Integer>() { 734 @Override 735 public ServerCall.Listener<String> startCall( 736 ServerCall<String, Integer> call, 737 Metadata headers) { 738 throw status.asRuntimeException(); 739 } 740 }).build()); 741 ServerTransportListener transportListener 742 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 743 transportListener.transportReady(Attributes.EMPTY); 744 745 Metadata requestHeaders = new Metadata(); 746 StatsTraceContext statsTraceCtx = 747 StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders); 748 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 749 750 transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); 751 verify(stream).setListener(streamListenerCaptor.capture()); 752 ServerStreamListener streamListener = streamListenerCaptor.getValue(); 753 assertNotNull(streamListener); 754 verify(stream, atLeast(1)).statsTraceContext(); 755 verifyNoMoreInteractions(stream); 756 verify(fallbackRegistry, never()).lookupMethod(any(String.class), any(String.class)); 757 758 assertEquals(1, executor.runDueTasks()); 759 verify(fallbackRegistry).lookupMethod("Waiter/serve", AUTHORITY); 760 verify(stream).close(same(status), notNull(Metadata.class)); 761 verify(stream, atLeast(1)).statsTraceContext(); 762 } 763 764 @Test testNoDeadlockOnShutdown()765 public void testNoDeadlockOnShutdown() throws Exception { 766 final Object lock = new Object(); 767 final CyclicBarrier barrier = new CyclicBarrier(2); 768 class MaybeDeadlockingServer extends SimpleServer { 769 @Override 770 public void shutdown() { 771 // To deadlock, a lock would need to be held while this method is in progress. 772 try { 773 barrier.await(); 774 } catch (Exception ex) { 775 throw new AssertionError(ex); 776 } 777 // If deadlock is possible with this setup, this sychronization completes the loop because 778 // the serverShutdown needs a lock that Server is holding while calling this method. 779 synchronized (lock) { 780 } 781 } 782 } 783 784 transportServer = new MaybeDeadlockingServer(); 785 createAndStartServer(); 786 new Thread() { 787 @Override 788 public void run() { 789 synchronized (lock) { 790 try { 791 barrier.await(); 792 } catch (Exception ex) { 793 throw new AssertionError(ex); 794 } 795 // To deadlock, a lock would be needed for this call to proceed. 796 transportServer.listener.serverShutdown(); 797 } 798 } 799 }.start(); 800 server.shutdown(); 801 } 802 803 @Test testNoDeadlockOnTransportShutdown()804 public void testNoDeadlockOnTransportShutdown() throws Exception { 805 createAndStartServer(); 806 final Object lock = new Object(); 807 final CyclicBarrier barrier = new CyclicBarrier(2); 808 class MaybeDeadlockingServerTransport extends SimpleServerTransport { 809 @Override 810 public void shutdown() { 811 // To deadlock, a lock would need to be held while this method is in progress. 812 try { 813 barrier.await(); 814 } catch (Exception ex) { 815 throw new AssertionError(ex); 816 } 817 // If deadlock is possible with this setup, this sychronization completes the loop 818 // because the transportTerminated needs a lock that Server is holding while calling this 819 // method. 820 synchronized (lock) { 821 } 822 } 823 } 824 825 final ServerTransportListener transportListener 826 = transportServer.registerNewServerTransport(new MaybeDeadlockingServerTransport()); 827 new Thread() { 828 @Override 829 public void run() { 830 synchronized (lock) { 831 try { 832 barrier.await(); 833 } catch (Exception ex) { 834 throw new AssertionError(ex); 835 } 836 // To deadlock, a lock would be needed for this call to proceed. 837 transportListener.transportTerminated(); 838 } 839 } 840 }.start(); 841 server.shutdown(); 842 } 843 844 @Test testCallContextIsBoundInListenerCallbacks()845 public void testCallContextIsBoundInListenerCallbacks() throws Exception { 846 createAndStartServer(); 847 final AtomicBoolean onReadyCalled = new AtomicBoolean(false); 848 final AtomicBoolean onMessageCalled = new AtomicBoolean(false); 849 final AtomicBoolean onHalfCloseCalled = new AtomicBoolean(false); 850 final AtomicBoolean onCancelCalled = new AtomicBoolean(false); 851 mutableFallbackRegistry.addService(ServerServiceDefinition.builder( 852 new ServiceDescriptor("Waiter", METHOD)) 853 .addMethod( 854 METHOD, 855 new ServerCallHandler<String, Integer>() { 856 @Override 857 public ServerCall.Listener<String> startCall( 858 ServerCall<String, Integer> call, 859 Metadata headers) { 860 // Check that the current context is a descendant of SERVER_CONTEXT 861 final Context initial = Context.current(); 862 assertEquals("yes", SERVER_ONLY.get(initial)); 863 assertNotSame(SERVER_CONTEXT, initial); 864 assertFalse(initial.isCancelled()); 865 return new ServerCall.Listener<String>() { 866 867 @Override 868 public void onReady() { 869 checkContext(); 870 onReadyCalled.set(true); 871 } 872 873 @Override 874 public void onMessage(String message) { 875 checkContext(); 876 onMessageCalled.set(true); 877 } 878 879 @Override 880 public void onHalfClose() { 881 checkContext(); 882 onHalfCloseCalled.set(true); 883 } 884 885 @Override 886 public void onCancel() { 887 checkContext(); 888 onCancelCalled.set(true); 889 } 890 891 @Override 892 public void onComplete() { 893 checkContext(); 894 } 895 896 private void checkContext() { 897 // Check that the bound context is the same as the initial one. 898 assertSame(initial, Context.current()); 899 } 900 }; 901 } 902 }).build()); 903 ServerTransportListener transportListener 904 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 905 transportListener.transportReady(Attributes.EMPTY); 906 907 Metadata requestHeaders = new Metadata(); 908 StatsTraceContext statsTraceCtx = 909 StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders); 910 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 911 912 transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); 913 verify(stream).setListener(streamListenerCaptor.capture()); 914 ServerStreamListener streamListener = streamListenerCaptor.getValue(); 915 assertNotNull(streamListener); 916 917 streamListener.onReady(); 918 assertEquals(1, executor.runDueTasks()); 919 assertTrue(onReadyCalled.get()); 920 921 streamListener 922 .messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0]))); 923 assertEquals(1, executor.runDueTasks()); 924 assertTrue(onMessageCalled.get()); 925 926 streamListener.halfClosed(); 927 assertEquals(1, executor.runDueTasks()); 928 assertTrue(onHalfCloseCalled.get()); 929 930 streamListener.closed(Status.CANCELLED); 931 assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER)); 932 assertEquals(2, executor.runDueTasks()); 933 assertTrue(onCancelCalled.get()); 934 935 // Close should never be called if asserts in listener pass. 936 verify(stream, times(0)).close(isA(Status.class), isNotNull(Metadata.class)); 937 } 938 testClientClose_setup( final AtomicReference<ServerCall<String, Integer>> callReference, final AtomicReference<Context> context, final AtomicBoolean contextCancelled)939 private ServerStreamListener testClientClose_setup( 940 final AtomicReference<ServerCall<String, Integer>> callReference, 941 final AtomicReference<Context> context, 942 final AtomicBoolean contextCancelled) throws Exception { 943 createAndStartServer(); 944 callListener = new ServerCall.Listener<String>() { 945 @Override 946 public void onReady() { 947 context.set(Context.current()); 948 Context.current().addListener(new Context.CancellationListener() { 949 @Override 950 public void cancelled(Context context) { 951 contextCancelled.set(true); 952 } 953 }, MoreExecutors.directExecutor()); 954 } 955 }; 956 957 mutableFallbackRegistry.addService(ServerServiceDefinition.builder( 958 new ServiceDescriptor("Waiter", METHOD)) 959 .addMethod(METHOD, 960 new ServerCallHandler<String, Integer>() { 961 @Override 962 public ServerCall.Listener<String> startCall( 963 ServerCall<String, Integer> call, 964 Metadata headers) { 965 callReference.set(call); 966 return callListener; 967 } 968 }).build()); 969 ServerTransportListener transportListener 970 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 971 transportListener.transportReady(Attributes.EMPTY); 972 Metadata requestHeaders = new Metadata(); 973 StatsTraceContext statsTraceCtx = 974 StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders); 975 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 976 transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); 977 verify(stream).setListener(streamListenerCaptor.capture()); 978 ServerStreamListener streamListener = streamListenerCaptor.getValue(); 979 assertNotNull(streamListener); 980 981 streamListener.onReady(); 982 assertEquals(1, executor.runDueTasks()); 983 return streamListener; 984 } 985 986 @Test testClientClose_cancelTriggersImmediateCancellation()987 public void testClientClose_cancelTriggersImmediateCancellation() throws Exception { 988 AtomicBoolean contextCancelled = new AtomicBoolean(false); 989 AtomicReference<Context> context = new AtomicReference<Context>(); 990 AtomicReference<ServerCall<String, Integer>> callReference 991 = new AtomicReference<ServerCall<String, Integer>>(); 992 993 ServerStreamListener streamListener = testClientClose_setup(callReference, 994 context, contextCancelled); 995 996 // For close status being non OK: 997 // isCancelled is expected to be true immediately after calling closed(), without needing 998 // to wait for the main executor to run any tasks. 999 assertFalse(callReference.get().isCancelled()); 1000 assertFalse(context.get().isCancelled()); 1001 streamListener.closed(Status.CANCELLED); 1002 assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER)); 1003 assertEquals(2, executor.runDueTasks()); 1004 assertTrue(callReference.get().isCancelled()); 1005 assertTrue(context.get().isCancelled()); 1006 assertTrue(contextCancelled.get()); 1007 } 1008 1009 @Test testClientClose_OkTriggersDelayedCancellation()1010 public void testClientClose_OkTriggersDelayedCancellation() throws Exception { 1011 AtomicBoolean contextCancelled = new AtomicBoolean(false); 1012 AtomicReference<Context> context = new AtomicReference<Context>(); 1013 AtomicReference<ServerCall<String, Integer>> callReference 1014 = new AtomicReference<ServerCall<String, Integer>>(); 1015 1016 ServerStreamListener streamListener = testClientClose_setup(callReference, 1017 context, contextCancelled); 1018 1019 // For close status OK: 1020 // isCancelled is expected to be true after all pending work is done 1021 assertFalse(callReference.get().isCancelled()); 1022 assertFalse(context.get().isCancelled()); 1023 streamListener.closed(Status.OK); 1024 assertFalse(callReference.get().isCancelled()); 1025 assertFalse(context.get().isCancelled()); 1026 1027 assertEquals(1, executor.runDueTasks()); 1028 assertTrue(callReference.get().isCancelled()); 1029 assertTrue(context.get().isCancelled()); 1030 assertTrue(contextCancelled.get()); 1031 } 1032 1033 @Test getPort()1034 public void getPort() throws Exception { 1035 transportServer = new SimpleServer() { 1036 @Override 1037 public int getPort() { 1038 return 65535; 1039 } 1040 }; 1041 createAndStartServer(); 1042 1043 assertThat(server.getPort()).isEqualTo(65535); 1044 } 1045 1046 @Test getPortBeforeStartedFails()1047 public void getPortBeforeStartedFails() { 1048 transportServer = new SimpleServer(); 1049 createServer(); 1050 thrown.expect(IllegalStateException.class); 1051 thrown.expectMessage("started"); 1052 server.getPort(); 1053 } 1054 1055 @Test getPortAfterTerminationFails()1056 public void getPortAfterTerminationFails() throws Exception { 1057 transportServer = new SimpleServer(); 1058 createAndStartServer(); 1059 server.shutdown(); 1060 server.awaitTermination(); 1061 thrown.expect(IllegalStateException.class); 1062 thrown.expectMessage("terminated"); 1063 server.getPort(); 1064 } 1065 1066 @Test handlerRegistryPriorities()1067 public void handlerRegistryPriorities() throws Exception { 1068 fallbackRegistry = mock(HandlerRegistry.class); 1069 builder.addService( 1070 ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD)) 1071 .addMethod(METHOD, callHandler).build()); 1072 transportServer = new SimpleServer(); 1073 createAndStartServer(); 1074 1075 ServerTransportListener transportListener 1076 = transportServer.registerNewServerTransport(new SimpleServerTransport()); 1077 transportListener.transportReady(Attributes.EMPTY); 1078 Metadata requestHeaders = new Metadata(); 1079 StatsTraceContext statsTraceCtx = 1080 StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders); 1081 when(stream.statsTraceContext()).thenReturn(statsTraceCtx); 1082 1083 // This call will be handled by callHandler from the internal registry 1084 transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); 1085 assertEquals(1, executor.runDueTasks()); 1086 verify(callHandler).startCall(Matchers.<ServerCall<String, Integer>>anyObject(), 1087 Matchers.<Metadata>anyObject()); 1088 // This call will be handled by the fallbackRegistry because it's not registred in the internal 1089 // registry. 1090 transportListener.streamCreated(stream, "Service1/Method2", requestHeaders); 1091 assertEquals(1, executor.runDueTasks()); 1092 verify(fallbackRegistry).lookupMethod("Service1/Method2", AUTHORITY); 1093 1094 verifyNoMoreInteractions(callHandler); 1095 verifyNoMoreInteractions(fallbackRegistry); 1096 } 1097 1098 @Test messageRead_errorCancelsCall()1099 public void messageRead_errorCancelsCall() throws Exception { 1100 JumpToApplicationThreadServerStreamListener listener 1101 = new JumpToApplicationThreadServerStreamListener( 1102 executor.getScheduledExecutorService(), 1103 executor.getScheduledExecutorService(), 1104 stream, 1105 Context.ROOT.withCancellation()); 1106 ServerStreamListener mockListener = mock(ServerStreamListener.class); 1107 listener.setListener(mockListener); 1108 1109 TestError expectedT = new TestError(); 1110 doThrow(expectedT).when(mockListener) 1111 .messagesAvailable(any(StreamListener.MessageProducer.class)); 1112 // Closing the InputStream is done by the delegated listener (generally ServerCallImpl) 1113 listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); 1114 try { 1115 executor.runDueTasks(); 1116 fail("Expected exception"); 1117 } catch (TestError t) { 1118 assertSame(expectedT, t); 1119 ensureServerStateNotLeaked(); 1120 } 1121 } 1122 1123 @Test messageRead_runtimeExceptionCancelsCall()1124 public void messageRead_runtimeExceptionCancelsCall() throws Exception { 1125 JumpToApplicationThreadServerStreamListener listener 1126 = new JumpToApplicationThreadServerStreamListener( 1127 executor.getScheduledExecutorService(), 1128 executor.getScheduledExecutorService(), 1129 stream, 1130 Context.ROOT.withCancellation()); 1131 ServerStreamListener mockListener = mock(ServerStreamListener.class); 1132 listener.setListener(mockListener); 1133 1134 RuntimeException expectedT = new RuntimeException(); 1135 doThrow(expectedT).when(mockListener) 1136 .messagesAvailable(any(StreamListener.MessageProducer.class)); 1137 // Closing the InputStream is done by the delegated listener (generally ServerCallImpl) 1138 listener.messagesAvailable(mock(StreamListener.MessageProducer.class)); 1139 try { 1140 executor.runDueTasks(); 1141 fail("Expected exception"); 1142 } catch (RuntimeException t) { 1143 assertSame(expectedT, t); 1144 ensureServerStateNotLeaked(); 1145 } 1146 } 1147 1148 @Test halfClosed_errorCancelsCall()1149 public void halfClosed_errorCancelsCall() { 1150 JumpToApplicationThreadServerStreamListener listener 1151 = new JumpToApplicationThreadServerStreamListener( 1152 executor.getScheduledExecutorService(), 1153 executor.getScheduledExecutorService(), 1154 stream, 1155 Context.ROOT.withCancellation()); 1156 ServerStreamListener mockListener = mock(ServerStreamListener.class); 1157 listener.setListener(mockListener); 1158 1159 TestError expectedT = new TestError(); 1160 doThrow(expectedT).when(mockListener).halfClosed(); 1161 listener.halfClosed(); 1162 try { 1163 executor.runDueTasks(); 1164 fail("Expected exception"); 1165 } catch (TestError t) { 1166 assertSame(expectedT, t); 1167 ensureServerStateNotLeaked(); 1168 } 1169 } 1170 1171 @Test halfClosed_runtimeExceptionCancelsCall()1172 public void halfClosed_runtimeExceptionCancelsCall() { 1173 JumpToApplicationThreadServerStreamListener listener 1174 = new JumpToApplicationThreadServerStreamListener( 1175 executor.getScheduledExecutorService(), 1176 executor.getScheduledExecutorService(), 1177 stream, 1178 Context.ROOT.withCancellation()); 1179 ServerStreamListener mockListener = mock(ServerStreamListener.class); 1180 listener.setListener(mockListener); 1181 1182 RuntimeException expectedT = new RuntimeException(); 1183 doThrow(expectedT).when(mockListener).halfClosed(); 1184 listener.halfClosed(); 1185 try { 1186 executor.runDueTasks(); 1187 fail("Expected exception"); 1188 } catch (RuntimeException t) { 1189 assertSame(expectedT, t); 1190 ensureServerStateNotLeaked(); 1191 } 1192 } 1193 1194 @Test onReady_errorCancelsCall()1195 public void onReady_errorCancelsCall() { 1196 JumpToApplicationThreadServerStreamListener listener 1197 = new JumpToApplicationThreadServerStreamListener( 1198 executor.getScheduledExecutorService(), 1199 executor.getScheduledExecutorService(), 1200 stream, 1201 Context.ROOT.withCancellation()); 1202 ServerStreamListener mockListener = mock(ServerStreamListener.class); 1203 listener.setListener(mockListener); 1204 1205 TestError expectedT = new TestError(); 1206 doThrow(expectedT).when(mockListener).onReady(); 1207 listener.onReady(); 1208 try { 1209 executor.runDueTasks(); 1210 fail("Expected exception"); 1211 } catch (TestError t) { 1212 assertSame(expectedT, t); 1213 ensureServerStateNotLeaked(); 1214 } 1215 } 1216 1217 @Test onReady_runtimeExceptionCancelsCall()1218 public void onReady_runtimeExceptionCancelsCall() { 1219 JumpToApplicationThreadServerStreamListener listener 1220 = new JumpToApplicationThreadServerStreamListener( 1221 executor.getScheduledExecutorService(), 1222 executor.getScheduledExecutorService(), 1223 stream, 1224 Context.ROOT.withCancellation()); 1225 ServerStreamListener mockListener = mock(ServerStreamListener.class); 1226 listener.setListener(mockListener); 1227 1228 RuntimeException expectedT = new RuntimeException(); 1229 doThrow(expectedT).when(mockListener).onReady(); 1230 listener.onReady(); 1231 try { 1232 executor.runDueTasks(); 1233 fail("Expected exception"); 1234 } catch (RuntimeException t) { 1235 assertSame(expectedT, t); 1236 ensureServerStateNotLeaked(); 1237 } 1238 } 1239 1240 @Test binaryLogInstalled()1241 public void binaryLogInstalled() throws Exception { 1242 final SettableFuture<Boolean> intercepted = SettableFuture.create(); 1243 final ServerInterceptor interceptor = new ServerInterceptor() { 1244 @Override 1245 public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 1246 Metadata headers, 1247 ServerCallHandler<ReqT, RespT> next) { 1248 intercepted.set(true); 1249 return next.startCall(call, headers); 1250 } 1251 }; 1252 1253 builder.binlog = new BinaryLog() { 1254 @Override 1255 public void close() throws IOException { 1256 // noop 1257 } 1258 1259 @Override 1260 public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition( 1261 ServerMethodDefinition<ReqT, RespT> oMethodDef) { 1262 return ServerMethodDefinition.create( 1263 oMethodDef.getMethodDescriptor(), 1264 InternalServerInterceptors.interceptCallHandlerCreate( 1265 interceptor, 1266 oMethodDef.getServerCallHandler())); 1267 } 1268 1269 @Override 1270 public Channel wrapChannel(Channel channel) { 1271 return channel; 1272 } 1273 }; 1274 createAndStartServer(); 1275 basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50); 1276 assertTrue(intercepted.get()); 1277 } 1278 1279 @Test channelz_membership()1280 public void channelz_membership() throws Exception { 1281 createServer(); 1282 assertTrue(builder.channelz.containsServer(server.getLogId())); 1283 server.shutdownNow().awaitTermination(); 1284 assertFalse(builder.channelz.containsServer(server.getLogId())); 1285 } 1286 1287 @Test channelz_serverStats()1288 public void channelz_serverStats() throws Exception { 1289 createAndStartServer(); 1290 assertEquals(0, server.getStats().get().callsSucceeded); 1291 basicExchangeHelper(METHOD, "Lots of pizza, please", 314, null); 1292 assertEquals(1, server.getStats().get().callsSucceeded); 1293 } 1294 1295 @Test channelz_transport_membershp()1296 public void channelz_transport_membershp() throws Exception { 1297 createAndStartServer(); 1298 SimpleServerTransport transport = new SimpleServerTransport(); 1299 1300 ServerSocketsList before = builder.channelz 1301 .getServerSockets(id(server), id(transport), /*maxPageSize=*/ 1); 1302 assertThat(before.sockets).isEmpty(); 1303 assertTrue(before.end); 1304 1305 ServerTransportListener listener 1306 = transportServer.registerNewServerTransport(transport); 1307 ServerSocketsList added = builder.channelz 1308 .getServerSockets(id(server), id(transport), /*maxPageSize=*/ 1); 1309 assertThat(added.sockets).containsExactly(transport); 1310 assertTrue(before.end); 1311 1312 listener.transportTerminated(); 1313 ServerSocketsList after = builder.channelz 1314 .getServerSockets(id(server), id(transport), /*maxPageSize=*/ 1); 1315 assertThat(after.sockets).isEmpty(); 1316 assertTrue(after.end); 1317 } 1318 createAndStartServer()1319 private void createAndStartServer() throws IOException { 1320 createServer(); 1321 server.start(); 1322 } 1323 createServer()1324 private void createServer() { 1325 assertNull(server); 1326 1327 builder.fallbackHandlerRegistry(fallbackRegistry); 1328 builder.executorPool = executorPool; 1329 server = new ServerImpl(builder, transportServer, SERVER_CONTEXT); 1330 } 1331 verifyExecutorsAcquired()1332 private void verifyExecutorsAcquired() { 1333 verify(executorPool).getObject(); 1334 verifyNoMoreInteractions(executorPool); 1335 } 1336 verifyExecutorsNotReturned()1337 private void verifyExecutorsNotReturned() { 1338 verify(executorPool, never()).returnObject(any(Executor.class)); 1339 } 1340 verifyExecutorsReturned()1341 private void verifyExecutorsReturned() { 1342 verify(executorPool).returnObject(same(executor.getScheduledExecutorService())); 1343 verifyNoMoreInteractions(executorPool); 1344 } 1345 ensureServerStateNotLeaked()1346 private void ensureServerStateNotLeaked() { 1347 verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); 1348 assertEquals(Status.UNKNOWN, statusCaptor.getValue()); 1349 assertNull(statusCaptor.getValue().getCause()); 1350 assertTrue(metadataCaptor.getValue().keys().isEmpty()); 1351 } 1352 1353 private static class SimpleServer implements io.grpc.internal.InternalServer { 1354 ServerListener listener; 1355 1356 @Override start(ServerListener listener)1357 public void start(ServerListener listener) throws IOException { 1358 this.listener = listener; 1359 } 1360 1361 @Override getPort()1362 public int getPort() { 1363 return -1; 1364 } 1365 1366 @Override getListenSockets()1367 public List<InternalInstrumented<SocketStats>> getListenSockets() { 1368 return Collections.emptyList(); 1369 } 1370 1371 @Override shutdown()1372 public void shutdown() { 1373 listener.serverShutdown(); 1374 } 1375 registerNewServerTransport(SimpleServerTransport transport)1376 public ServerTransportListener registerNewServerTransport(SimpleServerTransport transport) { 1377 return transport.listener = listener.transportCreated(transport); 1378 } 1379 } 1380 1381 private class SimpleServerTransport implements ServerTransport { 1382 ServerTransportListener listener; 1383 InternalLogId id = InternalLogId.allocate(getClass().getName()); 1384 1385 @Override shutdown()1386 public void shutdown() { 1387 listener.transportTerminated(); 1388 } 1389 1390 @Override shutdownNow(Status status)1391 public void shutdownNow(Status status) { 1392 listener.transportTerminated(); 1393 } 1394 1395 @Override getLogId()1396 public InternalLogId getLogId() { 1397 return id; 1398 } 1399 1400 @Override getScheduledExecutorService()1401 public ScheduledExecutorService getScheduledExecutorService() { 1402 return timer.getScheduledExecutorService(); 1403 } 1404 1405 @Override getStats()1406 public ListenableFuture<SocketStats> getStats() { 1407 SettableFuture<SocketStats> ret = SettableFuture.create(); 1408 ret.set(null); 1409 return ret; 1410 } 1411 } 1412 1413 private static class Builder extends AbstractServerImplBuilder<Builder> { buildTransportServer( List<ServerStreamTracer.Factory> streamTracerFactories)1414 @Override protected InternalServer buildTransportServer( 1415 List<ServerStreamTracer.Factory> streamTracerFactories) { 1416 throw new UnsupportedOperationException(); 1417 } 1418 useTransportSecurity(File f1, File f2)1419 @Override public Builder useTransportSecurity(File f1, File f2) { 1420 throw new UnsupportedOperationException(); 1421 } 1422 } 1423 1424 /** Allows more precise catch blocks than plain Error to avoid catching AssertionError. */ 1425 private static final class TestError extends Error {} 1426 } 1427