• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/gprpp/thd.h"
28 #include "src/core/lib/iomgr/iocp_windows.h"
29 #include "src/core/lib/iomgr/iomgr_internal.h"
30 #include "src/core/lib/iomgr/pollset.h"
31 #include "src/core/lib/iomgr/pollset_windows.h"
32 
33 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
34 
35 grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
36 
37 gpr_mu grpc_polling_mu;
38 static grpc_pollset_worker* g_active_poller;
39 static grpc_pollset_worker g_global_root_worker;
40 
pollset_global_init(void)41 static void pollset_global_init(void) {
42   gpr_mu_init(&grpc_polling_mu);
43   g_active_poller = NULL;
44   g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
45       g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
46           &g_global_root_worker;
47 }
48 
pollset_global_shutdown(void)49 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
50 
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)51 static void remove_worker(grpc_pollset_worker* worker,
52                           grpc_pollset_worker_link_type type) {
53   worker->links[type].prev->links[type].next = worker->links[type].next;
54   worker->links[type].next->links[type].prev = worker->links[type].prev;
55   worker->links[type].next = worker->links[type].prev = worker;
56 }
57 
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)58 static int has_workers(grpc_pollset_worker* root,
59                        grpc_pollset_worker_link_type type) {
60   return root->links[type].next != root;
61 }
62 
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)63 static grpc_pollset_worker* pop_front_worker(
64     grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
65   if (has_workers(root, type)) {
66     grpc_pollset_worker* w = root->links[type].next;
67     remove_worker(w, type);
68     return w;
69   } else {
70     return NULL;
71   }
72 }
73 
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)74 static void push_front_worker(grpc_pollset_worker* root,
75                               grpc_pollset_worker_link_type type,
76                               grpc_pollset_worker* worker) {
77   worker->links[type].prev = root;
78   worker->links[type].next = worker->links[type].prev->links[type].next;
79   worker->links[type].prev->links[type].next =
80       worker->links[type].next->links[type].prev = worker;
81 }
82 
pollset_size(void)83 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
84 
85 /* There isn't really any such thing as a pollset under Windows, due to the
86    nature of the IO completion ports. We're still going to provide a minimal
87    set of features for the sake of the rest of grpc. But grpc_pollset_work
88    won't actually do any polling, and return as quickly as possible. */
89 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)90 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
91   *mu = &grpc_polling_mu;
92   pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
93       pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
94           &pollset->root_worker;
95 }
96 
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)97 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
98   pollset->shutting_down = 1;
99   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
100   if (!pollset->is_iocp_worker) {
101     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
102   } else {
103     pollset->on_shutdown = closure;
104   }
105 }
106 
pollset_destroy(grpc_pollset * pollset)107 static void pollset_destroy(grpc_pollset* pollset) {}
108 
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker_hdl,grpc_millis deadline)109 static grpc_error* pollset_work(grpc_pollset* pollset,
110                                 grpc_pollset_worker** worker_hdl,
111                                 grpc_millis deadline) {
112   grpc_pollset_worker worker;
113   if (worker_hdl) *worker_hdl = &worker;
114 
115   int added_worker = 0;
116   worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
117       worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
118           worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
119               worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
120   worker.kicked = 0;
121   worker.pollset = pollset;
122   gpr_cv_init(&worker.cv);
123   if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
124     if (g_active_poller == NULL) {
125       grpc_pollset_worker* next_worker;
126       /* become poller */
127       pollset->is_iocp_worker = 1;
128       g_active_poller = &worker;
129       gpr_mu_unlock(&grpc_polling_mu);
130       grpc_iocp_work(deadline);
131       grpc_core::ExecCtx::Get()->Flush();
132       gpr_mu_lock(&grpc_polling_mu);
133       pollset->is_iocp_worker = 0;
134       g_active_poller = NULL;
135       /* try to get a worker from this pollsets worker list */
136       next_worker = pop_front_worker(&pollset->root_worker,
137                                      GRPC_POLLSET_WORKER_LINK_POLLSET);
138       if (next_worker == NULL) {
139         /* try to get a worker from the global list */
140         next_worker = pop_front_worker(&g_global_root_worker,
141                                        GRPC_POLLSET_WORKER_LINK_GLOBAL);
142       }
143       if (next_worker != NULL) {
144         next_worker->kicked = 1;
145         gpr_cv_signal(&next_worker->cv);
146       }
147 
148       if (pollset->shutting_down && pollset->on_shutdown != NULL) {
149         GRPC_CLOSURE_SCHED(pollset->on_shutdown, GRPC_ERROR_NONE);
150         pollset->on_shutdown = NULL;
151       }
152       goto done;
153     }
154     push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
155                       &worker);
156     push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
157                       &worker);
158     added_worker = 1;
159     while (!worker.kicked) {
160       if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
161                       grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
162         grpc_core::ExecCtx::Get()->InvalidateNow();
163         break;
164       }
165       grpc_core::ExecCtx::Get()->InvalidateNow();
166     }
167   } else {
168     pollset->kicked_without_pollers = 0;
169   }
170 done:
171   if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
172     gpr_mu_unlock(&grpc_polling_mu);
173     grpc_core::ExecCtx::Get()->Flush();
174     gpr_mu_lock(&grpc_polling_mu);
175   }
176   if (added_worker) {
177     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
178     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
179   }
180   gpr_cv_destroy(&worker.cv);
181   if (worker_hdl) *worker_hdl = NULL;
182   return GRPC_ERROR_NONE;
183 }
184 
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)185 static grpc_error* pollset_kick(grpc_pollset* p,
186                                 grpc_pollset_worker* specific_worker) {
187   if (specific_worker != NULL) {
188     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
189       for (specific_worker =
190                p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
191            specific_worker != &p->root_worker;
192            specific_worker =
193                specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
194         specific_worker->kicked = 1;
195         gpr_cv_signal(&specific_worker->cv);
196       }
197       p->kicked_without_pollers = 1;
198       if (p->is_iocp_worker) {
199         grpc_iocp_kick();
200       }
201     } else {
202       if (p->is_iocp_worker && g_active_poller == specific_worker) {
203         grpc_iocp_kick();
204       } else {
205         specific_worker->kicked = 1;
206         gpr_cv_signal(&specific_worker->cv);
207       }
208     }
209   } else {
210     specific_worker =
211         pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
212     if (specific_worker != NULL) {
213       grpc_pollset_kick(p, specific_worker);
214     } else if (p->is_iocp_worker) {
215       grpc_iocp_kick();
216     } else {
217       p->kicked_without_pollers = 1;
218     }
219   }
220   return GRPC_ERROR_NONE;
221 }
222 
223 grpc_pollset_vtable grpc_windows_pollset_vtable = {
224     pollset_global_init, pollset_global_shutdown,
225     pollset_init,        pollset_shutdown,
226     pollset_destroy,     pollset_work,
227     pollset_kick,        pollset_size};
228 
229 #endif /* GRPC_WINSOCK_SOCKET */
230