#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;

namespace Grpc.Core
{
    /// <summary>
    /// gRPC server. A single server can serve an arbitrary number of services and can listen on more than one port.
    /// </summary>
    /// <remarks>
    /// Services and ports must be registered (through <c>Services</c> and <c>Ports</c>) before
    /// <c>Start</c> is called; registration after that point fails a state precondition.
    /// </remarks>
    public class Server
    {
        const int DefaultRequestCallTokensPerCq = 2000;
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();

        // Number of calls registered via AddCallReference and not yet released via RemoveCallReference.
        readonly AtomicCounter activeCallCounter = new AtomicCounter();

        readonly ServiceDefinitionCollection serviceDefinitions;
        readonly ServerPortCollection ports;
        readonly GrpcEnvironment environment;
        readonly List<ChannelOption> options;
        readonly ServerSafeHandle handle;

        // Guards startRequested/shutdownRequested transitions and the registration of services and ports.
        readonly object myLock = new object();

        readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>();
        readonly List<ServerPort> serverPortList = new List<ServerPort>();

        // Maps a method name (as reported by an incoming call) to the handler registered for it.
        readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();

        // Completed by HandleServerShutdown once the native shutdown notification arrives.
        readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();

        bool startRequested;
        volatile bool shutdownRequested;
        int requestCallTokensPerCq = DefaultRequestCallTokensPerCq;

        /// <summary>
        /// Creates a new server with no channel options.
        /// </summary>
        public Server() : this(null)
        {
        }

        /// <summary>
        /// Creates a new server.
        /// </summary>
        /// <param name="options">Channel options. May be <c>null</c>, which is treated as an empty list.</param>
        public Server(IEnumerable<ChannelOption> options)
        {
            this.serviceDefinitions = new ServiceDefinitionCollection(this);
            this.ports = new ServerPortCollection(this);
            this.environment = GrpcEnvironment.AddRef();

            // Copy the options so later changes to the caller's sequence do not affect this server.
            this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
            using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
            {
                this.handle = ServerSafeHandle.NewServer(channelArgs);
            }

            foreach (var cq in environment.CompletionQueues)
            {
                this.handle.RegisterCompletionQueue(cq);
            }
            GrpcEnvironment.RegisterServer(this);
        }

        /// <summary>
        /// Services that will be exported by the server once started. Register a service with this
        /// server by adding its definition to this collection.
        /// </summary>
        public ServiceDefinitionCollection Services
        {
            get
            {
                return serviceDefinitions;
            }
        }

        /// <summary>
        /// Ports on which the server will listen once started. Register a port with this
        /// server by adding its definition to this collection.
        /// </summary>
        public ServerPortCollection Ports
        {
            get
            {
                return ports;
            }
        }

        /// <summary>
        /// To allow awaiting termination of the server.
        /// </summary>
        public Task ShutdownTask
        {
            get
            {
                return shutdownTcs.Task;
            }
        }

        /// <summary>
        /// Experimental API. Might anytime change without prior notice.
        /// Number of calls requested via grpc_server_request_call at any given time for each completion queue.
        /// </summary>
        /// <remarks>
        /// Can only be set before the server is started, and only to a positive value.
        /// </remarks>
        public int RequestCallTokensPerCompletionQueue
        {
            get
            {
                return requestCallTokensPerCq;
            }
            set
            {
                lock (myLock)
                {
                    GrpcPreconditions.CheckState(!startRequested);
                    GrpcPreconditions.CheckArgument(value > 0);
                    requestCallTokensPerCq = value;
                }
            }
        }

        /// <summary>
        /// Starts the server.
        /// Throws <c>IOException</c> if not all ports have been bound successfully (see <c>Ports.Add</c> method).
        /// Even if some of those ports haven't been bound, the server will still serve normally on all ports that have been
        /// bound successfully (and the user is expected to shutdown the server by invoking <c>ShutdownAsync</c> or <c>KillAsync</c>).
        /// </summary>
        public void Start()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(!startRequested);
                GrpcPreconditions.CheckState(!shutdownRequested);
                startRequested = true;

                handle.Start();

                // Request requestCallTokensPerCq calls on every completion queue.
                for (int i = 0; i < requestCallTokensPerCq; i++)
                {
                    foreach (var cq in environment.CompletionQueues)
                    {
                        AllowOneRpc(cq);
                    }
                }

