• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include "src/core/lib/iomgr/iocp_windows.h"
26 #include "src/core/lib/iomgr/iomgr_internal.h"
27 #include "src/core/lib/iomgr/pollset.h"
28 #include "src/core/lib/iomgr/pollset_windows.h"
29 #include "src/core/util/crash.h"
30 #include "src/core/util/thd.h"
31 
32 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
33 
34 gpr_mu grpc_polling_mu;
35 static grpc_pollset_worker* g_active_poller;
36 static grpc_pollset_worker g_global_root_worker;
37 
pollset_global_init(void)38 static void pollset_global_init(void) {
39   gpr_mu_init(&grpc_polling_mu);
40   g_active_poller = NULL;
41   g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
42       g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
43           &g_global_root_worker;
44 }
45 
pollset_global_shutdown(void)46 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
47 
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)48 static void remove_worker(grpc_pollset_worker* worker,
49                           grpc_pollset_worker_link_type type) {
50   worker->links[type].prev->links[type].next = worker->links[type].next;
51   worker->links[type].next->links[type].prev = worker->links[type].prev;
52   worker->links[type].next = worker->links[type].prev = worker;
53 }
54 
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)55 static int has_workers(grpc_pollset_worker* root,
56                        grpc_pollset_worker_link_type type) {
57   return root->links[type].next != root;
58 }
59 
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)60 static grpc_pollset_worker* pop_front_worker(
61     grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
62   if (has_workers(root, type)) {
63     grpc_pollset_worker* w = root->links[type].next;
64     remove_worker(w, type);
65     return w;
66   } else {
67     return NULL;
68   }
69 }
70 
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)71 static void push_front_worker(grpc_pollset_worker* root,
72                               grpc_pollset_worker_link_type type,
73                               grpc_pollset_worker* worker) {
74   worker->links[type].prev = root;
75   worker->links[type].next = worker->links[type].prev->links[type].next;
76   worker->links[type].prev->links[type].next =
77       worker->links[type].next->links[type].prev = worker;
78 }
79 
pollset_size(void)80 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
81 
// There isn't really any such thing as a pollset under Windows, due to the
// nature of I/O completion ports. We still provide a minimal set of features
// for the sake of the rest of gRPC, but grpc_pollset_work won't actually do
// any polling and will return as quickly as possible.
86 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)87 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
88   *mu = &grpc_polling_mu;
89   pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
90       pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
91           &pollset->root_worker;
92 }
93 
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)94 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
95   pollset->shutting_down = 1;
96   std::ignore = grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
97   if (!pollset->is_iocp_worker) {
98     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
99   } else {
100     pollset->on_shutdown = closure;
101   }
102 }
103 
pollset_destroy(grpc_pollset *)104 static void pollset_destroy(grpc_pollset* /* pollset */) {}
105 
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker_hdl,grpc_core::Timestamp deadline)106 static grpc_error_handle pollset_work(grpc_pollset* pollset,
107                                       grpc_pollset_worker** worker_hdl,
108                                       grpc_core::Timestamp deadline) {
109   grpc_pollset_worker worker;
110   if (worker_hdl) *worker_hdl = &worker;
111 
112   int added_worker = 0;
113   worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
114       worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
115           worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
116               worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
117   worker.kicked = 0;
118   worker.pollset = pollset;
119   gpr_cv_init(&worker.cv);
120   if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
121     if (g_active_poller == NULL) {
122       grpc_pollset_worker* next_worker;
123       // become poller
124       pollset->is_iocp_worker = 1;
125       g_active_poller = &worker;
126       gpr_mu_unlock(&grpc_polling_mu);
127       grpc_iocp_work(deadline);
128       grpc_core::ExecCtx::Get()->Flush();
129       gpr_mu_lock(&grpc_polling_mu);
130       pollset->is_iocp_worker = 0;
131       g_active_poller = NULL;
132       // try to get a worker from this pollsets worker list
133       next_worker = pop_front_worker(&pollset->root_worker,
134                                      GRPC_POLLSET_WORKER_LINK_POLLSET);
135       if (next_worker == NULL) {
136         // try to get a worker from the global list
137         next_worker = pop_front_worker(&g_global_root_worker,
138                                        GRPC_POLLSET_WORKER_LINK_GLOBAL);
139       }
140       if (next_worker != NULL) {
141         next_worker->kicked = 1;
142         gpr_cv_signal(&next_worker->cv);
143       }
144 
145       if (pollset->shutting_down && pollset->on_shutdown != NULL) {
146         grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
147                                 absl::OkStatus());
148         pollset->on_shutdown = NULL;
149       }
150       goto done;
151     }
152     push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
153                       &worker);
154     push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
155                       &worker);
156     added_worker = 1;
157     while (!worker.kicked) {
158       if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
159                       deadline.as_timespec(GPR_CLOCK_REALTIME))) {
160         grpc_core::ExecCtx::Get()->InvalidateNow();
161         break;
162       }
163       grpc_core::ExecCtx::Get()->InvalidateNow();
164     }
165   } else {
166     pollset->kicked_without_pollers = 0;
167   }
168 done:
169   if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
170     gpr_mu_unlock(&grpc_polling_mu);
171     grpc_core::ExecCtx::Get()->Flush();
172     gpr_mu_lock(&grpc_polling_mu);
173   }
174   if (added_worker) {
175     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
176     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
177   }
178   gpr_cv_destroy(&worker.cv);
179   if (worker_hdl) *worker_hdl = NULL;
180   return absl::OkStatus();
181 }
182 
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)183 static grpc_error_handle pollset_kick(grpc_pollset* p,
184                                       grpc_pollset_worker* specific_worker) {
185   bool should_kick_global = false;
186   if (specific_worker != NULL) {
187     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
188       should_kick_global = true;
189       for (specific_worker =
190                p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
191            specific_worker != &p->root_worker;
192            specific_worker =
193                specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
194         specific_worker->kicked = 1;
195         should_kick_global = false;
196         gpr_cv_signal(&specific_worker->cv);
197       }
198       p->kicked_without_pollers = 1;
199       if (p->is_iocp_worker) {
200         grpc_iocp_kick();
201         should_kick_global = false;
202       }
203     } else {
204       if (p->is_iocp_worker && g_active_poller == specific_worker) {
205         grpc_iocp_kick();
206       } else {
207         specific_worker->kicked = 1;
208         gpr_cv_signal(&specific_worker->cv);
209       }
210     }
211   } else {
212     specific_worker =
213         pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
214     if (specific_worker != NULL) {
215       std::ignore = grpc_pollset_kick(p, specific_worker);
216     } else if (p->is_iocp_worker) {
217       grpc_iocp_kick();
218     } else {
219       p->kicked_without_pollers = 1;
220       should_kick_global = true;
221     }
222   }
223   if (should_kick_global && g_active_poller == NULL) {
224     grpc_pollset_worker* next_global_worker = pop_front_worker(
225         &g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
226     if (next_global_worker != NULL) {
227       next_global_worker->kicked = 1;
228       gpr_cv_signal(&next_global_worker->cv);
229     }
230   }
231   return absl::OkStatus();
232 }
233 
234 grpc_pollset_vtable grpc_windows_pollset_vtable = {
235     pollset_global_init, pollset_global_shutdown,
236     pollset_init,        pollset_shutdown,
237     pollset_destroy,     pollset_work,
238     pollset_kick,        pollset_size};
239 
240 #endif  // GRPC_WINSOCK_SOCKET
241