1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/gprpp/thd.h"
28 #include "src/core/lib/iomgr/iocp_windows.h"
29 #include "src/core/lib/iomgr/iomgr_internal.h"
30 #include "src/core/lib/iomgr/pollset.h"
31 #include "src/core/lib/iomgr/pollset_windows.h"
32
/* Sentinel "worker" pointer value. pollset_kick compares its specific_worker
   argument against it and, on a match, signals every worker queued on the
   pollset instead of a single one. It is never dereferenced in this file. */
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)

/* Trace flag registered under the name "fd_refcount"; it is not consulted by
   any of the functions below. */
grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");

/* The one mutex shared by all pollsets: pollset_init hands its address back to
   the caller, and pollset_work releases/re-acquires it around IOCP work. */
gpr_mu grpc_polling_mu;
/* The worker that is currently inside grpc_iocp_work, or NULL when no worker
   is acting as the poller. */
static grpc_pollset_worker* g_active_poller;
/* Sentinel head of the circular list of waiting workers threaded through
   GRPC_POLLSET_WORKER_LINK_GLOBAL. */
static grpc_pollset_worker g_global_root_worker;
40
pollset_global_init(void)41 static void pollset_global_init(void) {
42 gpr_mu_init(&grpc_polling_mu);
43 g_active_poller = NULL;
44 g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
45 g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
46 &g_global_root_worker;
47 }
48
pollset_global_shutdown(void)49 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
50
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)51 static void remove_worker(grpc_pollset_worker* worker,
52 grpc_pollset_worker_link_type type) {
53 worker->links[type].prev->links[type].next = worker->links[type].next;
54 worker->links[type].next->links[type].prev = worker->links[type].prev;
55 worker->links[type].next = worker->links[type].prev = worker;
56 }
57
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)58 static int has_workers(grpc_pollset_worker* root,
59 grpc_pollset_worker_link_type type) {
60 return root->links[type].next != root;
61 }
62
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)63 static grpc_pollset_worker* pop_front_worker(
64 grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
65 if (has_workers(root, type)) {
66 grpc_pollset_worker* w = root->links[type].next;
67 remove_worker(w, type);
68 return w;
69 } else {
70 return NULL;
71 }
72 }
73
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)74 static void push_front_worker(grpc_pollset_worker* root,
75 grpc_pollset_worker_link_type type,
76 grpc_pollset_worker* worker) {
77 worker->links[type].prev = root;
78 worker->links[type].next = worker->links[type].prev->links[type].next;
79 worker->links[type].prev->links[type].next =
80 worker->links[type].next->links[type].prev = worker;
81 }
82
pollset_size(void)83 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
84
/* There isn't really any such thing as a pollset under Windows, due to the
   nature of IO completion ports. We're still going to provide a minimal
   set of features for the sake of the rest of grpc. But grpc_pollset_work
   won't actually do any polling, and will return as quickly as possible. */
89
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)90 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
91 *mu = &grpc_polling_mu;
92 pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
93 pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
94 &pollset->root_worker;
95 }
96
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)97 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
98 pollset->shutting_down = 1;
99 grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
100 if (!pollset->is_iocp_worker) {
101 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
102 } else {
103 pollset->on_shutdown = closure;
104 }
105 }
106
pollset_destroy(grpc_pollset * pollset)107 static void pollset_destroy(grpc_pollset* pollset) {}
108
/* Runs one round of work for `pollset` on the calling thread.

   A stack-allocated worker represents the caller for the duration of the
   call. If `worker_hdl` is non-NULL it is pointed at that worker on entry and
   reset to NULL before returning.

   Behaviour, in order of precedence:
   - If the pollset has a pending kick (kicked_without_pollers) or is shutting
     down, nothing is waited on and kicked_without_pollers is cleared.
   - Else, if no worker is the active poller, this worker becomes it: the
     polling mutex is released, grpc_iocp_work(deadline) is called, the ExecCtx
     is flushed, the mutex is re-acquired, one waiting worker is woken, and a
     deferred shutdown closure (if any) is scheduled.
   - Else the worker is queued on both the global and the per-pollset list and
     waits on its condition variable until it is kicked or gpr_cv_wait returns
     non-zero.

   NOTE(review): the unlock/lock pairs below imply the caller holds
   grpc_polling_mu on entry and still holds it on return -- confirm against
   callers.

   Always returns GRPC_ERROR_NONE. */
