• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.commons.io.input;
18 
19 import static java.nio.charset.StandardCharsets.UTF_8;
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.assertThrows;
22 
23 import java.io.BufferedInputStream;
24 import java.io.BufferedOutputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InterruptedIOException;
29 import java.nio.charset.StandardCharsets;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.stream.Stream;
33 
34 import org.apache.commons.io.IOUtils;
35 import org.apache.commons.io.output.QueueOutputStream;
36 import org.apache.commons.io.output.QueueOutputStreamTest;
37 import org.apache.commons.lang3.StringUtils;
38 import org.junit.jupiter.api.Test;
39 import org.junit.jupiter.params.ParameterizedTest;
40 import org.junit.jupiter.params.provider.Arguments;
41 import org.junit.jupiter.params.provider.MethodSource;
42 
43 /**
44  * Test {@link QueueInputStream}.
45  *
46  * @see {@link QueueOutputStreamTest}
47  */
48 public class QueueInputStreamTest {
49 
inputData()50     public static Stream<Arguments> inputData() {
51         return Stream.of(Arguments.of(""),
52                 Arguments.of("1"),
53                 Arguments.of("12"),
54                 Arguments.of("1234"),
55                 Arguments.of("12345678"),
56                 Arguments.of(StringUtils.repeat("A", 4095)),
57                 Arguments.of(StringUtils.repeat("A", 4096)),
58                 Arguments.of(StringUtils.repeat("A", 4097)),
59                 Arguments.of(StringUtils.repeat("A", 8191)),
60                 Arguments.of(StringUtils.repeat("A", 8192)),
61                 Arguments.of(StringUtils.repeat("A", 8193)),
62                 Arguments.of(StringUtils.repeat("A", 8192 * 4)));
63     }
64 
65     @ParameterizedTest(name = "inputData={0}")
66     @MethodSource("inputData")
bufferedReads(final String inputData)67     public void bufferedReads(final String inputData) throws IOException {
68         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
69         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
70                 final QueueOutputStream outputStream = new QueueOutputStream(queue)) {
71             outputStream.write(inputData.getBytes(UTF_8));
72             final String actualData = IOUtils.toString(inputStream, UTF_8);
73             assertEquals(inputData, actualData);
74         }
75     }
76 
77     @ParameterizedTest(name = "inputData={0}")
78     @MethodSource("inputData")
bufferedReadWrite(final String inputData)79     public void bufferedReadWrite(final String inputData) throws IOException {
80         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
81         try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
82                 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
83             outputStream.write(inputData.getBytes(UTF_8));
84             outputStream.flush();
85             final String dataCopy = IOUtils.toString(inputStream, UTF_8);
86             assertEquals(inputData, dataCopy);
87         }
88     }
89 
90     @ParameterizedTest(name = "inputData={0}")
91     @MethodSource("inputData")
bufferedWrites(final String inputData)92     public void bufferedWrites(final String inputData) throws IOException {
93         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
94         try (QueueInputStream inputStream = new QueueInputStream(queue);
95                 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) {
96             outputStream.write(inputData.getBytes(UTF_8));
97             outputStream.flush();
98             final String actualData = readUnbuffered(inputStream);
99             assertEquals(inputData, actualData);
100         }
101     }
102 
defaultBufferSize()103     private int defaultBufferSize() {
104         return 8192;
105     }
106 
readUnbuffered(final InputStream inputStream)107     private String readUnbuffered(final InputStream inputStream) throws IOException {
108         final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
109         int n = -1;
110         while ((n = inputStream.read()) != -1) {
111             byteArrayOutputStream.write(n);
112         }
113         return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
114     }
115 
116     @Test
testNullArgument()117     public void testNullArgument() {
118         assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required");
119     }
120 
121     @ParameterizedTest(name = "inputData={0}")
122     @MethodSource("inputData")
unbufferedReadWrite(final String inputData)123     public void unbufferedReadWrite(final String inputData) throws IOException {
124         try (QueueInputStream inputStream = new QueueInputStream();
125                 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
126             writeUnbuffered(outputStream, inputData);
127             final String actualData = readUnbuffered(inputStream);
128             assertEquals(inputData, actualData);
129         }
130     }
131 
writeUnbuffered(final QueueOutputStream outputStream, final String inputData)132     private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException {
133         final byte[] bytes = inputData.getBytes(UTF_8);
134         for (final byte oneByte : bytes) {
135             outputStream.write(oneByte);
136         }
137     }
138 }
139