• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <winsock2.h>
26 
27 #include <limits>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/log_windows.h>
32 
33 #include "src/core/lib/debug/stats.h"
34 #include "src/core/lib/debug/stats_data.h"
35 #include "src/core/lib/gprpp/crash.h"
36 #include "src/core/lib/gprpp/thd.h"
37 #include "src/core/lib/iomgr/iocp_windows.h"
38 #include "src/core/lib/iomgr/iomgr_internal.h"
39 #include "src/core/lib/iomgr/socket_windows.h"
40 #include "src/core/lib/iomgr/timer.h"
41 
42 static ULONG g_iocp_kick_token;
43 static OVERLAPPED g_iocp_custom_overlap;
44 
45 static gpr_atm g_custom_events = 0;
46 static gpr_atm g_pending_socket_shutdowns = 0;
47 
48 static HANDLE g_iocp;
49 
deadline_to_millis_timeout(grpc_core::Timestamp deadline)50 static DWORD deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
51   if (deadline == grpc_core::Timestamp::InfFuture()) {
52     return INFINITE;
53   }
54   grpc_core::Timestamp now = grpc_core::Timestamp::Now();
55   if (deadline < now) return 0;
56   grpc_core::Duration timeout = deadline - now;
57   if (timeout.millis() > std::numeric_limits<DWORD>::max()) return INFINITE;
58   return static_cast<DWORD>(timeout.millis());
59 }
60 
grpc_iocp_work(grpc_core::Timestamp deadline)61 grpc_iocp_work_status grpc_iocp_work(grpc_core::Timestamp deadline) {
62   BOOL success;
63   DWORD bytes = 0;
64   DWORD flags = 0;
65   ULONG_PTR completion_key;
66   LPOVERLAPPED overlapped;
67   grpc_winsocket* socket;
68   grpc_winsocket_callback_info* info;
69   success =
70       GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
71                                 deadline_to_millis_timeout(deadline));
72   grpc_core::ExecCtx::Get()->InvalidateNow();
73   if (success == 0 && overlapped == NULL) {
74     return GRPC_IOCP_WORK_TIMEOUT;
75   }
76   GPR_ASSERT(completion_key && overlapped);
77   if (overlapped == &g_iocp_custom_overlap) {
78     gpr_atm_full_fetch_add(&g_custom_events, -1);
79     if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
80       // We were awoken from a kick.
81       return GRPC_IOCP_WORK_KICK;
82     }
83     grpc_core::Crash("Unknown custom completion key.");
84   }
85 
86   socket = (grpc_winsocket*)completion_key;
87   if (overlapped == &socket->write_info.overlapped) {
88     info = &socket->write_info;
89   } else if (overlapped == &socket->read_info.overlapped) {
90     info = &socket->read_info;
91   } else {
92     abort();
93   }
94   gpr_mu_lock(&socket->state_mu);
95   if (socket->shutdown_called) {
96     info->bytes_transferred = 0;
97     info->wsa_error = WSA_OPERATION_ABORTED;
98   } else {
99     success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
100                                      FALSE, &flags);
101     info->bytes_transferred = bytes;
102     info->wsa_error = success ? 0 : WSAGetLastError();
103   }
104   GPR_ASSERT(overlapped == &info->overlapped);
105   bool should_destroy = grpc_socket_become_ready(socket, info);
106   gpr_mu_unlock(&socket->state_mu);
107   if (should_destroy) {
108     grpc_winsocket_finish(socket);
109   }
110   return GRPC_IOCP_WORK_WORK;
111 }
112 
grpc_iocp_init(void)113 void grpc_iocp_init(void) {
114   g_iocp =
115       CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
116   GPR_ASSERT(g_iocp);
117 }
118 
grpc_iocp_kick(void)119 void grpc_iocp_kick(void) {
120   BOOL success;
121 
122   gpr_atm_full_fetch_add(&g_custom_events, 1);
123   success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
124                                        &g_iocp_custom_overlap);
125   GPR_ASSERT(success);
126 }
127 
grpc_iocp_flush(void)128 void grpc_iocp_flush(void) {
129   grpc_core::ExecCtx exec_ctx;
130   grpc_iocp_work_status work_status;
131   // This method is called during grpc_shutdown. We make the loop
132   // spin until any pending socket shutdowns are complete.
133   do {
134     work_status = grpc_iocp_work(grpc_core::Timestamp::InfPast());
135   } while (work_status == GRPC_IOCP_WORK_KICK ||
136            grpc_core::ExecCtx::Get()->Flush() ||
137            gpr_atm_acq_load(&g_pending_socket_shutdowns) != 0);
138 }
139 
grpc_iocp_shutdown(void)140 void grpc_iocp_shutdown(void) {
141   grpc_core::ExecCtx exec_ctx;
142   while (gpr_atm_acq_load(&g_custom_events)) {
143     grpc_iocp_work(grpc_core::Timestamp::InfFuture());
144     grpc_core::ExecCtx::Get()->Flush();
145   }
146 
147   GPR_ASSERT(CloseHandle(g_iocp));
148 }
149 
grpc_iocp_add_socket(grpc_winsocket * socket)150 void grpc_iocp_add_socket(grpc_winsocket* socket) {
151   HANDLE ret;
152   if (socket->added_to_iocp) return;
153   ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
154                                (uintptr_t)socket, 0);
155   if (!ret) {
156     char* utf8_message = gpr_format_message(WSAGetLastError());
157     gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
158     gpr_free(utf8_message);
159     __debugbreak();
160     abort();
161   }
162   socket->added_to_iocp = 1;
163   GPR_ASSERT(ret == g_iocp);
164 }
165 
grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket * socket)166 void grpc_iocp_register_socket_shutdown_socket_locked(grpc_winsocket* socket) {
167   if (!socket->shutdown_registered) {
168     socket->shutdown_registered = true;
169     gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, 1);
170   }
171 }
172 
grpc_iocp_finish_socket_shutdown(grpc_winsocket * socket)173 void grpc_iocp_finish_socket_shutdown(grpc_winsocket* socket) {
174   if (socket->shutdown_registered) {
175     gpr_atm_full_fetch_add(&g_pending_socket_shutdowns, -1);
176   }
177 }
178 
179 #endif  // GRPC_WINSOCK_SOCKET
180