1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static java.lang.Math.min; 23 24 import io.grpc.Codec; 25 import io.grpc.Compressor; 26 import io.grpc.Drainable; 27 import io.grpc.KnownLength; 28 import io.grpc.Status; 29 import java.io.ByteArrayInputStream; 30 import java.io.IOException; 31 import java.io.InputStream; 32 import java.io.OutputStream; 33 import java.nio.ByteBuffer; 34 import java.util.ArrayList; 35 import java.util.List; 36 import javax.annotation.Nullable; 37 38 /** 39 * Encodes gRPC messages to be delivered via the transport layer which implements {@link 40 * MessageFramer.Sink}. 41 */ 42 public class MessageFramer implements Framer { 43 44 private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1; 45 46 /** 47 * Sink implemented by the transport layer to receive frames and forward them to their 48 * destination. 49 */ 50 public interface Sink { 51 /** 52 * Delivers a frame via the transport. 53 * 54 * @param frame a non-empty buffer to deliver or {@code null} if the framer is being 55 * closed and there is no data to deliver. 56 * @param endOfStream whether the frame is the last one for the GRPC stream 57 * @param flush {@code true} if more data may not be arriving soon 58 * @param numMessages the number of messages that this series of frames represents 59 */ deliverFrame( @ullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)60 void deliverFrame( 61 @Nullable WritableBuffer frame, 62 boolean endOfStream, 63 boolean flush, 64 int numMessages); 65 } 66 67 private static final int HEADER_LENGTH = 5; 68 private static final byte UNCOMPRESSED = 0; 69 private static final byte COMPRESSED = 1; 70 71 private final Sink sink; 72 // effectively final. Can only be set once. 73 private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE; 74 private WritableBuffer buffer; 75 private Compressor compressor = Codec.Identity.NONE; 76 private boolean messageCompression = true; 77 private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); 78 private final byte[] headerScratch = new byte[HEADER_LENGTH]; 79 private final WritableBufferAllocator bufferAllocator; 80 private final StatsTraceContext statsTraceCtx; 81 // transportTracer is nullable until it is integrated with client transports 82 private boolean closed; 83 84 // Tracing and stats-related states 85 private int messagesBuffered; 86 private int currentMessageSeqNo = -1; 87 private long currentMessageWireSize; 88 89 /** 90 * Creates a {@code MessageFramer}. 91 * 92 * @param sink the sink used to deliver frames to the transport 93 * @param bufferAllocator allocates buffers that the transport can commit to the wire. 94 */ MessageFramer( Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx)95 public MessageFramer( 96 Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) { 97 this.sink = checkNotNull(sink, "sink"); 98 this.bufferAllocator = checkNotNull(bufferAllocator, "bufferAllocator"); 99 this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); 100 } 101 102 @Override setCompressor(Compressor compressor)103 public MessageFramer setCompressor(Compressor compressor) { 104 this.compressor = checkNotNull(compressor, "Can't pass an empty compressor"); 105 return this; 106 } 107 108 @Override setMessageCompression(boolean enable)109 public MessageFramer setMessageCompression(boolean enable) { 110 messageCompression = enable; 111 return this; 112 } 113 114 @Override setMaxOutboundMessageSize(int maxSize)115 public void setMaxOutboundMessageSize(int maxSize) { 116 checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set"); 117 maxOutboundMessageSize = maxSize; 118 } 119 120 /** 121 * Writes out a payload message. 122 * 123 * @param message contains the message to be written out. It will be completely consumed. 124 */ 125 @Override writePayload(InputStream message)126 public void writePayload(InputStream message) { 127 verifyNotClosed(); 128 messagesBuffered++; 129 currentMessageSeqNo++; 130 currentMessageWireSize = 0; 131 statsTraceCtx.outboundMessage(currentMessageSeqNo); 132 boolean compressed = messageCompression && compressor != Codec.Identity.NONE; 133 int written = -1; 134 int messageLength = -2; 135 try { 136 messageLength = getKnownLength(message); 137 if (messageLength != 0 && compressed) { 138 written = writeCompressed(message, messageLength); 139 } else { 140 written = writeUncompressed(message, messageLength); 141 } 142 } catch (IOException e) { 143 // This should not be possible, since sink#deliverFrame doesn't throw. 144 throw Status.INTERNAL 145 .withDescription("Failed to frame message") 146 .withCause(e) 147 .asRuntimeException(); 148 } catch (RuntimeException e) { 149 throw Status.INTERNAL 150 .withDescription("Failed to frame message") 151 .withCause(e) 152 .asRuntimeException(); 153 } 154 155 if (messageLength != -1 && written != messageLength) { 156 String err = String.format("Message length inaccurate %s != %s", written, messageLength); 157 throw Status.INTERNAL.withDescription(err).asRuntimeException(); 158 } 159 statsTraceCtx.outboundUncompressedSize(written); 160 statsTraceCtx.outboundWireSize(currentMessageWireSize); 161 statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written); 162 } 163 writeUncompressed(InputStream message, int messageLength)164 private int writeUncompressed(InputStream message, int messageLength) throws IOException { 165 if (messageLength != -1) { 166 currentMessageWireSize = messageLength; 167 return writeKnownLengthUncompressed(message, messageLength); 168 } 169 BufferChainOutputStream bufferChain = new BufferChainOutputStream(); 170 int written = writeToOutputStream(message, bufferChain); 171 if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) { 172 throw Status.RESOURCE_EXHAUSTED 173 .withDescription( 174 String.format("message too large %d > %d", written , maxOutboundMessageSize)) 175 .asRuntimeException(); 176 } 177 writeBufferChain(bufferChain, false); 178 return written; 179 } 180 writeCompressed(InputStream message, int unusedMessageLength)181 private int writeCompressed(InputStream message, int unusedMessageLength) throws IOException { 182 BufferChainOutputStream bufferChain = new BufferChainOutputStream(); 183 184 OutputStream compressingStream = compressor.compress(bufferChain); 185 int written; 186 try { 187 written = writeToOutputStream(message, compressingStream); 188 } finally { 189 compressingStream.close(); 190 } 191 if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) { 192 throw Status.RESOURCE_EXHAUSTED 193 .withDescription( 194 String.format("message too large %d > %d", written , maxOutboundMessageSize)) 195 .asRuntimeException(); 196 } 197 198 writeBufferChain(bufferChain, true); 199 return written; 200 } 201 getKnownLength(InputStream inputStream)202 private int getKnownLength(InputStream inputStream) throws IOException { 203 if (inputStream instanceof KnownLength || inputStream instanceof ByteArrayInputStream) { 204 return inputStream.available(); 205 } 206 return -1; 207 } 208 209 /** 210 * Write an unserialized message with a known length, uncompressed. 211 */ writeKnownLengthUncompressed(InputStream message, int messageLength)212 private int writeKnownLengthUncompressed(InputStream message, int messageLength) 213 throws IOException { 214 if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) { 215 throw Status.RESOURCE_EXHAUSTED 216 .withDescription( 217 String.format("message too large %d > %d", messageLength , maxOutboundMessageSize)) 218 .asRuntimeException(); 219 } 220 ByteBuffer header = ByteBuffer.wrap(headerScratch); 221 header.put(UNCOMPRESSED); 222 header.putInt(messageLength); 223 // Allocate the initial buffer chunk based on frame header + payload length. 224 // Note that the allocator may allocate a buffer larger or smaller than this length 225 if (buffer == null) { 226 buffer = bufferAllocator.allocate(header.position() + messageLength); 227 } 228 writeRaw(headerScratch, 0, header.position()); 229 return writeToOutputStream(message, outputStreamAdapter); 230 } 231 232 /** 233 * Write a message that has been serialized to a sequence of buffers. 234 */ writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed)235 private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) { 236 ByteBuffer header = ByteBuffer.wrap(headerScratch); 237 header.put(compressed ? COMPRESSED : UNCOMPRESSED); 238 int messageLength = bufferChain.readableBytes(); 239 header.putInt(messageLength); 240 WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH); 241 writeableHeader.write(headerScratch, 0, header.position()); 242 if (messageLength == 0) { 243 // the payload had 0 length so make the header the current buffer. 244 buffer = writeableHeader; 245 return; 246 } 247 // Note that we are always delivering a small message to the transport here which 248 // may incur transport framing overhead as it may be sent separately to the contents 249 // of the GRPC frame. 250 // The final message may not be completely written because we do not flush the last buffer. 251 // Do not report the last message as sent. 252 sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1); 253 messagesBuffered = 1; 254 // Commit all except the last buffer to the sink 255 List<WritableBuffer> bufferList = bufferChain.bufferList; 256 for (int i = 0; i < bufferList.size() - 1; i++) { 257 sink.deliverFrame(bufferList.get(i), false, false, 0); 258 } 259 // Assign the current buffer to the last in the chain so it can be used 260 // for future writes or written with end-of-stream=true on close. 261 buffer = bufferList.get(bufferList.size() - 1); 262 currentMessageWireSize = messageLength; 263 } 264 writeToOutputStream(InputStream message, OutputStream outputStream)265 private static int writeToOutputStream(InputStream message, OutputStream outputStream) 266 throws IOException { 267 if (message instanceof Drainable) { 268 return ((Drainable) message).drainTo(outputStream); 269 } else { 270 // This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we 271 // expect performance-critical code to support flushTo(). 272 long written = IoUtils.copy(message, outputStream); 273 checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written); 274 return (int) written; 275 } 276 } 277 writeRaw(byte[] b, int off, int len)278 private void writeRaw(byte[] b, int off, int len) { 279 while (len > 0) { 280 if (buffer != null && buffer.writableBytes() == 0) { 281 commitToSink(false, false); 282 } 283 if (buffer == null) { 284 // Request a buffer allocation using the message length as a hint. 285 buffer = bufferAllocator.allocate(len); 286 } 287 int toWrite = min(len, buffer.writableBytes()); 288 buffer.write(b, off, toWrite); 289 off += toWrite; 290 len -= toWrite; 291 } 292 } 293 294 /** 295 * Flushes any buffered data in the framer to the sink. 296 */ 297 @Override flush()298 public void flush() { 299 if (buffer != null && buffer.readableBytes() > 0) { 300 commitToSink(false, true); 301 } 302 } 303 304 /** 305 * Indicates whether or not this framer has been closed via a call to either 306 * {@link #close()} or {@link #dispose()}. 307 */ 308 @Override isClosed()309 public boolean isClosed() { 310 return closed; 311 } 312 313 /** 314 * Flushes and closes the framer and releases any buffers. After the framer is closed or 315 * disposed, additional calls to this method will have no affect. 316 */ 317 @Override close()318 public void close() { 319 if (!isClosed()) { 320 closed = true; 321 // With the current code we don't expect readableBytes > 0 to be possible here, added 322 // defensively to prevent buffer leak issues if the framer code changes later. 323 if (buffer != null && buffer.readableBytes() == 0) { 324 releaseBuffer(); 325 } 326 commitToSink(true, true); 327 } 328 } 329 330 /** 331 * Closes the framer and releases any buffers, but does not flush. After the framer is 332 * closed or disposed, additional calls to this method will have no affect. 333 */ 334 @Override dispose()335 public void dispose() { 336 closed = true; 337 releaseBuffer(); 338 } 339 releaseBuffer()340 private void releaseBuffer() { 341 if (buffer != null) { 342 buffer.release(); 343 buffer = null; 344 } 345 } 346 commitToSink(boolean endOfStream, boolean flush)347 private void commitToSink(boolean endOfStream, boolean flush) { 348 WritableBuffer buf = buffer; 349 buffer = null; 350 sink.deliverFrame(buf, endOfStream, flush, messagesBuffered); 351 messagesBuffered = 0; 352 } 353 verifyNotClosed()354 private void verifyNotClosed() { 355 if (isClosed()) { 356 throw new IllegalStateException("Framer already closed"); 357 } 358 } 359 360 /** OutputStream whose write()s are passed to the framer. */ 361 private class OutputStreamAdapter extends OutputStream { 362 /** 363 * This is slow, don't call it. If you care about write overhead, use a BufferedOutputStream. 364 * Better yet, you can use your own single byte buffer and call 365 * {@link #write(byte[], int, int)}. 366 */ 367 @Override write(int b)368 public void write(int b) { 369 byte[] singleByte = new byte[]{(byte)b}; 370 write(singleByte, 0, 1); 371 } 372 373 @Override write(byte[] b, int off, int len)374 public void write(byte[] b, int off, int len) { 375 writeRaw(b, off, len); 376 } 377 } 378 379 /** 380 * Produce a collection of {@link WritableBuffer} instances from the data written to an 381 * {@link OutputStream}. 382 */ 383 private final class BufferChainOutputStream extends OutputStream { 384 private final List<WritableBuffer> bufferList = new ArrayList<>(); 385 private WritableBuffer current; 386 387 /** 388 * This is slow, don't call it. If you care about write overhead, use a BufferedOutputStream. 389 * Better yet, you can use your own single byte buffer and call 390 * {@link #write(byte[], int, int)}. 391 */ 392 @Override write(int b)393 public void write(int b) throws IOException { 394 if (current != null && current.writableBytes() > 0) { 395 current.write((byte)b); 396 return; 397 } 398 byte[] singleByte = new byte[]{(byte)b}; 399 write(singleByte, 0, 1); 400 } 401 402 @Override write(byte[] b, int off, int len)403 public void write(byte[] b, int off, int len) { 404 if (current == null) { 405 // Request len bytes initially from the allocator, it may give us more. 406 current = bufferAllocator.allocate(len); 407 bufferList.add(current); 408 } 409 while (len > 0) { 410 int canWrite = Math.min(len, current.writableBytes()); 411 if (canWrite == 0) { 412 // Assume message is twice as large as previous assumption if were still not done, 413 // the allocator may allocate more or less than this amount. 414 int needed = Math.max(len, current.readableBytes() * 2); 415 current = bufferAllocator.allocate(needed); 416 bufferList.add(current); 417 } else { 418 current.write(b, off, canWrite); 419 off += canWrite; 420 len -= canWrite; 421 } 422 } 423 } 424 readableBytes()425 private int readableBytes() { 426 int readable = 0; 427 for (WritableBuffer writableBuffer : bufferList) { 428 readable += writableBuffer.readableBytes(); 429 } 430 return readable; 431 } 432 } 433 } 434