/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
19 #include <grpcpp/channel.h>
20 #include <grpcpp/client_context.h>
21 #include <grpcpp/create_channel.h>
22 #include <grpcpp/generic/generic_stub.h>
23 #include <grpcpp/impl/codegen/proto_utils.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/client_callback.h>
28 #include <gtest/gtest.h>
29 
30 #include <algorithm>
31 #include <condition_variable>
32 #include <functional>
33 #include <mutex>
34 #include <sstream>
35 #include <thread>
36 
37 #include "src/core/lib/gpr/env.h"
38 #include "src/core/lib/iomgr/iomgr.h"
39 #include "src/proto/grpc/testing/echo.grpc.pb.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42 #include "test/cpp/end2end/interceptors_util.h"
43 #include "test/cpp/end2end/test_service_impl.h"
44 #include "test/cpp/util/byte_buffer_proto_helper.h"
45 #include "test/cpp/util/string_ref_helper.h"
46 #include "test/cpp/util/test_credentials_provider.h"
47 
48 namespace grpc {
49 namespace testing {
50 namespace {
51 
// Transport placed between the test client and the test server.
enum class Protocol { INPROC, TCP };

// One combination of knobs under which the parameterized tests are run.
class TestScenario {
 public:
  // Stores the four settings verbatim; nothing is validated here.
  TestScenario(bool serve_callback, Protocol protocol, bool intercept,
               const std::string& creds_type)
      : callback_server{serve_callback},
        protocol{protocol},
        use_interceptors{intercept},
        credentials_type{creds_type} {}

  // Emits a one-line description of this scenario to the gRPC log.
  void Log() const;

  // True: the callback service is registered; false: the regular one.
  bool callback_server;
  // Transport used to reach the server.
  Protocol protocol;
  // True: dummy interceptors are installed on server and channel.
  bool use_interceptors;
  // Credentials type name handed to the credentials provider.
  const std::string credentials_type;
};
68 
operator <<(std::ostream & out,const TestScenario & scenario)69 static std::ostream& operator<<(std::ostream& out,
70                                 const TestScenario& scenario) {
71   return out << "TestScenario{callback_server="
72              << (scenario.callback_server ? "true" : "false") << ",protocol="
73              << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
74              << ",intercept=" << (scenario.use_interceptors ? "true" : "false")
75              << ",creds=" << scenario.credentials_type << "}";
76 }
77 
Log() const78 void TestScenario::Log() const {
79   std::ostringstream out;
80   out << *this;
81   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
82 }
83 
84 class ClientCallbackEnd2endTest
85     : public ::testing::TestWithParam<TestScenario> {
86  protected:
ClientCallbackEnd2endTest()87   ClientCallbackEnd2endTest() { GetParam().Log(); }
88 
SetUp()89   void SetUp() override {
90     ServerBuilder builder;
91 
92     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
93         GetParam().credentials_type);
94     // TODO(vjpai): Support testing of AuthMetadataProcessor
95 
96     if (GetParam().protocol == Protocol::TCP) {
97       picked_port_ = grpc_pick_unused_port_or_die();
98       server_address_ << "localhost:" << picked_port_;
99       builder.AddListeningPort(server_address_.str(), server_creds);
100     }
101     if (!GetParam().callback_server) {
102       builder.RegisterService(&service_);
103     } else {
104       builder.RegisterService(&callback_service_);
105     }
106 
107     if (GetParam().use_interceptors) {
108       std::vector<
109           std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
110           creators;
111       // Add 20 dummy server interceptors
112       creators.reserve(20);
113       for (auto i = 0; i < 20; i++) {
114         creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
115             new DummyInterceptorFactory()));
116       }
117       builder.experimental().SetInterceptorCreators(std::move(creators));
118     }
119 
120     server_ = builder.BuildAndStart();
121     is_server_started_ = true;
122   }
123 
ResetStub()124   void ResetStub() {
125     ChannelArguments args;
126     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
127         GetParam().credentials_type, &args);
128     switch (GetParam().protocol) {
129       case Protocol::TCP:
130         if (!GetParam().use_interceptors) {
131           channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
132                                                  channel_creds, args);
133         } else {
134           channel_ = CreateCustomChannelWithInterceptors(
135               server_address_.str(), channel_creds, args,
136               CreateDummyClientInterceptors());
137         }
138         break;
139       case Protocol::INPROC:
140         if (!GetParam().use_interceptors) {
141           channel_ = server_->InProcessChannel(args);
142         } else {
143           channel_ = server_->experimental().InProcessChannelWithInterceptors(
144               args, CreateDummyClientInterceptors());
145         }
146         break;
147       default:
148         assert(false);
149     }
150     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
151     generic_stub_.reset(new GenericStub(channel_));
152     DummyInterceptor::Reset();
153   }
154 
TearDown()155   void TearDown() override {
156     if (is_server_started_) {
157       // Although we would normally do an explicit shutdown, the server
158       // should also work correctly with just a destructor call. The regular
159       // end2end test uses explicit shutdown, so let this one just do reset.
160       server_.reset();
161     }
162     if (picked_port_ > 0) {
163       grpc_recycle_unused_port(picked_port_);
164     }
165   }
166 
SendRpcs(int num_rpcs,bool with_binary_metadata)167   void SendRpcs(int num_rpcs, bool with_binary_metadata) {
168     std::string test_string("");
169     for (int i = 0; i < num_rpcs; i++) {
170       EchoRequest request;
171       EchoResponse response;
172       ClientContext cli_ctx;
173 
174       test_string += "Hello world. ";
175       request.set_message(test_string);
176       std::string val;
177       if (with_binary_metadata) {
178         request.mutable_param()->set_echo_metadata(true);
179         char bytes[8] = {'\0', '\1', '\2', '\3',
180                          '\4', '\5', '\6', static_cast<char>(i)};
181         val = std::string(bytes, 8);
182         cli_ctx.AddMetadata("custom-bin", val);
183       }
184 
185       cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
186 
187       std::mutex mu;
188       std::condition_variable cv;
189       bool done = false;
190       stub_->experimental_async()->Echo(
191           &cli_ctx, &request, &response,
192           [&cli_ctx, &request, &response, &done, &mu, &cv, val,
193            with_binary_metadata](Status s) {
194             GPR_ASSERT(s.ok());
195 
196             EXPECT_EQ(request.message(), response.message());
197             if (with_binary_metadata) {
198               EXPECT_EQ(
199                   1u, cli_ctx.GetServerTrailingMetadata().count("custom-bin"));
200               EXPECT_EQ(val, ToString(cli_ctx.GetServerTrailingMetadata()
201                                           .find("custom-bin")
202                                           ->second));
203             }
204             std::lock_guard<std::mutex> l(mu);
205             done = true;
206             cv.notify_one();
207           });
208       std::unique_lock<std::mutex> l(mu);
209       while (!done) {
210         cv.wait(l);
211       }
212     }
213   }
214 
SendRpcsRawReq(int num_rpcs)215   void SendRpcsRawReq(int num_rpcs) {
216     std::string test_string("Hello raw world.");
217     EchoRequest request;
218     request.set_message(test_string);
219     std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
220 
221     for (int i = 0; i < num_rpcs; i++) {
222       EchoResponse response;
223       ClientContext cli_ctx;
224 
225       std::mutex mu;
226       std::condition_variable cv;
227       bool done = false;
228       stub_->experimental_async()->Echo(
229           &cli_ctx, send_buf.get(), &response,
230           [&request, &response, &done, &mu, &cv](Status s) {
231             GPR_ASSERT(s.ok());
232 
233             EXPECT_EQ(request.message(), response.message());
234             std::lock_guard<std::mutex> l(mu);
235             done = true;
236             cv.notify_one();
237           });
238       std::unique_lock<std::mutex> l(mu);
239       while (!done) {
240         cv.wait(l);
241       }
242     }
243   }
244 
SendRpcsGeneric(int num_rpcs,bool maybe_except)245   void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
246     const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
247     std::string test_string("");
248     for (int i = 0; i < num_rpcs; i++) {
249       EchoRequest request;
250       std::unique_ptr<ByteBuffer> send_buf;
251       ByteBuffer recv_buf;
252       ClientContext cli_ctx;
253 
254       test_string += "Hello world. ";
255       request.set_message(test_string);
256       send_buf = SerializeToByteBuffer(&request);
257 
258       std::mutex mu;
259       std::condition_variable cv;
260       bool done = false;
261       generic_stub_->experimental().UnaryCall(
262           &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
263           [&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
264             GPR_ASSERT(s.ok());
265 
266             EchoResponse response;
267             EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
268             EXPECT_EQ(request.message(), response.message());
269             std::lock_guard<std::mutex> l(mu);
270             done = true;
271             cv.notify_one();
272 #if GRPC_ALLOW_EXCEPTIONS
273             if (maybe_except) {
274               throw - 1;
275             }
276 #else
277             GPR_ASSERT(!maybe_except);
278 #endif
279           });
280       std::unique_lock<std::mutex> l(mu);
281       while (!done) {
282         cv.wait(l);
283       }
284     }
285   }
286 
SendGenericEchoAsBidi(int num_rpcs,int reuses,bool do_writes_done)287   void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
288     const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
289     std::string test_string("");
290     for (int i = 0; i < num_rpcs; i++) {
291       test_string += "Hello world. ";
292       class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
293                                                                   ByteBuffer> {
294        public:
295         Client(ClientCallbackEnd2endTest* test, const std::string& method_name,
296                const std::string& test_str, int reuses, bool do_writes_done)
297             : reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
298           activate_ = [this, test, method_name, test_str] {
299             if (reuses_remaining_ > 0) {
300               cli_ctx_.reset(new ClientContext);
301               reuses_remaining_--;
302               test->generic_stub_->experimental().PrepareBidiStreamingCall(
303                   cli_ctx_.get(), method_name, this);
304               request_.set_message(test_str);
305               send_buf_ = SerializeToByteBuffer(&request_);
306               StartWrite(send_buf_.get());
307               StartRead(&recv_buf_);
308               StartCall();
309             } else {
310               std::unique_lock<std::mutex> l(mu_);
311               done_ = true;
312               cv_.notify_one();
313             }
314           };
315           activate_();
316         }
317         void OnWriteDone(bool /*ok*/) override {
318           if (do_writes_done_) {
319             StartWritesDone();
320           }
321         }
322         void OnReadDone(bool /*ok*/) override {
323           EchoResponse response;
324           EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
325           EXPECT_EQ(request_.message(), response.message());
326         };
327         void OnDone(const Status& s) override {
328           EXPECT_TRUE(s.ok());
329           activate_();
330         }
331         void Await() {
332           std::unique_lock<std::mutex> l(mu_);
333           while (!done_) {
334             cv_.wait(l);
335           }
336         }
337 
338         EchoRequest request_;
339         std::unique_ptr<ByteBuffer> send_buf_;
340         ByteBuffer recv_buf_;
341         std::unique_ptr<ClientContext> cli_ctx_;
342         int reuses_remaining_;
343         std::function<void()> activate_;
344         std::mutex mu_;
345         std::condition_variable cv_;
346         bool done_ = false;
347         const bool do_writes_done_;
348       };
349 
350       Client rpc(this, kMethodName, test_string, reuses, do_writes_done);
351 
352       rpc.Await();
353     }
354   }
355   bool is_server_started_{false};
356   int picked_port_{0};
357   std::shared_ptr<Channel> channel_;
358   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
359   std::unique_ptr<grpc::GenericStub> generic_stub_;
360   TestServiceImpl service_;
361   CallbackTestServiceImpl callback_service_;
362   std::unique_ptr<Server> server_;
363   std::ostringstream server_address_;
364 };
365 
// A single unary callback RPC without binary metadata.
TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
  ResetStub();
  SendRpcs(/*num_rpcs=*/1, /*with_binary_metadata=*/false);
}
370 
// Asks the server to fail the RPC with a specific status and verifies that
// the callback observes exactly that code and message and an empty response.
TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) {
  ResetStub();

  ErrorStatus error_status;
  error_status.set_code(1);  // CANCELLED
  error_status.set_error_message("cancel error message");

  EchoRequest request;
  request.set_message("Hello failure");
  *request.mutable_param()->mutable_expected_error() = error_status;

  EchoResponse response;
  ClientContext cli_ctx;

  std::mutex mu;
  std::condition_variable cv;
  bool done = false;

  stub_->experimental_async()->Echo(
      &cli_ctx, &request, &response,
      [&response, &done, &mu, &cv, &error_status](Status s) {
        EXPECT_EQ("", response.message());
        EXPECT_EQ(error_status.code(), s.error_code());
        EXPECT_EQ(error_status.error_message(), s.error_message());
        std::lock_guard<std::mutex> guard(mu);
        done = true;
        cv.notify_one();
      });

  // Block until the completion callback has run.
  std::unique_lock<std::mutex> waiter(mu);
  cv.wait(waiter, [&done] { return done; });
}
404 
// Starts three Echo RPCs, each from inside the completion callback of the
// previous one. Every RPC is started while holding the very mutex its own
// callback locks first, so a callback can only make progress once the code
// that started it has left its locked scope.
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) {
  ResetStub();
  std::mutex mu1;
  std::mutex mu2;
  std::mutex mu3;
  std::condition_variable cv;
  bool done = false;
  EchoRequest request1;
  EchoRequest request2;
  EchoRequest request3;
  request1.set_message("Hello locked world1.");
  request2.set_message("Hello locked world2.");
  request3.set_message("Hello locked world3.");
  EchoResponse response1;
  EchoResponse response2;
  EchoResponse response3;
  ClientContext cli_ctx1;
  ClientContext cli_ctx2;
  ClientContext cli_ctx3;
  {
    std::lock_guard<std::mutex> start_guard(mu1);
    stub_->experimental_async()->Echo(
        &cli_ctx1, &request1, &response1,
        [this, &mu1, &mu2, &mu3, &cv, &done, &request1, &request2, &request3,
         &response1, &response2, &response3, &cli_ctx2, &cli_ctx3](Status s1) {
          std::lock_guard<std::mutex> level1_guard(mu1);
          EXPECT_TRUE(s1.ok());
          EXPECT_EQ(request1.message(), response1.message());
          // start the second level of nesting
          std::unique_lock<std::mutex> level2_start_guard(mu2);
          this->stub_->experimental_async()->Echo(
              &cli_ctx2, &request2, &response2,
              [this, &mu2, &mu3, &cv, &done, &request2, &request3, &response2,
               &response3, &cli_ctx3](Status s2) {
                std::lock_guard<std::mutex> level2_guard(mu2);
                EXPECT_TRUE(s2.ok());
                EXPECT_EQ(request2.message(), response2.message());
                // start the third level of nesting
                std::lock_guard<std::mutex> level3_start_guard(mu3);
                stub_->experimental_async()->Echo(
                    &cli_ctx3, &request3, &response3,
                    [&mu3, &cv, &done, &request3, &response3](Status s3) {
                      std::lock_guard<std::mutex> level3_guard(mu3);
                      EXPECT_TRUE(s3.ok());
                      EXPECT_EQ(request3.message(), response3.message());
                      done = true;
                      cv.notify_all();
                    });
              });
        });
  }

  // `done` is written under mu3 by the innermost callback.
  std::unique_lock<std::mutex> waiter(mu3);
  cv.wait(waiter, [&done] { return done; });
}
454 
// Starts one Echo RPC while holding the mutex that its completion callback
// locks, then waits on that same mutex for the callback to finish.
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) {
  ResetStub();
  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  EchoRequest request;
  request.set_message("Hello locked world.");
  EchoResponse response;
  ClientContext cli_ctx;
  {
    std::lock_guard<std::mutex> start_guard(mu);
    stub_->experimental_async()->Echo(
        &cli_ctx, &request, &response,
        [&mu, &cv, &done, &request, &response](Status s) {
          std::lock_guard<std::mutex> callback_guard(mu);
          EXPECT_TRUE(s.ok());
          EXPECT_EQ(request.message(), response.message());
          done = true;
          cv.notify_one();
        });
  }
  std::unique_lock<std::mutex> waiter(mu);
  cv.wait(waiter, [&done] { return done; });
}
481 
// Ten unary callback RPCs, one after another, without binary metadata.
TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
  ResetStub();
  SendRpcs(/*num_rpcs=*/10, /*with_binary_metadata=*/false);
}
486 
// Ten sequential RPCs whose request is passed as a pre-serialized buffer.
TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) {
  ResetStub();
  SendRpcsRawReq(/*num_rpcs=*/10);
}
491 
// Attaches the well-known check key/value to the client's initial metadata
// and requires CheckClientInitialMetadata to complete with an OK status.
TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
  ResetStub();
  SimpleRequest request;
  SimpleResponse response;
  ClientContext cli_ctx;

  cli_ctx.AddMetadata(kCheckClientInitialMetadataKey,
                      kCheckClientInitialMetadataVal);

  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  stub_->experimental_async()->CheckClientInitialMetadata(
      &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) {
        GPR_ASSERT(s.ok());

        std::lock_guard<std::mutex> guard(mu);
        done = true;
        cv.notify_one();
      });

  // Block until the completion callback has run.
  std::unique_lock<std::mutex> waiter(mu);
  cv.wait(waiter, [&done] { return done; });
}
517 
// A single unary callback RPC carrying the binary "custom-bin" metadata.
TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) {
  ResetStub();
  SendRpcs(/*num_rpcs=*/1, /*with_binary_metadata=*/true);
}
522 
// Ten sequential RPCs; the binary metadata value differs per RPC because its
// last byte is the RPC index.
TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  SendRpcs(/*num_rpcs=*/10, /*with_binary_metadata=*/true);
}
527 
// Ten sequential unary RPCs issued through the generic stub.
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
  ResetStub();
  SendRpcsGeneric(/*num_rpcs=*/10, /*maybe_except=*/false);
}
532 
// Ten Echo calls driven as generic bidi streams, one call per reactor.
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
  ResetStub();
  SendGenericEchoAsBidi(/*num_rpcs=*/10, /*reuses=*/1,
                        /*do_writes_done=*/true);
}
537 
// Ten reactors, each of which is reused for ten consecutive bidi calls.
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
  ResetStub();
  SendGenericEchoAsBidi(/*num_rpcs=*/10, /*reuses=*/10,
                        /*do_writes_done=*/true);
}
542 
// One generic bidi Echo call in which the client never calls
// StartWritesDone().
TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
  ResetStub();
  SendGenericEchoAsBidi(/*num_rpcs=*/1, /*reuses=*/1,
                        /*do_writes_done=*/false);
}
547 
#if GRPC_ALLOW_EXCEPTIONS
// Ten generic unary RPCs whose completion callbacks throw after signalling
// completion. Only compiled when exceptions are allowed.
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
  ResetStub();
  SendRpcsGeneric(/*num_rpcs=*/10, /*maybe_except=*/true);
}
#endif
554 
// Ten threads, each sending ten sequential RPCs with binary metadata over
// the shared stub.
TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  constexpr int kNumThreads = 10;
  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int n = 0; n < kNumThreads; ++n) {
    workers.emplace_back([this] { SendRpcs(10, true); });
  }
  for (std::thread& worker : workers) {
    worker.join();
  }
}
566 
// Ten threads, each sending ten sequential RPCs without binary metadata over
// the shared stub.
TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) {
  ResetStub();
  constexpr int kNumThreads = 10;
  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int n = 0; n < kNumThreads; ++n) {
    workers.emplace_back([this] { SendRpcs(10, false); });
  }
  for (std::thread& worker : workers) {
    worker.join();
  }
}
578 
// Cancels the ClientContext before the RPC is even started and expects the
// callback to report CANCELLED with an empty response.
TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");
  context.TryCancel();

  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  stub_->experimental_async()->Echo(
      &context, &request, &response, [&response, &done, &mu, &cv](Status s) {
        EXPECT_EQ("", response.message());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        std::lock_guard<std::mutex> guard(mu);
        done = true;
        cv.notify_one();
      });

  std::unique_lock<std::mutex> waiter(mu);
  cv.wait(waiter, [&done] { return done; });

  // With interceptors enabled, 20 cancel notifications must be recorded.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
