• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <limits.h>
26 
27 #include "src/core/lib/iomgr/sockaddr_windows.h"
28 
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34 
35 #include "src/core/lib/gpr/useful.h"
36 #include "src/core/lib/iomgr/iocp_windows.h"
37 #include "src/core/lib/iomgr/sockaddr.h"
38 #include "src/core/lib/iomgr/sockaddr_utils.h"
39 #include "src/core/lib/iomgr/socket_windows.h"
40 #include "src/core/lib/iomgr/tcp_client.h"
41 #include "src/core/lib/iomgr/tcp_windows.h"
42 #include "src/core/lib/iomgr/timer.h"
43 #include "src/core/lib/slice/slice_internal.h"
44 #include "src/core/lib/slice/slice_string_helpers.h"
45 
46 #if defined(__MSYS__) && defined(GPR_ARCH_64)
47 /* Nasty workaround for nasty bug when using the 64 bits msys compiler
48    in conjunction with Microsoft Windows headers. */
49 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
50 #else
51 #define GRPC_FIONBIO FIONBIO
52 #endif
53 
54 extern grpc_core::TraceFlag grpc_tcp_trace;
55 
grpc_tcp_set_non_block(SOCKET sock)56 grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
57   int status;
58   uint32_t param = 1;
59   DWORD ret;
60   status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
61                     NULL, NULL);
62   return status == 0
63              ? GRPC_ERROR_NONE
64              : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
65 }
66 
set_dualstack(SOCKET sock)67 static grpc_error* set_dualstack(SOCKET sock) {
68   int status;
69   unsigned long param = 0;
70   status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&param,
71                       sizeof(param));
72   return status == 0
73              ? GRPC_ERROR_NONE
74              : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
75 }
76 
enable_socket_low_latency(SOCKET sock)77 static grpc_error* enable_socket_low_latency(SOCKET sock) {
78   int status;
79   BOOL param = TRUE;
80   status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
81                         reinterpret_cast<char*>(&param), sizeof(param));
82   if (status == SOCKET_ERROR) {
83     status = WSAGetLastError();
84   }
85   return status == 0 ? GRPC_ERROR_NONE
86                      : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
87 }
88 
grpc_tcp_prepare_socket(SOCKET sock)89 grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
90   grpc_error* err;
91   err = grpc_tcp_set_non_block(sock);
92   if (err != GRPC_ERROR_NONE) return err;
93   err = set_dualstack(sock);
94   if (err != GRPC_ERROR_NONE) return err;
95   err = enable_socket_low_latency(sock);
96   if (err != GRPC_ERROR_NONE) return err;
97   return GRPC_ERROR_NONE;
98 }
99 
typedef struct grpc_tcp {
  /* This is our C++ class derivation emulation. */
  grpc_endpoint base;
  /* The one socket this endpoint is using. */
  grpc_winsocket* socket;
  /* Refcounting how many operations are in progress. */
  gpr_refcount refcount;

  /* Closures wired to the IOCP completion callbacks (on_read/on_write). */
  grpc_closure on_read;
  grpc_closure on_write;

  /* User callbacks for the currently pending read/write; NULL when idle. */
  grpc_closure* read_cb;
  grpc_closure* write_cb;

  /* garbage after the last read */
  grpc_slice_buffer last_read_buffer;

  /* Caller-owned slice buffers for the in-flight write/read operation. */
  grpc_slice_buffer* write_slices;
  grpc_slice_buffer* read_slices;

  /* Accounts this endpoint's buffer usage against a resource quota. */
  grpc_resource_user* resource_user;

  /* The IO Completion Port runs from another thread. We need some mechanism
     to protect ourselves when requesting a shutdown. */
  gpr_mu mu;
  int shutting_down;
  /* Owned once shutting_down is set; released in tcp_free. */
  grpc_error* shutdown_error;

  /* Human-readable peer address, used for logging and win_get_peer. */
  char* peer_string;
} grpc_tcp;
130 
tcp_free(grpc_tcp * tcp)131 static void tcp_free(grpc_tcp* tcp) {
132   grpc_winsocket_destroy(tcp->socket);
133   gpr_mu_destroy(&tcp->mu);
134   gpr_free(tcp->peer_string);
135   grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
136   grpc_resource_user_unref(tcp->resource_user);
137   if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
138   gpr_free(tcp);
139 }
140 
#ifndef NDEBUG
/* Debug builds thread __FILE__/__LINE__ through so every ref/unref can be
   logged (when the tcp trace flag is on) with its call site and reason. */
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
                      int line) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
            val - 1);
  }
  /* gpr_unref returns non-zero when the count reaches zero. */
  if (gpr_unref(&tcp->refcount)) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
                    int line) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "TCP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
            val + 1);
  }
  gpr_ref(&tcp->refcount);
}
#else
/* Release builds compile the reason/location arguments away entirely. */
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp* tcp) {
  if (gpr_unref(&tcp->refcount)) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
#endif
178 
/* Asynchronous callback from the IOCP, or the background thread.
   Translates the completed WSARecv into either delivered data, a WSA error,
   or an end-of-stream error, then invokes the caller's read closure. */
static void on_read(void* tcpp, grpc_error* error) {
  grpc_tcp* tcp = (grpc_tcp*)tcpp;
  grpc_closure* cb = tcp->read_cb;
  grpc_winsocket* socket = tcp->socket;
  grpc_winsocket_callback_info* info = &socket->read_info;

  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_INFO, "TCP:%p on_read", tcp);
  }

  /* |error| is handed on to |cb| below, so take our own reference. */
  GRPC_ERROR_REF(error);

  if (error == GRPC_ERROR_NONE) {
    if (info->wsa_error != 0 && !tcp->shutting_down) {
      /* The read itself failed: surface the WSA error text and discard any
         partially-filled buffers. */
      char* utf8_message = gpr_format_message(info->wsa_error);
      error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
      gpr_free(utf8_message);
      grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
    } else {
      if (info->bytes_transferred != 0 && !tcp->shutting_down) {
        GPR_ASSERT((size_t)info->bytes_transferred <= tcp->read_slices->length);
        /* Short read: move the unused tail of the buffers into
           last_read_buffer so the next win_read can reuse it. */
        if (static_cast<size_t>(info->bytes_transferred) !=
            tcp->read_slices->length) {
          grpc_slice_buffer_trim_end(
              tcp->read_slices,
              tcp->read_slices->length -
                  static_cast<size_t>(info->bytes_transferred),
              &tcp->last_read_buffer);
        }
        GPR_ASSERT((size_t)info->bytes_transferred == tcp->read_slices->length);

        if (grpc_tcp_trace.enabled()) {
          size_t i;
          for (i = 0; i < tcp->read_slices->count; i++) {
            char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
                                         GPR_DUMP_HEX | GPR_DUMP_ASCII);
            gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
                    dump);
            gpr_free(dump);
          }
        }
      } else {
        /* Zero bytes transferred (or shutdown in progress): report
           end-of-stream or the shutdown error to the caller. */
        if (grpc_tcp_trace.enabled()) {
          gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp);
        }
        grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
        error = tcp->shutting_down
                    ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                          "TCP stream shutting down", &tcp->shutdown_error, 1)
                    : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream");
      }
    }
  }

  tcp->read_cb = NULL;
  /* Drop the reference taken by win_read; this may free |tcp|, so do not
     touch it afterwards. */
  TCP_UNREF(tcp, "read");
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
238 
#define DEFAULT_TARGET_READ_SIZE 8192
#define MAX_WSABUF_COUNT 16

