• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2020 The gRPC Authors
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Diagnostics;
22 using System.Threading;
23 using System.Threading.Tasks;
24 
25 using CommandLine;
26 using Grpc.Core;
27 using Grpc.Core.Logging;
28 using Grpc.Core.Internal;
29 using Grpc.Testing;
30 
31 namespace Grpc.IntegrationTesting
32 {
33     public class XdsInteropClient
34     {
35         internal class ClientOptions
36         {
37             [Option("num_channels", Default = 1)]
38             public int NumChannels { get; set; }
39 
40             [Option("qps", Default = 1)]
41 
42             // The desired QPS per channel, for each type of RPC.
43             public int Qps { get; set; }
44 
45             [Option("server", Default = "localhost:8080")]
46             public string Server { get; set; }
47 
48             [Option("stats_port", Default = 8081)]
49             public int StatsPort { get; set; }
50 
51             [Option("rpc_timeout_sec", Default = 30)]
52             public int RpcTimeoutSec { get; set; }
53 
54             [Option("print_response", Default = false)]
55             public bool PrintResponse { get; set; }
56 
57             // Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall
58             [Option("rpc", Default = "UnaryCall")]
59             public string Rpc { get; set; }
60 
61             // The metadata to send with each RPC, in the format EmptyCall:key1:value1,UnaryCall:key2:value2
62             [Option("metadata", Default = null)]
63             public string Metadata { get; set; }
64         }
65 
66         internal enum RpcType
67         {
68             UnaryCall,
69             EmptyCall
70         }
71 
72         ClientOptions options;
73 
74         StatsWatcher statsWatcher = new StatsWatcher();
75 
76         List<RpcType> rpcs;
77         Dictionary<RpcType, Metadata> metadata;
78 
79         // make watcher accessible by tests
80         internal StatsWatcher StatsWatcher => statsWatcher;
81 
XdsInteropClient(ClientOptions options)82         internal XdsInteropClient(ClientOptions options)
83         {
84             this.options = options;
85             this.rpcs = ParseRpcArgument(this.options.Rpc);
86             this.metadata = ParseMetadataArgument(this.options.Metadata);
87         }
88 
Run(string[] args)89         public static void Run(string[] args)
90         {
91             GrpcEnvironment.SetLogger(new ConsoleLogger());
92             var parserResult = Parser.Default.ParseArguments<ClientOptions>(args)
93                 .WithNotParsed(errors => Environment.Exit(1))
94                 .WithParsed(options =>
95                 {
96                     var xdsInteropClient = new XdsInteropClient(options);
97                     xdsInteropClient.RunAsync().Wait();
98                 });
99         }
100 
RunAsync()101         private async Task RunAsync()
102         {
103             var server = new Server
104             {
105                 Services = { LoadBalancerStatsService.BindService(new LoadBalancerStatsServiceImpl(statsWatcher)) }
106             };
107 
108             string host = "0.0.0.0";
109             server.Ports.Add(host, options.StatsPort, ServerCredentials.Insecure);
110             Console.WriteLine($"Running server on {host}:{options.StatsPort}");
111             server.Start();
112 
113             var cancellationTokenSource = new CancellationTokenSource();
114             await RunChannelsAsync(cancellationTokenSource.Token);
115 
116             await server.ShutdownAsync();
117         }
118 
119         // method made internal to make it runnable by tests
RunChannelsAsync(CancellationToken cancellationToken)120         internal async Task RunChannelsAsync(CancellationToken cancellationToken)
121         {
122             var channelTasks = new List<Task>();
123             for (int channelId = 0; channelId < options.NumChannels; channelId++)
124             {
125                 var channelTask = RunSingleChannelAsync(channelId, cancellationToken);
126                 channelTasks.Add(channelTask);
127             }
128 
129             for (int channelId = 0; channelId < options.NumChannels; channelId++)
130             {
131                 await channelTasks[channelId];
132             }
133         }
134 
RunSingleChannelAsync(int channelId, CancellationToken cancellationToken)135         private async Task RunSingleChannelAsync(int channelId, CancellationToken cancellationToken)
136         {
137             Console.WriteLine($"Starting channel {channelId}");
138             var channel = new Channel(options.Server, ChannelCredentials.Insecure);
139             var client = new TestService.TestServiceClient(channel);
140 
141             var inflightTasks = new List<Task>();
142             long rpcsStarted = 0;
143             var stopwatch = Stopwatch.StartNew();
144             while (!cancellationToken.IsCancellationRequested)
145             {
146                 foreach (var rpcType in rpcs)
147                 {
148                     inflightTasks.Add(RunSingleRpcAsync(client, cancellationToken, rpcType));
149                     rpcsStarted++;
150                 }
151 
152                 // only cleanup calls that have already completed, calls that are still inflight will be cleaned up later.
153                 await CleanupCompletedTasksAsync(inflightTasks);
154 
155                 // if needed, wait a bit before we start the next RPC.
156                 int nextDueInMillis = (int) Math.Max(0, (1000 * rpcsStarted / options.Qps / rpcs.Count) - stopwatch.ElapsedMilliseconds);
157                 if (nextDueInMillis > 0)
158                 {
159                     await Task.Delay(nextDueInMillis);
160                 }
161             }
162             stopwatch.Stop();
163 
164             Console.WriteLine($"Shutting down channel {channelId}");
165             await channel.ShutdownAsync();
166             Console.WriteLine($"Channel shutdown {channelId}");
167         }
168 
RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType)169         private async Task RunSingleRpcAsync(TestService.TestServiceClient client, CancellationToken cancellationToken, RpcType rpcType)
170         {
171             long rpcId = statsWatcher.RpcIdGenerator.Increment();
172             try
173             {
174                 // metadata to send with the RPC
175                 var headers = new Metadata();
176                 if (metadata.ContainsKey(rpcType))
177                 {
178                     headers = metadata[rpcType];
179                     if (headers.Count > 0)
180                     {
181                         var printableHeaders = "[" + string.Join(", ", headers) + "]";
182                     }
183                 }
184 
185                 if (rpcType == RpcType.UnaryCall)
186                 {
187 
188                     var call = client.UnaryCallAsync(new SimpleRequest(),
189                         new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec)));
190 
191                     var response = await call;
192                     var hostname = (await call.ResponseHeadersAsync).GetValue("hostname") ?? response.Hostname;
193                     statsWatcher.OnRpcComplete(rpcId, rpcType, hostname);
194                     if (options.PrintResponse)
195                     {
196                         Console.WriteLine($"Got response {response}");
197                     }
198                 }
199                 else if (rpcType == RpcType.EmptyCall)
200                 {
201                     var call = client.EmptyCallAsync(new Empty(),
202                         new CallOptions(headers: headers, cancellationToken: cancellationToken, deadline: DateTime.UtcNow.AddSeconds(options.RpcTimeoutSec)));
203 
204                     var response = await call;
205                     var hostname = (await call.ResponseHeadersAsync).GetValue("hostname");
206                     statsWatcher.OnRpcComplete(rpcId, rpcType, hostname);
207                     if (options.PrintResponse)
208                     {
209                         Console.WriteLine($"Got response {response}");
210                     }
211                 }
212                 else
213                 {
214                     throw new InvalidOperationException($"Unsupported RPC type ${rpcType}");
215                 }
216             }
217             catch (RpcException ex)
218             {
219                 statsWatcher.OnRpcComplete(rpcId, rpcType, null);
220                 if (options.PrintResponse)
221                 {
222                     Console.WriteLine($"RPC {rpcId} failed: {ex}");
223                 }
224             }
225         }
226 
CleanupCompletedTasksAsync(List<Task> tasks)227         private async Task CleanupCompletedTasksAsync(List<Task> tasks)
228         {
229             var toRemove = new List<Task>();
230             foreach (var task in tasks)
231             {
232                 if (task.IsCompleted)
233                 {
234                     // awaiting tasks that have already completed should be instantaneous
235                     await task;
236                 }
237                 toRemove.Add(task);
238             }
239             foreach (var task in toRemove)
240             {
241                 tasks.Remove(task);
242             }
243         }
244 
ParseRpcArgument(string rpcArg)245         private static List<RpcType> ParseRpcArgument(string rpcArg)
246         {
247             var result = new List<RpcType>();
248             foreach (var part in rpcArg.Split(','))
249             {
250                 result.Add(ParseRpc(part));
251             }
252             return result;
253         }
254 
ParseRpc(string rpc)255         private static RpcType ParseRpc(string rpc)
256         {
257             switch (rpc)
258             {
259                 case "UnaryCall":
260                     return RpcType.UnaryCall;
261                 case "EmptyCall":
262                     return RpcType.EmptyCall;
263                 default:
264                     throw new ArgumentException($"Unknown RPC: \"{rpc}\"");
265             }
266         }
267 
ParseMetadataArgument(string metadataArg)268         private static Dictionary<RpcType, Metadata> ParseMetadataArgument(string metadataArg)
269         {
270             var rpcMetadata = new Dictionary<RpcType, Metadata>();
271             if (string.IsNullOrEmpty(metadataArg))
272             {
273                 return rpcMetadata;
274             }
275 
276             foreach (var metadata in metadataArg.Split(','))
277             {
278                 var parts = metadata.Split(':');
279                 if (parts.Length != 3)
280                 {
281                     throw new ArgumentException($"Invalid metadata: \"{metadata}\"");
282                 }
283                 var rpc = ParseRpc(parts[0]);
284                 var key = parts[1];
285                 var value = parts[2];
286 
287                 var md = new Metadata { {key, value} };
288 
289                 if (rpcMetadata.ContainsKey(rpc))
290                 {
291                     var existingMetadata = rpcMetadata[rpc];
292                     foreach (var entry in md)
293                     {
294                         existingMetadata.Add(entry);
295                     }
296                 }
297                 else
298                 {
299                     rpcMetadata.Add(rpc, md);
300                 }
301             }
302             return rpcMetadata;
303         }
304     }
305 
306     internal class StatsWatcher
307     {
308         private readonly object myLock = new object();
309         private readonly AtomicCounter rpcIdGenerator = new AtomicCounter(0);
310 
311         private long? firstAcceptedRpcId;
312         private int numRpcsWanted;
313         private int rpcsCompleted;
314         private int rpcsNoHostname;
315         private Dictionary<string, int> rpcsByHostname;
316         private Dictionary<string, Dictionary<string, int>> rpcsByMethod;
317 
318         public AtomicCounter RpcIdGenerator => rpcIdGenerator;
319 
StatsWatcher()320         public StatsWatcher()
321         {
322             Reset();
323         }
324 
OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname)325         public void OnRpcComplete(long rpcId, XdsInteropClient.RpcType rpcType, string responseHostname)
326         {
327             lock (myLock)
328             {
329                 if (!firstAcceptedRpcId.HasValue || rpcId < firstAcceptedRpcId || rpcId >= firstAcceptedRpcId + numRpcsWanted)
330                 {
331                     return;
332                 }
333 
334                 if (string.IsNullOrEmpty(responseHostname))
335                 {
336                     rpcsNoHostname ++;
337                 }
338                 else
339                 {
340                     // update rpcsByHostname
341                     if (!rpcsByHostname.ContainsKey(responseHostname))
342                     {
343                         rpcsByHostname[responseHostname] = 0;
344                     }
345                     rpcsByHostname[responseHostname] += 1;
346 
347                     // update rpcsByMethod
348                     var method = rpcType.ToString();
349                     if (!rpcsByMethod.ContainsKey(method))
350                     {
351                         rpcsByMethod[method] = new Dictionary<string, int>();
352                     }
353                     if (!rpcsByMethod[method].ContainsKey(responseHostname))
354                     {
355                         rpcsByMethod[method][responseHostname] = 0;
356                     }
357                     rpcsByMethod[method][responseHostname] += 1;
358                 }
359                 rpcsCompleted += 1;
360 
361                 if (rpcsCompleted >= numRpcsWanted)
362                 {
363                     Monitor.Pulse(myLock);
364                 }
365             }
366         }
367 
Reset()368         public void Reset()
369         {
370             lock (myLock)
371             {
372                 firstAcceptedRpcId = null;
373                 numRpcsWanted = 0;
374                 rpcsCompleted = 0;
375                 rpcsNoHostname = 0;
376                 rpcsByHostname = new Dictionary<string, int>();
377                 rpcsByMethod = new Dictionary<string, Dictionary<string, int>>();
378             }
379         }
380 
WaitForRpcStatsResponse(int rpcsWanted, int timeoutSec)381         public LoadBalancerStatsResponse WaitForRpcStatsResponse(int rpcsWanted, int timeoutSec)
382         {
383             lock (myLock)
384             {
385                 if (firstAcceptedRpcId.HasValue)
386                 {
387                     throw new InvalidOperationException("StateWatcher is already collecting stats.");
388                 }
389                 // we are only interested in the next numRpcsWanted RPCs
390                 firstAcceptedRpcId = rpcIdGenerator.Count + 1;
391                 numRpcsWanted = rpcsWanted;
392 
393                 var deadline = DateTime.UtcNow.AddSeconds(timeoutSec);
394                 while (true)
395                 {
396                     var timeoutMillis = Math.Max((int)(deadline - DateTime.UtcNow).TotalMilliseconds, 0);
397                     if (!Monitor.Wait(myLock, timeoutMillis) || rpcsCompleted >= rpcsWanted)
398                     {
399                         // we collected enough RPCs, or timed out waiting
400                         var response = new LoadBalancerStatsResponse { NumFailures = rpcsNoHostname };
401                         response.RpcsByPeer.Add(rpcsByHostname);
402 
403                         response.RpcsByMethod.Clear();
404                         foreach (var methodEntry in rpcsByMethod)
405                         {
406                             var rpcsByPeer = new LoadBalancerStatsResponse.Types.RpcsByPeer();
407                             rpcsByPeer.RpcsByPeer_.Add(methodEntry.Value);
408                             response.RpcsByMethod[methodEntry.Key] = rpcsByPeer;
409                         }
410                         Reset();
411                         return response;
412                     }
413                 }
414             }
415         }
416     }
417 
418     /// <summary>
419     /// Implementation of LoadBalancerStatsService server
420     /// </summary>
421     internal class LoadBalancerStatsServiceImpl : LoadBalancerStatsService.LoadBalancerStatsServiceBase
422     {
423         StatsWatcher statsWatcher;
424 
LoadBalancerStatsServiceImpl(StatsWatcher statsWatcher)425         public LoadBalancerStatsServiceImpl(StatsWatcher statsWatcher)
426         {
427             this.statsWatcher = statsWatcher;
428         }
429 
GetClientStats(LoadBalancerStatsRequest request, ServerCallContext context)430         public override async Task<LoadBalancerStatsResponse> GetClientStats(LoadBalancerStatsRequest request, ServerCallContext context)
431         {
432             // run as a task to avoid blocking
433             var response = await Task.Run(() => statsWatcher.WaitForRpcStatsResponse(request.NumRpcs, request.TimeoutSec));
434             Console.WriteLine($"Returning stats {response} (num of requested RPCs: {request.NumRpcs})");
435             return response;
436         }
437     }
438 }
439