606 
// Requests (via metadata) that the server cancel before processing and
// expects the client callback to see a CANCELLED status.
TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");
  context.AddMetadata(kServerTryCancelRequest,
                      std::to_string(CANCEL_BEFORE_PROCESSING));

  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  stub_->experimental_async()->Echo(
      &context, &request, &response, [&done, &mu, &cv](Status s) {
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        std::lock_guard<std::mutex> guard(mu);
        done = true;
        cv.notify_one();
      });

  // Block until the completion callback has run.
  std::unique_lock<std::mutex> waiter(mu);
  cv.wait(waiter, [&done] { return done; });
}
632 
// Describes whether, and after how many operations, the client side of a
// streaming RPC should cancel the call.
struct ClientCancelInfo {
  // True when the client is expected to cancel the call itself.
  bool cancel{false};
  // Operation count at which the client cancels; only consulted when
  // `cancel` is true. Zero-initialized so that a default-constructed object
  // never holds (or copies) an indeterminate value.
  int ops_before_cancel{0};

  // No client-side cancellation.
  ClientCancelInfo() = default;
  // Cancel once `ops` operations have been performed.
  explicit ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
};
640 
641 class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
642  public:
WriteClient(grpc::testing::EchoTestService::Stub * stub,ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send,ClientCancelInfo client_cancel={})643   WriteClient(grpc::testing::EchoTestService::Stub* stub,
644               ServerTryCancelRequestPhase server_try_cancel,
645               int num_msgs_to_send, ClientCancelInfo client_cancel = {})
646       : server_try_cancel_(server_try_cancel),
647         num_msgs_to_send_(num_msgs_to_send),
648         client_cancel_{client_cancel} {
649     std::string msg{"Hello server."};
650     for (int i = 0; i < num_msgs_to_send; i++) {
651       desired_ += msg;
652     }
653     if (server_try_cancel != DO_NOT_CANCEL) {
654       // Send server_try_cancel value in the client metadata
655       context_.AddMetadata(kServerTryCancelRequest,
656                            std::to_string(server_try_cancel));
657     }
658     context_.set_initial_metadata_corked(true);
659     stub->experimental_async()->RequestStream(&context_, &response_, this);
660     StartCall();
661     request_.set_message(msg);
662     MaybeWrite();
663   }
OnWriteDone(bool ok)664   void OnWriteDone(bool ok) override {
665     if (ok) {
666       num_msgs_sent_++;
667       MaybeWrite();
668     }
669   }
OnDone(const Status & s)670   void OnDone(const Status& s) override {
671     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
672     int num_to_send =
673         (client_cancel_.cancel)
674             ? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
675             : num_msgs_to_send_;
676     switch (server_try_cancel_) {
677       case CANCEL_BEFORE_PROCESSING:
678       case CANCEL_DURING_PROCESSING:
679         // If the RPC is canceled by server before / during messages from the
680         // client, it means that the client most likely did not get a chance to
681         // send all the messages it wanted to send. i.e num_msgs_sent <=
682         // num_msgs_to_send
683         EXPECT_LE(num_msgs_sent_, num_to_send);
684         break;
685       case DO_NOT_CANCEL:
686       case CANCEL_AFTER_PROCESSING:
687         // If the RPC was not canceled or canceled after all messages were read
688         // by the server, the client did get a chance to send all its messages
689         EXPECT_EQ(num_msgs_sent_, num_to_send);
690         break;
691       default:
692         assert(false);
693         break;
694     }
695     if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
696       EXPECT_TRUE(s.ok());
697       EXPECT_EQ(response_.message(), desired_);
698     } else {
699       EXPECT_FALSE(s.ok());
700       EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
701     }
702     std::unique_lock<std::mutex> l(mu_);
703     done_ = true;
704     cv_.notify_one();
705   }
Await()706   void Await() {
707     std::unique_lock<std::mutex> l(mu_);
708     while (!done_) {
709       cv_.wait(l);
710     }
711   }
712 
713  private:
MaybeWrite()714   void MaybeWrite() {
715     if (client_cancel_.cancel &&
716         num_msgs_sent_ == client_cancel_.ops_before_cancel) {
717       context_.TryCancel();
718     } else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
719       StartWrite(&request_);
720     } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
721       StartWriteLast(&request_, WriteOptions());
722     }
723   }
724   EchoRequest request_;
725   EchoResponse response_;
726   ClientContext context_;
727   const ServerTryCancelRequestPhase server_try_cancel_;
728   int num_msgs_sent_{0};
729   const int num_msgs_to_send_;
730   std::string desired_;
731   const ClientCancelInfo client_cancel_;
732   std::mutex mu_;
733   std::condition_variable cv_;
734   bool done_ = false;
735 };
736 
// Streams three messages with no cancellation on either side.
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
  ResetStub();
  WriteClient test{stub_.get(), DO_NOT_CANCEL, /*num_msgs_to_send=*/3};
  test.Await();
  // Make sure that the server interceptors were not notified to cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
