/*
 * Copyright 2023 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.examples.cancellation;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.echo.EchoGrpc;
import io.grpc.examples.echo.EchoRequest;
import io.grpc.examples.echo.EchoResponse;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;

/**
 * Example client that starts RPCs against an Echo server and then cancels them, showing the
 * cancellation mechanism available for each kind of stub.
 */
public class CancellationClient {
  private final Channel channel;

  /** Creates a client that issues all of its RPCs over {@code channel}. */
  public CancellationClient(Channel channel) {
    this.channel = channel;
  }

  /**
   * Runs every cancellation demonstration in sequence: an uncancelled blocking call, a blocking
   * call cancelled through a {@link CancellableContext}, a cancelled future, an async call that
   * is simply completed, an async call cancelled with {@code onError()}, and an async call
   * cancelled with {@code cancel()}.
   *
   * @throws Exception if one of the unguarded sleeps between the steps is interrupted
   */
  private void demonstrateCancellation() throws Exception {
    echoBlocking("I'M A BLOCKING CLIENT! HEAR ME ROAR!");

    // Any stub's RPCs can be cancelled through io.grpc.Context, and for blocking stub RPCs it is
    // the only way. io.grpc.Context is a general-purpose alternative to thread interruption and
    // can be used outside of gRPC, like to coordinate within your application.
    //
    // A CancellableContext must always be cancelled or closed at the end of its lifetime,
    // otherwise it could "leak" memory.
    try (CancellableContext context = Context.current().withCancellation()) {
      Thread canceller = new Thread(() -> {
        pauseKeepingInterrupt(500); // Do some work
        // The cancellation reason is never sent to the server. It is, however, echoed back to
        // the client as the RPC failure reason.
        context.cancel(new RuntimeException("Oops. Messed that up, let me try again"));
      });
      canceller.start();

      // context.run() attaches the context to this thread for gRPC to observe, and restores the
      // previous context before it returns.
      context.run(() -> echoBlocking("RAAWRR haha lol hehe AWWRR GRRR"));
    }

    // Cancelling a future with interruption cancels the RPC.
    ListenableFuture<EchoResponse> pendingEcho = echoFuture("Future clie*cough*nt was here!");
    Thread.sleep(500); // Do some work
    // We realize we really don't want to hear that echo.
    pendingEcho.cancel(true);
    Thread.sleep(100); // Make logs more obvious. Cancel is async

    ClientCallStreamObserver<EchoRequest> completedCall = echoAsync("Testing, testing, 1, 2, 3");
    completedCall.onCompleted();
    Thread.sleep(500); // Make logs more obvious. Wait for completion

    // Async's onError() will cancel. But the method can't be called concurrently with other calls
    // on the StreamObserver. If you need thread-safety, use CancellableContext as above.
    StreamObserver<EchoRequest> erroredCall = echoAsync("... async client... is the... best...");
    pauseKeepingInterrupt(500); // Do some work
    // onCompleted() hasn't been called on this observer yet, so onError() may still be used.
    erroredCall.onError(new RuntimeException("That was weak..."));
    Thread.sleep(100); // Make logs more obvious. Cancel is async

    // Async's cancel() will cancel. Also may not be called concurrently with other calls on the
    // StreamObserver.
    ClientCallStreamObserver<EchoRequest> cancelledCall = echoAsync("Async client or bust!");
    cancelledCall.onCompleted();
    pauseKeepingInterrupt(250); // Do some work
    // onCompleted() has already been called, which rules out onError(). cancel() is safe to use
    // regardless of onCompleted() being called.
    cancelledCall.cancel("That's enough. I'm bored", null);
    Thread.sleep(100); // Make logs more obvious. Cancel is async
  }

  /**
   * Sleeps for {@code millis} milliseconds. If the sleep is interrupted, the thread's interrupt
   * flag is set again and the method returns early instead of throwing.
   */
  private static void pauseKeepingInterrupt(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }

