• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.protobuf.lite;
18 
19 import com.google.protobuf.CodedOutputStream;
20 import com.google.protobuf.MessageLite;
21 import com.google.protobuf.Parser;
22 import io.grpc.Drainable;
23 import io.grpc.KnownLength;
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.OutputStream;
28 import javax.annotation.Nullable;
29 
30 /**
31  * An {@link InputStream} backed by a protobuf.
32  */
33 final class ProtoInputStream extends InputStream implements Drainable, KnownLength {
34 
35   // ProtoInputStream is first initialized with a *message*. *partial* is initially null.
36   // Once there has been a read operation on this stream, *message* is serialized to *partial* and
37   // set to null.
38   @Nullable private MessageLite message;
39   private final Parser<?> parser;
40   @Nullable private ByteArrayInputStream partial;
41 
ProtoInputStream(MessageLite message, Parser<?> parser)42   ProtoInputStream(MessageLite message, Parser<?> parser) {
43     this.message = message;
44     this.parser = parser;
45   }
46 
47   @Override
drainTo(OutputStream target)48   public int drainTo(OutputStream target) throws IOException {
49     int written;
50     if (message != null) {
51       written = message.getSerializedSize();
52       message.writeTo(target);
53       message = null;
54     } else if (partial != null) {
55       written = (int) ProtoLiteUtils.copy(partial, target);
56       partial = null;
57     } else {
58       written = 0;
59     }
60     return written;
61   }
62 
63   @Override
read()64   public int read() throws IOException {
65     if (message != null) {
66       partial = new ByteArrayInputStream(message.toByteArray());
67       message = null;
68     }
69     if (partial != null) {
70       return partial.read();
71     }
72     return -1;
73   }
74 
75   @Override
read(byte[] b, int off, int len)76   public int read(byte[] b, int off, int len) throws IOException {
77     if (message != null) {
78       int size = message.getSerializedSize();
79       if (size == 0) {
80         message = null;
81         partial = null;
82         return -1;
83       }
84       if (len >= size) {
85         // This is the only case that is zero-copy.
86         CodedOutputStream stream = CodedOutputStream.newInstance(b, off, size);
87         message.writeTo(stream);
88         stream.flush();
89         stream.checkNoSpaceLeft();
90 
91         message = null;
92         partial = null;
93         return size;
94       }
95 
96       partial = new ByteArrayInputStream(message.toByteArray());
97       message = null;
98     }
99     if (partial != null) {
100       return partial.read(b, off, len);
101     }
102     return -1;
103   }
104 
105   @Override
available()106   public int available() {
107     if (message != null) {
108       return message.getSerializedSize();
109     } else if (partial != null) {
110       return partial.available();
111     }
112     return 0;
113   }
114 
message()115   MessageLite message() {
116     if (message == null) {
117       throw new IllegalStateException("message not available");
118     }
119     return message;
120   }
121 
parser()122   Parser<?> parser() {
123     return parser;
124   }
125 }
126