• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/time.h>
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/create_channel.h>
24 #include <grpcpp/generic/async_generic_service.h>
25 #include <grpcpp/generic/generic_stub.h>
26 #include <grpcpp/impl/proto_utils.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/support/slice.h>
31 #include <gtest/gtest.h>
32 
33 #include <memory>
34 #include <thread>
35 
36 #include "absl/memory/memory.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/test_util/port.h"
39 #include "test/core/test_util/test_config.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41 
42 namespace grpc {
43 namespace testing {
44 namespace {
45 
tag(int i)46 void* tag(int i) { return reinterpret_cast<void*>(i); }
47 
verify_ok(CompletionQueue * cq,int i,bool expect_ok)48 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
49   bool ok;
50   void* got_tag;
51   EXPECT_TRUE(cq->Next(&got_tag, &ok));
52   EXPECT_EQ(expect_ok, ok);
53   EXPECT_EQ(tag(i), got_tag);
54 }
55 
56 class GenericEnd2endTest : public ::testing::Test {
57  protected:
GenericEnd2endTest()58   GenericEnd2endTest() : server_host_("localhost") {}
59 
SetUp()60   void SetUp() override {
61     shut_down_ = false;
62     int port = grpc_pick_unused_port_or_die();
63     server_address_ << server_host_ << ":" << port;
64     // Setup server
65     ServerBuilder builder;
66     builder.AddListeningPort(server_address_.str(),
67                              InsecureServerCredentials());
68     builder.RegisterAsyncGenericService(&generic_service_);
69     // Include a second call to RegisterAsyncGenericService to make sure that
70     // we get an error in the log, since it is not allowed to have 2 async
71     // generic services
72     builder.RegisterAsyncGenericService(&generic_service_);
73     srv_cq_ = builder.AddCompletionQueue();
74     server_ = builder.BuildAndStart();
75   }
76 
ShutDownServerAndCQs()77   void ShutDownServerAndCQs() {
78     if (!shut_down_) {
79       server_->Shutdown();
80       void* ignored_tag;
81       bool ignored_ok;
82       cli_cq_.Shutdown();
83       srv_cq_->Shutdown();
84       while (cli_cq_.Next(&ignored_tag, &ignored_ok)) {
85       }
86       while (srv_cq_->Next(&ignored_tag, &ignored_ok)) {
87       }
88       shut_down_ = true;
89     }
90   }
TearDown()91   void TearDown() override { ShutDownServerAndCQs(); }
92 
ResetStub()93   void ResetStub() {
94     std::shared_ptr<Channel> channel = grpc::CreateChannel(
95         server_address_.str(), InsecureChannelCredentials());
96     stub_ = grpc::testing::EchoTestService::NewStub(channel);
97     generic_stub_ = std::make_unique<GenericStub>(channel);
98   }
99 
server_ok(int i)100   void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
client_ok(int i)101   void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
server_fail(int i)102   void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
client_fail(int i)103   void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
104 
SendRpc(int num_rpcs)105   void SendRpc(int num_rpcs) {
106     SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
107   }
108 
SendRpc(int num_rpcs,bool check_deadline,gpr_timespec deadline)109   void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
110     const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
111     for (int i = 0; i < num_rpcs; i++) {
112       EchoRequest send_request;
113       EchoRequest recv_request;
114       EchoResponse send_response;
115       EchoResponse recv_response;
116       Status recv_status;
117 
118       ClientContext cli_ctx;
119       GenericServerContext srv_ctx;
120       GenericServerAsyncReaderWriter stream(&srv_ctx);
121 
122       // The string needs to be long enough to test heap-based slice.
123       send_request.set_message("Hello world. Hello world. Hello world.");
124 
125       if (check_deadline) {
126         cli_ctx.set_deadline(deadline);
127       }
128 
129       // Rather than using the original kMethodName, make a short-lived
130       // copy to also confirm that we don't refer to this object beyond
131       // the initial call preparation
132       const std::string* method_name = new std::string(kMethodName);
133 
134       std::unique_ptr<GenericClientAsyncReaderWriter> call =
135           generic_stub_->PrepareCall(&cli_ctx, *method_name, &cli_cq_);
136 
137       delete method_name;  // Make sure that this is not needed after invocation
138 
139       std::thread request_call([this]() { server_ok(4); });
140       call->StartCall(tag(1));
141       client_ok(1);
142       std::unique_ptr<ByteBuffer> send_buffer =
143           SerializeToByteBuffer(&send_request);
144       call->Write(*send_buffer, tag(2));
145       // Send ByteBuffer can be destroyed after calling Write.
146       send_buffer.reset();
147       client_ok(2);
148       call->WritesDone(tag(3));
149       client_ok(3);
150 
151       generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
152                                    srv_cq_.get(), tag(4));
153 
154       request_call.join();
155       EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
156       EXPECT_EQ(kMethodName, srv_ctx.method());
157 
158       if (check_deadline) {
159         EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
160                                      gpr_time_from_millis(1000, GPR_TIMESPAN)));
161       }
162 
163       ByteBuffer recv_buffer;
164       stream.Read(&recv_buffer, tag(5));
165       server_ok(5);
166       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
167       EXPECT_EQ(send_request.message(), recv_request.message());
168 
169       send_response.set_message(recv_request.message());
170       send_buffer = SerializeToByteBuffer(&send_response);
171       stream.Write(*send_buffer, tag(6));
172       send_buffer.reset();
173       server_ok(6);
174 
175       stream.Finish(Status::OK, tag(7));
176       server_ok(7);
177 
178       recv_buffer.Clear();
179       call->Read(&recv_buffer, tag(8));
180       client_ok(8);
181       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
182 
183       call->Finish(&recv_status, tag(9));
184       client_ok(9);
185 
186       EXPECT_EQ(send_response.message(), recv_response.message());
187       EXPECT_TRUE(recv_status.ok());
188     }
189   }
190 
191   // Return errors to up to one call that comes in on the supplied completion
192   // queue, until the CQ is being shut down (and therefore we can no longer
193   // enqueue further events).
DriveCompletionQueue()194   void DriveCompletionQueue() {
195     enum class Event : uintptr_t {
196       kCallReceived,
197       kResponseSent,
198     };
199     // Request the call, but only if the main thread hasn't beaten us to
200     // shutting down the CQ.
201     grpc::GenericServerContext server_context;
202     grpc::GenericServerAsyncReaderWriter reader_writer(&server_context);
203 
204     {
205       std::lock_guard<std::mutex> lock(shutting_down_mu_);
206       if (!shutting_down_) {
207         generic_service_.RequestCall(
208             &server_context, &reader_writer, srv_cq_.get(), srv_cq_.get(),
209             reinterpret_cast<void*>(Event::kCallReceived));
210       }
211     }
212     // Process events.
213     {
214       Event event;
215       bool ok;
216       while (srv_cq_->Next(reinterpret_cast<void**>(&event), &ok)) {
217         std::lock_guard<std::mutex> lock(shutting_down_mu_);
218         if (shutting_down_) {
219           // The main thread has started shutting down. Simply continue to drain
220           // events.
221           continue;
222         }
223 
224         switch (event) {
225           case Event::kCallReceived:
226             reader_writer.Finish(
227                 grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "go away"),
228                 reinterpret_cast<void*>(Event::kResponseSent));
229             break;
230 
231           case Event::kResponseSent:
232             // We are done.
233             break;
234         }
235       }
236     }
237   }
238 
239   CompletionQueue cli_cq_;
240   std::unique_ptr<ServerCompletionQueue> srv_cq_;
241   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
242   std::unique_ptr<grpc::GenericStub> generic_stub_;
243   std::unique_ptr<Server> server_;
244   AsyncGenericService generic_service_;
245   const std::string server_host_;
246   std::ostringstream server_address_;
247   bool shutting_down_;
248   bool shut_down_;
249   std::mutex shutting_down_mu_;
250 };
251 
TEST_F(GenericEnd2endTest,SimpleRpc)252 TEST_F(GenericEnd2endTest, SimpleRpc) {
253   ResetStub();
254   SendRpc(1);
255 }
256 
TEST_F(GenericEnd2endTest,SequentialRpcs)257 TEST_F(GenericEnd2endTest, SequentialRpcs) {
258   ResetStub();
259   SendRpc(10);
260 }
261 
TEST_F(GenericEnd2endTest,SequentialUnaryRpcs)262 TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
263   ResetStub();
264   const int num_rpcs = 10;
265   const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
266   for (int i = 0; i < num_rpcs; i++) {
267     EchoRequest send_request;
268     EchoRequest recv_request;
269     EchoResponse send_response;
270     EchoResponse recv_response;
271     Status recv_status;
272 
273     ClientContext cli_ctx;
274     GenericServerContext srv_ctx;
275     GenericServerAsyncReaderWriter stream(&srv_ctx);
276 
277     // The string needs to be long enough to test heap-based slice.
278     send_request.set_message("Hello world. Hello world. Hello world.");
279 
280     std::unique_ptr<ByteBuffer> cli_send_buffer =
281         SerializeToByteBuffer(&send_request);
282     std::thread request_call([this]() { server_ok(4); });
283     std::unique_ptr<GenericClientAsyncResponseReader> call =
284         generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, *cli_send_buffer,
285                                         &cli_cq_);
286     call->StartCall();
287     ByteBuffer cli_recv_buffer;
288     call->Finish(&cli_recv_buffer, &recv_status, tag(1));
289     std::thread client_check([this] { client_ok(1); });
290 
291     generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
292                                  srv_cq_.get(), tag(4));
293     request_call.join();
294     EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
295     EXPECT_EQ(kMethodName, srv_ctx.method());
296 
297     ByteBuffer srv_recv_buffer;
298     stream.Read(&srv_recv_buffer, tag(5));
299     server_ok(5);
300     EXPECT_TRUE(ParseFromByteBuffer(&srv_recv_buffer, &recv_request));
301     EXPECT_EQ(send_request.message(), recv_request.message());
302 
303     send_response.set_message(recv_request.message());
304     std::unique_ptr<ByteBuffer> srv_send_buffer =
305         SerializeToByteBuffer(&send_response);
306     stream.Write(*srv_send_buffer, tag(6));
307     server_ok(6);
308 
309     stream.Finish(Status::OK, tag(7));
310     server_ok(7);
311 
312     client_check.join();
313     EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response));
314     EXPECT_EQ(send_response.message(), recv_response.message());
315     EXPECT_TRUE(recv_status.ok());
316   }
317 }
318 
319 // One ping, one pong.
TEST_F(GenericEnd2endTest,SimpleBidiStreaming)320 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
321   ResetStub();
322 
323   const std::string kMethodName(
324       "/grpc.cpp.test.util.EchoTestService/BidiStream");
325   EchoRequest send_request;
326   EchoRequest recv_request;
327   EchoResponse send_response;
328   EchoResponse recv_response;
329   Status recv_status;
330   ClientContext cli_ctx;
331   GenericServerContext srv_ctx;
332   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
333 
334   cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
335   send_request.set_message("Hello");
336   std::thread request_call([this]() { server_ok(2); });
337   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
338       generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
339   cli_stream->StartCall(tag(1));
340   client_ok(1);
341 
342   generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
343                                srv_cq_.get(), tag(2));
344   request_call.join();
345 
346   EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
347   EXPECT_EQ(kMethodName, srv_ctx.method());
348 
349   std::unique_ptr<ByteBuffer> send_buffer =
350       SerializeToByteBuffer(&send_request);
351   cli_stream->Write(*send_buffer, tag(3));
352   send_buffer.reset();
353   client_ok(3);
354 
355   ByteBuffer recv_buffer;
356   srv_stream.Read(&recv_buffer, tag(4));
357   server_ok(4);
358   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
359   EXPECT_EQ(send_request.message(), recv_request.message());
360 
361   send_response.set_message(recv_request.message());
362   send_buffer = SerializeToByteBuffer(&send_response);
363   srv_stream.Write(*send_buffer, tag(5));
364   send_buffer.reset();
365   server_ok(5);
366 
367   cli_stream->Read(&recv_buffer, tag(6));
368   client_ok(6);
369   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
370   EXPECT_EQ(send_response.message(), recv_response.message());
371 
372   cli_stream->WritesDone(tag(7));
373   client_ok(7);
374 
375   srv_stream.Read(&recv_buffer, tag(8));
376   server_fail(8);
377 
378   srv_stream.Finish(Status::OK, tag(9));
379   server_ok(9);
380 
381   cli_stream->Finish(&recv_status, tag(10));
382   client_ok(10);
383 
384   EXPECT_EQ(send_response.message(), recv_response.message());
385   EXPECT_TRUE(recv_status.ok());
386 }
387 
TEST_F(GenericEnd2endTest,Deadline)388 TEST_F(GenericEnd2endTest, Deadline) {
389   ResetStub();
390   SendRpc(1, true,
391           gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
392                        gpr_time_from_seconds(10, GPR_TIMESPAN)));
393 }
394 
TEST_F(GenericEnd2endTest,ShortDeadline)395 TEST_F(GenericEnd2endTest, ShortDeadline) {
396   ResetStub();
397 
398   ClientContext cli_ctx;
399   EchoRequest request;
400   EchoResponse response;
401 
402   shutting_down_ = false;
403   std::thread driver([this] { DriveCompletionQueue(); });
404 
405   request.set_message("");
406   cli_ctx.set_deadline(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
407                                     gpr_time_from_micros(500, GPR_TIMESPAN)));
408   Status s = stub_->Echo(&cli_ctx, request, &response);
409   EXPECT_FALSE(s.ok());
410   {
411     std::lock_guard<std::mutex> lock(shutting_down_mu_);
412     shutting_down_ = true;
413   }
414   ShutDownServerAndCQs();
415   driver.join();
416 }
417 
418 }  // namespace
419 }  // namespace testing
420 }  // namespace grpc
421 
main(int argc,char ** argv)422 int main(int argc, char** argv) {
423   grpc::testing::TestEnvironment env(&argc, argv);
424   ::testing::InitGoogleTest(&argc, argv);
425   return RUN_ALL_TESTS();
426 }
427