• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Generic;
21 using System.Threading.Tasks;
22 
23 using Grpc.Core.Internal;
24 using Grpc.Core.Utils;
25 using NUnit.Framework;
26 
27 namespace Grpc.Core.Internal.Tests
28 {
29     /// <summary>
30     /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
31     /// </summary>
32     public class AsyncCallTest
33     {
34         Channel channel;
35         FakeNativeCall fakeCall;
36         AsyncCall<string, string> asyncCall;
37         FakeBufferReaderManager fakeBufferReaderManager;
38 
39         [SetUp]
Init()40         public void Init()
41         {
42             channel = new Channel("localhost", ChannelCredentials.Insecure);
43 
44             fakeCall = new FakeNativeCall();
45 
46             var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
47             asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
48             fakeBufferReaderManager = new FakeBufferReaderManager();
49         }
50 
51         [TearDown]
Cleanup()52         public void Cleanup()
53         {
54             channel.ShutdownAsync().Wait();
55             fakeBufferReaderManager.Dispose();
56         }
57 
58         [Test]
AsyncUnary_CanBeStartedOnlyOnce()59         public void AsyncUnary_CanBeStartedOnlyOnce()
60         {
61             asyncCall.UnaryCallAsync("request1");
62             Assert.Throws(typeof(InvalidOperationException),
63                 () => asyncCall.UnaryCallAsync("abc"));
64         }
65 
66         [Test]
AsyncUnary_StreamingOperationsNotAllowed()67         public void AsyncUnary_StreamingOperationsNotAllowed()
68         {
69             asyncCall.UnaryCallAsync("request1");
70             Assert.ThrowsAsync(typeof(InvalidOperationException),
71                 async () => await asyncCall.ReadMessageAsync());
72             Assert.Throws(typeof(InvalidOperationException),
73                 () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
74         }
75 
76         [Test]
AsyncUnary_Success()77         public void AsyncUnary_Success()
78         {
79             var resultTask = asyncCall.UnaryCallAsync("request1");
80             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
81                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
82                 CreateResponsePayload(),
83                 new Metadata());
84 
85             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
86         }
87 
88         [Test]
AsyncUnary_NonSuccessStatusCode()89         public void AsyncUnary_NonSuccessStatusCode()
90         {
91             var resultTask = asyncCall.UnaryCallAsync("request1");
92             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
93                 CreateClientSideStatus(StatusCode.InvalidArgument),
94                 CreateNullResponse(),
95                 new Metadata());
96 
97             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
98         }
99 
100         [Test]
AsyncUnary_NullResponsePayload()101         public void AsyncUnary_NullResponsePayload()
102         {
103             var resultTask = asyncCall.UnaryCallAsync("request1");
104             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
105                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
106                 null,
107                 new Metadata());
108 
109             // failure to deserialize will result in InvalidArgument status.
110             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
111         }
112 
113         [Test]
AsyncUnary_RequestSerializationExceptionDoesntLeakResources()114         public void AsyncUnary_RequestSerializationExceptionDoesntLeakResources()
115         {
116             string nullRequest = null;  // will throw when serializing
117             Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCallAsync(nullRequest));
118             Assert.AreEqual(0, channel.GetCallReferenceCount());
119             Assert.IsTrue(fakeCall.IsDisposed);
120         }
121 
122         [Test]
AsyncUnary_StartCallFailureDoesntLeakResources()123         public void AsyncUnary_StartCallFailureDoesntLeakResources()
124         {
125             fakeCall.MakeStartCallFail();
126             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCallAsync("request1"));
127             Assert.AreEqual(0, channel.GetCallReferenceCount());
128             Assert.IsTrue(fakeCall.IsDisposed);
129         }
130 
131         [Test]
SyncUnary_RequestSerializationExceptionDoesntLeakResources()132         public void SyncUnary_RequestSerializationExceptionDoesntLeakResources()
133         {
134             string nullRequest = null;  // will throw when serializing
135             Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCall(nullRequest));
136             Assert.AreEqual(0, channel.GetCallReferenceCount());
137             Assert.IsTrue(fakeCall.IsDisposed);
138         }
139 
140         [Test]
SyncUnary_StartCallFailureDoesntLeakResources()141         public void SyncUnary_StartCallFailureDoesntLeakResources()
142         {
143             fakeCall.MakeStartCallFail();
144             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCall("request1"));
145             Assert.AreEqual(0, channel.GetCallReferenceCount());
146             Assert.IsTrue(fakeCall.IsDisposed);
147         }
148 
149         [Test]
ClientStreaming_StreamingReadNotAllowed()150         public void ClientStreaming_StreamingReadNotAllowed()
151         {
152             asyncCall.ClientStreamingCallAsync();
153             Assert.ThrowsAsync(typeof(InvalidOperationException),
154                 async () => await asyncCall.ReadMessageAsync());
155         }
156 
157         [Test]
ClientStreaming_NoRequest_Success()158         public void ClientStreaming_NoRequest_Success()
159         {
160             var resultTask = asyncCall.ClientStreamingCallAsync();
161             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
162                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
163                 CreateResponsePayload(),
164                 new Metadata());
165 
166             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
167         }
168 
169         [Test]
ClientStreaming_NoRequest_NonSuccessStatusCode()170         public void ClientStreaming_NoRequest_NonSuccessStatusCode()
171         {
172             var resultTask = asyncCall.ClientStreamingCallAsync();
173             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
174                 CreateClientSideStatus(StatusCode.InvalidArgument),
175                 CreateNullResponse(),
176                 new Metadata());
177 
178             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
179         }
180 
181         [Test]
ClientStreaming_MoreRequests_Success()182         public void ClientStreaming_MoreRequests_Success()
183         {
184             var resultTask = asyncCall.ClientStreamingCallAsync();
185             var requestStream = new ClientRequestStream<string, string>(asyncCall);
186 
187             var writeTask = requestStream.WriteAsync("request1");
188             fakeCall.SendCompletionCallback.OnSendCompletion(true);
189             writeTask.Wait();
190 
191             var writeTask2 = requestStream.WriteAsync("request2");
192             fakeCall.SendCompletionCallback.OnSendCompletion(true);
193             writeTask2.Wait();
194 
195             var completeTask = requestStream.CompleteAsync();
196             fakeCall.SendCompletionCallback.OnSendCompletion(true);
197             completeTask.Wait();
198 
199             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
200                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
201                 CreateResponsePayload(),
202                 new Metadata());
203 
204             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
205         }
206 
207         [Test]
ClientStreaming_WriteFailureThrowsRpcException()208         public void ClientStreaming_WriteFailureThrowsRpcException()
209         {
210             var resultTask = asyncCall.ClientStreamingCallAsync();
211             var requestStream = new ClientRequestStream<string, string>(asyncCall);
212 
213             var writeTask = requestStream.WriteAsync("request1");
214             fakeCall.SendCompletionCallback.OnSendCompletion(false);
215 
216             // The write will wait for call to finish to receive the status code.
217             Assert.IsFalse(writeTask.IsCompleted);
218 
219             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
220                 CreateClientSideStatus(StatusCode.Internal),
221                 CreateNullResponse(),
222                 new Metadata());
223 
224             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
225             Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
226 
227             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
228         }
229 
230         [Test]
ClientStreaming_WriteFailureThrowsRpcException2()231         public void ClientStreaming_WriteFailureThrowsRpcException2()
232         {
233             var resultTask = asyncCall.ClientStreamingCallAsync();
234             var requestStream = new ClientRequestStream<string, string>(asyncCall);
235 
236             var writeTask = requestStream.WriteAsync("request1");
237 
238             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
239                 CreateClientSideStatus(StatusCode.Internal),
240                 CreateNullResponse(),
241                 new Metadata());
242 
243             fakeCall.SendCompletionCallback.OnSendCompletion(false);
244 
245             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
246             Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
247 
248             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
249         }
250 
251         [Test]
ClientStreaming_WriteFailureThrowsRpcException3()252         public void ClientStreaming_WriteFailureThrowsRpcException3()
253         {
254             var resultTask = asyncCall.ClientStreamingCallAsync();
255             var requestStream = new ClientRequestStream<string, string>(asyncCall);
256 
257             var writeTask = requestStream.WriteAsync("request1");
258             fakeCall.SendCompletionCallback.OnSendCompletion(false);
259 
260             // Until the delayed write completion has been triggered,
261             // we still act as if there was an active write.
262             Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
263 
264             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
265                 CreateClientSideStatus(StatusCode.Internal),
266                 CreateNullResponse(),
267                 new Metadata());
268 
269             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
270             Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
271 
272             // Following attempts to write keep delivering the same status
273             var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished"));
274             Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode);
275 
276             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
277         }
278 
279         [Test]
ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()280         public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
281         {
282             var resultTask = asyncCall.ClientStreamingCallAsync();
283             var requestStream = new ClientRequestStream<string, string>(asyncCall);
284 
285             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
286                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
287                 CreateResponsePayload(),
288                 new Metadata());
289 
290             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
291 
292             var writeTask = requestStream.WriteAsync("request1");
293             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
294             Assert.AreEqual(Status.DefaultSuccess, ex.Status);
295         }
296 
297         [Test]
ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()298         public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
299         {
300             var resultTask = asyncCall.ClientStreamingCallAsync();
301             var requestStream = new ClientRequestStream<string, string>(asyncCall);
302 
303             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
304                 new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
305                 CreateResponsePayload(),
306                 new Metadata());
307 
308             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
309 
310             var writeTask = requestStream.WriteAsync("request1");
311             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
312             Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
313         }
314 
315         [Test]
ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()316         public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
317         {
318             var resultTask = asyncCall.ClientStreamingCallAsync();
319             var requestStream = new ClientRequestStream<string, string>(asyncCall);
320 
321             requestStream.CompleteAsync();
322 
323             Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
324 
325             fakeCall.SendCompletionCallback.OnSendCompletion(true);
326 
327             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
328                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
329                 CreateResponsePayload(),
330                 new Metadata());
331 
332             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
333         }
334 
335         [Test]
ClientStreaming_CompleteAfterReceivingStatusSucceeds()336         public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
337         {
338             var resultTask = asyncCall.ClientStreamingCallAsync();
339             var requestStream = new ClientRequestStream<string, string>(asyncCall);
340 
341             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
342                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
343                 CreateResponsePayload(),
344                 new Metadata());
345 
346             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
347             Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
348         }
349 
350         [Test]
ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()351         public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
352         {
353             var resultTask = asyncCall.ClientStreamingCallAsync();
354             var requestStream = new ClientRequestStream<string, string>(asyncCall);
355 
356             asyncCall.Cancel();
357             Assert.IsTrue(fakeCall.IsCancelled);
358 
359             var writeTask = requestStream.WriteAsync("request1");
360             Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
361 
362             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
363                 CreateClientSideStatus(StatusCode.Cancelled),
364                 CreateNullResponse(),
365                 new Metadata());
366 
367             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
368         }
369 
370         [Test]
ClientStreaming_StartCallFailureDoesntLeakResources()371         public void ClientStreaming_StartCallFailureDoesntLeakResources()
372         {
373             fakeCall.MakeStartCallFail();
374             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.ClientStreamingCallAsync());
375             Assert.AreEqual(0, channel.GetCallReferenceCount());
376             Assert.IsTrue(fakeCall.IsDisposed);
377         }
378 
379         [Test]
ServerStreaming_StreamingSendNotAllowed()380         public void ServerStreaming_StreamingSendNotAllowed()
381         {
382             asyncCall.StartServerStreamingCall("request1");
383             Assert.Throws(typeof(InvalidOperationException),
384                 () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
385         }
386 
387         [Test]
ServerStreaming_NoResponse_Success1()388         public void ServerStreaming_NoResponse_Success1()
389         {
390             asyncCall.StartServerStreamingCall("request1");
391             var responseStream = new ClientResponseStream<string, string>(asyncCall);
392             var readTask = responseStream.MoveNext();
393 
394             fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
395             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
396 
397             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
398             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
399 
400             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
401         }
402 
403         [Test]
ServerStreaming_NoResponse_Success2()404         public void ServerStreaming_NoResponse_Success2()
405         {
406             asyncCall.StartServerStreamingCall("request1");
407             var responseStream = new ClientResponseStream<string, string>(asyncCall);
408             var readTask = responseStream.MoveNext();
409 
410             // try alternative order of completions
411             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
412             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
413 
414             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
415         }
416 
417         [Test]
ServerStreaming_NoResponse_ReadFailure()418         public void ServerStreaming_NoResponse_ReadFailure()
419         {
420             asyncCall.StartServerStreamingCall("request1");
421             var responseStream = new ClientResponseStream<string, string>(asyncCall);
422             var readTask = responseStream.MoveNext();
423 
424             fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse());  // after a failed read, we rely on C core to deliver appropriate status code.
425             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
426 
427             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
428         }
429 
430         [Test]
ServerStreaming_MoreResponses_Success()431         public void ServerStreaming_MoreResponses_Success()
432         {
433             asyncCall.StartServerStreamingCall("request1");
434             var responseStream = new ClientResponseStream<string, string>(asyncCall);
435 
436             var readTask1 = responseStream.MoveNext();
437             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
438             Assert.IsTrue(readTask1.Result);
439             Assert.AreEqual("response1", responseStream.Current);
440 
441             var readTask2 = responseStream.MoveNext();
442             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
443             Assert.IsTrue(readTask2.Result);
444             Assert.AreEqual("response1", responseStream.Current);
445 
446             var readTask3 = responseStream.MoveNext();
447             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
448             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
449 
450             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
451         }
452 
453         [Test]
ServerStreaming_RequestSerializationExceptionDoesntLeakResources()454         public void ServerStreaming_RequestSerializationExceptionDoesntLeakResources()
455         {
456             string nullRequest = null;  // will throw when serializing
457             Assert.Throws(typeof(ArgumentNullException), () => asyncCall.StartServerStreamingCall(nullRequest));
458             Assert.AreEqual(0, channel.GetCallReferenceCount());
459             Assert.IsTrue(fakeCall.IsDisposed);
460 
461             var responseStream = new ClientResponseStream<string, string>(asyncCall);
462             var readTask = responseStream.MoveNext();
463         }
464 
465         [Test]
ServerStreaming_StartCallFailureDoesntLeakResources()466         public void ServerStreaming_StartCallFailureDoesntLeakResources()
467         {
468             fakeCall.MakeStartCallFail();
469             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartServerStreamingCall("request1"));
470             Assert.AreEqual(0, channel.GetCallReferenceCount());
471             Assert.IsTrue(fakeCall.IsDisposed);
472         }
473 
474         [Test]
DuplexStreaming_NoRequestNoResponse_Success()475         public void DuplexStreaming_NoRequestNoResponse_Success()
476         {
477             asyncCall.StartDuplexStreamingCall();
478             var requestStream = new ClientRequestStream<string, string>(asyncCall);
479             var responseStream = new ClientResponseStream<string, string>(asyncCall);
480 
481             var writeTask1 = requestStream.CompleteAsync();
482             fakeCall.SendCompletionCallback.OnSendCompletion(true);
483             Assert.DoesNotThrowAsync(async () => await writeTask1);
484 
485             var readTask = responseStream.MoveNext();
486             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
487             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
488 
489             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
490         }
491 
492         [Test]
DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()493         public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
494         {
495             asyncCall.StartDuplexStreamingCall();
496             var requestStream = new ClientRequestStream<string, string>(asyncCall);
497             var responseStream = new ClientResponseStream<string, string>(asyncCall);
498 
499             var readTask = responseStream.MoveNext();
500             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
501             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
502 
503             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
504 
505             var writeTask = requestStream.WriteAsync("request1");
506             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
507             Assert.AreEqual(Status.DefaultSuccess, ex.Status);
508         }
509 
510         [Test]
DuplexStreaming_CompleteAfterReceivingStatusSuceeds()511         public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
512         {
513             asyncCall.StartDuplexStreamingCall();
514             var requestStream = new ClientRequestStream<string, string>(asyncCall);
515             var responseStream = new ClientResponseStream<string, string>(asyncCall);
516 
517             var readTask = responseStream.MoveNext();
518             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
519             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
520 
521             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
522 
523             Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
524         }
525 
526         [Test]
DuplexStreaming_WriteFailureThrowsRpcException()527         public void DuplexStreaming_WriteFailureThrowsRpcException()
528         {
529             asyncCall.StartDuplexStreamingCall();
530             var requestStream = new ClientRequestStream<string, string>(asyncCall);
531             var responseStream = new ClientResponseStream<string, string>(asyncCall);
532 
533             var writeTask = requestStream.WriteAsync("request1");
534             fakeCall.SendCompletionCallback.OnSendCompletion(false);
535 
536             // The write will wait for call to finish to receive the status code.
537             Assert.IsFalse(writeTask.IsCompleted);
538 
539             var readTask = responseStream.MoveNext();
540             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
541             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
542 
543             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
544             Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
545 
546             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
547         }
548 
549         [Test]
DuplexStreaming_WriteFailureThrowsRpcException2()550         public void DuplexStreaming_WriteFailureThrowsRpcException2()
551         {
552             asyncCall.StartDuplexStreamingCall();
553             var requestStream = new ClientRequestStream<string, string>(asyncCall);
554             var responseStream = new ClientResponseStream<string, string>(asyncCall);
555 
556             var writeTask = requestStream.WriteAsync("request1");
557 
558             var readTask = responseStream.MoveNext();
559             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
560             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
561             fakeCall.SendCompletionCallback.OnSendCompletion(false);
562 
563             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
564             Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
565 
566             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
567         }
568 
569         [Test]
DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()570         public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
571         {
572             asyncCall.StartDuplexStreamingCall();
573             var requestStream = new ClientRequestStream<string, string>(asyncCall);
574             var responseStream = new ClientResponseStream<string, string>(asyncCall);
575 
576             asyncCall.Cancel();
577             Assert.IsTrue(fakeCall.IsCancelled);
578 
579             var writeTask = requestStream.WriteAsync("request1");
580             Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
581 
582             var readTask = responseStream.MoveNext();
583             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
584             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
585 
586             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
587         }
588 
589         [Test]
DuplexStreaming_ReadAfterCancellationRequestCanSucceed()590         public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
591         {
592             asyncCall.StartDuplexStreamingCall();
593             var responseStream = new ClientResponseStream<string, string>(asyncCall);
594 
595             asyncCall.Cancel();
596             Assert.IsTrue(fakeCall.IsCancelled);
597 
598             var readTask1 = responseStream.MoveNext();
599             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
600             Assert.IsTrue(readTask1.Result);
601             Assert.AreEqual("response1", responseStream.Current);
602 
603             var readTask2 = responseStream.MoveNext();
604             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
605             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
606 
607             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
608         }
609 
610         [Test]
DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()611         public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
612         {
613             asyncCall.StartDuplexStreamingCall();
614             var responseStream = new ClientResponseStream<string, string>(asyncCall);
615 
616             var readTask1 = responseStream.MoveNext();  // initiate the read before cancel request
617             asyncCall.Cancel();
618             Assert.IsTrue(fakeCall.IsCancelled);
619 
620             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
621             Assert.IsTrue(readTask1.Result);
622             Assert.AreEqual("response1", responseStream.Current);
623 
624             var readTask2 = responseStream.MoveNext();
625             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
626             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
627 
628             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
629         }
630 
631         [Test]
DuplexStreaming_StartCallFailureDoesntLeakResources()632         public void DuplexStreaming_StartCallFailureDoesntLeakResources()
633         {
634             fakeCall.MakeStartCallFail();
635             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartDuplexStreamingCall());
636             Assert.AreEqual(0, channel.GetCallReferenceCount());
637             Assert.IsTrue(fakeCall.IsDisposed);
638         }
639 
CreateClientSideStatus(StatusCode statusCode)640         ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
641         {
642             return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
643         }
644 
CreateResponsePayload()645         IBufferReader CreateResponsePayload()
646         {
647             return fakeBufferReaderManager.CreateSingleSegmentBufferReader(Marshallers.StringMarshaller.Serializer("response1"));
648         }
649 
CreateNullResponse()650         IBufferReader CreateNullResponse()
651         {
652             return fakeBufferReaderManager.CreateNullPayloadBufferReader();
653         }
654 
AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)655         static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)
656         {
657             Assert.IsTrue(resultTask.IsCompleted);
658             Assert.IsTrue(fakeCall.IsDisposed);
659 
660             Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
661             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
662             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
663             Assert.AreEqual("response1", resultTask.Result);
664         }
665 
AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)666         static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
667         {
668             Assert.IsTrue(moveNextTask.IsCompleted);
669             Assert.IsTrue(fakeCall.IsDisposed);
670 
671             Assert.IsFalse(moveNextTask.Result);
672             Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
673             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
674         }
675 
AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)676         static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
677         {
678             Assert.IsTrue(resultTask.IsCompleted);
679             Assert.IsTrue(fakeCall.IsDisposed);
680 
681             Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
682             var ex = Assert.ThrowsAsync<RpcException>(async () => await resultTask);
683             Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
684             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
685             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
686         }
687 
AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)688         static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
689         {
690             Assert.IsTrue(moveNextTask.IsCompleted);
691             Assert.IsTrue(fakeCall.IsDisposed);
692 
693             var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
694             Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
695             Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
696             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
697         }
698     }
699 }
700