1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpcpp/channel.h>
20 #include <grpcpp/client_context.h>
21 #include <grpcpp/create_channel.h>
22 #include <grpcpp/generic/generic_stub.h>
23 #include <grpcpp/impl/codegen/proto_utils.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/client_callback.h>
28 #include <gtest/gtest.h>
29
30 #include <algorithm>
31 #include <condition_variable>
32 #include <functional>
33 #include <mutex>
34 #include <sstream>
35 #include <thread>
36
37 #include "src/core/lib/gpr/env.h"
38 #include "src/core/lib/iomgr/iomgr.h"
39 #include "src/proto/grpc/testing/echo.grpc.pb.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42 #include "test/cpp/end2end/interceptors_util.h"
43 #include "test/cpp/end2end/test_service_impl.h"
44 #include "test/cpp/util/byte_buffer_proto_helper.h"
45 #include "test/cpp/util/string_ref_helper.h"
46 #include "test/cpp/util/test_credentials_provider.h"
47
48 namespace grpc {
49 namespace testing {
50 namespace {
51
// Transport used between the test client and server.
enum class Protocol { INPROC, TCP };

// One point in the test parameter space: which server flavor to register,
// which transport to use, whether dummy interceptors are installed, and
// which credentials type to request from the provider.
class TestScenario {
 public:
  TestScenario(bool use_callback_server, Protocol transport,
               bool with_interceptors, const std::string& credentials)
      : callback_server(use_callback_server),
        protocol(transport),
        use_interceptors(with_interceptors),
        credentials_type(credentials) {}
  // Writes a one-line description of this scenario to the gpr debug log.
  void Log() const;
  bool callback_server;
  Protocol protocol;
  bool use_interceptors;
  const std::string credentials_type;
};
68
operator <<(std::ostream & out,const TestScenario & scenario)69 static std::ostream& operator<<(std::ostream& out,
70 const TestScenario& scenario) {
71 return out << "TestScenario{callback_server="
72 << (scenario.callback_server ? "true" : "false") << ",protocol="
73 << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
74 << ",intercept=" << (scenario.use_interceptors ? "true" : "false")
75 << ",creds=" << scenario.credentials_type << "}";
76 }
77
// Formats this scenario via operator<< and emits it at DEBUG level so the
// test output identifies which parameterization is currently running.
void TestScenario::Log() const {
  std::ostringstream out;
  out << *this;
  gpr_log(GPR_DEBUG, "%s", out.str().c_str());
}
83
// Fixture parameterized over TestScenario. Owns the server (sync or callback
// flavor), the channel, and both the generated and generic stubs, and
// provides helpers that issue RPCs through the experimental callback
// (reactor) client API. All helpers block until their RPCs complete.
class ClientCallbackEnd2endTest
    : public ::testing::TestWithParam<TestScenario> {
 protected:
  ClientCallbackEnd2endTest() { GetParam().Log(); }

  // Builds and starts the server for the current scenario: picks an unused
  // TCP port when needed, registers either the sync or the callback service,
  // and optionally installs 20 dummy server interceptors.
  void SetUp() override {
    ServerBuilder builder;

    auto server_creds = GetCredentialsProvider()->GetServerCredentials(
        GetParam().credentials_type);
    // TODO(vjpai): Support testing of AuthMetadataProcessor

    if (GetParam().protocol == Protocol::TCP) {
      picked_port_ = grpc_pick_unused_port_or_die();
      server_address_ << "localhost:" << picked_port_;
      builder.AddListeningPort(server_address_.str(), server_creds);
    }
    if (!GetParam().callback_server) {
      builder.RegisterService(&service_);
    } else {
      builder.RegisterService(&callback_service_);
    }

    if (GetParam().use_interceptors) {
      std::vector<
          std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
          creators;
      // Add 20 dummy server interceptors
      creators.reserve(20);
      for (auto i = 0; i < 20; i++) {
        creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
            new DummyInterceptorFactory()));
      }
      builder.experimental().SetInterceptorCreators(std::move(creators));
    }

    server_ = builder.BuildAndStart();
    is_server_started_ = true;
  }

  // (Re)creates channel_, stub_, and generic_stub_ according to the
  // scenario's transport/interceptor settings and resets the dummy
  // interceptor counters. Tests call this before issuing any RPC.
  void ResetStub() {
    ChannelArguments args;
    auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
        GetParam().credentials_type, &args);
    switch (GetParam().protocol) {
      case Protocol::TCP:
        if (!GetParam().use_interceptors) {
          channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
                                                 channel_creds, args);
        } else {
          channel_ = CreateCustomChannelWithInterceptors(
              server_address_.str(), channel_creds, args,
              CreateDummyClientInterceptors());
        }
        break;
      case Protocol::INPROC:
        if (!GetParam().use_interceptors) {
          channel_ = server_->InProcessChannel(args);
        } else {
          channel_ = server_->experimental().InProcessChannelWithInterceptors(
              args, CreateDummyClientInterceptors());
        }
        break;
      default:
        assert(false);
    }
    stub_ = grpc::testing::EchoTestService::NewStub(channel_);
    generic_stub_.reset(new GenericStub(channel_));
    DummyInterceptor::Reset();
  }

  void TearDown() override {
    if (is_server_started_) {
      // Although we would normally do an explicit shutdown, the server
      // should also work correctly with just a destructor call. The regular
      // end2end test uses explicit shutdown, so let this one just do reset.
      server_.reset();
    }
    if (picked_port_ > 0) {
      grpc_recycle_unused_port(picked_port_);
    }
  }

  // Sends |num_rpcs| unary Echo RPCs sequentially via the callback API,
  // blocking on a condvar for each completion. When |with_binary_metadata|
  // is set, attaches a "custom-bin" binary metadata value (varied per
  // iteration) and verifies the server echoes it back in the trailers.
  void SendRpcs(int num_rpcs, bool with_binary_metadata) {
    std::string test_string("");
    for (int i = 0; i < num_rpcs; i++) {
      EchoRequest request;
      EchoResponse response;
      ClientContext cli_ctx;

      // Message grows each iteration so payloads differ across RPCs.
      test_string += "Hello world. ";
      request.set_message(test_string);
      std::string val;
      if (with_binary_metadata) {
        request.mutable_param()->set_echo_metadata(true);
        // Last byte varies with i so each RPC carries distinct binary data.
        char bytes[8] = {'\0', '\1', '\2', '\3',
                         '\4', '\5', '\6', static_cast<char>(i)};
        val = std::string(bytes, 8);
        cli_ctx.AddMetadata("custom-bin", val);
      }

      cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);

      std::mutex mu;
      std::condition_variable cv;
      bool done = false;
      // Note: |val| is captured by copy since the loop iteration's stack
      // may unwind only after the callback has run.
      stub_->experimental_async()->Echo(
          &cli_ctx, &request, &response,
          [&cli_ctx, &request, &response, &done, &mu, &cv, val,
           with_binary_metadata](Status s) {
            GPR_ASSERT(s.ok());

            EXPECT_EQ(request.message(), response.message());
            if (with_binary_metadata) {
              EXPECT_EQ(
                  1u, cli_ctx.GetServerTrailingMetadata().count("custom-bin"));
              EXPECT_EQ(val, ToString(cli_ctx.GetServerTrailingMetadata()
                                          .find("custom-bin")
                                          ->second));
            }
            std::lock_guard<std::mutex> l(mu);
            done = true;
            cv.notify_one();
          });
      // Block until the callback has signaled completion.
      std::unique_lock<std::mutex> l(mu);
      while (!done) {
        cv.wait(l);
      }
    }
  }

  // Sends |num_rpcs| unary Echo RPCs using the raw-request overload that
  // takes a pre-serialized ByteBuffer instead of a message object.
  void SendRpcsRawReq(int num_rpcs) {
    std::string test_string("Hello raw world.");
    EchoRequest request;
    request.set_message(test_string);
    std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);

    for (int i = 0; i < num_rpcs; i++) {
      EchoResponse response;
      ClientContext cli_ctx;

      std::mutex mu;
      std::condition_variable cv;
      bool done = false;
      stub_->experimental_async()->Echo(
          &cli_ctx, send_buf.get(), &response,
          [&request, &response, &done, &mu, &cv](Status s) {
            GPR_ASSERT(s.ok());

            EXPECT_EQ(request.message(), response.message());
            std::lock_guard<std::mutex> l(mu);
            done = true;
            cv.notify_one();
          });
      std::unique_lock<std::mutex> l(mu);
      while (!done) {
        cv.wait(l);
      }
    }
  }

  // Sends |num_rpcs| unary RPCs through the generic (untyped) stub,
  // serializing/deserializing ByteBuffers manually. When |maybe_except| is
  // set and exceptions are enabled, throws from inside the completion
  // callback to verify the library tolerates a throwing reaction.
  void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
    const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
    std::string test_string("");
    for (int i = 0; i < num_rpcs; i++) {
      EchoRequest request;
      std::unique_ptr<ByteBuffer> send_buf;
      ByteBuffer recv_buf;
      ClientContext cli_ctx;

      test_string += "Hello world. ";
      request.set_message(test_string);
      send_buf = SerializeToByteBuffer(&request);

      std::mutex mu;
      std::condition_variable cv;
      bool done = false;
      generic_stub_->experimental().UnaryCall(
          &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
          [&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
            GPR_ASSERT(s.ok());

            EchoResponse response;
            EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
            EXPECT_EQ(request.message(), response.message());
            std::lock_guard<std::mutex> l(mu);
            done = true;
            cv.notify_one();
#if GRPC_ALLOW_EXCEPTIONS
            if (maybe_except) {
              throw - 1;
            }
#else
            GPR_ASSERT(!maybe_except);
#endif
          });
      std::unique_lock<std::mutex> l(mu);
      while (!done) {
        cv.wait(l);
      }
    }
  }

  // Drives the unary generic Echo method as a bidi stream: one write and one
  // read per "use" of the reactor. The reactor re-activates itself from
  // OnDone up to |reuses| times to exercise reactor reuse; if
  // |do_writes_done| is false, the writes-done op is never issued explicitly.
  void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
    const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
    std::string test_string("");
    for (int i = 0; i < num_rpcs; i++) {
      test_string += "Hello world. ";
      class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
                                                                  ByteBuffer> {
       public:
        Client(ClientCallbackEnd2endTest* test, const std::string& method_name,
               const std::string& test_str, int reuses, bool do_writes_done)
            : reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
          // activate_ starts (or restarts) one full streaming call with a
          // fresh ClientContext; when no reuses remain, it signals Await().
          activate_ = [this, test, method_name, test_str] {
            if (reuses_remaining_ > 0) {
              cli_ctx_.reset(new ClientContext);
              reuses_remaining_--;
              test->generic_stub_->experimental().PrepareBidiStreamingCall(
                  cli_ctx_.get(), method_name, this);
              request_.set_message(test_str);
              send_buf_ = SerializeToByteBuffer(&request_);
              StartWrite(send_buf_.get());
              StartRead(&recv_buf_);
              StartCall();
            } else {
              std::unique_lock<std::mutex> l(mu_);
              done_ = true;
              cv_.notify_one();
            }
          };
          activate_();
        }
        void OnWriteDone(bool /*ok*/) override {
          if (do_writes_done_) {
            StartWritesDone();
          }
        }
        void OnReadDone(bool /*ok*/) override {
          EchoResponse response;
          EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
          EXPECT_EQ(request_.message(), response.message());
        };
        // RPC finished: either kick off the next reuse or finish for good.
        void OnDone(const Status& s) override {
          EXPECT_TRUE(s.ok());
          activate_();
        }
        // Blocks until all reuses have completed.
        void Await() {
          std::unique_lock<std::mutex> l(mu_);
          while (!done_) {
            cv_.wait(l);
          }
        }

        EchoRequest request_;
        std::unique_ptr<ByteBuffer> send_buf_;
        ByteBuffer recv_buf_;
        std::unique_ptr<ClientContext> cli_ctx_;
        int reuses_remaining_;
        std::function<void()> activate_;
        std::mutex mu_;
        std::condition_variable cv_;
        bool done_ = false;
        const bool do_writes_done_;
      };

      Client rpc(this, kMethodName, test_string, reuses, do_writes_done);

      rpc.Await();
    }
  }
  // Set once BuildAndStart succeeds; guards the server reset in TearDown.
  bool is_server_started_{false};
  // TCP port reserved for the server (0 for in-process); recycled on teardown.
  int picked_port_{0};
  std::shared_ptr<Channel> channel_;
  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
  std::unique_ptr<grpc::GenericStub> generic_stub_;
  TestServiceImpl service_;
  CallbackTestServiceImpl callback_service_;
  std::unique_ptr<Server> server_;
  std::ostringstream server_address_;
};
365
// A single unary RPC through the callback API.
TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
  ResetStub();
  SendRpcs(1, false);
}
370
// Asks the server to fail the RPC with a specific code/message and verifies
// that the callback observes exactly that status and an empty response.
TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) {
  ResetStub();

  EchoRequest request;
  EchoResponse response;
  ClientContext ctx;
  ErrorStatus expected_error;

  request.set_message("Hello failure");
  expected_error.set_code(1);  // CANCELLED
  expected_error.set_error_message("cancel error message");
  *request.mutable_param()->mutable_expected_error() = expected_error;

  std::mutex mtx;
  std::condition_variable cv;
  bool rpc_done = false;

  stub_->experimental_async()->Echo(
      &ctx, &request, &response,
      [&response, &rpc_done, &mtx, &cv, &expected_error](Status s) {
        EXPECT_EQ("", response.message());
        EXPECT_EQ(expected_error.code(), s.error_code());
        EXPECT_EQ(expected_error.error_message(), s.error_message());
        {
          std::lock_guard<std::mutex> guard(mtx);
          rpc_done = true;
        }
        cv.notify_one();
      });

  // Block the test thread until the completion callback has fired.
  std::unique_lock<std::mutex> lock(mtx);
  cv.wait(lock, [&rpc_done] { return rpc_done; });
}
404
// Starts an RPC while holding a lock and then starts further RPCs from
// inside completion callbacks that themselves hold locks, three levels deep.
// Verifies that callback dispatch never runs inline under the caller's lock
// (which would deadlock here).
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) {
  ResetStub();
  // One mutex per nesting level; each callback re-acquires its level's mutex.
  std::mutex mu1, mu2, mu3;
  std::condition_variable cv;
  bool done = false;
  EchoRequest request1, request2, request3;
  request1.set_message("Hello locked world1.");
  request2.set_message("Hello locked world2.");
  request3.set_message("Hello locked world3.");
  EchoResponse response1, response2, response3;
  ClientContext cli_ctx1, cli_ctx2, cli_ctx3;
  {
    std::lock_guard<std::mutex> l(mu1);
    stub_->experimental_async()->Echo(
        &cli_ctx1, &request1, &response1,
        [this, &mu1, &mu2, &mu3, &cv, &done, &request1, &request2, &request3,
         &response1, &response2, &response3, &cli_ctx2, &cli_ctx3](Status s1) {
          std::lock_guard<std::mutex> l1(mu1);
          EXPECT_TRUE(s1.ok());
          EXPECT_EQ(request1.message(), response1.message());
          // start the second level of nesting
          std::unique_lock<std::mutex> l2(mu2);
          this->stub_->experimental_async()->Echo(
              &cli_ctx2, &request2, &response2,
              [this, &mu2, &mu3, &cv, &done, &request2, &request3, &response2,
               &response3, &cli_ctx3](Status s2) {
                std::lock_guard<std::mutex> l2(mu2);
                EXPECT_TRUE(s2.ok());
                EXPECT_EQ(request2.message(), response2.message());
                // start the third level of nesting
                std::lock_guard<std::mutex> l3(mu3);
                stub_->experimental_async()->Echo(
                    &cli_ctx3, &request3, &response3,
                    [&mu3, &cv, &done, &request3, &response3](Status s3) {
                      std::lock_guard<std::mutex> l(mu3);
                      EXPECT_TRUE(s3.ok());
                      EXPECT_EQ(request3.message(), response3.message());
                      done = true;
                      cv.notify_all();
                    });
              });
        });
  }

  // Wait on the innermost level's mutex for the final callback.
  std::unique_lock<std::mutex> l(mu3);
  while (!done) {
    cv.wait(l);
  }
}
454
// Issues an RPC while holding a mutex that the completion callback also
// acquires; succeeds only if the callback runs asynchronously (no deadlock).
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) {
  ResetStub();
  std::mutex mtx;
  std::condition_variable cv;
  bool rpc_done = false;
  EchoRequest request;
  request.set_message("Hello locked world.");
  EchoResponse response;
  ClientContext ctx;
  {
    std::lock_guard<std::mutex> hold(mtx);
    stub_->experimental_async()->Echo(
        &ctx, &request, &response,
        [&mtx, &cv, &rpc_done, &request, &response](Status s) {
          std::lock_guard<std::mutex> guard(mtx);
          EXPECT_TRUE(s.ok());
          EXPECT_EQ(request.message(), response.message());
          rpc_done = true;
          cv.notify_one();
        });
  }
  std::unique_lock<std::mutex> lock(mtx);
  cv.wait(lock, [&rpc_done] { return rpc_done; });
}
481
// Ten unary RPCs issued back to back on one thread.
TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
  ResetStub();
  SendRpcs(10, false);
}
486
// Ten unary RPCs using the raw (pre-serialized ByteBuffer) request overload.
TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) {
  ResetStub();
  SendRpcsRawReq(10);
}
491
// Verifies client initial metadata is delivered: the server-side
// CheckClientInitialMetadata handler fails the RPC if the expected
// key/value pair is absent, so a clean OK status proves delivery.
TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
  ResetStub();
  SimpleRequest request;
  SimpleResponse response;
  ClientContext ctx;

  ctx.AddMetadata(kCheckClientInitialMetadataKey,
                  kCheckClientInitialMetadataVal);

  std::mutex mtx;
  std::condition_variable cv;
  bool rpc_done = false;
  stub_->experimental_async()->CheckClientInitialMetadata(
      &ctx, &request, &response, [&rpc_done, &mtx, &cv](Status s) {
        GPR_ASSERT(s.ok());
        {
          std::lock_guard<std::mutex> guard(mtx);
          rpc_done = true;
        }
        cv.notify_one();
      });
  std::unique_lock<std::mutex> lock(mtx);
  cv.wait(lock, [&rpc_done] { return rpc_done; });
}
517
// One unary RPC carrying binary metadata that the server must echo back.
TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) {
  ResetStub();
  SendRpcs(1, true);
}
522
// Ten sequential RPCs, each with a different binary metadata value.
TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  SendRpcs(10, true);
}
527
// Ten sequential unary RPCs through the generic (untyped) stub.
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
  ResetStub();
  SendRpcsGeneric(10, false);
}
532
// Drives the unary Echo method as a generic bidi stream, one use per reactor.
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
  ResetStub();
  SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true);
}
537
// Same as above but each reactor object is reused for 10 consecutive calls.
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
  ResetStub();
  SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true);
}
542
// Generic bidi call that never explicitly issues WritesDone.
TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
  ResetStub();
  SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false);
}
547
548 #if GRPC_ALLOW_EXCEPTIONS
// Completion callbacks throw; the library must survive a throwing reaction.
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
  ResetStub();
  SendRpcsGeneric(10, true);
}
553 #endif
554
// Ten threads each issue ten RPCs with varied binary metadata, exercising
// concurrent callback completions.
TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  std::vector<std::thread> senders;
  senders.reserve(10);
  for (int i = 0; i < 10; ++i) {
    senders.emplace_back([this] { SendRpcs(10, true); });
  }
  for (auto& sender : senders) {
    sender.join();
  }
}
566
// Ten threads each issue ten plain RPCs concurrently.
TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) {
  ResetStub();
  std::vector<std::thread> senders;
  senders.reserve(10);
  for (int i = 0; i < 10; ++i) {
    senders.emplace_back([this] { SendRpcs(10, false); });
  }
  for (auto& sender : senders) {
    sender.join();
  }
}
578
// Cancels the context before the RPC is even issued; the completion callback
// must still fire, with CANCELLED status and an empty response, and all 20
// server interceptors must observe the cancellation.
TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");
  context.TryCancel();

  std::mutex mtx;
  std::condition_variable cv;
  bool rpc_done = false;
  stub_->experimental_async()->Echo(
      &context, &request, &response,
      [&response, &rpc_done, &mtx, &cv](Status s) {
        EXPECT_EQ("", response.message());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        {
          std::lock_guard<std::mutex> guard(mtx);
          rpc_done = true;
        }
        cv.notify_one();
      });
  std::unique_lock<std::mutex> lock(mtx);
  cv.wait(lock, [&rpc_done] { return rpc_done; });
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
606
// Instructs the server (via metadata) to cancel before processing; the
// client callback must see a CANCELLED, non-OK status.
TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");
  context.AddMetadata(kServerTryCancelRequest,
                      std::to_string(CANCEL_BEFORE_PROCESSING));

  std::mutex mtx;
  std::condition_variable cv;
  bool rpc_done = false;
  stub_->experimental_async()->Echo(
      &context, &request, &response, [&rpc_done, &mtx, &cv](Status s) {
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        {
          std::lock_guard<std::mutex> guard(mtx);
          rpc_done = true;
        }
        cv.notify_one();
      });
  std::unique_lock<std::mutex> lock(mtx);
  cv.wait(lock, [&rpc_done] { return rpc_done; });
}
632
// Describes whether and when a test client should cancel an RPC from its own
// side: default-constructed means "never cancel"; the explicit form means
// "cancel after |ops| operations".
struct ClientCancelInfo {
  bool cancel{false};
  // Operations to perform before TryCancel(). Only meaningful when |cancel|
  // is true, but brace-initialized so a default-constructed instance never
  // exposes an indeterminate value to readers.
  int ops_before_cancel{0};

