• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks;
18 
19 import static io.grpc.benchmarks.Utils.pickUnusedPort;
20 
21 import com.google.protobuf.ByteString;
22 import io.grpc.InsecureChannelCredentials;
23 import io.grpc.InsecureServerCredentials;
24 import io.grpc.ManagedChannel;
25 import io.grpc.ManagedChannelBuilder;
26 import io.grpc.Server;
27 import io.grpc.ServerBuilder;
28 import io.grpc.ServerCredentials;
29 import io.grpc.Status;
30 import io.grpc.StatusRuntimeException;
31 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
32 import io.grpc.benchmarks.proto.Messages.Payload;
33 import io.grpc.benchmarks.proto.Messages.SimpleRequest;
34 import io.grpc.benchmarks.proto.Messages.SimpleResponse;
35 import io.grpc.benchmarks.qps.AsyncServer;
36 import io.grpc.inprocess.InProcessChannelBuilder;
37 import io.grpc.inprocess.InProcessServerBuilder;
38 import io.grpc.netty.NegotiationType;
39 import io.grpc.netty.NettyChannelBuilder;
40 import io.grpc.netty.NettyServerBuilder;
41 import io.grpc.okhttp.OkHttpChannelBuilder;
42 import io.grpc.stub.StreamObserver;
43 import io.netty.channel.Channel;
44 import io.netty.channel.DefaultEventLoopGroup;
45 import io.netty.channel.EventLoopGroup;
46 import io.netty.channel.ServerChannel;
47 import io.netty.channel.local.LocalAddress;
48 import io.netty.channel.local.LocalChannel;
49 import io.netty.channel.local.LocalServerChannel;
50 import java.net.InetSocketAddress;
51 import java.util.Iterator;
52 import java.util.concurrent.Future;
53 import java.util.concurrent.TimeUnit;
54 import org.openjdk.jmh.annotations.Benchmark;
55 import org.openjdk.jmh.annotations.BenchmarkMode;
56 import org.openjdk.jmh.annotations.Mode;
57 import org.openjdk.jmh.annotations.OperationsPerInvocation;
58 import org.openjdk.jmh.annotations.OutputTimeUnit;
59 import org.openjdk.jmh.annotations.Param;
60 import org.openjdk.jmh.annotations.Scope;
61 import org.openjdk.jmh.annotations.Setup;
62 import org.openjdk.jmh.annotations.State;
63 import org.openjdk.jmh.annotations.TearDown;
64 import org.openjdk.jmh.annotations.Threads;
65 
66 /** Some text. */
67 @State(Scope.Benchmark)
68 public class TransportBenchmark {
69   public enum Transport {
70     INPROCESS, NETTY, NETTY_LOCAL, NETTY_EPOLL, OKHTTP
71   }
72 
73   @Param({"INPROCESS", "NETTY", "OKHTTP"})
74   public Transport transport;
75   @Param({"true", "false"})
76   public boolean direct;
77 
78   private ManagedChannel channel;
79   private Server server;
80   private BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub;
81   private BenchmarkServiceGrpc.BenchmarkServiceStub asyncStub;
82   private EventLoopGroup groupToShutdown;
83 
84   @Setup
setUp()85   public void setUp() throws Exception {
86     ServerCredentials serverCreds = InsecureServerCredentials.create();
87     ServerBuilder<?> serverBuilder;
88     ManagedChannelBuilder<?> channelBuilder;
89     switch (transport) {
90       case INPROCESS: {
91         String name = "bench" + Math.random();
92         serverBuilder = InProcessServerBuilder.forName(name);
93         channelBuilder = InProcessChannelBuilder.forName(name);
94         break;
95       }
96       case NETTY: {
97         InetSocketAddress address = new InetSocketAddress("localhost", pickUnusedPort());
98         serverBuilder = NettyServerBuilder.forAddress(address, serverCreds);
99         channelBuilder = NettyChannelBuilder.forAddress(address)
100             .negotiationType(NegotiationType.PLAINTEXT);
101         break;
102       }
103       case NETTY_LOCAL: {
104         String name = "bench" + Math.random();
105         LocalAddress address = new LocalAddress(name);
106         EventLoopGroup group = new DefaultEventLoopGroup();
107         serverBuilder = NettyServerBuilder.forAddress(address, serverCreds)
108             .bossEventLoopGroup(group)
109             .workerEventLoopGroup(group)
110             .channelType(LocalServerChannel.class);
111         channelBuilder = NettyChannelBuilder.forAddress(address)
112             .eventLoopGroup(group)
113             .channelType(LocalChannel.class)
114             .negotiationType(NegotiationType.PLAINTEXT);
115         groupToShutdown = group;
116         break;
117       }
118       case NETTY_EPOLL: {
119         InetSocketAddress address = new InetSocketAddress("localhost", pickUnusedPort());
120 
121         // Reflection used since they are only available on linux.
122         Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
123         EventLoopGroup group = (EventLoopGroup) groupClass.getConstructor().newInstance();
124 
125         Class<? extends ServerChannel> serverChannelClass =
126             Class.forName("io.netty.channel.epoll.EpollServerSocketChannel")
127               .asSubclass(ServerChannel.class);
128         serverBuilder = NettyServerBuilder.forAddress(address, serverCreds)
129             .bossEventLoopGroup(group)
130             .workerEventLoopGroup(group)
131             .channelType(serverChannelClass);
132         Class<? extends Channel> channelClass =
133             Class.forName("io.netty.channel.epoll.EpollSocketChannel")
134               .asSubclass(Channel.class);
135         channelBuilder = NettyChannelBuilder.forAddress(address)
136             .eventLoopGroup(group)
137             .channelType(channelClass)
138             .negotiationType(NegotiationType.PLAINTEXT);
139         groupToShutdown = group;
140         break;
141       }
142       case OKHTTP: {
143         int port = pickUnusedPort();
144         InetSocketAddress address = new InetSocketAddress("localhost", port);
145         serverBuilder = NettyServerBuilder.forAddress(address, serverCreds);
146         channelBuilder = OkHttpChannelBuilder
147             .forAddress("localhost", port, InsecureChannelCredentials.create());
148         break;
149       }
150       default:
151         throw new Exception("Unknown transport: " + transport);
152     }
153 
154     if (direct) {
155       serverBuilder.directExecutor();
156       // Because blocking stubs avoid the executor, this doesn't do much.
157       channelBuilder.directExecutor();
158     }
159 
160     server = serverBuilder
161         .addService(new AsyncServer.BenchmarkServiceImpl())
162         .build();
163     server.start();
164     channel = channelBuilder.build();
165     stub = BenchmarkServiceGrpc.newBlockingStub(channel);
166     asyncStub = BenchmarkServiceGrpc.newStub(channel);
167     // Wait for channel to start
168     stub.unaryCall(SimpleRequest.getDefaultInstance());
169   }
170 
171   @TearDown
tearDown()172   public void tearDown() throws Exception {
173     channel.shutdown();
174     server.shutdown();
175     channel.awaitTermination(1, TimeUnit.SECONDS);
176     server.awaitTermination(1, TimeUnit.SECONDS);
177     if (!channel.isTerminated()) {
178       throw new Exception("failed to shut down channel");
179     }
180     if (!server.isTerminated()) {
181       throw new Exception("failed to shut down server");
182     }
183     if (groupToShutdown != null) {
184       Future<?> unused = groupToShutdown.shutdownGracefully(0, 1, TimeUnit.SECONDS);
185       groupToShutdown.awaitTermination(1, TimeUnit.SECONDS);
186       if (!groupToShutdown.isTerminated()) {
187         throw new Exception("failed to shut down event loop group.");
188       }
189     }
190   }
191 
192   private static final SimpleRequest UNARY_CALL_1024_REQUEST = SimpleRequest.newBuilder()
193       .setResponseSize(1024)
194       .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1024])))
195       .build();
196 
197   @Benchmark
198   @BenchmarkMode(Mode.SampleTime)
199   @OutputTimeUnit(TimeUnit.NANOSECONDS)
unaryCall1024Latency()200   public SimpleResponse unaryCall1024Latency() {
201     return stub.unaryCall(UNARY_CALL_1024_REQUEST);
202   }
203 
204   private static final int BYTE_THROUGHPUT_RESPONSE_SIZE = 1048576;
205   private static final SimpleRequest BYTE_THROUGHPUT_REQUEST = SimpleRequest.newBuilder()
206       .setResponseSize(BYTE_THROUGHPUT_RESPONSE_SIZE)
207       .build();
208 
209   @Benchmark
210   @BenchmarkMode(Mode.Throughput)
211   @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE)
212   @Threads(10)
unaryCallsByteThroughput()213   public SimpleResponse unaryCallsByteThroughput() {
214     return stub.unaryCall(BYTE_THROUGHPUT_REQUEST);
215   }
216 
217   @SuppressWarnings("StaticAssignmentOfThrowable")
218   private static final Throwable OK_THROWABLE = new RuntimeException("OK");
219 
220   @State(Scope.Thread)
221   public static class PingPongStreamState {
222     private final ThreadlessExecutor executor = new ThreadlessExecutor();
223     private StreamObserver<SimpleRequest> requestObserver;
224     private SimpleResponse response;
225     private Throwable status;
226 
227     @Setup
setUp(TransportBenchmark bench)228     public void setUp(TransportBenchmark bench) {
229       requestObserver = bench.asyncStub
230           .withExecutor(executor)
231           .streamingCall(new StreamObserver<SimpleResponse>() {
232             @Override public void onNext(SimpleResponse next) {
233               assert response == null;
234               response = next;
235             }
236 
237             @Override public void onError(Throwable t) {
238               status = t;
239             }
240 
241             @Override public void onCompleted() {
242               status = OK_THROWABLE;
243             }
244           });
245     }
246 
247     /** Issues request and waits for response. */
pingPong(SimpleRequest request)248     public SimpleResponse pingPong(SimpleRequest request) throws InterruptedException {
249       requestObserver.onNext(request);
250       while (true) {
251         executor.waitAndDrain();
252         if (response != null) {
253           SimpleResponse savedResponse = response;
254           response = null;
255           return savedResponse;
256         }
257         if (status != null) {
258           throw new RuntimeException("Unexpected stream termination", status);
259         }
260       }
261     }
262 
263     @TearDown
tearDown()264     public void tearDown() throws InterruptedException {
265       requestObserver.onCompleted();
266       while (status == null) {
267         executor.waitAndDrain();
268       }
269       if (status != OK_THROWABLE) {
270         throw new RuntimeException("Non-graceful stream shutdown", status);
271       }
272     }
273   }
274 
275   @Benchmark
276   @BenchmarkMode(Mode.Throughput)
277   @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE)
278   @Threads(10)
streamingCallsByteThroughput(PingPongStreamState state)279   public SimpleResponse streamingCallsByteThroughput(PingPongStreamState state)
280       throws InterruptedException {
281     return state.pingPong(BYTE_THROUGHPUT_REQUEST);
282   }
283 
284   @State(Scope.Thread)
285   public static class InfiniteStreamState {
286     private final CancellableInterceptor cancellableInterceptor = new CancellableInterceptor();
287     private Iterator<SimpleResponse> iter;
288 
289     @Setup
setUp(TransportBenchmark bench)290     public void setUp(TransportBenchmark bench) {
291       iter = bench.stub
292           .withInterceptors(cancellableInterceptor)
293           .streamingFromServer(SimpleRequest.getDefaultInstance());
294     }
295 
recv()296     public SimpleResponse recv() throws InterruptedException {
297       return iter.next();
298     }
299 
300     @TearDown
tearDown()301     public void tearDown() throws InterruptedException {
302       cancellableInterceptor.cancel("Normal tear-down", null);
303       try {
304         // Need to drain the queue
305         while (iter.hasNext()) {
306           iter.next();
307         }
308       } catch (StatusRuntimeException ex) {
309         if (!Status.Code.CANCELLED.equals(ex.getStatus().getCode())) {
310           throw ex;
311         }
312       }
313     }
314   }
315 
316   // NOTE: Causes OOM with NETTY_LOCAL. Probably a flow control problem in NETTY_LOCAL, but we
317   // aren't too concerned.
318   @Benchmark
319   @BenchmarkMode(Mode.Throughput)
320   @Threads(10)
streamingCallsMessageThroughput(InfiniteStreamState state)321   public SimpleResponse streamingCallsMessageThroughput(InfiniteStreamState state)
322       throws InterruptedException {
323     return state.recv();
324   }
325 }
326