1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/async_generic_service.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30
31 #include "src/core/lib/gpr/env.h"
32 #include "src/core/lib/iomgr/iomgr.h"
33 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
34 #include "src/proto/grpc/testing/echo.grpc.pb.h"
35 #include "test/core/util/port.h"
36 #include "test/core/util/test_config.h"
37 #include "test/cpp/end2end/test_service_impl.h"
38 #include "test/cpp/util/byte_buffer_proto_helper.h"
39
40 #include <gtest/gtest.h>
41
42 namespace grpc {
43 namespace testing {
44 namespace {
45
46 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
47 using ::grpc::experimental::CallbackGenericService;
48 using ::grpc::experimental::GenericCallbackServerContext;
49 using ::grpc::experimental::ServerGenericBidiReactor;
50 #endif
51
// Turns a small integer into an opaque completion-queue tag. The value is
// widened to intptr_t first so the integer-to-pointer conversion happens at
// pointer width, and a named cast is used instead of a C-style cast so the
// conversion is explicit and greppable.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
53
VerifyReturnSuccess(CompletionQueue * cq,int i)54 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
55 void* got_tag;
56 bool ok;
57 EXPECT_TRUE(cq->Next(&got_tag, &ok));
58 EXPECT_EQ(tag(i), got_tag);
59 return ok;
60 }
61
Verify(CompletionQueue * cq,int i,bool expect_ok)62 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
63 EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
64 }
65
66 // Handlers to handle async request at a server. To be run in a separate thread.
67 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)68 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
69 ServerContext srv_ctx;
70 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
71 EchoRequest recv_request;
72 EchoResponse send_response;
73 service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
74 tag(1));
75 Verify(cq, 1, true);
76 send_response.set_message(recv_request.message());
77 if (dup_service) {
78 send_response.mutable_message()->append("_dup");
79 }
80 response_writer.Finish(send_response, Status::OK, tag(2));
81 Verify(cq, 2, true);
82 }
83
84 // Handlers to handle raw request at a server. To be run in a
85 // separate thread. Note that this is the same as the async version, except
86 // that the req/resp are ByteBuffers
87 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool)88 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
89 bool /*dup_service*/) {
90 ServerContext srv_ctx;
91 GenericServerAsyncResponseWriter response_writer(&srv_ctx);
92 ByteBuffer recv_buffer;
93 service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
94 tag(1));
95 Verify(cq, 1, true);
96 EchoRequest recv_request;
97 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
98 EchoResponse send_response;
99 send_response.set_message(recv_request.message());
100 auto send_buffer = SerializeToByteBuffer(&send_response);
101 response_writer.Finish(*send_buffer, Status::OK, tag(2));
102 Verify(cq, 2, true);
103 }
104
105 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)106 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
107 ServerContext srv_ctx;
108 EchoRequest recv_request;
109 EchoResponse send_response;
110 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
111 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
112 Verify(cq, 1, true);
113 int i = 1;
114 do {
115 i++;
116 send_response.mutable_message()->append(recv_request.message());
117 srv_stream.Read(&recv_request, tag(i));
118 } while (VerifyReturnSuccess(cq, i));
119 srv_stream.Finish(send_response, Status::OK, tag(100));
120 Verify(cq, 100, true);
121 }
122
123 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)124 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
125 ServerContext srv_ctx;
126 ByteBuffer recv_buffer;
127 EchoRequest recv_request;
128 EchoResponse send_response;
129 GenericServerAsyncReader srv_stream(&srv_ctx);
130 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
131 Verify(cq, 1, true);
132 int i = 1;
133 while (true) {
134 i++;
135 srv_stream.Read(&recv_buffer, tag(i));
136 if (!VerifyReturnSuccess(cq, i)) {
137 break;
138 }
139 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
140 send_response.mutable_message()->append(recv_request.message());
141 }
142 auto send_buffer = SerializeToByteBuffer(&send_response);
143 srv_stream.Finish(*send_buffer, Status::OK, tag(100));
144 Verify(cq, 100, true);
145 }
146
147 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)148 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
149 ServerContext srv_ctx;
150 EchoRequest recv_request;
151 EchoResponse send_response;
152 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
153 service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
154 tag(1));
155 Verify(cq, 1, true);
156 send_response.set_message(recv_request.message() + "0");
157 srv_stream.Write(send_response, tag(2));
158 Verify(cq, 2, true);
159 send_response.set_message(recv_request.message() + "1");
160 srv_stream.Write(send_response, tag(3));
161 Verify(cq, 3, true);
162 send_response.set_message(recv_request.message() + "2");
163 srv_stream.Write(send_response, tag(4));
164 Verify(cq, 4, true);
165 srv_stream.Finish(Status::OK, tag(5));
166 Verify(cq, 5, true);
167 }
168
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)169 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
170 CompletionQueue* cq) {
171 ByteBuffer recv_buffer;
172 stream->Read(&recv_buffer, tag(2));
173 Verify(cq, 2, true);
174 EchoRequest recv_request;
175 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
176 EchoResponse send_response;
177 send_response.set_message(recv_request.message());
178 auto send_buffer = SerializeToByteBuffer(&send_response);
179 stream->Write(*send_buffer, tag(3));
180 Verify(cq, 3, true);
181 stream->Finish(Status::OK, tag(4));
182 Verify(cq, 4, true);
183 }
184
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)185 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
186 CompletionQueue* cq) {
187 ByteBuffer recv_buffer;
188 EchoRequest recv_request;
189 EchoResponse send_response;
190 int i = 1;
191 while (true) {
192 i++;
193 stream->Read(&recv_buffer, tag(i));
194 if (!VerifyReturnSuccess(cq, i)) {
195 break;
196 }
197 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
198 send_response.mutable_message()->append(recv_request.message());
199 }
200 auto send_buffer = SerializeToByteBuffer(&send_response);
201 stream->Write(*send_buffer, tag(99));
202 Verify(cq, 99, true);
203 stream->Finish(Status::OK, tag(100));
204 Verify(cq, 100, true);
205 }
206
207 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)208 void HandleGenericCall(AsyncGenericService* service,
209 ServerCompletionQueue* cq) {
210 GenericServerContext srv_ctx;
211 GenericServerAsyncReaderWriter stream(&srv_ctx);
212 service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
213 Verify(cq, 1, true);
214 if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
215 HandleGenericEcho(&stream, cq);
216 } else if (srv_ctx.method() ==
217 "/grpc.testing.EchoTestService/RequestStream") {
218 HandleGenericRequestStream(&stream, cq);
219 } else { // other methods not handled yet.
220 gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
221 GPR_ASSERT(0);
222 }
223 }
224
225 class TestServiceImplDupPkg
226 : public ::grpc::testing::duplicate::EchoTestService::Service {
227 public:
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)228 Status Echo(ServerContext* /*context*/, const EchoRequest* request,
229 EchoResponse* response) override {
230 response->set_message(request->message() + "_dup");
231 return Status::OK;
232 }
233 };
234
235 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
236 protected:
HybridEnd2endTest()237 HybridEnd2endTest() {}
238
SetUpTestCase()239 static void SetUpTestCase() {
240 #if TARGET_OS_IPHONE
241 // Workaround Apple CFStream bug
242 gpr_setenv("grpc_cfstream", "0");
243 #endif
244 }
245
SetUp()246 void SetUp() override {
247 inproc_ = (::testing::UnitTest::GetInstance()
248 ->current_test_info()
249 ->value_param() != nullptr)
250 ? GetParam()
251 : false;
252 }
253
SetUpServer(::grpc::Service * service1,::grpc::Service * service2,AsyncGenericService * generic_service,CallbackGenericService * callback_generic_service,int max_message_size=0)254 bool SetUpServer(::grpc::Service* service1, ::grpc::Service* service2,
255 AsyncGenericService* generic_service,
256 CallbackGenericService* callback_generic_service,
257 int max_message_size = 0) {
258 int port = grpc_pick_unused_port_or_die();
259 server_address_ << "localhost:" << port;
260
261 // Setup server
262 ServerBuilder builder;
263 builder.AddListeningPort(server_address_.str(),
264 grpc::InsecureServerCredentials());
265 // Always add a sync unimplemented service: we rely on having at least one
266 // synchronous method to get a listening cq
267 builder.RegisterService(&unimplemented_service_);
268 builder.RegisterService(service1);
269 if (service2) {
270 builder.RegisterService(service2);
271 }
272 if (generic_service) {
273 builder.RegisterAsyncGenericService(generic_service);
274 }
275 if (callback_generic_service) {
276 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
277 builder.RegisterCallbackGenericService(callback_generic_service);
278 #else
279 builder.experimental().RegisterCallbackGenericService(
280 callback_generic_service);
281 #endif
282 }
283
284 if (max_message_size != 0) {
285 builder.SetMaxMessageSize(max_message_size);
286 }
287
288 // Create a separate cq for each potential handler.
289 for (int i = 0; i < 5; i++) {
290 cqs_.push_back(builder.AddCompletionQueue(false));
291 }
292 server_ = builder.BuildAndStart();
293
294 // If there is a generic callback service, this setup is only successful if
295 // we have an iomgr that can run in the background or are inprocess
296 return !callback_generic_service || grpc_iomgr_run_in_background() ||
297 inproc_;
298 }
299
TearDown()300 void TearDown() override {
301 if (server_) {
302 server_->Shutdown();
303 }
304 void* ignored_tag;
305 bool ignored_ok;
306 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
307 (*it)->Shutdown();
308 while ((*it)->Next(&ignored_tag, &ignored_ok))
309 ;
310 }
311 }
312
ResetStub()313 void ResetStub() {
314 std::shared_ptr<Channel> channel =
315 inproc_ ? server_->InProcessChannel(ChannelArguments())
316 : grpc::CreateChannel(server_address_.str(),
317 InsecureChannelCredentials());
318 stub_ = grpc::testing::EchoTestService::NewStub(channel);
319 }
320
321 // Test all rpc methods.
TestAllMethods()322 void TestAllMethods() {
323 SendEcho();
324 SendSimpleClientStreaming();
325 SendSimpleServerStreaming();
326 SendBidiStreaming();
327 }
328
SendEcho()329 void SendEcho() {
330 EchoRequest send_request;
331 EchoResponse recv_response;
332 ClientContext cli_ctx;
333 cli_ctx.set_wait_for_ready(true);
334 send_request.set_message("Hello");
335 Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
336 EXPECT_EQ(send_request.message(), recv_response.message());
337 EXPECT_TRUE(recv_status.ok());
338 }
339
SendEchoToDupService()340 void SendEchoToDupService() {
341 std::shared_ptr<Channel> channel = grpc::CreateChannel(
342 server_address_.str(), InsecureChannelCredentials());
343 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
344 EchoRequest send_request;
345 EchoResponse recv_response;
346 ClientContext cli_ctx;
347 cli_ctx.set_wait_for_ready(true);
348 send_request.set_message("Hello");
349 Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
350 EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
351 EXPECT_TRUE(recv_status.ok());
352 }
353
SendSimpleClientStreaming()354 void SendSimpleClientStreaming() {
355 EchoRequest send_request;
356 EchoResponse recv_response;
357 std::string expected_message;
358 ClientContext cli_ctx;
359 cli_ctx.set_wait_for_ready(true);
360 send_request.set_message("Hello");
361 auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
362 for (int i = 0; i < 5; i++) {
363 EXPECT_TRUE(stream->Write(send_request));
364 expected_message.append(send_request.message());
365 }
366 stream->WritesDone();
367 Status recv_status = stream->Finish();
368 EXPECT_EQ(expected_message, recv_response.message());
369 EXPECT_TRUE(recv_status.ok());
370 }
371
SendSimpleServerStreaming()372 void SendSimpleServerStreaming() {
373 EchoRequest request;
374 EchoResponse response;
375 ClientContext context;
376 context.set_wait_for_ready(true);
377 request.set_message("hello");
378
379 auto stream = stub_->ResponseStream(&context, request);
380 EXPECT_TRUE(stream->Read(&response));
381 EXPECT_EQ(response.message(), request.message() + "0");
382 EXPECT_TRUE(stream->Read(&response));
383 EXPECT_EQ(response.message(), request.message() + "1");
384 EXPECT_TRUE(stream->Read(&response));
385 EXPECT_EQ(response.message(), request.message() + "2");
386 EXPECT_FALSE(stream->Read(&response));
387
388 Status s = stream->Finish();
389 EXPECT_TRUE(s.ok());
390 }
391
SendSimpleServerStreamingToDupService()392 void SendSimpleServerStreamingToDupService() {
393 std::shared_ptr<Channel> channel = grpc::CreateChannel(
394 server_address_.str(), InsecureChannelCredentials());
395 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
396 EchoRequest request;
397 EchoResponse response;
398 ClientContext context;
399 context.set_wait_for_ready(true);
400 request.set_message("hello");
401
402 auto stream = stub->ResponseStream(&context, request);
403 EXPECT_TRUE(stream->Read(&response));
404 EXPECT_EQ(response.message(), request.message() + "0_dup");
405 EXPECT_TRUE(stream->Read(&response));
406 EXPECT_EQ(response.message(), request.message() + "1_dup");
407 EXPECT_TRUE(stream->Read(&response));
408 EXPECT_EQ(response.message(), request.message() + "2_dup");
409 EXPECT_FALSE(stream->Read(&response));
410
411 Status s = stream->Finish();
412 EXPECT_TRUE(s.ok());
413 }
414
SendBidiStreaming()415 void SendBidiStreaming() {
416 EchoRequest request;
417 EchoResponse response;
418 ClientContext context;
419 context.set_wait_for_ready(true);
420 std::string msg("hello");
421
422 auto stream = stub_->BidiStream(&context);
423
424 request.set_message(msg + "0");
425 EXPECT_TRUE(stream->Write(request));
426 EXPECT_TRUE(stream->Read(&response));
427 EXPECT_EQ(response.message(), request.message());
428
429 request.set_message(msg + "1");
430 EXPECT_TRUE(stream->Write(request));
431 EXPECT_TRUE(stream->Read(&response));
432 EXPECT_EQ(response.message(), request.message());
433
434 request.set_message(msg + "2");
435 EXPECT_TRUE(stream->Write(request));
436 EXPECT_TRUE(stream->Read(&response));
437 EXPECT_EQ(response.message(), request.message());
438
439 stream->WritesDone();
440 EXPECT_FALSE(stream->Read(&response));
441 EXPECT_FALSE(stream->Read(&response));
442
443 Status s = stream->Finish();
444 EXPECT_TRUE(s.ok());
445 }
446
447 grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
448 std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
449 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
450 std::unique_ptr<Server> server_;
451 std::ostringstream server_address_;
452 bool inproc_;
453 };
454
// Echo is marked async and served by a handler thread on the first completion
// queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, AsyncEcho) {
  using SType = EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);

  TestAllMethods();
  echo_worker.join();
}
465
// Echo is marked raw and served by a ByteBuffer-based handler thread on the
// first completion queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, RawEcho) {
  using SType = EchoTestService::WithRawMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  std::thread raw_echo_worker(HandleRawEcho<SType>, &service, echo_cq, false);

  TestAllMethods();
  raw_echo_worker.join();
}
476
// RequestStream is marked raw and served by a ByteBuffer-based handler thread
// on the first completion queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, RawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<TestServiceImpl>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const stream_cq = cqs_[0].get();
  std::thread raw_stream_worker(HandleRawClientStreaming<SType>, &service,
                                stream_cq);

  TestAllMethods();
  raw_stream_worker.join();
}
487
// Echo is async and RequestStream is raw; each gets its own handler thread and
// completion queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  ServerCompletionQueue* const stream_cq = cqs_[1].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);
  std::thread raw_stream_worker(HandleRawClientStreaming<SType>, &service,
                                stream_cq);

  TestAllMethods();
  raw_stream_worker.join();
  echo_worker.join();
}
503
// Echo is routed to the async generic service and RequestStream is raw; each
// gets its own handler thread and completion queue.
TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  ServerCompletionQueue* const stream_cq = cqs_[1].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);
  std::thread raw_stream_worker(HandleRawClientStreaming<SType>, &service,
                                stream_cq);

  TestAllMethods();
  generic_worker.join();
  raw_stream_worker.join();
}
520
// Echo and RequestStream are both async; each gets its own handler thread and
// completion queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const echo_cq = cqs_[0].get();
  ServerCompletionQueue* const stream_cq = cqs_[1].get();
  std::thread echo_worker(HandleEcho<SType>, &service, echo_cq, false);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    stream_cq);

  TestAllMethods();
  echo_worker.join();
  request_stream_worker.join();
}
536
// RequestStream and ResponseStream are both async; each gets its own handler
// thread and completion queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, nullptr, nullptr, nullptr);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_cq);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_cq);

  TestAllMethods();
  response_stream_worker.join();
  request_stream_worker.join();
}
552
553 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_SyncDupService)554 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
555 typedef EchoTestService::WithAsyncMethod_RequestStream<
556 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
557 SType;
558 SType service;
559 TestServiceImplDupPkg dup_service;
560 SetUpServer(&service, &dup_service, nullptr, nullptr);
561 ResetStub();
562 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
563 &service, cqs_[0].get());
564 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
565 &service, cqs_[1].get());
566 TestAllMethods();
567 SendEchoToDupService();
568 response_stream_handler_thread.join();
569 request_stream_handler_thread.join();
570 }
571
572 // Add a second service with one sync streamed unary method.
573 class StreamedUnaryDupPkg
574 : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
575 TestServiceImplDupPkg> {
576 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)577 Status StreamedEcho(
578 ServerContext* /*context*/,
579 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
580 EchoRequest req;
581 EchoResponse resp;
582 uint32_t next_msg_sz;
583 stream->NextMessageSize(&next_msg_sz);
584 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
585 GPR_ASSERT(stream->Read(&req));
586 resp.set_message(req.message() + "_dup");
587 GPR_ASSERT(stream->Write(resp));
588 return Status::OK;
589 }
590 };
591
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service whose Echo is a sync streamed-unary method. The server is built with
// a maximum message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  StreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_cq);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_cq);

  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