746 
// The client wants to stream three messages but cancels after two.
TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
  ResetStub();
  WriteClient test{stub_.get(), DO_NOT_CANCEL, /*num_msgs_to_send=*/3,
                   ClientCancelInfo{2}};
  test.Await();
  // Make sure that the server interceptors got the cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
756 
757 // Server to cancel before doing reading the request
TEST_P(ClientCallbackEnd2endTest,RequestStreamServerCancelBeforeReads)758 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
759   ResetStub();
760   WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
761   test.Await();
762   // Make sure that the server interceptors were notified
763   if (GetParam().use_interceptors) {
764     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
765   }
766 }
767 
768 // Server to cancel while reading a request from the stream in parallel
TEST_P(ClientCallbackEnd2endTest,RequestStreamServerCancelDuringRead)769 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
770   ResetStub();
771   WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
772   test.Await();
773   // Make sure that the server interceptors were notified
774   if (GetParam().use_interceptors) {
775     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
776   }
777 }
778 
779 // Server to cancel after reading all the requests but before returning to the
780 // client
TEST_P(ClientCallbackEnd2endTest,RequestStreamServerCancelAfterReads)781 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
782   ResetStub();
783   WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
784   test.Await();
785   // Make sure that the server interceptors were notified
786   if (GetParam().use_interceptors) {
787     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
788   }
789 }
790 
// Drives a unary Echo call through a ClientUnaryReactor. The request asks the
// server to echo the client metadata in its initial metadata; the reactor
// checks that metadata when it arrives and the status/response in OnDone().
TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
  ResetStub();
  class UnaryClient : public grpc::experimental::ClientUnaryReactor {
   public:
    // `explicit` so that a stub pointer never converts to a reactor
    // implicitly. Prepares the request and starts the call right away.
    explicit UnaryClient(grpc::testing::EchoTestService::Stub* stub) {
      cli_ctx_.AddMetadata("key1", "val1");
      cli_ctx_.AddMetadata("key2", "val2");
      request_.mutable_param()->set_echo_metadata_initially(true);
      request_.set_message("Hello metadata");
      stub->experimental_async()->Echo(&cli_ctx_, &request_, &response_, this);
      StartCall();
    }

    // Both client metadata entries must come back exactly once each.
    void OnReadInitialMetadataDone(bool ok) override {
      EXPECT_TRUE(ok);
      const auto& initial_md = cli_ctx_.GetServerInitialMetadata();
      EXPECT_EQ(1u, initial_md.count("key1"));
      EXPECT_EQ("val1", ToString(initial_md.find("key1")->second));
      EXPECT_EQ(1u, initial_md.count("key2"));
      EXPECT_EQ("val2", ToString(initial_md.find("key2")->second));
      initial_metadata_done_ = true;
    }

    // Initial metadata must already have been seen; no trailing metadata is
    // expected; the call must succeed and echo the message.
    void OnDone(const Status& s) override {
      EXPECT_TRUE(initial_metadata_done_);
      EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
      EXPECT_TRUE(s.ok());
      EXPECT_EQ(request_.message(), response_.message());
      std::lock_guard<std::mutex> guard(mu_);
      done_ = true;
      cv_.notify_one();
    }

    // Blocks until OnDone() has run.
    void Await() {
      std::unique_lock<std::mutex> waiter(mu_);
      cv_.wait(waiter, [this] { return done_; });
    }

   private:
    EchoRequest request_;
    EchoResponse response_;
    ClientContext cli_ctx_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool done_{false};
    bool initial_metadata_done_{false};
  };

  UnaryClient test{stub_.get()};
  test.Await();
  // Make sure that the server interceptors were not notified of a cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
