• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with libuv
22 #ifdef GRPC_UV
23 
24 #include <uv.h>
25 
26 #include "src/core/lib/iomgr/tcp_server.h"
27 
28 #include <string.h>
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include <grpc/support/time.h>
35 
36 #include "src/core/lib/iomgr/iomgr.h"
37 #include "src/core/lib/iomgr/resolve_address.h"
38 #include "src/core/lib/iomgr/sockaddr_utils.h"
39 #include "test/core/util/port.h"
40 #include "test/core/util/test_config.h"
41 
42 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
43 
44 static gpr_mu* g_mu;
45 static grpc_pollset* g_pollset;
46 static int g_nconnects = 0;
47 
48 typedef struct on_connect_result {
49   /* Owns a ref to server. */
50   grpc_tcp_server* server;
51   unsigned port_index;
52   unsigned fd_index;
53 } on_connect_result;
54 
55 typedef struct server_weak_ref {
56   grpc_tcp_server* server;
57 
58   /* arg is this server_weak_ref. */
59   grpc_closure server_shutdown;
60 } server_weak_ref;
61 
62 static on_connect_result g_result = {NULL, 0, 0};
63 
on_connect_result_init(on_connect_result * result)64 static void on_connect_result_init(on_connect_result* result) {
65   result->server = NULL;
66   result->port_index = 0;
67   result->fd_index = 0;
68 }
69 
on_connect_result_set(on_connect_result * result,const grpc_tcp_server_acceptor * acceptor)70 static void on_connect_result_set(on_connect_result* result,
71                                   const grpc_tcp_server_acceptor* acceptor) {
72   result->server = grpc_tcp_server_ref(acceptor->from_server);
73   result->port_index = acceptor->port_index;
74   result->fd_index = acceptor->fd_index;
75 }
76 
server_weak_ref_shutdown(void * arg,grpc_error * error)77 static void server_weak_ref_shutdown(void* arg, grpc_error* error) {
78   server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
79   weak_ref->server = NULL;
80 }
81 
server_weak_ref_init(server_weak_ref * weak_ref)82 static void server_weak_ref_init(server_weak_ref* weak_ref) {
83   weak_ref->server = NULL;
84   GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown,
85                     weak_ref, grpc_schedule_on_exec_ctx);
86 }
87 
88 /* Make weak_ref->server_shutdown a shutdown_starting cb on server.
89    grpc_tcp_server promises that the server object will live until
90    weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
91    should be held until server_weak_ref_set() returns to avoid a race where the
92    server is deleted before the shutdown_starting cb is added. */
server_weak_ref_set(server_weak_ref * weak_ref,grpc_tcp_server * server)93 static void server_weak_ref_set(server_weak_ref* weak_ref,
94                                 grpc_tcp_server* server) {
95   grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
96   weak_ref->server = server;
97 }
98 
on_connect(void * arg,grpc_endpoint * tcp,grpc_pollset * pollset,grpc_tcp_server_acceptor * acceptor)99 static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset,
100                        grpc_tcp_server_acceptor* acceptor) {
101   grpc_endpoint_shutdown(tcp,
102                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
103   grpc_endpoint_destroy(tcp);
104 
105   on_connect_result temp_result;
106   on_connect_result_set(&temp_result, acceptor);
107   gpr_free(acceptor);
108 
109   gpr_mu_lock(g_mu);
110   g_result = temp_result;
111   g_nconnects++;
112   GPR_ASSERT(
113       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
114   gpr_mu_unlock(g_mu);
115 }
116 
test_no_op(void)117 static void test_no_op(void) {
118   grpc_core::ExecCtx exec_ctx;
119   grpc_tcp_server* s;
120   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
121   grpc_tcp_server_unref(s);
122 }
123 
test_no_op_with_start(void)124 static void test_no_op_with_start(void) {
125   grpc_core::ExecCtx exec_ctx;
126   grpc_tcp_server* s;
127   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
128   LOG_TEST("test_no_op_with_start");
129   grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
130   grpc_tcp_server_unref(s);
131 }
132 
test_no_op_with_port(void)133 static void test_no_op_with_port(void) {
134   grpc_core::ExecCtx exec_ctx;
135   grpc_resolved_address resolved_addr;
136   struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
137   grpc_tcp_server* s;
138   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
139   LOG_TEST("test_no_op_with_port");
140 
141   memset(&resolved_addr, 0, sizeof(resolved_addr));
142   resolved_addr.len = sizeof(struct sockaddr_in);
143   addr->sin_family = AF_INET;
144   int port;
145   GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
146                  GRPC_ERROR_NONE &&
147              port > 0);
148 
149   grpc_tcp_server_unref(s);
150 }
151 
test_no_op_with_port_and_start(void)152 static void test_no_op_with_port_and_start(void) {
153   grpc_core::ExecCtx exec_ctx;
154   grpc_resolved_address resolved_addr;
155   struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
156   grpc_tcp_server* s;
157   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
158   LOG_TEST("test_no_op_with_port_and_start");
159   int port;
160 
161   memset(&resolved_addr, 0, sizeof(resolved_addr));
162   resolved_addr.len = sizeof(struct sockaddr_in);
163   addr->sin_family = AF_INET;
164   GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
165                  GRPC_ERROR_NONE &&
166              port > 0);
167 
168   grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
169 
170   grpc_tcp_server_unref(s);
171 }
172 
connect_cb(uv_connect_t * req,int status)173 static void connect_cb(uv_connect_t* req, int status) {
174   GPR_ASSERT(status == 0);
175   gpr_free(req);
176 }
177 
close_cb(uv_handle_t * handle)178 static void close_cb(uv_handle_t* handle) { gpr_free(handle); }
179 
tcp_connect(const struct sockaddr * remote,socklen_t remote_len,on_connect_result * result)180 static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len,
181                         on_connect_result* result) {
182   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10);
183   uv_tcp_t* client_handle =
184       static_cast<uv_tcp_t*>(gpr_malloc(sizeof(uv_tcp_t)));
185   uv_connect_t* req =
186       static_cast<uv_connect_t*>(gpr_malloc(sizeof(uv_connect_t)));
187   int nconnects_before;
188 
189   gpr_mu_lock(g_mu);
190   nconnects_before = g_nconnects;
191   on_connect_result_init(&g_result);
192   GPR_ASSERT(uv_tcp_init(uv_default_loop(), client_handle) == 0);
193   gpr_log(GPR_DEBUG, "start connect");
194   GPR_ASSERT(uv_tcp_connect(req, client_handle, remote, connect_cb) == 0);
195   gpr_log(GPR_DEBUG, "wait");
196   while (g_nconnects == nconnects_before &&
197          gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
198     grpc_pollset_worker* worker = NULL;
199     GPR_ASSERT(GRPC_LOG_IF_ERROR(
200         "pollset_work",
201         grpc_pollset_work(g_pollset, &worker,
202                           grpc_timespec_to_millis_round_up(deadline))));
203     gpr_mu_unlock(g_mu);
204 
205     gpr_mu_lock(g_mu);
206   }
207   gpr_log(GPR_DEBUG, "wait done");
208   GPR_ASSERT(g_nconnects == nconnects_before + 1);
209   uv_close((uv_handle_t*)client_handle, close_cb);
210   *result = g_result;
211 
212   gpr_mu_unlock(g_mu);
213 }
214 
215 /* Tests a tcp server with multiple ports. */
test_connect(unsigned n)216 static void test_connect(unsigned n) {
217   grpc_core::ExecCtx exec_ctx;
218   grpc_resolved_address resolved_addr;
219   grpc_resolved_address resolved_addr1;
220   struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr;
221   struct sockaddr_storage* addr1 =
222       (struct sockaddr_storage*)resolved_addr1.addr;
223   int svr_port;
224   int svr1_port;
225   grpc_tcp_server* s;
226   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
227   unsigned i;
228   server_weak_ref weak_ref;
229   server_weak_ref_init(&weak_ref);
230   LOG_TEST("test_connect");
231   gpr_log(GPR_INFO, "clients=%d", n);
232   memset(&resolved_addr, 0, sizeof(resolved_addr));
233   memset(&resolved_addr1, 0, sizeof(resolved_addr1));
234   resolved_addr.len = sizeof(struct sockaddr_storage);
235   resolved_addr1.len = sizeof(struct sockaddr_storage);
236   addr->ss_family = addr1->ss_family = AF_INET;
237   GPR_ASSERT(GRPC_ERROR_NONE ==
238              grpc_tcp_server_add_port(s, &resolved_addr, &svr_port));
239   GPR_ASSERT(svr_port > 0);
240   GPR_ASSERT((uv_ip6_addr("::", svr_port, (struct sockaddr_in6*)addr)) == 0);
241   /* Cannot use wildcard (port==0), because add_port() will try to reuse the
242      same port as a previous add_port(). */
243   svr1_port = grpc_pick_unused_port_or_die();
244   grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
245   GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr1, &svr_port) ==
246                  GRPC_ERROR_NONE &&
247              svr_port == svr1_port);
248 
249   grpc_tcp_server_start(s, &g_pollset, 1, on_connect, NULL);
250 
251   GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6*)addr1) == 0);
252 
253   for (i = 0; i < n; i++) {
254     on_connect_result result;
255     on_connect_result_init(&result);
256     tcp_connect((struct sockaddr*)addr, (socklen_t)resolved_addr.len, &result);
257     GPR_ASSERT(result.port_index == 0);
258     GPR_ASSERT(result.server == s);
259     if (weak_ref.server == NULL) {
260       server_weak_ref_set(&weak_ref, result.server);
261     }
262     grpc_tcp_server_unref(result.server);
263 
264     on_connect_result_init(&result);
265     tcp_connect((struct sockaddr*)addr1, (socklen_t)resolved_addr1.len,
266                 &result);
267     GPR_ASSERT(result.port_index == 1);
268     GPR_ASSERT(result.server == s);
269     grpc_tcp_server_unref(result.server);
270   }
271 
272   /* Weak ref to server valid until final unref. */
273   GPR_ASSERT(weak_ref.server != NULL);
274 
275   grpc_tcp_server_unref(s);
276 
277   /* Weak ref lost. */
278   GPR_ASSERT(weak_ref.server == NULL);
279 }
280 
destroy_pollset(void * p,grpc_error * error)281 static void destroy_pollset(void* p, grpc_error* error) {
282   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
283 }
284 
main(int argc,char ** argv)285 int main(int argc, char** argv) {
286   grpc_closure destroyed;
287   grpc_core::ExecCtx exec_ctx;
288   grpc_test_init(argc, argv);
289   grpc_init();
290   g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size()));
291   grpc_pollset_init(g_pollset, &g_mu);
292 
293   test_no_op();
294   test_no_op_with_start();
295   test_no_op_with_port();
296   test_no_op_with_port_and_start();
297   test_connect(1);
298   test_connect(10);
299 
300   GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
301                     grpc_schedule_on_exec_ctx);
302   grpc_pollset_shutdown(g_pollset, &destroyed);
303 
304   grpc_shutdown();
305   gpr_free(g_pollset);
306   return 0;
307 }
308 
309 #else /* GRPC_UV */
310 
/* Stub used when gRPC is not built with libuv: the test cannot run, so it
   exits with status 1. */
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  return 1;
}
312 
313 #endif /* GRPC_UV */
314