• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/alloc.h>
20 #include <grpc/support/cpu.h>
21 #include <grpc/support/port_platform.h>
22 #include <grpc/support/sync.h>
23 #include <inttypes.h>
24 
25 #include <string>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/lib/iomgr/exec_ctx.h"
33 #include "src/core/lib/iomgr/port.h"
34 #include "src/core/lib/iomgr/timer.h"
35 #include "src/core/lib/iomgr/timer_heap.h"
36 #include "src/core/util/crash.h"
37 #include "src/core/util/manual_constructor.h"
38 #include "src/core/util/spinlock.h"
39 #include "src/core/util/time.h"
40 #include "src/core/util/time_averaged_stats.h"
41 #include "src/core/util/useful.h"
42 
43 #define INVALID_HEAP_INDEX 0xffffffffu
44 
45 #define ADD_DEADLINE_SCALE 0.33
46 #define MIN_QUEUE_WINDOW_DURATION 0.01
47 #define MAX_QUEUE_WINDOW_DURATION 1.0
48 
// A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
// deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
// others are maintained in the list (unordered). This helps to keep the number
// of elements in the heap low.
//
// The 'queue_deadline_cap' gets recomputed periodically based on the timer
// stats maintained in 'stats' and the relevant timers are then moved from the
// 'list' to 'heap'.
//
struct timer_shard {
  // Protects every field of this shard (heap, list, cap, stats).
  gpr_mu mu;
  // Averaged (deadline - now) of recently added timers; consumed by
  // refill_heap() to size the next queue window.
  grpc_core::ManualConstructor<grpc_core::TimeAveragedStats> stats;
  // All and only timers with deadlines < this will be in the heap.
  grpc_core::Timestamp queue_deadline_cap;
  // The deadline of the next timer due in this shard.
  grpc_core::Timestamp min_deadline;
  // Index of this timer_shard in the g_shard_queue.
  uint32_t shard_queue_index;
  // This holds all timers with deadlines < queue_deadline_cap. Timers in this
  // list have the top bit of their deadline set to 0.
  grpc_timer_heap heap;
  // This holds timers whose deadline is >= queue_deadline_cap. Sentinel node
  // of a circular doubly-linked list (see list_join/list_remove).
  grpc_timer list;
};
// Number of entries in g_shards / g_shard_queue; set once by timer_list_init()
// (2x the core count, clamped to [1, 32]).
static size_t g_num_shards;

// Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
// is hashed to select the timer shard to add the timer to
static timer_shard* g_shards;

// Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
// the deadline of the next timer in each shard).
// Access to this is protected by g_shared_mutables.mu
static timer_shard** g_shard_queue;
83 
#ifndef NDEBUG

// == DEBUG ONLY: hash table for duplicate timer detection ==
// Chained hash table keyed by the timer's address; used to catch double-add
// and bogus-remove bugs in debug builds only.

#define NUM_HASH_BUCKETS 1009  // Prime number close to 1000

