• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import com.google.protobuf.ByteString;
20 import io.grpc.ManagedChannel;
21 import io.grpc.StatusRuntimeException;
22 import io.grpc.alts.ComputeEngineChannelBuilder;
23 import io.grpc.testing.integration.Messages.Payload;
24 import io.grpc.testing.integration.Messages.SimpleRequest;
25 import io.grpc.testing.integration.Messages.SimpleResponse;
26 import java.util.concurrent.TimeUnit;
27 import java.util.logging.Level;
28 import java.util.logging.Logger;
29 
30 /**
31  * Test client that verifies all requests are sent to the same server even running for an extended
32  * time, while allowing for occasionally switching server.  This is intended for testing the GRPCLB
33  * pick_first mode in GCE.
34  */
35 public final class GrpclbLongLivedAffinityTestClient {
36   private static final Logger logger =
37       Logger.getLogger(GrpclbLongLivedAffinityTestClient.class.getName());
38 
39   /**
40    * Entry point.
41    */
main(String[] args)42   public static void main(String[] args) throws Exception {
43     final GrpclbLongLivedAffinityTestClient client = new GrpclbLongLivedAffinityTestClient();
44     client.parseArgs(args);
45     client.setUp();
46 
47     Runtime.getRuntime().addShutdownHook(new Thread() {
48       @Override
49       @SuppressWarnings("CatchAndPrintStackTrace")
50       public void run() {
51         try {
52           client.shutdown();
53         } catch (Exception e) {
54           // At this moment logger may have stopped working
55           e.printStackTrace();
56         }
57       }
58     });
59 
60     try {
61       client.run();
62     } finally {
63       client.shutdown();
64     }
65   }
66 
67   private String target = "directpath-grpclb-with-pick-first-test.googleapis.com";
68   private long rpcErrorBudgetIncreaseMinutes = 2;
69   private long affinityBreakageBudgetIncreaseMinutes = 90;
70   private long rpcIntermissionSeconds = 1;
71   private long totalTestSeconds = 60;
72 
73   ManagedChannel channel;
74   TestServiceGrpc.TestServiceBlockingStub blockingStub;
75 
parseArgs(String[] args)76   private void parseArgs(String[] args) {
77     boolean usage = false;
78     for (String arg : args) {
79       if (!arg.startsWith("--")) {
80         System.err.println("All arguments must start with '--': " + arg);
81         usage = true;
82         break;
83       }
84       String[] parts = arg.substring(2).split("=", 2);
85       String key = parts[0];
86       if ("help".equals(key)) {
87         usage = true;
88         break;
89       }
90       if (parts.length != 2) {
91         System.err.println("All arguments must be of the form --arg=value");
92         usage = true;
93         break;
94       }
95       String value = parts[1];
96       if ("target".equals(key)) {
97         target = value;
98       } else if ("rpc_error_budget_increase_minutes".equals(key)) {
99         rpcErrorBudgetIncreaseMinutes = Long.parseLong(value);
100       } else if ("affinity_breakage_budget_increase_minutes".equals(key)) {
101         affinityBreakageBudgetIncreaseMinutes = Long.parseLong(value);
102       } else if ("rpc_intermission_seconds".equals(key)) {
103         rpcIntermissionSeconds = Long.parseLong(value);
104       } else if ("total_test_seconds".equals(key)) {
105         totalTestSeconds = Long.parseLong(value);
106       } else {
107         System.err.println("Unknown argument: " + key);
108         usage = true;
109         break;
110       }
111     }
112     if (usage) {
113       GrpclbLongLivedAffinityTestClient c = new GrpclbLongLivedAffinityTestClient();
114       System.out.println(
115           "Usage: [ARGS...]"
116           + "\n"
117           + "\n  --target=TARGET          Server target.             Default " + c.target
118           + "\n  --rpc_error_budget_increase_minutes=MINUTES         Default "
119           + c.rpcErrorBudgetIncreaseMinutes
120           + "\n  --affinity_breakage_budget_increase_minutes=MINUTES Default "
121           + c.affinityBreakageBudgetIncreaseMinutes
122           + "\n  --rpc_intermission_seconds=SECONDS                  Default "
123           + c.rpcIntermissionSeconds
124           + "\n  --total_test_seconds=SECONDS                        Default "
125           + c.totalTestSeconds
126       );
127       System.exit(1);
128     }
129   }
130 
setUp()131   private void setUp() {
132     channel = createChannel();
133     blockingStub = TestServiceGrpc.newBlockingStub(channel);
134   }
135 
shutdown()136   private void shutdown() {
137     try {
138       if (channel != null) {
139         channel.shutdownNow();
140         channel.awaitTermination(1, TimeUnit.SECONDS);
141       }
142     } catch (Exception ex) {
143       throw new RuntimeException(ex);
144     }
145   }
146 
run()147   private void run() throws Exception {
148     final long startTimeMillis = System.currentTimeMillis();
149     final long endTimeMillis = startTimeMillis + TimeUnit.SECONDS.toMillis(totalTestSeconds);
150     final long rpcIntermissionMillis = TimeUnit.SECONDS.toMillis(rpcIntermissionSeconds);
151     final long rpcErrorBudgetIncreasePeriodMillis =
152         TimeUnit.MINUTES.toMillis(rpcErrorBudgetIncreaseMinutes);
153     final long affinityBreakageBudgetIncreasePeriodMillis =
154         TimeUnit.MINUTES.toMillis(affinityBreakageBudgetIncreaseMinutes);
155     final SimpleRequest request = SimpleRequest.newBuilder()
156         .setResponseSize(314159)
157         .setFillServerId(true)
158         .setPayload(Payload.newBuilder()
159             .setBody(ByteString.copyFrom(new byte[271828])))
160         .build();
161     String lastServerId = null;
162     long rpcErrorBudget = 1;
163     long affinityBreakageBudget = 1;
164     long lastRpcErrorBudgetIncreaseTimeMillis = startTimeMillis;
165     long lastAffinityBreakageBudgetIncreaseTimeMillis = startTimeMillis;
166 
167     logger.info("Test started");
168 
169     while (true) {
170       try {
171         logger.info("Sending request");
172         SimpleResponse response =
173             blockingStub.withDeadlineAfter(1, TimeUnit.MINUTES).unaryCall(request);
174         logger.info("Received response");
175         String serverId = response.getServerId();
176         if (lastServerId != null && !lastServerId.equals(serverId)) {
177           String msg = "Expected serverId " + lastServerId + ", but got " + serverId;
178           logger.warning(msg + ". affinityBreakageBudget=" + affinityBreakageBudget);
179           affinityBreakageBudget--;
180           if (affinityBreakageBudget < 0) {
181             throw new AssertionError(msg);
182           }
183         }
184       } catch (StatusRuntimeException e) {
185         logger.log(Level.WARNING, "RPC error. rpcErrorBudget=" + rpcErrorBudget, e);
186         rpcErrorBudget--;
187         if (rpcErrorBudget < 0) {
188           throw e;
189         }
190       }
191       Thread.sleep(rpcIntermissionMillis);
192       long nowMillis = System.currentTimeMillis();
193       if (nowMillis > endTimeMillis) {
194         logger.info("Time is up");
195         break;
196       }
197       if (nowMillis > lastRpcErrorBudgetIncreaseTimeMillis + rpcErrorBudgetIncreasePeriodMillis) {
198         lastRpcErrorBudgetIncreaseTimeMillis = nowMillis;
199         rpcErrorBudget = Math.min(20, rpcErrorBudget + 1);
200         logger.info("rpcErrorBudget after refresh: " + rpcErrorBudget);
201       }
202       if (nowMillis > lastAffinityBreakageBudgetIncreaseTimeMillis
203           + affinityBreakageBudgetIncreasePeriodMillis) {
204         lastAffinityBreakageBudgetIncreaseTimeMillis = nowMillis;
205         affinityBreakageBudget = Math.min(3, affinityBreakageBudget + 1);
206         logger.info("affinityBreakageBudget after refresh: " + affinityBreakageBudget);
207       }
208     }
209 
210     logger.info("Test passed.");
211   }
212 
createChannel()213   private ManagedChannel createChannel() {
214     return ComputeEngineChannelBuilder.forTarget(target).build();
215   }
216 }
217 
218