1 use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};
2
3 use std::sync::atomic::Ordering::Relaxed;
4 use std::time::{Duration, Instant};
5
/// Worker-local accumulator for runtime metrics.
///
/// Values are collected in plain (non-atomic) fields and copied into the
/// shared `WorkerMetrics` when `submit` is called.
pub(crate) struct MetricsBatch {
    /// Number of times the worker parked. Incremented by `about_to_park`.
    park_count: u64,

    /// Number of times the worker woke w/o doing work, i.e. parked again
    /// without having polled any task since the previous park.
    noop_count: u64,

    /// Number of tasks stolen (sum of the `by` values passed to
    /// `incr_steal_count`).
    steal_count: u64,

    /// Number of times tasks were stolen, i.e. the number of steal
    /// operations, regardless of how many tasks each one moved.
    steal_operations: u64,

    /// Number of tasks that were polled by the worker. Incremented by
    /// `start_poll`.
    poll_count: u64,

    /// Value of `poll_count` recorded by `about_to_park` the last time it
    /// observed a change. This is used to track the noop count.
    poll_count_on_last_park: u64,

    /// Number of tasks that were scheduled locally on this worker.
    local_schedule_count: u64,

    /// Number of tasks moved to the global queue to make space in the local
    /// queue.
    overflow_count: u64,

    /// The total busy duration in nanoseconds, accumulated by
    /// `end_processing_scheduled_tasks`.
    busy_duration_total: u64,

    /// Instant at which work last resumed (continued after park). Set on
    /// construction and by `start_processing_scheduled_tasks`.
    processing_scheduled_tasks_started_at: Instant,

    /// If `Some`, tracks poll times in nanoseconds. Only populated when the
    /// `WorkerMetrics` passed to `new` has a poll-count histogram.
    poll_timer: Option<PollTimer>,
}
42
/// Per-poll timing state, present only when poll-time tracking is enabled.
struct PollTimer {
    /// Histogram of poll counts within each band. `end_poll` records one
    /// entry per poll using the elapsed time in nanoseconds, and `submit`
    /// forwards the batch to the worker's histogram.
    poll_counts: HistogramBatch,

    /// Instant when the most recent task started polling. Refreshed by
    /// `start_poll` and read by `end_poll`.
    poll_started_at: Instant,
}
50
51 impl MetricsBatch {
new(worker_metrics: &WorkerMetrics) -> MetricsBatch52 pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
53 let now = Instant::now();
54
55 MetricsBatch {
56 park_count: 0,
57 noop_count: 0,
58 steal_count: 0,
59 steal_operations: 0,
60 poll_count: 0,
61 poll_count_on_last_park: 0,
62 local_schedule_count: 0,
63 overflow_count: 0,
64 busy_duration_total: 0,
65 processing_scheduled_tasks_started_at: now,
66 poll_timer: worker_metrics
67 .poll_count_histogram
68 .as_ref()
69 .map(|worker_poll_counts| PollTimer {
70 poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
71 poll_started_at: now,
72 }),
73 }
74 }
75
submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64)76 pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
77 worker.mean_poll_time.store(mean_poll_time, Relaxed);
78 worker.park_count.store(self.park_count, Relaxed);
79 worker.noop_count.store(self.noop_count, Relaxed);
80 worker.steal_count.store(self.steal_count, Relaxed);
81 worker
82 .steal_operations
83 .store(self.steal_operations, Relaxed);
84 worker.poll_count.store(self.poll_count, Relaxed);
85
86 worker
87 .busy_duration_total
88 .store(self.busy_duration_total, Relaxed);
89
90 worker
91 .local_schedule_count
92 .store(self.local_schedule_count, Relaxed);
93 worker.overflow_count.store(self.overflow_count, Relaxed);
94
95 if let Some(poll_timer) = &self.poll_timer {
96 let dst = worker.poll_count_histogram.as_ref().unwrap();
97 poll_timer.poll_counts.submit(dst);
98 }
99 }
100
101 /// The worker is about to park.
about_to_park(&mut self)102 pub(crate) fn about_to_park(&mut self) {
103 self.park_count += 1;
104
105 if self.poll_count_on_last_park == self.poll_count {
106 self.noop_count += 1;
107 } else {
108 self.poll_count_on_last_park = self.poll_count;
109 }
110 }
111
112 /// Start processing a batch of tasks
start_processing_scheduled_tasks(&mut self)113 pub(crate) fn start_processing_scheduled_tasks(&mut self) {
114 self.processing_scheduled_tasks_started_at = Instant::now();
115 }
116
117 /// Stop processing a batch of tasks
end_processing_scheduled_tasks(&mut self)118 pub(crate) fn end_processing_scheduled_tasks(&mut self) {
119 let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
120 self.busy_duration_total += duration_as_u64(busy_duration);
121 }
122
123 /// Start polling an individual task
start_poll(&mut self)124 pub(crate) fn start_poll(&mut self) {
125 self.poll_count += 1;
126
127 if let Some(poll_timer) = &mut self.poll_timer {
128 poll_timer.poll_started_at = Instant::now();
129 }
130 }
131
132 /// Stop polling an individual task
end_poll(&mut self)133 pub(crate) fn end_poll(&mut self) {
134 if let Some(poll_timer) = &mut self.poll_timer {
135 let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
136 poll_timer.poll_counts.measure(elapsed, 1);
137 }
138 }
139
inc_local_schedule_count(&mut self)140 pub(crate) fn inc_local_schedule_count(&mut self) {
141 self.local_schedule_count += 1;
142 }
143 }
144
145 cfg_rt_multi_thread! {
146 impl MetricsBatch {
147 pub(crate) fn incr_steal_count(&mut self, by: u16) {
148 self.steal_count += by as u64;
149 }
150
151 pub(crate) fn incr_steal_operations(&mut self) {
152 self.steal_operations += 1;
153 }
154
155 pub(crate) fn incr_overflow_count(&mut self) {
156 self.overflow_count += 1;
157 }
158 }
159 }
160
/// Converts `dur` to whole nanoseconds, clamping to `u64::MAX` when the
/// nanosecond count does not fit in a `u64`.
fn duration_as_u64(dur: Duration) -> u64 {
    match u64::try_from(dur.as_nanos()) {
        Ok(nanos) => nanos,
        // More nanoseconds than a `u64` can hold: saturate.
        Err(_) => u64::MAX,
    }
}
164