/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.testing.integration;

import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;

import com.google.common.io.CharStreams;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.alts.ComputeEngineChannelBuilder;
import io.grpc.testing.integration.Messages.GrpclbRouteType;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * Test client that verifies that grpclb failover into fallback mode works under
 * different failure modes.
 * This client is suitable for testing fallback with any "grpclb" load-balanced
 * service, but is particularly meant to implement a set of test cases described
 * in an internal doc titled "DirectPath Cloud-to-Prod End-to-End Test Cases",
 * section "gRPC DirectPath-to-CFE fallback".
 */
public final class GrpclbFallbackTestClient {
  private static final Logger logger =
      Logger.getLogger(GrpclbFallbackTestClient.class.getName());

  /** The only value of {@code --custom_credentials_type} this client accepts. */
  private static final String COMPUTE_ENGINE_CHANNEL_CREDS = "compute_engine_channel_creds";
  /** Value of {@code --test_case} selecting {@link #runFallbackBeforeStartup()}. */
  private static final String TEST_CASE_FALLBACK_BEFORE_STARTUP = "fallback_before_startup";
  /** Value of {@code --test_case} selecting {@link #runFallbackAfterStartup()}. */
  private static final String TEST_CASE_FALLBACK_AFTER_STARTUP = "fallback_after_startup";

  /** Number of RPCs that must all be routed to a fallback once fallback has been observed. */
  private static final int POST_FALLBACK_RPC_COUNT = 30;