/* Initiates a read into |read_slices|, invoking |cb| (via on_read) when data
   arrives. First attempts a synchronous non-blocking WSARecv; only if that
   would block does it queue an overlapped (asynchronous) read on the IOCP. */
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
                     grpc_closure* cb, bool urgent) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_winsocket* handle = tcp->socket;
  grpc_winsocket_callback_info* info = &handle->read_info;
  int status;
  DWORD bytes_read = 0;
  DWORD flags = 0;
  WSABUF buffers[MAX_WSABUF_COUNT];
  size_t i;

  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_INFO, "TCP:%p win_read", tcp);
  }

  /* Refuse new reads once a shutdown has been requested. */
  if (tcp->shutting_down) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "TCP socket is shutting down", &tcp->shutdown_error, 1));
    return;
  }

  tcp->read_cb = cb;
  tcp->read_slices = read_slices;
  grpc_slice_buffer_reset_and_unref_internal(read_slices);
  /* Start from any leftover buffer space trimmed off the previous read. */
  grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer);

  /* Top up the buffer if the carried-over space is small and there is room
     for another WSABUF entry. */
  if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 &&
      tcp->read_slices->count < MAX_WSABUF_COUNT) {
    // TODO(jtattermusch): slice should be allocated using resource quota
    grpc_slice_buffer_add(tcp->read_slices,
                          GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE));
  }

  GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT);
  for (i = 0; i < tcp->read_slices->count; i++) {
    buffers[i].len = (ULONG)GRPC_SLICE_LENGTH(
        tcp->read_slices->slices[i]);  // we know slice size fits in 32bit.
    buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]);
  }

  /* Balanced by TCP_UNREF in on_read. */
  TCP_REF(tcp, "read");

  /* First let's try a synchronous, non-blocking read. */
  status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
                   &bytes_read, &flags, NULL, NULL);
  info->wsa_error = status == 0 ? 0 : WSAGetLastError();

  /* Did we get data immediately ? Yay. */
  if (info->wsa_error != WSAEWOULDBLOCK) {
    info->bytes_transferred = bytes_read;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE);
    return;
  }

  /* Otherwise, let's retry, by queuing a read. */
  memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
  status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
                   &bytes_read, &flags, &info->overlapped, NULL);

  if (status != 0) {
    int wsa_error = WSAGetLastError();
    /* WSA_IO_PENDING just means the overlapped read was queued; anything
       else is a real failure reported straight to on_read. */
    if (wsa_error != WSA_IO_PENDING) {
      info->wsa_error = wsa_error;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
                              GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
      return;
    }
  }

  /* Ask the IOCP machinery to fire on_read when the overlapped op completes. */
  grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
}
314 
/* Asynchronous callback from the IOCP, or the background thread.
   Checks the outcome of the overlapped WSASend and invokes the caller's
   write closure with success or the translated WSA error. */
