• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2017 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.examples.manualflowcontrol;
18 
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
28 
29 public class ManualFlowControlServer {
30   private static final Logger logger =
31       Logger.getLogger(ManualFlowControlServer.class.getName());
32 
main(String[] args)33   public static void main(String[] args) throws InterruptedException, IOException {
34     // Service class implementation
35     StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {
36       @Override
37       public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {
38         // Set up manual flow control for the request stream. It feels backwards to configure the request
39         // stream's flow control using the response stream's observer, but this is the way it is.
40         final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
41             (ServerCallStreamObserver<HelloReply>) responseObserver;
42         serverCallStreamObserver.disableAutoInboundFlowControl();
43 
44         // Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
45         // toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
46         // request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
47         // execution.
48         final AtomicBoolean wasReady = new AtomicBoolean(false);
49 
50         // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
51         // when the consuming side has enough buffer space to receive more messages.
52         //
53         // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming StreamObserver's
54         // onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
55         // from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manor or else
56         // message processing throughput will suffer.
57         serverCallStreamObserver.setOnReadyHandler(new Runnable() {
58           public void run() {
59             if (serverCallStreamObserver.isReady() && wasReady.compareAndSet(false, true)) {
60               logger.info("READY");
61               // Signal the request sender to send one message. This happens when isReady() turns true, signaling that
62               // the receive buffer has enough free space to receive more messages. Calling request() serves to prime
63               // the message pump.
64               serverCallStreamObserver.request(1);
65             }
66           }
67         });
68 
69         // Give gRPC a StreamObserver that can observe and process incoming requests.
70         return new StreamObserver<HelloRequest>() {
71           @Override
72           public void onNext(HelloRequest request) {
73             // Process the request and send a response or an error.
74             try {
75               // Accept and enqueue the request.
76               String name = request.getName();
77               logger.info("--> " + name);
78 
79               // Simulate server "work"
80               Thread.sleep(100);
81 
82               // Send a response.
83               String message = "Hello " + name;
84               logger.info("<-- " + message);
85               HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
86               responseObserver.onNext(reply);
87 
88               // Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages.
89               if (serverCallStreamObserver.isReady()) {
90                 // Signal the sender to send another request. As long as isReady() stays true, the server will keep
91                 // cycling through the loop of onNext() -> request()...onNext() -> request()... until either the client
92                 // runs out of messages and ends the loop or the server runs out of receive buffer space.
93                 //
94                 // If the server runs out of buffer space, isReady() will turn false. When the receive buffer has
95                 // sufficiently drained, isReady() will turn true, and the serverCallStreamObserver's onReadyHandler
96                 // will be called to restart the message pump.
97                 serverCallStreamObserver.request(1);
98               } else {
99                 // If not, note that back-pressure has begun.
100                 wasReady.set(false);
101               }
102             } catch (Throwable throwable) {
103               throwable.printStackTrace();
104               responseObserver.onError(
105                   Status.UNKNOWN.withDescription("Error handling request").withCause(throwable).asException());
106             }
107           }
108 
109           @Override
110           public void onError(Throwable t) {
111             // End the response stream if the client presents an error.
112             t.printStackTrace();
113             responseObserver.onCompleted();
114           }
115 
116           @Override
117           public void onCompleted() {
118             // Signal the end of work when the client ends the request stream.
119             logger.info("COMPLETED");
120             responseObserver.onCompleted();
121           }
122         };
123       }
124     };
125 
126     final Server server = ServerBuilder
127         .forPort(50051)
128         .addService(svc)
129         .build()
130         .start();
131 
132     logger.info("Listening on " + server.getPort());
133 
134     Runtime.getRuntime().addShutdownHook(new Thread() {
135       @Override
136       public void run() {
137         logger.info("Shutting down");
138         server.shutdown();
139       }
140     });
141     server.awaitTermination();
142   }
143 }
144