• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <thread>
21 
22 #include <grpc/grpc.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/async_generic_service.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 
31 #include "src/core/lib/gpr/env.h"
32 #include "src/core/lib/iomgr/iomgr.h"
33 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
34 #include "src/proto/grpc/testing/echo.grpc.pb.h"
35 #include "test/core/util/port.h"
36 #include "test/core/util/test_config.h"
37 #include "test/cpp/end2end/test_service_impl.h"
38 #include "test/cpp/util/byte_buffer_proto_helper.h"
39 
40 #include <gtest/gtest.h>
41 
42 namespace grpc {
43 namespace testing {
44 namespace {
45 
46 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
47 using ::grpc::experimental::CallbackGenericService;
48 using ::grpc::experimental::GenericCallbackServerContext;
49 using ::grpc::experimental::ServerGenericBidiReactor;
50 #endif
51 
// Turns a small integer into an opaque completion-queue tag. The integer is
// widened to intptr_t first so that the pointer conversion is well-formed on
// platforms where int and void* differ in size.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
53 
VerifyReturnSuccess(CompletionQueue * cq,int i)54 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
55   void* got_tag;
56   bool ok;
57   EXPECT_TRUE(cq->Next(&got_tag, &ok));
58   EXPECT_EQ(tag(i), got_tag);
59   return ok;
60 }
61 
Verify(CompletionQueue * cq,int i,bool expect_ok)62 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
63   EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
64 }
65 
66 // Handlers to handle async request at a server. To be run in a separate thread.
67 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)68 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
69   ServerContext srv_ctx;
70   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
71   EchoRequest recv_request;
72   EchoResponse send_response;
73   service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
74                        tag(1));
75   Verify(cq, 1, true);
76   send_response.set_message(recv_request.message());
77   if (dup_service) {
78     send_response.mutable_message()->append("_dup");
79   }
80   response_writer.Finish(send_response, Status::OK, tag(2));
81   Verify(cq, 2, true);
82 }
83 
84 // Handlers to handle raw request at a server. To be run in a
85 // separate thread. Note that this is the same as the async version, except
86 // that the req/resp are ByteBuffers
87 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool)88 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
89                    bool /*dup_service*/) {
90   ServerContext srv_ctx;
91   GenericServerAsyncResponseWriter response_writer(&srv_ctx);
92   ByteBuffer recv_buffer;
93   service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
94                        tag(1));
95   Verify(cq, 1, true);
96   EchoRequest recv_request;
97   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
98   EchoResponse send_response;
99   send_response.set_message(recv_request.message());
100   auto send_buffer = SerializeToByteBuffer(&send_response);
101   response_writer.Finish(*send_buffer, Status::OK, tag(2));
102   Verify(cq, 2, true);
103 }
104 
105 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)106 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
107   ServerContext srv_ctx;
108   EchoRequest recv_request;
109   EchoResponse send_response;
110   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
111   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
112   Verify(cq, 1, true);
113   int i = 1;
114   do {
115     i++;
116     send_response.mutable_message()->append(recv_request.message());
117     srv_stream.Read(&recv_request, tag(i));
118   } while (VerifyReturnSuccess(cq, i));
119   srv_stream.Finish(send_response, Status::OK, tag(100));
120   Verify(cq, 100, true);
121 }
122 
123 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)124 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
125   ServerContext srv_ctx;
126   ByteBuffer recv_buffer;
127   EchoRequest recv_request;
128   EchoResponse send_response;
129   GenericServerAsyncReader srv_stream(&srv_ctx);
130   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
131   Verify(cq, 1, true);
132   int i = 1;
133   while (true) {
134     i++;
135     srv_stream.Read(&recv_buffer, tag(i));
136     if (!VerifyReturnSuccess(cq, i)) {
137       break;
138     }
139     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
140     send_response.mutable_message()->append(recv_request.message());
141   }
142   auto send_buffer = SerializeToByteBuffer(&send_response);
143   srv_stream.Finish(*send_buffer, Status::OK, tag(100));
144   Verify(cq, 100, true);
145 }
146 
147 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)148 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
149   ServerContext srv_ctx;
150   EchoRequest recv_request;
151   EchoResponse send_response;
152   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
153   service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
154                                  tag(1));
155   Verify(cq, 1, true);
156   send_response.set_message(recv_request.message() + "0");
157   srv_stream.Write(send_response, tag(2));
158   Verify(cq, 2, true);
159   send_response.set_message(recv_request.message() + "1");
160   srv_stream.Write(send_response, tag(3));
161   Verify(cq, 3, true);
162   send_response.set_message(recv_request.message() + "2");
163   srv_stream.Write(send_response, tag(4));
164   Verify(cq, 4, true);
165   srv_stream.Finish(Status::OK, tag(5));
166   Verify(cq, 5, true);
167 }
168 
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)169 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
170                        CompletionQueue* cq) {
171   ByteBuffer recv_buffer;
172   stream->Read(&recv_buffer, tag(2));
173   Verify(cq, 2, true);
174   EchoRequest recv_request;
175   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
176   EchoResponse send_response;
177   send_response.set_message(recv_request.message());
178   auto send_buffer = SerializeToByteBuffer(&send_response);
179   stream->Write(*send_buffer, tag(3));
180   Verify(cq, 3, true);
181   stream->Finish(Status::OK, tag(4));
182   Verify(cq, 4, true);
183 }
184 
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)185 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
186                                 CompletionQueue* cq) {
187   ByteBuffer recv_buffer;
188   EchoRequest recv_request;
189   EchoResponse send_response;
190   int i = 1;
191   while (true) {
192     i++;
193     stream->Read(&recv_buffer, tag(i));
194     if (!VerifyReturnSuccess(cq, i)) {
195       break;
196     }
197     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
198     send_response.mutable_message()->append(recv_request.message());
199   }
200   auto send_buffer = SerializeToByteBuffer(&send_response);
201   stream->Write(*send_buffer, tag(99));
202   Verify(cq, 99, true);
203   stream->Finish(Status::OK, tag(100));
204   Verify(cq, 100, true);
205 }
206 
207 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)208 void HandleGenericCall(AsyncGenericService* service,
209                        ServerCompletionQueue* cq) {
210   GenericServerContext srv_ctx;
211   GenericServerAsyncReaderWriter stream(&srv_ctx);
212   service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
213   Verify(cq, 1, true);
214   if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
215     HandleGenericEcho(&stream, cq);
216   } else if (srv_ctx.method() ==
217              "/grpc.testing.EchoTestService/RequestStream") {
218     HandleGenericRequestStream(&stream, cq);
219   } else {  // other methods not handled yet.
220     gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
221     GPR_ASSERT(0);
222   }
223 }
224 
225 class TestServiceImplDupPkg
226     : public ::grpc::testing::duplicate::EchoTestService::Service {
227  public:
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)228   Status Echo(ServerContext* /*context*/, const EchoRequest* request,
229               EchoResponse* response) override {
230     response->set_message(request->message() + "_dup");
231     return Status::OK;
232   }
233 };
234 
235 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
236  protected:
HybridEnd2endTest()237   HybridEnd2endTest() {}
238 
SetUpTestCase()239   static void SetUpTestCase() {
240 #if TARGET_OS_IPHONE
241     // Workaround Apple CFStream bug
242     gpr_setenv("grpc_cfstream", "0");
243 #endif
244   }
245 
SetUp()246   void SetUp() override {
247     inproc_ = (::testing::UnitTest::GetInstance()
248                    ->current_test_info()
249                    ->value_param() != nullptr)
250                   ? GetParam()
251                   : false;
252   }
253 
SetUpServer(::grpc::Service * service1,::grpc::Service * service2,AsyncGenericService * generic_service,CallbackGenericService * callback_generic_service,int max_message_size=0)254   bool SetUpServer(::grpc::Service* service1, ::grpc::Service* service2,
255                    AsyncGenericService* generic_service,
256                    CallbackGenericService* callback_generic_service,
257                    int max_message_size = 0) {
258     int port = grpc_pick_unused_port_or_die();
259     server_address_ << "localhost:" << port;
260 
261     // Setup server
262     ServerBuilder builder;
263     builder.AddListeningPort(server_address_.str(),
264                              grpc::InsecureServerCredentials());
265     // Always add a sync unimplemented service: we rely on having at least one
266     // synchronous method to get a listening cq
267     builder.RegisterService(&unimplemented_service_);
268     builder.RegisterService(service1);
269     if (service2) {
270       builder.RegisterService(service2);
271     }
272     if (generic_service) {
273       builder.RegisterAsyncGenericService(generic_service);
274     }
275     if (callback_generic_service) {
276 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
277       builder.RegisterCallbackGenericService(callback_generic_service);
278 #else
279       builder.experimental().RegisterCallbackGenericService(
280           callback_generic_service);
281 #endif
282     }
283 
284     if (max_message_size != 0) {
285       builder.SetMaxMessageSize(max_message_size);
286     }
287 
288     // Create a separate cq for each potential handler.
289     for (int i = 0; i < 5; i++) {
290       cqs_.push_back(builder.AddCompletionQueue(false));
291     }
292     server_ = builder.BuildAndStart();
293 
294     // If there is a generic callback service, this setup is only successful if
295     // we have an iomgr that can run in the background or are inprocess
296     return !callback_generic_service || grpc_iomgr_run_in_background() ||
297            inproc_;
298   }
299 
TearDown()300   void TearDown() override {
301     if (server_) {
302       server_->Shutdown();
303     }
304     void* ignored_tag;
305     bool ignored_ok;
306     for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
307       (*it)->Shutdown();
308       while ((*it)->Next(&ignored_tag, &ignored_ok))
309         ;
310     }
311   }
312 
ResetStub()313   void ResetStub() {
314     std::shared_ptr<Channel> channel =
315         inproc_ ? server_->InProcessChannel(ChannelArguments())
316                 : grpc::CreateChannel(server_address_.str(),
317                                       InsecureChannelCredentials());
318     stub_ = grpc::testing::EchoTestService::NewStub(channel);
319   }
320 
321   // Test all rpc methods.
TestAllMethods()322   void TestAllMethods() {
323     SendEcho();
324     SendSimpleClientStreaming();
325     SendSimpleServerStreaming();
326     SendBidiStreaming();
327   }
328 
SendEcho()329   void SendEcho() {
330     EchoRequest send_request;
331     EchoResponse recv_response;
332     ClientContext cli_ctx;
333     cli_ctx.set_wait_for_ready(true);
334     send_request.set_message("Hello");
335     Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
336     EXPECT_EQ(send_request.message(), recv_response.message());
337     EXPECT_TRUE(recv_status.ok());
338   }
339 
SendEchoToDupService()340   void SendEchoToDupService() {
341     std::shared_ptr<Channel> channel = grpc::CreateChannel(
342         server_address_.str(), InsecureChannelCredentials());
343     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
344     EchoRequest send_request;
345     EchoResponse recv_response;
346     ClientContext cli_ctx;
347     cli_ctx.set_wait_for_ready(true);
348     send_request.set_message("Hello");
349     Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
350     EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
351     EXPECT_TRUE(recv_status.ok());
352   }
353 
SendSimpleClientStreaming()354   void SendSimpleClientStreaming() {
355     EchoRequest send_request;
356     EchoResponse recv_response;
357     std::string expected_message;
358     ClientContext cli_ctx;
359     cli_ctx.set_wait_for_ready(true);
360     send_request.set_message("Hello");
361     auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
362     for (int i = 0; i < 5; i++) {
363       EXPECT_TRUE(stream->Write(send_request));
364       expected_message.append(send_request.message());
365     }
366     stream->WritesDone();
367     Status recv_status = stream->Finish();
368     EXPECT_EQ(expected_message, recv_response.message());
369     EXPECT_TRUE(recv_status.ok());
370   }
371 
SendSimpleServerStreaming()372   void SendSimpleServerStreaming() {
373     EchoRequest request;
374     EchoResponse response;
375     ClientContext context;
376     context.set_wait_for_ready(true);
377     request.set_message("hello");
378 
379     auto stream = stub_->ResponseStream(&context, request);
380     EXPECT_TRUE(stream->Read(&response));
381     EXPECT_EQ(response.message(), request.message() + "0");
382     EXPECT_TRUE(stream->Read(&response));
383     EXPECT_EQ(response.message(), request.message() + "1");
384     EXPECT_TRUE(stream->Read(&response));
385     EXPECT_EQ(response.message(), request.message() + "2");
386     EXPECT_FALSE(stream->Read(&response));
387 
388     Status s = stream->Finish();
389     EXPECT_TRUE(s.ok());
390   }
391 
SendSimpleServerStreamingToDupService()392   void SendSimpleServerStreamingToDupService() {
393     std::shared_ptr<Channel> channel = grpc::CreateChannel(
394         server_address_.str(), InsecureChannelCredentials());
395     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
396     EchoRequest request;
397     EchoResponse response;
398     ClientContext context;
399     context.set_wait_for_ready(true);
400     request.set_message("hello");
401 
402     auto stream = stub->ResponseStream(&context, request);
403     EXPECT_TRUE(stream->Read(&response));
404     EXPECT_EQ(response.message(), request.message() + "0_dup");
405     EXPECT_TRUE(stream->Read(&response));
406     EXPECT_EQ(response.message(), request.message() + "1_dup");
407     EXPECT_TRUE(stream->Read(&response));
408     EXPECT_EQ(response.message(), request.message() + "2_dup");
409     EXPECT_FALSE(stream->Read(&response));
410 
411     Status s = stream->Finish();
412     EXPECT_TRUE(s.ok());
413   }
414 
SendBidiStreaming()415   void SendBidiStreaming() {
416     EchoRequest request;
417     EchoResponse response;
418     ClientContext context;
419     context.set_wait_for_ready(true);
420     std::string msg("hello");
421 
422     auto stream = stub_->BidiStream(&context);
423 
424     request.set_message(msg + "0");
425     EXPECT_TRUE(stream->Write(request));
426     EXPECT_TRUE(stream->Read(&response));
427     EXPECT_EQ(response.message(), request.message());
428 
429     request.set_message(msg + "1");
430     EXPECT_TRUE(stream->Write(request));
431     EXPECT_TRUE(stream->Read(&response));
432     EXPECT_EQ(response.message(), request.message());
433 
434     request.set_message(msg + "2");
435     EXPECT_TRUE(stream->Write(request));
436     EXPECT_TRUE(stream->Read(&response));
437     EXPECT_EQ(response.message(), request.message());
438 
439     stream->WritesDone();
440     EXPECT_FALSE(stream->Read(&response));
441     EXPECT_FALSE(stream->Read(&response));
442 
443     Status s = stream->Finish();
444     EXPECT_TRUE(s.ok());
445   }
446 
447   grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
448   std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
449   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
450   std::unique_ptr<Server> server_;
451   std::ostringstream server_address_;
452   bool inproc_;
453 };
454 
// Echo is served asynchronously on its own thread; every other method uses
// the synchronous TestServiceImpl.
TEST_F(HybridEnd2endTest, AsyncEcho) {
  using SType = EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);

  TestAllMethods();
  echo_worker.join();
}
465 
// Echo is served through the raw (ByteBuffer) API on its own thread; every
// other method uses the synchronous TestServiceImpl.
TEST_F(HybridEnd2endTest, RawEcho) {
  using SType = EchoTestService::WithRawMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread raw_echo_worker(HandleRawEcho<SType>, &service, echo_cq, false);

  TestAllMethods();
  raw_echo_worker.join();
}
476 
// RequestStream is served through the raw (ByteBuffer) API on its own thread;
// every other method uses the synchronous TestServiceImpl.
TEST_F(HybridEnd2endTest, RawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const stream_cq = cqs_[0].get();
  std::thread raw_stream_worker(HandleRawClientStreaming<SType>, &service,
                                stream_cq);

  TestAllMethods();
  raw_stream_worker.join();
}
487 
// Async Echo and raw RequestStream are each served on a dedicated thread and
// completion queue; the remaining methods stay synchronous.
TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
  using AsyncEchoService =
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  using SType = EchoTestService::WithRawMethod_RequestStream<AsyncEchoService>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);
  ServerCompletionQueue* const stream_cq = cqs_[1].get();
  std::thread raw_stream_worker(HandleRawClientStreaming<SType>, &service,
                                stream_cq);

  TestAllMethods();
  raw_stream_worker.join();
  echo_worker.join();
}
503 
// Echo is routed to the async generic service and RequestStream to the raw
// API, each handled on its own thread and completion queue.
TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
  using GenericEchoService =
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  using SType =
      EchoTestService::WithRawMethod_RequestStream<GenericEchoService>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);
  ServerCompletionQueue* const stream_cq = cqs_[1].get();
  std::thread raw_stream_worker(HandleRawClientStreaming<SType>, &service,
                                stream_cq);

  TestAllMethods();
  generic_worker.join();
  raw_stream_worker.join();
}
520 
// Echo and RequestStream are both async, each served on its own thread and
// completion queue; the remaining methods stay synchronous.
TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
  using AsyncEchoService =
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  using SType =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncEchoService>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);
  ServerCompletionQueue* const stream_cq = cqs_[1].get();
  std::thread client_stream_worker(HandleClientStreaming<SType>, &service,
                                   stream_cq);

  TestAllMethods();
  echo_worker.join();
  client_stream_worker.join();
}
536 
// RequestStream and ResponseStream are both async, each served on its own
// thread and completion queue; Echo and BidiStream stay synchronous.
TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
  using AsyncResponseService =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using SType =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseService>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  std::thread server_stream_worker(HandleServerStreaming<SType>, &service,
                                   response_cq);
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread client_stream_worker(HandleClientStreaming<SType>, &service,
                                   request_cq);

  TestAllMethods();
  server_stream_worker.join();
  client_stream_worker.join();
}
552 
553 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncDupService)554 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
555   typedef EchoTestService::WithAsyncMethod_RequestStream<
556       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
557       SType;
558   SType service;
559   TestServiceImplDupPkg dup_service;
560   SetUpServer(&service, &dup_service, nullptr, nullptr);
561   ResetStub();
562   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
563                                              &service, cqs_[0].get());
564   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
565                                             &service, cqs_[1].get());
566   TestAllMethods();
567   SendEchoToDupService();
568   response_stream_handler_thread.join();
569   request_stream_handler_thread.join();
570 }
571 
572 // Add a second service with one sync streamed unary method.
573 class StreamedUnaryDupPkg
574     : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
575           TestServiceImplDupPkg> {
576  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)577   Status StreamedEcho(
578       ServerContext* /*context*/,
579       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
580     EchoRequest req;
581     EchoResponse resp;
582     uint32_t next_msg_sz;
583     stream->NextMessageSize(&next_msg_sz);
584     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
585     GPR_ASSERT(stream->Read(&req));
586     resp.set_message(req.message() + "_dup");
587     GPR_ASSERT(stream->Write(resp));
588     return Status::OK;
589   }
590 };
591 
// Async streaming on the primary service, combined with a duplicate-package
// service whose Echo uses the sync streamed-unary API. The server is built
// with an 8192-byte max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) {
  using AsyncResponseService =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using SType =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseService>;
  SType service;
  StreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  std::thread server_stream_worker(HandleServerStreaming<SType>, &service,
                                   response_cq);
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread client_stream_worker(HandleClientStreaming<SType>, &service,
                                   request_cq);

  TestAllMethods();
  SendEchoToDupService();
  server_stream_worker.join();
  client_stream_worker.join();
}
610 
611 // Add a second service that is fully Streamed Unary
612 class FullyStreamedUnaryDupPkg
613     : public duplicate::EchoTestService::StreamedUnaryService {
614  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)615   Status StreamedEcho(
616       ServerContext* /*context*/,
617       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
618     EchoRequest req;
619     EchoResponse resp;
620     uint32_t next_msg_sz;
621     stream->NextMessageSize(&next_msg_sz);
622     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
623     GPR_ASSERT(stream->Read(&req));
624     resp.set_message(req.message() + "_dup");
625     GPR_ASSERT(stream->Write(resp));
626     return Status::OK;
627   }
628 };
629 
// Async streaming on the primary service, combined with a duplicate-package
// service derived from StreamedUnaryService. The server is built with an
// 8192-byte max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) {
  using AsyncResponseService =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using SType =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseService>;
  SType service;
  FullyStreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  std::thread server_stream_worker(HandleServerStreaming<SType>, &service,
                                   response_cq);
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread client_stream_worker(HandleClientStreaming<SType>, &service,
                                   request_cq);

  TestAllMethods();
  SendEchoToDupService();
  server_stream_worker.join();
  client_stream_worker.join();
}
648 
649 // Add a second service with one sync split server streaming method.
650 class SplitResponseStreamDupPkg
651     : public duplicate::EchoTestService::
652           WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
653  public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)654   Status StreamedResponseStream(
655       ServerContext* /*context*/,
656       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
657     EchoRequest req;
658     EchoResponse resp;
659     uint32_t next_msg_sz;
660     stream->NextMessageSize(&next_msg_sz);
661     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
662     GPR_ASSERT(stream->Read(&req));
663     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
664       resp.set_message(req.message() + std::to_string(i) + "_dup");
665       GPR_ASSERT(stream->Write(resp));
666     }
667     return Status::OK;
668   }
669 };
670 
// Async streaming on the primary service, combined with a duplicate-package
// service whose ResponseStream uses the sync split-streaming API. The server
// is built with an 8192-byte max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) {
  using AsyncResponseService =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using SType =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseService>;
  SType service;
  SplitResponseStreamDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  std::thread server_stream_worker(HandleServerStreaming<SType>, &service,
                                   response_cq);
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread client_stream_worker(HandleClientStreaming<SType>, &service,
                                   request_cq);

  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  server_stream_worker.join();
  client_stream_worker.join();
}
689 
690 // Add a second service that is fully split server streamed
691 class FullySplitStreamedDupPkg
692     : public duplicate::EchoTestService::SplitStreamedService {
693  public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)694   Status StreamedResponseStream(
695       ServerContext* /*context*/,
696       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
697     EchoRequest req;
698     EchoResponse resp;
699     uint32_t next_msg_sz;
700     stream->NextMessageSize(&next_msg_sz);
701     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
702     GPR_ASSERT(stream->Read(&req));
703     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
704       resp.set_message(req.message() + std::to_string(i) + "_dup");
705       GPR_ASSERT(stream->Write(resp));
706     }
707     return Status::OK;
708   }
709 };
710 
// Async streaming on the primary service, combined with a duplicate-package
// service derived from SplitStreamedService. The server is built with an
// 8192-byte max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_FullySplitStreamedDupService) {
  using AsyncResponseService =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using SType =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseService>;
  SType service;
  FullySplitStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  std::thread server_stream_worker(HandleServerStreaming<SType>, &service,
                                   response_cq);
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread client_stream_worker(HandleClientStreaming<SType>, &service,
                                   request_cq);

  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  server_stream_worker.join();
  client_stream_worker.join();
}
729 
730 // Add a second service that is fully server streamed
731 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
732  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)733   Status StreamedEcho(
734       ServerContext* /*context*/,
735       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
736     EchoRequest req;
737     EchoResponse resp;
738     uint32_t next_msg_sz;
739     stream->NextMessageSize(&next_msg_sz);
740     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
741     GPR_ASSERT(stream->Read(&req));
742     resp.set_message(req.message() + "_dup");
743     GPR_ASSERT(stream->Write(resp));
744     return Status::OK;
745   }
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)746   Status StreamedResponseStream(
747       ServerContext* /*context*/,
748       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
749     EchoRequest req;
750     EchoResponse resp;
751     uint32_t next_msg_sz;
752     stream->NextMessageSize(&next_msg_sz);
753     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
754     GPR_ASSERT(stream->Read(&req));
755     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
756       resp.set_message(req.message() + std::to_string(i) + "_dup");
757       GPR_ASSERT(stream->Write(resp));
758     }
759     return Status::OK;
760   }
761 };
762 
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_FullyStreamedDupService) {
  // Primary service: TestServiceImpl wrapped so that RequestStream and
  // ResponseStream are marked as async methods.
  using HybridService = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  HybridService service;
  FullyStreamedDupPkg dup_service;

  // NOTE(review): the trailing 8192 is presumably a message-size limit;
  // confirm against SetUpServer.
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  // One worker thread per async method, each bound to its own completion
  // queue.
  std::thread server_streaming_worker(HandleServerStreaming<HybridService>,
                                      &service, cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<HybridService>,
                                      &service, cqs_[1].get());

  // Exercise the primary service, then both streamed methods of the
  // duplicate-package service.
  TestAllMethods();
  SendEchoToDupService();
  SendSimpleServerStreamingToDupService();

  server_streaming_worker.join();
  client_streaming_worker.join();
}
782 
783 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_AsyncDupService)784 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
785   typedef EchoTestService::WithAsyncMethod_RequestStream<
786       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
787       SType;
788   SType service;
789   duplicate::EchoTestService::AsyncService dup_service;
790   SetUpServer(&service, &dup_service, nullptr, nullptr);
791   ResetStub();
792   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
793                                              &service, cqs_[0].get());
794   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
795                                             &service, cqs_[1].get());
796   std::thread echo_handler_thread(
797       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
798       cqs_[2].get(), true);
799   TestAllMethods();
800   SendEchoToDupService();
801   response_stream_handler_thread.join();
802   request_stream_handler_thread.join();
803   echo_handler_thread.join();
804 }
805 
TEST_F(HybridEnd2endTest, GenericEcho) {
  // Echo is marked generic on the typed service, so it is served through the
  // registered AsyncGenericService instead.
  using EchoGenericService =
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  EchoGenericService service;
  AsyncGenericService generic_service;

  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  TestAllMethods();
  generic_worker.join();
}
816 
TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
  EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;

  // Callback-API generic service: every incoming call gets a fresh
  // self-deleting reactor that echoes the single request back.
  class GenericEchoService : public CallbackGenericService {
   private:
    ServerGenericBidiReactor* CreateReactor(
        GenericCallbackServerContext* context) override {
      // Only Echo is expected to be routed to the generic service.
      EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
      gpr_log(GPR_DEBUG, "Constructor of generic service %d",
              static_cast<int>(context->deadline().time_since_epoch().count()));

      class Reactor : public ServerGenericBidiReactor {
       public:
        // Kick off the first read as soon as the reactor exists.
        Reactor() { StartRead(&request_); }

       private:
        // The reactor owns itself; it is released once the call is done.
        void OnDone() override { delete this; }

        void OnReadDone(bool ok) override {
          if (ok) {
            // Exactly one successful read is expected: echo it back and
            // issue another read.
            EXPECT_EQ(reads_complete_++, 0);
            response_ = request_;
            StartWrite(&response_);
            StartRead(&request_);
            return;
          }
          // A failed read must come after exactly one successful read.
          EXPECT_EQ(reads_complete_, 1);
        }

        void OnWriteDone(bool ok) override {
          if (ok) {
            Finish(Status::OK);
            return;
          }
          Finish(Status(StatusCode::UNKNOWN, "Unexpected failure"));
        }

        ByteBuffer request_;
        ByteBuffer response_;
        std::atomic_int reads_complete_{0};
      };
      return new Reactor;
    }
  } generic_service;

  // Nothing to test when the server could not be set up for this
  // configuration.
  const bool server_ready =
      SetUpServer(&service, nullptr, nullptr, &generic_service);
  if (!server_ready) {
    return;
  }
  ResetStub();
  TestAllMethods();
}
861 
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
  // Echo is marked generic and RequestStream is marked async; both wrap
  // TestServiceImpl.
  using HybridService = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  HybridService service;
  AsyncGenericService generic_service;

  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  // Generic calls are drained from cqs_[0], client-streaming calls from
  // cqs_[1].
  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<HybridService>,
                                      &service, cqs_[1].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
}
878 
879 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_SyncDupService)880 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
881   typedef EchoTestService::WithAsyncMethod_RequestStream<
882       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
883       SType;
884   SType service;
885   AsyncGenericService generic_service;
886   TestServiceImplDupPkg dup_service;
887   SetUpServer(&service, &dup_service, &generic_service, nullptr);
888   ResetStub();
889   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
890                                      cqs_[0].get());
891   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
892                                             &service, cqs_[1].get());
893   TestAllMethods();
894   SendEchoToDupService();
895   generic_handler_thread.join();
896   request_stream_handler_thread.join();
897 }
898 
899 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_AsyncDupService)900 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
901   typedef EchoTestService::WithAsyncMethod_RequestStream<
902       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
903       SType;
904   SType service;
905   AsyncGenericService generic_service;
906   duplicate::EchoTestService::AsyncService dup_service;
907   SetUpServer(&service, &dup_service, &generic_service, nullptr);
908   ResetStub();
909   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
910                                      cqs_[0].get());
911   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
912                                             &service, cqs_[1].get());
913   std::thread echo_handler_thread(
914       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
915       cqs_[2].get(), true);
916   TestAllMethods();
917   SendEchoToDupService();
918   generic_handler_thread.join();
919   request_stream_handler_thread.join();
920   echo_handler_thread.join();
921 }
922 
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
  // Echo is marked generic; RequestStream and ResponseStream are marked
  // async. All three wrappers sit on top of TestServiceImpl.
  using HybridService = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  HybridService service;
  AsyncGenericService generic_service;

  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  // cqs_[0]: generic calls, cqs_[1]: client-streaming calls,
  // cqs_[2]: server-streaming calls.
  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<HybridService>,
                                      &service, cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<HybridService>,
                                      &service, cqs_[2].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
  server_streaming_worker.join();
}
943 
TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
  // Both Echo and RequestStream are marked generic; ResponseStream is marked
  // async. All three wrappers sit on top of TestServiceImpl.
  using HybridService = EchoTestService::WithGenericMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  HybridService service;
  AsyncGenericService generic_service;

  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  // Two generic handler threads share the same generic service but poll
  // different completion queues (cqs_[0] and cqs_[1]); server-streaming
  // calls are drained from cqs_[2].
  std::thread first_generic_worker(HandleGenericCall, &generic_service,
                                   cqs_[0].get());
  std::thread second_generic_worker(HandleGenericCall, &generic_service,
                                    cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<HybridService>,
                                      &service, cqs_[2].get());

  TestAllMethods();

  first_generic_worker.join();
  second_generic_worker.join();
  server_streaming_worker.join();
}
964 
965 // If WithGenericMethod is called and no generic service is registered, the
966 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)967 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
968   EchoTestService::WithGenericMethod_RequestStream<
969       EchoTestService::WithGenericMethod_Echo<
970           EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
971       service;
972   SetUpServer(&service, nullptr, nullptr, nullptr);
973   EXPECT_EQ(nullptr, server_.get());
974 }
975 
976 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
977                          ::testing::Bool());
978 
979 }  // namespace
980 }  // namespace testing
981 }  // namespace grpc
982 
main(int argc,char ** argv)983 int main(int argc, char** argv) {
984   grpc::testing::TestEnvironment env(argc, argv);
985   ::testing::InitGoogleTest(&argc, argv);
986   return RUN_ALL_TESTS();
987 }
988