• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Diagnostics;
21 using System.IO;
22 using System.Runtime.CompilerServices;
23 using System.Runtime.InteropServices;
24 using System.Threading;
25 using System.Threading.Tasks;
26 
27 using Grpc.Core.Internal;
28 using Grpc.Core.Logging;
29 using Grpc.Core.Profiling;
30 using Grpc.Core.Utils;
31 
32 namespace Grpc.Core.Internal
33 {
34     /// <summary>
35     /// Base for handling both client side and server side calls.
36     /// Manages native call lifecycle and provides convenience methods.
37     /// </summary>
38     internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
39     {
40         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
41         protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
42 
43         readonly Action<TWrite, SerializationContext> serializer;
44         readonly Func<DeserializationContext, TRead> deserializer;
45 
46         protected readonly object myLock = new object();
47 
48         protected INativeCall call;
49         protected bool disposed;
50 
51         protected bool started;
52         protected bool cancelRequested;
53 
54         protected TaskCompletionSource<TRead> streamingReadTcs;  // Completion of a pending streaming read if not null.
55         protected TaskCompletionSource<object> streamingWriteTcs;  // Completion of a pending streaming write or send close from client if not null.
56         protected TaskCompletionSource<object> sendStatusFromServerTcs;
57         protected bool isStreamingWriteCompletionDelayed;  // Only used for the client side.
58 
59         protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
60         protected bool halfcloseRequested;  // True if send close have been initiated.
61         protected bool finished;  // True if close has been received from the peer.
62 
63         protected bool initialMetadataSent;
64         protected long streamingWritesCounter;  // Number of streaming send operations started so far.
65 
AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer)66         public AsyncCallBase(Action<TWrite, SerializationContext> serializer, Func<DeserializationContext, TRead> deserializer)
67         {
68             this.serializer = GrpcPreconditions.CheckNotNull(serializer);
69             this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
70         }
71 
72         /// <summary>
73         /// Requests cancelling the call.
74         /// </summary>
Cancel()75         public void Cancel()
76         {
77             lock (myLock)
78             {
79                 GrpcPreconditions.CheckState(started);
80                 cancelRequested = true;
81 
82                 if (!disposed)
83                 {
84                     call.Cancel();
85                 }
86             }
87         }
88 
89         /// <summary>
90         /// Requests cancelling the call with given status.
91         /// </summary>
CancelWithStatus(Status status)92         protected void CancelWithStatus(Status status)
93         {
94             lock (myLock)
95             {
96                 cancelRequested = true;
97 
98                 if (!disposed)
99                 {
100                     call.CancelWithStatus(status);
101                 }
102             }
103         }
104 
InitializeInternal(INativeCall call)105         protected void InitializeInternal(INativeCall call)
106         {
107             lock (myLock)
108             {
109                 this.call = call;
110             }
111         }
112 
113         /// <summary>
114         /// Initiates sending a message. Only one send operation can be active at a time.
115         /// </summary>
SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)116         protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
117         {
118             using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
119             {
120                 var payload = UnsafeSerialize(msg, serializationScope.Context);
121                 lock (myLock)
122                 {
123                     GrpcPreconditions.CheckState(started);
124                     var earlyResult = CheckSendAllowedOrEarlyResult();
125                     if (earlyResult != null)
126                     {
127                         return earlyResult;
128                     }
129 
130                     call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);
131 
132                     initialMetadataSent = true;
133                     streamingWritesCounter++;
134                     streamingWriteTcs = new TaskCompletionSource<object>();
135                     return streamingWriteTcs.Task;
136                 }
137             }
138         }
139 
140         /// <summary>
141         /// Initiates reading a message. Only one read operation can be active at a time.
142         /// </summary>
ReadMessageInternalAsync()143         protected Task<TRead> ReadMessageInternalAsync()
144         {
145             lock (myLock)
146             {
147                 GrpcPreconditions.CheckState(started);
148                 if (readingDone)
149                 {
150                     // the last read that returns null or throws an exception is idempotent
151                     // and maintains its state.
152                     GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
153                     return streamingReadTcs.Task;
154                 }
155 
156                 GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
157                 GrpcPreconditions.CheckState(!disposed);
158 
159                 call.StartReceiveMessage(ReceivedMessageCallback);
160                 streamingReadTcs = new TaskCompletionSource<TRead>();
161                 return streamingReadTcs.Task;
162             }
163         }
164 
165         /// <summary>
166         /// If there are no more pending actions and no new actions can be started, releases
167         /// the underlying native resources.
168         /// </summary>
ReleaseResourcesIfPossible()169         protected bool ReleaseResourcesIfPossible()
170         {
171             if (!disposed && call != null)
172             {
173                 bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
174                 if (noMoreSendCompletions && readingDone && finished)
175                 {
176                     ReleaseResources();
177                     return true;
178                 }
179             }
180             return false;
181         }
182 
183         protected abstract bool IsClient
184         {
185             get;
186         }
187 
188         /// <summary>
189         /// Returns an exception to throw for a failed send operation.
190         /// It is only allowed to call this method for a call that has already finished.
191         /// </summary>
GetRpcExceptionClientOnly()192         protected abstract Exception GetRpcExceptionClientOnly();
193 
ReleaseResources()194         protected void ReleaseResources()
195         {
196             if (call != null)
197             {
198                 call.Dispose();
199             }
200             disposed = true;
201             OnAfterReleaseResourcesLocked();
202         }
203 
OnAfterReleaseResourcesLocked()204         protected virtual void OnAfterReleaseResourcesLocked()
205         {
206         }
207 
OnAfterReleaseResourcesUnlocked()208         protected virtual void OnAfterReleaseResourcesUnlocked()
209         {
210         }
211 
212         /// <summary>
213         /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
214         /// logic by directly returning the write operation result task. Normally, null is returned.
215         /// </summary>
CheckSendAllowedOrEarlyResult()216         protected abstract Task CheckSendAllowedOrEarlyResult();
217 
218         // runs the serializer, propagating any exceptions being thrown without modifying them
UnsafeSerialize(TWrite msg, DefaultSerializationContext context)219         protected SliceBufferSafeHandle UnsafeSerialize(TWrite msg, DefaultSerializationContext context)
220         {
221             serializer(msg, context);
222             return context.GetPayload();
223         }
224 
TryDeserialize(IBufferReader reader, out TRead msg)225         protected Exception TryDeserialize(IBufferReader reader, out TRead msg)
226         {
227             DefaultDeserializationContext context = null;
228             try
229             {
230                 context = DefaultDeserializationContext.GetInitializedThreadLocal(reader);
231                 msg = deserializer(context);
232                 return null;
233             }
234             catch (Exception e)
235             {
236                 msg = default(TRead);
237                 return e;
238             }
239             finally
240             {
241                 context?.Reset();
242             }
243         }
244 
245         /// <summary>
246         /// Handles send completion (including SendCloseFromClient).
247         /// </summary>
HandleSendFinished(bool success)248         protected void HandleSendFinished(bool success)
249         {
250             bool delayCompletion = false;
251             TaskCompletionSource<object> origTcs = null;
252             bool releasedResources;
253             lock (myLock)
254             {
255                 if (!success && !finished && IsClient) {
256                     // We should be setting this only once per call, following writes will be short circuited
257                     // because they cannot start until the entire call finishes.
258                     GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);
259 
260                     // leave streamingWriteTcs set, it will be completed once call finished.
261                     isStreamingWriteCompletionDelayed = true;
262                     delayCompletion = true;
263                 }
264                 else
265                 {
266                     origTcs = streamingWriteTcs;
267                     streamingWriteTcs = null;
268                 }
269 
270                 releasedResources = ReleaseResourcesIfPossible();
271             }
272 
273             if (releasedResources)
274             {
275                 OnAfterReleaseResourcesUnlocked();
276             }
277 
278             if (!success)
279             {
280                 if (!delayCompletion)
281                 {
282                     if (IsClient)
283                     {
284                         GrpcPreconditions.CheckState(finished);  // implied by !success && !delayCompletion && IsClient
285                         origTcs.SetException(GetRpcExceptionClientOnly());
286                     }
287                     else
288                     {
289                         origTcs.SetException (new IOException("Error sending from server."));
290                     }
291                 }
292                 // if delayCompletion == true, postpone SetException until call finishes.
293             }
294             else
295             {
296                 origTcs.SetResult(null);
297             }
298         }
299 
300         /// <summary>
301         /// Handles send status from server completion.
302         /// </summary>
HandleSendStatusFromServerFinished(bool success)303         protected void HandleSendStatusFromServerFinished(bool success)
304         {
305             bool releasedResources;
306             lock (myLock)
307             {
308                 releasedResources = ReleaseResourcesIfPossible();
309             }
310 
311             if (releasedResources)
312             {
313                 OnAfterReleaseResourcesUnlocked();
314             }
315 
316             if (!success)
317             {
318                 sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
319             }
320             else
321             {
322                 sendStatusFromServerTcs.SetResult(null);
323             }
324         }
325 
326         /// <summary>
327         /// Handles streaming read completion.
328         /// </summary>
HandleReadFinished(bool success, IBufferReader receivedMessageReader)329         protected void HandleReadFinished(bool success, IBufferReader receivedMessageReader)
330         {
331             // if success == false, the message reader will report null payload. It that case we will
332             // treat this completion as the last read an rely on C core to handle the failed
333             // read (e.g. deliver approriate statusCode on the clientside).
334 
335             TRead msg = default(TRead);
336             var deserializeException = (success && receivedMessageReader.TotalLength.HasValue) ? TryDeserialize(receivedMessageReader, out msg) : null;
337 
338             TaskCompletionSource<TRead> origTcs = null;
339             bool releasedResources;
340             lock (myLock)
341             {
342                 origTcs = streamingReadTcs;
343                 if (!receivedMessageReader.TotalLength.HasValue)
344                 {
345                     // This was the last read.
346                     readingDone = true;
347                 }
348 
349                 if (deserializeException != null && IsClient)
350                 {
351                     readingDone = true;
352 
353                     // TODO(jtattermusch): it might be too late to set the status
354                     CancelWithStatus(DeserializeResponseFailureStatus);
355                 }
356 
357                 if (!readingDone)
358                 {
359                     streamingReadTcs = null;
360                 }
361 
362                 releasedResources = ReleaseResourcesIfPossible();
363             }
364 
365             if (releasedResources)
366             {
367                 OnAfterReleaseResourcesUnlocked();
368             }
369 
370             if (deserializeException != null && !IsClient)
371             {
372                 origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
373                 return;
374             }
375             origTcs.SetResult(msg);
376         }
377 
378         protected ISendCompletionCallback SendCompletionCallback => this;
379 
ISendCompletionCallback.OnSendCompletion(bool success)380         void ISendCompletionCallback.OnSendCompletion(bool success)
381         {
382             HandleSendFinished(success);
383         }
384 
385         IReceivedMessageCallback ReceivedMessageCallback => this;
386 
IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader)387         void IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader)
388         {
389             HandleReadFinished(success, receivedMessageReader);
390         }
391 
RegisterCancellationCallbackForToken(CancellationToken cancellationToken)392         internal CancellationTokenRegistration RegisterCancellationCallbackForToken(CancellationToken cancellationToken)
393         {
394             if (cancellationToken.CanBeCanceled) return cancellationToken.Register(CancelCallFromToken, this);
395             return default(CancellationTokenRegistration);
396         }
397 
398         private static readonly Action<object> CancelCallFromToken = state => ((AsyncCallBase<TWrite, TRead>)state).Cancel();
399     }
400 }
401