/*
 * Copyright 2017 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.examples.manualflowcontrol;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * Example server that applies manual inbound flow control to the {@code sayHelloStreaming} RPC: automatic
 * inbound flow control is disabled and request messages are pulled one at a time with {@code request(1)},
 * only while the response stream reports that it is ready.
 */
public class ManualFlowControlServer {
  private static final Logger logger =
      Logger.getLogger(ManualFlowControlServer.class.getName());

  /**
   * Starts the server on port 50051, registers the streaming greeter service, and blocks until the server
   * terminates.
   *
   * @param args command-line arguments (unused)
   * @throws InterruptedException if the calling thread is interrupted while waiting for server termination
   * @throws IOException if the server fails to start
   */
  public static void main(String[] args) throws InterruptedException, IOException {
    // Service class implementation
    StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {
      @Override
      public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {
        // Set up manual flow control for the request stream. It feels backwards to configure the request
        // stream's flow control using the response stream's observer, but this is the way it is.
        final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
            (ServerCallStreamObserver<HelloReply>) responseObserver;
        serverCallStreamObserver.disableAutoInboundFlowControl();

        // Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
        // toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
        // request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
        // execution.
        final AtomicBoolean wasReady = new AtomicBoolean(false);

        // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
        // when the consuming side has enough buffer space to receive more messages.
        //
        // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming StreamObserver's
        // onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
        // from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manner or
        // else message processing throughput will suffer.
        serverCallStreamObserver.setOnReadyHandler(new Runnable() {
          @Override
          public void run() {
            if (serverCallStreamObserver.isReady() && wasReady.compareAndSet(false, true)) {
              logger.info("READY");
              // Signal the request sender to send one message. This happens when isReady() turns true, signaling that
              // the receive buffer has enough free space to receive more messages. Calling request() serves to prime
              // the message pump.
              serverCallStreamObserver.request(1);
            }
          }
        });

        // Give gRPC a StreamObserver that can observe and process incoming requests.
        return new StreamObserver<HelloRequest>() {
          @Override
          public void onNext(HelloRequest request) {
            // Process the request and send a response or an error.
            try {
              // Accept and enqueue the request.
              String name = request.getName();
              logger.info("--> " + name);

              // Simulate server "work"
              Thread.sleep(100);

              // Send a response.
              String message = "Hello " + name;
              logger.info("<-- " + message);
              HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
              responseObserver.onNext(reply);

              // Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages.
              if (serverCallStreamObserver.isReady()) {
                // Signal the sender to send another request. As long as isReady() stays true, the server will keep
                // cycling through the loop of onNext() -> request()...onNext() -> request()... until either the client
                // runs out of messages and ends the loop or the server runs out of receive buffer space.
                //
                // If the server runs out of buffer space, isReady() will turn false. When the receive buffer has
                // sufficiently drained, isReady() will turn true, and the serverCallStreamObserver's onReadyHandler
                // will be called to restart the message pump.
                serverCallStreamObserver.request(1);
              } else {
                // If not, note that back-pressure has begun.
                wasReady.set(false);
              }
            } catch (Throwable throwable) {
              if (throwable instanceof InterruptedException) {
                // Thread.sleep() clears the interrupt status when it throws. Restore it so that code further up
                // the stack can still observe the interruption.
                Thread.currentThread().interrupt();
              }
              throwable.printStackTrace();
              responseObserver.onError(
                  Status.UNKNOWN.withDescription("Error handling request").withCause(throwable).asException());
            }
          }

          @Override
          public void onError(Throwable t) {
            // End the response stream if the client presents an error.
            t.printStackTrace();
            responseObserver.onCompleted();
          }

          @Override
          public void onCompleted() {
            // Signal the end of work when the client ends the request stream.
            logger.info("COMPLETED");
            responseObserver.onCompleted();
          }
        };
      }
    };

    final Server server = ServerBuilder
        .forPort(50051)
        .addService(svc)
        .build()
        .start();

    logger.info("Listening on " + server.getPort());

    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // Use stderr here: java.util.logging may already have been reset by its own JVM shutdown hook, in which
        // case a logger call would print nothing.
        System.err.println("Shutting down");
        try {
          // shutdown() only initiates an orderly shutdown. Wait for in-flight calls to finish, otherwise the JVM
          // exits as soon as this hook returns.
          server.shutdown().awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          e.printStackTrace(System.err);
        }
      }
    });
    server.awaitTermination();
  }
}