1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import com.google.common.base.Splitter; 20 import com.google.common.collect.Iterables; 21 import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; 22 import io.grpc.InsecureServerCredentials; 23 import io.grpc.Metadata; 24 import io.grpc.Server; 25 import io.grpc.ServerCall; 26 import io.grpc.ServerCallHandler; 27 import io.grpc.ServerInterceptor; 28 import io.grpc.ServerInterceptors; 29 import io.grpc.Status; 30 import io.grpc.health.v1.HealthCheckResponse.ServingStatus; 31 import io.grpc.netty.NettyServerBuilder; 32 import io.grpc.protobuf.services.HealthStatusManager; 33 import io.grpc.protobuf.services.ProtoReflectionService; 34 import io.grpc.services.AdminInterface; 35 import io.grpc.stub.StreamObserver; 36 import io.grpc.testing.integration.Messages.SimpleRequest; 37 import io.grpc.testing.integration.Messages.SimpleResponse; 38 import io.grpc.xds.XdsServerBuilder; 39 import io.grpc.xds.XdsServerCredentials; 40 import java.net.InetAddress; 41 import java.net.UnknownHostException; 42 import java.util.ArrayList; 43 import java.util.List; 44 import java.util.concurrent.TimeUnit; 45 import java.util.logging.Level; 46 import java.util.logging.Logger; 47 48 /** Interop test server that implements the xDS testing service. */ 49 public final class XdsTestServer { 50 static final Metadata.Key<String> HOSTNAME_KEY = 51 Metadata.Key.of("hostname", Metadata.ASCII_STRING_MARSHALLER); 52 private static final Metadata.Key<String> CALL_BEHAVIOR_MD_KEY = 53 Metadata.Key.of("rpc-behavior", Metadata.ASCII_STRING_MARSHALLER); 54 private static final Metadata.Key<String> ATTEMPT_NUM = 55 Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER); 56 private static final String CALL_BEHAVIOR_KEEP_OPEN_VALUE = "keep-open"; 57 private static final String CALL_BEHAVIOR_SLEEP_VALUE = "sleep-"; 58 private static final String CALL_BEHAVIOR_SUCCEED_ON_RETRY_ATTEMPT_VALUE = 59 "succeed-on-retry-attempt-"; 60 private static final String CALL_BEHAVIOR_ERROR_CODE = 61 "error-code-"; 62 private static final String CALL_BEHAVIOR_HOSTNAME = "hostname="; 63 private static final Splitter HEADER_VALUE_SPLITTER = Splitter.on(',') 64 .trimResults() 65 .omitEmptyStrings(); 66 private static final Splitter HEADER_HOSTNAME_SPLITTER = Splitter.on(' '); 67 68 private static Logger logger = Logger.getLogger(XdsTestServer.class.getName()); 69 70 private int port = 8080; 71 private int maintenancePort = 8080; 72 private boolean secureMode = false; 73 private String serverId = "java_server"; 74 private HealthStatusManager health; 75 private Server server; 76 private Server maintenanceServer; 77 private String host; 78 79 /** 80 * The main application allowing this client to be launched from the command line. 81 */ main(String[] args)82 public static void main(String[] args) throws Exception { 83 final XdsTestServer server = new XdsTestServer(); 84 server.parseArgs(args); 85 Runtime.getRuntime() 86 .addShutdownHook( 87 new Thread() { 88 @Override 89 @SuppressWarnings("CatchAndPrintStackTrace") 90 public void run() { 91 try { 92 System.out.println("Shutting down"); 93 server.stop(); 94 } catch (Exception e) { 95 e.printStackTrace(); 96 } 97 } 98 }); 99 server.start(); 100 System.out.println("Server started on port " + server.port); 101 server.blockUntilShutdown(); 102 } 103 parseArgs(String[] args)104 private void parseArgs(String[] args) { 105 boolean usage = false; 106 for (String arg : args) { 107 if (!arg.startsWith("--")) { 108 System.err.println("All arguments must start with '--': " + arg); 109 usage = true; 110 break; 111 } 112 String[] parts = arg.substring(2).split("=", 2); 113 String key = parts[0]; 114 if ("help".equals(key)) { 115 usage = true; 116 break; 117 } 118 if (parts.length != 2) { 119 System.err.println("All arguments must be of the form --arg=value"); 120 usage = true; 121 break; 122 } 123 String value = parts[1]; 124 if ("port".equals(key)) { 125 port = Integer.valueOf(value); 126 } else if ("maintenance_port".equals(key)) { 127 maintenancePort = Integer.valueOf(value); 128 } else if ("secure_mode".equals(key)) { 129 secureMode = Boolean.parseBoolean(value); 130 } else if ("server_id".equals(key)) { 131 serverId = value; 132 } else { 133 System.err.println("Unknown argument: " + key); 134 usage = true; 135 break; 136 } 137 } 138 139 if (secureMode && (port == maintenancePort)) { 140 System.err.println( 141 "port and maintenance_port should be different for secure mode: port=" 142 + port 143 + ", maintenance_port=" 144 + maintenancePort); 145 usage = true; 146 } 147 148 if (usage) { 149 XdsTestServer s = new XdsTestServer(); 150 System.err.println( 151 "Usage: [ARGS...]" 152 + "\n" 153 + "\n --port=INT listening port for test server." 154 + "\n Default: " 155 + s.port 156 + "\n --maintenance_port=INT listening port for other servers." 157 + "\n Default: " 158 + s.maintenancePort 159 + "\n --secure_mode=BOOLEAN Use true to enable XdsCredentials." 160 + " port and maintenance_port should be different for secure mode." 161 + "\n Default: " 162 + s.secureMode 163 + "\n --server_id=STRING server ID for response." 164 + "\n Default: " 165 + s.serverId); 166 System.exit(1); 167 } 168 } 169 start()170 private void start() throws Exception { 171 try { 172 host = InetAddress.getLocalHost().getHostName(); 173 } catch (UnknownHostException e) { 174 logger.log(Level.SEVERE, "Failed to get host", e); 175 throw new RuntimeException(e); 176 } 177 health = new HealthStatusManager(); 178 if (secureMode) { 179 maintenanceServer = 180 NettyServerBuilder.forPort(maintenancePort) 181 .addService(new XdsUpdateHealthServiceImpl(health)) 182 .addService(health.getHealthService()) 183 .addService(ProtoReflectionService.newInstance()) 184 .addServices(AdminInterface.getStandardServices()) 185 .build(); 186 maintenanceServer.start(); 187 server = 188 XdsServerBuilder.forPort( 189 port, XdsServerCredentials.create(InsecureServerCredentials.create())) 190 .addService( 191 ServerInterceptors.intercept( 192 new TestServiceImpl(serverId, host), new TestInfoInterceptor(host))) 193 .build(); 194 server.start(); 195 } else { 196 server = 197 NettyServerBuilder.forPort(port) 198 .addService( 199 ServerInterceptors.intercept( 200 new TestServiceImpl(serverId, host), new TestInfoInterceptor(host))) 201 .addService(new XdsUpdateHealthServiceImpl(health)) 202 .addService(health.getHealthService()) 203 .addService(ProtoReflectionService.newInstance()) 204 .addServices(AdminInterface.getStandardServices()) 205 .build(); 206 server.start(); 207 maintenanceServer = null; 208 } 209 health.setStatus("", ServingStatus.SERVING); 210 } 211 stop()212 private void stop() throws Exception { 213 server.shutdownNow(); 214 if (maintenanceServer != null) { 215 maintenanceServer.shutdownNow(); 216 } 217 if (!server.awaitTermination(5, TimeUnit.SECONDS)) { 218 System.err.println("Timed out waiting for server shutdown"); 219 } 220 if (maintenanceServer != null && !maintenanceServer.awaitTermination(5, TimeUnit.SECONDS)) { 221 System.err.println("Timed out waiting for maintenanceServer shutdown"); 222 } 223 } 224 blockUntilShutdown()225 private void blockUntilShutdown() throws InterruptedException { 226 if (server != null) { 227 server.awaitTermination(); 228 } 229 if (maintenanceServer != null) { 230 maintenanceServer.awaitTermination(); 231 } 232 } 233 234 private static class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { 235 private final String serverId; 236 private final String host; 237 TestServiceImpl(String serverId, String host)238 private TestServiceImpl(String serverId, String host) { 239 this.serverId = serverId; 240 this.host = host; 241 } 242 243 @Override emptyCall( EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver)244 public void emptyCall( 245 EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) { 246 responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance()); 247 responseObserver.onCompleted(); 248 } 249 250 @Override unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver)251 public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) { 252 responseObserver.onNext( 253 SimpleResponse.newBuilder().setServerId(serverId).setHostname(host).build()); 254 responseObserver.onCompleted(); 255 } 256 } 257 258 private static class XdsUpdateHealthServiceImpl 259 extends XdsUpdateHealthServiceGrpc.XdsUpdateHealthServiceImplBase { 260 private HealthStatusManager health; 261 XdsUpdateHealthServiceImpl(HealthStatusManager health)262 private XdsUpdateHealthServiceImpl(HealthStatusManager health) { 263 this.health = health; 264 } 265 266 @Override setServing( EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver)267 public void setServing( 268 EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) { 269 health.setStatus("", ServingStatus.SERVING); 270 responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance()); 271 responseObserver.onCompleted(); 272 } 273 274 @Override setNotServing( EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver)275 public void setNotServing( 276 EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) { 277 health.setStatus("", ServingStatus.NOT_SERVING); 278 responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance()); 279 responseObserver.onCompleted(); 280 } 281 } 282 283 private static class TestInfoInterceptor implements ServerInterceptor { 284 private final String host; 285 TestInfoInterceptor(String host)286 private TestInfoInterceptor(String host) { 287 this.host = host; 288 } 289 290 @Override interceptCall( ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next)291 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 292 ServerCall<ReqT, RespT> call, 293 final Metadata requestHeaders, 294 ServerCallHandler<ReqT, RespT> next) { 295 List<String> callBehaviors = getCallBehaviors(requestHeaders); 296 ServerCall<ReqT, RespT> newCall = new SimpleForwardingServerCall<ReqT, RespT>(call) { 297 @Override 298 public void sendHeaders(Metadata responseHeaders) { 299 responseHeaders.put(HOSTNAME_KEY, host); 300 super.sendHeaders(responseHeaders); 301 } 302 }; 303 ServerCall.Listener<ReqT> noopListener = new ServerCall.Listener<ReqT>() {}; 304 305 int attemptNum = 0; 306 String attemptNumHeader = requestHeaders.get(ATTEMPT_NUM); 307 if (attemptNumHeader != null) { 308 try { 309 attemptNum = Integer.valueOf(attemptNumHeader); 310 } catch (NumberFormatException e) { 311 newCall.close( 312 Status.INVALID_ARGUMENT.withDescription( 313 "Invalid format for grpc-previous-rpc-attempts header: " + attemptNumHeader), 314 new Metadata()); 315 return noopListener; 316 } 317 } 318 319 for (String callBehavior : callBehaviors) { 320 if (callBehavior.startsWith(CALL_BEHAVIOR_HOSTNAME)) { 321 List<String> splitHeader = HEADER_HOSTNAME_SPLITTER.splitToList(callBehavior); 322 if (splitHeader.size() > 1) { 323 if (!splitHeader.get(0).substring(CALL_BEHAVIOR_HOSTNAME.length()).equals(host)) { 324 continue; 325 } 326 callBehavior = splitHeader.get(1); 327 } else { 328 newCall.close( 329 Status.INVALID_ARGUMENT.withDescription( 330 "Invalid format for rpc-behavior header: " + callBehavior), 331 new Metadata() 332 ); 333 return noopListener; 334 } 335 } 336 337 if (callBehavior.startsWith(CALL_BEHAVIOR_SLEEP_VALUE)) { 338 try { 339 int timeout = Integer.parseInt( 340 callBehavior.substring(CALL_BEHAVIOR_SLEEP_VALUE.length())); 341 Thread.sleep(timeout * 1000L); 342 } catch (NumberFormatException e) { 343 newCall.close( 344 Status.INVALID_ARGUMENT.withDescription( 345 "Invalid format for rpc-behavior header: " + callBehavior), 346 new Metadata()); 347 return noopListener; 348 } catch (InterruptedException e) { 349 Thread.currentThread().interrupt(); 350 newCall.close( 351 Status.ABORTED.withDescription("execution of server interrupted"), 352 new Metadata()); 353 return noopListener; 354 } 355 } 356 357 if (callBehavior.startsWith(CALL_BEHAVIOR_SUCCEED_ON_RETRY_ATTEMPT_VALUE)) { 358 int succeedOnAttemptNum = Integer.MAX_VALUE; 359 try { 360 succeedOnAttemptNum = Integer.parseInt( 361 callBehavior.substring(CALL_BEHAVIOR_SUCCEED_ON_RETRY_ATTEMPT_VALUE.length())); 362 } catch (NumberFormatException e) { 363 newCall.close( 364 Status.INVALID_ARGUMENT.withDescription( 365 "Invalid format for rpc-behavior header: " + callBehavior), 366 new Metadata()); 367 return noopListener; 368 } 369 if (attemptNum == succeedOnAttemptNum) { 370 return next.startCall(newCall, requestHeaders); 371 } 372 } 373 374 // hang if instructed by rpc-behavior 375 if (callBehavior.equals(CALL_BEHAVIOR_KEEP_OPEN_VALUE)) { 376 return noopListener; 377 } 378 379 if (callBehavior.startsWith(CALL_BEHAVIOR_ERROR_CODE)) { 380 try { 381 int codeValue = Integer.valueOf( 382 callBehavior.substring(CALL_BEHAVIOR_ERROR_CODE.length())); 383 newCall.close( 384 Status.fromCodeValue(codeValue).withDescription( 385 "Rpc failed as per the rpc-behavior header value:" + callBehaviors), 386 new Metadata()); 387 return noopListener; 388 } catch (NumberFormatException e) { 389 newCall.close( 390 Status.INVALID_ARGUMENT.withDescription( 391 "Invalid format for rpc-behavior header: " + callBehavior), 392 new Metadata()); 393 return noopListener; 394 } 395 } 396 } 397 398 return next.startCall(newCall, requestHeaders); 399 } 400 } 401 getCallBehaviors(Metadata requestHeaders)402 private static List<String> getCallBehaviors(Metadata requestHeaders) { 403 List<String> callBehaviors = new ArrayList<>(); 404 Iterable<String> values = requestHeaders.getAll(CALL_BEHAVIOR_MD_KEY); 405 if (values == null) { 406 return callBehaviors; 407 } 408 for (String value : values) { 409 Iterables.addAll(callBehaviors, HEADER_VALUE_SPLITTER.split(value)); 410 } 411 return callBehaviors; 412 } 413 } 414