#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Interceptors;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Handles a single newly received server-side call.
    /// </summary>
    internal interface IServerCallHandler
    {
        /// <summary>
        /// Processes the given call.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call is initialized with.</param>
        /// <returns>A task representing the processing of the call.</returns>
        Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
    }

    /// <summary>
    /// Call handler for unary methods: reads one request, invokes the handler
    /// and sends the handler's response together with the final status.
    /// </summary>
    internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
        where TRequest : class
        where TResponse : class
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<UnaryServerCallHandler<TRequest, TResponse>>();

        readonly Method<TRequest, TResponse> method;
        readonly UnaryServerMethod<TRequest, TResponse> handler;

        /// <summary>
        /// Creates a call handler for the given method.
        /// </summary>
        /// <param name="method">Method description; supplies the request and response marshallers.</param>
        /// <param name="handler">The delegate invoked with the request and the call context.</param>
        public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)
        {
            this.method = method;
            this.handler = handler;
        }

        /// <summary>
        /// Reads the single request, invokes the handler and sends the response with the resulting status.
        /// An exception thrown while reading the request or running the handler is converted to a status
        /// and does not propagate to the caller; an exception thrown while sending the status cancels
        /// the call and is rethrown.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call is initialized with.</param>
        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
        {
            var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                method.ResponseMarshaller.ContextualSerializer,
                method.RequestMarshaller.ContextualDeserializer,
                newRpc.Server);

            asyncCall.Initialize(newRpc.Call, cq);
            var finishedTask = asyncCall.ServerSideCallAsync();
            var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
            var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

            Status status;
            // Stays null when the handler fails, so that only the status is sent.
            AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null;
            var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
            try
            {
                // A unary call must deliver a request message before the handler can run.
                GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
                var request = requestStream.Current;
                var response = await handler(request, context).ConfigureAwait(false);
                status = context.Status;
                responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
            }
            catch (Exception e)
            {
                // RpcException is the handler's way of returning a status, so it is not logged.
                if (!(e is RpcException))
                {
                    Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
                }
                status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
            }
            try
            {
                await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // The status could not be sent; cancel the call before propagating the failure.
                asyncCall.Cancel();
                throw;
            }
            await finishedTask.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Call handler for server streaming methods: reads one request and lets the handler
    /// write responses to the response stream, then sends the final status.
    /// </summary>
    internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
        where TRequest : class
        where TResponse : class
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerStreamingServerCallHandler<TRequest, TResponse>>();

        readonly Method<TRequest, TResponse> method;
        readonly ServerStreamingServerMethod<TRequest, TResponse> handler;

        /// <summary>
        /// Creates a call handler for the given method.
        /// </summary>
        /// <param name="method">Method description; supplies the request and response marshallers.</param>
        /// <param name="handler">The delegate invoked with the request, the response stream and the call context.</param>
        public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)
        {
            this.method = method;
            this.handler = handler;
        }

        /// <summary>
        /// Reads the single request, invokes the handler with the response stream and sends the resulting status.
        /// An exception thrown while reading the request or running the handler is converted to a status
        /// and does not propagate to the caller; an exception thrown while sending the status cancels
        /// the call and is rethrown.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call is initialized with.</param>
        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
        {
            var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                method.ResponseMarshaller.ContextualSerializer,
                method.RequestMarshaller.ContextualDeserializer,
                newRpc.Server);

            asyncCall.Initialize(newRpc.Call, cq);
            var finishedTask = asyncCall.ServerSideCallAsync();
            var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
            var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

            Status status;
            var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
            try
            {
                // The call must deliver a request message before the handler can run.
                GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
                var request = requestStream.Current;
                await handler(request, responseStream, context).ConfigureAwait(false);
                status = context.Status;
            }
            catch (Exception e)
            {
                // RpcException is the handler's way of returning a status, so it is not logged.
                if (!(e is RpcException))
                {
                    Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
                }
                status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
            }

            try
            {
                // Responses were written through the response stream; only the status is sent here.
                await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // The status could not be sent; cancel the call before propagating the failure.
                asyncCall.Cancel();
                throw;
            }
            await finishedTask.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Call handler for client streaming methods: hands the request stream to the handler
    /// and sends the handler's single response together with the final status.
    /// </summary>
    internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
        where TRequest : class
        where TResponse : class
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientStreamingServerCallHandler<TRequest, TResponse>>();

        readonly Method<TRequest, TResponse> method;
        readonly ClientStreamingServerMethod<TRequest, TResponse> handler;

        /// <summary>
        /// Creates a call handler for the given method.
        /// </summary>
        /// <param name="method">Method description; supplies the request and response marshallers.</param>
        /// <param name="handler">The delegate invoked with the request stream and the call context.</param>
        public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)
        {
            this.method = method;
            this.handler = handler;
        }

        /// <summary>
        /// Invokes the handler with the request stream and sends its response with the resulting status.
        /// An exception thrown by the handler is converted to a status and does not propagate to the caller;
        /// an exception thrown while sending the status cancels the call and is rethrown.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call is initialized with.</param>
        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
        {
            var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                method.ResponseMarshaller.ContextualSerializer,
                method.RequestMarshaller.ContextualDeserializer,
                newRpc.Server);

            asyncCall.Initialize(newRpc.Call, cq);
            var finishedTask = asyncCall.ServerSideCallAsync();
            var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
            var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

            Status status;
            // Stays null when the handler fails, so that only the status is sent.
            AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null;
            var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
            try
            {
                var response = await handler(requestStream, context).ConfigureAwait(false);
                status = context.Status;
                responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
            }
            catch (Exception e)
            {
                // RpcException is the handler's way of returning a status, so it is not logged.
                if (!(e is RpcException))
                {
                    Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
                }
                status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
            }

            try
            {
                await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // The status could not be sent; cancel the call before propagating the failure.
                asyncCall.Cancel();
                throw;
            }
            await finishedTask.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Call handler for duplex streaming methods: hands both the request stream and the
    /// response stream to the handler, then sends the final status.
    /// </summary>
    internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
        where TRequest : class
        where TResponse : class
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<DuplexStreamingServerCallHandler<TRequest, TResponse>>();

        readonly Method<TRequest, TResponse> method;
        readonly DuplexStreamingServerMethod<TRequest, TResponse> handler;

        /// <summary>
        /// Creates a call handler for the given method.
        /// </summary>
        /// <param name="method">Method description; supplies the request and response marshallers.</param>
        /// <param name="handler">The delegate invoked with the request stream, the response stream and the call context.</param>
        public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)
        {
            this.method = method;
            this.handler = handler;
        }

        /// <summary>
        /// Invokes the handler with the request and response streams and sends the resulting status.
        /// An exception thrown by the handler is converted to a status and does not propagate to the caller;
        /// an exception thrown while sending the status cancels the call and is rethrown.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call is initialized with.</param>
        public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
        {
            var asyncCall = new AsyncCallServer<TRequest, TResponse>(
                method.ResponseMarshaller.ContextualSerializer,
                method.RequestMarshaller.ContextualDeserializer,
                newRpc.Server);

            asyncCall.Initialize(newRpc.Call, cq);
            var finishedTask = asyncCall.ServerSideCallAsync();
            var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
            var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);

            Status status;
            var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
            try
            {
                await handler(requestStream, responseStream, context).ConfigureAwait(false);
                status = context.Status;
            }
            catch (Exception e)
            {
                // RpcException is the handler's way of returning a status, so it is not logged.
                if (!(e is RpcException))
                {
                    Logger.Warning(e, "Exception occurred in the handler or an interceptor.");
                }
                status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
            }
            try
            {
                // Responses were written through the response stream; only the status is sent here.
                await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // The status could not be sent; cancel the call before propagating the failure.
                asyncCall.Cancel();
                throw;
            }
            await finishedTask.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Call handler that answers every call with <see cref="StatusCode.Unimplemented"/>.
    /// It delegates to a duplex streaming handler over raw byte arrays.
    /// </summary>
    internal class UnimplementedMethodCallHandler : IServerCallHandler
    {
        /// <summary>
        /// Shared instance of the handler.
        /// </summary>
        public static readonly UnimplementedMethodCallHandler Instance = new UnimplementedMethodCallHandler();

        // Assigned once in the constructor.
        readonly DuplexStreamingServerCallHandler<byte[], byte[]> callHandlerImpl;

        /// <summary>
        /// Creates the handler, wiring <see cref="UnimplementedMethod"/> into a duplex streaming
        /// call handler that uses a pass-through byte array marshaller.
        /// </summary>
        public UnimplementedMethodCallHandler()
        {
            // Payloads are passed through unchanged in both directions.
            var marshaller = new Marshaller<byte[]>((payload) => payload, (payload) => payload);
            var method = new Method<byte[], byte[]>(MethodType.DuplexStreaming, "", "", marshaller, marshaller);
            this.callHandlerImpl = new DuplexStreamingServerCallHandler<byte[], byte[]>(method, new DuplexStreamingServerMethod<byte[], byte[]>(UnimplementedMethod));
        }

        /// <summary>
        /// Handler used for unimplemented method.
        /// Sets the call status to <see cref="StatusCode.Unimplemented"/> without touching either stream.
        /// </summary>
        private Task UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)
        {
            ctx.Status = new Status(StatusCode.Unimplemented, "");
            return TaskUtils.CompletedTask;
        }

        /// <summary>
        /// Handles the call by delegating to the wrapped duplex streaming handler.
        /// </summary>
        /// <param name="newRpc">The newly received call.</param>
        /// <param name="cq">The completion queue the call is initialized with.</param>
        public Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
        {
            return callHandlerImpl.HandleCall(newRpc, cq);
        }
    }

    /// <summary>
    /// Helpers shared by the server call handlers.
    /// </summary>
    internal static class HandlerUtils
    {
        /// <summary>
        /// Converts an exception caught while handling a call into the status to send to the client.
        /// For an <see cref="RpcException"/>, its trailers are merged into
        /// <paramref name="callContextResponseTrailers"/> and its status is returned;
        /// any other exception yields <see cref="StatusCode.Unknown"/>.
        /// </summary>
        /// <param name="e">The exception that was caught.</param>
        /// <param name="callContextResponseTrailers">The call context's response trailers; receives the merged entries.</param>
        /// <returns>The status to send to the client.</returns>
        public static Status GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers)
        {
            var rpcException = e as RpcException;
            if (rpcException != null)
            {
                // There are two sources of metadata entries on the server-side:
                // 1. serverCallContext.ResponseTrailers
                // 2. trailers in RpcException thrown by user code in server side handler.
                // As metadata allows duplicate keys, the logical thing to do is
                // to just merge trailers from RpcException into serverCallContext.ResponseTrailers.
                //
                // If the handler threw an RpcException carrying the very same Metadata instance
                // as the call context's trailers, merging would add to the collection while it is
                // being enumerated (and would duplicate entries that are already present),
                // so the merge is skipped in that case.
                var exceptionTrailers = rpcException.Trailers;
                if (!object.ReferenceEquals(exceptionTrailers, callContextResponseTrailers))
                {
                    foreach (var entry in exceptionTrailers)
                    {
                        callContextResponseTrailers.Add(entry);
                    }
                }
                // use the status thrown by handler.
                return rpcException.Status;
            }

            return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
        }

        /// <summary>
        /// Returns the flags of the given write options, or the default flags when the options are null.
        /// </summary>
        /// <param name="writeOptions">Write options, may be null.</param>
        public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
        {
            return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
        }

        /// <summary>
        /// Creates the call context for a newly received call.
        /// </summary>
        /// <param name="newRpc">The newly received call; supplies call, method, host, deadline and request metadata.</param>
        /// <param name="serverResponseStream">The response stream passed to the context.</param>
        /// <param name="cancellationToken">The cancellation token passed to the context.</param>
        public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken)
        {
            // The deadline is converted to the realtime clock before being turned into a DateTime.
            DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
            return new DefaultServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, newRpc.RequestMetadata, cancellationToken, serverResponseStream);
        }
    }
}