1 /* 2 * Copyright 2023 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.examples.multiplex; 18 19 import com.google.common.collect.ImmutableList; 20 import com.google.common.util.concurrent.AbstractFuture; 21 import io.grpc.Channel; 22 import io.grpc.Grpc; 23 import io.grpc.InsecureChannelCredentials; 24 import io.grpc.ManagedChannel; 25 import io.grpc.Status; 26 import io.grpc.StatusRuntimeException; 27 import io.grpc.examples.helloworld.GreeterGrpc; 28 import io.grpc.examples.helloworld.HelloReply; 29 import io.grpc.examples.helloworld.HelloRequest; 30 import io.grpc.examples.echo.EchoGrpc; 31 import io.grpc.examples.echo.EchoRequest; 32 import io.grpc.examples.echo.EchoResponse; 33 import io.grpc.examples.helloworld.HelloWorldClient; 34 import io.grpc.stub.StreamObserver; 35 import java.util.ArrayList; 36 import java.util.List; 37 import java.util.Random; 38 import java.util.concurrent.TimeUnit; 39 import java.util.logging.Level; 40 import java.util.logging.Logger; 41 import javax.annotation.Nullable; 42 43 44 /** 45 * A client that shares a channel across multiple stubs to a single service and across services 46 * being provided by one server process. 47 */ 48 public class SharingClient { 49 private static final Logger logger = Logger.getLogger( 50 HelloWorldClient.class.getName()); 51 52 private final GreeterGrpc.GreeterBlockingStub greeterStub1; 53 private final GreeterGrpc.GreeterBlockingStub greeterStub2; 54 private final EchoGrpc.EchoStub echoStub; 55 56 private Random random = new Random(); 57 58 /** Construct client for accessing HelloWorld server using the existing channel. */ SharingClient(Channel channel)59 public SharingClient(Channel channel) { 60 // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to 61 // shut it down. 62 63 // Passing Channels to code makes code easier to test and makes it easier to reuse Channels. 64 greeterStub1 = GreeterGrpc.newBlockingStub(channel); 65 greeterStub2 = GreeterGrpc.newBlockingStub(channel); 66 echoStub = EchoGrpc.newStub(channel); 67 } 68 69 /** Say hello to server. */ greet(String name, GreeterGrpc.GreeterBlockingStub stub, String stubName)70 private void greet(String name, GreeterGrpc.GreeterBlockingStub stub, String stubName) 71 throws InterruptedException { 72 System.out.println("Will try to greet " + name + " using " + stubName); 73 HelloRequest request = HelloRequest.newBuilder().setName(name).build(); 74 HelloReply response; 75 try { 76 response = stub.sayHello(request); 77 } catch (StatusRuntimeException e) { 78 logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); 79 return; 80 } 81 System.out.println("Greeting: " + response.getMessage()); 82 // pause to allow interleaving 83 Thread.sleep(1000); 84 } 85 greet1(String name)86 public void greet1(String name) throws InterruptedException { 87 greet(name, greeterStub1, "greeter #1"); 88 } 89 greet2(String name)90 public void greet2(String name) throws InterruptedException { 91 greet(name, greeterStub2, "greeter #2"); 92 } 93 initiateEchos(List<String> valuesToSend)94 public StreamingFuture<List<String>> initiateEchos(List<String> valuesToSend) { 95 StreamingFuture<List<String>> future = new StreamingFuture<List<String>> (); 96 List<String> valuesReceived = new ArrayList<>(); 97 98 // The logic that gets called by the framework during the RPC's lifecycle 99 StreamObserver<EchoResponse> responseObserver = new StreamObserver<EchoResponse>() { 100 @Override 101 public void onNext(EchoResponse response) { 102 System.out.println("Received an echo: " + response.getMessage()); 103 valuesReceived.add(response.getMessage()); 104 } 105 106 @Override 107 public void onError(Throwable t) { 108 logger.warning("Echo Failed: {0}" + Status.fromThrowable(t)); 109 future.setException(t); 110 } 111 112 @Override 113 public void onCompleted() { 114 System.out.println("Server acknowledged end of echo stream."); 115 future.set(valuesReceived); 116 } 117 }; 118 119 future.setObserver(responseObserver); 120 121 new Thread(new Runnable() { 122 public void run() { 123 StreamObserver<EchoRequest> requestObserver = 124 echoStub.bidirectionalStreamingEcho(responseObserver); 125 126 try { 127 for (String curValue : valuesToSend) { 128 System.out.println("Sending an echo request for: " + curValue); 129 EchoRequest req = EchoRequest.newBuilder().setMessage(curValue).build(); 130 requestObserver.onNext(req); 131 132 // Sleep for a bit before sending the next one. 133 Thread.sleep(random.nextInt(1000) + 500); 134 } 135 } catch (RuntimeException e) { 136 // Cancel RPC 137 requestObserver.onError(e); 138 throw e; 139 } catch (InterruptedException e) { 140 Thread.currentThread().interrupt(); 141 requestObserver.onError(e); 142 return; 143 } 144 145 // Mark the end of requests 146 requestObserver.onCompleted(); 147 } 148 }).start(); 149 150 return future; 151 } 152 153 /** 154 * Greet server. If provided, the first element of {@code args} is the name to use in the 155 * greeting. The second argument is the target server. 156 * You can see the multiplexing in the server logs. 157 */ main(String[] args)158 public static void main(String[] args) throws Exception { 159 String user = "world"; 160 // Access a service running on the local machine on port 50051 161 String target = "localhost:50051"; 162 // Allow passing in the user and target strings as command line arguments 163 if (args.length > 0) { 164 if ("--help".equals(args[0])) { 165 System.err.println("Usage: [name [target]]"); 166 System.err.println(""); 167 System.err.println(" name The name you wish to be greeted by. Defaults to " + user); 168 System.err.println(" target The server to connect to. Defaults to " + target); 169 System.exit(1); 170 } 171 user = args[0]; 172 } 173 if (args.length > 1) { 174 target = args[1]; 175 } 176 177 // Create a communication channel to the server, known as a Channel. Channels are thread-safe 178 // and reusable. It is common to create channels at the beginning of your application and reuse 179 // them until the application shuts down. 180 // 181 // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To 182 // use TLS, use TlsChannelCredentials instead. 183 ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) 184 .build(); 185 List<String> echoInput = ImmutableList.of("some", "thing", "wicked", "this", "way", "comes"); 186 try { 187 SharingClient client = new SharingClient(channel); 188 189 StreamingFuture<List<String>> future = client.initiateEchos(echoInput); 190 client.greet1(user + " the great"); 191 client.greet2(user + " the lesser"); 192 client.greet1(user + " the humble"); 193 // Receiving happens asynchronously 194 195 String resultStr = future.get(1, TimeUnit.MINUTES).toString(); 196 System.out.println("The echo requests and results were:"); 197 System.out.println(echoInput.toString()); 198 System.out.println(resultStr); 199 200 if (!future.isDone()) { 201 System.err.println("Streaming rpc failed to complete in 1 minute"); 202 } 203 } finally { 204 // ManagedChannels use resources like threads and TCP connections. To prevent leaking these 205 // resources the channel should be shut down when it will no longer be used. If it may be used 206 // again leave it running. 207 channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); 208 } 209 } 210 211 private class StreamingFuture<RespT> extends AbstractFuture<RespT> { 212 213 private StreamObserver<EchoResponse> responseObserver = null; 214 setObserver(StreamObserver<EchoResponse> responseObserver)215 private void setObserver(StreamObserver<EchoResponse> responseObserver) { 216 this.responseObserver = responseObserver; 217 } 218 219 @Override interruptTask()220 protected void interruptTask() { 221 if (responseObserver != null) { 222 responseObserver.onError(Status.ABORTED.asException()); 223 } 224 225 } 226 227 // These are needed for visibility from the parent object 228 @Override set(@ullable RespT resp)229 protected boolean set(@Nullable RespT resp) { 230 return super.set(resp); 231 } 232 233 @Override setException(Throwable throwable)234 protected boolean setException(Throwable throwable) { 235 return super.setException(throwable); 236 } 237 238 } 239 } 240