1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]
3
4 use std::future::Future;
5 use std::sync::{Arc, Mutex};
6 use std::task::Poll;
7 use tokio::macros::support::poll_fn;
8
9 use tokio::runtime::Runtime;
10 use tokio::task::consume_budget;
11 use tokio::time::{self, Duration};
12
/// The reported worker count follows the runtime flavor: one worker for the
/// current-thread runtime and two for the multi-thread runtime built by
/// `threaded()`.
#[test]
fn num_workers() {
    let single = current_thread();
    let single_workers = single.metrics().num_workers();
    assert_eq!(1, single_workers);

    let multi = threaded();
    let multi_workers = multi.metrics().num_workers();
    assert_eq!(2, multi_workers);
}
21
/// A fresh runtime reports no blocking threads; once a single
/// `spawn_blocking` job has been run to completion it reports exactly one.
#[test]
fn num_blocking_threads() {
    let runtime = current_thread();
    let metrics = runtime.metrics();
    assert_eq!(0, metrics.num_blocking_threads());

    // Run one (empty) blocking job to completion; its result is irrelevant.
    let job = runtime.spawn_blocking(|| {});
    let _ = runtime.block_on(job);

    assert_eq!(1, metrics.num_blocking_threads());
}
29
/// After a blocking job finishes, the thread that ran it is eventually
/// reported as idle.
#[test]
fn num_idle_blocking_threads() {
    let runtime = current_thread();
    let metrics = runtime.metrics();
    assert_eq!(0, metrics.num_idle_blocking_threads());

    let job = runtime.spawn_blocking(|| {});
    let _ = runtime.block_on(job);

    // Sleeps on the runtime's timer. The sleep future is created inside the
    // async block so that it is constructed while the runtime is entered.
    let pause = |length: Duration| {
        runtime.block_on(async move {
            time::sleep(length).await;
        })
    };

    // The blocking thread needs a moment to become idle. 5ms is usually
    // enough; when it is not, wait one more second. The long wait is only
    // taken on demand so the test suite stays fast.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    pause(Duration::from_millis(5));
    if metrics.num_idle_blocking_threads() == 0 {
        pause(Duration::from_secs(1));
    }

    assert_eq!(1, metrics.num_idle_blocking_threads());
}
53
/// With the blocking pool capped at one thread, two jobs that both wait on a
/// held mutex leave a non-empty blocking queue; releasing the mutex and
/// joining both jobs drains it back to zero.
#[test]
fn blocking_queue_depth() {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all().max_blocking_threads(1);
    let runtime = builder.build().unwrap();
    let metrics = runtime.metrics();

    assert_eq!(0, metrics.blocking_queue_depth());

    // Hold the lock so that every blocking job stalls until it is released.
    let gate = Arc::new(Mutex::new(()));
    let held = gate.lock().unwrap();

    let blocked_job = {
        let gate = Arc::clone(&gate);
        move || {
            let _unused = gate.lock().unwrap();
        }
    };

    let first = runtime.spawn_blocking(blocked_job.clone());
    let second = runtime.spawn_blocking(blocked_job);
    assert!(metrics.blocking_queue_depth() > 0);

    // Let both jobs proceed and wait for them to finish.
    drop(held);

    let _ = runtime.block_on(first);
    let _ = runtime.block_on(second);

    assert_eq!(0, metrics.blocking_queue_depth());
}
83
/// The active task count is zero on a fresh runtime and one while a single
/// spawned task is executing, for both runtime flavors.
///
/// Each spawned task is driven to completion with `block_on` and its join
/// result is unwrapped. Without this, the assertion inside the task is never
/// checked: the current-thread runtime never polls a task unless something
/// drives it, and a panic inside a detached task is captured by its
/// `JoinHandle` instead of failing the test.
#[test]
fn active_tasks_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    }))
    .unwrap();

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    }))
    .unwrap();
}
100
/// Spawning a task through a `Handle` from a thread outside the runtime is
/// counted as exactly one remote schedule, on both runtime flavors.
#[test]
fn remote_schedule_count() {
    use std::thread;

    // Spawns an empty task from a freshly created OS thread, waits for the
    // task to finish, and returns the resulting remote schedule count.
    let schedule_from_other_thread = |rt: &Runtime| {
        let handle = rt.handle().clone();
        let task = thread::spawn(move || {
            handle.spawn(async {
                // Intentionally empty.
            })
        })
        .join()
        .unwrap();

        rt.block_on(task).unwrap();

        rt.metrics().remote_schedule_count()
    };

    let single = current_thread();
    assert_eq!(1, schedule_from_other_thread(&single));

    let multi = threaded();
    assert_eq!(1, schedule_from_other_thread(&multi));
}
133
/// Every worker has parked at least once after the runtime slept briefly and
/// was then shut down.
#[test]
fn worker_park_count() {
    // A short timer-based sleep executed on the runtime under test.
    async fn nap() {
        time::sleep(Duration::from_millis(1)).await;
    }

    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(nap());
    drop(single);
    assert!(single_metrics.worker_park_count(0) >= 1);

    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(nap());
    drop(multi);
    // `threaded()` configures two workers; check each of them.
    for worker in 0..2 {
        assert!(multi_metrics.worker_park_count(worker) >= 1);
    }
}
153
/// Every worker has recorded at least one no-op park after the runtime slept
/// briefly and was then shut down.
///
/// There is no great way to provoke no-op parks deliberately: they occur as
/// false-positive events under concurrency.
#[test]
fn worker_noop_count() {
    // A short timer-based sleep executed on the runtime under test.
    async fn nap() {
        time::sleep(Duration::from_millis(1)).await;
    }

    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(nap());
    drop(single);
    assert!(single_metrics.worker_noop_count(0) > 0);

    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(nap());
    drop(multi);
    // `threaded()` configures two workers; check each of them.
    for worker in 0..2 {
        assert!(multi_metrics.worker_noop_count(worker) > 0);
    }
}
176
/// Exactly one steal is recorded, summed across all workers, when one worker
/// is held up on a blocking channel receive while a task it spawned still
/// has to run.
///
/// This metric only applies to the multi-threaded runtime.
#[test]
fn worker_steal_count() {
    use std::sync::mpsc::channel;

    let runtime = threaded();
    let metrics = runtime.metrics();

    runtime.block_on(async {
        let (signal_tx, signal_rx) = channel();

        // Hop onto a runtime worker.
        let blocker = tokio::spawn(async move {
            // This task performs the send that unblocks the receive below.
            tokio::spawn(async move {
                signal_tx.send(()).unwrap();
            });

            // A second spawn pushes the sender task out of the "next
            // scheduled" slot.
            tokio::spawn(async {});

            // Block this worker until the sender task has run.
            signal_rx.recv().unwrap();
        });

        blocker.await.unwrap();
    });

    drop(runtime);

    let mut total_steals: u64 = 0;
    for worker in 0..metrics.num_workers() {
        total_steals += metrics.worker_steal_count(worker);
    }

    assert_eq!(1, total_steals);
}
216
/// Running `N` short tasks one after another yields `N` polls in total on
/// either runtime flavor, and leaves the poll-count histogram disabled and
/// empty because it was never enabled on the builder.
#[test]
fn worker_poll_count_and_time() {
    const N: u64 = 5;

    async fn task() {
        // Deliberately a synchronous sleep so the poll takes measurable time.
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    // Spawns `N` tasks, awaiting each before spawning the next.
    async fn run_tasks() {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    }

    let single = current_thread();
    let metrics = single.metrics();
    single.block_on(run_tasks());
    drop(single);

    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // The histogram was not enabled, so it must stay unpopulated.
    assert!(!metrics.poll_count_histogram_enabled());
    for bucket in 0..10 {
        assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, bucket));
    }

    let multi = threaded();
    let metrics = multi.metrics();
    multi.block_on(run_tasks());
    drop(multi);

    // The polls may land on any worker, so compare the sum across workers
    // against the number of spawned tasks.
    let mut total_polls: u64 = 0;
    for worker in 0..metrics.num_workers() {
        total_polls += metrics.worker_poll_count(worker);
    }

    assert_eq!(N, total_polls);

    let mut summed_mean_poll_time = Duration::default();
    for worker in 0..metrics.num_workers() {
        summed_mean_poll_time += metrics.worker_mean_poll_time(worker);
    }

    assert!(summed_mean_poll_time > Duration::default());

    // The histogram was not enabled, so it must stay unpopulated.
    assert!(!metrics.poll_count_histogram_enabled());
    for worker in 0..metrics.num_workers() {
        for bucket in 0..10 {
            assert_eq!(0, metrics.poll_count_histogram_bucket_count(worker, bucket));
        }
    }
}
273
/// With the poll-count histogram enabled (linear scale, 3 buckets, 50ms
/// resolution), the bucket counts summed over all workers and buckets equal
/// the number of spawned tasks, on both runtime flavors.
#[test]
fn worker_poll_count_histogram() {
    const N: u64 = 5;

    // Applies the shared histogram configuration and builds the runtime.
    fn with_histogram(mut builder: tokio::runtime::Builder) -> Runtime {
        builder
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap()
    }

    let single = tokio::runtime::Builder::new_current_thread();
    let mut multi = tokio::runtime::Builder::new_multi_thread();
    multi.worker_threads(2);

    for rt in [with_histogram(single), with_histogram(multi)] {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_count_histogram_num_buckets();

        assert!(metrics.poll_count_histogram_enabled());
        assert_eq!(num_buckets, 3);

        let mut recorded: u64 = 0;
        for worker in 0..num_workers {
            for bucket in 0..num_buckets {
                recorded += metrics.poll_count_histogram_bucket_count(worker, bucket);
            }
        }
        assert_eq!(N, recorded);
    }
}
320
/// Checks the bucket boundaries reported for a 3-bucket histogram with a
/// 50 microsecond resolution, first on the linear scale and then on the log
/// scale. In both cases the last bucket extends to `u64::MAX` nanoseconds.
#[test]
fn worker_poll_count_histogram_range() {
    let unbounded = Duration::from_nanos(u64::MAX);

    // Builds a current-thread runtime whose histogram uses the given scale.
    let build_with_scale = |scale: tokio::runtime::HistogramScale| {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(scale)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(us(50))
            .build()
            .unwrap()
    };

    let linear_rt = build_with_scale(tokio::runtime::HistogramScale::Linear);
    let linear = linear_rt.metrics();

    let linear_expected = [us(0)..us(50), us(50)..us(100), us(100)..unbounded];
    for (bucket, expected) in linear_expected.into_iter().enumerate() {
        assert_eq!(linear.poll_count_histogram_bucket_range(bucket), expected);
    }

    let log_rt = build_with_scale(tokio::runtime::HistogramScale::Log);
    let log = log_rt.metrics();

    // On the log scale the first boundary is the 50us resolution (expressed
    // in nanoseconds) rounded up to a power of two; the next one doubles it.
    let first_edge = Duration::from_nanos(50000_u64.next_power_of_two());
    let second_edge = first_edge * 2;

    let log_expected = [
        us(0)..first_edge,
        first_edge..second_edge,
        second_edge..unbounded,
    ];
    for (bucket, expected) in log_expected.into_iter().enumerate() {
        assert_eq!(log.poll_count_histogram_bucket_range(bucket), expected);
    }
}
359
/// Configuring the histogram's scale, bucket count and resolution does not
/// by itself turn the histogram on: without
/// `enable_metrics_poll_count_histogram()` it is reported as disabled on
/// both runtime flavors.
#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    // Applies the histogram settings (but not the enable call) and builds.
    fn configured_only(mut builder: tokio::runtime::Builder) -> Runtime {
        builder
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap()
    }

    let single = tokio::runtime::Builder::new_current_thread();
    let mut multi = tokio::runtime::Builder::new_multi_thread();
    multi.worker_threads(2);

    for rt in [configured_only(single), configured_only(multi)] {
        let histogram_enabled = rt.metrics().poll_count_histogram_enabled();
        assert!(!histogram_enabled);
    }
}
385
/// After running a handful of yielding tasks and shutting the runtime down,
/// every worker reports a non-zero total busy duration.
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    // Spawns `N` tasks one after another; each yields once before finishing.
    async fn yield_in_spawned_tasks() {
        for _ in 0..N {
            let task = tokio::spawn(async {
                tokio::task::yield_now().await;
            });
            task.await.unwrap();
        }
    }

    let none = Duration::default();

    let single = current_thread();
    let metrics = single.metrics();

    single.block_on(yield_in_spawned_tasks());

    drop(single);

    assert!(metrics.worker_total_busy_duration(0) > none);

    let multi = threaded();
    let metrics = multi.metrics();

    multi.block_on(yield_in_spawned_tasks());

    drop(multi);

    for worker in 0..metrics.num_workers() {
        assert!(metrics.worker_total_busy_duration(worker) > none);
    }
}
428
/// Checks how spawns are attributed between local and remote scheduling.
///
/// On the current-thread runtime a spawn from inside `block_on` is counted
/// as one local schedule and no remote schedule. On the multi-thread runtime
/// the expected totals are two local schedules (summed over workers) and one
/// remote schedule.
#[test]
fn worker_local_schedule_count() {
    let single = current_thread();
    let metrics = single.metrics();
    single.block_on(async {
        let child = tokio::spawn(async {});
        child.await.unwrap();
    });
    drop(single);

    assert_eq!(1, metrics.worker_local_schedule_count(0));
    assert_eq!(0, metrics.remote_schedule_count());

    let multi = threaded();
    let metrics = multi.metrics();
    multi.block_on(async {
        // The outer spawn hops onto the runtime; the inner spawn is then
        // issued from a worker thread.
        let outer = tokio::spawn(async {
            let inner = tokio::spawn(async {});
            inner.await.unwrap();
        });
        outer.await.unwrap();
    });
    drop(multi);

    let local_total = (0..metrics.num_workers())
        .fold(0u64, |acc, worker| acc + metrics.worker_local_schedule_count(worker));

    assert_eq!(2, local_total);
    assert_eq!(1, metrics.remote_schedule_count());
}
460
/// Spawning 300 tasks from one worker while the other worker is blocked is
/// expected to record exactly one overflow, summed across all workers.
///
/// Only applies to the threaded runtime.
#[test]
fn worker_overflow_count() {
    let runtime = threaded();
    let metrics = runtime.metrics();

    runtime.block_on(async {
        // Hop onto a runtime worker.
        let driver = tokio::spawn(async {
            let (started_tx, started_rx) = std::sync::mpsc::channel();
            let (release_tx, release_rx) = std::sync::mpsc::channel();

            // Keep the other worker occupied until every task below has
            // been spawned.
            tokio::spawn(async move {
                started_tx.send(()).unwrap();
                release_rx.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            // Wait until the blocking task is actually running.
            started_rx.recv().unwrap();

            // Spawn many tasks
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            release_tx.send(()).unwrap();
        });

        driver.await.unwrap();
    });
    drop(runtime);

    let mut overflows: u64 = 0;
    for worker in 0..metrics.num_workers() {
        overflows += metrics.worker_overflow_count(worker);
    }

    assert_eq!(1, overflows);
}
502
/// A task spawned through a `Handle` from an outside thread shows up in the
/// injection queue.
///
/// On the current-thread runtime, which is never driven here, the depth is
/// exactly 1. On the multi-thread runtime the workers are first tied up
/// with blocking receives, and the depth is only required to lie in `1..=15`.
#[test]
fn injection_queue_depth() {
    use std::thread;

    let single = current_thread();
    let single_handle = single.handle().clone();
    let single_metrics = single.metrics();

    thread::spawn(move || {
        single_handle.spawn(async {});
    })
    .join()
    .unwrap();

    assert_eq!(1, single_metrics.injection_queue_depth());

    let multi = threaded();
    let multi_handle = multi.handle().clone();
    let multi_metrics = multi.metrics();

    // First we need to block the runtime workers
    let (unblock_a, blocked_a) = std::sync::mpsc::channel();
    let (unblock_b, blocked_b) = std::sync::mpsc::channel();
    let (unblock_rest, blocked_rest) = std::sync::mpsc::channel();
    let blocked_rest = Arc::new(Mutex::new(blocked_rest));

    multi.spawn(async move { blocked_a.recv().unwrap() });
    multi.spawn(async move { blocked_b.recv().unwrap() });

    // Spawn some more to make sure there are items
    for _ in 0..10 {
        let shared_rx = Arc::clone(&blocked_rest);
        multi.spawn(async move {
            shared_rx.lock().unwrap().recv().unwrap();
        });
    }

    thread::spawn(move || {
        multi_handle.spawn(async {});
    })
    .join()
    .unwrap();

    let depth = multi_metrics.injection_queue_depth();
    assert!(1 <= depth, "{}", depth);
    assert!(15 >= depth, "{}", depth);

    // Release every blocked task before the runtime is dropped.
    for _ in 0..10 {
        unblock_rest.send(()).unwrap();
    }

    unblock_a.send(()).unwrap();
    unblock_b.send(()).unwrap();
}
557
/// After spawning `N` empty tasks from inside the runtime, the local queue
/// depth (summed across workers for the multi-thread runtime) equals `N`.
#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    let single = current_thread();
    let metrics = single.metrics();
    single.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let multi = threaded();
    let metrics = multi.metrics();
    multi.block_on(async move {
        // Hop onto a runtime worker.
        let driver = tokio::spawn(async move {
            let (started_tx, started_rx) = std::sync::mpsc::channel();
            let (release_tx, release_rx) = std::sync::mpsc::channel();

            // Keep the other worker occupied until every task below has
            // been spawned.
            tokio::spawn(async move {
                started_tx.send(()).unwrap();
                release_rx.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            // Wait until the blocking task is actually running.
            started_rx.recv().unwrap();

            // Spawn the tasks whose queueing is being measured.
            for _ in 0..N {
                tokio::spawn(async {});
            }

            let mut queued: usize = 0;
            for worker in 0..metrics.num_workers() {
                queued += metrics.worker_local_queue_depth(worker);
            }

            assert_eq!(queued, N);

            release_tx.send(()).unwrap();
        });

        driver.await.unwrap();
    });
}
609
/// A future that keeps calling `consume_budget` until it is made to yield
/// is recorded as exactly one budget-forced yield.
#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut yielded = false;

    // Block on a future that burns budget until `consume_budget` reports
    // pending, returns `Pending` once, and completes on the next poll.
    rt.block_on(poll_fn(|cx| {
        if yielded {
            return Poll::Ready(());
        }

        loop {
            let step = consume_budget();
            tokio::pin!(step);

            if step.poll(cx).is_pending() {
                yielded = true;
                return Poll::Pending;
            }
        }
    }));

    assert_eq!(1, metrics.budget_forced_yield_count());
}
636
/// Two budget-burning futures joined inside one `block_on` call are
/// expected to record a single budget-forced yield in total.
#[test]
fn budget_exhaustion_yield_with_joins() {
    // Builds a future that burns budget until `consume_budget` reports
    // pending, returns `Pending` once, and completes on the next poll.
    fn burn_until_yield() -> impl Future<Output = ()> {
        let mut yielded = false;

        poll_fn(move |cx| {
            if yielded {
                return Poll::Ready(());
            }

            loop {
                let step = consume_budget();
                tokio::pin!(step);

                if step.poll(cx).is_pending() {
                    yielded = true;
                    return Poll::Pending;
                }
            }
        })
    }

    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    // Drive both budget-burning futures to completion within one join.
    rt.block_on(async { tokio::join!(burn_until_yield(), burn_until_yield()) });

    assert_eq!(1, metrics.budget_forced_yield_count());
}
681
/// Connecting one TCP stream registers exactly one file descriptor with the
/// I/O driver, and dropping the stream deregisters it again.
///
/// The peer is a loopback listener owned by this test, so the test needs
/// neither DNS nor outbound network access. The listener is a plain
/// `std::net::TcpListener`, which is not registered with tokio's I/O
/// driver, so only the client stream contributes to the counters.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    // Port 0 lets the OS pick a free port; the connection completes through
    // the listen backlog without `accept` ever being called.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let stream = tokio::net::TcpStream::connect(addr);
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);

    // Keep the listening socket open until every assertion has run.
    drop(listener);
}
701
/// Establishing a single TCP connection is expected to produce exactly one
/// readiness event on the I/O driver.
///
/// The peer is a loopback listener owned by this test, so the test needs
/// neither DNS nor outbound network access. The listener is a plain
/// `std::net::TcpListener`, which is not registered with tokio's I/O
/// driver, so it cannot contribute readiness events of its own.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Port 0 lets the OS pick a free port; the connection completes through
    // the listen backlog without `accept` ever being called.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let stream = tokio::net::TcpStream::connect(addr);
    let _stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);

    // Keep the listening socket open until the assertion has run.
    drop(listener);
}
713
current_thread() -> Runtime714 fn current_thread() -> Runtime {
715 tokio::runtime::Builder::new_current_thread()
716 .enable_all()
717 .build()
718 .unwrap()
719 }
720
threaded() -> Runtime721 fn threaded() -> Runtime {
722 tokio::runtime::Builder::new_multi_thread()
723 .worker_threads(2)
724 .enable_all()
725 .build()
726 .unwrap()
727 }
728
/// Shorthand for a `Duration` of `n` microseconds.
fn us(n: u64) -> Duration {
    const MICROS_PER_SEC: u64 = 1_000_000;

    // Whole seconds plus the sub-second remainder; the sum is the same
    // duration as converting `n` microseconds in one step.
    let whole_secs = Duration::from_secs(n / MICROS_PER_SEC);
    let remainder = Duration::from_micros(n % MICROS_PER_SEC);
    whole_secs + remainder
}
732