• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2023 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.cancellation;
18 
19 import com.google.common.util.concurrent.MoreExecutors;
20 import io.grpc.Context;
21 import io.grpc.Grpc;
22 import io.grpc.InsecureServerCredentials;
23 import io.grpc.Server;
24 import io.grpc.Status;
25 import io.grpc.examples.echo.EchoGrpc;
26 import io.grpc.examples.echo.EchoRequest;
27 import io.grpc.examples.echo.EchoResponse;
28 import io.grpc.stub.ServerCallStreamObserver;
29 import io.grpc.stub.StreamObserver;
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.FutureTask;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Logger;
39 
40 /**
41  * Server that manages startup/shutdown of a {@code Greeter} server.
42  *
43  * <p>Any abort of an ongoing RPC is considered "cancellation" of that RPC. The common causes of
44  * cancellation are the client explicitly cancelling, the deadline expires, and I/O failures. The
45  * service is not informed the reason for the cancellation.
46  *
47  * <p>There are two APIs for services to be notified of RPC cancellation: io.grpc.Context and
48  * ServerCallStreamObserver. Context listeners are called on a different thread, so need to be
49  * thread-safe. The ServerCallStreamObserver cancellation callback is called like other
50  * StreamObserver callbacks, so the application may not need thread-safe handling. Both APIs have
51  * thread-safe isCancelled() polling methods.
52  */
53 public class CancellationServer {
main(String[] args)54   public static void main(String[] args) throws IOException, InterruptedException {
55     ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
56     int port = 50051;
57     Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
58         .addService(new SlowEcho(scheduler))
59         .build()
60         .start();
61     System.out.println("Server started, listening on " + port);
62     Runtime.getRuntime().addShutdownHook(new Thread() {
63       @Override
64       public void run() {
65         try {
66           server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
67         } catch (InterruptedException e) {
68           e.printStackTrace(System.err);
69         }
70       }
71     });
72     server.awaitTermination();
73     scheduler.shutdown();
74     if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
75       scheduler.shutdownNow();
76     }
77   }
78 
79   static class SlowEcho extends EchoGrpc.EchoImplBase {
80     private final ScheduledExecutorService scheduler;
81 
82     /** {@code scheduler} must be single-threaded. */
SlowEcho(ScheduledExecutorService scheduler)83     public SlowEcho(ScheduledExecutorService scheduler) {
84       this.scheduler = scheduler;
85     }
86 
87     /**
88      * Repeatedly echos each request until the client has no more requests. It performs all work
89      * asynchronously on a single thread. It uses ServerCallStreamObserver to be notified of RPC
90      * cancellation.
91      */
92     @Override
bidirectionalStreamingEcho( StreamObserver<EchoResponse> responseObserver)93     public StreamObserver<EchoRequest> bidirectionalStreamingEcho(
94         StreamObserver<EchoResponse> responseObserver) {
95       // If the service is truly asynchronous, using ServerCallStreamObserver to receive
96       // cancellation notifications tends to work well.
97 
98       // It is safe to cast the provided observer to ServerCallStreamObserver.
99       ServerCallStreamObserver<EchoResponse> responseCallObserver =
100           (ServerCallStreamObserver<EchoResponse>) responseObserver;
101       System.out.println("\nBidi RPC started");
102       class EchoObserver implements StreamObserver<EchoRequest> {
103         private static final int delayMs = 200;
104         private final List<Future<?>> echos = new ArrayList<>();
105 
106         @Override
107         public void onNext(EchoRequest request) {
108           System.out.println("Bidi RPC received request: " + request.getMessage());
109           EchoResponse response
110               = EchoResponse.newBuilder().setMessage(request.getMessage()).build();
111           Runnable echo = () -> responseObserver.onNext(response);
112           echos.add(scheduler.scheduleAtFixedRate(echo, delayMs, delayMs, TimeUnit.MILLISECONDS));
113         }
114 
115         @Override
116         public void onCompleted() {
117           System.out.println("Bidi RPC client finished");
118           // Let each echo happen two more times, and then stop.
119           List<Future<?>> echosCopy = new ArrayList<>(echos);
120           Runnable complete = () -> {
121             stopEchos(echosCopy);
122             responseObserver.onCompleted();
123             System.out.println("Bidi RPC completed");
124           };
125           echos.add(scheduler.schedule(complete, 2*delayMs, TimeUnit.MILLISECONDS));
126         }
127 
128         @Override
129         public void onError(Throwable t) {
130           System.out.println("Bidi RPC failed: " + Status.fromThrowable(t));
131           stopEchos(echos);
132           scheduler.execute(() -> responseObserver.onError(t));
133         }
134 
135         public void onCancel() {
136           // If onCompleted() hasn't been called by this point, then this method and onError are
137           // both called. If onCompleted() has been called, then just this method is called.
138           System.out.println("Bidi RPC cancelled");
139           stopEchos(echos);
140         }
141 
142         private void stopEchos(List<Future<?>> echos) {
143           for (Future<?> echo : echos) {
144             echo.cancel(false);
145           }
146         }
147       }
148 
149       EchoObserver requestObserver = new EchoObserver();
150       // onCancel() can be called even after the service completes or fails the RPC, because
151       // callbacks are racy and the response still has to be sent to the client. Use
152       // setOnCloseHandler() to be notified when the RPC completed without cancellation (as best as
153       // the server is able to tell).
154       responseCallObserver.setOnCancelHandler(requestObserver::onCancel);
155       return requestObserver;
156     }
157 
158     /**
159      * Echos the request after a delay. It processes the request in-line within the callback. It
160      * uses Context to be notified of RPC cancellation.
161      */
162     @Override
unaryEcho(EchoRequest request, StreamObserver<EchoResponse> responseObserver)163     public void unaryEcho(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
164       // ServerCallStreamObserver.setOnCancelHandler(Runnable) is not useful for this method, since
165       // this method only returns once it has a result. ServerCallStreamObserver guarantees the
166       // Runnable is not run at the same time as other RPC callback methods (including this method),
167       // so the cancellation notification would be guaranteed to occur too late.
168       System.out.println("\nUnary RPC started: " + request.getMessage());
169       Context currentContext = Context.current();
170       // Let's start a multi-part operation. We can check cancellation periodically.
171       for (int i = 0; i < 10; i++) {
172         // ServerCallStreamObserver.isCancelled() returns true only if the RPC is cancelled.
173         // Context.isCancelled() is similar, but also returns true when the RPC completes normally.
174         // It doesn't matter which API is used here.
175         if (currentContext.isCancelled()) {
176           System.out.println("Unary RPC cancelled");
177           responseObserver.onError(
178               Status.CANCELLED.withDescription("RPC cancelled").asRuntimeException());
179           return;
180         }
181 
182         FutureTask<Void> task = new FutureTask<>(() -> {
183           Thread.sleep(100); // Do some work
184           return null;
185         });
186         // Some Java blocking APIs have a method to cancel an ongoing operation, like closing an
187         // InputStream or interrupting the thread. We can use a Context listener to call that API
188         // from another thread if the RPC is cancelled.
189         Context.CancellationListener listener = (Context context) -> task.cancel(true);
190         Context.current().addListener(listener, MoreExecutors.directExecutor());
191         task.run(); // A cancellable operation
192         Context.current().removeListener(listener);
193 
194         // gRPC stubs observe io.grpc.Context cancellation, so cancellation is automatically
195         // propagated when performing an RPC. You can use a different Context or use Context.fork()
196         // to disable the automatic propagation. For example,
197         //   Context.ROOT.call(() -> futureStub.unaryEcho(request));
198         //   context.fork().call(() -> futureStub.unaryEcho(request));
199       }
200       responseObserver.onNext(
201           EchoResponse.newBuilder().setMessage(request.getMessage()).build());
202       responseObserver.onCompleted();
203       System.out.println("Unary RPC completed");
204     }
205   }
206 }
207