• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2006-2011 Christian Plattner. All rights reserved.
3  * Please refer to the LICENSE.txt for licensing details.
4  */
5 package ch.ethz.ssh2;
6 
7 import java.io.IOException;
8 import java.io.InputStream;
9 
10 /**
11  * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
12  * thread to constantly consume input from another InputStream. It uses a buffer
13  * to store the consumed data. The buffer size is automatically adjusted, if needed.
14  * <p/>
15  * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
16  * InputStreams with instances of this class, then you don't have to bother about
17  * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
18  * since all arriving data will be immediatelly consumed by the worker threads.
19  * Also, as a side effect, the streams will be buffered (e.g., single byte
20  * read() operations are faster).
21  * <p/>
22  * Other SSH for Java libraries include this functionality by default in
23  * their STDOUT and STDERR InputStream implementations, however, please be aware
24  * that this approach has also a downside:
25  * <p/>
26  * If you do not call the StreamGobbler's <code>read()</code> method often enough
27  * and the peer is constantly sending huge amounts of data, then you will sooner or later
28  * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
29  * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
30  * <p/>
31  * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
32  * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
33  *
34  * @author Christian Plattner
35  * @version 2.50, 03/15/10
36  */
37 
38 public class StreamGobbler extends InputStream
39 {
40 	class GobblerThread extends Thread
41 	{
42 		@Override
run()43 		public void run()
44 		{
45 			byte[] buff = new byte[8192];
46 
47 			while (true)
48 			{
49 				try
50 				{
51 					int avail = is.read(buff);
52 
53 					synchronized (synchronizer)
54 					{
55 						if (avail <= 0)
56 						{
57 							isEOF = true;
58 							synchronizer.notifyAll();
59 							break;
60 						}
61 
62 						int space_available = buffer.length - write_pos;
63 
64 						if (space_available < avail)
65 						{
66 							/* compact/resize buffer */
67 
68 							int unread_size = write_pos - read_pos;
69 							int need_space = unread_size + avail;
70 
71 							byte[] new_buffer = buffer;
72 
73 							if (need_space > buffer.length)
74 							{
75 								int inc = need_space / 3;
76 								inc = (inc < 256) ? 256 : inc;
77 								inc = (inc > 8192) ? 8192 : inc;
78 								new_buffer = new byte[need_space + inc];
79 							}
80 
81 							if (unread_size > 0)
82 								System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);
83 
84 							buffer = new_buffer;
85 
86 							read_pos = 0;
87 							write_pos = unread_size;
88 						}
89 
90 						System.arraycopy(buff, 0, buffer, write_pos, avail);
91 						write_pos += avail;
92 
93 						synchronizer.notifyAll();
94 					}
95 				}
96 				catch (IOException e)
97 				{
98 					synchronized (synchronizer)
99 					{
100 						exception = e;
101 						synchronizer.notifyAll();
102 						break;
103 					}
104 				}
105 			}
106 		}
107 	}
108 
109 	private InputStream is;
110 
111 	private final Object synchronizer = new Object();
112 
113 	private boolean isEOF = false;
114 	private boolean isClosed = false;
115 	private IOException exception = null;
116 
117 	private byte[] buffer = new byte[2048];
118 	private int read_pos = 0;
119 	private int write_pos = 0;
120 
StreamGobbler(InputStream is)121 	public StreamGobbler(InputStream is)
122 	{
123 		this.is = is;
124 		GobblerThread t = new GobblerThread();
125 		t.setDaemon(true);
126 		t.start();
127 	}
128 
129 	@Override
read()130 	public int read() throws IOException
131 	{
132 		boolean wasInterrupted = false;
133 
134 		try
135 		{
136 			synchronized (synchronizer)
137 			{
138 				if (isClosed)
139 					throw new IOException("This StreamGobbler is closed.");
140 
141 				while (read_pos == write_pos)
142 				{
143 					if (exception != null)
144 						throw exception;
145 
146 					if (isEOF)
147 						return -1;
148 
149 					try
150 					{
151 						synchronizer.wait();
152 					}
153 					catch (InterruptedException e)
154 					{
155 						wasInterrupted = true;
156 					}
157 				}
158 				return buffer[read_pos++] & 0xff;
159 			}
160 		}
161 		finally
162 		{
163 			if (wasInterrupted)
164 				Thread.currentThread().interrupt();
165 		}
166 	}
167 
168 	@Override
available()169 	public int available() throws IOException
170 	{
171 		synchronized (synchronizer)
172 		{
173 			if (isClosed)
174 				throw new IOException("This StreamGobbler is closed.");
175 
176 			return write_pos - read_pos;
177 		}
178 	}
179 
180 	@Override
read(byte[] b)181 	public int read(byte[] b) throws IOException
182 	{
183 		return read(b, 0, b.length);
184 	}
185 
186 	@Override
close()187 	public void close() throws IOException
188 	{
189 		synchronized (synchronizer)
190 		{
191 			if (isClosed)
192 				return;
193 			isClosed = true;
194 			isEOF = true;
195 			synchronizer.notifyAll();
196 			is.close();
197 		}
198 	}
199 
200 	@Override
read(byte[] b, int off, int len)201 	public int read(byte[] b, int off, int len) throws IOException
202 	{
203 		if (b == null)
204 			throw new NullPointerException();
205 
206 		if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length))
207 			throw new IndexOutOfBoundsException();
208 
209 		if (len == 0)
210 			return 0;
211 
212 		boolean wasInterrupted = false;
213 
214 		try
215 		{
216 			synchronized (synchronizer)
217 			{
218 				if (isClosed)
219 					throw new IOException("This StreamGobbler is closed.");
220 
221 				while (read_pos == write_pos)
222 				{
223 					if (exception != null)
224 						throw exception;
225 
226 					if (isEOF)
227 						return -1;
228 
229 					try
230 					{
231 						synchronizer.wait();
232 					}
233 					catch (InterruptedException e)
234 					{
235 						wasInterrupted = true;
236 					}
237 				}
238 
239 				int avail = write_pos - read_pos;
240 
241 				avail = (avail > len) ? len : avail;
242 
243 				System.arraycopy(buffer, read_pos, b, off, avail);
244 
245 				read_pos += avail;
246 
247 				return avail;
248 			}
249 		}
250 		finally
251 		{
252 			if (wasInterrupted)
253 				Thread.currentThread().interrupt();
254 		}
255 	}
256 }
257