/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.benchmarks;

import static io.grpc.benchmarks.Utils.pickUnusedPort;

import com.google.protobuf.ByteString;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.Messages.Payload;
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
import io.grpc.benchmarks.qps.AsyncServer;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.Channel;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;

/**
 * JMH benchmarks that issue unary and streaming calls against an in-JVM
 * {@link AsyncServer.BenchmarkServiceImpl} over the transport selected by {@link #transport}.
 *
 * <p>One server and one channel are created per benchmark state in {@link #setUp()} and shared
 * by all benchmark threads; they are released in {@link #tearDown()}.
 */
@State(Scope.Benchmark)
public class TransportBenchmark {
  /**
   * Transports this benchmark knows how to wire up. Only the values listed in the {@code @Param}
   * annotation on {@link #transport} are run by default; the others must be requested explicitly.
   */
  public enum Transport {
    INPROCESS, NETTY, NETTY_LOCAL, NETTY_EPOLL, OKHTTP
  }

  /** Transport under test; populated by JMH. */
  @Param({"INPROCESS", "NETTY", "OKHTTP"})
  public Transport transport;
  /** When true, both the server and the channel are built with {@code directExecutor()}. */
  @Param({"true", "false"})
  public boolean direct;

  private ManagedChannel channel;
  private Server server;
  private BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub;
  private BenchmarkServiceGrpc.BenchmarkServiceStub asyncStub;
  // Non-null only for transports that create their own event loop group (NETTY_LOCAL and
  // NETTY_EPOLL); tearDown() is responsible for shutting it down.
  private EventLoopGroup groupToShutdown;

  /**
   * Builds and starts the server, builds the channel and both stubs for the configured
   * {@link #transport}, and issues one unary call so the channel is connected before measurement
   * begins.
   *
   * @throws Exception if the transport is unknown, the epoll classes cannot be loaded or
   *     instantiated reflectively, or the server fails to start
   */
  @Setup
  public void setUp() throws Exception {
    ServerCredentials serverCreds = InsecureServerCredentials.create();
    ServerBuilder<?> serverBuilder;
    ManagedChannelBuilder<?> channelBuilder;
    switch (transport) {
      case INPROCESS: {
        String name = "bench" + Math.random();
        serverBuilder = InProcessServerBuilder.forName(name);
        channelBuilder = InProcessChannelBuilder.forName(name);
        break;
      }
      case NETTY: {
        InetSocketAddress address = new InetSocketAddress("localhost", pickUnusedPort());
        serverBuilder = NettyServerBuilder.forAddress(address, serverCreds);
        channelBuilder = NettyChannelBuilder.forAddress(address)
            .negotiationType(NegotiationType.PLAINTEXT);
        break;
      }
      case NETTY_LOCAL: {
        String name = "bench" + Math.random();
        LocalAddress address = new LocalAddress(name);
        // A single group is shared by the server (boss and worker) and the client.
        EventLoopGroup group = new DefaultEventLoopGroup();
        serverBuilder = NettyServerBuilder.forAddress(address, serverCreds)
            .bossEventLoopGroup(group)
            .workerEventLoopGroup(group)
            .channelType(LocalServerChannel.class);
        channelBuilder = NettyChannelBuilder.forAddress(address)
            .eventLoopGroup(group)
            .channelType(LocalChannel.class)
            .negotiationType(NegotiationType.PLAINTEXT);
        groupToShutdown = group;
        break;
      }
      case NETTY_EPOLL: {
        InetSocketAddress address = new InetSocketAddress("localhost", pickUnusedPort());

        // Reflection used since they are only available on linux.
        Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
        EventLoopGroup group = (EventLoopGroup) groupClass.getConstructor().newInstance();

        Class<? extends ServerChannel> serverChannelClass =
            Class.forName("io.netty.channel.epoll.EpollServerSocketChannel")
                .asSubclass(ServerChannel.class);
        serverBuilder = NettyServerBuilder.forAddress(address, serverCreds)
            .bossEventLoopGroup(group)
            .workerEventLoopGroup(group)
            .channelType(serverChannelClass);
        Class<? extends Channel> channelClass =
            Class.forName("io.netty.channel.epoll.EpollSocketChannel")
                .asSubclass(Channel.class);
        channelBuilder = NettyChannelBuilder.forAddress(address)
            .eventLoopGroup(group)
            .channelType(channelClass)
            .negotiationType(NegotiationType.PLAINTEXT);
        groupToShutdown = group;
        break;
      }
      case OKHTTP: {
        // Only the client side is OkHttp; the server is still built with Netty.
        int port = pickUnusedPort();
        InetSocketAddress address = new InetSocketAddress("localhost", port);
        serverBuilder = NettyServerBuilder.forAddress(address, serverCreds);
        channelBuilder = OkHttpChannelBuilder
            .forAddress("localhost", port, InsecureChannelCredentials.create());
        break;
      }
      default:
        throw new Exception("Unknown transport: " + transport);
    }

    if (direct) {
      serverBuilder.directExecutor();
      // Because blocking stubs avoid the executor, this doesn't do much.
      channelBuilder.directExecutor();
    }

    server = serverBuilder
        .addService(new AsyncServer.BenchmarkServiceImpl())
        .build();
    server.start();
    channel = channelBuilder.build();
    stub = BenchmarkServiceGrpc.newBlockingStub(channel);
    asyncStub = BenchmarkServiceGrpc.newStub(channel);
    // Wait for channel to start
    stub.unaryCall(SimpleRequest.getDefaultInstance());
  }

