/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <inttypes.h>

#include <string>

#include "absl/strings/str_cat.h"

#include "src/core/lib/iomgr/timer.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"

#define INVALID_HEAP_INDEX 0xffffffffu

#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1

grpc_core::TraceFlag grpc_timer_trace(false, "timer");
grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");

/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
 * deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
 * others are maintained in the list (unordered). This helps to keep the number
 * of elements in the heap low.
 *
 * The 'queue_deadline_cap' gets recomputed periodically based on the timer
 * stats maintained in 'stats' and the relevant timers are then moved from the
 * 'list' to 'heap'.
 */
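/* Illustration: with queue_deadline_cap = 1000, a timer due at 800 is kept in
 * 'heap' (ordered), while one due at 5000 sits in 'list' (unordered). When
 * refill_heap() later advances the cap past 5000, that timer migrates from
 * 'list' to 'heap'. Keeping far-out timers in the unordered list keeps heap
 * operations cheap. */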
struct timer_shard {
  gpr_mu mu;
  grpc_time_averaged_stats stats;
  /* All and only timers with deadlines < this will be in the heap. */
  grpc_millis queue_deadline_cap;
  /* The deadline of the next timer due in this shard. */
  grpc_millis min_deadline;
  /* Index of this timer_shard in the g_shard_queue. */
  uint32_t shard_queue_index;
  /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
     list have the top bit of their deadline set to 0. */
  grpc_timer_heap heap;
  /* This holds timers whose deadline is >= queue_deadline_cap. */
  grpc_timer list;
};
static size_t g_num_shards;

/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
 * is hashed to select the timer shard to add the timer to */
static timer_shard* g_shards;

/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e.
 * the deadline of the next timer in each shard).
 * Access to this is protected by g_shared_mutables.mu */
static timer_shard** g_shard_queue;
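/* Because the queue is kept sorted, g_shard_queue[0] always refers to the
 * shard whose next timer is due soonest; run_some_expired_timers() only ever
 * needs to look at the front of this queue. */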

#ifndef NDEBUG

/* == DEBUG ONLY: hash table for duplicate timer detection == */

#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */

static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr};

static void init_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_init(&g_hash_mu[i]);
  }
}

static void destroy_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_destroy(&g_hash_mu[i]);
  }
}

static bool is_in_ht(grpc_timer* t) {
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  return (p == t);
}

static void add_to_ht(grpc_timer* t) {
  GPR_ASSERT(!t->hash_table_next);
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }

  if (p == t) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** Duplicate timer (%p) being added. Closure: (%p), created at: "
            "(%s:%d), scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }

  /* Timer not present in the bucket. Insert at head of the list */
  t->hash_table_next = g_timer_ht[i];
  g_timer_ht[i] = t;
  gpr_mu_unlock(&g_hash_mu[i]);
}

static void remove_from_ht(grpc_timer* t) {
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
  bool removed = false;

  gpr_mu_lock(&g_hash_mu[i]);
  if (g_timer_ht[i] == t) {
    g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
    removed = true;
  } else if (g_timer_ht[i] != nullptr) {
    grpc_timer* p = g_timer_ht[i];
    while (p->hash_table_next != nullptr && p->hash_table_next != t) {
      p = p->hash_table_next;
    }

    if (p->hash_table_next == t) {
      p->hash_table_next = t->hash_table_next;
      removed = true;
    }
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  if (!removed) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** Removing timer (%p) that is not added to hash table. Closure "
            "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }

  t->hash_table_next = nullptr;
}

/* If a timer is added to a timer shard (either heap or a list), it must
 * be pending. A timer is added to the hash table only if it is added to the
 * timer shard.
 * Therefore, if timer->pending is false, it cannot be in the hash table */
static void validate_non_pending_timer(grpc_timer* t) {
  if (!t->pending && is_in_ht(t)) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** grpc_timer_cancel() called on a non-pending timer (%p) which "
            "is in the hash table. Closure: (%p), created at: (%s:%d), "
            "scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }
}

#define INIT_TIMER_HASH_TABLE() init_timer_ht()
#define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
#define ADD_TO_HASH_TABLE(t) add_to_ht((t))
#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))

#else

#define INIT_TIMER_HASH_TABLE()
#define DESTROY_TIMER_HASH_TABLE()
#define ADD_TO_HASH_TABLE(t)
#define REMOVE_FROM_HASH_TABLE(t)
#define VALIDATE_NON_PENDING_TIMER(t)

#endif
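/* In release (NDEBUG) builds the hooks above expand to nothing, so the
 * duplicate-detection hash table adds no overhead outside debug builds. */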

#if GPR_ARCH_64
/* NOTE: TODO(sreek) - Currently the thread local storage support in grpc is
   for intptr_t which means on 32-bit machines it is not wide enough to hold
   grpc_millis which is 64-bit. Adding thread local support for 64 bit values
   is a lot of work for very little gain. So we are currently restricting this
   optimization to only 64 bit machines */

/* Thread local variable that stores the deadline of the next timer the thread
 * has last-seen. This is an optimization to prevent the thread from checking
 * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
 * an expensive operation) */
GPR_TLS_DECL(g_last_seen_min_timer);
#endif

struct shared_mutables {
  /* The deadline of the next timer due across all timer shards */
  grpc_millis min_timer;
  /* Allow only one run_some_expired_timers at once */
  gpr_spinlock checker_mu;
  bool initialized;
  /* Protects g_shard_queue (and the shared_mutables struct itself) */
  gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);

static struct shared_mutables g_shared_mutables;

static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
  if (a > GRPC_MILLIS_INF_FUTURE - b) {
    return GRPC_MILLIS_INF_FUTURE;
  }
  return a + b;
}
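/* For example, saturating_add(GRPC_MILLIS_INF_FUTURE - 5, 10) yields
 * GRPC_MILLIS_INF_FUTURE rather than overflowing past it. */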

static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
                                                       grpc_millis* next,
                                                       grpc_error* error);

static grpc_millis compute_min_deadline(timer_shard* shard) {
  return grpc_timer_heap_is_empty(&shard->heap)
             ? saturating_add(shard->queue_deadline_cap, 1)
             : grpc_timer_heap_top(&shard->heap)->deadline;
}

static void timer_list_init() {
  uint32_t i;

  g_num_shards = GPR_CLAMP(2 * gpr_cpu_num_cores(), 1, 32);
  g_shards =
      static_cast<timer_shard*>(gpr_zalloc(g_num_shards * sizeof(*g_shards)));
  g_shard_queue = static_cast<timer_shard**>(
      gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)));

  g_shared_mutables.initialized = true;
  g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
  gpr_mu_init(&g_shared_mutables.mu);
  g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();

#if GPR_ARCH_64
  gpr_tls_init(&g_last_seen_min_timer);
  gpr_tls_set(&g_last_seen_min_timer, 0);
#endif

  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_init(&shard->mu);
    grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
                                  0.5);
    shard->queue_deadline_cap = g_shared_mutables.min_timer;
    shard->shard_queue_index = i;
    grpc_timer_heap_init(&shard->heap);
    shard->list.next = shard->list.prev = &shard->list;
    shard->min_deadline = compute_min_deadline(shard);
    g_shard_queue[i] = shard;
  }

  INIT_TIMER_HASH_TABLE();
}

static void timer_list_shutdown() {
  size_t i;
  run_some_expired_timers(
      GRPC_MILLIS_INF_FUTURE, nullptr,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_destroy(&shard->mu);
    grpc_timer_heap_destroy(&shard->heap);
  }
  gpr_mu_destroy(&g_shared_mutables.mu);

#if GPR_ARCH_64
  gpr_tls_destroy(&g_last_seen_min_timer);
#endif

  gpr_free(g_shards);
  gpr_free(g_shard_queue);
  g_shared_mutables.initialized = false;

  DESTROY_TIMER_HASH_TABLE();
}

/* Adds 'timer' to the doubly-linked list rooted at 'head' (the timer is
   spliced in just before the sentinel, i.e. at the tail of the list) */
static void list_join(grpc_timer* head, grpc_timer* timer) {
  timer->next = head;
  timer->prev = head->prev;
  timer->next->prev = timer->prev->next = timer;
}

static void list_remove(grpc_timer* timer) {
  timer->next->prev = timer->prev;
  timer->prev->next = timer->next;
}
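/* Note: 'list' in timer_shard is an intrusive, circular doubly-linked list
 * whose sentinel node is the shard's own 'list' member (initialized with
 * next == prev == &shard->list), so both join and remove are O(1) and need
 * no allocations. */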

static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
  timer_shard* temp;
  temp = g_shard_queue[first_shard_queue_index];
  g_shard_queue[first_shard_queue_index] =
      g_shard_queue[first_shard_queue_index + 1];
  g_shard_queue[first_shard_queue_index + 1] = temp;
  g_shard_queue[first_shard_queue_index]->shard_queue_index =
      first_shard_queue_index;
  g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
      first_shard_queue_index + 1;
}

static void note_deadline_change(timer_shard* shard) {
  while (shard->shard_queue_index > 0 &&
         shard->min_deadline <
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
  }
  while (shard->shard_queue_index < g_num_shards - 1 &&
         shard->min_deadline >
             g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index);
  }
}
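/* note_deadline_change() is effectively one pass of insertion sort: it bubbles
 * the shard toward the front of g_shard_queue while its min_deadline is
 * earlier than its left neighbour's, or toward the back while it is later than
 * its right neighbour's, restoring sorted order after a single shard's
 * deadline changes. */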

void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }

static void timer_init(grpc_timer* timer, grpc_millis deadline,
                       grpc_closure* closure) {
  int is_first_timer = 0;
  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
  timer->closure = closure;
  timer->deadline = deadline;

#ifndef NDEBUG
  timer->hash_table_next = nullptr;
#endif

  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
            timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
            closure->cb);
  }

  if (!g_shared_mutables.initialized) {
    timer->pending = false;
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, timer->closure,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Attempt to create timer before initialization"));
    return;
  }

  gpr_mu_lock(&shard->mu);
  timer->pending = true;
  grpc_millis now = grpc_core::ExecCtx::Get()->Now();
  if (deadline <= now) {
    timer->pending = false;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
    gpr_mu_unlock(&shard->mu);
    /* early out */
    return;
  }

  grpc_time_averaged_stats_add_sample(
      &shard->stats, static_cast<double>(deadline - now) / 1000.0);

  ADD_TO_HASH_TABLE(timer);

  if (deadline < shard->queue_deadline_cap) {
    is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
  } else {
    timer->heap_index = INVALID_HEAP_INDEX;
    list_join(&shard->list, timer);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_INFO,
            "  .. add to shard %d with queue_deadline_cap=%" PRId64
            " => is_first_timer=%s",
            static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
            is_first_timer ? "true" : "false");
  }
  gpr_mu_unlock(&shard->mu);

  /* Deadline may have decreased, so we need to adjust the master queue.  Note
     that there is a potential racy unlocked region here.  There could be a
     reordering of multiple grpc_timer_init calls, at this point, but the < test
     below should ensure that we err on the side of caution.  There could
     also be a race with grpc_timer_check, which might beat us to the lock.  In
     that case, it is possible that the timer that we added will have already
     run by the time we hold the lock, but that too is a safe error.
     Finally, it's possible that the grpc_timer_check that intervened failed to
     trigger the new timer because the min_deadline hadn't yet been reduced.
     In that case, the timer will simply have to wait for the next
     grpc_timer_check. */
  if (is_first_timer) {
    gpr_mu_lock(&g_shared_mutables.mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
      gpr_log(GPR_INFO, "  .. old shard min_deadline=%" PRId64,
              shard->min_deadline);
    }
    if (deadline < shard->min_deadline) {
      grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
      shard->min_deadline = deadline;
      note_deadline_change(shard);
      if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
#if GPR_ARCH_64
        // TODO(sreek): Using c-style cast here. static_cast<> gives an error
        // (on mac platforms complaining that gpr_atm* is (long *) while
        // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
        // safe since we know that both are pointer types and 64-bit wide.
        gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                                 deadline);
#else
        // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
        // types (like grpc_millis). So all reads and writes to the
        // g_shared_mutables.min_timer variable are done under
        // g_shared_mutables.mu.
        g_shared_mutables.min_timer = deadline;
#endif
        grpc_kick_poller();
      }
    }
    gpr_mu_unlock(&g_shared_mutables.mu);
  }
}

static void timer_consume_kick(void) {
#if GPR_ARCH_64
  /* Force re-evaluation of last seen min */
  gpr_tls_set(&g_last_seen_min_timer, 0);
#endif
}

static void timer_cancel(grpc_timer* timer) {
  if (!g_shared_mutables.initialized) {
    /* must have already been cancelled, also the shard mutex is invalid */
    return;
  }

  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
  gpr_mu_lock(&shard->mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_INFO, "TIMER %p: CANCEL pending=%s", timer,
            timer->pending ? "true" : "false");
  }

  if (timer->pending) {
    REMOVE_FROM_HASH_TABLE(timer);

    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
                            GRPC_ERROR_CANCELLED);
    timer->pending = false;
    if (timer->heap_index == INVALID_HEAP_INDEX) {
      list_remove(timer);
    } else {
      grpc_timer_heap_remove(&shard->heap, timer);
    }
  } else {
    VALIDATE_NON_PENDING_TIMER(timer);
  }
  gpr_mu_unlock(&shard->mu);
}
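/* Usage sketch (illustrative only, not part of this file): callers reach
   timer_init()/timer_cancel() through the public wrappers declared in
   "src/core/lib/iomgr/timer.h", roughly:

     static void on_alarm(void* arg, grpc_error* error);  // hypothetical cb

     grpc_timer alarm;
     grpc_closure closure;
     GRPC_CLOSURE_INIT(&closure, on_alarm, nullptr, grpc_schedule_on_exec_ctx);
     grpc_timer_init(&alarm, grpc_core::ExecCtx::Get()->Now() + 100, &closure);
     ...
     grpc_timer_cancel(&alarm);  // if still pending, the closure runs with
                                 // GRPC_ERROR_CANCELLED

   The exact closure/ExecCtx setup varies by call site; this is a sketch of the
   intended calling pattern, not verbatim gRPC code. */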

/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and
   moving all relevant timers in shard->list (i.e. timers with deadlines
   earlier than 'queue_deadline_cap') into shard->heap.
   Returns 'true' if shard->heap has at least ONE element.
   REQUIRES: shard->mu locked */
static bool refill_heap(timer_shard* shard, grpc_millis now) {
  /* Compute the new queue window width and bound by the limits: */
  double computed_deadline_delta =
      grpc_time_averaged_stats_update_average(&shard->stats) *
      ADD_DEADLINE_SCALE;
  double deadline_delta =
      GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
                MAX_QUEUE_WINDOW_DURATION);
  grpc_timer *timer, *next;

  /* Compute the new cap and put all timers under it into the queue: */
  shard->queue_deadline_cap =
      saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
                     static_cast<grpc_millis>(deadline_delta * 1000.0));

  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "  .. shard[%d]->queue_deadline_cap --> %" PRId64,
            static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
  }
  for (timer = shard->list.next; timer != &shard->list; timer = next) {
    next = timer->next;

    if (timer->deadline < shard->queue_deadline_cap) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO, "  .. add timer with deadline %" PRId64 " to heap",
                timer->deadline);
      }
      list_remove(timer);
      grpc_timer_heap_add(&shard->heap, timer);
    }
  }
  return !grpc_timer_heap_is_empty(&shard->heap);
}
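/* Worked example of the window computation above: if the average deadline of
 * recently added timers is roughly 300ms in the future, the stats average is
 * about 0.3s, so computed_deadline_delta ~= 0.3 * 0.33 ~= 0.099s. That lies
 * inside the [MIN_QUEUE_WINDOW_DURATION, MAX_QUEUE_WINDOW_DURATION] =
 * [0.01s, 1s] clamp, so queue_deadline_cap advances by roughly 99ms past
 * max(now, old cap). */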

/* This pops the next non-cancelled timer with deadline <= now from the
   queue, or returns NULL if there isn't one.
   REQUIRES: shard->mu locked */
static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
  grpc_timer* timer;
  for (;;) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "  .. shard[%d]: heap_empty=%s",
              static_cast<int>(shard - g_shards),
              grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
    }
    if (grpc_timer_heap_is_empty(&shard->heap)) {
      if (now < shard->queue_deadline_cap) return nullptr;
      if (!refill_heap(shard, now)) return nullptr;
    }
    timer = grpc_timer_heap_top(&shard->heap);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO,
              "  .. check top timer deadline=%" PRId64 " now=%" PRId64,
              timer->deadline, now);
    }
    if (timer->deadline > now) return nullptr;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
      gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late", timer,
              now - timer->deadline);
    }
    timer->pending = false;
    grpc_timer_heap_pop(&shard->heap);
    return timer;
  }
}

/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(timer_shard* shard, grpc_millis now,
                         grpc_millis* new_min_deadline, grpc_error* error) {
  size_t n = 0;
  grpc_timer* timer;
  gpr_mu_lock(&shard->mu);
  while ((timer = pop_one(shard, now))) {
    REMOVE_FROM_HASH_TABLE(timer);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
                            GRPC_ERROR_REF(error));
    n++;
  }
  *new_min_deadline = compute_min_deadline(shard);
  gpr_mu_unlock(&shard->mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, "  .. shard[%d] popped %" PRIdPTR,
            static_cast<int>(shard - g_shards), n);
  }
  return n;
}

static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
                                                       grpc_millis* next,
                                                       grpc_error* error) {
  grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;

#if GPR_ARCH_64
  // TODO(sreek): Using c-style cast here. static_cast<> gives an error (on
  // mac platforms complaining that gpr_atm* is (long *) while
  // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
  // safe since we know that both are pointer types and 64-bit wide
  grpc_millis min_timer = static_cast<grpc_millis>(
      gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
  gpr_tls_set(&g_last_seen_min_timer, min_timer);
#else
  // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
  // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
  // are done under g_shared_mutables.mu
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif
  if (now < min_timer) {
    if (next != nullptr) *next = GPR_MIN(*next, min_timer);
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
    gpr_mu_lock(&g_shared_mutables.mu);
    result = GRPC_TIMERS_CHECKED_AND_EMPTY;

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "  .. shard[%d]->min_deadline = %" PRId64,
              static_cast<int>(g_shard_queue[0] - g_shards),
              g_shard_queue[0]->min_deadline);
    }

    while (g_shard_queue[0]->min_deadline < now ||
           (now != GRPC_MILLIS_INF_FUTURE &&
            g_shard_queue[0]->min_deadline == now)) {
      grpc_millis new_min_deadline;

      /* For efficiency, we pop as many available timers as we can from the
         shard.  This may violate perfect timer deadline ordering, but that
         shouldn't be a big deal because we don't make ordering guarantees. */
      if (pop_timers(g_shard_queue[0], now, &new_min_deadline, error) > 0) {
        result = GRPC_TIMERS_FIRED;
      }

      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO,
                "  .. result --> %d"
                ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
                ", now=%" PRId64,
                result, static_cast<int>(g_shard_queue[0] - g_shards),
                g_shard_queue[0]->min_deadline, new_min_deadline, now);
      }

      /* A grpc_timer_init() on the shard could intervene here, adding a new
         timer that is earlier than new_min_deadline.  However,
         grpc_timer_init() will block on the master_lock before it can call
         set_min_deadline, so this one will complete first and then the added
         timer will reduce the min_deadline (perhaps unnecessarily). */
      g_shard_queue[0]->min_deadline = new_min_deadline;
      note_deadline_change(g_shard_queue[0]);
    }

    if (next) {
      *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
    }

#if GPR_ARCH_64
    // TODO(sreek): Using c-style cast here. static_cast<> gives an error (on
    // mac platforms complaining that gpr_atm* is (long *) while
    // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
    // safe since we know that both are pointer types and 64-bit wide
    gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                             g_shard_queue[0]->min_deadline);
#else
    // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
    // types (like grpc_millis). So all reads and writes to
    // g_shared_mutables.min_timer are done under g_shared_mutables.mu
    g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
#endif
    gpr_mu_unlock(&g_shared_mutables.mu);
    gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
  }

  GRPC_ERROR_UNREF(error);

  return result;
}

static grpc_timer_check_result timer_check(grpc_millis* next) {
  // prelude
  grpc_millis now = grpc_core::ExecCtx::Get()->Now();

#if GPR_ARCH_64
  /* fetch from a thread-local first: this avoids contention on a globally
     mutable cacheline in the common case */
  grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
#else
  // On 32-bit systems, we currently do not have thread local support for
  // 64-bit types. In this case, directly read from g_shared_mutables.min_timer.
  // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
  // on 64-bit types (like grpc_millis). So all reads and writes to
  // g_shared_mutables.min_timer are done under g_shared_mutables.mu
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif

  if (now < min_timer) {
    if (next != nullptr) {
      *next = GPR_MIN(*next, min_timer);
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
              now, min_timer);
    }
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  grpc_error* shutdown_error =
      now != GRPC_MILLIS_INF_FUTURE
          ? GRPC_ERROR_NONE
          : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");

  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(*next);
    }
#if GPR_ARCH_64
    gpr_log(GPR_INFO,
            "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
            " glob_min=%" PRId64,
            now, next_str.c_str(), min_timer,
            static_cast<grpc_millis>(gpr_atm_no_barrier_load(
                (gpr_atm*)(&g_shared_mutables.min_timer))));
#else
    gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
            now, next_str.c_str(), min_timer);
#endif
  }
  // actual code
  grpc_timer_check_result r =
      run_some_expired_timers(now, next, shutdown_error);
  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(*next);
    }
    gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str.c_str());
  }
  return r;
}

grpc_timer_vtable grpc_generic_timer_vtable = {
    timer_init,      timer_cancel,        timer_check,
    timer_list_init, timer_list_shutdown, timer_consume_kick};
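/* grpc_generic_timer_vtable bundles the functions above into the generic
 * (shard + heap based) timer implementation; the iomgr layer installs it as
 * the active grpc_timer_vtable on platforms that use these generic timers
 * rather than a platform-specific implementation. */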