848 
TEST_P(ClientCallbackEnd2endTest,GenericUnaryReactor)849 TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
850   ResetStub();
851   const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
852   class UnaryClient : public grpc::experimental::ClientUnaryReactor {
853    public:
854     UnaryClient(grpc::GenericStub* stub, const std::string& method_name) {
855       cli_ctx_.AddMetadata("key1", "val1");
856       cli_ctx_.AddMetadata("key2", "val2");
857       request_.mutable_param()->set_echo_metadata_initially(true);
858       request_.set_message("Hello metadata");
859       send_buf_ = SerializeToByteBuffer(&request_);
860 
861       stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name,
862                                             send_buf_.get(), &recv_buf_, this);
863       StartCall();
864     }
865     void OnReadInitialMetadataDone(bool ok) override {
866       EXPECT_TRUE(ok);
867       EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
868       EXPECT_EQ(
869           "val1",
870           ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
871       EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
872       EXPECT_EQ(
873           "val2",
874           ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
875       initial_metadata_done_ = true;
876     }
877     void OnDone(const Status& s) override {
878       EXPECT_TRUE(initial_metadata_done_);
879       EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
880       EXPECT_TRUE(s.ok());
881       EchoResponse response;
882       EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
883       EXPECT_EQ(request_.message(), response.message());
884       std::unique_lock<std::mutex> l(mu_);
885       done_ = true;
886       cv_.notify_one();
887     }
888     void Await() {
889       std::unique_lock<std::mutex> l(mu_);
890       while (!done_) {
891         cv_.wait(l);
892       }
893     }
894 
895    private:
896     EchoRequest request_;
897     std::unique_ptr<ByteBuffer> send_buf_;
898     ByteBuffer recv_buf_;
899     ClientContext cli_ctx_;
900     std::mutex mu_;
901     std::condition_variable cv_;
902     bool done_{false};
903     bool initial_metadata_done_{false};
904   };
905 
906   UnaryClient test{generic_stub_.get(), kMethodName};
907   test.Await();
908   // Make sure that the server interceptors were not notified of a cancel
909   if (GetParam().use_interceptors) {
910     EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
911   }
912 }
913 
914 class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
915  public:
ReadClient(grpc::testing::EchoTestService::Stub * stub,ServerTryCancelRequestPhase server_try_cancel,ClientCancelInfo client_cancel={})916   ReadClient(grpc::testing::EchoTestService::Stub* stub,
917              ServerTryCancelRequestPhase server_try_cancel,
918              ClientCancelInfo client_cancel = {})
919       : server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
920     if (server_try_cancel_ != DO_NOT_CANCEL) {
921       // Send server_try_cancel value in the client metadata
922       context_.AddMetadata(kServerTryCancelRequest,
923                            std::to_string(server_try_cancel));
924     }
925     request_.set_message("Hello client ");
926     stub->experimental_async()->ResponseStream(&context_, &request_, this);
927     if (client_cancel_.cancel &&
928         reads_complete_ == client_cancel_.ops_before_cancel) {
929       context_.TryCancel();
930     }
931     // Even if we cancel, read until failure because there might be responses
932     // pending
933     StartRead(&response_);
934     StartCall();
935   }
OnReadDone(bool ok)936   void OnReadDone(bool ok) override {
937     if (!ok) {
938       if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
939         EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
940       }
941     } else {
942       EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
943       EXPECT_EQ(response_.message(),
944                 request_.message() + std::to_string(reads_complete_));
945       reads_complete_++;
946       if (client_cancel_.cancel &&
947           reads_complete_ == client_cancel_.ops_before_cancel) {
948         context_.TryCancel();
949       }
950       // Even if we cancel, read until failure because there might be responses
951       // pending
952       StartRead(&response_);
953     }
954   }
OnDone(const Status & s)955   void OnDone(const Status& s) override {
956     gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
957     switch (server_try_cancel_) {
958       case DO_NOT_CANCEL:
959         if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
960                                           kServerDefaultResponseStreamsToSend) {
961           EXPECT_TRUE(s.ok());
962           EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
963         } else {
964           EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
965           EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
966           // Status might be ok or cancelled depending on whether server
967           // sent status before client cancel went through
968           if (!s.ok()) {
969             EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
970           }
971         }
972         break;
973       case CANCEL_BEFORE_PROCESSING:
974         EXPECT_FALSE(s.ok());
975         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
976         EXPECT_EQ(reads_complete_, 0);
977         break;
978       case CANCEL_DURING_PROCESSING:
979       case CANCEL_AFTER_PROCESSING:
980         // If server canceled while writing messages, client must have read
981         // less than or equal to the expected number of messages. Even if the
982         // server canceled after writing all messages, the RPC may be canceled
983         // before the Client got a chance to read all the messages.
984         EXPECT_FALSE(s.ok());
985         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
986         EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
987         break;
988       default:
989         assert(false);
990     }
991     std::unique_lock<std::mutex> l(mu_);
992     done_ = true;
993     cv_.notify_one();
994   }
Await()995   void Await() {
996     std::unique_lock<std::mutex> l(mu_);
997     while (!done_) {
998       cv_.wait(l);
999     }
1000   }
1001 
1002  private:
1003   EchoRequest request_;
1004   EchoResponse response_;
1005   ClientContext context_;
1006   const ServerTryCancelRequestPhase server_try_cancel_;
1007   int reads_complete_{0};
1008   const ClientCancelInfo client_cancel_;
1009   std::mutex mu_;
1010   std::condition_variable cv_;
1011   bool done_ = false;
1012 };
1013 
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
  ResetStub();
  // The reactor starts the server-streaming RPC from its constructor; wait
  // here until its OnDone callback has run.
  ReadClient reader(stub_.get(), DO_NOT_CANCEL);
  reader.Await();
  if (!GetParam().use_interceptors) {
    return;
  }
  // Nothing cancelled this RPC, so the server interceptors must not have
  // been notified of a cancel.
  EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
