1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
#include "src/core/lib/iomgr/timer_manager.h"

#include <grpc/support/alloc.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>

#include <new>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/util/crash.h"
#include "src/core/util/thd.h"
31
32 struct completed_thread {
33 grpc_core::Thread thd;
34 completed_thread* next;
35 };
36
37 // global mutex
38 static gpr_mu g_mu;
39 // are we multi-threaded
40 static bool g_threaded;
41 // should we start multi-threaded
42 static bool g_start_threaded = true;
43 // cv to wait until a thread is needed
44 static gpr_cv g_cv_wait;
45 // cv for notification when threading ends
46 static gpr_cv g_cv_shutdown;
47 // number of threads in the system
48 static int g_thread_count;
49 // number of threads sitting around waiting
50 static int g_waiter_count;
51 // linked list of threads that have completed (and need joining)
52 static completed_thread* g_completed_threads;
53 // was the manager kicked by the timer system
54 static bool g_kicked;
55 // is there a thread waiting until the next timer should fire?
56 static bool g_has_timed_waiter;
57 // the deadline of the current timed waiter thread (only relevant if
58 // g_has_timed_waiter is true)
59 static grpc_core::Timestamp g_timed_waiter_deadline;
60 // generation counter to track which thread is waiting for the next timer
61 static uint64_t g_timed_waiter_generation;
62 // number of timer wakeups
63 static uint64_t g_wakeups;
64
65 static void timer_thread(void* completed_thread_ptr);
66
gc_completed_threads(void)67 static void gc_completed_threads(void) {
68 if (g_completed_threads != nullptr) {
69 completed_thread* to_gc = g_completed_threads;
70 g_completed_threads = nullptr;
71 gpr_mu_unlock(&g_mu);
72 while (to_gc != nullptr) {
73 to_gc->thd.Join();
74 completed_thread* next = to_gc->next;
75 gpr_free(to_gc);
76 to_gc = next;
77 }
78 gpr_mu_lock(&g_mu);
79 }
80 }
81
start_timer_thread_and_unlock(void)82 static void start_timer_thread_and_unlock(void) {
83 CHECK(g_threaded);
84 ++g_waiter_count;
85 ++g_thread_count;
86 gpr_mu_unlock(&g_mu);
87 GRPC_TRACE_LOG(timer_check, INFO) << "Spawn timer thread";
88 completed_thread* ct =
89 static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
90 ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
91 ct->thd.Start();
92 }
93
grpc_timer_manager_tick()94 void grpc_timer_manager_tick() {
95 grpc_core::ExecCtx exec_ctx;
96 grpc_timer_check(nullptr);
97 }
98
// Called from timer_main_loop() after grpc_timer_check() reported
// GRPC_TIMERS_FIRED. While this thread does the work it is removed from the
// waiter pool. If that leaves no waiters (and threading is on), a replacement
// thread is spawned; otherwise an untimed waiter is kicked when there is no
// timed waiter. The thread's ExecCtx is then flushed without holding g_mu
// (presumably running the closures grpc_timer_check() scheduled). Finally
// dead threads are reaped and this thread rejoins the waiter pool.
static void run_some_timers() {
  // In the case of timers, the ExecCtx for the thread is declared
  // in the timer thread itself, but this is the point where we
  // could start seeing application-level callbacks. No need to
  // create a new ExecCtx, though, since there already is one and it is
  // flushed (but not destructed) in this function itself
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
      GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  // timers fired, so there is work to execute...
  gpr_mu_lock(&g_mu);
  // remove a waiter from the pool, and start another thread if necessary
  --g_waiter_count;
  if (g_waiter_count == 0 && g_threaded) {
    // The number of timer threads is always increasing until all the threads
    // are stopped. In rare cases, if a large number of timers fire
    // simultaneously, we may end up using a large number of threads.
    // (start_timer_thread_and_unlock() releases g_mu.)
    start_timer_thread_and_unlock();
  } else {
    // if there's no thread waiting with a timeout, kick an existing untimed
    // waiter so that the next deadline is not missed
    if (!g_has_timed_waiter) {
      GRPC_TRACE_LOG(timer_check, INFO) << "kick untimed waiter";
      gpr_cv_signal(&g_cv_wait);
    }
    gpr_mu_unlock(&g_mu);
  }
  // without our lock, flush the exec_ctx
  GRPC_TRACE_LOG(timer_check, INFO) << "flush exec_ctx";
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(&g_mu);
  // garbage collect any threads that are dead (may temporarily drop g_mu)
  gc_completed_threads();
  // get ready to wait again
  ++g_waiter_count;
  gpr_mu_unlock(&g_mu);
}
136
// Parks the calling timer thread until 'next'. The thread instead waits with
// no deadline ("until kicked") when 'next' is InfFuture, or when another
// thread is already the timed waiter with a deadline no later than 'next'.
// The wait also ends when g_cv_wait is signalled or broadcast (by
// grpc_kick_poller(), run_some_timers(), or stop_threads()).
// Returns true if the thread should continue executing, false if threading
// has been turned off and the thread should shut down.
static bool wait_until(grpc_core::Timestamp next) {
  gpr_mu_lock(&g_mu);
  // if we're not threaded anymore, leave
  if (!g_threaded) {
    gpr_mu_unlock(&g_mu);
    return false;
  }

  // If g_kicked is true at this point, it means there was a kick from the timer
  // system that the timer-manager threads here missed. We cannot trust 'next'
  // here any longer (since there might be an earlier deadline). So if g_kicked
  // is true at this point, we should quickly exit this and get the next
  // deadline from the timer system

  if (!g_kicked) {
    // if there's no timed waiter, we should become one: that waiter waits
    // only until the next timer should expire. All other timers wait forever
    //
    // 'g_timed_waiter_generation' is a global generation counter. The idea here
    // is that the thread becoming a timed-waiter increments and stores this
    // global counter locally in 'my_timed_waiter_generation' before going to
    // sleep. After waking up, if my_timed_waiter_generation ==
    // g_timed_waiter_generation, it can be sure that it was the timed_waiter
    // thread (and that no other thread took over while this was asleep)
    //
    // Initialize my_timed_waiter_generation to some value that is NOT equal to
    // g_timed_waiter_generation
    uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;

    // If there's no timed waiter, we should become one: that waiter waits only
    // until the next timer should expire. All other timer threads wait forever
    // unless their 'next' is earlier than the current timed-waiter's deadline
    // (in which case the thread with earlier 'next' takes over as the new timed
    // waiter)
    if (next != grpc_core::Timestamp::InfFuture()) {
      if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
        my_timed_waiter_generation = ++g_timed_waiter_generation;
        g_has_timed_waiter = true;
        g_timed_waiter_deadline = next;

        if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
          grpc_core::Duration wait_time = next - grpc_core::Timestamp::Now();
          LOG(INFO) << "sleep for a " << wait_time.millis() << " milliseconds";
        }
      } else {  // g_has_timed_waiter == true && next >= g_timed_waiter_deadline
        // Someone else already covers an earlier-or-equal deadline; this
        // thread sleeps untimed.
        next = grpc_core::Timestamp::InfFuture();
      }
    }

    if (GRPC_TRACE_FLAG_ENABLED(timer_check) &&
        next == grpc_core::Timestamp::InfFuture()) {
      LOG(INFO) << "sleep until kicked";
    }

    // Releases g_mu while blocked and re-acquires it before returning.
    gpr_cv_wait(&g_cv_wait, &g_mu, next.as_timespec(GPR_CLOCK_MONOTONIC));

    GRPC_TRACE_LOG(timer_check, INFO)
        << "wait ended: was_timed:"
        << (my_timed_waiter_generation == g_timed_waiter_generation)
        << " kicked:" << g_kicked;
    // if this was the timed waiter, then we need to check timers, and flag
    // that there's now no timed waiter... we'll look for a replacement if
    // there's work to do after checking timers (back in timer_main_loop)
    if (my_timed_waiter_generation == g_timed_waiter_generation) {
      ++g_wakeups;
      g_has_timed_waiter = false;
      g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
    }
  }

  // if this was a kick from the timer system, consume it (and don't stop
  // this thread yet)
  if (g_kicked) {
    grpc_timer_consume_kick();
    g_kicked = false;
  }

  gpr_mu_unlock(&g_mu);
  return true;
}
220
timer_main_loop()221 static void timer_main_loop() {
222 for (;;) {
223 grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
224 grpc_core::ExecCtx::Get()->InvalidateNow();
225
226 // check timer state, updates next to the next time to run a check
227 switch (grpc_timer_check(&next)) {
228 case GRPC_TIMERS_FIRED:
229 run_some_timers();
230 break;
231 case GRPC_TIMERS_NOT_CHECKED:
232 // This case only happens under contention, meaning more than one timer
233 // manager thread checked timers concurrently.
234
235 // If that happens, we're guaranteed that some other thread has just
236 // checked timers, and this will avalanche into some other thread seeing
237 // empty timers and doing a timed sleep.
238
239 // Consequently, we can just sleep forever here and be happy at some
240 // saved wakeup cycles.
241 GRPC_TRACE_LOG(timer_check, INFO)
242 << "timers not checked: expect another thread to";
243 next = grpc_core::Timestamp::InfFuture();
244 ABSL_FALLTHROUGH_INTENDED;
245 case GRPC_TIMERS_CHECKED_AND_EMPTY:
246 if (!wait_until(next)) {
247 return;
248 }
249 break;
250 }
251 }
252 }
253
timer_thread_cleanup(completed_thread * ct)254 static void timer_thread_cleanup(completed_thread* ct) {
255 gpr_mu_lock(&g_mu);
256 // terminate the thread: drop the waiter count, thread count, and let whomever
257 // stopped the threading stuff know that we're done
258 --g_waiter_count;
259 --g_thread_count;
260 if (0 == g_thread_count) {
261 gpr_cv_signal(&g_cv_shutdown);
262 }
263 ct->next = g_completed_threads;
264 g_completed_threads = ct;
265 gpr_mu_unlock(&g_mu);
266 GRPC_TRACE_LOG(timer_check, INFO) << "End timer thread";
267 }
268
timer_thread(void * completed_thread_ptr)269 static void timer_thread(void* completed_thread_ptr) {
270 // this threads exec_ctx: we try to run things through to completion here
271 // since it's easy to spin up new threads
272 grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
273 timer_main_loop();
274
275 timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
276 }
277
start_threads(void)278 static void start_threads(void) {
279 gpr_mu_lock(&g_mu);
280 if (!g_threaded) {
281 g_threaded = true;
282 start_timer_thread_and_unlock();
283 } else {
284 gpr_mu_unlock(&g_mu);
285 }
286 }
287
grpc_timer_manager_init(void)288 void grpc_timer_manager_init(void) {
289 gpr_mu_init(&g_mu);
290 gpr_cv_init(&g_cv_wait);
291 gpr_cv_init(&g_cv_shutdown);
292 g_threaded = false;
293 g_thread_count = 0;
294 g_waiter_count = 0;
295 g_completed_threads = nullptr;
296
297 g_has_timed_waiter = false;
298 g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
299
300 if (g_start_threaded) start_threads();
301 }
302
stop_threads(void)303 static void stop_threads(void) {
304 gpr_mu_lock(&g_mu);
305 GRPC_TRACE_LOG(timer_check, INFO)
306 << "stop timer threads: threaded=" << g_threaded;
307 if (g_threaded) {
308 g_threaded = false;
309 gpr_cv_broadcast(&g_cv_wait);
310 GRPC_TRACE_LOG(timer_check, INFO)
311 << "num timer threads: " << g_thread_count;
312 while (g_thread_count > 0) {
313 gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
314 GRPC_TRACE_LOG(timer_check, INFO)
315 << "num timer threads: " << g_thread_count;
316 gc_completed_threads();
317 }
318 }
319 g_wakeups = 0;
320 gpr_mu_unlock(&g_mu);
321 }
322
grpc_timer_manager_shutdown(void)323 void grpc_timer_manager_shutdown(void) {
324 stop_threads();
325
326 gpr_mu_destroy(&g_mu);
327 gpr_cv_destroy(&g_cv_wait);
328 gpr_cv_destroy(&g_cv_shutdown);
329 }
330
grpc_timer_manager_set_threading(bool enabled)331 void grpc_timer_manager_set_threading(bool enabled) {
332 if (enabled) {
333 start_threads();
334 } else {
335 stop_threads();
336 }
337 }
338
grpc_timer_manager_set_start_threaded(bool enabled)339 void grpc_timer_manager_set_start_threaded(bool enabled) {
340 g_start_threaded = enabled;
341 }
342
grpc_kick_poller(void)343 void grpc_kick_poller(void) {
344 gpr_mu_lock(&g_mu);
345 g_kicked = true;
346 g_has_timed_waiter = false;
347 g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
348 ++g_timed_waiter_generation;
349 gpr_cv_signal(&g_cv_wait);
350 gpr_mu_unlock(&g_mu);
351 }
352
grpc_timer_manager_get_wakeups_testonly(void)353 uint64_t grpc_timer_manager_get_wakeups_testonly(void) { return g_wakeups; }
354