• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.istio;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.CharMatcher;
21 import com.google.common.base.Splitter;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import io.grpc.ChannelCredentials;
28 import io.grpc.Context;
29 import io.grpc.Contexts;
30 import io.grpc.Grpc;
31 import io.grpc.InsecureChannelCredentials;
32 import io.grpc.InsecureServerCredentials;
33 import io.grpc.ManagedChannel;
34 import io.grpc.ManagedChannelBuilder;
35 import io.grpc.Metadata;
36 import io.grpc.Server;
37 import io.grpc.ServerBuilder;
38 import io.grpc.ServerCall;
39 import io.grpc.ServerCallHandler;
40 import io.grpc.ServerCredentials;
41 import io.grpc.ServerInterceptor;
42 import io.grpc.ServerInterceptors;
43 import io.grpc.ServerServiceDefinition;
44 import io.grpc.Status;
45 import io.grpc.StatusRuntimeException;
46 import io.grpc.TlsServerCredentials;
47 import io.grpc.services.AdminInterface;
48 import io.grpc.stub.MetadataUtils;
49 import io.grpc.stub.StreamObserver;
50 import io.grpc.xds.XdsChannelCredentials;
51 import io.grpc.xds.XdsServerCredentials;
52 import io.istio.test.Echo.EchoRequest;
53 import io.istio.test.Echo.EchoResponse;
54 import io.istio.test.Echo.ForwardEchoRequest;
55 import io.istio.test.Echo.ForwardEchoResponse;
56 import io.istio.test.Echo.Header;
57 import io.istio.test.EchoTestServiceGrpc;
58 import io.istio.test.EchoTestServiceGrpc.EchoTestServiceFutureStub;
59 import io.istio.test.EchoTestServiceGrpc.EchoTestServiceImplBase;
60 import java.io.File;
61 import java.io.IOException;
62 import java.net.InetAddress;
63 import java.net.InetSocketAddress;
64 import java.net.SocketAddress;
65 import java.time.Duration;
66 import java.time.Instant;
67 import java.util.ArrayList;
68 import java.util.Collection;
69 import java.util.HashMap;
70 import java.util.HashSet;
71 import java.util.List;
72 import java.util.Map;
73 import java.util.Random;
74 import java.util.Set;
75 import java.util.concurrent.CountDownLatch;
76 import java.util.concurrent.TimeUnit;
77 import java.util.logging.Level;
78 import java.util.logging.Logger;
79 import javax.annotation.Nullable;
80 
81 /**
82  * This class implements the Istio echo server functionality similar to
83  * https://github.com/istio/istio/blob/master/pkg/test/echo/server/endpoint/grpc.go .
84  * Please see Istio framework docs https://github.com/istio/istio/wiki/Istio-Test-Framework .
85  */
86 public final class EchoTestServer {
87 
88   private static final Logger logger = Logger.getLogger(EchoTestServer.class.getName());
89 
90   static final Context.Key<String> CLIENT_ADDRESS_CONTEXT_KEY =
91       Context.key("io.grpc.testing.istio.ClientAddress");
92   static final Context.Key<String> AUTHORITY_CONTEXT_KEY =
93       Context.key("io.grpc.testing.istio.Authority");
94   static final Context.Key<Map<String,String>> REQUEST_HEADERS_CONTEXT_KEY =
95       Context.key("io.grpc.testing.istio.RequestHeaders");
96 
97   private static final String REQUEST_ID = "x-request-id";
98   private static final String STATUS_CODE = "StatusCode";
99   private static final String HOST = "Host";
100   private static final String HOSTNAME = "Hostname";
101   private static final String REQUEST_HEADER = "RequestHeader";
102   private static final String IP = "IP";
103 
104   @VisibleForTesting List<Server> servers;
105 
106   /**
107    * Preprocess args, for:
108    * - merging duplicate flags. So "--grpc=8080 --grpc=9090" becomes
109    * "--grpc=8080,9090".
110    **/
111   @VisibleForTesting
preprocessArgs(String[] args)112   static Map<String, List<String>> preprocessArgs(String[] args) {
113     HashMap<String, List<String>> argsMap = new HashMap<>();
114     for (String arg : args) {
115       List<String> keyValue = Splitter.on('=').limit(2).splitToList(arg);
116 
117       if (keyValue.size() == 2) {
118         String key = keyValue.get(0);
119         String value = keyValue.get(1);
120         List<String> oldValue = argsMap.get(key);
121         if (oldValue == null) {
122           oldValue = new ArrayList<>();
123         }
124         oldValue.add(value);
125         argsMap.put(key, oldValue);
126       }
127     }
128     return ImmutableMap.<String, List<String>>builder().putAll(argsMap).build();
129   }
130 
131   /** Turn ports from a string list to an int list. */
132   @VisibleForTesting
getPorts(Map<String, List<String>> args, String flagName)133   static Set<Integer> getPorts(Map<String, List<String>> args, String flagName) {
134     List<String> grpcPorts = args.get(flagName);
135     Set<Integer> grpcPortsInt = new HashSet<>(grpcPorts.size());
136 
137     for (String port : grpcPorts) {
138       port = CharMatcher.is('\"').trimFrom(port);
139       grpcPortsInt.add(Integer.parseInt(port));
140     }
141     return grpcPortsInt;
142   }
143 
determineHostname()144   private static String determineHostname() {
145     try {
146       return InetAddress.getLocalHost().getHostName();
147     } catch (IOException ex) {
148       logger.log(Level.INFO, "Failed to determine hostname. Will generate one", ex);
149     }
150     // let's make an identifier for ourselves.
151     return "generated-" + new Random().nextInt();
152   }
153 
154   /**
155    * The main application allowing this program to be launched from the command line.
156    */
main(String[] args)157   public static void main(String[] args) throws Exception {
158     Map<String, List<String>> processedArgs = preprocessArgs(args);
159     Set<Integer> grpcPorts = getPorts(processedArgs, "--grpc");
160     Set<Integer> xdsPorts = getPorts(processedArgs, "--xds-grpc-server");
161     // If an xds port does not exist in gRPC ports set, add it.
162     grpcPorts.addAll(xdsPorts);
163     // which ports are supposed to use tls
164     Set<Integer> tlsPorts = getPorts(processedArgs, "--tls");
165     List<String> forwardingAddress = processedArgs.get("--forwarding-address");
166     if (forwardingAddress.size() > 1) {
167       logger.severe("More than one value for --forwarding-address not allowed");
168       System.exit(1);
169     }
170     if (forwardingAddress.size() == 0) {
171       forwardingAddress.add("0.0.0.0:7072");
172     }
173     List<String> key = processedArgs.get("key");
174     List<String> crt = processedArgs.get("crt");
175 
176     if (key.size() > 1 || crt.size() > 1) {
177       logger.severe("More than one value for --key or --crt not allowed");
178       System.exit(1);
179     }
180     if (key.size() != crt.size()) {
181       logger.severe("Both --key or --crt should be present or absent");
182       System.exit(1);
183     }
184     ServerCredentials tlsServerCredentials = null;
185     if (key.size() == 1) {
186       tlsServerCredentials = TlsServerCredentials.create(new File(crt.get(0)),
187           new File(key.get(0)));
188     } else if (!tlsPorts.isEmpty()) {
189       logger.severe("Both --key or --crt should be present if tls ports used");
190       System.exit(1);
191     }
192 
193     String hostname = determineHostname();
194     EchoTestServer echoTestServer = new EchoTestServer();
195     echoTestServer.runServers(hostname, grpcPorts, xdsPorts, tlsPorts, forwardingAddress.get(0),
196         tlsServerCredentials);
197     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
198       try {
199         System.out.println("Shutting down");
200         echoTestServer.stopServers();
201       } catch (Exception e) {
202         logger.log(Level.SEVERE, "stopServers", e);
203         throw e;
204       }
205     }));
206     echoTestServer.blockUntilShutdown();
207   }
208 
runServers(String hostname, Collection<Integer> grpcPorts, Collection<Integer> xdsPorts, Collection<Integer> tlsPorts, String forwardingAddress, ServerCredentials tlsServerCredentials)209   void runServers(String hostname, Collection<Integer> grpcPorts, Collection<Integer> xdsPorts,
210       Collection<Integer> tlsPorts, String forwardingAddress,
211       ServerCredentials tlsServerCredentials)
212       throws IOException {
213     ServerServiceDefinition service = ServerInterceptors.intercept(
214         new EchoTestServiceImpl(hostname, forwardingAddress), new EchoTestServerInterceptor());
215     servers = new ArrayList<>(grpcPorts.size() + 1);
216     boolean runAdminServices = Boolean.getBoolean("EXPOSE_GRPC_ADMIN");
217     for (int port : grpcPorts) {
218       ServerCredentials serverCredentials = InsecureServerCredentials.create();
219       String credTypeString = "over plaintext";
220       if (xdsPorts.contains(port)) {
221         serverCredentials = XdsServerCredentials.create(InsecureServerCredentials.create());
222         credTypeString = "over xDS-configured mTLS";
223       } else if (tlsPorts.contains(port)) {
224         serverCredentials = tlsServerCredentials;
225         credTypeString = "over TLS";
226       }
227       servers.add(runServer(port, service, serverCredentials, credTypeString, runAdminServices));
228     }
229   }
230 
runServer( int port, ServerServiceDefinition service, ServerCredentials serverCredentials, String credTypeString, boolean runAdminServices)231   static Server runServer(
232       int port, ServerServiceDefinition service, ServerCredentials serverCredentials,
233       String credTypeString, boolean runAdminServices)
234       throws IOException {
235     logger.log(Level.INFO, "Listening GRPC ({0}) on {1}", new Object[]{credTypeString, port});
236     ServerBuilder<?> builder = Grpc.newServerBuilderForPort(port, serverCredentials)
237         .addService(service);
238     if (runAdminServices) {
239       builder = builder.addServices(AdminInterface.getStandardServices());
240     }
241     return builder.build().start();
242   }
243 
stopServers()244   void stopServers() {
245     for (Server server : servers) {
246       server.shutdownNow();
247     }
248   }
249 
blockUntilShutdown()250   void blockUntilShutdown() throws InterruptedException {
251     for (Server server : servers) {
252       if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
253         System.err.println("Timed out waiting for server shutdown");
254       }
255     }
256   }
257 
258   private static class EchoTestServerInterceptor implements ServerInterceptor {
259 
260     @Override
interceptCall(ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next)261     public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
262         final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) {
263       final String methodName = call.getMethodDescriptor().getBareMethodName();
264 
265       // we need this processing only for Echo
266       if (!"Echo".equals(methodName)) {
267         return next.startCall(call, requestHeaders);
268       }
269       final SocketAddress peerAddress = call.getAttributes()
270           .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
271 
272       Context ctx = Context.current();
273       if (peerAddress instanceof InetSocketAddress) {
274         InetSocketAddress inetPeerAddress = (InetSocketAddress) peerAddress;
275         ctx = ctx.withValue(CLIENT_ADDRESS_CONTEXT_KEY,
276             inetPeerAddress.getAddress().getHostAddress());
277       }
278       ctx = ctx.withValue(AUTHORITY_CONTEXT_KEY, call.getAuthority());
279       Map<String, String> requestHeadersCopy = new HashMap<>();
280       for (String key : requestHeaders.keys()) {
281         if (!key.endsWith("-bin")) {
282           requestHeadersCopy.put(key,
283               requestHeaders.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)));
284         }
285       }
286       ctx = ctx.withValue(REQUEST_HEADERS_CONTEXT_KEY, requestHeadersCopy);
287       return Contexts.interceptCall(
288           ctx,
289           call,
290           requestHeaders,
291           next);
292     }
293   }
294 
295   private static class EchoTestServiceImpl extends EchoTestServiceImplBase {
296 
297     private final String hostname;
298     private final String forwardingAddress;
299     private final EchoTestServiceGrpc.EchoTestServiceBlockingStub forwardingStub;
300 
EchoTestServiceImpl(String hostname, String forwardingAddress)301     EchoTestServiceImpl(String hostname, String forwardingAddress) {
302       this.hostname = hostname;
303       this.forwardingAddress = forwardingAddress;
304       this.forwardingStub = EchoTestServiceGrpc.newBlockingStub(
305           Grpc.newChannelBuilder(forwardingAddress, InsecureChannelCredentials.create()).build());
306     }
307 
308     @Override
echo(EchoRequest request, io.grpc.stub.StreamObserver<EchoResponse> responseObserver)309     public void echo(EchoRequest request,
310         io.grpc.stub.StreamObserver<EchoResponse> responseObserver) {
311 
312       EchoMessage echoMessage = new EchoMessage();
313       echoMessage.writeKeyValue(HOSTNAME, hostname);
314       echoMessage.writeKeyValue("Echo", request.getMessage());
315       String clientAddress = CLIENT_ADDRESS_CONTEXT_KEY.get();
316       if (clientAddress != null) {
317         echoMessage.writeKeyValue(IP, clientAddress);
318       }
319       Map<String, String> requestHeadersCopy = REQUEST_HEADERS_CONTEXT_KEY.get();
320       for (Map.Entry<String, String> entry : requestHeadersCopy.entrySet()) {
321         echoMessage.writeKeyValueForRequest(REQUEST_HEADER, entry.getKey(), entry.getValue());
322       }
323       echoMessage.writeKeyValue(STATUS_CODE, "200");
324       echoMessage.writeKeyValue(HOST, AUTHORITY_CONTEXT_KEY.get());
325       EchoResponse echoResponse = EchoResponse.newBuilder()
326           .setMessage(echoMessage.toString())
327           .build();
328 
329       responseObserver.onNext(echoResponse);
330       responseObserver.onCompleted();
331     }
332 
333     @Override
forwardEcho(ForwardEchoRequest request, StreamObserver<ForwardEchoResponse> responseObserver)334     public void forwardEcho(ForwardEchoRequest request,
335         StreamObserver<ForwardEchoResponse> responseObserver) {
336       try {
337         responseObserver.onNext(buildEchoResponse(request));
338         responseObserver.onCompleted();
339       } catch (InterruptedException e) {
340         responseObserver.onError(e);
341         Thread.currentThread().interrupt();
342       } catch (Exception e) {
343         responseObserver.onError(e);
344       }
345     }
346 
347     private static final class EchoCall {
348       EchoResponse response;
349       Status status;
350     }
351 
buildEchoResponse(ForwardEchoRequest request)352     private ForwardEchoResponse buildEchoResponse(ForwardEchoRequest request)
353         throws InterruptedException {
354       ForwardEchoResponse.Builder forwardEchoResponseBuilder
355           = ForwardEchoResponse.newBuilder();
356       String rawUrl = request.getUrl();
357       List<String> urlParts = Splitter.on(':').limit(2).splitToList(rawUrl);
358       if (urlParts.size() < 2) {
359         throw new StatusRuntimeException(
360             Status.INVALID_ARGUMENT.withDescription("No protocol configured for url " + rawUrl));
361       }
362       ChannelCredentials creds;
363       String target = null;
364       if ("grpc".equals(urlParts.get(0))) {
365         // We don't really want to test this but the istio test infrastructure needs
366         // this to be supported. If we ever decide to add support for this properly,
367         // we would need to add support for TLS creds here.
368         creds = InsecureChannelCredentials.create();
369         target = urlParts.get(1).substring(2);
370       } else if ("xds".equals(urlParts.get(0))) {
371         creds = XdsChannelCredentials.create(InsecureChannelCredentials.create());
372         target = rawUrl;
373       } else {
374         logger.log(Level.INFO, "Protocol {0} not supported. Forwarding to {1}",
375             new String[]{urlParts.get(0), forwardingAddress});
376         return forwardingStub.withDeadline(Context.current().getDeadline()).forwardEcho(request);
377       }
378 
379       ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilder(target, creds);
380       ManagedChannel channel = channelBuilder.build();
381 
382       List<Header> requestHeaders = request.getHeadersList();
383       Metadata metadata = new Metadata();
384 
385       for (Header header : requestHeaders) {
386         metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER),
387             header.getValue());
388       }
389 
390       int count = request.getCount() == 0 ? 1 : request.getCount();
391       // Calculate the amount of time to sleep after each call.
392       Duration durationPerQuery = Duration.ZERO;
393       if (request.getQps() > 0) {
394         durationPerQuery = Duration.ofNanos(
395             Duration.ofSeconds(1).toNanos() / request.getQps());
396       }
397       logger.info("qps=" + request.getQps());
398       logger.info("durationPerQuery=" + durationPerQuery);
399       EchoRequest echoRequest = EchoRequest.newBuilder()
400           .setMessage(request.getMessage())
401           .build();
402       Instant start = Instant.now();
403       logger.info("starting instant=" + start);
404       Duration expected = Duration.ZERO;
405       final CountDownLatch latch = new CountDownLatch(count);
406       EchoCall[] echoCalls = new EchoCall[count];
407       for (int i = 0; i < count; i++) {
408         Metadata currentMetadata = new Metadata();
409         currentMetadata.merge(metadata);
410         currentMetadata.put(
411             Metadata.Key.of(REQUEST_ID, Metadata.ASCII_STRING_MARSHALLER), "" + i);
412         EchoTestServiceGrpc.EchoTestServiceFutureStub stub
413             = EchoTestServiceGrpc.newFutureStub(channel).withInterceptors(
414                 MetadataUtils.newAttachHeadersInterceptor(currentMetadata))
415             .withDeadlineAfter(request.getTimeoutMicros(), TimeUnit.MICROSECONDS);
416 
417         echoCalls[i] = new EchoCall();
418         callEcho(stub, echoRequest, echoCalls[i], latch);
419         Instant current = Instant.now();
420         logger.info("after rpc instant=" + current);
421         Duration elapsed = Duration.between(start, current);
422         expected = expected.plus(durationPerQuery);
423         Duration timeLeft = expected.minus(elapsed);
424         logger.info("elapsed=" + elapsed + ", expected=" + expected + ", timeLeft=" + timeLeft);
425         if (!timeLeft.isNegative()) {
426           logger.info("sleeping for ms =" + timeLeft);
427           Thread.sleep(timeLeft.toMillis());
428         }
429       }
430       latch.await();
431       for (int i = 0; i < count; i++) {
432         if (Status.OK.equals(echoCalls[i].status)) {
433           forwardEchoResponseBuilder.addOutput(
434               buildForwardEchoStruct(i, echoCalls, request.getMessage()));
435         } else {
436           logger.log(Level.SEVERE, "RPC {0} failed {1}: {2}",
437               new Object[]{i, echoCalls[i].status.getCode(), echoCalls[i].status.getDescription()});
438           forwardEchoResponseBuilder.clear();
439           throw echoCalls[i].status.asRuntimeException();
440         }
441       }
442       return forwardEchoResponseBuilder.build();
443     }
444 
buildForwardEchoStruct(int i, EchoCall[] echoCalls, String requestMessage)445     private static String buildForwardEchoStruct(int i, EchoCall[] echoCalls,
446         String requestMessage) {
447       // The test infrastructure might expect the entire struct instead of
448       // just the message.
449       StringBuilder sb = new StringBuilder();
450       sb.append(String.format("[%d] grpcecho.Echo(%s)\n", i, requestMessage));
451       Iterable<String> iterable = Splitter.on('\n').split(echoCalls[i].response.getMessage());
452       for (String line : iterable) {
453         if (!line.isEmpty()) {
454           sb.append(String.format("[%d body] %s\n", i, line));
455         }
456       }
457       return sb.toString();
458     }
459 
callEcho(EchoTestServiceFutureStub stub, EchoRequest echoRequest, final EchoCall echoCall, CountDownLatch latch)460     private void callEcho(EchoTestServiceFutureStub stub,
461         EchoRequest echoRequest, final EchoCall echoCall, CountDownLatch latch) {
462 
463       ListenableFuture<EchoResponse> response = stub.echo(echoRequest);
464       Futures.addCallback(
465           response,
466           new FutureCallback<EchoResponse>() {
467             @Override
468             public void onSuccess(@Nullable EchoResponse result) {
469               echoCall.response = result;
470               echoCall.status = Status.OK;
471               latch.countDown();
472             }
473 
474             @Override
475             public void onFailure(Throwable t) {
476               echoCall.status = Status.fromThrowable(t);
477               latch.countDown();
478             }
479           },
480           MoreExecutors.directExecutor());
481     }
482   }
483 
484   private static class EchoMessage {
485     private final StringBuilder sb = new StringBuilder();
486 
writeKeyValue(String key, String value)487     void writeKeyValue(String key, String value) {
488       sb.append(key).append("=").append(value).append("\n");
489     }
490 
writeKeyValueForRequest(String requestHeader, String key, String value)491     void writeKeyValueForRequest(String requestHeader, String key, String value) {
492       if (value != null) {
493         writeKeyValue(requestHeader, key + ":" + value);
494       }
495     }
496 
497     @Override
toString()498     public String toString() {
499       return sb.toString();
500     }
501   }
502 }
503