1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <grpc/slice_buffer.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/string_util.h>
28 #include <limits.h>
29
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/iomgr/iocp_windows.h"
34 #include "src/core/lib/iomgr/sockaddr.h"
35 #include "src/core/lib/iomgr/sockaddr_windows.h"
36 #include "src/core/lib/iomgr/socket_windows.h"
37 #include "src/core/lib/iomgr/tcp_client.h"
38 #include "src/core/lib/iomgr/tcp_windows.h"
39 #include "src/core/lib/iomgr/timer.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/util/crash.h"
43 #include "src/core/util/string.h"
44 #include "src/core/util/useful.h"
45
// GRPC_FIONBIO is the ioctl request code that grpc_tcp_set_non_block() hands
// to WSAIoctl().
#if defined(__MSYS__) && defined(GPR_ARCH_64)
// Nasty workaround for a nasty bug when using the 64-bit msys compiler
// in conjunction with Microsoft Windows headers: the request code is spelled
// out by hand with a fixed-width 32-bit argument type instead of using
// FIONBIO directly.
#define GRPC_FIONBIO _IOW('f', 126, uint32_t)
#else
#define GRPC_FIONBIO FIONBIO
#endif
53
grpc_tcp_set_non_block(SOCKET sock)54 grpc_error_handle grpc_tcp_set_non_block(SOCKET sock) {
55 int status;
56 uint32_t param = 1;
57 DWORD ret;
58 status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret,
59 NULL, NULL);
60 return status == 0
61 ? absl::OkStatus()
62 : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
63 }
64
set_dualstack(SOCKET sock)65 static grpc_error_handle set_dualstack(SOCKET sock) {
66 int status;
67 unsigned long param = 0;
68 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m,
69 sizeof(param));
70 return status == 0
71 ? absl::OkStatus()
72 : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
73 }
74
enable_socket_low_latency(SOCKET sock)75 static grpc_error_handle enable_socket_low_latency(SOCKET sock) {
76 int status;
77 BOOL param = TRUE;
78 status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
79 reinterpret_cast<char*>(¶m), sizeof(param));
80 if (status == SOCKET_ERROR) {
81 status = WSAGetLastError();
82 }
83 return status == 0 ? absl::OkStatus()
84 : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
85 }
86
grpc_tcp_prepare_socket(SOCKET sock)87 grpc_error_handle grpc_tcp_prepare_socket(SOCKET sock) {
88 grpc_error_handle err;
89 err = grpc_tcp_set_non_block(sock);
90 if (!err.ok()) return err;
91 err = set_dualstack(sock);
92 if (!err.ok()) return err;
93 err = enable_socket_low_latency(sock);
94 if (!err.ok()) return err;
95 return absl::OkStatus();
96 }
97
// State for one Windows TCP endpoint. Instances are created by
// grpc_tcp_create() and deleted by tcp_free() once the last reference is
// dropped.
typedef struct grpc_tcp {
  // This is our C++ class derivation emulation.
  // The endpoint callbacks in this file cast the grpc_endpoint* they receive
  // straight back to grpc_tcp*, and grpc_tcp_create() returns &base.
  grpc_endpoint base;
  // The one socket this endpoint is using. Destroyed in tcp_free().
  grpc_winsocket* socket;
  // Refcounting how many operations are in progress. Initialized to 1 in
  // grpc_tcp_create(); win_read() and the asynchronous path of win_write()
  // take an extra reference that the matching completion handler releases.
  gpr_refcount refcount;

  // Internal closures bound to on_read() / on_write() in grpc_tcp_create().
  grpc_closure on_read;
  grpc_closure on_write;

  // Closures supplied to win_read() / win_write(), run when the operation
  // finishes.
  grpc_closure* read_cb;
  grpc_closure* write_cb;

  // garbage after the last read
  // (on_read() trims the unfilled tail of the read buffer into this buffer and
  // win_read() swaps it back in for the next read.)
  grpc_slice_buffer last_read_buffer;

  // Slice buffers passed to the most recent win_write() / win_read() call.
  grpc_slice_buffer* write_slices;
  grpc_slice_buffer* read_slices;

  // The IO Completion Port runs from another thread. We need some mechanism
  // to protect ourselves when requesting a shutdown.
  gpr_mu mu;
  // Set to 1 by win_destroy(); consulted by win_read(), win_write() and
  // on_read().
  int shutting_down;

  // Peer description given to grpc_tcp_create(); returned by win_get_peer().
  std::string peer_string;
  // URI form of the socket's local address, or empty if it could not be
  // determined; returned by win_get_local_address().
  std::string local_address;
} grpc_tcp;
126
// Tears down `tcp` and releases its memory: destroys the winsocket, the
// mutex and the leftover read buffer, then deletes the object itself.
// Called by tcp_unref() when the last reference goes away.
static void tcp_free(grpc_tcp* tcp) {
  grpc_winsocket_destroy(tcp->socket);
  gpr_mu_destroy(&tcp->mu);
  grpc_slice_buffer_destroy(&tcp->last_read_buffer);
  // Allocated with `new` in grpc_tcp_create().
  delete tcp;
}
133
134 #ifndef NDEBUG
135 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
136 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
tcp_unref(grpc_tcp * tcp,const char * reason,const char * file,int line)137 static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
138 int line) {
139 if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
140 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
141 VLOG(2).AtLocation(file, line) << "TCP unref " << tcp << " : " << reason
142 << " " << val << " -> " << val - 1;
143 }
144 if (gpr_unref(&tcp->refcount)) {
145 tcp_free(tcp);
146 }
147 }
148
tcp_ref(grpc_tcp * tcp,const char * reason,const char * file,int line)149 static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
150 int line) {
151 if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
152 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
153 VLOG(2).AtLocation(file, line) << "TCP ref " << tcp << " : " << reason
154 << " " << val << " -> " << val + 1;
155 }
156 gpr_ref(&tcp->refcount);
157 }
158 #else
159 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
160 #define TCP_REF(tcp, reason) tcp_ref((tcp))
tcp_unref(grpc_tcp * tcp)161 static void tcp_unref(grpc_tcp* tcp) {
162 if (gpr_unref(&tcp->refcount)) {
163 tcp_free(tcp);
164 }
165 }
166
tcp_ref(grpc_tcp * tcp)167 static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
168 #endif
169
170 // Asynchronous callback from the IOCP, or the background thread.
on_read(void * tcpp,grpc_error_handle error)171 static void on_read(void* tcpp, grpc_error_handle error) {
172 grpc_tcp* tcp = (grpc_tcp*)tcpp;
173 grpc_closure* cb = tcp->read_cb;
174 grpc_winsocket* socket = tcp->socket;
175 grpc_winsocket_callback_info* info = &socket->read_info;
176
177 GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " on_read";
178
179 if (error.ok()) {
180 if (info->wsa_error != 0 && !tcp->shutting_down) {
181 error = GRPC_WSA_ERROR(info->wsa_error, "IOCP/Socket");
182 grpc_slice_buffer_reset_and_unref(tcp->read_slices);
183 } else {
184 if (info->bytes_transferred != 0 && !tcp->shutting_down) {
185 CHECK((size_t)info->bytes_transferred <= tcp->read_slices->length);
186 if (static_cast<size_t>(info->bytes_transferred) !=
187 tcp->read_slices->length) {
188 grpc_slice_buffer_trim_end(
189 tcp->read_slices,
190 tcp->read_slices->length -
191 static_cast<size_t>(info->bytes_transferred),
192 &tcp->last_read_buffer);
193 }
194 CHECK((size_t)info->bytes_transferred == tcp->read_slices->length);
195
196 if (GRPC_TRACE_FLAG_ENABLED(tcp) && ABSL_VLOG_IS_ON(2)) {
197 size_t i;
198 for (i = 0; i < tcp->read_slices->count; i++) {
199 char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
200 GPR_DUMP_HEX | GPR_DUMP_ASCII);
201 VLOG(2) << "READ " << tcp << " (peer=" << tcp->peer_string
202 << "): " << dump;
203 gpr_free(dump);
204 }
205 }
206 } else {
207 GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " unref read_slice";
208 grpc_slice_buffer_reset_and_unref(tcp->read_slices);
209 error = grpc_error_set_int(
210 tcp->shutting_down ? GRPC_ERROR_CREATE("TCP stream shutting down")
211 : GRPC_ERROR_CREATE("End of TCP stream"),
212 grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
213 }
214 }
215 }
216
217 tcp->read_cb = NULL;
218 TCP_UNREF(tcp, "read");
219 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
220 }
221
222 #define DEFAULT_TARGET_READ_SIZE 8192
223 #define MAX_WSABUF_COUNT 16
win_read(grpc_endpoint * ep,grpc_slice_buffer * read_slices,grpc_closure * cb,bool,int)224 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
225 grpc_closure* cb, bool /* urgent */,
226 int /* min_progress_size */) {
227 grpc_tcp* tcp = (grpc_tcp*)ep;
228 grpc_winsocket* handle = tcp->socket;
229 grpc_winsocket_callback_info* info = &handle->read_info;
230 int status;
231 DWORD bytes_read = 0;
232 DWORD flags = 0;
233 WSABUF buffers[MAX_WSABUF_COUNT];
234 size_t i;
235
236 GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " win_read";
237
238 if (tcp->shutting_down) {
239 grpc_core::ExecCtx::Run(
240 DEBUG_LOCATION, cb,
241 grpc_error_set_int(GRPC_ERROR_CREATE("TCP socket is shutting down"),
242 grpc_core::StatusIntProperty::kRpcStatus,
243 GRPC_STATUS_UNAVAILABLE));
244 return;
245 }
246
247 tcp->read_cb = cb;
248 tcp->read_slices = read_slices;
249 grpc_slice_buffer_reset_and_unref(read_slices);
250 grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer);
251
252 if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 &&
253 tcp->read_slices->count < MAX_WSABUF_COUNT) {
254 // TODO(jtattermusch): slice should be allocated using resource quota
255 grpc_slice_buffer_add(tcp->read_slices,
256 GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE));
257 }
258
259 CHECK(tcp->read_slices->count <= MAX_WSABUF_COUNT);
260 for (i = 0; i < tcp->read_slices->count; i++) {
261 buffers[i].len = (ULONG)GRPC_SLICE_LENGTH(
262 tcp->read_slices->slices[i]); // we know slice size fits in 32bit.
263 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]);
264 }
265
266 TCP_REF(tcp, "read");
267
268 // First let's try a synchronous, non-blocking read.
269 status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
270 &bytes_read, &flags, NULL, NULL);
271 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
272
273 // Did we get data immediately ? Yay.
274 if (info->wsa_error != WSAEWOULDBLOCK) {
275 info->bytes_transferred = bytes_read;
276 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, absl::OkStatus());
277 return;
278 }
279
280 // Otherwise, let's retry, by queuing a read.
281 memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
282 status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
283 &bytes_read, &flags, &info->overlapped, NULL);
284
285 if (status != 0) {
286 int wsa_error = WSAGetLastError();
287 if (wsa_error != WSA_IO_PENDING) {
288 info->wsa_error = wsa_error;
289 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
290 GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
291 return;
292 }
293 }
294
295 grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
296 }
297
298 // Asynchronous callback from the IOCP, or the background thread.
on_write(void * tcpp,grpc_error_handle error)299 static void on_write(void* tcpp, grpc_error_handle error) {
300 grpc_tcp* tcp = (grpc_tcp*)tcpp;
301 grpc_winsocket* handle = tcp->socket;
302 grpc_winsocket_callback_info* info = &handle->write_info;
303 grpc_closure* cb;
304
305 GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " on_write";
306
307 gpr_mu_lock(&tcp->mu);
308 cb = tcp->write_cb;
309 tcp->write_cb = NULL;
310 gpr_mu_unlock(&tcp->mu);
311
312 if (error.ok()) {
313 if (info->wsa_error != 0) {
314 error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
315 } else {
316 CHECK(info->bytes_transferred <= tcp->write_slices->length);
317 }
318 }
319
320 TCP_UNREF(tcp, "write");
321 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
322 }
323
324 // Initiates a write.
win_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void *,int)325 static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
326 grpc_closure* cb, void* /* arg */,
327 int /* max_frame_size */) {
328 grpc_tcp* tcp = (grpc_tcp*)ep;
329 grpc_winsocket* socket = tcp->socket;
330 grpc_winsocket_callback_info* info = &socket->write_info;
331 unsigned i;
332 DWORD bytes_sent;
333 int status;
334 WSABUF local_buffers[MAX_WSABUF_COUNT];
335 WSABUF* allocated = NULL;
336 WSABUF* buffers = local_buffers;
337 size_t len, async_buffers_offset = 0;
338
339 if (GRPC_TRACE_FLAG_ENABLED(tcp) && ABSL_VLOG_IS_ON(2)) {
340 size_t i;
341 for (i = 0; i < slices->count; i++) {
342 char* data =
343 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
344 VLOG(2) << "WRITE " << tcp << " (peer=" << tcp->peer_string
345 << "): " << data;
346 gpr_free(data);
347 }
348 }
349
350 if (tcp->shutting_down) {
351 grpc_core::ExecCtx::Run(
352 DEBUG_LOCATION, cb,
353 grpc_error_set_int(GRPC_ERROR_CREATE("TCP socket is shutting down"),
354 grpc_core::StatusIntProperty::kRpcStatus,
355 GRPC_STATUS_UNAVAILABLE));
356 return;
357 }
358
359 tcp->write_cb = cb;
360 tcp->write_slices = slices;
361 CHECK(tcp->write_slices->count <= UINT_MAX);
362 if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
363 buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
364 allocated = buffers;
365 }
366
367 for (i = 0; i < tcp->write_slices->count; i++) {
368 len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
369 CHECK(len <= ULONG_MAX);
370 buffers[i].len = (ULONG)len;
371 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
372 }
373
374 // First, let's try a synchronous, non-blocking write.
375 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
376 &bytes_sent, 0, NULL, NULL);
377
378 if (status == 0) {
379 if (bytes_sent == tcp->write_slices->length) {
380 info->wsa_error = 0;
381 grpc_error_handle error = absl::OkStatus();
382 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
383 if (allocated) gpr_free(allocated);
384 return;
385 }
386
387 // The data was not completely delivered, we should send the rest of
388 // them by doing an async write operation.
389 for (i = 0; i < tcp->write_slices->count; i++) {
390 if (buffers[i].len > bytes_sent) {
391 buffers[i].buf += bytes_sent;
392 buffers[i].len -= bytes_sent;
393 break;
394 }
395 bytes_sent -= buffers[i].len;
396 async_buffers_offset++;
397 }
398 } else {
399 info->wsa_error = WSAGetLastError();
400
401 // We would kind of expect to get a WSAEWOULDBLOCK here, especially on a
402 // busy connection that has its send queue filled up. But if we don't, then
403 // we can avoid doing an async write operation at all.
404 if (info->wsa_error != WSAEWOULDBLOCK) {
405 grpc_error_handle error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
406 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
407 if (allocated) gpr_free(allocated);
408 return;
409 }
410 }
411
412 TCP_REF(tcp, "write");
413
414 // If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
415 // operation, this time asynchronously.
416 memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
417 status = WSASend(socket->socket, buffers + async_buffers_offset,
418 (DWORD)(tcp->write_slices->count - async_buffers_offset),
419 NULL, 0, &socket->write_info.overlapped, NULL);
420 if (allocated) gpr_free(allocated);
421
422 if (status != 0) {
423 int wsa_error = WSAGetLastError();
424 if (wsa_error != WSA_IO_PENDING) {
425 TCP_UNREF(tcp, "write");
426 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
427 GRPC_WSA_ERROR(wsa_error, "WSASend"));
428 return;
429 }
430 }
431
432 // As all is now setup, we can now ask for the IOCP notification. It may
433 // trigger the callback immediately however, but no matter.
434 grpc_socket_notify_on_write(socket, &tcp->on_write);
435 }
436
win_add_to_pollset(grpc_endpoint * ep,grpc_pollset * ps)437 static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
438 grpc_tcp* tcp;
439 (void)ps;
440 tcp = (grpc_tcp*)ep;
441 grpc_iocp_add_socket(tcp->socket);
442 }
443
win_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)444 static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
445 grpc_tcp* tcp;
446 (void)pss;
447 tcp = (grpc_tcp*)ep;
448 grpc_iocp_add_socket(tcp->socket);
449 }
450
// No-op: removing the endpoint from a pollset set does nothing in this
// implementation. Both arguments are ignored.
static void win_delete_from_pollset_set(grpc_endpoint* /* ep */,
                                        grpc_pollset_set* /* pss */) {}
