#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Base for handling both client side and server side calls.
    /// Manages native call lifecycle and provides convenience methods.
    /// </summary>
    /// <typeparam name="TWrite">Type of the messages this side sends.</typeparam>
    /// <typeparam name="TRead">Type of the messages this side receives.</typeparam>
    internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
        protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");

        readonly Action<TWrite, SerializationContext> serializer;
        readonly Func<DeserializationContext, TRead> deserializer;

        // Guards the mutable call state below.
        protected readonly object myLock = new object();

        protected INativeCall call;
        protected bool disposed;

        protected bool started;
        protected bool cancelRequested;

        protected TaskCompletionSource<TRead> streamingReadTcs;  // Completion of a pending streaming read if not null.
        protected TaskCompletionSource<object> streamingWriteTcs;  // Completion of a pending streaming write or send close from client if not null.
        protected TaskCompletionSource<object> sendStatusFromServerTcs;
        protected bool isStreamingWriteCompletionDelayed;  // Only used for the client side.

        protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
        protected bool halfcloseRequested;  // True if send close has been initiated.
        protected bool finished;  // True if close has been received from the peer.

        protected bool initialMetadataSent;
        protected long streamingWritesCounter;  // Number of streaming send operations started so far.

        /// <summary>
        /// Creates the call base with the delegates used to serialize outgoing
        /// and deserialize incoming messages. Both delegates must be non-null.
        /// </summary>
        /// <param name="serializer">Writes a message into the supplied serialization context.</param>
        /// <param name="deserializer">Produces a message from the supplied deserialization context.</param>
        public AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer)
        {
            this.serializer = GrpcPreconditions.CheckNotNull(serializer);
            this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
        }

        /// <summary>
        /// Requests cancelling the call.
        /// The call must already be started; the native cancel is skipped once
        /// the native resources have been released.
        /// </summary>
        public void Cancel()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);
                cancelRequested = true;

                if (!disposed)
                {
                    call.Cancel();
                }
            }
        }

        /// <summary>
        /// Requests cancelling the call with given status.
        /// </summary>
        /// <param name="status">Status passed to the native call's cancel operation.</param>
        protected void CancelWithStatus(Status status)
        {
            lock (myLock)
            {
                cancelRequested = true;

                if (!disposed)
                {
                    call.CancelWithStatus(status);
                }
            }
        }

        /// <summary>
        /// Stores the native call this instance operates on.
        /// </summary>
        /// <param name="call">The native call to use for all subsequent operations.</param>
        protected void InitializeInternal(INativeCall call)
        {
            lock (myLock)
            {
                this.call = call;
            }
        }

        /// <summary>
        /// Initiates sending a message. Only one send operation can be active at a time.
        /// </summary>
        /// <param name="msg">The message to serialize and send.</param>
        /// <param name="writeFlags">Write flags forwarded to the native send operation.</param>
        /// <returns>
        /// The task of the newly started write, or the early result supplied by
        /// <see cref="CheckSendAllowedOrEarlyResult"/> when that returns non-null.
        /// </returns>
        protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
        {
            using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
            {
                // Serialization runs before the lock is taken; serializer exceptions propagate to the caller.
                var payload = UnsafeSerialize(msg, serializationScope.Context);
                lock (myLock)
                {
                    GrpcPreconditions.CheckState(started);
                    var earlyResult = CheckSendAllowedOrEarlyResult();
                    if (earlyResult != null)
                    {
                        return earlyResult;
                    }

                    // Initial metadata is sent together with the first message if it was not sent yet.
                    call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);

                    initialMetadataSent = true;
                    streamingWritesCounter++;
                    streamingWriteTcs = new TaskCompletionSource<object>();
                    return streamingWriteTcs.Task;
                }
            }
        }

        /// <summary>
        /// Initiates reading a message. Only one read operation can be active at a time.
        /// </summary>
        /// <returns>
        /// The task of the newly started read, or the retained task of the final
        /// read once reading is done.
        /// </returns>
        protected Task<TRead> ReadMessageInternalAsync()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);
                if (readingDone)
                {
                    // the last read that returns null or throws an exception is idempotent
                    // and maintains its state.
                    GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
                    return streamingReadTcs.Task;
                }

                GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
                GrpcPreconditions.CheckState(!disposed);

                call.StartReceiveMessage(ReceivedMessageCallback);
                streamingReadTcs = new TaskCompletionSource<TRead>();
                return streamingReadTcs.Task;
            }
        }

        /// <summary>
        /// If there are no more pending actions and no new actions can be started, releases
        /// the underlying native resources.
        /// The callers in this class invoke it while holding <c>myLock</c>.
        /// </summary>
        /// <returns>True if the resources were released by this invocation.</returns>
        protected bool ReleaseResourcesIfPossible()
        {
            if (!disposed && call != null)
            {
                bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
                if (noMoreSendCompletions && readingDone && finished)
                {
                    ReleaseResources();
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Tells the shared completion handlers whether this is the client side of the call.
        /// </summary>
        protected abstract bool IsClient { get; }

        /// <summary>
        /// Returns an exception to throw for a failed send operation.
        /// It is only allowed to call this method for a call that has already finished.
        /// </summary>
        protected abstract Exception GetRpcExceptionClientOnly();

        /// <summary>
        /// Disposes the native call (if any), marks this instance as disposed and
        /// invokes <see cref="OnAfterReleaseResourcesLocked"/>.
        /// </summary>
        protected void ReleaseResources()
        {
            call?.Dispose();
            disposed = true;
            OnAfterReleaseResourcesLocked();
        }

        /// <summary>
        /// Hook invoked by <see cref="ReleaseResources"/> right after the call is marked disposed.
        /// The default implementation does nothing.
        /// </summary>
        protected virtual void OnAfterReleaseResourcesLocked()
        {
        }

        /// <summary>
        /// Hook invoked by the completion handlers after they have left the lock,
        /// when resources were released during that completion.
        /// The default implementation does nothing.
        /// </summary>
        protected virtual void OnAfterReleaseResourcesUnlocked()
        {
        }

        /// <summary>
        /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
        /// logic by directly returning the write operation result task. Normally, null is returned.
        /// </summary>
        protected abstract Task CheckSendAllowedOrEarlyResult();

        // runs the serializer, propagating any exceptions being thrown without modifying them
        protected SliceBufferSafeHandle UnsafeSerialize(TWrite msg, DefaultSerializationContext context)
        {
            serializer(msg, context);
            return context.GetPayload();
        }

        /// <summary>
        /// Runs the deserializer on the received buffer, capturing any exception instead of throwing.
        /// </summary>
        /// <param name="reader">Reader over the received message.</param>
        /// <param name="msg">The deserialized message, or the default value on failure.</param>
        /// <returns>Null on success, otherwise the exception thrown during deserialization.</returns>
        protected Exception TryDeserialize(IBufferReader reader, out TRead msg)
        {
            DefaultDeserializationContext context = null;
            try
            {
                context = DefaultDeserializationContext.GetInitializedThreadLocal(reader);
                msg = deserializer(context);
                return null;
            }
            catch (Exception e)
            {
                msg = default(TRead);
                return e;
            }
            finally
            {
                // Reset the context on both the success and the failure path.
                context?.Reset();
            }
        }

        /// <summary>
        /// Handles send completion (including SendCloseFromClient).
        /// </summary>
        /// <param name="success">Whether the send operation succeeded.</param>
        protected void HandleSendFinished(bool success)
        {
            bool delayCompletion = false;
            TaskCompletionSource<object> origTcs = null;
            bool releasedResources;
            lock (myLock)
            {
                if (!success && !finished && IsClient)
                {
                    // We should be setting this only once per call, following writes will be short circuited
                    // because they cannot start until the entire call finishes.
                    GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);

                    // leave streamingWriteTcs set, it will be completed once call finished.
                    isStreamingWriteCompletionDelayed = true;
                    delayCompletion = true;
                }
                else
                {
                    origTcs = streamingWriteTcs;
                    streamingWriteTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            // The task is completed outside of the lock.
            if (!success)
            {
                if (!delayCompletion)
                {
                    if (IsClient)
                    {
                        GrpcPreconditions.CheckState(finished);  // implied by !success && !delayCompletion && IsClient
                        origTcs.SetException(GetRpcExceptionClientOnly());
                    }
                    else
                    {
                        origTcs.SetException(new IOException("Error sending from server."));
                    }
                }
                // if delayCompletion == true, postpone SetException until call finishes.
            }
            else
            {
                origTcs.SetResult(null);
            }
        }

        /// <summary>
        /// Handles send status from server completion.
        /// </summary>
        /// <param name="success">Whether sending the status succeeded.</param>
        protected void HandleSendStatusFromServerFinished(bool success)
        {
            bool releasedResources;
            lock (myLock)
            {
                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (!success)
            {
                sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
            }
            else
            {
                sendStatusFromServerTcs.SetResult(null);
            }
        }

        /// <summary>
        /// Handles streaming read completion.
        /// </summary>
        /// <param name="success">Whether the receive operation succeeded.</param>
        /// <param name="receivedMessageReader">Reader over the received message; its TotalLength has no value when there is no payload.</param>
        protected void HandleReadFinished(bool success, IBufferReader receivedMessageReader)
        {
            // if success == false, the message reader will report null payload. In that case we will
            // treat this completion as the last read and rely on C core to handle the failed
            // read (e.g. deliver appropriate statusCode on the clientside).

            TRead msg = default(TRead);
            var deserializeException = (success && receivedMessageReader.TotalLength.HasValue) ? TryDeserialize(receivedMessageReader, out msg) : null;

            TaskCompletionSource<TRead> origTcs = null;
            bool releasedResources;
            lock (myLock)
            {
                origTcs = streamingReadTcs;
                if (!receivedMessageReader.TotalLength.HasValue)
                {
                    // This was the last read.
                    readingDone = true;
                }

                if (deserializeException != null && IsClient)
                {
                    readingDone = true;

                    // TODO(jtattermusch): it might be too late to set the status
                    CancelWithStatus(DeserializeResponseFailureStatus);
                }

                if (!readingDone)
                {
                    // More reads may follow; clear the slot so the next read can be started.
                    streamingReadTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (deserializeException != null && !IsClient)
            {
                origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
                return;
            }
            origTcs.SetResult(msg);
        }

        // This instance is its own send completion callback.
        protected ISendCompletionCallback SendCompletionCallback => this;

        void ISendCompletionCallback.OnSendCompletion(bool success)
        {
            HandleSendFinished(success);
        }

        // This instance is its own received message callback.
        IReceivedMessageCallback ReceivedMessageCallback => this;

        void IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader)
        {
            HandleReadFinished(success, receivedMessageReader);
        }

        /// <summary>
        /// Registers a callback on the token that cancels this call.
        /// </summary>
        /// <param name="cancellationToken">Token whose cancellation should cancel the call.</param>
        /// <returns>
        /// The registration, or a default registration when the token can never be canceled.
        /// </returns>
        internal CancellationTokenRegistration RegisterCancellationCallbackForToken(CancellationToken cancellationToken)
        {
            if (cancellationToken.CanBeCanceled)
            {
                return cancellationToken.Register(CancelCallFromToken, this);
            }
            return default(CancellationTokenRegistration);
        }

        // Shared static delegate; the call instance is passed as the registration state.
        private static readonly Action<object> CancelCallFromToken = state => ((AsyncCallBase<TWrite, TRead>)state).Cancel();
    }
}