• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <thread>
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/channel.h>
25 #include <grpcpp/client_context.h>
26 #include <grpcpp/create_channel.h>
27 #include <grpcpp/generic/async_generic_service.h>
28 #include <grpcpp/generic/generic_stub.h>
29 #include <grpcpp/impl/codegen/proto_utils.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 #include <grpcpp/support/slice.h>
34 
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38 #include "test/cpp/util/byte_buffer_proto_helper.h"
39 
40 #include <gtest/gtest.h>
41 
42 using grpc::testing::EchoRequest;
43 using grpc::testing::EchoResponse;
44 using std::chrono::system_clock;
45 
46 namespace grpc {
47 namespace testing {
48 namespace {
49 
// Converts a small integer into an opaque completion-queue tag. The integer
// is widened to intptr_t first so the pointer conversion is size-exact.
void* tag(int i) {
  return reinterpret_cast<void*>(static_cast<intptr_t>(i));
}
51 
verify_ok(CompletionQueue * cq,int i,bool expect_ok)52 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
53   bool ok;
54   void* got_tag;
55   EXPECT_TRUE(cq->Next(&got_tag, &ok));
56   EXPECT_EQ(expect_ok, ok);
57   EXPECT_EQ(tag(i), got_tag);
58 }
59 
60 class GenericEnd2endTest : public ::testing::Test {
61  protected:
GenericEnd2endTest()62   GenericEnd2endTest() : server_host_("localhost") {}
63 
SetUp()64   void SetUp() override {
65     shut_down_ = false;
66     int port = grpc_pick_unused_port_or_die();
67     server_address_ << server_host_ << ":" << port;
68     // Setup server
69     ServerBuilder builder;
70     builder.AddListeningPort(server_address_.str(),
71                              InsecureServerCredentials());
72     builder.RegisterAsyncGenericService(&generic_service_);
73     // Include a second call to RegisterAsyncGenericService to make sure that
74     // we get an error in the log, since it is not allowed to have 2 async
75     // generic services
76     builder.RegisterAsyncGenericService(&generic_service_);
77     srv_cq_ = builder.AddCompletionQueue();
78     server_ = builder.BuildAndStart();
79   }
80 
ShutDownServerAndCQs()81   void ShutDownServerAndCQs() {
82     if (!shut_down_) {
83       server_->Shutdown();
84       void* ignored_tag;
85       bool ignored_ok;
86       cli_cq_.Shutdown();
87       srv_cq_->Shutdown();
88       while (cli_cq_.Next(&ignored_tag, &ignored_ok))
89         ;
90       while (srv_cq_->Next(&ignored_tag, &ignored_ok))
91         ;
92       shut_down_ = true;
93     }
94   }
TearDown()95   void TearDown() override { ShutDownServerAndCQs(); }
96 
ResetStub()97   void ResetStub() {
98     std::shared_ptr<Channel> channel = grpc::CreateChannel(
99         server_address_.str(), InsecureChannelCredentials());
100     stub_ = grpc::testing::EchoTestService::NewStub(channel);
101     generic_stub_.reset(new GenericStub(channel));
102   }
103 
server_ok(int i)104   void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
client_ok(int i)105   void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
server_fail(int i)106   void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
client_fail(int i)107   void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
108 
SendRpc(int num_rpcs)109   void SendRpc(int num_rpcs) {
110     SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
111   }
112 
SendRpc(int num_rpcs,bool check_deadline,gpr_timespec deadline)113   void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
114     const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
115     for (int i = 0; i < num_rpcs; i++) {
116       EchoRequest send_request;
117       EchoRequest recv_request;
118       EchoResponse send_response;
119       EchoResponse recv_response;
120       Status recv_status;
121 
122       ClientContext cli_ctx;
123       GenericServerContext srv_ctx;
124       GenericServerAsyncReaderWriter stream(&srv_ctx);
125 
126       // The string needs to be long enough to test heap-based slice.
127       send_request.set_message("Hello world. Hello world. Hello world.");
128 
129       if (check_deadline) {
130         cli_ctx.set_deadline(deadline);
131       }
132 
133       // Rather than using the original kMethodName, make a short-lived
134       // copy to also confirm that we don't refer to this object beyond
135       // the initial call preparation
136       const std::string* method_name = new std::string(kMethodName);
137 
138       std::unique_ptr<GenericClientAsyncReaderWriter> call =
139           generic_stub_->PrepareCall(&cli_ctx, *method_name, &cli_cq_);
140 
141       delete method_name;  // Make sure that this is not needed after invocation
142 
143       call->StartCall(tag(1));
144       client_ok(1);
145       std::unique_ptr<ByteBuffer> send_buffer =
146           SerializeToByteBuffer(&send_request);
147       call->Write(*send_buffer, tag(2));
148       // Send ByteBuffer can be destroyed after calling Write.
149       send_buffer.reset();
150       client_ok(2);
151       call->WritesDone(tag(3));
152       client_ok(3);
153 
154       generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
155                                    srv_cq_.get(), tag(4));
156 
157       verify_ok(srv_cq_.get(), 4, true);
158       EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
159       EXPECT_EQ(kMethodName, srv_ctx.method());
160 
161       if (check_deadline) {
162         EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
163                                      gpr_time_from_millis(1000, GPR_TIMESPAN)));
164       }
165 
166       ByteBuffer recv_buffer;
167       stream.Read(&recv_buffer, tag(5));
168       server_ok(5);
169       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
170       EXPECT_EQ(send_request.message(), recv_request.message());
171 
172       send_response.set_message(recv_request.message());
173       send_buffer = SerializeToByteBuffer(&send_response);
174       stream.Write(*send_buffer, tag(6));
175       send_buffer.reset();
176       server_ok(6);
177 
178       stream.Finish(Status::OK, tag(7));
179       server_ok(7);
180 
181       recv_buffer.Clear();
182       call->Read(&recv_buffer, tag(8));
183       client_ok(8);
184       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
185 
186       call->Finish(&recv_status, tag(9));
187       client_ok(9);
188 
189       EXPECT_EQ(send_response.message(), recv_response.message());
190       EXPECT_TRUE(recv_status.ok());
191     }
192   }
193 
194   // Return errors to up to one call that comes in on the supplied completion
195   // queue, until the CQ is being shut down (and therefore we can no longer
196   // enqueue further events).
DriveCompletionQueue()197   void DriveCompletionQueue() {
198     enum class Event : uintptr_t {
199       kCallReceived,
200       kResponseSent,
201     };
202     // Request the call, but only if the main thread hasn't beaten us to
203     // shutting down the CQ.
204     grpc::GenericServerContext server_context;
205     grpc::GenericServerAsyncReaderWriter reader_writer(&server_context);
206 
207     {
208       std::lock_guard<std::mutex> lock(shutting_down_mu_);
209       if (!shutting_down_) {
210         generic_service_.RequestCall(
211             &server_context, &reader_writer, srv_cq_.get(), srv_cq_.get(),
212             reinterpret_cast<void*>(Event::kCallReceived));
213       }
214     }
215     // Process events.
216     {
217       Event event;
218       bool ok;
219       while (srv_cq_->Next(reinterpret_cast<void**>(&event), &ok)) {
220         std::lock_guard<std::mutex> lock(shutting_down_mu_);
221         if (shutting_down_) {
222           // The main thread has started shutting down. Simply continue to drain
223           // events.
224           continue;
225         }
226 
227         switch (event) {
228           case Event::kCallReceived:
229             reader_writer.Finish(
230                 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "go away"),
231                 reinterpret_cast<void*>(Event::kResponseSent));
232             break;
233 
234           case Event::kResponseSent:
235             // We are done.
236             break;
237         }
238       }
239     }
240   }
241 
242   CompletionQueue cli_cq_;
243   std::unique_ptr<ServerCompletionQueue> srv_cq_;
244   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
245   std::unique_ptr<grpc::GenericStub> generic_stub_;
246   std::unique_ptr<Server> server_;
247   AsyncGenericService generic_service_;
248   const std::string server_host_;
249   std::ostringstream server_address_;
250   bool shutting_down_;
251   bool shut_down_;
252   std::mutex shutting_down_mu_;
253 };
254 
// A single echo RPC through the generic stub and generic service.
TEST_F(GenericEnd2endTest, SimpleRpc) {
  constexpr int kNumRpcs = 1;
  ResetStub();
  SendRpc(kNumRpcs);
}
259 
// Ten echo RPCs issued back to back over the same stub.
TEST_F(GenericEnd2endTest, SequentialRpcs) {
  constexpr int kNumRpcs = 10;
  ResetStub();
  SendRpc(kNumRpcs);
}
264 
// Issues ten generic unary RPCs one after another. The server side of each
// call is serviced by hand on the server completion queue while a helper
// thread waits for the client-side completion.
TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
  ResetStub();
  constexpr int kNumRpcs = 10;
  const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
  for (int rpc = 0; rpc < kNumRpcs; ++rpc) {
    EchoRequest client_request;
    EchoRequest server_request;
    EchoResponse server_response;
    EchoResponse client_response;
    Status client_status;

    ClientContext cli_ctx;
    GenericServerContext srv_ctx;
    GenericServerAsyncReaderWriter srv_stream(&srv_ctx);

    // The string needs to be long enough to test heap-based slice.
    client_request.set_message("Hello world. Hello world. Hello world.");

    std::unique_ptr<ByteBuffer> request_bytes =
        SerializeToByteBuffer(&client_request);
    // The client call is bound to cli_cq_; its single completion event
    // (tag 1) is awaited on a separate thread so that this thread is free to
    // drive the server side below.
    std::unique_ptr<GenericClientAsyncResponseReader> unary_call =
        generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, *request_bytes,
                                        &cli_cq_);
    unary_call->StartCall();
    ByteBuffer response_bytes;
    unary_call->Finish(&response_bytes, &client_status, tag(1));
    std::thread client_waiter([this] { client_ok(1); });

    // Server: accept the incoming call.
    generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
                                 srv_cq_.get(), tag(4));
    server_ok(4);
    EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
    EXPECT_EQ(kMethodName, srv_ctx.method());

    // Server: read the request and check it round-tripped intact.
    ByteBuffer incoming_bytes;
    srv_stream.Read(&incoming_bytes, tag(5));
    server_ok(5);
    EXPECT_TRUE(ParseFromByteBuffer(&incoming_bytes, &server_request));
    EXPECT_EQ(client_request.message(), server_request.message());

    // Server: echo the message back and finish with OK.
    server_response.set_message(server_request.message());
    std::unique_ptr<ByteBuffer> outgoing_bytes =
        SerializeToByteBuffer(&server_response);
    srv_stream.Write(*outgoing_bytes, tag(6));
    server_ok(6);

    srv_stream.Finish(Status::OK, tag(7));
    server_ok(7);

    // Client: wait for the completion, then validate the response.
    client_waiter.join();
    EXPECT_TRUE(ParseFromByteBuffer(&response_bytes, &client_response));
    EXPECT_EQ(server_response.message(), client_response.message());
    EXPECT_TRUE(client_status.ok());
  }
}
322 
323 // One ping, one pong.
TEST_F(GenericEnd2endTest,SimpleBidiStreaming)324 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
325   ResetStub();
326 
327   const std::string kMethodName(
328       "/grpc.cpp.test.util.EchoTestService/BidiStream");
329   EchoRequest send_request;
330   EchoRequest recv_request;
331   EchoResponse send_response;
332   EchoResponse recv_response;
333   Status recv_status;
334   ClientContext cli_ctx;
335   GenericServerContext srv_ctx;
336   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
337 
338   cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
339   send_request.set_message("Hello");
340   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
341       generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
342   cli_stream->StartCall(tag(1));
343   client_ok(1);
344 
345   generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
346                                srv_cq_.get(), tag(2));
347 
348   verify_ok(srv_cq_.get(), 2, true);
349   EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
350   EXPECT_EQ(kMethodName, srv_ctx.method());
351 
352   std::unique_ptr<ByteBuffer> send_buffer =
353       SerializeToByteBuffer(&send_request);
354   cli_stream->Write(*send_buffer, tag(3));
355   send_buffer.reset();
356   client_ok(3);
357 
358   ByteBuffer recv_buffer;
359   srv_stream.Read(&recv_buffer, tag(4));
360   server_ok(4);
361   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
362   EXPECT_EQ(send_request.message(), recv_request.message());
363 
364   send_response.set_message(recv_request.message());
365   send_buffer = SerializeToByteBuffer(&send_response);
366   srv_stream.Write(*send_buffer, tag(5));
367   send_buffer.reset();
368   server_ok(5);
369 
370   cli_stream->Read(&recv_buffer, tag(6));
371   client_ok(6);
372   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
373   EXPECT_EQ(send_response.message(), recv_response.message());
374 
375   cli_stream->WritesDone(tag(7));
376   client_ok(7);
377 
378   srv_stream.Read(&recv_buffer, tag(8));
379   server_fail(8);
380 
381   srv_stream.Finish(Status::OK, tag(9));
382   server_ok(9);
383 
384   cli_stream->Finish(&recv_status, tag(10));
385   client_ok(10);
386 
387   EXPECT_EQ(send_response.message(), recv_response.message());
388   EXPECT_TRUE(recv_status.ok());
389 }
390 
// Runs one RPC with a client deadline ten seconds in the future and lets
// SendRpc compare it against the deadline the server observes.
TEST_F(GenericEnd2endTest, Deadline) {
  ResetStub();
  const gpr_timespec ten_seconds_from_now =
      gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                   gpr_time_from_seconds(10, GPR_TIMESPAN));
  SendRpc(1, true, ten_seconds_from_now);
}
397 
// Sends a typed Echo call with a 500 microsecond deadline while a background
// thread services the server completion queue, and expects the call to fail.
TEST_F(GenericEnd2endTest, ShortDeadline) {
  ResetStub();

  ClientContext cli_ctx;
  EchoRequest echo_request;
  EchoResponse echo_response;

  // Written before the driver thread is created, so no lock is taken here.
  shutting_down_ = false;
  std::thread cq_driver([this] { DriveCompletionQueue(); });

  echo_request.set_message("");
  const gpr_timespec short_deadline =
      gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                   gpr_time_from_micros(500, GPR_TIMESPAN));
  cli_ctx.set_deadline(short_deadline);
  const Status call_status = stub_->Echo(&cli_ctx, echo_request, &echo_response);
  EXPECT_FALSE(call_status.ok());

  // Tell the driver to stop reacting to events before the queues are shut
  // down; from then on it only drains.
  {
    std::lock_guard<std::mutex> guard(shutting_down_mu_);
    shutting_down_ = true;
  }
  ShutDownServerAndCQs();
  cq_driver.join();
}
420 
421 }  // namespace
422 }  // namespace testing
423 }  // namespace grpc
424 
main(int argc,char ** argv)425 int main(int argc, char** argv) {
426   grpc::testing::TestEnvironment env(argc, argv);
427   ::testing::InitGoogleTest(&argc, argv);
428   return RUN_ALL_TESTS();
429 }
430