• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET_UDP_SERVER
23 
24 #include "src/core/lib/iomgr/udp_server.h"
25 
26 #include <netinet/in.h>
27 #include <string.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30 
31 #include <vector>
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/sync.h>
37 #include <grpc/support/time.h>
38 
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/memory.h"
42 #include "src/core/lib/iomgr/ev_posix.h"
43 #include "src/core/lib/iomgr/iomgr.h"
44 #include "src/core/lib/iomgr/socket_factory_posix.h"
45 #include "src/core/lib/iomgr/socket_utils_posix.h"
46 #include "test/core/util/test_config.h"
47 
48 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
49 
50 static grpc_pollset* g_pollset;
51 static gpr_mu* g_mu;
52 static int g_number_of_reads = 0;
53 static int g_number_of_writes = 0;
54 static int g_number_of_bytes_read = 0;
55 static int g_number_of_orphan_calls = 0;
56 static int g_number_of_starts = 0;
57 
58 int rcv_buf_size = 1024;
59 int snd_buf_size = 1024;
60 
61 static int g_num_listeners = 1;
62 
63 class TestGrpcUdpHandler : public GrpcUdpHandler {
64  public:
TestGrpcUdpHandler(grpc_fd * emfd,void * user_data)65   TestGrpcUdpHandler(grpc_fd* emfd, void* user_data)
66       : GrpcUdpHandler(emfd, user_data), emfd_(emfd) {
67     g_number_of_starts++;
68   }
~TestGrpcUdpHandler()69   ~TestGrpcUdpHandler() override {}
70 
71  protected:
Read()72   bool Read() override {
73     char read_buffer[512];
74     ssize_t byte_count;
75 
76     gpr_mu_lock(g_mu);
77     byte_count =
78         recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0);
79 
80     g_number_of_reads++;
81     g_number_of_bytes_read += static_cast<int>(byte_count);
82 
83     gpr_log(GPR_DEBUG, "receive %zu on handler %p", byte_count, this);
84     GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
85                                  grpc_pollset_kick(g_pollset, nullptr)));
86     gpr_mu_unlock(g_mu);
87     return false;
88   }
89 
OnCanWrite(void *,grpc_closure *)90   void OnCanWrite(void* /*user_data*/,
91                   grpc_closure* /*notify_on_write_closure*/) override {
92     gpr_mu_lock(g_mu);
93     g_number_of_writes++;
94 
95     GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
96                                  grpc_pollset_kick(g_pollset, nullptr)));
97     gpr_mu_unlock(g_mu);
98   }
99 
OnFdAboutToOrphan(grpc_closure * orphan_fd_closure,void *)100   void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
101                          void* /*user_data*/) override {
102     gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
103             grpc_fd_wrapped_fd(emfd()));
104     grpc_core::ExecCtx::Run(DEBUG_LOCATION, orphan_fd_closure, GRPC_ERROR_NONE);
105     g_number_of_orphan_calls++;
106   }
107 
emfd()108   grpc_fd* emfd() { return emfd_; }
109 
110  private:
111   grpc_fd* emfd_;
112 };
113 
114 class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory {
115  public:
CreateUdpHandler(grpc_fd * emfd,void * user_data)116   GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override {
117     gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd));
118     return new TestGrpcUdpHandler(emfd, user_data);
119   }
120 
DestroyUdpHandler(GrpcUdpHandler * handler)121   void DestroyUdpHandler(GrpcUdpHandler* handler) override {
122     gpr_log(GPR_INFO, "Destroy handler");
123     delete reinterpret_cast<TestGrpcUdpHandler*>(handler);
124   }
125 };
126 
127 TestGrpcUdpHandlerFactory handler_factory;
128 
129 struct test_socket_factory {
130   grpc_socket_factory base;
131   int number_of_socket_calls;
132   int number_of_bind_calls;
133 };
134 typedef struct test_socket_factory test_socket_factory;
135 
test_socket_factory_socket(grpc_socket_factory * factory,int domain,int type,int protocol)136 static int test_socket_factory_socket(grpc_socket_factory* factory, int domain,
137                                       int type, int protocol) {
138   test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
139   f->number_of_socket_calls++;
140   return socket(domain, type, protocol);
141 }
142 
test_socket_factory_bind(grpc_socket_factory * factory,int sockfd,const grpc_resolved_address * addr)143 static int test_socket_factory_bind(grpc_socket_factory* factory, int sockfd,
144                                     const grpc_resolved_address* addr) {
145   test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
146   f->number_of_bind_calls++;
147   return bind(sockfd,
148               reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr)),
149               static_cast<socklen_t>(addr->len));
150 }
151 
test_socket_factory_compare(grpc_socket_factory * a,grpc_socket_factory * b)152 static int test_socket_factory_compare(grpc_socket_factory* a,
153                                        grpc_socket_factory* b) {
154   return GPR_ICMP(a, b);
155 }
156 
test_socket_factory_destroy(grpc_socket_factory * factory)157 static void test_socket_factory_destroy(grpc_socket_factory* factory) {
158   test_socket_factory* f = reinterpret_cast<test_socket_factory*>(factory);
159   gpr_free(f);
160 }
161 
162 static const grpc_socket_factory_vtable test_socket_factory_vtable = {
163     test_socket_factory_socket, test_socket_factory_bind,
164     test_socket_factory_compare, test_socket_factory_destroy};
165 
test_socket_factory_create(void)166 static test_socket_factory* test_socket_factory_create(void) {
167   test_socket_factory* factory = static_cast<test_socket_factory*>(
168       gpr_malloc(sizeof(test_socket_factory)));
169   grpc_socket_factory_init(&factory->base, &test_socket_factory_vtable);
170   factory->number_of_socket_calls = 0;
171   factory->number_of_bind_calls = 0;
172   return factory;
173 }
174 
destroy_pollset(void * p,grpc_error *)175 static void destroy_pollset(void* p, grpc_error* /*error*/) {
176   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
177 }
178 
shutdown_and_destroy_pollset()179 static void shutdown_and_destroy_pollset() {
180   gpr_mu_lock(g_mu);
181   auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset,
182                                      grpc_schedule_on_exec_ctx);
183   grpc_pollset_shutdown(g_pollset, closure);
184   gpr_mu_unlock(g_mu);
185   /* Flush exec_ctx to run |destroyed| */
186   grpc_core::ExecCtx::Get()->Flush();
187 }
188 
test_no_op(void)189 static void test_no_op(void) {
190   grpc_pollset_init(g_pollset, &g_mu);
191   grpc_core::ExecCtx exec_ctx;
192   grpc_udp_server* s = grpc_udp_server_create(nullptr);
193   LOG_TEST("test_no_op");
194   grpc_udp_server_destroy(s, nullptr);
195   shutdown_and_destroy_pollset();
196 }
197 
test_no_op_with_start(void)198 static void test_no_op_with_start(void) {
199   grpc_pollset_init(g_pollset, &g_mu);
200   grpc_core::ExecCtx exec_ctx;
201   grpc_udp_server* s = grpc_udp_server_create(nullptr);
202   LOG_TEST("test_no_op_with_start");
203   std::vector<grpc_pollset*> empty_pollset;
204   grpc_udp_server_start(s, &empty_pollset, nullptr);
205   grpc_udp_server_destroy(s, nullptr);
206   shutdown_and_destroy_pollset();
207 }
208 
test_no_op_with_port(void)209 static void test_no_op_with_port(void) {
210   grpc_pollset_init(g_pollset, &g_mu);
211   g_number_of_orphan_calls = 0;
212   grpc_core::ExecCtx exec_ctx;
213   grpc_resolved_address resolved_addr;
214   struct sockaddr_in* addr =
215       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
216   grpc_udp_server* s = grpc_udp_server_create(nullptr);
217   LOG_TEST("test_no_op_with_port");
218 
219   memset(&resolved_addr, 0, sizeof(resolved_addr));
220   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
221   addr->sin_family = AF_INET;
222   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
223                                       snd_buf_size, &handler_factory,
224                                       g_num_listeners) > 0);
225 
226   grpc_udp_server_destroy(s, nullptr);
227 
228   /* The server haven't start listening, so no udp handler to be notified. */
229   GPR_ASSERT(g_number_of_orphan_calls == 0);
230   shutdown_and_destroy_pollset();
231 }
232 
test_no_op_with_port_and_socket_factory(void)233 static void test_no_op_with_port_and_socket_factory(void) {
234   grpc_pollset_init(g_pollset, &g_mu);
235   g_number_of_orphan_calls = 0;
236   grpc_core::ExecCtx exec_ctx;
237   grpc_resolved_address resolved_addr;
238   struct sockaddr_in* addr =
239       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
240 
241   test_socket_factory* socket_factory = test_socket_factory_create();
242   grpc_arg socket_factory_arg =
243       grpc_socket_factory_to_arg(&socket_factory->base);
244   grpc_channel_args* channel_args =
245       grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1);
246   grpc_udp_server* s = grpc_udp_server_create(channel_args);
247   grpc_channel_args_destroy(channel_args);
248 
249   LOG_TEST("test_no_op_with_port_and_socket_factory");
250 
251   memset(&resolved_addr, 0, sizeof(resolved_addr));
252   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
253   addr->sin_family = AF_INET;
254   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
255                                       snd_buf_size, &handler_factory,
256                                       g_num_listeners) > 0);
257   GPR_ASSERT(socket_factory->number_of_socket_calls == g_num_listeners);
258   GPR_ASSERT(socket_factory->number_of_bind_calls == g_num_listeners);
259 
260   grpc_udp_server_destroy(s, nullptr);
261 
262   grpc_socket_factory_unref(&socket_factory->base);
263 
264   /* The server haven't start listening, so no udp handler to be notified. */
265   GPR_ASSERT(g_number_of_orphan_calls == 0);
266   shutdown_and_destroy_pollset();
267 }
268 
test_no_op_with_port_and_start(void)269 static void test_no_op_with_port_and_start(void) {
270   grpc_pollset_init(g_pollset, &g_mu);
271   g_number_of_orphan_calls = 0;
272   grpc_core::ExecCtx exec_ctx;
273   grpc_resolved_address resolved_addr;
274   struct sockaddr_in* addr =
275       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
276   grpc_udp_server* s = grpc_udp_server_create(nullptr);
277   LOG_TEST("test_no_op_with_port_and_start");
278 
279   memset(&resolved_addr, 0, sizeof(resolved_addr));
280   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
281   addr->sin_family = AF_INET;
282   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
283                                       snd_buf_size, &handler_factory,
284                                       g_num_listeners) > 0);
285 
286   std::vector<grpc_pollset*> empty_pollset;
287   grpc_udp_server_start(s, &empty_pollset, nullptr);
288   GPR_ASSERT(g_number_of_starts == g_num_listeners);
289   grpc_udp_server_destroy(s, nullptr);
290 
291   /* The server had a single FD, which is orphaned exactly once in *
292    * grpc_udp_server_destroy. */
293   GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
294   shutdown_and_destroy_pollset();
295 }
296 
test_receive(int number_of_clients)297 static void test_receive(int number_of_clients) {
298   grpc_pollset_init(g_pollset, &g_mu);
299   grpc_core::ExecCtx exec_ctx;
300   grpc_resolved_address resolved_addr;
301   struct sockaddr_storage* addr =
302       reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
303   int clifd, svrfd;
304   grpc_udp_server* s = grpc_udp_server_create(nullptr);
305   int i;
306   grpc_millis deadline;
307   LOG_TEST("test_receive");
308   gpr_log(GPR_INFO, "clients=%d", number_of_clients);
309 
310   g_number_of_bytes_read = 0;
311   g_number_of_orphan_calls = 0;
312 
313   memset(&resolved_addr, 0, sizeof(resolved_addr));
314   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
315   addr->ss_family = AF_INET;
316   GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
317                                       snd_buf_size, &handler_factory,
318                                       g_num_listeners) > 0);
319 
320   svrfd = grpc_udp_server_get_fd(s, 0);
321   GPR_ASSERT(svrfd >= 0);
322   GPR_ASSERT(getsockname(svrfd, (struct sockaddr*)addr,
323                          (socklen_t*)&resolved_addr.len) == 0);
324   GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
325 
326   std::vector<grpc_pollset*> test_pollsets;
327   test_pollsets.emplace_back(g_pollset);
328   grpc_udp_server_start(s, &test_pollsets, nullptr);
329 
330   gpr_mu_lock(g_mu);
331 
332   for (i = 0; i < number_of_clients; i++) {
333     deadline =
334         grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
335 
336     int number_of_bytes_read_before = g_number_of_bytes_read;
337     /* Create a socket, send a packet to the UDP server. */
338     clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
339     GPR_ASSERT(clifd >= 0);
340     GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
341                        (socklen_t)resolved_addr.len) == 0);
342     GPR_ASSERT(5 == write(clifd, "hello", 5));
343     while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
344            deadline > grpc_core::ExecCtx::Get()->Now()) {
345       grpc_pollset_worker* worker = nullptr;
346       GPR_ASSERT(GRPC_LOG_IF_ERROR(
347           "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
348       gpr_mu_unlock(g_mu);
349       grpc_core::ExecCtx::Get()->Flush();
350       gpr_mu_lock(g_mu);
351     }
352     close(clifd);
353   }
354   GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
355 
356   gpr_mu_unlock(g_mu);
357 
358   grpc_udp_server_destroy(s, nullptr);
359 
360   /* The server had a single FD, which is orphaned exactly once in *
361    * grpc_udp_server_destroy. */
362   GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners);
363   shutdown_and_destroy_pollset();
364 }
365 
main(int argc,char ** argv)366 int main(int argc, char** argv) {
367   grpc::testing::TestEnvironment env(argc, argv);
368   grpc_init();
369   if (grpc_is_socket_reuse_port_supported()) {
370     g_num_listeners = 10;
371   }
372   {
373     grpc_core::ExecCtx exec_ctx;
374     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
375 
376     test_no_op();
377     test_no_op_with_start();
378     test_no_op_with_port();
379     test_no_op_with_port_and_socket_factory();
380     test_no_op_with_port_and_start();
381     test_receive(1);
382     test_receive(10);
383 
384     gpr_free(g_pollset);
385   }
386   grpc_shutdown();
387   return 0;
388 }
389 
390 #else /* GRPC_POSIX_SOCKET_UDP_SERVER */
391 
main(int argc,char ** argv)392 int main(int argc, char** argv) { return 1; }
393 
394 #endif /* GRPC_POSIX_SOCKET_UDP_SERVER */
395