1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io; 18 19 import static org.junit.jupiter.api.Assertions.assertEquals; 20 import static org.junit.jupiter.api.Assertions.assertNotNull; 21 22 import java.io.InputStream; 23 import java.io.OutputStream; 24 import java.nio.charset.StandardCharsets; 25 import java.util.HashMap; 26 import java.util.Random; 27 28 import org.apache.commons.io.input.CharSequenceInputStream; 29 import org.apache.commons.io.input.DemuxInputStream; 30 import org.apache.commons.io.output.ByteArrayOutputStream; 31 import org.apache.commons.io.output.DemuxOutputStream; 32 import org.apache.commons.io.test.TestUtils; 33 import org.junit.jupiter.api.Test; 34 35 /** 36 * Tests {@link DemuxInputStream}. 37 */ 38 public class DemuxInputStreamTest { 39 40 private static final class ReaderThread extends Thread { 41 private final DemuxInputStream demuxInputStream; 42 private final InputStream inputStream; 43 private final StringBuffer stringBuffer = new StringBuffer(); 44 ReaderThread(final String name, final InputStream input, final DemuxInputStream demux)45 ReaderThread(final String name, final InputStream input, final DemuxInputStream demux) { 46 super(name); 47 inputStream = input; 48 demuxInputStream = demux; 49 } 50 getData()51 public String getData() { 52 return stringBuffer.toString(); 53 } 54 55 @Override run()56 public void run() { 57 demuxInputStream.bindStream(inputStream); 58 59 try { 60 int ch = demuxInputStream.read(); 61 while (-1 != ch) { 62 // System.out.println( "Reading: " + (char)ch ); 63 stringBuffer.append((char) ch); 64 65 final int sleepMillis = Math.abs(RANDOM.nextInt() % 10); 66 TestUtils.sleep(sleepMillis); 67 ch = demuxInputStream.read(); 68 } 69 } catch (final Exception e) { 70 e.printStackTrace(); 71 } 72 } 73 } 74 75 private static final class WriterThread extends Thread { 76 private final byte[] byteArray; 77 private final DemuxOutputStream demuxOutputStream; 78 private final OutputStream outputStream; 79 WriterThread(final String name, final String data, final OutputStream output, final DemuxOutputStream demux)80 WriterThread(final String name, final String data, final OutputStream output, final DemuxOutputStream demux) { 81 super(name); 82 outputStream = output; 83 demuxOutputStream = demux; 84 byteArray = data.getBytes(); 85 } 86 87 @Override run()88 public void run() { 89 demuxOutputStream.bindStream(outputStream); 90 for (final byte element : byteArray) { 91 try { 92 // System.out.println( "Writing: " + (char)byteArray[ i ] ); 93 demuxOutputStream.write(element); 94 final int sleepMillis = Math.abs(RANDOM.nextInt() % 10); 95 TestUtils.sleep(sleepMillis); 96 } catch (final Exception e) { 97 e.printStackTrace(); 98 } 99 } 100 } 101 } 102 103 private static final Random RANDOM = new Random(); 104 private static final String DATA1 = "Data for thread1"; 105 106 private static final String DATA2 = "Data for thread2"; 107 private static final String DATA3 = "Data for thread3"; 108 private static final String DATA4 = "Data for thread4"; 109 private static final String T1 = "Thread1"; 110 111 private static final String T2 = "Thread2"; 112 private static final String T3 = "Thread3"; 113 private static final String T4 = "Thread4"; 114 115 private final HashMap<String, ByteArrayOutputStream> outputMap = new HashMap<>(); 116 117 private final HashMap<String, Thread> threadMap = new HashMap<>(); 118 doJoin()119 private void doJoin() throws InterruptedException { 120 for (final String name : threadMap.keySet()) { 121 final Thread thread = threadMap.get(name); 122 thread.join(); 123 } 124 } 125 doStart()126 private void doStart() { 127 threadMap.keySet().forEach(name -> threadMap.get(name).start()); 128 } 129 getInput(final String threadName)130 private String getInput(final String threadName) { 131 final ReaderThread thread = (ReaderThread) threadMap.get(threadName); 132 assertNotNull(thread, "getInput()"); 133 return thread.getData(); 134 } 135 getOutput(final String threadName)136 private String getOutput(final String threadName) { 137 final ByteArrayOutputStream output = outputMap.get(threadName); 138 assertNotNull(output, "getOutput()"); 139 return output.toString(StandardCharsets.UTF_8); 140 } 141 startReader(final String name, final String data, final DemuxInputStream demux)142 private void startReader(final String name, final String data, final DemuxInputStream demux) { 143 final InputStream input = CharSequenceInputStream.builder().setCharSequence(data).get(); 144 final ReaderThread thread = new ReaderThread(name, input, demux); 145 threadMap.put(name, thread); 146 } 147 startWriter(final String name, final String data, final DemuxOutputStream demux)148 private void startWriter(final String name, final String data, final DemuxOutputStream demux) { 149 final ByteArrayOutputStream output = new ByteArrayOutputStream(); 150 outputMap.put(name, output); 151 final WriterThread thread = new WriterThread(name, data, output, demux); 152 threadMap.put(name, thread); 153 } 154 155 @Test testInputStream()156 public void testInputStream() throws Exception { 157 try (final DemuxInputStream input = new DemuxInputStream()) { 158 startReader(T1, DATA1, input); 159 startReader(T2, DATA2, input); 160 startReader(T3, DATA3, input); 161 startReader(T4, DATA4, input); 162 163 doStart(); 164 doJoin(); 165 166 assertEquals(DATA1, getInput(T1), "Data1"); 167 assertEquals(DATA2, getInput(T2), "Data2"); 168 assertEquals(DATA3, getInput(T3), "Data3"); 169 assertEquals(DATA4, getInput(T4), "Data4"); 170 } 171 } 172 173 @Test testOutputStream()174 public void testOutputStream() throws Exception { 175 try (final DemuxOutputStream output = new DemuxOutputStream()) { 176 startWriter(T1, DATA1, output); 177 startWriter(T2, DATA2, output); 178 startWriter(T3, DATA3, output); 179 startWriter(T4, DATA4, output); 180 181 doStart(); 182 doJoin(); 183 184 assertEquals(DATA1, getOutput(T1), "Data1"); 185 assertEquals(DATA2, getOutput(T2), "Data2"); 186 assertEquals(DATA3, getOutput(T3), "Data3"); 187 assertEquals(DATA4, getOutput(T4), "Data4"); 188 } 189 } 190 191 @Test testReadEOF()192 public void testReadEOF() throws Exception { 193 try (final DemuxInputStream input = new DemuxInputStream()) { 194 assertEquals(IOUtils.EOF, input.read()); 195 } 196 } 197 } 198