• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2023, gRPC Authors All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.grpcproxy;
18 
19 import com.google.common.io.ByteStreams;
20 import io.grpc.CallOptions;
21 import io.grpc.Channel;
22 import io.grpc.ClientCall;
23 import io.grpc.Grpc;
24 import io.grpc.HandlerRegistry;
25 import io.grpc.InsecureChannelCredentials;
26 import io.grpc.InsecureServerCredentials;
27 import io.grpc.ManagedChannel;
28 import io.grpc.Metadata;
29 import io.grpc.MethodDescriptor;
30 import io.grpc.Server;
31 import io.grpc.ServerCall;
32 import io.grpc.ServerCallHandler;
33 import io.grpc.ServerMethodDefinition;
34 import io.grpc.Status;
35 import java.io.ByteArrayInputStream;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.util.concurrent.TimeUnit;
39 import java.util.logging.Logger;
40 
41 /**
42  * A grpc-level proxy. GrpcProxy itself can be used unmodified to proxy any service for both unary
43  * and streaming. It doesn't care what type of messages are being used. The Registry class causes it
44  * to be called for any inbound RPC, and uses plain bytes for messages which avoids marshalling
45  * messages and the need for Protobuf schema information.
46  *
47  * <p>Route guide has unary and streaming RPCs which makes it a nice showcase. To test with route
48  * guide, run each in a separate terminal window:
49  * <pre>{@code
50  *   ./build/install/examples/bin/route-guide-server
51  *   ./build/install/examples/bin/grpc-proxy
52  *   ./build/install/examples/bin/route-guide-client localhost:8981
53  * }</pre>
54  *
55  * <p>You can verify the proxy is being used by shutting down the proxy and seeing the client fail.
56  */
57 public final class GrpcProxy<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
58   private static final Logger logger = Logger.getLogger(GrpcProxy.class.getName());
59 
60   private final Channel channel;
61 
GrpcProxy(Channel channel)62   public GrpcProxy(Channel channel) {
63     this.channel = channel;
64   }
65 
66   @Override
startCall( ServerCall<ReqT, RespT> serverCall, Metadata headers)67   public ServerCall.Listener<ReqT> startCall(
68       ServerCall<ReqT, RespT> serverCall, Metadata headers) {
69     ClientCall<ReqT, RespT> clientCall
70         = channel.newCall(serverCall.getMethodDescriptor(), CallOptions.DEFAULT);
71     CallProxy<ReqT, RespT> proxy = new CallProxy<ReqT, RespT>(serverCall, clientCall);
72     clientCall.start(proxy.clientCallListener, headers);
73     serverCall.request(1);
74     clientCall.request(1);
75     return proxy.serverCallListener;
76   }
77 
78   private static class CallProxy<ReqT, RespT> {
79     final RequestProxy serverCallListener;
80     final ResponseProxy clientCallListener;
81 
CallProxy(ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall)82     public CallProxy(ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall) {
83       serverCallListener = new RequestProxy(clientCall);
84       clientCallListener = new ResponseProxy(serverCall);
85     }
86 
87     private class RequestProxy extends ServerCall.Listener<ReqT> {
88       private final ClientCall<ReqT, ?> clientCall;
89       // Hold 'this' lock when accessing
90       private boolean needToRequest;
91 
RequestProxy(ClientCall<ReqT, ?> clientCall)92       public RequestProxy(ClientCall<ReqT, ?> clientCall) {
93         this.clientCall = clientCall;
94       }
95 
onCancel()96       @Override public void onCancel() {
97         clientCall.cancel("Server cancelled", null);
98       }
99 
onHalfClose()100       @Override public void onHalfClose() {
101         clientCall.halfClose();
102       }
103 
onMessage(ReqT message)104       @Override public void onMessage(ReqT message) {
105         clientCall.sendMessage(message);
106         synchronized (this) {
107           if (clientCall.isReady()) {
108             clientCallListener.serverCall.request(1);
109           } else {
110             // The outgoing call is not ready for more requests. Stop requesting additional data and
111             // wait for it to catch up.
112             needToRequest = true;
113           }
114         }
115       }
116 
onReady()117       @Override public void onReady() {
118         clientCallListener.onServerReady();
119       }
120 
121       // Called from ResponseProxy, which is a different thread than the ServerCall.Listener
122       // callbacks.
onClientReady()123       synchronized void onClientReady() {
124         if (needToRequest) {
125           clientCallListener.serverCall.request(1);
126           needToRequest = false;
127         }
128       }
129     }
130 
131     private class ResponseProxy extends ClientCall.Listener<RespT> {
132       private final ServerCall<?, RespT> serverCall;
133       // Hold 'this' lock when accessing
134       private boolean needToRequest;
135 
ResponseProxy(ServerCall<?, RespT> serverCall)136       public ResponseProxy(ServerCall<?, RespT> serverCall) {
137         this.serverCall = serverCall;
138       }
139 
onClose(Status status, Metadata trailers)140       @Override public void onClose(Status status, Metadata trailers) {
141         serverCall.close(status, trailers);
142       }
143 
onHeaders(Metadata headers)144       @Override public void onHeaders(Metadata headers) {
145         serverCall.sendHeaders(headers);
146       }
147 
onMessage(RespT message)148       @Override public void onMessage(RespT message) {
149         serverCall.sendMessage(message);
150         synchronized (this) {
151           if (serverCall.isReady()) {
152             serverCallListener.clientCall.request(1);
153           } else {
154             // The incoming call is not ready for more responses. Stop requesting additional data
155             // and wait for it to catch up.
156             needToRequest = true;
157           }
158         }
159       }
160 
onReady()161       @Override public void onReady() {
162         serverCallListener.onClientReady();
163       }
164 
165       // Called from RequestProxy, which is a different thread than the ClientCall.Listener
166       // callbacks.
onServerReady()167       synchronized void onServerReady() {
168         if (needToRequest) {
169           serverCallListener.clientCall.request(1);
170           needToRequest = false;
171         }
172       }
173     }
174   }
175 
176   private static class ByteMarshaller implements MethodDescriptor.Marshaller<byte[]> {
parse(InputStream stream)177     @Override public byte[] parse(InputStream stream) {
178       try {
179         return ByteStreams.toByteArray(stream);
180       } catch (IOException ex) {
181         throw new RuntimeException();
182       }
183     }
184 
stream(byte[] value)185     @Override public InputStream stream(byte[] value) {
186       return new ByteArrayInputStream(value);
187     }
188   };
189 
190   public static class Registry extends HandlerRegistry {
191     private final MethodDescriptor.Marshaller<byte[]> byteMarshaller = new ByteMarshaller();
192     private final ServerCallHandler<byte[], byte[]> handler;
193 
Registry(ServerCallHandler<byte[], byte[]> handler)194     public Registry(ServerCallHandler<byte[], byte[]> handler) {
195       this.handler = handler;
196     }
197 
198     @Override
lookupMethod(String methodName, String authority)199     public ServerMethodDefinition<?,?> lookupMethod(String methodName, String authority) {
200       MethodDescriptor<byte[], byte[]> methodDescriptor
201           = MethodDescriptor.newBuilder(byteMarshaller, byteMarshaller)
202           .setFullMethodName(methodName)
203           .setType(MethodDescriptor.MethodType.UNKNOWN)
204           .build();
205       return ServerMethodDefinition.create(methodDescriptor, handler);
206     }
207   }
208 
main(String[] args)209   public static void main(String[] args) throws IOException, InterruptedException {
210     String target = "localhost:8980";
211     ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
212         .build();
213     logger.info("Proxy will connect to " + target);
214     GrpcProxy<byte[], byte[]> proxy = new GrpcProxy<byte[], byte[]>(channel);
215     int port = 8981;
216     Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
217         .fallbackHandlerRegistry(new Registry(proxy))
218         .build()
219         .start();
220     logger.info("Proxy started, listening on " + port);
221     Runtime.getRuntime().addShutdownHook(new Thread() {
222       @Override
223       public void run() {
224         server.shutdown();
225         try {
226           server.awaitTermination(10, TimeUnit.SECONDS);
227         } catch (InterruptedException ex) {
228           Thread.currentThread().interrupt();
229         }
230         server.shutdownNow();
231         channel.shutdownNow();
232       }
233     });
234     server.awaitTermination();
235     if (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
236       System.out.println("Channel didn't shut down promptly");
237     }
238   }
239 }
240