1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <limits.h>
26
27 #include "src/core/lib/iomgr/network_status_tracker.h"
28 #include "src/core/lib/iomgr/sockaddr_windows.h"
29
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/log_windows.h>
34 #include <grpc/support/string_util.h>
35
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/iomgr/iocp_windows.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/iomgr/sockaddr_utils.h"
40 #include "src/core/lib/iomgr/socket_windows.h"
41 #include "src/core/lib/iomgr/tcp_client.h"
42 #include "src/core/lib/iomgr/tcp_windows.h"
43 #include "src/core/lib/iomgr/timer.h"
44 #include "src/core/lib/slice/slice_internal.h"
45
#if defined(__MSYS__) && defined(GPR_ARCH_64)
/* Nasty workaround for nasty bug when using the 64 bits msys compiler
   in conjunction with Microsoft Windows headers.
   NOTE(review): presumably the stock FIONBIO definition encodes an argument
   size that does not match the 32-bit value passed to WSAIoctl under this
   toolchain, so the ioctl code is spelled out with a uint32_t argument
   instead — confirm against the MSYS headers. */
#define GRPC_FIONBIO _IOW('f', 126, uint32_t)
#else
#define GRPC_FIONBIO FIONBIO
#endif

/* Trace flag consulted by the debug-build tcp_ref/tcp_unref below to log
   refcount transitions. Declared extern here; defined in another file. */
