• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22 
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 
34 #include "src/core/lib/gpr/env.h"
35 #include "src/core/lib/iomgr/port.h"
36 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41 #include "test/cpp/util/string_ref_helper.h"
42 
43 #include <gtest/gtest.h>
44 
45 using grpc::testing::EchoRequest;
46 using grpc::testing::EchoResponse;
47 
48 namespace grpc {
49 namespace testing {
50 
51 namespace {
52 
tag(int i)53 void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
detag(void * p)54 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
55 
56 class Verifier {
57  public:
Verifier()58   Verifier() {}
59 
60   // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)61   Verifier& Expect(int i, bool expect_ok) {
62     expectations_[tag(i)] = expect_ok;
63     return *this;
64   }
65 
66   // Next waits for 1 async tag to complete, checks its
67   // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)68   int Next(CompletionQueue* cq, bool ignore_ok) {
69     bool ok;
70     void* got_tag;
71     EXPECT_TRUE(cq->Next(&got_tag, &ok));
72     GotTag(got_tag, ok, ignore_ok);
73     return detag(got_tag);
74   }
75 
76   // Verify keeps calling Next until all currently set
77   // expected tags are complete
Verify(CompletionQueue * cq)78   void Verify(CompletionQueue* cq) {
79     GPR_ASSERT(!expectations_.empty());
80     while (!expectations_.empty()) {
81       Next(cq, false);
82     }
83   }
84 
85  private:
GotTag(void * got_tag,bool ok,bool ignore_ok)86   void GotTag(void* got_tag, bool ok, bool ignore_ok) {
87     auto it = expectations_.find(got_tag);
88     if (it != expectations_.end()) {
89       if (!ignore_ok) {
90         EXPECT_EQ(it->second, ok);
91       }
92       expectations_.erase(it);
93     }
94   }
95 
96   std::map<void*, bool> expectations_;
97 };
98 
99 class RawEnd2EndTest : public ::testing::Test {
100  protected:
RawEnd2EndTest()101   RawEnd2EndTest() {}
102 
SetUp()103   void SetUp() override {
104     port_ = grpc_pick_unused_port_or_die();
105     server_address_ << "localhost:" << port_;
106   }
107 
TearDown()108   void TearDown() override {
109     server_->Shutdown();
110     void* ignored_tag;
111     bool ignored_ok;
112     cq_->Shutdown();
113     while (cq_->Next(&ignored_tag, &ignored_ok))
114       ;
115     stub_.reset();
116     grpc_recycle_unused_port(port_);
117   }
118 
119   template <typename ServerType>
BuildAndStartServer()120   std::unique_ptr<ServerType> BuildAndStartServer() {
121     ServerBuilder builder;
122     builder.AddListeningPort(server_address_.str(),
123                              grpc::InsecureServerCredentials());
124     std::unique_ptr<ServerType> service(new ServerType());
125     builder.RegisterService(service.get());
126     cq_ = builder.AddCompletionQueue();
127     server_ = builder.BuildAndStart();
128     return service;
129   }
130 
ResetStub()131   void ResetStub() {
132     ChannelArguments args;
133     std::shared_ptr<Channel> channel = CreateChannel(
134         server_address_.str(), grpc::InsecureChannelCredentials());
135     stub_ = grpc::testing::EchoTestService::NewStub(channel);
136   }
137 
138   std::unique_ptr<ServerCompletionQueue> cq_;
139   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
140   std::unique_ptr<Server> server_;
141   std::ostringstream server_address_;
142   int port_;
143 
144   // For the client application to populate and send to server.
145   EchoRequest send_request_;
146   ::grpc::ByteBuffer send_request_buffer_;
147 
148   // For the server to give to gRPC to be populated by incoming request
149   // from client.
150   EchoRequest recv_request_;
151   ::grpc::ByteBuffer recv_request_buffer_;
152 
153   // For the server application to populate and send back to client.
154   EchoResponse send_response_;
155   ::grpc::ByteBuffer send_response_buffer_;
156 
157   // For the client to give to gRPC to be populated by incoming response
158   // from server.
159   EchoResponse recv_response_;
160   ::grpc::ByteBuffer recv_response_buffer_;
161   Status recv_status_;
162 
163   // Both sides need contexts
164   ClientContext cli_ctx_;
165   ServerContext srv_ctx_;
166 };
167 
168 // Regular Async, both peers use proto
TEST_F(RawEnd2EndTest,PureAsyncService)169 TEST_F(RawEnd2EndTest, PureAsyncService) {
170   typedef grpc::testing::EchoTestService::AsyncService SType;
171   ResetStub();
172   auto service = BuildAndStartServer<SType>();
173   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx_);
174 
175   send_request_.set_message("hello");
176   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
177       stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get()));
178   service->RequestEcho(&srv_ctx_, &recv_request_, &response_writer, cq_.get(),
179                        cq_.get(), tag(2));
180   response_reader->Finish(&recv_response_, &recv_status_, tag(4));
181   Verifier().Expect(2, true).Verify(cq_.get());
182   EXPECT_EQ(send_request_.message(), recv_request_.message());
183   send_response_.set_message(recv_request_.message());
184   response_writer.Finish(send_response_, Status::OK, tag(3));
185   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
186 
187   EXPECT_EQ(send_response_.message(), recv_response_.message());
188   EXPECT_TRUE(recv_status_.ok());
189 }
190 
191 // Client uses proto, server uses generic codegen, unary
TEST_F(RawEnd2EndTest,RawServerUnary)192 TEST_F(RawEnd2EndTest, RawServerUnary) {
193   typedef grpc::testing::EchoTestService::WithRawMethod_Echo<
194       grpc::testing::EchoTestService::Service>
195       SType;
196   ResetStub();
197   auto service = BuildAndStartServer<SType>();
198   grpc::GenericServerAsyncResponseWriter response_writer(&srv_ctx_);
199 
200   send_request_.set_message("hello unary");
201   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
202       stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get()));
203   service->RequestEcho(&srv_ctx_, &recv_request_buffer_, &response_writer,
204                        cq_.get(), cq_.get(), tag(2));
205   response_reader->Finish(&recv_response_, &recv_status_, tag(4));
206   Verifier().Expect(2, true).Verify(cq_.get());
207   EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
208   EXPECT_EQ(send_request_.message(), recv_request_.message());
209   send_response_.set_message(recv_request_.message());
210   EXPECT_TRUE(
211       SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
212   response_writer.Finish(send_response_buffer_, Status::OK, tag(3));
213   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
214 
215   EXPECT_EQ(send_response_.message(), recv_response_.message());
216   EXPECT_TRUE(recv_status_.ok());
217 }
218 
219 // Client uses proto, server uses generic codegen, client streaming
TEST_F(RawEnd2EndTest,RawServerClientStreaming)220 TEST_F(RawEnd2EndTest, RawServerClientStreaming) {
221   typedef grpc::testing::EchoTestService::WithRawMethod_RequestStream<
222       grpc::testing::EchoTestService::Service>
223       SType;
224   ResetStub();
225   auto service = BuildAndStartServer<SType>();
226 
227   grpc::GenericServerAsyncReader srv_stream(&srv_ctx_);
228 
229   send_request_.set_message("hello client streaming");
230   std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
231       stub_->AsyncRequestStream(&cli_ctx_, &recv_response_, cq_.get(), tag(1)));
232 
233   service->RequestRequestStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
234                                 tag(2));
235 
236   Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
237 
238   cli_stream->Write(send_request_, tag(3));
239   srv_stream.Read(&recv_request_buffer_, tag(4));
240   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
241   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
242   EXPECT_EQ(send_request_.message(), recv_request_.message());
243 
244   cli_stream->Write(send_request_, tag(5));
245   srv_stream.Read(&recv_request_buffer_, tag(6));
246   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
247 
248   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
249   EXPECT_EQ(send_request_.message(), recv_request_.message());
250   cli_stream->WritesDone(tag(7));
251   srv_stream.Read(&recv_request_buffer_, tag(8));
252   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
253 
254   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
255   send_response_.set_message(recv_request_.message());
256   SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_);
257   srv_stream.Finish(send_response_buffer_, Status::OK, tag(9));
258   cli_stream->Finish(&recv_status_, tag(10));
259   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
260 
261   EXPECT_EQ(send_response_.message(), recv_response_.message());
262   EXPECT_TRUE(recv_status_.ok());
263 }
264 
265 // Client uses proto, server uses generic codegen, server streaming
TEST_F(RawEnd2EndTest,RawServerServerStreaming)266 TEST_F(RawEnd2EndTest, RawServerServerStreaming) {
267   typedef grpc::testing::EchoTestService::WithRawMethod_ResponseStream<
268       grpc::testing::EchoTestService::Service>
269       SType;
270   ResetStub();
271   auto service = BuildAndStartServer<SType>();
272   grpc::GenericServerAsyncWriter srv_stream(&srv_ctx_);
273 
274   send_request_.set_message("hello server streaming");
275   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
276       stub_->AsyncResponseStream(&cli_ctx_, send_request_, cq_.get(), tag(1)));
277 
278   service->RequestResponseStream(&srv_ctx_, &recv_request_buffer_, &srv_stream,
279                                  cq_.get(), cq_.get(), tag(2));
280 
281   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
282   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
283   EXPECT_EQ(send_request_.message(), recv_request_.message());
284 
285   send_response_.set_message(recv_request_.message());
286   SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_);
287   srv_stream.Write(send_response_buffer_, tag(3));
288   cli_stream->Read(&recv_response_, tag(4));
289   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
290   EXPECT_EQ(send_response_.message(), recv_response_.message());
291 
292   srv_stream.Write(send_response_buffer_, tag(5));
293   cli_stream->Read(&recv_response_, tag(6));
294   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
295   EXPECT_EQ(send_response_.message(), recv_response_.message());
296 
297   srv_stream.Finish(Status::OK, tag(7));
298   cli_stream->Read(&recv_response_, tag(8));
299   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
300 
301   cli_stream->Finish(&recv_status_, tag(9));
302   Verifier().Expect(9, true).Verify(cq_.get());
303 
304   EXPECT_TRUE(recv_status_.ok());
305 }
306 
307 // Client uses proto, server uses generic codegen, bidi streaming
TEST_F(RawEnd2EndTest,RawServerBidiStreaming)308 TEST_F(RawEnd2EndTest, RawServerBidiStreaming) {
309   typedef grpc::testing::EchoTestService::WithRawMethod_BidiStream<
310       grpc::testing::EchoTestService::Service>
311       SType;
312   ResetStub();
313   auto service = BuildAndStartServer<SType>();
314 
315   grpc::GenericServerAsyncReaderWriter srv_stream(&srv_ctx_);
316 
317   send_request_.set_message("hello bidi streaming");
318   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
319       cli_stream(stub_->AsyncBidiStream(&cli_ctx_, cq_.get(), tag(1)));
320 
321   service->RequestBidiStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
322                              tag(2));
323 
324   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
325 
326   cli_stream->Write(send_request_, tag(3));
327   srv_stream.Read(&recv_request_buffer_, tag(4));
328   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
329   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
330   EXPECT_EQ(send_request_.message(), recv_request_.message());
331 
332   send_response_.set_message(recv_request_.message());
333   SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_);
334   srv_stream.Write(send_response_buffer_, tag(5));
335   cli_stream->Read(&recv_response_, tag(6));
336   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
337   EXPECT_EQ(send_response_.message(), recv_response_.message());
338 
339   cli_stream->WritesDone(tag(7));
340   srv_stream.Read(&recv_request_buffer_, tag(8));
341   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
342 
343   srv_stream.Finish(Status::OK, tag(9));
344   cli_stream->Finish(&recv_status_, tag(10));
345   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
346 
347   EXPECT_TRUE(recv_status_.ok());
348 }
349 
350 // Testing that this pattern compiles
TEST_F(RawEnd2EndTest,CompileTest)351 TEST_F(RawEnd2EndTest, CompileTest) {
352   typedef grpc::testing::EchoTestService::WithRawMethod_Echo<
353       grpc::testing::EchoTestService::AsyncService>
354       SType;
355   ResetStub();
356   auto service = BuildAndStartServer<SType>();
357 }
358 
359 }  // namespace
360 }  // namespace testing
361 }  // namespace grpc
362 
main(int argc,char ** argv)363 int main(int argc, char** argv) {
364   // Change the backup poll interval from 5s to 100ms to speed up the
365   // ReconnectChannel test
366   grpc_test_init(argc, argv);
367   ::testing::InitGoogleTest(&argc, argv);
368   int ret = RUN_ALL_TESTS();
369   return ret;
370 }
371