/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <inttypes.h>

#include "src/core/lib/iomgr/timer.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"

#define INVALID_HEAP_INDEX 0xffffffffu

#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1

grpc_core::TraceFlag grpc_timer_trace(false, "timer");
grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");

/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
 * deadlines earlier than the 'queue_deadline_cap' are maintained in the heap
 * and others are maintained in the list (unordered). This helps to keep the
 * number of elements in the heap low.
 *
 * The 'queue_deadline_cap' gets recomputed periodically based on the timer
 * stats maintained in 'stats', and the relevant timers are then moved from the
 * 'list' to the 'heap'.
 */
typedef struct {
  gpr_mu mu;
  grpc_time_averaged_stats stats;
  /* All and only timers with deadlines < this will be in the heap. */
  grpc_millis queue_deadline_cap;
  /* The deadline of the next timer due in this shard */
  grpc_millis min_deadline;
  /* Index of this timer_shard in the g_shard_queue */
  uint32_t shard_queue_index;
  /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
     heap have the top bit of their deadline set to 0. */
  grpc_timer_heap heap;
  /* This holds timers whose deadline is >= queue_deadline_cap. */
  grpc_timer list;
} timer_shard;

static size_t g_num_shards;

/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
 * is hashed to select the timer shard that it is added to. */
static timer_shard* g_shards;

/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e.
 * the deadline of the next timer in each shard).
 * Access to this is protected by g_shared_mutables.mu */
static timer_shard** g_shard_queue;

#ifndef NDEBUG

/* == Hash table for duplicate timer detection == */

#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */

static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr};

static void init_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_init(&g_hash_mu[i]);
  }
}

static void destroy_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_destroy(&g_hash_mu[i]);
  }
}

static bool is_in_ht(grpc_timer* t) {
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  return (p == t);
}

static void add_to_ht(grpc_timer* t) {
  GPR_ASSERT(!t->hash_table_next);
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }

  if (p == t) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** Duplicate timer (%p) being added. Closure: (%p), created at: "
            "(%s:%d), scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }

  /* Timer not present in the bucket. Insert at head of the list */
  t->hash_table_next = g_timer_ht[i];
  g_timer_ht[i] = t;
  gpr_mu_unlock(&g_hash_mu[i]);
}

static void remove_from_ht(grpc_timer* t) {
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
  bool removed = false;

  gpr_mu_lock(&g_hash_mu[i]);
  if (g_timer_ht[i] == t) {
    g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
    removed = true;
  } else if (g_timer_ht[i] != nullptr) {
    grpc_timer* p = g_timer_ht[i];
    while (p->hash_table_next != nullptr && p->hash_table_next != t) {
      p = p->hash_table_next;
    }

    if (p->hash_table_next == t) {
      p->hash_table_next = t->hash_table_next;
      removed = true;
    }
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  if (!removed) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** Removing timer (%p) that is not added to hash table. Closure "
            "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }

  t->hash_table_next = nullptr;
}

/* If a timer is in a timer shard (either the heap or the list), it must be
 * pending. A timer is added to the hash table only if it is added to a timer
 * shard.
 * Therefore, if timer->pending is false, it cannot be in the hash table. */
static void validate_non_pending_timer(grpc_timer* t) {
  if (!t->pending && is_in_ht(t)) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** gpr_timer_cancel() called on a non-pending timer (%p) which "
            "is in the hash table. Closure: (%p), created at: (%s:%d), "
            "scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }
}

#define INIT_TIMER_HASH_TABLE() init_timer_ht()
#define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
#define ADD_TO_HASH_TABLE(t) add_to_ht((t))
#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))

#else

#define INIT_TIMER_HASH_TABLE()
#define DESTROY_TIMER_HASH_TABLE()
#define ADD_TO_HASH_TABLE(t)
#define REMOVE_FROM_HASH_TABLE(t)
#define VALIDATE_NON_PENDING_TIMER(t)

#endif

#if GPR_ARCH_64
/* NOTE: TODO(sreek) - Currently, the thread local storage support in grpc is
   only for intptr_t, which on 32-bit machines is not wide enough to hold
   grpc_millis (a 64-bit type). Adding thread local support for 64-bit values
   is a lot of work for very little gain, so this optimization is currently
   restricted to 64-bit machines. */

/* Thread local variable that stores the deadline of the next timer that this
 * thread last saw. This is an optimization that saves the thread from checking
 * shared_mutables.min_timer (which requires acquiring the shared_mutables.mu
 * lock, an expensive operation). */
GPR_TLS_DECL(g_last_seen_min_timer);
#endif

struct shared_mutables {
  /* The deadline of the next timer due across all timer shards */
  grpc_millis min_timer;
  /* Allow only one run_some_expired_timers at once */
  gpr_spinlock checker_mu;
  bool initialized;
  /* Protects g_shard_queue (and the shared_mutables struct itself) */
  gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);

static struct shared_mutables g_shared_mutables;

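/* Returns 'a' + 'b', saturating at GRPC_MILLIS_INF_FUTURE instead of
   overflowing. */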
static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
  if (a > GRPC_MILLIS_INF_FUTURE - b) {
    return GRPC_MILLIS_INF_FUTURE;
  }
  return a + b;
}

static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
                                                       grpc_millis* next,
                                                       grpc_error* error);

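/* Returns the deadline of the earliest timer in 'shard': the top of the heap
   if the heap is non-empty, otherwise one past the shard's current
   queue_deadline_cap. */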
static grpc_millis compute_min_deadline(timer_shard* shard) {
  return grpc_timer_heap_is_empty(&shard->heap)
             ? saturating_add(shard->queue_deadline_cap, 1)
             : grpc_timer_heap_top(&shard->heap)->deadline;
}

static void timer_list_init() {
  uint32_t i;

  g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
  g_shards =
      static_cast<timer_shard*>(gpr_zalloc(g_num_shards * sizeof(*g_shards)));
  g_shard_queue = static_cast<timer_shard**>(
      gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)));

  g_shared_mutables.initialized = true;
  g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
  gpr_mu_init(&g_shared_mutables.mu);
  g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();

#if GPR_ARCH_64
  gpr_tls_init(&g_last_seen_min_timer);
  gpr_tls_set(&g_last_seen_min_timer, 0);
#endif

  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_init(&shard->mu);
    grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
                                  0.5);
    shard->queue_deadline_cap = g_shared_mutables.min_timer;
    shard->shard_queue_index = i;
    grpc_timer_heap_init(&shard->heap);
    shard->list.next = shard->list.prev = &shard->list;
    shard->min_deadline = compute_min_deadline(shard);
    g_shard_queue[i] = shard;
  }

  INIT_TIMER_HASH_TABLE();
}

static void timer_list_shutdown() {
  size_t i;
  run_some_expired_timers(
      GRPC_MILLIS_INF_FUTURE, nullptr,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_destroy(&shard->mu);
    grpc_timer_heap_destroy(&shard->heap);
  }
  gpr_mu_destroy(&g_shared_mutables.mu);

#if GPR_ARCH_64
  gpr_tls_destroy(&g_last_seen_min_timer);
#endif

  gpr_free(g_shards);
  gpr_free(g_shard_queue);
  g_shared_mutables.initialized = false;

  DESTROY_TIMER_HASH_TABLE();
}

/* Adds 'timer' to the doubly-linked list whose sentinel head is 'head' (the
   timer is inserted just before 'head'). */
static void list_join(grpc_timer* head, grpc_timer* timer) {
  timer->next = head;
  timer->prev = head->prev;
  timer->next->prev = timer->prev->next = timer;
}

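/* Removes 'timer' from whichever doubly-linked list it is currently on. */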
static void list_remove(grpc_timer* timer) {
  timer->next->prev = timer->prev;
  timer->prev->next = timer->next;
}

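/* Swaps the shards at positions 'first_shard_queue_index' and
   'first_shard_queue_index + 1' in g_shard_queue, keeping their
   shard_queue_index fields in sync. */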
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
  timer_shard* temp;
  temp = g_shard_queue[first_shard_queue_index];
  g_shard_queue[first_shard_queue_index] =
      g_shard_queue[first_shard_queue_index + 1];
  g_shard_queue[first_shard_queue_index + 1] = temp;
  g_shard_queue[first_shard_queue_index]->shard_queue_index =
      first_shard_queue_index;
  g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
      first_shard_queue_index + 1;
}

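/* Restores the min_deadline-sorted order of g_shard_queue after 'shard's
   min_deadline has changed, by bubbling the shard towards the front or back
   of the queue as needed.
   REQUIRES: g_shared_mutables.mu locked (it protects g_shard_queue) */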
static void note_deadline_change(timer_shard* shard) {
  while (shard->shard_queue_index > 0 &&
         shard->min_deadline <
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
  }
  while (shard->shard_queue_index < g_num_shards - 1 &&
         shard->min_deadline >
             g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index);
  }
}

void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }

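/* Starts 'timer': 'closure' will be scheduled exactly once, either when
   'deadline' is reached (immediately, if 'deadline' is already in the past)
   or when the timer is cancelled or the timer list is shut down. */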
static void timer_init(grpc_timer* timer, grpc_millis deadline,
                       grpc_closure* closure) {
  int is_first_timer = 0;
  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
  timer->closure = closure;
  timer->deadline = deadline;

#ifndef NDEBUG
  timer->hash_table_next = nullptr;
#endif

  if (grpc_timer_trace.enabled()) {
    gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
            timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
            closure->cb);
  }

  if (!g_shared_mutables.initialized) {
    timer->pending = false;
    GRPC_CLOSURE_SCHED(timer->closure,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                           "Attempt to create timer before initialization"));
    return;
  }

  gpr_mu_lock(&shard->mu);
  timer->pending = true;
  grpc_millis now = grpc_core::ExecCtx::Get()->Now();
  if (deadline <= now) {
    timer->pending = false;
    GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
    gpr_mu_unlock(&shard->mu);
    /* early out */
    return;
  }

  grpc_time_averaged_stats_add_sample(
      &shard->stats, static_cast<double>(deadline - now) / 1000.0);

  ADD_TO_HASH_TABLE(timer);

  if (deadline < shard->queue_deadline_cap) {
    is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
  } else {
    timer->heap_index = INVALID_HEAP_INDEX;
    list_join(&shard->list, timer);
  }
  if (grpc_timer_trace.enabled()) {
    gpr_log(GPR_INFO,
            "  .. add to shard %d with queue_deadline_cap=%" PRId64
            " => is_first_timer=%s",
            static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
            is_first_timer ? "true" : "false");
  }
  gpr_mu_unlock(&shard->mu);

  /* Deadline may have decreased, we need to adjust the master queue.  Note
     that there is a potential racy unlocked region here.  There could be a
     reordering of multiple grpc_timer_init calls at this point, but the < test
     below should ensure that we err on the side of caution.  There could
     also be a race with grpc_timer_check, which might beat us to the lock.  In
     that case, it is possible that the timer that we added will have already
     run by the time we hold the lock, but that too is a safe error.
     Finally, it's possible that the grpc_timer_check that intervened failed to
     trigger the new timer because the min_deadline hadn't yet been reduced.
     In that case, the timer will simply have to wait for the next
     grpc_timer_check. */
  if (is_first_timer) {
    gpr_mu_lock(&g_shared_mutables.mu);
    if (grpc_timer_trace.enabled()) {
      gpr_log(GPR_INFO, "  .. old shard min_deadline=%" PRId64,
              shard->min_deadline);
    }
    if (deadline < shard->min_deadline) {
      grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
      shard->min_deadline = deadline;
      note_deadline_change(shard);
      if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
#if GPR_ARCH_64
        // TODO: sreek - Using a c-style cast here. static_cast<> gives an
        // error on mac platforms, complaining that gpr_atm* is (long *) while
        // (&g_shared_mutables.min_timer) is a (long long *). The cast should
        // be safe since we know that both are pointer types and 64-bit wide.
        gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                                 deadline);
#else
        // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
        // types (like grpc_millis). So all reads and writes to the
        // g_shared_mutables.min_timer variable are done under
        // g_shared_mutables.mu.
        g_shared_mutables.min_timer = deadline;
#endif
        grpc_kick_poller();
      }
    }
    gpr_mu_unlock(&g_shared_mutables.mu);
  }
}

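/* Invalidates this thread's cached view of the earliest deadline so that the
   next timer check re-reads g_shared_mutables.min_timer. */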
static void timer_consume_kick(void) {
#if GPR_ARCH_64
  /* Force re-evaluation of last seen min */
  gpr_tls_set(&g_last_seen_min_timer, 0);
#endif
}

static void timer_cancel(grpc_timer* timer) {
  if (!g_shared_mutables.initialized) {
    /* must have already been cancelled, also the shard mutex is invalid */
    return;
  }

  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
  gpr_mu_lock(&shard->mu);
  if (grpc_timer_trace.enabled()) {
    gpr_log(GPR_INFO, "TIMER %p: CANCEL pending=%s", timer,
            timer->pending ? "true" : "false");
  }

  if (timer->pending) {
    REMOVE_FROM_HASH_TABLE(timer);

    GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
    timer->pending = false;
    if (timer->heap_index == INVALID_HEAP_INDEX) {
      list_remove(timer);
    } else {
      grpc_timer_heap_remove(&shard->heap, timer);
    }
  } else {
    VALIDATE_NON_PENDING_TIMER(timer);
  }
  gpr_mu_unlock(&shard->mu);
}

/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and
   moving all relevant timers in shard->list (i.e. timers with deadlines
   earlier than 'queue_deadline_cap') into shard->heap.
   Returns 'true' if shard->heap has at least ONE element.
   REQUIRES: shard->mu locked */
static int refill_heap(timer_shard* shard, grpc_millis now) {
  /* Compute the new queue window width and bound by the limits: */
  double computed_deadline_delta =
      grpc_time_averaged_stats_update_average(&shard->stats) *
      ADD_DEADLINE_SCALE;
  double deadline_delta =
      GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
                MAX_QUEUE_WINDOW_DURATION);
  grpc_timer *timer, *next;

  /* Compute the new cap and put all timers under it into the queue: */
  shard->queue_deadline_cap =
      saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
                     static_cast<grpc_millis>(deadline_delta * 1000.0));

  if (grpc_timer_check_trace.enabled()) {
    gpr_log(GPR_INFO, "  .. shard[%d]->queue_deadline_cap --> %" PRId64,
            static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
  }
  for (timer = shard->list.next; timer != &shard->list; timer = next) {
    next = timer->next;

    if (timer->deadline < shard->queue_deadline_cap) {
      if (grpc_timer_check_trace.enabled()) {
        gpr_log(GPR_INFO, "  .. add timer with deadline %" PRId64 " to heap",
                timer->deadline);
      }
      list_remove(timer);
      grpc_timer_heap_add(&shard->heap, timer);
    }
  }
  return !grpc_timer_heap_is_empty(&shard->heap);
}

/* This pops the next non-cancelled timer with deadline <= now from the
   queue, or returns NULL if there isn't one.
   REQUIRES: shard->mu locked */
static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
  grpc_timer* timer;
  for (;;) {
    if (grpc_timer_check_trace.enabled()) {
      gpr_log(GPR_INFO, "  .. shard[%d]: heap_empty=%s",
              static_cast<int>(shard - g_shards),
              grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
    }
    if (grpc_timer_heap_is_empty(&shard->heap)) {
      if (now < shard->queue_deadline_cap) return nullptr;
      if (!refill_heap(shard, now)) return nullptr;
    }
    timer = grpc_timer_heap_top(&shard->heap);
    if (grpc_timer_check_trace.enabled()) {
      gpr_log(GPR_INFO,
              "  .. check top timer deadline=%" PRId64 " now=%" PRId64,
              timer->deadline, now);
    }
    if (timer->deadline > now) return nullptr;
    if (grpc_timer_trace.enabled()) {
      gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler",
              timer, now - timer->deadline,
              timer->closure->scheduler->vtable->name);
    }
    timer->pending = false;
    grpc_timer_heap_pop(&shard->heap);
    return timer;
  }
}

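/* Pops and schedules every timer in 'shard' whose deadline is <= 'now',
   records the shard's new min_deadline in '*new_min_deadline', and returns
   the number of timers popped. */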
/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(timer_shard* shard, grpc_millis now,
                         grpc_millis* new_min_deadline, grpc_error* error) {
  size_t n = 0;
  grpc_timer* timer;
  gpr_mu_lock(&shard->mu);
  while ((timer = pop_one(shard, now))) {
    REMOVE_FROM_HASH_TABLE(timer);
    GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_REF(error));
    n++;
  }
  *new_min_deadline = compute_min_deadline(shard);
  gpr_mu_unlock(&shard->mu);
  if (grpc_timer_check_trace.enabled()) {
    gpr_log(GPR_INFO, "  .. shard[%d] popped %" PRIdPTR,
            static_cast<int>(shard - g_shards), n);
  }
  return n;
}

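/* Checks g_shared_mutables.min_timer and, if any timer may have expired,
   drains expired timers from the shards at the front of g_shard_queue
   (checker_mu allows only one such checker at a time). Updates '*next'
   (if non-NULL) with the earliest deadline still pending and consumes a
   reference to 'error'. */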
static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
                                                       grpc_millis* next,
                                                       grpc_error* error) {
  grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;

#if GPR_ARCH_64
  // TODO: sreek - Using a c-style cast here. static_cast<> gives an error on
  // mac platforms, complaining that gpr_atm* is (long *) while
  // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
  // safe since we know that both are pointer types and 64-bit wide.
  grpc_millis min_timer = static_cast<grpc_millis>(
      gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
  gpr_tls_set(&g_last_seen_min_timer, min_timer);
#else
  // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
  // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
  // are done under g_shared_mutables.mu
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif
  if (now < min_timer) {
    if (next != nullptr) *next = GPR_MIN(*next, min_timer);
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
    gpr_mu_lock(&g_shared_mutables.mu);
    result = GRPC_TIMERS_CHECKED_AND_EMPTY;

    if (grpc_timer_check_trace.enabled()) {
      gpr_log(GPR_INFO, "  .. shard[%d]->min_deadline = %" PRId64,
              static_cast<int>(g_shard_queue[0] - g_shards),
              g_shard_queue[0]->min_deadline);
    }

    while (g_shard_queue[0]->min_deadline < now ||
           (now != GRPC_MILLIS_INF_FUTURE &&
            g_shard_queue[0]->min_deadline == now)) {
      grpc_millis new_min_deadline;

      /* For efficiency, we pop as many available timers as we can from the
         shard.  This may violate perfect timer deadline ordering, but that
         shouldn't be a big deal because we don't make ordering guarantees. */
      if (pop_timers(g_shard_queue[0], now, &new_min_deadline, error) > 0) {
        result = GRPC_TIMERS_FIRED;
      }

      if (grpc_timer_check_trace.enabled()) {
        gpr_log(GPR_INFO,
                "  .. result --> %d"
                ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
                ", now=%" PRId64,
                result, static_cast<int>(g_shard_queue[0] - g_shards),
                g_shard_queue[0]->min_deadline, new_min_deadline, now);
      }

      /* A grpc_timer_init() on the shard could intervene here, adding a new
         timer that is earlier than new_min_deadline.  However,
         grpc_timer_init() will block on g_shared_mutables.mu before it can
         update the min_deadline, so this one will complete first, and the
         later addition will then reduce the min_deadline (perhaps
         unnecessarily). */
      g_shard_queue[0]->min_deadline = new_min_deadline;
      note_deadline_change(g_shard_queue[0]);
    }

    if (next) {
      *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
    }

#if GPR_ARCH_64
    // TODO: sreek - Using a c-style cast here. static_cast<> gives an error
    // on mac platforms, complaining that gpr_atm* is (long *) while
    // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
    // safe since we know that both are pointer types and 64-bit wide.
    gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                             g_shard_queue[0]->min_deadline);
#else
    // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
    // types (like grpc_millis). So all reads and writes to
    // g_shared_mutables.min_timer are done under g_shared_mutables.mu
    g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
#endif
    gpr_mu_unlock(&g_shared_mutables.mu);
    gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
  }

  GRPC_ERROR_UNREF(error);

  return result;
}

static grpc_timer_check_result timer_check(grpc_millis* next) {
  // prelude
  grpc_millis now = grpc_core::ExecCtx::Get()->Now();

#if GPR_ARCH_64
  /* fetch from a thread-local first: this avoids contention on a globally
     mutable cacheline in the common case */
  grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
#else
  // On 32-bit systems, we currently do not have thread local support for 64-bit
  // types. In this case, directly read from g_shared_mutables.min_timer.
  // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
  // on 64-bit types (like grpc_millis). So all reads and writes to
  // g_shared_mutables.min_timer are done under g_shared_mutables.mu
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif

  if (now < min_timer) {
    if (next != nullptr) {
      *next = GPR_MIN(*next, min_timer);
    }
    if (grpc_timer_check_trace.enabled()) {
      gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
              now, min_timer);
    }
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  grpc_error* shutdown_error =
      now != GRPC_MILLIS_INF_FUTURE
          ? GRPC_ERROR_NONE
          : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");

  // tracing
  if (grpc_timer_check_trace.enabled()) {
    char* next_str;
    if (next == nullptr) {
      next_str = gpr_strdup("NULL");
    } else {
      gpr_asprintf(&next_str, "%" PRId64, *next);
    }
#if GPR_ARCH_64
    gpr_log(GPR_INFO,
            "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
            " glob_min=%" PRId64,
            now, next_str, min_timer,
            static_cast<grpc_millis>(gpr_atm_no_barrier_load(
                (gpr_atm*)(&g_shared_mutables.min_timer))));
#else
    gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
            now, next_str, min_timer);
#endif
    gpr_free(next_str);
  }
  // actual code
  grpc_timer_check_result r =
      run_some_expired_timers(now, next, shutdown_error);
  // tracing
  if (grpc_timer_check_trace.enabled()) {
    char* next_str;
    if (next == nullptr) {
      next_str = gpr_strdup("NULL");
    } else {
      gpr_asprintf(&next_str, "%" PRId64, *next);
    }
    gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str);
    gpr_free(next_str);
  }
  return r;
}

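/* The vtable that exposes this generic timer implementation (timer_init,
   timer_cancel, timer_check, etc.) to iomgr. */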
grpc_timer_vtable grpc_generic_timer_vtable = {
    timer_init,      timer_cancel,        timer_check,
    timer_list_init, timer_list_shutdown, timer_consume_kick};