• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc;
18 
19 import com.google.common.base.Preconditions;
20 import io.grpc.MethodDescriptor.Marshaller;
21 import java.io.InputStream;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.List;
26 
27 /**
28  * Utility methods for working with {@link ClientInterceptor}s.
29  */
30 public class ClientInterceptors {
31 
32   // Prevent instantiation
ClientInterceptors()33   private ClientInterceptors() {}
34 
35   /**
36    * Create a new {@link Channel} that will call {@code interceptors} before starting a call on the
37    * given channel. The first interceptor will have its {@link ClientInterceptor#interceptCall}
38    * called first.
39    *
40    * @param channel the underlying channel to intercept.
41    * @param interceptors array of interceptors to bind to {@code channel}.
42    * @return a new channel instance with the interceptors applied.
43    */
interceptForward(Channel channel, ClientInterceptor... interceptors)44   public static Channel interceptForward(Channel channel, ClientInterceptor... interceptors) {
45     return interceptForward(channel, Arrays.asList(interceptors));
46   }
47 
48   /**
49    * Create a new {@link Channel} that will call {@code interceptors} before starting a call on the
50    * given channel. The first interceptor will have its {@link ClientInterceptor#interceptCall}
51    * called first.
52    *
53    * @param channel the underlying channel to intercept.
54    * @param interceptors a list of interceptors to bind to {@code channel}.
55    * @return a new channel instance with the interceptors applied.
56    */
interceptForward(Channel channel, List<? extends ClientInterceptor> interceptors)57   public static Channel interceptForward(Channel channel,
58                                          List<? extends ClientInterceptor> interceptors) {
59     List<? extends ClientInterceptor> copy = new ArrayList<>(interceptors);
60     Collections.reverse(copy);
61     return intercept(channel, copy);
62   }
63 
64   /**
65    * Create a new {@link Channel} that will call {@code interceptors} before starting a call on the
66    * given channel. The last interceptor will have its {@link ClientInterceptor#interceptCall}
67    * called first.
68    *
69    * @param channel the underlying channel to intercept.
70    * @param interceptors array of interceptors to bind to {@code channel}.
71    * @return a new channel instance with the interceptors applied.
72    */
intercept(Channel channel, ClientInterceptor... interceptors)73   public static Channel intercept(Channel channel, ClientInterceptor... interceptors) {
74     return intercept(channel, Arrays.asList(interceptors));
75   }
76 
77   /**
78    * Create a new {@link Channel} that will call {@code interceptors} before starting a call on the
79    * given channel. The last interceptor will have its {@link ClientInterceptor#interceptCall}
80    * called first.
81    *
82    * @param channel the underlying channel to intercept.
83    * @param interceptors a list of interceptors to bind to {@code channel}.
84    * @return a new channel instance with the interceptors applied.
85    */
intercept(Channel channel, List<? extends ClientInterceptor> interceptors)86   public static Channel intercept(Channel channel, List<? extends ClientInterceptor> interceptors) {
87     Preconditions.checkNotNull(channel, "channel");
88     for (ClientInterceptor interceptor : interceptors) {
89       channel = new InterceptorChannel(channel, interceptor);
90     }
91     return channel;
92   }
93 
94   /**
95    * Creates a new ClientInterceptor that transforms requests into {@code WReqT} and responses into
96    * {@code WRespT} before passing them into the {@code interceptor}.
97    */
wrapClientInterceptor( final ClientInterceptor interceptor, final Marshaller<WReqT> reqMarshaller, final Marshaller<WRespT> respMarshaller)98   static <WReqT, WRespT> ClientInterceptor wrapClientInterceptor(
99       final ClientInterceptor interceptor,
100       final Marshaller<WReqT> reqMarshaller,
101       final Marshaller<WRespT> respMarshaller) {
102     return new ClientInterceptor() {
103       @Override
104       public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
105           final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
106         final MethodDescriptor<WReqT, WRespT> wrappedMethod =
107             method.toBuilder(reqMarshaller, respMarshaller).build();
108         final ClientCall<WReqT, WRespT> wrappedCall =
109             interceptor.interceptCall(wrappedMethod, callOptions, next);
110         return new PartialForwardingClientCall<ReqT, RespT>() {
111           @Override
112           public void start(final Listener<RespT> responseListener, Metadata headers) {
113             wrappedCall.start(new PartialForwardingClientCallListener<WRespT>() {
114               @Override
115               public void onMessage(WRespT wMessage) {
116                 InputStream bytes = respMarshaller.stream(wMessage);
117                 RespT message = method.getResponseMarshaller().parse(bytes);
118                 responseListener.onMessage(message);
119               }
120 
121               @Override
122               protected Listener<?> delegate() {
123                 return responseListener;
124               }
125             }, headers);
126           }
127 
128           @Override
129           public void sendMessage(ReqT message) {
130             InputStream bytes = method.getRequestMarshaller().stream(message);
131             WReqT wReq = reqMarshaller.parse(bytes);
132             wrappedCall.sendMessage(wReq);
133           }
134 
135           @Override
136           protected ClientCall<?, ?> delegate() {
137             return wrappedCall;
138           }
139         };
140       }
141     };
142   }
143 
144   private static class InterceptorChannel extends Channel {
145     private final Channel channel;
146     private final ClientInterceptor interceptor;
147 
148     private InterceptorChannel(Channel channel, ClientInterceptor interceptor) {
149       this.channel = channel;
150       this.interceptor = Preconditions.checkNotNull(interceptor, "interceptor");
151     }
152 
153     @Override
154     public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
155         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
156       return interceptor.interceptCall(method, callOptions, channel);
157     }
158 
159     @Override
160     public String authority() {
161       return channel.authority();
162     }
163   }
164 
165   private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
166     @Override
167     public void start(Listener<Object> responseListener, Metadata headers) {}
168 
169     @Override
170     public void request(int numMessages) {}
171 
172     @Override
173     public void cancel(String message, Throwable cause) {}
174 
175     @Override
176     public void halfClose() {}
177 
178     @Override
179     public void sendMessage(Object message) {}
180 
181     /**
182      * Always returns {@code false}, since this is only used when the startup of the {@link
183      * ClientCall} fails (i.e. the {@link ClientCall} is closed).
184      */
185     @Override
186     public boolean isReady() {
187       return false;
188     }
189   };
190 
191   /**
192    * A {@link io.grpc.ForwardingClientCall} that delivers exceptions from its start logic to the
193    * call listener.
194    *
195    * <p>{@link ClientCall#start(ClientCall.Listener, Metadata)} should not throw any
196    * exception other than those caused by misuse, e.g., {@link IllegalStateException}.  {@code
197    * CheckedForwardingClientCall} provides {@code checkedStart()} in which throwing exceptions is
198    * allowed.
199    */
200   public abstract static class CheckedForwardingClientCall<ReqT, RespT>
201       extends io.grpc.ForwardingClientCall<ReqT, RespT> {
202 
203     private ClientCall<ReqT, RespT> delegate;
204 
205     /**
206      * Subclasses implement the start logic here that would normally belong to {@code start()}.
207      *
208      * <p>Implementation should call {@code this.delegate().start()} in the normal path. Exceptions
209      * may safely be thrown prior to calling {@code this.delegate().start()}. Such exceptions will
210      * be handled by {@code CheckedForwardingClientCall} and be delivered to {@code
211      * responseListener}.  Exceptions <em>must not</em> be thrown after calling {@code
212      * this.delegate().start()}, as this can result in {@link ClientCall.Listener#onClose} being
213      * called multiple times.
214      */
215     protected abstract void checkedStart(Listener<RespT> responseListener, Metadata headers)
216         throws Exception;
217 
218     protected CheckedForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
219       this.delegate = delegate;
220     }
221 
222     @Override
223     protected final ClientCall<ReqT, RespT> delegate() {
224       return delegate;
225     }
226 
227     @Override
228     @SuppressWarnings("unchecked")
229     public final void start(Listener<RespT> responseListener, Metadata headers) {
230       try {
231         checkedStart(responseListener, headers);
232       } catch (Exception e) {
233         // Because start() doesn't throw, the caller may still try to call other methods on this
234         // call object. Passing these invocations to the original delegate will cause
235         // IllegalStateException because delegate().start() was not called. We switch the delegate
236         // to a NO-OP one to prevent the IllegalStateException. The user will finally get notified
237         // about the error through the listener.
238         delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
239         responseListener.onClose(Status.fromThrowable(e), new Metadata());
240       }
241     }
242   }
243 }
244