static void on_write(void* tcpp, grpc_error* error) {
  grpc_tcp* tcp = (grpc_tcp*)tcpp;
  grpc_winsocket* handle = tcp->socket;
  grpc_winsocket_callback_info* info = &handle->write_info;
  grpc_closure* cb;

  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_INFO, "TCP:%p on_write", tcp);
  }

  /* |error| is handed on to |cb| below, so take our own reference. */
  GRPC_ERROR_REF(error);

  /* Grab and clear the pending callback under the lock; win_shutdown may be
     running concurrently on another thread. */
  gpr_mu_lock(&tcp->mu);
  cb = tcp->write_cb;
  tcp->write_cb = NULL;
  gpr_mu_unlock(&tcp->mu);

  if (error == GRPC_ERROR_NONE) {
    if (info->wsa_error != 0) {
      error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
    } else {
      /* A successful completion must have written the whole buffer. */
      GPR_ASSERT(info->bytes_transferred == tcp->write_slices->length);
    }
  }

  /* Drop the reference taken by win_write; may free |tcp|. */
  TCP_UNREF(tcp, "write");
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
344 
/* Initiates a write of |slices|, invoking |cb| on completion. Tries a
   synchronous non-blocking WSASend first; only if the socket's send queue is
   full (WSAEWOULDBLOCK) does it fall back to an overlapped send. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                      grpc_closure* cb, void* arg) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_winsocket* socket = tcp->socket;
  grpc_winsocket_callback_info* info = &socket->write_info;
  unsigned i;
  DWORD bytes_sent;
  int status;
  WSABUF local_buffers[MAX_WSABUF_COUNT];
  WSABUF* allocated = NULL;  /* non-NULL when buffers spilled to the heap */
  WSABUF* buffers = local_buffers;
  size_t len;

  if (grpc_tcp_trace.enabled()) {
    size_t i;
    for (i = 0; i < slices->count; i++) {
      char* data =
          grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
      gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
      gpr_free(data);
    }
  }

  /* Refuse new writes once a shutdown has been requested. */
  if (tcp->shutting_down) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "TCP socket is shutting down", &tcp->shutdown_error, 1));
    return;
  }

  tcp->write_cb = cb;
  tcp->write_slices = slices;
  GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
  /* Spill to a heap array only when the slice count exceeds the stack one. */
  if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
    buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
    allocated = buffers;
  }

  for (i = 0; i < tcp->write_slices->count; i++) {
    len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
    GPR_ASSERT(len <= ULONG_MAX);
    buffers[i].len = (ULONG)len;
    buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
  }

  /* First, let's try a synchronous, non-blocking write. */
  status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
                   &bytes_sent, 0, NULL, NULL);
  info->wsa_error = status == 0 ? 0 : WSAGetLastError();

  /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
     connection that has its send queue filled up. But if we don't, then we can
     avoid doing an async write operation at all. */
  if (info->wsa_error != WSAEWOULDBLOCK) {
    grpc_error* error = status == 0
                            ? GRPC_ERROR_NONE
                            : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
    if (allocated) gpr_free(allocated);
    return;
  }

  /* Balanced by TCP_UNREF in on_write (or below on immediate failure). */
  TCP_REF(tcp, "write");

  /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
     operation, this time asynchronously. */
  memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
  status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
                   &bytes_sent, 0, &socket->write_info.overlapped, NULL);
  /* The WSABUF array is only needed while WSASend is on the stack. */
  if (allocated) gpr_free(allocated);

  if (status != 0) {
    int wsa_error = WSAGetLastError();
    /* WSA_IO_PENDING just means the overlapped send was queued; anything
       else is a real failure reported to the caller immediately. */
    if (wsa_error != WSA_IO_PENDING) {
      TCP_UNREF(tcp, "write");
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
                              GRPC_WSA_ERROR(wsa_error, "WSASend"));
      return;
    }
  }

  /* As all is now setup, we can now ask for the IOCP notification. It may
     trigger the callback immediately however, but no matter. */
  grpc_socket_notify_on_write(socket, &tcp->on_write);
}
432 
win_add_to_pollset(grpc_endpoint * ep,grpc_pollset * ps)433 static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
434   grpc_tcp* tcp;
435   (void)ps;
436   tcp = (grpc_tcp*)ep;
437   grpc_iocp_add_socket(tcp->socket);
438 }
439 
win_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)440 static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
441   grpc_tcp* tcp;
442   (void)pss;
443   tcp = (grpc_tcp*)ep;
444   grpc_iocp_add_socket(tcp->socket);
445 }
446 
win_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)447 static void win_delete_from_pollset_set(grpc_endpoint* ep,
448                                         grpc_pollset_set* pss) {}
449 
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
   for the potential read and write operations. It is up to the caller to
   guarantee this isn't called in parallel to a read or write request, so
   we're not going to protect against these. However the IO Completion Port
   callback will happen from another thread, so we need to protect against
   concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  gpr_mu_lock(&tcp->mu);
  /* At that point, what may happen is that we're already inside the IOCP
     callback. See the comments in on_read and on_write. */
  if (!tcp->shutting_down) {
    /* First shutdown: take ownership of |why|; tcp_free releases it. */
    tcp->shutting_down = 1;
    tcp->shutdown_error = why;
  } else {
    /* Already shutting down: keep the original error, drop this one. */
    GRPC_ERROR_UNREF(why);
  }
  grpc_winsocket_shutdown(tcp->socket);
  gpr_mu_unlock(&tcp->mu);
  grpc_resource_user_shutdown(tcp->resource_user);
}
471 
win_destroy(grpc_endpoint * ep)472 static void win_destroy(grpc_endpoint* ep) {
473   grpc_tcp* tcp = (grpc_tcp*)ep;
474   grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
475   TCP_UNREF(tcp, "destroy");
476 }
477 
win_get_peer(grpc_endpoint * ep)478 static char* win_get_peer(grpc_endpoint* ep) {
479   grpc_tcp* tcp = (grpc_tcp*)ep;
480   return gpr_strdup(tcp->peer_string);
481 }
482 
win_get_resource_user(grpc_endpoint * ep)483 static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
484   grpc_tcp* tcp = (grpc_tcp*)ep;
485   return tcp->resource_user;
486 }
487 
win_get_fd(grpc_endpoint * ep)488 static int win_get_fd(grpc_endpoint* ep) { return -1; }
489 
win_can_track_err(grpc_endpoint * ep)490 static bool win_can_track_err(grpc_endpoint* ep) { return false; }
491 
/* grpc_endpoint vtable for the Windows TCP endpoint. Entry order must match
   the grpc_endpoint_vtable declaration. */
