1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.benchmarks.qps; 18 19 import static io.grpc.benchmarks.Utils.HISTOGRAM_MAX_VALUE; 20 import static io.grpc.benchmarks.Utils.HISTOGRAM_PRECISION; 21 import static io.grpc.benchmarks.Utils.saveHistogram; 22 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS; 23 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS; 24 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD; 25 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR; 26 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION; 27 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW; 28 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS; 29 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM; 30 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD; 31 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS; 32 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA; 33 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS; 34 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT; 35 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.WARMUP_DURATION; 36 37 import com.google.common.base.Preconditions; 38 import com.google.protobuf.ByteString; 39 import io.grpc.Channel; 40 import io.grpc.ManagedChannel; 41 import io.grpc.Status; 42 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; 43 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc.BenchmarkServiceStub; 44 import io.grpc.benchmarks.proto.Messages.Payload; 45 import io.grpc.benchmarks.proto.Messages.SimpleRequest; 46 import io.grpc.benchmarks.proto.Messages.SimpleResponse; 47 import io.grpc.stub.StreamObserver; 48 import java.util.ArrayList; 49 import java.util.List; 50 import java.util.concurrent.CancellationException; 51 import java.util.concurrent.Future; 52 import java.util.concurrent.TimeUnit; 53 import org.HdrHistogram.Histogram; 54 import org.HdrHistogram.HistogramIterationValue; 55 56 /** 57 * QPS Client using the non-blocking API. 58 */ 59 public class AsyncClient { 60 61 private final ClientConfiguration config; 62 AsyncClient(ClientConfiguration config)63 public AsyncClient(ClientConfiguration config) { 64 this.config = config; 65 } 66 67 /** 68 * Start the QPS Client. 69 */ run()70 public void run() throws Exception { 71 if (config == null) { 72 return; 73 } 74 75 SimpleRequest req = newRequest(); 76 77 List<ManagedChannel> channels = new ArrayList<>(config.channels); 78 for (int i = 0; i < config.channels; i++) { 79 channels.add(config.newChannel()); 80 } 81 82 // Do a warmup first. It's the same as the actual benchmark, except that 83 // we ignore the statistics. 84 warmup(req, channels); 85 86 long startTime = System.nanoTime(); 87 long endTime = startTime + TimeUnit.SECONDS.toNanos(config.duration); 88 List<Histogram> histograms = doBenchmark(req, channels, endTime); 89 long elapsedTime = System.nanoTime() - startTime; 90 91 Histogram merged = merge(histograms); 92 93 printStats(merged, elapsedTime); 94 if (config.histogramFile != null) { 95 saveHistogram(merged, config.histogramFile); 96 } 97 shutdown(channels); 98 } 99 newRequest()100 private SimpleRequest newRequest() { 101 ByteString body = ByteString.copyFrom(new byte[config.clientPayload]); 102 Payload payload = Payload.newBuilder().setType(config.payloadType).setBody(body).build(); 103 104 return SimpleRequest.newBuilder() 105 .setResponseType(config.payloadType) 106 .setResponseSize(config.serverPayload) 107 .setPayload(payload) 108 .build(); 109 } 110 warmup(SimpleRequest req, List<? extends Channel> channels)111 private void warmup(SimpleRequest req, List<? extends Channel> channels) throws Exception { 112 long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(config.warmupDuration); 113 doBenchmark(req, channels, endTime); 114 // I don't know if this helps, but it doesn't hurt trying. We sometimes run warmups 115 // of several minutes at full load and it would be nice to start the actual benchmark 116 // with a clean heap. 117 System.gc(); 118 } 119 doBenchmark(SimpleRequest req, List<? extends Channel> channels, long endTime)120 private List<Histogram> doBenchmark(SimpleRequest req, 121 List<? extends Channel> channels, 122 long endTime) throws Exception { 123 // Initiate the concurrent calls 124 List<Future<Histogram>> futures = 125 new ArrayList<Future<Histogram>>(config.outstandingRpcsPerChannel); 126 for (int i = 0; i < config.channels; i++) { 127 for (int j = 0; j < config.outstandingRpcsPerChannel; j++) { 128 Channel channel = channels.get(i); 129 futures.add(doRpcs(channel, req, endTime)); 130 } 131 } 132 // Wait for completion 133 List<Histogram> histograms = new ArrayList<>(futures.size()); 134 for (Future<Histogram> future : futures) { 135 histograms.add(future.get()); 136 } 137 return histograms; 138 } 139 doRpcs(Channel channel, SimpleRequest request, long endTime)140 private Future<Histogram> doRpcs(Channel channel, SimpleRequest request, long endTime) { 141 switch (config.rpcType) { 142 case UNARY: 143 return doUnaryCalls(channel, request, endTime); 144 case STREAMING: 145 return doStreamingCalls(channel, request, endTime); 146 default: 147 throw new IllegalStateException("unsupported rpc type"); 148 } 149 } 150 doUnaryCalls(Channel channel, final SimpleRequest request, final long endTime)151 private Future<Histogram> doUnaryCalls(Channel channel, final SimpleRequest request, 152 final long endTime) { 153 final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel); 154 final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION); 155 final HistogramFuture future = new HistogramFuture(histogram); 156 157 stub.unaryCall(request, new StreamObserver<SimpleResponse>() { 158 long lastCall = System.nanoTime(); 159 160 @Override 161 public void onNext(SimpleResponse value) { 162 } 163 164 @Override 165 public void onError(Throwable t) { 166 Status status = Status.fromThrowable(t); 167 System.err.println("Encountered an error in unaryCall. Status is " + status); 168 t.printStackTrace(); 169 170 future.cancel(true); 171 } 172 173 @Override 174 public void onCompleted() { 175 long now = System.nanoTime(); 176 // Record the latencies in microseconds 177 histogram.recordValue((now - lastCall) / 1000); 178 lastCall = now; 179 180 if (endTime - now > 0) { 181 stub.unaryCall(request, this); 182 } else { 183 future.done(); 184 } 185 } 186 }); 187 188 return future; 189 } 190 doStreamingCalls(Channel channel, final SimpleRequest request, final long endTime)191 private static Future<Histogram> doStreamingCalls(Channel channel, final SimpleRequest request, 192 final long endTime) { 193 final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel); 194 final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION); 195 final HistogramFuture future = new HistogramFuture(histogram); 196 197 ThisIsAHackStreamObserver responseObserver = 198 new ThisIsAHackStreamObserver(request, histogram, future, endTime); 199 200 StreamObserver<SimpleRequest> requestObserver = stub.streamingCall(responseObserver); 201 responseObserver.requestObserver = requestObserver; 202 requestObserver.onNext(request); 203 return future; 204 } 205 206 /** 207 * This seems necessary as we need to reference the requestObserver in the responseObserver. 208 * The alternative would be to use the channel layer directly. 209 */ 210 private static class ThisIsAHackStreamObserver implements StreamObserver<SimpleResponse> { 211 212 final SimpleRequest request; 213 final Histogram histogram; 214 final HistogramFuture future; 215 final long endTime; 216 long lastCall = System.nanoTime(); 217 218 StreamObserver<SimpleRequest> requestObserver; 219 ThisIsAHackStreamObserver(SimpleRequest request, Histogram histogram, HistogramFuture future, long endTime)220 ThisIsAHackStreamObserver(SimpleRequest request, 221 Histogram histogram, 222 HistogramFuture future, 223 long endTime) { 224 this.request = request; 225 this.histogram = histogram; 226 this.future = future; 227 this.endTime = endTime; 228 } 229 230 @Override onNext(SimpleResponse value)231 public void onNext(SimpleResponse value) { 232 long now = System.nanoTime(); 233 // Record the latencies in microseconds 234 histogram.recordValue((now - lastCall) / 1000); 235 lastCall = now; 236 237 if (endTime - now > 0) { 238 requestObserver.onNext(request); 239 } else { 240 requestObserver.onCompleted(); 241 } 242 } 243 244 @Override onError(Throwable t)245 public void onError(Throwable t) { 246 Status status = Status.fromThrowable(t); 247 System.err.println("Encountered an error in streamingCall. Status is " + status); 248 t.printStackTrace(); 249 250 future.cancel(true); 251 } 252 253 @Override onCompleted()254 public void onCompleted() { 255 future.done(); 256 } 257 } 258 merge(List<Histogram> histograms)259 private static Histogram merge(List<Histogram> histograms) { 260 Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION); 261 for (Histogram histogram : histograms) { 262 for (HistogramIterationValue value : histogram.allValues()) { 263 long latency = value.getValueIteratedTo(); 264 long count = value.getCountAtValueIteratedTo(); 265 merged.recordValueWithCount(latency, count); 266 } 267 } 268 return merged; 269 } 270 printStats(Histogram histogram, long elapsedTime)271 private void printStats(Histogram histogram, long elapsedTime) { 272 long latency50 = histogram.getValueAtPercentile(50); 273 long latency90 = histogram.getValueAtPercentile(90); 274 long latency95 = histogram.getValueAtPercentile(95); 275 long latency99 = histogram.getValueAtPercentile(99); 276 long latency999 = histogram.getValueAtPercentile(99.9); 277 long latencyMax = histogram.getValueAtPercentile(100); 278 long queriesPerSecond = histogram.getTotalCount() * 1000000000L / elapsedTime; 279 280 StringBuilder values = new StringBuilder(); 281 values.append("Channels: ").append(config.channels).append('\n') 282 .append("Outstanding RPCs per Channel: ") 283 .append(config.outstandingRpcsPerChannel).append('\n') 284 .append("Server Payload Size: ").append(config.serverPayload).append('\n') 285 .append("Client Payload Size: ").append(config.clientPayload).append('\n') 286 .append("50%ile Latency (in micros): ").append(latency50).append('\n') 287 .append("90%ile Latency (in micros): ").append(latency90).append('\n') 288 .append("95%ile Latency (in micros): ").append(latency95).append('\n') 289 .append("99%ile Latency (in micros): ").append(latency99).append('\n') 290 .append("99.9%ile Latency (in micros): ").append(latency999).append('\n') 291 .append("Maximum Latency (in micros): ").append(latencyMax).append('\n') 292 .append("QPS: ").append(queriesPerSecond).append('\n'); 293 System.out.println(values); 294 } 295 shutdown(List<ManagedChannel> channels)296 private static void shutdown(List<ManagedChannel> channels) { 297 for (ManagedChannel channel : channels) { 298 channel.shutdown(); 299 } 300 } 301 302 /** 303 * checkstyle complains if there is no javadoc comment here. 304 */ main(String... args)305 public static void main(String... args) throws Exception { 306 ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder( 307 ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD, 308 TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR, 309 SAVE_HISTOGRAM, STREAMING_RPCS, FLOW_CONTROL_WINDOW); 310 ClientConfiguration config; 311 try { 312 config = configBuilder.build(args); 313 } catch (Exception e) { 314 System.out.println(e.getMessage()); 315 configBuilder.printUsage(); 316 return; 317 } 318 AsyncClient client = new AsyncClient(config); 319 client.run(); 320 } 321 322 private static class HistogramFuture implements Future<Histogram> { 323 private final Histogram histogram; 324 private boolean canceled; 325 private boolean done; 326 HistogramFuture(Histogram histogram)327 HistogramFuture(Histogram histogram) { 328 Preconditions.checkNotNull(histogram, "histogram"); 329 this.histogram = histogram; 330 } 331 332 @Override cancel(boolean mayInterruptIfRunning)333 public synchronized boolean cancel(boolean mayInterruptIfRunning) { 334 if (!done && !canceled) { 335 canceled = true; 336 notifyAll(); 337 return true; 338 } 339 return false; 340 } 341 342 @Override isCancelled()343 public synchronized boolean isCancelled() { 344 return canceled; 345 } 346 347 @Override isDone()348 public synchronized boolean isDone() { 349 return done || canceled; 350 } 351 352 @Override get()353 public synchronized Histogram get() throws InterruptedException { 354 while (!isDone() && !isCancelled()) { 355 wait(); 356 } 357 358 if (isCancelled()) { 359 throw new CancellationException(); 360 } 361 362 return histogram; 363 } 364 365 @Override get(long timeout, TimeUnit unit)366 public Histogram get(long timeout, TimeUnit unit) { 367 throw new UnsupportedOperationException(); 368 } 369 done()370 private synchronized void done() { 371 done = true; 372 notifyAll(); 373 } 374 } 375 } 376