• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Threading;
21 using System.Threading.Tasks;
22 using Grpc.Core.Logging;
23 using Grpc.Core.Profiling;
24 using Grpc.Core.Utils;
25 
26 namespace Grpc.Core.Internal
27 {
28     /// <summary>
29     /// Manages client side native call lifecycle.
30     /// </summary>
31     internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
32     {
33         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
34 
35         readonly CallInvocationDetails<TRequest, TResponse> details;
36         readonly INativeCall injectedNativeCall;  // for testing
37 
38         bool registeredWithChannel;
39 
40         // Dispose of to de-register cancellation token registration
41         CancellationTokenRegistration cancellationTokenRegistration;
42 
43         // Completion of a pending unary response if not null.
44         TaskCompletionSource<TResponse> unaryResponseTcs;
45 
46         // Completion of a streaming response call if not null.
47         TaskCompletionSource<object> streamingResponseCallFinishedTcs;
48 
49         // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
50         // Response headers set here once received.
51         TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
52 
53         // Set after status is received. Used for both unary and streaming response calls.
54         ClientSideStatus? finishedStatus;
55 
AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)56         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
57             : base(callDetails.RequestMarshaller.ContextualSerializer, callDetails.ResponseMarshaller.ContextualDeserializer)
58         {
59             this.details = callDetails.WithOptions(callDetails.Options.Normalize());
60             this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
61         }
62 
63         /// <summary>
64         /// This constructor should only be used for testing.
65         /// </summary>
AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall)66         public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
67         {
68             this.injectedNativeCall = injectedNativeCall;
69         }
70 
71         // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
72         // it is reusing fair amount of code in this class, so we are leaving it here.
73         /// <summary>
74         /// Blocking unary request - unary response call.
75         /// </summary>
UnaryCall(TRequest msg)76         public TResponse UnaryCall(TRequest msg)
77         {
78             var profiler = Profilers.ForCurrentThread();
79 
80             using (profiler.NewScope("AsyncCall.UnaryCall"))
81             using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
82             {
83                 bool callStartedOk = false;
84                 try
85                 {
86                     unaryResponseTcs = new TaskCompletionSource<TResponse>();
87 
88                     lock (myLock)
89                     {
90                         GrpcPreconditions.CheckState(!started);
91                         started = true;
92                         Initialize(cq);
93 
94                         halfcloseRequested = true;
95                         readingDone = true;
96                     }
97 
98                     using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
99                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
100                     {
101                         var payload = UnsafeSerialize(msg, serializationScope.Context); // do before metadata array?
102                         var ctx = details.Channel.Environment.BatchContextPool.Lease();
103                         try
104                         {
105                             call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
106                             callStartedOk = true;
107 
108                             var ev = cq.Pluck(ctx.Handle);
109                             bool success = (ev.success != 0);
110                             try
111                             {
112                                 using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
113                                 {
114                                     HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessageReader(), ctx.GetReceivedInitialMetadata());
115                                 }
116                             }
117                             catch (Exception e)
118                             {
119                                 Logger.Error(e, "Exception occurred while invoking completion delegate.");
120                             }
121                         }
122                         finally
123                         {
124                             ctx.Recycle();
125                         }
126                     }
127                 }
128                 finally
129                 {
130                     if (!callStartedOk)
131                     {
132                         lock (myLock)
133                         {
134                             OnFailedToStartCallLocked();
135                         }
136                     }
137                 }
138 
139                 // Once the blocking call returns, the result should be available synchronously.
140                 // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
141                 return unaryResponseTcs.Task.GetAwaiter().GetResult();
142             }
143         }
144 
145         /// <summary>
146         /// Starts a unary request - unary response call.
147         /// </summary>
UnaryCallAsync(TRequest msg)148         public Task<TResponse> UnaryCallAsync(TRequest msg)
149         {
150             lock (myLock)
151             {
152                 bool callStartedOk = false;
153                 try
154                 {
155                     GrpcPreconditions.CheckState(!started);
156                     started = true;
157 
158                     Initialize(details.Channel.CompletionQueue);
159 
160                     halfcloseRequested = true;
161                     readingDone = true;
162 
163                     using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
164                     {
165                         var payload = UnsafeSerialize(msg, serializationScope.Context);
166                         unaryResponseTcs = new TaskCompletionSource<TResponse>();
167                         using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
168                         {
169                             call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
170                             callStartedOk = true;
171                         }
172                     }
173 
174                     return unaryResponseTcs.Task;
175                 }
176                 finally
177                 {
178                     if (!callStartedOk)
179                     {
180                         OnFailedToStartCallLocked();
181                     }
182                 }
183             }
184         }
185 
186         /// <summary>
187         /// Starts a streamed request - unary response call.
188         /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
189         /// </summary>
ClientStreamingCallAsync()190         public Task<TResponse> ClientStreamingCallAsync()
191         {
192             lock (myLock)
193             {
194                 bool callStartedOk = false;
195                 try
196                 {
197                     GrpcPreconditions.CheckState(!started);
198                     started = true;
199 
200                     Initialize(details.Channel.CompletionQueue);
201 
202                     readingDone = true;
203 
204                     unaryResponseTcs = new TaskCompletionSource<TResponse>();
205                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
206                     {
207                         call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
208                         callStartedOk = true;
209                     }
210 
211                     return unaryResponseTcs.Task;
212                 }
213                 finally
214                 {
215                     if (!callStartedOk)
216                     {
217                         OnFailedToStartCallLocked();
218                     }
219                 }
220             }
221         }
222 
223         /// <summary>
224         /// Starts a unary request - streamed response call.
225         /// </summary>
StartServerStreamingCall(TRequest msg)226         public void StartServerStreamingCall(TRequest msg)
227         {
228             lock (myLock)
229             {
230                 bool callStartedOk = false;
231                 try
232                 {
233                     GrpcPreconditions.CheckState(!started);
234                     started = true;
235 
236                     Initialize(details.Channel.CompletionQueue);
237 
238                     halfcloseRequested = true;
239 
240                     using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
241                     {
242                         var payload = UnsafeSerialize(msg, serializationScope.Context);
243                         streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
244                         using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
245                         {
246                             call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
247                             callStartedOk = true;
248                         }
249                     }
250                     call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
251                 }
252                 finally
253                 {
254                     if (!callStartedOk)
255                     {
256                         OnFailedToStartCallLocked();
257                     }
258                 }
259             }
260         }
261 
262         /// <summary>
263         /// Starts a streaming request - streaming response call.
264         /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
265         /// </summary>
StartDuplexStreamingCall()266         public void StartDuplexStreamingCall()
267         {
268             lock (myLock)
269             {
270                 bool callStartedOk = false;
271                 try
272                 {
273                     GrpcPreconditions.CheckState(!started);
274                     started = true;
275 
276                     Initialize(details.Channel.CompletionQueue);
277 
278                     streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
279                     using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
280                     {
281                         call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
282                         callStartedOk = true;
283                     }
284                     call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
285                 }
286                 finally
287                 {
288                     if (!callStartedOk)
289                     {
290                         OnFailedToStartCallLocked();
291                     }
292                 }
293             }
294         }
295 
296         /// <summary>
297         /// Sends a streaming request. Only one pending send action is allowed at any given time.
298         /// </summary>
SendMessageAsync(TRequest msg, WriteFlags writeFlags)299         public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
300         {
301             return SendMessageInternalAsync(msg, writeFlags);
302         }
303 
304         /// <summary>
305         /// Receives a streaming response. Only one pending read action is allowed at any given time.
306         /// </summary>
ReadMessageAsync()307         public Task<TResponse> ReadMessageAsync()
308         {
309             return ReadMessageInternalAsync();
310         }
311 
312         /// <summary>
313         /// Sends halfclose, indicating client is done with streaming requests.
314         /// Only one pending send action is allowed at any given time.
315         /// </summary>
SendCloseFromClientAsync()316         public Task SendCloseFromClientAsync()
317         {
318             lock (myLock)
319             {
320                 GrpcPreconditions.CheckState(started);
321 
322                 var earlyResult = CheckSendPreconditionsClientSide();
323                 if (earlyResult != null)
324                 {
325                     return earlyResult;
326                 }
327 
328                 if (disposed || finished)
329                 {
330                     // In case the call has already been finished by the serverside,
331                     // the halfclose has already been done implicitly, so just return
332                     // completed task here.
333                     halfcloseRequested = true;
334                     return TaskUtils.CompletedTask;
335                 }
336                 call.StartSendCloseFromClient(SendCompletionCallback);
337 
338                 halfcloseRequested = true;
339                 streamingWriteTcs = new TaskCompletionSource<object>();
340                 return streamingWriteTcs.Task;
341             }
342         }
343 
344         /// <summary>
345         /// Get the task that completes once if streaming response call finishes with ok status and throws RpcException with given status otherwise.
346         /// </summary>
347         public Task StreamingResponseCallFinishedTask
348         {
349             get
350             {
351                 return streamingResponseCallFinishedTcs.Task;
352             }
353         }
354 
355         /// <summary>
356         /// Get the task that completes once response headers are received.
357         /// </summary>
358         public Task<Metadata> ResponseHeadersAsync
359         {
360             get
361             {
362                 return responseHeadersTcs.Task;
363             }
364         }
365 
366         /// <summary>
367         /// Gets the resulting status if the call has already finished.
368         /// Throws InvalidOperationException otherwise.
369         /// </summary>
GetStatus()370         public Status GetStatus()
371         {
372             lock (myLock)
373             {
374                 GrpcPreconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
375                 return finishedStatus.Value.Status;
376             }
377         }
378 
379         /// <summary>
380         /// Gets the trailing metadata if the call has already finished.
381         /// Throws InvalidOperationException otherwise.
382         /// </summary>
GetTrailers()383         public Metadata GetTrailers()
384         {
385             lock (myLock)
386             {
387                 GrpcPreconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
388                 return finishedStatus.Value.Trailers;
389             }
390         }
391 
392         public CallInvocationDetails<TRequest, TResponse> Details
393         {
394             get
395             {
396                 return this.details;
397             }
398         }
399 
OnAfterReleaseResourcesLocked()400         protected override void OnAfterReleaseResourcesLocked()
401         {
402             if (registeredWithChannel)
403             {
404                 details.Channel.RemoveCallReference(this);
405                 registeredWithChannel = false;
406             }
407         }
408 
OnAfterReleaseResourcesUnlocked()409         protected override void OnAfterReleaseResourcesUnlocked()
410         {
411             // If cancellation callback is in progress, this can block
412             // so we need to do this outside of call's lock to prevent
413             // deadlock.
414             // See https://github.com/grpc/grpc/issues/14777
415             // See https://github.com/dotnet/corefx/issues/14903
416             cancellationTokenRegistration.Dispose();
417         }
418 
419         protected override bool IsClient
420         {
421             get { return true; }
422         }
423 
GetRpcExceptionClientOnly()424         protected override Exception GetRpcExceptionClientOnly()
425         {
426             return new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers);
427         }
428 
CheckSendAllowedOrEarlyResult()429         protected override Task CheckSendAllowedOrEarlyResult()
430         {
431             var earlyResult = CheckSendPreconditionsClientSide();
432             if (earlyResult != null)
433             {
434                 return earlyResult;
435             }
436 
437             if (finishedStatus.HasValue)
438             {
439                 // throwing RpcException if we already received status on client
440                 // side makes the most sense.
441                 // Note that this throws even for StatusCode.OK.
442                 // Writing after the call has finished is not a programming error because server can close
443                 // the call anytime, so don't throw directly, but let the write task finish with an error.
444                 var tcs = new TaskCompletionSource<object>();
445                 tcs.SetException(new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers));
446                 return tcs.Task;
447             }
448 
449             return null;
450         }
451 
CheckSendPreconditionsClientSide()452         private Task CheckSendPreconditionsClientSide()
453         {
454             GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
455             GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
456 
457             if (cancelRequested)
458             {
459                 // Return a cancelled task.
460                 var tcs = new TaskCompletionSource<object>();
461                 tcs.SetCanceled();
462                 return tcs.Task;
463             }
464 
465             return null;
466         }
467 
Initialize(CompletionQueueSafeHandle cq)468         private void Initialize(CompletionQueueSafeHandle cq)
469         {
470             var call = CreateNativeCall(cq);
471 
472             details.Channel.AddCallReference(this);
473             registeredWithChannel = true;
474             InitializeInternal(call);
475 
476             RegisterCancellationCallback();
477         }
478 
OnFailedToStartCallLocked()479         private void OnFailedToStartCallLocked()
480         {
481             ReleaseResources();
482 
483             // We need to execute the hook that disposes the cancellation token
484             // registration, but it cannot be done from under a lock.
485             // To make things simple, we just schedule the unregistering
486             // on a threadpool.
487             // - Once the native call is disposed, the Cancel() calls are ignored anyway
488             // - We don't care about the overhead as OnFailedToStartCallLocked() only happens
489             //   when something goes very bad when initializing a call and that should
490             //   never happen when gRPC is used correctly.
491             ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked());
492         }
493 
CreateNativeCall(CompletionQueueSafeHandle cq)494         private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
495         {
496             if (injectedNativeCall != null)
497             {
498                 return injectedNativeCall;  // allows injecting a mock INativeCall in tests.
499             }
500 
501             var parentCall = details.Options.PropagationToken.AsImplOrNull()?.ParentCall ?? CallSafeHandle.NullInstance;
502 
503             var credentials = details.Options.Credentials;
504             using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
505             {
506                 var result = details.Channel.Handle.CreateCall(
507                              parentCall, ContextPropagationTokenImpl.DefaultMask, cq,
508                              details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
509                 return result;
510             }
511         }
512 
513         // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
RegisterCancellationCallback()514         private void RegisterCancellationCallback()
515         {
516             cancellationTokenRegistration = RegisterCancellationCallbackForToken(details.Options.CancellationToken);
517         }
518 
519         /// <summary>
520         /// Gets WriteFlags set in callDetails.Options.WriteOptions
521         /// </summary>
GetWriteFlagsForCall()522         private WriteFlags GetWriteFlagsForCall()
523         {
524             var writeOptions = details.Options.WriteOptions;
525             return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
526         }
527 
528         /// <summary>
529         /// Handles receive status completion for calls with streaming response.
530         /// </summary>
HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)531         private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
532         {
533             // TODO(jtattermusch): handle success==false
534             responseHeadersTcs.SetResult(responseHeaders);
535         }
536 
537         /// <summary>
538         /// Handler for unary response completion.
539         /// </summary>
HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)540         private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
541         {
542             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
543             // success will be always set to true.
544 
545             TaskCompletionSource<object> delayedStreamingWriteTcs = null;
546             TResponse msg = default(TResponse);
547             var deserializeException = TryDeserialize(receivedMessageReader, out msg);
548 
549             bool releasedResources;
550             lock (myLock)
551             {
552                 finished = true;
553 
554                 if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
555                 {
556                     receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
557                 }
558                 finishedStatus = receivedStatus;
559 
560                 if (isStreamingWriteCompletionDelayed)
561                 {
562                     delayedStreamingWriteTcs = streamingWriteTcs;
563                     streamingWriteTcs = null;
564                 }
565 
566                 releasedResources = ReleaseResourcesIfPossible();
567             }
568 
569             if (releasedResources)
570             {
571                 OnAfterReleaseResourcesUnlocked();
572             }
573 
574             responseHeadersTcs.SetResult(responseHeaders);
575 
576             if (delayedStreamingWriteTcs != null)
577             {
578                 delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
579             }
580 
581             var status = receivedStatus.Status;
582             if (status.StatusCode != StatusCode.OK)
583             {
584                 unaryResponseTcs.SetException(new RpcException(status, receivedStatus.Trailers));
585                 return;
586             }
587 
588             unaryResponseTcs.SetResult(msg);
589         }
590 
591         /// <summary>
592         /// Handles receive status completion for calls with streaming response.
593         /// </summary>
HandleFinished(bool success, ClientSideStatus receivedStatus)594         private void HandleFinished(bool success, ClientSideStatus receivedStatus)
595         {
596             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
597             // success will be always set to true.
598 
599             TaskCompletionSource<object> delayedStreamingWriteTcs = null;
600 
601             bool releasedResources;
602             bool origCancelRequested;
603             lock (myLock)
604             {
605                 finished = true;
606                 finishedStatus = receivedStatus;
607                 if (isStreamingWriteCompletionDelayed)
608                 {
609                     delayedStreamingWriteTcs = streamingWriteTcs;
610                     streamingWriteTcs = null;
611                 }
612 
613                 releasedResources = ReleaseResourcesIfPossible();
614                 origCancelRequested = cancelRequested;
615             }
616 
617             if (releasedResources)
618             {
619                 OnAfterReleaseResourcesUnlocked();
620             }
621 
622             if (delayedStreamingWriteTcs != null)
623             {
624                 delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
625             }
626 
627             var status = receivedStatus.Status;
628             if (status.StatusCode != StatusCode.OK)
629             {
630                 streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers));
631                 if (status.StatusCode == StatusCode.Cancelled || origCancelRequested)
632                 {
633                     // Make sure the exception set to the Task is observed,
634                     // otherwise this can trigger "Unobserved exception" when the response stream
635                     // is not read until its end and the task created by the TCS is garbage collected.
636                     // See https://github.com/grpc/grpc/issues/17458
637                     var _ = streamingResponseCallFinishedTcs.Task.Exception;
638                 }
639                 return;
640             }
641 
642             streamingResponseCallFinishedTcs.SetResult(null);
643         }
644 
645         IUnaryResponseClientCallback UnaryResponseClientCallback => this;
646 
IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)647         void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
648         {
649             HandleUnaryResponse(success, receivedStatus, receivedMessageReader, responseHeaders);
650         }
651 
652         IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
653 
IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)654         void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
655         {
656             HandleFinished(success, receivedStatus);
657         }
658 
659         IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;
660 
IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)661         void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
662         {
663             HandleReceivedResponseHeaders(success, responseHeaders);
664         }
665     }
666 }
667