• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.qps;
18 
19 import static io.grpc.benchmarks.Utils.HISTOGRAM_MAX_VALUE;
20 import static io.grpc.benchmarks.Utils.HISTOGRAM_PRECISION;
21 import static io.grpc.benchmarks.Utils.saveHistogram;
22 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
23 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS;
24 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
25 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR;
26 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
27 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW;
28 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS;
29 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM;
30 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD;
31 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS;
32 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
33 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
34 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
35 import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.WARMUP_DURATION;
36 
37 import com.google.common.base.Preconditions;
38 import com.google.protobuf.ByteString;
39 import io.grpc.Channel;
40 import io.grpc.ManagedChannel;
41 import io.grpc.Status;
42 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
43 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc.BenchmarkServiceStub;
44 import io.grpc.benchmarks.proto.Messages.Payload;
45 import io.grpc.benchmarks.proto.Messages.SimpleRequest;
46 import io.grpc.benchmarks.proto.Messages.SimpleResponse;
47 import io.grpc.stub.StreamObserver;
48 import java.util.ArrayList;
49 import java.util.List;
50 import java.util.concurrent.CancellationException;
51 import java.util.concurrent.Future;
52 import java.util.concurrent.TimeUnit;
53 import org.HdrHistogram.Histogram;
54 import org.HdrHistogram.HistogramIterationValue;
55 
56 /**
57  * QPS Client using the non-blocking API.
58  */
59 public class AsyncClient {
60 
61   private final ClientConfiguration config;
62 
AsyncClient(ClientConfiguration config)63   public AsyncClient(ClientConfiguration config) {
64     this.config = config;
65   }
66 
67   /**
68    * Start the QPS Client.
69    */
run()70   public void run() throws Exception {
71     if (config == null) {
72       return;
73     }
74 
75     SimpleRequest req = newRequest();
76 
77     List<ManagedChannel> channels = new ArrayList<>(config.channels);
78     for (int i = 0; i < config.channels; i++) {
79       channels.add(config.newChannel());
80     }
81 
82     // Do a warmup first. It's the same as the actual benchmark, except that
83     // we ignore the statistics.
84     warmup(req, channels);
85 
86     long startTime = System.nanoTime();
87     long endTime = startTime + TimeUnit.SECONDS.toNanos(config.duration);
88     List<Histogram> histograms = doBenchmark(req, channels, endTime);
89     long elapsedTime = System.nanoTime() - startTime;
90 
91     Histogram merged = merge(histograms);
92 
93     printStats(merged, elapsedTime);
94     if (config.histogramFile != null) {
95       saveHistogram(merged, config.histogramFile);
96     }
97     shutdown(channels);
98   }
99 
newRequest()100   private SimpleRequest newRequest() {
101     ByteString body = ByteString.copyFrom(new byte[config.clientPayload]);
102     Payload payload = Payload.newBuilder().setType(config.payloadType).setBody(body).build();
103 
104     return SimpleRequest.newBuilder()
105             .setResponseType(config.payloadType)
106             .setResponseSize(config.serverPayload)
107             .setPayload(payload)
108             .build();
109   }
110 
warmup(SimpleRequest req, List<? extends Channel> channels)111   private void warmup(SimpleRequest req, List<? extends Channel> channels) throws Exception {
112     long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(config.warmupDuration);
113     doBenchmark(req, channels, endTime);
114     // I don't know if this helps, but it doesn't hurt trying. We sometimes run warmups
115     // of several minutes at full load and it would be nice to start the actual benchmark
116     // with a clean heap.
117     System.gc();
118   }
119 
doBenchmark(SimpleRequest req, List<? extends Channel> channels, long endTime)120   private List<Histogram> doBenchmark(SimpleRequest req,
121                                       List<? extends Channel> channels,
122                                       long endTime) throws Exception {
123     // Initiate the concurrent calls
124     List<Future<Histogram>> futures =
125         new ArrayList<Future<Histogram>>(config.outstandingRpcsPerChannel);
126     for (int i = 0; i < config.channels; i++) {
127       for (int j = 0; j < config.outstandingRpcsPerChannel; j++) {
128         Channel channel = channels.get(i);
129         futures.add(doRpcs(channel, req, endTime));
130       }
131     }
132     // Wait for completion
133     List<Histogram> histograms = new ArrayList<>(futures.size());
134     for (Future<Histogram> future : futures) {
135       histograms.add(future.get());
136     }
137     return histograms;
138   }
139 
doRpcs(Channel channel, SimpleRequest request, long endTime)140   private Future<Histogram> doRpcs(Channel channel, SimpleRequest request, long endTime) {
141     switch (config.rpcType) {
142       case UNARY:
143         return doUnaryCalls(channel, request, endTime);
144       case STREAMING:
145         return doStreamingCalls(channel, request, endTime);
146       default:
147         throw new IllegalStateException("unsupported rpc type");
148     }
149   }
150 
doUnaryCalls(Channel channel, final SimpleRequest request, final long endTime)151   private Future<Histogram> doUnaryCalls(Channel channel, final SimpleRequest request,
152                                          final long endTime) {
153     final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel);
154     final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
155     final HistogramFuture future = new HistogramFuture(histogram);
156 
157     stub.unaryCall(request, new StreamObserver<SimpleResponse>() {
158       long lastCall = System.nanoTime();
159 
160       @Override
161       public void onNext(SimpleResponse value) {
162       }
163 
164       @Override
165       public void onError(Throwable t) {
166         Status status = Status.fromThrowable(t);
167         System.err.println("Encountered an error in unaryCall. Status is " + status);
168         t.printStackTrace();
169 
170         future.cancel(true);
171       }
172 
173       @Override
174       public void onCompleted() {
175         long now = System.nanoTime();
176         // Record the latencies in microseconds
177         histogram.recordValue((now - lastCall) / 1000);
178         lastCall = now;
179 
180         if (endTime - now > 0) {
181           stub.unaryCall(request, this);
182         } else {
183           future.done();
184         }
185       }
186     });
187 
188     return future;
189   }
190 
doStreamingCalls(Channel channel, final SimpleRequest request, final long endTime)191   private static Future<Histogram> doStreamingCalls(Channel channel, final SimpleRequest request,
192                                              final long endTime) {
193     final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel);
194     final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
195     final HistogramFuture future = new HistogramFuture(histogram);
196 
197     ThisIsAHackStreamObserver responseObserver =
198         new ThisIsAHackStreamObserver(request, histogram, future, endTime);
199 
200     StreamObserver<SimpleRequest> requestObserver = stub.streamingCall(responseObserver);
201     responseObserver.requestObserver = requestObserver;
202     requestObserver.onNext(request);
203     return future;
204   }
205 
206   /**
207    * This seems necessary as we need to reference the requestObserver in the responseObserver.
208    * The alternative would be to use the channel layer directly.
209    */
210   private static class ThisIsAHackStreamObserver implements StreamObserver<SimpleResponse> {
211 
212     final SimpleRequest request;
213     final Histogram histogram;
214     final HistogramFuture future;
215     final long endTime;
216     long lastCall = System.nanoTime();
217 
218     StreamObserver<SimpleRequest> requestObserver;
219 
ThisIsAHackStreamObserver(SimpleRequest request, Histogram histogram, HistogramFuture future, long endTime)220     ThisIsAHackStreamObserver(SimpleRequest request,
221                               Histogram histogram,
222                               HistogramFuture future,
223                               long endTime) {
224       this.request = request;
225       this.histogram = histogram;
226       this.future = future;
227       this.endTime = endTime;
228     }
229 
230     @Override
onNext(SimpleResponse value)231     public void onNext(SimpleResponse value) {
232       long now = System.nanoTime();
233       // Record the latencies in microseconds
234       histogram.recordValue((now - lastCall) / 1000);
235       lastCall = now;
236 
237       if (endTime - now > 0) {
238         requestObserver.onNext(request);
239       } else {
240         requestObserver.onCompleted();
241       }
242     }
243 
244     @Override
onError(Throwable t)245     public void onError(Throwable t) {
246       Status status = Status.fromThrowable(t);
247       System.err.println("Encountered an error in streamingCall. Status is " + status);
248       t.printStackTrace();
249 
250       future.cancel(true);
251     }
252 
253     @Override
onCompleted()254     public void onCompleted() {
255       future.done();
256     }
257   }
258 
merge(List<Histogram> histograms)259   private static Histogram merge(List<Histogram> histograms) {
260     Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
261     for (Histogram histogram : histograms) {
262       for (HistogramIterationValue value : histogram.allValues()) {
263         long latency = value.getValueIteratedTo();
264         long count = value.getCountAtValueIteratedTo();
265         merged.recordValueWithCount(latency, count);
266       }
267     }
268     return merged;
269   }
270 
printStats(Histogram histogram, long elapsedTime)271   private void printStats(Histogram histogram, long elapsedTime) {
272     long latency50 = histogram.getValueAtPercentile(50);
273     long latency90 = histogram.getValueAtPercentile(90);
274     long latency95 = histogram.getValueAtPercentile(95);
275     long latency99 = histogram.getValueAtPercentile(99);
276     long latency999 = histogram.getValueAtPercentile(99.9);
277     long latencyMax = histogram.getValueAtPercentile(100);
278     long queriesPerSecond = histogram.getTotalCount() * 1000000000L / elapsedTime;
279 
280     StringBuilder values = new StringBuilder();
281     values.append("Channels:                       ").append(config.channels).append('\n')
282           .append("Outstanding RPCs per Channel:   ")
283           .append(config.outstandingRpcsPerChannel).append('\n')
284           .append("Server Payload Size:            ").append(config.serverPayload).append('\n')
285           .append("Client Payload Size:            ").append(config.clientPayload).append('\n')
286           .append("50%ile Latency (in micros):     ").append(latency50).append('\n')
287           .append("90%ile Latency (in micros):     ").append(latency90).append('\n')
288           .append("95%ile Latency (in micros):     ").append(latency95).append('\n')
289           .append("99%ile Latency (in micros):     ").append(latency99).append('\n')
290           .append("99.9%ile Latency (in micros):   ").append(latency999).append('\n')
291           .append("Maximum Latency (in micros):    ").append(latencyMax).append('\n')
292           .append("QPS:                            ").append(queriesPerSecond).append('\n');
293     System.out.println(values);
294   }
295 
shutdown(List<ManagedChannel> channels)296   private static void shutdown(List<ManagedChannel> channels) {
297     for (ManagedChannel channel : channels) {
298       channel.shutdown();
299     }
300   }
301 
302   /**
303    * checkstyle complains if there is no javadoc comment here.
304    */
main(String... args)305   public static void main(String... args) throws Exception {
306     ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder(
307         ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD,
308         TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR,
309         SAVE_HISTOGRAM, STREAMING_RPCS, FLOW_CONTROL_WINDOW);
310     ClientConfiguration config;
311     try {
312       config = configBuilder.build(args);
313     } catch (Exception e) {
314       System.out.println(e.getMessage());
315       configBuilder.printUsage();
316       return;
317     }
318     AsyncClient client = new AsyncClient(config);
319     client.run();
320   }
321 
322   private static class HistogramFuture implements Future<Histogram> {
323     private final Histogram histogram;
324     private boolean canceled;
325     private boolean done;
326 
HistogramFuture(Histogram histogram)327     HistogramFuture(Histogram histogram) {
328       Preconditions.checkNotNull(histogram, "histogram");
329       this.histogram = histogram;
330     }
331 
332     @Override
cancel(boolean mayInterruptIfRunning)333     public synchronized boolean cancel(boolean mayInterruptIfRunning) {
334       if (!done && !canceled) {
335         canceled = true;
336         notifyAll();
337         return true;
338       }
339       return false;
340     }
341 
342     @Override
isCancelled()343     public synchronized boolean isCancelled() {
344       return canceled;
345     }
346 
347     @Override
isDone()348     public synchronized boolean isDone() {
349       return done || canceled;
350     }
351 
352     @Override
get()353     public synchronized Histogram get() throws InterruptedException {
354       while (!isDone() && !isCancelled()) {
355         wait();
356       }
357 
358       if (isCancelled()) {
359         throw new CancellationException();
360       }
361 
362       return histogram;
363     }
364 
365     @Override
get(long timeout, TimeUnit unit)366     public Histogram get(long timeout, TimeUnit unit) {
367       throw new UnsupportedOperationException();
368     }
369 
done()370     private synchronized void done() {
371       done = true;
372       notifyAll();
373     }
374   }
375 }
376