  ClientCancelInfo() = default;
  explicit ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
};
640
// Reactor driving a client-streaming (RequestStream) RPC. Sends
// |num_msgs_to_send| copies of a fixed message, optionally asking the server
// to cancel at a given phase and/or cancelling client-side after a given
// number of writes, then validates message counts and final status in OnDone.
class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
 public:
  WriteClient(grpc::testing::EchoTestService::Stub* stub,
              ServerTryCancelRequestPhase server_try_cancel,
              int num_msgs_to_send, ClientCancelInfo client_cancel = {})
      : server_try_cancel_(server_try_cancel),
        num_msgs_to_send_(num_msgs_to_send),
        client_cancel_{client_cancel} {
    std::string msg{"Hello server."};
    // desired_ is the concatenation the server should echo back when every
    // write succeeds.
    for (int i = 0; i < num_msgs_to_send; i++) {
      desired_ += msg;
    }
    if (server_try_cancel != DO_NOT_CANCEL) {
      // Send server_try_cancel value in the client metadata
      context_.AddMetadata(kServerTryCancelRequest,
                           std::to_string(server_try_cancel));
    }
    // Corked: initial metadata is batched with the first write.
    context_.set_initial_metadata_corked(true);
    stub->experimental_async()->RequestStream(&context_, &response_, this);
    StartCall();
    request_.set_message(msg);
    MaybeWrite();
  }
  // Each successful write schedules the next one (or the cancel).
  void OnWriteDone(bool ok) override {
    if (ok) {
      num_msgs_sent_++;
      MaybeWrite();
    }
  }
  // Validates the sent-message count and final status against the
  // cancellation mode this client was configured with.
  void OnDone(const Status& s) override {
    gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
    int num_to_send =
        (client_cancel_.cancel)
            ? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
            : num_msgs_to_send_;
    switch (server_try_cancel_) {
      case CANCEL_BEFORE_PROCESSING:
      case CANCEL_DURING_PROCESSING:
        // If the RPC is canceled by server before / during messages from the
        // client, it means that the client most likely did not get a chance to
        // send all the messages it wanted to send. i.e num_msgs_sent <=
        // num_msgs_to_send
        EXPECT_LE(num_msgs_sent_, num_to_send);
        break;
      case DO_NOT_CANCEL:
      case CANCEL_AFTER_PROCESSING:
        // If the RPC was not canceled or canceled after all messages were read
        // by the server, the client did get a chance to send all its messages
        EXPECT_EQ(num_msgs_sent_, num_to_send);
        break;
      default:
        assert(false);
        break;
    }
    if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
      EXPECT_TRUE(s.ok());
      EXPECT_EQ(response_.message(), desired_);
    } else {
      EXPECT_FALSE(s.ok());
      EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
    }
    std::unique_lock<std::mutex> l(mu_);
    done_ = true;
    cv_.notify_one();
  }
  // Blocks until OnDone has run.
  void Await() {
    std::unique_lock<std::mutex> l(mu_);
    while (!done_) {
      cv_.wait(l);
    }
  }

 private:
  // Issues the next write, the last write, or the client-side cancel,
  // depending on how many messages have gone out so far.
  void MaybeWrite() {
    if (client_cancel_.cancel &&
        num_msgs_sent_ == client_cancel_.ops_before_cancel) {
      context_.TryCancel();
    } else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
      StartWrite(&request_);
    } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
      StartWriteLast(&request_, WriteOptions());
    }
  }
  EchoRequest request_;
  EchoResponse response_;
  ClientContext context_;
  const ServerTryCancelRequestPhase server_try_cancel_;
  int num_msgs_sent_{0};
  const int num_msgs_to_send_;
  std::string desired_;
  const ClientCancelInfo client_cancel_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool done_ = false;
};
736
// Client-streaming RPC that runs to completion with no cancellation.
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
  ResetStub();
  WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
  test.Await();
  // Make sure that the server interceptors were not notified to cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
