1 /* 2 * Copyright 2023, gRPC Authors All rights reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.examples.grpcproxy; 18 19 import com.google.common.io.ByteStreams; 20 import io.grpc.CallOptions; 21 import io.grpc.Channel; 22 import io.grpc.ClientCall; 23 import io.grpc.Grpc; 24 import io.grpc.HandlerRegistry; 25 import io.grpc.InsecureChannelCredentials; 26 import io.grpc.InsecureServerCredentials; 27 import io.grpc.ManagedChannel; 28 import io.grpc.Metadata; 29 import io.grpc.MethodDescriptor; 30 import io.grpc.Server; 31 import io.grpc.ServerCall; 32 import io.grpc.ServerCallHandler; 33 import io.grpc.ServerMethodDefinition; 34 import io.grpc.Status; 35 import java.io.ByteArrayInputStream; 36 import java.io.IOException; 37 import java.io.InputStream; 38 import java.util.concurrent.TimeUnit; 39 import java.util.logging.Logger; 40 41 /** 42 * A grpc-level proxy. GrpcProxy itself can be used unmodified to proxy any service for both unary 43 * and streaming. It doesn't care what type of messages are being used. The Registry class causes it 44 * to be called for any inbound RPC, and uses plain bytes for messages which avoids marshalling 45 * messages and the need for Protobuf schema information. 46 * 47 * <p>Route guide has unary and streaming RPCs which makes it a nice showcase. To test with route 48 * guide, run each in a separate terminal window: 49 * <pre>{@code 50 * ./build/install/examples/bin/route-guide-server 51 * ./build/install/examples/bin/grpc-proxy 52 * ./build/install/examples/bin/route-guide-client localhost:8981 53 * }<pre> 54 * 55 * <p>You can verify the proxy is being used by shutting down the proxy and seeing the client fail. 56 */ 57 public final class GrpcProxy<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> { 58 private static final Logger logger = Logger.getLogger(GrpcProxy.class.getName()); 59 60 private final Channel channel; 61 GrpcProxy(Channel channel)62 public GrpcProxy(Channel channel) { 63 this.channel = channel; 64 } 65 66 @Override startCall( ServerCall<ReqT, RespT> serverCall, Metadata headers)67 public ServerCall.Listener<ReqT> startCall( 68 ServerCall<ReqT, RespT> serverCall, Metadata headers) { 69 ClientCall<ReqT, RespT> clientCall 70 = channel.newCall(serverCall.getMethodDescriptor(), CallOptions.DEFAULT); 71 CallProxy<ReqT, RespT> proxy = new CallProxy<ReqT, RespT>(serverCall, clientCall); 72 clientCall.start(proxy.clientCallListener, headers); 73 serverCall.request(1); 74 clientCall.request(1); 75 return proxy.serverCallListener; 76 } 77 78 private static class CallProxy<ReqT, RespT> { 79 final RequestProxy serverCallListener; 80 final ResponseProxy clientCallListener; 81 CallProxy(ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall)82 public CallProxy(ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall) { 83 serverCallListener = new RequestProxy(clientCall); 84 clientCallListener = new ResponseProxy(serverCall); 85 } 86 87 private class RequestProxy extends ServerCall.Listener<ReqT> { 88 private final ClientCall<ReqT, ?> clientCall; 89 // Hold 'this' lock when accessing 90 private boolean needToRequest; 91 RequestProxy(ClientCall<ReqT, ?> clientCall)92 public RequestProxy(ClientCall<ReqT, ?> clientCall) { 93 this.clientCall = clientCall; 94 } 95 onCancel()96 @Override public void onCancel() { 97 clientCall.cancel("Server cancelled", null); 98 } 99 onHalfClose()100 @Override public void onHalfClose() { 101 clientCall.halfClose(); 102 } 103 onMessage(ReqT message)104 @Override public void onMessage(ReqT message) { 105 clientCall.sendMessage(message); 106 synchronized (this) { 107 if (clientCall.isReady()) { 108 clientCallListener.serverCall.request(1); 109 } else { 110 // The outgoing call is not ready for more requests. Stop requesting additional data and 111 // wait for it to catch up. 112 needToRequest = true; 113 } 114 } 115 } 116 onReady()117 @Override public void onReady() { 118 clientCallListener.onServerReady(); 119 } 120 121 // Called from ResponseProxy, which is a different thread than the ServerCall.Listener 122 // callbacks. onClientReady()123 synchronized void onClientReady() { 124 if (needToRequest) { 125 clientCallListener.serverCall.request(1); 126 needToRequest = false; 127 } 128 } 129 } 130 131 private class ResponseProxy extends ClientCall.Listener<RespT> { 132 private final ServerCall<?, RespT> serverCall; 133 // Hold 'this' lock when accessing 134 private boolean needToRequest; 135 ResponseProxy(ServerCall<?, RespT> serverCall)136 public ResponseProxy(ServerCall<?, RespT> serverCall) { 137 this.serverCall = serverCall; 138 } 139 onClose(Status status, Metadata trailers)140 @Override public void onClose(Status status, Metadata trailers) { 141 serverCall.close(status, trailers); 142 } 143 onHeaders(Metadata headers)144 @Override public void onHeaders(Metadata headers) { 145 serverCall.sendHeaders(headers); 146 } 147 onMessage(RespT message)148 @Override public void onMessage(RespT message) { 149 serverCall.sendMessage(message); 150 synchronized (this) { 151 if (serverCall.isReady()) { 152 serverCallListener.clientCall.request(1); 153 } else { 154 // The incoming call is not ready for more responses. Stop requesting additional data 155 // and wait for it to catch up. 156 needToRequest = true; 157 } 158 } 159 } 160 onReady()161 @Override public void onReady() { 162 serverCallListener.onClientReady(); 163 } 164 165 // Called from RequestProxy, which is a different thread than the ClientCall.Listener 166 // callbacks. onServerReady()167 synchronized void onServerReady() { 168 if (needToRequest) { 169 serverCallListener.clientCall.request(1); 170 needToRequest = false; 171 } 172 } 173 } 174 } 175 176 private static class ByteMarshaller implements MethodDescriptor.Marshaller<byte[]> { parse(InputStream stream)177 @Override public byte[] parse(InputStream stream) { 178 try { 179 return ByteStreams.toByteArray(stream); 180 } catch (IOException ex) { 181 throw new RuntimeException(); 182 } 183 } 184 stream(byte[] value)185 @Override public InputStream stream(byte[] value) { 186 return new ByteArrayInputStream(value); 187 } 188 }; 189 190 public static class Registry extends HandlerRegistry { 191 private final MethodDescriptor.Marshaller<byte[]> byteMarshaller = new ByteMarshaller(); 192 private final ServerCallHandler<byte[], byte[]> handler; 193 Registry(ServerCallHandler<byte[], byte[]> handler)194 public Registry(ServerCallHandler<byte[], byte[]> handler) { 195 this.handler = handler; 196 } 197 198 @Override lookupMethod(String methodName, String authority)199 public ServerMethodDefinition<?,?> lookupMethod(String methodName, String authority) { 200 MethodDescriptor<byte[], byte[]> methodDescriptor 201 = MethodDescriptor.newBuilder(byteMarshaller, byteMarshaller) 202 .setFullMethodName(methodName) 203 .setType(MethodDescriptor.MethodType.UNKNOWN) 204 .build(); 205 return ServerMethodDefinition.create(methodDescriptor, handler); 206 } 207 } 208 main(String[] args)209 public static void main(String[] args) throws IOException, InterruptedException { 210 String target = "localhost:8980"; 211 ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()) 212 .build(); 213 logger.info("Proxy will connect to " + target); 214 GrpcProxy<byte[], byte[]> proxy = new GrpcProxy<byte[], byte[]>(channel); 215 int port = 8981; 216 Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) 217 .fallbackHandlerRegistry(new Registry(proxy)) 218 .build() 219 .start(); 220 logger.info("Proxy started, listening on " + port); 221 Runtime.getRuntime().addShutdownHook(new Thread() { 222 @Override 223 public void run() { 224 server.shutdown(); 225 try { 226 server.awaitTermination(10, TimeUnit.SECONDS); 227 } catch (InterruptedException ex) { 228 Thread.currentThread().interrupt(); 229 } 230 server.shutdownNow(); 231 channel.shutdownNow(); 232 } 233 }); 234 server.awaitTermination(); 235 if (!channel.awaitTermination(1, TimeUnit.SECONDS)) { 236 System.out.println("Channel didn't shut down promptly"); 237 } 238 } 239 } 240