• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Linq;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using Grpc.Core.Logging;
25 using Grpc.Core.Profiling;
26 using Grpc.Core.Utils;
27 
28 namespace Grpc.Core.Internal
29 {
30     /// <summary>
31     /// Pool of threads polling on a set of completion queues.
32     /// </summary>
33     internal class GrpcThreadPool
34     {
35         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
36         const int FinishContinuationsSleepMillis = 10;
37         const int MaxFinishContinuationsSleepTotalMillis = 10000;
38 
39         readonly GrpcEnvironment environment;
40         readonly object myLock = new object();
41         readonly List<Thread> threads = new List<Thread>();
42         readonly int poolSize;
43         readonly int completionQueueCount;
44         readonly bool inlineHandlers;
45         readonly WaitCallback runCompletionQueueEventCallbackSuccess;
46         readonly WaitCallback runCompletionQueueEventCallbackFailure;
47         readonly AtomicCounter queuedContinuationCounter = new AtomicCounter();
48 
49         readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>();  // profilers assigned to threadpool threads
50 
51         bool stopRequested;
52 
53         IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
54 
55         /// <summary>
56         /// Creates a thread pool threads polling on a set of completions queues.
57         /// </summary>
58         /// <param name="environment">Environment.</param>
59         /// <param name="poolSize">Pool size.</param>
60         /// <param name="completionQueueCount">Completion queue count.</param>
61         /// <param name="inlineHandlers">Handler inlining.</param>
GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)62         public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)
63         {
64             this.environment = environment;
65             this.poolSize = poolSize;
66             this.completionQueueCount = completionQueueCount;
67             this.inlineHandlers = inlineHandlers;
68             GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
69                 "Thread pool size cannot be smaller than the number of completion queues used.");
70 
71             this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, true));
72             this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, false));
73         }
74 
Start()75         public void Start()
76         {
77             lock (myLock)
78             {
79                 GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
80                 completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
81 
82                 for (int i = 0; i < poolSize; i++)
83                 {
84                     var optionalProfiler = i < threadProfilers.Count ? threadProfilers[i] : null;
85                     threads.Add(CreateAndStartThread(i, optionalProfiler));
86                 }
87             }
88         }
89 
StopAsync()90         public Task StopAsync()
91         {
92             lock (myLock)
93             {
94                 GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
95                 stopRequested = true;
96 
97                 foreach (var cq in completionQueues)
98                 {
99                     cq.Shutdown();
100                 }
101             }
102 
103             return Task.Run(() =>
104             {
105                 foreach (var thread in threads)
106                 {
107                     thread.Join();
108                 }
109 
110                 foreach (var cq in completionQueues)
111                 {
112                     cq.Dispose();
113                 }
114 
115                 for (int i = 0; i < threadProfilers.Count; i++)
116                 {
117                     threadProfilers[i].Dump(string.Format("grpc_trace_thread_{0}.txt", i));
118                 }
119             });
120         }
121 
122         /// <summary>
123         /// Returns true if there is at least one thread pool thread that hasn't
124         /// already stopped.
125         /// Threads can either stop because all completion queues shut down or
126         /// because all foreground threads have already shutdown and process is
127         /// going to exit.
128         /// </summary>
129         internal bool IsAlive
130         {
131             get
132             {
133                 return threads.Any(t => t.ThreadState != ThreadState.Stopped);
134             }
135         }
136 
137         internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
138         {
139             get
140             {
141                 return completionQueues;
142             }
143         }
144 
CreateAndStartThread(int threadIndex, IProfiler optionalProfiler)145         private Thread CreateAndStartThread(int threadIndex, IProfiler optionalProfiler)
146         {
147             var cqIndex = threadIndex % completionQueues.Count;
148             var cq = completionQueues.ElementAt(cqIndex);
149 
150             var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq, optionalProfiler)));
151             thread.IsBackground = true;
152             thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
153             thread.Start();
154 
155             return thread;
156         }
157 
158         /// <summary>
159         /// Body of the polling thread.
160         /// </summary>
RunHandlerLoop(CompletionQueueSafeHandle cq, IProfiler optionalProfiler)161         private void RunHandlerLoop(CompletionQueueSafeHandle cq, IProfiler optionalProfiler)
162         {
163             if (optionalProfiler != null)
164             {
165                 Profilers.SetForCurrentThread(optionalProfiler);
166             }
167 
168             CompletionQueueEvent ev;
169             do
170             {
171                 ev = cq.Next();
172                 if (ev.type == CompletionQueueEvent.CompletionType.OpComplete)
173                 {
174                     bool success = (ev.success != 0);
175                     IntPtr tag = ev.tag;
176                     try
177                     {
178                         var callback = cq.CompletionRegistry.Extract(tag);
179                         queuedContinuationCounter.Increment();
180                         if (!inlineHandlers)
181                         {
182                             // Use cached delegates to avoid unnecessary allocations
183                             ThreadPool.QueueUserWorkItem(success ? runCompletionQueueEventCallbackSuccess : runCompletionQueueEventCallbackFailure, callback);
184                         }
185                         else
186                         {
187                             RunCompletionQueueEventCallback(callback, success);
188                         }
189                     }
190                     catch (Exception e)
191                     {
192                         Logger.Error(e, "Exception occurred while extracting event from completion registry.");
193                     }
194                 }
195             }
196             while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
197 
198             // Continuations are running on default threadpool that consists of background threads.
199             // GrpcThreadPool thread (a foreground thread) will not exit unless all queued work had
200             // been finished to prevent terminating the continuations queued prematurely.
201             int sleepIterations = 0;
202             while (queuedContinuationCounter.Count != 0)
203             {
204                 // Only happens on shutdown and having pending continuations shouldn't very common,
205                 // so sleeping here for a little bit is fine.
206                 if (sleepIterations >= MaxFinishContinuationsSleepTotalMillis / FinishContinuationsSleepMillis)
207                 {
208                     Logger.Warning("Shutting down gRPC thread [{0}] with unfinished callbacks (Timed out waiting for callbacks to finish).",
209                         Thread.CurrentThread.Name);
210                     break;
211                 }
212                 Thread.Sleep(FinishContinuationsSleepMillis);
213                 sleepIterations ++;
214             }
215         }
216 
CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)217         private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
218         {
219             var list = new List<CompletionQueueSafeHandle>();
220             for (int i = 0; i < completionQueueCount; i++)
221             {
222                 var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => environment.RequestCallContextPool.Lease());
223                 list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry));
224             }
225             return list.AsReadOnly();
226         }
227 
RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success)228         private void RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success)
229         {
230             try
231             {
232                 callback.OnComplete(success);
233             }
234             catch (Exception e)
235             {
236                 Logger.Error(e, "Exception occurred while invoking completion delegate");
237             }
238             finally
239             {
240                 queuedContinuationCounter.Decrement();
241             }
242         }
243     }
244 }
245