1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/gprpp/thd.h"
28 #include "src/core/lib/iomgr/iocp_windows.h"
29 #include "src/core/lib/iomgr/iomgr_internal.h"
30 #include "src/core/lib/iomgr/pollset.h"
31 #include "src/core/lib/iomgr/pollset_windows.h"
32
/* Sentinel "worker" value: when passed as the specific worker of a kick, every
   worker queued on the pollset is kicked instead of a single one. */
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)

/* Debug-only trace flag registered under the name "fd_refcount". */
grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");

/* Single mutex shared by all pollsets: pollset_init() hands its address out as
   the pollset mutex, and pollset_work() releases/re-acquires it around
   blocking work. */
gpr_mu grpc_polling_mu;
/* Worker that is currently running grpc_iocp_work(), or NULL when no worker
   is acting as the poller. */
static grpc_pollset_worker* g_active_poller;
/* Sentinel root of the circular list that links waiting workers through
   GRPC_POLLSET_WORKER_LINK_GLOBAL. */
static grpc_pollset_worker g_global_root_worker;
40
pollset_global_init(void)41 static void pollset_global_init(void) {
42 gpr_mu_init(&grpc_polling_mu);
43 g_active_poller = NULL;
44 g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
45 g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
46 &g_global_root_worker;
47 }
48
pollset_global_shutdown(void)49 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
50
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)51 static void remove_worker(grpc_pollset_worker* worker,
52 grpc_pollset_worker_link_type type) {
53 worker->links[type].prev->links[type].next = worker->links[type].next;
54 worker->links[type].next->links[type].prev = worker->links[type].prev;
55 worker->links[type].next = worker->links[type].prev = worker;
56 }
57
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)58 static int has_workers(grpc_pollset_worker* root,
59 grpc_pollset_worker_link_type type) {
60 return root->links[type].next != root;
61 }
62
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)63 static grpc_pollset_worker* pop_front_worker(
64 grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
65 if (has_workers(root, type)) {
66 grpc_pollset_worker* w = root->links[type].next;
67 remove_worker(w, type);
68 return w;
69 } else {
70 return NULL;
71 }
72 }
73
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)74 static void push_front_worker(grpc_pollset_worker* root,
75 grpc_pollset_worker_link_type type,
76 grpc_pollset_worker* worker) {
77 worker->links[type].prev = root;
78 worker->links[type].next = worker->links[type].prev->links[type].next;
79 worker->links[type].prev->links[type].next =
80 worker->links[type].next->links[type].prev = worker;
81 }
82
pollset_size(void)83 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
84
/* There isn't really any such thing as a pollset under Windows, due to the
   nature of I/O completion ports. We're still going to provide a minimal
   set of features for the sake of the rest of grpc. But grpc_pollset_work
   won't actually do any polling, and will return as quickly as possible. */
89
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)90 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
91 *mu = &grpc_polling_mu;
92 pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
93 pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
94 &pollset->root_worker;
95 }
96
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)97 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
98 pollset->shutting_down = 1;
99 grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
100 if (!pollset->is_iocp_worker) {
101 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
102 } else {
103 pollset->on_shutdown = closure;
104 }
105 }
106
pollset_destroy(grpc_pollset * pollset)107 static void pollset_destroy(grpc_pollset* pollset) {}
108
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker_hdl,grpc_millis deadline)109 static grpc_error* pollset_work(grpc_pollset* pollset,
110 grpc_pollset_worker** worker_hdl,
111 grpc_millis deadline) {
112 grpc_pollset_worker worker;
113 if (worker_hdl) *worker_hdl = &worker;
114
115 int added_worker = 0;
116 worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
117 worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
118 worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
119 worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
120 worker.kicked = 0;
121 worker.pollset = pollset;
122 gpr_cv_init(&worker.cv);
123 if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
124 if (g_active_poller == NULL) {
125 grpc_pollset_worker* next_worker;
126 /* become poller */
127 pollset->is_iocp_worker = 1;
128 g_active_poller = &worker;
129 gpr_mu_unlock(&grpc_polling_mu);
130 grpc_iocp_work(deadline);
131 grpc_core::ExecCtx::Get()->Flush();
132 gpr_mu_lock(&grpc_polling_mu);
133 pollset->is_iocp_worker = 0;
134 g_active_poller = NULL;
135 /* try to get a worker from this pollsets worker list */
136 next_worker = pop_front_worker(&pollset->root_worker,
137 GRPC_POLLSET_WORKER_LINK_POLLSET);
138 if (next_worker == NULL) {
139 /* try to get a worker from the global list */
140 next_worker = pop_front_worker(&g_global_root_worker,
141 GRPC_POLLSET_WORKER_LINK_GLOBAL);
142 }
143 if (next_worker != NULL) {
144 next_worker->kicked = 1;
145 gpr_cv_signal(&next_worker->cv);
146 }
147
148 if (pollset->shutting_down && pollset->on_shutdown != NULL) {
149 GRPC_CLOSURE_SCHED(pollset->on_shutdown, GRPC_ERROR_NONE);
150 pollset->on_shutdown = NULL;
151 }
152 goto done;
153 }
154 push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
155 &worker);
156 push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
157 &worker);
158 added_worker = 1;
159 while (!worker.kicked) {
160 if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
161 grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
162 grpc_core::ExecCtx::Get()->InvalidateNow();
163 break;
164 }
165 grpc_core::ExecCtx::Get()->InvalidateNow();
166 }
167 } else {
168 pollset->kicked_without_pollers = 0;
169 }
170 done:
171 if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
172 gpr_mu_unlock(&grpc_polling_mu);
173 grpc_core::ExecCtx::Get()->Flush();
174 gpr_mu_lock(&grpc_polling_mu);
175 }
176 if (added_worker) {
177 remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
178 remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
179 }
180 gpr_cv_destroy(&worker.cv);
181 if (worker_hdl) *worker_hdl = NULL;
182 return GRPC_ERROR_NONE;
183 }
184
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)185 static grpc_error* pollset_kick(grpc_pollset* p,
186 grpc_pollset_worker* specific_worker) {
187 if (specific_worker != NULL) {
188 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
189 for (specific_worker =
190 p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
191 specific_worker != &p->root_worker;
192 specific_worker =
193 specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
194 specific_worker->kicked = 1;
195 gpr_cv_signal(&specific_worker->cv);
196 }
197 p->kicked_without_pollers = 1;
198 if (p->is_iocp_worker) {
199 grpc_iocp_kick();
200 }
201 } else {
202 if (p->is_iocp_worker && g_active_poller == specific_worker) {
203 grpc_iocp_kick();
204 } else {
205 specific_worker->kicked = 1;
206 gpr_cv_signal(&specific_worker->cv);
207 }
208 }
209 } else {
210 specific_worker =
211 pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
212 if (specific_worker != NULL) {
213 grpc_pollset_kick(p, specific_worker);
214 } else if (p->is_iocp_worker) {
215 grpc_iocp_kick();
216 } else {
217 p->kicked_without_pollers = 1;
218 }
219 }
220 return GRPC_ERROR_NONE;
221 }
222
/* Pollset vtable populated with the Windows implementations defined in this
   file, listed in the field order of grpc_pollset_vtable. */
grpc_pollset_vtable grpc_windows_pollset_vtable = {
    pollset_global_init, pollset_global_shutdown,
    pollset_init,        pollset_shutdown,
    pollset_destroy,     pollset_work,
    pollset_kick,        pollset_size};
228
229 #endif /* GRPC_WINSOCK_SOCKET */
230