1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved. 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This code is free software; you can redistribute it and/or modify it 7 * under the terms of the GNU General Public License version 2 only, as 8 * published by the Free Software Foundation. Oracle designates this 9 * particular file as subject to the "Classpath" exception as provided 10 * by Oracle in the LICENSE file that accompanied this code. 11 * 12 * This code is distributed in the hope that it will be useful, but WITHOUT 13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 15 * version 2 for more details (a copy is included in the LICENSE file that 16 * accompanied this code). 17 * 18 * You should have received a copy of the GNU General Public License version 19 * 2 along with this work; if not, write to the Free Software Foundation, 20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 21 * 22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 23 * or visit www.oracle.com if you need additional information or have any 24 * questions. 25 */ 26 27 package java.io; 28 29 /* ----- BEGIN android ----- 30 Import required for IoUtils.throwInterruptedIoException. 31 ----- END android -----*/ 32 import libcore.io.IoUtils; 33 34 /** 35 * A piped input stream should be connected 36 * to a piped output stream; the piped input 37 * stream then provides whatever data bytes 38 * are written to the piped output stream. 39 * Typically, data is read from a <code>PipedInputStream</code> 40 * object by one thread and data is written 41 * to the corresponding <code>PipedOutputStream</code> 42 * by some other thread. Attempting to use 43 * both objects from a single thread is not 44 * recommended, as it may deadlock the thread. 45 * The piped input stream contains a buffer, 46 * decoupling read operations from write operations, 47 * within limits. 48 * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a 49 * thread that was providing data bytes to the connected 50 * piped output stream is no longer alive. 51 * 52 * @author James Gosling 53 * @see java.io.PipedOutputStream 54 * @since JDK1.0 55 */ 56 public class PipedInputStream extends InputStream { 57 boolean closedByWriter = false; 58 volatile boolean closedByReader = false; 59 boolean connected = false; 60 61 /* REMIND: identification of the read and write sides needs to be 62 more sophisticated. Either using thread groups (but what about 63 pipes within a thread?) or using finalization (but it may be a 64 long time until the next GC). */ 65 Thread readSide; 66 Thread writeSide; 67 68 private static final int DEFAULT_PIPE_SIZE = 1024; 69 70 /** 71 * The default size of the pipe's circular input buffer. 72 * @since JDK1.1 73 */ 74 // This used to be a constant before the pipe size was allowed 75 // to change. This field will continue to be maintained 76 // for backward compatibility. 77 protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; 78 79 /** 80 * The circular buffer into which incoming data is placed. 81 * @since JDK1.1 82 */ 83 protected byte buffer[]; 84 85 /** 86 * The index of the position in the circular buffer at which the 87 * next byte of data will be stored when received from the connected 88 * piped output stream. <code>in<0</code> implies the buffer is empty, 89 * <code>in==out</code> implies the buffer is full 90 * @since JDK1.1 91 */ 92 protected int in = -1; 93 94 /** 95 * The index of the position in the circular buffer at which the next 96 * byte of data will be read by this piped input stream. 97 * @since JDK1.1 98 */ 99 protected int out = 0; 100 101 /** 102 * Creates a <code>PipedInputStream</code> so 103 * that it is connected to the piped output 104 * stream <code>src</code>. Data bytes written 105 * to <code>src</code> will then be available 106 * as input from this stream. 107 * 108 * @param src the stream to connect to. 109 * @exception IOException if an I/O error occurs. 110 */ PipedInputStream(PipedOutputStream src)111 public PipedInputStream(PipedOutputStream src) throws IOException { 112 this(src, DEFAULT_PIPE_SIZE); 113 } 114 115 /** 116 * Creates a <code>PipedInputStream</code> so that it is 117 * connected to the piped output stream 118 * <code>src</code> and uses the specified pipe size for 119 * the pipe's buffer. 120 * Data bytes written to <code>src</code> will then 121 * be available as input from this stream. 122 * 123 * @param src the stream to connect to. 124 * @param pipeSize the size of the pipe's buffer. 125 * @exception IOException if an I/O error occurs. 126 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>. 127 * @since 1.6 128 */ PipedInputStream(PipedOutputStream src, int pipeSize)129 public PipedInputStream(PipedOutputStream src, int pipeSize) 130 throws IOException { 131 initPipe(pipeSize); 132 connect(src); 133 } 134 135 /** 136 * Creates a <code>PipedInputStream</code> so 137 * that it is not yet {@linkplain #connect(java.io.PipedOutputStream) 138 * connected}. 139 * It must be {@linkplain java.io.PipedOutputStream#connect( 140 * java.io.PipedInputStream) connected} to a 141 * <code>PipedOutputStream</code> before being used. 142 */ PipedInputStream()143 public PipedInputStream() { 144 initPipe(DEFAULT_PIPE_SIZE); 145 } 146 147 /** 148 * Creates a <code>PipedInputStream</code> so that it is not yet 149 * {@linkplain #connect(java.io.PipedOutputStream) connected} and 150 * uses the specified pipe size for the pipe's buffer. 151 * It must be {@linkplain java.io.PipedOutputStream#connect( 152 * java.io.PipedInputStream) 153 * connected} to a <code>PipedOutputStream</code> before being used. 154 * 155 * @param pipeSize the size of the pipe's buffer. 156 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>. 157 * @since 1.6 158 */ PipedInputStream(int pipeSize)159 public PipedInputStream(int pipeSize) { 160 initPipe(pipeSize); 161 } 162 initPipe(int pipeSize)163 private void initPipe(int pipeSize) { 164 if (pipeSize <= 0) { 165 throw new IllegalArgumentException("Pipe Size <= 0"); 166 } 167 buffer = new byte[pipeSize]; 168 } 169 170 /** 171 * Causes this piped input stream to be connected 172 * to the piped output stream <code>src</code>. 173 * If this object is already connected to some 174 * other piped output stream, an <code>IOException</code> 175 * is thrown. 176 * <p> 177 * If <code>src</code> is an 178 * unconnected piped output stream and <code>snk</code> 179 * is an unconnected piped input stream, they 180 * may be connected by either the call: 181 * <p> 182 * <pre><code>snk.connect(src)</code> </pre> 183 * <p> 184 * or the call: 185 * <p> 186 * <pre><code>src.connect(snk)</code> </pre> 187 * <p> 188 * The two 189 * calls have the same effect. 190 * 191 * @param src The piped output stream to connect to. 192 * @exception IOException if an I/O error occurs. 193 */ connect(PipedOutputStream src)194 public void connect(PipedOutputStream src) throws IOException { 195 src.connect(this); 196 } 197 198 /** 199 * Receives a byte of data. This method will block if no input is 200 * available. 201 * @param b the byte being received 202 * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>, 203 * {@link #connect(java.io.PipedOutputStream) unconnected}, 204 * closed, or if an I/O error occurs. 205 * @since JDK1.1 206 */ receive(int b)207 protected synchronized void receive(int b) throws IOException { 208 checkStateForReceive(); 209 writeSide = Thread.currentThread(); 210 if (in == out) 211 awaitSpace(); 212 if (in < 0) { 213 in = 0; 214 out = 0; 215 } 216 buffer[in++] = (byte)(b & 0xFF); 217 if (in >= buffer.length) { 218 in = 0; 219 } 220 } 221 222 /** 223 * Receives data into an array of bytes. This method will 224 * block until some input is available. 225 * @param b the buffer into which the data is received 226 * @param off the start offset of the data 227 * @param len the maximum number of bytes received 228 * @exception IOException If the pipe is <a href=#BROKEN> broken</a>, 229 * {@link #connect(java.io.PipedOutputStream) unconnected}, 230 * closed,or if an I/O error occurs. 231 */ receive(byte b[], int off, int len)232 synchronized void receive(byte b[], int off, int len) throws IOException { 233 checkStateForReceive(); 234 writeSide = Thread.currentThread(); 235 int bytesToTransfer = len; 236 while (bytesToTransfer > 0) { 237 if (in == out) 238 awaitSpace(); 239 int nextTransferAmount = 0; 240 if (out < in) { 241 nextTransferAmount = buffer.length - in; 242 } else if (in < out) { 243 if (in == -1) { 244 in = out = 0; 245 nextTransferAmount = buffer.length - in; 246 } else { 247 nextTransferAmount = out - in; 248 } 249 } 250 if (nextTransferAmount > bytesToTransfer) 251 nextTransferAmount = bytesToTransfer; 252 assert(nextTransferAmount > 0); 253 System.arraycopy(b, off, buffer, in, nextTransferAmount); 254 bytesToTransfer -= nextTransferAmount; 255 off += nextTransferAmount; 256 in += nextTransferAmount; 257 if (in >= buffer.length) { 258 in = 0; 259 } 260 } 261 } 262 checkStateForReceive()263 private void checkStateForReceive() throws IOException { 264 if (!connected) { 265 throw new IOException("Pipe not connected"); 266 } else if (closedByWriter || closedByReader) { 267 throw new IOException("Pipe closed"); 268 } else if (readSide != null && !readSide.isAlive()) { 269 throw new IOException("Read end dead"); 270 } 271 } 272 awaitSpace()273 private void awaitSpace() throws IOException { 274 while (in == out) { 275 checkStateForReceive(); 276 277 /* full: kick any waiting readers */ 278 notifyAll(); 279 try { 280 wait(1000); 281 } catch (InterruptedException ex) { 282 /* ----- BEGIN android ----- 283 // throw new java.io.InterruptedIOException(); 284 We need to re-set the interrupt status of the thread through 285 IoUtils.throwInterruptedIoException. 286 ----- END android ----- */ 287 IoUtils.throwInterruptedIoException(); 288 } 289 } 290 } 291 292 /** 293 * Notifies all waiting threads that the last byte of data has been 294 * received. 295 */ receivedLast()296 synchronized void receivedLast() { 297 closedByWriter = true; 298 notifyAll(); 299 } 300 301 /** 302 * Reads the next byte of data from this piped input stream. The 303 * value byte is returned as an <code>int</code> in the range 304 * <code>0</code> to <code>255</code>. 305 * This method blocks until input data is available, the end of the 306 * stream is detected, or an exception is thrown. 307 * 308 * @return the next byte of data, or <code>-1</code> if the end of the 309 * stream is reached. 310 * @exception IOException if the pipe is 311 * {@link #connect(java.io.PipedOutputStream) unconnected}, 312 * <a href=#BROKEN> <code>broken</code></a>, closed, 313 * or if an I/O error occurs. 314 */ read()315 public synchronized int read() throws IOException { 316 if (!connected) { 317 throw new IOException("Pipe not connected"); 318 } else if (closedByReader) { 319 throw new IOException("Pipe closed"); 320 } else if (writeSide != null && !writeSide.isAlive() 321 && !closedByWriter && (in < 0)) { 322 throw new IOException("Write end dead"); 323 } 324 325 readSide = Thread.currentThread(); 326 int trials = 2; 327 while (in < 0) { 328 if (closedByWriter) { 329 /* closed by writer, return EOF */ 330 return -1; 331 } 332 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { 333 throw new IOException("Pipe broken"); 334 } 335 /* might be a writer waiting */ 336 notifyAll(); 337 try { 338 wait(1000); 339 } catch (InterruptedException ex) { 340 /* ----- BEGIN android ----- 341 // throw new java.io.InterruptedIOException(); 342 We need to re-set the interrupt status of the thread through 343 IoUtils.throwInterruptedIoException. 344 ----- END android ----- */ 345 IoUtils.throwInterruptedIoException(); 346 } 347 } 348 int ret = buffer[out++] & 0xFF; 349 if (out >= buffer.length) { 350 out = 0; 351 } 352 if (in == out) { 353 /* now empty */ 354 in = -1; 355 } 356 357 return ret; 358 } 359 360 /** 361 * Reads up to <code>len</code> bytes of data from this piped input 362 * stream into an array of bytes. Less than <code>len</code> bytes 363 * will be read if the end of the data stream is reached or if 364 * <code>len</code> exceeds the pipe's buffer size. 365 * If <code>len </code> is zero, then no bytes are read and 0 is returned; 366 * otherwise, the method blocks until at least 1 byte of input is 367 * available, end of the stream has been detected, or an exception is 368 * thrown. 369 * 370 * @param b the buffer into which the data is read. 371 * @param off the start offset in the destination array <code>b</code> 372 * @param len the maximum number of bytes read. 373 * @return the total number of bytes read into the buffer, or 374 * <code>-1</code> if there is no more data because the end of 375 * the stream has been reached. 376 * @exception NullPointerException If <code>b</code> is <code>null</code>. 377 * @exception IndexOutOfBoundsException If <code>off</code> is negative, 378 * <code>len</code> is negative, or <code>len</code> is greater than 379 * <code>b.length - off</code> 380 * @exception IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>, 381 * {@link #connect(java.io.PipedOutputStream) unconnected}, 382 * closed, or if an I/O error occurs. 383 */ read(byte b[], int off, int len)384 public synchronized int read(byte b[], int off, int len) throws IOException { 385 if (b == null) { 386 throw new NullPointerException(); 387 } else if (off < 0 || len < 0 || len > b.length - off) { 388 throw new IndexOutOfBoundsException(); 389 } else if (len == 0) { 390 return 0; 391 } 392 393 /* possibly wait on the first character */ 394 int c = read(); 395 if (c < 0) { 396 return -1; 397 } 398 b[off] = (byte) c; 399 int rlen = 1; 400 while ((in >= 0) && (len > 1)) { 401 402 int available; 403 404 if (in > out) { 405 available = Math.min((buffer.length - out), (in - out)); 406 } else { 407 available = buffer.length - out; 408 } 409 410 // A byte is read beforehand outside the loop 411 if (available > (len - 1)) { 412 available = len - 1; 413 } 414 System.arraycopy(buffer, out, b, off + rlen, available); 415 out += available; 416 rlen += available; 417 len -= available; 418 419 if (out >= buffer.length) { 420 out = 0; 421 } 422 if (in == out) { 423 /* now empty */ 424 in = -1; 425 } 426 } 427 return rlen; 428 } 429 430 /** 431 * Returns the number of bytes that can be read from this input 432 * stream without blocking. 433 * 434 * @return the number of bytes that can be read from this input stream 435 * without blocking, or {@code 0} if this input stream has been 436 * closed by invoking its {@link #close()} method, or if the pipe 437 * is {@link #connect(java.io.PipedOutputStream) unconnected}, or 438 * <a href=#BROKEN> <code>broken</code></a>. 439 * 440 * @exception IOException if an I/O error occurs. 441 * @since JDK1.0.2 442 */ available()443 public synchronized int available() throws IOException { 444 if(in < 0) 445 return 0; 446 else if(in == out) 447 return buffer.length; 448 else if (in > out) 449 return in - out; 450 else 451 return in + buffer.length - out; 452 } 453 454 /** 455 * Closes this piped input stream and releases any system resources 456 * associated with the stream. 457 * 458 * @exception IOException if an I/O error occurs. 459 */ close()460 public void close() throws IOException { 461 closedByReader = true; 462 synchronized (this) { 463 in = -1; 464 } 465 } 466 } 467