1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import java.io.IOException; 20 import java.io.OutputStream; 21 import java.nio.Buffer; 22 import java.nio.ByteBuffer; 23 import java.nio.InvalidMarkException; 24 import java.util.ArrayDeque; 25 import java.util.Deque; 26 import javax.annotation.Nullable; 27 28 /** 29 * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a 30 * facade that allows multiple buffers to be treated as one. 31 * 32 * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once 33 * the composite has read past the end of a given buffer, that buffer is automatically closed and 34 * removed from the composite. 35 */ 36 public class CompositeReadableBuffer extends AbstractReadableBuffer { 37 38 private final Deque<ReadableBuffer> readableBuffers; 39 private Deque<ReadableBuffer> rewindableBuffers; 40 private int readableBytes; 41 private boolean marked; 42 CompositeReadableBuffer(int initialCapacity)43 public CompositeReadableBuffer(int initialCapacity) { 44 readableBuffers = new ArrayDeque<>(initialCapacity); 45 } 46 CompositeReadableBuffer()47 public CompositeReadableBuffer() { 48 readableBuffers = new ArrayDeque<>(); 49 } 50 51 /** 52 * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is 53 * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the 54 * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of 55 * this {@code CompositeBuffer}. 56 */ addBuffer(ReadableBuffer buffer)57 public void addBuffer(ReadableBuffer buffer) { 58 boolean markHead = marked && readableBuffers.isEmpty(); 59 enqueueBuffer(buffer); 60 if (markHead) { 61 readableBuffers.peek().mark(); 62 } 63 } 64 enqueueBuffer(ReadableBuffer buffer)65 private void enqueueBuffer(ReadableBuffer buffer) { 66 if (!(buffer instanceof CompositeReadableBuffer)) { 67 readableBuffers.add(buffer); 68 readableBytes += buffer.readableBytes(); 69 return; 70 } 71 72 CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer; 73 while (!compositeBuffer.readableBuffers.isEmpty()) { 74 ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove(); 75 readableBuffers.add(subBuffer); 76 } 77 readableBytes += compositeBuffer.readableBytes; 78 compositeBuffer.readableBytes = 0; 79 compositeBuffer.close(); 80 } 81 82 @Override readableBytes()83 public int readableBytes() { 84 return readableBytes; 85 } 86 87 private static final NoThrowReadOperation<Void> UBYTE_OP = 88 new NoThrowReadOperation<Void>() { 89 @Override 90 public int read(ReadableBuffer buffer, int length, Void unused, int value) { 91 return buffer.readUnsignedByte(); 92 } 93 }; 94 95 @Override readUnsignedByte()96 public int readUnsignedByte() { 97 return executeNoThrow(UBYTE_OP, 1, null, 0); 98 } 99 100 private static final NoThrowReadOperation<Void> SKIP_OP = 101 new NoThrowReadOperation<Void>() { 102 @Override 103 public int read(ReadableBuffer buffer, int length, Void unused, int unused2) { 104 buffer.skipBytes(length); 105 return 0; 106 } 107 }; 108 109 @Override skipBytes(int length)110 public void skipBytes(int length) { 111 executeNoThrow(SKIP_OP, length, null, 0); 112 } 113 114 private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP = 115 new NoThrowReadOperation<byte[]>() { 116 @Override 117 public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) { 118 buffer.readBytes(dest, offset, length); 119 return offset + length; 120 } 121 }; 122 123 @Override readBytes(byte[] dest, int destOffset, int length)124 public void readBytes(byte[] dest, int destOffset, int length) { 125 executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset); 126 } 127 128 private static final NoThrowReadOperation<ByteBuffer> BYTE_BUF_OP = 129 new NoThrowReadOperation<ByteBuffer>() { 130 @Override 131 public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) { 132 // Change the limit so that only lengthToCopy bytes are available. 133 int prevLimit = dest.limit(); 134 ((Buffer) dest).limit(dest.position() + length); 135 // Write the bytes and restore the original limit. 136 buffer.readBytes(dest); 137 ((Buffer) dest).limit(prevLimit); 138 return 0; 139 } 140 }; 141 142 @Override readBytes(ByteBuffer dest)143 public void readBytes(ByteBuffer dest) { 144 executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0); 145 } 146 147 private static final ReadOperation<OutputStream> STREAM_OP = 148 new ReadOperation<OutputStream>() { 149 @Override 150 public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused) 151 throws IOException { 152 buffer.readBytes(dest, length); 153 return 0; 154 } 155 }; 156 157 @Override readBytes(OutputStream dest, int length)158 public void readBytes(OutputStream dest, int length) throws IOException { 159 execute(STREAM_OP, length, dest, 0); 160 } 161 162 @Override readBytes(int length)163 public ReadableBuffer readBytes(int length) { 164 if (length <= 0) { 165 return ReadableBuffers.empty(); 166 } 167 checkReadable(length); 168 readableBytes -= length; 169 170 ReadableBuffer newBuffer = null; 171 CompositeReadableBuffer newComposite = null; 172 do { 173 ReadableBuffer buffer = readableBuffers.peek(); 174 int readable = buffer.readableBytes(); 175 ReadableBuffer readBuffer; 176 if (readable > length) { 177 readBuffer = buffer.readBytes(length); 178 length = 0; 179 } else { 180 if (marked) { 181 readBuffer = buffer.readBytes(readable); 182 advanceBuffer(); 183 } else { 184 readBuffer = readableBuffers.poll(); 185 } 186 length -= readable; 187 } 188 if (newBuffer == null) { 189 newBuffer = readBuffer; 190 } else { 191 if (newComposite == null) { 192 newComposite = new CompositeReadableBuffer( 193 length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16)); 194 newComposite.addBuffer(newBuffer); 195 newBuffer = newComposite; 196 } 197 newComposite.addBuffer(readBuffer); 198 } 199 } while (length > 0); 200 return newBuffer; 201 } 202 203 @Override markSupported()204 public boolean markSupported() { 205 for (ReadableBuffer buffer : readableBuffers) { 206 if (!buffer.markSupported()) { 207 return false; 208 } 209 } 210 return true; 211 } 212 213 @Override mark()214 public void mark() { 215 if (rewindableBuffers == null) { 216 rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16)); 217 } 218 while (!rewindableBuffers.isEmpty()) { 219 rewindableBuffers.remove().close(); 220 } 221 marked = true; 222 ReadableBuffer buffer = readableBuffers.peek(); 223 if (buffer != null) { 224 buffer.mark(); 225 } 226 } 227 228 @Override reset()229 public void reset() { 230 if (!marked) { 231 throw new InvalidMarkException(); 232 } 233 ReadableBuffer buffer; 234 if ((buffer = readableBuffers.peek()) != null) { 235 int currentRemain = buffer.readableBytes(); 236 buffer.reset(); 237 readableBytes += (buffer.readableBytes() - currentRemain); 238 } 239 while ((buffer = rewindableBuffers.pollLast()) != null) { 240 buffer.reset(); 241 readableBuffers.addFirst(buffer); 242 readableBytes += buffer.readableBytes(); 243 } 244 } 245 246 @Override byteBufferSupported()247 public boolean byteBufferSupported() { 248 for (ReadableBuffer buffer : readableBuffers) { 249 if (!buffer.byteBufferSupported()) { 250 return false; 251 } 252 } 253 return true; 254 } 255 256 @Nullable 257 @Override getByteBuffer()258 public ByteBuffer getByteBuffer() { 259 if (readableBuffers.isEmpty()) { 260 return null; 261 } 262 return readableBuffers.peek().getByteBuffer(); 263 } 264 265 @Override close()266 public void close() { 267 while (!readableBuffers.isEmpty()) { 268 readableBuffers.remove().close(); 269 } 270 if (rewindableBuffers != null) { 271 while (!rewindableBuffers.isEmpty()) { 272 rewindableBuffers.remove().close(); 273 } 274 } 275 } 276 277 /** 278 * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to 279 * satisfy the requested {@code length}. 280 */ execute(ReadOperation<T> op, int length, T dest, int value)281 private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException { 282 checkReadable(length); 283 284 if (!readableBuffers.isEmpty()) { 285 advanceBufferIfNecessary(); 286 } 287 288 for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) { 289 ReadableBuffer buffer = readableBuffers.peek(); 290 int lengthToCopy = Math.min(length, buffer.readableBytes()); 291 292 // Perform the read operation for this buffer. 293 value = op.read(buffer, lengthToCopy, dest, value); 294 295 length -= lengthToCopy; 296 readableBytes -= lengthToCopy; 297 } 298 299 if (length > 0) { 300 // Should never get here. 301 throw new AssertionError("Failed executing read operation"); 302 } 303 304 return value; 305 } 306 executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value)307 private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) { 308 try { 309 return execute(op, length, dest, value); 310 } catch (IOException e) { 311 throw new AssertionError(e); // shouldn't happen 312 } 313 } 314 315 /** 316 * If the current buffer is exhausted, removes and closes it. 317 */ advanceBufferIfNecessary()318 private void advanceBufferIfNecessary() { 319 ReadableBuffer buffer = readableBuffers.peek(); 320 if (buffer.readableBytes() == 0) { 321 advanceBuffer(); 322 } 323 } 324 325 /** 326 * Removes one buffer from the front and closes it. 327 */ advanceBuffer()328 private void advanceBuffer() { 329 if (marked) { 330 rewindableBuffers.add(readableBuffers.remove()); 331 ReadableBuffer next = readableBuffers.peek(); 332 if (next != null) { 333 next.mark(); 334 } 335 } else { 336 readableBuffers.remove().close(); 337 } 338 } 339 340 /** 341 * A simple read operation to perform on a single {@link ReadableBuffer}. 342 * All state management for the buffers is done by 343 * {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}. 344 */ 345 private interface ReadOperation<T> { 346 /** 347 * This method can also be used to simultaneously perform operation-specific int-valued 348 * aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}. 349 * {@code value} is the return value from the prior buffer, or the "initial" value passed 350 * to {@code execute()} in the case of the first buffer. {@code execute()} returns the value 351 * returned by the operation called on the last buffer. 352 */ read(ReadableBuffer buffer, int length, T dest, int value)353 int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException; 354 } 355 356 private interface NoThrowReadOperation<T> extends ReadOperation<T> { 357 @Override read(ReadableBuffer buffer, int length, T dest, int value)358 int read(ReadableBuffer buffer, int length, T dest, int value); 359 } 360 } 361