• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.nio.Buffer;
22 import java.nio.ByteBuffer;
23 import java.nio.InvalidMarkException;
24 import java.util.ArrayDeque;
25 import java.util.Deque;
26 import javax.annotation.Nullable;
27 
28 /**
29  * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
30  * facade that allows multiple buffers to be treated as one.
31  *
32  * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
33  * the composite has read past the end of a given buffer, that buffer is automatically closed and
34  * removed from the composite.
35  */
36 public class CompositeReadableBuffer extends AbstractReadableBuffer {
37 
38   private final Deque<ReadableBuffer> readableBuffers;
39   private Deque<ReadableBuffer> rewindableBuffers;
40   private int readableBytes;
41   private boolean marked;
42 
CompositeReadableBuffer(int initialCapacity)43   public CompositeReadableBuffer(int initialCapacity) {
44     readableBuffers = new ArrayDeque<>(initialCapacity);
45   }
46 
CompositeReadableBuffer()47   public CompositeReadableBuffer() {
48     readableBuffers = new ArrayDeque<>();
49   }
50 
51   /**
52    * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
53    * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
54    * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
55    * this {@code CompositeBuffer}.
56    */
addBuffer(ReadableBuffer buffer)57   public void addBuffer(ReadableBuffer buffer) {
58     boolean markHead = marked && readableBuffers.isEmpty();
59     enqueueBuffer(buffer);
60     if (markHead) {
61       readableBuffers.peek().mark();
62     }
63   }
64 
enqueueBuffer(ReadableBuffer buffer)65   private void enqueueBuffer(ReadableBuffer buffer) {
66     if (!(buffer instanceof CompositeReadableBuffer)) {
67       readableBuffers.add(buffer);
68       readableBytes += buffer.readableBytes();
69       return;
70     }
71 
72     CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
73     while (!compositeBuffer.readableBuffers.isEmpty()) {
74       ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
75       readableBuffers.add(subBuffer);
76     }
77     readableBytes += compositeBuffer.readableBytes;
78     compositeBuffer.readableBytes = 0;
79     compositeBuffer.close();
80   }
81 
82   @Override
readableBytes()83   public int readableBytes() {
84     return readableBytes;
85   }
86 
87   private static final NoThrowReadOperation<Void> UBYTE_OP =
88       new NoThrowReadOperation<Void>() {
89         @Override
90         public int read(ReadableBuffer buffer, int length, Void unused, int value) {
91           return buffer.readUnsignedByte();
92         }
93       };
94 
95   @Override
readUnsignedByte()96   public int readUnsignedByte() {
97     return executeNoThrow(UBYTE_OP, 1, null, 0);
98   }
99 
100   private static final NoThrowReadOperation<Void> SKIP_OP =
101       new NoThrowReadOperation<Void>() {
102         @Override
103         public int read(ReadableBuffer buffer, int length, Void unused, int unused2) {
104           buffer.skipBytes(length);
105           return 0;
106         }
107       };
108 
109   @Override
skipBytes(int length)110   public void skipBytes(int length) {
111     executeNoThrow(SKIP_OP, length, null, 0);
112   }
113 
114   private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP =
115       new NoThrowReadOperation<byte[]>() {
116         @Override
117         public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) {
118           buffer.readBytes(dest, offset, length);
119           return offset + length;
120         }
121       };
122 
123   @Override
readBytes(byte[] dest, int destOffset, int length)124   public void readBytes(byte[] dest, int destOffset, int length) {
125     executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset);
126   }
127 
128   private static final NoThrowReadOperation<ByteBuffer> BYTE_BUF_OP =
129       new NoThrowReadOperation<ByteBuffer>() {
130         @Override
131         public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) {
132           // Change the limit so that only lengthToCopy bytes are available.
133           int prevLimit = dest.limit();
134           ((Buffer) dest).limit(dest.position() + length);
135           // Write the bytes and restore the original limit.
136           buffer.readBytes(dest);
137           ((Buffer) dest).limit(prevLimit);
138           return 0;
139         }
140       };
141 
142   @Override
readBytes(ByteBuffer dest)143   public void readBytes(ByteBuffer dest) {
144     executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0);
145   }
146 
147   private static final ReadOperation<OutputStream> STREAM_OP =
148       new ReadOperation<OutputStream>() {
149         @Override
150         public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused)
151             throws IOException {
152           buffer.readBytes(dest, length);
153           return 0;
154         }
155       };
156 
157   @Override
readBytes(OutputStream dest, int length)158   public void readBytes(OutputStream dest, int length) throws IOException {
159     execute(STREAM_OP, length, dest, 0);
160   }
161 
162   @Override
readBytes(int length)163   public ReadableBuffer readBytes(int length) {
164     if (length <= 0) {
165       return ReadableBuffers.empty();
166     }
167     checkReadable(length);
168     readableBytes -= length;
169 
170     ReadableBuffer newBuffer = null;
171     CompositeReadableBuffer newComposite = null;
172     do {
173       ReadableBuffer buffer = readableBuffers.peek();
174       int readable = buffer.readableBytes();
175       ReadableBuffer readBuffer;
176       if (readable > length) {
177         readBuffer = buffer.readBytes(length);
178         length = 0;
179       } else {
180         if (marked) {
181           readBuffer = buffer.readBytes(readable);
182           advanceBuffer();
183         } else {
184           readBuffer = readableBuffers.poll();
185         }
186         length -= readable;
187       }
188       if (newBuffer == null) {
189         newBuffer = readBuffer;
190       } else {
191         if (newComposite == null) {
192           newComposite = new CompositeReadableBuffer(
193               length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
194           newComposite.addBuffer(newBuffer);
195           newBuffer = newComposite;
196         }
197         newComposite.addBuffer(readBuffer);
198       }
199     } while (length > 0);
200     return newBuffer;
201   }
202 
203   @Override
markSupported()204   public boolean markSupported() {
205     for (ReadableBuffer buffer : readableBuffers) {
206       if (!buffer.markSupported()) {
207         return false;
208       }
209     }
210     return true;
211   }
212 
213   @Override
mark()214   public void mark() {
215     if (rewindableBuffers == null) {
216       rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
217     }
218     while (!rewindableBuffers.isEmpty()) {
219       rewindableBuffers.remove().close();
220     }
221     marked = true;
222     ReadableBuffer buffer = readableBuffers.peek();
223     if (buffer != null) {
224       buffer.mark();
225     }
226   }
227 
228   @Override
reset()229   public void reset() {
230     if (!marked) {
231       throw new InvalidMarkException();
232     }
233     ReadableBuffer buffer;
234     if ((buffer = readableBuffers.peek()) != null) {
235       int currentRemain = buffer.readableBytes();
236       buffer.reset();
237       readableBytes += (buffer.readableBytes() - currentRemain);
238     }
239     while ((buffer = rewindableBuffers.pollLast()) != null) {
240       buffer.reset();
241       readableBuffers.addFirst(buffer);
242       readableBytes += buffer.readableBytes();
243     }
244   }
245 
246   @Override
byteBufferSupported()247   public boolean byteBufferSupported() {
248     for (ReadableBuffer buffer : readableBuffers) {
249       if (!buffer.byteBufferSupported()) {
250         return false;
251       }
252     }
253     return true;
254   }
255 
256   @Nullable
257   @Override
getByteBuffer()258   public ByteBuffer getByteBuffer() {
259     if (readableBuffers.isEmpty()) {
260       return null;
261     }
262     return readableBuffers.peek().getByteBuffer();
263   }
264 
265   @Override
close()266   public void close() {
267     while (!readableBuffers.isEmpty()) {
268       readableBuffers.remove().close();
269     }
270     if (rewindableBuffers != null) {
271       while (!rewindableBuffers.isEmpty()) {
272         rewindableBuffers.remove().close();
273       }
274     }
275   }
276 
277   /**
278    * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
279    * satisfy the requested {@code length}.
280    */
execute(ReadOperation<T> op, int length, T dest, int value)281   private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
282     checkReadable(length);
283 
284     if (!readableBuffers.isEmpty()) {
285       advanceBufferIfNecessary();
286     }
287 
288     for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
289       ReadableBuffer buffer = readableBuffers.peek();
290       int lengthToCopy = Math.min(length, buffer.readableBytes());
291 
292       // Perform the read operation for this buffer.
293       value = op.read(buffer, lengthToCopy, dest, value);
294 
295       length -= lengthToCopy;
296       readableBytes -= lengthToCopy;
297     }
298 
299     if (length > 0) {
300       // Should never get here.
301       throw new AssertionError("Failed executing read operation");
302     }
303 
304     return value;
305   }
306 
executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value)307   private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) {
308     try {
309       return execute(op, length, dest, value);
310     } catch (IOException e) {
311       throw new AssertionError(e); // shouldn't happen
312     }
313   }
314 
315   /**
316    * If the current buffer is exhausted, removes and closes it.
317    */
advanceBufferIfNecessary()318   private void advanceBufferIfNecessary() {
319     ReadableBuffer buffer = readableBuffers.peek();
320     if (buffer.readableBytes() == 0) {
321       advanceBuffer();
322     }
323   }
324 
325   /**
326    * Removes one buffer from the front and closes it.
327    */
advanceBuffer()328   private void advanceBuffer() {
329     if (marked) {
330       rewindableBuffers.add(readableBuffers.remove());
331       ReadableBuffer next = readableBuffers.peek();
332       if (next != null) {
333         next.mark();
334       }
335     } else {
336       readableBuffers.remove().close();
337     }
338   }
339 
340   /**
341    * A simple read operation to perform on a single {@link ReadableBuffer}.
342    * All state management for the buffers is done by
343    * {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}.
344    */
345   private interface ReadOperation<T> {
346     /**
347      * This method can also be used to simultaneously perform operation-specific int-valued
348      * aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}.
349      * {@code value} is the return value from the prior buffer, or the "initial" value passed
350      * to {@code execute()} in the case of the first buffer. {@code execute()} returns the value
351      * returned by the operation called on the last buffer.
352      */
read(ReadableBuffer buffer, int length, T dest, int value)353     int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException;
354   }
355 
356   private interface NoThrowReadOperation<T> extends ReadOperation<T> {
357     @Override
read(ReadableBuffer buffer, int length, T dest, int value)358     int read(ReadableBuffer buffer, int length, T dest, int value);
359   }
360 }
361