1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.services; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.Preconditions; 21 import io.grpc.BinaryLog; 22 import io.grpc.CallOptions; 23 import io.grpc.Channel; 24 import io.grpc.ClientCall; 25 import io.grpc.ClientInterceptor; 26 import io.grpc.ClientInterceptors; 27 import io.grpc.Internal; 28 import io.grpc.InternalClientInterceptors; 29 import io.grpc.InternalServerInterceptors; 30 import io.grpc.ManagedChannel; 31 import io.grpc.MethodDescriptor; 32 import io.grpc.MethodDescriptor.Marshaller; 33 import io.grpc.ServerCallHandler; 34 import io.grpc.ServerInterceptor; 35 import io.grpc.ServerMethodDefinition; 36 import java.io.ByteArrayInputStream; 37 import java.io.ByteArrayOutputStream; 38 import java.io.IOException; 39 import java.io.InputStream; 40 import java.io.OutputStream; 41 import javax.annotation.Nullable; 42 43 // TODO(zpencer): rename class to AbstractBinaryLog 44 @Internal 45 public abstract class BinaryLogProvider extends BinaryLog { 46 @VisibleForTesting 47 public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller(); 48 49 private final ClientInterceptor binaryLogShim = new BinaryLogShim(); 50 51 /** 52 * Wraps a channel to provide binary logging on {@link ClientCall}s as needed. 53 */ 54 @Override wrapChannel(Channel channel)55 public final Channel wrapChannel(Channel channel) { 56 return ClientInterceptors.intercept(channel, binaryLogShim); 57 } 58 toByteBufferMethod( MethodDescriptor<?, ?> method)59 private static MethodDescriptor<byte[], byte[]> toByteBufferMethod( 60 MethodDescriptor<?, ?> method) { 61 return method.toBuilder(BYTEARRAY_MARSHALLER, BYTEARRAY_MARSHALLER).build(); 62 } 63 64 /** 65 * Wraps a {@link ServerMethodDefinition} such that it performs binary logging if needed. 66 */ 67 @Override wrapMethodDefinition( ServerMethodDefinition<ReqT, RespT> oMethodDef)68 public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition( 69 ServerMethodDefinition<ReqT, RespT> oMethodDef) { 70 ServerInterceptor binlogInterceptor = 71 getServerInterceptor(oMethodDef.getMethodDescriptor().getFullMethodName()); 72 if (binlogInterceptor == null) { 73 return oMethodDef; 74 } 75 MethodDescriptor<byte[], byte[]> binMethod = 76 BinaryLogProvider.toByteBufferMethod(oMethodDef.getMethodDescriptor()); 77 ServerMethodDefinition<byte[], byte[]> binDef = 78 InternalServerInterceptors.wrapMethod(oMethodDef, binMethod); 79 ServerCallHandler<byte[], byte[]> binlogHandler = 80 InternalServerInterceptors.interceptCallHandlerCreate( 81 binlogInterceptor, binDef.getServerCallHandler()); 82 return ServerMethodDefinition.create(binMethod, binlogHandler); 83 } 84 85 /** 86 * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor, 87 * so the interceptor must be reusable across calls. At runtime, the request and response 88 * marshallers are always {@code Marshaller<InputStream>}. 89 * Returns {@code null} if this method is not binary logged. 90 */ 91 // TODO(zpencer): ensure the interceptor properly handles retries and hedging 92 @Nullable getServerInterceptor(String fullMethodName)93 protected abstract ServerInterceptor getServerInterceptor(String fullMethodName); 94 95 /** 96 * Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor, 97 * so the interceptor must be reusable across calls. At runtime, the request and response 98 * marshallers are always {@code Marshaller<InputStream>}. 99 * Returns {@code null} if this method is not binary logged. 100 */ 101 // TODO(zpencer): ensure the interceptor properly handles retries and hedging 102 @Nullable getClientInterceptor( String fullMethodName, CallOptions callOptions)103 protected abstract ClientInterceptor getClientInterceptor( 104 String fullMethodName, CallOptions callOptions); 105 106 @Override close()107 public void close() throws IOException { 108 // default impl: noop 109 // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there 110 } 111 112 // Creating a named class makes debugging easier 113 private static final class ByteArrayMarshaller implements Marshaller<byte[]> { 114 @Override stream(byte[] value)115 public InputStream stream(byte[] value) { 116 return new ByteArrayInputStream(value); 117 } 118 119 @Override parse(InputStream stream)120 public byte[] parse(InputStream stream) { 121 try { 122 return parseHelper(stream); 123 } catch (IOException e) { 124 throw new RuntimeException(e); 125 } 126 } 127 parseHelper(InputStream stream)128 private byte[] parseHelper(InputStream stream) throws IOException { 129 try { 130 return IoUtils.toByteArray(stream); 131 } finally { 132 stream.close(); 133 } 134 } 135 } 136 137 /** 138 * The pipeline of interceptors is hard coded when the {@link ManagedChannel} is created. 139 * This shim interceptor should always be installed as a placeholder. When a call starts, 140 * this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen 141 * for this particular {@link ClientCall}'s method. 142 */ 143 private final class BinaryLogShim implements ClientInterceptor { 144 @Override interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)145 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 146 MethodDescriptor<ReqT, RespT> method, 147 CallOptions callOptions, 148 Channel next) { 149 ClientInterceptor binlogInterceptor = getClientInterceptor( 150 method.getFullMethodName(), callOptions); 151 if (binlogInterceptor == null) { 152 return next.newCall(method, callOptions); 153 } else { 154 return InternalClientInterceptors 155 .wrapClientInterceptor( 156 binlogInterceptor, 157 BYTEARRAY_MARSHALLER, 158 BYTEARRAY_MARSHALLER) 159 .interceptCall(method, callOptions, next); 160 } 161 } 162 } 163 164 // Copied from internal 165 private static final class IoUtils { 166 /** maximum buffer to be read is 16 KB. */ 167 private static final int MAX_BUFFER_LENGTH = 16384; 168 169 /** Returns the byte array. */ toByteArray(InputStream in)170 public static byte[] toByteArray(InputStream in) throws IOException { 171 ByteArrayOutputStream out = new ByteArrayOutputStream(); 172 copy(in, out); 173 return out.toByteArray(); 174 } 175 176 /** Copies the data from input stream to output stream. */ copy(InputStream from, OutputStream to)177 public static long copy(InputStream from, OutputStream to) throws IOException { 178 // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) 179 Preconditions.checkNotNull(from); 180 Preconditions.checkNotNull(to); 181 byte[] buf = new byte[MAX_BUFFER_LENGTH]; 182 long total = 0; 183 while (true) { 184 int r = from.read(buf); 185 if (r == -1) { 186 break; 187 } 188 to.write(buf, 0, r); 189 total += r; 190 } 191 return total; 192 } 193 } 194 } 195