#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Base for handling both client side and server side calls.
    /// Manages native call lifecycle and provides convenience methods.
    /// </summary>
    /// <typeparam name="TWrite">Type of the messages written (serialized) by this side of the call.</typeparam>
    /// <typeparam name="TRead">Type of the messages read (deserialized) by this side of the call.</typeparam>
    internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
        protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");

        readonly Func<TWrite, byte[]> serializer;
        readonly Func<byte[], TRead> deserializer;

        // Guards the mutable call state below.
        protected readonly object myLock = new object();

        protected INativeCall call;
        protected bool disposed;

        protected bool started;
        protected bool cancelRequested;

        protected TaskCompletionSource<TRead> streamingReadTcs;  // Completion of a pending streaming read if not null.
        protected TaskCompletionSource<object> streamingWriteTcs;  // Completion of a pending streaming write or send close from client if not null.
        protected TaskCompletionSource<object> sendStatusFromServerTcs;
        protected bool isStreamingWriteCompletionDelayed;  // Only used for the client side.

        protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
        protected bool halfcloseRequested;  // True if send close has been initiated.
        protected bool finished;  // True if close has been received from the peer.

        protected bool initialMetadataSent;
        protected long streamingWritesCounter;  // Number of streaming send operations started so far.

        /// <summary>
        /// Initializes the call base with the functions used to serialize outgoing
        /// and deserialize incoming messages. Both are validated with
        /// <c>GrpcPreconditions.CheckNotNull</c>.
        /// </summary>
        /// <param name="serializer">Converts a message to write into its byte payload.</param>
        /// <param name="deserializer">Converts a received byte payload into a message.</param>
        public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
        {
            this.serializer = GrpcPreconditions.CheckNotNull(serializer);
            this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
        }

        /// <summary>
        /// Requests cancelling the call.
        /// The call must have been started already; the native call is only
        /// touched if it has not been disposed yet.
        /// </summary>
        public void Cancel()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);
                cancelRequested = true;

                if (!disposed)
                {
                    call.Cancel();
                }
            }
        }

        /// <summary>
        /// Requests cancelling the call with given status.
        /// Unlike <see cref="Cancel"/>, this does not check that the call was started.
        /// </summary>
        /// <param name="status">Status passed to the native call's cancellation.</param>
        protected void CancelWithStatus(Status status)
        {
            lock (myLock)
            {
                cancelRequested = true;

                if (!disposed)
                {
                    call.CancelWithStatus(status);
                }
            }
        }

        /// <summary>
        /// Stores the native call object this instance operates on.
        /// </summary>
        /// <param name="call">The native call to use for subsequent operations.</param>
        protected void InitializeInternal(INativeCall call)
        {
            lock (myLock)
            {
                this.call = call;
            }
        }

        /// <summary>
        /// Initiates sending a message. Only one send operation can be active at a time.
        /// </summary>
        /// <param name="msg">Message to serialize and send.</param>
        /// <param name="writeFlags">Write flags forwarded to the native call.</param>
        /// <returns>
        /// The task of the newly started write, or the early result provided by
        /// <see cref="CheckSendAllowedOrEarlyResult"/> when that method returns non-null.
        /// </returns>
        protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
        {
            // Serialization runs before the lock is taken, so the user-supplied
            // serializer never executes while myLock is held.
            byte[] payload = UnsafeSerialize(msg);

            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);
                var earlyResult = CheckSendAllowedOrEarlyResult();
                if (earlyResult != null)
                {
                    return earlyResult;
                }

                // The last argument asks the native call to send initial metadata
                // along with the first message if it has not been sent yet.
                call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);

                initialMetadataSent = true;
                streamingWritesCounter++;
                streamingWriteTcs = new TaskCompletionSource<object>();
                return streamingWriteTcs.Task;
            }
        }

        /// <summary>
        /// Initiates reading a message. Only one read operation can be active at a time.
        /// </summary>
        /// <returns>
        /// The task of the newly started read, or the task of the final read if
        /// reading has already completed.
        /// </returns>
        protected Task<TRead> ReadMessageInternalAsync()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);
                if (readingDone)
                {
                    // the last read that returns null or throws an exception is idempotent
                    // and maintains its state.
                    GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
                    return streamingReadTcs.Task;
                }

                GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
                GrpcPreconditions.CheckState(!disposed);

                call.StartReceiveMessage(ReceivedMessageCallback);
                streamingReadTcs = new TaskCompletionSource<TRead>();
                return streamingReadTcs.Task;
            }
        }

        /// <summary>
        /// If there are no more pending actions and no new actions can be started, releases
        /// the underlying native resources.
        /// Every caller in this class invokes this method while holding <c>myLock</c>.
        /// </summary>
        /// <returns><c>true</c> if the resources were released by this invocation; otherwise <c>false</c>.</returns>
        protected bool ReleaseResourcesIfPossible()
        {
            if (!disposed && call != null)
            {
                bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
                if (noMoreSendCompletions && readingDone && finished)
                {
                    ReleaseResources();
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Indicates whether this instance represents the client side of the call.
        /// </summary>
        protected abstract bool IsClient
        {
            get;
        }

        /// <summary>
        /// Returns an exception to throw for a failed send operation.
        /// It is only allowed to call this method for a call that has already finished.
        /// </summary>
        protected abstract Exception GetRpcExceptionClientOnly();

        /// <summary>
        /// Disposes the native call (if one was set), marks this instance as disposed
        /// and invokes <see cref="OnAfterReleaseResourcesLocked"/>.
        /// </summary>
        protected void ReleaseResources()
        {
            if (call != null)
            {
                call.Dispose();
            }
            disposed = true;
            OnAfterReleaseResourcesLocked();
        }

        /// <summary>
        /// Hook invoked from <see cref="ReleaseResources"/> right after the resources
        /// have been released. The default implementation does nothing.
        /// </summary>
        protected virtual void OnAfterReleaseResourcesLocked()
        {
        }

        /// <summary>
        /// Hook invoked by the completion handlers after resources have been released
        /// and <c>myLock</c> has been exited. The default implementation does nothing.
        /// </summary>
        protected virtual void OnAfterReleaseResourcesUnlocked()
        {
        }

        /// <summary>
        /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
        /// logic by directly returning the write operation result task. Normally, null is returned.
        /// </summary>
        protected abstract Task CheckSendAllowedOrEarlyResult();

        /// <summary>
        /// Serializes the message with the configured serializer.
        /// Exceptions thrown by the serializer are not caught here.
        /// </summary>
        /// <param name="msg">Message to serialize.</param>
        /// <returns>The payload produced by the serializer.</returns>
        protected byte[] UnsafeSerialize(TWrite msg)
        {
            return serializer(msg);
        }

        /// <summary>
        /// Deserializes the payload with the configured deserializer, capturing
        /// any exception instead of letting it propagate.
        /// </summary>
        /// <param name="payload">Payload to deserialize.</param>
        /// <param name="msg">The deserialized message, or <c>default(TRead)</c> on failure.</param>
        /// <returns><c>null</c> on success; otherwise the exception thrown by the deserializer.</returns>
        protected Exception TryDeserialize(byte[] payload, out TRead msg)
        {
            try
            {
                msg = deserializer(payload);
                return null;
            }
            catch (Exception e)
            {
                msg = default(TRead);
                return e;
            }
        }

        /// <summary>
        /// Handles send completion (including SendCloseFromClient).
        /// </summary>
        /// <param name="success">Whether the send operation completed successfully.</param>
        protected void HandleSendFinished(bool success)
        {
            bool delayCompletion = false;
            TaskCompletionSource<object> origTcs = null;
            bool releasedResources;
            lock (myLock)
            {
                if (!success && !finished && IsClient)
                {
                    // We should be setting this only once per call, following writes will be short circuited
                    // because they cannot start until the entire call finishes.
                    GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);

                    // leave streamingWriteTcs set, it will be completed once call finished.
                    isStreamingWriteCompletionDelayed = true;
                    delayCompletion = true;
                }
                else
                {
                    origTcs = streamingWriteTcs;
                    streamingWriteTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            // The task is completed only after the lock has been exited.
            if (!success)
            {
                if (!delayCompletion)
                {
                    if (IsClient)
                    {
                        GrpcPreconditions.CheckState(finished);  // implied by !success && !delayCompletion && IsClient
                        origTcs.SetException(GetRpcExceptionClientOnly());
                    }
                    else
                    {
                        origTcs.SetException(new IOException("Error sending from server."));
                    }
                }
                // if delayCompletion == true, postpone SetException until call finishes.
            }
            else
            {
                origTcs.SetResult(null);
            }
        }

        /// <summary>
        /// Handles send status from server completion.
        /// </summary>
        /// <param name="success">Whether sending the status completed successfully.</param>
        protected void HandleSendStatusFromServerFinished(bool success)
        {
            bool releasedResources;
            lock (myLock)
            {
                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (!success)
            {
                sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
            }
            else
            {
                sendStatusFromServerTcs.SetResult(null);
            }
        }

        /// <summary>
        /// Handles streaming read completion.
        /// </summary>
        /// <param name="success">Whether the read operation completed successfully.</param>
        /// <param name="receivedMessage">The received payload, or <c>null</c> if this was the last read.</param>
        protected void HandleReadFinished(bool success, byte[] receivedMessage)
        {
            // if success == false, received message will be null. In that case we will
            // treat this completion as the last read and rely on C core to handle the failed
            // read (e.g. deliver appropriate statusCode on the clientside).

            TRead msg = default(TRead);
            var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;

            TaskCompletionSource<TRead> origTcs = null;
            bool releasedResources;
            lock (myLock)
            {
                origTcs = streamingReadTcs;
                if (receivedMessage == null)
                {
                    // This was the last read.
                    readingDone = true;
                }

                if (deserializeException != null && IsClient)
                {
                    readingDone = true;

                    // TODO(jtattermusch): it might be too late to set the status
                    CancelWithStatus(DeserializeResponseFailureStatus);
                }

                if (!readingDone)
                {
                    // More reads may follow, so clear the pending read slot.
                    // Once reading is done the completed source is kept so that
                    // ReadMessageInternalAsync can keep returning its task.
                    streamingReadTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (deserializeException != null && !IsClient)
            {
                origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
                return;
            }
            origTcs.SetResult(msg);
        }

        /// <summary>
        /// This instance exposed as the callback for send completions.
        /// </summary>
        protected ISendCompletionCallback SendCompletionCallback => this;

        void ISendCompletionCallback.OnSendCompletion(bool success)
        {
            HandleSendFinished(success);
        }

        // This instance exposed as the callback for received messages.
        IReceivedMessageCallback ReceivedMessageCallback => this;

        void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
        {
            HandleReadFinished(success, receivedMessage);
        }
    }
}