• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};
2 
3 use std::sync::atomic::Ordering::Relaxed;
4 use std::time::{Duration, Instant};
5 
6 pub(crate) struct MetricsBatch {
7     /// Number of times the worker parked.
8     park_count: u64,
9 
10     /// Number of times the worker woke w/o doing work.
11     noop_count: u64,
12 
13     /// Number of tasks stolen.
14     steal_count: u64,
15 
16     /// Number of times tasks where stolen.
17     steal_operations: u64,
18 
19     /// Number of tasks that were polled by the worker.
20     poll_count: u64,
21 
22     /// Number of tasks polled when the worker entered park. This is used to
23     /// track the noop count.
24     poll_count_on_last_park: u64,
25 
26     /// Number of tasks that were scheduled locally on this worker.
27     local_schedule_count: u64,
28 
29     /// Number of tasks moved to the global queue to make space in the local
30     /// queue
31     overflow_count: u64,
32 
33     /// The total busy duration in nanoseconds.
34     busy_duration_total: u64,
35 
36     /// Instant at which work last resumed (continued after park).
37     processing_scheduled_tasks_started_at: Instant,
38 
39     /// If `Some`, tracks poll times in nanoseconds
40     poll_timer: Option<PollTimer>,
41 }
42 
43 struct PollTimer {
44     /// Histogram of poll counts within each band.
45     poll_counts: HistogramBatch,
46 
47     /// Instant when the most recent task started polling.
48     poll_started_at: Instant,
49 }
50 
51 impl MetricsBatch {
new(worker_metrics: &WorkerMetrics) -> MetricsBatch52     pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
53         let now = Instant::now();
54 
55         MetricsBatch {
56             park_count: 0,
57             noop_count: 0,
58             steal_count: 0,
59             steal_operations: 0,
60             poll_count: 0,
61             poll_count_on_last_park: 0,
62             local_schedule_count: 0,
63             overflow_count: 0,
64             busy_duration_total: 0,
65             processing_scheduled_tasks_started_at: now,
66             poll_timer: worker_metrics
67                 .poll_count_histogram
68                 .as_ref()
69                 .map(|worker_poll_counts| PollTimer {
70                     poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
71                     poll_started_at: now,
72                 }),
73         }
74     }
75 
submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64)76     pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
77         worker.mean_poll_time.store(mean_poll_time, Relaxed);
78         worker.park_count.store(self.park_count, Relaxed);
79         worker.noop_count.store(self.noop_count, Relaxed);
80         worker.steal_count.store(self.steal_count, Relaxed);
81         worker
82             .steal_operations
83             .store(self.steal_operations, Relaxed);
84         worker.poll_count.store(self.poll_count, Relaxed);
85 
86         worker
87             .busy_duration_total
88             .store(self.busy_duration_total, Relaxed);
89 
90         worker
91             .local_schedule_count
92             .store(self.local_schedule_count, Relaxed);
93         worker.overflow_count.store(self.overflow_count, Relaxed);
94 
95         if let Some(poll_timer) = &self.poll_timer {
96             let dst = worker.poll_count_histogram.as_ref().unwrap();
97             poll_timer.poll_counts.submit(dst);
98         }
99     }
100 
101     /// The worker is about to park.
about_to_park(&mut self)102     pub(crate) fn about_to_park(&mut self) {
103         self.park_count += 1;
104 
105         if self.poll_count_on_last_park == self.poll_count {
106             self.noop_count += 1;
107         } else {
108             self.poll_count_on_last_park = self.poll_count;
109         }
110     }
111 
112     /// Start processing a batch of tasks
start_processing_scheduled_tasks(&mut self)113     pub(crate) fn start_processing_scheduled_tasks(&mut self) {
114         self.processing_scheduled_tasks_started_at = Instant::now();
115     }
116 
117     /// Stop processing a batch of tasks
end_processing_scheduled_tasks(&mut self)118     pub(crate) fn end_processing_scheduled_tasks(&mut self) {
119         let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
120         self.busy_duration_total += duration_as_u64(busy_duration);
121     }
122 
123     /// Start polling an individual task
start_poll(&mut self)124     pub(crate) fn start_poll(&mut self) {
125         self.poll_count += 1;
126 
127         if let Some(poll_timer) = &mut self.poll_timer {
128             poll_timer.poll_started_at = Instant::now();
129         }
130     }
131 
132     /// Stop polling an individual task
end_poll(&mut self)133     pub(crate) fn end_poll(&mut self) {
134         if let Some(poll_timer) = &mut self.poll_timer {
135             let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
136             poll_timer.poll_counts.measure(elapsed, 1);
137         }
138     }
139 
inc_local_schedule_count(&mut self)140     pub(crate) fn inc_local_schedule_count(&mut self) {
141         self.local_schedule_count += 1;
142     }
143 }
144 
145 cfg_rt_multi_thread! {
146     impl MetricsBatch {
147         pub(crate) fn incr_steal_count(&mut self, by: u16) {
148             self.steal_count += by as u64;
149         }
150 
151         pub(crate) fn incr_steal_operations(&mut self) {
152             self.steal_operations += 1;
153         }
154 
155         pub(crate) fn incr_overflow_count(&mut self) {
156             self.overflow_count += 1;
157         }
158     }
159 }
160 
duration_as_u64(dur: Duration) -> u64161 fn duration_as_u64(dur: Duration) -> u64 {
162     u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
163 }
164