/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_WINSOCK_SOCKET

#include <limits.h>

#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/sockaddr_windows.h"

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"

#if defined(__MSYS__) && defined(GPR_ARCH_64)
/* Nasty workaround for nasty bug when using the 64 bits msys compiler
   in conjunction with Microsoft Windows headers. */
#define GRPC_FIONBIO _IOW('f', 126, uint32_t)
#else
#define GRPC_FIONBIO FIONBIO
#endif

extern grpc_core::TraceFlag grpc_tcp_trace;

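/* Puts the socket in non-blocking mode via WSAIoctl(FIONBIO). */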
grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
  int status;
  uint32_t param = 1;
  DWORD ret;
  status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
                    NULL, NULL);
  return status == 0
             ? GRPC_ERROR_NONE
             : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
}

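/* Clears IPV6_V6ONLY so an IPv6 socket can also carry IPv4 traffic
   (dual-stack). */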
static grpc_error* set_dualstack(SOCKET sock) {
  int status;
  unsigned long param = 0;
  status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&param,
                      sizeof(param));
  return status == 0
             ? GRPC_ERROR_NONE
             : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
}

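/* Requests the TCP loopback fast path. WSAEOPNOTSUPP is treated as success
   so that Windows versions without this option still work. */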
static grpc_error* enable_loopback_fast_path(SOCKET sock) {
  int status;
  uint32_t param = 1;
  DWORD ret;
  status = WSAIoctl(sock, /*SIO_LOOPBACK_FAST_PATH==*/_WSAIOW(IOC_VENDOR, 16),
                    &param, sizeof(param), NULL, 0, &ret, 0, 0);
  if (status == SOCKET_ERROR) {
    status = WSAGetLastError();
  }
  return status == 0 || status == WSAEOPNOTSUPP
             ? GRPC_ERROR_NONE
             : GRPC_WSA_ERROR(status, "WSAIoctl(SIO_LOOPBACK_FAST_PATH)");
}

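/* Applies the three socket options above; returns the first error hit. */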
grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
  grpc_error* err;
  err = grpc_tcp_set_non_block(sock);
  if (err != GRPC_ERROR_NONE) return err;
  err = set_dualstack(sock);
  if (err != GRPC_ERROR_NONE) return err;
  err = enable_loopback_fast_path(sock);
  if (err != GRPC_ERROR_NONE) return err;
  return GRPC_ERROR_NONE;
}

typedef struct grpc_tcp {
  /* This is our C++ class derivation emulation. */
  grpc_endpoint base;
  /* The one socket this endpoint is using. */
  grpc_winsocket* socket;
  /* Refcounting how many operations are in progress. */
  gpr_refcount refcount;

  grpc_closure on_read;
  grpc_closure on_write;

  grpc_closure* read_cb;
  grpc_closure* write_cb;
  grpc_slice read_slice;
  grpc_slice_buffer* write_slices;
  grpc_slice_buffer* read_slices;

  grpc_resource_user* resource_user;

  /* The IO Completion Port runs from another thread. We need some mechanism
     to protect ourselves when requesting a shutdown. */
  gpr_mu mu;
  int shutting_down;
  grpc_error* shutdown_error;

  char* peer_string;
} grpc_tcp;

static void tcp_free(grpc_tcp* tcp) {
  grpc_winsocket_destroy(tcp->socket);
  gpr_mu_destroy(&tcp->mu);
  gpr_free(tcp->peer_string);
  grpc_resource_user_unref(tcp->resource_user);
  if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
  gpr_free(tcp);
}

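/* In debug builds, TCP_REF/TCP_UNREF log every ref-count change (with file
   and line) whenever the tcp trace flag is enabled. */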
#ifndef NDEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
                      int line) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
            val - 1);
  }
  if (gpr_unref(&tcp->refcount)) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
                    int line) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "TCP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
            val + 1);
  }
  gpr_ref(&tcp->refcount);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp* tcp) {
  if (gpr_unref(&tcp->refcount)) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
#endif

/* Asynchronous callback from the IOCP, or the background thread. */
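/* Three outcomes are handled: a WSA error, data successfully received
   (appended to read_slices), or shutdown / end of stream. */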
static void on_read(void* tcpp, grpc_error* error) {
  grpc_tcp* tcp = (grpc_tcp*)tcpp;
  grpc_closure* cb = tcp->read_cb;
  grpc_winsocket* socket = tcp->socket;
  grpc_slice sub;
  grpc_winsocket_callback_info* info = &socket->read_info;

  GRPC_ERROR_REF(error);

  if (error == GRPC_ERROR_NONE) {
    if (info->wsa_error != 0 && !tcp->shutting_down) {
      char* utf8_message = gpr_format_message(info->wsa_error);
      error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
      gpr_free(utf8_message);
      grpc_slice_unref_internal(tcp->read_slice);
    } else {
      if (info->bytes_transfered != 0 && !tcp->shutting_down) {
        sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
        grpc_slice_buffer_add(tcp->read_slices, sub);
      } else {
        grpc_slice_unref_internal(tcp->read_slice);
        error = tcp->shutting_down
                    ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                          "TCP stream shutting down", &tcp->shutdown_error, 1)
                    : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream");
      }
    }
  }

  tcp->read_cb = NULL;
  TCP_UNREF(tcp, "read");
  GRPC_CLOSURE_SCHED(cb, error);
}

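/* Initiates a read: tries a synchronous, non-blocking WSARecv first, and
   falls back to an overlapped read if the call would block. */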
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
                     grpc_closure* cb) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_winsocket* handle = tcp->socket;
  grpc_winsocket_callback_info* info = &handle->read_info;
  int status;
  DWORD bytes_read = 0;
  DWORD flags = 0;
  WSABUF buffer;

  if (tcp->shutting_down) {
    GRPC_CLOSURE_SCHED(
        cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                "TCP socket is shutting down", &tcp->shutdown_error, 1));
    return;
  }

  tcp->read_cb = cb;
  tcp->read_slices = read_slices;
  grpc_slice_buffer_reset_and_unref_internal(read_slices);

  tcp->read_slice = GRPC_SLICE_MALLOC(8192);

  buffer.len = (ULONG)GRPC_SLICE_LENGTH(
      tcp->read_slice);  // we know slice size fits in 32bit.
  buffer.buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slice);

  TCP_REF(tcp, "read");

  /* First let's try a synchronous, non-blocking read. */
  status =
      WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL);
  info->wsa_error = status == 0 ? 0 : WSAGetLastError();

  /* Did we get data immediately ? Yay. */
  if (info->wsa_error != WSAEWOULDBLOCK) {
    info->bytes_transfered = bytes_read;
    GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
    return;
  }

  /* Otherwise, let's retry, by queuing a read. */
  memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
  status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
                   &info->overlapped, NULL);

  if (status != 0) {
    int wsa_error = WSAGetLastError();
    if (wsa_error != WSA_IO_PENDING) {
      info->wsa_error = wsa_error;
      GRPC_CLOSURE_SCHED(&tcp->on_read,
                         GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
      return;
    }
  }

  grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
}

/* Asynchronous callback from the IOCP, or the background thread. */
static void on_write(void* tcpp, grpc_error* error) {
  grpc_tcp* tcp = (grpc_tcp*)tcpp;
  grpc_winsocket* handle = tcp->socket;
  grpc_winsocket_callback_info* info = &handle->write_info;
  grpc_closure* cb;

  GRPC_ERROR_REF(error);

  gpr_mu_lock(&tcp->mu);
  cb = tcp->write_cb;
  tcp->write_cb = NULL;
  gpr_mu_unlock(&tcp->mu);

  if (error == GRPC_ERROR_NONE) {
    if (info->wsa_error != 0) {
      error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
    } else {
      GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
    }
  }

  TCP_UNREF(tcp, "write");
  GRPC_CLOSURE_SCHED(cb, error);
}

/* Initiates a write. */
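/* Mirrors win_read: a synchronous WSASend is attempted first, and an
   overlapped send is only posted if the socket reports WSAEWOULDBLOCK. */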
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                      grpc_closure* cb, void* arg) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_winsocket* socket = tcp->socket;
  grpc_winsocket_callback_info* info = &socket->write_info;
  unsigned i;
  DWORD bytes_sent;
  int status;
  WSABUF local_buffers[16];
  WSABUF* allocated = NULL;
  WSABUF* buffers = local_buffers;
  size_t len;

  if (tcp->shutting_down) {
    GRPC_CLOSURE_SCHED(
        cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                "TCP socket is shutting down", &tcp->shutdown_error, 1));
    return;
  }

  tcp->write_cb = cb;
  tcp->write_slices = slices;
  GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
  if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
    buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
    allocated = buffers;
  }

  for (i = 0; i < tcp->write_slices->count; i++) {
    len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
    GPR_ASSERT(len <= ULONG_MAX);
    buffers[i].len = (ULONG)len;
    buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
  }

  /* First, let's try a synchronous, non-blocking write. */
  status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
                   &bytes_sent, 0, NULL, NULL);
  info->wsa_error = status == 0 ? 0 : WSAGetLastError();

  /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
     connection that has its send queue filled up. But if we don't, then we can
     avoid doing an async write operation at all. */
  if (info->wsa_error != WSAEWOULDBLOCK) {
    grpc_error* error = status == 0
                            ? GRPC_ERROR_NONE
                            : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
    GRPC_CLOSURE_SCHED(cb, error);
    if (allocated) gpr_free(allocated);
    return;
  }

  TCP_REF(tcp, "write");

  /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
     operation, this time asynchronously. */
  memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
  status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
                   &bytes_sent, 0, &socket->write_info.overlapped, NULL);
  if (allocated) gpr_free(allocated);

  if (status != 0) {
    int wsa_error = WSAGetLastError();
    if (wsa_error != WSA_IO_PENDING) {
      TCP_UNREF(tcp, "write");
      GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
      return;
    }
  }

  /* As all is now setup, we can now ask for the IOCP notification. It may
     trigger the callback immediately however, but no matter. */
  grpc_socket_notify_on_write(socket, &tcp->on_write);
}

static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
  grpc_tcp* tcp;
  (void)ps;
  tcp = (grpc_tcp*)ep;
  grpc_iocp_add_socket(tcp->socket);
}

static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
  grpc_tcp* tcp;
  (void)pss;
  tcp = (grpc_tcp*)ep;
  grpc_iocp_add_socket(tcp->socket);
}

static void win_delete_from_pollset_set(grpc_endpoint* ep,
                                        grpc_pollset_set* pss) {}

/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
   for the potential read and write operations. It is up to the caller to
   guarantee this isn't called in parallel to a read or write request, so
   we're not going to protect against these. However the IO Completion Port
   callback will happen from another thread, so we need to protect against
   concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  gpr_mu_lock(&tcp->mu);
  /* At that point, what may happen is that we're already inside the IOCP
     callback. See the comments in on_read and on_write. */
  if (!tcp->shutting_down) {
    tcp->shutting_down = 1;
    tcp->shutdown_error = why;
  } else {
    GRPC_ERROR_UNREF(why);
  }
  grpc_winsocket_shutdown(tcp->socket);
  gpr_mu_unlock(&tcp->mu);
  grpc_resource_user_shutdown(tcp->resource_user);
}

static void win_destroy(grpc_endpoint* ep) {
  grpc_network_status_unregister_endpoint(ep);
  grpc_tcp* tcp = (grpc_tcp*)ep;
  TCP_UNREF(tcp, "destroy");
}

static char* win_get_peer(grpc_endpoint* ep) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  return gpr_strdup(tcp->peer_string);
}

static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  return tcp->resource_user;
}

static int win_get_fd(grpc_endpoint* ep) { return -1; }

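/* Wires the win_* implementations into the generic grpc_endpoint interface. */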
static grpc_endpoint_vtable vtable = {win_read,
                                      win_write,
                                      win_add_to_pollset,
                                      win_add_to_pollset_set,
                                      win_delete_from_pollset_set,
                                      win_shutdown,
                                      win_destroy,
                                      win_get_resource_user,
                                      win_get_peer,
                                      win_get_fd};

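/* Wraps an already-created winsocket in a grpc_endpoint. A
   GRPC_ARG_RESOURCE_QUOTA channel arg, if present, overrides the default
   resource quota. */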
grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
                               grpc_channel_args* channel_args,
                               const char* peer_string) {
  grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
  if (channel_args != NULL) {
    for (size_t i = 0; i < channel_args->num_args; i++) {
      if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
        grpc_resource_quota_unref_internal(resource_quota);
        resource_quota = grpc_resource_quota_ref_internal(
            (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
      }
    }
  }
  grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
  memset(tcp, 0, sizeof(grpc_tcp));
  tcp->base.vtable = &vtable;
  tcp->socket = socket;
  gpr_mu_init(&tcp->mu);
  gpr_ref_init(&tcp->refcount, 1);
  GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
  tcp->peer_string = gpr_strdup(peer_string);
  tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
  /* Tell network status tracking code about the new endpoint */
  grpc_network_status_register_endpoint(&tcp->base);
  grpc_resource_quota_unref_internal(resource_quota);

  return &tcp->base;
}

#endif /* GRPC_WINSOCK_SOCKET */