• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;

import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import javax.annotation.Nullable;
37 
38 /**
39  * Encodes gRPC messages to be delivered via the transport layer which implements {@link
40  * MessageFramer.Sink}.
41  */
42 public class MessageFramer implements Framer {
43 
44   private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1;
45 
46   /**
47    * Sink implemented by the transport layer to receive frames and forward them to their
48    * destination.
49    */
50   public interface Sink {
51     /**
52      * Delivers a frame via the transport.
53      *
54      * @param frame a non-empty buffer to deliver or {@code null} if the framer is being
55      *              closed and there is no data to deliver.
56      * @param endOfStream whether the frame is the last one for the GRPC stream
57      * @param flush {@code true} if more data may not be arriving soon
58      * @param numMessages the number of messages that this series of frames represents
59      */
deliverFrame( @ullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)60     void deliverFrame(
61         @Nullable WritableBuffer frame,
62         boolean endOfStream,
63         boolean flush,
64         int numMessages);
65   }
66 
67   private static final int HEADER_LENGTH = 5;
68   private static final byte UNCOMPRESSED = 0;
69   private static final byte COMPRESSED = 1;
70 
71   private final Sink sink;
72   // effectively final.  Can only be set once.
73   private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
74   private WritableBuffer buffer;
75   private Compressor compressor = Codec.Identity.NONE;
76   private boolean messageCompression = true;
77   private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
78   private final byte[] headerScratch = new byte[HEADER_LENGTH];
79   private final WritableBufferAllocator bufferAllocator;
80   private final StatsTraceContext statsTraceCtx;
81   // transportTracer is nullable until it is integrated with client transports
82   private boolean closed;
83 
84   // Tracing and stats-related states
85   private int messagesBuffered;
86   private int currentMessageSeqNo = -1;
87   private long currentMessageWireSize;
88 
89   /**
90    * Creates a {@code MessageFramer}.
91    *
92    * @param sink the sink used to deliver frames to the transport
93    * @param bufferAllocator allocates buffers that the transport can commit to the wire.
94    */
MessageFramer( Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx)95   public MessageFramer(
96       Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
97     this.sink = checkNotNull(sink, "sink");
98     this.bufferAllocator = checkNotNull(bufferAllocator, "bufferAllocator");
99     this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
100   }
101 
102   @Override
setCompressor(Compressor compressor)103   public MessageFramer setCompressor(Compressor compressor) {
104     this.compressor = checkNotNull(compressor, "Can't pass an empty compressor");
105     return this;
106   }
107 
108   @Override
setMessageCompression(boolean enable)109   public MessageFramer setMessageCompression(boolean enable) {
110     messageCompression = enable;
111     return this;
112   }
113 
114   @Override
setMaxOutboundMessageSize(int maxSize)115   public void setMaxOutboundMessageSize(int maxSize) {
116     checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set");
117     maxOutboundMessageSize = maxSize;
118   }
119 
120   /**
121    * Writes out a payload message.
122    *
123    * @param message contains the message to be written out. It will be completely consumed.
124    */
125   @Override
writePayload(InputStream message)126   public void writePayload(InputStream message) {
127     verifyNotClosed();
128     messagesBuffered++;
129     currentMessageSeqNo++;
130     currentMessageWireSize = 0;
131     statsTraceCtx.outboundMessage(currentMessageSeqNo);
132     boolean compressed = messageCompression && compressor != Codec.Identity.NONE;
133     int written = -1;
134     int messageLength = -2;
135     try {
136       messageLength = getKnownLength(message);
137       if (messageLength != 0 && compressed) {
138         written = writeCompressed(message, messageLength);
139       } else {
140         written = writeUncompressed(message, messageLength);
141       }
142     } catch (IOException e) {
143       // This should not be possible, since sink#deliverFrame doesn't throw.
144       throw Status.INTERNAL
145           .withDescription("Failed to frame message")
146           .withCause(e)
147           .asRuntimeException();
148     } catch (RuntimeException e) {
149       throw Status.INTERNAL
150           .withDescription("Failed to frame message")
151           .withCause(e)
152           .asRuntimeException();
153     }
154 
155     if (messageLength != -1 && written != messageLength) {
156       String err = String.format("Message length inaccurate %s != %s", written, messageLength);
157       throw Status.INTERNAL.withDescription(err).asRuntimeException();
158     }
159     statsTraceCtx.outboundUncompressedSize(written);
160     statsTraceCtx.outboundWireSize(currentMessageWireSize);
161     statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written);
162   }
163 
writeUncompressed(InputStream message, int messageLength)164   private int writeUncompressed(InputStream message, int messageLength) throws IOException {
165     if (messageLength != -1) {
166       currentMessageWireSize = messageLength;
167       return writeKnownLengthUncompressed(message, messageLength);
168     }
169     BufferChainOutputStream bufferChain = new BufferChainOutputStream();
170     int written = writeToOutputStream(message, bufferChain);
171     if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
172       throw Status.RESOURCE_EXHAUSTED
173           .withDescription(
174               String.format("message too large %d > %d", written , maxOutboundMessageSize))
175           .asRuntimeException();
176     }
177     writeBufferChain(bufferChain, false);
178     return written;
179   }
180 
writeCompressed(InputStream message, int unusedMessageLength)181   private int writeCompressed(InputStream message, int unusedMessageLength) throws IOException {
182     BufferChainOutputStream bufferChain = new BufferChainOutputStream();
183 
184     OutputStream compressingStream = compressor.compress(bufferChain);
185     int written;
186     try {
187       written = writeToOutputStream(message, compressingStream);
188     } finally {
189       compressingStream.close();
190     }
191     if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
192       throw Status.RESOURCE_EXHAUSTED
193           .withDescription(
194               String.format("message too large %d > %d", written , maxOutboundMessageSize))
195           .asRuntimeException();
196     }
197 
198     writeBufferChain(bufferChain, true);
199     return written;
200   }
201 
getKnownLength(InputStream inputStream)202   private int getKnownLength(InputStream inputStream) throws IOException {
203     if (inputStream instanceof KnownLength || inputStream instanceof ByteArrayInputStream) {
204       return inputStream.available();
205     }
206     return -1;
207   }
208 
209   /**
210    * Write an unserialized message with a known length, uncompressed.
211    */
writeKnownLengthUncompressed(InputStream message, int messageLength)212   private int writeKnownLengthUncompressed(InputStream message, int messageLength)
213       throws IOException {
214     if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
215       throw Status.RESOURCE_EXHAUSTED
216           .withDescription(
217               String.format("message too large %d > %d", messageLength , maxOutboundMessageSize))
218           .asRuntimeException();
219     }
220     ByteBuffer header = ByteBuffer.wrap(headerScratch);
221     header.put(UNCOMPRESSED);
222     header.putInt(messageLength);
223     // Allocate the initial buffer chunk based on frame header + payload length.
224     // Note that the allocator may allocate a buffer larger or smaller than this length
225     if (buffer == null) {
226       buffer = bufferAllocator.allocate(header.position() + messageLength);
227     }
228     writeRaw(headerScratch, 0, header.position());
229     return writeToOutputStream(message, outputStreamAdapter);
230   }
231 
232   /**
233    * Write a message that has been serialized to a sequence of buffers.
234    */
writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed)235   private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
236     ByteBuffer header = ByteBuffer.wrap(headerScratch);
237     header.put(compressed ? COMPRESSED : UNCOMPRESSED);
238     int messageLength = bufferChain.readableBytes();
239     header.putInt(messageLength);
240     WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
241     writeableHeader.write(headerScratch, 0, header.position());
242     if (messageLength == 0) {
243       // the payload had 0 length so make the header the current buffer.
244       buffer = writeableHeader;
245       return;
246     }
247     // Note that we are always delivering a small message to the transport here which
248     // may incur transport framing overhead as it may be sent separately to the contents
249     // of the GRPC frame.
250     // The final message may not be completely written because we do not flush the last buffer.
251     // Do not report the last message as sent.
252     sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
253     messagesBuffered = 1;
254     // Commit all except the last buffer to the sink
255     List<WritableBuffer> bufferList = bufferChain.bufferList;
256     for (int i = 0; i < bufferList.size() - 1; i++) {
257       sink.deliverFrame(bufferList.get(i), false, false, 0);
258     }
259     // Assign the current buffer to the last in the chain so it can be used
260     // for future writes or written with end-of-stream=true on close.
261     buffer = bufferList.get(bufferList.size() - 1);
262     currentMessageWireSize = messageLength;
263   }
264 
writeToOutputStream(InputStream message, OutputStream outputStream)265   private static int writeToOutputStream(InputStream message, OutputStream outputStream)
266       throws IOException {
267     if (message instanceof Drainable) {
268       return ((Drainable) message).drainTo(outputStream);
269     } else {
270       // This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we
271       // expect performance-critical code to support flushTo().
272       long written = IoUtils.copy(message, outputStream);
273       checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written);
274       return (int) written;
275     }
276   }
277 
writeRaw(byte[] b, int off, int len)278   private void writeRaw(byte[] b, int off, int len) {
279     while (len > 0) {
280       if (buffer != null && buffer.writableBytes() == 0) {
281         commitToSink(false, false);
282       }
283       if (buffer == null) {
284         // Request a buffer allocation using the message length as a hint.
285         buffer = bufferAllocator.allocate(len);
286       }
287       int toWrite = min(len, buffer.writableBytes());
288       buffer.write(b, off, toWrite);
289       off += toWrite;
290       len -= toWrite;
291     }
292   }
293 
294   /**
295    * Flushes any buffered data in the framer to the sink.
296    */
297   @Override
flush()298   public void flush() {
299     if (buffer != null && buffer.readableBytes() > 0) {
300       commitToSink(false, true);
301     }
302   }
303 
304   /**
305    * Indicates whether or not this framer has been closed via a call to either
306    * {@link #close()} or {@link #dispose()}.
307    */
308   @Override
isClosed()309   public boolean isClosed() {
310     return closed;
311   }
312 
313   /**
314    * Flushes and closes the framer and releases any buffers. After the framer is closed or
315    * disposed, additional calls to this method will have no affect.
316    */
317   @Override
close()318   public void close() {
319     if (!isClosed()) {
320       closed = true;
321       // With the current code we don't expect readableBytes > 0 to be possible here, added
322       // defensively to prevent buffer leak issues if the framer code changes later.
323       if (buffer != null && buffer.readableBytes() == 0) {
324         releaseBuffer();
325       }
326       commitToSink(true, true);
327     }
328   }
329 
330   /**
331    * Closes the framer and releases any buffers, but does not flush. After the framer is
332    * closed or disposed, additional calls to this method will have no affect.
333    */
334   @Override
dispose()335   public void dispose() {
336     closed = true;
337     releaseBuffer();
338   }
339 
releaseBuffer()340   private void releaseBuffer() {
341     if (buffer != null) {
342       buffer.release();
343       buffer = null;
344     }
345   }
346 
commitToSink(boolean endOfStream, boolean flush)347   private void commitToSink(boolean endOfStream, boolean flush) {
348     WritableBuffer buf = buffer;
349     buffer = null;
350     sink.deliverFrame(buf, endOfStream, flush, messagesBuffered);
351     messagesBuffered = 0;
352   }
353 
verifyNotClosed()354   private void verifyNotClosed() {
355     if (isClosed()) {
356       throw new IllegalStateException("Framer already closed");
357     }
358   }
359 
360   /** OutputStream whose write()s are passed to the framer. */
361   private class OutputStreamAdapter extends OutputStream {
362     /**
363      * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
364      * Better yet, you can use your own single byte buffer and call
365      * {@link #write(byte[], int, int)}.
366      */
367     @Override
write(int b)368     public void write(int b) {
369       byte[] singleByte = new byte[]{(byte)b};
370       write(singleByte, 0, 1);
371     }
372 
373     @Override
write(byte[] b, int off, int len)374     public void write(byte[] b, int off, int len) {
375       writeRaw(b, off, len);
376     }
377   }
378 
379   /**
380    * Produce a collection of {@link WritableBuffer} instances from the data written to an
381    * {@link OutputStream}.
382    */
383   private final class BufferChainOutputStream extends OutputStream {
384     private final List<WritableBuffer> bufferList = new ArrayList<>();
385     private WritableBuffer current;
386 
387     /**
388      * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
389      * Better yet, you can use your own single byte buffer and call
390      * {@link #write(byte[], int, int)}.
391      */
392     @Override
write(int b)393     public void write(int b) throws IOException {
394       if (current != null && current.writableBytes() > 0) {
395         current.write((byte)b);
396         return;
397       }
398       byte[] singleByte = new byte[]{(byte)b};
399       write(singleByte, 0, 1);
400     }
401 
402     @Override
write(byte[] b, int off, int len)403     public void write(byte[] b, int off, int len) {
404       if (current == null) {
405         // Request len bytes initially from the allocator, it may give us more.
406         current = bufferAllocator.allocate(len);
407         bufferList.add(current);
408       }
409       while (len > 0) {
410         int canWrite = Math.min(len, current.writableBytes());
411         if (canWrite == 0) {
412           // Assume message is twice as large as previous assumption if were still not done,
413           // the allocator may allocate more or less than this amount.
414           int needed = Math.max(len, current.readableBytes() * 2);
415           current = bufferAllocator.allocate(needed);
416           bufferList.add(current);
417         } else {
418           current.write(b, off, canWrite);
419           off += canWrite;
420           len -= canWrite;
421         }
422       }
423     }
424 
readableBytes()425     private int readableBytes() {
426       int readable = 0;
427       for (WritableBuffer writableBuffer : bufferList) {
428         readable += writableBuffer.readableBytes();
429       }
430       return readable;
431     }
432   }
433 }
434