1 /* 2 * Copyright 2023 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.examples.cancellation; 18 19 import com.google.common.util.concurrent.MoreExecutors; 20 import io.grpc.Context; 21 import io.grpc.Grpc; 22 import io.grpc.InsecureServerCredentials; 23 import io.grpc.Server; 24 import io.grpc.Status; 25 import io.grpc.examples.echo.EchoGrpc; 26 import io.grpc.examples.echo.EchoRequest; 27 import io.grpc.examples.echo.EchoResponse; 28 import io.grpc.stub.ServerCallStreamObserver; 29 import io.grpc.stub.StreamObserver; 30 import java.io.IOException; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.Future; 35 import java.util.concurrent.FutureTask; 36 import java.util.concurrent.ScheduledExecutorService; 37 import java.util.concurrent.TimeUnit; 38 import java.util.logging.Logger; 39 40 /** 41 * Server that manages startup/shutdown of a {@code Greeter} server. 42 * 43 * <p>Any abort of an ongoing RPC is considered "cancellation" of that RPC. The common causes of 44 * cancellation are the client explicitly cancelling, the deadline expires, and I/O failures. The 45 * service is not informed the reason for the cancellation. 46 * 47 * <p>There are two APIs for services to be notified of RPC cancellation: io.grpc.Context and 48 * ServerCallStreamObserver. Context listeners are called on a different thread, so need to be 49 * thread-safe. The ServerCallStreamObserver cancellation callback is called like other 50 * StreamObserver callbacks, so the application may not need thread-safe handling. Both APIs have 51 * thread-safe isCancelled() polling methods. 52 */ 53 public class CancellationServer { main(String[] args)54 public static void main(String[] args) throws IOException, InterruptedException { 55 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 56 int port = 50051; 57 Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) 58 .addService(new SlowEcho(scheduler)) 59 .build() 60 .start(); 61 System.out.println("Server started, listening on " + port); 62 Runtime.getRuntime().addShutdownHook(new Thread() { 63 @Override 64 public void run() { 65 try { 66 server.shutdown().awaitTermination(30, TimeUnit.SECONDS); 67 } catch (InterruptedException e) { 68 e.printStackTrace(System.err); 69 } 70 } 71 }); 72 server.awaitTermination(); 73 scheduler.shutdown(); 74 if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { 75 scheduler.shutdownNow(); 76 } 77 } 78 79 static class SlowEcho extends EchoGrpc.EchoImplBase { 80 private final ScheduledExecutorService scheduler; 81 82 /** {@code scheduler} must be single-threaded. */ SlowEcho(ScheduledExecutorService scheduler)83 public SlowEcho(ScheduledExecutorService scheduler) { 84 this.scheduler = scheduler; 85 } 86 87 /** 88 * Repeatedly echos each request until the client has no more requests. It performs all work 89 * asynchronously on a single thread. It uses ServerCallStreamObserver to be notified of RPC 90 * cancellation. 91 */ 92 @Override bidirectionalStreamingEcho( StreamObserver<EchoResponse> responseObserver)93 public StreamObserver<EchoRequest> bidirectionalStreamingEcho( 94 StreamObserver<EchoResponse> responseObserver) { 95 // If the service is truly asynchronous, using ServerCallStreamObserver to receive 96 // cancellation notifications tends to work well. 97 98 // It is safe to cast the provided observer to ServerCallStreamObserver. 99 ServerCallStreamObserver<EchoResponse> responseCallObserver = 100 (ServerCallStreamObserver<EchoResponse>) responseObserver; 101 System.out.println("\nBidi RPC started"); 102 class EchoObserver implements StreamObserver<EchoRequest> { 103 private static final int delayMs = 200; 104 private final List<Future<?>> echos = new ArrayList<>(); 105 106 @Override 107 public void onNext(EchoRequest request) { 108 System.out.println("Bidi RPC received request: " + request.getMessage()); 109 EchoResponse response 110 = EchoResponse.newBuilder().setMessage(request.getMessage()).build(); 111 Runnable echo = () -> responseObserver.onNext(response); 112 echos.add(scheduler.scheduleAtFixedRate(echo, delayMs, delayMs, TimeUnit.MILLISECONDS)); 113 } 114 115 @Override 116 public void onCompleted() { 117 System.out.println("Bidi RPC client finished"); 118 // Let each echo happen two more times, and then stop. 119 List<Future<?>> echosCopy = new ArrayList<>(echos); 120 Runnable complete = () -> { 121 stopEchos(echosCopy); 122 responseObserver.onCompleted(); 123 System.out.println("Bidi RPC completed"); 124 }; 125 echos.add(scheduler.schedule(complete, 2*delayMs, TimeUnit.MILLISECONDS)); 126 } 127 128 @Override 129 public void onError(Throwable t) { 130 System.out.println("Bidi RPC failed: " + Status.fromThrowable(t)); 131 stopEchos(echos); 132 scheduler.execute(() -> responseObserver.onError(t)); 133 } 134 135 public void onCancel() { 136 // If onCompleted() hasn't been called by this point, then this method and onError are 137 // both called. If onCompleted() has been called, then just this method is called. 138 System.out.println("Bidi RPC cancelled"); 139 stopEchos(echos); 140 } 141 142 private void stopEchos(List<Future<?>> echos) { 143 for (Future<?> echo : echos) { 144 echo.cancel(false); 145 } 146 } 147 } 148 149 EchoObserver requestObserver = new EchoObserver(); 150 // onCancel() can be called even after the service completes or fails the RPC, because 151 // callbacks are racy and the response still has to be sent to the client. Use 152 // setOnCloseHandler() to be notified when the RPC completed without cancellation (as best as 153 // the server is able to tell). 154 responseCallObserver.setOnCancelHandler(requestObserver::onCancel); 155 return requestObserver; 156 } 157 158 /** 159 * Echos the request after a delay. It processes the request in-line within the callback. It 160 * uses Context to be notified of RPC cancellation. 161 */ 162 @Override unaryEcho(EchoRequest request, StreamObserver<EchoResponse> responseObserver)163 public void unaryEcho(EchoRequest request, StreamObserver<EchoResponse> responseObserver) { 164 // ServerCallStreamObserver.setOnCancelHandler(Runnable) is not useful for this method, since 165 // this method only returns once it has a result. ServerCallStreamObserver guarantees the 166 // Runnable is not run at the same time as other RPC callback methods (including this method), 167 // so the cancellation notification would be guaranteed to occur too late. 168 System.out.println("\nUnary RPC started: " + request.getMessage()); 169 Context currentContext = Context.current(); 170 // Let's start a multi-part operation. We can check cancellation periodically. 171 for (int i = 0; i < 10; i++) { 172 // ServerCallStreamObserver.isCancelled() returns true only if the RPC is cancelled. 173 // Context.isCancelled() is similar, but also returns true when the RPC completes normally. 174 // It doesn't matter which API is used here. 175 if (currentContext.isCancelled()) { 176 System.out.println("Unary RPC cancelled"); 177 responseObserver.onError( 178 Status.CANCELLED.withDescription("RPC cancelled").asRuntimeException()); 179 return; 180 } 181 182 FutureTask<Void> task = new FutureTask<>(() -> { 183 Thread.sleep(100); // Do some work 184 return null; 185 }); 186 // Some Java blocking APIs have a method to cancel an ongoing operation, like closing an 187 // InputStream or interrupting the thread. We can use a Context listener to call that API 188 // from another thread if the RPC is cancelled. 189 Context.CancellationListener listener = (Context context) -> task.cancel(true); 190 Context.current().addListener(listener, MoreExecutors.directExecutor()); 191 task.run(); // A cancellable operation 192 Context.current().removeListener(listener); 193 194 // gRPC stubs observe io.grpc.Context cancellation, so cancellation is automatically 195 // propagated when performing an RPC. You can use a different Context or use Context.fork() 196 // to disable the automatic propagation. For example, 197 // Context.ROOT.call(() -> futureStub.unaryEcho(request)); 198 // context.fork().call(() -> futureStub.unaryEcho(request)); 199 } 200 responseObserver.onNext( 201 EchoResponse.newBuilder().setMessage(request.getMessage()).build()); 202 responseObserver.onCompleted(); 203 System.out.println("Unary RPC completed"); 204 } 205 } 206 } 207