• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.InternalChannelz.id;
21 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertNotSame;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertSame;
28 import static org.junit.Assert.assertTrue;
29 import static org.junit.Assert.fail;
30 import static org.mockito.AdditionalAnswers.delegatesTo;
31 import static org.mockito.Matchers.any;
32 import static org.mockito.Matchers.anyString;
33 import static org.mockito.Matchers.eq;
34 import static org.mockito.Matchers.isA;
35 import static org.mockito.Matchers.isNotNull;
36 import static org.mockito.Matchers.notNull;
37 import static org.mockito.Matchers.same;
38 import static org.mockito.Mockito.atLeast;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.mock;
41 import static org.mockito.Mockito.never;
42 import static org.mockito.Mockito.times;
43 import static org.mockito.Mockito.verify;
44 import static org.mockito.Mockito.verifyNoMoreInteractions;
45 import static org.mockito.Mockito.when;
46 
47 import com.google.common.util.concurrent.ListenableFuture;
48 import com.google.common.util.concurrent.MoreExecutors;
49 import com.google.common.util.concurrent.SettableFuture;
50 import io.grpc.Attributes;
51 import io.grpc.BinaryLog;
52 import io.grpc.Channel;
53 import io.grpc.Compressor;
54 import io.grpc.Context;
55 import io.grpc.Grpc;
56 import io.grpc.HandlerRegistry;
57 import io.grpc.IntegerMarshaller;
58 import io.grpc.InternalChannelz;
59 import io.grpc.InternalChannelz.ServerSocketsList;
60 import io.grpc.InternalChannelz.SocketStats;
61 import io.grpc.InternalInstrumented;
62 import io.grpc.InternalLogId;
63 import io.grpc.InternalServerInterceptors;
64 import io.grpc.Metadata;
65 import io.grpc.MethodDescriptor;
66 import io.grpc.ServerCall;
67 import io.grpc.ServerCall.Listener;
68 import io.grpc.ServerCallHandler;
69 import io.grpc.ServerInterceptor;
70 import io.grpc.ServerMethodDefinition;
71 import io.grpc.ServerServiceDefinition;
72 import io.grpc.ServerStreamTracer;
73 import io.grpc.ServerTransportFilter;
74 import io.grpc.ServiceDescriptor;
75 import io.grpc.Status;
76 import io.grpc.StringMarshaller;
77 import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
78 import io.grpc.internal.testing.SingleMessageProducer;
79 import io.grpc.internal.testing.TestServerStreamTracer;
80 import io.grpc.util.MutableHandlerRegistry;
81 import java.io.ByteArrayInputStream;
82 import java.io.File;
83 import java.io.IOException;
84 import java.io.InputStream;
85 import java.net.SocketAddress;
86 import java.util.Arrays;
87 import java.util.Collections;
88 import java.util.LinkedList;
89 import java.util.List;
90 import java.util.concurrent.CyclicBarrier;
91 import java.util.concurrent.Executor;
92 import java.util.concurrent.ScheduledExecutorService;
93 import java.util.concurrent.TimeUnit;
94 import java.util.concurrent.atomic.AtomicBoolean;
95 import java.util.concurrent.atomic.AtomicInteger;
96 import java.util.concurrent.atomic.AtomicReference;
97 import javax.annotation.Nullable;
98 import org.junit.After;
99 import org.junit.Before;
100 import org.junit.BeforeClass;
101 import org.junit.Rule;
102 import org.junit.Test;
103 import org.junit.rules.ExpectedException;
104 import org.junit.runner.RunWith;
105 import org.junit.runners.JUnit4;
106 import org.mockito.ArgumentCaptor;
107 import org.mockito.Captor;
108 import org.mockito.Matchers;
109 import org.mockito.Mock;
110 import org.mockito.MockitoAnnotations;
111 
112 /** Unit tests for {@link ServerImpl}. */
113 @RunWith(JUnit4.class)
114 public class ServerImplTest {
115   private static final IntegerMarshaller INTEGER_MARSHALLER = IntegerMarshaller.INSTANCE;
116   private static final StringMarshaller STRING_MARSHALLER = StringMarshaller.INSTANCE;
117   private static final MethodDescriptor<String, Integer> METHOD =
118       MethodDescriptor.<String, Integer>newBuilder()
119           .setType(MethodDescriptor.MethodType.UNKNOWN)
120           .setFullMethodName("Waiter/serve")
121           .setRequestMarshaller(STRING_MARSHALLER)
122           .setResponseMarshaller(INTEGER_MARSHALLER)
123           .build();
124   private static final Context.Key<String> SERVER_ONLY = Context.key("serverOnly");
125   private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added");
126   private static final Context.CancellableContext SERVER_CONTEXT =
127       Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation();
128   private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FITLER =
129       new FakeClock.TaskFilter() {
130         @Override
131         public boolean shouldAccept(Runnable runnable) {
132           return runnable instanceof ServerImpl.ContextCloser;
133         }
134       };
135   private static final String AUTHORITY = "some_authority";
136 
137   @Rule public final ExpectedException thrown = ExpectedException.none();
138 
139   @BeforeClass
beforeStartUp()140   public static void beforeStartUp() {
141     // Cancel the root context. Server will fork it so the per-call context should not
142     // be cancelled.
143     SERVER_CONTEXT.cancel(null);
144   }
145 
146   private final FakeClock executor = new FakeClock();
147   private final FakeClock timer = new FakeClock();
148   private final InternalChannelz channelz = new InternalChannelz();
149 
150   @Mock
151   private ServerStreamTracer.Factory streamTracerFactory;
152   private List<ServerStreamTracer.Factory> streamTracerFactories;
153   private final TestServerStreamTracer streamTracer = new TestServerStreamTracer() {
154       @Override
155       public Context filterContext(Context context) {
156         Context newCtx = super.filterContext(context);
157         return newCtx.withValue(SERVER_TRACER_ADDED_KEY, "context added by tracer");
158       }
159     };
160   @Mock
161   private ObjectPool<Executor> executorPool;
162   private Builder builder = new Builder();
163   private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry();
164   private HandlerRegistry fallbackRegistry = mock(
165       HandlerRegistry.class,
166       delegatesTo(new HandlerRegistry() {
167         @Override
168         public ServerMethodDefinition<?, ?> lookupMethod(
169             String methodName, @Nullable String authority) {
170           return mutableFallbackRegistry.lookupMethod(methodName, authority);
171         }
172 
173         @Override
174         public List<ServerServiceDefinition> getServices() {
175           return mutableFallbackRegistry.getServices();
176         }
177       }));
178   private SimpleServer transportServer = new SimpleServer();
179   private ServerImpl server;
180 
181   @Captor
182   private ArgumentCaptor<Status> statusCaptor;
183   @Captor
184   private ArgumentCaptor<Metadata> metadataCaptor;
185   @Captor
186   private ArgumentCaptor<ServerStreamListener> streamListenerCaptor;
187 
188   @Mock
189   private ServerStream stream;
190   @Mock
191   private ServerCall.Listener<String> callListener;
192   @Mock
193   private ServerCallHandler<String, Integer> callHandler;
194 
195   /** Set up for test. */
196   @Before
startUp()197   public void startUp() throws IOException {
198     MockitoAnnotations.initMocks(this);
199     builder.channelz = channelz;
200     streamTracerFactories = Arrays.asList(streamTracerFactory);
201     when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
202     when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
203         .thenReturn(streamTracer);
204     when(stream.getAuthority()).thenReturn(AUTHORITY);
205   }
206 
207   @After
noPendingTasks()208   public void noPendingTasks() {
209     assertEquals(0, executor.numPendingTasks());
210     assertEquals(0, timer.numPendingTasks());
211   }
212 
213   @Test
startStopImmediate()214   public void startStopImmediate() throws IOException {
215     transportServer = new SimpleServer() {
216       @Override
217       public void shutdown() {}
218     };
219     createAndStartServer();
220     server.shutdown();
221     assertTrue(server.isShutdown());
222     assertFalse(server.isTerminated());
223     transportServer.listener.serverShutdown();
224     assertTrue(server.isTerminated());
225   }
226 
227   @Test
stopImmediate()228   public void stopImmediate() throws IOException {
229     transportServer = new SimpleServer() {
230       @Override
231       public void shutdown() {
232         throw new AssertionError("Should not be called, because wasn't started");
233       }
234     };
235     createServer();
236     server.shutdown();
237     assertTrue(server.isShutdown());
238     assertTrue(server.isTerminated());
239     verifyNoMoreInteractions(executorPool);
240   }
241 
242   @Test
startStopImmediateWithChildTransport()243   public void startStopImmediateWithChildTransport() throws IOException {
244     createAndStartServer();
245     verifyExecutorsAcquired();
246     class DelayedShutdownServerTransport extends SimpleServerTransport {
247       boolean shutdown;
248 
249       @Override
250       public void shutdown() {
251         shutdown = true;
252       }
253     }
254 
255     DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
256     transportServer.registerNewServerTransport(serverTransport);
257     server.shutdown();
258     assertTrue(server.isShutdown());
259     assertFalse(server.isTerminated());
260     assertTrue(serverTransport.shutdown);
261     verifyExecutorsNotReturned();
262 
263     serverTransport.listener.transportTerminated();
264     assertTrue(server.isTerminated());
265     verifyExecutorsReturned();
266   }
267 
268   @Test
startShutdownNowImmediateWithChildTransport()269   public void startShutdownNowImmediateWithChildTransport() throws IOException {
270     createAndStartServer();
271     verifyExecutorsAcquired();
272     class DelayedShutdownServerTransport extends SimpleServerTransport {
273       boolean shutdown;
274 
275       @Override
276       public void shutdown() {}
277 
278       @Override
279       public void shutdownNow(Status reason) {
280         shutdown = true;
281       }
282     }
283 
284     DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
285     transportServer.registerNewServerTransport(serverTransport);
286     server.shutdownNow();
287     assertTrue(server.isShutdown());
288     assertFalse(server.isTerminated());
289     assertTrue(serverTransport.shutdown);
290     verifyExecutorsNotReturned();
291 
292     serverTransport.listener.transportTerminated();
293     assertTrue(server.isTerminated());
294     verifyExecutorsReturned();
295   }
296 
297   @Test
shutdownNowAfterShutdown()298   public void shutdownNowAfterShutdown() throws IOException {
299     createAndStartServer();
300     verifyExecutorsAcquired();
301     class DelayedShutdownServerTransport extends SimpleServerTransport {
302       boolean shutdown;
303 
304       @Override
305       public void shutdown() {}
306 
307       @Override
308       public void shutdownNow(Status reason) {
309         shutdown = true;
310       }
311     }
312 
313     DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
314     transportServer.registerNewServerTransport(serverTransport);
315     server.shutdown();
316     assertTrue(server.isShutdown());
317     server.shutdownNow();
318     assertFalse(server.isTerminated());
319     assertTrue(serverTransport.shutdown);
320     verifyExecutorsNotReturned();
321 
322     serverTransport.listener.transportTerminated();
323     assertTrue(server.isTerminated());
324     verifyExecutorsReturned();
325   }
326 
327   @Test
shutdownNowAfterSlowShutdown()328   public void shutdownNowAfterSlowShutdown() throws IOException {
329     transportServer = new SimpleServer() {
330       @Override
331       public void shutdown() {
332         // Don't call super which calls listener.serverShutdown(). We'll call it manually.
333       }
334     };
335     createAndStartServer();
336     verifyExecutorsAcquired();
337     class DelayedShutdownServerTransport extends SimpleServerTransport {
338       boolean shutdown;
339 
340       @Override
341       public void shutdown() {}
342 
343       @Override
344       public void shutdownNow(Status reason) {
345         shutdown = true;
346       }
347     }
348 
349     DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
350     transportServer.registerNewServerTransport(serverTransport);
351     server.shutdown();
352     server.shutdownNow();
353     transportServer.listener.serverShutdown();
354     assertTrue(server.isShutdown());
355     assertFalse(server.isTerminated());
356 
357     verifyExecutorsNotReturned();
358     serverTransport.listener.transportTerminated();
359     verifyExecutorsReturned();
360     assertTrue(server.isTerminated());
361   }
362 
363   @Test
transportServerFailsStartup()364   public void transportServerFailsStartup() {
365     final IOException ex = new IOException();
366     class FailingStartupServer extends SimpleServer {
367       @Override
368       public void start(ServerListener listener) throws IOException {
369         throw ex;
370       }
371     }
372 
373     transportServer = new FailingStartupServer();
374     createServer();
375     try {
376       server.start();
377       fail("expected exception");
378     } catch (IOException e) {
379       assertSame(ex, e);
380     }
381     verifyNoMoreInteractions(executorPool);
382   }
383 
384   @Test
transportHandshakeTimeout_expired()385   public void transportHandshakeTimeout_expired() throws Exception {
386     class ShutdownRecordingTransport extends SimpleServerTransport {
387       Status shutdownNowStatus;
388 
389       @Override public void shutdownNow(Status status) {
390         shutdownNowStatus = status;
391         super.shutdownNow(status);
392       }
393     }
394 
395     builder.handshakeTimeout(60, TimeUnit.SECONDS);
396     createAndStartServer();
397     ShutdownRecordingTransport serverTransport = new ShutdownRecordingTransport();
398     transportServer.registerNewServerTransport(serverTransport);
399     timer.forwardTime(59, TimeUnit.SECONDS);
400     assertNull("shutdownNow status", serverTransport.shutdownNowStatus);
401     // Don't call transportReady() in time
402     timer.forwardTime(2, TimeUnit.SECONDS);
403     assertNotNull("shutdownNow status", serverTransport.shutdownNowStatus);
404   }
405 
406   @Test
methodNotFound()407   public void methodNotFound() throws Exception {
408     createAndStartServer();
409     ServerTransportListener transportListener
410         = transportServer.registerNewServerTransport(new SimpleServerTransport());
411     transportListener.transportReady(Attributes.EMPTY);
412     Metadata requestHeaders = new Metadata();
413     StatsTraceContext statsTraceCtx =
414         StatsTraceContext.newServerContext(
415             streamTracerFactories, "Waiter/nonexist", requestHeaders);
416     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
417     transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders);
418     verify(stream).setListener(isA(ServerStreamListener.class));
419     verify(stream, atLeast(1)).statsTraceContext();
420 
421     assertEquals(1, executor.runDueTasks());
422     verify(stream).close(statusCaptor.capture(), any(Metadata.class));
423     Status status = statusCaptor.getValue();
424     assertEquals(Status.Code.UNIMPLEMENTED, status.getCode());
425     assertEquals("Method not found: Waiter/nonexist", status.getDescription());
426 
427     verify(streamTracerFactory).newServerStreamTracer(eq("Waiter/nonexist"), same(requestHeaders));
428     assertNull(streamTracer.getServerCallInfo());
429     assertEquals(Status.Code.UNIMPLEMENTED, statusCaptor.getValue().getCode());
430   }
431 
432   @Test
decompressorNotFound()433   public void decompressorNotFound() throws Exception {
434     String decompressorName = "NON_EXISTENT_DECOMPRESSOR";
435     createAndStartServer();
436     ServerTransportListener transportListener
437         = transportServer.registerNewServerTransport(new SimpleServerTransport());
438     transportListener.transportReady(Attributes.EMPTY);
439     Metadata requestHeaders = new Metadata();
440     requestHeaders.put(MESSAGE_ENCODING_KEY, decompressorName);
441     StatsTraceContext statsTraceCtx =
442         StatsTraceContext.newServerContext(
443             streamTracerFactories, "Waiter/nonexist", requestHeaders);
444     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
445 
446     transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders);
447 
448     verify(stream).close(statusCaptor.capture(), any(Metadata.class));
449     Status status = statusCaptor.getValue();
450     assertEquals(Status.Code.UNIMPLEMENTED, status.getCode());
451     assertEquals("Can't find decompressor for " + decompressorName, status.getDescription());
452     verifyNoMoreInteractions(stream);
453   }
454 
455   @Test
basicExchangeSuccessful()456   public void basicExchangeSuccessful() throws Exception {
457     createAndStartServer();
458     basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50);
459   }
460 
basicExchangeHelper( MethodDescriptor<String, Integer> method, String request, int firstResponse, Integer extraResponse)461   private void basicExchangeHelper(
462       MethodDescriptor<String, Integer> method,
463       String request,
464       int firstResponse,
465       Integer extraResponse) throws Exception {
466     final Metadata.Key<String> metadataKey
467         = Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER);
468     final AtomicReference<ServerCall<String, Integer>> callReference
469         = new AtomicReference<ServerCall<String, Integer>>();
470     final AtomicReference<Context> callContextReference = new AtomicReference<Context>();
471     mutableFallbackRegistry.addService(ServerServiceDefinition.builder(
472         new ServiceDescriptor("Waiter", method))
473         .addMethod(
474             method,
475             new ServerCallHandler<String, Integer>() {
476               @Override
477               public ServerCall.Listener<String> startCall(
478                   ServerCall<String, Integer> call,
479                   Metadata headers) {
480                 assertEquals("Waiter/serve", call.getMethodDescriptor().getFullMethodName());
481                 assertNotNull(call);
482                 assertNotNull(headers);
483                 assertEquals("value", headers.get(metadataKey));
484                 callReference.set(call);
485                 callContextReference.set(Context.current());
486                 return callListener;
487               }
488             }).build());
489     ServerTransportListener transportListener
490         = transportServer.registerNewServerTransport(new SimpleServerTransport());
491     transportListener.transportReady(Attributes.EMPTY);
492 
493     Metadata requestHeaders = new Metadata();
494     requestHeaders.put(metadataKey, "value");
495     StatsTraceContext statsTraceCtx =
496         StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders);
497     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
498 
499     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
500     verify(stream).setListener(streamListenerCaptor.capture());
501     ServerStreamListener streamListener = streamListenerCaptor.getValue();
502     assertNotNull(streamListener);
503     verify(stream, atLeast(1)).statsTraceContext();
504     verify(fallbackRegistry, never()).lookupMethod(any(String.class), any(String.class));
505 
506     assertEquals(1, executor.runDueTasks());
507     ServerCall<String, Integer> call = callReference.get();
508     assertNotNull(call);
509     assertEquals(
510         new ServerCallInfoImpl<String, Integer>(
511             call.getMethodDescriptor(),
512             call.getAttributes(),
513             call.getAuthority()),
514         streamTracer.getServerCallInfo());
515     verify(fallbackRegistry).lookupMethod("Waiter/serve", AUTHORITY);
516     Context callContext = callContextReference.get();
517     assertNotNull(callContext);
518     assertEquals("context added by tracer", SERVER_TRACER_ADDED_KEY.get(callContext));
519 
520     streamListener.messagesAvailable(new SingleMessageProducer(STRING_MARSHALLER.stream(request)));
521     assertEquals(1, executor.runDueTasks());
522     verify(callListener).onMessage(request);
523 
524     Metadata responseHeaders = new Metadata();
525     responseHeaders.put(metadataKey, "response value");
526     call.sendHeaders(responseHeaders);
527     verify(stream).writeHeaders(responseHeaders);
528     verify(stream).setCompressor(isA(Compressor.class));
529 
530     call.sendMessage(firstResponse);
531     ArgumentCaptor<InputStream> inputCaptor = ArgumentCaptor.forClass(InputStream.class);
532     verify(stream).writeMessage(inputCaptor.capture());
533     verify(stream).flush();
534     assertEquals(firstResponse, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue());
535 
536     streamListener.halfClosed(); // All full; no dessert.
537     assertEquals(1, executor.runDueTasks());
538     verify(callListener).onHalfClose();
539 
540     if (extraResponse != null) {
541       call.sendMessage(extraResponse);
542       verify(stream, times(2)).writeMessage(inputCaptor.capture());
543       verify(stream, times(2)).flush();
544       assertEquals(
545           (int) extraResponse, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue());
546     }
547 
548     Metadata trailers = new Metadata();
549     trailers.put(metadataKey, "another value");
550     Status status = Status.OK.withDescription("A okay");
551     call.close(status, trailers);
552     verify(stream).close(status, trailers);
553 
554     streamListener.closed(Status.OK);
555     assertEquals(1, executor.runDueTasks());
556     verify(callListener).onComplete();
557 
558     verify(stream, atLeast(1)).statsTraceContext();
559     verifyNoMoreInteractions(callListener);
560 
561     verify(streamTracerFactory).newServerStreamTracer(eq("Waiter/serve"), same(requestHeaders));
562   }
563 
564   @Test
transportFilters()565   public void transportFilters() throws Exception {
566     final SocketAddress remoteAddr = mock(SocketAddress.class);
567     final Attributes.Key<String> key1 = Attributes.Key.create("test-key1");
568     final Attributes.Key<String> key2 = Attributes.Key.create("test-key2");
569     final Attributes.Key<String> key3 = Attributes.Key.create("test-key3");
570     final AtomicReference<Attributes> filter1TerminationCallbackArgument =
571         new AtomicReference<Attributes>();
572     final AtomicReference<Attributes> filter2TerminationCallbackArgument =
573         new AtomicReference<Attributes>();
574     final AtomicInteger readyCallbackCalled = new AtomicInteger(0);
575     final AtomicInteger terminationCallbackCalled = new AtomicInteger(0);
576     builder.addTransportFilter(new ServerTransportFilter() {
577         @Override
578         public Attributes transportReady(Attributes attrs) {
579           assertEquals(Attributes.newBuilder()
580               .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr)
581               .build(), attrs);
582           readyCallbackCalled.incrementAndGet();
583           return attrs.toBuilder()
584               .set(key1, "yalayala")
585               .set(key2, "blabla")
586               .build();
587         }
588 
589         @Override
590         public void transportTerminated(Attributes attrs) {
591           terminationCallbackCalled.incrementAndGet();
592           filter1TerminationCallbackArgument.set(attrs);
593         }
594       });
595     builder.addTransportFilter(new ServerTransportFilter() {
596         @Override
597         public Attributes transportReady(Attributes attrs) {
598           assertEquals(Attributes.newBuilder()
599               .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr)
600               .set(key1, "yalayala")
601               .set(key2, "blabla")
602               .build(), attrs);
603           readyCallbackCalled.incrementAndGet();
604           return attrs.toBuilder()
605               .set(key1, "ouch")
606               .set(key3, "puff")
607               .build();
608         }
609 
610         @Override
611         public void transportTerminated(Attributes attrs) {
612           terminationCallbackCalled.incrementAndGet();
613           filter2TerminationCallbackArgument.set(attrs);
614         }
615       });
616     Attributes expectedTransportAttrs = Attributes.newBuilder()
617         .set(key1, "ouch")
618         .set(key2, "blabla")
619         .set(key3, "puff")
620         .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr)
621         .build();
622 
623     createAndStartServer();
624     ServerTransportListener transportListener
625         = transportServer.registerNewServerTransport(new SimpleServerTransport());
626     Attributes transportAttrs = transportListener.transportReady(Attributes.newBuilder()
627         .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr).build());
628 
629     assertEquals(expectedTransportAttrs, transportAttrs);
630 
631     server.shutdown();
632     server.awaitTermination();
633 
634     assertEquals(expectedTransportAttrs, filter1TerminationCallbackArgument.get());
635     assertEquals(expectedTransportAttrs, filter2TerminationCallbackArgument.get());
636     assertEquals(2, readyCallbackCalled.get());
637     assertEquals(2, terminationCallbackCalled.get());
638   }
639 
640   @Test
interceptors()641   public void interceptors() throws Exception {
642     final LinkedList<Context> capturedContexts = new LinkedList<Context>();
643     final Context.Key<String> key1 = Context.key("key1");
644     final Context.Key<String> key2 = Context.key("key2");
645     final Context.Key<String> key3 = Context.key("key3");
646     ServerInterceptor intercepter1 = new ServerInterceptor() {
647         @Override
648         public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
649             ServerCall<ReqT, RespT> call,
650             Metadata headers,
651             ServerCallHandler<ReqT, RespT> next) {
652           Context ctx = Context.current().withValue(key1, "value1");
653           Context origCtx = ctx.attach();
654           try {
655             capturedContexts.add(ctx);
656             return next.startCall(call, headers);
657           } finally {
658             ctx.detach(origCtx);
659           }
660         }
661       };
662     ServerInterceptor intercepter2 = new ServerInterceptor() {
663         @Override
664         public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
665             ServerCall<ReqT, RespT> call,
666             Metadata headers,
667             ServerCallHandler<ReqT, RespT> next) {
668           Context ctx = Context.current().withValue(key2, "value2");
669           Context origCtx = ctx.attach();
670           try {
671             capturedContexts.add(ctx);
672             return next.startCall(call, headers);
673           } finally {
674             ctx.detach(origCtx);
675           }
676         }
677       };
678     ServerCallHandler<String, Integer> callHandler = new ServerCallHandler<String, Integer>() {
679         @Override
680         public ServerCall.Listener<String> startCall(
681             ServerCall<String, Integer> call,
682             Metadata headers) {
683           capturedContexts.add(Context.current().withValue(key3, "value3"));
684           return callListener;
685         }
686       };
687 
688     mutableFallbackRegistry.addService(
689         ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD))
690             .addMethod(METHOD, callHandler).build());
691     builder.intercept(intercepter2);
692     builder.intercept(intercepter1);
693     createServer();
694     server.start();
695 
696     ServerTransportListener transportListener
697         = transportServer.registerNewServerTransport(new SimpleServerTransport());
698     transportListener.transportReady(Attributes.EMPTY);
699 
700     Metadata requestHeaders = new Metadata();
701     StatsTraceContext statsTraceCtx =
702         StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders);
703     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
704 
705     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
706     assertEquals(1, executor.runDueTasks());
707 
708     Context ctx1 = capturedContexts.poll();
709     assertEquals("value1", key1.get(ctx1));
710     assertNull(key2.get(ctx1));
711     assertNull(key3.get(ctx1));
712 
713     Context ctx2 = capturedContexts.poll();
714     assertEquals("value1", key1.get(ctx2));
715     assertEquals("value2", key2.get(ctx2));
716     assertNull(key3.get(ctx2));
717 
718     Context ctx3 = capturedContexts.poll();
719     assertEquals("value1", key1.get(ctx3));
720     assertEquals("value2", key2.get(ctx3));
721     assertEquals("value3", key3.get(ctx3));
722 
723     assertTrue(capturedContexts.isEmpty());
724   }
725 
726   @Test
exceptionInStartCallPropagatesToStream()727   public void exceptionInStartCallPropagatesToStream() throws Exception {
728     createAndStartServer();
729     final Status status = Status.ABORTED.withDescription("Oh, no!");
730     mutableFallbackRegistry.addService(ServerServiceDefinition.builder(
731         new ServiceDescriptor("Waiter", METHOD))
732         .addMethod(METHOD,
733             new ServerCallHandler<String, Integer>() {
734               @Override
735               public ServerCall.Listener<String> startCall(
736                   ServerCall<String, Integer> call,
737                   Metadata headers) {
738                 throw status.asRuntimeException();
739               }
740             }).build());
741     ServerTransportListener transportListener
742         = transportServer.registerNewServerTransport(new SimpleServerTransport());
743     transportListener.transportReady(Attributes.EMPTY);
744 
745     Metadata requestHeaders = new Metadata();
746     StatsTraceContext statsTraceCtx =
747         StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders);
748     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
749 
750     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
751     verify(stream).setListener(streamListenerCaptor.capture());
752     ServerStreamListener streamListener = streamListenerCaptor.getValue();
753     assertNotNull(streamListener);
754     verify(stream, atLeast(1)).statsTraceContext();
755     verifyNoMoreInteractions(stream);
756     verify(fallbackRegistry, never()).lookupMethod(any(String.class), any(String.class));
757 
758     assertEquals(1, executor.runDueTasks());
759     verify(fallbackRegistry).lookupMethod("Waiter/serve", AUTHORITY);
760     verify(stream).close(same(status), notNull(Metadata.class));
761     verify(stream, atLeast(1)).statsTraceContext();
762   }
763 
764   @Test
testNoDeadlockOnShutdown()765   public void testNoDeadlockOnShutdown() throws Exception {
766     final Object lock = new Object();
767     final CyclicBarrier barrier = new CyclicBarrier(2);
768     class MaybeDeadlockingServer extends SimpleServer {
769       @Override
770       public void shutdown() {
771         // To deadlock, a lock would need to be held while this method is in progress.
772         try {
773           barrier.await();
774         } catch (Exception ex) {
775           throw new AssertionError(ex);
776         }
777         // If deadlock is possible with this setup, this sychronization completes the loop because
778         // the serverShutdown needs a lock that Server is holding while calling this method.
779         synchronized (lock) {
780         }
781       }
782     }
783 
784     transportServer = new MaybeDeadlockingServer();
785     createAndStartServer();
786     new Thread() {
787       @Override
788       public void run() {
789         synchronized (lock) {
790           try {
791             barrier.await();
792           } catch (Exception ex) {
793             throw new AssertionError(ex);
794           }
795           // To deadlock, a lock would be needed for this call to proceed.
796           transportServer.listener.serverShutdown();
797         }
798       }
799     }.start();
800     server.shutdown();
801   }
802 
803   @Test
testNoDeadlockOnTransportShutdown()804   public void testNoDeadlockOnTransportShutdown() throws Exception {
805     createAndStartServer();
806     final Object lock = new Object();
807     final CyclicBarrier barrier = new CyclicBarrier(2);
808     class MaybeDeadlockingServerTransport extends SimpleServerTransport {
809       @Override
810       public void shutdown() {
811         // To deadlock, a lock would need to be held while this method is in progress.
812         try {
813           barrier.await();
814         } catch (Exception ex) {
815           throw new AssertionError(ex);
816         }
817         // If deadlock is possible with this setup, this sychronization completes the loop
818         // because the transportTerminated needs a lock that Server is holding while calling this
819         // method.
820         synchronized (lock) {
821         }
822       }
823     }
824 
825     final ServerTransportListener transportListener
826         = transportServer.registerNewServerTransport(new MaybeDeadlockingServerTransport());
827     new Thread() {
828       @Override
829       public void run() {
830         synchronized (lock) {
831           try {
832             barrier.await();
833           } catch (Exception ex) {
834             throw new AssertionError(ex);
835           }
836           // To deadlock, a lock would be needed for this call to proceed.
837           transportListener.transportTerminated();
838         }
839       }
840     }.start();
841     server.shutdown();
842   }
843 
844   @Test
testCallContextIsBoundInListenerCallbacks()845   public void testCallContextIsBoundInListenerCallbacks() throws Exception {
846     createAndStartServer();
847     final AtomicBoolean  onReadyCalled = new AtomicBoolean(false);
848     final AtomicBoolean onMessageCalled = new AtomicBoolean(false);
849     final AtomicBoolean onHalfCloseCalled = new AtomicBoolean(false);
850     final AtomicBoolean onCancelCalled = new AtomicBoolean(false);
851     mutableFallbackRegistry.addService(ServerServiceDefinition.builder(
852         new ServiceDescriptor("Waiter", METHOD))
853         .addMethod(
854             METHOD,
855             new ServerCallHandler<String, Integer>() {
856               @Override
857               public ServerCall.Listener<String> startCall(
858                   ServerCall<String, Integer> call,
859                   Metadata headers) {
860                 // Check that the current context is a descendant of SERVER_CONTEXT
861                 final Context initial = Context.current();
862                 assertEquals("yes", SERVER_ONLY.get(initial));
863                 assertNotSame(SERVER_CONTEXT, initial);
864                 assertFalse(initial.isCancelled());
865                 return new ServerCall.Listener<String>() {
866 
867                   @Override
868                   public void onReady() {
869                     checkContext();
870                     onReadyCalled.set(true);
871                   }
872 
873                   @Override
874                   public void onMessage(String message) {
875                     checkContext();
876                     onMessageCalled.set(true);
877                   }
878 
879                   @Override
880                   public void onHalfClose() {
881                     checkContext();
882                     onHalfCloseCalled.set(true);
883                   }
884 
885                   @Override
886                   public void onCancel() {
887                     checkContext();
888                     onCancelCalled.set(true);
889                   }
890 
891                   @Override
892                   public void onComplete() {
893                     checkContext();
894                   }
895 
896                   private void checkContext() {
897                     // Check that the bound context is the same as the initial one.
898                     assertSame(initial, Context.current());
899                   }
900                 };
901               }
902             }).build());
903     ServerTransportListener transportListener
904         = transportServer.registerNewServerTransport(new SimpleServerTransport());
905     transportListener.transportReady(Attributes.EMPTY);
906 
907     Metadata requestHeaders = new Metadata();
908     StatsTraceContext statsTraceCtx =
909         StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders);
910     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
911 
912     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
913     verify(stream).setListener(streamListenerCaptor.capture());
914     ServerStreamListener streamListener = streamListenerCaptor.getValue();
915     assertNotNull(streamListener);
916 
917     streamListener.onReady();
918     assertEquals(1, executor.runDueTasks());
919     assertTrue(onReadyCalled.get());
920 
921     streamListener
922         .messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
923     assertEquals(1, executor.runDueTasks());
924     assertTrue(onMessageCalled.get());
925 
926     streamListener.halfClosed();
927     assertEquals(1, executor.runDueTasks());
928     assertTrue(onHalfCloseCalled.get());
929 
930     streamListener.closed(Status.CANCELLED);
931     assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER));
932     assertEquals(2, executor.runDueTasks());
933     assertTrue(onCancelCalled.get());
934 
935     // Close should never be called if asserts in listener pass.
936     verify(stream, times(0)).close(isA(Status.class), isNotNull(Metadata.class));
937   }
938 
testClientClose_setup( final AtomicReference<ServerCall<String, Integer>> callReference, final AtomicReference<Context> context, final AtomicBoolean contextCancelled)939   private ServerStreamListener testClientClose_setup(
940       final AtomicReference<ServerCall<String, Integer>> callReference,
941       final AtomicReference<Context> context,
942       final AtomicBoolean contextCancelled) throws Exception {
943     createAndStartServer();
944     callListener = new ServerCall.Listener<String>() {
945       @Override
946       public void onReady() {
947         context.set(Context.current());
948         Context.current().addListener(new Context.CancellationListener() {
949           @Override
950           public void cancelled(Context context) {
951             contextCancelled.set(true);
952           }
953         }, MoreExecutors.directExecutor());
954       }
955     };
956 
957     mutableFallbackRegistry.addService(ServerServiceDefinition.builder(
958         new ServiceDescriptor("Waiter", METHOD))
959         .addMethod(METHOD,
960             new ServerCallHandler<String, Integer>() {
961               @Override
962               public ServerCall.Listener<String> startCall(
963                   ServerCall<String, Integer> call,
964                   Metadata headers) {
965                 callReference.set(call);
966                 return callListener;
967               }
968             }).build());
969     ServerTransportListener transportListener
970         = transportServer.registerNewServerTransport(new SimpleServerTransport());
971     transportListener.transportReady(Attributes.EMPTY);
972     Metadata requestHeaders = new Metadata();
973     StatsTraceContext statsTraceCtx =
974         StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders);
975     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
976     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
977     verify(stream).setListener(streamListenerCaptor.capture());
978     ServerStreamListener streamListener = streamListenerCaptor.getValue();
979     assertNotNull(streamListener);
980 
981     streamListener.onReady();
982     assertEquals(1, executor.runDueTasks());
983     return streamListener;
984   }
985 
986   @Test
testClientClose_cancelTriggersImmediateCancellation()987   public void testClientClose_cancelTriggersImmediateCancellation() throws Exception {
988     AtomicBoolean contextCancelled = new AtomicBoolean(false);
989     AtomicReference<Context> context = new AtomicReference<Context>();
990     AtomicReference<ServerCall<String, Integer>> callReference
991         = new AtomicReference<ServerCall<String, Integer>>();
992 
993     ServerStreamListener streamListener = testClientClose_setup(callReference,
994         context, contextCancelled);
995 
996     // For close status being non OK:
997     // isCancelled is expected to be true immediately after calling closed(), without needing
998     // to wait for the main executor to run any tasks.
999     assertFalse(callReference.get().isCancelled());
1000     assertFalse(context.get().isCancelled());
1001     streamListener.closed(Status.CANCELLED);
1002     assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER));
1003     assertEquals(2, executor.runDueTasks());
1004     assertTrue(callReference.get().isCancelled());
1005     assertTrue(context.get().isCancelled());
1006     assertTrue(contextCancelled.get());
1007   }
1008 
1009   @Test
testClientClose_OkTriggersDelayedCancellation()1010   public void testClientClose_OkTriggersDelayedCancellation() throws Exception {
1011     AtomicBoolean contextCancelled = new AtomicBoolean(false);
1012     AtomicReference<Context> context = new AtomicReference<Context>();
1013     AtomicReference<ServerCall<String, Integer>> callReference
1014         = new AtomicReference<ServerCall<String, Integer>>();
1015 
1016     ServerStreamListener streamListener = testClientClose_setup(callReference,
1017         context, contextCancelled);
1018 
1019     // For close status OK:
1020     // isCancelled is expected to be true after all pending work is done
1021     assertFalse(callReference.get().isCancelled());
1022     assertFalse(context.get().isCancelled());
1023     streamListener.closed(Status.OK);
1024     assertFalse(callReference.get().isCancelled());
1025     assertFalse(context.get().isCancelled());
1026 
1027     assertEquals(1, executor.runDueTasks());
1028     assertTrue(callReference.get().isCancelled());
1029     assertTrue(context.get().isCancelled());
1030     assertTrue(contextCancelled.get());
1031   }
1032 
1033   @Test
getPort()1034   public void getPort() throws Exception {
1035     transportServer = new SimpleServer() {
1036       @Override
1037       public int getPort() {
1038         return 65535;
1039       }
1040     };
1041     createAndStartServer();
1042 
1043     assertThat(server.getPort()).isEqualTo(65535);
1044   }
1045 
1046   @Test
getPortBeforeStartedFails()1047   public void getPortBeforeStartedFails() {
1048     transportServer = new SimpleServer();
1049     createServer();
1050     thrown.expect(IllegalStateException.class);
1051     thrown.expectMessage("started");
1052     server.getPort();
1053   }
1054 
1055   @Test
getPortAfterTerminationFails()1056   public void getPortAfterTerminationFails() throws Exception {
1057     transportServer = new SimpleServer();
1058     createAndStartServer();
1059     server.shutdown();
1060     server.awaitTermination();
1061     thrown.expect(IllegalStateException.class);
1062     thrown.expectMessage("terminated");
1063     server.getPort();
1064   }
1065 
1066   @Test
handlerRegistryPriorities()1067   public void handlerRegistryPriorities() throws Exception {
1068     fallbackRegistry = mock(HandlerRegistry.class);
1069     builder.addService(
1070         ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD))
1071             .addMethod(METHOD, callHandler).build());
1072     transportServer = new SimpleServer();
1073     createAndStartServer();
1074 
1075     ServerTransportListener transportListener
1076         = transportServer.registerNewServerTransport(new SimpleServerTransport());
1077     transportListener.transportReady(Attributes.EMPTY);
1078     Metadata requestHeaders = new Metadata();
1079     StatsTraceContext statsTraceCtx =
1080         StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders);
1081     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
1082 
1083     // This call will be handled by callHandler from the internal registry
1084     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
1085     assertEquals(1, executor.runDueTasks());
1086     verify(callHandler).startCall(Matchers.<ServerCall<String, Integer>>anyObject(),
1087         Matchers.<Metadata>anyObject());
1088     // This call will be handled by the fallbackRegistry because it's not registred in the internal
1089     // registry.
1090     transportListener.streamCreated(stream, "Service1/Method2", requestHeaders);
1091     assertEquals(1, executor.runDueTasks());
1092     verify(fallbackRegistry).lookupMethod("Service1/Method2", AUTHORITY);
1093 
1094     verifyNoMoreInteractions(callHandler);
1095     verifyNoMoreInteractions(fallbackRegistry);
1096   }
1097 
1098   @Test
messageRead_errorCancelsCall()1099   public void messageRead_errorCancelsCall() throws Exception {
1100     JumpToApplicationThreadServerStreamListener listener
1101         = new JumpToApplicationThreadServerStreamListener(
1102             executor.getScheduledExecutorService(),
1103             executor.getScheduledExecutorService(),
1104             stream,
1105             Context.ROOT.withCancellation());
1106     ServerStreamListener mockListener = mock(ServerStreamListener.class);
1107     listener.setListener(mockListener);
1108 
1109     TestError expectedT = new TestError();
1110     doThrow(expectedT).when(mockListener)
1111         .messagesAvailable(any(StreamListener.MessageProducer.class));
1112     // Closing the InputStream is done by the delegated listener (generally ServerCallImpl)
1113     listener.messagesAvailable(mock(StreamListener.MessageProducer.class));
1114     try {
1115       executor.runDueTasks();
1116       fail("Expected exception");
1117     } catch (TestError t) {
1118       assertSame(expectedT, t);
1119       ensureServerStateNotLeaked();
1120     }
1121   }
1122 
1123   @Test
messageRead_runtimeExceptionCancelsCall()1124   public void messageRead_runtimeExceptionCancelsCall() throws Exception {
1125     JumpToApplicationThreadServerStreamListener listener
1126         = new JumpToApplicationThreadServerStreamListener(
1127             executor.getScheduledExecutorService(),
1128             executor.getScheduledExecutorService(),
1129             stream,
1130             Context.ROOT.withCancellation());
1131     ServerStreamListener mockListener = mock(ServerStreamListener.class);
1132     listener.setListener(mockListener);
1133 
1134     RuntimeException expectedT = new RuntimeException();
1135     doThrow(expectedT).when(mockListener)
1136         .messagesAvailable(any(StreamListener.MessageProducer.class));
1137     // Closing the InputStream is done by the delegated listener (generally ServerCallImpl)
1138     listener.messagesAvailable(mock(StreamListener.MessageProducer.class));
1139     try {
1140       executor.runDueTasks();
1141       fail("Expected exception");
1142     } catch (RuntimeException t) {
1143       assertSame(expectedT, t);
1144       ensureServerStateNotLeaked();
1145     }
1146   }
1147 
1148   @Test
halfClosed_errorCancelsCall()1149   public void halfClosed_errorCancelsCall() {
1150     JumpToApplicationThreadServerStreamListener listener
1151         = new JumpToApplicationThreadServerStreamListener(
1152             executor.getScheduledExecutorService(),
1153             executor.getScheduledExecutorService(),
1154             stream,
1155             Context.ROOT.withCancellation());
1156     ServerStreamListener mockListener = mock(ServerStreamListener.class);
1157     listener.setListener(mockListener);
1158 
1159     TestError expectedT = new TestError();
1160     doThrow(expectedT).when(mockListener).halfClosed();
1161     listener.halfClosed();
1162     try {
1163       executor.runDueTasks();
1164       fail("Expected exception");
1165     } catch (TestError t) {
1166       assertSame(expectedT, t);
1167       ensureServerStateNotLeaked();
1168     }
1169   }
1170 
1171   @Test
halfClosed_runtimeExceptionCancelsCall()1172   public void halfClosed_runtimeExceptionCancelsCall() {
1173     JumpToApplicationThreadServerStreamListener listener
1174         = new JumpToApplicationThreadServerStreamListener(
1175             executor.getScheduledExecutorService(),
1176             executor.getScheduledExecutorService(),
1177             stream,
1178             Context.ROOT.withCancellation());
1179     ServerStreamListener mockListener = mock(ServerStreamListener.class);
1180     listener.setListener(mockListener);
1181 
1182     RuntimeException expectedT = new RuntimeException();
1183     doThrow(expectedT).when(mockListener).halfClosed();
1184     listener.halfClosed();
1185     try {
1186       executor.runDueTasks();
1187       fail("Expected exception");
1188     } catch (RuntimeException t) {
1189       assertSame(expectedT, t);
1190       ensureServerStateNotLeaked();
1191     }
1192   }
1193 
1194   @Test
onReady_errorCancelsCall()1195   public void onReady_errorCancelsCall() {
1196     JumpToApplicationThreadServerStreamListener listener
1197         = new JumpToApplicationThreadServerStreamListener(
1198             executor.getScheduledExecutorService(),
1199             executor.getScheduledExecutorService(),
1200             stream,
1201             Context.ROOT.withCancellation());
1202     ServerStreamListener mockListener = mock(ServerStreamListener.class);
1203     listener.setListener(mockListener);
1204 
1205     TestError expectedT = new TestError();
1206     doThrow(expectedT).when(mockListener).onReady();
1207     listener.onReady();
1208     try {
1209       executor.runDueTasks();
1210       fail("Expected exception");
1211     } catch (TestError t) {
1212       assertSame(expectedT, t);
1213       ensureServerStateNotLeaked();
1214     }
1215   }
1216 
1217   @Test
onReady_runtimeExceptionCancelsCall()1218   public void onReady_runtimeExceptionCancelsCall() {
1219     JumpToApplicationThreadServerStreamListener listener
1220         = new JumpToApplicationThreadServerStreamListener(
1221             executor.getScheduledExecutorService(),
1222             executor.getScheduledExecutorService(),
1223             stream,
1224             Context.ROOT.withCancellation());
1225     ServerStreamListener mockListener = mock(ServerStreamListener.class);
1226     listener.setListener(mockListener);
1227 
1228     RuntimeException expectedT = new RuntimeException();
1229     doThrow(expectedT).when(mockListener).onReady();
1230     listener.onReady();
1231     try {
1232       executor.runDueTasks();
1233       fail("Expected exception");
1234     } catch (RuntimeException t) {
1235       assertSame(expectedT, t);
1236       ensureServerStateNotLeaked();
1237     }
1238   }
1239 
1240   @Test
binaryLogInstalled()1241   public void binaryLogInstalled() throws Exception {
1242     final SettableFuture<Boolean> intercepted = SettableFuture.create();
1243     final ServerInterceptor interceptor = new ServerInterceptor() {
1244       @Override
1245       public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
1246           Metadata headers,
1247           ServerCallHandler<ReqT, RespT> next) {
1248         intercepted.set(true);
1249         return next.startCall(call, headers);
1250       }
1251     };
1252 
1253     builder.binlog = new BinaryLog() {
1254       @Override
1255       public void close() throws IOException {
1256         // noop
1257       }
1258 
1259       @Override
1260       public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
1261           ServerMethodDefinition<ReqT, RespT> oMethodDef) {
1262         return ServerMethodDefinition.create(
1263             oMethodDef.getMethodDescriptor(),
1264             InternalServerInterceptors.interceptCallHandlerCreate(
1265                 interceptor,
1266                 oMethodDef.getServerCallHandler()));
1267       }
1268 
1269       @Override
1270       public Channel wrapChannel(Channel channel) {
1271         return channel;
1272       }
1273     };
1274     createAndStartServer();
1275     basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50);
1276     assertTrue(intercepted.get());
1277   }
1278 
1279   @Test
channelz_membership()1280   public void channelz_membership() throws Exception {
1281     createServer();
1282     assertTrue(builder.channelz.containsServer(server.getLogId()));
1283     server.shutdownNow().awaitTermination();
1284     assertFalse(builder.channelz.containsServer(server.getLogId()));
1285   }
1286 
1287   @Test
channelz_serverStats()1288   public void channelz_serverStats() throws Exception {
1289     createAndStartServer();
1290     assertEquals(0, server.getStats().get().callsSucceeded);
1291     basicExchangeHelper(METHOD, "Lots of pizza, please", 314, null);
1292     assertEquals(1, server.getStats().get().callsSucceeded);
1293   }
1294 
1295   @Test
channelz_transport_membershp()1296   public void channelz_transport_membershp() throws Exception {
1297     createAndStartServer();
1298     SimpleServerTransport transport = new SimpleServerTransport();
1299 
1300     ServerSocketsList before = builder.channelz
1301         .getServerSockets(id(server), id(transport), /*maxPageSize=*/ 1);
1302     assertThat(before.sockets).isEmpty();
1303     assertTrue(before.end);
1304 
1305     ServerTransportListener listener
1306         = transportServer.registerNewServerTransport(transport);
1307     ServerSocketsList added = builder.channelz
1308         .getServerSockets(id(server), id(transport), /*maxPageSize=*/ 1);
1309     assertThat(added.sockets).containsExactly(transport);
1310     assertTrue(before.end);
1311 
1312     listener.transportTerminated();
1313     ServerSocketsList after = builder.channelz
1314         .getServerSockets(id(server), id(transport), /*maxPageSize=*/ 1);
1315     assertThat(after.sockets).isEmpty();
1316     assertTrue(after.end);
1317   }
1318 
createAndStartServer()1319   private void createAndStartServer() throws IOException {
1320     createServer();
1321     server.start();
1322   }
1323 
createServer()1324   private void createServer() {
1325     assertNull(server);
1326 
1327     builder.fallbackHandlerRegistry(fallbackRegistry);
1328     builder.executorPool = executorPool;
1329     server = new ServerImpl(builder, transportServer, SERVER_CONTEXT);
1330   }
1331 
verifyExecutorsAcquired()1332   private void verifyExecutorsAcquired() {
1333     verify(executorPool).getObject();
1334     verifyNoMoreInteractions(executorPool);
1335   }
1336 
verifyExecutorsNotReturned()1337   private void verifyExecutorsNotReturned() {
1338     verify(executorPool, never()).returnObject(any(Executor.class));
1339   }
1340 
verifyExecutorsReturned()1341   private void verifyExecutorsReturned() {
1342     verify(executorPool).returnObject(same(executor.getScheduledExecutorService()));
1343     verifyNoMoreInteractions(executorPool);
1344   }
1345 
ensureServerStateNotLeaked()1346   private void ensureServerStateNotLeaked() {
1347     verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
1348     assertEquals(Status.UNKNOWN, statusCaptor.getValue());
1349     assertNull(statusCaptor.getValue().getCause());
1350     assertTrue(metadataCaptor.getValue().keys().isEmpty());
1351   }
1352 
1353   private static class SimpleServer implements io.grpc.internal.InternalServer {
1354     ServerListener listener;
1355 
1356     @Override
start(ServerListener listener)1357     public void start(ServerListener listener) throws IOException {
1358       this.listener = listener;
1359     }
1360 
1361     @Override
getPort()1362     public int getPort() {
1363       return -1;
1364     }
1365 
1366     @Override
getListenSockets()1367     public List<InternalInstrumented<SocketStats>> getListenSockets() {
1368       return Collections.emptyList();
1369     }
1370 
1371     @Override
shutdown()1372     public void shutdown() {
1373       listener.serverShutdown();
1374     }
1375 
registerNewServerTransport(SimpleServerTransport transport)1376     public ServerTransportListener registerNewServerTransport(SimpleServerTransport transport) {
1377       return transport.listener = listener.transportCreated(transport);
1378     }
1379   }
1380 
1381   private class SimpleServerTransport implements ServerTransport {
1382     ServerTransportListener listener;
1383     InternalLogId id = InternalLogId.allocate(getClass().getName());
1384 
1385     @Override
shutdown()1386     public void shutdown() {
1387       listener.transportTerminated();
1388     }
1389 
1390     @Override
shutdownNow(Status status)1391     public void shutdownNow(Status status) {
1392       listener.transportTerminated();
1393     }
1394 
1395     @Override
getLogId()1396     public InternalLogId getLogId() {
1397       return id;
1398     }
1399 
1400     @Override
getScheduledExecutorService()1401     public ScheduledExecutorService getScheduledExecutorService() {
1402       return timer.getScheduledExecutorService();
1403     }
1404 
1405     @Override
getStats()1406     public ListenableFuture<SocketStats> getStats() {
1407       SettableFuture<SocketStats> ret = SettableFuture.create();
1408       ret.set(null);
1409       return ret;
1410     }
1411   }
1412 
1413   private static class Builder extends AbstractServerImplBuilder<Builder> {
buildTransportServer( List<ServerStreamTracer.Factory> streamTracerFactories)1414     @Override protected InternalServer buildTransportServer(
1415         List<ServerStreamTracer.Factory> streamTracerFactories) {
1416       throw new UnsupportedOperationException();
1417     }
1418 
useTransportSecurity(File f1, File f2)1419     @Override public Builder useTransportSecurity(File f1, File f2)  {
1420       throw new UnsupportedOperationException();
1421     }
1422   }
1423 
1424   /** Allows more precise catch blocks than plain Error to avoid catching AssertionError. */
1425   private static final class TestError extends Error {}
1426 }
1427