• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/gprpp/thd.h"
28 #include "src/core/lib/iomgr/iocp_windows.h"
29 #include "src/core/lib/iomgr/iomgr_internal.h"
30 #include "src/core/lib/iomgr/pollset.h"
31 #include "src/core/lib/iomgr/pollset_windows.h"
32 
33 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
34 
35 grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
36 
37 gpr_mu grpc_polling_mu;
38 static grpc_pollset_worker* g_active_poller;
39 static grpc_pollset_worker g_global_root_worker;
40 
pollset_global_init(void)41 static void pollset_global_init(void) {
42   gpr_mu_init(&grpc_polling_mu);
43   g_active_poller = NULL;
44   g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
45       g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
46           &g_global_root_worker;
47 }
48 
pollset_global_shutdown(void)49 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
50 
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)51 static void remove_worker(grpc_pollset_worker* worker,
52                           grpc_pollset_worker_link_type type) {
53   worker->links[type].prev->links[type].next = worker->links[type].next;
54   worker->links[type].next->links[type].prev = worker->links[type].prev;
55   worker->links[type].next = worker->links[type].prev = worker;
56 }
57 
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)58 static int has_workers(grpc_pollset_worker* root,
59                        grpc_pollset_worker_link_type type) {
60   return root->links[type].next != root;
61 }
62 
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)63 static grpc_pollset_worker* pop_front_worker(
64     grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
65   if (has_workers(root, type)) {
66     grpc_pollset_worker* w = root->links[type].next;
67     remove_worker(w, type);
68     return w;
69   } else {
70     return NULL;
71   }
72 }
73 
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)74 static void push_front_worker(grpc_pollset_worker* root,
75                               grpc_pollset_worker_link_type type,
76                               grpc_pollset_worker* worker) {
77   worker->links[type].prev = root;
78   worker->links[type].next = worker->links[type].prev->links[type].next;
79   worker->links[type].prev->links[type].next =
80       worker->links[type].next->links[type].prev = worker;
81 }
82 
pollset_size(void)83 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
84 
/* There isn't really any such thing as a pollset under Windows, due to the
   nature of the IO completion ports. We're still going to provide a minimal
   set of features for the sake of the rest of grpc. But grpc_pollset_work
   won't actually do any polling, and will return as quickly as possible. */
89 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)90 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
91   *mu = &grpc_polling_mu;
92   pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
93       pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
94           &pollset->root_worker;
95 }
96 
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)97 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
98   pollset->shutting_down = 1;
99   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
100   if (!pollset->is_iocp_worker) {
101     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
102   } else {
103     pollset->on_shutdown = closure;
104   }
105 }
106 
pollset_destroy(grpc_pollset * pollset)107 static void pollset_destroy(grpc_pollset* pollset) {}
108 
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker_hdl,grpc_millis deadline)109 static grpc_error* pollset_work(grpc_pollset* pollset,
110                                 grpc_pollset_worker** worker_hdl,
111                                 grpc_millis deadline) {
112   grpc_pollset_worker worker;
113   if (worker_hdl) *worker_hdl = &worker;
114 
115   int added_worker = 0;
116   worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
117       worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
118           worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
119               worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
120   worker.kicked = 0;
121   worker.pollset = pollset;
122   gpr_cv_init(&worker.cv);
123   if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
124     if (g_active_poller == NULL) {
125       grpc_pollset_worker* next_worker;
126       /* become poller */
127       pollset->is_iocp_worker = 1;
128       g_active_poller = &worker;
129       gpr_mu_unlock(&grpc_polling_mu);
130       grpc_iocp_work(deadline);
131       grpc_core::ExecCtx::Get()->Flush();
132       gpr_mu_lock(&grpc_polling_mu);
133       pollset->is_iocp_worker = 0;
134       g_active_poller = NULL;
135       /* try to get a worker from this pollsets worker list */
136       next_worker = pop_front_worker(&pollset->root_worker,
137                                      GRPC_POLLSET_WORKER_LINK_POLLSET);
138       if (next_worker == NULL) {
139         /* try to get a worker from the global list */
140         next_worker = pop_front_worker(&g_global_root_worker,
141                                        GRPC_POLLSET_WORKER_LINK_GLOBAL);
142       }
143       if (next_worker != NULL) {
144         next_worker->kicked = 1;
145         gpr_cv_signal(&next_worker->cv);
146       }
147 
148       if (pollset->shutting_down && pollset->on_shutdown != NULL) {
149         grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
150                                 GRPC_ERROR_NONE);
151         pollset->on_shutdown = NULL;
152       }
153       goto done;
154     }
155     push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
156                       &worker);
157     push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
158                       &worker);
159     added_worker = 1;
160     while (!worker.kicked) {
161       if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
162                       grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
163         grpc_core::ExecCtx::Get()->InvalidateNow();
164         break;
165       }
166       grpc_core::ExecCtx::Get()->InvalidateNow();
167     }
168   } else {
169     pollset->kicked_without_pollers = 0;
170   }
171 done:
172   if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
173     gpr_mu_unlock(&grpc_polling_mu);
174     grpc_core::ExecCtx::Get()->Flush();
175     gpr_mu_lock(&grpc_polling_mu);
176   }
177   if (added_worker) {
178     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
179     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
180   }
181   gpr_cv_destroy(&worker.cv);
182   if (worker_hdl) *worker_hdl = NULL;
183   return GRPC_ERROR_NONE;
184 }
185 
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)186 static grpc_error* pollset_kick(grpc_pollset* p,
187                                 grpc_pollset_worker* specific_worker) {
188   bool should_kick_global = false;
189   if (specific_worker != NULL) {
190     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
191       should_kick_global = true;
192       for (specific_worker =
193                p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
194            specific_worker != &p->root_worker;
195            specific_worker =
196                specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
197         specific_worker->kicked = 1;
198         should_kick_global = false;
199         gpr_cv_signal(&specific_worker->cv);
200       }
201       p->kicked_without_pollers = 1;
202       if (p->is_iocp_worker) {
203         grpc_iocp_kick();
204         should_kick_global = false;
205       }
206     } else {
207       if (p->is_iocp_worker && g_active_poller == specific_worker) {
208         grpc_iocp_kick();
209       } else {
210         specific_worker->kicked = 1;
211         gpr_cv_signal(&specific_worker->cv);
212       }
213     }
214   } else {
215     specific_worker =
216         pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
217     if (specific_worker != NULL) {
218       grpc_pollset_kick(p, specific_worker);
219     } else if (p->is_iocp_worker) {
220       grpc_iocp_kick();
221     } else {
222       p->kicked_without_pollers = 1;
223       should_kick_global = true;
224     }
225   }
226   if (should_kick_global && g_active_poller == NULL) {
227     grpc_pollset_worker* next_global_worker = pop_front_worker(
228         &g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
229     if (next_global_worker != NULL) {
230       next_global_worker->kicked = 1;
231       gpr_cv_signal(&next_global_worker->cv);
232     }
233   }
234   return GRPC_ERROR_NONE;
235 }
236 
237 grpc_pollset_vtable grpc_windows_pollset_vtable = {
238     pollset_global_init, pollset_global_shutdown,
239     pollset_init,        pollset_shutdown,
240     pollset_destroy,     pollset_work,
241     pollset_kick,        pollset_size};
242 
243 #endif /* GRPC_WINSOCK_SOCKET */
244