1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.trilead.ssh2; 18 19 import com.googlecode.android_scripting.Log; 20 21 import java.io.File; 22 import java.io.FileOutputStream; 23 import java.io.IOException; 24 import java.io.InputStream; 25 26 /** 27 * A <code>StreamGobbler</code> is an InputStream that uses an internal worker thread to constantly 28 * consume input from another InputStream. It uses a buffer to store the consumed data. The buffer 29 * size is automatically adjusted, if needed. 30 * <p> 31 * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR InputStreams 32 * with instances of this class, then you don't have to bother about the shared window of STDOUT and 33 * STDERR in the low level SSH-2 protocol, since all arriving data will be immediatelly consumed by 34 * the worker threads. Also, as a side effect, the streams will be buffered (e.g., single byte 35 * read() operations are faster). 36 * <p> 37 * Other SSH for Java libraries include this functionality by default in their STDOUT and STDERR 38 * InputStream implementations, however, please be aware that this approach has also a downside: 39 * <p> 40 * If you do not call the StreamGobbler's <code>read()</code> method often enough and the peer is 41 * constantly sending huge amounts of data, then you will sooner or later encounter a low memory 42 * situation due to the aggregated data (well, it also depends on the Java heap size). Joe Average 43 * will like this class anyway - a paranoid programmer would never use such an approach. 44 * <p> 45 * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't", see 46 * http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html. 47 * 48 * @version $Id: StreamGobbler.java,v 1.1 2007/10/15 12:49:56 cplattne Exp $ 49 */ 50 51 public class StreamGobbler extends InputStream { 52 class GobblerThread extends Thread { 53 @Override run()54 public void run() { 55 56 while (true) { 57 try { 58 byte[] saveBuffer = null; 59 60 int avail = is.read(buffer, write_pos, buffer.length - write_pos); 61 62 synchronized (synchronizer) { 63 if (avail <= 0) { 64 isEOF = true; 65 synchronizer.notifyAll(); 66 break; 67 } 68 write_pos += avail; 69 70 int space_available = buffer.length - write_pos; 71 72 if (space_available == 0) { 73 if (read_pos > 0) { 74 saveBuffer = new byte[read_pos]; 75 System.arraycopy(buffer, 0, saveBuffer, 0, read_pos); 76 System.arraycopy(buffer, read_pos, buffer, 0, buffer.length - read_pos); 77 write_pos -= read_pos; 78 read_pos = 0; 79 } else { 80 write_pos = 0; 81 saveBuffer = buffer; 82 } 83 } 84 85 synchronizer.notifyAll(); 86 } 87 88 writeToFile(saveBuffer); 89 90 } catch (IOException e) { 91 synchronized (synchronizer) { 92 exception = e; 93 synchronizer.notifyAll(); 94 break; 95 } 96 } 97 } 98 } 99 } 100 101 private InputStream is; 102 private GobblerThread t; 103 104 private Object synchronizer = new Object(); 105 106 private boolean isEOF = false; 107 private boolean isClosed = false; 108 private IOException exception = null; 109 110 private byte[] buffer; 111 private int read_pos = 0; 112 private int write_pos = 0; 113 private final FileOutputStream mLogStream; 114 private final int mBufferSize; 115 StreamGobbler(InputStream is, File log, int buffer_size)116 public StreamGobbler(InputStream is, File log, int buffer_size) { 117 this.is = is; 118 mBufferSize = buffer_size; 119 FileOutputStream out = null; 120 try { 121 out = new FileOutputStream(log, false); 122 } catch (IOException e) { 123 Log.e(e); 124 } 125 mLogStream = out; 126 buffer = new byte[mBufferSize]; 127 t = new GobblerThread(); 128 t.setDaemon(true); 129 t.start(); 130 } 131 writeToFile(byte[] buffer)132 public void writeToFile(byte[] buffer) { 133 if (mLogStream != null && buffer != null) { 134 try { 135 mLogStream.write(buffer); 136 } catch (IOException e) { 137 Log.e(e); 138 } 139 } 140 } 141 142 @Override read()143 public int read() throws IOException { 144 synchronized (synchronizer) { 145 if (isClosed) { 146 throw new IOException("This StreamGobbler is closed."); 147 } 148 149 while (read_pos == write_pos) { 150 if (exception != null) { 151 throw exception; 152 } 153 154 if (isEOF) { 155 return -1; 156 } 157 158 try { 159 synchronizer.wait(); 160 } catch (InterruptedException e) { 161 } 162 } 163 164 int b = buffer[read_pos++] & 0xff; 165 166 return b; 167 } 168 } 169 170 @Override available()171 public int available() throws IOException { 172 synchronized (synchronizer) { 173 if (isClosed) { 174 throw new IOException("This StreamGobbler is closed."); 175 } 176 177 return write_pos - read_pos; 178 } 179 } 180 181 @Override read(byte[] b)182 public int read(byte[] b) throws IOException { 183 return read(b, 0, b.length); 184 } 185 186 @Override close()187 public void close() throws IOException { 188 synchronized (synchronizer) { 189 if (isClosed) { 190 return; 191 } 192 isClosed = true; 193 isEOF = true; 194 synchronizer.notifyAll(); 195 is.close(); 196 } 197 } 198 199 @Override read(byte[] b, int off, int len)200 public int read(byte[] b, int off, int len) throws IOException { 201 if (b == null) { 202 throw new NullPointerException(); 203 } 204 205 if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length)) { 206 throw new IndexOutOfBoundsException(); 207 } 208 209 if (len == 0) { 210 return 0; 211 } 212 213 synchronized (synchronizer) { 214 if (isClosed) { 215 throw new IOException("This StreamGobbler is closed."); 216 } 217 218 while (read_pos == write_pos) { 219 if (exception != null) { 220 throw exception; 221 } 222 223 if (isEOF) { 224 return -1; 225 } 226 227 try { 228 synchronizer.wait(); 229 } catch (InterruptedException e) { 230 } 231 } 232 233 int avail = write_pos - read_pos; 234 235 avail = (avail > len) ? len : avail; 236 237 System.arraycopy(buffer, read_pos, b, off, avail); 238 239 read_pos += avail; 240 241 return avail; 242 } 243 } 244 } 245