• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.nio.ByteBuffer;
22 import java.util.ArrayDeque;
23 import java.util.Queue;
24 
25 /**
26  * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
27  * facade that allows multiple buffers to be treated as one.
28  *
29  * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
30  * the composite has read past the end of a given buffer, that buffer is automatically closed and
31  * removed from the composite.
32  */
33 public class CompositeReadableBuffer extends AbstractReadableBuffer {
34 
35   private int readableBytes;
36   private final Queue<ReadableBuffer> buffers = new ArrayDeque<ReadableBuffer>();
37 
38   /**
39    * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
40    * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
41    * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
42    * this {@code CompositeBuffer}.
43    */
addBuffer(ReadableBuffer buffer)44   public void addBuffer(ReadableBuffer buffer) {
45     if (!(buffer instanceof CompositeReadableBuffer)) {
46       buffers.add(buffer);
47       readableBytes += buffer.readableBytes();
48       return;
49     }
50 
51     CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
52     while (!compositeBuffer.buffers.isEmpty()) {
53       ReadableBuffer subBuffer = compositeBuffer.buffers.remove();
54       buffers.add(subBuffer);
55     }
56     readableBytes += compositeBuffer.readableBytes;
57     compositeBuffer.readableBytes = 0;
58     compositeBuffer.close();
59   }
60 
61   @Override
readableBytes()62   public int readableBytes() {
63     return readableBytes;
64   }
65 
66   @Override
readUnsignedByte()67   public int readUnsignedByte() {
68     ReadOperation op = new ReadOperation() {
69       @Override
70       int readInternal(ReadableBuffer buffer, int length) {
71         return buffer.readUnsignedByte();
72       }
73     };
74     execute(op, 1);
75     return op.value;
76   }
77 
78   @Override
skipBytes(int length)79   public void skipBytes(int length) {
80     execute(new ReadOperation() {
81       @Override
82       public int readInternal(ReadableBuffer buffer, int length) {
83         buffer.skipBytes(length);
84         return 0;
85       }
86     }, length);
87   }
88 
89   @Override
readBytes(final byte[] dest, final int destOffset, int length)90   public void readBytes(final byte[] dest, final int destOffset, int length) {
91     execute(new ReadOperation() {
92       int currentOffset = destOffset;
93       @Override
94       public int readInternal(ReadableBuffer buffer, int length) {
95         buffer.readBytes(dest, currentOffset, length);
96         currentOffset += length;
97         return 0;
98       }
99     }, length);
100   }
101 
102   @Override
readBytes(final ByteBuffer dest)103   public void readBytes(final ByteBuffer dest) {
104     execute(new ReadOperation() {
105       @Override
106       public int readInternal(ReadableBuffer buffer, int length) {
107         // Change the limit so that only lengthToCopy bytes are available.
108         int prevLimit = dest.limit();
109         dest.limit(dest.position() + length);
110 
111         // Write the bytes and restore the original limit.
112         buffer.readBytes(dest);
113         dest.limit(prevLimit);
114         return 0;
115       }
116     }, dest.remaining());
117   }
118 
119   @Override
readBytes(final OutputStream dest, int length)120   public void readBytes(final OutputStream dest, int length) throws IOException {
121     ReadOperation op = new ReadOperation() {
122       @Override
123       public int readInternal(ReadableBuffer buffer, int length) throws IOException {
124         buffer.readBytes(dest, length);
125         return 0;
126       }
127     };
128     execute(op, length);
129 
130     // If an exception occurred, throw it.
131     if (op.isError()) {
132       throw op.ex;
133     }
134   }
135 
136   @Override
readBytes(int length)137   public CompositeReadableBuffer readBytes(int length) {
138     checkReadable(length);
139     readableBytes -= length;
140 
141     CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
142     while (length > 0) {
143       ReadableBuffer buffer = buffers.peek();
144       if (buffer.readableBytes() > length) {
145         newBuffer.addBuffer(buffer.readBytes(length));
146         length = 0;
147       } else {
148         newBuffer.addBuffer(buffers.poll());
149         length -= buffer.readableBytes();
150       }
151     }
152     return newBuffer;
153   }
154 
155   @Override
close()156   public void close() {
157     while (!buffers.isEmpty()) {
158       buffers.remove().close();
159     }
160   }
161 
162   /**
163    * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
164    * satisfy the requested {@code length}.
165    */
execute(ReadOperation op, int length)166   private void execute(ReadOperation op, int length) {
167     checkReadable(length);
168 
169     if (!buffers.isEmpty()) {
170       advanceBufferIfNecessary();
171     }
172 
173     for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
174       ReadableBuffer buffer = buffers.peek();
175       int lengthToCopy = Math.min(length, buffer.readableBytes());
176 
177       // Perform the read operation for this buffer.
178       op.read(buffer, lengthToCopy);
179       if (op.isError()) {
180         return;
181       }
182 
183       length -= lengthToCopy;
184       readableBytes -= lengthToCopy;
185     }
186 
187     if (length > 0) {
188       // Should never get here.
189       throw new AssertionError("Failed executing read operation");
190     }
191   }
192 
193   /**
194    * If the current buffer is exhausted, removes and closes it.
195    */
advanceBufferIfNecessary()196   private void advanceBufferIfNecessary() {
197     ReadableBuffer buffer = buffers.peek();
198     if (buffer.readableBytes() == 0) {
199       buffers.remove().close();
200     }
201   }
202 
203   /**
204    * A simple read operation to perform on a single {@link ReadableBuffer}. All state management for
205    * the buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}.
206    */
207   private abstract static class ReadOperation {
208     /**
209      * Only used by {@link CompositeReadableBuffer#readUnsignedByte()}.
210      */
211     int value;
212 
213     /**
214      * Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)}.
215      */
216     IOException ex;
217 
read(ReadableBuffer buffer, int length)218     final void read(ReadableBuffer buffer, int length) {
219       try {
220         value = readInternal(buffer, length);
221       } catch (IOException e) {
222         ex = e;
223       }
224     }
225 
isError()226     final boolean isError() {
227       return ex != null;
228     }
229 
readInternal(ReadableBuffer buffer, int length)230     abstract int readInternal(ReadableBuffer buffer, int length) throws IOException;
231   }
232 }
233