610
611 // Add a second service that is fully Streamed Unary
612 class FullyStreamedUnaryDupPkg
613 : public duplicate::EchoTestService::StreamedUnaryService {
614 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)615 Status StreamedEcho(
616 ServerContext* /*context*/,
617 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
618 EchoRequest req;
619 EchoResponse resp;
620 uint32_t next_msg_sz;
621 stream->NextMessageSize(&next_msg_sz);
622 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
623 GPR_ASSERT(stream->Read(&req));
624 resp.set_message(req.message() + "_dup");
625 GPR_ASSERT(stream->Write(resp));
626 return Status::OK;
627 }
628 };
629
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service built on the fully streamed-unary base class. The server is built
// with a maximum message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullyStreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_cq);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_cq);

  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
648
649 // Add a second service with one sync split server streaming method.
650 class SplitResponseStreamDupPkg
651 : public duplicate::EchoTestService::
652 WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
653 public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)654 Status StreamedResponseStream(
655 ServerContext* /*context*/,
656 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
657 EchoRequest req;
658 EchoResponse resp;
659 uint32_t next_msg_sz;
660 stream->NextMessageSize(&next_msg_sz);
661 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
662 GPR_ASSERT(stream->Read(&req));
663 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
664 resp.set_message(req.message() + std::to_string(i) + "_dup");
665 GPR_ASSERT(stream->Write(resp));
666 }
667 return Status::OK;
668 }
669 };
670
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service whose ResponseStream is a sync split-streaming method. The server is
// built with a maximum message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  SplitResponseStreamDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_cq);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_cq);

  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
