• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <limits.h>
26 
27 #include "src/core/lib/iomgr/sockaddr_windows.h"
28 
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34 
35 #include "src/core/lib/gpr/useful.h"
36 #include "src/core/lib/iomgr/iocp_windows.h"
37 #include "src/core/lib/iomgr/sockaddr.h"
38 #include "src/core/lib/iomgr/sockaddr_utils.h"
39 #include "src/core/lib/iomgr/socket_windows.h"
40 #include "src/core/lib/iomgr/tcp_client.h"
41 #include "src/core/lib/iomgr/tcp_windows.h"
42 #include "src/core/lib/iomgr/timer.h"
43 #include "src/core/lib/slice/slice_internal.h"
44 #include "src/core/lib/slice/slice_string_helpers.h"
45 
46 #if defined(__MSYS__) && defined(GPR_ARCH_64)
47 /* Nasty workaround for nasty bug when using the 64 bits msys compiler
48    in conjunction with Microsoft Windows headers. */
49 #define GRPC_FIONBIO _IOW('f', 126, uint32_t)
50 #else
51 #define GRPC_FIONBIO FIONBIO
52 #endif
53 
54 extern grpc_core::TraceFlag grpc_tcp_trace;
55 
grpc_tcp_set_non_block(SOCKET sock)56 grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
57   int status;
58   uint32_t param = 1;
59   DWORD ret;
60   status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
61                     NULL, NULL);
62   return status == 0
63              ? GRPC_ERROR_NONE
64              : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
65 }
66 
set_dualstack(SOCKET sock)67 static grpc_error* set_dualstack(SOCKET sock) {
68   int status;
69   unsigned long param = 0;
70   status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&param,
71                       sizeof(param));
72   return status == 0
73              ? GRPC_ERROR_NONE
74              : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
75 }
76 
enable_socket_low_latency(SOCKET sock)77 static grpc_error* enable_socket_low_latency(SOCKET sock) {
78   int status;
79   BOOL param = TRUE;
80   status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
81                         reinterpret_cast<char*>(&param), sizeof(param));
82   if (status == SOCKET_ERROR) {
83     status = WSAGetLastError();
84   }
85   return status == 0 ? GRPC_ERROR_NONE
86                      : GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)");
87 }
88 
grpc_tcp_prepare_socket(SOCKET sock)89 grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
90   grpc_error* err;
91   err = grpc_tcp_set_non_block(sock);
92   if (err != GRPC_ERROR_NONE) return err;
93   err = set_dualstack(sock);
94   if (err != GRPC_ERROR_NONE) return err;
95   err = enable_socket_low_latency(sock);
96   if (err != GRPC_ERROR_NONE) return err;
97   return GRPC_ERROR_NONE;
98 }
99 
/* A Windows TCP endpoint built on a winsock socket driven by the IOCP. */
typedef struct grpc_tcp {
  /* This is our C++ class derivation emulation: base must stay the first
     member so a grpc_tcp* is usable as a grpc_endpoint*. */
  grpc_endpoint base;
  /* The one socket this endpoint is using. */
  grpc_winsocket* socket;
  /* Refcounting how many operations are in progress. */
  gpr_refcount refcount;

  /* Fixed closures the IOCP (or background thread) runs when an overlapped
     read/write completes. */
  grpc_closure on_read;
  grpc_closure on_write;

  /* The caller's callbacks for the currently pending read/write. */
  grpc_closure* read_cb;
  grpc_closure* write_cb;

  /* garbage after the last read */
  grpc_slice_buffer last_read_buffer;

  /* Caller-owned buffers, borrowed for the duration of the operation. */
  grpc_slice_buffer* write_slices;
  grpc_slice_buffer* read_slices;

  grpc_resource_user* resource_user;

  /* The IO Completion Port runs from another thread. We need some mechanism
     to protect ourselves when requesting a shutdown. */
  gpr_mu mu;
  int shutting_down;
  /* Owned error; only set (and later unreffed) when shutting_down is true. */
  grpc_error* shutdown_error;

  std::string peer_string;
  std::string local_address;
} grpc_tcp;
131 
/* Releases everything the endpoint owns.  Runs exactly once, when the last
   reference is dropped; no operation may be in flight at that point. */
