• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 /// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine
20 /// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to
21 /// handle and trigger all the CFStream events. The CFStream streams register
22 /// themselves with the run loop with functions grpc_apple_register_read_stream
23 /// and grpc_apple_register_write_stream. Pollsets are phony and block on a
24 /// condition variable in pollset_work().
25 
26 #include <grpc/support/port_platform.h>
27 
28 #include "src/core/lib/iomgr/port.h"
29 
30 #ifdef GRPC_APPLE_EV
31 
32 #include <CoreFoundation/CoreFoundation.h>
33 
34 #include <list>
35 
36 #include "absl/time/time.h"
37 #include "src/core/lib/iomgr/ev_apple.h"
38 #include "src/core/util/thd.h"
39 #include "src/core/util/time_util.h"
40 
// Sentinel worker value: when passed to pollset_kick() as the specific worker,
// every worker currently in the pollset is kicked instead of a single one.
41 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
42 
43 struct GlobalRunLoopContext {
44   grpc_core::CondVar init_cv;
45   grpc_core::CondVar input_source_cv;
46 
47   grpc_core::Mutex mu;
48 
49   // Whether an input source registration is pending. Protected by mu.
50   bool input_source_registered = false;
51 
52   // The reference to the global run loop object. Protected by mu.
53   CFRunLoopRef run_loop;
54 
55   // Whether the pollset has been globally shut down. Protected by mu.
56   bool is_shutdown = false;
57 };
58 
59 struct GrpcAppleWorker {
60   // The condition variable to kick the worker. Works with the pollset's lock
61   // (GrpcApplePollset.mu).
62   grpc_core::CondVar cv;
63 
64   // Whether the worker is kicked. Protected by the pollset's lock
65   // (GrpcApplePollset.mu).
66   bool kicked = false;
67 };
68 
69 struct GrpcApplePollset {
70   grpc_core::Mutex mu;
71 
72   // Tracks the current workers in the pollset. Protected by mu.
73   std::list<GrpcAppleWorker*> workers;
74 
75   // Whether the pollset is shut down. Protected by mu.
76   bool is_shutdown = false;
77 
78   // Closure to call when shutdown is done. Protected by mu.
79   grpc_closure* shutdown_closure;
80 
81   // Whether there's an outstanding kick that was not processed. Protected by
82   // mu.
83   bool kicked_without_poller = false;
84 };
85 
86 static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr;
87 static grpc_core::Thread* gGlobalRunLoopThread = nullptr;
88 
89 /// Register the stream with the dispatch queue. Callbacks of the stream will be
90 /// issued to the dispatch queue when a network event happens and will be
91 /// managed by Grand Central Dispatch.
grpc_apple_register_read_stream_queue(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)92 static void grpc_apple_register_read_stream_queue(
93     CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
94   CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
95 }
96 
97 /// Register the stream with the dispatch queue. Callbacks of the stream will be
98 /// issued to the dispatch queue when a network event happens and will be
99 /// managed by Grand Central Dispatch.
grpc_apple_register_write_stream_queue(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)100 static void grpc_apple_register_write_stream_queue(
101     CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
102   CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
103 }
104 
105 /// Register the stream with the global run loop. Callbacks of the stream will
106 /// be issued to the run loop when a network event happens and will be driven by
107 /// the global run loop thread gGlobalRunLoopThread.
grpc_apple_register_read_stream_run_loop(CFReadStreamRef read_stream,dispatch_queue_t)108 static void grpc_apple_register_read_stream_run_loop(
109     CFReadStreamRef read_stream, dispatch_queue_t /*dispatch_queue*/) {
110   GRPC_TRACE_VLOG(apple_polling, 2)
111       << "(polling) Register read stream: " << read_stream;
112   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
113   CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
114                                   kCFRunLoopDefaultMode);
115   gGlobalRunLoopContext->input_source_registered = true;
116   gGlobalRunLoopContext->input_source_cv.Signal();
117 }
118 
119 /// Register the stream with the global run loop. Callbacks of the stream will
120 /// be issued to the run loop when a network event happens, and will be driven
121 /// by the global run loop thread gGlobalRunLoopThread.
grpc_apple_register_write_stream_run_loop(CFWriteStreamRef write_stream,dispatch_queue_t)122 static void grpc_apple_register_write_stream_run_loop(
123     CFWriteStreamRef write_stream, dispatch_queue_t /*dispatch_queue*/) {
124   GRPC_TRACE_VLOG(apple_polling, 2)
125       << "(polling) Register write stream: " << write_stream;
126   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
127   CFWriteStreamScheduleWithRunLoop(
128       write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
129   gGlobalRunLoopContext->input_source_registered = true;
130   gGlobalRunLoopContext->input_source_cv.Signal();
131 }
132 
133 /// The default implementation of stream registration is to register the stream
134 /// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by
135 /// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the
136 /// CFStream streams are registered with the global run loop instead (see
137 /// pollset_global_init below).
138 static void (*grpc_apple_register_read_stream_impl)(
139     CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
140 static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
141                                                      dispatch_queue_t) =
142     grpc_apple_register_write_stream_queue;
143 
grpc_apple_register_read_stream(CFReadStreamRef read_stream,dispatch_queue_t dispatch_queue)144 void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
145                                      dispatch_queue_t dispatch_queue) {
146   grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
147 }
148 
grpc_apple_register_write_stream(CFWriteStreamRef write_stream,dispatch_queue_t dispatch_queue)149 void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
150                                       dispatch_queue_t dispatch_queue) {
151   grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
152 }
153 
154 /// Drive the run loop in a global singleton thread until the global run loop is
155 /// shutdown.
GlobalRunLoopFunc(void *)156 static void GlobalRunLoopFunc(void* /*arg*/) {
157   grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
158   gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
159   gGlobalRunLoopContext->init_cv.Signal();
160 
161   while (!gGlobalRunLoopContext->is_shutdown) {
162     // CFRunLoopRun() will return immediately if no stream is registered on it.
163     // So we wait on a conditional variable until a stream is registered;
164     // otherwise we'll be running a spinning loop.
165     while (!gGlobalRunLoopContext->input_source_registered) {
166       gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
167     }
168     gGlobalRunLoopContext->input_source_registered = false;
169     lock.Release();
170     CFRunLoopRun();
171     lock.Lock();
172   }
173   lock.Release();
174 }
175 
176 // pollset implementation
177 
pollset_global_init(void)178 static void pollset_global_init(void) {
179   gGlobalRunLoopContext = new GlobalRunLoopContext;
180 
181   grpc_apple_register_read_stream_impl =
182       grpc_apple_register_read_stream_run_loop;
183   grpc_apple_register_write_stream_impl =
184       grpc_apple_register_write_stream_run_loop;
185 
186   grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
187   gGlobalRunLoopThread =
188       new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr);
189   gGlobalRunLoopThread->Start();
190   while (gGlobalRunLoopContext->run_loop == NULL)
191     gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
192 }
193 
pollset_global_shutdown(void)194 static void pollset_global_shutdown(void) {
195   {
196     grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
197     gGlobalRunLoopContext->is_shutdown = true;
198     CFRunLoopStop(gGlobalRunLoopContext->run_loop);
199   }
200   gGlobalRunLoopThread->Join();
201   delete gGlobalRunLoopThread;
202   delete gGlobalRunLoopContext;
203 }
204 
205 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
206 /// function. The lock may be temporarily released when waiting on the condition
207 /// variable but will be re-acquired before the function returns.
208 ///
209 /// The Apple pollset simply waits on a condition variable until it is kicked.
210 /// The network events are handled in the global run loop thread. Processing of
211 /// these events will eventually trigger the kick.
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_core::Timestamp deadline)212 static grpc_error_handle pollset_work(grpc_pollset* pollset,
213                                       grpc_pollset_worker** worker,
214                                       grpc_core::Timestamp deadline) {
215   GRPC_TRACE_VLOG(apple_polling, 2)
216       << "(polling) pollset work: " << pollset << ", worker: " << worker
217       << ", deadline: " << deadline.milliseconds_after_process_epoch();
218   GrpcApplePollset* apple_pollset =
219       reinterpret_cast<GrpcApplePollset*>(pollset);
220   GrpcAppleWorker actual_worker;
221   if (worker) {
222     *worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker);
223   }
224 
225   if (apple_pollset->kicked_without_poller) {
226     // Process the outstanding kick and reset the flag. Do not block.
227     apple_pollset->kicked_without_poller = false;
228   } else {
229     // Block until kicked, timed out, or the pollset shuts down.
230     apple_pollset->workers.push_front(&actual_worker);
231     auto it = apple_pollset->workers.begin();
232 
233     while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
234       if (actual_worker.cv.WaitWithDeadline(
235               &apple_pollset->mu, grpc_core::ToAbslTime(deadline.as_timespec(
236                                       GPR_CLOCK_REALTIME)))) {
237         // timed out
238         break;
239       }
240     }
241 
242     apple_pollset->workers.erase(it);
243 
244     // If the pollset is shut down asynchronously and this is the last pending
245     // worker, the shutdown process is complete at this moment and the shutdown
246     // callback will be called.
247     if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
248       grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure,
249                               absl::OkStatus());
250     }
251   }
252 
253   return absl::OkStatus();
254 }
255 
256 /// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu
257 /// before calling this function.
kick_worker(GrpcAppleWorker * worker)258 static void kick_worker(GrpcAppleWorker* worker) {
259   worker->kicked = true;
260   worker->cv.Signal();
261 }
262 
263 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
264 /// function. The kick action simply signals the condition variable of the
265 /// worker.
pollset_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)266 static grpc_error_handle pollset_kick(grpc_pollset* pollset,
267                                       grpc_pollset_worker* specific_worker) {
268   GrpcApplePollset* apple_pollset =
269       reinterpret_cast<GrpcApplePollset*>(pollset);
270 
271   GRPC_TRACE_VLOG(apple_polling, 2) << "(polling) pollset kick: " << pollset
272                                     << ", worker:" << specific_worker;
273 
274   if (specific_worker == nullptr) {
275     if (apple_pollset->workers.empty()) {
276       apple_pollset->kicked_without_poller = true;
277     } else {
278       GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
279       kick_worker(actual_worker);
280     }
281   } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
282     for (auto& actual_worker : apple_pollset->workers) {
283       kick_worker(actual_worker);
284     }
285   } else {
286     GrpcAppleWorker* actual_worker =
287         reinterpret_cast<GrpcAppleWorker*>(specific_worker);
288     kick_worker(actual_worker);
289   }
290 
291   return absl::OkStatus();
292 }
293 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)294 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
295   GRPC_TRACE_VLOG(apple_polling, 2) << "(polling) pollset init: " << pollset;
296   GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
297   *mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu);
298 }
299 
300 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
301 /// function.
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)302 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
303   GRPC_TRACE_VLOG(apple_polling, 2)
304       << "(polling) pollset shutdown: " << pollset;
305 
306   GrpcApplePollset* apple_pollset =
307       reinterpret_cast<GrpcApplePollset*>(pollset);
308   apple_pollset->is_shutdown = true;
309   (void)pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
310 
311   // If there is any worker blocked, shutdown will be done asynchronously.
312   if (apple_pollset->workers.empty()) {
313     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
314   } else {
315     apple_pollset->shutdown_closure = closure;
316   }
317 }
318 
pollset_destroy(grpc_pollset * pollset)319 static void pollset_destroy(grpc_pollset* pollset) {
320   GRPC_TRACE_VLOG(apple_polling, 2) << "(polling) pollset destroy: " << pollset;
321   GrpcApplePollset* apple_pollset =
322       reinterpret_cast<GrpcApplePollset*>(pollset);
323   apple_pollset->~GrpcApplePollset();
324 }
325 
pollset_size(void)326 size_t pollset_size(void) { return sizeof(GrpcApplePollset); }
327 
328 grpc_pollset_vtable grpc_apple_pollset_vtable = {
329     pollset_global_init, pollset_global_shutdown,
330     pollset_init,        pollset_shutdown,
331     pollset_destroy,     pollset_work,
332     pollset_kick,        pollset_size};
333 
334 // pollset_set implementation
335 
pollset_set_create(void)336 grpc_pollset_set* pollset_set_create(void) { return nullptr; }
pollset_set_destroy(grpc_pollset_set *)337 void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {}
pollset_set_add_pollset(grpc_pollset_set *,grpc_pollset *)338 void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/,
339                              grpc_pollset* /*pollset*/) {}
pollset_set_del_pollset(grpc_pollset_set *,grpc_pollset *)340 void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/,
341                              grpc_pollset* /*pollset*/) {}
pollset_set_add_pollset_set(grpc_pollset_set *,grpc_pollset_set *)342 void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
343                                  grpc_pollset_set* /*item*/) {}
pollset_set_del_pollset_set(grpc_pollset_set *,grpc_pollset_set *)344 void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
345                                  grpc_pollset_set* /*item*/) {}
346 
347 grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
348     pollset_set_create,          pollset_set_destroy,
349     pollset_set_add_pollset,     pollset_set_del_pollset,
350     pollset_set_add_pollset_set, pollset_set_del_pollset_set};
351 
352 #endif
353