/*
 * Copyright (C) 2014 The Android Open Source Project
 * Copyright (c) 1995, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
26 
27 package java.io;
28 
29 import libcore.io.IoUtils;
30 
31 /**
32  * A piped input stream should be connected
33  * to a piped output stream; the piped  input
34  * stream then provides whatever data bytes
35  * are written to the piped output  stream.
36  * Typically, data is read from a <code>PipedInputStream</code>
37  * object by one thread  and data is written
38  * to the corresponding <code>PipedOutputStream</code>
39  * by some  other thread. Attempting to use
40  * both objects from a single thread is not
41  * recommended, as it may deadlock the thread.
42  * The piped input stream contains a buffer,
43  * decoupling read operations from write operations,
44  * within limits.
45  * A pipe is said to be <a name="BROKEN"> <i>broken</i> </a> if a
46  * thread that was providing data bytes to the connected
47  * piped output stream is no longer alive.
48  *
49  * @author  James Gosling
50  * @see     java.io.PipedOutputStream
51  * @since   JDK1.0
52  */
53 public class PipedInputStream extends InputStream {
54     boolean closedByWriter = false;
55     volatile boolean closedByReader = false;
56     boolean connected = false;
57 
58         /* REMIND: identification of the read and write sides needs to be
59            more sophisticated.  Either using thread groups (but what about
60            pipes within a thread?) or using finalization (but it may be a
61            long time until the next GC). */
62     Thread readSide;
63     Thread writeSide;
64 
65     private static final int DEFAULT_PIPE_SIZE = 1024;
66 
67     /**
68      * The default size of the pipe's circular input buffer.
69      * @since   JDK1.1
70      */
71     // This used to be a constant before the pipe size was allowed
72     // to change. This field will continue to be maintained
73     // for backward compatibility.
74     protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
75 
76     /**
77      * The circular buffer into which incoming data is placed.
78      * @since   JDK1.1
79      */
80     protected byte buffer[];
81 
82     /**
83      * The index of the position in the circular buffer at which the
84      * next byte of data will be stored when received from the connected
85      * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
86      * <code>in==out</code> implies the buffer is full
87      * @since   JDK1.1
88      */
89     protected int in = -1;
90 
91     /**
92      * The index of the position in the circular buffer at which the next
93      * byte of data will be read by this piped input stream.
94      * @since   JDK1.1
95      */
96     protected int out = 0;
97 
98     /**
99      * Creates a <code>PipedInputStream</code> so
100      * that it is connected to the piped output
101      * stream <code>src</code>. Data bytes written
102      * to <code>src</code> will then be  available
103      * as input from this stream.
104      *
105      * @param      src   the stream to connect to.
106      * @exception  IOException  if an I/O error occurs.
107      */
PipedInputStream(PipedOutputStream src)108     public PipedInputStream(PipedOutputStream src) throws IOException {
109         this(src, DEFAULT_PIPE_SIZE);
110     }
111 
112     /**
113      * Creates a <code>PipedInputStream</code> so that it is
114      * connected to the piped output stream
115      * <code>src</code> and uses the specified pipe size for
116      * the pipe's buffer.
117      * Data bytes written to <code>src</code> will then
118      * be available as input from this stream.
119      *
120      * @param      src   the stream to connect to.
121      * @param      pipeSize the size of the pipe's buffer.
122      * @exception  IOException  if an I/O error occurs.
123      * @exception  IllegalArgumentException if {@code pipeSize <= 0}.
124      * @since      1.6
125      */
PipedInputStream(PipedOutputStream src, int pipeSize)126     public PipedInputStream(PipedOutputStream src, int pipeSize)
127             throws IOException {
128          initPipe(pipeSize);
129          connect(src);
130     }
131 
132     /**
133      * Creates a <code>PipedInputStream</code> so
134      * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
135      * connected}.
136      * It must be {@linkplain java.io.PipedOutputStream#connect(
137      * java.io.PipedInputStream) connected} to a
138      * <code>PipedOutputStream</code> before being used.
139      */
PipedInputStream()140     public PipedInputStream() {
141         initPipe(DEFAULT_PIPE_SIZE);
142     }
143 
144     /**
145      * Creates a <code>PipedInputStream</code> so that it is not yet
146      * {@linkplain #connect(java.io.PipedOutputStream) connected} and
147      * uses the specified pipe size for the pipe's buffer.
148      * It must be {@linkplain java.io.PipedOutputStream#connect(
149      * java.io.PipedInputStream)
150      * connected} to a <code>PipedOutputStream</code> before being used.
151      *
152      * @param      pipeSize the size of the pipe's buffer.
153      * @exception  IllegalArgumentException if {@code pipeSize <= 0}.
154      * @since      1.6
155      */
PipedInputStream(int pipeSize)156     public PipedInputStream(int pipeSize) {
157         initPipe(pipeSize);
158     }
159 
initPipe(int pipeSize)160     private void initPipe(int pipeSize) {
161          if (pipeSize <= 0) {
162             throw new IllegalArgumentException("Pipe Size <= 0");
163          }
164          buffer = new byte[pipeSize];
165     }
166 
167     /**
168      * Causes this piped input stream to be connected
169      * to the piped  output stream <code>src</code>.
170      * If this object is already connected to some
171      * other piped output  stream, an <code>IOException</code>
172      * is thrown.
173      * <p>
174      * If <code>src</code> is an
175      * unconnected piped output stream and <code>snk</code>
176      * is an unconnected piped input stream, they
177      * may be connected by either the call:
178      *
179      * <pre><code>snk.connect(src)</code> </pre>
180      * <p>
181      * or the call:
182      *
183      * <pre><code>src.connect(snk)</code> </pre>
184      * <p>
185      * The two calls have the same effect.
186      *
187      * @param      src   The piped output stream to connect to.
188      * @exception  IOException  if an I/O error occurs.
189      */
connect(PipedOutputStream src)190     public void connect(PipedOutputStream src) throws IOException {
191         src.connect(this);
192     }
193 
194     /**
195      * Receives a byte of data.  This method will block if no input is
196      * available.
197      * @param b the byte being received
198      * @exception IOException If the pipe is <a href="#BROKEN"> <code>broken</code></a>,
199      *          {@link #connect(java.io.PipedOutputStream) unconnected},
200      *          closed, or if an I/O error occurs.
201      * @since     JDK1.1
202      */
receive(int b)203     protected synchronized void receive(int b) throws IOException {
204         checkStateForReceive();
205         writeSide = Thread.currentThread();
206         if (in == out)
207             awaitSpace();
208         if (in < 0) {
209             in = 0;
210             out = 0;
211         }
212         buffer[in++] = (byte)(b & 0xFF);
213         if (in >= buffer.length) {
214             in = 0;
215         }
216     }
217 
218     /**
219      * Receives data into an array of bytes.  This method will
220      * block until some input is available.
221      * @param b the buffer into which the data is received
222      * @param off the start offset of the data
223      * @param len the maximum number of bytes received
224      * @exception IOException If the pipe is <a href="#BROKEN"> broken</a>,
225      *           {@link #connect(java.io.PipedOutputStream) unconnected},
226      *           closed,or if an I/O error occurs.
227      */
receive(byte b[], int off, int len)228     synchronized void receive(byte b[], int off, int len)  throws IOException {
229         checkStateForReceive();
230         writeSide = Thread.currentThread();
231         int bytesToTransfer = len;
232         while (bytesToTransfer > 0) {
233             if (in == out)
234                 awaitSpace();
235             int nextTransferAmount = 0;
236             if (out < in) {
237                 nextTransferAmount = buffer.length - in;
238             } else if (in < out) {
239                 if (in == -1) {
240                     in = out = 0;
241                     nextTransferAmount = buffer.length - in;
242                 } else {
243                     nextTransferAmount = out - in;
244                 }
245             }
246             if (nextTransferAmount > bytesToTransfer)
247                 nextTransferAmount = bytesToTransfer;
248             assert(nextTransferAmount > 0);
249             System.arraycopy(b, off, buffer, in, nextTransferAmount);
250             bytesToTransfer -= nextTransferAmount;
251             off += nextTransferAmount;
252             in += nextTransferAmount;
253             if (in >= buffer.length) {
254                 in = 0;
255             }
256         }
257     }
258 
checkStateForReceive()259     private void checkStateForReceive() throws IOException {
260         if (!connected) {
261             throw new IOException("Pipe not connected");
262         } else if (closedByWriter || closedByReader) {
263             throw new IOException("Pipe closed");
264         } else if (readSide != null && !readSide.isAlive()) {
265             throw new IOException("Read end dead");
266         }
267     }
268 
awaitSpace()269     private void awaitSpace() throws IOException {
270         while (in == out) {
271             checkStateForReceive();
272 
273             /* full: kick any waiting readers */
274             notifyAll();
275             try {
276                 wait(1000);
277             } catch (InterruptedException ex) {
278                 // Android-changed: re-set the thread's interrupt status
279                 // throw new java.io.InterruptedIOException();
280                 IoUtils.throwInterruptedIoException();
281             }
282         }
283     }
284 
285     /**
286      * Notifies all waiting threads that the last byte of data has been
287      * received.
288      */
receivedLast()289     synchronized void receivedLast() {
290         closedByWriter = true;
291         notifyAll();
292     }
293 
294     /**
295      * Reads the next byte of data from this piped input stream. The
296      * value byte is returned as an <code>int</code> in the range
297      * <code>0</code> to <code>255</code>.
298      * This method blocks until input data is available, the end of the
299      * stream is detected, or an exception is thrown.
300      *
301      * @return     the next byte of data, or <code>-1</code> if the end of the
302      *             stream is reached.
303      * @exception  IOException  if the pipe is
304      *           {@link #connect(java.io.PipedOutputStream) unconnected},
305      *           <a href="#BROKEN"> <code>broken</code></a>, closed,
306      *           or if an I/O error occurs.
307      */
read()308     public synchronized int read()  throws IOException {
309         if (!connected) {
310             throw new IOException("Pipe not connected");
311         } else if (closedByReader) {
312             throw new IOException("Pipe closed");
313         } else if (writeSide != null && !writeSide.isAlive()
314                    && !closedByWriter && (in < 0)) {
315             throw new IOException("Write end dead");
316         }
317 
318         readSide = Thread.currentThread();
319         int trials = 2;
320         while (in < 0) {
321             if (closedByWriter) {
322                 /* closed by writer, return EOF */
323                 return -1;
324             }
325             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
326                 throw new IOException("Pipe broken");
327             }
328             /* might be a writer waiting */
329             notifyAll();
330             try {
331                 wait(1000);
332             } catch (InterruptedException ex) {
333                 // Android-changed: re-set the thread's interrupt status
334                 // throw new java.io.InterruptedIOException();
335                 IoUtils.throwInterruptedIoException();
336             }
337         }
338         int ret = buffer[out++] & 0xFF;
339         if (out >= buffer.length) {
340             out = 0;
341         }
342         if (in == out) {
343             /* now empty */
344             in = -1;
345         }
346 
347         return ret;
348     }
349 
350     /**
351      * Reads up to <code>len</code> bytes of data from this piped input
352      * stream into an array of bytes. Less than <code>len</code> bytes
353      * will be read if the end of the data stream is reached or if
354      * <code>len</code> exceeds the pipe's buffer size.
355      * If <code>len </code> is zero, then no bytes are read and 0 is returned;
356      * otherwise, the method blocks until at least 1 byte of input is
357      * available, end of the stream has been detected, or an exception is
358      * thrown.
359      *
360      * @param      b     the buffer into which the data is read.
361      * @param      off   the start offset in the destination array <code>b</code>
362      * @param      len   the maximum number of bytes read.
363      * @return     the total number of bytes read into the buffer, or
364      *             <code>-1</code> if there is no more data because the end of
365      *             the stream has been reached.
366      * @exception  NullPointerException If <code>b</code> is <code>null</code>.
367      * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
368      * <code>len</code> is negative, or <code>len</code> is greater than
369      * <code>b.length - off</code>
370      * @exception  IOException if the pipe is <a href="#BROKEN"> <code>broken</code></a>,
371      *           {@link #connect(java.io.PipedOutputStream) unconnected},
372      *           closed, or if an I/O error occurs.
373      */
read(byte b[], int off, int len)374     public synchronized int read(byte b[], int off, int len)  throws IOException {
375         if (b == null) {
376             throw new NullPointerException();
377         } else if (off < 0 || len < 0 || len > b.length - off) {
378             throw new IndexOutOfBoundsException();
379         } else if (len == 0) {
380             return 0;
381         }
382 
383         /* possibly wait on the first character */
384         int c = read();
385         if (c < 0) {
386             return -1;
387         }
388         b[off] = (byte) c;
389         int rlen = 1;
390         while ((in >= 0) && (len > 1)) {
391 
392             int available;
393 
394             if (in > out) {
395                 available = Math.min((buffer.length - out), (in - out));
396             } else {
397                 available = buffer.length - out;
398             }
399 
400             // A byte is read beforehand outside the loop
401             if (available > (len - 1)) {
402                 available = len - 1;
403             }
404             System.arraycopy(buffer, out, b, off + rlen, available);
405             out += available;
406             rlen += available;
407             len -= available;
408 
409             if (out >= buffer.length) {
410                 out = 0;
411             }
412             if (in == out) {
413                 /* now empty */
414                 in = -1;
415             }
416         }
417         return rlen;
418     }
419 
420     /**
421      * Returns the number of bytes that can be read from this input
422      * stream without blocking.
423      *
424      * @return the number of bytes that can be read from this input stream
425      *         without blocking, or {@code 0} if this input stream has been
426      *         closed by invoking its {@link #close()} method, or if the pipe
427      *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
428      *          <a href="#BROKEN"> <code>broken</code></a>.
429      *
430      * @exception  IOException  if an I/O error occurs.
431      * @since   JDK1.0.2
432      */
available()433     public synchronized int available() throws IOException {
434         if(in < 0)
435             return 0;
436         else if(in == out)
437             return buffer.length;
438         else if (in > out)
439             return in - out;
440         else
441             return in + buffer.length - out;
442     }
443 
444     /**
445      * Closes this piped input stream and releases any system resources
446      * associated with the stream.
447      *
448      * @exception  IOException  if an I/O error occurs.
449      */
close()450     public void close()  throws IOException {
451         closedByReader = true;
452         synchronized (this) {
453             in = -1;
454         }
455     }
456 }
457