1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/create_channel.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27
28 #include "src/core/lib/gpr/tls.h"
29 #include "src/core/lib/iomgr/port.h"
30 #include "src/proto/grpc/testing/echo.grpc.pb.h"
31 #include "test/core/util/port.h"
32 #include "test/core/util/test_config.h"
33
34 #ifdef GRPC_POSIX_SOCKET
35 #include "src/core/lib/iomgr/ev_posix.h"
36 #endif // GRPC_POSIX_SOCKET
37
38 #include <gtest/gtest.h>
39
40 #ifdef GRPC_POSIX_SOCKET
41 // Thread-local variable to so that only polls from this test assert
42 // non-blocking (not polls from resolver, timer thread, etc), and only when the
43 // thread is waiting on polls caused by CompletionQueue::AsyncNext (not for
44 // picking a port or other reasons).
45 GPR_TLS_DECL(g_is_nonblocking_poll);
46
47 namespace {
48
maybe_assert_non_blocking_poll(struct pollfd * pfds,nfds_t nfds,int timeout)49 int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
50 int timeout) {
51 // Only assert that this poll should have zero timeout if we're in the
52 // middle of a zero-timeout CQ Next.
53 if (gpr_tls_get(&g_is_nonblocking_poll)) {
54 GPR_ASSERT(timeout == 0);
55 }
56 return poll(pfds, nfds, timeout);
57 }
58
59 } // namespace
60
61 namespace grpc {
62 namespace testing {
63 namespace {
64
tag(int i)65 void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
detag(void * p)66 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
67
68 class NonblockingTest : public ::testing::Test {
69 protected:
NonblockingTest()70 NonblockingTest() {}
71
SetUp()72 void SetUp() override {
73 port_ = grpc_pick_unused_port_or_die();
74 server_address_ << "localhost:" << port_;
75
76 // Setup server
77 BuildAndStartServer();
78 }
79
LoopForTag(void ** tag,bool * ok)80 bool LoopForTag(void** tag, bool* ok) {
81 // Temporarily set the thread-local nonblocking poll flag so that the polls
82 // caused by this loop are indeed sent by the library with zero timeout.
83 intptr_t orig_val = gpr_tls_get(&g_is_nonblocking_poll);
84 gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(true));
85 for (;;) {
86 auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));
87 if (r == CompletionQueue::SHUTDOWN) {
88 gpr_tls_set(&g_is_nonblocking_poll, orig_val);
89 return false;
90 } else if (r == CompletionQueue::GOT_EVENT) {
91 gpr_tls_set(&g_is_nonblocking_poll, orig_val);
92 return true;
93 }
94 }
95 }
96
TearDown()97 void TearDown() override {
98 server_->Shutdown();
99 void* ignored_tag;
100 bool ignored_ok;
101 cq_->Shutdown();
102 while (LoopForTag(&ignored_tag, &ignored_ok))
103 ;
104 stub_.reset();
105 grpc_recycle_unused_port(port_);
106 }
107
BuildAndStartServer()108 void BuildAndStartServer() {
109 ServerBuilder builder;
110 builder.AddListeningPort(server_address_.str(),
111 grpc::InsecureServerCredentials());
112 service_.reset(new grpc::testing::EchoTestService::AsyncService());
113 builder.RegisterService(service_.get());
114 cq_ = builder.AddCompletionQueue();
115 server_ = builder.BuildAndStart();
116 }
117
ResetStub()118 void ResetStub() {
119 std::shared_ptr<Channel> channel = grpc::CreateChannel(
120 server_address_.str(), grpc::InsecureChannelCredentials());
121 stub_ = grpc::testing::EchoTestService::NewStub(channel);
122 }
123
SendRpc(int num_rpcs)124 void SendRpc(int num_rpcs) {
125 for (int i = 0; i < num_rpcs; i++) {
126 EchoRequest send_request;
127 EchoRequest recv_request;
128 EchoResponse send_response;
129 EchoResponse recv_response;
130 Status recv_status;
131
132 ClientContext cli_ctx;
133 ServerContext srv_ctx;
134 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
135
136 send_request.set_message("hello non-blocking world");
137 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
138 stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
139
140 response_reader->StartCall();
141 response_reader->Finish(&recv_response, &recv_status, tag(4));
142
143 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
144 cq_.get(), cq_.get(), tag(2));
145
146 void* got_tag;
147 bool ok;
148 EXPECT_TRUE(LoopForTag(&got_tag, &ok));
149 EXPECT_TRUE(ok);
150 EXPECT_EQ(detag(got_tag), 2);
151 EXPECT_EQ(send_request.message(), recv_request.message());
152
153 send_response.set_message(recv_request.message());
154 response_writer.Finish(send_response, Status::OK, tag(3));
155
156 int tagsum = 0;
157 int tagprod = 1;
158 EXPECT_TRUE(LoopForTag(&got_tag, &ok));
159 EXPECT_TRUE(ok);
160 tagsum += detag(got_tag);
161 tagprod *= detag(got_tag);
162
163 EXPECT_TRUE(LoopForTag(&got_tag, &ok));
164 EXPECT_TRUE(ok);
165 tagsum += detag(got_tag);
166 tagprod *= detag(got_tag);
167
168 EXPECT_EQ(tagsum, 7);
169 EXPECT_EQ(tagprod, 12);
170 EXPECT_EQ(send_response.message(), recv_response.message());
171 EXPECT_TRUE(recv_status.ok());
172 }
173 }
174
175 std::unique_ptr<ServerCompletionQueue> cq_;
176 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
177 std::unique_ptr<Server> server_;
178 std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
179 std::ostringstream server_address_;
180 int port_;
181 };
182
TEST_F(NonblockingTest,SimpleRpc)183 TEST_F(NonblockingTest, SimpleRpc) {
184 ResetStub();
185 SendRpc(10);
186 }
187
188 } // namespace
189 } // namespace testing
190 } // namespace grpc
191
192 #endif // GRPC_POSIX_SOCKET
193
main(int argc,char ** argv)194 int main(int argc, char** argv) {
195 #ifdef GRPC_POSIX_SOCKET
196 // Override the poll function before anything else can happen
197 grpc_poll_function = maybe_assert_non_blocking_poll;
198
199 grpc::testing::TestEnvironment env(argc, argv);
200 ::testing::InitGoogleTest(&argc, argv);
201 gpr_tls_init(&g_is_nonblocking_poll);
202
203 // Start the nonblocking poll thread-local variable as false because the
204 // thread that issues RPCs starts by picking a port (which has non-zero
205 // timeout).
206 gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(false));
207
208 int ret = RUN_ALL_TESTS();
209 gpr_tls_destroy(&g_is_nonblocking_poll);
210 return ret;
211 #else // GRPC_POSIX_SOCKET
212 return 0;
213 #endif // GRPC_POSIX_SOCKET
214 }
215