1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections; 21 using System.Collections.Generic; 22 using System.IO; 23 using System.Linq; 24 using System.Threading.Tasks; 25 using Grpc.Core.Internal; 26 using Grpc.Core.Logging; 27 using Grpc.Core.Utils; 28 29 namespace Grpc.Core 30 { 31 /// <summary> 32 /// gRPC server. A single server can serve an arbitrary number of services and can listen on more than one port. 33 /// </summary> 34 public class Server 35 { 36 const int DefaultRequestCallTokensPerCq = 2000; 37 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); 38 39 readonly AtomicCounter activeCallCounter = new AtomicCounter(); 40 41 readonly ServiceDefinitionCollection serviceDefinitions; 42 readonly ServerPortCollection ports; 43 readonly GrpcEnvironment environment; 44 readonly List<ChannelOption> options; 45 readonly ServerSafeHandle handle; 46 readonly object myLock = new object(); 47 48 readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>(); 49 readonly List<ServerPort> serverPortList = new List<ServerPort>(); 50 readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); 51 readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); 52 53 bool startRequested; 54 volatile bool shutdownRequested; 55 int requestCallTokensPerCq = DefaultRequestCallTokensPerCq; 56 57 /// <summary> 58 /// Creates a new server. 59 /// </summary> Server()60 public Server() : this(null) 61 { 62 } 63 64 /// <summary> 65 /// Creates a new server. 66 /// </summary> 67 /// <param name="options">Channel options.</param> Server(IEnumerable<ChannelOption> options)68 public Server(IEnumerable<ChannelOption> options) 69 { 70 this.serviceDefinitions = new ServiceDefinitionCollection(this); 71 this.ports = new ServerPortCollection(this); 72 this.environment = GrpcEnvironment.AddRef(); 73 this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); 74 using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) 75 { 76 this.handle = ServerSafeHandle.NewServer(channelArgs); 77 } 78 79 foreach (var cq in environment.CompletionQueues) 80 { 81 this.handle.RegisterCompletionQueue(cq); 82 } 83 GrpcEnvironment.RegisterServer(this); 84 } 85 86 /// <summary> 87 /// Services that will be exported by the server once started. Register a service with this 88 /// server by adding its definition to this collection. 89 /// </summary> 90 public ServiceDefinitionCollection Services 91 { 92 get 93 { 94 return serviceDefinitions; 95 } 96 } 97 98 /// <summary> 99 /// Ports on which the server will listen once started. Register a port with this 100 /// server by adding its definition to this collection. 101 /// </summary> 102 public ServerPortCollection Ports 103 { 104 get 105 { 106 return ports; 107 } 108 } 109 110 /// <summary> 111 /// To allow awaiting termination of the server. 112 /// </summary> 113 public Task ShutdownTask 114 { 115 get 116 { 117 return shutdownTcs.Task; 118 } 119 } 120 121 /// <summary> 122 /// Experimental API. Might anytime change without prior notice. 123 /// Number or calls requested via grpc_server_request_call at any given time for each completion queue. 124 /// </summary> 125 public int RequestCallTokensPerCompletionQueue 126 { 127 get 128 { 129 return requestCallTokensPerCq; 130 } 131 set 132 { 133 lock (myLock) 134 { 135 GrpcPreconditions.CheckState(!startRequested); 136 GrpcPreconditions.CheckArgument(value > 0); 137 requestCallTokensPerCq = value; 138 } 139 } 140 } 141 142 /// <summary> 143 /// Starts the server. 144 /// Throws <c>IOException</c> if not successful. 145 /// </summary> Start()146 public void Start() 147 { 148 lock (myLock) 149 { 150 GrpcPreconditions.CheckState(!startRequested); 151 GrpcPreconditions.CheckState(!shutdownRequested); 152 startRequested = true; 153 154 CheckPortsBoundSuccessfully(); 155 handle.Start(); 156 157 for (int i = 0; i < requestCallTokensPerCq; i++) 158 { 159 foreach (var cq in environment.CompletionQueues) 160 { 161 AllowOneRpc(cq); 162 } 163 } 164 } 165 } 166 167 /// <summary> 168 /// Requests server shutdown and when there are no more calls being serviced, 169 /// cleans up used resources. The returned task finishes when shutdown procedure 170 /// is complete. 171 /// </summary> 172 /// <remarks> 173 /// It is strongly recommended to shutdown all previously created servers before exiting from the process. 174 /// </remarks> ShutdownAsync()175 public Task ShutdownAsync() 176 { 177 return ShutdownInternalAsync(false); 178 } 179 180 /// <summary> 181 /// Requests server shutdown while cancelling all the in-progress calls. 182 /// The returned task finishes when shutdown procedure is complete. 183 /// </summary> 184 /// <remarks> 185 /// It is strongly recommended to shutdown all previously created servers before exiting from the process. 186 /// </remarks> KillAsync()187 public Task KillAsync() 188 { 189 return ShutdownInternalAsync(true); 190 } 191 AddCallReference(object call)192 internal void AddCallReference(object call) 193 { 194 activeCallCounter.Increment(); 195 196 bool success = false; 197 handle.DangerousAddRef(ref success); 198 GrpcPreconditions.CheckState(success); 199 } 200 RemoveCallReference(object call)201 internal void RemoveCallReference(object call) 202 { 203 handle.DangerousRelease(); 204 activeCallCounter.Decrement(); 205 } 206 207 /// <summary> 208 /// Shuts down the server. 209 /// </summary> ShutdownInternalAsync(bool kill)210 private async Task ShutdownInternalAsync(bool kill) 211 { 212 lock (myLock) 213 { 214 GrpcPreconditions.CheckState(!shutdownRequested); 215 shutdownRequested = true; 216 } 217 GrpcEnvironment.UnregisterServer(this); 218 219 var cq = environment.CompletionQueues.First(); // any cq will do 220 handle.ShutdownAndNotify(HandleServerShutdown, cq); 221 if (kill) 222 { 223 handle.CancelAllCalls(); 224 } 225 await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false); 226 227 DisposeHandle(); 228 229 await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); 230 } 231 232 /// <summary> 233 /// In case the environment's threadpool becomes dead, the shutdown completion will 234 /// never be delivered, but we need to release the environment's handle anyway. 235 /// </summary> ShutdownCompleteOrEnvironmentDeadAsync()236 private async Task ShutdownCompleteOrEnvironmentDeadAsync() 237 { 238 while (true) 239 { 240 var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false); 241 if (shutdownTcs.Task == task) 242 { 243 return; 244 } 245 if (!environment.IsAlive) 246 { 247 return; 248 } 249 } 250 } 251 252 /// <summary> 253 /// Adds a service definition. 254 /// </summary> AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)255 private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition) 256 { 257 lock (myLock) 258 { 259 GrpcPreconditions.CheckState(!startRequested); 260 foreach (var entry in serviceDefinition.CallHandlers) 261 { 262 callHandlers.Add(entry.Key, entry.Value); 263 } 264 serviceDefinitionsList.Add(serviceDefinition); 265 } 266 } 267 268 /// <summary> 269 /// Adds a listening port. 270 /// </summary> AddPortInternal(ServerPort serverPort)271 private int AddPortInternal(ServerPort serverPort) 272 { 273 lock (myLock) 274 { 275 GrpcPreconditions.CheckNotNull(serverPort.Credentials, "serverPort"); 276 GrpcPreconditions.CheckState(!startRequested); 277 var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port); 278 int boundPort; 279 using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials()) 280 { 281 if (nativeCredentials != null) 282 { 283 boundPort = handle.AddSecurePort(address, nativeCredentials); 284 } 285 else 286 { 287 boundPort = handle.AddInsecurePort(address); 288 } 289 } 290 var newServerPort = new ServerPort(serverPort, boundPort); 291 this.serverPortList.Add(newServerPort); 292 return boundPort; 293 } 294 } 295 296 /// <summary> 297 /// Allows one new RPC call to be received by server. 298 /// </summary> AllowOneRpc(CompletionQueueSafeHandle cq)299 private void AllowOneRpc(CompletionQueueSafeHandle cq) 300 { 301 if (!shutdownRequested) 302 { 303 // TODO(jtattermusch): avoid unnecessary delegate allocation 304 handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); 305 } 306 } 307 308 /// <summary> 309 /// Checks that all ports have been bound successfully. 310 /// </summary> CheckPortsBoundSuccessfully()311 private void CheckPortsBoundSuccessfully() 312 { 313 lock (myLock) 314 { 315 var unboundPort = ports.FirstOrDefault(port => port.BoundPort == 0); 316 if (unboundPort != null) 317 { 318 throw new IOException( 319 string.Format("Failed to bind port \"{0}:{1}\"", unboundPort.Host, unboundPort.Port)); 320 } 321 } 322 } 323 DisposeHandle()324 private void DisposeHandle() 325 { 326 var activeCallCount = activeCallCounter.Count; 327 if (activeCallCount > 0) 328 { 329 Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount); 330 } 331 handle.Dispose(); 332 } 333 334 /// <summary> 335 /// Selects corresponding handler for given call and handles the call. 336 /// </summary> HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation)337 private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation) 338 { 339 try 340 { 341 IServerCallHandler callHandler; 342 if (!callHandlers.TryGetValue(newRpc.Method, out callHandler)) 343 { 344 callHandler = UnimplementedMethodCallHandler.Instance; 345 } 346 await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); 347 } 348 catch (Exception e) 349 { 350 Logger.Warning(e, "Exception while handling RPC."); 351 } 352 finally 353 { 354 continuation(); 355 } 356 } 357 358 /// <summary> 359 /// Handles the native callback. 360 /// </summary> HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)361 private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq) 362 { 363 bool nextRpcRequested = false; 364 if (success) 365 { 366 var newRpc = ctx.GetServerRpcNew(this); 367 368 // after server shutdown, the callback returns with null call 369 if (!newRpc.Call.IsInvalid) 370 { 371 nextRpcRequested = true; 372 373 // Start asynchronous handler for the call. 374 // Don't await, the continuations will run on gRPC thread pool once triggered 375 // by cq.Next(). 376 #pragma warning disable 4014 377 HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq)); 378 #pragma warning restore 4014 379 } 380 } 381 382 if (!nextRpcRequested) 383 { 384 AllowOneRpc(cq); 385 } 386 } 387 388 /// <summary> 389 /// Handles native callback. 390 /// </summary> HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)391 private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state) 392 { 393 shutdownTcs.SetResult(null); 394 } 395 396 /// <summary> 397 /// Collection of service definitions. 398 /// </summary> 399 public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition> 400 { 401 readonly Server server; 402 ServiceDefinitionCollection(Server server)403 internal ServiceDefinitionCollection(Server server) 404 { 405 this.server = server; 406 } 407 408 /// <summary> 409 /// Adds a service definition to the server. This is how you register 410 /// handlers for a service with the server. Only call this before Start(). 411 /// </summary> Add(ServerServiceDefinition serviceDefinition)412 public void Add(ServerServiceDefinition serviceDefinition) 413 { 414 server.AddServiceDefinitionInternal(serviceDefinition); 415 } 416 417 /// <summary> 418 /// Gets enumerator for this collection. 419 /// </summary> GetEnumerator()420 public IEnumerator<ServerServiceDefinition> GetEnumerator() 421 { 422 return server.serviceDefinitionsList.GetEnumerator(); 423 } 424 IEnumerable.GetEnumerator()425 IEnumerator IEnumerable.GetEnumerator() 426 { 427 return server.serviceDefinitionsList.GetEnumerator(); 428 } 429 } 430 431 /// <summary> 432 /// Collection of server ports. 433 /// </summary> 434 public class ServerPortCollection : IEnumerable<ServerPort> 435 { 436 readonly Server server; 437 ServerPortCollection(Server server)438 internal ServerPortCollection(Server server) 439 { 440 this.server = server; 441 } 442 443 /// <summary> 444 /// Adds a new port on which server should listen. 445 /// Only call this before Start(). 446 /// <returns>The port on which server will be listening.</returns> 447 /// </summary> Add(ServerPort serverPort)448 public int Add(ServerPort serverPort) 449 { 450 return server.AddPortInternal(serverPort); 451 } 452 453 /// <summary> 454 /// Adds a new port on which server should listen. 455 /// <returns>The port on which server will be listening.</returns> 456 /// </summary> 457 /// <param name="host">the host</param> 458 /// <param name="port">the port. If zero, an unused port is chosen automatically.</param> 459 /// <param name="credentials">credentials to use to secure this port.</param> Add(string host, int port, ServerCredentials credentials)460 public int Add(string host, int port, ServerCredentials credentials) 461 { 462 return Add(new ServerPort(host, port, credentials)); 463 } 464 465 /// <summary> 466 /// Gets enumerator for this collection. 467 /// </summary> GetEnumerator()468 public IEnumerator<ServerPort> GetEnumerator() 469 { 470 return server.serverPortList.GetEnumerator(); 471 } 472 IEnumerable.GetEnumerator()473 IEnumerator IEnumerable.GetEnumerator() 474 { 475 return server.serverPortList.GetEnumerator(); 476 } 477 } 478 } 479 } 480