1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24
25 #include "src/core/lib/iomgr/tcp_client_posix.h"
26
27 #include <errno.h>
28 #include <netinet/in.h>
29 #include <string.h>
30 #include <unistd.h>
31
32 #include "absl/strings/str_cat.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/iomgr_internal.h"
42 #include "src/core/lib/iomgr/sockaddr.h"
43 #include "src/core/lib/iomgr/sockaddr_utils.h"
44 #include "src/core/lib/iomgr/socket_mutator.h"
45 #include "src/core/lib/iomgr/socket_utils_posix.h"
46 #include "src/core/lib/iomgr/tcp_posix.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/iomgr/unix_sockets_posix.h"
49 #include "src/core/lib/slice/slice_internal.h"
50
// Trace flag defined in another translation unit. In this file it gates the
// "CLIENT_CONNECT" log lines.
extern grpc_core::TraceFlag grpc_tcp_trace;
52
// State of one in-flight asynchronous connect(). It is allocated in
// grpc_tcp_client_create_from_prepared_fd and shared by the two callbacks
// on_writable and tc_on_alarm; whichever of them drops refs to zero destroys
// mu, destroys channel_args and deletes the object.
struct async_connect {
  // Held around every access to fd and refs in the callbacks.
  gpr_mu mu;
  // Descriptor being connected. on_writable takes it and stores nullptr here
  // before cancelling the alarm; tc_on_alarm only shuts it down when non-null.
  grpc_fd* fd;
  // Deadline timer, and the closure (tc_on_alarm) it runs.
  grpc_timer alarm;
  grpc_closure on_alarm;
  // Initialised to 2; tc_on_alarm and the finishing path of on_writable each
  // decrement it once.
  int refs;
  // Closure (on_writable) registered with grpc_fd_notify_on_write.
  grpc_closure write_closure;
  // Pollset set the fd was added to; the fd is removed from it in on_writable.
  grpc_pollset_set* interested_parties;
  // URI form of the target address, used in logs and attached to errors.
  std::string addr_str;
  // Caller-provided slot that receives the endpoint on success.
  grpc_endpoint** ep;
  // Caller-provided closure run by on_writable with the final error.
  grpc_closure* closure;
  // Copy of the caller's channel args (made with grpc_channel_args_copy).
  grpc_channel_args* channel_args;
};
66
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_channel_args * channel_args)67 static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
68 const grpc_channel_args* channel_args) {
69 grpc_error* err = GRPC_ERROR_NONE;
70
71 GPR_ASSERT(fd >= 0);
72
73 err = grpc_set_socket_nonblocking(fd, 1);
74 if (err != GRPC_ERROR_NONE) goto error;
75 err = grpc_set_socket_cloexec(fd, 1);
76 if (err != GRPC_ERROR_NONE) goto error;
77 if (!grpc_is_unix_socket(addr)) {
78 err = grpc_set_socket_low_latency(fd, 1);
79 if (err != GRPC_ERROR_NONE) goto error;
80 err = grpc_set_socket_reuse_addr(fd, 1);
81 if (err != GRPC_ERROR_NONE) goto error;
82 err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
83 true /* is_client */);
84 if (err != GRPC_ERROR_NONE) goto error;
85 }
86 err = grpc_set_socket_no_sigpipe_if_possible(fd);
87 if (err != GRPC_ERROR_NONE) goto error;
88
89 err = grpc_apply_socket_mutator_in_args(fd, channel_args);
90 if (err != GRPC_ERROR_NONE) goto error;
91
92 goto done;
93
94 error:
95 if (fd >= 0) {
96 close(fd);
97 }
98 done:
99 return err;
100 }
101
tc_on_alarm(void * acp,grpc_error * error)102 static void tc_on_alarm(void* acp, grpc_error* error) {
103 int done;
104 async_connect* ac = static_cast<async_connect*>(acp);
105 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
106 const char* str = grpc_error_string(error);
107 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
108 ac->addr_str.c_str(), str);
109 }
110 gpr_mu_lock(&ac->mu);
111 if (ac->fd != nullptr) {
112 grpc_fd_shutdown(
113 ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
114 }
115 done = (--ac->refs == 0);
116 gpr_mu_unlock(&ac->mu);
117 if (done) {
118 gpr_mu_destroy(&ac->mu);
119 grpc_channel_args_destroy(ac->channel_args);
120 delete ac;
121 }
122 }
123
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_channel_args * channel_args,const char * addr_str)124 grpc_endpoint* grpc_tcp_client_create_from_fd(
125 grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str) {
126 return grpc_tcp_create(fd, channel_args, addr_str);
127 }
128
/* Write-readiness callback for an asynchronous connect; acp is the
   async_connect.

   On a clean result (no incoming error and SO_ERROR == 0) the fd is wrapped
   in an endpoint stored through ac->ep. On any other outcome except ENOBUFS
   the fd is orphaned and an error is built. In both cases the callback drops
   its reference on the connect state and runs ac->closure with the final
   error. On ENOBUFS it re-registers itself for writability and returns
   without finishing. */
