1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 22 import static io.grpc.Contexts.statusFromCancelled; 23 import static io.grpc.Status.DEADLINE_EXCEEDED; 24 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 25 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; 26 import static java.util.concurrent.TimeUnit.NANOSECONDS; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.MoreObjects; 30 import com.google.common.base.Preconditions; 31 import com.google.common.util.concurrent.Futures; 32 import com.google.common.util.concurrent.ListenableFuture; 33 import com.google.common.util.concurrent.SettableFuture; 34 import io.grpc.Attributes; 35 import io.grpc.BinaryLog; 36 import io.grpc.CompressorRegistry; 37 import io.grpc.Context; 38 import io.grpc.Deadline; 39 import io.grpc.Decompressor; 40 import io.grpc.DecompressorRegistry; 41 import io.grpc.HandlerRegistry; 42 import io.grpc.InternalChannelz; 43 import io.grpc.InternalChannelz.ServerStats; 44 import io.grpc.InternalChannelz.SocketStats; 45 import io.grpc.InternalInstrumented; 46 import io.grpc.InternalLogId; 47 import io.grpc.InternalServerInterceptors; 48 import io.grpc.InternalStatus; 49 import io.grpc.Metadata; 50 import io.grpc.ServerCall; 51 import io.grpc.ServerCallExecutorSupplier; 52 import io.grpc.ServerCallHandler; 53 import io.grpc.ServerInterceptor; 54 import io.grpc.ServerMethodDefinition; 55 import io.grpc.ServerServiceDefinition; 56 import io.grpc.ServerTransportFilter; 57 import io.grpc.Status; 58 import io.perfmark.Link; 59 import io.perfmark.PerfMark; 60 import io.perfmark.Tag; 61 import io.perfmark.TaskCloseable; 62 import java.io.IOException; 63 import java.io.InputStream; 64 import java.net.InetSocketAddress; 65 import java.net.SocketAddress; 66 import java.util.ArrayList; 67 import java.util.Collection; 68 import java.util.Collections; 69 import java.util.HashSet; 70 import java.util.List; 71 import java.util.Set; 72 import java.util.concurrent.Executor; 73 import java.util.concurrent.Future; 74 import java.util.concurrent.FutureTask; 75 import java.util.concurrent.TimeUnit; 76 import java.util.logging.Level; 77 import java.util.logging.Logger; 78 import javax.annotation.concurrent.GuardedBy; 79 80 /** 81 * Default implementation of {@link io.grpc.Server}, for creation by transports. 82 * 83 * <p>Expected usage (by a theoretical TCP transport): 84 * <pre><code>public class TcpTransportServerFactory { 85 * public static Server newServer(Executor executor, HandlerRegistry registry, 86 * String configuration) { 87 * return new ServerImpl(executor, registry, new TcpTransportServer(configuration)); 88 * } 89 * }</code></pre> 90 * 91 * <p>Starting the server starts the underlying transport for servicing requests. Stopping the 92 * server stops servicing new requests and waits for all connections to terminate. 93 */ 94 public final class ServerImpl extends io.grpc.Server implements InternalInstrumented<ServerStats> { 95 private static final Logger log = Logger.getLogger(ServerImpl.class.getName()); 96 private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); 97 98 private final InternalLogId logId; 99 private final ObjectPool<? extends Executor> executorPool; 100 /** Executor for application processing. Safe to read after {@link #start()}. */ 101 private Executor executor; 102 private final HandlerRegistry registry; 103 private final HandlerRegistry fallbackRegistry; 104 private final List<ServerTransportFilter> transportFilters; 105 // This is iterated on a per-call basis. Use an array instead of a Collection to avoid iterator 106 // creations. 107 private final ServerInterceptor[] interceptors; 108 private final long handshakeTimeoutMillis; 109 @GuardedBy("lock") private boolean started; 110 @GuardedBy("lock") private boolean shutdown; 111 /** non-{@code null} if immediate shutdown has been requested. */ 112 @GuardedBy("lock") private Status shutdownNowStatus; 113 /** {@code true} if ServerListenerImpl.serverShutdown() was called. */ 114 @GuardedBy("lock") private boolean serverShutdownCallbackInvoked; 115 @GuardedBy("lock") private boolean terminated; 116 /** Service encapsulating something similar to an accept() socket. */ 117 private final InternalServer transportServer; 118 private final Object lock = new Object(); 119 @GuardedBy("lock") private boolean transportServersTerminated; 120 /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ 121 @GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>(); 122 123 private final Context rootContext; 124 125 private final DecompressorRegistry decompressorRegistry; 126 private final CompressorRegistry compressorRegistry; 127 private final BinaryLog binlog; 128 129 private final InternalChannelz channelz; 130 private final CallTracer serverCallTracer; 131 private final Deadline.Ticker ticker; 132 private final ServerCallExecutorSupplier executorSupplier; 133 134 /** 135 * Construct a server. 136 * 137 * @param builder builder with configuration for server 138 * @param transportServer transport servers that will create new incoming transports 139 * @param rootContext context that callbacks for new RPCs should be derived from 140 */ ServerImpl( ServerImplBuilder builder, InternalServer transportServer, Context rootContext)141 ServerImpl( 142 ServerImplBuilder builder, 143 InternalServer transportServer, 144 Context rootContext) { 145 this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); 146 this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); 147 this.fallbackRegistry = 148 Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); 149 this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); 150 this.logId = 151 InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); 152 // Fork from the passed in context so that it does not propagate cancellation, it only 153 // inherits values. 154 this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork(); 155 this.decompressorRegistry = builder.decompressorRegistry; 156 this.compressorRegistry = builder.compressorRegistry; 157 this.transportFilters = Collections.unmodifiableList( 158 new ArrayList<>(builder.transportFilters)); 159 this.interceptors = 160 builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]); 161 this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis; 162 this.binlog = builder.binlog; 163 this.channelz = builder.channelz; 164 this.serverCallTracer = builder.callTracerFactory.create(); 165 this.ticker = checkNotNull(builder.ticker, "ticker"); 166 channelz.addServer(this); 167 this.executorSupplier = builder.executorSupplier; 168 } 169 170 /** 171 * Bind and start the server. 172 * 173 * @return {@code this} object 174 * @throws IllegalStateException if already started 175 * @throws IOException if unable to bind 176 */ 177 @Override start()178 public ServerImpl start() throws IOException { 179 synchronized (lock) { 180 checkState(!started, "Already started"); 181 checkState(!shutdown, "Shutting down"); 182 // Start and wait for any ports to actually be bound. 183 184 ServerListenerImpl listener = new ServerListenerImpl(); 185 transportServer.start(listener); 186 executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); 187 started = true; 188 return this; 189 } 190 } 191 192 193 @Override getPort()194 public int getPort() { 195 synchronized (lock) { 196 checkState(started, "Not started"); 197 checkState(!terminated, "Already terminated"); 198 for (SocketAddress addr: transportServer.getListenSocketAddresses()) { 199 if (addr instanceof InetSocketAddress) { 200 return ((InetSocketAddress) addr).getPort(); 201 } 202 } 203 return -1; 204 } 205 } 206 207 @Override getListenSockets()208 public List<SocketAddress> getListenSockets() { 209 synchronized (lock) { 210 checkState(started, "Not started"); 211 checkState(!terminated, "Already terminated"); 212 return getListenSocketsIgnoringLifecycle(); 213 } 214 } 215 getListenSocketsIgnoringLifecycle()216 private List<SocketAddress> getListenSocketsIgnoringLifecycle() { 217 synchronized (lock) { 218 return Collections.unmodifiableList(transportServer.getListenSocketAddresses()); 219 } 220 } 221 222 @Override getServices()223 public List<ServerServiceDefinition> getServices() { 224 List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices(); 225 if (fallbackServices.isEmpty()) { 226 return registry.getServices(); 227 } else { 228 List<ServerServiceDefinition> registryServices = registry.getServices(); 229 int servicesCount = registryServices.size() + fallbackServices.size(); 230 List<ServerServiceDefinition> services = 231 new ArrayList<>(servicesCount); 232 services.addAll(registryServices); 233 services.addAll(fallbackServices); 234 return Collections.unmodifiableList(services); 235 } 236 } 237 238 @Override getImmutableServices()239 public List<ServerServiceDefinition> getImmutableServices() { 240 return registry.getServices(); 241 } 242 243 @Override getMutableServices()244 public List<ServerServiceDefinition> getMutableServices() { 245 return Collections.unmodifiableList(fallbackRegistry.getServices()); 246 } 247 248 /** 249 * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected. 250 */ 251 @Override shutdown()252 public ServerImpl shutdown() { 253 boolean shutdownTransportServers; 254 synchronized (lock) { 255 if (shutdown) { 256 return this; 257 } 258 shutdown = true; 259 shutdownTransportServers = started; 260 if (!shutdownTransportServers) { 261 transportServersTerminated = true; 262 checkForTermination(); 263 } 264 } 265 if (shutdownTransportServers) { 266 transportServer.shutdown(); 267 } 268 return this; 269 } 270 271 @Override shutdownNow()272 public ServerImpl shutdownNow() { 273 shutdown(); 274 Collection<ServerTransport> transportsCopy; 275 Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked"); 276 boolean savedServerShutdownCallbackInvoked; 277 synchronized (lock) { 278 // Short-circuiting not strictly necessary, but prevents transports from needing to handle 279 // multiple shutdownNow invocations if shutdownNow is called multiple times. 280 if (shutdownNowStatus != null) { 281 return this; 282 } 283 shutdownNowStatus = nowStatus; 284 transportsCopy = new ArrayList<>(transports); 285 savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked; 286 } 287 // Short-circuiting not strictly necessary, but prevents transports from needing to handle 288 // multiple shutdownNow invocations, between here and the serverShutdown callback. 289 if (savedServerShutdownCallbackInvoked) { 290 // Have to call shutdownNow, because serverShutdown callback only called shutdown, not 291 // shutdownNow 292 for (ServerTransport transport : transportsCopy) { 293 transport.shutdownNow(nowStatus); 294 } 295 } 296 return this; 297 } 298 299 @Override isShutdown()300 public boolean isShutdown() { 301 synchronized (lock) { 302 return shutdown; 303 } 304 } 305 306 @Override awaitTermination(long timeout, TimeUnit unit)307 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 308 synchronized (lock) { 309 long timeoutNanos = unit.toNanos(timeout); 310 long endTimeNanos = System.nanoTime() + timeoutNanos; 311 while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { 312 NANOSECONDS.timedWait(lock, timeoutNanos); 313 } 314 return terminated; 315 } 316 } 317 318 @Override awaitTermination()319 public void awaitTermination() throws InterruptedException { 320 synchronized (lock) { 321 while (!terminated) { 322 lock.wait(); 323 } 324 } 325 } 326 327 @Override isTerminated()328 public boolean isTerminated() { 329 synchronized (lock) { 330 return terminated; 331 } 332 } 333 334 /** 335 * Remove transport service from accounting collection and notify of complete shutdown if 336 * necessary. 337 * 338 * @param transport service to remove 339 */ transportClosed(ServerTransport transport)340 private void transportClosed(ServerTransport transport) { 341 synchronized (lock) { 342 if (!transports.remove(transport)) { 343 throw new AssertionError("Transport already removed"); 344 } 345 channelz.removeServerSocket(ServerImpl.this, transport); 346 checkForTermination(); 347 } 348 } 349 350 /** Notify of complete shutdown if necessary. */ checkForTermination()351 private void checkForTermination() { 352 synchronized (lock) { 353 if (shutdown && transports.isEmpty() && transportServersTerminated) { 354 if (terminated) { 355 throw new AssertionError("Server already terminated"); 356 } 357 terminated = true; 358 channelz.removeServer(this); 359 if (executor != null) { 360 executor = executorPool.returnObject(executor); 361 } 362 lock.notifyAll(); 363 } 364 } 365 } 366 367 private final class ServerListenerImpl implements ServerListener { 368 369 @Override transportCreated(ServerTransport transport)370 public ServerTransportListener transportCreated(ServerTransport transport) { 371 synchronized (lock) { 372 transports.add(transport); 373 } 374 ServerTransportListenerImpl stli = new ServerTransportListenerImpl(transport); 375 stli.init(); 376 return stli; 377 } 378 379 @Override serverShutdown()380 public void serverShutdown() { 381 ArrayList<ServerTransport> copiedTransports; 382 Status shutdownNowStatusCopy; 383 synchronized (lock) { 384 if (serverShutdownCallbackInvoked) { 385 return; 386 } 387 388 // transports collection can be modified during shutdown(), even if we hold the lock, due 389 // to reentrancy. 390 copiedTransports = new ArrayList<>(transports); 391 shutdownNowStatusCopy = shutdownNowStatus; 392 serverShutdownCallbackInvoked = true; 393 } 394 for (ServerTransport transport : copiedTransports) { 395 if (shutdownNowStatusCopy == null) { 396 transport.shutdown(); 397 } else { 398 transport.shutdownNow(shutdownNowStatusCopy); 399 } 400 } 401 synchronized (lock) { 402 transportServersTerminated = true; 403 checkForTermination(); 404 } 405 } 406 } 407 408 private final class ServerTransportListenerImpl implements ServerTransportListener { 409 private final ServerTransport transport; 410 private Future<?> handshakeTimeoutFuture; 411 private Attributes attributes; 412 ServerTransportListenerImpl(ServerTransport transport)413 ServerTransportListenerImpl(ServerTransport transport) { 414 this.transport = transport; 415 } 416 init()417 public void init() { 418 class TransportShutdownNow implements Runnable { 419 @Override public void run() { 420 transport.shutdownNow(Status.CANCELLED.withDescription("Handshake timeout exceeded")); 421 } 422 } 423 424 if (handshakeTimeoutMillis != Long.MAX_VALUE) { 425 handshakeTimeoutFuture = transport.getScheduledExecutorService() 426 .schedule(new TransportShutdownNow(), handshakeTimeoutMillis, TimeUnit.MILLISECONDS); 427 } else { 428 // Noop, to avoid triggering Thread creation in InProcessServer 429 handshakeTimeoutFuture = new FutureTask<Void>(new Runnable() { 430 @Override public void run() {} 431 }, null); 432 } 433 channelz.addServerSocket(ServerImpl.this, transport); 434 } 435 436 @Override transportReady(Attributes attributes)437 public Attributes transportReady(Attributes attributes) { 438 handshakeTimeoutFuture.cancel(false); 439 handshakeTimeoutFuture = null; 440 441 for (ServerTransportFilter filter : transportFilters) { 442 attributes = Preconditions.checkNotNull(filter.transportReady(attributes), 443 "Filter %s returned null", filter); 444 } 445 this.attributes = attributes; 446 return attributes; 447 } 448 449 @Override transportTerminated()450 public void transportTerminated() { 451 if (handshakeTimeoutFuture != null) { 452 handshakeTimeoutFuture.cancel(false); 453 handshakeTimeoutFuture = null; 454 } 455 for (ServerTransportFilter filter : transportFilters) { 456 filter.transportTerminated(attributes); 457 } 458 transportClosed(transport); 459 } 460 461 462 @Override streamCreated(ServerStream stream, String methodName, Metadata headers)463 public void streamCreated(ServerStream stream, String methodName, Metadata headers) { 464 Tag tag = PerfMark.createTag(methodName, stream.streamId()); 465 try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) { 466 PerfMark.attachTag(tag); 467 streamCreatedInternal(stream, methodName, headers, tag); 468 } 469 } 470 streamCreatedInternal( final ServerStream stream, final String methodName, final Metadata headers, final Tag tag)471 private void streamCreatedInternal( 472 final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) { 473 final Executor wrappedExecutor; 474 // This is a performance optimization that avoids the synchronization and queuing overhead 475 // that comes with SerializingExecutor. 476 if (executorSupplier != null || executor != directExecutor()) { 477 wrappedExecutor = new SerializingExecutor(executor); 478 } else { 479 wrappedExecutor = new SerializeReentrantCallsDirectExecutor(); 480 stream.optimizeForDirectExecutor(); 481 } 482 483 if (headers.containsKey(MESSAGE_ENCODING_KEY)) { 484 String encoding = headers.get(MESSAGE_ENCODING_KEY); 485 Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding); 486 if (decompressor == null) { 487 stream.setListener(NOOP_LISTENER); 488 stream.close( 489 Status.UNIMPLEMENTED.withDescription( 490 String.format("Can't find decompressor for %s", encoding)), 491 new Metadata()); 492 return; 493 } 494 stream.setDecompressor(decompressor); 495 } 496 497 final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull( 498 stream.statsTraceContext(), "statsTraceCtx not present from stream"); 499 500 final Context.CancellableContext context = createContext(headers, statsTraceCtx); 501 502 final Link link = PerfMark.linkOut(); 503 504 final JumpToApplicationThreadServerStreamListener jumpListener 505 = new JumpToApplicationThreadServerStreamListener( 506 wrappedExecutor, executor, stream, context, tag); 507 stream.setListener(jumpListener); 508 final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create(); 509 // Run in serializing executor so jumpListener.setListener() is called before any callbacks 510 // are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively 511 // queued before any callbacks are queued at serializing executor. 512 // MethodLookup() runs on the default executor. 513 // When executorSupplier is enabled, MethodLookup() may set/change the executor in the 514 // SerializingExecutor before it finishes running. 515 // Then HandleServerCall() and callbacks would switch to the executorSupplier executor. 516 // Otherwise, they all run on the default executor. 517 518 final class MethodLookup extends ContextRunnable { 519 MethodLookup() { 520 super(context); 521 } 522 523 @Override 524 public void runInContext() { 525 try (TaskCloseable ignore = 526 PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) { 527 PerfMark.attachTag(tag); 528 PerfMark.linkIn(link); 529 runInternal(); 530 } 531 } 532 533 private void runInternal() { 534 ServerMethodDefinition<?, ?> wrapMethod; 535 ServerCallParameters<?, ?> callParams; 536 try { 537 ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName); 538 if (method == null) { 539 method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); 540 } 541 if (method == null) { 542 Status status = Status.UNIMPLEMENTED.withDescription( 543 "Method not found: " + methodName); 544 // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in 545 // memory as a map whose key is the method name, this would allow a misbehaving 546 // client to blow up the server in-memory stats storage by sending large number of 547 // distinct unimplemented method 548 // names. (https://github.com/grpc/grpc-java/issues/2285) 549 jumpListener.setListener(NOOP_LISTENER); 550 stream.close(status, new Metadata()); 551 context.cancel(null); 552 future.cancel(false); 553 return; 554 } 555 wrapMethod = wrapMethod(stream, method, statsTraceCtx); 556 callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag); 557 future.set(callParams); 558 } catch (Throwable t) { 559 jumpListener.setListener(NOOP_LISTENER); 560 stream.close(Status.fromThrowable(t), new Metadata()); 561 context.cancel(null); 562 future.cancel(false); 563 throw t; 564 } 565 } 566 567 private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor( 568 final ServerMethodDefinition<ReqT, RespT> methodDef, 569 final ServerStream stream, 570 final Metadata headers, 571 final Context.CancellableContext context, 572 final Tag tag) { 573 final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>( 574 stream, 575 methodDef.getMethodDescriptor(), 576 headers, 577 context, 578 decompressorRegistry, 579 compressorRegistry, 580 serverCallTracer, 581 tag); 582 if (executorSupplier != null) { 583 Executor switchingExecutor = executorSupplier.getExecutor(call, headers); 584 if (switchingExecutor != null) { 585 ((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor); 586 } 587 } 588 return new ServerCallParameters<>(call, methodDef.getServerCallHandler()); 589 } 590 } 591 592 final class HandleServerCall extends ContextRunnable { 593 HandleServerCall() { 594 super(context); 595 } 596 597 @Override 598 public void runInContext() { 599 try (TaskCloseable ignore = 600 PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) { 601 PerfMark.linkIn(link); 602 PerfMark.attachTag(tag); 603 runInternal(); 604 } 605 } 606 607 private void runInternal() { 608 ServerStreamListener listener = NOOP_LISTENER; 609 if (future.isCancelled()) { 610 return; 611 } 612 try { 613 listener = startWrappedCall(methodName, Futures.getDone(future), headers); 614 } catch (Throwable ex) { 615 stream.close(Status.fromThrowable(ex), new Metadata()); 616 context.cancel(null); 617 throw new IllegalStateException(ex); 618 } finally { 619 jumpListener.setListener(listener); 620 } 621 622 // An extremely short deadline may expire before stream.setListener(jumpListener). 623 // This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300 624 // Delay of setting cancellationListener to context will fix the issue. 625 final class ServerStreamCancellationListener implements Context.CancellationListener { 626 @Override 627 public void cancelled(Context context) { 628 Status status = statusFromCancelled(context); 629 if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) { 630 // This should rarely get run, since the client will likely cancel the stream 631 // before the timeout is reached. 632 stream.cancel(status); 633 } 634 } 635 } 636 637 context.addListener(new ServerStreamCancellationListener(), directExecutor()); 638 } 639 } 640 641 wrappedExecutor.execute(new MethodLookup()); 642 wrappedExecutor.execute(new HandleServerCall()); 643 } 644 createContext( Metadata headers, StatsTraceContext statsTraceCtx)645 private Context.CancellableContext createContext( 646 Metadata headers, StatsTraceContext statsTraceCtx) { 647 Long timeoutNanos = headers.get(TIMEOUT_KEY); 648 649 Context baseContext = 650 statsTraceCtx 651 .serverFilterContext(rootContext) 652 .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this); 653 654 if (timeoutNanos == null) { 655 return baseContext.withCancellation(); 656 } 657 658 Context.CancellableContext context = 659 baseContext.withDeadline( 660 Deadline.after(timeoutNanos, NANOSECONDS, ticker), 661 transport.getScheduledExecutorService()); 662 663 return context; 664 } 665 666 /** Never returns {@code null}. */ wrapMethod(ServerStream stream, ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx)667 private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream, 668 ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) { 669 // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? 670 statsTraceCtx.serverCallStarted( 671 new ServerCallInfoImpl<>( 672 methodDef.getMethodDescriptor(), // notify with original method descriptor 673 stream.getAttributes(), 674 stream.getAuthority())); 675 ServerCallHandler<ReqT, RespT> handler = methodDef.getServerCallHandler(); 676 for (ServerInterceptor interceptor : interceptors) { 677 handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler); 678 } 679 ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler); 680 ServerMethodDefinition<?, ?> wMethodDef = binlog == null 681 ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef); 682 return wMethodDef; 683 } 684 685 private final class ServerCallParameters<ReqT, RespT> { 686 ServerCallImpl<ReqT, RespT> call; 687 ServerCallHandler<ReqT, RespT> callHandler; 688 ServerCallParameters(ServerCallImpl<ReqT, RespT> call, ServerCallHandler<ReqT, RespT> callHandler)689 public ServerCallParameters(ServerCallImpl<ReqT, RespT> call, 690 ServerCallHandler<ReqT, RespT> callHandler) { 691 this.call = call; 692 this.callHandler = callHandler; 693 } 694 } 695 startWrappedCall( String fullMethodName, ServerCallParameters<WReqT, WRespT> params, Metadata headers)696 private <WReqT, WRespT> ServerStreamListener startWrappedCall( 697 String fullMethodName, 698 ServerCallParameters<WReqT, WRespT> params, 699 Metadata headers) { 700 ServerCall.Listener<WReqT> callListener = 701 params.callHandler.startCall(params.call, headers); 702 if (callListener == null) { 703 throw new NullPointerException( 704 "startCall() returned a null listener for method " + fullMethodName); 705 } 706 return params.call.newServerStreamListener(callListener); 707 } 708 } 709 710 @Override getLogId()711 public InternalLogId getLogId() { 712 return logId; 713 } 714 715 @Override getStats()716 public ListenableFuture<ServerStats> getStats() { 717 ServerStats.Builder builder = new ServerStats.Builder(); 718 List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList(); 719 if (stats != null ) { 720 builder.addListenSockets(stats); 721 } 722 serverCallTracer.updateBuilder(builder); 723 SettableFuture<ServerStats> ret = SettableFuture.create(); 724 ret.set(builder.build()); 725 return ret; 726 } 727 728 @Override toString()729 public String toString() { 730 return MoreObjects.toStringHelper(this) 731 .add("logId", logId.getId()) 732 .add("transportServer", transportServer) 733 .toString(); 734 } 735 736 private static final class NoopListener implements ServerStreamListener { 737 @Override messagesAvailable(MessageProducer producer)738 public void messagesAvailable(MessageProducer producer) { 739 InputStream message; 740 while ((message = producer.next()) != null) { 741 try { 742 message.close(); 743 } catch (IOException e) { 744 // Close any remaining messages 745 while ((message = producer.next()) != null) { 746 try { 747 message.close(); 748 } catch (IOException ioException) { 749 // just log additional exceptions as we are already going to throw 750 log.log(Level.WARNING, "Exception closing stream", ioException); 751 } 752 } 753 throw new RuntimeException(e); 754 } 755 } 756 } 757 758 @Override halfClosed()759 public void halfClosed() {} 760 761 @Override closed(Status status)762 public void closed(Status status) {} 763 764 @Override onReady()765 public void onReady() {} 766 } 767 768 /** 769 * Dispatches callbacks onto an application-provided executor and correctly propagates 770 * exceptions. 771 */ 772 @VisibleForTesting 773 static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener { 774 private final Executor callExecutor; 775 private final Executor cancelExecutor; 776 private final Context.CancellableContext context; 777 private final ServerStream stream; 778 private final Tag tag; 779 // Only accessed from callExecutor. 780 private ServerStreamListener listener; 781 JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag)782 public JumpToApplicationThreadServerStreamListener(Executor executor, 783 Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { 784 this.callExecutor = executor; 785 this.cancelExecutor = cancelExecutor; 786 this.stream = stream; 787 this.context = context; 788 this.tag = tag; 789 } 790 791 /** 792 * This call MUST be serialized on callExecutor to avoid races. 793 */ getListener()794 private ServerStreamListener getListener() { 795 if (listener == null) { 796 throw new IllegalStateException("listener unset"); 797 } 798 return listener; 799 } 800 801 @VisibleForTesting setListener(ServerStreamListener listener)802 void setListener(ServerStreamListener listener) { 803 Preconditions.checkNotNull(listener, "listener must not be null"); 804 Preconditions.checkState(this.listener == null, "Listener already set"); 805 this.listener = listener; 806 } 807 808 /** 809 * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. 810 */ internalClose(Throwable t)811 private void internalClose(Throwable t) { 812 // TODO(ejona86): this is not thread-safe :) 813 stream.close(Status.UNKNOWN.withCause(t), new Metadata()); 814 } 815 816 @Override messagesAvailable(final MessageProducer producer)817 public void messagesAvailable(final MessageProducer producer) { 818 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) { 819 PerfMark.attachTag(tag); 820 final Link link = PerfMark.linkOut(); 821 final class MessagesAvailable extends ContextRunnable { 822 823 MessagesAvailable() { 824 super(context); 825 } 826 827 @Override 828 public void runInContext() { 829 try (TaskCloseable ignore = 830 PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) { 831 PerfMark.attachTag(tag); 832 PerfMark.linkIn(link); 833 getListener().messagesAvailable(producer); 834 } catch (Throwable t) { 835 internalClose(t); 836 throw t; 837 } 838 } 839 } 840 841 callExecutor.execute(new MessagesAvailable()); 842 } 843 } 844 845 @Override halfClosed()846 public void halfClosed() { 847 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) { 848 PerfMark.attachTag(tag); 849 final Link link = PerfMark.linkOut(); 850 final class HalfClosed extends ContextRunnable { 851 HalfClosed() { 852 super(context); 853 } 854 855 @Override 856 public void runInContext() { 857 try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) { 858 PerfMark.attachTag(tag); 859 PerfMark.linkIn(link); 860 getListener().halfClosed(); 861 } catch (Throwable t) { 862 internalClose(t); 863 throw t; 864 } 865 } 866 } 867 868 callExecutor.execute(new HalfClosed()); 869 } 870 } 871 872 @Override closed(final Status status)873 public void closed(final Status status) { 874 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) { 875 PerfMark.attachTag(tag); 876 closedInternal(status); 877 } 878 } 879 closedInternal(final Status status)880 private void closedInternal(final Status status) { 881 // For cancellations, promptly inform any users of the context that their work should be 882 // aborted. Otherwise, we can wait until pending work is done. 883 if (!status.isOk()) { 884 // Since status was not OK we know that the call did not complete and got cancelled. To 885 // reflect this on the context we need to close it with a cause exception. Since not every 886 // failed status has an exception we will create one here if needed. 887 Throwable cancelCause = status.getCause(); 888 if (cancelCause == null) { 889 cancelCause = InternalStatus.asRuntimeException( 890 Status.CANCELLED.withDescription("RPC cancelled"), null, false); 891 } 892 893 // The callExecutor might be busy doing user work. To avoid waiting, use an executor that 894 // is not serializing. 895 cancelExecutor.execute(new ContextCloser(context, cancelCause)); 896 } 897 final Link link = PerfMark.linkOut(); 898 899 final class Closed extends ContextRunnable { 900 Closed() { 901 super(context); 902 } 903 904 @Override 905 public void runInContext() { 906 try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) { 907 PerfMark.attachTag(tag); 908 PerfMark.linkIn(link); 909 getListener().closed(status); 910 } 911 } 912 } 913 914 callExecutor.execute(new Closed()); 915 } 916 917 @Override onReady()918 public void onReady() { 919 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) { 920 PerfMark.attachTag(tag); 921 final Link link = PerfMark.linkOut(); 922 923 final class OnReady extends ContextRunnable { 924 OnReady() { 925 super(context); 926 } 927 928 @Override 929 public void runInContext() { 930 try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) { 931 PerfMark.attachTag(tag); 932 PerfMark.linkIn(link); 933 getListener().onReady(); 934 } catch (Throwable t) { 935 internalClose(t); 936 throw t; 937 } 938 } 939 } 940 941 callExecutor.execute(new OnReady()); 942 } 943 } 944 } 945 946 @VisibleForTesting 947 static final class ContextCloser implements Runnable { 948 private final Context.CancellableContext context; 949 private final Throwable cause; 950 ContextCloser(Context.CancellableContext context, Throwable cause)951 ContextCloser(Context.CancellableContext context, Throwable cause) { 952 this.context = context; 953 this.cause = cause; 954 } 955 956 @Override run()957 public void run() { 958 context.cancel(cause); 959 } 960 } 961 } 962