static grpc_error* pollset_work(grpc_pollset* pollset,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  grpc_pollset_worker worker;
  if (worker_hdl) *worker_hdl = &worker;

  /* Set only when the worker has been linked into the two lists, so the
     cleanup code knows whether it must unlink it. */
  int added_worker = 0;
  worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
      worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
          worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
              worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
  worker.kicked = 0;
  worker.pollset = pollset;
  gpr_cv_init(&worker.cv);
  if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
    if (g_active_poller == NULL) {
      grpc_pollset_worker* next_worker;
      /* Become the poller: publish ourselves, then do the IOCP work with the
         polling mutex released. */
      pollset->is_iocp_worker = 1;
      g_active_poller = &worker;
      gpr_mu_unlock(&grpc_polling_mu);
      grpc_iocp_work(deadline);
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&grpc_polling_mu);
      /* Step down as poller now that the mutex is held again. */
      pollset->is_iocp_worker = 0;
      g_active_poller = NULL;
      /* Hand over to a waiting worker: prefer one from this pollset's own
         worker list. */
      next_worker = pop_front_worker(&pollset->root_worker,
                                     GRPC_POLLSET_WORKER_LINK_POLLSET);
      if (next_worker == NULL) {
        /* None on this pollset; fall back to the global list. */
        next_worker = pop_front_worker(&g_global_root_worker,
                                       GRPC_POLLSET_WORKER_LINK_GLOBAL);
      }
      if (next_worker != NULL) {
        /* Mark it kicked before signalling so its wait loop exits. */
        next_worker->kicked = 1;
        gpr_cv_signal(&next_worker->cv);
      }

      /* pollset_shutdown stored its closure in on_shutdown because this
         pollset was flagged as the poller; schedule it now, exactly once. */
      if (pollset->shutting_down && pollset->on_shutdown != NULL) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
                                GRPC_ERROR_NONE);
        pollset->on_shutdown = NULL;
      }
      /* This worker was never linked into a list; skip the wait path. */
      goto done;
    }
    /* Someone else is polling: queue on both lists so either a pollset kick
       or a global hand-over can wake this worker. */
    push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
                      &worker);
    push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
                      &worker);
    added_worker = 1;
    while (!worker.kicked) {
      /* A non-zero return ends the wait even without a kick (presumably the
         deadline expired -- confirm against gpr_cv_wait's contract). */
      if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
                      grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
        grpc_core::ExecCtx::Get()->InvalidateNow();
        break;
      }
      grpc_core::ExecCtx::Get()->InvalidateNow();
    }
  } else {
    /* Consume the pending kick (this branch is also taken when the pollset is
       shutting down). */
    pollset->kicked_without_pollers = 0;
  }
done:
  /* Run any closures queued on the ExecCtx with the polling mutex released. */
  if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
    gpr_mu_unlock(&grpc_polling_mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&grpc_polling_mu);
  }
  if (added_worker) {
    /* The worker lives on this stack frame, so it must leave both lists
       before returning. */
    remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
    remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
  }
  gpr_cv_destroy(&worker.cv);
  if (worker_hdl) *worker_hdl = NULL;
  return GRPC_ERROR_NONE;
}
185
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)186 static grpc_error* pollset_kick(grpc_pollset* p,
187 grpc_pollset_worker* specific_worker) {
188 bool should_kick_global = false;
189 if (specific_worker != NULL) {
190 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
191 should_kick_global = true;
192 for (specific_worker =
193 p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
194 specific_worker != &p->root_worker;
195 specific_worker =
196 specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
197 specific_worker->kicked = 1;
198 should_kick_global = false;
199 gpr_cv_signal(&specific_worker->cv);
200 }
201 p->kicked_without_pollers = 1;
202 if (p->is_iocp_worker) {
203 grpc_iocp_kick();
204 should_kick_global = false;
205 }
206 } else {
207 if (p->is_iocp_worker && g_active_poller == specific_worker) {
208 grpc_iocp_kick();
209 } else {
210 specific_worker->kicked = 1;
211 gpr_cv_signal(&specific_worker->cv);
212 }
213 }
214 } else {
215 specific_worker =
216 pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
217 if (specific_worker != NULL) {
218 grpc_pollset_kick(p, specific_worker);
219 } else if (p->is_iocp_worker) {
220 grpc_iocp_kick();
221 } else {
222 p->kicked_without_pollers = 1;
223 should_kick_global = true;
224 }
225 }
226 if (should_kick_global && g_active_poller == NULL) {
227 grpc_pollset_worker* next_global_worker = pop_front_worker(
228 &g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
229 if (next_global_worker != NULL) {
230 next_global_worker->kicked = 1;
231 gpr_cv_signal(&next_global_worker->cv);
232 }
233 }
234 return GRPC_ERROR_NONE;
235 }
236
/* Table of the Windows pollset entry points defined above. The initializers
   are positional, so their order must match the member order of
   grpc_pollset_vtable (declared outside this file). */
grpc_pollset_vtable grpc_windows_pollset_vtable = {
    pollset_global_init, pollset_global_shutdown,
    pollset_init,        pollset_shutdown,
    pollset_destroy,     pollset_work,
    pollset_kick,        pollset_size};
242
243 #endif /* GRPC_WINSOCK_SOCKET */
244