1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpcpp/channel.h>
20 #include <grpcpp/client_context.h>
21 #include <grpcpp/create_channel.h>
22 #include <grpcpp/generic/generic_stub.h>
23 #include <grpcpp/impl/proto_utils.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/server_interceptor.h>
28 #include <gtest/gtest.h>
29
30 #include <memory>
31 #include <vector>
32
33 #include "absl/memory/memory.h"
34 #include "absl/strings/match.h"
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/test_util/port.h"
37 #include "test/core/test_util/test_config.h"
38 #include "test/cpp/end2end/interceptors_util.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41
42 namespace grpc {
43 namespace testing {
44 namespace {
45
46 class LoggingInterceptor : public experimental::Interceptor {
47 public:
LoggingInterceptor(experimental::ServerRpcInfo * info)48 explicit LoggingInterceptor(experimental::ServerRpcInfo* info) {
49 // Check the method name and compare to the type
50 const char* method = info->method();
51 experimental::ServerRpcInfo::Type type = info->type();
52
53 // Check that we use one of our standard methods with expected type.
54 // Also allow the health checking service.
55 // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService
56 // being tested (the GenericRpc test).
57 // The empty method is for the Unimplemented requests that arise
58 // when draining the CQ.
59 EXPECT_TRUE(
60 strstr(method, "/grpc.health") == method ||
61 (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 &&
62 (type == experimental::ServerRpcInfo::Type::UNARY ||
63 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) ||
64 (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 &&
65 type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) ||
66 (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 &&
67 type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) ||
68 (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 &&
69 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) ||
70 strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 ||
71 (strcmp(method, "") == 0 &&
72 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING));
73 }
74
Intercept(experimental::InterceptorBatchMethods * methods)75 void Intercept(experimental::InterceptorBatchMethods* methods) override {
76 if (methods->QueryInterceptionHookPoint(
77 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
78 auto* map = methods->GetSendInitialMetadata();
79 // Got nothing better to do here for now
80 EXPECT_EQ(map->size(), 0);
81 }
82 if (methods->QueryInterceptionHookPoint(
83 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
84 EchoRequest req;
85 auto* buffer = methods->GetSerializedSendMessage();
86 auto copied_buffer = *buffer;
87 EXPECT_TRUE(
88 SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
89 .ok());
90 EXPECT_TRUE(req.message().find("Hello") == 0);
91 }
92 if (methods->QueryInterceptionHookPoint(
93 experimental::InterceptionHookPoints::PRE_SEND_STATUS)) {
94 auto* map = methods->GetSendTrailingMetadata();
95 bool found = false;
96 // Check that we received the metadata as an echo
97 for (const auto& pair : *map) {
98 found = absl::StartsWith(pair.first, "testkey") &&
99 absl::StartsWith(pair.second, "testvalue");
100 if (found) break;
101 }
102 EXPECT_EQ(found, true);
103 auto status = methods->GetSendStatus();
104 EXPECT_EQ(status.ok(), true);
105 }
106 if (methods->QueryInterceptionHookPoint(
107 experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
108 auto* map = methods->GetRecvInitialMetadata();
109 bool found = false;
110 // Check that we received the metadata as an echo
111 for (const auto& pair : *map) {
112 found = pair.first.starts_with("testkey") &&
113 pair.second.starts_with("testvalue");
114 if (found) break;
115 }
116 EXPECT_EQ(found, true);
117 }
118 if (methods->QueryInterceptionHookPoint(
119 experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
120 EchoResponse* resp =
121 static_cast<EchoResponse*>(methods->GetRecvMessage());
122 if (resp != nullptr) {
123 EXPECT_TRUE(resp->message().find("Hello") == 0);
124 }
125 }
126 if (methods->QueryInterceptionHookPoint(
127 experimental::InterceptionHookPoints::POST_RECV_CLOSE)) {
128 // Got nothing interesting to do here
129 }
130 methods->Proceed();
131 }
132 };
133
134 class LoggingInterceptorFactory
135 : public experimental::ServerInterceptorFactoryInterface {
136 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)137 experimental::Interceptor* CreateServerInterceptor(
138 experimental::ServerRpcInfo* info) override {
139 return new LoggingInterceptor(info);
140 }
141 };
142
143 // Test if SendMessage function family works as expected for sync/callback apis
144 class SyncSendMessageTester : public experimental::Interceptor {
145 public:
SyncSendMessageTester(experimental::ServerRpcInfo *)146 explicit SyncSendMessageTester(experimental::ServerRpcInfo* /*info*/) {}
147
Intercept(experimental::InterceptorBatchMethods * methods)148 void Intercept(experimental::InterceptorBatchMethods* methods) override {
149 if (methods->QueryInterceptionHookPoint(
150 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
151 string old_msg =
152 static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
153 EXPECT_EQ(old_msg.find("Hello"), 0u);
154 new_msg_.set_message("World" + old_msg);
155 methods->ModifySendMessage(&new_msg_);
156 }
157 methods->Proceed();
158 }
159
160 private:
161 EchoRequest new_msg_;
162 };
163
164 class SyncSendMessageTesterFactory
165 : public experimental::ServerInterceptorFactoryInterface {
166 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)167 experimental::Interceptor* CreateServerInterceptor(
168 experimental::ServerRpcInfo* info) override {
169 return new SyncSendMessageTester(info);
170 }
171 };
172
173 // Test if SendMessage function family works as expected for sync/callback apis
174 class SyncSendMessageVerifier : public experimental::Interceptor {
175 public:
SyncSendMessageVerifier(experimental::ServerRpcInfo *)176 explicit SyncSendMessageVerifier(experimental::ServerRpcInfo* /*info*/) {}
177
Intercept(experimental::InterceptorBatchMethods * methods)178 void Intercept(experimental::InterceptorBatchMethods* methods) override {
179 if (methods->QueryInterceptionHookPoint(
180 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
181 // Make sure that the changes made in SyncSendMessageTester persisted
182 string old_msg =
183 static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
184 EXPECT_EQ(old_msg.find("World"), 0u);
185
186 // Remove the "World" part of the string that we added earlier
187 new_msg_.set_message(old_msg.erase(0, 5));
188 methods->ModifySendMessage(&new_msg_);
189
190 // LoggingInterceptor verifies that changes got reverted
191 }
192 methods->Proceed();
193 }
194
195 private:
196 EchoRequest new_msg_;
197 };
198
199 class SyncSendMessageVerifierFactory
200 : public experimental::ServerInterceptorFactoryInterface {
201 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)202 experimental::Interceptor* CreateServerInterceptor(
203 experimental::ServerRpcInfo* info) override {
204 return new SyncSendMessageVerifier(info);
205 }
206 };
207
MakeBidiStreamingCall(const std::shared_ptr<Channel> & channel)208 void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
209 auto stub = grpc::testing::EchoTestService::NewStub(channel);
210 ClientContext ctx;
211 EchoRequest req;
212 EchoResponse resp;
213 ctx.AddMetadata("testkey", "testvalue");
214 auto stream = stub->BidiStream(&ctx);
215 for (auto i = 0; i < 10; i++) {
216 req.set_message("Hello" + std::to_string(i));
217 stream->Write(req);
218 stream->Read(&resp);
219 EXPECT_EQ(req.message(), resp.message());
220 }
221 ASSERT_TRUE(stream->WritesDone());
222 Status s = stream->Finish();
223 EXPECT_EQ(s.ok(), true);
224 }
225
226 class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
227 protected:
ServerInterceptorsEnd2endSyncUnaryTest()228 ServerInterceptorsEnd2endSyncUnaryTest() {
229 int port = grpc_pick_unused_port_or_die();
230
231 ServerBuilder builder;
232 server_address_ = "localhost:" + std::to_string(port);
233 builder.AddListeningPort(server_address_, InsecureServerCredentials());
234 builder.RegisterService(&service_);
235
236 std::vector<
237 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
238 creators;
239 creators.push_back(
240 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
241 new SyncSendMessageTesterFactory()));
242 creators.push_back(
243 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
244 new SyncSendMessageVerifierFactory()));
245 creators.push_back(
246 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
247 new LoggingInterceptorFactory()));
248 // Add 20 phony interceptor factories and null interceptor factories
249 for (auto i = 0; i < 20; i++) {
250 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
251 creators.push_back(std::make_unique<NullInterceptorFactory>());
252 }
253 builder.experimental().SetInterceptorCreators(std::move(creators));
254 server_ = builder.BuildAndStart();
255 }
256 std::string server_address_;
257 TestServiceImpl service_;
258 std::unique_ptr<Server> server_;
259 };
260
TEST_F(ServerInterceptorsEnd2endSyncUnaryTest,UnaryTest)261 TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
262 ChannelArguments args;
263 PhonyInterceptor::Reset();
264 auto channel =
265 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
266 MakeCall(channel);
267 // Make sure all 20 phony interceptors were run
268 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
269 }
270
271 class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test {
272 protected:
ServerInterceptorsEnd2endSyncStreamingTest()273 ServerInterceptorsEnd2endSyncStreamingTest() {
274 int port = grpc_pick_unused_port_or_die();
275
276 ServerBuilder builder;
277 server_address_ = "localhost:" + std::to_string(port);
278 builder.AddListeningPort(server_address_, InsecureServerCredentials());
279 builder.RegisterService(&service_);
280
281 std::vector<
282 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
283 creators;
284 creators.push_back(
285 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
286 new SyncSendMessageTesterFactory()));
287 creators.push_back(
288 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
289 new SyncSendMessageVerifierFactory()));
290 creators.push_back(
291 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
292 new LoggingInterceptorFactory()));
293 for (auto i = 0; i < 20; i++) {
294 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
295 }
296 builder.experimental().SetInterceptorCreators(std::move(creators));
297 server_ = builder.BuildAndStart();
298 }
299 std::string server_address_;
300 EchoTestServiceStreamingImpl service_;
301 std::unique_ptr<Server> server_;
302 };
303
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ClientStreamingTest)304 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) {
305 ChannelArguments args;
306 PhonyInterceptor::Reset();
307 auto channel =
308 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
309 MakeClientStreamingCall(channel);
310 // Make sure all 20 phony interceptors were run
311 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
312 }
313
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ServerStreamingTest)314 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) {
315 ChannelArguments args;
316 PhonyInterceptor::Reset();
317 auto channel =
318 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
319 MakeServerStreamingCall(channel);
320 // Make sure all 20 phony interceptors were run
321 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
322 }
323
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,BidiStreamingTest)324 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) {
325 ChannelArguments args;
326 PhonyInterceptor::Reset();
327 auto channel =
328 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
329 MakeBidiStreamingCall(channel);
330 // Make sure all 20 phony interceptors were run
331 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
332 }
333
334 class ServerInterceptorsAsyncEnd2endTest : public ::testing::Test {};
335
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnaryTest)336 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnaryTest) {
337 PhonyInterceptor::Reset();
338 int port = grpc_pick_unused_port_or_die();
339 string server_address = "localhost:" + std::to_string(port);
340 ServerBuilder builder;
341 EchoTestService::AsyncService service;
342 builder.AddListeningPort(server_address, InsecureServerCredentials());
343 builder.RegisterService(&service);
344 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
345 creators;
346 creators.push_back(
347 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
348 new LoggingInterceptorFactory()));
349 for (auto i = 0; i < 20; i++) {
350 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
351 }
352 builder.experimental().SetInterceptorCreators(std::move(creators));
353 auto cq = builder.AddCompletionQueue();
354 auto server = builder.BuildAndStart();
355
356 ChannelArguments args;
357 auto channel =
358 grpc::CreateChannel(server_address, InsecureChannelCredentials());
359 auto stub = grpc::testing::EchoTestService::NewStub(channel);
360
361 EchoRequest send_request;
362 EchoRequest recv_request;
363 EchoResponse send_response;
364 EchoResponse recv_response;
365 Status recv_status;
366
367 ClientContext cli_ctx;
368 ServerContext srv_ctx;
369 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
370
371 send_request.set_message("Hello");
372 cli_ctx.AddMetadata("testkey", "testvalue");
373 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
374 stub->AsyncEcho(&cli_ctx, send_request, cq.get()));
375
376 service.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq.get(),
377 cq.get(), tag(2));
378
379 response_reader->Finish(&recv_response, &recv_status, tag(4));
380
381 Verifier().Expect(2, true).Verify(cq.get());
382 EXPECT_EQ(send_request.message(), recv_request.message());
383
384 EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
385 srv_ctx.AddTrailingMetadata("testkey", "testvalue");
386
387 send_response.set_message(recv_request.message());
388 response_writer.Finish(send_response, Status::OK, tag(3));
389 Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
390
391 EXPECT_EQ(send_response.message(), recv_response.message());
392 EXPECT_TRUE(recv_status.ok());
393 EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
394 "testvalue"));
395
396 // Make sure all 20 phony interceptors were run
397 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
398
399 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
400 cq->Shutdown();
401 void* ignored_tag;
402 bool ignored_ok;
403 while (cq->Next(&ignored_tag, &ignored_ok)) {
404 }
405 grpc_recycle_unused_port(port);
406 }
407
TEST_F(ServerInterceptorsAsyncEnd2endTest,BidiStreamingTest)408 TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
409 PhonyInterceptor::Reset();
410 int port = grpc_pick_unused_port_or_die();
411 string server_address = "localhost:" + std::to_string(port);
412 ServerBuilder builder;
413 EchoTestService::AsyncService service;
414 builder.AddListeningPort(server_address, InsecureServerCredentials());
415 builder.RegisterService(&service);
416 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
417 creators;
418 creators.push_back(
419 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
420 new LoggingInterceptorFactory()));
421 for (auto i = 0; i < 20; i++) {
422 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
423 }
424 builder.experimental().SetInterceptorCreators(std::move(creators));
425 auto cq = builder.AddCompletionQueue();
426 auto server = builder.BuildAndStart();
427
428 ChannelArguments args;
429 auto channel =
430 grpc::CreateChannel(server_address, InsecureChannelCredentials());
431 auto stub = grpc::testing::EchoTestService::NewStub(channel);
432
433 EchoRequest send_request;
434 EchoRequest recv_request;
435 EchoResponse send_response;
436 EchoResponse recv_response;
437 Status recv_status;
438
439 ClientContext cli_ctx;
440 ServerContext srv_ctx;
441 grpc::ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
442
443 send_request.set_message("Hello");
444 cli_ctx.AddMetadata("testkey", "testvalue");
445 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
446 cli_stream(stub->AsyncBidiStream(&cli_ctx, cq.get(), tag(1)));
447
448 service.RequestBidiStream(&srv_ctx, &srv_stream, cq.get(), cq.get(), tag(2));
449
450 Verifier().Expect(1, true).Expect(2, true).Verify(cq.get());
451
452 EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
453 srv_ctx.AddTrailingMetadata("testkey", "testvalue");
454
455 cli_stream->Write(send_request, tag(3));
456 srv_stream.Read(&recv_request, tag(4));
457 Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
458 EXPECT_EQ(send_request.message(), recv_request.message());
459
460 send_response.set_message(recv_request.message());
461 srv_stream.Write(send_response, tag(5));
462 cli_stream->Read(&recv_response, tag(6));
463 Verifier().Expect(5, true).Expect(6, true).Verify(cq.get());
464 EXPECT_EQ(send_response.message(), recv_response.message());
465
466 cli_stream->WritesDone(tag(7));
467 srv_stream.Read(&recv_request, tag(8));
468 Verifier().Expect(7, true).Expect(8, false).Verify(cq.get());
469
470 srv_stream.Finish(Status::OK, tag(9));
471 cli_stream->Finish(&recv_status, tag(10));
472 Verifier().Expect(9, true).Expect(10, true).Verify(cq.get());
473
474 EXPECT_TRUE(recv_status.ok());
475 EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
476 "testvalue"));
477
478 // Make sure all 20 phony interceptors were run
479 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
480
481 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
482 cq->Shutdown();
483 void* ignored_tag;
484 bool ignored_ok;
485 while (cq->Next(&ignored_tag, &ignored_ok)) {
486 }
487 grpc_recycle_unused_port(port);
488 }
489
TEST_F(ServerInterceptorsAsyncEnd2endTest,GenericRPCTest)490 TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
491 PhonyInterceptor::Reset();
492 int port = grpc_pick_unused_port_or_die();
493 string server_address = "localhost:" + std::to_string(port);
494 ServerBuilder builder;
495 AsyncGenericService service;
496 builder.AddListeningPort(server_address, InsecureServerCredentials());
497 builder.RegisterAsyncGenericService(&service);
498 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
499 creators;
500 creators.reserve(20);
501 for (auto i = 0; i < 20; i++) {
502 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
503 }
504 builder.experimental().SetInterceptorCreators(std::move(creators));
505 auto srv_cq = builder.AddCompletionQueue();
506 CompletionQueue cli_cq;
507 auto server = builder.BuildAndStart();
508
509 ChannelArguments args;
510 auto channel =
511 grpc::CreateChannel(server_address, InsecureChannelCredentials());
512 GenericStub generic_stub(channel);
513
514 const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
515 EchoRequest send_request;
516 EchoRequest recv_request;
517 EchoResponse send_response;
518 EchoResponse recv_response;
519 Status recv_status;
520
521 ClientContext cli_ctx;
522 GenericServerContext srv_ctx;
523 GenericServerAsyncReaderWriter stream(&srv_ctx);
524
525 // The string needs to be long enough to test heap-based slice.
526 send_request.set_message("Hello");
527 cli_ctx.AddMetadata("testkey", "testvalue");
528
529 CompletionQueue* cq = srv_cq.get();
530 std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); });
531 std::unique_ptr<GenericClientAsyncReaderWriter> call =
532 generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq);
533 call->StartCall(tag(1));
534 Verifier().Expect(1, true).Verify(&cli_cq);
535 std::unique_ptr<ByteBuffer> send_buffer =
536 SerializeToByteBuffer(&send_request);
537 call->Write(*send_buffer, tag(2));
538 // Send ByteBuffer can be destroyed after calling Write.
539 send_buffer.reset();
540 Verifier().Expect(2, true).Verify(&cli_cq);
541 call->WritesDone(tag(3));
542 Verifier().Expect(3, true).Verify(&cli_cq);
543
544 service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4));
545
546 request_call.join();
547 EXPECT_EQ(kMethodName, srv_ctx.method());
548 EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
549 srv_ctx.AddTrailingMetadata("testkey", "testvalue");
550
551 ByteBuffer recv_buffer;
552 stream.Read(&recv_buffer, tag(5));
553 Verifier().Expect(5, true).Verify(srv_cq.get());
554 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
555 EXPECT_EQ(send_request.message(), recv_request.message());
556
557 send_response.set_message(recv_request.message());
558 send_buffer = SerializeToByteBuffer(&send_response);
559 stream.Write(*send_buffer, tag(6));
560 send_buffer.reset();
561 Verifier().Expect(6, true).Verify(srv_cq.get());
562
563 stream.Finish(Status::OK, tag(7));
564 // Shutdown srv_cq before we try to get the tag back, to verify that the
565 // interception API handles completion queue shutdowns that take place before
566 // all the tags are returned
567 srv_cq->Shutdown();
568 Verifier().Expect(7, true).Verify(srv_cq.get());
569
570 recv_buffer.Clear();
571 call->Read(&recv_buffer, tag(8));
572 Verifier().Expect(8, true).Verify(&cli_cq);
573 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
574
575 call->Finish(&recv_status, tag(9));
576 cli_cq.Shutdown();
577 Verifier().Expect(9, true).Verify(&cli_cq);
578
579 EXPECT_EQ(send_response.message(), recv_response.message());
580 EXPECT_TRUE(recv_status.ok());
581 EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
582 "testvalue"));
583
584 // Make sure all 20 phony interceptors were run
585 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
586
587 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
588 void* ignored_tag;
589 bool ignored_ok;
590 while (cli_cq.Next(&ignored_tag, &ignored_ok)) {
591 }
592 while (srv_cq->Next(&ignored_tag, &ignored_ok)) {
593 }
594 grpc_recycle_unused_port(port);
595 }
596
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnimplementedRpcTest)597 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
598 PhonyInterceptor::Reset();
599 int port = grpc_pick_unused_port_or_die();
600 string server_address = "localhost:" + std::to_string(port);
601 ServerBuilder builder;
602 builder.AddListeningPort(server_address, InsecureServerCredentials());
603 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
604 creators;
605 creators.reserve(20);
606 for (auto i = 0; i < 20; i++) {
607 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
608 }
609 builder.experimental().SetInterceptorCreators(std::move(creators));
610 auto cq = builder.AddCompletionQueue();
611 auto server = builder.BuildAndStart();
612
613 ChannelArguments args;
614 std::shared_ptr<Channel> channel =
615 grpc::CreateChannel(server_address, InsecureChannelCredentials());
616 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
617 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
618 EchoRequest send_request;
619 EchoResponse recv_response;
620 Status recv_status;
621
622 ClientContext cli_ctx;
623 send_request.set_message("Hello");
624 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
625 stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));
626
627 response_reader->Finish(&recv_response, &recv_status, tag(4));
628 Verifier().Expect(4, true).Verify(cq.get());
629
630 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
631 EXPECT_EQ("", recv_status.error_message());
632
633 // Make sure all 20 phony interceptors were run
634 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
635
636 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
637 cq->Shutdown();
638 void* ignored_tag;
639 bool ignored_ok;
640 while (cq->Next(&ignored_tag, &ignored_ok)) {
641 }
642 grpc_recycle_unused_port(port);
643 }
644
645 class ServerInterceptorsSyncUnimplementedEnd2endTest : public ::testing::Test {
646 };
647
TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest,UnimplementedRpcTest)648 TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) {
649 PhonyInterceptor::Reset();
650 int port = grpc_pick_unused_port_or_die();
651 string server_address = "localhost:" + std::to_string(port);
652 ServerBuilder builder;
653 TestServiceImpl service;
654 builder.RegisterService(&service);
655 builder.AddListeningPort(server_address, InsecureServerCredentials());
656 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
657 creators;
658 creators.reserve(20);
659 for (auto i = 0; i < 20; i++) {
660 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
661 }
662 builder.experimental().SetInterceptorCreators(std::move(creators));
663 auto server = builder.BuildAndStart();
664
665 ChannelArguments args;
666 std::shared_ptr<Channel> channel =
667 grpc::CreateChannel(server_address, InsecureChannelCredentials());
668 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
669 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
670 EchoRequest send_request;
671 EchoResponse recv_response;
672
673 ClientContext cli_ctx;
674 send_request.set_message("Hello");
675 Status recv_status =
676 stub->Unimplemented(&cli_ctx, send_request, &recv_response);
677
678 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
679 EXPECT_EQ("", recv_status.error_message());
680
681 // Make sure all 20 phony interceptors were run
682 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
683
684 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
685 grpc_recycle_unused_port(port);
686 }
687
688 } // namespace
689 } // namespace testing
690 } // namespace grpc
691
main(int argc,char ** argv)692 int main(int argc, char** argv) {
693 grpc::testing::TestEnvironment env(&argc, argv);
694 ::testing::InitGoogleTest(&argc, argv);
695 return RUN_ALL_TESTS();
696 }
697