• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.commons.io;
19 
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.fail;
22 
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.EOFException;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.util.Random;
29 import java.util.concurrent.ExecutorCompletionService;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.function.Supplier;
34 import java.util.zip.Inflater;
35 import java.util.zip.InflaterInputStream;
36 
37 import org.junit.jupiter.api.BeforeEach;
38 import org.junit.jupiter.api.Test;
39 
40 /**
41  * See Jira ticket IO-802.
42  */
43 public class IOUtilsMultithreadedSkipTest {
44 
45     private static final String FIXTURE = "TIKA-4065.bin";
46     long seed = 1;
47     private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]);
48 
generateExpected(final InputStream is, final int[] skips)49     private int[] generateExpected(final InputStream is, final int[] skips) throws IOException {
50         final int[] testBytes = new int[skips.length];
51         for (int i = 0; i < skips.length; i++) {
52             try {
53                 IOUtils.skipFully(is, skips[i]);
54                 testBytes[i] = is.read();
55             } catch (final EOFException e) {
56                 testBytes[i] = -1;
57             }
58         }
59         return testBytes;
60     }
61 
generateSkips(final byte[] bytes, final int numSkips, final Random random)62     private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) {
63         final int[] skips = new int[numSkips];
64         for (int i = 0; i < skips.length; i++) {
65             skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10;
66         }
67         return skips;
68     }
69 
inflate(final byte[] deflated)70     private InputStream inflate(final byte[] deflated) throws IOException {
71         final ByteArrayOutputStream bos = new ByteArrayOutputStream();
72         IOUtils.copy(new InflaterInputStream(new ByteArrayInputStream(deflated), new Inflater(true)), bos);
73         return new ByteArrayInputStream(bos.toByteArray());
74     }
75 
76     @BeforeEach
setUp()77     public void setUp() {
78         // Not the best random we can use but good enough here.
79         seed = new Random().nextLong();
80     }
81 
testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier)82     private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception {
83         final long thisSeed = seed;
84         // thisSeed = -727624427837034313l;
85         final Random random = new Random(thisSeed);
86         final byte[] bytes;
87         try (final InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) {
88             bytes = IOUtils.toByteArray(inputStream);
89         }
90         final int numSkips = random.nextInt(bytes.length) / 100 + 1;
91 
92         final int[] skips = generateSkips(bytes, numSkips, random);
93         final int[] expected;
94         try (final InputStream inflate = inflate(bytes)) {
95             expected = generateExpected(inflate, skips);
96         }
97 
98         final int numThreads = 2;
99         final int iterations = 100;
100         final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
101         final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
102 
103         for (int i = 0; i < numThreads; i++) {
104             executorCompletionService.submit(() -> {
105                 for (int iteration = 0; iteration < iterations; iteration++) {
106                     try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), new Inflater(true))) {
107                         for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) {
108                             try {
109                                 IOUtils.skipFully(is, skips[skipIndex], baSupplier);
110                                 final int c = is.read();
111                                 assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration);
112                             } catch (final EOFException e) {
113                                 assertEquals(expected[skipIndex], is.read(), "failed on " + "seed=" + seed + " iteration=" + iteration);
114                             }
115                         }
116                     }
117                 }
118                 return 1;
119             });
120         }
121 
122         int finished = 0;
123         while (finished < numThreads) {
124             // blocking
125             final Future<Integer> future = executorCompletionService.take();
126             try {
127                 future.get();
128             } catch (final Exception e) {
129                 // printStackTrace() for simpler debugging
130                 e.printStackTrace();
131                 fail("failed on seed=" + seed);
132             }
133             finished++;
134         }
135     }
136 
137     @Test
testSkipFullyOnInflaterInputStream_New_bytes()138     public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception {
139         testSkipFullyOnInflaterInputStream(() -> new byte[4096]);
140     }
141 
142     @Test
testSkipFullyOnInflaterInputStream_ThreadLocal()143     public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception {
144         testSkipFullyOnInflaterInputStream(threadLocal::get);
145     }
146 
147 }
148