  /**
   * Shuts down the channel, the server and, if one was created, the event loop group, waiting up
   * to one second for each to terminate.
   *
   * @throws Exception if the channel, server or event loop group has not terminated after its
   *     wait, or if the calling thread is interrupted while waiting
   */
  @TearDown
  public void tearDown() throws Exception {
    try {
      channel.shutdown();
      server.shutdown();
      channel.awaitTermination(1, TimeUnit.SECONDS);
      server.awaitTermination(1, TimeUnit.SECONDS);
    } finally {
      // Always ask the event loop group to stop, even when the channel or server failed to
      // terminate or the wait was interrupted; otherwise its threads would be left running.
      if (groupToShutdown != null) {
        Future<?> unused = groupToShutdown.shutdownGracefully(0, 1, TimeUnit.SECONDS);
      }
    }
    if (!channel.isTerminated()) {
      throw new Exception("failed to shut down channel");
    }
    if (!server.isTerminated()) {
      throw new Exception("failed to shut down server");
    }
    if (groupToShutdown != null) {
      groupToShutdown.awaitTermination(1, TimeUnit.SECONDS);
      if (!groupToShutdown.isTerminated()) {
        throw new Exception("failed to shut down event loop group.");
      }
    }
  }

  // Request carrying a 1024-byte zero-filled payload and asking for a 1024-byte response.
  private static final SimpleRequest UNARY_CALL_1024_REQUEST = SimpleRequest.newBuilder()
      .setResponseSize(1024)
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1024])))
      .build();

  /** Samples the time of one blocking unary call using {@code UNARY_CALL_1024_REQUEST}. */
  @Benchmark
  @BenchmarkMode(Mode.SampleTime)
  @OutputTimeUnit(TimeUnit.NANOSECONDS)
  public SimpleResponse unaryCall1024Latency() {
    return stub.unaryCall(UNARY_CALL_1024_REQUEST);
  }

  // Requested response size in bytes (1 MiB). Also used as the operations-per-invocation count so
  // that the byte-throughput benchmarks are scored per requested response byte.
  private static final int BYTE_THROUGHPUT_RESPONSE_SIZE = 1048576;
  private static final SimpleRequest BYTE_THROUGHPUT_REQUEST = SimpleRequest.newBuilder()
      .setResponseSize(BYTE_THROUGHPUT_RESPONSE_SIZE)
      .build();

  /**
   * Measures throughput of blocking unary calls from 10 threads, each call requesting a
   * {@code BYTE_THROUGHPUT_RESPONSE_SIZE}-byte response.
   */
  @Benchmark
  @BenchmarkMode(Mode.Throughput)
  @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE)
  @Threads(10)
  public SimpleResponse unaryCallsByteThroughput() {
    return stub.unaryCall(BYTE_THROUGHPUT_REQUEST);
  }

  // Sentinel stored in PingPongStreamState.status to mark a clean onCompleted(); it is compared
  // by identity and never thrown.
  @SuppressWarnings("StaticAssignmentOfThrowable")
  private static final Throwable OK_THROWABLE = new RuntimeException("OK");

  /**
   * Per-thread state holding one open bidirectional {@code streamingCall} on which requests and
   * responses are exchanged one at a time.
   */
  @State(Scope.Thread)
  public static class PingPongStreamState {
    // Stub callbacks are submitted to this executor; the benchmark thread drains it itself.
    private final ThreadlessExecutor executor = new ThreadlessExecutor();
    private StreamObserver<SimpleRequest> requestObserver;
    // Most recent response not yet handed back by pingPong(); null when none is pending.
    private SimpleResponse response;
    // Null while the stream is open; OK_THROWABLE after onCompleted(), otherwise the error.
    private Throwable status;

    /**
     * Opens the streaming call on the benchmark's async stub, routing callbacks through
     * {@code executor}.
     *
     * @param bench the shared benchmark state providing the async stub
     */
    @Setup
    public void setUp(TransportBenchmark bench) {
      requestObserver = bench.asyncStub
          .withExecutor(executor)
          .streamingCall(new StreamObserver<SimpleResponse>() {
            @Override public void onNext(SimpleResponse next) {
              assert response == null;
              response = next;
            }

            @Override public void onError(Throwable t) {
              status = t;
            }

            @Override public void onCompleted() {
              status = OK_THROWABLE;
            }
          });
    }

    /**
     * Issues request and waits for response.
     *
     * @param request the message to send on the open stream
     * @return the response delivered by the response observer
     * @throws InterruptedException if thrown by {@code executor.waitAndDrain()}
     * @throws RuntimeException if the stream terminates before a response arrives
     */
    public SimpleResponse pingPong(SimpleRequest request) throws InterruptedException {
      requestObserver.onNext(request);
      while (true) {
        executor.waitAndDrain();
        if (response != null) {
          // Clear the slot before returning so the next onNext() finds it empty.
          SimpleResponse savedResponse = response;
          response = null;
          return savedResponse;
        }
        if (status != null) {
          throw new RuntimeException("Unexpected stream termination", status);
        }
      }
    }

    /**
     * Half-closes the stream and drains the executor until the stream reports a final status.
     *
     * @throws InterruptedException if thrown by {@code executor.waitAndDrain()}
     * @throws RuntimeException if the stream ended with an error instead of completing
     */
    @TearDown
    public void tearDown() throws InterruptedException {
      requestObserver.onCompleted();
      while (status == null) {
        executor.waitAndDrain();
      }
      if (status != OK_THROWABLE) {
        throw new RuntimeException("Non-graceful stream shutdown", status);
      }
    }
  }

  /**
   * Measures throughput of request/response round trips on a per-thread bidirectional stream from
   * 10 threads, each request asking for a {@code BYTE_THROUGHPUT_RESPONSE_SIZE}-byte response.
   */
  @Benchmark
  @BenchmarkMode(Mode.Throughput)
  @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE)
  @Threads(10)
  public SimpleResponse streamingCallsByteThroughput(PingPongStreamState state)
      throws InterruptedException {
    return state.pingPong(BYTE_THROUGHPUT_REQUEST);
  }

  /**
   * Per-thread state holding one open {@code streamingFromServer} call whose responses are read
   * through a blocking iterator.
   */
  @State(Scope.Thread)
  public static class InfiniteStreamState {
    // Used by tearDown() to cancel the call started in setUp().
    private final CancellableInterceptor cancellableInterceptor = new CancellableInterceptor();
    private Iterator<SimpleResponse> iter;

    /**
     * Starts the server-streaming call on the benchmark's blocking stub with the default request.
     *
     * @param bench the shared benchmark state providing the blocking stub
     */
    @Setup
    public void setUp(TransportBenchmark bench) {
      iter = bench.stub
          .withInterceptors(cancellableInterceptor)
          .streamingFromServer(SimpleRequest.getDefaultInstance());
    }

    /** Returns the next response from the stream. */
    public SimpleResponse recv() throws InterruptedException {
      return iter.next();
    }

    /**
     * Cancels the call and drains the remaining responses. A {@code CANCELLED} status is the
     * expected outcome and is swallowed; any other status is rethrown.
     */
    @TearDown
    public void tearDown() throws InterruptedException {
      cancellableInterceptor.cancel("Normal tear-down", null);
      try {
        // Need to drain the queue
        while (iter.hasNext()) {
          iter.next();
        }
      } catch (StatusRuntimeException ex) {
        if (!Status.Code.CANCELLED.equals(ex.getStatus().getCode())) {
          throw ex;
        }
      }
    }
  }

  // NOTE: Causes OOM with NETTY_LOCAL. Probably a flow control problem in NETTY_LOCAL, but we
  // aren't too concerned.
  /** Measures how many messages per unit time 10 threads can each read from their own stream. */
  @Benchmark
  @BenchmarkMode(Mode.Throughput)
  @Threads(10)
  public SimpleResponse streamingCallsMessageThroughput(InfiniteStreamState state)
      throws InterruptedException {
    return state.recv();
  }
}