static void tcp_free(grpc_tcp* tcp) {
  grpc_winsocket_destroy(tcp->socket);
  gpr_mu_destroy(&tcp->mu);
  grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
  grpc_resource_user_unref(tcp->resource_user);
  /* shutdown_error is only owned once win_shutdown set shutting_down. */
  if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
  delete tcp;
}
140 
#ifndef NDEBUG
/* Debug builds log every ref/unref transition (with call site) when the
   tcp tracer is enabled. */
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
                      int line) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
            val - 1);
  }
  /* Free the endpoint once the last reference is gone. */
  if (gpr_unref(&tcp->refcount)) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
                    int line) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "TCP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
            val + 1);
  }
  gpr_ref(&tcp->refcount);
}
#else
/* Release builds: plain refcounting, no logging. */
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp* tcp) {
  if (gpr_unref(&tcp->refcount)) {
    tcp_free(tcp);
  }
}

static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
#endif
178 
/* Asynchronous callback from the IOCP, or the background thread.  Runs when
   a pending WSARecv completes (successfully or not) and delivers the result
   to the callback stored by win_read. */
static void on_read(void* tcpp, grpc_error* error) {
  grpc_tcp* tcp = (grpc_tcp*)tcpp;
  grpc_closure* cb = tcp->read_cb;
  grpc_winsocket* socket = tcp->socket;
  grpc_winsocket_callback_info* info = &socket->read_info;

  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_INFO, "TCP:%p on_read", tcp);
  }

  /* Take a ref on the incoming error; ownership is handed to the callback. */
  GRPC_ERROR_REF(error);

  if (error == GRPC_ERROR_NONE) {
    if (info->wsa_error != 0 && !tcp->shutting_down) {
      /* The overlapped read itself failed: surface the WSA error and drop
         anything staged in the caller's read buffer. */
      char* utf8_message = gpr_format_message(info->wsa_error);
      error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message);
      gpr_free(utf8_message);
      grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
    } else {
      if (info->bytes_transferred != 0 && !tcp->shutting_down) {
        GPR_ASSERT((size_t)info->bytes_transferred <= tcp->read_slices->length);
        /* Partial read: move the unused tail of the staged slices into
           last_read_buffer so the next win_read can reuse it. */
        if (static_cast<size_t>(info->bytes_transferred) !=
            tcp->read_slices->length) {
          grpc_slice_buffer_trim_end(
              tcp->read_slices,
              tcp->read_slices->length -
                  static_cast<size_t>(info->bytes_transferred),
              &tcp->last_read_buffer);
        }
        GPR_ASSERT((size_t)info->bytes_transferred == tcp->read_slices->length);

        if (grpc_tcp_trace.enabled()) {
          size_t i;
          for (i = 0; i < tcp->read_slices->count; i++) {
            char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
                                         GPR_DUMP_HEX | GPR_DUMP_ASCII);
            gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp,
                    tcp->peer_string.c_str(), dump);
            gpr_free(dump);
          }
        }
      } else {
        /* Zero bytes transferred: either the peer closed the connection or
           we are shutting down locally. */
        if (grpc_tcp_trace.enabled()) {
          gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp);
        }
        grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
        error = tcp->shutting_down
                    ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                          "TCP stream shutting down", &tcp->shutdown_error, 1)
                    : GRPC_ERROR_CREATE_FROM_STATIC_STRING("End of TCP stream");
      }
    }
  }

  tcp->read_cb = NULL;
  /* Drop the ref taken by win_read, then run the user callback. */
  TCP_UNREF(tcp, "read");
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
238 
#define DEFAULT_TARGET_READ_SIZE 8192
#define MAX_WSABUF_COUNT 16

