1 #region Copyright notice and license 2 3 // Copyright 2020 The gRPC Authors 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Generic; 21 using System.Diagnostics; 22 using System.Threading; 23 using System.Threading.Tasks; 24 25 using CommandLine; 26 using Grpc.Core; 27 using Grpc.Core.Logging; 28 using Grpc.Core.Internal; 29 using Grpc.Testing; 30 31 namespace Grpc.IntegrationTesting 32 { 33 public class XdsInteropClient 34 { 35 internal class ClientOptions 36 { 37 [Option("num_channels", Default = 1)] 38 public int NumChannels { get; set; } 39 40 [Option("qps", Default = 1)] 41 42 // The desired QPS per channel, for each type of RPC. 43 public int Qps { get; set; } 44 45 [Option("server", Default = "localhost:8080")] 46 public string Server { get; set; } 47 48 [Option("stats_port", Default = 8081)] 49 public int StatsPort { get; set; } 50 51 [Option("rpc_timeout_sec", Default = 30)] 52 public int RpcTimeoutSec { get; set; } 53 54 [Option("print_response", Default = false)] 55 public bool PrintResponse { get; set; } 56 57 // Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall 58 [Option("rpc", Default = "UnaryCall")] 59 public string Rpc { get; set; } 60 61 // The metadata to send with each RPC, in the format EmptyCall:key1:value1,UnaryCall:key2:value2 62 [Option("metadata", Default = null)] 63 public string Metadata { get; set; } 64 } 65 66 internal enum RpcType 67 { 68 UnaryCall, 69 EmptyCall 70 } 71 72 ClientOptions options; 73 74 StatsWatcher statsWatcher = new StatsWatcher(); 75 76 List<RpcType> rpcs; 77 Dictionary<RpcType, Metadata> metadata; 78 79 // make watcher accessible by tests 80 internal StatsWatcher StatsWatcher => statsWatcher; 81 XdsInteropClient(ClientOptions options)82 internal XdsInteropClient(ClientOptions options) 83 { 84 this.options = options; 85 this.rpcs = ParseRpcArgument(this.options.Rpc); 86 this.metadata = ParseMetadataArgument(this.options.Metadata); 87 } 88 Run(string[] args)89 public static void Run(string[] args) 90 { 91 GrpcEnvironment.SetLogger(new ConsoleLogger()); 92 var parserResult = Parser.Default.ParseArguments<ClientOptions>(args) 93 .WithNotParsed(errors => Environment.Exit(1)) 94 .WithParsed(options => 95 { 96 var xdsInteropClient = new XdsInteropClient(options); 97 xdsInteropClient.RunAsync().Wait(); 98 }); 99 } 100 RunAsync()101 private async Task RunAsync() 102 { 103 var server = new Server 104 { 105 Services = { LoadBalancerStatsService.BindService(new LoadBalancerStatsServiceImpl(statsWatcher)) } 106 }; 107 108 string host = "0.0.0.0"; 109 server.Ports.Add(host, options.StatsPort, ServerCredentials.Insecure); 110 Console.WriteLine($"Running server on {host}:{options.StatsPort}"); 111 server.Start(); 112 113 var cancellationTokenSource = new CancellationTokenSource(); 114 await RunChannelsAsync(cancellationTokenSource.Token); 115 116 await server.ShutdownAsync(); 117 } 118 119 // method made internal to make it runnable by tests RunChannelsAsync(CancellationToken cancellationToken)120 internal async Task RunChannelsAsync(CancellationToken cancellationToken) 121 { 122 var channelTasks = new List<Task>(); 123 for (int channelId = 0; channelId < options.NumChannels; channelId++) 124 { 125 var channelTask = RunSingleChannelAsync(channelId, cancellationToken); 126 channelTasks.Add(channelTask); 127 } 128 129 for (int channelId = 0; channelId < options.NumChannels; channelId++) 130 { 131 await channelTasks[channelId]; 132 } 133 } 134 RunSingleChannelAsync(int channelId, CancellationToken cancellationToken)135 private async Task RunSingleChannelAsync(int channelId, CancellationToken cancellationToken) 136 { 137 Console.WriteLine($"Starting channel {channelId}"); 138 var channel = new Channel(options.Server, ChannelCredentials.Insecure); 139 var client = new TestService.TestServiceClient(channel); 140 141 var inflightTasks = new List<Task>(); 142 long rpcsStarted = 0; 143 var stopwatch = Stopwatch.StartNew(); 144 while (!cancellationToken.IsCancellationRequested) 145 { 146 foreach (var rpcType in rpcs) 147 { 148 inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken, rpcType)); 149 rpcsStarted++; 150 } 151 152 // only cleanup calls that have already completed, calls that are still inflight will be cleaned up later. 153 await CleanupCompletedTasksAsync(inflightTasks); 154 155 // if needed, wait a bit before we start the next RPC. 156 int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps / rpcs.Count) - stopwatch.ElapsedMilliseconds); 157 if (nextDueInMillis > 0) 158 { 159 await Task.Delay(nextDueInMillis); 160 } 161 } 162 stopwatch.Stop(); 163 164 Console.WriteLine($"Shutting down channel {channelId}"); 165 await channel.ShutdownAsync(); 166 Console.WriteLine($"Channel shutdown {channelId}"); 167 } 168 RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType)169 private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType) 170 { 171 long rpcId = statsWatcher.RpcIdGenerator.Increment(); 172 try 173 { 174 // metadata to send with the RPC 175 var headers = new Metadata(); 176 if (metadata.ContainsKey(rpcType)) 177 { 178 headers = metadata[rpcType]; 179 if (headers.Count > 0) 180 { 181 var printableHeaders = "[" + string.Join(", ", headers) + "]"; 182 } 183 } 184 185 if (rpcType == RpcType.UnaryCall) 186 { 187 188 var call = client.UnaryCallAsync(new SimpleRequest(), 189 new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); 190 191 var response = await call; 192 var hostname = (await call.ResponseHeadersAsync).GetValue("hostname") ?? response.Hostname; 193 statsWatcher.OnRpcComplete(rpcId, rpcType, hostname); 194 if (options.PrintResponse) 195 { 196 Console.WriteLine($"Got response {response}"); 197 } 198 } 199 else if (rpcType == RpcType.EmptyCall) 200 { 201 var call = client.EmptyCallAsync(new Empty(), 202 new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec))); 203 204 var response = await call; 205 var hostname = (await call.ResponseHeadersAsync).GetValue("hostname"); 206 statsWatcher.OnRpcComplete(rpcId, rpcType, hostname); 207 if (options.PrintResponse) 208 { 209 Console.WriteLine($"Got response {response}"); 210 } 211 } 212 else 213 { 214 throw new InvalidOperationException($"Unsupported RPC type ${rpcType}"); 215 } 216 } 217 catch (RpcException ex) 218 { 219 statsWatcher.OnRpcComplete(rpcId, rpcType, null); 220 if (options.PrintResponse) 221 { 222 Console.WriteLine($"RPC {rpcId} failed: {ex}"); 223 } 224 } 225 } 226 CleanupCompletedTasksAsync(List<Task> tasks)227 private async Task CleanupCompletedTasksAsync(List<Task> tasks) 228 { 229 var toRemove = new List<Task>(); 230 foreach (var task in tasks) 231 { 232 if (task.IsCompleted) 233 { 234 // awaiting tasks that have already completed should be instantaneous 235 await task; 236 } 237 toRemove.Add(task); 238 } 239 foreach (var task in toRemove) 240 { 241 tasks.Remove(task); 242 } 243 } 244 ParseRpcArgument(string rpcArg)245 private static List<RpcType> ParseRpcArgument(string rpcArg) 246 { 247 var result = new List<RpcType>(); 248 foreach (var part in rpcArg.Split(',')) 249 { 250 result.Add(ParseRpc(part)); 251 } 252 return result; 253 } 254 ParseRpc(string rpc)255 private static RpcType ParseRpc(string rpc) 256 { 257 switch (rpc) 258 { 259 case "UnaryCall": 260 return RpcType.UnaryCall; 261 case "EmptyCall": 262 return RpcType.EmptyCall; 263 default: 264 throw new ArgumentException($"Unknown RPC: \"{rpc}\""); 265 } 266 } 267 ParseMetadataArgument(string metadataArg)268 private static Dictionary<RpcType, Metadata> ParseMetadataArgument(string metadataArg) 269 { 270 var rpcMetadata = new Dictionary<RpcType, Metadata>(); 271 if (string.IsNullOrEmpty(metadataArg)) 272 { 273 return rpcMetadata; 274 } 275 276 foreach (var metadata in metadataArg.Split(',')) 277 { 278 var parts = metadata.Split(':'); 279 if (parts.Length != 3) 280 { 281 throw new ArgumentException($"Invalid metadata: \"{metadata}\""); 282 } 283 var rpc = ParseRpc(parts[0]); 284 var key = parts[1]; 285 var value = parts[2]; 286 287 var md = new Metadata { {key, value} }; 288 289 if (rpcMetadata.ContainsKey(rpc)) 290 { 291 var existingMetadata = rpcMetadata[rpc]; 292 foreach (var entry in md) 293 { 294 existingMetadata.Add(entry); 295 } 296 } 297 else 298 { 299 rpcMetadata.Add(rpc, md); 300 } 301 } 302 return rpcMetadata; 303 } 304 } 305 306 internal class StatsWatcher 307 { 308 private readonly object myLock = new object(); 309 private readonly AtomicCounter rpcIdGenerator = new AtomicCounter(0); 310 311 private long? firstAcceptedRpcId; 312 private int numRpcsWanted; 313 private int rpcsCompleted; 314 private int rpcsNoHostname; 315 private Dictionary<string, int> rpcsByHostname; 316 private Dictionary<string, Dictionary<string, int>> rpcsByMethod; 317 318 public AtomicCounter RpcIdGenerator => rpcIdGenerator; 319 StatsWatcher()320 public StatsWatcher() 321 { 322 Reset(); 323 } 324 OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname)325 public void OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname) 326 { 327 lock (myLock) 328 { 329 if (!firstAcceptedRpcId.HasValue || rpcId < firstAcceptedRpcId || rpcId >= firstAcceptedRpcId + numRpcsWanted) 330 { 331 return; 332 } 333 334 if (string.IsNullOrEmpty(responseHostname)) 335 { 336 rpcsNoHostname ++; 337 } 338 else 339 { 340 // update rpcsByHostname 341 if (!rpcsByHostname.ContainsKey(responseHostname)) 342 { 343 rpcsByHostname[responseHostname] = 0; 344 } 345 rpcsByHostname[responseHostname] += 1; 346 347 // update rpcsByMethod 348 var method = rpcType.ToString(); 349 if (!rpcsByMethod.ContainsKey(method)) 350 { 351 rpcsByMethod[method] = new Dictionary<string, int>(); 352 } 353 if (!rpcsByMethod[method].ContainsKey(responseHostname)) 354 { 355 rpcsByMethod[method][responseHostname] = 0; 356 } 357 rpcsByMethod[method][responseHostname] += 1; 358 } 359 rpcsCompleted += 1; 360 361 if (rpcsCompleted >= numRpcsWanted) 362 { 363 Monitor.Pulse(myLock); 364 } 365 } 366 } 367 Reset()368 public void Reset() 369 { 370 lock (myLock) 371 { 372 firstAcceptedRpcId = null; 373 numRpcsWanted = 0; 374 rpcsCompleted = 0; 375 rpcsNoHostname = 0; 376 rpcsByHostname = new Dictionary<string, int>(); 377 rpcsByMethod = new Dictionary<string, Dictionary<string, int>>(); 378 } 379 } 380 WaitForRpcStatsResponse(int rpcsWanted, int timeoutSec)381 public LoadBalancerStatsResponse WaitForRpcStatsResponse(int rpcsWanted, int timeoutSec) 382 { 383 lock (myLock) 384 { 385 if (firstAcceptedRpcId.HasValue) 386 { 387 throw new InvalidOperationException("StateWatcher is already collecting stats."); 388 } 389 // we are only interested in the next numRpcsWanted RPCs 390 firstAcceptedRpcId = rpcIdGenerator.Count + 1; 391 numRpcsWanted = rpcsWanted; 392 393 var deadline = DateTime.UtcNow.AddSeconds(timeoutSec); 394 while (true) 395 { 396 var timeoutMillis = Math.Max((int)(deadline - DateTime.UtcNow).TotalMilliseconds, 0); 397 if (!Monitor.Wait(myLock, timeoutMillis) || rpcsCompleted >= rpcsWanted) 398 { 399 // we collected enough RPCs, or timed out waiting 400 var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname }; 401 response.RpcsByPeer.Add(rpcsByHostname); 402 403 response.RpcsByMethod.Clear(); 404 foreach (var methodEntry in rpcsByMethod) 405 { 406 var rpcsByPeer = new LoadBalancerStatsResponse.Types.RpcsByPeer(); 407 rpcsByPeer.RpcsByPeer_.Add(methodEntry.Value); 408 response.RpcsByMethod[methodEntry.Key] = rpcsByPeer; 409 } 410 Reset(); 411 return response; 412 } 413 } 414 } 415 } 416 } 417 418 /// <summary> 419 /// Implementation of LoadBalancerStatsService server 420 /// </summary> 421 internal class LoadBalancerStatsServiceImpl : LoadBalancerStatsService.LoadBalancerStatsServiceBase 422 { 423 StatsWatcher statsWatcher; 424 LoadBalancerStatsServiceImpl(StatsWatcher statsWatcher)425 public LoadBalancerStatsServiceImpl(StatsWatcher statsWatcher) 426 { 427 this.statsWatcher = statsWatcher; 428 } 429 GetClientStats(LoadBalancerStatsRequest request, ServerCallContext context)430 public override async Task<LoadBalancerStatsResponse> GetClientStats(LoadBalancerStatsRequest request, ServerCallContext context) 431 { 432 // run as a task to avoid blocking 433 var response = await Task.Run(() => statsWatcher.WaitForRpcStatsResponse(request.NumRpcs, request.TimeoutSec)); 434 Console.WriteLine($"Returning stats {response} (num of requested RPCs: {request.NumRpcs})"); 435 return response; 436 } 437 } 438 } 439