1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET_UDP_SERVER
23
24 #include "src/core/lib/iomgr/udp_server.h"
25
26 #include <netinet/in.h>
27 #include <string.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 #include <vector>
32
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37 #include <grpc/support/time.h>
38
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/memory.h"
42 #include "src/core/lib/iomgr/ev_posix.h"
43 #include "src/core/lib/iomgr/iomgr.h"
44 #include "src/core/lib/iomgr/socket_factory_posix.h"
45 #include "src/core/lib/iomgr/socket_utils_posix.h"
46 #include "test/core/util/test_config.h"
47
48 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
49
50 static grpc_pollset* g_pollset;
51 static gpr_mu* g_mu;
52 static int g_number_of_reads = 0;
53 static int g_number_of_writes = 0;
54 static int g_number_of_bytes_read = 0;
55 static int g_number_of_orphan_calls = 0;
56 static int g_number_of_starts = 0;
57
58 int rcv_buf_size = 1024;
59 int snd_buf_size = 1024;
60
61 static int g_num_listeners = 1;
62
63 class TestGrpcUdpHandler : public GrpcUdpHandler {
64 public:
TestGrpcUdpHandler(grpc_fd * emfd,void * user_data)65 TestGrpcUdpHandler(grpc_fd* emfd, void* user_data)
66 : GrpcUdpHandler(emfd, user_data), emfd_(emfd) {
67 g_number_of_starts++;
68 }
~TestGrpcUdpHandler()69 ~TestGrpcUdpHandler() override {}
70
71 protected:
Read()72 bool Read() override {
73 char read_buffer[512];
74 ssize_t byte_count;
75
76 gpr_mu_lock(g_mu);
77 byte_count =
78 recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0);
79
80 g_number_of_reads++;
81 g_number_of_bytes_read += static_cast<int>(byte_count);
82
83 gpr_log(GPR_DEBUG, "receive %zu on handler %p", byte_count, this);
84 GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
85 grpc_pollset_kick(g_pollset, nullptr)));
86 gpr_mu_unlock(g_mu);
87 return false;
88 }
89
OnCanWrite(void *,grpc_closure *)90 void OnCanWrite(void* /*user_data*/,
91 grpc_closure* /*notify_on_write_closure*/) override {
92 gpr_mu_lock(g_mu);
93 g_number_of_writes++;
94
95 GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
96 grpc_pollset_kick(g_pollset, nullptr)));
97 gpr_mu_unlock(g_mu);
98 }
99
OnFdAboutToOrphan(grpc_closure * orphan_fd_closure,void *)100 void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
101 void* /*user_data*/) override {
102 gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
103 grpc_fd_wrapped_fd(emfd()));
104 grpc_core::ExecCtx::Run(DEBUG_LOCATION, orphan_fd_closure, GRPC_ERROR_NONE);
105 g_number_of_orphan_calls++;
106 }
107
emfd()108 grpc_fd* emfd() { return emfd_; }
109
110 private:
111 grpc_fd* emfd_;
112 };
113
114 class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory {
115 public:
CreateUdpHandler(grpc_fd * emfd,void * user_data)116 GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override {
117 gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd));
118 return new TestGrpcUdpHandler(emfd, user_data);
119 }
120
DestroyUdpHandler(GrpcUdpHandler * handler)121 void DestroyUdpHandler(GrpcUdpHandler* handler) override {
122 gpr_log(GPR_INFO, "Destroy handler");
123 delete reinterpret_cast<TestGrpcUdpHandler*>(handler);
124 }
125 };
126
127 TestGrpcUdpHandlerFactory handler_factory;
128
129 struct test_socket_factory {
130 grpc_socket_factory base;
131 int number_of_socket_calls;
132 int number_of_bind_calls;
133 };
134 typedef struct test_socket_factory test_socket_factory;
135
test_socket_factory_socket(grpc_socket_factory * factory,int domain,int type,int protocol)136 static int test_socket_factory_socket(grpc_socket_factory* factory, int domain,
137 int type, int protocol) {
138 test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
139 f->number_of_socket_calls++;
140 return socket(domain, type, protocol);
141 }
142
test_socket_factory_bind(grpc_socket_factory * factory,int sockfd,const grpc_resolved_address * addr)143 static int test_socket_factory_bind(grpc_socket_factory* factory, int sockfd,
144 const grpc_resolved_address* addr) {
145 test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
146 f->number_of_bind_calls++;
147 return bind(sockfd,
148 reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr)),
149 static_cast<socklen_t>(addr->len));
150 }
151
test_socket_factory_compare(grpc_socket_factory * a,grpc_socket_factory * b)152 static int test_socket_factory_compare(grpc_socket_factory* a,
153 grpc_socket_factory* b) {
154 return GPR_ICMP(a, b);
155 }
156
test_socket_factory_destroy(grpc_socket_factory * factory)157 static void test_socket_factory_destroy(grpc_socket_factory* factory) {
158 test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
159 gpr_free(f);
160 }
161
162 static const grpc_socket_factory_vtable test_socket_factory_vtable = {
163 test_socket_factory_socket, test_socket_factory_bind,
164 test_socket_factory_compare, test_socket_factory_destroy};
165
test_socket_factory_create(void)166 static test_socket_factory* test_socket_factory_create(void) {
167 test_socket_factory* factory = static_cast<test_socket_factory*>(
168 gpr_malloc(sizeof(test_socket_factory)));
169 grpc_socket_factory_init(&factory->base, &test_socket_factory_vtable);
170 factory->number_of_socket_calls = 0;
171 factory->number_of_bind_calls = 0;
172 return factory;
173 }
174
destroy_pollset(void * p,grpc_error *)175 static void destroy_pollset(void* p, grpc_error* /*error*/) {
176 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
177 }
178
shutdown_and_destroy_pollset()179 static void shutdown_and_destroy_pollset() {
180 gpr_mu_lock(g_mu);
181 auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset,
182 grpc_schedule_on_exec_ctx);
183 grpc_pollset_shutdown(g_pollset, closure);
184 gpr_mu_unlock(g_mu);
185 /* Flush exec_ctx to run |destroyed| */
186 grpc_core::ExecCtx::Get()->Flush();
187 }
188
test_no_op(void)189 static void test_no_op(void) {
190 grpc_pollset_init(g_pollset, &g_mu);
191 grpc_core::ExecCtx exec_ctx;
192 grpc_udp_server* s = grpc_udp_server_create(nullptr);
193 LOG_TEST("test_no_op");
194 grpc_udp_server_destroy(s, nullptr);
195 shutdown_and_destroy_pollset();
196 }
197
test_no_op_with_start(void)198 static void test_no_op_with_start(void) {
199 grpc_pollset_init(g_pollset, &g_mu);
200 grpc_core::ExecCtx exec_ctx;
201 grpc_udp_server* s = grpc_udp_server_create(nullptr);
202 LOG_TEST("test_no_op_with_start");
203 std::vector<grpc_pollset*> empty_pollset;
204 grpc_udp_server_start(s, &empty_pollset, nullptr);
205 grpc_udp_server_destroy(s, nullptr);
206 shutdown_and_destroy_pollset();
207 }
208
test_no_op_with_port(void)209 static void test_no_op_with_port(void) {
210 grpc_pollset_init(g_pollset, &g_mu);
211 g_number_of_orphan_calls = 0;
212 grpc_core::ExecCtx exec_ctx;
213 grpc_resolved_address resolved_addr;
214 struct sockaddr_in* addr =
215 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
216 grpc_udp_server* s = grpc_udp_server_create(nullptr);
217 LOG_TEST("test_no_op_with_port");
218
219 memset(&resolved_addr, 0, sizeof(resolved_addr));
220 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
221 addr->sin_family = AF_INET;
222 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
223 snd_buf_size, &handler_factory,
224 g_num_listeners) > 0);
225
226 grpc_udp_server_destroy(s, nullptr);
227
228 /* The server haven't start listening, so no udp handler to be notified. */
229 GPR_ASSERT(g_number_of_orphan_calls == 0);
230 shutdown_and_destroy_pollset();
231 }
232
test_no_op_with_port_and_socket_factory(void)233 static void test_no_op_with_port_and_socket_factory(void) {
234 grpc_pollset_init(g_pollset, &g_mu);
235 g_number_of_orphan_calls = 0;
236 grpc_core::ExecCtx exec_ctx;
237 grpc_resolved_address resolved_addr;
238 struct sockaddr_in* addr =
239 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
240
241 test_socket_factory* socket_factory = test_socket_factory_create();
242 grpc_arg socket_factory_arg =
243 grpc_socket_factory_to_arg(&socket_factory->base);
244 grpc_channel_args* channel_args =
245 grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1);
246 grpc_udp_server* s = grpc_udp_server_create(channel_args);
247 grpc_channel_args_destroy(channel_args);
248
249 LOG_TEST("test_no_op_with_port_and_socket_factory");
250
251 memset(&resolved_addr, 0, sizeof(resolved_addr));
252 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
253 addr->sin_family = AF_INET;
254 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
255 snd_buf_size, &handler_factory,
256 g_num_listeners) > 0);
257 GPR_ASSERT(socket_factory->number_of_socket_calls == g_num_listeners);
258 GPR_ASSERT(socket_factory->number_of_bind_calls == g_num_listeners);
259
260 grpc_udp_server_destroy(s, nullptr);
261
262 grpc_socket_factory_unref(&socket_factory->base);
263
264 /* The server haven't start listening, so no udp handler to be notified. */
265 GPR_ASSERT(g_number_of_orphan_calls == 0);
266 shutdown_and_destroy_pollset();
267 }
268
test_no_op_with_port_and_start(void)269 static void test_no_op_with_port_and_start(void) {
270 grpc_pollset_init(g_pollset, &g_mu);
271 g_number_of_orphan_calls = 0;
272 grpc_core::ExecCtx exec_ctx;
273 grpc_resolved_address resolved_addr;
274 struct sockaddr_in* addr =
275 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
276 grpc_udp_server* s = grpc_udp_server_create(nullptr);
277 LOG_TEST("test_no_op_with_port_and_start");
278
279 memset(&resolved_addr, 0, sizeof(resolved_addr));
280 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
281 addr->sin_family = AF_INET;
282 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
283 snd_buf_size, &handler_factory,
284 g_num_listeners) > 0);
285
286 std::vector<grpc_pollset*> empty_pollset;
287 grpc_udp_server_start(s, &empty_pollset, nullptr);
288 GPR_ASSERT(g_number_of_starts == g_num_listeners);
289 grpc_udp_server_destroy(s, nullptr);
290
291 /* The server had a single FD, which is orphaned exactly once in *
292 * grpc_udp_server_destroy. */
293 GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
294 shutdown_and_destroy_pollset();
295 }
296
test_receive(int number_of_clients)297 static void test_receive(int number_of_clients) {
298 grpc_pollset_init(g_pollset, &g_mu);
299 grpc_core::ExecCtx exec_ctx;
300 grpc_resolved_address resolved_addr;
301 struct sockaddr_storage* addr =
302 reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
303 int clifd, svrfd;
304 grpc_udp_server* s = grpc_udp_server_create(nullptr);
305 int i;
306 grpc_millis deadline;
307 LOG_TEST("test_receive");
308 gpr_log(GPR_INFO, "clients=%d", number_of_clients);
309
310 g_number_of_bytes_read = 0;
311 g_number_of_orphan_calls = 0;
312
313 memset(&resolved_addr, 0, sizeof(resolved_addr));
314 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
315 addr->ss_family = AF_INET;
316 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
317 snd_buf_size, &handler_factory,
318 g_num_listeners) > 0);
319
320 svrfd = grpc_udp_server_get_fd(s, 0);
321 GPR_ASSERT(svrfd >= 0);
322 GPR_ASSERT(getsockname(svrfd, (struct sockaddr*)addr,
323 (socklen_t*)&resolved_addr.len) == 0);
324 GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
325
326 std::vector<grpc_pollset*> test_pollsets;
327 test_pollsets.emplace_back(g_pollset);
328 grpc_udp_server_start(s, &test_pollsets, nullptr);
329
330 gpr_mu_lock(g_mu);
331
332 for (i = 0; i < number_of_clients; i++) {
333 deadline =
334 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
335
336 int number_of_bytes_read_before = g_number_of_bytes_read;
337 /* Create a socket, send a packet to the UDP server. */
338 clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
339 GPR_ASSERT(clifd >= 0);
340 GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
341 (socklen_t)resolved_addr.len) == 0);
342 GPR_ASSERT(5 == write(clifd, "hello", 5));
343 while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
344 deadline > grpc_core::ExecCtx::Get()->Now()) {
345 grpc_pollset_worker* worker = nullptr;
346 GPR_ASSERT(GRPC_LOG_IF_ERROR(
347 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
348 gpr_mu_unlock(g_mu);
349 grpc_core::ExecCtx::Get()->Flush();
350 gpr_mu_lock(g_mu);
351 }
352 close(clifd);
353 }
354 GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
355
356 gpr_mu_unlock(g_mu);
357
358 grpc_udp_server_destroy(s, nullptr);
359
360 /* The server had a single FD, which is orphaned exactly once in *
361 * grpc_udp_server_destroy. */
362 GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
363 shutdown_and_destroy_pollset();
364 }
365
main(int argc,char ** argv)366 int main(int argc, char** argv) {
367 grpc::testing::TestEnvironment env(argc, argv);
368 grpc_init();
369 if (grpc_is_socket_reuse_port_supported()) {
370 g_num_listeners = 10;
371 }
372 {
373 grpc_core::ExecCtx exec_ctx;
374 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
375
376 test_no_op();
377 test_no_op_with_start();
378 test_no_op_with_port();
379 test_no_op_with_port_and_socket_factory();
380 test_no_op_with_port_and_start();
381 test_receive(1);
382 test_receive(10);
383
384 gpr_free(g_pollset);
385 }
386 grpc_shutdown();
387 return 0;
388 }
389
390 #else /* GRPC_POSIX_SOCKET_UDP_SERVER */
391
main(int argc,char ** argv)392 int main(int argc, char** argv) { return 1; }
393
394 #endif /* GRPC_POSIX_SOCKET_UDP_SERVER */
395