1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/exec_ctx.h"
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
25
26 #include <errno.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/time.h>
29 #include <netinet/in.h>
30 #include <string.h>
31 #include <unistd.h>
32
33 #include "absl/container/flat_hash_map.h"
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "absl/strings/str_cat.h"
37 #include "src/core/lib/address_utils/sockaddr_utils.h"
38 #include "src/core/lib/event_engine/resolved_address_internal.h"
39 #include "src/core/lib/event_engine/shim.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
42 #include "src/core/lib/iomgr/executor.h"
43 #include "src/core/lib/iomgr/iomgr_internal.h"
44 #include "src/core/lib/iomgr/sockaddr.h"
45 #include "src/core/lib/iomgr/socket_mutator.h"
46 #include "src/core/lib/iomgr/socket_utils_posix.h"
47 #include "src/core/lib/iomgr/tcp_client_posix.h"
48 #include "src/core/lib/iomgr/tcp_posix.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/iomgr/unix_sockets_posix.h"
51 #include "src/core/lib/iomgr/vsock.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/util/crash.h"
54 #include "src/core/util/string.h"
55
56 using ::grpc_event_engine::experimental::EndpointConfig;
57
// Bookkeeping for one non-blocking connect() that has not completed yet.
// Allocated in grpc_tcp_client_create_from_prepared_fd() and deleted by
// whichever of tc_on_alarm(), on_writable() or tcp_cancel_connect() drops
// `refs` to zero.
struct async_connect {
  // Held while fd, refs and connect_cancelled are read or updated (the one
  // exception is the refs increment in tcp_cancel_connect()).
  gpr_mu mu;
  // Socket being connected. on_writable() takes it and resets this to
  // nullptr; the alarm and cancel paths only shut it down while it is non-null.
  grpc_fd* fd;
  // Deadline timer; when it fires (or is cancelled) on_alarm is run.
  grpc_timer alarm;
  // Closure bound to tc_on_alarm().
  grpc_closure on_alarm;
  // Reference count. Set to 2 when the attempt starts; tc_on_alarm() and
  // on_writable() each drop one, tcp_cancel_connect() takes and drops one.
  int refs;
  // Closure bound to on_writable(), armed via grpc_fd_notify_on_write().
  grpc_closure write_closure;
  // Pollset set that fd is added to while the connect is pending.
  grpc_pollset_set* interested_parties;
  // URI form of the peer address; used in trace logs and handed to the
  // endpoint on success.
  std::string addr_str;
  // Where the resulting endpoint is stored on success.
  grpc_endpoint** ep;
  // Caller's completion closure; run by on_writable() unless cancelled.
  grpc_closure* closure;
  // Key of this attempt in the pending_connections map of its shard.
  int64_t connection_handle;
  // Set by tcp_cancel_connect(); makes on_writable() skip `closure`.
  bool connect_cancelled;
  // TCP options applied when the endpoint is created.
  grpc_core::PosixTcpOptions options;
};
73
// One shard of the table of pending connection attempts. A connection handle
// is mapped to its shard with `handle % number_of_shards`.
struct ConnectionShard {
  // Protects pending_connections.
  grpc_core::Mutex mu;
  // Connection handle -> in-flight connect state. Entries are inserted when an
  // asynchronous connect starts and erased by on_writable() or
  // tcp_cancel_connect().
  absl::flat_hash_map<int64_t, async_connect*> pending_connections
      ABSL_GUARDED_BY(&mu);
};
79
80 namespace {
81
82 gpr_once g_tcp_client_posix_init = GPR_ONCE_INIT;
83 std::vector<ConnectionShard>* g_connection_shards = nullptr;
84 std::atomic<int64_t> g_connection_id{1};
85
do_tcp_client_global_init(void)86 void do_tcp_client_global_init(void) {
87 size_t num_shards = std::max(2 * gpr_cpu_num_cores(), 1u);
88 g_connection_shards = new std::vector<struct ConnectionShard>(num_shards);
89 }
90
91 } // namespace
92
// Initialises the global connection-shard table. Safe to call repeatedly:
// gpr_once_init() runs do_tcp_client_global_init() only the first time.
void grpc_tcp_client_global_init() {
  gpr_once_init(&g_tcp_client_posix_init, do_tcp_client_global_init);
}
96
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_core::PosixTcpOptions & options)97 static grpc_error_handle prepare_socket(
98 const grpc_resolved_address* addr, int fd,
99 const grpc_core::PosixTcpOptions& options) {
100 grpc_error_handle err;
101
102 CHECK_GE(fd, 0);
103
104 err = grpc_set_socket_nonblocking(fd, 1);
105 if (!err.ok()) goto error;
106 err = grpc_set_socket_cloexec(fd, 1);
107 if (!err.ok()) goto error;
108 if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) {
109 err = grpc_set_socket_rcvbuf(fd, options.tcp_receive_buffer_size);
110 if (!err.ok()) goto error;
111 }
112 if (!grpc_is_unix_socket(addr) && !grpc_is_vsock(addr)) {
113 err = grpc_set_socket_low_latency(fd, 1);
114 if (!err.ok()) goto error;
115 err = grpc_set_socket_reuse_addr(fd, 1);
116 if (!err.ok()) goto error;
117 err = grpc_set_socket_dscp(fd, options.dscp);
118 if (!err.ok()) goto error;
119 err = grpc_set_socket_tcp_user_timeout(fd, options, true /* is_client */);
120 if (!err.ok()) goto error;
121 }
122 err = grpc_set_socket_no_sigpipe_if_possible(fd);
123 if (!err.ok()) goto error;
124
125 err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_CLIENT_CONNECTION_USAGE,
126 options);
127 if (!err.ok()) goto error;
128
129 goto done;
130
131 error:
132 if (fd >= 0) {
133 close(fd);
134 }
135 done:
136 return err;
137 }
138
tc_on_alarm(void * acp,grpc_error_handle error)139 static void tc_on_alarm(void* acp, grpc_error_handle error) {
140 int done;
141 async_connect* ac = static_cast<async_connect*>(acp);
142 GRPC_TRACE_LOG(tcp, INFO)
143 << "CLIENT_CONNECT: " << ac->addr_str
144 << ": on_alarm: error=" << grpc_core::StatusToString(error);
145 gpr_mu_lock(&ac->mu);
146 if (ac->fd != nullptr) {
147 grpc_fd_shutdown(ac->fd, GRPC_ERROR_CREATE("connect() timed out"));
148 }
149 done = (--ac->refs == 0);
150 gpr_mu_unlock(&ac->mu);
151 if (done) {
152 gpr_mu_destroy(&ac->mu);
153 delete ac;
154 }
155 }
156
// Builds a TCP endpoint on top of an fd using already-parsed TCP options.
// Simply forwards its arguments to grpc_tcp_create().
static grpc_endpoint* grpc_tcp_client_create_from_fd(
    grpc_fd* fd, const grpc_core::PosixTcpOptions& options,
    absl::string_view addr_str) {
  return grpc_tcp_create(fd, options, addr_str);
}
162
grpc_tcp_create_from_fd(grpc_fd * fd,const grpc_event_engine::experimental::EndpointConfig & config,absl::string_view addr_str)163 grpc_endpoint* grpc_tcp_create_from_fd(
164 grpc_fd* fd, const grpc_event_engine::experimental::EndpointConfig& config,
165 absl::string_view addr_str) {
166 return grpc_tcp_create(fd, TcpOptionsFromEndpointConfig(config), addr_str);
167 }
168
// Callback run when the connecting socket becomes writable or is shut down
// (by the deadline alarm or by tcp_cancel_connect()).
//
// Takes ownership of ac->fd, cancels the deadline timer, reads SO_ERROR to
// learn the outcome of connect(), and then either publishes a new endpoint
// through *ac->ep or orphans the fd. Unless the attempt was cancelled, the
// attempt is removed from its pending-connection shard and the caller's
// closure is scheduled on the Executor with the final status. Drops one
// reference on `ac` and deletes it when that was the last one.
static void on_writable(void* acp, grpc_error_handle error) {
  async_connect* ac = static_cast<async_connect*>(acp);
  int so_error = 0;
  socklen_t so_error_size;
  int err;
  int done;
  // `closure` is used at the very end, after `ac` may have been deleted, so
  // it is copied out up front.
  grpc_endpoint** ep = ac->ep;
  grpc_closure* closure = ac->closure;
  // NOTE(review): this local copy does not appear to be used below (the code
  // reads ac->addr_str directly) -- confirm whether it can be removed.
  std::string addr_str = ac->addr_str;
  grpc_fd* fd;

  GRPC_TRACE_LOG(tcp, INFO)
      << "CLIENT_CONNECT: " << ac->addr_str
      << ": on_writable: error=" << grpc_core::StatusToString(error);

  // Take the fd away from `ac`. After this, tc_on_alarm() and
  // tcp_cancel_connect() see a null fd and leave the socket alone.
  gpr_mu_lock(&ac->mu);
  CHECK(ac->fd);
  fd = ac->fd;
  ac->fd = nullptr;
  bool connect_cancelled = ac->connect_cancelled;
  gpr_mu_unlock(&ac->mu);

  // The timer is cancelled with ac->mu released.
  grpc_timer_cancel(&ac->alarm);

  gpr_mu_lock(&ac->mu);
  if (!error.ok()) {
    // Any error delivered to this callback is reported as a timeout.
    error = grpc_core::AddMessagePrefix("Timeout occurred", error);
    goto finish;
  }

  if (connect_cancelled) {
    // The callback should not get scheduled in this case.
    error = absl::OkStatus();
    goto finish;
  }

  // Fetch the result of the asynchronous connect(), retrying on EINTR.
  do {
    so_error_size = sizeof(so_error);
    err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
                     &so_error_size);
  } while (err < 0 && errno == EINTR);
  if (err < 0) {
    error = GRPC_OS_ERROR(errno, "getsockopt");
    goto finish;
  }

  switch (so_error) {
    case 0:
      // Connected: hand the fd over to a new endpoint. Clearing the local
      // `fd` keeps the cleanup code below from orphaning it.
      grpc_pollset_set_del_fd(ac->interested_parties, fd);
      *ep = grpc_tcp_client_create_from_fd(fd, ac->options, ac->addr_str);
      fd = nullptr;
      break;
    case ENOBUFS:
      // We will get one of these errors if we have run out of
      // memory in the kernel for the data structures allocated
      // when you connect a socket.  If this happens it is very
      // likely that if we wait a little bit then try again the
      // connection will work (since other programs or this
      // program will close their network connections and free up
      // memory).  This does _not_ indicate that there is anything
      // wrong with the server we are connecting to, this is a
      // local problem.

      // If you are looking at this code, then chances are that
      // your program or another program on the same computer
      // opened too many network connections.  The "easy" fix:
      // don't do that!
      LOG(ERROR) << "kernel out of buffers";
      // NOTE(review): ac->fd was reset to nullptr above and is not restored
      // before re-arming, so the CHECK(ac->fd) at the top of this function
      // would fail when the callback runs again; the deadline timer has also
      // already been cancelled. Confirm whether this retry path is intended
      // to work as written.
      gpr_mu_unlock(&ac->mu);
      grpc_fd_notify_on_write(fd, &ac->write_closure);
      return;
    case ECONNREFUSED:
      // This error shouldn't happen for anything other than connect().
      error = GRPC_OS_ERROR(so_error, "connect");
      break;
    default:
      // We don't really know which syscall triggered the problem here,
      // so punt by reporting getsockopt().
      error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
      break;
  }

