#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Pool of threads polling on a set of completion queues.
    /// Each thread repeatedly pulls the next event from its completion queue and
    /// dispatches the registered completion callback, either inline on the polling
    /// thread or on the default .NET thread pool.
    /// </summary>
    internal class GrpcThreadPool
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();

        // Polling interval and upper bound used by a polling thread that waits for
        // still-running continuations before it exits (see RunHandlerLoop).
        const int FinishContinuationsSleepMillis = 10;
        const int MaxFinishContinuationsSleepTotalMillis = 10000;

        readonly GrpcEnvironment environment;
        readonly object myLock = new object();
        readonly List<Thread> threads = new List<Thread>();
        readonly int poolSize;
        readonly int completionQueueCount;
        readonly bool inlineHandlers;

        // Delegates are created once in the constructor so that dispatching an event
        // to the thread pool does not allocate a new delegate every time.
        readonly WaitCallback runCompletionQueueEventCallbackSuccess;
        readonly WaitCallback runCompletionQueueEventCallbackFailure;

        // Number of completion callbacks that were dispatched but have not finished yet.
        // Shared by all polling threads of this pool.
        readonly AtomicCounter queuedContinuationCounter = new AtomicCounter();

        readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>();  // profilers assigned to threadpool threads

        bool stopRequested;

        // Null until Start() has been called.
        IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;

        /// <summary>
        /// Creates a pool of threads polling on a set of completion queues.
        /// The threads are not started until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="environment">Environment.</param>
        /// <param name="poolSize">Pool size (number of polling threads).</param>
        /// <param name="completionQueueCount">Completion queue count. Must not exceed <paramref name="poolSize"/>.</param>
        /// <param name="inlineHandlers">
        /// If <c>true</c>, completion callbacks run directly on the polling thread;
        /// otherwise they are queued to the default .NET thread pool.
        /// </param>
        public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)
        {
            this.environment = environment;
            this.poolSize = poolSize;
            this.completionQueueCount = completionQueueCount;
            this.inlineHandlers = inlineHandlers;
            // NOTE(review): completionQueueCount == 0 with poolSize > 0 passes this check but
            // makes CreateAndStartThread divide by zero in Start() - confirm callers never do that.
            GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
                "Thread pool size cannot be smaller than the number of completion queues used.");

            this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, true));
            this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, false));
        }

        /// <summary>
        /// Creates the completion queues and starts <c>poolSize</c> polling threads.
        /// May only be called once; a second call fails the "Already started." state check.
        /// </summary>
        public void Start()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
                completionQueues = CreateCompletionQueueList(environment, completionQueueCount);

                for (int i = 0; i < poolSize; i++)
                {
                    var optionalProfiler = i < threadProfilers.Count ? threadProfilers[i] : null;
                    threads.Add(CreateAndStartThread(i, optionalProfiler));
                }
            }
        }

        /// <summary>
        /// Requests shutdown of all completion queues and returns a task that completes
        /// once every polling thread has exited, the completion queues have been disposed
        /// and the thread profilers (if any) have been dumped.
        /// May only be called once, and only after <see cref="Start"/>.
        /// </summary>
        /// <returns>Task that finishes when the pool has fully stopped.</returns>
        public Task StopAsync()
        {
            lock (myLock)
            {
                // Check the started state before touching stopRequested: otherwise a premature
                // call would fail with a NullReferenceException on completionQueues after
                // stopRequested was already set, making the pool impossible to stop later.
                GrpcPreconditions.CheckState(completionQueues != null, "Not started.");
                GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
                stopRequested = true;

                foreach (var cq in completionQueues)
                {
                    cq.Shutdown();
                }
            }

            // Joining the threads blocks, so do it off the caller's thread.
            return Task.Run(() =>
            {
                foreach (var thread in threads)
                {
                    thread.Join();
                }

                // Queues are disposed only after no thread is polling them anymore.
                foreach (var cq in completionQueues)
                {
                    cq.Dispose();
                }

                for (int i = 0; i < threadProfilers.Count; i++)
                {
                    threadProfilers[i].Dump(string.Format("grpc_trace_thread_{0}.txt", i));
                }
            });
        }

        /// <summary>
        /// Returns true if there is at least one thread pool thread that hasn't
        /// already stopped.
        /// Threads can either stop because all completion queues shut down or
        /// because all foreground threads have already shutdown and process is
        /// going to exit.
        /// </summary>
        internal bool IsAlive
        {
            get
            {
                return threads.Any(t => t.ThreadState != ThreadState.Stopped);
            }
        }

        /// <summary>
        /// Completion queues polled by this pool; <c>null</c> until <see cref="Start"/> has been called.
        /// </summary>
        internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
        {
            get
            {
                return completionQueues;
            }
        }

        /// <summary>
        /// Creates a named background thread that polls one of the completion queues
        /// and starts it. Threads are assigned to queues round-robin by thread index.
        /// </summary>
        /// <param name="threadIndex">Zero-based index of the thread within the pool.</param>
        /// <param name="optionalProfiler">Profiler to install on the new thread, or <c>null</c>.</param>
        /// <returns>The started thread.</returns>
        private Thread CreateAndStartThread(int threadIndex, IProfiler optionalProfiler)
        {
            var cqIndex = threadIndex % completionQueues.Count;
            var cq = completionQueues.ElementAt(cqIndex);

            var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq, optionalProfiler)));
            thread.IsBackground = true;
            thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
            thread.Start();

            return thread;
        }

        /// <summary>
        /// Body of the polling thread. Pulls events from the completion queue until the
        /// shutdown event is seen, dispatching the callback of every completed operation,
        /// and then waits (for a bounded time) for outstanding callbacks to finish.
        /// </summary>
        /// <param name="cq">Completion queue polled by this thread.</param>
        /// <param name="optionalProfiler">Profiler to install for the current thread, or <c>null</c>.</param>
        private void RunHandlerLoop(CompletionQueueSafeHandle cq, IProfiler optionalProfiler)
        {
            if (optionalProfiler != null)
            {
                Profilers.SetForCurrentThread(optionalProfiler);
            }

            CompletionQueueEvent ev;
            do
            {
                ev = cq.Next();
                if (ev.type == CompletionQueueEvent.CompletionType.OpComplete)
                {
                    bool success = (ev.success != 0);
                    IntPtr tag = ev.tag;
                    try
                    {
                        var callback = cq.CompletionRegistry.Extract(tag);
                        queuedContinuationCounter.Increment();
                        if (!inlineHandlers)
                        {
                            bool queued = false;
                            try
                            {
                                // Use cached delegates to avoid unnecessary allocations
                                queued = ThreadPool.QueueUserWorkItem(success ? runCompletionQueueEventCallbackSuccess : runCompletionQueueEventCallbackFailure, callback);
                            }
                            finally
                            {
                                if (!queued)
                                {
                                    // The work item never reached the thread pool, so nothing will run
                                    // RunCompletionQueueEventCallback to balance the increment above.
                                    // Without this, shutdown would wait for the full timeout.
                                    queuedContinuationCounter.Decrement();
                                }
                            }
                        }
                        else
                        {
                            // Decrements the counter itself once the callback has run.
                            RunCompletionQueueEventCallback(callback, success);
                        }
                    }
                    catch (Exception e)
                    {
                        Logger.Error(e, "Exception occurred while extracting event from completion registry.");
                    }
                }
            }
            while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);

            // Continuations may still be running on the default threadpool. This polling thread
            // (created with IsBackground = true) does not exit until all queued work has finished
            // or the timeout below elapses, so that StopAsync, which joins this thread, does not
            // complete while continuations are still in flight.
            int sleepIterations = 0;
            while (queuedContinuationCounter.Count != 0)
            {
                // Only happens on shutdown and having pending continuations shouldn't be very common,
                // so sleeping here for a little bit is fine.
                if (sleepIterations >= MaxFinishContinuationsSleepTotalMillis / FinishContinuationsSleepMillis)
                {
                    Logger.Warning("Shutting down gRPC thread [{0}] with unfinished callbacks (Timed out waiting for callbacks to finish).",
                        Thread.CurrentThread.Name);
                    break;
                }
                Thread.Sleep(FinishContinuationsSleepMillis);
                sleepIterations++;
            }
        }

        /// <summary>
        /// Creates <paramref name="completionQueueCount"/> asynchronous completion queues,
        /// each with its own completion registry backed by the environment's context pools.
        /// </summary>
        /// <param name="environment">Environment providing the batch and request call context pools.</param>
        /// <param name="completionQueueCount">Number of completion queues to create.</param>
        /// <returns>Read-only list of the created completion queues.</returns>
        private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
        {
            var list = new List<CompletionQueueSafeHandle>();
            for (int i = 0; i < completionQueueCount; i++)
            {
                var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => environment.RequestCallContextPool.Lease());
                list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry));
            }
            return list.AsReadOnly();
        }

        /// <summary>
        /// Invokes a completion callback, logging (and not propagating) any exception it
        /// throws, and always decrements the pending-continuation counter afterwards.
        /// </summary>
        /// <param name="callback">Callback to invoke.</param>
        /// <param name="success">Whether the completed operation succeeded.</param>
        private void RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success)
        {
            try
            {
                callback.OnComplete(success);
            }
            catch (Exception e)
            {
                Logger.Error(e, "Exception occurred while invoking completion delegate");
            }
            finally
            {
                queuedContinuationCounter.Decrement();
            }
        }
    }
}