static void on_writable(void* acp, grpc_error* error) {
  async_connect* ac = static_cast<async_connect*>(acp);
  int so_error = 0;
  socklen_t so_error_size;
  int err;
  int done;
  /* Read these up front: ac may be deleted before the closure is run. */
  grpc_endpoint** ep = ac->ep;
  grpc_closure* closure = ac->closure;
  grpc_fd* fd;

  /* Presumably taken because error is modified below and then handed to the
     closure at the end of this function. */
  GRPC_ERROR_REF(error);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    const char* str = grpc_error_string(error);
    gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s",
            ac->addr_str.c_str(), str);
  }

  /* Take the fd out of the shared state before cancelling the timer:
     tc_on_alarm only shuts down the fd when ac->fd is non-null. */
  gpr_mu_lock(&ac->mu);
  GPR_ASSERT(ac->fd);
  fd = ac->fd;
  ac->fd = nullptr;
  gpr_mu_unlock(&ac->mu);

  /* Called with the mutex released. */
  grpc_timer_cancel(&ac->alarm);

  gpr_mu_lock(&ac->mu);
  if (error != GRPC_ERROR_NONE) {
    /* Any incoming error is labelled as a timeout here. */
    error =
        grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
                           grpc_slice_from_static_string("Timeout occurred"));
    goto finish;
  }

  /* Fetch the pending socket error, retrying when interrupted by a signal. */
  do {
    so_error_size = sizeof(so_error);
    err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
                     &so_error_size);
  } while (err < 0 && errno == EINTR);
  if (err < 0) {
    error = GRPC_OS_ERROR(errno, "getsockopt");
    goto finish;
  }

  switch (so_error) {
    case 0:
      /* Connected: hand the fd over to the new endpoint. Clearing the local
         fd makes the finish path skip the orphaning. */
      grpc_pollset_set_del_fd(ac->interested_parties, fd);
      *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args,
                                           ac->addr_str.c_str());
      fd = nullptr;
      break;
    case ENOBUFS:
      /* We will get one of these errors if we have run out of
         memory in the kernel for the data structures allocated
         when you connect a socket.  If this happens it is very
         likely that if we wait a little bit then try again the
         connection will work (since other programs or this
         program will close their network connections and free up
         memory).  This does _not_ indicate that there is anything
         wrong with the server we are connecting to, this is a
         local problem.

         If you are looking at this code, then chances are that
         your program or another program on the same computer
         opened too many network connections.  The "easy" fix:
         don't do that! */
      /* NOTE(review): at this point ac->fd has already been set to nullptr
         and the alarm has been cancelled, and neither is restored before
         re-arming. A later invocation of this callback would therefore appear
         to fail GPR_ASSERT(ac->fd) above -- confirm and fix separately. */
      gpr_log(GPR_ERROR, "kernel out of buffers");
      gpr_mu_unlock(&ac->mu);
      grpc_fd_notify_on_write(fd, &ac->write_closure);
      return;
    case ECONNREFUSED:
      /* This error shouldn't happen for anything other than connect(). */
      error = GRPC_OS_ERROR(so_error, "connect");
      break;
    default:
      /* We don't really know which syscall triggered the problem here,
         so punt by reporting getsockopt(). */
      error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
      break;
  }

