1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/ext/health_check_service_server_builder_option.h>
31 #include <grpcpp/server.h>
32 #include <grpcpp/server_builder.h>
33 #include <grpcpp/server_context.h>
34
35 #include "src/core/lib/gpr/env.h"
36 #include "src/core/lib/gpr/tls.h"
37 #include "src/core/lib/iomgr/port.h"
38 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/string_ref_helper.h"
44 #include "test/cpp/util/test_credentials_provider.h"
45
46 #include <gtest/gtest.h>
47
48 using grpc::testing::EchoRequest;
49 using grpc::testing::EchoResponse;
50 using grpc::testing::kTlsCredentialsType;
51 using std::chrono::system_clock;
52
53 namespace grpc {
54 namespace testing {
55
56 namespace {
57
tag(int i)58 void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
detag(void * p)59 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
60
61 class Verifier {
62 public:
Verifier()63 Verifier() : lambda_run_(false) {}
64 // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)65 Verifier& Expect(int i, bool expect_ok) {
66 return ExpectUnless(i, expect_ok, false);
67 }
68 // ExpectUnless sets the expected ok value for a specific tag
69 // unless the tag was already marked seen (as a result of ExpectMaybe)
ExpectUnless(int i,bool expect_ok,bool seen)70 Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
71 if (!seen) {
72 expectations_[tag(i)] = expect_ok;
73 }
74 return *this;
75 }
76 // ExpectMaybe sets the expected ok value for a specific tag, but does not
77 // require it to appear
78 // If it does, sets *seen to true
ExpectMaybe(int i,bool expect_ok,bool * seen)79 Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
80 if (!*seen) {
81 maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
82 }
83 return *this;
84 }
85
86 // Next waits for 1 async tag to complete, checks its
87 // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)88 int Next(CompletionQueue* cq, bool ignore_ok) {
89 bool ok;
90 void* got_tag;
91 EXPECT_TRUE(cq->Next(&got_tag, &ok));
92 GotTag(got_tag, ok, ignore_ok);
93 return detag(got_tag);
94 }
95
96 template <typename T>
DoOnceThenAsyncNext(CompletionQueue * cq,void ** got_tag,bool * ok,T deadline,std::function<void (void)> lambda)97 CompletionQueue::NextStatus DoOnceThenAsyncNext(
98 CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
99 std::function<void(void)> lambda) {
100 if (lambda_run_) {
101 return cq->AsyncNext(got_tag, ok, deadline);
102 } else {
103 lambda_run_ = true;
104 return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
105 }
106 }
107
108 // Verify keeps calling Next until all currently set
109 // expected tags are complete
Verify(CompletionQueue * cq)110 void Verify(CompletionQueue* cq) { Verify(cq, false); }
111
112 // This version of Verify allows optionally ignoring the
113 // outcome of the expectation
Verify(CompletionQueue * cq,bool ignore_ok)114 void Verify(CompletionQueue* cq, bool ignore_ok) {
115 GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
116 while (!expectations_.empty()) {
117 Next(cq, ignore_ok);
118 }
119 }
120
121 // This version of Verify stops after a certain deadline
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline)122 void Verify(CompletionQueue* cq,
123 std::chrono::system_clock::time_point deadline) {
124 if (expectations_.empty()) {
125 bool ok;
126 void* got_tag;
127 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
128 CompletionQueue::TIMEOUT);
129 } else {
130 while (!expectations_.empty()) {
131 bool ok;
132 void* got_tag;
133 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
134 CompletionQueue::GOT_EVENT);
135 GotTag(got_tag, ok, false);
136 }
137 }
138 }
139
140 // This version of Verify stops after a certain deadline, and uses the
141 // DoThenAsyncNext API
142 // to call the lambda
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline,const std::function<void (void)> & lambda)143 void Verify(CompletionQueue* cq,
144 std::chrono::system_clock::time_point deadline,
145 const std::function<void(void)>& lambda) {
146 if (expectations_.empty()) {
147 bool ok;
148 void* got_tag;
149 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
150 CompletionQueue::TIMEOUT);
151 } else {
152 while (!expectations_.empty()) {
153 bool ok;
154 void* got_tag;
155 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
156 CompletionQueue::GOT_EVENT);
157 GotTag(got_tag, ok, false);
158 }
159 }
160 }
161
162 private:
GotTag(void * got_tag,bool ok,bool ignore_ok)163 void GotTag(void* got_tag, bool ok, bool ignore_ok) {
164 auto it = expectations_.find(got_tag);
165 if (it != expectations_.end()) {
166 if (!ignore_ok) {
167 EXPECT_EQ(it->second, ok);
168 }
169 expectations_.erase(it);
170 } else {
171 auto it2 = maybe_expectations_.find(got_tag);
172 if (it2 != maybe_expectations_.end()) {
173 if (it2->second.seen != nullptr) {
174 EXPECT_FALSE(*it2->second.seen);
175 *it2->second.seen = true;
176 }
177 if (!ignore_ok) {
178 EXPECT_EQ(it2->second.ok, ok);
179 }
180 } else {
181 gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
182 abort();
183 }
184 }
185 }
186
187 struct MaybeExpect {
188 bool ok;
189 bool* seen;
190 };
191
192 std::map<void*, bool> expectations_;
193 std::map<void*, MaybeExpect> maybe_expectations_;
194 bool lambda_run_;
195 };
196
plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin> & plugin)197 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
198 return plugin->has_sync_methods();
199 }
200
201 // This class disables the server builder plugins that may add sync services to
202 // the server. If there are sync services, UnimplementedRpc test will triger
203 // the sync unknown rpc routine on the server side, rather than the async one
204 // that needs to be tested here.
205 class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
206 public:
UpdateArguments(ChannelArguments * arg)207 void UpdateArguments(ChannelArguments* arg) override {}
208
UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>> * plugins)209 void UpdatePlugins(
210 std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
211 plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
212 plugin_has_sync_methods),
213 plugins->end());
214 }
215 };
216
217 class TestScenario {
218 public:
TestScenario(bool inproc_stub,const grpc::string & creds_type,bool hcs,const grpc::string & content)219 TestScenario(bool inproc_stub, const grpc::string& creds_type, bool hcs,
220 const grpc::string& content)
221 : inproc(inproc_stub),
222 health_check_service(hcs),
223 credentials_type(creds_type),
224 message_content(content) {}
225 void Log() const;
226 bool inproc;
227 bool health_check_service;
228 const grpc::string credentials_type;
229 const grpc::string message_content;
230 };
231
operator <<(std::ostream & out,const TestScenario & scenario)232 static std::ostream& operator<<(std::ostream& out,
233 const TestScenario& scenario) {
234 return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
235 << ", credentials='" << scenario.credentials_type
236 << ", health_check_service="
237 << (scenario.health_check_service ? "true" : "false")
238 << "', message_size=" << scenario.message_content.size() << "}";
239 }
240
Log() const241 void TestScenario::Log() const {
242 std::ostringstream out;
243 out << *this;
244 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
245 }
246
247 class HealthCheck : public health::v1::Health::Service {};
248
249 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
250 protected:
AsyncEnd2endTest()251 AsyncEnd2endTest() { GetParam().Log(); }
252
SetUp()253 void SetUp() override {
254 port_ = grpc_pick_unused_port_or_die();
255 server_address_ << "localhost:" << port_;
256
257 // Setup server
258 BuildAndStartServer();
259 }
260
TearDown()261 void TearDown() override {
262 server_->Shutdown();
263 void* ignored_tag;
264 bool ignored_ok;
265 cq_->Shutdown();
266 while (cq_->Next(&ignored_tag, &ignored_ok))
267 ;
268 stub_.reset();
269 grpc_recycle_unused_port(port_);
270 }
271
BuildAndStartServer()272 void BuildAndStartServer() {
273 ServerBuilder builder;
274 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
275 GetParam().credentials_type);
276 builder.AddListeningPort(server_address_.str(), server_creds);
277 service_.reset(new grpc::testing::EchoTestService::AsyncService());
278 builder.RegisterService(service_.get());
279 if (GetParam().health_check_service) {
280 builder.RegisterService(&health_check_);
281 }
282 cq_ = builder.AddCompletionQueue();
283
284 // TODO(zyc): make a test option to choose wheather sync plugins should be
285 // deleted
286 std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
287 new ServerBuilderSyncPluginDisabler());
288 builder.SetOption(move(sync_plugin_disabler));
289 server_ = builder.BuildAndStart();
290 }
291
ResetStub()292 void ResetStub() {
293 ChannelArguments args;
294 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
295 GetParam().credentials_type, &args);
296 std::shared_ptr<Channel> channel =
297 !(GetParam().inproc)
298 ? CreateCustomChannel(server_address_.str(), channel_creds, args)
299 : server_->InProcessChannel(args);
300 stub_ = grpc::testing::EchoTestService::NewStub(channel);
301 }
302
SendRpc(int num_rpcs)303 void SendRpc(int num_rpcs) {
304 for (int i = 0; i < num_rpcs; i++) {
305 EchoRequest send_request;
306 EchoRequest recv_request;
307 EchoResponse send_response;
308 EchoResponse recv_response;
309 Status recv_status;
310
311 ClientContext cli_ctx;
312 ServerContext srv_ctx;
313 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
314
315 send_request.set_message(GetParam().message_content);
316 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
317 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
318
319 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
320 cq_.get(), cq_.get(), tag(2));
321
322 response_reader->Finish(&recv_response, &recv_status, tag(4));
323
324 Verifier().Expect(2, true).Verify(cq_.get());
325 EXPECT_EQ(send_request.message(), recv_request.message());
326
327 send_response.set_message(recv_request.message());
328 response_writer.Finish(send_response, Status::OK, tag(3));
329 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
330
331 EXPECT_EQ(send_response.message(), recv_response.message());
332 EXPECT_TRUE(recv_status.ok());
333 }
334 }
335
336 std::unique_ptr<ServerCompletionQueue> cq_;
337 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
338 std::unique_ptr<Server> server_;
339 std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
340 HealthCheck health_check_;
341 std::ostringstream server_address_;
342 int port_;
343 };
344
TEST_P(AsyncEnd2endTest,SimpleRpc)345 TEST_P(AsyncEnd2endTest, SimpleRpc) {
346 ResetStub();
347 SendRpc(1);
348 }
349
TEST_P(AsyncEnd2endTest,SequentialRpcs)350 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
351 ResetStub();
352 SendRpc(10);
353 }
354
TEST_P(AsyncEnd2endTest,ReconnectChannel)355 TEST_P(AsyncEnd2endTest, ReconnectChannel) {
356 // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
357 if (GetParam().inproc) {
358 return;
359 }
360 int poller_slowdown_factor = 1;
361 // It needs 2 pollset_works to reconnect the channel with polling engine
362 // "poll"
363 char* s = gpr_getenv("GRPC_POLL_STRATEGY");
364 if (s != nullptr && 0 == strcmp(s, "poll")) {
365 poller_slowdown_factor = 2;
366 }
367 gpr_free(s);
368 ResetStub();
369 SendRpc(1);
370 server_->Shutdown();
371 void* ignored_tag;
372 bool ignored_ok;
373 cq_->Shutdown();
374 while (cq_->Next(&ignored_tag, &ignored_ok))
375 ;
376 BuildAndStartServer();
377 // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
378 // reconnect the channel.
379 gpr_sleep_until(gpr_time_add(
380 gpr_now(GPR_CLOCK_REALTIME),
381 gpr_time_from_millis(
382 300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
383 GPR_TIMESPAN)));
384 SendRpc(1);
385 }
386
387 // We do not need to protect notify because the use is synchronized.
ServerWait(Server * server,int * notify)388 void ServerWait(Server* server, int* notify) {
389 server->Wait();
390 *notify = 1;
391 }
TEST_P(AsyncEnd2endTest,WaitAndShutdownTest)392 TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
393 int notify = 0;
394 std::thread wait_thread(&ServerWait, server_.get(), ¬ify);
395 ResetStub();
396 SendRpc(1);
397 EXPECT_EQ(0, notify);
398 server_->Shutdown();
399 wait_thread.join();
400 EXPECT_EQ(1, notify);
401 }
402
TEST_P(AsyncEnd2endTest,ShutdownThenWait)403 TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
404 ResetStub();
405 SendRpc(1);
406 std::thread t([this]() { server_->Shutdown(); });
407 server_->Wait();
408 t.join();
409 }
410
411 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,AsyncNextRpc)412 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
413 ResetStub();
414
415 EchoRequest send_request;
416 EchoRequest recv_request;
417 EchoResponse send_response;
418 EchoResponse recv_response;
419 Status recv_status;
420
421 ClientContext cli_ctx;
422 ServerContext srv_ctx;
423 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
424
425 send_request.set_message(GetParam().message_content);
426 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
427 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
428
429 std::chrono::system_clock::time_point time_now(
430 std::chrono::system_clock::now());
431 std::chrono::system_clock::time_point time_limit(
432 std::chrono::system_clock::now() + std::chrono::seconds(10));
433 Verifier().Verify(cq_.get(), time_now);
434 Verifier().Verify(cq_.get(), time_now);
435
436 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
437 cq_.get(), tag(2));
438 response_reader->Finish(&recv_response, &recv_status, tag(4));
439
440 Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
441 EXPECT_EQ(send_request.message(), recv_request.message());
442
443 send_response.set_message(recv_request.message());
444 response_writer.Finish(send_response, Status::OK, tag(3));
445 Verifier().Expect(3, true).Expect(4, true).Verify(
446 cq_.get(), std::chrono::system_clock::time_point::max());
447
448 EXPECT_EQ(send_response.message(), recv_response.message());
449 EXPECT_TRUE(recv_status.ok());
450 }
451
452 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,DoThenAsyncNextRpc)453 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
454 ResetStub();
455
456 EchoRequest send_request;
457 EchoRequest recv_request;
458 EchoResponse send_response;
459 EchoResponse recv_response;
460 Status recv_status;
461
462 ClientContext cli_ctx;
463 ServerContext srv_ctx;
464 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
465
466 send_request.set_message(GetParam().message_content);
467 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
468 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
469
470 std::chrono::system_clock::time_point time_now(
471 std::chrono::system_clock::now());
472 std::chrono::system_clock::time_point time_limit(
473 std::chrono::system_clock::now() + std::chrono::seconds(10));
474 Verifier().Verify(cq_.get(), time_now);
475 Verifier().Verify(cq_.get(), time_now);
476
477 auto resp_writer_ptr = &response_writer;
478 auto lambda_2 = [&, this, resp_writer_ptr]() {
479 service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
480 cq_.get(), tag(2));
481 };
482 response_reader->Finish(&recv_response, &recv_status, tag(4));
483
484 Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
485 EXPECT_EQ(send_request.message(), recv_request.message());
486
487 send_response.set_message(recv_request.message());
488 auto lambda_3 = [resp_writer_ptr, send_response]() {
489 resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
490 };
491 Verifier().Expect(3, true).Expect(4, true).Verify(
492 cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
493
494 EXPECT_EQ(send_response.message(), recv_response.message());
495 EXPECT_TRUE(recv_status.ok());
496 }
497
498 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreaming)499 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
500 ResetStub();
501
502 EchoRequest send_request;
503 EchoRequest recv_request;
504 EchoResponse send_response;
505 EchoResponse recv_response;
506 Status recv_status;
507 ClientContext cli_ctx;
508 ServerContext srv_ctx;
509 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
510
511 send_request.set_message(GetParam().message_content);
512 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
513 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
514
515 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
516 tag(2));
517
518 Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
519
520 cli_stream->Write(send_request, tag(3));
521 srv_stream.Read(&recv_request, tag(4));
522 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
523 EXPECT_EQ(send_request.message(), recv_request.message());
524
525 cli_stream->Write(send_request, tag(5));
526 srv_stream.Read(&recv_request, tag(6));
527 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
528
529 EXPECT_EQ(send_request.message(), recv_request.message());
530 cli_stream->WritesDone(tag(7));
531 srv_stream.Read(&recv_request, tag(8));
532 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
533
534 send_response.set_message(recv_request.message());
535 srv_stream.Finish(send_response, Status::OK, tag(9));
536 cli_stream->Finish(&recv_status, tag(10));
537 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
538
539 EXPECT_EQ(send_response.message(), recv_response.message());
540 EXPECT_TRUE(recv_status.ok());
541 }
542
543 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreamingWithCoalescingApi)544 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
545 ResetStub();
546
547 EchoRequest send_request;
548 EchoRequest recv_request;
549 EchoResponse send_response;
550 EchoResponse recv_response;
551 Status recv_status;
552 ClientContext cli_ctx;
553 ServerContext srv_ctx;
554 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
555
556 send_request.set_message(GetParam().message_content);
557 cli_ctx.set_initial_metadata_corked(true);
558 // tag:1 never comes up since no op is performed
559 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
560 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
561
562 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
563 tag(2));
564
565 cli_stream->Write(send_request, tag(3));
566
567 bool seen3 = false;
568
569 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
570
571 srv_stream.Read(&recv_request, tag(4));
572
573 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
574
575 EXPECT_EQ(send_request.message(), recv_request.message());
576
577 cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
578 srv_stream.Read(&recv_request, tag(6));
579 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
580 EXPECT_EQ(send_request.message(), recv_request.message());
581
582 srv_stream.Read(&recv_request, tag(7));
583 Verifier().Expect(7, false).Verify(cq_.get());
584
585 send_response.set_message(recv_request.message());
586 srv_stream.Finish(send_response, Status::OK, tag(8));
587 cli_stream->Finish(&recv_status, tag(9));
588 Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
589
590 EXPECT_EQ(send_response.message(), recv_response.message());
591 EXPECT_TRUE(recv_status.ok());
592 }
593
594 // One ping, two pongs.
TEST_P(AsyncEnd2endTest,SimpleServerStreaming)595 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
596 ResetStub();
597
598 EchoRequest send_request;
599 EchoRequest recv_request;
600 EchoResponse send_response;
601 EchoResponse recv_response;
602 Status recv_status;
603 ClientContext cli_ctx;
604 ServerContext srv_ctx;
605 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
606
607 send_request.set_message(GetParam().message_content);
608 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
609 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
610
611 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
612 cq_.get(), cq_.get(), tag(2));
613
614 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
615 EXPECT_EQ(send_request.message(), recv_request.message());
616
617 send_response.set_message(recv_request.message());
618 srv_stream.Write(send_response, tag(3));
619 cli_stream->Read(&recv_response, tag(4));
620 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
621 EXPECT_EQ(send_response.message(), recv_response.message());
622
623 srv_stream.Write(send_response, tag(5));
624 cli_stream->Read(&recv_response, tag(6));
625 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
626 EXPECT_EQ(send_response.message(), recv_response.message());
627
628 srv_stream.Finish(Status::OK, tag(7));
629 cli_stream->Read(&recv_response, tag(8));
630 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
631
632 cli_stream->Finish(&recv_status, tag(9));
633 Verifier().Expect(9, true).Verify(cq_.get());
634
635 EXPECT_TRUE(recv_status.ok());
636 }
637
638 // One ping, two pongs. Using WriteAndFinish API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWAF)639 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
640 ResetStub();
641
642 EchoRequest send_request;
643 EchoRequest recv_request;
644 EchoResponse send_response;
645 EchoResponse recv_response;
646 Status recv_status;
647 ClientContext cli_ctx;
648 ServerContext srv_ctx;
649 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
650
651 send_request.set_message(GetParam().message_content);
652 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
653 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
654
655 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
656 cq_.get(), cq_.get(), tag(2));
657
658 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
659 EXPECT_EQ(send_request.message(), recv_request.message());
660
661 send_response.set_message(recv_request.message());
662 srv_stream.Write(send_response, tag(3));
663 cli_stream->Read(&recv_response, tag(4));
664 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
665 EXPECT_EQ(send_response.message(), recv_response.message());
666
667 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
668 cli_stream->Read(&recv_response, tag(6));
669 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
670 EXPECT_EQ(send_response.message(), recv_response.message());
671
672 cli_stream->Read(&recv_response, tag(7));
673 Verifier().Expect(7, false).Verify(cq_.get());
674
675 cli_stream->Finish(&recv_status, tag(8));
676 Verifier().Expect(8, true).Verify(cq_.get());
677
678 EXPECT_TRUE(recv_status.ok());
679 }
680
681 // One ping, two pongs. Using WriteLast API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWL)682 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
683 ResetStub();
684
685 EchoRequest send_request;
686 EchoRequest recv_request;
687 EchoResponse send_response;
688 EchoResponse recv_response;
689 Status recv_status;
690 ClientContext cli_ctx;
691 ServerContext srv_ctx;
692 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
693
694 send_request.set_message(GetParam().message_content);
695 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
696 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
697
698 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
699 cq_.get(), cq_.get(), tag(2));
700
701 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
702 EXPECT_EQ(send_request.message(), recv_request.message());
703
704 send_response.set_message(recv_request.message());
705 srv_stream.Write(send_response, tag(3));
706 cli_stream->Read(&recv_response, tag(4));
707 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
708 EXPECT_EQ(send_response.message(), recv_response.message());
709
710 srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
711 cli_stream->Read(&recv_response, tag(6));
712 srv_stream.Finish(Status::OK, tag(7));
713 Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
714 EXPECT_EQ(send_response.message(), recv_response.message());
715
716 cli_stream->Read(&recv_response, tag(8));
717 Verifier().Expect(8, false).Verify(cq_.get());
718
719 cli_stream->Finish(&recv_status, tag(9));
720 Verifier().Expect(9, true).Verify(cq_.get());
721
722 EXPECT_TRUE(recv_status.ok());
723 }
724
725 // One ping, one pong.
TEST_P(AsyncEnd2endTest,SimpleBidiStreaming)726 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
727 ResetStub();
728
729 EchoRequest send_request;
730 EchoRequest recv_request;
731 EchoResponse send_response;
732 EchoResponse recv_response;
733 Status recv_status;
734 ClientContext cli_ctx;
735 ServerContext srv_ctx;
736 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
737
738 send_request.set_message(GetParam().message_content);
739 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
740 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
741
742 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
743 tag(2));
744
745 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
746
747 cli_stream->Write(send_request, tag(3));
748 srv_stream.Read(&recv_request, tag(4));
749 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
750 EXPECT_EQ(send_request.message(), recv_request.message());
751
752 send_response.set_message(recv_request.message());
753 srv_stream.Write(send_response, tag(5));
754 cli_stream->Read(&recv_response, tag(6));
755 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
756 EXPECT_EQ(send_response.message(), recv_response.message());
757
758 cli_stream->WritesDone(tag(7));
759 srv_stream.Read(&recv_request, tag(8));
760 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
761
762 srv_stream.Finish(Status::OK, tag(9));
763 cli_stream->Finish(&recv_status, tag(10));
764 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
765
766 EXPECT_TRUE(recv_status.ok());
767 }
768
769 // One ping, one pong. Using server:WriteAndFinish api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWAF)770 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
771 ResetStub();
772
773 EchoRequest send_request;
774 EchoRequest recv_request;
775 EchoResponse send_response;
776 EchoResponse recv_response;
777 Status recv_status;
778 ClientContext cli_ctx;
779 ServerContext srv_ctx;
780 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
781
782 send_request.set_message(GetParam().message_content);
783 cli_ctx.set_initial_metadata_corked(true);
784 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
785 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
786
787 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
788 tag(2));
789
790 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
791
792 bool seen3 = false;
793
794 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
795
796 srv_stream.Read(&recv_request, tag(4));
797
798 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
799 EXPECT_EQ(send_request.message(), recv_request.message());
800
801 srv_stream.Read(&recv_request, tag(5));
802 Verifier().Expect(5, false).Verify(cq_.get());
803
804 send_response.set_message(recv_request.message());
805 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
806 cli_stream->Read(&recv_response, tag(7));
807 Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
808 EXPECT_EQ(send_response.message(), recv_response.message());
809
810 cli_stream->Finish(&recv_status, tag(8));
811 Verifier().Expect(8, true).Verify(cq_.get());
812
813 EXPECT_TRUE(recv_status.ok());
814 }
815
816 // One ping, one pong. Using server:WriteLast api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWL)817 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
818 ResetStub();
819
820 EchoRequest send_request;
821 EchoRequest recv_request;
822 EchoResponse send_response;
823 EchoResponse recv_response;
824 Status recv_status;
825 ClientContext cli_ctx;
826 ServerContext srv_ctx;
827 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
828
829 send_request.set_message(GetParam().message_content);
830 cli_ctx.set_initial_metadata_corked(true);
831 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
832 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
833
834 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
835 tag(2));
836
837 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
838
839 bool seen3 = false;
840
841 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
842
843 srv_stream.Read(&recv_request, tag(4));
844
845 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
846 EXPECT_EQ(send_request.message(), recv_request.message());
847
848 srv_stream.Read(&recv_request, tag(5));
849 Verifier().Expect(5, false).Verify(cq_.get());
850
851 send_response.set_message(recv_request.message());
852 srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
853 srv_stream.Finish(Status::OK, tag(7));
854 cli_stream->Read(&recv_response, tag(8));
855 Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
856 EXPECT_EQ(send_response.message(), recv_response.message());
857
858 cli_stream->Finish(&recv_status, tag(9));
859 Verifier().Expect(9, true).Verify(cq_.get());
860
861 EXPECT_TRUE(recv_status.ok());
862 }
863
864 // Metadata tests
TEST_P(AsyncEnd2endTest,ClientInitialMetadataRpc)865 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
866 ResetStub();
867
868 EchoRequest send_request;
869 EchoRequest recv_request;
870 EchoResponse send_response;
871 EchoResponse recv_response;
872 Status recv_status;
873
874 ClientContext cli_ctx;
875 ServerContext srv_ctx;
876 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
877
878 send_request.set_message(GetParam().message_content);
879 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
880 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
881 std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
882 cli_ctx.AddMetadata(meta1.first, meta1.second);
883 cli_ctx.AddMetadata(meta2.first, meta2.second);
884 cli_ctx.AddMetadata(meta3.first, meta3.second);
885
886 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
887 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
888 response_reader->Finish(&recv_response, &recv_status, tag(4));
889
890 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
891 cq_.get(), tag(2));
892 Verifier().Expect(2, true).Verify(cq_.get());
893 EXPECT_EQ(send_request.message(), recv_request.message());
894 const auto& client_initial_metadata = srv_ctx.client_metadata();
895 EXPECT_EQ(meta1.second,
896 ToString(client_initial_metadata.find(meta1.first)->second));
897 EXPECT_EQ(meta2.second,
898 ToString(client_initial_metadata.find(meta2.first)->second));
899 EXPECT_EQ(meta3.second,
900 ToString(client_initial_metadata.find(meta3.first)->second));
901 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
902
903 send_response.set_message(recv_request.message());
904 response_writer.Finish(send_response, Status::OK, tag(3));
905 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
906
907 EXPECT_EQ(send_response.message(), recv_response.message());
908 EXPECT_TRUE(recv_status.ok());
909 }
910
TEST_P(AsyncEnd2endTest,ServerInitialMetadataRpc)911 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
912 ResetStub();
913
914 EchoRequest send_request;
915 EchoRequest recv_request;
916 EchoResponse send_response;
917 EchoResponse recv_response;
918 Status recv_status;
919
920 ClientContext cli_ctx;
921 ServerContext srv_ctx;
922 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
923
924 send_request.set_message(GetParam().message_content);
925 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
926 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
927
928 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
929 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
930 response_reader->ReadInitialMetadata(tag(4));
931
932 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
933 cq_.get(), tag(2));
934 Verifier().Expect(2, true).Verify(cq_.get());
935 EXPECT_EQ(send_request.message(), recv_request.message());
936 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
937 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
938 response_writer.SendInitialMetadata(tag(3));
939 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
940 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
941 EXPECT_EQ(meta1.second,
942 ToString(server_initial_metadata.find(meta1.first)->second));
943 EXPECT_EQ(meta2.second,
944 ToString(server_initial_metadata.find(meta2.first)->second));
945 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
946
947 send_response.set_message(recv_request.message());
948 response_writer.Finish(send_response, Status::OK, tag(5));
949 response_reader->Finish(&recv_response, &recv_status, tag(6));
950 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
951
952 EXPECT_EQ(send_response.message(), recv_response.message());
953 EXPECT_TRUE(recv_status.ok());
954 }
955
TEST_P(AsyncEnd2endTest,ServerTrailingMetadataRpc)956 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
957 ResetStub();
958
959 EchoRequest send_request;
960 EchoRequest recv_request;
961 EchoResponse send_response;
962 EchoResponse recv_response;
963 Status recv_status;
964
965 ClientContext cli_ctx;
966 ServerContext srv_ctx;
967 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
968
969 send_request.set_message(GetParam().message_content);
970 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
971 std::pair<grpc::string, grpc::string> meta2("key2", "val2");
972
973 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
974 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
975 response_reader->Finish(&recv_response, &recv_status, tag(5));
976
977 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
978 cq_.get(), tag(2));
979 Verifier().Expect(2, true).Verify(cq_.get());
980 EXPECT_EQ(send_request.message(), recv_request.message());
981 response_writer.SendInitialMetadata(tag(3));
982 Verifier().Expect(3, true).Verify(cq_.get());
983
984 send_response.set_message(recv_request.message());
985 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
986 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
987 response_writer.Finish(send_response, Status::OK, tag(4));
988
989 Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
990
991 EXPECT_EQ(send_response.message(), recv_response.message());
992 EXPECT_TRUE(recv_status.ok());
993 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
994 EXPECT_EQ(meta1.second,
995 ToString(server_trailing_metadata.find(meta1.first)->second));
996 EXPECT_EQ(meta2.second,
997 ToString(server_trailing_metadata.find(meta2.first)->second));
998 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
999 }
1000
TEST_P(AsyncEnd2endTest,MetadataRpc)1001 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1002 ResetStub();
1003
1004 EchoRequest send_request;
1005 EchoRequest recv_request;
1006 EchoResponse send_response;
1007 EchoResponse recv_response;
1008 Status recv_status;
1009
1010 ClientContext cli_ctx;
1011 ServerContext srv_ctx;
1012 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1013
1014 send_request.set_message(GetParam().message_content);
1015 std::pair<grpc::string, grpc::string> meta1("key1", "val1");
1016 std::pair<grpc::string, grpc::string> meta2(
1017 "key2-bin",
1018 grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1019 std::pair<grpc::string, grpc::string> meta3("key3", "val3");
1020 std::pair<grpc::string, grpc::string> meta6(
1021 "key4-bin",
1022 grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1023 14));
1024 std::pair<grpc::string, grpc::string> meta5("key5", "val5");
1025 std::pair<grpc::string, grpc::string> meta4(
1026 "key6-bin",
1027 grpc::string(
1028 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1029
1030 cli_ctx.AddMetadata(meta1.first, meta1.second);
1031 cli_ctx.AddMetadata(meta2.first, meta2.second);
1032
1033 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1034 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1035 response_reader->ReadInitialMetadata(tag(4));
1036
1037 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1038 cq_.get(), tag(2));
1039 Verifier().Expect(2, true).Verify(cq_.get());
1040 EXPECT_EQ(send_request.message(), recv_request.message());
1041 const auto& client_initial_metadata = srv_ctx.client_metadata();
1042 EXPECT_EQ(meta1.second,
1043 ToString(client_initial_metadata.find(meta1.first)->second));
1044 EXPECT_EQ(meta2.second,
1045 ToString(client_initial_metadata.find(meta2.first)->second));
1046 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
1047
1048 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1049 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1050 response_writer.SendInitialMetadata(tag(3));
1051 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1052 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1053 EXPECT_EQ(meta3.second,
1054 ToString(server_initial_metadata.find(meta3.first)->second));
1055 EXPECT_EQ(meta4.second,
1056 ToString(server_initial_metadata.find(meta4.first)->second));
1057 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
1058
1059 send_response.set_message(recv_request.message());
1060 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1061 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1062 response_writer.Finish(send_response, Status::OK, tag(5));
1063 response_reader->Finish(&recv_response, &recv_status, tag(6));
1064
1065 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1066
1067 EXPECT_EQ(send_response.message(), recv_response.message());
1068 EXPECT_TRUE(recv_status.ok());
1069 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1070 EXPECT_EQ(meta5.second,
1071 ToString(server_trailing_metadata.find(meta5.first)->second));
1072 EXPECT_EQ(meta6.second,
1073 ToString(server_trailing_metadata.find(meta6.first)->second));
1074 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
1075 }
1076
1077 // Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_P(AsyncEnd2endTest,ServerCheckCancellation)1078 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1079 ResetStub();
1080
1081 EchoRequest send_request;
1082 EchoRequest recv_request;
1083 EchoResponse send_response;
1084 EchoResponse recv_response;
1085 Status recv_status;
1086
1087 ClientContext cli_ctx;
1088 ServerContext srv_ctx;
1089 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1090
1091 send_request.set_message(GetParam().message_content);
1092 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1093 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1094 response_reader->Finish(&recv_response, &recv_status, tag(4));
1095
1096 srv_ctx.AsyncNotifyWhenDone(tag(5));
1097 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1098 cq_.get(), tag(2));
1099
1100 Verifier().Expect(2, true).Verify(cq_.get());
1101 EXPECT_EQ(send_request.message(), recv_request.message());
1102
1103 cli_ctx.TryCancel();
1104 Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1105 EXPECT_TRUE(srv_ctx.IsCancelled());
1106
1107 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1108 }
1109
1110 // Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_P(AsyncEnd2endTest,ServerCheckDone)1111 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1112 ResetStub();
1113
1114 EchoRequest send_request;
1115 EchoRequest recv_request;
1116 EchoResponse send_response;
1117 EchoResponse recv_response;
1118 Status recv_status;
1119
1120 ClientContext cli_ctx;
1121 ServerContext srv_ctx;
1122 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1123
1124 send_request.set_message(GetParam().message_content);
1125 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1126 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1127 response_reader->Finish(&recv_response, &recv_status, tag(4));
1128
1129 srv_ctx.AsyncNotifyWhenDone(tag(5));
1130 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1131 cq_.get(), tag(2));
1132
1133 Verifier().Expect(2, true).Verify(cq_.get());
1134 EXPECT_EQ(send_request.message(), recv_request.message());
1135
1136 send_response.set_message(recv_request.message());
1137 response_writer.Finish(send_response, Status::OK, tag(3));
1138 Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1139 EXPECT_FALSE(srv_ctx.IsCancelled());
1140
1141 EXPECT_EQ(send_response.message(), recv_response.message());
1142 EXPECT_TRUE(recv_status.ok());
1143 }
1144
TEST_P(AsyncEnd2endTest,UnimplementedRpc)1145 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
1146 ChannelArguments args;
1147 const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1148 GetParam().credentials_type, &args);
1149 std::shared_ptr<Channel> channel =
1150 !(GetParam().inproc)
1151 ? CreateCustomChannel(server_address_.str(), channel_creds, args)
1152 : server_->InProcessChannel(args);
1153 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1154 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1155 EchoRequest send_request;
1156 EchoResponse recv_response;
1157 Status recv_status;
1158
1159 ClientContext cli_ctx;
1160 send_request.set_message(GetParam().message_content);
1161 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1162 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
1163
1164 response_reader->Finish(&recv_response, &recv_status, tag(4));
1165 Verifier().Expect(4, true).Verify(cq_.get());
1166
1167 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
1168 EXPECT_EQ("", recv_status.error_message());
1169 }
1170
1171 // This class is for testing scenarios where RPCs are cancelled on the server
1172 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1173 // API to check for cancellation
1174 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1175 protected:
1176 typedef enum {
1177 DO_NOT_CANCEL = 0,
1178 CANCEL_BEFORE_PROCESSING,
1179 CANCEL_DURING_PROCESSING,
1180 CANCEL_AFTER_PROCESSING
1181 } ServerTryCancelRequestPhase;
1182
1183 // Helper for testing client-streaming RPCs which are cancelled on the server.
1184 // Depending on the value of server_try_cancel parameter, this will test one
1185 // of the following three scenarios:
1186 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1187 // any messages from the client
1188 //
1189 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1190 // messages from the client
1191 //
1192 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1193 // messages from the client (but before sending any status back to the
1194 // client)
TestClientStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1195 void TestClientStreamingServerCancel(
1196 ServerTryCancelRequestPhase server_try_cancel) {
1197 ResetStub();
1198
1199 EchoRequest recv_request;
1200 EchoResponse send_response;
1201 EchoResponse recv_response;
1202 Status recv_status;
1203
1204 ClientContext cli_ctx;
1205 ServerContext srv_ctx;
1206 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1207
1208 // Initiate the 'RequestStream' call on client
1209 CompletionQueue cli_cq;
1210
1211 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1212 stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1213
1214 // On the server, request to be notified of 'RequestStream' calls
1215 // and receive the 'RequestStream' call just made by the client
1216 srv_ctx.AsyncNotifyWhenDone(tag(11));
1217 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1218 tag(2));
1219 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1220 Verifier().Expect(2, true).Verify(cq_.get());
1221 t1.join();
1222
1223 bool expected_server_cq_result = true;
1224 bool expected_client_cq_result = true;
1225
1226 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1227 srv_ctx.TryCancel();
1228 Verifier().Expect(11, true).Verify(cq_.get());
1229 EXPECT_TRUE(srv_ctx.IsCancelled());
1230
1231 // Since cancellation is done before server reads any results, we know
1232 // for sure that all server cq results will return false from this
1233 // point forward
1234 expected_server_cq_result = false;
1235 expected_client_cq_result = false;
1236 }
1237
1238 bool ignore_client_cq_result =
1239 (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1240 (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1241
1242 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1243 &ignore_client_cq_result] {
1244 EchoRequest send_request;
1245 // Client sends 3 messages (tags 3, 4 and 5)
1246 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1247 send_request.set_message("Ping " + grpc::to_string(tag_idx));
1248 cli_stream->Write(send_request, tag(tag_idx));
1249 Verifier()
1250 .Expect(tag_idx, expected_client_cq_result)
1251 .Verify(&cli_cq, ignore_client_cq_result);
1252 }
1253 cli_stream->WritesDone(tag(6));
1254 // Ignore ok on WritesDone since cancel can affect it
1255 Verifier()
1256 .Expect(6, expected_client_cq_result)
1257 .Verify(&cli_cq, ignore_client_cq_result);
1258 });
1259
1260 bool ignore_cq_result = false;
1261 bool want_done_tag = false;
1262 std::thread* server_try_cancel_thd = nullptr;
1263
1264 auto verif = Verifier();
1265
1266 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1267 server_try_cancel_thd =
1268 new std::thread(&ServerContext::TryCancel, &srv_ctx);
1269 // Server will cancel the RPC in a parallel thread while reading the
1270 // requests from the client. Since the cancellation can happen at anytime,
1271 // some of the cq results (i.e those until cancellation) might be true but
1272 // its non deterministic. So better to ignore the cq results
1273 ignore_cq_result = true;
1274 // Expect that we might possibly see the done tag that
1275 // indicates cancellation completion in this case
1276 want_done_tag = true;
1277 verif.Expect(11, true);
1278 }
1279
1280 // Server reads 3 messages (tags 6, 7 and 8)
1281 // But if want_done_tag is true, we might also see tag 11
1282 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1283 srv_stream.Read(&recv_request, tag(tag_idx));
1284 // Note that we'll add something to the verifier and verify that
1285 // something was seen, but it might be tag 11 and not what we
1286 // just added
1287 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1288 .Next(cq_.get(), ignore_cq_result);
1289 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1290 if (got_tag == 11) {
1291 EXPECT_TRUE(srv_ctx.IsCancelled());
1292 want_done_tag = false;
1293 // Now get the other entry that we were waiting on
1294 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1295 }
1296 }
1297
1298 cli_thread.join();
1299
1300 if (server_try_cancel_thd != nullptr) {
1301 server_try_cancel_thd->join();
1302 delete server_try_cancel_thd;
1303 }
1304
1305 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1306 srv_ctx.TryCancel();
1307 want_done_tag = true;
1308 verif.Expect(11, true);
1309 }
1310
1311 if (want_done_tag) {
1312 verif.Verify(cq_.get());
1313 EXPECT_TRUE(srv_ctx.IsCancelled());
1314 want_done_tag = false;
1315 }
1316
1317 // The RPC has been cancelled at this point for sure (i.e irrespective of
1318 // the value of `server_try_cancel` is). So, from this point forward, we
1319 // know that cq results are supposed to return false on server.
1320
1321 // Server sends the final message and cancelled status (but the RPC is
1322 // already cancelled at this point. So we expect the operation to fail)
1323 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1324 Verifier().Expect(9, false).Verify(cq_.get());
1325
1326 // Client will see the cancellation
1327 cli_stream->Finish(&recv_status, tag(10));
1328 Verifier().Expect(10, true).Verify(&cli_cq);
1329 EXPECT_FALSE(recv_status.ok());
1330 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1331
1332 cli_cq.Shutdown();
1333 void* dummy_tag;
1334 bool dummy_ok;
1335 while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1336 }
1337 }
1338
1339 // Helper for testing server-streaming RPCs which are cancelled on the server.
1340 // Depending on the value of server_try_cancel parameter, this will test one
1341 // of the following three scenarios:
1342 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1343 // any messages to the client
1344 //
1345 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1346 // messages to the client
1347 //
1348 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
1349 // messages to the client (but before sending any status back to the
1350 // client)
TestServerStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1351 void TestServerStreamingServerCancel(
1352 ServerTryCancelRequestPhase server_try_cancel) {
1353 ResetStub();
1354
1355 EchoRequest send_request;
1356 EchoRequest recv_request;
1357 EchoResponse send_response;
1358 Status recv_status;
1359 ClientContext cli_ctx;
1360 ServerContext srv_ctx;
1361 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1362
1363 send_request.set_message("Ping");
1364 // Initiate the 'ResponseStream' call on the client
1365 CompletionQueue cli_cq;
1366 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1367 stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1368 // On the server, request to be notified of 'ResponseStream' calls and
1369 // receive the call just made by the client
1370 srv_ctx.AsyncNotifyWhenDone(tag(11));
1371 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1372 cq_.get(), cq_.get(), tag(2));
1373
1374 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1375 Verifier().Expect(2, true).Verify(cq_.get());
1376 t1.join();
1377
1378 EXPECT_EQ(send_request.message(), recv_request.message());
1379
1380 bool expected_cq_result = true;
1381 bool ignore_cq_result = false;
1382 bool want_done_tag = false;
1383 bool expected_client_cq_result = true;
1384 bool ignore_client_cq_result =
1385 (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1386
1387 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1388 srv_ctx.TryCancel();
1389 Verifier().Expect(11, true).Verify(cq_.get());
1390 EXPECT_TRUE(srv_ctx.IsCancelled());
1391
1392 // We know for sure that all cq results will be false from this point
1393 // since the server cancelled the RPC
1394 expected_cq_result = false;
1395 expected_client_cq_result = false;
1396 }
1397
1398 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1399 &ignore_client_cq_result] {
1400 // Client attempts to read the three messages from the server
1401 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1402 EchoResponse recv_response;
1403 cli_stream->Read(&recv_response, tag(tag_idx));
1404 Verifier()
1405 .Expect(tag_idx, expected_client_cq_result)
1406 .Verify(&cli_cq, ignore_client_cq_result);
1407 }
1408 });
1409
1410 std::thread* server_try_cancel_thd = nullptr;
1411
1412 auto verif = Verifier();
1413
1414 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1415 server_try_cancel_thd =
1416 new std::thread(&ServerContext::TryCancel, &srv_ctx);
1417
1418 // Server will cancel the RPC in a parallel thread while writing responses
1419 // to the client. Since the cancellation can happen at anytime, some of
1420 // the cq results (i.e those until cancellation) might be true but it is
1421 // non deterministic. So better to ignore the cq results
1422 ignore_cq_result = true;
1423 // Expect that we might possibly see the done tag that
1424 // indicates cancellation completion in this case
1425 want_done_tag = true;
1426 verif.Expect(11, true);
1427 }
1428
1429 // Server sends three messages (tags 3, 4 and 5)
1430 // But if want_done tag is true, we might also see tag 11
1431 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1432 send_response.set_message("Pong " + grpc::to_string(tag_idx));
1433 srv_stream.Write(send_response, tag(tag_idx));
1434 // Note that we'll add something to the verifier and verify that
1435 // something was seen, but it might be tag 11 and not what we
1436 // just added
1437 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1438 .Next(cq_.get(), ignore_cq_result);
1439 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1440 if (got_tag == 11) {
1441 EXPECT_TRUE(srv_ctx.IsCancelled());
1442 want_done_tag = false;
1443 // Now get the other entry that we were waiting on
1444 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1445 }
1446 }
1447
1448 if (server_try_cancel_thd != nullptr) {
1449 server_try_cancel_thd->join();
1450 delete server_try_cancel_thd;
1451 }
1452
1453 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1454 srv_ctx.TryCancel();
1455 want_done_tag = true;
1456 verif.Expect(11, true);
1457 }
1458
1459 if (want_done_tag) {
1460 verif.Verify(cq_.get());
1461 EXPECT_TRUE(srv_ctx.IsCancelled());
1462 want_done_tag = false;
1463 }
1464
1465 cli_thread.join();
1466
1467 // The RPC has been cancelled at this point for sure (i.e irrespective of
1468 // the value of `server_try_cancel` is). So, from this point forward, we
1469 // know that cq results are supposed to return false on server.
1470
1471 // Server finishes the stream (but the RPC is already cancelled)
1472 srv_stream.Finish(Status::CANCELLED, tag(9));
1473 Verifier().Expect(9, false).Verify(cq_.get());
1474
1475 // Client will see the cancellation
1476 cli_stream->Finish(&recv_status, tag(10));
1477 Verifier().Expect(10, true).Verify(&cli_cq);
1478 EXPECT_FALSE(recv_status.ok());
1479 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1480
1481 cli_cq.Shutdown();
1482 void* dummy_tag;
1483 bool dummy_ok;
1484 while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1485 }
1486 }
1487
1488 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1489 // server.
1490 //
1491 // Depending on the value of server_try_cancel parameter, this will
1492 // test one of the following three scenarios:
1493 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1494 // writing any messages from/to the client
1495 //
1496 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1497 // messages from the client
1498 //
1499 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1500 // messages from the client (but before sending any status back to the
1501 // client)
TestBidiStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1502 void TestBidiStreamingServerCancel(
1503 ServerTryCancelRequestPhase server_try_cancel) {
1504 ResetStub();
1505
1506 EchoRequest send_request;
1507 EchoRequest recv_request;
1508 EchoResponse send_response;
1509 EchoResponse recv_response;
1510 Status recv_status;
1511 ClientContext cli_ctx;
1512 ServerContext srv_ctx;
1513 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1514
1515 // Initiate the call from the client side
1516 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1517 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1518
1519 // On the server, request to be notified of the 'BidiStream' call and
1520 // receive the call just made by the client
1521 srv_ctx.AsyncNotifyWhenDone(tag(11));
1522 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1523 tag(2));
1524 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1525
1526 auto verif = Verifier();
1527
1528 // Client sends the first and the only message
1529 send_request.set_message("Ping");
1530 cli_stream->Write(send_request, tag(3));
1531 verif.Expect(3, true);
1532
1533 bool expected_cq_result = true;
1534 bool ignore_cq_result = false;
1535 bool want_done_tag = false;
1536
1537 int got_tag, got_tag2;
1538 bool tag_3_done = false;
1539
1540 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1541 srv_ctx.TryCancel();
1542 verif.Expect(11, true);
1543 // We know for sure that all server cq results will be false from
1544 // this point since the server cancelled the RPC. However, we can't
1545 // say for sure about the client
1546 expected_cq_result = false;
1547 ignore_cq_result = true;
1548
1549 do {
1550 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1551 GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1552 if (got_tag == 3) {
1553 tag_3_done = true;
1554 }
1555 } while (got_tag != 11);
1556 EXPECT_TRUE(srv_ctx.IsCancelled());
1557 }
1558
1559 std::thread* server_try_cancel_thd = nullptr;
1560
1561 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1562 server_try_cancel_thd =
1563 new std::thread(&ServerContext::TryCancel, &srv_ctx);
1564
1565 // Since server is going to cancel the RPC in a parallel thread, some of
1566 // the cq results (i.e those until the cancellation) might be true. Since
1567 // that number is non-deterministic, it is better to ignore the cq results
1568 ignore_cq_result = true;
1569 // Expect that we might possibly see the done tag that
1570 // indicates cancellation completion in this case
1571 want_done_tag = true;
1572 verif.Expect(11, true);
1573 }
1574
1575 srv_stream.Read(&recv_request, tag(4));
1576 verif.Expect(4, expected_cq_result);
1577 got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1578 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1579 GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1580 (got_tag == 11 && want_done_tag));
1581 GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1582 (got_tag2 == 11 && want_done_tag));
1583 // If we get 3 and 4, we don't need to wait for 11, but if
1584 // we get 11, we should also clear 3 and 4
1585 if (got_tag + got_tag2 != 7) {
1586 EXPECT_TRUE(srv_ctx.IsCancelled());
1587 want_done_tag = false;
1588 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1589 GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1590 }
1591
1592 send_response.set_message("Pong");
1593 srv_stream.Write(send_response, tag(5));
1594 verif.Expect(5, expected_cq_result);
1595
1596 cli_stream->Read(&recv_response, tag(6));
1597 verif.Expect(6, expected_cq_result);
1598 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1599 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1600 GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1601 (got_tag == 11 && want_done_tag));
1602 GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1603 (got_tag2 == 11 && want_done_tag));
1604 // If we get 5 and 6, we don't need to wait for 11, but if
1605 // we get 11, we should also clear 5 and 6
1606 if (got_tag + got_tag2 != 11) {
1607 EXPECT_TRUE(srv_ctx.IsCancelled());
1608 want_done_tag = false;
1609 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1610 GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1611 }
1612
1613 // This is expected to succeed in all cases
1614 cli_stream->WritesDone(tag(7));
1615 verif.Expect(7, true);
1616 // TODO(vjpai): Consider whether the following is too flexible
1617 // or whether it should just be reset to ignore_cq_result
1618 bool ignore_cq_wd_result =
1619 ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1620 got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1621 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1622 if (got_tag == 11) {
1623 EXPECT_TRUE(srv_ctx.IsCancelled());
1624 want_done_tag = false;
1625 // Now get the other entry that we were waiting on
1626 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1627 }
1628
1629 // This is expected to fail in all cases i.e for all values of
1630 // server_try_cancel. This is because at this point, either there are no
1631 // more msgs from the client (because client called WritesDone) or the RPC
1632 // is cancelled on the server
1633 srv_stream.Read(&recv_request, tag(8));
1634 verif.Expect(8, false);
1635 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1636 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1637 if (got_tag == 11) {
1638 EXPECT_TRUE(srv_ctx.IsCancelled());
1639 want_done_tag = false;
1640 // Now get the other entry that we were waiting on
1641 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1642 }
1643
1644 if (server_try_cancel_thd != nullptr) {
1645 server_try_cancel_thd->join();
1646 delete server_try_cancel_thd;
1647 }
1648
1649 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1650 srv_ctx.TryCancel();
1651 want_done_tag = true;
1652 verif.Expect(11, true);
1653 }
1654
1655 if (want_done_tag) {
1656 verif.Verify(cq_.get());
1657 EXPECT_TRUE(srv_ctx.IsCancelled());
1658 want_done_tag = false;
1659 }
1660
1661 // The RPC has been cancelled at this point for sure (i.e irrespective of
1662 // the value of `server_try_cancel` is). So, from this point forward, we
1663 // know that cq results are supposed to return false on server.
1664
1665 srv_stream.Finish(Status::CANCELLED, tag(9));
1666 Verifier().Expect(9, false).Verify(cq_.get());
1667
1668 cli_stream->Finish(&recv_status, tag(10));
1669 Verifier().Expect(10, true).Verify(cq_.get());
1670 EXPECT_FALSE(recv_status.ok());
1671 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1672 }
1673 };
1674
TEST_P(AsyncEnd2endServerTryCancelTest,ClientStreamingServerTryCancelBefore)1675 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1676 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1677 }
1678
TEST_P(AsyncEnd2endServerTryCancelTest,ClientStreamingServerTryCancelDuring)1679 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1680 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1681 }
1682
TEST_P(AsyncEnd2endServerTryCancelTest,ClientStreamingServerTryCancelAfter)1683 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1684 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1685 }
1686
TEST_P(AsyncEnd2endServerTryCancelTest,ServerStreamingServerTryCancelBefore)1687 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1688 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1689 }
1690
TEST_P(AsyncEnd2endServerTryCancelTest,ServerStreamingServerTryCancelDuring)1691 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1692 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1693 }
1694
TEST_P(AsyncEnd2endServerTryCancelTest,ServerStreamingServerTryCancelAfter)1695 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1696 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1697 }
1698
TEST_P(AsyncEnd2endServerTryCancelTest,ServerBidiStreamingTryCancelBefore)1699 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1700 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1701 }
1702
TEST_P(AsyncEnd2endServerTryCancelTest,ServerBidiStreamingTryCancelDuring)1703 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1704 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1705 }
1706
TEST_P(AsyncEnd2endServerTryCancelTest,ServerBidiStreamingTryCancelAfter)1707 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1708 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1709 }
1710
CreateTestScenarios(bool test_secure,bool test_message_size_limit)1711 std::vector<TestScenario> CreateTestScenarios(bool test_secure,
1712 bool test_message_size_limit) {
1713 std::vector<TestScenario> scenarios;
1714 std::vector<grpc::string> credentials_types;
1715 std::vector<grpc::string> messages;
1716
1717 auto insec_ok = [] {
1718 // Only allow insecure credentials type when it is registered with the
1719 // provider. User may create providers that do not have insecure.
1720 return GetCredentialsProvider()->GetChannelCredentials(
1721 kInsecureCredentialsType, nullptr) != nullptr;
1722 };
1723
1724 if (insec_ok()) {
1725 credentials_types.push_back(kInsecureCredentialsType);
1726 }
1727 auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1728 for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1729 credentials_types.push_back(*sec);
1730 }
1731 GPR_ASSERT(!credentials_types.empty());
1732
1733 messages.push_back("Hello");
1734 if (test_message_size_limit) {
1735 for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1736 k *= 32) {
1737 grpc::string big_msg;
1738 for (size_t i = 0; i < k * 1024; ++i) {
1739 char c = 'a' + (i % 26);
1740 big_msg += c;
1741 }
1742 messages.push_back(big_msg);
1743 }
1744 messages.push_back(
1745 grpc::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 10, 'a'));
1746 }
1747
1748 // TODO (sreek) Renable tests with health check service after the issue
1749 // https://github.com/grpc/grpc/issues/11223 is resolved
1750 for (auto health_check_service : {false}) {
1751 for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1752 for (auto cred = credentials_types.begin();
1753 cred != credentials_types.end(); ++cred) {
1754 scenarios.emplace_back(false, *cred, health_check_service, *msg);
1755 }
1756 if (insec_ok()) {
1757 scenarios.emplace_back(true, kInsecureCredentialsType,
1758 health_check_service, *msg);
1759 }
1760 }
1761 }
1762 return scenarios;
1763 }
1764
1765 INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
1766 ::testing::ValuesIn(CreateTestScenarios(true, true)));
1767 INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1768 AsyncEnd2endServerTryCancelTest,
1769 ::testing::ValuesIn(CreateTestScenarios(false, false)));
1770
1771 } // namespace
1772 } // namespace testing
1773 } // namespace grpc
1774
main(int argc,char ** argv)1775 int main(int argc, char** argv) {
1776 // Change the backup poll interval from 5s to 100ms to speed up the
1777 // ReconnectChannel test
1778 gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "100");
1779 grpc_test_init(argc, argv);
1780 ::testing::InitGoogleTest(&argc, argv);
1781 int ret = RUN_ALL_TESTS();
1782 return ret;
1783 }
1784