1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Generic; 21 using System.Linq; 22 using System.Runtime.InteropServices; 23 using System.Threading.Tasks; 24 using Grpc.Core.Internal; 25 using Grpc.Core.Logging; 26 using Grpc.Core.Utils; 27 28 namespace Grpc.Core 29 { 30 /// <summary> 31 /// Encapsulates initialization and shutdown of gRPC library. 32 /// </summary> 33 public class GrpcEnvironment 34 { 35 const int MinDefaultThreadPoolSize = 4; 36 const int DefaultBatchContextPoolSharedCapacity = 10000; 37 const int DefaultBatchContextPoolThreadLocalCapacity = 64; 38 const int DefaultRequestCallContextPoolSharedCapacity = 10000; 39 const int DefaultRequestCallContextPoolThreadLocalCapacity = 64; 40 41 static object staticLock = new object(); 42 static GrpcEnvironment instance; 43 static int refCount; 44 static int? customThreadPoolSize; 45 static int? customCompletionQueueCount; 46 static bool inlineHandlers; 47 static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity; 48 static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity; 49 static int requestCallContextPoolSharedCapacity = DefaultRequestCallContextPoolSharedCapacity; 50 static int requestCallContextPoolThreadLocalCapacity = DefaultRequestCallContextPoolThreadLocalCapacity; 51 static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); 52 static readonly HashSet<Server> registeredServers = new HashSet<Server>(); 53 static readonly AtomicCounter nativeInitCounter = new AtomicCounter(); 54 55 static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true); 56 57 readonly IObjectPool<BatchContextSafeHandle> batchContextPool; 58 readonly IObjectPool<RequestCallContextSafeHandle> requestCallContextPool; 59 readonly GrpcThreadPool threadPool; 60 readonly DebugStats debugStats = new DebugStats(); 61 readonly AtomicCounter cqPickerCounter = new AtomicCounter(); 62 63 bool isShutdown; 64 65 /// <summary> 66 /// Returns a reference-counted instance of initialized gRPC environment. 67 /// Subsequent invocations return the same instance unless reference count has dropped to zero previously. 68 /// </summary> AddRef()69 internal static GrpcEnvironment AddRef() 70 { 71 ShutdownHooks.Register(); 72 73 lock (staticLock) 74 { 75 refCount++; 76 if (instance == null) 77 { 78 instance = new GrpcEnvironment(); 79 } 80 return instance; 81 } 82 } 83 84 /// <summary> 85 /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero. 86 /// </summary> ReleaseAsync()87 internal static async Task ReleaseAsync() 88 { 89 GrpcEnvironment instanceToShutdown = null; 90 lock (staticLock) 91 { 92 GrpcPreconditions.CheckState(refCount > 0); 93 refCount--; 94 if (refCount == 0) 95 { 96 instanceToShutdown = instance; 97 instance = null; 98 } 99 } 100 101 if (instanceToShutdown != null) 102 { 103 await instanceToShutdown.ShutdownAsync().ConfigureAwait(false); 104 } 105 } 106 GetRefCount()107 internal static int GetRefCount() 108 { 109 lock (staticLock) 110 { 111 return refCount; 112 } 113 } 114 RegisterChannel(Channel channel)115 internal static void RegisterChannel(Channel channel) 116 { 117 lock (staticLock) 118 { 119 GrpcPreconditions.CheckNotNull(channel); 120 registeredChannels.Add(channel); 121 } 122 } 123 UnregisterChannel(Channel channel)124 internal static void UnregisterChannel(Channel channel) 125 { 126 lock (staticLock) 127 { 128 GrpcPreconditions.CheckNotNull(channel); 129 GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set."); 130 } 131 } 132 RegisterServer(Server server)133 internal static void RegisterServer(Server server) 134 { 135 lock (staticLock) 136 { 137 GrpcPreconditions.CheckNotNull(server); 138 registeredServers.Add(server); 139 } 140 } 141 UnregisterServer(Server server)142 internal static void UnregisterServer(Server server) 143 { 144 lock (staticLock) 145 { 146 GrpcPreconditions.CheckNotNull(server); 147 GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set."); 148 } 149 } 150 151 /// <summary> 152 /// Requests shutdown of all channels created by the current process. 153 /// </summary> ShutdownChannelsAsync()154 public static Task ShutdownChannelsAsync() 155 { 156 HashSet<Channel> snapshot = null; 157 lock (staticLock) 158 { 159 snapshot = new HashSet<Channel>(registeredChannels); 160 } 161 return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync())); 162 } 163 164 /// <summary> 165 /// Requests immediate shutdown of all servers created by the current process. 166 /// </summary> KillServersAsync()167 public static Task KillServersAsync() 168 { 169 HashSet<Server> snapshot = null; 170 lock (staticLock) 171 { 172 snapshot = new HashSet<Server>(registeredServers); 173 } 174 return Task.WhenAll(snapshot.Select((server) => server.KillAsync())); 175 } 176 177 /// <summary> 178 /// Gets application-wide logger used by gRPC. 179 /// </summary> 180 /// <value>The logger.</value> 181 public static ILogger Logger 182 { 183 get 184 { 185 return logger; 186 } 187 } 188 189 /// <summary> 190 /// Sets the application-wide logger that should be used by gRPC. 191 /// </summary> SetLogger(ILogger customLogger)192 public static void SetLogger(ILogger customLogger) 193 { 194 GrpcPreconditions.CheckNotNull(customLogger, "customLogger"); 195 logger = customLogger; 196 } 197 198 /// <summary> 199 /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. 200 /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. 201 /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. 202 /// Most users should rely on the default value provided by gRPC library. 203 /// Note: this method is part of an experimental API that can change or be removed without any prior notice. 204 /// </summary> SetThreadPoolSize(int threadCount)205 public static void SetThreadPoolSize(int threadCount) 206 { 207 lock (staticLock) 208 { 209 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); 210 GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number"); 211 customThreadPoolSize = threadCount; 212 } 213 } 214 215 /// <summary> 216 /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. 217 /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. 218 /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. 219 /// Most users should rely on the default value provided by gRPC library. 220 /// Note: this method is part of an experimental API that can change or be removed without any prior notice. 221 /// </summary> SetCompletionQueueCount(int completionQueueCount)222 public static void SetCompletionQueueCount(int completionQueueCount) 223 { 224 lock (staticLock) 225 { 226 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); 227 GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number"); 228 customCompletionQueueCount = completionQueueCount; 229 } 230 } 231 232 /// <summary> 233 /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>). 234 /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to 235 /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations, 236 /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks). 237 /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing. 238 /// Most users should rely on the default value provided by gRPC library. 239 /// Note: this method is part of an experimental API that can change or be removed without any prior notice. 240 /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier. 241 /// </summary> SetHandlerInlining(bool inlineHandlers)242 public static void SetHandlerInlining(bool inlineHandlers) 243 { 244 lock (staticLock) 245 { 246 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); 247 GrpcEnvironment.inlineHandlers = inlineHandlers; 248 } 249 } 250 251 /// <summary> 252 /// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances 253 /// instead of creating a new one for every C core operation helps reducing the GC pressure. 254 /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. 255 /// This is an advanced setting and you should only use it if you know what you are doing. 256 /// Most users should rely on the default value provided by gRPC library. 257 /// Note: this method is part of an experimental API that can change or be removed without any prior notice. 258 /// </summary> SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity)259 public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity) 260 { 261 lock (staticLock) 262 { 263 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); 264 GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number"); 265 GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number"); 266 batchContextPoolSharedCapacity = sharedCapacity; 267 batchContextPoolThreadLocalCapacity = threadLocalCapacity; 268 } 269 } 270 271 /// <summary> 272 /// Sets the parameters for a pool that caches request call context instances. Reusing request call context instances 273 /// instead of creating a new one for every requested call in C core helps reducing the GC pressure. 274 /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. 275 /// This is an advanced setting and you should only use it if you know what you are doing. 276 /// Most users should rely on the default value provided by gRPC library. 277 /// Note: this method is part of an experimental API that can change or be removed without any prior notice. 278 /// </summary> SetRequestCallContextPoolParams(int sharedCapacity, int threadLocalCapacity)279 public static void SetRequestCallContextPoolParams(int sharedCapacity, int threadLocalCapacity) 280 { 281 lock (staticLock) 282 { 283 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); 284 GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number"); 285 GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number"); 286 requestCallContextPoolSharedCapacity = sharedCapacity; 287 requestCallContextPoolThreadLocalCapacity = threadLocalCapacity; 288 } 289 } 290 291 /// <summary> 292 /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic. 293 /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first). 294 /// </summary> 295 public static event EventHandler ShuttingDown; 296 297 /// <summary> 298 /// Creates gRPC environment. 299 /// </summary> GrpcEnvironment()300 private GrpcEnvironment() 301 { 302 GrpcNativeInit(); 303 batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); 304 requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity); 305 threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); 306 threadPool.Start(); 307 } 308 309 /// <summary> 310 /// Gets the completion queues used by this gRPC environment. 311 /// </summary> 312 internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues 313 { 314 get 315 { 316 return this.threadPool.CompletionQueues; 317 } 318 } 319 320 internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool; 321 322 internal IObjectPool<RequestCallContextSafeHandle> RequestCallContextPool => requestCallContextPool; 323 324 internal bool IsAlive 325 { 326 get 327 { 328 return this.threadPool.IsAlive; 329 } 330 } 331 332 /// <summary> 333 /// Picks a completion queue in a round-robin fashion. 334 /// Shouldn't be invoked on a per-call basis (used at per-channel basis). 335 /// </summary> PickCompletionQueue()336 internal CompletionQueueSafeHandle PickCompletionQueue() 337 { 338 var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count); 339 return this.threadPool.CompletionQueues.ElementAt(cqIndex); 340 } 341 342 /// <summary> 343 /// Gets the completion queue used by this gRPC environment. 344 /// </summary> 345 internal DebugStats DebugStats 346 { 347 get 348 { 349 return this.debugStats; 350 } 351 } 352 353 /// <summary> 354 /// Gets version of gRPC C core. 355 /// </summary> GetCoreVersionString()356 internal static string GetCoreVersionString() 357 { 358 var ptr = NativeMethods.Get().grpcsharp_version_string(); // the pointer is not owned 359 return Marshal.PtrToStringAnsi(ptr); 360 } 361 GrpcNativeInit()362 internal static void GrpcNativeInit() 363 { 364 if (!IsNativeShutdownAllowed && nativeInitCounter.Count > 0) 365 { 366 // Normally grpc_init and grpc_shutdown calls should come in pairs (C core does reference counting), 367 // but in case we avoid grpc_shutdown calls altogether, calling grpc_init has no effect 368 // besides incrementing an internal C core counter that could theoretically overflow. 369 // To avoid this theoretical possibility we guard repeated calls to grpc_init() 370 // with a 64-bit atomic counter (that can't realistically overflow). 371 return; 372 } 373 NativeMethods.Get().grpcsharp_init(); 374 nativeInitCounter.Increment(); 375 } 376 GrpcNativeShutdown()377 internal static void GrpcNativeShutdown() 378 { 379 if (IsNativeShutdownAllowed) 380 { 381 NativeMethods.Get().grpcsharp_shutdown(); 382 } 383 } 384 385 /// <summary> 386 /// Shuts down this environment. 387 /// </summary> ShutdownAsync()388 private async Task ShutdownAsync() 389 { 390 if (isShutdown) 391 { 392 throw new InvalidOperationException("ShutdownAsync has already been called"); 393 } 394 395 await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false); 396 397 await threadPool.StopAsync().ConfigureAwait(false); 398 requestCallContextPool.Dispose(); 399 batchContextPool.Dispose(); 400 GrpcNativeShutdown(); 401 isShutdown = true; 402 403 debugStats.CheckOK(); 404 } 405 GetThreadPoolSizeOrDefault()406 private int GetThreadPoolSizeOrDefault() 407 { 408 if (customThreadPoolSize.HasValue) 409 { 410 return customThreadPoolSize.Value; 411 } 412 // In systems with many cores, use half of the cores for GrpcThreadPool 413 // and the other half for .NET thread pool. This heuristic definitely needs 414 // more work, but seems to work reasonably well for a start. 415 return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); 416 } 417 GetCompletionQueueCountOrDefault()418 private int GetCompletionQueueCountOrDefault() 419 { 420 if (customCompletionQueueCount.HasValue) 421 { 422 return customCompletionQueueCount.Value; 423 } 424 // by default, create a completion queue for each thread 425 return GetThreadPoolSizeOrDefault(); 426 } 427 428 // On some platforms (specifically iOS), thread local variables in native code 429 // require initialization/destruction. By skipping the grpc_shutdown() call, 430 // we avoid a potential crash where grpc_shutdown() has already destroyed 431 // the thread local variables, but some C core's *_destroy() methods still 432 // need to run (e.g. they may be run by finalizer thread which is out of our control) 433 // For more context, see https://github.com/grpc/grpc/issues/16294 434 private static bool IsNativeShutdownAllowed => !PlatformApis.IsXamarinIOS && !PlatformApis.IsUnityIOS; 435 436 private static class ShutdownHooks 437 { 438 static object staticLock = new object(); 439 static bool hooksRegistered; 440 Register()441 public static void Register() 442 { 443 lock (staticLock) 444 { 445 if (!hooksRegistered) 446 { 447 // Under normal circumstances, the user is expected to shutdown all 448 // the gRPC channels and servers before the application exits. The following 449 // hooks provide some extra handling for cases when this is not the case, 450 // in the effort to achieve a reasonable behavior on shutdown. 451 #if NETSTANDARD1_5 452 // No action required at shutdown on .NET Core 453 // - In-progress P/Invoke calls (such as grpc_completion_queue_next) don't seem 454 // to prevent a .NET core application from terminating, so no special handling 455 // is needed. 456 // - .NET core doesn't run finalizers on shutdown, so there's no risk of getting 457 // a crash because grpc_*_destroy methods for native objects being invoked 458 // in wrong order. 459 // TODO(jtattermusch): Verify that the shutdown hooks are still not needed 460 // once we add support for new platforms using netstandard (e.g. Xamarin). 461 #else 462 // On desktop .NET framework and Mono, we need to register for a shutdown 463 // event to explicitly shutdown the GrpcEnvironment. 464 // - On Desktop .NET framework, we need to do a proper shutdown to prevent a crash 465 // when the framework attempts to run the finalizers for SafeHandle object representing the native 466 // grpc objects. The finalizers calls the native grpc_*_destroy methods (e.g. grpc_server_destroy) 467 // in a random order, which is not supported by gRPC. 468 // - On Mono, the process would hang as the GrpcThreadPool threads are sleeping 469 // in grpc_completion_queue_next P/Invoke invocation and mono won't let the 470 // process shutdown until the P/Invoke calls return. We achieve that by shutting down 471 // the completion queue(s) which associated with the GrpcThreadPool, which will 472 // cause the grpc_completion_queue_next calls to return immediately. 473 AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { HandleShutdown(); }; 474 AppDomain.CurrentDomain.DomainUnload += (sender, eventArgs) => { HandleShutdown(); }; 475 #endif 476 } 477 hooksRegistered = true; 478 } 479 } 480 481 /// <summary> 482 /// Handler for AppDomain.DomainUnload, AppDomain.ProcessExit and AssemblyLoadContext.Unloading hooks. 483 /// </summary> HandleShutdown()484 private static void HandleShutdown() 485 { 486 Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync()); 487 } 488 } 489 } 490 } 491