1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #include <inttypes.h>
24
25 #include "src/core/lib/iomgr/timer.h"
26
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/cpu.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/sync.h>
32
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/gpr/spinlock.h"
35 #include "src/core/lib/gpr/tls.h"
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/iomgr/time_averaged_stats.h"
39 #include "src/core/lib/iomgr/timer_heap.h"
40
41 #define INVALID_HEAP_INDEX 0xffffffffu
42
43 #define ADD_DEADLINE_SCALE 0.33
44 #define MIN_QUEUE_WINDOW_DURATION 0.01
45 #define MAX_QUEUE_WINDOW_DURATION 1
46
47 grpc_core::TraceFlag grpc_timer_trace(false, "timer");
48 grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");
49
50 /* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
51 * deadlines earlier than 'queue_deadline" cap are maintained in the heap and
52 * others are maintained in the list (unordered). This helps to keep the number
53 * of elements in the heap low.
54 *
55 * The 'queue_deadline_cap' gets recomputed periodically based on the timer
56 * stats maintained in 'stats' and the relevant timers are then moved from the
57 * 'list' to 'heap'
58 */
59 typedef struct {
60 gpr_mu mu;
61 grpc_time_averaged_stats stats;
62 /* All and only timers with deadlines <= this will be in the heap. */
63 grpc_millis queue_deadline_cap;
64 /* The deadline of the next timer due in this shard */
65 grpc_millis min_deadline;
66 /* Index of this timer_shard in the g_shard_queue */
67 uint32_t shard_queue_index;
68 /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
69 list have the top bit of their deadline set to 0. */
70 grpc_timer_heap heap;
71 /* This holds timers whose deadline is >= queue_deadline_cap. */
72 grpc_timer list;
73 } timer_shard;
74
75 static size_t g_num_shards;
76
77 /* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
78 * is hashed to select the timer shard to add the timer to */
79 static timer_shard* g_shards;
80
81 /* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
82 * the deadline of the next timer in each shard).
83 * Access to this is protected by g_shared_mutables.mu */
84 static timer_shard** g_shard_queue;
85
86 #ifndef NDEBUG
87
88 /* == Hash table for duplicate timer detection == */
89
90 #define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */
91
92 static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */
93 static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr};
94
init_timer_ht()95 static void init_timer_ht() {
96 for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
97 gpr_mu_init(&g_hash_mu[i]);
98 }
99 }
100
destroy_timer_ht()101 static void destroy_timer_ht() {
102 for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
103 gpr_mu_destroy(&g_hash_mu[i]);
104 }
105 }
106
is_in_ht(grpc_timer * t)107 static bool is_in_ht(grpc_timer* t) {
108 size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
109
110 gpr_mu_lock(&g_hash_mu[i]);
111 grpc_timer* p = g_timer_ht[i];
112 while (p != nullptr && p != t) {
113 p = p->hash_table_next;
114 }
115 gpr_mu_unlock(&g_hash_mu[i]);
116
117 return (p == t);
118 }
119
add_to_ht(grpc_timer * t)120 static void add_to_ht(grpc_timer* t) {
121 GPR_ASSERT(!t->hash_table_next);
122 size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
123
124 gpr_mu_lock(&g_hash_mu[i]);
125 grpc_timer* p = g_timer_ht[i];
126 while (p != nullptr && p != t) {
127 p = p->hash_table_next;
128 }
129
130 if (p == t) {
131 grpc_closure* c = t->closure;
132 gpr_log(GPR_ERROR,
133 "** Duplicate timer (%p) being added. Closure: (%p), created at: "
134 "(%s:%d), scheduled at: (%s:%d) **",
135 t, c, c->file_created, c->line_created, c->file_initiated,
136 c->line_initiated);
137 abort();
138 }
139
140 /* Timer not present in the bucket. Insert at head of the list */
141 t->hash_table_next = g_timer_ht[i];
142 g_timer_ht[i] = t;
143 gpr_mu_unlock(&g_hash_mu[i]);
144 }
145
remove_from_ht(grpc_timer * t)146 static void remove_from_ht(grpc_timer* t) {
147 size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
148 bool removed = false;
149
150 gpr_mu_lock(&g_hash_mu[i]);
151 if (g_timer_ht[i] == t) {
152 g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
153 removed = true;
154 } else if (g_timer_ht[i] != nullptr) {
155 grpc_timer* p = g_timer_ht[i];
156 while (p->hash_table_next != nullptr && p->hash_table_next != t) {
157 p = p->hash_table_next;
158 }
159
160 if (p->hash_table_next == t) {
161 p->hash_table_next = t->hash_table_next;
162 removed = true;
163 }
164 }
165 gpr_mu_unlock(&g_hash_mu[i]);
166
167 if (!removed) {
168 grpc_closure* c = t->closure;
169 gpr_log(GPR_ERROR,
170 "** Removing timer (%p) that is not added to hash table. Closure "
171 "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
172 t, c, c->file_created, c->line_created, c->file_initiated,
173 c->line_initiated);
174 abort();
175 }
176
177 t->hash_table_next = nullptr;
178 }
179
180 /* If a timer is added to a timer shard (either heap or a list), it cannot
181 * be pending. A timer is added to hash table only-if it is added to the
182 * timer shard.
183 * Therefore, if timer->pending is false, it cannot be in hash table */
validate_non_pending_timer(grpc_timer * t)184 static void validate_non_pending_timer(grpc_timer* t) {
185 if (!t->pending && is_in_ht(t)) {
186 grpc_closure* c = t->closure;
187 gpr_log(GPR_ERROR,
188 "** gpr_timer_cancel() called on a non-pending timer (%p) which "
189 "is in the hash table. Closure: (%p), created at: (%s:%d), "
190 "scheduled at: (%s:%d) **",
191 t, c, c->file_created, c->line_created, c->file_initiated,
192 c->line_initiated);
193 abort();
194 }
195 }
196
197 #define INIT_TIMER_HASH_TABLE() init_timer_ht()
198 #define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
199 #define ADD_TO_HASH_TABLE(t) add_to_ht((t))
200 #define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
201 #define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))
202
203 #else
204
205 #define INIT_TIMER_HASH_TABLE()
206 #define DESTROY_TIMER_HASH_TABLE()
207 #define ADD_TO_HASH_TABLE(t)
208 #define REMOVE_FROM_HASH_TABLE(t)
209 #define VALIDATE_NON_PENDING_TIMER(t)
210
211 #endif
212
213 #if GPR_ARCH_64
214 /* NOTE: TODO(sreek) - Currently the thread local storage support in grpc is
215 for intptr_t which means on 32-bit machines it is not wide enough to hold
216 grpc_millis which is 64-bit. Adding thread local support for 64 bit values
217 is a lot of work for very little gain. So we are currently restricting this
218 optimization to only 64 bit machines */
219
220 /* Thread local variable that stores the deadline of the next timer the thread
221 * has last-seen. This is an optimization to prevent the thread from checking
222 * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
223 * an expensive operation) */
224 GPR_TLS_DECL(g_last_seen_min_timer);
225 #endif
226
227 struct shared_mutables {
228 /* The deadline of the next timer due across all timer shards */
229 grpc_millis min_timer;
230 /* Allow only one run_some_expired_timers at once */
231 gpr_spinlock checker_mu;
232 bool initialized;
233 /* Protects g_shard_queue (and the shared_mutables struct itself) */
234 gpr_mu mu;
235 } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
236
237 static struct shared_mutables g_shared_mutables;
238
saturating_add(grpc_millis a,grpc_millis b)239 static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
240 if (a > GRPC_MILLIS_INF_FUTURE - b) {
241 return GRPC_MILLIS_INF_FUTURE;
242 }
243 return a + b;
244 }
245
246 static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
247 grpc_millis* next,
248 grpc_error* error);
249
compute_min_deadline(timer_shard * shard)250 static grpc_millis compute_min_deadline(timer_shard* shard) {
251 return grpc_timer_heap_is_empty(&shard->heap)
252 ? saturating_add(shard->queue_deadline_cap, 1)
253 : grpc_timer_heap_top(&shard->heap)->deadline;
254 }
255
timer_list_init()256 static void timer_list_init() {
257 uint32_t i;
258
259 g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
260 g_shards =
261 static_cast<timer_shard*>(gpr_zalloc(g_num_shards * sizeof(*g_shards)));
262 g_shard_queue = static_cast<timer_shard**>(
263 gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)));
264
265 g_shared_mutables.initialized = true;
266 g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
267 gpr_mu_init(&g_shared_mutables.mu);
268 g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();
269
270 #if GPR_ARCH_64
271 gpr_tls_init(&g_last_seen_min_timer);
272 gpr_tls_set(&g_last_seen_min_timer, 0);
273 #endif
274
275 for (i = 0; i < g_num_shards; i++) {
276 timer_shard* shard = &g_shards[i];
277 gpr_mu_init(&shard->mu);
278 grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
279 0.5);
280 shard->queue_deadline_cap = g_shared_mutables.min_timer;
281 shard->shard_queue_index = i;
282 grpc_timer_heap_init(&shard->heap);
283 shard->list.next = shard->list.prev = &shard->list;
284 shard->min_deadline = compute_min_deadline(shard);
285 g_shard_queue[i] = shard;
286 }
287
288 INIT_TIMER_HASH_TABLE();
289 }
290
timer_list_shutdown()291 static void timer_list_shutdown() {
292 size_t i;
293 run_some_expired_timers(
294 GRPC_MILLIS_INF_FUTURE, nullptr,
295 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
296 for (i = 0; i < g_num_shards; i++) {
297 timer_shard* shard = &g_shards[i];
298 gpr_mu_destroy(&shard->mu);
299 grpc_timer_heap_destroy(&shard->heap);
300 }
301 gpr_mu_destroy(&g_shared_mutables.mu);
302
303 #if GPR_ARCH_64
304 gpr_tls_destroy(&g_last_seen_min_timer);
305 #endif
306
307 gpr_free(g_shards);
308 gpr_free(g_shard_queue);
309 g_shared_mutables.initialized = false;
310
311 DESTROY_TIMER_HASH_TABLE();
312 }
313
314 /* returns true if the first element in the list */
list_join(grpc_timer * head,grpc_timer * timer)315 static void list_join(grpc_timer* head, grpc_timer* timer) {
316 timer->next = head;
317 timer->prev = head->prev;
318 timer->next->prev = timer->prev->next = timer;
319 }
320
list_remove(grpc_timer * timer)321 static void list_remove(grpc_timer* timer) {
322 timer->next->prev = timer->prev;
323 timer->prev->next = timer->next;
324 }
325
swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index)326 static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
327 timer_shard* temp;
328 temp = g_shard_queue[first_shard_queue_index];
329 g_shard_queue[first_shard_queue_index] =
330 g_shard_queue[first_shard_queue_index + 1];
331 g_shard_queue[first_shard_queue_index + 1] = temp;
332 g_shard_queue[first_shard_queue_index]->shard_queue_index =
333 first_shard_queue_index;
334 g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
335 first_shard_queue_index + 1;
336 }
337
note_deadline_change(timer_shard * shard)338 static void note_deadline_change(timer_shard* shard) {
339 while (shard->shard_queue_index > 0 &&
340 shard->min_deadline <
341 g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
342 swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
343 }
344 while (shard->shard_queue_index < g_num_shards - 1 &&
345 shard->min_deadline >
346 g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
347 swap_adjacent_shards_in_queue(shard->shard_queue_index);
348 }
349 }
350
grpc_timer_init_unset(grpc_timer * timer)351 void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }
352
timer_init(grpc_timer * timer,grpc_millis deadline,grpc_closure * closure)353 static void timer_init(grpc_timer* timer, grpc_millis deadline,
354 grpc_closure* closure) {
355 int is_first_timer = 0;
356 timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
357 timer->closure = closure;
358 timer->deadline = deadline;
359
360 #ifndef NDEBUG
361 timer->hash_table_next = nullptr;
362 #endif
363
364 if (grpc_timer_trace.enabled()) {
365 gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
366 timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
367 closure->cb);
368 }
369
370 if (!g_shared_mutables.initialized) {
371 timer->pending = false;
372 GRPC_CLOSURE_SCHED(timer->closure,
373 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
374 "Attempt to create timer before initialization"));
375 return;
376 }
377
378 gpr_mu_lock(&shard->mu);
379 timer->pending = true;
380 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
381 if (deadline <= now) {
382 timer->pending = false;
383 GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
384 gpr_mu_unlock(&shard->mu);
385 /* early out */
386 return;
387 }
388
389 grpc_time_averaged_stats_add_sample(
390 &shard->stats, static_cast<double>(deadline - now) / 1000.0);
391
392 ADD_TO_HASH_TABLE(timer);
393
394 if (deadline < shard->queue_deadline_cap) {
395 is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
396 } else {
397 timer->heap_index = INVALID_HEAP_INDEX;
398 list_join(&shard->list, timer);
399 }
400 if (grpc_timer_trace.enabled()) {
401 gpr_log(GPR_INFO,
402 " .. add to shard %d with queue_deadline_cap=%" PRId64
403 " => is_first_timer=%s",
404 static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
405 is_first_timer ? "true" : "false");
406 }
407 gpr_mu_unlock(&shard->mu);
408
409 /* Deadline may have decreased, we need to adjust the master queue. Note
410 that there is a potential racy unlocked region here. There could be a
411 reordering of multiple grpc_timer_init calls, at this point, but the < test
412 below should ensure that we err on the side of caution. There could
413 also be a race with grpc_timer_check, which might beat us to the lock. In
414 that case, it is possible that the timer that we added will have already
415 run by the time we hold the lock, but that too is a safe error.
416 Finally, it's possible that the grpc_timer_check that intervened failed to
417 trigger the new timer because the min_deadline hadn't yet been reduced.
418 In that case, the timer will simply have to wait for the next
419 grpc_timer_check. */
420 if (is_first_timer) {
421 gpr_mu_lock(&g_shared_mutables.mu);
422 if (grpc_timer_trace.enabled()) {
423 gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRId64,
424 shard->min_deadline);
425 }
426 if (deadline < shard->min_deadline) {
427 grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
428 shard->min_deadline = deadline;
429 note_deadline_change(shard);
430 if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
431 #if GPR_ARCH_64
432 // TODO: sreek - Using c-style cast here. static_cast<> gives an error
433 // (on mac platforms complaining that gpr_atm* is (long *) while
434 // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
435 // safe since we know that both are pointer types and 64-bit wide.
436 gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
437 deadline);
438 #else
439 // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
440 // types (like grpc_millis). So all reads and writes to
441 // g_shared_mutables.min_timer varialbe under g_shared_mutables.mu
442 g_shared_mutables.min_timer = deadline;
443 #endif
444 grpc_kick_poller();
445 }
446 }
447 gpr_mu_unlock(&g_shared_mutables.mu);
448 }
449 }
450
timer_consume_kick(void)451 static void timer_consume_kick(void) {
452 #if GPR_ARCH_64
453 /* Force re-evaluation of last seen min */
454 gpr_tls_set(&g_last_seen_min_timer, 0);
455 #endif
456 }
457
timer_cancel(grpc_timer * timer)458 static void timer_cancel(grpc_timer* timer) {
459 if (!g_shared_mutables.initialized) {
460 /* must have already been cancelled, also the shard mutex is invalid */
461 return;
462 }
463
464 timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
465 gpr_mu_lock(&shard->mu);
466 if (grpc_timer_trace.enabled()) {
467 gpr_log(GPR_INFO, "TIMER %p: CANCEL pending=%s", timer,
468 timer->pending ? "true" : "false");
469 }
470
471 if (timer->pending) {
472 REMOVE_FROM_HASH_TABLE(timer);
473
474 GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
475 timer->pending = false;
476 if (timer->heap_index == INVALID_HEAP_INDEX) {
477 list_remove(timer);
478 } else {
479 grpc_timer_heap_remove(&shard->heap, timer);
480 }
481 } else {
482 VALIDATE_NON_PENDING_TIMER(timer);
483 }
484 gpr_mu_unlock(&shard->mu);
485 }
486
487 /* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
488 all relevant timers in shard->list (i.e timers with deadlines earlier than
489 'queue_deadline_cap') into into shard->heap.
490 Returns 'true' if shard->heap has atleast ONE element
491 REQUIRES: shard->mu locked */
refill_heap(timer_shard * shard,grpc_millis now)492 static int refill_heap(timer_shard* shard, grpc_millis now) {
493 /* Compute the new queue window width and bound by the limits: */
494 double computed_deadline_delta =
495 grpc_time_averaged_stats_update_average(&shard->stats) *
496 ADD_DEADLINE_SCALE;
497 double deadline_delta =
498 GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
499 MAX_QUEUE_WINDOW_DURATION);
500 grpc_timer *timer, *next;
501
502 /* Compute the new cap and put all timers under it into the queue: */
503 shard->queue_deadline_cap =
504 saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
505 static_cast<grpc_millis>(deadline_delta * 1000.0));
506
507 if (grpc_timer_check_trace.enabled()) {
508 gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRId64,
509 static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
510 }
511 for (timer = shard->list.next; timer != &shard->list; timer = next) {
512 next = timer->next;
513
514 if (timer->deadline < shard->queue_deadline_cap) {
515 if (grpc_timer_check_trace.enabled()) {
516 gpr_log(GPR_INFO, " .. add timer with deadline %" PRId64 " to heap",
517 timer->deadline);
518 }
519 list_remove(timer);
520 grpc_timer_heap_add(&shard->heap, timer);
521 }
522 }
523 return !grpc_timer_heap_is_empty(&shard->heap);
524 }
525
526 /* This pops the next non-cancelled timer with deadline <= now from the
527 queue, or returns NULL if there isn't one.
528 REQUIRES: shard->mu locked */
pop_one(timer_shard * shard,grpc_millis now)529 static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
530 grpc_timer* timer;
531 for (;;) {
532 if (grpc_timer_check_trace.enabled()) {
533 gpr_log(GPR_INFO, " .. shard[%d]: heap_empty=%s",
534 static_cast<int>(shard - g_shards),
535 grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
536 }
537 if (grpc_timer_heap_is_empty(&shard->heap)) {
538 if (now < shard->queue_deadline_cap) return nullptr;
539 if (!refill_heap(shard, now)) return nullptr;
540 }
541 timer = grpc_timer_heap_top(&shard->heap);
542 if (grpc_timer_check_trace.enabled()) {
543 gpr_log(GPR_INFO,
544 " .. check top timer deadline=%" PRId64 " now=%" PRId64,
545 timer->deadline, now);
546 }
547 if (timer->deadline > now) return nullptr;
548 if (grpc_timer_trace.enabled()) {
549 gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler",
550 timer, now - timer->deadline,
551 timer->closure->scheduler->vtable->name);
552 }
553 timer->pending = false;
554 grpc_timer_heap_pop(&shard->heap);
555 return timer;
556 }
557 }
558
559 /* REQUIRES: shard->mu unlocked */
pop_timers(timer_shard * shard,grpc_millis now,grpc_millis * new_min_deadline,grpc_error * error)560 static size_t pop_timers(timer_shard* shard, grpc_millis now,
561 grpc_millis* new_min_deadline, grpc_error* error) {
562 size_t n = 0;
563 grpc_timer* timer;
564 gpr_mu_lock(&shard->mu);
565 while ((timer = pop_one(shard, now))) {
566 REMOVE_FROM_HASH_TABLE(timer);
567 GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_REF(error));
568 n++;
569 }
570 *new_min_deadline = compute_min_deadline(shard);
571 gpr_mu_unlock(&shard->mu);
572 if (grpc_timer_check_trace.enabled()) {
573 gpr_log(GPR_INFO, " .. shard[%d] popped %" PRIdPTR,
574 static_cast<int>(shard - g_shards), n);
575 }
576 return n;
577 }
578
run_some_expired_timers(grpc_millis now,grpc_millis * next,grpc_error * error)579 static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
580 grpc_millis* next,
581 grpc_error* error) {
582 grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
583
584 #if GPR_ARCH_64
585 // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on
586 // mac platforms complaining that gpr_atm* is (long *) while
587 // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
588 // safe since we know that both are pointer types and 64-bit wide
589 grpc_millis min_timer = static_cast<grpc_millis>(
590 gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
591 gpr_tls_set(&g_last_seen_min_timer, min_timer);
592 #else
593 // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
594 // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
595 // are done under g_shared_mutables.mu
596 gpr_mu_lock(&g_shared_mutables.mu);
597 grpc_millis min_timer = g_shared_mutables.min_timer;
598 gpr_mu_unlock(&g_shared_mutables.mu);
599 #endif
600 if (now < min_timer) {
601 if (next != nullptr) *next = GPR_MIN(*next, min_timer);
602 return GRPC_TIMERS_CHECKED_AND_EMPTY;
603 }
604
605 if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
606 gpr_mu_lock(&g_shared_mutables.mu);
607 result = GRPC_TIMERS_CHECKED_AND_EMPTY;
608
609 if (grpc_timer_check_trace.enabled()) {
610 gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRId64,
611 static_cast<int>(g_shard_queue[0] - g_shards),
612 g_shard_queue[0]->min_deadline);
613 }
614
615 while (g_shard_queue[0]->min_deadline < now ||
616 (now != GRPC_MILLIS_INF_FUTURE &&
617 g_shard_queue[0]->min_deadline == now)) {
618 grpc_millis new_min_deadline;
619
620 /* For efficiency, we pop as many available timers as we can from the
621 shard. This may violate perfect timer deadline ordering, but that
622 shouldn't be a big deal because we don't make ordering guarantees. */
623 if (pop_timers(g_shard_queue[0], now, &new_min_deadline, error) > 0) {
624 result = GRPC_TIMERS_FIRED;
625 }
626
627 if (grpc_timer_check_trace.enabled()) {
628 gpr_log(GPR_INFO,
629 " .. result --> %d"
630 ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
631 ", now=%" PRId64,
632 result, static_cast<int>(g_shard_queue[0] - g_shards),
633 g_shard_queue[0]->min_deadline, new_min_deadline, now);
634 }
635
636 /* An grpc_timer_init() on the shard could intervene here, adding a new
637 timer that is earlier than new_min_deadline. However,
638 grpc_timer_init() will block on the master_lock before it can call
639 set_min_deadline, so this one will complete first and then the Addtimer
640 will reduce the min_deadline (perhaps unnecessarily). */
641 g_shard_queue[0]->min_deadline = new_min_deadline;
642 note_deadline_change(g_shard_queue[0]);
643 }
644
645 if (next) {
646 *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
647 }
648
649 #if GPR_ARCH_64
650 // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on
651 // mac platforms complaining that gpr_atm* is (long *) while
652 // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
653 // safe since we know that both are pointer types and 64-bit wide
654 gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
655 g_shard_queue[0]->min_deadline);
656 #else
657 // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
658 // types (like grpc_millis). So all reads and writes to
659 // g_shared_mutables.min_timer are done under g_shared_mutables.mu
660 g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
661 #endif
662 gpr_mu_unlock(&g_shared_mutables.mu);
663 gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
664 }
665
666 GRPC_ERROR_UNREF(error);
667
668 return result;
669 }
670
timer_check(grpc_millis * next)671 static grpc_timer_check_result timer_check(grpc_millis* next) {
672 // prelude
673 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
674
675 #if GPR_ARCH_64
676 /* fetch from a thread-local first: this avoids contention on a globally
677 mutable cacheline in the common case */
678 grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
679 #else
680 // On 32-bit systems, we currently do not have thread local support for 64-bit
681 // types. In this case, directly read from g_shared_mutables.min_timer.
682 // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
683 // on 64-bit types (like grpc_millis). So all reads and writes to
684 // g_shared_mutables.min_timer are done under g_shared_mutables.mu
685 gpr_mu_lock(&g_shared_mutables.mu);
686 grpc_millis min_timer = g_shared_mutables.min_timer;
687 gpr_mu_unlock(&g_shared_mutables.mu);
688 #endif
689
690 if (now < min_timer) {
691 if (next != nullptr) {
692 *next = GPR_MIN(*next, min_timer);
693 }
694 if (grpc_timer_check_trace.enabled()) {
695 gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
696 now, min_timer);
697 }
698 return GRPC_TIMERS_CHECKED_AND_EMPTY;
699 }
700
701 grpc_error* shutdown_error =
702 now != GRPC_MILLIS_INF_FUTURE
703 ? GRPC_ERROR_NONE
704 : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
705
706 // tracing
707 if (grpc_timer_check_trace.enabled()) {
708 char* next_str;
709 if (next == nullptr) {
710 next_str = gpr_strdup("NULL");
711 } else {
712 gpr_asprintf(&next_str, "%" PRId64, *next);
713 }
714 #if GPR_ARCH_64
715 gpr_log(GPR_INFO,
716 "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
717 " glob_min=%" PRId64,
718 now, next_str, min_timer,
719 static_cast<grpc_millis>(gpr_atm_no_barrier_load(
720 (gpr_atm*)(&g_shared_mutables.min_timer))));
721 #else
722 gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
723 now, next_str, min_timer);
724 #endif
725 gpr_free(next_str);
726 }
727 // actual code
728 grpc_timer_check_result r =
729 run_some_expired_timers(now, next, shutdown_error);
730 // tracing
731 if (grpc_timer_check_trace.enabled()) {
732 char* next_str;
733 if (next == nullptr) {
734 next_str = gpr_strdup("NULL");
735 } else {
736 gpr_asprintf(&next_str, "%" PRId64, *next);
737 }
738 gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str);
739 gpr_free(next_str);
740 }
741 return r;
742 }
743
744 grpc_timer_vtable grpc_generic_timer_vtable = {
745 timer_init, timer_cancel, timer_check,
746 timer_list_init, timer_list_shutdown, timer_consume_kick};
747