/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc;

import com.google.common.base.Preconditions;
import io.grpc.MethodDescriptor.Marshaller;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Static helpers for attaching {@link ClientInterceptor}s to a {@link Channel}.
 */
public class ClientInterceptors {

  // Static-only utility holder; never instantiated.
  private ClientInterceptors() {}

  /**
   * Builds a {@link Channel} that routes every new call through {@code interceptors} before it
   * reaches {@code channel}. Interceptors run in the order given: the first element has its
   * {@link ClientInterceptor#interceptCall} invoked first.
   *
   * @param channel the underlying channel to intercept.
   * @param interceptors array of interceptors to bind to {@code channel}.
   * @return a new channel instance with the interceptors applied.
   */
  public static Channel interceptForward(Channel channel, ClientInterceptor... interceptors) {
    return interceptForward(channel, Arrays.asList(interceptors));
  }

  /**
   * Builds a {@link Channel} that routes every new call through {@code interceptors} before it
   * reaches {@code channel}. Interceptors run in the order given: the first element has its
   * {@link ClientInterceptor#interceptCall} invoked first.
   *
   * @param channel the underlying channel to intercept.
   * @param interceptors a list of interceptors to bind to {@code channel}.
   * @return a new channel instance with the interceptors applied.
   */
  public static Channel interceptForward(
      Channel channel, List<? extends ClientInterceptor> interceptors) {
    // intercept() runs the last element first, so hand it a reversed private copy; the caller's
    // list is left untouched.
    List<? extends ClientInterceptor> reversed = new ArrayList<>(interceptors);
    Collections.reverse(reversed);
    return intercept(channel, reversed);
  }

  /**
   * Builds a {@link Channel} that routes every new call through {@code interceptors} before it
   * reaches {@code channel}. Interceptors run in reverse order: the last element has its
   * {@link ClientInterceptor#interceptCall} invoked first.
   *
   * @param channel the underlying channel to intercept.
   * @param interceptors array of interceptors to bind to {@code channel}.
   * @return a new channel instance with the interceptors applied.
   */
  public static Channel intercept(Channel channel, ClientInterceptor... interceptors) {
    return intercept(channel, Arrays.asList(interceptors));
  }

  /**
   * Builds a {@link Channel} that routes every new call through {@code interceptors} before it
   * reaches {@code channel}. Interceptors run in reverse order: the last element has its
   * {@link ClientInterceptor#interceptCall} invoked first.
   *
   * @param channel the underlying channel to intercept.
   * @param interceptors a list of interceptors to bind to {@code channel}.
   * @return a new channel instance with the interceptors applied.
   */
  public static Channel intercept(Channel channel, List<? extends ClientInterceptor> interceptors) {
    Preconditions.checkNotNull(channel, "channel");
    // Each interceptor wraps everything bound before it, so the last one ends up outermost.
    Channel outermost = channel;
    for (ClientInterceptor each : interceptors) {
      outermost = new InterceptorChannel(outermost, each);
    }
    return outermost;
  }

