• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import com.google.common.base.Splitter;
20 import com.google.common.collect.Iterables;
21 import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
22 import io.grpc.InsecureServerCredentials;
23 import io.grpc.Metadata;
24 import io.grpc.Server;
25 import io.grpc.ServerCall;
26 import io.grpc.ServerCallHandler;
27 import io.grpc.ServerInterceptor;
28 import io.grpc.ServerInterceptors;
29 import io.grpc.Status;
30 import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
31 import io.grpc.netty.NettyServerBuilder;
32 import io.grpc.protobuf.services.HealthStatusManager;
33 import io.grpc.protobuf.services.ProtoReflectionService;
34 import io.grpc.services.AdminInterface;
35 import io.grpc.stub.StreamObserver;
36 import io.grpc.testing.integration.Messages.SimpleRequest;
37 import io.grpc.testing.integration.Messages.SimpleResponse;
38 import io.grpc.xds.XdsServerBuilder;
39 import io.grpc.xds.XdsServerCredentials;
40 import java.net.InetAddress;
41 import java.net.UnknownHostException;
42 import java.util.ArrayList;
43 import java.util.List;
44 import java.util.concurrent.TimeUnit;
45 import java.util.logging.Level;
46 import java.util.logging.Logger;
47 
48 /** Interop test server that implements the xDS testing service. */
49 public final class XdsTestServer {
50   static final Metadata.Key<String> HOSTNAME_KEY =
51       Metadata.Key.of("hostname", Metadata.ASCII_STRING_MARSHALLER);
52   private static final Metadata.Key<String> CALL_BEHAVIOR_MD_KEY =
53       Metadata.Key.of("rpc-behavior", Metadata.ASCII_STRING_MARSHALLER);
54   private static final Metadata.Key<String> ATTEMPT_NUM =
55       Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
56   private static final String CALL_BEHAVIOR_KEEP_OPEN_VALUE = "keep-open";
57   private static final String CALL_BEHAVIOR_SLEEP_VALUE = "sleep-";
58   private static final String CALL_BEHAVIOR_SUCCEED_ON_RETRY_ATTEMPT_VALUE =
59       "succeed-on-retry-attempt-";
60   private static final String CALL_BEHAVIOR_ERROR_CODE =
61       "error-code-";
62   private static final String CALL_BEHAVIOR_HOSTNAME = "hostname=";
63   private static final Splitter HEADER_VALUE_SPLITTER = Splitter.on(',')
64       .trimResults()
65       .omitEmptyStrings();
66   private static final Splitter HEADER_HOSTNAME_SPLITTER = Splitter.on(' ');
67 
68   private static Logger logger = Logger.getLogger(XdsTestServer.class.getName());
69 
70   private int port = 8080;
71   private int maintenancePort = 8080;
72   private boolean secureMode = false;
73   private String serverId = "java_server";
74   private HealthStatusManager health;
75   private Server server;
76   private Server maintenanceServer;
77   private String host;
78 
79   /**
80    * The main application allowing this client to be launched from the command line.
81    */
main(String[] args)82   public static void main(String[] args) throws Exception {
83     final XdsTestServer server = new XdsTestServer();
84     server.parseArgs(args);
85     Runtime.getRuntime()
86         .addShutdownHook(
87             new Thread() {
88               @Override
89               @SuppressWarnings("CatchAndPrintStackTrace")
90               public void run() {
91                 try {
92                   System.out.println("Shutting down");
93                   server.stop();
94                 } catch (Exception e) {
95                   e.printStackTrace();
96                 }
97               }
98             });
99     server.start();
100     System.out.println("Server started on port " + server.port);
101     server.blockUntilShutdown();
102   }
103 
parseArgs(String[] args)104   private void parseArgs(String[] args) {
105     boolean usage = false;
106     for (String arg : args) {
107       if (!arg.startsWith("--")) {
108         System.err.println("All arguments must start with '--': " + arg);
109         usage = true;
110         break;
111       }
112       String[] parts = arg.substring(2).split("=", 2);
113       String key = parts[0];
114       if ("help".equals(key)) {
115         usage = true;
116         break;
117       }
118       if (parts.length != 2) {
119         System.err.println("All arguments must be of the form --arg=value");
120         usage = true;
121         break;
122       }
123       String value = parts[1];
124       if ("port".equals(key)) {
125         port = Integer.valueOf(value);
126       } else if ("maintenance_port".equals(key)) {
127         maintenancePort = Integer.valueOf(value);
128       } else if ("secure_mode".equals(key)) {
129         secureMode = Boolean.parseBoolean(value);
130       } else if ("server_id".equals(key)) {
131         serverId = value;
132       } else {
133         System.err.println("Unknown argument: " + key);
134         usage = true;
135         break;
136       }
137     }
138 
139     if (secureMode && (port == maintenancePort)) {
140       System.err.println(
141           "port and maintenance_port should be different for secure mode: port="
142               + port
143               + ", maintenance_port="
144               + maintenancePort);
145       usage = true;
146     }
147 
148     if (usage) {
149       XdsTestServer s = new XdsTestServer();
150       System.err.println(
151           "Usage: [ARGS...]"
152               + "\n"
153               + "\n  --port=INT          listening port for test server."
154               + "\n                      Default: "
155               + s.port
156               + "\n  --maintenance_port=INT      listening port for other servers."
157               + "\n                      Default: "
158               + s.maintenancePort
159               + "\n  --secure_mode=BOOLEAN Use true to enable XdsCredentials."
160               + " port and maintenance_port should be different for secure mode."
161               + "\n                      Default: "
162               + s.secureMode
163               + "\n  --server_id=STRING  server ID for response."
164               + "\n                      Default: "
165               + s.serverId);
166       System.exit(1);
167     }
168   }
169 
start()170   private void start() throws Exception {
171     try {
172       host = InetAddress.getLocalHost().getHostName();
173     } catch (UnknownHostException e) {
174       logger.log(Level.SEVERE, "Failed to get host", e);
175       throw new RuntimeException(e);
176     }
177     health = new HealthStatusManager();
178     if (secureMode) {
179       maintenanceServer =
180           NettyServerBuilder.forPort(maintenancePort)
181               .addService(new XdsUpdateHealthServiceImpl(health))
182               .addService(health.getHealthService())
183               .addService(ProtoReflectionService.newInstance())
184               .addServices(AdminInterface.getStandardServices())
185               .build();
186       maintenanceServer.start();
187       server =
188           XdsServerBuilder.forPort(
189                   port, XdsServerCredentials.create(InsecureServerCredentials.create()))
190               .addService(
191                   ServerInterceptors.intercept(
192                       new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
193               .build();
194       server.start();
195     } else {
196       server =
197           NettyServerBuilder.forPort(port)
198               .addService(
199                   ServerInterceptors.intercept(
200                       new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
201               .addService(new XdsUpdateHealthServiceImpl(health))
202               .addService(health.getHealthService())
203               .addService(ProtoReflectionService.newInstance())
204               .addServices(AdminInterface.getStandardServices())
205               .build();
206       server.start();
207       maintenanceServer = null;
208     }
209     health.setStatus("", ServingStatus.SERVING);
210   }
211 
stop()212   private void stop() throws Exception {
213     server.shutdownNow();
214     if (maintenanceServer != null) {
215       maintenanceServer.shutdownNow();
216     }
217     if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
218       System.err.println("Timed out waiting for server shutdown");
219     }
220     if (maintenanceServer != null && !maintenanceServer.awaitTermination(5, TimeUnit.SECONDS)) {
221       System.err.println("Timed out waiting for maintenanceServer shutdown");
222     }
223   }
224 
blockUntilShutdown()225   private void blockUntilShutdown() throws InterruptedException {
226     if (server != null) {
227       server.awaitTermination();
228     }
229     if (maintenanceServer != null) {
230       maintenanceServer.awaitTermination();
231     }
232   }
233 
234   private static class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
235     private final String serverId;
236     private final String host;
237 
TestServiceImpl(String serverId, String host)238     private TestServiceImpl(String serverId, String host) {
239       this.serverId = serverId;
240       this.host = host;
241     }
242 
243     @Override
emptyCall( EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver)244     public void emptyCall(
245         EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) {
246       responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
247       responseObserver.onCompleted();
248     }
249 
250     @Override
unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver)251     public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
252       responseObserver.onNext(
253           SimpleResponse.newBuilder().setServerId(serverId).setHostname(host).build());
254       responseObserver.onCompleted();
255     }
256   }
257 
258   private static class XdsUpdateHealthServiceImpl
259       extends XdsUpdateHealthServiceGrpc.XdsUpdateHealthServiceImplBase {
260     private HealthStatusManager health;
261 
XdsUpdateHealthServiceImpl(HealthStatusManager health)262     private XdsUpdateHealthServiceImpl(HealthStatusManager health) {
263       this.health = health;
264     }
265 
266     @Override
setServing( EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver)267     public void setServing(
268         EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) {
269       health.setStatus("", ServingStatus.SERVING);
270       responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
271       responseObserver.onCompleted();
272     }
273 
274     @Override
setNotServing( EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver)275     public void setNotServing(
276         EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) {
277       health.setStatus("", ServingStatus.NOT_SERVING);
278       responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
279       responseObserver.onCompleted();
280     }
281   }
282 
283   private static class TestInfoInterceptor implements ServerInterceptor {
284     private final String host;
285 
TestInfoInterceptor(String host)286     private TestInfoInterceptor(String host) {
287       this.host = host;
288     }
289 
290     @Override
interceptCall( ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next)291     public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
292         ServerCall<ReqT, RespT> call,
293         final Metadata requestHeaders,
294         ServerCallHandler<ReqT, RespT> next) {
295       List<String> callBehaviors = getCallBehaviors(requestHeaders);
296       ServerCall<ReqT, RespT> newCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
297         @Override
298         public void sendHeaders(Metadata responseHeaders) {
299           responseHeaders.put(HOSTNAME_KEY, host);
300           super.sendHeaders(responseHeaders);
301         }
302       };
303       ServerCall.Listener<ReqT> noopListener = new ServerCall.Listener<ReqT>() {};
304 
305       int attemptNum = 0;
306       String attemptNumHeader = requestHeaders.get(ATTEMPT_NUM);
307       if (attemptNumHeader != null) {
308         try {
309           attemptNum = Integer.valueOf(attemptNumHeader);
310         } catch (NumberFormatException e) {
311           newCall.close(
312               Status.INVALID_ARGUMENT.withDescription(
313                   "Invalid format for grpc-previous-rpc-attempts header: " + attemptNumHeader),
314               new Metadata());
315           return noopListener;
316         }
317       }
318 
319       for (String callBehavior : callBehaviors) {
320         if (callBehavior.startsWith(CALL_BEHAVIOR_HOSTNAME)) {
321           List<String> splitHeader = HEADER_HOSTNAME_SPLITTER.splitToList(callBehavior);
322           if (splitHeader.size() > 1) {
323             if (!splitHeader.get(0).substring(CALL_BEHAVIOR_HOSTNAME.length()).equals(host)) {
324               continue;
325             }
326             callBehavior = splitHeader.get(1);
327           } else {
328             newCall.close(
329                 Status.INVALID_ARGUMENT.withDescription(
330                     "Invalid format for rpc-behavior header: " + callBehavior),
331                 new Metadata()
332             );
333             return noopListener;
334           }
335         }
336 
337         if (callBehavior.startsWith(CALL_BEHAVIOR_SLEEP_VALUE)) {
338           try {
339             int timeout = Integer.parseInt(
340                 callBehavior.substring(CALL_BEHAVIOR_SLEEP_VALUE.length()));
341             Thread.sleep(timeout * 1000L);
342           } catch (NumberFormatException e) {
343             newCall.close(
344                 Status.INVALID_ARGUMENT.withDescription(
345                     "Invalid format for rpc-behavior header: " + callBehavior),
346                 new Metadata());
347             return noopListener;
348           } catch (InterruptedException e) {
349             Thread.currentThread().interrupt();
350             newCall.close(
351                 Status.ABORTED.withDescription("execution of server interrupted"),
352                 new Metadata());
353             return noopListener;
354           }
355         }
356 
357         if (callBehavior.startsWith(CALL_BEHAVIOR_SUCCEED_ON_RETRY_ATTEMPT_VALUE)) {
358           int succeedOnAttemptNum = Integer.MAX_VALUE;
359           try {
360             succeedOnAttemptNum = Integer.parseInt(
361                 callBehavior.substring(CALL_BEHAVIOR_SUCCEED_ON_RETRY_ATTEMPT_VALUE.length()));
362           } catch (NumberFormatException e) {
363             newCall.close(
364                 Status.INVALID_ARGUMENT.withDescription(
365                     "Invalid format for rpc-behavior header: " + callBehavior),
366                 new Metadata());
367             return noopListener;
368           }
369           if (attemptNum == succeedOnAttemptNum) {
370             return next.startCall(newCall, requestHeaders);
371           }
372         }
373 
374         // hang if instructed by rpc-behavior
375         if (callBehavior.equals(CALL_BEHAVIOR_KEEP_OPEN_VALUE)) {
376           return noopListener;
377         }
378 
379         if (callBehavior.startsWith(CALL_BEHAVIOR_ERROR_CODE)) {
380           try {
381             int codeValue = Integer.valueOf(
382                 callBehavior.substring(CALL_BEHAVIOR_ERROR_CODE.length()));
383             newCall.close(
384                 Status.fromCodeValue(codeValue).withDescription(
385                     "Rpc failed as per the rpc-behavior header value:" + callBehaviors),
386                 new Metadata());
387             return noopListener;
388           } catch (NumberFormatException e) {
389             newCall.close(
390                 Status.INVALID_ARGUMENT.withDescription(
391                     "Invalid format for rpc-behavior header: " + callBehavior),
392                 new Metadata());
393             return noopListener;
394           }
395         }
396       }
397 
398       return next.startCall(newCall, requestHeaders);
399     }
400   }
401 
getCallBehaviors(Metadata requestHeaders)402   private static List<String> getCallBehaviors(Metadata requestHeaders) {
403     List<String> callBehaviors = new ArrayList<>();
404     Iterable<String> values = requestHeaders.getAll(CALL_BEHAVIOR_MD_KEY);
405     if (values == null) {
406       return callBehaviors;
407     }
408     for (String value : values) {
409       Iterables.addAll(callBehaviors, HEADER_VALUE_SPLITTER.split(value));
410     }
411     return callBehaviors;
412   }
413 }
414