• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.commons.io.input;
18 
19 import static org.apache.commons.io.IOUtils.EOF;
20 
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.PipedInputStream;
24 import java.io.PipedOutputStream;
25 import java.time.Duration;
26 import java.util.Objects;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.TimeUnit;
30 
31 import org.apache.commons.io.build.AbstractStreamBuilder;
32 import org.apache.commons.io.output.QueueOutputStream;
33 
34 /**
35  * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
36  * <p>
37  * To build an instance, see {@link Builder}.
38  * </p>
39  * <p>
40  * Example usage:
41  * </p>
42  *
43  * <pre>
44  * QueueInputStream inputStream = new QueueInputStream();
45  * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
46  *
47  * outputStream.write("hello world".getBytes(UTF_8));
48  * inputStream.read();
49  * </pre>
50  * <p>
51  * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
52  * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
53  * </p>
54  * <p>
55  * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
56  * {@link IOException}.
57  * </p>
58  *
59  * @see QueueOutputStream
60  * @since 2.9.0
61  */
62 public class QueueInputStream extends InputStream {
63 
64     /**
65      * Builds a new {@link QueueInputStream} instance.
66      * <p>
67      * For example:
68      * </p>
69      *
70      * <pre>{@code
71      * QueueInputStream s = QueueInputStream.builder()
72      *   .setBlockingQueue(new LinkedBlockingQueue<>())
73      *   .setTimeout(Duration.ZERO)
74      *   .get();}
75      * </pre>
76      *
77      * @since 2.12.0
78      */
79     public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
80 
81         private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
82         private Duration timeout = Duration.ZERO;
83 
84         /**
85          * Constructs a new instance.
86          * <p>
87          * This builder use the aspects BlockingQueue and timeout.
88          * </p>
89          *
90          * @return a new instance.
91          */
92         @Override
get()93         public QueueInputStream get() {
94             return new QueueInputStream(blockingQueue, timeout);
95         }
96 
97         /**
98          * Sets backing queue for the stream.
99          *
100          * @param blockingQueue backing queue for the stream.
101          * @return this
102          */
setBlockingQueue(final BlockingQueue<Integer> blockingQueue)103         public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
104             this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
105             return this;
106         }
107 
108         /**
109          * Sets the polling timeout.
110          *
111          * @param timeout the polling timeout.
112          * @return this.
113          */
setTimeout(final Duration timeout)114         public Builder setTimeout(final Duration timeout) {
115             if (timeout != null && timeout.toNanos() < 0) {
116                 throw new IllegalArgumentException("timeout must not be negative");
117             }
118             this.timeout = timeout != null ? timeout : Duration.ZERO;
119             return this;
120         }
121 
122     }
123 
124     /**
125      * Constructs a new {@link Builder}.
126      *
127      * @return a new {@link Builder}.
128      * @since 2.12.0
129      */
builder()130     public static Builder builder() {
131         return new Builder();
132     }
133 
134     private final BlockingQueue<Integer> blockingQueue;
135 
136     private final long timeoutNanos;
137 
138     /**
139      * Constructs a new instance with no limit to its internal queue size and zero timeout.
140      */
QueueInputStream()141     public QueueInputStream() {
142         this(new LinkedBlockingQueue<>());
143     }
144 
145     /**
146      * Constructs a new instance with given queue and zero timeout.
147      *
148      * @param blockingQueue backing queue for the stream.
149      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
150      */
151     @Deprecated
QueueInputStream(final BlockingQueue<Integer> blockingQueue)152     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
153         this(blockingQueue, Duration.ZERO);
154     }
155 
156     /**
157      * Constructs a new instance with given queue and timeout.
158      *
159      * @param blockingQueue backing queue for the stream.
160      * @param timeout       how long to wait before giving up when polling the queue.
161      */
QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout)162     private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) {
163         this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
164         this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos();
165     }
166 
167     /**
168      * Gets the blocking queue.
169      *
170      * @return the blocking queue.
171      */
getBlockingQueue()172     BlockingQueue<Integer> getBlockingQueue() {
173         return blockingQueue;
174     }
175 
176     /**
177      * Gets the timeout duration.
178      *
179      * @return the timeout duration.
180      */
getTimeout()181     Duration getTimeout() {
182         return Duration.ofNanos(timeoutNanos);
183     }
184 
185     /**
186      * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
187      *
188      * @return QueueOutputStream connected to this stream.
189      */
newQueueOutputStream()190     public QueueOutputStream newQueueOutputStream() {
191         return new QueueOutputStream(blockingQueue);
192     }
193 
194     /**
195      * Reads and returns a single byte.
196      *
197      * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
198      * @throws IllegalStateException if thread is interrupted while waiting.
199      */
200     @Override
read()201     public int read() {
202         try {
203             final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
204             return value == null ? EOF : 0xFF & value;
205         } catch (final InterruptedException e) {
206             Thread.currentThread().interrupt();
207             // throw runtime unchecked exception to maintain signature backward-compatibility of
208             // this read method, which does not declare IOException
209             throw new IllegalStateException(e);
210         }
211     }
212 
213 }
214