1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20 #include <inttypes.h>
21
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_WINSOCK_SOCKET
25
26 #include <grpc/event_engine/endpoint_config.h>
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log_windows.h>
30
31 #include "absl/log/check.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/event_engine/shim.h"
34 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
35 #include "src/core/lib/iomgr/iocp_windows.h"
36 #include "src/core/lib/iomgr/sockaddr.h"
37 #include "src/core/lib/iomgr/sockaddr_windows.h"
38 #include "src/core/lib/iomgr/socket_windows.h"
39 #include "src/core/lib/iomgr/tcp_client.h"
40 #include "src/core/lib/iomgr/tcp_windows.h"
41 #include "src/core/lib/iomgr/timer.h"
42 #include "src/core/lib/resource_quota/api.h"
43 #include "src/core/lib/slice/slice_internal.h"
44 #include "src/core/util/crash.h"
45
46 using ::grpc_event_engine::experimental::EndpointConfig;
47
48 struct async_connect {
49 grpc_closure* on_done;
50 gpr_mu mu;
51 grpc_winsocket* socket;
52 grpc_timer alarm;
53 grpc_closure on_alarm;
54 std::string addr_name;
55 int refs;
56 grpc_closure on_connect;
57 grpc_endpoint** endpoint;
58 };
59
async_connect_unlock_and_cleanup(async_connect * ac,grpc_winsocket * socket)60 static void async_connect_unlock_and_cleanup(async_connect* ac,
61 grpc_winsocket* socket) {
62 int done = (--ac->refs == 0);
63 gpr_mu_unlock(&ac->mu);
64 if (done) {
65 gpr_mu_destroy(&ac->mu);
66 delete ac;
67 }
68 if (socket != NULL) grpc_winsocket_destroy(socket);
69 }
70
on_alarm(void * acp,grpc_error_handle)71 static void on_alarm(void* acp, grpc_error_handle /* error */) {
72 async_connect* ac = (async_connect*)acp;
73 gpr_mu_lock(&ac->mu);
74 grpc_winsocket* socket = ac->socket;
75 ac->socket = NULL;
76 if (socket != NULL) {
77 grpc_winsocket_shutdown(socket);
78 }
79 async_connect_unlock_and_cleanup(ac, socket);
80 }
81
on_connect(void * acp,grpc_error_handle error)82 static void on_connect(void* acp, grpc_error_handle error) {
83 async_connect* ac = (async_connect*)acp;
84 grpc_endpoint** ep = ac->endpoint;
85 CHECK(*ep == NULL);
86 grpc_closure* on_done = ac->on_done;
87
88 gpr_mu_lock(&ac->mu);
89 grpc_winsocket* socket = ac->socket;
90 ac->socket = NULL;
91 gpr_mu_unlock(&ac->mu);
92
93 grpc_timer_cancel(&ac->alarm);
94
95 gpr_mu_lock(&ac->mu);
96
97 if (error.ok()) {
98 if (socket != NULL) {
99 DWORD transferred_bytes = 0;
100 DWORD flags;
101 BOOL wsa_success =
102 WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
103 &transferred_bytes, FALSE, &flags);
104 CHECK_EQ(transferred_bytes, 0);
105 if (!wsa_success) {
106 error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
107 closesocket(socket->socket);
108 } else {
109 *ep = grpc_tcp_create(socket, ac->addr_name);
110 socket = nullptr;
111 }
112 } else {
113 error = GRPC_ERROR_CREATE("socket is null");
114 }
115 }
116
117 async_connect_unlock_and_cleanup(ac, socket);
118 // If the connection was aborted, the callback was already called when
119 // the deadline was met.
120 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
121 }
122
123 // Tries to issue one async connection, then schedules both an IOCP
124 // notification request for the connection, and one timeout alert.
tcp_connect(grpc_closure * on_done,grpc_endpoint ** endpoint,grpc_pollset_set *,const EndpointConfig & config,const grpc_resolved_address * addr,grpc_core::Timestamp deadline)125 static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
126 grpc_pollset_set* /* interested_parties */,
127 const EndpointConfig& config,
128 const grpc_resolved_address* addr,
129 grpc_core::Timestamp deadline) {
130 if (grpc_event_engine::experimental::UseEventEngineClient()) {
131 return grpc_event_engine::experimental::event_engine_tcp_client_connect(
132 on_done, endpoint, config, addr, deadline);
133 }
134 SOCKET sock = INVALID_SOCKET;
135 BOOL success;
136 int status;
137 grpc_resolved_address addr6_v4mapped;
138 grpc_resolved_address local_address;
139 grpc_winsocket* socket = NULL;
140 LPFN_CONNECTEX ConnectEx;
141 GUID guid = WSAID_CONNECTEX;
142 DWORD ioctl_num_bytes;
143 grpc_winsocket_callback_info* info;
144 grpc_error_handle error;
145 async_connect* ac = NULL;
146 absl::StatusOr<std::string> addr_uri;
147 int addr_family;
148 int protocol;
149
150 addr_uri = grpc_sockaddr_to_uri(addr);
151 if (!addr_uri.ok()) {
152 error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
153 goto failure;
154 }
155
156 *endpoint = NULL;
157
158 // Use dualstack sockets where available.
159 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
160 addr = &addr6_v4mapped;
161 }
162
163 // extract family
164 addr_family =
165 (grpc_sockaddr_get_family(addr) == AF_UNIX) ? AF_UNIX : AF_INET6;
166 protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
167
168 sock = WSASocket(addr_family, SOCK_STREAM, protocol, NULL, 0,
169 grpc_get_default_wsa_socket_flags());
170 if (sock == INVALID_SOCKET) {
171 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
172 goto failure;
173 }
174
175 if (addr_family == AF_UNIX) {
176 // tcp settings for af_unix are skipped.
177 error = grpc_tcp_set_non_block(sock);
178 } else {
179 error = grpc_tcp_prepare_socket(sock);
180 }
181
182 if (!error.ok()) {
183 goto failure;
184 }
185
186 // Grab the function pointer for ConnectEx for that specific socket.
187 // It may change depending on the interface.
188 status =
189 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
190 &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL);
191
192 if (status != 0) {
193 error = GRPC_WSA_ERROR(WSAGetLastError(),
194 "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)");
195 goto failure;
196 }
197
198 if (addr_family == AF_UNIX) {
199 // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to
200 // the local address of an unnamed socket.
201 local_address = {};
202 ((grpc_sockaddr*)local_address.addr)->sa_family = AF_UNIX;
203 local_address.len = sizeof(grpc_sockaddr);
204 } else {
205 grpc_sockaddr_make_wildcard6(0, &local_address);
206 }
207
208 status =
209 bind(sock, (grpc_sockaddr*)&local_address.addr, (int)local_address.len);
210 if (status != 0) {
211 error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
212 goto failure;
213 }
214
215 socket = grpc_winsocket_create(sock, "client");
216 info = &socket->write_info;
217 success = ConnectEx(sock, (grpc_sockaddr*)&addr->addr, (int)addr->len, NULL,
218 0, NULL, &info->overlapped);
219 // It wouldn't be unusual to get a success immediately. But we'll still get
220 // an IOCP notification, so let's ignore it.
221 if (!success) {
222 int last_error = WSAGetLastError();
223 if (last_error != ERROR_IO_PENDING) {
224 error = GRPC_WSA_ERROR(last_error, "ConnectEx");
225 goto failure;
226 }
227 }
228 ac = new async_connect();
229 ac->on_done = on_done;
230 ac->socket = socket;
231 gpr_mu_init(&ac->mu);
232 ac->refs = 2;
233 ac->addr_name = addr_uri.value();
234 ac->endpoint = endpoint;
235 GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
236
237 GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
238 gpr_mu_lock(&ac->mu);
239 grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
240 grpc_socket_notify_on_write(socket, &ac->on_connect);
241 gpr_mu_unlock(&ac->mu);
242 return 0;
243
244 failure:
245 CHECK(!error.ok());
246 grpc_error_handle final_error =
247 GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1);
248 if (socket != NULL) {
249 grpc_winsocket_destroy(socket);
250 } else if (sock != INVALID_SOCKET) {
251 closesocket(sock);
252 }
253 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
254 return 0;
255 }
256
tcp_cancel_connect(int64_t connection_handle)257 static bool tcp_cancel_connect(int64_t connection_handle) {
258 if (grpc_event_engine::experimental::UseEventEngineClient()) {
259 return grpc_event_engine::experimental::
260 event_engine_tcp_client_cancel_connect(connection_handle);
261 }
262 return false;
263 }
264
265 grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect,
266 tcp_cancel_connect};
267
268 #endif // GRPC_WINSOCK_SOCKET
269