#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Manages client side native call lifecycle.
    /// </summary>
    internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();

        readonly CallInvocationDetails<TRequest, TResponse> details;
        readonly INativeCall injectedNativeCall;  // for testing

        // True while this call holds a reference registered with details.Channel.
        bool registeredWithChannel;

        // Dispose of to de-register cancellation token registration
        CancellationTokenRegistration cancellationTokenRegistration;

        // Completion of a pending unary response if not null.
        TaskCompletionSource<TResponse> unaryResponseTcs;

        // Completion of a streaming response call if not null.
        TaskCompletionSource<object> streamingResponseCallFinishedTcs;

        // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
        // Response headers set here once received.
        readonly TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();

        // Set after status is received. Used for both unary and streaming response calls.
        ClientSideStatus? finishedStatus;

        /// <summary>
        /// Creates a new call for the given invocation details. The call options are normalized
        /// before being stored.
        /// </summary>
        /// <param name="callDetails">Invocation details (channel, method, host, options, marshallers).</param>
        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
            : base(callDetails.RequestMarshaller.ContextualSerializer, callDetails.ResponseMarshaller.ContextualDeserializer)
        {
            this.details = callDetails.WithOptions(callDetails.Options.Normalize());
            this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
        }

        /// <summary>
        /// This constructor should only be used for testing.
        /// </summary>
        /// <param name="callDetails">Invocation details (channel, method, host, options, marshallers).</param>
        /// <param name="injectedNativeCall">Native call to use instead of creating a real one.</param>
        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
        {
            this.injectedNativeCall = injectedNativeCall;
        }

        // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
        // it is reusing fair amount of code in this class, so we are leaving it here.
        /// <summary>
        /// Blocking unary request - unary response call.
        /// </summary>
        /// <param name="msg">The request message to send.</param>
        /// <returns>The deserialized response message.</returns>
        /// <exception cref="RpcException">The call finished with a status other than OK.</exception>
        public TResponse UnaryCall(TRequest msg)
        {
            var profiler = Profilers.ForCurrentThread();

            using (profiler.NewScope("AsyncCall.UnaryCall"))
            using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
            {
                bool callStartedOk = false;
                try
                {
                    unaryResponseTcs = new TaskCompletionSource<TResponse>();

                    lock (myLock)
                    {
                        GrpcPreconditions.CheckState(!started);
                        started = true;
                        Initialize(cq);

                        halfcloseRequested = true;
                        readingDone = true;
                    }

                    using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        var payload = UnsafeSerialize(msg, serializationScope.Context); // do before metadata array?
                        var ctx = details.Channel.Environment.BatchContextPool.Lease();
                        try
                        {
                            call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
                            callStartedOk = true;

                            var ev = cq.Pluck(ctx.Handle);
                            bool success = (ev.success != 0);
                            try
                            {
                                using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
                                {
                                    HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessageReader(), ctx.GetReceivedInitialMetadata());
                                }
                            }
                            catch (Exception e)
                            {
                                Logger.Error(e, "Exception occurred while invoking completion delegate.");

                                // If the handler failed before completing the response task, the
                                // synchronous wait below would never return. Surface the failure to
                                // the caller instead; this is a no-op if the task is already completed.
                                unaryResponseTcs.TrySetException(e);
                            }
                        }
                        finally
                        {
                            ctx.Recycle();
                        }
                    }
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        lock (myLock)
                        {
                            OnFailedToStartCallLocked();
                        }
                    }
                }

                // Once the blocking call returns, the result should be available synchronously.
                // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
                return unaryResponseTcs.Task.GetAwaiter().GetResult();
            }
        }

        /// <summary>
        /// Starts a unary request - unary response call.
        /// </summary>
        /// <param name="msg">The request message to send.</param>
        /// <returns>Task that completes with the response, or fails with <see cref="RpcException"/> on non-OK status.</returns>
        public Task<TResponse> UnaryCallAsync(TRequest msg)
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    halfcloseRequested = true;
                    readingDone = true;

                    using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
                    {
                        var payload = UnsafeSerialize(msg, serializationScope.Context);
                        unaryResponseTcs = new TaskCompletionSource<TResponse>();
                        using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                        {
                            call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
                            callStartedOk = true;
                        }
                    }

                    return unaryResponseTcs.Task;
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Starts a streamed request - unary response call.
        /// Use SendMessageAsync and SendCloseFromClientAsync to stream requests.
        /// </summary>
        /// <returns>Task that completes with the response, or fails with <see cref="RpcException"/> on non-OK status.</returns>
        public Task<TResponse> ClientStreamingCallAsync()
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    readingDone = true;

                    unaryResponseTcs = new TaskCompletionSource<TResponse>();
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
                        callStartedOk = true;
                    }

                    return unaryResponseTcs.Task;
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Starts a unary request - streamed response call.
        /// </summary>
        /// <param name="msg">The request message to send.</param>
        public void StartServerStreamingCall(TRequest msg)
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    halfcloseRequested = true;

                    using (var serializationScope = DefaultSerializationContext.GetInitializedThreadLocalScope())
                    {
                        var payload = UnsafeSerialize(msg, serializationScope.Context);
                        streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
                        using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                        {
                            call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
                            callStartedOk = true;
                        }
                    }
                    call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Starts a streaming request - streaming response call.
        /// Use SendMessageAsync and SendCloseFromClientAsync to stream requests.
        /// </summary>
        public void StartDuplexStreamingCall()
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
                        callStartedOk = true;
                    }
                    call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Sends a streaming request. Only one pending send action is allowed at any given time.
        /// </summary>
        /// <param name="msg">The request message to send.</param>
        /// <param name="writeFlags">Write flags to use for this message.</param>
        public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
        {
            return SendMessageInternalAsync(msg, writeFlags);
        }

        /// <summary>
        /// Receives a streaming response. Only one pending read action is allowed at any given time.
        /// </summary>
        public Task<TResponse> ReadMessageAsync()
        {
            return ReadMessageInternalAsync();
        }

        /// <summary>
        /// Sends halfclose, indicating client is done with streaming requests.
        /// Only one pending send action is allowed at any given time.
        /// </summary>
        /// <returns>
        /// A task for the halfclose operation. It is already cancelled if cancellation was requested,
        /// and already completed if the call has been disposed or has finished.
        /// </returns>
        public Task SendCloseFromClientAsync()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);

                var earlyResult = CheckSendPreconditionsClientSide();
                if (earlyResult != null)
                {
                    return earlyResult;
                }

                if (disposed || finished)
                {
                    // In case the call has already been finished by the serverside,
                    // the halfclose has already been done implicitly, so just return
                    // completed task here.
                    halfcloseRequested = true;
                    return TaskUtils.CompletedTask;
                }
                call.StartSendCloseFromClient(SendCompletionCallback);

                halfcloseRequested = true;
                streamingWriteTcs = new TaskCompletionSource<object>();
                return streamingWriteTcs.Task;
            }
        }

        /// <summary>
        /// Gets the task that completes once the streaming response call finishes with OK status,
        /// and fails with an RpcException carrying the received status otherwise.
        /// </summary>
        public Task StreamingResponseCallFinishedTask
        {
            get
            {
                return streamingResponseCallFinishedTcs.Task;
            }
        }

        /// <summary>
        /// Get the task that completes once response headers are received.
        /// </summary>
        public Task<Metadata> ResponseHeadersAsync
        {
            get
            {
                return responseHeadersTcs.Task;
            }
        }

        /// <summary>
        /// Gets the resulting status if the call has already finished.
        /// Throws InvalidOperationException otherwise.
        /// </summary>
        public Status GetStatus()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
                return finishedStatus.Value.Status;
            }
        }

        /// <summary>
        /// Gets the trailing metadata if the call has already finished.
        /// Throws InvalidOperationException otherwise.
        /// </summary>
        public Metadata GetTrailers()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
                return finishedStatus.Value.Trailers;
            }
        }

        /// <summary>
        /// Gets the invocation details of this call (with normalized options).
        /// </summary>
        public CallInvocationDetails<TRequest, TResponse> Details
        {
            get
            {
                return this.details;
            }
        }

        /// <summary>
        /// Removes this call's reference from the channel if one was registered.
        /// </summary>
        protected override void OnAfterReleaseResourcesLocked()
        {
            if (registeredWithChannel)
            {
                details.Channel.RemoveCallReference(this);
                registeredWithChannel = false;
            }
        }

        /// <summary>
        /// Disposes the cancellation token registration. Must be invoked outside of the call's lock.
        /// </summary>
        protected override void OnAfterReleaseResourcesUnlocked()
        {
            // If cancellation callback is in progress, this can block
            // so we need to do this outside of call's lock to prevent
            // deadlock.
            // See https://github.com/grpc/grpc/issues/14777
            // See https://github.com/dotnet/corefx/issues/14903
            cancellationTokenRegistration.Dispose();
        }

        protected override bool IsClient
        {
            get { return true; }
        }

        /// <summary>
        /// Creates an RpcException from the received status and trailers.
        /// Reads finishedStatus.Value, so the status must already have been received.
        /// </summary>
        protected override Exception GetRpcExceptionClientOnly()
        {
            return new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers);
        }

        /// <summary>
        /// Checks whether a send operation may be started.
        /// </summary>
        /// <returns>
        /// <c>null</c> if sending is allowed; otherwise an already-finished task (cancelled or
        /// faulted with RpcException) to hand back to the caller instead of starting the send.
        /// </returns>
        protected override Task CheckSendAllowedOrEarlyResult()
        {
            var earlyResult = CheckSendPreconditionsClientSide();
            if (earlyResult != null)
            {
                return earlyResult;
            }

            if (finishedStatus.HasValue)
            {
                // throwing RpcException if we already received status on client
                // side makes the most sense.
                // Note that this throws even for StatusCode.OK.
                // Writing after the call has finished is not a programming error because server can close
                // the call anytime, so don't throw directly, but let the write task finish with an error.
                var tcs = new TaskCompletionSource<object>();
                tcs.SetException(GetRpcExceptionClientOnly());
                return tcs.Task;
            }

            return null;
        }

        /// <summary>
        /// Validates state shared by all client-side send operations.
        /// Throws InvalidOperationException if the request stream was already completed or a write is pending.
        /// </summary>
        /// <returns>A cancelled task if cancellation was requested; <c>null</c> otherwise.</returns>
        private Task CheckSendPreconditionsClientSide()
        {
            GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
            GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");

            if (cancelRequested)
            {
                // Return a cancelled task.
                var tcs = new TaskCompletionSource<object>();
                tcs.SetCanceled();
                return tcs.Task;
            }

            return null;
        }

        /// <summary>
        /// Creates the native call on the given completion queue, registers this call with the
        /// channel and hooks up the cancellation token.
        /// </summary>
        private void Initialize(CompletionQueueSafeHandle cq)
        {
            var call = CreateNativeCall(cq);

            details.Channel.AddCallReference(this);
            registeredWithChannel = true;
            InitializeInternal(call);

            RegisterCancellationCallback();
        }

        /// <summary>
        /// Cleans up after a call that could not be started. Must be invoked while holding the call's lock.
        /// </summary>
        private void OnFailedToStartCallLocked()
        {
            ReleaseResources();

            // We need to execute the hook that disposes the cancellation token
            // registration, but it cannot be done from under a lock.
            // To make things simple, we just schedule the unregistering
            // on a threadpool.
            // - Once the native call is disposed, the Cancel() calls are ignored anyway
            // - We don't care about the overhead as OnFailedToStartCallLocked() only happens
            //   when something goes very bad when initializing a call and that should
            //   never happen when gRPC is used correctly.
            ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked());
        }

        /// <summary>
        /// Returns the injected native call if there is one, otherwise creates a new native call
        /// on the channel using the method, host, deadline and credentials from the call details.
        /// </summary>
        private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
        {
            if (injectedNativeCall != null)
            {
                return injectedNativeCall;  // allows injecting a mock INativeCall in tests.
            }

            var parentCall = details.Options.PropagationToken.AsImplOrNull()?.ParentCall ?? CallSafeHandle.NullInstance;

            var credentials = details.Options.Credentials;
            using (var nativeCredentials = credentials?.ToNativeCredentials())
            {
                var result = details.Channel.Handle.CreateCall(
                             parentCall, ContextPropagationTokenImpl.DefaultMask, cq,
                             details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
                return result;
            }
        }

        // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
        private void RegisterCancellationCallback()
        {
            cancellationTokenRegistration = RegisterCancellationCallbackForToken(details.Options.CancellationToken);
        }

        /// <summary>
        /// Gets WriteFlags set in callDetails.Options.WriteOptions
        /// </summary>
        private WriteFlags GetWriteFlagsForCall()
        {
            var writeOptions = details.Options.WriteOptions;
            return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
        }

        /// <summary>
        /// Handles completion of receiving the response headers for calls with streaming response.
        /// </summary>
        private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
        {
            // TODO(jtattermusch): handle success==false
            responseHeadersTcs.SetResult(responseHeaders);
        }

        /// <summary>
        /// Handler for unary response completion.
        /// </summary>
        private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
        {
            // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
            // success will be always set to true.

            TaskCompletionSource<object> delayedStreamingWriteTcs = null;
            TResponse msg = default(TResponse);
            var deserializeException = TryDeserialize(receivedMessageReader, out msg);

            bool releasedResources;
            lock (myLock)
            {
                finished = true;

                // A response that failed to deserialize overrides an otherwise OK status.
                if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
                {
                    receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
                }
                finishedStatus = receivedStatus;

                if (isStreamingWriteCompletionDelayed)
                {
                    delayedStreamingWriteTcs = streamingWriteTcs;
                    streamingWriteTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            responseHeadersTcs.SetResult(responseHeaders);

            if (delayedStreamingWriteTcs != null)
            {
                delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
            }

            var status = receivedStatus.Status;
            if (status.StatusCode != StatusCode.OK)
            {
                unaryResponseTcs.SetException(new RpcException(status, receivedStatus.Trailers));
                return;
            }

            unaryResponseTcs.SetResult(msg);
        }

        /// <summary>
        /// Handles receive status completion for calls with streaming response.
        /// </summary>
        private void HandleFinished(bool success, ClientSideStatus receivedStatus)
        {
            // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
            // success will be always set to true.

            TaskCompletionSource<object> delayedStreamingWriteTcs = null;

            bool releasedResources;
            bool origCancelRequested;
            lock (myLock)
            {
                finished = true;
                finishedStatus = receivedStatus;
                if (isStreamingWriteCompletionDelayed)
                {
                    delayedStreamingWriteTcs = streamingWriteTcs;
                    streamingWriteTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
                origCancelRequested = cancelRequested;
            }

            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (delayedStreamingWriteTcs != null)
            {
                delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
            }

            var status = receivedStatus.Status;
            if (status.StatusCode != StatusCode.OK)
            {
                streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers));
                if (status.StatusCode == StatusCode.Cancelled || origCancelRequested)
                {
                    // Make sure the exception set to the Task is observed,
                    // otherwise this can trigger "Unobserved exception" when the response stream
                    // is not read until its end and the task created by the TCS is garbage collected.
                    // See https://github.com/grpc/grpc/issues/17458
                    var _ = streamingResponseCallFinishedTcs.Task.Exception;
                }
                return;
            }

            streamingResponseCallFinishedTcs.SetResult(null);
        }

        // The callback interfaces are implemented explicitly; these properties expose
        // this instance typed as the respective callback interface.
        IUnaryResponseClientCallback UnaryResponseClientCallback => this;

        void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
        {
            HandleUnaryResponse(success, receivedStatus, receivedMessageReader, responseHeaders);
        }

        IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;

        void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
        {
            HandleFinished(success, receivedStatus);
        }

        IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;

        void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
        {
            HandleReceivedResponseHeaders(success, responseHeaders);
        }
    }
}