/* Endpoint read implementation.  Attempts a synchronous non-blocking WSARecv
   first; if no data is available yet (WSAEWOULDBLOCK), re-issues the receive
   as an overlapped operation and lets the IOCP run on_read when it finishes.
   read_slices receives the data; cb is invoked with the final status. */
static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
                     grpc_closure* cb, bool urgent) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_winsocket* handle = tcp->socket;
  grpc_winsocket_callback_info* info = &handle->read_info;
  int status;
  DWORD bytes_read = 0;
  DWORD flags = 0;
  WSABUF buffers[MAX_WSABUF_COUNT];
  size_t i;

  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_INFO, "TCP:%p win_read", tcp);
  }

  /* Refuse new reads once shutdown has started. */
  if (tcp->shutting_down) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "TCP socket is shutting down", &tcp->shutdown_error, 1));
    return;
  }

  tcp->read_cb = cb;
  tcp->read_slices = read_slices;
  grpc_slice_buffer_reset_and_unref_internal(read_slices);
  /* Start from whatever was left over after the previous read. */
  grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer);

  if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 &&
      tcp->read_slices->count < MAX_WSABUF_COUNT) {
    // TODO(jtattermusch): slice should be allocated using resource quota
    grpc_slice_buffer_add(tcp->read_slices,
                          GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE));
  }

  /* Map each staged slice onto a WSABUF for the scatter receive. */
  GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT);
  for (i = 0; i < tcp->read_slices->count; i++) {
    buffers[i].len = (ULONG)GRPC_SLICE_LENGTH(
        tcp->read_slices->slices[i]);  // we know slice size fits in 32bit.
    buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]);
  }

  /* Held until on_read runs. */
  TCP_REF(tcp, "read");

  /* First let's try a synchronous, non-blocking read. */
  status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
                   &bytes_read, &flags, NULL, NULL);
  info->wsa_error = status == 0 ? 0 : WSAGetLastError();

  /* Did we get data immediately ? Yay. */
  if (info->wsa_error != WSAEWOULDBLOCK) {
    info->bytes_transferred = bytes_read;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE);
    return;
  }

  /* Otherwise, let's retry, by queuing a read. */
  memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
  status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count,
                   &bytes_read, &flags, &info->overlapped, NULL);

  if (status != 0) {
    int wsa_error = WSAGetLastError();
    /* WSA_IO_PENDING just means the overlapped op was queued successfully;
       anything else is an immediate failure. */
    if (wsa_error != WSA_IO_PENDING) {
      info->wsa_error = wsa_error;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
                              GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
      return;
    }
  }

  grpc_socket_notify_on_read(tcp->socket, &tcp->on_read);
}
314 
/* Asynchronous callback from the IOCP, or the background thread.  Runs when
   a pending WSASend completes and delivers the result to the callback stored
   by win_write. */