finish:
  // ac->mu is held on every path that reaches this label.
  if (!connect_cancelled) {
    // A cancelled attempt was already erased from the shard by
    // tcp_cancel_connect(); otherwise remove it here.
    int shard_number = ac->connection_handle % (*g_connection_shards).size();
    struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
    {
      grpc_core::MutexLock lock(&shard->mu);
      shard->pending_connections.erase(ac->connection_handle);
    }
  }
  if (fd != nullptr) {
    // No endpoint took the fd: detach it from the pollset set and release it.
    grpc_pollset_set_del_fd(ac->interested_parties, fd);
    grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
    fd = nullptr;
  }
  done = (--ac->refs == 0);
  gpr_mu_unlock(&ac->mu);
  if (!error.ok()) {
    // Prefix the description so the caller sees a uniform connect failure.
    std::string str;
    bool ret = grpc_error_get_str(
        error, grpc_core::StatusStrProperty::kDescription, &str);
    CHECK(ret);
    std::string description =
        absl::StrCat("Failed to connect to remote host: ", str);
    error = grpc_error_set_str(
        error, grpc_core::StatusStrProperty::kDescription, description);
  }
  if (done) {
    // This is safe even outside the lock, because "done", the sentinel, is
    // populated *inside* the lock.
    gpr_mu_destroy(&ac->mu);
    delete ac;
  }
  // Push async connect closure to the executor since this may actually be
  // called during the shutdown process, in which case a deadlock could form
  // between the core shutdown mu and the connector mu (b/188239051)
  if (!connect_cancelled) {
    grpc_core::Executor::Run(closure, error);
  }
}
290
grpc_tcp_client_prepare_fd(const grpc_core::PosixTcpOptions & options,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,int * fd)291 grpc_error_handle grpc_tcp_client_prepare_fd(
292 const grpc_core::PosixTcpOptions& options,
293 const grpc_resolved_address* addr, grpc_resolved_address* mapped_addr,
294 int* fd) {
295 grpc_dualstack_mode dsmode;
296 grpc_error_handle error;
297 *fd = -1;
298 // Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
299 // v6.
300 if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
301 // addr is v4 mapped to v6 or v6.
302 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
303 }
304 error =
305 grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, fd);
306 if (!error.ok()) {
307 return error;
308 }
309 if (dsmode == GRPC_DSMODE_IPV4) {
310 // Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
311 if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
312 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
313 }
314 }
315 if ((error = prepare_socket(mapped_addr, *fd, options)) != absl::OkStatus()) {
316 return error;
317 }
318 return absl::OkStatus();
319 }
320
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,const int fd,const grpc_core::PosixTcpOptions & options,const grpc_resolved_address * addr,grpc_core::Timestamp deadline,grpc_endpoint ** ep)321 int64_t grpc_tcp_client_create_from_prepared_fd(
322 grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
323 const grpc_core::PosixTcpOptions& options,
324 const grpc_resolved_address* addr, grpc_core::Timestamp deadline,
325 grpc_endpoint** ep) {
326 int err;
327 do {
328 err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
329 addr->len);
330 } while (err < 0 && errno == EINTR);
331 int connect_errno = (err < 0) ? errno : 0;
332
333 auto addr_uri = grpc_sockaddr_to_uri(addr);
334 if (!addr_uri.ok()) {
335 grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
336 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
337 return 0;
338 }
339
340 std::string name = absl::StrCat("tcp-client:", *addr_uri);
341 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
342 int64_t connection_id = 0;
343 if (connect_errno == EWOULDBLOCK || connect_errno == EINPROGRESS) {
344 // Connection is still in progress.
345 connection_id = g_connection_id.fetch_add(1, std::memory_order_acq_rel);
346 }
347
348 if (err >= 0) {
349 // Connection already succeeded. Return 0 to discourage any cancellation
350 // attempts.
351 *ep = grpc_tcp_client_create_from_fd(fdobj, options, *addr_uri);
352 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
353 return 0;
354 }
355 if (connect_errno != EWOULDBLOCK && connect_errno != EINPROGRESS) {
356 // Connection already failed. Return 0 to discourage any cancellation
357 // attempts.
358 grpc_error_handle error = GRPC_OS_ERROR(connect_errno, "connect");
359 grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
360 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
361 return 0;
362 }
363
364 grpc_pollset_set_add_fd(interested_parties, fdobj);
365
366 async_connect* ac = new async_connect();
367 ac->closure = closure;
368 ac->ep = ep;
369 ac->fd = fdobj;
370 ac->interested_parties = interested_parties;
371 ac->addr_str = addr_uri.value();
372 ac->connection_handle = connection_id;
373 ac->connect_cancelled = false;
374 gpr_mu_init(&ac->mu);
375 ac->refs = 2;
376 GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
377 grpc_schedule_on_exec_ctx);
378 ac->options = options;
379
380 GRPC_TRACE_LOG(tcp, INFO) << "CLIENT_CONNECT: " << ac->addr_str
381 << ": asynchronously connecting fd " << fdobj;
382
383 int shard_number = connection_id % (*g_connection_shards).size();
384 struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
385 {
386 grpc_core::MutexLock lock(&shard->mu);
387 shard->pending_connections.insert_or_assign(connection_id, ac);
388 }
389
390 gpr_mu_lock(&ac->mu);
391 GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
392 grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
393 grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
394 gpr_mu_unlock(&ac->mu);
395 return connection_id;
396 }
397
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const EndpointConfig & config,const grpc_resolved_address * addr,grpc_core::Timestamp deadline)398 static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
399 grpc_pollset_set* interested_parties,
400 const EndpointConfig& config,
401 const grpc_resolved_address* addr,
402 grpc_core::Timestamp deadline) {
403 if (grpc_event_engine::experimental::UseEventEngineClient()) {
404 return grpc_event_engine::experimental::event_engine_tcp_client_connect(
405 closure, ep, config, addr, deadline);
406 }
407 grpc_resolved_address mapped_addr;
408 grpc_core::PosixTcpOptions options(TcpOptionsFromEndpointConfig(config));
409 int fd = -1;
410 grpc_error_handle error;
411 *ep = nullptr;
412 if ((error = grpc_tcp_client_prepare_fd(options, addr, &mapped_addr, &fd)) !=
413 absl::OkStatus()) {
414 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
415 return 0;
416 }
417 return grpc_tcp_client_create_from_prepared_fd(
418 interested_parties, closure, fd, options, &mapped_addr, deadline, ep);
419 }
420
// vtable entry point: attempts to cancel the pending connect identified by
// `connection_handle`.
//
// Returns true if the connect was still pending and has been cancelled, in
// which case the caller's closure will not be run. Returns false for
// non-positive handles, for handles that are not in the pending table, and
// when on_writable() has already taken over the socket.
static bool tcp_cancel_connect(int64_t connection_handle) {
  if (grpc_event_engine::experimental::UseEventEngineClient()) {
    return grpc_event_engine::experimental::
        event_engine_tcp_client_cancel_connect(connection_handle);
  }
  if (connection_handle <= 0) {
    return false;
  }
  int shard_number = connection_handle % (*g_connection_shards).size();
  struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
  async_connect* ac = nullptr;
  {
    grpc_core::MutexLock lock(&shard->mu);
    auto it = shard->pending_connections.find(connection_handle);
    if (it != shard->pending_connections.end()) {
      ac = it->second;
      CHECK_NE(ac, nullptr);
      // Trying to acquire ac->mu here could cause a deadlock because the
      // on_writable method acquires the two mutexes used here in the reverse
      // order. But we don't need to acquire ac->mu before incrementing
      // ac->refs here. This is because the on_writable method decrements
      // ac->refs only after deleting the connection handle from the
      // corresponding hashmap. If the code enters here, it means that
      // deletion hasn't happened yet. The deletion can only happen after the
      // corresponding shard->mu is unlocked.
      // NOTE(review): tc_on_alarm() also modifies ac->refs, holding only
      // ac->mu and not the shard mutex -- confirm that this unguarded
      // increment cannot race with it.
      ++ac->refs;
      // Remove connection from list of active connections.
      shard->pending_connections.erase(it);
    }
  }
  if (ac == nullptr) {
    return false;
  }
  gpr_mu_lock(&ac->mu);
  bool connection_cancel_success = (ac->fd != nullptr);
  if (connection_cancel_success) {
    // Connection is still pending. The on_writable callback hasn't executed
    // yet because ac->fd != nullptr.
    ac->connect_cancelled = true;
    // Shutdown the fd. This would cause on_writable to run as soon as
    // possible. We don't need to pass a custom error here because it won't be
    // used since the on_connect_closure is not run if connect cancellation is
    // successful.
    grpc_fd_shutdown(ac->fd, absl::OkStatus());
  }
  // Drop the reference taken above.
  bool done = (--ac->refs == 0);
  gpr_mu_unlock(&ac->mu);
  if (done) {
    // This is safe even outside the lock, because "done", the sentinel, is
    // populated *inside* the lock.
    gpr_mu_destroy(&ac->mu);
    delete ac;
  }
  return connection_cancel_success;
}
475
// POSIX implementation of the TCP client vtable: the connect entry point
// followed by the cancel entry point.
grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect,
                                                       tcp_cancel_connect};
478 #endif
479