static grpc_endpoint_vtable vtable = {win_read,
                                      win_write,
                                      win_add_to_pollset,
                                      win_add_to_pollset_set,
                                      win_delete_from_pollset_set,
                                      win_shutdown,
                                      win_destroy,
                                      win_get_resource_user,
                                      win_get_peer,
                                      win_get_fd,
                                      win_can_track_err};
503 
/* Wraps an already-connected winsocket in a grpc_endpoint. Takes ownership
   of |socket|; copies |peer_string|. If |channel_args| carries
   GRPC_ARG_RESOURCE_QUOTA, that quota is used instead of a fresh anonymous
   one. The returned endpoint is released via its destroy vtable entry. */
grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
                               grpc_channel_args* channel_args,
                               const char* peer_string) {
  grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
  if (channel_args != NULL) {
    for (size_t i = 0; i < channel_args->num_args; i++) {
      if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
        /* Replace the default quota with the caller-supplied one. */
        grpc_resource_quota_unref_internal(resource_quota);
        resource_quota = grpc_resource_quota_ref_internal(
            (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
      }
    }
  }
  grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
  memset(tcp, 0, sizeof(grpc_tcp));
  tcp->base.vtable = &vtable;
  tcp->socket = socket;
  gpr_mu_init(&tcp->mu);
  /* The initial reference is owned by the caller; win_destroy drops it. */
  gpr_ref_init(&tcp->refcount, 1);
  GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
  tcp->peer_string = gpr_strdup(peer_string);
  grpc_slice_buffer_init(&tcp->last_read_buffer);
  tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
  /* The resource user holds its own quota reference; drop ours. */
  grpc_resource_quota_unref_internal(resource_quota);

  return &tcp->base;
}
532 
533 #endif /* GRPC_WINSOCK_SOCKET */
534