1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log_windows.h>
27 #include <winsock2.h>
28
29 #include <limits>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "src/core/lib/iomgr/iocp_windows.h"
34 #include "src/core/lib/iomgr/iomgr_internal.h"
35 #include "src/core/lib/iomgr/socket_windows.h"
36 #include "src/core/lib/iomgr/timer.h"
37 #include "src/core/telemetry/stats.h"
38 #include "src/core/telemetry/stats_data.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/thd.h"
41
42 static ULONG g_iocp_kick_token;
43 static OVERLAPPED g_iocp_custom_overlap;
44
45 static gpr_atm g_custom_events = 0;
46 static gpr_atm g_pending_socket_shutdowns = 0;
47
48 static HANDLE g_iocp;
49
deadline_to_millis_timeout(grpc_core::Timestamp deadline)50 static DWORD deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
51 if (deadline == grpc_core::Timestamp::InfFuture()) {
52 return INFINITE;
53 }
54 grpc_core::Timestamp now = grpc_core::Timestamp::Now();
55 if (deadline < now) return 0;
56 grpc_core::Duration timeout = deadline - now;
57 if (timeout.millis() > std::numeric_limits<DWORD>::max()) return INFINITE;
58 return static_cast<DWORD>(timeout.millis());
59 }
60
grpc_iocp_work(grpc_core::Timestamp deadline)61 grpc_iocp_work_status grpc_iocp_work(grpc_core::Timestamp deadline) {
62 BOOL success;
63 DWORD bytes = 0;
64 DWORD flags = 0;
65 ULONG_PTR completion_key;
66 LPOVERLAPPED overlapped;
67 grpc_winsocket* socket;
68 grpc_winsocket_callback_info* info;
69 success =
70 GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
71 deadline_to_millis_timeout(deadline));
72 grpc_core::ExecCtx::Get()->InvalidateNow();
73 if (success == 0 && overlapped == NULL) {
74 return GRPC_IOCP_WORK_TIMEOUT;
75 }
76 CHECK(completion_key);
77 CHECK(overlapped);
78 if (overlapped == &g_iocp_custom_overlap) {
79 gpr_atm_full_fetch_add(&g_custom_events, -1);
80 if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
81 // We were awoken from a kick.
82 return GRPC_IOCP_WORK_KICK;
83 }
84 grpc_core::Crash("Unknown custom completion key.");
85 }
86
87 socket = (grpc_winsocket*)completion_key;
88 if (overlapped == &socket->write_info.overlapped) {
89 info = &socket->write_info;
90 } else if (overlapped == &socket->read_info.overlapped) {
91 info = &socket->read_info;
92 } else {
93 abort();
94 }
95 gpr_mu_lock(&socket->state_mu);
96 if (socket->shutdown_called) {
97 info->bytes_transferred = 0;
98 info->wsa_error = WSA_OPERATION_ABORTED;
99 } else {
100 success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
101 FALSE, &flags);
102 info->bytes_transferred = bytes;
103 info->wsa_error = success ? 0 : WSAGetLastError();
104 }
105 CHECK(overlapped == &info->overlapped);
106 bool should_destroy = grpc_socket_become_ready(socket, info);
107 gpr_mu_unlock(&socket->state_mu);
108 if (should_destroy) {
109 grpc_winsocket_finish(socket);
110 }
111 return GRPC_IOCP_WORK_WORK;
112 }
113
grpc_iocp_init(void)114 void grpc_iocp_init(void) {
115 g_iocp =
116 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
117 CHECK(g_iocp);
118 }
119
grpc_iocp_kick(void)120 void grpc_iocp_kick(void) {
121 BOOL success;
122
123 gpr_atm_full_fetch_add(&g_custom_events, 1);
124 success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
125 &g_iocp_custom_overlap);
126 CHECK(success);
127 }
128
grpc_iocp_flush(void)129 void grpc_iocp_flush(void) {
130 grpc_core::ExecCtx exec_ctx;
131 grpc_iocp_work_status work_status;
132 // This method is called during grpc_shutdown. We make the loop
133 // spin until any pending socket shutdowns are complete.
134 do {
135 work_status = grpc_iocp_work(grpc_core::Timestamp::InfPast());
136 } while (work_status == GRPC_IOCP_WORK_KICK ||
137 grpc_core::ExecCtx::Get()->Flush() ||
138 gpr_atm_acq_load(&g_pending_socket_shutdowns) != 0);
139 }
140
grpc_iocp_shutdown(void)141 void grpc_iocp_shutdown(void) {
142 grpc_core::ExecCtx exec_ctx;
143 while (gpr_atm_acq_load(&g_custom_events)) {
144 grpc_iocp_work(grpc_core::Timestamp::InfFuture());
145 grpc_core::ExecCtx::Get()->Flush();
146 }
147
148 CHECK(CloseHandle(g_iocp));
149 }
150
grpc_iocp_add_socket(grpc_winsocket * socket)151 void grpc_iocp_add_socket(grpc_winsocket* socket) {
152 HANDLE ret;
153 if (socket->added_to_iocp) return;
154 ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
155 (uintptr_t)socket, 0);
156 if (!ret) {
157 char* utf8_message = gpr_format_message(WSAGetLastError());
158 LOG(ERROR) << "Unable to add socket to iocp: " << utf8_message;
159 gpr_free(utf8_message);
160 __debugbreak();
161 abort();
162 }
163 socket->added_to_iocp = 1;
164 CHECK(ret == g_iocp);
165 }
166
grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket * socket)167 void grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket* socket) {
168 if (!socket->shutdown_registered) {
169 socket->shutdown_registered = true;
170 gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, 1);
171 }
172 }
173
grpc_iocp_finish_socket_shutdown(grpc_winsocket * socket)174 void grpc_iocp_finish_socket_shutdown(grpc_winsocket* socket) {
175 if (socket->shutdown_registered) {
176 gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, -1);
177 }
178 }
179
180 #endif // GRPC_WINSOCK_SOCKET
181