use crate::runtime::{Config, MetricsBatch, WorkerMetrics};

use std::cmp;
use std::time::{Duration, Instant};

/// Per-worker statistics. This is used for both tuning the scheduler and
/// reporting runtime-level metrics/stats.
pub(crate) struct Stats {
    /// The metrics batch used to report runtime-level metrics/stats to the
    /// user.
    batch: MetricsBatch,

    /// Exponentially-weighted moving average of the time spent polling a
    /// scheduled task.
    ///
    /// Tracked in nanoseconds, stored as an f64 since that is what we use
    /// for the EWMA calculations.
    task_poll_time_ewma: f64,
}

/// Transient state.
pub(crate) struct Ephemeral {
    /// Instant at which work last resumed (continued after park).
    ///
    /// This duplicates the value stored in `MetricsBatch`. We will unify
    /// `Stats` and `MetricsBatch` when we stabilize metrics.
    processing_scheduled_tasks_started_at: Instant,

    /// Number of tasks polled in the batch of scheduled tasks.
    tasks_polled_in_batch: usize,

    /// Used to ensure calls to start / stop batch are paired.
    #[cfg(debug_assertions)]
    batch_started: bool,
}

impl Ephemeral {
    pub(crate) fn new() -> Ephemeral {
        Ephemeral {
            processing_scheduled_tasks_started_at: Instant::now(),
            tasks_polled_in_batch: 0,
            #[cfg(debug_assertions)]
            batch_started: false,
        }
    }
}

/// How to weigh each individual poll time; the value is plucked from thin air.
const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;

/// Ideally, we wouldn't go above this; the value is plucked from thin air.
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;

/// Max value for the global queue interval. This is 2x the previous default.
const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;

/// This is the previous default.
const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;

impl Stats {
    pub(crate) const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 =
        TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL;

    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
        // Seed the value with what we hope to see.
        let task_poll_time_ewma =
            TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;

        Stats {
            batch: MetricsBatch::new(worker_metrics),
            task_poll_time_ewma,
        }
    }

    pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
        // If an interval is explicitly set, don't tune.
        if let Some(configured) = config.global_queue_interval {
            return configured;
        }

        // As of Rust 1.45, casts from f64 -> u32 are saturating, which is
        // fine here.
        let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;

        cmp::max(
            // We don't want to return less than 2 as that would result in the
            // global queue always getting checked first.
            2,
            cmp::min(
                MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
                tasks_per_interval,
            ),
        )
    }

    pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
        self.batch.submit(to, self.task_poll_time_ewma as u64);
    }

    pub(crate) fn about_to_park(&mut self) {
        self.batch.about_to_park();
    }

    pub(crate) fn inc_local_schedule_count(&mut self) {
        self.batch.inc_local_schedule_count();
    }

    pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.start_processing_scheduled_tasks();

        #[cfg(debug_assertions)]
        {
            debug_assert!(!ephemeral.batch_started);
            ephemeral.batch_started = true;
        }

        ephemeral.processing_scheduled_tasks_started_at = Instant::now();
        ephemeral.tasks_polled_in_batch = 0;
    }

    pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.end_processing_scheduled_tasks();

        #[cfg(debug_assertions)]
        {
            debug_assert!(ephemeral.batch_started);
            ephemeral.batch_started = false;
        }

        // Update the EWMA task poll time.
        if ephemeral.tasks_polled_in_batch > 0 {
            let now = Instant::now();

            // If we "overflow" this conversion, we have bigger problems than
            // slightly off stats.
            let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
            let num_polls = ephemeral.tasks_polled_in_batch as f64;

            // Calculate the mean poll duration for a single task in the batch.
            let mean_poll_duration = elapsed / num_polls;

            // Compute the alpha weighted by the number of tasks polled this batch.
            let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);

            // Now compute the new weighted average task poll time.
            self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
                + (1.0 - weighted_alpha) * self.task_poll_time_ewma;
        }
    }

    pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.start_poll();

        ephemeral.tasks_polled_in_batch += 1;
    }

    pub(crate) fn end_poll(&mut self) {
        self.batch.end_poll();
    }

    pub(crate) fn incr_steal_count(&mut self, by: u16) {
        self.batch.incr_steal_count(by);
    }

    pub(crate) fn incr_steal_operations(&mut self) {
        self.batch.incr_steal_operations();
    }

    pub(crate) fn incr_overflow_count(&mut self) {
        self.batch.incr_overflow_count();
    }
}
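
// Illustrative test sketch, not part of the original module: it exercises the
// two pieces of arithmetic above using plain f64 math. The first test checks
// that the batched EWMA update in `end_processing_scheduled_tasks` (using
// `weighted_alpha`) is equivalent to applying the per-poll EWMA once per task
// when every poll in the batch takes exactly the mean duration. The second
// replicates the clamp in `tuned_global_queue_interval` with a hypothetical
// local closure, since constructing a real `Stats` requires `WorkerMetrics`.
#[cfg(test)]
mod tuning_sketch {
    use super::*;

    #[test]
    fn batched_ewma_matches_per_poll_ewma() {
        let initial_ewma = 3_000.0; // ns, arbitrary starting estimate
        let mean_poll_duration = 5_000.0; // ns, observed mean for the batch
        let num_polls: usize = 17;

        // Batched form, as in `end_processing_scheduled_tasks`.
        let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls as f64);
        let batched =
            weighted_alpha * mean_poll_duration + (1.0 - weighted_alpha) * initial_ewma;

        // Per-poll form: fold the plain EWMA once per polled task.
        let mut per_poll = initial_ewma;
        for _ in 0..num_polls {
            per_poll = TASK_POLL_TIME_EWMA_ALPHA * mean_poll_duration
                + (1.0 - TASK_POLL_TIME_EWMA_ALPHA) * per_poll;
        }

        assert!((batched - per_poll).abs() < 1e-5);
    }

    #[test]
    fn interval_formula_clamps_to_expected_range() {
        // Hypothetical stand-in for `tuned_global_queue_interval`, minus the
        // explicit-config early return.
        let tune = |ewma_ns: f64| -> u32 {
            let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / ewma_ns) as u32;
            tasks_per_interval.clamp(2, MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL)
        };

        // Very slow polls (1ms each) floor at 2, so the local queue is still
        // preferred at least once between global queue checks.
        assert_eq!(tune(1_000_000.0), 2);
        // Very fast polls (100ns each) cap at the max interval.
        assert_eq!(tune(100.0), MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL);
        // Polls near the seeded EWMA (~200us / 61) land on the old default.
        assert_eq!(tune(3_278.0), 61);
    }
}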