finish:
  /* Reached with ac->mu held. A non-null fd means no endpoint took it over. */
  if (fd != nullptr) {
    grpc_pollset_set_del_fd(ac->interested_parties, fd);
    grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
    fd = nullptr;
  }
  done = (--ac->refs == 0);
  // Create a copy of the data from "ac" to be accessed after the unlock, as
  // "ac" and its contents may be deallocated by the time they are read.
  const grpc_slice addr_str_slice = grpc_slice_from_cpp_string(ac->addr_str);
  gpr_mu_unlock(&ac->mu);
  if (error != GRPC_ERROR_NONE) {
    /* Prefix the existing description and attach the target address. */
    grpc_slice str;
    bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
    GPR_ASSERT(ret);
    std::string description = absl::StrCat("Failed to connect to remote host: ",
                                           grpc_core::StringViewFromSlice(str));
    error =
        grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
                           grpc_slice_from_cpp_string(std::move(description)));
    error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
                               addr_str_slice /* takes ownership */);
  } else {
    /* The address copy is not needed on success. */
    grpc_slice_unref_internal(addr_str_slice);
  }
  if (done) {
    // This is safe even outside the lock, because "done", the sentinel, is
    // populated *inside* the lock.
    gpr_mu_destroy(&ac->mu);
    grpc_channel_args_destroy(ac->channel_args);
    delete ac;
  }
  /* Uses the closure pointer copied at the top; ac may be gone by now. */
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
244
grpc_tcp_client_prepare_fd(const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,int * fd)245 grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
246 const grpc_resolved_address* addr,
247 grpc_resolved_address* mapped_addr,
248 int* fd) {
249 grpc_dualstack_mode dsmode;
250 grpc_error* error;
251 *fd = -1;
252 /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
253 v6. */
254 if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
255 /* addr is v4 mapped to v6 or v6. */
256 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
257 }
258 error =
259 grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, fd);
260 if (error != GRPC_ERROR_NONE) {
261 return error;
262 }
263 if (dsmode == GRPC_DSMODE_IPV4) {
264 /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
265 if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
266 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
267 }
268 }
269 if ((error = prepare_socket(mapped_addr, *fd, channel_args)) !=
270 GRPC_ERROR_NONE) {
271 return error;
272 }
273 return GRPC_ERROR_NONE;
274 }
275
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,const int fd,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline,grpc_endpoint ** ep)276 void grpc_tcp_client_create_from_prepared_fd(
277 grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
278 const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
279 grpc_millis deadline, grpc_endpoint** ep) {
280 int err;
281 do {
282 err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
283 addr->len);
284 } while (err < 0 && errno == EINTR);
285
286 std::string name = absl::StrCat("tcp-client:", grpc_sockaddr_to_uri(addr));
287 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
288
289 if (err >= 0) {
290 *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args,
291 grpc_sockaddr_to_uri(addr).c_str());
292 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
293 return;
294 }
295 if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
296 grpc_error* error = GRPC_OS_ERROR(errno, "connect");
297 error = grpc_error_set_str(
298 error, GRPC_ERROR_STR_TARGET_ADDRESS,
299 grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr)));
300 grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
301 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
302 return;
303 }
304
305 grpc_pollset_set_add_fd(interested_parties, fdobj);
306
307 async_connect* ac = new async_connect();
308 ac->closure = closure;
309 ac->ep = ep;
310 ac->fd = fdobj;
311 ac->interested_parties = interested_parties;
312 ac->addr_str = grpc_sockaddr_to_uri(addr);
313 gpr_mu_init(&ac->mu);
314 ac->refs = 2;
315 GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
316 grpc_schedule_on_exec_ctx);
317 ac->channel_args = grpc_channel_args_copy(channel_args);
318
319 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
320 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
321 ac->addr_str.c_str(), fdobj);
322 }
323
324 gpr_mu_lock(&ac->mu);
325 GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
326 grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
327 grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
328 gpr_mu_unlock(&ac->mu);
329 }
330
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const grpc_channel_args * channel_args,const grpc_resolved_address * addr,grpc_millis deadline)331 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
332 grpc_pollset_set* interested_parties,
333 const grpc_channel_args* channel_args,
334 const grpc_resolved_address* addr,
335 grpc_millis deadline) {
336 grpc_resolved_address mapped_addr;
337 int fd = -1;
338 grpc_error* error;
339 *ep = nullptr;
340 if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
341 &fd)) != GRPC_ERROR_NONE) {
342 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
343 return;
344 }
345 grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fd,
346 channel_args, &mapped_addr, deadline,
347 ep);
348 }
349
// TCP client vtable for this POSIX implementation; its single entry is
// tcp_connect.
grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
351 #endif
352