1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
#include <algorithm>
#include <chrono>
#include <cinttypes>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/ext/health_check_service_server_builder_option.h>
31 #include <grpcpp/server.h>
32 #include <grpcpp/server_builder.h>
33 #include <grpcpp/server_context.h>
34
35 #include "src/core/ext/filters/client_channel/backup_poller.h"
36 #include "src/core/lib/gpr/tls.h"
37 #include "src/core/lib/iomgr/port.h"
38 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/string_ref_helper.h"
44 #include "test/cpp/util/test_credentials_provider.h"
45
46 #ifdef GRPC_POSIX_SOCKET_EV
47 #include "src/core/lib/iomgr/ev_posix.h"
48 #endif // GRPC_POSIX_SOCKET_EV
49
50 #include <gtest/gtest.h>
51
52 using grpc::testing::EchoRequest;
53 using grpc::testing::EchoResponse;
54 using grpc::testing::kTlsCredentialsType;
55 using std::chrono::system_clock;
56
57 namespace grpc {
58 namespace testing {
59
60 namespace {
61
// Encodes a small integer as an opaque completion-queue tag pointer. The value
// is widened to intptr_t first so the integer-to-pointer conversion operates on
// a pointer-sized integer.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
// Recovers the integer stored in a tag pointer: the pointer is reinterpreted as
// an intptr_t and then narrowed to int.
int detag(void* p) {
  const intptr_t raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
64
65 class Verifier {
66 public:
Verifier()67 Verifier() : lambda_run_(false) {}
68 // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)69 Verifier& Expect(int i, bool expect_ok) {
70 return ExpectUnless(i, expect_ok, false);
71 }
72 // ExpectUnless sets the expected ok value for a specific tag
73 // unless the tag was already marked seen (as a result of ExpectMaybe)
ExpectUnless(int i,bool expect_ok,bool seen)74 Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
75 if (!seen) {
76 expectations_[tag(i)] = expect_ok;
77 }
78 return *this;
79 }
80 // ExpectMaybe sets the expected ok value for a specific tag, but does not
81 // require it to appear
82 // If it does, sets *seen to true
ExpectMaybe(int i,bool expect_ok,bool * seen)83 Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
84 if (!*seen) {
85 maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
86 }
87 return *this;
88 }
89
90 // Next waits for 1 async tag to complete, checks its
91 // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)92 int Next(CompletionQueue* cq, bool ignore_ok) {
93 bool ok;
94 void* got_tag;
95 EXPECT_TRUE(cq->Next(&got_tag, &ok));
96 GotTag(got_tag, ok, ignore_ok);
97 return detag(got_tag);
98 }
99
100 template <typename T>
DoOnceThenAsyncNext(CompletionQueue * cq,void ** got_tag,bool * ok,T deadline,std::function<void (void)> lambda)101 CompletionQueue::NextStatus DoOnceThenAsyncNext(
102 CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
103 std::function<void(void)> lambda) {
104 if (lambda_run_) {
105 return cq->AsyncNext(got_tag, ok, deadline);
106 } else {
107 lambda_run_ = true;
108 return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
109 }
110 }
111
112 // Verify keeps calling Next until all currently set
113 // expected tags are complete
Verify(CompletionQueue * cq)114 void Verify(CompletionQueue* cq) { Verify(cq, false); }
115
116 // This version of Verify allows optionally ignoring the
117 // outcome of the expectation
Verify(CompletionQueue * cq,bool ignore_ok)118 void Verify(CompletionQueue* cq, bool ignore_ok) {
119 GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
120 while (!expectations_.empty()) {
121 Next(cq, ignore_ok);
122 }
123 }
124
125 // This version of Verify stops after a certain deadline
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline)126 void Verify(CompletionQueue* cq,
127 std::chrono::system_clock::time_point deadline) {
128 if (expectations_.empty()) {
129 bool ok;
130 void* got_tag;
131 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
132 CompletionQueue::TIMEOUT);
133 } else {
134 while (!expectations_.empty()) {
135 bool ok;
136 void* got_tag;
137 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
138 CompletionQueue::GOT_EVENT);
139 GotTag(got_tag, ok, false);
140 }
141 }
142 }
143
144 // This version of Verify stops after a certain deadline, and uses the
145 // DoThenAsyncNext API
146 // to call the lambda
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline,const std::function<void (void)> & lambda)147 void Verify(CompletionQueue* cq,
148 std::chrono::system_clock::time_point deadline,
149 const std::function<void(void)>& lambda) {
150 if (expectations_.empty()) {
151 bool ok;
152 void* got_tag;
153 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
154 CompletionQueue::TIMEOUT);
155 } else {
156 while (!expectations_.empty()) {
157 bool ok;
158 void* got_tag;
159 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
160 CompletionQueue::GOT_EVENT);
161 GotTag(got_tag, ok, false);
162 }
163 }
164 }
165
166 private:
GotTag(void * got_tag,bool ok,bool ignore_ok)167 void GotTag(void* got_tag, bool ok, bool ignore_ok) {
168 auto it = expectations_.find(got_tag);
169 if (it != expectations_.end()) {
170 if (!ignore_ok) {
171 EXPECT_EQ(it->second, ok);
172 }
173 expectations_.erase(it);
174 } else {
175 auto it2 = maybe_expectations_.find(got_tag);
176 if (it2 != maybe_expectations_.end()) {
177 if (it2->second.seen != nullptr) {
178 EXPECT_FALSE(*it2->second.seen);
179 *it2->second.seen = true;
180 }
181 if (!ignore_ok) {
182 EXPECT_EQ(it2->second.ok, ok);
183 }
184 } else {
185 gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
186 abort();
187 }
188 }
189 }
190
191 struct MaybeExpect {
192 bool ok;
193 bool* seen;
194 };
195
196 std::map<void*, bool> expectations_;
197 std::map<void*, MaybeExpect> maybe_expectations_;
198 bool lambda_run_;
199 };
200
plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin> & plugin)201 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
202 return plugin->has_sync_methods();
203 }
204
205 // This class disables the server builder plugins that may add sync services to
206 // the server. If there are sync services, UnimplementedRpc test will triger
207 // the sync unknown rpc routine on the server side, rather than the async one
208 // that needs to be tested here.
209 class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
210 public:
UpdateArguments(ChannelArguments *)211 void UpdateArguments(ChannelArguments* /*arg*/) override {}
212
UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>> * plugins)213 void UpdatePlugins(
214 std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
215 plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
216 plugin_has_sync_methods),
217 plugins->end());
218 }
219 };
220
// One parameterization of the test suite: whether the stub uses an in-process
// channel, whether a health check service is registered, which credentials
// type to request, and the message payload sent in each RPC.
class TestScenario {
 public:
  TestScenario(bool inproc_stub, const std::string& creds_type, bool hcs,
               const std::string& content)
      : inproc(inproc_stub),
        health_check_service(hcs),
        credentials_type(creds_type),
        message_content(content) {}
  // Logs the scenario; defined out of line.
  void Log() const;
  bool inproc;
  bool health_check_service;
  const std::string credentials_type;
  const std::string message_content;
};

