1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <limits.h>
26
27 #include "src/core/lib/iomgr/sockaddr_windows.h"
28
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34
35 #include "src/core/lib/gpr/useful.h"
36 #include "src/core/lib/iomgr/iocp_windows.h"
37 #include "src/core/lib/iomgr/sockaddr.h"
38 #include "src/core/lib/iomgr/sockaddr_utils.h"
39 #include "src/core/lib/iomgr/socket_windows.h"
40 #include "src/core/lib/iomgr/tcp_client.h"
41 #include "src/core/lib/iomgr/tcp_windows.h"
42 #include "src/core/lib/iomgr/timer.h"
43 #include "src/core/lib/slice/slice_internal.h"
44 #include "src/core/lib/slice/slice_string_helpers.h"
45
/* GRPC_FIONBIO is the ioctl request code this file passes to WSAIoctl() to
   make a socket non-blocking. With the 64-bit MSYS compiler combined with the
   Microsoft Windows headers the stock FIONBIO is affected by a bug, so the
   request code is spelled out by hand for that toolchain; every other build
   uses the system FIONBIO unchanged. */
#if defined(__MSYS__) && defined(GPR_ARCH_64)
#define GRPC_FIONBIO _IOW('f', 126, uint32_t)
#else
#define GRPC_FIONBIO FIONBIO
#endif
53
54 extern grpc_core::TraceFlag grpc_tcp_trace;
55
grpc_tcp_set_non_block(SOCKET sock)56 grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
57 int status;
58 uint32_t param = 1;
59 DWORD ret;
60 status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret,
61 NULL, NULL);
62 return status == 0
63 ? GRPC_ERROR_NONE
64 : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
65 }
66
set_dualstack(SOCKET sock)67 static grpc_error* set_dualstack(SOCKET sock) {
68 int status;
69 unsigned long param = 0;
70 status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m,
71 sizeof(param));
72 return status == 0
73 ? GRPC_ERROR_NONE
74 : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
75 }
76
enable_socket_low_latency(SOCKET sock)77 static grpc_error* enable_socket_low_latency(SOCKET sock) {
78 int status;
79 BOOL param = TRUE;
80 status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
81 reinterpret_cast<char*>(¶m), sizeof(param));
82 if (status == SOCKET_ERROR) {
83 status = WSAGetLastError();
84 }
85 return status == 0 ? GRPC_ERROR_NONE
86 : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
87 }
88
grpc_tcp_prepare_socket(SOCKET sock)89 grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
90 grpc_error* err;
91 err = grpc_tcp_set_non_block(sock);
92 if (err != GRPC_ERROR_NONE) return err;
93 err = set_dualstack(sock);
94 if (err != GRPC_ERROR_NONE) return err;
95 err = enable_socket_low_latency(sock);
96 if (err != GRPC_ERROR_NONE) return err;
97 return GRPC_ERROR_NONE;
98 }
99
100 typedef struct grpc_tcp {
101 /* This is our C++ class derivation emulation. */
102 grpc_endpoint base;
103 /* The one socket this endpoint is using. */
104 grpc_winsocket* socket;
105 /* Refcounting how many operations are in progress. */
106 gpr_refcount refcount;
107
108 grpc_closure on_read;
109 grpc_closure on_write;
110
111 grpc_closure* read_cb;
112 grpc_closure* write_cb;
113
114 /* garbage after the last read */
115 grpc_slice_buffer last_read_buffer;
116
117 grpc_slice_buffer* write_slices;
118 grpc_slice_buffer* read_slices;
119
120 grpc_resource_user* resource_user;
121
122 /* The IO Completion Port runs from another thread. We need some mechanism
123 to protect ourselves when requesting a shutdown. */
124 gpr_mu mu;
125 int shutting_down;
126 grpc_error* shutdown_error;
127
128 char* peer_string;
129 } grpc_tcp;
130
tcp_free(grpc_tcp * tcp)131 static void tcp_free(grpc_tcp* tcp) {
132 grpc_winsocket_destroy(tcp->socket);
133 gpr_mu_destroy(&tcp->mu);
134 gpr_free(tcp->peer_string);
135 grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
136 grpc_resource_user_unref(tcp->resource_user);
137 if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
138 gpr_free(tcp);
139 }
140
141 #ifndef NDEBUG
142 #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
143 #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
tcp_unref(grpc_tcp * tcp,const char * reason,const char * file,int line)144 static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
145 int line) {
146 if (grpc_tcp_trace.enabled()) {
147 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
148 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
149 "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
150 val - 1);
151 }
152 if (gpr_unref(&tcp->refcount)) {
153 tcp_free(tcp);
154 }
155 }
156
tcp_ref(grpc_tcp * tcp,const char * reason,const char * file,int line)157 static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
158 int line) {
159 if (grpc_tcp_trace.enabled()) {
160 gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
161 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
162 "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
163 val + 1);
164 }
165 gpr_ref(&tcp->refcount);
166 }
167 #else
168 #define TCP_UNREF(tcp, reason) tcp_unref((tcp))
169 #define TCP_REF(tcp, reason) tcp_ref((tcp))
tcp_unref(grpc_tcp * tcp)170 static void tcp_unref(grpc_tcp* tcp) {
171 if (gpr_unref(&tcp->refcount)) {
172 tcp_free(tcp);
173 }
174 }
175
tcp_ref(grpc_tcp * tcp)176 static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
177 #endif
178
179 /* Asynchronous callback from the IOCP, or the background thread. */
on_read(void * tcpp,grpc_error * error)180 static void on_read(void* tcpp, grpc_error* error) {
181 grpc_tcp* tcp = (grpc_tcp*)tcpp;
182 grpc_closure* cb = tcp->read_cb;
183 grpc_winsocket* socket = tcp->socket;
184 grpc_winsocket_callback_info* info = &socket->read_info;
185
186 if (grpc_tcp_trace.enabled()) {
187 gpr_log(GPR_INFO, "TCP:%p on_read", tcp);
188 }
189
190 GRPC_ERROR_REF(error);
191
192 if (error == GRPC_ERROR_NONE) {
193 if (info->wsa_error != 0 && !tcp->shutting_down) {
194 char* utf8_message = gpr_format_message(info->wsa_error);
195 error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
196 gpr_free(utf8_message);
197 grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
198 } else {
199 if (info->bytes_transferred != 0 && !tcp->shutting_down) {
200 GPR_ASSERT((size_t)info->bytes_transferred <= tcp->read_slices->length);
201 if (static_cast<size_t>(info->bytes_transferred) !=
202 tcp->read_slices->length) {
203 grpc_slice_buffer_trim_end(
204 tcp->read_slices,
205 tcp->read_slices->length -
206 static_cast<size_t>(info->bytes_transferred),
207 &tcp->last_read_buffer);
208 }
209 GPR_ASSERT((size_t)info->bytes_transferred == tcp->read_slices->length);
210
211 if (grpc_tcp_trace.enabled()) {
212 size_t i;
213 for (i = 0; i < tcp->read_slices->count; i++) {
214 char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
215 GPR_DUMP_HEX | GPR_DUMP_ASCII);
216 gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
217 dump);
218 gpr_free(dump);
219 }
220 }
221 } else {
222 if (grpc_tcp_trace.enabled()) {
223 gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp);
224 }
225 grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
226 error = tcp->shutting_down
227 ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
228 "TCP stream shutting down", &tcp->shutdown_error, 1)
229 : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream");
230 }
231 }
232 }
233
234 tcp->read_cb = NULL;
235 TCP_UNREF(tcp, "read");
236 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
237 }
238
239 #define DEFAULT_TARGET_READ_SIZE 8192
240 #define MAX_WSABUF_COUNT 16
win_read(grpc_endpoint * ep,grpc_slice_buffer * read_slices,grpc_closure * cb,bool urgent)241 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
242 grpc_closure* cb, bool urgent) {
243 grpc_tcp* tcp = (grpc_tcp*)ep;
244 grpc_winsocket* handle = tcp->socket;
245 grpc_winsocket_callback_info* info = &handle->read_info;
246 int status;
247 DWORD bytes_read = 0;
248 DWORD flags = 0;
249 WSABUF buffers[MAX_WSABUF_COUNT];
250 size_t i;
251
252 if (grpc_tcp_trace.enabled()) {
253 gpr_log(GPR_INFO, "TCP:%p win_read", tcp);
254 }
255
256 if (tcp->shutting_down) {
257 grpc_core::ExecCtx::Run(
258 DEBUG_LOCATION, cb,
259 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
260 "TCP socket is shutting down", &tcp->shutdown_error, 1));
261 return;
262 }
263
264 tcp->read_cb = cb;
265 tcp->read_slices = read_slices;
266 grpc_slice_buffer_reset_and_unref_internal(read_slices);
267 grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer);
268
269 if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 &&
270 tcp->read_slices->count < MAX_WSABUF_COUNT) {
271 // TODO(jtattermusch): slice should be allocated using resource quota
272 grpc_slice_buffer_add(tcp->read_slices,
273 GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE));
274 }
275
276 GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT);
277 for (i = 0; i < tcp->read_slices->count; i++) {
278 buffers[i].len = (ULONG)GRPC_SLICE_LENGTH(
279 tcp->read_slices->slices[i]); // we know slice size fits in 32bit.
280 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]);
281 }
282
283 TCP_REF(tcp, "read");
284
285 /* First let's try a synchronous, non-blocking read. */
286 status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
287 &bytes_read, &flags, NULL, NULL);
288 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
289
290 /* Did we get data immediately ? Yay. */
291 if (info->wsa_error != WSAEWOULDBLOCK) {
292 info->bytes_transferred = bytes_read;
293 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE);
294 return;
295 }
296
297 /* Otherwise, let's retry, by queuing a read. */
298 memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
299 status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
300 &bytes_read, &flags, &info->overlapped, NULL);
301
302 if (status != 0) {
303 int wsa_error = WSAGetLastError();
304 if (wsa_error != WSA_IO_PENDING) {
305 info->wsa_error = wsa_error;
306 grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
307 GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
308 return;
309 }
310 }
311
312 grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
313 }
314
315 /* Asynchronous callback from the IOCP, or the background thread. */
on_write(void * tcpp,grpc_error * error)316 static void on_write(void* tcpp, grpc_error* error) {
317 grpc_tcp* tcp = (grpc_tcp*)tcpp;
318 grpc_winsocket* handle = tcp->socket;
319 grpc_winsocket_callback_info* info = &handle->write_info;
320 grpc_closure* cb;
321
322 if (grpc_tcp_trace.enabled()) {
323 gpr_log(GPR_INFO, "TCP:%p on_write", tcp);
324 }
325
326 GRPC_ERROR_REF(error);
327
328 gpr_mu_lock(&tcp->mu);
329 cb = tcp->write_cb;
330 tcp->write_cb = NULL;
331 gpr_mu_unlock(&tcp->mu);
332
333 if (error == GRPC_ERROR_NONE) {
334 if (info->wsa_error != 0) {
335 error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
336 } else {
337 GPR_ASSERT(info->bytes_transferred == tcp->write_slices->length);
338 }
339 }
340
341 TCP_UNREF(tcp, "write");
342 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
343 }
344
345 /* Initiates a write. */
win_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)346 static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
347 grpc_closure* cb, void* arg) {
348 grpc_tcp* tcp = (grpc_tcp*)ep;
349 grpc_winsocket* socket = tcp->socket;
350 grpc_winsocket_callback_info* info = &socket->write_info;
351 unsigned i;
352 DWORD bytes_sent;
353 int status;
354 WSABUF local_buffers[MAX_WSABUF_COUNT];
355 WSABUF* allocated = NULL;
356 WSABUF* buffers = local_buffers;
357 size_t len;
358
359 if (grpc_tcp_trace.enabled()) {
360 size_t i;
361 for (i = 0; i < slices->count; i++) {
362 char* data =
363 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
364 gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
365 gpr_free(data);
366 }
367 }
368
369 if (tcp->shutting_down) {
370 grpc_core::ExecCtx::Run(
371 DEBUG_LOCATION, cb,
372 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
373 "TCP socket is shutting down", &tcp->shutdown_error, 1));
374 return;
375 }
376
377 tcp->write_cb = cb;
378 tcp->write_slices = slices;
379 GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
380 if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
381 buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
382 allocated = buffers;
383 }
384
385 for (i = 0; i < tcp->write_slices->count; i++) {
386 len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
387 GPR_ASSERT(len <= ULONG_MAX);
388 buffers[i].len = (ULONG)len;
389 buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
390 }
391
392 /* First, let's try a synchronous, non-blocking write. */
393 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
394 &bytes_sent, 0, NULL, NULL);
395 info->wsa_error = status == 0 ? 0 : WSAGetLastError();
396
397 /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
398 connection that has its send queue filled up. But if we don't, then we can
399 avoid doing an async write operation at all. */
400 if (info->wsa_error != WSAEWOULDBLOCK) {
401 grpc_error* error = status == 0
402 ? GRPC_ERROR_NONE
403 : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
404 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
405 if (allocated) gpr_free(allocated);
406 return;
407 }
408
409 TCP_REF(tcp, "write");
410
411 /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
412 operation, this time asynchronously. */
413 memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
414 status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
415 &bytes_sent, 0, &socket->write_info.overlapped, NULL);
416 if (allocated) gpr_free(allocated);
417
418 if (status != 0) {
419 int wsa_error = WSAGetLastError();
420 if (wsa_error != WSA_IO_PENDING) {
421 TCP_UNREF(tcp, "write");
422 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
423 GRPC_WSA_ERROR(wsa_error, "WSASend"));
424 return;
425 }
426 }
427
428 /* As all is now setup, we can now ask for the IOCP notification. It may
429 trigger the callback immediately however, but no matter. */
430 grpc_socket_notify_on_write(socket, &tcp->on_write);
431 }
432
win_add_to_pollset(grpc_endpoint * ep,grpc_pollset * ps)433 static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
434 grpc_tcp* tcp;
435 (void)ps;
436 tcp = (grpc_tcp*)ep;
437 grpc_iocp_add_socket(tcp->socket);
438 }
439
win_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)440 static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
441 grpc_tcp* tcp;
442 (void)pss;
443 tcp = (grpc_tcp*)ep;
444 grpc_iocp_add_socket(tcp->socket);
445 }
446
win_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)447 static void win_delete_from_pollset_set(grpc_endpoint* ep,
448 grpc_pollset_set* pss) {}
449
450 /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
451 for the potential read and write operations. It is up to the caller to
452 guarantee this isn't called in parallel to a read or write request, so
453 we're not going to protect against these. However the IO Completion Port
454 callback will happen from another thread, so we need to protect against
455 concurrent access of the data structure in that regard. */
win_shutdown(grpc_endpoint * ep,grpc_error * why)456 static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
457 grpc_tcp* tcp = (grpc_tcp*)ep;
458 gpr_mu_lock(&tcp->mu);
459 /* At that point, what may happen is that we're already inside the IOCP
460 callback. See the comments in on_read and on_write. */
461 if (!tcp->shutting_down) {
462 tcp->shutting_down = 1;
463 tcp->shutdown_error = why;
464 } else {
465 GRPC_ERROR_UNREF(why);
466 }
467 grpc_winsocket_shutdown(tcp->socket);
468 gpr_mu_unlock(&tcp->mu);
469 grpc_resource_user_shutdown(tcp->resource_user);
470 }
471
win_destroy(grpc_endpoint * ep)472 static void win_destroy(grpc_endpoint* ep) {
473 grpc_tcp* tcp = (grpc_tcp*)ep;
474 grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
475 TCP_UNREF(tcp, "destroy");
476 }
477
win_get_peer(grpc_endpoint * ep)478 static char* win_get_peer(grpc_endpoint* ep) {
479 grpc_tcp* tcp = (grpc_tcp*)ep;
480 return gpr_strdup(tcp->peer_string);
481 }
482
win_get_resource_user(grpc_endpoint * ep)483 static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
484 grpc_tcp* tcp = (grpc_tcp*)ep;
485 return tcp->resource_user;
486 }
487
win_get_fd(grpc_endpoint * ep)488 static int win_get_fd(grpc_endpoint* ep) { return -1; }
489
win_can_track_err(grpc_endpoint * ep)490 static bool win_can_track_err(grpc_endpoint* ep) { return false; }
491
492 static grpc_endpoint_vtable vtable = {win_read,
493 win_write,
494 win_add_to_pollset,
495 win_add_to_pollset_set,
496 win_delete_from_pollset_set,
497 win_shutdown,
498 win_destroy,
499 win_get_resource_user,
500 win_get_peer,
501 win_get_fd,
502 win_can_track_err};
503
grpc_tcp_create(grpc_winsocket * socket,grpc_channel_args * channel_args,const char * peer_string)504 grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
505 grpc_channel_args* channel_args,
506 const char* peer_string) {
507 grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
508 if (channel_args != NULL) {
509 for (size_t i = 0; i < channel_args->num_args; i++) {
510 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
511 grpc_resource_quota_unref_internal(resource_quota);
512 resource_quota = grpc_resource_quota_ref_internal(
513 (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
514 }
515 }
516 }
517 grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
518 memset(tcp, 0, sizeof(grpc_tcp));
519 tcp->base.vtable = &vtable;
520 tcp->socket = socket;
521 gpr_mu_init(&tcp->mu);
522 gpr_ref_init(&tcp->refcount, 1);
523 GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
524 GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
525 tcp->peer_string = gpr_strdup(peer_string);
526 grpc_slice_buffer_init(&tcp->last_read_buffer);
527 tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
528 grpc_resource_quota_unref_internal(resource_quota);
529
530 return &tcp->base;
531 }
532
533 #endif /* GRPC_WINSOCK_SOCKET */
534