• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static io.grpc.Contexts.statusFromCancelled;
23 import static io.grpc.Status.DEADLINE_EXCEEDED;
24 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
25 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
26 import static java.util.concurrent.TimeUnit.NANOSECONDS;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Preconditions;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.SettableFuture;
34 import io.grpc.Attributes;
35 import io.grpc.BinaryLog;
36 import io.grpc.CompressorRegistry;
37 import io.grpc.Context;
38 import io.grpc.Deadline;
39 import io.grpc.Decompressor;
40 import io.grpc.DecompressorRegistry;
41 import io.grpc.HandlerRegistry;
42 import io.grpc.InternalChannelz;
43 import io.grpc.InternalChannelz.ServerStats;
44 import io.grpc.InternalChannelz.SocketStats;
45 import io.grpc.InternalInstrumented;
46 import io.grpc.InternalLogId;
47 import io.grpc.InternalServerInterceptors;
48 import io.grpc.InternalStatus;
49 import io.grpc.Metadata;
50 import io.grpc.ServerCall;
51 import io.grpc.ServerCallExecutorSupplier;
52 import io.grpc.ServerCallHandler;
53 import io.grpc.ServerInterceptor;
54 import io.grpc.ServerMethodDefinition;
55 import io.grpc.ServerServiceDefinition;
56 import io.grpc.ServerTransportFilter;
57 import io.grpc.Status;
58 import io.perfmark.Link;
59 import io.perfmark.PerfMark;
60 import io.perfmark.Tag;
61 import io.perfmark.TaskCloseable;
62 import java.io.IOException;
63 import java.io.InputStream;
64 import java.net.InetSocketAddress;
65 import java.net.SocketAddress;
66 import java.util.ArrayList;
67 import java.util.Collection;
68 import java.util.Collections;
69 import java.util.HashSet;
70 import java.util.List;
71 import java.util.Set;
72 import java.util.concurrent.Executor;
73 import java.util.concurrent.Future;
74 import java.util.concurrent.FutureTask;
75 import java.util.concurrent.TimeUnit;
76 import java.util.logging.Level;
77 import java.util.logging.Logger;
78 import javax.annotation.concurrent.GuardedBy;
79 
80 /**
81  * Default implementation of {@link io.grpc.Server}, for creation by transports.
82  *
83  * <p>Expected usage (by a theoretical TCP transport):
84  * <pre><code>public class TcpTransportServerFactory {
85  *   public static Server newServer(Executor executor, HandlerRegistry registry,
86  *       String configuration) {
87  *     return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
88  *   }
89  * }</code></pre>
90  *
91  * <p>Starting the server starts the underlying transport for servicing requests. Stopping the
92  * server stops servicing new requests and waits for all connections to terminate.
93  */
94 public final class ServerImpl extends io.grpc.Server implements InternalInstrumented<ServerStats> {
95   private static final Logger log = Logger.getLogger(ServerImpl.class.getName());
96   private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
97 
98   private final InternalLogId logId;
99   private final ObjectPool<? extends Executor> executorPool;
100   /** Executor for application processing. Safe to read after {@link #start()}. */
101   private Executor executor;
102   private final HandlerRegistry registry;
103   private final HandlerRegistry fallbackRegistry;
104   private final List<ServerTransportFilter> transportFilters;
105   // This is iterated on a per-call basis.  Use an array instead of a Collection to avoid iterator
106   // creations.
107   private final ServerInterceptor[] interceptors;
108   private final long handshakeTimeoutMillis;
109   @GuardedBy("lock") private boolean started;
110   @GuardedBy("lock") private boolean shutdown;
111   /** non-{@code null} if immediate shutdown has been requested. */
112   @GuardedBy("lock") private Status shutdownNowStatus;
113   /** {@code true} if ServerListenerImpl.serverShutdown() was called. */
114   @GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
115   @GuardedBy("lock") private boolean terminated;
116   /** Service encapsulating something similar to an accept() socket. */
117   private final InternalServer transportServer;
118   private final Object lock = new Object();
119   @GuardedBy("lock") private boolean transportServersTerminated;
120   /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
121   @GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
122 
123   private final Context rootContext;
124 
125   private final DecompressorRegistry decompressorRegistry;
126   private final CompressorRegistry compressorRegistry;
127   private final BinaryLog binlog;
128 
129   private final InternalChannelz channelz;
130   private final CallTracer serverCallTracer;
131   private final Deadline.Ticker ticker;
132   private final ServerCallExecutorSupplier executorSupplier;
133 
134   /**
135    * Construct a server.
136    *
137    * @param builder builder with configuration for server
138    * @param transportServer transport servers that will create new incoming transports
139    * @param rootContext context that callbacks for new RPCs should be derived from
140    */
ServerImpl( ServerImplBuilder builder, InternalServer transportServer, Context rootContext)141   ServerImpl(
142       ServerImplBuilder builder,
143       InternalServer transportServer,
144       Context rootContext) {
145     this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
146     this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
147     this.fallbackRegistry =
148         Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
149     this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
150     this.logId =
151         InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
152     // Fork from the passed in context so that it does not propagate cancellation, it only
153     // inherits values.
154     this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
155     this.decompressorRegistry = builder.decompressorRegistry;
156     this.compressorRegistry = builder.compressorRegistry;
157     this.transportFilters = Collections.unmodifiableList(
158         new ArrayList<>(builder.transportFilters));
159     this.interceptors =
160         builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
161     this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
162     this.binlog = builder.binlog;
163     this.channelz = builder.channelz;
164     this.serverCallTracer = builder.callTracerFactory.create();
165     this.ticker = checkNotNull(builder.ticker, "ticker");
166     channelz.addServer(this);
167     this.executorSupplier = builder.executorSupplier;
168   }
169 
170   /**
171    * Bind and start the server.
172    *
173    * @return {@code this} object
174    * @throws IllegalStateException if already started
175    * @throws IOException if unable to bind
176    */
177   @Override
start()178   public ServerImpl start() throws IOException {
179     synchronized (lock) {
180       checkState(!started, "Already started");
181       checkState(!shutdown, "Shutting down");
182       // Start and wait for any ports to actually be bound.
183 
184       ServerListenerImpl listener = new ServerListenerImpl();
185       transportServer.start(listener);
186       executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
187       started = true;
188       return this;
189     }
190   }
191 
192 
193   @Override
getPort()194   public int getPort() {
195     synchronized (lock) {
196       checkState(started, "Not started");
197       checkState(!terminated, "Already terminated");
198       for (SocketAddress addr: transportServer.getListenSocketAddresses()) {
199         if (addr instanceof InetSocketAddress) {
200           return ((InetSocketAddress) addr).getPort();
201         }
202       }
203       return -1;
204     }
205   }
206 
207   @Override
getListenSockets()208   public List<SocketAddress> getListenSockets() {
209     synchronized (lock) {
210       checkState(started, "Not started");
211       checkState(!terminated, "Already terminated");
212       return getListenSocketsIgnoringLifecycle();
213     }
214   }
215 
getListenSocketsIgnoringLifecycle()216   private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
217     synchronized (lock) {
218       return Collections.unmodifiableList(transportServer.getListenSocketAddresses());
219     }
220   }
221 
222   @Override
getServices()223   public List<ServerServiceDefinition> getServices() {
224     List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices();
225     if (fallbackServices.isEmpty()) {
226       return registry.getServices();
227     } else {
228       List<ServerServiceDefinition> registryServices = registry.getServices();
229       int servicesCount = registryServices.size() + fallbackServices.size();
230       List<ServerServiceDefinition> services =
231           new ArrayList<>(servicesCount);
232       services.addAll(registryServices);
233       services.addAll(fallbackServices);
234       return Collections.unmodifiableList(services);
235     }
236   }
237 
238   @Override
getImmutableServices()239   public List<ServerServiceDefinition> getImmutableServices() {
240     return registry.getServices();
241   }
242 
243   @Override
getMutableServices()244   public List<ServerServiceDefinition> getMutableServices() {
245     return Collections.unmodifiableList(fallbackRegistry.getServices());
246   }
247 
248   /**
249    * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
250    */
251   @Override
shutdown()252   public ServerImpl shutdown() {
253     boolean shutdownTransportServers;
254     synchronized (lock) {
255       if (shutdown) {
256         return this;
257       }
258       shutdown = true;
259       shutdownTransportServers = started;
260       if (!shutdownTransportServers) {
261         transportServersTerminated = true;
262         checkForTermination();
263       }
264     }
265     if (shutdownTransportServers) {
266       transportServer.shutdown();
267     }
268     return this;
269   }
270 
271   @Override
shutdownNow()272   public ServerImpl shutdownNow() {
273     shutdown();
274     Collection<ServerTransport> transportsCopy;
275     Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
276     boolean savedServerShutdownCallbackInvoked;
277     synchronized (lock) {
278       // Short-circuiting not strictly necessary, but prevents transports from needing to handle
279       // multiple shutdownNow invocations if shutdownNow is called multiple times.
280       if (shutdownNowStatus != null) {
281         return this;
282       }
283       shutdownNowStatus = nowStatus;
284       transportsCopy = new ArrayList<>(transports);
285       savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
286     }
287     // Short-circuiting not strictly necessary, but prevents transports from needing to handle
288     // multiple shutdownNow invocations, between here and the serverShutdown callback.
289     if (savedServerShutdownCallbackInvoked) {
290       // Have to call shutdownNow, because serverShutdown callback only called shutdown, not
291       // shutdownNow
292       for (ServerTransport transport : transportsCopy) {
293         transport.shutdownNow(nowStatus);
294       }
295     }
296     return this;
297   }
298 
299   @Override
isShutdown()300   public boolean isShutdown() {
301     synchronized (lock) {
302       return shutdown;
303     }
304   }
305 
306   @Override
awaitTermination(long timeout, TimeUnit unit)307   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
308     synchronized (lock) {
309       long timeoutNanos = unit.toNanos(timeout);
310       long endTimeNanos = System.nanoTime() + timeoutNanos;
311       while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
312         NANOSECONDS.timedWait(lock, timeoutNanos);
313       }
314       return terminated;
315     }
316   }
317 
318   @Override
awaitTermination()319   public void awaitTermination() throws InterruptedException {
320     synchronized (lock) {
321       while (!terminated) {
322         lock.wait();
323       }
324     }
325   }
326 
327   @Override
isTerminated()328   public boolean isTerminated() {
329     synchronized (lock) {
330       return terminated;
331     }
332   }
333 
334   /**
335    * Remove transport service from accounting collection and notify of complete shutdown if
336    * necessary.
337    *
338    * @param transport service to remove
339    */
transportClosed(ServerTransport transport)340   private void transportClosed(ServerTransport transport) {
341     synchronized (lock) {
342       if (!transports.remove(transport)) {
343         throw new AssertionError("Transport already removed");
344       }
345       channelz.removeServerSocket(ServerImpl.this, transport);
346       checkForTermination();
347     }
348   }
349 
350   /** Notify of complete shutdown if necessary. */
checkForTermination()351   private void checkForTermination() {
352     synchronized (lock) {
353       if (shutdown && transports.isEmpty() && transportServersTerminated) {
354         if (terminated) {
355           throw new AssertionError("Server already terminated");
356         }
357         terminated = true;
358         channelz.removeServer(this);
359         if (executor != null) {
360           executor = executorPool.returnObject(executor);
361         }
362         lock.notifyAll();
363       }
364     }
365   }
366 
367   private final class ServerListenerImpl implements ServerListener {
368 
369     @Override
transportCreated(ServerTransport transport)370     public ServerTransportListener transportCreated(ServerTransport transport) {
371       synchronized (lock) {
372         transports.add(transport);
373       }
374       ServerTransportListenerImpl stli = new ServerTransportListenerImpl(transport);
375       stli.init();
376       return stli;
377     }
378 
379     @Override
serverShutdown()380     public void serverShutdown() {
381       ArrayList<ServerTransport> copiedTransports;
382       Status shutdownNowStatusCopy;
383       synchronized (lock) {
384         if (serverShutdownCallbackInvoked) {
385           return;
386         }
387 
388         // transports collection can be modified during shutdown(), even if we hold the lock, due
389         // to reentrancy.
390         copiedTransports = new ArrayList<>(transports);
391         shutdownNowStatusCopy = shutdownNowStatus;
392         serverShutdownCallbackInvoked = true;
393       }
394       for (ServerTransport transport : copiedTransports) {
395         if (shutdownNowStatusCopy == null) {
396           transport.shutdown();
397         } else {
398           transport.shutdownNow(shutdownNowStatusCopy);
399         }
400       }
401       synchronized (lock) {
402         transportServersTerminated = true;
403         checkForTermination();
404       }
405     }
406   }
407 
408   private final class ServerTransportListenerImpl implements ServerTransportListener {
409     private final ServerTransport transport;
410     private Future<?> handshakeTimeoutFuture;
411     private Attributes attributes;
412 
ServerTransportListenerImpl(ServerTransport transport)413     ServerTransportListenerImpl(ServerTransport transport) {
414       this.transport = transport;
415     }
416 
init()417     public void init() {
418       class TransportShutdownNow implements Runnable {
419         @Override public void run() {
420           transport.shutdownNow(Status.CANCELLED.withDescription("Handshake timeout exceeded"));
421         }
422       }
423 
424       if (handshakeTimeoutMillis != Long.MAX_VALUE) {
425         handshakeTimeoutFuture = transport.getScheduledExecutorService()
426             .schedule(new TransportShutdownNow(), handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
427       } else {
428         // Noop, to avoid triggering Thread creation in InProcessServer
429         handshakeTimeoutFuture = new FutureTask<Void>(new Runnable() {
430           @Override public void run() {}
431         }, null);
432       }
433       channelz.addServerSocket(ServerImpl.this, transport);
434     }
435 
436     @Override
transportReady(Attributes attributes)437     public Attributes transportReady(Attributes attributes) {
438       handshakeTimeoutFuture.cancel(false);
439       handshakeTimeoutFuture = null;
440 
441       for (ServerTransportFilter filter : transportFilters) {
442         attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
443             "Filter %s returned null", filter);
444       }
445       this.attributes = attributes;
446       return attributes;
447     }
448 
449     @Override
transportTerminated()450     public void transportTerminated() {
451       if (handshakeTimeoutFuture != null) {
452         handshakeTimeoutFuture.cancel(false);
453         handshakeTimeoutFuture = null;
454       }
455       for (ServerTransportFilter filter : transportFilters) {
456         filter.transportTerminated(attributes);
457       }
458       transportClosed(transport);
459     }
460 
461 
462     @Override
streamCreated(ServerStream stream, String methodName, Metadata headers)463     public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
464       Tag tag = PerfMark.createTag(methodName, stream.streamId());
465       try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) {
466         PerfMark.attachTag(tag);
467         streamCreatedInternal(stream, methodName, headers, tag);
468       }
469     }
470 
streamCreatedInternal( final ServerStream stream, final String methodName, final Metadata headers, final Tag tag)471     private void streamCreatedInternal(
472         final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
473       final Executor wrappedExecutor;
474       // This is a performance optimization that avoids the synchronization and queuing overhead
475       // that comes with SerializingExecutor.
476       if (executorSupplier != null || executor != directExecutor()) {
477         wrappedExecutor = new SerializingExecutor(executor);
478       } else {
479         wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
480         stream.optimizeForDirectExecutor();
481       }
482 
483       if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
484         String encoding = headers.get(MESSAGE_ENCODING_KEY);
485         Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
486         if (decompressor == null) {
487           stream.setListener(NOOP_LISTENER);
488           stream.close(
489               Status.UNIMPLEMENTED.withDescription(
490                   String.format("Can't find decompressor for %s", encoding)),
491               new Metadata());
492           return;
493         }
494         stream.setDecompressor(decompressor);
495       }
496 
497       final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
498           stream.statsTraceContext(), "statsTraceCtx not present from stream");
499 
500       final Context.CancellableContext context = createContext(headers, statsTraceCtx);
501 
502       final Link link = PerfMark.linkOut();
503 
504       final JumpToApplicationThreadServerStreamListener jumpListener
505           = new JumpToApplicationThreadServerStreamListener(
506                   wrappedExecutor, executor, stream, context, tag);
507       stream.setListener(jumpListener);
508       final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
509       // Run in serializing executor so jumpListener.setListener() is called before any callbacks
510       // are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
511       // queued before any callbacks are queued at serializing executor.
512       // MethodLookup() runs on the default executor.
513       // When executorSupplier is enabled, MethodLookup() may set/change the executor in the
514       // SerializingExecutor before it finishes running.
515       // Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
516       // Otherwise, they all run on the default executor.
517 
518       final class MethodLookup extends ContextRunnable {
519         MethodLookup() {
520           super(context);
521         }
522 
523         @Override
524         public void runInContext() {
525           try (TaskCloseable ignore =
526                    PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) {
527             PerfMark.attachTag(tag);
528             PerfMark.linkIn(link);
529             runInternal();
530           }
531         }
532 
533         private void runInternal() {
534           ServerMethodDefinition<?, ?> wrapMethod;
535           ServerCallParameters<?, ?> callParams;
536           try {
537             ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
538             if (method == null) {
539               method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
540             }
541             if (method == null) {
542               Status status = Status.UNIMPLEMENTED.withDescription(
543                       "Method not found: " + methodName);
544               // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
545               // memory as a map whose key is the method name, this would allow a misbehaving
546               // client to blow up the server in-memory stats storage by sending large number of
547               // distinct unimplemented method
548               // names. (https://github.com/grpc/grpc-java/issues/2285)
549               jumpListener.setListener(NOOP_LISTENER);
550               stream.close(status, new Metadata());
551               context.cancel(null);
552               future.cancel(false);
553               return;
554             }
555             wrapMethod = wrapMethod(stream, method, statsTraceCtx);
556             callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
557             future.set(callParams);
558           } catch (Throwable t) {
559             jumpListener.setListener(NOOP_LISTENER);
560             stream.close(Status.fromThrowable(t), new Metadata());
561             context.cancel(null);
562             future.cancel(false);
563             throw t;
564           }
565         }
566 
567         private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
568             final ServerMethodDefinition<ReqT, RespT> methodDef,
569             final ServerStream stream,
570             final Metadata headers,
571             final Context.CancellableContext context,
572             final Tag tag) {
573           final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
574                   stream,
575                   methodDef.getMethodDescriptor(),
576                   headers,
577                   context,
578                   decompressorRegistry,
579                   compressorRegistry,
580                   serverCallTracer,
581                   tag);
582           if (executorSupplier != null) {
583             Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
584             if (switchingExecutor != null) {
585               ((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
586             }
587           }
588           return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
589         }
590       }
591 
592       final class HandleServerCall extends ContextRunnable {
593         HandleServerCall() {
594           super(context);
595         }
596 
597         @Override
598         public void runInContext() {
599           try (TaskCloseable ignore =
600                    PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) {
601             PerfMark.linkIn(link);
602             PerfMark.attachTag(tag);
603             runInternal();
604           }
605         }
606 
607         private void runInternal() {
608           ServerStreamListener listener = NOOP_LISTENER;
609           if (future.isCancelled()) {
610             return;
611           }
612           try {
613             listener = startWrappedCall(methodName, Futures.getDone(future), headers);
614           } catch (Throwable ex) {
615             stream.close(Status.fromThrowable(ex), new Metadata());
616             context.cancel(null);
617             throw new IllegalStateException(ex);
618           } finally {
619             jumpListener.setListener(listener);
620           }
621 
622           // An extremely short deadline may expire before stream.setListener(jumpListener).
623           // This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
624           // Delay of setting cancellationListener to context will fix the issue.
625           final class ServerStreamCancellationListener implements Context.CancellationListener {
626             @Override
627             public void cancelled(Context context) {
628               Status status = statusFromCancelled(context);
629               if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
630                 // This should rarely get run, since the client will likely cancel the stream
631                 // before the timeout is reached.
632                 stream.cancel(status);
633               }
634             }
635           }
636 
637           context.addListener(new ServerStreamCancellationListener(), directExecutor());
638         }
639       }
640 
641       wrappedExecutor.execute(new MethodLookup());
642       wrappedExecutor.execute(new HandleServerCall());
643     }
644 
createContext( Metadata headers, StatsTraceContext statsTraceCtx)645     private Context.CancellableContext createContext(
646         Metadata headers, StatsTraceContext statsTraceCtx) {
647       Long timeoutNanos = headers.get(TIMEOUT_KEY);
648 
649       Context baseContext =
650           statsTraceCtx
651               .serverFilterContext(rootContext)
652               .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
653 
654       if (timeoutNanos == null) {
655         return baseContext.withCancellation();
656       }
657 
658       Context.CancellableContext context =
659           baseContext.withDeadline(
660               Deadline.after(timeoutNanos, NANOSECONDS, ticker),
661               transport.getScheduledExecutorService());
662 
663       return context;
664     }
665 
666     /** Never returns {@code null}. */
wrapMethod(ServerStream stream, ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx)667     private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
668         ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
669       // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
670       statsTraceCtx.serverCallStarted(
671           new ServerCallInfoImpl<>(
672               methodDef.getMethodDescriptor(), // notify with original method descriptor
673               stream.getAttributes(),
674               stream.getAuthority()));
675       ServerCallHandler<ReqT, RespT> handler = methodDef.getServerCallHandler();
676       for (ServerInterceptor interceptor : interceptors) {
677         handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler);
678       }
679       ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
680       ServerMethodDefinition<?, ?> wMethodDef = binlog == null
681           ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
682       return wMethodDef;
683     }
684 
685     private final class ServerCallParameters<ReqT, RespT> {
686       ServerCallImpl<ReqT, RespT> call;
687       ServerCallHandler<ReqT, RespT> callHandler;
688 
ServerCallParameters(ServerCallImpl<ReqT, RespT> call, ServerCallHandler<ReqT, RespT> callHandler)689       public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
690                                   ServerCallHandler<ReqT, RespT> callHandler) {
691         this.call = call;
692         this.callHandler = callHandler;
693       }
694     }
695 
startWrappedCall( String fullMethodName, ServerCallParameters<WReqT, WRespT> params, Metadata headers)696     private <WReqT, WRespT> ServerStreamListener startWrappedCall(
697         String fullMethodName,
698         ServerCallParameters<WReqT, WRespT> params,
699         Metadata headers) {
700       ServerCall.Listener<WReqT> callListener =
701               params.callHandler.startCall(params.call, headers);
702       if (callListener == null) {
703         throw new NullPointerException(
704                 "startCall() returned a null listener for method " + fullMethodName);
705       }
706       return params.call.newServerStreamListener(callListener);
707     }
708   }
709 
710   @Override
getLogId()711   public InternalLogId getLogId() {
712     return logId;
713   }
714 
715   @Override
getStats()716   public ListenableFuture<ServerStats> getStats() {
717     ServerStats.Builder builder = new ServerStats.Builder();
718     List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList();
719     if (stats != null ) {
720       builder.addListenSockets(stats);
721     }
722     serverCallTracer.updateBuilder(builder);
723     SettableFuture<ServerStats> ret = SettableFuture.create();
724     ret.set(builder.build());
725     return ret;
726   }
727 
728   @Override
toString()729   public String toString() {
730     return MoreObjects.toStringHelper(this)
731         .add("logId", logId.getId())
732         .add("transportServer", transportServer)
733         .toString();
734   }
735 
736   private static final class NoopListener implements ServerStreamListener {
737     @Override
messagesAvailable(MessageProducer producer)738     public void messagesAvailable(MessageProducer producer) {
739       InputStream message;
740       while ((message = producer.next()) != null) {
741         try {
742           message.close();
743         } catch (IOException e) {
744           // Close any remaining messages
745           while ((message = producer.next()) != null) {
746             try {
747               message.close();
748             } catch (IOException ioException) {
749               // just log additional exceptions as we are already going to throw
750               log.log(Level.WARNING, "Exception closing stream", ioException);
751             }
752           }
753           throw new RuntimeException(e);
754         }
755       }
756     }
757 
758     @Override
halfClosed()759     public void halfClosed() {}
760 
761     @Override
closed(Status status)762     public void closed(Status status) {}
763 
764     @Override
onReady()765     public void onReady() {}
766   }
767 
768   /**
769    * Dispatches callbacks onto an application-provided executor and correctly propagates
770    * exceptions.
771    */
772   @VisibleForTesting
773   static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
774     private final Executor callExecutor;
775     private final Executor cancelExecutor;
776     private final Context.CancellableContext context;
777     private final ServerStream stream;
778     private final Tag tag;
779     // Only accessed from callExecutor.
780     private ServerStreamListener listener;
781 
JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag)782     public JumpToApplicationThreadServerStreamListener(Executor executor,
783         Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
784       this.callExecutor = executor;
785       this.cancelExecutor = cancelExecutor;
786       this.stream = stream;
787       this.context = context;
788       this.tag = tag;
789     }
790 
791     /**
792      * This call MUST be serialized on callExecutor to avoid races.
793      */
getListener()794     private ServerStreamListener getListener() {
795       if (listener == null) {
796         throw new IllegalStateException("listener unset");
797       }
798       return listener;
799     }
800 
801     @VisibleForTesting
setListener(ServerStreamListener listener)802     void setListener(ServerStreamListener listener) {
803       Preconditions.checkNotNull(listener, "listener must not be null");
804       Preconditions.checkState(this.listener == null, "Listener already set");
805       this.listener = listener;
806     }
807 
808     /**
809      * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
810      */
internalClose(Throwable t)811     private void internalClose(Throwable t) {
812       // TODO(ejona86): this is not thread-safe :)
813       stream.close(Status.UNKNOWN.withCause(t), new Metadata());
814     }
815 
816     @Override
messagesAvailable(final MessageProducer producer)817     public void messagesAvailable(final MessageProducer producer) {
818       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
819         PerfMark.attachTag(tag);
820         final Link link = PerfMark.linkOut();
821         final class MessagesAvailable extends ContextRunnable {
822 
823           MessagesAvailable() {
824             super(context);
825           }
826 
827           @Override
828           public void runInContext() {
829             try (TaskCloseable ignore =
830                      PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) {
831               PerfMark.attachTag(tag);
832               PerfMark.linkIn(link);
833               getListener().messagesAvailable(producer);
834             } catch (Throwable t) {
835               internalClose(t);
836               throw t;
837             }
838           }
839         }
840 
841         callExecutor.execute(new MessagesAvailable());
842       }
843     }
844 
845     @Override
halfClosed()846     public void halfClosed() {
847       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
848         PerfMark.attachTag(tag);
849         final Link link = PerfMark.linkOut();
850         final class HalfClosed extends ContextRunnable {
851           HalfClosed() {
852             super(context);
853           }
854 
855           @Override
856           public void runInContext() {
857             try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) {
858               PerfMark.attachTag(tag);
859               PerfMark.linkIn(link);
860               getListener().halfClosed();
861             } catch (Throwable t) {
862               internalClose(t);
863               throw t;
864             }
865           }
866         }
867 
868         callExecutor.execute(new HalfClosed());
869       }
870     }
871 
872     @Override
closed(final Status status)873     public void closed(final Status status) {
874       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
875         PerfMark.attachTag(tag);
876         closedInternal(status);
877       }
878     }
879 
closedInternal(final Status status)880     private void closedInternal(final Status status) {
881       // For cancellations, promptly inform any users of the context that their work should be
882       // aborted. Otherwise, we can wait until pending work is done.
883       if (!status.isOk()) {
884         // Since status was not OK we know that the call did not complete and got cancelled. To
885         // reflect this on the context we need to close it with a cause exception. Since not every
886         // failed status has an exception we will create one here if needed.
887         Throwable cancelCause = status.getCause();
888         if (cancelCause == null) {
889           cancelCause = InternalStatus.asRuntimeException(
890               Status.CANCELLED.withDescription("RPC cancelled"), null, false);
891         }
892 
893         // The callExecutor might be busy doing user work. To avoid waiting, use an executor that
894         // is not serializing.
895         cancelExecutor.execute(new ContextCloser(context, cancelCause));
896       }
897       final Link link = PerfMark.linkOut();
898 
899       final class Closed extends ContextRunnable {
900         Closed() {
901           super(context);
902         }
903 
904         @Override
905         public void runInContext() {
906           try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) {
907             PerfMark.attachTag(tag);
908             PerfMark.linkIn(link);
909             getListener().closed(status);
910           }
911         }
912       }
913 
914       callExecutor.execute(new Closed());
915     }
916 
917     @Override
onReady()918     public void onReady() {
919       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
920         PerfMark.attachTag(tag);
921         final Link link = PerfMark.linkOut();
922 
923         final class OnReady extends ContextRunnable {
924           OnReady() {
925             super(context);
926           }
927 
928           @Override
929           public void runInContext() {
930             try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) {
931               PerfMark.attachTag(tag);
932               PerfMark.linkIn(link);
933               getListener().onReady();
934             } catch (Throwable t) {
935               internalClose(t);
936               throw t;
937             }
938           }
939         }
940 
941         callExecutor.execute(new OnReady());
942       }
943     }
944   }
945 
946   @VisibleForTesting
947   static final class ContextCloser implements Runnable {
948     private final Context.CancellableContext context;
949     private final Throwable cause;
950 
ContextCloser(Context.CancellableContext context, Throwable cause)951     ContextCloser(Context.CancellableContext context, Throwable cause) {
952       this.context = context;
953       this.cause = cause;
954     }
955 
956     @Override
run()957     public void run() {
958       context.cancel(cause);
959     }
960   }
961 }
962