1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.examples.hedging; 18 19 import static java.nio.charset.StandardCharsets.UTF_8; 20 21 import com.google.gson.Gson; 22 import com.google.gson.stream.JsonReader; 23 import io.grpc.Grpc; 24 import io.grpc.InsecureChannelCredentials; 25 import io.grpc.ManagedChannel; 26 import io.grpc.ManagedChannelBuilder; 27 import io.grpc.StatusRuntimeException; 28 import io.grpc.examples.helloworld.GreeterGrpc; 29 import io.grpc.examples.helloworld.HelloReply; 30 import io.grpc.examples.helloworld.HelloRequest; 31 import java.io.InputStreamReader; 32 import java.util.Map; 33 import java.util.concurrent.ForkJoinPool; 34 import java.util.concurrent.PriorityBlockingQueue; 35 import java.util.concurrent.TimeUnit; 36 import java.util.concurrent.atomic.AtomicInteger; 37 import java.util.logging.Level; 38 import java.util.logging.Logger; 39 40 /** 41 * A client that requests a greeting from the {@link HedgingHelloWorldServer} with a hedging policy. 42 */ 43 public class HedgingHelloWorldClient { 44 static final String ENV_DISABLE_HEDGING = "DISABLE_HEDGING_IN_HEDGING_EXAMPLE"; 45 46 private static final Logger logger = Logger.getLogger(HedgingHelloWorldClient.class.getName()); 47 48 private final boolean hedging; 49 private final ManagedChannel channel; 50 private final GreeterGrpc.GreeterBlockingStub blockingStub; 51 private final PriorityBlockingQueue<Long> latencies = new PriorityBlockingQueue<>(); 52 private final AtomicInteger failedRpcs = new AtomicInteger(); 53 54 /** Construct client connecting to HelloWorld server at {@code host:port}. */ HedgingHelloWorldClient(String host, int port, boolean hedging)55 public HedgingHelloWorldClient(String host, int port, boolean hedging) { 56 ManagedChannelBuilder<?> channelBuilder 57 = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create()); 58 if (hedging) { 59 Map<String, ?> hedgingServiceConfig = 60 new Gson() 61 .fromJson( 62 new JsonReader( 63 new InputStreamReader( 64 HedgingHelloWorldClient.class.getResourceAsStream( 65 "hedging_service_config.json"), 66 UTF_8)), 67 Map.class); 68 channelBuilder.defaultServiceConfig(hedgingServiceConfig).enableRetry(); 69 } 70 channel = channelBuilder.build(); 71 blockingStub = GreeterGrpc.newBlockingStub(channel); 72 this.hedging = hedging; 73 } 74 shutdown()75 public void shutdown() throws InterruptedException { 76 channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); 77 } 78 79 /** Say hello to server. */ greet(String name)80 public void greet(String name) { 81 HelloRequest request = HelloRequest.newBuilder().setName(name).build(); 82 HelloReply response = null; 83 StatusRuntimeException statusRuntimeException = null; 84 long startTime = System.nanoTime(); 85 try { 86 response = blockingStub.sayHello(request); 87 } catch (StatusRuntimeException e) { 88 failedRpcs.incrementAndGet(); 89 statusRuntimeException = e; 90 } 91 long latencyMills = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 92 latencies.offer(latencyMills); 93 94 if (statusRuntimeException == null) { 95 logger.log( 96 Level.INFO, 97 "Greeting: {0}. Latency: {1}ms", 98 new Object[] {response.getMessage(), latencyMills}); 99 } else { 100 logger.log( 101 Level.INFO, 102 "RPC failed: {0}. Latency: {1}ms", 103 new Object[] {statusRuntimeException.getStatus(), latencyMills}); 104 } 105 } 106 printSummary()107 void printSummary() { 108 int rpcCount = latencies.size(); 109 long latency50 = 0L; 110 long latency90 = 0L; 111 long latency95 = 0L; 112 long latency99 = 0L; 113 long latency999 = 0L; 114 long latencyMax = 0L; 115 for (int i = 0; i < rpcCount; i++) { 116 long latency = latencies.poll(); 117 if (i == rpcCount * 50 / 100 - 1) { 118 latency50 = latency; 119 } 120 if (i == rpcCount * 90 / 100 - 1) { 121 latency90 = latency; 122 } 123 if (i == rpcCount * 95 / 100 - 1) { 124 latency95 = latency; 125 } 126 if (i == rpcCount * 99 / 100 - 1) { 127 latency99 = latency; 128 } 129 if (i == rpcCount * 999 / 1000 - 1) { 130 latency999 = latency; 131 } 132 if (i == rpcCount - 1) { 133 latencyMax = latency; 134 } 135 } 136 137 logger.log( 138 Level.INFO, 139 "\n\nTotal RPCs sent: {0}. Total RPCs failed: {1}\n" 140 + (hedging ? "[Hedging enabled]\n" : "[Hedging disabled]\n") 141 + "========================\n" 142 + "50% latency: {2}ms\n" 143 + "90% latency: {3}ms\n" 144 + "95% latency: {4}ms\n" 145 + "99% latency: {5}ms\n" 146 + "99.9% latency: {6}ms\n" 147 + "Max latency: {7}ms\n" 148 + "========================\n", 149 new Object[]{ 150 rpcCount, failedRpcs.get(), 151 latency50, latency90, latency95, latency99, latency999, latencyMax}); 152 153 if (hedging) { 154 logger.log( 155 Level.INFO, 156 "To disable hedging, run the client with environment variable {0}=true.", 157 ENV_DISABLE_HEDGING); 158 } else { 159 logger.log( 160 Level.INFO, 161 "To enable hedging, unset environment variable {0} and then run the client.", 162 ENV_DISABLE_HEDGING); 163 } 164 } 165 main(String[] args)166 public static void main(String[] args) throws Exception { 167 boolean hedging = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_HEDGING)); 168 final HedgingHelloWorldClient client = new HedgingHelloWorldClient("localhost", 50051, hedging); 169 ForkJoinPool executor = new ForkJoinPool(); 170 171 for (int i = 0; i < 2000; i++) { 172 final String userId = "user" + i; 173 executor.execute( 174 new Runnable() { 175 @Override 176 public void run() { 177 client.greet(userId); 178 } 179 }); 180 } 181 182 executor.awaitQuiescence(100, TimeUnit.SECONDS); 183 executor.shutdown(); 184 client.printSummary(); 185 client.shutdown(); 186 } 187 } 188