                // Throw if some ports weren't bound successfully.
                // Even when that happens, some server ports might have been
                // bound successfully, so we let server initialization
                // proceed as usual and we only throw at the very end of the
                // Start() method.
                CheckPortsBoundSuccessfully();
            }
        }

        /// <summary>
        /// Requests server shutdown and when there are no more calls being serviced,
        /// cleans up used resources. The returned task finishes when shutdown procedure
        /// is complete.
        /// </summary>
        /// <remarks>
        /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
        /// </remarks>
        public Task ShutdownAsync()
        {
            return ShutdownInternalAsync(false);
        }

        /// <summary>
        /// Requests server shutdown while cancelling all the in-progress calls.
        /// The returned task finishes when shutdown procedure is complete.
        /// </summary>
        /// <remarks>
        /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
        /// </remarks>
        public Task KillAsync()
        {
            return ShutdownInternalAsync(true);
        }

        /// <summary>
        /// Registers an active call: bumps the active call counter and takes a reference on the
        /// native server handle. Must be paired with <c>RemoveCallReference</c>.
        /// </summary>
        /// <param name="call">The call being registered (not used by this method).</param>
        internal void AddCallReference(object call)
        {
            activeCallCounter.Increment();

            bool success = false;
            try
            {
                handle.DangerousAddRef(ref success);
                GrpcPreconditions.CheckState(success);
            }
            finally
            {
                // If the handle reference could not be taken, the caller has nothing to release,
                // so undo the increment to keep the counter in sync with the references held.
                if (!success)
                {
                    activeCallCounter.Decrement();
                }
            }
        }

        /// <summary>
        /// Unregisters an active call: releases the reference on the native server handle
        /// and decrements the active call counter.
        /// </summary>
        /// <param name="call">The call being unregistered (not used by this method).</param>
        internal void RemoveCallReference(object call)
        {
            handle.DangerousRelease();
            activeCallCounter.Decrement();
        }

        /// <summary>
        /// Shuts down the server.
        /// </summary>
        /// <param name="kill">When <c>true</c>, all calls are cancelled right after shutdown is requested.</param>
        private async Task ShutdownInternalAsync(bool kill)
        {
            lock (myLock)
            {
                // Shutdown may be requested only once.
                GrpcPreconditions.CheckState(!shutdownRequested);
                shutdownRequested = true;
            }
            GrpcEnvironment.UnregisterServer(this);

            var cq = environment.CompletionQueues.First();  // any cq will do
            handle.ShutdownAndNotify(HandleServerShutdown, cq);
            if (kill)
            {
                handle.CancelAllCalls();
            }
            await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);

            DisposeHandle();

            await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// In case the environment's threadpool becomes dead, the shutdown completion will
        /// never be delivered, but we need to release the environment's handle anyway.
        /// </summary>
        private async Task ShutdownCompleteOrEnvironmentDeadAsync()
        {
            while (true)
            {
                // Wake up every 20 ms to re-check whether the environment is still alive.
                var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
                if (shutdownTcs.Task == task)
                {
                    return;
                }
                if (!environment.IsAlive)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Adds a service definition. Either all of its call handlers are registered or,
        /// if registration fails part-way (e.g. a method name is already registered), none are.
        /// </summary>
        /// <param name="serviceDefinition">The service definition to add. Must not be <c>null</c>.</param>
        private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(!startRequested);
                GrpcPreconditions.CheckNotNull(serviceDefinition, "serviceDefinition");

                var addedMethods = new List<string>();
                try
                {
                    foreach (var entry in serviceDefinition.GetCallHandlers())
                    {
                        callHandlers.Add(entry.Key, entry.Value);
                        addedMethods.Add(entry.Key);
                    }
                }
                catch
                {
                    // Roll back the handlers added so far so a failed registration leaves no trace.
                    foreach (var method in addedMethods)
                    {
                        callHandlers.Remove(method);
                    }
                    throw;
                }
                serviceDefinitionsList.Add(serviceDefinition);
            }
        }

        /// <summary>
        /// Adds a listening port.
        /// </summary>
        /// <param name="serverPort">The port to add. Neither it nor its credentials may be <c>null</c>.</param>
        /// <returns>The bound port as reported by the native server handle.</returns>
        private int AddPortInternal(ServerPort serverPort)
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckNotNull(serverPort, "serverPort");
                GrpcPreconditions.CheckNotNull(serverPort.Credentials, "serverPort");
                GrpcPreconditions.CheckState(!startRequested);

                // The address is consumed by native code, so format it independently of the current culture.
                var address = string.Format(CultureInfo.InvariantCulture, "{0}:{1}", serverPort.Host, serverPort.Port);
                int boundPort;
                using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials())
                {
                    // Credentials without a native representation result in an insecure port.
                    if (nativeCredentials != null)
                    {
                        boundPort = handle.AddSecurePort(address, nativeCredentials);
                    }
                    else
                    {
                        boundPort = handle.AddInsecurePort(address);
                    }
                }
                var newServerPort = new ServerPort(serverPort, boundPort);
                this.serverPortList.Add(newServerPort);
                return boundPort;
            }
        }

        /// <summary>
        /// Allows one new RPC call to be received by server.
        /// Does nothing once shutdown has been requested.
        /// </summary>
        /// <param name="cq">The completion queue on which the new call is requested.</param>
        private void AllowOneRpc(CompletionQueueSafeHandle cq)
        {
            if (!shutdownRequested)
            {
                // TODO(jtattermusch): avoid unnecessary delegate allocation
                handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
            }
        }

        /// <summary>
        /// Checks that all ports have been bound successfully.
        /// Throws <c>IOException</c> naming the first port whose bound port is zero.
        /// </summary>
        private void CheckPortsBoundSuccessfully()
        {
            lock (myLock)
            {
                var unboundPort = ports.FirstOrDefault(port => port.BoundPort == 0);
                if (unboundPort != null)
                {
                    throw new IOException(
                        string.Format("Failed to bind port \"{0}:{1}\"", unboundPort.Host, unboundPort.Port));
                }
            }
        }

        /// <summary>
        /// Disposes the native server handle, logging a warning if calls are still
        /// registered as active at that point.
        /// </summary>
        private void DisposeHandle()
        {
            var activeCallCount = activeCallCounter.Count;
            if (activeCallCount > 0)
            {
                Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
            }
            handle.Dispose();
        }

        /// <summary>
        /// Selects corresponding handler for given call and handles the call.
        /// Exceptions from the handler are logged as warnings and not propagated;
        /// the continuation is invoked in every case.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call arrived on.</param>
        /// <param name="continuation">Invoked with this server and <paramref name="cq"/> once handling has finished.</param>
        private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action<Server, CompletionQueueSafeHandle> continuation)
        {
            try
            {
                IServerCallHandler callHandler;
                if (!callHandlers.TryGetValue(newRpc.Method, out callHandler))
                {
                    // No handler registered for this method name.
                    callHandler = UnimplementedMethodCallHandler.Instance;
                }
                await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                Logger.Warning(e, "Exception while handling RPC.");
            }
            finally
            {
                continuation(this, cq);
            }
        }

        /// <summary>
        /// Handles the native callback.
        /// Starts handling of a valid new call, which requests the next call when it finishes;
        /// otherwise requests the next call immediately.
        /// </summary>
        private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)
        {
            bool nextRpcRequested = false;
            if (success)
            {
                var newRpc = ctx.GetServerRpcNew(this);

                // after server shutdown, the callback returns with null call
                if (!newRpc.Call.IsInvalid)
                {
                    nextRpcRequested = true;

                    // Start asynchronous handler for the call.
                    // Don't await, the continuations will run on gRPC thread pool once triggered
                    // by cq.Next().
                    #pragma warning disable 4014
                    HandleCallAsync(newRpc, cq, (server, state) => server.AllowOneRpc(state));
                    #pragma warning restore 4014
                }
            }

            if (!nextRpcRequested)
            {
                AllowOneRpc(cq);
            }
        }

        /// <summary>
        /// Handles native callback. Completes the task exposed by <c>ShutdownTask</c>.
        /// </summary>
        private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)
        {
            shutdownTcs.SetResult(null);
        }

        /// <summary>
        /// Collection of service definitions.
        /// </summary>
        public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition>
        {
            readonly Server server;

            internal ServiceDefinitionCollection(Server server)
            {
                this.server = server;
            }

            /// <summary>
            /// Adds a service definition to the server. This is how you register
            /// handlers for a service with the server. Only call this before Start().
            /// </summary>
            /// <param name="serviceDefinition">The service definition to register.</param>
            public void Add(ServerServiceDefinition serviceDefinition)
            {
                server.AddServiceDefinitionInternal(serviceDefinition);
            }

            /// <summary>
            /// Gets enumerator for this collection.
            /// </summary>
            public IEnumerator<ServerServiceDefinition> GetEnumerator()
            {
                return server.serviceDefinitionsList.GetEnumerator();
            }

            IEnumerator IEnumerable.GetEnumerator()
            {
                return server.serviceDefinitionsList.GetEnumerator();
            }
        }

        /// <summary>
        /// Collection of server ports.
        /// </summary>
        public class ServerPortCollection : IEnumerable<ServerPort>
        {
            readonly Server server;

            internal ServerPortCollection(Server server)
            {
                this.server = server;
            }

            /// <summary>
            /// Adds a new port on which server should listen.
            /// Only call this before Start().
            /// </summary>
            /// <param name="serverPort">The port to listen on.</param>
            /// <returns>The port on which server will be listening. Return value of zero means that binding the port has failed.</returns>
            public int Add(ServerPort serverPort)
            {
                return server.AddPortInternal(serverPort);
            }

            /// <summary>
            /// Adds a new port on which server should listen.
            /// </summary>
            /// <param name="host">the host</param>
            /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
            /// <param name="credentials">credentials to use to secure this port.</param>
            /// <returns>The port on which server will be listening. Return value of zero means that binding the port has failed.</returns>
            public int Add(string host, int port, ServerCredentials credentials)
            {
                return Add(new ServerPort(host, port, credentials));
            }

            /// <summary>
            /// Gets enumerator for this collection.
            /// </summary>
            public IEnumerator<ServerPort> GetEnumerator()
            {
                return server.serverPortList.GetEnumerator();
            }

            IEnumerator IEnumerable.GetEnumerator()
            {
                return server.serverPortList.GetEnumerator();
            }
        }
    }
}