• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.qps;
18 
19 import com.google.common.util.concurrent.UncaughtExceptionHandlers;
20 import com.google.protobuf.ByteString;
21 import io.grpc.Server;
22 import io.grpc.Status;
23 import io.grpc.benchmarks.Utils;
24 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
25 import io.grpc.benchmarks.proto.Messages;
26 import io.grpc.internal.testing.TestUtils;
27 import io.grpc.netty.NettyServerBuilder;
28 import io.grpc.stub.ServerCallStreamObserver;
29 import io.grpc.stub.StreamObserver;
30 import io.grpc.stub.StreamObservers;
31 import io.netty.channel.EventLoopGroup;
32 import io.netty.channel.ServerChannel;
33 import io.netty.channel.nio.NioEventLoopGroup;
34 import io.netty.channel.socket.nio.NioServerSocketChannel;
35 import io.netty.util.concurrent.DefaultThreadFactory;
36 import java.io.File;
37 import java.io.IOException;
38 import java.util.Iterator;
39 import java.util.concurrent.ForkJoinPool;
40 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
41 import java.util.concurrent.ForkJoinWorkerThread;
42 import java.util.concurrent.ThreadFactory;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import java.util.logging.Logger;
46 
47 /**
48  * QPS server using the non-blocking API.
49  */
50 public class AsyncServer {
51   private static final Logger log = Logger.getLogger(AsyncServer.class.getName());
52 
53   /**
54    * checkstyle complains if there is no javadoc comment here.
55    */
main(String... args)56   public static void main(String... args) throws Exception {
57     new AsyncServer().run(args);
58   }
59 
60   /** Equivalent of "main", but non-static. */
run(String[] args)61   public void run(String[] args) throws Exception {
62     ServerConfiguration.Builder configBuilder = ServerConfiguration.newBuilder();
63     ServerConfiguration config;
64     try {
65       config = configBuilder.build(args);
66     } catch (Exception e) {
67       System.out.println(e.getMessage());
68       configBuilder.printUsage();
69       return;
70     }
71 
72     final Server server = newServer(config);
73     server.start();
74 
75     System.out.println("QPS Server started on " + config.address);
76 
77     Runtime.getRuntime().addShutdownHook(new Thread() {
78       @Override
79       public void run() {
80         try {
81           System.out.println("QPS Server shutting down");
82           server.shutdown();
83         } catch (Exception e) {
84           e.printStackTrace();
85         }
86       }
87     });
88     server.awaitTermination();
89   }
90 
91   @SuppressWarnings("LiteralClassName") // Epoll is not available on windows
newServer(ServerConfiguration config)92   static Server newServer(ServerConfiguration config) throws IOException {
93     final EventLoopGroup boss;
94     final EventLoopGroup worker;
95     final Class<? extends ServerChannel> channelType;
96     ThreadFactory tf = new DefaultThreadFactory("server-elg-", true /*daemon */);
97     switch (config.transport) {
98       case NETTY_NIO: {
99         boss = new NioEventLoopGroup(1, tf);
100         worker = new NioEventLoopGroup(0, tf);
101         channelType = NioServerSocketChannel.class;
102         break;
103       }
104       case NETTY_EPOLL: {
105         try {
106           // These classes are only available on linux.
107           Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
108           @SuppressWarnings("unchecked")
109           Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
110               Class.forName("io.netty.channel.epoll.EpollServerSocketChannel");
111           boss =
112               (EventLoopGroup)
113                   (groupClass
114                       .getConstructor(int.class, ThreadFactory.class)
115                       .newInstance(1, tf));
116           worker =
117               (EventLoopGroup)
118                   (groupClass
119                       .getConstructor(int.class, ThreadFactory.class)
120                       .newInstance(0, tf));
121           channelType = channelClass;
122           break;
123         } catch (Exception e) {
124           throw new RuntimeException(e);
125         }
126       }
127       case NETTY_UNIX_DOMAIN_SOCKET: {
128         try {
129           // These classes are only available on linux.
130           Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
131           @SuppressWarnings("unchecked")
132           Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
133               Class.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel");
134           boss =
135               (EventLoopGroup)
136                   (groupClass
137                       .getConstructor(int.class, ThreadFactory.class)
138                       .newInstance(1, tf));
139           worker =
140               (EventLoopGroup)
141                   (groupClass
142                       .getConstructor(int.class, ThreadFactory.class)
143                       .newInstance(0, tf));
144           channelType = channelClass;
145           break;
146         } catch (Exception e) {
147           throw new RuntimeException(e);
148         }
149       }
150       default: {
151         // Should never get here.
152         throw new IllegalArgumentException("Unsupported transport: " + config.transport);
153       }
154     }
155 
156     NettyServerBuilder builder = NettyServerBuilder
157         .forAddress(config.address)
158         .bossEventLoopGroup(boss)
159         .workerEventLoopGroup(worker)
160         .channelType(channelType)
161         .addService(new BenchmarkServiceImpl())
162         .flowControlWindow(config.flowControlWindow);
163     if (config.tls) {
164       System.out.println("Using fake CA for TLS certificate.\n"
165           + "Run the Java client with --tls --testca");
166 
167       File cert = TestUtils.loadCert("server1.pem");
168       File key = TestUtils.loadCert("server1.key");
169       builder.useTransportSecurity(cert, key);
170     }
171     if (config.directExecutor) {
172       builder.directExecutor();
173     } else {
174       // TODO(carl-mastrangelo): This should not be necessary.  I don't know where this should be
175       // put.  Move it somewhere else, or remove it if no longer necessary.
176       // See: https://github.com/grpc/grpc-java/issues/2119
177       builder.executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
178           new ForkJoinWorkerThreadFactory() {
179             final AtomicInteger num = new AtomicInteger();
180             @Override
181             public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
182               ForkJoinWorkerThread thread =
183                   ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
184               thread.setDaemon(true);
185               thread.setName("grpc-server-app-" + "-" + num.getAndIncrement());
186               return thread;
187             }
188           }, UncaughtExceptionHandlers.systemExit(), true /* async */));
189     }
190 
191     return builder.build();
192   }
193 
194   public static class BenchmarkServiceImpl extends BenchmarkServiceGrpc.BenchmarkServiceImplBase {
195     // Always use the same canned response for bidi. This is allowed by the spec.
196     private static final int BIDI_RESPONSE_BYTES = 100;
197     private static final Messages.SimpleResponse BIDI_RESPONSE = Messages.SimpleResponse
198         .newBuilder()
199         .setPayload(Messages.Payload.newBuilder()
200             .setBody(ByteString.copyFrom(new byte[BIDI_RESPONSE_BYTES])).build())
201         .build();
202 
203     private final AtomicBoolean shutdown = new AtomicBoolean();
204 
BenchmarkServiceImpl()205     public BenchmarkServiceImpl() {
206     }
207 
shutdown()208     public void shutdown() {
209       shutdown.set(true);
210     }
211 
212     @Override
unaryCall(Messages.SimpleRequest request, StreamObserver<Messages.SimpleResponse> responseObserver)213     public void unaryCall(Messages.SimpleRequest request,
214         StreamObserver<Messages.SimpleResponse> responseObserver) {
215       responseObserver.onNext(Utils.makeResponse(request));
216       responseObserver.onCompleted();
217     }
218 
219     @Override
streamingCall( final StreamObserver<Messages.SimpleResponse> observer)220     public StreamObserver<Messages.SimpleRequest> streamingCall(
221         final StreamObserver<Messages.SimpleResponse> observer) {
222       final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
223           (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
224       // TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
225       return new StreamObserver<Messages.SimpleRequest>() {
226         @Override
227         public void onNext(Messages.SimpleRequest value) {
228           if (shutdown.get()) {
229             responseObserver.onCompleted();
230             return;
231           }
232           responseObserver.onNext(Utils.makeResponse(value));
233         }
234 
235         @Override
236         public void onError(Throwable t) {
237           // other side closed with non OK
238           responseObserver.onError(t);
239         }
240 
241         @Override
242         public void onCompleted() {
243           // other side closed with OK
244           responseObserver.onCompleted();
245         }
246       };
247     }
248 
249     @Override
250     public StreamObserver<Messages.SimpleRequest> streamingFromClient(
251         final StreamObserver<Messages.SimpleResponse> responseObserver) {
252       return new StreamObserver<Messages.SimpleRequest>() {
253         Messages.SimpleRequest lastSeen = null;
254 
255         @Override
256         public void onNext(Messages.SimpleRequest value) {
257           if (shutdown.get()) {
258             responseObserver.onCompleted();
259             return;
260           }
261           lastSeen = value;
262         }
263 
264         @Override
265         public void onError(Throwable t) {
266           // other side closed with non OK
267           responseObserver.onError(t);
268         }
269 
270         @Override
271         public void onCompleted() {
272           if (lastSeen != null) {
273             responseObserver.onNext(Utils.makeResponse(lastSeen));
274             responseObserver.onCompleted();
275           } else {
276             responseObserver.onError(
277                 Status.FAILED_PRECONDITION
278                     .withDescription("never received any requests").asException());
279           }
280         }
281       };
282     }
283 
284     @Override
285     public void streamingFromServer(
286         final Messages.SimpleRequest request,
287         final StreamObserver<Messages.SimpleResponse> observer) {
288       // send forever, until the client cancels or we shut down
289       final Messages.SimpleResponse response = Utils.makeResponse(request);
290       final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
291           (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
292       // If the client cancels, copyWithFlowControl takes care of calling
293       // responseObserver.onCompleted() for us
294       StreamObservers.copyWithFlowControl(
295           new Iterator<Messages.SimpleResponse>() {
296             @Override
297             public boolean hasNext() {
298               return !shutdown.get() && !responseObserver.isCancelled();
299             }
300 
301             @Override
302             public Messages.SimpleResponse next() {
303               return response;
304             }
305 
306             @Override
307             public void remove() {
308               throw new UnsupportedOperationException();
309             }
310           },
311           responseObserver);
312     }
313 
314     @Override
315     public StreamObserver<Messages.SimpleRequest> streamingBothWays(
316         final StreamObserver<Messages.SimpleResponse> observer) {
317       // receive data forever and send data forever until client cancels or we shut down.
318       final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
319           (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
320       // If the client cancels, copyWithFlowControl takes care of calling
321       // responseObserver.onCompleted() for us
322       StreamObservers.copyWithFlowControl(
323           new Iterator<Messages.SimpleResponse>() {
324             @Override
325             public boolean hasNext() {
326               return !shutdown.get() && !responseObserver.isCancelled();
327             }
328 
329             @Override
330             public Messages.SimpleResponse next() {
331               return BIDI_RESPONSE;
332             }
333 
334             @Override
335             public void remove() {
336               throw new UnsupportedOperationException();
337             }
338           },
339           responseObserver
340       );
341 
342       return new StreamObserver<Messages.SimpleRequest>() {
343         @Override
344         public void onNext(final Messages.SimpleRequest request) {
345           // noop
346         }
347 
348         @Override
349         public void onError(Throwable t) {
350           // other side cancelled
351         }
352 
353         @Override
354         public void onCompleted() {
355           // Should never happen, because clients should cancel this call in order to stop
356           // the operation. Also because copyWithFlowControl hogs the inbound network thread
357           // via the handler for onReady, we would never expect this callback to be able to
358           // run anyways.
359           log.severe("clients should CANCEL the call to stop bidi streaming");
360         }
361       };
362     }
363   }
364 }
365