// Streams a one-line description of the scenario. Only the size of the
// message payload is printed, not its content. The credentials value is
// wrapped in single quotes; the closing quote is emitted right after it.
static std::ostream& operator<<(std::ostream& out,
                                const TestScenario& scenario) {
  return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
             << ", credentials='" << scenario.credentials_type
             << "', health_check_service="
             << (scenario.health_check_service ? "true" : "false")
             << ", message_size=" << scenario.message_content.size() << "}";
}
244
Log() const245 void TestScenario::Log() const {
246 std::ostringstream out;
247 out << *this;
248 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
249 }
250
251 class HealthCheck : public health::v1::Health::Service {};
252
253 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
254 protected:
AsyncEnd2endTest()255 AsyncEnd2endTest() { GetParam().Log(); }
256
SetUp()257 void SetUp() override {
258 port_ = grpc_pick_unused_port_or_die();
259 server_address_ << "localhost:" << port_;
260
261 // Setup server
262 BuildAndStartServer();
263 }
264
TearDown()265 void TearDown() override {
266 server_->Shutdown();
267 void* ignored_tag;
268 bool ignored_ok;
269 cq_->Shutdown();
270 while (cq_->Next(&ignored_tag, &ignored_ok))
271 ;
272 stub_.reset();
273 grpc_recycle_unused_port(port_);
274 }
275
BuildAndStartServer()276 void BuildAndStartServer() {
277 ServerBuilder builder;
278 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
279 GetParam().credentials_type);
280 builder.AddListeningPort(server_address_.str(), server_creds);
281 service_.reset(new grpc::testing::EchoTestService::AsyncService());
282 builder.RegisterService(service_.get());
283 if (GetParam().health_check_service) {
284 builder.RegisterService(&health_check_);
285 }
286 cq_ = builder.AddCompletionQueue();
287
288 // TODO(zyc): make a test option to choose wheather sync plugins should be
289 // deleted
290 std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
291 new ServerBuilderSyncPluginDisabler());
292 builder.SetOption(move(sync_plugin_disabler));
293 server_ = builder.BuildAndStart();
294 }
295
ResetStub()296 void ResetStub() {
297 ChannelArguments args;
298 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
299 GetParam().credentials_type, &args);
300 std::shared_ptr<Channel> channel =
301 !(GetParam().inproc) ? ::grpc::CreateCustomChannel(
302 server_address_.str(), channel_creds, args)
303 : server_->InProcessChannel(args);
304 stub_ = grpc::testing::EchoTestService::NewStub(channel);
305 }
306
SendRpc(int num_rpcs)307 void SendRpc(int num_rpcs) {
308 for (int i = 0; i < num_rpcs; i++) {
309 EchoRequest send_request;
310 EchoRequest recv_request;
311 EchoResponse send_response;
312 EchoResponse recv_response;
313 Status recv_status;
314
315 ClientContext cli_ctx;
316 ServerContext srv_ctx;
317 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
318
319 send_request.set_message(GetParam().message_content);
320 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
321 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
322
323 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
324 cq_.get(), cq_.get(), tag(2));
325
326 response_reader->Finish(&recv_response, &recv_status, tag(4));
327
328 Verifier().Expect(2, true).Verify(cq_.get());
329 EXPECT_EQ(send_request.message(), recv_request.message());
330
331 send_response.set_message(recv_request.message());
332 response_writer.Finish(send_response, Status::OK, tag(3));
333 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
334
335 EXPECT_EQ(send_response.message(), recv_response.message());
336 EXPECT_TRUE(recv_status.ok());
337 }
338 }
339
340 std::unique_ptr<ServerCompletionQueue> cq_;
341 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
342 std::unique_ptr<Server> server_;
343 std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
344 HealthCheck health_check_;
345 std::ostringstream server_address_;
346 int port_;
347 };
348
// A single unary Echo round trip through the fixture's SendRpc helper.
TEST_P(AsyncEnd2endTest, SimpleRpc) {
  const int rpc_count = 1;
  ResetStub();
  SendRpc(rpc_count);
}
353
// Unary Echo in which the request carries an expected error. The server
// finishes the call with that error, and the client must see the same code
// and message together with an empty response.
TEST_P(AsyncEnd2endTest, SimpleRpcWithExpectedError) {
  ResetStub();
  auto* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;

  ClientContext client_ctx;
  ServerContext server_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> writer(&server_ctx);
  ErrorStatus wanted_error;

  client_request.set_message(GetParam().message_content);
  wanted_error.set_code(1);  // CANCELLED
  wanted_error.set_error_message("cancel error message");
  *client_request.mutable_param()->mutable_expected_error() = wanted_error;

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> reader(
      stub_->AsyncEcho(&client_ctx, client_request, cq));

  // Tag 5 is delivered when the server-side call is done.
  server_ctx.AsyncNotifyWhenDone(tag(5));
  service_->RequestEcho(&server_ctx, &server_request, &writer, cq, cq,
                        tag(2));

  reader->Finish(&client_response, &client_status, tag(4));

  Verifier().Expect(2, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  server_response.set_message(server_request.message());
  // Finish with the status the request asked for.
  const StatusCode requested_code = static_cast<StatusCode>(
      server_request.param().expected_error().code());
  writer.Finish(
      server_response,
      Status(requested_code,
             server_request.param().expected_error().error_message()),
      tag(3));
  Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq);

  EXPECT_EQ(client_response.message(), "");
  EXPECT_EQ(client_status.error_code(), wanted_error.code());
  EXPECT_EQ(client_status.error_message(), wanted_error.error_message());
  EXPECT_FALSE(server_ctx.IsCancelled());
}
399
// Ten unary Echo round trips, issued one after another on the same stub.
TEST_P(AsyncEnd2endTest, SequentialRpcs) {
  const int rpc_count = 10;
  ResetStub();
  SendRpc(rpc_count);
}
404
// Sends an RPC, restarts the server on the same address, waits for the
// channel to reconnect, and sends another RPC on the same stub. Does nothing
// for in-process scenarios.
TEST_P(AsyncEnd2endTest, ReconnectChannel) {
  // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
  if (GetParam().inproc) {
    return;
  }
  int poller_slowdown_factor = 1;
#ifdef GRPC_POSIX_SOCKET_EV
  // It needs 2 pollset_works to reconnect the channel with polling engine
  // "poll"
  grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
  const bool using_poll = (strcmp(poller.get(), "poll") == 0);
  if (using_poll) {
    poller_slowdown_factor = 2;
  }
#endif  // GRPC_POSIX_SOCKET_EV
  ResetStub();
  SendRpc(1);

  // Stop the current server and drain its completion queue before a new
  // server (with a new queue) is built.
  server_->Shutdown();
  void* drained_tag;
  bool drained_ok;
  cq_->Shutdown();
  while (cq_->Next(&drained_tag, &drained_ok)) {
  }
  BuildAndStartServer();

  // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
  // reconnect the channel.
  const gpr_timespec wake_up_time = gpr_time_add(
      gpr_now(GPR_CLOCK_REALTIME),
      gpr_time_from_millis(
          300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
          GPR_TIMESPAN));
  gpr_sleep_until(wake_up_time);
  SendRpc(1);
}
437
438 // We do not need to protect notify because the use is synchronized.
ServerWait(Server * server,int * notify)439 void ServerWait(Server* server, int* notify) {
440 server->Wait();
441 *notify = 1;
442 }
// Checks that Server::Wait() keeps blocking while the server is running and
// returns once Shutdown() is called. A helper thread sits in Wait() and sets
// `notify` to 1 afterwards; `notify` must still be 0 after an RPC completes
// and must be 1 after shutdown and join.
TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
  int notify = 0;
  // The thread receives the address of `notify` so it can report completion.
  std::thread wait_thread(&ServerWait, server_.get(), &notify);
  ResetStub();
  SendRpc(1);
  EXPECT_EQ(0, notify);
  server_->Shutdown();
  wait_thread.join();
  EXPECT_EQ(1, notify);
}
453
// Calls Shutdown() from a second thread while the test thread calls Wait(),
// then joins the shutdown thread.
TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
  ResetStub();
  SendRpc(1);
  std::thread shutdown_thread([this]() { server_->Shutdown(); });
  server_->Wait();
  shutdown_thread.join();
}
461
462 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,AsyncNextRpc)463 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
464 ResetStub();
465
466 EchoRequest send_request;
467 EchoRequest recv_request;
468 EchoResponse send_response;
469 EchoResponse recv_response;
470 Status recv_status;
471
472 ClientContext cli_ctx;
473 ServerContext srv_ctx;
474 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
475
476 send_request.set_message(GetParam().message_content);
477 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
478 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
479
480 std::chrono::system_clock::time_point time_now(
481 std::chrono::system_clock::now());
482 std::chrono::system_clock::time_point time_limit(
483 std::chrono::system_clock::now() + std::chrono::seconds(10));
484 Verifier().Verify(cq_.get(), time_now);
485 Verifier().Verify(cq_.get(), time_now);
486
487 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
488 cq_.get(), tag(2));
489 response_reader->Finish(&recv_response, &recv_status, tag(4));
490
491 Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
492 EXPECT_EQ(send_request.message(), recv_request.message());
493
494 send_response.set_message(recv_request.message());
495 response_writer.Finish(send_response, Status::OK, tag(3));
496 Verifier().Expect(3, true).Expect(4, true).Verify(
497 cq_.get(), std::chrono::system_clock::time_point::max());
498
499 EXPECT_EQ(send_response.message(), recv_response.message());
500 EXPECT_TRUE(recv_status.ok());
501 }
502
503 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,DoThenAsyncNextRpc)504 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
505 ResetStub();
506
507 EchoRequest send_request;
508 EchoRequest recv_request;
509 EchoResponse send_response;
510 EchoResponse recv_response;
511 Status recv_status;
512
513 ClientContext cli_ctx;
514 ServerContext srv_ctx;
515 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
516
517 send_request.set_message(GetParam().message_content);
518 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
519 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
520
521 std::chrono::system_clock::time_point time_now(
522 std::chrono::system_clock::now());
523 std::chrono::system_clock::time_point time_limit(
524 std::chrono::system_clock::now() + std::chrono::seconds(10));
525 Verifier().Verify(cq_.get(), time_now);
526 Verifier().Verify(cq_.get(), time_now);
527
528 auto resp_writer_ptr = &response_writer;
529 auto lambda_2 = [&, this, resp_writer_ptr]() {
530 service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
531 cq_.get(), tag(2));
532 };
533 response_reader->Finish(&recv_response, &recv_status, tag(4));
534
535 Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
536 EXPECT_EQ(send_request.message(), recv_request.message());
537
538 send_response.set_message(recv_request.message());
539 auto lambda_3 = [resp_writer_ptr, send_response]() {
540 resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
541 };
542 Verifier().Expect(3, true).Expect(4, true).Verify(
543 cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
544
545 EXPECT_EQ(send_response.message(), recv_response.message());
546 EXPECT_TRUE(recv_status.ok());
547 }
548
549 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreaming)550 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
551 ResetStub();
552
553 EchoRequest send_request;
554 EchoRequest recv_request;
555 EchoResponse send_response;
556 EchoResponse recv_response;
557 Status recv_status;
558 ClientContext cli_ctx;
559 ServerContext srv_ctx;
560 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
561
562 send_request.set_message(GetParam().message_content);
563 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
564 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
565
566 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
567 tag(2));
568
569 Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
570
571 cli_stream->Write(send_request, tag(3));
572 srv_stream.Read(&recv_request, tag(4));
573 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
574 EXPECT_EQ(send_request.message(), recv_request.message());
575
576 cli_stream->Write(send_request, tag(5));
577 srv_stream.Read(&recv_request, tag(6));
578 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
579
580 EXPECT_EQ(send_request.message(), recv_request.message());
581 cli_stream->WritesDone(tag(7));
582 srv_stream.Read(&recv_request, tag(8));
583 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
584
585 send_response.set_message(recv_request.message());
586 srv_stream.Finish(send_response, Status::OK, tag(9));
587 cli_stream->Finish(&recv_status, tag(10));
588 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
589
590 EXPECT_EQ(send_response.message(), recv_response.message());
591 EXPECT_TRUE(recv_status.ok());
592 }
593
594 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreamingWithCoalescingApi)595 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
596 ResetStub();
597
598 EchoRequest send_request;
599 EchoRequest recv_request;
600 EchoResponse send_response;
601 EchoResponse recv_response;
602 Status recv_status;
603 ClientContext cli_ctx;
604 ServerContext srv_ctx;
605 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
606
607 send_request.set_message(GetParam().message_content);
608 cli_ctx.set_initial_metadata_corked(true);
609 // tag:1 never comes up since no op is performed
610 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
611 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
612
613 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
614 tag(2));
615
616 cli_stream->Write(send_request, tag(3));
617
618 bool seen3 = false;
619
620 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
621
622 srv_stream.Read(&recv_request, tag(4));
623
624 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
625
626 EXPECT_EQ(send_request.message(), recv_request.message());
627
628 cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
629 srv_stream.Read(&recv_request, tag(6));
630 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
631 EXPECT_EQ(send_request.message(), recv_request.message());
632
633 srv_stream.Read(&recv_request, tag(7));
634 Verifier().Expect(7, false).Verify(cq_.get());
635
636 send_response.set_message(recv_request.message());
637 srv_stream.Finish(send_response, Status::OK, tag(8));
638 cli_stream->Finish(&recv_status, tag(9));
639 Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
640
641 EXPECT_EQ(send_response.message(), recv_response.message());
642 EXPECT_TRUE(recv_status.ok());
643 }
644
645 // One ping, two pongs.
TEST_P(AsyncEnd2endTest,SimpleServerStreaming)646 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
647 ResetStub();
648
649 EchoRequest send_request;
650 EchoRequest recv_request;
651 EchoResponse send_response;
652 EchoResponse recv_response;
653 Status recv_status;
654 ClientContext cli_ctx;
655 ServerContext srv_ctx;
656 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
657
658 send_request.set_message(GetParam().message_content);
659 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
660 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
661
662 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
663 cq_.get(), cq_.get(), tag(2));
664
665 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
666 EXPECT_EQ(send_request.message(), recv_request.message());
667
668 send_response.set_message(recv_request.message());
669 srv_stream.Write(send_response, tag(3));
670 cli_stream->Read(&recv_response, tag(4));
671 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
672 EXPECT_EQ(send_response.message(), recv_response.message());
673
674 srv_stream.Write(send_response, tag(5));
675 cli_stream->Read(&recv_response, tag(6));
676 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
677 EXPECT_EQ(send_response.message(), recv_response.message());
678
679 srv_stream.Finish(Status::OK, tag(7));
680 cli_stream->Read(&recv_response, tag(8));
681 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
682
683 cli_stream->Finish(&recv_status, tag(9));
684 Verifier().Expect(9, true).Verify(cq_.get());
685
686 EXPECT_TRUE(recv_status.ok());
687 }
688
689 // One ping, two pongs. Using WriteAndFinish API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWAF)690 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
691 ResetStub();
692
693 EchoRequest send_request;
694 EchoRequest recv_request;
695 EchoResponse send_response;
696 EchoResponse recv_response;
697 Status recv_status;
698 ClientContext cli_ctx;
699 ServerContext srv_ctx;
700 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
701
702 send_request.set_message(GetParam().message_content);
703 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
704 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
705
706 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
707 cq_.get(), cq_.get(), tag(2));
708
709 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
710 EXPECT_EQ(send_request.message(), recv_request.message());
711
712 send_response.set_message(recv_request.message());
713 srv_stream.Write(send_response, tag(3));
714 cli_stream->Read(&recv_response, tag(4));
715 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
716 EXPECT_EQ(send_response.message(), recv_response.message());
717
718 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
719 cli_stream->Read(&recv_response, tag(6));
720 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
721 EXPECT_EQ(send_response.message(), recv_response.message());
722
723 cli_stream->Read(&recv_response, tag(7));
724 Verifier().Expect(7, false).Verify(cq_.get());
725
726 cli_stream->Finish(&recv_status, tag(8));
727 Verifier().Expect(8, true).Verify(cq_.get());
728
729 EXPECT_TRUE(recv_status.ok());
730 }
731
732 // One ping, two pongs. Using WriteLast API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWL)733 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
734 ResetStub();
735
736 EchoRequest send_request;
737 EchoRequest recv_request;
738 EchoResponse send_response;
739 EchoResponse recv_response;
740 Status recv_status;
741 ClientContext cli_ctx;
742 ServerContext srv_ctx;
743 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
744
745 send_request.set_message(GetParam().message_content);
746 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
747 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
748
749 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
750 cq_.get(), cq_.get(), tag(2));
751
752 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
753 EXPECT_EQ(send_request.message(), recv_request.message());
754
755 send_response.set_message(recv_request.message());
756 srv_stream.Write(send_response, tag(3));
757 cli_stream->Read(&recv_response, tag(4));
758 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
759 EXPECT_EQ(send_response.message(), recv_response.message());
760
761 srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
762 cli_stream->Read(&recv_response, tag(6));
763 srv_stream.Finish(Status::OK, tag(7));
764 Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
765 EXPECT_EQ(send_response.message(), recv_response.message());
766
767 cli_stream->Read(&recv_response, tag(8));
768 Verifier().Expect(8, false).Verify(cq_.get());
769
770 cli_stream->Finish(&recv_status, tag(9));
771 Verifier().Expect(9, true).Verify(cq_.get());
772
773 EXPECT_TRUE(recv_status.ok());
774 }
775
776 // One ping, one pong.
TEST_P(AsyncEnd2endTest,SimpleBidiStreaming)777 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
778 ResetStub();
779
780 EchoRequest send_request;
781 EchoRequest recv_request;
782 EchoResponse send_response;
783 EchoResponse recv_response;
784 Status recv_status;
785 ClientContext cli_ctx;
786 ServerContext srv_ctx;
787 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
788
789 send_request.set_message(GetParam().message_content);
790 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
791 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
792
793 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
794 tag(2));
795
796 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
797
798 cli_stream->Write(send_request, tag(3));
799 srv_stream.Read(&recv_request, tag(4));
800 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
801 EXPECT_EQ(send_request.message(), recv_request.message());
802
803 send_response.set_message(recv_request.message());
804 srv_stream.Write(send_response, tag(5));
805 cli_stream->Read(&recv_response, tag(6));
806 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
807 EXPECT_EQ(send_response.message(), recv_response.message());
808
809 cli_stream->WritesDone(tag(7));
810 srv_stream.Read(&recv_request, tag(8));
811 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
812
813 srv_stream.Finish(Status::OK, tag(9));
814 cli_stream->Finish(&recv_status, tag(10));
815 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
816
817 EXPECT_TRUE(recv_status.ok());
818 }
819
820 // One ping, one pong. Using server:WriteAndFinish api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWAF)821 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
822 ResetStub();
823
824 EchoRequest send_request;
825 EchoRequest recv_request;
826 EchoResponse send_response;
827 EchoResponse recv_response;
828 Status recv_status;
829 ClientContext cli_ctx;
830 ServerContext srv_ctx;
831 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
832
833 send_request.set_message(GetParam().message_content);
834 cli_ctx.set_initial_metadata_corked(true);
835 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
836 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
837
838 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
839 tag(2));
840
841 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
842
843 bool seen3 = false;
844
845 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
846
847 srv_stream.Read(&recv_request, tag(4));
848
849 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
850 EXPECT_EQ(send_request.message(), recv_request.message());
851
852 srv_stream.Read(&recv_request, tag(5));
853 Verifier().Expect(5, false).Verify(cq_.get());
854
855 send_response.set_message(recv_request.message());
856 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
857 cli_stream->Read(&recv_response, tag(7));
858 Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
859 EXPECT_EQ(send_response.message(), recv_response.message());
860
861 cli_stream->Finish(&recv_status, tag(8));
862 Verifier().Expect(8, true).Verify(cq_.get());
863
864 EXPECT_TRUE(recv_status.ok());
865 }
866
867 // One ping, one pong. Using server:WriteLast api
TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
  // Same exchange as the WriteAndFinish variant, except that the server sends
  // its reply with WriteLast followed by a separate Finish.
  ResetStub();

  EchoRequest ping;
  EchoRequest ping_at_server;
  EchoResponse pong;
  EchoResponse pong_at_client;
  Status rpc_status;
  ClientContext client_ctx;
  ServerContext server_ctx;
  ServerAsyncReaderWriter<EchoResponse, EchoRequest> server_stream(
      &server_ctx);

  ping.set_message(GetParam().message_content);
  client_ctx.set_initial_metadata_corked(true);
  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
      client_stream(stub_->AsyncBidiStream(&client_ctx, cq_.get(), tag(1)));

  service_->RequestBidiStream(&server_ctx, &server_stream, cq_.get(),
                              cq_.get(), tag(2));

  client_stream->WriteLast(ping, WriteOptions(), tag(3));

  // Tag 3 is registered with ExpectMaybe in this round and with ExpectUnless
  // in the next one; both rounds share this flag.
  bool write_last_seen = false;
  {
    Verifier call_started;
    call_started.Expect(2, true);
    call_started.ExpectMaybe(3, true, &write_last_seen);
    call_started.Verify(cq_.get());
  }

  server_stream.Read(&ping_at_server, tag(4));
  {
    Verifier first_read;
    first_read.ExpectUnless(3, true, write_last_seen);
    first_read.Expect(4, true);
    first_read.Verify(cq_.get());
  }
  EXPECT_EQ(ping.message(), ping_at_server.message());

  // The second server read is expected to complete with ok == false.
  server_stream.Read(&ping_at_server, tag(5));
  Verifier().Expect(5, false).Verify(cq_.get());

  // Reply with WriteLast, then Finish; both are awaited together with the
  // client's read.
  pong.set_message(ping_at_server.message());
  server_stream.WriteLast(pong, WriteOptions(), tag(6));
  server_stream.Finish(Status::OK, tag(7));
  client_stream->Read(&pong_at_client, tag(8));
  {
    Verifier reply_round;
    reply_round.Expect(6, true);
    reply_round.Expect(7, true);
    reply_round.Expect(8, true);
    reply_round.Verify(cq_.get());
  }
  EXPECT_EQ(pong.message(), pong_at_client.message());

  client_stream->Finish(&rpc_status, tag(9));
  Verifier().Expect(9, true).Verify(cq_.get());

  EXPECT_TRUE(rpc_status.ok());
}
914
915 // Metadata tests
TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
  // Unary Echo in which the client attaches three metadata entries (one of
  // them with a "-bin" key) and the server checks that each one arrived with
  // the value that was sent.
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  const std::pair<std::string, std::string> meta1("key1", "val1");
  const std::pair<std::string, std::string> meta2("key2", "val2");
  const std::pair<std::string, std::string> meta3("g.r.d-bin", "xyz");
  cli_ctx.AddMetadata(meta1.first, meta1.second);
  cli_ctx.AddMetadata(meta2.first, meta2.second);
  cli_ctx.AddMetadata(meta3.first, meta3.second);

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->Finish(&recv_response, &recv_status, tag(4));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  const auto& client_initial_metadata = srv_ctx.client_metadata();
  for (const auto& sent : {meta1, meta2, meta3}) {
    // Look the key up first so that a missing entry is reported as a test
    // failure instead of dereferencing end(). The RPC is still driven to
    // completion below.
    const auto found = client_initial_metadata.find(sent.first);
    if (found == client_initial_metadata.end()) {
      ADD_FAILURE() << "client metadata key not received: " << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  // Three entries were added above, so at least three must be present. A
  // lower bound (rather than equality) is kept from the original check.
  EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(3));

  send_response.set_message(recv_request.message());
  response_writer.Finish(send_response, Status::OK, tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
}
961
TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
  // Unary Echo in which the server explicitly sends two initial-metadata
  // entries and the client checks them before the response is finished.
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  const std::pair<std::string, std::string> meta1("key1", "val1");
  const std::pair<std::string, std::string> meta2("key2", "val2");

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->ReadInitialMetadata(tag(4));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());
  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());

  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
  for (const auto& sent : {meta1, meta2}) {
    // Look the key up first so that a missing entry is reported as a test
    // failure instead of dereferencing end().
    const auto found = server_initial_metadata.find(sent.first);
    if (found == server_initial_metadata.end()) {
      ADD_FAILURE() << "server initial metadata key not received: "
                    << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());

  send_response.set_message(recv_request.message());
  response_writer.Finish(send_response, Status::OK, tag(5));
  response_reader->Finish(&recv_response, &recv_status, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
}
1006
1007 // 1 ping, 2 pongs.
TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
  // Server-streaming call in which the server explicitly sends two
  // initial-metadata entries (tag 10) before writing two responses.
  ResetStub();
  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;
  ClientContext cli_ctx;
  ServerContext srv_ctx;
  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);

  const std::pair<::std::string, ::std::string> meta1("key1", "val1");
  const std::pair<::std::string, ::std::string> meta2("key2", "val2");

  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
      stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
  cli_stream->ReadInitialMetadata(tag(11));
  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
                                  cq_.get(), cq_.get(), tag(2));

  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());

  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
  srv_stream.SendInitialMetadata(tag(10));
  Verifier().Expect(10, true).Expect(11, true).Verify(cq_.get());

  // Bind by reference: the metadata is only inspected, so no copy is needed.
  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
  for (const auto& sent : {meta1, meta2}) {
    // Look the key up first so that a missing entry is reported as a test
    // failure instead of dereferencing end().
    const auto found = server_initial_metadata.find(sent.first);
    if (found == server_initial_metadata.end()) {
      ADD_FAILURE() << "server initial metadata key not received: "
                    << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());

  srv_stream.Write(send_response, tag(3));

  cli_stream->Read(&recv_response, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());

  srv_stream.Write(send_response, tag(5));
  cli_stream->Read(&recv_response, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  // After the server finishes, the client's next read is expected to fail.
  srv_stream.Finish(Status::OK, tag(7));
  cli_stream->Read(&recv_response, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  cli_stream->Finish(&recv_status, tag(9));
  Verifier().Expect(9, true).Verify(cq_.get());

  EXPECT_TRUE(recv_status.ok());
}
1059
1060 // 1 ping, 2 pongs.
1061 // Test for server initial metadata being sent implicitly
TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
  // Server-streaming call in which the server adds initial metadata but never
  // calls SendInitialMetadata; the client inspects the metadata after the
  // first response has been read.
  ResetStub();
  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;
  ClientContext cli_ctx;
  ServerContext srv_ctx;
  ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  const std::pair<::std::string, ::std::string> meta1("key1", "val1");
  const std::pair<::std::string, ::std::string> meta2("key2", "val2");

  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
      stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
  service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
                                  cq_.get(), cq_.get(), tag(2));

  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
  send_response.set_message(recv_request.message());
  srv_stream.Write(send_response, tag(3));

  cli_stream->Read(&recv_response, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  EXPECT_EQ(send_response.message(), recv_response.message());

  // Bind by reference: the metadata is only inspected, so no copy is needed.
  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
  for (const auto& sent : {meta1, meta2}) {
    // Look the key up first so that a missing entry is reported as a test
    // failure instead of dereferencing end().
    const auto found = server_initial_metadata.find(sent.first);
    if (found == server_initial_metadata.end()) {
      ADD_FAILURE() << "server initial metadata key not received: "
                    << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());

  srv_stream.Write(send_response, tag(5));
  cli_stream->Read(&recv_response, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  // After the server finishes, the client's next read is expected to fail.
  srv_stream.Finish(Status::OK, tag(7));
  cli_stream->Read(&recv_response, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  cli_stream->Finish(&recv_status, tag(9));
  Verifier().Expect(9, true).Verify(cq_.get());

  EXPECT_TRUE(recv_status.ok());
}
1114
TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
  // Unary Echo in which the server attaches two trailing-metadata entries and
  // the client checks them once the call has finished.
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  const std::pair<std::string, std::string> meta1("key1", "val1");
  const std::pair<std::string, std::string> meta2("key2", "val2");

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->Finish(&recv_response, &recv_status, tag(5));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Verify(cq_.get());

  send_response.set_message(recv_request.message());
  srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
  srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
  response_writer.Finish(send_response, Status::OK, tag(4));

  Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
  const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
  for (const auto& sent : {meta1, meta2}) {
    // Look the key up first so that a missing entry is reported as a test
    // failure instead of dereferencing end().
    const auto found = server_trailing_metadata.find(sent.first);
    if (found == server_trailing_metadata.end()) {
      ADD_FAILURE() << "server trailing metadata key not received: "
                    << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
}
1159
TEST_P(AsyncEnd2endTest, MetadataRpc) {
  // Unary Echo that exercises all three metadata directions, each with one
  // text entry and one "-bin" entry: client initial metadata, server initial
  // metadata and server trailing metadata.
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);

  // Entries are named after the direction they travel in. The binary values
  // are built with an explicit length.
  const std::pair<std::string, std::string> client_text_md("key1", "val1");
  const std::pair<std::string, std::string> client_bin_md(
      "key2-bin",
      std::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
  const std::pair<std::string, std::string> initial_text_md("key3", "val3");
  const std::pair<std::string, std::string> initial_bin_md(
      "key6-bin",
      std::string(
          "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
  const std::pair<std::string, std::string> trailing_text_md("key5", "val5");
  const std::pair<std::string, std::string> trailing_bin_md(
      "key4-bin",
      std::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
                  14));

  cli_ctx.AddMetadata(client_text_md.first, client_text_md.second);
  cli_ctx.AddMetadata(client_bin_md.first, client_bin_md.second);

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->ReadInitialMetadata(tag(4));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  // In each check below the key is looked up first, so that a missing entry
  // is reported as a test failure instead of dereferencing end().
  const auto& client_initial_metadata = srv_ctx.client_metadata();
  for (const auto& sent : {client_text_md, client_bin_md}) {
    const auto found = client_initial_metadata.find(sent.first);
    if (found == client_initial_metadata.end()) {
      ADD_FAILURE() << "client metadata key not received: " << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));

  srv_ctx.AddInitialMetadata(initial_text_md.first, initial_text_md.second);
  srv_ctx.AddInitialMetadata(initial_bin_md.first, initial_bin_md.second);
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
  for (const auto& sent : {initial_text_md, initial_bin_md}) {
    const auto found = server_initial_metadata.find(sent.first);
    if (found == server_initial_metadata.end()) {
      ADD_FAILURE() << "server initial metadata key not received: "
                    << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));

  send_response.set_message(recv_request.message());
  srv_ctx.AddTrailingMetadata(trailing_text_md.first, trailing_text_md.second);
  srv_ctx.AddTrailingMetadata(trailing_bin_md.first, trailing_bin_md.second);
  response_writer.Finish(send_response, Status::OK, tag(5));
  response_reader->Finish(&recv_response, &recv_status, tag(6));

  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
  const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
  for (const auto& sent : {trailing_text_md, trailing_bin_md}) {
    const auto found = server_trailing_metadata.find(sent.first);
    if (found == server_trailing_metadata.end()) {
      ADD_FAILURE() << "server trailing metadata key not received: "
                    << sent.first;
      continue;
    }
    EXPECT_EQ(sent.second, ToString(found->second));
  }
  EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
}
1235
1236 // Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
  // The server registers AsyncNotifyWhenDone (tag 5); the client cancels the
  // call after the server has received the request. The test then expects the
  // done tag, IsCancelled() on the server and CANCELLED on the client.
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  send_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->Finish(&recv_response, &recv_status, tag(4));

  // The done notification is requested before the call itself is requested.
  srv_ctx.AsyncNotifyWhenDone(tag(5));
  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));

  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  // The server never writes a response in this test; the client's Finish
  // (tag 4) completes because of the cancellation.
  cli_ctx.TryCancel();
  Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
  EXPECT_TRUE(srv_ctx.IsCancelled());

  EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
1268
1269 // Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_P(AsyncEnd2endTest, ServerCheckDone) {
  // The server registers AsyncNotifyWhenDone (tag 5) and then completes the
  // call normally; the done tag must arrive and IsCancelled() must be false.
  ResetStub();

  EchoRequest request_out;
  EchoRequest request_in;
  EchoResponse reply_out;
  EchoResponse reply_in;
  Status rpc_status;

  ClientContext client_ctx;
  ServerContext server_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> responder(&server_ctx);

  request_out.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> reader(
      stub_->AsyncEcho(&client_ctx, request_out, cq_.get()));
  reader->Finish(&reply_in, &rpc_status, tag(4));

  // The done notification is requested before the call itself is requested.
  server_ctx.AsyncNotifyWhenDone(tag(5));
  service_->RequestEcho(&server_ctx, &request_in, &responder, cq_.get(),
                        cq_.get(), tag(2));

  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(request_out.message(), request_in.message());

  reply_out.set_message(request_in.message());
  responder.Finish(reply_out, Status::OK, tag(3));
  {
    // Server finish, client finish and the done notification are awaited
    // together.
    Verifier completion;
    completion.Expect(3, true);
    completion.Expect(4, true);
    completion.Expect(5, true);
    completion.Verify(cq_.get());
  }
  EXPECT_FALSE(server_ctx.IsCancelled());

  EXPECT_EQ(reply_out.message(), reply_in.message());
  EXPECT_TRUE(rpc_status.ok());
}
1303
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
  // Calls a method of UnimplementedEchoService through a stub built on a
  // freshly created channel and expects UNIMPLEMENTED with an empty message.
  ChannelArguments channel_args;
  const auto& creds = GetCredentialsProvider()->GetChannelCredentials(
      GetParam().credentials_type, &channel_args);

  // In-process parameterizations use the server's in-process channel; all
  // others connect to the server address.
  std::shared_ptr<Channel> channel;
  if (GetParam().inproc) {
    channel = server_->InProcessChannel(channel_args);
  } else {
    channel = ::grpc::CreateCustomChannel(server_address_.str(), creds,
                                          channel_args);
  }
  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub>
      unimplemented_stub(
          grpc::testing::UnimplementedEchoService::NewStub(channel));

  EchoRequest request_out;
  EchoResponse reply_in;
  Status rpc_status;

  ClientContext client_ctx;
  request_out.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> reader(
      unimplemented_stub->AsyncUnimplemented(&client_ctx, request_out,
                                             cq_.get()));

  reader->Finish(&reply_in, &rpc_status, tag(4));
  Verifier().Expect(4, true).Verify(cq_.get());

  EXPECT_EQ(StatusCode::UNIMPLEMENTED, rpc_status.error_code());
  EXPECT_EQ("", rpc_status.error_message());
}
1329
1330 // This class is for testing scenarios where RPCs are cancelled on the server
1331 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1332 // API to check for cancellation
1333 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1334 protected:
// Point in an RPC at which the server-side helpers of this fixture call
// ServerContext::TryCancel(). Kept as an unscoped enum so the enumerators
// remain usable without qualification.
enum ServerTryCancelRequestPhase {
  DO_NOT_CANCEL = 0,
  // Cancel before the server reads or writes any message.
  CANCEL_BEFORE_PROCESSING,
  // Cancel from a parallel thread while messages are being processed.
  CANCEL_DURING_PROCESSING,
  // Cancel after all messages were processed, before a status is sent.
  CANCEL_AFTER_PROCESSING
};
1341
1342 // Helper for testing client-streaming RPCs which are cancelled on the server.
1343 // Depending on the value of server_try_cancel parameter, this will test one
1344 // of the following three scenarios:
1345 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1346 // any messages from the client
1347 //
1348 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1349 // messages from the client
1350 //
// CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1352 // messages from the client (but before sending any status back to the
1353 // client)
TestClientStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1354 void TestClientStreamingServerCancel(
1355 ServerTryCancelRequestPhase server_try_cancel) {
1356 ResetStub();
1357
1358 EchoRequest recv_request;
1359 EchoResponse send_response;
1360 EchoResponse recv_response;
1361 Status recv_status;
1362
1363 ClientContext cli_ctx;
1364 ServerContext srv_ctx;
1365 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1366
1367 // Initiate the 'RequestStream' call on client
1368 CompletionQueue cli_cq;
1369
1370 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1371 stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1372
1373 // On the server, request to be notified of 'RequestStream' calls
1374 // and receive the 'RequestStream' call just made by the client
1375 srv_ctx.AsyncNotifyWhenDone(tag(11));
1376 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1377 tag(2));
1378 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1379 Verifier().Expect(2, true).Verify(cq_.get());
1380 t1.join();
1381
1382 bool expected_server_cq_result = true;
1383 bool expected_client_cq_result = true;
1384
1385 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1386 srv_ctx.TryCancel();
1387 Verifier().Expect(11, true).Verify(cq_.get());
1388 EXPECT_TRUE(srv_ctx.IsCancelled());
1389
1390 // Since cancellation is done before server reads any results, we know
1391 // for sure that all server cq results will return false from this
1392 // point forward
1393 expected_server_cq_result = false;
1394 expected_client_cq_result = false;
1395 }
1396
1397 bool ignore_client_cq_result =
1398 (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1399 (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1400
1401 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1402 &ignore_client_cq_result] {
1403 EchoRequest send_request;
1404 // Client sends 3 messages (tags 3, 4 and 5)
1405 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1406 send_request.set_message("Ping " + std::to_string(tag_idx));
1407 cli_stream->Write(send_request, tag(tag_idx));
1408 Verifier()
1409 .Expect(tag_idx, expected_client_cq_result)
1410 .Verify(&cli_cq, ignore_client_cq_result);
1411 }
1412 cli_stream->WritesDone(tag(6));
1413 // Ignore ok on WritesDone since cancel can affect it
1414 Verifier()
1415 .Expect(6, expected_client_cq_result)
1416 .Verify(&cli_cq, ignore_client_cq_result);
1417 });
1418
1419 bool ignore_cq_result = false;
1420 bool want_done_tag = false;
1421 std::thread* server_try_cancel_thd = nullptr;
1422
1423 auto verif = Verifier();
1424
1425 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1426 server_try_cancel_thd =
1427 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1428 // Server will cancel the RPC in a parallel thread while reading the
1429 // requests from the client. Since the cancellation can happen at anytime,
1430 // some of the cq results (i.e those until cancellation) might be true but
1431 // its non deterministic. So better to ignore the cq results
1432 ignore_cq_result = true;
1433 // Expect that we might possibly see the done tag that
1434 // indicates cancellation completion in this case
1435 want_done_tag = true;
1436 verif.Expect(11, true);
1437 }
1438
1439 // Server reads 3 messages (tags 6, 7 and 8)
1440 // But if want_done_tag is true, we might also see tag 11
1441 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1442 srv_stream.Read(&recv_request, tag(tag_idx));
1443 // Note that we'll add something to the verifier and verify that
1444 // something was seen, but it might be tag 11 and not what we
1445 // just added
1446 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1447 .Next(cq_.get(), ignore_cq_result);
1448 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1449 if (got_tag == 11) {
1450 EXPECT_TRUE(srv_ctx.IsCancelled());
1451 want_done_tag = false;
1452 // Now get the other entry that we were waiting on
1453 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1454 }
1455 }
1456
1457 cli_thread.join();
1458
1459 if (server_try_cancel_thd != nullptr) {
1460 server_try_cancel_thd->join();
1461 delete server_try_cancel_thd;
1462 }
1463
1464 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1465 srv_ctx.TryCancel();
1466 want_done_tag = true;
1467 verif.Expect(11, true);
1468 }
1469
1470 if (want_done_tag) {
1471 verif.Verify(cq_.get());
1472 EXPECT_TRUE(srv_ctx.IsCancelled());
1473 want_done_tag = false;
1474 }
1475
1476 // The RPC has been cancelled at this point for sure (i.e irrespective of
1477 // the value of `server_try_cancel` is). So, from this point forward, we
1478 // know that cq results are supposed to return false on server.
1479
1480 // Server sends the final message and cancelled status (but the RPC is
1481 // already cancelled at this point. So we expect the operation to fail)
1482 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1483 Verifier().Expect(9, false).Verify(cq_.get());
1484
1485 // Client will see the cancellation
1486 cli_stream->Finish(&recv_status, tag(10));
1487 Verifier().Expect(10, true).Verify(&cli_cq);
1488 EXPECT_FALSE(recv_status.ok());
1489 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1490
1491 cli_cq.Shutdown();
1492 void* dummy_tag;
1493 bool dummy_ok;
1494 while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1495 }
1496 }
1497
1498 // Helper for testing server-streaming RPCs which are cancelled on the server.
1499 // Depending on the value of server_try_cancel parameter, this will test one
1500 // of the following three scenarios:
1501 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1502 // any messages to the client
1503 //
1504 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1505 // messages to the client
1506 //
// CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after sending all
1508 // messages to the client (but before sending any status back to the
1509 // client)
TestServerStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1510 void TestServerStreamingServerCancel(
1511 ServerTryCancelRequestPhase server_try_cancel) {
1512 ResetStub();
1513
1514 EchoRequest send_request;
1515 EchoRequest recv_request;
1516 EchoResponse send_response;
1517 Status recv_status;
1518 ClientContext cli_ctx;
1519 ServerContext srv_ctx;
1520 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1521
1522 send_request.set_message("Ping");
1523 // Initiate the 'ResponseStream' call on the client
1524 CompletionQueue cli_cq;
1525 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1526 stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1527 // On the server, request to be notified of 'ResponseStream' calls and
1528 // receive the call just made by the client
1529 srv_ctx.AsyncNotifyWhenDone(tag(11));
1530 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1531 cq_.get(), cq_.get(), tag(2));
1532
1533 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1534 Verifier().Expect(2, true).Verify(cq_.get());
1535 t1.join();
1536
1537 EXPECT_EQ(send_request.message(), recv_request.message());
1538
1539 bool expected_cq_result = true;
1540 bool ignore_cq_result = false;
1541 bool want_done_tag = false;
1542 bool expected_client_cq_result = true;
1543 bool ignore_client_cq_result =
1544 (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1545
1546 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1547 srv_ctx.TryCancel();
1548 Verifier().Expect(11, true).Verify(cq_.get());
1549 EXPECT_TRUE(srv_ctx.IsCancelled());
1550
1551 // We know for sure that all cq results will be false from this point
1552 // since the server cancelled the RPC
1553 expected_cq_result = false;
1554 expected_client_cq_result = false;
1555 }
1556
1557 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1558 &ignore_client_cq_result] {
1559 // Client attempts to read the three messages from the server
1560 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1561 EchoResponse recv_response;
1562 cli_stream->Read(&recv_response, tag(tag_idx));
1563 Verifier()
1564 .Expect(tag_idx, expected_client_cq_result)
1565 .Verify(&cli_cq, ignore_client_cq_result);
1566 }
1567 });
1568
1569 std::thread* server_try_cancel_thd = nullptr;
1570
1571 auto verif = Verifier();
1572
1573 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1574 server_try_cancel_thd =
1575 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1576
1577 // Server will cancel the RPC in a parallel thread while writing responses
1578 // to the client. Since the cancellation can happen at anytime, some of
1579 // the cq results (i.e those until cancellation) might be true but it is
1580 // non deterministic. So better to ignore the cq results
1581 ignore_cq_result = true;
1582 // Expect that we might possibly see the done tag that
1583 // indicates cancellation completion in this case
1584 want_done_tag = true;
1585 verif.Expect(11, true);
1586 }
1587
1588 // Server sends three messages (tags 3, 4 and 5)
1589 // But if want_done tag is true, we might also see tag 11
1590 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1591 send_response.set_message("Pong " + std::to_string(tag_idx));
1592 srv_stream.Write(send_response, tag(tag_idx));
1593 // Note that we'll add something to the verifier and verify that
1594 // something was seen, but it might be tag 11 and not what we
1595 // just added
1596 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1597 .Next(cq_.get(), ignore_cq_result);
1598 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1599 if (got_tag == 11) {
1600 EXPECT_TRUE(srv_ctx.IsCancelled());
1601 want_done_tag = false;
1602 // Now get the other entry that we were waiting on
1603 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1604 }
1605 }
1606
1607 if (server_try_cancel_thd != nullptr) {
1608 server_try_cancel_thd->join();
1609 delete server_try_cancel_thd;
1610 }
1611
1612 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1613 srv_ctx.TryCancel();
1614 want_done_tag = true;
1615 verif.Expect(11, true);
1616 }
1617
1618 if (want_done_tag) {
1619 verif.Verify(cq_.get());
1620 EXPECT_TRUE(srv_ctx.IsCancelled());
1621 want_done_tag = false;
1622 }
1623
1624 cli_thread.join();
1625
1626 // The RPC has been cancelled at this point for sure (i.e irrespective of
1627 // the value of `server_try_cancel` is). So, from this point forward, we
1628 // know that cq results are supposed to return false on server.
1629
1630 // Server finishes the stream (but the RPC is already cancelled)
1631 srv_stream.Finish(Status::CANCELLED, tag(9));
1632 Verifier().Expect(9, false).Verify(cq_.get());
1633
1634 // Client will see the cancellation
1635 cli_stream->Finish(&recv_status, tag(10));
1636 Verifier().Expect(10, true).Verify(&cli_cq);
1637 EXPECT_FALSE(recv_status.ok());
1638 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1639
1640 cli_cq.Shutdown();
1641 void* dummy_tag;
1642 bool dummy_ok;
1643 while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1644 }
1645 }
1646
// Helper for testing bidirectional-streaming RPCs which are cancelled on the
1648 // server.
1649 //
1650 // Depending on the value of server_try_cancel parameter, this will
1651 // test one of the following three scenarios:
1652 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1653 // writing any messages from/to the client
1654 //
1655 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1656 // messages from the client
1657 //
// CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1659 // messages from the client (but before sending any status back to the
1660 // client)
TestBidiStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1661 void TestBidiStreamingServerCancel(
1662 ServerTryCancelRequestPhase server_try_cancel) {
1663 ResetStub();
1664
1665 EchoRequest send_request;
1666 EchoRequest recv_request;
1667 EchoResponse send_response;
1668 EchoResponse recv_response;
1669 Status recv_status;
1670 ClientContext cli_ctx;
1671 ServerContext srv_ctx;
1672 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1673
1674 // Initiate the call from the client side
1675 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1676 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1677
1678 // On the server, request to be notified of the 'BidiStream' call and
1679 // receive the call just made by the client
1680 srv_ctx.AsyncNotifyWhenDone(tag(11));
1681 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1682 tag(2));
1683 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1684
1685 auto verif = Verifier();
1686
1687 // Client sends the first and the only message
1688 send_request.set_message("Ping");
1689 cli_stream->Write(send_request, tag(3));
1690 verif.Expect(3, true);
1691
1692 bool expected_cq_result = true;
1693 bool ignore_cq_result = false;
1694 bool want_done_tag = false;
1695
1696 int got_tag, got_tag2;
1697 bool tag_3_done = false;
1698
1699 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1700 srv_ctx.TryCancel();
1701 verif.Expect(11, true);
1702 // We know for sure that all server cq results will be false from
1703 // this point since the server cancelled the RPC. However, we can't
1704 // say for sure about the client
1705 expected_cq_result = false;
1706 ignore_cq_result = true;
1707
1708 do {
1709 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1710 GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1711 if (got_tag == 3) {
1712 tag_3_done = true;
1713 }
1714 } while (got_tag != 11);
1715 EXPECT_TRUE(srv_ctx.IsCancelled());
1716 }
1717
1718 std::thread* server_try_cancel_thd = nullptr;
1719
1720 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1721 server_try_cancel_thd =
1722 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1723
1724 // Since server is going to cancel the RPC in a parallel thread, some of
1725 // the cq results (i.e those until the cancellation) might be true. Since
1726 // that number is non-deterministic, it is better to ignore the cq results
1727 ignore_cq_result = true;
1728 // Expect that we might possibly see the done tag that
1729 // indicates cancellation completion in this case
1730 want_done_tag = true;
1731 verif.Expect(11, true);
1732 }
1733
1734 srv_stream.Read(&recv_request, tag(4));
1735 verif.Expect(4, expected_cq_result);
1736 got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1737 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1738 GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1739 (got_tag == 11 && want_done_tag));
1740 GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1741 (got_tag2 == 11 && want_done_tag));
1742 // If we get 3 and 4, we don't need to wait for 11, but if
1743 // we get 11, we should also clear 3 and 4
1744 if (got_tag + got_tag2 != 7) {
1745 EXPECT_TRUE(srv_ctx.IsCancelled());
1746 want_done_tag = false;
1747 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1748 GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1749 }
1750
1751 send_response.set_message("Pong");
1752 srv_stream.Write(send_response, tag(5));
1753 verif.Expect(5, expected_cq_result);
1754
1755 cli_stream->Read(&recv_response, tag(6));
1756 verif.Expect(6, expected_cq_result);
1757 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1758 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1759 GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1760 (got_tag == 11 && want_done_tag));
1761 GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1762 (got_tag2 == 11 && want_done_tag));
1763 // If we get 5 and 6, we don't need to wait for 11, but if
1764 // we get 11, we should also clear 5 and 6
1765 if (got_tag + got_tag2 != 11) {
1766 EXPECT_TRUE(srv_ctx.IsCancelled());
1767 want_done_tag = false;
1768 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1769 GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1770 }
1771
1772 // This is expected to succeed in all cases
1773 cli_stream->WritesDone(tag(7));
1774 verif.Expect(7, true);
1775 // TODO(vjpai): Consider whether the following is too flexible
1776 // or whether it should just be reset to ignore_cq_result
1777 bool ignore_cq_wd_result =
1778 ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1779 got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1780 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1781 if (got_tag == 11) {
1782 EXPECT_TRUE(srv_ctx.IsCancelled());
1783 want_done_tag = false;
1784 // Now get the other entry that we were waiting on
1785 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1786 }
1787
1788 // This is expected to fail in all cases i.e for all values of
1789 // server_try_cancel. This is because at this point, either there are no
1790 // more msgs from the client (because client called WritesDone) or the RPC
1791 // is cancelled on the server
1792 srv_stream.Read(&recv_request, tag(8));
1793 verif.Expect(8, false);
1794 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1795 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1796 if (got_tag == 11) {
1797 EXPECT_TRUE(srv_ctx.IsCancelled());
1798 want_done_tag = false;
1799 // Now get the other entry that we were waiting on
1800 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1801 }
1802
1803 if (server_try_cancel_thd != nullptr) {
1804 server_try_cancel_thd->join();
1805 delete server_try_cancel_thd;
1806 }
1807
1808 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1809 srv_ctx.TryCancel();
1810 want_done_tag = true;
1811 verif.Expect(11, true);
1812 }
1813
1814 if (want_done_tag) {
1815 verif.Verify(cq_.get());
1816 EXPECT_TRUE(srv_ctx.IsCancelled());
1817 want_done_tag = false;
1818 }
1819
1820 // The RPC has been cancelled at this point for sure (i.e irrespective of
1821 // the value of `server_try_cancel` is). So, from this point forward, we
1822 // know that cq results are supposed to return false on server.
1823
1824 srv_stream.Finish(Status::CANCELLED, tag(9));
1825 Verifier().Expect(9, false).Verify(cq_.get());
1826
1827 cli_stream->Finish(&recv_status, tag(10));
1828 Verifier().Expect(10, true).Verify(cq_.get());
1829 EXPECT_FALSE(recv_status.ok());
1830 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1831 }
1832 };
1833
// Runs the client-streaming server-cancel helper with
// CANCEL_BEFORE_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1837
// Runs the client-streaming server-cancel helper with
// CANCEL_DURING_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1841
// Runs the client-streaming server-cancel helper with
// CANCEL_AFTER_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1845
// Runs the server-streaming server-cancel helper with
// CANCEL_BEFORE_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1849
// Runs the server-streaming server-cancel helper with
// CANCEL_DURING_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1853
// Runs the server-streaming server-cancel helper with
// CANCEL_AFTER_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1857
// Runs the bidi-streaming server-cancel helper with
// CANCEL_BEFORE_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1861
// Runs the bidi-streaming server-cancel helper with
// CANCEL_DURING_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1865
// Runs the bidi-streaming server-cancel helper with
// CANCEL_AFTER_PROCESSING.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1869
CreateTestScenarios(bool,bool test_message_size_limit)1870 std::vector<TestScenario> CreateTestScenarios(bool /*test_secure*/,
1871 bool test_message_size_limit) {
1872 std::vector<TestScenario> scenarios;
1873 std::vector<std::string> credentials_types;
1874 std::vector<std::string> messages;
1875
1876 auto insec_ok = [] {
1877 // Only allow insecure credentials type when it is registered with the
1878 // provider. User may create providers that do not have insecure.
1879 return GetCredentialsProvider()->GetChannelCredentials(
1880 kInsecureCredentialsType, nullptr) != nullptr;
1881 };
1882
1883 if (insec_ok()) {
1884 credentials_types.push_back(kInsecureCredentialsType);
1885 }
1886 auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1887 for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1888 credentials_types.push_back(*sec);
1889 }
1890 GPR_ASSERT(!credentials_types.empty());
1891
1892 messages.push_back("Hello");
1893 if (test_message_size_limit) {
1894 for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1895 k *= 32) {
1896 std::string big_msg;
1897 for (size_t i = 0; i < k * 1024; ++i) {
1898 char c = 'a' + (i % 26);
1899 big_msg += c;
1900 }
1901 messages.push_back(big_msg);
1902 }
1903 #ifndef MEMORY_SANITIZER
1904 // 4MB message processing with SSL is very slow under msan
1905 // (causes timeouts) and doesn't really increase the signal from tests.
1906 // Reserve 100 bytes for other fields of the message proto.
1907 messages.push_back(
1908 std::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 100, 'a'));
1909 #endif
1910 }
1911
1912 // TODO (sreek) Renable tests with health check service after the issue
1913 // https://github.com/grpc/grpc/issues/11223 is resolved
1914 for (auto health_check_service : {false}) {
1915 for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1916 for (auto cred = credentials_types.begin();
1917 cred != credentials_types.end(); ++cred) {
1918 scenarios.emplace_back(false, *cred, health_check_service, *msg);
1919 }
1920 if (insec_ok()) {
1921 scenarios.emplace_back(true, kInsecureCredentialsType,
1922 health_check_service, *msg);
1923 }
1924 }
1925 }
1926 return scenarios;
1927 }
1928
1929 INSTANTIATE_TEST_SUITE_P(AsyncEnd2end, AsyncEnd2endTest,
1930 ::testing::ValuesIn(CreateTestScenarios(true, true)));
1931 INSTANTIATE_TEST_SUITE_P(AsyncEnd2endServerTryCancel,
1932 AsyncEnd2endServerTryCancelTest,
1933 ::testing::ValuesIn(CreateTestScenarios(false,
1934 false)));
1935
1936 } // namespace
1937 } // namespace testing
1938 } // namespace grpc
1939
main(int argc,char ** argv)1940 int main(int argc, char** argv) {
1941 // Change the backup poll interval from 5s to 100ms to speed up the
1942 // ReconnectChannel test
1943 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 100);
1944 grpc::testing::TestEnvironment env(argc, argv);
1945 ::testing::InitGoogleTest(&argc, argv);
1946 int ret = RUN_ALL_TESTS();
1947 return ret;
1948 }
1949