1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/timer_manager.h"

#include <inttypes.h>

#include <new>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/timer.h"
31
32 struct completed_thread {
33 grpc_core::Thread thd;
34 completed_thread* next;
35 };
36
37 extern grpc_core::TraceFlag grpc_timer_check_trace;
38
39 // global mutex
40 static gpr_mu g_mu;
41 // are we multi-threaded
42 static bool g_threaded;
43 // cv to wait until a thread is needed
44 static gpr_cv g_cv_wait;
45 // cv for notification when threading ends
46 static gpr_cv g_cv_shutdown;
47 // number of threads in the system
48 static int g_thread_count;
49 // number of threads sitting around waiting
50 static int g_waiter_count;
51 // linked list of threads that have completed (and need joining)
52 static completed_thread* g_completed_threads;
53 // was the manager kicked by the timer system
54 static bool g_kicked;
55 // is there a thread waiting until the next timer should fire?
56 static bool g_has_timed_waiter;
57 // the deadline of the current timed waiter thread (only relevant if
58 // g_has_timed_waiter is true)
59 static grpc_millis g_timed_waiter_deadline;
60 // generation counter to track which thread is waiting for the next timer
61 static uint64_t g_timed_waiter_generation;
62 // number of timer wakeups
63 static uint64_t g_wakeups;
64
65 static void timer_thread(void* completed_thread_ptr);
66
gc_completed_threads(void)67 static void gc_completed_threads(void) {
68 if (g_completed_threads != nullptr) {
69 completed_thread* to_gc = g_completed_threads;
70 g_completed_threads = nullptr;
71 gpr_mu_unlock(&g_mu);
72 while (to_gc != nullptr) {
73 to_gc->thd.Join();
74 completed_thread* next = to_gc->next;
75 gpr_free(to_gc);
76 to_gc = next;
77 }
78 gpr_mu_lock(&g_mu);
79 }
80 }
81
start_timer_thread_and_unlock(void)82 static void start_timer_thread_and_unlock(void) {
83 GPR_ASSERT(g_threaded);
84 ++g_waiter_count;
85 ++g_thread_count;
86 gpr_mu_unlock(&g_mu);
87 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
88 gpr_log(GPR_INFO, "Spawn timer thread");
89 }
90 completed_thread* ct =
91 static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
92 ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
93 ct->thd.Start();
94 }
95
grpc_timer_manager_tick()96 void grpc_timer_manager_tick() {
97 grpc_core::ExecCtx exec_ctx;
98 grpc_timer_check(nullptr);
99 }
100
run_some_timers()101 static void run_some_timers() {
102 // In the case of timers, the ExecCtx for the thread is declared
103 // in the timer thread itself, but this is the point where we
104 // could start seeing application-level callbacks. No need to
105 // create a new ExecCtx, though, since there already is one and it is
106 // flushed (but not destructed) in this function itself
107 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
108 GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
109
110 // if there's something to execute...
111 gpr_mu_lock(&g_mu);
112 // remove a waiter from the pool, and start another thread if necessary
113 --g_waiter_count;
114 if (g_waiter_count == 0 && g_threaded) {
115 // The number of timer threads is always increasing until all the threads
116 // are stopped. In rare cases, if a large number of timers fire
117 // simultaneously, we may end up using a large number of threads.
118 start_timer_thread_and_unlock();
119 } else {
120 // if there's no thread waiting with a timeout, kick an existing untimed
121 // waiter so that the next deadline is not missed
122 if (!g_has_timed_waiter) {
123 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
124 gpr_log(GPR_INFO, "kick untimed waiter");
125 }
126 gpr_cv_signal(&g_cv_wait);
127 }
128 gpr_mu_unlock(&g_mu);
129 }
130 // without our lock, flush the exec_ctx
131 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
132 gpr_log(GPR_INFO, "flush exec_ctx");
133 }
134 grpc_core::ExecCtx::Get()->Flush();
135 gpr_mu_lock(&g_mu);
136 // garbage collect any threads hanging out that are dead
137 gc_completed_threads();
138 // get ready to wait again
139 ++g_waiter_count;
140 gpr_mu_unlock(&g_mu);
141 }
142
143 // wait until 'next' (or forever if there is already a timed waiter in the pool)
144 // returns true if the thread should continue executing (false if it should
145 // shutdown)
wait_until(grpc_millis next)146 static bool wait_until(grpc_millis next) {
147 gpr_mu_lock(&g_mu);
148 // if we're not threaded anymore, leave
149 if (!g_threaded) {
150 gpr_mu_unlock(&g_mu);
151 return false;
152 }
153
154 // If g_kicked is true at this point, it means there was a kick from the timer
155 // system that the timer-manager threads here missed. We cannot trust 'next'
156 // here any longer (since there might be an earlier deadline). So if g_kicked
157 // is true at this point, we should quickly exit this and get the next
158 // deadline from the timer system
159
160 if (!g_kicked) {
161 // if there's no timed waiter, we should become one: that waiter waits
162 // only until the next timer should expire. All other timers wait forever
163 //
164 // 'g_timed_waiter_generation' is a global generation counter. The idea here
165 // is that the thread becoming a timed-waiter increments and stores this
166 // global counter locally in 'my_timed_waiter_generation' before going to
167 // sleep. After waking up, if my_timed_waiter_generation ==
168 // g_timed_waiter_generation, it can be sure that it was the timed_waiter
169 // thread (and that no other thread took over while this was asleep)
170 //
171 // Initialize my_timed_waiter_generation to some value that is NOT equal to
172 // g_timed_waiter_generation
173 uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
174
175 /* If there's no timed waiter, we should become one: that waiter waits only
176 until the next timer should expire. All other timer threads wait forever
177 unless their 'next' is earlier than the current timed-waiter's deadline
178 (in which case the thread with earlier 'next' takes over as the new timed
179 waiter) */
180 if (next != GRPC_MILLIS_INF_FUTURE) {
181 if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
182 my_timed_waiter_generation = ++g_timed_waiter_generation;
183 g_has_timed_waiter = true;
184 g_timed_waiter_deadline = next;
185
186 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
187 grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now();
188 gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time);
189 }
190 } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline
191 next = GRPC_MILLIS_INF_FUTURE;
192 }
193 }
194
195 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace) &&
196 next == GRPC_MILLIS_INF_FUTURE) {
197 gpr_log(GPR_INFO, "sleep until kicked");
198 }
199
200 gpr_cv_wait(&g_cv_wait, &g_mu,
201 grpc_millis_to_timespec(next, GPR_CLOCK_MONOTONIC));
202
203 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
204 gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
205 my_timed_waiter_generation == g_timed_waiter_generation,
206 g_kicked);
207 }
208 // if this was the timed waiter, then we need to check timers, and flag
209 // that there's now no timed waiter... we'll look for a replacement if
210 // there's work to do after checking timers (code above)
211 if (my_timed_waiter_generation == g_timed_waiter_generation) {
212 ++g_wakeups;
213 g_has_timed_waiter = false;
214 g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
215 }
216 }
217
218 // if this was a kick from the timer system, consume it (and don't stop
219 // this thread yet)
220 if (g_kicked) {
221 grpc_timer_consume_kick();
222 g_kicked = false;
223 }
224
225 gpr_mu_unlock(&g_mu);
226 return true;
227 }
228
timer_main_loop()229 static void timer_main_loop() {
230 for (;;) {
231 grpc_millis next = GRPC_MILLIS_INF_FUTURE;
232 grpc_core::ExecCtx::Get()->InvalidateNow();
233
234 // check timer state, updates next to the next time to run a check
235 switch (grpc_timer_check(&next)) {
236 case GRPC_TIMERS_FIRED:
237 run_some_timers();
238 break;
239 case GRPC_TIMERS_NOT_CHECKED:
240 /* This case only happens under contention, meaning more than one timer
241 manager thread checked timers concurrently.
242
243 If that happens, we're guaranteed that some other thread has just
244 checked timers, and this will avalanche into some other thread seeing
245 empty timers and doing a timed sleep.
246
247 Consequently, we can just sleep forever here and be happy at some
248 saved wakeup cycles. */
249 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
250 gpr_log(GPR_INFO, "timers not checked: expect another thread to");
251 }
252 next = GRPC_MILLIS_INF_FUTURE;
253 // fallthrough
254 case GRPC_TIMERS_CHECKED_AND_EMPTY:
255 if (!wait_until(next)) {
256 return;
257 }
258 break;
259 }
260 }
261 }
262
timer_thread_cleanup(completed_thread * ct)263 static void timer_thread_cleanup(completed_thread* ct) {
264 gpr_mu_lock(&g_mu);
265 // terminate the thread: drop the waiter count, thread count, and let whomever
266 // stopped the threading stuff know that we're done
267 --g_waiter_count;
268 --g_thread_count;
269 if (0 == g_thread_count) {
270 gpr_cv_signal(&g_cv_shutdown);
271 }
272 ct->next = g_completed_threads;
273 g_completed_threads = ct;
274 gpr_mu_unlock(&g_mu);
275 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
276 gpr_log(GPR_INFO, "End timer thread");
277 }
278 }
279
timer_thread(void * completed_thread_ptr)280 static void timer_thread(void* completed_thread_ptr) {
281 // this threads exec_ctx: we try to run things through to completion here
282 // since it's easy to spin up new threads
283 grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
284 timer_main_loop();
285
286 timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
287 }
288
start_threads(void)289 static void start_threads(void) {
290 gpr_mu_lock(&g_mu);
291 if (!g_threaded) {
292 g_threaded = true;
293 start_timer_thread_and_unlock();
294 } else {
295 gpr_mu_unlock(&g_mu);
296 }
297 }
298
grpc_timer_manager_init(void)299 void grpc_timer_manager_init(void) {
300 gpr_mu_init(&g_mu);
301 gpr_cv_init(&g_cv_wait);
302 gpr_cv_init(&g_cv_shutdown);
303 g_threaded = false;
304 g_thread_count = 0;
305 g_waiter_count = 0;
306 g_completed_threads = nullptr;
307
308 g_has_timed_waiter = false;
309 g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
310
311 start_threads();
312 }
313
stop_threads(void)314 static void stop_threads(void) {
315 gpr_mu_lock(&g_mu);
316 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
317 gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
318 }
319 if (g_threaded) {
320 g_threaded = false;
321 gpr_cv_broadcast(&g_cv_wait);
322 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
323 gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
324 }
325 while (g_thread_count > 0) {
326 gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
327 if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
328 gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
329 }
330 gc_completed_threads();
331 }
332 }
333 g_wakeups = 0;
334 gpr_mu_unlock(&g_mu);
335 }
336
grpc_timer_manager_shutdown(void)337 void grpc_timer_manager_shutdown(void) {
338 stop_threads();
339
340 gpr_mu_destroy(&g_mu);
341 gpr_cv_destroy(&g_cv_wait);
342 gpr_cv_destroy(&g_cv_shutdown);
343 }
344
grpc_timer_manager_set_threading(bool enabled)345 void grpc_timer_manager_set_threading(bool enabled) {
346 if (enabled) {
347 start_threads();
348 } else {
349 stop_threads();
350 }
351 }
352
grpc_kick_poller(void)353 void grpc_kick_poller(void) {
354 gpr_mu_lock(&g_mu);
355 g_kicked = true;
356 g_has_timed_waiter = false;
357 g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
358 ++g_timed_waiter_generation;
359 gpr_cv_signal(&g_cv_wait);
360 gpr_mu_unlock(&g_mu);
361 }
362
grpc_timer_manager_get_wakeups_testonly(void)363 uint64_t grpc_timer_manager_get_wakeups_testonly(void) { return g_wakeups; }
364