/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.output;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.QueueInputStream;
import org.apache.commons.io.input.QueueInputStreamTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

/**
 * Test {@link QueueOutputStream} and {@link QueueInputStream}.
 *
 * @see QueueInputStreamTest
 */
public class QueueOutputStreamTest {

    /** Worker pool shared by every test in this class; shut down in {@link #afterAll()}. */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(5);

    /**
     * Shuts down the shared executor once all tests have run.
     */
    @AfterAll
    public static void afterAll() {
        executorService.shutdown();
    }

    /**
     * Runs the given callable on a thread of the shared executor and waits for its result.
     * <p>
     * The result is obtained through the {@link java.util.concurrent.Future} returned by the executor, so a
     * callable that fails is reported to the caller (wrapped in an
     * {@link java.util.concurrent.ExecutionException}) instead of leaving the calling thread blocked forever
     * waiting for a value that will never be handed over.
     * </p>
     *
     * @param <T> the result type of the callable.
     * @param callable the work to run on a pool thread.
     * @return the value returned by the callable.
     * @throws Exception if the callable fails or the calling thread is interrupted while waiting.
     */
    private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception {
        return executorService.submit(callable).get();
    }

    /**
     * Tests that constructing a {@link QueueOutputStream} with a {@code null} queue throws
     * {@link NullPointerException}.
     */
    @Test
    public void testNullArgument() {
        assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required");
    }

    /**
     * Tests that a writer thread that gets interrupted while writing three bytes to a stream backed by a queue
     * of capacity one fails with an {@link InterruptedIOException}.
     *
     * @throws Exception if a hand-over between the threads times out or the streams fail to close.
     */
    @Test
    public void writeInterrupted() throws Exception {
        try (final QueueOutputStream outputStream = new QueueOutputStream(new LinkedBlockingQueue<>(1));
                final QueueInputStream inputStream = outputStream.newQueueInputStream()) {

            final int timeout = 1;
            final Exchanger<Thread> writerThreadExchanger = new Exchanger<>();
            final Exchanger<Exception> exceptionExchanger = new Exchanger<>();

            // Interrupter: receives the writer's thread and interrupts it.
            executorService.submit(() -> {
                final Thread writerThread = writerThreadExchanger.exchange(null, timeout, SECONDS);
                writerThread.interrupt();
                return null;
            });

            // Writer: publishes its own thread, then writes more bytes than the queue capacity.
            executorService.submit(() -> {
                try {
                    writerThreadExchanger.exchange(Thread.currentThread(), timeout, SECONDS);
                    outputStream.write("ABC".getBytes(StandardCharsets.UTF_8));
                } catch (final Exception e) {
                    Thread.interrupted(); // clear the interrupt flag so the exchange below is not aborted
                    exceptionExchanger.exchange(e, timeout, SECONDS);
                }
                return null;
            });

            final Exception exception = exceptionExchanger.exchange(null, timeout, SECONDS);
            assertNotNull(exception);
            // Expected value first, actual second, so a failure message reads correctly.
            assertEquals(InterruptedIOException.class, exception.getClass());
        }
    }

    /**
     * Tests that bytes written to the output stream can be read back as the same string from the paired input
     * stream on a single thread.
     *
     * @throws Exception if writing, reading or closing the streams fails.
     */
    @Test
    public void writeString() throws Exception {
        try (final QueueOutputStream outputStream = new QueueOutputStream();
                final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
            outputStream.write("ABC".getBytes(UTF_8));
            final String value = IOUtils.toString(inputStream, UTF_8);
            assertEquals("ABC", value);
        }
    }

    /**
     * Tests the same round trip as {@link #writeString()}, but with stream creation, writing and reading each
     * performed by a task submitted to the shared executor.
     *
     * @throws Exception if any of the submitted tasks fails or the streams fail to close.
     */
    @Test
    public void writeStringMultiThread() throws Exception {
        try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new);
                final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) {
            callInThrowAwayThread(() -> {
                outputStream.write("ABC".getBytes(UTF_8));
                return null;
            });

            final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8));
            assertEquals("ABC", value);
        }
    }
}