  /** Prints the text about to be sent and wraps it in an {@link EchoRequest}. */
  private static EchoRequest announce(String text) {
    System.out.println("\nYelling: " + text);
    return EchoRequest.newBuilder().setMessage(text).build();
  }

  /**
   * Sends {@code text} as a unary echo through the blocking stub and prints the echoed message,
   * or the failure status if the call throws {@link StatusRuntimeException}.
   */
  public void echoBlocking(String text) {
    EchoRequest outgoing = announce(text);
    EchoResponse reply;
    try {
      reply = EchoGrpc.newBlockingStub(channel).unaryEcho(outgoing);
    } catch (StatusRuntimeException e) {
      System.out.println("RPC failed: " + e.getStatus());
      return;
    }
    System.out.println("Echo: " + reply.getMessage());
  }

  /**
   * Sends {@code text} as a unary echo through the future stub. A callback prints the echoed
   * message or the failure status once the future finishes.
   *
   * @return the future for the in-flight call, which the caller may cancel
   */
  public ListenableFuture<EchoResponse> echoFuture(String text) {
    EchoRequest outgoing = announce(text);
    ListenableFuture<EchoResponse> pending = EchoGrpc.newFutureStub(channel).unaryEcho(outgoing);
    FutureCallback<EchoResponse> printer = new FutureCallback<EchoResponse>() {
      @Override
      public void onSuccess(EchoResponse reply) {
        System.out.println("Echo: " + reply.getMessage());
      }

      @Override
      public void onFailure(Throwable cause) {
        System.out.println("RPC failed: " + Status.fromThrowable(cause));
      }
    };
    Futures.addCallback(pending, printer, MoreExecutors.directExecutor());
    return pending;
  }

  /**
   * Starts a bidirectional-streaming echo through the async stub and sends {@code text} as its
   * first request. The request stream is left open for the caller.
   *
   * @return the request observer, which the caller uses to complete or cancel the call
   */
  public ClientCallStreamObserver<EchoRequest> echoAsync(String text) {
    EchoRequest outgoing = announce(text);

    // For client-streaming and bidirectional RPCs, the returned StreamObserver can be cast to
    // ClientCallStreamObserver.
    //
    // Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can
    // use ClientResponseObserver to get the ClientCallStreamObserver. For example:
    //     EchoGrpc.newStub(channel).unaryEcho(new ClientResponseObserver<EchoResponse>() {...});
    // Since ClientCallStreamObserver.cancel() is not thread-safe, it isn't safe to call from
    // another thread until the RPC stub method (e.g., unaryEcho()) returns.
    StreamObserver<EchoRequest> requests =
        EchoGrpc.newStub(channel).bidirectionalStreamingEcho(new PrintingResponseObserver());
    ClientCallStreamObserver<EchoRequest> call = (ClientCallStreamObserver<EchoRequest>) requests;

    call.onNext(outgoing);
    return call;
  }

  /** Response observer that prints each echoed message and how the RPC ended. */
  private static final class PrintingResponseObserver implements StreamObserver<EchoResponse> {
    @Override
    public void onNext(EchoResponse reply) {
      System.out.println("Echo: " + reply.getMessage());
    }

    @Override
    public void onCompleted() {
      System.out.println("RPC completed");
    }

    @Override
    public void onError(Throwable cause) {
      System.out.println("RPC failed: " + Status.fromThrowable(cause));
    }
  }

  /**
   * Cancel RPCs to a server. If provided, the first element of {@code args} is the target server;
   * {@code --help} prints usage and exits instead.
   */
  public static void main(String[] args) throws Exception {
    String target = "localhost:50051";
    boolean hasArgument = args.length > 0;
    if (hasArgument && "--help".equals(args[0])) {
      System.err.println("Usage: [target]");
      System.err.println("");
      System.err.println(" target The server to connect to. Defaults to " + target);
      System.exit(1);
    }
    if (hasArgument) {
      target = args[0];
    }

    ManagedChannel managedChannel =
        Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()).build();
    try {
      new CancellationClient(managedChannel).demonstrateCancellation();
    } finally {
      // Tear the channel down whether or not the demonstration failed.
      managedChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}