/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.testing.integration;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.alts.ComputeEngineChannelBuilder;
import io.grpc.testing.integration.Messages.Payload;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Test client that verifies all requests are sent to the same server even running for an extended
 * time, while allowing for occasionally switching server.  This is intended for testing the GRPCLB
 * pick_first mode in GCE.
 *
 * <p>Two "budgets" bound how many failures are tolerated: one for RPC errors and one for affinity
 * breakages (a response served by a different server than the previous response).  Each budget
 * starts at 1, is decremented on every failure, and is periodically topped up by 1 up to a fixed
 * cap.  The test fails as soon as either budget drops below zero.
 */
public final class GrpclbLongLivedAffinityTestClient {
  private static final Logger logger =
      Logger.getLogger(GrpclbLongLivedAffinityTestClient.class.getName());

  /**
   * Entry point.  Parses the command line, creates the channel, runs the test loop and shuts the
   * channel down both on normal completion and from a JVM shutdown hook.
   *
   * @param args command-line flags of the form {@code --name=value}
   * @throws Exception if the test fails or is interrupted
   */
  public static void main(String[] args) throws Exception {
    final GrpclbLongLivedAffinityTestClient client = new GrpclbLongLivedAffinityTestClient();
    client.parseArgs(args);
    client.setUp();

    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      @SuppressWarnings("CatchAndPrintStackTrace")
      public void run() {
        try {
          client.shutdown();
        } catch (Exception e) {
          // At this moment logger may have stopped working
          e.printStackTrace();
        }
      }
    });

    try {
      client.run();
    } finally {
      client.shutdown();
    }
  }

  private String target = "directpath-grpclb-with-pick-first-test.googleapis.com";
  private long rpcErrorBudgetIncreaseMinutes = 2;
  private long affinityBreakageBudgetIncreaseMinutes = 90;
  private long rpcIntermissionSeconds = 1;
  private long totalTestSeconds = 60;

  ManagedChannel channel;
  TestServiceGrpc.TestServiceBlockingStub blockingStub;

  /**
   * Applies {@code --name=value} flags to this client's configuration fields.  On {@code --help},
   * a malformed flag, an unknown flag, or a non-numeric value for a numeric flag, prints the usage
   * text (with default values) and exits the JVM with status 1.
   *
   * @param args the raw command-line arguments
   */
  private void parseArgs(String[] args) {
    boolean usage = false;
    for (String arg : args) {
      if (!arg.startsWith("--")) {
        System.err.println("All arguments must start with '--': " + arg);
        usage = true;
        break;
      }
      String[] parts = arg.substring(2).split("=", 2);
      String key = parts[0];
      if ("help".equals(key)) {
        usage = true;
        break;
      }
      if (parts.length != 2) {
        System.err.println("All arguments must be of the form --arg=value");
        usage = true;
        break;
      }
      String value = parts[1];
      try {
        if ("target".equals(key)) {
          target = value;
        } else if ("rpc_error_budget_increase_minutes".equals(key)) {
          rpcErrorBudgetIncreaseMinutes = Long.parseLong(value);
        } else if ("affinity_breakage_budget_increase_minutes".equals(key)) {
          affinityBreakageBudgetIncreaseMinutes = Long.parseLong(value);
        } else if ("rpc_intermission_seconds".equals(key)) {
          rpcIntermissionSeconds = Long.parseLong(value);
        } else if ("total_test_seconds".equals(key)) {
          totalTestSeconds = Long.parseLong(value);
        } else {
          System.err.println("Unknown argument: " + key);
          usage = true;
          break;
        }
      } catch (NumberFormatException e) {
        // Treat a malformed number like any other bad argument instead of crashing with a
        // stack trace.
        System.err.println("Invalid numeric value for --" + key + ": " + value);
        usage = true;
        break;
      }
    }
    if (usage) {
      // A fresh instance is used so the usage text shows the defaults, not values already parsed.
      GrpclbLongLivedAffinityTestClient c = new GrpclbLongLivedAffinityTestClient();
      System.out.println(
          "Usage: [ARGS...]"
          + "\n"
          + "\n --target=TARGET Server target. Default " + c.target
          + "\n --rpc_error_budget_increase_minutes=MINUTES Default "
          + c.rpcErrorBudgetIncreaseMinutes
          + "\n --affinity_breakage_budget_increase_minutes=MINUTES Default "
          + c.affinityBreakageBudgetIncreaseMinutes
          + "\n --rpc_intermission_seconds=SECONDS Default "
          + c.rpcIntermissionSeconds
          + "\n --total_test_seconds=SECONDS Default "
          + c.totalTestSeconds
      );
      System.exit(1);
    }
  }

  /** Creates the channel to {@link #target} and a blocking stub on top of it. */
  private void setUp() {
    channel = createChannel();
    blockingStub = TestServiceGrpc.newBlockingStub(channel);
  }

  /**
   * Shuts the channel down, waiting up to one second for termination.  Does nothing if the
   * channel was never created.  This is invoked both from {@code main} and from the shutdown
   * hook, so it may run more than once.
   *
   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
   */
  private void shutdown() {
    if (channel == null) {
      return;
    }
    try {
      channel.shutdownNow();
      channel.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      // Restore the interrupt status so code further up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(ex);
    }
  }

  /**
   * Runs the test loop: repeatedly sends a unary RPC, pauses for the configured intermission, and
   * stops once the configured total test time has elapsed.
   *
   * @throws StatusRuntimeException if an RPC fails after the RPC error budget is exhausted
   * @throws AssertionError if the server id changes after the affinity breakage budget is
   *     exhausted
   * @throws InterruptedException if the thread is interrupted while sleeping between RPCs
   */
  private void run() throws Exception {
    final long startTimeMillis = System.currentTimeMillis();
    final long endTimeMillis = startTimeMillis + TimeUnit.SECONDS.toMillis(totalTestSeconds);
    final long rpcIntermissionMillis = TimeUnit.SECONDS.toMillis(rpcIntermissionSeconds);
    final long rpcErrorBudgetIncreasePeriodMillis =
        TimeUnit.MINUTES.toMillis(rpcErrorBudgetIncreaseMinutes);
    final long affinityBreakageBudgetIncreasePeriodMillis =
        TimeUnit.MINUTES.toMillis(affinityBreakageBudgetIncreaseMinutes);
    final SimpleRequest request = SimpleRequest.newBuilder()
        .setResponseSize(314159)
        .setFillServerId(true)
        .setPayload(Payload.newBuilder()
            .setBody(ByteString.copyFrom(new byte[271828])))
        .build();
    String lastServerId = null;
    long rpcErrorBudget = 1;
    long affinityBreakageBudget = 1;
    long lastRpcErrorBudgetIncreaseTimeMillis = startTimeMillis;
    long lastAffinityBreakageBudgetIncreaseTimeMillis = startTimeMillis;

    logger.info("Test started");

    while (true) {
      try {
        logger.info("Sending request");
        SimpleResponse response =
            blockingStub.withDeadlineAfter(1, TimeUnit.MINUTES).unaryCall(request);
        logger.info("Received response");
        String serverId = response.getServerId();
        if (lastServerId != null && !lastServerId.equals(serverId)) {
          String msg = "Expected serverId " + lastServerId + ", but got " + serverId;
          logger.warning(msg + ". affinityBreakageBudget=" + affinityBreakageBudget);
          affinityBreakageBudget--;
          if (affinityBreakageBudget < 0) {
            throw new AssertionError(msg);
          }
        }
        // Remember which server answered so the next response can be compared against it.
        // Without this assignment lastServerId stays null and the affinity check never fires.
        lastServerId = serverId;
      } catch (StatusRuntimeException e) {
        logger.log(Level.WARNING, "RPC error. rpcErrorBudget=" + rpcErrorBudget, e);
        rpcErrorBudget--;
        if (rpcErrorBudget < 0) {
          throw e;
        }
      }
      Thread.sleep(rpcIntermissionMillis);
      long nowMillis = System.currentTimeMillis();
      if (nowMillis > endTimeMillis) {
        logger.info("Time is up");
        break;
      }
      // Periodically top up each budget by one, capped at 20 and 3 respectively.
      if (nowMillis > lastRpcErrorBudgetIncreaseTimeMillis + rpcErrorBudgetIncreasePeriodMillis) {
        lastRpcErrorBudgetIncreaseTimeMillis = nowMillis;
        rpcErrorBudget = Math.min(20, rpcErrorBudget + 1);
        logger.info("rpcErrorBudget after refresh: " + rpcErrorBudget);
      }
      if (nowMillis > lastAffinityBreakageBudgetIncreaseTimeMillis
          + affinityBreakageBudgetIncreasePeriodMillis) {
        lastAffinityBreakageBudgetIncreaseTimeMillis = nowMillis;
        affinityBreakageBudget = Math.min(3, affinityBreakageBudget + 1);
        logger.info("affinityBreakageBudget after refresh: " + affinityBreakageBudget);
      }
    }

    logger.info("Test passed.");
  }

  /** Builds a channel to {@link #target} using {@link ComputeEngineChannelBuilder}. */
  private ManagedChannel createChannel() {
    return ComputeEngineChannelBuilder.forTarget(target).build();
  }
}