453
454 // Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
455 // for the potential read and write operations. It is up to the caller to
456 // guarantee this isn't called in parallel to a read or write request, so
457 // we're not going to protect against these. However the IO Completion Port
458 // callback will happen from another thread, so we need to protect against
459 // concurrent access of the data structure in that regard.
win_destroy(grpc_endpoint * ep)460 static void win_destroy(grpc_endpoint* ep) {
461 grpc_tcp* tcp = (grpc_tcp*)ep;
462 gpr_mu_lock(&tcp->mu);
463 // At that point, what may happen is that we're already inside the IOCP
464 // callback. See the comments in on_read and on_write.
465 tcp->shutting_down = 1;
466 grpc_winsocket_shutdown(tcp->socket);
467 gpr_mu_unlock(&tcp->mu);
468 grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
469 TCP_UNREF(tcp, "destroy");
470 }
471
win_get_peer(grpc_endpoint * ep)472 static absl::string_view win_get_peer(grpc_endpoint* ep) {
473 grpc_tcp* tcp = (grpc_tcp*)ep;
474 return tcp->peer_string;
475 }
476
win_get_local_address(grpc_endpoint * ep)477 static absl::string_view win_get_local_address(grpc_endpoint* ep) {
478 grpc_tcp* tcp = (grpc_tcp*)ep;
479 return tcp->local_address;
480 }
481
// Always reports -1 for this endpoint type; the endpoint argument is ignored.
static int win_get_fd(grpc_endpoint* /* ep */) { return -1; }
483
// Always reports false for this endpoint type; the endpoint argument is
// ignored.
static bool win_can_track_err(grpc_endpoint* /* ep */) { return false; }
485
// Endpoint vtable that routes grpc_endpoint operations to the Windows
// implementations in this file. grpc_tcp_create() installs it on every
// endpoint it creates. The initializer is positional, so the entries must stay
// in the order in which grpc_endpoint_vtable declares its members.
static grpc_endpoint_vtable vtable = {win_read,
                                      win_write,
                                      win_add_to_pollset,
                                      win_add_to_pollset_set,
                                      win_delete_from_pollset_set,
                                      win_destroy,
                                      win_get_peer,
                                      win_get_local_address,
                                      win_get_fd,
                                      win_can_track_err};
496
grpc_tcp_create(grpc_winsocket * socket,absl::string_view peer_string)497 grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
498 absl::string_view peer_string) {
499 // TODO(jtattermusch): C++ize grpc_tcp and its dependencies (i.e. add
500 // constructors) to ensure proper initialization
501 grpc_tcp* tcp = new grpc_tcp{};
502 tcp->base.vtable = &vtable;
503 tcp->socket = socket;
504 gpr_mu_init(&tcp->mu);
505 gpr_ref_init(&tcp->refcount, 1);
506 GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
507 GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
508 grpc_resolved_address resolved_local_addr;
509 resolved_local_addr.len = sizeof(resolved_local_addr.addr);
510 absl::StatusOr<std::string> addr_uri;
511 if (getsockname(tcp->socket->socket,
512 reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
513 &resolved_local_addr.len) < 0 ||
514 !(addr_uri = grpc_sockaddr_to_uri(&resolved_local_addr)).ok()) {
515 tcp->local_address = "";
516 } else {
517 tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr).value();
518 }
519 tcp->peer_string = std::string(peer_string);
520 grpc_slice_buffer_init(&tcp->last_read_buffer);
521 return &tcp->base;
522 }
523
524 #endif // GRPC_WINSOCK_SOCKET
525