• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/iomgr/timer_manager.h"
20 
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "src/core/lib/debug/trace.h"
28 #include "src/core/lib/iomgr/timer.h"
29 #include "src/core/util/crash.h"
30 #include "src/core/util/thd.h"
31 
// Bookkeeping node for one timer thread. A node is allocated when its thread
// is spawned, pushed onto g_completed_threads by that thread as it exits, and
// freed once the thread has been joined.
struct completed_thread {
  // handle of the thread this node belongs to
  grpc_core::Thread thd;
  // next node in the singly linked list of completed threads
  completed_thread* next;
};
36 
// global mutex; the timer threads and the start/stop/kick entry points hold it
// while they update the thread bookkeeping and waiter state below
static gpr_mu g_mu;
// are we multi-threaded (set by start_threads, cleared by stop_threads)
static bool g_threaded;
// should grpc_timer_manager_init start the timer threads
static bool g_start_threaded = true;
// cv that idle timer threads sleep on until a thread is needed
static gpr_cv g_cv_wait;
// cv signalled when the last timer thread exits, i.e. when threading ends
static gpr_cv g_cv_shutdown;
// number of timer threads in the system
static int g_thread_count;
// number of timer threads sitting around waiting (not running callbacks)
static int g_waiter_count;
// linked list of threads that have completed (and need joining)
static completed_thread* g_completed_threads;
// was the manager kicked by the timer system (set by grpc_kick_poller, cleared
// in wait_until once the kick has been consumed)
static bool g_kicked;
// is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter;
// the deadline of the current timed waiter thread (only relevant if
// g_has_timed_waiter is true)
static grpc_core::Timestamp g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
// number of timer wakeups: bumped each time the current timed waiter returns
// from its wait, reset to zero by stop_threads
static uint64_t g_wakeups;

