1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24
25 #include "src/core/lib/iomgr/tcp_client_posix.h"
26
27 #include <errno.h>
28 #include <netinet/in.h>
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/ev_posix.h"
40 #include "src/core/lib/iomgr/iomgr_posix.h"
41 #include "src/core/lib/iomgr/sockaddr.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/iomgr/socket_mutator.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "src/core/lib/iomgr/tcp_posix.h"
46 #include "src/core/lib/iomgr/timer.h"
47 #include "src/core/lib/iomgr/unix_sockets_posix.h"
48 #include "src/core/lib/slice/slice_internal.h"
49
50 extern grpc_core::TraceFlag grpc_tcp_trace;
51
52 typedef struct {
53 gpr_mu mu;
54 grpc_fd* fd;
55 grpc_timer alarm;
56 grpc_closure on_alarm;
57 int refs;
58 grpc_closure write_closure;
59 grpc_pollset_set* interested_parties;
60 char* addr_str;
61 grpc_endpoint** ep;
62 grpc_closure* closure;
63 grpc_channel_args* channel_args;
64 } async_connect;
65
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_channel_args * channel_args)66 static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
67 const grpc_channel_args* channel_args) {
68 grpc_error* err = GRPC_ERROR_NONE;
69
70 GPR_ASSERT(fd >= 0);
71
72 err = grpc_set_socket_nonblocking(fd, 1);
73 if (err != GRPC_ERROR_NONE) goto error;
74 err = grpc_set_socket_cloexec(fd, 1);
75 if (err != GRPC_ERROR_NONE) goto error;
76 if (!grpc_is_unix_socket(addr)) {
77 err = grpc_set_socket_low_latency(fd, 1);
78 if (err != GRPC_ERROR_NONE) goto error;
79 err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
80 true /* is_client */);
81 if (err != GRPC_ERROR_NONE) goto error;
82 }
83 err = grpc_set_socket_no_sigpipe_if_possible(fd);
84 if (err != GRPC_ERROR_NONE) goto error;
85 if (channel_args) {
86 for (size_t i = 0; i < channel_args->num_args; i++) {
87 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
88 GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
89 grpc_socket_mutator* mutator = static_cast<grpc_socket_mutator*>(
90 channel_args->args[i].value.pointer.p);
91 err = grpc_set_socket_with_mutator(fd, mutator);
92 if (err != GRPC_ERROR_NONE) goto error;
93 }
94 }
95 }
96 goto done;
97
98 error:
99 if (fd >= 0) {
100 close(fd);
101 }
102 done:
103 return err;
104 }
105
tc_on_alarm(void * acp,grpc_error * error)106 static void tc_on_alarm(void* acp, grpc_error* error) {
107 int done;
108 async_connect* ac = static_cast<async_connect*>(acp);
109 if (grpc_tcp_trace.enabled()) {
110 const char* str = grpc_error_string(error);
111 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
112 str);
113 }
114 gpr_mu_lock(&ac->mu);
115 if (ac->fd != nullptr) {
116 grpc_fd_shutdown(
117 ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
118 }
119 done = (--ac->refs == 0);
120 gpr_mu_unlock(&ac->mu);
121 if (done) {
122 gpr_mu_destroy(&ac->mu);
123 gpr_free(ac->addr_str);
124 grpc_channel_args_destroy(ac->channel_args);
125 gpr_free(ac);
126 }
127 }
128
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_channel_args * channel_args,const char * addr_str)129 grpc_endpoint* grpc_tcp_client_create_from_fd(
130 grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str) {
131 return grpc_tcp_create(fd, channel_args, addr_str);
132 }
133
on_writable(void * acp,grpc_error * error)134 static void on_writable(void* acp, grpc_error* error) {
135 async_connect* ac = static_cast<async_connect*>(acp);
136 int so_error = 0;
137 socklen_t so_error_size;
138 int err;
139 int done;
140 grpc_endpoint** ep = ac->ep;
141 grpc_closure* closure = ac->closure;
142 grpc_fd* fd;
143
144 GRPC_ERROR_REF(error);
145
146 if (grpc_tcp_trace.enabled()) {
147 const char* str = grpc_error_string(error);
148 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s", ac->addr_str,
149 str);
150 }
151
152 gpr_mu_lock(&ac->mu);
153 GPR_ASSERT(ac->fd);
154 fd = ac->fd;
155 ac->fd = nullptr;
156 gpr_mu_unlock(&ac->mu);
157
158 grpc_timer_cancel(&ac->alarm);
159
160 gpr_mu_lock(&ac->mu);
161 if (error != GRPC_ERROR_NONE) {
162 error =
163 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
164 grpc_slice_from_static_string("Timeout occurred"));
165 goto finish;
166 }
167
168 do {
169 so_error_size = sizeof(so_error);
170 err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
171 &so_error_size);
172 } while (err < 0 && errno == EINTR);
173 if (err < 0) {
174 error = GRPC_OS_ERROR(errno, "getsockopt");
175 goto finish;
176 }
177
178 switch (so_error) {
179 case 0:
180 grpc_pollset_set_del_fd(ac->interested_parties, fd);
181 *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args, ac->addr_str);
182 fd = nullptr;
183 break;
184 case ENOBUFS:
185 /* We will get one of these errors if we have run out of
186 memory in the kernel for the data structures allocated
187 when you connect a socket. If this happens it is very
188 likely that if we wait a little bit then try again the
189 connection will work (since other programs or this
190 program will close their network connections and free up
191 memory). This does _not_ indicate that there is anything
192 wrong with the server we are connecting to, this is a
193 local problem.
194
195 If you are looking at this code, then chances are that
196 your program or another program on the same computer
197 opened too many network connections. The "easy" fix:
198 don't do that! */
199 gpr_log(GPR_ERROR, "kernel out of buffers");
200 gpr_mu_unlock(&ac->mu);
201 grpc_fd_notify_on_write(fd, &ac->write_closure);
202 return;
203 case ECONNREFUSED:
204 /* This error shouldn't happen for anything other than connect(). */
205 error = GRPC_OS_ERROR(so_error, "connect");
206 break;
207 default:
208 /* We don't really know which syscall triggered the problem here,
209 so punt by reporting getsockopt(). */
210 error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
211 break;
212 }
213
214 finish:
215 if (fd != nullptr) {
216 grpc_pollset_set_del_fd(ac->interested_parties, fd);
217 grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
218 fd = nullptr;
219 }
220 done = (--ac->refs == 0);
221 // Create a copy of the data from "ac" to be accessed after the unlock, as
222 // "ac" and its contents may be deallocated by the time they are read.
223 const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
224 gpr_mu_unlock(&ac->mu);
225 if (error != GRPC_ERROR_NONE) {
226 char* error_descr;
227 grpc_slice str;
228 bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
229 GPR_ASSERT(ret);
230 char* desc = grpc_slice_to_c_string(str);
231 gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", desc);
232 error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
233 grpc_slice_from_copied_string(error_descr));
234 gpr_free(error_descr);
235 gpr_free(desc);
236 error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
237 addr_str_slice /* takes ownership */);
238 } else {
239 grpc_slice_unref_internal(addr_str_slice);
240 }
241 if (done) {
242 // This is safe even outside the lock, because "done", the sentinel, is
243 // populated *inside* the lock.
244 gpr_mu_destroy(&ac->mu);
245 gpr_free(ac->addr_str);
246 grpc_channel_args_destroy(ac->channel_args);
247 gpr_free(ac);
248 }
249 GRPC_CLOSURE_SCHED(closure, error);
250 }
251
grpc_tcp_client_prepare_fd(const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,grpc_fd ** fdobj)252 grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
253 const grpc_resolved_address* addr,
254 grpc_resolved_address* mapped_addr,
255 grpc_fd** fdobj) {
256 grpc_dualstack_mode dsmode;
257 int fd;
258 grpc_error* error;
259 char* name;
260 char* addr_str;
261 *fdobj = nullptr;
262 /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
263 v6. */
264 if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
265 /* addr is v4 mapped to v6 or v6. */
266 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
267 }
268 error =
269 grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, &fd);
270 if (error != GRPC_ERROR_NONE) {
271 return error;
272 }
273 if (dsmode == GRPC_DSMODE_IPV4) {
274 /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
275 if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
276 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
277 }
278 }
279 if ((error = prepare_socket(mapped_addr, fd, channel_args)) !=
280 GRPC_ERROR_NONE) {
281 return error;
282 }
283 addr_str = grpc_sockaddr_to_uri(mapped_addr);
284 gpr_asprintf(&name, "tcp-client:%s", addr_str);
285 *fdobj = grpc_fd_create(fd, name, true);
286 gpr_free(name);
287 gpr_free(addr_str);
288 return GRPC_ERROR_NONE;
289 }
290
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,grpc_fd * fdobj,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline,grpc_endpoint ** ep)291 void grpc_tcp_client_create_from_prepared_fd(
292 grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
293 const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
294 grpc_millis deadline, grpc_endpoint** ep) {
295 const int fd = grpc_fd_wrapped_fd(fdobj);
296 int err;
297 async_connect* ac;
298 do {
299 err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
300 addr->len);
301 } while (err < 0 && errno == EINTR);
302 if (err >= 0) {
303 char* addr_str = grpc_sockaddr_to_uri(addr);
304 *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
305 gpr_free(addr_str);
306 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
307 return;
308 }
309 if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
310 grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
311 GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
312 return;
313 }
314
315 grpc_pollset_set_add_fd(interested_parties, fdobj);
316
317 ac = static_cast<async_connect*>(gpr_malloc(sizeof(async_connect)));
318 ac->closure = closure;
319 ac->ep = ep;
320 ac->fd = fdobj;
321 ac->interested_parties = interested_parties;
322 ac->addr_str = grpc_sockaddr_to_uri(addr);
323 gpr_mu_init(&ac->mu);
324 ac->refs = 2;
325 GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
326 grpc_schedule_on_exec_ctx);
327 ac->channel_args = grpc_channel_args_copy(channel_args);
328
329 if (grpc_tcp_trace.enabled()) {
330 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
331 ac->addr_str, fdobj);
332 }
333
334 gpr_mu_lock(&ac->mu);
335 GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
336 grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
337 grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
338 gpr_mu_unlock(&ac->mu);
339 }
340
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline)341 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
342 grpc_pollset_set* interested_parties,
343 const grpc_channel_args* channel_args,
344 const grpc_resolved_address* addr,
345 grpc_millis deadline) {
346 grpc_resolved_address mapped_addr;
347 grpc_fd* fdobj = nullptr;
348 grpc_error* error;
349 *ep = nullptr;
350 if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
351 &fdobj)) != GRPC_ERROR_NONE) {
352 GRPC_CLOSURE_SCHED(closure, error);
353 return;
354 }
355 grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,
356 channel_args, &mapped_addr, deadline,
357 ep);
358 }
359
360 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
361 #endif
362