1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io.input; 18 19 import static org.apache.commons.io.IOUtils.EOF; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.PipedInputStream; 24 import java.io.PipedOutputStream; 25 import java.time.Duration; 26 import java.util.Objects; 27 import java.util.concurrent.BlockingQueue; 28 import java.util.concurrent.LinkedBlockingQueue; 29 import java.util.concurrent.TimeUnit; 30 31 import org.apache.commons.io.build.AbstractStreamBuilder; 32 import org.apache.commons.io.output.QueueOutputStream; 33 34 /** 35 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. 36 * <p> 37 * To build an instance, see {@link Builder}. 38 * </p> 39 * <p> 40 * Example usage: 41 * </p> 42 * 43 * <pre> 44 * QueueInputStream inputStream = new QueueInputStream(); 45 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 46 * 47 * outputStream.write("hello world".getBytes(UTF_8)); 48 * inputStream.read(); 49 * </pre> 50 * <p> 51 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 52 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 53 * </p> 54 * <p> 55 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 56 * {@link IOException}. 57 * </p> 58 * 59 * @see QueueOutputStream 60 * @since 2.9.0 61 */ 62 public class QueueInputStream extends InputStream { 63 64 /** 65 * Builds a new {@link QueueInputStream} instance. 66 * <p> 67 * For example: 68 * </p> 69 * 70 * <pre>{@code 71 * QueueInputStream s = QueueInputStream.builder() 72 * .setBlockingQueue(new LinkedBlockingQueue<>()) 73 * .setTimeout(Duration.ZERO) 74 * .get();} 75 * </pre> 76 * 77 * @since 2.12.0 78 */ 79 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 80 81 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 82 private Duration timeout = Duration.ZERO; 83 84 /** 85 * Constructs a new instance. 86 * <p> 87 * This builder use the aspects BlockingQueue and timeout. 88 * </p> 89 * 90 * @return a new instance. 91 */ 92 @Override get()93 public QueueInputStream get() { 94 return new QueueInputStream(blockingQueue, timeout); 95 } 96 97 /** 98 * Sets backing queue for the stream. 99 * 100 * @param blockingQueue backing queue for the stream. 101 * @return this 102 */ setBlockingQueue(final BlockingQueue<Integer> blockingQueue)103 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 104 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 105 return this; 106 } 107 108 /** 109 * Sets the polling timeout. 110 * 111 * @param timeout the polling timeout. 112 * @return this. 113 */ setTimeout(final Duration timeout)114 public Builder setTimeout(final Duration timeout) { 115 if (timeout != null && timeout.toNanos() < 0) { 116 throw new IllegalArgumentException("timeout must not be negative"); 117 } 118 this.timeout = timeout != null ? timeout : Duration.ZERO; 119 return this; 120 } 121 122 } 123 124 /** 125 * Constructs a new {@link Builder}. 126 * 127 * @return a new {@link Builder}. 128 * @since 2.12.0 129 */ builder()130 public static Builder builder() { 131 return new Builder(); 132 } 133 134 private final BlockingQueue<Integer> blockingQueue; 135 136 private final long timeoutNanos; 137 138 /** 139 * Constructs a new instance with no limit to its internal queue size and zero timeout. 140 */ QueueInputStream()141 public QueueInputStream() { 142 this(new LinkedBlockingQueue<>()); 143 } 144 145 /** 146 * Constructs a new instance with given queue and zero timeout. 147 * 148 * @param blockingQueue backing queue for the stream. 149 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 150 */ 151 @Deprecated QueueInputStream(final BlockingQueue<Integer> blockingQueue)152 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 153 this(blockingQueue, Duration.ZERO); 154 } 155 156 /** 157 * Constructs a new instance with given queue and timeout. 158 * 159 * @param blockingQueue backing queue for the stream. 160 * @param timeout how long to wait before giving up when polling the queue. 161 */ QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout)162 private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) { 163 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 164 this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos(); 165 } 166 167 /** 168 * Gets the blocking queue. 169 * 170 * @return the blocking queue. 171 */ getBlockingQueue()172 BlockingQueue<Integer> getBlockingQueue() { 173 return blockingQueue; 174 } 175 176 /** 177 * Gets the timeout duration. 178 * 179 * @return the timeout duration. 180 */ getTimeout()181 Duration getTimeout() { 182 return Duration.ofNanos(timeoutNanos); 183 } 184 185 /** 186 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 187 * 188 * @return QueueOutputStream connected to this stream. 189 */ newQueueOutputStream()190 public QueueOutputStream newQueueOutputStream() { 191 return new QueueOutputStream(blockingQueue); 192 } 193 194 /** 195 * Reads and returns a single byte. 196 * 197 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 198 * @throws IllegalStateException if thread is interrupted while waiting. 199 */ 200 @Override read()201 public int read() { 202 try { 203 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 204 return value == null ? EOF : 0xFF & value; 205 } catch (final InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 // throw runtime unchecked exception to maintain signature backward-compatibility of 208 // this read method, which does not declare IOException 209 throw new IllegalStateException(e); 210 } 211 } 212 213 } 214