1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <limits.h>
26
27 #include "src/core/lib/iomgr/sockaddr_windows.h"
28
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34
35 #include "src/core/lib/gpr/useful.h"
36 #include "src/core/lib/iomgr/iocp_windows.h"
37 #include "src/core/lib/iomgr/sockaddr.h"
38 #include "src/core/lib/iomgr/sockaddr_utils.h"
39 #include "src/core/lib/iomgr/socket_windows.h"
40 #include "src/core/lib/iomgr/tcp_client.h"
41 #include "src/core/lib/iomgr/tcp_windows.h"
42 #include "src/core/lib/iomgr/timer.h"
43 #include "src/core/lib/slice/slice_internal.h"
44 #include "src/core/lib/slice/slice_string_helpers.h"
45
46 #if defined(__MSYS__) && defined(GPR_ARCH_64)
47 /* Nasty workaround for nasty bug when using the 64 bits msys compiler
48 in conjunction with Microsoft Windows headers. */
49 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
50 #else
51 #define GRPC_FIONBIO FIONBIO
52 #endif
53
54 extern grpc_core::TraceFlag grpc_tcp_trace;
55
grpc_tcp_set_non_block(SOCKET sock)56 grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
57 int status;
58 uint32_t param = 1;
59 DWORD ret;
60 status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret,
61 NULL, NULL);
62 return status == 0
63 ? GRPC_ERROR_NONE
64 : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
65 }
66
set_dualstack(SOCKET sock)67 static grpc_error* set_dualstack(SOCKET sock) {
68 int status;
69 unsigned long param = 0;
70 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m,
71 sizeof(param));
72 return status == 0
73 ? GRPC_ERROR_NONE
74 : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
75 }
76
enable_socket_low_latency(SOCKET sock)77 static grpc_error* enable_socket_low_latency(SOCKET sock) {
78 int status;
79 BOOL param = TRUE;
80 status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
81 reinterpret_cast<char*>(¶m), sizeof(param));
82 if (status == SOCKET_ERROR) {
83 status = WSAGetLastError();
84 }
85 return status == 0 ? GRPC_ERROR_NONE
86 : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
87 }
88
grpc_tcp_prepare_socket(SOCKET sock)89 grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
90 grpc_error* err;
91 err = grpc_tcp_set_non_block(sock);
92 if (err != GRPC_ERROR_NONE) return err;
93 err = set_dualstack(sock);
94 if (err != GRPC_ERROR_NONE) return err;
95 err = enable_socket_low_latency(sock);
96 if (err != GRPC_ERROR_NONE) return err;
97 return GRPC_ERROR_NONE;
98 }
99
100 typedef struct grpc_tcp {
101 /* This is our C++ class derivation emulation. */
102 grpc_endpoint base;
103 /* The one socket this endpoint is using. */
104 grpc_winsocket* socket;
105 /* Refcounting how many operations are in progress. */
106 gpr_refcount refcount;
107
108 grpc_closure on_read;
109 grpc_closure on_write;
110
111 grpc_closure* read_cb;
112 grpc_closure* write_cb;
113
114 /* garbage after the last read */
115 grpc_slice_buffer last_read_buffer;
116
117 grpc_slice_buffer* write_slices;
118 grpc_slice_buffer* read_slices;
119
120 grpc_resource_user* resource_user;
121
122 /* The IO Completion Port runs from another thread. We need some mechanism
123 to protect ourselves when requesting a shutdown. */
124 gpr_mu mu;
125 int shutting_down;
126 grpc_error* shutdown_error;
127
128 std::string peer_string;
129 std::string local_address;
130 } grpc_tcp;
131
tcp_free(grpc_tcp * tcp)132 static void tcp_free(grpc_tcp* tcp) {
133 grpc_winsocket_destroy(tcp->socket);
134 gpr_mu_destroy(&tcp->mu);
135 grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
136 grpc_resource_user_unref(tcp->resource_user);
137 if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
138 delete tcp;
139 }
140
141 #ifndef NDEBUG
142 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
143 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
tcp_unref(grpc_tcp * tcp,const char * reason,const char * file,int line)144 static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
145 int line) {
146 if (grpc_tcp_trace.enabled()) {
147 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
148 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
149 "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
150 val - 1);
151 }
152 if (gpr_unref(&tcp->refcount)) {
153 tcp_free(tcp);
154 }
155 }
156
tcp_ref(grpc_tcp * tcp,const char * reason,const char * file,int line)157 static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
158 int line) {
159 if (grpc_tcp_trace.enabled()) {
160 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
161 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
162 "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
163 val + 1);
164 }
165 gpr_ref(&tcp->refcount);
166 }
167 #else
168 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
169 #define TCP_REF(tcp, reason) tcp_ref((tcp))
tcp_unref(grpc_tcp * tcp)170 static void tcp_unref(grpc_tcp* tcp) {
171 if (gpr_unref(&tcp->refcount)) {
172 tcp_free(tcp);
173 }
174 }
175
tcp_ref(grpc_tcp * tcp)176 static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
177 #endif
178
179 /* Asynchronous callback from the IOCP, or the background thread. */
on_read(void * tcpp,grpc_error * error)180 static void on_read(void* tcpp, grpc_error* error) {
181 grpc_tcp* tcp = (grpc_tcp*)tcpp;
182 grpc_closure* cb = tcp->read_cb;
183 grpc_winsocket* socket = tcp->socket;
184 grpc_winsocket_callback_info* info = &socket->read_info;
185
186 if (grpc_tcp_trace.enabled()) {
187 gpr_log(GPR_INFO, "TCP:%p on_read", tcp);
188 }
189
190 GRPC_ERROR_REF(error);
191
192 if (error == GRPC_ERROR_NONE) {
193 if (info->wsa_error != 0 && !tcp->shutting_down) {
194 char* utf8_message = gpr_format_message(info->wsa_error);
195 error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
196 gpr_free(utf8_message);
197 grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
198 } else {
199 if (info->bytes_transferred != 0 && !tcp->shutting_down) {
200 GPR_ASSERT((size_t)info->bytes_transferred <= tcp->read_slices->length);
201 if (static_cast<size_t>(info->bytes_transferred) !=
202 tcp->read_slices->length) {
203 grpc_slice_buffer_trim_end(
204 tcp->read_slices,
205 tcp->read_slices->length -
206 static_cast<size_t>(info->bytes_transferred),
207 &tcp->last_read_buffer);
208 }
209 GPR_ASSERT((size_t)info->bytes_transferred == tcp->read_slices->length);
210
211 if (grpc_tcp_trace.enabled()) {
212 size_t i;
213 for (i = 0; i < tcp->read_slices->count; i++) {
214 char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
215 GPR_DUMP_HEX | GPR_DUMP_ASCII);
216 gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp,
217 tcp->peer_string.c_str(), dump);
218 gpr_free(dump);
219 }
220 }
221 } else {
222 if (grpc_tcp_trace.enabled()) {
223 gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp);
224 }
225 grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
226 error = tcp->shutting_down
227 ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
228 "TCP stream shutting down", &tcp->shutdown_error, 1)
229 : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream");
230 }
231 }
232 }
233
234 tcp->read_cb = NULL;
235 TCP_UNREF(tcp, "read");
236 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
237 }
238
239 #define DEFAULT_TARGET_READ_SIZE 8192
240 #define MAX_WSABUF_COUNT 16
win_read(grpc_endpoint * ep,grpc_slice_buffer * read_slices,grpc_closure * cb,bool urgent)241 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
242 grpc_closure* cb, bool urgent) {
243 grpc_tcp* tcp = (grpc_tcp*)ep;
244 grpc_winsocket* handle = tcp->socket;
245 grpc_winsocket_callback_info* info = &handle->read_info;
246 int status;
247 DWORD bytes_read = 0;
248 DWORD flags = 0;
249 WSABUF buffers[MAX_WSABUF_COUNT];
250 size_t i;
251
252 if (grpc_tcp_trace.enabled()) {
253 gpr_log(GPR_INFO, "TCP:%p win_read", tcp);
254 }
255
256 if (tcp->shutting_down) {
257 grpc_core::ExecCtx::Run(
258 DEBUG_LOCATION, cb,
259 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
260 "TCP socket is shutting down", &tcp->shutdown_error, 1));
261 return;
262 }
263
264 tcp->read_cb = cb;
265 tcp->read_slices = read_slices;
266 grpc_slice_buffer_reset_and_unref_internal(read_slices);
267 grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer);
268
269 if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 &&
270 tcp->read_slices->count < MAX_WSABUF_COUNT) {
271 // TODO(jtattermusch): slice should be allocated using resource quota
272 grpc_slice_buffer_add(tcp->read_slices,
273 GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE));
274 }
275
276 GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT);
277 for (i = 0; i < tcp->read_slices->count; i++) {
278 buffers[i].len = (ULONG)GRPC_SLICE_LENGTH(
279 tcp->read_slices->slices[i]); // we know slice size fits in 32bit.
280 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]);
281 }
282
283 TCP_REF(tcp, "read");
284
285 /* First let's try a synchronous, non-blocking read. */
286 status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
287 &bytes_read, &flags, NULL, NULL);
288 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
289
290 /* Did we get data immediately ? Yay. */
291 if (info->wsa_error != WSAEWOULDBLOCK) {
292 info->bytes_transferred = bytes_read;
293 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE);
294 return;
295 }
296
297 /* Otherwise, let's retry, by queuing a read. */
298 memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
299 status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
300 &bytes_read, &flags, &info->overlapped, NULL);
301
302 if (status != 0) {
303 int wsa_error = WSAGetLastError();
304 if (wsa_error != WSA_IO_PENDING) {
305 info->wsa_error = wsa_error;
306 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
307 GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
308 return;
309 }
310 }
311
312 grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
313 }
314
315 /* Asynchronous callback from the IOCP, or the background thread. */
on_write(void * tcpp,grpc_error * error)316 static void on_write(void* tcpp, grpc_error* error) {
317 grpc_tcp* tcp = (grpc_tcp*)tcpp;
318 grpc_winsocket* handle = tcp->socket;
319 grpc_winsocket_callback_info* info = &handle->write_info;
320 grpc_closure* cb;
321
322 if (grpc_tcp_trace.enabled()) {
323 gpr_log(GPR_INFO, "TCP:%p on_write", tcp);
324 }
325
326 GRPC_ERROR_REF(error);
327
328 gpr_mu_lock(&tcp->mu);
329 cb = tcp->write_cb;
330 tcp->write_cb = NULL;
331 gpr_mu_unlock(&tcp->mu);
332
333 if (error == GRPC_ERROR_NONE) {
334 if (info->wsa_error != 0) {
335 error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
336 } else {
337 GPR_ASSERT(info->bytes_transferred == tcp->write_slices->length);
338 }
339 }
340
341 TCP_UNREF(tcp, "write");
342 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
343 }
344
345 /* Initiates a write. */
win_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)346 static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
347 grpc_closure* cb, void* arg) {
348 grpc_tcp* tcp = (grpc_tcp*)ep;
349 grpc_winsocket* socket = tcp->socket;
350 grpc_winsocket_callback_info* info = &socket->write_info;
351 unsigned i;
352 DWORD bytes_sent;
353 int status;
354 WSABUF local_buffers[MAX_WSABUF_COUNT];
355 WSABUF* allocated = NULL;
356 WSABUF* buffers = local_buffers;
357 size_t len;
358
359 if (grpc_tcp_trace.enabled()) {
360 size_t i;
361 for (i = 0; i < slices->count; i++) {
362 char* data =
363 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
364 gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string.c_str(),
365 data);
366 gpr_free(data);
367 }
368 }
369
370 if (tcp->shutting_down) {
371 grpc_core::ExecCtx::Run(
372 DEBUG_LOCATION, cb,
373 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
374 "TCP socket is shutting down", &tcp->shutdown_error, 1));
375 return;
376 }
377
378 tcp->write_cb = cb;
379 tcp->write_slices = slices;
380 GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
381 if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
382 buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
383 allocated = buffers;
384 }
385
386 for (i = 0; i < tcp->write_slices->count; i++) {
387 len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
388 GPR_ASSERT(len <= ULONG_MAX);
389 buffers[i].len = (ULONG)len;
390 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
391 }
392
393 /* First, let's try a synchronous, non-blocking write. */
394 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
395 &bytes_sent, 0, NULL, NULL);
396 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
397
398 /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
399 connection that has its send queue filled up. But if we don't, then we can
400 avoid doing an async write operation at all. */
401 if (info->wsa_error != WSAEWOULDBLOCK) {
402 grpc_error* error = status == 0
403 ? GRPC_ERROR_NONE
404 : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
405 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
406 if (allocated) gpr_free(allocated);
407 return;
408 }
409
410 TCP_REF(tcp, "write");
411
412 /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
413 operation, this time asynchronously. */
414 memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
415 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
416 &bytes_sent, 0, &socket->write_info.overlapped, NULL);
417 if (allocated) gpr_free(allocated);
418
419 if (status != 0) {
420 int wsa_error = WSAGetLastError();
421 if (wsa_error != WSA_IO_PENDING) {
422 TCP_UNREF(tcp, "write");
423 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
424 GRPC_WSA_ERROR(wsa_error, "WSASend"));
425 return;
426 }
427 }
428
429 /* As all is now setup, we can now ask for the IOCP notification. It may
430 trigger the callback immediately however, but no matter. */
431 grpc_socket_notify_on_write(socket, &tcp->on_write);
432 }
433
win_add_to_pollset(grpc_endpoint * ep,grpc_pollset * ps)434 static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
435 grpc_tcp* tcp;
436 (void)ps;
437 tcp = (grpc_tcp*)ep;
438 grpc_iocp_add_socket(tcp->socket);
439 }
440
win_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)441 static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
442 grpc_tcp* tcp;
443 (void)pss;
444 tcp = (grpc_tcp*)ep;
445 grpc_iocp_add_socket(tcp->socket);
446 }
447
win_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)448 static void win_delete_from_pollset_set(grpc_endpoint* ep,
449 grpc_pollset_set* pss) {}
450
451 /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
452 for the potential read and write operations. It is up to the caller to
453 guarantee this isn't called in parallel to a read or write request, so
454 we're not going to protect against these. However the IO Completion Port
455 callback will happen from another thread, so we need to protect against
456 concurrent access of the data structure in that regard. */
win_shutdown(grpc_endpoint * ep,grpc_error * why)457 static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
458 grpc_tcp* tcp = (grpc_tcp*)ep;
459 gpr_mu_lock(&tcp->mu);
460 /* At that point, what may happen is that we're already inside the IOCP
461 callback. See the comments in on_read and on_write. */
462 if (!tcp->shutting_down) {
463 tcp->shutting_down = 1;
464 tcp->shutdown_error = why;
465 } else {
466 GRPC_ERROR_UNREF(why);
467 }
468 grpc_winsocket_shutdown(tcp->socket);
469 gpr_mu_unlock(&tcp->mu);
470 grpc_resource_user_shutdown(tcp->resource_user);
471 }
472
win_destroy(grpc_endpoint * ep)473 static void win_destroy(grpc_endpoint* ep) {
474 grpc_tcp* tcp = (grpc_tcp*)ep;
475 grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
476 TCP_UNREF(tcp, "destroy");
477 }
478
win_get_peer(grpc_endpoint * ep)479 static absl::string_view win_get_peer(grpc_endpoint* ep) {
480 grpc_tcp* tcp = (grpc_tcp*)ep;
481 return tcp->peer_string;
482 }
483
win_get_local_address(grpc_endpoint * ep)484 static absl::string_view win_get_local_address(grpc_endpoint* ep) {
485 grpc_tcp* tcp = (grpc_tcp*)ep;
486 return tcp->local_address;
487 }
488
win_get_resource_user(grpc_endpoint * ep)489 static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
490 grpc_tcp* tcp = (grpc_tcp*)ep;
491 return tcp->resource_user;
492 }
493
win_get_fd(grpc_endpoint * ep)494 static int win_get_fd(grpc_endpoint* ep) { return -1; }
495
win_can_track_err(grpc_endpoint * ep)496 static bool win_can_track_err(grpc_endpoint* ep) { return false; }
497
498 static grpc_endpoint_vtable vtable = {win_read,
499 win_write,
500 win_add_to_pollset,
501 win_add_to_pollset_set,
502 win_delete_from_pollset_set,
503 win_shutdown,
504 win_destroy,
505 win_get_resource_user,
506 win_get_peer,
507 win_get_local_address,
508 win_get_fd,
509 win_can_track_err};
510
grpc_tcp_create(grpc_winsocket * socket,grpc_channel_args * channel_args,const char * peer_string)511 grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
512 grpc_channel_args* channel_args,
513 const char* peer_string) {
514 grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
515 if (channel_args != NULL) {
516 for (size_t i = 0; i < channel_args->num_args; i++) {
517 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
518 grpc_resource_quota_unref_internal(resource_quota);
519 resource_quota = grpc_resource_quota_ref_internal(
520 (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
521 }
522 }
523 }
524 grpc_tcp* tcp = new grpc_tcp;
525 memset(tcp, 0, sizeof(grpc_tcp));
526 tcp->base.vtable = &vtable;
527 tcp->socket = socket;
528 gpr_mu_init(&tcp->mu);
529 gpr_ref_init(&tcp->refcount, 1);
530 GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
531 GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
532 grpc_resolved_address resolved_local_addr;
533 resolved_local_addr.len = sizeof(resolved_local_addr.addr);
534 if (getsockname(tcp->socket->socket,
535 reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
536 &resolved_local_addr.len) < 0) {
537 tcp->local_address = "";
538 } else {
539 tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
540 }
541 tcp->peer_string = peer_string;
542 grpc_slice_buffer_init(&tcp->last_read_buffer);
543 tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
544 grpc_resource_quota_unref_internal(resource_quota);
545
546 return &tcp->base;
547 }
548
549 #endif /* GRPC_WINSOCK_SOCKET */
550