#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Manages server side native call lifecycle.
    /// </summary>
    /// <typeparam name="TRequest">Type of the messages read from the call (produced by the deserializer).</typeparam>
    /// <typeparam name="TResponse">Type of the messages written to the call (consumed by the serializer).</typeparam>
    internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
    {
        // Completed by HandleFinishedServerside once the close-on-server event has been processed.
        readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();

        // Cancelled by HandleFinishedServerside when the close event reports the call as cancelled.
        readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        // Server that tracks this call via AddCallReference / RemoveCallReference.
        readonly Server server;

        /// <summary>
        /// Creates a new server side call.
        /// </summary>
        /// <param name="serializer">Converts a response message to its byte representation.</param>
        /// <param name="deserializer">Converts received bytes to a request message.</param>
        /// <param name="server">Server this call belongs to; validated with <c>GrpcPreconditions.CheckNotNull</c>.</param>
        public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
        {
            this.server = GrpcPreconditions.CheckNotNull(server);
        }

        /// <summary>
        /// Binds this object to a native call: initializes the call handle with the
        /// completion queue, registers this call with the server and then runs the
        /// base class initialization.
        /// </summary>
        /// <param name="call">Native call handle to manage.</param>
        /// <param name="completionQueue">Completion queue the call handle is initialized with.</param>
        public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
        {
            call.Initialize(completionQueue);

            server.AddCallReference(this);
            InitializeInternal(call);
        }

        /// <summary>
        /// Only for testing purposes.
        /// Same as <c>Initialize</c>, except that the supplied call is used as-is
        /// (no completion queue is attached to it here).
        /// </summary>
        /// <param name="call">Native call abstraction to manage.</param>
        public void InitializeForTesting(INativeCall call)
        {
            server.AddCallReference(this);
            InitializeInternal(call);
        }

        /// <summary>
        /// Starts a server side call.
        /// </summary>
        /// <returns>A task that completes once the server side close event has been handled.</returns>
        public Task ServerSideCallAsync()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckNotNull(call);

                started = true;

                call.StartServerSide(ReceiveCloseOnServerCallback);
                return finishedServersideTcs.Task;
            }
        }

        /// <summary>
        /// Sends a streaming response. Only one pending send action is allowed at any given time.
        /// </summary>
        /// <param name="msg">Response message to send.</param>
        /// <param name="writeFlags">Write flags to use for this message.</param>
        /// <returns>The task returned by the base class send implementation.</returns>
        public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
        {
            return SendMessageInternalAsync(msg, writeFlags);
        }

        /// <summary>
        /// Receives a streaming request. Only one pending read action is allowed at any given time.
        /// </summary>
        /// <returns>The task returned by the base class read implementation.</returns>
        public Task<TRequest> ReadMessageAsync()
        {
            return ReadMessageInternalAsync();
        }

        /// <summary>
        /// Initiates sending of the initial metadata.
        /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
        /// to make things simpler.
        /// </summary>
        /// <param name="headers">Response headers to send; must not be null.</param>
        /// <returns>
        /// The early result supplied by <c>CheckSendAllowedOrEarlyResult</c> if there is one,
        /// otherwise the task of the newly started write.
        /// </returns>
        public Task SendInitialMetadataAsync(Metadata headers)
        {
            lock (myLock)
            {
                // Report the real parameter name if the argument is null.
                GrpcPreconditions.CheckNotNull(headers, nameof(headers));

                GrpcPreconditions.CheckState(started);
                GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
                GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");

                var earlyResult = CheckSendAllowedOrEarlyResult();
                if (earlyResult != null)
                {
                    return earlyResult;
                }

                using (var metadataArray = MetadataArraySafeHandle.Create(headers))
                {
                    call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
                }

                // The metadata send occupies the single pending-write slot until it completes.
                this.initialMetadataSent = true;
                streamingWriteTcs = new TaskCompletionSource<object>();
                return streamingWriteTcs.Task;
            }
        }

        /// <summary>
        /// Sends call result status, indicating we are done with writes.
        /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
        /// </summary>
        /// <param name="status">Status to send to the client.</param>
        /// <param name="trailers">Trailing metadata sent along with the status.</param>
        /// <param name="optionalWrite">Optional final response message (and its write flags) to send together with the status.</param>
        /// <returns>A task representing the send-status operation.</returns>
        public Task SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)
        {
            // The optional payload is serialized before the lock is taken.
            byte[] payload = optionalWrite.HasValue ? UnsafeSerialize(optionalWrite.Value.Response) : null;
            var writeFlags = optionalWrite.HasValue ? optionalWrite.Value.WriteFlags : default(WriteFlags);

            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);
                GrpcPreconditions.CheckState(!disposed);
                GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");

                using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
                {
                    // !initialMetadataSent tells the native layer whether headers have not been sent yet.
                    call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
                        payload, writeFlags);
                }
                halfcloseRequested = true;
                initialMetadataSent = true;
                sendStatusFromServerTcs = new TaskCompletionSource<object>();
                if (optionalWrite.HasValue)
                {
                    // The message bundled with the status counts as a streaming write.
                    streamingWritesCounter++;
                }
                return sendStatusFromServerTcs.Task;
            }
        }

        /// <summary>
        /// Gets cancellation token that gets cancelled once close completion
        /// is received and the cancelled flag is set.
        /// </summary>
        public CancellationToken CancellationToken
        {
            get
            {
                return cancellationTokenSource.Token;
            }
        }

        /// <summary>
        /// Gets the peer string reported by the underlying native call.
        /// </summary>
        public string Peer
        {
            get
            {
                return call.GetPeer();
            }
        }

        /// <summary>
        /// Always <c>false</c>: this class represents the server side of a call.
        /// </summary>
        protected override bool IsClient
        {
            get { return false; }
        }

        /// <summary>
        /// Not supported on the server side.
        /// </summary>
        /// <exception cref="InvalidOperationException">Always thrown.</exception>
        protected override Exception GetRpcExceptionClientOnly()
        {
            throw new InvalidOperationException("Can only be called for client calls");
        }

        /// <summary>
        /// Removes this call from the server's set of tracked calls.
        /// </summary>
        protected override void OnAfterReleaseResourcesLocked()
        {
            server.RemoveCallReference(this);
        }

        /// <summary>
        /// Verifies that a new send operation may be started. On the server side there is
        /// never an early result, so the method either throws (via <c>GrpcPreconditions.CheckState</c>)
        /// or returns <c>null</c>.
        /// </summary>
        /// <returns>Always <c>null</c>.</returns>
        protected override Task CheckSendAllowedOrEarlyResult()
        {
            GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
            GrpcPreconditions.CheckState(!finished, "Already finished.");
            GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
            GrpcPreconditions.CheckState(!disposed);

            return null;
        }

        /// <summary>
        /// Handles the server side close completion.
        /// </summary>
        /// <param name="success">Success flag of the completed batch (see note in the body).</param>
        /// <param name="cancelled">Whether the close event reported the call as cancelled.</param>
        private void HandleFinishedServerside(bool success, bool cancelled)
        {
            // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
            // success will be always set to true.
            bool releasedResources;
            lock (myLock)
            {
                finished = true;
                if (streamingReadTcs == null)
                {
                    // if there's no pending read, readingDone=true will dispose now.
                    // if there is a pending read, we will dispose once that read finishes.
                    readingDone = true;
                    streamingReadTcs = new TaskCompletionSource<TRequest>();
                    streamingReadTcs.SetResult(default(TRequest));
                }
                releasedResources = ReleaseResourcesIfPossible();
            }

            // Runs outside the lock.
            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (cancelled)
            {
                cancellationTokenSource.Cancel();
            }

            finishedServersideTcs.SetResult(null);
        }

        // This instance is its own close-on-server callback.
        IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;

        /// <summary>
        /// Callback for the close-on-server event; forwards to <c>HandleFinishedServerside</c>.
        /// </summary>
        void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)
        {
            HandleFinishedServerside(success, cancelled);
        }

        // This instance is its own send-status completion callback.
        ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;

        /// <summary>
        /// Callback for completion of the send-status operation; forwards to
        /// <c>HandleSendStatusFromServerFinished</c>.
        /// </summary>
        void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
        {
            HandleSendStatusFromServerFinished(success);
        }

        /// <summary>
        /// A response message paired with the write flags to use when sending it.
        /// </summary>
        public struct ResponseWithFlags
        {
            /// <summary>
            /// Creates a new response/flags pair.
            /// </summary>
            /// <param name="response">Response message.</param>
            /// <param name="writeFlags">Write flags for the message.</param>
            public ResponseWithFlags(TResponse response, WriteFlags writeFlags)
            {
                this.Response = response;
                this.WriteFlags = writeFlags;
            }

            /// <summary>Response message.</summary>
            public TResponse Response { get; }

            /// <summary>Write flags for the message.</summary>
            public WriteFlags WriteFlags { get; }
        }
    }
}