746
// Client cancels its own request stream after 2 of 3 writes.
TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
  ResetStub();
  WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, ClientCancelInfo{2}};
  test.Await();
  // Make sure that the server interceptors got the cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
756
// Server to cancel before reading the request stream
// Server cancels before reading any client message.
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
  ResetStub();
  WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
  test.Await();
  // Make sure that the server interceptors were notified
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
767
768 // Server to cancel while reading a request from the stream in parallel
// Server cancels while the client is still writing its 10 messages.
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
  ResetStub();
  WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
  test.Await();
  // Make sure that the server interceptors were notified
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
778
779 // Server to cancel after reading all the requests but before returning to the
780 // client
// Server reads all 4 messages, then cancels instead of sending a status.
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
  ResetStub();
  WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
  test.Await();
  // Make sure that the server interceptors were notified
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
790
// Exercises ClientUnaryReactor: the server is asked to echo the client's
// metadata back as *initial* (not trailing) metadata, and the reactor must
// observe OnReadInitialMetadataDone before OnDone.
TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
  ResetStub();
  class UnaryClient : public grpc::experimental::ClientUnaryReactor {
   public:
    // Starts the Echo RPC immediately. Marked explicit: a single-argument
    // constructor should not allow implicit conversion from a stub pointer.
    explicit UnaryClient(grpc::testing::EchoTestService::Stub* stub) {
      cli_ctx_.AddMetadata("key1", "val1");
      cli_ctx_.AddMetadata("key2", "val2");
      request_.mutable_param()->set_echo_metadata_initially(true);
      request_.set_message("Hello metadata");
      stub->experimental_async()->Echo(&cli_ctx_, &request_, &response_, this);
      StartCall();
    }
    // Must run before OnDone; validates the echoed initial metadata.
    void OnReadInitialMetadataDone(bool ok) override {
      EXPECT_TRUE(ok);
      EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
      EXPECT_EQ(
          "val1",
          ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
      EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
      EXPECT_EQ(
          "val2",
          ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
      initial_metadata_done_ = true;
    }
    void OnDone(const Status& s) override {
      EXPECT_TRUE(initial_metadata_done_);
      // Metadata was echoed initially, so trailers must be empty.
      EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
      EXPECT_TRUE(s.ok());
      EXPECT_EQ(request_.message(), response_.message());
      std::unique_lock<std::mutex> l(mu_);
      done_ = true;
      cv_.notify_one();
    }
    // Blocks until OnDone has run.
    void Await() {
      std::unique_lock<std::mutex> l(mu_);
      while (!done_) {
        cv_.wait(l);
      }
    }

   private:
    EchoRequest request_;
    EchoResponse response_;
    ClientContext cli_ctx_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool done_{false};
    bool initial_metadata_done_{false};
  };

  UnaryClient test{stub_.get()};
  test.Await();
  // Make sure that the server interceptors were not notified of a cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
848
// Same as UnaryReactor but through the generic stub: the request is
// serialized to a ByteBuffer by hand and the response parsed back in OnDone.
TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
  ResetStub();
  const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
  class UnaryClient : public grpc::experimental::ClientUnaryReactor {
   public:
    // Serializes the request and starts the generic unary call immediately.
    UnaryClient(grpc::GenericStub* stub, const std::string& method_name) {
      cli_ctx_.AddMetadata("key1", "val1");
      cli_ctx_.AddMetadata("key2", "val2");
      request_.mutable_param()->set_echo_metadata_initially(true);
      request_.set_message("Hello metadata");
      send_buf_ = SerializeToByteBuffer(&request_);

      stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name,
                                            send_buf_.get(), &recv_buf_, this);
      StartCall();
    }
    // Must run before OnDone; validates the echoed initial metadata.
    void OnReadInitialMetadataDone(bool ok) override {
      EXPECT_TRUE(ok);
      EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
      EXPECT_EQ(
          "val1",
          ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
      EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
      EXPECT_EQ(
          "val2",
          ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
      initial_metadata_done_ = true;
    }
    void OnDone(const Status& s) override {
      EXPECT_TRUE(initial_metadata_done_);
      // Metadata was echoed initially, so trailers must be empty.
      EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
      EXPECT_TRUE(s.ok());
      EchoResponse response;
      EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
      EXPECT_EQ(request_.message(), response.message());
      std::unique_lock<std::mutex> l(mu_);
      done_ = true;
      cv_.notify_one();
    }
    // Blocks until OnDone has run.
    void Await() {
      std::unique_lock<std::mutex> l(mu_);
      while (!done_) {
        cv_.wait(l);
      }
    }

   private:
    EchoRequest request_;
    std::unique_ptr<ByteBuffer> send_buf_;
    ByteBuffer recv_buf_;
    ClientContext cli_ctx_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool done_{false};
    bool initial_metadata_done_{false};
  };

  UnaryClient test{generic_stub_.get(), kMethodName};
  test.Await();
  // Make sure that the server interceptors were not notified of a cancel
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
913
// Reactor for the server-streaming ResponseStream RPC. |server_try_cancel|
// asks the server (via request metadata) to cancel before/during/after its
// processing; |client_cancel| optionally makes the client itself TryCancel
// after a given number of reads. OnDone validates the final status and the
// number of messages read against the requested scenario.
class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
 public:
  ReadClient(grpc::testing::EchoTestService::Stub* stub,
             ServerTryCancelRequestPhase server_try_cancel,
             ClientCancelInfo client_cancel = {})
      : server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
    if (server_try_cancel_ != DO_NOT_CANCEL) {
      // Send server_try_cancel value in the client metadata
      context_.AddMetadata(kServerTryCancelRequest,
                          std::to_string(server_try_cancel));
    }
    request_.set_message("Hello client ");
    stub->experimental_async()->ResponseStream(&context_, &request_, this);
    // reads_complete_ is still 0 here, so this covers the
    // ops_before_cancel == 0 case (client cancels before the first read).
    if (client_cancel_.cancel &&
        reads_complete_ == client_cancel_.ops_before_cancel) {
      context_.TryCancel();
    }
    // Even if we cancel, read until failure because there might be responses
    // pending
    StartRead(&response_);
    StartCall();
  }
  // Invoked once per StartRead; ok==false means the read stream has ended.
  void OnReadDone(bool ok) override {
    if (!ok) {
      // Only in the clean (no-cancel) case can we insist on the exact count.
      if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
        EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
      }
    } else {
      EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
      // Server echoes the request message with the message index appended.
      EXPECT_EQ(response_.message(),
                request_.message() + std::to_string(reads_complete_));
      reads_complete_++;
      if (client_cancel_.cancel &&
          reads_complete_ == client_cancel_.ops_before_cancel) {
        context_.TryCancel();
      }
      // Even if we cancel, read until failure because there might be responses
      // pending
      StartRead(&response_);
    }
  }
  // Final callback: check status/read-count invariants for each scenario,
  // then release the thread blocked in Await().
  void OnDone(const Status& s) override {
    gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
    switch (server_try_cancel_) {
      case DO_NOT_CANCEL:
        if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
                                          kServerDefaultResponseStreamsToSend) {
          EXPECT_TRUE(s.ok());
          EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
        } else {
          EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
          EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
          // Status might be ok or cancelled depending on whether server
          // sent status before client cancel went through
          if (!s.ok()) {
            EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
          }
        }
        break;
      case CANCEL_BEFORE_PROCESSING:
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        EXPECT_EQ(reads_complete_, 0);
        break;
      case CANCEL_DURING_PROCESSING:
      case CANCEL_AFTER_PROCESSING:
        // If server canceled while writing messages, client must have read
        // less than or equal to the expected number of messages. Even if the
        // server canceled after writing all messages, the RPC may be canceled
        // before the Client got a chance to read all the messages.
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
        break;
      default:
        assert(false);
    }
    std::unique_lock<std::mutex> l(mu_);
    done_ = true;
    cv_.notify_one();
  }
  // Blocks the test thread until OnDone has run.
  void Await() {
    std::unique_lock<std::mutex> l(mu_);
    while (!done_) {
      cv_.wait(l);
    }
  }

 private:
  EchoRequest request_;
  EchoResponse response_;
  ClientContext context_;
  const ServerTryCancelRequestPhase server_try_cancel_;
  int reads_complete_{0};  // number of successful OnReadDone(ok=true) calls
  const ClientCancelInfo client_cancel_;
  std::mutex mu_;               // guards done_
  std::condition_variable cv_;  // signals completion to Await()
  bool done_ = false;
};
1013
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
  ResetStub();
  ReadClient reader(stub_.get(), DO_NOT_CANCEL);
  reader.Await();
  // A clean run: the server-side interceptors must not have seen a cancel.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1023
TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
  ResetStub();
  // Client issues TryCancel after two successful reads.
  ReadClient reader(stub_.get(), DO_NOT_CANCEL, ClientCancelInfo{2});
  reader.Await();
  // Because cancel in this case races with server finish, we can't be sure that
  // server interceptors even see cancellation
}
1031
1032 // Server to cancel before sending any response messages
// Server to cancel before sending any response messages
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
  ResetStub();
  ReadClient reader(stub_.get(), CANCEL_BEFORE_PROCESSING);
  reader.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1042
1043 // Server to cancel while writing a response to the stream in parallel
// Server to cancel while writing a response to the stream in parallel
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
  ResetStub();
  ReadClient reader(stub_.get(), CANCEL_DURING_PROCESSING);
  reader.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1053
1054 // Server to cancel after writing all the respones to the stream but before
1055 // returning to the client
// Server to cancel after writing all the responses to the stream but before
// returning to the client
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
  ResetStub();
  ReadClient reader(stub_.get(), CANCEL_AFTER_PROCESSING);
  reader.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1065
// Reactor for the bidirectional BidiStream RPC. Writes |num_msgs_to_send|
// copies of a fixed message and expects each one echoed back. Supports:
// server-initiated cancel at a chosen phase (via request metadata),
// client-initiated cancel after N writes (|client_cancel|), corked initial
// metadata, and issuing the first write from a separate thread while an
// AddHold() keeps the RPC from finishing early.
class BidiClient
    : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
 public:
  BidiClient(grpc::testing::EchoTestService::Stub* stub,
             ServerTryCancelRequestPhase server_try_cancel,
             int num_msgs_to_send, bool cork_metadata, bool first_write_async,
             ClientCancelInfo client_cancel = {})
      : server_try_cancel_(server_try_cancel),
        msgs_to_send_{num_msgs_to_send},
        client_cancel_{client_cancel} {
    if (server_try_cancel_ != DO_NOT_CANCEL) {
      // Send server_try_cancel value in the client metadata
      context_.AddMetadata(kServerTryCancelRequest,
                           std::to_string(server_try_cancel));
    }
    request_.set_message("Hello fren ");
    context_.set_initial_metadata_corked(cork_metadata);
    stub->experimental_async()->BidiStream(&context_, this);
    MaybeAsyncWrite(first_write_async);
    StartRead(&response_);
    StartCall();
  }
  // Invoked once per StartRead; ok==false means the read stream has ended.
  void OnReadDone(bool ok) override {
    if (!ok) {
      if (server_try_cancel_ == DO_NOT_CANCEL) {
        if (!client_cancel_.cancel) {
          EXPECT_EQ(reads_complete_, msgs_to_send_);
        } else {
          // After a client cancel we can only bound reads by writes.
          EXPECT_LE(reads_complete_, writes_complete_);
        }
      }
    } else {
      EXPECT_LE(reads_complete_, msgs_to_send_);
      EXPECT_EQ(response_.message(), request_.message());
      reads_complete_++;
      StartRead(&response_);
    }
  }
  void OnWriteDone(bool ok) override {
    // The async first write (if any) has completed by now: reap its thread
    // and drop the hold that kept OnDone from firing prematurely.
    if (async_write_thread_.joinable()) {
      async_write_thread_.join();
      RemoveHold();
    }
    if (server_try_cancel_ == DO_NOT_CANCEL) {
      EXPECT_TRUE(ok);
    } else if (!ok) {
      // Writes are allowed to fail once the server has canceled; stop writing.
      return;
    }
    writes_complete_++;
    MaybeWrite();
  }
  // Final callback: check status/read/write invariants for each scenario,
  // then release the thread blocked in Await().
  void OnDone(const Status& s) override {
    gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
    gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
    switch (server_try_cancel_) {
      case DO_NOT_CANCEL:
        if (!client_cancel_.cancel ||
            client_cancel_.ops_before_cancel > msgs_to_send_) {
          EXPECT_TRUE(s.ok());
          EXPECT_EQ(writes_complete_, msgs_to_send_);
          EXPECT_EQ(reads_complete_, writes_complete_);
        } else {
          EXPECT_FALSE(s.ok());
          EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
          EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
          EXPECT_LE(reads_complete_, writes_complete_);
        }
        break;
      case CANCEL_BEFORE_PROCESSING:
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        // The RPC is canceled before the server did any work or returned any
        // reads, but it's possible that some writes took place first from the
        // client
        EXPECT_LE(writes_complete_, msgs_to_send_);
        EXPECT_EQ(reads_complete_, 0);
        break;
      case CANCEL_DURING_PROCESSING:
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        EXPECT_LE(writes_complete_, msgs_to_send_);
        EXPECT_LE(reads_complete_, writes_complete_);
        break;
      case CANCEL_AFTER_PROCESSING:
        EXPECT_FALSE(s.ok());
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        EXPECT_EQ(writes_complete_, msgs_to_send_);
        // The Server canceled after reading the last message and after writing
        // the message to the client. However, the RPC cancellation might have
        // taken effect before the client actually read the response.
        EXPECT_LE(reads_complete_, writes_complete_);
        break;
      default:
        assert(false);
    }
    std::unique_lock<std::mutex> l(mu_);
    done_ = true;
    cv_.notify_one();
  }
  // Blocks the test thread until OnDone has run.
  void Await() {
    std::unique_lock<std::mutex> l(mu_);
    while (!done_) {
      cv_.wait(l);
    }
  }

 private:
  // Issues the first write either inline or (when |first_write_async|) from a
  // freshly spawned thread. In the async case an AddHold() pin is taken so the
  // RPC cannot reach OnDone before the out-of-band StartWrite is issued; the
  // hold is removed in OnWriteDone after joining the thread.
  void MaybeAsyncWrite(bool first_write_async) {
    if (first_write_async) {
      // Make sure that we have a write to issue.
      // TODO(vjpai): Make this work with 0 writes case as well.
      assert(msgs_to_send_ >= 1);

      AddHold();
      async_write_thread_ = std::thread([this] {
        std::unique_lock<std::mutex> lock(async_write_thread_mu_);
        // Wait until the constructor thread flips the start flag so the
        // thread object is fully assigned before MaybeWrite runs.
        async_write_thread_cv_.wait(
            lock, [this] { return async_write_thread_start_; });
        MaybeWrite();
      });
      std::lock_guard<std::mutex> lock(async_write_thread_mu_);
      async_write_thread_start_ = true;
      async_write_thread_cv_.notify_one();
      return;
    }
    MaybeWrite();
  }
  // Decides the next write-side operation: client cancel at the configured
  // point, WritesDone once all messages are sent, otherwise another write.
  void MaybeWrite() {
    if (client_cancel_.cancel &&
        writes_complete_ == client_cancel_.ops_before_cancel) {
      context_.TryCancel();
    } else if (writes_complete_ == msgs_to_send_) {
      StartWritesDone();
    } else {
      StartWrite(&request_);
    }
  }
  EchoRequest request_;
  EchoResponse response_;
  ClientContext context_;
  const ServerTryCancelRequestPhase server_try_cancel_;
  int reads_complete_{0};   // successful OnReadDone(ok=true) count
  int writes_complete_{0};  // successful OnWriteDone count
  const int msgs_to_send_;
  const ClientCancelInfo client_cancel_;
  std::mutex mu_;               // guards done_
  std::condition_variable cv_;  // signals completion to Await()
  bool done_ = false;
  std::thread async_write_thread_;          // issues first write when async
  bool async_write_thread_start_ = false;   // handshake flag for that thread
  std::mutex async_write_thread_mu_;
  std::condition_variable async_write_thread_cv_;
};
1219
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
  ResetStub();
  BidiClient client{stub_.get(), DO_NOT_CANCEL,
                    kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/false, /*first_write_async=*/false};
  client.Await();
  // A clean run: the server-side interceptors must not have seen a cancel.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1231
TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
  ResetStub();
  BidiClient client{stub_.get(), DO_NOT_CANCEL,
                    kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/false, /*first_write_async=*/true};
  client.Await();
  // A clean run: the server-side interceptors must not have seen a cancel.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1243
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
  ResetStub();
  BidiClient client{stub_.get(), DO_NOT_CANCEL,
                    kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/true, /*first_write_async=*/false};
  client.Await();
  // A clean run: the server-side interceptors must not have seen a cancel.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1255
TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
  ResetStub();
  BidiClient client{stub_.get(), DO_NOT_CANCEL,
                    kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/true, /*first_write_async=*/true};
  client.Await();
  // A clean run: the server-side interceptors must not have seen a cancel.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
  }
}
1267
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
  ResetStub();
  // Client issues TryCancel after two completed writes.
  BidiClient client{stub_.get(), DO_NOT_CANCEL,
                    kServerDefaultResponseStreamsToSend,
                    /*cork_metadata=*/false, /*first_write_async=*/false,
                    ClientCancelInfo(2)};
  client.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1280
1281 // Server to cancel before reading/writing any requests/responses on the stream
// Server to cancel before reading/writing any requests/responses on the stream
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
  ResetStub();
  BidiClient client{stub_.get(), CANCEL_BEFORE_PROCESSING,
                    /*num_msgs_to_send=*/2,
                    /*cork_metadata=*/false, /*first_write_async=*/false};
  client.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1292
