1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/async_generic_service.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30
31 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
32 #include "src/proto/grpc/testing/echo.grpc.pb.h"
33 #include "test/core/util/port.h"
34 #include "test/core/util/test_config.h"
35 #include "test/cpp/end2end/test_service_impl.h"
36 #include "test/cpp/util/byte_buffer_proto_helper.h"
37
38 #include <gtest/gtest.h>
39
40 namespace grpc {
41 namespace testing {
42
43 namespace {
44
// Turns a small integer into an opaque completion-queue tag. The value is
// widened to intptr_t first so the integer-to-pointer conversion happens at
// pointer width, and a named cast is used instead of a C-style cast.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
46
VerifyReturnSuccess(CompletionQueue * cq,int i)47 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
48 void* got_tag;
49 bool ok;
50 EXPECT_TRUE(cq->Next(&got_tag, &ok));
51 EXPECT_EQ(tag(i), got_tag);
52 return ok;
53 }
54
Verify(CompletionQueue * cq,int i,bool expect_ok)55 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
56 EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
57 }
58
59 // Handlers to handle async request at a server. To be run in a separate thread.
60 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)61 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
62 ServerContext srv_ctx;
63 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
64 EchoRequest recv_request;
65 EchoResponse send_response;
66 service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
67 tag(1));
68 Verify(cq, 1, true);
69 send_response.set_message(recv_request.message());
70 if (dup_service) {
71 send_response.mutable_message()->append("_dup");
72 }
73 response_writer.Finish(send_response, Status::OK, tag(2));
74 Verify(cq, 2, true);
75 }
76
77 // Handlers to handle raw request at a server. To be run in a
78 // separate thread. Note that this is the same as the async version, except
79 // that the req/resp are ByteBuffers
80 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)81 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
82 bool dup_service) {
83 ServerContext srv_ctx;
84 GenericServerAsyncResponseWriter response_writer(&srv_ctx);
85 ByteBuffer recv_buffer;
86 service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
87 tag(1));
88 Verify(cq, 1, true);
89 EchoRequest recv_request;
90 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
91 EchoResponse send_response;
92 send_response.set_message(recv_request.message());
93 auto send_buffer = SerializeToByteBuffer(&send_response);
94 response_writer.Finish(*send_buffer, Status::OK, tag(2));
95 Verify(cq, 2, true);
96 }
97
98 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)99 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
100 ServerContext srv_ctx;
101 EchoRequest recv_request;
102 EchoResponse send_response;
103 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
104 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
105 Verify(cq, 1, true);
106 int i = 1;
107 do {
108 i++;
109 send_response.mutable_message()->append(recv_request.message());
110 srv_stream.Read(&recv_request, tag(i));
111 } while (VerifyReturnSuccess(cq, i));
112 srv_stream.Finish(send_response, Status::OK, tag(100));
113 Verify(cq, 100, true);
114 }
115
116 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)117 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
118 ServerContext srv_ctx;
119 ByteBuffer recv_buffer;
120 EchoRequest recv_request;
121 EchoResponse send_response;
122 GenericServerAsyncReader srv_stream(&srv_ctx);
123 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
124 Verify(cq, 1, true);
125 int i = 1;
126 while (true) {
127 i++;
128 srv_stream.Read(&recv_buffer, tag(i));
129 if (!VerifyReturnSuccess(cq, i)) {
130 break;
131 }
132 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
133 send_response.mutable_message()->append(recv_request.message());
134 }
135 auto send_buffer = SerializeToByteBuffer(&send_response);
136 srv_stream.Finish(*send_buffer, Status::OK, tag(100));
137 Verify(cq, 100, true);
138 }
139
140 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)141 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
142 ServerContext srv_ctx;
143 EchoRequest recv_request;
144 EchoResponse send_response;
145 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
146 service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
147 tag(1));
148 Verify(cq, 1, true);
149 send_response.set_message(recv_request.message() + "0");
150 srv_stream.Write(send_response, tag(2));
151 Verify(cq, 2, true);
152 send_response.set_message(recv_request.message() + "1");
153 srv_stream.Write(send_response, tag(3));
154 Verify(cq, 3, true);
155 send_response.set_message(recv_request.message() + "2");
156 srv_stream.Write(send_response, tag(4));
157 Verify(cq, 4, true);
158 srv_stream.Finish(Status::OK, tag(5));
159 Verify(cq, 5, true);
160 }
161
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)162 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
163 CompletionQueue* cq) {
164 ByteBuffer recv_buffer;
165 stream->Read(&recv_buffer, tag(2));
166 Verify(cq, 2, true);
167 EchoRequest recv_request;
168 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
169 EchoResponse send_response;
170 send_response.set_message(recv_request.message());
171 auto send_buffer = SerializeToByteBuffer(&send_response);
172 stream->Write(*send_buffer, tag(3));
173 Verify(cq, 3, true);
174 stream->Finish(Status::OK, tag(4));
175 Verify(cq, 4, true);
176 }
177
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)178 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
179 CompletionQueue* cq) {
180 ByteBuffer recv_buffer;
181 EchoRequest recv_request;
182 EchoResponse send_response;
183 int i = 1;
184 while (true) {
185 i++;
186 stream->Read(&recv_buffer, tag(i));
187 if (!VerifyReturnSuccess(cq, i)) {
188 break;
189 }
190 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
191 send_response.mutable_message()->append(recv_request.message());
192 }
193 auto send_buffer = SerializeToByteBuffer(&send_response);
194 stream->Write(*send_buffer, tag(99));
195 Verify(cq, 99, true);
196 stream->Finish(Status::OK, tag(100));
197 Verify(cq, 100, true);
198 }
199
200 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)201 void HandleGenericCall(AsyncGenericService* service,
202 ServerCompletionQueue* cq) {
203 GenericServerContext srv_ctx;
204 GenericServerAsyncReaderWriter stream(&srv_ctx);
205 service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
206 Verify(cq, 1, true);
207 if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
208 HandleGenericEcho(&stream, cq);
209 } else if (srv_ctx.method() ==
210 "/grpc.testing.EchoTestService/RequestStream") {
211 HandleGenericRequestStream(&stream, cq);
212 } else { // other methods not handled yet.
213 gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
214 GPR_ASSERT(0);
215 }
216 }
217
218 class TestServiceImplDupPkg
219 : public ::grpc::testing::duplicate::EchoTestService::Service {
220 public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)221 Status Echo(ServerContext* context, const EchoRequest* request,
222 EchoResponse* response) override {
223 response->set_message(request->message() + "_dup");
224 return Status::OK;
225 }
226 };
227
228 class HybridEnd2endTest : public ::testing::Test {
229 protected:
HybridEnd2endTest()230 HybridEnd2endTest() {}
231
SetUpServer(::grpc::Service * service1,::grpc::Service * service2,AsyncGenericService * generic_service,int max_message_size=0)232 void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2,
233 AsyncGenericService* generic_service,
234 int max_message_size = 0) {
235 int port = grpc_pick_unused_port_or_die();
236 server_address_ << "localhost:" << port;
237
238 // Setup server
239 ServerBuilder builder;
240 builder.AddListeningPort(server_address_.str(),
241 grpc::InsecureServerCredentials());
242 // Always add a sync unimplemented service: we rely on having at least one
243 // synchronous method to get a listening cq
244 builder.RegisterService(&unimplemented_service_);
245 builder.RegisterService(service1);
246 if (service2) {
247 builder.RegisterService(service2);
248 }
249 if (generic_service) {
250 builder.RegisterAsyncGenericService(generic_service);
251 }
252
253 if (max_message_size != 0) {
254 builder.SetMaxMessageSize(max_message_size);
255 }
256
257 // Create a separate cq for each potential handler.
258 for (int i = 0; i < 5; i++) {
259 cqs_.push_back(builder.AddCompletionQueue(false));
260 }
261 server_ = builder.BuildAndStart();
262 }
263
TearDown()264 void TearDown() override {
265 if (server_) {
266 server_->Shutdown();
267 }
268 void* ignored_tag;
269 bool ignored_ok;
270 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
271 (*it)->Shutdown();
272 while ((*it)->Next(&ignored_tag, &ignored_ok))
273 ;
274 }
275 }
276
ResetStub()277 void ResetStub() {
278 std::shared_ptr<Channel> channel =
279 CreateChannel(server_address_.str(), InsecureChannelCredentials());
280 stub_ = grpc::testing::EchoTestService::NewStub(channel);
281 }
282
283 // Test all rpc methods.
TestAllMethods()284 void TestAllMethods() {
285 SendEcho();
286 SendSimpleClientStreaming();
287 SendSimpleServerStreaming();
288 SendBidiStreaming();
289 }
290
SendEcho()291 void SendEcho() {
292 EchoRequest send_request;
293 EchoResponse recv_response;
294 ClientContext cli_ctx;
295 cli_ctx.set_wait_for_ready(true);
296 send_request.set_message("Hello");
297 Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
298 EXPECT_EQ(send_request.message(), recv_response.message());
299 EXPECT_TRUE(recv_status.ok());
300 }
301
SendEchoToDupService()302 void SendEchoToDupService() {
303 std::shared_ptr<Channel> channel =
304 CreateChannel(server_address_.str(), InsecureChannelCredentials());
305 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
306 EchoRequest send_request;
307 EchoResponse recv_response;
308 ClientContext cli_ctx;
309 cli_ctx.set_wait_for_ready(true);
310 send_request.set_message("Hello");
311 Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
312 EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
313 EXPECT_TRUE(recv_status.ok());
314 }
315
SendSimpleClientStreaming()316 void SendSimpleClientStreaming() {
317 EchoRequest send_request;
318 EchoResponse recv_response;
319 grpc::string expected_message;
320 ClientContext cli_ctx;
321 cli_ctx.set_wait_for_ready(true);
322 send_request.set_message("Hello");
323 auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
324 for (int i = 0; i < 5; i++) {
325 EXPECT_TRUE(stream->Write(send_request));
326 expected_message.append(send_request.message());
327 }
328 stream->WritesDone();
329 Status recv_status = stream->Finish();
330 EXPECT_EQ(expected_message, recv_response.message());
331 EXPECT_TRUE(recv_status.ok());
332 }
333
SendSimpleServerStreaming()334 void SendSimpleServerStreaming() {
335 EchoRequest request;
336 EchoResponse response;
337 ClientContext context;
338 context.set_wait_for_ready(true);
339 request.set_message("hello");
340
341 auto stream = stub_->ResponseStream(&context, request);
342 EXPECT_TRUE(stream->Read(&response));
343 EXPECT_EQ(response.message(), request.message() + "0");
344 EXPECT_TRUE(stream->Read(&response));
345 EXPECT_EQ(response.message(), request.message() + "1");
346 EXPECT_TRUE(stream->Read(&response));
347 EXPECT_EQ(response.message(), request.message() + "2");
348 EXPECT_FALSE(stream->Read(&response));
349
350 Status s = stream->Finish();
351 EXPECT_TRUE(s.ok());
352 }
353
SendSimpleServerStreamingToDupService()354 void SendSimpleServerStreamingToDupService() {
355 std::shared_ptr<Channel> channel =
356 CreateChannel(server_address_.str(), InsecureChannelCredentials());
357 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
358 EchoRequest request;
359 EchoResponse response;
360 ClientContext context;
361 context.set_wait_for_ready(true);
362 request.set_message("hello");
363
364 auto stream = stub->ResponseStream(&context, request);
365 EXPECT_TRUE(stream->Read(&response));
366 EXPECT_EQ(response.message(), request.message() + "0_dup");
367 EXPECT_TRUE(stream->Read(&response));
368 EXPECT_EQ(response.message(), request.message() + "1_dup");
369 EXPECT_TRUE(stream->Read(&response));
370 EXPECT_EQ(response.message(), request.message() + "2_dup");
371 EXPECT_FALSE(stream->Read(&response));
372
373 Status s = stream->Finish();
374 EXPECT_TRUE(s.ok());
375 }
376
SendBidiStreaming()377 void SendBidiStreaming() {
378 EchoRequest request;
379 EchoResponse response;
380 ClientContext context;
381 context.set_wait_for_ready(true);
382 grpc::string msg("hello");
383
384 auto stream = stub_->BidiStream(&context);
385
386 request.set_message(msg + "0");
387 EXPECT_TRUE(stream->Write(request));
388 EXPECT_TRUE(stream->Read(&response));
389 EXPECT_EQ(response.message(), request.message());
390
391 request.set_message(msg + "1");
392 EXPECT_TRUE(stream->Write(request));
393 EXPECT_TRUE(stream->Read(&response));
394 EXPECT_EQ(response.message(), request.message());
395
396 request.set_message(msg + "2");
397 EXPECT_TRUE(stream->Write(request));
398 EXPECT_TRUE(stream->Read(&response));
399 EXPECT_EQ(response.message(), request.message());
400
401 stream->WritesDone();
402 EXPECT_FALSE(stream->Read(&response));
403 EXPECT_FALSE(stream->Read(&response));
404
405 Status s = stream->Finish();
406 EXPECT_TRUE(s.ok());
407 }
408
409 grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
410 std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
411 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
412 std::unique_ptr<Server> server_;
413 std::ostringstream server_address_;
414 };
415
// Echo is handled through the async API on cqs_[0] while the client exercises
// every RPC.
TEST_F(HybridEnd2endTest, AsyncEcho) {
  using SType = EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);

  TestAllMethods();
  echo_worker.join();
}
426
// Echo is handled through the raw (ByteBuffer) API on cqs_[0] while the client
// exercises every RPC.
TEST_F(HybridEnd2endTest, RawEcho) {
  using SType = EchoTestService::WithRawMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleRawEcho<SType>, &service, echo_cq, false);

  TestAllMethods();
  echo_worker.join();
}
437
// RequestStream is handled through the raw (ByteBuffer) API on cqs_[0] while
// the client exercises every RPC.
TEST_F(HybridEnd2endTest, RawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const request_stream_cq = cqs_[0].get();
  std::thread request_stream_worker(HandleRawClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  request_stream_worker.join();
}
448
// Async Echo on cqs_[0] combined with raw RequestStream on cqs_[1].
TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleRawClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  request_stream_worker.join();
  echo_worker.join();
}
464
// Generic Echo on cqs_[0] combined with raw RequestStream on cqs_[1].
TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleRawClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  generic_worker.join();
  request_stream_worker.join();
}
481
// Async Echo on cqs_[0] combined with async RequestStream on cqs_[1].
TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  echo_worker.join();
  request_stream_worker.join();
}
497
// Async ResponseStream on cqs_[0] combined with async RequestStream on
// cqs_[1].
TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const response_stream_cq = cqs_[0].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  response_stream_worker.join();
  request_stream_worker.join();
}
513
514 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncDupService)515 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
516 typedef EchoTestService::WithAsyncMethod_RequestStream<
517 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
518 SType;
519 SType service;
520 TestServiceImplDupPkg dup_service;
521 SetUpServer(&service, &dup_service, nullptr);
522 ResetStub();
523 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
524 &service, cqs_[0].get());
525 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
526 &service, cqs_[1].get());
527 TestAllMethods();
528 SendEchoToDupService();
529 response_stream_handler_thread.join();
530 request_stream_handler_thread.join();
531 }
532
533 // Add a second service with one sync streamed unary method.
534 class StreamedUnaryDupPkg
535 : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
536 TestServiceImplDupPkg> {
537 public:
StreamedEcho(ServerContext * context,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)538 Status StreamedEcho(
539 ServerContext* context,
540 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
541 EchoRequest req;
542 EchoResponse resp;
543 uint32_t next_msg_sz;
544 stream->NextMessageSize(&next_msg_sz);
545 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
546 GPR_ASSERT(stream->Read(&req));
547 resp.set_message(req.message() + "_dup");
548 GPR_ASSERT(stream->Write(resp));
549 return Status::OK;
550 }
551 };
552
// Async request/response streaming on the main service, plus a duplicate
// service whose Echo is a sync streamed-unary method. The server is built with
// a max message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  StreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_stream_cq = cqs_[0].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
571
572 // Add a second service that is fully Streamed Unary
573 class FullyStreamedUnaryDupPkg
574 : public duplicate::EchoTestService::StreamedUnaryService {
575 public:
StreamedEcho(ServerContext * context,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)576 Status StreamedEcho(
577 ServerContext* context,
578 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
579 EchoRequest req;
580 EchoResponse resp;
581 uint32_t next_msg_sz;
582 stream->NextMessageSize(&next_msg_sz);
583 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
584 GPR_ASSERT(stream->Read(&req));
585 resp.set_message(req.message() + "_dup");
586 GPR_ASSERT(stream->Write(resp));
587 return Status::OK;
588 }
589 };
590
// Async request/response streaming on the main service, plus a duplicate
// service that is fully streamed-unary. The server is built with a max message
// size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullyStreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_stream_cq = cqs_[0].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
609
610 // Add a second service with one sync split server streaming method.
611 class SplitResponseStreamDupPkg
612 : public duplicate::EchoTestService::
613 WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
614 public:
StreamedResponseStream(ServerContext * context,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)615 Status StreamedResponseStream(
616 ServerContext* context,
617 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
618 EchoRequest req;
619 EchoResponse resp;
620 uint32_t next_msg_sz;
621 stream->NextMessageSize(&next_msg_sz);
622 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
623 GPR_ASSERT(stream->Read(&req));
624 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
625 resp.set_message(req.message() + grpc::to_string(i) + "_dup");
626 GPR_ASSERT(stream->Write(resp));
627 }
628 return Status::OK;
629 }
630 };
631
// Async request/response streaming on the main service, plus a duplicate
// service whose ResponseStream is a sync split-streaming method. The server is
// built with a max message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  SplitResponseStreamDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_stream_cq = cqs_[0].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
650
651 // Add a second service that is fully split server streamed
652 class FullySplitStreamedDupPkg
653 : public duplicate::EchoTestService::SplitStreamedService {
654 public:
StreamedResponseStream(ServerContext * context,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)655 Status StreamedResponseStream(
656 ServerContext* context,
657 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
658 EchoRequest req;
659 EchoResponse resp;
660 uint32_t next_msg_sz;
661 stream->NextMessageSize(&next_msg_sz);
662 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
663 GPR_ASSERT(stream->Read(&req));
664 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
665 resp.set_message(req.message() + grpc::to_string(i) + "_dup");
666 GPR_ASSERT(stream->Write(resp));
667 }
668 return Status::OK;
669 }
670 };
671
// Async request/response streaming on the main service, plus a duplicate
// service that is fully split-streamed. The server is built with a max message
// size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_FullySplitStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullySplitStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_stream_cq = cqs_[0].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
690
691 // Add a second service that is fully server streamed
692 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
693 public:
StreamedEcho(ServerContext * context,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)694 Status StreamedEcho(
695 ServerContext* context,
696 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
697 EchoRequest req;
698 EchoResponse resp;
699 uint32_t next_msg_sz;
700 stream->NextMessageSize(&next_msg_sz);
701 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
702 GPR_ASSERT(stream->Read(&req));
703 resp.set_message(req.message() + "_dup");
704 GPR_ASSERT(stream->Write(resp));
705 return Status::OK;
706 }
StreamedResponseStream(ServerContext * context,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)707 Status StreamedResponseStream(
708 ServerContext* context,
709 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
710 EchoRequest req;
711 EchoResponse resp;
712 uint32_t next_msg_sz;
713 stream->NextMessageSize(&next_msg_sz);
714 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
715 GPR_ASSERT(stream->Read(&req));
716 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
717 resp.set_message(req.message() + grpc::to_string(i) + "_dup");
718 GPR_ASSERT(stream->Write(resp));
719 }
720 return Status::OK;
721 }
722 };
723
// Async request/response streaming on the main service, plus a duplicate
// service that is fully streamed; both its Echo and ResponseStream are
// exercised. The server is built with a max message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_FullyStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullyStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_stream_cq = cqs_[0].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  SendEchoToDupService();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
743
744 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_AsyncDupService)745 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
746 typedef EchoTestService::WithAsyncMethod_RequestStream<
747 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
748 SType;
749 SType service;
750 duplicate::EchoTestService::AsyncService dup_service;
751 SetUpServer(&service, &dup_service, nullptr);
752 ResetStub();
753 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
754 &service, cqs_[0].get());
755 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
756 &service, cqs_[1].get());
757 std::thread echo_handler_thread(
758 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
759 cqs_[2].get(), true);
760 TestAllMethods();
761 SendEchoToDupService();
762 response_stream_handler_thread.join();
763 request_stream_handler_thread.join();
764 echo_handler_thread.join();
765 }
766
// Echo is routed to the generic service and handled on cqs_[0] while the
// client exercises every RPC.
TEST_F(HybridEnd2endTest, GenericEcho) {
  using SType = EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);

  TestAllMethods();
  generic_worker.join();
}
777
// Generic Echo on cqs_[0] combined with async RequestStream on cqs_[1].
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);

  TestAllMethods();
  generic_worker.join();
  request_stream_worker.join();
}
794
795 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_SyncDupService)796 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
797 typedef EchoTestService::WithAsyncMethod_RequestStream<
798 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
799 SType;
800 SType service;
801 AsyncGenericService generic_service;
802 TestServiceImplDupPkg dup_service;
803 SetUpServer(&service, &dup_service, &generic_service);
804 ResetStub();
805 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
806 cqs_[0].get());
807 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
808 &service, cqs_[1].get());
809 TestAllMethods();
810 SendEchoToDupService();
811 generic_handler_thread.join();
812 request_stream_handler_thread.join();
813 }
814
815 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_AsyncDupService)816 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
817 typedef EchoTestService::WithAsyncMethod_RequestStream<
818 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
819 SType;
820 SType service;
821 AsyncGenericService generic_service;
822 duplicate::EchoTestService::AsyncService dup_service;
823 SetUpServer(&service, &dup_service, &generic_service);
824 ResetStub();
825 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
826 cqs_[0].get());
827 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
828 &service, cqs_[1].get());
829 std::thread echo_handler_thread(
830 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
831 cqs_[2].get(), true);
832 TestAllMethods();
833 SendEchoToDupService();
834 generic_handler_thread.join();
835 request_stream_handler_thread.join();
836 echo_handler_thread.join();
837 }
838
// Generic Echo on cqs_[0], async RequestStream on cqs_[1] and async
// ResponseStream on cqs_[2].
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);
  ServerCompletionQueue* const request_stream_cq = cqs_[1].get();
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_stream_cq);
  ServerCompletionQueue* const response_stream_cq = cqs_[2].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);

  TestAllMethods();
  generic_worker.join();
  request_stream_worker.join();
  response_stream_worker.join();
}
859
// Echo and RequestStream are both routed to the generic service, each picked
// up by its own generic handler thread (cqs_[0] and cqs_[1]); ResponseStream
// is handled through the async API on cqs_[2].
TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
  using SType = EchoTestService::WithGenericMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service);
  ResetStub();

  ServerCompletionQueue* const first_generic_cq = cqs_[0].get();
  std::thread first_generic_worker(HandleGenericCall, &generic_service,
                                   first_generic_cq);
  ServerCompletionQueue* const second_generic_cq = cqs_[1].get();
  std::thread second_generic_worker(HandleGenericCall, &generic_service,
                                    second_generic_cq);
  ServerCompletionQueue* const response_stream_cq = cqs_[2].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_stream_cq);

  TestAllMethods();
  first_generic_worker.join();
  second_generic_worker.join();
  response_stream_worker.join();
}
880
881 // If WithGenericMethod is called and no generic service is registered, the
882 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)883 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
884 EchoTestService::WithGenericMethod_RequestStream<
885 EchoTestService::WithGenericMethod_Echo<
886 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
887 service;
888 SetUpServer(&service, nullptr, nullptr);
889 EXPECT_EQ(nullptr, server_.get());
890 }
891
892 } // namespace
893 } // namespace testing
894 } // namespace grpc
895
main(int argc,char ** argv)896 int main(int argc, char** argv) {
897 grpc_test_init(argc, argv);
898 ::testing::InitGoogleTest(&argc, argv);
899 return RUN_ALL_TESTS();
900 }
901