• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.stub;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import io.grpc.Metadata;
25 import io.grpc.MethodDescriptor;
26 import io.grpc.ServerCall;
27 import io.grpc.ServerCallHandler;
28 import io.grpc.Status;
29 
30 /**
31  * Utility functions for adapting {@link ServerCallHandler}s to application service implementation,
32  * meant to be used by the generated code.
33  */
34 public final class ServerCalls {
35 
36   @VisibleForTesting
37   static final String TOO_MANY_REQUESTS = "Too many requests";
38   @VisibleForTesting
39   static final String MISSING_REQUEST = "Half-closed without a request";
40 
ServerCalls()41   private ServerCalls() {
42   }
43 
44   /**
45    * Creates a {@link ServerCallHandler} for a unary call method of the service.
46    *
47    * @param method an adaptor to the actual method on the service implementation.
48    */
asyncUnaryCall( UnaryMethod<ReqT, RespT> method)49   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
50       UnaryMethod<ReqT, RespT> method) {
51     return new UnaryServerCallHandler<>(method, false);
52   }
53 
54   /**
55    * Creates a {@link ServerCallHandler} for a server streaming method of the service.
56    *
57    * @param method an adaptor to the actual method on the service implementation.
58    */
asyncServerStreamingCall( ServerStreamingMethod<ReqT, RespT> method)59   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
60       ServerStreamingMethod<ReqT, RespT> method) {
61     return new UnaryServerCallHandler<>(method, true);
62   }
63 
64   /**
65    * Creates a {@link ServerCallHandler} for a client streaming method of the service.
66    *
67    * @param method an adaptor to the actual method on the service implementation.
68    */
asyncClientStreamingCall( ClientStreamingMethod<ReqT, RespT> method)69   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
70       ClientStreamingMethod<ReqT, RespT> method) {
71     return new StreamingServerCallHandler<>(method, false);
72   }
73 
74   /**
75    * Creates a {@link ServerCallHandler} for a bidi streaming method of the service.
76    *
77    * @param method an adaptor to the actual method on the service implementation.
78    */
asyncBidiStreamingCall( BidiStreamingMethod<ReqT, RespT> method)79   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
80       BidiStreamingMethod<ReqT, RespT> method) {
81     return new StreamingServerCallHandler<>(method, true);
82   }
83 
84   /**
85    * Adaptor to a unary call method.
86    */
87   public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {
invoke(ReqT request, StreamObserver<RespT> responseObserver)88     @Override void invoke(ReqT request, StreamObserver<RespT> responseObserver);
89   }
90 
91   /**
92    * Adaptor to a server streaming method.
93    */
94   public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {
invoke(ReqT request, StreamObserver<RespT> responseObserver)95     @Override void invoke(ReqT request, StreamObserver<RespT> responseObserver);
96   }
97 
98   /**
99    * Adaptor to a client streaming method.
100    */
101   public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {
invoke(StreamObserver<RespT> responseObserver)102     @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
103   }
104 
105   /**
106    * Adaptor to a bidirectional streaming method.
107    */
108   public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {
invoke(StreamObserver<RespT> responseObserver)109     @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
110   }
111 
112   private static final class UnaryServerCallHandler<ReqT, RespT>
113       implements ServerCallHandler<ReqT, RespT> {
114 
115     private final UnaryRequestMethod<ReqT, RespT> method;
116     private final boolean serverStreaming;
117 
118     // Non private to avoid synthetic class
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming)119     UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming) {
120       this.method = method;
121       this.serverStreaming = serverStreaming;
122     }
123 
124     @Override
startCall(ServerCall<ReqT, RespT> call, Metadata headers)125     public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
126       Preconditions.checkArgument(
127           call.getMethodDescriptor().getType().clientSendsOneMessage(),
128           "asyncUnaryRequestCall is only for clientSendsOneMessage methods");
129       ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
130           new ServerCallStreamObserverImpl<>(call, serverStreaming);
131       // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
132       // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
133       // inbound flow control has no effect on unary calls.
134       call.request(2);
135       return new UnaryServerCallListener(responseObserver, call);
136     }
137 
138     private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> {
139       private final ServerCall<ReqT, RespT> call;
140       private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
141       private boolean canInvoke = true;
142       private boolean wasReady;
143       private ReqT request;
144 
145       // Non private to avoid synthetic class
UnaryServerCallListener( ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)146       UnaryServerCallListener(
147           ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
148           ServerCall<ReqT, RespT> call) {
149         this.call = call;
150         this.responseObserver = responseObserver;
151       }
152 
153       @Override
onMessage(ReqT request)154       public void onMessage(ReqT request) {
155         if (this.request != null) {
156           // Safe to close the call, because the application has not yet been invoked
157           call.close(
158               Status.INTERNAL.withDescription(TOO_MANY_REQUESTS),
159               new Metadata());
160           canInvoke = false;
161           return;
162         }
163 
164         // We delay calling method.invoke() until onHalfClose() to make sure the client
165         // half-closes.
166         this.request = request;
167       }
168 
169       @Override
onHalfClose()170       public void onHalfClose() {
171         if (!canInvoke) {
172           return;
173         }
174         if (request == null) {
175           // Safe to close the call, because the application has not yet been invoked
176           call.close(
177               Status.INTERNAL.withDescription(MISSING_REQUEST),
178               new Metadata());
179           return;
180         }
181 
182         method.invoke(request, responseObserver);
183         request = null;
184         responseObserver.freeze();
185         if (wasReady) {
186           // Since we are calling invoke in halfClose we have missed the onReady
187           // event from the transport so recover it here.
188           onReady();
189         }
190       }
191 
192       @Override
onCancel()193       public void onCancel() {
194         if (responseObserver.onCancelHandler != null) {
195           responseObserver.onCancelHandler.run();
196         } else {
197           // Only trigger exceptions if unable to provide notification via a callback
198           responseObserver.cancelled = true;
199         }
200       }
201 
202       @Override
onReady()203       public void onReady() {
204         wasReady = true;
205         if (responseObserver.onReadyHandler != null) {
206           responseObserver.onReadyHandler.run();
207         }
208       }
209 
210       @Override
onComplete()211       public void onComplete() {
212         if (responseObserver.onCloseHandler != null) {
213           responseObserver.onCloseHandler.run();
214         }
215       }
216     }
217   }
218 
219   private static final class StreamingServerCallHandler<ReqT, RespT>
220       implements ServerCallHandler<ReqT, RespT> {
221 
222     private final StreamingRequestMethod<ReqT, RespT> method;
223     private final boolean bidi;
224 
225     // Non private to avoid synthetic class
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi)226     StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi) {
227       this.method = method;
228       this.bidi = bidi;
229     }
230 
231     @Override
startCall(ServerCall<ReqT, RespT> call, Metadata headers)232     public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
233       ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
234           new ServerCallStreamObserverImpl<>(call, bidi);
235       StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
236       responseObserver.freeze();
237       if (responseObserver.autoRequestEnabled) {
238         call.request(1);
239       }
240       return new StreamingServerCallListener(requestObserver, responseObserver, call);
241     }
242 
243     private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> {
244 
245       private final StreamObserver<ReqT> requestObserver;
246       private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
247       private final ServerCall<ReqT, RespT> call;
248       private boolean halfClosed = false;
249 
250       // Non private to avoid synthetic class
StreamingServerCallListener( StreamObserver<ReqT> requestObserver, ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)251       StreamingServerCallListener(
252           StreamObserver<ReqT> requestObserver,
253           ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
254           ServerCall<ReqT, RespT> call) {
255         this.requestObserver = requestObserver;
256         this.responseObserver = responseObserver;
257         this.call = call;
258       }
259 
260       @Override
onMessage(ReqT request)261       public void onMessage(ReqT request) {
262         requestObserver.onNext(request);
263 
264         // Request delivery of the next inbound message.
265         if (responseObserver.autoRequestEnabled) {
266           call.request(1);
267         }
268       }
269 
270       @Override
onHalfClose()271       public void onHalfClose() {
272         halfClosed = true;
273         requestObserver.onCompleted();
274       }
275 
276       @Override
onCancel()277       public void onCancel() {
278         if (responseObserver.onCancelHandler != null) {
279           responseObserver.onCancelHandler.run();
280         } else {
281           // Only trigger exceptions if unable to provide notification via a callback. Even though
282           // onError would provide notification to the server, we still throw an error since there
283           // isn't a guaranteed callback available. If the cancellation happened in a different
284           // order the service could be surprised to see the exception.
285           responseObserver.cancelled = true;
286         }
287         if (!halfClosed) {
288           requestObserver.onError(
289               Status.CANCELLED
290                   .withDescription("client cancelled")
291                   .asRuntimeException());
292         }
293       }
294 
295       @Override
onReady()296       public void onReady() {
297         if (responseObserver.onReadyHandler != null) {
298           responseObserver.onReadyHandler.run();
299         }
300       }
301 
302       @Override
onComplete()303       public void onComplete() {
304         if (responseObserver.onCloseHandler != null) {
305           responseObserver.onCloseHandler.run();
306         }
307       }
308     }
309   }
310 
311   private interface UnaryRequestMethod<ReqT, RespT> {
312     /**
313      * The provided {@code responseObserver} will extend {@link ServerCallStreamObserver}.
314      */
invoke(ReqT request, StreamObserver<RespT> responseObserver)315     void invoke(ReqT request, StreamObserver<RespT> responseObserver);
316   }
317 
318   private interface StreamingRequestMethod<ReqT, RespT> {
319     /**
320      * The provided {@code responseObserver} will extend {@link ServerCallStreamObserver}.
321      */
invoke(StreamObserver<RespT> responseObserver)322     StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
323   }
324 
325   private static final class ServerCallStreamObserverImpl<ReqT, RespT>
326       extends ServerCallStreamObserver<RespT> {
327     final ServerCall<ReqT, RespT> call;
328     private final boolean serverStreamingOrBidi;
329     volatile boolean cancelled;
330     private boolean frozen;
331     private boolean autoRequestEnabled = true;
332     private boolean sentHeaders;
333     private Runnable onReadyHandler;
334     private Runnable onCancelHandler;
335     private boolean aborted = false;
336     private boolean completed = false;
337     private Runnable onCloseHandler;
338 
339     // Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi)340     ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
341       this.call = call;
342       this.serverStreamingOrBidi = serverStreamingOrBidi;
343     }
344 
freeze()345     private void freeze() {
346       this.frozen = true;
347     }
348 
349     @Override
setMessageCompression(boolean enable)350     public void setMessageCompression(boolean enable) {
351       call.setMessageCompression(enable);
352     }
353 
354     @Override
setCompression(String compression)355     public void setCompression(String compression) {
356       call.setCompression(compression);
357     }
358 
359     @Override
onNext(RespT response)360     public void onNext(RespT response) {
361       if (cancelled) {
362         if (serverStreamingOrBidi) {
363           throw Status.CANCELLED
364               .withDescription("call already cancelled. "
365                   + "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception")
366               .asRuntimeException();
367         } else {
368           // We choose not to throw for unary responses. The exception is intended to stop servers
369           // from continuing processing, but for unary responses there is no further processing
370           // so throwing an exception would not provide a benefit and would increase application
371           // complexity.
372         }
373       }
374       checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
375       checkState(!completed, "Stream is already completed, no further calls are allowed");
376       if (!sentHeaders) {
377         call.sendHeaders(new Metadata());
378         sentHeaders = true;
379       }
380       call.sendMessage(response);
381     }
382 
383     @Override
onError(Throwable t)384     public void onError(Throwable t) {
385       Metadata metadata = Status.trailersFromThrowable(t);
386       if (metadata == null) {
387         metadata = new Metadata();
388       }
389       call.close(Status.fromThrowable(t), metadata);
390       aborted = true;
391     }
392 
393     @Override
onCompleted()394     public void onCompleted() {
395       call.close(Status.OK, new Metadata());
396       completed = true;
397     }
398 
399     @Override
isReady()400     public boolean isReady() {
401       return call.isReady();
402     }
403 
404     @Override
setOnReadyHandler(Runnable r)405     public void setOnReadyHandler(Runnable r) {
406       checkState(!frozen, "Cannot alter onReadyHandler after initialization. May only be called "
407           + "during the initial call to the application, before the service returns its "
408           + "StreamObserver");
409       this.onReadyHandler = r;
410     }
411 
412     @Override
isCancelled()413     public boolean isCancelled() {
414       return call.isCancelled();
415     }
416 
417     @Override
setOnCancelHandler(Runnable onCancelHandler)418     public void setOnCancelHandler(Runnable onCancelHandler) {
419       checkState(!frozen, "Cannot alter onCancelHandler after initialization. May only be called "
420           + "during the initial call to the application, before the service returns its "
421           + "StreamObserver");
422       this.onCancelHandler = onCancelHandler;
423     }
424 
425     @Override
disableAutoInboundFlowControl()426     public void disableAutoInboundFlowControl() {
427       disableAutoRequest();
428     }
429 
430     @Override
disableAutoRequest()431     public void disableAutoRequest() {
432       checkState(!frozen, "Cannot disable auto flow control after initialization");
433       autoRequestEnabled = false;
434     }
435 
436     @Override
request(int count)437     public void request(int count) {
438       call.request(count);
439     }
440 
441     @Override
setOnCloseHandler(Runnable onCloseHandler)442     public void setOnCloseHandler(Runnable onCloseHandler) {
443       checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
444           + "during the initial call to the application, before the service returns its "
445           + "StreamObserver");
446       this.onCloseHandler = onCloseHandler;
447     }
448   }
449 
450   /**
451    * Sets unimplemented status for method on given response stream for unary call.
452    *
453    * @param methodDescriptor of method for which error will be thrown.
454    * @param responseObserver on which error will be set.
455    */
asyncUnimplementedUnaryCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)456   public static void asyncUnimplementedUnaryCall(
457       MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
458     checkNotNull(methodDescriptor, "methodDescriptor");
459     checkNotNull(responseObserver, "responseObserver");
460     responseObserver.onError(Status.UNIMPLEMENTED
461         .withDescription(String.format("Method %s is unimplemented",
462             methodDescriptor.getFullMethodName()))
463         .asRuntimeException());
464   }
465 
466   /**
467    * Sets unimplemented status for streaming call.
468    *
469    * @param methodDescriptor of method for which error will be thrown.
470    * @param responseObserver on which error will be set.
471    */
asyncUnimplementedStreamingCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)472   public static <ReqT> StreamObserver<ReqT> asyncUnimplementedStreamingCall(
473       MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
474     // NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
475     // on responseObserver and then return no-op observer.
476     asyncUnimplementedUnaryCall(methodDescriptor, responseObserver);
477     return new NoopStreamObserver<>();
478   }
479 
480   /**
481    * No-op implementation of StreamObserver. Used in abstract stubs for default implementations of
482    * methods which throws UNIMPLEMENTED error and tests.
483    */
484   static class NoopStreamObserver<V> implements StreamObserver<V> {
485     @Override
onNext(V value)486     public void onNext(V value) {
487     }
488 
489     @Override
onError(Throwable t)490     public void onError(Throwable t) {
491     }
492 
493     @Override
onCompleted()494     public void onCompleted() {
495     }
496   }
497 }
498