• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <gtest/gtest.h>

#include <cstdint>
#include <memory>
#include <thread>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/util/env.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
42 
43 namespace grpc {
44 namespace testing {
45 namespace {
46 
// Turns a small integer into an opaque completion-queue tag. The value is
// widened to intptr_t first so the integer-to-pointer cast happens between
// same-sized types instead of converting a (possibly narrower) int directly.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
48 
VerifyReturnSuccess(CompletionQueue * cq,int i)49 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
50   void* got_tag;
51   bool ok;
52   EXPECT_TRUE(cq->Next(&got_tag, &ok));
53   EXPECT_EQ(tag(i), got_tag);
54   return ok;
55 }
56 
Verify(CompletionQueue * cq,int i,bool expect_ok)57 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
58   EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
59 }
60 
61 // Handlers to handle async request at a server. To be run in a separate thread.
62 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)63 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
64   ServerContext srv_ctx;
65   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
66   EchoRequest recv_request;
67   EchoResponse send_response;
68   service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
69                        tag(1));
70   Verify(cq, 1, true);
71   send_response.set_message(recv_request.message());
72   if (dup_service) {
73     send_response.mutable_message()->append("_dup");
74   }
75   response_writer.Finish(send_response, Status::OK, tag(2));
76   Verify(cq, 2, true);
77 }
78 
79 // Handlers to handle raw request at a server. To be run in a
80 // separate thread. Note that this is the same as the async version, except
81 // that the req/resp are ByteBuffers
82 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool)83 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
84                    bool /*dup_service*/) {
85   ServerContext srv_ctx;
86   GenericServerAsyncResponseWriter response_writer(&srv_ctx);
87   ByteBuffer recv_buffer;
88   service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
89                        tag(1));
90   Verify(cq, 1, true);
91   EchoRequest recv_request;
92   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
93   EchoResponse send_response;
94   send_response.set_message(recv_request.message());
95   auto send_buffer = SerializeToByteBuffer(&send_response);
96   response_writer.Finish(*send_buffer, Status::OK, tag(2));
97   Verify(cq, 2, true);
98 }
99 
100 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)101 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
102   ServerContext srv_ctx;
103   EchoRequest recv_request;
104   EchoResponse send_response;
105   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
106   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
107   Verify(cq, 1, true);
108   int i = 1;
109   do {
110     i++;
111     send_response.mutable_message()->append(recv_request.message());
112     srv_stream.Read(&recv_request, tag(i));
113   } while (VerifyReturnSuccess(cq, i));
114   srv_stream.Finish(send_response, Status::OK, tag(100));
115   Verify(cq, 100, true);
116 }
117 
118 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)119 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
120   ServerContext srv_ctx;
121   ByteBuffer recv_buffer;
122   EchoRequest recv_request;
123   EchoResponse send_response;
124   GenericServerAsyncReader srv_stream(&srv_ctx);
125   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
126   Verify(cq, 1, true);
127   int i = 1;
128   while (true) {
129     i++;
130     srv_stream.Read(&recv_buffer, tag(i));
131     if (!VerifyReturnSuccess(cq, i)) {
132       break;
133     }
134     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
135     send_response.mutable_message()->append(recv_request.message());
136   }
137   auto send_buffer = SerializeToByteBuffer(&send_response);
138   srv_stream.Finish(*send_buffer, Status::OK, tag(100));
139   Verify(cq, 100, true);
140 }
141 
142 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)143 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
144   ServerContext srv_ctx;
145   EchoRequest recv_request;
146   EchoResponse send_response;
147   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
148   service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
149                                  tag(1));
150   Verify(cq, 1, true);
151   send_response.set_message(recv_request.message() + "0");
152   srv_stream.Write(send_response, tag(2));
153   Verify(cq, 2, true);
154   send_response.set_message(recv_request.message() + "1");
155   srv_stream.Write(send_response, tag(3));
156   Verify(cq, 3, true);
157   send_response.set_message(recv_request.message() + "2");
158   srv_stream.Write(send_response, tag(4));
159   Verify(cq, 4, true);
160   srv_stream.Finish(Status::OK, tag(5));
161   Verify(cq, 5, true);
162 }
163 
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)164 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
165                        CompletionQueue* cq) {
166   ByteBuffer recv_buffer;
167   stream->Read(&recv_buffer, tag(2));
168   Verify(cq, 2, true);
169   EchoRequest recv_request;
170   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
171   EchoResponse send_response;
172   send_response.set_message(recv_request.message());
173   auto send_buffer = SerializeToByteBuffer(&send_response);
174   stream->Write(*send_buffer, tag(3));
175   Verify(cq, 3, true);
176   stream->Finish(Status::OK, tag(4));
177   Verify(cq, 4, true);
178 }
179 
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)180 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
181                                 CompletionQueue* cq) {
182   ByteBuffer recv_buffer;
183   EchoRequest recv_request;
184   EchoResponse send_response;
185   int i = 1;
186   while (true) {
187     i++;
188     stream->Read(&recv_buffer, tag(i));
189     if (!VerifyReturnSuccess(cq, i)) {
190       break;
191     }
192     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
193     send_response.mutable_message()->append(recv_request.message());
194   }
195   auto send_buffer = SerializeToByteBuffer(&send_response);
196   stream->Write(*send_buffer, tag(99));
197   Verify(cq, 99, true);
198   stream->Finish(Status::OK, tag(100));
199   Verify(cq, 100, true);
200 }
201 
202 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)203 void HandleGenericCall(AsyncGenericService* service,
204                        ServerCompletionQueue* cq) {
205   GenericServerContext srv_ctx;
206   GenericServerAsyncReaderWriter stream(&srv_ctx);
207   service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
208   Verify(cq, 1, true);
209   if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
210     HandleGenericEcho(&stream, cq);
211   } else if (srv_ctx.method() ==
212              "/grpc.testing.EchoTestService/RequestStream") {
213     HandleGenericRequestStream(&stream, cq);
214   } else {  // other methods not handled yet.
215     LOG(ERROR) << "method: " << srv_ctx.method();
216     CHECK(0);
217   }
218 }
219 
220 class TestServiceImplDupPkg
221     : public grpc::testing::duplicate::EchoTestService::Service {
222  public:
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)223   Status Echo(ServerContext* /*context*/, const EchoRequest* request,
224               EchoResponse* response) override {
225     response->set_message(request->message() + "_dup");
226     return Status::OK;
227   }
228 };
229 
230 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
231  protected:
HybridEnd2endTest()232   HybridEnd2endTest() {}
233 
SetUpTestSuite()234   static void SetUpTestSuite() {
235 #if TARGET_OS_IPHONE
236     // Workaround Apple CFStream bug
237     grpc_core::SetEnv("grpc_cfstream", "0");
238 #endif
239   }
240 
SetUp()241   void SetUp() override {
242     inproc_ = (::testing::UnitTest::GetInstance()
243                    ->current_test_info()
244                    ->value_param() != nullptr)
245                   ? GetParam()
246                   : false;
247   }
248 
SetUpServer(grpc::Service * service1,grpc::Service * service2,AsyncGenericService * generic_service,CallbackGenericService * callback_generic_service,int max_message_size=0)249   bool SetUpServer(grpc::Service* service1, grpc::Service* service2,
250                    AsyncGenericService* generic_service,
251                    CallbackGenericService* callback_generic_service,
252                    int max_message_size = 0) {
253     int port = grpc_pick_unused_port_or_die();
254     server_address_ << "localhost:" << port;
255 
256     // Setup server
257     ServerBuilder builder;
258     builder.AddListeningPort(server_address_.str(),
259                              grpc::InsecureServerCredentials());
260     // Always add a sync unimplemented service: we rely on having at least one
261     // synchronous method to get a listening cq
262     builder.RegisterService(&unimplemented_service_);
263     builder.RegisterService(service1);
264     if (service2) {
265       builder.RegisterService(service2);
266     }
267     if (generic_service) {
268       builder.RegisterAsyncGenericService(generic_service);
269     }
270     if (callback_generic_service) {
271       builder.RegisterCallbackGenericService(callback_generic_service);
272     }
273 
274     if (max_message_size != 0) {
275       builder.SetMaxMessageSize(max_message_size);
276     }
277 
278     // Create a separate cq for each potential handler.
279     for (int i = 0; i < 5; i++) {
280       cqs_.push_back(builder.AddCompletionQueue(false));
281     }
282     server_ = builder.BuildAndStart();
283 
284     // If there is a generic callback service, this setup is only successful if
285     // we have an iomgr that can run in the background or are inprocess
286     return !callback_generic_service || grpc_iomgr_run_in_background() ||
287            inproc_;
288   }
289 
TearDown()290   void TearDown() override {
291     if (server_) {
292       server_->Shutdown();
293     }
294     void* ignored_tag;
295     bool ignored_ok;
296     for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
297       (*it)->Shutdown();
298       while ((*it)->Next(&ignored_tag, &ignored_ok)) {
299       }
300     }
301   }
302 
ResetStub()303   void ResetStub() {
304     std::shared_ptr<Channel> channel =
305         inproc_ ? server_->InProcessChannel(ChannelArguments())
306                 : grpc::CreateChannel(server_address_.str(),
307                                       InsecureChannelCredentials());
308     stub_ = grpc::testing::EchoTestService::NewStub(channel);
309   }
310 
311   // Test all rpc methods.
TestAllMethods()312   void TestAllMethods() {
313     SendEcho();
314     SendSimpleClientStreaming();
315     SendSimpleServerStreaming();
316     SendBidiStreaming();
317   }
318 
SendEcho()319   void SendEcho() {
320     EchoRequest send_request;
321     EchoResponse recv_response;
322     ClientContext cli_ctx;
323     cli_ctx.set_wait_for_ready(true);
324     send_request.set_message("Hello");
325     Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
326     EXPECT_EQ(send_request.message(), recv_response.message());
327     EXPECT_TRUE(recv_status.ok());
328   }
329 
SendEchoToDupService()330   void SendEchoToDupService() {
331     std::shared_ptr<Channel> channel = grpc::CreateChannel(
332         server_address_.str(), InsecureChannelCredentials());
333     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
334     EchoRequest send_request;
335     EchoResponse recv_response;
336     ClientContext cli_ctx;
337     cli_ctx.set_wait_for_ready(true);
338     send_request.set_message("Hello");
339     Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
340     EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
341     EXPECT_TRUE(recv_status.ok());
342   }
343 
SendSimpleClientStreaming()344   void SendSimpleClientStreaming() {
345     EchoRequest send_request;
346     EchoResponse recv_response;
347     std::string expected_message;
348     ClientContext cli_ctx;
349     cli_ctx.set_wait_for_ready(true);
350     send_request.set_message("Hello");
351     auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
352     for (int i = 0; i < 5; i++) {
353       EXPECT_TRUE(stream->Write(send_request));
354       expected_message.append(send_request.message());
355     }
356     stream->WritesDone();
357     Status recv_status = stream->Finish();
358     EXPECT_EQ(expected_message, recv_response.message());
359     EXPECT_TRUE(recv_status.ok());
360   }
361 
SendSimpleServerStreaming()362   void SendSimpleServerStreaming() {
363     EchoRequest request;
364     EchoResponse response;
365     ClientContext context;
366     context.set_wait_for_ready(true);
367     request.set_message("hello");
368 
369     auto stream = stub_->ResponseStream(&context, request);
370     EXPECT_TRUE(stream->Read(&response));
371     EXPECT_EQ(response.message(), request.message() + "0");
372     EXPECT_TRUE(stream->Read(&response));
373     EXPECT_EQ(response.message(), request.message() + "1");
374     EXPECT_TRUE(stream->Read(&response));
375     EXPECT_EQ(response.message(), request.message() + "2");
376     EXPECT_FALSE(stream->Read(&response));
377 
378     Status s = stream->Finish();
379     EXPECT_TRUE(s.ok());
380   }
381 
SendSimpleServerStreamingToDupService()382   void SendSimpleServerStreamingToDupService() {
383     std::shared_ptr<Channel> channel = grpc::CreateChannel(
384         server_address_.str(), InsecureChannelCredentials());
385     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
386     EchoRequest request;
387     EchoResponse response;
388     ClientContext context;
389     context.set_wait_for_ready(true);
390     request.set_message("hello");
391 
392     auto stream = stub->ResponseStream(&context, request);
393     EXPECT_TRUE(stream->Read(&response));
394     EXPECT_EQ(response.message(), request.message() + "0_dup");
395     EXPECT_TRUE(stream->Read(&response));
396     EXPECT_EQ(response.message(), request.message() + "1_dup");
397     EXPECT_TRUE(stream->Read(&response));
398     EXPECT_EQ(response.message(), request.message() + "2_dup");
399     EXPECT_FALSE(stream->Read(&response));
400 
401     Status s = stream->Finish();
402     EXPECT_TRUE(s.ok());
403   }
404 
SendBidiStreaming()405   void SendBidiStreaming() {
406     EchoRequest request;
407     EchoResponse response;
408     ClientContext context;
409     context.set_wait_for_ready(true);
410     std::string msg("hello");
411 
412     auto stream = stub_->BidiStream(&context);
413 
414     request.set_message(msg + "0");
415     EXPECT_TRUE(stream->Write(request));
416     EXPECT_TRUE(stream->Read(&response));
417     EXPECT_EQ(response.message(), request.message());
418 
419     request.set_message(msg + "1");
420     EXPECT_TRUE(stream->Write(request));
421     EXPECT_TRUE(stream->Read(&response));
422     EXPECT_EQ(response.message(), request.message());
423 
424     request.set_message(msg + "2");
425     EXPECT_TRUE(stream->Write(request));
426     EXPECT_TRUE(stream->Read(&response));
427     EXPECT_EQ(response.message(), request.message());
428 
429     stream->WritesDone();
430     EXPECT_FALSE(stream->Read(&response));
431     EXPECT_FALSE(stream->Read(&response));
432 
433     Status s = stream->Finish();
434     EXPECT_TRUE(s.ok());
435   }
436 
437   grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
438   std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
439   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
440   std::unique_ptr<Server> server_;
441   std::ostringstream server_address_;
442   bool inproc_;
443 };
444 
// Echo is marked async and served by a handler thread on cqs_[0]; the
// remaining methods are left to the wrapped TestServiceImpl.
TEST_F(HybridEnd2endTest, AsyncEcho) {
  using HybridService = EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  HybridService service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();
  std::thread echo_worker(HandleEcho<HybridService>, &service, cqs_[0].get(),
                          false);
  TestAllMethods();
  echo_worker.join();
}
455 
// Echo is marked raw and served by a ByteBuffer-based handler thread on
// cqs_[0]; the remaining methods are left to the wrapped TestServiceImpl.
TEST_F(HybridEnd2endTest, RawEcho) {
  using HybridService = EchoTestService::WithRawMethod_Echo<TestServiceImpl>;
  HybridService service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();
  std::thread echo_worker(HandleRawEcho<HybridService>, &service,
                          cqs_[0].get(), false);
  TestAllMethods();
  echo_worker.join();
}
466 
// RequestStream is marked raw and served by a ByteBuffer-based handler thread
// on cqs_[0]; the remaining methods are left to the wrapped TestServiceImpl.
TEST_F(HybridEnd2endTest, RawRequestStream) {
  using HybridService =
      EchoTestService::WithRawMethod_RequestStream<TestServiceImpl>;
  HybridService service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();
  std::thread request_stream_worker(HandleRawClientStreaming<HybridService>,
                                    &service, cqs_[0].get());
  TestAllMethods();
  request_stream_worker.join();
}
477 
// Echo is async (handler on cqs_[0]) and RequestStream is raw (handler on
// cqs_[1]); the remaining methods are left to the wrapped TestServiceImpl.
TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
  using AsyncEchoService =
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithRawMethod_RequestStream<AsyncEchoService>;
  HybridService service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();
  std::thread echo_worker(HandleEcho<HybridService>, &service, cqs_[0].get(),
                          false);
  std::thread request_stream_worker(HandleRawClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  request_stream_worker.join();
  echo_worker.join();
}
493 
// Echo is marked generic and handled through an AsyncGenericService on
// cqs_[0]; RequestStream is raw and handled on cqs_[1].
TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
  using GenericEchoService =
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithRawMethod_RequestStream<GenericEchoService>;
  HybridService service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();
  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread request_stream_worker(HandleRawClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  generic_worker.join();
  request_stream_worker.join();
}
510 
// Echo (handler on cqs_[0]) and RequestStream (handler on cqs_[1]) are both
// async; the remaining methods are left to the wrapped TestServiceImpl.
TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
  using AsyncEchoService =
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncEchoService>;
  HybridService service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();
  std::thread echo_worker(HandleEcho<HybridService>, &service, cqs_[0].get(),
                          false);
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  echo_worker.join();
  request_stream_worker.join();
}
526 
// ResponseStream (handler on cqs_[0]) and RequestStream (handler on cqs_[1])
// are both async; the remaining methods are left to the wrapped
// TestServiceImpl.
TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
  using AsyncResponseStream =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseStream>;
  HybridService service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<HybridService>,
                                     &service, cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  response_stream_worker.join();
  request_stream_worker.join();
}
542 
543 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncDupService)544 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamSyncDupService) {
545   typedef EchoTestService::WithAsyncMethod_RequestStream<
546       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
547       SType;
548   SType service;
549   TestServiceImplDupPkg dup_service;
550   SetUpServer(&service, &dup_service, nullptr, nullptr);
551   ResetStub();
552   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
553                                              &service, cqs_[0].get());
554   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
555                                             &service, cqs_[1].get());
556   TestAllMethods();
557   SendEchoToDupService();
558   response_stream_handler_thread.join();
559   request_stream_handler_thread.join();
560 }
561 
562 // Add a second service with one sync streamed unary method.
563 class StreamedUnaryDupPkg
564     : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
565           TestServiceImplDupPkg> {
566  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)567   Status StreamedEcho(
568       ServerContext* /*context*/,
569       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
570     EchoRequest req;
571     EchoResponse resp;
572     uint32_t next_msg_sz;
573     stream->NextMessageSize(&next_msg_sz);
574     LOG(INFO) << "Streamed Unary Next Message Size is " << next_msg_sz;
575     CHECK(stream->Read(&req));
576     resp.set_message(req.message() + "_dup");
577     CHECK(stream->Write(resp));
578     return Status::OK;
579   }
580 };
581 
// Hybrid primary service plus a duplicate-package service whose Echo is a
// sync streamed-unary method; the server is built with an 8192 max message
// size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamSyncStreamedUnaryDupService) {
  using AsyncResponseStream =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseStream>;
  HybridService service;
  StreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<HybridService>,
                                     &service, cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
