• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <winsock2.h>
26 #include <limits>
27 
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/log_windows.h>
31 
32 #include "src/core/lib/debug/stats.h"
33 #include "src/core/lib/gprpp/thd.h"
34 #include "src/core/lib/iomgr/iocp_windows.h"
35 #include "src/core/lib/iomgr/iomgr_internal.h"
36 #include "src/core/lib/iomgr/socket_windows.h"
37 #include "src/core/lib/iomgr/timer.h"
38 
/* The address of this variable is the completion key posted by
 * grpc_iocp_kick() and recognised by grpc_iocp_work(); its value is not read
 * anywhere in this file. */
39 static ULONG g_iocp_kick_token;
/* OVERLAPPED attached to packets posted by grpc_iocp_kick(); grpc_iocp_work()
 * compares against its address to tell custom packets from socket I/O. */
40 static OVERLAPPED g_iocp_custom_overlap;
41 
/* Count of custom packets posted but not yet dequeued: incremented in
 * grpc_iocp_kick(), decremented in grpc_iocp_work(), and drained to zero by
 * grpc_iocp_shutdown() before the port is closed. */
42 static gpr_atm g_custom_events = 0;
43 
/* The I/O completion port handle created by grpc_iocp_init(). */
44 static HANDLE g_iocp;
45 
deadline_to_millis_timeout(grpc_millis deadline)46 static DWORD deadline_to_millis_timeout(grpc_millis deadline) {
47   if (deadline == GRPC_MILLIS_INF_FUTURE) {
48     return INFINITE;
49   }
50   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
51   if (deadline < now) return 0;
52   grpc_millis timeout = deadline - now;
53   if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE;
54   return static_cast<DWORD>(deadline - now);
55 }
56 
grpc_iocp_work(grpc_millis deadline)57 grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
58   BOOL success;
59   DWORD bytes = 0;
60   DWORD flags = 0;
61   ULONG_PTR completion_key;
62   LPOVERLAPPED overlapped;
63   grpc_winsocket* socket;
64   grpc_winsocket_callback_info* info;
65   GRPC_STATS_INC_SYSCALL_POLL();
66   success =
67       GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
68                                 deadline_to_millis_timeout(deadline));
69   grpc_core::ExecCtx::Get()->InvalidateNow();
70   if (success == 0 && overlapped == NULL) {
71     return GRPC_IOCP_WORK_TIMEOUT;
72   }
73   GPR_ASSERT(completion_key && overlapped);
74   if (overlapped == &g_iocp_custom_overlap) {
75     gpr_atm_full_fetch_add(&g_custom_events, -1);
76     if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
77       /* We were awoken from a kick. */
78       return GRPC_IOCP_WORK_KICK;
79     }
80     gpr_log(GPR_ERROR, "Unknown custom completion key.");
81     abort();
82   }
83 
84   socket = (grpc_winsocket*)completion_key;
85   if (overlapped == &socket->write_info.overlapped) {
86     info = &socket->write_info;
87   } else if (overlapped == &socket->read_info.overlapped) {
88     info = &socket->read_info;
89   } else {
90     abort();
91   }
92   if (socket->shutdown_called) {
93     info->bytes_transferred = 0;
94     info->wsa_error = WSA_OPERATION_ABORTED;
95   } else {
96     success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
97                                      FALSE, &flags);
98     info->bytes_transferred = bytes;
99     info->wsa_error = success ? 0 : WSAGetLastError();
100   }
101   GPR_ASSERT(overlapped == &info->overlapped);
102   grpc_socket_become_ready(socket, info);
103   return GRPC_IOCP_WORK_WORK;
104 }
105 
grpc_iocp_init(void)106 void grpc_iocp_init(void) {
107   g_iocp =
108       CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
109   GPR_ASSERT(g_iocp);
110 }
111 
grpc_iocp_kick(void)112 void grpc_iocp_kick(void) {
113   BOOL success;
114 
115   gpr_atm_full_fetch_add(&g_custom_events, 1);
116   success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
117                                        &g_iocp_custom_overlap);
118   GPR_ASSERT(success);
119 }
120 
grpc_iocp_flush(void)121 void grpc_iocp_flush(void) {
122   grpc_core::ExecCtx exec_ctx;
123   grpc_iocp_work_status work_status;
124 
125   do {
126     work_status = grpc_iocp_work(GRPC_MILLIS_INF_PAST);
127   } while (work_status == GRPC_IOCP_WORK_KICK ||
128            grpc_core::ExecCtx::Get()->Flush());
129 }
130 
grpc_iocp_shutdown(void)131 void grpc_iocp_shutdown(void) {
132   grpc_core::ExecCtx exec_ctx;
133   while (gpr_atm_acq_load(&g_custom_events)) {
134     grpc_iocp_work(GRPC_MILLIS_INF_FUTURE);
135     grpc_core::ExecCtx::Get()->Flush();
136   }
137 
138   GPR_ASSERT(CloseHandle(g_iocp));
139 }
140 
grpc_iocp_add_socket(grpc_winsocket * socket)141 void grpc_iocp_add_socket(grpc_winsocket* socket) {
142   HANDLE ret;
143   if (socket->added_to_iocp) return;
144   ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
145                                (uintptr_t)socket, 0);
146   if (!ret) {
147     char* utf8_message = gpr_format_message(WSAGetLastError());
148     gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
149     gpr_free(utf8_message);
150     __debugbreak();
151     abort();
152   }
153   socket->added_to_iocp = 1;
154   GPR_ASSERT(ret == g_iocp);
155 }
156 
157 #endif /* GRPC_WINSOCK_SOCKET */
158