1 /* 2 * Copyright (C) 2016 The Dagger Authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package dagger.grpc.server; 18 19 import io.grpc.Metadata; 20 import io.grpc.MethodDescriptor; 21 import io.grpc.MethodDescriptor.Marshaller; 22 import io.grpc.ServerCall; 23 import io.grpc.ServerCall.Listener; 24 import io.grpc.ServerCallHandler; 25 import io.grpc.ServerMethodDefinition; 26 import io.grpc.ServerServiceDefinition; 27 import io.grpc.Status; 28 import java.io.InputStream; 29 30 /** 31 * A {@link ServerCallHandler} that handles calls for a particular method by delegating to a handler 32 * in a {@link ServerServiceDefinition} returned by a factory. 33 * 34 * @param <RequestT> the type of the request payloads 35 * @param <ResponseT> the type of the response payloads 36 */ 37 public final class ProxyServerCallHandler<RequestT, ResponseT> 38 implements ServerCallHandler<InputStream, InputStream> { 39 40 /** 41 * A factory for the {@link ServerServiceDefinition} that a {@link ProxyServerCallHandler} 42 * delegates to. 43 */ 44 public interface ServiceDefinitionFactory { 45 /** 46 * Returns a service definition that contains a {@link ServerCallHandler} for the 47 * {@link ProxyServerCallHandler}'s method. 48 */ getServiceDefinition(Metadata headers)49 ServerServiceDefinition getServiceDefinition(Metadata headers); 50 } 51 52 private final MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor; 53 private final ServiceDefinitionFactory delegateServiceDefinitionFactory; 54 55 /** 56 * Returns a proxy method definition for {@code methodDescriptor}. 57 * 58 * @param delegateServiceDefinitionFactory factory for the delegate service definition 59 */ proxyMethod( MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, ServiceDefinitionFactory delegateServiceDefinitionFactory)60 public static <RequestT, ResponseT> ServerMethodDefinition<InputStream, InputStream> proxyMethod( 61 MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, 62 ServiceDefinitionFactory delegateServiceDefinitionFactory) { 63 return ServerMethodDefinition.create( 64 MethodDescriptor.create( 65 delegateMethodDescriptor.getType(), 66 delegateMethodDescriptor.getFullMethodName(), 67 IDENTITY_MARSHALLER, 68 IDENTITY_MARSHALLER), 69 new ProxyServerCallHandler<>(delegateMethodDescriptor, delegateServiceDefinitionFactory)); 70 } 71 ProxyServerCallHandler( MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, ServiceDefinitionFactory delegateServiceDefinitionFactory)72 ProxyServerCallHandler( 73 MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, 74 ServiceDefinitionFactory delegateServiceDefinitionFactory) { 75 this.delegateMethodDescriptor = delegateMethodDescriptor; 76 this.delegateServiceDefinitionFactory = delegateServiceDefinitionFactory; 77 } 78 79 @Override startCall( ServerCall<InputStream, InputStream> call, Metadata headers)80 public Listener<InputStream> startCall( 81 ServerCall<InputStream, InputStream> call, 82 Metadata headers) { 83 ServerMethodDefinition<RequestT, ResponseT> delegateMethod = getMethodDefinition(headers); 84 Listener<RequestT> delegateListener = 85 delegateMethod 86 .getServerCallHandler() 87 .startCall(new ServerCallAdapter(call, delegateMethod.getMethodDescriptor()), headers); 88 return new ServerCallListenerAdapter(delegateListener); 89 } 90 91 @SuppressWarnings("unchecked") // Method definition is the correct type. getMethodDefinition(Metadata headers)92 private ServerMethodDefinition<RequestT, ResponseT> getMethodDefinition(Metadata headers) { 93 String fullMethodName = delegateMethodDescriptor.getFullMethodName(); 94 for (ServerMethodDefinition<?, ?> methodDefinition : 95 delegateServiceDefinitionFactory.getServiceDefinition(headers).getMethods()) { 96 if (methodDefinition.getMethodDescriptor().getFullMethodName().equals(fullMethodName)) { 97 return (ServerMethodDefinition<RequestT, ResponseT>) methodDefinition; 98 } 99 } 100 throw new IllegalStateException("Could not find " + fullMethodName); 101 } 102 103 private static final Marshaller<InputStream> IDENTITY_MARSHALLER = 104 new Marshaller<InputStream>() { 105 @Override 106 public InputStream stream(InputStream value) { 107 return value; 108 } 109 110 @Override 111 public InputStream parse(InputStream stream) { 112 return stream; 113 } 114 }; 115 116 /** A {@link Listener} that adapts {@code Listener<RequestT>} to {@code Listener<InputStream>}. */ 117 private final class ServerCallListenerAdapter extends Listener<InputStream> { 118 119 private final Listener<RequestT> delegate; 120 ServerCallListenerAdapter(Listener<RequestT> delegate)121 public ServerCallListenerAdapter(Listener<RequestT> delegate) { 122 this.delegate = delegate; 123 } 124 125 @Override onMessage(InputStream message)126 public void onMessage(InputStream message) { 127 delegate.onMessage(delegateMethodDescriptor.parseRequest(message)); 128 } 129 130 @Override onHalfClose()131 public void onHalfClose() { 132 delegate.onHalfClose(); 133 } 134 135 @Override onCancel()136 public void onCancel() { 137 delegate.onCancel(); 138 } 139 140 @Override onComplete()141 public void onComplete() { 142 delegate.onComplete(); 143 } 144 } 145 146 /** 147 * A {@link ServerCall} that adapts {@code ServerCall<InputStream>} to {@code 148 * ServerCall<ResponseT>}. 149 */ 150 final class ServerCallAdapter extends ServerCall<RequestT, ResponseT> { 151 152 private final ServerCall<InputStream, InputStream> delegate; 153 private final MethodDescriptor<RequestT, ResponseT> method; 154 ServerCallAdapter(ServerCall<InputStream, InputStream> delegate, MethodDescriptor<RequestT, ResponseT> method)155 ServerCallAdapter(ServerCall<InputStream, InputStream> delegate, 156 MethodDescriptor<RequestT, ResponseT> method) { 157 this.delegate = delegate; 158 this.method = method; 159 } 160 161 @Override getMethodDescriptor()162 public MethodDescriptor<RequestT, ResponseT> getMethodDescriptor() { 163 return method; 164 } 165 166 @Override request(int numMessages)167 public void request(int numMessages) { 168 delegate.request(numMessages); 169 } 170 171 @Override sendHeaders(Metadata headers)172 public void sendHeaders(Metadata headers) { 173 delegate.sendHeaders(headers); 174 } 175 176 @Override sendMessage(ResponseT message)177 public void sendMessage(ResponseT message) { 178 delegate.sendMessage(delegateMethodDescriptor.streamResponse(message)); 179 } 180 181 @Override close(Status status, Metadata trailers)182 public void close(Status status, Metadata trailers) { 183 delegate.close(status, trailers); 184 } 185 186 @Override isCancelled()187 public boolean isCancelled() { 188 return delegate.isCancelled(); 189 } 190 } 191 } 192