1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.protobuf.lite; 18 19 import com.google.protobuf.CodedOutputStream; 20 import com.google.protobuf.MessageLite; 21 import com.google.protobuf.Parser; 22 import io.grpc.Drainable; 23 import io.grpc.KnownLength; 24 import java.io.ByteArrayInputStream; 25 import java.io.IOException; 26 import java.io.InputStream; 27 import java.io.OutputStream; 28 import javax.annotation.Nullable; 29 30 /** 31 * An {@link InputStream} backed by a protobuf. 32 */ 33 final class ProtoInputStream extends InputStream implements Drainable, KnownLength { 34 35 // ProtoInputStream is first initialized with a *message*. *partial* is initially null. 36 // Once there has been a read operation on this stream, *message* is serialized to *partial* and 37 // set to null. 38 @Nullable private MessageLite message; 39 private final Parser<?> parser; 40 @Nullable private ByteArrayInputStream partial; 41 ProtoInputStream(MessageLite message, Parser<?> parser)42 ProtoInputStream(MessageLite message, Parser<?> parser) { 43 this.message = message; 44 this.parser = parser; 45 } 46 47 @Override drainTo(OutputStream target)48 public int drainTo(OutputStream target) throws IOException { 49 int written; 50 if (message != null) { 51 written = message.getSerializedSize(); 52 message.writeTo(target); 53 message = null; 54 } else if (partial != null) { 55 written = (int) ProtoLiteUtils.copy(partial, target); 56 partial = null; 57 } else { 58 written = 0; 59 } 60 return written; 61 } 62 63 @Override read()64 public int read() throws IOException { 65 if (message != null) { 66 partial = new ByteArrayInputStream(message.toByteArray()); 67 message = null; 68 } 69 if (partial != null) { 70 return partial.read(); 71 } 72 return -1; 73 } 74 75 @Override read(byte[] b, int off, int len)76 public int read(byte[] b, int off, int len) throws IOException { 77 if (message != null) { 78 int size = message.getSerializedSize(); 79 if (size == 0) { 80 message = null; 81 partial = null; 82 return -1; 83 } 84 if (len >= size) { 85 // This is the only case that is zero-copy. 86 CodedOutputStream stream = CodedOutputStream.newInstance(b, off, size); 87 message.writeTo(stream); 88 stream.flush(); 89 stream.checkNoSpaceLeft(); 90 91 message = null; 92 partial = null; 93 return size; 94 } 95 96 partial = new ByteArrayInputStream(message.toByteArray()); 97 message = null; 98 } 99 if (partial != null) { 100 return partial.read(b, off, len); 101 } 102 return -1; 103 } 104 105 @Override available()106 public int available() { 107 if (message != null) { 108 return message.getSerializedSize(); 109 } else if (partial != null) { 110 return partial.available(); 111 } 112 return 0; 113 } 114 message()115 MessageLite message() { 116 if (message == null) { 117 throw new IllegalStateException("message not available"); 118 } 119 return message; 120 } 121 parser()122 Parser<?> parser() { 123 return parser; 124 } 125 } 126