static gpr_mu g_hash_mu[NUM_HASH_BUCKETS];  // One mutex per bucket
// Each bucket is a singly-linked list threaded through hash_table_next.
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr};
92 
init_timer_ht()93 static void init_timer_ht() {
94   for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
95     gpr_mu_init(&g_hash_mu[i]);
96   }
97 }
98 
destroy_timer_ht()99 static void destroy_timer_ht() {
100   for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
101     gpr_mu_destroy(&g_hash_mu[i]);
102   }
103 }
104 
is_in_ht(grpc_timer * t)105 static bool is_in_ht(grpc_timer* t) {
106   size_t i = grpc_core::HashPointer(t, NUM_HASH_BUCKETS);
107 
108   gpr_mu_lock(&g_hash_mu[i]);
109   grpc_timer* p = g_timer_ht[i];
110   while (p != nullptr && p != t) {
111     p = p->hash_table_next;
112   }
113   gpr_mu_unlock(&g_hash_mu[i]);
114 
115   return (p == t);
116 }
117 
add_to_ht(grpc_timer * t)118 static void add_to_ht(grpc_timer* t) {
119   CHECK(!t->hash_table_next);
120   size_t i = grpc_core::HashPointer(t, NUM_HASH_BUCKETS);
121 
122   gpr_mu_lock(&g_hash_mu[i]);
123   grpc_timer* p = g_timer_ht[i];
124   while (p != nullptr && p != t) {
125     p = p->hash_table_next;
126   }
127 
128   if (p == t) {
129     grpc_closure* c = t->closure;
130     grpc_core::Crash(absl::StrFormat(
131         "** Duplicate timer (%p) being added. Closure: (%p), created at: "
132         "(%s:%d), scheduled at: (%s:%d) **",
133         t, c, c->file_created, c->line_created, c->file_initiated,
134         c->line_initiated));
135   }
136 
137   // Timer not present in the bucket. Insert at head of the list
138   t->hash_table_next = g_timer_ht[i];
139   g_timer_ht[i] = t;
140   gpr_mu_unlock(&g_hash_mu[i]);
141 }
142 
remove_from_ht(grpc_timer * t)143 static void remove_from_ht(grpc_timer* t) {
144   size_t i = grpc_core::HashPointer(t, NUM_HASH_BUCKETS);
145   bool removed = false;
146 
147   gpr_mu_lock(&g_hash_mu[i]);
148   if (g_timer_ht[i] == t) {
149     g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
150     removed = true;
151   } else if (g_timer_ht[i] != nullptr) {
152     grpc_timer* p = g_timer_ht[i];
153     while (p->hash_table_next != nullptr && p->hash_table_next != t) {
154       p = p->hash_table_next;
155     }
156 
157     if (p->hash_table_next == t) {
158       p->hash_table_next = t->hash_table_next;
159       removed = true;
160     }
161   }
162   gpr_mu_unlock(&g_hash_mu[i]);
163 
164   if (!removed) {
165     grpc_closure* c = t->closure;
166     grpc_core::Crash(absl::StrFormat(
167         "** Removing timer (%p) that is not added to hash table. Closure "
168         "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
169         t, c, c->file_created, c->line_created, c->file_initiated,
170         c->line_initiated));
171   }
172 
173   t->hash_table_next = nullptr;
174 }
175 
176 // If a timer is added to a timer shard (either heap or a list), it must
177 // be pending. A timer is added to hash table only-if it is added to the
178 // timer shard.
179 // Therefore, if timer->pending is false, it cannot be in hash table
validate_non_pending_timer(grpc_timer * t)180 static void validate_non_pending_timer(grpc_timer* t) {
181   if (!t->pending && is_in_ht(t)) {
182     grpc_closure* c = t->closure;
183     grpc_core::Crash(absl::StrFormat(
184         "** gpr_timer_cancel() called on a non-pending timer (%p) which "
185         "is in the hash table. Closure: (%p), created at: (%s:%d), "
186         "scheduled at: (%s:%d) **",
187         t, c, c->file_created, c->line_created, c->file_initiated,
188         c->line_initiated));
189   }
190 }
191 
192 #define INIT_TIMER_HASH_TABLE() init_timer_ht()
193 #define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
194 #define ADD_TO_HASH_TABLE(t) add_to_ht((t))
195 #define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
196 #define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))
197 
198 #else
199 
200 #define INIT_TIMER_HASH_TABLE()
201 #define DESTROY_TIMER_HASH_TABLE()
202 #define ADD_TO_HASH_TABLE(t)
203 #define REMOVE_FROM_HASH_TABLE(t)
204 #define VALIDATE_NON_PENDING_TIMER(t)
205 
206 #endif
207 
// Thread local variable that stores the deadline of the next timer the thread
// has last-seen. This is an optimization to prevent the thread from checking
// shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
// an expensive operation)
static thread_local int64_t g_last_seen_min_timer;

// Globally shared mutable timer state, cache-line aligned (presumably to keep
// it off neighboring globals' cache lines — see GPR_ALIGN_STRUCT below).
struct shared_mutables {
  // The deadline of the next timer due across all timer shards
  grpc_core::Timestamp min_timer;
  // Allow only one run_some_expired_timers at once
  gpr_spinlock checker_mu;
  bool initialized;
  // Protects g_shard_queue (and the shared_mutables struct itself)
  gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);

static struct shared_mutables g_shared_mutables;

