• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log_windows.h>
27 #include <winsock2.h>
28 
29 #include <limits>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "src/core/lib/iomgr/iocp_windows.h"
34 #include "src/core/lib/iomgr/iomgr_internal.h"
35 #include "src/core/lib/iomgr/socket_windows.h"
36 #include "src/core/lib/iomgr/timer.h"
37 #include "src/core/telemetry/stats.h"
38 #include "src/core/telemetry/stats_data.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/thd.h"
41 
42 static ULONG g_iocp_kick_token;
43 static OVERLAPPED g_iocp_custom_overlap;
44 
45 static gpr_atm g_custom_events = 0;
46 static gpr_atm g_pending_socket_shutdowns = 0;
47 
48 static HANDLE g_iocp;
49 
deadline_to_millis_timeout(grpc_core::Timestamp deadline)50 static DWORD deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
51   if (deadline == grpc_core::Timestamp::InfFuture()) {
52     return INFINITE;
53   }
54   grpc_core::Timestamp now = grpc_core::Timestamp::Now();
55   if (deadline < now) return 0;
56   grpc_core::Duration timeout = deadline - now;
57   if (timeout.millis() > std::numeric_limits<DWORD>::max()) return INFINITE;
58   return static_cast<DWORD>(timeout.millis());
59 }
60 
grpc_iocp_work(grpc_core::Timestamp deadline)61 grpc_iocp_work_status grpc_iocp_work(grpc_core::Timestamp deadline) {
62   BOOL success;
63   DWORD bytes = 0;
64   DWORD flags = 0;
65   ULONG_PTR completion_key;
66   LPOVERLAPPED overlapped;
67   grpc_winsocket* socket;
68   grpc_winsocket_callback_info* info;
69   success =
70       GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
71                                 deadline_to_millis_timeout(deadline));
72   grpc_core::ExecCtx::Get()->InvalidateNow();
73   if (success == 0 && overlapped == NULL) {
74     return GRPC_IOCP_WORK_TIMEOUT;
75   }
76   CHECK(completion_key);
77   CHECK(overlapped);
78   if (overlapped == &g_iocp_custom_overlap) {
79     gpr_atm_full_fetch_add(&g_custom_events, -1);
80     if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
81       // We were awoken from a kick.
82       return GRPC_IOCP_WORK_KICK;
83     }
84     grpc_core::Crash("Unknown custom completion key.");
85   }
86 
87   socket = (grpc_winsocket*)completion_key;
88   if (overlapped == &socket->write_info.overlapped) {
89     info = &socket->write_info;
90   } else if (overlapped == &socket->read_info.overlapped) {
91     info = &socket->read_info;
92   } else {
93     abort();
94   }
95   gpr_mu_lock(&socket->state_mu);
96   if (socket->shutdown_called) {
97     info->bytes_transferred = 0;
98     info->wsa_error = WSA_OPERATION_ABORTED;
99   } else {
100     success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
101                                      FALSE, &flags);
102     info->bytes_transferred = bytes;
103     info->wsa_error = success ? 0 : WSAGetLastError();
104   }
105   CHECK(overlapped == &info->overlapped);
106   bool should_destroy = grpc_socket_become_ready(socket, info);
107   gpr_mu_unlock(&socket->state_mu);
108   if (should_destroy) {
109     grpc_winsocket_finish(socket);
110   }
111   return GRPC_IOCP_WORK_WORK;
112 }
113 
grpc_iocp_init(void)114 void grpc_iocp_init(void) {
115   g_iocp =
116       CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
117   CHECK(g_iocp);
118 }
119 
grpc_iocp_kick(void)120 void grpc_iocp_kick(void) {
121   BOOL success;
122 
123   gpr_atm_full_fetch_add(&g_custom_events, 1);
124   success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
125                                        &g_iocp_custom_overlap);
126   CHECK(success);
127 }
128 
grpc_iocp_flush(void)129 void grpc_iocp_flush(void) {
130   grpc_core::ExecCtx exec_ctx;
131   grpc_iocp_work_status work_status;
132   // This method is called during grpc_shutdown. We make the loop
133   // spin until any pending socket shutdowns are complete.
134   do {
135     work_status = grpc_iocp_work(grpc_core::Timestamp::InfPast());
136   } while (work_status == GRPC_IOCP_WORK_KICK ||
137            grpc_core::ExecCtx::Get()->Flush() ||
138            gpr_atm_acq_load(&g_pending_socket_shutdowns) != 0);
139 }
140 
grpc_iocp_shutdown(void)141 void grpc_iocp_shutdown(void) {
142   grpc_core::ExecCtx exec_ctx;
143   while (gpr_atm_acq_load(&g_custom_events)) {
144     grpc_iocp_work(grpc_core::Timestamp::InfFuture());
145     grpc_core::ExecCtx::Get()->Flush();
146   }
147 
148   CHECK(CloseHandle(g_iocp));
149 }
150 
grpc_iocp_add_socket(grpc_winsocket * socket)151 void grpc_iocp_add_socket(grpc_winsocket* socket) {
152   HANDLE ret;
153   if (socket->added_to_iocp) return;
154   ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
155                                (uintptr_t)socket, 0);
156   if (!ret) {
157     char* utf8_message = gpr_format_message(WSAGetLastError());
158     LOG(ERROR) << "Unable to add socket to iocp: " << utf8_message;
159     gpr_free(utf8_message);
160     __debugbreak();
161     abort();
162   }
163   socket->added_to_iocp = 1;
164   CHECK(ret == g_iocp);
165 }
166 
grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket * socket)167 void grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket* socket) {
168   if (!socket->shutdown_registered) {
169     socket->shutdown_registered = true;
170     gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, 1);
171   }
172 }
173 
grpc_iocp_finish_socket_shutdown(grpc_winsocket * socket)174 void grpc_iocp_finish_socket_shutdown(grpc_winsocket* socket) {
175   if (socket->shutdown_registered) {
176     gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, -1);
177   }
178 }
179 
180 #endif  // GRPC_WINSOCK_SOCKET
181