  /**
   * Adapts {@code interceptor} so that it observes requests as {@code WReqT} and responses as
   * {@code WRespT}. Messages are converted between the caller's types and the wrapped types by
   * serializing with one marshaller and parsing with the other.
   */
  static <WReqT, WRespT> ClientInterceptor wrapClientInterceptor(
      final ClientInterceptor interceptor,
      final Marshaller<WReqT> reqMarshaller,
      final Marshaller<WRespT> respMarshaller) {
    return new ClientInterceptor() {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
          final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        // Same method, but described with the wrapped message types.
        final MethodDescriptor<WReqT, WRespT> adaptedMethod =
            method.toBuilder(reqMarshaller, respMarshaller).build();
        final ClientCall<WReqT, WRespT> innerCall =
            interceptor.interceptCall(adaptedMethod, callOptions, next);

        return new PartialForwardingClientCall<ReqT, RespT>() {
          @Override
          protected ClientCall<?, ?> delegate() {
            return innerCall;
          }

          @Override
          public void sendMessage(ReqT message) {
            // ReqT -> bytes -> WReqT
            InputStream serialized = method.getRequestMarshaller().stream(message);
            innerCall.sendMessage(reqMarshaller.parse(serialized));
          }

          @Override
          public void start(final Listener<RespT> responseListener, Metadata headers) {
            ClientCall.Listener<WRespT> adaptingListener =
                new PartialForwardingClientCallListener<WRespT>() {
                  @Override
                  protected Listener<?> delegate() {
                    return responseListener;
                  }

                  @Override
                  public void onMessage(WRespT wMessage) {
                    // WRespT -> bytes -> RespT
                    InputStream serialized = respMarshaller.stream(wMessage);
                    responseListener.onMessage(
                        method.getResponseMarshaller().parse(serialized));
                  }
                };
            innerCall.start(adaptingListener, headers);
          }
        };
      }
    };
  }

  /** A {@link Channel} that hands every new call to one interceptor, passing the wrapped channel. */
  private static final class InterceptorChannel extends Channel {
    private final Channel channel;
    private final ClientInterceptor interceptor;

    private InterceptorChannel(Channel channel, ClientInterceptor interceptor) {
      this.channel = channel;
      this.interceptor = Preconditions.checkNotNull(interceptor, "interceptor");
    }

    @Override
    public String authority() {
      return channel.authority();
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
      return interceptor.interceptCall(method, callOptions, channel);
    }
  }

  /** Do-nothing call that {@link CheckedForwardingClientCall} switches to when start fails. */
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
    @Override
    public void start(Listener<Object> responseListener, Metadata headers) {}

    @Override
    public void sendMessage(Object message) {}

    @Override
    public void request(int numMessages) {}

    @Override
    public void halfClose() {}

    @Override
    public void cancel(String message, Throwable cause) {}

    /**
     * Always returns {@code false}, since this is only used when the startup of the {@link
     * ClientCall} fails (i.e. the {@link ClientCall} is closed).
     */
    @Override
    public boolean isReady() {
      return false;
    }
  };

  /**
   * A {@link io.grpc.ForwardingClientCall} that reports exceptions thrown by its start logic to
   * the call listener instead of propagating them.
   *
   * <p>{@link ClientCall#start(ClientCall.Listener, Metadata)} should not throw any exception
   * other than those caused by misuse, e.g., {@link IllegalStateException}. Subclasses put their
   * start logic in {@code checkedStart()}, where throwing exceptions is allowed.
   */
  public abstract static class CheckedForwardingClientCall<ReqT, RespT>
      extends io.grpc.ForwardingClientCall<ReqT, RespT> {

    // Not final: replaced with NOOP_CALL when checkedStart() throws.
    private ClientCall<ReqT, RespT> delegate;

    protected CheckedForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
      this.delegate = delegate;
    }

    /**
     * Hook for the start logic that would normally live in {@code start()}.
     *
     * <p>On the normal path the implementation should call {@code this.delegate().start()}.
     * Exceptions may safely be thrown before that call; {@code CheckedForwardingClientCall}
     * handles them and delivers them to {@code responseListener}. Exceptions <em>must not</em> be
     * thrown after calling {@code this.delegate().start()}, as this can result in {@link
     * ClientCall.Listener#onClose} being called multiple times.
     */
    protected abstract void checkedStart(Listener<RespT> responseListener, Metadata headers)
        throws Exception;

    @Override
    protected final ClientCall<ReqT, RespT> delegate() {
      return delegate;
    }

    @Override
    public final void start(Listener<RespT> responseListener, Metadata headers) {
      try {
        checkedStart(responseListener, headers);
      } catch (Exception startFailure) {
        // start() itself does not throw, so the caller may keep invoking other methods on this
        // call. Forwarding those to the original delegate would raise IllegalStateException
        // because delegate().start() was never called, so swap in the no-op call instead. The
        // failure reaches the user through the listener.
        @SuppressWarnings("unchecked")
        ClientCall<ReqT, RespT> noop = (ClientCall<ReqT, RespT>) NOOP_CALL;
        delegate = noop;
        responseListener.onClose(Status.fromThrowable(startFailure), new Metadata());
      }
    }
  }
}