• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.stub;
18 
19 import io.grpc.ExperimentalApi;
20 
21 /**
22  * A refinement of {@link CallStreamObserver} to allows for interaction with call
23  * cancellation events on the server side. An instance of this class is obtained by casting the
24  * {@code StreamObserver} passed as an argument to service implementations.
25  *
26  * <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple
27  * threads will be writing to an instance concurrently, the application must synchronize its calls.
28  *
29  * <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
30  * "real" RPCs suitable for testing and interact with the server using a normal client stub.
31  */
32 public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver<RespT> {
33 
34   /**
35    * Returns {@code true} when the call is cancelled and the server is encouraged to abort
36    * processing to save resources, since the client will not be processing any further methods.
37    * Cancellations can be caused by timeouts, explicit cancellation by client, network errors, and
38    * similar.
39    *
40    * <p>This method may safely be called concurrently from multiple threads.
41    */
isCancelled()42   public abstract boolean isCancelled();
43 
44   /**
45    * Sets a {@link Runnable} to be called if the call is cancelled and the server is encouraged to
46    * abort processing to save resources, since the client will not process any further messages.
47    * Cancellations can be caused by timeouts, explicit cancellation by the client, network errors,
48    * etc.
49    *
50    * <p>It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
51    * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
52    * callbacks are running; if one of those other callbacks runs for a significant amount of time
53    * it can poll {@link #isCancelled()}, which is not delayed.
54    *
55    * <p>This method may only be called during the initial call to the application, before the
56    * service returns its {@code StreamObserver}.
57    *
58    * <p>Setting the onCancelHandler will suppress the on-cancel exception thrown by
59    * {@link #onNext}. If the caller is already handling cancellation via polling or cannot
60    * substantially benefit from observing cancellation, using a no-op {@code onCancelHandler} is
61    * useful just to suppress the {@code onNext()} exception.
62    *
63    * @param onCancelHandler to call when client has cancelled the call.
64    */
setOnCancelHandler(Runnable onCancelHandler)65   public abstract void setOnCancelHandler(Runnable onCancelHandler);
66 
67   /**
68    * Sets the compression algorithm to use for the call. May only be called before sending any
69    * messages. Default gRPC servers support the "gzip" compressor.
70    *
71    * <p>It is safe to call this even if the client does not support the compression format chosen.
72    * The implementation will handle negotiation with the client and may fall back to no compression.
73    *
74    * @param compression the compression algorithm to use.
75    * @throws IllegalArgumentException if the compressor name can not be found.
76    */
setCompression(String compression)77   public abstract void setCompression(String compression);
78 
79   /**
80    * Swaps to manual flow control where no message will be delivered to {@link
81    * StreamObserver#onNext(Object)} unless it is {@link #request request()}ed.
82    *
83    * <p>It may only be called during the initial call to the application, before the service returns
84    * its {@code StreamObserver}.
85    *
86    * <p>Note that for cases where the message is received before the service handler is invoked,
87    * this method will have no effect. This is true for:
88    *
89    * <ul>
90    *   <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li>
91    *   <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
92    * </ul>
93    * </p>
94    */
disableAutoRequest()95   public void disableAutoRequest() {
96     throw new UnsupportedOperationException();
97   }
98 
99 
100   /**
101    * If {@code true}, indicates that the observer is capable of sending additional messages
102    * without requiring excessive buffering internally. This value is just a suggestion and the
103    * application is free to ignore it, however doing so may result in excessive buffering within the
104    * observer.
105    *
106    * <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
107    * {@code isReady()} transitions to {@code true}.
108    */
109   @Override
isReady()110   public abstract boolean isReady();
111 
112   /**
113    * Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
114    * changes from {@code false} to {@code true}.  While it is not guaranteed that the same
115    * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
116    * are serialized with calls to the 'inbound' {@link StreamObserver}.
117    *
118    * <p>May only be called during the initial call to the application, before the service returns
119    * its {@code StreamObserver}.
120    *
121    * <p>Because there is a processing delay to deliver this notification, it is possible for
122    * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
123    * notifications by checking {@code isReady()}'s current value instead of assuming it is now
124    * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
125    * <em>another</em> {@code onReadyHandler} callback.
126    *
127    * @param onReadyHandler to call when peer is ready to receive more messages.
128    */
129   @Override
setOnReadyHandler(Runnable onReadyHandler)130   public abstract void setOnReadyHandler(Runnable onReadyHandler);
131 
132   /**
133    * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
134    * {@link StreamObserver}.
135    *
136    * <p>This method is safe to call from multiple threads without external synchronization.
137    *
138    * @param count more messages
139    */
140   @Override
request(int count)141   public abstract void request(int count);
142 
143   /**
144    * Sets message compression for subsequent calls to {@link #onNext}.
145    *
146    * @param enable whether to enable compression.
147    */
148   @Override
setMessageCompression(boolean enable)149   public abstract void setMessageCompression(boolean enable);
150 
151   /**
152    * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
153    * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
154    * all the messages and trailing metadata have been sent and the stream has been closed. Note
155    * however that the client still may have not received all the messages due to network delay,
156    * client crashes, and cancellation races.
157    *
158    * <p>Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
159    * when the RPC terminates.</p>
160    *
161    * <p>It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
162    * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
163    * other callbacks are running.</p>
164    *
165    * <p>This method may only be called during the initial call to the application, before the
166    * service returns its {@link StreamObserver request observer}.</p>
167    *
168    * @param onCloseHandler to execute when the call has been closed cleanly.
169    */
170   @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
setOnCloseHandler(Runnable onCloseHandler)171   public void setOnCloseHandler(Runnable onCloseHandler) {
172     throw new UnsupportedOperationException();
173   }
174 }
175