1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Diagnostics; 21 using System.IO; 22 using System.Runtime.CompilerServices; 23 using System.Runtime.InteropServices; 24 using System.Threading; 25 using System.Threading.Tasks; 26 27 using Grpc.Core.Internal; 28 using Grpc.Core.Logging; 29 using Grpc.Core.Profiling; 30 using Grpc.Core.Utils; 31 32 namespace Grpc.Core.Internal 33 { 34 /// <summary> 35 /// Base for handling both client side and server side calls. 36 /// Manages native call lifecycle and provides convenience methods. 37 /// </summary> 38 internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback 39 { 40 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); 41 protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); 42 43 readonly Action<TWrite, SerializationContext> serializer; 44 readonly Func<DeserializationContext, TRead> deserializer; 45 46 protected readonly object myLock = new object(); 47 48 protected INativeCall call; 49 protected bool disposed; 50 51 protected bool started; 52 protected bool cancelRequested; 53 54 protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. 55 protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. 56 protected TaskCompletionSource<object> sendStatusFromServerTcs; 57 protected bool isStreamingWriteCompletionDelayed; // Only used for the client side. 58 59 protected bool readingDone; // True if last read (i.e. read with null payload) was already received. 60 protected bool halfcloseRequested; // True if send close have been initiated. 61 protected bool finished; // True if close has been received from the peer. 62 63 protected bool initialMetadataSent; 64 protected long streamingWritesCounter; // Number of streaming send operations started so far. 65 AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer)66 public AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer) 67 { 68 this.serializer = GrpcPreconditions.CheckNotNull(serializer); 69 this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); 70 } 71 72 /// <summary> 73 /// Requests cancelling the call. 74 /// </summary> Cancel()75 public void Cancel() 76 { 77 lock (myLock) 78 { 79 GrpcPreconditions.CheckState(started); 80 cancelRequested = true; 81 82 if (!disposed) 83 { 84 call.Cancel(); 85 } 86 } 87 } 88 89 /// <summary> 90 /// Requests cancelling the call with given status. 91 /// </summary> CancelWithStatus(Status status)92 protected void CancelWithStatus(Status status) 93 { 94 lock (myLock) 95 { 96 cancelRequested = true; 97 98 if (!disposed) 99 { 100 call.CancelWithStatus(status); 101 } 102 } 103 } 104 InitializeInternal(INativeCall call)105 protected void InitializeInternal(INativeCall call) 106 { 107 lock (myLock) 108 { 109 this.call = call; 110 } 111 } 112 113 /// <summary> 114 /// Initiates sending a message. Only one send operation can be active at a time. 115 /// </summary> SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)116 protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags) 117 { 118 using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope()) 119 { 120 var payload = UnsafeSerialize(msg, serializationScope.Context); 121 lock (myLock) 122 { 123 GrpcPreconditions.CheckState(started); 124 var earlyResult = CheckSendAllowedOrEarlyResult(); 125 if (earlyResult != null) 126 { 127 return earlyResult; 128 } 129 130 call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent); 131 132 initialMetadataSent = true; 133 streamingWritesCounter++; 134 streamingWriteTcs = new TaskCompletionSource<object>(); 135 return streamingWriteTcs.Task; 136 } 137 } 138 } 139 140 /// <summary> 141 /// Initiates reading a message. Only one read operation can be active at a time. 142 /// </summary> ReadMessageInternalAsync()143 protected Task<TRead> ReadMessageInternalAsync() 144 { 145 lock (myLock) 146 { 147 GrpcPreconditions.CheckState(started); 148 if (readingDone) 149 { 150 // the last read that returns null or throws an exception is idempotent 151 // and maintains its state. 152 GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); 153 return streamingReadTcs.Task; 154 } 155 156 GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); 157 GrpcPreconditions.CheckState(!disposed); 158 159 call.StartReceiveMessage(ReceivedMessageCallback); 160 streamingReadTcs = new TaskCompletionSource<TRead>(); 161 return streamingReadTcs.Task; 162 } 163 } 164 165 /// <summary> 166 /// If there are no more pending actions and no new actions can be started, releases 167 /// the underlying native resources. 168 /// </summary> ReleaseResourcesIfPossible()169 protected bool ReleaseResourcesIfPossible() 170 { 171 if (!disposed && call != null) 172 { 173 bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished); 174 if (noMoreSendCompletions && readingDone && finished) 175 { 176 ReleaseResources(); 177 return true; 178 } 179 } 180 return false; 181 } 182 183 protected abstract bool IsClient 184 { 185 get; 186 } 187 188 /// <summary> 189 /// Returns an exception to throw for a failed send operation. 190 /// It is only allowed to call this method for a call that has already finished. 191 /// </summary> GetRpcExceptionClientOnly()192 protected abstract Exception GetRpcExceptionClientOnly(); 193 ReleaseResources()194 protected void ReleaseResources() 195 { 196 if (call != null) 197 { 198 call.Dispose(); 199 } 200 disposed = true; 201 OnAfterReleaseResourcesLocked(); 202 } 203 OnAfterReleaseResourcesLocked()204 protected virtual void OnAfterReleaseResourcesLocked() 205 { 206 } 207 OnAfterReleaseResourcesUnlocked()208 protected virtual void OnAfterReleaseResourcesUnlocked() 209 { 210 } 211 212 /// <summary> 213 /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send 214 /// logic by directly returning the write operation result task. Normally, null is returned. 215 /// </summary> CheckSendAllowedOrEarlyResult()216 protected abstract Task CheckSendAllowedOrEarlyResult(); 217 218 // runs the serializer, propagating any exceptions being thrown without modifying them UnsafeSerialize(TWrite msg, DefaultSerializationContext context)219 protected SliceBufferSafeHandle UnsafeSerialize(TWrite msg, DefaultSerializationContext context) 220 { 221 serializer(msg, context); 222 return context.GetPayload(); 223 } 224 TryDeserialize(IBufferReader reader, out TRead msg)225 protected Exception TryDeserialize(IBufferReader reader, out TRead msg) 226 { 227 DefaultDeserializationContext context = null; 228 try 229 { 230 context = DefaultDeserializationContext.GetInitializedThreadLocal(reader); 231 msg = deserializer(context); 232 return null; 233 } 234 catch (Exception e) 235 { 236 msg = default(TRead); 237 return e; 238 } 239 finally 240 { 241 context?.Reset(); 242 } 243 } 244 245 /// <summary> 246 /// Handles send completion (including SendCloseFromClient). 247 /// </summary> HandleSendFinished(bool success)248 protected void HandleSendFinished(bool success) 249 { 250 bool delayCompletion = false; 251 TaskCompletionSource<object> origTcs = null; 252 bool releasedResources; 253 lock (myLock) 254 { 255 if (!success && !finished && IsClient) { 256 // We should be setting this only once per call, following writes will be short circuited 257 // because they cannot start until the entire call finishes. 258 GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed); 259 260 // leave streamingWriteTcs set, it will be completed once call finished. 261 isStreamingWriteCompletionDelayed = true; 262 delayCompletion = true; 263 } 264 else 265 { 266 origTcs = streamingWriteTcs; 267 streamingWriteTcs = null; 268 } 269 270 releasedResources = ReleaseResourcesIfPossible(); 271 } 272 273 if (releasedResources) 274 { 275 OnAfterReleaseResourcesUnlocked(); 276 } 277 278 if (!success) 279 { 280 if (!delayCompletion) 281 { 282 if (IsClient) 283 { 284 GrpcPreconditions.CheckState(finished); // implied by !success && !delayCompletion && IsClient 285 origTcs.SetException(GetRpcExceptionClientOnly()); 286 } 287 else 288 { 289 origTcs.SetException (new IOException("Error sending from server.")); 290 } 291 } 292 // if delayCompletion == true, postpone SetException until call finishes. 293 } 294 else 295 { 296 origTcs.SetResult(null); 297 } 298 } 299 300 /// <summary> 301 /// Handles send status from server completion. 302 /// </summary> HandleSendStatusFromServerFinished(bool success)303 protected void HandleSendStatusFromServerFinished(bool success) 304 { 305 bool releasedResources; 306 lock (myLock) 307 { 308 releasedResources = ReleaseResourcesIfPossible(); 309 } 310 311 if (releasedResources) 312 { 313 OnAfterReleaseResourcesUnlocked(); 314 } 315 316 if (!success) 317 { 318 sendStatusFromServerTcs.SetException(new IOException("Error sending status from server.")); 319 } 320 else 321 { 322 sendStatusFromServerTcs.SetResult(null); 323 } 324 } 325 326 /// <summary> 327 /// Handles streaming read completion. 328 /// </summary> HandleReadFinished(bool success, IBufferReader receivedMessageReader)329 protected void HandleReadFinished(bool success, IBufferReader receivedMessageReader) 330 { 331 // if success == false, the message reader will report null payload. It that case we will 332 // treat this completion as the last read an rely on C core to handle the failed 333 // read (e.g. deliver approriate statusCode on the clientside). 334 335 TRead msg = default(TRead); 336 var deserializeException = (success && receivedMessageReader.TotalLength.HasValue) ? TryDeserialize(receivedMessageReader, out msg) : null; 337 338 TaskCompletionSource<TRead> origTcs = null; 339 bool releasedResources; 340 lock (myLock) 341 { 342 origTcs = streamingReadTcs; 343 if (!receivedMessageReader.TotalLength.HasValue) 344 { 345 // This was the last read. 346 readingDone = true; 347 } 348 349 if (deserializeException != null && IsClient) 350 { 351 readingDone = true; 352 353 // TODO(jtattermusch): it might be too late to set the status 354 CancelWithStatus(DeserializeResponseFailureStatus); 355 } 356 357 if (!readingDone) 358 { 359 streamingReadTcs = null; 360 } 361 362 releasedResources = ReleaseResourcesIfPossible(); 363 } 364 365 if (releasedResources) 366 { 367 OnAfterReleaseResourcesUnlocked(); 368 } 369 370 if (deserializeException != null && !IsClient) 371 { 372 origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException)); 373 return; 374 } 375 origTcs.SetResult(msg); 376 } 377 378 protected ISendCompletionCallback SendCompletionCallback => this; 379 ISendCompletionCallback.OnSendCompletion(bool success)380 void ISendCompletionCallback.OnSendCompletion(bool success) 381 { 382 HandleSendFinished(success); 383 } 384 385 IReceivedMessageCallback ReceivedMessageCallback => this; 386 IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader)387 void IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader) 388 { 389 HandleReadFinished(success, receivedMessageReader); 390 } 391 RegisterCancellationCallbackForToken(CancellationToken cancellationToken)392 internal CancellationTokenRegistration RegisterCancellationCallbackForToken(CancellationToken cancellationToken) 393 { 394 if (cancellationToken.CanBeCanceled) return cancellationToken.Register(CancelCallFromToken, this); 395 return default(CancellationTokenRegistration); 396 } 397 398 private static readonly Action<object> CancelCallFromToken = state => ((AsyncCallBase<TWrite, TRead>)state).Cancel(); 399 } 400 } 401