1023 
TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
  ResetStub();
  // The client calls TryCancel() once two reads have completed.
  const ClientCancelInfo cancel_after_two_reads{2};
  ReadClient reader(stub_.get(), DO_NOT_CANCEL, cancel_after_two_reads);
  reader.Await();
  // No interceptor expectation here: the cancel races with the server
  // finishing, so the server interceptors may never observe a cancellation.
}
1031 
1032 // Server to cancel before sending any response messages
TEST_P(ClientCallbackEnd2endTest,ResponseStreamServerCancelBefore)1033 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
1034   ResetStub();
1035   ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
1036   test.Await();
1037   // Make sure that the server interceptors were notified
1038   if (GetParam().use_interceptors) {
1039     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1040   }
1041 }
1042 
1043 // Server to cancel while writing a response to the stream in parallel
TEST_P(ClientCallbackEnd2endTest,ResponseStreamServerCancelDuring)1044 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
1045   ResetStub();
1046   ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
1047   test.Await();
1048   // Make sure that the server interceptors were notified
1049   if (GetParam().use_interceptors) {
1050     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1051   }
1052 }
1053 
1054 // Server to cancel after writing all the respones to the stream but before
1055 // returning to the client
TEST_P(ClientCallbackEnd2endTest,ResponseStreamServerCancelAfter)1056 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
1057   ResetStub();
1058   ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
1059   test.Await();
1060   // Make sure that the server interceptors were notified
1061   if (GetParam().use_interceptors) {
1062     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1063   }
1064 }
1065 
1066 class BidiClient
1067     : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
1068  public:
BidiClient(grpc::testing::EchoTestService::Stub * stub,ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send,bool cork_metadata,bool first_write_async,ClientCancelInfo client_cancel={})1069   BidiClient(grpc::testing::EchoTestService::Stub* stub,
1070              ServerTryCancelRequestPhase server_try_cancel,
1071              int num_msgs_to_send, bool cork_metadata, bool first_write_async,
1072              ClientCancelInfo client_cancel = {})
1073       : server_try_cancel_(server_try_cancel),
1074         msgs_to_send_{num_msgs_to_send},
1075         client_cancel_{client_cancel} {
1076     if (server_try_cancel_ != DO_NOT_CANCEL) {
1077       // Send server_try_cancel value in the client metadata
1078       context_.AddMetadata(kServerTryCancelRequest,
1079                            std::to_string(server_try_cancel));
1080     }
1081     request_.set_message("Hello fren ");
1082     context_.set_initial_metadata_corked(cork_metadata);
1083     stub->experimental_async()->BidiStream(&context_, this);
1084     MaybeAsyncWrite(first_write_async);
1085     StartRead(&response_);
1086     StartCall();
1087   }
OnReadDone(bool ok)1088   void OnReadDone(bool ok) override {
1089     if (!ok) {
1090       if (server_try_cancel_ == DO_NOT_CANCEL) {
1091         if (!client_cancel_.cancel) {
1092           EXPECT_EQ(reads_complete_, msgs_to_send_);
1093         } else {
1094           EXPECT_LE(reads_complete_, writes_complete_);
1095         }
1096       }
1097     } else {
1098       EXPECT_LE(reads_complete_, msgs_to_send_);
1099       EXPECT_EQ(response_.message(), request_.message());
1100       reads_complete_++;
1101       StartRead(&response_);
1102     }
1103   }
OnWriteDone(bool ok)1104   void OnWriteDone(bool ok) override {
1105     if (async_write_thread_.joinable()) {
1106       async_write_thread_.join();
1107       RemoveHold();
1108     }
1109     if (server_try_cancel_ == DO_NOT_CANCEL) {
1110       EXPECT_TRUE(ok);
1111     } else if (!ok) {
1112       return;
1113     }
1114     writes_complete_++;
1115     MaybeWrite();
1116   }
OnDone(const Status & s)1117   void OnDone(const Status& s) override {
1118     gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
1119     gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
1120     switch (server_try_cancel_) {
1121       case DO_NOT_CANCEL:
1122         if (!client_cancel_.cancel ||
1123             client_cancel_.ops_before_cancel > msgs_to_send_) {
1124           EXPECT_TRUE(s.ok());
1125           EXPECT_EQ(writes_complete_, msgs_to_send_);
1126           EXPECT_EQ(reads_complete_, writes_complete_);
1127         } else {
1128           EXPECT_FALSE(s.ok());
1129           EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1130           EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
1131           EXPECT_LE(reads_complete_, writes_complete_);
1132         }
1133         break;
1134       case CANCEL_BEFORE_PROCESSING:
1135         EXPECT_FALSE(s.ok());
1136         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1137         // The RPC is canceled before the server did any work or returned any
1138         // reads, but it's possible that some writes took place first from the
1139         // client
1140         EXPECT_LE(writes_complete_, msgs_to_send_);
1141         EXPECT_EQ(reads_complete_, 0);
1142         break;
1143       case CANCEL_DURING_PROCESSING:
1144         EXPECT_FALSE(s.ok());
1145         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1146         EXPECT_LE(writes_complete_, msgs_to_send_);
1147         EXPECT_LE(reads_complete_, writes_complete_);
1148         break;
1149       case CANCEL_AFTER_PROCESSING:
1150         EXPECT_FALSE(s.ok());
1151         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1152         EXPECT_EQ(writes_complete_, msgs_to_send_);
1153         // The Server canceled after reading the last message and after writing
1154         // the message to the client. However, the RPC cancellation might have
1155         // taken effect before the client actually read the response.
1156         EXPECT_LE(reads_complete_, writes_complete_);
1157         break;
1158       default:
1159         assert(false);
1160     }
1161     std::unique_lock<std::mutex> l(mu_);
1162     done_ = true;
1163     cv_.notify_one();
1164   }
Await()1165   void Await() {
1166     std::unique_lock<std::mutex> l(mu_);
1167     while (!done_) {
1168       cv_.wait(l);
1169     }
1170   }
1171 
1172  private:
MaybeAsyncWrite(bool first_write_async)1173   void MaybeAsyncWrite(bool first_write_async) {
1174     if (first_write_async) {
1175       // Make sure that we have a write to issue.
1176       // TODO(vjpai): Make this work with 0 writes case as well.
1177       assert(msgs_to_send_ >= 1);
1178 
1179       AddHold();
1180       async_write_thread_ = std::thread([this] {
1181         std::unique_lock<std::mutex> lock(async_write_thread_mu_);
1182         async_write_thread_cv_.wait(
1183             lock, [this] { return async_write_thread_start_; });
1184         MaybeWrite();
1185       });
1186       std::lock_guard<std::mutex> lock(async_write_thread_mu_);
1187       async_write_thread_start_ = true;
1188       async_write_thread_cv_.notify_one();
1189       return;
1190     }
1191     MaybeWrite();
1192   }
MaybeWrite()1193   void MaybeWrite() {
1194     if (client_cancel_.cancel &&
1195         writes_complete_ == client_cancel_.ops_before_cancel) {
1196       context_.TryCancel();
1197     } else if (writes_complete_ == msgs_to_send_) {
1198       StartWritesDone();
1199     } else {
1200       StartWrite(&request_);
1201     }
1202   }
1203   EchoRequest request_;
1204   EchoResponse response_;
1205   ClientContext context_;
1206   const ServerTryCancelRequestPhase server_try_cancel_;
1207   int reads_complete_{0};
1208   int writes_complete_{0};
1209   const int msgs_to_send_;
1210   const ClientCancelInfo client_cancel_;
1211   std::mutex mu_;
1212   std::condition_variable cv_;
1213   bool done_ = false;
1214   std::thread async_write_thread_;
1215   bool async_write_thread_start_ = false;
1216   std::mutex async_write_thread_mu_;
1217   std::condition_variable async_write_thread_cv_;
1218 };
1219 
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
  ResetStub();
  // Plain bidi stream: no cancellation, uncorked metadata, first write issued
  // inline from the reactor's constructor.
  BidiClient client(stub_.get(), DO_NOT_CANCEL,
                    /*num_msgs_to_send=*/kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/false, /*first_write_async=*/false);
  client.Await();
  if (!GetParam().use_interceptors) {
    return;
  }
  // The server interceptors must not have been notified of a cancel.
  EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
