• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.commons.io;
18 
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertNotNull;
21 
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.nio.charset.StandardCharsets;
25 import java.util.HashMap;
26 import java.util.Random;
27 
28 import org.apache.commons.io.input.CharSequenceInputStream;
29 import org.apache.commons.io.input.DemuxInputStream;
30 import org.apache.commons.io.output.ByteArrayOutputStream;
31 import org.apache.commons.io.output.DemuxOutputStream;
32 import org.apache.commons.io.test.TestUtils;
33 import org.junit.jupiter.api.Test;
34 
35 /**
36  * Tests {@link DemuxInputStream}.
37  */
38 public class DemuxInputStreamTest {
39 
40     private static final class ReaderThread extends Thread {
41         private final DemuxInputStream demuxInputStream;
42         private final InputStream inputStream;
43         private final StringBuffer stringBuffer = new StringBuffer();
44 
ReaderThread(final String name, final InputStream input, final DemuxInputStream demux)45         ReaderThread(final String name, final InputStream input, final DemuxInputStream demux) {
46             super(name);
47             inputStream = input;
48             demuxInputStream = demux;
49         }
50 
getData()51         public String getData() {
52             return stringBuffer.toString();
53         }
54 
55         @Override
run()56         public void run() {
57             demuxInputStream.bindStream(inputStream);
58 
59             try {
60                 int ch = demuxInputStream.read();
61                 while (-1 != ch) {
62                     // System.out.println( "Reading: " + (char)ch );
63                     stringBuffer.append((char) ch);
64 
65                     final int sleepMillis = Math.abs(RANDOM.nextInt() % 10);
66                     TestUtils.sleep(sleepMillis);
67                     ch = demuxInputStream.read();
68                 }
69             } catch (final Exception e) {
70                 e.printStackTrace();
71             }
72         }
73     }
74 
75     private static final class WriterThread extends Thread {
76         private final byte[] byteArray;
77         private final DemuxOutputStream demuxOutputStream;
78         private final OutputStream outputStream;
79 
WriterThread(final String name, final String data, final OutputStream output, final DemuxOutputStream demux)80         WriterThread(final String name, final String data, final OutputStream output, final DemuxOutputStream demux) {
81             super(name);
82             outputStream = output;
83             demuxOutputStream = demux;
84             byteArray = data.getBytes();
85         }
86 
87         @Override
run()88         public void run() {
89             demuxOutputStream.bindStream(outputStream);
90             for (final byte element : byteArray) {
91                 try {
92                     // System.out.println( "Writing: " + (char)byteArray[ i ] );
93                     demuxOutputStream.write(element);
94                     final int sleepMillis = Math.abs(RANDOM.nextInt() % 10);
95                     TestUtils.sleep(sleepMillis);
96                 } catch (final Exception e) {
97                     e.printStackTrace();
98                 }
99             }
100         }
101     }
102 
103     private static final Random RANDOM = new Random();
104     private static final String DATA1 = "Data for thread1";
105 
106     private static final String DATA2 = "Data for thread2";
107     private static final String DATA3 = "Data for thread3";
108     private static final String DATA4 = "Data for thread4";
109     private static final String T1 = "Thread1";
110 
111     private static final String T2 = "Thread2";
112     private static final String T3 = "Thread3";
113     private static final String T4 = "Thread4";
114 
115     private final HashMap<String, ByteArrayOutputStream> outputMap = new HashMap<>();
116 
117     private final HashMap<String, Thread> threadMap = new HashMap<>();
118 
doJoin()119     private void doJoin() throws InterruptedException {
120         for (final String name : threadMap.keySet()) {
121             final Thread thread = threadMap.get(name);
122             thread.join();
123         }
124     }
125 
doStart()126     private void doStart() {
127         threadMap.keySet().forEach(name -> threadMap.get(name).start());
128     }
129 
getInput(final String threadName)130     private String getInput(final String threadName) {
131         final ReaderThread thread = (ReaderThread) threadMap.get(threadName);
132         assertNotNull(thread, "getInput()");
133         return thread.getData();
134     }
135 
getOutput(final String threadName)136     private String getOutput(final String threadName) {
137         final ByteArrayOutputStream output = outputMap.get(threadName);
138         assertNotNull(output, "getOutput()");
139         return output.toString(StandardCharsets.UTF_8);
140     }
141 
startReader(final String name, final String data, final DemuxInputStream demux)142     private void startReader(final String name, final String data, final DemuxInputStream demux) {
143         final InputStream input = CharSequenceInputStream.builder().setCharSequence(data).get();
144         final ReaderThread thread = new ReaderThread(name, input, demux);
145         threadMap.put(name, thread);
146     }
147 
startWriter(final String name, final String data, final DemuxOutputStream demux)148     private void startWriter(final String name, final String data, final DemuxOutputStream demux) {
149         final ByteArrayOutputStream output = new ByteArrayOutputStream();
150         outputMap.put(name, output);
151         final WriterThread thread = new WriterThread(name, data, output, demux);
152         threadMap.put(name, thread);
153     }
154 
155     @Test
testInputStream()156     public void testInputStream() throws Exception {
157         try (final DemuxInputStream input = new DemuxInputStream()) {
158             startReader(T1, DATA1, input);
159             startReader(T2, DATA2, input);
160             startReader(T3, DATA3, input);
161             startReader(T4, DATA4, input);
162 
163             doStart();
164             doJoin();
165 
166             assertEquals(DATA1, getInput(T1), "Data1");
167             assertEquals(DATA2, getInput(T2), "Data2");
168             assertEquals(DATA3, getInput(T3), "Data3");
169             assertEquals(DATA4, getInput(T4), "Data4");
170         }
171     }
172 
173     @Test
testOutputStream()174     public void testOutputStream() throws Exception {
175         try (final DemuxOutputStream output = new DemuxOutputStream()) {
176             startWriter(T1, DATA1, output);
177             startWriter(T2, DATA2, output);
178             startWriter(T3, DATA3, output);
179             startWriter(T4, DATA4, output);
180 
181             doStart();
182             doJoin();
183 
184             assertEquals(DATA1, getOutput(T1), "Data1");
185             assertEquals(DATA2, getOutput(T2), "Data2");
186             assertEquals(DATA3, getOutput(T3), "Data3");
187             assertEquals(DATA4, getOutput(T4), "Data4");
188         }
189     }
190 
191     @Test
testReadEOF()192     public void testReadEOF() throws Exception {
193         try (final DemuxInputStream input = new DemuxInputStream()) {
194             assertEquals(IOUtils.EOF, input.read());
195         }
196     }
197 }
198