extern grpc_core::TraceFlag grpc_tcp_trace;
55
grpc_tcp_set_non_block(SOCKET sock)56 grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
57 int status;
58 uint32_t param = 1;
59 DWORD ret;
60 status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret,
61 NULL, NULL);
62 return status == 0
63 ? GRPC_ERROR_NONE
64 : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
65 }
66
set_dualstack(SOCKET sock)67 static grpc_error* set_dualstack(SOCKET sock) {
68 int status;
69 unsigned long param = 0;
70 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m,
71 sizeof(param));
72 return status == 0
73 ? GRPC_ERROR_NONE
74 : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
75 }
76
enable_loopback_fast_path(SOCKET sock)77 static grpc_error* enable_loopback_fast_path(SOCKET sock) {
78 int status;
79 uint32_t param = 1;
80 DWORD ret;
81 status = WSAIoctl(sock, /*SIO_LOOPBACK_FAST_PATH==*/_WSAIOW(IOC_VENDOR, 16),
82 ¶m, sizeof(param), NULL, 0, &ret, 0, 0);
83 if (status == SOCKET_ERROR) {
84 status = WSAGetLastError();
85 }
86 return status == 0 || status == WSAEOPNOTSUPP
87 ? GRPC_ERROR_NONE
88 : GRPC_WSA_ERROR(status, "WSAIoctl(SIO_LOOPBACK_FAST_PATH)");
89 }
90
grpc_tcp_prepare_socket(SOCKET sock)91 grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
92 grpc_error* err;
93 err = grpc_tcp_set_non_block(sock);
94 if (err != GRPC_ERROR_NONE) return err;
95 err = set_dualstack(sock);
96 if (err != GRPC_ERROR_NONE) return err;
97 err = enable_loopback_fast_path(sock);
98 if (err != GRPC_ERROR_NONE) return err;
99 return GRPC_ERROR_NONE;
100 }
101
/* Per-connection state of a Windows TCP endpoint. */
typedef struct grpc_tcp {
  /* This is our C++ class derivation emulation. Must remain the first member:
     the vtable functions cast the grpc_endpoint* they receive to grpc_tcp*. */
  grpc_endpoint base;
  /* The one socket this endpoint is using. Destroyed in tcp_free. */
  grpc_winsocket* socket;
  /* Refcounting how many operations are in progress. grpc_tcp_create starts
     it at 1 (dropped by win_destroy); win_read / win_write each add a ref
     that on_read / on_write drop. */
  gpr_refcount refcount;

  /* Completion closures wired to on_read / on_write in grpc_tcp_create. */
  grpc_closure on_read;
  grpc_closure on_write;

  /* User callbacks of the read / write currently in flight. */
  grpc_closure* read_cb;
  grpc_closure* write_cb;
  /* Buffer allocated by win_read for the pending read. */
  grpc_slice read_slice;
  /* Caller-provided buffers of the pending write / read; not freed here. */
  grpc_slice_buffer* write_slices;
  grpc_slice_buffer* read_slices;

  /* Created in grpc_tcp_create, unreffed in tcp_free. */
  grpc_resource_user* resource_user;

  /* The IO Completion Port runs from another thread. We need some mechanism
     to protect ourselves when requesting a shutdown. */
  gpr_mu mu;
  /* Set (under mu) by the first win_shutdown call. */
  int shutting_down;
  /* Reason passed to the first win_shutdown; only valid once shutting_down
     is set, released in tcp_free. */
  grpc_error* shutdown_error;

  /* Owned copy of the peer address string, freed in tcp_free. */
  char* peer_string;
} grpc_tcp;
129
tcp_free(grpc_tcp * tcp)130 static void tcp_free(grpc_tcp* tcp) {
131 grpc_winsocket_destroy(tcp->socket);
132 gpr_mu_destroy(&tcp->mu);
133 gpr_free(tcp->peer_string);
134 grpc_resource_user_unref(tcp->resource_user);
135 if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
136 gpr_free(tcp);
137 }
138
139 #ifndef NDEBUG
140 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
141 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
tcp_unref(grpc_tcp * tcp,const char * reason,const char * file,int line)142 static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
143 int line) {
144 if (grpc_tcp_trace.enabled()) {
145 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
146 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
147 "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
148 val - 1);
149 }
150 if (gpr_unref(&tcp->refcount)) {
151 tcp_free(tcp);
152 }
153 }
154
tcp_ref(grpc_tcp * tcp,const char * reason,const char * file,int line)155 static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
156 int line) {
157 if (grpc_tcp_trace.enabled()) {
158 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
159 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
160 "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
161 val + 1);
162 }
163 gpr_ref(&tcp->refcount);
164 }
165 #else
166 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
167 #define TCP_REF(tcp, reason) tcp_ref((tcp))
tcp_unref(grpc_tcp * tcp)168 static void tcp_unref(grpc_tcp* tcp) {
169 if (gpr_unref(&tcp->refcount)) {
170 tcp_free(tcp);
171 }
172 }
173
tcp_ref(grpc_tcp * tcp)174 static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
175 #endif
176
177 /* Asynchronous callback from the IOCP, or the background thread. */
on_read(void * tcpp,grpc_error * error)178 static void on_read(void* tcpp, grpc_error* error) {
179 grpc_tcp* tcp = (grpc_tcp*)tcpp;
180 grpc_closure* cb = tcp->read_cb;
181 grpc_winsocket* socket = tcp->socket;
182 grpc_slice sub;
183 grpc_winsocket_callback_info* info = &socket->read_info;
184
185 GRPC_ERROR_REF(error);
186
187 if (error == GRPC_ERROR_NONE) {
188 if (info->wsa_error != 0 && !tcp->shutting_down) {
189 char* utf8_message = gpr_format_message(info->wsa_error);
190 error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
191 gpr_free(utf8_message);
192 grpc_slice_unref_internal(tcp->read_slice);
193 } else {
194 if (info->bytes_transfered != 0 && !tcp->shutting_down) {
195 sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
196 grpc_slice_buffer_add(tcp->read_slices, sub);
197 } else {
198 grpc_slice_unref_internal(tcp->read_slice);
199 error = tcp->shutting_down
200 ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
201 "TCP stream shutting down", &tcp->shutdown_error, 1)
202 : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream");
203 }
204 }
205 }
206
207 tcp->read_cb = NULL;
208 TCP_UNREF(tcp, "read");
209 GRPC_CLOSURE_SCHED(cb, error);
210 }
211
win_read(grpc_endpoint * ep,grpc_slice_buffer * read_slices,grpc_closure * cb)212 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
213 grpc_closure* cb) {
214 grpc_tcp* tcp = (grpc_tcp*)ep;
215 grpc_winsocket* handle = tcp->socket;
216 grpc_winsocket_callback_info* info = &handle->read_info;
217 int status;
218 DWORD bytes_read = 0;
219 DWORD flags = 0;
220 WSABUF buffer;
221
222 if (tcp->shutting_down) {
223 GRPC_CLOSURE_SCHED(
224 cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
225 "TCP socket is shutting down", &tcp->shutdown_error, 1));
226 return;
227 }
228
229 tcp->read_cb = cb;
230 tcp->read_slices = read_slices;
231 grpc_slice_buffer_reset_and_unref_internal(read_slices);
232
233 tcp->read_slice = GRPC_SLICE_MALLOC(8192);
234
235 buffer.len = (ULONG)GRPC_SLICE_LENGTH(
236 tcp->read_slice); // we know slice size fits in 32bit.
237 buffer.buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slice);
238
239 TCP_REF(tcp, "read");
240
241 /* First let's try a synchronous, non-blocking read. */
242 status =
243 WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL);
244 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
245
246 /* Did we get data immediately ? Yay. */
247 if (info->wsa_error != WSAEWOULDBLOCK) {
248 info->bytes_transfered = bytes_read;
249 GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
250 return;
251 }
252
253 /* Otherwise, let's retry, by queuing a read. */
254 memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
255 status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
256 &info->overlapped, NULL);
257
258 if (status != 0) {
259 int wsa_error = WSAGetLastError();
260 if (wsa_error != WSA_IO_PENDING) {
261 info->wsa_error = wsa_error;
262 GRPC_CLOSURE_SCHED(&tcp->on_read,
263 GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
264 return;
265 }
266 }
267
268 grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
269 }
270
271 /* Asynchronous callback from the IOCP, or the background thread. */
on_write(void * tcpp,grpc_error * error)272 static void on_write(void* tcpp, grpc_error* error) {
273 grpc_tcp* tcp = (grpc_tcp*)tcpp;
274 grpc_winsocket* handle = tcp->socket;
275 grpc_winsocket_callback_info* info = &handle->write_info;
276 grpc_closure* cb;
277
278 GRPC_ERROR_REF(error);
279
280 gpr_mu_lock(&tcp->mu);
281 cb = tcp->write_cb;
282 tcp->write_cb = NULL;
283 gpr_mu_unlock(&tcp->mu);
284
285 if (error == GRPC_ERROR_NONE) {
286 if (info->wsa_error != 0) {
287 error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
288 } else {
289 GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
290 }
291 }
292
293 TCP_UNREF(tcp, "write");
294 GRPC_CLOSURE_SCHED(cb, error);
295 }
296
297 /* Initiates a write. */
win_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)298 static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
299 grpc_closure* cb, void* arg) {
300 grpc_tcp* tcp = (grpc_tcp*)ep;
301 grpc_winsocket* socket = tcp->socket;
302 grpc_winsocket_callback_info* info = &socket->write_info;
303 unsigned i;
304 DWORD bytes_sent;
305 int status;
306 WSABUF local_buffers[16];
307 WSABUF* allocated = NULL;
308 WSABUF* buffers = local_buffers;
309 size_t len;
310
311 if (tcp->shutting_down) {
312 GRPC_CLOSURE_SCHED(
313 cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
314 "TCP socket is shutting down", &tcp->shutdown_error, 1));
315 return;
316 }
317
318 tcp->write_cb = cb;
319 tcp->write_slices = slices;
320 GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
321 if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
322 buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
323 allocated = buffers;
324 }
325
326 for (i = 0; i < tcp->write_slices->count; i++) {
327 len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
328 GPR_ASSERT(len <= ULONG_MAX);
329 buffers[i].len = (ULONG)len;
330 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
331 }
332
333 /* First, let's try a synchronous, non-blocking write. */
334 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
335 &bytes_sent, 0, NULL, NULL);
336 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
337
338 /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
339 connection that has its send queue filled up. But if we don't, then we can
340 avoid doing an async write operation at all. */
341 if (info->wsa_error != WSAEWOULDBLOCK) {
342 grpc_error* error = status == 0
343 ? GRPC_ERROR_NONE
344 : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
345 GRPC_CLOSURE_SCHED(cb, error);
346 if (allocated) gpr_free(allocated);
347 return;
348 }
349
350 TCP_REF(tcp, "write");
351
352 /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
353 operation, this time asynchronously. */
354 memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
355 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
356 &bytes_sent, 0, &socket->write_info.overlapped, NULL);
357 if (allocated) gpr_free(allocated);
358
359 if (status != 0) {
360 int wsa_error = WSAGetLastError();
361 if (wsa_error != WSA_IO_PENDING) {
362 TCP_UNREF(tcp, "write");
363 GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
364 return;
365 }
366 }
367
368 /* As all is now setup, we can now ask for the IOCP notification. It may
369 trigger the callback immediately however, but no matter. */
370 grpc_socket_notify_on_write(socket, &tcp->on_write);
371 }
372
win_add_to_pollset(grpc_endpoint * ep,grpc_pollset * ps)373 static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
374 grpc_tcp* tcp;
375 (void)ps;
376 tcp = (grpc_tcp*)ep;
377 grpc_iocp_add_socket(tcp->socket);
378 }
379
win_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)380 static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
381 grpc_tcp* tcp;
382 (void)pss;
383 tcp = (grpc_tcp*)ep;
384 grpc_iocp_add_socket(tcp->socket);
385 }
386
win_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)387 static void win_delete_from_pollset_set(grpc_endpoint* ep,
388 grpc_pollset_set* pss) {}
389
390 /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
391 for the potential read and write operations. It is up to the caller to
392 guarantee this isn't called in parallel to a read or write request, so
393 we're not going to protect against these. However the IO Completion Port
394 callback will happen from another thread, so we need to protect against
395 concurrent access of the data structure in that regard. */
win_shutdown(grpc_endpoint * ep,grpc_error * why)396 static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
397 grpc_tcp* tcp = (grpc_tcp*)ep;
398 gpr_mu_lock(&tcp->mu);
399 /* At that point, what may happen is that we're already inside the IOCP
400 callback. See the comments in on_read and on_write. */
401 if (!tcp->shutting_down) {
402 tcp->shutting_down = 1;
403 tcp->shutdown_error = why;
404 } else {
405 GRPC_ERROR_UNREF(why);
406 }
407 grpc_winsocket_shutdown(tcp->socket);
408 gpr_mu_unlock(&tcp->mu);
409 grpc_resource_user_shutdown(tcp->resource_user);
410 }
411
win_destroy(grpc_endpoint * ep)412 static void win_destroy(grpc_endpoint* ep) {
413 grpc_network_status_unregister_endpoint(ep);
414 grpc_tcp* tcp = (grpc_tcp*)ep;
415 TCP_UNREF(tcp, "destroy");
416 }
417
win_get_peer(grpc_endpoint * ep)418 static char* win_get_peer(grpc_endpoint* ep) {
419 grpc_tcp* tcp = (grpc_tcp*)ep;
420 return gpr_strdup(tcp->peer_string);
421 }
422
win_get_resource_user(grpc_endpoint * ep)423 static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
424 grpc_tcp* tcp = (grpc_tcp*)ep;
425 return tcp->resource_user;
426 }
427
win_get_fd(grpc_endpoint * ep)428 static int win_get_fd(grpc_endpoint* ep) { return -1; }
429
430 static grpc_endpoint_vtable vtable = {win_read,
431 win_write,
432 win_add_to_pollset,
433 win_add_to_pollset_set,
434 win_delete_from_pollset_set,
435 win_shutdown,
436 win_destroy,
437 win_get_resource_user,
438 win_get_peer,
439 win_get_fd};
440
grpc_tcp_create(grpc_winsocket * socket,grpc_channel_args * channel_args,const char * peer_string)441 grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
442 grpc_channel_args* channel_args,
443 const char* peer_string) {
444 grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
445 if (channel_args != NULL) {
446 for (size_t i = 0; i < channel_args->num_args; i++) {
447 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
448 grpc_resource_quota_unref_internal(resource_quota);
449 resource_quota = grpc_resource_quota_ref_internal(
450 (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
451 }
452 }
453 }
454 grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
455 memset(tcp, 0, sizeof(grpc_tcp));
456 tcp->base.vtable = &vtable;
457 tcp->socket = socket;
458 gpr_mu_init(&tcp->mu);
459 gpr_ref_init(&tcp->refcount, 1);
460 GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
461 GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
462 tcp->peer_string = gpr_strdup(peer_string);
463 tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
464 /* Tell network status tracking code about the new endpoint */
465 grpc_network_status_register_endpoint(&tcp->base);
466 grpc_resource_quota_unref_internal(resource_quota);
467
468 return &tcp->base;
469 }
470
471 #endif /* GRPC_WINSOCK_SOCKET */
472