• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 #include <inttypes.h>
21 
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_WINSOCK_SOCKET
25 
26 #include <grpc/event_engine/endpoint_config.h>
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log_windows.h>
30 
31 #include "absl/log/check.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/event_engine/shim.h"
34 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
35 #include "src/core/lib/iomgr/iocp_windows.h"
36 #include "src/core/lib/iomgr/sockaddr.h"
37 #include "src/core/lib/iomgr/sockaddr_windows.h"
38 #include "src/core/lib/iomgr/socket_windows.h"
39 #include "src/core/lib/iomgr/tcp_client.h"
40 #include "src/core/lib/iomgr/tcp_windows.h"
41 #include "src/core/lib/iomgr/timer.h"
42 #include "src/core/lib/resource_quota/api.h"
43 #include "src/core/lib/slice/slice_internal.h"
44 #include "src/core/util/crash.h"
45 
46 using ::grpc_event_engine::experimental::EndpointConfig;
47 
48 struct async_connect {
49   grpc_closure* on_done;
50   gpr_mu mu;
51   grpc_winsocket* socket;
52   grpc_timer alarm;
53   grpc_closure on_alarm;
54   std::string addr_name;
55   int refs;
56   grpc_closure on_connect;
57   grpc_endpoint** endpoint;
58 };
59 
async_connect_unlock_and_cleanup(async_connect * ac,grpc_winsocket * socket)60 static void async_connect_unlock_and_cleanup(async_connect* ac,
61                                              grpc_winsocket* socket) {
62   int done = (--ac->refs == 0);
63   gpr_mu_unlock(&ac->mu);
64   if (done) {
65     gpr_mu_destroy(&ac->mu);
66     delete ac;
67   }
68   if (socket != NULL) grpc_winsocket_destroy(socket);
69 }
70 
on_alarm(void * acp,grpc_error_handle)71 static void on_alarm(void* acp, grpc_error_handle /* error */) {
72   async_connect* ac = (async_connect*)acp;
73   gpr_mu_lock(&ac->mu);
74   grpc_winsocket* socket = ac->socket;
75   ac->socket = NULL;
76   if (socket != NULL) {
77     grpc_winsocket_shutdown(socket);
78   }
79   async_connect_unlock_and_cleanup(ac, socket);
80 }
81 
on_connect(void * acp,grpc_error_handle error)82 static void on_connect(void* acp, grpc_error_handle error) {
83   async_connect* ac = (async_connect*)acp;
84   grpc_endpoint** ep = ac->endpoint;
85   CHECK(*ep == NULL);
86   grpc_closure* on_done = ac->on_done;
87 
88   gpr_mu_lock(&ac->mu);
89   grpc_winsocket* socket = ac->socket;
90   ac->socket = NULL;
91   gpr_mu_unlock(&ac->mu);
92 
93   grpc_timer_cancel(&ac->alarm);
94 
95   gpr_mu_lock(&ac->mu);
96 
97   if (error.ok()) {
98     if (socket != NULL) {
99       DWORD transferred_bytes = 0;
100       DWORD flags;
101       BOOL wsa_success =
102           WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
103                                  &transferred_bytes, FALSE, &flags);
104       CHECK_EQ(transferred_bytes, 0);
105       if (!wsa_success) {
106         error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
107         closesocket(socket->socket);
108       } else {
109         *ep = grpc_tcp_create(socket, ac->addr_name);
110         socket = nullptr;
111       }
112     } else {
113       error = GRPC_ERROR_CREATE("socket is null");
114     }
115   }
116 
117   async_connect_unlock_and_cleanup(ac, socket);
118   // If the connection was aborted, the callback was already called when
119   // the deadline was met.
120   grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
121 }
122 
123 // Tries to issue one async connection, then schedules both an IOCP
124 // notification request for the connection, and one timeout alert.
tcp_connect(grpc_closure * on_done,grpc_endpoint ** endpoint,grpc_pollset_set *,const EndpointConfig & config,const grpc_resolved_address * addr,grpc_core::Timestamp deadline)125 static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
126                            grpc_pollset_set* /* interested_parties */,
127                            const EndpointConfig& config,
128                            const grpc_resolved_address* addr,
129                            grpc_core::Timestamp deadline) {
130   if (grpc_event_engine::experimental::UseEventEngineClient()) {
131     return grpc_event_engine::experimental::event_engine_tcp_client_connect(
132         on_done, endpoint, config, addr, deadline);
133   }
134   SOCKET sock = INVALID_SOCKET;
135   BOOL success;
136   int status;
137   grpc_resolved_address addr6_v4mapped;
138   grpc_resolved_address local_address;
139   grpc_winsocket* socket = NULL;
140   LPFN_CONNECTEX ConnectEx;
141   GUID guid = WSAID_CONNECTEX;
142   DWORD ioctl_num_bytes;
143   grpc_winsocket_callback_info* info;
144   grpc_error_handle error;
145   async_connect* ac = NULL;
146   absl::StatusOr<std::string> addr_uri;
147   int addr_family;
148   int protocol;
149 
150   addr_uri = grpc_sockaddr_to_uri(addr);
151   if (!addr_uri.ok()) {
152     error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
153     goto failure;
154   }
155 
156   *endpoint = NULL;
157 
158   // Use dualstack sockets where available.
159   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
160     addr = &addr6_v4mapped;
161   }
162 
163   // extract family
164   addr_family =
165       (grpc_sockaddr_get_family(addr) == AF_UNIX) ? AF_UNIX : AF_INET6;
166   protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
167 
168   sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
169                    grpc_get_default_wsa_socket_flags());
170   if (sock == INVALID_SOCKET) {
171     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
172     goto failure;
173   }
174 
175   if (addr_family == AF_UNIX) {
176     // tcp settings for af_unix are skipped.
177     error = grpc_tcp_set_non_block(sock);
178   } else {
179     error = grpc_tcp_prepare_socket(sock);
180   }
181 
182   if (!error.ok()) {
183     goto failure;
184   }
185 
186   // Grab the function pointer for ConnectEx for that specific socket.
187   // It may change depending on the interface.
188   status =
189       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
190                &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL);
191 
192   if (status != 0) {
193     error = GRPC_WSA_ERROR(WSAGetLastError(),
194                            "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)");
195     goto failure;
196   }
197 
198   if (addr_family == AF_UNIX) {
199     // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to
200     // the local address of an unnamed socket.
201     local_address = {};
202     ((grpc_sockaddr*)local_address.addr)->sa_family = AF_UNIX;
203     local_address.len = sizeof(grpc_sockaddr);
204   } else {
205     grpc_sockaddr_make_wildcard6(0, &local_address);
206   }
207 
208   status =
209       bind(sock, (grpc_sockaddr*)&local_address.addr, (int)local_address.len);
210   if (status != 0) {
211     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
212     goto failure;
213   }
214 
215   socket = grpc_winsocket_create(sock, "client");
216   info = &socket->write_info;
217   success = ConnectEx(sock, (grpc_sockaddr*)&addr->addr, (int)addr->len, NULL,
218                       0, NULL, &info->overlapped);
219   // It wouldn't be unusual to get a success immediately. But we'll still get
220   // an IOCP notification, so let's ignore it.
221   if (!success) {
222     int last_error = WSAGetLastError();
223     if (last_error != ERROR_IO_PENDING) {
224       error = GRPC_WSA_ERROR(last_error, "ConnectEx");
225       goto failure;
226     }
227   }
228   ac = new async_connect();
229   ac->on_done = on_done;
230   ac->socket = socket;
231   gpr_mu_init(&ac->mu);
232   ac->refs = 2;
233   ac->addr_name = addr_uri.value();
234   ac->endpoint = endpoint;
235   GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
236 
237   GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
238   gpr_mu_lock(&ac->mu);
239   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
240   grpc_socket_notify_on_write(socket, &ac->on_connect);
241   gpr_mu_unlock(&ac->mu);
242   return 0;
243 
244 failure:
245   CHECK(!error.ok());
246   grpc_error_handle final_error =
247       GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1);
248   if (socket != NULL) {
249     grpc_winsocket_destroy(socket);
250   } else if (sock != INVALID_SOCKET) {
251     closesocket(sock);
252   }
253   grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
254   return 0;
255 }
256 
tcp_cancel_connect(int64_t connection_handle)257 static bool tcp_cancel_connect(int64_t connection_handle) {
258   if (grpc_event_engine::experimental::UseEventEngineClient()) {
259     return grpc_event_engine::experimental::
260         event_engine_tcp_client_cancel_connect(connection_handle);
261   }
262   return false;
263 }
264 
265 grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect,
266                                                          tcp_cancel_connect};
267 
268 #endif  // GRPC_WINSOCK_SOCKET
269