600 
601 // Add a second service that is fully Streamed Unary
602 class FullyStreamedUnaryDupPkg
603     : public duplicate::EchoTestService::StreamedUnaryService {
604  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)605   Status StreamedEcho(
606       ServerContext* /*context*/,
607       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
608     EchoRequest req;
609     EchoResponse resp;
610     uint32_t next_msg_sz;
611     stream->NextMessageSize(&next_msg_sz);
612     LOG(INFO) << "Streamed Unary Next Message Size is " << next_msg_sz;
613     CHECK(stream->Read(&req));
614     resp.set_message(req.message() + "_dup");
615     CHECK(stream->Write(resp));
616     return Status::OK;
617   }
618 };
619 
// Hybrid primary service plus a fully streamed-unary duplicate-package
// service; the server is built with an 8192 max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamSyncFullyStreamedUnaryDupService) {
  using AsyncResponseStream =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseStream>;
  HybridService service;
  FullyStreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<HybridService>,
                                     &service, cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
638 
639 // Add a second service with one sync split server streaming method.
640 class SplitResponseStreamDupPkg
641     : public duplicate::EchoTestService::
642           WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
643  public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)644   Status StreamedResponseStream(
645       ServerContext* /*context*/,
646       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
647     EchoRequest req;
648     EchoResponse resp;
649     uint32_t next_msg_sz;
650     stream->NextMessageSize(&next_msg_sz);
651     LOG(INFO) << "Split Streamed Next Message Size is " << next_msg_sz;
652     CHECK(stream->Read(&req));
653     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
654       resp.set_message(req.message() + std::to_string(i) + "_dup");
655       CHECK(stream->Write(resp));
656     }
657     return Status::OK;
658   }
659 };
660 
// Hybrid primary service plus a duplicate-package service whose
// ResponseStream is a sync split-streaming method; the server is built with
// an 8192 max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamSyncSplitStreamedDupService) {
  using AsyncResponseStream =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseStream>;
  HybridService service;
  SplitResponseStreamDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<HybridService>,
                                     &service, cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
