• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2023 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.cancellation;
18 
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import io.grpc.Channel;
24 import io.grpc.Context;
25 import io.grpc.Context.CancellableContext;
26 import io.grpc.Grpc;
27 import io.grpc.InsecureChannelCredentials;
28 import io.grpc.ManagedChannel;
29 import io.grpc.Status;
30 import io.grpc.StatusRuntimeException;
31 import io.grpc.examples.echo.EchoGrpc;
32 import io.grpc.examples.echo.EchoRequest;
33 import io.grpc.examples.echo.EchoResponse;
34 import io.grpc.stub.ClientCallStreamObserver;
35 import io.grpc.stub.StreamObserver;
36 import java.util.concurrent.TimeUnit;
37 
38 /**
39  * A client that cancels RPCs to an Echo server.
40  */
41 public class CancellationClient {
42   private final Channel channel;
43 
CancellationClient(Channel channel)44   public CancellationClient(Channel channel) {
45     this.channel = channel;
46   }
47 
demonstrateCancellation()48   private void demonstrateCancellation() throws Exception {
49     echoBlocking("I'M A BLOCKING CLIENT! HEAR ME ROAR!");
50 
51     // io.grpc.Context can be used to cancel RPCs using any of the stubs. It is the only way to
52     // cancel blocking stub RPCs. io.grpc.Context is a general-purpose alternative to thread
53     // interruption and can be used outside of gRPC, like to coordinate within your application.
54     //
55     // CancellableContext must always be cancelled or closed at the end of its lifetime, otherwise
56     // it could "leak" memory.
57     try (CancellableContext context = Context.current().withCancellation()) {
58       new Thread(() -> {
59         try {
60           Thread.sleep(500); // Do some work
61         } catch (InterruptedException ex) {
62           Thread.currentThread().interrupt();
63         }
64         // Cancellation reasons are never sent to the server. But they are echoed back to the
65         // client as the RPC failure reason.
66         context.cancel(new RuntimeException("Oops. Messed that up, let me try again"));
67       }).start();
68 
69       // context.run() attaches the context to this thread for gRPC to observe. It also restores
70       // the previous context before returning.
71       context.run(() -> echoBlocking("RAAWRR haha lol hehe AWWRR GRRR"));
72     }
73 
74     // Futures cancelled with interruption cancel the RPC.
75     ListenableFuture<EchoResponse> future = echoFuture("Future clie*cough*nt was here!");
76     Thread.sleep(500); // Do some work
77     // We realize we really don't want to hear that echo.
78     future.cancel(true);
79     Thread.sleep(100); // Make logs more obvious. Cancel is async
80 
81     ClientCallStreamObserver<EchoRequest> reqCallObserver = echoAsync("Testing, testing, 1, 2, 3");
82     reqCallObserver.onCompleted();
83     Thread.sleep(500); // Make logs more obvious. Wait for completion
84 
85     // Async's onError() will cancel. But the method can't be called concurrently with other calls
86     // on the StreamObserver. If you need thread-safety, use CancellableContext as above.
87     StreamObserver<EchoRequest> reqObserver = echoAsync("... async client... is the... best...");
88     try {
89       Thread.sleep(500); // Do some work
90     } catch (InterruptedException ex) {
91       Thread.currentThread().interrupt();
92     }
93     // Since reqObserver.onCompleted() hasn't been called, we can use onError().
94     reqObserver.onError(new RuntimeException("That was weak..."));
95     Thread.sleep(100); // Make logs more obvious. Cancel is async
96 
97     // Async's cancel() will cancel. Also may not be called concurrently with other calls on the
98     // StreamObserver.
99     reqCallObserver = echoAsync("Async client or bust!");
100     reqCallObserver.onCompleted();
101     try {
102       Thread.sleep(250); // Do some work
103     } catch (InterruptedException ex) {
104       Thread.currentThread().interrupt();
105     }
106     // Since onCompleted() has been called, we can't use onError(). It is safe to use cancel()
107     // regardless of onCompleted() being called.
108     reqCallObserver.cancel("That's enough. I'm bored", null);
109     Thread.sleep(100); // Make logs more obvious. Cancel is async
110   }
111 
112   /** Say hello to server, just like in helloworld example. */
echoBlocking(String text)113   public void echoBlocking(String text) {
114     System.out.println("\nYelling: " + text);
115     EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
116     EchoResponse response;
117     try {
118       response = EchoGrpc.newBlockingStub(channel).unaryEcho(request);
119     } catch (StatusRuntimeException e) {
120       System.out.println("RPC failed: " + e.getStatus());
121       return;
122     }
123     System.out.println("Echo: " + response.getMessage());
124   }
125 
126   /** Say hello to the server, but using future API. */
echoFuture(String text)127   public ListenableFuture<EchoResponse> echoFuture(String text) {
128     System.out.println("\nYelling: " + text);
129     EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
130     ListenableFuture<EchoResponse> future = EchoGrpc.newFutureStub(channel).unaryEcho(request);
131     Futures.addCallback(future, new FutureCallback<EchoResponse>() {
132       @Override
133       public void onSuccess(EchoResponse response) {
134         System.out.println("Echo: " + response.getMessage());
135       }
136 
137       @Override
138       public void onFailure(Throwable t) {
139         System.out.println("RPC failed: " + Status.fromThrowable(t));
140       }
141     }, MoreExecutors.directExecutor());
142     return future;
143   }
144 
145   /** Say hello to the server, but using async API and cancelling. */
echoAsync(String text)146   public ClientCallStreamObserver<EchoRequest> echoAsync(String text) {
147     System.out.println("\nYelling: " + text);
148     EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
149 
150     // Client-streaming and bidirectional RPCs can cast the returned StreamObserver to
151     // ClientCallStreamObserver.
152     //
153     // Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can
154     // use ClientResponseObserver to get the ClientCallStreamObserver. For example:
155     //     EchoGrpc.newStub(channel).unaryEcho(new ClientResponseObserver<EchoResponse>() {...});
156     // Since ClientCallStreamObserver.cancel() is not thread-safe, it isn't safe to call from
157     // another thread until the RPC stub method (e.g., unaryEcho()) returns.
158     ClientCallStreamObserver<EchoRequest> reqObserver = (ClientCallStreamObserver<EchoRequest>)
159         EchoGrpc.newStub(channel).bidirectionalStreamingEcho(new StreamObserver<EchoResponse>() {
160       @Override
161       public void onNext(EchoResponse response) {
162         System.out.println("Echo: " + response.getMessage());
163       }
164 
165       @Override
166       public void onCompleted() {
167         System.out.println("RPC completed");
168       }
169 
170       @Override
171       public void onError(Throwable t) {
172         System.out.println("RPC failed: " + Status.fromThrowable(t));
173       }
174     });
175 
176     reqObserver.onNext(request);
177     return reqObserver;
178   }
179 
180   /**
181    * Cancel RPCs to a server. If provided, the first element of {@code args} is the target server.
182    */
main(String[] args)183   public static void main(String[] args) throws Exception {
184     String target = "localhost:50051";
185     if (args.length > 0) {
186       if ("--help".equals(args[0])) {
187         System.err.println("Usage: [target]");
188         System.err.println("");
189         System.err.println("  target  The server to connect to. Defaults to " + target);
190         System.exit(1);
191       }
192       target = args[0];
193     }
194 
195     ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
196         .build();
197     try {
198       CancellationClient client = new CancellationClient(channel);
199       client.demonstrateCancellation();
200     } finally {
201       channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
202     }
203   }
204 }
205