// Forward declaration; defined further below.
static grpc_timer_check_result run_some_expired_timers(
    grpc_core::Timestamp now, grpc_core::Timestamp* next,
    grpc_error_handle error);
229 
compute_min_deadline(timer_shard * shard)230 static grpc_core::Timestamp compute_min_deadline(timer_shard* shard) {
231   return grpc_timer_heap_is_empty(&shard->heap)
232              ? shard->queue_deadline_cap + grpc_core::Duration::Epsilon()
233              : grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
234                    grpc_timer_heap_top(&shard->heap)->deadline);
235 }
236 
timer_list_init()237 static void timer_list_init() {
238   uint32_t i;
239 
240   g_num_shards = grpc_core::Clamp(2 * gpr_cpu_num_cores(), 1u, 32u);
241   g_shards =
242       static_cast<timer_shard*>(gpr_zalloc(g_num_shards * sizeof(*g_shards)));
243   g_shard_queue = static_cast<timer_shard**>(
244       gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)));
245 
246   g_shared_mutables.initialized = true;
247   g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
248   gpr_mu_init(&g_shared_mutables.mu);
249   g_shared_mutables.min_timer = grpc_core::Timestamp::Now();
250 
251   g_last_seen_min_timer = 0;
252 
253   for (i = 0; i < g_num_shards; i++) {
254     timer_shard* shard = &g_shards[i];
255     gpr_mu_init(&shard->mu);
256     shard->stats.Init(1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5);
257     shard->queue_deadline_cap = g_shared_mutables.min_timer;
258     shard->shard_queue_index = i;
259     grpc_timer_heap_init(&shard->heap);
260     shard->list.next = shard->list.prev = &shard->list;
261     shard->min_deadline = compute_min_deadline(shard);
262     g_shard_queue[i] = shard;
263   }
264 
265   INIT_TIMER_HASH_TABLE();
266 }
267 
timer_list_shutdown()268 static void timer_list_shutdown() {
269   size_t i;
270   run_some_expired_timers(grpc_core::Timestamp::InfFuture(), nullptr,
271                           GRPC_ERROR_CREATE("Timer list shutdown"));
272   for (i = 0; i < g_num_shards; i++) {
273     timer_shard* shard = &g_shards[i];
274     gpr_mu_destroy(&shard->mu);
275     grpc_timer_heap_destroy(&shard->heap);
276   }
277   gpr_mu_destroy(&g_shared_mutables.mu);
278   gpr_free(g_shards);
279   gpr_free(g_shard_queue);
280   g_shared_mutables.initialized = false;
281 
282   DESTROY_TIMER_HASH_TABLE();
283 }
284 
285 // returns true if the first element in the list
list_join(grpc_timer * head,grpc_timer * timer)286 static void list_join(grpc_timer* head, grpc_timer* timer) {
287   timer->next = head;
288   timer->prev = head->prev;
289   timer->next->prev = timer->prev->next = timer;
290 }
291 
list_remove(grpc_timer * timer)292 static void list_remove(grpc_timer* timer) {
293   timer->next->prev = timer->prev;
294   timer->prev->next = timer->next;
295 }
296 
swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index)297 static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
298   timer_shard* temp;
299   temp = g_shard_queue[first_shard_queue_index];
300   g_shard_queue[first_shard_queue_index] =
301       g_shard_queue[first_shard_queue_index + 1];
302   g_shard_queue[first_shard_queue_index + 1] = temp;
303   g_shard_queue[first_shard_queue_index]->shard_queue_index =
304       first_shard_queue_index;
305   g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
306       first_shard_queue_index + 1;
307 }
308 
// Restores g_shard_queue's sort order (ascending min_deadline) after
// 'shard''s min_deadline changed, by bubbling the shard toward the front or
// the back one swap at a time.
// REQUIRES: g_shared_mutables.mu held (it guards g_shard_queue).
static void note_deadline_change(timer_shard* shard) {
  // Deadline got earlier: bubble toward the front of the queue.
  while (shard->shard_queue_index > 0 &&
         shard->min_deadline <
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
  }
  // Deadline got later: bubble toward the back.
  while (shard->shard_queue_index < g_num_shards - 1 &&
         shard->min_deadline >
             g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index);
  }
}
321 
// Marks 'timer' as not pending, so a later grpc_timer_cancel() on a timer
// that was never scheduled is a harmless no-op.
void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }
323 
// Schedules 'closure' to run when 'deadline' is reached (or immediately, with
// OkStatus, if it has already passed). The timer lands in the shard selected
// by hashing the timer's own address; if it becomes the new globally earliest
// deadline, the poller is kicked so it can shorten its sleep.
static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
                       grpc_closure* closure) {
  int is_first_timer = 0;
  timer_shard* shard = &g_shards[grpc_core::HashPointer(timer, g_num_shards)];
  timer->closure = closure;
  timer->deadline = deadline.milliseconds_after_process_epoch();

#ifndef NDEBUG
  timer->hash_table_next = nullptr;
#endif

  GRPC_TRACE_VLOG(timer, 2)
      << "TIMER " << timer << ": SET "
      << deadline.milliseconds_after_process_epoch() << " now "
      << grpc_core::Timestamp::Now().milliseconds_after_process_epoch()
      << " call " << closure << "[" << closure->cb << "]";

  if (!g_shared_mutables.initialized) {
    // Timer system not running (or already shut down): fail the closure now.
    timer->pending = false;
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, timer->closure,
        GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
    return;
  }

  gpr_mu_lock(&shard->mu);
  timer->pending = true;
  grpc_core::Timestamp now = grpc_core::Timestamp::Now();
  if (deadline <= now) {
    // Already due: schedule the closure immediately instead of queueing.
    timer->pending = false;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, absl::OkStatus());
    gpr_mu_unlock(&shard->mu);
    // early out
    return;
  }

  // Feed the relative deadline (seconds) into the shard stats; refill_heap()
  // uses the running average to size the next queue window.
  shard->stats->AddSample((deadline - now).millis() / 1000.0);

  ADD_TO_HASH_TABLE(timer);

  if (deadline < shard->queue_deadline_cap) {
    // Near-term timer: goes into the heap. is_first_timer is the heap-add
    // return value — presumably nonzero when this timer becomes the heap's
    // new top; confirm against timer_heap.h.
    is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
  } else {
    // Far-out timer: parked in the unordered list until refill_heap() later
    // moves it under the cap.
    timer->heap_index = INVALID_HEAP_INDEX;
    list_join(&shard->list, timer);
  }
  GRPC_TRACE_VLOG(timer, 2)
      << "  .. add to shard " << (shard - g_shards)
      << " with queue_deadline_cap="
      << shard->queue_deadline_cap.milliseconds_after_process_epoch()
      << " => is_first_timer=" << (is_first_timer ? "true" : "false");
  gpr_mu_unlock(&shard->mu);

  // Deadline may have decreased, we need to adjust the main queue.  Note
  // that there is a potential racy unlocked region here.  There could be a
  // reordering of multiple grpc_timer_init calls, at this point, but the < test
  // below should ensure that we err on the side of caution.  There could
  // also be a race with grpc_timer_check, which might beat us to the lock.  In
  // that case, it is possible that the timer that we added will have already
  // run by the time we hold the lock, but that too is a safe error.
  // Finally, it's possible that the grpc_timer_check that intervened failed to
  // trigger the new timer because the min_deadline hadn't yet been reduced.
  // In that case, the timer will simply have to wait for the next
  // grpc_timer_check.
  if (is_first_timer) {
    gpr_mu_lock(&g_shared_mutables.mu);
    GRPC_TRACE_VLOG(timer, 2)
        << "  .. old shard min_deadline="
        << shard->min_deadline.milliseconds_after_process_epoch();
    if (deadline < shard->min_deadline) {
      grpc_core::Timestamp old_min_deadline = g_shard_queue[0]->min_deadline;
      shard->min_deadline = deadline;
      note_deadline_change(shard);
      if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
#if GPR_ARCH_64
        // TODO(sreek): Using c-style cast here. static_cast<> gives an error
        // (on mac platforms complaining that gpr_atm* is (long *) while
        // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
        // safe since we know that both are pointer types and 64-bit wide.
        gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                                 deadline.milliseconds_after_process_epoch());
#else
        // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
        // types (like grpc_core::Timestamp). So all reads and writes to
        // g_shared_mutables.min_timer variable under g_shared_mutables.mu
        g_shared_mutables.min_timer = deadline;
#endif
        // New globally earliest deadline: wake the poller so it re-evaluates
        // how long to sleep.
        grpc_kick_poller();
      }
    }
    gpr_mu_unlock(&g_shared_mutables.mu);
  }
}
417 
// Called after this thread's poller was kicked: clears the thread-local
// cached minimum deadline so the next timer_check() consults shared state.
static void timer_consume_kick(void) {
  // Force re-evaluation of last seen min
  g_last_seen_min_timer = 0;
}
422 
timer_cancel(grpc_timer * timer)423 static void timer_cancel(grpc_timer* timer) {
424   if (!g_shared_mutables.initialized) {
425     // must have already been cancelled, also the shard mutex is invalid
426     return;
427   }
428 
429   timer_shard* shard = &g_shards[grpc_core::HashPointer(timer, g_num_shards)];
430   gpr_mu_lock(&shard->mu);
431   GRPC_TRACE_VLOG(timer, 2)
432       << "TIMER " << timer
433       << ": CANCEL pending=" << (timer->pending ? "true" : "false");
434 
435   if (timer->pending) {
436     REMOVE_FROM_HASH_TABLE(timer);
437 
438     grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
439                             absl::CancelledError());
440     timer->pending = false;
441     if (timer->heap_index == INVALID_HEAP_INDEX) {
442       list_remove(timer);
443     } else {
444       grpc_timer_heap_remove(&shard->heap, timer);
445     }
446   } else {
447     VALIDATE_NON_PENDING_TIMER(timer);
448   }
449   gpr_mu_unlock(&shard->mu);
450 }
451 
452 // Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
453 // all relevant timers in shard->list (i.e timers with deadlines earlier than
454 // 'queue_deadline_cap') into into shard->heap.
455 // Returns 'true' if shard->heap has at least ONE element
456 // REQUIRES: shard->mu locked
refill_heap(timer_shard * shard,grpc_core::Timestamp now)457 static bool refill_heap(timer_shard* shard, grpc_core::Timestamp now) {
458   // Compute the new queue window width and bound by the limits:
459   double computed_deadline_delta =
460       shard->stats->UpdateAverage() * ADD_DEADLINE_SCALE;
461   double deadline_delta =
462       grpc_core::Clamp(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
463                        MAX_QUEUE_WINDOW_DURATION);
464   grpc_timer *timer, *next;
465 
466   // Compute the new cap and put all timers under it into the queue:
467   shard->queue_deadline_cap =
468       std::max(now, shard->queue_deadline_cap) +
469       grpc_core::Duration::FromSecondsAsDouble(deadline_delta);
470 
471   GRPC_TRACE_VLOG(timer_check, 2)
472       << "  .. shard[" << (shard - g_shards) << "]->queue_deadline_cap --> "
473       << shard->queue_deadline_cap.milliseconds_after_process_epoch();
474   for (timer = shard->list.next; timer != &shard->list; timer = next) {
475     next = timer->next;
476     auto timer_deadline =
477         grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
478             timer->deadline);
479 
480     if (timer_deadline < shard->queue_deadline_cap) {
481       GRPC_TRACE_VLOG(timer_check, 2)
482           << "  .. add timer with deadline "
483           << timer_deadline.milliseconds_after_process_epoch() << " to heap";
484       list_remove(timer);
485       grpc_timer_heap_add(&shard->heap, timer);
486     }
487   }
488   return !grpc_timer_heap_is_empty(&shard->heap);
489 }
490 
491 // This pops the next non-cancelled timer with deadline <= now from the
492 // queue, or returns NULL if there isn't one.
493 // REQUIRES: shard->mu locked
pop_one(timer_shard * shard,grpc_core::Timestamp now)494 static grpc_timer* pop_one(timer_shard* shard, grpc_core::Timestamp now) {
495   grpc_timer* timer;
496   for (;;) {
497     GRPC_TRACE_VLOG(timer_check, 2)
498         << "  .. shard[" << (shard - g_shards) << "]: heap_empty="
499         << (grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
500     if (grpc_timer_heap_is_empty(&shard->heap)) {
501       if (now < shard->queue_deadline_cap) return nullptr;
502       if (!refill_heap(shard, now)) return nullptr;
503     }
504     timer = grpc_timer_heap_top(&shard->heap);
505     auto timer_deadline =
506         grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
507             timer->deadline);
508     GRPC_TRACE_VLOG(timer_check, 2)
509         << "  .. check top timer deadline="
510         << timer_deadline.milliseconds_after_process_epoch()
511         << " now=" << now.milliseconds_after_process_epoch();
512     if (timer_deadline > now) return nullptr;
513     GRPC_TRACE_VLOG(timer, 2) << "TIMER " << timer << ": FIRE "
514                               << (now - timer_deadline).millis() << "ms late";
515     timer->pending = false;
516     grpc_timer_heap_pop(&shard->heap);
517     return timer;
518   }
519 }
520 
521 // REQUIRES: shard->mu unlocked
pop_timers(timer_shard * shard,grpc_core::Timestamp now,grpc_core::Timestamp * new_min_deadline,grpc_error_handle error)522 static size_t pop_timers(timer_shard* shard, grpc_core::Timestamp now,
523                          grpc_core::Timestamp* new_min_deadline,
524                          grpc_error_handle error) {
525   size_t n = 0;
526   grpc_timer* timer;
527   gpr_mu_lock(&shard->mu);
528   while ((timer = pop_one(shard, now))) {
529     REMOVE_FROM_HASH_TABLE(timer);
530     grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, error);
531     n++;
532   }
533   *new_min_deadline = compute_min_deadline(shard);
534   gpr_mu_unlock(&shard->mu);
535   GRPC_TRACE_VLOG(timer_check, 2)
536       << "  .. shard[" << (shard - g_shards) << "] popped " << n;
537   return n;
538 }
539 
// Pops and schedules every timer due at 'now' — draining shards in
// earliest-deadline-first order via g_shard_queue — publishing each closure
// with 'error'. Updates the global minimum deadline and, when 'next' is
// non-null, lowers '*next' to the next known deadline. Only one thread does
// the scan at a time (checker_mu); losers return GRPC_TIMERS_NOT_CHECKED.
static grpc_timer_check_result run_some_expired_timers(
    grpc_core::Timestamp now, grpc_core::Timestamp* next,
    grpc_error_handle error) {
  grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;

#if GPR_ARCH_64
  // TODO(sreek): Using c-style cast here. static_cast<> gives an error (on
  // mac platforms complaining that gpr_atm* is (long *) while
  // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
  // safe since we know that both are pointer types and 64-bit wide
  grpc_core::Timestamp min_timer =
      grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
          gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
#else
  // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
  // (like grpc_core::Timestamp). So all reads and writes to
  // g_shared_mutables.min_timer are done under g_shared_mutables.mu
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_core::Timestamp min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif
  // Refresh this thread's cached view of the global minimum.
  g_last_seen_min_timer = min_timer.milliseconds_after_process_epoch();

  if (now < min_timer) {
    // Nothing can be due anywhere yet.
    if (next != nullptr) *next = std::min(*next, min_timer);
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
    gpr_mu_lock(&g_shared_mutables.mu);
    result = GRPC_TIMERS_CHECKED_AND_EMPTY;

    GRPC_TRACE_VLOG(timer_check, 2)
        << "  .. shard[" << (g_shard_queue[0] - g_shards)
        << "]->min_deadline = "
        << g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch();

    // g_shard_queue[0] always holds the earliest min_deadline. The '== now'
    // clause is skipped when now == InfFuture so that the shutdown sweep
    // (which passes InfFuture) terminates.
    while (g_shard_queue[0]->min_deadline < now ||
           (now != grpc_core::Timestamp::InfFuture() &&
            g_shard_queue[0]->min_deadline == now)) {
      grpc_core::Timestamp new_min_deadline;

      // For efficiency, we pop as many available timers as we can from the
      // shard.  This may violate perfect timer deadline ordering, but that
      // shouldn't be a big deal because we don't make ordering guarantees.
      if (pop_timers(g_shard_queue[0], now, &new_min_deadline, error) > 0) {
        result = GRPC_TIMERS_FIRED;
      }

      GRPC_TRACE_VLOG(timer_check, 2)
          << "  .. result --> " << result << ", shard["
          << (g_shard_queue[0] - g_shards) << "]->min_deadline "
          << g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch()
          << " --> " << new_min_deadline.milliseconds_after_process_epoch()
          << ", now=" << now.milliseconds_after_process_epoch();

      // An grpc_timer_init() on the shard could intervene here, adding a new
      // timer that is earlier than new_min_deadline.  However,
      // grpc_timer_init() will block on the mutex before it can call
      // set_min_deadline, so this one will complete first and then the Addtimer
      // will reduce the min_deadline (perhaps unnecessarily).
      g_shard_queue[0]->min_deadline = new_min_deadline;
      note_deadline_change(g_shard_queue[0]);
    }

    if (next) {
      *next = std::min(*next, g_shard_queue[0]->min_deadline);
    }

#if GPR_ARCH_64
    // TODO(sreek): Using c-style cast here. static_cast<> gives an error (on
    // mac platforms complaining that gpr_atm* is (long *) while
    // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
    // safe since we know that both are pointer types and 64-bit wide
    gpr_atm_no_barrier_store(
        (gpr_atm*)(&g_shared_mutables.min_timer),
        g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch());
#else
    // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
    // types (like grpc_core::Timestamp). So all reads and writes to
    // g_shared_mutables.min_timer are done under g_shared_mutables.mu
    g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
#endif
    gpr_mu_unlock(&g_shared_mutables.mu);
    gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
  }

  return result;
}
629 
// Entry point for checking/firing due timers. Fast path: if 'now' is earlier
// than the thread-locally cached minimum deadline, returns without touching
// any shared state. Otherwise delegates to run_some_expired_timers().
static grpc_timer_check_result timer_check(grpc_core::Timestamp* next) {
  // prelude
  grpc_core::Timestamp now = grpc_core::Timestamp::Now();

  // fetch from a thread-local first: this avoids contention on a globally
  // mutable cacheline in the common case
  grpc_core::Timestamp min_timer =
      grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
          g_last_seen_min_timer);

  if (now < min_timer) {
    if (next != nullptr) {
      *next = std::min(*next, min_timer);
    }
    GRPC_TRACE_VLOG(timer_check, 2)
        << "TIMER CHECK SKIP: now=" << now.milliseconds_after_process_epoch()
        << " min_timer=" << min_timer.milliseconds_after_process_epoch();
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  // now == InfFuture signals shutdown: timers popped then get a shutdown
  // error instead of OkStatus.
  grpc_error_handle shutdown_error =
      now != grpc_core::Timestamp::InfFuture()
          ? absl::OkStatus()
          : GRPC_ERROR_CREATE("Shutting down timer system");

  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(next->milliseconds_after_process_epoch());
    }
