1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33
34 #include "src/core/lib/gpr/env.h"
35 #include "src/core/lib/iomgr/port.h"
36 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41 #include "test/cpp/util/string_ref_helper.h"
42
43 #include <gtest/gtest.h>
44
45 using grpc::testing::EchoRequest;
46 using grpc::testing::EchoResponse;
47
48 namespace grpc {
49 namespace testing {
50
51 namespace {
52
// Packs a small integer id into the opaque void* that the completion queue
// hands back when the associated operation finishes. The value is widened to
// intptr_t first so the pointer conversion is done at pointer width.
void* tag(int i) {
  return reinterpret_cast<void*>(static_cast<intptr_t>(i));
}
// Inverse of tag(): turns an opaque completion-queue tag back into the
// integer id it was created from.
int detag(void* p) {
  const intptr_t raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
55
56 class Verifier {
57 public:
Verifier()58 Verifier() {}
59
60 // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)61 Verifier& Expect(int i, bool expect_ok) {
62 expectations_[tag(i)] = expect_ok;
63 return *this;
64 }
65
66 // Next waits for 1 async tag to complete, checks its
67 // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)68 int Next(CompletionQueue* cq, bool ignore_ok) {
69 bool ok;
70 void* got_tag;
71 EXPECT_TRUE(cq->Next(&got_tag, &ok));
72 GotTag(got_tag, ok, ignore_ok);
73 return detag(got_tag);
74 }
75
76 // Verify keeps calling Next until all currently set
77 // expected tags are complete
Verify(CompletionQueue * cq)78 void Verify(CompletionQueue* cq) {
79 GPR_ASSERT(!expectations_.empty());
80 while (!expectations_.empty()) {
81 Next(cq, false);
82 }
83 }
84
85 private:
GotTag(void * got_tag,bool ok,bool ignore_ok)86 void GotTag(void* got_tag, bool ok, bool ignore_ok) {
87 auto it = expectations_.find(got_tag);
88 if (it != expectations_.end()) {
89 if (!ignore_ok) {
90 EXPECT_EQ(it->second, ok);
91 }
92 expectations_.erase(it);
93 }
94 }
95
96 std::map<void*, bool> expectations_;
97 };
98
99 class RawEnd2EndTest : public ::testing::Test {
100 protected:
RawEnd2EndTest()101 RawEnd2EndTest() {}
102
SetUp()103 void SetUp() override {
104 port_ = grpc_pick_unused_port_or_die();
105 server_address_ << "localhost:" << port_;
106 }
107
TearDown()108 void TearDown() override {
109 server_->Shutdown();
110 void* ignored_tag;
111 bool ignored_ok;
112 cq_->Shutdown();
113 while (cq_->Next(&ignored_tag, &ignored_ok))
114 ;
115 stub_.reset();
116 grpc_recycle_unused_port(port_);
117 }
118
119 template <typename ServerType>
BuildAndStartServer()120 std::unique_ptr<ServerType> BuildAndStartServer() {
121 ServerBuilder builder;
122 builder.AddListeningPort(server_address_.str(),
123 grpc::InsecureServerCredentials());
124 std::unique_ptr<ServerType> service(new ServerType());
125 builder.RegisterService(service.get());
126 cq_ = builder.AddCompletionQueue();
127 server_ = builder.BuildAndStart();
128 return service;
129 }
130
ResetStub()131 void ResetStub() {
132 ChannelArguments args;
133 std::shared_ptr<Channel> channel = CreateChannel(
134 server_address_.str(), grpc::InsecureChannelCredentials());
135 stub_ = grpc::testing::EchoTestService::NewStub(channel);
136 }
137
138 std::unique_ptr<ServerCompletionQueue> cq_;
139 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
140 std::unique_ptr<Server> server_;
141 std::ostringstream server_address_;
142 int port_;
143
144 // For the client application to populate and send to server.
145 EchoRequest send_request_;
146 ::grpc::ByteBuffer send_request_buffer_;
147
148 // For the server to give to gRPC to be populated by incoming request
149 // from client.
150 EchoRequest recv_request_;
151 ::grpc::ByteBuffer recv_request_buffer_;
152
153 // For the server application to populate and send back to client.
154 EchoResponse send_response_;
155 ::grpc::ByteBuffer send_response_buffer_;
156
157 // For the client to give to gRPC to be populated by incoming response
158 // from server.
159 EchoResponse recv_response_;
160 ::grpc::ByteBuffer recv_response_buffer_;
161 Status recv_status_;
162
163 // Both sides need contexts
164 ClientContext cli_ctx_;
165 ServerContext srv_ctx_;
166 };
167
168 // Regular Async, both peers use proto
TEST_F(RawEnd2EndTest, PureAsyncService) {
  // Baseline unary round trip through the ordinary async service, with
  // protobuf messages on both the client and the server side.
  using AsyncEchoService = grpc::testing::EchoTestService::AsyncService;
  ResetStub();
  auto svc = BuildAndStartServer<AsyncEchoService>();
  grpc::ServerAsyncResponseWriter<EchoResponse> responder(&srv_ctx_);

  send_request_.set_message("hello");

  // Client starts the call, the server asks for an incoming Echo (tag 2) and
  // the client queues its Finish (tag 4) right away.
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> call(
      stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get()));
  svc->RequestEcho(&srv_ctx_, &recv_request_, &responder, cq_.get(), cq_.get(),
                   tag(2));
  call->Finish(&recv_response_, &recv_status_, tag(4));

  // Wait for the request to reach the server.
  Verifier request_arrived;
  request_arrived.Expect(2, true);
  request_arrived.Verify(cq_.get());
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Echo the message back (tag 3) and wait for both sides to finish.
  send_response_.set_message(recv_request_.message());
  responder.Finish(send_response_, Status::OK, tag(3));
  Verifier call_finished;
  call_finished.Expect(3, true);
  call_finished.Expect(4, true);
  call_finished.Verify(cq_.get());

  EXPECT_EQ(send_response_.message(), recv_response_.message());
  EXPECT_TRUE(recv_status_.ok());
}
190
191 // Client uses proto, server uses generic codegen, unary
TEST_F(RawEnd2EndTest, RawServerUnary) {
  // The client speaks protobuf; the server handles Echo as a raw method and
  // therefore sees and produces ByteBuffers.
  using RawEchoService = grpc::testing::EchoTestService::WithRawMethod_Echo<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto svc = BuildAndStartServer<RawEchoService>();
  grpc::GenericServerAsyncResponseWriter responder(&srv_ctx_);

  send_request_.set_message("hello unary");

  // Client starts the call, the server asks for an incoming Echo (tag 2) and
  // the client queues its Finish (tag 4) right away.
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> call(
      stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get()));
  svc->RequestEcho(&srv_ctx_, &recv_request_buffer_, &responder, cq_.get(),
                   cq_.get(), tag(2));
  call->Finish(&recv_response_, &recv_status_, tag(4));

  // Wait for the request bytes to reach the server, then decode them.
  Verifier request_arrived;
  request_arrived.Expect(2, true);
  request_arrived.Verify(cq_.get());
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Encode the echo by hand and finish the call from the server (tag 3).
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  responder.Finish(send_response_buffer_, Status::OK, tag(3));
  Verifier call_finished;
  call_finished.Expect(3, true);
  call_finished.Expect(4, true);
  call_finished.Verify(cq_.get());

  EXPECT_EQ(send_response_.message(), recv_response_.message());
  EXPECT_TRUE(recv_status_.ok());
}
218
219 // Client uses proto, server uses generic codegen, client streaming
TEST_F(RawEnd2EndTest, RawServerClientStreaming) {
  // The client streams protobuf requests; the server handles RequestStream as
  // a raw method and therefore reads and writes ByteBuffers.
  using SType = grpc::testing::EchoTestService::WithRawMethod_RequestStream<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<SType>();

  grpc::GenericServerAsyncReader srv_stream(&srv_ctx_);

  send_request_.set_message("hello client streaming");
  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
      stub_->AsyncRequestStream(&cli_ctx_, &recv_response_, cq_.get(), tag(1)));

  service->RequestRequestStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
                                tag(2));

  // Call established on both sides.
  Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());

  // First message. The conversion helpers report success through their
  // return value; check it, as RawServerUnary does, so that a failed
  // (de)serialisation is reported directly instead of surfacing later as a
  // confusing message mismatch.
  cli_stream->Write(send_request_, tag(3));
  srv_stream.Read(&recv_request_buffer_, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Second message.
  cli_stream->Write(send_request_, tag(5));
  srv_stream.Read(&recv_request_buffer_, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Client half-closes; the server's next read is expected to complete with
  // ok == false.
  cli_stream->WritesDone(tag(7));
  srv_stream.Read(&recv_request_buffer_, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  // NOTE(review): this parse follows the read that completed with
  // ok == false, so what the buffer holds at this point is not evident from
  // this file; its result is deliberately left unchecked, exactly as before.
  ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  srv_stream.Finish(send_response_buffer_, Status::OK, tag(9));
  cli_stream->Finish(&recv_status_, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());

  EXPECT_EQ(send_response_.message(), recv_response_.message());
  EXPECT_TRUE(recv_status_.ok());
}
264
265 // Client uses proto, server uses generic codegen, server streaming
TEST_F(RawEnd2EndTest, RawServerServerStreaming) {
  // The client reads protobuf responses; the server handles ResponseStream as
  // a raw method and therefore reads and writes ByteBuffers.
  using SType = grpc::testing::EchoTestService::WithRawMethod_ResponseStream<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<SType>();
  grpc::GenericServerAsyncWriter srv_stream(&srv_ctx_);

  send_request_.set_message("hello server streaming");
  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
      stub_->AsyncResponseStream(&cli_ctx_, send_request_, cq_.get(), tag(1)));

  service->RequestResponseStream(&srv_ctx_, &recv_request_buffer_, &srv_stream,
                                 cq_.get(), cq_.get(), tag(2));

  // Call established on both sides; the request bytes are now available. The
  // conversion helpers report success through their return value; check it,
  // as RawServerUnary does, so a failed (de)serialisation is reported
  // directly.
  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // First response.
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  srv_stream.Write(send_response_buffer_, tag(3));
  cli_stream->Read(&recv_response_, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  EXPECT_EQ(send_response_.message(), recv_response_.message());

  // Second response, reusing the same serialised buffer.
  srv_stream.Write(send_response_buffer_, tag(5));
  cli_stream->Read(&recv_response_, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
  EXPECT_EQ(send_response_.message(), recv_response_.message());

  // Server finishes; the client's next read is expected to complete with
  // ok == false.
  srv_stream.Finish(Status::OK, tag(7));
  cli_stream->Read(&recv_response_, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  cli_stream->Finish(&recv_status_, tag(9));
  Verifier().Expect(9, true).Verify(cq_.get());

  EXPECT_TRUE(recv_status_.ok());
}
306
307 // Client uses proto, server uses generic codegen, bidi streaming
TEST_F(RawEnd2EndTest, RawServerBidiStreaming) {
  // The client exchanges protobuf messages; the server handles BidiStream as
  // a raw method and therefore reads and writes ByteBuffers.
  using SType = grpc::testing::EchoTestService::WithRawMethod_BidiStream<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<SType>();

  grpc::GenericServerAsyncReaderWriter srv_stream(&srv_ctx_);

  send_request_.set_message("hello bidi streaming");
  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
      cli_stream(stub_->AsyncBidiStream(&cli_ctx_, cq_.get(), tag(1)));

  service->RequestBidiStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
                             tag(2));

  // Call established on both sides.
  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());

  // Client -> server message. The conversion helpers report success through
  // their return value; check it, as RawServerUnary does, so a failed
  // (de)serialisation is reported directly.
  cli_stream->Write(send_request_, tag(3));
  srv_stream.Read(&recv_request_buffer_, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Server -> client echo.
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  srv_stream.Write(send_response_buffer_, tag(5));
  cli_stream->Read(&recv_response_, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
  EXPECT_EQ(send_response_.message(), recv_response_.message());

  // Client half-closes; the server's next read is expected to complete with
  // ok == false.
  cli_stream->WritesDone(tag(7));
  srv_stream.Read(&recv_request_buffer_, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  srv_stream.Finish(Status::OK, tag(9));
  cli_stream->Finish(&recv_status_, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());

  EXPECT_TRUE(recv_status_.ok());
}
349
350 // Testing that this pattern compiles
TEST_F(RawEnd2EndTest, CompileTest) {
  // No assertions on purpose: this only needs a raw Echo method layered on
  // top of AsyncService to instantiate and go through BuildAndStartServer().
  using RawEchoOverAsync = grpc::testing::EchoTestService::WithRawMethod_Echo<
      grpc::testing::EchoTestService::AsyncService>;
  ResetStub();
  auto started_service = BuildAndStartServer<RawEchoOverAsync>();
}
358
359 } // namespace
360 } // namespace testing
361 } // namespace grpc
362
main(int argc,char ** argv)363 int main(int argc, char** argv) {
364 // Change the backup poll interval from 5s to 100ms to speed up the
365 // ReconnectChannel test
366 grpc_test_init(argc, argv);
367 ::testing::InitGoogleTest(&argc, argv);
368 int ret = RUN_ALL_TESTS();
369 return ret;
370 }
371