1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <inttypes.h>
22
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25
26 #include "src/core/lib/debug/trace.h"
27 #include "src/core/lib/gprpp/thd.h"
28 #include "src/core/lib/iomgr/timer.h"
29 #include "src/core/lib/iomgr/timer_manager.h"
30
31 struct completed_thread {
32 grpc_core::Thread thd;
33 completed_thread* next;
34 };
35
36 extern grpc_core::TraceFlag grpc_timer_check_trace;
37
38 // global mutex
39 static gpr_mu g_mu;
40 // are we multi-threaded
41 static bool g_threaded;
42 // cv to wait until a thread is needed
43 static gpr_cv g_cv_wait;
44 // cv for notification when threading ends
45 static gpr_cv g_cv_shutdown;
46 // number of threads in the system
47 static int g_thread_count;
48 // number of threads sitting around waiting
49 static int g_waiter_count;
50 // linked list of threads that have completed (and need joining)
51 static completed_thread* g_completed_threads;
52 // was the manager kicked by the timer system
53 static bool g_kicked;
54 // is there a thread waiting until the next timer should fire?
55 static bool g_has_timed_waiter;
56 // the deadline of the current timed waiter thread (only relevant if
57 // g_has_timed_waiter is true)
58 static grpc_millis g_timed_waiter_deadline;
59 // generation counter to track which thread is waiting for the next timer
60 static uint64_t g_timed_waiter_generation;
61
62 static void timer_thread(void* completed_thread_ptr);
63
gc_completed_threads(void)64 static void gc_completed_threads(void) {
65 if (g_completed_threads != nullptr) {
66 completed_thread* to_gc = g_completed_threads;
67 g_completed_threads = nullptr;
68 gpr_mu_unlock(&g_mu);
69 while (to_gc != nullptr) {
70 to_gc->thd.Join();
71 completed_thread* next = to_gc->next;
72 gpr_free(to_gc);
73 to_gc = next;
74 }
75 gpr_mu_lock(&g_mu);
76 }
77 }
78
start_timer_thread_and_unlock(void)79 static void start_timer_thread_and_unlock(void) {
80 GPR_ASSERT(g_threaded);
81 ++g_waiter_count;
82 ++g_thread_count;
83 gpr_mu_unlock(&g_mu);
84 if (grpc_timer_check_trace.enabled()) {
85 gpr_log(GPR_INFO, "Spawn timer thread");
86 }
87 completed_thread* ct =
88 static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
89 ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
90 ct->thd.Start();
91 }
92
grpc_timer_manager_tick()93 void grpc_timer_manager_tick() {
94 grpc_core::ExecCtx exec_ctx;
95 grpc_millis next = GRPC_MILLIS_INF_FUTURE;
96 grpc_timer_check(&next);
97 }
98
run_some_timers()99 static void run_some_timers() {
100 // if there's something to execute...
101 gpr_mu_lock(&g_mu);
102 // remove a waiter from the pool, and start another thread if necessary
103 --g_waiter_count;
104 if (g_waiter_count == 0 && g_threaded) {
105 start_timer_thread_and_unlock();
106 } else {
107 // if there's no thread waiting with a timeout, kick an existing
108 // waiter so that the next deadline is not missed
109 if (!g_has_timed_waiter) {
110 if (grpc_timer_check_trace.enabled()) {
111 gpr_log(GPR_INFO, "kick untimed waiter");
112 }
113 gpr_cv_signal(&g_cv_wait);
114 }
115 gpr_mu_unlock(&g_mu);
116 }
117 // without our lock, flush the exec_ctx
118 if (grpc_timer_check_trace.enabled()) {
119 gpr_log(GPR_INFO, "flush exec_ctx");
120 }
121 grpc_core::ExecCtx::Get()->Flush();
122 gpr_mu_lock(&g_mu);
123 // garbage collect any threads hanging out that are dead
124 gc_completed_threads();
125 // get ready to wait again
126 ++g_waiter_count;
127 gpr_mu_unlock(&g_mu);
128 }
129
130 // wait until 'next' (or forever if there is already a timed waiter in the pool)
131 // returns true if the thread should continue executing (false if it should
132 // shutdown)
wait_until(grpc_millis next)133 static bool wait_until(grpc_millis next) {
134 gpr_mu_lock(&g_mu);
135 // if we're not threaded anymore, leave
136 if (!g_threaded) {
137 gpr_mu_unlock(&g_mu);
138 return false;
139 }
140
141 // If g_kicked is true at this point, it means there was a kick from the timer
142 // system that the timer-manager threads here missed. We cannot trust 'next'
143 // here any longer (since there might be an earlier deadline). So if g_kicked
144 // is true at this point, we should quickly exit this and get the next
145 // deadline from the timer system
146
147 if (!g_kicked) {
148 // if there's no timed waiter, we should become one: that waiter waits
149 // only until the next timer should expire. All other timers wait forever
150 //
151 // 'g_timed_waiter_generation' is a global generation counter. The idea here
152 // is that the thread becoming a timed-waiter increments and stores this
153 // global counter locally in 'my_timed_waiter_generation' before going to
154 // sleep. After waking up, if my_timed_waiter_generation ==
155 // g_timed_waiter_generation, it can be sure that it was the timed_waiter
156 // thread (and that no other thread took over while this was asleep)
157 //
158 // Initialize my_timed_waiter_generation to some value that is NOT equal to
159 // g_timed_waiter_generation
160 uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
161
162 /* If there's no timed waiter, we should become one: that waiter waits only
163 until the next timer should expire. All other timer threads wait forever
164 unless their 'next' is earlier than the current timed-waiter's deadline
165 (in which case the thread with earlier 'next' takes over as the new timed
166 waiter) */
167 if (next != GRPC_MILLIS_INF_FUTURE) {
168 if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
169 my_timed_waiter_generation = ++g_timed_waiter_generation;
170 g_has_timed_waiter = true;
171 g_timed_waiter_deadline = next;
172
173 if (grpc_timer_check_trace.enabled()) {
174 grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now();
175 gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time);
176 }
177 } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline
178 next = GRPC_MILLIS_INF_FUTURE;
179 }
180 }
181
182 if (grpc_timer_check_trace.enabled() && next == GRPC_MILLIS_INF_FUTURE) {
183 gpr_log(GPR_INFO, "sleep until kicked");
184 }
185
186 gpr_cv_wait(&g_cv_wait, &g_mu,
187 grpc_millis_to_timespec(next, GPR_CLOCK_MONOTONIC));
188
189 if (grpc_timer_check_trace.enabled()) {
190 gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
191 my_timed_waiter_generation == g_timed_waiter_generation,
192 g_kicked);
193 }
194 // if this was the timed waiter, then we need to check timers, and flag
195 // that there's now no timed waiter... we'll look for a replacement if
196 // there's work to do after checking timers (code above)
197 if (my_timed_waiter_generation == g_timed_waiter_generation) {
198 g_has_timed_waiter = false;
199 g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
200 }
201 }
202
203 // if this was a kick from the timer system, consume it (and don't stop
204 // this thread yet)
205 if (g_kicked) {
206 grpc_timer_consume_kick();
207 g_kicked = false;
208 }
209
210 gpr_mu_unlock(&g_mu);
211 return true;
212 }
213
timer_main_loop()214 static void timer_main_loop() {
215 for (;;) {
216 grpc_millis next = GRPC_MILLIS_INF_FUTURE;
217 grpc_core::ExecCtx::Get()->InvalidateNow();
218
219 // check timer state, updates next to the next time to run a check
220 switch (grpc_timer_check(&next)) {
221 case GRPC_TIMERS_FIRED:
222 run_some_timers();
223 break;
224 case GRPC_TIMERS_NOT_CHECKED:
225 /* This case only happens under contention, meaning more than one timer
226 manager thread checked timers concurrently.
227
228 If that happens, we're guaranteed that some other thread has just
229 checked timers, and this will avalanche into some other thread seeing
230 empty timers and doing a timed sleep.
231
232 Consequently, we can just sleep forever here and be happy at some
233 saved wakeup cycles. */
234 if (grpc_timer_check_trace.enabled()) {
235 gpr_log(GPR_INFO, "timers not checked: expect another thread to");
236 }
237 next = GRPC_MILLIS_INF_FUTURE;
238 /* fall through */
239 case GRPC_TIMERS_CHECKED_AND_EMPTY:
240 if (!wait_until(next)) {
241 return;
242 }
243 break;
244 }
245 }
246 }
247
timer_thread_cleanup(completed_thread * ct)248 static void timer_thread_cleanup(completed_thread* ct) {
249 gpr_mu_lock(&g_mu);
250 // terminate the thread: drop the waiter count, thread count, and let whomever
251 // stopped the threading stuff know that we're done
252 --g_waiter_count;
253 --g_thread_count;
254 if (0 == g_thread_count) {
255 gpr_cv_signal(&g_cv_shutdown);
256 }
257 ct->next = g_completed_threads;
258 g_completed_threads = ct;
259 gpr_mu_unlock(&g_mu);
260 if (grpc_timer_check_trace.enabled()) {
261 gpr_log(GPR_INFO, "End timer thread");
262 }
263 }
264
timer_thread(void * completed_thread_ptr)265 static void timer_thread(void* completed_thread_ptr) {
266 // this threads exec_ctx: we try to run things through to completion here
267 // since it's easy to spin up new threads
268 grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
269 timer_main_loop();
270
271 timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
272 }
273
start_threads(void)274 static void start_threads(void) {
275 gpr_mu_lock(&g_mu);
276 if (!g_threaded) {
277 g_threaded = true;
278 start_timer_thread_and_unlock();
279 } else {
280 gpr_mu_unlock(&g_mu);
281 }
282 }
283
grpc_timer_manager_init(void)284 void grpc_timer_manager_init(void) {
285 gpr_mu_init(&g_mu);
286 gpr_cv_init(&g_cv_wait);
287 gpr_cv_init(&g_cv_shutdown);
288 g_threaded = false;
289 g_thread_count = 0;
290 g_waiter_count = 0;
291 g_completed_threads = nullptr;
292
293 g_has_timed_waiter = false;
294 g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
295
296 start_threads();
297 }
298
stop_threads(void)299 static void stop_threads(void) {
300 gpr_mu_lock(&g_mu);
301 if (grpc_timer_check_trace.enabled()) {
302 gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
303 }
304 if (g_threaded) {
305 g_threaded = false;
306 gpr_cv_broadcast(&g_cv_wait);
307 if (grpc_timer_check_trace.enabled()) {
308 gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
309 }
310 while (g_thread_count > 0) {
311 gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
312 if (grpc_timer_check_trace.enabled()) {
313 gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
314 }
315 gc_completed_threads();
316 }
317 }
318 gpr_mu_unlock(&g_mu);
319 }
320
grpc_timer_manager_shutdown(void)321 void grpc_timer_manager_shutdown(void) {
322 stop_threads();
323
324 gpr_mu_destroy(&g_mu);
325 gpr_cv_destroy(&g_cv_wait);
326 gpr_cv_destroy(&g_cv_shutdown);
327 }
328
grpc_timer_manager_set_threading(bool threaded)329 void grpc_timer_manager_set_threading(bool threaded) {
330 if (threaded) {
331 start_threads();
332 } else {
333 stop_threads();
334 }
335 }
336
grpc_kick_poller(void)337 void grpc_kick_poller(void) {
338 gpr_mu_lock(&g_mu);
339 g_kicked = true;
340 g_has_timed_waiter = false;
341 g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
342 ++g_timed_waiter_generation;
343 gpr_cv_signal(&g_cv_wait);
344 gpr_mu_unlock(&g_mu);
345 }
346