• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.driver;
18 
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import io.grpc.Server;
21 import io.grpc.Status;
22 import io.grpc.benchmarks.proto.Control;
23 import io.grpc.benchmarks.proto.Control.ClientArgs;
24 import io.grpc.benchmarks.proto.Control.ServerArgs;
25 import io.grpc.benchmarks.proto.Control.ServerArgs.ArgtypeCase;
26 import io.grpc.benchmarks.proto.WorkerServiceGrpc;
27 import io.grpc.netty.NettyServerBuilder;
28 import io.grpc.stub.StreamObserver;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import java.util.logging.Level;
31 import java.util.logging.Logger;
32 
33 /**
34  * A load worker process which a driver can use to create clients and servers. The worker
35  * implements the contract defined in 'control.proto'.
36  */
37 public class LoadWorker {
38 
39   private static final Logger log = Logger.getLogger(LoadWorker.class.getName());
40 
41   private final int serverPort;
42   private final Server driverServer;
43 
LoadWorker(int driverPort, int serverPort)44   LoadWorker(int driverPort, int serverPort) throws Exception {
45     this.serverPort = serverPort;
46     NioEventLoopGroup singleThreadGroup = new NioEventLoopGroup(1,
47         new ThreadFactoryBuilder()
48             .setDaemon(true)
49             .setNameFormat("load-worker-%d")
50             .build());
51     this.driverServer = NettyServerBuilder.forPort(driverPort)
52         .directExecutor()
53         .workerEventLoopGroup(singleThreadGroup)
54         .bossEventLoopGroup(singleThreadGroup)
55         .addService(new WorkerServiceImpl())
56         .build();
57   }
58 
start()59   public void start() throws Exception {
60     driverServer.start();
61   }
62 
63   /**
64    * Start the load worker process.
65    */
main(String[] args)66   public static void main(String[] args) throws Exception {
67     boolean usage = false;
68     int serverPort = 0;
69     int driverPort = 0;
70     for (String arg : args) {
71       if (!arg.startsWith("--")) {
72         System.err.println("All arguments must start with '--': " + arg);
73         usage = true;
74         break;
75       }
76       String[] parts = arg.substring(2).split("=", 2);
77       String key = parts[0];
78       if ("help".equals(key)) {
79         usage = true;
80         break;
81       }
82       if (parts.length != 2) {
83         System.err.println("All arguments must be of the form --arg=value");
84         usage = true;
85         break;
86       }
87       String value = parts[1];
88       if ("server_port".equals(key)) {
89         serverPort = Integer.valueOf(value);
90       } else if ("driver_port".equals(key)) {
91         driverPort = Integer.valueOf(value);
92       } else {
93         System.err.println("Unknown argument: " + key);
94         usage = true;
95         break;
96       }
97     }
98     if (usage || driverPort == 0) {
99       System.err.println(
100           "Usage: [ARGS...]"
101               + "\n"
102               + "\n  --driver_port=<port>"
103               + "\n    Port to expose grpc.testing.WorkerService, used by driver to initiate work."
104               + "\n  --server_port=<port>"
105               + "\n    Port to start load servers on. Defaults to any available port");
106       System.exit(1);
107     }
108     LoadWorker loadWorker = new LoadWorker(driverPort, serverPort);
109     loadWorker.start();
110     loadWorker.driverServer.awaitTermination();
111     log.log(Level.INFO, "DriverServer has terminated.");
112 
113     // Allow enough time for quitWorker to deliver OK status to the driver.
114     Thread.sleep(3000);
115   }
116 
117   /**
118    * Implement the worker service contract which can launch clients and servers.
119    */
120   private class WorkerServiceImpl extends WorkerServiceGrpc.WorkerServiceImplBase {
121 
122     private LoadServer workerServer;
123     private LoadClient workerClient;
124 
125     @Override
runServer( final StreamObserver<Control.ServerStatus> responseObserver)126     public StreamObserver<ServerArgs> runServer(
127         final StreamObserver<Control.ServerStatus> responseObserver) {
128       return new StreamObserver<ServerArgs>() {
129         @Override
130         public void onNext(ServerArgs value) {
131           try {
132             ArgtypeCase argTypeCase = value.getArgtypeCase();
133             if (argTypeCase == ServerArgs.ArgtypeCase.SETUP && workerServer == null) {
134               if (serverPort != 0 && value.getSetup().getPort() == 0) {
135                 Control.ServerArgs.Builder builder = value.toBuilder();
136                 builder.getSetupBuilder().setPort(serverPort);
137                 value = builder.build();
138               }
139               workerServer = new LoadServer(value.getSetup());
140               workerServer.start();
141               responseObserver.onNext(Control.ServerStatus.newBuilder()
142                   .setPort(workerServer.getPort())
143                   .setCores(workerServer.getCores())
144                   .build());
145             } else if (argTypeCase == ArgtypeCase.MARK && workerServer != null) {
146               responseObserver.onNext(Control.ServerStatus.newBuilder()
147                   .setStats(workerServer.getStats())
148                   .build());
149             } else {
150               responseObserver.onError(Status.ALREADY_EXISTS
151                   .withDescription("Server already started")
152                   .asRuntimeException());
153             }
154           } catch (Throwable t) {
155             log.log(Level.WARNING, "Error running server", t);
156             responseObserver.onError(Status.INTERNAL.withCause(t).asException());
157             // Shutdown server if we can
158             onCompleted();
159           }
160         }
161 
162         @Override
163         public void onError(Throwable t) {
164           Status status = Status.fromThrowable(t);
165           if (status.getCode() != Status.Code.CANCELLED) {
166             log.log(Level.WARNING, "Error driving server", t);
167           }
168           onCompleted();
169         }
170 
171         @Override
172         public void onCompleted() {
173           try {
174             if (workerServer != null) {
175               workerServer.shutdownNow();
176             }
177           } finally {
178             workerServer = null;
179             responseObserver.onCompleted();
180           }
181         }
182       };
183     }
184 
185     @Override
186     public StreamObserver<ClientArgs> runClient(
187         final StreamObserver<Control.ClientStatus> responseObserver) {
188       return new StreamObserver<ClientArgs>() {
189         @Override
190         public void onNext(ClientArgs value) {
191           try {
192             ClientArgs.ArgtypeCase argTypeCase = value.getArgtypeCase();
193             if (argTypeCase == ClientArgs.ArgtypeCase.SETUP && workerClient == null) {
194               workerClient = new LoadClient(value.getSetup());
195               workerClient.start();
196               responseObserver.onNext(Control.ClientStatus.newBuilder().build());
197             } else if (argTypeCase == ClientArgs.ArgtypeCase.MARK && workerClient != null) {
198               responseObserver.onNext(Control.ClientStatus.newBuilder()
199                   .setStats(workerClient.getStats())
200                   .build());
201             } else {
202               responseObserver.onError(Status.ALREADY_EXISTS
203                   .withDescription("Client already started")
204                   .asRuntimeException());
205             }
206           } catch (Throwable t) {
207             log.log(Level.WARNING, "Error running client", t);
208             responseObserver.onError(Status.INTERNAL.withCause(t).asException());
209             // Shutdown the client if we can
210             onCompleted();
211           }
212         }
213 
214         @Override
215         public void onError(Throwable t) {
216           Status status = Status.fromThrowable(t);
217           if (status.getCode() != Status.Code.CANCELLED) {
218             log.log(Level.WARNING, "Error driving client", t);
219           }
220           onCompleted();
221         }
222 
223         @Override
224         public void onCompleted() {
225           try {
226             if (workerClient != null) {
227               workerClient.shutdownNow();
228             }
229           } finally {
230             workerClient = null;
231             responseObserver.onCompleted();
232           }
233         }
234       };
235     }
236 
237     @Override
238     public void coreCount(Control.CoreRequest request,
239                           StreamObserver<Control.CoreResponse> responseObserver) {
240       responseObserver.onNext(
241           Control.CoreResponse.newBuilder()
242               .setCores(Runtime.getRuntime().availableProcessors())
243               .build());
244       responseObserver.onCompleted();
245     }
246 
247     @Override
248     public void quitWorker(Control.Void request,
249                            StreamObserver<Control.Void> responseObserver) {
250       try {
251         log.log(Level.INFO, "Received quitWorker request.");
252         responseObserver.onNext(Control.Void.getDefaultInstance());
253         responseObserver.onCompleted();
254         driverServer.shutdownNow();
255       } catch (Throwable t) {
256         log.log(Level.WARNING, "Error during shutdown", t);
257       }
258     }
259   }
260 }
261