• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <thread>
21 
22 #include <grpc/grpc.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/async_generic_service.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 
31 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
32 #include "src/proto/grpc/testing/echo.grpc.pb.h"
33 #include "test/core/util/port.h"
34 #include "test/core/util/test_config.h"
35 #include "test/cpp/end2end/test_service_impl.h"
36 #include "test/cpp/util/byte_buffer_proto_helper.h"
37 
38 #include <gtest/gtest.h>
39 
40 namespace grpc {
41 namespace testing {
42 
43 namespace {
44 
tag(int i)45 void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
46 
VerifyReturnSuccess(CompletionQueue * cq,int i)47 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
48   void* got_tag;
49   bool ok;
50   EXPECT_TRUE(cq->Next(&got_tag, &ok));
51   EXPECT_EQ(tag(i), got_tag);
52   return ok;
53 }
54 
Verify(CompletionQueue * cq,int i,bool expect_ok)55 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
56   EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
57 }
58 
59 // Handlers to handle async request at a server. To be run in a separate thread.
60 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)61 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
62   ServerContext srv_ctx;
63   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
64   EchoRequest recv_request;
65   EchoResponse send_response;
66   service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
67                        tag(1));
68   Verify(cq, 1, true);
69   send_response.set_message(recv_request.message());
70   if (dup_service) {
71     send_response.mutable_message()->append("_dup");
72   }
73   response_writer.Finish(send_response, Status::OK, tag(2));
74   Verify(cq, 2, true);
75 }
76 
77 // Handlers to handle raw request at a server. To be run in a
78 // separate thread. Note that this is the same as the async version, except
79 // that the req/resp are ByteBuffers
80 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)81 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
82                    bool dup_service) {
83   ServerContext srv_ctx;
84   GenericServerAsyncResponseWriter response_writer(&srv_ctx);
85   ByteBuffer recv_buffer;
86   service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
87                        tag(1));
88   Verify(cq, 1, true);
89   EchoRequest recv_request;
90   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
91   EchoResponse send_response;
92   send_response.set_message(recv_request.message());
93   auto send_buffer = SerializeToByteBuffer(&send_response);
94   response_writer.Finish(*send_buffer, Status::OK, tag(2));
95   Verify(cq, 2, true);
96 }
97 
98 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)99 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
100   ServerContext srv_ctx;
101   EchoRequest recv_request;
102   EchoResponse send_response;
103   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
104   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
105   Verify(cq, 1, true);
106   int i = 1;
107   do {
108     i++;
109     send_response.mutable_message()->append(recv_request.message());
110     srv_stream.Read(&recv_request, tag(i));
111   } while (VerifyReturnSuccess(cq, i));
112   srv_stream.Finish(send_response, Status::OK, tag(100));
113   Verify(cq, 100, true);
114 }
115 
116 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)117 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
118   ServerContext srv_ctx;
119   ByteBuffer recv_buffer;
120   EchoRequest recv_request;
121   EchoResponse send_response;
122   GenericServerAsyncReader srv_stream(&srv_ctx);
123   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
124   Verify(cq, 1, true);
125   int i = 1;
126   while (true) {
127     i++;
128     srv_stream.Read(&recv_buffer, tag(i));
129     if (!VerifyReturnSuccess(cq, i)) {
130       break;
131     }
132     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
133     send_response.mutable_message()->append(recv_request.message());
134   }
135   auto send_buffer = SerializeToByteBuffer(&send_response);
136   srv_stream.Finish(*send_buffer, Status::OK, tag(100));
137   Verify(cq, 100, true);
138 }
139 
140 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)141 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
142   ServerContext srv_ctx;
143   EchoRequest recv_request;
144   EchoResponse send_response;
145   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
146   service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
147                                  tag(1));
148   Verify(cq, 1, true);
149   send_response.set_message(recv_request.message() + "0");
150   srv_stream.Write(send_response, tag(2));
151   Verify(cq, 2, true);
152   send_response.set_message(recv_request.message() + "1");
153   srv_stream.Write(send_response, tag(3));
154   Verify(cq, 3, true);
155   send_response.set_message(recv_request.message() + "2");
156   srv_stream.Write(send_response, tag(4));
157   Verify(cq, 4, true);
158   srv_stream.Finish(Status::OK, tag(5));
159   Verify(cq, 5, true);
160 }
161 
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)162 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
163                        CompletionQueue* cq) {
164   ByteBuffer recv_buffer;
165   stream->Read(&recv_buffer, tag(2));
166   Verify(cq, 2, true);
167   EchoRequest recv_request;
168   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
169   EchoResponse send_response;
170   send_response.set_message(recv_request.message());
171   auto send_buffer = SerializeToByteBuffer(&send_response);
172   stream->Write(*send_buffer, tag(3));
173   Verify(cq, 3, true);
174   stream->Finish(Status::OK, tag(4));
175   Verify(cq, 4, true);
176 }
177 
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)178 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
179                                 CompletionQueue* cq) {
180   ByteBuffer recv_buffer;
181   EchoRequest recv_request;
182   EchoResponse send_response;
183   int i = 1;
184   while (true) {
185     i++;
186     stream->Read(&recv_buffer, tag(i));
187     if (!VerifyReturnSuccess(cq, i)) {
188       break;
189     }
190     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
191     send_response.mutable_message()->append(recv_request.message());
192   }
193   auto send_buffer = SerializeToByteBuffer(&send_response);
194   stream->Write(*send_buffer, tag(99));
195   Verify(cq, 99, true);
196   stream->Finish(Status::OK, tag(100));
197   Verify(cq, 100, true);
198 }
199 
200 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)201 void HandleGenericCall(AsyncGenericService* service,
202                        ServerCompletionQueue* cq) {
203   GenericServerContext srv_ctx;
204   GenericServerAsyncReaderWriter stream(&srv_ctx);
205   service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
206   Verify(cq, 1, true);
207   if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
208     HandleGenericEcho(&stream, cq);
209   } else if (srv_ctx.method() ==
210              "/grpc.testing.EchoTestService/RequestStream") {
211     HandleGenericRequestStream(&stream, cq);
212   } else {  // other methods not handled yet.
213     gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
214     GPR_ASSERT(0);
215   }
216 }
217 
218 class TestServiceImplDupPkg
219     : public ::grpc::testing::duplicate::EchoTestService::Service {
220  public:
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)221   Status Echo(ServerContext* context, const EchoRequest* request,
222               EchoResponse* response) override {
223     response->set_message(request->message() + "_dup");
224     return Status::OK;
225   }
226 };
227 
228 class HybridEnd2endTest : public ::testing::Test {
229  protected:
HybridEnd2endTest()230   HybridEnd2endTest() {}
231 
SetUpServer(::grpc::Service * service1,::grpc::Service * service2,AsyncGenericService * generic_service,int max_message_size=0)232   void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2,
233                    AsyncGenericService* generic_service,
234                    int max_message_size = 0) {
235     int port = grpc_pick_unused_port_or_die();
236     server_address_ << "localhost:" << port;
237 
238     // Setup server
239     ServerBuilder builder;
240     builder.AddListeningPort(server_address_.str(),
241                              grpc::InsecureServerCredentials());
242     // Always add a sync unimplemented service: we rely on having at least one
243     // synchronous method to get a listening cq
244     builder.RegisterService(&unimplemented_service_);
245     builder.RegisterService(service1);
246     if (service2) {
247       builder.RegisterService(service2);
248     }
249     if (generic_service) {
250       builder.RegisterAsyncGenericService(generic_service);
251     }
252 
253     if (max_message_size != 0) {
254       builder.SetMaxMessageSize(max_message_size);
255     }
256 
257     // Create a separate cq for each potential handler.
258     for (int i = 0; i < 5; i++) {
259       cqs_.push_back(builder.AddCompletionQueue(false));
260     }
261     server_ = builder.BuildAndStart();
262   }
263 
TearDown()264   void TearDown() override {
265     if (server_) {
266       server_->Shutdown();
267     }
268     void* ignored_tag;
269     bool ignored_ok;
270     for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
271       (*it)->Shutdown();
272       while ((*it)->Next(&ignored_tag, &ignored_ok))
273         ;
274     }
275   }
276 
ResetStub()277   void ResetStub() {
278     std::shared_ptr<Channel> channel =
279         CreateChannel(server_address_.str(), InsecureChannelCredentials());
280     stub_ = grpc::testing::EchoTestService::NewStub(channel);
281   }
282 
283   // Test all rpc methods.
TestAllMethods()284   void TestAllMethods() {
285     SendEcho();
286     SendSimpleClientStreaming();
287     SendSimpleServerStreaming();
288     SendBidiStreaming();
289   }
290 
SendEcho()291   void SendEcho() {
292     EchoRequest send_request;
293     EchoResponse recv_response;
294     ClientContext cli_ctx;
295     cli_ctx.set_wait_for_ready(true);
296     send_request.set_message("Hello");
297     Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
298     EXPECT_EQ(send_request.message(), recv_response.message());
299     EXPECT_TRUE(recv_status.ok());
300   }
301 
SendEchoToDupService()302   void SendEchoToDupService() {
303     std::shared_ptr<Channel> channel =
304         CreateChannel(server_address_.str(), InsecureChannelCredentials());
305     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
306     EchoRequest send_request;
307     EchoResponse recv_response;
308     ClientContext cli_ctx;
309     cli_ctx.set_wait_for_ready(true);
310     send_request.set_message("Hello");
311     Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
312     EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
313     EXPECT_TRUE(recv_status.ok());
314   }
315 
SendSimpleClientStreaming()316   void SendSimpleClientStreaming() {
317     EchoRequest send_request;
318     EchoResponse recv_response;
319     grpc::string expected_message;
320     ClientContext cli_ctx;
321     cli_ctx.set_wait_for_ready(true);
322     send_request.set_message("Hello");
323     auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
324     for (int i = 0; i < 5; i++) {
325       EXPECT_TRUE(stream->Write(send_request));
326       expected_message.append(send_request.message());
327     }
328     stream->WritesDone();
329     Status recv_status = stream->Finish();
330     EXPECT_EQ(expected_message, recv_response.message());
331     EXPECT_TRUE(recv_status.ok());
332   }
333 
SendSimpleServerStreaming()334   void SendSimpleServerStreaming() {
335     EchoRequest request;
336     EchoResponse response;
337     ClientContext context;
338     context.set_wait_for_ready(true);
339     request.set_message("hello");
340 
341     auto stream = stub_->ResponseStream(&context, request);
342     EXPECT_TRUE(stream->Read(&response));
343     EXPECT_EQ(response.message(), request.message() + "0");
344     EXPECT_TRUE(stream->Read(&response));
345     EXPECT_EQ(response.message(), request.message() + "1");
346     EXPECT_TRUE(stream->Read(&response));
347     EXPECT_EQ(response.message(), request.message() + "2");
348     EXPECT_FALSE(stream->Read(&response));
349 
350     Status s = stream->Finish();
351     EXPECT_TRUE(s.ok());
352   }
353 
SendSimpleServerStreamingToDupService()354   void SendSimpleServerStreamingToDupService() {
355     std::shared_ptr<Channel> channel =
356         CreateChannel(server_address_.str(), InsecureChannelCredentials());
357     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
358     EchoRequest request;
359     EchoResponse response;
360     ClientContext context;
361     context.set_wait_for_ready(true);
362     request.set_message("hello");
363 
364     auto stream = stub->ResponseStream(&context, request);
365     EXPECT_TRUE(stream->Read(&response));
366     EXPECT_EQ(response.message(), request.message() + "0_dup");
367     EXPECT_TRUE(stream->Read(&response));
368     EXPECT_EQ(response.message(), request.message() + "1_dup");
369     EXPECT_TRUE(stream->Read(&response));
370     EXPECT_EQ(response.message(), request.message() + "2_dup");
371     EXPECT_FALSE(stream->Read(&response));
372 
373     Status s = stream->Finish();
374     EXPECT_TRUE(s.ok());
375   }
376 
SendBidiStreaming()377   void SendBidiStreaming() {
378     EchoRequest request;
379     EchoResponse response;
380     ClientContext context;
381     context.set_wait_for_ready(true);
382     grpc::string msg("hello");
383 
384     auto stream = stub_->BidiStream(&context);
385 
386     request.set_message(msg + "0");
387     EXPECT_TRUE(stream->Write(request));
388     EXPECT_TRUE(stream->Read(&response));
389     EXPECT_EQ(response.message(), request.message());
390 
391     request.set_message(msg + "1");
392     EXPECT_TRUE(stream->Write(request));
393     EXPECT_TRUE(stream->Read(&response));
394     EXPECT_EQ(response.message(), request.message());
395 
396     request.set_message(msg + "2");
397     EXPECT_TRUE(stream->Write(request));
398     EXPECT_TRUE(stream->Read(&response));
399     EXPECT_EQ(response.message(), request.message());
400 
401     stream->WritesDone();
402     EXPECT_FALSE(stream->Read(&response));
403     EXPECT_FALSE(stream->Read(&response));
404 
405     Status s = stream->Finish();
406     EXPECT_TRUE(s.ok());
407   }
408 
409   grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
410   std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
411   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
412   std::unique_ptr<Server> server_;
413   std::ostringstream server_address_;
414 };
415 
TEST_F(HybridEnd2endTest,AsyncEcho)416 TEST_F(HybridEnd2endTest, AsyncEcho) {
417   typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType;
418   SType service;
419   SetUpServer(&service, nullptr, nullptr);
420   ResetStub();
421   std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
422                                   false);
423   TestAllMethods();
424   echo_handler_thread.join();
425 }
426 
TEST_F(HybridEnd2endTest,RawEcho)427 TEST_F(HybridEnd2endTest, RawEcho) {
428   typedef EchoTestService::WithRawMethod_Echo<TestServiceImpl> SType;
429   SType service;
430   SetUpServer(&service, nullptr, nullptr);
431   ResetStub();
432   std::thread echo_handler_thread(HandleRawEcho<SType>, &service, cqs_[0].get(),
433                                   false);
434   TestAllMethods();
435   echo_handler_thread.join();
436 }
437 
TEST_F(HybridEnd2endTest,RawRequestStream)438 TEST_F(HybridEnd2endTest, RawRequestStream) {
439   typedef EchoTestService::WithRawMethod_RequestStream<TestServiceImpl> SType;
440   SType service;
441   SetUpServer(&service, nullptr, nullptr);
442   ResetStub();
443   std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
444                                             &service, cqs_[0].get());
445   TestAllMethods();
446   request_stream_handler_thread.join();
447 }
448 
TEST_F(HybridEnd2endTest,AsyncEchoRawRequestStream)449 TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
450   typedef EchoTestService::WithRawMethod_RequestStream<
451       EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
452       SType;
453   SType service;
454   SetUpServer(&service, nullptr, nullptr);
455   ResetStub();
456   std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
457                                   false);
458   std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
459                                             &service, cqs_[1].get());
460   TestAllMethods();
461   request_stream_handler_thread.join();
462   echo_handler_thread.join();
463 }
464 
TEST_F(HybridEnd2endTest,GenericEchoRawRequestStream)465 TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
466   typedef EchoTestService::WithRawMethod_RequestStream<
467       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
468       SType;
469   SType service;
470   AsyncGenericService generic_service;
471   SetUpServer(&service, nullptr, &generic_service);
472   ResetStub();
473   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
474                                      cqs_[0].get());
475   std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
476                                             &service, cqs_[1].get());
477   TestAllMethods();
478   generic_handler_thread.join();
479   request_stream_handler_thread.join();
480 }
481 
TEST_F(HybridEnd2endTest,AsyncEchoRequestStream)482 TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
483   typedef EchoTestService::WithAsyncMethod_RequestStream<
484       EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
485       SType;
486   SType service;
487   SetUpServer(&service, nullptr, nullptr);
488   ResetStub();
489   std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
490                                   false);
491   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
492                                             &service, cqs_[1].get());
493   TestAllMethods();
494   echo_handler_thread.join();
495   request_stream_handler_thread.join();
496 }
497 
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream)498 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
499   typedef EchoTestService::WithAsyncMethod_RequestStream<
500       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
501       SType;
502   SType service;
503   SetUpServer(&service, nullptr, nullptr);
504   ResetStub();
505   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
506                                              &service, cqs_[0].get());
507   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
508                                             &service, cqs_[1].get());
509   TestAllMethods();
510   response_stream_handler_thread.join();
511   request_stream_handler_thread.join();
512 }
513 
514 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncDupService)515 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
516   typedef EchoTestService::WithAsyncMethod_RequestStream<
517       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
518       SType;
519   SType service;
520   TestServiceImplDupPkg dup_service;
521   SetUpServer(&service, &dup_service, nullptr);
522   ResetStub();
523   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
524                                              &service, cqs_[0].get());
525   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
526                                             &service, cqs_[1].get());
527   TestAllMethods();
528   SendEchoToDupService();
529   response_stream_handler_thread.join();
530   request_stream_handler_thread.join();
531 }
532 
533 // Add a second service with one sync streamed unary method.
534 class StreamedUnaryDupPkg
535     : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
536           TestServiceImplDupPkg> {
537  public:
StreamedEcho(ServerContext * context,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)538   Status StreamedEcho(
539       ServerContext* context,
540       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
541     EchoRequest req;
542     EchoResponse resp;
543     uint32_t next_msg_sz;
544     stream->NextMessageSize(&next_msg_sz);
545     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
546     GPR_ASSERT(stream->Read(&req));
547     resp.set_message(req.message() + "_dup");
548     GPR_ASSERT(stream->Write(resp));
549     return Status::OK;
550   }
551 };
552 
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService)553 TEST_F(HybridEnd2endTest,
554        AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) {
555   typedef EchoTestService::WithAsyncMethod_RequestStream<
556       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
557       SType;
558   SType service;
559   StreamedUnaryDupPkg dup_service;
560   SetUpServer(&service, &dup_service, nullptr, 8192);
561   ResetStub();
562   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
563                                              &service, cqs_[0].get());
564   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
565                                             &service, cqs_[1].get());
566   TestAllMethods();
567   SendEchoToDupService();
568   response_stream_handler_thread.join();
569   request_stream_handler_thread.join();
570 }
571 
572 // Add a second service that is fully Streamed Unary
573 class FullyStreamedUnaryDupPkg
574     : public duplicate::EchoTestService::StreamedUnaryService {
575  public:
StreamedEcho(ServerContext * context,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)576   Status StreamedEcho(
577       ServerContext* context,
578       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
579     EchoRequest req;
580     EchoResponse resp;
581     uint32_t next_msg_sz;
582     stream->NextMessageSize(&next_msg_sz);
583     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
584     GPR_ASSERT(stream->Read(&req));
585     resp.set_message(req.message() + "_dup");
586     GPR_ASSERT(stream->Write(resp));
587     return Status::OK;
588   }
589 };
590 
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService)591 TEST_F(HybridEnd2endTest,
592        AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) {
593   typedef EchoTestService::WithAsyncMethod_RequestStream<
594       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
595       SType;
596   SType service;
597   FullyStreamedUnaryDupPkg dup_service;
598   SetUpServer(&service, &dup_service, nullptr, 8192);
599   ResetStub();
600   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
601                                              &service, cqs_[0].get());
602   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
603                                             &service, cqs_[1].get());
604   TestAllMethods();
605   SendEchoToDupService();
606   response_stream_handler_thread.join();
607   request_stream_handler_thread.join();
608 }
609 
610 // Add a second service with one sync split server streaming method.
611 class SplitResponseStreamDupPkg
612     : public duplicate::EchoTestService::
613           WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
614  public:
StreamedResponseStream(ServerContext * context,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)615   Status StreamedResponseStream(
616       ServerContext* context,
617       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
618     EchoRequest req;
619     EchoResponse resp;
620     uint32_t next_msg_sz;
621     stream->NextMessageSize(&next_msg_sz);
622     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
623     GPR_ASSERT(stream->Read(&req));
624     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
625       resp.set_message(req.message() + grpc::to_string(i) + "_dup");
626       GPR_ASSERT(stream->Write(resp));
627     }
628     return Status::OK;
629   }
630 };
631 
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncSplitStreamedDupService)632 TEST_F(HybridEnd2endTest,
633        AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) {
634   typedef EchoTestService::WithAsyncMethod_RequestStream<
635       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
636       SType;
637   SType service;
638   SplitResponseStreamDupPkg dup_service;
639   SetUpServer(&service, &dup_service, nullptr, 8192);
640   ResetStub();
641   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
642                                              &service, cqs_[0].get());
643   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
644                                             &service, cqs_[1].get());
645   TestAllMethods();
646   SendSimpleServerStreamingToDupService();
647   response_stream_handler_thread.join();
648   request_stream_handler_thread.join();
649 }
650 
651 // Add a second service that is fully split server streamed
652 class FullySplitStreamedDupPkg
653     : public duplicate::EchoTestService::SplitStreamedService {
654  public:
StreamedResponseStream(ServerContext * context,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)655   Status StreamedResponseStream(
656       ServerContext* context,
657       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
658     EchoRequest req;
659     EchoResponse resp;
660     uint32_t next_msg_sz;
661     stream->NextMessageSize(&next_msg_sz);
662     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
663     GPR_ASSERT(stream->Read(&req));
664     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
665       resp.set_message(req.message() + grpc::to_string(i) + "_dup");
666       GPR_ASSERT(stream->Write(resp));
667     }
668     return Status::OK;
669   }
670 };
671 
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_FullySplitStreamedDupService)672 TEST_F(HybridEnd2endTest,
673        AsyncRequestStreamResponseStream_FullySplitStreamedDupService) {
674   typedef EchoTestService::WithAsyncMethod_RequestStream<
675       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
676       SType;
677   SType service;
678   FullySplitStreamedDupPkg dup_service;
679   SetUpServer(&service, &dup_service, nullptr, 8192);
680   ResetStub();
681   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
682                                              &service, cqs_[0].get());
683   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
684                                             &service, cqs_[1].get());
685   TestAllMethods();
686   SendSimpleServerStreamingToDupService();
687   response_stream_handler_thread.join();
688   request_stream_handler_thread.join();
689 }
690 
691 // Add a second service that is fully server streamed
692 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
693  public:
StreamedEcho(ServerContext * context,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)694   Status StreamedEcho(
695       ServerContext* context,
696       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
697     EchoRequest req;
698     EchoResponse resp;
699     uint32_t next_msg_sz;
700     stream->NextMessageSize(&next_msg_sz);
701     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
702     GPR_ASSERT(stream->Read(&req));
703     resp.set_message(req.message() + "_dup");
704     GPR_ASSERT(stream->Write(resp));
705     return Status::OK;
706   }
StreamedResponseStream(ServerContext * context,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)707   Status StreamedResponseStream(
708       ServerContext* context,
709       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
710     EchoRequest req;
711     EchoResponse resp;
712     uint32_t next_msg_sz;
713     stream->NextMessageSize(&next_msg_sz);
714     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
715     GPR_ASSERT(stream->Read(&req));
716     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
717       resp.set_message(req.message() + grpc::to_string(i) + "_dup");
718       GPR_ASSERT(stream->Write(resp));
719     }
720     return Status::OK;
721   }
722 };
723 
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_FullyStreamedDupService)724 TEST_F(HybridEnd2endTest,
725        AsyncRequestStreamResponseStream_FullyStreamedDupService) {
726   typedef EchoTestService::WithAsyncMethod_RequestStream<
727       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
728       SType;
729   SType service;
730   FullyStreamedDupPkg dup_service;
731   SetUpServer(&service, &dup_service, nullptr, 8192);
732   ResetStub();
733   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
734                                              &service, cqs_[0].get());
735   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
736                                             &service, cqs_[1].get());
737   TestAllMethods();
738   SendEchoToDupService();
739   SendSimpleServerStreamingToDupService();
740   response_stream_handler_thread.join();
741   request_stream_handler_thread.join();
742 }
743 
744 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_AsyncDupService)745 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
746   typedef EchoTestService::WithAsyncMethod_RequestStream<
747       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
748       SType;
749   SType service;
750   duplicate::EchoTestService::AsyncService dup_service;
751   SetUpServer(&service, &dup_service, nullptr);
752   ResetStub();
753   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
754                                              &service, cqs_[0].get());
755   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
756                                             &service, cqs_[1].get());
757   std::thread echo_handler_thread(
758       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
759       cqs_[2].get(), true);
760   TestAllMethods();
761   SendEchoToDupService();
762   response_stream_handler_thread.join();
763   request_stream_handler_thread.join();
764   echo_handler_thread.join();
765 }
766 
TEST_F(HybridEnd2endTest,GenericEcho)767 TEST_F(HybridEnd2endTest, GenericEcho) {
768   EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
769   AsyncGenericService generic_service;
770   SetUpServer(&service, nullptr, &generic_service);
771   ResetStub();
772   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
773                                      cqs_[0].get());
774   TestAllMethods();
775   generic_handler_thread.join();
776 }
777 
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream)778 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
779   typedef EchoTestService::WithAsyncMethod_RequestStream<
780       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
781       SType;
782   SType service;
783   AsyncGenericService generic_service;
784   SetUpServer(&service, nullptr, &generic_service);
785   ResetStub();
786   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
787                                      cqs_[0].get());
788   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
789                                             &service, cqs_[1].get());
790   TestAllMethods();
791   generic_handler_thread.join();
792   request_stream_handler_thread.join();
793 }
794 
795 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_SyncDupService)796 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
797   typedef EchoTestService::WithAsyncMethod_RequestStream<
798       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
799       SType;
800   SType service;
801   AsyncGenericService generic_service;
802   TestServiceImplDupPkg dup_service;
803   SetUpServer(&service, &dup_service, &generic_service);
804   ResetStub();
805   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
806                                      cqs_[0].get());
807   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
808                                             &service, cqs_[1].get());
809   TestAllMethods();
810   SendEchoToDupService();
811   generic_handler_thread.join();
812   request_stream_handler_thread.join();
813 }
814 
815 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_AsyncDupService)816 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
817   typedef EchoTestService::WithAsyncMethod_RequestStream<
818       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
819       SType;
820   SType service;
821   AsyncGenericService generic_service;
822   duplicate::EchoTestService::AsyncService dup_service;
823   SetUpServer(&service, &dup_service, &generic_service);
824   ResetStub();
825   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
826                                      cqs_[0].get());
827   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
828                                             &service, cqs_[1].get());
829   std::thread echo_handler_thread(
830       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
831       cqs_[2].get(), true);
832   TestAllMethods();
833   SendEchoToDupService();
834   generic_handler_thread.join();
835   request_stream_handler_thread.join();
836   echo_handler_thread.join();
837 }
838 
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamResponseStream)839 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
840   typedef EchoTestService::WithAsyncMethod_RequestStream<
841       EchoTestService::WithGenericMethod_Echo<
842           EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
843       SType;
844   SType service;
845   AsyncGenericService generic_service;
846   SetUpServer(&service, nullptr, &generic_service);
847   ResetStub();
848   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
849                                      cqs_[0].get());
850   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
851                                             &service, cqs_[1].get());
852   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
853                                              &service, cqs_[2].get());
854   TestAllMethods();
855   generic_handler_thread.join();
856   request_stream_handler_thread.join();
857   response_stream_handler_thread.join();
858 }
859 
TEST_F(HybridEnd2endTest,GenericEchoRequestStreamAsyncResponseStream)860 TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
861   typedef EchoTestService::WithGenericMethod_RequestStream<
862       EchoTestService::WithGenericMethod_Echo<
863           EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
864       SType;
865   SType service;
866   AsyncGenericService generic_service;
867   SetUpServer(&service, nullptr, &generic_service);
868   ResetStub();
869   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
870                                      cqs_[0].get());
871   std::thread generic_handler_thread2(HandleGenericCall, &generic_service,
872                                       cqs_[1].get());
873   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
874                                              &service, cqs_[2].get());
875   TestAllMethods();
876   generic_handler_thread.join();
877   generic_handler_thread2.join();
878   response_stream_handler_thread.join();
879 }
880 
881 // If WithGenericMethod is called and no generic service is registered, the
882 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)883 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
884   EchoTestService::WithGenericMethod_RequestStream<
885       EchoTestService::WithGenericMethod_Echo<
886           EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
887       service;
888   SetUpServer(&service, nullptr, nullptr);
889   EXPECT_EQ(nullptr, server_.get());
890 }
891 
892 }  // namespace
893 }  // namespace testing
894 }  // namespace grpc
895 
main(int argc,char ** argv)896 int main(int argc, char** argv) {
897   grpc_test_init(argc, argv);
898   ::testing::InitGoogleTest(&argc, argv);
899   return RUN_ALL_TESTS();
900 }
901