1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Generic; 21 using System.Linq; 22 using System.Threading; 23 using System.Threading.Tasks; 24 using Grpc.Core.Interceptors; 25 using Grpc.Core.Internal; 26 using Grpc.Core.Logging; 27 using Grpc.Core.Utils; 28 29 namespace Grpc.Core.Internal 30 { 31 internal interface IServerCallHandler 32 { HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)33 Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq); Intercept(Interceptor interceptor)34 IServerCallHandler Intercept(Interceptor interceptor); 35 } 36 37 internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler 38 where TRequest : class 39 where TResponse : class 40 { 41 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<UnaryServerCallHandler<TRequest, TResponse>>(); 42 43 readonly Method<TRequest, TResponse> method; 44 readonly UnaryServerMethod<TRequest, TResponse> handler; 45 UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)46 public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler) 47 { 48 this.method = method; 49 this.handler = handler; 50 } 51 HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)52 public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) 53 { 54 var asyncCall = new AsyncCallServer<TRequest, TResponse>( 55 method.ResponseMarshaller.Serializer, 56 method.RequestMarshaller.Deserializer, 57 newRpc.Server); 58 59 asyncCall.Initialize(newRpc.Call, cq); 60 var finishedTask = asyncCall.ServerSideCallAsync(); 61 var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); 62 var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); 63 64 Status status; 65 AsyncCallServer<TRequest,TResponse>.ResponseWithFlags? responseWithFlags = null; 66 var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken); 67 try 68 { 69 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); 70 var request = requestStream.Current; 71 var response = await handler(request, context).ConfigureAwait(false); 72 status = context.Status; 73 responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); 74 } 75 catch (Exception e) 76 { 77 if (!(e is RpcException)) 78 { 79 Logger.Warning(e, "Exception occurred in the handler or an interceptor."); 80 } 81 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); 82 } 83 try 84 { 85 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false); 86 } 87 catch (Exception) 88 { 89 asyncCall.Cancel(); 90 throw; 91 } 92 await finishedTask.ConfigureAwait(false); 93 } 94 Intercept(Interceptor interceptor)95 public IServerCallHandler Intercept(Interceptor interceptor) 96 { 97 return new UnaryServerCallHandler<TRequest, TResponse>(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler)); 98 } 99 } 100 101 internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler 102 where TRequest : class 103 where TResponse : class 104 { 105 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerStreamingServerCallHandler<TRequest, TResponse>>(); 106 107 readonly Method<TRequest, TResponse> method; 108 readonly ServerStreamingServerMethod<TRequest, TResponse> handler; 109 ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)110 public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler) 111 { 112 this.method = method; 113 this.handler = handler; 114 } 115 HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)116 public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) 117 { 118 var asyncCall = new AsyncCallServer<TRequest, TResponse>( 119 method.ResponseMarshaller.Serializer, 120 method.RequestMarshaller.Deserializer, 121 newRpc.Server); 122 123 asyncCall.Initialize(newRpc.Call, cq); 124 var finishedTask = asyncCall.ServerSideCallAsync(); 125 var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); 126 var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); 127 128 Status status; 129 var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken); 130 try 131 { 132 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); 133 var request = requestStream.Current; 134 await handler(request, responseStream, context).ConfigureAwait(false); 135 status = context.Status; 136 } 137 catch (Exception e) 138 { 139 if (!(e is RpcException)) 140 { 141 Logger.Warning(e, "Exception occurred in the handler or an interceptor."); 142 } 143 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); 144 } 145 146 try 147 { 148 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); 149 } 150 catch (Exception) 151 { 152 asyncCall.Cancel(); 153 throw; 154 } 155 await finishedTask.ConfigureAwait(false); 156 } 157 Intercept(Interceptor interceptor)158 public IServerCallHandler Intercept(Interceptor interceptor) 159 { 160 return new ServerStreamingServerCallHandler<TRequest, TResponse>(method, (request, responseStream, context) => interceptor.ServerStreamingServerHandler(request, responseStream, context, handler)); 161 } 162 } 163 164 internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler 165 where TRequest : class 166 where TResponse : class 167 { 168 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientStreamingServerCallHandler<TRequest, TResponse>>(); 169 170 readonly Method<TRequest, TResponse> method; 171 readonly ClientStreamingServerMethod<TRequest, TResponse> handler; 172 ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)173 public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler) 174 { 175 this.method = method; 176 this.handler = handler; 177 } 178 HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)179 public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) 180 { 181 var asyncCall = new AsyncCallServer<TRequest, TResponse>( 182 method.ResponseMarshaller.Serializer, 183 method.RequestMarshaller.Deserializer, 184 newRpc.Server); 185 186 asyncCall.Initialize(newRpc.Call, cq); 187 var finishedTask = asyncCall.ServerSideCallAsync(); 188 var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); 189 var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); 190 191 Status status; 192 AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null; 193 var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken); 194 try 195 { 196 var response = await handler(requestStream, context).ConfigureAwait(false); 197 status = context.Status; 198 responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); 199 } 200 catch (Exception e) 201 { 202 if (!(e is RpcException)) 203 { 204 Logger.Warning(e, "Exception occurred in the handler or an interceptor."); 205 } 206 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); 207 } 208 209 try 210 { 211 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false); 212 } 213 catch (Exception) 214 { 215 asyncCall.Cancel(); 216 throw; 217 } 218 await finishedTask.ConfigureAwait(false); 219 } 220 Intercept(Interceptor interceptor)221 public IServerCallHandler Intercept(Interceptor interceptor) 222 { 223 return new ClientStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, context) => interceptor.ClientStreamingServerHandler(requestStream, context, handler)); 224 } 225 } 226 227 internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler 228 where TRequest : class 229 where TResponse : class 230 { 231 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<DuplexStreamingServerCallHandler<TRequest, TResponse>>(); 232 233 readonly Method<TRequest, TResponse> method; 234 readonly DuplexStreamingServerMethod<TRequest, TResponse> handler; 235 DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)236 public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler) 237 { 238 this.method = method; 239 this.handler = handler; 240 } 241 HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)242 public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) 243 { 244 var asyncCall = new AsyncCallServer<TRequest, TResponse>( 245 method.ResponseMarshaller.Serializer, 246 method.RequestMarshaller.Deserializer, 247 newRpc.Server); 248 249 asyncCall.Initialize(newRpc.Call, cq); 250 var finishedTask = asyncCall.ServerSideCallAsync(); 251 var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); 252 var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); 253 254 Status status; 255 var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken); 256 try 257 { 258 await handler(requestStream, responseStream, context).ConfigureAwait(false); 259 status = context.Status; 260 } 261 catch (Exception e) 262 { 263 if (!(e is RpcException)) 264 { 265 Logger.Warning(e, "Exception occurred in the handler or an interceptor."); 266 } 267 status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); 268 } 269 try 270 { 271 await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); 272 } 273 catch (Exception) 274 { 275 asyncCall.Cancel(); 276 throw; 277 } 278 await finishedTask.ConfigureAwait(false); 279 } 280 Intercept(Interceptor interceptor)281 public IServerCallHandler Intercept(Interceptor interceptor) 282 { 283 return new DuplexStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, responseStream, context) => interceptor.DuplexStreamingServerHandler(requestStream, responseStream, context, handler)); 284 } 285 } 286 287 internal class UnimplementedMethodCallHandler : IServerCallHandler 288 { 289 public static readonly UnimplementedMethodCallHandler Instance = new UnimplementedMethodCallHandler(); 290 291 DuplexStreamingServerCallHandler<byte[], byte[]> callHandlerImpl; 292 UnimplementedMethodCallHandler()293 public UnimplementedMethodCallHandler() 294 { 295 var marshaller = new Marshaller<byte[]>((payload) => payload, (payload) => payload); 296 var method = new Method<byte[], byte[]>(MethodType.DuplexStreaming, "", "", marshaller, marshaller); 297 this.callHandlerImpl = new DuplexStreamingServerCallHandler<byte[], byte[]>(method, new DuplexStreamingServerMethod<byte[], byte[]>(UnimplementedMethod)); 298 } 299 300 /// <summary> 301 /// Handler used for unimplemented method. 302 /// </summary> UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)303 private Task UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx) 304 { 305 ctx.Status = new Status(StatusCode.Unimplemented, ""); 306 return TaskUtils.CompletedTask; 307 } 308 HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)309 public Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) 310 { 311 return callHandlerImpl.HandleCall(newRpc, cq); 312 } 313 Intercept(Interceptor interceptor)314 public IServerCallHandler Intercept(Interceptor interceptor) 315 { 316 return this; // Do not intercept unimplemented methods. 317 } 318 } 319 320 internal static class HandlerUtils 321 { GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)322 public static Status GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers) 323 { 324 var rpcException = e as RpcException; 325 if (rpcException != null) 326 { 327 // There are two sources of metadata entries on the server-side: 328 // 1. serverCallContext.ResponseTrailers 329 // 2. trailers in RpcException thrown by user code in server side handler. 330 // As metadata allows duplicate keys, the logical thing to do is 331 // to just merge trailers from RpcException into serverCallContext.ResponseTrailers. 332 foreach (var entry in rpcException.Trailers) 333 { 334 callContextResponseTrailers.Add(entry); 335 } 336 // use the status thrown by handler. 337 return rpcException.Status; 338 } 339 340 return new Status(StatusCode.Unknown, "Exception was thrown by handler."); 341 } 342 GetWriteFlags(WriteOptions writeOptions)343 public static WriteFlags GetWriteFlags(WriteOptions writeOptions) 344 { 345 return writeOptions != null ? writeOptions.Flags : default(WriteFlags); 346 } 347 348 public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) 349 where TRequest : class 350 where TResponse : class 351 { 352 DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); 353 354 return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, 355 newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); 356 } 357 } 358 } 359