1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.io.output; 18 19 import java.io.IOException; 20 import java.io.InterruptedIOException; 21 import java.io.OutputStream; 22 import java.io.PipedInputStream; 23 import java.io.PipedOutputStream; 24 import java.util.Objects; 25 import java.util.concurrent.BlockingQueue; 26 import java.util.concurrent.LinkedBlockingQueue; 27 28 import org.apache.commons.io.input.QueueInputStream; 29 30 /** 31 * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's written in queue 32 * output stream. 33 * <p> 34 * Example usage: 35 * </p> 36 * 37 * <pre> 38 * QueueOutputStream outputStream = new QueueOutputStream(); 39 * QueueInputStream inputStream = outputStream.newPipeInputStream(); 40 * 41 * outputStream.write("hello world".getBytes(UTF_8)); 42 * inputStream.read(); 43 * </pre> 44 * 45 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a 46 * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current 47 * thread. Instances can be used longer after initial threads exited. 48 * <p> 49 * Closing a {@link QueueOutputStream} has no effect. The methods in this class can be called after the stream has been 50 * closed without generating an {@link IOException}. 51 * </p> 52 * 53 * @see QueueInputStream 54 * @since 2.9.0 55 */ 56 public class QueueOutputStream extends OutputStream { 57 58 private final BlockingQueue<Integer> blockingQueue; 59 60 /** 61 * Constructs a new instance with no limit to internal buffer size. 62 */ QueueOutputStream()63 public QueueOutputStream() { 64 this(new LinkedBlockingQueue<>()); 65 } 66 67 /** 68 * Constructs a new instance with given buffer. 69 * 70 * @param blockingQueue backing queue for the stream 71 */ QueueOutputStream(final BlockingQueue<Integer> blockingQueue)72 public QueueOutputStream(final BlockingQueue<Integer> blockingQueue) { 73 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 74 } 75 76 /** 77 * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the 78 * input stream. 79 * 80 * @return QueueInputStream connected to this stream 81 */ newQueueInputStream()82 public QueueInputStream newQueueInputStream() { 83 return new QueueInputStream(blockingQueue); 84 } 85 86 /** 87 * Writes a single byte. 88 * 89 * @throws InterruptedIOException if the thread is interrupted while writing to the queue. 90 */ 91 @Override write(final int b)92 public void write(final int b) throws InterruptedIOException { 93 try { 94 blockingQueue.put(0xFF & b); 95 } catch (final InterruptedException e) { 96 Thread.currentThread().interrupt(); 97 final InterruptedIOException interruptedIoException = new InterruptedIOException(); 98 interruptedIoException.initCause(e); 99 throw interruptedIoException; 100 } 101 } 102 } 103