679 
680 // Add a second service that is fully split server streamed
681 class FullySplitStreamedDupPkg
682     : public duplicate::EchoTestService::SplitStreamedService {
683  public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)684   Status StreamedResponseStream(
685       ServerContext* /*context*/,
686       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
687     EchoRequest req;
688     EchoResponse resp;
689     uint32_t next_msg_sz;
690     stream->NextMessageSize(&next_msg_sz);
691     LOG(INFO) << "Split Streamed Next Message Size is " << next_msg_sz;
692     CHECK(stream->Read(&req));
693     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
694       resp.set_message(req.message() + std::to_string(i) + "_dup");
695       CHECK(stream->Write(resp));
696     }
697     return Status::OK;
698   }
699 };
700 
// Hybrid primary service plus a fully split-streamed duplicate-package
// service; the server is built with an 8192 max message size.
// NOTE(review): unlike its siblings this test name starts with a lower-case
// 'a'; it is kept unchanged so existing test filters keep matching.
TEST_F(HybridEnd2endTest,
       asyncRequestStreamResponseStreamFullySplitStreamedDupService) {
  using AsyncResponseStream =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseStream>;
  HybridService service;
  FullySplitStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<HybridService>,
                                     &service, cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
719 
720 // Add a second service that is fully server streamed
721 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
722  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)723   Status StreamedEcho(
724       ServerContext* /*context*/,
725       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
726     EchoRequest req;
727     EchoResponse resp;
728     uint32_t next_msg_sz;
729     stream->NextMessageSize(&next_msg_sz);
730     LOG(INFO) << "Streamed Unary Next Message Size is " << next_msg_sz;
731     CHECK(stream->Read(&req));
732     resp.set_message(req.message() + "_dup");
733     CHECK(stream->Write(resp));
734     return Status::OK;
735   }
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)736   Status StreamedResponseStream(
737       ServerContext* /*context*/,
738       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
739     EchoRequest req;
740     EchoResponse resp;
741     uint32_t next_msg_sz;
742     stream->NextMessageSize(&next_msg_sz);
743     LOG(INFO) << "Split Streamed Next Message Size is " << next_msg_sz;
744     CHECK(stream->Read(&req));
745     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
746       resp.set_message(req.message() + std::to_string(i) + "_dup");
747       CHECK(stream->Write(resp));
748     }
749     return Status::OK;
750   }
751 };
752 
// Hybrid primary service plus a fully streamed duplicate-package service.
// Both the unary Echo and the server-streaming call of the duplicate service
// are exercised; the server is built with an 8192 max message size.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamFullyStreamedDupService) {
  using AsyncResponseStream =
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>;
  using HybridService =
      EchoTestService::WithAsyncMethod_RequestStream<AsyncResponseStream>;
  HybridService service;
  FullyStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<HybridService>,
                                     &service, cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<HybridService>,
                                    &service, cqs_[1].get());
  TestAllMethods();
  SendEchoToDupService();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