689
690 // Add a second service that is fully split server streamed
691 class FullySplitStreamedDupPkg
692 : public duplicate::EchoTestService::SplitStreamedService {
693 public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)694 Status StreamedResponseStream(
695 ServerContext* /*context*/,
696 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
697 EchoRequest req;
698 EchoResponse resp;
699 uint32_t next_msg_sz;
700 stream->NextMessageSize(&next_msg_sz);
701 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
702 GPR_ASSERT(stream->Read(&req));
703 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
704 resp.set_message(req.message() + std::to_string(i) + "_dup");
705 GPR_ASSERT(stream->Write(resp));
706 }
707 return Status::OK;
708 }
709 };
710
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service built on the fully split-streamed base class. The server is built
// with a maximum message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_FullySplitStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullySplitStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_cq);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_cq);

  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
729
730 // Add a second service that is fully server streamed
731 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
732 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)733 Status StreamedEcho(
734 ServerContext* /*context*/,
735 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
736 EchoRequest req;
737 EchoResponse resp;
738 uint32_t next_msg_sz;
739 stream->NextMessageSize(&next_msg_sz);
740 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
741 GPR_ASSERT(stream->Read(&req));
742 resp.set_message(req.message() + "_dup");
743 GPR_ASSERT(stream->Write(resp));
744 return Status::OK;
745 }
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)746 Status StreamedResponseStream(
747 ServerContext* /*context*/,
748 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
749 EchoRequest req;
750 EchoResponse resp;
751 uint32_t next_msg_sz;
752 stream->NextMessageSize(&next_msg_sz);
753 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
754 GPR_ASSERT(stream->Read(&req));
755 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
756 resp.set_message(req.message() + std::to_string(i) + "_dup");
757 GPR_ASSERT(stream->Write(resp));
758 }
759 return Status::OK;
760 }
761 };
762
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service built on the fully streamed base class; both its Echo and its
// ResponseStream are exercised. The server is built with a maximum message
// size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStream_FullyStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullyStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  ServerCompletionQueue* const response_cq = cqs_[0].get();
  ServerCompletionQueue* const request_cq = cqs_[1].get();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     response_cq);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    request_cq);

  TestAllMethods();
  SendEchoToDupService();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
