1 /* 2 * Copyright 2022 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.istio; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.CharMatcher; 21 import com.google.common.base.Splitter; 22 import com.google.common.collect.ImmutableMap; 23 import com.google.common.util.concurrent.FutureCallback; 24 import com.google.common.util.concurrent.Futures; 25 import com.google.common.util.concurrent.ListenableFuture; 26 import com.google.common.util.concurrent.MoreExecutors; 27 import io.grpc.ChannelCredentials; 28 import io.grpc.Context; 29 import io.grpc.Contexts; 30 import io.grpc.Grpc; 31 import io.grpc.InsecureChannelCredentials; 32 import io.grpc.InsecureServerCredentials; 33 import io.grpc.ManagedChannel; 34 import io.grpc.ManagedChannelBuilder; 35 import io.grpc.Metadata; 36 import io.grpc.Server; 37 import io.grpc.ServerBuilder; 38 import io.grpc.ServerCall; 39 import io.grpc.ServerCallHandler; 40 import io.grpc.ServerCredentials; 41 import io.grpc.ServerInterceptor; 42 import io.grpc.ServerInterceptors; 43 import io.grpc.ServerServiceDefinition; 44 import io.grpc.Status; 45 import io.grpc.StatusRuntimeException; 46 import io.grpc.TlsServerCredentials; 47 import io.grpc.services.AdminInterface; 48 import io.grpc.stub.MetadataUtils; 49 import io.grpc.stub.StreamObserver; 50 import io.grpc.xds.XdsChannelCredentials; 51 import io.grpc.xds.XdsServerCredentials; 52 import io.istio.test.Echo.EchoRequest; 53 import io.istio.test.Echo.EchoResponse; 54 import io.istio.test.Echo.ForwardEchoRequest; 55 import io.istio.test.Echo.ForwardEchoResponse; 56 import io.istio.test.Echo.Header; 57 import io.istio.test.EchoTestServiceGrpc; 58 import io.istio.test.EchoTestServiceGrpc.EchoTestServiceFutureStub; 59 import io.istio.test.EchoTestServiceGrpc.EchoTestServiceImplBase; 60 import java.io.File; 61 import java.io.IOException; 62 import java.net.InetAddress; 63 import java.net.InetSocketAddress; 64 import java.net.SocketAddress; 65 import java.time.Duration; 66 import java.time.Instant; 67 import java.util.ArrayList; 68 import java.util.Collection; 69 import java.util.HashMap; 70 import java.util.HashSet; 71 import java.util.List; 72 import java.util.Map; 73 import java.util.Random; 74 import java.util.Set; 75 import java.util.concurrent.CountDownLatch; 76 import java.util.concurrent.TimeUnit; 77 import java.util.logging.Level; 78 import java.util.logging.Logger; 79 import javax.annotation.Nullable; 80 81 /** 82 * This class implements the Istio echo server functionality similar to 83 * https://github.com/istio/istio/blob/master/pkg/test/echo/server/endpoint/grpc.go . 84 * Please see Istio framework docs https://github.com/istio/istio/wiki/Istio-Test-Framework . 85 */ 86 public final class EchoTestServer { 87 88 private static final Logger logger = Logger.getLogger(EchoTestServer.class.getName()); 89 90 static final Context.Key<String> CLIENT_ADDRESS_CONTEXT_KEY = 91 Context.key("io.grpc.testing.istio.ClientAddress"); 92 static final Context.Key<String> AUTHORITY_CONTEXT_KEY = 93 Context.key("io.grpc.testing.istio.Authority"); 94 static final Context.Key<Map<String,String>> REQUEST_HEADERS_CONTEXT_KEY = 95 Context.key("io.grpc.testing.istio.RequestHeaders"); 96 97 private static final String REQUEST_ID = "x-request-id"; 98 private static final String STATUS_CODE = "StatusCode"; 99 private static final String HOST = "Host"; 100 private static final String HOSTNAME = "Hostname"; 101 private static final String REQUEST_HEADER = "RequestHeader"; 102 private static final String IP = "IP"; 103 104 @VisibleForTesting List<Server> servers; 105 106 /** 107 * Preprocess args, for: 108 * - merging duplicate flags. So "--grpc=8080 --grpc=9090" becomes 109 * "--grpc=8080,9090". 110 **/ 111 @VisibleForTesting preprocessArgs(String[] args)112 static Map<String, List<String>> preprocessArgs(String[] args) { 113 HashMap<String, List<String>> argsMap = new HashMap<>(); 114 for (String arg : args) { 115 List<String> keyValue = Splitter.on('=').limit(2).splitToList(arg); 116 117 if (keyValue.size() == 2) { 118 String key = keyValue.get(0); 119 String value = keyValue.get(1); 120 List<String> oldValue = argsMap.get(key); 121 if (oldValue == null) { 122 oldValue = new ArrayList<>(); 123 } 124 oldValue.add(value); 125 argsMap.put(key, oldValue); 126 } 127 } 128 return ImmutableMap.<String, List<String>>builder().putAll(argsMap).build(); 129 } 130 131 /** Turn ports from a string list to an int list. */ 132 @VisibleForTesting getPorts(Map<String, List<String>> args, String flagName)133 static Set<Integer> getPorts(Map<String, List<String>> args, String flagName) { 134 List<String> grpcPorts = args.get(flagName); 135 Set<Integer> grpcPortsInt = new HashSet<>(grpcPorts.size()); 136 137 for (String port : grpcPorts) { 138 port = CharMatcher.is('\"').trimFrom(port); 139 grpcPortsInt.add(Integer.parseInt(port)); 140 } 141 return grpcPortsInt; 142 } 143 determineHostname()144 private static String determineHostname() { 145 try { 146 return InetAddress.getLocalHost().getHostName(); 147 } catch (IOException ex) { 148 logger.log(Level.INFO, "Failed to determine hostname. Will generate one", ex); 149 } 150 // let's make an identifier for ourselves. 151 return "generated-" + new Random().nextInt(); 152 } 153 154 /** 155 * The main application allowing this program to be launched from the command line. 156 */ main(String[] args)157 public static void main(String[] args) throws Exception { 158 Map<String, List<String>> processedArgs = preprocessArgs(args); 159 Set<Integer> grpcPorts = getPorts(processedArgs, "--grpc"); 160 Set<Integer> xdsPorts = getPorts(processedArgs, "--xds-grpc-server"); 161 // If an xds port does not exist in gRPC ports set, add it. 162 grpcPorts.addAll(xdsPorts); 163 // which ports are supposed to use tls 164 Set<Integer> tlsPorts = getPorts(processedArgs, "--tls"); 165 List<String> forwardingAddress = processedArgs.get("--forwarding-address"); 166 if (forwardingAddress.size() > 1) { 167 logger.severe("More than one value for --forwarding-address not allowed"); 168 System.exit(1); 169 } 170 if (forwardingAddress.size() == 0) { 171 forwardingAddress.add("0.0.0.0:7072"); 172 } 173 List<String> key = processedArgs.get("key"); 174 List<String> crt = processedArgs.get("crt"); 175 176 if (key.size() > 1 || crt.size() > 1) { 177 logger.severe("More than one value for --key or --crt not allowed"); 178 System.exit(1); 179 } 180 if (key.size() != crt.size()) { 181 logger.severe("Both --key or --crt should be present or absent"); 182 System.exit(1); 183 } 184 ServerCredentials tlsServerCredentials = null; 185 if (key.size() == 1) { 186 tlsServerCredentials = TlsServerCredentials.create(new File(crt.get(0)), 187 new File(key.get(0))); 188 } else if (!tlsPorts.isEmpty()) { 189 logger.severe("Both --key or --crt should be present if tls ports used"); 190 System.exit(1); 191 } 192 193 String hostname = determineHostname(); 194 EchoTestServer echoTestServer = new EchoTestServer(); 195 echoTestServer.runServers(hostname, grpcPorts, xdsPorts, tlsPorts, forwardingAddress.get(0), 196 tlsServerCredentials); 197 Runtime.getRuntime().addShutdownHook(new Thread(() -> { 198 try { 199 System.out.println("Shutting down"); 200 echoTestServer.stopServers(); 201 } catch (Exception e) { 202 logger.log(Level.SEVERE, "stopServers", e); 203 throw e; 204 } 205 })); 206 echoTestServer.blockUntilShutdown(); 207 } 208 runServers(String hostname, Collection<Integer> grpcPorts, Collection<Integer> xdsPorts, Collection<Integer> tlsPorts, String forwardingAddress, ServerCredentials tlsServerCredentials)209 void runServers(String hostname, Collection<Integer> grpcPorts, Collection<Integer> xdsPorts, 210 Collection<Integer> tlsPorts, String forwardingAddress, 211 ServerCredentials tlsServerCredentials) 212 throws IOException { 213 ServerServiceDefinition service = ServerInterceptors.intercept( 214 new EchoTestServiceImpl(hostname, forwardingAddress), new EchoTestServerInterceptor()); 215 servers = new ArrayList<>(grpcPorts.size() + 1); 216 boolean runAdminServices = Boolean.getBoolean("EXPOSE_GRPC_ADMIN"); 217 for (int port : grpcPorts) { 218 ServerCredentials serverCredentials = InsecureServerCredentials.create(); 219 String credTypeString = "over plaintext"; 220 if (xdsPorts.contains(port)) { 221 serverCredentials = XdsServerCredentials.create(InsecureServerCredentials.create()); 222 credTypeString = "over xDS-configured mTLS"; 223 } else if (tlsPorts.contains(port)) { 224 serverCredentials = tlsServerCredentials; 225 credTypeString = "over TLS"; 226 } 227 servers.add(runServer(port, service, serverCredentials, credTypeString, runAdminServices)); 228 } 229 } 230 runServer( int port, ServerServiceDefinition service, ServerCredentials serverCredentials, String credTypeString, boolean runAdminServices)231 static Server runServer( 232 int port, ServerServiceDefinition service, ServerCredentials serverCredentials, 233 String credTypeString, boolean runAdminServices) 234 throws IOException { 235 logger.log(Level.INFO, "Listening GRPC ({0}) on {1}", new Object[]{credTypeString, port}); 236 ServerBuilder<?> builder = Grpc.newServerBuilderForPort(port, serverCredentials) 237 .addService(service); 238 if (runAdminServices) { 239 builder = builder.addServices(AdminInterface.getStandardServices()); 240 } 241 return builder.build().start(); 242 } 243 stopServers()244 void stopServers() { 245 for (Server server : servers) { 246 server.shutdownNow(); 247 } 248 } 249 blockUntilShutdown()250 void blockUntilShutdown() throws InterruptedException { 251 for (Server server : servers) { 252 if (!server.awaitTermination(5, TimeUnit.SECONDS)) { 253 System.err.println("Timed out waiting for server shutdown"); 254 } 255 } 256 } 257 258 private static class EchoTestServerInterceptor implements ServerInterceptor { 259 260 @Override interceptCall(ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next)261 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 262 final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { 263 final String methodName = call.getMethodDescriptor().getBareMethodName(); 264 265 // we need this processing only for Echo 266 if (!"Echo".equals(methodName)) { 267 return next.startCall(call, requestHeaders); 268 } 269 final SocketAddress peerAddress = call.getAttributes() 270 .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 271 272 Context ctx = Context.current(); 273 if (peerAddress instanceof InetSocketAddress) { 274 InetSocketAddress inetPeerAddress = (InetSocketAddress) peerAddress; 275 ctx = ctx.withValue(CLIENT_ADDRESS_CONTEXT_KEY, 276 inetPeerAddress.getAddress().getHostAddress()); 277 } 278 ctx = ctx.withValue(AUTHORITY_CONTEXT_KEY, call.getAuthority()); 279 Map<String, String> requestHeadersCopy = new HashMap<>(); 280 for (String key : requestHeaders.keys()) { 281 if (!key.endsWith("-bin")) { 282 requestHeadersCopy.put(key, 283 requestHeaders.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))); 284 } 285 } 286 ctx = ctx.withValue(REQUEST_HEADERS_CONTEXT_KEY, requestHeadersCopy); 287 return Contexts.interceptCall( 288 ctx, 289 call, 290 requestHeaders, 291 next); 292 } 293 } 294 295 private static class EchoTestServiceImpl extends EchoTestServiceImplBase { 296 297 private final String hostname; 298 private final String forwardingAddress; 299 private final EchoTestServiceGrpc.EchoTestServiceBlockingStub forwardingStub; 300 EchoTestServiceImpl(String hostname, String forwardingAddress)301 EchoTestServiceImpl(String hostname, String forwardingAddress) { 302 this.hostname = hostname; 303 this.forwardingAddress = forwardingAddress; 304 this.forwardingStub = EchoTestServiceGrpc.newBlockingStub( 305 Grpc.newChannelBuilder(forwardingAddress, InsecureChannelCredentials.create()).build()); 306 } 307 308 @Override echo(EchoRequest request, io.grpc.stub.StreamObserver<EchoResponse> responseObserver)309 public void echo(EchoRequest request, 310 io.grpc.stub.StreamObserver<EchoResponse> responseObserver) { 311 312 EchoMessage echoMessage = new EchoMessage(); 313 echoMessage.writeKeyValue(HOSTNAME, hostname); 314 echoMessage.writeKeyValue("Echo", request.getMessage()); 315 String clientAddress = CLIENT_ADDRESS_CONTEXT_KEY.get(); 316 if (clientAddress != null) { 317 echoMessage.writeKeyValue(IP, clientAddress); 318 } 319 Map<String, String> requestHeadersCopy = REQUEST_HEADERS_CONTEXT_KEY.get(); 320 for (Map.Entry<String, String> entry : requestHeadersCopy.entrySet()) { 321 echoMessage.writeKeyValueForRequest(REQUEST_HEADER, entry.getKey(), entry.getValue()); 322 } 323 echoMessage.writeKeyValue(STATUS_CODE, "200"); 324 echoMessage.writeKeyValue(HOST, AUTHORITY_CONTEXT_KEY.get()); 325 EchoResponse echoResponse = EchoResponse.newBuilder() 326 .setMessage(echoMessage.toString()) 327 .build(); 328 329 responseObserver.onNext(echoResponse); 330 responseObserver.onCompleted(); 331 } 332 333 @Override forwardEcho(ForwardEchoRequest request, StreamObserver<ForwardEchoResponse> responseObserver)334 public void forwardEcho(ForwardEchoRequest request, 335 StreamObserver<ForwardEchoResponse> responseObserver) { 336 try { 337 responseObserver.onNext(buildEchoResponse(request)); 338 responseObserver.onCompleted(); 339 } catch (InterruptedException e) { 340 responseObserver.onError(e); 341 Thread.currentThread().interrupt(); 342 } catch (Exception e) { 343 responseObserver.onError(e); 344 } 345 } 346 347 private static final class EchoCall { 348 EchoResponse response; 349 Status status; 350 } 351 buildEchoResponse(ForwardEchoRequest request)352 private ForwardEchoResponse buildEchoResponse(ForwardEchoRequest request) 353 throws InterruptedException { 354 ForwardEchoResponse.Builder forwardEchoResponseBuilder 355 = ForwardEchoResponse.newBuilder(); 356 String rawUrl = request.getUrl(); 357 List<String> urlParts = Splitter.on(':').limit(2).splitToList(rawUrl); 358 if (urlParts.size() < 2) { 359 throw new StatusRuntimeException( 360 Status.INVALID_ARGUMENT.withDescription("No protocol configured for url " + rawUrl)); 361 } 362 ChannelCredentials creds; 363 String target = null; 364 if ("grpc".equals(urlParts.get(0))) { 365 // We don't really want to test this but the istio test infrastructure needs 366 // this to be supported. If we ever decide to add support for this properly, 367 // we would need to add support for TLS creds here. 368 creds = InsecureChannelCredentials.create(); 369 target = urlParts.get(1).substring(2); 370 } else if ("xds".equals(urlParts.get(0))) { 371 creds = XdsChannelCredentials.create(InsecureChannelCredentials.create()); 372 target = rawUrl; 373 } else { 374 logger.log(Level.INFO, "Protocol {0} not supported. Forwarding to {1}", 375 new String[]{urlParts.get(0), forwardingAddress}); 376 return forwardingStub.withDeadline(Context.current().getDeadline()).forwardEcho(request); 377 } 378 379 ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilder(target, creds); 380 ManagedChannel channel = channelBuilder.build(); 381 382 List<Header> requestHeaders = request.getHeadersList(); 383 Metadata metadata = new Metadata(); 384 385 for (Header header : requestHeaders) { 386 metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), 387 header.getValue()); 388 } 389 390 int count = request.getCount() == 0 ? 1 : request.getCount(); 391 // Calculate the amount of time to sleep after each call. 392 Duration durationPerQuery = Duration.ZERO; 393 if (request.getQps() > 0) { 394 durationPerQuery = Duration.ofNanos( 395 Duration.ofSeconds(1).toNanos() / request.getQps()); 396 } 397 logger.info("qps=" + request.getQps()); 398 logger.info("durationPerQuery=" + durationPerQuery); 399 EchoRequest echoRequest = EchoRequest.newBuilder() 400 .setMessage(request.getMessage()) 401 .build(); 402 Instant start = Instant.now(); 403 logger.info("starting instant=" + start); 404 Duration expected = Duration.ZERO; 405 final CountDownLatch latch = new CountDownLatch(count); 406 EchoCall[] echoCalls = new EchoCall[count]; 407 for (int i = 0; i < count; i++) { 408 Metadata currentMetadata = new Metadata(); 409 currentMetadata.merge(metadata); 410 currentMetadata.put( 411 Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER), "" + i); 412 EchoTestServiceGrpc.EchoTestServiceFutureStub stub 413 = EchoTestServiceGrpc.newFutureStub(channel).withInterceptors( 414 MetadataUtils.newAttachHeadersInterceptor(currentMetadata)) 415 .withDeadlineAfter(request.getTimeoutMicros(), TimeUnit.MICROSECONDS); 416 417 echoCalls[i] = new EchoCall(); 418 callEcho(stub, echoRequest, echoCalls[i], latch); 419 Instant current = Instant.now(); 420 logger.info("after rpc instant=" + current); 421 Duration elapsed = Duration.between(start, current); 422 expected = expected.plus(durationPerQuery); 423 Duration timeLeft = expected.minus(elapsed); 424 logger.info("elapsed=" + elapsed + ", expected=" + expected + ", timeLeft=" + timeLeft); 425 if (!timeLeft.isNegative()) { 426 logger.info("sleeping for ms =" + timeLeft); 427 Thread.sleep(timeLeft.toMillis()); 428 } 429 } 430 latch.await(); 431 for (int i = 0; i < count; i++) { 432 if (Status.OK.equals(echoCalls[i].status)) { 433 forwardEchoResponseBuilder.addOutput( 434 buildForwardEchoStruct(i, echoCalls, request.getMessage())); 435 } else { 436 logger.log(Level.SEVERE, "RPC {0} failed {1}: {2}", 437 new Object[]{i, echoCalls[i].status.getCode(), echoCalls[i].status.getDescription()}); 438 forwardEchoResponseBuilder.clear(); 439 throw echoCalls[i].status.asRuntimeException(); 440 } 441 } 442 return forwardEchoResponseBuilder.build(); 443 } 444 buildForwardEchoStruct(int i, EchoCall[] echoCalls, String requestMessage)445 private static String buildForwardEchoStruct(int i, EchoCall[] echoCalls, 446 String requestMessage) { 447 // The test infrastructure might expect the entire struct instead of 448 // just the message. 449 StringBuilder sb = new StringBuilder(); 450 sb.append(String.format("[%d] grpcecho.Echo(%s)\n", i, requestMessage)); 451 Iterable<String> iterable = Splitter.on('\n').split(echoCalls[i].response.getMessage()); 452 for (String line : iterable) { 453 if (!line.isEmpty()) { 454 sb.append(String.format("[%d body] %s\n", i, line)); 455 } 456 } 457 return sb.toString(); 458 } 459 callEcho(EchoTestServiceFutureStub stub, EchoRequest echoRequest, final EchoCall echoCall, CountDownLatch latch)460 private void callEcho(EchoTestServiceFutureStub stub, 461 EchoRequest echoRequest, final EchoCall echoCall, CountDownLatch latch) { 462 463 ListenableFuture<EchoResponse> response = stub.echo(echoRequest); 464 Futures.addCallback( 465 response, 466 new FutureCallback<EchoResponse>() { 467 @Override 468 public void onSuccess(@Nullable EchoResponse result) { 469 echoCall.response = result; 470 echoCall.status = Status.OK; 471 latch.countDown(); 472 } 473 474 @Override 475 public void onFailure(Throwable t) { 476 echoCall.status = Status.fromThrowable(t); 477 latch.countDown(); 478 } 479 }, 480 MoreExecutors.directExecutor()); 481 } 482 } 483 484 private static class EchoMessage { 485 private final StringBuilder sb = new StringBuilder(); 486 writeKeyValue(String key, String value)487 void writeKeyValue(String key, String value) { 488 sb.append(key).append("=").append(value).append("\n"); 489 } 490 writeKeyValueForRequest(String requestHeader, String key, String value)491 void writeKeyValueForRequest(String requestHeader, String key, String value) { 492 if (value != null) { 493 writeKeyValue(requestHeader, key + ":" + value); 494 } 495 } 496 497 @Override toString()498 public String toString() { 499 return sb.toString(); 500 } 501 } 502 } 503