1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io.input; 18 19 import static java.nio.charset.StandardCharsets.UTF_8; 20 import static org.junit.jupiter.api.Assertions.assertEquals; 21 import static org.junit.jupiter.api.Assertions.assertThrows; 22 23 import java.io.BufferedInputStream; 24 import java.io.BufferedOutputStream; 25 import java.io.ByteArrayOutputStream; 26 import java.io.IOException; 27 import java.io.InputStream; 28 import java.io.InterruptedIOException; 29 import java.nio.charset.StandardCharsets; 30 import java.util.concurrent.BlockingQueue; 31 import java.util.concurrent.LinkedBlockingQueue; 32 import java.util.stream.Stream; 33 34 import org.apache.commons.io.IOUtils; 35 import org.apache.commons.io.output.QueueOutputStream; 36 import org.apache.commons.io.output.QueueOutputStreamTest; 37 import org.apache.commons.lang3.StringUtils; 38 import org.junit.jupiter.api.Test; 39 import org.junit.jupiter.params.ParameterizedTest; 40 import org.junit.jupiter.params.provider.Arguments; 41 import org.junit.jupiter.params.provider.MethodSource; 42 43 /** 44 * Test {@link QueueInputStream}. 45 * 46 * @see {@link QueueOutputStreamTest} 47 */ 48 public class QueueInputStreamTest { 49 inputData()50 public static Stream<Arguments> inputData() { 51 return Stream.of(Arguments.of(""), 52 Arguments.of("1"), 53 Arguments.of("12"), 54 Arguments.of("1234"), 55 Arguments.of("12345678"), 56 Arguments.of(StringUtils.repeat("A", 4095)), 57 Arguments.of(StringUtils.repeat("A", 4096)), 58 Arguments.of(StringUtils.repeat("A", 4097)), 59 Arguments.of(StringUtils.repeat("A", 8191)), 60 Arguments.of(StringUtils.repeat("A", 8192)), 61 Arguments.of(StringUtils.repeat("A", 8193)), 62 Arguments.of(StringUtils.repeat("A", 8192 * 4))); 63 } 64 65 @ParameterizedTest(name = "inputData={0}") 66 @MethodSource("inputData") bufferedReads(final String inputData)67 public void bufferedReads(final String inputData) throws IOException { 68 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); 69 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); 70 final QueueOutputStream outputStream = new QueueOutputStream(queue)) { 71 outputStream.write(inputData.getBytes(UTF_8)); 72 final String actualData = IOUtils.toString(inputStream, UTF_8); 73 assertEquals(inputData, actualData); 74 } 75 } 76 77 @ParameterizedTest(name = "inputData={0}") 78 @MethodSource("inputData") bufferedReadWrite(final String inputData)79 public void bufferedReadWrite(final String inputData) throws IOException { 80 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); 81 try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue)); 82 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) { 83 outputStream.write(inputData.getBytes(UTF_8)); 84 outputStream.flush(); 85 final String dataCopy = IOUtils.toString(inputStream, UTF_8); 86 assertEquals(inputData, dataCopy); 87 } 88 } 89 90 @ParameterizedTest(name = "inputData={0}") 91 @MethodSource("inputData") bufferedWrites(final String inputData)92 public void bufferedWrites(final String inputData) throws IOException { 93 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); 94 try (QueueInputStream inputStream = new QueueInputStream(queue); 95 final BufferedOutputStream outputStream = new BufferedOutputStream(new QueueOutputStream(queue), defaultBufferSize())) { 96 outputStream.write(inputData.getBytes(UTF_8)); 97 outputStream.flush(); 98 final String actualData = readUnbuffered(inputStream); 99 assertEquals(inputData, actualData); 100 } 101 } 102 defaultBufferSize()103 private int defaultBufferSize() { 104 return 8192; 105 } 106 readUnbuffered(final InputStream inputStream)107 private String readUnbuffered(final InputStream inputStream) throws IOException { 108 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); 109 int n = -1; 110 while ((n = inputStream.read()) != -1) { 111 byteArrayOutputStream.write(n); 112 } 113 return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name()); 114 } 115 116 @Test testNullArgument()117 public void testNullArgument() { 118 assertThrows(NullPointerException.class, () -> new QueueInputStream(null), "queue is required"); 119 } 120 121 @ParameterizedTest(name = "inputData={0}") 122 @MethodSource("inputData") unbufferedReadWrite(final String inputData)123 public void unbufferedReadWrite(final String inputData) throws IOException { 124 try (QueueInputStream inputStream = new QueueInputStream(); 125 final QueueOutputStream outputStream = inputStream.newQueueOutputStream()) { 126 writeUnbuffered(outputStream, inputData); 127 final String actualData = readUnbuffered(inputStream); 128 assertEquals(inputData, actualData); 129 } 130 } 131 writeUnbuffered(final QueueOutputStream outputStream, final String inputData)132 private void writeUnbuffered(final QueueOutputStream outputStream, final String inputData) throws InterruptedIOException { 133 final byte[] bytes = inputData.getBytes(UTF_8); 134 for (final byte oneByte : bytes) { 135 outputStream.write(oneByte); 136 } 137 } 138 } 139