1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.benchmarks.driver; 18 19 import com.google.common.util.concurrent.ThreadFactoryBuilder; 20 import io.grpc.Server; 21 import io.grpc.Status; 22 import io.grpc.benchmarks.proto.Control; 23 import io.grpc.benchmarks.proto.Control.ClientArgs; 24 import io.grpc.benchmarks.proto.Control.ServerArgs; 25 import io.grpc.benchmarks.proto.Control.ServerArgs.ArgtypeCase; 26 import io.grpc.benchmarks.proto.WorkerServiceGrpc; 27 import io.grpc.netty.NettyServerBuilder; 28 import io.grpc.stub.StreamObserver; 29 import io.netty.channel.nio.NioEventLoopGroup; 30 import java.util.logging.Level; 31 import java.util.logging.Logger; 32 33 /** 34 * A load worker process which a driver can use to create clients and servers. The worker 35 * implements the contract defined in 'control.proto'. 36 */ 37 public class LoadWorker { 38 39 private static final Logger log = Logger.getLogger(LoadWorker.class.getName()); 40 41 private final int serverPort; 42 private final Server driverServer; 43 LoadWorker(int driverPort, int serverPort)44 LoadWorker(int driverPort, int serverPort) throws Exception { 45 this.serverPort = serverPort; 46 NioEventLoopGroup singleThreadGroup = new NioEventLoopGroup(1, 47 new ThreadFactoryBuilder() 48 .setDaemon(true) 49 .setNameFormat("load-worker-%d") 50 .build()); 51 this.driverServer = NettyServerBuilder.forPort(driverPort) 52 .directExecutor() 53 .workerEventLoopGroup(singleThreadGroup) 54 .bossEventLoopGroup(singleThreadGroup) 55 .addService(new WorkerServiceImpl()) 56 .build(); 57 } 58 start()59 public void start() throws Exception { 60 driverServer.start(); 61 } 62 63 /** 64 * Start the load worker process. 65 */ main(String[] args)66 public static void main(String[] args) throws Exception { 67 boolean usage = false; 68 int serverPort = 0; 69 int driverPort = 0; 70 for (String arg : args) { 71 if (!arg.startsWith("--")) { 72 System.err.println("All arguments must start with '--': " + arg); 73 usage = true; 74 break; 75 } 76 String[] parts = arg.substring(2).split("=", 2); 77 String key = parts[0]; 78 if ("help".equals(key)) { 79 usage = true; 80 break; 81 } 82 if (parts.length != 2) { 83 System.err.println("All arguments must be of the form --arg=value"); 84 usage = true; 85 break; 86 } 87 String value = parts[1]; 88 if ("server_port".equals(key)) { 89 serverPort = Integer.valueOf(value); 90 } else if ("driver_port".equals(key)) { 91 driverPort = Integer.valueOf(value); 92 } else { 93 System.err.println("Unknown argument: " + key); 94 usage = true; 95 break; 96 } 97 } 98 if (usage || driverPort == 0) { 99 System.err.println( 100 "Usage: [ARGS...]" 101 + "\n" 102 + "\n --driver_port=<port>" 103 + "\n Port to expose grpc.testing.WorkerService, used by driver to initiate work." 104 + "\n --server_port=<port>" 105 + "\n Port to start load servers on. Defaults to any available port"); 106 System.exit(1); 107 } 108 LoadWorker loadWorker = new LoadWorker(driverPort, serverPort); 109 loadWorker.start(); 110 loadWorker.driverServer.awaitTermination(); 111 log.log(Level.INFO, "DriverServer has terminated."); 112 113 // Allow enough time for quitWorker to deliver OK status to the driver. 114 Thread.sleep(3000); 115 } 116 117 /** 118 * Implement the worker service contract which can launch clients and servers. 119 */ 120 private class WorkerServiceImpl extends WorkerServiceGrpc.WorkerServiceImplBase { 121 122 private LoadServer workerServer; 123 private LoadClient workerClient; 124 125 @Override runServer( final StreamObserver<Control.ServerStatus> responseObserver)126 public StreamObserver<ServerArgs> runServer( 127 final StreamObserver<Control.ServerStatus> responseObserver) { 128 return new StreamObserver<ServerArgs>() { 129 @Override 130 public void onNext(ServerArgs value) { 131 try { 132 ArgtypeCase argTypeCase = value.getArgtypeCase(); 133 if (argTypeCase == ServerArgs.ArgtypeCase.SETUP && workerServer == null) { 134 if (serverPort != 0 && value.getSetup().getPort() == 0) { 135 Control.ServerArgs.Builder builder = value.toBuilder(); 136 builder.getSetupBuilder().setPort(serverPort); 137 value = builder.build(); 138 } 139 workerServer = new LoadServer(value.getSetup()); 140 workerServer.start(); 141 responseObserver.onNext(Control.ServerStatus.newBuilder() 142 .setPort(workerServer.getPort()) 143 .setCores(workerServer.getCores()) 144 .build()); 145 } else if (argTypeCase == ArgtypeCase.MARK && workerServer != null) { 146 responseObserver.onNext(Control.ServerStatus.newBuilder() 147 .setStats(workerServer.getStats()) 148 .build()); 149 } else { 150 responseObserver.onError(Status.ALREADY_EXISTS 151 .withDescription("Server already started") 152 .asRuntimeException()); 153 } 154 } catch (Throwable t) { 155 log.log(Level.WARNING, "Error running server", t); 156 responseObserver.onError(Status.INTERNAL.withCause(t).asException()); 157 // Shutdown server if we can 158 onCompleted(); 159 } 160 } 161 162 @Override 163 public void onError(Throwable t) { 164 Status status = Status.fromThrowable(t); 165 if (status.getCode() != Status.Code.CANCELLED) { 166 log.log(Level.WARNING, "Error driving server", t); 167 } 168 onCompleted(); 169 } 170 171 @Override 172 public void onCompleted() { 173 try { 174 if (workerServer != null) { 175 workerServer.shutdownNow(); 176 } 177 } finally { 178 workerServer = null; 179 responseObserver.onCompleted(); 180 } 181 } 182 }; 183 } 184 185 @Override 186 public StreamObserver<ClientArgs> runClient( 187 final StreamObserver<Control.ClientStatus> responseObserver) { 188 return new StreamObserver<ClientArgs>() { 189 @Override 190 public void onNext(ClientArgs value) { 191 try { 192 ClientArgs.ArgtypeCase argTypeCase = value.getArgtypeCase(); 193 if (argTypeCase == ClientArgs.ArgtypeCase.SETUP && workerClient == null) { 194 workerClient = new LoadClient(value.getSetup()); 195 workerClient.start(); 196 responseObserver.onNext(Control.ClientStatus.newBuilder().build()); 197 } else if (argTypeCase == ClientArgs.ArgtypeCase.MARK && workerClient != null) { 198 responseObserver.onNext(Control.ClientStatus.newBuilder() 199 .setStats(workerClient.getStats()) 200 .build()); 201 } else { 202 responseObserver.onError(Status.ALREADY_EXISTS 203 .withDescription("Client already started") 204 .asRuntimeException()); 205 } 206 } catch (Throwable t) { 207 log.log(Level.WARNING, "Error running client", t); 208 responseObserver.onError(Status.INTERNAL.withCause(t).asException()); 209 // Shutdown the client if we can 210 onCompleted(); 211 } 212 } 213 214 @Override 215 public void onError(Throwable t) { 216 Status status = Status.fromThrowable(t); 217 if (status.getCode() != Status.Code.CANCELLED) { 218 log.log(Level.WARNING, "Error driving client", t); 219 } 220 onCompleted(); 221 } 222 223 @Override 224 public void onCompleted() { 225 try { 226 if (workerClient != null) { 227 workerClient.shutdownNow(); 228 } 229 } finally { 230 workerClient = null; 231 responseObserver.onCompleted(); 232 } 233 } 234 }; 235 } 236 237 @Override 238 public void coreCount(Control.CoreRequest request, 239 StreamObserver<Control.CoreResponse> responseObserver) { 240 responseObserver.onNext( 241 Control.CoreResponse.newBuilder() 242 .setCores(Runtime.getRuntime().availableProcessors()) 243 .build()); 244 responseObserver.onCompleted(); 245 } 246 247 @Override 248 public void quitWorker(Control.Void request, 249 StreamObserver<Control.Void> responseObserver) { 250 try { 251 log.log(Level.INFO, "Received quitWorker request."); 252 responseObserver.onNext(Control.Void.getDefaultInstance()); 253 responseObserver.onCompleted(); 254 driverServer.shutdownNow(); 255 } catch (Throwable t) { 256 log.log(Level.WARNING, "Error during shutdown", t); 257 } 258 } 259 } 260 } 261