1 /* 2 * Copyright 2023 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static java.util.concurrent.TimeUnit.SECONDS; 20 import static org.junit.Assert.assertTrue; 21 22 import io.grpc.ChannelCredentials; 23 import io.grpc.InsecureChannelCredentials; 24 import io.grpc.ManagedChannelBuilder; 25 import io.grpc.alts.ComputeEngineChannelCredentials; 26 import io.grpc.netty.NettyChannelBuilder; 27 import java.util.ArrayList; 28 import java.util.logging.Logger; 29 30 /** 31 * Test client that can be used to verify that XDS federation works. A list of 32 * server URIs (which can each be load balanced by different XDS servers), can 33 * be configured via flags. A separate thread is created for each of these clients 34 * and the configured test (either rpc_soak or channel_soak) is ran for each client 35 * on each thread. 36 */ 37 public final class XdsFederationTestClient { 38 private static final Logger logger = 39 Logger.getLogger(XdsFederationTestClient.class.getName()); 40 41 /** 42 * Entry point. 43 */ main(String[] args)44 public static void main(String[] args) throws Exception { 45 final XdsFederationTestClient client = new XdsFederationTestClient(); 46 client.parseArgs(args); 47 Runtime.getRuntime() 48 .addShutdownHook( 49 new Thread() { 50 @Override 51 @SuppressWarnings("CatchAndPrintStackTrace") 52 public void run() { 53 System.out.println("Shutting down"); 54 try { 55 client.tearDown(); 56 } catch (RuntimeException e) { 57 e.printStackTrace(); 58 } 59 } 60 }); 61 client.setUp(); 62 try { 63 client.run(); 64 } finally { 65 client.tearDown(); 66 } 67 System.exit(0); 68 } 69 70 private String serverUris = ""; 71 private String credentialsTypes = ""; 72 private int soakIterations = 10; 73 private int soakMaxFailures = 0; 74 private int soakPerIterationMaxAcceptableLatencyMs = 1000; 75 private int soakOverallTimeoutSeconds = 10; 76 private int soakMinTimeMsBetweenRpcs = 0; 77 private String testCase = "rpc_soak"; 78 private final ArrayList<InnerClient> clients = new ArrayList<>(); 79 parseArgs(String[] args)80 private void parseArgs(String[] args) { 81 boolean usage = false; 82 for (String arg : args) { 83 if (!arg.startsWith("--")) { 84 System.err.println("All arguments must start with '--': " + arg); 85 usage = true; 86 break; 87 } 88 String[] parts = arg.substring(2).split("=", 2); 89 String key = parts[0]; 90 if (key.equals("help")) { 91 usage = true; 92 break; 93 } 94 if (parts.length != 2) { 95 System.err.println("All arguments must be of the form --arg=value"); 96 usage = true; 97 break; 98 } 99 String value = parts[1]; 100 switch (key) { 101 case "server_uris": 102 serverUris = value; 103 break; 104 case "credentials_types": 105 credentialsTypes = value; 106 break; 107 case "test_case": 108 testCase = value; 109 break; 110 case "soak_iterations": 111 soakIterations = Integer.parseInt(value); 112 break; 113 case "soak_max_failures": 114 soakMaxFailures = Integer.parseInt(value); 115 break; 116 case "soak_per_iteration_max_acceptable_latency_ms": 117 soakPerIterationMaxAcceptableLatencyMs = Integer.parseInt(value); 118 break; 119 case "soak_overall_timeout_seconds": 120 soakOverallTimeoutSeconds = Integer.parseInt(value); 121 break; 122 case "soak_min_time_ms_between_rpcs": 123 soakMinTimeMsBetweenRpcs = Integer.parseInt(value); 124 break; 125 default: 126 System.err.println("Unknown argument: " + key); 127 usage = true; 128 break; 129 } 130 } 131 if (usage) { 132 XdsFederationTestClient c = new XdsFederationTestClient(); 133 System.out.println( 134 "Usage: [ARGS...]" 135 + "\n" 136 + "\n --server_uris Comma separated list of server " 137 + "URIs to make RPCs to. Default: " 138 + c.serverUris 139 + "\n --credentials_types Comma-separated list of " 140 + "\n credentials, each entry is used " 141 + "\n for the server of the " 142 + "\n corresponding index in server_uris. " 143 + "\n Supported values: " 144 + "compute_engine_channel_creds,INSECURE_CREDENTIALS. Default: " 145 + c.credentialsTypes 146 + "\n --soak_iterations The number of iterations to use " 147 + "\n for the two tests: rpc_soak and " 148 + "\n channel_soak. Default: " 149 + c.soakIterations 150 + "\n --soak_max_failures The number of iterations in soak " 151 + "\n tests that are allowed to fail " 152 + "\n (either due to non-OK status code " 153 + "\n or exceeding the per-iteration max " 154 + "\n acceptable latency). Default: " 155 + c.soakMaxFailures 156 + "\n --soak_per_iteration_max_acceptable_latency_ms" 157 + "\n The number of milliseconds a " 158 + "\n single iteration in the two soak " 159 + "\n tests (rpc_soak and channel_soak) " 160 + "\n should take. Default: " 161 + c.soakPerIterationMaxAcceptableLatencyMs 162 + "\n --soak_overall_timeout_seconds The overall number of seconds " 163 + "\n after which a soak test should " 164 + "\n stop and fail, if the desired " 165 + "\n number of iterations have not yet " 166 + "\n completed. Default: " 167 + c.soakOverallTimeoutSeconds 168 + "\n --soak_min_time_ms_between_rpcs The minimum time in milliseconds " 169 + "\n between consecutive RPCs in a soak " 170 + "\n test (rpc_soak or channel_soak), " 171 + "\n useful for limiting QPS. Default: " 172 + c.soakMinTimeMsBetweenRpcs 173 + "\n --test_case=TEST_CASE Test case to run. Valid options are:" 174 + "\n rpc_soak: sends --soak_iterations large_unary RPCs" 175 + "\n channel_soak: sends --soak_iterations RPCs, rebuilding the channel " 176 + "each time." 177 + "\n Default: " + c.testCase 178 ); 179 System.exit(1); 180 } 181 } 182 setUp()183 void setUp() { 184 String[] uris = serverUris.split(",", -1); 185 String[] creds = credentialsTypes.split(",", -1); 186 if (uris.length == 0) { 187 throw new IllegalArgumentException("--server_uris is empty"); 188 } 189 if (uris.length != creds.length) { 190 throw new IllegalArgumentException("Number of entries in --server_uris " 191 + "does not match number of entries in --credentials_types"); 192 } 193 for (int i = 0; i < uris.length; i++) { 194 clients.add(new InnerClient(creds[i], uris[i])); 195 } 196 for (InnerClient c : clients) { 197 c.setUp(); 198 } 199 } 200 tearDown()201 private synchronized void tearDown() { 202 for (InnerClient c : clients) { 203 c.tearDown(); 204 } 205 } 206 207 /** 208 * Wraps a single client stub configuration and executes a 209 * soak test case with that configuration. 210 */ 211 class InnerClient extends AbstractInteropTest { 212 private final String credentialsType; 213 private final String serverUri; 214 private boolean runSucceeded = false; 215 InnerClient(String credentialsType, String serverUri)216 public InnerClient(String credentialsType, String serverUri) { 217 this.credentialsType = credentialsType; 218 this.serverUri = serverUri; 219 } 220 221 /** 222 * Indicates whether run succeeded or not. This must only be called 223 * after run() has finished. 224 */ runSucceeded()225 public boolean runSucceeded() { 226 return runSucceeded; 227 } 228 229 /** 230 * Run the intended soak test. 231 */ run()232 public void run() { 233 boolean resetChannelPerIteration; 234 switch (testCase) { 235 case "rpc_soak": 236 resetChannelPerIteration = false; 237 break; 238 case "channel_soak": 239 resetChannelPerIteration = true; 240 break; 241 default: 242 throw new RuntimeException("invalid testcase: " + testCase); 243 } 244 try { 245 performSoakTest( 246 serverUri, 247 resetChannelPerIteration, 248 soakIterations, 249 soakMaxFailures, 250 soakPerIterationMaxAcceptableLatencyMs, 251 soakMinTimeMsBetweenRpcs, 252 soakOverallTimeoutSeconds); 253 logger.info("Test case: " + testCase + " done for server: " + serverUri); 254 runSucceeded = true; 255 } catch (Exception e) { 256 logger.info("Test case: " + testCase + " failed for server: " + serverUri); 257 throw new RuntimeException(e); 258 } 259 } 260 261 @Override createChannelBuilder()262 protected ManagedChannelBuilder<?> createChannelBuilder() { 263 ChannelCredentials channelCredentials; 264 switch (credentialsType) { 265 case "compute_engine_channel_creds": 266 channelCredentials = ComputeEngineChannelCredentials.create(); 267 break; 268 case "INSECURE_CREDENTIALS": 269 channelCredentials = InsecureChannelCredentials.create(); 270 break; 271 default: 272 throw new IllegalArgumentException("Unknown custom credentials: " + credentialsType); 273 } 274 return NettyChannelBuilder.forTarget(serverUri, channelCredentials) 275 .keepAliveTime(3600, SECONDS) 276 .keepAliveTimeout(20, SECONDS); 277 } 278 } 279 run()280 private void run() throws Exception { 281 logger.info("Begin test case: " + testCase); 282 ArrayList<Thread> threads = new ArrayList<>(); 283 for (InnerClient c : clients) { 284 Thread t = new Thread(c::run); 285 t.start(); 286 threads.add(t); 287 } 288 for (Thread t : threads) { 289 t.join(); 290 } 291 for (InnerClient c : clients) { 292 assertTrue(c.runSucceeded()); 293 } 294 logger.info("Test case: " + testCase + " done for all clients!"); 295 } 296 } 297