• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2016 The Dagger Authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package dagger.grpc.server;
18 
19 import io.grpc.Metadata;
20 import io.grpc.MethodDescriptor;
21 import io.grpc.MethodDescriptor.Marshaller;
22 import io.grpc.ServerCall;
23 import io.grpc.ServerCall.Listener;
24 import io.grpc.ServerCallHandler;
25 import io.grpc.ServerMethodDefinition;
26 import io.grpc.ServerServiceDefinition;
27 import io.grpc.Status;
28 import java.io.InputStream;
29 
30 /**
31  * A {@link ServerCallHandler} that handles calls for a particular method by delegating to a handler
32  * in a {@link ServerServiceDefinition} returned by a factory.
33  *
34  * @param <RequestT> the type of the request payloads
35  * @param <ResponseT> the type of the response payloads
36  */
37 public final class ProxyServerCallHandler<RequestT, ResponseT>
38     implements ServerCallHandler<InputStream, InputStream> {
39 
40   /**
41    * A factory for the {@link ServerServiceDefinition} that a {@link ProxyServerCallHandler}
42    * delegates to.
43    */
44   public interface ServiceDefinitionFactory {
45     /**
46      * Returns a service definition that contains a {@link ServerCallHandler} for the
47      * {@link ProxyServerCallHandler}'s method.
48      */
getServiceDefinition(Metadata headers)49     ServerServiceDefinition getServiceDefinition(Metadata headers);
50   }
51 
52   private final MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor;
53   private final ServiceDefinitionFactory delegateServiceDefinitionFactory;
54 
55   /**
56    * Returns a proxy method definition for {@code methodDescriptor}.
57    *
58    * @param delegateServiceDefinitionFactory factory for the delegate service definition
59    */
proxyMethod( MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, ServiceDefinitionFactory delegateServiceDefinitionFactory)60   public static <RequestT, ResponseT> ServerMethodDefinition<InputStream, InputStream> proxyMethod(
61       MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor,
62       ServiceDefinitionFactory delegateServiceDefinitionFactory) {
63     return ServerMethodDefinition.create(
64         MethodDescriptor.create(
65             delegateMethodDescriptor.getType(),
66             delegateMethodDescriptor.getFullMethodName(),
67             IDENTITY_MARSHALLER,
68             IDENTITY_MARSHALLER),
69         new ProxyServerCallHandler<>(delegateMethodDescriptor, delegateServiceDefinitionFactory));
70   }
71 
ProxyServerCallHandler( MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, ServiceDefinitionFactory delegateServiceDefinitionFactory)72   ProxyServerCallHandler(
73       MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor,
74       ServiceDefinitionFactory delegateServiceDefinitionFactory) {
75     this.delegateMethodDescriptor = delegateMethodDescriptor;
76     this.delegateServiceDefinitionFactory = delegateServiceDefinitionFactory;
77   }
78 
79   @Override
startCall( ServerCall<InputStream, InputStream> call, Metadata headers)80   public Listener<InputStream> startCall(
81       ServerCall<InputStream, InputStream> call,
82       Metadata headers) {
83     ServerMethodDefinition<RequestT, ResponseT> delegateMethod = getMethodDefinition(headers);
84     Listener<RequestT> delegateListener =
85         delegateMethod
86             .getServerCallHandler()
87             .startCall(new ServerCallAdapter(call, delegateMethod.getMethodDescriptor()), headers);
88     return new ServerCallListenerAdapter(delegateListener);
89   }
90 
91   @SuppressWarnings("unchecked") // Method definition is the correct type.
getMethodDefinition(Metadata headers)92   private ServerMethodDefinition<RequestT, ResponseT> getMethodDefinition(Metadata headers) {
93     String fullMethodName = delegateMethodDescriptor.getFullMethodName();
94     for (ServerMethodDefinition<?, ?> methodDefinition :
95         delegateServiceDefinitionFactory.getServiceDefinition(headers).getMethods()) {
96       if (methodDefinition.getMethodDescriptor().getFullMethodName().equals(fullMethodName)) {
97         return (ServerMethodDefinition<RequestT, ResponseT>) methodDefinition;
98       }
99     }
100     throw new IllegalStateException("Could not find " + fullMethodName);
101   }
102 
103   private static final Marshaller<InputStream> IDENTITY_MARSHALLER =
104       new Marshaller<InputStream>() {
105         @Override
106         public InputStream stream(InputStream value) {
107           return value;
108         }
109 
110         @Override
111         public InputStream parse(InputStream stream) {
112           return stream;
113         }
114       };
115 
116   /** A {@link Listener} that adapts {@code Listener<RequestT>} to {@code Listener<InputStream>}. */
117   private final class ServerCallListenerAdapter extends Listener<InputStream> {
118 
119     private final Listener<RequestT> delegate;
120 
ServerCallListenerAdapter(Listener<RequestT> delegate)121     public ServerCallListenerAdapter(Listener<RequestT> delegate) {
122       this.delegate = delegate;
123     }
124 
125     @Override
onMessage(InputStream message)126     public void onMessage(InputStream message) {
127       delegate.onMessage(delegateMethodDescriptor.parseRequest(message));
128     }
129 
130     @Override
onHalfClose()131     public void onHalfClose() {
132       delegate.onHalfClose();
133     }
134 
135     @Override
onCancel()136     public void onCancel() {
137       delegate.onCancel();
138     }
139 
140     @Override
onComplete()141     public void onComplete() {
142       delegate.onComplete();
143     }
144   }
145 
146   /**
147    * A {@link ServerCall} that adapts {@code ServerCall<InputStream>} to {@code
148    * ServerCall<ResponseT>}.
149    */
150   final class ServerCallAdapter extends ServerCall<RequestT, ResponseT> {
151 
152     private final ServerCall<InputStream, InputStream> delegate;
153     private final MethodDescriptor<RequestT, ResponseT> method;
154 
ServerCallAdapter(ServerCall<InputStream, InputStream> delegate, MethodDescriptor<RequestT, ResponseT> method)155     ServerCallAdapter(ServerCall<InputStream, InputStream> delegate,
156         MethodDescriptor<RequestT, ResponseT> method) {
157       this.delegate = delegate;
158       this.method = method;
159     }
160 
161     @Override
getMethodDescriptor()162     public MethodDescriptor<RequestT, ResponseT> getMethodDescriptor() {
163       return method;
164     }
165 
166     @Override
request(int numMessages)167     public void request(int numMessages) {
168       delegate.request(numMessages);
169     }
170 
171     @Override
sendHeaders(Metadata headers)172     public void sendHeaders(Metadata headers) {
173       delegate.sendHeaders(headers);
174     }
175 
176     @Override
sendMessage(ResponseT message)177     public void sendMessage(ResponseT message) {
178       delegate.sendMessage(delegateMethodDescriptor.streamResponse(message));
179     }
180 
181     @Override
close(Status status, Metadata trailers)182     public void close(Status status, Metadata trailers) {
183       delegate.close(status, trailers);
184     }
185 
186     @Override
isCancelled()187     public boolean isCancelled() {
188       return delegate.isCancelled();
189     }
190   }
191 }
192