/*
 * Copyright 2017 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.services;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Preconditions;
21 import io.grpc.BinaryLog;
22 import io.grpc.CallOptions;
23 import io.grpc.Channel;
24 import io.grpc.ClientCall;
25 import io.grpc.ClientInterceptor;
26 import io.grpc.ClientInterceptors;
27 import io.grpc.Internal;
28 import io.grpc.InternalClientInterceptors;
29 import io.grpc.InternalServerInterceptors;
30 import io.grpc.ManagedChannel;
31 import io.grpc.MethodDescriptor;
32 import io.grpc.MethodDescriptor.Marshaller;
33 import io.grpc.ServerCallHandler;
34 import io.grpc.ServerInterceptor;
35 import io.grpc.ServerMethodDefinition;
36 import java.io.ByteArrayInputStream;
37 import java.io.ByteArrayOutputStream;
38 import java.io.IOException;
39 import java.io.InputStream;
40 import java.io.OutputStream;
41 import javax.annotation.Nullable;
42 
43 // TODO(zpencer): rename class to AbstractBinaryLog
44 @Internal
45 public abstract class BinaryLogProvider extends BinaryLog {
46   @VisibleForTesting
47   public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
48 
49   private final ClientInterceptor binaryLogShim = new BinaryLogShim();
50 
51   /**
52    * Wraps a channel to provide binary logging on {@link ClientCall}s as needed.
53    */
54   @Override
wrapChannel(Channel channel)55   public final Channel wrapChannel(Channel channel) {
56     return ClientInterceptors.intercept(channel, binaryLogShim);
57   }
58 
toByteBufferMethod( MethodDescriptor<?, ?> method)59   private static MethodDescriptor<byte[], byte[]> toByteBufferMethod(
60       MethodDescriptor<?, ?> method) {
61     return method.toBuilder(BYTEARRAY_MARSHALLER, BYTEARRAY_MARSHALLER).build();
62   }
63 
64   /**
65    * Wraps a {@link ServerMethodDefinition} such that it performs binary logging if needed.
66    */
67   @Override
wrapMethodDefinition( ServerMethodDefinition<ReqT, RespT> oMethodDef)68   public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
69       ServerMethodDefinition<ReqT, RespT> oMethodDef) {
70     ServerInterceptor binlogInterceptor =
71         getServerInterceptor(oMethodDef.getMethodDescriptor().getFullMethodName());
72     if (binlogInterceptor == null) {
73       return oMethodDef;
74     }
75     MethodDescriptor<byte[], byte[]> binMethod =
76         BinaryLogProvider.toByteBufferMethod(oMethodDef.getMethodDescriptor());
77     ServerMethodDefinition<byte[], byte[]> binDef =
78         InternalServerInterceptors.wrapMethod(oMethodDef, binMethod);
79     ServerCallHandler<byte[], byte[]> binlogHandler =
80         InternalServerInterceptors.interceptCallHandlerCreate(
81             binlogInterceptor, binDef.getServerCallHandler());
82     return ServerMethodDefinition.create(binMethod, binlogHandler);
83   }
84 
85   /**
86    * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor,
87    * so the interceptor must be reusable across calls. At runtime, the request and response
88    * marshallers are always {@code Marshaller<InputStream>}.
89    * Returns {@code null} if this method is not binary logged.
90    */
91   // TODO(zpencer): ensure the interceptor properly handles retries and hedging
92   @Nullable
getServerInterceptor(String fullMethodName)93   protected abstract ServerInterceptor getServerInterceptor(String fullMethodName);
94 
95   /**
96    * Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor,
97    * so the interceptor must be reusable across calls. At runtime, the request and response
98    * marshallers are always {@code Marshaller<InputStream>}.
99    * Returns {@code null} if this method is not binary logged.
100    */
101   // TODO(zpencer): ensure the interceptor properly handles retries and hedging
102   @Nullable
getClientInterceptor( String fullMethodName, CallOptions callOptions)103   protected abstract ClientInterceptor getClientInterceptor(
104       String fullMethodName, CallOptions callOptions);
105 
106   @Override
close()107   public void close() throws IOException {
108     // default impl: noop
109     // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there
110   }
111 
112   // Creating a named class makes debugging easier
113   private static final class ByteArrayMarshaller implements Marshaller<byte[]> {
114     @Override
stream(byte[] value)115     public InputStream stream(byte[] value) {
116       return new ByteArrayInputStream(value);
117     }
118 
119     @Override
parse(InputStream stream)120     public byte[] parse(InputStream stream) {
121       try {
122         return parseHelper(stream);
123       } catch (IOException e) {
124         throw new RuntimeException(e);
125       }
126     }
127 
parseHelper(InputStream stream)128     private byte[] parseHelper(InputStream stream) throws IOException {
129       try {
130         return IoUtils.toByteArray(stream);
131       } finally {
132         stream.close();
133       }
134     }
135   }
136 
137   /**
138    * The pipeline of interceptors is hard coded when the {@link ManagedChannel} is created.
139    * This shim interceptor should always be installed as a placeholder. When a call starts,
140    * this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen
141    * for this particular {@link ClientCall}'s method.
142    */
143   private final class BinaryLogShim implements ClientInterceptor {
144     @Override
interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)145     public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
146         MethodDescriptor<ReqT, RespT> method,
147         CallOptions callOptions,
148         Channel next) {
149       ClientInterceptor binlogInterceptor = getClientInterceptor(
150           method.getFullMethodName(), callOptions);
151       if (binlogInterceptor == null) {
152         return next.newCall(method, callOptions);
153       } else {
154         return InternalClientInterceptors
155             .wrapClientInterceptor(
156                 binlogInterceptor,
157                 BYTEARRAY_MARSHALLER,
158                 BYTEARRAY_MARSHALLER)
159             .interceptCall(method, callOptions, next);
160       }
161     }
162   }
163 
164   // Copied from internal
165   private static final class IoUtils {
166     /** maximum buffer to be read is 16 KB. */
167     private static final int MAX_BUFFER_LENGTH = 16384;
168 
169     /** Returns the byte array. */
toByteArray(InputStream in)170     public static byte[] toByteArray(InputStream in) throws IOException {
171       ByteArrayOutputStream out = new ByteArrayOutputStream();
172       copy(in, out);
173       return out.toByteArray();
174     }
175 
176     /** Copies the data from input stream to output stream. */
copy(InputStream from, OutputStream to)177     public static long copy(InputStream from, OutputStream to) throws IOException {
178       // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
179       Preconditions.checkNotNull(from);
180       Preconditions.checkNotNull(to);
181       byte[] buf = new byte[MAX_BUFFER_LENGTH];
182       long total = 0;
183       while (true) {
184         int r = from.read(buf);
185         if (r == -1) {
186           break;
187         }
188         to.write(buf, 0, r);
189         total += r;
190       }
191       return total;
192     }
193   }
194 }
195