• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved.
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This code is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License version 2 only, as
8  * published by the Free Software Foundation.  Oracle designates this
9  * particular file as subject to the "Classpath" exception as provided
10  * by Oracle in the LICENSE file that accompanied this code.
11  *
12  * This code is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15  * version 2 for more details (a copy is included in the LICENSE file that
16  * accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License version
19  * 2 along with this work; if not, write to the Free Software Foundation,
20  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21  *
22  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
23  * or visit www.oracle.com if you need additional information or have any
24  * questions.
25  */
26 
27 package java.io;
28 
29 /* ----- BEGIN android -----
30 Import required for IoUtils.throwInterruptedIoException.
31 ----- END android -----*/
32 import libcore.io.IoUtils;
33 
34 /**
35  * A piped input stream should be connected
36  * to a piped output stream; the piped  input
37  * stream then provides whatever data bytes
38  * are written to the piped output  stream.
39  * Typically, data is read from a <code>PipedInputStream</code>
40  * object by one thread  and data is written
41  * to the corresponding <code>PipedOutputStream</code>
42  * by some  other thread. Attempting to use
43  * both objects from a single thread is not
44  * recommended, as it may deadlock the thread.
45  * The piped input stream contains a buffer,
46  * decoupling read operations from write operations,
47  * within limits.
48  * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
49  * thread that was providing data bytes to the connected
50  * piped output stream is no longer alive.
51  *
52  * @author  James Gosling
53  * @see     java.io.PipedOutputStream
54  * @since   JDK1.0
55  */
56 public class PipedInputStream extends InputStream {
57     boolean closedByWriter = false;
58     volatile boolean closedByReader = false;
59     boolean connected = false;
60 
61         /* REMIND: identification of the read and write sides needs to be
62            more sophisticated.  Either using thread groups (but what about
63            pipes within a thread?) or using finalization (but it may be a
64            long time until the next GC). */
65     Thread readSide;
66     Thread writeSide;
67 
68     private static final int DEFAULT_PIPE_SIZE = 1024;
69 
70     /**
71      * The default size of the pipe's circular input buffer.
72      * @since   JDK1.1
73      */
74     // This used to be a constant before the pipe size was allowed
75     // to change. This field will continue to be maintained
76     // for backward compatibility.
77     protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
78 
79     /**
80      * The circular buffer into which incoming data is placed.
81      * @since   JDK1.1
82      */
83     protected byte buffer[];
84 
85     /**
86      * The index of the position in the circular buffer at which the
87      * next byte of data will be stored when received from the connected
88      * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
89      * <code>in==out</code> implies the buffer is full
90      * @since   JDK1.1
91      */
92     protected int in = -1;
93 
94     /**
95      * The index of the position in the circular buffer at which the next
96      * byte of data will be read by this piped input stream.
97      * @since   JDK1.1
98      */
99     protected int out = 0;
100 
101     /**
102      * Creates a <code>PipedInputStream</code> so
103      * that it is connected to the piped output
104      * stream <code>src</code>. Data bytes written
105      * to <code>src</code> will then be  available
106      * as input from this stream.
107      *
108      * @param      src   the stream to connect to.
109      * @exception  IOException  if an I/O error occurs.
110      */
PipedInputStream(PipedOutputStream src)111     public PipedInputStream(PipedOutputStream src) throws IOException {
112         this(src, DEFAULT_PIPE_SIZE);
113     }
114 
115     /**
116      * Creates a <code>PipedInputStream</code> so that it is
117      * connected to the piped output stream
118      * <code>src</code> and uses the specified pipe size for
119      * the pipe's buffer.
120      * Data bytes written to <code>src</code> will then
121      * be available as input from this stream.
122      *
123      * @param      src   the stream to connect to.
124      * @param      pipeSize the size of the pipe's buffer.
125      * @exception  IOException  if an I/O error occurs.
126      * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
127      * @since      1.6
128      */
PipedInputStream(PipedOutputStream src, int pipeSize)129     public PipedInputStream(PipedOutputStream src, int pipeSize)
130             throws IOException {
131          initPipe(pipeSize);
132          connect(src);
133     }
134 
135     /**
136      * Creates a <code>PipedInputStream</code> so
137      * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
138      * connected}.
139      * It must be {@linkplain java.io.PipedOutputStream#connect(
140      * java.io.PipedInputStream) connected} to a
141      * <code>PipedOutputStream</code> before being used.
142      */
PipedInputStream()143     public PipedInputStream() {
144         initPipe(DEFAULT_PIPE_SIZE);
145     }
146 
147     /**
148      * Creates a <code>PipedInputStream</code> so that it is not yet
149      * {@linkplain #connect(java.io.PipedOutputStream) connected} and
150      * uses the specified pipe size for the pipe's buffer.
151      * It must be {@linkplain java.io.PipedOutputStream#connect(
152      * java.io.PipedInputStream)
153      * connected} to a <code>PipedOutputStream</code> before being used.
154      *
155      * @param      pipeSize the size of the pipe's buffer.
156      * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
157      * @since      1.6
158      */
PipedInputStream(int pipeSize)159     public PipedInputStream(int pipeSize) {
160         initPipe(pipeSize);
161     }
162 
initPipe(int pipeSize)163     private void initPipe(int pipeSize) {
164          if (pipeSize <= 0) {
165             throw new IllegalArgumentException("Pipe Size <= 0");
166          }
167          buffer = new byte[pipeSize];
168     }
169 
170     /**
171      * Causes this piped input stream to be connected
172      * to the piped  output stream <code>src</code>.
173      * If this object is already connected to some
174      * other piped output  stream, an <code>IOException</code>
175      * is thrown.
176      * <p>
177      * If <code>src</code> is an
178      * unconnected piped output stream and <code>snk</code>
179      * is an unconnected piped input stream, they
180      * may be connected by either the call:
181      * <p>
182      * <pre><code>snk.connect(src)</code> </pre>
183      * <p>
184      * or the call:
185      * <p>
186      * <pre><code>src.connect(snk)</code> </pre>
187      * <p>
188      * The two
189      * calls have the same effect.
190      *
191      * @param      src   The piped output stream to connect to.
192      * @exception  IOException  if an I/O error occurs.
193      */
connect(PipedOutputStream src)194     public void connect(PipedOutputStream src) throws IOException {
195         src.connect(this);
196     }
197 
198     /**
199      * Receives a byte of data.  This method will block if no input is
200      * available.
201      * @param b the byte being received
202      * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
203      *          {@link #connect(java.io.PipedOutputStream) unconnected},
204      *          closed, or if an I/O error occurs.
205      * @since     JDK1.1
206      */
receive(int b)207     protected synchronized void receive(int b) throws IOException {
208         checkStateForReceive();
209         writeSide = Thread.currentThread();
210         if (in == out)
211             awaitSpace();
212         if (in < 0) {
213             in = 0;
214             out = 0;
215         }
216         buffer[in++] = (byte)(b & 0xFF);
217         if (in >= buffer.length) {
218             in = 0;
219         }
220     }
221 
222     /**
223      * Receives data into an array of bytes.  This method will
224      * block until some input is available.
225      * @param b the buffer into which the data is received
226      * @param off the start offset of the data
227      * @param len the maximum number of bytes received
228      * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
229      *           {@link #connect(java.io.PipedOutputStream) unconnected},
230      *           closed,or if an I/O error occurs.
231      */
receive(byte b[], int off, int len)232     synchronized void receive(byte b[], int off, int len)  throws IOException {
233         checkStateForReceive();
234         writeSide = Thread.currentThread();
235         int bytesToTransfer = len;
236         while (bytesToTransfer > 0) {
237             if (in == out)
238                 awaitSpace();
239             int nextTransferAmount = 0;
240             if (out < in) {
241                 nextTransferAmount = buffer.length - in;
242             } else if (in < out) {
243                 if (in == -1) {
244                     in = out = 0;
245                     nextTransferAmount = buffer.length - in;
246                 } else {
247                     nextTransferAmount = out - in;
248                 }
249             }
250             if (nextTransferAmount > bytesToTransfer)
251                 nextTransferAmount = bytesToTransfer;
252             assert(nextTransferAmount > 0);
253             System.arraycopy(b, off, buffer, in, nextTransferAmount);
254             bytesToTransfer -= nextTransferAmount;
255             off += nextTransferAmount;
256             in += nextTransferAmount;
257             if (in >= buffer.length) {
258                 in = 0;
259             }
260         }
261     }
262 
checkStateForReceive()263     private void checkStateForReceive() throws IOException {
264         if (!connected) {
265             throw new IOException("Pipe not connected");
266         } else if (closedByWriter || closedByReader) {
267             throw new IOException("Pipe closed");
268         } else if (readSide != null && !readSide.isAlive()) {
269             throw new IOException("Read end dead");
270         }
271     }
272 
awaitSpace()273     private void awaitSpace() throws IOException {
274         while (in == out) {
275             checkStateForReceive();
276 
277             /* full: kick any waiting readers */
278             notifyAll();
279             try {
280                 wait(1000);
281             } catch (InterruptedException ex) {
282                 /* ----- BEGIN android -----
283                 // throw new java.io.InterruptedIOException();
284                 We need to re-set the interrupt status of the thread through
285                 IoUtils.throwInterruptedIoException.
286                 ----- END android ----- */
287                 IoUtils.throwInterruptedIoException();
288             }
289         }
290     }
291 
292     /**
293      * Notifies all waiting threads that the last byte of data has been
294      * received.
295      */
receivedLast()296     synchronized void receivedLast() {
297         closedByWriter = true;
298         notifyAll();
299     }
300 
301     /**
302      * Reads the next byte of data from this piped input stream. The
303      * value byte is returned as an <code>int</code> in the range
304      * <code>0</code> to <code>255</code>.
305      * This method blocks until input data is available, the end of the
306      * stream is detected, or an exception is thrown.
307      *
308      * @return     the next byte of data, or <code>-1</code> if the end of the
309      *             stream is reached.
310      * @exception  IOException  if the pipe is
311      *           {@link #connect(java.io.PipedOutputStream) unconnected},
312      *           <a href=#BROKEN> <code>broken</code></a>, closed,
313      *           or if an I/O error occurs.
314      */
read()315     public synchronized int read()  throws IOException {
316         if (!connected) {
317             throw new IOException("Pipe not connected");
318         } else if (closedByReader) {
319             throw new IOException("Pipe closed");
320         } else if (writeSide != null && !writeSide.isAlive()
321                    && !closedByWriter && (in < 0)) {
322             throw new IOException("Write end dead");
323         }
324 
325         readSide = Thread.currentThread();
326         int trials = 2;
327         while (in < 0) {
328             if (closedByWriter) {
329                 /* closed by writer, return EOF */
330                 return -1;
331             }
332             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
333                 throw new IOException("Pipe broken");
334             }
335             /* might be a writer waiting */
336             notifyAll();
337             try {
338                 wait(1000);
339             } catch (InterruptedException ex) {
340                 /* ----- BEGIN android -----
341                 // throw new java.io.InterruptedIOException();
342                 We need to re-set the interrupt status of the thread through
343                 IoUtils.throwInterruptedIoException.
344                 ----- END android ----- */
345                 IoUtils.throwInterruptedIoException();
346             }
347         }
348         int ret = buffer[out++] & 0xFF;
349         if (out >= buffer.length) {
350             out = 0;
351         }
352         if (in == out) {
353             /* now empty */
354             in = -1;
355         }
356 
357         return ret;
358     }
359 
360     /**
361      * Reads up to <code>len</code> bytes of data from this piped input
362      * stream into an array of bytes. Less than <code>len</code> bytes
363      * will be read if the end of the data stream is reached or if
364      * <code>len</code> exceeds the pipe's buffer size.
365      * If <code>len </code> is zero, then no bytes are read and 0 is returned;
366      * otherwise, the method blocks until at least 1 byte of input is
367      * available, end of the stream has been detected, or an exception is
368      * thrown.
369      *
370      * @param      b     the buffer into which the data is read.
371      * @param      off   the start offset in the destination array <code>b</code>
372      * @param      len   the maximum number of bytes read.
373      * @return     the total number of bytes read into the buffer, or
374      *             <code>-1</code> if there is no more data because the end of
375      *             the stream has been reached.
376      * @exception  NullPointerException If <code>b</code> is <code>null</code>.
377      * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
378      * <code>len</code> is negative, or <code>len</code> is greater than
379      * <code>b.length - off</code>
380      * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
381      *           {@link #connect(java.io.PipedOutputStream) unconnected},
382      *           closed, or if an I/O error occurs.
383      */
read(byte b[], int off, int len)384     public synchronized int read(byte b[], int off, int len)  throws IOException {
385         if (b == null) {
386             throw new NullPointerException();
387         } else if (off < 0 || len < 0 || len > b.length - off) {
388             throw new IndexOutOfBoundsException();
389         } else if (len == 0) {
390             return 0;
391         }
392 
393         /* possibly wait on the first character */
394         int c = read();
395         if (c < 0) {
396             return -1;
397         }
398         b[off] = (byte) c;
399         int rlen = 1;
400         while ((in >= 0) && (len > 1)) {
401 
402             int available;
403 
404             if (in > out) {
405                 available = Math.min((buffer.length - out), (in - out));
406             } else {
407                 available = buffer.length - out;
408             }
409 
410             // A byte is read beforehand outside the loop
411             if (available > (len - 1)) {
412                 available = len - 1;
413             }
414             System.arraycopy(buffer, out, b, off + rlen, available);
415             out += available;
416             rlen += available;
417             len -= available;
418 
419             if (out >= buffer.length) {
420                 out = 0;
421             }
422             if (in == out) {
423                 /* now empty */
424                 in = -1;
425             }
426         }
427         return rlen;
428     }
429 
430     /**
431      * Returns the number of bytes that can be read from this input
432      * stream without blocking.
433      *
434      * @return the number of bytes that can be read from this input stream
435      *         without blocking, or {@code 0} if this input stream has been
436      *         closed by invoking its {@link #close()} method, or if the pipe
437      *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
438      *          <a href=#BROKEN> <code>broken</code></a>.
439      *
440      * @exception  IOException  if an I/O error occurs.
441      * @since   JDK1.0.2
442      */
available()443     public synchronized int available() throws IOException {
444         if(in < 0)
445             return 0;
446         else if(in == out)
447             return buffer.length;
448         else if (in > out)
449             return in - out;
450         else
451             return in + buffer.length - out;
452     }
453 
454     /**
455      * Closes this piped input stream and releases any system resources
456      * associated with the stream.
457      *
458      * @exception  IOException  if an I/O error occurs.
459      */
close()460     public void close()  throws IOException {
461         closedByReader = true;
462         synchronized (this) {
463             in = -1;
464         }
465     }
466 }
467