• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/timer_manager.h"
22 
23 #include <inttypes.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/gprpp/thd.h"
30 #include "src/core/lib/iomgr/timer.h"
31 
// Bookkeeping record for one timer thread. A record is allocated when the
// thread is spawned and passed to the thread as its argument; when the thread
// finishes it is pushed onto g_completed_threads so that the thread can be
// joined and the record freed.
struct completed_thread {
  // handle of the timer thread this record belongs to
  grpc_core::Thread thd;
  // next record in the singly linked list of completed threads
  completed_thread* next;
};
36 
// trace flag that gates every gpr_log call in this file
extern grpc_core::TraceFlag grpc_timer_check_trace;

// global mutex; the functions in this file hold it while touching the state
// below (grpc_timer_manager_init and the test-only wakeup getter excepted)
static gpr_mu g_mu;
// are we multi-threaded
static bool g_threaded;
// cv to wait until a thread is needed
static gpr_cv g_cv_wait;
// cv for notification when threading ends
static gpr_cv g_cv_shutdown;
// number of threads in the system
static int g_thread_count;
// number of threads sitting around waiting
static int g_waiter_count;
// linked list of threads that have completed (and need joining)
static completed_thread* g_completed_threads;
// was the manager kicked by the timer system
static bool g_kicked;
// is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter;
// the deadline of the current timed waiter thread (only relevant if
// g_has_timed_waiter is true)
static grpc_millis g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
// number of timer wakeups (counted when the timed waiter's wait ends; reset
// to zero by stop_threads)
static uint64_t g_wakeups;

