1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.stub; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.Preconditions; 24 import io.grpc.Metadata; 25 import io.grpc.MethodDescriptor; 26 import io.grpc.ServerCall; 27 import io.grpc.ServerCallHandler; 28 import io.grpc.Status; 29 30 /** 31 * Utility functions for adapting {@link ServerCallHandler}s to application service implementation, 32 * meant to be used by the generated code. 33 */ 34 public final class ServerCalls { 35 36 @VisibleForTesting 37 static final String TOO_MANY_REQUESTS = "Too many requests"; 38 @VisibleForTesting 39 static final String MISSING_REQUEST = "Half-closed without a request"; 40 ServerCalls()41 private ServerCalls() { 42 } 43 44 /** 45 * Creates a {@link ServerCallHandler} for a unary call method of the service. 46 * 47 * @param method an adaptor to the actual method on the service implementation. 48 */ asyncUnaryCall( UnaryMethod<ReqT, RespT> method)49 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall( 50 UnaryMethod<ReqT, RespT> method) { 51 return new UnaryServerCallHandler<>(method, false); 52 } 53 54 /** 55 * Creates a {@link ServerCallHandler} for a server streaming method of the service. 56 * 57 * @param method an adaptor to the actual method on the service implementation. 58 */ asyncServerStreamingCall( ServerStreamingMethod<ReqT, RespT> method)59 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall( 60 ServerStreamingMethod<ReqT, RespT> method) { 61 return new UnaryServerCallHandler<>(method, true); 62 } 63 64 /** 65 * Creates a {@link ServerCallHandler} for a client streaming method of the service. 66 * 67 * @param method an adaptor to the actual method on the service implementation. 68 */ asyncClientStreamingCall( ClientStreamingMethod<ReqT, RespT> method)69 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall( 70 ClientStreamingMethod<ReqT, RespT> method) { 71 return new StreamingServerCallHandler<>(method, false); 72 } 73 74 /** 75 * Creates a {@link ServerCallHandler} for a bidi streaming method of the service. 76 * 77 * @param method an adaptor to the actual method on the service implementation. 78 */ asyncBidiStreamingCall( BidiStreamingMethod<ReqT, RespT> method)79 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall( 80 BidiStreamingMethod<ReqT, RespT> method) { 81 return new StreamingServerCallHandler<>(method, true); 82 } 83 84 /** 85 * Adaptor to a unary call method. 86 */ 87 public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> { invoke(ReqT request, StreamObserver<RespT> responseObserver)88 @Override void invoke(ReqT request, StreamObserver<RespT> responseObserver); 89 } 90 91 /** 92 * Adaptor to a server streaming method. 93 */ 94 public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> { invoke(ReqT request, StreamObserver<RespT> responseObserver)95 @Override void invoke(ReqT request, StreamObserver<RespT> responseObserver); 96 } 97 98 /** 99 * Adaptor to a client streaming method. 100 */ 101 public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> { invoke(StreamObserver<RespT> responseObserver)102 @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver); 103 } 104 105 /** 106 * Adaptor to a bidirectional streaming method. 107 */ 108 public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> { invoke(StreamObserver<RespT> responseObserver)109 @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver); 110 } 111 112 private static final class UnaryServerCallHandler<ReqT, RespT> 113 implements ServerCallHandler<ReqT, RespT> { 114 115 private final UnaryRequestMethod<ReqT, RespT> method; 116 private final boolean serverStreaming; 117 118 // Non private to avoid synthetic class UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming)119 UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming) { 120 this.method = method; 121 this.serverStreaming = serverStreaming; 122 } 123 124 @Override startCall(ServerCall<ReqT, RespT> call, Metadata headers)125 public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) { 126 Preconditions.checkArgument( 127 call.getMethodDescriptor().getType().clientSendsOneMessage(), 128 "asyncUnaryRequestCall is only for clientSendsOneMessage methods"); 129 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = 130 new ServerCallStreamObserverImpl<>(call, serverStreaming); 131 // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client 132 // sends more than 1 requests, ServerCall will catch it. Note that disabling auto 133 // inbound flow control has no effect on unary calls. 134 call.request(2); 135 return new UnaryServerCallListener(responseObserver, call); 136 } 137 138 private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> { 139 private final ServerCall<ReqT, RespT> call; 140 private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver; 141 private boolean canInvoke = true; 142 private boolean wasReady; 143 private ReqT request; 144 145 // Non private to avoid synthetic class UnaryServerCallListener( ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)146 UnaryServerCallListener( 147 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, 148 ServerCall<ReqT, RespT> call) { 149 this.call = call; 150 this.responseObserver = responseObserver; 151 } 152 153 @Override onMessage(ReqT request)154 public void onMessage(ReqT request) { 155 if (this.request != null) { 156 // Safe to close the call, because the application has not yet been invoked 157 call.close( 158 Status.INTERNAL.withDescription(TOO_MANY_REQUESTS), 159 new Metadata()); 160 canInvoke = false; 161 return; 162 } 163 164 // We delay calling method.invoke() until onHalfClose() to make sure the client 165 // half-closes. 166 this.request = request; 167 } 168 169 @Override onHalfClose()170 public void onHalfClose() { 171 if (!canInvoke) { 172 return; 173 } 174 if (request == null) { 175 // Safe to close the call, because the application has not yet been invoked 176 call.close( 177 Status.INTERNAL.withDescription(MISSING_REQUEST), 178 new Metadata()); 179 return; 180 } 181 182 method.invoke(request, responseObserver); 183 request = null; 184 responseObserver.freeze(); 185 if (wasReady) { 186 // Since we are calling invoke in halfClose we have missed the onReady 187 // event from the transport so recover it here. 188 onReady(); 189 } 190 } 191 192 @Override onCancel()193 public void onCancel() { 194 if (responseObserver.onCancelHandler != null) { 195 responseObserver.onCancelHandler.run(); 196 } else { 197 // Only trigger exceptions if unable to provide notification via a callback 198 responseObserver.cancelled = true; 199 } 200 } 201 202 @Override onReady()203 public void onReady() { 204 wasReady = true; 205 if (responseObserver.onReadyHandler != null) { 206 responseObserver.onReadyHandler.run(); 207 } 208 } 209 210 @Override onComplete()211 public void onComplete() { 212 if (responseObserver.onCloseHandler != null) { 213 responseObserver.onCloseHandler.run(); 214 } 215 } 216 } 217 } 218 219 private static final class StreamingServerCallHandler<ReqT, RespT> 220 implements ServerCallHandler<ReqT, RespT> { 221 222 private final StreamingRequestMethod<ReqT, RespT> method; 223 private final boolean bidi; 224 225 // Non private to avoid synthetic class StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi)226 StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi) { 227 this.method = method; 228 this.bidi = bidi; 229 } 230 231 @Override startCall(ServerCall<ReqT, RespT> call, Metadata headers)232 public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) { 233 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = 234 new ServerCallStreamObserverImpl<>(call, bidi); 235 StreamObserver<ReqT> requestObserver = method.invoke(responseObserver); 236 responseObserver.freeze(); 237 if (responseObserver.autoRequestEnabled) { 238 call.request(1); 239 } 240 return new StreamingServerCallListener(requestObserver, responseObserver, call); 241 } 242 243 private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> { 244 245 private final StreamObserver<ReqT> requestObserver; 246 private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver; 247 private final ServerCall<ReqT, RespT> call; 248 private boolean halfClosed = false; 249 250 // Non private to avoid synthetic class StreamingServerCallListener( StreamObserver<ReqT> requestObserver, ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)251 StreamingServerCallListener( 252 StreamObserver<ReqT> requestObserver, 253 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, 254 ServerCall<ReqT, RespT> call) { 255 this.requestObserver = requestObserver; 256 this.responseObserver = responseObserver; 257 this.call = call; 258 } 259 260 @Override onMessage(ReqT request)261 public void onMessage(ReqT request) { 262 requestObserver.onNext(request); 263 264 // Request delivery of the next inbound message. 265 if (responseObserver.autoRequestEnabled) { 266 call.request(1); 267 } 268 } 269 270 @Override onHalfClose()271 public void onHalfClose() { 272 halfClosed = true; 273 requestObserver.onCompleted(); 274 } 275 276 @Override onCancel()277 public void onCancel() { 278 if (responseObserver.onCancelHandler != null) { 279 responseObserver.onCancelHandler.run(); 280 } else { 281 // Only trigger exceptions if unable to provide notification via a callback. Even though 282 // onError would provide notification to the server, we still throw an error since there 283 // isn't a guaranteed callback available. If the cancellation happened in a different 284 // order the service could be surprised to see the exception. 285 responseObserver.cancelled = true; 286 } 287 if (!halfClosed) { 288 requestObserver.onError( 289 Status.CANCELLED 290 .withDescription("client cancelled") 291 .asRuntimeException()); 292 } 293 } 294 295 @Override onReady()296 public void onReady() { 297 if (responseObserver.onReadyHandler != null) { 298 responseObserver.onReadyHandler.run(); 299 } 300 } 301 302 @Override onComplete()303 public void onComplete() { 304 if (responseObserver.onCloseHandler != null) { 305 responseObserver.onCloseHandler.run(); 306 } 307 } 308 } 309 } 310 311 private interface UnaryRequestMethod<ReqT, RespT> { 312 /** 313 * The provided {@code responseObserver} will extend {@link ServerCallStreamObserver}. 314 */ invoke(ReqT request, StreamObserver<RespT> responseObserver)315 void invoke(ReqT request, StreamObserver<RespT> responseObserver); 316 } 317 318 private interface StreamingRequestMethod<ReqT, RespT> { 319 /** 320 * The provided {@code responseObserver} will extend {@link ServerCallStreamObserver}. 321 */ invoke(StreamObserver<RespT> responseObserver)322 StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver); 323 } 324 325 private static final class ServerCallStreamObserverImpl<ReqT, RespT> 326 extends ServerCallStreamObserver<RespT> { 327 final ServerCall<ReqT, RespT> call; 328 private final boolean serverStreamingOrBidi; 329 volatile boolean cancelled; 330 private boolean frozen; 331 private boolean autoRequestEnabled = true; 332 private boolean sentHeaders; 333 private Runnable onReadyHandler; 334 private Runnable onCancelHandler; 335 private boolean aborted = false; 336 private boolean completed = false; 337 private Runnable onCloseHandler; 338 339 // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi)340 ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) { 341 this.call = call; 342 this.serverStreamingOrBidi = serverStreamingOrBidi; 343 } 344 freeze()345 private void freeze() { 346 this.frozen = true; 347 } 348 349 @Override setMessageCompression(boolean enable)350 public void setMessageCompression(boolean enable) { 351 call.setMessageCompression(enable); 352 } 353 354 @Override setCompression(String compression)355 public void setCompression(String compression) { 356 call.setCompression(compression); 357 } 358 359 @Override onNext(RespT response)360 public void onNext(RespT response) { 361 if (cancelled) { 362 if (serverStreamingOrBidi) { 363 throw Status.CANCELLED 364 .withDescription("call already cancelled. " 365 + "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception") 366 .asRuntimeException(); 367 } else { 368 // We choose not to throw for unary responses. The exception is intended to stop servers 369 // from continuing processing, but for unary responses there is no further processing 370 // so throwing an exception would not provide a benefit and would increase application 371 // complexity. 372 } 373 } 374 checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); 375 checkState(!completed, "Stream is already completed, no further calls are allowed"); 376 if (!sentHeaders) { 377 call.sendHeaders(new Metadata()); 378 sentHeaders = true; 379 } 380 call.sendMessage(response); 381 } 382 383 @Override onError(Throwable t)384 public void onError(Throwable t) { 385 Metadata metadata = Status.trailersFromThrowable(t); 386 if (metadata == null) { 387 metadata = new Metadata(); 388 } 389 call.close(Status.fromThrowable(t), metadata); 390 aborted = true; 391 } 392 393 @Override onCompleted()394 public void onCompleted() { 395 call.close(Status.OK, new Metadata()); 396 completed = true; 397 } 398 399 @Override isReady()400 public boolean isReady() { 401 return call.isReady(); 402 } 403 404 @Override setOnReadyHandler(Runnable r)405 public void setOnReadyHandler(Runnable r) { 406 checkState(!frozen, "Cannot alter onReadyHandler after initialization. May only be called " 407 + "during the initial call to the application, before the service returns its " 408 + "StreamObserver"); 409 this.onReadyHandler = r; 410 } 411 412 @Override isCancelled()413 public boolean isCancelled() { 414 return call.isCancelled(); 415 } 416 417 @Override setOnCancelHandler(Runnable onCancelHandler)418 public void setOnCancelHandler(Runnable onCancelHandler) { 419 checkState(!frozen, "Cannot alter onCancelHandler after initialization. May only be called " 420 + "during the initial call to the application, before the service returns its " 421 + "StreamObserver"); 422 this.onCancelHandler = onCancelHandler; 423 } 424 425 @Override disableAutoInboundFlowControl()426 public void disableAutoInboundFlowControl() { 427 disableAutoRequest(); 428 } 429 430 @Override disableAutoRequest()431 public void disableAutoRequest() { 432 checkState(!frozen, "Cannot disable auto flow control after initialization"); 433 autoRequestEnabled = false; 434 } 435 436 @Override request(int count)437 public void request(int count) { 438 call.request(count); 439 } 440 441 @Override setOnCloseHandler(Runnable onCloseHandler)442 public void setOnCloseHandler(Runnable onCloseHandler) { 443 checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called " 444 + "during the initial call to the application, before the service returns its " 445 + "StreamObserver"); 446 this.onCloseHandler = onCloseHandler; 447 } 448 } 449 450 /** 451 * Sets unimplemented status for method on given response stream for unary call. 452 * 453 * @param methodDescriptor of method for which error will be thrown. 454 * @param responseObserver on which error will be set. 455 */ asyncUnimplementedUnaryCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)456 public static void asyncUnimplementedUnaryCall( 457 MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) { 458 checkNotNull(methodDescriptor, "methodDescriptor"); 459 checkNotNull(responseObserver, "responseObserver"); 460 responseObserver.onError(Status.UNIMPLEMENTED 461 .withDescription(String.format("Method %s is unimplemented", 462 methodDescriptor.getFullMethodName())) 463 .asRuntimeException()); 464 } 465 466 /** 467 * Sets unimplemented status for streaming call. 468 * 469 * @param methodDescriptor of method for which error will be thrown. 470 * @param responseObserver on which error will be set. 471 */ asyncUnimplementedStreamingCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)472 public static <ReqT> StreamObserver<ReqT> asyncUnimplementedStreamingCall( 473 MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) { 474 // NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error 475 // on responseObserver and then return no-op observer. 476 asyncUnimplementedUnaryCall(methodDescriptor, responseObserver); 477 return new NoopStreamObserver<>(); 478 } 479 480 /** 481 * No-op implementation of StreamObserver. Used in abstract stubs for default implementations of 482 * methods which throws UNIMPLEMENTED error and tests. 483 */ 484 static class NoopStreamObserver<V> implements StreamObserver<V> { 485 @Override onNext(V value)486 public void onNext(V value) { 487 } 488 489 @Override onError(Throwable t)490 public void onError(Throwable t) { 491 } 492 493 @Override onCompleted()494 public void onCompleted() { 495 } 496 } 497 } 498