1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.commons.io; 19 20 import static org.junit.jupiter.api.Assertions.assertEquals; 21 import static org.junit.jupiter.api.Assertions.fail; 22 23 import java.io.ByteArrayInputStream; 24 import java.io.ByteArrayOutputStream; 25 import java.io.EOFException; 26 import java.io.IOException; 27 import java.io.InputStream; 28 import java.util.Random; 29 import java.util.concurrent.ExecutorCompletionService; 30 import java.util.concurrent.ExecutorService; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.Future; 33 import java.util.function.Supplier; 34 import java.util.zip.Inflater; 35 import java.util.zip.InflaterInputStream; 36 37 import org.junit.jupiter.api.BeforeEach; 38 import org.junit.jupiter.api.Test; 39 40 /** 41 * See Jira ticket IO-802. 42 */ 43 public class IOUtilsMultithreadedSkipTest { 44 45 private static final String FIXTURE = "TIKA-4065.bin"; 46 long seed = 1; 47 private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]); 48 generateExpected(final InputStream is, final int[] skips)49 private int[] generateExpected(final InputStream is, final int[] skips) throws IOException { 50 final int[] testBytes = new int[skips.length]; 51 for (int i = 0; i < skips.length; i++) { 52 try { 53 IOUtils.skipFully(is, skips[i]); 54 testBytes[i] = is.read(); 55 } catch (final EOFException e) { 56 testBytes[i] = -1; 57 } 58 } 59 return testBytes; 60 } 61 generateSkips(final byte[] bytes, final int numSkips, final Random random)62 private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) { 63 final int[] skips = new int[numSkips]; 64 for (int i = 0; i < skips.length; i++) { 65 skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10; 66 } 67 return skips; 68 } 69 inflate(final byte[] deflated)70 private InputStream inflate(final byte[] deflated) throws IOException { 71 final ByteArrayOutputStream bos = new ByteArrayOutputStream(); 72 IOUtils.copy(new InflaterInputStream(new ByteArrayInputStream(deflated), new Inflater(true)), bos); 73 return new ByteArrayInputStream(bos.toByteArray()); 74 } 75 76 @BeforeEach setUp()77 public void setUp() { 78 // Not the best random we can use but good enough here. 79 seed = new Random().nextLong(); 80 } 81 testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier)82 private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception { 83 final long thisSeed = seed; 84 // thisSeed = -727624427837034313l; 85 final Random random = new Random(thisSeed); 86 final byte[] bytes; 87 try (final InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) { 88 bytes = IOUtils.toByteArray(inputStream); 89 } 90 final int numSkips = random.nextInt(bytes.length) / 100 + 1; 91 92 final int[] skips = generateSkips(bytes, numSkips, random); 93 final int[] expected; 94 try (final InputStream inflate = inflate(bytes)) { 95 expected = generateExpected(inflate, skips); 96 } 97 98 final int numThreads = 2; 99 final int iterations = 100; 100 final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); 101 final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService); 102 103 for (int i = 0; i < numThreads; i++) { 104 executorCompletionService.submit(() -> { 105 for (int iteration = 0; iteration < iterations; iteration++) { 106 try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), new Inflater(true))) { 107 for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) { 108 try { 109 IOUtils.skipFully(is, skips[skipIndex], baSupplier); 110 final int c = is.read(); 111 assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration); 112 } catch (final EOFException e) { 113 assertEquals(expected[skipIndex], is.read(), "failed on " + "seed=" + seed + " iteration=" + iteration); 114 } 115 } 116 } 117 } 118 return 1; 119 }); 120 } 121 122 int finished = 0; 123 while (finished < numThreads) { 124 // blocking 125 final Future<Integer> future = executorCompletionService.take(); 126 try { 127 future.get(); 128 } catch (final Exception e) { 129 // printStackTrace() for simpler debugging 130 e.printStackTrace(); 131 fail("failed on seed=" + seed); 132 } 133 finished++; 134 } 135 } 136 137 @Test testSkipFullyOnInflaterInputStream_New_bytes()138 public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception { 139 testSkipFullyOnInflaterInputStream(() -> new byte[4096]); 140 } 141 142 @Test testSkipFullyOnInflaterInputStream_ThreadLocal()143 public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception { 144 testSkipFullyOnInflaterInputStream(threadLocal::get); 145 } 146 147 } 148