1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/gprpp/crash.h"
28 #include "src/core/lib/gprpp/thd.h"
29 #include "src/core/lib/iomgr/iocp_windows.h"
30 #include "src/core/lib/iomgr/iomgr_internal.h"
31 #include "src/core/lib/iomgr/pollset.h"
32 #include "src/core/lib/iomgr/pollset_windows.h"
33
34 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
35
36 grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
37
38 gpr_mu grpc_polling_mu;
39 static grpc_pollset_worker* g_active_poller;
40 static grpc_pollset_worker g_global_root_worker;
41
pollset_global_init(void)42 static void pollset_global_init(void) {
43 gpr_mu_init(&grpc_polling_mu);
44 g_active_poller = NULL;
45 g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
46 g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
47 &g_global_root_worker;
48 }
49
pollset_global_shutdown(void)50 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
51
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)52 static void remove_worker(grpc_pollset_worker* worker,
53 grpc_pollset_worker_link_type type) {
54 worker->links[type].prev->links[type].next = worker->links[type].next;
55 worker->links[type].next->links[type].prev = worker->links[type].prev;
56 worker->links[type].next = worker->links[type].prev = worker;
57 }
58
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)59 static int has_workers(grpc_pollset_worker* root,
60 grpc_pollset_worker_link_type type) {
61 return root->links[type].next != root;
62 }
63
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)64 static grpc_pollset_worker* pop_front_worker(
65 grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
66 if (has_workers(root, type)) {
67 grpc_pollset_worker* w = root->links[type].next;
68 remove_worker(w, type);
69 return w;
70 } else {
71 return NULL;
72 }
73 }
74
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)75 static void push_front_worker(grpc_pollset_worker* root,
76 grpc_pollset_worker_link_type type,
77 grpc_pollset_worker* worker) {
78 worker->links[type].prev = root;
79 worker->links[type].next = worker->links[type].prev->links[type].next;
80 worker->links[type].prev->links[type].next =
81 worker->links[type].next->links[type].prev = worker;
82 }
83
pollset_size(void)84 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
85
86 // There isn't really any such thing as a pollset under Windows, due to the
87 // nature of the IO completion ports. We're still going to provide a minimal
88 // set of features for the sake of the rest of grpc. But grpc_pollset_work
89 // won't actually do any polling, and return as quickly as possible.
90
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)91 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
92 *mu = &grpc_polling_mu;
93 pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
94 pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
95 &pollset->root_worker;
96 }
97
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)98 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
99 pollset->shutting_down = 1;
100 std::ignore = grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
101 if (!pollset->is_iocp_worker) {
102 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
103 } else {
104 pollset->on_shutdown = closure;
105 }
106 }
107
pollset_destroy(grpc_pollset *)108 static void pollset_destroy(grpc_pollset* /* pollset */) {}
109
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker_hdl,grpc_core::Timestamp deadline)110 static grpc_error_handle pollset_work(grpc_pollset* pollset,
111 grpc_pollset_worker** worker_hdl,
112 grpc_core::Timestamp deadline) {
113 grpc_pollset_worker worker;
114 if (worker_hdl) *worker_hdl = &worker;
115
116 int added_worker = 0;
117 worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
118 worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
119 worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
120 worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
121 worker.kicked = 0;
122 worker.pollset = pollset;
123 gpr_cv_init(&worker.cv);
124 if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
125 if (g_active_poller == NULL) {
126 grpc_pollset_worker* next_worker;
127 // become poller
128 pollset->is_iocp_worker = 1;
129 g_active_poller = &worker;
130 gpr_mu_unlock(&grpc_polling_mu);
131 grpc_iocp_work(deadline);
132 grpc_core::ExecCtx::Get()->Flush();
133 gpr_mu_lock(&grpc_polling_mu);
134 pollset->is_iocp_worker = 0;
135 g_active_poller = NULL;
136 // try to get a worker from this pollsets worker list
137 next_worker = pop_front_worker(&pollset->root_worker,
138 GRPC_POLLSET_WORKER_LINK_POLLSET);
139 if (next_worker == NULL) {
140 // try to get a worker from the global list
141 next_worker = pop_front_worker(&g_global_root_worker,
142 GRPC_POLLSET_WORKER_LINK_GLOBAL);
143 }
144 if (next_worker != NULL) {
145 next_worker->kicked = 1;
146 gpr_cv_signal(&next_worker->cv);
147 }
148
149 if (pollset->shutting_down && pollset->on_shutdown != NULL) {
150 grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
151 absl::OkStatus());
152 pollset->on_shutdown = NULL;
153 }
154 goto done;
155 }
156 push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
157 &worker);
158 push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
159 &worker);
160 added_worker = 1;
161 while (!worker.kicked) {
162 if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
163 deadline.as_timespec(GPR_CLOCK_REALTIME))) {
164 grpc_core::ExecCtx::Get()->InvalidateNow();
165 break;
166 }
167 grpc_core::ExecCtx::Get()->InvalidateNow();
168 }
169 } else {
170 pollset->kicked_without_pollers = 0;
171 }
172 done:
173 if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
174 gpr_mu_unlock(&grpc_polling_mu);
175 grpc_core::ExecCtx::Get()->Flush();
176 gpr_mu_lock(&grpc_polling_mu);
177 }
178 if (added_worker) {
179 remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
180 remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
181 }
182 gpr_cv_destroy(&worker.cv);
183 if (worker_hdl) *worker_hdl = NULL;
184 return absl::OkStatus();
185 }
186
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)187 static grpc_error_handle pollset_kick(grpc_pollset* p,
188 grpc_pollset_worker* specific_worker) {
189 bool should_kick_global = false;
190 if (specific_worker != NULL) {
191 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
192 should_kick_global = true;
193 for (specific_worker =
194 p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
195 specific_worker != &p->root_worker;
196 specific_worker =
197 specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
198 specific_worker->kicked = 1;
199 should_kick_global = false;
200 gpr_cv_signal(&specific_worker->cv);
201 }
202 p->kicked_without_pollers = 1;
203 if (p->is_iocp_worker) {
204 grpc_iocp_kick();
205 should_kick_global = false;
206 }
207 } else {
208 if (p->is_iocp_worker && g_active_poller == specific_worker) {
209 grpc_iocp_kick();
210 } else {
211 specific_worker->kicked = 1;
212 gpr_cv_signal(&specific_worker->cv);
213 }
214 }
215 } else {
216 specific_worker =
217 pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
218 if (specific_worker != NULL) {
219 std::ignore = grpc_pollset_kick(p, specific_worker);
220 } else if (p->is_iocp_worker) {
221 grpc_iocp_kick();
222 } else {
223 p->kicked_without_pollers = 1;
224 should_kick_global = true;
225 }
226 }
227 if (should_kick_global && g_active_poller == NULL) {
228 grpc_pollset_worker* next_global_worker = pop_front_worker(
229 &g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
230 if (next_global_worker != NULL) {
231 next_global_worker->kicked = 1;
232 gpr_cv_signal(&next_global_worker->cv);
233 }
234 }
235 return absl::OkStatus();
236 }
237
238 grpc_pollset_vtable grpc_windows_pollset_vtable = {
239 pollset_global_init, pollset_global_shutdown,
240 pollset_init, pollset_shutdown,
241 pollset_destroy, pollset_work,
242 pollset_kick, pollset_size};
243
244 #endif // GRPC_WINSOCK_SOCKET
245