/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.stub;

import io.grpc.ExperimentalApi;

/**
 * A refinement of StreamObserver provided by the gRPC runtime to the application (the client or
 * the server) that allows for more complex interactions with call behavior.
 *
 * <p>In any call there are logically four {@link StreamObserver} implementations:
 * <ul>
 *   <li>'inbound', client-side - which the gRPC runtime calls when it receives messages from
 *   the server. This is implemented by the client application and passed into a service method
 *   on a stub object.
 *   </li>
 *   <li>'outbound', client-side - which the gRPC runtime provides to the client application and the
 *   client uses this {@code StreamObserver} to send messages to the server.
 *   </li>
 *   <li>'inbound', server-side - which the gRPC runtime calls when it receives messages from
 *   the client. This is implemented by the server application and returned from service
 *   implementations of client-side streaming and bidirectional streaming methods.
 *   </li>
 *   <li>'outbound', server-side - which the gRPC runtime provides to the server application and
 *   the server uses this {@code StreamObserver} to send messages (responses) to the client.
 *   </li>
 * </ul>
 *
 * <p>Implementations of this class represent the 'outbound' message streams.
 * The client-side one is {@link ClientCallStreamObserver} and the server-side one is
 * {@link ServerCallStreamObserver}.
 *
 * <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple
 * threads will be writing to an instance concurrently, the application must synchronize its calls.
 *
 * <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
 * "real" RPCs suitable for testing.
 *
 * @param <V> type of outbound message.
 */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8499")
public abstract class CallStreamObserver<V> implements StreamObserver<V> {

  /**
   * If {@code true}, indicates that the observer is capable of sending additional messages
   * without requiring excessive buffering internally. This value is only a suggestion, and the
   * application is free to ignore it; however, doing so may result in excessive buffering within
   * the observer.
   *
   * <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
   * {@code isReady()} transitions to {@code true}.
   */
  public abstract boolean isReady();

  /**
   * Sets a {@link Runnable} that will be executed every time the stream's {@link #isReady()} state
   * changes from {@code false} to {@code true}. While it is not guaranteed that the same
   * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
   * are serialized with calls to the 'inbound' {@link StreamObserver}.
   *
   * <p>On client-side this method may only be called during {@link
   * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
   * call to the application, before the service returns its {@code StreamObserver}.
   *
   * <p>Because there is a processing delay to deliver this notification, it is possible for
   * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
   * notifications by checking {@code isReady()}'s current value instead of assuming it is now
   * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
   * <em>another</em> {@code onReadyHandler} callback.
   *
   * @param onReadyHandler to call when peer is ready to receive more messages.
   */
  public abstract void setOnReadyHandler(Runnable onReadyHandler);

  /**
   * Disables automatic flow control where a token is returned to the peer after a call
   * to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. If disabled,
   * an application must make explicit calls to {@link #request} to receive messages.
   *
   * <p>On client-side this method may only be called during {@link
   * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
   * call to the application, before the service returns its {@code StreamObserver}.
   *
   * <p>Note that for cases where the runtime knows that only one inbound message is allowed,
   * calling this method will have no effect and the runtime will always permit one and only
   * one message. This is true for:
   * <ul>
   *   <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations on both the
   *   client and server.
   *   </li>
   *   <li>{@link io.grpc.MethodDescriptor.MethodType#CLIENT_STREAMING} operations on the client.
   *   </li>
   *   <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations on the server.
   *   </li>
   * </ul>
   *
   * <p>This API is being replaced, but is not yet deprecated. On the server side it is being
   * replaced with {@link ServerCallStreamObserver#disableAutoRequest}.
   * On the client side it is being replaced with {@link
   * ClientCallStreamObserver#disableAutoRequestWithInitial disableAutoRequestWithInitial(1)}.
   */
  public abstract void disableAutoInboundFlowControl();

  /**
   * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
   * {@link StreamObserver}.
   *
   * <p>This method is safe to call from multiple threads without external synchronization.
   *
   * @param count the number of additional messages to request.
   */
  public abstract void request(int count);

  /**
   * Sets message compression for subsequent calls to {@link #onNext}.
   *
   * @param enable whether to enable compression.
   */
  public abstract void setMessageCompression(boolean enable);
}