1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.benchmarks; 18 19 import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; 20 21 import com.google.common.util.concurrent.UncaughtExceptionHandlers; 22 import com.google.protobuf.ByteString; 23 import io.grpc.ManagedChannel; 24 import io.grpc.ManagedChannelBuilder; 25 import io.grpc.Status; 26 import io.grpc.benchmarks.proto.Messages; 27 import io.grpc.benchmarks.proto.Messages.Payload; 28 import io.grpc.benchmarks.proto.Messages.SimpleRequest; 29 import io.grpc.benchmarks.proto.Messages.SimpleResponse; 30 import io.grpc.internal.testing.TestUtils; 31 import io.grpc.netty.GrpcSslContexts; 32 import io.grpc.netty.NettyChannelBuilder; 33 import io.grpc.okhttp.OkHttpChannelBuilder; 34 import io.grpc.okhttp.internal.Platform; 35 import io.netty.channel.epoll.EpollDomainSocketChannel; 36 import io.netty.channel.epoll.EpollEventLoopGroup; 37 import io.netty.channel.epoll.EpollSocketChannel; 38 import io.netty.channel.nio.NioEventLoopGroup; 39 import io.netty.channel.socket.nio.NioSocketChannel; 40 import io.netty.channel.unix.DomainSocketAddress; 41 import io.netty.util.concurrent.DefaultThreadFactory; 42 import java.io.File; 43 import java.io.FileOutputStream; 44 import java.io.IOException; 45 import java.io.PrintStream; 46 import java.net.InetSocketAddress; 47 import java.net.ServerSocket; 48 import java.net.SocketAddress; 49 import java.util.concurrent.ExecutorService; 50 import java.util.concurrent.ForkJoinPool; 51 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 52 import java.util.concurrent.ForkJoinWorkerThread; 53 import java.util.concurrent.atomic.AtomicInteger; 54 import javax.annotation.Nullable; 55 import org.HdrHistogram.Histogram; 56 57 /** 58 * Utility methods to support benchmarking classes. 59 */ 60 public final class Utils { 61 private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://"; 62 63 // The histogram can record values between 1 microsecond and 1 min. 64 public static final long HISTOGRAM_MAX_VALUE = 60000000L; 65 66 // Value quantization will be no more than 1%. See the README of HdrHistogram for more details. 67 public static final int HISTOGRAM_PRECISION = 2; 68 69 public static final int DEFAULT_FLOW_CONTROL_WINDOW = 70 NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW; 71 Utils()72 private Utils() { 73 } 74 parseBoolean(String value)75 public static boolean parseBoolean(String value) { 76 return value.isEmpty() || Boolean.parseBoolean(value); 77 } 78 79 /** 80 * Parse a {@link SocketAddress} from the given string. 81 */ parseSocketAddress(String value)82 public static SocketAddress parseSocketAddress(String value) { 83 if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) { 84 // Unix Domain Socket address. 85 // Create the underlying file for the Unix Domain Socket. 86 String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length()); 87 File file = new File(filePath); 88 if (!file.isAbsolute()) { 89 throw new IllegalArgumentException("File path must be absolute: " + filePath); 90 } 91 try { 92 if (file.createNewFile()) { 93 // If this application created the file, delete it when the application exits. 94 file.deleteOnExit(); 95 } 96 } catch (IOException ex) { 97 throw new RuntimeException(ex); 98 } 99 // Create the SocketAddress referencing the file. 100 return new DomainSocketAddress(file); 101 } else { 102 // Standard TCP/IP address. 103 String[] parts = value.split(":", 2); 104 if (parts.length < 2) { 105 throw new IllegalArgumentException( 106 "Address must be a unix:// path or be in the form host:port. Got: " + value); 107 } 108 String host = parts[0]; 109 int port = Integer.parseInt(parts[1]); 110 return new InetSocketAddress(host, port); 111 } 112 } 113 newOkHttpClientChannel( SocketAddress address, boolean tls, boolean testca)114 private static OkHttpChannelBuilder newOkHttpClientChannel( 115 SocketAddress address, boolean tls, boolean testca) { 116 InetSocketAddress addr = (InetSocketAddress) address; 117 OkHttpChannelBuilder builder = 118 OkHttpChannelBuilder.forAddress(addr.getHostName(), addr.getPort()); 119 if (!tls) { 120 builder.usePlaintext(); 121 } else if (testca) { 122 try { 123 builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa( 124 Platform.get().getProvider(), 125 TestUtils.loadCert("ca.pem"))); 126 } catch (Exception e) { 127 throw new RuntimeException(e); 128 } 129 } 130 return builder; 131 } 132 newNettyClientChannel(Transport transport, SocketAddress address, boolean tls, boolean testca, int flowControlWindow)133 private static NettyChannelBuilder newNettyClientChannel(Transport transport, 134 SocketAddress address, boolean tls, boolean testca, int flowControlWindow) 135 throws IOException { 136 NettyChannelBuilder builder = 137 NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow); 138 if (!tls) { 139 builder.usePlaintext(); 140 } else if (testca) { 141 File cert = TestUtils.loadCert("ca.pem"); 142 builder.sslContext(GrpcSslContexts.forClient().trustManager(cert).build()); 143 } 144 145 DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */); 146 switch (transport) { 147 case NETTY_NIO: 148 builder 149 .eventLoopGroup(new NioEventLoopGroup(0, tf)) 150 .channelType(NioSocketChannel.class); 151 break; 152 153 case NETTY_EPOLL: 154 // These classes only work on Linux. 155 builder 156 .eventLoopGroup(new EpollEventLoopGroup(0, tf)) 157 .channelType(EpollSocketChannel.class); 158 break; 159 160 case NETTY_UNIX_DOMAIN_SOCKET: 161 // These classes only work on Linux. 162 builder 163 .eventLoopGroup(new EpollEventLoopGroup(0, tf)) 164 .channelType(EpollDomainSocketChannel.class); 165 break; 166 167 default: 168 // Should never get here. 169 throw new IllegalArgumentException("Unsupported transport: " + transport); 170 } 171 return builder; 172 } 173 174 private static ExecutorService clientExecutor; 175 getExecutor()176 private static synchronized ExecutorService getExecutor() { 177 if (clientExecutor == null) { 178 clientExecutor = new ForkJoinPool( 179 Runtime.getRuntime().availableProcessors(), 180 new ForkJoinWorkerThreadFactory() { 181 final AtomicInteger num = new AtomicInteger(); 182 @Override 183 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 184 ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); 185 thread.setDaemon(true); 186 thread.setName("grpc-client-app-" + "-" + num.getAndIncrement()); 187 return thread; 188 } 189 }, UncaughtExceptionHandlers.systemExit(), true /* async */); 190 } 191 return clientExecutor; 192 } 193 194 /** 195 * Create a {@link ManagedChannel} for the given parameters. 196 */ newClientChannel(Transport transport, SocketAddress address, boolean tls, boolean testca, @Nullable String authorityOverride, int flowControlWindow, boolean directExecutor)197 public static ManagedChannel newClientChannel(Transport transport, SocketAddress address, 198 boolean tls, boolean testca, @Nullable String authorityOverride, 199 int flowControlWindow, boolean directExecutor) { 200 ManagedChannelBuilder<?> builder; 201 if (transport == Transport.OK_HTTP) { 202 builder = newOkHttpClientChannel(address, tls, testca); 203 } else { 204 try { 205 builder = newNettyClientChannel(transport, address, tls, testca, flowControlWindow); 206 } catch (Exception e) { 207 throw new RuntimeException(e); 208 } 209 } 210 if (authorityOverride != null) { 211 builder.overrideAuthority(authorityOverride); 212 } 213 214 if (directExecutor) { 215 builder.directExecutor(); 216 } else { 217 // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be 218 // put. Move it somewhere else, or remove it if no longer necessary. 219 // See: https://github.com/grpc/grpc-java/issues/2119 220 builder.executor(getExecutor()); 221 } 222 223 return builder.build(); 224 } 225 226 /** 227 * Save a {@link Histogram} to a file. 228 */ saveHistogram(Histogram histogram, String filename)229 public static void saveHistogram(Histogram histogram, String filename) throws IOException { 230 File file; 231 PrintStream log = null; 232 try { 233 file = new File(filename); 234 if (file.exists() && !file.delete()) { 235 System.err.println("Failed deleting previous histogram file: " + file.getAbsolutePath()); 236 } 237 log = new PrintStream(new FileOutputStream(file), false); 238 histogram.outputPercentileDistribution(log, 1.0); 239 } finally { 240 if (log != null) { 241 log.close(); 242 } 243 } 244 } 245 246 /** 247 * Construct a {@link SimpleResponse} for the given request. 248 */ makeResponse(SimpleRequest request)249 public static SimpleResponse makeResponse(SimpleRequest request) { 250 if (request.getResponseSize() > 0) { 251 if (!Messages.PayloadType.COMPRESSABLE.equals(request.getResponseType())) { 252 throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException(); 253 } 254 255 ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]); 256 Messages.PayloadType type = request.getResponseType(); 257 258 Payload payload = Payload.newBuilder().setType(type).setBody(body).build(); 259 return SimpleResponse.newBuilder().setPayload(payload).build(); 260 } 261 return SimpleResponse.getDefaultInstance(); 262 } 263 264 /** 265 * Construct a {@link SimpleRequest} with the specified dimensions. 266 */ makeRequest(Messages.PayloadType payloadType, int reqLength, int respLength)267 public static SimpleRequest makeRequest(Messages.PayloadType payloadType, int reqLength, 268 int respLength) { 269 ByteString body = ByteString.copyFrom(new byte[reqLength]); 270 Payload payload = Payload.newBuilder() 271 .setType(payloadType) 272 .setBody(body) 273 .build(); 274 275 return SimpleRequest.newBuilder() 276 .setResponseType(payloadType) 277 .setResponseSize(respLength) 278 .setPayload(payload) 279 .build(); 280 } 281 282 /** 283 * Picks a port that is not used right at this moment. 284 * Warning: Not thread safe. May see "BindException: Address already in use: bind" if using the 285 * returned port to create a new server socket when other threads/processes are concurrently 286 * creating new sockets without a specific port. 287 */ pickUnusedPort()288 public static int pickUnusedPort() { 289 try { 290 ServerSocket serverSocket = new ServerSocket(0); 291 int port = serverSocket.getLocalPort(); 292 serverSocket.close(); 293 return port; 294 } catch (IOException e) { 295 throw new RuntimeException(e); 296 } 297 } 298 } 299