• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Linq;
22 using System.Runtime.InteropServices;
23 using System.Threading.Tasks;
24 using Grpc.Core.Internal;
25 using Grpc.Core.Logging;
26 using Grpc.Core.Utils;
27 
28 namespace Grpc.Core
29 {
30     /// <summary>
31     /// Encapsulates initialization and shutdown of gRPC library.
32     /// </summary>
33     public class GrpcEnvironment
34     {
// Lower bound applied to the default thread pool size (see GetThreadPoolSizeOrDefault).
const int MinDefaultThreadPoolSize = 4;

// Default capacities used for the two object pools created in the constructor.
const int DefaultBatchContextPoolSharedCapacity = 10000;
const int DefaultBatchContextPoolThreadLocalCapacity = 64;
const int DefaultRequestCallContextPoolSharedCapacity = 10000;
const int DefaultRequestCallContextPoolThreadLocalCapacity = 64;

// Monitor protecting the shared instance, its reference count, the custom settings
// and the registered channel/server sets. Declared readonly so the object being
// locked on can never be replaced while another thread holds it.
static readonly object staticLock = new object();

// The current shared environment (null while no reference is held) and the number of outstanding references.
static GrpcEnvironment instance;
static int refCount;

// Optional overrides; the setters only accept them while no environment instance exists.
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static bool inlineHandlers;
static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity;
static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity;
static int requestCallContextPoolSharedCapacity = DefaultRequestCallContextPoolSharedCapacity;
static int requestCallContextPoolThreadLocalCapacity = DefaultRequestCallContextPoolThreadLocalCapacity;

// Channels and servers that registered themselves; used by ShutdownChannelsAsync and KillServersAsync.
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();

// Counts calls to the native init function made through GrpcNativeInit.
static readonly AtomicCounter nativeInitCounter = new AtomicCounter();

// Application-wide logger; replaceable through SetLogger.
static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true);

// Per-instance resources created by the constructor and released by ShutdownAsync.
readonly IObjectPool<BatchContextSafeHandle> batchContextPool;
readonly IObjectPool<RequestCallContextSafeHandle> requestCallContextPool;
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();

// Source of the round-robin index used by PickCompletionQueue.
readonly AtomicCounter cqPickerCounter = new AtomicCounter();

// Set once ShutdownAsync has finished for this instance.
bool isShutdown;
64 
/// <summary>
/// Returns a reference-counted instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary>
/// <returns>The shared <c>GrpcEnvironment</c> instance, created on demand.</returns>
internal static GrpcEnvironment AddRef()
{
    ShutdownHooks.Register();

    lock (staticLock)
    {
        if (instance == null)
        {
            instance = new GrpcEnvironment();
        }

        // Count the reference only once the environment is known to exist. If the
        // constructor throws, the caller never obtains a reference to release, so
        // incrementing beforehand would leave refCount permanently off by one and
        // the environment would never be shut down by ReleaseAsync.
        refCount++;
        return instance;
    }
}
83 
/// <summary>
/// Drops one reference to the active environment. When the last reference is released,
/// the shared instance is detached and its shutdown is awaited.
/// </summary>
/// <returns>A task that completes after the reference is released and any resulting shutdown has finished.</returns>
internal static async Task ReleaseAsync()
{
    GrpcEnvironment detached;
    lock (staticLock)
    {
        GrpcPreconditions.CheckState(refCount > 0);
        refCount--;
        if (refCount == 0)
        {
            // Last reference gone: take ownership of the instance so a later AddRef creates a fresh one.
            detached = instance;
            instance = null;
        }
        else
        {
            detached = null;
        }
    }

    if (detached == null)
    {
        return;
    }

    // Shutdown runs outside the lock so other callers are not blocked while it completes.
    await detached.ShutdownAsync().ConfigureAwait(false);
}
106 
/// <summary>
/// Reads the current reference count under the static lock.
/// </summary>
/// <returns>The number of outstanding references at the time of the call.</returns>
internal static int GetRefCount()
{
    int current;
    lock (staticLock)
    {
        current = refCount;
    }
    return current;
}
114 
/// <summary>
/// Adds a channel to the set of registered channels.
/// </summary>
/// <param name="channel">The channel to register; must not be null.</param>
internal static void RegisterChannel(Channel channel)
{
    GrpcPreconditions.CheckNotNull(channel);

    lock (staticLock)
    {
        registeredChannels.Add(channel);
    }
}
123 
/// <summary>
/// Removes a channel from the set of registered channels.
/// The argument check fails if the channel was not registered.
/// </summary>
/// <param name="channel">The channel to unregister; must not be null.</param>
internal static void UnregisterChannel(Channel channel)
{
    GrpcPreconditions.CheckNotNull(channel);

    lock (staticLock)
    {
        bool wasRegistered = registeredChannels.Remove(channel);
        GrpcPreconditions.CheckArgument(wasRegistered, "Channel not found in the registered channels set.");
    }
}
132 
/// <summary>
/// Adds a server to the set of registered servers.
/// </summary>
/// <param name="server">The server to register; must not be null.</param>
internal static void RegisterServer(Server server)
{
    GrpcPreconditions.CheckNotNull(server);

    lock (staticLock)
    {
        registeredServers.Add(server);
    }
}
141 
/// <summary>
/// Removes a server from the set of registered servers.
/// The argument check fails if the server was not registered.
/// </summary>
/// <param name="server">The server to unregister; must not be null.</param>
internal static void UnregisterServer(Server server)
{
    GrpcPreconditions.CheckNotNull(server);

    lock (staticLock)
    {
        bool wasRegistered = registeredServers.Remove(server);
        GrpcPreconditions.CheckArgument(wasRegistered, "Server not found in the registered servers set.");
    }
}
150 
/// <summary>
/// Requests shutdown of all channels created by the current process.
/// </summary>
/// <returns>A task that completes when the shutdown of every channel registered at the time of the call has completed.</returns>
public static Task ShutdownChannelsAsync()
{
    HashSet<Channel> channelsToStop;
    lock (staticLock)
    {
        // Copy under the lock so the set can be walked without holding it.
        channelsToStop = new HashSet<Channel>(registeredChannels);
    }

    var pending = new List<Task>();
    foreach (var channel in channelsToStop)
    {
        pending.Add(channel.ShutdownAsync());
    }
    return Task.WhenAll(pending);
}
163 
/// <summary>
/// Requests immediate shutdown of all servers created by the current process.
/// </summary>
/// <returns>A task that completes when the kill request of every server registered at the time of the call has completed.</returns>
public static Task KillServersAsync()
{
    HashSet<Server> serversToKill;
    lock (staticLock)
    {
        // Copy under the lock so the set can be walked without holding it.
        serversToKill = new HashSet<Server>(registeredServers);
    }

    var pending = new List<Task>();
    foreach (var server in serversToKill)
    {
        pending.Add(server.KillAsync());
    }
    return Task.WhenAll(pending);
}
176 
/// <summary>
/// Gets the application-wide logger used by gRPC.
/// </summary>
/// <value>The logger currently in effect (see <see cref="SetLogger"/>).</value>
public static ILogger Logger => logger;
188 
/// <summary>
/// Replaces the application-wide logger that gRPC should use.
/// </summary>
/// <param name="customLogger">The logger to install; must not be null.</param>
public static void SetLogger(ILogger customLogger)
{
    GrpcPreconditions.CheckNotNull(customLogger, nameof(customLogger));
    logger = customLogger;
}
197 
/// <summary>
/// Sets the number of threads in the gRPC thread pool that polls for internal RPC events.
/// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
/// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
/// <param name="threadCount">Desired number of threads; must be positive.</param>
public static void SetThreadPoolSize(int threadCount)
{
    lock (staticLock)
    {
        // The state check comes first so that a late call is reported as such regardless of the argument.
        bool notYetInitialized = instance == null;
        GrpcPreconditions.CheckState(notYetInitialized, "Can only be set before GrpcEnvironment is initialized");

        bool isPositive = threadCount > 0;
        GrpcPreconditions.CheckArgument(isPositive, "threadCount needs to be a positive number");

        customThreadPoolSize = threadCount;
    }
}
214 
/// <summary>
/// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
/// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
/// Setting the number of completion queues is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
/// <param name="completionQueueCount">Desired number of completion queues; must be positive.</param>
public static void SetCompletionQueueCount(int completionQueueCount)
{
    lock (staticLock)
    {
        GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
        // The message names the parameter that is actually being validated
        // (it previously referred to "threadCount", copied from SetThreadPoolSize).
        GrpcPreconditions.CheckArgument(completionQueueCount > 0, "completionQueueCount needs to be a positive number");
        customCompletionQueueCount = completionQueueCount;
    }
}
231 
/// <summary>
/// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>).
/// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to
/// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations,
/// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks).
/// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
/// </summary>
/// <param name="inlineHandlers">Whether event handlers should be inlined.</param>
public static void SetHandlerInlining(bool inlineHandlers)
{
    lock (staticLock)
    {
        bool notYetInitialized = instance == null;
        GrpcPreconditions.CheckState(notYetInitialized, "Can only be set before GrpcEnvironment is initialized");

        // Qualified with the type name because the parameter hides the static field.
        GrpcEnvironment.inlineHandlers = inlineHandlers;
    }
}
250 
/// <summary>
/// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances
/// instead of creating a new one for every C core operation helps reducing the GC pressure.
/// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
/// This is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
/// <param name="sharedCapacity">Capacity of the shared part of the pool; must be non-negative.</param>
/// <param name="threadLocalCapacity">Capacity of the thread-local part of the pool; must be non-negative.</param>
public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity)
{
    lock (staticLock)
    {
        bool notYetInitialized = instance == null;
        GrpcPreconditions.CheckState(notYetInitialized, "Can only be set before GrpcEnvironment is initialized");

        // Both values are validated before either one is stored.
        GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
        GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");

        batchContextPoolSharedCapacity = sharedCapacity;
        batchContextPoolThreadLocalCapacity = threadLocalCapacity;
    }
}
270 
/// <summary>
/// Sets the parameters for a pool that caches request call context instances. Reusing request call context instances
/// instead of creating a new one for every requested call in C core helps reducing the GC pressure.
/// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
/// This is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
/// <param name="sharedCapacity">Capacity of the shared part of the pool; must be non-negative.</param>
/// <param name="threadLocalCapacity">Capacity of the thread-local part of the pool; must be non-negative.</param>
public static void SetRequestCallContextPoolParams(int sharedCapacity, int threadLocalCapacity)
{
    lock (staticLock)
    {
        bool notYetInitialized = instance == null;
        GrpcPreconditions.CheckState(notYetInitialized, "Can only be set before GrpcEnvironment is initialized");

        // Both values are validated before either one is stored.
        GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
        GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");

        requestCallContextPoolSharedCapacity = sharedCapacity;
        requestCallContextPoolThreadLocalCapacity = threadLocalCapacity;
    }
}
290 
291         /// <summary>
292         /// Occurs when <c>GrpcEnvironment</c> is about to start the shutdown logic.
293         /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
294         /// </summary>
295         public static event EventHandler ShuttingDown;
296 
/// <summary>
/// Creates the gRPC environment: initializes the native library, builds the two
/// object pools from the configured capacities, and creates and starts the thread pool.
/// </summary>
private GrpcEnvironment()
{
    // Native initialization happens before any native handle is created.
    GrpcNativeInit();

    batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(
        () => BatchContextSafeHandle.Create(),
        batchContextPoolSharedCapacity,
        batchContextPoolThreadLocalCapacity);

    requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(
        () => RequestCallContextSafeHandle.Create(),
        requestCallContextPoolSharedCapacity,
        requestCallContextPoolThreadLocalCapacity);

    int workerCount = GetThreadPoolSizeOrDefault();
    int queueCount = GetCompletionQueueCountOrDefault();
    threadPool = new GrpcThreadPool(this, workerCount, queueCount, inlineHandlers);
    threadPool.Start();
}
308 
/// <summary>
/// Gets the completion queues used by this gRPC environment, as exposed by its thread pool.
/// </summary>
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues => this.threadPool.CompletionQueues;
319 
/// <summary>
/// Gets the pool of batch context handles owned by this environment.
/// </summary>
internal IObjectPool<BatchContextSafeHandle> BatchContextPool
{
    get
    {
        return this.batchContextPool;
    }
}
321 
/// <summary>
/// Gets the pool of request call context handles owned by this environment.
/// </summary>
internal IObjectPool<RequestCallContextSafeHandle> RequestCallContextPool
{
    get
    {
        return this.requestCallContextPool;
    }
}
323 
/// <summary>
/// Gets the value of the thread pool's <c>IsAlive</c> property.
/// </summary>
internal bool IsAlive => this.threadPool.IsAlive;
331 
/// <summary>
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
/// </summary>
/// <returns>The completion queue at the next round-robin position.</returns>
internal CompletionQueueSafeHandle PickCompletionQueue()
{
    // Increment() yields the post-increment value, so subtract one to start at index zero.
    var ticket = cqPickerCounter.Increment() - 1;
    var slot = (int) (ticket % this.threadPool.CompletionQueues.Count);
    return this.threadPool.CompletionQueues.ElementAt(slot);
}
341 
/// <summary>
/// Gets the debug statistics object associated with this gRPC environment.
/// </summary>
internal DebugStats DebugStats => this.debugStats;
352 
/// <summary>
/// Gets version of gRPC C core.
/// </summary>
/// <returns>The version string obtained from the native library, marshalled as an ANSI string.</returns>
internal static string GetCoreVersionString()
{
    // The native pointer is not owned by the caller, so it is only read and never freed.
    return Marshal.PtrToStringAnsi(NativeMethods.Get().grpcsharp_version_string());
}
361 
/// <summary>
/// Calls the native init function and records the call in <c>nativeInitCounter</c>.
/// When native shutdown is not allowed, only the first call reaches the native library.
/// </summary>
internal static void GrpcNativeInit()
{
    // Normally grpc_init and grpc_shutdown calls should come in pairs (C core does reference counting),
    // but in case we avoid grpc_shutdown calls altogether, calling grpc_init has no effect
    // besides incrementing an internal C core counter that could theoretically overflow.
    // To avoid this theoretical possibility we guard repeated calls to grpc_init()
    // with a 64-bit atomic counter (that can't realistically overflow).
    bool initRequired = IsNativeShutdownAllowed || !(nativeInitCounter.Count > 0);
    if (initRequired)
    {
        NativeMethods.Get().grpcsharp_init();
        nativeInitCounter.Increment();
    }
}
376 
/// <summary>
/// Calls the native shutdown function, unless native shutdown is not allowed
/// on the current platform (see <c>IsNativeShutdownAllowed</c>).
/// </summary>
internal static void GrpcNativeShutdown()
{
    if (!IsNativeShutdownAllowed)
    {
        return;
    }

    NativeMethods.Get().grpcsharp_shutdown();
}
384 
/// <summary>
/// Shuts down this environment: raises <c>ShuttingDown</c>, stops the thread pool,
/// disposes both object pools and performs the native shutdown.
/// </summary>
/// <returns>A task that completes once the shutdown sequence has finished.</returns>
/// <exception cref="InvalidOperationException">The environment has already been shut down.</exception>
private async Task ShutdownAsync()
{
    if (isShutdown)
    {
        throw new InvalidOperationException("ShutdownAsync has already been called");
    }

    // Subscribers are invoked via Task.Run before any resource is torn down.
    // EventArgs.Empty is passed instead of null so that handlers following the
    // standard event pattern can safely dereference their event-args parameter.
    await Task.Run(() => ShuttingDown?.Invoke(this, EventArgs.Empty)).ConfigureAwait(false);

    // Teardown order: thread pool first, then the handle pools, then the native library.
    await threadPool.StopAsync().ConfigureAwait(false);
    requestCallContextPool.Dispose();
    batchContextPool.Dispose();
    GrpcNativeShutdown();
    isShutdown = true;

    debugStats.CheckOK();
}
405 
/// <summary>
/// Returns the custom thread pool size if one was set, otherwise a default derived from the processor count.
/// </summary>
private int GetThreadPoolSizeOrDefault()
{
    // In systems with many cores, use half of the cores for GrpcThreadPool
    // and the other half for .NET thread pool. This heuristic definitely needs
    // more work, but seems to work reasonably well for a start.
    return customThreadPoolSize ?? Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
417 
/// <summary>
/// Returns the custom completion queue count if one was set, otherwise the thread pool size.
/// </summary>
private int GetCompletionQueueCountOrDefault()
{
    // by default, create a completion queue for each thread
    return customCompletionQueueCount ?? GetThreadPoolSizeOrDefault();
}
427 
// On some platforms (specifically iOS), thread local variables in native code
// require initialization/destruction. By skipping the grpc_shutdown() call,
// we avoid a potential crash where grpc_shutdown() has already destroyed
// the thread local variables, but some C core's *_destroy() methods still
// need to run (e.g. they may be run by finalizer thread which is out of our control)
// For more context, see https://github.com/grpc/grpc/issues/16294
private static bool IsNativeShutdownAllowed
{
    get
    {
        // Allowed everywhere except on the two iOS flavours.
        return !(PlatformApis.IsXamarinIOS || PlatformApis.IsUnityIOS);
    }
}
435 
/// <summary>
/// Registers, at most once, the process/domain shutdown hooks that request shutdown of
/// channels and servers the application left running.
/// </summary>
private static class ShutdownHooks
{
    // Dedicated monitor for hook registration; readonly so the locked object can never be replaced.
    static readonly object staticLock = new object();
    static bool hooksRegistered;

    /// <summary>
    /// Registers the shutdown hooks on first invocation; later invocations do nothing.
    /// </summary>
    public static void Register()
    {
        lock (staticLock)
        {
            if (hooksRegistered)
            {
                return;
            }

            // Under normal circumstances, the user is expected to shutdown all
            // the gRPC channels and servers before the application exits. The following
            // hooks provide some extra handling for cases when this is not the case,
            // in the effort to achieve a reasonable behavior on shutdown.
#if NETSTANDARD
            // No action required at shutdown on .NET Core
            // - In-progress P/Invoke calls (such as grpc_completion_queue_next) don't seem
            //   to prevent a .NET core application from terminating, so no special handling
            //   is needed.
            // - .NET core doesn't run finalizers on shutdown, so there's no risk of getting
            //   a crash because grpc_*_destroy methods for native objects being invoked
            //   in wrong order.
            // TODO(jtattermusch): Verify that the shutdown hooks are still not needed
            // once we add support for new platforms using netstandard (e.g. Xamarin).
#else
            // On desktop .NET framework and Mono, we need to register for a shutdown
            // event to explicitly shutdown the GrpcEnvironment.
            // - On Desktop .NET framework, we need to do a proper shutdown to prevent a crash
            //   when the framework attempts to run the finalizers for SafeHandle object representing the native
            //   grpc objects. The finalizers calls the native grpc_*_destroy methods (e.g. grpc_server_destroy)
            //   in a random order, which is not supported by gRPC.
            // - On Mono, the process would hang as the GrpcThreadPool threads are sleeping
            //   in grpc_completion_queue_next P/Invoke invocation and mono won't let the
            //   process shutdown until the P/Invoke calls return. We achieve that by shutting down
            //   the completion queue(s) which associated with the GrpcThreadPool, which will
            //   cause the grpc_completion_queue_next calls to return immediately.
            AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { HandleShutdown(); };
            AppDomain.CurrentDomain.DomainUnload += (sender, eventArgs) => { HandleShutdown(); };
#endif
            // Marked only after registration succeeded, so a failed attempt is retried on the next call.
            hooksRegistered = true;
        }
    }

    /// <summary>
    /// Handler for the AppDomain.DomainUnload and AppDomain.ProcessExit hooks.
    /// Blocks until shutdown of all registered channels and servers has completed.
    /// </summary>
    private static void HandleShutdown()
    {
        Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
    }
}
489     }
490 }
491