782
783 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream_AsyncDupService)784 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
785 typedef EchoTestService::WithAsyncMethod_RequestStream<
786 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
787 SType;
788 SType service;
789 duplicate::EchoTestService::AsyncService dup_service;
790 SetUpServer(&service, &dup_service, nullptr, nullptr);
791 ResetStub();
792 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
793 &service, cqs_[0].get());
794 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
795 &service, cqs_[1].get());
796 std::thread echo_handler_thread(
797 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
798 cqs_[2].get(), true);
799 TestAllMethods();
800 SendEchoToDupService();
801 response_stream_handler_thread.join();
802 request_stream_handler_thread.join();
803 echo_handler_thread.join();
804 }
805
// Echo is routed to the async generic service and served by a handler thread
// on the first completion queue while the client exercises every RPC.
TEST_F(HybridEnd2endTest, GenericEcho) {
  using SType = EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  ServerCompletionQueue* const generic_cq = cqs_[0].get();
  std::thread generic_worker(HandleGenericCall, &generic_service, generic_cq);

  TestAllMethods();
  generic_worker.join();
}
816
// Echo is routed to a callback-API generic service whose reactor copies the
// request bytes straight back as the response. The test returns early when
// SetUpServer() reports that this configuration cannot be served.
TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
  EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;

  class GenericEchoService : public CallbackGenericService {
   private:
    ServerGenericBidiReactor* CreateReactor(
        GenericCallbackServerContext* context) override {
      EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
      const int deadline_ticks =
          static_cast<int>(context->deadline().time_since_epoch().count());
      gpr_log(GPR_DEBUG, "Constructor of generic service %d", deadline_ticks);

      class Reactor : public ServerGenericBidiReactor {
       public:
        // The first read is started as soon as the reactor exists.
        Reactor() { StartRead(&request_); }

       private:
        // The reactor is heap-allocated by CreateReactor() and frees itself
        // here.
        void OnDone() override { delete this; }

        void OnReadDone(bool ok) override {
          if (!ok) {
            // A failed read is only expected after the one successful read.
            EXPECT_EQ(reads_complete_, 1);
            return;
          }
          EXPECT_EQ(reads_complete_++, 0);
          response_ = request_;
          StartWrite(&response_);
          StartRead(&request_);
        }

        // Finishes the call once the write completes, with OK on success and
        // UNKNOWN otherwise.
        void OnWriteDone(bool ok) override {
          if (ok) {
            Finish(Status::OK);
          } else {
            Finish(Status(StatusCode::UNKNOWN, "Unexpected failure"));
          }
        }

        ByteBuffer request_;
        ByteBuffer response_;
        std::atomic_int reads_complete_{0};
      };
      return new Reactor;
    }
  } generic_service;

  const bool server_ready =
      SetUpServer(&service, nullptr, nullptr, &generic_service);
  if (!server_ready) {
    return;
  }
  ResetStub();
  TestAllMethods();
}
861
// Echo is generic and RequestStream is async on the primary service; each is
// handled on its own thread.
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
  using ServiceType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;

  ServiceType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<ServiceType>,
                                      &service, cqs_[1].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
}
878
879 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_SyncDupService)880 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
881 typedef EchoTestService::WithAsyncMethod_RequestStream<
882 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
883 SType;
884 SType service;
885 AsyncGenericService generic_service;
886 TestServiceImplDupPkg dup_service;
887 SetUpServer(&service, &dup_service, &generic_service, nullptr);
888 ResetStub();
889 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
890 cqs_[0].get());
891 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
892 &service, cqs_[1].get());
893 TestAllMethods();
894 SendEchoToDupService();
895 generic_handler_thread.join();
896 request_stream_handler_thread.join();
897 }
898
899 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream_AsyncDupService)900 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
901 typedef EchoTestService::WithAsyncMethod_RequestStream<
902 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
903 SType;
904 SType service;
905 AsyncGenericService generic_service;
906 duplicate::EchoTestService::AsyncService dup_service;
907 SetUpServer(&service, &dup_service, &generic_service, nullptr);
908 ResetStub();
909 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
910 cqs_[0].get());
911 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
912 &service, cqs_[1].get());
913 std::thread echo_handler_thread(
914 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
915 cqs_[2].get(), true);
916 TestAllMethods();
917 SendEchoToDupService();
918 generic_handler_thread.join();
919 request_stream_handler_thread.join();
920 echo_handler_thread.join();
921 }
922
// Echo is generic while RequestStream and ResponseStream are both async on
// the primary service.
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
  using ServiceType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;

  ServiceType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<ServiceType>,
                                      &service, cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<ServiceType>,
                                      &service, cqs_[2].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
  server_streaming_worker.join();
}
943
// Echo and RequestStream are both generic; ResponseStream is async.
TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
  using ServiceType = EchoTestService::WithGenericMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;

  ServiceType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  // Two generic handler threads share the same generic service but use
  // different entries of cqs_ (presumably one per generic method).
  std::thread first_generic_worker(HandleGenericCall, &generic_service,
                                   cqs_[0].get());
  std::thread second_generic_worker(HandleGenericCall, &generic_service,
                                    cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<ServiceType>,
                                      &service, cqs_[2].get());

  TestAllMethods();

  first_generic_worker.join();
  second_generic_worker.join();
  server_streaming_worker.join();
}
964
965 // If WithGenericMethod is called and no generic service is registered, the
966 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)967 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
968 EchoTestService::WithGenericMethod_RequestStream<
969 EchoTestService::WithGenericMethod_Echo<
970 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
971 service;
972 SetUpServer(&service, nullptr, nullptr, nullptr);
973 EXPECT_EQ(nullptr, server_.get());
974 }
975
976 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
977 ::testing::Bool());
978
979 } // namespace
980 } // namespace testing
981 } // namespace grpc
982
main(int argc,char ** argv)983 int main(int argc, char** argv) {
984 grpc::testing::TestEnvironment env(argc, argv);
985 ::testing::InitGoogleTest(&argc, argv);
986 return RUN_ALL_TESTS();
987 }
988