1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.benchmarks.qps; 18 19 import com.google.common.util.concurrent.UncaughtExceptionHandlers; 20 import com.google.protobuf.ByteString; 21 import io.grpc.Server; 22 import io.grpc.Status; 23 import io.grpc.benchmarks.Utils; 24 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; 25 import io.grpc.benchmarks.proto.Messages; 26 import io.grpc.internal.testing.TestUtils; 27 import io.grpc.netty.NettyServerBuilder; 28 import io.grpc.stub.ServerCallStreamObserver; 29 import io.grpc.stub.StreamObserver; 30 import io.grpc.stub.StreamObservers; 31 import io.netty.channel.EventLoopGroup; 32 import io.netty.channel.ServerChannel; 33 import io.netty.channel.nio.NioEventLoopGroup; 34 import io.netty.channel.socket.nio.NioServerSocketChannel; 35 import io.netty.util.concurrent.DefaultThreadFactory; 36 import java.io.File; 37 import java.io.IOException; 38 import java.util.Iterator; 39 import java.util.concurrent.ForkJoinPool; 40 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 41 import java.util.concurrent.ForkJoinWorkerThread; 42 import java.util.concurrent.ThreadFactory; 43 import java.util.concurrent.atomic.AtomicBoolean; 44 import java.util.concurrent.atomic.AtomicInteger; 45 import java.util.logging.Logger; 46 47 /** 48 * QPS server using the non-blocking API. 49 */ 50 public class AsyncServer { 51 private static final Logger log = Logger.getLogger(AsyncServer.class.getName()); 52 53 /** 54 * checkstyle complains if there is no javadoc comment here. 55 */ main(String... args)56 public static void main(String... args) throws Exception { 57 new AsyncServer().run(args); 58 } 59 60 /** Equivalent of "main", but non-static. */ run(String[] args)61 public void run(String[] args) throws Exception { 62 ServerConfiguration.Builder configBuilder = ServerConfiguration.newBuilder(); 63 ServerConfiguration config; 64 try { 65 config = configBuilder.build(args); 66 } catch (Exception e) { 67 System.out.println(e.getMessage()); 68 configBuilder.printUsage(); 69 return; 70 } 71 72 final Server server = newServer(config); 73 server.start(); 74 75 System.out.println("QPS Server started on " + config.address); 76 77 Runtime.getRuntime().addShutdownHook(new Thread() { 78 @Override 79 public void run() { 80 try { 81 System.out.println("QPS Server shutting down"); 82 server.shutdown(); 83 } catch (Exception e) { 84 e.printStackTrace(); 85 } 86 } 87 }); 88 server.awaitTermination(); 89 } 90 91 @SuppressWarnings("LiteralClassName") // Epoll is not available on windows newServer(ServerConfiguration config)92 static Server newServer(ServerConfiguration config) throws IOException { 93 final EventLoopGroup boss; 94 final EventLoopGroup worker; 95 final Class<? extends ServerChannel> channelType; 96 ThreadFactory tf = new DefaultThreadFactory("server-elg-", true /*daemon */); 97 switch (config.transport) { 98 case NETTY_NIO: { 99 boss = new NioEventLoopGroup(1, tf); 100 worker = new NioEventLoopGroup(0, tf); 101 channelType = NioServerSocketChannel.class; 102 break; 103 } 104 case NETTY_EPOLL: { 105 try { 106 // These classes are only available on linux. 107 Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup"); 108 @SuppressWarnings("unchecked") 109 Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>) 110 Class.forName("io.netty.channel.epoll.EpollServerSocketChannel"); 111 boss = 112 (EventLoopGroup) 113 (groupClass 114 .getConstructor(int.class, ThreadFactory.class) 115 .newInstance(1, tf)); 116 worker = 117 (EventLoopGroup) 118 (groupClass 119 .getConstructor(int.class, ThreadFactory.class) 120 .newInstance(0, tf)); 121 channelType = channelClass; 122 break; 123 } catch (Exception e) { 124 throw new RuntimeException(e); 125 } 126 } 127 case NETTY_UNIX_DOMAIN_SOCKET: { 128 try { 129 // These classes are only available on linux. 130 Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup"); 131 @SuppressWarnings("unchecked") 132 Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>) 133 Class.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel"); 134 boss = 135 (EventLoopGroup) 136 (groupClass 137 .getConstructor(int.class, ThreadFactory.class) 138 .newInstance(1, tf)); 139 worker = 140 (EventLoopGroup) 141 (groupClass 142 .getConstructor(int.class, ThreadFactory.class) 143 .newInstance(0, tf)); 144 channelType = channelClass; 145 break; 146 } catch (Exception e) { 147 throw new RuntimeException(e); 148 } 149 } 150 default: { 151 // Should never get here. 152 throw new IllegalArgumentException("Unsupported transport: " + config.transport); 153 } 154 } 155 156 NettyServerBuilder builder = NettyServerBuilder 157 .forAddress(config.address) 158 .bossEventLoopGroup(boss) 159 .workerEventLoopGroup(worker) 160 .channelType(channelType) 161 .addService(new BenchmarkServiceImpl()) 162 .flowControlWindow(config.flowControlWindow); 163 if (config.tls) { 164 System.out.println("Using fake CA for TLS certificate.\n" 165 + "Run the Java client with --tls --testca"); 166 167 File cert = TestUtils.loadCert("server1.pem"); 168 File key = TestUtils.loadCert("server1.key"); 169 builder.useTransportSecurity(cert, key); 170 } 171 if (config.directExecutor) { 172 builder.directExecutor(); 173 } else { 174 // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be 175 // put. Move it somewhere else, or remove it if no longer necessary. 176 // See: https://github.com/grpc/grpc-java/issues/2119 177 builder.executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(), 178 new ForkJoinWorkerThreadFactory() { 179 final AtomicInteger num = new AtomicInteger(); 180 @Override 181 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 182 ForkJoinWorkerThread thread = 183 ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); 184 thread.setDaemon(true); 185 thread.setName("grpc-server-app-" + "-" + num.getAndIncrement()); 186 return thread; 187 } 188 }, UncaughtExceptionHandlers.systemExit(), true /* async */)); 189 } 190 191 return builder.build(); 192 } 193 194 public static class BenchmarkServiceImpl extends BenchmarkServiceGrpc.BenchmarkServiceImplBase { 195 // Always use the same canned response for bidi. This is allowed by the spec. 196 private static final int BIDI_RESPONSE_BYTES = 100; 197 private static final Messages.SimpleResponse BIDI_RESPONSE = Messages.SimpleResponse 198 .newBuilder() 199 .setPayload(Messages.Payload.newBuilder() 200 .setBody(ByteString.copyFrom(new byte[BIDI_RESPONSE_BYTES])).build()) 201 .build(); 202 203 private final AtomicBoolean shutdown = new AtomicBoolean(); 204 BenchmarkServiceImpl()205 public BenchmarkServiceImpl() { 206 } 207 shutdown()208 public void shutdown() { 209 shutdown.set(true); 210 } 211 212 @Override unaryCall(Messages.SimpleRequest request, StreamObserver<Messages.SimpleResponse> responseObserver)213 public void unaryCall(Messages.SimpleRequest request, 214 StreamObserver<Messages.SimpleResponse> responseObserver) { 215 responseObserver.onNext(Utils.makeResponse(request)); 216 responseObserver.onCompleted(); 217 } 218 219 @Override streamingCall( final StreamObserver<Messages.SimpleResponse> observer)220 public StreamObserver<Messages.SimpleRequest> streamingCall( 221 final StreamObserver<Messages.SimpleResponse> observer) { 222 final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver = 223 (ServerCallStreamObserver<Messages.SimpleResponse>) observer; 224 // TODO(spencerfang): flow control to stop reading when !responseObserver.isReady 225 return new StreamObserver<Messages.SimpleRequest>() { 226 @Override 227 public void onNext(Messages.SimpleRequest value) { 228 if (shutdown.get()) { 229 responseObserver.onCompleted(); 230 return; 231 } 232 responseObserver.onNext(Utils.makeResponse(value)); 233 } 234 235 @Override 236 public void onError(Throwable t) { 237 // other side closed with non OK 238 responseObserver.onError(t); 239 } 240 241 @Override 242 public void onCompleted() { 243 // other side closed with OK 244 responseObserver.onCompleted(); 245 } 246 }; 247 } 248 249 @Override 250 public StreamObserver<Messages.SimpleRequest> streamingFromClient( 251 final StreamObserver<Messages.SimpleResponse> responseObserver) { 252 return new StreamObserver<Messages.SimpleRequest>() { 253 Messages.SimpleRequest lastSeen = null; 254 255 @Override 256 public void onNext(Messages.SimpleRequest value) { 257 if (shutdown.get()) { 258 responseObserver.onCompleted(); 259 return; 260 } 261 lastSeen = value; 262 } 263 264 @Override 265 public void onError(Throwable t) { 266 // other side closed with non OK 267 responseObserver.onError(t); 268 } 269 270 @Override 271 public void onCompleted() { 272 if (lastSeen != null) { 273 responseObserver.onNext(Utils.makeResponse(lastSeen)); 274 responseObserver.onCompleted(); 275 } else { 276 responseObserver.onError( 277 Status.FAILED_PRECONDITION 278 .withDescription("never received any requests").asException()); 279 } 280 } 281 }; 282 } 283 284 @Override 285 public void streamingFromServer( 286 final Messages.SimpleRequest request, 287 final StreamObserver<Messages.SimpleResponse> observer) { 288 // send forever, until the client cancels or we shut down 289 final Messages.SimpleResponse response = Utils.makeResponse(request); 290 final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver = 291 (ServerCallStreamObserver<Messages.SimpleResponse>) observer; 292 // If the client cancels, copyWithFlowControl takes care of calling 293 // responseObserver.onCompleted() for us 294 StreamObservers.copyWithFlowControl( 295 new Iterator<Messages.SimpleResponse>() { 296 @Override 297 public boolean hasNext() { 298 return !shutdown.get() && !responseObserver.isCancelled(); 299 } 300 301 @Override 302 public Messages.SimpleResponse next() { 303 return response; 304 } 305 306 @Override 307 public void remove() { 308 throw new UnsupportedOperationException(); 309 } 310 }, 311 responseObserver); 312 } 313 314 @Override 315 public StreamObserver<Messages.SimpleRequest> streamingBothWays( 316 final StreamObserver<Messages.SimpleResponse> observer) { 317 // receive data forever and send data forever until client cancels or we shut down. 318 final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver = 319 (ServerCallStreamObserver<Messages.SimpleResponse>) observer; 320 // If the client cancels, copyWithFlowControl takes care of calling 321 // responseObserver.onCompleted() for us 322 StreamObservers.copyWithFlowControl( 323 new Iterator<Messages.SimpleResponse>() { 324 @Override 325 public boolean hasNext() { 326 return !shutdown.get() && !responseObserver.isCancelled(); 327 } 328 329 @Override 330 public Messages.SimpleResponse next() { 331 return BIDI_RESPONSE; 332 } 333 334 @Override 335 public void remove() { 336 throw new UnsupportedOperationException(); 337 } 338 }, 339 responseObserver 340 ); 341 342 return new StreamObserver<Messages.SimpleRequest>() { 343 @Override 344 public void onNext(final Messages.SimpleRequest request) { 345 // noop 346 } 347 348 @Override 349 public void onError(Throwable t) { 350 // other side cancelled 351 } 352 353 @Override 354 public void onCompleted() { 355 // Should never happen, because clients should cancel this call in order to stop 356 // the operation. Also because copyWithFlowControl hogs the inbound network thread 357 // via the handler for onReady, we would never expect this callback to be able to 358 // run anyways. 359 log.severe("clients should CANCEL the call to stop bidi streaming"); 360 } 361 }; 362 } 363 } 364 } 365