/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <inttypes.h>

#include <string>

#include "absl/strings/str_cat.h"

#include "src/core/lib/iomgr/timer.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"

#define INVALID_HEAP_INDEX 0xffffffffu

#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1

grpc_core::TraceFlag grpc_timer_trace(false, "timer");
grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");

/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
 * deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
 * others are maintained in the list (unordered). This helps to keep the number
 * of elements in the heap low.
 *
 * The 'queue_deadline_cap' gets recomputed periodically based on the timer
 * stats maintained in 'stats', and the relevant timers are then moved from the
 * 'list' to the 'heap'.
 */
struct timer_shard {
  gpr_mu mu;
  grpc_time_averaged_stats stats;
  /* All and only timers with deadlines < this will be in the heap. */
  grpc_millis queue_deadline_cap;
  /* The deadline of the next timer due in this shard. */
  grpc_millis min_deadline;
  /* Index of this timer_shard in the g_shard_queue. */
  uint32_t shard_queue_index;
  /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
     heap have the top bit of their deadline set to 0. */
  grpc_timer_heap heap;
  /* This holds timers whose deadline is >= queue_deadline_cap. */
  grpc_timer list;
};
static size_t g_num_shards;

/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
 * is hashed to select the timer shard to add the timer to. */
static timer_shard* g_shards;

/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e.
 * the deadline of the next timer in each shard).
 * Access to this is protected by g_shared_mutables.mu */
static timer_shard** g_shard_queue;

#ifndef NDEBUG

/* == DEBUG ONLY: hash table for duplicate timer detection == */

#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */

static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */
static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {nullptr};

static void init_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_init(&g_hash_mu[i]);
  }
}

static void destroy_timer_ht() {
  for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
    gpr_mu_destroy(&g_hash_mu[i]);
  }
}

static bool is_in_ht(grpc_timer* t) {
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  return (p == t);
}

static void add_to_ht(grpc_timer* t) {
  GPR_ASSERT(!t->hash_table_next);
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);

  gpr_mu_lock(&g_hash_mu[i]);
  grpc_timer* p = g_timer_ht[i];
  while (p != nullptr && p != t) {
    p = p->hash_table_next;
  }

  if (p == t) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** Duplicate timer (%p) being added. Closure: (%p), created at: "
            "(%s:%d), scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }

  /* Timer not present in the bucket. Insert at head of the list */
  t->hash_table_next = g_timer_ht[i];
  g_timer_ht[i] = t;
  gpr_mu_unlock(&g_hash_mu[i]);
}

static void remove_from_ht(grpc_timer* t) {
  size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
  bool removed = false;

  gpr_mu_lock(&g_hash_mu[i]);
  if (g_timer_ht[i] == t) {
    g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
    removed = true;
  } else if (g_timer_ht[i] != nullptr) {
    grpc_timer* p = g_timer_ht[i];
    while (p->hash_table_next != nullptr && p->hash_table_next != t) {
      p = p->hash_table_next;
    }

    if (p->hash_table_next == t) {
      p->hash_table_next = t->hash_table_next;
      removed = true;
    }
  }
  gpr_mu_unlock(&g_hash_mu[i]);

  if (!removed) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** Removing timer (%p) that is not added to hash table. Closure "
            "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }

  t->hash_table_next = nullptr;
}

/* If a timer is added to a timer shard (either the heap or the list), it must
 * be pending. A timer is added to the hash table only if it is added to the
 * timer shard.
 * Therefore, if timer->pending is false, it cannot be in the hash table. */
static void validate_non_pending_timer(grpc_timer* t) {
  if (!t->pending && is_in_ht(t)) {
    grpc_closure* c = t->closure;
    gpr_log(GPR_ERROR,
            "** gpr_timer_cancel() called on a non-pending timer (%p) which "
            "is in the hash table. Closure: (%p), created at: (%s:%d), "
            "scheduled at: (%s:%d) **",
            t, c, c->file_created, c->line_created, c->file_initiated,
            c->line_initiated);
    abort();
  }
}

#define INIT_TIMER_HASH_TABLE() init_timer_ht()
#define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht()
#define ADD_TO_HASH_TABLE(t) add_to_ht((t))
#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))

#else

#define INIT_TIMER_HASH_TABLE()
#define DESTROY_TIMER_HASH_TABLE()
#define ADD_TO_HASH_TABLE(t)
#define REMOVE_FROM_HASH_TABLE(t)
#define VALIDATE_NON_PENDING_TIMER(t)

#endif

#if GPR_ARCH_64
/* NOTE: TODO(sreek) - Currently the thread local storage support in grpc is
   for intptr_t, which means that on 32-bit machines it is not wide enough to
   hold grpc_millis, which is 64-bit. Adding thread local support for 64-bit
   values is a lot of work for very little gain, so we currently restrict this
   optimization to 64-bit machines only. */

/* Thread local variable that stores the deadline of the next timer the thread
 * has last-seen. This is an optimization to prevent the thread from checking
 * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
 * an expensive operation) */
GPR_TLS_DECL(g_last_seen_min_timer);
#endif

struct shared_mutables {
  /* The deadline of the next timer due across all timer shards */
  grpc_millis min_timer;
  /* Allow only one run_some_expired_timers at once */
  gpr_spinlock checker_mu;
  bool initialized;
  /* Protects g_shard_queue (and the shared_mutables struct itself) */
  gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);

static struct shared_mutables g_shared_mutables;

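/* Adds 'a' and 'b', clamping the result to GRPC_MILLIS_INF_FUTURE so the
   addition cannot overflow. */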
static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
  if (a > GRPC_MILLIS_INF_FUTURE - b) {
    return GRPC_MILLIS_INF_FUTURE;
  }
  return a + b;
}

static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
                                                        grpc_millis* next,
                                                        grpc_error* error);

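/* Returns the shard's next due deadline: the deadline of the earliest timer in
   the shard's heap, or (queue_deadline_cap + 1) if the heap is empty. */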
static grpc_millis compute_min_deadline(timer_shard* shard) {
  return grpc_timer_heap_is_empty(&shard->heap)
             ? saturating_add(shard->queue_deadline_cap, 1)
             : grpc_timer_heap_top(&shard->heap)->deadline;
}

static void timer_list_init() {
  uint32_t i;

  g_num_shards = GPR_CLAMP(2 * gpr_cpu_num_cores(), 1, 32);
  g_shards =
      static_cast<timer_shard*>(gpr_zalloc(g_num_shards * sizeof(*g_shards)));
  g_shard_queue = static_cast<timer_shard**>(
      gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)));

  g_shared_mutables.initialized = true;
  g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
  gpr_mu_init(&g_shared_mutables.mu);
  g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();

#if GPR_ARCH_64
  gpr_tls_init(&g_last_seen_min_timer);
  gpr_tls_set(&g_last_seen_min_timer, 0);
#endif

  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_init(&shard->mu);
    grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
                                  0.5);
    shard->queue_deadline_cap = g_shared_mutables.min_timer;
    shard->shard_queue_index = i;
    grpc_timer_heap_init(&shard->heap);
    shard->list.next = shard->list.prev = &shard->list;
    shard->min_deadline = compute_min_deadline(shard);
    g_shard_queue[i] = shard;
  }

  INIT_TIMER_HASH_TABLE();
}

static void timer_list_shutdown() {
  size_t i;
  run_some_expired_timers(
      GRPC_MILLIS_INF_FUTURE, nullptr,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
  for (i = 0; i < g_num_shards; i++) {
    timer_shard* shard = &g_shards[i];
    gpr_mu_destroy(&shard->mu);
    grpc_timer_heap_destroy(&shard->heap);
  }
  gpr_mu_destroy(&g_shared_mutables.mu);

#if GPR_ARCH_64
  gpr_tls_destroy(&g_last_seen_min_timer);
#endif

  gpr_free(g_shards);
  gpr_free(g_shard_queue);
  g_shared_mutables.initialized = false;

  DESTROY_TIMER_HASH_TABLE();
}

/* Appends 'timer' to the tail of the circular doubly-linked list whose
   sentinel node is 'head'. */
static void list_join(grpc_timer* head, grpc_timer* timer) {
  timer->next = head;
  timer->prev = head->prev;
  timer->next->prev = timer->prev->next = timer;
}

static void list_remove(grpc_timer* timer) {
  timer->next->prev = timer->prev;
  timer->prev->next = timer->next;
}

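/* Swaps the shards at positions 'first_shard_queue_index' and
   'first_shard_queue_index + 1' in g_shard_queue, updating the cached
   shard_queue_index of both shards. */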
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
  timer_shard* temp;
  temp = g_shard_queue[first_shard_queue_index];
  g_shard_queue[first_shard_queue_index] =
      g_shard_queue[first_shard_queue_index + 1];
  g_shard_queue[first_shard_queue_index + 1] = temp;
  g_shard_queue[first_shard_queue_index]->shard_queue_index =
      first_shard_queue_index;
  g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
      first_shard_queue_index + 1;
}

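/* Restores the min_deadline ordering of g_shard_queue after 'shard's
   min_deadline has changed, by bubbling the shard towards the front or back of
   the queue as needed. REQUIRES: g_shared_mutables.mu locked. */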
static void note_deadline_change(timer_shard* shard) {
  while (shard->shard_queue_index > 0 &&
         shard->min_deadline <
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
  }
  while (shard->shard_queue_index < g_num_shards - 1 &&
         shard->min_deadline >
             g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index);
  }
}

void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }

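/* Initializes 'timer' to run 'closure' at 'deadline' and adds it to the shard
   selected by hashing the timer's address. If the timer system is not
   initialized, or the deadline has already passed, the closure is scheduled
   immediately instead. */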
static void timer_init(grpc_timer* timer, grpc_millis deadline,
                       grpc_closure* closure) {
  int is_first_timer = 0;
  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
  timer->closure = closure;
  timer->deadline = deadline;

#ifndef NDEBUG
  timer->hash_table_next = nullptr;
#endif

  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
            timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
            closure->cb);
  }

  if (!g_shared_mutables.initialized) {
    timer->pending = false;
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, timer->closure,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Attempt to create timer before initialization"));
    return;
  }

  gpr_mu_lock(&shard->mu);
  timer->pending = true;
  grpc_millis now = grpc_core::ExecCtx::Get()->Now();
  if (deadline <= now) {
    timer->pending = false;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
    gpr_mu_unlock(&shard->mu);
    /* early out */
    return;
  }

  grpc_time_averaged_stats_add_sample(
      &shard->stats, static_cast<double>(deadline - now) / 1000.0);

  ADD_TO_HASH_TABLE(timer);

  if (deadline < shard->queue_deadline_cap) {
    is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
  } else {
    timer->heap_index = INVALID_HEAP_INDEX;
    list_join(&shard->list, timer);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_INFO,
            " .. add to shard %d with queue_deadline_cap=%" PRId64
            " => is_first_timer=%s",
            static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
            is_first_timer ? "true" : "false");
  }
  gpr_mu_unlock(&shard->mu);

  /* Deadline may have decreased, so we need to adjust the master queue. Note
     that there is a potential racy unlocked region here. There could be a
     reordering of multiple grpc_timer_init calls at this point, but the < test
     below should ensure that we err on the side of caution. There could
     also be a race with grpc_timer_check, which might beat us to the lock. In
     that case, it is possible that the timer that we added will have already
     run by the time we hold the lock, but that too is a safe error.
     Finally, it's possible that the grpc_timer_check that intervened failed to
     trigger the new timer because the min_deadline hadn't yet been reduced.
     In that case, the timer will simply have to wait for the next
     grpc_timer_check. */
  if (is_first_timer) {
    gpr_mu_lock(&g_shared_mutables.mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
      gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRId64,
              shard->min_deadline);
    }
    if (deadline < shard->min_deadline) {
      grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
      shard->min_deadline = deadline;
      note_deadline_change(shard);
      if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
#if GPR_ARCH_64
        // TODO(sreek): Using a C-style cast here because static_cast<> gives
        // an error on Mac platforms (complaining that gpr_atm* is (long *)
        // while &g_shared_mutables.min_timer is a (long long *)). The cast
        // should be safe since we know that both are pointer types and 64-bit
        // wide.
        gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                                 deadline);
#else
        // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
        // types (like grpc_millis). So all reads and writes to the
        // g_shared_mutables.min_timer variable are done under
        // g_shared_mutables.mu.
        g_shared_mutables.min_timer = deadline;
#endif
        grpc_kick_poller();
      }
    }
    gpr_mu_unlock(&g_shared_mutables.mu);
  }
}

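/* Clears the thread-local cached minimum deadline so that the next timer_check
   on this thread re-reads the shared minimum. */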
static void timer_consume_kick(void) {
#if GPR_ARCH_64
  /* Force re-evaluation of last seen min */
  gpr_tls_set(&g_last_seen_min_timer, 0);
#endif
}

static void timer_cancel(grpc_timer* timer) {
  if (!g_shared_mutables.initialized) {
    /* must have already been cancelled, also the shard mutex is invalid */
    return;
  }

  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
  gpr_mu_lock(&shard->mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
    gpr_log(GPR_INFO, "TIMER %p: CANCEL pending=%s", timer,
            timer->pending ? "true" : "false");
  }

  if (timer->pending) {
    REMOVE_FROM_HASH_TABLE(timer);

    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
                            GRPC_ERROR_CANCELLED);
    timer->pending = false;
    if (timer->heap_index == INVALID_HEAP_INDEX) {
      list_remove(timer);
    } else {
      grpc_timer_heap_remove(&shard->heap, timer);
    }
  } else {
    VALIDATE_NON_PENDING_TIMER(timer);
  }
  gpr_mu_unlock(&shard->mu);
}

/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and
   moving all relevant timers in shard->list (i.e. timers with deadlines
   earlier than 'queue_deadline_cap') into shard->heap.
   Returns 'true' if shard->heap has at least ONE element.
   REQUIRES: shard->mu locked */
static bool refill_heap(timer_shard* shard, grpc_millis now) {
  /* Compute the new queue window width and bound by the limits: */
  double computed_deadline_delta =
      grpc_time_averaged_stats_update_average(&shard->stats) *
      ADD_DEADLINE_SCALE;
  double deadline_delta =
      GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
                MAX_QUEUE_WINDOW_DURATION);
  grpc_timer *timer, *next;

  /* Compute the new cap and put all timers under it into the queue: */
  shard->queue_deadline_cap =
      saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
                     static_cast<grpc_millis>(deadline_delta * 1000.0));

  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRId64,
            static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
  }
  for (timer = shard->list.next; timer != &shard->list; timer = next) {
    next = timer->next;

    if (timer->deadline < shard->queue_deadline_cap) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO, " .. add timer with deadline %" PRId64 " to heap",
                timer->deadline);
      }
      list_remove(timer);
      grpc_timer_heap_add(&shard->heap, timer);
    }
  }
  return !grpc_timer_heap_is_empty(&shard->heap);
}

/* This pops the next non-cancelled timer with deadline <= now from the
   queue, or returns NULL if there isn't one.
   REQUIRES: shard->mu locked */
static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
  grpc_timer* timer;
  for (;;) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, " .. shard[%d]: heap_empty=%s",
              static_cast<int>(shard - g_shards),
              grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
    }
    if (grpc_timer_heap_is_empty(&shard->heap)) {
      if (now < shard->queue_deadline_cap) return nullptr;
      if (!refill_heap(shard, now)) return nullptr;
    }
    timer = grpc_timer_heap_top(&shard->heap);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO,
              " .. check top timer deadline=%" PRId64 " now=%" PRId64,
              timer->deadline, now);
    }
    if (timer->deadline > now) return nullptr;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_trace)) {
      gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late", timer,
              now - timer->deadline);
    }
    timer->pending = false;
    grpc_timer_heap_pop(&shard->heap);
    return timer;
  }
}

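/* Pops every expired timer (deadline <= now) from 'shard', schedules each
   timer's closure with 'error', writes the shard's new minimum deadline into
   'new_min_deadline', and returns the number of timers popped. */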
/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(timer_shard* shard, grpc_millis now,
                         grpc_millis* new_min_deadline, grpc_error* error) {
  size_t n = 0;
  grpc_timer* timer;
  gpr_mu_lock(&shard->mu);
  while ((timer = pop_one(shard, now))) {
    REMOVE_FROM_HASH_TABLE(timer);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
                            GRPC_ERROR_REF(error));
    n++;
  }
  *new_min_deadline = compute_min_deadline(shard);
  gpr_mu_unlock(&shard->mu);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    gpr_log(GPR_INFO, " .. shard[%d] popped %" PRIdPTR,
            static_cast<int>(shard - g_shards), n);
  }
  return n;
}

static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
                                                        grpc_millis* next,
                                                        grpc_error* error) {
  grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;

#if GPR_ARCH_64
  // TODO(sreek): Using a C-style cast here because static_cast<> gives an
  // error on Mac platforms (complaining that gpr_atm* is (long *) while
  // &g_shared_mutables.min_timer is a (long long *)). The cast should be safe
  // since we know that both are pointer types and 64-bit wide.
  grpc_millis min_timer = static_cast<grpc_millis>(
      gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
  gpr_tls_set(&g_last_seen_min_timer, min_timer);
#else
  // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
  // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
  // are done under g_shared_mutables.mu.
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif
  if (now < min_timer) {
    if (next != nullptr) *next = GPR_MIN(*next, min_timer);
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
    gpr_mu_lock(&g_shared_mutables.mu);
    result = GRPC_TIMERS_CHECKED_AND_EMPTY;

    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRId64,
              static_cast<int>(g_shard_queue[0] - g_shards),
              g_shard_queue[0]->min_deadline);
    }

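    /* Drain shards in order of earliest deadline while their min_deadline has
       been reached. The 'now != GRPC_MILLIS_INF_FUTURE' guard keeps the
       shutdown pass (which uses now == GRPC_MILLIS_INF_FUTURE) from spinning
       on shards whose min_deadline has saturated to infinity. */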
    while (g_shard_queue[0]->min_deadline < now ||
           (now != GRPC_MILLIS_INF_FUTURE &&
            g_shard_queue[0]->min_deadline == now)) {
      grpc_millis new_min_deadline;

      /* For efficiency, we pop as many available timers as we can from the
         shard. This may violate perfect timer deadline ordering, but that
         shouldn't be a big deal because we don't make ordering guarantees. */
      if (pop_timers(g_shard_queue[0], now, &new_min_deadline, error) > 0) {
        result = GRPC_TIMERS_FIRED;
      }

      if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
        gpr_log(GPR_INFO,
                " .. result --> %d"
                ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
                ", now=%" PRId64,
                result, static_cast<int>(g_shard_queue[0] - g_shards),
                g_shard_queue[0]->min_deadline, new_min_deadline, now);
      }

      /* A grpc_timer_init() on the shard could intervene here, adding a new
         timer that is earlier than new_min_deadline. However,
         grpc_timer_init() will block on g_shared_mutables.mu before it can
         update the min_deadline, so this update will complete first, and the
         newly added timer will then reduce the min_deadline (perhaps
         unnecessarily). */
      g_shard_queue[0]->min_deadline = new_min_deadline;
      note_deadline_change(g_shard_queue[0]);
    }

    if (next) {
      *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
    }

#if GPR_ARCH_64
    // TODO(sreek): Using a C-style cast here because static_cast<> gives an
    // error on Mac platforms (complaining that gpr_atm* is (long *) while
    // &g_shared_mutables.min_timer is a (long long *)). The cast should be
    // safe since we know that both are pointer types and 64-bit wide.
    gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
                             g_shard_queue[0]->min_deadline);
#else
    // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
    // types (like grpc_millis). So all reads and writes to
    // g_shared_mutables.min_timer are done under g_shared_mutables.mu.
    g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
#endif
    gpr_mu_unlock(&g_shared_mutables.mu);
    gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
  }

  GRPC_ERROR_UNREF(error);

  return result;
}

static grpc_timer_check_result timer_check(grpc_millis* next) {
  // prelude
  grpc_millis now = grpc_core::ExecCtx::Get()->Now();

#if GPR_ARCH_64
  /* fetch from a thread-local first: this avoids contention on a globally
     mutable cacheline in the common case */
  grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
#else
  // On 32-bit systems, we currently do not have thread-local support for
  // 64-bit types. In this case, read directly from g_shared_mutables.min_timer.
  // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
  // on 64-bit types (like grpc_millis). So all reads and writes to
  // g_shared_mutables.min_timer are done under g_shared_mutables.mu.
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis min_timer = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
#endif

  if (now < min_timer) {
    if (next != nullptr) {
      *next = GPR_MIN(*next, min_timer);
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
      gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
              now, min_timer);
    }
    return GRPC_TIMERS_CHECKED_AND_EMPTY;
  }

  grpc_error* shutdown_error =
      now != GRPC_MILLIS_INF_FUTURE
          ? GRPC_ERROR_NONE
          : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");

  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(*next);
    }
#if GPR_ARCH_64
    gpr_log(GPR_INFO,
            "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
            " glob_min=%" PRId64,
            now, next_str.c_str(), min_timer,
            static_cast<grpc_millis>(gpr_atm_no_barrier_load(
                (gpr_atm*)(&g_shared_mutables.min_timer))));
#else
    gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
            now, next_str.c_str(), min_timer);
#endif
  }
  // actual code
  grpc_timer_check_result r =
      run_some_expired_timers(now, next, shutdown_error);
  // tracing
  if (GRPC_TRACE_FLAG_ENABLED(grpc_timer_check_trace)) {
    std::string next_str;
    if (next == nullptr) {
      next_str = "NULL";
    } else {
      next_str = absl::StrCat(*next);
    }
    gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str.c_str());
  }
  return r;
}

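/* Function table that wires this generic timer implementation into iomgr's
   timer API. */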
grpc_timer_vtable grpc_generic_timer_vtable = {
    timer_init,      timer_cancel,        timer_check,
    timer_list_init, timer_list_shutdown, timer_consume_kick};