1 /* 2 * Copyright (C) 2013 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.caliper.runner; 16 17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkState; 19 20 import com.google.caliper.bridge.LogMessage; 21 import com.google.caliper.bridge.OpenedSocket; 22 import com.google.caliper.bridge.StopMeasurementLogMessage; 23 import com.google.caliper.model.Measurement; 24 import com.google.caliper.runner.StreamService.StreamItem.Kind; 25 import com.google.caliper.util.Parser; 26 import com.google.common.base.MoreObjects; 27 import com.google.common.base.MoreObjects.ToStringHelper; 28 import com.google.common.collect.Queues; 29 import com.google.common.io.Closeables; 30 import com.google.common.io.LineReader; 31 import com.google.common.util.concurrent.AbstractService; 32 import com.google.common.util.concurrent.ListenableFuture; 33 import com.google.common.util.concurrent.ListeningExecutorService; 34 import com.google.common.util.concurrent.MoreExecutors; 35 import com.google.common.util.concurrent.Service; // for javadoc 36 import com.google.common.util.concurrent.Service.State; // for javadoc 37 import com.google.common.util.concurrent.ThreadFactoryBuilder; 38 import com.google.common.util.concurrent.Uninterruptibles; 39 40 import java.io.IOException; 41 import java.io.InputStreamReader; 42 import java.io.Reader; 43 import java.io.Serializable; 44 import java.nio.charset.Charset; 45 import java.text.ParseException; 46 import java.util.concurrent.BlockingQueue; 47 import java.util.concurrent.Callable; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.Executors; 50 import java.util.concurrent.TimeUnit; 51 import java.util.concurrent.atomic.AtomicInteger; 52 import java.util.logging.Logger; 53 54 import javax.annotation.Nullable; 55 import javax.inject.Inject; 56 57 /** 58 * A {@link Service} that establishes a connection over a socket to a process and then allows 59 * multiplexed access to the processes' line oriented output over the socket and the standard 60 * process streams (stdout and stderr) as well as allowing data to be written over the socket. 61 * 62 * <p>The {@linkplain State states} of this service are as follows: 63 * <ul> 64 * <li>{@linkplain State#NEW NEW} : Idle state, no reading or writing is allowed. 65 * <li>{@linkplain State#STARTING STARTING} : Streams are being opened 66 * <li>{@linkplain State#RUNNING RUNNING} : At least one stream is still open or the writer has 67 * not been closed yet. 68 * <li>{@linkplain State#STOPPING STOPPING} : All streams have closed but some threads may still 69 * be running. 70 * <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, all streams are closed 71 * <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters 72 * any errors while reading from or writing to the streams, service failure will also cause 73 * the worker process to be forcibly shutdown and {@link #readItem(long, TimeUnit)}, 74 * {@link #closeWriter()} and {@link #sendMessage(Serializable)} will start throwing 75 * IllegalStateExceptions. 76 * </ul> 77 */ 78 @TrialScoped final class StreamService extends AbstractService { 79 /** How long to wait for a process that should be exiting to actually exit. */ 80 private static final int SHUTDOWN_WAIT_MILLIS = 10; 81 82 private static final Logger logger = Logger.getLogger(StreamService.class.getName()); 83 private static final StreamItem TIMEOUT_ITEM = new StreamItem(Kind.TIMEOUT, null); 84 85 /** The final item that will be sent down the stream. */ 86 static final StreamItem EOF_ITEM = new StreamItem(Kind.EOF, null); 87 88 private final ListeningExecutorService streamExecutor = MoreExecutors.listeningDecorator( 89 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build())); 90 private final BlockingQueue<StreamItem> outputQueue = Queues.newLinkedBlockingQueue(); 91 private final WorkerProcess worker; 92 private volatile Process process; 93 private final Parser<LogMessage> logMessageParser; 94 private final TrialOutputLogger trialOutput; 95 96 /** 97 * This represents the number of open streams from the users perspective. i.e. can you still 98 * write to the socket and read items. 99 * 100 * <p>This is decremented when either the socket is closed for writing or the EOF_ITEM has been 101 * read by the user. 102 */ 103 private final AtomicInteger openStreams = new AtomicInteger(); 104 105 /** 106 * Used to track how many read streams are open so we can correctly set the EOF_ITEM onto the 107 * queue. 108 */ 109 private final AtomicInteger runningReadStreams = new AtomicInteger(); 110 private OpenedSocket.Writer socketWriter; 111 StreamService(WorkerProcess worker, Parser<LogMessage> logMessageParser, TrialOutputLogger trialOutput)112 @Inject StreamService(WorkerProcess worker, 113 Parser<LogMessage> logMessageParser, 114 TrialOutputLogger trialOutput) { 115 this.worker = worker; 116 this.logMessageParser = logMessageParser; 117 this.trialOutput = trialOutput; 118 } 119 doStart()120 @Override protected void doStart() { 121 try { 122 // TODO(lukes): write the commandline to the trial output file? 123 process = worker.startWorker(); 124 } catch (IOException e) { 125 notifyFailed(e); 126 return; 127 } 128 // Failsafe kill the process and the executor service. 129 // If the process has already exited cleanly, this will be a no-op. 130 addListener(new Listener() { 131 @Override public void starting() {} 132 @Override public void running() {} 133 @Override public void stopping(State from) {} 134 @Override public void terminated(State from) { 135 cleanup(); 136 } 137 @Override public void failed(State from, Throwable failure) { 138 cleanup(); 139 } 140 141 void cleanup() { 142 streamExecutor.shutdown(); 143 process.destroy(); 144 try { 145 streamExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); 146 } catch (InterruptedException e) { 147 Thread.currentThread().interrupt(); 148 } 149 streamExecutor.shutdownNow(); 150 } 151 }, MoreExecutors.directExecutor()); 152 // You may be thinking as you read this "Yo dawg, what if IOExceptions rain from the sky?" 153 // If a stream we are reading from throws an IOException then we fail the entire Service. This 154 // will cause the worker to be killed (if its not dead already) and the various StreamReaders to 155 // be interrupted (eventually). 156 157 // use the default charset because worker streams will use the default for output 158 Charset processCharset = Charset.defaultCharset(); 159 runningReadStreams.addAndGet(2); 160 openStreams.addAndGet(1); 161 streamExecutor.submit( 162 threadRenaming("worker-stderr", 163 new StreamReader("stderr", 164 new InputStreamReader(process.getErrorStream(), processCharset)))); 165 streamExecutor.submit( 166 threadRenaming("worker-stdout", 167 new StreamReader("stdout", 168 new InputStreamReader(process.getInputStream(), processCharset)))); 169 worker.socketFuture().addListener( 170 new Runnable() { 171 @Override public void run() { 172 try { 173 OpenedSocket openedSocket = 174 Uninterruptibles.getUninterruptibly(worker.socketFuture()); 175 logger.fine("successfully opened the pipe from the worker"); 176 socketWriter = openedSocket.writer(); 177 runningReadStreams.addAndGet(1); 178 openStreams.addAndGet(1); 179 streamExecutor.submit(threadRenaming("worker-socket", 180 new SocketStreamReader(openedSocket.reader()))); 181 } catch (ExecutionException e) { 182 notifyFailed(e.getCause()); 183 } 184 } 185 }, 186 MoreExecutors.directExecutor()); 187 notifyStarted(); 188 } 189 190 /** 191 * Reads a {@link StreamItem} from one of the streams waiting for one to become available if 192 * necessary. 193 */ readItem(long timeout, TimeUnit unit)194 StreamItem readItem(long timeout, TimeUnit unit) throws InterruptedException { 195 checkState(isRunning(), "Cannot read items from a %s StreamService", state()); 196 StreamItem line = outputQueue.poll(timeout, unit); 197 if (line == EOF_ITEM) { 198 closeStream(); 199 } 200 return (line == null) ? TIMEOUT_ITEM : line; 201 } 202 203 /** 204 * Write a line of data to the worker process over the socket. 205 * 206 * <p>N.B. Writing data via {@link #sendMessage(Serializable)} is only valid once the underlying 207 * socket has been opened. This should be fine assuming that socket writes are only in response 208 * to socket reads (which is currently the case), so there is no way that a write could happen 209 * prior to the socket being opened. 210 */ sendMessage(Serializable message)211 void sendMessage(Serializable message) throws IOException { 212 checkState(isRunning(), "Cannot read items from a %s StreamService", state()); 213 checkState(socketWriter != null, "Attempted to write to the socket before it was opened."); 214 try { 215 socketWriter.write(message); 216 // We need to flush since this is a back and forth lockstep protocol, buffering can cause 217 // deadlock! 218 socketWriter.flush(); 219 } catch (IOException e) { 220 Closeables.close(socketWriter, true); 221 notifyFailed(e); 222 throw e; 223 } 224 } 225 226 /** Closes the socket writer. */ closeWriter()227 void closeWriter() throws IOException { 228 checkState(isRunning(), "Cannot read items from a %s StreamService", state()); 229 checkState(socketWriter != null, "Attempted to close the socket before it was opened."); 230 try { 231 socketWriter.close(); 232 } catch (IOException e) { 233 notifyFailed(e); 234 throw e; 235 } 236 closeStream(); 237 } 238 doStop()239 @Override protected void doStop() { 240 if (openStreams.get() > 0) { 241 // This means stop was called on us externally and we are still reading/writing, just log a 242 // warning and do nothing 243 logger.warning("Attempting to stop the stream service with streams still open"); 244 } 245 final ListenableFuture<Integer> processFuture = streamExecutor.submit(new Callable<Integer>() { 246 @Override public Integer call() throws Exception { 247 return process.waitFor(); 248 } 249 }); 250 // Experimentally, even with well behaved processes there is some time between when all streams 251 // are closed as part of process shutdown and when the process has exited. So to not fail 252 // flakily when shutting down normally we need to do a timed wait 253 streamExecutor.submit(new Callable<Void>() { 254 @Override public Void call() throws Exception { 255 boolean threw = true; 256 try { 257 if (processFuture.get(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS) == 0) { 258 notifyStopped(); 259 } else { 260 notifyFailed( 261 new Exception("Process failed to stop cleanly. Exit code: " + process.waitFor())); 262 } 263 threw = false; 264 } finally { 265 processFuture.cancel(true); // we don't need it anymore 266 if (threw) { 267 process.destroy(); 268 notifyFailed( 269 new Exception("Process failed to stop cleanly and was forcibly killed. Exit code: " 270 + process.waitFor())); 271 } 272 } 273 return null; 274 } 275 }); 276 } 277 closeStream()278 private void closeStream() { 279 if (openStreams.decrementAndGet() == 0) { 280 stopAsync(); 281 } 282 } 283 closeReadStream()284 private void closeReadStream() { 285 if (runningReadStreams.decrementAndGet() == 0) { 286 outputQueue.add(EOF_ITEM); 287 } 288 } 289 290 /** An item read from one of the streams. */ 291 static class StreamItem { 292 enum Kind { 293 /** This indicates that it is the last item. */ 294 EOF, 295 /** This indicates that reading the item timed out. */ 296 TIMEOUT, 297 /** This indicates that this item has content. */ 298 DATA; 299 } 300 301 @Nullable private final LogMessage logMessage; 302 private final Kind kind; 303 StreamItem(LogMessage line)304 private StreamItem(LogMessage line) { 305 this(Kind.DATA, checkNotNull(line)); 306 } 307 StreamItem(Kind state, @Nullable LogMessage logMessage)308 private StreamItem(Kind state, @Nullable LogMessage logMessage) { 309 this.logMessage = logMessage; 310 this.kind = state; 311 } 312 313 /** Returns the content. This is only valid if {@link #kind()} return {@link Kind#DATA}. */ content()314 LogMessage content() { 315 checkState(kind == Kind.DATA, "Only data lines have content: %s", this); 316 return logMessage; 317 } 318 kind()319 Kind kind() { 320 return kind; 321 } 322 toString()323 @Override public String toString() { 324 ToStringHelper helper = MoreObjects.toStringHelper(StreamItem.class); 325 if (kind == Kind.DATA) { 326 helper.addValue(logMessage); 327 } else { 328 helper.addValue(kind); 329 } 330 return helper.toString(); 331 } 332 } 333 334 /** Returns a callable that renames the the thread that the given callable runs in. */ threadRenaming(final String name, final Callable<T> callable)335 private static <T> Callable<T> threadRenaming(final String name, final Callable<T> callable) { 336 checkNotNull(name); 337 checkNotNull(callable); 338 return new Callable<T>() { 339 @Override public T call() throws Exception { 340 Thread currentThread = Thread.currentThread(); 341 String oldName = currentThread.getName(); 342 currentThread.setName(name); 343 try { 344 return callable.call(); 345 } finally { 346 currentThread.setName(oldName); 347 } 348 } 349 }; 350 } 351 352 /** 353 * A background task that reads lines of text from a {@link Reader} and puts them onto a 354 * {@link BlockingQueue}. 355 */ 356 private final class StreamReader implements Callable<Void> { 357 final Reader reader; 358 final String streamName; 359 360 StreamReader(String streamName, Reader reader) { 361 this.streamName = streamName; 362 this.reader = reader; 363 } 364 365 @Override public Void call() throws IOException, InterruptedException, ParseException { 366 LineReader lineReader = new LineReader(reader); 367 boolean threw = true; 368 try { 369 String line; 370 while ((line = lineReader.readLine()) != null) { 371 trialOutput.log(streamName, line); 372 LogMessage logMessage = logMessageParser.parse(line); 373 if (logMessage != null) { 374 outputQueue.put(new StreamItem(logMessage)); 375 } 376 } 377 threw = false; 378 } catch (Exception e) { 379 notifyFailed(e); 380 } finally { 381 closeReadStream(); 382 Closeables.close(reader, threw); 383 } 384 return null; 385 } 386 } 387 388 /** 389 * A background task that reads lines of text from a {@link OpenedSocket.Reader} and puts them 390 * onto a {@link BlockingQueue}. 391 */ 392 private final class SocketStreamReader implements Callable<Void> { 393 final OpenedSocket.Reader reader; 394 395 SocketStreamReader(OpenedSocket.Reader reader) { 396 this.reader = reader; 397 } 398 399 @Override public Void call() throws IOException, InterruptedException, ParseException { 400 boolean threw = true; 401 try { 402 Object obj; 403 while ((obj = reader.read()) != null) { 404 if (obj instanceof String) { 405 log(obj.toString()); 406 continue; 407 } 408 LogMessage message = (LogMessage) obj; 409 if (message instanceof StopMeasurementLogMessage) { 410 // TODO(lukes): how useful are these messages? They seem like leftover debugging info 411 for (Measurement measurement : ((StopMeasurementLogMessage) message).measurements()) { 412 log(String.format("I got a result! %s: %f%s%n", 413 measurement.description(), 414 measurement.value().magnitude() / measurement.weight(), 415 measurement.value().unit())); 416 } 417 } 418 outputQueue.put(new StreamItem(message)); 419 } 420 threw = false; 421 } catch (Exception e) { 422 notifyFailed(e); 423 } finally { 424 closeReadStream(); 425 Closeables.close(reader, threw); 426 } 427 return null; 428 } 429 430 private void log(String text) { 431 trialOutput.log("socket", text); 432 } 433 } 434 } 435