#if GPR_ARCH_64
    VLOG(2) << "TIMER CHECK BEGIN: now="
            << now.milliseconds_after_process_epoch() << " next=" << next_str
            << " tls_min=" << min_timer.milliseconds_after_process_epoch()
            << " glob_min="
            << grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
                   gpr_atm_no_barrier_load(
                       (gpr_atm*)(&g_shared_mutables.min_timer)))
                   .milliseconds_after_process_epoch();
#else
    VLOG(2) << "TIMER CHECK BEGIN: now="
            << now.milliseconds_after_process_epoch() << " next=" << next_str
            << " min=" << min_timer.milliseconds_after_process_epoch();
#endif
  }
  // actual code
  grpc_timer_check_result r =
      run_some_expired_timers(now, next, shutdown_error);
  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(next->milliseconds_after_process_epoch());
    }
    VLOG(2) << "TIMER CHECK END: r=" << r << "; next=" << next_str.c_str();
  }
  return r;
}
693 
// Vtable wiring for the generic (heap/list based) timer implementation;
// entries are positional initializers of grpc_timer_vtable.
grpc_timer_vtable grpc_generic_timer_vtable = {
    timer_init,      timer_cancel,        timer_check,
    timer_list_init, timer_list_shutdown, timer_consume_kick};
697