• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2023 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.multiplex;
18 
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.util.concurrent.AbstractFuture;
21 import io.grpc.Channel;
22 import io.grpc.Grpc;
23 import io.grpc.InsecureChannelCredentials;
24 import io.grpc.ManagedChannel;
25 import io.grpc.Status;
26 import io.grpc.StatusRuntimeException;
27 import io.grpc.examples.helloworld.GreeterGrpc;
28 import io.grpc.examples.helloworld.HelloReply;
29 import io.grpc.examples.helloworld.HelloRequest;
30 import io.grpc.examples.echo.EchoGrpc;
31 import io.grpc.examples.echo.EchoRequest;
32 import io.grpc.examples.echo.EchoResponse;
33 import io.grpc.examples.helloworld.HelloWorldClient;
34 import io.grpc.stub.StreamObserver;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Random;
38 import java.util.concurrent.TimeUnit;
39 import java.util.logging.Level;
40 import java.util.logging.Logger;
41 import javax.annotation.Nullable;
42 
43 
44 /**
45  * A client that shares a channel across multiple stubs to a single service and across services
46  * being provided by one server process.
47  */
48 public class SharingClient {
49   private static final Logger logger = Logger.getLogger(
50       HelloWorldClient.class.getName());
51 
52   private final GreeterGrpc.GreeterBlockingStub greeterStub1;
53   private final GreeterGrpc.GreeterBlockingStub greeterStub2;
54   private final EchoGrpc.EchoStub echoStub;
55 
56   private Random random = new Random();
57 
58   /** Construct client for accessing HelloWorld server using the existing channel. */
SharingClient(Channel channel)59   public SharingClient(Channel channel) {
60     // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
61     // shut it down.
62 
63     // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
64     greeterStub1 = GreeterGrpc.newBlockingStub(channel);
65     greeterStub2 = GreeterGrpc.newBlockingStub(channel);
66     echoStub = EchoGrpc.newStub(channel);
67   }
68 
69   /** Say hello to server. */
greet(String name, GreeterGrpc.GreeterBlockingStub stub, String stubName)70   private void greet(String name, GreeterGrpc.GreeterBlockingStub stub, String stubName)
71       throws InterruptedException {
72     System.out.println("Will try to greet " + name + " using " + stubName);
73     HelloRequest request = HelloRequest.newBuilder().setName(name).build();
74     HelloReply response;
75     try {
76       response = stub.sayHello(request);
77     } catch (StatusRuntimeException e) {
78       logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
79       return;
80     }
81     System.out.println("Greeting: " + response.getMessage());
82     // pause to allow interleaving
83     Thread.sleep(1000);
84   }
85 
greet1(String name)86   public void greet1(String name) throws InterruptedException {
87     greet(name, greeterStub1, "greeter #1");
88   }
89 
greet2(String name)90   public void greet2(String name) throws InterruptedException {
91     greet(name, greeterStub2, "greeter #2");
92   }
93 
initiateEchos(List<String> valuesToSend)94   public StreamingFuture<List<String>> initiateEchos(List<String> valuesToSend) {
95     StreamingFuture<List<String>> future = new StreamingFuture<List<String>> ();
96     List<String> valuesReceived = new ArrayList<>();
97 
98     // The logic that gets called by the framework during the RPC's lifecycle
99     StreamObserver<EchoResponse> responseObserver = new StreamObserver<EchoResponse>() {
100       @Override
101       public void onNext(EchoResponse response) {
102         System.out.println("Received an echo: " + response.getMessage());
103         valuesReceived.add(response.getMessage());
104       }
105 
106       @Override
107       public void onError(Throwable t) {
108         logger.warning("Echo Failed: {0}" + Status.fromThrowable(t));
109         future.setException(t);
110       }
111 
112       @Override
113       public void onCompleted() {
114         System.out.println("Server acknowledged end of echo stream.");
115         future.set(valuesReceived);
116       }
117     };
118 
119     future.setObserver(responseObserver);
120 
121     new Thread(new Runnable() {
122       public void run() {
123         StreamObserver<EchoRequest> requestObserver =
124             echoStub.bidirectionalStreamingEcho(responseObserver);
125 
126         try {
127           for (String curValue : valuesToSend) {
128             System.out.println("Sending an echo request for: " + curValue);
129             EchoRequest req = EchoRequest.newBuilder().setMessage(curValue).build();
130             requestObserver.onNext(req);
131 
132             // Sleep for a bit before sending the next one.
133             Thread.sleep(random.nextInt(1000) + 500);
134           }
135         } catch (RuntimeException e) {
136           // Cancel RPC
137           requestObserver.onError(e);
138           throw e;
139         } catch (InterruptedException e) {
140           Thread.currentThread().interrupt();
141           requestObserver.onError(e);
142           return;
143         }
144 
145         // Mark the end of requests
146         requestObserver.onCompleted();
147       }
148     }).start();
149 
150     return future;
151   }
152 
153   /**
154    * Greet server. If provided, the first element of {@code args} is the name to use in the
155    * greeting. The second argument is the target server.
156    * You can see the multiplexing in the server logs.
157    */
main(String[] args)158   public static void main(String[] args) throws Exception {
159     String user = "world";
160     // Access a service running on the local machine on port 50051
161     String target = "localhost:50051";
162     // Allow passing in the user and target strings as command line arguments
163     if (args.length > 0) {
164       if ("--help".equals(args[0])) {
165         System.err.println("Usage: [name [target]]");
166         System.err.println("");
167         System.err.println("  name    The name you wish to be greeted by. Defaults to " + user);
168         System.err.println("  target  The server to connect to. Defaults to " + target);
169         System.exit(1);
170       }
171       user = args[0];
172     }
173     if (args.length > 1) {
174       target = args[1];
175     }
176 
177     // Create a communication channel to the server, known as a Channel. Channels are thread-safe
178     // and reusable. It is common to create channels at the beginning of your application and reuse
179     // them until the application shuts down.
180     //
181     // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To
182     // use TLS, use TlsChannelCredentials instead.
183     ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
184         .build();
185     List<String> echoInput = ImmutableList.of("some", "thing", "wicked", "this", "way", "comes");
186     try {
187       SharingClient client = new SharingClient(channel);
188 
189       StreamingFuture<List<String>> future = client.initiateEchos(echoInput);
190       client.greet1(user + " the great");
191       client.greet2(user + " the lesser");
192       client.greet1(user + " the humble");
193       // Receiving happens asynchronously
194 
195       String resultStr = future.get(1, TimeUnit.MINUTES).toString();
196       System.out.println("The echo requests and results were:");
197       System.out.println(echoInput.toString());
198       System.out.println(resultStr);
199 
200       if (!future.isDone()) {
201         System.err.println("Streaming rpc failed to complete in 1 minute");
202       }
203     } finally {
204       // ManagedChannels use resources like threads and TCP connections. To prevent leaking these
205       // resources the channel should be shut down when it will no longer be used. If it may be used
206       // again leave it running.
207       channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
208     }
209   }
210 
211   private class StreamingFuture<RespT> extends AbstractFuture<RespT> {
212 
213     private StreamObserver<EchoResponse> responseObserver = null;
214 
setObserver(StreamObserver<EchoResponse> responseObserver)215     private void setObserver(StreamObserver<EchoResponse> responseObserver) {
216       this.responseObserver = responseObserver;
217     }
218 
219     @Override
interruptTask()220     protected void interruptTask() {
221       if (responseObserver != null) {
222         responseObserver.onError(Status.ABORTED.asException());
223       }
224 
225     }
226 
227     // These are needed for visibility from the parent object
228     @Override
set(@ullable RespT resp)229     protected boolean set(@Nullable RespT resp) {
230       return super.set(resp);
231     }
232 
233     @Override
setException(Throwable throwable)234     protected boolean setException(Throwable throwable) {
235       return super.setException(throwable);
236     }
237 
238   }
239 }
240