772 
773 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamAsyncDupService)774 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamAsyncDupService) {
775   typedef EchoTestService::WithAsyncMethod_RequestStream<
776       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
777       SType;
778   SType service;
779   duplicate::EchoTestService::AsyncService dup_service;
780   SetUpServer(&service, &dup_service, nullptr, nullptr);
781   ResetStub();
782   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
783                                              &service, cqs_[0].get());
784   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
785                                             &service, cqs_[1].get());
786   std::thread echo_handler_thread(
787       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
788       cqs_[2].get(), true);
789   TestAllMethods();
790   SendEchoToDupService();
791   response_stream_handler_thread.join();
792   request_stream_handler_thread.join();
793   echo_handler_thread.join();
794 }
795 
// Echo is marked generic on the main service and is served by an
// AsyncGenericService whose handler runs on a dedicated thread.
TEST_F(HybridEnd2endTest, GenericEcho) {
  using SType = EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  TestAllMethods();
  generic_worker.join();
}
806 
// Echo is marked generic on the main service and is served by a
// CallbackGenericService whose reactor echoes the single request it reads.
// The test body is skipped when SetUpServer() reports failure.
TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
  EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;

  class GenericEchoService : public CallbackGenericService {
   private:
    ServerGenericBidiReactor* CreateReactor(
        GenericCallbackServerContext* context) override {
      EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
      VLOG(2) << "Constructor of generic service "
              << context->deadline().time_since_epoch().count();

      // Self-deleting reactor: copies the first message read into the
      // response, writes it, and finishes the call once the write completes.
      class EchoReactor : public ServerGenericBidiReactor {
       public:
        EchoReactor() { StartRead(&request_); }

       private:
        void OnDone() override { delete this; }

        void OnReadDone(bool ok) override {
          if (ok) {
            // Only the very first read is expected to succeed.
            EXPECT_EQ(reads_complete_++, 0);
            response_ = request_;
            StartWrite(&response_);
            StartRead(&request_);
            return;
          }
          // A failed read must come after exactly one successful read.
          EXPECT_EQ(reads_complete_, 1);
        }

        void OnWriteDone(bool ok) override {
          if (ok) {
            Finish(Status::OK);
          } else {
            Finish(Status(StatusCode::UNKNOWN, "Unexpected failure"));
          }
        }

        ByteBuffer request_;
        ByteBuffer response_;
        std::atomic_int reads_complete_{0};
      };
      return new EchoReactor;
    }
  } generic_service;

  const bool server_ready =
      SetUpServer(&service, nullptr, nullptr, &generic_service);
  if (!server_ready) {
    return;
  }
  ResetStub();
  TestAllMethods();
}
851 
// Echo is generic and RequestStream is async on the main service; each is
// handled on its own thread.
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<SType>, &service,
                                      cqs_[1].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
}
868 
869 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamSyncDupService)870 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamSyncDupService) {
871   typedef EchoTestService::WithAsyncMethod_RequestStream<
872       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
873       SType;
874   SType service;
875   AsyncGenericService generic_service;
876   TestServiceImplDupPkg dup_service;
877   SetUpServer(&service, &dup_service, &generic_service, nullptr);
878   ResetStub();
879   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
880                                      cqs_[0].get());
881   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
882                                             &service, cqs_[1].get());
883   TestAllMethods();
884   SendEchoToDupService();
885   generic_handler_thread.join();
886   request_stream_handler_thread.join();
887 }
888 
889 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamAsyncDupService)890 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamAsyncDupService) {
891   typedef EchoTestService::WithAsyncMethod_RequestStream<
892       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
893       SType;
894   SType service;
895   AsyncGenericService generic_service;
896   duplicate::EchoTestService::AsyncService dup_service;
897   SetUpServer(&service, &dup_service, &generic_service, nullptr);
898   ResetStub();
899   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
900                                      cqs_[0].get());
901   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
902                                             &service, cqs_[1].get());
903   std::thread echo_handler_thread(
904       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
905       cqs_[2].get(), true);
906   TestAllMethods();
907   SendEchoToDupService();
908   generic_handler_thread.join();
909   request_stream_handler_thread.join();
910   echo_handler_thread.join();
911 }
912 
// Echo is generic while RequestStream and ResponseStream are both async;
// three handler threads are used, one per entry of cqs_.
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<SType>, &service,
                                      cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<SType>, &service,
                                      cqs_[2].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
  server_streaming_worker.join();
}
933 
// Both Echo and RequestStream are generic, so two generic handler threads are
// started (on cqs_[0] and cqs_[1]); ResponseStream is async and gets its own
// thread on cqs_[2].
TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
  using SType = EchoTestService::WithGenericMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread first_generic_worker(HandleGenericCall, &generic_service,
                                   cqs_[0].get());
  std::thread second_generic_worker(HandleGenericCall, &generic_service,
                                    cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<SType>, &service,
                                      cqs_[2].get());

  TestAllMethods();

  first_generic_worker.join();
  second_generic_worker.join();
  server_streaming_worker.join();
}
954 
955 // If WithGenericMethod is called and no generic service is registered, the
956 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)957 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
958   EchoTestService::WithGenericMethod_RequestStream<
959       EchoTestService::WithGenericMethod_Echo<
960           EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
961       service;
962   SetUpServer(&service, nullptr, nullptr, nullptr);
963   EXPECT_EQ(nullptr, server_.get());
964 }
965 
966 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
967                          ::testing::Bool());
968 
969 }  // namespace
970 }  // namespace testing
971 }  // namespace grpc
972 
main(int argc,char ** argv)973 int main(int argc, char** argv) {
974   grpc::testing::TestEnvironment env(&argc, argv);
975   ::testing::InitGoogleTest(&argc, argv);
976   return RUN_ALL_TESTS();
977 }
978