• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Linq;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using Grpc.Core.Interceptors;
25 using Grpc.Core.Internal;
26 using Grpc.Core.Logging;
27 using Grpc.Core.Utils;
28 
29 namespace Grpc.Core.Internal
30 {
31     internal interface IServerCallHandler
32     {
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)33         Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
34     }
35 
36     internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
37         where TRequest : class
38         where TResponse : class
39     {
40         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<UnaryServerCallHandler<TRequest, TResponse>>();
41 
42         readonly Method<TRequest, TResponse> method;
43         readonly UnaryServerMethod<TRequest, TResponse> handler;
44 
UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)45         public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)
46         {
47             this.method = method;
48             this.handler = handler;
49         }
50 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)51         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
52         {
53             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
54                 method.ResponseMarshaller.ContextualSerializer,
55                 method.RequestMarshaller.ContextualDeserializer,
56                 newRpc.Server);
57 
58             asyncCall.Initialize(newRpc.Call, cq);
59             var finishedTask = asyncCall.ServerSideCallAsync();
60             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
61             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
62 
63             Status status;
64             AsyncCallServer<TRequest,TResponse>.ResponseWithFlags? responseWithFlags = null;
65             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
66             try
67             {
68                 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
69                 var request = requestStream.Current;
70                 var response = await handler(request, context).ConfigureAwait(false);
71                 status = context.Status;
72                 responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
73             }
74             catch (Exception e)
75             {
76                 if (!(e is RpcException))
77                 {
78                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
79                 }
80                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
81             }
82             try
83             {
84                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
85             }
86             catch (Exception)
87             {
88                 asyncCall.Cancel();
89                 throw;
90             }
91             await finishedTask.ConfigureAwait(false);
92         }
93     }
94 
95     internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
96         where TRequest : class
97         where TResponse : class
98     {
99         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerStreamingServerCallHandler<TRequest, TResponse>>();
100 
101         readonly Method<TRequest, TResponse> method;
102         readonly ServerStreamingServerMethod<TRequest, TResponse> handler;
103 
ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)104         public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)
105         {
106             this.method = method;
107             this.handler = handler;
108         }
109 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)110         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
111         {
112             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
113                 method.ResponseMarshaller.ContextualSerializer,
114                 method.RequestMarshaller.ContextualDeserializer,
115                 newRpc.Server);
116 
117             asyncCall.Initialize(newRpc.Call, cq);
118             var finishedTask = asyncCall.ServerSideCallAsync();
119             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
120             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
121 
122             Status status;
123             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
124             try
125             {
126                 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
127                 var request = requestStream.Current;
128                 await handler(request, responseStream, context).ConfigureAwait(false);
129                 status = context.Status;
130             }
131             catch (Exception e)
132             {
133                 if (!(e is RpcException))
134                 {
135                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
136                 }
137                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
138             }
139 
140             try
141             {
142                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
143             }
144             catch (Exception)
145             {
146                 asyncCall.Cancel();
147                 throw;
148             }
149             await finishedTask.ConfigureAwait(false);
150         }
151     }
152 
153     internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
154         where TRequest : class
155         where TResponse : class
156     {
157         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientStreamingServerCallHandler<TRequest, TResponse>>();
158 
159         readonly Method<TRequest, TResponse> method;
160         readonly ClientStreamingServerMethod<TRequest, TResponse> handler;
161 
ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)162         public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)
163         {
164             this.method = method;
165             this.handler = handler;
166         }
167 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)168         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
169         {
170             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
171                 method.ResponseMarshaller.ContextualSerializer,
172                 method.RequestMarshaller.ContextualDeserializer,
173                 newRpc.Server);
174 
175             asyncCall.Initialize(newRpc.Call, cq);
176             var finishedTask = asyncCall.ServerSideCallAsync();
177             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
178             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
179 
180             Status status;
181             AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null;
182             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
183             try
184             {
185                 var response = await handler(requestStream, context).ConfigureAwait(false);
186                 status = context.Status;
187                 responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
188             }
189             catch (Exception e)
190             {
191                 if (!(e is RpcException))
192                 {
193                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
194                 }
195                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
196             }
197 
198             try
199             {
200                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
201             }
202             catch (Exception)
203             {
204                 asyncCall.Cancel();
205                 throw;
206             }
207             await finishedTask.ConfigureAwait(false);
208         }
209     }
210 
211     internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
212         where TRequest : class
213         where TResponse : class
214     {
215         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<DuplexStreamingServerCallHandler<TRequest, TResponse>>();
216 
217         readonly Method<TRequest, TResponse> method;
218         readonly DuplexStreamingServerMethod<TRequest, TResponse> handler;
219 
DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)220         public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)
221         {
222             this.method = method;
223             this.handler = handler;
224         }
225 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)226         public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
227         {
228             var asyncCall = new AsyncCallServer<TRequest, TResponse>(
229                 method.ResponseMarshaller.ContextualSerializer,
230                 method.RequestMarshaller.ContextualDeserializer,
231                 newRpc.Server);
232 
233             asyncCall.Initialize(newRpc.Call, cq);
234             var finishedTask = asyncCall.ServerSideCallAsync();
235             var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
236             var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
237 
238             Status status;
239             var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
240             try
241             {
242                 await handler(requestStream, responseStream, context).ConfigureAwait(false);
243                 status = context.Status;
244             }
245             catch (Exception e)
246             {
247                 if (!(e is RpcException))
248                 {
249                     Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
250                 }
251                 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
252             }
253             try
254             {
255                 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
256             }
257             catch (Exception)
258             {
259                 asyncCall.Cancel();
260                 throw;
261             }
262             await finishedTask.ConfigureAwait(false);
263         }
264     }
265 
266     internal class UnimplementedMethodCallHandler : IServerCallHandler
267     {
268         public static readonly UnimplementedMethodCallHandler Instance = new UnimplementedMethodCallHandler();
269 
270         DuplexStreamingServerCallHandler<byte[], byte[]> callHandlerImpl;
271 
UnimplementedMethodCallHandler()272         public UnimplementedMethodCallHandler()
273         {
274             var marshaller = new Marshaller<byte[]>((payload) => payload, (payload) => payload);
275             var method = new Method<byte[], byte[]>(MethodType.DuplexStreaming, "", "", marshaller, marshaller);
276             this.callHandlerImpl = new DuplexStreamingServerCallHandler<byte[], byte[]>(method, new DuplexStreamingServerMethod<byte[], byte[]>(UnimplementedMethod));
277         }
278 
279         /// <summary>
280         /// Handler used for unimplemented method.
281         /// </summary>
UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)282         private Task UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)
283         {
284             ctx.Status = new Status(StatusCode.Unimplemented, "");
285             return TaskUtils.CompletedTask;
286         }
287 
HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)288         public Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
289         {
290             return callHandlerImpl.HandleCall(newRpc, cq);
291         }
292     }
293 
294     internal static class HandlerUtils
295     {
GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)296         public static Status GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)
297         {
298             var rpcException = e as RpcException;
299             if (rpcException != null)
300             {
301                 // There are two sources of metadata entries on the server-side:
302                 // 1. serverCallContext.ResponseTrailers
303                 // 2. trailers in RpcException thrown by user code in server side handler.
304                 // As metadata allows duplicate keys, the logical thing to do is
305                 // to just merge trailers from RpcException into serverCallContext.ResponseTrailers.
306                 foreach (var entry in rpcException.Trailers)
307                 {
308                     callContextResponseTrailers.Add(entry);
309                 }
310                 // use the status thrown by handler.
311                 return rpcException.Status;
312             }
313 
314             return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
315         }
316 
GetWriteFlags(WriteOptions writeOptions)317         public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
318         {
319             return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
320         }
321 
NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken)322         public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken)
323         {
324             DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
325             return new DefaultServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, newRpc.RequestMetadata, cancellationToken, serverResponseStream);
326         }
327     }
328 }
329