1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpcpp/channel.h>
21 #include <grpcpp/client_context.h>
22 #include <grpcpp/create_channel.h>
23 #include <grpcpp/generic/async_generic_service.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27 #include <gtest/gtest.h>
28
29 #include <memory>
30 #include <thread>
31
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "src/core/lib/iomgr/iomgr.h"
35 #include "src/core/util/env.h"
36 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/test_util/port.h"
39 #include "test/core/test_util/test_config.h"
40 #include "test/cpp/end2end/test_service_impl.h"
41 #include "test/cpp/util/byte_buffer_proto_helper.h"
42
43 namespace grpc {
44 namespace testing {
45 namespace {
46
tag(int i)47 void* tag(int i) { return reinterpret_cast<void*>(i); }
48
VerifyReturnSuccess(CompletionQueue * cq,int i)49 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
50 void* got_tag;
51 bool ok;
52 EXPECT_TRUE(cq->Next(&got_tag, &ok));
53 EXPECT_EQ(tag(i), got_tag);
54 return ok;
55 }
56
Verify(CompletionQueue * cq,int i,bool expect_ok)57 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
58 EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
59 }
60
61 // Handlers to handle async request at a server. To be run in a separate thread.
62 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)63 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
64 ServerContext srv_ctx;
65 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
66 EchoRequest recv_request;
67 EchoResponse send_response;
68 service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
69 tag(1));
70 Verify(cq, 1, true);
71 send_response.set_message(recv_request.message());
72 if (dup_service) {
73 send_response.mutable_message()->append("_dup");
74 }
75 response_writer.Finish(send_response, Status::OK, tag(2));
76 Verify(cq, 2, true);
77 }
78
79 // Handlers to handle raw request at a server. To be run in a
80 // separate thread. Note that this is the same as the async version, except
81 // that the req/resp are ByteBuffers
82 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool)83 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
84 bool /*dup_service*/) {
85 ServerContext srv_ctx;
86 GenericServerAsyncResponseWriter response_writer(&srv_ctx);
87 ByteBuffer recv_buffer;
88 service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
89 tag(1));
90 Verify(cq, 1, true);
91 EchoRequest recv_request;
92 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
93 EchoResponse send_response;
94 send_response.set_message(recv_request.message());
95 auto send_buffer = SerializeToByteBuffer(&send_response);
96 response_writer.Finish(*send_buffer, Status::OK, tag(2));
97 Verify(cq, 2, true);
98 }
99
100 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)101 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
102 ServerContext srv_ctx;
103 EchoRequest recv_request;
104 EchoResponse send_response;
105 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
106 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
107 Verify(cq, 1, true);
108 int i = 1;
109 do {
110 i++;
111 send_response.mutable_message()->append(recv_request.message());
112 srv_stream.Read(&recv_request, tag(i));
113 } while (VerifyReturnSuccess(cq, i));
114 srv_stream.Finish(send_response, Status::OK, tag(100));
115 Verify(cq, 100, true);
116 }
117
118 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)119 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
120 ServerContext srv_ctx;
121 ByteBuffer recv_buffer;
122 EchoRequest recv_request;
123 EchoResponse send_response;
124 GenericServerAsyncReader srv_stream(&srv_ctx);
125 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
126 Verify(cq, 1, true);
127 int i = 1;
128 while (true) {
129 i++;
130 srv_stream.Read(&recv_buffer, tag(i));
131 if (!VerifyReturnSuccess(cq, i)) {
132 break;
133 }
134 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
135 send_response.mutable_message()->append(recv_request.message());
136 }
137 auto send_buffer = SerializeToByteBuffer(&send_response);
138 srv_stream.Finish(*send_buffer, Status::OK, tag(100));
139 Verify(cq, 100, true);
140 }
141
142 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)143 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
144 ServerContext srv_ctx;
145 EchoRequest recv_request;
146 EchoResponse send_response;
147 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
148 service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
149 tag(1));
150 Verify(cq, 1, true);
151 send_response.set_message(recv_request.message() + "0");
152 srv_stream.Write(send_response, tag(2));
153 Verify(cq, 2, true);
154 send_response.set_message(recv_request.message() + "1");
155 srv_stream.Write(send_response, tag(3));
156 Verify(cq, 3, true);
157 send_response.set_message(recv_request.message() + "2");
158 srv_stream.Write(send_response, tag(4));
159 Verify(cq, 4, true);
160 srv_stream.Finish(Status::OK, tag(5));
161 Verify(cq, 5, true);
162 }
163
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)164 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
165 CompletionQueue* cq) {
166 ByteBuffer recv_buffer;
167 stream->Read(&recv_buffer, tag(2));
168 Verify(cq, 2, true);
169 EchoRequest recv_request;
170 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
171 EchoResponse send_response;
172 send_response.set_message(recv_request.message());
173 auto send_buffer = SerializeToByteBuffer(&send_response);
174 stream->Write(*send_buffer, tag(3));
175 Verify(cq, 3, true);
176 stream->Finish(Status::OK, tag(4));
177 Verify(cq, 4, true);
178 }
179
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)180 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
181 CompletionQueue* cq) {
182 ByteBuffer recv_buffer;
183 EchoRequest recv_request;
184 EchoResponse send_response;
185 int i = 1;
186 while (true) {
187 i++;
188 stream->Read(&recv_buffer, tag(i));
189 if (!VerifyReturnSuccess(cq, i)) {
190 break;
191 }
192 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
193 send_response.mutable_message()->append(recv_request.message());
194 }
195 auto send_buffer = SerializeToByteBuffer(&send_response);
196 stream->Write(*send_buffer, tag(99));
197 Verify(cq, 99, true);
198 stream->Finish(Status::OK, tag(100));
199 Verify(cq, 100, true);
200 }
201
202 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)203 void HandleGenericCall(AsyncGenericService* service,
204 ServerCompletionQueue* cq) {
205 GenericServerContext srv_ctx;
206 GenericServerAsyncReaderWriter stream(&srv_ctx);
207 service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
208 Verify(cq, 1, true);
209 if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
210 HandleGenericEcho(&stream, cq);
211 } else if (srv_ctx.method() ==
212 "/grpc.testing.EchoTestService/RequestStream") {
213 HandleGenericRequestStream(&stream, cq);
214 } else { // other methods not handled yet.
215 LOG(ERROR) << "method: " << srv_ctx.method();
216 CHECK(0);
217 }
218 }
219
220 class TestServiceImplDupPkg
221 : public grpc::testing::duplicate::EchoTestService::Service {
222 public:
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)223 Status Echo(ServerContext* /*context*/, const EchoRequest* request,
224 EchoResponse* response) override {
225 response->set_message(request->message() + "_dup");
226 return Status::OK;
227 }
228 };
229
230 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
231 protected:
HybridEnd2endTest()232 HybridEnd2endTest() {}
233
SetUpTestSuite()234 static void SetUpTestSuite() {
235 #if TARGET_OS_IPHONE
236 // Workaround Apple CFStream bug
237 grpc_core::SetEnv("grpc_cfstream", "0");
238 #endif
239 }
240
SetUp()241 void SetUp() override {
242 inproc_ = (::testing::UnitTest::GetInstance()
243 ->current_test_info()
244 ->value_param() != nullptr)
245 ? GetParam()
246 : false;
247 }
248
SetUpServer(grpc::Service * service1,grpc::Service * service2,AsyncGenericService * generic_service,CallbackGenericService * callback_generic_service,int max_message_size=0)249 bool SetUpServer(grpc::Service* service1, grpc::Service* service2,
250 AsyncGenericService* generic_service,
251 CallbackGenericService* callback_generic_service,
252 int max_message_size = 0) {
253 int port = grpc_pick_unused_port_or_die();
254 server_address_ << "localhost:" << port;
255
256 // Setup server
257 ServerBuilder builder;
258 builder.AddListeningPort(server_address_.str(),
259 grpc::InsecureServerCredentials());
260 // Always add a sync unimplemented service: we rely on having at least one
261 // synchronous method to get a listening cq
262 builder.RegisterService(&unimplemented_service_);
263 builder.RegisterService(service1);
264 if (service2) {
265 builder.RegisterService(service2);
266 }
267 if (generic_service) {
268 builder.RegisterAsyncGenericService(generic_service);
269 }
270 if (callback_generic_service) {
271 builder.RegisterCallbackGenericService(callback_generic_service);
272 }
273
274 if (max_message_size != 0) {
275 builder.SetMaxMessageSize(max_message_size);
276 }
277
278 // Create a separate cq for each potential handler.
279 for (int i = 0; i < 5; i++) {
280 cqs_.push_back(builder.AddCompletionQueue(false));
281 }
282 server_ = builder.BuildAndStart();
283
284 // If there is a generic callback service, this setup is only successful if
285 // we have an iomgr that can run in the background or are inprocess
286 return !callback_generic_service || grpc_iomgr_run_in_background() ||
287 inproc_;
288 }
289
TearDown()290 void TearDown() override {
291 if (server_) {
292 server_->Shutdown();
293 }
294 void* ignored_tag;
295 bool ignored_ok;
296 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
297 (*it)->Shutdown();
298 while ((*it)->Next(&ignored_tag, &ignored_ok)) {
299 }
300 }
301 }
302
ResetStub()303 void ResetStub() {
304 std::shared_ptr<Channel> channel =
305 inproc_ ? server_->InProcessChannel(ChannelArguments())
306 : grpc::CreateChannel(server_address_.str(),
307 InsecureChannelCredentials());
308 stub_ = grpc::testing::EchoTestService::NewStub(channel);
309 }
310
311 // Test all rpc methods.
TestAllMethods()312 void TestAllMethods() {
313 SendEcho();
314 SendSimpleClientStreaming();
315 SendSimpleServerStreaming();
316 SendBidiStreaming();
317 }
318
SendEcho()319 void SendEcho() {
320 EchoRequest send_request;
321 EchoResponse recv_response;
322 ClientContext cli_ctx;
323 cli_ctx.set_wait_for_ready(true);
324 send_request.set_message("Hello");
325 Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
326 EXPECT_EQ(send_request.message(), recv_response.message());
327 EXPECT_TRUE(recv_status.ok());
328 }
329
SendEchoToDupService()330 void SendEchoToDupService() {
331 std::shared_ptr<Channel> channel = grpc::CreateChannel(
332 server_address_.str(), InsecureChannelCredentials());
333 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
334 EchoRequest send_request;
335 EchoResponse recv_response;
336 ClientContext cli_ctx;
337 cli_ctx.set_wait_for_ready(true);
338 send_request.set_message("Hello");
339 Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
340 EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
341 EXPECT_TRUE(recv_status.ok());
342 }
343
SendSimpleClientStreaming()344 void SendSimpleClientStreaming() {
345 EchoRequest send_request;
346 EchoResponse recv_response;
347 std::string expected_message;
348 ClientContext cli_ctx;
349 cli_ctx.set_wait_for_ready(true);
350 send_request.set_message("Hello");
351 auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
352 for (int i = 0; i < 5; i++) {
353 EXPECT_TRUE(stream->Write(send_request));
354 expected_message.append(send_request.message());
355 }
356 stream->WritesDone();
357 Status recv_status = stream->Finish();
358 EXPECT_EQ(expected_message, recv_response.message());
359 EXPECT_TRUE(recv_status.ok());
360 }
361
SendSimpleServerStreaming()362 void SendSimpleServerStreaming() {
363 EchoRequest request;
364 EchoResponse response;
365 ClientContext context;
366 context.set_wait_for_ready(true);
367 request.set_message("hello");
368
369 auto stream = stub_->ResponseStream(&context, request);
370 EXPECT_TRUE(stream->Read(&response));
371 EXPECT_EQ(response.message(), request.message() + "0");
372 EXPECT_TRUE(stream->Read(&response));
373 EXPECT_EQ(response.message(), request.message() + "1");
374 EXPECT_TRUE(stream->Read(&response));
375 EXPECT_EQ(response.message(), request.message() + "2");
376 EXPECT_FALSE(stream->Read(&response));
377
378 Status s = stream->Finish();
379 EXPECT_TRUE(s.ok());
380 }
381
SendSimpleServerStreamingToDupService()382 void SendSimpleServerStreamingToDupService() {
383 std::shared_ptr<Channel> channel = grpc::CreateChannel(
384 server_address_.str(), InsecureChannelCredentials());
385 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
386 EchoRequest request;
387 EchoResponse response;
388 ClientContext context;
389 context.set_wait_for_ready(true);
390 request.set_message("hello");
391
392 auto stream = stub->ResponseStream(&context, request);
393 EXPECT_TRUE(stream->Read(&response));
394 EXPECT_EQ(response.message(), request.message() + "0_dup");
395 EXPECT_TRUE(stream->Read(&response));
396 EXPECT_EQ(response.message(), request.message() + "1_dup");
397 EXPECT_TRUE(stream->Read(&response));
398 EXPECT_EQ(response.message(), request.message() + "2_dup");
399 EXPECT_FALSE(stream->Read(&response));
400
401 Status s = stream->Finish();
402 EXPECT_TRUE(s.ok());
403 }
404
SendBidiStreaming()405 void SendBidiStreaming() {
406 EchoRequest request;
407 EchoResponse response;
408 ClientContext context;
409 context.set_wait_for_ready(true);
410 std::string msg("hello");
411
412 auto stream = stub_->BidiStream(&context);
413
414 request.set_message(msg + "0");
415 EXPECT_TRUE(stream->Write(request));
416 EXPECT_TRUE(stream->Read(&response));
417 EXPECT_EQ(response.message(), request.message());
418
419 request.set_message(msg + "1");
420 EXPECT_TRUE(stream->Write(request));
421 EXPECT_TRUE(stream->Read(&response));
422 EXPECT_EQ(response.message(), request.message());
423
424 request.set_message(msg + "2");
425 EXPECT_TRUE(stream->Write(request));
426 EXPECT_TRUE(stream->Read(&response));
427 EXPECT_EQ(response.message(), request.message());
428
429 stream->WritesDone();
430 EXPECT_FALSE(stream->Read(&response));
431 EXPECT_FALSE(stream->Read(&response));
432
433 Status s = stream->Finish();
434 EXPECT_TRUE(s.ok());
435 }
436
437 grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
438 std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
439 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
440 std::unique_ptr<Server> server_;
441 std::ostringstream server_address_;
442 bool inproc_;
443 };
444
TEST_F(HybridEnd2endTest,AsyncEcho)445 TEST_F(HybridEnd2endTest, AsyncEcho) {
446 typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType;
447 SType service;
448 SetUpServer(&service, nullptr, nullptr, nullptr);
449 ResetStub();
450 std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
451 false);
452 TestAllMethods();
453 echo_handler_thread.join();
454 }
455
TEST_F(HybridEnd2endTest,RawEcho)456 TEST_F(HybridEnd2endTest, RawEcho) {
457 typedef EchoTestService::WithRawMethod_Echo<TestServiceImpl> SType;
458 SType service;
459 SetUpServer(&service, nullptr, nullptr, nullptr);
460 ResetStub();
461 std::thread echo_handler_thread(HandleRawEcho<SType>, &service, cqs_[0].get(),
462 false);
463 TestAllMethods();
464 echo_handler_thread.join();
465 }
466
TEST_F(HybridEnd2endTest,RawRequestStream)467 TEST_F(HybridEnd2endTest, RawRequestStream) {
468 typedef EchoTestService::WithRawMethod_RequestStream<TestServiceImpl> SType;
469 SType service;
470 SetUpServer(&service, nullptr, nullptr, nullptr);
471 ResetStub();
472 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
473 &service, cqs_[0].get());
474 TestAllMethods();
475 request_stream_handler_thread.join();
476 }
477
TEST_F(HybridEnd2endTest,AsyncEchoRawRequestStream)478 TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
479 typedef EchoTestService::WithRawMethod_RequestStream<
480 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
481 SType;
482 SType service;
483 SetUpServer(&service, nullptr, nullptr, nullptr);
484 ResetStub();
485 std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
486 false);
487 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
488 &service, cqs_[1].get());
489 TestAllMethods();
490 request_stream_handler_thread.join();
491 echo_handler_thread.join();
492 }
493
TEST_F(HybridEnd2endTest,GenericEchoRawRequestStream)494 TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
495 typedef EchoTestService::WithRawMethod_RequestStream<
496 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
497 SType;
498 SType service;
499 AsyncGenericService generic_service;
500 SetUpServer(&service, nullptr, &generic_service, nullptr);
501 ResetStub();
502 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
503 cqs_[0].get());
504 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
505 &service, cqs_[1].get());
506 TestAllMethods();
507 generic_handler_thread.join();
508 request_stream_handler_thread.join();
509 }
510
TEST_F(HybridEnd2endTest,AsyncEchoRequestStream)511 TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
512 typedef EchoTestService::WithAsyncMethod_RequestStream<
513 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
514 SType;
515 SType service;
516 SetUpServer(&service, nullptr, nullptr, nullptr);
517 ResetStub();
518 std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
519 false);
520 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
521 &service, cqs_[1].get());
522 TestAllMethods();
523 echo_handler_thread.join();
524 request_stream_handler_thread.join();
525 }
526
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream)527 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
528 typedef EchoTestService::WithAsyncMethod_RequestStream<
529 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
530 SType;
531 SType service;
532 SetUpServer(&service, nullptr, nullptr, nullptr);
533 ResetStub();
534 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
535 &service, cqs_[0].get());
536 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
537 &service, cqs_[1].get());
538 TestAllMethods();
539 response_stream_handler_thread.join();
540 request_stream_handler_thread.join();
541 }
542
543 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncDupService)544 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamSyncDupService) {
545 typedef EchoTestService::WithAsyncMethod_RequestStream<
546 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
547 SType;
548 SType service;
549 TestServiceImplDupPkg dup_service;
550 SetUpServer(&service, &dup_service, nullptr, nullptr);
551 ResetStub();
552 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
553 &service, cqs_[0].get());
554 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
555 &service, cqs_[1].get());
556 TestAllMethods();
557 SendEchoToDupService();
558 response_stream_handler_thread.join();
559 request_stream_handler_thread.join();
560 }
561
562 // Add a second service with one sync streamed unary method.
563 class StreamedUnaryDupPkg
564 : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
565 TestServiceImplDupPkg> {
566 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)567 Status StreamedEcho(
568 ServerContext* /*context*/,
569 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
570 EchoRequest req;
571 EchoResponse resp;
572 uint32_t next_msg_sz;
573 stream->NextMessageSize(&next_msg_sz);
574 LOG(INFO) << "Streamed Unary Next Message Size is " << next_msg_sz;
575 CHECK(stream->Read(&req));
576 resp.set_message(req.message() + "_dup");
577 CHECK(stream->Write(resp));
578 return Status::OK;
579 }
580 };
581
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncStreamedUnaryDupService)582 TEST_F(HybridEnd2endTest,
583 AsyncRequestStreamResponseStreamSyncStreamedUnaryDupService) {
584 typedef EchoTestService::WithAsyncMethod_RequestStream<
585 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
586 SType;
587 SType service;
588 StreamedUnaryDupPkg dup_service;
589 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
590 ResetStub();
591 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
592 &service, cqs_[0].get());
593 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
594 &service, cqs_[1].get());
595 TestAllMethods();
596 SendEchoToDupService();
597 response_stream_handler_thread.join();
598 request_stream_handler_thread.join();
599 }
600
601 // Add a second service that is fully Streamed Unary
602 class FullyStreamedUnaryDupPkg
603 : public duplicate::EchoTestService::StreamedUnaryService {
604 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)605 Status StreamedEcho(
606 ServerContext* /*context*/,
607 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
608 EchoRequest req;
609 EchoResponse resp;
610 uint32_t next_msg_sz;
611 stream->NextMessageSize(&next_msg_sz);
612 LOG(INFO) << "Streamed Unary Next Message Size is " << next_msg_sz;
613 CHECK(stream->Read(&req));
614 resp.set_message(req.message() + "_dup");
615 CHECK(stream->Write(resp));
616 return Status::OK;
617 }
618 };
619
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncFullyStreamedUnaryDupService)620 TEST_F(HybridEnd2endTest,
621 AsyncRequestStreamResponseStreamSyncFullyStreamedUnaryDupService) {
622 typedef EchoTestService::WithAsyncMethod_RequestStream<
623 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
624 SType;
625 SType service;
626 FullyStreamedUnaryDupPkg dup_service;
627 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
628 ResetStub();
629 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
630 &service, cqs_[0].get());
631 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
632 &service, cqs_[1].get());
633 TestAllMethods();
634 SendEchoToDupService();
635 response_stream_handler_thread.join();
636 request_stream_handler_thread.join();
637 }
638
639 // Add a second service with one sync split server streaming method.
640 class SplitResponseStreamDupPkg
641 : public duplicate::EchoTestService::
642 WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
643 public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)644 Status StreamedResponseStream(
645 ServerContext* /*context*/,
646 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
647 EchoRequest req;
648 EchoResponse resp;
649 uint32_t next_msg_sz;
650 stream->NextMessageSize(&next_msg_sz);
651 LOG(INFO) << "Split Streamed Next Message Size is " << next_msg_sz;
652 CHECK(stream->Read(&req));
653 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
654 resp.set_message(req.message() + std::to_string(i) + "_dup");
655 CHECK(stream->Write(resp));
656 }
657 return Status::OK;
658 }
659 };
660
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncSplitStreamedDupService)661 TEST_F(HybridEnd2endTest,
662 AsyncRequestStreamResponseStreamSyncSplitStreamedDupService) {
663 typedef EchoTestService::WithAsyncMethod_RequestStream<
664 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
665 SType;
666 SType service;
667 SplitResponseStreamDupPkg dup_service;
668 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
669 ResetStub();
670 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
671 &service, cqs_[0].get());
672 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
673 &service, cqs_[1].get());
674 TestAllMethods();
675 SendSimpleServerStreamingToDupService();
676 response_stream_handler_thread.join();
677 request_stream_handler_thread.join();
678 }
679
680 // Add a second service that is fully split server streamed
681 class FullySplitStreamedDupPkg
682 : public duplicate::EchoTestService::SplitStreamedService {
683 public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)684 Status StreamedResponseStream(
685 ServerContext* /*context*/,
686 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
687 EchoRequest req;
688 EchoResponse resp;
689 uint32_t next_msg_sz;
690 stream->NextMessageSize(&next_msg_sz);
691 LOG(INFO) << "Split Streamed Next Message Size is " << next_msg_sz;
692 CHECK(stream->Read(&req));
693 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
694 resp.set_message(req.message() + std::to_string(i) + "_dup");
695 CHECK(stream->Write(resp));
696 }
697 return Status::OK;
698 }
699 };
700
TEST_F(HybridEnd2endTest,asyncRequestStreamResponseStreamFullySplitStreamedDupService)701 TEST_F(HybridEnd2endTest,
702 asyncRequestStreamResponseStreamFullySplitStreamedDupService) {
703 typedef EchoTestService::WithAsyncMethod_RequestStream<
704 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
705 SType;
706 SType service;
707 FullySplitStreamedDupPkg dup_service;
708 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
709 ResetStub();
710 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
711 &service, cqs_[0].get());
712 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
713 &service, cqs_[1].get());
714 TestAllMethods();
715 SendSimpleServerStreamingToDupService();
716 response_stream_handler_thread.join();
717 request_stream_handler_thread.join();
718 }
719
720 // Add a second service that is fully server streamed
721 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
722 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)723 Status StreamedEcho(
724 ServerContext* /*context*/,
725 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
726 EchoRequest req;
727 EchoResponse resp;
728 uint32_t next_msg_sz;
729 stream->NextMessageSize(&next_msg_sz);
730 LOG(INFO) << "Streamed Unary Next Message Size is " << next_msg_sz;
731 CHECK(stream->Read(&req));
732 resp.set_message(req.message() + "_dup");
733 CHECK(stream->Write(resp));
734 return Status::OK;
735 }
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)736 Status StreamedResponseStream(
737 ServerContext* /*context*/,
738 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
739 EchoRequest req;
740 EchoResponse resp;
741 uint32_t next_msg_sz;
742 stream->NextMessageSize(&next_msg_sz);
743 LOG(INFO) << "Split Streamed Next Message Size is " << next_msg_sz;
744 CHECK(stream->Read(&req));
745 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
746 resp.set_message(req.message() + std::to_string(i) + "_dup");
747 CHECK(stream->Write(resp));
748 }
749 return Status::OK;
750 }
751 };
752
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamFullyStreamedDupService)753 TEST_F(HybridEnd2endTest,
754 AsyncRequestStreamResponseStreamFullyStreamedDupService) {
755 typedef EchoTestService::WithAsyncMethod_RequestStream<
756 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
757 SType;
758 SType service;
759 FullyStreamedDupPkg dup_service;
760 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
761 ResetStub();
762 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
763 &service, cqs_[0].get());
764 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
765 &service, cqs_[1].get());
766 TestAllMethods();
767 SendEchoToDupService();
768 SendSimpleServerStreamingToDupService();
769 response_stream_handler_thread.join();
770 request_stream_handler_thread.join();
771 }
772
773 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamAsyncDupService)774 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamAsyncDupService) {
775 typedef EchoTestService::WithAsyncMethod_RequestStream<
776 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
777 SType;
778 SType service;
779 duplicate::EchoTestService::AsyncService dup_service;
780 SetUpServer(&service, &dup_service, nullptr, nullptr);
781 ResetStub();
782 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
783 &service, cqs_[0].get());
784 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
785 &service, cqs_[1].get());
786 std::thread echo_handler_thread(
787 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
788 cqs_[2].get(), true);
789 TestAllMethods();
790 SendEchoToDupService();
791 response_stream_handler_thread.join();
792 request_stream_handler_thread.join();
793 echo_handler_thread.join();
794 }
795
TEST_F(HybridEnd2endTest,GenericEcho)796 TEST_F(HybridEnd2endTest, GenericEcho) {
797 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
798 AsyncGenericService generic_service;
799 SetUpServer(&service, nullptr, &generic_service, nullptr);
800 ResetStub();
801 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
802 cqs_[0].get());
803 TestAllMethods();
804 generic_handler_thread.join();
805 }
806
TEST_P(HybridEnd2endTest,CallbackGenericEcho)807 TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
808 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
809 class GenericEchoService : public CallbackGenericService {
810 private:
811 ServerGenericBidiReactor* CreateReactor(
812 GenericCallbackServerContext* context) override {
813 EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
814 VLOG(2) << "Constructor of generic service "
815 << context->deadline().time_since_epoch().count();
816
817 class Reactor : public ServerGenericBidiReactor {
818 public:
819 Reactor() { StartRead(&request_); }
820
821 private:
822 void OnDone() override { delete this; }
823 void OnReadDone(bool ok) override {
824 if (!ok) {
825 EXPECT_EQ(reads_complete_, 1);
826 } else {
827 EXPECT_EQ(reads_complete_++, 0);
828 response_ = request_;
829 StartWrite(&response_);
830 StartRead(&request_);
831 }
832 }
833 void OnWriteDone(bool ok) override {
834 Finish(ok ? Status::OK
835 : Status(StatusCode::UNKNOWN, "Unexpected failure"));
836 }
837 ByteBuffer request_;
838 ByteBuffer response_;
839 std::atomic_int reads_complete_{0};
840 };
841 return new Reactor;
842 }
843 } generic_service;
844
845 if (!SetUpServer(&service, nullptr, nullptr, &generic_service)) {
846 return;
847 }
848 ResetStub();
849 TestAllMethods();
850 }
851
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream)852 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
853 typedef EchoTestService::WithAsyncMethod_RequestStream<
854 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
855 SType;
856 SType service;
857 AsyncGenericService generic_service;
858 SetUpServer(&service, nullptr, &generic_service, nullptr);
859 ResetStub();
860 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
861 cqs_[0].get());
862 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
863 &service, cqs_[1].get());
864 TestAllMethods();
865 generic_handler_thread.join();
866 request_stream_handler_thread.join();
867 }
868
869 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamSyncDupService)870 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamSyncDupService) {
871 typedef EchoTestService::WithAsyncMethod_RequestStream<
872 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
873 SType;
874 SType service;
875 AsyncGenericService generic_service;
876 TestServiceImplDupPkg dup_service;
877 SetUpServer(&service, &dup_service, &generic_service, nullptr);
878 ResetStub();
879 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
880 cqs_[0].get());
881 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
882 &service, cqs_[1].get());
883 TestAllMethods();
884 SendEchoToDupService();
885 generic_handler_thread.join();
886 request_stream_handler_thread.join();
887 }
888
889 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamAsyncDupService)890 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamAsyncDupService) {
891 typedef EchoTestService::WithAsyncMethod_RequestStream<
892 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
893 SType;
894 SType service;
895 AsyncGenericService generic_service;
896 duplicate::EchoTestService::AsyncService dup_service;
897 SetUpServer(&service, &dup_service, &generic_service, nullptr);
898 ResetStub();
899 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
900 cqs_[0].get());
901 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
902 &service, cqs_[1].get());
903 std::thread echo_handler_thread(
904 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
905 cqs_[2].get(), true);
906 TestAllMethods();
907 SendEchoToDupService();
908 generic_handler_thread.join();
909 request_stream_handler_thread.join();
910 echo_handler_thread.join();
911 }
912
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamResponseStream)913 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
914 typedef EchoTestService::WithAsyncMethod_RequestStream<
915 EchoTestService::WithGenericMethod_Echo<
916 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
917 SType;
918 SType service;
919 AsyncGenericService generic_service;
920 SetUpServer(&service, nullptr, &generic_service, nullptr);
921 ResetStub();
922 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
923 cqs_[0].get());
924 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
925 &service, cqs_[1].get());
926 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
927 &service, cqs_[2].get());
928 TestAllMethods();
929 generic_handler_thread.join();
930 request_stream_handler_thread.join();
931 response_stream_handler_thread.join();
932 }
933
TEST_F(HybridEnd2endTest,GenericEchoRequestStreamAsyncResponseStream)934 TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
935 typedef EchoTestService::WithGenericMethod_RequestStream<
936 EchoTestService::WithGenericMethod_Echo<
937 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
938 SType;
939 SType service;
940 AsyncGenericService generic_service;
941 SetUpServer(&service, nullptr, &generic_service, nullptr);
942 ResetStub();
943 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
944 cqs_[0].get());
945 std::thread generic_handler_thread2(HandleGenericCall, &generic_service,
946 cqs_[1].get());
947 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
948 &service, cqs_[2].get());
949 TestAllMethods();
950 generic_handler_thread.join();
951 generic_handler_thread2.join();
952 response_stream_handler_thread.join();
953 }
954
955 // If WithGenericMethod is called and no generic service is registered, the
956 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)957 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
958 EchoTestService::WithGenericMethod_RequestStream<
959 EchoTestService::WithGenericMethod_Echo<
960 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
961 service;
962 SetUpServer(&service, nullptr, nullptr, nullptr);
963 EXPECT_EQ(nullptr, server_.get());
964 }
965
966 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
967 ::testing::Bool());
968
969 } // namespace
970 } // namespace testing
971 } // namespace grpc
972
main(int argc,char ** argv)973 int main(int argc, char** argv) {
974 grpc::testing::TestEnvironment env(&argc, argv);
975 ::testing::InitGoogleTest(&argc, argv);
976 return RUN_ALL_TESTS();
977 }
978