1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/time.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/async_generic_service.h>
27 #include <grpcpp/generic/generic_stub.h>
28 #include <grpcpp/impl/codegen/proto_utils.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32 #include <grpcpp/support/slice.h>
33
34 #include "src/proto/grpc/testing/echo.grpc.pb.h"
35 #include "test/core/util/port.h"
36 #include "test/core/util/test_config.h"
37 #include "test/cpp/util/byte_buffer_proto_helper.h"
38
39 #include <gtest/gtest.h>
40
41 using grpc::testing::EchoRequest;
42 using grpc::testing::EchoResponse;
43 using std::chrono::system_clock;
44
45 namespace grpc {
46 namespace testing {
47 namespace {
48
// Turns a small integer into an opaque pointer value that can be handed to a
// completion queue as an event tag. The integer is first converted to
// intptr_t so that the pointer conversion operates on a pointer-sized
// integer; a named cast is used instead of a C-style cast so the
// reinterpretation is explicit and greppable.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
50
verify_ok(CompletionQueue * cq,int i,bool expect_ok)51 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
52 bool ok;
53 void* got_tag;
54 EXPECT_TRUE(cq->Next(&got_tag, &ok));
55 EXPECT_EQ(expect_ok, ok);
56 EXPECT_EQ(tag(i), got_tag);
57 }
58
59 class GenericEnd2endTest : public ::testing::Test {
60 protected:
GenericEnd2endTest()61 GenericEnd2endTest() : server_host_("localhost") {}
62
SetUp()63 void SetUp() override {
64 int port = grpc_pick_unused_port_or_die();
65 server_address_ << server_host_ << ":" << port;
66 // Setup server
67 ServerBuilder builder;
68 builder.AddListeningPort(server_address_.str(),
69 InsecureServerCredentials());
70 builder.RegisterAsyncGenericService(&generic_service_);
71 // Include a second call to RegisterAsyncGenericService to make sure that
72 // we get an error in the log, since it is not allowed to have 2 async
73 // generic services
74 builder.RegisterAsyncGenericService(&generic_service_);
75 srv_cq_ = builder.AddCompletionQueue();
76 server_ = builder.BuildAndStart();
77 }
78
TearDown()79 void TearDown() override {
80 server_->Shutdown();
81 void* ignored_tag;
82 bool ignored_ok;
83 cli_cq_.Shutdown();
84 srv_cq_->Shutdown();
85 while (cli_cq_.Next(&ignored_tag, &ignored_ok))
86 ;
87 while (srv_cq_->Next(&ignored_tag, &ignored_ok))
88 ;
89 }
90
ResetStub()91 void ResetStub() {
92 std::shared_ptr<Channel> channel =
93 CreateChannel(server_address_.str(), InsecureChannelCredentials());
94 generic_stub_.reset(new GenericStub(channel));
95 }
96
server_ok(int i)97 void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
client_ok(int i)98 void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
server_fail(int i)99 void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
client_fail(int i)100 void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
101
SendRpc(int num_rpcs)102 void SendRpc(int num_rpcs) {
103 SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
104 }
105
SendRpc(int num_rpcs,bool check_deadline,gpr_timespec deadline)106 void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
107 const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
108 for (int i = 0; i < num_rpcs; i++) {
109 EchoRequest send_request;
110 EchoRequest recv_request;
111 EchoResponse send_response;
112 EchoResponse recv_response;
113 Status recv_status;
114
115 ClientContext cli_ctx;
116 GenericServerContext srv_ctx;
117 GenericServerAsyncReaderWriter stream(&srv_ctx);
118
119 // The string needs to be long enough to test heap-based slice.
120 send_request.set_message("Hello world. Hello world. Hello world.");
121
122 if (check_deadline) {
123 cli_ctx.set_deadline(deadline);
124 }
125
126 std::unique_ptr<GenericClientAsyncReaderWriter> call =
127 generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
128 call->StartCall(tag(1));
129 client_ok(1);
130 std::unique_ptr<ByteBuffer> send_buffer =
131 SerializeToByteBuffer(&send_request);
132 call->Write(*send_buffer, tag(2));
133 // Send ByteBuffer can be destroyed after calling Write.
134 send_buffer.reset();
135 client_ok(2);
136 call->WritesDone(tag(3));
137 client_ok(3);
138
139 generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
140 srv_cq_.get(), tag(4));
141
142 verify_ok(srv_cq_.get(), 4, true);
143 EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
144 EXPECT_EQ(kMethodName, srv_ctx.method());
145
146 if (check_deadline) {
147 EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
148 gpr_time_from_millis(1000, GPR_TIMESPAN)));
149 }
150
151 ByteBuffer recv_buffer;
152 stream.Read(&recv_buffer, tag(5));
153 server_ok(5);
154 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
155 EXPECT_EQ(send_request.message(), recv_request.message());
156
157 send_response.set_message(recv_request.message());
158 send_buffer = SerializeToByteBuffer(&send_response);
159 stream.Write(*send_buffer, tag(6));
160 send_buffer.reset();
161 server_ok(6);
162
163 stream.Finish(Status::OK, tag(7));
164 server_ok(7);
165
166 recv_buffer.Clear();
167 call->Read(&recv_buffer, tag(8));
168 client_ok(8);
169 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
170
171 call->Finish(&recv_status, tag(9));
172 client_ok(9);
173
174 EXPECT_EQ(send_response.message(), recv_response.message());
175 EXPECT_TRUE(recv_status.ok());
176 }
177 }
178
179 CompletionQueue cli_cq_;
180 std::unique_ptr<ServerCompletionQueue> srv_cq_;
181 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
182 std::unique_ptr<grpc::GenericStub> generic_stub_;
183 std::unique_ptr<Server> server_;
184 AsyncGenericService generic_service_;
185 const grpc::string server_host_;
186 std::ostringstream server_address_;
187 };
188
// A single RPC through the fixture's SendRpc helper on a freshly created
// stub.
TEST_F(GenericEnd2endTest, SimpleRpc) {
  const int kRpcCount = 1;
  ResetStub();
  SendRpc(kRpcCount);
}
193
// Ten RPCs, issued one after another by the fixture's SendRpc helper on the
// same stub.
TEST_F(GenericEnd2endTest, SequentialRpcs) {
  const int kRpcCount = 10;
  ResetStub();
  SendRpc(kRpcCount);
}
198
// Ten echo calls in sequence where the client uses the generic unary API
// (PrepareUnaryCall / StartCall / Finish) and the server handles each call
// through the generic reader-writer stream. The client call is bound to the
// server's completion queue, so every event of the test is polled from that
// one queue.
TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
  ResetStub();
  const int kRpcCount = 10;
  const grpc::string method("/grpc.cpp.test.util.EchoTestService/Echo");
  for (int rpc = 0; rpc < kRpcCount; ++rpc) {
    EchoRequest request_out;
    EchoRequest request_in;
    EchoResponse response_out;
    EchoResponse response_in;
    Status client_status;

    ClientContext client_ctx;
    GenericServerContext server_ctx;
    GenericServerAsyncReaderWriter server_stream(&server_ctx);

    // The string needs to be long enough to test heap-based slice.
    request_out.set_message("Hello world. Hello world. Hello world.");

    std::unique_ptr<ByteBuffer> request_bytes =
        SerializeToByteBuffer(&request_out);
    // Use the same cq as server so that events can be polled in time.
    std::unique_ptr<GenericClientAsyncResponseReader> unary_call =
        generic_stub_->PrepareUnaryCall(&client_ctx, method, *request_bytes,
                                        srv_cq_.get());
    unary_call->StartCall();
    ByteBuffer response_bytes;
    // The completion for tag(1) is only collected at the end of the
    // iteration, after the server has finished the call.
    unary_call->Finish(&response_bytes, &client_status, tag(1));

    // Server: accept the call and check its host and method.
    generic_service_.RequestCall(&server_ctx, &server_stream, srv_cq_.get(),
                                 srv_cq_.get(), tag(4));

    server_ok(4);
    EXPECT_EQ(server_host_,
              server_ctx.host().substr(0, server_host_.length()));
    EXPECT_EQ(method, server_ctx.method());

    // Server: read the request and compare it with what was sent.
    ByteBuffer server_inbox;
    server_stream.Read(&server_inbox, tag(5));
    server_ok(5);
    EXPECT_TRUE(ParseFromByteBuffer(&server_inbox, &request_in));
    EXPECT_EQ(request_out.message(), request_in.message());

    // Server: echo the message back, then finish with OK.
    response_out.set_message(request_in.message());
    std::unique_ptr<ByteBuffer> server_outbox =
        SerializeToByteBuffer(&response_out);
    server_stream.Write(*server_outbox, tag(6));
    server_ok(6);

    server_stream.Finish(Status::OK, tag(7));
    server_ok(7);

    // Client: the unary Finish registered above completes on the shared
    // queue.
    verify_ok(srv_cq_.get(), 1, true);
    EXPECT_TRUE(ParseFromByteBuffer(&response_bytes, &response_in));
    EXPECT_EQ(response_out.message(), response_in.message());
    EXPECT_TRUE(client_status.ok());
  }
}
255
256 // One ping, one pong.
TEST_F(GenericEnd2endTest,SimpleBidiStreaming)257 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
258 ResetStub();
259
260 const grpc::string kMethodName(
261 "/grpc.cpp.test.util.EchoTestService/BidiStream");
262 EchoRequest send_request;
263 EchoRequest recv_request;
264 EchoResponse send_response;
265 EchoResponse recv_response;
266 Status recv_status;
267 ClientContext cli_ctx;
268 GenericServerContext srv_ctx;
269 GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
270
271 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
272 send_request.set_message("Hello");
273 std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
274 generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
275 cli_stream->StartCall(tag(1));
276 client_ok(1);
277
278 generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
279 srv_cq_.get(), tag(2));
280
281 verify_ok(srv_cq_.get(), 2, true);
282 EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
283 EXPECT_EQ(kMethodName, srv_ctx.method());
284
285 std::unique_ptr<ByteBuffer> send_buffer =
286 SerializeToByteBuffer(&send_request);
287 cli_stream->Write(*send_buffer, tag(3));
288 send_buffer.reset();
289 client_ok(3);
290
291 ByteBuffer recv_buffer;
292 srv_stream.Read(&recv_buffer, tag(4));
293 server_ok(4);
294 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
295 EXPECT_EQ(send_request.message(), recv_request.message());
296
297 send_response.set_message(recv_request.message());
298 send_buffer = SerializeToByteBuffer(&send_response);
299 srv_stream.Write(*send_buffer, tag(5));
300 send_buffer.reset();
301 server_ok(5);
302
303 cli_stream->Read(&recv_buffer, tag(6));
304 client_ok(6);
305 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
306 EXPECT_EQ(send_response.message(), recv_response.message());
307
308 cli_stream->WritesDone(tag(7));
309 client_ok(7);
310
311 srv_stream.Read(&recv_buffer, tag(8));
312 server_fail(8);
313
314 srv_stream.Finish(Status::OK, tag(9));
315 server_ok(9);
316
317 cli_stream->Finish(&recv_status, tag(10));
318 client_ok(10);
319
320 EXPECT_EQ(send_response.message(), recv_response.message());
321 EXPECT_TRUE(recv_status.ok());
322 }
323
// Sends one RPC whose deadline is ten seconds past the current monotonic
// clock reading, with deadline checking enabled in SendRpc.
TEST_F(GenericEnd2endTest, Deadline) {
  ResetStub();
  const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
  const gpr_timespec ten_seconds = gpr_time_from_seconds(10, GPR_TIMESPAN);
  const gpr_timespec deadline = gpr_time_add(now, ten_seconds);
  SendRpc(1, true, deadline);
}
330
331 } // namespace
332 } // namespace testing
333 } // namespace grpc
334
main(int argc,char ** argv)335 int main(int argc, char** argv) {
336 grpc_test_init(argc, argv);
337 ::testing::InitGoogleTest(&argc, argv);
338 return RUN_ALL_TESTS();
339 }
340