• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static com.google.common.base.Charsets.UTF_8;
20 import static org.junit.Assert.assertEquals;
21 
22 import com.google.common.io.CharStreams;
23 import io.grpc.Deadline;
24 import io.grpc.ManagedChannel;
25 import io.grpc.StatusRuntimeException;
26 import io.grpc.alts.ComputeEngineChannelBuilder;
27 import io.grpc.testing.integration.Messages.GrpclbRouteType;
28 import io.grpc.testing.integration.Messages.SimpleRequest;
29 import io.grpc.testing.integration.Messages.SimpleResponse;
30 import java.io.InputStreamReader;
31 import java.util.concurrent.TimeUnit;
32 import java.util.logging.Logger;
33 
34 /**
35  * Test client that verifies that grpclb failover into fallback mode works under
36  * different failure modes.
37  * This client is suitable for testing fallback with any "grpclb" load-balanced
38  * service, but is particularly meant to implement a set of test cases described
39  * in an internal doc titled "DirectPath Cloud-to-Prod End-to-End Test Cases",
40  * section "gRPC DirectPath-to-CFE fallback".
41  */
42 public final class GrpclbFallbackTestClient {
43   private static final Logger logger =
44       Logger.getLogger(GrpclbFallbackTestClient.class.getName());
45 
46   /**
47    * Entry point.
48    */
main(String[] args)49   public static void main(String[] args) throws Exception {
50     final GrpclbFallbackTestClient client = new GrpclbFallbackTestClient();
51     client.parseArgs(args);
52     Runtime.getRuntime().addShutdownHook(new Thread() {
53       @Override
54       @SuppressWarnings("CatchAndPrintStackTrace")
55       public void run() {
56         System.out.println("Shutting down");
57         try {
58           client.tearDown();
59         } catch (Exception e) {
60           e.printStackTrace();
61         }
62       }
63     });
64     try {
65       client.run();
66     } finally {
67       client.tearDown();
68     }
69     System.exit(0);
70   }
71 
72   private String induceFallbackCmd = "exit 1";
73   private String serverUri;
74   private String customCredentialsType;
75   private String testCase;
76   private Boolean skipNetCmd = false;
77   private int numWarmupRpcs;
78   private int fallbackDeadlineSeconds = 1;
79 
80   private ManagedChannel channel;
81   private TestServiceGrpc.TestServiceBlockingStub blockingStub;
82 
parseArgs(String[] args)83   private void parseArgs(String[] args) {
84     boolean usage = false;
85     for (String arg : args) {
86       if (!arg.startsWith("--")) {
87         System.err.println("All arguments must start with '--': " + arg);
88         usage = true;
89         break;
90       }
91       String[] parts = arg.substring(2).split("=", 2);
92       String key = parts[0];
93       if ("help".equals(key)) {
94         usage = true;
95         break;
96       }
97       if (parts.length != 2) {
98         System.err.println("All arguments must be of the form --arg=value");
99         usage = true;
100         break;
101       }
102       String value = parts[1];
103       if ("server_uri".equals(key)) {
104         serverUri = value;
105       } else if ("test_case".equals(key)) {
106         testCase = value;
107       } else if ("induce_fallback_cmd".equals(key)) {
108         induceFallbackCmd = value;
109       } else if ("custom_credentials_type".equals(key)) {
110         customCredentialsType = value;
111       } else if ("skip_net_cmd".equals(key)) {
112         skipNetCmd = Boolean.valueOf(value);
113       } else if ("num_warmup_rpcs".equals(key)) {
114         numWarmupRpcs = Integer.valueOf(value);
115       } else if ("fallback_deadline_seconds".equals(key)) {
116         fallbackDeadlineSeconds = Integer.valueOf(value);
117       } else {
118         System.err.println("Unknown argument: " + key);
119         usage = true;
120         break;
121       }
122     }
123     if (usage) {
124       GrpclbFallbackTestClient c = new GrpclbFallbackTestClient();
125       System.out.println(
126           "Usage: [ARGS...]"
127           + "\n"
128           + "\n  --server_uri                          Server target. Default: "
129           + c.serverUri
130           + "\n  --custom_credentials_type             Name of Credentials to use. "
131           + "Default: " + c.customCredentialsType
132           + "\n  --induce_fallback_cmd Shell command to induce fallback, e.g. by "
133           + "making LB and/or backend addresses unroutable or black holed. Default: "
134           + c.induceFallbackCmd
135           + "\n  --skip_net_cmd                        Skip unroute and blackhole "
136           + "shell command to allow setting the net config outside of the test "
137           + "client. Default: "
138           + c.skipNetCmd
139           + "\n  --num_warmup_rpcs                     Number of RPCs to perform "
140           + "on a separate warmup channel before the actual test runs (each warmup "
141           + "RPC uses a 1 second deadline). Default: "
142           + c.numWarmupRpcs
143           + "\n  --fallback_deadline_seconds           Number of seconds to wait "
144           + "for fallback to occur after inducing fallback. Default: "
145           + c.fallbackDeadlineSeconds
146           + "\n  --test_case=TEST_CASE        Test case to run. Valid options are:"
147           + "\n      fallback_before_startup : fallback before startup e.g. due to "
148           + "LB/backend addresses being unreachable"
149           + "\n      fallback_after_startup : fallback after startup e.g. due to "
150           + "LB/backend addresses becoming unreachable"
151           + "\n      Default: " + c.testCase
152       );
153       System.exit(1);
154     }
155   }
156 
createChannel()157   private ManagedChannel createChannel() {
158     if (!customCredentialsType.equals("compute_engine_channel_creds")) {
159       throw new AssertionError(
160           "This test currently only supports "
161           + "--custom_credentials_type=compute_engine_channel_creds. "
162           + "TODO: add support for other types.");
163     }
164     ComputeEngineChannelBuilder builder = ComputeEngineChannelBuilder.forTarget(serverUri);
165     builder.keepAliveTime(3600, TimeUnit.SECONDS);
166     builder.keepAliveTimeout(20, TimeUnit.SECONDS);
167     return builder.build();
168   }
169 
initStub()170   void initStub() {
171     channel = createChannel();
172     blockingStub = TestServiceGrpc.newBlockingStub(channel);
173   }
174 
tearDown()175   private void tearDown() {
176     try {
177       if (channel != null) {
178         channel.shutdownNow();
179         channel.awaitTermination(1, TimeUnit.SECONDS);
180       }
181     } catch (Exception ex) {
182       throw new RuntimeException(ex);
183     }
184   }
185 
runShellCmd(String cmd)186   private void runShellCmd(String cmd) throws Exception {
187     if (skipNetCmd) {
188       logger.info("Skip net cmd because --skip_net_cmd is set to true");
189       return;
190     }
191     logger.info("Run shell command: " + cmd);
192     // Do not use bash -c here as bash may not exist in a container
193     ProcessBuilder pb = new ProcessBuilder(cmd.split(" "));
194     pb.redirectErrorStream(true);
195     Process process = pb.start();
196     logger.info("Shell command merged stdout and stderr: "
197         + CharStreams.toString(
198             new InputStreamReader(process.getInputStream(), UTF_8)));
199     int exitCode = process.waitFor();
200     logger.info("Shell command exit code: " + exitCode);
201     assertEquals(0, exitCode);
202   }
203 
doRpcAndGetPath( TestServiceGrpc.TestServiceBlockingStub stub, Deadline deadline)204   private GrpclbRouteType doRpcAndGetPath(
205       TestServiceGrpc.TestServiceBlockingStub stub, Deadline deadline) {
206     logger.info("doRpcAndGetPath deadline: " + deadline);
207     final SimpleRequest request = SimpleRequest.newBuilder()
208         .setFillGrpclbRouteType(true)
209         .build();
210     GrpclbRouteType result = GrpclbRouteType.GRPCLB_ROUTE_TYPE_UNKNOWN;
211     try {
212       SimpleResponse response = stub
213           .withDeadline(deadline)
214           .unaryCall(request);
215       result = response.getGrpclbRouteType();
216     } catch (StatusRuntimeException ex) {
217       logger.warning("doRpcAndGetPath failed. Status: " + ex);
218       return GrpclbRouteType.GRPCLB_ROUTE_TYPE_UNKNOWN;
219     }
220     logger.info("doRpcAndGetPath. GrpclbRouteType result: " + result);
221     if (result != GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK
222         && result != GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND) {
223       throw new AssertionError("Received invalid LB route type. This suggests "
224           + "that the server hasn't implemented this test correctly.");
225     }
226     return result;
227   }
228 
waitForFallbackAndDoRpcs(Deadline fallbackDeadline)229   private void waitForFallbackAndDoRpcs(Deadline fallbackDeadline) throws Exception {
230     int fallbackRetryCount = 0;
231     boolean fallBack = false;
232     while (!fallbackDeadline.isExpired()) {
233       GrpclbRouteType grpclbRouteType = doRpcAndGetPath(
234           blockingStub, Deadline.after(1, TimeUnit.SECONDS));
235       if (grpclbRouteType == GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND) {
236         throw new AssertionError("Got grpclb route type backend. Backends are "
237             + "supposed to be unreachable, so this test is broken");
238       }
239       if (grpclbRouteType == GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK) {
240         logger.info("Made one successful RPC to a fallback. Now expect the "
241             + "same for the rest.");
242         fallBack = true;
243         break;
244       } else {
245         logger.info("Retryable RPC failure on iteration: " + fallbackRetryCount);
246       }
247       fallbackRetryCount++;
248     }
249     if (!fallBack) {
250       throw new AssertionError("Didn't fall back within deadline");
251     }
252     for (int i = 0; i < 30; i++) {
253       assertEquals(
254           GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK,
255           doRpcAndGetPath(blockingStub, Deadline.after(20, TimeUnit.SECONDS)));
256       Thread.sleep(1000);
257     }
258   }
259 
runFallbackBeforeStartup()260   private void runFallbackBeforeStartup() throws Exception {
261     runShellCmd(induceFallbackCmd);
262     final Deadline fallbackDeadline = Deadline.after(
263         fallbackDeadlineSeconds, TimeUnit.SECONDS);
264     initStub();
265     waitForFallbackAndDoRpcs(fallbackDeadline);
266   }
267 
runFallbackAfterStartup()268   private void runFallbackAfterStartup() throws Exception {
269     initStub();
270     assertEquals(
271         GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND,
272         doRpcAndGetPath(blockingStub, Deadline.after(20, TimeUnit.SECONDS)));
273     runShellCmd(induceFallbackCmd);
274     final Deadline fallbackDeadline = Deadline.after(
275         fallbackDeadlineSeconds, TimeUnit.SECONDS);
276     waitForFallbackAndDoRpcs(fallbackDeadline);
277   }
278 
279   // The purpose of this warmup method is to get potentially expensive one-per-process
280   // initialization out of the way, so that we can use aggressive timeouts in the actual
281   // test cases. Note that the warmup phase is done using a separate channel from the
282   // actual test cases, so that we don't affect the states of LB policies in the channel
283   // of the actual test case.
warmup()284   private void warmup() throws Exception {
285     logger.info("Begin warmup, performing " + numWarmupRpcs + " RPCs on the warmup channel");
286     ManagedChannel channel = createChannel();
287     TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel);
288     for (int i = 0; i < numWarmupRpcs; i++) {
289       doRpcAndGetPath(stub, Deadline.after(1, TimeUnit.SECONDS));
290     }
291     try {
292       channel.shutdownNow();
293       channel.awaitTermination(1, TimeUnit.SECONDS);
294     } catch (Exception ex) {
295       throw new RuntimeException(ex);
296     }
297   }
298 
run()299   private void run() throws Exception {
300     warmup();
301     logger.info("Begin test case: " + testCase);
302     if (testCase.equals("fallback_before_startup")) {
303       runFallbackBeforeStartup();
304     } else if (testCase.equals("fallback_after_startup")) {
305       runFallbackAfterStartup();
306     } else {
307       throw new RuntimeException("invalid testcase: " + testCase);
308     }
309     logger.info("Test case: " + testCase + " done!");
310   }
311 }
312