1 #region Copyright notice and license 2 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Collections.Generic; 21 using System.Threading.Tasks; 22 23 using Grpc.Core.Internal; 24 using Grpc.Core.Utils; 25 using NUnit.Framework; 26 27 namespace Grpc.Core.Internal.Tests 28 { 29 /// <summary> 30 /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations. 31 /// </summary> 32 public class AsyncCallTest 33 { 34 Channel channel; 35 FakeNativeCall fakeCall; 36 AsyncCall<string, string> asyncCall; 37 FakeBufferReaderManager fakeBufferReaderManager; 38 39 [SetUp] Init()40 public void Init() 41 { 42 channel = new Channel("localhost", ChannelCredentials.Insecure); 43 44 fakeCall = new FakeNativeCall(); 45 46 var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions()); 47 asyncCall = new AsyncCall<string, string>(callDetails, fakeCall); 48 fakeBufferReaderManager = new FakeBufferReaderManager(); 49 } 50 51 [TearDown] Cleanup()52 public void Cleanup() 53 { 54 channel.ShutdownAsync().Wait(); 55 fakeBufferReaderManager.Dispose(); 56 } 57 58 [Test] AsyncUnary_CanBeStartedOnlyOnce()59 public void AsyncUnary_CanBeStartedOnlyOnce() 60 { 61 asyncCall.UnaryCallAsync("request1"); 62 Assert.Throws(typeof(InvalidOperationException), 63 () => asyncCall.UnaryCallAsync("abc")); 64 } 65 66 [Test] AsyncUnary_StreamingOperationsNotAllowed()67 public void AsyncUnary_StreamingOperationsNotAllowed() 68 { 69 asyncCall.UnaryCallAsync("request1"); 70 Assert.ThrowsAsync(typeof(InvalidOperationException), 71 async () => await asyncCall.ReadMessageAsync()); 72 Assert.Throws(typeof(InvalidOperationException), 73 () => asyncCall.SendMessageAsync("abc", new WriteFlags())); 74 } 75 76 [Test] AsyncUnary_Success()77 public void AsyncUnary_Success() 78 { 79 var resultTask = asyncCall.UnaryCallAsync("request1"); 80 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 81 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 82 CreateResponsePayload(), 83 new Metadata()); 84 85 AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); 86 } 87 88 [Test] AsyncUnary_NonSuccessStatusCode()89 public void AsyncUnary_NonSuccessStatusCode() 90 { 91 var resultTask = asyncCall.UnaryCallAsync("request1"); 92 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 93 CreateClientSideStatus(StatusCode.InvalidArgument), 94 CreateNullResponse(), 95 new Metadata()); 96 97 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); 98 } 99 100 [Test] AsyncUnary_NullResponsePayload()101 public void AsyncUnary_NullResponsePayload() 102 { 103 var resultTask = asyncCall.UnaryCallAsync("request1"); 104 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 105 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 106 null, 107 new Metadata()); 108 109 // failure to deserialize will result in InvalidArgument status. 110 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); 111 } 112 113 [Test] AsyncUnary_RequestSerializationExceptionDoesntLeakResources()114 public void AsyncUnary_RequestSerializationExceptionDoesntLeakResources() 115 { 116 string nullRequest = null; // will throw when serializing 117 Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCallAsync(nullRequest)); 118 Assert.AreEqual(0, channel.GetCallReferenceCount()); 119 Assert.IsTrue(fakeCall.IsDisposed); 120 } 121 122 [Test] AsyncUnary_StartCallFailureDoesntLeakResources()123 public void AsyncUnary_StartCallFailureDoesntLeakResources() 124 { 125 fakeCall.MakeStartCallFail(); 126 Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCallAsync("request1")); 127 Assert.AreEqual(0, channel.GetCallReferenceCount()); 128 Assert.IsTrue(fakeCall.IsDisposed); 129 } 130 131 [Test] SyncUnary_RequestSerializationExceptionDoesntLeakResources()132 public void SyncUnary_RequestSerializationExceptionDoesntLeakResources() 133 { 134 string nullRequest = null; // will throw when serializing 135 Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCall(nullRequest)); 136 Assert.AreEqual(0, channel.GetCallReferenceCount()); 137 Assert.IsTrue(fakeCall.IsDisposed); 138 } 139 140 [Test] SyncUnary_StartCallFailureDoesntLeakResources()141 public void SyncUnary_StartCallFailureDoesntLeakResources() 142 { 143 fakeCall.MakeStartCallFail(); 144 Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCall("request1")); 145 Assert.AreEqual(0, channel.GetCallReferenceCount()); 146 Assert.IsTrue(fakeCall.IsDisposed); 147 } 148 149 [Test] ClientStreaming_StreamingReadNotAllowed()150 public void ClientStreaming_StreamingReadNotAllowed() 151 { 152 asyncCall.ClientStreamingCallAsync(); 153 Assert.ThrowsAsync(typeof(InvalidOperationException), 154 async () => await asyncCall.ReadMessageAsync()); 155 } 156 157 [Test] ClientStreaming_NoRequest_Success()158 public void ClientStreaming_NoRequest_Success() 159 { 160 var resultTask = asyncCall.ClientStreamingCallAsync(); 161 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 162 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 163 CreateResponsePayload(), 164 new Metadata()); 165 166 AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); 167 } 168 169 [Test] ClientStreaming_NoRequest_NonSuccessStatusCode()170 public void ClientStreaming_NoRequest_NonSuccessStatusCode() 171 { 172 var resultTask = asyncCall.ClientStreamingCallAsync(); 173 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 174 CreateClientSideStatus(StatusCode.InvalidArgument), 175 CreateNullResponse(), 176 new Metadata()); 177 178 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); 179 } 180 181 [Test] ClientStreaming_MoreRequests_Success()182 public void ClientStreaming_MoreRequests_Success() 183 { 184 var resultTask = asyncCall.ClientStreamingCallAsync(); 185 var requestStream = new ClientRequestStream<string, string>(asyncCall); 186 187 var writeTask = requestStream.WriteAsync("request1"); 188 fakeCall.SendCompletionCallback.OnSendCompletion(true); 189 writeTask.Wait(); 190 191 var writeTask2 = requestStream.WriteAsync("request2"); 192 fakeCall.SendCompletionCallback.OnSendCompletion(true); 193 writeTask2.Wait(); 194 195 var completeTask = requestStream.CompleteAsync(); 196 fakeCall.SendCompletionCallback.OnSendCompletion(true); 197 completeTask.Wait(); 198 199 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 200 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 201 CreateResponsePayload(), 202 new Metadata()); 203 204 AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); 205 } 206 207 [Test] ClientStreaming_WriteFailureThrowsRpcException()208 public void ClientStreaming_WriteFailureThrowsRpcException() 209 { 210 var resultTask = asyncCall.ClientStreamingCallAsync(); 211 var requestStream = new ClientRequestStream<string, string>(asyncCall); 212 213 var writeTask = requestStream.WriteAsync("request1"); 214 fakeCall.SendCompletionCallback.OnSendCompletion(false); 215 216 // The write will wait for call to finish to receive the status code. 217 Assert.IsFalse(writeTask.IsCompleted); 218 219 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 220 CreateClientSideStatus(StatusCode.Internal), 221 CreateNullResponse(), 222 new Metadata()); 223 224 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 225 Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); 226 227 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); 228 } 229 230 [Test] ClientStreaming_WriteFailureThrowsRpcException2()231 public void ClientStreaming_WriteFailureThrowsRpcException2() 232 { 233 var resultTask = asyncCall.ClientStreamingCallAsync(); 234 var requestStream = new ClientRequestStream<string, string>(asyncCall); 235 236 var writeTask = requestStream.WriteAsync("request1"); 237 238 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 239 CreateClientSideStatus(StatusCode.Internal), 240 CreateNullResponse(), 241 new Metadata()); 242 243 fakeCall.SendCompletionCallback.OnSendCompletion(false); 244 245 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 246 Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); 247 248 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); 249 } 250 251 [Test] ClientStreaming_WriteFailureThrowsRpcException3()252 public void ClientStreaming_WriteFailureThrowsRpcException3() 253 { 254 var resultTask = asyncCall.ClientStreamingCallAsync(); 255 var requestStream = new ClientRequestStream<string, string>(asyncCall); 256 257 var writeTask = requestStream.WriteAsync("request1"); 258 fakeCall.SendCompletionCallback.OnSendCompletion(false); 259 260 // Until the delayed write completion has been triggered, 261 // we still act as if there was an active write. 262 Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2")); 263 264 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 265 CreateClientSideStatus(StatusCode.Internal), 266 CreateNullResponse(), 267 new Metadata()); 268 269 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 270 Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); 271 272 // Following attempts to write keep delivering the same status 273 var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished")); 274 Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode); 275 276 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); 277 } 278 279 [Test] ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()280 public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException() 281 { 282 var resultTask = asyncCall.ClientStreamingCallAsync(); 283 var requestStream = new ClientRequestStream<string, string>(asyncCall); 284 285 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 286 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 287 CreateResponsePayload(), 288 new Metadata()); 289 290 AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); 291 292 var writeTask = requestStream.WriteAsync("request1"); 293 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 294 Assert.AreEqual(Status.DefaultSuccess, ex.Status); 295 } 296 297 [Test] ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()298 public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2() 299 { 300 var resultTask = asyncCall.ClientStreamingCallAsync(); 301 var requestStream = new ClientRequestStream<string, string>(asyncCall); 302 303 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 304 new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()), 305 CreateResponsePayload(), 306 new Metadata()); 307 308 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange); 309 310 var writeTask = requestStream.WriteAsync("request1"); 311 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 312 Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode); 313 } 314 315 [Test] ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()316 public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException() 317 { 318 var resultTask = asyncCall.ClientStreamingCallAsync(); 319 var requestStream = new ClientRequestStream<string, string>(asyncCall); 320 321 requestStream.CompleteAsync(); 322 323 Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1")); 324 325 fakeCall.SendCompletionCallback.OnSendCompletion(true); 326 327 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 328 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 329 CreateResponsePayload(), 330 new Metadata()); 331 332 AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); 333 } 334 335 [Test] ClientStreaming_CompleteAfterReceivingStatusSucceeds()336 public void ClientStreaming_CompleteAfterReceivingStatusSucceeds() 337 { 338 var resultTask = asyncCall.ClientStreamingCallAsync(); 339 var requestStream = new ClientRequestStream<string, string>(asyncCall); 340 341 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 342 new ClientSideStatus(Status.DefaultSuccess, new Metadata()), 343 CreateResponsePayload(), 344 new Metadata()); 345 346 AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); 347 Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); 348 } 349 350 [Test] ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()351 public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() 352 { 353 var resultTask = asyncCall.ClientStreamingCallAsync(); 354 var requestStream = new ClientRequestStream<string, string>(asyncCall); 355 356 asyncCall.Cancel(); 357 Assert.IsTrue(fakeCall.IsCancelled); 358 359 var writeTask = requestStream.WriteAsync("request1"); 360 Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); 361 362 fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, 363 CreateClientSideStatus(StatusCode.Cancelled), 364 CreateNullResponse(), 365 new Metadata()); 366 367 AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); 368 } 369 370 [Test] ClientStreaming_StartCallFailureDoesntLeakResources()371 public void ClientStreaming_StartCallFailureDoesntLeakResources() 372 { 373 fakeCall.MakeStartCallFail(); 374 Assert.Throws(typeof(InvalidOperationException), () => asyncCall.ClientStreamingCallAsync()); 375 Assert.AreEqual(0, channel.GetCallReferenceCount()); 376 Assert.IsTrue(fakeCall.IsDisposed); 377 } 378 379 [Test] ServerStreaming_StreamingSendNotAllowed()380 public void ServerStreaming_StreamingSendNotAllowed() 381 { 382 asyncCall.StartServerStreamingCall("request1"); 383 Assert.Throws(typeof(InvalidOperationException), 384 () => asyncCall.SendMessageAsync("abc", new WriteFlags())); 385 } 386 387 [Test] ServerStreaming_NoResponse_Success1()388 public void ServerStreaming_NoResponse_Success1() 389 { 390 asyncCall.StartServerStreamingCall("request1"); 391 var responseStream = new ClientResponseStream<string, string>(asyncCall); 392 var readTask = responseStream.MoveNext(); 393 394 fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata()); 395 Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); 396 397 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 398 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); 399 400 AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); 401 } 402 403 [Test] ServerStreaming_NoResponse_Success2()404 public void ServerStreaming_NoResponse_Success2() 405 { 406 asyncCall.StartServerStreamingCall("request1"); 407 var responseStream = new ClientResponseStream<string, string>(asyncCall); 408 var readTask = responseStream.MoveNext(); 409 410 // try alternative order of completions 411 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); 412 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 413 414 AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); 415 } 416 417 [Test] ServerStreaming_NoResponse_ReadFailure()418 public void ServerStreaming_NoResponse_ReadFailure() 419 { 420 asyncCall.StartServerStreamingCall("request1"); 421 var responseStream = new ClientResponseStream<string, string>(asyncCall); 422 var readTask = responseStream.MoveNext(); 423 424 fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse()); // after a failed read, we rely on C core to deliver appropriate status code. 425 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal)); 426 427 AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); 428 } 429 430 [Test] ServerStreaming_MoreResponses_Success()431 public void ServerStreaming_MoreResponses_Success() 432 { 433 asyncCall.StartServerStreamingCall("request1"); 434 var responseStream = new ClientResponseStream<string, string>(asyncCall); 435 436 var readTask1 = responseStream.MoveNext(); 437 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); 438 Assert.IsTrue(readTask1.Result); 439 Assert.AreEqual("response1", responseStream.Current); 440 441 var readTask2 = responseStream.MoveNext(); 442 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); 443 Assert.IsTrue(readTask2.Result); 444 Assert.AreEqual("response1", responseStream.Current); 445 446 var readTask3 = responseStream.MoveNext(); 447 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); 448 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 449 450 AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); 451 } 452 453 [Test] ServerStreaming_RequestSerializationExceptionDoesntLeakResources()454 public void ServerStreaming_RequestSerializationExceptionDoesntLeakResources() 455 { 456 string nullRequest = null; // will throw when serializing 457 Assert.Throws(typeof(ArgumentNullException), () => asyncCall.StartServerStreamingCall(nullRequest)); 458 Assert.AreEqual(0, channel.GetCallReferenceCount()); 459 Assert.IsTrue(fakeCall.IsDisposed); 460 461 var responseStream = new ClientResponseStream<string, string>(asyncCall); 462 var readTask = responseStream.MoveNext(); 463 } 464 465 [Test] ServerStreaming_StartCallFailureDoesntLeakResources()466 public void ServerStreaming_StartCallFailureDoesntLeakResources() 467 { 468 fakeCall.MakeStartCallFail(); 469 Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartServerStreamingCall("request1")); 470 Assert.AreEqual(0, channel.GetCallReferenceCount()); 471 Assert.IsTrue(fakeCall.IsDisposed); 472 } 473 474 [Test] DuplexStreaming_NoRequestNoResponse_Success()475 public void DuplexStreaming_NoRequestNoResponse_Success() 476 { 477 asyncCall.StartDuplexStreamingCall(); 478 var requestStream = new ClientRequestStream<string, string>(asyncCall); 479 var responseStream = new ClientResponseStream<string, string>(asyncCall); 480 481 var writeTask1 = requestStream.CompleteAsync(); 482 fakeCall.SendCompletionCallback.OnSendCompletion(true); 483 Assert.DoesNotThrowAsync(async () => await writeTask1); 484 485 var readTask = responseStream.MoveNext(); 486 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 487 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); 488 489 AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); 490 } 491 492 [Test] DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()493 public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException() 494 { 495 asyncCall.StartDuplexStreamingCall(); 496 var requestStream = new ClientRequestStream<string, string>(asyncCall); 497 var responseStream = new ClientResponseStream<string, string>(asyncCall); 498 499 var readTask = responseStream.MoveNext(); 500 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 501 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); 502 503 AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); 504 505 var writeTask = requestStream.WriteAsync("request1"); 506 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 507 Assert.AreEqual(Status.DefaultSuccess, ex.Status); 508 } 509 510 [Test] DuplexStreaming_CompleteAfterReceivingStatusSuceeds()511 public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds() 512 { 513 asyncCall.StartDuplexStreamingCall(); 514 var requestStream = new ClientRequestStream<string, string>(asyncCall); 515 var responseStream = new ClientResponseStream<string, string>(asyncCall); 516 517 var readTask = responseStream.MoveNext(); 518 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 519 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); 520 521 AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); 522 523 Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); 524 } 525 526 [Test] DuplexStreaming_WriteFailureThrowsRpcException()527 public void DuplexStreaming_WriteFailureThrowsRpcException() 528 { 529 asyncCall.StartDuplexStreamingCall(); 530 var requestStream = new ClientRequestStream<string, string>(asyncCall); 531 var responseStream = new ClientResponseStream<string, string>(asyncCall); 532 533 var writeTask = requestStream.WriteAsync("request1"); 534 fakeCall.SendCompletionCallback.OnSendCompletion(false); 535 536 // The write will wait for call to finish to receive the status code. 537 Assert.IsFalse(writeTask.IsCompleted); 538 539 var readTask = responseStream.MoveNext(); 540 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 541 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); 542 543 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 544 Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode); 545 546 AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied); 547 } 548 549 [Test] DuplexStreaming_WriteFailureThrowsRpcException2()550 public void DuplexStreaming_WriteFailureThrowsRpcException2() 551 { 552 asyncCall.StartDuplexStreamingCall(); 553 var requestStream = new ClientRequestStream<string, string>(asyncCall); 554 var responseStream = new ClientResponseStream<string, string>(asyncCall); 555 556 var writeTask = requestStream.WriteAsync("request1"); 557 558 var readTask = responseStream.MoveNext(); 559 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 560 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); 561 fakeCall.SendCompletionCallback.OnSendCompletion(false); 562 563 var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); 564 Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode); 565 566 AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied); 567 } 568 569 [Test] DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()570 public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() 571 { 572 asyncCall.StartDuplexStreamingCall(); 573 var requestStream = new ClientRequestStream<string, string>(asyncCall); 574 var responseStream = new ClientResponseStream<string, string>(asyncCall); 575 576 asyncCall.Cancel(); 577 Assert.IsTrue(fakeCall.IsCancelled); 578 579 var writeTask = requestStream.WriteAsync("request1"); 580 Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); 581 582 var readTask = responseStream.MoveNext(); 583 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 584 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); 585 586 AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled); 587 } 588 589 [Test] DuplexStreaming_ReadAfterCancellationRequestCanSucceed()590 public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed() 591 { 592 asyncCall.StartDuplexStreamingCall(); 593 var responseStream = new ClientResponseStream<string, string>(asyncCall); 594 595 asyncCall.Cancel(); 596 Assert.IsTrue(fakeCall.IsCancelled); 597 598 var readTask1 = responseStream.MoveNext(); 599 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); 600 Assert.IsTrue(readTask1.Result); 601 Assert.AreEqual("response1", responseStream.Current); 602 603 var readTask2 = responseStream.MoveNext(); 604 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 605 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); 606 607 AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); 608 } 609 610 [Test] DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()611 public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed() 612 { 613 asyncCall.StartDuplexStreamingCall(); 614 var responseStream = new ClientResponseStream<string, string>(asyncCall); 615 616 var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request 617 asyncCall.Cancel(); 618 Assert.IsTrue(fakeCall.IsCancelled); 619 620 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); 621 Assert.IsTrue(readTask1.Result); 622 Assert.AreEqual("response1", responseStream.Current); 623 624 var readTask2 = responseStream.MoveNext(); 625 fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse()); 626 fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); 627 628 AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); 629 } 630 631 [Test] DuplexStreaming_StartCallFailureDoesntLeakResources()632 public void DuplexStreaming_StartCallFailureDoesntLeakResources() 633 { 634 fakeCall.MakeStartCallFail(); 635 Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartDuplexStreamingCall()); 636 Assert.AreEqual(0, channel.GetCallReferenceCount()); 637 Assert.IsTrue(fakeCall.IsDisposed); 638 } 639 CreateClientSideStatus(StatusCode statusCode)640 ClientSideStatus CreateClientSideStatus(StatusCode statusCode) 641 { 642 return new ClientSideStatus(new Status(statusCode, ""), new Metadata()); 643 } 644 CreateResponsePayload()645 IBufferReader CreateResponsePayload() 646 { 647 return fakeBufferReaderManager.CreateSingleSegmentBufferReader(Marshallers.StringMarshaller.Serializer("response1")); 648 } 649 CreateNullResponse()650 IBufferReader CreateNullResponse() 651 { 652 return fakeBufferReaderManager.CreateNullPayloadBufferReader(); 653 } 654 AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)655 static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask) 656 { 657 Assert.IsTrue(resultTask.IsCompleted); 658 Assert.IsTrue(fakeCall.IsDisposed); 659 660 Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); 661 Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); 662 Assert.AreEqual(0, asyncCall.GetTrailers().Count); 663 Assert.AreEqual("response1", resultTask.Result); 664 } 665 AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)666 static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask) 667 { 668 Assert.IsTrue(moveNextTask.IsCompleted); 669 Assert.IsTrue(fakeCall.IsDisposed); 670 671 Assert.IsFalse(moveNextTask.Result); 672 Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); 673 Assert.AreEqual(0, asyncCall.GetTrailers().Count); 674 } 675 AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)676 static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode) 677 { 678 Assert.IsTrue(resultTask.IsCompleted); 679 Assert.IsTrue(fakeCall.IsDisposed); 680 681 Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode); 682 var ex = Assert.ThrowsAsync<RpcException>(async () => await resultTask); 683 Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode); 684 Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); 685 Assert.AreEqual(0, asyncCall.GetTrailers().Count); 686 } 687 AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)688 static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode) 689 { 690 Assert.IsTrue(moveNextTask.IsCompleted); 691 Assert.IsTrue(fakeCall.IsDisposed); 692 693 var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask); 694 Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode); 695 Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode); 696 Assert.AreEqual(0, asyncCall.GetTrailers().Count); 697 } 698 } 699 } 700