1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/channel.h>
25 #include <grpcpp/client_context.h>
26 #include <grpcpp/create_channel.h>
27 #include <grpcpp/generic/async_generic_service.h>
28 #include <grpcpp/generic/generic_stub.h>
29 #include <grpcpp/impl/codegen/proto_utils.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 #include <grpcpp/support/slice.h>
34
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38 #include "test/cpp/util/byte_buffer_proto_helper.h"
39
40 #include <gtest/gtest.h>
41
42 using grpc::testing::EchoRequest;
43 using grpc::testing::EchoResponse;
44 using std::chrono::system_clock;
45
46 namespace grpc {
47 namespace testing {
48 namespace {
49
// Encodes a small integer as an opaque pointer-sized value so it can be passed
// as a completion-queue tag and compared against the tag handed back later.
// The pointer is never dereferenced.
void* tag(int i) {
  return reinterpret_cast<void*>(static_cast<intptr_t>(i));
}
51
verify_ok(CompletionQueue * cq,int i,bool expect_ok)52 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
53 bool ok;
54 void* got_tag;
55 EXPECT_TRUE(cq->Next(&got_tag, &ok));
56 EXPECT_EQ(expect_ok, ok);
57 EXPECT_EQ(tag(i), got_tag);
58 }
59
60 class GenericEnd2endTest : public ::testing::Test {
61 protected:
GenericEnd2endTest()62 GenericEnd2endTest() : server_host_("localhost") {}
63
SetUp()64 void SetUp() override {
65 shut_down_ = false;
66 int port = grpc_pick_unused_port_or_die();
67 server_address_ << server_host_ << ":" << port;
68 // Setup server
69 ServerBuilder builder;
70 builder.AddListeningPort(server_address_.str(),
71 InsecureServerCredentials());
72 builder.RegisterAsyncGenericService(&generic_service_);
73 // Include a second call to RegisterAsyncGenericService to make sure that
74 // we get an error in the log, since it is not allowed to have 2 async
75 // generic services
76 builder.RegisterAsyncGenericService(&generic_service_);
77 srv_cq_ = builder.AddCompletionQueue();
78 server_ = builder.BuildAndStart();
79 }
80
ShutDownServerAndCQs()81 void ShutDownServerAndCQs() {
82 if (!shut_down_) {
83 server_->Shutdown();
84 void* ignored_tag;
85 bool ignored_ok;
86 cli_cq_.Shutdown();
87 srv_cq_->Shutdown();
88 while (cli_cq_.Next(&ignored_tag, &ignored_ok))
89 ;
90 while (srv_cq_->Next(&ignored_tag, &ignored_ok))
91 ;
92 shut_down_ = true;
93 }
94 }
TearDown()95 void TearDown() override { ShutDownServerAndCQs(); }
96
ResetStub()97 void ResetStub() {
98 std::shared_ptr<Channel> channel = grpc::CreateChannel(
99 server_address_.str(), InsecureChannelCredentials());
100 stub_ = grpc::testing::EchoTestService::NewStub(channel);
101 generic_stub_.reset(new GenericStub(channel));
102 }
103
server_ok(int i)104 void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
client_ok(int i)105 void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
server_fail(int i)106 void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
client_fail(int i)107 void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
108
SendRpc(int num_rpcs)109 void SendRpc(int num_rpcs) {
110 SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
111 }
112
SendRpc(int num_rpcs,bool check_deadline,gpr_timespec deadline)113 void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
114 const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
115 for (int i = 0; i < num_rpcs; i++) {
116 EchoRequest send_request;
117 EchoRequest recv_request;
118 EchoResponse send_response;
119 EchoResponse recv_response;
120 Status recv_status;
121
122 ClientContext cli_ctx;
123 GenericServerContext srv_ctx;
124 GenericServerAsyncReaderWriter stream(&srv_ctx);
125
126 // The string needs to be long enough to test heap-based slice.
127 send_request.set_message("Hello world. Hello world. Hello world.");
128
129 if (check_deadline) {
130 cli_ctx.set_deadline(deadline);
131 }
132
133 // Rather than using the original kMethodName, make a short-lived
134 // copy to also confirm that we don't refer to this object beyond
135 // the initial call preparation
136 const std::string* method_name = new std::string(kMethodName);
137
138 std::unique_ptr<GenericClientAsyncReaderWriter> call =
139 generic_stub_->PrepareCall(&cli_ctx, *method_name, &cli_cq_);
140
141 delete method_name; // Make sure that this is not needed after invocation
142
143 call->StartCall(tag(1));
144 client_ok(1);
145 std::unique_ptr<ByteBuffer> send_buffer =
146 SerializeToByteBuffer(&send_request);
147 call->Write(*send_buffer, tag(2));
148 // Send ByteBuffer can be destroyed after calling Write.
149 send_buffer.reset();
150 client_ok(2);
151 call->WritesDone(tag(3));
152 client_ok(3);
153
154 generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
155 srv_cq_.get(), tag(4));
156
157 verify_ok(srv_cq_.get(), 4, true);
158 EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
159 EXPECT_EQ(kMethodName, srv_ctx.method());
160
161 if (check_deadline) {
162 EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
163 gpr_time_from_millis(1000, GPR_TIMESPAN)));
164 }
165
166 ByteBuffer recv_buffer;
167 stream.Read(&recv_buffer, tag(5));
168 server_ok(5);
169 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
170 EXPECT_EQ(send_request.message(), recv_request.message());
171
172 send_response.set_message(recv_request.message());
173 send_buffer = SerializeToByteBuffer(&send_response);
174 stream.Write(*send_buffer, tag(6));
175 send_buffer.reset();
176 server_ok(6);
177
178 stream.Finish(Status::OK, tag(7));
179 server_ok(7);
180
181 recv_buffer.Clear();
182 call->Read(&recv_buffer, tag(8));
183 client_ok(8);
184 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
185
186 call->Finish(&recv_status, tag(9));
187 client_ok(9);
188
189 EXPECT_EQ(send_response.message(), recv_response.message());
190 EXPECT_TRUE(recv_status.ok());
191 }
192 }
193
194 // Return errors to up to one call that comes in on the supplied completion
195 // queue, until the CQ is being shut down (and therefore we can no longer
196 // enqueue further events).
DriveCompletionQueue()197 void DriveCompletionQueue() {
198 enum class Event : uintptr_t {
199 kCallReceived,
200 kResponseSent,
201 };
202 // Request the call, but only if the main thread hasn't beaten us to
203 // shutting down the CQ.
204 grpc::GenericServerContext server_context;
205 grpc::GenericServerAsyncReaderWriter reader_writer(&server_context);
206
207 {
208 std::lock_guard<std::mutex> lock(shutting_down_mu_);
209 if (!shutting_down_) {
210 generic_service_.RequestCall(
211 &server_context, &reader_writer, srv_cq_.get(), srv_cq_.get(),
212 reinterpret_cast<void*>(Event::kCallReceived));
213 }
214 }
215 // Process events.
216 {
217 Event event;
218 bool ok;
219 while (srv_cq_->Next(reinterpret_cast<void**>(&event), &ok)) {
220 std::lock_guard<std::mutex> lock(shutting_down_mu_);
221 if (shutting_down_) {
222 // The main thread has started shutting down. Simply continue to drain
223 // events.
224 continue;
225 }
226
227 switch (event) {
228 case Event::kCallReceived:
229 reader_writer.Finish(
230 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "go away"),
231 reinterpret_cast<void*>(Event::kResponseSent));
232 break;
233
234 case Event::kResponseSent:
235 // We are done.
236 break;
237 }
238 }
239 }
240 }
241
242 CompletionQueue cli_cq_;
243 std::unique_ptr<ServerCompletionQueue> srv_cq_;
244 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
245 std::unique_ptr<grpc::GenericStub> generic_stub_;
246 std::unique_ptr<Server> server_;
247 AsyncGenericService generic_service_;
248 const std::string server_host_;
249 std::ostringstream server_address_;
250 bool shutting_down_;
251 bool shut_down_;
252 std::mutex shutting_down_mu_;
253 };
254
// A single echo RPC through the generic stub and generic service.
TEST_F(GenericEnd2endTest, SimpleRpc) {
  constexpr int kNumRpcs = 1;
  ResetStub();
  SendRpc(kNumRpcs);
}
259
// Ten echo RPCs, one after another, over the same channel.
TEST_F(GenericEnd2endTest, SequentialRpcs) {
  constexpr int kNumRpcs = 10;
  ResetStub();
  SendRpc(kNumRpcs);
}
264
// Ten sequential RPCs where the client side uses the generic *unary* call API
// while the server side handles each call through the generic stream.
TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
  ResetStub();
  constexpr int kNumRpcs = 10;
  const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
  for (int rpc = 0; rpc < kNumRpcs; ++rpc) {
    EchoRequest request_out;
    EchoRequest request_in;
    EchoResponse response_out;
    EchoResponse response_in;
    Status client_status;

    ClientContext client_ctx;
    GenericServerContext server_ctx;
    GenericServerAsyncReaderWriter server_stream(&server_ctx);

    // The string needs to be long enough to test heap-based slice.
    request_out.set_message("Hello world. Hello world. Hello world.");

    std::unique_ptr<ByteBuffer> request_bytes =
        SerializeToByteBuffer(&request_out);
    // The unary call is bound to cli_cq_. Its single completion (tag 1) is
    // awaited on a helper thread so that this thread can service the server
    // queue in the meantime.
    std::unique_ptr<GenericClientAsyncResponseReader> unary_call =
        generic_stub_->PrepareUnaryCall(&client_ctx, kMethodName,
                                        *request_bytes, &cli_cq_);
    unary_call->StartCall();
    ByteBuffer response_bytes;
    unary_call->Finish(&response_bytes, &client_status, tag(1));
    std::thread client_poller([this] { client_ok(1); });

    // Server side: accept the call and check its metadata.
    generic_service_.RequestCall(&server_ctx, &server_stream, srv_cq_.get(),
                                 srv_cq_.get(), tag(4));

    server_ok(4);
    EXPECT_EQ(server_host_,
              server_ctx.host().substr(0, server_host_.length()));
    EXPECT_EQ(kMethodName, server_ctx.method());

    // Server side: read the request and verify it round-tripped.
    ByteBuffer server_request_bytes;
    server_stream.Read(&server_request_bytes, tag(5));
    server_ok(5);
    EXPECT_TRUE(ParseFromByteBuffer(&server_request_bytes, &request_in));
    EXPECT_EQ(request_out.message(), request_in.message());

    // Server side: echo the message back and finish with OK.
    response_out.set_message(request_in.message());
    std::unique_ptr<ByteBuffer> server_response_bytes =
        SerializeToByteBuffer(&response_out);
    server_stream.Write(*server_response_bytes, tag(6));
    server_ok(6);

    server_stream.Finish(Status::OK, tag(7));
    server_ok(7);

    // Client side: wait for the completion, then inspect the response.
    client_poller.join();
    EXPECT_TRUE(ParseFromByteBuffer(&response_bytes, &response_in));
    EXPECT_EQ(response_out.message(), response_in.message());
    EXPECT_TRUE(client_status.ok());
  }
}
322
323 // One ping, one pong.
TEST_F(GenericEnd2endTest,SimpleBidiStreaming)324 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
325 ResetStub();
326
327 const std::string kMethodName(
328 "/grpc.cpp.test.util.EchoTestService/BidiStream");
329 EchoRequest send_request;
330 EchoRequest recv_request;
331 EchoResponse send_response;
332 EchoResponse recv_response;
333 Status recv_status;
334 ClientContext cli_ctx;
335 GenericServerContext srv_ctx;
336 GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
337
338 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
339 send_request.set_message("Hello");
340 std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
341 generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
342 cli_stream->StartCall(tag(1));
343 client_ok(1);
344
345 generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
346 srv_cq_.get(), tag(2));
347
348 verify_ok(srv_cq_.get(), 2, true);
349 EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
350 EXPECT_EQ(kMethodName, srv_ctx.method());
351
352 std::unique_ptr<ByteBuffer> send_buffer =
353 SerializeToByteBuffer(&send_request);
354 cli_stream->Write(*send_buffer, tag(3));
355 send_buffer.reset();
356 client_ok(3);
357
358 ByteBuffer recv_buffer;
359 srv_stream.Read(&recv_buffer, tag(4));
360 server_ok(4);
361 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
362 EXPECT_EQ(send_request.message(), recv_request.message());
363
364 send_response.set_message(recv_request.message());
365 send_buffer = SerializeToByteBuffer(&send_response);
366 srv_stream.Write(*send_buffer, tag(5));
367 send_buffer.reset();
368 server_ok(5);
369
370 cli_stream->Read(&recv_buffer, tag(6));
371 client_ok(6);
372 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
373 EXPECT_EQ(send_response.message(), recv_response.message());
374
375 cli_stream->WritesDone(tag(7));
376 client_ok(7);
377
378 srv_stream.Read(&recv_buffer, tag(8));
379 server_fail(8);
380
381 srv_stream.Finish(Status::OK, tag(9));
382 server_ok(9);
383
384 cli_stream->Finish(&recv_status, tag(10));
385 client_ok(10);
386
387 EXPECT_EQ(send_response.message(), recv_response.message());
388 EXPECT_TRUE(recv_status.ok());
389 }
390
// One echo RPC with a client deadline ten seconds from now; SendRpc checks
// that the server observes a similar deadline.
TEST_F(GenericEnd2endTest, Deadline) {
  ResetStub();
  const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
  const gpr_timespec ten_seconds = gpr_time_from_seconds(10, GPR_TIMESPAN);
  SendRpc(1, /*check_deadline=*/true, gpr_time_add(now, ten_seconds));
}
397
// Issues a blocking Echo through the typed stub with a 500-microsecond
// deadline while a background thread drives the server completion queue via
// DriveCompletionQueue(). The RPC is expected to fail; afterwards the server
// and queues are shut down and the background thread is joined.
TEST_F(GenericEnd2endTest, ShortDeadline) {
  ResetStub();

  ClientContext client_ctx;
  EchoRequest echo_request;
  EchoResponse echo_response;

  // Must be cleared before the driver thread starts reading it.
  shutting_down_ = false;
  std::thread cq_driver([this] { DriveCompletionQueue(); });

  echo_request.set_message("");
  const gpr_timespec timeout = gpr_time_from_micros(500, GPR_TIMESPAN);
  client_ctx.set_deadline(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), timeout));
  const Status status = stub_->Echo(&client_ctx, echo_request, &echo_response);
  EXPECT_FALSE(status.ok());
  {
    // Tell the driver thread to stop starting new operations before the
    // queues are shut down.
    std::lock_guard<std::mutex> guard(shutting_down_mu_);
    shutting_down_ = true;
  }
  ShutDownServerAndCQs();
  cq_driver.join();
}
420
421 } // namespace
422 } // namespace testing
423 } // namespace grpc
424
main(int argc,char ** argv)425 int main(int argc, char** argv) {
426 grpc::testing::TestEnvironment env(argc, argv);
427 ::testing::InitGoogleTest(&argc, argv);
428 return RUN_ALL_TESTS();
429 }
430