  /**
   * Entry point.
   *
   * <p>Parses the flags, registers a shutdown hook that tears the channel down, runs the
   * selected test case and exits with status 0 on success. Any failure propagates out of
   * {@code main} after the channel has been torn down.
   */
  public static void main(String[] args) throws Exception {
    final GrpclbFallbackTestClient client = new GrpclbFallbackTestClient();
    client.parseArgs(args);
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      @SuppressWarnings("CatchAndPrintStackTrace")
      public void run() {
        System.out.println("Shutting down");
        try {
          client.tearDown();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
    try {
      client.run();
    } finally {
      client.tearDown();
    }
    System.exit(0);
  }

  private String induceFallbackCmd = "exit 1";
  private String serverUri;
  private String customCredentialsType;
  private String testCase;
  private boolean skipNetCmd = false;
  private int numWarmupRpcs;
  private int fallbackDeadlineSeconds = 1;

  private ManagedChannel channel;
  private TestServiceGrpc.TestServiceBlockingStub blockingStub;

  /**
   * Populates this client's settings from {@code --key=value} command line flags.
   *
   * <p>On {@code --help}, a malformed argument, an unknown flag or a non-integer value for an
   * integer flag, the usage text is printed and the process exits with status 1.
   *
   * @param args the raw command line arguments
   */
  private void parseArgs(String[] args) {
    boolean usage = false;
    for (String arg : args) {
      if (!arg.startsWith("--")) {
        System.err.println("All arguments must start with '--': " + arg);
        usage = true;
        break;
      }
      String[] parts = arg.substring(2).split("=", 2);
      String key = parts[0];
      if ("help".equals(key)) {
        usage = true;
        break;
      }
      if (parts.length != 2) {
        System.err.println("All arguments must be of the form --arg=value");
        usage = true;
        break;
      }
      String value = parts[1];
      try {
        if ("server_uri".equals(key)) {
          serverUri = value;
        } else if ("test_case".equals(key)) {
          testCase = value;
        } else if ("induce_fallback_cmd".equals(key)) {
          induceFallbackCmd = value;
        } else if ("custom_credentials_type".equals(key)) {
          customCredentialsType = value;
        } else if ("skip_net_cmd".equals(key)) {
          skipNetCmd = Boolean.parseBoolean(value);
        } else if ("num_warmup_rpcs".equals(key)) {
          numWarmupRpcs = Integer.parseInt(value);
        } else if ("fallback_deadline_seconds".equals(key)) {
          fallbackDeadlineSeconds = Integer.parseInt(value);
        } else {
          System.err.println("Unknown argument: " + key);
          usage = true;
          break;
        }
      } catch (NumberFormatException e) {
        // Report a bad number like any other malformed flag instead of dying with a stack trace.
        System.err.println("Invalid integer value for --" + key + ": " + value);
        usage = true;
        break;
      }
    }
    if (usage) {
      printUsageAndExit();
    }
  }

  /** Prints the usage text, with defaults read from a fresh instance, and exits with status 1. */
  private static void printUsageAndExit() {
    GrpclbFallbackTestClient c = new GrpclbFallbackTestClient();
    System.out.println(
        "Usage: [ARGS...]"
        + "\n"
        + "\n --server_uri Server target. Default: "
        + c.serverUri
        + "\n --custom_credentials_type Name of Credentials to use. "
        + "Default: " + c.customCredentialsType
        + "\n --induce_fallback_cmd Shell command to induce fallback, e.g. by "
        + "making LB and/or backend addresses unroutable or black holed. Default: "
        + c.induceFallbackCmd
        + "\n --skip_net_cmd Skip unroute and blackhole "
        + "shell command to allow setting the net config outside of the test "
        + "client. Default: "
        + c.skipNetCmd
        + "\n --num_warmup_rpcs Number of RPCs to perform "
        + "on a separate warmup channel before the actual test runs (each warmup "
        + "RPC uses a 1 second deadline). Default: "
        + c.numWarmupRpcs
        + "\n --fallback_deadline_seconds Number of seconds to wait "
        + "for fallback to occur after inducing fallback. Default: "
        + c.fallbackDeadlineSeconds
        + "\n --test_case=TEST_CASE Test case to run. Valid options are:"
        + "\n fallback_before_startup : fallback before startup e.g. due to "
        + "LB/backend addresses being unreachable"
        + "\n fallback_after_startup : fallback after startup e.g. due to "
        + "LB/backend addresses becoming unreachable"
        + "\n Default: " + c.testCase
    );
    System.exit(1);
  }

  /**
   * Builds a new channel to {@code serverUri} using compute engine channel credentials.
   *
   * @return a freshly built channel; the caller is responsible for shutting it down
   * @throws AssertionError if {@code --custom_credentials_type} is unset or is anything other
   *     than {@code compute_engine_channel_creds}
   */
  private ManagedChannel createChannel() {
    // Constant-first comparison: the flag is optional, so the field may still be null here.
    if (!COMPUTE_ENGINE_CHANNEL_CREDS.equals(customCredentialsType)) {
      throw new AssertionError(
          "This test currently only supports "
          + "--custom_credentials_type=compute_engine_channel_creds. "
          + "TODO: add support for other types.");
    }
    ComputeEngineChannelBuilder builder = ComputeEngineChannelBuilder.forTarget(serverUri);
    builder.keepAliveTime(3600, TimeUnit.SECONDS);
    builder.keepAliveTimeout(20, TimeUnit.SECONDS);
    return builder.build();
  }

  /** Creates the channel used by the actual test case and a blocking stub on top of it. */
  void initStub() {
    channel = createChannel();
    blockingStub = TestServiceGrpc.newBlockingStub(channel);
  }

  /**
   * Shuts down the test channel, if one was created. Safe to call more than once; it is invoked
   * both from {@link #main} and from the shutdown hook.
   */
  private void tearDown() {
    shutdownChannel(channel);
  }

  /**
   * Forcibly shuts down {@code ch} and waits up to one second for it to terminate.
   *
   * @param ch the channel to shut down; {@code null} is ignored
   * @throws RuntimeException if the wait is interrupted (the interrupt flag is restored first)
   */
  private static void shutdownChannel(ManagedChannel ch) {
    if (ch == null) {
      return;
    }
    try {
      ch.shutdownNow();
      ch.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Runs {@code cmd} as a child process, logs its merged stdout/stderr and asserts that it
   * exits with status 0. Does nothing when {@code --skip_net_cmd} is set.
   *
   * @param cmd whitespace-separated program name and arguments; no shell quoting is applied
   * @throws Exception if the process cannot be started or its output cannot be read
   */
  private void runShellCmd(String cmd) throws Exception {
    if (skipNetCmd) {
      logger.info("Skip net cmd because --skip_net_cmd is set to true");
      return;
    }
    logger.info("Run shell command: " + cmd);
    // Do not use bash -c here as bash may not exist in a container.
    // Split on runs of whitespace so repeated or leading spaces do not become empty arguments.
    ProcessBuilder pb = new ProcessBuilder(cmd.trim().split("\\s+"));
    pb.redirectErrorStream(true);
    Process process = pb.start();
    // Drain the output before waiting so the child cannot block on a full pipe.
    String output;
    try (InputStreamReader reader = new InputStreamReader(process.getInputStream(), UTF_8)) {
      output = CharStreams.toString(reader);
    }
    logger.info("Shell command merged stdout and stderr: " + output);
    int exitCode = process.waitFor();
    logger.info("Shell command exit code: " + exitCode);
    assertEquals(0, exitCode);
  }

  /**
   * Performs one unary RPC asking the server to report its grpclb route type.
   *
   * @param stub the stub to issue the RPC on
   * @param deadline the deadline applied to the RPC
   * @return the route type reported by the server, or
   *     {@code GRPCLB_ROUTE_TYPE_UNKNOWN} if the RPC failed with a {@link StatusRuntimeException}
   * @throws AssertionError if the RPC succeeded but reported neither fallback nor backend
   */
  private GrpclbRouteType doRpcAndGetPath(
      TestServiceGrpc.TestServiceBlockingStub stub, Deadline deadline) {
    logger.info("doRpcAndGetPath deadline: " + deadline);
    final SimpleRequest request = SimpleRequest.newBuilder()
        .setFillGrpclbRouteType(true)
        .build();
    final GrpclbRouteType result;
    try {
      SimpleResponse response = stub
          .withDeadline(deadline)
          .unaryCall(request);
      result = response.getGrpclbRouteType();
    } catch (StatusRuntimeException ex) {
      logger.warning("doRpcAndGetPath failed. Status: " + ex);
      return GrpclbRouteType.GRPCLB_ROUTE_TYPE_UNKNOWN;
    }
    logger.info("doRpcAndGetPath. GrpclbRouteType result: " + result);
    if (result != GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK
        && result != GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND) {
      throw new AssertionError("Received invalid LB route type. This suggests "
          + "that the server hasn't implemented this test correctly.");
    }
    return result;
  }

  /**
   * Retries RPCs until one is served by a fallback or {@code fallbackDeadline} expires, then
   * checks that a further series of RPCs, one per second, are all served by a fallback.
   *
   * @param fallbackDeadline the time by which the first fallback-routed RPC must have succeeded
   * @throws AssertionError if an RPC reaches a backend, or no fallback is seen before the deadline
   */
  private void waitForFallbackAndDoRpcs(Deadline fallbackDeadline) throws Exception {
    int fallbackRetryCount = 0;
    boolean fallBack = false;
    while (!fallbackDeadline.isExpired()) {
      GrpclbRouteType grpclbRouteType = doRpcAndGetPath(
          blockingStub, Deadline.after(1, TimeUnit.SECONDS));
      if (grpclbRouteType == GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND) {
        throw new AssertionError("Got grpclb route type backend. Backends are "
            + "supposed to be unreachable, so this test is broken");
      }
      if (grpclbRouteType == GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK) {
        logger.info("Made one successful RPC to a fallback. Now expect the "
            + "same for the rest.");
        fallBack = true;
        break;
      } else {
        logger.info("Retryable RPC failure on iteration: " + fallbackRetryCount);
      }
      fallbackRetryCount++;
    }
    if (!fallBack) {
      throw new AssertionError("Didn't fall back within deadline");
    }
    for (int i = 0; i < POST_FALLBACK_RPC_COUNT; i++) {
      assertEquals(
          GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK,
          doRpcAndGetPath(blockingStub, Deadline.after(20, TimeUnit.SECONDS)));
      Thread.sleep(1000);
    }
  }

  /**
   * Test case: induce fallback first, then create the channel and expect it to fall back.
   * The fallback deadline starts counting before the channel is created.
   */
  private void runFallbackBeforeStartup() throws Exception {
    runShellCmd(induceFallbackCmd);
    final Deadline fallbackDeadline = Deadline.after(
        fallbackDeadlineSeconds, TimeUnit.SECONDS);
    initStub();
    waitForFallbackAndDoRpcs(fallbackDeadline);
  }

  /**
   * Test case: create the channel, confirm one RPC reaches a backend, then induce fallback and
   * expect the already-running channel to fall back.
   */
  private void runFallbackAfterStartup() throws Exception {
    initStub();
    assertEquals(
        GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND,
        doRpcAndGetPath(blockingStub, Deadline.after(20, TimeUnit.SECONDS)));
    runShellCmd(induceFallbackCmd);
    final Deadline fallbackDeadline = Deadline.after(
        fallbackDeadlineSeconds, TimeUnit.SECONDS);
    waitForFallbackAndDoRpcs(fallbackDeadline);
  }

  // The purpose of this warmup method is to get potentially expensive one-per-process
  // initialization out of the way, so that we can use aggressive timeouts in the actual
  // test cases. Note that the warmup phase is done using a separate channel from the
  // actual test cases, so that we don't affect the states of LB policies in the channel
  // of the actual test case.
  private void warmup() throws Exception {
    logger.info("Begin warmup, performing " + numWarmupRpcs + " RPCs on the warmup channel");
    ManagedChannel warmupChannel = createChannel();
    try {
      TestServiceGrpc.TestServiceBlockingStub stub =
          TestServiceGrpc.newBlockingStub(warmupChannel);
      for (int i = 0; i < numWarmupRpcs; i++) {
        doRpcAndGetPath(stub, Deadline.after(1, TimeUnit.SECONDS));
      }
    } finally {
      // Shut down even when a warmup RPC throws, so the warmup channel never leaks.
      shutdownChannel(warmupChannel);
    }
  }

  /**
   * Runs the warmup phase followed by the test case selected with {@code --test_case}.
   *
   * @throws RuntimeException if {@code --test_case} is unset or not a known test case; this is
   *     checked before any RPC is made
   */
  private void run() throws Exception {
    // Constant-first comparisons: the flag is optional, so testCase may be null.
    final boolean beforeStartup = TEST_CASE_FALLBACK_BEFORE_STARTUP.equals(testCase);
    final boolean afterStartup = TEST_CASE_FALLBACK_AFTER_STARTUP.equals(testCase);
    if (!beforeStartup && !afterStartup) {
      throw new RuntimeException("invalid testcase: " + testCase);
    }
    warmup();
    logger.info("Begin test case: " + testCase);
    if (beforeStartup) {
      runFallbackBeforeStartup();
    } else {
      runFallbackAfterStartup();
    }
    logger.info("Test case: " + testCase + " done!");
  }
}