/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.protobuf.lite;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.ExtensionRegistryLite;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.grpc.ExperimentalApi;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.PrototypeMarshaller;
import io.grpc.Status;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
/**
* Utility methods for using protobuf with grpc.
*/
@ExperimentalApi("Experimental until Lite is stable in protobuf")
public final class ProtoLiteUtils {
// default visibility to avoid synthetic accessors
static volatile ExtensionRegistryLite globalRegistry =
ExtensionRegistryLite.getEmptyRegistry();
private static final int BUF_SIZE = 8192;
/**
* The same value as {@link io.grpc.internal.GrpcUtil#DEFAULT_MAX_MESSAGE_SIZE}.
*/
@VisibleForTesting
static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;
/**
* Sets the global registry for proto marshalling shared across all servers and clients.
*
*
Warning: This API will likely change over time. It is not possible to have separate
* registries per Process, Server, Channel, Service, or Method. This is intentional until there
* is a more appropriate API to set them.
*
*
Warning: Do NOT modify the extension registry after setting it. It is thread safe to call
* {@link #setExtensionRegistry}, but not to modify the underlying object.
*
*
If you need custom parsing behavior for protos, you will need to make your own
* {@code MethodDescriptor.Marshaller} for the time being.
*
* @since 1.0.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1787")
public static void setExtensionRegistry(ExtensionRegistryLite newRegistry) {
globalRegistry = checkNotNull(newRegistry, "newRegistry");
}
/**
* Creates a {@link Marshaller} for protos of the same type as {@code defaultInstance}.
*
* @since 1.0.0
*/
public static Marshaller marshaller(T defaultInstance) {
// TODO(ejona): consider changing return type to PrototypeMarshaller (assuming ABI safe)
return new MessageMarshaller(defaultInstance);
}
/**
* Produce a metadata marshaller for a protobuf type.
*
* @since 1.0.0
*/
public static Metadata.BinaryMarshaller metadataMarshaller(
T defaultInstance) {
return new MetadataMarshaller(defaultInstance);
}
/** Copies the data from input stream to output stream. */
static long copy(InputStream from, OutputStream to) throws IOException {
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[BUF_SIZE];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
private ProtoLiteUtils() {
}
private static final class MessageMarshaller
implements PrototypeMarshaller {
private static final ThreadLocal> bufs = new ThreadLocal>();
private final Parser parser;
private final T defaultInstance;
@SuppressWarnings("unchecked")
MessageMarshaller(T defaultInstance) {
this.defaultInstance = defaultInstance;
parser = (Parser) defaultInstance.getParserForType();
}
@SuppressWarnings("unchecked")
@Override
public Class getMessageClass() {
// Precisely T since protobuf doesn't let messages extend other messages.
return (Class) defaultInstance.getClass();
}
@Override
public T getMessagePrototype() {
return defaultInstance;
}
@Override
public InputStream stream(T value) {
return new ProtoInputStream(value, parser);
}
@Override
public T parse(InputStream stream) {
if (stream instanceof ProtoInputStream) {
ProtoInputStream protoStream = (ProtoInputStream) stream;
// Optimization for in-memory transport. Returning provided object is safe since protobufs
// are immutable.
//
// However, we can't assume the types match, so we have to verify the parser matches.
// Today the parser is always the same for a given proto, but that isn't guaranteed. Even
// if not, using the same MethodDescriptor would ensure the parser matches and permit us
// to enable this optimization.
if (protoStream.parser() == parser) {
try {
@SuppressWarnings("unchecked")
T message = (T) ((ProtoInputStream) stream).message();
return message;
} catch (IllegalStateException ex) {
// Stream must have been read from, which is a strange state. Since the point of this
// optimization is to be transparent, instead of throwing an error we'll continue,
// even though it seems likely there's a bug.
}
}
}
CodedInputStream cis = null;
try {
if (stream instanceof KnownLength) {
int size = stream.available();
if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
Reference ref;
// buf should not be used after this method has returned.
byte[] buf;
if ((ref = bufs.get()) == null || (buf = ref.get()) == null || buf.length < size) {
buf = new byte[size];
bufs.set(new WeakReference(buf));
}
int remaining = size;
while (remaining > 0) {
int position = size - remaining;
int count = stream.read(buf, position, remaining);
if (count == -1) {
break;
}
remaining -= count;
}
if (remaining != 0) {
int position = size - remaining;
throw new RuntimeException("size inaccurate: " + size + " != " + position);
}
cis = CodedInputStream.newInstance(buf, 0, size);
} else if (size == 0) {
return defaultInstance;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
if (cis == null) {
cis = CodedInputStream.newInstance(stream);
}
// Pre-create the CodedInputStream so that we can remove the size limit restriction
// when parsing.
cis.setSizeLimit(Integer.MAX_VALUE);
try {
return parseFrom(cis);
} catch (InvalidProtocolBufferException ipbe) {
throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
.withCause(ipbe).asRuntimeException();
}
}
private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
T message = parser.parseFrom(stream, globalRegistry);
try {
stream.checkLastTagWas(0);
return message;
} catch (InvalidProtocolBufferException e) {
e.setUnfinishedMessage(message);
throw e;
}
}
}
private static final class MetadataMarshaller
implements Metadata.BinaryMarshaller {
private final T defaultInstance;
MetadataMarshaller(T defaultInstance) {
this.defaultInstance = defaultInstance;
}
@Override
public byte[] toBytes(T value) {
return value.toByteArray();
}
@Override
@SuppressWarnings("unchecked")
public T parseBytes(byte[] serialized) {
try {
return (T) defaultInstance.getParserForType().parseFrom(serialized, globalRegistry);
} catch (InvalidProtocolBufferException ipbe) {
throw new IllegalArgumentException(ipbe);
}
}
}
}