• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24 
25 #include "src/core/lib/iomgr/tcp_client_posix.h"
26 
27 #include <errno.h>
28 #include <netinet/in.h>
29 #include <string.h>
30 #include <unistd.h>
31 
32 #include "absl/strings/str_cat.h"
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37 
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/iomgr_internal.h"
42 #include "src/core/lib/iomgr/sockaddr.h"
43 #include "src/core/lib/iomgr/sockaddr_utils.h"
44 #include "src/core/lib/iomgr/socket_mutator.h"
45 #include "src/core/lib/iomgr/socket_utils_posix.h"
46 #include "src/core/lib/iomgr/tcp_posix.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/iomgr/unix_sockets_posix.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 
51 extern grpc_core::TraceFlag grpc_tcp_trace;
52 
53 struct async_connect {
54   gpr_mu mu;
55   grpc_fd* fd;
56   grpc_timer alarm;
57   grpc_closure on_alarm;
58   int refs;
59   grpc_closure write_closure;
60   grpc_pollset_set* interested_parties;
61   std::string addr_str;
62   grpc_endpoint** ep;
63   grpc_closure* closure;
64   grpc_channel_args* channel_args;
65 };
66 
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_channel_args * channel_args)67 static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
68                                   const grpc_channel_args* channel_args) {
69   grpc_error* err = GRPC_ERROR_NONE;
70 
71   GPR_ASSERT(fd >= 0);
72 
73   err = grpc_set_socket_nonblocking(fd, 1);
74   if (err != GRPC_ERROR_NONE) goto error;
75   err = grpc_set_socket_cloexec(fd, 1);
76   if (err != GRPC_ERROR_NONE) goto error;
77   if (!grpc_is_unix_socket(addr)) {
78     err = grpc_set_socket_low_latency(fd, 1);
79     if (err != GRPC_ERROR_NONE) goto error;
80     err = grpc_set_socket_reuse_addr(fd, 1);
81     if (err != GRPC_ERROR_NONE) goto error;
82     err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
83                                            true /* is_client */);
84     if (err != GRPC_ERROR_NONE) goto error;
85   }
86   err = grpc_set_socket_no_sigpipe_if_possible(fd);
87   if (err != GRPC_ERROR_NONE) goto error;
88 
89   err = grpc_apply_socket_mutator_in_args(fd, channel_args);
90   if (err != GRPC_ERROR_NONE) goto error;
91 
92   goto done;
93 
94 error:
95   if (fd >= 0) {
96     close(fd);
97   }
98 done:
99   return err;
100 }
101 
tc_on_alarm(void * acp,grpc_error * error)102 static void tc_on_alarm(void* acp, grpc_error* error) {
103   int done;
104   async_connect* ac = static_cast<async_connect*>(acp);
105   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
106     const char* str = grpc_error_string(error);
107     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
108             ac->addr_str.c_str(), str);
109   }
110   gpr_mu_lock(&ac->mu);
111   if (ac->fd != nullptr) {
112     grpc_fd_shutdown(
113         ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
114   }
115   done = (--ac->refs == 0);
116   gpr_mu_unlock(&ac->mu);
117   if (done) {
118     gpr_mu_destroy(&ac->mu);
119     grpc_channel_args_destroy(ac->channel_args);
120     delete ac;
121   }
122 }
123 
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_channel_args * channel_args,const char * addr_str)124 grpc_endpoint* grpc_tcp_client_create_from_fd(
125     grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str) {
126   return grpc_tcp_create(fd, channel_args, addr_str);
127 }
128 
on_writable(void * acp,grpc_error * error)129 static void on_writable(void* acp, grpc_error* error) {
130   async_connect* ac = static_cast<async_connect*>(acp);
131   int so_error = 0;
132   socklen_t so_error_size;
133   int err;
134   int done;
135   grpc_endpoint** ep = ac->ep;
136   grpc_closure* closure = ac->closure;
137   grpc_fd* fd;
138 
139   GRPC_ERROR_REF(error);
140 
141   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
142     const char* str = grpc_error_string(error);
143     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s",
144             ac->addr_str.c_str(), str);
145   }
146 
147   gpr_mu_lock(&ac->mu);
148   GPR_ASSERT(ac->fd);
149   fd = ac->fd;
150   ac->fd = nullptr;
151   gpr_mu_unlock(&ac->mu);
152 
153   grpc_timer_cancel(&ac->alarm);
154 
155   gpr_mu_lock(&ac->mu);
156   if (error != GRPC_ERROR_NONE) {
157     error =
158         grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
159                            grpc_slice_from_static_string("Timeout occurred"));
160     goto finish;
161   }
162 
163   do {
164     so_error_size = sizeof(so_error);
165     err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
166                      &so_error_size);
167   } while (err < 0 && errno == EINTR);
168   if (err < 0) {
169     error = GRPC_OS_ERROR(errno, "getsockopt");
170     goto finish;
171   }
172 
173   switch (so_error) {
174     case 0:
175       grpc_pollset_set_del_fd(ac->interested_parties, fd);
176       *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args,
177                                            ac->addr_str.c_str());
178       fd = nullptr;
179       break;
180     case ENOBUFS:
181       /* We will get one of these errors if we have run out of
182          memory in the kernel for the data structures allocated
183          when you connect a socket.  If this happens it is very
184          likely that if we wait a little bit then try again the
185          connection will work (since other programs or this
186          program will close their network connections and free up
187          memory).  This does _not_ indicate that there is anything
188          wrong with the server we are connecting to, this is a
189          local problem.
190 
191          If you are looking at this code, then chances are that
192          your program or another program on the same computer
193          opened too many network connections.  The "easy" fix:
194          don't do that! */
195       gpr_log(GPR_ERROR, "kernel out of buffers");
196       gpr_mu_unlock(&ac->mu);
197       grpc_fd_notify_on_write(fd, &ac->write_closure);
198       return;
199     case ECONNREFUSED:
200       /* This error shouldn't happen for anything other than connect(). */
201       error = GRPC_OS_ERROR(so_error, "connect");
202       break;
203     default:
204       /* We don't really know which syscall triggered the problem here,
205          so punt by reporting getsockopt(). */
206       error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
207       break;
208   }
209 
210 finish:
211   if (fd != nullptr) {
212     grpc_pollset_set_del_fd(ac->interested_parties, fd);
213     grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
214     fd = nullptr;
215   }
216   done = (--ac->refs == 0);
217   // Create a copy of the data from "ac" to be accessed after the unlock, as
218   // "ac" and its contents may be deallocated by the time they are read.
219   const grpc_slice addr_str_slice = grpc_slice_from_cpp_string(ac->addr_str);
220   gpr_mu_unlock(&ac->mu);
221   if (error != GRPC_ERROR_NONE) {
222     grpc_slice str;
223     bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
224     GPR_ASSERT(ret);
225     std::string description = absl::StrCat("Failed to connect to remote host: ",
226                                            grpc_core::StringViewFromSlice(str));
227     error =
228         grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
229                            grpc_slice_from_cpp_string(std::move(description)));
230     error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
231                                addr_str_slice /* takes ownership */);
232   } else {
233     grpc_slice_unref_internal(addr_str_slice);
234   }
235   if (done) {
236     // This is safe even outside the lock, because "done", the sentinel, is
237     // populated *inside* the lock.
238     gpr_mu_destroy(&ac->mu);
239     grpc_channel_args_destroy(ac->channel_args);
240     delete ac;
241   }
242   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
243 }
244 
grpc_tcp_client_prepare_fd(const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,int * fd)245 grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
246                                        const grpc_resolved_address* addr,
247                                        grpc_resolved_address* mapped_addr,
248                                        int* fd) {
249   grpc_dualstack_mode dsmode;
250   grpc_error* error;
251   *fd = -1;
252   /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
253      v6. */
254   if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
255     /* addr is v4 mapped to v6 or v6. */
256     memcpy(mapped_addr, addr, sizeof(*mapped_addr));
257   }
258   error =
259       grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, fd);
260   if (error != GRPC_ERROR_NONE) {
261     return error;
262   }
263   if (dsmode == GRPC_DSMODE_IPV4) {
264     /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
265     if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
266       memcpy(mapped_addr, addr, sizeof(*mapped_addr));
267     }
268   }
269   if ((error = prepare_socket(mapped_addr, *fd, channel_args)) !=
270       GRPC_ERROR_NONE) {
271     return error;
272   }
273   return GRPC_ERROR_NONE;
274 }
275 
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,const int fd,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline,grpc_endpoint ** ep)276 void grpc_tcp_client_create_from_prepared_fd(
277     grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
278     const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
279     grpc_millis deadline, grpc_endpoint** ep) {
280   int err;
281   do {
282     err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
283                   addr->len);
284   } while (err < 0 && errno == EINTR);
285 
286   std::string name = absl::StrCat("tcp-client:", grpc_sockaddr_to_uri(addr));
287   grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
288 
289   if (err >= 0) {
290     *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args,
291                                          grpc_sockaddr_to_uri(addr).c_str());
292     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
293     return;
294   }
295   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
296     grpc_error* error = GRPC_OS_ERROR(errno, "connect");
297     error = grpc_error_set_str(
298         error, GRPC_ERROR_STR_TARGET_ADDRESS,
299         grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr)));
300     grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
301     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
302     return;
303   }
304 
305   grpc_pollset_set_add_fd(interested_parties, fdobj);
306 
307   async_connect* ac = new async_connect();
308   ac->closure = closure;
309   ac->ep = ep;
310   ac->fd = fdobj;
311   ac->interested_parties = interested_parties;
312   ac->addr_str = grpc_sockaddr_to_uri(addr);
313   gpr_mu_init(&ac->mu);
314   ac->refs = 2;
315   GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
316                     grpc_schedule_on_exec_ctx);
317   ac->channel_args = grpc_channel_args_copy(channel_args);
318 
319   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
320     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
321             ac->addr_str.c_str(), fdobj);
322   }
323 
324   gpr_mu_lock(&ac->mu);
325   GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
326   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
327   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
328   gpr_mu_unlock(&ac->mu);
329 }
330 
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline)331 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
332                         grpc_pollset_set* interested_parties,
333                         const grpc_channel_args* channel_args,
334                         const grpc_resolved_address* addr,
335                         grpc_millis deadline) {
336   grpc_resolved_address mapped_addr;
337   int fd = -1;
338   grpc_error* error;
339   *ep = nullptr;
340   if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
341                                           &fd)) != GRPC_ERROR_NONE) {
342     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
343     return;
344   }
345   grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd,
346                                           channel_args, &mapped_addr, deadline,
347                                           ep);
348 }
349 
350 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
351 #endif
352