• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections;
21 using System.Collections.Generic;
22 using System.IO;
23 using System.Linq;
24 using System.Threading.Tasks;
25 using Grpc.Core.Internal;
26 using Grpc.Core.Logging;
27 using Grpc.Core.Utils;
28 
29 namespace Grpc.Core
30 {
31     /// <summary>
32     /// gRPC server. A single server can serve an arbitrary number of services and can listen on more than one port.
33     /// </summary>
34     public class Server
35     {
36         const int DefaultRequestCallTokensPerCq = 2000;
37         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
38 
39         readonly AtomicCounter activeCallCounter = new AtomicCounter();
40 
41         readonly ServiceDefinitionCollection serviceDefinitions;
42         readonly ServerPortCollection ports;
43         readonly GrpcEnvironment environment;
44         readonly List<ChannelOption> options;
45         readonly ServerSafeHandle handle;
46         readonly object myLock = new object();
47 
48         readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>();
49         readonly List<ServerPort> serverPortList = new List<ServerPort>();
50         readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
51         readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
52 
53         bool startRequested;
54         volatile bool shutdownRequested;
55         int requestCallTokensPerCq = DefaultRequestCallTokensPerCq;
56 
57         /// <summary>
58         /// Creates a new server.
59         /// </summary>
Server()60         public Server() : this(null)
61         {
62         }
63 
64         /// <summary>
65         /// Creates a new server.
66         /// </summary>
67         /// <param name="options">Channel options.</param>
Server(IEnumerable<ChannelOption> options)68         public Server(IEnumerable<ChannelOption> options)
69         {
70             this.serviceDefinitions = new ServiceDefinitionCollection(this);
71             this.ports = new ServerPortCollection(this);
72             this.environment = GrpcEnvironment.AddRef();
73             this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
74             using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
75             {
76                 this.handle = ServerSafeHandle.NewServer(channelArgs);
77             }
78 
79             foreach (var cq in environment.CompletionQueues)
80             {
81                 this.handle.RegisterCompletionQueue(cq);
82             }
83             GrpcEnvironment.RegisterServer(this);
84         }
85 
86         /// <summary>
87         /// Services that will be exported by the server once started. Register a service with this
88         /// server by adding its definition to this collection.
89         /// </summary>
90         public ServiceDefinitionCollection Services
91         {
92             get
93             {
94                 return serviceDefinitions;
95             }
96         }
97 
98         /// <summary>
99         /// Ports on which the server will listen once started. Register a port with this
100         /// server by adding its definition to this collection.
101         /// </summary>
102         public ServerPortCollection Ports
103         {
104             get
105             {
106                 return ports;
107             }
108         }
109 
110         /// <summary>
111         /// To allow awaiting termination of the server.
112         /// </summary>
113         public Task ShutdownTask
114         {
115             get
116             {
117                 return shutdownTcs.Task;
118             }
119         }
120 
121         /// <summary>
122         /// Experimental API. Might anytime change without prior notice.
123         /// Number or calls requested via grpc_server_request_call at any given time for each completion queue.
124         /// </summary>
125         public int RequestCallTokensPerCompletionQueue
126         {
127             get
128             {
129                 return requestCallTokensPerCq;
130             }
131             set
132             {
133                 lock (myLock)
134                 {
135                     GrpcPreconditions.CheckState(!startRequested);
136                     GrpcPreconditions.CheckArgument(value > 0);
137                     requestCallTokensPerCq = value;
138                 }
139             }
140         }
141 
142         /// <summary>
143         /// Starts the server.
144         /// Throws <c>IOException</c> if not successful.
145         /// </summary>
Start()146         public void Start()
147         {
148             lock (myLock)
149             {
150                 GrpcPreconditions.CheckState(!startRequested);
151                 GrpcPreconditions.CheckState(!shutdownRequested);
152                 startRequested = true;
153 
154                 CheckPortsBoundSuccessfully();
155                 handle.Start();
156 
157                 for (int i = 0; i < requestCallTokensPerCq; i++)
158                 {
159                     foreach (var cq in environment.CompletionQueues)
160                     {
161                         AllowOneRpc(cq);
162                     }
163                 }
164             }
165         }
166 
167         /// <summary>
168         /// Requests server shutdown and when there are no more calls being serviced,
169         /// cleans up used resources. The returned task finishes when shutdown procedure
170         /// is complete.
171         /// </summary>
172         /// <remarks>
173         /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
174         /// </remarks>
ShutdownAsync()175         public Task ShutdownAsync()
176         {
177             return ShutdownInternalAsync(false);
178         }
179 
180         /// <summary>
181         /// Requests server shutdown while cancelling all the in-progress calls.
182         /// The returned task finishes when shutdown procedure is complete.
183         /// </summary>
184         /// <remarks>
185         /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
186         /// </remarks>
KillAsync()187         public Task KillAsync()
188         {
189             return ShutdownInternalAsync(true);
190         }
191 
AddCallReference(object call)192         internal void AddCallReference(object call)
193         {
194             activeCallCounter.Increment();
195 
196             bool success = false;
197             handle.DangerousAddRef(ref success);
198             GrpcPreconditions.CheckState(success);
199         }
200 
RemoveCallReference(object call)201         internal void RemoveCallReference(object call)
202         {
203             handle.DangerousRelease();
204             activeCallCounter.Decrement();
205         }
206 
207         /// <summary>
208         /// Shuts down the server.
209         /// </summary>
ShutdownInternalAsync(bool kill)210         private async Task ShutdownInternalAsync(bool kill)
211         {
212             lock (myLock)
213             {
214                 GrpcPreconditions.CheckState(!shutdownRequested);
215                 shutdownRequested = true;
216             }
217             GrpcEnvironment.UnregisterServer(this);
218 
219             var cq = environment.CompletionQueues.First();  // any cq will do
220             handle.ShutdownAndNotify(HandleServerShutdown, cq);
221             if (kill)
222             {
223                 handle.CancelAllCalls();
224             }
225             await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
226 
227             DisposeHandle();
228 
229             await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
230         }
231 
232         /// <summary>
233         /// In case the environment's threadpool becomes dead, the shutdown completion will
234         /// never be delivered, but we need to release the environment's handle anyway.
235         /// </summary>
ShutdownCompleteOrEnvironmentDeadAsync()236         private async Task ShutdownCompleteOrEnvironmentDeadAsync()
237         {
238             while (true)
239             {
240                 var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
241                 if (shutdownTcs.Task == task)
242                 {
243                     return;
244                 }
245                 if (!environment.IsAlive)
246                 {
247                     return;
248                 }
249             }
250         }
251 
252         /// <summary>
253         /// Adds a service definition.
254         /// </summary>
AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)255         private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
256         {
257             lock (myLock)
258             {
259                 GrpcPreconditions.CheckState(!startRequested);
260                 foreach (var entry in serviceDefinition.CallHandlers)
261                 {
262                     callHandlers.Add(entry.Key, entry.Value);
263                 }
264                 serviceDefinitionsList.Add(serviceDefinition);
265             }
266         }
267 
268         /// <summary>
269         /// Adds a listening port.
270         /// </summary>
AddPortInternal(ServerPort serverPort)271         private int AddPortInternal(ServerPort serverPort)
272         {
273             lock (myLock)
274             {
275                 GrpcPreconditions.CheckNotNull(serverPort.Credentials, "serverPort");
276                 GrpcPreconditions.CheckState(!startRequested);
277                 var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port);
278                 int boundPort;
279                 using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials())
280                 {
281                     if (nativeCredentials != null)
282                     {
283                         boundPort = handle.AddSecurePort(address, nativeCredentials);
284                     }
285                     else
286                     {
287                         boundPort = handle.AddInsecurePort(address);
288                     }
289                 }
290                 var newServerPort = new ServerPort(serverPort, boundPort);
291                 this.serverPortList.Add(newServerPort);
292                 return boundPort;
293             }
294         }
295 
296         /// <summary>
297         /// Allows one new RPC call to be received by server.
298         /// </summary>
AllowOneRpc(CompletionQueueSafeHandle cq)299         private void AllowOneRpc(CompletionQueueSafeHandle cq)
300         {
301             if (!shutdownRequested)
302             {
303                 // TODO(jtattermusch): avoid unnecessary delegate allocation
304                 handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
305             }
306         }
307 
308         /// <summary>
309         /// Checks that all ports have been bound successfully.
310         /// </summary>
CheckPortsBoundSuccessfully()311         private void CheckPortsBoundSuccessfully()
312         {
313             lock (myLock)
314             {
315                 var unboundPort = ports.FirstOrDefault(port => port.BoundPort == 0);
316                 if (unboundPort != null)
317                 {
318                     throw new IOException(
319                         string.Format("Failed to bind port \"{0}:{1}\"", unboundPort.Host, unboundPort.Port));
320                 }
321             }
322         }
323 
DisposeHandle()324         private void DisposeHandle()
325         {
326             var activeCallCount = activeCallCounter.Count;
327             if (activeCallCount > 0)
328             {
329                 Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
330             }
331             handle.Dispose();
332         }
333 
334         /// <summary>
335         /// Selects corresponding handler for given call and handles the call.
336         /// </summary>
HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation)337         private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation)
338         {
339             try
340             {
341                 IServerCallHandler callHandler;
342                 if (!callHandlers.TryGetValue(newRpc.Method, out callHandler))
343                 {
344                     callHandler = UnimplementedMethodCallHandler.Instance;
345                 }
346                 await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
347             }
348             catch (Exception e)
349             {
350                 Logger.Warning(e, "Exception while handling RPC.");
351             }
352             finally
353             {
354                 continuation();
355             }
356         }
357 
358         /// <summary>
359         /// Handles the native callback.
360         /// </summary>
HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)361         private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)
362         {
363             bool nextRpcRequested = false;
364             if (success)
365             {
366                 var newRpc = ctx.GetServerRpcNew(this);
367 
368                 // after server shutdown, the callback returns with null call
369                 if (!newRpc.Call.IsInvalid)
370                 {
371                     nextRpcRequested = true;
372 
373                     // Start asynchronous handler for the call.
374                     // Don't await, the continuations will run on gRPC thread pool once triggered
375                     // by cq.Next().
376                     #pragma warning disable 4014
377                     HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq));
378                     #pragma warning restore 4014
379                 }
380             }
381 
382             if (!nextRpcRequested)
383             {
384                 AllowOneRpc(cq);
385             }
386         }
387 
388         /// <summary>
389         /// Handles native callback.
390         /// </summary>
HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)391         private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)
392         {
393             shutdownTcs.SetResult(null);
394         }
395 
396         /// <summary>
397         /// Collection of service definitions.
398         /// </summary>
399         public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition>
400         {
401             readonly Server server;
402 
ServiceDefinitionCollection(Server server)403             internal ServiceDefinitionCollection(Server server)
404             {
405                 this.server = server;
406             }
407 
408             /// <summary>
409             /// Adds a service definition to the server. This is how you register
410             /// handlers for a service with the server. Only call this before Start().
411             /// </summary>
Add(ServerServiceDefinition serviceDefinition)412             public void Add(ServerServiceDefinition serviceDefinition)
413             {
414                 server.AddServiceDefinitionInternal(serviceDefinition);
415             }
416 
417             /// <summary>
418             /// Gets enumerator for this collection.
419             /// </summary>
GetEnumerator()420             public IEnumerator<ServerServiceDefinition> GetEnumerator()
421             {
422                 return server.serviceDefinitionsList.GetEnumerator();
423             }
424 
IEnumerable.GetEnumerator()425             IEnumerator IEnumerable.GetEnumerator()
426             {
427                 return server.serviceDefinitionsList.GetEnumerator();
428             }
429         }
430 
431         /// <summary>
432         /// Collection of server ports.
433         /// </summary>
434         public class ServerPortCollection : IEnumerable<ServerPort>
435         {
436             readonly Server server;
437 
ServerPortCollection(Server server)438             internal ServerPortCollection(Server server)
439             {
440                 this.server = server;
441             }
442 
443             /// <summary>
444             /// Adds a new port on which server should listen.
445             /// Only call this before Start().
446             /// <returns>The port on which server will be listening.</returns>
447             /// </summary>
Add(ServerPort serverPort)448             public int Add(ServerPort serverPort)
449             {
450                 return server.AddPortInternal(serverPort);
451             }
452 
453             /// <summary>
454             /// Adds a new port on which server should listen.
455             /// <returns>The port on which server will be listening.</returns>
456             /// </summary>
457             /// <param name="host">the host</param>
458             /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
459             /// <param name="credentials">credentials to use to secure this port.</param>
Add(string host, int port, ServerCredentials credentials)460             public int Add(string host, int port, ServerCredentials credentials)
461             {
462                 return Add(new ServerPort(host, port, credentials));
463             }
464 
465             /// <summary>
466             /// Gets enumerator for this collection.
467             /// </summary>
GetEnumerator()468             public IEnumerator<ServerPort> GetEnumerator()
469             {
470                 return server.serverPortList.GetEnumerator();
471             }
472 
IEnumerable.GetEnumerator()473             IEnumerator IEnumerable.GetEnumerator()
474             {
475                 return server.serverPortList.GetEnumerator();
476             }
477         }
478     }
479 }
480