• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.hedging;
18 
19 import static java.nio.charset.StandardCharsets.UTF_8;
20 
21 import com.google.gson.Gson;
22 import com.google.gson.stream.JsonReader;
23 import io.grpc.Grpc;
24 import io.grpc.InsecureChannelCredentials;
25 import io.grpc.ManagedChannel;
26 import io.grpc.ManagedChannelBuilder;
27 import io.grpc.StatusRuntimeException;
28 import io.grpc.examples.helloworld.GreeterGrpc;
29 import io.grpc.examples.helloworld.HelloReply;
30 import io.grpc.examples.helloworld.HelloRequest;
31 import java.io.InputStreamReader;
32 import java.util.Map;
33 import java.util.concurrent.ForkJoinPool;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.logging.Level;
38 import java.util.logging.Logger;
39 
40 /**
41  * A client that requests a greeting from the {@link HedgingHelloWorldServer} with a hedging policy.
42  */
43 public class HedgingHelloWorldClient {
44   static final String ENV_DISABLE_HEDGING = "DISABLE_HEDGING_IN_HEDGING_EXAMPLE";
45 
46   private static final Logger logger = Logger.getLogger(HedgingHelloWorldClient.class.getName());
47 
48   private final boolean hedging;
49   private final ManagedChannel channel;
50   private final GreeterGrpc.GreeterBlockingStub blockingStub;
51   private final PriorityBlockingQueue<Long> latencies = new PriorityBlockingQueue<>();
52   private final AtomicInteger failedRpcs = new AtomicInteger();
53 
54   /** Construct client connecting to HelloWorld server at {@code host:port}. */
HedgingHelloWorldClient(String host, int port, boolean hedging)55   public HedgingHelloWorldClient(String host, int port, boolean hedging) {
56     ManagedChannelBuilder<?> channelBuilder
57         = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create());
58     if (hedging) {
59       Map<String, ?> hedgingServiceConfig =
60           new Gson()
61               .fromJson(
62                   new JsonReader(
63                       new InputStreamReader(
64                           HedgingHelloWorldClient.class.getResourceAsStream(
65                               "hedging_service_config.json"),
66                           UTF_8)),
67                   Map.class);
68       channelBuilder.defaultServiceConfig(hedgingServiceConfig).enableRetry();
69     }
70     channel = channelBuilder.build();
71     blockingStub = GreeterGrpc.newBlockingStub(channel);
72     this.hedging = hedging;
73   }
74 
shutdown()75   public void shutdown() throws InterruptedException {
76     channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
77   }
78 
79   /** Say hello to server. */
greet(String name)80   public void greet(String name) {
81     HelloRequest request = HelloRequest.newBuilder().setName(name).build();
82     HelloReply response = null;
83     StatusRuntimeException statusRuntimeException = null;
84     long startTime = System.nanoTime();
85     try {
86       response = blockingStub.sayHello(request);
87     } catch (StatusRuntimeException e) {
88       failedRpcs.incrementAndGet();
89       statusRuntimeException = e;
90     }
91     long latencyMills = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
92     latencies.offer(latencyMills);
93 
94     if (statusRuntimeException == null) {
95       logger.log(
96           Level.INFO,
97           "Greeting: {0}. Latency: {1}ms",
98           new Object[] {response.getMessage(), latencyMills});
99     } else {
100       logger.log(
101           Level.INFO,
102           "RPC failed: {0}. Latency: {1}ms",
103           new Object[] {statusRuntimeException.getStatus(), latencyMills});
104     }
105   }
106 
printSummary()107   void printSummary() {
108     int rpcCount = latencies.size();
109     long latency50 = 0L;
110     long latency90 = 0L;
111     long latency95 = 0L;
112     long latency99 = 0L;
113     long latency999 = 0L;
114     long latencyMax = 0L;
115     for (int i = 0; i < rpcCount; i++) {
116       long latency = latencies.poll();
117       if (i == rpcCount * 50 / 100 - 1) {
118         latency50 = latency;
119       }
120       if (i == rpcCount * 90 / 100 - 1) {
121         latency90 = latency;
122       }
123       if (i == rpcCount * 95 / 100 - 1) {
124         latency95 = latency;
125       }
126       if (i == rpcCount * 99 / 100 - 1) {
127         latency99 = latency;
128       }
129       if (i == rpcCount * 999 / 1000 - 1) {
130         latency999 = latency;
131       }
132       if (i == rpcCount - 1) {
133         latencyMax = latency;
134       }
135     }
136 
137     logger.log(
138         Level.INFO,
139         "\n\nTotal RPCs sent: {0}. Total RPCs failed: {1}\n"
140             + (hedging ? "[Hedging enabled]\n" : "[Hedging disabled]\n")
141             + "========================\n"
142             + "50% latency: {2}ms\n"
143             + "90% latency: {3}ms\n"
144             + "95% latency: {4}ms\n"
145             + "99% latency: {5}ms\n"
146             + "99.9% latency: {6}ms\n"
147             + "Max latency: {7}ms\n"
148             + "========================\n",
149         new Object[]{
150             rpcCount, failedRpcs.get(),
151             latency50, latency90, latency95, latency99, latency999, latencyMax});
152 
153     if (hedging) {
154       logger.log(
155           Level.INFO,
156           "To disable hedging, run the client with environment variable {0}=true.",
157           ENV_DISABLE_HEDGING);
158     } else {
159       logger.log(
160           Level.INFO,
161           "To enable hedging, unset environment variable {0} and then run the client.",
162           ENV_DISABLE_HEDGING);
163     }
164   }
165 
main(String[] args)166   public static void main(String[] args) throws Exception {
167     boolean hedging = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_HEDGING));
168     final HedgingHelloWorldClient client = new HedgingHelloWorldClient("localhost", 50051, hedging);
169     ForkJoinPool executor = new ForkJoinPool();
170 
171     for (int i = 0; i < 2000; i++) {
172       final String userId = "user" + i;
173       executor.execute(
174           new Runnable() {
175             @Override
176             public void run() {
177               client.greet(userId);
178             }
179           });
180     }
181 
182     executor.awaitQuiescence(100, TimeUnit.SECONDS);
183     executor.shutdown();
184     client.printSummary();
185     client.shutdown();
186   }
187 }
188