• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2015 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
14 
15 package com.google.common.io;
16 
17 import static com.google.common.base.Preconditions.checkArgument;
18 import static com.google.common.base.Preconditions.checkNotNull;
19 import static com.google.common.base.Preconditions.checkPositionIndexes;
20 
21 import com.google.common.annotations.GwtIncompatible;
22 import com.google.common.annotations.J2ktIncompatible;
23 import com.google.common.primitives.UnsignedBytes;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.Reader;
27 import java.nio.Buffer;
28 import java.nio.ByteBuffer;
29 import java.nio.CharBuffer;
30 import java.nio.charset.Charset;
31 import java.nio.charset.CharsetEncoder;
32 import java.nio.charset.CoderResult;
33 import java.nio.charset.CodingErrorAction;
34 import java.util.Arrays;
35 
36 /**
37  * An {@link InputStream} that converts characters from a {@link Reader} into bytes using an
38  * arbitrary Charset.
39  *
40  * <p>This is an alternative to copying the data to an {@code OutputStream} via a {@code Writer},
41  * which is necessarily blocking. By implementing an {@code InputStream} it allows consumers to
42  * "pull" as much data as they can handle, which is more convenient when dealing with flow
43  * controlled, async APIs.
44  *
45  * @author Chris Nokleberg
46  */
47 @J2ktIncompatible
48 @GwtIncompatible
49 @ElementTypesAreNonnullByDefault
50 final class ReaderInputStream extends InputStream {
51   private final Reader reader;
52   private final CharsetEncoder encoder;
53   private final byte[] singleByte = new byte[1];
54 
55   /**
56    * charBuffer holds characters that have been read from the Reader but not encoded yet. The buffer
57    * is perpetually "flipped" (unencoded characters between position and limit).
58    */
59   private CharBuffer charBuffer;
60 
61   /**
62    * byteBuffer holds encoded characters that have not yet been sent to the caller of the input
63    * stream. When encoding it is "unflipped" (encoded bytes between 0 and position) and when
64    * draining it is flipped (undrained bytes between position and limit).
65    */
66   private ByteBuffer byteBuffer;
67 
68   /** Whether we've finished reading the reader. */
69   private boolean endOfInput;
70   /** Whether we're copying encoded bytes to the caller's buffer. */
71   private boolean draining;
72   /** Whether we've successfully flushed the encoder. */
73   private boolean doneFlushing;
74 
75   /**
76    * Creates a new input stream that will encode the characters from {@code reader} into bytes using
77    * the given character set. Malformed input and unmappable characters will be replaced.
78    *
79    * @param reader input source
80    * @param charset character set used for encoding chars to bytes
81    * @param bufferSize size of internal input and output buffers
82    * @throws IllegalArgumentException if bufferSize is non-positive
83    */
ReaderInputStream(Reader reader, Charset charset, int bufferSize)84   ReaderInputStream(Reader reader, Charset charset, int bufferSize) {
85     this(
86         reader,
87         charset
88             .newEncoder()
89             .onMalformedInput(CodingErrorAction.REPLACE)
90             .onUnmappableCharacter(CodingErrorAction.REPLACE),
91         bufferSize);
92   }
93 
94   /**
95    * Creates a new input stream that will encode the characters from {@code reader} into bytes using
96    * the given character set encoder.
97    *
98    * @param reader input source
99    * @param encoder character set encoder used for encoding chars to bytes
100    * @param bufferSize size of internal input and output buffers
101    * @throws IllegalArgumentException if bufferSize is non-positive
102    */
ReaderInputStream(Reader reader, CharsetEncoder encoder, int bufferSize)103   ReaderInputStream(Reader reader, CharsetEncoder encoder, int bufferSize) {
104     this.reader = checkNotNull(reader);
105     this.encoder = checkNotNull(encoder);
106     checkArgument(bufferSize > 0, "bufferSize must be positive: %s", bufferSize);
107     encoder.reset();
108 
109     charBuffer = CharBuffer.allocate(bufferSize);
110     Java8Compatibility.flip(charBuffer);
111 
112     byteBuffer = ByteBuffer.allocate(bufferSize);
113   }
114 
115   @Override
close()116   public void close() throws IOException {
117     reader.close();
118   }
119 
120   @Override
read()121   public int read() throws IOException {
122     return (read(singleByte) == 1) ? UnsignedBytes.toInt(singleByte[0]) : -1;
123   }
124 
125   // TODO(chrisn): Consider trying to encode/flush directly to the argument byte
126   // buffer when possible.
127   @Override
read(byte[] b, int off, int len)128   public int read(byte[] b, int off, int len) throws IOException {
129     // Obey InputStream contract.
130     checkPositionIndexes(off, off + len, b.length);
131     if (len == 0) {
132       return 0;
133     }
134 
135     // The rest of this method implements the process described by the CharsetEncoder javadoc.
136     int totalBytesRead = 0;
137     boolean doneEncoding = endOfInput;
138 
139     DRAINING:
140     while (true) {
141       // We stay in draining mode until there are no bytes left in the output buffer. Then we go
142       // back to encoding/flushing.
143       if (draining) {
144         totalBytesRead += drain(b, off + totalBytesRead, len - totalBytesRead);
145         if (totalBytesRead == len || doneFlushing) {
146           return (totalBytesRead > 0) ? totalBytesRead : -1;
147         }
148         draining = false;
149         Java8Compatibility.clear(byteBuffer);
150       }
151 
152       while (true) {
153         // We call encode until there is no more input. The last call to encode will have endOfInput
154         // == true. Then there is a final call to flush.
155         CoderResult result;
156         if (doneFlushing) {
157           result = CoderResult.UNDERFLOW;
158         } else if (doneEncoding) {
159           result = encoder.flush(byteBuffer);
160         } else {
161           result = encoder.encode(charBuffer, byteBuffer, endOfInput);
162         }
163 
164         if (result.isOverflow()) {
165           // Not enough room in output buffer--drain it, creating a bigger buffer if necessary.
166           startDraining(true);
167           continue DRAINING;
168         } else if (result.isUnderflow()) {
169           // If encoder underflows, it means either:
170           // a) the final flush() succeeded; next drain (then done)
171           // b) we encoded all of the input; next flush
172           // c) we ran of out input to encode; next read more input
173           if (doneEncoding) { // (a)
174             doneFlushing = true;
175             startDraining(false);
176             continue DRAINING;
177           } else if (endOfInput) { // (b)
178             doneEncoding = true;
179           } else { // (c)
180             readMoreChars();
181           }
182         } else if (result.isError()) {
183           // Only reach here if a CharsetEncoder with non-REPLACE settings is used.
184           result.throwException();
185           return 0; // Not called.
186         }
187       }
188     }
189   }
190 
191   /** Returns a new CharBuffer identical to buf, except twice the capacity. */
grow(CharBuffer buf)192   private static CharBuffer grow(CharBuffer buf) {
193     char[] copy = Arrays.copyOf(buf.array(), buf.capacity() * 2);
194     CharBuffer bigger = CharBuffer.wrap(copy);
195     Java8Compatibility.position(bigger, buf.position());
196     Java8Compatibility.limit(bigger, buf.limit());
197     return bigger;
198   }
199 
200   /** Handle the case of underflow caused by needing more input characters. */
readMoreChars()201   private void readMoreChars() throws IOException {
202     // Possibilities:
203     // 1) array has space available on right-hand side (between limit and capacity)
204     // 2) array has space available on left-hand side (before position)
205     // 3) array has no space available
206     //
207     // In case 2 we shift the existing chars to the left, and in case 3 we create a bigger
208     // array, then they both become case 1.
209 
210     if (availableCapacity(charBuffer) == 0) {
211       if (charBuffer.position() > 0) {
212         // (2) There is room in the buffer. Move existing bytes to the beginning.
213         Java8Compatibility.flip(charBuffer.compact());
214       } else {
215         // (3) Entire buffer is full, need bigger buffer.
216         charBuffer = grow(charBuffer);
217       }
218     }
219 
220     // (1) Read more characters into free space at end of array.
221     int limit = charBuffer.limit();
222     int numChars = reader.read(charBuffer.array(), limit, availableCapacity(charBuffer));
223     if (numChars == -1) {
224       endOfInput = true;
225     } else {
226       Java8Compatibility.limit(charBuffer, limit + numChars);
227     }
228   }
229 
230   /** Returns the number of elements between the limit and capacity. */
availableCapacity(Buffer buffer)231   private static int availableCapacity(Buffer buffer) {
232     return buffer.capacity() - buffer.limit();
233   }
234 
235   /**
236    * Flips the buffer output buffer so we can start reading bytes from it. If we are starting to
237    * drain because there was overflow, and there aren't actually any characters to drain, then the
238    * overflow must be due to a small output buffer.
239    */
startDraining(boolean overflow)240   private void startDraining(boolean overflow) {
241     Java8Compatibility.flip(byteBuffer);
242     if (overflow && byteBuffer.remaining() == 0) {
243       byteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2);
244     } else {
245       draining = true;
246     }
247   }
248 
249   /**
250    * Copy as much of the byte buffer into the output array as possible, returning the (positive)
251    * number of characters copied.
252    */
drain(byte[] b, int off, int len)253   private int drain(byte[] b, int off, int len) {
254     int remaining = Math.min(len, byteBuffer.remaining());
255     byteBuffer.get(b, off, remaining);
256     return remaining;
257   }
258 }
259