/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests {@code IOUtils.skipFully(InputStream, long, Supplier)} when it is called concurrently from several threads,
 * each on its own {@link InflaterInputStream}, and checks the bytes read after each skip against values computed on a
 * single thread.
 * <p>
 * See Jira ticket IO-802.
 * </p>
 */
public class IOUtilsMultithreadedSkipTest {

    /** Class-path resource holding the deflated test data. */
    private static final String FIXTURE = "TIKA-4065.bin";

    /** Seed for the current test; replaced with a fresh random value before each test and reported on failure. */
    long seed = 1;

    /** Hands each thread its own reusable 4096-byte skip buffer. */
    private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]);

    /**
     * Computes the reference result: for each entry of {@code skips}, skips that many bytes of {@code is} and records
     * the next byte read.
     *
     * @param is the stream to skip through and read from.
     * @param skips the number of bytes to skip before each read.
     * @return one value per skip: the byte read after the skip, or {@code -1} when an {@link EOFException} was thrown.
     * @throws IOException if an I/O error other than {@link EOFException} occurs.
     */
    private int[] generateExpected(final InputStream is, final int[] skips) throws IOException {
        final int[] testBytes = new int[skips.length];
        for (int i = 0; i < skips.length; i++) {
            try {
                IOUtils.skipFully(is, skips[i]);
                testBytes[i] = is.read();
            } catch (final EOFException e) {
                testBytes[i] = -1;
            }
        }
        return testBytes;
    }

    /**
     * Generates {@code numSkips} random skip lengths, each {@code random.nextInt(bytes.length / numSkips)} plus
     * {@code bytes.length / 10}.
     *
     * @param bytes the data whose length scales the skip lengths.
     * @param numSkips the number of skip lengths to generate.
     * @param random the source of randomness.
     * @return the generated skip lengths.
     */
    private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) {
        final int[] skips = new int[numSkips];
        for (int i = 0; i < skips.length; i++) {
            skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10;
        }
        return skips;
    }

    /**
     * Inflates the given data completely, using an {@link Inflater} in {@code nowrap} mode, and returns the result as
     * an in-memory stream.
     *
     * @param deflated the deflated input.
     * @return a stream over the inflated bytes.
     * @throws IOException if inflating fails.
     */
    private InputStream inflate(final byte[] deflated) throws IOException {
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        final Inflater inflater = new Inflater(true);
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(deflated), inflater)) {
            IOUtils.copy(in, bos);
        } finally {
            // Closing the stream does not end an Inflater that was passed in explicitly.
            inflater.end();
        }
        return new ByteArrayInputStream(bos.toByteArray());
    }

    /**
     * Picks a new random seed for each test.
     */
    @BeforeEach
    public void setUp() {
        // Not the best random we can use but good enough here.
        seed = new Random().nextLong();
    }

    /**
     * Runs the concurrent skip scenario: computes the expected bytes on the calling thread, then lets two worker
     * threads each repeat the same skips 100 times on fresh {@link InflaterInputStream}s and compare.
     *
     * @param baSupplier supplies the skip buffer passed to {@code IOUtils.skipFully}.
     * @throws Exception if the fixture cannot be read or the calling thread is interrupted while waiting for workers.
     */
    private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception {
        // To reproduce a reported failure, seed the Random below with the reported value,
        // for example -727624427837034313L.
        final long thisSeed = seed;
        final Random random = new Random(thisSeed);
        final byte[] bytes;
        try (InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) {
            bytes = IOUtils.toByteArray(inputStream);
        }
        final int numSkips = random.nextInt(bytes.length) / 100 + 1;
        final int[] skips = generateSkips(bytes, numSkips, random);
        final int[] expected;
        try (InputStream inflate = inflate(bytes)) {
            expected = generateExpected(inflate, skips);
        }

        final int numThreads = 2;
        final int iterations = 100;
        final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
            for (int i = 0; i < numThreads; i++) {
                executorCompletionService.submit(() -> {
                    for (int iteration = 0; iteration < iterations; iteration++) {
                        verifySkips(bytes, skips, expected, baSupplier, iteration);
                    }
                    return 1;
                });
            }
            for (int finished = 0; finished < numThreads; finished++) {
                // take() blocks until the next worker has completed.
                final Future<Integer> future = executorCompletionService.take();
                try {
                    future.get();
                } catch (final Exception e) {
                    // Attach the worker's failure as the cause so its stack trace shows up in the test report.
                    fail("failed on seed=" + seed, e);
                }
            }
        } finally {
            // The pool's threads are not daemon threads; without this they outlive the test.
            executorService.shutdownNow();
        }
    }

    /**
     * Performs one pass over all skips on a fresh {@link InflaterInputStream} and asserts that the byte read after
     * each skip matches the expected one.
     *
     * @param bytes the deflated input.
     * @param skips the number of bytes to skip before each read.
     * @param expected the expected byte after each skip, as produced by {@link #generateExpected(InputStream, int[])}.
     * @param baSupplier supplies the skip buffer passed to {@code IOUtils.skipFully}.
     * @param iteration the current iteration, used in failure messages only.
     * @throws IOException if an I/O error other than {@link EOFException} occurs.
     */
    private void verifySkips(final byte[] bytes, final int[] skips, final int[] expected, final Supplier<byte[]> baSupplier,
            final int iteration) throws IOException {
        final Inflater inflater = new Inflater(true);
        try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), inflater)) {
            for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) {
                try {
                    IOUtils.skipFully(is, skips[skipIndex], baSupplier);
                    final int c = is.read();
                    assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration);
                } catch (final EOFException e) {
                    // The reference run recorded -1 for skips that hit the end of the stream.
                    assertEquals(expected[skipIndex], is.read(), "failed on seed=" + seed + " iteration=" + iteration);
                }
            }
        } finally {
            // Closing the stream does not end an Inflater that was passed in explicitly.
            inflater.end();
        }
    }

    /**
     * Runs the scenario with a supplier that allocates a new 4096-byte buffer on every call.
     *
     * @throws Exception if the scenario cannot be run.
     */
    @Test
    public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception {
        testSkipFullyOnInflaterInputStream(() -> new byte[4096]);
    }

    /**
     * Runs the scenario with a supplier backed by a {@link ThreadLocal} buffer.
     *
     * @throws Exception if the scenario cannot be run.
     */
    @Test
    public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception {
        testSkipFullyOnInflaterInputStream(threadLocal::get);
    }
}