1231 
TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
  ResetStub();
  // Same as BidiStream, except that the first write is issued from the
  // reactor's helper thread.
  BidiClient client(stub_.get(), DO_NOT_CANCEL,
                    /*num_msgs_to_send=*/kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/false, /*first_write_async=*/true);
  client.Await();
  const bool interceptors_enabled = GetParam().use_interceptors;
  if (interceptors_enabled) {
    // The server interceptors must not have been notified of a cancel.
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1243 
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
  ResetStub();
  // Same as BidiStream, except that initial metadata is corked on the client
  // context.
  BidiClient client(stub_.get(), DO_NOT_CANCEL,
                    /*num_msgs_to_send=*/kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/true, /*first_write_async=*/false);
  client.Await();
  if (!GetParam().use_interceptors) {
    return;
  }
  // The server interceptors must not have been notified of a cancel.
  EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
1255 
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
  ResetStub();
  // Combines corked initial metadata with a first write issued from the
  // reactor's helper thread.
  BidiClient client(stub_.get(), DO_NOT_CANCEL,
                    /*num_msgs_to_send=*/kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/true, /*first_write_async=*/true);
  client.Await();
  const bool interceptors_enabled = GetParam().use_interceptors;
  if (interceptors_enabled) {
    // The server interceptors must not have been notified of a cancel.
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1267 
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
  ResetStub();
  // The client calls TryCancel() once two writes have completed.
  const ClientCancelInfo cancel_after_two_writes(2);
  BidiClient client(stub_.get(), DO_NOT_CANCEL,
                    /*num_msgs_to_send=*/kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/false, /*first_write_async=*/false,
                    cancel_after_two_writes);
  client.Await();
  if (!GetParam().use_interceptors) {
    return;
  }
  // The server interceptors must have been notified of the cancel.
  // NOTE(review): 20 presumably matches the number of DummyInterceptor
  // instances installed by the fixture - confirm against the server setup.
  EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
1280 
1281 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(ClientCallbackEnd2endTest,BidiStreamServerCancelBefore)1282 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
1283   ResetStub();
1284   BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2,
1285                   /*cork_metadata=*/false, /*first_write_async=*/false);
1286   test.Await();
1287   // Make sure that the server interceptors were notified
1288   if (GetParam().use_interceptors) {
1289     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1290   }
1291 }
1292 
1293 // Server to cancel while reading/writing requests/responses on the stream in
1294 // parallel
TEST_P(ClientCallbackEnd2endTest,BidiStreamServerCancelDuring)1295 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
1296   ResetStub();
1297   BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING,
1298                   /*num_msgs_to_send=*/10, /*cork_metadata=*/false,
1299                   /*first_write_async=*/false);
1300   test.Await();
1301   // Make sure that the server interceptors were notified
1302   if (GetParam().use_interceptors) {
1303     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1304   }
1305 }
1306 
1307 // Server to cancel after reading/writing all requests/responses on the stream
1308 // but before returning to the client
TEST_P(ClientCallbackEnd2endTest,BidiStreamServerCancelAfter)1309 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
1310   ResetStub();
1311   BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5,
1312                   /*cork_metadata=*/false, /*first_write_async=*/false);
1313   test.Await();
1314   // Make sure that the server interceptors were notified
1315   if (GetParam().use_interceptors) {
1316     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1317   }
1318 }
1319 
TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
  ResetStub();
  // Reactor that writes a single request and, once that write has completed,
  // issues StartWritesDone() and StartRead() back to back.
  class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
                                                              EchoResponse> {
   public:
    // Registers this reactor for a BidiStream RPC on `stub`, queues the only
    // write and then calls StartCall().
    explicit Client(grpc::testing::EchoTestService::Stub* stub) {
      request_.set_message("Hello bidi ");
      stub->experimental_async()->BidiStream(&context_, this);
      StartWrite(&request_);
      StartCall();
    }
    // The single read must succeed and echo the request message.
    void OnReadDone(bool ok) override {
      EXPECT_TRUE(ok);
      EXPECT_EQ(response_.message(), request_.message());
    }
    void OnWriteDone(bool ok) override {
      EXPECT_TRUE(ok);
      // Now send out the simultaneous Read and WritesDone
      StartWritesDone();
      StartRead(&response_);
    }
    // Verifies an OK status and the echoed message, then wakes up Await().
    void OnDone(const Status& s) override {
      EXPECT_TRUE(s.ok());
      EXPECT_EQ(response_.message(), request_.message());
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
      cv_.notify_one();
    }
    // Blocks the caller until OnDone has set done_.
    void Await() {
      std::unique_lock<std::mutex> lock(mu_);
      cv_.wait(lock, [this] { return done_; });
    }

   private:
    EchoRequest request_;
    EchoResponse response_;
    ClientContext context_;
    std::mutex mu_;  // guards done_
    std::condition_variable cv_;
    bool done_ = false;
  } test{stub_.get()};

  test.Await();
}
1366 
// Calls a method of UnimplementedEchoService through the callback API and
// expects an UNIMPLEMENTED status with an empty error message.
TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) {
  ChannelArguments channel_args;
  const auto& creds = GetCredentialsProvider()->GetChannelCredentials(
      GetParam().credentials_type, &channel_args);
  // Build a dedicated channel for the current scenario: a custom channel to
  // the server address for TCP, an in-process channel otherwise.
  std::shared_ptr<Channel> channel;
  if (GetParam().protocol == Protocol::TCP) {
    channel = ::grpc::CreateCustomChannel(server_address_.str(), creds,
                                          channel_args);
  } else {
    channel = server_->InProcessChannel(channel_args);
  }
  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub =
      grpc::testing::UnimplementedEchoService::NewStub(channel);
  EchoRequest request;
  EchoResponse response;
  ClientContext cli_ctx;
  request.set_message("Hello world.");

  std::mutex mu;
  std::condition_variable cv;
  bool done = false;  // guarded by mu
  const auto on_done = [&done, &mu, &cv](Status status) {
    EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
    EXPECT_EQ("", status.error_message());

    std::lock_guard<std::mutex> lock(mu);
    done = true;
    cv.notify_one();
  };
  stub->experimental_async()->Unimplemented(&cli_ctx, &request, &response,
                                            on_done);
  // Block until the completion callback has signalled.
  std::unique_lock<std::mutex> lock(mu);
  cv.wait(lock, [&done] { return done; });
}
1399 
TEST_P(ClientCallbackEnd2endTest,ResponseStreamExtraReactionFlowReadsUntilDone)1400 TEST_P(ClientCallbackEnd2endTest,
1401        ResponseStreamExtraReactionFlowReadsUntilDone) {
1402   ResetStub();
1403   class ReadAllIncomingDataClient
1404       : public grpc::experimental::ClientReadReactor<EchoResponse> {
1405    public:
1406     ReadAllIncomingDataClient(grpc::testing::EchoTestService::Stub* stub) {
1407       request_.set_message("Hello client ");
1408       stub->experimental_async()->ResponseStream(&context_, &request_, this);
1409     }
1410     bool WaitForReadDone() {
1411       std::unique_lock<std::mutex> l(mu_);
1412       while (!read_done_) {
1413         read_cv_.wait(l);
1414       }
1415       read_done_ = false;
1416       return read_ok_;
1417     }
1418     void Await() {
1419       std::unique_lock<std::mutex> l(mu_);
1420       while (!done_) {
1421         done_cv_.wait(l);
1422       }
1423     }
1424     // RemoveHold under the same lock used for OnDone to make sure that we don't
1425     // call OnDone directly or indirectly from the RemoveHold function.
1426     void RemoveHoldUnderLock() {
1427       std::unique_lock<std::mutex> l(mu_);
1428       RemoveHold();
1429     }
1430     const Status& status() {
1431       std::unique_lock<std::mutex> l(mu_);
1432       return status_;
1433     }
1434 
1435    private:
1436     void OnReadDone(bool ok) override {
1437       std::unique_lock<std::mutex> l(mu_);
1438       read_ok_ = ok;
1439       read_done_ = true;
1440       read_cv_.notify_one();
1441     }
1442     void OnDone(const Status& s) override {
1443       std::unique_lock<std::mutex> l(mu_);
1444       done_ = true;
1445       status_ = s;
1446       done_cv_.notify_one();
1447     }
1448 
1449     EchoRequest request_;
1450     EchoResponse response_;
1451     ClientContext context_;
1452     bool read_ok_ = false;
1453     bool read_done_ = false;
1454     std::mutex mu_;
1455     std::condition_variable read_cv_;
1456     std::condition_variable done_cv_;
1457     bool done_ = false;
1458     Status status_;
1459   } client{stub_.get()};
1460 
1461   int reads_complete = 0;
1462   client.AddHold();
1463   client.StartCall();
1464 
1465   EchoResponse response;
1466   bool read_ok = true;
1467   while (read_ok) {
1468     client.StartRead(&response);
1469     read_ok = client.WaitForReadDone();
1470     if (read_ok) {
1471       ++reads_complete;
1472     }
1473   }
1474   client.RemoveHoldUnderLock();
1475   client.Await();
1476 
1477   EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete);
1478   EXPECT_EQ(client.status().error_code(), grpc::StatusCode::OK);
1479 }
1480 
CreateTestScenarios(bool test_insecure)1481 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
1482 #if TARGET_OS_IPHONE
1483   // Workaround Apple CFStream bug
1484   gpr_setenv("grpc_cfstream", "0");
1485 #endif
1486 
1487   std::vector<TestScenario> scenarios;
1488   std::vector<std::string> credentials_types{
1489       GetCredentialsProvider()->GetSecureCredentialsTypeList()};
1490   auto insec_ok = [] {
1491     // Only allow insecure credentials type when it is registered with the
1492     // provider. User may create providers that do not have insecure.
1493     return GetCredentialsProvider()->GetChannelCredentials(
1494                kInsecureCredentialsType, nullptr) != nullptr;
1495   };
1496   if (test_insecure && insec_ok()) {
1497     credentials_types.push_back(kInsecureCredentialsType);
1498   }
1499   GPR_ASSERT(!credentials_types.empty());
1500 
1501   bool barr[]{false, true};
1502   Protocol parr[]{Protocol::INPROC, Protocol::TCP};
1503   for (Protocol p : parr) {
1504     for (const auto& cred : credentials_types) {
1505       // TODO(vjpai): Test inproc with secure credentials when feasible
1506       if (p == Protocol::INPROC &&
1507           (cred != kInsecureCredentialsType || !insec_ok())) {
1508         continue;
1509       }
1510       for (bool callback_server : barr) {
1511         for (bool use_interceptors : barr) {
1512           scenarios.emplace_back(callback_server, p, use_interceptors, cred);
1513         }
1514       }
1515     }
1516   }
1517   return scenarios;
1518 }
1519 
// Instantiates the ClientCallbackEnd2endTest suite once per scenario returned
// by CreateTestScenarios(true), i.e. with test_insecure enabled.
INSTANTIATE_TEST_SUITE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
                         ::testing::ValuesIn(CreateTestScenarios(true)));
1522 
1523 }  // namespace
1524 }  // namespace testing
1525 }  // namespace grpc
1526 
main(int argc,char ** argv)1527 int main(int argc, char** argv) {
1528   ::testing::InitGoogleTest(&argc, argv);
1529   grpc::testing::TestEnvironment env(argc, argv);
1530   return RUN_ALL_TESTS();
1531 }
1532