// thread entry point; declared here because start_timer_thread_and_unlock
// refers to it before its definition
static void timer_thread(void* completed_thread_ptr);
66 
gc_completed_threads(void)67 static void gc_completed_threads(void) {
68   if (g_completed_threads != nullptr) {
69     completed_thread* to_gc = g_completed_threads;
70     g_completed_threads = nullptr;
71     gpr_mu_unlock(&g_mu);
72     while (to_gc != nullptr) {
73       to_gc->thd.Join();
74       completed_thread* next = to_gc->next;
75       gpr_free(to_gc);
76       to_gc = next;
77     }
78     gpr_mu_lock(&g_mu);
79   }
80 }
81 
start_timer_thread_and_unlock(void)82 static void start_timer_thread_and_unlock(void) {
83   CHECK(g_threaded);
84   ++g_waiter_count;
85   ++g_thread_count;
86   gpr_mu_unlock(&g_mu);
87   GRPC_TRACE_LOG(timer_check, INFO) << "Spawn timer thread";
88   completed_thread* ct =
89       static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
90   ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
91   ct->thd.Start();
92 }
93 
// Runs a single timer check on the calling thread: sets up an ExecCtx on the
// stack and calls grpc_timer_check with a null 'next' out-parameter, so the
// caller does not learn when the next check is due.
void grpc_timer_manager_tick() {
  grpc_core::ExecCtx exec_ctx;
  grpc_timer_check(nullptr);
}
98 
// Called from timer_main_loop when grpc_timer_check reported that timers
// fired. Takes this thread out of the waiter pool, makes sure some other
// thread is available to watch the next deadline, flushes the thread's
// ExecCtx without holding g_mu, and then rejoins the waiter pool.
static void run_some_timers() {
  // In the case of timers, the ExecCtx for the thread is declared
  // in the timer thread itself, but this is the point where we
  // could start seeing application-level callbacks. No need to
  // create a new ExecCtx, though, since there already is one and it is
  // flushed (but not destructed) in this function itself
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
      GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  // if there's something to execute...
  gpr_mu_lock(&g_mu);
  // remove a waiter from the pool, and start another thread if necessary
  --g_waiter_count;
  if (g_waiter_count == 0 && g_threaded) {
    // The number of timer threads is always increasing until all the threads
    // are stopped. In rare cases, if a large number of timers fire
    // simultaneously, we may end up using a large number of threads.
    // (start_timer_thread_and_unlock releases g_mu for us.)
    start_timer_thread_and_unlock();
  } else {
    // if there's no thread waiting with a timeout, kick an existing untimed
    // waiter so that the next deadline is not missed
    if (!g_has_timed_waiter) {
      GRPC_TRACE_LOG(timer_check, INFO) << "kick untimed waiter";
      gpr_cv_signal(&g_cv_wait);
    }
    gpr_mu_unlock(&g_mu);
  }
  // without our lock, flush the exec_ctx
  GRPC_TRACE_LOG(timer_check, INFO) << "flush exec_ctx";
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(&g_mu);
  // garbage collect any threads that are dead (this may drop and re-take g_mu
  // while joining)
  gc_completed_threads();
  // get ready to wait again
  ++g_waiter_count;
  gpr_mu_unlock(&g_mu);
}
136 
// Wait until 'next' (or until signalled, if there is already a timed waiter in
// the pool with a deadline that is not later than 'next').
//
// 'next' is the deadline reported by the timer system;
// grpc_core::Timestamp::InfFuture() means there is no deadline to wait for.
//
// Returns true if the thread should continue executing, false if it should
// shut down (threading was turned off before the wait started). Acquires and
// releases g_mu internally; the caller must not hold it.
static bool wait_until(grpc_core::Timestamp next) {
  gpr_mu_lock(&g_mu);
  // if we're not threaded anymore, leave
  if (!g_threaded) {
    gpr_mu_unlock(&g_mu);
    return false;
  }

  // If g_kicked is true at this point, it means there was a kick from the timer
  // system that the timer-manager threads here missed. We cannot trust 'next'
  // here any longer (since there might be an earlier deadline). So if g_kicked
  // is true at this point, we should quickly exit this and get the next
  // deadline from the timer system

  if (!g_kicked) {
    // 'g_timed_waiter_generation' is a global generation counter. The idea here
    // is that the thread becoming a timed-waiter increments and stores this
    // global counter locally in 'my_timed_waiter_generation' before going to
    // sleep. After waking up, if my_timed_waiter_generation ==
    // g_timed_waiter_generation, it can be sure that it was the timed_waiter
    // thread (and that no other thread took over while this was asleep)
    //
    // Initialize my_timed_waiter_generation to some value that is NOT equal to
    // g_timed_waiter_generation
    uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;

    // If there's no timed waiter, we should become one: that waiter waits only
    // until the next timer should expire. All other timer threads wait forever
    // unless their 'next' is earlier than the current timed-waiter's deadline
    // (in which case the thread with earlier 'next' takes over as the new timed
    // waiter)
    if (next != grpc_core::Timestamp::InfFuture()) {
      if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
        my_timed_waiter_generation = ++g_timed_waiter_generation;
        g_has_timed_waiter = true;
        g_timed_waiter_deadline = next;

        if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
          grpc_core::Duration wait_time = next - grpc_core::Timestamp::Now();
          LOG(INFO) << "sleep for a " << wait_time.millis() << " milliseconds";
        }
      } else {  // g_has_timed_waiter == true && next >= g_timed_waiter_deadline
        // another thread already covers this deadline: wait untimed instead
        next = grpc_core::Timestamp::InfFuture();
      }
    }

    if (GRPC_TRACE_FLAG_ENABLED(timer_check) &&
        next == grpc_core::Timestamp::InfFuture()) {
      LOG(INFO) << "sleep until kicked";
    }

    // g_mu is passed to the wait so it is handed over while this thread sleeps
    gpr_cv_wait(&g_cv_wait, &g_mu, next.as_timespec(GPR_CLOCK_MONOTONIC));

    GRPC_TRACE_LOG(timer_check, INFO)
        << "wait ended: was_timed:"
        << (my_timed_waiter_generation == g_timed_waiter_generation)
        << " kicked:" << g_kicked;
    // if this was the timed waiter, then we need to check timers, and flag
    // that there's now no timed waiter... a replacement is chosen the next
    // time a thread gets here with a finite deadline, and run_some_timers
    // kicks an untimed waiter when it sees that there is no timed waiter
    if (my_timed_waiter_generation == g_timed_waiter_generation) {
      ++g_wakeups;
      g_has_timed_waiter = false;
      g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
    }
  }

  // if this was a kick from the timer system, consume it (and don't stop
  // this thread yet)
  if (g_kicked) {
    grpc_timer_consume_kick();
    g_kicked = false;
  }

  gpr_mu_unlock(&g_mu);
  return true;
}
220 
timer_main_loop()221 static void timer_main_loop() {
222   for (;;) {
223     grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
224     grpc_core::ExecCtx::Get()->InvalidateNow();
225 
226     // check timer state, updates next to the next time to run a check
227     switch (grpc_timer_check(&next)) {
228       case GRPC_TIMERS_FIRED:
229         run_some_timers();
230         break;
231       case GRPC_TIMERS_NOT_CHECKED:
232         // This case only happens under contention, meaning more than one timer
233         // manager thread checked timers concurrently.
234 
235         // If that happens, we're guaranteed that some other thread has just
236         // checked timers, and this will avalanche into some other thread seeing
237         // empty timers and doing a timed sleep.
238 
239         // Consequently, we can just sleep forever here and be happy at some
240         // saved wakeup cycles.
241         GRPC_TRACE_LOG(timer_check, INFO)
242             << "timers not checked: expect another thread to";
243         next = grpc_core::Timestamp::InfFuture();
244         ABSL_FALLTHROUGH_INTENDED;
245       case GRPC_TIMERS_CHECKED_AND_EMPTY:
246         if (!wait_until(next)) {
247           return;
248         }
249         break;
250     }
251   }
252 }
253 
timer_thread_cleanup(completed_thread * ct)254 static void timer_thread_cleanup(completed_thread* ct) {
255   gpr_mu_lock(&g_mu);
256   // terminate the thread: drop the waiter count, thread count, and let whomever
257   // stopped the threading stuff know that we're done
258   --g_waiter_count;
259   --g_thread_count;
260   if (0 == g_thread_count) {
261     gpr_cv_signal(&g_cv_shutdown);
262   }
263   ct->next = g_completed_threads;
264   g_completed_threads = ct;
265   gpr_mu_unlock(&g_mu);
266   GRPC_TRACE_LOG(timer_check, INFO) << "End timer thread";
267 }
268 
timer_thread(void * completed_thread_ptr)269 static void timer_thread(void* completed_thread_ptr) {
270   // this threads exec_ctx: we try to run things through to completion here
271   // since it's easy to spin up new threads
272   grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
273   timer_main_loop();
274 
275   timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
276 }
277 
start_threads(void)278 static void start_threads(void) {
279   gpr_mu_lock(&g_mu);
280   if (!g_threaded) {
281     g_threaded = true;
282     start_timer_thread_and_unlock();
283   } else {
284     gpr_mu_unlock(&g_mu);
285   }
286 }
287 
grpc_timer_manager_init(void)288 void grpc_timer_manager_init(void) {
289   gpr_mu_init(&g_mu);
290   gpr_cv_init(&g_cv_wait);
291   gpr_cv_init(&g_cv_shutdown);
292   g_threaded = false;
293   g_thread_count = 0;
294   g_waiter_count = 0;
295   g_completed_threads = nullptr;
296 
297   g_has_timed_waiter = false;
298   g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
299 
300   if (g_start_threaded) start_threads();
301 }
302 
stop_threads(void)303 static void stop_threads(void) {
304   gpr_mu_lock(&g_mu);
305   GRPC_TRACE_LOG(timer_check, INFO)
306       << "stop timer threads: threaded=" << g_threaded;
307   if (g_threaded) {
308     g_threaded = false;
309     gpr_cv_broadcast(&g_cv_wait);
310     GRPC_TRACE_LOG(timer_check, INFO)
311         << "num timer threads: " << g_thread_count;
312     while (g_thread_count > 0) {
313       gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
314       GRPC_TRACE_LOG(timer_check, INFO)
315           << "num timer threads: " << g_thread_count;
316       gc_completed_threads();
317     }
318   }
319   g_wakeups = 0;
320   gpr_mu_unlock(&g_mu);
321 }
322 
// Shuts the timer manager down: stops the timer threads (stop_threads waits
// for them to exit) and then destroys the mutex and condition variables that
// grpc_timer_manager_init created.
void grpc_timer_manager_shutdown(void) {
  stop_threads();

  gpr_mu_destroy(&g_mu);
  gpr_cv_destroy(&g_cv_wait);
  gpr_cv_destroy(&g_cv_shutdown);
}
330 
grpc_timer_manager_set_threading(bool enabled)331 void grpc_timer_manager_set_threading(bool enabled) {
332   if (enabled) {
333     start_threads();
334   } else {
335     stop_threads();
336   }
337 }
338 
// Records whether grpc_timer_manager_init should start the timer threads.
// Only stores the flag (without taking g_mu); it does not start or stop any
// thread itself.
void grpc_timer_manager_set_start_threaded(bool enabled) {
  g_start_threaded = enabled;
}
342 
grpc_kick_poller(void)343 void grpc_kick_poller(void) {
344   gpr_mu_lock(&g_mu);
345   g_kicked = true;
346   g_has_timed_waiter = false;
347   g_timed_waiter_deadline = grpc_core::Timestamp::InfFuture();
348   ++g_timed_waiter_generation;
349   gpr_cv_signal(&g_cv_wait);
350   gpr_mu_unlock(&g_mu);
351 }
352 
// Test-only accessor: returns the current value of g_wakeups. The counter is
// read without taking g_mu.
uint64_t grpc_timer_manager_get_wakeups_testonly(void) { return g_wakeups; }
354