1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import java.io.IOException; 20 import java.io.OutputStream; 21 import java.nio.ByteBuffer; 22 import java.util.ArrayDeque; 23 import java.util.Queue; 24 25 /** 26 * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a 27 * facade that allows multiple buffers to be treated as one. 28 * 29 * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once 30 * the composite has read past the end of a given buffer, that buffer is automatically closed and 31 * removed from the composite. 32 */ 33 public class CompositeReadableBuffer extends AbstractReadableBuffer { 34 35 private int readableBytes; 36 private final Queue<ReadableBuffer> buffers = new ArrayDeque<ReadableBuffer>(); 37 38 /** 39 * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is 40 * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the 41 * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of 42 * this {@code CompositeBuffer}. 43 */ addBuffer(ReadableBuffer buffer)44 public void addBuffer(ReadableBuffer buffer) { 45 if (!(buffer instanceof CompositeReadableBuffer)) { 46 buffers.add(buffer); 47 readableBytes += buffer.readableBytes(); 48 return; 49 } 50 51 CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer; 52 while (!compositeBuffer.buffers.isEmpty()) { 53 ReadableBuffer subBuffer = compositeBuffer.buffers.remove(); 54 buffers.add(subBuffer); 55 } 56 readableBytes += compositeBuffer.readableBytes; 57 compositeBuffer.readableBytes = 0; 58 compositeBuffer.close(); 59 } 60 61 @Override readableBytes()62 public int readableBytes() { 63 return readableBytes; 64 } 65 66 @Override readUnsignedByte()67 public int readUnsignedByte() { 68 ReadOperation op = new ReadOperation() { 69 @Override 70 int readInternal(ReadableBuffer buffer, int length) { 71 return buffer.readUnsignedByte(); 72 } 73 }; 74 execute(op, 1); 75 return op.value; 76 } 77 78 @Override skipBytes(int length)79 public void skipBytes(int length) { 80 execute(new ReadOperation() { 81 @Override 82 public int readInternal(ReadableBuffer buffer, int length) { 83 buffer.skipBytes(length); 84 return 0; 85 } 86 }, length); 87 } 88 89 @Override readBytes(final byte[] dest, final int destOffset, int length)90 public void readBytes(final byte[] dest, final int destOffset, int length) { 91 execute(new ReadOperation() { 92 int currentOffset = destOffset; 93 @Override 94 public int readInternal(ReadableBuffer buffer, int length) { 95 buffer.readBytes(dest, currentOffset, length); 96 currentOffset += length; 97 return 0; 98 } 99 }, length); 100 } 101 102 @Override readBytes(final ByteBuffer dest)103 public void readBytes(final ByteBuffer dest) { 104 execute(new ReadOperation() { 105 @Override 106 public int readInternal(ReadableBuffer buffer, int length) { 107 // Change the limit so that only lengthToCopy bytes are available. 108 int prevLimit = dest.limit(); 109 dest.limit(dest.position() + length); 110 111 // Write the bytes and restore the original limit. 112 buffer.readBytes(dest); 113 dest.limit(prevLimit); 114 return 0; 115 } 116 }, dest.remaining()); 117 } 118 119 @Override readBytes(final OutputStream dest, int length)120 public void readBytes(final OutputStream dest, int length) throws IOException { 121 ReadOperation op = new ReadOperation() { 122 @Override 123 public int readInternal(ReadableBuffer buffer, int length) throws IOException { 124 buffer.readBytes(dest, length); 125 return 0; 126 } 127 }; 128 execute(op, length); 129 130 // If an exception occurred, throw it. 131 if (op.isError()) { 132 throw op.ex; 133 } 134 } 135 136 @Override readBytes(int length)137 public CompositeReadableBuffer readBytes(int length) { 138 checkReadable(length); 139 readableBytes -= length; 140 141 CompositeReadableBuffer newBuffer = new CompositeReadableBuffer(); 142 while (length > 0) { 143 ReadableBuffer buffer = buffers.peek(); 144 if (buffer.readableBytes() > length) { 145 newBuffer.addBuffer(buffer.readBytes(length)); 146 length = 0; 147 } else { 148 newBuffer.addBuffer(buffers.poll()); 149 length -= buffer.readableBytes(); 150 } 151 } 152 return newBuffer; 153 } 154 155 @Override close()156 public void close() { 157 while (!buffers.isEmpty()) { 158 buffers.remove().close(); 159 } 160 } 161 162 /** 163 * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to 164 * satisfy the requested {@code length}. 165 */ execute(ReadOperation op, int length)166 private void execute(ReadOperation op, int length) { 167 checkReadable(length); 168 169 if (!buffers.isEmpty()) { 170 advanceBufferIfNecessary(); 171 } 172 173 for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) { 174 ReadableBuffer buffer = buffers.peek(); 175 int lengthToCopy = Math.min(length, buffer.readableBytes()); 176 177 // Perform the read operation for this buffer. 178 op.read(buffer, lengthToCopy); 179 if (op.isError()) { 180 return; 181 } 182 183 length -= lengthToCopy; 184 readableBytes -= lengthToCopy; 185 } 186 187 if (length > 0) { 188 // Should never get here. 189 throw new AssertionError("Failed executing read operation"); 190 } 191 } 192 193 /** 194 * If the current buffer is exhausted, removes and closes it. 195 */ advanceBufferIfNecessary()196 private void advanceBufferIfNecessary() { 197 ReadableBuffer buffer = buffers.peek(); 198 if (buffer.readableBytes() == 0) { 199 buffers.remove().close(); 200 } 201 } 202 203 /** 204 * A simple read operation to perform on a single {@link ReadableBuffer}. All state management for 205 * the buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}. 206 */ 207 private abstract static class ReadOperation { 208 /** 209 * Only used by {@link CompositeReadableBuffer#readUnsignedByte()}. 210 */ 211 int value; 212 213 /** 214 * Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)}. 215 */ 216 IOException ex; 217 read(ReadableBuffer buffer, int length)218 final void read(ReadableBuffer buffer, int length) { 219 try { 220 value = readInternal(buffer, length); 221 } catch (IOException e) { 222 ex = e; 223 } 224 } 225 isError()226 final boolean isError() { 227 return ex != null; 228 } 229 readInternal(ReadableBuffer buffer, int length)230 abstract int readInternal(ReadableBuffer buffer, int length) throws IOException; 231 } 232 } 233