1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23
24 #include "src/core/lib/iomgr/udp_server.h"
25
26 #include <netinet/in.h>
27 #include <string.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/sync.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/gprpp/memory.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/iomgr.h"
42 #include "src/core/lib/iomgr/socket_factory_posix.h"
43 #include "src/core/lib/iomgr/socket_utils_posix.h"
44 #include "test/core/util/test_config.h"
45
/* Logs the stringified form of its argument at INFO level. The argument is
 * stringified with '#', so passing a string literal logs it including its
 * surrounding double quotes. */
#define LOG_TEST(test_name) gpr_log(GPR_INFO, "%s", #test_name)
47
48 static grpc_pollset* g_pollset;
49 static gpr_mu* g_mu;
50 static int g_number_of_reads = 0;
51 static int g_number_of_writes = 0;
52 static int g_number_of_bytes_read = 0;
53 static int g_number_of_orphan_calls = 0;
54 static int g_number_of_starts = 0;
55
56 int rcv_buf_size = 1024;
57 int snd_buf_size = 1024;
58
59 static int g_num_listeners = 1;
60
61 class TestGrpcUdpHandler : public GrpcUdpHandler {
62 public:
TestGrpcUdpHandler(grpc_fd * emfd,void * user_data)63 TestGrpcUdpHandler(grpc_fd* emfd, void* user_data)
64 : GrpcUdpHandler(emfd, user_data), emfd_(emfd) {
65 g_number_of_starts++;
66 }
~TestGrpcUdpHandler()67 ~TestGrpcUdpHandler() override {}
68
69 protected:
Read()70 bool Read() override {
71 char read_buffer[512];
72 ssize_t byte_count;
73
74 gpr_mu_lock(g_mu);
75 byte_count =
76 recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0);
77
78 g_number_of_reads++;
79 g_number_of_bytes_read += static_cast<int>(byte_count);
80
81 gpr_log(GPR_DEBUG, "receive %zu on handler %p", byte_count, this);
82 GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
83 grpc_pollset_kick(g_pollset, nullptr)));
84 gpr_mu_unlock(g_mu);
85 return false;
86 }
87
OnCanWrite(void * user_data,grpc_closure * notify_on_write_closure)88 void OnCanWrite(void* user_data,
89 grpc_closure* notify_on_write_closure) override {
90 gpr_mu_lock(g_mu);
91 g_number_of_writes++;
92
93 GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
94 grpc_pollset_kick(g_pollset, nullptr)));
95 gpr_mu_unlock(g_mu);
96 }
97
OnFdAboutToOrphan(grpc_closure * orphan_fd_closure,void * user_data)98 void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
99 void* user_data) override {
100 gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
101 grpc_fd_wrapped_fd(emfd()));
102 GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE);
103 g_number_of_orphan_calls++;
104 }
105
emfd()106 grpc_fd* emfd() { return emfd_; }
107
108 private:
109 grpc_fd* emfd_;
110 };
111
112 class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory {
113 public:
CreateUdpHandler(grpc_fd * emfd,void * user_data)114 GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override {
115 gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd));
116 return grpc_core::New<TestGrpcUdpHandler>(emfd, user_data);
117 }
118
DestroyUdpHandler(GrpcUdpHandler * handler)119 void DestroyUdpHandler(GrpcUdpHandler* handler) override {
120 gpr_log(GPR_INFO, "Destroy handler");
121 grpc_core::Delete(reinterpret_cast<TestGrpcUdpHandler*>(handler));
122 }
123 };
124
125 TestGrpcUdpHandlerFactory handler_factory;
126
127 struct test_socket_factory {
128 grpc_socket_factory base;
129 int number_of_socket_calls;
130 int number_of_bind_calls;
131 };
132 typedef struct test_socket_factory test_socket_factory;
133
test_socket_factory_socket(grpc_socket_factory * factory,int domain,int type,int protocol)134 static int test_socket_factory_socket(grpc_socket_factory* factory, int domain,
135 int type, int protocol) {
136 test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
137 f->number_of_socket_calls++;
138 return socket(domain, type, protocol);
139 }
140
test_socket_factory_bind(grpc_socket_factory * factory,int sockfd,const grpc_resolved_address * addr)141 static int test_socket_factory_bind(grpc_socket_factory* factory, int sockfd,
142 const grpc_resolved_address* addr) {
143 test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
144 f->number_of_bind_calls++;
145 return bind(sockfd,
146 reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr)),
147 static_cast<socklen_t>(addr->len));
148 }
149
test_socket_factory_compare(grpc_socket_factory * a,grpc_socket_factory * b)150 static int test_socket_factory_compare(grpc_socket_factory* a,
151 grpc_socket_factory* b) {
152 return GPR_ICMP(a, b);
153 }
154
test_socket_factory_destroy(grpc_socket_factory * factory)155 static void test_socket_factory_destroy(grpc_socket_factory* factory) {
156 test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
157 gpr_free(f);
158 }
159
160 static const grpc_socket_factory_vtable test_socket_factory_vtable = {
161 test_socket_factory_socket, test_socket_factory_bind,
162 test_socket_factory_compare, test_socket_factory_destroy};
163
test_socket_factory_create(void)164 static test_socket_factory* test_socket_factory_create(void) {
165 test_socket_factory* factory = static_cast<test_socket_factory*>(
166 gpr_malloc(sizeof(test_socket_factory)));
167 grpc_socket_factory_init(&factory->base, &test_socket_factory_vtable);
168 factory->number_of_socket_calls = 0;
169 factory->number_of_bind_calls = 0;
170 return factory;
171 }
172
destroy_pollset(void * p,grpc_error * error)173 static void destroy_pollset(void* p, grpc_error* error) {
174 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
175 }
176
shutdown_and_destroy_pollset()177 static void shutdown_and_destroy_pollset() {
178 gpr_mu_lock(g_mu);
179 auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset,
180 grpc_schedule_on_exec_ctx);
181 grpc_pollset_shutdown(g_pollset, closure);
182 gpr_mu_unlock(g_mu);
183 /* Flush exec_ctx to run |destroyed| */
184 grpc_core::ExecCtx::Get()->Flush();
185 }
186
test_no_op(void)187 static void test_no_op(void) {
188 grpc_pollset_init(g_pollset, &g_mu);
189 grpc_core::ExecCtx exec_ctx;
190 grpc_udp_server* s = grpc_udp_server_create(nullptr);
191 LOG_TEST("test_no_op");
192 grpc_udp_server_destroy(s, nullptr);
193 shutdown_and_destroy_pollset();
194 }
195
test_no_op_with_start(void)196 static void test_no_op_with_start(void) {
197 grpc_pollset_init(g_pollset, &g_mu);
198 grpc_core::ExecCtx exec_ctx;
199 grpc_udp_server* s = grpc_udp_server_create(nullptr);
200 LOG_TEST("test_no_op_with_start");
201 grpc_udp_server_start(s, nullptr, 0, nullptr);
202 grpc_udp_server_destroy(s, nullptr);
203 shutdown_and_destroy_pollset();
204 }
205
test_no_op_with_port(void)206 static void test_no_op_with_port(void) {
207 grpc_pollset_init(g_pollset, &g_mu);
208 g_number_of_orphan_calls = 0;
209 grpc_core::ExecCtx exec_ctx;
210 grpc_resolved_address resolved_addr;
211 struct sockaddr_in* addr =
212 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
213 grpc_udp_server* s = grpc_udp_server_create(nullptr);
214 LOG_TEST("test_no_op_with_port");
215
216 memset(&resolved_addr, 0, sizeof(resolved_addr));
217 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
218 addr->sin_family = AF_INET;
219 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
220 snd_buf_size, &handler_factory,
221 g_num_listeners));
222
223 grpc_udp_server_destroy(s, nullptr);
224
225 /* The server haven't start listening, so no udp handler to be notified. */
226 GPR_ASSERT(g_number_of_orphan_calls == 0);
227 shutdown_and_destroy_pollset();
228 }
229
test_no_op_with_port_and_socket_factory(void)230 static void test_no_op_with_port_and_socket_factory(void) {
231 grpc_pollset_init(g_pollset, &g_mu);
232 g_number_of_orphan_calls = 0;
233 grpc_core::ExecCtx exec_ctx;
234 grpc_resolved_address resolved_addr;
235 struct sockaddr_in* addr =
236 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
237
238 test_socket_factory* socket_factory = test_socket_factory_create();
239 grpc_arg socket_factory_arg =
240 grpc_socket_factory_to_arg(&socket_factory->base);
241 grpc_channel_args* channel_args =
242 grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1);
243 grpc_udp_server* s = grpc_udp_server_create(channel_args);
244 grpc_channel_args_destroy(channel_args);
245
246 LOG_TEST("test_no_op_with_port_and_socket_factory");
247
248 memset(&resolved_addr, 0, sizeof(resolved_addr));
249 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
250 addr->sin_family = AF_INET;
251 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
252 snd_buf_size, &handler_factory,
253 g_num_listeners));
254 GPR_ASSERT(socket_factory->number_of_socket_calls == g_num_listeners);
255 GPR_ASSERT(socket_factory->number_of_bind_calls == g_num_listeners);
256
257 grpc_udp_server_destroy(s, nullptr);
258
259 grpc_socket_factory_unref(&socket_factory->base);
260
261 /* The server haven't start listening, so no udp handler to be notified. */
262 GPR_ASSERT(g_number_of_orphan_calls == 0);
263 shutdown_and_destroy_pollset();
264 }
265
test_no_op_with_port_and_start(void)266 static void test_no_op_with_port_and_start(void) {
267 grpc_pollset_init(g_pollset, &g_mu);
268 g_number_of_orphan_calls = 0;
269 grpc_core::ExecCtx exec_ctx;
270 grpc_resolved_address resolved_addr;
271 struct sockaddr_in* addr =
272 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
273 grpc_udp_server* s = grpc_udp_server_create(nullptr);
274 LOG_TEST("test_no_op_with_port_and_start");
275
276 memset(&resolved_addr, 0, sizeof(resolved_addr));
277 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
278 addr->sin_family = AF_INET;
279 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
280 snd_buf_size, &handler_factory,
281 g_num_listeners));
282
283 grpc_udp_server_start(s, nullptr, 0, nullptr);
284 GPR_ASSERT(g_number_of_starts == g_num_listeners);
285 grpc_udp_server_destroy(s, nullptr);
286
287 /* The server had a single FD, which is orphaned exactly once in *
288 * grpc_udp_server_destroy. */
289 GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
290 shutdown_and_destroy_pollset();
291 }
292
test_receive(int number_of_clients)293 static void test_receive(int number_of_clients) {
294 grpc_pollset_init(g_pollset, &g_mu);
295 grpc_core::ExecCtx exec_ctx;
296 grpc_resolved_address resolved_addr;
297 struct sockaddr_storage* addr =
298 reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
299 int clifd, svrfd;
300 grpc_udp_server* s = grpc_udp_server_create(nullptr);
301 int i;
302 grpc_millis deadline;
303 grpc_pollset* pollsets[1];
304 LOG_TEST("test_receive");
305 gpr_log(GPR_INFO, "clients=%d", number_of_clients);
306
307 g_number_of_bytes_read = 0;
308 g_number_of_orphan_calls = 0;
309
310 memset(&resolved_addr, 0, sizeof(resolved_addr));
311 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
312 addr->ss_family = AF_INET;
313 GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
314 snd_buf_size, &handler_factory,
315 g_num_listeners));
316
317 svrfd = grpc_udp_server_get_fd(s, 0);
318 GPR_ASSERT(svrfd >= 0);
319 GPR_ASSERT(getsockname(svrfd, (struct sockaddr*)addr,
320 (socklen_t*)&resolved_addr.len) == 0);
321 GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
322
323 pollsets[0] = g_pollset;
324 grpc_udp_server_start(s, pollsets, 1, nullptr);
325
326 gpr_mu_lock(g_mu);
327
328 for (i = 0; i < number_of_clients; i++) {
329 deadline =
330 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
331
332 int number_of_bytes_read_before = g_number_of_bytes_read;
333 /* Create a socket, send a packet to the UDP server. */
334 clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
335 GPR_ASSERT(clifd >= 0);
336 GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
337 (socklen_t)resolved_addr.len) == 0);
338 GPR_ASSERT(5 == write(clifd, "hello", 5));
339 while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
340 deadline > grpc_core::ExecCtx::Get()->Now()) {
341 grpc_pollset_worker* worker = nullptr;
342 GPR_ASSERT(GRPC_LOG_IF_ERROR(
343 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
344 gpr_mu_unlock(g_mu);
345 grpc_core::ExecCtx::Get()->Flush();
346 gpr_mu_lock(g_mu);
347 }
348 close(clifd);
349 }
350 GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
351
352 gpr_mu_unlock(g_mu);
353
354 grpc_udp_server_destroy(s, nullptr);
355
356 /* The server had a single FD, which is orphaned exactly once in *
357 * grpc_udp_server_destroy. */
358 GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
359 shutdown_and_destroy_pollset();
360 }
361
main(int argc,char ** argv)362 int main(int argc, char** argv) {
363 grpc_test_init(argc, argv);
364 grpc_init();
365 if (grpc_is_socket_reuse_port_supported()) {
366 g_num_listeners = 10;
367 }
368 {
369 grpc_core::ExecCtx exec_ctx;
370 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
371
372 test_no_op();
373 test_no_op_with_start();
374 test_no_op_with_port();
375 test_no_op_with_port_and_socket_factory();
376 test_no_op_with_port_and_start();
377 test_receive(1);
378 test_receive(10);
379
380 gpr_free(g_pollset);
381 }
382 grpc_shutdown();
383 return 0;
384 }
385
386 #else /* GRPC_POSIX_SOCKET */
387
/* Built when GRPC_POSIX_SOCKET is not defined: no test is run and the
 * process exits with status 1. */
int main(int argc, char** argv) {
  return 1;
}
389
390 #endif /* GRPC_POSIX_SOCKET */
391