#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Manages client side native call lifecycle.
    /// </summary>
    /// <typeparam name="TRequest">Type of the request messages.</typeparam>
    /// <typeparam name="TResponse">Type of the response messages.</typeparam>
    internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();

        readonly CallInvocationDetails<TRequest, TResponse> details;
        readonly INativeCall injectedNativeCall;  // for testing

        // True while this call holds a reference registered with the channel.
        bool registeredWithChannel;

        // Dispose of to de-register cancellation token registration
        IDisposable cancellationTokenRegistration;

        // Completion of a pending unary response if not null.
        TaskCompletionSource<TResponse> unaryResponseTcs;

        // Completion of a streaming response call if not null.
        TaskCompletionSource<object> streamingResponseCallFinishedTcs;

        // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
        // Response headers set here once received.
        TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();

        // Set after status is received. Used for both unary and streaming response calls.
        ClientSideStatus? finishedStatus;

        /// <summary>
        /// Creates a new call object for the given invocation details.
        /// The call options are normalized before being stored.
        /// </summary>
        /// <param name="callDetails">Details of the call to be invoked.</param>
        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
        {
            this.details = callDetails.WithOptions(callDetails.Options.Normalize());
            this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
        }

        /// <summary>
        /// This constructor should only be used for testing.
        /// </summary>
        /// <param name="callDetails">Details of the call to be invoked.</param>
        /// <param name="injectedNativeCall">Native call to use instead of creating a real one.</param>
        public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
        {
            this.injectedNativeCall = injectedNativeCall;
        }

        // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
        // it is reusing fair amount of code in this class, so we are leaving it here.
        /// <summary>
        /// Blocking unary request - unary response call.
        /// Uses a dedicated synchronous completion queue and plucks the result from it on the calling thread.
        /// </summary>
        /// <param name="msg">The request message.</param>
        /// <returns>The response message.</returns>
        /// <exception cref="RpcException">The call finished with a status other than OK.</exception>
        public TResponse UnaryCall(TRequest msg)
        {
            var profiler = Profilers.ForCurrentThread();

            using (profiler.NewScope("AsyncCall.UnaryCall"))
            using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
            {
                bool callStartedOk = false;
                try
                {
                    unaryResponseTcs = new TaskCompletionSource<TResponse>();

                    lock (myLock)
                    {
                        GrpcPreconditions.CheckState(!started);
                        started = true;
                        Initialize(cq);

                        halfcloseRequested = true;
                        readingDone = true;
                    }

                    byte[] payload = UnsafeSerialize(msg);

                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        var ctx = details.Channel.Environment.BatchContextPool.Lease();
                        try
                        {
                            call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
                            callStartedOk = true;

                            // Block until the batch for this call completes.
                            var ev = cq.Pluck(ctx.Handle);
                            bool success = (ev.success != 0);
                            try
                            {
                                using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
                                {
                                    HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
                                }
                            }
                            catch (Exception e)
                            {
                                Logger.Error(e, "Exception occurred while invoking completion delegate.");
                            }
                        }
                        finally
                        {
                            // Always return the leased batch context, even if starting the call failed.
                            ctx.Recycle();
                        }
                    }
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        lock (myLock)
                        {
                            OnFailedToStartCallLocked();
                        }
                    }
                }

                // Once the blocking call returns, the result should be available synchronously.
                // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
                return unaryResponseTcs.Task.GetAwaiter().GetResult();
            }
        }

        /// <summary>
        /// Starts a unary request - unary response call.
        /// </summary>
        /// <param name="msg">The request message.</param>
        /// <returns>
        /// Task that completes with the response message, or fails with <see cref="RpcException"/>
        /// if the call finishes with a status other than OK.
        /// </returns>
        public Task<TResponse> UnaryCallAsync(TRequest msg)
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    halfcloseRequested = true;
                    readingDone = true;

                    byte[] payload = UnsafeSerialize(msg);

                    unaryResponseTcs = new TaskCompletionSource<TResponse>();
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
                        callStartedOk = true;
                    }

                    return unaryResponseTcs.Task;
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Starts a streamed request - unary response call.
        /// Use SendMessageAsync and SendCloseFromClientAsync to stream requests.
        /// </summary>
        /// <returns>
        /// Task that completes with the response message, or fails with <see cref="RpcException"/>
        /// if the call finishes with a status other than OK.
        /// </returns>
        public Task<TResponse> ClientStreamingCallAsync()
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    readingDone = true;

                    unaryResponseTcs = new TaskCompletionSource<TResponse>();
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
                        callStartedOk = true;
                    }

                    return unaryResponseTcs.Task;
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Starts a unary request - streamed response call.
        /// </summary>
        /// <param name="msg">The request message.</param>
        public void StartServerStreamingCall(TRequest msg)
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    halfcloseRequested = true;

                    byte[] payload = UnsafeSerialize(msg);

                    streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
                        callStartedOk = true;
                    }
                    call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Starts a streaming request - streaming response call.
        /// Use SendMessageAsync and SendCloseFromClientAsync to stream requests.
        /// </summary>
        public void StartDuplexStreamingCall()
        {
            lock (myLock)
            {
                bool callStartedOk = false;
                try
                {
                    GrpcPreconditions.CheckState(!started);
                    started = true;

                    Initialize(details.Channel.CompletionQueue);

                    streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
                    using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
                    {
                        call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
                        callStartedOk = true;
                    }
                    call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
                }
                finally
                {
                    if (!callStartedOk)
                    {
                        OnFailedToStartCallLocked();
                    }
                }
            }
        }

        /// <summary>
        /// Sends a streaming request. Only one pending send action is allowed at any given time.
        /// </summary>
        /// <param name="msg">The request message to send.</param>
        /// <param name="writeFlags">Write flags to use for this message.</param>
        /// <returns>Task representing the pending send operation.</returns>
        public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
        {
            return SendMessageInternalAsync(msg, writeFlags);
        }

        /// <summary>
        /// Receives a streaming response. Only one pending read action is allowed at any given time.
        /// </summary>
        /// <returns>Task representing the pending read operation.</returns>
        public Task<TResponse> ReadMessageAsync()
        {
            return ReadMessageInternalAsync();
        }

        /// <summary>
        /// Sends halfclose, indicating client is done with streaming requests.
        /// Only one pending send action is allowed at any given time.
        /// </summary>
        /// <returns>
        /// Task that completes once the halfclose has been sent. An already cancelled task is returned
        /// if cancellation was requested, and an already completed task is returned if the call
        /// has been disposed or has finished.
        /// </returns>
        public Task SendCloseFromClientAsync()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(started);

                var earlyResult = CheckSendPreconditionsClientSide();
                if (earlyResult != null)
                {
                    return earlyResult;
                }

                if (disposed || finished)
                {
                    // In case the call has already been finished by the serverside,
                    // the halfclose has already been done implicitly, so just return
                    // completed task here.
                    halfcloseRequested = true;
                    return TaskUtils.CompletedTask;
                }
                call.StartSendCloseFromClient(SendCompletionCallback);

                halfcloseRequested = true;
                streamingWriteTcs = new TaskCompletionSource<object>();
                return streamingWriteTcs.Task;
            }
        }

        /// <summary>
        /// Gets the task that completes once the streaming response call finishes with OK status,
        /// and fails with an RpcException carrying the received status otherwise.
        /// </summary>
        public Task StreamingResponseCallFinishedTask
        {
            get
            {
                return streamingResponseCallFinishedTcs.Task;
            }
        }

        /// <summary>
        /// Get the task that completes once response headers are received.
        /// </summary>
        public Task<Metadata> ResponseHeadersAsync
        {
            get
            {
                return responseHeadersTcs.Task;
            }
        }

        /// <summary>
        /// Gets the resulting status if the call has already finished.
        /// Throws InvalidOperationException otherwise.
        /// </summary>
        public Status GetStatus()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
                return finishedStatus.Value.Status;
            }
        }

        /// <summary>
        /// Gets the trailing metadata if the call has already finished.
        /// Throws InvalidOperationException otherwise.
        /// </summary>
        public Metadata GetTrailers()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
                return finishedStatus.Value.Trailers;
            }
        }

        /// <summary>
        /// Gets the invocation details of this call (with the normalized options set by the constructor).
        /// </summary>
        public CallInvocationDetails<TRequest, TResponse> Details
        {
            get
            {
                return this.details;
            }
        }

        /// <summary>
        /// Removes this call's reference from the channel if it is currently registered.
        /// </summary>
        protected override void OnAfterReleaseResourcesLocked()
        {
            if (registeredWithChannel)
            {
                details.Channel.RemoveCallReference(this);
                registeredWithChannel = false;
            }
        }

        /// <summary>
        /// Disposes the cancellation token registration, if there is one.
        /// </summary>
        protected override void OnAfterReleaseResourcesUnlocked()
        {
            // If cancellation callback is in progress, this can block
            // so we need to do this outside of call's lock to prevent
            // deadlock.
            // See https://github.com/grpc/grpc/issues/14777
            // See https://github.com/dotnet/corefx/issues/14903
            cancellationTokenRegistration?.Dispose();
        }

        /// <summary>
        /// Always <c>true</c>: this class represents the client side of a call.
        /// </summary>
        protected override bool IsClient
        {
            get { return true; }
        }

        /// <summary>
        /// Creates an RpcException from the status and trailers of the finished call.
        /// Reads <c>finishedStatus.Value</c>, so the status must already have been received.
        /// </summary>
        protected override Exception GetRpcExceptionClientOnly()
        {
            return new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers);
        }

        /// <summary>
        /// Checks whether a send operation can be started.
        /// </summary>
        /// <returns>
        /// <c>null</c> if sending is allowed; otherwise a task carrying the early result
        /// (cancelled, or faulted with RpcException if the status has already been received).
        /// </returns>
        protected override Task CheckSendAllowedOrEarlyResult()
        {
            var earlyResult = CheckSendPreconditionsClientSide();
            if (earlyResult != null)
            {
                return earlyResult;
            }

            if (finishedStatus.HasValue)
            {
                // throwing RpcException if we already received status on client
                // side makes the most sense.
                // Note that this throws even for StatusCode.OK.
                // Writing after the call has finished is not a programming error because server can close
                // the call anytime, so don't throw directly, but let the write task finish with an error.
                var tcs = new TaskCompletionSource<object>();
                tcs.SetException(new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers));
                return tcs.Task;
            }

            return null;
        }

        /// <summary>
        /// Verifies that the request stream has not been completed and that no write is pending.
        /// </summary>
        /// <returns>A cancelled task if cancellation has been requested; <c>null</c> otherwise.</returns>
        private Task CheckSendPreconditionsClientSide()
        {
            GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
            GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");

            if (cancelRequested)
            {
                // Return a cancelled task.
                var tcs = new TaskCompletionSource<object>();
                tcs.SetCanceled();
                return tcs.Task;
            }

            return null;
        }

        /// <summary>
        /// Creates the native call, registers this call with the channel, passes the native call
        /// to the base class and registers the cancellation callback.
        /// </summary>
        /// <param name="cq">Completion queue the native call will be bound to.</param>
        private void Initialize(CompletionQueueSafeHandle cq)
        {
            var call = CreateNativeCall(cq);

            details.Channel.AddCallReference(this);
            registeredWithChannel = true;
            InitializeInternal(call);

            RegisterCancellationCallback();
        }

        /// <summary>
        /// Releases the call's resources after a failure to start the call.
        /// All callers in this class invoke it while holding <c>myLock</c>.
        /// </summary>
        private void OnFailedToStartCallLocked()
        {
            ReleaseResources();

            // We need to execute the hook that disposes the cancellation token
            // registration, but it cannot be done from under a lock.
            // To make things simple, we just schedule the unregistering
            // on a threadpool.
            // - Once the native call is disposed, the Cancel() calls are ignored anyway
            // - We don't care about the overhead as OnFailedToStartCallLocked() only happens
            //   when something goes very bad when initializing a call and that should
            //   never happen when gRPC is used correctly.
            ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked());
        }

        /// <summary>
        /// Returns the injected native call if one was supplied, otherwise creates a new native call
        /// on the channel using the method, host, deadline, credentials and propagation token from the call details.
        /// </summary>
        /// <param name="cq">Completion queue the native call will be bound to.</param>
        private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
        {
            if (injectedNativeCall != null)
            {
                return injectedNativeCall;  // allows injecting a mock INativeCall in tests.
            }

            var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;

            var credentials = details.Options.Credentials;
            // The native credentials are only needed while creating the call; a null resource is valid in a using statement.
            using (var nativeCredentials = credentials?.ToNativeCredentials())
            {
                var result = details.Channel.Handle.CreateCall(
                    parentCall, ContextPropagationToken.DefaultMask, cq,
                    details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
                return result;
            }
        }

        // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
        private void RegisterCancellationCallback()
        {
            var token = details.Options.CancellationToken;
            if (token.CanBeCanceled)
            {
                cancellationTokenRegistration = token.Register(() => this.Cancel());
            }
        }

        /// <summary>
        /// Gets WriteFlags set in callDetails.Options.WriteOptions
        /// (or the default WriteFlags value if no write options were set).
        /// </summary>
        private WriteFlags GetWriteFlagsForCall()
        {
            var writeOptions = details.Options.WriteOptions;
            return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
        }

        /// <summary>
        /// Handles received response headers for calls with streaming response
        /// by completing the task exposed via <see cref="ResponseHeadersAsync"/>.
        /// </summary>
        /// <param name="success">Whether the receive operation succeeded (currently ignored).</param>
        /// <param name="responseHeaders">The received response headers.</param>
        private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
        {
            // TODO(jtattermusch): handle success==false
            responseHeadersTcs.SetResult(responseHeaders);
        }

        /// <summary>
        /// Handler for unary response completion.
        /// </summary>
        /// <param name="success">Whether the batch succeeded (see note in the method body).</param>
        /// <param name="receivedStatus">Status and trailers received from the server.</param>
        /// <param name="receivedMessage">Serialized response message.</param>
        /// <param name="responseHeaders">The received response headers.</param>
        private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
        {
            // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
            // success will be always set to true.

            TaskCompletionSource<object> delayedStreamingWriteTcs = null;
            TResponse msg = default(TResponse);
            var deserializeException = TryDeserialize(receivedMessage, out msg);

            bool releasedResources;
            lock (myLock)
            {
                finished = true;

                // A response that cannot be deserialized turns an otherwise OK call into a failure.
                if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
                {
                    receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
                }
                finishedStatus = receivedStatus;

                if (isStreamingWriteCompletionDelayed)
                {
                    delayedStreamingWriteTcs = streamingWriteTcs;
                    streamingWriteTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            // Must run outside of the lock, see OnAfterReleaseResourcesUnlocked().
            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            responseHeadersTcs.SetResult(responseHeaders);

            if (delayedStreamingWriteTcs != null)
            {
                delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
            }

            var status = receivedStatus.Status;
            if (status.StatusCode != StatusCode.OK)
            {
                unaryResponseTcs.SetException(new RpcException(status, receivedStatus.Trailers));
                return;
            }

            unaryResponseTcs.SetResult(msg);
        }

        /// <summary>
        /// Handles receive status completion for calls with streaming response.
        /// </summary>
        /// <param name="success">Whether the batch succeeded (see note in the method body).</param>
        /// <param name="receivedStatus">Status and trailers received from the server.</param>
        private void HandleFinished(bool success, ClientSideStatus receivedStatus)
        {
            // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
            // success will be always set to true.

            TaskCompletionSource<object> delayedStreamingWriteTcs = null;

            bool releasedResources;
            lock (myLock)
            {
                finished = true;
                finishedStatus = receivedStatus;
                if (isStreamingWriteCompletionDelayed)
                {
                    delayedStreamingWriteTcs = streamingWriteTcs;
                    streamingWriteTcs = null;
                }

                releasedResources = ReleaseResourcesIfPossible();
            }

            // Must run outside of the lock, see OnAfterReleaseResourcesUnlocked().
            if (releasedResources)
            {
                OnAfterReleaseResourcesUnlocked();
            }

            if (delayedStreamingWriteTcs != null)
            {
                delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
            }

            var status = receivedStatus.Status;
            if (status.StatusCode != StatusCode.OK)
            {
                streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers));
                return;
            }

            streamingResponseCallFinishedTcs.SetResult(null);
        }

        // The call object itself is passed to the native layer as the completion callback.
        IUnaryResponseClientCallback UnaryResponseClientCallback => this;

        void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
        {
            HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
        }

        IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;

        void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
        {
            HandleFinished(success, receivedStatus);
        }

        IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;

        void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
        {
            HandleReceivedResponseHeaders(success, responseHeaders);
        }
    }
}