• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/gprpp/crash.h"
28 #include "src/core/lib/gprpp/thd.h"
29 #include "src/core/lib/iomgr/iocp_windows.h"
30 #include "src/core/lib/iomgr/iomgr_internal.h"
31 #include "src/core/lib/iomgr/pollset.h"
32 #include "src/core/lib/iomgr/pollset_windows.h"
33 
34 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
35 
36 grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
37 
38 gpr_mu grpc_polling_mu;
39 static grpc_pollset_worker* g_active_poller;
40 static grpc_pollset_worker g_global_root_worker;
41 
pollset_global_init(void)42 static void pollset_global_init(void) {
43   gpr_mu_init(&grpc_polling_mu);
44   g_active_poller = NULL;
45   g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
46       g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
47           &g_global_root_worker;
48 }
49 
pollset_global_shutdown(void)50 static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
51 
remove_worker(grpc_pollset_worker * worker,grpc_pollset_worker_link_type type)52 static void remove_worker(grpc_pollset_worker* worker,
53                           grpc_pollset_worker_link_type type) {
54   worker->links[type].prev->links[type].next = worker->links[type].next;
55   worker->links[type].next->links[type].prev = worker->links[type].prev;
56   worker->links[type].next = worker->links[type].prev = worker;
57 }
58 
has_workers(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)59 static int has_workers(grpc_pollset_worker* root,
60                        grpc_pollset_worker_link_type type) {
61   return root->links[type].next != root;
62 }
63 
pop_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type)64 static grpc_pollset_worker* pop_front_worker(
65     grpc_pollset_worker* root, grpc_pollset_worker_link_type type) {
66   if (has_workers(root, type)) {
67     grpc_pollset_worker* w = root->links[type].next;
68     remove_worker(w, type);
69     return w;
70   } else {
71     return NULL;
72   }
73 }
74 
push_front_worker(grpc_pollset_worker * root,grpc_pollset_worker_link_type type,grpc_pollset_worker * worker)75 static void push_front_worker(grpc_pollset_worker* root,
76                               grpc_pollset_worker_link_type type,
77                               grpc_pollset_worker* worker) {
78   worker->links[type].prev = root;
79   worker->links[type].next = worker->links[type].prev->links[type].next;
80   worker->links[type].prev->links[type].next =
81       worker->links[type].next->links[type].prev = worker;
82 }
83 
pollset_size(void)84 static size_t pollset_size(void) { return sizeof(grpc_pollset); }
85 
// There isn't really any such thing as a pollset under Windows, due to the
// nature of the IO completion ports. We're still going to provide a minimal
// set of features for the sake of the rest of grpc. But grpc_pollset_work
// won't actually do any polling, and will return as quickly as possible.
90 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)91 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
92   *mu = &grpc_polling_mu;
93   pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
94       pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
95           &pollset->root_worker;
96 }
97 
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)98 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
99   pollset->shutting_down = 1;
100   std::ignore = grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
101   if (!pollset->is_iocp_worker) {
102     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
103   } else {
104     pollset->on_shutdown = closure;
105   }
106 }
107 
pollset_destroy(grpc_pollset *)108 static void pollset_destroy(grpc_pollset* /* pollset */) {}
109 
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker_hdl,grpc_core::Timestamp deadline)110 static grpc_error_handle pollset_work(grpc_pollset* pollset,
111                                       grpc_pollset_worker** worker_hdl,
112                                       grpc_core::Timestamp deadline) {
113   grpc_pollset_worker worker;
114   if (worker_hdl) *worker_hdl = &worker;
115 
116   int added_worker = 0;
117   worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
118       worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
119           worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
120               worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
121   worker.kicked = 0;
122   worker.pollset = pollset;
123   gpr_cv_init(&worker.cv);
124   if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
125     if (g_active_poller == NULL) {
126       grpc_pollset_worker* next_worker;
127       // become poller
128       pollset->is_iocp_worker = 1;
129       g_active_poller = &worker;
130       gpr_mu_unlock(&grpc_polling_mu);
131       grpc_iocp_work(deadline);
132       grpc_core::ExecCtx::Get()->Flush();
133       gpr_mu_lock(&grpc_polling_mu);
134       pollset->is_iocp_worker = 0;
135       g_active_poller = NULL;
136       // try to get a worker from this pollsets worker list
137       next_worker = pop_front_worker(&pollset->root_worker,
138                                      GRPC_POLLSET_WORKER_LINK_POLLSET);
139       if (next_worker == NULL) {
140         // try to get a worker from the global list
141         next_worker = pop_front_worker(&g_global_root_worker,
142                                        GRPC_POLLSET_WORKER_LINK_GLOBAL);
143       }
144       if (next_worker != NULL) {
145         next_worker->kicked = 1;
146         gpr_cv_signal(&next_worker->cv);
147       }
148 
149       if (pollset->shutting_down && pollset->on_shutdown != NULL) {
150         grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
151                                 absl::OkStatus());
152         pollset->on_shutdown = NULL;
153       }
154       goto done;
155     }
156     push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
157                       &worker);
158     push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
159                       &worker);
160     added_worker = 1;
161     while (!worker.kicked) {
162       if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
163                       deadline.as_timespec(GPR_CLOCK_REALTIME))) {
164         grpc_core::ExecCtx::Get()->InvalidateNow();
165         break;
166       }
167       grpc_core::ExecCtx::Get()->InvalidateNow();
168     }
169   } else {
170     pollset->kicked_without_pollers = 0;
171   }
172 done:
173   if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
174     gpr_mu_unlock(&grpc_polling_mu);
175     grpc_core::ExecCtx::Get()->Flush();
176     gpr_mu_lock(&grpc_polling_mu);
177   }
178   if (added_worker) {
179     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
180     remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
181   }
182   gpr_cv_destroy(&worker.cv);
183   if (worker_hdl) *worker_hdl = NULL;
184   return absl::OkStatus();
185 }
186 
pollset_kick(grpc_pollset * p,grpc_pollset_worker * specific_worker)187 static grpc_error_handle pollset_kick(grpc_pollset* p,
188                                       grpc_pollset_worker* specific_worker) {
189   bool should_kick_global = false;
190   if (specific_worker != NULL) {
191     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
192       should_kick_global = true;
193       for (specific_worker =
194                p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
195            specific_worker != &p->root_worker;
196            specific_worker =
197                specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
198         specific_worker->kicked = 1;
199         should_kick_global = false;
200         gpr_cv_signal(&specific_worker->cv);
201       }
202       p->kicked_without_pollers = 1;
203       if (p->is_iocp_worker) {
204         grpc_iocp_kick();
205         should_kick_global = false;
206       }
207     } else {
208       if (p->is_iocp_worker && g_active_poller == specific_worker) {
209         grpc_iocp_kick();
210       } else {
211         specific_worker->kicked = 1;
212         gpr_cv_signal(&specific_worker->cv);
213       }
214     }
215   } else {
216     specific_worker =
217         pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
218     if (specific_worker != NULL) {
219       std::ignore = grpc_pollset_kick(p, specific_worker);
220     } else if (p->is_iocp_worker) {
221       grpc_iocp_kick();
222     } else {
223       p->kicked_without_pollers = 1;
224       should_kick_global = true;
225     }
226   }
227   if (should_kick_global && g_active_poller == NULL) {
228     grpc_pollset_worker* next_global_worker = pop_front_worker(
229         &g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
230     if (next_global_worker != NULL) {
231       next_global_worker->kicked = 1;
232       gpr_cv_signal(&next_global_worker->cv);
233     }
234   }
235   return absl::OkStatus();
236 }
237 
238 grpc_pollset_vtable grpc_windows_pollset_vtable = {
239     pollset_global_init, pollset_global_shutdown,
240     pollset_init,        pollset_shutdown,
241     pollset_destroy,     pollset_work,
242     pollset_kick,        pollset_size};
243 
244 #endif  // GRPC_WINSOCK_SOCKET
245