• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2013 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
14 
15 package com.google.caliper.runner;
16 
17 import static com.google.common.base.Preconditions.checkNotNull;
18 import static com.google.common.base.Preconditions.checkState;
19 
20 import com.google.caliper.bridge.LogMessage;
21 import com.google.caliper.bridge.OpenedSocket;
22 import com.google.caliper.bridge.StopMeasurementLogMessage;
23 import com.google.caliper.model.Measurement;
24 import com.google.caliper.runner.StreamService.StreamItem.Kind;
25 import com.google.caliper.util.Parser;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.base.MoreObjects.ToStringHelper;
28 import com.google.common.collect.Queues;
29 import com.google.common.io.Closeables;
30 import com.google.common.io.LineReader;
31 import com.google.common.util.concurrent.AbstractService;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.ListeningExecutorService;
34 import com.google.common.util.concurrent.MoreExecutors;
35 import com.google.common.util.concurrent.Service; // for javadoc
36 import com.google.common.util.concurrent.Service.State; // for javadoc
37 import com.google.common.util.concurrent.ThreadFactoryBuilder;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 
40 import java.io.IOException;
41 import java.io.InputStreamReader;
42 import java.io.Reader;
43 import java.io.Serializable;
44 import java.nio.charset.Charset;
45 import java.text.ParseException;
46 import java.util.concurrent.BlockingQueue;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.Executors;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.logging.Logger;
53 
54 import javax.annotation.Nullable;
55 import javax.inject.Inject;
56 
57 /**
58  * A {@link Service} that establishes a connection over a socket to a process and then allows
59  * multiplexed access to the processes' line oriented output over the socket and the standard
60  * process streams (stdout and stderr) as well as allowing data to be written over the socket.
61  *
62  * <p>The {@linkplain State states} of this service are as follows:
63  * <ul>
64  *   <li>{@linkplain State#NEW NEW} : Idle state, no reading or writing is allowed.
65  *   <li>{@linkplain State#STARTING STARTING} : Streams are being opened
66  *   <li>{@linkplain State#RUNNING RUNNING} : At least one stream is still open or the writer has
67  *       not been closed yet.
68  *   <li>{@linkplain State#STOPPING STOPPING} : All streams have closed but some threads may still
69  *       be running.
70  *   <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, all streams are closed
71  *   <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
72  *       any errors while reading from or writing to the streams, service failure will also cause
73  *       the worker process to be forcibly shutdown and {@link #readItem(long, TimeUnit)},
74  *       {@link #closeWriter()} and {@link #sendMessage(Serializable)} will start throwing
75  *       IllegalStateExceptions.
76  * </ul>
77  */
78 @TrialScoped final class StreamService extends AbstractService {
79   /** How long to wait for a process that should be exiting to actually exit. */
80   private static final int SHUTDOWN_WAIT_MILLIS = 10;
81 
82   private static final Logger logger = Logger.getLogger(StreamService.class.getName());
83   private static final StreamItem TIMEOUT_ITEM = new StreamItem(Kind.TIMEOUT, null);
84 
85   /** The final item that will be sent down the stream. */
86   static final StreamItem EOF_ITEM = new StreamItem(Kind.EOF, null);
87 
88   private final ListeningExecutorService streamExecutor = MoreExecutors.listeningDecorator(
89       Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()));
90   private final BlockingQueue<StreamItem> outputQueue = Queues.newLinkedBlockingQueue();
91   private final WorkerProcess worker;
92   private volatile Process process;
93   private final Parser<LogMessage> logMessageParser;
94   private final TrialOutputLogger trialOutput;
95 
96   /**
97    * This represents the number of open streams from the users perspective.  i.e. can you still
98    * write to the socket and read items.
99    *
100    * <p>This is decremented when either the socket is closed for writing or the EOF_ITEM has been
101    * read by the user.
102    */
103   private final AtomicInteger openStreams = new AtomicInteger();
104 
105   /**
106    * Used to track how many read streams are open so we can correctly set the EOF_ITEM onto the
107    * queue.
108    */
109   private final AtomicInteger runningReadStreams = new AtomicInteger();
110   private OpenedSocket.Writer socketWriter;
111 
StreamService(WorkerProcess worker, Parser<LogMessage> logMessageParser, TrialOutputLogger trialOutput)112   @Inject StreamService(WorkerProcess worker,
113       Parser<LogMessage> logMessageParser,
114       TrialOutputLogger trialOutput) {
115     this.worker = worker;
116     this.logMessageParser = logMessageParser;
117     this.trialOutput = trialOutput;
118   }
119 
doStart()120   @Override protected void doStart() {
121     try {
122       // TODO(lukes): write the commandline to the trial output file?
123       process = worker.startWorker();
124     } catch (IOException e) {
125       notifyFailed(e);
126       return;
127     }
128     // Failsafe kill the process and the executor service.
129     // If the process has already exited cleanly, this will be a no-op.
130     addListener(new Listener() {
131       @Override public void starting() {}
132       @Override public void running() {}
133       @Override public void stopping(State from) {}
134       @Override public void terminated(State from) {
135         cleanup();
136       }
137       @Override public void failed(State from, Throwable failure) {
138         cleanup();
139       }
140 
141       void cleanup() {
142         streamExecutor.shutdown();
143         process.destroy();
144         try {
145           streamExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
146         } catch (InterruptedException e) {
147           Thread.currentThread().interrupt();
148         }
149         streamExecutor.shutdownNow();
150       }
151     }, MoreExecutors.directExecutor());
152     // You may be thinking as you read this "Yo dawg, what if IOExceptions rain from the sky?"
153     // If a stream we are reading from throws an IOException then we fail the entire Service. This
154     // will cause the worker to be killed (if its not dead already) and the various StreamReaders to
155     // be interrupted (eventually).
156 
157     // use the default charset because worker streams will use the default for output
158     Charset processCharset = Charset.defaultCharset();
159     runningReadStreams.addAndGet(2);
160     openStreams.addAndGet(1);
161     ListenableFuture possiblyIgnoredError1 = streamExecutor.submit(
162         threadRenaming("worker-stderr",
163             new StreamReader("stderr",
164                 new InputStreamReader(process.getErrorStream(), processCharset))));
165     ListenableFuture possiblyIgnoredError2 = streamExecutor.submit(
166         threadRenaming("worker-stdout",
167             new StreamReader("stdout",
168                 new InputStreamReader(process.getInputStream(), processCharset))));
169     worker.socketFuture().addListener(
170         new Runnable() {
171           @Override public void run() {
172             try {
173               OpenedSocket openedSocket =
174                   Uninterruptibles.getUninterruptibly(worker.socketFuture());
175               logger.fine("successfully opened the pipe from the worker");
176               socketWriter = openedSocket.writer();
177               runningReadStreams.addAndGet(1);
178               openStreams.addAndGet(1);
179               ListenableFuture possiblyIgnoredError = streamExecutor.submit(threadRenaming("worker-socket",
180                   new SocketStreamReader(openedSocket.reader())));
181             } catch (ExecutionException e) {
182               notifyFailed(e.getCause());
183             }
184           }
185         },
186         MoreExecutors.directExecutor());
187     notifyStarted();
188   }
189 
190   /**
191    * Reads a {@link StreamItem} from one of the streams waiting for one to become available if
192    * necessary.
193    */
readItem(long timeout, TimeUnit unit)194   StreamItem readItem(long timeout, TimeUnit unit) throws InterruptedException {
195     checkState(isRunning(), "Cannot read items from a %s StreamService", state());
196     StreamItem line = outputQueue.poll(timeout, unit);
197     if (line == EOF_ITEM) {
198       closeStream();
199     }
200     return (line == null) ? TIMEOUT_ITEM : line;
201   }
202 
203   /**
204    * Write a line of data to the worker process over the socket.
205    *
206    * <p>N.B. Writing data via {@link #sendMessage(Serializable)} is only valid once the underlying
207    * socket has been opened.  This should be fine assuming that socket writes are only in response
208    * to socket reads (which is currently the case), so there is no way that a write could happen
209    * prior to the socket being opened.
210   */
sendMessage(Serializable message)211   void sendMessage(Serializable message) throws IOException {
212     checkState(isRunning(), "Cannot read items from a %s StreamService", state());
213     checkState(socketWriter != null, "Attempted to write to the socket before it was opened.");
214     try {
215       socketWriter.write(message);
216       // We need to flush since this is a back and forth lockstep protocol, buffering can cause
217       // deadlock!
218       socketWriter.flush();
219     } catch (IOException e) {
220       Closeables.close(socketWriter, true);
221       notifyFailed(e);
222       throw e;
223     }
224   }
225 
226   /** Closes the socket writer. */
closeWriter()227   void closeWriter() throws IOException {
228     checkState(isRunning(), "Cannot read items from a %s StreamService", state());
229     checkState(socketWriter != null, "Attempted to close the socket before it was opened.");
230     try {
231       socketWriter.close();
232     } catch (IOException e) {
233       notifyFailed(e);
234       throw e;
235     }
236     closeStream();
237   }
238 
doStop()239   @Override protected void doStop() {
240     if (openStreams.get() > 0) {
241       // This means stop was called on us externally and we are still reading/writing, just log a
242       // warning and do nothing
243       logger.warning("Attempting to stop the stream service with streams still open");
244     }
245     final ListenableFuture<Integer> processFuture = streamExecutor.submit(new Callable<Integer>() {
246       @Override public Integer call() throws Exception {
247         return process.waitFor();
248       }
249     });
250     // Experimentally, even with well behaved processes there is some time between when all streams
251     // are closed as part of process shutdown and when the process has exited. So to not fail
252     // flakily when shutting down normally we need to do a timed wait
253     ListenableFuture possiblyIgnoredError = streamExecutor.submit(new Callable<Void>() {
254       @Override public Void call() throws Exception {
255         boolean threw = true;
256         try {
257           if (processFuture.get(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS) == 0) {
258             notifyStopped();
259           } else {
260             notifyFailed(
261                 new Exception("Process failed to stop cleanly. Exit code: " + process.waitFor()));
262           }
263           threw = false;
264         } finally {
265           processFuture.cancel(true);  // we don't need it anymore
266           if (threw) {
267             process.destroy();
268             notifyFailed(
269                 new Exception("Process failed to stop cleanly and was forcibly killed. Exit code: "
270                     + process.waitFor()));
271           }
272         }
273         return null;
274       }
275     });
276   }
277 
closeStream()278   private void closeStream() {
279     if (openStreams.decrementAndGet() == 0) {
280       stopAsync();
281     }
282   }
283 
closeReadStream()284   private void closeReadStream() {
285     if (runningReadStreams.decrementAndGet() == 0) {
286       outputQueue.add(EOF_ITEM);
287     }
288   }
289 
290   /** An item read from one of the streams. */
291   static class StreamItem {
292     enum Kind {
293       /** This indicates that it is the last item. */
294       EOF,
295       /** This indicates that reading the item timed out. */
296       TIMEOUT,
297       /** This indicates that this item has content. */
298       DATA;
299     }
300 
301     @Nullable private final LogMessage logMessage;
302     private final Kind kind;
303 
StreamItem(LogMessage line)304     private StreamItem(LogMessage line) {
305       this(Kind.DATA, checkNotNull(line));
306     }
307 
StreamItem(Kind state, @Nullable LogMessage logMessage)308     private StreamItem(Kind state, @Nullable LogMessage logMessage) {
309       this.logMessage = logMessage;
310       this.kind = state;
311     }
312 
313     /** Returns the content.  This is only valid if {@link #kind()} return {@link Kind#DATA}. */
content()314     LogMessage content() {
315       checkState(kind == Kind.DATA, "Only data lines have content: %s", this);
316       return logMessage;
317     }
318 
kind()319     Kind kind() {
320       return kind;
321     }
322 
toString()323     @Override public String toString() {
324       ToStringHelper helper = MoreObjects.toStringHelper(StreamItem.class);
325       if (kind == Kind.DATA) {
326         helper.addValue(logMessage);
327       } else {
328         helper.addValue(kind);
329       }
330       return helper.toString();
331     }
332   }
333 
334   /** Returns a callable that renames the the thread that the given callable runs in. */
threadRenaming(final String name, final Callable<T> callable)335   private static <T> Callable<T> threadRenaming(final String name, final Callable<T> callable) {
336     checkNotNull(name);
337     checkNotNull(callable);
338     return new Callable<T>() {
339       @Override public T call() throws Exception {
340         Thread currentThread = Thread.currentThread();
341         String oldName = currentThread.getName();
342         currentThread.setName(name);
343         try {
344           return callable.call();
345         } finally {
346           currentThread.setName(oldName);
347         }
348       }
349     };
350   }
351 
352   /**
353    * A background task that reads lines of text from a {@link Reader} and puts them onto a
354    * {@link BlockingQueue}.
355    */
356   private final class StreamReader implements Callable<Void> {
357     final Reader reader;
358     final String streamName;
359 
360     StreamReader(String streamName, Reader reader) {
361       this.streamName = streamName;
362       this.reader = reader;
363     }
364 
365     @Override public Void call() throws IOException, InterruptedException, ParseException {
366       LineReader lineReader = new LineReader(reader);
367       boolean threw = true;
368       try {
369         String line;
370         while ((line = lineReader.readLine()) != null) {
371           trialOutput.log(streamName, line);
372           LogMessage logMessage = logMessageParser.parse(line);
373           if (logMessage != null) {
374             outputQueue.put(new StreamItem(logMessage));
375           }
376         }
377         threw = false;
378       } catch (Exception e) {
379         notifyFailed(e);
380       } finally {
381         closeReadStream();
382         Closeables.close(reader, threw);
383       }
384       return null;
385     }
386   }
387 
388   /**
389    * A background task that reads lines of text from a {@link OpenedSocket.Reader} and puts them
390    * onto a {@link BlockingQueue}.
391    */
392   private final class SocketStreamReader implements Callable<Void> {
393     final OpenedSocket.Reader reader;
394 
395     SocketStreamReader(OpenedSocket.Reader reader) {
396       this.reader = reader;
397     }
398 
399     @Override public Void call() throws IOException, InterruptedException, ParseException {
400       boolean threw = true;
401       try {
402         Object obj;
403         while ((obj = reader.read()) != null) {
404           if (obj instanceof String) {
405             log(obj.toString());
406             continue;
407           }
408           LogMessage message = (LogMessage) obj;
409           if (message instanceof StopMeasurementLogMessage) {
410             // TODO(lukes): how useful are these messages?  They seem like leftover debugging info
411             for (Measurement measurement : ((StopMeasurementLogMessage) message).measurements()) {
412               log(String.format("I got a result! %s: %f%s%n",
413                   measurement.description(),
414                   measurement.value().magnitude() / measurement.weight(),
415                   measurement.value().unit()));
416             }
417           }
418           outputQueue.put(new StreamItem(message));
419         }
420         threw = false;
421       } catch (Exception e) {
422         notifyFailed(e);
423       } finally {
424         closeReadStream();
425         Closeables.close(reader, threw);
426       }
427       return null;
428     }
429 
430     private void log(String text) {
431       trialOutput.log("socket", text);
432     }
433   }
434 }
435