static void on_write(void* tcpp, grpc_error* error) {
  grpc_tcp* tcp = (grpc_tcp*)tcpp;
  grpc_winsocket* handle = tcp->socket;
  grpc_winsocket_callback_info* info = &handle->write_info;
  grpc_closure* cb;

  if (grpc_tcp_trace.enabled()) {
    gpr_log(GPR_INFO, "TCP:%p on_write", tcp);
  }

  GRPC_ERROR_REF(error);

  /* Grab and clear write_cb under the lock; win_shutdown may be racing with
     this completion from another thread. */
  gpr_mu_lock(&tcp->mu);
  cb = tcp->write_cb;
  tcp->write_cb = NULL;
  gpr_mu_unlock(&tcp->mu);

  if (error == GRPC_ERROR_NONE) {
    if (info->wsa_error != 0) {
      error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
    } else {
      /* A successful overlapped send must have written everything. */
      GPR_ASSERT(info->bytes_transferred == tcp->write_slices->length);
    }
  }

  /* Drop the ref taken by win_write, then run the user callback. */
  TCP_UNREF(tcp, "write");
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
344 
/* Initiates a write.  Attempts a synchronous non-blocking WSASend first; if
   the send queue is full (WSAEWOULDBLOCK), re-issues the send as an
   overlapped operation and lets the IOCP run on_write when it finishes. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                      grpc_closure* cb, void* arg) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_winsocket* socket = tcp->socket;
  grpc_winsocket_callback_info* info = &socket->write_info;
  unsigned i;
  DWORD bytes_sent;
  int status;
  WSABUF local_buffers[MAX_WSABUF_COUNT];
  WSABUF* allocated = NULL;
  WSABUF* buffers = local_buffers;
  size_t len;

  if (grpc_tcp_trace.enabled()) {
    size_t i;
    for (i = 0; i < slices->count; i++) {
      char* data =
          grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
      gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string.c_str(),
              data);
      gpr_free(data);
    }
  }

  /* Refuse new writes once shutdown has started. */
  if (tcp->shutting_down) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "TCP socket is shutting down", &tcp->shutdown_error, 1));
    return;
  }

  tcp->write_cb = cb;
  tcp->write_slices = slices;
  GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
  /* Fall back to a heap-allocated WSABUF array when the slice count exceeds
     the stack-local capacity. */
  if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
    buffers = (WSABUF*)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
    allocated = buffers;
  }

  for (i = 0; i < tcp->write_slices->count; i++) {
    len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]);
    GPR_ASSERT(len <= ULONG_MAX);
    buffers[i].len = (ULONG)len;
    buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]);
  }

  /* First, let's try a synchronous, non-blocking write. */
  status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
                   &bytes_sent, 0, NULL, NULL);
  info->wsa_error = status == 0 ? 0 : WSAGetLastError();

  /* We would kind of expect to get a WSAEWOULDBLOCK here, especially on a busy
     connection that has its send queue filled up. But if we don't, then we can
     avoid doing an async write operation at all. */
  if (info->wsa_error != WSAEWOULDBLOCK) {
    grpc_error* error = status == 0
                            ? GRPC_ERROR_NONE
                            : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
    if (allocated) gpr_free(allocated);
    return;
  }

  /* Held until on_write runs. */
  TCP_REF(tcp, "write");

  /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
     operation, this time asynchronously. */
  memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
  status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count,
                   &bytes_sent, 0, &socket->write_info.overlapped, NULL);
  if (allocated) gpr_free(allocated);

  if (status != 0) {
    int wsa_error = WSAGetLastError();
    /* WSA_IO_PENDING just means the overlapped op was queued successfully;
       anything else is an immediate failure. */
    if (wsa_error != WSA_IO_PENDING) {
      TCP_UNREF(tcp, "write");
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
                              GRPC_WSA_ERROR(wsa_error, "WSASend"));
      return;
    }
  }

  /* As all is now setup, we can now ask for the IOCP notification. It may
     trigger the callback immediately however, but no matter. */
  grpc_socket_notify_on_write(socket, &tcp->on_write);
}
433 
win_add_to_pollset(grpc_endpoint * ep,grpc_pollset * ps)434 static void win_add_to_pollset(grpc_endpoint* ep, grpc_pollset* ps) {
435   grpc_tcp* tcp;
436   (void)ps;
437   tcp = (grpc_tcp*)ep;
438   grpc_iocp_add_socket(tcp->socket);
439 }
440 
win_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pss)441 static void win_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pss) {
442   grpc_tcp* tcp;
443   (void)pss;
444   tcp = (grpc_tcp*)ep;
445   grpc_iocp_add_socket(tcp->socket);
446 }
447 
/* No-op: sockets are associated with the IOCP once and never removed. */
static void win_delete_from_pollset_set(grpc_endpoint* ep,
                                        grpc_pollset_set* pss) {}
