1 /* 2 * Copyright (C) 2013 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.caliper.runner; 16 17 import static com.google.common.base.Preconditions.checkArgument; 18 import static com.google.common.base.Preconditions.checkState; 19 import static org.junit.Assert.assertEquals; 20 import static org.junit.Assert.assertNotSame; 21 import static org.junit.Assert.assertTrue; 22 import static org.junit.Assert.fail; 23 24 import com.google.caliper.bridge.LogMessage; 25 import com.google.caliper.bridge.OpenedSocket; 26 import com.google.caliper.runner.FakeWorkers.DummyLogMessage; 27 import com.google.caliper.runner.StreamService.StreamItem; 28 import com.google.caliper.runner.StreamService.StreamItem.Kind; 29 import com.google.caliper.util.Parser; 30 import com.google.common.collect.Sets; 31 import com.google.common.util.concurrent.ListenableFuture; 32 import com.google.common.util.concurrent.ListenableFutureTask; 33 import com.google.common.util.concurrent.MoreExecutors; 34 import com.google.common.util.concurrent.Service.Listener; 35 import com.google.common.util.concurrent.Service.State; 36 37 import org.junit.After; 38 import org.junit.Before; 39 import org.junit.Test; 40 import org.junit.runner.RunWith; 41 import org.junit.runners.JUnit4; 42 43 import java.io.File; 44 import java.io.FileNotFoundException; 45 import java.io.IOException; 46 import java.io.PrintWriter; 47 import java.io.StringWriter; 48 import java.net.ServerSocket; 49 import java.net.SocketException; 50 import java.text.ParseException; 51 import java.util.Set; 52 import java.util.UUID; 53 import java.util.concurrent.Callable; 54 import java.util.concurrent.CountDownLatch; 55 import java.util.concurrent.TimeUnit; 56 57 /** 58 * Tests for {@link StreamService}. 59 */ 60 @RunWith(JUnit4.class) 61 62 public class StreamServiceTest { 63 64 private ServerSocket serverSocket; 65 private final StringWriter writer = new StringWriter(); 66 private final PrintWriter stdout = new PrintWriter(writer, true); 67 private final Parser<LogMessage> parser = new Parser<LogMessage>() { 68 @Override public LogMessage parse(final CharSequence text) throws ParseException { 69 return new DummyLogMessage(text.toString()); 70 } 71 }; 72 73 private StreamService service; 74 private final CountDownLatch terminalLatch = new CountDownLatch(1); 75 private static final int TRIAL_NUMBER = 3; 76 setUp()77 @Before public void setUp() throws IOException { 78 serverSocket = new ServerSocket(0); 79 } 80 closeSocket()81 @After public void closeSocket() throws IOException { 82 serverSocket.close(); 83 } 84 stopService()85 @After public void stopService() { 86 if (service != null && service.state() != State.FAILED && service.state() != State.TERMINATED) { 87 service.stopAsync().awaitTerminated(); 88 } 89 } 90 testReadOutput()91 @Test public void testReadOutput() throws Exception { 92 makeService(FakeWorkers.PrintClient.class, "foo", "bar"); 93 service.startAsync().awaitRunning(); 94 StreamItem item1 = readItem(); 95 assertEquals(Kind.DATA, item1.kind()); 96 Set<String> lines = Sets.newHashSet(); 97 lines.add(item1.content().toString()); 98 StreamItem item2 = readItem(); 99 assertEquals(Kind.DATA, item2.kind()); 100 lines.add(item2.content().toString()); 101 assertEquals(Sets.newHashSet("foo", "bar"), lines); 102 assertEquals(State.RUNNING, service.state()); 103 StreamItem item3 = readItem(); 104 assertEquals(Kind.EOF, item3.kind()); 105 awaitStopped(100, TimeUnit.MILLISECONDS); 106 assertTerminated(); 107 } 108 failingProcess()109 @Test public void failingProcess() throws Exception { 110 makeService(FakeWorkers.Exit.class, "1"); 111 service.startAsync().awaitRunning(); 112 assertEquals(Kind.EOF, readItem().kind()); 113 awaitStopped(100, TimeUnit.MILLISECONDS); 114 assertEquals(State.FAILED, service.state()); 115 } 116 processDoesntExit()117 @Test public void processDoesntExit() throws Exception { 118 // close all fds and then sleep 119 makeService(FakeWorkers.CloseAndSleep.class); 120 service.startAsync().awaitRunning(); 121 assertEquals(Kind.EOF, readItem().kind()); 122 awaitStopped(200, TimeUnit.MILLISECONDS); // we 123 assertEquals(State.FAILED, service.state()); 124 } 125 testSocketInputOutput()126 @Test public void testSocketInputOutput() throws Exception { 127 int localport = serverSocket.getLocalPort(); 128 // read from the socket and echo it back 129 makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport)); 130 131 service.startAsync().awaitRunning(); 132 assertEquals(new DummyLogMessage("start"), readItem().content()); 133 service.sendMessage(new DummyLogMessage("hello socket world")); 134 assertEquals(new DummyLogMessage("hello socket world"), readItem().content()); 135 service.closeWriter(); 136 assertEquals(State.RUNNING, service.state()); 137 StreamItem nextItem = readItem(); 138 assertEquals("Expected EOF " + nextItem, Kind.EOF, nextItem.kind()); 139 awaitStopped(100, TimeUnit.MILLISECONDS); 140 assertTerminated(); 141 } 142 testSocketClosesBeforeProcess()143 @Test public void testSocketClosesBeforeProcess() throws Exception { 144 int localport = serverSocket.getLocalPort(); 145 // read from the socket and echo it back 146 makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport), "foo"); 147 service.startAsync().awaitRunning(); 148 assertEquals(new DummyLogMessage("start"), readItem().content()); 149 service.sendMessage(new DummyLogMessage("hello socket world")); 150 assertEquals(new DummyLogMessage("hello socket world"), readItem().content()); 151 service.closeWriter(); 152 153 assertEquals("foo", readItem().content().toString()); 154 155 assertEquals(State.RUNNING, service.state()); 156 assertEquals(Kind.EOF, readItem().kind()); 157 awaitStopped(100, TimeUnit.MILLISECONDS); 158 assertTerminated(); 159 } 160 failsToAcceptConnection()161 @Test public void failsToAcceptConnection() throws Exception { 162 serverSocket.close(); // This will force serverSocket.accept to throw a SocketException 163 makeService(FakeWorkers.Sleeper.class, Long.toString(TimeUnit.MINUTES.toMillis(10))); 164 try { 165 service.startAsync().awaitRunning(); 166 fail(); 167 } catch (IllegalStateException expected) {} 168 assertEquals(SocketException.class, service.failureCause().getClass()); 169 } 170 171 /** Reads an item, asserting that there was no timeout. */ readItem()172 private StreamItem readItem() throws InterruptedException { 173 StreamItem item = service.readItem(10, TimeUnit.SECONDS); 174 assertNotSame("Timed out while reading item from worker", Kind.TIMEOUT, item.kind()); 175 return item; 176 } 177 178 /** 179 * Wait for the service to reach a terminal state without calling stop. 180 */ awaitStopped(long time, TimeUnit unit)181 private void awaitStopped(long time, TimeUnit unit) throws InterruptedException { 182 assertTrue(terminalLatch.await(time, unit)); 183 } 184 assertTerminated()185 private void assertTerminated() { 186 State state = service.state(); 187 if (state != State.TERMINATED) { 188 if (state == State.FAILED) { 189 throw new AssertionError(service.failureCause()); 190 } 191 fail("Expected service to be terminated but was: " + state); 192 } 193 } 194 195 @SuppressWarnings("resource") makeService(Class<?> main, String ...args)196 private void makeService(Class<?> main, String ...args) { 197 checkState(service == null, "You can only make one StreamService per test"); 198 UUID trialId = UUID.randomUUID(); 199 TrialOutputLogger trialOutput = new TrialOutputLogger(new TrialOutputFactory() { 200 @Override public FileAndWriter getTrialOutputFile(int trialNumber) 201 throws FileNotFoundException { 202 checkArgument(trialNumber == TRIAL_NUMBER); 203 return new FileAndWriter(new File("/tmp/not-a-file"), stdout); 204 } 205 206 @Override public void persistFile(File f) { 207 throw new UnsupportedOperationException(); 208 } 209 210 }, TRIAL_NUMBER, trialId, null /* experiment */); 211 try { 212 // normally the TrialRunLoop opens/closes the logger 213 trialOutput.open(); 214 } catch (IOException e) { 215 throw new RuntimeException(e); 216 } 217 service = new StreamService( 218 new WorkerProcess(FakeWorkers.createProcessBuilder(main, args), 219 trialId, 220 getSocketFuture(), 221 new RuntimeShutdownHookRegistrar()), 222 parser, 223 trialOutput); 224 service.addListener(new Listener() { 225 @Override public void starting() {} 226 @Override public void running() {} 227 @Override public void stopping(State from) {} 228 @Override public void terminated(State from) { 229 terminalLatch.countDown(); 230 } 231 @Override public void failed(State from, Throwable failure) { 232 terminalLatch.countDown(); 233 } 234 }, MoreExecutors.directExecutor()); 235 } 236 getSocketFuture()237 private ListenableFuture<OpenedSocket> getSocketFuture() { 238 ListenableFutureTask<OpenedSocket> openSocketTask = ListenableFutureTask.create( 239 new Callable<OpenedSocket>() { 240 @Override 241 public OpenedSocket call() throws Exception { 242 return OpenedSocket.fromSocket(serverSocket.accept()); 243 } 244 }); 245 // N.B. this thread will block on serverSocket.accept until a connection is accepted or the 246 // socket is closed, so no matter what this thread will die with the test. 247 Thread opener = new Thread(openSocketTask, "SocketOpener"); 248 opener.setDaemon(true); 249 opener.start(); 250 return openSocketTask; 251 } 252 } 253