1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package dexfuzz; 18 19 import java.io.BufferedReader; 20 import java.io.InputStream; 21 import java.io.InputStreamReader; 22 import java.io.IOException; 23 import java.util.ArrayList; 24 import java.util.List; 25 import java.util.concurrent.Semaphore; 26 27 /** 28 * process.waitFor() can block if its output buffers are not drained. 29 * These threads are used to keep the buffers drained, and provide the final 30 * output once the command has finished executing. Each Executor has its own 31 * output and error StreamConsumers. 32 */ 33 public class StreamConsumer extends Thread { 34 private List<String> output; 35 private BufferedReader reader; 36 37 private State state; 38 39 private Semaphore workToBeDone; 40 private Semaphore outputIsReady; 41 42 enum State { 43 WAITING, 44 CONSUMING, 45 SHOULD_STOP_CONSUMING, 46 FINISHED, 47 ERROR 48 } 49 50 /** 51 * Create a StreamConsumer, will be immediately ready to start consuming. 52 */ StreamConsumer()53 public StreamConsumer() { 54 output = new ArrayList<String>(); 55 workToBeDone = new Semaphore(0); 56 outputIsReady = new Semaphore(0); 57 58 state = State.WAITING; 59 } 60 61 /** 62 * Executor should call this to provide its StreamConsumers with the Streams 63 * for a Process it is about to call waitFor() on. 64 */ giveStreamAndStartConsuming(InputStream stream)65 public void giveStreamAndStartConsuming(InputStream stream) { 66 output.clear(); 67 68 reader = new BufferedReader(new InputStreamReader(stream)); 69 70 changeState(State.CONSUMING, State.WAITING); 71 72 // Tell consumer there is work to be done. 73 workToBeDone.release(); 74 } 75 76 /** 77 * Executor should call this once its call to waitFor() returns. 78 */ processFinished()79 public void processFinished() { 80 changeState(State.SHOULD_STOP_CONSUMING, State.CONSUMING); 81 } 82 83 /** 84 * Executor should call this to get the captured output of this StreamConsumer. 85 */ getOutput()86 public List<String> getOutput() { 87 88 try { 89 // Wait until the output is ready. 90 outputIsReady.acquire(); 91 } catch (InterruptedException e) { 92 Log.error("Client of StreamConsumer was interrupted while waiting for output?"); 93 return null; 94 } 95 96 // Take a copy of the Strings, so when we call output.clear(), we don't 97 // clear the ExecutionResult's list. 98 List<String> copy = new ArrayList<String>(output); 99 return copy; 100 } 101 102 /** 103 * Executor should call this when we're shutting down. 104 */ shutdown()105 public void shutdown() { 106 changeState(State.FINISHED, State.WAITING); 107 108 // Tell Consumer there is work to be done (it will check first if FINISHED has been set.) 109 workToBeDone.release(); 110 } 111 consume()112 private void consume() { 113 try { 114 115 if (checkState(State.SHOULD_STOP_CONSUMING)) { 116 // Caller already called processFinished() before we even started 117 // consuming. Just get what we can and finish. 118 while (reader.ready()) { 119 output.add(reader.readLine()); 120 } 121 } else { 122 // Caller's process is still executing, so just loop and consume. 123 while (checkState(State.CONSUMING)) { 124 Thread.sleep(50); 125 while (reader.ready()) { 126 output.add(reader.readLine()); 127 } 128 } 129 } 130 131 if (checkState(State.SHOULD_STOP_CONSUMING)) { 132 changeState(State.WAITING, State.SHOULD_STOP_CONSUMING); 133 } else { 134 Log.error("StreamConsumer stopped consuming, but was not told to?"); 135 setErrorState(); 136 } 137 138 reader.close(); 139 140 } catch (IOException e) { 141 Log.error("StreamConsumer caught IOException while consuming"); 142 setErrorState(); 143 } catch (InterruptedException e) { 144 Log.error("StreamConsumer caught InterruptedException while consuming"); 145 setErrorState(); 146 } 147 148 // Tell client of Consumer that the output is ready. 149 outputIsReady.release(); 150 } 151 152 @Override run()153 public void run() { 154 while (checkState(State.WAITING)) { 155 try { 156 // Wait until there is work to be done 157 workToBeDone.acquire(); 158 } catch (InterruptedException e) { 159 Log.error("StreamConsumer caught InterruptedException while waiting for work"); 160 setErrorState(); 161 break; 162 } 163 164 // Check first if we're done 165 if (checkState(State.FINISHED)) { 166 break; 167 } 168 169 // Make sure we're either supposed to be consuming 170 // or supposed to be finishing up consuming 171 if (!(checkState(State.CONSUMING) || checkState(State.SHOULD_STOP_CONSUMING))) { 172 Log.error("invalid state: StreamConsumer told about work, but not CONSUMING?"); 173 Log.error("state was: " + getCurrentState()); 174 setErrorState(); 175 break; 176 } 177 178 consume(); 179 } 180 } 181 checkState(State expectedState)182 private synchronized boolean checkState(State expectedState) { 183 return (expectedState == state); 184 } 185 changeState(State newState, State previousState)186 private synchronized void changeState(State newState, State previousState) { 187 if (state != previousState) { 188 Log.error("StreamConsumer Unexpected state: " + state + ", expected " + previousState); 189 state = State.ERROR; 190 } else { 191 state = newState; 192 } 193 } 194 setErrorState()195 private synchronized void setErrorState() { 196 state = State.ERROR; 197 } 198 getCurrentState()199 private synchronized State getCurrentState() { 200 return state; 201 } 202 } 203