• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24 
25 #include "src/core/lib/iomgr/tcp_client_posix.h"
26 
27 #include <errno.h>
28 #include <netinet/in.h>
29 #include <string.h>
30 #include <unistd.h>
31 
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/ev_posix.h"
40 #include "src/core/lib/iomgr/iomgr_posix.h"
41 #include "src/core/lib/iomgr/sockaddr.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/iomgr/socket_mutator.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "src/core/lib/iomgr/tcp_posix.h"
46 #include "src/core/lib/iomgr/timer.h"
47 #include "src/core/lib/iomgr/unix_sockets_posix.h"
48 #include "src/core/lib/slice/slice_internal.h"
49 
50 extern grpc_core::TraceFlag grpc_tcp_trace;
51 
52 typedef struct {
53   gpr_mu mu;
54   grpc_fd* fd;
55   grpc_timer alarm;
56   grpc_closure on_alarm;
57   int refs;
58   grpc_closure write_closure;
59   grpc_pollset_set* interested_parties;
60   char* addr_str;
61   grpc_endpoint** ep;
62   grpc_closure* closure;
63   grpc_channel_args* channel_args;
64 } async_connect;
65 
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_channel_args * channel_args)66 static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
67                                   const grpc_channel_args* channel_args) {
68   grpc_error* err = GRPC_ERROR_NONE;
69 
70   GPR_ASSERT(fd >= 0);
71 
72   err = grpc_set_socket_nonblocking(fd, 1);
73   if (err != GRPC_ERROR_NONE) goto error;
74   err = grpc_set_socket_cloexec(fd, 1);
75   if (err != GRPC_ERROR_NONE) goto error;
76   if (!grpc_is_unix_socket(addr)) {
77     err = grpc_set_socket_low_latency(fd, 1);
78     if (err != GRPC_ERROR_NONE) goto error;
79     err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
80                                            true /* is_client */);
81     if (err != GRPC_ERROR_NONE) goto error;
82   }
83   err = grpc_set_socket_no_sigpipe_if_possible(fd);
84   if (err != GRPC_ERROR_NONE) goto error;
85   if (channel_args) {
86     for (size_t i = 0; i < channel_args->num_args; i++) {
87       if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
88         GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
89         grpc_socket_mutator* mutator = static_cast<grpc_socket_mutator*>(
90             channel_args->args[i].value.pointer.p);
91         err = grpc_set_socket_with_mutator(fd, mutator);
92         if (err != GRPC_ERROR_NONE) goto error;
93       }
94     }
95   }
96   goto done;
97 
98 error:
99   if (fd >= 0) {
100     close(fd);
101   }
102 done:
103   return err;
104 }
105 
tc_on_alarm(void * acp,grpc_error * error)106 static void tc_on_alarm(void* acp, grpc_error* error) {
107   int done;
108   async_connect* ac = static_cast<async_connect*>(acp);
109   if (grpc_tcp_trace.enabled()) {
110     const char* str = grpc_error_string(error);
111     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
112             str);
113   }
114   gpr_mu_lock(&ac->mu);
115   if (ac->fd != nullptr) {
116     grpc_fd_shutdown(
117         ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
118   }
119   done = (--ac->refs == 0);
120   gpr_mu_unlock(&ac->mu);
121   if (done) {
122     gpr_mu_destroy(&ac->mu);
123     gpr_free(ac->addr_str);
124     grpc_channel_args_destroy(ac->channel_args);
125     gpr_free(ac);
126   }
127 }
128 
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_channel_args * channel_args,const char * addr_str)129 grpc_endpoint* grpc_tcp_client_create_from_fd(
130     grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str) {
131   return grpc_tcp_create(fd, channel_args, addr_str);
132 }
133 
on_writable(void * acp,grpc_error * error)134 static void on_writable(void* acp, grpc_error* error) {
135   async_connect* ac = static_cast<async_connect*>(acp);
136   int so_error = 0;
137   socklen_t so_error_size;
138   int err;
139   int done;
140   grpc_endpoint** ep = ac->ep;
141   grpc_closure* closure = ac->closure;
142   grpc_fd* fd;
143 
144   GRPC_ERROR_REF(error);
145 
146   if (grpc_tcp_trace.enabled()) {
147     const char* str = grpc_error_string(error);
148     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s", ac->addr_str,
149             str);
150   }
151 
152   gpr_mu_lock(&ac->mu);
153   GPR_ASSERT(ac->fd);
154   fd = ac->fd;
155   ac->fd = nullptr;
156   gpr_mu_unlock(&ac->mu);
157 
158   grpc_timer_cancel(&ac->alarm);
159 
160   gpr_mu_lock(&ac->mu);
161   if (error != GRPC_ERROR_NONE) {
162     error =
163         grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
164                            grpc_slice_from_static_string("Timeout occurred"));
165     goto finish;
166   }
167 
168   do {
169     so_error_size = sizeof(so_error);
170     err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
171                      &so_error_size);
172   } while (err < 0 && errno == EINTR);
173   if (err < 0) {
174     error = GRPC_OS_ERROR(errno, "getsockopt");
175     goto finish;
176   }
177 
178   switch (so_error) {
179     case 0:
180       grpc_pollset_set_del_fd(ac->interested_parties, fd);
181       *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args, ac->addr_str);
182       fd = nullptr;
183       break;
184     case ENOBUFS:
185       /* We will get one of these errors if we have run out of
186          memory in the kernel for the data structures allocated
187          when you connect a socket.  If this happens it is very
188          likely that if we wait a little bit then try again the
189          connection will work (since other programs or this
190          program will close their network connections and free up
191          memory).  This does _not_ indicate that there is anything
192          wrong with the server we are connecting to, this is a
193          local problem.
194 
195          If you are looking at this code, then chances are that
196          your program or another program on the same computer
197          opened too many network connections.  The "easy" fix:
198          don't do that! */
199       gpr_log(GPR_ERROR, "kernel out of buffers");
200       gpr_mu_unlock(&ac->mu);
201       grpc_fd_notify_on_write(fd, &ac->write_closure);
202       return;
203     case ECONNREFUSED:
204       /* This error shouldn't happen for anything other than connect(). */
205       error = GRPC_OS_ERROR(so_error, "connect");
206       break;
207     default:
208       /* We don't really know which syscall triggered the problem here,
209          so punt by reporting getsockopt(). */
210       error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
211       break;
212   }
213 
214 finish:
215   if (fd != nullptr) {
216     grpc_pollset_set_del_fd(ac->interested_parties, fd);
217     grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
218     fd = nullptr;
219   }
220   done = (--ac->refs == 0);
221   // Create a copy of the data from "ac" to be accessed after the unlock, as
222   // "ac" and its contents may be deallocated by the time they are read.
223   const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
224   gpr_mu_unlock(&ac->mu);
225   if (error != GRPC_ERROR_NONE) {
226     char* error_descr;
227     grpc_slice str;
228     bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
229     GPR_ASSERT(ret);
230     char* desc = grpc_slice_to_c_string(str);
231     gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", desc);
232     error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
233                                grpc_slice_from_copied_string(error_descr));
234     gpr_free(error_descr);
235     gpr_free(desc);
236     error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
237                                addr_str_slice /* takes ownership */);
238   } else {
239     grpc_slice_unref_internal(addr_str_slice);
240   }
241   if (done) {
242     // This is safe even outside the lock, because "done", the sentinel, is
243     // populated *inside* the lock.
244     gpr_mu_destroy(&ac->mu);
245     gpr_free(ac->addr_str);
246     grpc_channel_args_destroy(ac->channel_args);
247     gpr_free(ac);
248   }
249   GRPC_CLOSURE_SCHED(closure, error);
250 }
251 
grpc_tcp_client_prepare_fd(const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,grpc_fd ** fdobj)252 grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
253                                        const grpc_resolved_address* addr,
254                                        grpc_resolved_address* mapped_addr,
255                                        grpc_fd** fdobj) {
256   grpc_dualstack_mode dsmode;
257   int fd;
258   grpc_error* error;
259   char* name;
260   char* addr_str;
261   *fdobj = nullptr;
262   /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
263      v6. */
264   if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
265     /* addr is v4 mapped to v6 or v6. */
266     memcpy(mapped_addr, addr, sizeof(*mapped_addr));
267   }
268   error =
269       grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, &fd);
270   if (error != GRPC_ERROR_NONE) {
271     return error;
272   }
273   if (dsmode == GRPC_DSMODE_IPV4) {
274     /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
275     if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
276       memcpy(mapped_addr, addr, sizeof(*mapped_addr));
277     }
278   }
279   if ((error = prepare_socket(mapped_addr, fd, channel_args)) !=
280       GRPC_ERROR_NONE) {
281     return error;
282   }
283   addr_str = grpc_sockaddr_to_uri(mapped_addr);
284   gpr_asprintf(&name, "tcp-client:%s", addr_str);
285   *fdobj = grpc_fd_create(fd, name, true);
286   gpr_free(name);
287   gpr_free(addr_str);
288   return GRPC_ERROR_NONE;
289 }
290 
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,grpc_fd * fdobj,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline,grpc_endpoint ** ep)291 void grpc_tcp_client_create_from_prepared_fd(
292     grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
293     const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
294     grpc_millis deadline, grpc_endpoint** ep) {
295   const int fd = grpc_fd_wrapped_fd(fdobj);
296   int err;
297   async_connect* ac;
298   do {
299     err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
300                   addr->len);
301   } while (err < 0 && errno == EINTR);
302   if (err >= 0) {
303     char* addr_str = grpc_sockaddr_to_uri(addr);
304     *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
305     gpr_free(addr_str);
306     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
307     return;
308   }
309   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
310     grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
311     GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
312     return;
313   }
314 
315   grpc_pollset_set_add_fd(interested_parties, fdobj);
316 
317   ac = static_cast<async_connect*>(gpr_malloc(sizeof(async_connect)));
318   ac->closure = closure;
319   ac->ep = ep;
320   ac->fd = fdobj;
321   ac->interested_parties = interested_parties;
322   ac->addr_str = grpc_sockaddr_to_uri(addr);
323   gpr_mu_init(&ac->mu);
324   ac->refs = 2;
325   GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
326                     grpc_schedule_on_exec_ctx);
327   ac->channel_args = grpc_channel_args_copy(channel_args);
328 
329   if (grpc_tcp_trace.enabled()) {
330     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
331             ac->addr_str, fdobj);
332   }
333 
334   gpr_mu_lock(&ac->mu);
335   GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
336   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
337   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
338   gpr_mu_unlock(&ac->mu);
339 }
340 
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline)341 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
342                         grpc_pollset_set* interested_parties,
343                         const grpc_channel_args* channel_args,
344                         const grpc_resolved_address* addr,
345                         grpc_millis deadline) {
346   grpc_resolved_address mapped_addr;
347   grpc_fd* fdobj = nullptr;
348   grpc_error* error;
349   *ep = nullptr;
350   if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
351                                           &fdobj)) != GRPC_ERROR_NONE) {
352     GRPC_CLOSURE_SCHED(closure, error);
353     return;
354   }
355   grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,
356                                           channel_args, &mapped_addr, deadline,
357                                           ep);
358 }
359 
360 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
361 #endif
362