1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Concurrent; 21 using System.Collections.Generic; 22 using System.Threading; 23 using System.Threading.Tasks; 24 using BenchmarkDotNet.Attributes; 25 using Grpc.Core; 26 27 namespace Grpc.Microbenchmarks 28 { 29 30 // common base-type for tests that need to run with some level of concurrency; 31 // note there's nothing *special* about this type - it is just to save some 32 // boilerplate 33 34 [ClrJob, CoreJob] // test .NET Core and .NET Framework 35 [MemoryDiagnoser] // allocations 36 public abstract class CommonThreadedBase 37 { 38 protected virtual bool NeedsEnvironment => true; 39 40 [Params(1, 2, 4, 6)] 41 public int ThreadCount { get; set; } 42 43 protected GrpcEnvironment Environment { get; private set; } 44 45 private List<Thread> workers; 46 47 private List<BlockingCollection<Action>> dispatchQueues; 48 49 [GlobalSetup] Setup()50 public virtual void Setup() 51 { 52 dispatchQueues = new List<BlockingCollection<Action>>(); 53 workers = new List<Thread>(); 54 for (int i = 0; i < ThreadCount; i++) 55 { 56 var dispatchQueue = new BlockingCollection<Action>(); 57 var thread = new Thread(new ThreadStart(() => WorkerThreadBody(dispatchQueue))); 58 thread.Name = string.Format("threaded benchmark worker {0}", i); 59 thread.Start(); 60 workers.Add(thread); 61 dispatchQueues.Add(dispatchQueue); 62 } 63 64 if (NeedsEnvironment) Environment = GrpcEnvironment.AddRef(); 65 } 66 67 [GlobalCleanup] Cleanup()68 public virtual void Cleanup() 69 { 70 for (int i = 0; i < ThreadCount; i++) 71 { 72 dispatchQueues[i].Add(null); // null action request termination of the worker thread. 73 workers[i].Join(); 74 } 75 76 if (Environment != null) 77 { 78 Environment = null; 79 GrpcEnvironment.ReleaseAsync().Wait(); 80 } 81 } 82 83 /// <summary> 84 /// Runs the operation in parallel (once on each worker thread). 85 /// This method tries to incur as little 86 /// overhead as possible, but there is some inherent overhead 87 /// that is hard to avoid (thread hop etc.). Therefore it is strongly 88 /// recommended that the benchmarked operation runs long enough to 89 /// make this overhead negligible. 90 /// </summary> RunConcurrent(Action operation)91 protected void RunConcurrent(Action operation) 92 { 93 var workItemTasks = new Task[ThreadCount]; 94 for (int i = 0; i < ThreadCount; i++) 95 { 96 var tcs = new TaskCompletionSource<object>(); 97 var workItem = new Action(() => 98 { 99 try 100 { 101 operation(); 102 tcs.SetResult(null); 103 } 104 catch (Exception e) 105 { 106 tcs.SetException(e); 107 } 108 }); 109 workItemTasks[i] = tcs.Task; 110 dispatchQueues[i].Add(workItem); 111 } 112 Task.WaitAll(workItemTasks); 113 } 114 WorkerThreadBody(BlockingCollection<Action> dispatchQueue)115 private void WorkerThreadBody(BlockingCollection<Action> dispatchQueue) 116 { 117 while(true) 118 { 119 var workItem = dispatchQueue.Take(); 120 if (workItem == null) 121 { 122 // stop the worker if null action was provided 123 break; 124 } 125 workItem(); 126 } 127 } 128 } 129 } 130