450 
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
   for the potential read and write operations. It is up to the caller to
   guarantee this isn't called in parallel to a read or write request, so
   we're not going to protect against these. However the IO Completion Port
   callback will happen from another thread, so we need to protect against
   concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint* ep, grpc_error* why) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  gpr_mu_lock(&tcp->mu);
  /* At that point, what may happen is that we're already inside the IOCP
     callback. See the comments in on_read and on_write. */
  if (!tcp->shutting_down) {
    /* First shutdown wins; we take ownership of `why`. */
    tcp->shutting_down = 1;
    tcp->shutdown_error = why;
  } else {
    /* Already shutting down: release the redundant error. */
    GRPC_ERROR_UNREF(why);
  }
  grpc_winsocket_shutdown(tcp->socket);
  gpr_mu_unlock(&tcp->mu);
  grpc_resource_user_shutdown(tcp->resource_user);
}
472 
/* Drops the owner's reference.  Leftover read data is released first; the
   endpoint itself is freed by tcp_free once all in-flight operations have
   also unreffed. */
static void win_destroy(grpc_endpoint* ep) {
  grpc_tcp* tcp = (grpc_tcp*)ep;
  grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
  TCP_UNREF(tcp, "destroy");
}
478 
win_get_peer(grpc_endpoint * ep)479 static absl::string_view win_get_peer(grpc_endpoint* ep) {
480   grpc_tcp* tcp = (grpc_tcp*)ep;
481   return tcp->peer_string;
482 }
483 
win_get_local_address(grpc_endpoint * ep)484 static absl::string_view win_get_local_address(grpc_endpoint* ep) {
485   grpc_tcp* tcp = (grpc_tcp*)ep;
486   return tcp->local_address;
487 }
488 
win_get_resource_user(grpc_endpoint * ep)489 static grpc_resource_user* win_get_resource_user(grpc_endpoint* ep) {
490   grpc_tcp* tcp = (grpc_tcp*)ep;
491   return tcp->resource_user;
492 }
493 
win_get_fd(grpc_endpoint * ep)494 static int win_get_fd(grpc_endpoint* ep) { return -1; }
495 
win_can_track_err(grpc_endpoint * ep)496 static bool win_can_track_err(grpc_endpoint* ep) { return false; }
497 
/* grpc_endpoint vtable wiring the win_* implementations above; the entry
   order is dictated by grpc_endpoint_vtable. */
static grpc_endpoint_vtable vtable = {win_read,
                                      win_write,
                                      win_add_to_pollset,
                                      win_add_to_pollset_set,
                                      win_delete_from_pollset_set,
                                      win_shutdown,
                                      win_destroy,
                                      win_get_resource_user,
                                      win_get_peer,
                                      win_get_local_address,
                                      win_get_fd,
                                      win_can_track_err};
510 
grpc_tcp_create(grpc_winsocket * socket,grpc_channel_args * channel_args,const char * peer_string)511 grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
512                                grpc_channel_args* channel_args,
513                                const char* peer_string) {
514   grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
515   if (channel_args != NULL) {
516     for (size_t i = 0; i < channel_args->num_args; i++) {
517       if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
518         grpc_resource_quota_unref_internal(resource_quota);
519         resource_quota = grpc_resource_quota_ref_internal(
520             (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
521       }
522     }
523   }
524   grpc_tcp* tcp = new grpc_tcp;
525   memset(tcp, 0, sizeof(grpc_tcp));
526   tcp->base.vtable = &vtable;
527   tcp->socket = socket;
528   gpr_mu_init(&tcp->mu);
529   gpr_ref_init(&tcp->refcount, 1);
530   GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
531   GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
532   grpc_resolved_address resolved_local_addr;
533   resolved_local_addr.len = sizeof(resolved_local_addr.addr);
534   if (getsockname(tcp->socket->socket,
535                   reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
536                   &resolved_local_addr.len) < 0) {
537     tcp->local_address = "";
538   } else {
539     tcp->local_address = grpc_sockaddr_to_uri(&resolved_local_addr);
540   }
541   tcp->peer_string = peer_string;
542   grpc_slice_buffer_init(&tcp->last_read_buffer);
543   tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
544   grpc_resource_quota_unref_internal(resource_quota);
545 
546   return &tcp->base;
547 }
548 
549 #endif /* GRPC_WINSOCK_SOCKET */
550