1 /* 2 * Copyright (C) 2012 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.caliper.runner; 18 19 import com.google.caliper.bridge.CommandLineSerializer; 20 import com.google.caliper.bridge.OpenedSocket; 21 import com.google.caliper.bridge.WorkerSpec; 22 import com.google.caliper.config.VmConfig; 23 import com.google.caliper.model.BenchmarkSpec; 24 import com.google.caliper.runner.Instrument.Instrumentation; 25 import com.google.caliper.worker.WorkerMain; 26 import com.google.common.annotations.VisibleForTesting; 27 import com.google.common.collect.ImmutableList; 28 import com.google.common.collect.ImmutableSet; 29 import com.google.common.collect.Iterables; 30 import com.google.common.collect.Lists; 31 import com.google.common.util.concurrent.ListenableFuture; 32 33 import java.io.IOException; 34 import java.io.InputStream; 35 import java.io.OutputStream; 36 import java.util.Collections; 37 import java.util.List; 38 import java.util.UUID; 39 import java.util.logging.Logger; 40 41 import javax.annotation.concurrent.GuardedBy; 42 import javax.inject.Inject; 43 44 /** 45 * A representation of an unstarted worker. 46 * 47 * <p>A worker is a sub process that runs a benchmark trial. Specifically it is a JVM running 48 * {@link com.google.caliper.worker.WorkerMain}. Because of this we can make certain assumptions 49 * about its behavior, including but not limited to: 50 * 51 * <ul> 52 * <li>The worker will connect back to us over a socket connection and send us UTF-8 json 53 * messages in a line oriented protocol. 54 * <li>TODO(lukes,gak): This is probably as good a place as any to specify the entire protocol. 55 * </ul> 56 */ 57 @TrialScoped final class WorkerProcess { 58 private static final Logger logger = Logger.getLogger(WorkerProcess.class.getName()); 59 60 @GuardedBy("this") 61 private Process worker; 62 private final ProcessBuilder workerBuilder; 63 private final ShutdownHookRegistrar shutdownHookRegistrar; 64 private final ListenableFuture<OpenedSocket> openedSocket; 65 private final UUID trialId; 66 WorkerProcess(ProcessBuilder workerBuilder, UUID trialId, ListenableFuture<OpenedSocket> openedSocket, ShutdownHookRegistrar shutdownHookRegistrar)67 @VisibleForTesting WorkerProcess(ProcessBuilder workerBuilder, 68 UUID trialId, 69 ListenableFuture<OpenedSocket> openedSocket, 70 ShutdownHookRegistrar shutdownHookRegistrar) { 71 this.trialId = trialId; 72 this.workerBuilder = workerBuilder; 73 this.openedSocket = openedSocket; 74 this.shutdownHookRegistrar = shutdownHookRegistrar; 75 } 76 WorkerProcess(@rialId UUID trialId, ListenableFuture<OpenedSocket> openedSocket, Experiment experiment, BenchmarkSpec benchmarkSpec, @LocalPort int localPort, BenchmarkClass benchmarkClass, ShutdownHookRegistrar shutdownHookRegistrar)77 @Inject WorkerProcess(@TrialId UUID trialId, 78 ListenableFuture<OpenedSocket> openedSocket, 79 Experiment experiment, 80 BenchmarkSpec benchmarkSpec, 81 @LocalPort int localPort, 82 BenchmarkClass benchmarkClass, 83 ShutdownHookRegistrar shutdownHookRegistrar) { 84 this.trialId = trialId; 85 this.workerBuilder = 86 buildProcess(trialId, experiment, benchmarkSpec, localPort, benchmarkClass); 87 this.openedSocket = openedSocket; 88 this.shutdownHookRegistrar = shutdownHookRegistrar; 89 } 90 socketFuture()91 ListenableFuture<OpenedSocket> socketFuture() { 92 return openedSocket; 93 } 94 95 /** 96 * Returns a {@link Process} representing this worker. The process will be started if it hasn't 97 * already. 98 */ startWorker()99 synchronized Process startWorker() throws IOException { 100 if (worker == null) { 101 final Process delegate = workerBuilder.start(); 102 final Thread shutdownHook = new Thread("worker-shutdown-hook-" + trialId) { 103 @Override public void run() { 104 delegate.destroy(); 105 } 106 }; 107 shutdownHookRegistrar.addShutdownHook(shutdownHook); 108 worker = new Process() { 109 @Override public OutputStream getOutputStream() { 110 return delegate.getOutputStream(); 111 } 112 113 @Override public InputStream getInputStream() { 114 return delegate.getInputStream(); 115 } 116 117 @Override public InputStream getErrorStream() { 118 return delegate.getErrorStream(); 119 } 120 121 @Override public int waitFor() throws InterruptedException { 122 int waitFor = delegate.waitFor(); 123 shutdownHookRegistrar.removeShutdownHook(shutdownHook); 124 return waitFor; 125 } 126 127 @Override public int exitValue() { 128 int exitValue = delegate.exitValue(); 129 // if it hasn't thrown, the process is done 130 shutdownHookRegistrar.removeShutdownHook(shutdownHook); 131 return exitValue; 132 } 133 134 @Override public void destroy() { 135 delegate.destroy(); 136 shutdownHookRegistrar.removeShutdownHook(shutdownHook); 137 } 138 }; 139 } 140 return worker; 141 } 142 buildProcess( UUID trialId, Experiment experiment, BenchmarkSpec benchmarkSpec, int localPort, BenchmarkClass benchmarkClass)143 @VisibleForTesting static ProcessBuilder buildProcess( 144 UUID trialId, 145 Experiment experiment, 146 BenchmarkSpec benchmarkSpec, 147 int localPort, 148 BenchmarkClass benchmarkClass) { 149 // TODO(lukes): it would be nice to split this method into a few smaller more targeted methods 150 Instrumentation instrumentation = experiment.instrumentation(); 151 Instrument instrument = instrumentation.instrument(); 152 WorkerSpec request = new WorkerSpec( 153 trialId, 154 instrumentation.workerClass(), 155 instrumentation.workerOptions(), 156 benchmarkSpec, 157 ImmutableList.copyOf(instrumentation.benchmarkMethod.getParameterTypes()), 158 localPort); 159 160 ProcessBuilder processBuilder = new ProcessBuilder().redirectErrorStream(false); 161 162 List<String> args = processBuilder.command(); 163 164 VirtualMachine vm = experiment.vm(); 165 VmConfig vmConfig = vm.config; 166 args.addAll(getJvmArgs(vm, benchmarkClass)); 167 168 Iterable<String> instrumentJvmOptions = instrument.getExtraCommandLineArgs(vmConfig); 169 logger.fine(String.format("Instrument(%s) Java args: %s", instrument.getClass().getName(), 170 instrumentJvmOptions)); 171 Iterables.addAll(args, instrumentJvmOptions); 172 173 // last to ensure that they're always applied 174 args.addAll(vmConfig.workerProcessArgs()); 175 176 args.add(WorkerMain.class.getName()); 177 args.add(CommandLineSerializer.render(request)); 178 179 logger.finest(String.format("Full JVM (%s) args: %s", vm.name, args)); 180 return processBuilder; 181 } 182 getJvmArgs( VirtualMachine vm, BenchmarkClass benchmarkClass)183 @VisibleForTesting static List<String> getJvmArgs( 184 VirtualMachine vm, 185 BenchmarkClass benchmarkClass) { 186 187 VmConfig vmConfig = vm.config; 188 String platformName = vmConfig.platformName(); 189 190 List<String> args = Lists.newArrayList(); 191 String jdkPath = vmConfig.vmExecutable().getAbsolutePath(); 192 args.add(jdkPath); 193 logger.fine(String.format("%s(%s) Path: %s", platformName, vm.name, jdkPath)); 194 195 ImmutableList<String> jvmOptions = vmConfig.options(); 196 args.addAll(jvmOptions); 197 logger.fine(String.format("%s(%s) args: %s", platformName, vm.name, jvmOptions)); 198 199 ImmutableSet<String> benchmarkJvmOptions = benchmarkClass.vmOptions(); 200 args.addAll(benchmarkJvmOptions); 201 logger.fine(String.format("Benchmark(%s) %s args: %s", benchmarkClass.name(), platformName, 202 benchmarkJvmOptions)); 203 204 ImmutableList<String> classPathArgs = vmConfig.workerClassPathArgs(); 205 args.addAll(classPathArgs); 206 logger.finer(String.format("Class path args: %s", classPathArgs)); 207 208 // TODO(iam): consider forwarding -Djava.library.path= for JNI library support. 209 return args; 210 } 211 } 212