1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.stub; 18 19 import io.grpc.ExperimentalApi; 20 21 /** 22 * A refinement of {@link CallStreamObserver} to allows for interaction with call 23 * cancellation events on the server side. An instance of this class is obtained by casting the 24 * {@code StreamObserver} passed as an argument to service implementations. 25 * 26 * <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple 27 * threads will be writing to an instance concurrently, the application must synchronize its calls. 28 * 29 * <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create 30 * "real" RPCs suitable for testing and interact with the server using a normal client stub. 31 */ 32 public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver<RespT> { 33 34 /** 35 * Returns {@code true} when the call is cancelled and the server is encouraged to abort 36 * processing to save resources, since the client will not be processing any further methods. 37 * Cancellations can be caused by timeouts, explicit cancellation by client, network errors, and 38 * similar. 39 * 40 * <p>This method may safely be called concurrently from multiple threads. 41 */ isCancelled()42 public abstract boolean isCancelled(); 43 44 /** 45 * Sets a {@link Runnable} to be called if the call is cancelled and the server is encouraged to 46 * abort processing to save resources, since the client will not process any further messages. 47 * Cancellations can be caused by timeouts, explicit cancellation by the client, network errors, 48 * etc. 49 * 50 * <p>It is guaranteed that execution of the {@link Runnable} is serialized with calls to the 51 * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other 52 * callbacks are running; if one of those other callbacks runs for a significant amount of time 53 * it can poll {@link #isCancelled()}, which is not delayed. 54 * 55 * <p>This method may only be called during the initial call to the application, before the 56 * service returns its {@code StreamObserver}. 57 * 58 * <p>Setting the onCancelHandler will suppress the on-cancel exception thrown by 59 * {@link #onNext}. If the caller is already handling cancellation via polling or cannot 60 * substantially benefit from observing cancellation, using a no-op {@code onCancelHandler} is 61 * useful just to suppress the {@code onNext()} exception. 62 * 63 * @param onCancelHandler to call when client has cancelled the call. 64 */ setOnCancelHandler(Runnable onCancelHandler)65 public abstract void setOnCancelHandler(Runnable onCancelHandler); 66 67 /** 68 * Sets the compression algorithm to use for the call. May only be called before sending any 69 * messages. Default gRPC servers support the "gzip" compressor. 70 * 71 * <p>It is safe to call this even if the client does not support the compression format chosen. 72 * The implementation will handle negotiation with the client and may fall back to no compression. 73 * 74 * @param compression the compression algorithm to use. 75 * @throws IllegalArgumentException if the compressor name can not be found. 76 */ setCompression(String compression)77 public abstract void setCompression(String compression); 78 79 /** 80 * Swaps to manual flow control where no message will be delivered to {@link 81 * StreamObserver#onNext(Object)} unless it is {@link #request request()}ed. 82 * 83 * <p>It may only be called during the initial call to the application, before the service returns 84 * its {@code StreamObserver}. 85 * 86 * <p>Note that for cases where the message is received before the service handler is invoked, 87 * this method will have no effect. This is true for: 88 * 89 * <ul> 90 * <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li> 91 * <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li> 92 * </ul> 93 * </p> 94 */ disableAutoRequest()95 public void disableAutoRequest() { 96 throw new UnsupportedOperationException(); 97 } 98 99 100 /** 101 * If {@code true}, indicates that the observer is capable of sending additional messages 102 * without requiring excessive buffering internally. This value is just a suggestion and the 103 * application is free to ignore it, however doing so may result in excessive buffering within the 104 * observer. 105 * 106 * <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after 107 * {@code isReady()} transitions to {@code true}. 108 */ 109 @Override isReady()110 public abstract boolean isReady(); 111 112 /** 113 * Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state 114 * changes from {@code false} to {@code true}. While it is not guaranteed that the same 115 * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions 116 * are serialized with calls to the 'inbound' {@link StreamObserver}. 117 * 118 * <p>May only be called during the initial call to the application, before the service returns 119 * its {@code StreamObserver}. 120 * 121 * <p>Because there is a processing delay to deliver this notification, it is possible for 122 * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious" 123 * notifications by checking {@code isReady()}'s current value instead of assuming it is now 124 * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be 125 * <em>another</em> {@code onReadyHandler} callback. 126 * 127 * @param onReadyHandler to call when peer is ready to receive more messages. 128 */ 129 @Override setOnReadyHandler(Runnable onReadyHandler)130 public abstract void setOnReadyHandler(Runnable onReadyHandler); 131 132 /** 133 * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' 134 * {@link StreamObserver}. 135 * 136 * <p>This method is safe to call from multiple threads without external synchronization. 137 * 138 * @param count more messages 139 */ 140 @Override request(int count)141 public abstract void request(int count); 142 143 /** 144 * Sets message compression for subsequent calls to {@link #onNext}. 145 * 146 * @param enable whether to enable compression. 147 */ 148 @Override setMessageCompression(boolean enable)149 public abstract void setMessageCompression(boolean enable); 150 151 /** 152 * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's 153 * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called, 154 * all the messages and trailing metadata have been sent and the stream has been closed. Note 155 * however that the client still may have not received all the messages due to network delay, 156 * client crashes, and cancellation races. 157 * 158 * <p>Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called 159 * when the RPC terminates.</p> 160 * 161 * <p>It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to 162 * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if 163 * other callbacks are running.</p> 164 * 165 * <p>This method may only be called during the initial call to the application, before the 166 * service returns its {@link StreamObserver request observer}.</p> 167 * 168 * @param onCloseHandler to execute when the call has been closed cleanly. 169 */ 170 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467") setOnCloseHandler(Runnable onCloseHandler)171 public void setOnCloseHandler(Runnable onCloseHandler) { 172 throw new UnsupportedOperationException(); 173 } 174 } 175