1293 // Server to cancel while reading/writing requests/responses on the stream in
1294 // parallel
// Server to cancel while reading/writing requests/responses on the stream in
// parallel
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
  ResetStub();
  BidiClient client{stub_.get(), CANCEL_DURING_PROCESSING,
                    /*num_msgs_to_send=*/10, /*cork_metadata=*/false,
                    /*first_write_async=*/false};
  client.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1306
1307 // Server to cancel after reading/writing all requests/responses on the stream
1308 // but before returning to the client
// Server to cancel after reading/writing all requests/responses on the stream
// but before returning to the client
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
  ResetStub();
  BidiClient client{stub_.get(), CANCEL_AFTER_PROCESSING,
                    /*num_msgs_to_send=*/5,
                    /*cork_metadata=*/false, /*first_write_async=*/false};
  client.Await();
  // Every server interceptor should have observed the cancellation.
  if (GetParam().use_interceptors) {
    EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
  }
}
1319
// Exercises issuing StartWritesDone and StartRead back-to-back from
// OnWriteDone: one message is written, then the read and the writes-done
// are posted simultaneously, and the echoed reply must match the request.
TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
  ResetStub();
  class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
                                                              EchoResponse> {
   public:
    Client(grpc::testing::EchoTestService::Stub* stub) {
      request_.set_message("Hello bidi ");
      stub->experimental_async()->BidiStream(&context_, this);
      // First (and only) write is started before the call itself.
      StartWrite(&request_);
      StartCall();
    }
    void OnReadDone(bool ok) override {
      EXPECT_TRUE(ok);
      EXPECT_EQ(response_.message(), request_.message());
    }
    void OnWriteDone(bool ok) override {
      EXPECT_TRUE(ok);
      // Now send out the simultaneous Read and WritesDone
      StartWritesDone();
      StartRead(&response_);
    }
    // Final callback: verify clean completion and wake up Await().
    void OnDone(const Status& s) override {
      EXPECT_TRUE(s.ok());
      EXPECT_EQ(response_.message(), request_.message());
      std::unique_lock<std::mutex> l(mu_);
      done_ = true;
      cv_.notify_one();
    }
    // Blocks the test thread until OnDone has run.
    void Await() {
      std::unique_lock<std::mutex> l(mu_);
      while (!done_) {
        cv_.wait(l);
      }
    }

   private:
    EchoRequest request_;
    EchoResponse response_;
    ClientContext context_;
    std::mutex mu_;               // guards done_
    std::condition_variable cv_;  // signals completion to Await()
    bool done_ = false;
  } test{stub_.get()};

  test.Await();
}
1366
// Calling a method the server does not implement must complete the callback
// with UNIMPLEMENTED and an empty error message.
TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) {
  ChannelArguments args;
  const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
      GetParam().credentials_type, &args);
  // Build either a TCP or an in-process channel, matching the scenario.
  std::shared_ptr<Channel> channel;
  if (GetParam().protocol == Protocol::TCP) {
    channel =
        ::grpc::CreateCustomChannel(server_address_.str(), channel_creds, args);
  } else {
    channel = server_->InProcessChannel(args);
  }
  auto stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
  EchoRequest request;
  EchoResponse response;
  ClientContext cli_ctx;
  request.set_message("Hello world.");
  std::mutex mu;
  std::condition_variable cv;
  bool rpc_finished = false;
  stub->experimental_async()->Unimplemented(
      &cli_ctx, &request, &response, [&rpc_finished, &mu, &cv](Status s) {
        EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
        EXPECT_EQ("", s.error_message());

        std::lock_guard<std::mutex> l(mu);
        rpc_finished = true;
        cv.notify_one();
      });
  // Block the test thread until the completion callback has fired.
  std::unique_lock<std::mutex> l(mu);
  cv.wait(l, [&rpc_finished] { return rpc_finished; });
}
1399
// Drives the read side of a server-streaming RPC manually from the test
// thread: an AddHold keeps the RPC alive while the test issues one StartRead
// at a time and waits for each OnReadDone, then removes the hold and waits
// for OnDone. Verifies the full stream is delivered with an OK status.
TEST_P(ClientCallbackEnd2endTest,
       ResponseStreamExtraReactionFlowReadsUntilDone) {
  ResetStub();
  class ReadAllIncomingDataClient
      : public grpc::experimental::ClientReadReactor<EchoResponse> {
   public:
    ReadAllIncomingDataClient(grpc::testing::EchoTestService::Stub* stub) {
      request_.set_message("Hello client ");
      stub->experimental_async()->ResponseStream(&context_, &request_, this);
    }
    // Blocks until the next OnReadDone fires; returns whether that read
    // succeeded. Resets the flag so it can be called once per StartRead.
    bool WaitForReadDone() {
      std::unique_lock<std::mutex> l(mu_);
      while (!read_done_) {
        read_cv_.wait(l);
      }
      read_done_ = false;
      return read_ok_;
    }
    // Blocks until OnDone has run.
    void Await() {
      std::unique_lock<std::mutex> l(mu_);
      while (!done_) {
        done_cv_.wait(l);
      }
    }
    // RemoveHold under the same lock used for OnDone to make sure that we don't
    // call OnDone directly or indirectly from the RemoveHold function.
    void RemoveHoldUnderLock() {
      std::unique_lock<std::mutex> l(mu_);
      RemoveHold();
    }
    // NOTE(review): returns a reference whose access is only guarded while
    // inside this function; callers use it after Await(), when OnDone has
    // already published status_.
    const Status& status() {
      std::unique_lock<std::mutex> l(mu_);
      return status_;
    }

   private:
    void OnReadDone(bool ok) override {
      std::unique_lock<std::mutex> l(mu_);
      read_ok_ = ok;
      read_done_ = true;
      read_cv_.notify_one();
    }
    void OnDone(const Status& s) override {
      std::unique_lock<std::mutex> l(mu_);
      done_ = true;
      status_ = s;
      done_cv_.notify_one();
    }

    EchoRequest request_;
    EchoResponse response_;
    ClientContext context_;
    bool read_ok_ = false;    // result of the most recent read
    bool read_done_ = false;  // set by OnReadDone, cleared by WaitForReadDone
    std::mutex mu_;           // guards all flags and status_
    std::condition_variable read_cv_;
    std::condition_variable done_cv_;
    bool done_ = false;
    Status status_;
  } client{stub_.get()};

  int reads_complete = 0;
  // Hold must be added before StartCall so the RPC can't finish while the
  // test thread is still pumping reads.
  client.AddHold();
  client.StartCall();

  EchoResponse response;
  bool read_ok = true;
  while (read_ok) {
    client.StartRead(&response);
    read_ok = client.WaitForReadDone();
    if (read_ok) {
      ++reads_complete;
    }
  }
  client.RemoveHoldUnderLock();
  client.Await();

  EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete);
  EXPECT_EQ(client.status().error_code(), grpc::StatusCode::OK);
}
1480
CreateTestScenarios(bool test_insecure)1481 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
1482 #if TARGET_OS_IPHONE
1483 // Workaround Apple CFStream bug
1484 gpr_setenv("grpc_cfstream", "0");
1485 #endif
1486
1487 std::vector<TestScenario> scenarios;
1488 std::vector<std::string> credentials_types{
1489 GetCredentialsProvider()->GetSecureCredentialsTypeList()};
1490 auto insec_ok = [] {
1491 // Only allow insecure credentials type when it is registered with the
1492 // provider. User may create providers that do not have insecure.
1493 return GetCredentialsProvider()->GetChannelCredentials(
1494 kInsecureCredentialsType, nullptr) != nullptr;
1495 };
1496 if (test_insecure && insec_ok()) {
1497 credentials_types.push_back(kInsecureCredentialsType);
1498 }
1499 GPR_ASSERT(!credentials_types.empty());
1500
1501 bool barr[]{false, true};
1502 Protocol parr[]{Protocol::INPROC, Protocol::TCP};
1503 for (Protocol p : parr) {
1504 for (const auto& cred : credentials_types) {
1505 // TODO(vjpai): Test inproc with secure credentials when feasible
1506 if (p == Protocol::INPROC &&
1507 (cred != kInsecureCredentialsType || !insec_ok())) {
1508 continue;
1509 }
1510 for (bool callback_server : barr) {
1511 for (bool use_interceptors : barr) {
1512 scenarios.emplace_back(callback_server, p, use_interceptors, cred);
1513 }
1514 }
1515 }
1516 }
1517 return scenarios;
1518 }
1519
// Instantiate the parameterized suite over every generated scenario
// (protocol x credentials x server mode x interceptors), insecure included.
INSTANTIATE_TEST_SUITE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
                         ::testing::ValuesIn(CreateTestScenarios(true)));
1522
1523 } // namespace
1524 } // namespace testing
1525 } // namespace grpc
1526
int main(int argc, char** argv) {
  // InitGoogleTest strips gtest flags from argv before the gRPC test
  // environment consumes the remaining arguments.
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(argc, argv);
  return RUN_ALL_TESTS();
}
1532