• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.manualflowcontrol;
18 
19 import io.grpc.Server;
20 import io.grpc.ServerBuilder;
21 import io.grpc.Status;
22 import io.grpc.stub.ServerCallStreamObserver;
23 import io.grpc.stub.StreamObserver;
24 
25 import java.io.IOException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.logging.Logger;
28 
29 public class ManualFlowControlServer {
30   private static final Logger logger =
31       Logger.getLogger(ManualFlowControlServer.class.getName());
32 
main(String[] args)33   public static void main(String[] args) throws InterruptedException, IOException {
34     // Service class implementation
35     StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {
36       @Override
37       public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {
38         // Set up manual flow control for the request stream. It feels backwards to configure the request
39         // stream's flow control using the response stream's observer, but this is the way it is.
40         final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
41             (ServerCallStreamObserver<HelloReply>) responseObserver;
42         serverCallStreamObserver.disableAutoRequest();
43 
44         // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
45         // when the consuming side has enough buffer space to receive more messages.
46         //
47         // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming StreamObserver's
48         // onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
49         // from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manner or
50         // else message processing throughput will suffer.
51         class OnReadyHandler implements Runnable {
52           // Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
53           // toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
54           // request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
55           // execution.
56           private boolean wasReady = false;
57 
58           @Override
59           public void run() {
60             if (serverCallStreamObserver.isReady() && !wasReady) {
61               wasReady = true;
62               logger.info("READY");
63               // Signal the request sender to send one message. This happens when isReady() turns true, signaling that
64               // the receive buffer has enough free space to receive more messages. Calling request() serves to prime
65               // the message pump.
66               serverCallStreamObserver.request(1);
67             }
68           }
69         }
70         final OnReadyHandler onReadyHandler = new OnReadyHandler();
71         serverCallStreamObserver.setOnReadyHandler(onReadyHandler);
72 
73         // Give gRPC a StreamObserver that can observe and process incoming requests.
74         return new StreamObserver<HelloRequest>() {
75           @Override
76           public void onNext(HelloRequest request) {
77             // Process the request and send a response or an error.
78             try {
79               // Accept and enqueue the request.
80               String name = request.getName();
81               logger.info("--> " + name);
82 
83               // Simulate server "work"
84               Thread.sleep(100);
85 
86               // Send a response.
87               String message = "Hello " + name;
88               logger.info("<-- " + message);
89               HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
90               responseObserver.onNext(reply);
91 
92               // Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages.
93               if (serverCallStreamObserver.isReady()) {
94                 // Signal the sender to send another request. As long as isReady() stays true, the server will keep
95                 // cycling through the loop of onNext() -> request(1)...onNext() -> request(1)... until the client runs
96                 // out of messages and ends the loop (via onCompleted()).
97                 //
98                 // If request() was called here with the argument of more than 1, the server might runs out of receive
99                 // buffer space, and isReady() will turn false. When the receive buffer has sufficiently drained,
100                 // isReady() will turn true, and the serverCallStreamObserver's onReadyHandler will be called to restart
101                 // the message pump.
102                 serverCallStreamObserver.request(1);
103               } else {
104                 // If not, note that back-pressure has begun.
105                 onReadyHandler.wasReady = false;
106               }
107             } catch (Throwable throwable) {
108               throwable.printStackTrace();
109               responseObserver.onError(
110                   Status.UNKNOWN.withDescription("Error handling request").withCause(throwable).asException());
111             }
112           }
113 
114           @Override
115           public void onError(Throwable t) {
116             // End the response stream if the client presents an error.
117             t.printStackTrace();
118             responseObserver.onCompleted();
119           }
120 
121           @Override
122           public void onCompleted() {
123             // Signal the end of work when the client ends the request stream.
124             logger.info("COMPLETED");
125             responseObserver.onCompleted();
126           }
127         };
128       }
129     };
130 
131     final Server server = ServerBuilder
132         .forPort(50051)
133         .addService(svc)
134         .build()
135         .start();
136 
137     logger.info("Listening on " + server.getPort());
138 
139     Runtime.getRuntime().addShutdownHook(new Thread() {
140       @Override
141       public void run() {
142         // Use stderr here since the logger may have been reset by its JVM shutdown hook.
143         System.err.println("Shutting down");
144         try {
145           server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
146         } catch (InterruptedException e) {
147           e.printStackTrace(System.err);
148         }
149       }
150     });
151     server.awaitTermination();
152   }
153 }
154