// entry point of every timer thread; declared here because
// start_timer_thread_and_unlock() refers to it before its definition
static void timer_thread(void* completed_thread_ptr);
66 
gc_completed_threads(void)67 static void gc_completed_threads(void) {
68   if (g_completed_threads != nullptr) {
69     completed_thread* to_gc = g_completed_threads;
70     g_completed_threads = nullptr;
71     gpr_mu_unlock(&g_mu);
72     while (to_gc != nullptr) {
73       to_gc->thd.Join();
74       completed_thread* next = to_gc->next;
75       gpr_free(to_gc);
76       to_gc = next;
77     }
78     gpr_mu_lock(&g_mu);
79   }
80 }
81 
start_timer_thread_and_unlock(void)82 static void start_timer_thread_and_unlock(void) {
83   GPR_ASSERT(g_threaded);
84   ++g_waiter_count;
85   ++g_thread_count;
86   gpr_mu_unlock(&g_mu);
87   if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
88     gpr_log(GPR_INFO, "Spawn timer thread");
89   }
90   completed_thread* ct =
91       static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
92   ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
93   ct->thd.Start();
94 }
95 
grpc_timer_manager_tick()96 void grpc_timer_manager_tick() {
97   grpc_core::ExecCtx exec_ctx;
98   grpc_timer_check(nullptr);
99 }
100 
run_some_timers()101 static void run_some_timers() {
102   // In the case of timers, the ExecCtx for the thread is declared
103   // in the timer thread itself, but this is the point where we
104   // could start seeing application-level callbacks. No need to
105   // create a new ExecCtx, though, since there already is one and it is
106   // flushed (but not destructed) in this function itself
107   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
108       GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
109 
110   // if there's something to execute...
111   gpr_mu_lock(&g_mu);
112   // remove a waiter from the pool, and start another thread if necessary
113   --g_waiter_count;
114   if (g_waiter_count == 0 && g_threaded) {
115     // The number of timer threads is always increasing until all the threads
116     // are stopped. In rare cases, if a large number of timers fire
117     // simultaneously, we may end up using a large number of threads.
118     start_timer_thread_and_unlock();
119   } else {
120     // if there's no thread waiting with a timeout, kick an existing untimed
121     // waiter so that the next deadline is not missed
122     if (!g_has_timed_waiter) {
123       if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
124         gpr_log(GPR_INFO, "kick untimed waiter");
125       }
126       gpr_cv_signal(&g_cv_wait);
127     }
128     gpr_mu_unlock(&g_mu);
129   }
130   // without our lock, flush the exec_ctx
131   if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
132     gpr_log(GPR_INFO, "flush exec_ctx");
133   }
134   grpc_core::ExecCtx::Get()->Flush();
135   gpr_mu_lock(&g_mu);
136   // garbage collect any threads hanging out that are dead
137   gc_completed_threads();
138   // get ready to wait again
139   ++g_waiter_count;
140   gpr_mu_unlock(&g_mu);
141 }
142 
143 // wait until 'next' (or forever if there is already a timed waiter in the pool)
144 // returns true if the thread should continue executing (false if it should
145 // shutdown)
wait_until(grpc_millis next)146 static bool wait_until(grpc_millis next) {
147   gpr_mu_lock(&g_mu);
148   // if we're not threaded anymore, leave
149   if (!g_threaded) {
150     gpr_mu_unlock(&g_mu);
151     return false;
152   }
153 
154   // If g_kicked is true at this point, it means there was a kick from the timer
155   // system that the timer-manager threads here missed. We cannot trust 'next'
156   // here any longer (since there might be an earlier deadline). So if g_kicked
157   // is true at this point, we should quickly exit this and get the next
158   // deadline from the timer system
159 
160   if (!g_kicked) {
161     // if there's no timed waiter, we should become one: that waiter waits
162     // only until the next timer should expire. All other timers wait forever
163     //
164     // 'g_timed_waiter_generation' is a global generation counter. The idea here
165     // is that the thread becoming a timed-waiter increments and stores this
166     // global counter locally in 'my_timed_waiter_generation' before going to
167     // sleep. After waking up, if my_timed_waiter_generation ==
168     // g_timed_waiter_generation, it can be sure that it was the timed_waiter
169     // thread (and that no other thread took over while this was asleep)
170     //
171     // Initialize my_timed_waiter_generation to some value that is NOT equal to
172     // g_timed_waiter_generation
173     uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
174 
175     /* If there's no timed waiter, we should become one: that waiter waits only
176        until the next timer should expire. All other timer threads wait forever
177        unless their 'next' is earlier than the current timed-waiter's deadline
178        (in which case the thread with earlier 'next' takes over as the new timed
179        waiter) */
180     if (next != GRPC_MILLIS_INF_FUTURE) {
181       if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
182         my_timed_waiter_generation = ++g_timed_waiter_generation;
183         g_has_timed_waiter = true;
184         g_timed_waiter_deadline = next;
185 
186         if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
187           grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now();
188           gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time);
189         }
190       } else {  // g_timed_waiter == true && next >= g_timed_waiter_deadline
191         next = GRPC_MILLIS_INF_FUTURE;
192       }
193     }
194 
195     if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace) &&
196         next == GRPC_MILLIS_INF_FUTURE) {
197       gpr_log(GPR_INFO, "sleep until kicked");
198     }
199 
200     gpr_cv_wait(&g_cv_wait, &g_mu,
201                 grpc_millis_to_timespec(next, GPR_CLOCK_MONOTONIC));
202 
203     if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
204       gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
205               my_timed_waiter_generation == g_timed_waiter_generation,
206               g_kicked);
207     }
208     // if this was the timed waiter, then we need to check timers, and flag
209     // that there's now no timed waiter... we'll look for a replacement if
210     // there's work to do after checking timers (code above)
211     if (my_timed_waiter_generation == g_timed_waiter_generation) {
212       ++g_wakeups;
213       g_has_timed_waiter = false;
214       g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
215     }
216   }
217 
218   // if this was a kick from the timer system, consume it (and don't stop
219   // this thread yet)
220   if (g_kicked) {
221     grpc_timer_consume_kick();
222     g_kicked = false;
223   }
224 
225   gpr_mu_unlock(&g_mu);
226   return true;
227 }
228 
timer_main_loop()229 static void timer_main_loop() {
230   for (;;) {
231     grpc_millis next = GRPC_MILLIS_INF_FUTURE;
232     grpc_core::ExecCtx::Get()->InvalidateNow();
233 
234     // check timer state, updates next to the next time to run a check
235     switch (grpc_timer_check(&next)) {
236       case GRPC_TIMERS_FIRED:
237         run_some_timers();
238         break;
239       case GRPC_TIMERS_NOT_CHECKED:
240         /* This case only happens under contention, meaning more than one timer
241            manager thread checked timers concurrently.
242 
243            If that happens, we're guaranteed that some other thread has just
244            checked timers, and this will avalanche into some other thread seeing
245            empty timers and doing a timed sleep.
246 
247            Consequently, we can just sleep forever here and be happy at some
248            saved wakeup cycles. */
249         if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
250           gpr_log(GPR_INFO, "timers not checked: expect another thread to");
251         }
252         next = GRPC_MILLIS_INF_FUTURE;
253       // fallthrough
254       case GRPC_TIMERS_CHECKED_AND_EMPTY:
255         if (!wait_until(next)) {
256           return;
257         }
258         break;
259     }
260   }
261 }
262 
timer_thread_cleanup(completed_thread * ct)263 static void timer_thread_cleanup(completed_thread* ct) {
264   gpr_mu_lock(&g_mu);
265   // terminate the thread: drop the waiter count, thread count, and let whomever
266   // stopped the threading stuff know that we're done
267   --g_waiter_count;
268   --g_thread_count;
269   if (0 == g_thread_count) {
270     gpr_cv_signal(&g_cv_shutdown);
271   }
272   ct->next = g_completed_threads;
273   g_completed_threads = ct;
274   gpr_mu_unlock(&g_mu);
275   if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
276     gpr_log(GPR_INFO, "End timer thread");
277   }
278 }
279 
timer_thread(void * completed_thread_ptr)280 static void timer_thread(void* completed_thread_ptr) {
281   // this threads exec_ctx: we try to run things through to completion here
282   // since it's easy to spin up new threads
283   grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
284   timer_main_loop();
285 
286   timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
287 }
288 
start_threads(void)289 static void start_threads(void) {
290   gpr_mu_lock(&g_mu);
291   if (!g_threaded) {
292     g_threaded = true;
293     start_timer_thread_and_unlock();
294   } else {
295     gpr_mu_unlock(&g_mu);
296   }
297 }
298 
grpc_timer_manager_init(void)299 void grpc_timer_manager_init(void) {
300   gpr_mu_init(&g_mu);
301   gpr_cv_init(&g_cv_wait);
302   gpr_cv_init(&g_cv_shutdown);
303   g_threaded = false;
304   g_thread_count = 0;
305   g_waiter_count = 0;
306   g_completed_threads = nullptr;
307 
308   g_has_timed_waiter = false;
309   g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
310 
311   start_threads();
312 }
313 
stop_threads(void)314 static void stop_threads(void) {
315   gpr_mu_lock(&g_mu);
316   if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
317     gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
318   }
319   if (g_threaded) {
320     g_threaded = false;
321     gpr_cv_broadcast(&g_cv_wait);
322     if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
323       gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
324     }
325     while (g_thread_count > 0) {
326       gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
327       if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
328         gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
329       }
330       gc_completed_threads();
331     }
332   }
333   g_wakeups = 0;
334   gpr_mu_unlock(&g_mu);
335 }
336 
grpc_timer_manager_shutdown(void)337 void grpc_timer_manager_shutdown(void) {
338   stop_threads();
339 
340   gpr_mu_destroy(&g_mu);
341   gpr_cv_destroy(&g_cv_wait);
342   gpr_cv_destroy(&g_cv_shutdown);
343 }
344 
grpc_timer_manager_set_threading(bool enabled)345 void grpc_timer_manager_set_threading(bool enabled) {
346   if (enabled) {
347     start_threads();
348   } else {
349     stop_threads();
350   }
351 }
352 
grpc_kick_poller(void)353 void grpc_kick_poller(void) {
354   gpr_mu_lock(&g_mu);
355   g_kicked = true;
356   g_has_timed_waiter = false;
357   g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
358   ++g_timed_waiter_generation;
359   gpr_cv_signal(&g_cv_wait);
360   gpr_mu_unlock(&g_mu);
361 }
362 
grpc_timer_manager_get_wakeups_testonly(void)363 uint64_t grpc_timer_manager_get_wakeups_testonly(void) { return g_wakeups; }
364