1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections; 21 using System.Collections.Generic; 22 using System.IO; 23 using System.Linq; 24 using System.Threading.Tasks; 25 using Grpc.Core.Internal; 26 using Grpc.Core.Logging; 27 using Grpc.Core.Utils; 28 29 namespace Grpc.Core 30 { 31 /// <summary> 32 /// gRPC server. A single server can serve an arbitrary number of services and can listen on more than one port. 33 /// </summary> 34 public class Server 35 { 36 const int DefaultRequestCallTokensPerCq = 2000; 37 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); 38 39 readonly AtomicCounter activeCallCounter = new AtomicCounter(); 40 41 readonly ServiceDefinitionCollection serviceDefinitions; 42 readonly ServerPortCollection ports; 43 readonly GrpcEnvironment environment; 44 readonly List<ChannelOption> options; 45 readonly ServerSafeHandle handle; 46 readonly object myLock = new object(); 47 48 readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>(); 49 readonly List<ServerPort> serverPortList = new List<ServerPort>(); 50 readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); 51 readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); 52 53 bool startRequested; 54 volatile bool shutdownRequested; 55 int requestCallTokensPerCq = DefaultRequestCallTokensPerCq; 56 57 /// <summary> 58 /// Creates a new server. 59 /// </summary> Server()60 public Server() : this(null) 61 { 62 } 63 64 /// <summary> 65 /// Creates a new server. 66 /// </summary> 67 /// <param name="options">Channel options.</param> Server(IEnumerable<ChannelOption> options)68 public Server(IEnumerable<ChannelOption> options) 69 { 70 this.serviceDefinitions = new ServiceDefinitionCollection(this); 71 this.ports = new ServerPortCollection(this); 72 this.environment = GrpcEnvironment.AddRef(); 73 this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); 74 using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) 75 { 76 this.handle = ServerSafeHandle.NewServer(channelArgs); 77 } 78 79 foreach (var cq in environment.CompletionQueues) 80 { 81 this.handle.RegisterCompletionQueue(cq); 82 } 83 GrpcEnvironment.RegisterServer(this); 84 } 85 86 /// <summary> 87 /// Services that will be exported by the server once started. Register a service with this 88 /// server by adding its definition to this collection. 89 /// </summary> 90 public ServiceDefinitionCollection Services 91 { 92 get 93 { 94 return serviceDefinitions; 95 } 96 } 97 98 /// <summary> 99 /// Ports on which the server will listen once started. Register a port with this 100 /// server by adding its definition to this collection. 101 /// </summary> 102 public ServerPortCollection Ports 103 { 104 get 105 { 106 return ports; 107 } 108 } 109 110 /// <summary> 111 /// To allow awaiting termination of the server. 112 /// </summary> 113 public Task ShutdownTask 114 { 115 get 116 { 117 return shutdownTcs.Task; 118 } 119 } 120 121 /// <summary> 122 /// Experimental API. Might anytime change without prior notice. 123 /// Number or calls requested via grpc_server_request_call at any given time for each completion queue. 124 /// </summary> 125 public int RequestCallTokensPerCompletionQueue 126 { 127 get 128 { 129 return requestCallTokensPerCq; 130 } 131 set 132 { 133 lock (myLock) 134 { 135 GrpcPreconditions.CheckState(!startRequested); 136 GrpcPreconditions.CheckArgument(value > 0); 137 requestCallTokensPerCq = value; 138 } 139 } 140 } 141 142 /// <summary> 143 /// Starts the server. 144 /// Throws <c>IOException</c> if not all ports have been bound successfully (see <c>Ports.Add</c> method). 145 /// Even if some of that ports haven't been bound, the server will still serve normally on all ports that have been 146 /// bound successfully (and the user is expected to shutdown the server by invoking <c>ShutdownAsync</c> or <c>KillAsync</c>). 147 /// </summary> Start()148 public void Start() 149 { 150 lock (myLock) 151 { 152 GrpcPreconditions.CheckState(!startRequested); 153 GrpcPreconditions.CheckState(!shutdownRequested); 154 startRequested = true; 155 156 handle.Start(); 157 158 for (int i = 0; i < requestCallTokensPerCq; i++) 159 { 160 foreach (var cq in environment.CompletionQueues) 161 { 162 AllowOneRpc(cq); 163 } 164 } 165 166 // Throw if some ports weren't bound successfully. 167 // Even when that happens, some server ports might have been 168 // bound successfully, so we let server initialization 169 // proceed as usual and we only throw at the very end of the 170 // Start() method. 171 CheckPortsBoundSuccessfully(); 172 } 173 } 174 175 /// <summary> 176 /// Requests server shutdown and when there are no more calls being serviced, 177 /// cleans up used resources. The returned task finishes when shutdown procedure 178 /// is complete. 179 /// </summary> 180 /// <remarks> 181 /// It is strongly recommended to shutdown all previously created servers before exiting from the process. 182 /// </remarks> ShutdownAsync()183 public Task ShutdownAsync() 184 { 185 return ShutdownInternalAsync(false); 186 } 187 188 /// <summary> 189 /// Requests server shutdown while cancelling all the in-progress calls. 190 /// The returned task finishes when shutdown procedure is complete. 191 /// </summary> 192 /// <remarks> 193 /// It is strongly recommended to shutdown all previously created servers before exiting from the process. 194 /// </remarks> KillAsync()195 public Task KillAsync() 196 { 197 return ShutdownInternalAsync(true); 198 } 199 AddCallReference(object call)200 internal void AddCallReference(object call) 201 { 202 activeCallCounter.Increment(); 203 204 bool success = false; 205 handle.DangerousAddRef(ref success); 206 GrpcPreconditions.CheckState(success); 207 } 208 RemoveCallReference(object call)209 internal void RemoveCallReference(object call) 210 { 211 handle.DangerousRelease(); 212 activeCallCounter.Decrement(); 213 } 214 215 /// <summary> 216 /// Shuts down the server. 217 /// </summary> ShutdownInternalAsync(bool kill)218 private async Task ShutdownInternalAsync(bool kill) 219 { 220 lock (myLock) 221 { 222 GrpcPreconditions.CheckState(!shutdownRequested); 223 shutdownRequested = true; 224 } 225 GrpcEnvironment.UnregisterServer(this); 226 227 var cq = environment.CompletionQueues.First(); // any cq will do 228 handle.ShutdownAndNotify(HandleServerShutdown, cq); 229 if (kill) 230 { 231 handle.CancelAllCalls(); 232 } 233 await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false); 234 235 DisposeHandle(); 236 237 await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); 238 } 239 240 /// <summary> 241 /// In case the environment's threadpool becomes dead, the shutdown completion will 242 /// never be delivered, but we need to release the environment's handle anyway. 243 /// </summary> ShutdownCompleteOrEnvironmentDeadAsync()244 private async Task ShutdownCompleteOrEnvironmentDeadAsync() 245 { 246 while (true) 247 { 248 var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false); 249 if (shutdownTcs.Task == task) 250 { 251 return; 252 } 253 if (!environment.IsAlive) 254 { 255 return; 256 } 257 } 258 } 259 260 /// <summary> 261 /// Adds a service definition. 262 /// </summary> AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)263 private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition) 264 { 265 lock (myLock) 266 { 267 GrpcPreconditions.CheckState(!startRequested); 268 foreach (var entry in serviceDefinition.GetCallHandlers()) 269 { 270 callHandlers.Add(entry.Key, entry.Value); 271 } 272 serviceDefinitionsList.Add(serviceDefinition); 273 } 274 } 275 276 /// <summary> 277 /// Adds a listening port. 278 /// </summary> AddPortInternal(ServerPort serverPort)279 private int AddPortInternal(ServerPort serverPort) 280 { 281 lock (myLock) 282 { 283 GrpcPreconditions.CheckNotNull(serverPort.Credentials, "serverPort"); 284 GrpcPreconditions.CheckState(!startRequested); 285 var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port); 286 int boundPort; 287 using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials()) 288 { 289 if (nativeCredentials != null) 290 { 291 boundPort = handle.AddSecurePort(address, nativeCredentials); 292 } 293 else 294 { 295 boundPort = handle.AddInsecurePort(address); 296 } 297 } 298 var newServerPort = new ServerPort(serverPort, boundPort); 299 this.serverPortList.Add(newServerPort); 300 return boundPort; 301 } 302 } 303 304 /// <summary> 305 /// Allows one new RPC call to be received by server. 306 /// </summary> AllowOneRpc(CompletionQueueSafeHandle cq)307 private void AllowOneRpc(CompletionQueueSafeHandle cq) 308 { 309 if (!shutdownRequested) 310 { 311 // TODO(jtattermusch): avoid unnecessary delegate allocation 312 handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); 313 } 314 } 315 316 /// <summary> 317 /// Checks that all ports have been bound successfully. 318 /// </summary> CheckPortsBoundSuccessfully()319 private void CheckPortsBoundSuccessfully() 320 { 321 lock (myLock) 322 { 323 var unboundPort = ports.FirstOrDefault(port => port.BoundPort == 0); 324 if (unboundPort != null) 325 { 326 throw new IOException( 327 string.Format("Failed to bind port \"{0}:{1}\"", unboundPort.Host, unboundPort.Port)); 328 } 329 } 330 } 331 DisposeHandle()332 private void DisposeHandle() 333 { 334 var activeCallCount = activeCallCounter.Count; 335 if (activeCallCount > 0) 336 { 337 Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount); 338 } 339 handle.Dispose(); 340 } 341 342 /// <summary> 343 /// Selects corresponding handler for given call and handles the call. 344 /// </summary> HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action<Server, CompletionQueueSafeHandle> continuation)345 private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action<Server, CompletionQueueSafeHandle> continuation) 346 { 347 try 348 { 349 IServerCallHandler callHandler; 350 if (!callHandlers.TryGetValue(newRpc.Method, out callHandler)) 351 { 352 callHandler = UnimplementedMethodCallHandler.Instance; 353 } 354 await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); 355 } 356 catch (Exception e) 357 { 358 Logger.Warning(e, "Exception while handling RPC."); 359 } 360 finally 361 { 362 continuation(this, cq); 363 } 364 } 365 366 /// <summary> 367 /// Handles the native callback. 368 /// </summary> HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)369 private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq) 370 { 371 bool nextRpcRequested = false; 372 if (success) 373 { 374 var newRpc = ctx.GetServerRpcNew(this); 375 376 // after server shutdown, the callback returns with null call 377 if (!newRpc.Call.IsInvalid) 378 { 379 nextRpcRequested = true; 380 381 // Start asynchronous handler for the call. 382 // Don't await, the continuations will run on gRPC thread pool once triggered 383 // by cq.Next(). 384 #pragma warning disable 4014 385 HandleCallAsync(newRpc, cq, (server, state) => server.AllowOneRpc(state)); 386 #pragma warning restore 4014 387 } 388 } 389 390 if (!nextRpcRequested) 391 { 392 AllowOneRpc(cq); 393 } 394 } 395 396 /// <summary> 397 /// Handles native callback. 398 /// </summary> HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)399 private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state) 400 { 401 shutdownTcs.SetResult(null); 402 } 403 404 /// <summary> 405 /// Collection of service definitions. 406 /// </summary> 407 public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition> 408 { 409 readonly Server server; 410 ServiceDefinitionCollection(Server server)411 internal ServiceDefinitionCollection(Server server) 412 { 413 this.server = server; 414 } 415 416 /// <summary> 417 /// Adds a service definition to the server. This is how you register 418 /// handlers for a service with the server. Only call this before Start(). 419 /// </summary> Add(ServerServiceDefinition serviceDefinition)420 public void Add(ServerServiceDefinition serviceDefinition) 421 { 422 server.AddServiceDefinitionInternal(serviceDefinition); 423 } 424 425 /// <summary> 426 /// Gets enumerator for this collection. 427 /// </summary> GetEnumerator()428 public IEnumerator<ServerServiceDefinition> GetEnumerator() 429 { 430 return server.serviceDefinitionsList.GetEnumerator(); 431 } 432 IEnumerable.GetEnumerator()433 IEnumerator IEnumerable.GetEnumerator() 434 { 435 return server.serviceDefinitionsList.GetEnumerator(); 436 } 437 } 438 439 /// <summary> 440 /// Collection of server ports. 441 /// </summary> 442 public class ServerPortCollection : IEnumerable<ServerPort> 443 { 444 readonly Server server; 445 ServerPortCollection(Server server)446 internal ServerPortCollection(Server server) 447 { 448 this.server = server; 449 } 450 451 /// <summary> 452 /// Adds a new port on which server should listen. 453 /// Only call this before Start(). 454 /// <returns>The port on which server will be listening. Return value of zero means that binding the port has failed.</returns> 455 /// </summary> Add(ServerPort serverPort)456 public int Add(ServerPort serverPort) 457 { 458 return server.AddPortInternal(serverPort); 459 } 460 461 /// <summary> 462 /// Adds a new port on which server should listen. 463 /// <returns>The port on which server will be listening. Return value of zero means that binding the port has failed.</returns> 464 /// </summary> 465 /// <param name="host">the host</param> 466 /// <param name="port">the port. If zero, an unused port is chosen automatically.</param> 467 /// <param name="credentials">credentials to use to secure this port.</param> Add(string host, int port, ServerCredentials credentials)468 public int Add(string host, int port, ServerCredentials credentials) 469 { 470 return Add(new ServerPort(host, port, credentials)); 471 } 472 473 /// <summary> 474 /// Gets enumerator for this collection. 475 /// </summary> GetEnumerator()476 public IEnumerator<ServerPort> GetEnumerator() 477 { 478 return server.serverPortList.GetEnumerator(); 479 } 480 IEnumerable.GetEnumerator()481 IEnumerator IEnumerable.GetEnumerator() 482 { 483 return server.serverPortList.GetEnumerator(); 484 } 485 } 486 } 487 } 488